
parallel batching of large join requests #994


Merged: 7 commits merged into main on Jun 10, 2025

Conversation

@hzding621 (Collaborator) commented May 16, 2025

Summary

Reduce the batch size for really large batched join requests. There is sequential processing in fetchJoin's kvstoreFuture.map{...}, which becomes the bottleneck when the batch size is very large.
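
For intuition, here is a minimal, self-contained sketch of the chunking idea (not the PR's actual code; Request, Response, and doFetchJoin are simplified stand-ins for the real Fetcher types and internals). Splitting one large request batch into fixed-size chunks lets the per-chunk sequential map work overlap across chunks:

import scala.concurrent.{ExecutionContext, Future}

// Simplified stand-ins for the real Fetcher request/response types.
case class Request(name: String, keys: Map[String, AnyRef])
case class Response(request: Request, values: Map[String, AnyRef])

// Sketch: fan a large batch out as fixed-size chunks and flatten the results,
// preserving the original request order.
def fetchJoinChunkedSketch(requests: Seq[Request], chunkSize: Int)(
    doFetchJoin: Seq[Request] => Future[Seq[Response]])(
    implicit ec: ExecutionContext): Future[Seq[Response]] = {
  val chunks = requests.grouped(chunkSize).toSeq
  val chunkFutures = chunks.map(doFetchJoin)
  Future.sequence(chunkFutures).map(_.flatten)
}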

Why / Goal

latency reduction

Test Plan

  • Added Unit Tests
  • Covered by existing CI
  • Integration tested

Integration testing:

  • Observed p99 latency reduction from ~600 ms to ~200 ms (traffic: batch size ~150, ~17 join parts)

Checklist

  • Documentation update

Reviewers

@pengyu-hou @airbnb/airbnb-chronon-maintainers

@@ -234,6 +234,18 @@ class Fetcher(val kvStore: KVStore,
}

override def fetchJoin(requests: scala.collection.Seq[Request],
@nikhil-zlai (Collaborator) commented May 27, 2025:

The behavior becomes backwards-incompatible; shall we introduce a new method fetchJoinChunked(join, keys, chunkSize) instead?

Also, it is better to be explicit with these.

@hzding621 (Collaborator, Author) replied:

I'm planning to make it a configurable parameter at the Fetcher level.

@hzding621 (Collaborator, Author) commented Jun 5, 2025:

@nikhil-zlai why not make the chunking behavior the new default?

@hzding621 (Collaborator, Author) replied:

Updated to keep the existing behavior, PTAL.

@hzding621 force-pushed the haozhen--par-map-3 branch from fd097f5 to e730f3a on June 5, 2025 23:23
@hzding621 marked this pull request as ready for review on June 5, 2025 23:24
@@ -94,7 +94,8 @@ class Fetcher(val kvStore: KVStore,
callerName: String = null,
flagStore: FlagStore = null,
disableErrorThrows: Boolean = false,
- executionContextOverride: ExecutionContext = null)
+ executionContextOverride: ExecutionContext = null,
+ joinFetchParallelChunkSize: Option[Int] = Some(32))
A collaborator commented:

If we are going to keep this and not split it out like Nikhil mentioned, it might be good to keep the default behavior as before (un-chunked) and allow clients to override if they want. WDYT?

@hzding621 (Collaborator, Author) replied:

Good point. Updated!
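
For context, a minimal sketch (not the PR's actual code, reusing the Request/Response stand-ins from the earlier sketch) of how an Option-valued chunk size could gate between the chunked path and the existing un-chunked default:

import scala.concurrent.{ExecutionContext, Future}

// Sketch only: an Option[Int] chunk size selects chunked vs. legacy behavior.
def fetchJoinMaybeChunked(requests: Seq[Request],
                          joinFetchParallelChunkSize: Option[Int])(
    doFetchJoin: Seq[Request] => Future[Seq[Response]])(
    implicit ec: ExecutionContext): Future[Seq[Response]] =
  joinFetchParallelChunkSize match {
    case Some(chunkSize) =>
      // Chunked path: fan out fixed-size chunks and flatten the results.
      Future.sequence(requests.grouped(chunkSize).toSeq.map(doFetchJoin)).map(_.flatten)
    case None =>
      // Default (None): preserve the existing single-batch behavior.
      doFetchJoin(requests)
  }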

val batches = requests.grouped(joinFetchParallelChunkSize.get).toSeq
val batchFutures: Seq[Future[Seq[Response]]] =
  batches.map(batch => doFetchJoin(batch, joinConf))
Future.sequence(batchFutures).map(_.flatten)
A collaborator commented:

An idea here for the chunking might be to return individual chunked Futures rather than sequencing them. With sequencing you have to wait for all of them to come back; with individual futures you get behavior akin to streaming responses, so the caller can start processing / acting on them as they arrive.

A collaborator replied:

Valid point. This is what GPT said:

val batches = requests.grouped(joinFetchParallelChunkSize.get).toSeq

// Return individual futures instead of a single combined future
val responseFutures: Seq[Future[Seq[Response]]] = batches.map { batch =>
  doFetchJoin(batch, joinConf)
}

// Don't use Future.sequence - instead, process each future individually
// (assumes an implicit ExecutionContext is in scope for Future#foreach)
responseFutures.foreach { future =>
  future.foreach { responses =>
    // Process each batch of responses as soon as it arrives
    processResponses(responses)  // placeholder for the caller's processing function
  }
}

@hzding621 (Collaborator, Author) replied:

@piyush-zlai @pengyu-hou thanks for the suggestion! Updated and added a separate entry point for a chunked API in both the Scala and Java fetchers.

@hzding621 (Collaborator, Author) commented Jun 6, 2025:

So I included two flows:

  • The existing fetchJoin will use the Fetcher-level configuration (defaulting to un-chunked). This is useful for clients who'd like to mostly keep the existing behavior but still be able to enable chunking (only at a global level). I think Airbnb users will most likely use this flow.
  • The new fetchJoinChunked is for users who'd like fine-grained control. This API returns a Seq<Future<Seq<Response>>> for streaming-like behavior (see the consumption sketch after this list).
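
A minimal consumption sketch for the chunked flow, assuming a hypothetical fetchJoinChunked(requests, chunkSize) that returns Seq[Future[Seq[Response]]] as described above (the name, signature, and Request/Response stand-ins are illustrative, not the PR's exact API):

import scala.concurrent.{ExecutionContext, Future}

// Illustrative only: handle each chunk as soon as it completes, instead of
// waiting on Future.sequence over the whole batch.
def consumeChunked(requests: Seq[Request], chunkSize: Int)(
    fetchJoinChunked: (Seq[Request], Int) => Seq[Future[Seq[Response]]])(
    implicit ec: ExecutionContext): Unit = {
  val chunkFutures = fetchJoinChunked(requests, chunkSize)
  chunkFutures.foreach { chunkFuture =>
    chunkFuture.foreach { responses =>
      // Streaming-like behavior: process this chunk's responses immediately.
      responses.foreach(r => println(s"got response for ${r.request.name}"))
    }
  }
}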

@@ -45,7 +45,8 @@ class FetcherBase(kvStore: KVStore,
debug: Boolean = false,
flagStore: FlagStore = null,
disableErrorThrows: Boolean = false,
- executionContextOverride: ExecutionContext = null)
+ executionContextOverride: ExecutionContext = null,
+ joinFetchParallelChunkSize: Option[Int] = Some(32))
A collaborator commented:

Should we use something like Option(System.getProperty("ai.chronon.fetcher.join_fetch_parallel_chunk_size")) to pass the default chunk size?
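
A quick sketch of that suggestion, using the property name above (the parsing itself is illustrative, not part of the PR):

import scala.util.Try

// Read an optional default chunk size from a JVM system property;
// an absent or malformed value falls back to None (un-chunked behavior).
val defaultJoinFetchChunkSize: Option[Int] =
  Option(System.getProperty("ai.chronon.fetcher.join_fetch_parallel_chunk_size"))
    .flatMap(s => Try(s.toInt).toOption)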

@hzding621 merged commit 44cb75c into main on Jun 10, 2025; 7 checks passed.
@hzding621 deleted the haozhen--par-map-3 branch on June 10, 2025 00:18.