Refactor redis pool and job stats manager for clarifying functions

also add UT cases for logger and part of opm package
This commit is contained in:
Steven Zou 2018-03-28 11:24:59 +08:00
parent 02b73b50f2
commit 76785c3cb1
10 changed files with 329 additions and 158 deletions

View File

@ -82,8 +82,8 @@ func TestDefaultConfig(t *testing.T) {
t.Errorf("Load config from yaml file, expect nil error but got error '%s'\n", err)
}
if endpoint := GetAdminServerEndpoint(); endpoint != "http://localhost:9010/" {
t.Errorf("expect default admin server endpoint 'http://localhost:9010/' but got '%s'\n", endpoint)
if endpoint := GetAdminServerEndpoint(); endpoint != "http://127.0.0.1:9010/" {
t.Errorf("expect default admin server endpoint 'http://127.0.0.1:9010/' but got '%s'\n", endpoint)
}
if basePath := GetLogBasePath(); basePath != "/tmp/job_logs" {

View File

@ -28,5 +28,5 @@ logger:
archive_period: 1 #days
#Admin server endpoint
admin_server: "http://localhost:9010/"
admin_server: "http://127.0.0.1:9010/"

View File

@ -0,0 +1,75 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package logger
import (
"fmt"
"testing"
)
func TestServiceLogger(t *testing.T) {
testingLogger := &fakeLogger{}
SetLogger(testingLogger)
Debug("DEBUG")
Debugf("%s\n", "DEBUGF")
Info("INFO")
Infof("%s\n", "INFOF")
Warning("WARNING")
Warningf("%s\n", "WARNINGF")
Error("ERROR")
Errorf("%s\n", "ERRORF")
Fatal("FATAL")
Fatalf("%s\n", "FATALF")
}
type fakeLogger struct{}
//For debuging
func (fl *fakeLogger) Debug(v ...interface{}) {
fmt.Println(v...)
}
//For debuging with format
func (fl *fakeLogger) Debugf(format string, v ...interface{}) {
fmt.Printf(format, v...)
}
//For logging info
func (fl *fakeLogger) Info(v ...interface{}) {
fmt.Println(v...)
}
//For logging info with format
func (fl *fakeLogger) Infof(format string, v ...interface{}) {
fmt.Printf(format, v...)
}
//For warning
func (fl *fakeLogger) Warning(v ...interface{}) {
fmt.Println(v...)
}
//For warning with format
func (fl *fakeLogger) Warningf(format string, v ...interface{}) {
fmt.Printf(format, v...)
}
//For logging error
func (fl *fakeLogger) Error(v ...interface{}) {
fmt.Println(v...)
}
//For logging error with format
func (fl *fakeLogger) Errorf(format string, v ...interface{}) {
fmt.Printf(format, v...)
}
//For fatal error
func (fl *fakeLogger) Fatal(v ...interface{}) {
fmt.Println(v...)
}
//For fatal error with error
func (fl *fakeLogger) Fatalf(format string, v ...interface{}) {
fmt.Printf(format, v...)
}

View File

@ -0,0 +1,36 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package logger
import (
"context"
"fmt"
"os"
"testing"
"time"
)
func TestSweeper(t *testing.T) {
workDir := "/tmp/sweeper_logs"
if err := os.MkdirAll(workDir, 0755); err != nil {
t.Error(err)
}
_, err := os.Create(fmt.Sprintf("%s/sweeper_test.log", workDir))
if err != nil {
t.Error(err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sweeper := NewSweeper(ctx, workDir, 1)
sweeper.Start()
<-time.After(100 * time.Millisecond)
if err := os.Remove(fmt.Sprintf("%s/sweeper_test.log", workDir)); err != nil {
t.Error(err)
}
if err := os.Remove(workDir); err != nil {
t.Error(err)
}
}

View File

@ -0,0 +1,42 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package opm
import (
"fmt"
"net/http"
"net/http/httptest"
"testing"
"github.com/vmware/harbor/src/jobservice_v2/models"
)
func TestHookClient(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "ok")
}))
defer ts.Close()
err := DefaultHookClient.ReportStatus(ts.URL, models.JobStatusChange{
JobID: "fake_job_ID",
Status: "running",
})
if err != nil {
t.Error(err)
}
}
func TestReportStatusFailed(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("failed"))
}))
defer ts.Close()
err := DefaultHookClient.ReportStatus(ts.URL, models.JobStatusChange{
JobID: "fake_job_ID",
Status: "running",
})
if err == nil {
t.Error("expect error but got nil")
}
}

View File

