Skip to content

Commit

Permalink
feat: [wip] - Implement html node
Browse files Browse the repository at this point in the history
  • Loading branch information
sujit-baniya committed Nov 24, 2024
1 parent bd778d8 commit 6541c64
Show file tree
Hide file tree
Showing 7 changed files with 326 additions and 44 deletions.
32 changes: 0 additions & 32 deletions dag/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"encoding/json"
"fmt"
"net/http"
"os"
"strings"

"github.com/oarkflow/mq"
Expand Down Expand Up @@ -119,34 +118,3 @@ func (tm *DAG) SetupWS() *sio.Server {
tm.Notifier = ws
return ws
}

func (tm *DAG) Handlers() {
http.Handle("/", http.FileServer(http.Dir("webroot")))
http.Handle("/notify", tm.SetupWS())
http.HandleFunc("/process", tm.render)
http.HandleFunc("/request", tm.render)
http.HandleFunc("/task/status", tm.taskStatusHandler)
http.HandleFunc("/dot", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain")
fmt.Fprintln(w, tm.ExportDOT())
})
http.HandleFunc("/ui", func(w http.ResponseWriter, r *http.Request) {
image := fmt.Sprintf("%s.svg", mq.NewID())
err := tm.SaveSVG(image)
if err != nil {
http.Error(w, "Failed to read request body", http.StatusBadRequest)
return
}
defer os.Remove(image)
svgBytes, err := os.ReadFile(image)
if err != nil {
http.Error(w, "Could not read SVG file", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "image/svg+xml")
if _, err := w.Write(svgBytes); err != nil {
http.Error(w, "Could not write SVG response", http.StatusInternalServerError)
return
}
})
}
17 changes: 8 additions & 9 deletions dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ import (
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
"time"

"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/recover"
"golang.org/x/time/rate"

"github.com/oarkflow/mq/sio"
Expand Down Expand Up @@ -360,14 +361,12 @@ func (tm *DAG) Start(ctx context.Context, addr string) error {
return true
})
}
log.Printf("DAG - HTTP_SERVER ~> started on http://%s", addr)
tm.Handlers()
config := tm.server.TLSConfig()
log.Printf("Server listening on http://%s", addr)
if config.UseTLS {
return http.ListenAndServeTLS(addr, config.CertPath, config.KeyPath, nil)
}
return http.ListenAndServe(addr, nil)
app := fiber.New()
app.Use(recover.New(recover.Config{
EnableStackTrace: true,
}))
tm.Handlers(app)
return app.Listen(addr)
}

func (tm *DAG) ScheduleTask(ctx context.Context, payload []byte, opts ...mq.SchedulerOption) mq.Result {
Expand Down
283 changes: 283 additions & 0 deletions dag/fiber_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,283 @@
package dag

import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"os"
"strings"

"github.com/gofiber/fiber/v2"
"github.com/oarkflow/errors"

"github.com/oarkflow/mq"

"github.com/oarkflow/mq/consts"
"github.com/oarkflow/mq/jsonparser"
)

// RenderNotFound handles 404 errors.
func renderFiberNotFound(c *fiber.Ctx) error {
html := `
<div>
<h1>task not found</h1>
<p><a href="/process">Back to home</a></p>
</div>
`
c.Set(fiber.HeaderContentType, fiber.MIMETextHTMLCharsetUTF8)
return c.Status(fiber.StatusNotFound).SendString(html)
}

// Render handles process and request routes.
func (tm *DAG) renderFiber(c *fiber.Ctx) error {
ctx, data, err := parseRequest(c)
if err != nil {
return c.Status(fiber.StatusNotFound).SendString(err.Error())
}
accept := c.Get("Accept")
userCtx := UserContext(ctx)
ctx = context.WithValue(ctx, "method", c.Method())

if c.Method() == fiber.MethodGet && userCtx.Get("task_id") != "" {
manager, ok := tm.taskManager.Get(userCtx.Get("task_id"))
if !ok || manager == nil {
if strings.Contains(accept, fiber.MIMETextHTML) || accept == "" {
return renderFiberNotFound(c)
}
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"message": "task not found"})
}
}

