2 changes: 1 addition & 1 deletion .github/workflows/minimal-tests.yml
@@ -32,7 +32,7 @@ jobs:
python -m pip install --upgrade pip
python -m pip install poetry==1.2.2
poetry config virtualenvs.create false --local
-poetry install --extras "sqlserver" -vvv
+poetry install --extras "sqlserver deltalake" -vvv
- name: Test Metadata
run: pytest tests/test_metadata.py
- name: Test Session
2 changes: 1 addition & 1 deletion .github/workflows/static-checking.yml
@@ -31,7 +31,7 @@ jobs:
python -m pip install --upgrade pip
python -m pip install poetry==1.2.2
poetry config virtualenvs.create false --local
-poetry install --extras "sqlserver" -vvv
+poetry install --extras "sqlserver deltalake" -vvv
- name: mypy check
run: mypy --install-types --non-interactive awswrangler
- name: Flake8 Lint
4 changes: 2 additions & 2 deletions CONTRIBUTING.md
@@ -104,7 +104,7 @@ or

* Install dependencies:

-``poetry install --extras "sqlserver oracle sparql"``
+``poetry install --extras "sqlserver oracle sparql deltalake"``

* Run the validation script:

@@ -135,7 +135,7 @@ or

* Install dependencies:

-``poetry install --extras "sqlserver oracle sparql"``
+``poetry install --extras "sqlserver oracle sparql deltalake"``

* Go to the ``test_infra`` directory

2 changes: 2 additions & 0 deletions awswrangler/s3/__init__.py
@@ -6,6 +6,7 @@
from awswrangler.s3._download import download # noqa
from awswrangler.s3._list import does_object_exist, list_buckets, list_directories, list_objects # noqa
from awswrangler.s3._merge_upsert_table import merge_upsert_table # noqa
+from awswrangler.s3._read_deltalake import read_deltalake # noqa
from awswrangler.s3._read_excel import read_excel # noqa
from awswrangler.s3._read_parquet import read_parquet, read_parquet_metadata, read_parquet_table # noqa
from awswrangler.s3._read_text import read_csv, read_fwf, read_json # noqa
@@ -27,6 +28,7 @@
"list_buckets",
"list_directories",
"list_objects",
"read_deltalake",
"read_parquet",
"read_parquet_metadata",
"read_parquet_table",
87 changes: 87 additions & 0 deletions awswrangler/s3/_read_deltalake.py
@@ -0,0 +1,87 @@
"""Amazon S3 Read Delta Lake Module (PRIVATE)."""
import importlib.util
from typing import Any, Dict, List, Optional, Tuple

import boto3
import pandas as pd

from awswrangler import _utils

_deltalake_found = importlib.util.find_spec("deltalake")
if _deltalake_found:
from deltalake import DeltaTable # pylint: disable=import-error


def _set_default_storage_options_kwargs(
session: boto3.Session, s3_additional_kwargs: Optional[Dict[str, Any]]
) -> Dict[str, Any]:
defaults = {key.upper(): value for key, value in _utils.boto3_to_primitives(boto3_session=session).items()}
s3_additional_kwargs = s3_additional_kwargs or {}
return {
**defaults,
**s3_additional_kwargs,
}


def read_deltalake(
path: Optional[str] = None,
version: Optional[int] = None,
partitions: Optional[List[Tuple[str, str, Any]]] = None,
columns: Optional[List[str]] = None,
without_files: bool = False,
boto3_session: Optional[boto3.Session] = None,
s3_additional_kwargs: Optional[Dict[str, str]] = None,
pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
) -> pd.DataFrame:
"""Load a Deltalake table data from an S3 path.

This function requires the `deltalake package
<https://delta-io.github.io/delta-rs/python>`__.
See the `How to load a Delta table
<https://delta-io.github.io/delta-rs/python/usage.html#loading-a-delta-table>`__
guide for loading instructions.

Parameters
----------
path: Optional[str]
The path of the DeltaTable.
version: Optional[int]
The version of the DeltaTable.
partitions: Optional[List[Tuple[str, str, Any]]]
A list of partition filters, see help(DeltaTable.files_by_partitions)
for filter syntax.
columns: Optional[List[str]]
The columns to project. This can be a list of column names to include
(order and duplicates are preserved).
without_files: bool
If True, load the table without tracking files (memory-friendly).
Some append-only applications might not need to track files.
boto3_session: Optional[boto3.Session]
Boto3 Session. If None, the default boto3 session is used.
s3_additional_kwargs: Optional[Dict[str, str]]
Forwarded to the Delta Table class for the storage options of the S3 backend.
pyarrow_additional_kwargs: Optional[Dict[str, Any]]
Forwarded to the PyArrow to_pandas method.

Returns
-------
df: pd.DataFrame
DataFrame with the results.

See Also
--------
deltalake.DeltaTable : Create a DeltaTable instance with the deltalake library.
"""
pyarrow_additional_kwargs = pyarrow_additional_kwargs or {} # TODO: Use defaults in 3.0.0 # pylint: disable=fixme
storage_options = _set_default_storage_options_kwargs(_utils.ensure_session(boto3_session), s3_additional_kwargs)

return (
DeltaTable(
table_uri=path,
version=version,
storage_options=storage_options,
without_files=without_files,
)
.to_pyarrow_table(partitions=partitions, columns=columns)
.to_pandas(**pyarrow_additional_kwargs)
)
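
For orientation, a minimal usage sketch of the new function (illustrative, not part of this PR); it assumes the deltalake extra is installed, and the bucket, prefix, column names, and partition values are hypothetical.

import awswrangler as wr

# Read the latest snapshot of a Delta table, projecting two columns.
# "s3://my-bucket/delta/sales/" and the column names are placeholders.
df = wr.s3.read_deltalake(
    path="s3://my-bucket/delta/sales/",
    columns=["order_id", "amount"],
)

# Time travel: read version 0 of the table, filtered to one partition.
# Partition values are passed as strings, as in the tests below.
df_v0 = wr.s3.read_deltalake(
    path="s3://my-bucket/delta/sales/",
    version=0,
    partitions=[("region", "=", "eu")],
)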
1 change: 1 addition & 0 deletions docs/source/api.rst
@@ -44,6 +44,7 @@ Amazon S3
merge_datasets
merge_upsert_table
read_csv
+read_deltalake
read_excel
read_fwf
read_json
63 changes: 40 additions & 23 deletions poetry.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions pyproject.toml
@@ -48,11 +48,13 @@ backoff = ">=1.11.1,<3.0.0"
SPARQLWrapper = { version = ">=1.8.5,<3.0.0", optional = true }
pyodbc = { version = "~4.0.32", optional = true }
oracledb = { version = "~1.0.0", optional = true }
+deltalake = { version = "~0.6.4", optional = true }

[tool.poetry.extras]
sqlserver = ["pyodbc"]
oracle = ["oracledb"]
sparql = ["SPARQLWrapper"]
deltalake = ["deltalake"]

[tool.poetry.dev-dependencies]
wheel = "^0.37.1"
48 changes: 48 additions & 0 deletions tests/test_s3_deltalake.py
@@ -0,0 +1,48 @@
import pandas as pd
import pytest
from deltalake import DeltaTable, write_deltalake

import awswrangler as wr


@pytest.fixture(scope="session")
def storage_options():
return {"AWS_S3_ALLOW_UNSAFE_RENAME": "TRUE"}


@pytest.mark.parametrize("s3_additional_kwargs", [None, {"ServerSideEncryption": "AES256"}])
@pytest.mark.parametrize("pyarrow_additional_kwargs", [None, {"safe": True, "deduplicate_objects": False}])
def test_read_deltalake(path, s3_additional_kwargs, pyarrow_additional_kwargs, storage_options):
df = pd.DataFrame({"c0": [1, 2, 3], "c1": ["foo", None, "bar"], "c2": [3.0, 4.0, 5.0], "c3": [True, False, None]})
write_deltalake(table_or_uri=path, data=df, storage_options=storage_options)

df2 = wr.s3.read_deltalake(
path=path, s3_additional_kwargs=s3_additional_kwargs, pyarrow_additional_kwargs=pyarrow_additional_kwargs
)
assert df2.equals(df)


def test_read_deltalake_versioned(path, storage_options):
df = pd.DataFrame({"c0": [1, 2, 3], "c1": ["foo", "baz", "bar"]})
write_deltalake(table_or_uri=path, data=df, storage_options=storage_options)
table = DeltaTable(path, version=0, storage_options=storage_options)

df2 = wr.s3.read_deltalake(path=path)
assert df2.equals(df)

df["c2"] = [True, False, True]
write_deltalake(table_or_uri=table, data=df, mode="overwrite", overwrite_schema=True)

df3 = wr.s3.read_deltalake(path=path, version=0)
assert df3.equals(df.drop("c2", axis=1))

df4 = wr.s3.read_deltalake(path=path, version=1)
assert df4.equals(df)


def test_read_deltalake_partitions(path, storage_options):
df = pd.DataFrame({"c0": [1, 2, 3], "c1": [True, False, True], "par0": ["foo", "foo", "bar"], "par1": [1, 2, 2]})
write_deltalake(table_or_uri=path, data=df, partition_by=["par0", "par1"], storage_options=storage_options)

df2 = wr.s3.read_deltalake(path=path, columns=["c0"], partitions=[("par0", "=", "foo"), ("par1", "=", "1")])
assert df2.shape == (1, 1)
1 change: 1 addition & 0 deletions tox.ini
@@ -8,6 +8,7 @@ deps =
.[sqlserver]
.[oracle]
.[sparql]
+.[deltalake]
pytest==7.1.2
pytest-rerunfailures==10.2
pytest-xdist==3.0.2