Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 114 additions & 1 deletion internal/cbm/sqlite_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,82 @@ static uint8_t *build_table_cell(int64_t rowid, const uint8_t *payload, int payl
return cell;
}

// Assemble a table-leaf cell whose payload spills onto overflow pages.
//
// Cell layout, in order:
//   varint(total_payload_len)   full record size, NOT just the inline part
//   varint(rowid)
//   payload[0 .. local_len)     the portion kept inline on the leaf page
//   uint32(overflow_page)       page number of the first overflow page
//
// On success returns a malloc'd buffer and stores its length in *out_cell_len.
// Returns NULL (leaving *out_cell_len untouched) if the allocation fails.
static uint8_t *build_table_cell_overflow(int64_t rowid, const uint8_t *payload,
                                          int total_payload_len, int local_len,
                                          uint32_t overflow_page, int *out_cell_len) {
    enum { OVERFLOW_PTR_BYTES = 4 };

    const int header_bytes = varint_len(total_payload_len) + varint_len(rowid);
    uint8_t *buf = (uint8_t *)malloc(header_bytes + local_len + OVERFLOW_PTR_BYTES);
    if (buf == NULL) {
        return NULL;
    }

    // Walk a write cursor through the buffer instead of tracking an index.
    uint8_t *cursor = buf;
    cursor += put_varint(cursor, total_payload_len);
    cursor += put_varint(cursor, rowid);

    memcpy(cursor, payload, local_len);
    cursor += local_len;

    put_u32(cursor, overflow_page);
    cursor += OVERFLOW_PTR_BYTES;

    *out_cell_len = (int)(cursor - buf);
    return buf;
}

// --- Overflow page writer ---
// Writes data_len bytes of data to a chain of newly allocated overflow pages.
//
// Page numbers are taken from *next_page, which is advanced by one for every page
// this function emits, so a chain always occupies consecutive page numbers.
// Each overflow page is: 4-byte next-page pointer (0 on the last page of the chain)
// followed by up to (PAGE_SIZE - 4) bytes of data; the unused tail is zero-filled.
//
// Returns the first overflow page number (to be embedded in the leaf cell), or 0
// when data_len <= 0 or when a seek/write on fp fails. On failure *next_page may
// already have been advanced past the page that could not be written.
// NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
static uint32_t write_overflow_pages(FILE *fp, uint32_t *next_page, const uint8_t *data,
                                     int data_len) {
    const int per_page = PAGE_SIZE - 4; // 4 bytes reserved for next-page pointer
    uint32_t first_page = 0;

    int offset = 0;
    while (offset < data_len) {
        uint32_t pnum = (*next_page)++;
        if (first_page == 0) {
            first_page = pnum;
        }

        int chunk = data_len - offset;
        if (chunk > per_page) {
            chunk = per_page;
        }

        // Pages are handed out consecutively by this loop, so the successor is known
        // before the page is written: pnum + 1 while data remains, 0 at the end of
        // the chain. This avoids seeking back to patch the previous page afterwards.
        uint32_t next_ptr = (offset + chunk < data_len) ? pnum + 1 : 0;

        uint8_t page[PAGE_SIZE];
        memset(page, 0, PAGE_SIZE);
        put_u32(page, next_ptr);
        memcpy(page + 4, data + offset, (size_t)chunk);

        // Page numbers are 1-based; page N starts at byte (N - 1) * PAGE_SIZE.
        // NOTE(review): with a 32-bit long this offset overflows for files past
        // 2 GiB — confirm whether that platform/size combination matters here.
        long page_offset = (long)(pnum - 1) * PAGE_SIZE;
        if (fseek(fp, page_offset, SEEK_SET) != 0) {
            return 0; // report failure so the caller does not reference a broken chain
        }
        if (fwrite(page, 1, PAGE_SIZE, fp) != (size_t)PAGE_SIZE) {
            return 0; // short write: the chain is incomplete
        }

        offset += chunk;
    }
    return first_page;
}

// --- Index record builders ---

// Build an index entry for a 2-column TEXT index (project, col) + rowid.
Expand Down Expand Up @@ -840,12 +916,49 @@ static void pb_ensure_leaf_cap(PageBuilder *pb) {
}
}

// Local-payload limits for cells on table-leaf B-tree pages, mirroring the values
// SQLite's btree.c derives (pBt->maxLeaf and pBt->minLeaf) for PAGE_SIZE = 65536
// with no reserved bytes, i.e. usableSize = 65536:
//
//   maxLeaf = usableSize - 35                    = 65501
//   minLeaf = (usableSize - 12) * 32 / 255 - 23  = 8199   (C integer division)
//
// The macros are written as the formulas themselves, with usableSize substituted,
// so they can be compared against SQLite directly. They must be revisited if the
// page size or reserved space changes.
#define TABLE_OVERFLOW_MAX_LOCAL (65536 - 35)
#define TABLE_OVERFLOW_MIN_LOCAL ((65536 - 12) * 32 / 255 - 23)

