Implement job context for harbor jobs

initialize DAO
get configurations from admin server
This commit is contained in:
Steven Zou 2018-03-23 17:26:41 +08:00
parent e160804be2
commit 75e29782f6
8 changed files with 135 additions and 18 deletions

View File

@ -27,3 +27,6 @@ logger:
level: "INFO"
archive_period: 1 #days
#Admin server endpoint
admin_server: "http://10.160.178.186:9010/"

View File

@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io/ioutil"
"net/url"
"strconv"
"strings"
@ -27,6 +28,8 @@ const (
jobServiceLoggerBasePath = "JOB_SERVICE_LOGGER_BASE_PATH"
jobServiceLoggerLevel = "JOB_SERVICE_LOGGER_LEVEL"
jobServiceLoggerArchivePeriod = "JOB_SERVICE_LOGGER_ARCHIVE_PERIOD"
jobServiceAdminServerEndpoint = "JOB_SERVICE_ADMIN_SERVER_ENDPOINT"
jobServiceAuthSecret = "JOBSERVICE_SECRET"
//JobServiceProtocolHTTPS points to the 'https' protocol
JobServiceProtocolHTTPS = "https"
@ -48,6 +51,8 @@ type Configuration struct {
//Server listening port
Port uint `yaml:"port"`
AdminServer string `yaml:"admin_server"`
//Additional config when using https
HTTPSConfig *HTTPSConfig `yaml:"https_config,omitempty"`
@ -141,6 +146,16 @@ func GetLogArchivePeriod() uint {
return 1 //return default
}
//GetAuthSecret get the auth secret from the env
func GetAuthSecret() string {
return utils.ReadEnv(jobServiceAuthSecret)
}
//GetAdminServerEndpoint return the admin server endpoint
func GetAdminServerEndpoint() string {
return DefaultConfig.AdminServer
}
//Load env variables
func (c *Configuration) loadEnvs() {
prot := utils.ReadEnv(jobServiceProtocol)
@ -251,6 +266,11 @@ func (c *Configuration) loadEnvs() {
}
}
//admin server
if adminServer := utils.ReadEnv(jobServiceAdminServerEndpoint); !utils.IsEmptyStr(adminServer) {
c.AdminServer = adminServer
}
}
//Check if the configurations are valid settings.
@ -321,5 +341,9 @@ func (c *Configuration) validate() error {
return fmt.Errorf("logger archive period should be greater than 0")
}
if _, err := url.Parse(c.AdminServer); err != nil {
return fmt.Errorf("invalid admin server endpoint: %s", err)
}
return nil //valid
}

View File

