From 839f54844ccad546cbf9af6ab57059f4f70a34c1 Mon Sep 17 00:00:00 2001 From: nischit Date: Mon, 27 Oct 2025 15:20:28 +0545 Subject: [PATCH 1/6] minor change --- internal/committer/reorg.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/committer/reorg.go b/internal/committer/reorg.go index 2a03e01..9bb9d1b 100644 --- a/internal/committer/reorg.go +++ b/internal/committer/reorg.go @@ -57,7 +57,7 @@ func getReorgRange() (int64, int64, error) { endBlock = min(endBlock-5, startBlock+100) // lag by some blocks for safety if startBlock >= endBlock { - return 0, 0, fmt.Errorf("start block is greater than end block") + return 0, 0, fmt.Errorf("start block is greater than end block (%d >= %d)", startBlock, endBlock) } return startBlock, endBlock, nil From 061cc55d541b0301b3d1fcfbe8060ea1739b9848 Mon Sep 17 00:00:00 2001 From: nischit Date: Mon, 27 Oct 2025 17:53:30 +0545 Subject: [PATCH 2/6] right size s3 committer after it is caught up --- configs/config.go | 1 + internal/backfill/getbackfillboundaries.go | 2 +- internal/committer/poollatest.go | 8 +++++++ .../insightServiceRequests.go} | 23 +++++++++++++------ 4 files changed, 26 insertions(+), 8 deletions(-) rename internal/{backfill/disableIndexerMaybeStartCommitter.go => libs/insightServiceRequests.go} (70%) diff --git a/configs/config.go b/configs/config.go index c94cc87..8147e80 100644 --- a/configs/config.go +++ b/configs/config.go @@ -60,6 +60,7 @@ type Config struct { CommitterMaxMemoryMB int `env:"COMMITTER_MAX_MEMORY_MB" envDefault:"512"` CommitterCompressionThresholdMB int `env:"COMMITTER_COMPRESSION_THRESHOLD_MB" envDefault:"50"` CommitterKafkaBatchSize int `env:"COMMITTER_KAFKA_BATCH_SIZE" envDefault:"500"` + CommitterIsLive bool `env:"COMMITTER_IS_LIVE" envDefault:"false"` StagingS3Bucket string `env:"STAGING_S3_BUCKET" envDefault:"thirdweb-insight-production"` StagingS3Region string `env:"STAGING_S3_REGION" envDefault:"us-west-2"` StagingS3AccessKeyID string `env:"STAGING_S3_ACCESS_KEY_ID"` diff --git a/internal/backfill/getbackfillboundaries.go b/internal/backfill/getbackfillboundaries.go index 6fd1508..3efbd6e 100644 --- a/internal/backfill/getbackfillboundaries.go +++ b/internal/backfill/getbackfillboundaries.go @@ -22,7 +22,7 @@ func GetBackfillBoundaries() (uint64, uint64) { if startBlock > endBlock { // since indexing is done, we call insight service to disable the indexer - DisableIndexerMaybeStartCommitter() + libs.DisableIndexerMaybeStartCommitter() // most likely this will not be called as this service will be paused. but a panic just incase log.Panic(). Uint64("start_block", startBlock). diff --git a/internal/committer/poollatest.go b/internal/committer/poollatest.go index 4fb39bf..ac09d76 100644 --- a/internal/committer/poollatest.go +++ b/internal/committer/poollatest.go @@ -74,5 +74,13 @@ func pollLatest() error { // Update nextCommitBlockNumber for next iteration nextBlockNumber = expectedBlockNumber metrics.CommitterNextBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(nextBlockNumber)) + + if !config.Cfg.CommitterIsLive && latestBlock.Int64()-int64(nextBlockNumber) < 20 { + log.Debug(). + Uint64("latest_block", latestBlock.Uint64()). + Uint64("next_commit_block", nextBlockNumber). + Msg("Latest block is close to next commit block. Resizing s3 committer") + libs.RightsizeS3Committer() + } } } diff --git a/internal/backfill/disableIndexerMaybeStartCommitter.go b/internal/libs/insightServiceRequests.go similarity index 70% rename from internal/backfill/disableIndexerMaybeStartCommitter.go rename to internal/libs/insightServiceRequests.go index ca02f0a..73f9406 100644 --- a/internal/backfill/disableIndexerMaybeStartCommitter.go +++ b/internal/libs/insightServiceRequests.go @@ -1,4 +1,4 @@ -package backfill +package libs import ( "bytes" @@ -9,7 +9,6 @@ import ( "github.com/rs/zerolog/log" config "github.com/thirdweb-dev/indexer/configs" - "github.com/thirdweb-dev/indexer/internal/libs" ) type DeployS3CommitterRequest struct { @@ -17,6 +16,15 @@ type DeployS3CommitterRequest struct { } func DisableIndexerMaybeStartCommitter() { + makeS3CommitterRequest("deploy-s3-committer") +} + +func RightsizeS3Committer() { + makeS3CommitterRequest("rightsize-s3-committer") +} + +// makeS3CommitterRequest is a common function to make HTTP requests to the insight service +func makeS3CommitterRequest(endpoint string) { serviceURL := config.Cfg.InsightServiceUrl apiKey := config.Cfg.InsightServiceApiKey zeetDeploymentId := config.Cfg.ZeetDeploymentId @@ -33,7 +41,7 @@ func DisableIndexerMaybeStartCommitter() { } // Create HTTP request - url := fmt.Sprintf("%s/service/chains/%s/deploy-s3-committer", serviceURL, libs.ChainIdStr) + url := fmt.Sprintf("%s/service/chains/%s/%s", serviceURL, ChainIdStr, endpoint) req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) if err != nil { log.Error().Err(err).Msg("Failed to create HTTP request") @@ -52,12 +60,13 @@ func DisableIndexerMaybeStartCommitter() { // Send request log.Info(). Str("url", url). + Str("endpoint", endpoint). Str("zeetDeploymentId", zeetDeploymentId). - Msg("Sending deploy-s3-committer request to disable indexer") + Msgf("Sending %s request", endpoint) resp, err := client.Do(req) if err != nil { - log.Error().Err(err).Msg("Failed to send HTTP request") + log.Error().Err(err).Msgf("Failed to send %s request", endpoint) return } defer resp.Body.Close() @@ -66,10 +75,10 @@ func DisableIndexerMaybeStartCommitter() { if resp.StatusCode >= 200 && resp.StatusCode < 300 { log.Info(). Int("statusCode", resp.StatusCode). - Msg("Successfully sent deploy-s3-committer request. Indexer disabled") + Msgf("Successfully sent %s request", endpoint) } else { log.Error(). Int("statusCode", resp.StatusCode). - Msg("Deploy-s3-committer request failed. Could not disable indexer") + Msgf("%s request failed", endpoint) } } From 538c7f0eaf752c8dd9939cf37b6636b70fcd7ab9 Mon Sep 17 00:00:00 2001 From: nischit Date: Mon, 27 Oct 2025 18:00:45 +0545 Subject: [PATCH 3/6] minor change --- internal/committer/poollatest.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/committer/poollatest.go b/internal/committer/poollatest.go index ac09d76..ae28c0e 100644 --- a/internal/committer/poollatest.go +++ b/internal/committer/poollatest.go @@ -17,6 +17,7 @@ func pollLatest() error { // Initialize metrics labels chainIdStr := libs.ChainIdStr indexerName := config.Cfg.ZeetProjectName + isRightsizing := false for { latestBlock, err := libs.RpcClient.GetLatestBlockNumber(context.Background()) @@ -75,7 +76,8 @@ func pollLatest() error { nextBlockNumber = expectedBlockNumber metrics.CommitterNextBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(nextBlockNumber)) - if !config.Cfg.CommitterIsLive && latestBlock.Int64()-int64(nextBlockNumber) < 20 { + if !config.Cfg.CommitterIsLive && latestBlock.Int64()-int64(nextBlockNumber) < 20 && !isRightsizing { + isRightsizing = true log.Debug(). Uint64("latest_block", latestBlock.Uint64()). Uint64("next_commit_block", nextBlockNumber). From 3cd7742c09d313af4ac45d1a5a18e715ff2d7c7a Mon Sep 17 00:00:00 2001 From: nischit Date: Mon, 27 Oct 2025 18:01:54 +0545 Subject: [PATCH 4/6] minor change --- internal/committer/poollatest.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/committer/poollatest.go b/internal/committer/poollatest.go index ae28c0e..fc2af26 100644 --- a/internal/committer/poollatest.go +++ b/internal/committer/poollatest.go @@ -17,7 +17,7 @@ func pollLatest() error { // Initialize metrics labels chainIdStr := libs.ChainIdStr indexerName := config.Cfg.ZeetProjectName - isRightsizing := false + hasRightsized := false for { latestBlock, err := libs.RpcClient.GetLatestBlockNumber(context.Background()) @@ -76,13 +76,13 @@ func pollLatest() error { nextBlockNumber = expectedBlockNumber metrics.CommitterNextBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(nextBlockNumber)) - if !config.Cfg.CommitterIsLive && latestBlock.Int64()-int64(nextBlockNumber) < 20 && !isRightsizing { - isRightsizing = true + if !config.Cfg.CommitterIsLive && latestBlock.Int64()-int64(nextBlockNumber) < 20 && !hasRightsized { log.Debug(). Uint64("latest_block", latestBlock.Uint64()). Uint64("next_commit_block", nextBlockNumber). Msg("Latest block is close to next commit block. Resizing s3 committer") libs.RightsizeS3Committer() + hasRightsized = true } } } From b733663f74e5a5ce1ed8116302f0445c806128ec Mon Sep 17 00:00:00 2001 From: nischit Date: Mon, 27 Oct 2025 18:04:10 +0545 Subject: [PATCH 5/6] no need to check for reorg when indexer is not live --- internal/committer/reorg.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/committer/reorg.go b/internal/committer/reorg.go index 9bb9d1b..26501ab 100644 --- a/internal/committer/reorg.go +++ b/internal/committer/reorg.go @@ -16,6 +16,11 @@ func InitReorg() { } func RunReorgValidator() { + // indexer is not live, so we don't need to check for reorgs + if !config.Cfg.CommitterIsLive { + return + } + lastBlockCheck := int64(0) for { startBlock, endBlock, err := getReorgRange() From 9c465bfd76b6b9d3c804abbee3b8217525b4b8d3 Mon Sep 17 00:00:00 2001 From: nischit Date: Mon, 27 Oct 2025 18:17:18 +0545 Subject: [PATCH 6/6] is indexer live metric --- internal/committer/poollatest.go | 4 ++++ internal/metrics/metrics.go | 5 +++++ 2 files changed, 9 insertions(+) diff --git a/internal/committer/poollatest.go b/internal/committer/poollatest.go index fc2af26..9380044 100644 --- a/internal/committer/poollatest.go +++ b/internal/committer/poollatest.go @@ -76,6 +76,10 @@ func pollLatest() error { nextBlockNumber = expectedBlockNumber metrics.CommitterNextBlockNumber.WithLabelValues(indexerName, chainIdStr).Set(float64(nextBlockNumber)) + if config.Cfg.CommitterIsLive { + metrics.CommitterIsLive.WithLabelValues(indexerName, chainIdStr).Set(1) + } + if !config.Cfg.CommitterIsLive && latestBlock.Int64()-int64(nextBlockNumber) < 20 && !hasRightsized { log.Debug(). Uint64("latest_block", latestBlock.Uint64()). diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 6791c1d..8a7f417 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -94,4 +94,9 @@ var ( Name: "committer_rpc_retries_total", Help: "The total number of RPC retries", }, []string{"project_name", "chain_id"}) + + CommitterIsLive = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "committer_is_live", + Help: "Whether the committer is live", + }, []string{"project_name", "chain_id"}) )