Merge pull request #2693 from reasonerjt/clair-notification

Clair notification handler
This commit is contained in:
Daniel Jiang 2017-07-05 20:18:34 +08:00 committed by GitHub
commit b96770b90a
11 changed files with 321 additions and 6 deletions

View File

@ -194,6 +194,14 @@ create table img_scan_overview (
PRIMARY KEY(image_digest)
);
create table clair_vuln_timestamp (
id int NOT NULL AUTO_INCREMENT,
namespace varchar(128) NOT NULL,
last_update timestamp NOT NULL,
PRIMARY KEY(id),
UNIQUE(namespace)
);
create table properties (
k varchar(64) NOT NULL,
v varchar(128) NOT NULL,

View File

@ -150,7 +150,7 @@ create table replication_target (
);
create table replication_job (
id INTEGER PRIMARY KEY,
id INTEGER PRIMARY KEY,
status varchar(64) NOT NULL,
policy_id int NOT NULL,
repository varchar(256) NOT NULL,
@ -187,6 +187,13 @@ create table img_scan_overview (
CREATE INDEX policy ON replication_job (policy_id);
CREATE INDEX poid_uptime ON replication_job (policy_id, update_time);
create table clair_vuln_timestamp (
id INTEGER PRIMARY KEY,
namespace varchar(128) NOT NULL,
last_update timestamp NOT NULL,
UNIQUE(namespace)
);
create table properties (
k varchar(64) NOT NULL,
v varchar(128) NOT NULL,

View File

@ -16,8 +16,10 @@ clair:
# Deadline before an API request will respond with a 503
timeout: 300s
updater:
interval: 2h
interval: 1h
notifier:
attempts: 3
renotifyinterval: 2h
http:
endpoint: http://ui/service/notifications/clair

54
src/common/dao/clair.go Normal file
View File

@ -0,0 +1,54 @@
// copyright (c) 2017 vmware, inc. all rights reserved.
//
// licensed under the apache license, version 2.0 (the "license");
// you may not use this file except in compliance with the license.
// you may obtain a copy of the license at
//
// http://www.apache.org/licenses/license-2.0
//
// unless required by applicable law or agreed to in writing, software
// distributed under the license is distributed on an "as is" basis,
// without warranties or conditions of any kind, either express or implied.
// see the license for the specific language governing permissions and
// limitations under the license.
package dao
import (
"github.com/vmware/harbor/src/common/models"
"fmt"
"time"
)
//SetClairVulnTimestamp update the last_update of a namespace. If there's no record for this namespace, one will be created.
func SetClairVulnTimestamp(namespace string, timestamp time.Time) error {
o := GetOrmer()
rec := &models.ClairVulnTimestamp{
Namespace: namespace,
LastUpdate: timestamp,
}
created, _, err := o.ReadOrCreate(rec, "Namespace")
if err != nil {
return err
}
if !created {
rec.LastUpdate = timestamp
n, err := o.Update(rec)
if err != nil {
return err
}
if n == 0 {
return fmt.Errorf("No record is updated, record: %v", *rec)
}
}
return nil
}
//ListClairVulnTimestamps return a list of all records in vuln timestamp table.
func ListClairVulnTimestamps() ([]*models.ClairVulnTimestamp, error) {
var res []*models.ClairVulnTimestamp
o := GetOrmer()
_, err := o.QueryTable(models.ClairVulnTimestampTable).All(&res)
return res, err
}

View File

@ -1733,3 +1733,33 @@ func TestImgScanOverview(t *testing.T) {
assert.Equal(int(models.SevMedium), res.Sev)
assert.Equal(2, res.CompOverview.Summary[0].Count)
}
func TestVulnTimestamp(t *testing.T) {
assert := assert.New(t)
err := ClearTable(models.ClairVulnTimestampTable)
assert.Nil(err)
ns := "ubuntu:14"
res, err := ListClairVulnTimestamps()
assert.Nil(err)
assert.Equal(0, len(res))
err = SetClairVulnTimestamp(ns, time.Now())
assert.Nil(err)
res, err = ListClairVulnTimestamps()
assert.Nil(err)
assert.Equal(1, len(res))
assert.Equal(ns, res[0].Namespace)
old := time.Now()
t.Logf("Sleep 3 seconds")
time.Sleep(3 * time.Second)
err = SetClairVulnTimestamp(ns, time.Now())
assert.Nil(err)
res, err = ListClairVulnTimestamps()
assert.Nil(err)
assert.Equal(1, len(res))
d := res[0].LastUpdate.Sub(old)
if d < 2*time.Second {
t.Errorf("Delta should be larger than 2 seconds! old: %v, lastupdate: %v", old, res[0].LastUpdate)
}
}

View File

@ -28,5 +28,6 @@ func init() {
new(AccessLog),
new(ScanJob),
new(RepoRecord),
new(ImgScanOverview))
new(ImgScanOverview),
new(ClairVulnTimestamp))
}

View File

@ -14,6 +14,25 @@
package models
import (
"time"
)
// ClairVulnTimestampTable is the name of the table that tracks the timestamp of vulnerability in Clair.
const ClairVulnTimestampTable = "clair_vuln_timestamp"
// ClairVulnTimestamp represents a record in DB that tracks the timestamp of vulnerability in Clair.
type ClairVulnTimestamp struct {
ID int64 `orm:"pk;auto;column(id)" json:"-"`
Namespace string `orm:"column(namespace)" json:"namespace"`
LastUpdate time.Time `orm:"column(last_update)" json:"last_update"`
}
//TableName is required by beego to map struct to table.
func (ct *ClairVulnTimestamp) TableName() string {
return ClairVulnTimestampTable
}
//ClairLayer ...
type ClairLayer struct {
Name string `json:"Name,omitempty"`
@ -57,3 +76,34 @@ type ClairLayerEnvelope struct {
Layer *ClairLayer `json:"Layer,omitempty"`
Error *ClairError `json:"Error,omitempty"`
}
//ClairNotification ...
type ClairNotification struct {
Name string `json:"Name,omitempty"`
Created string `json:"Created,omitempty"`
Notified string `json:"Notified,omitempty"`
Deleted string `json:"Deleted,omitempty"`
Limit int `json:"Limit,omitempty"`
Page string `json:"Page,omitempty"`
NextPage string `json:"NextPage,omitempty"`
Old *ClairVulnerabilityWithLayers `json:"Old,omitempty"`
New *ClairVulnerabilityWithLayers `json:"New,omitempty"`
}
//ClairNotificationEnvelope ...
type ClairNotificationEnvelope struct {
Notification *ClairNotification `json:"Notification,omitempty"`
Error *ClairError `json:"Error,omitempty"`
}
//ClairVulnerabilityWithLayers ...
type ClairVulnerabilityWithLayers struct {
Vulnerability *ClairVulnerability `json:"Vulnerability,omitempty"`
OrderedLayersIntroducingVulnerability []ClairOrderedLayerName `json:"OrderedLayersIntroducingVulnerability,omitempty"`
}
//ClairOrderedLayerName ...
type ClairOrderedLayerName struct {
Index int `json:"Index"`
LayerName string `json:"LayerName"`
}

View File

@ -104,3 +104,55 @@ func (c *Client) GetResult(layerName string) (*models.ClairLayerEnvelope, error)
}
return &res, nil
}
// GetNotification calls Clair's API to get details of notification
func (c *Client) GetNotification(id string) (*models.ClairNotification, error) {
req, err := http.NewRequest("GET", c.endpoint+"/v1/notifications/"+id+"?limit=2", nil)
if err != nil {
return nil, err
}
resp, err := c.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("Unexpected status code: %d, text: %s", resp.StatusCode, string(b))
}
var ne models.ClairNotificationEnvelope
err = json.Unmarshal(b, &ne)
if err != nil {
return nil, err
}
if ne.Error != nil {
return nil, fmt.Errorf("Clair error: %s", ne.Error.Message)
}
log.Debugf("Retrived notification %s from Clair.", id)
return ne.Notification, nil
}
// DeleteNotification deletes a notification record from Clair
func (c *Client) DeleteNotification(id string) error {
req, err := http.NewRequest("DELETE", c.endpoint+"/v1/notifications/"+id, nil)
if err != nil {
return err
}
resp, err := c.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
b, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("Unexpected status code: %d, text: %s", resp.StatusCode, string(b))
}
log.Debugf("Deleted notification %s from Clair.", id)
return nil
}

