diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index 46aa7a08..6ac16193 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -595,14 +595,67 @@ func (n *layer) reUploadFollowingParts(ctx context.Context, uploadParams UploadP for _, part := range parts { uploadParams.PartNumber = part.Number - if err = n.reUploadPart(ctx, uploadParams, part.OID, bktInfo, multipartInfo); err != nil { - return fmt.Errorf("reupload number=%d: %w", part.Number, err) + if len(part.Elements) > 0 { + if err = n.reUploadSegmentedPart(ctx, uploadParams, part, bktInfo, multipartInfo); err != nil { + return fmt.Errorf("reupload number=%d: %w", part.Number, err) + } + } else { + if err = n.reUploadPart(ctx, uploadParams, part.OID, bktInfo, multipartInfo); err != nil { + return fmt.Errorf("reupload number=%d: %w", part.Number, err) + } } } return nil } +func (n *layer) reUploadSegmentedPart(ctx context.Context, uploadParams UploadPartParams, part *data.PartInfo, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error { + completePartPayload := make([]byte, 0, n.neoFS.MaxObjectSize()) + + for _, element := range part.Elements { + elementObj, err := n.objectGet(ctx, bktInfo, element.OID) + if err != nil { + return fmt.Errorf("get part oid=%s, element oid=%s: %w", part.OID.String(), element.OID.String(), err) + } + + completePartPayload = append(completePartPayload, elementObj.Payload()...) + + // The part contains all elements for Split chain and contains itself as well. + // We mustn't remove it here, it will be removed on MultipartComplete.
+ if part.OID == element.OID { + continue + } + + if err = n.objectDelete(ctx, bktInfo, element.OID); err != nil { + n.log.Error( + "couldn't delete object", + zap.Error(err), + zap.String("cnrID", bktInfo.CID.EncodeToString()), + zap.String("uploadID", multipartInfo.UploadID), + zap.Int("partNumber", part.Number), + zap.String("part.OID", part.OID.String()), + zap.String("part element OID", element.OID.String()), + ) + // no return intentionally. + } + } + + uploadParams.Size = int64(len(completePartPayload)) + uploadParams.Reader = bytes.NewReader(completePartPayload) + + n.log.Debug("reUploadPart", zap.String("oid", part.OID.String()), zap.Int64("payload size", uploadParams.Size)) + if _, err := n.uploadPart(ctx, multipartInfo, &uploadParams); err != nil { + return fmt.Errorf("upload id=%s: %w", part.OID.String(), err) + } + + // remove old object, we just re-uploaded a new one. + if err := n.objectDelete(ctx, bktInfo, part.OID); err != nil { + return fmt.Errorf("delete old id=%s: %w", part.OID.String(), err) + } + + return nil +} + func (n *layer) reUploadPart(ctx context.Context, uploadParams UploadPartParams, id oid.ID, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error { obj, err := n.objectGet(ctx, bktInfo, id) if err != nil { @@ -612,7 +665,7 @@ func (n *layer) reUploadPart(ctx context.Context, uploadParams UploadPartParams, uploadParams.Size = int64(obj.PayloadSize()) uploadParams.Reader = bytes.NewReader(obj.Payload()) - n.log.Error("reUploadPart", zap.String("oid", id.String()), zap.Uint64("payload size", obj.PayloadSize())) + n.log.Debug("reUploadPart", zap.String("oid", id.String()), zap.Uint64("payload size", obj.PayloadSize())) if _, err = n.uploadPart(ctx, multipartInfo, &uploadParams); err != nil { return fmt.Errorf("upload id=%s: %w", id.String(), err) } @@ -1254,27 +1307,12 @@ func (n *layer) manualSlice(ctx context.Context, bktInfo *data.BucketInfo, prm P // uploadPartAsSlot uploads multipart part, but without correct link to 
previous part because we don't have it. // It uses zero part as pivot. Actual link will be set on CompleteMultipart. func (n *layer) uploadPartAsSlot(ctx context.Context, params uploadPartAsSlotParams) (*data.ObjectInfo, error) { - zeroPart, err := n.treeService.GetPartByNumber(ctx, params.bktInfo, params.multipartInfo.ID, 0) - if err != nil { - return nil, fmt.Errorf("get part by number: %w", err) - } - var ( - id oid.ID - chunk *[]byte - elements []data.LinkObjectPayload - isReturnToPool bool - splitFirstID = zeroPart.OID - splitPreviousID = zeroPart.OID - multipartHash = sha256.New() - currentPartHash = sha256.New() + id oid.ID + elements []data.LinkObjectPayload + multipartHash = sha256.New() ) - objHashes := []hash.Hash{multipartHash, currentPartHash} - if params.tzHash != nil { - objHashes = append(objHashes, params.tzHash) - } - params.attributes = append(params.attributes, [2]string{headerS3MultipartUpload, params.multipartInfo.UploadID}, [2]string{headerS3MultipartNumber, strconv.FormatInt(int64(params.uploadPartParams.PartNumber), 10)}, @@ -1287,26 +1325,13 @@ func (n *layer) uploadPartAsSlot(ctx context.Context, params uploadPartAsSlotPar Attributes: params.attributes, CreationTime: params.creationTime, CopiesNumber: params.multipartInfo.CopiesNumber, - Multipart: &Multipart{ - MultipartHashes: objHashes, - }, - } - - if params.uploadPartParams.Size > n.neoFS.MaxObjectSize()/2 { - chunk = n.buffers.Get().(*[]byte) - isReturnToPool = true - } else { - smallChunk := make([]byte, params.uploadPartParams.Size) - chunk = &smallChunk - } - - id, elements, err = n.manualSlice(ctx, params.bktInfo, prm, splitFirstID, splitPreviousID, *chunk, params.payloadReader) - if isReturnToPool { - n.buffers.Put(chunk) + Payload: params.payloadReader, + PayloadSize: uint64(params.decSize), } + id, objHashBts, err := n.objectPutAndHash(ctx, prm, params.bktInfo) if err != nil { - return nil, fmt.Errorf("manual slice: %w", err) + return nil, fmt.Errorf("object put and hash: 
%w", err) } partInfo := &data.PartInfo{ @@ -1315,7 +1340,7 @@ func (n *layer) uploadPartAsSlot(ctx context.Context, params uploadPartAsSlotPar Number: params.uploadPartParams.PartNumber, OID: id, Size: params.decSize, - ETag: hex.EncodeToString(currentPartHash.Sum(nil)), + ETag: hex.EncodeToString(objHashBts), Created: prm.CreationTime, Elements: elements, } diff --git a/internal/neofs/tree.go b/internal/neofs/tree.go index 5671de59..f8aafa9c 100644 --- a/internal/neofs/tree.go +++ b/internal/neofs/tree.go @@ -284,15 +284,17 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) { case homoHashKV: partInfo.HomoHash = []byte(value) case elementsKV: - elements := strings.Split(value, ",") - partInfo.Elements = make([]data.LinkObjectPayload, len(elements)) - for i, e := range elements { - var element data.LinkObjectPayload - if err = element.Unmarshal(e); err != nil { - return nil, fmt.Errorf("invalid element: %w", err) + if value != "" { + elements := strings.Split(value, ",") + partInfo.Elements = make([]data.LinkObjectPayload, len(elements)) + for i, e := range elements { + var element data.LinkObjectPayload + if err = element.Unmarshal(e); err != nil { + return nil, fmt.Errorf("invalid element: %w", err) + } + + partInfo.Elements[i] = element } - - partInfo.Elements[i] = element } } }