diff --git a/api/handler/delete_test.go b/api/handler/delete_test.go index 8c1b9937..594a7f30 100644 --- a/api/handler/delete_test.go +++ b/api/handler/delete_test.go @@ -52,7 +52,7 @@ func TestDeleteObjectFromSuspended(t *testing.T) { putObject(t, tc, bktName, objName) versionID, isDeleteMarker := deleteObject(t, tc, bktName, objName, emptyVersion) - require.True(t, isDeleteMarker) + require.False(t, isDeleteMarker) require.Equal(t, data.UnversionedObjectVersionID, versionID) } @@ -200,7 +200,7 @@ func TestDeleteMarkers(t *testing.T) { require.Len(t, versions.DeleteMarker, 3, "invalid delete markers length") require.Len(t, versions.Version, 0, "versions must be empty") - require.Len(t, listOIDsFromMockedNeoFS(t, tc, bktName), 0, "shouldn't be any object in neofs") + require.Len(t, listOIDsFromMockedNeoFS(t, tc, bktName), 3, "should be all delete marker object in neofs") } func TestDeleteObjectFromListCache(t *testing.T) { @@ -237,7 +237,8 @@ func TestDeleteObjectCheckMarkerReturn(t *testing.T) { require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID) deleteMarkerVersion2, isDeleteMarker2 := deleteObject(t, tc, bktName, objName, deleteMarkerVersion) - require.True(t, isDeleteMarker2) + // deleting object with non-empty version - remove object from storage (even it is a delete marker). No additional markers. + require.False(t, isDeleteMarker2) versions = listVersions(t, tc, bktName) require.Len(t, versions.DeleteMarker, 0) require.Equal(t, deleteMarkerVersion, deleteMarkerVersion2) diff --git a/api/handler/get.go b/api/handler/get.go index 0e1b9c1d..8650e95a 100644 --- a/api/handler/get.go +++ b/api/handler/get.go @@ -174,10 +174,19 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) { return } + bktSettings, err := h.obj.GetBucketSettings(r.Context(), bktInfo) + if err != nil { + h.logAndSendError(w, "could not get bucket settings", reqInfo, err) + return + } + t := &layer.ObjectVersion{ BktInfo: bktInfo, ObjectName: info.Name, - VersionID: info.VersionID(), + } + + if bktSettings.VersioningEnabled() { + t.VersionID = info.VersionID() } tagSet, lockInfo, err := h.obj.GetObjectTaggingAndLock(r.Context(), t, extendedInfo.NodeVersion) @@ -195,12 +204,6 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) { return } - bktSettings, err := h.obj.GetBucketSettings(r.Context(), bktInfo) - if err != nil { - h.logAndSendError(w, "could not get bucket settings", reqInfo, err) - return - } - writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned()) if params != nil { writeRangeHeaders(w, params, info.Size) diff --git a/api/handler/locking.go b/api/handler/locking.go index 1d8c867d..81cf46e3 100644 --- a/api/handler/locking.go +++ b/api/handler/locking.go @@ -152,6 +152,27 @@ func (h *handler) PutObjectLegalHoldHandler(w http.ResponseWriter, r *http.Reque CopiesNumber: h.cfg.CopiesNumber, } + settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo) + if err != nil { + h.logAndSendError(w, "could not get bucket settings", reqInfo, err) + return + } + + if settings.VersioningEnabled() && p.ObjVersion.VersionID == "" { + headObjectPrm := &layer.HeadObjectParams{ + BktInfo: bktInfo, + Object: reqInfo.ObjectName, + } + + ei, err := h.obj.GetExtendedObjectInfo(r.Context(), headObjectPrm) + if err != nil { + h.logAndSendError(w, "could not find object", reqInfo, err) + return + } + + p.ObjVersion.VersionID = ei.ObjectInfo.VersionID() + } + if err = h.obj.PutLockInfo(r.Context(), p); err != nil { h.logAndSendError(w, "couldn't head put legal hold", reqInfo, err) return @@ -231,6 +252,27 @@ func (h *handler) PutObjectRetentionHandler(w http.ResponseWriter, r *http.Reque CopiesNumber: h.cfg.CopiesNumber, } + settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo) + if err != nil { + h.logAndSendError(w, "could not get bucket settings", reqInfo, err) + return + } + + if settings.VersioningEnabled() && p.ObjVersion.VersionID == "" { + headObjectPrm := &layer.HeadObjectParams{ + BktInfo: bktInfo, + Object: reqInfo.ObjectName, + } + + ei, err := h.obj.GetExtendedObjectInfo(r.Context(), headObjectPrm) + if err != nil { + h.logAndSendError(w, "could not find object", reqInfo, err) + return + } + + p.ObjVersion.VersionID = ei.ObjectInfo.VersionID() + } + if err = h.obj.PutLockInfo(r.Context(), p); err != nil { h.logAndSendError(w, "couldn't put legal hold", reqInfo, err) return @@ -252,12 +294,33 @@ func (h *handler) GetObjectRetentionHandler(w http.ResponseWriter, r *http.Reque return } + settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo) + if err != nil { + h.logAndSendError(w, "could not get bucket settings", reqInfo, err) + return + } + p := &layer.ObjectVersion{ BktInfo: bktInfo, ObjectName: reqInfo.ObjectName, VersionID: reqInfo.URL.Query().Get(api.QueryVersionID), } + if settings.VersioningEnabled() && p.VersionID == "" { + headObjectPrm := &layer.HeadObjectParams{ + BktInfo: bktInfo, + Object: reqInfo.ObjectName, + } + + ei, err := h.obj.GetExtendedObjectInfo(r.Context(), headObjectPrm) + if err != nil { + h.logAndSendError(w, "could not find object", reqInfo, err) + return + } + + p.VersionID = ei.ObjectInfo.VersionID() + } + lockInfo, err := h.obj.GetLockInfo(r.Context(), p) if err != nil { h.logAndSendError(w, "couldn't head lock object", reqInfo, err) diff --git a/api/layer/compound.go b/api/layer/compound.go index adc43206..f714d09d 100644 --- a/api/layer/compound.go +++ b/api/layer/compound.go @@ -26,7 +26,7 @@ func (n *layer) GetObjectTaggingAndLock(ctx context.Context, objVersion *ObjectV } } - tags, lockInfo, err = n.treeService.GetObjectTaggingAndLock(ctx, objVersion.BktInfo, nodeVersion) + tags, _, err = n.treeService.GetObjectTaggingAndLock(ctx, objVersion.BktInfo, nodeVersion) if err != nil { if errorsStd.Is(err, ErrNodeNotFound) { return nil, nil, s3errors.GetAPIError(s3errors.ErrNoSuchKey) @@ -34,8 +34,19 @@ func (n *layer) GetObjectTaggingAndLock(ctx context.Context, objVersion *ObjectV return nil, nil, err } + lockInfo, err = n.getLockDataFromObjects(ctx, objVersion.BktInfo, objVersion.ObjectName, objVersion.VersionID) + if err != nil { + return nil, nil, err + } + n.cache.PutTagging(owner, objectTaggingCacheKey(objVersion), tags) - n.cache.PutLockInfo(owner, lockObjectKey(objVersion), lockInfo) + if lockInfo != nil { + if !lockInfo.LegalHold().IsZero() || lockInfo.Retention().IsZero() { + n.cache.PutLockInfo(owner, lockObjectKey(objVersion), lockInfo) + } + } else { + lockInfo = &data.LockInfo{} + } return tags, lockInfo, nil } diff --git a/api/layer/layer.go b/api/layer/layer.go index 851d494f..2be464fb 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -1,8 +1,8 @@ package layer import ( + "bytes" "context" - "crypto/rand" "errors" "fmt" "io" @@ -19,9 +19,11 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/layer/encryption" "github.com/nspcc-dev/neofs-s3-gw/api/s3errors" "github.com/nspcc-dev/neofs-s3-gw/creds/accessbox" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/eacl" "github.com/nspcc-dev/neofs-sdk-go/netmap" + "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/session" "github.com/nspcc-dev/neofs-sdk-go/user" @@ -73,9 +75,10 @@ type ( // HeadObjectParams stores object head request parameters. HeadObjectParams struct { - BktInfo *data.BucketInfo - Object string - VersionID string + BktInfo *data.BucketInfo + Object string + VersionID string + IsBucketVersioningEnabled bool } // ObjectVersion stores object version info. @@ -506,10 +509,17 @@ func (n *layer) GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.O func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ExtendedObjectInfo, error) { var objInfo *data.ExtendedObjectInfo var err error + var settings *data.BucketSettings if len(p.VersionID) == 0 { objInfo, err = n.headLastVersionIfNotDeleted(ctx, p.BktInfo, p.Object) } else { + settings, err = n.GetBucketSettings(ctx, p.BktInfo) + if err != nil { + return nil, fmt.Errorf("get bucket settings: %w", err) + } + + p.IsBucketVersioningEnabled = settings.VersioningEnabled() objInfo, err = n.headVersion(ctx, p.BktInfo, p) } if err != nil { @@ -556,74 +566,108 @@ func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.Exte }) } -func getRandomOID() (oid.ID, error) { - b := [32]byte{} - if _, err := rand.Read(b[:]); err != nil { - return oid.ID{}, err - } - - var objID oid.ID - objID.SetSHA256(b) - return objID, nil -} - func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject) *VersionedObject { - if len(obj.VersionID) != 0 || settings.Unversioned() { - var nodeVersion *data.NodeVersion - if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil { - return dismissNotFoundError(obj) - } - - if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil { - if strings.Contains(obj.Error.Error(), "2050 message = object is locked") { - return obj + if settings.VersioningEnabled() { + if len(obj.VersionID) > 0 { + var deleteOID oid.ID + + if obj.VersionID == data.UnversionedObjectVersionID { + versions, err := n.searchAllVersionsInNeoFS(ctx, bkt, bkt.Owner, obj.Name, true) + if err != nil { + obj.Error = fmt.Errorf("search versions: %w", err) + if errors.Is(err, ErrNodeNotFound) { + obj.Error = nil + } + + return obj + } + + if len(versions) == 0 { + obj.Error = nil + return obj + } + + for _, version := range versions { + if obj.Error = n.objectDelete(ctx, bkt, version.GetID()); obj.Error != nil { + return obj + } + } + } else { + if err := deleteOID.DecodeString(obj.VersionID); err != nil { + obj.Error = fmt.Errorf("decode version: %w", err) + return obj + } + + if obj.Error = n.objectDelete(ctx, bkt, deleteOID); obj.Error != nil { + return obj + } } - - n.log.Info("remove old version", zap.Error(obj.Error)) + } else { + var markerOID oid.ID + markerOID, obj.Error = n.putDeleteMarker(ctx, bkt, obj.Name) + obj.DeleteMarkVersion = markerOID.EncodeToString() } - obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeVersion.ID) - n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID) + n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name) return obj } - var newVersion *data.NodeVersion - if settings.VersioningSuspended() { obj.VersionID = data.UnversionedObjectVersionID - var nodeVersion *data.NodeVersion - if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil { - return dismissNotFoundError(obj) - } + versions, err := n.searchAllVersionsInNeoFS(ctx, bkt, bkt.Owner, obj.Name, true) + if err != nil { + if errors.Is(err, ErrNodeNotFound) { + obj.Error = nil + } else { + obj.Error = fmt.Errorf("search versions: %w", err) + } - if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil { return obj } + + for _, version := range versions { + if obj.Error = n.objectDelete(ctx, bkt, version.GetID()); obj.Error != nil { + return obj + } + } + + return obj } - randOID, err := getRandomOID() + versions, err := n.searchAllVersionsInNeoFS(ctx, bkt, bkt.Owner, obj.Name, false) if err != nil { - obj.Error = fmt.Errorf("couldn't get random oid: %w", err) + if errors.Is(err, ErrNodeNotFound) { + obj.Error = nil + } else { + obj.Error = fmt.Errorf("search versions: %w", err) + } + return obj } - obj.DeleteMarkVersion = randOID.EncodeToString() + if obj.VersionID == "" { + for _, ver := range versions { + if obj.Error = n.objectDelete(ctx, bkt, ver.GetID()); obj.Error != nil { + n.log.Error("could not delete object", zap.Error(obj.Error), zap.Stringer("oid", ver.GetID())) + if isErrObjectAlreadyRemoved(obj.Error) { + obj.Error = nil + continue + } - newVersion = &data.NodeVersion{ - BaseNodeVersion: data.BaseNodeVersion{ - OID: randOID, - FilePath: obj.Name, - }, - DeleteMarker: &data.DeleteMarkerInfo{ - Created: TimeNow(ctx), - Owner: n.Owner(ctx), - }, - IsUnversioned: settings.VersioningSuspended(), - } + return obj + } + } + } else { + for _, ver := range versions { + if ver.GetID().EncodeToString() == obj.VersionID { + if obj.Error = n.objectDelete(ctx, bkt, ver.GetID()); obj.Error != nil { + return obj + } - if _, obj.Error = n.treeService.AddVersion(ctx, bkt, newVersion); obj.Error != nil { - return obj + return obj + } + } } n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name) @@ -631,6 +675,19 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings return obj } +func isErrObjectAlreadyRemoved(err error) bool { + var ( + ol apistatus.ObjectAlreadyRemoved + olp *apistatus.ObjectAlreadyRemoved + ) + switch { + case errors.As(err, &ol), errors.As(err, &olp): + return true + default: + return strings.Contains(err.Error(), "object already removed") + } +} + func dismissNotFoundError(obj *VersionedObject) *VersionedObject { if s3errors.IsS3Error(obj.Error, s3errors.ErrNoSuchKey) || s3errors.IsS3Error(obj.Error, s3errors.ErrNoSuchVersion) { @@ -699,14 +756,59 @@ func (n *layer) ResolveBucket(ctx context.Context, name string) (cid.ID, error) } func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error { - nodeVersions, err := n.bucketNodeVersions(ctx, p.BktInfo, "") + objects, err := n.searchAllVersionsInNeoFS(ctx, p.BktInfo, p.BktInfo.Owner, "", false) if err != nil { - return err + if !errors.Is(err, ErrNodeNotFound) { + return err + } } - if len(nodeVersions) != 0 { + + // there are only Regular objects in slice. + if len(objects) != 0 { return s3errors.GetAPIError(s3errors.ErrBucketNotEmpty) } n.cache.DeleteBucket(p.BktInfo.Name) return n.neoFS.DeleteContainer(ctx, p.BktInfo.CID, p.SessionToken) } + +func (n *layer) putDeleteMarker(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (oid.ID, error) { + var ( + ts = strconv.FormatInt(time.Now().Unix(), 10) + params = PutObjectParams{ + BktInfo: bktInfo, + Object: objectName, + Reader: bytes.NewReader(nil), + Header: map[string]string{ + attrS3DeleteMarker: ts, + }, + } + ) + + extendedObjectInfo, err := n.PutObject(ctx, ¶ms) + if err != nil { + return oid.ID{}, fmt.Errorf("save delete marker object: %w", err) + } + + return extendedObjectInfo.ObjectInfo.ID, nil +} + +func isDeleteMarkerObject(head object.Object) bool { + for _, attr := range head.Attributes() { + if attr.Key() == attrS3DeleteMarker { + return true + } + } + + return false +} + +func getS3VersioningState(head object.Object) string { + for _, attr := range head.Attributes() { + if attr.Key() == attrS3VersioningState { + return attr.Value() + } + } + + return "" +} diff --git a/api/layer/neofs_mock.go b/api/layer/neofs_mock.go index dd2f369e..5e72ac0e 100644 --- a/api/layer/neofs_mock.go +++ b/api/layer/neofs_mock.go @@ -5,11 +5,14 @@ import ( "context" "crypto/rand" "crypto/sha256" + "encoding/base64" "encoding/hex" "errors" "fmt" "hash" "io" + "strconv" + "strings" "time" "github.com/nspcc-dev/neofs-s3-gw/api" @@ -36,6 +39,11 @@ type TestNeoFS struct { signer neofscrypto.Signer } +const ( + objectNonceSize = 8 + objectNonceAttribute = "__NEOFS__NONCE" +) + func NewTestNeoFS(signer neofscrypto.Signer) *TestNeoFS { return &TestNeoFS{ objects: make(map[string]*object.Object), @@ -250,17 +258,32 @@ func (t *TestNeoFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID id.SetSHA256(sha256.Sum256(b)) attrs := make([]object.Attribute, 0) + creationTime := prm.CreationTime + if creationTime.IsZero() { + creationTime = time.Now() + } + + var a *object.Attribute + a = object.NewAttribute(object.AttributeTimestamp, strconv.FormatInt(creationTime.Unix(), 10)) + attrs = append(attrs, *a) if prm.Filepath != "" { - a := object.NewAttribute(object.AttributeFilePath, prm.Filepath) + a = object.NewAttribute(object.AttributeFilePath, prm.Filepath) attrs = append(attrs, *a) } for i := range prm.Attributes { - a := object.NewAttribute(prm.Attributes[i][0], prm.Attributes[i][1]) + a = object.NewAttribute(prm.Attributes[i][0], prm.Attributes[i][1]) attrs = append(attrs, *a) } + nonce := make([]byte, objectNonceSize) + if _, err := rand.Read(nonce); err != nil { + return oid.ID{}, fmt.Errorf("object nonce: %w", err) + } + objectNonceAttr := object.NewAttribute(objectNonceAttribute, base64.StdEncoding.EncodeToString(nonce)) + attrs = append(attrs, *objectNonceAttr) + obj := object.New() obj.SetContainerID(prm.Container) obj.SetID(id) @@ -459,24 +482,46 @@ func getOwner(ctx context.Context) user.ID { func (t *TestNeoFS) SearchObjects(_ context.Context, prm PrmObjectSearch) ([]oid.ID, error) { var oids []oid.ID + if len(prm.Filters) == 0 { + for _, obj := range t.objects { + oids = append(oids, obj.GetID()) + } + + return oids, nil + } + for _, obj := range t.objects { + var isOk = true + for _, attr := range obj.Attributes() { for _, f := range prm.Filters { if attr.Key() == f.Header() { switch f.Operation() { case object.MatchStringEqual: - if attr.Value() == f.Value() { - oids = append(oids, obj.GetID()) + if f.Header() == "$Object:objectType" { + isOk = isOk && obj.Type().String() == f.Value() + } else { + isOk = isOk && attr.Value() == f.Value() } case object.MatchStringNotEqual: - if attr.Value() != f.Value() { - oids = append(oids, obj.GetID()) - } + isOk = isOk && attr.Value() != f.Value() + case object.MatchCommonPrefix: + isOk = isOk && strings.HasPrefix(attr.Value(), f.Value()) default: + isOk = false } } + + if f.Header() == "$Object:objectType" { + isOk = isOk && obj.Type().String() == f.Value() + } } } + + // all filters are valid for obj. + if isOk { + oids = append(oids, obj.GetID()) + } } return oids, nil diff --git a/api/layer/object.go b/api/layer/object.go index 2d22bffd..d3634b0e 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -1,6 +1,7 @@ package layer import ( + "bytes" "cmp" "context" "crypto/sha256" @@ -26,9 +27,11 @@ import ( cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/nspcc-dev/neofs-sdk-go/user" "github.com/nspcc-dev/neofs-sdk-go/version" "github.com/panjf2000/ants/v2" "go.uber.org/zap" + "golang.org/x/exp/maps" ) type ( @@ -75,6 +78,9 @@ type ( const ( continuationToken = "" + + attrS3VersioningState = "S3-versioning-state" + attrS3DeleteMarker = "S3-delete-marker" ) func newAddress(cnr cid.ID, obj oid.ID) oid.Address { @@ -243,7 +249,13 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend CopiesNumber: p.CopiesNumber, } - prm.Attributes = make([][2]string, 0, len(p.Header)) + prm.Attributes = make([][2]string, 0, len(p.Header)+1) + + if bktSettings.Unversioned() || bktSettings.VersioningSuspended() { + prm.Attributes = append(prm.Attributes, [2]string{attrS3VersioningState, data.VersioningUnversioned}) + } else if bktSettings.VersioningEnabled() { + prm.Attributes = append(prm.Attributes, [2]string{attrS3VersioningState, data.VersioningEnabled}) + } for k, v := range p.Header { if v == "" { @@ -262,7 +274,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend n.log.Debug("put object", zap.String("reqId", reqInfo.RequestID), zap.String("bucket", p.BktInfo.Name), zap.Stringer("cid", p.BktInfo.CID), - zap.String("object", p.Object), zap.Stringer("oid", id)) + zap.String("object", p.Object), zap.Stringer("oid", id), zap.Int64("size", p.Size)) newVersion.OID = id newVersion.ETag = hex.EncodeToString(hash) @@ -275,13 +287,16 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend ObjVersion: &ObjectVersion{ BktInfo: p.BktInfo, ObjectName: p.Object, - VersionID: id.EncodeToString(), }, NewLock: p.Lock, CopiesNumber: p.CopiesNumber, NodeVersion: newVersion, // provide new version to make one less tree service call in PutLockInfo } + if bktSettings.VersioningEnabled() { + putLockInfoPrms.ObjVersion.VersionID = id.String() + } + if err = n.PutLockInfo(ctx, putLockInfoPrms); err != nil { return nil, err } @@ -376,7 +391,7 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke return extObjInfo, nil } - node, err := n.treeService.GetLatestVersion(ctx, bkt, objectName) + heads, err := n.searchAllVersionsInNeoFS(ctx, bkt, owner, objectName, false) if err != nil { if errors.Is(err, ErrNodeNotFound) { return nil, s3errors.GetAPIError(s3errors.ErrNoSuchKey) @@ -384,19 +399,15 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke return nil, err } - if node.IsDeleteMarker() { + if isDeleteMarkerObject(*heads[0]) { return nil, s3errors.GetAPIError(s3errors.ErrNoSuchKey) } - meta, err := n.objectHead(ctx, bkt, node.OID) - if err != nil { - return nil, err - } - objInfo := objectInfoFromMeta(bkt, meta) + objInfo := objectInfoFromMeta(bkt, heads[0]) // latest version. extObjInfo := &data.ExtendedObjectInfo{ ObjectInfo: objInfo, - NodeVersion: node, + NodeVersion: &data.NodeVersion{}, } n.cache.PutObjectWithName(owner, extObjInfo) @@ -404,27 +415,211 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke return extObjInfo, nil } +// searchAllVersionsInNeoFS returns all version of object by its objectName. +// +// Returns ErrNodeNotFound if zero objects found. +func (n *layer) searchAllVersionsInNeoFS(ctx context.Context, bkt *data.BucketInfo, owner user.ID, objectName string, onlyUnversioned bool) ([]*object.Object, error) { + prmSearch := PrmObjectSearch{ + Container: bkt.CID, + Filters: make(object.SearchFilters, 0, 3), + } + + n.prepareAuthParameters(ctx, &prmSearch.PrmAuth, owner) + prmSearch.Filters.AddTypeFilter(object.MatchStringEqual, object.TypeRegular) + + if len(objectName) > 0 { + prmSearch.Filters.AddFilter(object.AttributeFilePath, objectName, object.MatchStringEqual) + } + + if onlyUnversioned { + prmSearch.Filters.AddFilter(attrS3VersioningState, data.VersioningUnversioned, object.MatchStringEqual) + } + + return n.searchObjects(ctx, bkt, prmSearch) +} + +// searchAllVersionsInNeoFS returns all version of object by its objectName. +// +// Returns ErrNodeNotFound if zero objects found. +func (n *layer) searchAllVersionsInNeoFSByPrefix(ctx context.Context, bkt *data.BucketInfo, owner user.ID, prefix string, onlyUnversioned bool) ([]*object.Object, error) { + prmSearch := PrmObjectSearch{ + Container: bkt.CID, + Filters: make(object.SearchFilters, 0, 3), + } + + n.prepareAuthParameters(ctx, &prmSearch.PrmAuth, owner) + prmSearch.Filters.AddTypeFilter(object.MatchStringEqual, object.TypeRegular) + + if len(prefix) > 0 { + prmSearch.Filters.AddFilter(object.AttributeFilePath, prefix, object.MatchCommonPrefix) + } + + if onlyUnversioned { + prmSearch.Filters.AddFilter(attrS3VersioningState, data.VersioningUnversioned, object.MatchStringEqual) + } + + return n.searchObjects(ctx, bkt, prmSearch) +} + +func (n *layer) searchObjects(ctx context.Context, bkt *data.BucketInfo, prmSearch PrmObjectSearch) ([]*object.Object, error) { + ids, err := n.neoFS.SearchObjects(ctx, prmSearch) + if err != nil { + if errors.Is(err, apistatus.ErrObjectAccessDenied) { + return nil, s3errors.GetAPIError(s3errors.ErrAccessDenied) + } + + return nil, fmt.Errorf("search object version: %w", err) + } + + if len(ids) == 0 { + return nil, ErrNodeNotFound + } + + var heads = make([]*object.Object, 0, len(ids)) + + for i := range ids { + head, err := n.objectHead(ctx, bkt, ids[i]) + if err != nil { + n.log.Warn("couldn't head object", + zap.Stringer("oid", &ids[i]), + zap.Stringer("cid", bkt.CID), + zap.Error(err)) + + return nil, fmt.Errorf("couldn't head object: %w", err) + } + + // if head.Type() == object.TypeTombstone || head.Type() == object.TypeLink || head.Type() == object.TypeLock { + // continue + // } + + // The object is a part of split chain, it doesn't exist for user. + if head.HasParent() { + continue + } + + heads = append(heads, head) + } + + slices.SortFunc(heads, sortObjectsFunc) + + return heads, nil +} + +func sortObjectsFunc(a, b *object.Object) int { + if c := cmp.Compare(b.CreationEpoch(), a.CreationEpoch()); c != 0 { // reverse order. + return c + } + + var ( + aCreated int64 + bCreated int64 + ) + + for _, attr := range a.Attributes() { + if attr.Key() == object.AttributeTimestamp { + aCreated, _ = strconv.ParseInt(attr.Value(), 10, 64) + break + } + } + for _, attr := range b.Attributes() { + if attr.Key() == object.AttributeTimestamp { + bCreated, _ = strconv.ParseInt(attr.Value(), 10, 64) + break + } + } + + if c := cmp.Compare(bCreated, aCreated); c != 0 { // reverse order. + return c + } + + bID := b.GetID() + aID := a.GetID() + + // It is a temporary decision. We can't figure out what object was first and what the second right now. + return bytes.Compare(bID[:], aID[:]) // reverse order. +} + +func sortObjectsFuncByFilePath(a, b *object.Object) int { + var aPath string + var bPath string + + for _, attr := range a.Attributes() { + if attr.Key() == object.AttributeFilePath { + aPath = attr.Value() + } + } + for _, attr := range b.Attributes() { + if attr.Key() == object.AttributeFilePath { + bPath = attr.Value() + } + } + + return cmp.Compare(aPath, bPath) +} + +func (n *layer) searchLatestVersionsByPrefix(ctx context.Context, bkt *data.BucketInfo, owner user.ID, prefix string, onlyUnversioned bool) ([]*object.Object, error) { + heads, err := n.searchAllVersionsInNeoFSByPrefix(ctx, bkt, owner, prefix, onlyUnversioned) + if err != nil { + if errors.Is(err, apistatus.ErrObjectAccessDenied) { + return nil, s3errors.GetAPIError(s3errors.ErrAccessDenied) + } + + return nil, fmt.Errorf("get all versions by prefix: %w", err) + } + + var uniq = make(map[string]*object.Object, len(heads)) + + for _, head := range heads { + var filePath string + for _, attr := range head.Attributes() { + if attr.Key() == object.AttributeFilePath { + filePath = attr.Value() + break + } + } + + // take only first object, because it is the freshest one. + if _, ok := uniq[filePath]; !ok { + uniq[filePath] = head + } + } + + return maps.Values(uniq), nil +} + func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadObjectParams) (*data.ExtendedObjectInfo, error) { var err error - var foundVersion *data.NodeVersion + var foundVersion *object.Object if p.VersionID == data.UnversionedObjectVersionID { - foundVersion, err = n.treeService.GetUnversioned(ctx, bkt, p.Object) + versions, err := n.searchAllVersionsInNeoFS(ctx, bkt, bkt.Owner, p.Object, true) if err != nil { if errors.Is(err, ErrNodeNotFound) { return nil, s3errors.GetAPIError(s3errors.ErrNoSuchVersion) } return nil, err } + + foundVersion = versions[0] } else { - versions, err := n.treeService.GetVersions(ctx, bkt, p.Object) + versions, err := n.searchAllVersionsInNeoFS(ctx, bkt, bkt.Owner, p.Object, false) if err != nil { - return nil, fmt.Errorf("couldn't get versions: %w", err) + if errors.Is(err, ErrNodeNotFound) { + return nil, s3errors.GetAPIError(s3errors.ErrNoSuchVersion) + } + return nil, err } - for _, version := range versions { - if version.OID.EncodeToString() == p.VersionID { - foundVersion = version - break + if p.IsBucketVersioningEnabled { + for _, version := range versions { + if version.GetID().EncodeToString() == p.VersionID { + foundVersion = version + break + } + } + } else { + // If versioning is not enabled, user "should see" only last version of uploaded object. + if versions[0].GetID().EncodeToString() == p.VersionID { + foundVersion = versions[0] } } if foundVersion == nil { @@ -432,12 +627,13 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb } } + id := foundVersion.GetID() owner := n.Owner(ctx) - if extObjInfo := n.cache.GetObject(owner, newAddress(bkt.CID, foundVersion.OID)); extObjInfo != nil { + if extObjInfo := n.cache.GetObject(owner, newAddress(bkt.CID, id)); extObjInfo != nil { return extObjInfo, nil } - meta, err := n.objectHead(ctx, bkt, foundVersion.OID) + meta, err := n.objectHead(ctx, bkt, id) if err != nil { if errors.Is(err, apistatus.ErrObjectNotFound) { return nil, s3errors.GetAPIError(s3errors.ErrNoSuchVersion) @@ -448,7 +644,7 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb extObjInfo := &data.ExtendedObjectInfo{ ObjectInfo: objInfo, - NodeVersion: foundVersion, + NodeVersion: &data.NodeVersion{}, } n.cache.PutObject(owner, extObjInfo) @@ -592,33 +788,37 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) cacheKey := cache.CreateObjectsListCacheKey(p.Bucket.CID, p.Prefix, true) nodeVersions := n.cache.GetList(owner, cacheKey) + var rawHeads []*object.Object + if nodeVersions == nil { - nodeVersions, err = n.treeService.GetLatestVersionsByPrefix(ctx, p.Bucket, p.Prefix) + rawHeads, err = n.searchLatestVersionsByPrefix(ctx, p.Bucket, p.Bucket.Owner, p.Prefix, false) if err != nil { + if errors.Is(err, ErrNodeNotFound) { + return nil, nil, nil + } + return nil, nil, err } - n.cache.PutList(owner, cacheKey, nodeVersions) } - if len(nodeVersions) == 0 { + if len(rawHeads) == 0 { return nil, nil, nil } - slices.SortFunc(nodeVersions, func(a, b *data.NodeVersion) int { - return cmp.Compare(a.FilePath, b.FilePath) - }) + slices.SortFunc(rawHeads, sortObjectsFuncByFilePath) + existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories - poolCtx, cancel := context.WithCancel(ctx) - defer cancel() - objOutCh, err := n.initWorkerPool(poolCtx, 2, p, nodesGenerator(poolCtx, p, nodeVersions)) - if err != nil { - return nil, nil, fmt.Errorf("failed to init worker pool: %w", err) - } + for _, head := range rawHeads { + if shouldSkip(head, p, existed) { + continue + } - objects = make([]*data.ObjectInfo, 0, p.MaxKeys) + var oi *data.ObjectInfo + if oi = tryDirectoryFromObject(p.Bucket, p.Prefix, p.Delimiter, *head); oi == nil { + oi = objectInfoFromMeta(p.Bucket, head) + } - for obj := range objOutCh { - objects = append(objects, obj) + objects = append(objects, oi) } slices.SortFunc(objects, func(a, b *data.ObjectInfo) int { @@ -633,34 +833,6 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) return } -func nodesGenerator(ctx context.Context, p allObjectParams, nodeVersions []*data.NodeVersion) <-chan *data.NodeVersion { - nodeCh := make(chan *data.NodeVersion) - existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories - - go func() { - var generated int - LOOP: - for _, node := range nodeVersions { - if shouldSkip(node, p, existed) { - continue - } - - select { - case <-ctx.Done(): - break LOOP - case nodeCh <- node: - generated++ - if generated == p.MaxKeys+1 { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken - break LOOP - } - } - } - close(nodeCh) - }() - - return nodeCh -} - func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ObjectInfo, error) { pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{n.log})) if err != nil { @@ -744,7 +916,7 @@ func (n *layer) bucketNodeVersions(ctx context.Context, bkt *data.BucketInfo, pr } func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, delimiter string) (map[string][]*data.ExtendedObjectInfo, error) { - nodeVersions, err := n.bucketNodeVersions(ctx, bkt, prefix) + nodeVersions, err := n.searchAllVersionsInNeoFS(ctx, bkt, bkt.Owner, prefix, false) if err != nil { return nil, err } @@ -754,21 +926,66 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo, for _, nodeVersion := range nodeVersions { oi := &data.ObjectInfo{} - if nodeVersion.IsDeleteMarker() { // delete marker does not match any object in NeoFS - oi.ID = nodeVersion.OID - oi.Name = nodeVersion.FilePath - oi.Owner = nodeVersion.DeleteMarker.Owner - oi.Created = nodeVersion.DeleteMarker.Created + if isDeleteMarkerObject(*nodeVersion) { + oi.ID = nodeVersion.GetID() + oi.Name = filepathFromObject(nodeVersion) + oi.Size = int64(nodeVersion.PayloadSize()) + if owner := nodeVersion.OwnerID(); owner != nil { + oi.Owner = *owner + } + + for _, attr := range nodeVersion.Attributes() { + if attr.Key() == object.AttributeTimestamp { + ts, err := strconv.ParseInt(attr.Value(), 10, 64) + if err != nil { + return nil, err + } + + oi.Created = time.Unix(ts, 0) + break + } + } + oi.IsDeleteMarker = true } else { - if oi = n.objectInfoFromObjectsCacheOrNeoFS(ctx, bkt, nodeVersion, prefix, delimiter); oi == nil { + nv := data.NodeVersion{ + BaseNodeVersion: data.BaseNodeVersion{ + OID: nodeVersion.GetID(), + FilePath: prefix, + }, + } + + state := getS3VersioningState(*nodeVersion) + nv.IsUnversioned = state == data.VersioningUnversioned + + if oi = n.objectInfoFromObjectsCacheOrNeoFS(ctx, bkt, &nv, prefix, delimiter); oi == nil { continue } } + state := getS3VersioningState(*nodeVersion) + eoi := &data.ExtendedObjectInfo{ - ObjectInfo: oi, - NodeVersion: nodeVersion, + ObjectInfo: oi, + NodeVersion: &data.NodeVersion{ + BaseNodeVersion: data.BaseNodeVersion{ + ID: 0, + ParenID: 0, + OID: oi.ID, + Timestamp: uint64(oi.Created.Unix()), + Size: 0, + ETag: "", + FilePath: oi.Name, + }, + IsUnversioned: state == data.VersioningUnversioned, + }, + } + + if oi.IsDeleteMarker { + eoi.NodeVersion.DeleteMarker = &data.DeleteMarkerInfo{ + Created: oi.Created, + Owner: *nodeVersion.OwnerID(), + } } objVersions, ok := versions[oi.Name] @@ -788,13 +1005,13 @@ func IsSystemHeader(key string) bool { return ok || strings.HasPrefix(key, api.NeoFSSystemMetadataPrefix) } -func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool { - if node.IsDeleteMarker() { +func shouldSkip(head *object.Object, p allObjectParams, existed map[string]struct{}) bool { + if isDeleteMarkerObject(*head) { return true } - filePath := node.FilePath - if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 { + filePath := filePathFromAttributes(*head) + if dirName := tryDirectoryName(filePath, p.Prefix, p.Delimiter); len(dirName) != 0 { filePath = dirName } if _, ok := existed[filePath]; ok { @@ -807,7 +1024,7 @@ func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]st if p.ContinuationToken != "" { if _, ok := existed[continuationToken]; !ok { - if p.ContinuationToken != node.OID.EncodeToString() { + if p.ContinuationToken != head.GetID().EncodeToString() { return true } existed[continuationToken] = struct{}{} @@ -865,7 +1082,7 @@ func (n *layer) objectInfoFromObjectsCacheOrNeoFS(ctx context.Context, bktInfo * } func tryDirectory(bktInfo *data.BucketInfo, node *data.NodeVersion, prefix, delimiter string) *data.ObjectInfo { - dirName := tryDirectoryName(node, prefix, delimiter) + dirName := tryDirectoryName(node.FilePath, prefix, delimiter) if len(dirName) == 0 { return nil } @@ -880,15 +1097,33 @@ func tryDirectory(bktInfo *data.BucketInfo, node *data.NodeVersion, prefix, deli } } +func tryDirectoryFromObject(bktInfo *data.BucketInfo, prefix, delimiter string, head object.Object) *data.ObjectInfo { + nv := convert(head) + + dirName := tryDirectoryName(nv.FilePath, prefix, delimiter) + if len(dirName) == 0 { + return nil + } + + return &data.ObjectInfo{ + ID: nv.OID, // to use it as continuation token + CID: bktInfo.CID, + IsDir: true, + IsDeleteMarker: nv.IsDeleteMarker(), + Bucket: bktInfo.Name, + Name: dirName, + } +} + // tryDirectoryName forms directory name by prefix and delimiter. // If node isn't a directory empty string is returned. // This function doesn't check if node has a prefix. It must do a caller. -func tryDirectoryName(node *data.NodeVersion, prefix, delimiter string) string { +func tryDirectoryName(filePath string, prefix, delimiter string) string { if len(delimiter) == 0 { return "" } - tail := strings.TrimPrefix(node.FilePath, prefix) + tail := strings.TrimPrefix(filePath, prefix) index := strings.Index(tail, delimiter) if index >= 0 { return prefix + tail[:index+1] @@ -896,3 +1131,35 @@ func tryDirectoryName(node *data.NodeVersion, prefix, delimiter string) string { return "" } + +func convert(head object.Object) data.NodeVersion { + nv := data.NodeVersion{ + BaseNodeVersion: data.BaseNodeVersion{ + OID: head.GetID(), + }, + } + + for _, attr := range head.Attributes() { + switch attr.Key() { + case object.AttributeFilePath: + nv.BaseNodeVersion.FilePath = attr.Value() + case attrS3DeleteMarker: + nv.DeleteMarker = &data.DeleteMarkerInfo{ + Created: time.Time{}, + Owner: *head.OwnerID(), + } + } + } + + return nv +} + +func filePathFromAttributes(head object.Object) string { + for _, attr := range head.Attributes() { + if attr.Key() == object.AttributeFilePath { + return attr.Value() + } + } + + return "" +} diff --git a/api/layer/system_object.go b/api/layer/system_object.go index 25005d07..1dc081ea 100644 --- a/api/layer/system_object.go +++ b/api/layer/system_object.go @@ -6,17 +6,22 @@ import ( errorsStd "errors" "fmt" "math" + "slices" "strconv" "time" "github.com/nspcc-dev/neofs-s3-gw/api/data" "github.com/nspcc-dev/neofs-s3-gw/api/s3errors" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.uber.org/zap" ) const ( - AttributeComplianceMode = ".s3-compliance-mode" + AttributeComplianceMode = ".s3-compliance-mode" + AttributeRetentionUntilMode = ".s3-retention-until" + AttributeObjectVersion = ".s3-object-version" ) type PutLockInfoParams struct { @@ -38,7 +43,7 @@ func (n *layer) PutLockInfo(ctx context.Context, p *PutLockInfoParams) (err erro } } - lockInfo, err := n.treeService.GetLock(ctx, p.ObjVersion.BktInfo, versionNode.ID) + lockInfo, err := n.getLockDataFromObjects(ctx, p.ObjVersion.BktInfo, p.ObjVersion.ObjectName, p.ObjVersion.VersionID) if err != nil && !errorsStd.Is(err, ErrNodeNotFound) { return err } @@ -68,7 +73,7 @@ func (n *layer) PutLockInfo(ctx context.Context, p *PutLockInfoParams) (err erro } } lock := &data.ObjectLock{Retention: newLock.Retention} - retentionOID, err := n.putLockObject(ctx, p.ObjVersion.BktInfo, versionNode.OID, lock, p.CopiesNumber) + retentionOID, err := n.putLockObject(ctx, p.ObjVersion.BktInfo, versionNode.OID, lock, p.CopiesNumber, p.ObjVersion.ObjectName, p.ObjVersion.VersionID) if err != nil { return err } @@ -78,7 +83,7 @@ func (n *layer) PutLockInfo(ctx context.Context, p *PutLockInfoParams) (err erro if newLock.LegalHold != nil { if newLock.LegalHold.Enabled && !lockInfo.IsLegalHoldSet() { lock := &data.ObjectLock{LegalHold: newLock.LegalHold} - legalHoldOID, err := n.putLockObject(ctx, p.ObjVersion.BktInfo, versionNode.OID, lock, p.CopiesNumber) + legalHoldOID, err := n.putLockObject(ctx, p.ObjVersion.BktInfo, versionNode.OID, lock, p.CopiesNumber, p.ObjVersion.ObjectName, p.ObjVersion.VersionID) if err != nil { return err } @@ -88,15 +93,94 @@ func (n *layer) PutLockInfo(ctx context.Context, p *PutLockInfoParams) (err erro } } - if err = n.treeService.PutLock(ctx, p.ObjVersion.BktInfo, versionNode.ID, lockInfo); err != nil { - return fmt.Errorf("couldn't put lock into tree: %w", err) - } - n.cache.PutLockInfo(n.Owner(ctx), lockObjectKey(p.ObjVersion), lockInfo) return nil } +func (n *layer) getLockDataFromObjects(ctx context.Context, bkt *data.BucketInfo, objectName, version string) (*data.LockInfo, error) { + prmSearch := PrmObjectSearch{ + Container: bkt.CID, + Filters: make(object.SearchFilters, 0, 3), + } + + n.prepareAuthParameters(ctx, &prmSearch.PrmAuth, bkt.Owner) + prmSearch.Filters.AddFilter(object.AttributeFilePath, objectName, object.MatchStringEqual) + prmSearch.Filters.AddTypeFilter(object.MatchStringEqual, object.TypeLock) + if version != "" { + prmSearch.Filters.AddFilter(AttributeObjectVersion, version, object.MatchStringEqual) + } + + ids, err := n.neoFS.SearchObjects(ctx, prmSearch) + if err != nil { + if errorsStd.Is(err, apistatus.ErrObjectAccessDenied) { + return nil, s3errors.GetAPIError(s3errors.ErrAccessDenied) + } + + return nil, fmt.Errorf("search object version: %w", err) + } + + if len(ids) == 0 { + return nil, nil + } + + var ( + heads = make([]*object.Object, 0, len(ids)) + lock data.LockInfo + ) + + for i := range ids { + head, err := n.objectHead(ctx, bkt, ids[i]) + if err != nil { + n.log.Warn("couldn't head object", + zap.Stringer("oid", &ids[i]), + zap.Stringer("cid", bkt.CID), + zap.Error(err)) + + return nil, fmt.Errorf("couldn't head object: %w", err) + } + + heads = append(heads, head) + } + + slices.SortFunc(heads, sortObjectsFunc) + slices.Reverse(heads) + + for _, head := range heads { + var ( + expEpoch uint64 + isCompliance bool + retentionUntil time.Time + ) + + for _, attr := range head.Attributes() { + switch attr.Key() { + case object.AttributeExpirationEpoch: + expEpoch, err = strconv.ParseUint(attr.Value(), 10, 64) + if err != nil { + return nil, fmt.Errorf("parse expiration epoch: %w", err) + } + case AttributeComplianceMode: + isCompliance = attr.Value() == "true" + case AttributeRetentionUntilMode: + retentionUntil, err = time.Parse(time.RFC3339, attr.Value()) + if err != nil { + return nil, fmt.Errorf("parse retention until attribute: %w", err) + } + } + } + + // legal hold. + if expEpoch == math.MaxUint64 { + lock.SetLegalHold(head.GetID()) + } else { + lock.SetRetention(head.GetID(), retentionUntil.Format(time.RFC3339), isCompliance) + } + } + + return &lock, nil +} + func (n *layer) getNodeVersionFromCacheOrNeofs(ctx context.Context, objVersion *ObjectVersion) (nodeVersion *data.NodeVersion, err error) { // check cache if node version is stored inside extendedObjectVersion nodeVersion = n.getNodeVersionFromCache(n.Owner(ctx), objVersion) @@ -108,13 +192,14 @@ func (n *layer) getNodeVersionFromCacheOrNeofs(ctx context.Context, objVersion * return nodeVersion, nil } -func (n *layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID, lock *data.ObjectLock, copiesNumber uint32) (oid.ID, error) { +func (n *layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID, lock *data.ObjectLock, copiesNumber uint32, objectName, objectVersion string) (oid.ID, error) { prm := PrmObjectCreate{ Container: bktInfo.CID, Creator: bktInfo.Owner, Locks: []oid.ID{objID}, CreationTime: TimeNow(ctx), CopiesNumber: copiesNumber, + Filepath: objectName, } var err error @@ -123,6 +208,10 @@ func (n *layer) putLockObject(ctx context.Context, bktInfo *data.BucketInfo, obj return oid.ID{}, err } + if objectVersion != "" { + prm.Attributes = append(prm.Attributes, [2]string{AttributeObjectVersion, objectVersion}) + } + id, _, err := n.objectPutAndHash(ctx, prm, bktInfo) return id, err } @@ -133,12 +222,7 @@ func (n *layer) GetLockInfo(ctx context.Context, objVersion *ObjectVersion) (*da return lockInfo, nil } - versionNode, err := n.getNodeVersion(ctx, objVersion) - if err != nil { - return nil, err - } - - lockInfo, err := n.treeService.GetLock(ctx, objVersion.BktInfo, versionNode.ID) + lockInfo, err := n.getLockDataFromObjects(ctx, objVersion.BktInfo, objVersion.ObjectName, objVersion.VersionID) if err != nil && !errorsStd.Is(err, ErrNodeNotFound) { return nil, err } @@ -146,7 +230,9 @@ func (n *layer) GetLockInfo(ctx context.Context, objVersion *ObjectVersion) (*da lockInfo = &data.LockInfo{} } - n.cache.PutLockInfo(owner, lockObjectKey(objVersion), lockInfo) + if !lockInfo.LegalHold().IsZero() || !lockInfo.Retention().IsZero() { + n.cache.PutLockInfo(owner, lockObjectKey(objVersion), lockInfo) + } return lockInfo, nil } @@ -229,6 +315,8 @@ func (n *layer) attributesFromLock(ctx context.Context, lock *data.ObjectLock) ( return nil, fmt.Errorf("fetch time to epoch: %w", err) } + result = append(result, [2]string{AttributeRetentionUntilMode, lock.Retention.Until.UTC().Format(time.RFC3339)}) + if lock.Retention.IsCompliance { result = append(result, [2]string{AttributeComplianceMode, "true"}) } diff --git a/api/layer/versioning.go b/api/layer/versioning.go index 3a0cddc6..a1df890e 100644 --- a/api/layer/versioning.go +++ b/api/layer/versioning.go @@ -3,6 +3,7 @@ package layer import ( "cmp" "context" + "errors" "slices" "github.com/nspcc-dev/neofs-s3-gw/api/data" @@ -16,6 +17,9 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar versions, err := n.getAllObjectsVersions(ctx, p.BktInfo, p.Prefix, p.Delimiter) if err != nil { + if errors.Is(err, ErrNodeNotFound) { + return res, nil + } return nil, err } @@ -31,8 +35,18 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar return cmp.Compare(b.NodeVersion.Timestamp, a.NodeVersion.Timestamp) // sort in reverse order }) + // The object with "null" version should be only one. We get only last (actual) one. + var isNullVersionCounted bool + for i, version := range sortedVersions { version.IsLatest = i == 0 + if version.NodeVersion.IsUnversioned && isNullVersionCounted { + continue + } + + if version.NodeVersion.IsUnversioned { + isNullVersionCounted = true + } allObjects = append(allObjects, version) } } diff --git a/go.mod b/go.mod index 7af26bf0..f9156cf2 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/urfave/cli/v2 v2.27.4 go.uber.org/zap v1.27.0 golang.org/x/crypto v0.31.0 + golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 golang.org/x/sync v0.10.0 google.golang.org/grpc v1.66.0 google.golang.org/protobuf v1.34.2 @@ -47,7 +48,6 @@ require ( github.com/twmb/murmur3 v1.1.8 // indirect github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect go.etcd.io/bbolt v1.3.11 // indirect - golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed // indirect )