From c26f655bce23c21f390dd2294dd38c476efc9b5a Mon Sep 17 00:00:00 2001 From: Wang Yan Date: Sun, 28 Apr 2019 15:09:56 +0800 Subject: [PATCH] =?UTF-8?q?add=20periodic=20job=20UUID=20to=20upstream=20j?= =?UTF-8?q?ob=20id=20and=20use=20execution=20log=20as=20the=E2=80=A6=20(#7?= =?UTF-8?q?530)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * add periodic job UUID to upstream job id and use execution log as the periodic log Signed-off-by: wang yan * add comments to fix codacy Signed-off-by: wang yan * Update code per comments Signed-off-by: wang yan --- src/common/job/client.go | 33 ++++++++++++++ src/common/job/client_test.go | 8 ++++ src/common/job/models/models.go | 44 ++++++++++--------- src/common/job/test/server.go | 27 +++++++++++- src/core/api/admin_job.go | 16 ++++++- .../service/notifications/admin/handler.go | 15 +++++-- .../operation/scheduler/scheduler_test.go | 4 ++ .../policy/scheduler/scheduler_test.go | 5 +++ 8 files changed, 126 insertions(+), 26 deletions(-) diff --git a/src/common/job/client.go b/src/common/job/client.go index 1070ae735..80ddd16d9 100644 --- a/src/common/job/client.go +++ b/src/common/job/client.go @@ -3,6 +3,7 @@ package job import ( "bytes" "encoding/json" + "fmt" "io/ioutil" "net/http" "strings" @@ -10,6 +11,7 @@ import ( commonhttp "github.com/goharbor/harbor/src/common/http" "github.com/goharbor/harbor/src/common/http/modifier/auth" "github.com/goharbor/harbor/src/common/job/models" + "github.com/goharbor/harbor/src/jobservice/job" ) // Client wraps interface to access jobservice. @@ -17,6 +19,7 @@ type Client interface { SubmitJob(*models.JobData) (string, error) GetJobLog(uuid string) ([]byte, error) PostAction(uuid, action string) error + GetExecutions(uuid string) ([]job.Stats, error) // TODO Redirect joblog when we see there's memory issue. } @@ -103,6 +106,36 @@ func (d *DefaultClient) GetJobLog(uuid string) ([]byte, error) { return data, nil } +// GetExecutions ... +func (d *DefaultClient) GetExecutions(periodicJobID string) ([]job.Stats, error) { + url := fmt.Sprintf("%s/api/v1/jobs/%s/executions", d.endpoint, periodicJobID) + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return nil, err + } + resp, err := d.client.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + if resp.StatusCode != http.StatusOK { + return nil, &commonhttp.Error{ + Code: resp.StatusCode, + Message: string(data), + } + } + var exes []job.Stats + err = json.Unmarshal(data, &exes) + if err != nil { + return nil, err + } + return exes, nil +} + // PostAction call jobservice's API to operate action for job specified by uuid func (d *DefaultClient) PostAction(uuid, action string) error { url := d.endpoint + "/api/v1/jobs/" + uuid diff --git a/src/common/job/client_test.go b/src/common/job/client_test.go index b1837cd5a..8dd208841 100644 --- a/src/common/job/client_test.go +++ b/src/common/job/client_test.go @@ -47,6 +47,14 @@ func TestGetJobLog(t *testing.T) { assert.Contains(text, "The content in this file is for mocking the get log api.") } +func TestGetExecutions(t *testing.T) { + assert := assert.New(t) + exes, err := testClient.GetExecutions(ID) + assert.Nil(err) + stat := exes[0] + assert.Equal(ID+"@123123", stat.Info.JobID) +} + func TestPostAction(t *testing.T) { assert := assert.New(t) err := testClient.PostAction(ID, "fff") diff --git a/src/common/job/models/models.go b/src/common/job/models/models.go index d11ac6826..4cf21c716 100644 --- a/src/common/job/models/models.go +++ b/src/common/job/models/models.go @@ -28,25 +28,28 @@ type JobMetadata struct { // JobStats keeps the result of job launching. type JobStats struct { - Stats *JobStatData `json:"job"` + Stats *StatsInfo `json:"job"` } -// JobStatData keeps the stats of job -type JobStatData struct { - JobID string `json:"id"` - Status string `json:"status"` - JobName string `json:"name"` - JobKind string `json:"kind"` - IsUnique bool `json:"unique"` - RefLink string `json:"ref_link,omitempty"` - CronSpec string `json:"cron_spec,omitempty"` - EnqueueTime int64 `json:"enqueue_time"` - UpdateTime int64 `json:"update_time"` - RunAt int64 `json:"run_at,omitempty"` - CheckIn string `json:"check_in,omitempty"` - CheckInAt int64 `json:"check_in_at,omitempty"` - DieAt int64 `json:"die_at,omitempty"` - HookStatus string `json:"hook_status,omitempty"` +// StatsInfo keeps the stats of job +type StatsInfo struct { + JobID string `json:"id"` + Status string `json:"status"` + JobName string `json:"name"` + JobKind string `json:"kind"` + IsUnique bool `json:"unique"` + RefLink string `json:"ref_link,omitempty"` + CronSpec string `json:"cron_spec,omitempty"` + EnqueueTime int64 `json:"enqueue_time"` + UpdateTime int64 `json:"update_time"` + RunAt int64 `json:"run_at,omitempty"` + CheckIn string `json:"check_in,omitempty"` + CheckInAt int64 `json:"check_in_at,omitempty"` + DieAt int64 `json:"die_at,omitempty"` + WebHookURL string `json:"web_hook_url,omitempty"` + UpstreamJobID string `json:"upstream_job_id,omitempty"` // Ref the upstream job if existing + NumericPID int64 `json:"numeric_policy_id,omitempty"` // The numeric policy ID of the periodic job + Parameters Parameters `json:"parameters,omitempty"` } // JobPoolStats represents the healthy and status of all the running worker pools. @@ -71,9 +74,10 @@ type JobActionRequest struct { // JobStatusChange is designed for reporting the status change via hook. type JobStatusChange struct { - JobID string `json:"job_id"` - Status string `json:"status"` - CheckIn string `json:"check_in,omitempty"` + JobID string `json:"job_id"` + Status string `json:"status"` + CheckIn string `json:"check_in,omitempty"` + Metadata *StatsInfo `json:"metadata,omitempty"` } // Message is designed for sub/pub messages diff --git a/src/common/job/test/server.go b/src/common/job/test/server.go index e103b877c..2f91429df 100644 --- a/src/common/job/test/server.go +++ b/src/common/job/test/server.go @@ -12,6 +12,8 @@ import ( "time" "github.com/goharbor/harbor/src/common/job/models" + "github.com/goharbor/harbor/src/jobservice/job" + job_models "github.com/goharbor/harbor/src/jobservice/job" ) const ( @@ -45,6 +47,29 @@ func NewJobServiceServer() *httptest.Server { panic(err) } }) + mux.HandleFunc(fmt.Sprintf("%s/%s/executions", jobsPrefix, jobUUID), + func(rw http.ResponseWriter, req *http.Request) { + if req.Method != http.MethodGet { + rw.WriteHeader(http.StatusMethodNotAllowed) + return + } + var stats []job.Stats + stat := job_models.Stats{ + Info: &job_models.StatsInfo{ + JobID: jobUUID + "@123123", + Status: "Pending", + RunAt: time.Now().Unix(), + IsUnique: false, + }, + } + stats = append(stats, stat) + b, _ := json.Marshal(stats) + if _, err := rw.Write(b); err != nil { + panic(err) + } + rw.WriteHeader(http.StatusOK) + return + }) mux.HandleFunc(fmt.Sprintf("%s/%s", jobsPrefix, jobUUID), func(rw http.ResponseWriter, req *http.Request) { if req.Method != http.MethodPost { @@ -77,7 +102,7 @@ func NewJobServiceServer() *httptest.Server { json.Unmarshal(data, &jobReq) if jobReq.Job.Name == "replication" { respData := models.JobStats{ - Stats: &models.JobStatData{ + Stats: &models.StatsInfo{ JobID: jobUUID, Status: "Pending", RunAt: time.Now().Unix(), diff --git a/src/core/api/admin_job.go b/src/core/api/admin_job.go index bc7cdbb16..679be6b2f 100644 --- a/src/core/api/admin_job.go +++ b/src/core/api/admin_job.go @@ -169,8 +169,22 @@ func (aj *AJAPI) getLog(id int64) { aj.SendNotFoundError(errors.New("Failed to get Job")) return } + jobID := job.UUID + // to get the latest execution job id, then to query job log. + if job.Kind == common_job.JobKindPeriodic { + exes, err := utils_core.GetJobServiceClient().GetExecutions(job.UUID) + if err != nil { + aj.SendInternalServerError(err) + return + } + if len(exes) == 0 { + aj.SendNotFoundError(errors.New("no execution log ")) + return + } + jobID = exes[0].Info.JobID + } - logBytes, err := utils_core.GetJobServiceClient().GetJobLog(job.UUID) + logBytes, err := utils_core.GetJobServiceClient().GetJobLog(jobID) if err != nil { if httpErr, ok := err.(*common_http.Error); ok { aj.RenderError(httpErr.Code, "") diff --git a/src/core/service/notifications/admin/handler.go b/src/core/service/notifications/admin/handler.go index 310b8926e..326c6b52d 100644 --- a/src/core/service/notifications/admin/handler.go +++ b/src/core/service/notifications/admin/handler.go @@ -38,9 +38,10 @@ var statusMap = map[string]string{ // Handler handles reqeust on /service/notifications/jobs/adminjob/*, which listens to the webhook of jobservice. type Handler struct { api.BaseController - id int64 - UUID string - status string + id int64 + UUID string + status string + UpstreamJobID string } // Prepare ... @@ -60,7 +61,13 @@ func (h *Handler) Prepare() { return } h.id = id - h.UUID = data.JobID + // UpstreamJobID is the periodic job id + if data.Metadata.UpstreamJobID != "" { + h.UUID = data.Metadata.UpstreamJobID + } else { + h.UUID = data.JobID + } + status, ok := statusMap[data.Status] if !ok { log.Infof("drop the job status update event: job id-%d, status-%s", h.id, status) diff --git a/src/replication/operation/scheduler/scheduler_test.go b/src/replication/operation/scheduler/scheduler_test.go index 94a25ca97..0ef398d6d 100644 --- a/src/replication/operation/scheduler/scheduler_test.go +++ b/src/replication/operation/scheduler/scheduler_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/goharbor/harbor/src/common/job/models" + "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/replication/model" ) @@ -24,6 +25,9 @@ func (client TestClient) GetJobLog(uuid string) ([]byte, error) { func (client TestClient) PostAction(uuid, action string) error { return nil } +func (client TestClient) GetExecutions(uuid string) ([]job.Stats, error) { + return nil, nil +} func TestPreprocess(t *testing.T) { items, err := generateData() diff --git a/src/replication/policy/scheduler/scheduler_test.go b/src/replication/policy/scheduler/scheduler_test.go index 9b6c97a1e..27f6c9456 100644 --- a/src/replication/policy/scheduler/scheduler_test.go +++ b/src/replication/policy/scheduler/scheduler_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/goharbor/harbor/src/common/job/models" + "github.com/goharbor/harbor/src/jobservice/job" "github.com/goharbor/harbor/src/replication/config" "github.com/goharbor/harbor/src/replication/dao" rep_models "github.com/goharbor/harbor/src/replication/dao/models" @@ -51,6 +52,10 @@ func (f *fakedJobserviceClient) PostAction(uuid, action string) error { f.stopped = true return nil } +func (f *fakedJobserviceClient) GetExecutions(uuid string) ([]job.Stats, error) { + f.stopped = true + return nil, nil +} type fakedScheduleJobDAO struct { idCounter int64