@ -26,8 +26,9 @@ type JobContext interface {
//prop string : key of the context property
//
//Returns:
// The data of the specified context property
Get(prop string) interface{}
// The data of the specified context property if have
// bool to indicate if the property existing
Get(prop string) (interface{}, bool)
//SystemContext returns the system context
//
@ -61,3 +62,6 @@ type JobData struct {
Args map[string]interface{}
ExtraData map[string]interface{}
}
//JobContextInitializer is a func to initialize the concrete job context
type JobContextInitializer func(ctx *Context) (JobContext, error)

View File

@ -8,6 +8,10 @@ import (
"fmt"
"reflect"
"github.com/vmware/harbor/src/adminserver/client"
"github.com/vmware/harbor/src/common"
"github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/jobservice_v2/config"
"github.com/vmware/harbor/src/jobservice_v2/env"
"github.com/vmware/harbor/src/jobservice_v2/job"
@ -28,25 +32,58 @@ type Context struct {
//checkin func
checkInFunc job.CheckInFunc
//other required information
properties map[string]interface{}
//admin server client
adminClient client.Client
}
//NewContext ...
func NewContext(sysCtx context.Context) *Context {
func NewContext(sysCtx context.Context, adminClient client.Client) *Context {
return &Context{
sysContext: sysCtx,
sysContext: sysCtx,
adminClient: adminClient,
properties: make(map[string]interface{}),
}
}
//InitDao ...
func (c *Context) InitDao() error {
return nil
//Init ...
func (c *Context) Init() error {
configs, err := c.adminClient.GetCfgs()
if err != nil {
return err
}
db := getDBFromConfig(configs)
return dao.InitDatabase(db)
}
//Build implements the same method in env.JobContext interface
//This func will build the job execution context before running
func (c *Context) Build(dep env.JobData) (env.JobContext, error) {
jContext := &Context{
sysContext: c.sysContext,
sysContext: c.sysContext,
adminClient: c.adminClient,
properties: make(map[string]interface{}),
}
//Copy properties
if len(c.properties) > 0 {
for k, v := range c.properties {
jContext.properties[k] = v
}
}
//Refresh admin server properties
props, err := c.adminClient.GetCfgs()
if err != nil {
return nil, err
}
for k, v := range props {
jContext.properties[k] = v
}
//Init logger here
@ -83,8 +120,9 @@ func (c *Context) Build(dep env.JobData) (env.JobContext, error) {
}
//Get implements the same method in env.JobContext interface
func (c *Context) Get(prop string) interface{} {
return nil
func (c *Context) Get(prop string) (interface{}, bool) {
v, ok := c.properties[prop]
return v, ok
}
//SystemContext implements the same method in env.JobContext interface
@ -116,3 +154,17 @@ func (c *Context) OPCommand() (string, bool) {
func (c *Context) GetLogger() logger.Interface {
return c.logger
}
func getDBFromConfig(cfg map[string]interface{}) *models.Database {
database := &models.Database{}
database.Type = cfg[common.DatabaseType].(string)
mysql := &models.MySQL{}
mysql.Host = cfg[common.MySQLHost].(string)
mysql.Port = int(cfg[common.MySQLPort].(float64))
mysql.Username = cfg[common.MySQLUsername].(string)
mysql.Password = cfg[common.MySQLPassword].(string)
mysql.Database = cfg[common.MySQLDatabase].(string)
database.MySQL = mysql
return database
}

View File

@ -56,6 +56,9 @@ func (rj *ReplicationJob) Run(ctx env.JobContext, params map[string]interface{})
logger.Info("=======Replication job running=======")
logger.Infof("params: %#v\n", params)
logger.Infof("context: %#v\n", ctx)
if v, ok := ctx.Get("email_from"); ok {
fmt.Printf("Get prop form context: email_from=%s\n", v)
}
/*if 1 != 0 {
return errors.New("I suicide")

View File

@ -81,7 +81,7 @@ func (s *Sweeper) clear() {
}
for _, logFile := range logFiles {
if logFile.ModTime().Add(oneDay * time.Second).Before(time.Now()) {
if logFile.ModTime().Add(time.Duration(s.period*oneDay) * time.Second).Before(time.Now()) {
logFilePath := fmt.Sprintf("%s%s%s", s.workDir, string(os.PathSeparator), logFile.Name())
if err := os.Remove(logFilePath); err == nil {
cleared++

View File

@ -1,9 +1,14 @@
package main
import (
"errors"
"flag"
"fmt"
"github.com/vmware/harbor/src/adminserver/client"
"github.com/vmware/harbor/src/jobservice_v2/config"
"github.com/vmware/harbor/src/jobservice_v2/env"
"github.com/vmware/harbor/src/jobservice_v2/job/impl"
"github.com/vmware/harbor/src/jobservice_v2/runtime"
"github.com/vmware/harbor/src/jobservice_v2/utils"
)
@ -20,6 +25,23 @@ func main() {
return
}
//Set job context initializer
runtime.JobService.SetJobContextInitializer(func(ctx *env.Context) (env.JobContext, error) {
secret := config.GetAuthSecret()
if utils.IsEmptyStr(secret) {
return nil, errors.New("empty auth secret")
}
adminClient := client.NewClient(config.GetAdminServerEndpoint(), &client.Config{Secret: secret})
jobCtx := impl.NewContext(ctx.SystemContext, adminClient)
if err := jobCtx.Init(); err != nil {
return nil, err
}
return jobCtx, nil
})
//Start
runtime.JobService.LoadAndRun(*configPath, true)
}

View File

@ -26,7 +26,16 @@ import (
var JobService = &Bootstrap{}
//Bootstrap is coordinating process to help load and start the other components to serve.
type Bootstrap struct{}
type Bootstrap struct {
jobConextInitializer env.JobContextInitializer
}
//SetJobContextInitializer set the job context initializer
func (bs *Bootstrap) SetJobContextInitializer(initializer env.JobContextInitializer) {
if initializer != nil {
bs.jobConextInitializer = initializer
}
}
//LoadAndRun will load configurations, initialize components and then start the related process to serve requests.
//Return error if meet any problems.
@ -48,13 +57,13 @@ func (bs *Bootstrap) LoadAndRun(configFile string, detectEnv bool) {
}
//Build specified job context
//TODO:
jobCtx := impl.NewContext(ctx)
if err := jobCtx.InitDao(); err != nil {
log.Errorf("Failed to build job conetxt with error: %s\n", err)
return
if bs.jobConextInitializer != nil {
if jobCtx, err := bs.jobConextInitializer(rootContext); err == nil {
rootContext.JobContext = jobCtx
} else {
log.Fatalf("Failed to initialize job context: %s\n", err)
}
}
rootContext.JobContext = jobCtx
//Start the pool
var backendPool pool.Interface