Skip to content

Commit

Permalink
Refactor bytes_writer
Browse files Browse the repository at this point in the history
  • Loading branch information
minhduc140583 committed Oct 16, 2021
1 parent 5a05f8c commit b3e926a
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 91 deletions.
17 changes: 12 additions & 5 deletions bytes_writer.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
package mq

import "context"
import (
"context"
"github.com/pkg/errors"
)

type BytesWriter struct {
Send func(ctx context.Context, data []byte, attributes map[string]string) (string, error)
}
func NewBytesWriter(send func(ctx context.Context, data []byte, attributes map[string]string) (string, error)) *BytesWriter {
return &BytesWriter{Send: send}
}
func (w *BytesWriter) Write(ctx context.Context, model interface{}) error {
data := model.([]byte)
_, err := w.Send(ctx, data, nil)
return err
func (w *BytesWriter) Write(ctx context.Context, data interface{}) error {
d, ok := data.([]byte)
if !ok {
return errors.New("data must be byte array ([]byte)")
} else {
_, err := w.Send(ctx, d, nil)
return err
}
}
15 changes: 6 additions & 9 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package handler
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
)
Expand All @@ -26,7 +25,7 @@ func (h *SenderHandler) Receive(w http.ResponseWriter, r *http.Request) {
}
l := len(h.Send)
if l == 0 {
respond(w, h.Response)
respond(w, http.StatusOK, h.Response)
return
}
var result string
Expand All @@ -39,16 +38,14 @@ func (h *SenderHandler) Receive(w http.ResponseWriter, r *http.Request) {
}
}
if len(h.Response) == 0 {
respond(w, result)
respond(w, http.StatusOK, result)
} else {
respond(w, h.Response)
respond(w, http.StatusOK, h.Response)
}
}
func respond(w http.ResponseWriter, result interface{}) {
w.WriteHeader(http.StatusOK)
func respond(w http.ResponseWriter, status int, result interface{}) error {
w.WriteHeader(status)
w.Header().Set("Content-Type", "application/json")
err := json.NewEncoder(w).Encode(result)
if err != nil {
fmt.Println(err)
}
return err
}
139 changes: 62 additions & 77 deletions ibm-mq/health_checker.go
Original file line number Diff line number Diff line change
@@ -1,97 +1,82 @@
package ibmmq
package server

import (
"context"
"github.com/ibm-messaging/mq-golang/v5/ibmmq"
"crypto/tls"
"log"
"net/http"
"strconv"
"time"
)

type HealthChecker struct {
name string
queueManager *ibmmq.MQQueueManager
topic string
timeout time.Duration
queue *QueueConfig
auth *MQAuth
type ServerConf struct {
Name string `mapstructure:"name" json:"name,omitempty" gorm:"column:name" bson:"name,omitempty" dynamodbav:"name,omitempty" firestore:"name,omitempty"`
Version string `mapstructure:"version" json:"version,omitempty" gorm:"column:version" bson:"version,omitempty" dynamodbav:"version,omitempty" firestore:"version,omitempty"`
Port *int64 `mapstructure:"port" json:"port,omitempty" gorm:"column:port" bson:"port,omitempty" dynamodbav:"port,omitempty" firestore:"port,omitempty"`
WriteTimeout *int64 `mapstructure:"write_timeout" json:"writeTimeout,omitempty" gorm:"column:writetimeout" bson:"writeTimeout,omitempty" dynamodbav:"writeTimeout,omitempty" firestore:"writeTimeout,omitempty"`
ReadTimeout *int64 `mapstructure:"read_timeout" json:"readTimeout,omitempty" gorm:"column:readtimeout" bson:"readTimeout,omitempty" dynamodbav:"readTimeout,omitempty" firestore:"readTimeout,omitempty"`
ReadHeaderTimeout *int64 `mapstructure:"read_header_timeout" json:"readHeaderTimeout,omitempty" gorm:"column:readheadertimeout" bson:"readHeaderTimeout,omitempty" dynamodbav:"readHeaderTimeout,omitempty" firestore:"readHeaderTimeout,omitempty"`
IdleTimeout *int64 `mapstructure:"idle_timeout" json:"idleTimeout,omitempty" gorm:"column:idletimeout" bson:"idleTimeout,omitempty" dynamodbav:"idleTimeout,omitempty" firestore:"idleTimeout,omitempty"`
MaxHeaderBytes *int `mapstructure:"max_header_bytes" json:"maxHeaderBytes,omitempty" gorm:"column:maxheaderbytes" bson:"maxHeaderBytes,omitempty" dynamodbav:"maxHeaderBytes,omitempty" firestore:"maxHeaderBytes,omitempty"`
}

var qObject ibmmq.MQObject

func NewHealthCheckerByConfig(queue *QueueConfig, auth *MQAuth, topic string, options ...string) *HealthChecker {
var name string
if len(options) >= 1 {
name = options[0]
} else {
name = "ibmmq"
func Addr(port *int64) string {
server := ""
if port != nil && *port >= 0 {
server = ":" + strconv.FormatInt(*port, 10)
}
return NewIBMMQHealthCheckerByConfig(queue, auth, topic, name, 4*time.Second)
return server
}
func NewHealthChecker(connection *ibmmq.MQQueueManager, topic string, options ...string) *HealthChecker {
var name string
if len(options) >= 1 {
name = options[0]
func ServerInfo(conf ServerConf) string {
if len(conf.Version) > 0 {
if conf.Port != nil && *conf.Port >= 0 {
return "Start service: " + conf.Name + " at port " + strconv.FormatInt(*conf.Port, 10) + " with version " + conf.Version
} else {
return "Start service: " + conf.Name + " with version " + conf.Version
}
} else {
name = "ibmmq"
if conf.Port != nil && *conf.Port >= 0 {
return "Start service: " + conf.Name + " at port " + strconv.FormatInt(*conf.Port, 10)
} else {
return "Start service: " + conf.Name
}
}
return NewHealthCheckerWithTimeout(connection, topic, name, 4*time.Second)
}

func NewHealthCheckerWithTimeout(connection *ibmmq.MQQueueManager, topic string, name string, timeouts ...time.Duration) *HealthChecker {
var timeout time.Duration
if len(timeouts) >= 1 {
timeout = timeouts[0]
} else {
timeout = 4 * time.Second
func Serve(conf ServerConf, check func(w http.ResponseWriter, r *http.Request), options ...*tls.Config) {
log.Println(ServerInfo(conf))
http.HandleFunc("/health", check)
http.HandleFunc("/", check)
srv := CreateServer(conf, nil, options...)
err := srv.ListenAndServe()
if err != nil {
log.Println(err.Error())
panic(err)
}
return &HealthChecker{name: name, queueManager: connection, topic: topic, timeout: timeout}
}

func NewIBMMQHealthCheckerByConfig(queue *QueueConfig, auth *MQAuth, topic string, name string, timeouts ...time.Duration) *HealthChecker {
var timeout time.Duration
if len(timeouts) >= 1 {
timeout = timeouts[0]
} else {
timeout = 4 * time.Second
func CreateServer(conf ServerConf, handler http.Handler, options ...*tls.Config) *http.Server {
addr := Addr(conf.Port)
srv := http.Server{
Addr: addr,
Handler: nil,
TLSConfig: nil,
}
return &HealthChecker{name: name, queue: queue, auth: auth, topic: topic, timeout: timeout}
}

func (s *HealthChecker) Name() string {
return s.name
}

func (s *HealthChecker) Check(ctx context.Context) (map[string]interface{}, error) {
res := make(map[string]interface{})
if s.queueManager == nil {
conn, err := NewQueueManagerByConfig(*s.queue, *s.auth)
if err != nil {
return res, err
}
s.queueManager = conn
if len(options) > 0 && options[0] != nil {
srv.TLSConfig = options[0]
}
sd := ibmmq.NewMQSD()
sd.Options = ibmmq.MQSO_CREATE |
ibmmq.MQSO_NON_DURABLE |
ibmmq.MQSO_MANAGED
sd.ObjectString = s.topic
subscriptionObject, err := s.queueManager.Sub(sd, &qObject)
if err != nil {
return res, err
if conf.ReadTimeout != nil && *conf.ReadTimeout > 0 {
srv.ReadTimeout = time.Duration(*conf.ReadTimeout) * time.Second
}
err = subscriptionObject.Close(0)
if err != nil {
return res, err
if conf.ReadHeaderTimeout != nil && *conf.ReadHeaderTimeout > 0 {
srv.ReadHeaderTimeout = time.Duration(*conf.ReadHeaderTimeout) * time.Second
}
return res, nil
}

func (s *HealthChecker) Build(ctx context.Context, data map[string]interface{}, err error) map[string]interface{} {
if err == nil {
return data
if conf.WriteTimeout != nil && *conf.WriteTimeout > 0 {
srv.WriteTimeout = time.Duration(*conf.WriteTimeout) * time.Second
}
if data == nil {
return make(map[string]interface{}, 0)
if conf.IdleTimeout != nil && *conf.IdleTimeout > 0 {
srv.IdleTimeout = time.Duration(*conf.IdleTimeout) * time.Second
}
data["error"] = err.Error()
return data
}
if conf.MaxHeaderBytes != nil && *conf.MaxHeaderBytes > 0 {
srv.MaxHeaderBytes = *conf.MaxHeaderBytes
}
srv.Handler = handler
return &srv
}

0 comments on commit b3e926a

Please sign in to comment.