Skip to content

Commit

Permalink
maintainer: use ticker to generate periodEvent (#1031)
Browse files Browse the repository at this point in the history
ref #1030
  • Loading branch information
asddongmen authored Feb 24, 2025
1 parent 0c79fe2 commit 0e5af36
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 36 deletions.
41 changes: 10 additions & 31 deletions maintainer/maintainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ type Maintainer struct {

eventCh *chann.DrainableChann[*Event]

taskScheduler threadpool.ThreadPool
mc messaging.MessageCenter
mc messaging.MessageCenter

watermark struct {
mu sync.RWMutex
Expand Down Expand Up @@ -180,7 +179,6 @@ func NewMaintainer(cfID common.ChangeFeedID,
pdClock: pdClock,
selfNode: selfNode,
eventCh: chann.NewAutoDrainChann[*Event](),
taskScheduler: taskScheduler,
startCheckpointTs: checkpointTs,
controller: NewController(cfID, checkpointTs, pdAPI, tsoClient, regionCache, taskScheduler,
cfg.Config, ddlSpan, conf.AddTableBatchSize, time.Duration(conf.CheckBalanceInterval)),
Expand Down Expand Up @@ -222,6 +220,7 @@ func NewMaintainer(cfID common.ChangeFeedID,
metrics.MaintainerGauge.WithLabelValues(cfID.Namespace(), cfID.Name()).Inc()
ctx, cancel := context.WithCancel(context.Background())
m.cancelUpdateMetrics = cancel

go m.runUpdateMetrics(ctx)
go m.runHandleEvents(ctx)

Expand Down Expand Up @@ -251,11 +250,6 @@ func NewMaintainerForRemove(cfID common.ChangeFeedID,
m := NewMaintainer(cfID, conf, unused, selfNode, taskScheduler, pdAPI,
tsoClient, regionCache, 1, false)
m.cascadeRemoving = true
// setup period event
m.submitScheduledEvent(m.taskScheduler, &Event{
changefeedID: m.id,
eventType: EventPeriod,
}, time.Now().Add(periodEventInterval))
return m
}

Expand Down Expand Up @@ -370,12 +364,6 @@ func (m *Maintainer) initialize() error {
}
m.sendMessages(m.bootstrapper.HandleNewNodes(newNodes))

// setup period event
m.submitScheduledEvent(m.taskScheduler, &Event{
changefeedID: m.id,
eventType: EventPeriod,
}, time.Now().Add(periodEventInterval))

log.Info("changefeed maintainer initialized",
zap.String("info", m.config.String()),
zap.String("id", m.id.String()),
Expand Down Expand Up @@ -837,10 +825,6 @@ func (m *Maintainer) onPeriodTask() {
m.handleResendMessage()
m.collectMetrics()
m.calCheckpointTs()
m.submitScheduledEvent(m.taskScheduler, &Event{
changefeedID: m.id,
eventType: EventPeriod,
}, time.Now().Add(periodEventInterval))
}

func (m *Maintainer) collectMetrics() {
Expand Down Expand Up @@ -877,12 +861,20 @@ func (m *Maintainer) runUpdateMetrics(ctx context.Context) {
}

func (m *Maintainer) runHandleEvents(ctx context.Context) {
ticker := time.NewTicker(periodEventInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case event := <-m.eventCh.Out():
m.HandleEvent(event)
case <-ticker.C:
m.HandleEvent(&Event{
changefeedID: m.id,
eventType: EventPeriod,
})
}
}
}
Expand All @@ -896,19 +888,6 @@ func (m *Maintainer) GetTables() []*replica.SpanReplication {
return m.controller.replicationDB.GetAllTasks()
}

// SubmitScheduledEvent submits a task to controller pool to send a future event
func (m *Maintainer) submitScheduledEvent(
scheduler threadpool.ThreadPool,
event *Event,
scheduleTime time.Time,
) {
task := func() time.Time {
m.pushEvent(event)
return time.Time{}
}
scheduler.SubmitFunc(task, scheduleTime)
}

// pushEvent is used to push event to maintainer's event channel
// event will be handled by maintainer's main loop
func (m *Maintainer) pushEvent(event *Event) {
Expand Down
6 changes: 1 addition & 5 deletions maintainer/maintainer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,11 +347,7 @@ func TestMaintainerSchedule(t *testing.T) {
maintainer.sendMessages(maintainer.bootstrapper.HandleNewNodes(
[]*node.Info{n},
))
// setup period event
maintainer.submitScheduledEvent(maintainer.taskScheduler, &Event{
changefeedID: maintainer.id,
eventType: EventPeriod,
}, time.Now().Add(time.Millisecond*500))

time.Sleep(time.Second * time.Duration(sleepTime))

require.Eventually(t, func() bool {
Expand Down

0 comments on commit 0e5af36

Please sign in to comment.