mirror of
https://github.com/goharbor/harbor.git
synced 2025-01-26 01:21:22 +01:00
270 lines
8.9 KiB
Go
270 lines
8.9 KiB
Go
package job
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/vmware/harbor/dao"
|
|
"github.com/vmware/harbor/job/config"
|
|
"github.com/vmware/harbor/job/replication"
|
|
"github.com/vmware/harbor/job/utils"
|
|
"github.com/vmware/harbor/models"
|
|
uti "github.com/vmware/harbor/utils"
|
|
"github.com/vmware/harbor/utils/log"
|
|
)
|
|
|
|
// RepJobParm wraps the parm of a job
|
|
type RepJobParm struct {
|
|
LocalRegURL string
|
|
TargetURL string
|
|
TargetUsername string
|
|
TargetPassword string
|
|
Repository string
|
|
Tags []string
|
|
Enabled int
|
|
Operation string
|
|
}
|
|
|
|
// SM is the state machine to handle job, it handles one job at a time.
|
|
type SM struct {
|
|
JobID int64
|
|
CurrentState string
|
|
PreviousState string
|
|
//The states that don't have to exist in transition map, such as "Error", "Canceled"
|
|
ForcedStates map[string]struct{}
|
|
Transitions map[string]map[string]struct{}
|
|
Handlers map[string]StateHandler
|
|
desiredState string
|
|
Logger *log.Logger
|
|
Parms *RepJobParm
|
|
lock *sync.Mutex
|
|
}
|
|
|
|
// EnterState transit the statemachine from the current state to the state in parameter.
|
|
// It returns the next state the statemachine should tranit to.
|
|
func (sm *SM) EnterState(s string) (string, error) {
|
|
log.Debugf("Job id: %d, transiting from State: %s, to State: %s", sm.JobID, sm.CurrentState, s)
|
|
targets, ok := sm.Transitions[sm.CurrentState]
|
|
_, exist := targets[s]
|
|
_, isForced := sm.ForcedStates[s]
|
|
if !exist && !isForced {
|
|
return "", fmt.Errorf("Job id: %d, transition from %s to %s does not exist!", sm.JobID, sm.CurrentState, s)
|
|
}
|
|
exitHandler, ok := sm.Handlers[sm.CurrentState]
|
|
if ok {
|
|
if err := exitHandler.Exit(); err != nil {
|
|
return "", err
|
|
}
|
|
} else {
|
|
log.Debugf("Job id: %d, no handler found for state:%s, skip", sm.JobID, sm.CurrentState)
|
|
}
|
|
enterHandler, ok := sm.Handlers[s]
|
|
var next = models.JobContinue
|
|
var err error
|
|
if ok {
|
|
if next, err = enterHandler.Enter(); err != nil {
|
|
return "", err
|
|
}
|
|
} else {
|
|
log.Debugf("Job id: %d, no handler found for state:%s, skip", sm.JobID, s)
|
|
}
|
|
sm.PreviousState = sm.CurrentState
|
|
sm.CurrentState = s
|
|
log.Debugf("Job id: %d, transition succeeded, current state: %s", sm.JobID, s)
|
|
return next, nil
|
|
}
|
|
|
|
// Start kicks off the statemachine to transit from current state to s, and moves on
|
|
// It will search the transit map if the next state is "_continue", and
|
|
// will enter error state if there's more than one possible path when next state is "_continue"
|
|
func (sm *SM) Start(s string) {
|
|
n, err := sm.EnterState(s)
|
|
log.Debugf("Job id: %d, next state from handler: %s", sm.JobID, n)
|
|
for len(n) > 0 && err == nil {
|
|
if d := sm.getDesiredState(); len(d) > 0 {
|
|
log.Debugf("Job id: %d. Desired state: %s, will ignore the next state from handler", sm.JobID, d)
|
|
n = d
|
|
sm.setDesiredState("")
|
|
continue
|
|
}
|
|
if n == models.JobContinue && len(sm.Transitions[sm.CurrentState]) == 1 {
|
|
for n = range sm.Transitions[sm.CurrentState] {
|
|
break
|
|
}
|
|
log.Debugf("Job id: %d, Continue to state: %s", sm.JobID, n)
|
|
continue
|
|
}
|
|
if n == models.JobContinue && len(sm.Transitions[sm.CurrentState]) != 1 {
|
|
log.Errorf("Job id: %d, next state is continue but there are %d possible next states in transition table", sm.JobID, len(sm.Transitions[sm.CurrentState]))
|
|
err = fmt.Errorf("Unable to continue")
|
|
break
|
|
}
|
|
n, err = sm.EnterState(n)
|
|
log.Debugf("Job id: %d, next state from handler: %s", sm.JobID, n)
|
|
}
|
|
if err != nil {
|
|
log.Warningf("Job id: %d, the statemachin will enter error state due to error: %v", sm.JobID, err)
|
|
sm.EnterState(models.JobError)
|
|
}
|
|
}
|
|
|
|
// AddTransition add a transition to the transition table of state machine, the handler is the handler of target state "to"
|
|
func (sm *SM) AddTransition(from string, to string, h StateHandler) {
|
|
_, ok := sm.Transitions[from]
|
|
if !ok {
|
|
sm.Transitions[from] = make(map[string]struct{})
|
|
}
|
|
sm.Transitions[from][to] = struct{}{}
|
|
sm.Handlers[to] = h
|
|
}
|
|
|
|
// RemoveTransition removes a transition from transition table of the state machine
|
|
func (sm *SM) RemoveTransition(from string, to string) {
|
|
_, ok := sm.Transitions[from]
|
|
if !ok {
|
|
return
|
|
}
|
|
delete(sm.Transitions[from], to)
|
|
}
|
|
|
|
// Stop will set the desired state as "stopped" such that when next tranisition happen the state machine will stop handling the current job
|
|
// and the worker can release itself to the workerpool.
|
|
func (sm *SM) Stop(id int64) {
|
|
log.Debugf("Trying to stop the job: %d", id)
|
|
sm.lock.Lock()
|
|
defer sm.lock.Unlock()
|
|
//need to check if the sm switched to other job
|
|
if id == sm.JobID {
|
|
sm.desiredState = models.JobStopped
|
|
log.Debugf("Desired state of job %d is set to stopped", id)
|
|
} else {
|
|
log.Debugf("State machine has switched to job %d, so the action to stop job %d will be ignored", sm.JobID, id)
|
|
}
|
|
}
|
|
|
|
func (sm *SM) getDesiredState() string {
|
|
sm.lock.Lock()
|
|
defer sm.lock.Unlock()
|
|
return sm.desiredState
|
|
}
|
|
|
|
func (sm *SM) setDesiredState(s string) {
|
|
sm.lock.Lock()
|
|
defer sm.lock.Unlock()
|
|
sm.desiredState = s
|
|
}
|
|
|
|
// Init initialzie the state machine, it will be called once in the lifecycle of state machine.
|
|
func (sm *SM) Init() {
|
|
sm.lock = &sync.Mutex{}
|
|
sm.Handlers = make(map[string]StateHandler)
|
|
sm.Transitions = make(map[string]map[string]struct{})
|
|
sm.ForcedStates = map[string]struct{}{
|
|
models.JobError: struct{}{},
|
|
models.JobStopped: struct{}{},
|
|
models.JobCanceled: struct{}{},
|
|
}
|
|
}
|
|
|
|
// Reset resets the state machine so it will start handling another job.
|
|
func (sm *SM) Reset(jid int64) error {
|
|
//To ensure the new jobID is visible to the thread to stop the SM
|
|
sm.lock.Lock()
|
|
sm.JobID = jid
|
|
sm.desiredState = ""
|
|
sm.lock.Unlock()
|
|
|
|
//init parms
|
|
job, err := dao.GetRepJob(sm.JobID)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to get job, error: %v", err)
|
|
}
|
|
if job == nil {
|
|
return fmt.Errorf("The job doesn't exist in DB, job id: %d", sm.JobID)
|
|
}
|
|
policy, err := dao.GetRepPolicy(job.PolicyID)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to get policy, error: %v", err)
|
|
}
|
|
if policy == nil {
|
|
return fmt.Errorf("The policy doesn't exist in DB, policy id:%d", job.PolicyID)
|
|
}
|
|
sm.Parms = &RepJobParm{
|
|
LocalRegURL: config.LocalHarborURL(),
|
|
Repository: job.Repository,
|
|
Tags: job.TagList,
|
|
Enabled: policy.Enabled,
|
|
Operation: job.Operation,
|
|
}
|
|
if policy.Enabled == 0 {
|
|
//worker will cancel this job
|
|
return nil
|
|
}
|
|
target, err := dao.GetRepTarget(policy.TargetID)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to get target, error: %v", err)
|
|
}
|
|
if target == nil {
|
|
return fmt.Errorf("The target doesn't exist in DB, target id: %d", policy.TargetID)
|
|
}
|
|
sm.Parms.TargetURL = target.URL
|
|
sm.Parms.TargetUsername = target.Username
|
|
pwd := target.Password
|
|
|
|
if len(pwd) != 0 {
|
|
pwd, err = uti.ReversibleDecrypt(pwd)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to decrypt password: %v", err)
|
|
}
|
|
}
|
|
|
|
sm.Parms.TargetPassword = pwd
|
|
|
|
//init states handlers
|
|
sm.Logger = utils.NewLogger(sm.JobID)
|
|
sm.Handlers = make(map[string]StateHandler)
|
|
sm.Transitions = make(map[string]map[string]struct{})
|
|
sm.CurrentState = models.JobPending
|
|
|
|
sm.AddTransition(models.JobPending, models.JobRunning, StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobRunning})
|
|
sm.Handlers[models.JobError] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobError}
|
|
sm.Handlers[models.JobStopped] = StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobStopped}
|
|
|
|
switch sm.Parms.Operation {
|
|
case models.RepOpTransfer:
|
|
err = addImgTransferTransition(sm)
|
|
case models.RepOpDelete:
|
|
err = addImgDeleteTransition(sm)
|
|
default:
|
|
err = fmt.Errorf("unsupported operation: %s", sm.Parms.Operation)
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func addImgTransferTransition(sm *SM) error {
|
|
base, err := replication.InitBaseHandler(sm.Parms.Repository, sm.Parms.LocalRegURL, config.UISecret(),
|
|
sm.Parms.TargetURL, sm.Parms.TargetUsername, sm.Parms.TargetPassword,
|
|
sm.Parms.Tags, sm.Logger)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sm.AddTransition(models.JobRunning, replication.StateCheck, &replication.Checker{BaseHandler: base})
|
|
sm.AddTransition(replication.StateCheck, replication.StatePullManifest, &replication.ManifestPuller{BaseHandler: base})
|
|
sm.AddTransition(replication.StatePullManifest, replication.StateTransferBlob, &replication.BlobTransfer{BaseHandler: base})
|
|
sm.AddTransition(replication.StatePullManifest, models.JobFinished, &StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobFinished})
|
|
sm.AddTransition(replication.StateTransferBlob, replication.StatePushManifest, &replication.ManifestPusher{BaseHandler: base})
|
|
sm.AddTransition(replication.StatePushManifest, replication.StatePullManifest, &replication.ManifestPuller{BaseHandler: base})
|
|
return nil
|
|
}
|
|
|
|
func addImgDeleteTransition(sm *SM) error {
|
|
deleter := replication.NewDeleter(sm.Parms.Repository, sm.Parms.Tags, sm.Parms.TargetURL,
|
|
sm.Parms.TargetUsername, sm.Parms.TargetPassword, sm.Logger)
|
|
|
|
sm.AddTransition(models.JobRunning, replication.StateDelete, deleter)
|
|
sm.AddTransition(replication.StateDelete, models.JobFinished, &StatusUpdater{DummyHandler{JobID: sm.JobID}, models.JobFinished})
|
|
|
|
return nil
|
|
}
|