-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsub.go
103 lines (83 loc) · 1.82 KB
/
sub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package pubsub
import (
"context"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/rs/zerolog"
"github.com/soulgarden/go-amqp-reconnect/rabbitmq"
)
type Sub interface {
StartConsumer(ctx context.Context) error
GetDeliveryChannel() (<-chan amqp.Delivery, error)
}
type sub struct {
conn *rabbitmq.Connection
svc Service
rmq Rmqer
cfg *Cfg
logger *zerolog.Logger
}
func NewSub(
conn *rabbitmq.Connection,
svc Service,
rmq Rmqer,
cfg *Cfg,
logger *zerolog.Logger,
) *sub {
return &sub{
conn: conn,
svc: svc,
rmq: rmq,
cfg: cfg,
logger: logger,
}
}
func (s *sub) StartConsumer(ctx context.Context) error {
delivery, err := s.GetDeliveryChannel()
if err != nil {
s.logger.Err(err).Msg("get delivery channel")
return err
}
return s.svc.Process(ctx, delivery)
}
func (s *sub) GetDeliveryChannel() (<-chan amqp.Delivery, error) {
consumeCh, err := s.rmq.OpenChannel()
if err != nil {
s.logger.Err(err).Msg("open conn channel")
return nil, err
}
err = s.rmq.QueueDeclare(consumeCh)
if err != nil {
s.logger.Err(err).Str("name", s.cfg.QueueName).Msg("declare queue")
return nil, err
}
if s.cfg.ExchangeName != "" {
err = s.rmq.ExchangeDeclare(consumeCh)
if err != nil {
s.logger.Err(err).Str("name", s.cfg.ExchangeName).Msg("declare exchange")
return nil, err
}
err = s.rmq.QueueBind(consumeCh)
if err != nil {
s.logger.Err(err).
Str("queue name", s.cfg.ExchangeName).
Str("exchange name", s.cfg.ExchangeName).
Msg("declare exchange")
return nil, err
}
}
_ = consumeCh.Qos(s.cfg.PrefetchCount, 0, false)
delivery, err := consumeCh.Consume(
s.cfg.QueueName,
"",
false,
false,
false,
false,
nil,
)
if err != nil {
s.logger.Err(err).Str("name", s.cfg.QueueName).Msg("start consume")
return nil, err
}
return delivery, err
}