diff --git a/cli.js b/cli.js index dc15b9c..34a38d3 100755 --- a/cli.js +++ b/cli.js @@ -1723,30 +1723,16 @@ async function runBackfillWait(globalFlags) { }, globalFlags.timeoutMs); } -// Cancel backfills for a chat. Prefer the control server (so an in-flight job is -// stopped by the worker); fall back to a direct DB delete when no server is up. +// Cancel backfills for a chat through the control server, auto-starting it when +// needed so all queue writes go through the sole writer. async function runBackfillCancel(globalFlags, options = {}) { return runWithTimeout(async () => { if (!options.chat) { throw new Error('--chat is required'); } const storeDir = resolveStoreDir(); - const ping = await pingServer(storeDir); - let result; - if (ping) { - result = await cancelBackfill(storeDir, { chatId: options.chat }); - } else { - // No server running: same direct path as `backfill jobs cancel --channel`. - const release = acquireStoreLock(storeDir); - const { telegramClient, messageSyncService } = createServices({ storeDir }); - try { - result = messageSyncService.cancelJobs({ channelId: options.chat }); - } finally { - await messageSyncService.close(); - await telegramClient.destroy(); - release(); - } - } + await ensureServer(storeDir, { idleExit: '60s' }); + const result = await cancelBackfill(storeDir, { chatId: options.chat }); if (globalFlags.json) { writeJson(result); } else { @@ -2286,17 +2272,19 @@ async function runSyncJobsCancel(globalFlags, options = {}) { if (jobId && channelId) { throw new Error('Use --job-id or --channel, not both'); } - return withCommand(globalFlags, { need: 'archive', lock: 'write' }, async ({ messageSyncService }) => { - const result = messageSyncService.cancelJobs({ + return runWithTimeout(async () => { + const storeDir = resolveStoreDir(); + await ensureServer(storeDir, { idleExit: '60s' }); + const result = await cancelBackfill(storeDir, { jobId, - channelId, + chatId: channelId, }); if (globalFlags.json) { writeJson(result); } else { console.log(`Canceled ${result.canceled} job(s).`); } - }); + }, globalFlags.timeoutMs); } async function runDoctor(globalFlags, options = {}) { diff --git a/tests/backfill-client.test.js b/tests/backfill-client.test.js index 4d9730f..465c6c0 100644 --- a/tests/backfill-client.test.js +++ b/tests/backfill-client.test.js @@ -207,23 +207,15 @@ describe('backfill status', () => { }); describe('backfill cancel', () => { - it('routes to the control client when a server is up', async () => { + it('auto-starts the control server and cancels by chat through it', async () => { services.current = makeFakeServices(); - control.pingServer.mockResolvedValue({ ok: true, pid: 7 }); await runProgram(['backfill', 'cancel', '--chat', '@chan']); + expect(control.ensureServer).toHaveBeenCalledWith('/tmp/tgcli-test-store', { idleExit: '60s' }); expect(control.cancelBackfill).toHaveBeenCalledWith('/tmp/tgcli-test-store', { chatId: '@chan' }); + expect(control.pingServer).not.toHaveBeenCalled(); expect(services.current.messageSyncService.cancelJobs).not.toHaveBeenCalled(); }); - - it('falls back to a direct cancel when no server is running', async () => { - services.current = makeFakeServices(); - control.pingServer.mockResolvedValue(null); - - await runProgram(['backfill', 'cancel', '--chat', '@chan']); - expect(control.cancelBackfill).not.toHaveBeenCalled(); - expect(services.current.messageSyncService.cancelJobs).toHaveBeenCalledWith({ channelId: '@chan' }); - }); }); describe('backfill (no --chat): queue draining via the server', () => { @@ -264,7 +256,7 @@ describe('backfill (no --chat): queue draining via the server', () => { }); }); -describe('backfill jobs add / retry route through the control API', () => { +describe('backfill jobs add / retry / cancel route through the control API', () => { it('jobs add enqueues via the control client (no in-process queue)', async () => { services.current = makeFakeServices(); await runProgram(['backfill', 'jobs', 'add', '--chat', '@chan', '--depth', '5']); @@ -298,4 +290,26 @@ describe('backfill jobs add / retry route through the control API', () => { allErrors: false, }); }); + + it('jobs cancel by job id auto-starts and routes through the control client', async () => { + services.current = makeFakeServices(); + await runProgram(['backfill', 'jobs', 'cancel', '--job-id', '42']); + expect(control.ensureServer).toHaveBeenCalledWith('/tmp/tgcli-test-store', { idleExit: '60s' }); + expect(control.cancelBackfill).toHaveBeenCalledWith('/tmp/tgcli-test-store', { + jobId: 42, + chatId: null, + }); + expect(services.current.messageSyncService.cancelJobs).not.toHaveBeenCalled(); + }); + + it('jobs cancel by channel auto-starts and routes through the control client', async () => { + services.current = makeFakeServices(); + await runProgram(['backfill', 'jobs', 'cancel', '--channel', '@chan']); + expect(control.ensureServer).toHaveBeenCalledWith('/tmp/tgcli-test-store', { idleExit: '60s' }); + expect(control.cancelBackfill).toHaveBeenCalledWith('/tmp/tgcli-test-store', { + jobId: null, + chatId: '@chan', + }); + expect(services.current.messageSyncService.cancelJobs).not.toHaveBeenCalled(); + }); });