Skip to content

Commit 9b2f2f9

Browse files
feat(tables): write order_key on insert, flag-gate delete reindex + query ordering, add backfill
Flag off (default) = identical behavior. Single-insert assigns a fractional order_key; queryRows orders by order_key when the flag is on; deletes skip the O(N) reindex when on. Per-table-atomic backfill script populates existing rows.
1 parent 6c3edef commit 9b2f2f9

2 files changed

Lines changed: 197 additions & 7 deletions

File tree

apps/sim/lib/table/service.ts

Lines changed: 79 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@ import { createLogger } from '@sim/logger'
1818
import { getPostgresErrorCode } from '@sim/utils/errors'
1919
import { generateId } from '@sim/utils/id'
2020
import { and, count, eq, gt, gte, inArray, isNull, ne, or, type SQL, sql } from 'drizzle-orm'
21+
import { isTablesFractionalOrderingEnabled } from '@/lib/core/config/feature-flags'
2122
import { MATERIALIZE_CONCURRENCY, mapWithConcurrency } from '@/lib/core/utils/concurrency'
2223
import { generateRestoreName } from '@/lib/core/utils/restore-name'
2324
import type { DbOrTx } from '@/lib/db/types'
2425
import { materializeExecutionData } from '@/lib/logs/execution/trace-store'
2526
import { COLUMN_TYPES, NAME_PATTERN, TABLE_LIMITS, USER_TABLE_ROWS_SQL_NAME } from './constants'
2627
import { areGroupDepsSatisfied } from './deps'
2728
import { CSV_MAX_BATCH_SIZE } from './import'
29+
import { keyBetween } from './order-key'
2830
import { buildFilterClause, buildSortClause } from './sql'
2931
import { fireTableTrigger } from './trigger'
3032
import type {
@@ -1148,9 +1150,48 @@ export function buildOrderedRowValues(opts: {
11481150
}
11491151

11501152
/**
1151-
* Inserts a single row in its own transaction: sets timeouts, reserves the
1152-
* position, and inserts. Validation and side-effect dispatch stay with the
1153-
* caller. Capacity is enforced by the `increment_user_table_row_count` trigger.
1153+
* Computes the fractional `order_key` for a row being inserted at
1154+
* `requestedPosition` (or appended when omitted). Neighbors are resolved by the
1155+
* current `position` order — valid because keys are kept consistent with
1156+
* position order while the flag is off. Caller holds the row-order lock.
1157+
*
1158+
* NOTE: flag-on insert-*at a position* will resolve neighbors by `order_key`
1159+
* (via beforeRowId/afterRowId) once the wire contract carries them; until then
1160+
* append (the common path) is exact and at-position is position-derived.
1161+
*/
1162+
async function resolveInsertOrderKey(
1163+
trx: DbTransaction,
1164+
tableId: string,
1165+
requestedPosition?: number
1166+
): Promise<string> {
1167+
const orderKeyAtPosition = async (pos: number): Promise<string | null> => {
1168+
if (pos < 0) return null
1169+
const [r] = await trx
1170+
.select({ orderKey: userTableRows.orderKey })
1171+
.from(userTableRows)
1172+
.where(and(eq(userTableRows.tableId, tableId), eq(userTableRows.position, pos)))
1173+
.limit(1)
1174+
return r?.orderKey ?? null
1175+
}
1176+
if (requestedPosition === undefined) {
1177+
const [{ maxKey }] = await trx
1178+
.select({ maxKey: sql<string | null>`max(${userTableRows.orderKey})` })
1179+
.from(userTableRows)
1180+
.where(eq(userTableRows.tableId, tableId))
1181+
return keyBetween(maxKey ?? null, null)
1182+
}
1183+
const lo = await orderKeyAtPosition(requestedPosition - 1)
1184+
const hi = await orderKeyAtPosition(requestedPosition)
1185+
return keyBetween(lo, hi)
1186+
}
1187+
1188+
/**
1189+
* Inserts a single row in its own transaction. Always assigns a fractional
1190+
* `order_key`. When the fractional-ordering flag is on, `order_key` is
1191+
* authoritative and `position` is a best-effort append (no O(N) shift); when
1192+
* off, `position` is reserved as before (shifting to open the slot). Validation
1193+
* and side-effect dispatch stay with the caller; capacity is enforced by the
1194+
* `increment_user_table_row_count` trigger.
11541195
*/
11551196
async function insertOrderedRow(params: {
11561197
tableId: string
@@ -1164,7 +1205,25 @@ async function insertOrderedRow(params: {
11641205
const { tableId, workspaceId, data, rowId, position, createdBy, now } = params
11651206
const [row] = await db.transaction(async (trx) => {
11661207
await setTableTxTimeouts(trx)
1167-
const targetPosition = await reserveInsertPosition(trx, tableId, position)
1208+
await acquireRowOrderLock(trx, tableId)
1209+
const orderKey = await resolveInsertOrderKey(trx, tableId, position)
1210+
1211+
let targetPosition: number
1212+
if (isTablesFractionalOrderingEnabled) {
1213+
// order_key is authoritative — keep a best-effort, no-shift position.
1214+
targetPosition = await nextRowPosition(trx, tableId)
1215+
} else if (position !== undefined) {
1216+
const [existing] = await trx
1217+
.select({ id: userTableRows.id })
1218+
.from(userTableRows)
1219+
.where(and(eq(userTableRows.tableId, tableId), eq(userTableRows.position, position)))
1220+
.limit(1)
1221+
if (existing) await shiftRowsUpFrom(trx, tableId, position)
1222+
targetPosition = position
1223+
} else {
1224+
targetPosition = await nextRowPosition(trx, tableId)
1225+
}
1226+
11681227
return trx
11691228
.insert(userTableRows)
11701229
.values({
@@ -1173,6 +1232,7 @@ async function insertOrderedRow(params: {
11731232
workspaceId,
11741233
data,
11751234
position: targetPosition,
1235+
orderKey,
11761236
createdAt: now,
11771237
updatedAt: now,
11781238
...(createdBy ? { createdBy } : {}),
@@ -1211,7 +1271,11 @@ async function deleteOrderedRow(params: {
12111271
)
12121272
.returning({ position: userTableRows.position })
12131273
if (!deleted) return false
1214-
await shiftRowsDownAfter(trx, tableId, deleted.position)
1274+
// Fractional ordering: deleting a row never changes another row's order_key,
1275+
// so the O(N) position reshift is skipped entirely.
1276+
if (!isTablesFractionalOrderingEnabled) {
1277+
await shiftRowsDownAfter(trx, tableId, deleted.position)
1278+
}
12151279
return true
12161280
})
12171281
}
@@ -1246,7 +1310,8 @@ async function deleteOrderedRowsByIds(params: {
12461310
.returning({ id: userTableRows.id, position: userTableRows.position })
12471311
deleted.push(...rows)
12481312
}
1249-
if (deleted.length > 0) {
1313+
// Fractional ordering: deletes leave order_key untouched, so no recompaction.
1314+
if (!isTablesFractionalOrderingEnabled && deleted.length > 0) {
12501315
const minDeletedPos = deleted.reduce(
12511316
(min, r) => (r.position < min ? r.position : min),
12521317
deleted[0].position
@@ -2154,7 +2219,14 @@ export async function queryRows(
21542219
.from(userTableRows)
21552220
.where(whereClause ?? baseConditions)
21562221
if (orderByClause) {
2157-
query = query.orderBy(orderByClause) as typeof query
2222+
// Explicit data-column sort: tiebreak by the default order for stability.
2223+
query = query.orderBy(
2224+
orderByClause,
2225+
isTablesFractionalOrderingEnabled ? userTableRows.orderKey : userTableRows.position,
2226+
userTableRows.id
2227+
) as typeof query
2228+
} else if (isTablesFractionalOrderingEnabled) {
2229+
query = query.orderBy(userTableRows.orderKey, userTableRows.id) as typeof query
21582230
} else {
21592231
query = query.orderBy(userTableRows.position) as typeof query
21602232
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
#!/usr/bin/env bun
2+
3+
/**
4+
* Backfills the `order_key` column on `user_table_rows`.
5+
*
6+
* Row ordering is moving from the contiguous integer `position` to a fractional
7+
* string `order_key` (O(1) insert/delete — no reshift/recompact). This script
8+
* assigns each existing row a key derived from its current `position` order, so
9+
* the new ordering matches today's once the `TABLES_FRACTIONAL_ORDERING` flag is
10+
* flipped on.
11+
*
12+
* Per-table-atomic: each table is keyed inside one transaction holding the same
13+
* per-table advisory lock the app uses for inserts, so a concurrent insert can't
14+
* interleave. Idempotent: tables already fully keyed are skipped; a table with
15+
* any NULL key is fully re-keyed from `position` order (deterministic, so a
16+
* re-run after a partial failure is safe).
17+
*
18+
* Usage:
19+
* DATABASE_URL=... bun run apps/sim/scripts/backfill-table-order-keys.ts
20+
* DATABASE_URL=... bun run apps/sim/scripts/backfill-table-order-keys.ts --dry-run
21+
*/
22+
23+
import { userTableRows } from '@sim/db/schema'
24+
import { getErrorMessage } from '@sim/utils/errors'
25+
import { asc, eq, isNull, sql } from 'drizzle-orm'
26+
import { drizzle } from 'drizzle-orm/postgres-js'
27+
import postgres from 'postgres'
28+
import { nKeysBetween } from '@/lib/table/order-key'
29+
30+
export async function runBackfill(): Promise<void> {
31+
const dryRun = process.argv.includes('--dry-run')
32+
const connectionString = process.env.DATABASE_URL ?? process.env.POSTGRES_URL
33+
if (!connectionString) {
34+
console.error('Missing DATABASE_URL or POSTGRES_URL')
35+
process.exit(1)
36+
}
37+
38+
const client = postgres(connectionString, {
39+
prepare: false,
40+
idle_timeout: 20,
41+
connect_timeout: 30,
42+
max: 5,
43+
onnotice: () => {},
44+
})
45+
const db = drizzle(client)
46+
47+
const stats = { tables: 0, tablesKeyed: 0, rowsKeyed: 0, failed: 0 }
48+
49+
try {
50+
// Tables that still have at least one un-keyed row.
51+
const pending = await db
52+
.selectDistinct({ tableId: userTableRows.tableId })
53+
.from(userTableRows)
54+
.where(isNull(userTableRows.orderKey))
55+
56+
console.log(
57+
`Backfill starting — ${pending.length} table(s) with NULL order_key${dryRun ? ' [DRY RUN]' : ''}`
58+
)
59+
60+
for (const { tableId } of pending) {
61+
stats.tables += 1
62+
try {
63+
const keyed = await db.transaction(async (trx) => {
64+
// Serialize with concurrent inserts on this table (same lock the app uses).
65+
await trx.execute(
66+
sql`SELECT pg_advisory_xact_lock(hashtextextended(${`user_table_rows_pos:${tableId}`}, 0))`
67+
)
68+
const rows = await trx
69+
.select({ id: userTableRows.id })
70+
.from(userTableRows)
71+
.where(eq(userTableRows.tableId, tableId))
72+
.orderBy(asc(userTableRows.position), asc(userTableRows.id))
73+
74+
if (rows.length === 0) return 0
75+
const keys = nKeysBetween(null, null, rows.length)
76+
if (dryRun) return rows.length
77+
78+
// One UPDATE … FROM (VALUES …) mapping id → key.
79+
const values = sql.join(
80+
rows.map((r, i) => sql`(${r.id}, ${keys[i]})`),
81+
sql`, `
82+
)
83+
await trx.execute(sql`
84+
UPDATE user_table_rows AS t
85+
SET order_key = v.order_key
86+
FROM (VALUES ${values}) AS v(id, order_key)
87+
WHERE t.id = v.id AND t.table_id = ${tableId}
88+
`)
89+
return rows.length
90+
})
91+
stats.tablesKeyed += 1
92+
stats.rowsKeyed += keyed
93+
console.log(` ${tableId}: keyed ${keyed} rows`)
94+
} catch (error) {
95+
stats.failed += 1
96+
console.error(` ${tableId}: FAILED — ${getErrorMessage(error)}`)
97+
}
98+
}
99+
100+
console.log('Backfill complete.')
101+
console.log(` tables scanned: ${stats.tables}`)
102+
console.log(` tables keyed: ${stats.tablesKeyed}`)
103+
console.log(` rows keyed: ${stats.rowsKeyed}`)
104+
console.log(` failed: ${stats.failed}`)
105+
if (stats.failed > 0) process.exitCode = 1
106+
} finally {
107+
await client.end({ timeout: 5 }).catch(() => {})
108+
}
109+
}
110+
111+
if ((import.meta as { main?: boolean }).main) {
112+
try {
113+
await runBackfill()
114+
} catch (error) {
115+
console.error('Backfill aborted:', getErrorMessage(error))
116+
process.exitCode = 1
117+
}
118+
}

0 commit comments

Comments
 (0)