@ -0,0 +1,19 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package opm
import "testing"
func TestHookStore(t *testing.T) {
store := NewHookStore()
reportURL := "http://localhost:9090/report"
store.Add("id_1", reportURL)
url, ok := store.Get("id_1")
if !ok || url != reportURL {
t.Errorf("expect hook url '%s' but got '%s'", reportURL, url)
}
u, ok := store.Remove("id_1")
if !ok || u != reportURL {
t.Errorf("expect deleted '%s' but failed to do", reportURL)
}
}

View File

@ -30,29 +30,14 @@ type JobStatsManager interface {
//Async method to retry
SetJobStatus(jobID string, status string)
//Stop the job
//Send command fro the specified job
//
//jobID string : ID of the being stopped job
//jobID string : ID of the being retried job
//command string : the command applied to the job like stop/cancel
//
//Returns:
// error if meet any problems
Stop(jobID string) error
//Cancel the job
//
//jobID string : ID of the being cancelled job
//
//Returns:
// error if meet any problems
Cancel(jobID string) error
//Retry the job
//
//jobID string : ID of the being retried job
//
//Returns:
// error if meet any problems
Retry(jobID string) error
// error if it was not successfully sent
SendCommand(jobID string, command string) error
//CtlCommand checks if control command is fired for the specified job.
//
@ -86,4 +71,12 @@ type JobStatsManager interface {
//Returns:
// error if meet any problems
RegisterHook(jobID string, hookURL string, isCached bool) error
//Mark the periodic job stats expired
//
//jobID string : ID of job
//
//Returns:
// error if meet any problems
ExpirePeriodicJobStats(jobID string) error
}

View File

@ -17,8 +17,6 @@ import (
"github.com/vmware/harbor/src/jobservice_v2/period"
"github.com/robfig/cron"
"github.com/gocraft/work"
"github.com/garyburd/redigo/redis"
@ -43,9 +41,6 @@ const (
//CtlCommandRetry : command retry
CtlCommandRetry = "retry"
//Copy from period.enqueuer
periodicEnqueuerHorizon = 4 * time.Minute
//EventRegisterStatusHook is event name of registering hook
EventRegisterStatusHook = "register_hook"
)
@ -99,6 +94,10 @@ func (rjs *RedisJobStatsManager) Start() {
//Shutdown is implementation of same method in JobStatsManager interface.
func (rjs *RedisJobStatsManager) Shutdown() {
defer func() {
rjs.isRunning = false
}()
if !rjs.isRunning {
return
}
@ -206,102 +205,17 @@ func (rjs *RedisJobStatsManager) loop() {
}
}
//Stop the specified job.
func (rjs *RedisJobStatsManager) Stop(jobID string) error {
//SendCommand for the specified job
func (rjs *RedisJobStatsManager) SendCommand(jobID string, command string) error {
if utils.IsEmptyStr(jobID) {
return errors.New("empty job ID")
}
theJob, err := rjs.getJobStats(jobID)
if err != nil {
return err
if command != CtlCommandStop && command != CtlCommandCancel {
return errors.New("unknown command")
}
switch theJob.Stats.JobKind {
case job.JobKindGeneric:
//nothing need to do
case job.JobKindScheduled:
//we need to delete the scheduled job in the queue if it is not running yet
//otherwise, nothing need to do
if theJob.Stats.Status == job.JobStatusScheduled {
if err := rjs.client.DeleteScheduledJob(theJob.Stats.RunAt, jobID); err != nil {
return err
}
}
case job.JobKindPeriodic:
//firstly delete the periodic job policy
if err := rjs.scheduler.UnSchedule(jobID); err != nil {
return err
}
//secondly we need try to delete the job instances scheduled for this periodic job, a try best action
rjs.deleteScheduledJobsOfPeriodicPolicy(theJob.Stats.JobID, theJob.Stats.CronSpec) //ignore error as we have logged
//thirdly expire the job stats of this periodic job if exists
if err := rjs.expirePeriodicJobStats(theJob.Stats.JobID); err != nil {
//only logged
logger.Errorf("Expire the stats of job %s failed with error: %s\n", theJob.Stats.JobID, err)
}
default:
break
}
//Check if the job has 'running' instance
if theJob.Stats.Status == job.JobStatusRunning {
//Send 'stop' ctl command to the running instance
if err := rjs.writeCtlCommand(jobID, CtlCommandStop); err != nil {
return err
}
}
return nil
}
//Cancel the specified job.
//Async method, not blocking
func (rjs *RedisJobStatsManager) Cancel(jobID string) error {
if utils.IsEmptyStr(jobID) {
return errors.New("empty job ID")
}
theJob, err := rjs.getJobStats(jobID)
if err != nil {
return err
}
switch theJob.Stats.JobKind {
case job.JobKindGeneric:
if theJob.Stats.Status != job.JobStatusRunning {
return fmt.Errorf("only running job can be cancelled, job '%s' seems not running now", theJob.Stats.JobID)
}
//Send 'cancel' ctl command to the running instance
if err := rjs.writeCtlCommand(jobID, CtlCommandCancel); err != nil {
return err
}
break
default:
return fmt.Errorf("job kind '%s' does not support 'cancel' operation", theJob.Stats.JobKind)
}
return nil
}
//Retry the specified job.
//Async method, not blocking
func (rjs *RedisJobStatsManager) Retry(jobID string) error {
if utils.IsEmptyStr(jobID) {
return errors.New("empty job ID")
}
theJob, err := rjs.getJobStats(jobID)
if err != nil {
return err
}
if theJob.Stats.DieAt == 0 {
return fmt.Errorf("job '%s' is not a retryable job", jobID)
}
return rjs.client.RetryDeadJob(theJob.Stats.DieAt, jobID)
return rjs.writeCtlCommand(jobID, command)
}
//CheckIn mesage
@ -363,6 +277,20 @@ func (rjs *RedisJobStatsManager) RegisterHook(jobID string, hookURL string, isCa
return nil
}
//ExpirePeriodicJobStats marks the periodic job stats expired
func (rjs *RedisJobStatsManager) ExpirePeriodicJobStats(jobID string) error {
conn := rjs.redisPool.Get()
defer conn.Close()
//The periodic job (policy) is stopped/unscheduled and then
//the stats of periodic job now can be expired
key := utils.KeyJobStats(rjs.namespace, jobID)
expireTime := 24 * 60 * 60 //1 day
_, err := conn.Do("EXPIRE", key, expireTime)
return err
}
func (rjs *RedisJobStatsManager) submitStatusReportingItem(jobID string, status, checkIn string) {
//Let it run in a separate goroutine to avoid waiting more time
go func() {
@ -402,43 +330,6 @@ func (rjs *RedisJobStatsManager) reportStatus(jobID string, hookURL, status, che
return DefaultHookClient.ReportStatus(hookURL, reportingStatus)
}
func (rjs *RedisJobStatsManager) expirePeriodicJobStats(jobID string) error {
conn := rjs.redisPool.Get()
defer conn.Close()
//The periodic job (policy) is stopped/unscheduled and then
//the stats of periodic job now can be expired
key := utils.KeyJobStats(rjs.namespace, jobID)
expireTime := 24 * 60 * 60 //1 day
_, err := conn.Do("EXPIRE", key, expireTime)
return err
}
func (rjs *RedisJobStatsManager) deleteScheduledJobsOfPeriodicPolicy(policyID string, cronSpec string) error {
schedule, err := cron.Parse(cronSpec)
if err != nil {
logger.Errorf("cron spec '%s' is not valid", cronSpec)
return err
}
now := utils.NowEpochSeconds()
nowTime := time.Unix(now, 0)
horizon := nowTime.Add(periodicEnqueuerHorizon)
//try to delete more
//return the last error if occurred
for t := schedule.Next(nowTime); t.Before(horizon); t = schedule.Next(t) {
epoch := t.Unix()
if err = rjs.client.DeleteScheduledJob(epoch, policyID); err != nil {
//only logged
logger.Warningf("delete scheduled instance for periodic job %s failed with error: %s\n", policyID, err)
}
}
return err
}
func (rjs *RedisJobStatsManager) getCrlCommand(jobID string) (string, error) {
conn := rjs.redisPool.Get()
defer conn.Close()

View File

@ -0,0 +1,6 @@
// Copyright 2018 The Harbor Authors. All rights reserved.
package opm
import "testing"
func TestRetrieveJob(t *testing.T) {}

View File

@ -9,6 +9,7 @@ import (
"github.com/garyburd/redigo/redis"
"github.com/gocraft/work"
"github.com/robfig/cron"
"github.com/vmware/harbor/src/jobservice_v2/env"
"github.com/vmware/harbor/src/jobservice_v2/job"
"github.com/vmware/harbor/src/jobservice_v2/logger"
@ -25,6 +26,9 @@ var (
const (
workerPoolStatusHealthy = "Healthy"
workerPoolStatusDead = "Dead"
//Copy from period.enqueuer
periodicEnqueuerHorizon = 4 * time.Minute
)
//GoCraftWorkPool is the pool implementation based on gocraft/work powered by redis.
@ -352,17 +356,98 @@ func (gcwp *GoCraftWorkPool) Stats() (models.JobPoolStats, error) {
//StopJob will stop the job
func (gcwp *GoCraftWorkPool) StopJob(jobID string) error {
return gcwp.statsManager.Stop(jobID)
if utils.IsEmptyStr(jobID) {
return errors.New("empty job ID")
}
theJob, err := gcwp.statsManager.Retrieve(jobID)
if err != nil {
return err
}
switch theJob.Stats.JobKind {
case job.JobKindGeneric:
//nothing need to do
case job.JobKindScheduled:
//we need to delete the scheduled job in the queue if it is not running yet
//otherwise, nothing need to do
if theJob.Stats.Status == job.JobStatusScheduled {
if err := gcwp.client.DeleteScheduledJob(theJob.Stats.RunAt, jobID); err != nil {
return err
}
}
case job.JobKindPeriodic:
//firstly delete the periodic job policy
if err := gcwp.scheduler.UnSchedule(jobID); err != nil {
return err
}
//secondly we need try to delete the job instances scheduled for this periodic job, a try best action
gcwp.deleteScheduledJobsOfPeriodicPolicy(theJob.Stats.JobID, theJob.Stats.CronSpec) //ignore error as we have logged
//thirdly expire the job stats of this periodic job if exists
if err := gcwp.statsManager.ExpirePeriodicJobStats(theJob.Stats.JobID); err != nil {
//only logged
logger.Errorf("Expire the stats of job %s failed with error: %s\n", theJob.Stats.JobID, err)
}
default:
break
}
//Check if the job has 'running' instance
if theJob.Stats.Status == job.JobStatusRunning {
//Send 'stop' ctl command to the running instance
if err := gcwp.statsManager.SendCommand(jobID, opm.CtlCommandStop); err != nil {
return err
}
}
return nil
}
//CancelJob will cancel the job
func (gcwp *GoCraftWorkPool) CancelJob(jobID string) error {
return gcwp.statsManager.Cancel(jobID)
if utils.IsEmptyStr(jobID) {
return errors.New("empty job ID")
}
theJob, err := gcwp.statsManager.Retrieve(jobID)
if err != nil {
return err
}
switch theJob.Stats.JobKind {
case job.JobKindGeneric:
if theJob.Stats.Status != job.JobStatusRunning {
return fmt.Errorf("only running job can be cancelled, job '%s' seems not running now", theJob.Stats.JobID)
}
//Send 'cancel' ctl command to the running instance
if err := gcwp.statsManager.SendCommand(jobID, opm.CtlCommandCancel); err != nil {
return err
}
break
default:
return fmt.Errorf("job kind '%s' does not support 'cancel' operation", theJob.Stats.JobKind)
}
return nil
}
//RetryJob retry the job
func (gcwp *GoCraftWorkPool) RetryJob(jobID string) error {
return gcwp.statsManager.Retry(jobID)
if utils.IsEmptyStr(jobID) {
return errors.New("empty job ID")
}
theJob, err := gcwp.statsManager.Retrieve(jobID)
if err != nil {
return err
}
if theJob.Stats.DieAt == 0 {
return fmt.Errorf("job '%s' is not a retryable job", jobID)
}
return gcwp.client.RetryDeadJob(theJob.Stats.DieAt, jobID)
}
//IsKnownJob ...
@ -395,6 +480,30 @@ func (gcwp *GoCraftWorkPool) RegisterHook(jobID string, hookURL string) error {
return gcwp.statsManager.RegisterHook(jobID, hookURL, false)
}
func (gcwp *GoCraftWorkPool) deleteScheduledJobsOfPeriodicPolicy(policyID string, cronSpec string) error {
schedule, err := cron.Parse(cronSpec)
if err != nil {
logger.Errorf("cron spec '%s' is not valid", cronSpec)
return err
}
now := utils.NowEpochSeconds()
nowTime := time.Unix(now, 0)
horizon := nowTime.Add(periodicEnqueuerHorizon)
//try to delete more
//return the last error if occurred
for t := schedule.Next(nowTime); t.Before(horizon); t = schedule.Next(t) {
epoch := t.Unix()
if err = gcwp.client.DeleteScheduledJob(epoch, policyID); err != nil {
//only logged
logger.Warningf("delete scheduled instance for periodic job %s failed with error: %s\n", policyID, err)
}
}
return err
}
func (gcwp *GoCraftWorkPool) handleSchedulePolicy(data interface{}) error {
if data == nil {
return errors.New("nil data interface")