Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Add the dependency in your `pom.xml` file:
<dependency>
<groupId>com.pipedream</groupId>
<artifactId>pipedream</artifactId>
<version>1.0.5</version>
<version>1.0.6</version>
</dependency>
```

Expand All @@ -45,10 +45,10 @@ public class Example {
.builder()
.clientId("<clientId>")
.clientSecret("<clientSecret>")
.projectId("YOUR_PROJECT_ID")
.build();

client.actions().run(
"project_id",
RunActionOpts
.builder()
.id("id")
Expand Down Expand Up @@ -93,9 +93,9 @@ When the API returns a non-success status code (4xx or 5xx response), an API exc
```java
import com.pipedream.api.core.PipedreamApiApiException;

try {
try{
client.actions().run(...);
} catch (PipedreamApiApiException e) {
} catch (PipedreamApiApiException e){
// Do something with the API exception...
}
```
Expand Down
5 changes: 3 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter-engine:5.8.2'
testImplementation 'org.junit.jupiter:junit-jupiter-params:5.8.2'
api 'org.apache.commons:commons-text:1.13.1'
testImplementation 'com.squareup.okhttp3:mockwebserver:4.12.0'
}


Expand All @@ -48,7 +49,7 @@ java {

group = 'com.pipedream'

version = '1.0.5'
version = '1.0.6'

jar {
dependsOn(":generatePomFileForMavenPublication")
Expand Down Expand Up @@ -79,7 +80,7 @@ publishing {
maven(MavenPublication) {
groupId = 'com.pipedream'
artifactId = 'pipedream'
version = '1.0.5'
version = '1.0.6'
from components.java
pom {
name = 'pipedream'
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/pipedream/api/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ private ClientOptions(
this.headers.putAll(headers);
this.headers.putAll(new HashMap<String, String>() {
{
put("User-Agent", "com.pipedream:pipedream/1.0.5");
put("User-Agent", "com.pipedream:pipedream/1.0.6");
put("X-Fern-Language", "JAVA");
put("X-Fern-SDK-Name", "com.pipedream.fern:api-sdk");
put("X-Fern-SDK-Version", "1.0.5");
put("X-Fern-SDK-Version", "1.0.6");
}
});
this.headerSuppliers = headerSuppliers;
Expand Down
82 changes: 56 additions & 26 deletions src/main/java/com/pipedream/api/core/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ private final class SSEIterator implements Iterator<T> {
private T nextItem;
private boolean hasNextItem = false;
private boolean endOfStream = false;
private StringBuilder buffer = new StringBuilder();
private boolean prefixSeen = false;
private StringBuilder eventDataBuffer = new StringBuilder();
private String currentEventType = null;

private SSEIterator() {
if (sseReader != null && !isStreamClosed()) {
Expand Down Expand Up @@ -223,39 +223,69 @@ private boolean readNextMessage() {

try {
while (sseScanner.hasNextLine()) {
String chunk = sseScanner.nextLine();
buffer.append(chunk).append(NEWLINE);

int terminatorIndex;
while ((terminatorIndex = buffer.indexOf(messageTerminator)) >= 0) {
String line = buffer.substring(0, terminatorIndex + messageTerminator.length());
buffer.delete(0, terminatorIndex + messageTerminator.length());

line = line.trim();
if (line.isEmpty()) {
continue;
String line = sseScanner.nextLine();

if (line.trim().isEmpty()) {
if (eventDataBuffer.length() > 0) {
try {
nextItem = ObjectMappers.JSON_MAPPER.readValue(eventDataBuffer.toString(), valueType);
hasNextItem = true;
eventDataBuffer.setLength(0);
currentEventType = null;
return true;
} catch (Exception parseEx) {
System.err.println("Failed to parse SSE event: " + parseEx.getMessage());
eventDataBuffer.setLength(0);
currentEventType = null;
continue;
}
}
continue;
}

if (!prefixSeen && line.startsWith(DATA_PREFIX)) {
prefixSeen = true;
line = line.substring(DATA_PREFIX.length()).trim();
} else if (!prefixSeen) {
continue;
if (line.startsWith(DATA_PREFIX)) {
String dataContent = line.substring(DATA_PREFIX.length());
if (dataContent.startsWith(" ")) {
dataContent = dataContent.substring(1);
}

if (streamTerminator != null && line.contains(streamTerminator)) {
if (eventDataBuffer.length() == 0
&& streamTerminator != null
&& dataContent.trim().equals(streamTerminator)) {
endOfStream = true;
return false;
}

try {
nextItem = ObjectMappers.JSON_MAPPER.readValue(line, valueType);
hasNextItem = true;
prefixSeen = false;
return true;
} catch (Exception parseEx) {
continue;
if (eventDataBuffer.length() > 0) {
eventDataBuffer.append('\n');
}
eventDataBuffer.append(dataContent);
} else if (line.startsWith("event:")) {
String eventValue = line.length() > 6 ? line.substring(6) : "";
if (eventValue.startsWith(" ")) {
eventValue = eventValue.substring(1);
}
currentEventType = eventValue;
} else if (line.startsWith("id:")) {
// Event ID field (ignored)
} else if (line.startsWith("retry:")) {
// Retry field (ignored)
} else if (line.startsWith(":")) {
// Comment line (ignored)
}
}

if (eventDataBuffer.length() > 0) {
try {
nextItem = ObjectMappers.JSON_MAPPER.readValue(eventDataBuffer.toString(), valueType);
hasNextItem = true;
eventDataBuffer.setLength(0);
currentEventType = null;
return true;
} catch (Exception parseEx) {
System.err.println("Failed to parse final SSE event: " + parseEx.getMessage());
eventDataBuffer.setLength(0);
currentEventType = null;
}
}

Expand Down
16 changes: 15 additions & 1 deletion src/main/java/com/pipedream/api/core/pagination/BasePage.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
package com.pipedream.api.core.pagination;

import java.util.List;
import java.util.Optional;

public abstract class BasePage<T> {
private final boolean hasNext;
private final List<T> items;
private final Object response;

public BasePage(boolean hasNext, List<T> items) {
public BasePage(boolean hasNext, List<T> items, Object response) {
this.hasNext = hasNext;
this.items = items;
this.response = response;
}

public boolean hasNext() {
Expand All @@ -21,4 +24,15 @@ public boolean hasNext() {
public List<T> getItems() {
return items;
}

/**
* Returns the full response object for accessing pagination metadata like cursor tokens.
*
* @return Optional containing the response, or empty if unavailable
*/
public <R> Optional<R> getResponse() {
@SuppressWarnings("unchecked")
R typedResponse = (R) response;
return Optional.ofNullable(typedResponse);
}
}
4 changes: 2 additions & 2 deletions src/main/java/com/pipedream/api/core/pagination/SyncPage.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
public class SyncPage<T> extends BasePage<T> {
protected final Supplier<? extends SyncPage<T>> nextSupplier;

public SyncPage(boolean hasNext, List<T> items, Supplier<? extends SyncPage<T>> nextSupplier) {
super(hasNext, items);
public SyncPage(boolean hasNext, List<T> items, Object response, Supplier<? extends SyncPage<T>> nextSupplier) {
super(hasNext, items, response);
this.nextSupplier = nextSupplier;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@

public class SyncPagingIterable<T> extends SyncPage<T> implements Iterable<T> {

public SyncPagingIterable(boolean hasNext, List<T> items, Supplier<? extends SyncPage<T>> getNext) {
super(hasNext, items, getNext);
public SyncPagingIterable(
boolean hasNext, List<T> items, Object response, Supplier<? extends SyncPage<T>> getNext) {
super(hasNext, items, response, getNext);
}

public SyncPagingIterable(boolean hasNext, Optional<List<T>> items, Supplier<? extends SyncPage<T>> getNext) {
super(hasNext, items.orElse(new ArrayList<>()), getNext);
public SyncPagingIterable(
boolean hasNext, Optional<List<T>> items, Object response, Supplier<? extends SyncPage<T>> getNext) {
super(hasNext, items.orElse(new ArrayList<>()), response, getNext);
}

public Stream<T> streamItems() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,6 @@ public CompletableFuture<BaseClientHttpResponse<SyncPagingIterable<Account>>> li
.addPathSegments("v1/connect")
.addPathSegment(clientOptions.projectId())
.addPathSegments("accounts");
if (request.getAppId().isPresent()) {
QueryStringMapper.addQueryParameter(
httpUrl, "app_id", request.getAppId().get(), false);
}
if (request.getExternalUserId().isPresent()) {
QueryStringMapper.addQueryParameter(
httpUrl, "external_user_id", request.getExternalUserId().get(), false);
Expand All @@ -92,6 +88,9 @@ public CompletableFuture<BaseClientHttpResponse<SyncPagingIterable<Account>>> li
QueryStringMapper.addQueryParameter(
httpUrl, "limit", request.getLimit().get(), false);
}
if (request.getApp().isPresent()) {
QueryStringMapper.addQueryParameter(httpUrl, "app", request.getApp().get(), false);
}
if (request.getIncludeCredentials().isPresent()) {
QueryStringMapper.addQueryParameter(
httpUrl,
Expand Down Expand Up @@ -125,15 +124,16 @@ public void onResponse(@NotNull Call call, @NotNull Response response) throws IO
.build();
List<Account> result = parsedResponse.getData();
future.complete(new BaseClientHttpResponse<>(
new SyncPagingIterable<Account>(startingAfter.isPresent(), result, () -> {
try {
return list(nextRequest, requestOptions)
.get()
.body();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}),
new SyncPagingIterable<Account>(
startingAfter.isPresent(), result, parsedResponse, () -> {
try {
return list(nextRequest, requestOptions)
.get()
.body();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
}),
response));
return;
}
Expand Down Expand Up @@ -183,10 +183,6 @@ public CompletableFuture<BaseClientHttpResponse<Account>> create(
.addPathSegments("v1/connect")
.addPathSegment(clientOptions.projectId())
.addPathSegments("accounts");
if (request.getAppId().isPresent()) {
QueryStringMapper.addQueryParameter(
httpUrl, "app_id", request.getAppId().get(), false);
}
if (request.getExternalUserId().isPresent()) {
QueryStringMapper.addQueryParameter(
httpUrl, "external_user_id", request.getExternalUserId().get(), false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,6 @@ public BaseClientHttpResponse<SyncPagingIterable<Account>> list(
.addPathSegments("v1/connect")
.addPathSegment(clientOptions.projectId())
.addPathSegments("accounts");
if (request.getAppId().isPresent()) {
QueryStringMapper.addQueryParameter(
httpUrl, "app_id", request.getAppId().get(), false);
}
if (request.getExternalUserId().isPresent()) {
QueryStringMapper.addQueryParameter(
httpUrl, "external_user_id", request.getExternalUserId().get(), false);
Expand All @@ -87,6 +83,9 @@ public BaseClientHttpResponse<SyncPagingIterable<Account>> list(
QueryStringMapper.addQueryParameter(
httpUrl, "limit", request.getLimit().get(), false);
}
if (request.getApp().isPresent()) {
QueryStringMapper.addQueryParameter(httpUrl, "app", request.getApp().get(), false);
}
if (request.getIncludeCredentials().isPresent()) {
QueryStringMapper.addQueryParameter(
httpUrl,
Expand Down Expand Up @@ -116,9 +115,9 @@ public BaseClientHttpResponse<SyncPagingIterable<Account>> list(
.build();
List<Account> result = parsedResponse.getData();
return new BaseClientHttpResponse<>(
new SyncPagingIterable<Account>(
startingAfter.isPresent(), result, () -> list(nextRequest, requestOptions)
.body()),
new SyncPagingIterable<Account>(startingAfter.isPresent(), result, parsedResponse, () -> list(
nextRequest, requestOptions)
.body()),
response);
}
String responseBodyString = responseBody != null ? responseBody.string() : "{}";
Expand Down Expand Up @@ -156,10 +155,6 @@ public BaseClientHttpResponse<Account> create(CreateAccountOpts request, Request
.addPathSegments("v1/connect")
.addPathSegment(clientOptions.projectId())
.addPathSegments("accounts");
if (request.getAppId().isPresent()) {
QueryStringMapper.addQueryParameter(
httpUrl, "app_id", request.getAppId().get(), false);
}
if (request.getExternalUserId().isPresent()) {
QueryStringMapper.addQueryParameter(
httpUrl, "external_user_id", request.getExternalUserId().get(), false);
Expand Down
Loading
Loading