Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 10 additions & 22 deletions cli.js
Original file line number Diff line number Diff line change
Expand Up @@ -1723,30 +1723,16 @@ async function runBackfillWait(globalFlags) {
}, globalFlags.timeoutMs);
}

// Cancel backfills for a chat. Prefer the control server (so an in-flight job is
// stopped by the worker); fall back to a direct DB delete when no server is up.
// Cancel backfills for a chat through the control server, auto-starting it when
// needed so all queue writes go through the sole writer.
async function runBackfillCancel(globalFlags, options = {}) {
return runWithTimeout(async () => {
if (!options.chat) {
throw new Error('--chat is required');
}
const storeDir = resolveStoreDir();
const ping = await pingServer(storeDir);
let result;
if (ping) {
result = await cancelBackfill(storeDir, { chatId: options.chat });
} else {
// No server running: same direct path as `backfill jobs cancel --channel`.
const release = acquireStoreLock(storeDir);
const { telegramClient, messageSyncService } = createServices({ storeDir });
try {
result = messageSyncService.cancelJobs({ channelId: options.chat });
} finally {
await messageSyncService.close();
await telegramClient.destroy();
release();
}
}
await ensureServer(storeDir, { idleExit: '60s' });
const result = await cancelBackfill(storeDir, { chatId: options.chat });
if (globalFlags.json) {
writeJson(result);
} else {
Expand Down Expand Up @@ -2286,17 +2272,19 @@ async function runSyncJobsCancel(globalFlags, options = {}) {
if (jobId && channelId) {
throw new Error('Use --job-id or --channel, not both');
}
return withCommand(globalFlags, { need: 'archive', lock: 'write' }, async ({ messageSyncService }) => {
const result = messageSyncService.cancelJobs({
return runWithTimeout(async () => {
const storeDir = resolveStoreDir();
await ensureServer(storeDir, { idleExit: '60s' });
const result = await cancelBackfill(storeDir, {
jobId,
channelId,
chatId: channelId,
});
if (globalFlags.json) {
writeJson(result);
} else {
console.log(`Canceled ${result.canceled} job(s).`);
}
});
}, globalFlags.timeoutMs);
}

async function runDoctor(globalFlags, options = {}) {
Expand Down
38 changes: 26 additions & 12 deletions tests/backfill-client.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -207,23 +207,15 @@ describe('backfill status', () => {
});

describe('backfill cancel', () => {
it('routes to the control client when a server is up', async () => {
it('auto-starts the control server and cancels by chat through it', async () => {
services.current = makeFakeServices();
control.pingServer.mockResolvedValue({ ok: true, pid: 7 });

await runProgram(['backfill', 'cancel', '--chat', '@chan']);
expect(control.ensureServer).toHaveBeenCalledWith('/tmp/tgcli-test-store', { idleExit: '60s' });
expect(control.cancelBackfill).toHaveBeenCalledWith('/tmp/tgcli-test-store', { chatId: '@chan' });
expect(control.pingServer).not.toHaveBeenCalled();
expect(services.current.messageSyncService.cancelJobs).not.toHaveBeenCalled();
});

it('falls back to a direct cancel when no server is running', async () => {
services.current = makeFakeServices();
control.pingServer.mockResolvedValue(null);

await runProgram(['backfill', 'cancel', '--chat', '@chan']);
expect(control.cancelBackfill).not.toHaveBeenCalled();
expect(services.current.messageSyncService.cancelJobs).toHaveBeenCalledWith({ channelId: '@chan' });
});
});

describe('backfill (no --chat): queue draining via the server', () => {
Expand Down Expand Up @@ -264,7 +256,7 @@ describe('backfill (no --chat): queue draining via the server', () => {
});
});

describe('backfill jobs add / retry route through the control API', () => {
describe('backfill jobs add / retry / cancel route through the control API', () => {
it('jobs add enqueues via the control client (no in-process queue)', async () => {
services.current = makeFakeServices();
await runProgram(['backfill', 'jobs', 'add', '--chat', '@chan', '--depth', '5']);
Expand Down Expand Up @@ -298,4 +290,26 @@ describe('backfill jobs add / retry route through the control API', () => {
allErrors: false,
});
});

it('jobs cancel by job id auto-starts and routes through the control client', async () => {
services.current = makeFakeServices();
await runProgram(['backfill', 'jobs', 'cancel', '--job-id', '42']);
expect(control.ensureServer).toHaveBeenCalledWith('/tmp/tgcli-test-store', { idleExit: '60s' });
expect(control.cancelBackfill).toHaveBeenCalledWith('/tmp/tgcli-test-store', {
jobId: 42,
chatId: null,
});
expect(services.current.messageSyncService.cancelJobs).not.toHaveBeenCalled();
});

it('jobs cancel by channel auto-starts and routes through the control client', async () => {
services.current = makeFakeServices();
await runProgram(['backfill', 'jobs', 'cancel', '--channel', '@chan']);
expect(control.ensureServer).toHaveBeenCalledWith('/tmp/tgcli-test-store', { idleExit: '60s' });
expect(control.cancelBackfill).toHaveBeenCalledWith('/tmp/tgcli-test-store', {
jobId: null,
chatId: '@chan',
});
expect(services.current.messageSyncService.cancelJobs).not.toHaveBeenCalled();
});
});