Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: v1.7 #1401

Merged
merged 4 commits into from
May 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 79 additions & 0 deletions modular/blocksyncer/modules/bucket/bucket_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ var (
EventDeleteBucket = proto.MessageName(&storagetypes.EventDeleteBucket{})
EventUpdateBucketInfo = proto.MessageName(&storagetypes.EventUpdateBucketInfo{})
EventDiscontinueBucket = proto.MessageName(&storagetypes.EventDiscontinueBucket{})
EventMigrationBucket = proto.MessageName(&storagetypes.EventMigrationBucket{})
EventCancelMigrationBucket = proto.MessageName(&storagetypes.EventCancelMigrationBucket{})
EventRejectMigrateBucket = proto.MessageName(&storagetypes.EventRejectMigrateBucket{})
EventCompleteMigrationBucket = proto.MessageName(&storagetypes.EventCompleteMigrationBucket{})
EventToggleSPAsDelegatedAgent = proto.MessageName(&storagetypes.EventToggleSPAsDelegatedAgent{})
)
Expand All @@ -31,6 +34,9 @@ var BucketEvents = map[string]bool{
EventDeleteBucket: true,
EventUpdateBucketInfo: true,
EventDiscontinueBucket: true,
EventMigrationBucket: true,
EventCancelMigrationBucket: true,
EventRejectMigrateBucket: true,
EventCompleteMigrationBucket: true,
EventToggleSPAsDelegatedAgent: true,
}
Expand Down Expand Up @@ -100,6 +106,27 @@ func (m *Module) ExtractEventStatements(ctx context.Context, block *tmctypes.Res
return nil, errors.New("discontinue bucket event assert error")
}
return m.handleDiscontinueBucket(ctx, block, txHash, discontinueBucket), nil
case EventMigrationBucket:
migrationBucket, ok := typedEvent.(*storagetypes.EventMigrationBucket)
if !ok {
log.Errorw("type assert error", "type", "EventMigrationBucket", "event", typedEvent)
return nil, errors.New("migration bucket event assert error")
}
return m.handleEventMigrationBucket(ctx, block, txHash, migrationBucket), nil
case EventCancelMigrationBucket:
cancelMigrationBucket, ok := typedEvent.(*storagetypes.EventCancelMigrationBucket)
if !ok {
log.Errorw("type assert error", "type", "EventCancelMigrationBucket", "event", typedEvent)
return nil, errors.New("cancel migration bucket event assert error")
}
return m.handleEventCancelMigrationBucket(ctx, block, txHash, cancelMigrationBucket), nil
case EventRejectMigrateBucket:
rejectMigrateBucket, ok := typedEvent.(*storagetypes.EventRejectMigrateBucket)
if !ok {
log.Errorw("type assert error", "type", "EventRejectMigrateBucket", "event", typedEvent)
return nil, errors.New("reject migration bucket event assert error")
}
return m.handleEventRejectMigrateBucket(ctx, block, txHash, rejectMigrateBucket), nil
case EventCompleteMigrationBucket:
completeMigrationBucket, ok := typedEvent.(*storagetypes.EventCompleteMigrationBucket)
if !ok {
Expand Down Expand Up @@ -208,11 +235,63 @@ func (m *Module) handleUpdateBucketInfo(ctx context.Context, block *tmctypes.Res
}
}

func (m *Module) handleEventMigrationBucket(ctx context.Context, block *tmctypes.ResultBlock, txHash common.Hash, migrationBucket *storagetypes.EventMigrationBucket) map[string][]interface{} {
bucket := &models.Bucket{
BucketID: common.BigToHash(migrationBucket.BucketId.BigInt()),
BucketName: migrationBucket.BucketName,
Status: storagetypes.BUCKET_STATUS_MIGRATING.String(),

UpdateAt: block.Block.Height,
UpdateTxHash: txHash,
UpdateTime: block.Block.Time.UTC().Unix(),
}

k, v := m.db.UpdateBucketToSQL(ctx, bucket)
return map[string][]interface{}{
k: v,
}
}

func (m *Module) handleEventCancelMigrationBucket(ctx context.Context, block *tmctypes.ResultBlock, txHash common.Hash, cancelMigrationBucket *storagetypes.EventCancelMigrationBucket) map[string][]interface{} {
bucket := &models.Bucket{
BucketID: common.BigToHash(cancelMigrationBucket.BucketId.BigInt()),
BucketName: cancelMigrationBucket.BucketName,
Status: storagetypes.BUCKET_STATUS_CREATED.String(),

UpdateAt: block.Block.Height,
UpdateTxHash: txHash,
UpdateTime: block.Block.Time.UTC().Unix(),
}

k, v := m.db.UpdateBucketToSQL(ctx, bucket)
return map[string][]interface{}{
k: v,
}
}

