parallel batching of large join requests #994
Conversation
@@ -234,6 +234,18 @@ class Fetcher(val kvStore: KVStore,
   }

+  override def fetchJoin(requests: scala.collection.Seq[Request],
The behavior becomes backwards-incompatible. Shall we introduce a new method fetchJoinChunked(join, keys, chunkSize) instead?
It is also better to be explicit with these.
I'm planning to make it a configurable parameter at the Fetcher level.
@nikhil-zlai why not make the chunking behavior the new default?
Updated to keep the existing behavior, PTAL.
Force-pushed from fd097f5 to e730f3a
@@ -94,7 +94,8 @@ class Fetcher(val kvStore: KVStore,
                    callerName: String = null,
                    flagStore: FlagStore = null,
                    disableErrorThrows: Boolean = false,
-                   executionContextOverride: ExecutionContext = null)
+                   executionContextOverride: ExecutionContext = null,
+                   joinFetchParallelChunkSize: Option[Int] = Some(32))
If we are going to keep this and not split it out like Nikhil mentioned, it might be good to keep the default behavior as before (unchunked) and allow clients to override it if they want. WDYT?
Good point, updated!
val batches = requests.grouped(joinFetchParallelChunkSize.get).toSeq
val batchFutures: Seq[Future[Seq[Response]]] =
  batches.map(batch => doFetchJoin(batch, joinConf))
Future.sequence(batchFutures).map(_.flatten)
An idea for the chunking: return the individual chunked Futures rather than sequencing them. With sequencing you have to wait for all of them to come back; with individual futures you get behavior akin to streaming the responses back, so the caller can start processing / acting on each chunk as it completes.
Valid point. This is what GPT said:
import scala.concurrent.{ExecutionContext, Future}

val batches = requests.grouped(joinFetchParallelChunkSize.get).toSeq
// Return individual futures instead of a single combined future
val responseFutures: Seq[Future[Seq[Response]]] = batches.map { batch =>
  doFetchJoin(batch, joinConf)
}
// Don't use Future.sequence - instead, process each future individually.
// Note: Future.foreach needs an implicit ExecutionContext in scope and only
// runs on success; use onComplete if failures must be handled as well.
responseFutures.foreach { future =>
  future.foreach { responses =>
    // Process each batch of responses as soon as it arrives
    processResponses(responses) // Your processing function (placeholder)
  }
}
@piyush-zlai @pengyu-hou thanks for the suggestion! Updated and added a separate entry point for a chunked API in both the Scala and Java fetchers.
So I included two flows:
- The existing fetchJoin uses the Fetcher-level configuration (defaulting to un-chunked). This is useful for clients who'd like to mostly keep the existing behavior but still be able to enable chunking, though only at the global level. I think Airbnb users will most likely use this flow.
- A new fetchJoinChunked for users who'd like fine-grained control. This API returns a Seq[Future[Seq[Response]]] for streaming-like behavior (a sketch of the shape follows below).
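
To make the shape concrete, here is a minimal self-contained sketch of what such a chunked entry point could look like. The stand-in Request/Response types and the doFetchJoin helper are assumptions for illustration, not the PR's actual code:

```scala
import scala.concurrent.{ExecutionContext, Future}

// Stand-in types, assumed for illustration only
case class Request(name: String, keys: Map[String, AnyRef])
case class Response(request: Request, values: Map[String, AnyRef])

// Hypothetical per-chunk fetch; the real PR delegates to the fetcher internals
def doFetchJoin(batch: Seq[Request])(implicit ec: ExecutionContext): Future[Seq[Response]] =
  Future(batch.map(r => Response(r, Map.empty)))

// Chunked entry point: one independent Future per chunk, so the caller can
// act on each chunk as soon as it completes instead of waiting for all of them
def fetchJoinChunked(requests: Seq[Request], chunkSize: Int)
                    (implicit ec: ExecutionContext): Seq[Future[Seq[Response]]] =
  requests.grouped(chunkSize).toSeq.map(batch => doFetchJoin(batch))
```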
@@ -45,7 +45,8 @@ class FetcherBase(kvStore: KVStore,
                    debug: Boolean = false,
                    flagStore: FlagStore = null,
                    disableErrorThrows: Boolean = false,
-                   executionContextOverride: ExecutionContext = null)
+                   executionContextOverride: ExecutionContext = null,
+                   joinFetchParallelChunkSize: Option[Int] = Some(32))
Should we use something like Option(System.getProperty("ai.chronon.fetcher.join_fetch_parallel_chunk_size")) to pass the default chunk size?
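
A minimal sketch of that suggestion, assuming the property name above and that a missing or unparsable property should fall back to the unchunked default; the parsing logic here is illustrative, not code from the PR:

```scala
import scala.util.Try

// Read the default chunk size from a JVM system property; None (unchunked)
// when the property is absent or not a valid integer
val joinFetchParallelChunkSize: Option[Int] =
  Option(System.getProperty("ai.chronon.fetcher.join_fetch_parallel_chunk_size"))
    .flatMap(s => Try(s.toInt).toOption)
```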
Summary
Reduce the batch size for really large batched join requests. There is sequential processing in fetchJoin (the kvstoreFuture.map { ... } step) which becomes the bottleneck when the batch size is really large.
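
To illustrate the bottleneck with assumed shapes (not the PR's actual code): a single future's .map post-processes the whole batch serially, whereas chunking gives each slice its own future so post-processing can proceed in parallel on the execution context:

```scala
import scala.concurrent.{ExecutionContext, Future}

// Stand-in for the per-response decoding/processing work inside the .map
def postProcess(raw: Int): Int = raw * 2

// Unchunked: one future, whose .map walks every element sequentially
def processAll(raw: Seq[Int])(implicit ec: ExecutionContext): Future[Seq[Int]] =
  Future.successful(raw).map(_.map(postProcess))

// Chunked: each slice gets its own future, so slices can run in parallel
def processChunked(raw: Seq[Int], chunkSize: Int)(implicit ec: ExecutionContext): Seq[Future[Seq[Int]]] =
  raw.grouped(chunkSize).toSeq.map(chunk => Future(chunk.map(postProcess)))
```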
Why / Goal
latency reduction
Test Plan
integration testing:
Checklist
Reviewers
@pengyu-hou @airbnb/airbnb-chronon-maintainers