feat: Single Active Replication (#21347)

feat(replication): add Single Active Replication per policy

* Added single_active_replication field to schema & DB table
* Updated API, controller & handler to enforce single execution per policy
* Added checkbox in UI to enable/disable single_active_replication for a
  policy
* Implemented necessary backend & frontend tests
* Prevents parallel runs per policy if single_active_replication is enabled

Signed-off-by: bupd <bupdprasanth@gmail.com>
This commit is contained in:
Prasanth Baskar 2025-08-11 15:32:56 +05:30 committed by GitHub
parent 91d2fa6377
commit 221037378a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 158 additions and 5 deletions

View File

@ -7462,6 +7462,12 @@ definitions:
type: boolean
description: Whether to enable copy by chunk.
x-isnullable: true
single_active_replication:
type: boolean
description: |-
Whether to skip execution until the previous active execution finishes,
avoiding the execution of the same replication rules multiple times in parallel.
x-isnullable: true # make this field optional to keep backward compatibility
ReplicationTrigger:
type: object
properties:

View File

@ -6,4 +6,6 @@ ALTER SEQUENCE permission_policy_id_seq AS BIGINT;
ALTER TABLE role_permission ALTER COLUMN permission_policy_id TYPE BIGINT;
ALTER TABLE vulnerability_record ADD COLUMN IF NOT EXISTS status text;
ALTER TABLE vulnerability_record ADD COLUMN IF NOT EXISTS status text;
ALTER TABLE replication_policy ADD COLUMN IF NOT EXISTS single_active_replication boolean;

View File

@ -109,10 +109,37 @@ func (c *controller) Start(ctx context.Context, policy *replicationmodel.Policy,
if op := operator.FromContext(ctx); op != "" {
extra["operator"] = op
}
var count int64
// If running executions are found, skip the current execution and mark it as error.
if policy.SingleActiveReplication {
var err error
count, err = c.execMgr.Count(ctx, &q.Query{
Keywords: map[string]any{
"VendorType": job.ReplicationVendorType,
"VendorID": policy.ID,
"Status": job.RunningStatus.String(),
},
})
if err != nil {
return 0, fmt.Errorf("failed to count running executions for policy ID: %d: %v", policy.ID, err)
}
}
id, err := c.execMgr.Create(ctx, job.ReplicationVendorType, policy.ID, trigger, extra)
if err != nil {
return 0, err
}
if policy.SingleActiveReplication {
if count > 0 {
if err = c.execMgr.MarkError(ctx, id, "Execution skipped: active replication still in progress."); err != nil {
return 0, err
}
return id, nil
}
}
// start the replication flow in background
// as the process runs inside a goroutine, the transaction in the outer ctx
// may be submitted already when the process starts, so create an new context

View File

@ -101,6 +101,38 @@ func (r *replicationTestSuite) TestStart() {
r.execMgr.AssertExpectations(r.T())
r.flowCtl.AssertExpectations(r.T())
r.ormCreator.AssertExpectations(r.T())
r.SetupTest()
// run replication flow with SingleActiveReplication, flow should not start
r.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
r.execMgr.On("MarkError", mock.Anything, mock.Anything, mock.Anything).Return(nil)
r.execMgr.On("Count", mock.Anything, mock.Anything).Return(int64(1), nil) // Simulate an existing running execution
id, err = r.ctl.Start(context.Background(), &repctlmodel.Policy{Enabled: true, SingleActiveReplication: true}, nil, task.ExecutionTriggerManual)
r.Require().Nil(err)
r.Equal(int64(1), id)
time.Sleep(1 * time.Second) // wait the functions called in the goroutine
r.flowCtl.AssertNumberOfCalls(r.T(), "Start", 0)
r.execMgr.AssertNumberOfCalls(r.T(), "MarkError", 1) // Ensure execution marked as final status error
r.execMgr.AssertExpectations(r.T())
r.flowCtl.AssertExpectations(r.T())
r.ormCreator.AssertExpectations(r.T())
r.SetupTest()
// no error when running the replication flow with SingleActiveReplication
r.execMgr.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1), nil)
r.execMgr.On("Get", mock.Anything, mock.Anything).Return(&task.Execution{}, nil)
r.flowCtl.On("Start", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
r.ormCreator.On("Create").Return(nil)
r.execMgr.On("Count", mock.Anything, mock.Anything).Return(int64(0), nil) // Simulate no running execution
id, err = r.ctl.Start(context.Background(), &repctlmodel.Policy{Enabled: true, SingleActiveReplication: true}, nil, task.ExecutionTriggerManual)
r.Require().Nil(err)
r.Equal(int64(1), id)
time.Sleep(1 * time.Second) // wait the functions called in the goroutine
r.execMgr.AssertExpectations(r.T())
r.flowCtl.AssertExpectations(r.T())
r.ormCreator.AssertExpectations(r.T())
}
func (r *replicationTestSuite) TestStop() {

View File

@ -47,6 +47,7 @@ type Policy struct {
UpdateTime time.Time `json:"update_time"`
Speed int32 `json:"speed"`
CopyByChunk bool `json:"copy_by_chunk"`
SingleActiveReplication bool `json:"single_active_replication"`
}
// IsScheduledTrigger returns true when the policy is scheduled trigger and enabled
@ -141,6 +142,7 @@ func (p *Policy) From(policy *replicationmodel.Policy) error {
p.UpdateTime = policy.UpdateTime
p.Speed = policy.Speed
p.CopyByChunk = policy.CopyByChunk
p.SingleActiveReplication = policy.SingleActiveReplication
if policy.SrcRegistryID > 0 {
p.SrcRegistry = &model.Registry{
@ -186,6 +188,7 @@ func (p *Policy) To() (*replicationmodel.Policy, error) {
UpdateTime: p.UpdateTime,
Speed: p.Speed,
CopyByChunk: p.CopyByChunk,
SingleActiveReplication: p.SingleActiveReplication,
}
if p.SrcRegistry != nil {
policy.SrcRegistryID = p.SrcRegistry.ID

View File

@ -43,6 +43,7 @@ type Policy struct {
UpdateTime time.Time `orm:"column(update_time);auto_now"`
Speed int32 `orm:"column(speed_kb)"`
CopyByChunk bool `orm:"column(copy_by_chunk)"`
SingleActiveReplication bool `orm:"column(single_active_replication)"`
}
// TableName set table name for ORM

View File

@ -825,6 +825,39 @@
'REPLICATION.ENABLED_RULE' | translate
}}</label>
</div>
<div
class="clr-checkbox-wrapper"
[hidden]="!isNotEventBased()">
<input
type="checkbox"
class="clr-checkbox"
[checked]="true"
id="singleActiveReplication"
formControlName="single_active_replication" />
<label
for="singleActiveReplication"
class="clr-control-label single-active"
>{{
'REPLICATION.SINGLE_ACTIVE_REPLICATION'
| translate
}}
<clr-tooltip class="override-tooltip">
<clr-icon
clrTooltipTrigger
shape="info-circle"
size="24"></clr-icon>
<clr-tooltip-content
clrPosition="top-left"
clrSize="md"
*clrIfOpen>
<span>{{
'TOOLTIP.SINGLE_ACTIVE_REPLICATION'
| translate
}}</span>
</clr-tooltip-content>
</clr-tooltip>
</label>
</div>
</div>
</div>
</form>

View File

@ -246,6 +246,10 @@ clr-modal {
width: 8.6rem;
}
.single-active {
width: 16rem;
}
.des-tooltip {
margin-left: 0.5rem;
}

View File

@ -334,6 +334,7 @@ export class CreateEditRuleComponent implements OnInit, OnDestroy {
override: true,
speed: -1,
copy_by_chunk: false,
single_active_replication: false,
});
}
@ -367,6 +368,7 @@ export class CreateEditRuleComponent implements OnInit, OnDestroy {
dest_namespace_replace_count: Flatten_Level.FLATTEN_LEVEl_1,
speed: -1,
copy_by_chunk: false,
single_active_replication: false,
});
this.isPushMode = true;
this.selectedUnit = BandwidthUnit.KB;
@ -410,6 +412,7 @@ export class CreateEditRuleComponent implements OnInit, OnDestroy {
override: rule.override,
speed: speed,
copy_by_chunk: rule.copy_by_chunk,
single_active_replication: rule.single_active_replication,
});
let filtersArray = this.getFilterArray(rule);
this.noSelectedEndpoint = false;
@ -510,6 +513,9 @@ export class CreateEditRuleComponent implements OnInit, OnDestroy {
}
onSubmit() {
if (this.ruleForm.value.trigger.type === 'event_based') {
this.ruleForm.get('single_active_replication').setValue(false);
}
if (this.ruleForm.value.trigger.type !== 'scheduled') {
this.ruleForm
.get('trigger')

View File

@ -138,7 +138,7 @@
}}</a>
</clr-dg-cell>
<clr-dg-cell>
{{ getStatusStr(j.status) }}
{{ getStatusStr(j.status, j.status_text) }}
<clr-tooltip>
<clr-icon
*ngIf="j.status_text"