result := tm.Process(ctx, data)
if result.Error != nil {
return c.Status(fiber.StatusInternalServerError).JSON(fiber.Map{"message": result.Error.Error()})
}

contentType := consts.TypeJson
if ct, ok := result.Ctx.Value(consts.ContentType).(string); ok {
contentType = ct
}

switch contentType {
case consts.TypeHtml:
htmlContent, err := jsonparser.GetString(result.Payload, "html_content")
if err != nil {
return err
}
c.Set(fiber.HeaderContentType, fiber.MIMETextHTMLCharsetUTF8)
return c.SendString(htmlContent)
default:
if c.Method() != fiber.MethodPost {
return c.Status(fiber.StatusMethodNotAllowed).JSON(fiber.Map{"message": "not allowed"})
}
c.Set(fiber.HeaderContentType, fiber.MIMEApplicationJSON)
return c.JSON(result.Payload)
}
}

// TaskStatusHandler retrieves task statuses.
func (tm *DAG) fiberTaskStatusHandler(c *fiber.Ctx) error {
taskID := c.Query("taskID")
if taskID == "" {
return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{"message": "taskID is missing"})
}

manager, ok := tm.taskManager.Get(taskID)
if !ok {
return c.Status(fiber.StatusNotFound).JSON(fiber.Map{"message": "Invalid TaskID"})
}

result := make(map[string]TaskState)
manager.taskStates.ForEach(func(key string, value *TaskState) bool {
key = strings.Split(key, Delimiter)[0]
nodeID := strings.Split(value.NodeID, Delimiter)[0]
rs := jsonparser.Delete(value.Result.Payload, "html_content")
status := value.Status
if status == mq.Processing {
status = mq.Completed
}
state := TaskState{
NodeID: nodeID,
Status: status,
UpdatedAt: value.UpdatedAt,
Result: mq.Result{
Payload: rs,
Error: value.Result.Error,
Status: status,
},
}
result[key] = state
return true
})
return c.Type(fiber.MIMEApplicationJSON).JSON(result)
}

// Handlers initializes route handlers.
func (tm *DAG) Handlers(app any) {
switch a := app.(type) {
case fiber.Router:
a.All("/process", tm.renderFiber)
a.Get("/request", tm.renderFiber)
a.Get("/task/status", tm.fiberTaskStatusHandler)
a.Get("/dot", func(c *fiber.Ctx) error {
return c.Type(fiber.MIMETextPlain).SendString(tm.ExportDOT())
})
a.Get("/ui", func(c *fiber.Ctx) error {
image := fmt.Sprintf("%s.svg", mq.NewID())
defer os.Remove(image)

if err := tm.SaveSVG(image); err != nil {
return c.Status(fiber.StatusBadRequest).SendString("Failed to read request body")
}

svgBytes, err := os.ReadFile(image)
if err != nil {
return c.Status(fiber.StatusInternalServerError).SendString("Could not read SVG file")
}
c.Set(fiber.HeaderContentType, "image/svg+xml")
return c.Send(svgBytes)
})
default:
http.Handle("/", http.FileServer(http.Dir("webroot")))
http.Handle("/notify", tm.SetupWS())
http.HandleFunc("/process", tm.render)
http.HandleFunc("/request", tm.render)
http.HandleFunc("/task/status", tm.taskStatusHandler)
http.HandleFunc("/dot", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain")
fmt.Fprintln(w, tm.ExportDOT())
})
http.HandleFunc("/ui", func(w http.ResponseWriter, r *http.Request) {
image := fmt.Sprintf("%s.svg", mq.NewID())
err := tm.SaveSVG(image)
if err != nil {
http.Error(w, "Failed to read request body", http.StatusBadRequest)
return
}
defer os.Remove(image)
svgBytes, err := os.ReadFile(image)
if err != nil {
http.Error(w, "Could not read SVG file", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "image/svg+xml")
if _, err := w.Write(svgBytes); err != nil {
http.Error(w, "Could not write SVG response", http.StatusInternalServerError)
return
}
})
}
}

