Skip to content

Commit 70f04da

Browse files
Add a Parquet export variant to the lakehouse feed
GET /collections/{collectionId}/export?format=parquet emits the columnar WKB plus a bbox/time sidecar (id, properties, trajectory_wkb, xmin/ymin/xmax/ymax, tmin/tmax), written one row group per batch from a server-side cursor so a lake consumer can prune by space and time before decoding geometry. The default NDJSON feed is unchanged.
1 parent e028c94 commit 70f04da

1 file changed

Lines changed: 90 additions & 21 deletions

File tree

resource/collection/Export.py

Lines changed: 90 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,62 @@
11
# GET /collections/{collectionId}/export
2-
# Lakehouse bulk feed: the collection's moving features streamed as NDJSON (one
3-
# Feature per line, the temporal geometry as MF-JSON) from a server-side cursor,
4-
# so memory is bounded regardless of collection size. A lake consumer (DuckDB /
5-
# MobilityDuck / Spark) ingests the stream directly.
2+
# Lakehouse bulk feed. Default is NDJSON (one Feature per line, the temporal
3+
# geometry as MF-JSON), streamed from a server-side cursor so memory is bounded
4+
# regardless of collection size. ?format=parquet emits the columnar WKB + a
5+
# bbox/time sidecar (xmin..tmax) that a lake consumer (DuckDB / MobilityDuck /
6+
# Spark) can prune by space and time before decoding geometry.
7+
import io
68
import json
9+
from urllib.parse import urlparse, parse_qs
710

811
from utils import handle_error
912

13+
# Query behind the NDJSON feed. For the collection bound to the single %s
# parameter it selects each feature's id, its properties as JSON text ('{}'
# when the column is NULL) and the trajectory rendered by asMFJSON. The LEFT
# JOIN means features without a matching temporal_geometries row are still
# returned; rows are ordered by feature id.
_NDJSON_SQL = """
14+
SELECT mf.id,
15+
coalesce(mf.properties, '{}'::jsonb)::text,
16+
asMFJSON(tg.trajectory)
17+
FROM moving_features mf
18+
LEFT JOIN temporal_geometries tg
19+
ON mf.id = tg.feature_id AND mf.collection_id = tg.collection_id
20+
WHERE mf.collection_id = %s
21+
ORDER BY mf.id
22+
"""
23+
24+
# Query behind the Parquet export. For the collection bound to the single %s
# parameter it selects each feature's id, its properties as JSON text ('{}'
# when the column is NULL), the trajectory via asBinary, and the
# Xmin/Ymin/Xmax/Ymax and Tmin/Tmax of stbox(tg.trajectory). The two time
# bounds are cast to text. The LEFT JOIN means features without a matching
# temporal_geometries row are still returned; rows are ordered by feature id.
_PARQUET_SQL = """
25+
SELECT mf.id,
26+
coalesce(mf.properties, '{}'::jsonb)::text,
27+
asBinary(tg.trajectory),
28+
Xmin(stbox(tg.trajectory)), Ymin(stbox(tg.trajectory)),
29+
Xmax(stbox(tg.trajectory)), Ymax(stbox(tg.trajectory)),
30+
Tmin(stbox(tg.trajectory))::text, Tmax(stbox(tg.trajectory))::text
31+
FROM moving_features mf
32+
LEFT JOIN temporal_geometries tg
33+
ON mf.id = tg.feature_id AND mf.collection_id = tg.collection_id
34+
WHERE mf.collection_id = %s
35+
ORDER BY mf.id
36+
"""
37+
1038

1139
def export_collection(self, collection_id, connection, cursor):
    """Handle GET /collections/{collectionId}/export.

    Responds 404 when the collection does not exist. Otherwise the ``format``
    query parameter selects the exporter: ``parquet`` for the Parquet export,
    ``ndjson`` (also the default when the parameter is absent) for the NDJSON
    feed. Any other value is rejected with 400 instead of being silently
    answered with NDJSON.

    Args:
        self: The request handler; ``self.path`` is read for the query string.
        collection_id: Identifier of the collection to export.
        connection: Database connection handed to the selected exporter.
        cursor: Cursor used for the collection existence check.
    """
    cursor.execute("SELECT id FROM collections WHERE id = %s", (collection_id,))
    if cursor.fetchone() is None:
        handle_error(self, 404, f"Collection '{collection_id}' not found")
        return

    query = parse_qs(urlparse(self.path).query)
    # Match format names case-insensitively so ?format=Parquet is honoured
    # rather than falling through to the NDJSON branch.
    fmt = query.get("format", ["ndjson"])[0].strip().lower()
    if fmt == "parquet":
        _export_parquet(self, collection_id, connection)
    elif fmt == "ndjson":
        _export_ndjson(self, collection_id, connection)
    else:
        # A typo or unsupported format must not yield a 200 with a payload
        # type the client did not ask for.
        handle_error(
            self, 400, "Unsupported export format; expected 'ndjson' or 'parquet'"
        )
