diff --git a/database/migrate/migrations/00027_ blob_upload.sql b/database/migrate/migrations/00027_ blob_upload.sql index a771ac0e1e..bc0dee7518 100644 --- a/database/migrate/migrations/00027_ blob_upload.sql +++ b/database/migrate/migrations/00027_ blob_upload.sql @@ -3,6 +3,7 @@ CREATE TABLE blob_upload ( batch_index BIGINT NOT NULL, + batch_hash VARCHAR NOT NULL, platform SMALLINT NOT NULL, status SMALLINT NOT NULL, @@ -13,20 +14,15 @@ CREATE TABLE blob_upload ( deleted_at TIMESTAMP(0) DEFAULT NULL ); -CREATE UNIQUE INDEX IF NOT EXISTS batch_index_platform_uindex -ON blob_upload(batch_index, platform) WHERE deleted_at IS NULL; +CREATE UNIQUE INDEX IF NOT EXISTS batch_index_batch_hash_platform_uindex +ON blob_upload(batch_index, batch_hash, platform) WHERE deleted_at IS NULL; COMMENT ON COLUMN blob_upload.status IS 'undefined, pending, uploaded, failed'; -CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index ON blob_upload(batch_index) WHERE deleted_at IS NULL; - -CREATE INDEX IF NOT EXISTS idx_blob_upload_platform ON blob_upload(platform) WHERE deleted_at IS NULL; - -CREATE INDEX IF NOT EXISTS idx_blob_upload_status ON blob_upload(status) WHERE deleted_at IS NULL; - CREATE INDEX IF NOT EXISTS idx_blob_upload_status_platform ON blob_upload(status, platform) WHERE deleted_at IS NULL; -CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index_status_platform ON blob_upload(batch_index, status, platform) WHERE deleted_at IS NULL; +CREATE INDEX IF NOT EXISTS idx_blob_upload_batch_index_batch_hash_status_platform +ON blob_upload(batch_index, batch_hash, status, platform) WHERE deleted_at IS NULL; -- +goose StatementEnd diff --git a/rollup/internal/controller/blob_uploader/blob_uploader.go b/rollup/internal/controller/blob_uploader/blob_uploader.go index 3b6c6b7591..0ee70ec4ed 100644 --- a/rollup/internal/controller/blob_uploader/blob_uploader.go +++ b/rollup/internal/controller/blob_uploader/blob_uploader.go @@ -2,6 +2,7 @@ package blob_uploader import ( "context" + 
"errors" "fmt" "github.com/prometheus/client_golang/prometheus" @@ -67,7 +68,7 @@ func (b *BlobUploader) UploadBlobToS3() { } // get un-uploaded batches from database in ascending order by their index. - dbBatch, err := b.batchOrm.GetFirstUnuploadedBatchByPlatform(b.ctx, b.cfg.StartBatch, types.BlobStoragePlatformS3) + dbBatch, err := b.GetFirstUnuploadedBatchByPlatform(b.ctx, b.cfg.StartBatch, types.BlobStoragePlatformS3) if err != nil { log.Error("Failed to fetch unuploaded batch", "err", err) return @@ -85,7 +86,7 @@ func (b *BlobUploader) UploadBlobToS3() { if err != nil { log.Error("failed to construct constructBlobCodec payload ", "codecVersion", codecVersion, "batch index", dbBatch.Index, "err", err) b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc() - if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil { + if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil { log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", updateErr) } return @@ -97,7 +98,7 @@ func (b *BlobUploader) UploadBlobToS3() { log.Error("failed to calculate versioned blob hash", "batch index", dbBatch.Index, "err", err) b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc() // update status to failed - if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil { + if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil { log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", updateErr) } return @@ -110,14 +111,14 @@ func (b *BlobUploader) UploadBlobToS3() { log.Error("failed to 
upload blob data to AWS S3", "batch index", dbBatch.Index, "versioned blob hash", key, "err", err) b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc() // update status to failed - if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil { + if updateErr := b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusFailed); updateErr != nil { log.Error("failed to update blob upload status to failed", "batch index", dbBatch.Index, "err", updateErr) } return } // update status to uploaded - if err = b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, types.BlobStoragePlatformS3, types.BlobUploadStatusUploaded); err != nil { + if err = b.blobUploadOrm.InsertOrUpdateBlobUpload(b.ctx, dbBatch.Index, dbBatch.Hash, types.BlobStoragePlatformS3, types.BlobUploadStatusUploaded); err != nil { log.Error("failed to update blob upload status to uploaded", "batch index", dbBatch.Index, "err", err) b.metrics.rollupBlobUploaderUploadToS3FailedTotal.Inc() return @@ -195,3 +196,56 @@ func (b *BlobUploader) constructBlobCodec(dbBatch *orm.Batch) (*kzg4844.Blob, er return daBatch.Blob(), nil } + +// GetFirstUnuploadedBatchByPlatform returns the first batch that has not yet been uploaded to the given blob storage platform, stepping back to an earlier index while the parent batch (matched by index and hash) has no uploaded record. +// It returns (nil, nil) if that batch does not exist yet or has no commit tx hash (not committed). 
+func (b *BlobUploader) GetFirstUnuploadedBatchByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (*orm.Batch, error) { + batchIndex, err := b.blobUploadOrm.GetNextBatchIndexToUploadByPlatform(ctx, startBatch, platform) + if err != nil { + return nil, err + } + + var batch *orm.Batch + for { + var err error + batch, err = b.batchOrm.GetBatchByIndex(ctx, batchIndex) + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + log.Debug("got batch not proposed for blob uploading", "batch_index", batchIndex, "platform", platform.String()) + return nil, nil + } + return nil, err + } + + // check whether the parent batch (index - 1, matching ParentBatchHash) has an uploaded record + // if not, a batch revert has happened, so fall back and upload the previous batch first + // skip the check when the parent is the genesis batch or batchIndex equals startBatch + if batchIndex <= 1 || batchIndex == startBatch { + break + } + fields := map[string]interface{}{ + "batch_index = ?": batchIndex - 1, + "batch_hash = ?": batch.ParentBatchHash, + "platform = ?": platform, + "status = ?": types.BlobUploadStatusUploaded, + } + blobUpload, err := b.blobUploadOrm.GetBlobUploads(ctx, fields, nil, 1) + if err != nil { + return nil, err + } + + if len(blobUpload) == 0 { + batchIndex-- + continue + } + + break + } + + if len(batch.CommitTxHash) == 0 { + log.Debug("got batch not committed for blob uploading", "batch_index", batchIndex, "platform", platform.String()) + return nil, nil + } + + return batch, nil +} diff --git a/rollup/internal/orm/batch.go b/rollup/internal/orm/batch.go index 5e036ac6f7..584792fe18 100644 --- a/rollup/internal/orm/batch.go +++ b/rollup/internal/orm/batch.go @@ -263,44 +263,6 @@ func (o *Batch) GetBatchByIndex(ctx context.Context, index uint64) (*Batch, erro return &batch, nil } -// GetFirstUnuploadedBatchByPlatform retrieves the first batch that either hasn't been uploaded to corresponding blob storage service -// The batch must have a commit_tx_hash (committed). 
-func (o *Batch) GetFirstUnuploadedBatchByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (*Batch, error) { - db := o.db.WithContext(ctx) - db = db.Model(&BlobUpload{}) - db = db.Where("platform = ? AND status = ?", platform, types.BlobUploadStatusUploaded) - db = db.Order("batch_index DESC") - db = db.Limit(1) - - var blobUpload BlobUpload - var batchIndex uint64 - if err := db.First(&blobUpload).Error; err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - batchIndex = startBatch - } else { - return nil, fmt.Errorf("Batch.GetFirstUnuploadedBatchByPlatform error: %w", err) - } - } else { - batchIndex = blobUpload.BatchIndex + 1 - } - - batch, err := o.GetBatchByIndex(ctx, batchIndex) - if err != nil { - if errors.Is(err, gorm.ErrRecordNotFound) { - log.Debug("got batch not proposed for blob uploading", "batch_index", batchIndex, "platform", platform.String()) - return nil, nil - } - return nil, fmt.Errorf("Batch.GetFirstUnuploadedBatchByPlatform error: %w", err) - } - - if len(batch.CommitTxHash) == 0 { - log.Debug("got batch not committed for blob uploading", "batch_index", batchIndex, "platform", platform.String()) - return nil, nil - } - - return batch, nil -} - // InsertBatch inserts a new batch into the database. 
func (o *Batch) InsertBatch(ctx context.Context, batch *encoding.Batch, codecVersion encoding.CodecVersion, metrics rutils.BatchMetrics, dbTX ...*gorm.DB) (*Batch, error) { if batch == nil { diff --git a/rollup/internal/orm/blob_upload.go b/rollup/internal/orm/blob_upload.go index 79daa3838a..391be5f31e 100644 --- a/rollup/internal/orm/blob_upload.go +++ b/rollup/internal/orm/blob_upload.go @@ -2,11 +2,11 @@ package orm import ( "context" + "errors" "fmt" "time" "gorm.io/gorm" - "gorm.io/gorm/clause" "scroll-tech/common/types" ) @@ -16,8 +16,9 @@ type BlobUpload struct { db *gorm.DB `gorm:"-"` // blob upload - BatchIndex uint64 `json:"batch_index" gorm:"column:batch_index;primaryKey"` - Platform int16 `json:"platform" gorm:"column:platform;primaryKey"` + BatchIndex uint64 `json:"batch_index" gorm:"column:batch_index"` + BatchHash string `json:"batch_hash" gorm:"column:batch_hash"` + Platform int16 `json:"platform" gorm:"column:platform"` Status int16 `json:"status" gorm:"column:status"` // metadata @@ -36,23 +37,87 @@ func (*BlobUpload) TableName() string { return "blob_upload" } +// GetNextBatchIndexToUploadByPlatform retrieves the next batch index that hasn't been uploaded to corresponding blob storage service +func (o *BlobUpload) GetNextBatchIndexToUploadByPlatform(ctx context.Context, startBatch uint64, platform types.BlobStoragePlatform) (uint64, error) { + db := o.db.WithContext(ctx) + db = db.Model(&BlobUpload{}) + db = db.Where("platform = ? 
AND status = ?", platform, types.BlobUploadStatusUploaded) + db = db.Order("batch_index DESC") + db = db.Limit(1) + + var blobUpload BlobUpload + var batchIndex uint64 + if err := db.First(&blobUpload).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + batchIndex = startBatch + } else { + return 0, fmt.Errorf("BlobUpload.GetNextBatchIndexToUploadByPlatform error: %w", err) + } + } else { + batchIndex = blobUpload.BatchIndex + 1 + } + + return batchIndex, nil +} + +// GetBlobUploads retrieves the blob uploads matching the given conditions, ordered by orderByList and then by batch_index ascending; a limit <= 0 means no limit. +func (o *BlobUpload) GetBlobUploads(ctx context.Context, fields map[string]interface{}, orderByList []string, limit int) ([]*BlobUpload, error) { + db := o.db.WithContext(ctx) + db = db.Model(&BlobUpload{}) + + for key, value := range fields { + db = db.Where(key, value) + } + + for _, orderBy := range orderByList { + db = db.Order(orderBy) + } + + if limit > 0 { + db = db.Limit(limit) + } + + db = db.Order("batch_index ASC") + + var blobUploads []*BlobUpload + if err := db.Find(&blobUploads).Error; err != nil { + return nil, fmt.Errorf("BlobUpload.GetBlobUploads error: %w", err) + } + + return blobUploads, nil +} + // InsertOrUpdateBlobUpload inserts a new blob upload record or updates the existing one. -func (o *BlobUpload) InsertOrUpdateBlobUpload(ctx context.Context, batchIndex uint64, platform types.BlobStoragePlatform, status types.BlobUploadStatus, dbTX ...*gorm.DB) error { +func (o *BlobUpload) InsertOrUpdateBlobUpload(ctx context.Context, batchIndex uint64, batchHash string, platform types.BlobStoragePlatform, status types.BlobUploadStatus, dbTX ...*gorm.DB) error { db := o.db if len(dbTX) > 0 && dbTX[0] != nil { db = dbTX[0] } db = db.WithContext(ctx) - blobUpload := &BlobUpload{ - BatchIndex: batchIndex, - Platform: int16(platform), - Status: int16(status), + + var existing BlobUpload + err := db.Where("batch_index = ? AND batch_hash = ? AND platform = ? 
AND deleted_at IS NULL", + batchIndex, batchHash, int16(platform), + ).First(&existing).Error + + if errors.Is(err, gorm.ErrRecordNotFound) { + newRecord := BlobUpload{ + BatchIndex: batchIndex, + BatchHash: batchHash, + Platform: int16(platform), + Status: int16(status), + } + if err := db.Create(&newRecord).Error; err != nil { + return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload insert error: %w, batch index: %v, batch_hash: %v, platform: %v", err, batchIndex, batchHash, platform) + } + return nil + } else if err != nil { + return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload query error: %w, batch index: %v, batch_hash: %v, platform: %v", err, batchIndex, batchHash, platform) } - if err := db.Clauses(clause.OnConflict{ - Columns: []clause.Column{{Name: "batch_index"}, {Name: "platform"}}, - DoUpdates: clause.AssignmentColumns([]string{"status"}), - }).Create(blobUpload).Error; err != nil { - return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload error: %w, batch index: %v, platform: %v", err, batchIndex, platform) + + if err := db.Model(&existing).Where("batch_index = ? AND batch_hash = ? AND platform = ? AND deleted_at IS NULL", batchIndex, batchHash, int16(platform)).Update("status", int16(status)).Error; err != nil { // explicit WHERE: the primaryKey tags were dropped from BlobUpload, so do not rely on GORM deriving the row filter from the model + return fmt.Errorf("BlobUpload.InsertOrUpdateBlobUpload update error: %w, batch index: %v, batch_hash: %v, platform: %v", err, batchIndex, batchHash, platform) } + return nil }