diff --git a/api/data/tree.go b/api/data/tree.go
index 7796a69b..125af5e8 100644
--- a/api/data/tree.go
+++ b/api/data/tree.go
@@ -1,7 +1,9 @@
 package data
 
 import (
+	"fmt"
 	"strconv"
+	"strings"
 	"time"
 
 	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
@@ -75,7 +77,38 @@ type MultipartInfo struct {
 	Created      time.Time
 	Meta         map[string]string
 	CopiesNumber uint32
-	SplitID      string
+}
+
+// LinkObjectPayload contains part info of a complex object.
+// This data is used to construct the linking object.
+type LinkObjectPayload struct {
+	OID  oid.ID
+	Size uint32
+}
+
+// Marshal encodes LinkObjectPayload into a string.
+func (e *LinkObjectPayload) Marshal() string {
+	return fmt.Sprintf("%s:%d", e.OID.String(), e.Size)
+}
+
+// Unmarshal parses LinkObjectPayload from a string.
+func (e *LinkObjectPayload) Unmarshal(value string) error {
+	parts := strings.Split(value, ":")
+	if len(parts) != 2 {
+		return fmt.Errorf("invalid format: %s", value)
+	}
+
+	if err := e.OID.DecodeString(parts[0]); err != nil {
+		return fmt.Errorf("invalid id: %w", err)
+	}
+
+	size, err := strconv.ParseUint(parts[1], 10, 32)
+	if err != nil {
+		return fmt.Errorf("invalid size: %w", err)
+	}
+
+	e.Size = uint32(size)
+	return nil
 }
 
 // PartInfo is upload information about part.
@@ -95,8 +128,10 @@ type PartInfo struct {
 	MultipartHash []byte
 	// HomoHash contains internal state of the [hash.Hash] to calculate whole object homomorphic payload hash.
 	HomoHash []byte
-	// Elements contain [oid.ID] object list for the current part.
-	Elements []oid.ID
+	// Elements contains the [oid.ID] and size of each element of the current part.
+	Elements []LinkObjectPayload
+	// FirstSplitOID contains the first object part in the split chain.
+	FirstSplitOID oid.ID
 }
 
 // ToHeaderString form short part representation to use in S3-Completed-Parts header.
diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go
index 141caf0b..0dc90a66 100644
--- a/api/layer/multipart_upload.go
+++ b/api/layer/multipart_upload.go
@@ -152,7 +152,6 @@ func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar
 		Created:      TimeNow(ctx),
 		Meta:         make(map[string]string, metaSize),
 		CopiesNumber: p.CopiesNumber,
-		SplitID:      object.NewSplitID().String(),
 	}
 
 	for key, val := range p.Header {
@@ -229,6 +228,7 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
 
 	var (
 		splitPreviousID      oid.ID
+		splitFirstID         oid.ID
 		isSetSplitPreviousID bool
 		multipartHash        = sha256.New()
 		tzHash               hash.Hash
@@ -238,7 +238,7 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
 		tzHash = tz.New()
 	}
 
-	lastPart, err := n.treeService.GetLastPart(ctx, bktInfo, multipartInfo.ID)
+	lastPart, err := n.treeService.GetPartByNumber(ctx, bktInfo, multipartInfo.ID, p.PartNumber-1)
 	if err != nil {
 		// if ErrPartListIsEmpty, there is the first part of multipart.
 		if !errors.Is(err, ErrPartListIsEmpty) {
@@ -261,11 +261,12 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
 
 		isSetSplitPreviousID = true
 		splitPreviousID = lastPart.OID
+		splitFirstID = lastPart.FirstSplitOID
 	}
 
 	var (
 		id           oid.ID
-		elements     []oid.ID
+		elements     []data.LinkObjectPayload
 		creationTime = TimeNow(ctx)
 		// User may upload part large maxObjectSize in NeoFS. From users point of view it is a single object.
 		// We have to calculate the hash from this object separately.
@@ -284,20 +285,29 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
 		CreationTime: creationTime,
 		CopiesNumber: multipartInfo.CopiesNumber,
 		Multipart: &Multipart{
-			SplitID:         multipartInfo.SplitID,
 			MultipartHashes: objHashes,
 		},
 	}
 
-	chunk := n.buffers.Get().(*[]byte)
+	if lastPart != nil {
+		splitFirstID = lastPart.FirstSplitOID
+	}
+
+	chunk := n.buffers.Get().(*[]byte)
+	var totalBytes int
 	// slice part manually. Simultaneously considering the part is a single object for user.
 	for {
 		if isSetSplitPreviousID {
 			prm.Multipart.SplitPreviousID = &splitPreviousID
 		}
 
+		if !splitFirstID.Equals(oid.ID{}) {
+			prm.Multipart.SplitFirstID = &splitFirstID
+		}
+
 		nBts, readErr := io.ReadAtLeast(payloadReader, *chunk, len(*chunk))
+		totalBytes += nBts
+
 		if nBts > 0 {
 			prm.Payload = bytes.NewReader((*chunk)[:nBts])
 			prm.PayloadSize = uint64(nBts)
@@ -307,9 +317,13 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
 				return nil, err
 			}
 
+			if splitFirstID.Equals(oid.ID{}) {
+				splitFirstID = id
+			}
+
 			isSetSplitPreviousID = true
 			splitPreviousID = id
-			elements = append(elements, id)
+			elements = append(elements, data.LinkObjectPayload{OID: id, Size: uint32(nBts)})
 		}
 
 		if readErr == nil {
@@ -344,6 +358,10 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
 		Elements:      elements,
 	}
 
+	if !splitFirstID.Equals(oid.ID{}) {
+		partInfo.FirstSplitOID = splitFirstID
+	}
+
 	// encoding hash.Hash state to save it in tree service.
 	// the required interface is guaranteed according to the docs, so just cast without checks.
 	binaryMarshaler := multipartHash.(encoding.BinaryMarshaler)
@@ -506,8 +524,9 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
 	var multipartObjetSize int64
 	var encMultipartObjectSize uint64
 	var lastPartID int
-	var children []oid.ID
 	var completedPartsHeader strings.Builder
+	// +1 for the last part, which is created later in the code.
+	var measuredObjects = make([]object.MeasuredObject, 0, len(p.Parts)+1)
 	for i, part := range p.Parts {
 		partInfo := partsInfo[part.PartNumber]
 		if partInfo == nil || part.ETag != partInfo.ETag {
@@ -539,12 +558,19 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
 			lastPartID = part.PartNumber
 		}
 
-		children = append(children, partInfo.Elements...)
+		for _, element := range partInfo.Elements {
+			// Collecting payload for the link object.
+			var mObj object.MeasuredObject
+			mObj.SetObjectID(element.OID)
+			mObj.SetObjectSize(element.Size)
+			measuredObjects = append(measuredObjects, mObj)
+		}
 	}
 
 	multipartHash := sha256.New()
 	var homoHash hash.Hash
 	var splitPreviousID oid.ID
+	var splitFirstID oid.ID
 
 	if lastPartID > 0 {
 		lastPart := partsInfo[lastPartID]
@@ -552,6 +578,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
 	if lastPart != nil {
 		if len(lastPart.MultipartHash) > 0 {
 			splitPreviousID = lastPart.OID
+			splitFirstID = lastPart.FirstSplitOID
 
 			if len(lastPart.MultipartHash) > 0 {
 				binaryUnmarshaler := multipartHash.(encoding.BinaryUnmarshaler)
@@ -623,7 +650,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
 		CreationTime: TimeNow(ctx),
 		CopiesNumber: multipartInfo.CopiesNumber,
 		Multipart: &Multipart{
-			SplitID:         multipartInfo.SplitID,
+			SplitFirstID:    &splitFirstID,
 			SplitPreviousID: &splitPreviousID,
 			HeaderObject:    header,
 		},
@@ -635,7 +662,13 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
 		return nil, nil, err
 	}
 
-	children = append(children, lastPartObjID)
+	var mObj object.MeasuredObject
+	// The last part has zero length.
+	mObj.SetObjectID(lastPartObjID)
+	measuredObjects = append(measuredObjects, mObj)
+
+	var linkObj = object.Link{}
+	linkObj.SetObjects(measuredObjects)
 
 	// linking object
 	prm = PrmObjectCreate{
@@ -644,11 +677,10 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
 		CreationTime: TimeNow(ctx),
 		CopiesNumber: multipartInfo.CopiesNumber,
 		Multipart: &Multipart{
-			SplitID:      multipartInfo.SplitID,
 			HeaderObject: header,
-			Children:     children,
+			SplitFirstID: &splitFirstID,
+			Link:         &linkObj,
 		},
-		Payload: bytes.NewBuffer(nil),
 	}
 
 	_, _, err = n.objectPutAndHash(ctx, prm, p.Info.Bkt)
diff --git a/api/layer/neofs.go b/api/layer/neofs.go
index dd1fee0c..700548a3 100644
--- a/api/layer/neofs.go
+++ b/api/layer/neofs.go
@@ -123,15 +123,15 @@ type PrmObjectCreate struct {
 type Multipart struct {
 	// MultipartHashes contains hashes for the multipart object payload calculation (optional).
 	MultipartHashes []hash.Hash
-	// SplitID contains splitID for multipart object (optional).
-	SplitID string
 	// SplitPreviousID contains [oid.ID] of previous object in chain (optional).
 	SplitPreviousID *oid.ID
-	// Children contains all objects in multipart chain, for linking object (optional).
-	Children []oid.ID
+	// SplitFirstID contains the [oid.ID] of the first object in the chain (nil for the first object itself).
+	SplitFirstID *oid.ID
 	// HeaderObject is a virtual representation of complete multipart object (optional). It is used to set Parent in
 	// linking object.
 	HeaderObject *object.Object
+	// Link contains info for the linking object (optional).
+	Link *object.Link
 }
 
 // PrmObjectDelete groups parameters of NeoFS.DeleteObject operation.
diff --git a/api/layer/neofs_mock.go b/api/layer/neofs_mock.go
index 6ac92f23..d597d778 100644
--- a/api/layer/neofs_mock.go
+++ b/api/layer/neofs_mock.go
@@ -270,19 +270,13 @@ func (t *TestNeoFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID
 	obj.SetOwnerID(&prm.Creator)
 	t.currentEpoch++
 
-	if prm.Multipart != nil && prm.Multipart.SplitID != "" {
-		var split object.SplitID
-		if err := split.Parse(prm.Multipart.SplitID); err != nil {
-			return oid.ID{}, fmt.Errorf("split parse: %w", err)
-		}
-		obj.SetSplitID(&split)
-
+	if prm.Multipart != nil {
 		if prm.Multipart.SplitPreviousID != nil {
 			obj.SetPreviousID(*prm.Multipart.SplitPreviousID)
 		}
 
-		if len(prm.Multipart.Children) > 0 {
-			obj.SetChildren(prm.Multipart.Children...)
+		if prm.Multipart.SplitFirstID != nil {
+			obj.SetFirstID(*prm.Multipart.SplitFirstID)
 		}
 
 		if prm.Multipart.HeaderObject != nil {
@@ -294,6 +288,12 @@ func (t *TestNeoFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID
 			obj.SetParentID(id)
 			obj.SetParent(prm.Multipart.HeaderObject)
 		}
+
+		if prm.Multipart.Link != nil {
+			obj.WriteLink(*prm.Multipart.Link)
+			prm.Payload = bytes.NewReader(obj.Payload())
+			obj.SetPayloadSize(uint64(len(obj.Payload())))
+		}
 	}
 
 	if len(prm.Locks) > 0 {
diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go
index 59ab5fdd..0cb81f5c 100644
--- a/api/layer/tree_mock.go
+++ b/api/layer/tree_mock.go
@@ -363,7 +363,7 @@ LOOP:
 	return result, nil
 }
 
-func (t *TreeServiceMock) GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error) {
+func (t *TreeServiceMock) GetPartByNumber(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, number int) (*data.PartInfo, error) {
 	parts, err := t.GetParts(ctx, bktInfo, multipartNodeID)
 	if err != nil {
 		return nil, fmt.Errorf("get parts: %w", err)
 	}
@@ -390,7 +390,18 @@ func (t *TreeServiceMock) GetLastPart(ctx context.Context, bktInfo *data.BucketI
 		return 1
 	})
 
-	return parts[len(parts)-1], nil
+	var pi *data.PartInfo
+	for _, part := range parts {
+		if part.Number != number {
+			continue
+		}
+
+		if pi == nil || pi.ServerCreated.Before(part.ServerCreated) {
+			pi = part
+		}
+	}
+
+	return pi, nil
 }
 
 func (t *TreeServiceMock) GetPartsAfter(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, partID int) ([]*data.PartInfo, error) {
diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go
index 4494cf3b..93f9ae6c 100644
--- a/api/layer/tree_service.go
+++ b/api/layer/tree_service.go
@@ -74,11 +74,10 @@ type TreeService interface {
 	// If object id to remove is not found returns ErrNoNodeToRemove error.
 	AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error)
 	GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error)
-	// GetLastPart returns the latest uploaded part.
-	//
+	// GetPartByNumber returns the part with the given number. If the part was uploaded multiple times, the newest one is returned.
 	// Return errors:
 	//   - [ErrPartListIsEmpty] if there is no parts in the upload id.
-	GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error)
+	GetPartByNumber(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, number int) (*data.PartInfo, error)
 	GetPartsAfter(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, partID int) ([]*data.PartInfo, error)
 
 	// Compound methods for optimizations
diff --git a/internal/neofs/neofs.go b/internal/neofs/neofs.go
index 40993933..d298d184 100644
--- a/internal/neofs/neofs.go
+++ b/internal/neofs/neofs.go
@@ -274,19 +274,13 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi
 	obj.SetAttributes(attrs...)
 	obj.SetPayloadSize(prm.PayloadSize)
 
-	if prm.Multipart != nil && prm.Multipart.SplitID != "" {
-		var split object.SplitID
-		if err := split.Parse(prm.Multipart.SplitID); err != nil {
-			return oid.ID{}, fmt.Errorf("parse split ID: %w", err)
-		}
-		obj.SetSplitID(&split)
-
+	if prm.Multipart != nil {
 		if prm.Multipart.SplitPreviousID != nil {
 			obj.SetPreviousID(*prm.Multipart.SplitPreviousID)
 		}
 
-		if len(prm.Multipart.Children) > 0 {
-			obj.SetChildren(prm.Multipart.Children...)
+		if prm.Multipart.SplitFirstID != nil {
+			obj.SetFirstID(*prm.Multipart.SplitFirstID)
 		}
 
 		if prm.Multipart.HeaderObject != nil {
@@ -298,6 +292,15 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi
 			obj.SetParentID(id)
 			obj.SetParent(prm.Multipart.HeaderObject)
 		}
+
+		if prm.Multipart.Link != nil {
+			obj.WriteLink(*prm.Multipart.Link)
+			prm.Payload = bytes.NewReader(obj.Payload())
+			obj.SetPayloadSize(uint64(len(obj.Payload())))
+
+			// Link object should never have a previous one.
+			obj.ResetPreviousID()
+		}
 	}
 
 	if len(prm.Locks) > 0 {
diff --git a/internal/neofs/tree.go b/internal/neofs/tree.go
index 76fa7aba..7a4e7204 100644
--- a/internal/neofs/tree.go
+++ b/internal/neofs/tree.go
@@ -52,11 +52,11 @@ const (
 	versioningKV        = "Versioning"
 	lockConfigurationKV = "LockConfiguration"
 	oidKV               = "OID"
+	firstSplitOidKV     = "FirstSplitOID"
 	fileNameKV          = "FileName"
 	isUnversionedKV     = "IsUnversioned"
 	isTagKV             = "IsTag"
 	uploadIDKV          = "UploadId"
-	splitIDKV           = "SplitId"
 	partNumberKV        = "Number"
 	sizeKV              = "Size"
 	etagKV              = "ETag"
@@ -226,8 +226,6 @@ func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) {
 			}
 		case ownerKV:
 			_ = multipartInfo.Owner.DecodeString(string(kv.GetValue()))
-		case splitIDKV:
-			multipartInfo.SplitID = string(kv.GetValue())
 		default:
 			multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue())
 		}
@@ -255,6 +253,10 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
 			if err = partInfo.OID.DecodeString(value); err != nil {
 				return nil, fmt.Errorf("invalid oid: %w", err)
 			}
+		case firstSplitOidKV:
+			if err = partInfo.FirstSplitOID.DecodeString(value); err != nil {
+				return nil, fmt.Errorf("invalid FirstSplitOID: %w", err)
+			}
 		case etagKV:
 			partInfo.ETag = value
 		case sizeKV:
@@ -279,14 +281,14 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
 			partInfo.HomoHash = []byte(value)
 		case elementsKV:
 			elements := strings.Split(value, ",")
-			partInfo.Elements = make([]oid.ID, len(elements))
+			partInfo.Elements = make([]data.LinkObjectPayload, len(elements))
 			for i, e := range elements {
-				var id oid.ID
-				if err = id.DecodeString(e); err != nil {
-					return nil, fmt.Errorf("invalid oid: %w", err)
+				var element data.LinkObjectPayload
+				if err = element.Unmarshal(e); err != nil {
+					return nil, fmt.Errorf("invalid element: %w", err)
 				}
-				partInfo.Elements[i] = id
+				partInfo.Elements[i] = element
 			}
 		}
 	}
@@ -933,12 +935,13 @@ func (c *TreeClient) AddPart(ctx context.Context, bktInfo *data.BucketInfo, mult
 	elements := make([]string, len(info.Elements))
 	for i, e := range info.Elements {
-		elements[i] = e.String()
+		elements[i] = e.Marshal()
 	}
 
 	meta := map[string]string{
 		partNumberKV:    strconv.Itoa(info.Number),
 		oidKV:           info.OID.EncodeToString(),
+		firstSplitOidKV: info.FirstSplitOID.EncodeToString(),
 		sizeKV:          strconv.FormatInt(info.Size, 10),
 		createdKV:       strconv.FormatInt(info.Created.UTC().UnixMilli(), 10),
 		serverCreatedKV: strconv.FormatInt(time.Now().UTC().UnixMilli(), 10),
@@ -995,13 +998,13 @@ func (c *TreeClient) GetParts(ctx context.Context, bktInfo *data.BucketInfo, mul
 	return result, nil
 }
 
-func (c *TreeClient) GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error) {
+func (c *TreeClient) GetPartByNumber(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, number int) (*data.PartInfo, error) {
 	parts, err := c.GetParts(ctx, bktInfo, multipartNodeID)
 	if err != nil {
 		return nil, fmt.Errorf("get parts: %w", err)
 	}
 
-	if len(parts) == 0 {
+	if len(parts) == 0 || number == 0 {
 		return nil, layer.ErrPartListIsEmpty
 	}
 
@@ -1022,7 +1025,18 @@ func (c *TreeClient) GetLastPart(ctx context.Context, bktInfo *data.BucketInfo,
 		return 1
 	})
 
-	return parts[len(parts)-1], nil
+	var pi *data.PartInfo
+	for _, part := range parts {
+		if part.Number != number {
+			continue
+		}
+
+		if pi == nil || pi.ServerCreated.Before(part.ServerCreated) {
+			pi = part
+		}
+	}
+
+	return pi, nil
 }
 
 // GetPartsAfter returns parts uploaded after partID. These parts are sorted and filtered by creation time.
@@ -1320,7 +1334,6 @@ func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]str
 	info.Meta[uploadIDKV] = info.UploadID
 	info.Meta[ownerKV] = info.Owner.EncodeToString()
 	info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10)
-	info.Meta[splitIDKV] = info.SplitID
 
 	return info.Meta
 }
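
Not part of the patch: a minimal round-trip sketch for the new element encoding, showing how LinkObjectPayload.Marshal produces the "<oid>:<size>" string that AddPart stores in the tree service and that newPartInfo decodes back with Unmarshal. The module import path and the use of SetSHA256 to fabricate a deterministic object ID are assumptions, not taken from the diff.

package main

import (
	"crypto/sha256"
	"fmt"

	"github.com/nspcc-dev/neofs-s3-gw/api/data" // import path assumed from the repo layout above
	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
)

func main() {
	// Fabricate a deterministic object ID for the demonstration.
	var id oid.ID
	id.SetSHA256(sha256.Sum256([]byte("part-element")))

	// Encode an element the way AddPart stores it: "<oid>:<size>",
	// comma-joined with the part's other elements under the Elements key.
	elem := data.LinkObjectPayload{OID: id, Size: 512}
	encoded := elem.Marshal()
	fmt.Println("stored as:", encoded)

	// newPartInfo decodes each element back with Unmarshal.
	var decoded data.LinkObjectPayload
	if err := decoded.Unmarshal(encoded); err != nil {
		panic(err)
	}
	fmt.Println("round-trip ok:", decoded.OID.Equals(elem.OID) && decoded.Size == elem.Size)
}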