Refactor trace code

* Use lib trace helper functions
* Add graceful shutdown
* Add comments for newly added exported functions
* Add license on top of newly created files
* Update trace library
* Update configs
* Add attributes and namespace in config

Signed-off-by: Qian Deng <dengq@vmware.com>
This commit is contained in:
Qian Deng 2021-08-27 16:24:14 +00:00
parent 6fec5b2873
commit bad913cf6d
23 changed files with 382 additions and 183 deletions

View File

@ -200,18 +200,29 @@ proxy:
# port: 9090
# path: /metrics
trace:
enabled: true
sample_rate: 1
jaeger:
endpoint: http://localhost
username: http://localhost
password:
agent_host: localhost
agent_port: 6832
otel:
endpoint: http://localhost
url_path: /v1/traces
compression: no
insecure: false
timeout: 10s
# Trace related config
# Only one trace provider (jaeger or otel) can be enabled at the same time,
# and when using jaeger as the provider, it can only be enabled in either agent mode or collector mode.
# If using jaeger collector mode, uncomment endpoint, and uncomment username and password if needed.
# If using jaeger agent mode, uncomment agent_host and agent_port.
# trace:
# enabled: true
# # set sample_rate to 1 to sample 100% of trace data; set it to 0.5 to sample 50% of trace data, and so forth
# sample_rate: 1
# # # namespace used to differentiate different harbor services
# # namespace:
# # # attributes is a key-value dict containing user-defined attributes used to initialize the trace provider
# # attributes:
# # application: harbor
# # jaeger:
# # endpoint: http://hostname:14268/api/traces
# # username:
# # password:
# # agent_host: hostname
# # agent_port: 6832
# # otel:
# # endpoint: hostname:4318
# # url_path: /v1/traces
# # compression: false
# # insecure: true
# # timeout: 10s

View File

@ -151,7 +151,9 @@ class Metric:
class JaegerExporter:
def __init__(self, config: dict):
if not config:
return None
self.enabled = False
return
self.enabled = True
self.endpoint = config.get('endpoint')
self.username = config.get('username')
self.password = config.get('password')
@ -165,7 +167,9 @@ class JaegerExporter:
class OtelExporter:
def __init__(self, config: dict):
if not config:
return None
self.enabled = False
return
self.enabled = True
self.endpoint = config.get('endpoint')
self.url_path = config.get('url_path')
self.compression = config.get('compression') or False
@ -180,15 +184,17 @@ class OtelExporter:
class Trace:
    """Trace settings built from the ``trace`` section of the parsed config.

    Attributes:
        enabled: whether tracing is switched on.
        sample_rate: value of ``sample_rate`` (defaults to 1 when the key is absent).
        namespace: value of ``namespace`` (empty string when unset).
        jaeger: JaegerExporter built from the ``jaeger`` sub-section.
        otel: OtelExporter built from the ``otel`` sub-section.
        attributes: user defined attributes dict (empty dict when unset).
    """

    def __init__(self, config: dict):
        # The ``or`` fallbacks also cover keys that are present but empty/None,
        # which is what a YAML key with no value parses to.
        self.enabled = config.get('enabled') or False
        self.sample_rate = config.get('sample_rate', 1)
        self.namespace = config.get('namespace') or ''
        self.jaeger = JaegerExporter(config.get('jaeger'))
        self.otel = OtelExporter(config.get('otel'))
        self.attributes = config.get('attributes') or {}

    def validate(self):
        """Validate the trace settings.

        Raises:
            Exception: when tracing is enabled but neither the jaeger nor the
                otel exporter is configured. Whatever the exporters' own
                ``validate`` methods raise is propagated unchanged.
        """
        # Nothing to check when tracing is switched off.
        if not self.enabled:
            return
        if not self.jaeger.enabled and not self.otel.enabled:
            raise Exception('Trace enabled but no trace exporter set')
        # self.jaeger / self.otel are already exporter objects; validate them
        # directly instead of wrapping them in a second exporter instance.
        if self.jaeger.enabled:
            self.jaeger.validate()
        if self.otel.enabled:
            self.otel.validate()

View File

@ -67,18 +67,20 @@ METRIC_SUBSYSTEM=core
TRACE_ENABLED=true
TRACE_SERVICE_NAME=harbor-core
TRACE_SAMPLE_RATE={{ trace.sample_rate }}
{% if trace.jaeger is defined %}
TRACE_NAMESPACE={{ trace.namespace }}
TRACE_ATTRIBUTES={{ trace.attributes | to_json | safe }}
{% if trace.jaeger.enabled %}
TRACE_JAEGER_ENDPOINT={{ trace.jaeger.endpoint if trace.jaeger.endpoint else '' }}
TRACE_JAEGER_USERNAME={{ trace.jaeger.username if trace.jaeger.username else '' }}
TRACE_JAEGER_PASSWORD={{ trace.jaeger.password if trace.jaeger.password else '' }}
TRACE_JAEGER_AGENT_HOSTNAME={{ trace.jaeger.agent_host if trace.jaeger.agent_host else '' }}
TRACE_JAEGER_AGENT_PORT={{ trace.jaeger.agent_port if trace.jaeger.agent_port else '' }}
{% endif %}
{%if trace.otel is defined %}
TRACE_OTEL_ENDPOINT={{ trace.otel.endpoint if trace.otel.endpoint else '' }}
{%if trace.otel.enabled %}
TRACE_OTEL_ENDPOINT={{ trace.otel.endpoint }}
TRACE_OTEL_URL_PATH={{ trace.otel.url_path if trace.otel.url_path else '' }}
TRACE_OTEL_COMPRESSION={{ trace.otel.compression if trace.otel.compression else '' }}
TRACE_OTEL_COMPRESSION={{ trace.otel.compression }}
TRACE_OTEL_TIMEOUT={{ trace.otel.timeout }}
TRACE_OTEL_INSECURE={{ trace.otel.insecure if trace.otel.insecure else '' }}
TRACE_OTEL_INSECURE={{ trace.otel.insecure }}
{% endif %}
{% endif %}

