2016-04-20 08:24:17 +02:00
package job
import (
"fmt"
2016-05-11 10:05:19 +02:00
"sync"
2016-04-20 08:24:17 +02:00
"github.com/vmware/harbor/dao"
2016-05-13 15:43:17 +02:00
"github.com/vmware/harbor/job/config"
2016-05-18 12:17:40 +02:00
"github.com/vmware/harbor/job/replication"
2016-05-11 10:05:19 +02:00
"github.com/vmware/harbor/job/utils"
2016-05-10 13:38:50 +02:00
"github.com/vmware/harbor/models"
2016-05-27 04:45:21 +02:00
uti "github.com/vmware/harbor/utils"
2016-04-20 08:24:17 +02:00
"github.com/vmware/harbor/utils/log"
)
2016-05-30 08:44:03 +02:00
// RepJobParm wraps the parm of a job
2016-05-10 13:38:50 +02:00
type RepJobParm struct {
LocalRegURL string
TargetURL string
TargetUsername string
TargetPassword string
Repository string
2016-05-27 04:45:21 +02:00
Tags [ ] string
2016-05-10 13:38:50 +02:00
Enabled int
Operation string
2016-04-20 08:24:17 +02:00
}
2016-05-30 08:44:03 +02:00
// SM is the state machine to handle job, it handles one job at a time.
type SM struct {
2016-04-20 08:24:17 +02:00
JobID int64
CurrentState string
PreviousState string
//The states that don't have to exist in transition map, such as "Error", "Canceled"
ForcedStates map [ string ] struct { }
Transitions map [ string ] map [ string ] struct { }
Handlers map [ string ] StateHandler
2016-05-03 09:51:52 +02:00
desiredState string
2016-05-23 13:39:13 +02:00
Logger * log . Logger
2016-05-10 13:38:50 +02:00
Parms * RepJobParm
lock * sync . Mutex
2016-04-20 08:24:17 +02:00
}
2016-05-30 08:44:03 +02:00
// EnterState transit the statemachine from the current state to the state in parameter.
2016-05-03 09:51:52 +02:00
// It returns the next state the statemachine should tranit to.
2016-05-30 08:44:03 +02:00
func ( sm * SM ) EnterState ( s string ) ( string , error ) {
2016-05-19 10:09:44 +02:00
log . Debugf ( "Job id: %d, transiting from State: %s, to State: %s" , sm . JobID , sm . CurrentState , s )
2016-04-20 08:24:17 +02:00
targets , ok := sm . Transitions [ sm . CurrentState ]
_ , exist := targets [ s ]
_ , isForced := sm . ForcedStates [ s ]
if ! exist && ! isForced {
2016-05-19 10:09:44 +02:00
return "" , fmt . Errorf ( "Job id: %d, transition from %s to %s does not exist!" , sm . JobID , sm . CurrentState , s )
2016-04-20 08:24:17 +02:00
}
exitHandler , ok := sm . Handlers [ sm . CurrentState ]
if ok {
if err := exitHandler . Exit ( ) ; err != nil {
2016-05-03 09:51:52 +02:00
return "" , err
2016-04-20 08:24:17 +02:00
}
} else {
2016-05-19 10:09:44 +02:00
log . Debugf ( "Job id: %d, no handler found for state:%s, skip" , sm . JobID , sm . CurrentState )
2016-04-20 08:24:17 +02:00
}
enterHandler , ok := sm . Handlers [ s ]
2016-05-30 08:44:03 +02:00
var next = models . JobContinue
2016-05-03 09:51:52 +02:00
var err error
2016-04-20 08:24:17 +02:00
if ok {
2016-05-03 09:51:52 +02:00
if next , err = enterHandler . Enter ( ) ; err != nil {
return "" , err
2016-04-20 08:24:17 +02:00
}
} else {
2016-05-19 10:09:44 +02:00
log . Debugf ( "Job id: %d, no handler found for state:%s, skip" , sm . JobID , s )
2016-04-20 08:24:17 +02:00
}
sm . PreviousState = sm . CurrentState
sm . CurrentState = s
2016-05-19 10:09:44 +02:00
log . Debugf ( "Job id: %d, transition succeeded, current state: %s" , sm . JobID , s )
2016-05-03 09:51:52 +02:00
return next , nil
}
// Start kicks off the statemachine to transit from current state to s, and moves on
// It will search the transit map if the next state is "_continue", and
// will enter error state if there's more than one possible path when next state is "_continue"
2016-05-30 08:44:03 +02:00
func ( sm * SM ) Start ( s string ) {
2016-05-03 09:51:52 +02:00
n , err := sm . EnterState ( s )
2016-05-19 10:09:44 +02:00
log . Debugf ( "Job id: %d, next state from handler: %s" , sm . JobID , n )
2016-05-03 09:51:52 +02:00
for len ( n ) > 0 && err == nil {
if d := sm . getDesiredState ( ) ; len ( d ) > 0 {
2016-05-19 10:09:44 +02:00
log . Debugf ( "Job id: %d. Desired state: %s, will ignore the next state from handler" , sm . JobID , d )
2016-05-03 09:51:52 +02:00
n = d
sm . setDesiredState ( "" )
continue
}
2016-05-10 13:38:50 +02:00
if n == models . JobContinue && len ( sm . Transitions [ sm . CurrentState ] ) == 1 {
2016-05-03 09:51:52 +02:00
for n = range sm . Transitions [ sm . CurrentState ] {
break
}
2016-05-19 10:09:44 +02:00
log . Debugf ( "Job id: %d, Continue to state: %s" , sm . JobID , n )
2016-05-03 09:51:52 +02:00
continue
}
2016-05-10 13:38:50 +02:00
if n == models . JobContinue && len ( sm . Transitions [ sm . CurrentState ] ) != 1 {
2016-05-19 10:09:44 +02:00
log . Errorf ( "Job id: %d, next state is continue but there are %d possible next states in transition table" , sm . JobID , len ( sm . Transitions [ sm . CurrentState ] ) )
2016-05-03 09:51:52 +02:00
err = fmt . Errorf ( "Unable to continue" )
break
}
n , err = sm . EnterState ( n )
2016-05-19 10:09:44 +02:00
log . Debugf ( "Job id: %d, next state from handler: %s" , sm . JobID , n )
2016-05-03 09:51:52 +02:00
}
if err != nil {
2016-05-19 10:09:44 +02:00
log . Warningf ( "Job id: %d, the statemachin will enter error state due to error: %v" , sm . JobID , err )
2016-05-10 13:38:50 +02:00
sm . EnterState ( models . JobError )
2016-05-03 09:51:52 +02:00
}
2016-04-20 08:24:17 +02:00
}
2016-05-30 08:44:03 +02:00
// AddTransition add a transition to the transition table of state machine, the handler is the handler of target state "to"
func ( sm * SM ) AddTransition ( from string , to string , h StateHandler ) {
2016-04-20 08:24:17 +02:00
_ , ok := sm . Transitions [ from ]
if ! ok {
sm . Transitions [ from ] = make ( map [ string ] struct { } )
}
sm . Transitions [ from ] [ to ] = struct { } { }
sm . Handlers [ to ] = h
}
2016-05-30 08:44:03 +02:00
// RemoveTransition removes a transition from transition table of the state machine
func ( sm * SM ) RemoveTransition ( from string , to string ) {
2016-04-20 08:24:17 +02:00
_ , ok := sm . Transitions [ from ]
if ! ok {
return
}
delete ( sm . Transitions [ from ] , to )
}
2016-05-30 08:44:03 +02:00
// Stop will set the desired state as "stopped" such that when next tranisition happen the state machine will stop handling the current job
// and the worker can release itself to the workerpool.
func ( sm * SM ) Stop ( id int64 ) {
2016-05-19 10:09:44 +02:00
log . Debugf ( "Trying to stop the job: %d" , id )
sm . lock . Lock ( )
defer sm . lock . Unlock ( )
//need to check if the sm switched to other job
if id == sm . JobID {
sm . desiredState = models . JobStopped
log . Debugf ( "Desired state of job %d is set to stopped" , id )
} else {
log . Debugf ( "State machine has switched to job %d, so the action to stop job %d will be ignored" , sm . JobID , id )
}
2016-05-03 09:51:52 +02:00
}
2016-05-30 08:44:03 +02:00
func ( sm * SM ) getDesiredState ( ) string {
2016-05-03 09:51:52 +02:00
sm . lock . Lock ( )
defer sm . lock . Unlock ( )
return sm . desiredState
}
2016-05-30 08:44:03 +02:00
func ( sm * SM ) setDesiredState ( s string ) {
2016-05-03 09:51:52 +02:00
sm . lock . Lock ( )
defer sm . lock . Unlock ( )
sm . desiredState = s
}
2016-05-30 08:44:03 +02:00
// Init initialzie the state machine, it will be called once in the lifecycle of state machine.
func ( sm * SM ) Init ( ) {
2016-05-13 15:43:17 +02:00
sm . lock = & sync . Mutex { }
sm . Handlers = make ( map [ string ] StateHandler )
sm . Transitions = make ( map [ string ] map [ string ] struct { } )
2016-05-19 10:09:44 +02:00
sm . ForcedStates = map [ string ] struct { } {
models . JobError : struct { } { } ,
models . JobStopped : struct { } { } ,
models . JobCanceled : struct { } { } ,
}
2016-05-13 15:43:17 +02:00
}
2016-05-19 10:09:44 +02:00
2016-05-30 08:44:03 +02:00
// Reset resets the state machine so it will start handling another job.
func ( sm * SM ) Reset ( jid int64 ) error {
2016-05-19 10:09:44 +02:00
//To ensure the new jobID is visible to the thread to stop the SM
sm . lock . Lock ( )
2016-05-13 15:43:17 +02:00
sm . JobID = jid
2016-05-19 10:09:44 +02:00
sm . desiredState = ""
sm . lock . Unlock ( )
2016-05-16 07:57:30 +02:00
2016-05-10 13:38:50 +02:00
//init parms
job , err := dao . GetRepJob ( sm . JobID )
if err != nil {
return fmt . Errorf ( "Failed to get job, error: %v" , err )
}
if job == nil {
return fmt . Errorf ( "The job doesn't exist in DB, job id: %d" , sm . JobID )
}
policy , err := dao . GetRepPolicy ( job . PolicyID )
if err != nil {
return fmt . Errorf ( "Failed to get policy, error: %v" , err )
}
if policy == nil {
return fmt . Errorf ( "The policy doesn't exist in DB, policy id:%d" , job . PolicyID )
}
sm . Parms = & RepJobParm {
2016-05-25 09:24:44 +02:00
LocalRegURL : config . LocalHarborURL ( ) ,
2016-05-10 13:38:50 +02:00
Repository : job . Repository ,
2016-05-27 04:45:21 +02:00
Tags : job . TagList ,
2016-05-10 13:38:50 +02:00
Enabled : policy . Enabled ,
Operation : job . Operation ,
}
if policy . Enabled == 0 {
2016-05-19 10:09:44 +02:00
//worker will cancel this job
2016-05-10 13:38:50 +02:00
return nil
}
target , err := dao . GetRepTarget ( policy . TargetID )
if err != nil {
return fmt . Errorf ( "Failed to get target, error: %v" , err )
}
if target == nil {
return fmt . Errorf ( "The target doesn't exist in DB, target id: %d" , policy . TargetID )
}
sm . Parms . TargetURL = target . URL
sm . Parms . TargetUsername = target . Username
2016-05-27 04:45:21 +02:00
pwd := target . Password
if len ( pwd ) != 0 {
pwd , err = uti . ReversibleDecrypt ( pwd )
if err != nil {
return fmt . Errorf ( "failed to decrypt password: %v" , err )
}
}
sm . Parms . TargetPassword = pwd
2016-05-10 13:38:50 +02:00
//init states handlers
2016-05-23 13:39:13 +02:00
sm . Logger = utils . NewLogger ( sm . JobID )
2016-05-27 09:04:20 +02:00
sm . Handlers = make ( map [ string ] StateHandler )
sm . Transitions = make ( map [ string ] map [ string ] struct { } )
2016-05-10 13:38:50 +02:00
sm . CurrentState = models . JobPending
2016-05-19 10:09:44 +02:00
sm . AddTransition ( models . JobPending , models . JobRunning , StatusUpdater { DummyHandler { JobID : sm . JobID } , models . JobRunning } )
sm . Handlers [ models . JobError ] = StatusUpdater { DummyHandler { JobID : sm . JobID } , models . JobError }
sm . Handlers [ models . JobStopped ] = StatusUpdater { DummyHandler { JobID : sm . JobID } , models . JobStopped }
2016-05-26 05:27:16 +02:00
switch sm . Parms . Operation {
case models . RepOpTransfer :
err = addImgTransferTransition ( sm )
case models . RepOpDelete :
err = addImgDeleteTransition ( sm )
default :
err = fmt . Errorf ( "unsupported operation: %s" , sm . Parms . Operation )
2016-05-10 13:38:50 +02:00
}
2016-05-26 05:27:16 +02:00
return err
2016-04-20 08:24:17 +02:00
}
2016-05-11 10:05:19 +02:00
2016-05-30 08:44:03 +02:00
func addImgTransferTransition ( sm * SM ) error {
2016-05-25 09:24:44 +02:00
base , err := replication . InitBaseHandler ( sm . Parms . Repository , sm . Parms . LocalRegURL , config . UISecret ( ) ,
2016-05-11 10:05:19 +02:00
sm . Parms . TargetURL , sm . Parms . TargetUsername , sm . Parms . TargetPassword ,
2016-05-27 04:45:21 +02:00
sm . Parms . Tags , sm . Logger )
2016-05-11 10:05:19 +02:00
if err != nil {
return err
}
2016-05-18 12:17:40 +02:00
sm . AddTransition ( models . JobRunning , replication . StateCheck , & replication . Checker { BaseHandler : base } )
sm . AddTransition ( replication . StateCheck , replication . StatePullManifest , & replication . ManifestPuller { BaseHandler : base } )
sm . AddTransition ( replication . StatePullManifest , replication . StateTransferBlob , & replication . BlobTransfer { BaseHandler : base } )
sm . AddTransition ( replication . StatePullManifest , models . JobFinished , & StatusUpdater { DummyHandler { JobID : sm . JobID } , models . JobFinished } )
sm . AddTransition ( replication . StateTransferBlob , replication . StatePushManifest , & replication . ManifestPusher { BaseHandler : base } )
sm . AddTransition ( replication . StatePushManifest , replication . StatePullManifest , & replication . ManifestPuller { BaseHandler : base } )
2016-05-11 10:05:19 +02:00
return nil
}
2016-05-26 05:27:16 +02:00
2016-05-30 08:44:03 +02:00
func addImgDeleteTransition ( sm * SM ) error {
2016-05-27 04:45:21 +02:00
deleter := replication . NewDeleter ( sm . Parms . Repository , sm . Parms . Tags , sm . Parms . TargetURL ,
2016-05-26 05:27:16 +02:00
sm . Parms . TargetUsername , sm . Parms . TargetPassword , sm . Logger )
sm . AddTransition ( models . JobRunning , replication . StateDelete , deleter )
sm . AddTransition ( replication . StateDelete , models . JobFinished , & StatusUpdater { DummyHandler { JobID : sm . JobID } , models . JobFinished } )
return nil
}