Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
6b54911
fix(ci): make the Nix build work with NES_ENABLE_MEOS=OFF and stock p…
estebanzimanyi Jun 11, 2026
bfdc695
feat(berlinmod): scaffold the full BerlinMOD-9 × 3-form parity matrix…
estebanzimanyi May 21, 2026
4dd3302
feat(meos): TEMPORAL_LENGTH aggregation closes BerlinMOD-Q6 streaming…
estebanzimanyi May 21, 2026
78cf5a6
feat(meos): PAIR_MEETING + CROSS_DISTANCE aggregations close Q5 + Q9 …
estebanzimanyi May 21, 2026
afe4e2e
docs(berlinmod): streaming-semantics tier overlay + remove stale 'Not…
estebanzimanyi May 21, 2026
267fe99
feat(meos): parameterize PAIR_MEETING dMeet via SQL constant fifth arg
estebanzimanyi May 21, 2026
9171dbe
feat(meos): parameterize CROSS_DISTANCE (vidA, vidB) via SQL constant…
estebanzimanyi May 21, 2026
4f60e2e
tools(codegen): MEOS-operator generator + design proposal for Nebula …
estebanzimanyi May 21, 2026
d70f88e
fix(meos): proto extra_fields + Werror unused-param in aggregations
estebanzimanyi May 21, 2026
fa4fc7d
feat(meos): W1 codegen output — 5 spatial-relation operators (tgeo × …
estebanzimanyi May 21, 2026
d3e0845
feat(meos): W2 codegen output — close the _tgeo_geo spatial-rel row
estebanzimanyi May 21, 2026
b5aab48
feat(meos): W3 codegen output — closes the _tgeo_tgeo spatial-rel row…
estebanzimanyi May 21, 2026
2e302c5
feat(meos): W4 codegen — distance family (nad + dwithin, 5 ops + 2 te…
estebanzimanyi May 21, 2026
20bd61e
feat(codegen): auto-inject parser glue — closes SQL loop for W1–W4 (2…
estebanzimanyi May 21, 2026
2fe981e
feat(meos): W5a codegen — tnumber NAD ops (4 ops + 2 templates + 2 sy…
estebanzimanyi May 21, 2026
66f15cc
feat(meos): W6 codegen — tgeo restriction at/minus geom (2 ops + 1 te…
estebanzimanyi May 21, 2026
f064d40
feat(meos): W7 codegen — windowed aggregations (12 ops + tools/codege…
estebanzimanyi May 21, 2026
8005822
feat(meos): W8 codegen — tnumber avg/twavg aggregations (3 ops, mecha…
estebanzimanyi May 21, 2026
861ad2a
feat(meos): W9 codegen — tgeo scalar accessors w/ new return types (5…
estebanzimanyi May 21, 2026
8321211
feat(meos): W10 codegen — tcbuffer × geo spatial-rels (10 ops + 1 tem…
estebanzimanyi May 21, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
7 changes: 7 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,13 @@ endif ()

add_custom_target(build_all_plugins)

# Declared here so add_compile_definitions reaches all sibling nes-* targets.
# nes-plugins/CMakeLists.txt re-declares the same option (no-op when cached).
option(NES_ENABLE_MEOS "Enable MEOS plugin (requires libmeos installed on the system)" ON)
if(NES_ENABLE_MEOS)
add_compile_definitions(NES_ENABLE_MEOS)
endif()

# Add target for common lib, which contains a minimal set
# of shared functionality used by all components of nes
file(GLOB NES_DIRECTORIES RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} "nes-*")
Expand Down
21 changes: 21 additions & 0 deletions Input/input_berlinmod.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
1735711200,100,4.3517,50.8503
1735711200,300,4.2000,50.7500
1735711201,200,4.3060,50.8270
1735711202,100,4.3517,50.8503
1735711202,300,4.2000,50.7500
1735711203,200,4.3060,50.8270
1735711204,100,4.3517,50.8503
1735711204,300,4.2000,50.7500
1735711205,200,4.3060,50.8270
1735711206,100,4.3517,50.8503
1735711206,300,4.2000,50.7500
1735711207,200,4.3060,50.8270
1735711208,100,4.3517,50.8503
1735711208,300,4.2000,50.7500
1735711209,200,4.3060,50.8270
1735711210,100,4.3517,50.8503
1735711210,300,4.2000,50.7500
1735711211,200,4.3060,50.8270
1735711212,100,4.3517,50.8503
1735711212,300,4.2000,50.7500
1735711213,200,4.3060,50.8270
47 changes: 47 additions & 0 deletions Queries/berlinmod/q1_continuous.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# BerlinMOD-Q1 — continuous form
# "Which vehicles have appeared in the stream?"
# Per 1-second sliding bucket: emit (start, end, vehicle_id, event-count-in-bucket).
# Reading N rows over consecutive buckets enumerates the distinct-vehicles-seen set.

query: |
SELECT start,
end,
vehicle_id,
COUNT(time_utc) AS events
FROM berlinmod_stream
GROUP BY vehicle_id
WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC)
INTO file_sink;

sinks:
- name: FILE_SINK
type: File
schema:
- { name: BERLINMOD_STREAM$START, type: UINT64 }
- { name: BERLINMOD_STREAM$END, type: UINT64 }
- { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 }
- { name: BERLINMOD_STREAM$EVENTS, type: UINT64 }
config:
file_path: "/workspace/Output/output_berlinmod_q1_continuous.csv"
input_format: CSV

logical:
- name: BERLINMOD_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: VEHICLE_ID, type: UINT64 }
- { name: GPS_LON, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }

physical:
- logical: BERLINMOD_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32325"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
46 changes: 46 additions & 0 deletions Queries/berlinmod/q1_snapshot.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# BerlinMOD-Q1 — snapshot form
# "At each 5-second tick, list of distinct vehicles seen in the tick window."
# Streaming approximation of the batch BerlinMOD-Q1 snapshot at time T.

query: |
SELECT start,
end,
vehicle_id,
COUNT(time_utc) AS events
FROM berlinmod_stream
GROUP BY vehicle_id
WINDOW TUMBLING(time_utc, SIZE 5 SEC)
INTO file_sink;

sinks:
- name: FILE_SINK
type: File
schema:
- { name: BERLINMOD_STREAM$START, type: UINT64 }
- { name: BERLINMOD_STREAM$END, type: UINT64 }
- { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 }
- { name: BERLINMOD_STREAM$EVENTS, type: UINT64 }
config:
file_path: "/workspace/Output/output_berlinmod_q1_snapshot.csv"
input_format: CSV

logical:
- name: BERLINMOD_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: VEHICLE_ID, type: UINT64 }
- { name: GPS_LON, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }

physical:
- logical: BERLINMOD_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32325"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
46 changes: 46 additions & 0 deletions Queries/berlinmod/q1_windowed.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# BerlinMOD-Q1 — windowed form
# "Per 10-second tumbling window, distinct vehicles seen."
# Emits one row per (window, vehicle) seen; reading N rows per window = distinctCount.

query: |
SELECT start,
end,
vehicle_id,
COUNT(time_utc) AS events
FROM berlinmod_stream
GROUP BY vehicle_id
WINDOW TUMBLING(time_utc, SIZE 10 SEC)
INTO file_sink;

sinks:
- name: FILE_SINK
type: File
schema:
- { name: BERLINMOD_STREAM$START, type: UINT64 }
- { name: BERLINMOD_STREAM$END, type: UINT64 }
- { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 }
- { name: BERLINMOD_STREAM$EVENTS, type: UINT64 }
config:
file_path: "/workspace/Output/output_berlinmod_q1_windowed.csv"
input_format: CSV

logical:
- name: BERLINMOD_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: VEHICLE_ID, type: UINT64 }
- { name: GPS_LON, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }

physical:
- logical: BERLINMOD_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32325"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
44 changes: 44 additions & 0 deletions Queries/berlinmod/q2_continuous.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# BerlinMOD-Q2 — continuous form
# "Where is vehicle X (= 200) right now?"
# Per 1-second sliding bucket, emit a trajectory snippet for vehicle X.

query: |
SELECT start,
end,
TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory
FROM berlinmod_stream
WHERE vehicle_id = UINT64(200)
WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC)
INTO file_sink;

sinks:
- name: FILE_SINK
type: File
schema:
- { name: BERLINMOD_STREAM$START, type: UINT64 }
- { name: BERLINMOD_STREAM$END, type: UINT64 }
- { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED }
config:
file_path: "/workspace/Output/output_berlinmod_q2_continuous.csv"
input_format: CSV

logical:
- name: BERLINMOD_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: VEHICLE_ID, type: UINT64 }
- { name: GPS_LON, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }

physical:
- logical: BERLINMOD_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32325"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
43 changes: 43 additions & 0 deletions Queries/berlinmod/q2_snapshot.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# BerlinMOD-Q2 — snapshot form
# "At each 5-second tick, snapshot of vehicle X's (= 200) trajectory in the tick."

query: |
SELECT start,
end,
TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory
FROM berlinmod_stream
WHERE vehicle_id = UINT64(200)
WINDOW TUMBLING(time_utc, SIZE 5 SEC)
INTO file_sink;

sinks:
- name: FILE_SINK
type: File
schema:
- { name: BERLINMOD_STREAM$START, type: UINT64 }
- { name: BERLINMOD_STREAM$END, type: UINT64 }
- { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED }
config:
file_path: "/workspace/Output/output_berlinmod_q2_snapshot.csv"
input_format: CSV

logical:
- name: BERLINMOD_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: VEHICLE_ID, type: UINT64 }
- { name: GPS_LON, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }

physical:
- logical: BERLINMOD_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32325"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
43 changes: 43 additions & 0 deletions Queries/berlinmod/q2_windowed.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# BerlinMOD-Q2 — windowed form
# "Per 10-second tumbling window, trajectory of vehicle X (= 200)."

query: |
SELECT start,
end,
TEMPORAL_SEQUENCE(gps_lon, gps_lat, time_utc) AS trajectory
FROM berlinmod_stream
WHERE vehicle_id = UINT64(200)
WINDOW TUMBLING(time_utc, SIZE 10 SEC)
INTO file_sink;

sinks:
- name: FILE_SINK
type: File
schema:
- { name: BERLINMOD_STREAM$START, type: UINT64 }
- { name: BERLINMOD_STREAM$END, type: UINT64 }
- { name: BERLINMOD_STREAM$TRAJECTORY, type: VARSIZED }
config:
file_path: "/workspace/Output/output_berlinmod_q2_windowed.csv"
input_format: CSV

logical:
- name: BERLINMOD_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: VEHICLE_ID, type: UINT64 }
- { name: GPS_LON, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }

