Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
f8d9b28
switch from property to method
valeriupredoi Dec 16, 2025
59d54b3
start fixing tests
valeriupredoi Dec 16, 2025
554fe6f
more test fixing
valeriupredoi Dec 16, 2025
8c82b69
more test fixing
valeriupredoi Dec 16, 2025
934b013
final test fixing
valeriupredoi Dec 16, 2025
1f47bcb
real s3 axis test
valeriupredoi Dec 16, 2025
b88a552
turn on axis
valeriupredoi Dec 16, 2025
23acfde
add axis to mock expected
valeriupredoi Dec 16, 2025
29fcaee
fix bug
valeriupredoi Dec 16, 2025
7298051
turn on some screen printing
valeriupredoi Dec 16, 2025
5ef6146
fix test
valeriupredoi Dec 16, 2025
f7f6949
final version of not working Reductionist test
valeriupredoi Dec 16, 2025
72f12ac
too many blank lines
valeriupredoi Dec 16, 2025
622f492
Merge branch 'main' into axis_api
valeriupredoi Jan 6, 2026
be78e13
Merge branch 'main' into axis_api
valeriupredoi Jan 20, 2026
d87005a
Change the Reductionist API to return a JSON payload, this removes all…
maxstack Jan 21, 2026
0a66655
Merge branch 'main' into axis_api
valeriupredoi Jan 21, 2026
b2e6e49
add json import
valeriupredoi Jan 21, 2026
6857d22
add some prints for response and sizeof
valeriupredoi Jan 21, 2026
432fc4f
Use response.json() as Reductionist should return application/json
maxstack Jan 21, 2026
e371cee
Fix Reductionist unit tests
maxstack Jan 22, 2026
88fa26f
add small file test
valeriupredoi Jan 23, 2026
4b4f3ad
toss a print statement
valeriupredoi Jan 23, 2026
dc41f7b
add validation vs numpy
valeriupredoi Jan 23, 2026
ae71547
betterify test
valeriupredoi Jan 23, 2026
4cdb30f
skip tests that chuck in too much memory from stuffy Reductionst resp…
valeriupredoi Jan 26, 2026
cab6c0e
add tests
valeriupredoi Feb 2, 2026
59728e6
add some prints
valeriupredoi Feb 2, 2026
a3a7439
add a print
valeriupredoi Feb 3, 2026
8bdcdcf
Reductionist API updates:
maxstack Feb 3, 2026
3864de0
add cbor2 to deps lists
valeriupredoi Feb 4, 2026
b5b0df3
fix tests
valeriupredoi Feb 4, 2026
d33cefb
remove print
valeriupredoi Feb 4, 2026
bd6b02a
remove print
valeriupredoi Feb 4, 2026
e0c54e0
add todos in test module
valeriupredoi Feb 4, 2026
fc3e628
Reductionist API change moving from S3 centric fields to more generic…
maxstack Feb 6, 2026
f6045ce
Merge branch 'main' into axis_api
valeriupredoi Feb 11, 2026
6d09d59
Enable http/https interface_type with Reductionist.
maxstack Feb 13, 2026
a861fbc
correct handling of load from http
valeriupredoi Feb 13, 2026
34a889c
make test without pytest raiser
valeriupredoi Feb 13, 2026
0c38b26
Just use the URL when talking to http(s) storage backend.
maxstack Feb 13, 2026
2030d8d
Change test storage_type to https
maxstack Feb 13, 2026
a77b286
Set active_storage_url in failing Reductionist test and remove storag…
maxstack Feb 13, 2026
0d56ef6
turn on test
valeriupredoi Feb 16, 2026
40c9d62
correct test reductionist
valeriupredoi Feb 16, 2026
e8cd5ba
fix test json
valeriupredoi Feb 16, 2026
3d2a5c8
fix test storage types
valeriupredoi Feb 16, 2026
d0207f0
fix flake
valeriupredoi Feb 16, 2026
ab2297b
add comprehensive https test
valeriupredoi Feb 17, 2026
2e11a40
add a v1 test too
valeriupredoi Feb 17, 2026
382e7f3
add storage opts case
valeriupredoi Feb 17, 2026
a120079
remove commented out text
valeriupredoi Feb 17, 2026
8b08f96
add anon bucket test - currently failing
valeriupredoi Feb 17, 2026
ca8e46d
clean up active
valeriupredoi Feb 17, 2026
0093daf
clean up reductionist
valeriupredoi Feb 17, 2026
9635876
turn off anon bucket settings
valeriupredoi Feb 17, 2026
8ba6786
remove unwanted exception
valeriupredoi Feb 17, 2026
5c4ee12
add basic test for coverage
valeriupredoi Feb 17, 2026
8933eef
cleanup
valeriupredoi Feb 17, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 76 additions & 70 deletions activestorage/active.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pathlib import Path
from typing import Optional