View File

@ -644,7 +644,11 @@ export class ReplicationComponent implements OnInit, OnDestroy {
}
}
getStatusStr(status: string): string {
getStatusStr(status: string, status_text: string): string {
// If status is Failed and status_text has 'Execution skipped', it means the replication task is skipped.
if (status === 'Failed' && status_text.startsWith('Execution skipped'))
return 'Skipped';
if (STATUS_MAP && STATUS_MAP[status]) {
return STATUS_MAP[status];
}

View File

@ -76,6 +76,7 @@
"PULL_BASED": "Lade die Ressourcen von der entfernten Registry auf den lokalen Harbor runter.",
"DESTINATION_NAMESPACE": "Spezifizieren des Ziel-Namespace. Wenn das Feld leer ist, werden die Ressourcen unter dem gleichen Namespace abgelegt wie in der Quelle.",
"OVERRIDE": "Spezifizieren, ob die Ressourcen am Ziel überschrieben werden sollen, falls eine Ressource mit gleichem Namen existiert.",
"SINGLE_ACTIVE_REPLICATION": "Specify whether to skip execution until the previous active execution finishes, avoiding the execution of the same replication rules multiple times in parallel.",
"EMAIL": "E-Mail sollte eine gültige E-Mail-Adresse wie name@example.com sein.",
"USER_NAME": "Darf keine Sonderzeichen enthalten und sollte kürzer als 255 Zeichen sein.",
"FULL_NAME": "Maximale Länge soll 20 Zeichen sein.",
@ -578,6 +579,7 @@
"ALLOWED_CHARACTERS": "Erlaubte Sonderzeichen",
"TOTAL": "Gesamt",
"OVERRIDE": "Überschreiben",
"SINGLE_ACTIVE_REPLICATION": "Single active replication",
"ENABLED_RULE": "Aktiviere Regel",
"OVERRIDE_INFO": "Überschreiben",
"OPERATION": "Operation",

View File

@ -76,6 +76,7 @@
"PULL_BASED": "Pull the resources from the remote registry to the local Harbor.",
"DESTINATION_NAMESPACE": "Specify the destination namespace. If empty, the resources will be put under the same namespace as the source.",
"OVERRIDE": "Specify whether to override the resources at the destination if a resource with the same name exists.",
"SINGLE_ACTIVE_REPLICATION": "Specify whether to skip execution until the previous active execution finishes, avoiding the execution of the same replication rules multiple times in parallel.",
"EMAIL": "Email should be a valid email address like name@example.com.",
"USER_NAME": "Cannot contain special characters and maximum length should be 255 characters.",
"FULL_NAME": "Maximum length should be 20 characters.",
@ -578,6 +579,7 @@
"ALLOWED_CHARACTERS": "Allowed special characters",
"TOTAL": "Total",
"OVERRIDE": "Override",
"SINGLE_ACTIVE_REPLICATION": "Single active replication",
"ENABLED_RULE": "Enable rule",
"OVERRIDE_INFO": "Override",
"OPERATION": "Operation",

View File

@ -76,6 +76,7 @@
"PULL_BASED": "Pull de recursos del remote registry al local Harbor.",
"DESTINATION_NAMESPACE": "Especificar el namespace de destino. Si esta vacio, los recursos se colocan en el mismo namespace del recurso.",
"OVERRIDE": "Especifique si desea anular los recursos en el destino si existe un recurso con el mismo nombre.",
"SINGLE_ACTIVE_REPLICATION": "Specify whether to skip execution until the previous active execution finishes, avoiding the execution of the same replication rules multiple times in parallel.",
"EMAIL": "El email debe ser una dirección válida como nombre@ejemplo.com.",
"USER_NAME": "Debe tener una longitud máxima de 255 caracteres y no puede contener caracteres especiales.",
"FULL_NAME": "La longitud máxima debería ser de 20 caracteres.",
@ -578,6 +579,7 @@
"ALLOWED_CHARACTERS": "Caracteres Especiales Permitidos",
"TOTAL": "Total",
"OVERRIDE": "Sobreescribir",
"SINGLE_ACTIVE_REPLICATION": "Single active replication",
"ENABLED_RULE": "Activar regla",
"OVERRIDE_INFO": "Sobreescribir",
"CURRENT": "Actual",

View File

@ -76,6 +76,7 @@
"PULL_BASED": "Pull les ressources du registre distant vers le Harbor local.",
"DESTINATION_NAMESPACE": "Spécifier l'espace de nom de destination. Si vide, les ressources seront placées sous le même espace de nom que la source.",
"OVERRIDE": "Spécifier s'il faut remplacer les ressources dans la destination si une ressource avec le même nom existe.",
"SINGLE_ACTIVE_REPLICATION": "Specify whether to skip execution until the previous active execution finishes, avoiding the execution of the same replication rules multiple times in parallel.",
"EMAIL": "L'e-mail doit être une adresse e-mail valide comme name@example.com.",
"USER_NAME": "Ne peut pas contenir de caractères spéciaux et la longueur maximale est de 255 caractères.",
"FULL_NAME": "La longueur maximale est de 20 caractères.",
@ -578,6 +579,7 @@
"ALLOWED_CHARACTERS": "Caractères spéciaux autorisés",
"TOTAL": "Total",
"OVERRIDE": "Surcharger",
"SINGLE_ACTIVE_REPLICATION": "Single active replication",
"ENABLED_RULE": "Activer la règle",
"OVERRIDE_INFO": "Surcharger",
"OPERATION": "Opération",

View File

@ -76,6 +76,7 @@
"PULL_BASED": "원격 레지스트리의 리소스를 로컬 'Harbor'로 가져옵니다.",
"DESTINATION_NAMESPACE": "대상 네임스페이스를 지정합니다. 비어 있으면 리소스는 소스와 동일한 네임스페이스에 배치됩니다.",
"OVERRIDE": "동일한 이름의 리소스가 있는 경우 대상의 리소스를 재정의할지 여부를 지정합니다.",
"SINGLE_ACTIVE_REPLICATION": "Specify whether to skip execution until the previous active execution finishes, avoiding the execution of the same replication rules multiple times in parallel.",
"EMAIL": "이메일은 name@example.com과 같은 유효한 이메일 주소여야 합니다.",
"USER_NAME": "특수 문자를 포함할 수 없으며 최대 길이는 255자입니다.",
"FULL_NAME": "최대 길이는 20자입니다.",
@ -575,6 +576,7 @@
"ALLOWED_CHARACTERS": "허용되는 특수 문자",
"TOTAL": "총",
"OVERRIDE": "Override",
"SINGLE_ACTIVE_REPLICATION": "Single active replication",
"ENABLED_RULE": "규칙 활성화",
"OVERRIDE_INFO": "Override",
"OPERATION": "작업",

View File

@ -76,6 +76,7 @@
"PULL_BASED": "Trazer recursos do repositório remoto para o Harbor local.",
"DESTINATION_NAMESPACE": "Especificar o namespace de destino. Se vazio, os recursos serão colocados no mesmo namespace que a fonte.",
"OVERRIDE": "Sobrescrever recursos no destino se já existir com o mesmo nome.",
"SINGLE_ACTIVE_REPLICATION": "Specify whether to skip execution until the previous active execution finishes, avoiding the execution of the same replication rules multiple times in parallel.",
"EMAIL": "Deve ser um endereço de e-mail válido como nome@exemplo.com.",
"USER_NAME": "Não pode conter caracteres especiais. Tamanho máximo de 255 caracteres.",
"FULL_NAME": "Tamanho máximo de 20 caracteres.",
@ -576,6 +577,7 @@
"ALLOWED_CHARACTERS": "Símbolos permitidos",
"TOTAL": "Total",
"OVERRIDE": "Sobrescrever",
"SINGLE_ACTIVE_REPLICATION": "Single active replication",
"ENABLED_RULE": "Habiltar regra",
"OVERRIDE_INFO": "Sobrescrever",
"CURRENT": "atual",

View File

@ -76,6 +76,7 @@
"PULL_BASED": "Kaynakları uzak kayıt defterinden yerel Harbora çekin.",
"DESTINATION_NAMESPACE": "Hedef ad alanını belirtin. Boşsa, kaynaklar, kaynak ile aynı ad alanına yerleştirilir.",
"OVERRIDE": "Aynı adı taşıyan bir kaynak varsa, hedefteki kaynakları geçersiz kılmayacağınızı belirtin.",
"SINGLE_ACTIVE_REPLICATION": "Specify whether to skip execution until the previous active execution finishes, avoiding the execution of the same replication rules multiple times in parallel.",
"EMAIL": "E-posta, ad@example.com gibi geçerli bir e-posta adresi olmalıdır.",
"USER_NAME": "Özel karakterler içeremez ve maksimum uzunluk 255 karakter olmalıdır.",
"FULL_NAME": "Maksimum uzunluk 20 karakter olmalıdır.",
@ -579,6 +580,7 @@
"ALLOWED_CHARACTERS": "İzin verilen özel karakterler",
"TOTAL": "Toplam",
"OVERRIDE": "Geçersiz Kıl",
"SINGLE_ACTIVE_REPLICATION": "Single active replication",
"ENABLED_RULE": "Kuralı etkinleştir",
"OVERRIDE_INFO": "Geçersiz Kıl",
"OPERATION": "Operasyon",

View File

@ -76,6 +76,7 @@
"PULL_BASED": "把资源由远端仓库拉取到本地Harbor。",
"DESTINATION_NAMESPACE": "指定目标名称空间。如果不填,资源会被放到和源相同的名称空间下。",
"OVERRIDE": "如果存在具有相同名称的资源,请指定是否覆盖目标上的资源。",
"SINGLE_ACTIVE_REPLICATION": "Specify whether to skip execution until the previous active execution finishes, avoiding the execution of the same replication rules multiple times in parallel.",
"EMAIL": "请使用正确的邮箱地址比如name@example.com。",
"USER_NAME": "不能包含特殊字符且长度不能超过255。",
"FULL_NAME": "长度不能超过20。",
@ -576,6 +577,7 @@
"ALLOWED_CHARACTERS": "允许的特殊字符",
"TOTAL": "总数",
"OVERRIDE": "覆盖",
"SINGLE_ACTIVE_REPLICATION": "Single active replication",
"ENABLED_RULE": "启用规则",
"OVERRIDE_INFO": "覆盖",
"CURRENT": "当前仓库",

View File

@ -113,6 +113,14 @@ func (r *replicationAPI) CreateReplicationPolicy(ctx context.Context, params ope
policy.CopyByChunk = *params.Policy.CopyByChunk
}
if params.Policy.SingleActiveReplication != nil {
// Validate and assign SingleActiveReplication only for non-event_based triggers
if params.Policy.Trigger != nil && params.Policy.Trigger.Type == model.TriggerTypeEventBased && *params.Policy.SingleActiveReplication {
return r.SendError(ctx, fmt.Errorf("single active replication is not allowed for event_based triggers"))
}
policy.SingleActiveReplication = *params.Policy.SingleActiveReplication
}
id, err := r.ctl.CreatePolicy(ctx, policy)
if err != nil {
return r.SendError(ctx, err)
@ -181,6 +189,14 @@ func (r *replicationAPI) UpdateReplicationPolicy(ctx context.Context, params ope
policy.CopyByChunk = *params.Policy.CopyByChunk
}
if params.Policy.SingleActiveReplication != nil {
// Validate and assign SingleActiveReplication only for non-event_based triggers
if params.Policy.Trigger != nil && params.Policy.Trigger.Type == model.TriggerTypeEventBased && *params.Policy.SingleActiveReplication {
return r.SendError(ctx, fmt.Errorf("single active replication is not allowed for event_based triggers"))
}
policy.SingleActiveReplication = *params.Policy.SingleActiveReplication
}
if err := r.ctl.UpdatePolicy(ctx, policy); err != nil {
return r.SendError(ctx, err)
}
@ -446,6 +462,7 @@ func convertReplicationPolicy(policy *repctlmodel.Policy) *models.ReplicationPol
Speed: &policy.Speed,
UpdateTime: strfmt.DateTime(policy.UpdateTime),
CopyByChunk: &policy.CopyByChunk,
SingleActiveReplication: &policy.SingleActiveReplication,
}
if policy.SrcRegistry != nil {
p.SrcRegistry = convertRegistry(policy.SrcRegistry)

View File

@ -151,7 +151,8 @@ replication_policy_payload = {
"deletion": False,
"override": True,
"speed": -1,
"copy_by_chunk": False
"copy_by_chunk": False,
"single_active_replication": False
}
create_replication_policy = Permission("{}/replication/policies".format(harbor_base_url), "POST", 201, replication_policy_payload, "id", id_from_header=True)
list_replication_policy = Permission("{}/replication/policies".format(harbor_base_url), "GET", 200, replication_policy_payload)
@ -204,7 +205,8 @@ if "replication" in resources or "all" == resources:
"deletion": False,
"override": True,
"speed": -1,
"copy_by_chunk": False
"copy_by_chunk": False,
"single_active_replication": False
}
response = requests.post("{}/replication/policies".format(harbor_base_url), data=json.dumps(replication_policy_payload), verify=False, auth=(admin_user_name, admin_password), headers={"Content-Type": "application/json"})
replication_policy_id = int(response.headers["Location"].split("/")[-1])