View File

@ -30,18 +30,20 @@ METRIC_SUBSYSTEM=jobservice
TRACE_ENABLED=true
TRACE_SERVICE_NAME=harbor-jobservice
TRACE_SAMPLE_RATE={{ trace.sample_rate }}
{% if trace.jaeger is defined %}
TRACE_NAMESPACE={{ trace.namespace }}
TRACE_ATTRIBUTES={{ trace.attributes | to_json | safe }}
{% if trace.jaeger.enabled %}
TRACE_JAEGER_ENDPOINT={{ trace.jaeger.endpoint if trace.jaeger.endpoint else '' }}
TRACE_JAEGER_USERNAME={{ trace.jaeger.username if trace.jaeger.username else '' }}
TRACE_JAEGER_PASSWORD={{ trace.jaeger.password if trace.jaeger.password else '' }}
TRACE_JAEGER_AGENT_HOSTNAME={{ trace.jaeger.agent_host if trace.jaeger.agent_host else '' }}
TRACE_JAEGER_AGENT_PORT={{ trace.jaeger.agent_port if trace.jaeger.agent_port else '' }}
{% endif %}
{%if trace.otel is defined %}
TRACE_OTEL_ENDPOINT={{ trace.otel.endpoint if trace.otel.endpoint else '' }}
{%if trace.otel.enabled %}
TRACE_OTEL_ENDPOINT={{ trace.otel.endpoint }}
TRACE_OTEL_URL_PATH={{ trace.otel.url_path if trace.otel.url_path else '' }}
TRACE_OTEL_COMPRESSION={{ trace.otel.compression if trace.otel.compression else '' }}
TRACE_OTEL_COMPRESSION={{ trace.otel.compression }}
TRACE_OTEL_TIMEOUT={{ trace.otel.timeout }}
TRACE_OTEL_INSECURE={{ trace.otel.insecure if trace.otel.insecure else '' }}
TRACE_OTEL_INSECURE={{ trace.otel.insecure }}
{% endif %}
{% endif %}

View File

@ -13,18 +13,20 @@ INTERNAL_VERIFY_CLIENT_CERT=true
TRACE_ENABLED=true
TRACE_SERVICE_NAME=harbor-registryctl
TRACE_SAMPLE_RATE={{ trace.sample_rate }}
{% if trace.jaeger is defined %}
TRACE_NAMESPACE={{ trace.namespace }}
TRACE_ATTRIBUTES={{ trace.attributes | to_json | safe }}
{% if trace.jaeger.enabled %}
TRACE_JAEGER_ENDPOINT={{ trace.jaeger.endpoint if trace.jaeger.endpoint else '' }}
TRACE_JAEGER_USERNAME={{ trace.jaeger.username if trace.jaeger.username else '' }}
TRACE_JAEGER_PASSWORD={{ trace.jaeger.password if trace.jaeger.password else '' }}
TRACE_JAEGER_AGENT_HOSTNAME={{ trace.jaeger.agent_host if trace.jaeger.agent_host else '' }}
TRACE_JAEGER_AGENT_PORT={{ trace.jaeger.agent_port if trace.jaeger.agent_port else '' }}
{% endif %}
{%if trace.otel is defined %}
TRACE_OTEL_ENDPOINT={{ trace.otel.endpoint if trace.otel.endpoint else '' }}
{%if trace.otel.enabled %}
TRACE_OTEL_ENDPOINT={{ trace.otel.endpoint }}
TRACE_OTEL_URL_PATH={{ trace.otel.url_path if trace.otel.url_path else '' }}
TRACE_OTEL_COMPRESSION={{ trace.otel.compression if trace.otel.compression else '' }}
TRACE_OTEL_COMPRESSION={{ trace.otel.compression }}
TRACE_OTEL_TIMEOUT={{ trace.otel.timeout }}
TRACE_OTEL_INSECURE={{ trace.otel.insecure if trace.otel.insecure else '' }}
{% endif %}
TRACE_OTEL_INSECURE={{ trace.otel.insecure }}
{% endif %}
{% endif %}

View File