func (m *Module) handleEventRejectMigrateBucket(ctx context.Context, block *tmctypes.ResultBlock, txHash common.Hash, rejectMigrateBucket *storagetypes.EventRejectMigrateBucket) map[string][]interface{} {
bucket := &models.Bucket{
BucketID: common.BigToHash(rejectMigrateBucket.BucketId.BigInt()),
BucketName: rejectMigrateBucket.BucketName,
Status: storagetypes.BUCKET_STATUS_CREATED.String(),

UpdateAt: block.Block.Height,
UpdateTxHash: txHash,
UpdateTime: block.Block.Time.UTC().Unix(),
}

k, v := m.db.UpdateBucketToSQL(ctx, bucket)
return map[string][]interface{}{
k: v,
}
}

func (m *Module) handleCompleteMigrationBucket(ctx context.Context, block *tmctypes.ResultBlock, txHash common.Hash, completeMigrationBucket *storagetypes.EventCompleteMigrationBucket) map[string][]interface{} {
bucket := &models.Bucket{
BucketID: common.BigToHash(completeMigrationBucket.BucketId.BigInt()),
BucketName: completeMigrationBucket.BucketName,
GlobalVirtualGroupFamilyId: completeMigrationBucket.GlobalVirtualGroupFamilyId,
Status: storagetypes.BUCKET_STATUS_CREATED.String(),

UpdateAt: block.Block.Height,
UpdateTxHash: txHash,
Expand Down
43 changes: 42 additions & 1 deletion modular/manager/bucket_migrate_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,13 @@ func (plan *BucketMigrateExecutePlan) sendCompleteMigrateBucketTx(migrateExecute
if err != nil {
return err
}
if bucket == nil {
log.Debugw("send complete migrate bucket has been deleted", "bucket_id", plan.bucketID)
if err = UpdateBucketMigrationProgress(plan.manager.baseApp, plan.bucketID, storetypes.BucketMigrationState_BUCKET_MIGRATION_STATE_MIGRATION_FINISHED); err != nil {
return err
}
return nil
}
var gvgMappings []*storagetypes.GVGMapping
for _, migrateGVGUnit := range plan.gvgUnitMap {
aggBlsSig, getBlsError := plan.getBlsAggregateSigForBucketMigration(context.Background(), migrateGVGUnit)
Expand Down Expand Up @@ -239,6 +246,13 @@ func (plan *BucketMigrateExecutePlan) rejectBucketMigration() error {
if err != nil {
return err
}
if bucket == nil {
log.Debugw("reject bucket migration has been deleted", "bucket_id", plan.bucketID)
if err = UpdateBucketMigrationProgress(plan.manager.baseApp, plan.bucketID, storetypes.BucketMigrationState_BUCKET_MIGRATION_STATE_MIGRATION_FINISHED); err != nil {
return err
}
return nil
}
rejectMigrateBucket := &storagetypes.MsgRejectMigrateBucket{Operator: plan.manager.baseApp.OperatorAddress(),
BucketName: bucket.BucketInfo.GetBucketName()}
txHash, txErr := plan.manager.baseApp.GfSpClient().RejectMigrateBucket(ctx, rejectMigrateBucket)
Expand Down Expand Up @@ -591,6 +605,23 @@ func (s *BucketMigrateScheduler) doneMigrateBucket(bucketID uint64) error {
return err
}

func (s *BucketMigrateScheduler) deleteMigrateBucket(bucketID uint64) error {
executePlan, err := s.getExecutePlanByBucketID(bucketID)
// 1) Received the CompleteEvents event for the first time.
// 2) Subsequently received the CompleteEvents event.
if err != nil {
log.Errorw("bucket migrate schedule received EventCompleteMigrationBucket, the event may already finished", "bucket_id", bucketID)
return err
}

s.deleteExecutePlanByBucketID(bucketID)
executePlan.stopSPSchedule()
err = s.manager.baseApp.GfSpDB().DeleteMigrateGVGUnitsByBucketID(bucketID)
log.Infow("succeed to delete migrate bucket", "bucket_id", bucketID, "error", err)

return err
}

func (s *BucketMigrateScheduler) cancelMigrateBucket(bucketID uint64, reject bool) error {
var (
executePlan *BucketMigrateExecutePlan
Expand Down Expand Up @@ -696,7 +727,17 @@ func (s *BucketMigrateScheduler) confirmCompleteTxEvents(ctx context.Context, ev
log.Errorw("failed to get bucket by bucket id", "bucket_id", bucketID, "error", err)
return
}

if bucket == nil {
if err = s.deleteMigrateBucket(bucketID); err != nil {
log.Errorw("failed to done migrate bucket", "EventMigrationBucket", event, "error", err)
return
}
if err = UpdateBucketMigrationProgress(s.manager.baseApp, bucketID, storetypes.BucketMigrationState_BUCKET_MIGRATION_STATE_MIGRATION_FINISHED); err != nil {
return
}
log.CtxInfow(ctx, "succeed to remove deleted bucket migrate event", "EventMigrationBucket", event)
return
}
if bucket.BucketInfo.GetBucketStatus() == storagetypes.BUCKET_STATUS_CREATED {
if err = UpdateBucketMigrationProgress(s.manager.baseApp, bucketID, storetypes.BucketMigrationState_BUCKET_MIGRATION_STATE_WAIT_COMPLETE_TX_EVENT_DONE); err != nil {
return
Expand Down
12 changes: 11 additions & 1 deletion modular/metadata/metadata_bucket_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,15 +520,25 @@ func (r *MetadataModular) GfSpGetLatestBucketReadQuota(
", bucket_id: " + util.Uint64ToString(req.GetBucketId()) + ", error: " + err.Error())}, nil
}
}

// if the traffic table has been created, return the db info from meta service
var chargedQuotaSize uint64
bucketInfo, err := r.baseApp.Consensus().QueryBucketInfoById(ctx, req.BucketId)
if err != nil {
log.Errorw("failed to get bucketInfo on chain",
"bucket_id", req.GetBucketId(), "error", err)
chargedQuotaSize = bucketTraffic.ChargedQuotaSize
} else {
chargedQuotaSize = bucketInfo.ChargedReadQuota
}
quota := &gfsptask.GfSpBucketQuotaInfo{
BucketName: bucketTraffic.BucketName,
BucketId: bucketTraffic.BucketID,
Month: bucketTraffic.YearMonth,
ReadConsumedSize: bucketTraffic.ReadConsumedSize,
FreeQuotaConsumedSize: bucketTraffic.FreeQuotaConsumedSize,
FreeQuotaSize: bucketTraffic.FreeQuotaSize,
ChargedQuotaSize: bucketTraffic.ChargedQuotaSize,
ChargedQuotaSize: chargedQuotaSize,
MonthlyFreeQuotaSize: bucketTraffic.MonthlyFreeQuotaSize,
MonthlyFreeQuotaConsumedSize: bucketTraffic.MonthlyFreeQuotaConsumedSize,
}
Expand Down
18 changes: 17 additions & 1 deletion modular/metadata/metadata_bucket_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"testing"

"cosmossdk.io/math"
"github.com/forbole/juno/v4/common"
"github.com/shopspring/decimal"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -656,6 +657,7 @@ func TestMetadataModular_GfSpGetLatestBucketReadQuota_Success(t *testing.T) {
a.baseApp.SetGfBsDB(m)
spDBMocker := spdb.NewMockSPDB(ctrl)

consensusChargedQuotaSize := uint64(100)
bucketTraffic := &spdb.BucketTraffic{
BucketID: 1,
YearMonth: "2024-01",
Expand All @@ -670,6 +672,20 @@ func TestMetadataModular_GfSpGetLatestBucketReadQuota_Success(t *testing.T) {

consensusMock := consensus.NewMockConsensus(ctrl)
consensusMock.EXPECT().QuerySPFreeQuota(gomock.Any(), gomock.Any()).Return(uint64(10000), nil).Times(0)
consensusMock.EXPECT().QueryBucketInfoById(gomock.Any(), gomock.Any()).Return(&storagetypes.BucketInfo{
Owner: "0x11E0A11A7A01E2E757447B52FBD7152004AC699D",
BucketName: "",
Visibility: 0,
Id: math.NewUint(1),
SourceType: 0,
CreateAt: 0,
PaymentAddress: "",
GlobalVirtualGroupFamilyId: 0,
ChargedReadQuota: consensusChargedQuotaSize,
BucketStatus: 0,
Tags: nil,
SpAsDelegatedAgentDisabled: false,
}, nil).AnyTimes()
a.baseApp.SetConsensus(consensusMock)

resp, err := a.GfSpGetLatestBucketReadQuota(context.Background(), &types.GfSpGetLatestBucketReadQuotaRequest{
Expand All @@ -684,7 +700,7 @@ func TestMetadataModular_GfSpGetLatestBucketReadQuota_Success(t *testing.T) {
ReadConsumedSize: bucketTraffic.ReadConsumedSize,
FreeQuotaConsumedSize: bucketTraffic.FreeQuotaConsumedSize,
FreeQuotaSize: bucketTraffic.FreeQuotaSize,
ChargedQuotaSize: bucketTraffic.ChargedQuotaSize,
ChargedQuotaSize: consensusChargedQuotaSize,
},
}, resp)
}
Expand Down
4 changes: 2 additions & 2 deletions modular/metadata/metadata_sp_exit_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,14 +215,14 @@ func (r *MetadataModular) GfSpListMigrateBucketEvents(ctx context.Context, req *
GlobalVirtualGroupFamilyId: complete.GlobalVirtualGroupFamilyId,
}
}
if cancel != nil && cancel.CreateAt >= event.CreateAt && complete == nil {
if cancel != nil && cancel.CreateAt >= event.CreateAt && (complete == nil || cancel.CreateAt > complete.CreateAt) {
spCancelEvent = &storage_types.EventCancelMigrationBucket{
Operator: cancel.Operator.String(),
BucketName: cancel.BucketName,
BucketId: math.NewUintFromBigInt(cancel.BucketID.Big()),
}
}
if reject != nil && reject.CreateAt >= event.CreateAt && complete == nil {
if reject != nil && reject.CreateAt >= event.CreateAt && (complete == nil || reject.CreateAt > complete.CreateAt) {
spRejectEvent = &storage_types.EventRejectMigrateBucket{
Operator: reject.Operator.String(),
BucketName: reject.BucketName,
Expand Down
Loading