Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion dataframely/_compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,14 @@ class Dialect: # type: ignore # noqa: N801
from polars._typing import ( # type: ignore[attr-defined,unused-ignore]
PartitioningScheme as PartitionSchemeOrSinkDirectory,
)
else:
elif _polars_version_tuple < (1, 38): # pragma: no cover
from polars.io.partition import ( # type: ignore[no-redef,attr-defined,unused-ignore]
_SinkDirectory as PartitionSchemeOrSinkDirectory,
)
else:
from polars.io.partition import ( # type: ignore[no-redef,attr-defined,unused-ignore]
PartitionBy as PartitionSchemeOrSinkDirectory,
)

# ------------------------------------------------------------------------------------ #

Expand Down
5 changes: 2 additions & 3 deletions dataframely/_storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
# SPDX-License-Identifier: BSD-3-Clause

from ._base import StorageBackend
from ._fsspec import get_file_prefix

__all__ = [
"StorageBackend",
]
__all__ = ["StorageBackend", "get_file_prefix"]
18 changes: 18 additions & 0 deletions dataframely/_storage/_fsspec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright (c) QuantCo 2025-2026
# SPDX-License-Identifier: BSD-3-Clause

from fsspec import AbstractFileSystem


def get_file_prefix(fs: AbstractFileSystem) -> str:
match fs.protocol:
case "file":
return ""
case str():
return f"{fs.protocol}://"
case ["file", *_]:
return ""
case [str(proto), *_]:
return f"{proto}://"
case _:
raise ValueError(f"Unexpected fs.protocol: {fs.protocol}")
20 changes: 9 additions & 11 deletions dataframely/_storage/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import polars as pl
from fsspec import AbstractFileSystem, url_to_fs

from dataframely._storage import get_file_prefix

from ._base import (
SerializedCollection,
SerializedRules,
Expand Down Expand Up @@ -79,7 +81,9 @@ def sink_collection(
fs: AbstractFileSystem = url_to_fs(path)[0]
for key, lf in dfs.items():
destination = (
fs.sep.join([path, key])
# Enforce that the path ends with a separator. Otherwise
# polars misbehaves on Windows.
fs.sep.join([path, key]) + fs.sep
if "partition_by" in kwargs
else fs.sep.join([path, f"{key}.parquet"])
)
Expand Down Expand Up @@ -107,7 +111,9 @@ def write_collection(
fs: AbstractFileSystem = url_to_fs(path)[0]
for key, lf in dfs.items():
destination = (
fs.sep.join([path, key])
# Enforce that the path ends with a separator. Otherwise
# polars misbehaves on Windows.
fs.sep.join([path, key]) + fs.sep
if "partition_by" in kwargs
else fs.sep.join([path, f"{key}.parquet"])
)
Expand Down Expand Up @@ -155,15 +161,7 @@ def _collection_from_parquet(
if is_file:
collection_types.append(_read_serialized_collection(source_path))
else:
prefix = (
""
if fs.protocol == "file"
else (
f"{fs.protocol}://"
if isinstance(fs.protocol, str)
else f"{fs.protocol[0]}://"
)
)
prefix = get_file_prefix(fs)
for file in fs.glob(fs.sep.join([source_path, "**", "*.parquet"])):
collection_types.append(
_read_serialized_collection(f"{prefix}{file}")
Expand Down
15 changes: 2 additions & 13 deletions dataframely/testing/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import dataframely as dy
from dataframely import FailureInfo, Validation
from dataframely._compat import deltalake
from dataframely._storage import get_file_prefix
from dataframely._storage.delta import _to_delta_table

# ----------------------------------- Schema -------------------------------------------
Expand Down Expand Up @@ -190,19 +191,7 @@ def set_metadata(self, path: str, metadata: dict[str, Any]) -> None:
metadata."""

def _prefix_path(self, path: str, fs: AbstractFileSystem) -> str:
return f"{self._get_prefix(fs)}{path}"

@staticmethod
def _get_prefix(fs: AbstractFileSystem) -> str:
return (
""
if fs.protocol == "file"
else (
f"{fs.protocol}://"
if isinstance(fs.protocol, str)
else f"{fs.protocol[0]}://"
)
)
return f"{get_file_prefix(fs)}{path}"


class ParquetCollectionStorageTester(CollectionStorageTester):
Expand Down
314 changes: 141 additions & 173 deletions pixi.lock

Large diffs are not rendered by default.

20 changes: 10 additions & 10 deletions pixi.toml
Original file line number Diff line number Diff line change
Expand Up @@ -117,35 +117,35 @@ polars = "1.35.*"

[feature.polars-latest.dependencies]
# NOTE: Update docs/faq.md when updating this.
polars = "1.36.*"
polars = "1.38.*"

[feature.nightly.tasks]
install-polars-nightly = "pip install --pre --no-deps --upgrade --only-binary :all: polars polars-runtime-32"

[environments]
build = ["build"]
default = ["dev", "lint", "optionals", "py314", "test", "polars-latest"]
default = ["dev", "lint", "optionals", "polars-latest", "py314", "test"]
docs = ["docs"]
lint = { features = ["lint"], no-default-feature = true }
nightly = ["nightly", "optionals", "test"]

# Different python versions with the latest polars
py310 = ["py310", "test", "polars-latest"]
py311 = ["py311", "test", "polars-latest"]
py312 = ["py312", "test", "polars-latest"]
py313 = ["py313", "test", "polars-latest"]
py314 = ["py314", "test", "polars-latest"]
py310 = ["polars-latest", "py310", "test"]
py311 = ["polars-latest", "py311", "test"]
py312 = ["polars-latest", "py312", "test"]
py313 = ["polars-latest", "py313", "test"]
py314 = ["polars-latest", "py314", "test"]

# Test with optional dependencies
py314-optionals = ["optionals", "py314", "test", "polars-latest"]
py314-optionals = ["optionals", "polars-latest", "py314", "test"]

# Polars compatibility envs
polars-minimal = ["py314", "test", "polars-minimal"]
default-polars-minimal = [
"dev",
"lint",
"optionals",
"polars-minimal",
"py314",
"test",
"polars-minimal",
]
polars-minimal = ["polars-minimal", "py314", "test"]
26 changes: 26 additions & 0 deletions tests/storage/test_fsspec.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Copyright (c) QuantCo 2025-2026
# SPDX-License-Identifier: BSD-3-Clause

from typing import Any

import pytest

from dataframely._storage._fsspec import get_file_prefix


class MockFS:
def __init__(self, protocol: Any) -> None:
self.protocol = protocol


def test_get_file_prefix() -> None:
assert get_file_prefix(MockFS("file")) == ""
assert get_file_prefix(MockFS("s3")) == "s3://"
assert get_file_prefix(MockFS(["file", "whatever"])) == ""
assert get_file_prefix(MockFS(["s3", "whatever"])) == "s3://"


@pytest.mark.parametrize("protocol", [5, None, [5]])
def test_get_file_prefix_invalid(protocol: Any) -> None:
with pytest.raises(ValueError):
assert 1 == get_file_prefix(MockFS(protocol))
Loading