Skip to content

Commit

Permalink
layer: Fix multipart
Browse files Browse the repository at this point in the history
Reupload each small part, which we sliced manually before.

Signed-off-by: Evgenii Baidakov <evgenii@nspcc.io>
  • Loading branch information
smallhive committed Dec 9, 2024
1 parent dc8f608 commit c7cb522
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 47 deletions.
103 changes: 64 additions & 39 deletions api/layer/multipart_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,14 +595,67 @@ func (n *layer) reUploadFollowingParts(ctx context.Context, uploadParams UploadP
for _, part := range parts {
uploadParams.PartNumber = part.Number

if err = n.reUploadPart(ctx, uploadParams, part.OID, bktInfo, multipartInfo); err != nil {
return fmt.Errorf("reupload number=%d: %w", part.Number, err)
if len(part.Elements) > 0 {
if err = n.reUploadSegmentedPart(ctx, uploadParams, part, bktInfo, multipartInfo); err != nil {
return fmt.Errorf("reupload number=%d: %w", part.Number, err)
}
} else {
if err = n.reUploadPart(ctx, uploadParams, part.OID, bktInfo, multipartInfo); err != nil {
return fmt.Errorf("reupload number=%d: %w", part.Number, err)
}

Check warning on line 605 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L598-L605

Added lines #L598 - L605 were not covered by tests
}
}

return nil

Check warning on line 609 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L609

Added line #L609 was not covered by tests
}

func (n *layer) reUploadSegmentedPart(ctx context.Context, uploadParams UploadPartParams, part *data.PartInfo, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
completePartPayload := make([]byte, 0, n.neoFS.MaxObjectSize())

for _, element := range part.Elements {
elementObj, err := n.objectGet(ctx, bktInfo, element.OID)
if err != nil {
return fmt.Errorf("get part oid=%s, emelent oid=%s: %w", part.OID.String(), element.OID.String(), err)
}

Check warning on line 619 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L612-L619

Added lines #L612 - L619 were not covered by tests

completePartPayload = append(completePartPayload, elementObj.Payload()...)

// The part contains all elements for Split chain and contains itself as well.
// We mustn't remove it here, it will be removed on MultipartComplete.
if part.OID == element.OID {
continue

Check warning on line 626 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L621-L626

Added lines #L621 - L626 were not covered by tests
}

if err = n.objectDelete(ctx, bktInfo, element.OID); err != nil {
n.log.Error(
"couldn't delete object",
zap.Error(err),
zap.String("cnrID", bktInfo.CID.EncodeToString()),
zap.String("uploadID", multipartInfo.UploadID),
zap.Int("partNumber", part.Number),
zap.String("part.OID", part.OID.String()),
zap.String("part element OID", element.OID.String()),
)
// no return intentionally.

Check warning on line 639 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L629-L639

Added lines #L629 - L639 were not covered by tests
}
}

uploadParams.Size = int64(len(completePartPayload))
uploadParams.Reader = bytes.NewReader(completePartPayload)

n.log.Error("reUploadPart", zap.String("oid", part.OID.String()), zap.Int64("payload size", uploadParams.Size))
if _, err := n.uploadPart(ctx, multipartInfo, &uploadParams); err != nil {
return fmt.Errorf("upload id=%s: %w", part.OID.String(), err)
}

Check warning on line 649 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L643-L649

Added lines #L643 - L649 were not covered by tests

// remove old object, we just re-uploaded a new one.
if err := n.objectDelete(ctx, bktInfo, part.OID); err != nil {
return fmt.Errorf("delete old id=%s: %w", part.OID.String(), err)
}

Check warning on line 654 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L652-L654

Added lines #L652 - L654 were not covered by tests

return nil
}

func (n *layer) reUploadPart(ctx context.Context, uploadParams UploadPartParams, id oid.ID, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
obj, err := n.objectGet(ctx, bktInfo, id)
if err != nil {
Expand All @@ -612,7 +665,7 @@ func (n *layer) reUploadPart(ctx context.Context, uploadParams UploadPartParams,
uploadParams.Size = int64(obj.PayloadSize())
uploadParams.Reader = bytes.NewReader(obj.Payload())

n.log.Error("reUploadPart", zap.String("oid", id.String()), zap.Uint64("payload size", obj.PayloadSize()))
n.log.Debug("reUploadPart", zap.String("oid", id.String()), zap.Uint64("payload size", obj.PayloadSize()))

Check warning on line 668 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L668

Added line #L668 was not covered by tests
if _, err = n.uploadPart(ctx, multipartInfo, &uploadParams); err != nil {
return fmt.Errorf("upload id=%s: %w", id.String(), err)
}
Expand Down Expand Up @@ -1254,27 +1307,12 @@ func (n *layer) manualSlice(ctx context.Context, bktInfo *data.BucketInfo, prm P
// uploadPartAsSlot uploads multipart part, but without correct link to previous part because we don't have it.
// It uses zero part as pivot. Actual link will be set on CompleteMultipart.
func (n *layer) uploadPartAsSlot(ctx context.Context, params uploadPartAsSlotParams) (*data.ObjectInfo, error) {
zeroPart, err := n.treeService.GetPartByNumber(ctx, params.bktInfo, params.multipartInfo.ID, 0)
if err != nil {
return nil, fmt.Errorf("get part by number: %w", err)
}

var (
id oid.ID
chunk *[]byte
elements []data.LinkObjectPayload
isReturnToPool bool
splitFirstID = zeroPart.OID
splitPreviousID = zeroPart.OID
multipartHash = sha256.New()
currentPartHash = sha256.New()
id oid.ID
elements []data.LinkObjectPayload
multipartHash = sha256.New()

Check warning on line 1313 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L1311-L1313

Added lines #L1311 - L1313 were not covered by tests
)

objHashes := []hash.Hash{multipartHash, currentPartHash}
if params.tzHash != nil {
objHashes = append(objHashes, params.tzHash)
}

params.attributes = append(params.attributes,
[2]string{headerS3MultipartUpload, params.multipartInfo.UploadID},
[2]string{headerS3MultipartNumber, strconv.FormatInt(int64(params.uploadPartParams.PartNumber), 10)},
Expand All @@ -1287,26 +1325,13 @@ func (n *layer) uploadPartAsSlot(ctx context.Context, params uploadPartAsSlotPar
Attributes: params.attributes,
CreationTime: params.creationTime,
CopiesNumber: params.multipartInfo.CopiesNumber,
Multipart: &Multipart{
MultipartHashes: objHashes,
},
}

if params.uploadPartParams.Size > n.neoFS.MaxObjectSize()/2 {
chunk = n.buffers.Get().(*[]byte)
isReturnToPool = true
} else {
smallChunk := make([]byte, params.uploadPartParams.Size)
chunk = &smallChunk
}

id, elements, err = n.manualSlice(ctx, params.bktInfo, prm, splitFirstID, splitPreviousID, *chunk, params.payloadReader)
if isReturnToPool {
n.buffers.Put(chunk)
Payload: params.payloadReader,
PayloadSize: uint64(params.decSize),

Check warning on line 1329 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L1328-L1329

Added lines #L1328 - L1329 were not covered by tests
}

id, objHashBts, err := n.objectPutAndHash(ctx, prm, params.bktInfo)

Check warning on line 1332 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L1332

Added line #L1332 was not covered by tests
if err != nil {
return nil, fmt.Errorf("manual slice: %w", err)
return nil, fmt.Errorf("object put and hash: %w", err)

Check warning on line 1334 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L1334

Added line #L1334 was not covered by tests
}

partInfo := &data.PartInfo{
Expand All @@ -1315,7 +1340,7 @@ func (n *layer) uploadPartAsSlot(ctx context.Context, params uploadPartAsSlotPar
Number: params.uploadPartParams.PartNumber,
OID: id,
Size: params.decSize,
ETag: hex.EncodeToString(currentPartHash.Sum(nil)),
ETag: hex.EncodeToString(objHashBts),

Check warning on line 1343 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L1343

Added line #L1343 was not covered by tests
Created: prm.CreationTime,
Elements: elements,
}
Expand Down
18 changes: 10 additions & 8 deletions internal/neofs/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,15 +284,17 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
case homoHashKV:
partInfo.HomoHash = []byte(value)
case elementsKV:
elements := strings.Split(value, ",")
partInfo.Elements = make([]data.LinkObjectPayload, len(elements))
for i, e := range elements {
var element data.LinkObjectPayload
if err = element.Unmarshal(e); err != nil {
return nil, fmt.Errorf("invalid element: %w", err)
if value != "" {
elements := strings.Split(value, ",")
partInfo.Elements = make([]data.LinkObjectPayload, len(elements))
for i, e := range elements {
var element data.LinkObjectPayload
if err = element.Unmarshal(e); err != nil {
return nil, fmt.Errorf("invalid element: %w", err)
}

Check warning on line 294 in internal/neofs/tree.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/tree.go#L287-L294

Added lines #L287 - L294 were not covered by tests

partInfo.Elements[i] = element

Check warning on line 296 in internal/neofs/tree.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/tree.go#L296

Added line #L296 was not covered by tests
}

partInfo.Elements[i] = element
}
}
}
Expand Down

0 comments on commit c7cb522

Please sign in to comment.