From 75e29782f6318bc65bc589ac61cfaee7e3822734 Mon Sep 17 00:00:00 2001 From: Steven Zou Date: Fri, 23 Mar 2018 17:26:41 +0800 Subject: [PATCH] Implement job context for harbor jobs initialize DAO get configurations from admin server --- src/jobservice_v2/config.yml | 3 + src/jobservice_v2/config/config.go | 24 +++++++ src/jobservice_v2/env/job_context.go | 8 ++- src/jobservice_v2/job/impl/context.go | 68 ++++++++++++++++--- src/jobservice_v2/job/impl/replication_job.go | 3 + src/jobservice_v2/logger/sweeper.go | 2 +- src/jobservice_v2/main.go | 22 ++++++ src/jobservice_v2/runtime/bootstrap.go | 23 +++++-- 8 files changed, 135 insertions(+), 18 deletions(-) diff --git a/src/jobservice_v2/config.yml b/src/jobservice_v2/config.yml index 49df5d4e0..be919b473 100644 --- a/src/jobservice_v2/config.yml +++ b/src/jobservice_v2/config.yml @@ -27,3 +27,6 @@ logger: level: "INFO" archive_period: 1 #days +#Admin server endpoint +admin_server: "http://10.160.178.186:9010/" + diff --git a/src/jobservice_v2/config/config.go b/src/jobservice_v2/config/config.go index 7dd0a498d..0a08c4df9 100644 --- a/src/jobservice_v2/config/config.go +++ b/src/jobservice_v2/config/config.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io/ioutil" + "net/url" "strconv" "strings" @@ -27,6 +28,8 @@ const ( jobServiceLoggerBasePath = "JOB_SERVICE_LOGGER_BASE_PATH" jobServiceLoggerLevel = "JOB_SERVICE_LOGGER_LEVEL" jobServiceLoggerArchivePeriod = "JOB_SERVICE_LOGGER_ARCHIVE_PERIOD" + jobServiceAdminServerEndpoint = "JOB_SERVICE_ADMIN_SERVER_ENDPOINT" + jobServiceAuthSecret = "JOBSERVICE_SECRET" //JobServiceProtocolHTTPS points to the 'https' protocol JobServiceProtocolHTTPS = "https" @@ -48,6 +51,8 @@ type Configuration struct { //Server listening port Port uint `yaml:"port"` + AdminServer string `yaml:"admin_server"` + //Additional config when using https HTTPSConfig *HTTPSConfig `yaml:"https_config,omitempty"` @@ -141,6 +146,16 @@ func GetLogArchivePeriod() uint { return 1 //return default } +//GetAuthSecret get the auth secret from the env +func GetAuthSecret() string { + return utils.ReadEnv(jobServiceAuthSecret) +} + +//GetAdminServerEndpoint return the admin server endpoint +func GetAdminServerEndpoint() string { + return DefaultConfig.AdminServer +} + //Load env variables func (c *Configuration) loadEnvs() { prot := utils.ReadEnv(jobServiceProtocol) @@ -251,6 +266,11 @@ func (c *Configuration) loadEnvs() { } } + //admin server + if adminServer := utils.ReadEnv(jobServiceAdminServerEndpoint); !utils.IsEmptyStr(adminServer) { + c.AdminServer = adminServer + } + } //Check if the configurations are valid settings. @@ -321,5 +341,9 @@ func (c *Configuration) validate() error { return fmt.Errorf("logger archive period should be greater than 0") } + if _, err := url.Parse(c.AdminServer); err != nil { + return fmt.Errorf("invalid admin server endpoint: %s", err) + } + return nil //valid } diff --git a/src/jobservice_v2/env/job_context.go b/src/jobservice_v2/env/job_context.go index 93d4e8e36..d9fc51577 100644 --- a/src/jobservice_v2/env/job_context.go +++ b/src/jobservice_v2/env/job_context.go @@ -26,8 +26,9 @@ type JobContext interface { //prop string : key of the context property // //Returns: - // The data of the specified context property - Get(prop string) interface{} + // The data of the specified context property if have + // bool to indicate if the property existing + Get(prop string) (interface{}, bool) //SystemContext returns the system context // @@ -61,3 +62,6 @@ type JobData struct { Args map[string]interface{} ExtraData map[string]interface{} } + +//JobContextInitializer is a func to initialize the concrete job context +type JobContextInitializer func(ctx *Context) (JobContext, error) diff --git a/src/jobservice_v2/job/impl/context.go b/src/jobservice_v2/job/impl/context.go index e46793afa..8cdb408bd 100644 --- a/src/jobservice_v2/job/impl/context.go +++ b/src/jobservice_v2/job/impl/context.go @@ -8,6 +8,10 @@ import ( "fmt" "reflect" + "github.com/vmware/harbor/src/adminserver/client" + "github.com/vmware/harbor/src/common" + "github.com/vmware/harbor/src/common/dao" + "github.com/vmware/harbor/src/common/models" "github.com/vmware/harbor/src/jobservice_v2/config" "github.com/vmware/harbor/src/jobservice_v2/env" "github.com/vmware/harbor/src/jobservice_v2/job" @@ -28,25 +32,58 @@ type Context struct { //checkin func checkInFunc job.CheckInFunc + + //other required information + properties map[string]interface{} + + //admin server client + adminClient client.Client } //NewContext ... -func NewContext(sysCtx context.Context) *Context { +func NewContext(sysCtx context.Context, adminClient client.Client) *Context { return &Context{ - sysContext: sysCtx, + sysContext: sysCtx, + adminClient: adminClient, + properties: make(map[string]interface{}), } } -//InitDao ... -func (c *Context) InitDao() error { - return nil +//Init ... +func (c *Context) Init() error { + configs, err := c.adminClient.GetCfgs() + if err != nil { + return err + } + + db := getDBFromConfig(configs) + + return dao.InitDatabase(db) } //Build implements the same method in env.JobContext interface //This func will build the job execution context before running func (c *Context) Build(dep env.JobData) (env.JobContext, error) { jContext := &Context{ - sysContext: c.sysContext, + sysContext: c.sysContext, + adminClient: c.adminClient, + properties: make(map[string]interface{}), + } + + //Copy properties + if len(c.properties) > 0 { + for k, v := range c.properties { + jContext.properties[k] = v + } + } + + //Refresh admin server properties + props, err := c.adminClient.GetCfgs() + if err != nil { + return nil, err + } + for k, v := range props { + jContext.properties[k] = v } //Init logger here @@ -83,8 +120,9 @@ func (c *Context) Build(dep env.JobData) (env.JobContext, error) { } //Get implements the same method in env.JobContext interface -func (c *Context) Get(prop string) interface{} { - return nil +func (c *Context) Get(prop string) (interface{}, bool) { + v, ok := c.properties[prop] + return v, ok } //SystemContext implements the same method in env.JobContext interface @@ -116,3 +154,17 @@ func (c *Context) OPCommand() (string, bool) { func (c *Context) GetLogger() logger.Interface { return c.logger } + +func getDBFromConfig(cfg map[string]interface{}) *models.Database { + database := &models.Database{} + database.Type = cfg[common.DatabaseType].(string) + mysql := &models.MySQL{} + mysql.Host = cfg[common.MySQLHost].(string) + mysql.Port = int(cfg[common.MySQLPort].(float64)) + mysql.Username = cfg[common.MySQLUsername].(string) + mysql.Password = cfg[common.MySQLPassword].(string) + mysql.Database = cfg[common.MySQLDatabase].(string) + database.MySQL = mysql + + return database +} diff --git a/src/jobservice_v2/job/impl/replication_job.go b/src/jobservice_v2/job/impl/replication_job.go index f82d30428..24706e9cd 100644 --- a/src/jobservice_v2/job/impl/replication_job.go +++ b/src/jobservice_v2/job/impl/replication_job.go @@ -56,6 +56,9 @@ func (rj *ReplicationJob) Run(ctx env.JobContext, params map[string]interface{}) logger.Info("=======Replication job running=======") logger.Infof("params: %#v\n", params) logger.Infof("context: %#v\n", ctx) + if v, ok := ctx.Get("email_from"); ok { + fmt.Printf("Get prop form context: email_from=%s\n", v) + } /*if 1 != 0 { return errors.New("I suicide") diff --git a/src/jobservice_v2/logger/sweeper.go b/src/jobservice_v2/logger/sweeper.go index a4a9ca0e0..34276eb72 100644 --- a/src/jobservice_v2/logger/sweeper.go +++ b/src/jobservice_v2/logger/sweeper.go @@ -81,7 +81,7 @@ func (s *Sweeper) clear() { } for _, logFile := range logFiles { - if logFile.ModTime().Add(oneDay * time.Second).Before(time.Now()) { + if logFile.ModTime().Add(time.Duration(s.period*oneDay) * time.Second).Before(time.Now()) { logFilePath := fmt.Sprintf("%s%s%s", s.workDir, string(os.PathSeparator), logFile.Name()) if err := os.Remove(logFilePath); err == nil { cleared++ diff --git a/src/jobservice_v2/main.go b/src/jobservice_v2/main.go index 0f3bf6bab..a233a7342 100644 --- a/src/jobservice_v2/main.go +++ b/src/jobservice_v2/main.go @@ -1,9 +1,14 @@ package main import ( + "errors" "flag" "fmt" + "github.com/vmware/harbor/src/adminserver/client" + "github.com/vmware/harbor/src/jobservice_v2/config" + "github.com/vmware/harbor/src/jobservice_v2/env" + "github.com/vmware/harbor/src/jobservice_v2/job/impl" "github.com/vmware/harbor/src/jobservice_v2/runtime" "github.com/vmware/harbor/src/jobservice_v2/utils" ) @@ -20,6 +25,23 @@ func main() { return } + //Set job context initializer + runtime.JobService.SetJobContextInitializer(func(ctx *env.Context) (env.JobContext, error) { + secret := config.GetAuthSecret() + if utils.IsEmptyStr(secret) { + return nil, errors.New("empty auth secret") + } + + adminClient := client.NewClient(config.GetAdminServerEndpoint(), &client.Config{Secret: secret}) + jobCtx := impl.NewContext(ctx.SystemContext, adminClient) + + if err := jobCtx.Init(); err != nil { + return nil, err + } + + return jobCtx, nil + }) + //Start runtime.JobService.LoadAndRun(*configPath, true) } diff --git a/src/jobservice_v2/runtime/bootstrap.go b/src/jobservice_v2/runtime/bootstrap.go index 3e09a5b51..197b0e672 100644 --- a/src/jobservice_v2/runtime/bootstrap.go +++ b/src/jobservice_v2/runtime/bootstrap.go @@ -26,7 +26,16 @@ import ( var JobService = &Bootstrap{} //Bootstrap is coordinating process to help load and start the other components to serve. -type Bootstrap struct{} +type Bootstrap struct { + jobConextInitializer env.JobContextInitializer +} + +//SetJobContextInitializer set the job context initializer +func (bs *Bootstrap) SetJobContextInitializer(initializer env.JobContextInitializer) { + if initializer != nil { + bs.jobConextInitializer = initializer + } +} //LoadAndRun will load configurations, initialize components and then start the related process to serve requests. //Return error if meet any problems. @@ -48,13 +57,13 @@ func (bs *Bootstrap) LoadAndRun(configFile string, detectEnv bool) { } //Build specified job context - //TODO: - jobCtx := impl.NewContext(ctx) - if err := jobCtx.InitDao(); err != nil { - log.Errorf("Failed to build job conetxt with error: %s\n", err) - return + if bs.jobConextInitializer != nil { + if jobCtx, err := bs.jobConextInitializer(rootContext); err == nil { + rootContext.JobContext = jobCtx + } else { + log.Fatalf("Failed to initialize job context: %s\n", err) + } } - rootContext.JobContext = jobCtx //Start the pool var backendPool pool.Interface