Upgrade the replication_job table

This commit migrates the replication_job table, add one execution record and one task record for each job

Signed-off-by: Wenkai Yin <yinw@vmware.com>
This commit is contained in:
Wenkai Yin 2019-04-08 14:33:27 +08:00
parent 462aab5a1e
commit 4ffa0c3da0
4 changed files with 47 additions and 31 deletions

View File

@ -49,15 +49,15 @@ DROP TRIGGER replication_target_update_time_at_modtime ON replication_target;
ALTER TABLE replication_target RENAME TO registry;
ALTER TABLE registry ALTER COLUMN url TYPE varchar(256);
ALTER TABLE registry ADD COLUMN credential_type varchar(16);
UPDATE registry SET credential_type='basic' WHERE credential_type='';
ALTER TABLE registry RENAME COLUMN username TO access_key;
ALTER TABLE registry RENAME COLUMN password TO access_secret;
ALTER TABLE registry ALTER COLUMN access_secret TYPE varchar(1024);
ALTER TABLE registry ADD COLUMN type varchar(32);
UPDATE registry SET type='harbor' WHERE type='';
ALTER TABLE registry DROP COLUMN target_type;
ALTER TABLE registry ADD COLUMN description text;
ALTER TABLE registry ADD COLUMN health varchar(16);
UPDATE registry SET type='harbor';
UPDATE registry SET credential_type='basic';
/*upgrade the replication_policy*/
ALTER TABLE replication_policy ADD COLUMN creator varchar(256);
@ -107,21 +107,40 @@ create table replication_task (
);
CREATE INDEX task_execution ON replication_task (execution_id);
create table replication_schedule_job (
id SERIAL NOT NULL,
policy_id int NOT NULL,
job_id varchar(64),
status varchar(32),
creation_time timestamp default CURRENT_TIMESTAMP,
update_time timestamp NULL,
PRIMARY KEY (id)
);
CREATE INDEX replication_schedule_job_index ON replication_schedule_job (policy_id);
/*
* TODO
* consider how to handle the replication_job;
* the replication_job contains schedule job;
* the schedule job has been removed from jobservice, how to handle this?
* keep consistent with the webhook handler?
*/
/*migrate each replication_job record to one replication_execution and one replication_task record*/
DO $$
DECLARE
job RECORD;
execid integer;
BEGIN
FOR job IN SELECT * FROM replication_job WHERE operation != 'schedule'
LOOP
/*insert one execution record*/
INSERT INTO replication_execution (policy_id, start_time) VALUES (job.policy_id, job.creation_time) RETURNING id INTO execid;
/*insert one task record*/
INSERT INTO replication_task (execution_id, resource_type, src_resource, dst_resource, operation, job_id, status, start_time, end_time)
VALUES (execid, 'image', CONCAT(job.repository,':[', job.tags,']'), CONCAT(job.repository,':[', job.tags,']'),
job.operation, job.job_uuid, job.status, job.creation_time, job.update_time);
END LOOP;
END $$;
UPDATE replication_task SET status='Pending' WHERE status='pending';
UPDATE replication_task SET status='InProgress' WHERE status='scheduled';
UPDATE replication_task SET status='InProgress' WHERE status='running';
UPDATE replication_task SET status='Failed' WHERE status='error';
UPDATE replication_task SET status='Succeed' WHERE status='finished';
UPDATE replication_task SET operation='copy' WHERE operation='transfer';
UPDATE replication_task SET operation='deletion' WHERE operation='delete';
/*upgrade the replication_job to replication_schedule_job*/
DELETE FROM replication_job WHERE operation != 'schedule';
ALTER TABLE replication_job RENAME COLUMN job_uuid TO job_id;
ALTER TABLE replication_job DROP COLUMN repository;
ALTER TABLE replication_job DROP COLUMN operation;
ALTER TABLE replication_job DROP COLUMN tags;
ALTER TABLE replication_job DROP COLUMN op_uuid;
DROP INDEX policy;
DROP INDEX poid_uptime;
DROP INDEX poid_status;
DROP TRIGGER replication_job_update_time_at_modtime ON replication_job;
ALTER TABLE replication_job RENAME TO replication_schedule_job;

View File

@ -103,11 +103,6 @@ func cleanByUser(username string) {
o.Rollback()
log.Error(err)
}
err = execUpdate(o, `delete from replication_job where id < 99`)
if err != nil {
log.Error(err)
}
err = execUpdate(o, `delete from replication_policy where id < 99`)
if err != nil {
log.Error(err)
@ -164,8 +159,8 @@ func testForAll(m *testing.M) int {
func clearAll() {
tables := []string{"project_member",
"project_metadata", "access_log", "repository", "replication_policy",
"registry", "replication_job", "img_scan_job",
"img_scan_overview", "clair_vuln_timestamp", "project", "harbor_user"}
"registry", "replication_execution", "replication_task", "img_scan_job",
"replication_schedule_job", "img_scan_overview", "clair_vuln_timestamp", "project", "harbor_user"}
for _, t := range tables {
if err := ClearTable(t); err != nil {
log.Errorf("Failed to clear table: %s,error: %v", t, err)

View File

@ -32,8 +32,7 @@ func TestMain(m *testing.M) {
`DROP TABLE "access", "access_log", "admin_job", "alembic_version", "clair_vuln_timestamp",
"harbor_label", "harbor_resource_label", "harbor_user", "img_scan_job", "img_scan_overview",
"job_log", "project", "project_member", "project_metadata", "properties", "registry",
"replication_immediate_trigger", "replication_job", "replication_policy", "replication_policy_ng",
"replication_target", "repository", "robot", "role", "schema_migrations", "user_group",
"replication_policy", "repository", "robot", "role", "schema_migrations", "user_group",
"replication_execution", "replication_task", "replication_schedule_job", "oidc_user";`,
`DROP FUNCTION "update_update_time_at_column"();`,
}

View File

@ -18,6 +18,7 @@ import (
"time"
"fmt"
"github.com/astaxie/beego/orm"
"github.com/goharbor/harbor/src/common/dao"
"github.com/goharbor/harbor/src/common/utils/log"
@ -199,10 +200,12 @@ func generateStatus(execution *models.Execution) string {
}
func executionFinished(status string) bool {
if status == models.ExecutionStatusInProgress {
return false
}
if status == models.ExecutionStatusStopped ||
status == models.ExecutionStatusSucceed ||
status == models.ExecutionStatusFailed {
return true
}
return false
}
// DeleteExecution ...