fix(js): webhook infinitely resending issue

- update the resending logic in the hook agent
- use backoff lib to generate backoff duration
- remove retry cache queue
- leverage reaper to restore the sending if direct retries are all failed
- remove useless funcs/codes

fix #14545

Signed-off-by: Steven Zou <szou@vmware.com>
This commit is contained in:
Steven Zou 2021-04-06 13:56:19 +08:00
parent b445683730
commit bb7f706d78
23 changed files with 833 additions and 375 deletions

View File

@ -19,6 +19,7 @@ require (
github.com/bugsnag/panicwrap v1.2.0 // indirect
github.com/casbin/casbin v1.7.0
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cenkalti/backoff/v4 v4.1.0
github.com/cloudflare/cfssl v0.0.0-20190510060611-9c027c93ba9e // indirect
github.com/coreos/go-oidc v2.1.0+incompatible
github.com/denverdino/aliyungo v0.0.0-20191227032621-df38c6fa730c // indirect

View File

@ -121,7 +121,6 @@ github.com/beego/i18n v0.0.0-20140604031826-e87155e8f0c0/go.mod h1:KLeFCpAMq2+50
github.com/beego/x2j v0.0.0-20131220205130-a0352aadc542/go.mod h1:kSeGC/p1AbBiEp5kat81+DSQrZenVBZXklMLaELspWU=
github.com/beorn7/perks v0.0.0-20160804104726-4c0e84591b9a/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
@ -151,6 +150,8 @@ github.com/casbin/casbin v1.7.0 h1:PuzlE8w0JBg/DhIqnkF1Dewf3z+qmUZMVN07PonvVUQ=
github.com/casbin/casbin v1.7.0/go.mod h1:c67qKN6Oum3UF5Q1+BByfFxkwKvhwW57ITjqwtzR1KE=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
@ -361,7 +362,6 @@ github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LB
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gobuffalo/here v0.6.0/go.mod h1:wAG085dHOYqUpf+Ap+WOdrPTp5IYcDAs/x7PLa8Y5fM=
github.com/gobwas/glob v0.2.3 h1:A4xDbljILXROh+kObIiy5kIaPYD8e96x1tgBhUI5J+Y=
github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJAkT8=
github.com/gocql/gocql v0.0.0-20190301043612-f6df8288f9b4/go.mod h1:4Fw1eo5iaEhDUs8XyuhSVCVy52Jq3L+/3GJgYkwc+/0=
github.com/gocraft/work v0.5.1 h1:3bRjMiOo6N4zcRgZWV3Y7uX7R22SF+A9bPTk4xRXr34=
@ -395,7 +395,6 @@ github.com/golang/protobuf v0.0.0-20161109072736-4bd1920723d7/go.mod h1:6lQm79b+
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.3 h1:gyjaxf+svBWX08ZjK86iN9geUJF0H6gp2IRKX6Nf6/I=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
@ -438,7 +437,6 @@ github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d h1:7XGaL1e6bYS1yIonGp9761ExpPPV1ui0SAC59Yube9k=
github.com/googleapis/gnostic v0.0.0-20170729233727-0c5108395e2d/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY=
@ -541,7 +539,6 @@ github.com/jinzhu/now v1.0.0 h1:6WV8LvwPpDhKjo5U9O6b4+xdG/jTXNPwlDme/MTo8Ns=
github.com/jinzhu/now v1.0.0/go.mod h1:oHTiXerJ20+SfYcrdlBO7rzZRJWGwSTQ0iUY2jI6Gfc=
github.com/jmespath/go-jmespath v0.0.0-20160202185014-0b12d6b521d8/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.0.0-20160803190731-bd40a432e4c7/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.3.0 h1:OS12ieG61fsCg5+qLJ+SsW9NicxNkg3b25OyT2yCeUc=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
@ -551,7 +548,6 @@ github.com/json-iterator/go v0.0.0-20180612202835-f2b4162afba3/go.mod h1:+SdeFBv
github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.8 h1:QiWkFLKq0T7mpzwOTu6BzNDbfTE8OLrYhVKYMLF46Ok=
github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
@ -584,7 +580,6 @@ github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=
github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1 h1:ZC2Vc7/ZFkGmsVC9KvOjumD+G5lXy2RtTKyzRKO2BQ4=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
@ -667,7 +662,6 @@ github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opencontainers/go-digest v0.0.0-20170106003457-a6d0ee40d420/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
github.com/opencontainers/go-digest v0.0.0-20180430190053-c9281466c8b2/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
github.com/opencontainers/go-digest v1.0.0-rc1 h1:WzifXhOVOEOuFYOJAW6aQqW0TooG2iki3E3Ii+WN7gQ=
github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
@ -705,7 +699,6 @@ github.com/prometheus/client_golang v0.0.0-20180209125602-c332b6f63c06/go.mod h1
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829/go.mod h1:p2iRAGwDERtqlqzRXnrOVns+ignqQo//hLXqYxZYVNs=
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_golang v1.0.0 h1:vrDKnkGzuGvhNAL56c7DBz29ZL+KxnoR0x7enabFceM=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
github.com/prometheus/client_golang v1.7.1 h1:NTGy1Ja9pByO+xAeH/qiWnLrKtr3hJPNjaVUwnjpdpA=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
@ -713,7 +706,6 @@ github.com/prometheus/client_model v0.0.0-20171117100541-99fa1f4be8e5/go.mod h1:
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4 h1:gQz4mCbXsO+nc9n1hCxHcGA3Zx3Eo+UHZoInFGUIXNM=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/prometheus/client_model v0.2.0 h1:uq5h0d+GuxiXLJLNABMgp2qUWDPiLvgCzz2dUR+/W/M=
github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@ -721,7 +713,6 @@ github.com/prometheus/common v0.0.0-20180110214958-89604d197083/go.mod h1:daVV7q
github.com/prometheus/common v0.0.0-20181113130724-41aa239b4cce/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/common v0.2.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.4.0/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.4.1 h1:K0MGApIoQvMw27RTdJkPbr3JZ7DNbtxQNyi5STVM6Kw=
github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4=
github.com/prometheus/common v0.10.0 h1:RyRA7RzGXQZiW+tGMr7sxa85G1z0yOpM1qq5c8lNawc=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
@ -730,7 +721,6 @@ github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.5 h1:3+auTFlqw+ZaQYJARz6ArODtkaIwtvBTx3N2NehQlL8=
github.com/prometheus/procfs v0.0.5/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDaekg4FpcdQ=
github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
@ -765,7 +755,6 @@ github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a h1:pa8hGb/2YqsZKovtsgrwcDH1RZhVbTKCjLp47XpqCDs=
github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
@ -799,9 +788,7 @@ github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoH
github.com/stretchr/testify v0.0.0-20151208002404-e3a8ff8ce365/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
@ -857,7 +844,6 @@ go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3 h1:8sGtKOrtQqkN1bp2AtX+misvLIlOmsEsNd+9NIcPEm8=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
@ -882,7 +868,6 @@ golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200128174031-69ecbb4d6d5d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073 h1:xMPOj6Pz6UipU1wXLkrtqpHbR0AVFnyPEQq/wRWz9lM=
golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI=
@ -944,7 +929,6 @@ golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20191004110552-13f9640d40b9/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200625001655-4c5254603344 h1:vGXIOMxbNfDTk/aXCmfdLgkrSV+Z2tcbze+pEc3v5W4=
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
@ -997,7 +981,6 @@ golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200124204421-9fbb57f87de9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4 h1:sfkvUWPNGwSV+8/fNqctR5lS2AqCSqYwXdrjCxp/dXo=
golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
@ -1114,7 +1097,6 @@ gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20141024133853-64131543e789/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
@ -1132,7 +1114,6 @@ gopkg.in/gorethink/gorethink.v3 v3.0.5/go.mod h1:+3yIIHJUGMBK+wyPH+iN5TP+88ikFDf
gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/ini.v1 v1.42.0 h1:7N3gPTt50s8GuLortA00n8AqRTk75qOP98+mTPpgzRk=
gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/ini.v1 v1.51.0 h1:AQvPpx3LzTDM0AjnIRlVFwFFGC+npRopjZxLJj6gdno=
gopkg.in/ini.v1 v1.51.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
@ -1149,7 +1130,6 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View File

@ -57,11 +57,6 @@ func KeyPeriodicPolicy(namespace string) string {
return fmt.Sprintf("%s:%s", KeyPeriod(namespace), "policies")
}
// KeyPeriodicNotification returns the key of periodic pub/sub channel.
func KeyPeriodicNotification(namespace string) string {
return fmt.Sprintf("%s:%s", KeyPeriodicPolicy(namespace), "notifications")
}
// KeyPeriodicLock returns the key of locker under period
func KeyPeriodicLock(namespace string) string {
return fmt.Sprintf("%s:%s", KeyPeriod(namespace), "lock")
@ -77,11 +72,6 @@ func KeyUpstreamJobAndExecutions(namespace, upstreamJobID string) string {
return fmt.Sprintf("%s%s:%s", KeyNamespacePrefix(namespace), "executions", upstreamJobID)
}
// KeyHookEventRetryQueue returns the key of hook event retrying queue
func KeyHookEventRetryQueue(namespace string) string {
return fmt.Sprintf("%s%s", KeyNamespacePrefix(namespace), "hook_events")
}
// KeyStatusUpdateRetryQueue returns the key of status change retrying queue
func KeyStatusUpdateRetryQueue(namespace string) string {
return fmt.Sprintf("%s%s", KeyNamespacePrefix(namespace), "status_change_events")

View File

@ -107,53 +107,6 @@ return st
// SetStatusScript is lua script for setting job status atomically
var SetStatusScript = redis.NewScript(2, setStatusScriptText)
// Used to check if the status info provided is still validate
//
// KEY[1]: key of job stats
// ARGV[1]: job status
// ARGV[2]: revision of job stats
// ARGV[3]: check in timestamp
var isStatusMatchScriptText = fmt.Sprintf(`
%s
%s
local res, st, rev, checkInAt, ack
res = redis.call('hmget', KEYS[1], 'status', 'revision', 'check_in_at', 'ack')
if res then
st = res[1]
rev = tonumber(res[2]) or 0
checkInAt = tonumber(res[3]) or 0
ack = res[4]
local reply = compare(st, rev)
if reply == 'ok' then
if not ack then
return 'ok'
end
-- ack exists, compare with ack
local a = cjson.decode(ack)
st = a['status']
rev = a['revision']
checkInAt = a['check_in_at']
local reply2 = compare(st, rev)
if reply2 == 'ok' then
return 'ok'
end
end
end
return 'no'
`, luaFuncStCodeText, luaFuncCompareText)
// CheckStatusMatchScript is lua script for checking if the provided status is still matching
// the backend status.
var CheckStatusMatchScript = redis.NewScript(1, isStatusMatchScriptText)
// Used to set the hook ACK
//
// KEY[1]: key of job stats

View File

@ -123,35 +123,3 @@ func ReleaseLock(conn redis.Conn, lockerKey string, lockerID string) error {
return errors.New("locker ID mismatch")
}
// ZPopMin pops the element with lowest score in the zset
func ZPopMin(conn redis.Conn, key string) (interface{}, error) {
err := conn.Send("MULTI")
err = conn.Send("ZRANGE", key, 0, 0) // lowest one
err = conn.Send("ZREMRANGEBYRANK", key, 0, 0)
if err != nil {
return nil, err
}
replies, err := redis.Values(conn.Do("EXEC"))
if err != nil {
return nil, err
}
if len(replies) < 2 {
return nil, errors.Errorf("zpopmin error: not enough results returned, expected %d but got %d", 2, len(replies))
}
zrangeReply := replies[0]
if zrangeReply != nil {
if elements, ok := zrangeReply.([]interface{}); ok {
if len(elements) == 0 {
return nil, ErrNoElements
}
return elements[0], nil
}
}
return nil, errors.New("zpopmin error: bad result reply")
}

View File

@ -15,21 +15,15 @@
package rds
import (
"encoding/json"
"testing"
"github.com/goharbor/harbor/src/jobservice/tests"
"github.com/gomodule/redigo/redis"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"testing"
"time"
)
// For testing
type simpleStatusChange struct {
JobID string
}
// RdsUtilsTestSuite tests functions located in rds package
type RdsUtilsTestSuite struct {
suite.Suite
@ -66,34 +60,6 @@ func (suite *RdsUtilsTestSuite) TearDownSuite() {
assert.NoError(suite.T(), err, "clear all: nil error expected but got %s", err)
}
// TestZPopMin ...
func (suite *RdsUtilsTestSuite) TestZPopMin() {
s1 := &simpleStatusChange{"a"}
s2 := &simpleStatusChange{"b"}
raw1, _ := json.Marshal(s1)
raw2, _ := json.Marshal(s2)
key := KeyStatusUpdateRetryQueue(suite.namespace)
_, err := suite.conn.Do("ZADD", key, time.Now().Unix(), raw1)
_, err = suite.conn.Do("ZADD", key, time.Now().Unix()+5, raw2)
require.Nil(suite.T(), err, "zadd objects error should be nil")
v, err := ZPopMin(suite.conn, key)
require.Nil(suite.T(), err, "nil error should be returned by calling ZPopMin")
change1 := &simpleStatusChange{}
_ = json.Unmarshal(v.([]byte), change1)
assert.Equal(suite.T(), "a", change1.JobID, "job ID not equal")
v, err = ZPopMin(suite.conn, key)
require.Nil(suite.T(), err, "nil error should be returned by calling ZPopMin")
change2 := &simpleStatusChange{}
_ = json.Unmarshal(v.([]byte), change2)
assert.Equal(suite.T(), "b", change2.JobID, "job ID not equal")
}
// TestHmGetAndSet ...
func (suite *RdsUtilsTestSuite) TestHmGetAndSet() {
key := KeyJobStats(suite.namespace, "fake_job_id")

View File

@ -18,9 +18,9 @@ import (
"context"
"encoding/json"
"net/url"
"sync"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/env"
"github.com/goharbor/harbor/src/jobservice/job"
@ -30,22 +30,16 @@ import (
)
const (
// The max time for expiring the retrying events
// 1 day
maxEventExpireTime = 3600 * 24
// Waiting a short while if any errors occurred
shortLoopInterval = 5 * time.Second
// Waiting for long while if no retrying elements found
longLoopInterval = 5 * time.Minute
// Backoff duration of direct retrying.
errRetryBackoff = 5 * time.Minute
// Max concurrency of retrying goroutines.
maxConcurrency = 512
)
// Agent is designed to handle the hook events with reasonable numbers of concurrent threads
// Agent is designed to handle the hook events with reasonable numbers of concurrent threads.
type Agent interface {
// Trigger hooks
Trigger(evt *Event) error
// Serves retry loop now
Serve() error
}
// Event contains the hook URL and the data
@ -70,23 +64,13 @@ func (e *Event) Validate() error {
return nil
}
// Serialize event to bytes
func (e *Event) Serialize() ([]byte, error) {
return json.Marshal(e)
}
// Deserialize the bytes to event
func (e *Event) Deserialize(bytes []byte) error {
return json.Unmarshal(bytes, e)
}
// Basic agent for usage
type basicAgent struct {
context context.Context
namespace string
client Client
redisPool *redis.Pool
wg *sync.WaitGroup
tokens chan struct{}
}
// NewAgent is constructor of basic agent
@ -96,31 +80,28 @@ func NewAgent(ctx *env.Context, ns string, redisPool *redis.Pool) Agent {
namespace: ns,
client: NewClient(ctx.SystemContext),
redisPool: redisPool,
wg: ctx.WG,
tokens: make(chan struct{}, maxConcurrency),
}
}
// Trigger implements the same method of interface @Agent
func (ba *basicAgent) Trigger(evt *Event) error {
if evt == nil {
return errors.New("nil web hook event")
return errors.New("nil hook event")
}
if err := evt.Validate(); err != nil {
return errors.Wrap(err, "trigger error")
}
// Treat hook event is success if it is successfully sent or cached in the retry queue.
// Send hook event with retry supported.
// Exponential backoff is used and the max elapsed time is 5m.
// If it is still failed to send hook event after all tries, the reaper may help to fix the inconsistent status.
if err := ba.client.SendEvent(evt); err != nil {
// Push event to the retry queue
if er := ba.pushForRetry(evt); er != nil {
// Failed to push to the hook event retry queue, return error with all context
return errors.Wrap(er, err.Error())
}
// Start retry at background.
go ba.retry(evt)
logger.Warningf("Send hook event '%s' to '%s' failed with error: %s; push hook event to the queue for retrying later", evt.Message, evt.URL, err)
// Treat as successful hook event as the event has been put into the retry queue for future resending.
return nil
return errors.Wrap(err, "trigger hook event error")
}
// Mark event hook ACK including "revision", "status" and "check_in_at" in the job stats to indicate
@ -130,146 +111,53 @@ func (ba *basicAgent) Trigger(evt *Event) error {
// can be ignored.
if err := ba.ack(evt); err != nil {
// Just log error
logger.Error(errors.Wrap(err, "trigger"))
logger.Error(errors.Wrap(err, "hook event ack error"))
}
// For debugging
logger.Debugf("Hook event is successfully sent: %s->%s", evt.Message, evt.URL)
return nil
}
// Start the basic agent
// Termination depends on the system context
// Blocking call
func (ba *basicAgent) Serve() error {
ba.wg.Add(1)
go ba.loopRetry()
logger.Info("Hook event retrying loop is started")
return nil
}
func (ba *basicAgent) pushForRetry(evt *Event) error {
if evt == nil {
// do nothing
return nil
}
// Anyway we'll need the raw JSON, let's try to serialize it here
rawJSON, err := evt.Serialize()
if err != nil {
return err
}
now := time.Now().Unix()
if evt.Timestamp > 0 && now-evt.Timestamp >= maxEventExpireTime {
// Expired, do not need to push back to the retry queue
logger.Warningf("Event is expired: %s", rawJSON)
return nil
}
conn := ba.redisPool.Get()
// retry event with exponential backoff.
// Limit the max concurrency (defined by maxConcurrency) of retrying goroutines.
func (ba *basicAgent) retry(evt *Event) {
// Apply for a running token.
// If no token is available, then hold until token is released.
ba.tokens <- struct{}{}
// Release token
defer func() {
_ = conn.Close()
<-ba.tokens
}()
key := rds.KeyHookEventRetryQueue(ba.namespace)
args := make([]interface{}, 0)
// Resend hook event
bf := newBackoff(errRetryBackoff)
bf.Reset()
// Use nano time to get more accurate timestamp
score := time.Now().UnixNano()
args = append(args, key, "NX", score, rawJSON)
err := backoff.RetryNotify(func() error {
logger.Debugf("Retry: sending hook event: %s->%s", evt.Message, evt.URL)
_, err = conn.Do("ZADD", args...)
if err != nil {
return err
}
return nil
}
func (ba *basicAgent) loopRetry() {
defer func() {
logger.Info("Hook event retrying loop exit")
ba.wg.Done()
}()
for {
if err := ba.reSend(); err != nil {
waitInterval := shortLoopInterval
if err == rds.ErrNoElements {
// No elements
waitInterval = longLoopInterval
} else {
logger.Errorf("Resend hook event error: %s", err.Error())
}
select {
case <-time.After(waitInterval):
// Just wait, do nothing
case <-ba.context.Done():
// Terminated
return
// Try to avoid sending outdated events, just a try-best operation.
ot, err := ba.isOutdated(evt)
if err != nil {
// Log error and continue.
logger.Error(err)
} else {
if ot {
logger.Infof("Hook event is abandoned as it's outdated: %s->%s", evt.Message, evt.URL)
return nil
}
}
}
}
func (ba *basicAgent) reSend() error {
conn := ba.redisPool.Get()
defer func() {
if err := conn.Close(); err != nil {
logger.Error(errors.Wrap(err, "resend"))
}
}()
return ba.client.SendEvent(evt)
}, bf, func(e error, d time.Duration) {
logger.Errorf("Retry: sending hook event error: %s, evt=%s->%s, duration=%v", e.Error(), evt.Message, evt.URL, d)
})
// Pick up one queued event for resending
evt, err := ba.popMinOne(conn)
if err != nil {
return err
logger.Errorf("Retry: still failed after all retries: %s, evt=%s->%s", err.Error(), evt.Message, evt.URL)
}
// Args for executing script
args := []interface{}{
rds.KeyJobStats(ba.namespace, evt.Data.JobID),
evt.Data.Status,
evt.Data.Metadata.Revision,
evt.Data.Metadata.CheckInAt,
}
// If failed to check the status matching, just ignore it, continue the resending
reply, err := redis.String(rds.CheckStatusMatchScript.Do(conn, args...))
if err != nil {
// Log error
logger.Error(errors.Wrap(err, "resend"))
} else {
if reply != "ok" {
return errors.Errorf("outdated hook event: %s", evt.Message)
}
}
return ba.Trigger(evt)
}
// popMinOne picks up one event for retrying
func (ba *basicAgent) popMinOne(conn redis.Conn) (*Event, error) {
key := rds.KeyHookEventRetryQueue(ba.namespace)
minOne, err := rds.ZPopMin(conn, key)
if err != nil {
return nil, err
}
rawEvent, ok := minOne.([]byte)
if !ok {
return nil, errors.New("bad request: non bytes slice for raw event")
}
evt := &Event{}
if err := evt.Deserialize(rawEvent); err != nil {
return nil, err
}
return evt, nil
}
// ack hook event
@ -302,3 +190,78 @@ func (ba *basicAgent) ack(evt *Event) error {
return nil
}
// Check if the event has been outdated.
func (ba *basicAgent) isOutdated(evt *Event) (bool, error) {
if evt == nil || evt.Data == nil {
return false, nil
}
conn := ba.redisPool.Get()
defer func() {
_ = conn.Close()
}()
key := rds.KeyJobStats(ba.namespace, evt.Data.JobID)
values, err := rds.HmGet(conn, key, "ack")
if err != nil {
return false, errors.Wrap(err, "check outdated event error")
}
// Parse ack
if ab, ok := values[0].([]byte); ok && len(ab) > 0 {
ack := &job.ACK{}
if err := json.Unmarshal(ab, ack); err != nil {
return false, errors.Wrap(err, "parse ack error")
}
// Revision
diff := ack.Revision - evt.Data.Metadata.Revision
switch {
// Revision of the hook event has left behind the current acked revision.
case diff > 0:
return true, nil
case diff < 0:
return false, nil
case diff == 0:
// Continue to compare the status.
}
// Status
st := job.Status(ack.Status)
if err := st.Validate(); err != nil {
return false, errors.Wrap(err, "validate acked job status error")
}
est := job.Status(evt.Data.Status)
if err := est.Validate(); err != nil {
return false, errors.Wrap(err, "validate job status error")
}
switch {
case st.Before(est):
return false, nil
case st.After(est):
return true, nil
case st.Equal(est):
// Continue to compare check in at timestamp
}
// Check in timestamp
if ack.CheckInAt >= evt.Data.Metadata.CheckInAt {
return true, nil
}
}
return false, nil
}
func newBackoff(maxElapsedTime time.Duration) backoff.BackOff {
bf := backoff.NewExponentialBackOff()
bf.InitialInterval = 2 * time.Second
bf.RandomizationFactor = 0.5
bf.Multiplier = 2
bf.MaxElapsedTime = maxElapsedTime
return bf
}

View File

@ -20,14 +20,10 @@ import (
"time"
"github.com/goharbor/harbor/src/jobservice/common/list"
"github.com/goharbor/harbor/src/jobservice/common/utils"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/goharbor/harbor/src/jobservice/common/rds"
"github.com/goharbor/harbor/src/jobservice/job"
"github.com/goharbor/harbor/src/jobservice/tests"
"github.com/goharbor/harbor/src/lib/errors"
"github.com/gomodule/redigo/redis"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@ -61,6 +57,8 @@ func (suite *HookAgentTestSuite) SetupSuite() {
namespace: suite.namespace,
redisPool: suite.pool,
}
suite.prepareData()
}
// TearDownSuite prepares test suites
@ -73,7 +71,7 @@ func (suite *HookAgentTestSuite) TearDownSuite() {
_ = tests.ClearAll(suite.namespace, conn)
}
func (suite *HookAgentTestSuite) SetupTest() {
func (suite *HookAgentTestSuite) prepareData() {
suite.jid = utils.MakeIdentifier()
rev := time.Now().Unix()
stats := &job.Stats{
@ -107,18 +105,6 @@ func (suite *HookAgentTestSuite) SetupTest() {
}
}
func (suite *HookAgentTestSuite) TearDownTest() {
conn := suite.pool.Get()
defer func() {
err := conn.Close()
suite.NoError(err, "close redis connection")
}()
k := rds.KeyHookEventRetryQueue(suite.namespace)
_, err := conn.Do("DEL", k)
suite.NoError(err, "tear down test cases")
}
// TestEventSending ...
func (suite *HookAgentTestSuite) TestEventSending() {
mc := &mockClient{}
@ -140,27 +126,7 @@ func (suite *HookAgentTestSuite) TestEventSendingError() {
err := suite.agent.Trigger(suite.event)
// Failed to send by client, it should be put into retry queue, check it
// The return should still be nil
suite.NoError(err, "agent trigger: nil error expected but got %s", err)
suite.checkRetryQueue(1)
}
// TestRetryAndPopMin ...
func (suite *HookAgentTestSuite) TestRetryAndPopMin() {
mc := &mockClient{}
mc.On("SendEvent", suite.event).Return(nil)
suite.agent.client = mc
err := suite.agent.pushForRetry(suite.event)
suite.NoError(err, "push event for retry")
err = suite.agent.reSend()
require.NoError(suite.T(), err, "resend error: %v", err)
// Check
suite.checkRetryQueue(0)
suite.checkStatus()
suite.Error(err)
}
func (suite *HookAgentTestSuite) checkStatus() {
@ -171,19 +137,6 @@ func (suite *HookAgentTestSuite) checkStatus() {
suite.Equal(job.SuccessStatus.String(), t.Job().Info.HookAck.Status, "ack status")
}
func (suite *HookAgentTestSuite) checkRetryQueue(size int) {
conn := suite.pool.Get()
defer func() {
err := conn.Close()
suite.NoError(err, "close redis connection")
}()
k := rds.KeyHookEventRetryQueue(suite.namespace)
c, err := redis.Int(conn.Do("ZCARD", k))
suite.NoError(err, "check retry queue")
suite.Equal(size, c, "retry queue count")
}
type mockClient struct {
mock.Mock
}

View File

@ -126,7 +126,12 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
// Create hook agent, it's a singleton object
hookAgent := hook.NewAgent(rootContext, namespace, redisPool)
hookCallback := func(URL string, change *job.StatusChange) error {
msg := fmt.Sprintf("status change: job=%s, status=%s", change.JobID, change.Status)
msg := fmt.Sprintf(
"status change: job=%s, status=%s, revision=%d",
change.JobID,
change.Status,
change.Metadata.Revision,
)
if !utils.IsEmptyStr(change.CheckIn) {
// Ignore the real check in message to avoid too big message stream
cData := change.CheckIn
@ -138,12 +143,17 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
evt := &hook.Event{
URL: URL,
Timestamp: time.Now().Unix(),
Timestamp: change.Metadata.UpdateTime, // use update timestamp to avoid duplicated resending.
Data: change,
Message: msg,
}
return hookAgent.Trigger(evt)
// Hook event sending should not influence the main job flow (because job may call checkin() in the job run).
if err := hookAgent.Trigger(evt); err != nil {
logger.Error(err)
}
return nil
}
// Create job life cycle management controller
@ -166,12 +176,6 @@ func (bs *Bootstrap) LoadAndRun(ctx context.Context, cancel context.CancelFunc)
if err = lcmCtl.Serve(); err != nil {
return errors.Errorf("start life cycle controller error: %s", err)
}
// Start agent
// Non blocking call
if err = hookAgent.Serve(); err != nil {
return errors.Errorf("start hook agent error: %s", err)
}
} else {
return errors.Errorf("worker backend '%s' is not supported", cfg.PoolConfig.Backend)
}

View File

@ -85,8 +85,8 @@ func (r *reaper) start() {
// This process only needs to be executed once when worker pool is starting.
func (r *reaper) reEnqueueInProgressJobs() error {
// Debug
logger.Debugf("Start: Reap in-progress jobs from the dead pools")
defer logger.Debugf("End: Reap in-progress jobs")
logger.Info("Start: Reap in-progress jobs from the dead pools")
defer logger.Info("End: Reap in-progress jobs")
currentPools, err := r.getCurrentWorkerPools()
if err != nil {
@ -130,8 +130,8 @@ func (r *reaper) reEnqueueInProgressJobs() error {
// the related status change hook events are successfully fired.
func (r *reaper) syncOutdatedStats() error {
// Debug
logger.Debugf("Start: reap outdated job stats")
defer logger.Debugf("End: reap outdated job stats")
logger.Info("Start: reap outdated job stats")
defer logger.Info("End: reap outdated job stats")
// Loop all the in progress jobs to check if they're really in progress or
// status is hung.
@ -139,13 +139,15 @@ func (r *reaper) syncOutdatedStats() error {
defer func() {
if errs.IsObjectNotFoundError(err) {
// As the job stats is lost and we don't have chance to restore it, then directly discard it.
logger.Errorf("Sync outdated stats error: %s", err.Error())
// Un-track the in-progress record
err = r.unTrackInProgress(k)
if e := r.unTrackInProgress(k); e != nil {
// Wrap error
err = errors.Wrap(e, err.Error())
}
}
if err != nil {
err = errors.Wrap(err, "sync outdated stats handler")
err = errors.Wrap(err, "sync outdated stats handler error")
}
}()
@ -174,8 +176,11 @@ func (r *reaper) syncOutdatedStats() error {
if err = t.Fail(); err != nil {
return
}
// Exit
// Log and exit
logger.Infof("Reaper: mark job %s failed as job is still not finished in 1 day", t.Job().Info.JobID)
}
// Exit as it is still a valid ongoing job
}
} else if diff > 0 {
@ -184,20 +189,34 @@ func (r *reaper) syncOutdatedStats() error {
if err = t.FireHook(); err != nil {
return
}
// Success and exit
logger.Infof(
"Reaper: fire hook again for job %s as job status change is not ACKed: %s(rev=%d)",
t.Job().Info.JobID,
t.Job().Info.Status,
t.Job().Info.Revision,
)
} else {
// Current status is outdated, update it with ACKed status.
if err = t.UpdateStatusWithRetry(job.Status(t.Job().Info.HookAck.Status)); err != nil {
return
}
// Success and exit
logger.Infof(
"Reaper: update the status of job %s to the ACKed status: %s(%d)",
t.Job().Info.JobID,
t.Job().Info.HookAck.Status,
t.Job().Info.Revision,
)
}
return nil
}
if err := r.scanLocks(rds.KeyJobTrackInProgress(r.namespace), h); err != nil {
return errors.Wrap(err, "reaper")
return errors.Wrap(err, "reaper error")
}
return nil

25
src/vendor/github.com/cenkalti/backoff/v4/.gitignore generated vendored Normal file
View File

@ -0,0 +1,25 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
# IDEs
.idea/

10
src/vendor/github.com/cenkalti/backoff/v4/.travis.yml generated vendored Normal file
View File

@ -0,0 +1,10 @@
language: go
go:
- 1.13
- 1.x
- tip
before_install:
- go get github.com/mattn/goveralls
- go get golang.org/x/tools/cmd/cover
script:
- $HOME/gopath/bin/goveralls -service=travis-ci

20
src/vendor/github.com/cenkalti/backoff/v4/LICENSE generated vendored Normal file
View File

@ -0,0 +1,20 @@
The MIT License (MIT)
Copyright (c) 2014 Cenk Altı
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

32
src/vendor/github.com/cenkalti/backoff/v4/README.md generated vendored Normal file
View File

@ -0,0 +1,32 @@
# Exponential Backoff [![GoDoc][godoc image]][godoc] [![Build Status][travis image]][travis] [![Coverage Status][coveralls image]][coveralls]
This is a Go port of the exponential backoff algorithm from [Google's HTTP Client Library for Java][google-http-java-client].
[Exponential backoff][exponential backoff wiki]
is an algorithm that uses feedback to multiplicatively decrease the rate of some process,
in order to gradually find an acceptable rate.
The retries exponentially increase and stop increasing when a certain threshold is met.
## Usage
Import path is `github.com/cenkalti/backoff/v4`. Please note the version part at the end.
Use https://pkg.go.dev/github.com/cenkalti/backoff/v4 to view the documentation.
## Contributing
* I would like to keep this library as small as possible.
* Please don't send a PR without opening an issue and discussing it first.
* If proposed change is not a common use case, I will probably not accept it.
[godoc]: https://pkg.go.dev/github.com/cenkalti/backoff/v4
[godoc image]: https://godoc.org/github.com/cenkalti/backoff?status.png
[travis]: https://travis-ci.org/cenkalti/backoff
[travis image]: https://travis-ci.org/cenkalti/backoff.png?branch=master
[coveralls]: https://coveralls.io/github/cenkalti/backoff?branch=master
[coveralls image]: https://coveralls.io/repos/github/cenkalti/backoff/badge.svg?branch=master
[google-http-java-client]: https://github.com/google/google-http-java-client/blob/da1aa993e90285ec18579f1553339b00e19b3ab5/google-http-client/src/main/java/com/google/api/client/util/ExponentialBackOff.java
[exponential backoff wiki]: http://en.wikipedia.org/wiki/Exponential_backoff
[advanced example]: https://pkg.go.dev/github.com/cenkalti/backoff/v4?tab=doc#pkg-examples

66
src/vendor/github.com/cenkalti/backoff/v4/backoff.go generated vendored Normal file
View File

@ -0,0 +1,66 @@
// Package backoff implements backoff algorithms for retrying operations.
//
// Use Retry function for retrying operations that may fail.
// If Retry does not meet your needs,
// copy/paste the function into your project and modify as you wish.
//
// There is also Ticker type similar to time.Ticker.
// You can use it if you need to work with channels.
//
// See Examples section below for usage examples.
package backoff
import "time"
// BackOff is a backoff policy for retrying an operation.
type BackOff interface {
// NextBackOff returns the duration to wait before retrying the operation,
// or backoff. Stop to indicate that no more retries should be made.
//
// Example usage:
//
// duration := backoff.NextBackOff();
// if (duration == backoff.Stop) {
// // Do not retry operation.
// } else {
// // Sleep for duration and retry operation.
// }
//
NextBackOff() time.Duration
// Reset to initial state.
Reset()
}
// Stop indicates that no more retries should be made for use in NextBackOff().
const Stop time.Duration = -1
// ZeroBackOff is a fixed backoff policy whose backoff time is always zero,
// meaning that the operation is retried immediately without waiting, indefinitely.
type ZeroBackOff struct{}
func (b *ZeroBackOff) Reset() {}
func (b *ZeroBackOff) NextBackOff() time.Duration { return 0 }
// StopBackOff is a fixed backoff policy that always returns backoff.Stop for
// NextBackOff(), meaning that the operation should never be retried.
type StopBackOff struct{}
func (b *StopBackOff) Reset() {}
func (b *StopBackOff) NextBackOff() time.Duration { return Stop }
// ConstantBackOff is a backoff policy that always returns the same backoff delay.
// This is in contrast to an exponential backoff policy,
// which returns a delay that grows longer as you call NextBackOff() over and over again.
type ConstantBackOff struct {
Interval time.Duration
}
func (b *ConstantBackOff) Reset() {}
func (b *ConstantBackOff) NextBackOff() time.Duration { return b.Interval }
func NewConstantBackOff(d time.Duration) *ConstantBackOff {
return &ConstantBackOff{Interval: d}
}

66
src/vendor/github.com/cenkalti/backoff/v4/context.go generated vendored Normal file
View File

@ -0,0 +1,66 @@
package backoff
import (
"context"
"time"
)
// BackOffContext is a backoff policy that stops retrying after the context
// is canceled.
type BackOffContext interface { // nolint: golint
BackOff
Context() context.Context
}
type backOffContext struct {
BackOff
ctx context.Context
}
// WithContext returns a BackOffContext with context ctx
//
// ctx must not be nil
func WithContext(b BackOff, ctx context.Context) BackOffContext { // nolint: golint
if ctx == nil {
panic("nil context")
}
if b, ok := b.(*backOffContext); ok {
return &backOffContext{
BackOff: b.BackOff,
ctx: ctx,
}
}
return &backOffContext{
BackOff: b,
ctx: ctx,
}
}
func getContext(b BackOff) context.Context {
if cb, ok := b.(BackOffContext); ok {
return cb.Context()
}
if tb, ok := b.(*backOffTries); ok {
return getContext(tb.delegate)
}
return context.Background()
}
func (b *backOffContext) Context() context.Context {
return b.ctx
}
func (b *backOffContext) NextBackOff() time.Duration {
select {
case <-b.ctx.Done():
return Stop
default:
}
next := b.BackOff.NextBackOff()
if deadline, ok := b.ctx.Deadline(); ok && deadline.Sub(time.Now()) < next { // nolint: gosimple
return Stop
}
return next
}

View File

@ -0,0 +1,158 @@
package backoff
import (
"math/rand"
"time"
)
/*
ExponentialBackOff is a backoff implementation that increases the backoff
period for each retry attempt using a randomization function that grows exponentially.
NextBackOff() is calculated using the following formula:
randomized interval =
RetryInterval * (random value in range [1 - RandomizationFactor, 1 + RandomizationFactor])
In other words NextBackOff() will range between the randomization factor
percentage below and above the retry interval.
For example, given the following parameters:
RetryInterval = 2
RandomizationFactor = 0.5
Multiplier = 2
the actual backoff period used in the next retry attempt will range between 1 and 3 seconds,
multiplied by the exponential, that is, between 2 and 6 seconds.
Note: MaxInterval caps the RetryInterval and not the randomized interval.
If the time elapsed since an ExponentialBackOff instance is created goes past the
MaxElapsedTime, then the method NextBackOff() starts returning backoff.Stop.
The elapsed time can be reset by calling Reset().
Example: Given the following default arguments, for 10 tries the sequence will be,
and assuming we go over the MaxElapsedTime on the 10th try:
Request # RetryInterval (seconds) Randomized Interval (seconds)
1 0.5 [0.25, 0.75]
2 0.75 [0.375, 1.125]
3 1.125 [0.562, 1.687]
4 1.687 [0.8435, 2.53]
5 2.53 [1.265, 3.795]
6 3.795 [1.897, 5.692]
7 5.692 [2.846, 8.538]
8 8.538 [4.269, 12.807]
9 12.807 [6.403, 19.210]
10 19.210 backoff.Stop
Note: Implementation is not thread-safe.
*/
type ExponentialBackOff struct {
InitialInterval time.Duration
RandomizationFactor float64
Multiplier float64
MaxInterval time.Duration
// After MaxElapsedTime the ExponentialBackOff returns Stop.
// It never stops if MaxElapsedTime == 0.
MaxElapsedTime time.Duration
Stop time.Duration
Clock Clock
currentInterval time.Duration
startTime time.Time
}
// Clock is an interface that returns current time for BackOff.
type Clock interface {
Now() time.Time
}
// Default values for ExponentialBackOff.
const (
DefaultInitialInterval = 500 * time.Millisecond
DefaultRandomizationFactor = 0.5
DefaultMultiplier = 1.5
DefaultMaxInterval = 60 * time.Second
DefaultMaxElapsedTime = 15 * time.Minute
)
// NewExponentialBackOff creates an instance of ExponentialBackOff using default values.
func NewExponentialBackOff() *ExponentialBackOff {
b := &ExponentialBackOff{
InitialInterval: DefaultInitialInterval,
RandomizationFactor: DefaultRandomizationFactor,
Multiplier: DefaultMultiplier,
MaxInterval: DefaultMaxInterval,
MaxElapsedTime: DefaultMaxElapsedTime,
Stop: Stop,
Clock: SystemClock,
}
b.Reset()
return b
}
type systemClock struct{}
func (t systemClock) Now() time.Time {
return time.Now()
}
// SystemClock implements Clock interface that uses time.Now().
var SystemClock = systemClock{}
// Reset the interval back to the initial retry interval and restarts the timer.
// Reset must be called before using b.
func (b *ExponentialBackOff) Reset() {
b.currentInterval = b.InitialInterval
b.startTime = b.Clock.Now()
}
// NextBackOff calculates the next backoff interval using the formula:
// Randomized interval = RetryInterval * (1 ± RandomizationFactor)
func (b *ExponentialBackOff) NextBackOff() time.Duration {
// Make sure we have not gone over the maximum elapsed time.
elapsed := b.GetElapsedTime()
next := getRandomValueFromInterval(b.RandomizationFactor, rand.Float64(), b.currentInterval)
b.incrementCurrentInterval()
if b.MaxElapsedTime != 0 && elapsed+next > b.MaxElapsedTime {
return b.Stop
}
return next
}
// GetElapsedTime returns the elapsed time since an ExponentialBackOff instance
// is created and is reset when Reset() is called.
//
// The elapsed time is computed using time.Now().UnixNano(). It is
// safe to call even while the backoff policy is used by a running
// ticker.
func (b *ExponentialBackOff) GetElapsedTime() time.Duration {
return b.Clock.Now().Sub(b.startTime)
}
// Increments the current interval by multiplying it with the multiplier.
func (b *ExponentialBackOff) incrementCurrentInterval() {
// Check for overflow, if overflow is detected set the current interval to the max interval.
if float64(b.currentInterval) >= float64(b.MaxInterval)/b.Multiplier {
b.currentInterval = b.MaxInterval
} else {
b.currentInterval = time.Duration(float64(b.currentInterval) * b.Multiplier)
}
}
// Returns a random value from the following interval:
// [currentInterval - randomizationFactor * currentInterval, currentInterval + randomizationFactor * currentInterval].
func getRandomValueFromInterval(randomizationFactor, random float64, currentInterval time.Duration) time.Duration {
var delta = randomizationFactor * float64(currentInterval)
var minInterval = float64(currentInterval) - delta
var maxInterval = float64(currentInterval) + delta
// Get a random value from the range [minInterval, maxInterval].
// The formula used below has a +1 because if the minInterval is 1 and the maxInterval is 3 then
// we want a 33% chance for selecting either 1, 2 or 3.
return time.Duration(minInterval + (random * (maxInterval - minInterval + 1)))
}

3
src/vendor/github.com/cenkalti/backoff/v4/go.mod generated vendored Normal file
View File

@ -0,0 +1,3 @@
module github.com/cenkalti/backoff/v4
go 1.13

108
src/vendor/github.com/cenkalti/backoff/v4/retry.go generated vendored Normal file
View File

@ -0,0 +1,108 @@
package backoff
import (
"errors"
"time"
)
// An Operation is executing by Retry() or RetryNotify().
// The operation will be retried using a backoff policy if it returns an error.
type Operation func() error
// Notify is a notify-on-error function. It receives an operation error and
// backoff delay if the operation failed (with an error).
//
// NOTE that if the backoff policy stated to stop retrying,
// the notify function isn't called.
type Notify func(error, time.Duration)
// Retry the operation o until it does not return error or BackOff stops.
// o is guaranteed to be run at least once.
//
// If o returns a *PermanentError, the operation is not retried, and the
// wrapped error is returned.
//
// Retry sleeps the goroutine for the duration returned by BackOff after a
// failed operation returns.
func Retry(o Operation, b BackOff) error {
return RetryNotify(o, b, nil)
}
// RetryNotify calls notify function with the error and wait duration
// for each failed attempt before sleep.
func RetryNotify(operation Operation, b BackOff, notify Notify) error {
return RetryNotifyWithTimer(operation, b, notify, nil)
}
// RetryNotifyWithTimer calls notify function with the error and wait duration using the given Timer
// for each failed attempt before sleep.
// A default timer that uses system timer is used when nil is passed.
func RetryNotifyWithTimer(operation Operation, b BackOff, notify Notify, t Timer) error {
var err error
var next time.Duration
if t == nil {
t = &defaultTimer{}
}
defer func() {
t.Stop()
}()
ctx := getContext(b)
b.Reset()
for {
if err = operation(); err == nil {
return nil
}
var permanent *PermanentError
if errors.As(err, &permanent) {
return permanent.Err
}
if next = b.NextBackOff(); next == Stop {
return err
}
if notify != nil {
notify(err, next)
}
t.Start(next)
select {
case <-ctx.Done():
return ctx.Err()
case <-t.C():
}
}
}
// PermanentError signals that the operation should not be retried.
type PermanentError struct {
Err error
}
func (e *PermanentError) Error() string {
return e.Err.Error()
}
func (e *PermanentError) Unwrap() error {
return e.Err
}
func (e *PermanentError) Is(target error) bool {
_, ok := target.(*PermanentError)
return ok
}
// Permanent wraps the given err in a *PermanentError.
func Permanent(err error) error {
if err == nil {
return nil
}
return &PermanentError{
Err: err,
}
}

97
src/vendor/github.com/cenkalti/backoff/v4/ticker.go generated vendored Normal file
View File

@ -0,0 +1,97 @@
package backoff
import (
"context"
"sync"
"time"
)
// Ticker holds a channel that delivers `ticks' of a clock at times reported by a BackOff.
//
// Ticks will continue to arrive when the previous operation is still running,
// so operations that take a while to fail could run in quick succession.
type Ticker struct {
C <-chan time.Time
c chan time.Time
b BackOff
ctx context.Context
timer Timer
stop chan struct{}
stopOnce sync.Once
}
// NewTicker returns a new Ticker containing a channel that will send
// the time at times specified by the BackOff argument. Ticker is
// guaranteed to tick at least once. The channel is closed when Stop
// method is called or BackOff stops. It is not safe to manipulate the
// provided backoff policy (notably calling NextBackOff or Reset)
// while the ticker is running.
func NewTicker(b BackOff) *Ticker {
return NewTickerWithTimer(b, &defaultTimer{})
}
// NewTickerWithTimer returns a new Ticker with a custom timer.
// A default timer that uses system timer is used when nil is passed.
func NewTickerWithTimer(b BackOff, timer Timer) *Ticker {
if timer == nil {
timer = &defaultTimer{}
}
c := make(chan time.Time)
t := &Ticker{
C: c,
c: c,
b: b,
ctx: getContext(b),
timer: timer,
stop: make(chan struct{}),
}
t.b.Reset()
go t.run()
return t
}
// Stop turns off a ticker. After Stop, no more ticks will be sent.
func (t *Ticker) Stop() {
t.stopOnce.Do(func() { close(t.stop) })
}
func (t *Ticker) run() {
c := t.c
defer close(c)
// Ticker is guaranteed to tick at least once.
afterC := t.send(time.Now())
for {
if afterC == nil {
return
}
select {
case tick := <-afterC:
afterC = t.send(tick)
case <-t.stop:
t.c = nil // Prevent future ticks from being sent to the channel.
return
case <-t.ctx.Done():
return
}
}
}
func (t *Ticker) send(tick time.Time) <-chan time.Time {
select {
case t.c <- tick:
case <-t.stop:
return nil
}
next := t.b.NextBackOff()
if next == Stop {
t.Stop()
return nil
}
t.timer.Start(next)
return t.timer.C()
}

35
src/vendor/github.com/cenkalti/backoff/v4/timer.go generated vendored Normal file
View File

@ -0,0 +1,35 @@
package backoff
import "time"
type Timer interface {
Start(duration time.Duration)
Stop()
C() <-chan time.Time
}
// defaultTimer implements Timer interface using time.Timer
type defaultTimer struct {
timer *time.Timer
}
// C returns the timers channel which receives the current time when the timer fires.
func (t *defaultTimer) C() <-chan time.Time {
return t.timer.C
}
// Start starts the timer to fire after the given duration
func (t *defaultTimer) Start(duration time.Duration) {
if t.timer == nil {
t.timer = time.NewTimer(duration)
} else {
t.timer.Reset(duration)
}
}
// Stop is called when the timer is not used anymore and resources may be freed.
func (t *defaultTimer) Stop() {
if t.timer != nil {
t.timer.Stop()
}
}

38
src/vendor/github.com/cenkalti/backoff/v4/tries.go generated vendored Normal file
View File

@ -0,0 +1,38 @@
package backoff
import "time"
/*
WithMaxRetries creates a wrapper around another BackOff, which will
return Stop if NextBackOff() has been called too many times since
the last time Reset() was called
Note: Implementation is not thread-safe.
*/
func WithMaxRetries(b BackOff, max uint64) BackOff {
return &backOffTries{delegate: b, maxTries: max}
}
type backOffTries struct {
delegate BackOff
maxTries uint64
numTries uint64
}
func (b *backOffTries) NextBackOff() time.Duration {
if b.maxTries == 0 {
return Stop
}
if b.maxTries > 0 {
if b.maxTries <= b.numTries {
return Stop
}
b.numTries++
}
return b.delegate.NextBackOff()
}
func (b *backOffTries) Reset() {
b.numTries = 0
b.delegate.Reset()
}

View File

@ -152,6 +152,9 @@ github.com/casbin/casbin/rbac/default-role-manager
github.com/casbin/casbin/util
# github.com/cenkalti/backoff v2.2.1+incompatible
## explicit
# github.com/cenkalti/backoff/v4 v4.1.0
## explicit
github.com/cenkalti/backoff/v4
# github.com/cespare/xxhash/v2 v2.1.1
github.com/cespare/xxhash/v2
# github.com/cloudflare/cfssl v0.0.0-20190510060611-9c027c93ba9e