View File

@ -17,7 +17,8 @@ package main
import (
"github.com/vmware/harbor/src/ui/api"
"github.com/vmware/harbor/src/ui/controllers"
"github.com/vmware/harbor/src/ui/service"
"github.com/vmware/harbor/src/ui/service/notifications/clair"
"github.com/vmware/harbor/src/ui/service/notifications/registry"
"github.com/vmware/harbor/src/ui/service/token"
"github.com/astaxie/beego"
@ -109,7 +110,8 @@ func initRouters() {
beego.Router("/api/email/ping", &api.EmailAPI{}, "post:Ping")
//external service that hosted on harbor process:
beego.Router("/service/notifications", &service.NotificationHandler{})
beego.Router("/service/notifications", &registry.NotificationHandler{})
beego.Router("/service/notifications/clair", &clair.Handler{}, "post:Handle")
beego.Router("/service/token", &token.Handler{})
beego.Router("/registryproxy/*", &controllers.RegistryProxy{}, "*:Handle")

View File

@ -0,0 +1,109 @@
// Copyright (c) 2017 VMware, Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package clair
import (
"encoding/json"
"sync"
"time"
"github.com/vmware/harbor/src/common/dao"
"github.com/vmware/harbor/src/common/models"
"github.com/vmware/harbor/src/common/utils/clair"
"github.com/vmware/harbor/src/common/utils/log"
"github.com/vmware/harbor/src/ui/api"
"github.com/vmware/harbor/src/ui/config"
)
const (
rescanInterval = 15 * time.Minute
)
type timer struct {
sync.Mutex
next time.Time
}
// returns true to indicate it should reshedule the "rescan" action.
func (t *timer) needReschedule() bool {
t.Lock()
defer t.Unlock()
if time.Now().Before(t.next) {
return false
}
t.next = time.Now().Add(rescanInterval)
return true
}
var (
rescanTimer = timer{}
clairClient = clair.NewClient(config.ClairEndpoint(), nil)
)
// Handler handles reqeust on /service/notifications/clair/, which listens to clair's notifications.
// When there's unexpected error it will silently fail without removing the notification such that it will be triggered again.
type Handler struct {
api.BaseController
}
// Handle ...
func (h *Handler) Handle() {
var ne models.ClairNotificationEnvelope
if err := json.Unmarshal(h.Ctx.Input.CopyBody(1<<32), &ne); err != nil {
log.Errorf("Failed to decode the request: %v", err)
return
}
log.Debugf("Received notification from Clair, name: %s", ne.Notification.Name)
notification, err := clairClient.GetNotification(ne.Notification.Name)
if err != nil {
log.Errorf("Failed to get notification details from Clair, name: %s, err: %v", ne.Notification.Name, err)
return
}
ns := make(map[string]bool)
if old := notification.Old; old != nil {
if vuln := old.Vulnerability; vuln != nil {
log.Debugf("old vulnerability namespace: %s", vuln.NamespaceName)
ns[vuln.NamespaceName] = true
}
}
if new := notification.New; new != nil {
if vuln := new.Vulnerability; vuln != nil {
log.Debugf("new vulnerability namespace: %s", vuln.NamespaceName)
ns[vuln.NamespaceName] = true
}
}
for k, v := range ns {
if v {
if err := dao.SetClairVulnTimestamp(k, time.Now()); err == nil {
log.Debugf("Updated the timestamp for namespaces: %s", k)
} else {
log.Warningf("Failed to update the timestamp for namespaces: %s, error: %v", k, err)
}
}
}
if rescanTimer.needReschedule() {
go func() {
<-time.After(rescanInterval)
log.Debugf("TODO: rescan or resfresh scan_overview!")
}()
} else {
log.Debugf("There is a rescan scheduled already, skip.")
}
if err := clairClient.DeleteNotification(ne.Notification.Name); err != nil {
log.Warningf("Failed to remove notification from Clair, name: %s", ne.Notification.Name)
} else {
log.Debugf("Removed notification from Clair, name: %s", ne.Notification.Name)
}
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package service
package registry
import (
"encoding/json"