Merge pull request #14463 from ninjadq/metrics_for_job_service

Enabled Prometheus for Jobservice
This commit is contained in:
Qian Deng 2021-03-30 18:08:28 +08:00 committed by GitHub
commit ee9be8d742
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 131 additions and 7 deletions

View File

@ -44,3 +44,10 @@ job_loggers:
loggers:
- name: "STD_OUTPUT" # Same with above
level: "{{level}}"
{% if metric.enabled %}
metric:
enabled: true
path: {{ metric.path }}
port: {{ metric.port }}
{% endif %}

View File

@ -20,3 +20,8 @@ HTTPS_PROXY={{jobservice_https_proxy}}
NO_PROXY={{jobservice_no_proxy}}
REGISTRY_CREDENTIAL_USERNAME={{registry_username}}
REGISTRY_CREDENTIAL_PASSWORD={{registry_password}}
{% if metric.enabled %}
METRIC_NAMESPACE=harbor
METRIC_SUBSYSTEM=jobservice
{% endif %}

View File

@ -206,6 +206,10 @@ http {
server core:{{ metric.port }};
}
upstream js_metrics {
server jobservice:{{ metric.port }};
}
upstream registry_metrics {
server registry:{{ metric.port }};
}
@ -218,6 +222,7 @@ http {
listen 9090;
location = {{ metric.path }} {
  # Use the configured metric path rather than a hard-coded /metrics so this
  # template routes the same path the metric upstreams are configured with.
  # The "comp" query argument selects the component to scrape; any other
  # request is proxied to the exporter.
  if ($arg_comp = core) { proxy_pass http://core_metrics; }
  if ($arg_comp = jobservice) { proxy_pass http://js_metrics; }
  if ($arg_comp = registry) { proxy_pass http://registry_metrics; }
  proxy_pass http://harbor_exporter;
}

View File

@ -238,6 +238,10 @@ http {
server core:{{ metric.port }};
}
upstream js_metrics {
server jobservice:{{ metric.port }};
}
upstream registry_metrics {
server registry:{{ metric.port }};
}
@ -250,6 +254,7 @@ http {
listen 9090;
location = {{ metric.path }} {
if ($arg_comp = core) { proxy_pass http://core_metrics; }
if ($arg_comp = jobservice) { proxy_pass http://js_metrics; }
if ($arg_comp = registry) { proxy_pass http://registry_metrics; }
proxy_pass http://harbor_exporter;
}

View File

@ -33,4 +33,5 @@ def prepare_job_service(config_dict):
internal_tls=config_dict['internal_tls'],
max_job_workers=config_dict['max_job_workers'],
redis_url=config_dict['redis_url_js'],
level=log_level)
level=log_level,
metric=config_dict['metric'])

View File

@ -78,6 +78,9 @@ type Configuration struct {
// Logger configurations
LoggerConfigs []*LoggerConfig `yaml:"loggers,omitempty"`
// Metric configurations
Metric *MetricConfig `yaml:"metric,omitempty"`
}
// HTTPSConfig keeps additional configurations when using https protocol
@ -104,6 +107,13 @@ type PoolConfig struct {
RedisPoolCfg *RedisPoolConfig `yaml:"redis_pool,omitempty"`
}
// MetricConfig holds the settings used to configure the metrics endpoint.
// It is loaded from the "metric" section of the job service YAML config.
type MetricConfig struct {
// Enabled turns the metrics server on; nothing is served when false.
Enabled bool `yaml:"enabled"`
// Path is the path handed to the metrics server when it is started.
Path string `yaml:"path"`
// Port is the port handed to the metrics server when it is started.
Port int `yaml:"port"`
}
// CustomizedSettings keeps the customized settings of logger
type CustomizedSettings map[string]interface{}

View File

@ -14,6 +14,8 @@
package core
import (
"testing"
"github.com/goharbor/harbor/src/jobservice/common/query"
"github.com/goharbor/harbor/src/jobservice/common/utils"
"github.com/goharbor/harbor/src/jobservice/job"
@ -23,7 +25,6 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"testing"
)
// ControllerTestSuite tests functions of core controller
@ -229,6 +230,10 @@ func (suite *ControllerTestSuite) Start() error {
return suite.worker.Start()
}
// GetPoolID forwards the call to the suite's worker and returns its pool ID.
func (suite *ControllerTestSuite) GetPoolID() string {
	poolID := suite.worker.GetPoolID()
	return poolID
}
func (suite *ControllerTestSuite) RegisterJobs(jobs map[string]interface{}) error {
return suite.worker.RegisterJobs(jobs)
}
@ -295,6 +300,10 @@ func (f *fakeWorker) Start() error {
return f.Called().Error(0)
}
// GetPoolID returns the pool ID configured on the mock for this call.
//
// The index is required: Arguments.String() without an index renders the
// types of all return arguments instead of yielding the mocked string value.
func (f *fakeWorker) GetPoolID() string {
	return f.Called().String(0)
}
func (f *fakeWorker) RegisterJobs(jobs map[string]interface{}) error {
return f.Called(jobs).Error(0)
}

View File

@ -27,6 +27,7 @@ import (
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/jobservice/period"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/metric"
)
const (
@ -56,10 +57,11 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
execContext job.Context
tracker job.Tracker
)
// Track the running job now
jID := j.ID
// used to instrument process time
now := time.Now()
// Check if the job is a periodic one as periodic job has its own ID format
if eID, yes := isPeriodicJobExecution(j); yes {
jID = eID
@ -109,7 +111,8 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
if err != nil {
// log error
logger.Errorf("Job '%s:%s' exit with error: %s", j.Name, j.ID, err)
metric.JobserviceTotalTask.WithLabelValues(j.Name, "fail").Inc()
metric.JobservieTaskProcessTimeSummary.WithLabelValues(j.Name, "fail").Observe(time.Since(now).Seconds())
if er := tracker.Fail(); er != nil {
logger.Errorf("Error occurred when marking the status of job %s:%s to failure: %s", j.Name, j.ID, er)
}
@ -124,11 +127,14 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
} else {
if latest == job.StoppedStatus {
// Logged
metric.JobserviceTotalTask.WithLabelValues(j.Name, "stop").Inc()
metric.JobservieTaskProcessTimeSummary.WithLabelValues(j.Name, "stop").Observe(time.Since(now).Seconds())
logger.Infof("Job %s:%s is stopped", j.Name, j.ID)
return
}
}
metric.JobserviceTotalTask.WithLabelValues(j.Name, "success").Inc()
metric.JobservieTaskProcessTimeSummary.WithLabelValues(j.Name, "success").Observe(time.Since(now).Seconds())
// Mark job status to success.
logger.Infof("Job '%s:%s' exit with success", j.Name, j.ID)
if er := tracker.Succeed(); er != nil {

View File

@ -44,6 +44,7 @@ import (
"github.com/goharbor/harbor/src/jobservice/worker"
"github.com/goharbor/harbor/src/jobservice/worker/cworker"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/metric"
redislib "github.com/goharbor/harbor/src/lib/redis"
"github.com/goharbor/harbor/src/pkg/p2p/preheat"
"github.com/goharbor/harbor/src/pkg/retention"
@ -61,6 +62,9 @@ const (
// JobService is the package-level Bootstrap instance.
var JobService = &Bootstrap{}
// workerPoolID keeps the ID of the redis worker pool. It is assigned in
// loadAndRunRedisWorkerPool and read by LoadAndRun as the pool ID label of
// the jobservice info metric.
var workerPoolID string
// Bootstrap is coordinating process to help load and start the other components to serve.
type Bootstrap struct {
jobContextInitializer job.ContextInitializer
@ -174,6 +178,8 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
// Initialize controller
ctl := core.NewController(backendWorker, manager)
// Initialize Prometheus backend
go bs.createMetricServer(cfg)
// Start the API server
apiServer := bs.createAPIServer(ctx, cfg, ctl)
@ -205,6 +211,7 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
node := ctx.Value(utils.NodeID)
// Blocking here
logger.Infof("API server is serving at %d with [%s] mode at node [%s]", cfg.Port, cfg.Protocol, node)
metric.JobserviceInfo.WithLabelValues(node.(string), workerPoolID, fmt.Sprint(cfg.PoolConfig.WorkerCount)).Set(1)
if er := apiServer.Start(); er != nil {
if !terminated {
// Tell the listening goroutine
@ -221,6 +228,14 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
return
}
// createMetricServer registers the job service collectors and serves the
// Prometheus endpoint at the configured path and port. It does nothing when
// cfg is nil or metrics are not enabled.
//
// LoadAndRun starts this method in its own goroutine.
// NOTE(review): metric.ServeProm is presumably blocking while the server is
// up - confirm against lib/metric.
func (bs *Bootstrap) createMetricServer(cfg *config.Configuration) {
	if cfg == nil || cfg.Metric == nil || !cfg.Metric.Enabled {
		return
	}

	metric.RegisterJobServiceCollectors()
	// Log before serving: if ServeProm blocks, a message placed after it
	// would only be printed once the server has already stopped.
	logger.Infof("Prom backend is serving at %s:%d", cfg.Metric.Path, cfg.Metric.Port)
	metric.ServeProm(cfg.Metric.Path, cfg.Metric.Port)
}
// Load and run the API server.
func (bs *Bootstrap) createAPIServer(ctx context.Context, cfg *config.Configuration, ctl core.Interface) *api.Server {
// Initialized API server
@ -249,6 +264,8 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(
lcmCtl lcm.Controller,
) (worker.Interface, error) {
redisWorker := cworker.NewWorker(ctx, ns, workers, redisPool, lcmCtl)
workerPoolID = redisWorker.GetPoolID()
// Register jobs here
if err := redisWorker.RegisterJobs(
map[string]interface{}{
@ -274,11 +291,9 @@ func (bs *Bootstrap) loadAndRunRedisWorkerPool(
// exit
return nil, err
}
if err := redisWorker.Start(); err != nil {
return nil, err
}
return redisWorker, nil
}

View File

@ -163,6 +163,12 @@ func (w *basicWorker) Start() error {
return nil
}
// GetPoolID returns the worker pool id.
//
// The ID is read via reflection from the "workerPoolID" field of the
// underlying pool. An empty string is returned when the pool is nil or when
// the field does not exist or is not a string.
func (w *basicWorker) GetPoolID() string {
	if w.pool == nil {
		return ""
	}

	// Reflect through the pointer so the pool struct is not copied.
	field := reflect.ValueOf(w.pool).Elem().FieldByName("workerPoolID")
	if !field.IsValid() || field.Kind() != reflect.String {
		return ""
	}

	return field.String()
}
// RegisterJobs is used to register multiple jobs to worker.
func (w *basicWorker) RegisterJobs(jobs map[string]interface{}) error {
if jobs == nil || len(jobs) == 0 {

View File

@ -32,6 +32,12 @@ type Interface interface {
// error if failed to register
RegisterJobs(jobs map[string]interface{}) error
// Get the worker pool ID
//
// Return:
// string : the pool ID
GetPoolID() string
// Enqueue job
//
// jobName string : the name of enqueuing job

View File

@ -0,0 +1,49 @@
package metric
import (
"os"
"github.com/prometheus/client_golang/prometheus"
)
// RegisterJobServiceCollectors registers all job service collectors
// (info gauge, task counter and task process time summary) with Prometheus.
// As a Must* helper it panics if the registration fails.
func RegisterJobServiceCollectors() {
	prometheus.MustRegister(
		JobserviceInfo,
		JobserviceTotalTask,
		JobservieTaskProcessTimeSummary,
	)
}
// Collectors exposed by the job service. The namespace and subsystem of every
// metric are read from the environment (NamespaceEnvKey / SubsystemEnvKey)
// when the package is initialized.
var (
	// JobserviceInfo exposes static information about the job service.
	// Labels: the node, the worker pool ID and the worker count.
	JobserviceInfo = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: os.Getenv(NamespaceEnvKey),
			Subsystem: os.Getenv(SubsystemEnvKey),
			Name:      "info",
			Help:      "the information of jobservice",
		},
		[]string{"node", "pool_id", "workers"},
	)
	// JobserviceTotalTask counts the tasks processed by the job service.
	// Labels: "type" is the job name and "status" is the outcome
	// ("success", "fail" or "stop").
	JobserviceTotalTask = prometheus.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: os.Getenv(NamespaceEnvKey),
			Subsystem: os.Getenv(SubsystemEnvKey),
			Name:      "task_total",
			// This metric counts tasks, not requests.
			Help: "The number of processed tasks",
		},
		[]string{"type", "status"},
	)
	// JobservieTaskProcessTimeSummary records how long tasks take to run, in
	// seconds, using the same "type" and "status" labels as JobserviceTotalTask.
	// NOTE: the identifier is misspelled ("Jobservie"); it is kept unchanged
	// because callers reference it by this name.
	JobservieTaskProcessTimeSummary = prometheus.NewSummaryVec(
		prometheus.SummaryOpts{
			Namespace:  os.Getenv(NamespaceEnvKey),
			Subsystem:  os.Getenv(SubsystemEnvKey),
			Name:       "task_process_time_seconds",
			Help:       "The time duration of the task process time",
			Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
		},
		[]string{"type", "status"})
)