Skip to content

Commit

Permalink
fix: gc objects (#1436)
Browse files Browse the repository at this point in the history
* fix: gc objects map

feat: update trigger branch name

feat: restore

* feat: record deleted size in log

feat: update ut

feat: use aws tools

* feat: return when no object in primary/secondary case

feat: add debug log

fix: wording

feat: wording

* feat: remove push on branch
  • Loading branch information
annielz authored Nov 22, 2024
1 parent d5b6cd1 commit 86392ec
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 20 deletions.
2 changes: 1 addition & 1 deletion core/piecestore/piecestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,5 @@ type PieceStore interface {
DeletePiece(ctx context.Context, key string) error
// DeletePiecesByPrefix deletes piece data from the piece store by key prefix;
// it can delete segment or EC piece data.
DeletePiecesByPrefix(ctx context.Context, key string) error
DeletePiecesByPrefix(ctx context.Context, key string) (uint64, error)
}
7 changes: 4 additions & 3 deletions core/piecestore/piecestore_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions modular/executor/execute_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,9 @@ func (e *ExecuteModular) HandleGCObjectTask(ctx context.Context, task coretask.G
continue
}
segmentPieceKeyPrefix := fmt.Sprintf("s%d_", currentGCObjectID)
deleteErr := e.baseApp.PieceStore().DeletePiecesByPrefix(ctx, segmentPieceKeyPrefix)
deletedSize, deleteErr := e.baseApp.PieceStore().DeletePiecesByPrefix(ctx, segmentPieceKeyPrefix)
log.CtxDebugw(ctx, "delete the primary sp pieces", "object_info", objectInfo,
"piece_key_prefix", segmentPieceKeyPrefix, "error", deleteErr)
"piece_key_prefix", segmentPieceKeyPrefix, "deletedSize", deletedSize, "error", deleteErr)
bucketInfo, err := e.baseApp.GfSpClient().GetBucketInfoByBucketName(ctx, objectInfo.BucketName)
if err != nil || bucketInfo == nil {
log.Errorw("failed to get bucket by bucket name", "bucket_name", objectInfo.BucketName, "error", err)
Expand All @@ -284,16 +284,16 @@ func (e *ExecuteModular) HandleGCObjectTask(ctx context.Context, task coretask.G
if spId == sspId {
redundancyIndex = int32(rIdx)
// Ignore this delete API error. TODO: refine the GC workflow by enriching the metadata index.
deleteErr := e.baseApp.PieceStore().DeletePiecesByPrefix(ctx, ECPieceKeyPrefix)
deletedSize, deleteErr = e.baseApp.PieceStore().DeletePiecesByPrefix(ctx, ECPieceKeyPrefix)
log.CtxDebugw(ctx, "delete the secondary sp pieces by prefix",
"object_info", objectInfo, "piece_key_prefix", ECPieceKeyPrefix, "error", deleteErr)
"object_info", objectInfo, "piece_key_prefix", ECPieceKeyPrefix, "deletedSize", deletedSize, "error", deleteErr)
}
}
} else {
// if failed to get secondary sps, check the current sp
deleteErr := e.baseApp.PieceStore().DeletePiecesByPrefix(ctx, ECPieceKeyPrefix)
deletedSize, deleteErr = e.baseApp.PieceStore().DeletePiecesByPrefix(ctx, ECPieceKeyPrefix)
log.CtxDebugw(ctx, "delete the sp pieces by prefix in current sp when secondary sp not found",
"object_info", objectInfo, "piece_key_prefix", ECPieceKeyPrefix, "error", deleteErr)
"object_info", objectInfo, "piece_key_prefix", ECPieceKeyPrefix, "deletedSize", deletedSize, "error", deleteErr)

