diff --git a/api/layer/layer.go b/api/layer/layer.go index e7ccb0c3..06fcb023 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -690,7 +690,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings } for _, version := range versions { - if obj.Error = n.objectDelete(ctx, bkt, version.GetID()); obj.Error != nil { + if obj.Error = n.objectDelete(ctx, bkt, version.ID); obj.Error != nil { return obj } } @@ -729,7 +729,7 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings } for _, version := range versions { - if obj.Error = n.objectDelete(ctx, bkt, version.GetID()); obj.Error != nil { + if obj.Error = n.objectDelete(ctx, bkt, version.ID); obj.Error != nil { return obj } } @@ -750,8 +750,8 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings if obj.VersionID == "" { for _, ver := range versions { - if obj.Error = n.objectDelete(ctx, bkt, ver.GetID()); obj.Error != nil { - n.log.Error("could not delete object", zap.Error(obj.Error), zap.Stringer("oid", ver.GetID())) + if obj.Error = n.objectDelete(ctx, bkt, ver.ID); obj.Error != nil { + n.log.Error("could not delete object", zap.Error(obj.Error), zap.Stringer("oid", ver.ID)) if isErrObjectAlreadyRemoved(obj.Error) { obj.Error = nil continue @@ -762,8 +762,8 @@ func (n *layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings } } else { for _, ver := range versions { - if ver.GetID().EncodeToString() == obj.VersionID { - if obj.Error = n.objectDelete(ctx, bkt, ver.GetID()); obj.Error != nil { + if ver.ID.EncodeToString() == obj.VersionID { + if obj.Error = n.objectDelete(ctx, bkt, ver.ID); obj.Error != nil { return obj } diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index c4126bf5..1ce92ac2 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -960,7 +960,6 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar prm := PrmObjectCreate{ Container: p.Info.Bkt.CID, Creator: p.Info.Bkt.Owner, - Filepath: p.Info.Key, CreationTime: TimeNow(ctx), CopiesNumber: multipartInfo.CopiesNumber, Multipart: &Multipart{ diff --git a/api/layer/object.go b/api/layer/object.go index e60624ec..a84fc0c9 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -93,6 +93,16 @@ type ( CreationEpoch uint64 CreationTimestamp int64 } + + allVersionsSearchResult struct { + ID oid.ID + FilePath string + CreationEpoch uint64 + CreationTimestamp int64 + PayloadSize int64 + IsDeleteMarker bool + IsVersioned bool + } ) const ( @@ -413,11 +423,16 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke return nil, err } - if isDeleteMarkerObject(*heads[0]) { + if heads[0].IsDeleteMarker { return nil, s3errors.GetAPIError(s3errors.ErrNoSuchKey) } - objInfo := objectInfoFromMeta(bkt, heads[0]) // latest version. + meta, err := n.objectHead(ctx, bkt, heads[0].ID) // latest version. + if err != nil { + return nil, fmt.Errorf("get head failed: %w", err) + } + + objInfo := objectInfoFromMeta(bkt, meta) extObjInfo := &data.ExtendedObjectInfo{ ObjectInfo: objInfo, @@ -432,25 +447,107 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke // searchAllVersionsInNeoFS returns all version of object by its objectName. // // Returns ErrNodeNotFound if zero objects found. -func (n *layer) searchAllVersionsInNeoFS(ctx context.Context, bkt *data.BucketInfo, owner user.ID, objectName string, onlyUnversioned bool) ([]*object.Object, error) { - prmSearch := PrmObjectSearch{ - Container: bkt.CID, - Filters: make(object.SearchFilters, 0, 4), - } +func (n *layer) searchAllVersionsInNeoFS(ctx context.Context, bkt *data.BucketInfo, owner user.ID, objectName string, onlyUnversioned bool) ([]allVersionsSearchResult, error) { + var ( + filters = make(object.SearchFilters, 0, 6) + returningAttributes = []string{ + object.AttributeFilePath, + object.FilterCreationEpoch, + object.AttributeTimestamp, + attrS3VersioningState, + object.FilterPayloadSize, + attrS3DeleteMarker, + } - n.prepareAuthParameters(ctx, &prmSearch.PrmAuth, owner) - prmSearch.Filters.AddTypeFilter(object.MatchStringEqual, object.TypeRegular) - prmSearch.Filters.AddFilter(attributeTagsMetaObject, "", object.MatchNotPresent) + opts client.SearchObjectsOptions + ) + + if bt := bearerTokenFromContext(ctx, owner); bt != nil { + opts.WithBearerToken(*bt) + } if len(objectName) > 0 { - prmSearch.Filters.AddFilter(object.AttributeFilePath, objectName, object.MatchStringEqual) + filters.AddFilter(object.AttributeFilePath, objectName, object.MatchStringEqual) + } else { + filters.AddFilter(object.AttributeFilePath, "", object.MatchCommonPrefix) } + filters.AddTypeFilter(object.MatchStringEqual, object.TypeRegular) + filters.AddFilter(attributeTagsMetaObject, "", object.MatchNotPresent) + if onlyUnversioned { - prmSearch.Filters.AddFilter(attrS3VersioningState, data.VersioningUnversioned, object.MatchNotPresent) + filters.AddFilter(attrS3VersioningState, data.VersioningUnversioned, object.MatchNotPresent) + } + + searchResultItems, err := n.neoFS.SearchObjectsV2(ctx, bkt.CID, filters, returningAttributes, opts) + if err != nil { + if errors.Is(err, apistatus.ErrObjectAccessDenied) { + return nil, s3errors.GetAPIError(s3errors.ErrAccessDenied) + } + + return nil, fmt.Errorf("search object version: %w", err) + } + + if len(searchResultItems) == 0 { + return nil, ErrNodeNotFound + } + + var searchResults = make([]allVersionsSearchResult, 0, len(searchResultItems)) + + for _, item := range searchResultItems { + if len(item.Attributes) != len(returningAttributes) { + return nil, fmt.Errorf("invalid attribute count returned, expected %d, got %d", len(returningAttributes), len(item.Attributes)) + } + + var psr = allVersionsSearchResult{ + ID: item.ID, + FilePath: item.Attributes[0], + } + + if item.Attributes[1] != "" { + psr.CreationEpoch, err = strconv.ParseUint(item.Attributes[1], 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid creation epoch %s: %w", item.Attributes[1], err) + } + } + + if item.Attributes[2] != "" { + psr.CreationTimestamp, err = strconv.ParseInt(item.Attributes[2], 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid creation timestamp %s: %w", item.Attributes[2], err) + } + } + + psr.IsVersioned = item.Attributes[3] == data.VersioningEnabled + + if item.Attributes[4] != "" { + psr.PayloadSize, err = strconv.ParseInt(item.Attributes[4], 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid payload size %s: %w", item.Attributes[4], err) + } + } + + psr.IsDeleteMarker = item.Attributes[5] != "" + + searchResults = append(searchResults, psr) } - return n.searchObjects(ctx, bkt, prmSearch) + sortFunc := func(a, b allVersionsSearchResult) int { + if c := cmp.Compare(b.CreationEpoch, a.CreationEpoch); c != 0 { // reverse order. + return c + } + + if c := cmp.Compare(b.CreationTimestamp, a.CreationTimestamp); c != 0 { // reverse order. + return c + } + + // It is a temporary decision. We can't figure out what object was first and what the second right now. + return bytes.Compare(b.ID[:], a.ID[:]) // reverse order. + } + + slices.SortFunc(searchResults, sortFunc) + + return searchResults, nil } // searchAllVersionsInNeoFS returns all version of object by its objectName. @@ -572,98 +669,6 @@ func (n *layer) searchAllVersionsInNeoFSByPrefix(ctx context.Context, bkt *data. return searchResults, nextCursor, nil } -func (n *layer) searchObjects(ctx context.Context, bkt *data.BucketInfo, prmSearch PrmObjectSearch) ([]*object.Object, error) { - ids, err := n.neoFS.SearchObjects(ctx, prmSearch) - if err != nil { - if errors.Is(err, apistatus.ErrObjectAccessDenied) { - return nil, s3errors.GetAPIError(s3errors.ErrAccessDenied) - } - - return nil, fmt.Errorf("search object version: %w", err) - } - - if len(ids) == 0 { - return nil, ErrNodeNotFound - } - - var heads = make([]*object.Object, 0, len(ids)) - - for i := range ids { - head, err := n.objectHead(ctx, bkt, ids[i]) - if err != nil { - n.log.Warn("couldn't head object", - zap.Stringer("oid", &ids[i]), - zap.Stringer("cid", bkt.CID), - zap.Error(err)) - - return nil, fmt.Errorf("couldn't head object: %w", err) - } - - // The object is a part of split chain, it doesn't exist for user. - if head.HasParent() { - continue - } - - heads = append(heads, head) - } - - slices.SortFunc(heads, sortObjectsFunc) - - return heads, nil -} - -func sortObjectsFunc(a, b *object.Object) int { - if c := cmp.Compare(b.CreationEpoch(), a.CreationEpoch()); c != 0 { // reverse order. - return c - } - - var ( - aCreated int64 - bCreated int64 - ) - - for _, attr := range a.Attributes() { - if attr.Key() == object.AttributeTimestamp { - aCreated, _ = strconv.ParseInt(attr.Value(), 10, 64) - break - } - } - for _, attr := range b.Attributes() { - if attr.Key() == object.AttributeTimestamp { - bCreated, _ = strconv.ParseInt(attr.Value(), 10, 64) - break - } - } - - if c := cmp.Compare(bCreated, aCreated); c != 0 { // reverse order. - return c - } - - bID := b.GetID() - aID := a.GetID() - - // It is a temporary decision. We can't figure out what object was first and what the second right now. - return bytes.Compare(bID[:], aID[:]) // reverse order. -} - -func sortObjectsFuncByFilePath(a, b *object.Object) int { - var aPath string - var bPath string - - for _, attr := range a.Attributes() { - if attr.Key() == object.AttributeFilePath { - aPath = attr.Value() - } - } - for _, attr := range b.Attributes() { - if attr.Key() == object.AttributeFilePath { - bPath = attr.Value() - } - } - - return cmp.Compare(aPath, bPath) -} - func (n *layer) searchLatestVersionsByPrefix(ctx context.Context, bkt *data.BucketInfo, owner user.ID, prefix, cursor string, maxKeys int, onlyUnversioned bool) ([]prefixSearchResult, string, error) { searchResults, nextCursor, err := n.searchAllVersionsInNeoFSByPrefix(ctx, bkt, owner, prefix, cursor, maxKeys, onlyUnversioned) if err != nil { @@ -688,7 +693,7 @@ func (n *layer) searchLatestVersionsByPrefix(ctx context.Context, bkt *data.Buck func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadObjectParams) (*data.ExtendedObjectInfo, error) { var err error - var foundVersion *object.Object + var foundVersion *allVersionsSearchResult if p.VersionID == data.UnversionedObjectVersionID { versions, err := n.searchAllVersionsInNeoFS(ctx, bkt, bkt.Owner, p.Object, true) if err != nil { @@ -698,7 +703,7 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb return nil, err } - foundVersion = versions[0] + foundVersion = &versions[0] } else { versions, err := n.searchAllVersionsInNeoFS(ctx, bkt, bkt.Owner, p.Object, false) if err != nil { @@ -710,15 +715,15 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb if p.IsBucketVersioningEnabled { for _, version := range versions { - if version.GetID().EncodeToString() == p.VersionID { - foundVersion = version + if version.ID.EncodeToString() == p.VersionID { + foundVersion = &version break } } } else { // If versioning is not enabled, user "should see" only last version of uploaded object. - if versions[0].GetID().EncodeToString() == p.VersionID { - foundVersion = versions[0] + if versions[0].ID.EncodeToString() == p.VersionID { + foundVersion = &versions[0] } } if foundVersion == nil { @@ -726,7 +731,7 @@ func (n *layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb } } - id := foundVersion.GetID() + id := foundVersion.ID owner := n.Owner(ctx) if extObjInfo := n.cache.GetObject(owner, newAddress(bkt.CID, id)); extObjInfo != nil { return extObjInfo, nil @@ -980,55 +985,38 @@ func (n *layer) getLatestObjectsVersions(ctx context.Context, p allObjectParams) } func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo, prefix, delimiter string) (map[string][]*data.ExtendedObjectInfo, error) { - nodeVersions, err := n.searchAllVersionsInNeoFS(ctx, bkt, bkt.Owner, prefix, false) + searchResults, err := n.searchAllVersionsInNeoFS(ctx, bkt, bkt.Owner, prefix, false) if err != nil { return nil, err } - versions := make(map[string][]*data.ExtendedObjectInfo, len(nodeVersions)) + versions := make(map[string][]*data.ExtendedObjectInfo, len(searchResults)) - for _, nodeVersion := range nodeVersions { + for _, ver := range searchResults { oi := &data.ObjectInfo{} - if isDeleteMarkerObject(*nodeVersion) { - oi.ID = nodeVersion.GetID() - oi.Name = filepathFromObject(nodeVersion) - oi.Size = int64(nodeVersion.PayloadSize()) - if owner := nodeVersion.OwnerID(); owner != nil { - oi.Owner = *owner - } - - for _, attr := range nodeVersion.Attributes() { - if attr.Key() == object.AttributeTimestamp { - ts, err := strconv.ParseInt(attr.Value(), 10, 64) - if err != nil { - return nil, err - } - - oi.Created = time.Unix(ts, 0) - break - } - } - + if ver.IsDeleteMarker { + oi.ID = ver.ID + oi.Name = ver.FilePath + oi.Size = ver.PayloadSize + oi.Owner = bkt.Owner + oi.Created = time.Unix(ver.CreationTimestamp, 0) oi.IsDeleteMarker = true } else { nv := data.NodeVersion{ BaseNodeVersion: data.BaseNodeVersion{ - OID: nodeVersion.GetID(), + OID: ver.ID, FilePath: prefix, }, } - state := getS3VersioningState(*nodeVersion) - nv.IsUnversioned = state == data.VersioningUnversioned + nv.IsUnversioned = !ver.IsVersioned if oi = n.objectInfoFromObjectsCacheOrNeoFS(ctx, bkt, &nv, prefix, delimiter); oi == nil { continue } } - state := getS3VersioningState(*nodeVersion) - eoi := &data.ExtendedObjectInfo{ ObjectInfo: oi, NodeVersion: &data.NodeVersion{ @@ -1041,14 +1029,14 @@ func (n *layer) getAllObjectsVersions(ctx context.Context, bkt *data.BucketInfo, ETag: "", FilePath: oi.Name, }, - IsUnversioned: state == data.VersioningUnversioned, + IsUnversioned: !ver.IsVersioned, }, } if oi.IsDeleteMarker { eoi.NodeVersion.DeleteMarker = &data.DeleteMarkerInfo{ Created: oi.Created, - Owner: *nodeVersion.OwnerID(), + Owner: bkt.Owner, } } diff --git a/api/layer/system_object.go b/api/layer/system_object.go index 24236b93..1e06788a 100644 --- a/api/layer/system_object.go +++ b/api/layer/system_object.go @@ -60,7 +60,7 @@ func (n *layer) PutLockInfo(ctx context.Context, p *PutLockInfoParams) (err erro return err } - objectToLock := objList[0].GetID() + objectToLock := objList[0].ID if newLock.Retention != nil { if lockInfo.IsRetentionSet() {