Skip to content

Commit

Permalink
*: Replace split V1 to V2
Browse files Browse the repository at this point in the history
Closes #937.

Signed-off-by: Evgenii Baidakov <evgenii@nspcc.io>
  • Loading branch information
smallhive committed Jun 13, 2024
1 parent 1509a1d commit 7a4680b
Show file tree
Hide file tree
Showing 8 changed files with 149 additions and 56 deletions.
41 changes: 38 additions & 3 deletions api/data/tree.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package data

import (
"fmt"
"strconv"
"strings"
"time"

cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
Expand Down Expand Up @@ -75,7 +77,38 @@ type MultipartInfo struct {
Created time.Time
Meta map[string]string
CopiesNumber uint32
SplitID string
}

// LinkObjectPayload contains part info of the complex object.
// This data will be used for linking object construction.
type LinkObjectPayload struct {
	// OID is the object ID of the stored element (part) of the complex object.
	OID oid.ID
	// Size is the payload size of that element in bytes.
	Size uint32
}

// Marshal converts LinkObjectPayload to string.
// The produced form is "<oid>:<size>" and round-trips through Unmarshal.
func (e *LinkObjectPayload) Marshal() string {
	return e.OID.String() + ":" + strconv.FormatUint(uint64(e.Size), 10)
}

// Unmarshal converts string to LinkObjectPayload.
// The expected form is "<oid>:<size>", as produced by Marshal; any other
// shape (no colon, or more than one colon) is rejected as invalid format.
func (e *LinkObjectPayload) Unmarshal(value string) error {
	idPart, sizePart, ok := strings.Cut(value, ":")
	// Exactly one separator is allowed, matching Marshal's output shape.
	if !ok || strings.Contains(sizePart, ":") {
		return fmt.Errorf("invalid format: %s", value)
	}

	if err := e.OID.DecodeString(idPart); err != nil {
		return fmt.Errorf("invalid id: %w", err)
	}

	size, err := strconv.ParseUint(sizePart, 10, 32)
	if err != nil {
		return fmt.Errorf("invalid size: %w", err)
	}

	e.Size = uint32(size)
	return nil
}

// PartInfo is upload information about part.
Expand All @@ -95,8 +128,10 @@ type PartInfo struct {
MultipartHash []byte
// HomoHash contains internal state of the [hash.Hash] to calculate whole object homomorphic payload hash.
HomoHash []byte
// Elements contain [oid.ID] object list for the current part.
Elements []oid.ID
// Elements contain [oid.ID] and size for each element for the current part.
Elements []LinkObjectPayload
// FirstSplitOID contains first object part in the split chain.
FirstSplitOID oid.ID
}

// ToHeaderString form short part representation to use in S3-Completed-Parts header.
Expand Down
58 changes: 45 additions & 13 deletions api/layer/multipart_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar
Created: TimeNow(ctx),
Meta: make(map[string]string, metaSize),
CopiesNumber: p.CopiesNumber,
SplitID: object.NewSplitID().String(),
}

