add periodic job UUID to upstream job id and use execution log as the… (#7530)

* add periodic job UUID to upstream job id and use execution log as the periodic log

Signed-off-by: wang yan <wangyan@vmware.com>

* add comments to fix codacy

Signed-off-by: wang yan <wangyan@vmware.com>

* Update code per comments

Signed-off-by: wang yan <wangyan@vmware.com>
This commit is contained in:
Wang Yan 2019-04-28 15:09:56 +08:00 committed by GitHub
parent 630fa48ac8
commit c26f655bce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 126 additions and 26 deletions

View File

@ -3,6 +3,7 @@ package job
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"
@ -10,6 +11,7 @@ import (
commonhttp "github.com/goharbor/harbor/src/common/http"
"github.com/goharbor/harbor/src/common/http/modifier/auth"
"github.com/goharbor/harbor/src/common/job/models"
"github.com/goharbor/harbor/src/jobservice/job"
)
// Client wraps interface to access jobservice.
@ -17,6 +19,7 @@ type Client interface {
SubmitJob(*models.JobData) (string, error)
GetJobLog(uuid string) ([]byte, error)
PostAction(uuid, action string) error
GetExecutions(uuid string) ([]job.Stats, error)
// TODO Redirect joblog when we see there's memory issue.
}
@ -103,6 +106,36 @@ func (d *DefaultClient) GetJobLog(uuid string) ([]byte, error) {
return data, nil
}
// GetExecutions ...
func (d *DefaultClient) GetExecutions(periodicJobID string) ([]job.Stats, error) {
url := fmt.Sprintf("%s/api/v1/jobs/%s/executions", d.endpoint, periodicJobID)
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return nil, err
}
resp, err := d.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
data, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, &commonhttp.Error{
Code: resp.StatusCode,
Message: string(data),
}
}
var exes []job.Stats
err = json.Unmarshal(data, &exes)
if err != nil {
return nil, err
}
return exes, nil
}
// PostAction call jobservice's API to operate action for job specified by uuid
func (d *DefaultClient) PostAction(uuid, action string) error {
url := d.endpoint + "/api/v1/jobs/" + uuid

View File

@ -47,6 +47,14 @@ func TestGetJobLog(t *testing.T) {
assert.Contains(text, "The content in this file is for mocking the get log api.")
}
func TestGetExecutions(t *testing.T) {
assert := assert.New(t)
exes, err := testClient.GetExecutions(ID)
assert.Nil(err)
stat := exes[0]
assert.Equal(ID+"@123123", stat.Info.JobID)
}
func TestPostAction(t *testing.T) {
assert := assert.New(t)
err := testClient.PostAction(ID, "fff")

View File

@ -28,25 +28,28 @@ type JobMetadata struct {
// JobStats keeps the result of job launching.
type JobStats struct {
Stats *JobStatData `json:"job"`
Stats *StatsInfo `json:"job"`
}
// JobStatData keeps the stats of job
type JobStatData struct {
JobID string `json:"id"`
Status string `json:"status"`
JobName string `json:"name"`
JobKind string `json:"kind"`
IsUnique bool `json:"unique"`
RefLink string `json:"ref_link,omitempty"`
CronSpec string `json:"cron_spec,omitempty"`
EnqueueTime int64 `json:"enqueue_time"`
UpdateTime int64 `json:"update_time"`
RunAt int64 `json:"run_at,omitempty"`
CheckIn string `json:"check_in,omitempty"`
CheckInAt int64 `json:"check_in_at,omitempty"`
DieAt int64 `json:"die_at,omitempty"`
HookStatus string `json:"hook_status,omitempty"`
// StatsInfo keeps the stats of job
type StatsInfo struct {
JobID string `json:"id"`
Status string `json:"status"`
JobName string `json:"name"`
JobKind string `json:"kind"`
IsUnique bool `json:"unique"`
RefLink string `json:"ref_link,omitempty"`
CronSpec string `json:"cron_spec,omitempty"`
EnqueueTime int64 `json:"enqueue_time"`
UpdateTime int64 `json:"update_time"`
RunAt int64 `json:"run_at,omitempty"`
CheckIn string `json:"check_in,omitempty"`
CheckInAt int64 `json:"check_in_at,omitempty"`
DieAt int64 `json:"die_at,omitempty"`
WebHookURL string `json:"web_hook_url,omitempty"`
UpstreamJobID string `json:"upstream_job_id,omitempty"` // Ref the upstream job if existing
NumericPID int64 `json:"numeric_policy_id,omitempty"` // The numeric policy ID of the periodic job
Parameters Parameters `json:"parameters,omitempty"`
}
// JobPoolStats represents the healthy and status of all the running worker pools.
@ -71,9 +74,10 @@ type JobActionRequest struct {
// JobStatusChange is designed for reporting the status change via hook.
type JobStatusChange struct {
JobID string `json:"job_id"`
Status string `json:"status"`
CheckIn string `json:"check_in,omitempty"`
JobID string `json:"job_id"`
Status string `json:"status"`
CheckIn string `json:"check_in,omitempty"`
Metadata *StatsInfo `json:"metadata,omitempty"`
}
// Message is designed for sub/pub messages

View File

@ -12,6 +12,8 @@ import (
"time"
"github.com/goharbor/harbor/src/common/job/models"
"github.com/goharbor/harbor/src/jobservice/job"
job_models "github.com/goharbor/harbor/src/jobservice/job"
)
const (
@ -45,6 +47,29 @@ func NewJobServiceServer() *httptest.Server {
panic(err)
}
})
mux.HandleFunc(fmt.Sprintf("%s/%s/executions", jobsPrefix, jobUUID),
func(rw http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodGet {
rw.WriteHeader(http.StatusMethodNotAllowed)
return
}
var stats []job.Stats
stat := job_models.Stats{
Info: &job_models.StatsInfo{
JobID: jobUUID + "@123123",
Status: "Pending",
RunAt: time.Now().Unix(),
IsUnique: false,
},
}
stats = append(stats, stat)
b, _ := json.Marshal(stats)
if _, err := rw.Write(b); err != nil {
panic(err)
}
rw.WriteHeader(http.StatusOK)
return
})
mux.HandleFunc(fmt.Sprintf("%s/%s", jobsPrefix, jobUUID),
func(rw http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
@ -77,7 +102,7 @@ func NewJobServiceServer() *httptest.Server {
json.Unmarshal(data, &jobReq)
if jobReq.Job.Name == "replication" {
respData := models.JobStats{
Stats: &models.JobStatData{
Stats: &models.StatsInfo{
JobID: jobUUID,
Status: "Pending",
RunAt: time.Now().Unix(),

View File

@ -169,8 +169,22 @@ func (aj *AJAPI) getLog(id int64) {
aj.SendNotFoundError(errors.New("Failed to get Job"))
return
}
jobID := job.UUID
// to get the latest execution job id, then to query job log.
if job.Kind == common_job.JobKindPeriodic {
exes, err := utils_core.GetJobServiceClient().GetExecutions(job.UUID)
if err != nil {
aj.SendInternalServerError(err)
return
}
if len(exes) == 0 {
aj.SendNotFoundError(errors.New("no execution log "))
return
}
jobID = exes[0].Info.JobID
}
logBytes, err := utils_core.GetJobServiceClient().GetJobLog(job.UUID)
logBytes, err := utils_core.GetJobServiceClient().GetJobLog(jobID)
if err != nil {
if httpErr, ok := err.(*common_http.Error); ok {
aj.RenderError(httpErr.Code, "")

View File

@ -38,9 +38,10 @@ var statusMap = map[string]string{
// Handler handles reqeust on /service/notifications/jobs/adminjob/*, which listens to the webhook of jobservice.
type Handler struct {
api.BaseController
id int64
UUID string
status string
id int64
UUID string
status string
UpstreamJobID string
}
// Prepare ...
@ -60,7 +61,13 @@ func (h *Handler) Prepare() {
return
}
h.id = id
h.UUID = data.JobID
// UpstreamJobID is the periodic job id
if data.Metadata.UpstreamJobID != "" {
h.UUID = data.Metadata.UpstreamJobID
} else {
h.UUID = data.JobID
}
status, ok := statusMap[data.Status]
if !ok {
log.Infof("drop the job status update event: job id-%d, status-%s", h.id, status)

View File

@ -5,6 +5,7 @@ import (
"testing"
"github.com/goharbor/harbor/src/common/job/models"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/replication/model"
)
@ -24,6 +25,9 @@ func (client TestClient) GetJobLog(uuid string) ([]byte, error) {
func (client TestClient) PostAction(uuid, action string) error {
return nil
}
func (client TestClient) GetExecutions(uuid string) ([]job.Stats, error) {
return nil, nil
}
func TestPreprocess(t *testing.T) {
items, err := generateData()

View File

@ -19,6 +19,7 @@ import (
"testing"
"github.com/goharbor/harbor/src/common/job/models"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/replication/config"
"github.com/goharbor/harbor/src/replication/dao"
rep_models "github.com/goharbor/harbor/src/replication/dao/models"
@ -51,6 +52,10 @@ func (f *fakedJobserviceClient) PostAction(uuid, action string) error {
f.stopped = true
return nil
}
func (f *fakedJobserviceClient) GetExecutions(uuid string) ([]job.Stats, error) {
f.stopped = true
return nil, nil
}
type fakedScheduleJobDAO struct {
idCounter int64