physical:
- logical: BERLINMOD_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32325"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
49 changes: 49 additions & 0 deletions Queries/berlinmod/q3_continuous.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# BerlinMOD-Q3 — continuous form
# "Vehicles within 5 km of Brussels city centre, right now."
# Per 1-second sliding bucket, emit (start, end, vehicle_id) for events near P.

query: |
SELECT start,
end,
vehicle_id
FROM berlinmod_stream
WHERE edwithin_tgeo_geo(gps_lon,
gps_lat,
time_utc,
'SRID=4326;POINT(4.3517 50.8503)',
FLOAT64(5000.0)) = INT32(1)
GROUP BY vehicle_id
WINDOW SLIDING(time_utc, SIZE 1 SEC, ADVANCE BY 1 SEC)
INTO file_sink;

sinks:
- name: FILE_SINK
type: File
schema:
- { name: BERLINMOD_STREAM$START, type: UINT64 }
- { name: BERLINMOD_STREAM$END, type: UINT64 }
- { name: BERLINMOD_STREAM$VEHICLE_ID, type: UINT64 }
config:
file_path: "/workspace/Output/output_berlinmod_q3_continuous.csv"
input_format: CSV

logical:
- name: BERLINMOD_STREAM
schema:
- { name: TIME_UTC, type: UINT64 }
- { name: VEHICLE_ID, type: UINT64 }
- { name: GPS_LON, type: FLOAT64 }
- { name: GPS_LAT, type: FLOAT64 }

physical:
- logical: BERLINMOD_STREAM
type: TCP
parser_config:
type: CSV
field_delimiter: ","
tuple_delimiter: "\n"
source_config:
socket_host: "host.docker.internal"
socket_port: "32325"
socket_type: "SOCK_STREAM"
socket_domain: "AF_INET"
Loading
Loading