import aiohttp
import fsspec
import numpy as np
import pyfive
Expand Down Expand Up @@ -83,14 +84,40 @@ def load_from_s3(uri, storage_options=None):
return ds


def load_from_https(uri):
def get_endpoint_url(storage_options):
    """
    Return the endpoint_url defined in storage_options, or `None` if not defined.

    Looks first at the top-level ``endpoint_url`` key, then inside the
    nested ``client_kwargs`` mapping (the s3fs convention).
    """
    if storage_options is None:
        return None
    url = storage_options.get('endpoint_url')
    if url is None:
        # fall back to the s3fs-style nested location
        url = (storage_options.get('client_kwargs') or {}).get('endpoint_url')
    return url


def load_from_https(uri, storage_options=None):
    """
    Load a pyfive.high_level.Dataset from a
    netCDF4 file on an https server (NGINX).
    This works for both http and https endpoints.

    :param uri: URL of the netCDF4 file.
    :param storage_options: optional mapping; when it contains both
        "username" and "password", HTTP basic auth is used for the
        fsspec filesystem. Otherwise the connection is anonymous.
    :returns: the opened ``pyfive.File`` object.
    """
    # TODO need to test if NGINX server behind https://
    # Both branches previously duplicated the filesystem construction and
    # file open; compute the auth object first, then open once.
    auth = None
    if storage_options is not None:
        username = storage_options.get("username", None)
        password = storage_options.get("password", None)
        if username and password:
            auth = aiohttp.BasicAuth(username, password)
    fs = fsspec.filesystem('http', auth=auth)
    http_file = fs.open(uri, 'rb')

    ds = pyfive.File(http_file)
    print(f"Dataset loaded from https with Pyfive: {uri}")
    return ds
Expand Down Expand Up @@ -272,9 +299,10 @@ def __load_nc_file(self):
elif self.storage_type == "s3":
nc = load_from_s3(self.uri, self.storage_options)
elif self.storage_type == "https":
nc = load_from_https(self.uri)
nc = load_from_https(self.uri, self.storage_options)
self.filename = self.uri
self.ds = nc[ncvar]
print("Loaded dataset", self.ds)

def __get_missing_attributes(self):
if self.ds is None:
Expand Down Expand Up @@ -365,19 +393,22 @@ def method(self, value):

self._method = value

@property
def mean(self):
def mean(self, axis=None):
self._method = "mean"
if axis is not None:
self._axis = axis
return self

@property
def min(self):
def min(self, axis=None):
self._method = "min"
if axis is not None:
self._axis = axis
return self

@property
def max(self):
def max(self, axis=None):
self._method = "max"
if axis is not None:
self._axis = axis
return self

@property
Expand Down Expand Up @@ -484,6 +515,10 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype,
if self.storage_type == "s3" and self._version == 2:
if self.storage_options is not None:
key, secret = None, None
if self.storage_options.get("anon", None) is True:
print("Reductionist session for Anon S3 bucket.")
session = reductionist.get_session(
None, None, S3_ACTIVE_STORAGE_CACERT)
if "key" in self.storage_options:
key = self.storage_options["key"]
if "secret" in self.storage_options:
Expand All @@ -498,6 +533,15 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype,
session = reductionist.get_session(S3_ACCESS_KEY,
S3_SECRET_KEY,
S3_ACTIVE_STORAGE_CACERT)
elif self.storage_type == "https" and self._version == 2:
username, password = None, None
if self.storage_options is not None:
username = self.storage_options.get("username", None)
password = self.storage_options.get("password", None)
if username and password:
session = reductionist.get_session(username, password, None)
else:
session = reductionist.get_session(None, None, None)
else:
session = None

