diff --git a/tests/apitests/python/library/artifact.py b/tests/apitests/python/library/artifact.py
index fea3a98989..ee2fc88689 100644
--- a/tests/apitests/python/library/artifact.py
+++ b/tests/apitests/python/library/artifact.py
@@ -118,7 +118,7 @@ class Artifact(base.Base, object):
             None: False,
         }.get(artifact, True)
 
-    def waiting_for_reference_exist(self, project_name, repo_name, reference, ignore_not_found = True, period = 60, loop_count = 8, **kwargs):
+    def waiting_for_reference_exist(self, project_name, repo_name, reference, ignore_not_found = True, period = 60, loop_count = 18, **kwargs):
         _loop_count = loop_count
         while True:
             print("Waiting for reference {} round...".format(_loop_count))
diff --git a/tests/apitests/python/library/base.py b/tests/apitests/python/library/base.py
index 1aec6732f0..89f07cc0ee 100644
--- a/tests/apitests/python/library/base.py
+++ b/tests/apitests/python/library/base.py
@@ -28,7 +28,7 @@ def get_endpoint():
 
 def _create_client(server, credential, debug, api_type="products"):
     cfg = None
-    if api_type in ('projectv2', 'artifact', 'repository', 'scan'):
+    if api_type in ('projectv2', 'artifact', 'repository', 'scan', 'preheat'):
         cfg = v2_swagger_client.Configuration()
     else:
         cfg = swagger_client.Configuration()
@@ -55,6 +55,7 @@ def _create_client(server, credential, debug, api_type="products"):
         "products":   swagger_client.ProductsApi(swagger_client.ApiClient(cfg)),
         "projectv2":  v2_swagger_client.ProjectApi(v2_swagger_client.ApiClient(cfg)),
         "artifact":   v2_swagger_client.ArtifactApi(v2_swagger_client.ApiClient(cfg)),
+        "preheat":   v2_swagger_client.PreheatApi(v2_swagger_client.ApiClient(cfg)),
         "repository": v2_swagger_client.RepositoryApi(v2_swagger_client.ApiClient(cfg)),
         "scan": v2_swagger_client.ScanApi(v2_swagger_client.ApiClient(cfg)),
         "scanner": swagger_client.ScannersApi(swagger_client.ApiClient(cfg)),
