diff --git a/api/handler/delete_test.go b/api/handler/delete_test.go index 8c1b9937..4ed7dfa7 100644 --- a/api/handler/delete_test.go +++ b/api/handler/delete_test.go @@ -200,7 +200,7 @@ func TestDeleteMarkers(t *testing.T) { require.Len(t, versions.DeleteMarker, 3, "invalid delete markers length") require.Len(t, versions.Version, 0, "versions must be empty") - require.Len(t, listOIDsFromMockedNeoFS(t, tc, bktName), 0, "shouldn't be any object in neofs") + require.Len(t, listOIDsFromMockedNeoFS(t, tc, bktName), 3, "should be all delete marker object in neofs") } func TestDeleteObjectFromListCache(t *testing.T) { @@ -237,7 +237,8 @@ func TestDeleteObjectCheckMarkerReturn(t *testing.T) { require.Equal(t, deleteMarkerVersion, versions.DeleteMarker[0].VersionID) deleteMarkerVersion2, isDeleteMarker2 := deleteObject(t, tc, bktName, objName, deleteMarkerVersion) - require.True(t, isDeleteMarker2) + // deleting object with non-empty version - remove object from storage (even it is a delete marker). No additional markers. + require.False(t, isDeleteMarker2) versions = listVersions(t, tc, bktName) require.Len(t, versions.DeleteMarker, 0) require.Equal(t, deleteMarkerVersion, deleteMarkerVersion2) diff --git a/api/handler/object_list_test.go b/api/handler/object_list_test.go index ab915dac..16dc9bec 100644 --- a/api/handler/object_list_test.go +++ b/api/handler/object_list_test.go @@ -118,7 +118,6 @@ func TestS3BucketListV2DelimiterPrefix(t *testing.T) { continuationToken := validateListV2(t, tc, bktName, prefix, delim, "", 1, true, false, []string{"asdf"}, empty) continuationToken = validateListV2(t, tc, bktName, prefix, delim, continuationToken, 1, true, false, empty, []string{"boo/"}) validateListV2(t, tc, bktName, prefix, delim, continuationToken, 1, false, true, empty, []string{"cquux/"}) - continuationToken = validateListV2(t, tc, bktName, prefix, delim, "", 2, true, false, []string{"asdf"}, []string{"boo/"}) validateListV2(t, tc, bktName, prefix, delim, continuationToken, 2, false, true, empty, []string{"cquux/"}) diff --git a/api/layer/layer.go b/api/layer/layer.go index 851d494f..e3b904c6 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -1,8 +1,8 @@ package layer import ( + "bytes" "context" - "crypto/rand" "errors" "fmt" "io" @@ -22,6 +22,7 @@ import ( cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/eacl" "github.com/nspcc-dev/neofs-sdk-go/netmap" + "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/session" "github.com/nspcc-dev/neofs-sdk-go/user" @@ -73,9 +74,10 @@ type ( // HeadObjectParams stores object head request parameters. HeadObjectParams struct { - BktInfo *data.BucketInfo - Object string - VersionID string + BktInfo *data.BucketInfo + Object string + VersionID string + IsBucketVersioningEnabled bool } // ObjectVersion stores object version info. @@ -506,10 +508,17 @@ func (n *layer) GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.O func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ExtendedObjectInfo, error) { var objInfo *data.ExtendedObjectInfo var err error + var settings *data.BucketSettings if len(p.VersionID) == 0 { objInfo, err = n.headLastVersionIfNotDeleted(ctx, p.BktInfo, p.Object) } else { + settings, err = n.GetBucketSettings(ctx, p.BktInfo) + if err != nil { + return nil, fmt.Errorf("get bucket settings: %w", err) + } + + p.IsBucketVersioningEnabled = settings.VersioningEnabled() objInfo, err = n.headVersion(ctx, p.BktInfo, p) } if err != nil { @@ -556,76 +565,193 @@ func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.Exte }) } -func getRandomOID() (oid.ID, error) { - b := [32]byte{} - if _, err := rand.Read(b[:]); err != nil { - return oid.ID{}, err - } - - var objID oid.ID - objID.SetSHA256(b) - return objID, nil -} - func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings *data.BucketSettings, obj *VersionedObject) *VersionedObject { - if len(obj.VersionID) != 0 || settings.Unversioned() { - var nodeVersion *data.NodeVersion - if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil { - return dismissNotFoundError(obj) - } - - if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil { - if strings.Contains(obj.Error.Error(), "2050 message = object is locked") { + if settings.VersioningEnabled() { + if len(obj.VersionID) > 0 { + var deleteOID oid.ID + if err := deleteOID.DecodeString(obj.VersionID); err != nil { + obj.Error = fmt.Errorf("decode version: %w", err) return obj } - n.log.Info("remove old version", zap.Error(obj.Error)) + if obj.Error = n.objectDelete(ctx, bkt, deleteOID); obj.Error != nil { + return obj + } + } else { + var markerOID oid.ID + markerOID, obj.Error = n.putDeleteMarker(ctx, bkt, obj.Name) + obj.DeleteMarkVersion = markerOID.EncodeToString() } - obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeVersion.ID) - n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID) + n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name) return obj } - var newVersion *data.NodeVersion - if settings.VersioningSuspended() { obj.VersionID = data.UnversionedObjectVersionID - var nodeVersion *data.NodeVersion - if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil { - return dismissNotFoundError(obj) + // versions, err := n.getLatestObjectsVersions(ctx, bkt, bkt.Owner, obj.Name, true) + versions, err := n.searchAllVersionsInNeoFS(ctx, bkt, bkt.Owner, obj.Name, true) + if err != nil { + obj.Error = fmt.Errorf("search versions: %w", err) + return obj + } + + // if len(versions) == 0 { + // obj.Error = s3errors.GetAPIError(s3errors.ErrNoSuchKey) + // return dismissNotFoundError(obj) + // } + + if len(versions) > 1 { + obj.Error = errors.New("more than one object version found") + return obj } - if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil { + if obj.Error = n.objectDelete(ctx, bkt, versions[0].GetID()); obj.Error != nil { return obj } + + var markerOID oid.ID + markerOID, obj.Error = n.putDeleteMarker(ctx, bkt, obj.Name) + obj.DeleteMarkVersion = markerOID.EncodeToString() + + return obj } - randOID, err := getRandomOID() + versions, err := n.searchAllVersionsInNeoFS(ctx, bkt, bkt.Owner, obj.Name, false) if err != nil { - obj.Error = fmt.Errorf("couldn't get random oid: %w", err) + if errors.Is(err, ErrNodeNotFound) { + obj.Error = nil + } else { + obj.Error = fmt.Errorf("search versions: %w", err) + } + return obj } - obj.DeleteMarkVersion = randOID.EncodeToString() + // if len(versions) == 0 { + // obj.Error = s3errors.GetAPIError(s3errors.ErrNoSuchKey) + // return dismissNotFoundError(obj) + // } - newVersion = &data.NodeVersion{ - BaseNodeVersion: data.BaseNodeVersion{ - OID: randOID, - FilePath: obj.Name, - }, - DeleteMarker: &data.DeleteMarkerInfo{ - Created: TimeNow(ctx), - Owner: n.Owner(ctx), - }, - IsUnversioned: settings.VersioningSuspended(), - } + if obj.VersionID == "" { + for _, ver := range versions { + if obj.Error = n.objectDelete(ctx, bkt, ver.GetID()); obj.Error != nil { + return obj + } + } + } else { + for _, ver := range versions { + if ver.GetID().EncodeToString() == obj.VersionID { + if obj.Error = n.objectDelete(ctx, bkt, ver.GetID()); obj.Error != nil { + return obj + } - if _, obj.Error = n.treeService.AddVersion(ctx, bkt, newVersion); obj.Error != nil { - return obj + return obj + } + } } + /* + if len(obj.VersionID) > 0 { + for _, ver := range versions { + if ver.GetID().EncodeToString() == obj.VersionID { + if obj.Error = n.objectDelete(ctx, bkt, ver.GetID()); obj.Error != nil { + return obj + } + + return obj + } + } + + return dismissNotFoundError(obj) + } + + if settings.Unversioned() { + for _, ver := range versions { + for _, attr := range ver.Attributes() { + if attr.Key() == attrS3VersioningState && attr.Value() == "false" { + if obj.Error = n.objectDelete(ctx, bkt, ver.GetID()); obj.Error != nil { + return obj + } + continue + } + } + } + } + + if settings.VersioningEnabled() { + deleteMarkOID, err := n.putDeleteMarker(ctx, bkt, obj.Name) + if err != nil { + obj.Error = err + return obj + } + + obj.DeleteMarkVersion = deleteMarkOID.EncodeToString() + } + */ + + // data.UnversionedObjectVersionID + /* + if len(obj.VersionID) != 0 || settings.Unversioned() { + var nodeVersion *data.NodeVersion + if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil { + return dismissNotFoundError(obj) + } + + if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil { + if strings.Contains(obj.Error.Error(), "2050 message = object is locked") { + return obj + } + + n.log.Info("remove old version", zap.Error(obj.Error)) + } + + obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeVersion.ID) + n.cache.CleanListCacheEntriesContainingObject(obj.Name, bkt.CID) + return obj + } + + var newVersion *data.NodeVersion + + if settings.VersioningSuspended() { + obj.VersionID = data.UnversionedObjectVersionID + + var nodeVersion *data.NodeVersion + if nodeVersion, obj.Error = n.getNodeVersionToDelete(ctx, bkt, obj); obj.Error != nil { + return dismissNotFoundError(obj) + } + + if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil { + return obj + } + } + + randOID, err := getRandomOID() + if err != nil { + obj.Error = fmt.Errorf("couldn't get random oid: %w", err) + return obj + } + + obj.DeleteMarkVersion = randOID.EncodeToString() + + newVersion = &data.NodeVersion{ + BaseNodeVersion: data.BaseNodeVersion{ + OID: randOID, + FilePath: obj.Name, + }, + DeleteMarker: &data.DeleteMarkerInfo{ + Created: TimeNow(ctx), + Owner: n.Owner(ctx), + }, + IsUnversioned: settings.VersioningSuspended(), + } + + if _, obj.Error = n.treeService.AddVersion(ctx, bkt, newVersion); obj.Error != nil { + return obj + } + */ + n.cache.DeleteObjectName(bkt.CID, bkt.Name, obj.Name) return obj @@ -699,14 +825,72 @@ func (n *layer) ResolveBucket(ctx context.Context, name string) (cid.ID, error) } func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error { - nodeVersions, err := n.bucketNodeVersions(ctx, p.BktInfo, "") + objects, err := n.searchAllVersionsInNeoFS(ctx, p.BktInfo, p.BktInfo.Owner, "", false) if err != nil { - return err + if !errors.Is(err, ErrNodeNotFound) { + return err + } } - if len(nodeVersions) != 0 { - return s3errors.GetAPIError(s3errors.ErrBucketNotEmpty) + + if len(objects) != 0 { + var hasRegularObject bool + for _, obj := range objects { + if obj.Type() == object.TypeRegular { + hasRegularObject = true + break + } + } + + // skip, if only tombstones. + if hasRegularObject { + return s3errors.GetAPIError(s3errors.ErrBucketNotEmpty) + } } n.cache.DeleteBucket(p.BktInfo.Name) return n.neoFS.DeleteContainer(ctx, p.BktInfo.CID, p.SessionToken) } + +func (n *layer) putDeleteMarker(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (oid.ID, error) { + var ( + ts = strconv.FormatInt(time.Now().Unix(), 10) + params = PutObjectParams{ + BktInfo: bktInfo, + Object: objectName, + Reader: bytes.NewReader(nil), + Header: map[string]string{ + attrS3DeleteMarker: ts, + object.AttributeTimestamp: ts, + }, + } + ) + + // object.AttributeTimestamp, strconv.FormatInt(creationTime.Unix(), 10) + + id, err := n.PutObject(ctx, ¶ms) + if err != nil { + return oid.ID{}, fmt.Errorf("save delete marker object: %w", err) + } + + return id.ObjectInfo.ID, nil +} + +func isDeleteMarkerObject(head object.Object) bool { + for _, attr := range head.Attributes() { + if attr.Key() == attrS3DeleteMarker { + return true + } + } + + return false +} + +func getS3VersioningState(head object.Object) string { + for _, attr := range head.Attributes() { + if attr.Key() == attrS3VersioningState { + return attr.Value() + } + } + + return "" +} diff --git a/api/layer/neofs_mock.go b/api/layer/neofs_mock.go index dd2f369e..da78a2d5 100644 --- a/api/layer/neofs_mock.go +++ b/api/layer/neofs_mock.go @@ -5,11 +5,14 @@ import ( "context" "crypto/rand" "crypto/sha256" + "encoding/base64" "encoding/hex" "errors" "fmt" "hash" "io" + "strconv" + "strings" "time" "github.com/nspcc-dev/neofs-s3-gw/api" @@ -36,6 +39,11 @@ type TestNeoFS struct { signer neofscrypto.Signer } +const ( + objectNonceSize = 8 + objectNonceAttribute = "__NEOFS__NONCE" +) + func NewTestNeoFS(signer neofscrypto.Signer) *TestNeoFS { return &TestNeoFS{ objects: make(map[string]*object.Object), @@ -250,17 +258,32 @@ func (t *TestNeoFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID id.SetSHA256(sha256.Sum256(b)) attrs := make([]object.Attribute, 0) + creationTime := prm.CreationTime + if creationTime.IsZero() { + creationTime = time.Now() + } + + var a *object.Attribute + a = object.NewAttribute(object.AttributeTimestamp, strconv.FormatInt(creationTime.Unix(), 10)) + attrs = append(attrs, *a) if prm.Filepath != "" { - a := object.NewAttribute(object.AttributeFilePath, prm.Filepath) + a = object.NewAttribute(object.AttributeFilePath, prm.Filepath) attrs = append(attrs, *a) } for i := range prm.Attributes { - a := object.NewAttribute(prm.Attributes[i][0], prm.Attributes[i][1]) + a = object.NewAttribute(prm.Attributes[i][0], prm.Attributes[i][1]) attrs = append(attrs, *a) } + nonce := make([]byte, objectNonceSize) + if _, err := rand.Read(nonce); err != nil { + return oid.ID{}, fmt.Errorf("object nonce: %w", err) + } + objectNonceAttr := object.NewAttribute(objectNonceAttribute, base64.StdEncoding.EncodeToString(nonce)) + attrs = append(attrs, *objectNonceAttr) + obj := object.New() obj.SetContainerID(prm.Container) obj.SetID(id) @@ -459,24 +482,47 @@ func getOwner(ctx context.Context) user.ID { func (t *TestNeoFS) SearchObjects(_ context.Context, prm PrmObjectSearch) ([]oid.ID, error) { var oids []oid.ID + if len(prm.Filters) == 0 { + for _, obj := range t.objects { + oids = append(oids, obj.GetID()) + } + + return oids, nil + } + for _, obj := range t.objects { + var isOk = true + for _, attr := range obj.Attributes() { for _, f := range prm.Filters { if attr.Key() == f.Header() { switch f.Operation() { case object.MatchStringEqual: - if attr.Value() == f.Value() { - oids = append(oids, obj.GetID()) - } + isOk = isOk && attr.Value() == f.Value() + // if attr.Value() == f.Value() { + // + // oids = append(oids, obj.GetID()) + // } case object.MatchStringNotEqual: - if attr.Value() != f.Value() { - oids = append(oids, obj.GetID()) - } + isOk = isOk && attr.Value() != f.Value() + // if attr.Value() != f.Value() { + // oids = append(oids, obj.GetID()) + // } + case object.MatchCommonPrefix: + isOk = isOk && strings.HasPrefix(attr.Value(), f.Value()) + // if strings.HasPrefix(attr.Value(), f.Value()) { + // oids = append(oids, obj.GetID()) + // } default: + isOk = false } } } } + + if isOk { + oids = append(oids, obj.GetID()) + } } return oids, nil diff --git a/api/layer/object.go b/api/layer/object.go index 2d22bffd..6ced52dc 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -1,6 +1,7 @@ package layer import ( + "bytes" "cmp" "context" "crypto/sha256" @@ -26,9 +27,11 @@ import ( cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/nspcc-dev/neofs-sdk-go/user" "github.com/nspcc-dev/neofs-sdk-go/version" "github.com/panjf2000/ants/v2" "go.uber.org/zap" + "golang.org/x/exp/maps" ) type ( @@ -75,6 +78,9 @@ type ( const ( continuationToken = "" + + attrS3VersioningState = "S3-versioning-state" + attrS3DeleteMarker = "S3-delete-marker" ) func newAddress(cnr cid.ID, obj oid.ID) oid.Address { @@ -243,7 +249,21 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend CopiesNumber: p.CopiesNumber, } - prm.Attributes = make([][2]string, 0, len(p.Header)) + prm.Attributes = make([][2]string, 0, len(p.Header)+1) + + if bktSettings.Unversioned() { + prm.Attributes = append(prm.Attributes, [2]string{attrS3VersioningState, data.VersioningUnversioned}) + } else if bktSettings.VersioningSuspended() { + prm.Attributes = append(prm.Attributes, [2]string{attrS3VersioningState, data.VersioningUnversioned}) + } else if bktSettings.VersioningEnabled() { + prm.Attributes = append(prm.Attributes, [2]string{attrS3VersioningState, data.VersioningEnabled}) + } + + // if bktSettings.Unversioned() || bktSettings.VersioningSuspended() { + // prm.Attributes = append(prm.Attributes, [2]string{attrS3VersioningState, "false"}) + // } else if bktSettings.VersioningEnabled() { + // prm.Attributes = append(prm.Attributes, [2]string{attrS3VersioningState, "true"}) + // } for k, v := range p.Header { if v == "" { @@ -376,7 +396,7 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke return extObjInfo, nil } - node, err := n.treeService.GetLatestVersion(ctx, bkt, objectName) + heads, err := n.searchAllVersionsInNeoFS(ctx, bkt, owner, objectName, false) if err != nil { if errors.Is(err, ErrNodeNotFound) { return nil, s3errors.GetAPIError(s3errors.ErrNoSuchKey) @@ -384,19 +404,15 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke return nil, err } - if node.IsDeleteMarker() { + if isDeleteMarkerObject(*heads[0]) { return nil, s3errors.GetAPIError(s3errors.ErrNoSuchKey) } - meta, err := n.objectHead(ctx, bkt, node.OID) - if err != nil { - return nil, err - } - objInfo := objectInfoFromMeta(bkt, meta) + objInfo := objectInfoFromMeta(bkt, heads[0]) // latest version. extObjInfo := &data.ExtendedObjectInfo{ ObjectInfo: objInfo, - NodeVersion: node, + NodeVersion: &data.NodeVersion{}, } n.cache.PutObjectWithName(owner, extObjInfo) @@ -404,27 +420,200 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke return extObjInfo, nil } +// searchAllVersionsInNeoFS returns all version of object by its objectName. +// +// Returns ErrNodeNotFound if zero objects found. +func (n *layer) searchAllVersionsInNeoFS(ctx context.Context, bkt *data.BucketInfo, owner user.ID, objectName string, onlyUnversioned bool) ([]*object.Object, error) { + prmSearch := PrmObjectSearch{ + Container: bkt.CID, + Filters: make(object.SearchFilters, 0, 2), + } + + n.prepareAuthParameters(ctx, &prmSearch.PrmAuth, owner) + + if len(objectName) > 0 { + prmSearch.Filters.AddFilter(object.AttributeFilePath, objectName, object.MatchStringEqual) + } + + if onlyUnversioned { + prmSearch.Filters.AddFilter(attrS3VersioningState, data.VersioningUnversioned, object.MatchStringEqual) + } + + return n.searchObjects(ctx, bkt, prmSearch) +} + +// searchAllVersionsInNeoFS returns all version of object by its objectName. +// +// Returns ErrNodeNotFound if zero objects found. +func (n *layer) searchAllVersionsInNeoFSByPrefix(ctx context.Context, bkt *data.BucketInfo, owner user.ID, prefix string, onlyUnversioned bool) ([]*object.Object, error) { + prmSearch := PrmObjectSearch{ + Container: bkt.CID, + Filters: make(object.SearchFilters, 0, 2), + } + + n.prepareAuthParameters(ctx, &prmSearch.PrmAuth, owner) + + if len(prefix) > 0 { + prmSearch.Filters.AddFilter(object.AttributeFilePath, prefix, object.MatchCommonPrefix) + } + + if onlyUnversioned { + prmSearch.Filters.AddFilter(attrS3VersioningState, data.VersioningUnversioned, object.MatchStringEqual) + } + + return n.searchObjects(ctx, bkt, prmSearch) +} + +func (n *layer) searchObjects(ctx context.Context, bkt *data.BucketInfo, prmSearch PrmObjectSearch) ([]*object.Object, error) { + ids, err := n.neoFS.SearchObjects(ctx, prmSearch) + if err != nil { + if errors.Is(err, apistatus.ErrObjectAccessDenied) { + return nil, s3errors.GetAPIError(s3errors.ErrAccessDenied) + } + + return nil, fmt.Errorf("search object version: %w", err) + } + + if len(ids) == 0 { + return nil, ErrNodeNotFound + } + + var heads = make([]*object.Object, 0, len(ids)) + + for i := range ids { + head, err := n.objectHead(ctx, bkt, ids[i]) + if err != nil { + n.log.Warn("couldn't head object", + zap.Stringer("oid", &ids[i]), + zap.Stringer("cid", bkt.CID), + zap.Error(err)) + + return nil, fmt.Errorf("couldn't head object: %w", err) + } + + heads = append(heads, head) + } + + slices.SortFunc(heads, sortObjectsFunc) + + return heads, nil +} + +func sortObjectsFunc(a, b *object.Object) int { + if c := cmp.Compare(b.CreationEpoch(), a.CreationEpoch()); c != 0 { // reverse order. + return c + } + + var ( + aCreated int64 + bCreated int64 + ) + + for _, attr := range a.Attributes() { + if attr.Key() == object.AttributeTimestamp { + aCreated, _ = strconv.ParseInt(attr.Value(), 10, 64) + break + } + } + for _, attr := range b.Attributes() { + if attr.Key() == object.AttributeTimestamp { + bCreated, _ = strconv.ParseInt(attr.Value(), 10, 64) + break + } + } + + if c := cmp.Compare(bCreated, aCreated); c != 0 { // reverse order. + return c + } + + bID := b.GetID() + aID := a.GetID() + + // It is a temporary decision. We can't figure out what object was first and what the second right now. + return bytes.Compare(bID[:], aID[:]) // reverse order. +} + +func sortObjectsFuncByFilePath(a, b *object.Object) int { + var aPath string + var bPath string + + for _, attr := range a.Attributes() { + if attr.Key() == object.AttributeFilePath { + aPath = attr.Value() + } + } + for _, attr := range b.Attributes() { + if attr.Key() == object.AttributeFilePath { + bPath = attr.Value() + } + } + + return cmp.Compare(aPath, bPath) +} + +func (n *layer) searchLatestVersionsByPrefix(ctx context.Context, bkt *data.BucketInfo, owner user.ID, prefix string, onlyUnversioned bool) ([]*object.Object, error) { + heads, err := n.searchAllVersionsInNeoFSByPrefix(ctx, bkt, owner, prefix, onlyUnversioned) + if err != nil { + if errors.Is(err, apistatus.ErrObjectAccessDenied) { + return nil, s3errors.GetAPIError(s3errors.ErrAccessDenied) + } + + return nil, fmt.Errorf("get all versions by prefix: %w", err) + } + + var uniq = make(map[string]*object.Object, len(heads)) + + for _, head := range heads { + var filePath string + for _, attr := range head.Attributes() { + if attr.Key() == object.AttributeFilePath { + filePath = attr.Value() + break + } + } + + // take only first object, because it is the freshest one. + if _, ok := uniq[filePath]; !ok { + uniq[filePath] = head + } + } + + return maps.Values(uniq), nil +} + func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadObjectParams) (*data.ExtendedObjectInfo, error) { var err error - var foundVersion *data.NodeVersion + var foundVersion *object.Object if p.VersionID == data.UnversionedObjectVersionID { - foundVersion, err = n.treeService.GetUnversioned(ctx, bkt, p.Object) + versions, err := n.searchAllVersionsInNeoFS(ctx, bkt, bkt.Owner, p.Object, true) if err != nil { if errors.Is(err, ErrNodeNotFound) { return nil, s3errors.GetAPIError(s3errors.ErrNoSuchVersion) } return nil, err } + + foundVersion = versions[0] } else { - versions, err := n.treeService.GetVersions(ctx, bkt, p.Object) + versions, err := n.searchAllVersionsInNeoFS(ctx, bkt, bkt.Owner, p.Object, false) if err != nil { - return nil, fmt.Errorf("couldn't get versions: %w", err) + if errors.Is(err, ErrNodeNotFound) { + return nil, s3errors.GetAPIError(s3errors.ErrNoSuchVersion) + } + return nil, err } - for _, version := range versions { - if version.OID.EncodeToString() == p.VersionID { - foundVersion = version - break + if p.IsBucketVersioningEnabled { + for _, version := range versions { + if version.GetID().EncodeToString() == p.VersionID { + foundVersion = version + break + } + } + } else { + // If versioning is not enabled, user "should see" only last version of uploaded object. + if versions[0].GetID().EncodeToString() == p.VersionID { + foundVersion = versions[0] } } if foundVersion == nil { @@ -432,12 +621,13 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb } } + id := foundVersion.GetID() owner := n.Owner(ctx) - if extObjInfo := n.cache.GetObject(owner, newAddress(bkt.CID, foundVersion.OID)); extObjInfo != nil { + if extObjInfo := n.cache.GetObject(owner, newAddress(bkt.CID, id)); extObjInfo != nil { return extObjInfo, nil } - meta, err := n.objectHead(ctx, bkt, foundVersion.OID) + meta, err := n.objectHead(ctx, bkt, id) if err != nil { if errors.Is(err, apistatus.ErrObjectNotFound) { return nil, s3errors.GetAPIError(s3errors.ErrNoSuchVersion) @@ -448,7 +638,7 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb extObjInfo := &data.ExtendedObjectInfo{ ObjectInfo: objInfo, - NodeVersion: foundVersion, + NodeVersion: &data.NodeVersion{}, } n.cache.PutObject(owner, extObjInfo) @@ -592,35 +782,66 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) cacheKey := cache.CreateObjectsListCacheKey(p.Bucket.CID, p.Prefix, true) nodeVersions := n.cache.GetList(owner, cacheKey) + var rawHeads []*object.Object + if nodeVersions == nil { - nodeVersions, err = n.treeService.GetLatestVersionsByPrefix(ctx, p.Bucket, p.Prefix) + rawHeads, err = n.searchLatestVersionsByPrefix(ctx, p.Bucket, p.Bucket.Owner, p.Prefix, false) if err != nil { + if errors.Is(err, ErrNodeNotFound) { + return nil, nil, nil + } + return nil, nil, err } - n.cache.PutList(owner, cacheKey, nodeVersions) + + // nodeVersions, err = convert(heads) + // if err != nil { + // return nil, nil, fmt.Errorf("convert nodes: %w", err) + // } + + // nodeVersions, err = n.treeService.GetLatestVersionsByPrefix(ctx, p.Bucket, p.Prefix) + // if err != nil { + // return nil, nil, err + // } + // n.cache.PutList(owner, cacheKey, nodeVersions) } - if len(nodeVersions) == 0 { + if len(rawHeads) == 0 { return nil, nil, nil } - slices.SortFunc(nodeVersions, func(a, b *data.NodeVersion) int { - return cmp.Compare(a.FilePath, b.FilePath) - }) + slices.SortFunc(rawHeads, sortObjectsFuncByFilePath) + // + // poolCtx, cancel := context.WithCancel(ctx) + // defer cancel() + // objOutCh, err := n.initWorkerPool(poolCtx, 2, p, nodesGenerator(poolCtx, p, nodeVersions)) + // if err != nil { + // return nil, nil, fmt.Errorf("failed to init worker pool: %w", err) + // } - poolCtx, cancel := context.WithCancel(ctx) - defer cancel() - objOutCh, err := n.initWorkerPool(poolCtx, 2, p, nodesGenerator(poolCtx, p, nodeVersions)) - if err != nil { - return nil, nil, fmt.Errorf("failed to init worker pool: %w", err) - } + existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories - objects = make([]*data.ObjectInfo, 0, p.MaxKeys) + for _, head := range rawHeads { + if shouldSkip(head, p, existed) { + continue + } - for obj := range objOutCh { - objects = append(objects, obj) + var oi *data.ObjectInfo + if oi = tryDirectoryFromObject(p.Bucket, p.Prefix, p.Delimiter, *head); oi == nil { + oi = objectInfoFromMeta(p.Bucket, head) + } + + objects = append(objects, oi) } + // + + // objects = make([]*data.ObjectInfo, 0, p.MaxKeys) + // + // for obj := range objOutCh { + // objects = append(objects, obj) + // } + slices.SortFunc(objects, func(a, b *data.ObjectInfo) int { return cmp.Compare(a.Name, b.Name) }) @@ -633,33 +854,33 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) return } -func nodesGenerator(ctx context.Context, p allObjectParams, nodeVersions []*data.NodeVersion) <-chan *data.NodeVersion { - nodeCh := make(chan *data.NodeVersion) - existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories - - go func() { - var generated int - LOOP: - for _, node := range nodeVersions { - if shouldSkip(node, p, existed) { - continue - } - - select { - case <-ctx.Done(): - break LOOP - case nodeCh <- node: - generated++ - if generated == p.MaxKeys+1 { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken - break LOOP - } - } - } - close(nodeCh) - }() - - return nodeCh -} +// func nodesGenerator(ctx context.Context, p allObjectParams, nodeVersions []*data.NodeVersion) <-chan *data.NodeVersion { +// nodeCh := make(chan *data.NodeVersion) +// existed := make(map[string]struct{}, len(nodeVersions)) // to squash the same directories +// +// go func() { +// var generated int +// LOOP: +// for _, node := range nodeVersions { +// if shouldSkip(node, p, existed) { +// continue +// } +// +// select { +// case <-ctx.Done(): +// break LOOP +// case nodeCh <- node: +// generated++ +// if generated == p.MaxKeys+1 { // we use maxKeys+1 to be able to know nextMarker/nextContinuationToken +// break LOOP +// } +// } +// } +// close(nodeCh) +// }() +// +// return nodeCh +// } func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ObjectInfo, error) { pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{n.log})) @@ -744,31 +965,99 @@ func (n *layer) bucketNodeVersions(ctx context.Context, bkt *data.BucketInfo, pr } func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, delimiter string) (map[string][]*data.ExtendedObjectInfo, error) { - nodeVersions, err := n.bucketNodeVersions(ctx, bkt, prefix) + nodeVersions, err := n.searchAllVersionsInNeoFS(ctx, bkt, bkt.Owner, prefix, false) if err != nil { return nil, err } + // nodeVersions, err := n.bucketNodeVersions(ctx, bkt, prefix) + // if err != nil { + // return nil, err + // } + versions := make(map[string][]*data.ExtendedObjectInfo, len(nodeVersions)) for _, nodeVersion := range nodeVersions { oi := &data.ObjectInfo{} - if nodeVersion.IsDeleteMarker() { // delete marker does not match any object in NeoFS - oi.ID = nodeVersion.OID - oi.Name = nodeVersion.FilePath - oi.Owner = nodeVersion.DeleteMarker.Owner - oi.Created = nodeVersion.DeleteMarker.Created + if isDeleteMarkerObject(*nodeVersion) { + oi.ID = nodeVersion.GetID() + oi.Name = prefix + if owner := nodeVersion.OwnerID(); owner != nil { + oi.Owner = *owner + } + + for _, attr := range nodeVersion.Attributes() { + if attr.Key() == object.AttributeTimestamp { + ts, err := strconv.ParseInt(attr.Value(), 10, 64) + if err != nil { + return nil, err + } + + oi.Created = time.Unix(ts, 0) + break + } + } + + // oi.Created = nodeVersion.DeleteMarker.Created oi.IsDeleteMarker = true } else { - if oi = n.objectInfoFromObjectsCacheOrNeoFS(ctx, bkt, nodeVersion, prefix, delimiter); oi == nil { + oi = objectInfoFromMeta(bkt, nodeVersion) + + nv := data.NodeVersion{ + BaseNodeVersion: data.BaseNodeVersion{ + OID: nodeVersion.GetID(), + FilePath: prefix, + }, + } + // + state := getS3VersioningState(*nodeVersion) + nv.IsUnversioned = state == data.VersioningUnversioned + + if oi = n.objectInfoFromObjectsCacheOrNeoFS(ctx, bkt, &nv, prefix, delimiter); oi == nil { continue } } + /* + if nodeVersion.IsDeleteMarker() { // delete marker does not match any object in NeoFS + oi.ID = nodeVersion.OID + oi.Name = nodeVersion.FilePath + oi.Owner = nodeVersion.DeleteMarker.Owner + oi.Created = nodeVersion.DeleteMarker.Created + oi.IsDeleteMarker = true + } else { + if oi = n.objectInfoFromObjectsCacheOrNeoFS(ctx, bkt, nodeVersion, prefix, delimiter); oi == nil { + continue + } + } + */ + + state := getS3VersioningState(*nodeVersion) + // nv.IsUnversioned = + eoi := &data.ExtendedObjectInfo{ - ObjectInfo: oi, - NodeVersion: nodeVersion, + ObjectInfo: oi, + NodeVersion: &data.NodeVersion{ + BaseNodeVersion: data.BaseNodeVersion{ + ID: 0, + ParenID: 0, + OID: oi.ID, + Timestamp: uint64(oi.Created.Unix()), + Size: 0, + ETag: "", + FilePath: oi.Name, + }, + // DeleteMarker: nil, + IsUnversioned: state == data.VersioningUnversioned, + }, + } + + if oi.IsDeleteMarker { + eoi.NodeVersion.DeleteMarker = &data.DeleteMarkerInfo{ + Created: oi.Created, + Owner: *nodeVersion.OwnerID(), + } } objVersions, ok := versions[oi.Name] @@ -788,13 +1077,13 @@ func IsSystemHeader(key string) bool { return ok || strings.HasPrefix(key, api.NeoFSSystemMetadataPrefix) } -func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]struct{}) bool { - if node.IsDeleteMarker() { +func shouldSkip(head *object.Object, p allObjectParams, existed map[string]struct{}) bool { + if isDeleteMarkerObject(*head) { return true } - filePath := node.FilePath - if dirName := tryDirectoryName(node, p.Prefix, p.Delimiter); len(dirName) != 0 { + filePath := filePathFromAttributes(*head) + if dirName := tryDirectoryName(filePath, p.Prefix, p.Delimiter); len(dirName) != 0 { filePath = dirName } if _, ok := existed[filePath]; ok { @@ -807,7 +1096,7 @@ func shouldSkip(node *data.NodeVersion, p allObjectParams, existed map[string]st if p.ContinuationToken != "" { if _, ok := existed[continuationToken]; !ok { - if p.ContinuationToken != node.OID.EncodeToString() { + if p.ContinuationToken != head.GetID().EncodeToString() { return true } existed[continuationToken] = struct{}{} @@ -865,7 +1154,7 @@ func (n *layer) objectInfoFromObjectsCacheOrNeoFS(ctx context.Context, bktInfo * } func tryDirectory(bktInfo *data.BucketInfo, node *data.NodeVersion, prefix, delimiter string) *data.ObjectInfo { - dirName := tryDirectoryName(node, prefix, delimiter) + dirName := tryDirectoryName(node.FilePath, prefix, delimiter) if len(dirName) == 0 { return nil } @@ -880,15 +1169,33 @@ func tryDirectory(bktInfo *data.BucketInfo, node *data.NodeVersion, prefix, deli } } +func tryDirectoryFromObject(bktInfo *data.BucketInfo, prefix, delimiter string, head object.Object) *data.ObjectInfo { + nv := convert(head) + + dirName := tryDirectoryName(nv.FilePath, prefix, delimiter) + if len(dirName) == 0 { + return nil + } + + return &data.ObjectInfo{ + ID: nv.OID, // to use it as continuation token + CID: bktInfo.CID, + IsDir: true, + IsDeleteMarker: nv.IsDeleteMarker(), + Bucket: bktInfo.Name, + Name: dirName, + } +} + // tryDirectoryName forms directory name by prefix and delimiter. // If node isn't a directory empty string is returned. // This function doesn't check if node has a prefix. It must do a caller. -func tryDirectoryName(node *data.NodeVersion, prefix, delimiter string) string { +func tryDirectoryName(filePath string, prefix, delimiter string) string { if len(delimiter) == 0 { return "" } - tail := strings.TrimPrefix(node.FilePath, prefix) + tail := strings.TrimPrefix(filePath, prefix) index := strings.Index(tail, delimiter) if index >= 0 { return prefix + tail[:index+1] @@ -896,3 +1203,36 @@ func tryDirectoryName(node *data.NodeVersion, prefix, delimiter string) string { return "" } + +func convert(head object.Object) data.NodeVersion { + // var result = make([]*data.NodeVersion, 0, len(objs)) + nv := data.NodeVersion{ + BaseNodeVersion: data.BaseNodeVersion{ + OID: head.GetID(), + }, + } + + for _, attr := range head.Attributes() { + switch attr.Key() { + case object.AttributeFilePath: + nv.BaseNodeVersion.FilePath = attr.Value() + case attrS3DeleteMarker: + nv.DeleteMarker = &data.DeleteMarkerInfo{ + Created: time.Time{}, + Owner: *head.OwnerID(), + } + } + } + + return nv +} + +func filePathFromAttributes(head object.Object) string { + for _, attr := range head.Attributes() { + if attr.Key() == object.AttributeFilePath { + return attr.Value() + } + } + + return "" +} diff --git a/api/layer/versioning.go b/api/layer/versioning.go index 3a0cddc6..e2f2cb0a 100644 --- a/api/layer/versioning.go +++ b/api/layer/versioning.go @@ -3,6 +3,7 @@ package layer import ( "cmp" "context" + "errors" "slices" "github.com/nspcc-dev/neofs-s3-gw/api/data" @@ -14,8 +15,13 @@ func (n *layer) ListObjectVersions(ctx context.Context, p *ListObjectVersionsPar res = &ListObjectVersionsInfo{} ) + // versions, err := n.searchAllVersionsInNeoFS(ctx, p.BktInfo, p.BktInfo.Owner, p.Prefix, false) + versions, err := n.getAllObjectsVersions(ctx, p.BktInfo, p.Prefix, p.Delimiter) if err != nil { + if errors.Is(err, ErrNodeNotFound) { + return res, nil + } return nil, err } diff --git a/go.mod b/go.mod index 7af26bf0..f9156cf2 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/urfave/cli/v2 v2.27.4 go.uber.org/zap v1.27.0 golang.org/x/crypto v0.31.0 + golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 golang.org/x/sync v0.10.0 google.golang.org/grpc v1.66.0 google.golang.org/protobuf v1.34.2 @@ -47,7 +48,6 @@ require ( github.com/twmb/murmur3 v1.1.8 // indirect github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect go.etcd.io/bbolt v1.3.11 // indirect - golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed // indirect )