@shibd
Member

@shibd shibd commented Nov 4, 2024

Motivation

With Azure Blob Storage, flushing a batch of messages sometimes fails with the following error:

2024-11-04T01:43:57,072+0000 [pulsar-io-cloud-storage-sink-flush-0] ERROR org.apache.pulsar.io.jcloud.sink.BlobStoreAbstractSink - Encountered unknown error writing to blob abhijith/testing-eh-kaka-sync/unbuffered-telemetryfeed-partition-28/2024-10-30/799132352513.json
com.azure.storage.blob.models.BlobStorageException: Status code 409, "<?xml version="1.0" encoding="utf-8"?><Error><Code>BlobAlreadyExists</Code><Message>The specified blob already exists.
RequestId:4506238d-001e-00df-085b-2eae40000000
Time:2024-11-04T01:43:57.0681430Z</Message></Error>"
	at java.lang.invoke.MethodHandle.invokeWithArguments(MethodHandle.java:732) ~[?:?]
	at com.azure.core.implementation.MethodHandleReflectiveInvoker.invokeWithArguments(MethodHandleReflectiveInvoker.java:39) ~[azure-core-1.47.0.jar:1.47.0]
	at com.azure.core.implementation.http.rest.ResponseExceptionConstructorCache.invoke(ResponseExceptionConstructorCache.java:53) ~[azure-core-1.47.0.jar:1.47.0]
	at com.azure.core.implementation.http.rest.RestProxyBase.instantiateUnexpectedException(RestProxyBase.java:411) ~[azure-core-1.47.0.jar:1.47.0]
	at com.azure.core.implementation.http.rest.AsyncRestProxy.lambda$ensureExpectedStatus$1(AsyncRestProxy.java:132) ~[azure-core-1.47.0.jar:1.47.0]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2400) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2196) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2070) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151) ~[reactor-core-3.4.34.jar:3.4.34]
	at reactor.core.publisher.MonoCreate$DefaultMonoSink.success(MonoCreate.java:172) ~[reactor-core-3.4.34.jar:3.4.34]

AWS S3 does not raise an error in this case; it overwrites the existing file: https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html

The root cause is that the file name is derived from the record sequence:

return record.getRecordSequence()
.orElseThrow(() -> new RuntimeException("found empty recordSequence"));

In Pulsar, that sequence is generated from the ledgerId and entryId only:

https://github.com/apache/pulsar/blob/bbc62245c5ddba1de4b1e7cee4ab49334bc36277/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java#L289-L299

Messages in the same batch share a ledgerId and entryId, so two flush requests can end up with the same file name.
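The collision can be illustrated with a minimal sketch. It assumes the linked FunctionCommon helper packs the sequence as `(ledgerId << 28) | entryId`; the class name and the `fileNameOf` helper are hypothetical and are not part of the connector.

```java
public class SequenceCollisionSketch {

    // Assumed packing, mirroring the linked FunctionCommon helper:
    // ledger id in the high bits, entry id in the low 28 bits.
    static long sequenceOf(long ledgerId, long entryId) {
        return (ledgerId << 28) | entryId;
    }

    // Hypothetical helper: the file name a sink would produce when it
    // keys the file on the sequence alone.
    static String fileNameOf(long ledgerId, long entryId, int batchIndex) {
        // batchIndex is deliberately unused: the sequence has no room for it.
        return sequenceOf(ledgerId, entryId) + ".json";
    }

    public static void main(String[] args) {
        // Two messages from the same batch entry, differing only in batch index.
        String first = fileNameOf(12, 34, 0);
        String second = fileNameOf(12, 34, 1);
        System.out.println(first + " vs " + second
                + " -> collide=" + first.equals(second));
    }
}
```

If the first flush ends at batch index 0 and the next one starts at batch index 1 of the same entry, both flushes target the same blob. Azure then rejects the second write with 409 BlobAlreadyExists, while S3 silently overwrites the first file.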

Modifications

The current naming is unclear. For example, a file named 3221225506.json tells users little about what the sequence means.

Connectors that are currently running may be overwriting data, which can lead to data loss.

It is clearer to use the messageId directly, because a single long value is insufficient to represent ledgerId + entryId + batchIndex.

  • Return the messageId by default. The file name format is: xx/xx/xx/18:2:1:24.json.
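As a rough sketch of why a messageId-based name avoids the collision, the example below builds the name from four fields. It assumes the order ledgerId:entryId:partitionIndex:batchIndex, which is how Pulsar usually prints a batch message id. The class and method names are hypothetical, and the colon separator follows the format shown above; the commit list mentions switching to a period separator, so the merged format may differ.

```java
public class MessageIdFileNameSketch {

    // Hypothetical helper: builds a file name from the message id fields.
    // The field order is an assumption based on the usual string form of a
    // Pulsar batch message id.
    static String fileNameOf(long ledgerId, long entryId,
                             int partitionIndex, int batchIndex) {
        return ledgerId + ":" + entryId + ":" + partitionIndex + ":"
                + batchIndex + ".json";
    }

    public static void main(String[] args) {
        // Same ledger, entry, and partition; only the batch index differs.
        String first = fileNameOf(18, 2, 1, 24);
        String second = fileNameOf(18, 2, 1, 25);
        System.out.println(first + " vs " + second
                + " -> collide=" + first.equals(second));
    }
}
```

Because the batch index is part of the name, two flushes that split one batch entry no longer map to the same file.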

Verifying this change

  • Added AbstractPartitionerTest to cover it, and verified the change locally.

Documentation

Check the box below.

Need to update docs?

  • doc-required

    (If you need help on updating docs, create a doc issue)

  • [] no-need-doc

    (Please explain why)

  • doc

    (If this PR contains doc changes)

@shibd shibd requested a review from a team as a code owner November 4, 2024 09:16
@github-actions github-actions bot added the doc This pr contains a document label Nov 4, 2024
@shibd shibd changed the title from "Fix 409 new" to "Fix duplication file name for batch message" Nov 4, 2024
@shibd shibd changed the title from "Fix duplication file name for batch message" to "Use messageId instead of sequence as file name by default" Nov 4, 2024
@shibd shibd merged commit de86331 into streamnative:master Nov 5, 2024
1 check passed
shibd added a commit that referenced this pull request Nov 5, 2024
* Use message id as file name by default

* Allow overwirte file for azure cloud storage

* Use period as separator

* Update docs

(cherry picked from commit de86331)