Expand Down Expand Up @@ -585,16 +629,9 @@ def _from_storage(self, ds, indexer, chunks, out_shape, out_dtype,

def _get_endpoint_url(self):
"""Return the endpoint_url of an S3 object store, or `None`"""
endpoint_url = self.storage_options.get('endpoint_url')
endpoint_url = get_endpoint_url(self.storage_options)
if endpoint_url is not None:
return endpoint_url

client_kwargs = self.storage_options.get('client_kwargs')
if client_kwargs:
endpoint_url = client_kwargs.get('endpoint_url')
if endpoint_url is not None:
return endpoint_url

return f"http://{urllib.parse.urlparse(self.filename).netloc}"

def _process_chunk(self,
Expand Down Expand Up @@ -624,7 +661,6 @@ def _process_chunk(self,
axis = self._axis

if self.storage_type == 's3' and self._version == 1:

tmp, count = reduce_opens3_chunk(ds._fh,
offset,
size,
Expand All @@ -640,9 +676,7 @@ def _process_chunk(self,

elif self.storage_type == "s3" and self._version == 2:
# S3: pass in pre-configured storage options (credentials)
# print("S3 rfile is:", self.filename)
parsed_url = urllib.parse.urlparse(self.filename)

bucket = parsed_url.netloc
object = parsed_url.path

Expand All @@ -651,17 +685,13 @@ def _process_chunk(self,
if bucket == "":
bucket = os.path.dirname(object)
object = os.path.basename(object)
# print("S3 bucket:", bucket)
# print("S3 file:", object)
if self.storage_options is None:

# for the moment we need to force ds.dtype to be a numpy type
# Reductionist returns "count" as a list even for single elements
tmp, count = reductionist.reduce_chunk(session,
S3_ACTIVE_STORAGE_URL,
S3_URL,
bucket,
object,
f"{S3_URL}/{bucket}/{object}",
offset,
size,
compressor,
Expand All @@ -674,22 +704,14 @@ def _process_chunk(self,
axis,
operation=self._method)
else:
# special case for "anon=True" buckets that work only with e.g.
# fs = s3fs.S3FileSystem(anon=True, client_kwargs={'endpoint_url': S3_URL})
# where file uri = bucketX/fileY.mc
# print("S3 Storage options to Reductionist:", self.storage_options)
if self.storage_options.get("anon", None) is True:
bucket = os.path.dirname(parsed_url.path) # bucketX
object = os.path.basename(parsed_url.path) # fileY
print("S3 anon=True Bucket and File:", bucket, object)

bucket = os.path.dirname(parsed_url.path)
object = os.path.basename(parsed_url.path)
# Reductionist returns "count" as a list even for single elements
tmp, count = reductionist.reduce_chunk(
session,
self.active_storage_url,
self._get_endpoint_url(),
bucket,
object,
f"{self._get_endpoint_url()}/{bucket}/{object}",
offset,
size,
compressor,
Expand All @@ -701,39 +723,23 @@ def _process_chunk(self,
chunk_selection,
axis,
operation=self._method)
# this is for testing ONLY until Reductionist is able to handle https
# located files; after that, we can pipe any regular https file through
# to Reductionist, provided the https server is "closer" to Reductionist
elif self.storage_type == "https" and self._version == 2:
# build a simple session
session = requests.Session()
session.auth = (None, None)
session.verify = False
bucket = "https" # really doesn't matter

# note the extra "storage_type" kwarg
# this currently makes Reductionist throw a wobbly
# E activestorage.reductionist.ReductionistError: Reductionist error: HTTP 400: {"error": {"message": "request data is not valid", "caused_by": ["Failed to deserialize the JSON body into the target type", "storage_type: unknown field `storage_type`, expected one of `source`, `bucket`, `object`, `dtype`, `byte_order`, `offset`, `size`, `shape`, `order`, `selection`, `compression`, `filters`, `missing` at line 1 column 550"]}} # noqa

# Reductionist returns "count" as a list even for single elements
tmp, count = reductionist.reduce_chunk(
session,
"https://reductionist.jasmin.ac.uk/", # Wacasoft
self.filename,
bucket,
self.filename,
offset,
size,
compressor,
filters,
self.missing,
np.dtype(ds.dtype),
chunks,
ds._order,
chunk_selection,
axis,
operation=self._method,
storage_type="https")
tmp, count = reductionist.reduce_chunk(session,
self.active_storage_url,
f"{self.uri}",
offset,
size,
compressor,
filters,
self.missing,
np.dtype(ds.dtype),
chunks,
ds._order,
chunk_selection,
axis,
operation=self._method,
storage_type="https")

elif self.storage_type == 'ActivePosix' and self.version == 2:
# This is where the DDN Fuse and Infinia wrappers go
raise NotImplementedError
Expand Down
45 changes: 19 additions & 26 deletions activestorage/reductionist.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Reductionist S3 Active Storage server storage interface module."""

import cbor2 as cbor
import collections.abc
import http.client
import json
Expand All @@ -10,7 +11,6 @@
import numpy as np
import requests

REDUCTIONIST_AXIS_READY = False

DEBUG = 0

Expand All @@ -24,16 +24,19 @@ def get_session(username: str, password: str,
:returns: a client session object.
"""
session = requests.Session()
# TODO Stack-HPC
# we need to allow Anon buckets. though this
# will break connection to data server
# if username is None and password is None:
# return session
session.auth = (username, password)
session.verify = cacert or False
return session


def reduce_chunk(session,
server,
source,
bucket,
object,
url,
offset,
size,
compression,
Expand All @@ -50,9 +53,7 @@ def reduce_chunk(session,

:param server: Reductionist server URL
:param cacert: Reductionist CA certificate path
:param source: S3 URL
:param bucket: S3 bucket
:param object: S3 object
:param url: object URL
:param offset: offset of data in object
:param size: size of data in object
:param compression: optional `numcodecs.abc.Codec` compression codec
Expand All @@ -74,9 +75,7 @@ def reduce_chunk(session,
:raises ReductionistError: if the request to Reductionist fails
"""

request_data = build_request_data(source,
bucket,
object,
request_data = build_request_data(url,
offset,
size,
compression,
Expand All @@ -91,7 +90,7 @@ def reduce_chunk(session,
if DEBUG:
print(f"Reductionist request data dictionary: {request_data}")
api_operation = "sum" if operation == "mean" else operation or "select"
url = f'{server}/v1/{api_operation}/'
url = f'{server}/v2/{api_operation}/'
response = request(session, url, request_data)

if response.ok:
Expand Down Expand Up @@ -174,9 +173,7 @@ def encode_missing(missing):
assert False, "Expected missing values not found"


def build_request_data(source: str,
bucket: str,
object: str,
def build_request_data(url: str,
offset: int,
size: int,
compression,
Expand All @@ -190,15 +187,13 @@ def build_request_data(source: str,
storage_type=None) -> dict:
"""Build request data for Reductionist API."""
request_data = {
'source': source,
'bucket': bucket,
'object': object,
'interface_type': storage_type if storage_type else "s3",
'url': url,
'dtype': dtype.name,
'byte_order': encode_byte_order(dtype),
'offset': int(offset),
'size': int(size),
'order': order,
'storage_type': storage_type,
}
if shape:
request_data["shape"] = shape
Expand All @@ -214,11 +209,8 @@ def build_request_data(source: str,
if any(missing):
request_data["missing"] = encode_missing(missing)

if REDUCTIONIST_AXIS_READY:
if axis is not None:
request_data['axis'] = axis
elif axis is not None and len(axis) != len(shape):
raise ValueError(
"Can't reduce over axis subset unitl reductionist is ready")

return {k: v for k, v in request_data.items() if v is not None}

Expand All @@ -234,15 +226,16 @@ def request(session: requests.Session, url: str, request_data: dict):

def decode_result(response):
"""Decode a successful response, return as a 2-tuple of (numpy array or scalar, count)."""
dtype = response.headers['x-activestorage-dtype']
shape = json.loads(response.headers['x-activestorage-shape'])
reduction_result = cbor.loads(response.content)
dtype = reduction_result['dtype']
shape = reduction_result['shape'] if "shape" in reduction_result else None

# Result
result = np.frombuffer(response.content, dtype=dtype)
result = np.frombuffer(reduction_result['bytes'], dtype=dtype)
result = result.reshape(shape)

# Counts
count = json.loads(response.headers['x-activestorage-count'])
count = reduction_result['count']
# TODO: When reductionist is ready, we need to fix 'count'

# Mask the result
Expand Down
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ channels:
dependencies:
- python >=3.10
- pyfive >=0.5.0 # earliest support for advanced Pyfive
- cbor2
- fsspec
- h5netcdf
- netcdf4
Expand Down
Loading
Loading