Skip to content

Commit 20d7810

Browse files
feat(tables): write order_key on all insert paths (batch, upsert, replace, import, create, copilot)
Completes the always-write-keys prerequisite: every row insert now assigns a fractional order_key consistent with position order, so the flag can be flipped safely after backfill. Flag off (default) still = identical behavior.
1 parent 9b2f2f9 commit 20d7810

2 files changed

Lines changed: 74 additions & 9 deletions

File tree

apps/sim/lib/copilot/request/tools/tables.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { TraceSpan } from '@/lib/copilot/generated/trace-spans-v1'
1313
import { withCopilotSpan } from '@/lib/copilot/request/otel'
1414
import type { ExecutionContext, ToolCallResult } from '@/lib/copilot/request/types'
1515
import type { RowData } from '@/lib/table'
16+
import { nKeysBetween } from '@/lib/table/order-key'
1617
import { buildOrderedRowValues, getTableById } from '@/lib/table/service'
1718

1819
const logger = createLogger('CopilotToolResultTables')
@@ -103,6 +104,8 @@ export async function maybeWriteOutputToTable(
103104
await tx.delete(userTableRows).where(eq(userTableRows.tableId, outputTable))
104105

105106
const now = new Date()
107+
// Replace-all: table was just cleared — mint a fresh contiguous key run.
108+
const orderKeys = nKeysBetween(null, null, rows.length)
106109
for (let i = 0; i < rows.length; i += BATCH_CHUNK_SIZE) {
107110
if (context.abortSignal?.aborted) {
108111
throw new Error('Request aborted before tool mutation could be applied')
@@ -113,6 +116,7 @@ export async function maybeWriteOutputToTable(
113116
workspaceId: context.workspaceId!,
114117
rows: chunk as RowData[],
115118
startPosition: i,
119+
orderKeys: orderKeys.slice(i, i + BATCH_CHUNK_SIZE),
116120
now,
117121
createdBy: context.userId,
118122
makeId: () => `row_${generateId().replace(/-/g, '')}`,
@@ -246,6 +250,8 @@ export async function maybeWriteReadCsvToTable(
246250
await tx.delete(userTableRows).where(eq(userTableRows.tableId, outputTable))
247251

248252
const now = new Date()
253+
// Replace-all: table was just cleared — mint a fresh contiguous key run.
254+
const orderKeys = nKeysBetween(null, null, rows.length)
249255
for (let i = 0; i < rows.length; i += BATCH_CHUNK_SIZE) {
250256
if (context.abortSignal?.aborted) {
251257
throw new Error('Request aborted before tool mutation could be applied')
@@ -256,6 +262,7 @@ export async function maybeWriteReadCsvToTable(
256262
workspaceId: context.workspaceId!,
257263
rows: chunk as RowData[],
258264
startPosition: i,
265+
orderKeys: orderKeys.slice(i, i + BATCH_CHUNK_SIZE),
259266
now,
260267
createdBy: context.userId,
261268
makeId: () => `row_${generateId().replace(/-/g, '')}`,

apps/sim/lib/table/service.ts

Lines changed: 67 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import { materializeExecutionData } from '@/lib/logs/execution/trace-store'
2626
import { COLUMN_TYPES, NAME_PATTERN, TABLE_LIMITS, USER_TABLE_ROWS_SQL_NAME } from './constants'
2727
import { areGroupDepsSatisfied } from './deps'
2828
import { CSV_MAX_BATCH_SIZE } from './import'
29-
import { keyBetween } from './order-key'
29+
import { keyBetween, nKeysBetween } from './order-key'
3030
import { buildFilterClause, buildSortClause } from './sql'
3131
import { fireTableTrigger } from './trigger'
3232
import type {
@@ -450,11 +450,13 @@ export async function createTable(
450450

451451
const initialRowCount = data.initialRowCount ?? 0
452452
if (initialRowCount > 0) {
453+
const orderKeys = nKeysBetween(null, null, initialRowCount)
453454
const rowsToInsert = Array.from({ length: initialRowCount }, (_, i) => ({
454455
id: `row_${generateId().replace(/-/g, '')}`,
455456
tableId,
456457
data: {},
457458
position: i,
459+
orderKey: orderKeys[i],
458460
workspaceId: data.workspaceId,
459461
createdAt: now,
460462
updatedAt: now,
@@ -1109,40 +1111,44 @@ async function compactPositions(trx: DbTransaction, tableId: string, minDeletedP
11091111
`)
11101112
}
11111113

1112-
/** A row value ready to INSERT into `user_table_rows`, with its assigned position. */
1114+
/** A row value ready to INSERT into `user_table_rows`, with its assigned order. */
11131115
export interface OrderedRowValue {
11141116
id: string
11151117
tableId: string
11161118
workspaceId: string
11171119
data: RowData
11181120
position: number
1121+
orderKey: string
11191122
createdAt: Date
11201123
updatedAt: Date
11211124
createdBy?: string
11221125
}
11231126

11241127
/**
11251128
* Builds INSERT values for a contiguous run of rows, assigning sequential
1126-
* positions `startPosition, startPosition + 1, …`. Centralizes position
1127-
* assignment for callers that write a fresh ordered run (e.g. the copilot tool's
1128-
* replace-all write).
1129+
* positions `startPosition + i` and the supplied `orderKeys[i]`. Centralizes
1130+
* row assignment for callers that write a fresh ordered run (e.g. the copilot
1131+
* tool's replace-all write). `orderKeys` must be index-aligned with `rows` —
1132+
* mint them once for the whole run with {@link nKeysBetween}.
11291133
*/
11301134
export function buildOrderedRowValues(opts: {
11311135
tableId: string
11321136
workspaceId: string
11331137
rows: RowData[]
11341138
startPosition: number
1139+
orderKeys: string[]
11351140
now: Date
11361141
createdBy?: string
11371142
makeId: () => string
11381143
}): OrderedRowValue[] {
1139-
const { tableId, workspaceId, rows, startPosition, now, createdBy, makeId } = opts
1144+
const { tableId, workspaceId, rows, startPosition, orderKeys, now, createdBy, makeId } = opts
11401145
return rows.map((data, i) => ({
11411146
id: makeId(),
11421147
tableId,
11431148
workspaceId,
11441149
data,
11451150
position: startPosition + i,
1151+
orderKey: orderKeys[i],
11461152
createdAt: now,
11471153
updatedAt: now,
11481154
...(createdBy ? { createdBy } : {}),
@@ -1185,6 +1191,32 @@ async function resolveInsertOrderKey(
11851191
return keyBetween(lo, hi)
11861192
}
11871193

1194+
/**
1195+
* Computes fractional `order_key`s for a batch insert. With no `positions`,
1196+
* appends a contiguous run after the current max key. With explicit `positions`
1197+
* (undo restore), keys each row between its pre-shift position neighbors —
1198+
* correct because requested positions are distinct. Caller holds the lock.
1199+
*/
1200+
async function resolveBatchInsertOrderKeys(
1201+
trx: DbTransaction,
1202+
tableId: string,
1203+
count: number,
1204+
positions?: number[]
1205+
): Promise<string[]> {
1206+
if (!positions || positions.length === 0) {
1207+
const [{ maxKey }] = await trx
1208+
.select({ maxKey: sql<string | null>`max(${userTableRows.orderKey})` })
1209+
.from(userTableRows)
1210+
.where(eq(userTableRows.tableId, tableId))
1211+
return nKeysBetween(maxKey ?? null, null, count)
1212+
}
1213+
const keys: string[] = []
1214+
for (const pos of positions) {
1215+
keys.push(await resolveInsertOrderKey(trx, tableId, pos))
1216+
}
1217+
return keys
1218+
}
1219+
11881220
/**
11891221
* Inserts a single row in its own transaction. Always assigns a fractional
11901222
* `order_key`. When the fractional-ordering flag is on, `order_key` is
@@ -1475,19 +1507,34 @@ export async function batchInsertRowsWithTx(
14751507

14761508
await setTableTxTimeouts(trx, { statementMs: 60_000 })
14771509

1478-
const buildRow = (rowData: RowData, position: number) => ({
1510+
const buildRow = (rowData: RowData, position: number, orderKey: string) => ({
14791511
id: `row_${generateId().replace(/-/g, '')}`,
14801512
tableId: data.tableId,
14811513
workspaceId: data.workspaceId,
14821514
data: rowData,
14831515
position,
1516+
orderKey,
14841517
createdAt: now,
14851518
updatedAt: now,
14861519
...(data.userId ? { createdBy: data.userId } : {}),
14871520
})
14881521

1489-
const positions = await reserveBatchPositions(trx, data.tableId, data.rows.length, data.positions)
1490-
const rowsToInsert = data.rows.map((rowData, i) => buildRow(rowData, positions[i]))
1522+
await acquireRowOrderLock(trx, data.tableId)
1523+
const orderKeys = await resolveBatchInsertOrderKeys(
1524+
trx,
1525+
data.tableId,
1526+
data.rows.length,
1527+
data.positions
1528+
)
1529+
let positions: number[]
1530+
if (isTablesFractionalOrderingEnabled) {
1531+
// order_key authoritative — best-effort append positions, no shift.
1532+
const start = await nextRowPosition(trx, data.tableId)
1533+
positions = Array.from({ length: data.rows.length }, (_, i) => start + i)
1534+
} else {
1535+
positions = await reserveBatchPositions(trx, data.tableId, data.rows.length, data.positions)
1536+
}
1537+
const rowsToInsert = data.rows.map((rowData, i) => buildRow(rowData, positions[i], orderKeys[i]))
14911538
const insertedRows = await trx.insert(userTableRows).values(rowsToInsert).returning()
14921539

14931540
logger.info(`[${requestId}] Batch inserted ${data.rows.length} rows into table ${data.tableId}`)
@@ -1586,12 +1633,19 @@ export async function bulkInsertImportBatch(
15861633
}
15871634

15881635
const now = new Date()
1636+
// Import worker is the table's sole writer; append keys after the current max.
1637+
const [{ maxKey }] = await db
1638+
.select({ maxKey: sql<string | null>`max(${userTableRows.orderKey})` })
1639+
.from(userTableRows)
1640+
.where(eq(userTableRows.tableId, data.tableId))
1641+
const orderKeys = nKeysBetween(maxKey ?? null, null, data.rows.length)
15891642
const rowsToInsert = data.rows.map((rowData, i) => ({
15901643
id: `row_${generateId().replace(/-/g, '')}`,
15911644
tableId: data.tableId,
15921645
workspaceId: data.workspaceId,
15931646
data: rowData,
15941647
position: data.startPosition + i,
1648+
orderKey: orderKeys[i],
15951649
createdAt: now,
15961650
updatedAt: now,
15971651
...(data.userId ? { createdBy: data.userId } : {}),
@@ -1862,12 +1916,15 @@ export async function replaceTableRowsWithTx(
18621916

18631917
let insertedCount = 0
18641918
if (data.rows.length > 0) {
1919+
// All prior rows were just deleted — assign a fresh contiguous key run.
1920+
const orderKeys = nKeysBetween(null, null, data.rows.length)
18651921
const rowsToInsert = data.rows.map((rowData, i) => ({
18661922
id: `row_${generateId().replace(/-/g, '')}`,
18671923
tableId: data.tableId,
18681924
workspaceId: data.workspaceId,
18691925
data: rowData,
18701926
position: i,
1927+
orderKey: orderKeys[i],
18711928
createdAt: now,
18721929
updatedAt: now,
18731930
...(data.userId ? { createdBy: data.userId } : {}),
@@ -2106,6 +2163,7 @@ export async function upsertRow(
21062163
workspaceId: data.workspaceId,
21072164
data: data.data,
21082165
position: await reserveInsertPosition(trx, data.tableId),
2166+
orderKey: await resolveInsertOrderKey(trx, data.tableId),
21092167
createdAt: now,
21102168
updatedAt: now,
21112169
...(data.userId ? { createdBy: data.userId } : {}),

0 commit comments

Comments
 (0)