Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions bin/pg_repack.c
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ typedef struct repack_table
const char *create_log; /* CREATE TABLE log */
const char *create_trigger; /* CREATE TRIGGER repack_trigger */
const char *enable_trigger; /* ALTER TABLE ENABLE ALWAYS TRIGGER repack_trigger */
const char *create_sequence; /* CREATE SEQUENCE track_insert */
const char *create_table; /* CREATE TABLE table AS SELECT WITH NO DATA*/
const char *dest_tablespace; /* Destination tablespace */
const char *copy_data; /* INSERT INTO */
Expand Down Expand Up @@ -927,6 +928,7 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
table.create_log = getstr(res, i, c++);
table.create_trigger = getstr(res, i, c++);
table.enable_trigger = getstr(res, i, c++);
table.create_sequence = getstr(res, i, c++);

table.create_table = getstr(res, i, c++);
getstr(res, i, c++); /* tablespace_orig is clobbered */
Expand Down Expand Up @@ -968,6 +970,7 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
appendStringInfoString(&copy_sql, " ORDER BY ");
appendStringInfoString(&copy_sql, orderby);
}
appendStringInfo(&copy_sql, " RETURNING nextval('repack.track_insert_%u')", table.target_oid);
table.copy_data = copy_sql.data;

repack_one_table(&table, orderby);
Expand Down Expand Up @@ -1265,6 +1268,7 @@ repack_one_table(repack_table *table, const char *orderby)
elog(DEBUG2, "create_log : %s", table->create_log);
elog(DEBUG2, "create_trigger : %s", table->create_trigger);
elog(DEBUG2, "enable_trigger : %s", table->enable_trigger);
elog(DEBUG2, "create_sequence : %s", table->create_sequence);
elog(DEBUG2, "create_table : %s", table->create_table);
elog(DEBUG2, "dest_tablespace : %s", table->dest_tablespace);
elog(DEBUG2, "copy_data : %s", table->copy_data);
Expand Down Expand Up @@ -1390,6 +1394,9 @@ repack_one_table(repack_table *table, const char *orderby)
command(table->create_trigger, 0, NULL);
temp_obj_num++;
command(table->enable_trigger, 0, NULL);
command(table->create_sequence, 0, NULL);
temp_obj_num++;

printfStringInfo(&sql, "SELECT repack.disable_autovacuum('repack.log_%u')", table->target_oid);
command(sql.data, 0, NULL);

Expand Down
147 changes: 147 additions & 0 deletions lib/pg_repack.sql.in
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,7 @@ CREATE VIEW repack.tables AS
'SELECT repack.create_log_table(' || R.oid || ')' AS create_log,
repack.get_create_trigger(R.oid, PK.indexrelid) AS create_trigger,
repack.get_enable_trigger(R.oid) as enable_trigger,
'CREATE SEQUENCE IF NOT EXISTS repack.track_insert_' || R.oid AS create_sequence,
'SELECT repack.create_table($1, $2)'::text AS create_table,
coalesce(S.spcname, S2.spcname) AS tablespace_orig,
'INSERT INTO repack.table_' || R.oid || ' SELECT ' || repack.get_columns_for_create_as(R.oid) || ' FROM ONLY ' || repack.oid2text(R.oid) AS copy_data,
Expand Down Expand Up @@ -373,6 +374,152 @@ CREATE FUNCTION repack.repack_drop(oid, int) RETURNS void AS
'MODULE_PATHNAME', 'repack_drop'
LANGUAGE C VOLATILE STRICT;

CREATE OR REPLACE FUNCTION repack.get_progress_all()
RETURNS TABLE(
table_oid oid,
table_full_name text,
current_rows bigint,
estimated_total bigint,
progress_percent numeric,
status text
)
AS $$
DECLARE
seq_rec RECORD;
seq_value bigint;
reltuples_float float4;
table_relname text;
table_nspname text;
BEGIN
FOR seq_rec IN
SELECT c.relname AS seq_name
FROM pg_class c
JOIN pg_namespace n ON c.relnamespace = n.oid
WHERE n.nspname = 'repack'
AND c.relkind = 'S'
AND c.relname ~ '^track_insert_[0-9]+$'
LOOP
BEGIN
table_oid := substring(seq_rec.seq_name FROM '^track_insert_([0-9]+)$')::oid;
EXCEPTION
WHEN invalid_text_representation THEN
table_oid := 0;
table_full_name := seq_rec.seq_name;
current_rows := 0;
estimated_total := 0;
progress_percent := 0;
status := 'INVALID SEQUENCE NAME FORMAT';
RETURN NEXT;
CONTINUE;
WHEN OTHERS THEN
table_oid := 0;
table_full_name := seq_rec.seq_name;
current_rows := 0;
estimated_total := 0;
progress_percent := 0;
status := format('OID EXTRACTION ERROR: %s', SQLERRM);
RETURN NEXT;
CONTINUE;
END;

