diff --git a/bytes_writer.go b/bytes_writer.go index a6c0cbb..b27c8eb 100644 --- a/bytes_writer.go +++ b/bytes_writer.go @@ -1,6 +1,9 @@ package mq -import "context" +import ( + "context" + "github.com/pkg/errors" +) type BytesWriter struct { Send func(ctx context.Context, data []byte, attributes map[string]string) (string, error) @@ -8,8 +11,12 @@ type BytesWriter struct { func NewBytesWriter(send func(ctx context.Context, data []byte, attributes map[string]string) (string, error)) *BytesWriter { return &BytesWriter{Send: send} } -func (w *BytesWriter) Write(ctx context.Context, model interface{}) error { - data := model.([]byte) - _, err := w.Send(ctx, data, nil) - return err +func (w *BytesWriter) Write(ctx context.Context, data interface{}) error { + d, ok := data.([]byte) + if !ok { + return errors.New("data must be byte array ([]byte)") + } else { + _, err := w.Send(ctx, d, nil) + return err + } } diff --git a/handler/handler.go b/handler/handler.go index 6fbf75a..47d23cc 100644 --- a/handler/handler.go +++ b/handler/handler.go @@ -3,7 +3,6 @@ package handler import ( "context" "encoding/json" - "fmt" "io/ioutil" "net/http" ) @@ -26,7 +25,7 @@ func (h *SenderHandler) Receive(w http.ResponseWriter, r *http.Request) { } l := len(h.Send) if l == 0 { - respond(w, h.Response) + respond(w, http.StatusOK, h.Response) return } var result string @@ -39,16 +38,14 @@ func (h *SenderHandler) Receive(w http.ResponseWriter, r *http.Request) { } } if len(h.Response) == 0 { - respond(w, result) + respond(w, http.StatusOK, result) } else { - respond(w, h.Response) + respond(w, http.StatusOK, h.Response) } } -func respond(w http.ResponseWriter, result interface{}) { - w.WriteHeader(http.StatusOK) +func respond(w http.ResponseWriter, status int, result interface{}) error { + w.WriteHeader(status) w.Header().Set("Content-Type", "application/json") err := json.NewEncoder(w).Encode(result) - if err != nil { - fmt.Println(err) - } + return err } diff --git a/ibm-mq/health_checker.go b/ibm-mq/health_checker.go index 07cfbcf..2774088 100644 --- a/ibm-mq/health_checker.go +++ b/ibm-mq/health_checker.go @@ -1,97 +1,82 @@ -package ibmmq +package server import ( - "context" - "github.com/ibm-messaging/mq-golang/v5/ibmmq" + "crypto/tls" + "log" + "net/http" + "strconv" "time" ) -type HealthChecker struct { - name string - queueManager *ibmmq.MQQueueManager - topic string - timeout time.Duration - queue *QueueConfig - auth *MQAuth +type ServerConf struct { + Name string `mapstructure:"name" json:"name,omitempty" gorm:"column:name" bson:"name,omitempty" dynamodbav:"name,omitempty" firestore:"name,omitempty"` + Version string `mapstructure:"version" json:"version,omitempty" gorm:"column:version" bson:"version,omitempty" dynamodbav:"version,omitempty" firestore:"version,omitempty"` + Port *int64 `mapstructure:"port" json:"port,omitempty" gorm:"column:port" bson:"port,omitempty" dynamodbav:"port,omitempty" firestore:"port,omitempty"` + WriteTimeout *int64 `mapstructure:"write_timeout" json:"writeTimeout,omitempty" gorm:"column:writetimeout" bson:"writeTimeout,omitempty" dynamodbav:"writeTimeout,omitempty" firestore:"writeTimeout,omitempty"` + ReadTimeout *int64 `mapstructure:"read_timeout" json:"readTimeout,omitempty" gorm:"column:readtimeout" bson:"readTimeout,omitempty" dynamodbav:"readTimeout,omitempty" firestore:"readTimeout,omitempty"` + ReadHeaderTimeout *int64 `mapstructure:"read_header_timeout" json:"readHeaderTimeout,omitempty" gorm:"column:readheadertimeout" bson:"readHeaderTimeout,omitempty" dynamodbav:"readHeaderTimeout,omitempty" firestore:"readHeaderTimeout,omitempty"` + IdleTimeout *int64 `mapstructure:"idle_timeout" json:"idleTimeout,omitempty" gorm:"column:idletimeout" bson:"idleTimeout,omitempty" dynamodbav:"idleTimeout,omitempty" firestore:"idleTimeout,omitempty"` + MaxHeaderBytes *int `mapstructure:"max_header_bytes" json:"maxHeaderBytes,omitempty" gorm:"column:maxheaderbytes" bson:"maxHeaderBytes,omitempty" dynamodbav:"maxHeaderBytes,omitempty" firestore:"maxHeaderBytes,omitempty"` } -var qObject ibmmq.MQObject - -func NewHealthCheckerByConfig(queue *QueueConfig, auth *MQAuth, topic string, options ...string) *HealthChecker { - var name string - if len(options) >= 1 { - name = options[0] - } else { - name = "ibmmq" +func Addr(port *int64) string { + server := "" + if port != nil && *port >= 0 { + server = ":" + strconv.FormatInt(*port, 10) } - return NewIBMMQHealthCheckerByConfig(queue, auth, topic, name, 4*time.Second) + return server } -func NewHealthChecker(connection *ibmmq.MQQueueManager, topic string, options ...string) *HealthChecker { - var name string - if len(options) >= 1 { - name = options[0] +func ServerInfo(conf ServerConf) string { + if len(conf.Version) > 0 { + if conf.Port != nil && *conf.Port >= 0 { + return "Start service: " + conf.Name + " at port " + strconv.FormatInt(*conf.Port, 10) + " with version " + conf.Version + } else { + return "Start service: " + conf.Name + " with version " + conf.Version + } } else { - name = "ibmmq" + if conf.Port != nil && *conf.Port >= 0 { + return "Start service: " + conf.Name + " at port " + strconv.FormatInt(*conf.Port, 10) + } else { + return "Start service: " + conf.Name + } } - return NewHealthCheckerWithTimeout(connection, topic, name, 4*time.Second) } - -func NewHealthCheckerWithTimeout(connection *ibmmq.MQQueueManager, topic string, name string, timeouts ...time.Duration) *HealthChecker { - var timeout time.Duration - if len(timeouts) >= 1 { - timeout = timeouts[0] - } else { - timeout = 4 * time.Second +func Serve(conf ServerConf, check func(w http.ResponseWriter, r *http.Request), options ...*tls.Config) { + log.Println(ServerInfo(conf)) + http.HandleFunc("/health", check) + http.HandleFunc("/", check) + srv := CreateServer(conf, nil, options...) + err := srv.ListenAndServe() + if err != nil { + log.Println(err.Error()) + panic(err) } - return &HealthChecker{name: name, queueManager: connection, topic: topic, timeout: timeout} } - -func NewIBMMQHealthCheckerByConfig(queue *QueueConfig, auth *MQAuth, topic string, name string, timeouts ...time.Duration) *HealthChecker { - var timeout time.Duration - if len(timeouts) >= 1 { - timeout = timeouts[0] - } else { - timeout = 4 * time.Second +func CreateServer(conf ServerConf, handler http.Handler, options ...*tls.Config) *http.Server { + addr := Addr(conf.Port) + srv := http.Server{ + Addr: addr, + Handler: nil, + TLSConfig: nil, } - return &HealthChecker{name: name, queue: queue, auth: auth, topic: topic, timeout: timeout} -} - -func (s *HealthChecker) Name() string { - return s.name -} - -func (s *HealthChecker) Check(ctx context.Context) (map[string]interface{}, error) { - res := make(map[string]interface{}) - if s.queueManager == nil { - conn, err := NewQueueManagerByConfig(*s.queue, *s.auth) - if err != nil { - return res, err - } - s.queueManager = conn + if len(options) > 0 && options[0] != nil { + srv.TLSConfig = options[0] } - sd := ibmmq.NewMQSD() - sd.Options = ibmmq.MQSO_CREATE | - ibmmq.MQSO_NON_DURABLE | - ibmmq.MQSO_MANAGED - sd.ObjectString = s.topic - subscriptionObject, err := s.queueManager.Sub(sd, &qObject) - if err != nil { - return res, err + if conf.ReadTimeout != nil && *conf.ReadTimeout > 0 { + srv.ReadTimeout = time.Duration(*conf.ReadTimeout) * time.Second } - err = subscriptionObject.Close(0) - if err != nil { - return res, err + if conf.ReadHeaderTimeout != nil && *conf.ReadHeaderTimeout > 0 { + srv.ReadHeaderTimeout = time.Duration(*conf.ReadHeaderTimeout) * time.Second } - return res, nil -} - -func (s *HealthChecker) Build(ctx context.Context, data map[string]interface{}, err error) map[string]interface{} { - if err == nil { - return data + if conf.WriteTimeout != nil && *conf.WriteTimeout > 0 { + srv.WriteTimeout = time.Duration(*conf.WriteTimeout) * time.Second } - if data == nil { - return make(map[string]interface{}, 0) + if conf.IdleTimeout != nil && *conf.IdleTimeout > 0 { + srv.IdleTimeout = time.Duration(*conf.IdleTimeout) * time.Second } - data["error"] = err.Error() - return data -} \ No newline at end of file + if conf.MaxHeaderBytes != nil && *conf.MaxHeaderBytes > 0 { + srv.MaxHeaderBytes = *conf.MaxHeaderBytes + } + srv.Handler = handler + return &srv +}