Skip to content

Commit

Permalink
Refactor sqs
Browse files Browse the repository at this point in the history
  • Loading branch information
minhduc140583 committed Jul 21, 2024
1 parent 5d0f48a commit fd41352
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 32 deletions.
22 changes: 11 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,28 +61,28 @@
## Message Queue Implementations
### RabbitMQ
- An open-source message broker that supports multiple messaging protocols. It provides features like message routing, persistence, and acknowledgment.
- RabbitMQ GO library is at [mq/rabbitmq](https://github.com/core-go/mq/tree/main/rabbitmq). The sample is at [go-subscription](https://github.com/project-samples/go-subscription)
- RabbitMQ nodejs library is at [rabbitmq](https://github.com/core-ts/rabbitmq). The sample is at [rabbitmq-sample](https://github.com/typescript-tutorial/rabbitmq-sample)
- RabbitMQ GO library is at [rabbitmq](https://github.com/core-go/rabbitmq). The sample is at [go-rabbit-mq-sample](https://github.com/project-samples/go-rabbit-mq-sample)
- RabbitMQ nodejs library is at [rabbitmq-ext](https://www.npmjs.com/package/rabbitmq-ext). The sample is at [rabbitmq-sample](https://github.com/typescript-tutorial/rabbitmq-sample)
### Apache Kafka
- A distributed streaming platform that handles high-throughput, low-latency message processing. It is often used for building real-time data pipelines and streaming applications.
- We support 3 Kafka GO libraries: [segmentio/kafka-go](https://github.com/core-go/mq/tree/main/kafka), [Shopify/sarama](https://github.com/core-go/mq/tree/main/sarama) and [confluent](https://github.com/core-go/mq/tree/main/confluent).
- Kafka nodejs library is at [kafka](https://github.com/core-ts/kafka). The sample is at [kafka-sample](https://github.com/typescript-tutorial/kafka-sample)
### Amazon SQS (Simple Queue Service)
- A fully managed message queue service offered by AWS. It provides a reliable, scalable, and cost-effective way to decouple and coordinate distributed software systems and microservices.
- SQS GO library is at [sqs](https://github.com/core-go/mq/tree/main/sqs). The sample is at [go-subscription](https://github.com/project-samples/go-subscription)
- SQS GO library is at [sqs](https://github.com/core-go/sqs). The sample is at [go-amazon-sqs-sample](https://github.com/project-samples/go-amazon-sqs-sample)
### Google Cloud Pub/Sub
- A fully managed messaging service that allows for event-driven systems and real-time analytics on Google Cloud Platform.
- Pub/Sub GO library is at [pubsub](https://github.com/core-go/mq/tree/main/pubsub). The sample is at [go-subscription](https://github.com/project-samples/go-subscription)
- Pub/Sub nodejs library is at [pubsub](https://github.com/core-ts/pubsub). The sample is at [pubsub-sample](https://github.com/typescript-tutorial/pubsub-sample)
- Pub/Sub GO library is at [pubsub](https://github.com/core-go/pubsub). The sample is at [go-pubsub-sample](https://github.com/project-samples/go-pubsub-sample)
- Pub/Sub nodejs library is at [google-pubsub](https://www.npmjs.com/package/google-pubsub). The sample is at [pubsub-sample](https://github.com/typescript-tutorial/pubsub-sample)
### IBM MQ
- IBM MQ at [ibmmq](https://github.com/core-go/mq/tree/main/ibmmq). The sample is at [go-subscription](https://github.com/project-samples/go-subscription)
- IBM MQ nodejs library is at [IBM MQ](https://github.com/core-ts/ibmmq). The sample is at [ibmmq-sample](https://github.com/typescript-tutorial/ibmmq-sample)
- IBM MQ at [ibmmq](https://github.com/core-go/ibmmq). The sample is at [go-ibm-mq-sample](https://github.com/project-samples/go-ibm-mq-sample)
- IBM MQ nodejs library is at [ibmmq-plus](https://github.com/core-ts/ibmmq). The sample is at [ibmmq-sample](https://github.com/typescript-tutorial/ibmmq-sample)
### Active MQ
- Active MQ at [activemq](https://github.com/core-go/mq/tree/main/activemq). The sample is at [go-subscription](https://github.com/project-samples/go-subscription)
- Active MQ nodejs library is at [Active MQ](https://github.com/core-ts/activemq). The sample is at [activemq-sample](https://github.com/typescript-tutorial/activemq-sample)
- Active MQ at [activemq](https://github.com/core-go/activemq). The sample is at [go-active-mq-sample](https://github.com/project-samples/go-active-mq-sample)
- Active MQ nodejs library is at [activemq](https://www.npmjs.com/package/activemq). The sample is at [activemq-sample](https://github.com/typescript-tutorial/activemq-sample)
### NATS
- NATS at [nats](https://github.com/core-go/mq/tree/main/nats). The sample is at [go-subscription](https://github.com/project-samples/go-subscription)
- NATS nodejs library is at [NATS](https://github.com/core-ts/nats). The sample is at [nats-sample](https://github.com/typescript-tutorial/nats-sample)
- NATS at [nats](https://github.com/core-go/nats). The sample is at [go-nats-sample](https://github.com/project-samples/go-nats-sample)
- NATS nodejs library is at [NATS](https://www.npmjs.com/package/nats-plus). The sample is at [nats-sample](https://github.com/typescript-tutorial/nats-sample)

## Installation
Please make sure to initialize a Go module before installing core-go/mq:
Expand Down
19 changes: 0 additions & 19 deletions sqs/queue_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,6 @@ type QueueSender struct {
func NewQueueSender(client *sqs.SQS, delaySeconds int64) *QueueSender {
return &QueueSender{Client: client, DelaySeconds: &delaySeconds}
}
func (p *QueueSender) SendMessage(ctx context.Context, queueName string, data []byte, attributes map[string]string) (string, error) {
queueUrl, er0 := GetQueueUrl(p.Client, queueName)
if er0 != nil {
return "", er0
}
attrs := MapToAttributes(attributes)
s := string(data)
result, err := p.Client.SendMessage(&sqs.SendMessageInput{
DelaySeconds: p.DelaySeconds,
MessageAttributes: attrs,
MessageBody: aws.String(s),
QueueUrl: &queueUrl,
})
if result != nil && result.MessageId != nil {
return *result.MessageId, err
} else {
return "", err
}
}
func (p *QueueSender) Send(ctx context.Context, queueName string, data []byte, attributes map[string]string) error {
queueUrl, er0 := GetQueueUrl(p.Client, queueName)
if er0 != nil {
Expand Down
20 changes: 18 additions & 2 deletions sqs/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,31 @@ func NewSenderByQueueName(client *sqs.SQS, queueName string, delaySeconds int64)
func NewSender(client *sqs.SQS, queueURL string, delaySeconds int64) *Sender {
return &Sender{Client: client, QueueURL: &queueURL, DelaySeconds: &delaySeconds}
}
func (p *Sender) SendMessage(ctx context.Context, data []byte, attributes map[string]string) (string, error) {
func (p *Sender) Send(ctx context.Context, data []byte, attributes map[string]string) error {
attrs := MapToAttributes(attributes)
s := string(data)
result, err := p.Client.SendMessage(&sqs.SendMessageInput{
_, err := p.Client.SendMessage(&sqs.SendMessageInput{
DelaySeconds: p.DelaySeconds,
MessageAttributes: attrs,
MessageBody: aws.String(s),
QueueUrl: p.QueueURL,
})
return err
}
func (p *Sender) SendBody(ctx context.Context, data []byte) error {
s := string(data)
_, err := p.Client.SendMessage(&sqs.SendMessageInput{
DelaySeconds: p.DelaySeconds,
MessageBody: aws.String(s),
QueueUrl: p.QueueURL,
})
return err
}
func (p *Sender) SendMessage(msg *sqs.SendMessageInput) (string, error) {
if msg == nil {
return "", nil
}
result, err := p.Client.SendMessage(msg)
if result != nil && result.MessageId != nil {
return *result.MessageId, err
} else {
Expand Down

0 comments on commit fd41352

Please sign in to comment.