50+
51+
52+
def _export_ndjson(self, collection_id, connection):
1753
self.send_response(200)
1854
self.send_header("Content-Type", "application/x-ndjson")
1955
self.end_headers()
20-
21-
# Server-side cursor: rows are fetched in batches, so a large collection
22-
# streams at bounded memory.
23-
stream = connection.cursor(name="mfapi_export")
56+
stream = connection.cursor(name="mfapi_export_nd")
2457
stream.itersize = 1000
2558
try:
26-
stream.execute(
27-
"""
28-
SELECT mf.id,
29-
coalesce(mf.properties, '{}'::jsonb)::text,
30-
asMFJSON(tg.trajectory)
31-
FROM moving_features mf
32-
LEFT JOIN temporal_geometries tg
33-
ON mf.id = tg.feature_id AND mf.collection_id = tg.collection_id
34-
WHERE mf.collection_id = %s
35-
ORDER BY mf.id
36-
""",
37-
(collection_id,),
38-
)
59+
stream.execute(_NDJSON_SQL, (collection_id,))
3960
for fid, properties, tgeom in stream:
4061
feature = {
4162
"type": "Feature",
@@ -46,3 +67,51 @@ def export_collection(self, collection_id, connection, cursor):
4667
self.wfile.write((json.dumps(feature) + "\n").encode("utf-8"))
4768
finally:
4869
stream.close()
70+
71+
72+
def _export_parquet(self, collection_id, connection):
    """Send the collection as a Parquet file attachment.

    Rows are read from a server-side cursor and written one row group per
    batch of ``rowgroup`` rows. The file is spooled to an anonymous temporary
    file rather than an in-memory buffer, so the Python-side working set is
    one batch of rows regardless of collection size. Nothing is sent to the
    client until the file is complete, so a failure while reading or encoding
    propagates before any status line has been written.

    Args:
        self: The request handler used to write the response.
        collection_id: Collection to export; bound into ``_PARQUET_SQL`` and
            used for the download filename.
        connection: Database connection on which the named cursor is opened.
    """
    import re
    import shutil
    import tempfile

    import pyarrow as pa
    import pyarrow.parquet as pq

    # NOTE(review): tmin/tmax arrive as text (cast in the SQL), so their
    # row-group statistics compare as strings - confirm that is adequate for
    # time pruning.
    schema = pa.schema([
        ("id", pa.string()),
        ("properties", pa.string()),
        ("trajectory_wkb", pa.binary()),
        ("xmin", pa.float64()),
        ("ymin", pa.float64()),
        ("xmax", pa.float64()),
        ("ymax", pa.float64()),
        ("tmin", pa.string()),
        ("tmax", pa.string()),
    ])
    rowgroup = 1024
    cols = {name: [] for name in schema.names}

    with tempfile.TemporaryFile() as sink:
        writer = pq.ParquetWriter(sink, schema)

        def flush():
            if not cols["id"]:
                return
            # One row group per batch: the pyarrow working set is bounded by
            # rowgroup, and each row group carries its own min/max statistics
            # for pushdown.
            writer.write_table(pa.table(cols, schema=schema))
            for values in cols.values():
                values.clear()

        # Nested finally blocks: the writer is closed even if closing the
        # cursor raises.
        try:
            stream = connection.cursor(name="mfapi_export_pq")
            stream.itersize = rowgroup
            try:
                stream.execute(_PARQUET_SQL, (collection_id,))
                for fid, props, wkb, xmin, ymin, xmax, ymax, tmin, tmax in stream:
                    cols["id"].append(fid)
                    cols["properties"].append(props)
                    cols["trajectory_wkb"].append(
                        bytes(wkb) if wkb is not None else None
                    )
                    cols["xmin"].append(xmin)
                    cols["ymin"].append(ymin)
                    cols["xmax"].append(xmax)
                    cols["ymax"].append(ymax)
                    cols["tmin"].append(tmin)
                    cols["tmax"].append(tmax)
                    if len(cols["id"]) >= rowgroup:
                        flush()
                flush()
            finally:
                stream.close()
        finally:
            writer.close()

        # Seeking to the end returns the finished file's size.
        size = sink.seek(0, io.SEEK_END)
        sink.seek(0)

        # Keep the quoted filename well-formed: replace quotes, backslashes
        # and anything outside printable ASCII.
        filename = re.sub(r'[^\x20-\x7e]|["\\]', "_", str(collection_id))

        self.send_response(200)
        self.send_header("Content-Type", "application/vnd.apache.parquet")
        self.send_header(
            "Content-Disposition", f'attachment; filename="{filename}.parquet"'
        )
        self.send_header("Content-Length", str(size))
        self.end_headers()
        # Chunked copy, so the response body is never materialised in memory.
        shutil.copyfileobj(sink, self.wfile)

0 commit comments

Comments
 (0)