Add jobservice task queue related task

Add jobservice metrics and a Redis client.

Signed-off-by: DQ <dengq@vmware.com>

commit 7eebbeebdf (parent ee9be8d742)
@@ -1,12 +1,16 @@
 LOG_LEVEL={{log_level}}
 HARBOR_EXPORTER_PORT=8080
 HARBOR_EXPORTER_METRICS_PATH=/metrics
 HARBOR_EXPORTER_METRICS_ENABLED=true
 HARBOR_EXPORTER_MAX_REQUESTS=30
-HARBOR_EXPORTER_CACHE_TIME=30
+HARBOR_EXPORTER_CACHE_TIME=23
 HARBOR_EXPORTER_CACHE_CLEAN_INTERVAL=14400
 HARBOR_METRIC_NAMESPACE=harbor
 HARBOR_METRIC_SUBSYSTEM=exporter
 HARBOR_SERVICE_HOST=core
+HARBOR_REDIS_URL={{redis_url_js}}
+HARBOR_REDIS_NAMESPACE=harbor_job_service_namespace
+HARBOR_REDIS_TIMEOUT=3600
 {%if internal_tls.enabled %}
 HARBOR_SERVICE_PORT=8443
+HARBOR_SERVICE_SCHEME=https
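These HARBOR_* variables reach the exporter through viper, which the main.go hunk below reads as dotted keys such as redis.url. A minimal sketch of that env-to-key mapping — the prefix/replacer wiring shown here is an assumption, as it is not part of this diff:

```go
package main

import (
	"fmt"
	"os"
	"strings"

	"github.com/spf13/viper"
)

func main() {
	// Assumed wiring: HARBOR_REDIS_URL becomes the viper key "redis.url".
	viper.SetEnvPrefix("harbor")
	viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
	viper.AutomaticEnv()

	os.Setenv("HARBOR_REDIS_URL", "redis://redis:6379") // illustrative value
	os.Setenv("HARBOR_REDIS_TIMEOUT", "3600")

	fmt.Println(viper.GetString("redis.url"))  // redis://redis:6379
	fmt.Println(viper.GetInt("redis.timeout")) // 3600
}
```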
@@ -47,6 +47,12 @@ func main() {
 		},
 	})

+	exporter.InitBackendWorker(&exporter.RedisPoolConfig{
+		URL:               viper.GetString("redis.url"),
+		Namespace:         viper.GetString("redis.namespace"),
+		IdleTimeoutSecond: viper.GetInt("redis.timeout"),
+	})
+
 	exporterOpt := &exporter.Opt{
 		Port:        viper.GetInt("exporter.port"),
 		MetricsPath: viper.GetString("exporter.metrics_path"),
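InitBackendWorker wraps gocraft/work's Client, which the new collector uses to read queue depths. A hedged sketch of querying that client directly — the Redis address and namespace are illustrative assumptions, and a reachable Redis is required:

```go
package main

import (
	"fmt"

	"github.com/gocraft/work"
	"github.com/gomodule/redigo/redis"
)

func main() {
	pool := &redis.Pool{Dial: func() (redis.Conn, error) {
		return redis.Dial("tcp", "localhost:6379") // assumed local Redis
	}}
	cli := work.NewClient("{harbor_job_service_namespace}", pool)

	qs, err := cli.Queues() // one entry per known job type
	if err != nil {
		panic(err)
	}
	for _, q := range qs {
		// Count is the queue depth, Latency the age of the oldest job (seconds).
		fmt.Println(q.JobName, q.Count, q.Latency)
	}
}
```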
@@ -86,11 +86,8 @@ func CacheInit(opt *Opt) {
 			cacheCleanInterval = defaultCacheCleanInterval
 		}
 		ticker := time.NewTicker(time.Duration(cacheCleanInterval) * time.Second)
-		for {
-			select {
-			case <-ticker.C:
-				StartCacheCleaner()
-			}
-		}
+		for range ticker.C {
+			StartCacheCleaner()
+		}
 	}()
 }
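The cache-cleaner change above replaces a single-case select loop with the equivalent, more idiomatic range over the ticker channel. A self-contained illustration of the idiom:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	go func() {
		// Equivalent to `for { select { case <-ticker.C: ... } }`:
		// ranging over the channel blocks until each tick arrives.
		for range ticker.C {
			fmt.Println("cache cleaner tick")
		}
	}()

	time.Sleep(2 * time.Second)
}
```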
@@ -36,6 +36,7 @@ func NewExporter(opt *Opt) *Exporter {
 		exporter.RegisterCollector(healthCollectorName, NewHealthCollect(hbrCli))
 		exporter.RegisterCollector(systemInfoCollectorName, NewSystemInfoCollector(hbrCli))
 		exporter.RegisterCollector(ProjectCollectorName, NewProjectCollector())
+		exporter.RegisterCollector(JobServiceCollectorName, NewJobServiceCollector())
 	}
 	r := prometheus.NewRegistry()
 	r.MustRegister(exporter)
 	exporter.Server = newServer(opt, r)
@@ -53,7 +54,7 @@ type Exporter struct {
 // RegisterCollector registers a collector to the exporter
 func (e *Exporter) RegisterCollector(name string, c prometheus.Collector) error {
 	if _, ok := e.collectors[name]; ok {
-		return errors.New("Collector name is already registered")
+		return errors.New("collector name is already registered")
 	}
 	e.collectors[name] = c
 	log.Infof("collector %s registered ...", name)
@@ -92,3 +93,11 @@ func (e *Exporter) Collect(c chan<- prometheus.Metric) {
 		v.Collect(c)
 	}
 }
+
+func checkErr(err error, arg string) {
+	if err == nil {
+		return
+	}
+
+	log.Errorf("%s: %v", arg, err)
+}
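RegisterCollector, Describe, and Collect together implement a fan-out over a map of named sub-collectors. A minimal sketch of the same pattern in isolation — the aggregate type is illustrative, not Harbor's Exporter:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// aggregate fans Describe/Collect out to named sub-collectors.
type aggregate struct {
	collectors map[string]prometheus.Collector
}

func (a *aggregate) register(name string, c prometheus.Collector) error {
	if _, ok := a.collectors[name]; ok {
		return errors.New("collector name is already registered")
	}
	a.collectors[name] = c
	return nil
}

func (a *aggregate) Describe(ch chan<- *prometheus.Desc) {
	for _, c := range a.collectors {
		c.Describe(ch)
	}
}

func (a *aggregate) Collect(ch chan<- prometheus.Metric) {
	for _, c := range a.collectors {
		c.Collect(ch)
	}
}

func main() {
	a := &aggregate{collectors: map[string]prometheus.Collector{}}
	_ = a.register("go", prometheus.NewGoCollector())

	r := prometheus.NewRegistry()
	r.MustRegister(a) // a satisfies prometheus.Collector itself
	mfs, _ := r.Gather()
	fmt.Println("metric families:", len(mfs))
}
```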
src/pkg/exporter/js_collector.go (new file, 106 lines)
@@ -0,0 +1,106 @@
package exporter

import (
	"github.com/gomodule/redigo/redis"
	"github.com/prometheus/client_golang/prometheus"
)

// JobServiceCollectorName ...
const JobServiceCollectorName = "JobServiceCollector"

var (
	jobServiceTaskQueueSize = typedDesc{
		desc:      newDescWithLables("", "task_queue_size", "Total number of tasks", "type"),
		valueType: prometheus.GaugeValue,
	}
	jobServiceTaskQueueLatency = typedDesc{
		desc:      newDescWithLables("", "task_queue_latency", "the time since the last task was processed", "type"),
		valueType: prometheus.GaugeValue,
	}
	jobServiceConcurrency = typedDesc{
		desc:      newDescWithLables("", "task_concurrency", "Total number of concurrent tasks on a pool", "type", "pool"),
		valueType: prometheus.GaugeValue,
	}
	jobServiceScheduledJobTotal = typedDesc{
		desc:      newDesc("", "task_scheduled_total", "total number of scheduled jobs"),
		valueType: prometheus.GaugeValue,
	}
	jobServiceScheduledJobFails = typedDesc{
		desc:      newDescWithLables("", "task_scheduled_fails", "number of failed scheduled jobs", "type"),
		valueType: prometheus.GaugeValue,
	}
)

// NewJobServiceCollector ...
func NewJobServiceCollector() *JobServiceCollector {
	return &JobServiceCollector{Namespace: namespace}
}

// JobServiceCollector ...
type JobServiceCollector struct {
	Namespace string
}

// Describe implements prometheus.Collector
func (hc *JobServiceCollector) Describe(c chan<- *prometheus.Desc) {
	c <- jobServiceTaskQueueSize.Desc()
	c <- jobServiceTaskQueueLatency.Desc()
	c <- jobServiceConcurrency.Desc()
	c <- jobServiceScheduledJobTotal.Desc()
	c <- jobServiceScheduledJobFails.Desc()
}

// Collect implements prometheus.Collector
func (hc *JobServiceCollector) Collect(c chan<- prometheus.Metric) {
	for _, m := range hc.getJobserviceInfo() {
		c <- m
	}
}

func (hc *JobServiceCollector) getJobserviceInfo() []prometheus.Metric {
	if CacheEnabled() {
		value, ok := CacheGet(JobServiceCollectorName)
		if ok {
			return value.([]prometheus.Metric)
		}
	}
	result := []prometheus.Metric{}

	// Get concurrency info via raw redis client
	rdsConn := GetRedisPool().Get()
	values, err := redis.Values(rdsConn.Do("SMEMBERS", redisKeyKnownJobs(jsNamespace)))
	checkErr(err, "err when get known jobs")
	var jobs []string
	for _, v := range values {
		jobs = append(jobs, string(v.([]byte)))
	}
	for _, job := range jobs {
		values, err := redis.Values(rdsConn.Do("HGETALL", redisKeyJobsLockInfo(jsNamespace, job)))
		checkErr(err, "err when get job lock info")
		for i := 0; i < len(values); i += 2 {
			key, _ := redis.String(values[i], nil)
			value, _ := redis.Float64(values[i+1], nil)
			result = append(result, jobServiceConcurrency.MustNewConstMetric(value, job, key))
		}
	}
	rdsConn.Close()

	// Get info via jobservice client
	cli := GetBackendWorker()
	// Get queue info
	qs, err := cli.Queues()
	checkErr(err, "error when get work task queues info")
	for _, q := range qs {
		result = append(result, jobServiceTaskQueueSize.MustNewConstMetric(float64(q.Count), q.JobName))
		result = append(result, jobServiceTaskQueueLatency.MustNewConstMetric(float64(q.Latency), q.JobName))
	}

	// Get scheduled job info
	_, total, err := cli.ScheduledJobs(0)
	checkErr(err, "error when get scheduled job number")
	result = append(result, jobServiceScheduledJobTotal.MustNewConstMetric(float64(total)))

	if CacheEnabled() {
		CachePut(JobServiceCollectorName, result)
	}
	return result
}
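With HARBOR_METRIC_NAMESPACE=harbor and the empty subsystem argument above, the collector should surface gauges roughly like the sample below. The label values depend on which job types are registered, so this exposition-format snippet is illustrative only:

```
# HELP harbor_task_queue_size Total number of tasks
# TYPE harbor_task_queue_size gauge
harbor_task_queue_size{type="GARBAGE_COLLECTION"} 0
# HELP harbor_task_concurrency Total number of concurrent tasks on a pool
# TYPE harbor_task_concurrency gauge
harbor_task_concurrency{pool="78a1cbe4096b57fef10001a9",type="REPLICATION"} 1
# HELP harbor_task_scheduled_total total number of scheduled jobs
# TYPE harbor_task_scheduled_total gauge
harbor_task_scheduled_total 0
```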
src/pkg/exporter/js_worker.go (new file, 90 lines)
@@ -0,0 +1,90 @@
package exporter

import (
	"fmt"
	"time"

	"github.com/gocraft/work"
	redislib "github.com/goharbor/harbor/src/lib/redis"
	"github.com/gomodule/redigo/redis"
)

const (
	dialConnectionTimeout = 30 * time.Second
	dialReadTimeout       = 10 * time.Second
	dialWriteTimeout      = 10 * time.Second
	pageItemNum           = 20.0
)

var (
	redisPool   *redis.Pool
	jsClient    *work.Client
	jsNamespace string
)

// RedisPoolConfig ...
type RedisPoolConfig struct {
	URL               string
	Namespace         string
	IdleTimeoutSecond int
}

// InitRedisCli ...
func InitRedisCli(rdsCfg *RedisPoolConfig) {
	pool, err := redislib.GetRedisPool("JobService", rdsCfg.URL, &redislib.PoolParam{
		PoolMaxIdle:           6,
		PoolIdleTimeout:       time.Duration(rdsCfg.IdleTimeoutSecond) * time.Second,
		DialConnectionTimeout: dialConnectionTimeout,
		DialReadTimeout:       dialReadTimeout,
		DialWriteTimeout:      dialWriteTimeout,
	})
	if err != nil {
		panic(err)
	}
	redisPool = pool
}

// InitBackendWorker initiates the backend worker client
func InitBackendWorker(redisPoolConfig *RedisPoolConfig) {
	pool, err := redislib.GetRedisPool("JobService", redisPoolConfig.URL, &redislib.PoolParam{
		PoolMaxIdle:           6,
		PoolIdleTimeout:       time.Duration(redisPoolConfig.IdleTimeoutSecond) * time.Second,
		DialConnectionTimeout: dialConnectionTimeout,
		DialReadTimeout:       dialReadTimeout,
		DialWriteTimeout:      dialWriteTimeout,
	})
	if err != nil {
		panic(err)
	}
	redisPool = pool
	jsNamespace = fmt.Sprintf("{%s}", redisPoolConfig.Namespace)
	// Start the backend worker client
	jsClient = work.NewClient(jsNamespace, pool)
}

// GetBackendWorker ...
func GetBackendWorker() *work.Client {
	return jsClient
}

// GetRedisPool ...
func GetRedisPool() *redis.Pool {
	return redisPool
}

func redisNamespacePrefix(namespace string) string {
	l := len(namespace)
	if (l > 0) && (namespace[l-1] != ':') {
		namespace = namespace + ":"
	}
	return namespace
}

func redisKeyJobsLockInfo(namespace, jobName string) string {
	return redisNamespacePrefix(namespace) + "jobs:" + jobName + ":lock_info"
}

func redisKeyKnownJobs(namespace string) string {
	return redisNamespacePrefix(namespace) + "known_jobs"
}
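InitBackendWorker wraps the configured namespace in braces, so the exporter's keys match the jobservice's own ({harbor_job_service_namespace}:…). A small self-contained sketch of the keys the helpers above produce — the job name is purely illustrative:

```go
package main

import "fmt"

func redisNamespacePrefix(ns string) string {
	if l := len(ns); l > 0 && ns[l-1] != ':' {
		ns += ":"
	}
	return ns
}

func main() {
	// With HARBOR_REDIS_NAMESPACE=harbor_job_service_namespace, jsNamespace
	// becomes "{harbor_job_service_namespace}".
	ns := "{harbor_job_service_namespace}"
	fmt.Println(redisNamespacePrefix(ns) + "known_jobs")
	// {harbor_job_service_namespace}:known_jobs
	fmt.Println(redisNamespacePrefix(ns) + "jobs:GARBAGE_COLLECTION:lock_info")
	// {harbor_job_service_namespace}:jobs:GARBAGE_COLLECTION:lock_info
}
```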
@@ -227,11 +227,3 @@ func updateProjectArtifactInfo(projectMap map[int64]*projectInfo) {
 		}
 	}
 }
-
-func checkErr(err error, arg string) {
-	if err == nil {
-		return
-	}
-
-	log.Errorf("%s: %v", arg, err)
-}
@@ -41,7 +41,13 @@ func setupTest(t *testing.T) {

 	// register projAdmin and assign project admin role
 	aliceID, err := dao.Register(alice)
 	if err != nil {
 		t.Errorf("register user error %v", err)
 	}
+	bobID, err := dao.Register(bob)
+	if err != nil {
+		t.Errorf("register user error %v", err)
+	}
 	eveID, err := dao.Register(eve)
 	if err != nil {
 		t.Errorf("register user error %v", err)
@@ -50,6 +56,9 @@ func setupTest(t *testing.T) {
 	// Create Project
 	ctx := orm.NewContext(context.Background(), dao.GetOrmer())
 	proID1, err := proctl.Ctl.Create(ctx, &testPro1)
 	if err != nil {
 		t.Errorf("project creating %v", err)
 	}
+	proID2, err := proctl.Ctl.Create(ctx, &testPro2)
+	if err != nil {
+		t.Errorf("project creating %v", err)
@@ -67,6 +76,9 @@ func setupTest(t *testing.T) {
 	// Add repo to project
 	repo1.ProjectID = testPro1.ProjectID
 	repo1ID, err := repository.Mgr.Create(ctx, &repo1)
 	if err != nil {
 		t.Errorf("add repo error %v", err)
 	}
+	repo1.RepositoryID = repo1ID
+	repo2.ProjectID = testPro2.ProjectID
+	repo2ID, err := repository.Mgr.Create(ctx, &repo2)
@@ -79,6 +91,9 @@ func setupTest(t *testing.T) {
 	art1.RepositoryID = repo1ID
 	art1.PushTime = time.Now()
 	_, err = artifact.Mgr.Create(ctx, &art1)
 	if err != nil {
 		t.Errorf("add repo error %v", err)
 	}
+
+	art2.ProjectID = testPro2.ProjectID
+	art2.RepositoryID = repo2ID
@@ -91,7 +106,13 @@ func setupTest(t *testing.T) {
 	pmIDs = make([]int, 0)
 	alice.UserID, bob.UserID, eve.UserID = int(aliceID), int(bobID), int(eveID)
 	p1m1ID, err := project.AddProjectMember(models.Member{ProjectID: proID1, Role: common.RoleDeveloper, EntityID: int(aliceID), EntityType: common.UserMember})
 	if err != nil {
 		t.Errorf("add project member error %v", err)
 	}
+	p2m1ID, err := project.AddProjectMember(models.Member{ProjectID: proID2, Role: common.RoleMaintainer, EntityID: int(bobID), EntityType: common.UserMember})
+	if err != nil {
+		t.Errorf("add project member error %v", err)
+	}
+	p2m2ID, err := project.AddProjectMember(models.Member{ProjectID: proID2, Role: common.RoleMaintainer, EntityID: int(eveID), EntityType: common.UserMember})
+	if err != nil {
+		t.Errorf("add project member error %v", err)