diff --git a/tests/apitests/python/library/preheat.py b/tests/apitests/python/library/preheat.py
new file mode 100644
index 0000000000..93ec3a9d82
--- /dev/null
+++ b/tests/apitests/python/library/preheat.py
@@ -0,0 +1,75 @@
+# -*- coding: utf-8 -*-
+
+import time
+import base
+import v2_swagger_client
+from v2_swagger_client.rest import ApiException
+
+class Preheat(base.Base, object):
+    def __init__(self):
+        super(Preheat,self).__init__(api_type = "preheat")
+
+    def create_instance(self, name = None, description="It's a dragonfly instance", vendor="dragonfly",
+                        endpoint_url="http://20.32.244.16", auth_mode="NONE", enabled=True, insecure=True,
+                        expect_status_code = 201, expect_response_body = None, **kwargs):
+        if name is None:
+            name = base._random_name("instance")
+        client = self._get_client(**kwargs)
+        instance = v2_swagger_client.Instance(name=name, description=description,vendor=vendor,
+            endpoint=endpoint_url, auth_mode=auth_mode, enabled=enabled)
+        print("instance:",instance)
+        try:
+            _, status_code, header = client.create_instance_with_http_info(instance)
+        except ApiException as e:
+            base._assert_status_code(expect_status_code, e.status)
+            if expect_response_body is not None:
+                base._assert_status_body(expect_response_body, e.body)
+            return
+        base._assert_status_code(expect_status_code, status_code)
+        base._assert_status_code(201, status_code)
+        return base._get_id_from_header(header), name
+
+    def create_policy(self, project_name, project_id, provider_id, name = None, description="It's a dragonfly policy",
+                        filters=r'[{"type":"repository","value":"re*"},{"type":"tag","value":"v1.0*"}]', trigger=r'{"type":"manual","trigger_setting":{"cron":""}}', enabled=True,
+                        expect_status_code = 201, expect_response_body = None, **kwargs):
+        if name is None:
+            name = base._random_name("policy")
+        client = self._get_client(**kwargs)
+        policy = v2_swagger_client.PreheatPolicy(name=name, project_id=project_id, provider_id=provider_id,
+                                                   description=description,filters=filters,
+                                                   trigger=trigger, enabled=enabled)
+        print("policy:",policy)
+        try:
+            data, status_code, header = client.create_policy_with_http_info(project_name, policy)
+        except ApiException as e:
+            base._assert_status_code(expect_status_code, e.status)
+            if expect_response_body is not None:
+                base._assert_status_body(expect_response_body, e.body)
+            return
+        base._assert_status_code(expect_status_code, status_code)
+        base._assert_status_code(201, status_code)
+        return base._get_id_from_header(header), name
+
+    def get_instance(self, instance_name, **kwargs):
+        client = self._get_client(**kwargs)
+        return client.get_instance(instance_name)
+
+    def get_policy(self, project_name, preheat_policy_name, **kwargs):
+        client = self._get_client(**kwargs)
+        return client.get_policy(project_name, preheat_policy_name)
+
+    def update_policy(self, project_name, preheat_policy_name, policy, **kwargs):
+        client = self._get_client(**kwargs)
+        return client.update_policy(project_name, preheat_policy_name, policy)
+
+    def delete_instance(self, preheat_instance_name, expect_status_code = 200, expect_response_body = None, **kwargs):
+        client = self._get_client(**kwargs)
+        try:
+            _, status_code, header = _, status_code, _ = client.delete_instance_with_http_info(preheat_instance_name)
+        except ApiException as e:
+            base._assert_status_code(expect_status_code, e.status)
+            if expect_response_body is not None:
+                base._assert_status_body(expect_response_body, e.body)
+        else:
+            base._assert_status_code(expect_status_code, status_code)
+            base._assert_status_code(200, status_code)
diff --git a/tests/apitests/python/test_p2p.py b/tests/apitests/python/test_p2p.py
new file mode 100644
index 0000000000..c11b7ba206
--- /dev/null
+++ b/tests/apitests/python/test_p2p.py
@@ -0,0 +1,103 @@
+from __future__ import absolute_import
+
+
+import unittest
+import urllib
+import sys
+
+from testutils import ADMIN_CLIENT, TEARDOWN, harbor_server, suppress_urllib3_warning
+from library.base import _random_name
+from library.base import _assert_status_code
+from library.project import Project
+from library.user import User
+from library.repository import Repository
+from library.repository import push_image_to_project
+from library.registry import Registry
+from library.repository import pull_harbor_image
+from library.artifact import Artifact
+from library.preheat import Preheat
+import library.containerd
+import v2_swagger_client
+
+class TestP2P(unittest.TestCase):
+    @suppress_urllib3_warning
+    def setUp(self):
+        self.url = ADMIN_CLIENT["endpoint"]
+        self.user_password = "Aa123456"
+        self.project= Project()
+        self.user= User()
+        self.repo= Repository()
+        self.registry = Registry()
+        self.artifact = Artifact()
+        self.preheat = Preheat()
+
+    @unittest.skipIf(TEARDOWN == False, "Test data won't be erased.")
+    def tearDown(self):
+        print("Case completed")
+
+    def do_validate(self, registry_type):
+        """
+        Test case:
+            Proxy Cache Image From Harbor
+        Test step and expected result:
+            1. Create a new registry;
+            2. Create a new project;
+            3. Add a new user as a member of project;
+            4. Pull image from this project by docker CLI;
+            5. Pull image from this project by ctr CLI;
+            6. Pull manifest index from this project by docker CLI;
+            7. Pull manifest from this project by ctr CLI;
+            8. Image pulled by docker CLI should be cached;
+            9. Image pulled by ctr CLI should be cached;
+            10. Manifest index pulled by docker CLI should be cached;
+            11. Manifest index pulled by ctr CLI should be cached;
+        Tear down:
+            1. Delete project(PA);
+            2. Delete user(UA).
+        """
+        user_id, user_name = self.user.create_user(user_password = self.user_password, **ADMIN_CLIENT)
+        USER_CLIENT=dict(with_signature = True, endpoint = self.url, username = user_name, password = self.user_password)
+
+        #2. Create a new distribution instance;
+        instance_id, instance_name = self.preheat.create_instance( **ADMIN_CLIENT)
+
+        #This need to be removed once issue #13378 fixed.
+        instance = self.preheat.get_instance(instance_name)
+        print("instance:", instance)
+
+        #2. Create a new project;
+        project_id, project_name = self.project.create_project(metadata = {"public": "false"}, **USER_CLIENT)
+        print("project_id:",project_id)
+        print("project_name:",project_name)
+
+        #This need to be removed once issue #13378 fixed.
+        policy_id, policy_name = self.preheat.create_policy(project_name, project_id, instance.id, **USER_CLIENT)
+        #policy_id, _ = self.preheat.create_policy(project_name, project_id, instance_id, **USER_CLIENT)
+        policy = self.preheat.get_policy(project_name, policy_name)
+        print("policy:", policy)
+
+        policy_new = v2_swagger_client.PreheatPolicy(id = policy.id, name="policy_new_name", project_id=project_id, provider_id=instance.id,
+                                    description="edit this policy",filters=r'[{"type":"repository","value":"zgila/alpine*"},{"type":"tag","value":"v1.0*"},{"type":"label","value":"release"}]',
+                                    trigger=r'{"type":"scheduled","trigger_setting":{"cron":"0 8 * * * *"}}', enabled=False)
+
+        self.preheat.update_policy(project_name, policy.name, policy_new, **USER_CLIENT)
+
+        self.preheat.delete_instance(instance.name, expect_status_code=403, **USER_CLIENT)
+
+        self.project.delete_project(project_id, **USER_CLIENT)
+
+        self.preheat.delete_instance(instance.name, **ADMIN_CLIENT)
+
+    def test_create_instance(self):
+        self.do_validate("harbor")
+
+    def suite():
+        suite = unittest.TestSuite(unittest.makeSuite(TestP2P))
+        return suite
+
+if __name__ == '__main__':
+    result = unittest.TextTestRunner(sys.stdout, verbosity=2, failfast=True).run(TestP2P.suite())
+    print("Test result:",result)
+    if not result.wasSuccessful():
+        raise Exception(r"P2P test failed!")
+
diff --git a/tests/robot-cases/Group0-BAT/API_DB.robot b/tests/robot-cases/Group0-BAT/API_DB.robot
index df3301ad3d..1c5a50c7ce 100644
--- a/tests/robot-cases/Group0-BAT/API_DB.robot
+++ b/tests/robot-cases/Group0-BAT/API_DB.robot
@@ -152,3 +152,7 @@ Test Case - Proxy Cache
 Test Case - Tag Immutability
     [Tags]  tag_immutability
     Harbor API Test  ./tests/apitests/python/test_tag_immutability.py
+
+Test Case - P2P
+    [Tags]  p2p
+    Harbor API Test  ./tests/apitests/python/test_p2p.py