@ -329,8 +329,7 @@ def parse_yaml_config(config_file_path, with_notary, with_trivy, with_chartmuseu
# trace configs
trace_config = configs.get('trace')
if trace_config:
config_dict = Trace(trace_config)
config_dict['trace'] = Trace(trace_config or {})
if config_dict['internal_tls'].enabled:
config_dict['portal_url'] = 'https://portal:8443'

View File

@ -1,8 +1,16 @@
import json
from jinja2 import Environment, FileSystemLoader
from .misc import mark_file
jinja_env = Environment(loader=FileSystemLoader('/'), trim_blocks=True, lstrip_blocks=True)
def to_json(value):
    """Serialize value to a JSON string with json.dumps default settings."""
    encoded = json.dumps(value)
    return encoded
jinja_env.filters['to_json'] = to_json
def render_jinja(src, dest,mode=0o640, uid=0, gid=0, **kw):
t = jinja_env.get_template(src)
with open(dest, 'w') as f:

View File

@ -67,6 +67,7 @@ func newDefaultTransport() *http.Transport {
}
}
// WithInternalTLSConfig returns a TransportOption that configures the transport to use the internal TLS configuration
func WithInternalTLSConfig() func(*http.Transport) {
return func(tr *http.Transport) {
tlsConfig, err := GetInternalTLSConfig()
@ -76,24 +77,29 @@ func WithInternalTLSConfig() func(*http.Transport) {
tr.TLSClientConfig = tlsConfig
}
}
// WithInsecureSkipVerify builds a transport option that writes skipVerify into
// the InsecureSkipVerify field of the transport's existing TLSClientConfig.
func WithInsecureSkipVerify(skipVerify bool) func(*http.Transport) {
	apply := func(transport *http.Transport) {
		tlsCfg := transport.TLSClientConfig
		tlsCfg.InsecureSkipVerify = skipVerify
	}
	return apply
}
// WithMaxIdleConns builds a transport option that sets the transport's
// MaxIdleConns field to maxIdleConns.
func WithMaxIdleConns(maxIdleConns int) func(*http.Transport) {
	setLimit := func(transport *http.Transport) {
		transport.MaxIdleConns = maxIdleConns
	}
	return setLimit
}
// WithIdleconnectionTimeout builds a transport option that sets the
// transport's IdleConnTimeout field to idleConnectionTimeout.
func WithIdleconnectionTimeout(idleConnectionTimeout time.Duration) func(*http.Transport) {
	setTimeout := func(transport *http.Transport) {
		transport.IdleConnTimeout = idleConnectionTimeout
	}
	return setTimeout
}
// NewTransport returns a new http.Transport with the specified options
func NewTransport(opts ...func(*http.Transport)) http.RoundTripper {
tr := newDefaultTransport()
for _, opt := range opts {
@ -102,11 +108,15 @@ func NewTransport(opts ...func(*http.Transport)) http.RoundTripper {
return tr
}
// TransportConfig is the configuration for http transport.
// It is filled in by TransportOption functions such as WithInsecure.
type TransportConfig struct {
// Insecure is the flag written by the WithInsecure option.
// NOTE(review): presumably true means the server certificate is not verified — confirm against the code that consumes this config.
Insecure bool
}
// TransportOption is a function that modifies a TransportConfig in place;
// WithInsecure returns one.
type TransportOption func(*TransportConfig)
// WithInsecure returns a TransportOption that configures the transport to skip verification of the server's certificate
func WithInsecure(skipVerify bool) TransportOption {
return func(cfg *TransportConfig) {
cfg.Insecure = skipVerify

View File

@ -87,17 +87,27 @@ func updateInitPassword(ctx context.Context, userID int, password string) error
return nil
}
func gracefulShutdown(closing, done chan struct{}) {
func gracefulShutdown(closing, done chan struct{}, shutdowns ...func()) {
signals := make(chan os.Signal, 1)
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
log.Infof("capture system signal %s, to close \"closing\" channel", <-signals)
close(closing)
select {
case <-done:
shutdownChan := make(chan struct{}, 1)
go func() {
for _, s := range shutdowns {
s()
}
<-done
log.Infof("Goroutines exited normally")
shutdownChan <- struct{}{}
}()
select {
case <-shutdownChan:
log.Infof("all shutdown jobs done")
case <-time.After(time.Second * 3):
log.Infof("Timeout waiting goroutines to exit")
}
os.Exit(0)
}
@ -177,14 +187,7 @@ func main() {
go metric.ServeProm(metricCfg.Path, metricCfg.Port)
}
ctx := context.Background()
if tracelib.Enabled() {
tp := tracelib.InitGlobalTracer(ctx)
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
log.Errorf("Error shutting down tracer provider: %v", err)
}
}()
}
shutdownTracerProvider := tracelib.InitGlobalTracer(ctx)
token.InitCreators()
database, err := config.Database()
if err != nil {
@ -196,7 +199,6 @@ func main() {
if err = migration.Migrate(database); err != nil {
log.Fatalf("failed to migrate: %v", err)
}
// ctx := orm.Context()
ctx = orm.Clone(ctx)
if err := config.Load(ctx); err != nil {
log.Fatalf("failed to load config: %v", err)
@ -222,7 +224,7 @@ func main() {
closing := make(chan struct{})
done := make(chan struct{})
go gracefulShutdown(closing, done)
go gracefulShutdown(closing, done, shutdownTracerProvider)
// Start health checker for registries
go registry.Ctl.StartRegularHealthCheck(orm.Context(), closing, done)

View File

@ -26,13 +26,13 @@ import (
"github.com/goharbor/harbor/src/server/middleware/log"
"github.com/goharbor/harbor/src/server/middleware/mergeslash"
"github.com/goharbor/harbor/src/server/middleware/metric"
"github.com/goharbor/harbor/src/server/middleware/trace"
"github.com/goharbor/harbor/src/server/middleware/notification"
"github.com/goharbor/harbor/src/server/middleware/orm"
"github.com/goharbor/harbor/src/server/middleware/readonly"
"github.com/goharbor/harbor/src/server/middleware/requestid"
"github.com/goharbor/harbor/src/server/middleware/security"
"github.com/goharbor/harbor/src/server/middleware/session"
"github.com/goharbor/harbor/src/server/middleware/trace"
"github.com/goharbor/harbor/src/server/middleware/transaction"
)

View File

@ -28,7 +28,6 @@ import (
"github.com/goharbor/harbor/src/jobservice/logger"
"github.com/goharbor/harbor/src/jobservice/runtime"
cfgLib "github.com/goharbor/harbor/src/lib/config"
"github.com/goharbor/harbor/src/lib/log"
tracelib "github.com/goharbor/harbor/src/lib/trace"
_ "github.com/goharbor/harbor/src/pkg/config/rest"
)
@ -60,14 +59,7 @@ func main() {
panic(err)
}
if tracelib.Enabled() {
tp := tracelib.InitGlobalTracer(ctx)
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
log.Errorf("Error shutting down tracer provider: %v", err)
}
}()
}
defer tracelib.InitGlobalTracer(context.Background()).Shutdown()
// Set job context initializer
runtime.JobService.SetJobContextInitializer(func(ctx context.Context) (job.Context, error) {

View File

@ -15,6 +15,7 @@
package runner
import (
"context"
"fmt"
"runtime"
"time"
@ -28,10 +29,14 @@ import (
"github.com/goharbor/harbor/src/jobservice/period"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/metric"
tracelib "github.com/goharbor/harbor/src/lib/trace"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
)
const (
maxTrackRetries = 6
tracerName = "goharbor/harbor/src/jobservice/runner/redis"
)
// RedisJob is a job wrapper to wrap the job.Interface to the style which can be recognized by the redis worker.
@ -52,6 +57,9 @@ func NewRedisJob(job interface{}, ctx *env.Context, ctl lcm.Controller) *RedisJo
// Run the job
func (rj *RedisJob) Run(j *work.Job) (err error) {
_, span := tracelib.StartTrace(context.Background(), tracerName, "run-job")
defer span.End()
var (
runningJob job.Interface
execContext job.Context
@ -65,9 +73,10 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
// Check if the job is a periodic one as periodic job has its own ID format
if eID, yes := isPeriodicJobExecution(j); yes {
jID = eID
span.SetAttributes(attribute.Key("periodicJob").Bool(true))
logger.Infof("Start to run periodical job execution: %s", eID)
}
span.SetAttributes(attribute.Key("jobID").String(jID), attribute.Key("jobName").String(j.Name))
// As the job stats may not be ready when job executing sometimes (corner case),
// the track call here may get NOT_FOUND error. For that case, let's do retry to recovery.
@ -76,7 +85,6 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
if err == nil {
break
}
if errs.IsObjectNotFoundError(err) {
if retried < maxTrackRetries {
// Still have chance to re-track the given job.
@ -84,6 +92,7 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
b := backoff(retried)
logger.Errorf("Track job %s: stats may not have been ready yet, hold for %d ms and retry again", jID, b)
<-time.After(time.Duration(b) * time.Millisecond)
span.AddEvent("retring to get job stat")
continue
} else {
// Exit and never try.
@ -94,7 +103,7 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
// Log error and exit
logger.Errorf("Job '%s:%s' exit with error: failed to get job tracker: %s", j.Name, j.ID, err)
tracelib.RecordError(span, err, "failed to get job tracker")
// ELSE:
// As tracker creation failed, there is no way to mark the job status change.
// Also a non nil error return consumes a fail. If all retries are failed here,
@ -113,8 +122,10 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
logger.Errorf("Job '%s:%s' exit with error: %s", j.Name, j.ID, err)
metric.JobserviceTotalTask.WithLabelValues(j.Name, "fail").Inc()
metric.JobservieTaskProcessTimeSummary.WithLabelValues(j.Name, "fail").Observe(time.Since(now).Seconds())
tracelib.RecordError(span, err, "job failed with err")
if er := tracker.Fail(); er != nil {
logger.Errorf("Error occurred when marking the status of job %s:%s to failure: %s", j.Name, j.ID, er)
span.RecordError(err)
}
return
@ -129,6 +140,7 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
// Logged
metric.JobserviceTotalTask.WithLabelValues(j.Name, "stop").Inc()
metric.JobservieTaskProcessTimeSummary.WithLabelValues(j.Name, "stop").Observe(time.Since(now).Seconds())
tracelib.RecordError(span, err, "job stopped")
logger.Infof("Job %s:%s is stopped", j.Name, j.ID)
return
}
@ -137,7 +149,9 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
metric.JobservieTaskProcessTimeSummary.WithLabelValues(j.Name, "success").Observe(time.Since(now).Seconds())
// Mark job status to success.
logger.Infof("Job '%s:%s' exit with success", j.Name, j.ID)
span.SetStatus(codes.Ok, "job exit with success")
if er := tracker.Succeed(); er != nil {
tracelib.RecordError(span, err, "failed to mark job success")
logger.Errorf("Error occurred when marking the status of job %s:%s to success: %s", j.Name, j.ID, er)
}
}()
@ -169,6 +183,7 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
// Reset job info.
if err = tracker.Reset(); err != nil {
// Log error and return the original error if existing
tracelib.RecordError(span, err, "reset job failed")
err = errors.Wrap(err, fmt.Sprintf("retrying %s job %s:%s failed", jStatus.String(), j.Name, j.ID))
if len(j.LastErr) > 0 {
@ -184,7 +199,9 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
// do nothing
return nil
default:
return errors.Errorf("mismatch status for running job: expected %s/%s but got %s", job.PendingStatus, job.ScheduledStatus, jStatus.String())
err = errors.Errorf("mismatch status for running job: expected %s/%s but got %s", job.PendingStatus, job.ScheduledStatus, jStatus.String())
tracelib.RecordError(span, err, "status mismatch")
return err
}
// Build job context
@ -197,6 +214,7 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
// Close open io stream first
if closer, ok := execContext.GetLogger().(logger.Closer); ok {
if er := closer.Close(); er != nil {
tracelib.RecordError(span, er, "close job logger failed")
logger.Errorf("Close job logger failed: %s", er)
}
}
@ -206,6 +224,7 @@ func (rj *RedisJob) Run(j *work.Job) (err error) {
runningJob = Wrap(rj.job)
// Set status to run
if err = tracker.Run(); err != nil {
tracelib.RecordError(span, err, "failed set status to run")
return
}
// Run the job

View File

@ -23,8 +23,6 @@ import (
"strings"
"github.com/astaxie/beego/orm"
"go.opentelemetry.io/otel/codes"
oteltrace "go.opentelemetry.io/otel/trace"
"github.com/goharbor/harbor/src/lib/log"
tracelib "github.com/goharbor/harbor/src/lib/trace"
@ -52,6 +50,8 @@ func RegisterModel(models ...interface{}) {
type ormKey struct{}
const tracerName = "goharbor/harbor/src/lib/orm"
func init() {
if os.Getenv("ORM_DEBUG") == "true" {
orm.Debug = true
@ -88,28 +88,17 @@ func Clone(ctx context.Context) context.Context {
// WithTransaction a decorator which make f run in transaction
func WithTransaction(f func(ctx context.Context) error) func(ctx context.Context) error {
return func(ctx context.Context) error {
var span oteltrace.Span
if tracelib.Enabled() {
span = oteltrace.SpanFromContext(ctx)
tr := span.TracerProvider().Tracer("goharbor/harbor/src/lib/orm")
ctx, span = tr.Start(ctx, "start transaction")
defer span.End()
}
_, span := tracelib.StartTrace(ctx, tracerName, "start-transaction")
defer span.End()
o, err := FromContext(ctx)
if err != nil {
if tracelib.Enabled() {
span.RecordError(err)
span.SetStatus(codes.Error, "get orm from ctx failed")
}
tracelib.RecordError(span, err, "get orm from ctx failed")
return err
}
tx := ormerTx{Ormer: o}
if err := tx.Begin(); err != nil {
if tracelib.Enabled() {
span.RecordError(err)
span.SetStatus(codes.Error, "begin transaction failed")
}
tracelib.RecordError(span, err, "begin transaction failed")
log.Errorf("begin transaction failed: %v", err)
return err
}
@ -117,10 +106,7 @@ func WithTransaction(f func(ctx context.Context) error) func(ctx context.Context
if err := f(ctx); err != nil {
span.AddEvent("rollback transaction")
if e := tx.Rollback(); e != nil {
if tracelib.Enabled() {
span.RecordError(err)
span.SetStatus(codes.Error, "rollback transaction failed")
}
tracelib.RecordError(span, e, "rollback transaction failed")
log.Errorf("rollback transaction failed: %v", e)
return e
}
@ -129,10 +115,7 @@ func WithTransaction(f func(ctx context.Context) error) func(ctx context.Context
}
span.AddEvent("commit transaction")
if err := tx.Commit(); err != nil {
if tracelib.Enabled() {
span.RecordError(err)
span.SetStatus(codes.Error, "commit transaction failed")
}
tracelib.RecordError(span, err, "commit transaction failed")
log.Errorf("commit transaction failed: %v", err)
return err
}

View File

@ -1,35 +1,63 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package trace
import (
"bytes"
"fmt"
"strings"
"github.com/goharbor/harbor/src/lib/log"
"github.com/spf13/viper"
)
const (
TraceEnvPrefix = "Trace"
TraceEnvPrefix = "trace"
)
// C is the global configuration for trace
var C Config
func init() {
log.Infof("parsing trace config from env")
viper.AutomaticEnv()
viper.SetConfigType("json")
viper.SetEnvPrefix(TraceEnvPrefix)
log.Infof("Set env prefix to %s", TraceEnvPrefix)
var config Config
var jaeger JaegerConfig
var otel OtelConfig
viper.Unmarshal(&config)
viper.Unmarshal(&jaeger)
viper.Unmarshal(&otel)
config.Jaeger = jaeger
config.Otel = otel
C = config
}
type OtelTraceConfig struct {
viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
viper.AutomaticEnv()
C = Config{Otel: OtelConfig{}, Jaeger: JaegerConfig{}}
C.Enabled = viper.GetBool("enabled")
C.SampleRate = viper.GetFloat64("sample_rate")
C.Namespace = viper.GetString("namespace")
C.ServiceName = viper.GetString("service_name")
C.Jaeger.Endpoint = viper.GetString("jaeger_endpoint")
C.Jaeger.Username = viper.GetString("jaeger_agent_username")
C.Jaeger.Password = viper.GetString("jaeger_agent_password")
C.Jaeger.AgentHost = viper.GetString("jaeger_agent_host")
C.Jaeger.AgentPort = viper.GetString("jaeger_agent_port")
C.Otel.Endpoint = viper.GetString("otel_endpoint")
C.Otel.URLPath = viper.GetString("otel_url_path")
C.Otel.Compression = viper.GetBool("otel_compression")
C.Otel.Insecure = viper.GetBool("otel_insecure")
C.Otel.Timeout = viper.GetInt("otel_timeout")
var jsonExample = []byte(viper.GetString("attributes"))
viper.ReadConfig(bytes.NewBuffer(jsonExample))
fmt.Println(viper.GetStringMapString("attributes"))
C.Attributes = viper.GetStringMapString("attributes")
log.Infof("ns: %s attr %+v", C.Namespace, C.Attributes)
}
// OtelConfig is the configuration for otel
type OtelConfig struct {
Endpoint string `mapstructure:"otel_trace_endpoint"`
URLPath string `mapstructure:"otel_trace_url_path"`
@ -37,6 +65,8 @@ type OtelConfig struct {
Insecure bool `mapstructure:"otel_trace_insecure"`
Timeout int `mapstructure:"otel_trace_timeout"`
}
// JaegerConfig is the configuration for Jaeger
type JaegerConfig struct {
Endpoint string `mapstructure:"jaeger_endpoint"`
Username string `mapstructure:"jaeger_username"`
@ -44,18 +74,24 @@ type JaegerConfig struct {
AgentHost string `mapstructure:"jaeger_agent_host"`
AgentPort string `mapstructure:"jaeger_agent_port"`
}
// Config is the configuration for trace
type Config struct {
Enabled bool `mapstructure:"enabled"`
SampleRate float64 `mapstructure:"sample_rate"`
Jaeger JaegerConfig
Otel OtelConfig
Attribute map[string]string
Enabled bool `mapstructure:"enabled"`
SampleRate float64 `mapstructure:"sample_rate"`
Namespace string `mapstructure:"namespace"`
ServiceName string `mapstructure:"service_name"`
Jaeger JaegerConfig
Otel OtelConfig
Attributes map[string]string
}
// GetConfig returns the package-level trace configuration C.
// The Config struct is returned by value.
func GetConfig() Config {
return C
}
// Enabled reports whether trace is enabled, i.e. the Enabled field of the
// package-level configuration C.
func Enabled() bool {
return C.Enabled
}

64
src/lib/trace/helper.go Normal file
View File

@ -0,0 +1,64 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package trace
import (
"context"
"net/http"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
oteltrace "go.opentelemetry.io/otel/trace"
)
// GetGlobalTracer asks the globally registered tracer provider for a tracer
// named instrumentationName, forwarding any tracer options.
func GetGlobalTracer(instrumentationName string, opts ...oteltrace.TracerOption) oteltrace.Tracer {
	provider := otel.GetTracerProvider()
	return provider.Tracer(instrumentationName, opts...)
}
// StartSpan starts a span called name on the tracer registered under
// "goharbor/harbor/src/lib/trace" and returns the derived context with the span.
func StartSpan(ctx context.Context, name string) (context.Context, oteltrace.Span) {
	tracer := otel.Tracer("goharbor/harbor/src/lib/trace")
	return tracer.Start(ctx, name)
}
// SpanFromHTTPRequest returns the span carried by the request's context.
func SpanFromHTTPRequest(req *http.Request) oteltrace.Span {
	return oteltrace.SpanFromContext(req.Context())
}
// RecordError records err on the given span and sets the span status to
// codes.Error with the given description. A nil span is ignored.
func RecordError(span oteltrace.Span, err error, description string) {
	if span != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, description)
	}
}
// NewHandler wraps h with the otelhttp tracing handler for the given
// operation, using the global tracer provider and the global text map
// propagator.
func NewHandler(h http.Handler, operation string) http.Handler {
	return otelhttp.NewHandler(
		h,
		operation,
		otelhttp.WithTracerProvider(otel.GetTracerProvider()),
		otelhttp.WithPropagators(otel.GetTextMapPropagator()),
	)
}
// StartTrace starts a span called spanName on the tracer registered under
// tracerName, passing along any span start options, and returns the derived
// context together with the new span.
func StartTrace(ctx context.Context, tracerName string, spanName string, opts ...oteltrace.SpanStartOption) (context.Context, oteltrace.Span) {
	tracer := otel.Tracer(tracerName)
	return tracer.Start(ctx, spanName, opts...)
}

View File

@ -16,7 +16,6 @@ package trace
import (
"context"
"log"
"time"
"go.opentelemetry.io/otel"
@ -27,27 +26,19 @@ import (
"go.opentelemetry.io/otel/sdk/resource"
tracesdk "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
"go.opentelemetry.io/otel/trace"
)
oteltrace "go.opentelemetry.io/otel/trace"
const (
service = "core"
environment = "production"
traceServiceName = "goharbor/harbor"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/pkg/version"
)
type ProviderConfig struct {
ExporterType string
URL string
Attribute map[string]string
}
func initExporter(ctx context.Context) (tracesdk.SpanExporter, error) {
var err error
var exp tracesdk.SpanExporter
cfg := GetConfig()
if len(cfg.Jaeger.Endpoint) != 0 {
// Jaeger collector exporter
log.Infof("init trace provider jaeger collector on %s with user %s", cfg.Jaeger.Endpoint, cfg.Jaeger.Username)
exp, err = jaeger.New(jaeger.WithCollectorEndpoint(
jaeger.WithEndpoint(cfg.Jaeger.Endpoint),
jaeger.WithUsername(cfg.Jaeger.Username),
@ -55,12 +46,14 @@ func initExporter(ctx context.Context) (tracesdk.SpanExporter, error) {
))
} else if len(cfg.Jaeger.AgentHost) != 0 {
// Jaeger agent exporter
log.Infof("init trace provider jaeger agent on %s", cfg.Jaeger.AgentHost)
exp, err = jaeger.New(jaeger.WithAgentEndpoint(
jaeger.WithAgentHost(cfg.Jaeger.AgentHost),
jaeger.WithAgentPort(cfg.Jaeger.AgentPort),
))
} else if len(cfg.Otel.Endpoint) != 0 {
// Otel exporter
log.Infof("init trace provider otel on %s/%s", cfg.Otel.Endpoint, cfg.Otel.URLPath)
opts := []otlptracehttp.Option{
otlptracehttp.WithEndpoint(cfg.Otel.Endpoint),
otlptracehttp.WithURLPath(cfg.Otel.URLPath),
@ -69,7 +62,9 @@ func initExporter(ctx context.Context) (tracesdk.SpanExporter, error) {
if cfg.Otel.Compression {
opts = append(opts, otlptracehttp.WithCompression(otlptracehttp.GzipCompression))
}
if cfg.Otel.Insecure {
opts = append(opts, otlptracehttp.WithInsecure())
}
exp, err = otlptracehttp.New(ctx, opts...)
} else {
log.Fatalf("Trace is enabled but no tracer provider is specified")
@ -79,44 +74,59 @@ func initExporter(ctx context.Context) (tracesdk.SpanExporter, error) {
func initProvider(exp tracesdk.SpanExporter) (*tracesdk.TracerProvider, error) {
cfg := GetConfig()
ops := make([]tracesdk.TracerProviderOption, 4)
ops = append(ops,
// Always be sure to batch in production.
tracesdk.WithBatcher(exp),
// Record information about this application in an Resource.
tracesdk.WithResource(resource.NewWithAttributes(
semconv.SchemaURL,
semconv.ServiceNameKey.String(service),
)),
)
attriSlice := make([]attribute.KeyValue, 0, len(cfg.Attribute))
if cfg.Attribute != nil {
for i, a := range cfg.Attribute {
// prepare attribute resources
attriSlice := []attribute.KeyValue{
semconv.ServiceNameKey.String(cfg.ServiceName),
semconv.ServiceVersionKey.String(version.ReleaseVersion)}
if cfg.Namespace != "" {
attriSlice = append(attriSlice, semconv.ServiceNamespaceKey.String(cfg.Namespace))
}
if cfg.Attributes != nil {
for i, a := range cfg.Attributes {
attriSlice = append(attriSlice, attribute.String(i, a))
}
ops = append(ops, tracesdk.WithResource(resource.NewWithAttributes(semconv.SchemaURL, attriSlice...)))
}
bsp := tracesdk.NewBatchSpanProcessor(exp)
ops = append(ops, tracesdk.WithSpanProcessor(bsp), tracesdk.WithSampler(tracesdk.TraceIDRatioBased(cfg.SampleRate)))
tp := tracesdk.NewTracerProvider(ops...)
// prepare tp options
ops := make([]tracesdk.TracerProviderOption, 0, 4)
ops = append(ops,
// Always be sure to batch in production.
// tracesdk.WithBatcher(exp),
tracesdk.WithBatcher(exp),
// Record information about this application in an Resource.
tracesdk.WithResource(resource.NewWithAttributes(semconv.SchemaURL, attriSlice...)),
tracesdk.WithSampler(tracesdk.TraceIDRatioBased(cfg.SampleRate)),
)
// init trace provider
tp := tracesdk.NewTracerProvider(ops...)
return tp, nil
}
func InitGlobalTracer(ctx context.Context) *tracesdk.TracerProvider {
// ShutdownFunc is a function to shutdown the trace provider.
type ShutdownFunc func()

// Shutdown invokes the underlying shutdown function.
// Calling Shutdown on a nil ShutdownFunc is a no-op, so a zero value can be
// deferred or passed around without panicking.
func (s ShutdownFunc) Shutdown() {
	if s == nil {
		return
	}
	s()
}
// InitGlobalTracer initializes the global trace provider and returns a ShutdownFunc
func InitGlobalTracer(ctx context.Context) ShutdownFunc {
if !Enabled() {
otel.SetTracerProvider(oteltrace.NewNoopTracerProvider())
return func() {}
}
exp, err := initExporter(ctx)
handleErr(err, "fail in exporter initialization")
tp, err := initProvider(exp)
handleErr(err, "fail in tracer initialization")
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
otel.SetTracerProvider(tp)
return tp
}
func GetGlobalTracer(instrumentationName string, opts ...trace.TracerOption) trace.Tracer {
return otel.GetTracerProvider().Tracer(instrumentationName, opts...)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
return func() {
log.Infof("shutdown trace provider")
handleErr(tp.Shutdown(ctx), "fail in tracer shutdown")
}
}
func handleErr(err error, message string) {

View File

@ -2,14 +2,21 @@ package blob
import (
"errors"
"net/http"
"github.com/goharbor/harbor/src/lib/log"
tracelib "github.com/goharbor/harbor/src/lib/trace"
"github.com/goharbor/harbor/src/registryctl/api"
"github.com/docker/distribution/registry/storage"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/registryctl/api"
"github.com/gorilla/mux"
"net/http"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// tracerName is the tracer name passed to tracelib.StartTrace for spans
// started by the blob handler.
const tracerName = "goharbor/harbor/src/registryctl/api/registry/blob"
// NewHandler returns the handler to handle blob requests
func NewHandler(storageDriver storagedriver.StorageDriver) http.Handler {
return &handler{
@ -33,14 +40,19 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// delete deletes the blob identified by the "reference" variable of the request
func (h *handler) delete(w http.ResponseWriter, r *http.Request) {
ctx, span := tracelib.StartTrace(r.Context(), tracerName, "delete-blob", trace.WithAttributes(attribute.Key("method").String(r.Method)))
defer span.End()
ref := mux.Vars(r)["reference"]
if ref == "" {
api.HandleBadRequest(w, errors.New("no reference specified"))
err := errors.New("no reference specified")
tracelib.RecordError(span, err, "no reference specified")
api.HandleBadRequest(w, err)
return
}
// don't parse the reference here as RemoveBlob does.
cleaner := storage.NewVacuum(r.Context(), h.storageDriver)
cleaner := storage.NewVacuum(ctx, h.storageDriver)
if err := cleaner.RemoveBlob(ref); err != nil {
tracelib.RecordError(span, err, "failed to remove blob")
log.Infof("failed to remove blob: %s, with error:%v", ref, err)
api.HandleError(w, err)
return

View File

@ -1,14 +1,15 @@
package blob
import (
"net/http"
"net/http/httptest"
"testing"
"github.com/docker/distribution/registry/storage/driver/inmemory"
"github.com/docker/distribution/testutil"
"github.com/goharbor/harbor/src/registryctl/api/registry/test"
"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
"net/http"
"net/http/httptest"
"testing"
)
func TestDeletionBlob(t *testing.T) {

View File

@ -1,16 +1,23 @@
package manifest
import (
"github.com/docker/distribution/registry/storage"
storagedriver "github.com/docker/distribution/registry/storage/driver"
"net/http"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/lib/log"
"github.com/goharbor/harbor/src/registryctl/api"
"github.com/docker/distribution/registry/storage"
storagedriver "github.com/docker/distribution/registry/storage/driver"
tracelib "github.com/goharbor/harbor/src/lib/trace"
"github.com/gorilla/mux"
"github.com/opencontainers/go-digest"
"net/http"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// tracerName is the tracer name passed to tracelib.StartTrace for spans
// started by the manifest handler. It names this package (manifest) so that
// manifest spans are not attributed to the blob handler's tracer.
const tracerName = "goharbor/harbor/src/registryctl/api/registry/manifest"
// NewHandler returns the handler to handle manifest requests
func NewHandler(storageDriver storagedriver.StorageDriver) http.Handler {
return &handler{
@ -34,25 +41,37 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// delete deletes manifest ...
func (h *handler) delete(w http.ResponseWriter, r *http.Request) {
var span trace.Span
ctx := r.Context()
ref := mux.Vars(r)["reference"]
if tracelib.Enabled() {
ctx, span = tracelib.StartTrace(ctx, tracerName, "delete-manifest", trace.WithAttributes(attribute.Key("method").String(r.Method)))
defer span.End()
}
if ref == "" {
api.HandleBadRequest(w, errors.New("no reference specified"))
err := errors.New("no reference specified")
tracelib.RecordError(span, err, "no reference specified ")
api.HandleBadRequest(w, err)
return
}
dgst, err := digest.Parse(ref)
if err != nil {
tracelib.RecordError(span, err, "invalid reference")
api.HandleBadRequest(w, errors.Wrap(err, "not supported reference"))
return
}
repoName := mux.Vars(r)["name"]
if repoName == "" {
api.HandleBadRequest(w, errors.New("no repository name specified"))
err := errors.New("no repository name specified")
tracelib.RecordError(span, err, "no repository name specified")
api.HandleBadRequest(w, err)
return
}
// leave the tags empty here, as this is non-blocking GC. The tags deletion will be handled via DELETE /v2/manifest
var tags []string
cleaner := storage.NewVacuum(r.Context(), h.storageDriver)
cleaner := storage.NewVacuum(ctx, h.storageDriver)
if err := cleaner.RemoveManifest(repoName, dgst, tags); err != nil {
tracelib.RecordError(span, err, "failed to remove blob")
log.Infof("failed to remove manifest: %s, with error:%v", ref, err)
api.HandleError(w, err)
return

View File

@ -19,6 +19,10 @@ import (
"crypto/tls"
"flag"
"net/http"
"os"
"os/signal"
"syscall"
"time"
_ "github.com/docker/distribution/registry/storage/driver/azure"
_ "github.com/docker/distribution/registry/storage/driver/filesystem"
@ -50,6 +54,21 @@ func (s *RegistryCtl) Start() {
Handler: s.Handler,
TLSConfig: common_http.NewServerTLSConfig(),
}
ctx := context.Background()
regCtl.RegisterOnShutdown(tracelib.InitGlobalTracer(ctx))
// graceful shutdown
go func() {
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
<-sig
context, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
log.Infof("Got an interrupt, shutting down...")
if err := regCtl.Shutdown(context); err != nil {
log.Fatalf("Failed to shutdown registry controller: %v", err)
}
log.Infof("Registry controller is shut down properly")
}()
var err error
if s.ServerConf.Protocol == "https" {
@ -60,11 +79,10 @@ func (s *RegistryCtl) Start() {
} else {
err = regCtl.ListenAndServe()
}
<-ctx.Done()
if err != nil {
log.Fatal(err)
}
return
}
@ -79,16 +97,6 @@ func main() {
if err := config.DefaultConfig.Load(*configPath, true); err != nil {
log.Fatalf("Failed to load configurations with error: %s\n", err)
}
if tracelib.Enabled() {
tp := tracelib.InitGlobalTracer(context.Background())
defer func() {
if err := tp.Shutdown(context.Background()); err != nil {
log.Errorf("Error shutting down tracer provider: %v", err)
}
}()
}
regCtl := &RegistryCtl{
ServerConf: *config.DefaultConfig,
Handler: handlers.NewHandlerChain(*config.DefaultConfig),

View File

@ -35,8 +35,7 @@ func Middleware() func(http.Handler) http.Handler {
ctx := log.WithLogger(r.Context(), logger.WithFields(log.Fields{"requestID": rid}))
if tracelib.Enabled() {
span := oteltrace.SpanFromContext(ctx)
span.SetAttributes(attribute.Key("request ID").String(rid))
oteltrace.SpanFromContext(ctx).SetAttributes(attribute.Key("request-id").String(rid))
}
next.ServeHTTP(w, r.WithContext(ctx))
} else {

View File

@ -18,6 +18,7 @@ import (
"net/http"
"github.com/goharbor/harbor/src/server/middleware"
"github.com/google/uuid"
)
@ -32,7 +33,6 @@ func Middleware(skippers ...middleware.Skipper) func(http.Handler) http.Handler
rid = uuid.New().String()
r.Header.Set(HeaderXRequestID, rid)
}
w.Header().Set(HeaderXRequestID, rid)
next.ServeHTTP(w, r)
}, skippers...)

View File

@ -1,3 +1,17 @@
// Copyright Project Harbor Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package trace
import (