BEGIN
SELECT c.relname, n.nspname, c.reltuples
INTO table_relname, table_nspname, reltuples_float
FROM pg_class c
JOIN pg_namespace n ON c.relnamespace = n.oid
WHERE c.oid = table_oid
AND c.relkind = 'r';

IF NOT FOUND THEN
table_full_name := format('<OID %s>', table_oid);
current_rows := 0;
estimated_total := 0;
progress_percent := 0;
status := 'TABLE NOT FOUND OR NOT A REGULAR TABLE';
RETURN NEXT;
CONTINUE;
END IF;

table_full_name := format('%I.%I', table_nspname, table_relname);
EXCEPTION WHEN OTHERS THEN
table_full_name := format('<OID %s>', table_oid);
current_rows := 0;
estimated_total := 0;
progress_percent := 0;
status := 'ERROR FETCHING TABLE METADATA: ' || SQLERRM;
RETURN NEXT;
CONTINUE;
END;

BEGIN
EXECUTE format('SELECT last_value FROM repack.%I', seq_rec.seq_name)
INTO seq_value;
current_rows := seq_value;
EXCEPTION WHEN OTHERS THEN
current_rows := 0;
estimated_total := 0;
progress_percent := 0;
status := 'ERROR READING SEQUENCE: ' || SQLERRM;
RETURN NEXT;
CONTINUE;
END;

BEGIN
IF reltuples_float IS NULL OR reltuples_float < 0 THEN
estimated_total := 1;
status := 'WARNING: reltuples INVALID (using fallback=1)';
ELSE
estimated_total := GREATEST(ceil(reltuples_float)::bigint, 1);
status := NULL;
END IF;
EXCEPTION WHEN OTHERS THEN
estimated_total := 1;
status := 'ERROR PROCESSING reltuples: ' || SQLERRM;
RETURN NEXT;
CONTINUE;
END;

BEGIN
progress_percent := LEAST(
ROUND((current_rows::numeric * 100) / estimated_total, 2),
100.00
);
EXCEPTION
WHEN division_by_zero THEN
progress_percent := 0;
status := 'DIVISION BY ZERO (estimated_total=0)';
RETURN NEXT;
CONTINUE;
WHEN OTHERS THEN
progress_percent := 0;
status := format('PROGRESS CALCULATION ERROR: %s', SQLERRM);
RETURN NEXT;
CONTINUE;
END;

IF status IS NULL OR status = '' THEN
status := 'COPYING';
END IF;

RETURN NEXT;
END LOOP;

RETURN;
EXCEPTION WHEN OTHERS THEN
table_oid := 0;
table_full_name := 'GLOBAL ERROR';
current_rows := 0;
estimated_total := 0;
progress_percent := 0;
status := format('FUNCTION EXECUTION FAILED: %s', SQLERRM);
RETURN NEXT;
RETURN;
END;
$$ LANGUAGE plpgsql VOLATILE;

REVOKE ALL ON FUNCTION repack.get_progress_all() FROM PUBLIC;
GRANT EXECUTE ON FUNCTION repack.get_progress_all() TO PUBLIC;

CREATE FUNCTION repack.repack_index_swap(oid) RETURNS void AS
'MODULE_PATHNAME', 'repack_index_swap'
LANGUAGE C STABLE STRICT;
Expand Down
10 changes: 10 additions & 0 deletions lib/repack.c
Original file line number Diff line number Diff line change
Expand Up @@ -1150,6 +1150,16 @@ repack_drop(PG_FUNCTION_ARGS)
--numobj;
}

/* drop sequence for tracking insert progress */
if (numobj > 0)
{
execute_with_format(
SPI_OK_UTILITY,
"DROP SEQUENCE IF EXISTS repack.track_insert_%u",
oid);
--numobj;
}

/* drop temp table */
if (numobj > 0)
{
Expand Down