-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathblob_handler.go
123 lines (109 loc) · 3.33 KB
/
blob_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package api
import (
"fmt"
"io"
"log/slog"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"time"
"github.com/buildbarn/bb-portal/ent/gen/ent"
"github.com/buildbarn/bb-portal/ent/gen/ent/blob"
"github.com/buildbarn/bb-portal/pkg/cas"
)
// A struct to handle blobs.
type blobHandler struct {
client *ent.Client
casManager *cas.ConnectionManager
}
// NewBlobHandler Constructor functio for a blob hanlder.
func NewBlobHandler(client *ent.Client, casManager *cas.ConnectionManager) http.Handler {
return &blobHandler{client: client, casManager: casManager}
}
// ServeHTTP Serve this over http.
func (b *blobHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
blobIDPathValue := request.PathValue("blobID")
name := request.PathValue("name")
blobID, err := strconv.Atoi(blobIDPathValue)
if err != nil {
writeErr(writer, request, http.StatusBadRequest, fmt.Sprintf("Invalid blobID: %s", blobIDPathValue))
return
}
// TODO: We probably want semantic IDs, not row IDs.
blobRecord, err := b.client.Blob.Get(request.Context(), blobID)
if err != nil {
writeErr(
writer,
request,
http.StatusNotFound,
fmt.Sprintf("Could not find blob with blobID: %s", blobIDPathValue),
)
return
}
b.serveBlob(writer, request, name, blobRecord)
}
// Serve a blob.
func (b *blobHandler) serveBlob(writer http.ResponseWriter, request *http.Request, name string, blobRecord *ent.Blob) {
if blobRecord.ArchivingStatus == blob.ArchivingStatusSUCCESS {
http.ServeFile(writer, request, blobRecord.ArchiveURL)
return
}
// Fallback to reading original.
uri, err := url.Parse(blobRecord.URI)
if err != nil {
writeErr(
writer,
request,
http.StatusInternalServerError,
fmt.Sprintf("Blob %d had an invalid URI: %s", blobRecord.ID, blobRecord.URI),
)
return
}
switch uri.Scheme {
case "file":
http.ServeFile(writer, request, uri.Path)
case "bytestream":
b.serveFromBytestream(writer, request, name, uri)
default:
writeErr(writer, request, http.StatusInternalServerError, fmt.Sprintf("unsupported URI scheme: %s", uri.Scheme))
}
}
// Serve from bytestream function.
func (b *blobHandler) serveFromBytestream(writer http.ResponseWriter, request *http.Request, name string, uri *url.URL) {
casClient, err := b.casManager.GetClientForURI(request.Context(), uri)
if err != nil {
writeErr(writer, request, http.StatusInternalServerError, err.Error())
return
}
defer casClient.Close()
tmpFile, err := os.CreateTemp("", filepath.Base(uri.Path))
if err != nil {
writeErr(writer, request, http.StatusInternalServerError, err.Error())
return
}
defer tmpFile.Close()
defer os.Remove(tmpFile.Name())
err = casClient.ReadBlobToFile(request.Context(), uri, tmpFile.Name())
if err != nil {
writeErr(writer, request, http.StatusInternalServerError, err.Error())
return
}
if _, err = tmpFile.Seek(0, io.SeekStart); err != nil {
writeErr(writer, request, http.StatusInternalServerError, err.Error())
return
}
http.ServeContent(writer, request, name, time.Time{}, tmpFile)
}
// A function to write an error.
func writeErr(writer http.ResponseWriter, request *http.Request, statusCode int, msg string) {
writer.WriteHeader(statusCode)
if _, err := writer.Write([]byte(msg)); err != nil {
slog.ErrorContext(
request.Context(),
"could not write response",
"statusCode", statusCode, "msg", msg,
)
}
}