Refactor redis pool and job stats manager for clarifying functions

also add UT cases for logger and part of opm package

Fix config test case issue
This commit is contained in:
Steven Zou 2018-03-28 11:24:59 +08:00
parent 0918a0ac7e
commit b943bed6e6
10 changed files with 329 additions and 158 deletions

View File

@ -82,8 +82,8 @@ func TestDefaultConfig(t *testing.T) {
t.Errorf("Load config from yaml file, expect nil error but got error '%s'\n", err)
}
if endpoint := GetAdminServerEndpoint(); endpoint != "http://localhost:9010/" {
t.Errorf("expect default admin server endpoint 'http://localhost:9010/' but got '%s'\n", endpoint)
if endpoint := GetAdminServerEndpoint(); endpoint != "http://127.0.0.1:8888/" {
t.Errorf("expect default admin server endpoint 'http://127.0.0.1:8888/' but got '%s'\n", endpoint)
}
if basePath := GetLogBasePath(); basePath != "/tmp/job_logs" {

View File

@ -28,5 +28,5 @@ logger:
archive_period: 1 #days
#Admin server endpoint
admin_server: "http://localhost:9010/"
admin_server: "http://127.0.0.1:8888/"

View File

@ -0,0 +1,75 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package logger
import (
"fmt"
"testing"
)
func TestServiceLogger(t *testing.T) {
testingLogger := &fakeLogger{}
SetLogger(testingLogger)
Debug("DEBUG")
Debugf("%s\n", "DEBUGF")
Info("INFO")
Infof("%s\n", "INFOF")
Warning("WARNING")
Warningf("%s\n", "WARNINGF")
Error("ERROR")
Errorf("%s\n", "ERRORF")
Fatal("FATAL")
Fatalf("%s\n", "FATALF")
}
type fakeLogger struct{}
//For debuging
func (fl *fakeLogger) Debug(v ...interface{}) {
fmt.Println(v...)
}
//For debuging with format
func (fl *fakeLogger) Debugf(format string, v ...interface{}) {
fmt.Printf(format, v...)
}
//For logging info
func (fl *fakeLogger) Info(v ...interface{}) {
fmt.Println(v...)
}
//For logging info with format
func (fl *fakeLogger) Infof(format string, v ...interface{}) {
fmt.Printf(format, v...)
}
//For warning
func (fl *fakeLogger) Warning(v ...interface{}) {
fmt.Println(v...)
}
//For warning with format
func (fl *fakeLogger) Warningf(format string, v ...interface{}) {
fmt.Printf(format, v...)
}
//For logging error
func (fl *fakeLogger) Error(v ...interface{}) {
fmt.Println(v...)
}
//For logging error with format
func (fl *fakeLogger) Errorf(format string, v ...interface{}) {
fmt.Printf(format, v...)
}
//For fatal error
func (fl *fakeLogger) Fatal(v ...interface{}) {
fmt.Println(v...)
}
//For fatal error with error
func (fl *fakeLogger) Fatalf(format string, v ...interface{}) {
fmt.Printf(format, v...)
}

View File

@ -0,0 +1,36 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package logger
import (
"context"
"fmt"
"os"
"testing"
"time"
)
func TestSweeper(t *testing.T) {
workDir := "/tmp/sweeper_logs"
if err := os.MkdirAll(workDir, 0755); err != nil {
t.Error(err)
}
_, err := os.Create(fmt.Sprintf("%s/sweeper_test.log", workDir))
if err != nil {
t.Error(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sweeper := NewSweeper(ctx, workDir, 1)
sweeper.Start()
<-time.After(100 * time.Millisecond)
if err := os.Remove(fmt.Sprintf("%s/sweeper_test.log", workDir)); err != nil {
t.Error(err)
}
if err := os.Remove(workDir); err != nil {
t.Error(err)
}
}

View File

@ -0,0 +1,42 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package opm
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"github.com/vmware/harbor/src/jobservice_v2/models"
)
func TestHookClient(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "ok")
}))
defer ts.Close()
err := DefaultHookClient.ReportStatus(ts.URL, models.JobStatusChange{
JobID: "fake_job_ID",
Status: "running",
})
if err != nil {
t.Error(err)
}
}
func TestReportStatusFailed(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("failed"))
}))
defer ts.Close()
err := DefaultHookClient.ReportStatus(ts.URL, models.JobStatusChange{
JobID: "fake_job_ID",
Status: "running",
})
if err == nil {
t.Error("expect error but got nil")
}
}

View File

