diff --git a/README.md b/README.md index 246e413..efb7178 100644 --- a/README.md +++ b/README.md @@ -44,19 +44,27 @@ npm run test npm run typecheck ``` -To regenerate FlatBuffers TypeScript after syncing schemas, run `npm run generate:fbs`. -The pinned `flatc` compiler is downloaded automatically if not already available. -The version is set in `scripts/ensure-flatc.sh`. +To regenerate FlatBuffers TypeScript after syncing schemas, run +`npm run generate:fbs`. The pinned `flatc` compiler is downloaded automatically +if not already available. The version is set in `scripts/ensure-flatc.sh`. ## API ### IcechunkStore The main class for zarrita integration. Implements zarrita's `AsyncReadable` -interface with both `get()` and `getRange()` (needed for sharded arrays). +interface with both `get()` and `getRange()` (needed for sharded arrays). Pass +zarrita's `withRangeCoalescing` to coalesce concurrent reads against the same +backing object. This requires zarrita >= 0.7. + +> **Note:** Range coalescing uses zarrita's merged abort-signal behavior. If one +> read in a merged batch is aborted, other reads in the same batch may also +> reject. Avoid sharing an `AbortController` across requests that must cancel +> independently. 
```typescript import { IcechunkStore } from "icechunk-js"; +import { withRangeCoalescing } from "zarrita"; // Open from a URL (default: branch "main") const store = await IcechunkStore.open("https://example.com/repo", { @@ -65,6 +73,7 @@ const store = await IcechunkStore.open("https://example.com/repo", { // snapshot: 'ABC123...', // formatVersion: 'v1', // skip format auto-detection for v1 repos // maxManifestCacheSize: 50, // LRU cache size (default: 100) + // withRangeCoalescing, // opt into merged range reads // signal: abortController.signal, // cancel initialization // validateChecksums: true, // integrity headers for virtual chunks // azureAccount: 'myaccount', // required for az:// virtual chunks @@ -133,12 +142,11 @@ Cloud storage URLs in virtual chunk references are automatically translated: For direct access to branches, tags, and checkouts. > **Note:** Over plain HTTP, `listBranches()` and `listTags()` only work -> reliably with v2 repos, which embed refs in the top-level `repo` file. For -> v1 repos, direct `checkoutBranch()` / `checkoutTag()` can work when the -> target ref still lives at the legacy `ref.json` path, but versioned ref -> filenames still require `listPrefix()` discovery, which `HttpStorage` does -> not provide. Use a listing-capable storage backend for full v1 branch/tag -> support. +> reliably with v2 repos, which embed refs in the top-level `repo` file. For v1 +> repos, direct `checkoutBranch()` / `checkoutTag()` can work when the target +> ref still lives at the legacy `ref.json` path, but versioned ref filenames +> still require `listPrefix()` discovery, which `HttpStorage` does not provide. +> Use a listing-capable storage backend for full v1 branch/tag support. 
```typescript import { Repository, HttpStorage } from "icechunk-js"; diff --git a/src/index.ts b/src/index.ts index cd661ae..d3a8b19 100644 --- a/src/index.ts +++ b/src/index.ts @@ -28,6 +28,8 @@ export type { export { Repository } from "./reader/repository.js"; export type { RepositoryOptions, RefData } from "./reader/repository.js"; export { ReadSession } from "./reader/session.js"; +export type { ReadOptions } from "./reader/session.js"; +export type { RangeCoalescingFn } from "./reader/range-coalescer.js"; // Storage backends export { HttpStorage } from "./storage/http-storage.js"; diff --git a/src/reader/range-coalescer.ts b/src/reader/range-coalescer.ts new file mode 100644 index 0000000..31d469c --- /dev/null +++ b/src/reader/range-coalescer.ts @@ -0,0 +1,179 @@ +/** + * Adapters for zarrita's `withRangeCoalescing` (added in zarrita 0.7). + * + * The coalescer works over any range-readable store keyed by object path. + * These adapters expose icechunk's two backing-object cases in that shape: + * + * - `makeUrlStore` fetches ranges from one external virtual-chunk URL. + * - `makeStorageStore` fetches ranges from repository storage objects. + * + * Callers pass `zarrita.withRangeCoalescing` into icechunk-js explicitly when + * they want coalescing, keeping zarrita a true optional dependency. + */ + +import type { FetchClient, Storage } from "../storage/storage.js"; + +// Minimal structural mirror of zarrita's `AsyncReadable`. Kept local so +// this module has no required import from zarrita — the peer dep is +// optional and may not resolve at build time for some consumers. 
+type RangeQuery = { offset: number; length: number } | { suffixLength: number }; +interface GetOptions { + signal?: AbortSignal; +} +export interface AsyncReadable { + get(key: string, options?: GetOptions): Promise; + getRange( + key: string, + range: RangeQuery, + options?: GetOptions, + ): Promise; +} + +export type RangeCoalescingFn = ( + store: AsyncReadable, + opts?: { coalesceSize?: number }, +) => AsyncReadable; + +export interface MakeUrlStoreOptions { + /** Absolute HTTP URL this store always fetches. */ + url: string; + /** Pluggable HTTP client; defaults to `globalThis.fetch`. */ + fetchClient?: FetchClient; + /** + * Conditional request headers (`If-Match`, `If-Unmodified-Since`) baked + * into every fetch. Used to carry `validateChecksums` semantics through + * the coalesced path — all payloads sharing this store are assumed to + * share the same checksum, so `ReadSession.getVirtualStoreForPayload` + * partitions stores by checksum to avoid mixing conditional headers. + * + * Kept opt-in because these headers trigger CORS preflight requests in + * browsers, and most storage servers don't whitelist them by default. + */ + conditionalHeaders?: Record; +} + +function expectedRangeLength(range: RangeQuery): number { + return "suffixLength" in range ? range.suffixLength : range.length; +} + +/** + * Build a minimal `AsyncReadable` that services every `getRange` by + * fetching the configured URL with the requested byte range. The zarr + * key is ignored — when wrapped by `withRangeCoalescing`, all requests + * converge on the same path and become eligible for range-merging. + */ +export function makeUrlStore(opts: MakeUrlStoreOptions): AsyncReadable { + const { url, fetchClient, conditionalHeaders } = opts; + + async function doFetch(init: RequestInit): Promise { + return fetchClient + ? 
await fetchClient.fetch(url, init) + : await fetch(url, init); + } + + return { + async get() { + throw new Error( + `Virtual chunk URL store for ${url} only supports ranged reads`, + ); + }, + async getRange(_key, range, options) { + const headers: Record = conditionalHeaders + ? { ...conditionalHeaders } + : {}; + headers.Range = + "suffixLength" in range + ? `bytes=-${range.suffixLength}` + : `bytes=${range.offset}-${range.offset + range.length - 1}`; + + const response = await doFetch({ headers, signal: options?.signal }); + + if (response.status === 412) { + throw new Error( + `Virtual chunk at ${url} failed integrity check — data has been modified since snapshot was created`, + ); + } + if (response.status !== 200 && response.status !== 206) { + throw new Error( + `Failed to fetch virtual chunk from ${url}: ${response.status} ${response.statusText}`, + ); + } + + const data = new Uint8Array(await response.arrayBuffer()); + + // 206 (Partial Content) is the happy path only when the response body + // is exactly the requested range. Coalescers slice from this buffer by + // offset, so accepting an overlong partial response can shift data. + if (response.status === 206) { + const expected = expectedRangeLength(range); + if (data.length === expected) return data; + throw new Error( + `Virtual range response size mismatch for ${url}: expected ${expected} bytes, got ${data.length}`, + ); + } + + // 200 means the server ignored the Range header and sent the full + // object. Slice out the requested window so callers don't have to + // know the distinction. + if ("offset" in range) { + const end = range.offset + range.length; + if (data.length >= end) return data.slice(range.offset, end); + throw new Error( + `Virtual range request not honored for ${url}: need at least ${end} bytes for fallback slicing, got ${data.length}`, + ); + } + // Suffix-length on a 200 fallback: take the trailing suffixLength bytes. 
+ if (data.length >= range.suffixLength) { + return data.slice(data.length - range.suffixLength); + } + throw new Error( + `Virtual suffix range request not honored for ${url}: need at least ${range.suffixLength} bytes for fallback slicing, got ${data.length}`, + ); + }, + }; +} + +/** + * Build a minimal `AsyncReadable` over icechunk repository storage objects. + * + * This lets native chunk payloads use the same zarrita range-coalescing + * wrapper as virtual chunks. The key remains the repository object path, so + * zarrita only merges ranges that target the same chunk object. + */ +export function makeStorageStore(storage: Storage): AsyncReadable { + return { + async get(key, options) { + const storageOptions = options?.signal + ? { signal: options.signal } + : undefined; + return storage.getObject(key, undefined, storageOptions); + }, + async getRange(key, range, options) { + const storageOptions = options?.signal + ? { signal: options.signal } + : undefined; + if ("suffixLength" in range) { + throw new Error( + `Storage suffix ranges are not supported for ${key}; convert suffixLength to offset/length before reading`, + ); + } + + const storageRange = { + start: range.offset, + end: range.offset + range.length, + }; + const data = await storage.getObject(key, storageRange, storageOptions); + if (data.length === range.length) return data; + + // Range header may be ignored (e.g. HTTP 200 full body). If the full + // object is available, slice out the requested window explicitly. 
+ if (data.length >= storageRange.end) { + return data.slice(storageRange.start, storageRange.end); + } + + throw new Error( + `Storage returned ${data.length} bytes for ${key} range ${storageRange.start}-${storageRange.end - 1}; expected ${range.length} bytes`, + ); + }, + }; +} diff --git a/src/reader/session.ts b/src/reader/session.ts index 3636488..5c7bfe4 100644 --- a/src/reader/session.ts +++ b/src/reader/session.ts @@ -2,7 +2,11 @@ * ReadSession - Read-only session for accessing icechunk data. */ -import type { Storage, ByteRange, RequestOptions } from "../storage/storage.js"; +import type { + FetchClient, + Storage, + RequestOptions, +} from "../storage/storage.js"; import { decompress } from "fzstd"; import { LRUCache } from "../cache/lru.js"; import { @@ -35,6 +39,37 @@ import { type TransactionLogEntry, } from "../format/flatbuffers/index.js"; import { NotFoundError } from "../storage/storage.js"; +import { + makeStorageStore, + makeUrlStore, + type AsyncReadable, + type RangeCoalescingFn, +} from "./range-coalescer.js"; + +/** Default byte-gap threshold for zarrita's range coalescer (matches its own default). */ +const RANGE_COALESCE_SIZE = 32 * 1024; +// Per-session cap on wrapped range stores. Eviction only reduces future +// coalescing reuse for older backing objects; it does not affect read +// correctness. 256 keeps the cache bounded while covering many active virtual +// URLs or native objects in typical concurrent reads. +const RANGE_STORE_CACHE_SIZE = 256; +type RangeStoreKeyPart = string | number | null | readonly RangeStoreKeyPart[]; + +function makeRangeStoreCacheKey(parts: readonly RangeStoreKeyPart[]): string { + return JSON.stringify(parts); +} + +/** Options for chunk reads from a ReadSession. */ +export interface ReadOptions extends RequestOptions { + /** + * Zarrita-backed range coalescing function for chunk payload reads. + * + * Pass `zarrita.withRangeCoalescing` to opt in. 
Concurrent range reads + * against the same backing object may be merged into one larger request, + * matching zarrita's abort behavior for merged signals. + */ + withRangeCoalescing?: RangeCoalescingFn; +} /** * ReadSession provides read access to a specific snapshot. @@ -49,6 +84,21 @@ export class ReadSession { private snapshot: Snapshot; private specVersion: SpecVersion; private manifestCache: LRUCache; + /** + * Bounded cache of `AsyncReadable`s wrapped with zarrita's range coalescer. + * The cache key includes request-option identities that must not share one + * coalescing queue, notably virtual fetch clients and checksum headers. + * + * Promise slots are inserted synchronously on first use, so + * concurrent requests for the same partition share one coalescing window + * instead of racing to create parallel stores. + */ + private rangeStores?: LRUCache>; + private nativeStore?: AsyncReadable; + private fetchClientIds?: WeakMap; + private nextFetchClientId = 1; + private rangeCoalescerIds?: WeakMap; + private nextRangeCoalescerId = 1; private constructor( storage: Storage, @@ -62,6 +112,123 @@ export class ReadSession { this.manifestCache = new LRUCache(maxManifestCacheSize); } + private getFetchClientKey(fetchClient: FetchClient | undefined): string { + if (!fetchClient) return "default"; + if (!this.fetchClientIds) this.fetchClientIds = new WeakMap(); + + let id = this.fetchClientIds.get(fetchClient); + if (id === undefined) { + id = this.nextFetchClientId; + this.nextFetchClientId = id + 1; + this.fetchClientIds.set(fetchClient, id); + } + return String(id); + } + + private getRangeCoalescerKey(withRangeCoalescing: RangeCoalescingFn): string { + if (!this.rangeCoalescerIds) this.rangeCoalescerIds = new WeakMap(); + + let id = this.rangeCoalescerIds.get(withRangeCoalescing); + if (id === undefined) { + id = this.nextRangeCoalescerId; + this.nextRangeCoalescerId = id + 1; + this.rangeCoalescerIds.set(withRangeCoalescing, id); + } + return String(id); + } + 
+ private getRangeStore( + partitionKey: readonly RangeStoreKeyPart[], + createStore: () => AsyncReadable, + withRangeCoalescing: RangeCoalescingFn, + ): Promise { + if (!this.rangeStores) { + this.rangeStores = new LRUCache(RANGE_STORE_CACHE_SIZE); + } + const stores = this.rangeStores; + const cacheKey = makeRangeStoreCacheKey([ + ...partitionKey, + ["coalescer", this.getRangeCoalescerKey(withRangeCoalescing)], + ]); + + const cached = stores.get(cacheKey); + if (cached) return cached; + + const raw = createStore(); + const promise: Promise = Promise.resolve() + .then(() => + withRangeCoalescing(raw, { coalesceSize: RANGE_COALESCE_SIZE }), + ) + .catch((error: unknown) => { + if (stores.get(cacheKey) === promise) stores.delete(cacheKey); + throw error; + }); + stores.set(cacheKey, promise); + return promise; + } + + private getNativeStore(options?: ReadOptions): Promise { + if (!this.nativeStore) { + this.nativeStore = makeStorageStore(this.storage); + } + const raw = this.nativeStore; + const withRangeCoalescing = options?.withRangeCoalescing; + if (!withRangeCoalescing) { + return Promise.resolve(raw); + } + return this.getRangeStore(["native"], () => raw, withRangeCoalescing); + } + + private getVirtualStoreForPayload( + httpUrl: string, + payload: { + checksumEtag: string | null; + checksumLastModified: number; + }, + options: ReadOptions | undefined, + ): Promise { + const validate = !!options?.validateChecksums; + let conditionalHeaders: Record | undefined; + if (validate) { + conditionalHeaders = {}; + if (payload.checksumEtag) { + conditionalHeaders["If-Match"] = payload.checksumEtag; + } + if (payload.checksumLastModified > 0) { + conditionalHeaders["If-Unmodified-Since"] = new Date( + payload.checksumLastModified * 1000, + ).toUTCString(); + } + } + + const checksumKey = validate + ? ["checked", payload.checksumEtag ?? 
"", payload.checksumLastModified] + : ["unchecked"]; + + const createStore = () => + makeUrlStore({ + url: httpUrl, + fetchClient: options?.fetchClient, + conditionalHeaders, + }); + + const withRangeCoalescing = options?.withRangeCoalescing; + if (!withRangeCoalescing) { + return Promise.resolve(createStore()); + } + + return this.getRangeStore( + [ + "virtual", + httpUrl, + ["fetch", this.getFetchClientKey(options?.fetchClient)], + checksumKey, + ], + createStore, + withRangeCoalescing, + ); + } + /** * Open a read session for a specific snapshot. * @@ -304,9 +471,10 @@ export class ReadSession { async getChunk( path: string, coords: number[], - options?: RequestOptions, + options?: ReadOptions, ): Promise { options?.signal?.throwIfAborted(); + const requestOptions = toRequestOptions(options); const node = this.getNode(path); if (!node || node.nodeData.type !== "array") { @@ -323,7 +491,10 @@ export class ReadSession { } // Load the manifest with signal - const manifest = await this.loadManifest(manifestRef.objectId, options); + const manifest = await this.loadManifest( + manifestRef.objectId, + requestOptions, + ); // Find the chunk reference const chunkRef = findChunkRef(manifest, node.id, coords); @@ -354,9 +525,10 @@ export class ReadSession { path: string, coords: number[], range: { offset: number; length: number } | { suffixLength: number }, - options?: RequestOptions, + options?: ReadOptions, ): Promise { options?.signal?.throwIfAborted(); + const requestOptions = toRequestOptions(options); const node = this.getNode(path); if (!node || node.nodeData.type !== "array") { @@ -370,7 +542,10 @@ export class ReadSession { continue; } - const manifest = await this.loadManifest(manifestRef.objectId, options); + const manifest = await this.loadManifest( + manifestRef.objectId, + requestOptions, + ); const chunkRef = findChunkRef(manifest, node.id, coords); if (!chunkRef) continue; @@ -401,7 +576,7 @@ export class ReadSession { /** Fetch chunk data based on payload 
type */ private async fetchChunkPayload( payload: ChunkPayload, - options?: RequestOptions, + options?: ReadOptions, ): Promise { switch (payload.type) { case "inline": @@ -409,21 +584,23 @@ export class ReadSession { case "native": { const path = getChunkPath(encodeObjectId12(payload.chunkId)); - const range: ByteRange = { - start: payload.offset, - end: payload.offset + payload.length, - }; - const data = await this.storage.getObject(path, range, options); - if (data.length === payload.length) return data; - - // Range header may be ignored (e.g. HTTP 200 full body). If the full object - // is available, slice out the requested window explicitly. - if (data.length >= range.end) { - return data.slice(range.start, range.end); + const requestedStart = payload.offset; + const requestedEnd = payload.offset + payload.length; + const store = await this.getNativeStore(options); + const data = await store.getRange( + path, + { offset: requestedStart, length: payload.length }, + { signal: options?.signal }, + ); + if (!data) { + throw new Error( + `Failed to fetch native chunk from ${path} range ${requestedStart}-${requestedEnd - 1}: empty response`, + ); } + if (data.length === payload.length) return data; throw new Error( - `Storage returned ${data.length} bytes for ${path} range ${range.start}-${range.end - 1}; expected ${payload.length} bytes`, + `Storage returned ${data.length} bytes for ${path} range ${requestedStart}-${requestedEnd - 1}; expected ${payload.length} bytes`, ); } @@ -434,65 +611,28 @@ export class ReadSession { payload.location, options?.azureAccount, ); - const headers: Record = { - Range: `bytes=${payload.offset}-${payload.offset + payload.length - 1}`, - }; - - // Add conditional request headers for integrity validation (opt-in - // because these trigger CORS preflight in browsers) - if (options?.validateChecksums) { - if (payload.checksumEtag) { - headers["If-Match"] = payload.checksumEtag; - } - if (payload.checksumLastModified > 0) { - 
headers["If-Unmodified-Since"] = new Date( - payload.checksumLastModified * 1000, - ).toUTCString(); - } - } - - const fetchInit: RequestInit = { - headers, - signal: options?.signal, - }; - - const client = options?.fetchClient; - const response = client - ? await client.fetch(httpUrl, fetchInit) - : await fetch(httpUrl, fetchInit); - if (response.status === 412) { + const store = await this.getVirtualStoreForPayload( + httpUrl, + payload, + options, + ); + const data = await store.getRange( + "/", + { offset: payload.offset, length: payload.length }, + { signal: options?.signal }, + ); + if (!data) { throw new Error( - `Virtual chunk at ${httpUrl} failed integrity check — data has been modified since snapshot was created`, + `Failed to fetch virtual chunk from ${httpUrl}: empty response`, ); } - - if (response.status !== 200 && response.status !== 206) { + if (data.length !== payload.length) { throw new Error( - `Failed to fetch virtual chunk from ${httpUrl}: ${response.status} ${response.statusText}`, + `Virtual range response size mismatch for ${httpUrl}: expected ${payload.length} bytes, got ${data.length}`, ); } - - const data = new Uint8Array(await response.arrayBuffer()); - if (response.status === 206) { - if (data.length !== payload.length) { - throw new Error( - `Virtual range response size mismatch for ${httpUrl}: expected ${payload.length} bytes, got ${data.length}`, - ); - } - return data; - } - - const absoluteEnd = payload.offset + payload.length; - // 200 means the Range request was ignored; only accept it if we can prove - // we received enough bytes to slice the requested absolute window. 
- if (data.length >= absoluteEnd) { - return data.slice(payload.offset, absoluteEnd); - } - - throw new Error( - `Virtual range request not honored for ${httpUrl}: need at least ${absoluteEnd} bytes for fallback slicing, got ${data.length}`, - ); + return data; } } } @@ -501,7 +641,7 @@ export class ReadSession { private async fetchChunkPayloadRange( payload: ChunkPayload, range: { offset: number; length: number } | { suffixLength: number }, - options?: RequestOptions, + options?: ReadOptions, ): Promise { // Compute absolute start/end within the chunk's data let rangeStart: number; @@ -525,93 +665,57 @@ export class ReadSession { case "native": { const path = getChunkPath(encodeObjectId12(payload.chunkId)); - const storageRange: ByteRange = { - start: payload.offset + rangeStart, - end: payload.offset + rangeEnd, - }; + const requestedStart = payload.offset + rangeStart; const expectedSize = rangeEnd - rangeStart; - const data = await this.storage.getObject(path, storageRange, options); - if (data.length === expectedSize) return data; - - // Range header may be ignored (e.g. HTTP 200 full body). If the full object - // is available, slice out the requested window explicitly. 
- if (data.length >= storageRange.end) { - return data.slice(storageRange.start, storageRange.end); + const requestedEnd = requestedStart + expectedSize; + const store = await this.getNativeStore(options); + const data = await store.getRange( + path, + { offset: requestedStart, length: expectedSize }, + { signal: options?.signal }, + ); + if (!data) { + throw new Error( + `Failed to fetch native chunk from ${path} range ${requestedStart}-${requestedEnd - 1}: empty response`, + ); } + if (data.length === expectedSize) return data; throw new Error( - `Storage returned ${data.length} bytes for ${path} range ${storageRange.start}-${storageRange.end - 1}; expected ${expectedSize} bytes`, + `Storage returned ${data.length} bytes for ${path} range ${requestedStart}-${requestedEnd - 1}; expected ${expectedSize} bytes`, ); } case "virtual": { const absoluteStart = payload.offset + rangeStart; - const absoluteEnd = payload.offset + rangeEnd; + const expectedSize = rangeEnd - rangeStart; const httpUrl = translateToHttpUrl( payload.location, options?.azureAccount, ); - const headers: Record = { - Range: `bytes=${absoluteStart}-${absoluteEnd - 1}`, - }; - - // Add conditional request headers for integrity validation (opt-in - // because these trigger CORS preflight in browsers) - if (options?.validateChecksums) { - if (payload.checksumEtag) { - headers["If-Match"] = payload.checksumEtag; - } - if (payload.checksumLastModified > 0) { - headers["If-Unmodified-Since"] = new Date( - payload.checksumLastModified * 1000, - ).toUTCString(); - } - } - const fetchInit: RequestInit = { - headers, - signal: options?.signal, - }; - - const client = options?.fetchClient; - const response = client - ? 
await client.fetch(httpUrl, fetchInit) - : await fetch(httpUrl, fetchInit); - - if (response.status === 412) { + const store = await this.getVirtualStoreForPayload( + httpUrl, + payload, + options, + ); + const data = await store.getRange( + "/", + { offset: absoluteStart, length: expectedSize }, + { signal: options?.signal }, + ); + if (!data) { throw new Error( - `Virtual chunk at ${httpUrl} failed integrity check — data has been modified since snapshot was created`, + `Failed to fetch virtual chunk from ${httpUrl}: empty response`, ); } - - if (response.status !== 200 && response.status !== 206) { + if (data.length !== expectedSize) { throw new Error( - `Failed to fetch virtual chunk from ${httpUrl}: ${response.status} ${response.statusText}`, + `Virtual range response size mismatch for ${httpUrl}: expected ${expectedSize} bytes, got ${data.length}`, ); } - - const expectedSize = rangeEnd - rangeStart; - const data = new Uint8Array(await response.arrayBuffer()); - - if (response.status === 206) { - if (data.length !== expectedSize) { - throw new Error( - `Virtual range response size mismatch for ${httpUrl}: expected ${expectedSize} bytes, got ${data.length}`, - ); - } - return data; - } - - // 200 means the Range request was ignored; only accept it if we can prove - // we received enough bytes to slice the requested absolute window. 
- if (data.length >= absoluteEnd) { - return data.slice(absoluteStart, absoluteEnd); - } - - throw new Error( - `Virtual range request not honored for ${httpUrl}: need at least ${absoluteEnd} bytes for fallback slicing, got ${data.length}`, - ); + return data; } } } @@ -642,6 +746,20 @@ export class ReadSession { /** UTF-8 encoder for byte comparisons */ const utf8Encoder = new TextEncoder(); +function toRequestOptions(options?: ReadOptions): RequestOptions | undefined { + if (!options) return undefined; + return { + ...(options.signal && { signal: options.signal }), + ...(options.fetchClient && { fetchClient: options.fetchClient }), + ...(options.validateChecksums !== undefined && { + validateChecksums: options.validateChecksums, + }), + ...(options.azureAccount !== undefined && { + azureAccount: options.azureAccount, + }), + }; +} + /** * Compare two strings by UTF-8 byte order to match Rust's str::cmp. * diff --git a/src/store.ts b/src/store.ts index f25ebee..d5804b4 100644 --- a/src/store.ts +++ b/src/store.ts @@ -11,6 +11,7 @@ import { HttpStorage } from "./storage/http-storage.js"; import type { Storage, FetchClient } from "./storage/storage.js"; import { NotFoundError } from "./storage/storage.js"; import type { NodeSnapshot } from "./format/flatbuffers/types.js"; +import type { RangeCoalescingFn } from "./reader/range-coalescer.js"; /** * zarrita's AbsolutePath type - paths must start with "/" @@ -66,6 +67,14 @@ export interface IcechunkStoreOptions { /** Maximum number of manifests to cache in the LRU cache (default: 100) */ maxManifestCacheSize?: number; + /** + * Zarrita-backed range coalescing function for chunk payload reads. + * + * Pass `zarrita.withRangeCoalescing` to opt in. Concurrent range reads + * against the same backing object may be merged into one larger request. + */ + withRangeCoalescing?: RangeCoalescingFn; + /** * Send If-Match / If-Unmodified-Since headers on virtual chunk requests. 
* @@ -104,6 +113,7 @@ export class IcechunkStore implements AsyncReadable { private fetchClient?: FetchClient; private validateChecksums: boolean; private azureAccount?: string; + private withRangeCoalescing?: RangeCoalescingFn; private basePath: string = ""; private constructor( @@ -111,6 +121,7 @@ export class IcechunkStore implements AsyncReadable { fetchClient?: FetchClient, validateChecksums?: boolean, azureAccount?: string, + withRangeCoalescing?: RangeCoalescingFn, ) { if (!(session instanceof ReadSession)) { throw new Error( @@ -121,6 +132,7 @@ export class IcechunkStore implements AsyncReadable { this.fetchClient = fetchClient; this.validateChecksums = validateChecksums ?? false; this.azureAccount = azureAccount; + this.withRangeCoalescing = withRangeCoalescing; } /** @@ -138,11 +150,17 @@ export class IcechunkStore implements AsyncReadable { * Open an IcechunkStore from an existing ReadSession. * * @param session - Existing ReadSession - * @param options - Store options (only fetchClient is used) + * @param options - Store options for virtual chunk reads */ static async open( session: ReadSession, - options?: Pick, + options?: Pick< + IcechunkStoreOptions, + | "fetchClient" + | "validateChecksums" + | "azureAccount" + | "withRangeCoalescing" + >, ): Promise; /** @@ -166,6 +184,7 @@ export class IcechunkStore implements AsyncReadable { options.fetchClient, options.validateChecksums, options.azureAccount, + options.withRangeCoalescing, ); } @@ -201,6 +220,7 @@ export class IcechunkStore implements AsyncReadable { options.fetchClient, options.validateChecksums, options.azureAccount, + options.withRangeCoalescing, ); } @@ -236,6 +256,9 @@ export class IcechunkStore implements AsyncReadable { ...(this.fetchClient && { fetchClient: this.fetchClient }), validateChecksums: this.validateChecksums, azureAccount: this.azureAccount, + ...(this.withRangeCoalescing && { + withRangeCoalescing: this.withRangeCoalescing, + }), }); return chunk ?? 
undefined; } catch (err) { @@ -284,6 +307,9 @@ export class IcechunkStore implements AsyncReadable { ...(this.fetchClient && { fetchClient: this.fetchClient }), validateChecksums: this.validateChecksums, azureAccount: this.azureAccount, + ...(this.withRangeCoalescing && { + withRangeCoalescing: this.withRangeCoalescing, + }), }, ); return data ?? undefined; @@ -330,6 +356,7 @@ export class IcechunkStore implements AsyncReadable { this.fetchClient, this.validateChecksums, this.azureAccount, + this.withRangeCoalescing, ); const cleanPath = path.replace(/^\/+|\/+$/g, ""); scoped.basePath = this.basePath diff --git a/tests/reader/range-coalescer.test.ts b/tests/reader/range-coalescer.test.ts new file mode 100644 index 0000000..7f9ce13 --- /dev/null +++ b/tests/reader/range-coalescer.test.ts @@ -0,0 +1,156 @@ +import { describe, it, expect, vi } from "vitest"; +import { + makeStorageStore, + makeUrlStore, +} from "../../src/reader/range-coalescer.js"; +import type { ByteRange, RequestOptions, Storage } from "../../src/index.js"; + +function makeBacking(size: number): Uint8Array { + const data = new Uint8Array(size); + for (let i = 0; i < size; i++) data[i] = i & 0xff; + return data; +} + +function toArrayBuffer(data: Uint8Array): ArrayBuffer { + return data.buffer.slice( + data.byteOffset, + data.byteOffset + data.byteLength, + ) as ArrayBuffer; +} + +function mockFetchResponse(status: number, data: Uint8Array): Response { + return { + status, + statusText: status === 206 ? 
"Partial Content" : "OK", + arrayBuffer: vi.fn().mockResolvedValue(toArrayBuffer(data)), + } as unknown as Response; +} + +function makeStorage(data: Uint8Array): Storage { + return { + getObject: vi.fn( + async (_path: string, _range?: ByteRange, _options?: RequestOptions) => + data, + ), + exists: vi.fn(async () => true), + async *listPrefix() {}, + }; +} + +describe("range coalescer adapters", () => { + it("rejects full URL reads because virtual stores are range-only", async () => { + const store = makeUrlStore({ url: "https://example.com/data.bin" }); + + await expect(store.get("/")).rejects.toThrow( + "Virtual chunk URL store for https://example.com/data.bin only supports ranged reads", + ); + }); + + it("returns URL suffix range responses directly for 206 responses", async () => { + const url = "https://example.com/data.bin"; + const body = new Uint8Array([7, 8, 9]); + const fetchSpy = vi + .spyOn(globalThis, "fetch") + .mockResolvedValue(mockFetchResponse(206, body)); + const store = makeUrlStore({ url }); + + const result = await store.getRange("/", { suffixLength: 3 }); + + expect(fetchSpy).toHaveBeenCalledWith(url, { + headers: { Range: "bytes=-3" }, + signal: undefined, + }); + expect(result).toEqual(body); + + fetchSpy.mockRestore(); + }); + + it("rejects URL offset 206 responses with mismatched body length", async () => { + const url = "https://example.com/data.bin"; + const fetchSpy = vi + .spyOn(globalThis, "fetch") + .mockResolvedValue(mockFetchResponse(206, new Uint8Array([0, 1, 2, 3]))); + const store = makeUrlStore({ url }); + + await expect( + store.getRange("/", { offset: 10, length: 2 }), + ).rejects.toThrow( + "Virtual range response size mismatch for https://example.com/data.bin: expected 2 bytes, got 4", + ); + + fetchSpy.mockRestore(); + }); + + it("rejects URL suffix 206 responses with mismatched body length", async () => { + const url = "https://example.com/data.bin"; + const fetchSpy = vi + .spyOn(globalThis, "fetch") + 
.mockResolvedValue(mockFetchResponse(206, new Uint8Array([7, 8, 9]))); + const store = makeUrlStore({ url }); + + await expect(store.getRange("/", { suffixLength: 2 })).rejects.toThrow( + "Virtual range response size mismatch for https://example.com/data.bin: expected 2 bytes, got 3", + ); + + fetchSpy.mockRestore(); + }); + + it("slices URL suffix ranges from 200 full-body fallback responses", async () => { + const url = "https://example.com/full.bin"; + const backing = makeBacking(10); + const fetchSpy = vi + .spyOn(globalThis, "fetch") + .mockResolvedValue(mockFetchResponse(200, backing)); + const store = makeUrlStore({ url }); + + const result = await store.getRange("/", { suffixLength: 3 }); + + expect(result).toEqual(backing.slice(7)); + + fetchSpy.mockRestore(); + }); + + it("rejects undersized URL suffix 200 fallback responses", async () => { + const url = "https://example.com/short.bin"; + const fetchSpy = vi + .spyOn(globalThis, "fetch") + .mockResolvedValue(mockFetchResponse(200, new Uint8Array([1, 2]))); + const store = makeUrlStore({ url }); + + await expect(store.getRange("/", { suffixLength: 3 })).rejects.toThrow( + "Virtual suffix range request not honored", + ); + + fetchSpy.mockRestore(); + }); + + it("slices storage offset ranges when storage returns a full object", async () => { + const backing = makeBacking(10); + const storage = makeStorage(backing); + const store = makeStorageStore(storage); + + const result = await store.getRange("chunks/abc", { + offset: 2, + length: 3, + }); + + expect(storage.getObject).toHaveBeenCalledWith( + "chunks/abc", + { start: 2, end: 5 }, + undefined, + ); + expect(result).toEqual(backing.slice(2, 5)); + }); + + it("rejects storage suffix ranges instead of downloading the full object", async () => { + const storage = makeStorage(makeBacking(10)); + const store = makeStorageStore(storage); + + await expect( + store.getRange("chunks/abc", { suffixLength: 3 }), + ).rejects.toThrow( + "Storage suffix ranges are not 
supported for chunks/abc; convert suffixLength to offset/length before reading", + ); + expect(storage.getObject).not.toHaveBeenCalled(); + }); +}); diff --git a/tests/reader/session.test.ts b/tests/reader/session.test.ts index b144fc6..700b4d0 100644 --- a/tests/reader/session.test.ts +++ b/tests/reader/session.test.ts @@ -73,6 +73,7 @@ function createMockSession(options: { session.snapshot = mockSnapshot; session.specVersion = options.specVersion ?? SpecVersion.V1_0; session.manifestCache = new Map(); + session.nextFetchClientId = 1; return session; } @@ -683,7 +684,7 @@ describe("ReadSession", () => { }; await expect(session.fetchChunkPayload(payload)).rejects.toThrow( - "Storage returned 2 bytes", + /Storage returned 2 bytes for chunks\/.* range 3-6; expected 4 bytes/, ); expect(getObjectSpy).toHaveBeenCalledWith( @@ -856,7 +857,9 @@ describe("ReadSession", () => { offset: 1, length: 3, }), - ).rejects.toThrow("Storage returned 2 bytes"); + ).rejects.toThrow( + /Storage returned 2 bytes for chunks\/.* range 4-6; expected 3 bytes/, + ); expect(getObjectSpy).toHaveBeenCalledWith( expect.any(String), diff --git a/tests/reader/virtual-coalescing.test.ts b/tests/reader/virtual-coalescing.test.ts new file mode 100644 index 0000000..18d618f --- /dev/null +++ b/tests/reader/virtual-coalescing.test.ts @@ -0,0 +1,700 @@ +/** + * Virtual-chunk coalescing: verifies that when enabled and many virtual + * chunks share a backing URL, `fetchChunkPayload`/`fetchChunkPayloadRange` + * collapse the reads into a handful of merged Range GETs via zarrita's + * `withRangeCoalescing`. + * + * These tests only run when the installed zarrita exports + * `withRangeCoalescing`. The CI matrix also runs against older zarrita + * versions; those jobs still exercise the uncoalesced fallback path through + * the regular virtual-chunk tests in session.test.ts. For zarrita >= 0.7, + * absence of `withRangeCoalescing` is a test setup failure. 
+ */
+
+import { readFileSync } from "node:fs";
+import { join } from "node:path";
+import { describe, it, expect, vi } from "vitest";
+import { ReadSession } from "../../src/reader/session.js";
+import { MockStorage, createMockSnapshotId } from "../fixtures/mock-storage.js";
+import { SpecVersion } from "../../src/format/header.js";
+import type { Snapshot } from "../../src/format/flatbuffers/types.js";
+import type { RangeCoalescingFn } from "../../src/index.js";
+
+const withRangeCoalescing = await import("zarrita").then(
+  (mod) =>
+    (mod as Record<string, unknown>).withRangeCoalescing as
+      | RangeCoalescingFn
+      | undefined,
+  () => undefined,
+);
+
+function getInstalledZarritaVersion(): string | undefined {
+  try {
+    const packageJson = JSON.parse(
+      readFileSync(
+        join(process.cwd(), "node_modules", "zarrita", "package.json"),
+        "utf8",
+      ),
+    ) as {
+      name?: string;
+      version?: string;
+    };
+    if (packageJson.name === "zarrita") return packageJson.version;
+  } catch {
+    return undefined;
+  }
+  return undefined;
+}
+
+function isZarrita07OrNewer(version: string | undefined): boolean {
+  const match = /^(\d+)\.(\d+)\./.exec(version ?? "");
+  if (!match) return false;
+  const major = Number(match[1]);
+  const minor = Number(match[2]);
+  return major > 0 || minor >= 7;
+}
+
+const installedZarritaVersion = getInstalledZarritaVersion();
+if (!withRangeCoalescing && isZarrita07OrNewer(installedZarritaVersion)) {
+  throw new Error(
+    `Expected zarrita ${installedZarritaVersion} to export withRangeCoalescing`,
+  );
+}
+
+const itWithRangeCoalescing = withRangeCoalescing ? it : it.skip;
+
+/**
+ * Minimal ReadSession with just enough wiring to drive
+ * fetchChunkPayload / fetchChunkPayloadRange against mocked HTTP.
+ * Mirrors the `createMockSession` helper from session.test.ts.
+ */
+function createMockSession(options: { storage?: MockStorage } = {}): any {
+  const snapshot: Snapshot = {
+    id: createMockSnapshotId(1) as any,
+    parentId: null,
+    nodes: [],
+    flushedAt: BigInt(Date.now() * 1000),
+    message: "test commit",
+    metadata: [],
+    manifestFiles: [],
+  };
+  const session = Object.create(ReadSession.prototype);
+  session.storage = options.storage ?? new MockStorage({});
+  session.snapshot = snapshot;
+  session.specVersion = SpecVersion.V1_0;
+  session.manifestCache = new Map();
+  session.nextFetchClientId = 1;
+  session.nextRangeCoalescerId = 1;
+  return session;
+}
+
+/**
+ * Install a fetch spy whose response is an exact byte-range slice of
+ * `backing`. Returns the spy so tests can assert call counts / headers.
+ */
+function spyFetchEchoingBacking(backing: Uint8Array) {
+  return vi
+    .spyOn(globalThis, "fetch")
+    .mockImplementation(async (_url, init) => {
+      const rangeHeader = (init?.headers as Record<string, string>).Range;
+      const match = /bytes=(\d+)-(\d+)/.exec(rangeHeader);
+      if (!match) throw new Error(`Unexpected Range header: ${rangeHeader}`);
+      const start = Number(match[1]);
+      const end = Number(match[2]) + 1; // HTTP Range is inclusive, JS slice is exclusive
+      const slice = backing.slice(start, end);
+      // Copy into a fresh ArrayBuffer so every response is an independent buffer.
+      const buf = new ArrayBuffer(slice.byteLength);
+      new Uint8Array(buf).set(slice);
+      return {
+        ok: true,
+        status: 206,
+        statusText: "Partial Content",
+        arrayBuffer: vi.fn().mockResolvedValue(buf),
+      } as unknown as Response;
+    });
+}
+
+function spyStorageEchoingBacking(storage: MockStorage, backing: Uint8Array) {
+  return vi
+    .spyOn(storage, "getObject")
+    .mockImplementation(async (_path, range) => {
+      if (!range) return backing;
+      return backing.slice(range.start, range.end);
+    });
+}
+
+function virtualPayload(
+  location: string,
+  offset: number,
+  length: number,
+  opts: { etag?: string | null; lastModified?: number } = {},
+) {
+  return {
+    type: "virtual" as const,
+    location,
+    offset,
+    length,
+    checksumEtag: opts.etag ?? null,
+    checksumLastModified: opts.lastModified ?? 0,
+  };
+}
+
+/**
+ * Build a deterministic byte pattern so every requested slice is
+ * distinguishable by content — lets us catch off-by-one errors where one
+ * caller gets another caller's bytes.
+ */
+function makeBacking(size: number): Uint8Array {
+  const data = new Uint8Array(size);
+  for (let i = 0; i < size; i++) data[i] = i & 0xff;
+  return data;
+}
+
+const coalescingOptions = { withRangeCoalescing };
+
+async function waitUntil(
+  condition: () => boolean,
+  message: string,
+): Promise<void> {
+  for (let i = 0; i < 20; i++) {
+    if (condition()) return;
+    await new Promise((resolve) => setTimeout(resolve, 0));
+  }
+  throw new Error(message);
+}
+
+describe("Virtual chunk coalescing", () => {
+  it("does not coalesce virtual reads unless range coalescing is enabled", async () => {
+    const backing = makeBacking(1024);
+    const fetchSpy = spyFetchEchoingBacking(backing);
+    const session = createMockSession();
+    const url = "https://example.com/default.bin";
+
+    const [a, b, c] = await Promise.all([
+      session.fetchChunkPayload(virtualPayload(url, 0, 10)),
+      session.fetchChunkPayload(virtualPayload(url, 20, 10)),
+      session.fetchChunkPayload(virtualPayload(url, 40, 10)),
+    ]);
+
+    
expect(fetchSpy).toHaveBeenCalledTimes(3);
+    expect(
+      fetchSpy.mock.calls.map(
+        (call) => (call[1]!.headers as Record<string, string>).Range,
+      ),
+    ).toEqual(["bytes=0-9", "bytes=20-29", "bytes=40-49"]);
+    expect(a).toEqual(backing.slice(0, 10));
+    expect(b).toEqual(backing.slice(20, 30));
+    expect(c).toEqual(backing.slice(40, 50));
+
+    fetchSpy.mockRestore();
+  });
+
+  it("partitions cached stores by range coalescer function", async () => {
+    const session = createMockSession();
+    const url = "https://example.com/coalescer.bin";
+    const coalescerA: RangeCoalescingFn = vi.fn((store) => ({
+      ...store,
+      getRange: vi.fn(async () => new Uint8Array([1])),
+    }));
+    const coalescerB: RangeCoalescingFn = vi.fn((store) => ({
+      ...store,
+      getRange: vi.fn(async () => new Uint8Array([2])),
+    }));
+
+    const a = await session.fetchChunkPayload(virtualPayload(url, 0, 1), {
+      withRangeCoalescing: coalescerA,
+    });
+    const b = await session.fetchChunkPayload(virtualPayload(url, 0, 1), {
+      withRangeCoalescing: coalescerB,
+    });
+
+    expect(a).toEqual(new Uint8Array([1]));
+    expect(b).toEqual(new Uint8Array([2]));
+    expect(coalescerA).toHaveBeenCalledTimes(1);
+    expect(coalescerB).toHaveBeenCalledTimes(1);
+  });
+
+  itWithRangeCoalescing(
+    "merges concurrent same-URL reads within the gap threshold into one fetch",
+    async () => {
+      const backing = makeBacking(1024);
+      const fetchSpy = spyFetchEchoingBacking(backing);
+      const session = createMockSession();
+      const url = "https://example.com/data.bin";
+
+      const [a, b, c] = await Promise.all([
+        session.fetchChunkPayload(
+          virtualPayload(url, 0, 10),
+          coalescingOptions,
+        ),
+        session.fetchChunkPayload(
+          virtualPayload(url, 20, 10),
+          coalescingOptions,
+        ),
+        session.fetchChunkPayload(
+          virtualPayload(url, 40, 10),
+          coalescingOptions,
+        ),
+      ]);
+
+      // One merged GET instead of three individual ones.
+ expect(fetchSpy).toHaveBeenCalledTimes(1); + const [calledUrl, calledInit] = fetchSpy.mock.calls[0]!; + expect(calledUrl).toBe(url); + // Merged span is 0..49 — smallest offset through largest (offset+length-1). + expect((calledInit!.headers as Record).Range).toBe( + "bytes=0-49", + ); + + // Each caller should still get exactly its requested bytes. + expect(a).toEqual(backing.slice(0, 10)); + expect(b).toEqual(backing.slice(20, 30)); + expect(c).toEqual(backing.slice(40, 50)); + + fetchSpy.mockRestore(); + }, + ); + + itWithRangeCoalescing( + "rejects overlong 206 responses before slicing coalesced virtual reads", + async () => { + const backing = makeBacking(1024); + const fetchSpy = vi.spyOn(globalThis, "fetch").mockResolvedValue({ + ok: true, + status: 206, + statusText: "Partial Content", + arrayBuffer: vi.fn().mockResolvedValue(backing.buffer), + } as unknown as Response); + const session = createMockSession(); + const url = "https://example.com/overlong.bin"; + + await expect( + Promise.all([ + session.fetchChunkPayload( + virtualPayload(url, 20, 10), + coalescingOptions, + ), + session.fetchChunkPayload( + virtualPayload(url, 40, 10), + coalescingOptions, + ), + ]), + ).rejects.toThrow( + "Virtual range response size mismatch for https://example.com/overlong.bin: expected 30 bytes, got 1024", + ); + + expect(fetchSpy).toHaveBeenCalledTimes(1); + const [, init] = fetchSpy.mock.calls[0]!; + expect((init!.headers as Record).Range).toBe( + "bytes=20-49", + ); + + fetchSpy.mockRestore(); + }, + ); + + itWithRangeCoalescing( + "issues separate fetches when concurrent reads are farther apart than the 32KB coalesce gap", + async () => { + // 200_000-byte backing so we can place one read at 0 and another at + // 100_000 — way beyond zarrita's default 32KB gap. 
+ const backing = makeBacking(200_000); + const fetchSpy = spyFetchEchoingBacking(backing); + const session = createMockSession(); + const url = "https://example.com/sparse.bin"; + + const [near, far] = await Promise.all([ + session.fetchChunkPayload( + virtualPayload(url, 0, 100), + coalescingOptions, + ), + session.fetchChunkPayload( + virtualPayload(url, 100_000, 100), + coalescingOptions, + ), + ]); + + // Past the gap threshold → two separate HTTP fetches. + expect(fetchSpy).toHaveBeenCalledTimes(2); + expect(near).toEqual(backing.slice(0, 100)); + expect(far).toEqual(backing.slice(100_000, 100_100)); + + fetchSpy.mockRestore(); + }, + ); + + itWithRangeCoalescing( + "never coalesces reads that target different backing URLs", + async () => { + const backing = makeBacking(1024); + const fetchSpy = spyFetchEchoingBacking(backing); + const session = createMockSession(); + + const urlA = "https://example.com/a.bin"; + const urlB = "https://example.com/b.bin"; + + const [a, b] = await Promise.all([ + session.fetchChunkPayload( + virtualPayload(urlA, 0, 32), + coalescingOptions, + ), + session.fetchChunkPayload( + virtualPayload(urlB, 0, 32), + coalescingOptions, + ), + ]); + + expect(fetchSpy).toHaveBeenCalledTimes(2); + const calledUrls = fetchSpy.mock.calls.map((c) => c[0]); + expect(calledUrls).toEqual(expect.arrayContaining([urlA, urlB])); + expect(a).toEqual(backing.slice(0, 32)); + expect(b).toEqual(backing.slice(0, 32)); + + fetchSpy.mockRestore(); + }, + ); + + itWithRangeCoalescing( + "partitions the store cache by checksum when validateChecksums is set, preventing mismatched-checksum coalescing", + async () => { + const backing = makeBacking(1024); + const fetchSpy = spyFetchEchoingBacking(backing); + const session = createMockSession(); + const url = "https://example.com/checksum.bin"; + + const [a, b] = await Promise.all([ + session.fetchChunkPayload( + virtualPayload(url, 0, 10, { etag: '"v1"' }), + { validateChecksums: true, withRangeCoalescing }, + 
), + session.fetchChunkPayload( + virtualPayload(url, 20, 10, { etag: '"v2"' }), + { validateChecksums: true, withRangeCoalescing }, + ), + ]); + + // Different checksums → two separate stores → two separate fetches. + expect(fetchSpy).toHaveBeenCalledTimes(2); + expect(a).toEqual(backing.slice(0, 10)); + expect(b).toEqual(backing.slice(20, 30)); + + // Each fetch must carry its own conditional header. + const headers = fetchSpy.mock.calls.map( + (c) => c[1]!.headers as Record, + ); + const etagsSent = headers.map((h) => h["If-Match"]).sort(); + expect(etagsSent).toEqual(['"v1"', '"v2"']); + + fetchSpy.mockRestore(); + }, + ); + + itWithRangeCoalescing( + "coalesces same-URL same-checksum reads into a single conditional GET when validateChecksums is set", + async () => { + const backing = makeBacking(1024); + const fetchSpy = spyFetchEchoingBacking(backing); + const session = createMockSession(); + const url = "https://example.com/conditional.bin"; + const etag = '"shared-etag"'; + const lastModified = 1_700_000_000; // arbitrary epoch seconds + + const [a, b] = await Promise.all([ + session.fetchChunkPayload( + virtualPayload(url, 0, 10, { etag, lastModified }), + { validateChecksums: true, withRangeCoalescing }, + ), + session.fetchChunkPayload( + virtualPayload(url, 16, 10, { etag, lastModified }), + { validateChecksums: true, withRangeCoalescing }, + ), + ]); + + expect(fetchSpy).toHaveBeenCalledTimes(1); + const headers = fetchSpy.mock.calls[0]![1]!.headers as Record< + string, + string + >; + expect(headers.Range).toBe("bytes=0-25"); + expect(headers["If-Match"]).toBe(etag); + expect(headers["If-Unmodified-Since"]).toBe( + new Date(lastModified * 1000).toUTCString(), + ); + expect(a).toEqual(backing.slice(0, 10)); + expect(b).toEqual(backing.slice(16, 26)); + + fetchSpy.mockRestore(); + }, + ); + + itWithRangeCoalescing( + "coalesces fetchChunkPayloadRange calls the same way as fetchChunkPayload", + async () => { + const backing = makeBacking(1024); + const 
fetchSpy = spyFetchEchoingBacking(backing); + const session = createMockSession(); + const url = "https://example.com/range.bin"; + + // Two chunks at different offsets, each asking for a sub-range inside + // the chunk. The coalesced request should span both absolute windows. + const [a, b] = await Promise.all([ + session.fetchChunkPayloadRange( + virtualPayload(url, 100, 50), + { offset: 10, length: 20 }, // absolute 110..129 + coalescingOptions, + ), + session.fetchChunkPayloadRange( + virtualPayload(url, 200, 50), + { offset: 5, length: 10 }, // absolute 205..214 + coalescingOptions, + ), + ]); + + expect(fetchSpy).toHaveBeenCalledTimes(1); + const [, init] = fetchSpy.mock.calls[0]!; + expect((init!.headers as Record).Range).toBe( + "bytes=110-214", + ); + expect(a).toEqual(backing.slice(110, 130)); + expect(b).toEqual(backing.slice(205, 215)); + + fetchSpy.mockRestore(); + }, + ); + + itWithRangeCoalescing( + "partitions cached virtual stores by fetch client", + async () => { + const backing = makeBacking(1024); + const session = createMockSession(); + const url = "https://example.com/client.bin"; + + function responseFor(offset: number, length: number) { + const slice = backing.slice(offset, offset + length); + const buf = new ArrayBuffer(slice.byteLength); + new Uint8Array(buf).set(slice); + return { + status: 206, + statusText: "Partial Content", + arrayBuffer: vi.fn().mockResolvedValue(buf), + } as unknown as Response; + } + + const clientA = { + fetch: vi.fn().mockResolvedValue(responseFor(0, 10)), + }; + const clientB = { + fetch: vi.fn().mockResolvedValue(responseFor(20, 10)), + }; + + await session.fetchChunkPayload(virtualPayload(url, 0, 10), { + fetchClient: clientA, + withRangeCoalescing, + }); + await session.fetchChunkPayload(virtualPayload(url, 20, 10), { + fetchClient: clientB, + withRangeCoalescing, + }); + + expect(clientA.fetch).toHaveBeenCalledTimes(1); + expect(clientB.fetch).toHaveBeenCalledTimes(1); + 
expect(clientB.fetch).toHaveBeenCalledWith(url, {
+        headers: { Range: "bytes=20-29" },
+        signal: undefined,
+      });
+    },
+  );
+
+  itWithRangeCoalescing(
+    "coalesces virtual reads with different abort signals using zarrita's merged signal",
+    async () => {
+      const backing = makeBacking(1024);
+      const fetchSpy = spyFetchEchoingBacking(backing);
+      const session = createMockSession();
+      const url = "https://example.com/signals.bin";
+      const controllerA = new AbortController();
+      const controllerB = new AbortController();
+
+      const [a, b] = await Promise.all([
+        session.fetchChunkPayload(virtualPayload(url, 0, 10), {
+          signal: controllerA.signal,
+          withRangeCoalescing,
+        }),
+        session.fetchChunkPayload(virtualPayload(url, 20, 10), {
+          signal: controllerB.signal,
+          withRangeCoalescing,
+        }),
+      ]);
+
+      expect(fetchSpy).toHaveBeenCalledTimes(1);
+      const [, init] = fetchSpy.mock.calls[0]!;
+      expect((init!.headers as Record<string, string>).Range).toBe(
+        "bytes=0-29",
+      );
+      expect(init!.signal).toBeInstanceOf(AbortSignal);
+      expect(a).toEqual(backing.slice(0, 10));
+      expect(b).toEqual(backing.slice(20, 30));
+
+      fetchSpy.mockRestore();
+    },
+  );
+
+  itWithRangeCoalescing(
+    "rejects all virtual reads in a coalesced batch when one signal aborts",
+    async () => {
+      const session = createMockSession();
+      const url = "https://example.com/mid-abort.bin";
+      const controllerA = new AbortController();
+      const controllerB = new AbortController();
+      let pending:
+        | {
+            range: string;
+            signal: AbortSignal | undefined;
+          }
+        | undefined;
+
+      const fetchSpy = vi
+        .spyOn(globalThis, "fetch")
+        .mockImplementation((_url, init) => {
+          const range = (init?.headers as Record<string, string>).Range;
+          const signal = init?.signal as AbortSignal | undefined;
+
+          return new Promise<Response>((_resolve, reject) => {
+            if (signal?.aborted) {
+              reject(
+                signal.reason ??
+                  new DOMException("Operation aborted", "AbortError"),
+              );
+              return;
+            }
+            signal?.addEventListener(
+              "abort",
+              () =>
+                reject(
+                  signal.reason ??
+ new DOMException("Operation aborted", "AbortError"), + ), + { once: true }, + ); + pending = { range, signal }; + }); + }); + + const promiseA = session.fetchChunkPayload(virtualPayload(url, 0, 10), { + signal: controllerA.signal, + withRangeCoalescing, + }); + const rejectedA = promiseA.then( + () => { + throw new Error("expected request A to abort"); + }, + (error: unknown) => error, + ); + + const promiseB = session.fetchChunkPayload(virtualPayload(url, 20, 10), { + signal: controllerB.signal, + withRangeCoalescing, + }); + const rejectedB = promiseB.then( + () => { + throw new Error("expected request B to abort"); + }, + (error: unknown) => error, + ); + + await waitUntil( + () => pending !== undefined, + "expected the coalesced fetch to start", + ); + + controllerA.abort(); + + expect(pending!.range).toBe("bytes=0-29"); + expect(pending!.signal).toBeInstanceOf(AbortSignal); + + expect(await rejectedA).toMatchObject({ name: "AbortError" }); + expect(await rejectedB).toMatchObject({ name: "AbortError" }); + expect(fetchSpy).toHaveBeenCalledTimes(1); + + fetchSpy.mockRestore(); + }, + ); + + itWithRangeCoalescing( + "coalesces native chunk range reads that target the same storage object", + async () => { + const backing = makeBacking(1024); + const storage = new MockStorage({}); + const getObjectSpy = spyStorageEchoingBacking(storage, backing); + const session = createMockSession({ storage }); + const payload = { + type: "native" as const, + chunkId: createMockSnapshotId(101) as any, + offset: 0, + length: 128, + }; + + const [a, b] = await Promise.all([ + session.fetchChunkPayloadRange( + payload, + { offset: 0, length: 10 }, + coalescingOptions, + ), + session.fetchChunkPayloadRange( + payload, + { offset: 20, length: 10 }, + coalescingOptions, + ), + ]); + + expect(getObjectSpy).toHaveBeenCalledTimes(1); + expect(getObjectSpy).toHaveBeenCalledWith( + expect.any(String), + { start: 0, end: 30 }, + undefined, + ); + expect(a).toEqual(backing.slice(0, 10)); + 
expect(b).toEqual(backing.slice(20, 30)); + }, + ); + + itWithRangeCoalescing( + "coalesces native reads with different abort signals", + async () => { + const backing = makeBacking(1024); + const storage = new MockStorage({}); + const getObjectSpy = spyStorageEchoingBacking(storage, backing); + const session = createMockSession({ storage }); + const payload = { + type: "native" as const, + chunkId: createMockSnapshotId(102) as any, + offset: 0, + length: 128, + }; + const controllerA = new AbortController(); + const controllerB = new AbortController(); + + const [a, b] = await Promise.all([ + session.fetchChunkPayloadRange( + payload, + { offset: 0, length: 10 }, + { signal: controllerA.signal, withRangeCoalescing }, + ), + session.fetchChunkPayloadRange( + payload, + { offset: 20, length: 10 }, + { signal: controllerB.signal, withRangeCoalescing }, + ), + ]); + + expect(getObjectSpy).toHaveBeenCalledTimes(1); + expect(getObjectSpy).toHaveBeenCalledWith( + expect.any(String), + { start: 0, end: 30 }, + { signal: expect.any(AbortSignal) }, + ); + expect(a).toEqual(backing.slice(0, 10)); + expect(b).toEqual(backing.slice(20, 30)); + }, + ); +}); diff --git a/tests/store.test.ts b/tests/store.test.ts index aec9f67..95e76c5 100644 --- a/tests/store.test.ts +++ b/tests/store.test.ts @@ -3,17 +3,24 @@ import { IcechunkStore } from "../src/store.js"; import type { AbsolutePath } from "../src/store.js"; import { MockStorage } from "./fixtures/mock-storage.js"; import { NotFoundError } from "../src/storage/storage.js"; +import type { RangeCoalescingFn } from "../src/index.js"; /** * Helper to create an IcechunkStore with a mock session. 
 */
-function createStoreWithMockSession(mockSession: {
-  getRawMetadata: ReturnType<typeof vi.fn>;
-  getChunk: ReturnType<typeof vi.fn>;
-  getChunkRange?: ReturnType<typeof vi.fn>;
-}): IcechunkStore {
+function createStoreWithMockSession(
+  mockSession: {
+    getRawMetadata: ReturnType<typeof vi.fn>;
+    getChunk: ReturnType<typeof vi.fn>;
+    getChunkRange?: ReturnType<typeof vi.fn>;
+  },
+  options: { withRangeCoalescing?: RangeCoalescingFn } = {},
+): IcechunkStore {
   const store = Object.create(IcechunkStore.prototype);
   store.session = mockSession;
+  if (options.withRangeCoalescing !== undefined) {
+    store.withRangeCoalescing = options.withRangeCoalescing;
+  }
   return store;
 }
 
@@ -96,6 +103,26 @@ describe("IcechunkStore", () => {
     expect(result).toEqual(new Uint8Array([100]));
   });
 
+  it("should pass withRangeCoalescing through when enabled", async () => {
+    getChunkSpy.mockResolvedValue(new Uint8Array([10]));
+    const withRangeCoalescing: RangeCoalescingFn = vi.fn((store) => store);
+    store = createStoreWithMockSession(
+      {
+        getRawMetadata: getRawMetadataSpy,
+        getChunk: getChunkSpy,
+      },
+      { withRangeCoalescing },
+    );
+
+    await store.get("/array/c/1" as AbsolutePath);
+
+    expect(getChunkSpy).toHaveBeenCalledWith(
+      "/array",
+      [1],
+      expect.objectContaining({ withRangeCoalescing }),
+    );
+  });
+
   it("should return undefined for missing chunks", async () => {
     getChunkSpy.mockResolvedValue(null);