diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 795bcf2f8..db548b434 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -1499,6 +1499,11 @@ paths: format: int required: true description: The ID of the policy that triggered this job. + - name: op_uuid + in: query + type: string + required: false + description: The UUID of one trigger of replication policy. - name: num in: query type: integer @@ -1987,6 +1992,8 @@ paths: responses: '200': description: Trigger the replication successfully. + schema: + $ref: '#/definitions/ReplicationResponse' '401': description: User need to log in first. '404': @@ -3926,6 +3933,12 @@ definitions: policy_id: type: integer description: The ID of replication policy + ReplicationResponse: + type: object + properties: + uuid: + type: string + description: UUID of the replication RepositoryDescription: type: object properties: diff --git a/make/migrations/postgresql/0003_add_replication_op_uuid.up.sql b/make/migrations/postgresql/0003_add_replication_op_uuid.up.sql new file mode 100644 index 000000000..994487a79 --- /dev/null +++ b/make/migrations/postgresql/0003_add_replication_op_uuid.up.sql @@ -0,0 +1 @@ +ALTER TABLE replication_job ADD COLUMN op_uuid varchar(64); \ No newline at end of file diff --git a/src/common/dao/replication_job.go b/src/common/dao/replication_job.go index 3773fba37..c25eccc1f 100644 --- a/src/common/dao/replication_job.go +++ b/src/common/dao/replication_job.go @@ -356,6 +356,9 @@ func repJobQueryConditions(query ...*models.RepJobQuery) orm.QuerySeter { if q.PolicyID != 0 { qs = qs.Filter("PolicyID", q.PolicyID) } + if len(q.OpUUID) > 0 { + qs = qs.Filter("OpUUID__exact", q.OpUUID) + } if len(q.Repository) > 0 { qs = qs.Filter("Repository__icontains", q.Repository) } diff --git a/src/common/models/replication_job.go b/src/common/models/replication_job.go index 3b3c7df80..337df5d2c 100644 --- a/src/common/models/replication_job.go +++ b/src/common/models/replication_job.go @@ -54,15 +54,15 @@ type RepPolicy struct { // RepJob is the model for a replication job, which is the execution unit on job service, currently it is used to transfer/remove // a repository to/from a remote registry instance. type RepJob struct { - ID int64 `orm:"pk;auto;column(id)" json:"id"` - Status string `orm:"column(status)" json:"status"` - Repository string `orm:"column(repository)" json:"repository"` - PolicyID int64 `orm:"column(policy_id)" json:"policy_id"` - Operation string `orm:"column(operation)" json:"operation"` - Tags string `orm:"column(tags)" json:"-"` - TagList []string `orm:"-" json:"tags"` - UUID string `orm:"column(job_uuid)" json:"-"` - // Policy RepPolicy `orm:"-" json:"policy"` + ID int64 `orm:"pk;auto;column(id)" json:"id"` + Status string `orm:"column(status)" json:"status"` + Repository string `orm:"column(repository)" json:"repository"` + PolicyID int64 `orm:"column(policy_id)" json:"policy_id"` + OpUUID string `orm:"column(op_uuid)" json:"op_uuid"` + Operation string `orm:"column(operation)" json:"operation"` + Tags string `orm:"column(tags)" json:"-"` + TagList []string `orm:"-" json:"tags"` + UUID string `orm:"column(job_uuid)" json:"-"` CreationTime time.Time `orm:"column(creation_time);auto_now_add" json:"creation_time"` UpdateTime time.Time `orm:"column(update_time);auto_now" json:"update_time"` } @@ -126,6 +126,7 @@ func (r *RepPolicy) TableName() string { // RepJobQuery holds query conditions for replication job type RepJobQuery struct { PolicyID int64 + OpUUID string Repository string Statuses []string Operations []string diff --git a/src/core/api/models/replication.go b/src/core/api/models/replication.go index a24877caf..84617300a 100644 --- a/src/core/api/models/replication.go +++ b/src/core/api/models/replication.go @@ -23,6 +23,11 @@ type Replication struct { PolicyID int64 `json:"policy_id"` } +// ReplicationResponse describes response of a replication request, it gives +type ReplicationResponse struct { + UUID string `json:"uuid"` +} + // Valid ... func (r *Replication) Valid(v *validation.Validation) { if r.PolicyID <= 0 { diff --git a/src/core/api/replication.go b/src/core/api/replication.go index fd9a9d2d3..5b948474f 100644 --- a/src/core/api/replication.go +++ b/src/core/api/replication.go @@ -17,6 +17,7 @@ package api import ( "fmt" "net/http" + "strings" "github.com/goharbor/harbor/src/common/dao" "github.com/goharbor/harbor/src/common/models" @@ -26,6 +27,8 @@ import ( "github.com/goharbor/harbor/src/replication/core" "github.com/goharbor/harbor/src/replication/event/notification" "github.com/goharbor/harbor/src/replication/event/topic" + + "github.com/docker/distribution/uuid" ) // ReplicationAPI handles API calls for replication @@ -78,16 +81,27 @@ func (r *ReplicationAPI) Post() { return } - if err = startReplication(replication.PolicyID); err != nil { + opUUID, err := startReplication(replication.PolicyID) + if err != nil { r.HandleInternalServerError(fmt.Sprintf("failed to publish replication topic for policy %d: %v", replication.PolicyID, err)) return } log.Infof("replication signal for policy %d sent", replication.PolicyID) + + r.Data["json"] = api_models.ReplicationResponse{ + UUID: opUUID, + } + r.ServeJSON() } -func startReplication(policyID int64) error { - return notifier.Publish(topic.StartReplicationTopic, +// startReplication triggers a replication and return the uuid of this replication. +func startReplication(policyID int64) (string, error) { + opUUID := strings.Replace(uuid.Generate().String(), "-", "", -1) + return opUUID, notifier.Publish(topic.StartReplicationTopic, notification.StartReplicationNotification{ PolicyID: policyID, + Metadata: map[string]interface{}{ + "op_uuid": opUUID, + }, }) } diff --git a/src/core/api/replication_job.go b/src/core/api/replication_job.go index a166b49a7..609842f7f 100644 --- a/src/core/api/replication_job.go +++ b/src/core/api/replication_job.go @@ -94,6 +94,7 @@ func (ra *RepJobAPI) List() { query.Repository = ra.GetString("repository") query.Statuses = ra.GetStrings("status") + query.OpUUID = ra.GetString("op_uuid") startTimeStr := ra.GetString("start_time") if len(startTimeStr) != 0 { diff --git a/src/core/api/replication_policy.go b/src/core/api/replication_policy.go index cd11f0da1..c894b3b2a 100644 --- a/src/core/api/replication_policy.go +++ b/src/core/api/replication_policy.go @@ -180,7 +180,7 @@ func (pa *RepPolicyAPI) Post() { if policy.ReplicateExistingImageNow { go func() { - if err = startReplication(id); err != nil { + if _, err = startReplication(id); err != nil { log.Errorf("failed to send replication signal for policy %d: %v", id, err) return } @@ -262,7 +262,7 @@ func (pa *RepPolicyAPI) Put() { if policy.ReplicateExistingImageNow { go func() { - if err = startReplication(id); err != nil { + if _, err = startReplication(id); err != nil { log.Errorf("failed to send replication signal for policy %d: %v", id, err) return } diff --git a/src/replication/core/controller.go b/src/replication/core/controller.go index 0a0626a75..768a47a3f 100644 --- a/src/replication/core/controller.go +++ b/src/replication/core/controller.go @@ -16,6 +16,8 @@ package core import ( "fmt" + "reflect" + "strings" common_models "github.com/goharbor/harbor/src/common/models" "github.com/goharbor/harbor/src/common/utils/log" @@ -27,6 +29,8 @@ import ( "github.com/goharbor/harbor/src/replication/source" "github.com/goharbor/harbor/src/replication/target" "github.com/goharbor/harbor/src/replication/trigger" + + "github.com/docker/distribution/uuid" ) // Controller defines the methods that a replicatoin controllter should implement @@ -220,9 +224,16 @@ func (ctl *DefaultController) Replicate(policyID int64, metadata ...map[string]i targets = append(targets, target) } + // Get operation uuid from metadata, if none provided, generate one. + opUUID, err := getOpUUID(metadata...) + if err != nil { + return err + } + // submit the replication return ctl.replicator.Replicate(&replicator.Replication{ PolicyID: policyID, + OpUUID: opUUID, Candidates: candidates, Targets: targets, }) @@ -290,3 +301,26 @@ func buildFilterChain(policy *models.ReplicationPolicy, sourcer *source.Sourcer) return source.NewDefaultFilterChain(filters) } + +// getOpUUID get operation uuid from metadata or generate one if none found. +func getOpUUID(metadata ...map[string]interface{}) (string, error) { + if len(metadata) <= 0 { + return strings.Replace(uuid.Generate().String(), "-", "", -1), nil + } + + opUUID, ok := metadata[0]["op_uuid"] + if !ok { + return strings.Replace(uuid.Generate().String(), "-", "", -1), nil + } + + id, ok := opUUID.(string) + if !ok { + return "", fmt.Errorf("operation uuid should have type 'string', but got '%s'", reflect.TypeOf(opUUID).Name()) + } + + if id == "" { + return "", fmt.Errorf("provided operation uuid is empty") + } + + return id, nil +} diff --git a/src/replication/core/controller_test.go b/src/replication/core/controller_test.go index f7ded4556..248912305 100644 --- a/src/replication/core/controller_test.go +++ b/src/replication/core/controller_test.go @@ -156,3 +156,26 @@ func TestBuildFilterChain(t *testing.T) { chain := buildFilterChain(policy, sourcer) assert.Equal(t, 3, len(chain.Filters())) } + +func TestGetOpUUID(t *testing.T) { + uuid, err := getOpUUID() + assert.Nil(t, err) + assert.NotEmpty(t, uuid) + + uuid, err = getOpUUID(map[string]interface{}{ + "name": "test", + }) + assert.Nil(t, err) + assert.NotEmpty(t, uuid) + + uuid, err = getOpUUID(map[string]interface{}{ + "op_uuid": 0, + }) + assert.NotNil(t, err) + + uuid, err = getOpUUID(map[string]interface{}{ + "op_uuid": "0", + }) + assert.Nil(t, err) + assert.Equal(t, uuid, "0") +} diff --git a/src/replication/replicator/replicator.go b/src/replication/replicator/replicator.go index 9e3fdeb34..fad3a9640 100644 --- a/src/replication/replicator/replicator.go +++ b/src/replication/replicator/replicator.go @@ -30,6 +30,7 @@ import ( // Replication holds information for a replication type Replication struct { PolicyID int64 + OpUUID string Candidates []models.FilterItem Targets []*common_models.RepTarget Operation string @@ -60,6 +61,9 @@ func (d *DefaultReplicator) Replicate(replication *Replication) error { operation := "" for _, candidate := range replication.Candidates { strs := strings.SplitN(candidate.Value, ":", 2) + if len(strs) != 2 { + return fmt.Errorf("malforld image '%s'", candidate.Value) + } repositories[strs[0]] = append(repositories[strs[0]], strs[1]) operation = candidate.Operation } @@ -69,6 +73,7 @@ func (d *DefaultReplicator) Replicate(replication *Replication) error { // create job in database id, err := dao.AddRepJob(common_models.RepJob{ PolicyID: replication.PolicyID, + OpUUID: replication.OpUUID, Repository: repository, TagList: tags, Operation: operation,