// signal that any integrity meta related to the object should be deleted
redundancyIndex = math.MaxInt32
Expand Down
6 changes: 3 additions & 3 deletions modular/executor/executor_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ func TestExecuteModular_HandleGCObjectTask(t *testing.T) {
e.baseApp.SetPieceOp(m2)

m3 := piecestore.NewMockPieceStore(ctrl)
m3.EXPECT().DeletePiecesByPrefix(gomock.Any(), gomock.Any()).Return(nil).Times(1)
m3.EXPECT().DeletePiecesByPrefix(gomock.Any(), gomock.Any()).Return(uint64(0), nil).Times(1)
e.baseApp.SetPieceStore(m3)
return e
},
Expand Down Expand Up @@ -585,7 +585,7 @@ func TestExecuteModular_HandleGCObjectTask(t *testing.T) {
e.baseApp.SetPieceOp(m2)

m3 := piecestore.NewMockPieceStore(ctrl)
m3.EXPECT().DeletePiecesByPrefix(gomock.Any(), gomock.Any()).Return(nil).Times(1)
m3.EXPECT().DeletePiecesByPrefix(gomock.Any(), gomock.Any()).Return(uint64(0), nil).Times(1)
e.baseApp.SetPieceStore(m3)
return e
},
Expand Down Expand Up @@ -649,7 +649,7 @@ func TestExecuteModular_HandleGCObjectTask(t *testing.T) {
e.baseApp.SetPieceOp(m2)

m3 := piecestore.NewMockPieceStore(ctrl)
m3.EXPECT().DeletePiecesByPrefix(gomock.Any(), gomock.Any()).Return(nil).Times(2)
m3.EXPECT().DeletePiecesByPrefix(gomock.Any(), gomock.Any()).Return(uint64(0), nil).Times(2)
e.baseApp.SetPieceStore(m3)

m4 := corespdb.NewMockSPDB(ctrl)
Expand Down
5 changes: 3 additions & 2 deletions store/piecestore/client/piece_store_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (client *StoreClient) DeletePiece(ctx context.Context, key string) error {
}

// DeletePiecesByPrefix deletes pieces by prefix from piece store.
func (client *StoreClient) DeletePiecesByPrefix(ctx context.Context, key string) error {
func (client *StoreClient) DeletePiecesByPrefix(ctx context.Context, key string) (uint64, error) {
var (
startTime = time.Now()
err error
Expand All @@ -154,5 +154,6 @@ func (client *StoreClient) DeletePiecesByPrefix(ctx context.Context, key string)
}()

valSize, err = client.ps.DeleteByPrefix(ctx, key)
return err

return valSize, err
}
7 changes: 6 additions & 1 deletion store/piecestore/storage/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (o *ossStore) DeleteObject(ctx context.Context, key string) error {
func (o *ossStore) DeleteObjectsByPrefix(ctx context.Context, key string) (uint64, error) {
var (
objectKeys []string
objectKeySizeMap map[string]uint64
objectKeySizeMap = make(map[string]uint64)
continueDeleteObject = true
batchSize = int64(1000)
size uint64
Expand All @@ -89,6 +89,11 @@ func (o *ossStore) DeleteObjectsByPrefix(ctx context.Context, key string) (uint6
return size, err
}

if len(objs) == 0 {
log.CtxDebugw(ctx, "No object is listed in oss by prefix", "prefix", key)
return 0, nil
}

// if fewer objects are listed here than the required batch size, this is the last page
if int64(len(objs)) < batchSize {
continueDeleteObject = false
Expand Down
13 changes: 9 additions & 4 deletions store/piecestore/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func (s *s3Store) DeleteObject(ctx context.Context, key string) error {
func (s *s3Store) DeleteObjectsByPrefix(ctx context.Context, key string) (uint64, error) {
var (
objectIdentifiers []*s3.ObjectIdentifier
objectKeySizeMap map[string]uint64
objectKeySizeMap = make(map[string]uint64)
continueDeleteObject = true
batchSize = int64(1000)
size uint64
Expand All @@ -156,17 +156,22 @@ func (s *s3Store) DeleteObjectsByPrefix(ctx context.Context, key string) (uint64
return size, err
}

if len(objs) == 0 {
log.CtxDebugw(ctx, "No object is listed in s3 by prefix", "prefix", key)
return 0, nil
}

if int64(len(objs)) < batchSize {
continueDeleteObject = false
}

for _, obj := range objs {
objKey := obj.Key()
objectIdentifiers = append(objectIdentifiers, &s3.ObjectIdentifier{Key: &objKey})
objectIdentifiers = append(objectIdentifiers, &s3.ObjectIdentifier{Key: aws.String(objKey)})
objectKeySizeMap[obj.Key()] = uint64(obj.Size())
}

deleteParams := s3.Delete{Objects: make([]*s3.ObjectIdentifier, 0)}
deleteParams := s3.Delete{Objects: objectIdentifiers}
param := &s3.DeleteObjectsInput{
Bucket: aws.String(s.bucketName),
Delete: &deleteParams,
Expand All @@ -177,7 +182,7 @@ func (s *s3Store) DeleteObjectsByPrefix(ctx context.Context, key string) (uint64
}
if deleteObjectsOutput != nil {
for _, deletedObj := range deleteObjectsOutput.Deleted {
size += objectKeySizeMap[*(deletedObj.Key)]
size += objectKeySizeMap[aws.StringValue(deletedObj.Key)]
}
}
}
Expand Down

0 comments on commit 86392ec

Please sign in to comment.