Skip to content

Commit

Permalink
Merge pull request #8 from FortnoxAB/feature/kafka
Browse files Browse the repository at this point in the history
Listen to webhooks which are piped through kafka
  • Loading branch information
jonaz authored Nov 26, 2024
2 parents 763c1e8 + 6b4f46a commit 3b9db22
Show file tree
Hide file tree
Showing 8 changed files with 331 additions and 60 deletions.
38 changes: 27 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,47 +3,63 @@ module github.com/fortnoxab/renovator
go 1.23.2

require (
github.com/IBM/sarama v1.43.3
github.com/fortnoxab/ginprometheus v0.0.0-20211026110220-d3da4ce1dc2b
github.com/gin-contrib/pprof v1.5.1
github.com/gin-gonic/gin v1.10.0
github.com/jonaz/ginlogrus v0.0.0-20191118094232-2f4da50f5dd6
github.com/prometheus/client_golang v1.11.0
github.com/jonaz/mgit v0.0.0-20241126102940-3dcd290b3a55
github.com/prometheus/client_golang v1.20.5
github.com/redis/go-redis/v9 v9.7.0
github.com/robfig/cron/v3 v3.0.1
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
github.com/stretchr/testify v1.10.0
github.com/urfave/cli/v2 v2.27.5
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/bytedance/sonic v1.12.4 // indirect
github.com/bytedance/sonic v1.12.5 // indirect
github.com/bytedance/sonic/loader v0.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/gabriel-vasile/mimetype v1.4.6 // indirect
github.com/eapache/go-resiliency v1.7.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.7 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.22.1 // indirect
github.com/go-playground/validator/v10 v10.23.0 // indirect
github.com/goccy/go-json v0.10.3 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.60.1 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
Expand Down
145 changes: 125 additions & 20 deletions go.sum

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ func app() *cli.App {
Usage: "webserver port for pprof and metrics",
Value: "8080",
},
&cli.StringFlag{
Name: "kafka-brokers",
Usage: "listen to bitbucket webhook transported over kafka",
},
},
},
{
Expand Down
8 changes: 3 additions & 5 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"sync"

"github.com/fortnoxab/renovator/pkg/command"
"github.com/fortnoxab/renovator/pkg/master"
localredis "github.com/fortnoxab/renovator/pkg/redis"
"github.com/fortnoxab/renovator/pkg/renovate"
"github.com/fortnoxab/renovator/pkg/webserver"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -85,15 +85,13 @@ func (a *Agent) Run(ctx context.Context) {
}

for ctx.Err() == nil {
repos, err := a.RedisClient.BLPop(ctx, 0, master.RedisRepoListKey).Result() // 0 duration == block until key exists.
repos, err := a.RedisClient.BLPop(ctx, 0, localredis.RedisRepoListKey).Result() // 0 duration == block until key exists.
if err != nil {
logrus.Error("BLpop err: ", err)
continue
}

logrus.Debugf("got %d number of repos to process", len(repos))

if len(repos) != 2 || repos[0] != master.RedisRepoListKey {
if len(repos) != 2 || repos[0] != localredis.RedisRepoListKey {
logrus.Errorf("unexpected reply from BLpop: %s", strings.Join(repos, ","))
continue
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"

"github.com/fortnoxab/renovator/mocks"
"github.com/fortnoxab/renovator/pkg/master"
localredis "github.com/fortnoxab/renovator/pkg/redis"
"github.com/fortnoxab/renovator/pkg/renovate"
"github.com/redis/go-redis/v9"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -75,5 +75,5 @@ func (t *redisMockList) LPop() *redis.StringSliceCmd {
// Take first value and shift remaining
first := t.list[0]
t.list = t.list[1:]
return redis.NewStringSliceResult([]string{master.RedisRepoListKey, first}, nil)
return redis.NewStringSliceResult([]string{localredis.RedisRepoListKey, first}, nil)
}
121 changes: 121 additions & 0 deletions pkg/kafka/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package kafka

import (
"context"
"encoding/json"
"strings"
"sync"

"github.com/IBM/sarama"
localredis "github.com/fortnoxab/renovator/pkg/redis"
"github.com/jonaz/mgit/pkg/bitbucket"
"github.com/redis/go-redis/v9"
"github.com/sirupsen/logrus"
)

func Start(ctx context.Context, brokers string, redisClient redis.Cmdable) {
config := sarama.NewConfig()
config.Version = sarama.V3_5_1_0
group := "renovator-master"

consumer := Consumer{redis: redisClient}
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
if err != nil {
logrus.Errorf("Error creating consumer group client: %v", err)
return
}
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
for {
// `Consume` should be called inside an infinite loop, when a
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := client.Consume(ctx, []string{"vcs-pullrequests"}, &consumer); err != nil {
// if errors.Is(err, sarama.ErrClosedConsumerGroup) {
// return
// }
logrus.Errorf("Error from consumer: %v", err)
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
return
}
}
}()

wg.Wait()
if err = client.Close(); err != nil {
logrus.Errorf("Error closing client: %v", err)
}
}

// Consumer represents a Sarama consumer group consumer
type Consumer struct {
redis redis.Cmdable
}

// Setup is run at the beginning of a new session, before ConsumeClaim
func (consumer *Consumer) Setup(sarama.ConsumerGroupSession) error {
return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
func (consumer *Consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}

type hookData struct {
HookData bitbucket.WebhookEvent `json:"hookData"`
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// NOTE:
// Do not move the code below to a goroutine.
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29
for {
select {
case message, ok := <-claim.Messages():
if !ok {
logrus.Info("message channel was closed")
return nil
}
hookData := &hookData{}
err := json.Unmarshal(message.Value, hookData)
if err != nil {
logrus.Errorf("failed to unmarshal event to struct: %s message was: %s", err, string(message.Value))
}
session.MarkMessage(message, "")

hook := hookData.HookData
if strings.HasPrefix(hook.PullRequest.Title, "rebase!") && hook.PullRequest.Title != hook.PreviousTitle {
repo := hook.PullRequest.ToRef.Repository.Project.Key + "/" + hook.PullRequest.ToRef.Repository.Slug

// If its a webhook and its already in the queue to be processed we move it first in the queue.
err = consumer.redis.LRem(session.Context(), localredis.RedisRepoListKey, 0, repo).Err()
if err != nil {
logrus.Errorf("error LRem: %s", err)
continue
}

logrus.Infof("trigger renovate on %s due to 'rebase!' in PR %s", repo, hook.PullRequest.Links.Self[0].Href)
err = consumer.redis.LPush(session.Context(), localredis.RedisRepoListKey, []string{repo}).Err()
if err != nil {
logrus.Errorf("error LPush: %s", err)
}
}
//TODO dbouncer same hook can happen twice....

// Should return when `session.Context()` is done.
// If not, will raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when kafka rebalance. see:
// https://github.com/IBM/sarama/issues/1192
case <-session.Context().Done():
return nil
}
}
}
39 changes: 17 additions & 22 deletions pkg/master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
"sync"

"github.com/fortnoxab/renovator/pkg/command"
"github.com/fortnoxab/renovator/pkg/kafka"
"github.com/fortnoxab/renovator/pkg/leaderelect"
localredis "github.com/fortnoxab/renovator/pkg/redis"
"github.com/fortnoxab/renovator/pkg/renovate"
"github.com/fortnoxab/renovator/pkg/webserver"
"github.com/redis/go-redis/v9"
Expand All @@ -15,8 +17,6 @@ import (
"github.com/urfave/cli/v2"
)

const RedisRepoListKey = "renovator-joblist"

type Master struct {
Renovator *renovate.Runner
RedisClient redis.Cmdable
Expand All @@ -25,6 +25,7 @@ type Master struct {
CronSchedule cron.Schedule
RunFirstTime bool
Webserver *webserver.Webserver
Brokers string
}

type autoDiscoverJob struct {
Expand Down Expand Up @@ -57,6 +58,7 @@ func NewMasterFromContext(cCtx *cli.Context) (*Master, error) {
CronSchedule: cronSchedule,
RunFirstTime: cCtx.Bool("run-first-time"),
Webserver: &webserver.Webserver{Port: cCtx.String("port"), EnableMetrics: true},
Brokers: cCtx.String("kafka-brokers"),
}, nil
}

Expand Down Expand Up @@ -101,6 +103,15 @@ func (m *Master) Run(ctx context.Context) error {
m.Webserver.Start(ctx)
}()
}

if m.Brokers != "" {
wg.Add(1)
go func() {
defer wg.Done()
kafka.Start(ctx, m.Brokers, m.RedisClient)
}()
}

// If context is cancelled, stop cronrunner and wait for job to finish
<-ctx.Done()
logrus.Debug("main context cancelled")
Expand Down Expand Up @@ -135,25 +146,9 @@ func doRun(ctx context.Context, candidate *leaderelect.Candidate, redisClient re
return err
}

var reposToQueue []string
reposInQueue, err := redisClient.LRange(ctx, RedisRepoListKey, 0, -1).Result()
if err != nil && err != redis.Nil {
return fmt.Errorf("error from LRange: %w", err)
}

for _, repo := range repos {

add := true

for _, e := range reposInQueue {
if repo == e {
add = false
break
}
}
if add {
reposToQueue = append(reposToQueue, repo)
}
reposToQueue, err := localredis.RemoveAlreadyQueued(ctx, redisClient, repos)
if err != nil {
return err
}

if len(reposToQueue) == 0 {
Expand All @@ -162,7 +157,7 @@ func doRun(ctx context.Context, candidate *leaderelect.Candidate, redisClient re
}

logrus.Debug("pushing repo list to redis")
err = redisClient.RPush(ctx, RedisRepoListKey, reposToQueue).Err()
err = redisClient.RPush(ctx, localredis.RedisRepoListKey, reposToQueue).Err()
if err != nil {
return fmt.Errorf("failed to push repolist to redis, err: %w", err)
}
Expand Down
32 changes: 32 additions & 0 deletions pkg/redis/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package redis

import (
"context"
"fmt"

"github.com/redis/go-redis/v9"
)

const RedisRepoListKey = "renovator-joblist"

func RemoveAlreadyQueued(ctx context.Context, redisClient redis.Cmdable, repos []string) ([]string, error) {
var reposToQueue []string
reposInQueue, err := redisClient.LRange(ctx, RedisRepoListKey, 0, -1).Result()
if err != nil && err != redis.Nil {
return nil, fmt.Errorf("error from LRange: %w", err)
}

for _, repo := range repos {
add := true
for _, e := range reposInQueue {
if repo == e {
add = false
break
}
}
if add {
reposToQueue = append(reposToQueue, repo)
}
}
return reposToQueue, nil
}

0 comments on commit 3b9db22

Please sign in to comment.