// Add a table cell to the PageBuilder, flushing leaf pages as needed.
// If the payload exceeds max_local, overflow pages are written and only the
// local portion plus a 4-byte overflow page pointer is stored in the leaf cell.
static void pb_add_table_cell_with_flush(PageBuilder *pb, int64_t rowid, const uint8_t *payload,
// NOLINTNEXTLINE(bugprone-easily-swappable-parameters)
int payload_len, int64_t prev_rowid) {
int cell_len = 0;
uint8_t *cell = build_table_cell(rowid, payload, payload_len, &cell_len);
uint8_t *cell = NULL;

if (payload_len > TABLE_OVERFLOW_MAX_LOCAL) {
// Compute local_len per SQLite spec §overflow-pages for leaf table cells.
int local_len =
TABLE_OVERFLOW_MIN_LOCAL +
((payload_len - TABLE_OVERFLOW_MIN_LOCAL) % (PAGE_SIZE - 4));
if (local_len > TABLE_OVERFLOW_MAX_LOCAL) {
local_len = TABLE_OVERFLOW_MIN_LOCAL;
}

// Write overflow pages for the bytes that don't fit locally.
uint32_t overflow_page =
write_overflow_pages(pb->fp, &pb->next_page, payload + local_len,
payload_len - local_len);
if (overflow_page == 0) {
return; // overflow write failed
}

cell = build_table_cell_overflow(rowid, payload, payload_len, local_len, overflow_page,
&cell_len);
} else {
cell = build_table_cell(rowid, payload, payload_len, &cell_len);
}

if (!cell) {
return;
}
Expand Down
64 changes: 64 additions & 0 deletions tests/test_sqlite_writer.c
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,75 @@ TEST(sw_multi_page) {
PASS();
}

/* ── Oversized node: properties JSON > 65KB triggers overflow pages ─ */

/* Writes a single node whose properties string is large enough to push the
 * record past max_local, then reopens the file with SQLite and checks that the
 * database passes integrity_check and that the row can be read back.
 * Every sqlite3 call result is asserted so a failed prepare is reported at the
 * call that failed rather than surfacing later as a misleading step error. */
TEST(sw_oversized_node) {
    char path[256];
    ASSERT_EQ(make_temp_db(path, sizeof(path)), 0);

    /* Build a properties JSON string that exceeds max_local (65501 bytes).
     * Use 70000 bytes of padding inside the JSON value so the full record,
     * which includes other text columns, is well above the threshold. */
    int prop_len = 70000;
    char *big_props = (char *)malloc(prop_len + 1);
    ASSERT_NOT_NULL(big_props);
    memset(big_props, 'x', prop_len);
    big_props[0] = '"';
    big_props[prop_len - 1] = '"';
    big_props[prop_len] = '\0';

    CBMDumpNode nodes[1] = {{
        .id = 1,
        .project = "test",
        .label = "Function",
        .name = "huge_fn",
        .qualified_name = "test.huge_fn",
        .file_path = "huge.go",
        .start_line = 1,
        .end_line = 9999,
        .properties = big_props,
    }};

    int rc = cbm_write_db(path, "test", "/tmp/test", "2026-03-28T00:00:00Z", nodes, 1, NULL, 0);
    free(big_props); /* released before asserting so a failed write does not leak it */
    ASSERT_EQ(rc, 0);

    sqlite3 *db = NULL;
    rc = sqlite3_open(path, &db);
    ASSERT_EQ(rc, SQLITE_OK);

    /* Integrity check — SQLite will validate overflow page chain */
    sqlite3_stmt *stmt = NULL;
    rc = sqlite3_prepare_v2(db, "PRAGMA integrity_check", -1, &stmt, NULL);
    ASSERT_EQ(rc, SQLITE_OK);
    rc = sqlite3_step(stmt);
    ASSERT_EQ(rc, SQLITE_ROW);
    ASSERT_STR_EQ((const char *)sqlite3_column_text(stmt, 0), "ok");
    sqlite3_finalize(stmt);

    /* Verify we can read the node back */
    rc = sqlite3_prepare_v2(db, "SELECT COUNT(*) FROM nodes", -1, &stmt, NULL);
    ASSERT_EQ(rc, SQLITE_OK);
    rc = sqlite3_step(stmt);
    ASSERT_EQ(rc, SQLITE_ROW);
    ASSERT_EQ(sqlite3_column_int(stmt, 0), 1);
    sqlite3_finalize(stmt);

    /* Verify the name round-trips correctly */
    rc = sqlite3_prepare_v2(db, "SELECT name FROM nodes WHERE id=1", -1, &stmt, NULL);
    ASSERT_EQ(rc, SQLITE_OK);
    rc = sqlite3_step(stmt);
    ASSERT_EQ(rc, SQLITE_ROW);
    ASSERT_STR_EQ((const char *)sqlite3_column_text(stmt, 0), "huge_fn");
    sqlite3_finalize(stmt);

    sqlite3_close(db);
    unlink(path);
    PASS();
}

/* ── Suite ─────────────────────────────────────────────────────── */

/* sqlite_writer suite: the test cases handed to RUN_TEST, in the order listed.
 * sw_oversized_node is the case that writes a record larger than max_local and
 * so exercises the overflow-page path. */
SUITE(sqlite_writer) {
RUN_TEST(sw_minimal_data);
RUN_TEST(sw_scale_and_indexes);
RUN_TEST(sw_empty);
RUN_TEST(sw_multi_page);
RUN_TEST(sw_oversized_node);
}