feat:large-row-skip-in-bigtable | added experimental options to skip large rows while reading from bigtable #34245

Conversation
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment
```java
try {
  stream =
      client
          .skipLargeRowsCallable(new BigtableRowProtoAdapter())
```
I forget, does this throw an exception with the large rows, or just silently swallow them?
it swallows them & returns the next non-large row
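For intuition, the "swallow and continue" contract described above can be sketched in plain Java. This is an illustration only: the real logic lives in the Bigtable java-client's `skipLargeRowsCallable`; the helper name and the byte threshold below are made up for the sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SkipLargeRowsSketch {
  // Oversized rows are dropped silently rather than surfacing an exception;
  // the caller simply sees the next non-large row.
  static List<String> readSkippingLarge(List<String> rows, int maxBytes) {
    return rows.stream()
        .filter(r -> r.getBytes().length <= maxBytes) // silently drop large rows
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> rows = Arrays.asList("a", "bbbbbbbbbb", "c");
    System.out.println(readSkippingLarge(rows, 4)); // the 10-byte row is skipped
  }
}
```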
Retest this please
Assigning reviewers. If you would like to opt out of this review, comment:

R: @Abacn for label java.

Available commands:

The PR bot will only process comments in the main thread (not review comments).
```java
 *
 * <p>This is incompatible with withMaxBufferElementCount()
 */
public Read withExperimentalSkipLargeRows(@Nullable Boolean skipLargeRows) {
```
Please don't encode feature status in the public ABI. That makes it impossible to evolve the feature into GA status. If this feature is not ready for GA, then please use ExperimentalOptions, which we already use for BIGTABLE_ENABLE_CLIENT_SIDE_METRICS.
@justinuang - will we go with GA for skipLargeRows, or do we want to keep it experimental - if yes, why?
Let's use the experimental flag because it's not the final state we want to have. The example code Igor mentioned:

Line 131 in 3b5a2b6

```java
if (ExperimentalOptions.hasExperiment(pipelineOptions, BIGTABLE_ENABLE_CLIENT_SIDE_METRICS)) {
```
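Under the hood, `ExperimentalOptions.hasExperiment` is essentially a membership test against the `--experiments` list on the pipeline options. A minimal, self-contained sketch of that mechanism is below; the flag name is hypothetical, not one Beam actually defines.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class ExperimentFlagSketch {
  // Stand-in for ExperimentalOptions.hasExperiment(pipelineOptions, flag):
  // a plain string-set membership check over the configured experiments.
  static boolean hasExperiment(Set<String> experiments, String flag) {
    return experiments.contains(flag);
  }

  public static void main(String[] args) {
    // "bigtable_skip_large_rows" is a hypothetical experiment name.
    Set<String> experiments = new HashSet<>(Arrays.asList("bigtable_skip_large_rows"));
    System.out.println(hasExperiment(experiments, "bigtable_skip_large_rows"));
    System.out.println(hasExperiment(experiments, "some_other_flag"));
  }
}
```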
```java
if (bigtableReadOptions != null
    && Boolean.TRUE.equals(bigtableReadOptions.getExperimentalSkipLargeRows())) {
  stream =
      client
          .skipLargeRowsCallable(new BigtableRowProtoAdapter())
          .call(query, GrpcCallContext.createDefault());
} else {
  stream =
      client
          .readRowsCallable(new BigtableRowProtoAdapter())
          .call(query, GrpcCallContext.createDefault());
}
```
Resolving of the feature should be done during pipeline construction, not during execution.

Also, you should factor out the common code:

```java
readRowsCallable = client.readRowsCallable(new BigtableRowProtoAdapter());
if (isLargeRowSkippingEnabled) {
  readRowsCallable = client.skipLargeRowsCallable(new BigtableRowProtoAdapter());
}
readRowsCallable.call(...
```
Regarding the code suggestion: instead of having the if-else, do we want to override the callable later?

> Resolving of the feature should be done during pipeline construction not during execution

Initially, I had made this into a separate reader class. Since this was an experimental option, it came out in the discussion that we don't want the overhead of maintaining this as a separate reader for new changes, and I had to revert those changes - 50f7924
I think you can pass in a unary callable when creating the BigtableReaderImpl instead of the client:

Lines 653 to 672 in 3b5a2b6

```java
public Reader createReader(BigtableSource source) throws IOException {
  if (source.getMaxBufferElementCount() != null) {
    return BigtableSegmentReaderImpl.create(
        client,
        projectId,
        instanceId,
        source.getTableId().get(),
        source.getRanges(),
        source.getRowFilter(),
        source.getMaxBufferElementCount());
  } else {
    return new BigtableReaderImpl(
        client,
        projectId,
        instanceId,
        source.getTableId().get(),
        source.getRanges(),
        source.getRowFilter());
  }
}
```
No, I meant keep the unary callable where it is, but resolve the unset value in the settings, i.e. this logic:

```java
bigtableReadOptions != null
    && Boolean.TRUE.equals(bigtableReadOptions.getExperimentalSkipLargeRows())
```

What "unset" means should be resolved during pipeline construction.
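The construction-time resolution could look like the sketch below. It is self-contained and hypothetical: the method name and the "unset means disabled" default are assumptions for illustration, not the PR's actual code.

```java
public class FlagResolution {
  // Collapse the tri-state Boolean (null / TRUE / FALSE) into a primitive
  // boolean once, at pipeline-construction time, so execution-time code
  // never has to interpret what "unset" means.
  static boolean resolveSkipLargeRows(Boolean experimentalSkipLargeRows) {
    // Assumed default: unset behaves like FALSE (feature off).
    return Boolean.TRUE.equals(experimentalSkipLargeRows);
  }

  public static void main(String[] args) {
    System.out.println(resolveSkipLargeRows(null));          // unset -> off
    System.out.println(resolveSkipLargeRows(Boolean.TRUE));  // explicitly on
    System.out.println(resolveSkipLargeRows(Boolean.FALSE)); // explicitly off
  }
}
```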
```java
new BigtableOptions.Builder().setProjectId(project).setInstanceId(options.getInstanceId());

final String tableId = "BigtableReadTest";
final long numRows = 1000L;
```
how does this test large row skipping?
This doesn't. The main logic lies in the java-client; the Apache Beam implementation is only a wrapper that calls it. I couldn't figure out how to test the large-row skipping in an IT here - it's already being done in the java-client.

It came out in our discussion earlier that we need a data-integrity check where no data loss should happen. Hence, this is a check for data integrity: if there isn't a large row, the feature still works as expected, reading all the rows.
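The data-integrity assertion described above reduces to "rows read equals rows written". A trivial self-contained sketch of that check, with stand-in lists instead of a real Bigtable read (the 1000 mirrors the test's numRows):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;

public class IntegrityCheckSketch {
  public static void main(String[] args) {
    List<Integer> written = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
      written.add(i); // rows written to the table
    }
    // Stand-in for the skip-enabled Bigtable read; with no large rows
    // present it must return every written row.
    List<Integer> read = new ArrayList<>(written);

    if (read.size() != written.size()
        || !new HashSet<>(read).equals(new HashSet<>(written))) {
      throw new AssertionError("data loss detected");
    }
    System.out.println("integrity ok: " + read.size() + " rows");
  }
}
```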
I think you can do a bit better here:

```java
testE2EBigtableReadWithSkippingLargeRows() {
  // ...
  // add an error injector to trigger large row logic
  ExperimentalOptions.addExperiment(
      "bigtable_settings_override", InterceptorInjector.class.getName());
  // ...
}

static class LargeRowErrorInterceptor implements ClientInterceptor {
  @Override
  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
      MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
    return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
        next.newCall(method, callOptions)) {
      private boolean artificiallyClosed = false;
      private int numMsgs = 0;

      @Override
      public void start(Listener<RespT> responseListener, Metadata headers) {
        super.start(
            new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
                responseListener) {
              @Override
              public void onMessage(RespT message) {
                if (++numMsgs > 10) {
                  artificiallyClosed = true;
                  // Placeholder: substitute whichever status triggers the
                  // large-row handling path.
                  delegate().onClose(Status.WHATEVER_ERROR_TRIGGERS_PAGING, new Metadata());
                  return;
                }
                super.onMessage(message);
              }

              @Override
              public void onClose(Status status, Metadata trailers) {
                if (!artificiallyClosed) {
                  super.onClose(status, trailers);
                }
              }
            },
            headers);
      }
    };
  }
}

public static class InterceptorInjector
    implements BiFunction<
        BigtableDataSettings.Builder, PipelineOptions, BigtableDataSettings.Builder> {
  @Override
  public BigtableDataSettings.Builder apply(
      BigtableDataSettings.Builder builder, PipelineOptions pipelineOptions) {
    InstantiatingGrpcChannelProvider.Builder transportChannelProvider =
        ((InstantiatingGrpcChannelProvider)
                builder.stubSettings().getTransportChannelProvider())
            .toBuilder();
    ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> oldConf =
        transportChannelProvider.getChannelConfigurator();
    transportChannelProvider.setChannelConfigurator(
        b -> {
          if (oldConf != null) {
            b = oldConf.apply(b);
          }
          return b.intercept(new LargeRowErrorInterceptor());
        });
    builder.stubSettings().setTransportChannelProvider(transportChannelProvider.build());
    return builder;
  }
}
```