Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 47 additions & 37 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "pipedream"

[tool.poetry]
name = "pipedream"
version = "1.0.9"
version = "1.0.10"
description = ""
readme = "README.md"
authors = []
Expand Down
2 changes: 1 addition & 1 deletion src/pipedream/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import typing

import httpx
from .types.project_environment import ProjectEnvironment
from ._.types.project_environment import ProjectEnvironment
from .core.api_error import ApiError
from .core.client_wrapper import AsyncClientWrapper, SyncClientWrapper
from .core.oauth_token_provider import OAuthTokenProvider
Expand Down
6 changes: 3 additions & 3 deletions src/pipedream/core/client_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import typing

import httpx
from ..types.project_environment import ProjectEnvironment
from .._.types.project_environment import ProjectEnvironment
from .http_client import AsyncHttpClient, HttpClient


Expand All @@ -27,10 +27,10 @@ def __init__(

def get_headers(self) -> typing.Dict[str, str]:
headers: typing.Dict[str, str] = {
"User-Agent": "pipedream/1.0.9",
"User-Agent": "pipedream/1.0.10",
"X-Fern-Language": "Python",
"X-Fern-SDK-Name": "pipedream",
"X-Fern-SDK-Version": "1.0.9",
"X-Fern-SDK-Version": "1.0.10",
**(self.get_custom_headers() or {}),
}
if self._project_environment is not None:
Expand Down
42 changes: 42 additions & 0 deletions src/pipedream/core/http_sse/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# This file was auto-generated by Fern from our API Definition.

# isort: skip_file

import typing
from importlib import import_module

if typing.TYPE_CHECKING:
from ._api import EventSource, aconnect_sse, connect_sse
from ._exceptions import SSEError
from ._models import ServerSentEvent
_dynamic_imports: typing.Dict[str, str] = {
"EventSource": "._api",
"SSEError": "._exceptions",
"ServerSentEvent": "._models",
"aconnect_sse": "._api",
"connect_sse": "._api",
}


def __getattr__(attr_name: str) -> typing.Any:
module_name = _dynamic_imports.get(attr_name)
if module_name is None:
raise AttributeError(f"No {attr_name} found in _dynamic_imports for module name -> {__name__}")
try:
module = import_module(module_name, __package__)
if module_name == f".{attr_name}":
return module
else:
return getattr(module, attr_name)
except ImportError as e:
raise ImportError(f"Failed to import {attr_name} from {module_name}: {e}") from e
except AttributeError as e:
raise AttributeError(f"Failed to get {attr_name} from {module_name}: {e}") from e


def __dir__():
lazy_attrs = list(_dynamic_imports.keys())
return sorted(lazy_attrs)


__all__ = ["EventSource", "SSEError", "ServerSentEvent", "aconnect_sse", "connect_sse"]
112 changes: 112 additions & 0 deletions src/pipedream/core/http_sse/_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
# This file was auto-generated by Fern from our API Definition.

import re
from contextlib import asynccontextmanager, contextmanager
from typing import Any, AsyncGenerator, AsyncIterator, Iterator, cast

import httpx
from ._decoders import SSEDecoder
from ._exceptions import SSEError
from ._models import ServerSentEvent


class EventSource:
def __init__(self, response: httpx.Response) -> None:
self._response = response

def _check_content_type(self) -> None:
content_type = self._response.headers.get("content-type", "").partition(";")[0]
if "text/event-stream" not in content_type:
raise SSEError(
f"Expected response header Content-Type to contain 'text/event-stream', got {content_type!r}"
)

def _get_charset(self) -> str:
"""Extract charset from Content-Type header, fallback to UTF-8."""
content_type = self._response.headers.get("content-type", "")

# Parse charset parameter using regex
charset_match = re.search(r"charset=([^;\s]+)", content_type, re.IGNORECASE)
if charset_match:
charset = charset_match.group(1).strip("\"'")
# Validate that it's a known encoding
try:
# Test if the charset is valid by trying to encode/decode
"test".encode(charset).decode(charset)
return charset
except (LookupError, UnicodeError):
# If charset is invalid, fall back to UTF-8
pass

# Default to UTF-8 if no charset specified or invalid charset
return "utf-8"

@property
def response(self) -> httpx.Response:
return self._response

def iter_sse(self) -> Iterator[ServerSentEvent]:
self._check_content_type()
decoder = SSEDecoder()
charset = self._get_charset()

buffer = ""
for chunk in self._response.iter_bytes():
# Decode chunk using detected charset
text_chunk = chunk.decode(charset, errors="replace")
buffer += text_chunk

# Process complete lines
while "\n" in buffer:
line, buffer = buffer.split("\n", 1)
line = line.rstrip("\r")
sse = decoder.decode(line)
# when we reach a "\n\n" => line = ''
# => decoder will attempt to return an SSE Event
if sse is not None:
yield sse

# Process any remaining data in buffer
if buffer.strip():
line = buffer.rstrip("\r")
sse = decoder.decode(line)
if sse is not None:
yield sse

async def aiter_sse(self) -> AsyncGenerator[ServerSentEvent, None]:
self._check_content_type()
decoder = SSEDecoder()
lines = cast(AsyncGenerator[str, None], self._response.aiter_lines())
try:
async for line in lines:
line = line.rstrip("\n")
sse = decoder.decode(line)
if sse is not None:
yield sse
finally:
await lines.aclose()


@contextmanager
def connect_sse(client: httpx.Client, method: str, url: str, **kwargs: Any) -> Iterator[EventSource]:
headers = kwargs.pop("headers", {})
headers["Accept"] = "text/event-stream"
headers["Cache-Control"] = "no-store"

with client.stream(method, url, headers=headers, **kwargs) as response:
yield EventSource(response)


@asynccontextmanager
async def aconnect_sse(
client: httpx.AsyncClient,
method: str,
url: str,
**kwargs: Any,
) -> AsyncIterator[EventSource]:
headers = kwargs.pop("headers", {})
headers["Accept"] = "text/event-stream"
headers["Cache-Control"] = "no-store"

async with client.stream(method, url, headers=headers, **kwargs) as response:
yield EventSource(response)
Loading
Loading