Skip to content

Commit

Permalink
Streamline getobject (#1085)
Browse files Browse the repository at this point in the history
  • Loading branch information
roman-khimov authored Feb 27, 2025
2 parents 30f920c + 49be936 commit 91bb004
Show file tree
Hide file tree
Showing 13 changed files with 491 additions and 159 deletions.
1 change: 1 addition & 0 deletions api/data/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ type (
Owner user.ID
OwnerPublicKey keys.PublicKey
Headers map[string]string
Version string
}

// ObjectListResponseContent holds response data for object listing.
Expand Down
7 changes: 7 additions & 0 deletions api/data/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ type ExtendedObjectInfo struct {
IsLatest bool
}

// ComprehensiveObjectInfo represents metasearch result for object, with tags and lock data.
type ComprehensiveObjectInfo struct {
ID oid.ID
TagSet map[string]string
LockInfo *LockInfo
}

func (e ExtendedObjectInfo) Version() string {
if e.NodeVersion.IsUnversioned {
return UnversionedObjectVersionID
Expand Down
82 changes: 52 additions & 30 deletions api/handler/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package handler

import (
"fmt"
"io"
"net/http"
"net/url"
"strconv"
Expand All @@ -22,6 +23,10 @@ type conditionalArgs struct {
IfNoneMatch string
}

const (
defaultBufferSize = 128 * 1024
)

func fetchRangeHeader(headers http.Header, fullSize uint64) (*layer.RangeParams, error) {
const prefix = "bytes="
rangeHeader := headers.Get("Range")
Expand Down Expand Up @@ -77,8 +82,7 @@ func addSSECHeaders(responseHeader http.Header, requestHeader http.Header) {
responseHeader.Set(api.AmzServerSideEncryptionCustomerKeyMD5, requestHeader.Get(api.AmzServerSideEncryptionCustomerKeyMD5))
}

func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.ExtendedObjectInfo, tagSetLength int, isBucketUnversioned bool) {
info := extendedInfo.ObjectInfo
func writeHeaders(h http.Header, requestHeader http.Header, info *data.ObjectInfo, tagSetLength int, isBucketUnversioned bool) {
if len(info.ContentType) > 0 && h.Get(api.ContentType) == "" {
h.Set(api.ContentType, info.ContentType)
}
Expand All @@ -95,7 +99,7 @@ func writeHeaders(h http.Header, requestHeader http.Header, extendedInfo *data.E
h.Set(api.AmzTaggingCount, strconv.Itoa(tagSetLength))

if !isBucketUnversioned {
h.Set(api.AmzVersionID, extendedInfo.Version())
h.Set(api.AmzVersionID, info.Version)
}

if cacheControl := info.Headers[api.CacheControl]; cacheControl != "" {
Expand Down Expand Up @@ -138,12 +142,24 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
}

extendedInfo, err := h.obj.GetExtendedObjectInfo(r.Context(), p)
comprehensiveObjectInfo, err := h.obj.ComprehensiveObjectInfo(r.Context(), p)
if err != nil {
h.logAndSendError(w, "could not find object", reqInfo, err)
return
}
info := extendedInfo.ObjectInfo

objectWithPayloadReader, err := h.obj.GetObjectWithPayloadReader(r.Context(), &layer.GetObjectWithPayloadReaderParams{
Owner: bktInfo.Owner,
BktInfo: bktInfo,
Object: comprehensiveObjectInfo.ID,
})

if err != nil {
h.logAndSendError(w, "could not get object meta", reqInfo, err)
return
}

info := objectWithPayloadReader.ObjectInfo

if err = checkPreconditions(info, conditional); err != nil {
h.logAndSendError(w, "precondition failed", reqInfo, err)
Expand Down Expand Up @@ -180,46 +196,52 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}

t := &layer.ObjectVersion{
BktInfo: bktInfo,
ObjectName: info.Name,
}

if bktSettings.VersioningEnabled() {
t.VersionID = info.VersionID()
}

tagSet, lockInfo, err := h.obj.GetObjectTaggingAndLock(r.Context(), t, extendedInfo.NodeVersion)
if err != nil && !s3errors.IsS3Error(err, s3errors.ErrNoSuchKey) {
h.logAndSendError(w, "could not get object meta data", reqInfo, err)
return
}

if layer.IsAuthenticatedRequest(r.Context()) {
overrideResponseHeaders(w.Header(), reqInfo.URL.Query())
}

if err = h.setLockingHeaders(bktInfo, lockInfo, w.Header()); err != nil {
if err = h.setLockingHeaders(bktInfo, comprehensiveObjectInfo.LockInfo, w.Header()); err != nil {
h.logAndSendError(w, "could not get locking info", reqInfo, err)
return
}

writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned())
writeHeaders(w.Header(), r.Header, info, len(comprehensiveObjectInfo.TagSet), bktSettings.Unversioned())
if params != nil {
writeRangeHeaders(w, params, info.Size)
} else {
w.WriteHeader(http.StatusOK)
}

getParams := &layer.GetObjectParams{
ObjectInfo: info,
Writer: w,
Range: params,
BucketInfo: bktInfo,
Encryption: encryptionParams,
if params != nil || encryptionParams.Enabled() {
// unfortunately this reader is useless for us in this case, we have to re-read another one.
_ = objectWithPayloadReader.Payload.Close()

getParams := &layer.GetObjectParams{
ObjectInfo: info,
Writer: w,
Range: params,
BucketInfo: bktInfo,
Encryption: encryptionParams,
}
if err = h.obj.GetObject(r.Context(), getParams); err != nil {
h.logAndSendError(w, "could not get object", reqInfo, err)
}

return
}

var bufferSize = min(info.Size, defaultBufferSize)
if bufferSize == 0 {
bufferSize = defaultBufferSize
}
if err = h.obj.GetObject(r.Context(), getParams); err != nil {
h.logAndSendError(w, "could not get object", reqInfo, err)

buf := make([]byte, bufferSize)
if _, err = io.CopyBuffer(w, objectWithPayloadReader.Payload, buf); err != nil {
h.logAndSendError(w, "could write object output", reqInfo, err)
}

if err = objectWithPayloadReader.Payload.Close(); err != nil {
h.logAndSendError(w, "close output", reqInfo, err)
}
}

Expand Down
4 changes: 2 additions & 2 deletions api/handler/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
t.VersionID = ""
}

tagSet, lockInfo, err := h.obj.GetObjectTaggingAndLock(r.Context(), t, extendedInfo.NodeVersion)
tagSet, lockInfo, err := h.obj.GetObjectTaggingAndLock(r.Context(), t)
if err != nil && !s3errors.IsS3Error(err, s3errors.ErrNoSuchKey) {
h.logAndSendError(w, "could not get object meta data", reqInfo, err)
return
Expand Down Expand Up @@ -113,7 +113,7 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}

writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned())
writeHeaders(w.Header(), r.Header, extendedInfo.ObjectInfo, len(tagSet), bktSettings.Unversioned())
w.WriteHeader(http.StatusOK)
}

Expand Down
2 changes: 1 addition & 1 deletion api/layer/compound.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/nspcc-dev/neofs-s3-gw/api/s3errors"
)

func (n *layer) GetObjectTaggingAndLock(ctx context.Context, objVersion *ObjectVersion, nodeVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) {
func (n *layer) GetObjectTaggingAndLock(ctx context.Context, objVersion *ObjectVersion) (map[string]string, *data.LockInfo, error) {
var err error
owner := n.Owner(ctx)

Expand Down
Loading

0 comments on commit 91bb004

Please sign in to comment.