@ -0,0 +1,19 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package opm
import "testing"
func TestHookStore(t *testing.T) {
store := NewHookStore()
reportURL := "http://localhost:9090/report"
store.Add("id_1", reportURL)
url, ok := store.Get("id_1")
if !ok || url != reportURL {
t.Errorf("expect hook url '%s' but got '%s'", reportURL, url)
}
u, ok := store.Remove("id_1")
if !ok || u != reportURL {
t.Errorf("expect deleted '%s' but failed to do", reportURL)
}
}

View File

@ -30,29 +30,14 @@ type JobStatsManager interface {
//Async method to retry
SetJobStatus(jobID string, status string)
//Stop the job
//Send command fro the specified job
//
//jobID string : ID of the being stopped job
//jobID string : ID of the being retried job
//command string : the command applied to the job like stop/cancel
//
//Returns:
// error if meet any problems
Stop(jobID string) error
//Cancel the job
//
//jobID string : ID of the being cancelled job
//
//Returns:
// error if meet any problems
Cancel(jobID string) error
//Retry the job
//
//jobID string : ID of the being retried job
//
//Returns:
// error if meet any problems
Retry(jobID string) error
// error if it was not successfully sent
SendCommand(jobID string, command string) error
//CtlCommand checks if control command is fired for the specified job.
//
@ -86,4 +71,12 @@ type JobStatsManager interface {
//Returns:
// error if meet any problems
RegisterHook(jobID string, hookURL string, isCached bool) error
//Mark the periodic job stats expired
//
//jobID string : ID of job
//
//Returns:
// error if meet any problems
ExpirePeriodicJobStats(jobID string) error
}

View File

@ -17,8 +17,6 @@ import (
"github.com/vmware/harbor/src/jobservice_v2/period"
"github.com/robfig/cron"
"github.com/gocraft/work"
"github.com/garyburd/redigo/redis"
@ -43,9 +41,6 @@ const (
//CtlCommandRetry : command retry
CtlCommandRetry = "retry"
//Copy from period.enqueuer
periodicEnqueuerHorizon = 4 * time.Minute
//EventRegisterStatusHook is event name of registering hook
EventRegisterStatusHook = "register_hook"
)
@ -99,6 +94,10 @@ func (rjs *RedisJobStatsManager) Start() {
//Shutdown is implementation of same method in JobStatsManager interface.
func (rjs *RedisJobStatsManager) Shutdown() {
defer func() {
rjs.isRunning = false
}()
if !rjs.isRunning {
return
}
@ -206,102 +205,17 @@ func (rjs *RedisJobStatsManager) loop() {
}
}
//Stop the specified job.
func (rjs *RedisJobStatsManager) Stop(jobID string) error {
//SendCommand for the specified job
func (rjs *RedisJobStatsManager) SendCommand(jobID string, command string) error {
if utils.IsEmptyStr(jobID) {
return errors.New("empty job ID")
}
theJob, err := rjs.getJobStats(jobID)
if err != nil {
return err
if command != CtlCommandStop && command != CtlCommandCancel {
return errors.New("unknown command")
}
switch theJob.Stats.JobKind {
case job.JobKindGeneric:
//nothing need to do
case job.JobKindScheduled:
//we need to delete the scheduled job in the queue if it is not running yet
//otherwise, nothing need to do
if theJob.Stats.Status == job.JobStatusScheduled {
if err := rjs.client.DeleteScheduledJob(theJob.Stats.RunAt, jobID); err != nil {
return err
}
}
case job.JobKindPeriodic:
//firstly delete the periodic job policy
if err := rjs.scheduler.UnSchedule(jobID); err != nil {
return err
}
//secondly we need try to delete the job instances scheduled for this periodic job, a try best action
rjs.deleteScheduledJobsOfPeriodicPolicy(theJob.Stats.JobID, theJob.Stats.CronSpec) //ignore error as we have logged
//thirdly expire the job stats of this periodic job if exists
if err := rjs.expirePeriodicJobStats(theJob.Stats.JobID); err != nil {
//only logged
logger.Errorf("Expire the stats of job %s failed with error: %s\n", theJob.Stats.JobID, err)
}
default:
break
}
//Check if the job has 'running' instance
if theJob.Stats.Status == job.JobStatusRunning {
//Send 'stop' ctl command to the running instance
if err := rjs.writeCtlCommand(jobID, CtlCommandStop); err != nil {
return err
}
}
return nil
}
//Cancel the specified job.
//Async method, not blocking
func (rjs *RedisJobStatsManager) Cancel(jobID string) error {
if utils.IsEmptyStr(jobID) {
return errors.New("empty job ID")
}
theJob, err := rjs.getJobStats(jobID)
if err != nil {
return err
}
switch theJob.Stats.JobKind {
case job.JobKindGeneric:
if theJob.Stats.Status != job.JobStatusRunning {
return fmt.Errorf("only running job can be cancelled, job '%s' seems not running now", theJob.Stats.JobID)
}
//Send 'cancel' ctl command to the running instance
if err := rjs.writeCtlCommand(jobID, CtlCommandCancel); err != nil {
return err
}
break
default:
return fmt.Errorf("job kind '%s' does not support 'cancel' operation", theJob.Stats.JobKind)
}
return nil
}
//Retry the specified job.
//Async method, not blocking
func (rjs *RedisJobStatsManager) Retry(jobID string) error {
if utils.IsEmptyStr(jobID) {
return errors.New("empty job ID")
}
theJob, err := rjs.getJobStats(jobID)
if err != nil {
return err
}
if theJob.Stats.DieAt == 0 {
return fmt.Errorf("job '%s' is not a retryable job", jobID)
}
return rjs.client.RetryDeadJob(theJob.Stats.DieAt, jobID)
return rjs.writeCtlCommand(jobID, command)
}
//CheckIn mesage
@ -363,6 +277,20 @@ func (rjs *RedisJobStatsManager) RegisterHook(jobID string, hookURL string, isCa
return nil
}
//ExpirePeriodicJobStats marks the periodic job stats expired
func (rjs *RedisJobStatsManager) ExpirePeriodicJobStats(jobID string) error {
conn := rjs.redisPool.Get()
defer conn.Close()
//The periodic job (policy) is stopped/unscheduled and then
//the stats of periodic job now can be expired
key := utils.KeyJobStats(rjs.namespace, jobID)
expireTime := 24 * 60 * 60 //1 day
_, err := conn.Do("EXPIRE", key, expireTime)
return err
}
func (rjs *RedisJobStatsManager) submitStatusReportingItem(jobID string, status, checkIn string) {
//Let it run in a separate goroutine to avoid waiting more time
go func() {
@ -402,43 +330,6 @@ func (rjs *RedisJobStatsManager) reportStatus(jobID string, hookURL, status, che
return DefaultHookClient.ReportStatus(hookURL, reportingStatus)
}
func (rjs *RedisJobStatsManager) expirePeriodicJobStats(jobID string) error {
conn := rjs.redisPool.Get()
defer conn.Close()
//The periodic job (policy) is stopped/unscheduled and then
//the stats of periodic job now can be expired
key := utils.KeyJobStats(rjs.namespace, jobID)
expireTime := 24 * 60 * 60 //1 day
_, err := conn.Do("EXPIRE", key, expireTime)
return err
}
func (rjs *RedisJobStatsManager) deleteScheduledJobsOfPeriodicPolicy(policyID string, cronSpec string) error {
schedule, err := cron.Parse(cronSpec)
if err != nil {
logger.Errorf("cron spec '%s' is not valid", cronSpec)
return err
}
now := utils.NowEpochSeconds()
nowTime := time.Unix(now, 0)
horizon := nowTime.Add(periodicEnqueuerHorizon)
//try to delete more
//return the last error if occurred
for t := schedule.Next(nowTime); t.Before(horizon); t = schedule.Next(t) {
epoch := t.Unix()
if err = rjs.client.DeleteScheduledJob(epoch, policyID); err != nil {
//only logged
logger.Warningf("delete scheduled instance for periodic job %s failed with error: %s\n", policyID, err)
}
}
return err
}
func (rjs *RedisJobStatsManager) getCrlCommand(jobID string) (string, error) {
conn := rjs.redisPool.Get()
defer conn.Close()

View File

@ -0,0 +1,6 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package opm
import "testing"
func TestRetrieveJob(t *testing.T) {}

View File

@ -9,6 +9,7 @@ import (
"github.com/garyburd/redigo/redis"
"github.com/gocraft/work"
"github.com/robfig/cron"
"github.com/vmware/harbor/src/jobservice_v2/env"
"github.com/vmware/harbor/src/jobservice_v2/job"
"github.com/vmware/harbor/src/jobservice_v2/logger"
@ -25,6 +26,9 @@ var (
const (
workerPoolStatusHealthy = "Healthy"
workerPoolStatusDead = "Dead"
//Copy from period.enqueuer
periodicEnqueuerHorizon = 4 * time.Minute
)
//GoCraftWorkPool is the pool implementation based on gocraft/work powered by redis.
@ -352,17 +356,98 @@ func (gcwp *GoCraftWorkPool) Stats() (models.JobPoolStats, error) {
//StopJob will stop the job
func (gcwp *GoCraftWorkPool) StopJob(jobID string) error {
return gcwp.statsManager.Stop(jobID)
if utils.IsEmptyStr(jobID) {
return errors.New("empty job ID")
}
theJob, err := gcwp.statsManager.Retrieve(jobID)
if err != nil {
return err
}
switch theJob.Stats.JobKind {
case job.JobKindGeneric:
//nothing need to do
case job.JobKindScheduled:
//we need to delete the scheduled job in the queue if it is not running yet
//otherwise, nothing need to do
if theJob.Stats.Status == job.JobStatusScheduled {
if err := gcwp.client.DeleteScheduledJob(theJob.Stats.RunAt, jobID); err != nil {
return err
}
}
case job.JobKindPeriodic:
//firstly delete the periodic job policy
if err := gcwp.scheduler.UnSchedule(jobID); err != nil {
return err
}
//secondly we need try to delete the job instances scheduled for this periodic job, a try best action
gcwp.deleteScheduledJobsOfPeriodicPolicy(theJob.Stats.JobID, theJob.Stats.CronSpec) //ignore error as we have logged
//thirdly expire the job stats of this periodic job if exists
if err := gcwp.statsManager.ExpirePeriodicJobStats(theJob.Stats.JobID); err != nil {
//only logged
logger.Errorf("Expire the stats of job %s failed with error: %s\n", theJob.Stats.JobID, err)
}
default:
break
}
//Check if the job has 'running' instance
if theJob.Stats.Status == job.JobStatusRunning {
//Send 'stop' ctl command to the running instance
if err := gcwp.statsManager.SendCommand(jobID, opm.CtlCommandStop); err != nil {
return err
}
}
return nil
}
//CancelJob will cancel the job
func (gcwp *GoCraftWorkPool) CancelJob(jobID string) error {
return gcwp.statsManager.Cancel(jobID)
if utils.IsEmptyStr(jobID) {
return errors.New("empty job ID")
}
theJob, err := gcwp.statsManager.Retrieve(jobID)
if err != nil {
return err
}
switch theJob.Stats.JobKind {
case job.JobKindGeneric:
if theJob.Stats.Status != job.JobStatusRunning {
return fmt.Errorf("only running job can be cancelled, job '%s' seems not running now", theJob.Stats.JobID)
}
//Send 'cancel' ctl command to the running instance
if err := gcwp.statsManager.SendCommand(jobID, opm.CtlCommandCancel); err != nil {
return err
}
break
default:
return fmt.Errorf("job kind '%s' does not support 'cancel' operation", theJob.Stats.JobKind)
}
return nil
}
//RetryJob retry the job
func (gcwp *GoCraftWorkPool) RetryJob(jobID string) error {
return gcwp.statsManager.Retry(jobID)
if utils.IsEmptyStr(jobID) {
return errors.New("empty job ID")
}
theJob, err := gcwp.statsManager.Retrieve(jobID)
if err != nil {
return err
}
if theJob.Stats.DieAt == 0 {
return fmt.Errorf("job '%s' is not a retryable job", jobID)
}
return gcwp.client.RetryDeadJob(theJob.Stats.DieAt, jobID)
}
//IsKnownJob ...
@ -395,6 +480,30 @@ func (gcwp *GoCraftWorkPool) RegisterHook(jobID string, hookURL string) error {
return gcwp.statsManager.RegisterHook(jobID, hookURL, false)
}
func (gcwp *GoCraftWorkPool) deleteScheduledJobsOfPeriodicPolicy(policyID string, cronSpec string) error {
schedule, err := cron.Parse(cronSpec)
if err != nil {
logger.Errorf("cron spec '%s' is not valid", cronSpec)
return err
}
now := utils.NowEpochSeconds()
nowTime := time.Unix(now, 0)
horizon := nowTime.Add(periodicEnqueuerHorizon)
//try to delete more
//return the last error if occurred
for t := schedule.Next(nowTime); t.Before(horizon); t = schedule.Next(t) {
epoch := t.Unix()
if err = gcwp.client.DeleteScheduledJob(epoch, policyID); err != nil {
//only logged
logger.Warningf("delete scheduled instance for periodic job %s failed with error: %s\n", policyID, err)
}
}
return err
}
func (gcwp *GoCraftWorkPool) handleSchedulePolicy(data interface{}) error {
if data == nil {
return errors.New("nil data interface")