// parseRequest handles Fiber requests and extracts context and JSON payload.
func parseRequest(c *fiber.Ctx) (context.Context, []byte, error) {
ctx := c.UserContext()
userContext := &Context{Query: make(map[string]any)}
queryParams := c.Queries()
for key, value := range queryParams {
userContext.Query[key] = value
}
ctx = context.WithValue(ctx, "UserContext", userContext)

contentType := c.Get("Content-Type")
var result any

switch {
case strings.Contains(contentType, fiber.MIMEApplicationJSON):
body := c.Body()
if len(body) == 0 {
return ctx, nil, nil
}
var temp any
if err := json.Unmarshal(body, &temp); err != nil {
return ctx, nil, fmt.Errorf("failed to parse body: %v", err)
}
switch v := temp.(type) {
case map[string]any:
result = v
case []any:
parsedArray := make([]map[string]any, len(v))
for i, item := range v {
obj, ok := item.(map[string]any)
if !ok {
return ctx, nil, fmt.Errorf("invalid JSON array item at index %d", i)
}
parsedArray[i] = obj
}
result = parsedArray
default:
return ctx, nil, fmt.Errorf("unsupported JSON structure: %T", v)
}
case strings.Contains(contentType, fiber.MIMEApplicationForm):
body := c.BodyRaw()
if body == nil {
return ctx, nil, errors.New("empty form body")
}
formData := ParseFormToMap(body, &userContext.Query)
if formData != nil {
return ctx, nil, fmt.Errorf("failed to parse form data: %v", formData)
}
result = userContext.Query
default:
return ctx, nil, nil
}

bt, err := json.Marshal(result)
if err != nil {
return ctx, nil, err
}

return ctx, bt, nil
}

// ParseFormToMap parses form-encoded data into the provided map
func ParseFormToMap(body []byte, data *map[string]any) error {
if data == nil {
return errors.New("data map is nil")
}
if len(body) == 0 {
return errors.New("empty form body")
}
start := 0
for i := 0; i <= len(body); i++ {
if i == len(body) || body[i] == '&' {
if err := processPair(body[start:i], data); err != nil {
return err
}
start = i + 1
}
}
return nil
}

// processPair processes a key-value pair and inserts it into the map
func processPair(pair []byte, data *map[string]any) error {
if len(pair) == 0 {
return nil // Ignore empty pairs
}
eqIndex := bytes.IndexByte(pair, '=')
if eqIndex == -1 {
return errors.New("malformed key-value pair")
}

// Extract key and value
key := pair[:eqIndex]
value := pair[eqIndex+1:]

// Decode key and value (zero-allocation alternatives can replace this)
decodedKey, err := url.QueryUnescape(string(key))
if err != nil {
return err
}
decodedValue, err := url.QueryUnescape(string(value))
if err != nil {
return err
}

// Insert into the map
(*data)[decodedKey] = decodedValue
return nil
}
1 change: 0 additions & 1 deletion dag/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ func (tm *TaskManager) updateTimestamps(rs *mq.Result) {
rs.CreatedAt = tm.createdAt
rs.ProcessedAt = time.Now()
rs.Latency = time.Since(rs.CreatedAt).String()
fmt.Println(rs.Latency)
}

func (tm *TaskManager) handlePrevious(ctx context.Context, state *TaskState, result mq.Result, childNode string, dispatchFinal bool) {
Expand Down
4 changes: 2 additions & 2 deletions examples/webroot/js/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
}

window.onload = function() {
loadSVG('http://localhost:8082/ui');
loadSVG('/ui');
};
document.getElementById('send-request').addEventListener('click', function() {
const input = document.getElementById('payload');
const payloadData = JSON.parse(input.value);
const data = { payload: payloadData };

fetch('http://localhost:8082/request', {
fetch('/request', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
Expand Down
Loading

0 comments on commit 6541c64

Please sign in to comment.