for key, val := range p.Header {
Expand Down Expand Up @@ -229,6 +228,7 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf

var (
splitPreviousID oid.ID
splitFirstID oid.ID
isSetSplitPreviousID bool
multipartHash = sha256.New()
tzHash hash.Hash
Expand All @@ -238,7 +238,7 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
tzHash = tz.New()
}

lastPart, err := n.treeService.GetLastPart(ctx, bktInfo, multipartInfo.ID)
lastPart, err := n.treeService.GetPartByNumber(ctx, bktInfo, multipartInfo.ID, p.PartNumber-1)
if err != nil {
// if ErrPartListIsEmpty, this is the first part of the multipart upload.
if !errors.Is(err, ErrPartListIsEmpty) {
Expand All @@ -261,11 +261,12 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf

isSetSplitPreviousID = true
splitPreviousID = lastPart.OID
splitFirstID = lastPart.FirstSplitOID
}

var (
id oid.ID
elements []oid.ID
elements []data.LinkObjectPayload
creationTime = TimeNow(ctx)
// User may upload a part larger than maxObjectSize in NeoFS. From the user's point of view it is a single object.
// We have to calculate the hash from this object separately.
Expand All @@ -284,20 +285,29 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
CreationTime: creationTime,
CopiesNumber: multipartInfo.CopiesNumber,
Multipart: &Multipart{
SplitID: multipartInfo.SplitID,
MultipartHashes: objHashes,
},
}

chunk := n.buffers.Get().(*[]byte)
if lastPart != nil {
splitFirstID = lastPart.FirstSplitOID
}

chunk := n.buffers.Get().(*[]byte)
var totalBytes int
// slice part manually. Simultaneously considering the part is a single object for user.
for {
if isSetSplitPreviousID {
prm.Multipart.SplitPreviousID = &splitPreviousID
}

if !splitFirstID.Equals(oid.ID{}) {
prm.Multipart.SplitFirstID = &splitFirstID
}

nBts, readErr := io.ReadAtLeast(payloadReader, *chunk, len(*chunk))
totalBytes += nBts

if nBts > 0 {
prm.Payload = bytes.NewReader((*chunk)[:nBts])
prm.PayloadSize = uint64(nBts)
Expand All @@ -307,9 +317,13 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
return nil, err
}

if splitFirstID.Equals(oid.ID{}) {
splitFirstID = id
}

isSetSplitPreviousID = true
splitPreviousID = id
elements = append(elements, id)
elements = append(elements, data.LinkObjectPayload{OID: id, Size: uint32(nBts)})
}

if readErr == nil {
Expand Down Expand Up @@ -344,6 +358,10 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
Elements: elements,
}

if !splitFirstID.Equals(oid.ID{}) {
partInfo.FirstSplitOID = splitFirstID
}

// encoding hash.Hash state to save it in tree service.
// the required interface is guaranteed according to the docs, so just cast without checks.
binaryMarshaler := multipartHash.(encoding.BinaryMarshaler)
Expand Down Expand Up @@ -506,8 +524,9 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
var multipartObjetSize int64
var encMultipartObjectSize uint64
var lastPartID int
var children []oid.ID
var completedPartsHeader strings.Builder
// +1 is the last part, it will be created later in the code.
var measuredObjects = make([]object.MeasuredObject, 0, len(p.Parts)+1)
for i, part := range p.Parts {
partInfo := partsInfo[part.PartNumber]
if partInfo == nil || part.ETag != partInfo.ETag {
Expand Down Expand Up @@ -539,19 +558,27 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
lastPartID = part.PartNumber
}

children = append(children, partInfo.Elements...)
for _, element := range partInfo.Elements {
// Collecting payload for the link object.
var mObj object.MeasuredObject
mObj.SetObjectID(element.OID)
mObj.SetObjectSize(element.Size)
measuredObjects = append(measuredObjects, mObj)
}
}

multipartHash := sha256.New()
var homoHash hash.Hash
var splitPreviousID oid.ID
var splitFirstID oid.ID

if lastPartID > 0 {
lastPart := partsInfo[lastPartID]

if lastPart != nil {
if len(lastPart.MultipartHash) > 0 {
splitPreviousID = lastPart.OID
splitFirstID = lastPart.FirstSplitOID

if len(lastPart.MultipartHash) > 0 {
binaryUnmarshaler := multipartHash.(encoding.BinaryUnmarshaler)
Expand Down Expand Up @@ -623,7 +650,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
CreationTime: TimeNow(ctx),
CopiesNumber: multipartInfo.CopiesNumber,
Multipart: &Multipart{
SplitID: multipartInfo.SplitID,
SplitFirstID: &splitFirstID,
SplitPreviousID: &splitPreviousID,
HeaderObject: header,
},
Expand All @@ -635,7 +662,13 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
return nil, nil, err
}

children = append(children, lastPartObjID)
var mObj object.MeasuredObject
// The last part has zero payload length.
mObj.SetObjectID(lastPartObjID)
measuredObjects = append(measuredObjects, mObj)

var linkObj = object.Link{}
linkObj.SetObjects(measuredObjects)

// linking object
prm = PrmObjectCreate{
Expand All @@ -644,11 +677,10 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
CreationTime: TimeNow(ctx),
CopiesNumber: multipartInfo.CopiesNumber,
Multipart: &Multipart{
SplitID: multipartInfo.SplitID,
HeaderObject: header,
Children: children,
SplitFirstID: &splitFirstID,
Link: &linkObj,
},
Payload: bytes.NewBuffer(nil),
}

_, _, err = n.objectPutAndHash(ctx, prm, p.Info.Bkt)
Expand Down
8 changes: 4 additions & 4 deletions api/layer/neofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,15 @@ type PrmObjectCreate struct {
type Multipart struct {
// MultipartHashes contains hashes for the multipart object payload calculation (optional).
MultipartHashes []hash.Hash
// SplitID contains splitID for multipart object (optional).
SplitID string
// SplitPreviousID contains [oid.ID] of previous object in chain (optional).
SplitPreviousID *oid.ID
// Children contains all objects in multipart chain, for linking object (optional).
Children []oid.ID
// SplitFirstID contains [oid.ID] of the first object in the chain (the first object itself has nil here).
SplitFirstID *oid.ID
// HeaderObject is a virtual representation of complete multipart object (optional). It is used to set Parent in
// linking object.
HeaderObject *object.Object
// Link contains info for linking object.
Link *object.Link
}

// PrmObjectDelete groups parameters of NeoFS.DeleteObject operation.
Expand Down
18 changes: 9 additions & 9 deletions api/layer/neofs_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,19 +270,13 @@ func (t *TestNeoFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID
obj.SetOwnerID(&prm.Creator)
t.currentEpoch++

if prm.Multipart != nil && prm.Multipart.SplitID != "" {
var split object.SplitID
if err := split.Parse(prm.Multipart.SplitID); err != nil {
return oid.ID{}, fmt.Errorf("split parse: %w", err)
}
obj.SetSplitID(&split)

if prm.Multipart != nil {
if prm.Multipart.SplitPreviousID != nil {
obj.SetPreviousID(*prm.Multipart.SplitPreviousID)
}

if len(prm.Multipart.Children) > 0 {
obj.SetChildren(prm.Multipart.Children...)
if prm.Multipart.SplitFirstID != nil {
obj.SetFirstID(*prm.Multipart.SplitFirstID)
}

if prm.Multipart.HeaderObject != nil {
Expand All @@ -294,6 +288,12 @@ func (t *TestNeoFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID
obj.SetParentID(id)
obj.SetParent(prm.Multipart.HeaderObject)
}

if prm.Multipart.Link != nil {
obj.WriteLink(*prm.Multipart.Link)
prm.Payload = bytes.NewReader(obj.Payload())
obj.SetPayloadSize(uint64(len(obj.Payload())))
}
}

if len(prm.Locks) > 0 {
Expand Down
15 changes: 13 additions & 2 deletions api/layer/tree_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ LOOP:
return result, nil
}

func (t *TreeServiceMock) GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error) {
func (t *TreeServiceMock) GetPartByNumber(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, number int) (*data.PartInfo, error) {
parts, err := t.GetParts(ctx, bktInfo, multipartNodeID)
if err != nil {
return nil, fmt.Errorf("get parts: %w", err)
Expand All @@ -390,7 +390,18 @@ func (t *TreeServiceMock) GetLastPart(ctx context.Context, bktInfo *data.BucketI
return 1
})

return parts[len(parts)-1], nil
var pi *data.PartInfo
for _, part := range parts {
if part.Number != number {
continue
}

if pi == nil || pi.ServerCreated.Before(part.ServerCreated) {
pi = part
}
}

return pi, nil
}

func (t *TreeServiceMock) GetPartsAfter(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, partID int) ([]*data.PartInfo, error) {
Expand Down
5 changes: 2 additions & 3 deletions api/layer/tree_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,10 @@ type TreeService interface {
// If object id to remove is not found returns ErrNoNodeToRemove error.
AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error)
GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error)
// GetLastPart returns the latest uploaded part.
//
// GetPartByNumber returns the part by number. If the part was uploaded multiple times, the newest one is returned.
// Return errors:
// - [ErrPartListIsEmpty] if there is no parts in the upload id.
GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error)
GetPartByNumber(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, number int) (*data.PartInfo, error)
GetPartsAfter(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, partID int) ([]*data.PartInfo, error)

// Compound methods for optimizations
Expand Down
21 changes: 12 additions & 9 deletions internal/neofs/neofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,19 +274,13 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi
obj.SetAttributes(attrs...)
obj.SetPayloadSize(prm.PayloadSize)

if prm.Multipart != nil && prm.Multipart.SplitID != "" {
var split object.SplitID
if err := split.Parse(prm.Multipart.SplitID); err != nil {
return oid.ID{}, fmt.Errorf("parse split ID: %w", err)
}
obj.SetSplitID(&split)

if prm.Multipart != nil {
if prm.Multipart.SplitPreviousID != nil {
obj.SetPreviousID(*prm.Multipart.SplitPreviousID)
}

if len(prm.Multipart.Children) > 0 {
obj.SetChildren(prm.Multipart.Children...)
if prm.Multipart.SplitFirstID != nil {
obj.SetFirstID(*prm.Multipart.SplitFirstID)
}

if prm.Multipart.HeaderObject != nil {
Expand All @@ -298,6 +292,15 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi
obj.SetParentID(id)
obj.SetParent(prm.Multipart.HeaderObject)
}

if prm.Multipart.Link != nil {
obj.WriteLink(*prm.Multipart.Link)
prm.Payload = bytes.NewReader(obj.Payload())
obj.SetPayloadSize(uint64(len(obj.Payload())))

// Link object should never have a previous one.
obj.ResetPreviousID()
}
}

if len(prm.Locks) > 0 {
Expand Down
Loading

0 comments on commit 7a4680b

Please sign in to comment.