feat:large-row-skip-in-bigtable | added experimental options to skip … #34245


Open · wants to merge 6 commits into base: master
Conversation

sarthakbhutani
Contributor

…large rows while reading from bigtable

Please add a meaningful description for your change here


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Contributor

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

try {
stream =
client
.skipLargeRowsCallable(new BigtableRowProtoAdapter())
Contributor

I forget, does this throw an exception with the large rows, or just silently swallow them?

Contributor Author

It swallows them and returns the next non-large row.
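The skip semantics described here (drop any row over the size limit, continue with the next one) can be illustrated with a self-contained sketch. The `Row` type and the byte threshold below are hypothetical stand-ins, not the actual Bigtable client types:

```java
import java.util.ArrayList;
import java.util.List;

public class SkipLargeRowsSketch {
    // Hypothetical stand-in for a Bigtable row: just a key and a payload size.
    record Row(String key, int sizeBytes) {}

    // Mimics the described behavior of skipLargeRowsCallable: rows over the
    // limit are silently swallowed and iteration continues with the next row;
    // no exception is surfaced to the caller.
    static List<Row> readSkippingLargeRows(List<Row> source, int maxRowBytes) {
        List<Row> result = new ArrayList<>();
        for (Row row : source) {
            if (row.sizeBytes() <= maxRowBytes) {
                result.add(row);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> rows =
            List.of(new Row("a", 100), new Row("b", 5_000_000), new Row("c", 200));
        // "b" exceeds the limit and is skipped; "a" and "c" come through.
        System.out.println(readSkippingLargeRows(rows, 1_000_000));
    }
}
```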

@chamikaramj
Contributor

Retest this please

@sarthakbhutani
Contributor Author

Not sure why the spotless check is failing; it's related to code formatting. It works fine locally.

Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @Abacn for label java.
R: @djyau for label bigtable.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

*
* <p>This is incompatible with withMaxBufferElementCount()
*/
public Read withExperimentalSkipLargeRows(@Nullable Boolean skipLargeRows) {
Contributor

Please don't encode feature status in the public ABI. That makes it impossible to evolve the feature into GA status. If this feature is not ready for GA, then please use ExperimentalOptions, which we already use for BIGTABLE_ENABLE_CLIENT_SIDE_METRICS.
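The suggested pattern — gating the feature on an experiment string carried in the pipeline options rather than a dedicated builder method — can be sketched without the Beam dependency. `hasExperiment` below is a simplified stand-in for Beam's `ExperimentalOptions.hasExperiment`, and the flag name `"bigtable_skip_large_rows"` is hypothetical:

```java
import java.util.List;

public class ExperimentGateSketch {
    // Simplified stand-in for Beam's ExperimentalOptions.hasExperiment:
    // experiments are plain strings attached to the pipeline options, so no
    // new public API surface is needed to toggle the feature.
    static boolean hasExperiment(List<String> experiments, String name) {
        return experiments != null && experiments.contains(name);
    }

    // Hypothetical experiment name for this feature.
    static final String BIGTABLE_SKIP_LARGE_ROWS = "bigtable_skip_large_rows";

    public static void main(String[] args) {
        List<String> experiments = List.of("use_runner_v2", BIGTABLE_SKIP_LARGE_ROWS);
        if (hasExperiment(experiments, BIGTABLE_SKIP_LARGE_ROWS)) {
            System.out.println("large-row skipping enabled");
        }
    }
}
```

Promoting the feature to GA later then only means removing the experiment check, with no binary-incompatible change to `Read`.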

Contributor Author

@justinuang - will we go with GA for skipLargeRows, or do we want to keep it experimental? If so, why?

Contributor

Let's use the experimental flag because it's not the final state we want to have. The example code Igor mentioned:

if (ExperimentalOptions.hasExperiment(pipelineOptions, BIGTABLE_ENABLE_CLIENT_SIDE_METRICS)) {

Comment on lines +180 to +191
if (bigtableReadOptions != null
    && Boolean.TRUE.equals(bigtableReadOptions.getExperimentalSkipLargeRows())) {
  stream =
      client
          .skipLargeRowsCallable(new BigtableRowProtoAdapter())
          .call(query, GrpcCallContext.createDefault());
} else {
  stream =
      client
          .readRowsCallable(new BigtableRowProtoAdapter())
          .call(query, GrpcCallContext.createDefault());
}
Contributor

Resolving of the feature should be done during pipeline construction not during execution

Also, you should factor out the common code:

readRowsCallable = client.readRowsCallable(new BigtableRowProtoAdapter());
if (isLargeRowSkippingEnabled) {
  readRowsCallable = client.skipLargeRowsCallable(new BigtableRowProtoAdapter());
}
readRowsCallable.call(...

Contributor Author

  1. Regarding the code suggestion - instead of having the if-else, do we want to override the code later?

  2. "Resolving of the feature should be done during pipeline construction not during execution"
    Initially, I had made this into a separate reader class. Since this was an experimental option, it came out in the discussion that we don't want the overhead of maintaining a separate reader for new changes.

And I had to revert those changes - 50f7924

Contributor

I think you can pass in a unary callable when creating the BigtableReaderImpl instead of the client:

public Reader createReader(BigtableSource source) throws IOException {
  if (source.getMaxBufferElementCount() != null) {
    return BigtableSegmentReaderImpl.create(
        client,
        projectId,
        instanceId,
        source.getTableId().get(),
        source.getRanges(),
        source.getRowFilter(),
        source.getMaxBufferElementCount());
  } else {
    return new BigtableReaderImpl(
        client,
        projectId,
        instanceId,
        source.getTableId().get(),
        source.getRanges(),
        source.getRowFilter());
  }
}

Contributor

No, I meant keep the unary callable where it is, but resolve the unset value in the settings, i.e. this logic:

bigtableReadOptions != null
    && Boolean.TRUE.equals(bigtableReadOptions.getExperimentalSkipLargeRows())

What "unset" means should be resolved during pipeline construction.
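Resolving the nullable flag once at construction time can look like the following sketch: the tri-state `Boolean` (null / FALSE / TRUE) is collapsed to a concrete `boolean` before the pipeline runs, so execution-time code never re-checks for null. The method name here is illustrative, not the PR's actual API:

```java
public class ResolveFlagSketch {
    // Collapse a tri-state Boolean to a concrete boolean at
    // pipeline-construction time; an unset (null) flag means "feature off".
    static boolean resolveSkipLargeRows(Boolean experimentalSkipLargeRows) {
        return Boolean.TRUE.equals(experimentalSkipLargeRows);
    }

    public static void main(String[] args) {
        System.out.println(resolveSkipLargeRows(null));          // false
        System.out.println(resolveSkipLargeRows(Boolean.TRUE));  // true
        System.out.println(resolveSkipLargeRows(Boolean.FALSE)); // false
    }
}
```

With the default baked in during construction, the execution path can branch on a plain `boolean` with no null handling.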

new BigtableOptions.Builder().setProjectId(project).setInstanceId(options.getInstanceId());

final String tableId = "BigtableReadTest";
final long numRows = 1000L;
Contributor

how does this test large row skipping?

Contributor Author

This doesn't.
The main logic lives in the java-client; the Apache Beam implementation is only a wrapper that calls it. I couldn't figure out how to test the large-row skipping in an IT here - it's already tested in the java-client.

It came out in our earlier discussion that we need a data-integrity check ensuring no data loss.
Hence, this is a data-integrity check: when there isn't a large row, the feature still works as expected, reading all the rows.
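The data-integrity check described above boils down to a count comparison: write N rows, read them back with skipping enabled, and assert that nothing was dropped. A minimal, Bigtable-free sketch of that assertion (the reader here is a hypothetical stand-in for the skip-enabled read path):

```java
import java.util.ArrayList;
import java.util.List;

public class DataIntegritySketch {
    // Hypothetical stand-in for a skip-enabled read: when no row exceeds the
    // size limit, it must return every row that was written.
    static List<String> readAllRows(List<String> written) {
        return new ArrayList<>(written);
    }

    public static void main(String[] args) {
        List<String> written = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            written.add("key" + i);
        }
        List<String> read = readAllRows(written);
        // Integrity assertion: no data loss when no large rows are present.
        if (read.size() != written.size()) {
            throw new AssertionError("row count mismatch: " + read.size());
        }
        System.out.println("read " + read.size() + " rows, no loss");
    }
}
```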

Contributor

I think you can do a bit better here

testE2EBigtableReadWithSkippingLargeRows() {
  // ...
  // add an error injector to trigger large row logic
  ExperimentalOptions.addExperiment("bigtable_settings_override", InterceptorInjector.class.getName());
  // ...
}

static class LargeRowErrorInterceptor implements ClientInterceptor {
  @Override
  public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
      MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
    return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
        next.newCall(method, callOptions)) {
      private boolean artificiallyClosed = false;
      private int numMsgs = 0;

      @Override
      public void start(Listener<RespT> responseListener, Metadata headers) {
        super.start(
            new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(
                responseListener) {
              @Override
              public void onMessage(RespT message) {
                if (++numMsgs > 10) {
                  artificiallyClosed = true;
                  delegate().onClose(Status.WHATEVER_ERROR_TRIGGERS_PAGING, new Metadata());
                  return;
                }
                super.onMessage(message);
              }

              @Override
              public void onClose(Status status, Metadata trailers) {
                if (!artificiallyClosed) {
                  super.onClose(status, trailers);
                }
              }
            },
            headers);
      }
    };
  }
}

public static class InterceptorInjector
    implements BiFunction<BigtableDataSettings.Builder, PipelineOptions, BigtableDataSettings.Builder> {
  @Override
  public BigtableDataSettings.Builder apply(
      BigtableDataSettings.Builder builder, PipelineOptions pipelineOptions) {
    InstantiatingGrpcChannelProvider.Builder transportChannelProvider =
        ((InstantiatingGrpcChannelProvider)
                builder.stubSettings().getTransportChannelProvider())
            .toBuilder();
    ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> oldConf =
        transportChannelProvider.getChannelConfigurator();
    transportChannelProvider.setChannelConfigurator(
        b -> {
          if (oldConf != null) {
            b = oldConf.apply(b);
          }
          return b.intercept(new LargeRowErrorInterceptor());
        });
    // Install the modified channel provider and return the updated settings.
    builder.stubSettings().setTransportChannelProvider(transportChannelProvider.build());
    return builder;
  }
}

5 participants