
Add support for parallel workflow execution, letting users set the number of concurrent workflows via the --parallel argument #16

Open
ruabbit233 wants to merge 6 commits into master from feat/parallel-workers

Conversation

@ruabbit233
Collaborator

No description provided.

@ruabbit233
Collaborator Author

ruabbit233 commented Apr 16, 2026

Parallel workflow execution: --parallel N

Overview

This PR adds a --parallel N command-line argument to ComfyUI, allowing multiple workflows to execute concurrently. It is especially useful for network-bound workflows (such as BizyAir nodes), where the bottleneck is not GPU/VRAM.

Usage

python main.py --parallel 4

Architecture changes

Stock ComfyUI runs a single prompt_worker thread that consumes workflows sequentially from the PromptQueue. This PR starts N worker threads instead, each owning its own PromptExecutor instance.

The core difficulty is that the original code routes client messages and tracks progress through global singletons, so parallel execution would cross-contaminate state between workflows. The following isolation changes address this:

1. Progress registry keyed by prompt_id (comfy_execution/progress.py)

| Before | After |
| --- | --- |
| global_progress_registry (global singleton) | _progress_registries: Dict[str, ProgressRegistry] (keyed by prompt_id) |
| get_progress_state() always returns the same registry | get_progress_state(prompt_id) returns that prompt's registry |
| reset_progress_state() overwrites the single global registry | reset_progress_state(prompt_id, dynprompt, client_id=...) creates a new per-prompt registry without touching the others |
| No cleanup mechanism | remove_progress_state(prompt_id) cleans up once the prompt completes |

  • Added a threading.Lock to keep the registry dict thread-safe
  • ProgressRegistry now stores client_id, prompt_id, and dynprompt
  • The WebUIProgressHandler constructor takes a client_id argument instead of reading server_instance.client_id
  • Calling get_progress_state() without a prompt_id still works (backward-compatible fallback)
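The isolation pattern described above can be sketched as follows. The names mirror the PR's description, but the bodies are simplified illustrations under assumed signatures, not the actual diff:

```python
import threading
from typing import Dict, Optional


class ProgressRegistry:
    """Per-prompt registry; carries its own routing info instead of reading globals."""

    def __init__(self, prompt_id: str, client_id: Optional[str] = None, dynprompt=None):
        self.prompt_id = prompt_id
        self.client_id = client_id
        self.dynprompt = dynprompt
        self.nodes = {}


_progress_registries: Dict[str, ProgressRegistry] = {}
_registries_lock = threading.Lock()  # guards the dict itself


def reset_progress_state(prompt_id: str, dynprompt=None,
                         client_id: Optional[str] = None) -> ProgressRegistry:
    """Create a fresh registry for this prompt without affecting other prompts."""
    registry = ProgressRegistry(prompt_id, client_id=client_id, dynprompt=dynprompt)
    with _registries_lock:
        _progress_registries[prompt_id] = registry
    return registry


def get_progress_state(prompt_id: Optional[str] = None) -> Optional[ProgressRegistry]:
    with _registries_lock:
        if prompt_id is None:
            # Backward-compatible fallback: return any live registry (or None).
            return next(iter(_progress_registries.values()), None)
        return _progress_registries.get(prompt_id)


def remove_progress_state(prompt_id: str) -> None:
    """Called after the prompt finishes so the dict does not grow without bound."""
    with _registries_lock:
        _progress_registries.pop(prompt_id, None)
```

The lock only protects the dict's structure; each registry is owned by exactly one worker, so its fields need no further locking.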

2. client_id isolated per executor instance (execution.py)

| Before | After |
| --- | --- |
| server.client_id (global, shared across workers) | self.client_id on the PromptExecutor instance (one per worker) |
| _send_cached_ui() reads server.client_id | _send_cached_ui() takes a client_id argument |
| execute() reads server.client_id | execute() takes a client_id argument |
| add_message() reads self.server.client_id | add_message() reads self.client_id |

All server.send_sync() calls in the execution chain now use the per-executor client_id rather than the global server.client_id. The global server.client_id is still kept in sync for backward compatibility (e.g. WebSocket reconnects).
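A minimal sketch of this routing change, with a stand-in server class (FakeServer is invented here for illustration; the real method bodies differ):

```python
class FakeServer:
    """Stand-in for the ComfyUI server: records send_sync() calls."""

    def __init__(self):
        self.client_id = None   # global field, kept in sync for compatibility
        self.sent = []

    def send_sync(self, event, data, sid=None):
        self.sent.append((event, data, sid))


class PromptExecutor:
    def __init__(self, server):
        self.server = server
        self.client_id = None   # per-instance, set at the start of execute()

    def add_message(self, event, data):
        # Routes with this executor's own client_id, not the shared global one.
        self.server.send_sync(event, data, sid=self.client_id)

    def execute(self, prompt_id, client_id):
        self.client_id = client_id
        self.server.client_id = client_id  # backward-compat sync (WS reconnects)
        self.add_message("execution_start", {"prompt_id": prompt_id})
```

Even after another worker overwrites the global server.client_id, each executor keeps sending to its own client.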

3. Caller adaptations (main.py, comfy_api/latest/__init__.py)

  • The hijack_progress() hook now calls get_progress_state(prompt_id) and reads client_id from the per-prompt registry
  • prompt_worker() sends the "execution done" message using e.client_id instead of server_instance.client_id
  • ComfyAPI's Execution.set_progress() extracts prompt_id from the execution context and passes it along

4. CLI argument and multi-worker startup (comfy/cli_args.py, main.py)

  • Added a --parallel N argument, defaulting to 1 (identical to current behavior)
  • Starts N prompt_worker threads, using named threads (prompt_worker-0, prompt_worker-1, ...)
  • PromptQueue.get() already supports multiple consumers (it uses an RLock plus a Condition)
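The startup loop can be sketched roughly as below, substituting queue.Queue for the real PromptQueue (which uses RLock + Condition); the helper names and sentinel shutdown are illustrative assumptions, not the PR's code:

```python
import queue
import threading


def worker_loop(job_queue, executor, handle):
    while True:
        job = job_queue.get()       # multiple consumers block here safely
        if job is None:             # sentinel: shut this worker down
            return
        handle(executor, job)


def start_workers(n, job_queue, executor_factory, handle):
    """Spawn n named worker threads, each owning its own executor instance."""
    threads = []
    for i in range(n):
        executor = executor_factory()  # one PromptExecutor-like object per worker
        t = threading.Thread(
            target=worker_loop,
            args=(job_queue, executor, handle),
            name=f"prompt_worker-{i}",  # named threads, as in the PR
            daemon=True,
        )
        t.start()
        threads.append(t)
    return threads
```

With n=1 this degenerates to the original single-consumer behavior, which is why the default --parallel 1 changes nothing.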

Test results

Running the test script (comfyui-parallel-workers-example.py) with --parallel 6 against the BizyAir NanoBananaPro node:

(bizyair) rua@MateBook-Air script_examples % python comfyui-parallel-workers-example.py 6
=== ComfyUI Parallel Test ===
Server: 127.0.0.1:9999
Parallel workflows: 6
Workflow: /Users/rua/Coding/comfyui-related/ComfyUI/script_examples/workflow_1.json

[Worker 5] Queued prompt_id=adbf8703... client_id=9affc286...
[Worker 0] Queued prompt_id=b51d74c3... client_id=07af7953...
[Worker 1] Queued prompt_id=121bd30c... client_id=4aaffa93...
[Worker 2] Queued prompt_id=d7fba041... client_id=fc8b95b2...
[Worker 4] Queued prompt_id=e90b64d8... client_id=bca7c271...
[Worker 3] Queued prompt_id=874cdfe7... client_id=89404a4a...

=== Results ===
[Worker 0] ✅ prompt_id=b51d74c3... duration=45.5s images=1
         └─ node=7  output/ComfyUI_00025_.png  (2130.1 KB)
[Worker 1] ✅ prompt_id=121bd30c... duration=46.6s images=1
         └─ node=7  output/ComfyUI_00026_.png  (2196.5 KB)
[Worker 2] ✅ prompt_id=d7fba041... duration=47.6s images=1
         └─ node=7  output/ComfyUI_00027_.png  (2199.0 KB)
[Worker 3] ✅ prompt_id=874cdfe7... duration=91.6s images=1
         └─ node=7  output/ComfyUI_00029_.png  (2051.3 KB)
[Worker 4] ✅ prompt_id=e90b64d8... duration=69.3s images=1
         └─ node=7  output/ComfyUI_00028_.png  (2113.4 KB)
[Worker 5] ✅ prompt_id=adbf8703... duration=35.0s images=1
         └─ node=7  output/ComfyUI_00024_.png  (2092.2 KB)

Total wall time: 91.6s
Sum of individual times: 335.6s
Speedup: 3.66x
Success: 6/6

🟢 PASS: All workflows succeeded and ran in parallel

Analysis:

  • All 6/6 workflows succeeded, and each returned the expected image output
  • 3.66x speedup across 6 parallel workflows; short of the full 6x because of variance in BizyAir server-side latency
  • Individual workflow durations summed to 335.6s, yet total wall time was only 91.6s, confirming genuinely concurrent execution
  • Each client received its own progress messages and outputs over a separate WebSocket connection (no cross-contamination)
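For reference, the speedup figure is simply the summed per-workflow durations divided by wall-clock time, which here is bounded by the slowest workflow:

```python
# Per-worker durations taken from the test log above.
durations = [45.5, 46.6, 47.6, 91.6, 69.3, 35.0]
wall_time = max(durations)                 # 91.6s: the slowest workflow bounds the run
speedup = sum(durations) / wall_time
print(f"Speedup: {speedup:.2f}x")          # prints "Speedup: 3.66x", matching the log
```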

Changed files

| File | Change |
| --- | --- |
| comfy/cli_args.py | New --parallel argument |
| comfy_execution/progress.py | Registry dict keyed by prompt_id; client_id added to ProgressRegistry and WebUIProgressHandler; new remove_progress_state(); threading.Lock added |
| execution.py | self.client_id added to PromptExecutor; new client_id parameter on execute() and _send_cached_ui(); remove_progress_state() cleanup |
| main.py | Multi-worker startup loop; hijack_progress() uses the per-prompt client_id |
| comfy_api/latest/__init__.py | Passes prompt_id to get_progress_state() |

Known limitations

| Issue | Impact | Mitigation |
| --- | --- | --- |
| /interrupt is a global operation and interrupts all currently executing workflows | | Could be changed to interrupt by client_id in a follow-up PR |
| GPU model loading is not isolated; parallel GPU workflows risk OOM | Low (network-bound workflows are unaffected) | The GPU lock in model_management serializes model load/unload; the RLock prevents deadlock when load_models_gpu internally calls free_memory |
| get_flags(reset=True): only the first worker sees the free_memory/unload_models flags | | The global operation (unload_all_models) only needs to run once |
| Custom nodes holding global mutable state may not be thread-safe | Depends on the node | Note the recommendation in the documentation |
| When nodes share the same seed, caching makes outputs finish at nearly the same time, so output filenames can collide; in extreme cases one parallel workflow's file is overwritten by another workflow's output | | Changed how the output-file counter is obtained |
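One possible shape for a collision-free counter (a hypothetical sketch, not necessarily the PR's actual fix): hand out indices per filename prefix under a lock, instead of each worker re-scanning the output directory at save time.

```python
import itertools
import threading


class OutputCounter:
    """Process-wide, lock-guarded counter per filename prefix, so two workers
    saving with the same prefix at the same instant never pick the same index.
    Illustrative only; the class name and API are invented for this sketch."""

    def __init__(self):
        self._lock = threading.Lock()
        self._counters = {}

    def next_index(self, prefix: str) -> int:
        with self._lock:
            counter = self._counters.setdefault(prefix, itertools.count(1))
            return next(counter)


counters = OutputCounter()


def output_filename(prefix: str) -> str:
    # Mimics ComfyUI's "<prefix>_00001_.png" style of numbering.
    return f"{prefix}_{counters.next_index(prefix):05}_.png"
```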

@ruabbit233
Collaborator Author

ruabbit233 commented Apr 16, 2026

The contents of comfyui-parallel-workers-example.py are as follows:

import random

import websocket
import uuid
import json
import copy
import urllib.request
import urllib.parse
import urllib.error  # HTTPError is caught in interrupt_prompt()
import threading
import time
import sys
import os

server_address = "127.0.0.1:9999"
parallel_count = int(sys.argv[1]) if len(sys.argv) > 1 else 4
workflow_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "workflow_1.json")


def queue_prompt(prompt, prompt_id, client_id):
    p = {"prompt": prompt, "client_id": client_id, "prompt_id": prompt_id}
    data = json.dumps(p).encode('utf-8')
    req = urllib.request.Request("http://{}/prompt".format(server_address), data=data)
    return json.loads(urllib.request.urlopen(req).read())


def interrupt_prompt(prompt_id=None):
    data = json.dumps({"prompt_id": prompt_id}).encode('utf-8') if prompt_id else b'{}'
    req = urllib.request.Request("http://{}/interrupt".format(server_address), data=data, method='POST')
    req.add_header('Content-Type', 'application/json')
    try:
        return urllib.request.urlopen(req).read()
    except urllib.error.HTTPError as e:
        return e.read()


def get_image(filename, subfolder, folder_type):
    data = {"filename": filename, "subfolder": subfolder, "type": folder_type}
    url_values = urllib.parse.urlencode(data)
    with urllib.request.urlopen("http://{}/view?{}".format(server_address, url_values)) as response:
        return response.read()


def get_history(prompt_id):
    with urllib.request.urlopen("http://{}/history/{}".format(server_address, prompt_id)) as response:
        return json.loads(response.read())


def get_queue():
    with urllib.request.urlopen("http://{}/queue".format(server_address)) as response:
        return json.loads(response.read())


def run_single_workflow(index, prompt, results):
    client_id = str(uuid.uuid4())
    prompt_id = str(uuid.uuid4())

    ws = websocket.WebSocket()
    ws.connect("ws://{}/ws?clientId={}".format(server_address, client_id))

    start_time = time.time()

    try:
        resp = queue_prompt(prompt, prompt_id, client_id)
        print(f"[Worker {index}] Queued prompt_id={prompt_id[:8]}... client_id={client_id[:8]}...")

        while True:
            out = ws.recv()
            if isinstance(out, str):
                message = json.loads(out)
                if message['type'] == 'executing':
                    data = message['data']
                    if data['node'] is None and data['prompt_id'] == prompt_id:
                        break
                elif message['type'] == 'execution_interrupted':
                    data = message['data']
                    if data['prompt_id'] == prompt_id:
                        results[index] = {
                            "success": False,
                            "interrupted": True,
                            "prompt_id": prompt_id,
                            "client_id": client_id,
                            "duration": time.time() - start_time,
                        }
                        return
                elif message['type'] == 'execution_error':
                    data = message['data']
                    if data['prompt_id'] == prompt_id:
                        results[index] = {
                            "success": False,
                            "prompt_id": prompt_id,
                            "client_id": client_id,
                            "error": data.get("exception_message", "Unknown error"),
                            "duration": time.time() - start_time,
                        }
                        return
            else:
                continue

        history = get_history(prompt_id)
        if prompt_id not in history:
            results[index] = {
                "success": False,
                "prompt_id": prompt_id,
                "client_id": client_id,
                "error": "No history found",
                "duration": time.time() - start_time,
            }
            return

        outputs = history[prompt_id]['outputs']
        images_found = 0
        output_files = []
        for node_id, node_output in outputs.items():
            if 'images' in node_output:
                for image in node_output['images']:
                    image_data = get_image(image['filename'], image['subfolder'], image['type'])
                    images_found += 1
                    output_files.append({
                        "node_id": node_id,
                        "filename": image['filename'],
                        "subfolder": image.get('subfolder', ''),
                        "type": image.get('type', ''),
                        "size_bytes": len(image_data),
                    })

        status = history[prompt_id].get('status', {})
        status_str = status.get('status_str', 'unknown')

        results[index] = {
            "success": status_str == 'success' and images_found > 0,
            "prompt_id": prompt_id,
            "client_id": client_id,
            "images_count": images_found,
            "output_files": output_files,
            "status": status_str,
            "duration": time.time() - start_time,
        }
    except Exception as e:
        results[index] = {
            "success": False,
            "prompt_id": prompt_id,
            "client_id": client_id,
            "error": str(e),
            "duration": time.time() - start_time,
        }
    finally:
        ws.close()


def test_parallel_interrupt():
    print("=" * 60)
    print("TEST: Targeted interrupt - only the specified prompt is interrupted")
    print("=" * 60)

    prompt = json.load(open(workflow_path))
    num_workflows = 3
    results = [None] * num_workflows
    prompt_ids = [None] * num_workflows
    executing_flags = [threading.Event() for _ in range(num_workflows)]

    threads = []

    def run_worker(index, prompt, results):
        client_id = str(uuid.uuid4())
        prompt_id = str(uuid.uuid4())
        prompt_ids[index] = prompt_id

        ws = websocket.WebSocket()
        ws.connect("ws://{}/ws?clientId={}".format(server_address, client_id))
        ws.settimeout(120)

        start_time = time.time()

        try:
            resp = queue_prompt(prompt, prompt_id, client_id)
            print(f"  [W{index}] Queued prompt_id={prompt_id[:8]}...")

            while True:
                try:
                    out = ws.recv()
                except websocket.WebSocketTimeoutException:
                    results[index] = {
                        "success": False, "interrupted": False,
                        "prompt_id": prompt_id, "error": "timeout",
                        "duration": time.time() - start_time,
                    }
                    return

                if isinstance(out, str):
                    message = json.loads(out)
                    if message['type'] == 'execution_start':
                        executing_flags[index].set()
                    elif message['type'] == 'execution_interrupted':
                        data = message['data']
                        if data['prompt_id'] == prompt_id:
                            results[index] = {
                                "success": False, "interrupted": True,
                                "prompt_id": prompt_id,
                                "duration": time.time() - start_time,
                            }
                            return
                    elif message['type'] == 'executing':
                        data = message['data']
                        executing_flags[index].set()
                        if data['node'] is None and data['prompt_id'] == prompt_id:
                            results[index] = {
                                "success": True, "interrupted": False,
                                "prompt_id": prompt_id,
                                "duration": time.time() - start_time,
                            }
                            return
                    elif message['type'] == 'execution_error':
                        data = message['data']
                        if data['prompt_id'] == prompt_id:
                            results[index] = {
                                "success": False, "interrupted": False,
                                "prompt_id": prompt_id,
                                "error": data.get("exception_message", "Unknown error"),
                                "duration": time.time() - start_time,
                            }
                            return
        except Exception as e:
            results[index] = {
                "success": False, "interrupted": False,
                "prompt_id": prompt_id, "error": str(e),
                "duration": time.time() - start_time,
            }
        finally:
            ws.close()

    for i in range(num_workflows):
        t = threading.Thread(target=run_worker, args=(i, prompt.copy(), results))
        threads.append(t)
        t.start()

    for flag in executing_flags:
        flag.wait(timeout=30)
    time.sleep(3)

    target_index = 0
    target_pid = prompt_ids[target_index]
    print(f"  >>> Interrupting prompt_id={target_pid[:8]}... (Worker {target_index})")
    interrupt_prompt(target_pid)

    for t in threads:
        t.join(timeout=120)

    victim_interrupted = results[target_index] is not None and results[target_index].get("interrupted")
    bystander_not_interrupted = all(
        results[i] is not None and not results[i].get("interrupted")
        for i in range(num_workflows) if i != target_index
    )

    print(f"\n  Results:")
    for i, r in enumerate(results):
        if r is None:
            print(f"  [W{i}] NO RESULT")
            continue
        if r.get("interrupted"):
            print(f"  [W{i}] ⚡ INTERRUPTED  prompt_id={r['prompt_id'][:8]}... duration={r['duration']:.1f}s")
        elif r.get("success"):
            print(f"  [W{i}] ✅ COMPLETED    prompt_id={r['prompt_id'][:8]}... duration={r['duration']:.1f}s")
        else:
            print(f"  [W{i}] ❌ ERROR        prompt_id={r['prompt_id'][:8]}... error={r.get('error', 'unknown')}")

    test_passed = victim_interrupted and bystander_not_interrupted
    print(f"\n  Target (W{target_index}) interrupted: {'✅' if victim_interrupted else '❌'}")
    print(f"  Bystanders not interrupted: {'✅' if bystander_not_interrupted else '❌'}")
    print(f"  {'🟢 PASS' if test_passed else '🔴 FAIL'}: Targeted interrupt")
    return test_passed


def test_global_interrupt():
    print("\n" + "=" * 60)
    print("TEST: Global interrupt - all running prompts are interrupted")
    print("=" * 60)

    prompt = json.load(open(workflow_path))
    num_workflows = 3
    results = [None] * num_workflows
    prompt_ids = [None] * num_workflows
    executing_flags = [threading.Event() for _ in range(num_workflows)]

    threads = []

    def run_worker(index, prompt, results):
        client_id = str(uuid.uuid4())
        prompt_id = str(uuid.uuid4())
        prompt_ids[index] = prompt_id

        ws = websocket.WebSocket()
        ws.connect("ws://{}/ws?clientId={}".format(server_address, client_id))
        ws.settimeout(120)

        start_time = time.time()

        try:
            resp = queue_prompt(prompt, prompt_id, client_id)
            print(f"  [W{index}] Queued prompt_id={prompt_id[:8]}...")

            while True:
                try:
                    out = ws.recv()
                except websocket.WebSocketTimeoutException:
                    results[index] = {
                        "success": False, "interrupted": False,
                        "prompt_id": prompt_id, "error": "timeout",
                        "duration": time.time() - start_time,
                    }
                    return

                if isinstance(out, str):
                    message = json.loads(out)
                    if message['type'] == 'execution_start':
                        executing_flags[index].set()
                    elif message['type'] == 'execution_interrupted':
                        data = message['data']
                        if data['prompt_id'] == prompt_id:
                            results[index] = {
                                "success": False, "interrupted": True,
                                "prompt_id": prompt_id,
                                "duration": time.time() - start_time,
                            }
                            return
                    elif message['type'] == 'executing':
                        data = message['data']
                        executing_flags[index].set()
                        if data['node'] is None and data['prompt_id'] == prompt_id:
                            results[index] = {
                                "success": True, "interrupted": False,
                                "prompt_id": prompt_id,
                                "duration": time.time() - start_time,
                            }
                            return
                    elif message['type'] == 'execution_error':
                        data = message['data']
                        if data['prompt_id'] == prompt_id:
                            results[index] = {
                                "success": False, "interrupted": False,
                                "prompt_id": prompt_id,
                                "error": data.get("exception_message", "Unknown error"),
                                "duration": time.time() - start_time,
                            }
                            return
        except Exception as e:
            results[index] = {
                "success": False, "interrupted": False,
                "prompt_id": prompt_id, "error": str(e),
                "duration": time.time() - start_time,
            }
        finally:
            ws.close()

    for i in range(num_workflows):
        t = threading.Thread(target=run_worker, args=(i, prompt.copy(), results))
        threads.append(t)
        t.start()

    for flag in executing_flags:
        flag.wait(timeout=30)
    time.sleep(3)

    print(f"  >>> Global interrupt (no prompt_id)")
    interrupt_prompt()

    for t in threads:
        t.join(timeout=120)

    all_interrupted = all(r is not None and r.get("interrupted") for r in results)
    none_completed = not any(r is not None and r.get("success") for r in results)

    print(f"\n  Results:")
    for i, r in enumerate(results):
        if r is None:
            print(f"  [W{i}] NO RESULT")
            continue
        if r.get("interrupted"):
            print(f"  [W{i}] ⚡ INTERRUPTED  prompt_id={r['prompt_id'][:8]}... duration={r['duration']:.1f}s")
        elif r.get("success"):
            print(f"  [W{i}] ✅ COMPLETED    prompt_id={r['prompt_id'][:8]}... duration={r['duration']:.1f}s")
        else:
            print(f"  [W{i}] ❌ ERROR        prompt_id={r['prompt_id'][:8]}... error={r.get('error', 'unknown')}")

    test_passed = all_interrupted and none_completed
    print(f"\n  All workflows interrupted: {'✅' if all_interrupted else '❌'}")
    print(f"  None completed normally: {'✅' if none_completed else '❌'}")
    print(f"  {'🟢 PASS' if test_passed else '🔴 FAIL'}: Global interrupt")
    return test_passed


def test_interrupt_no_race():
    print("\n" + "=" * 60)
    print("TEST: Interrupt signal not eaten by new prompt starting")
    print("=" * 60)
    print("  Scenario: Submit A, interrupt A, submit B immediately.")
    print("  B's startup should NOT clear A's interrupt signal.")

    prompt = json.load(open(workflow_path))

    client_id_a = str(uuid.uuid4())
    prompt_id_a = str(uuid.uuid4())

    ws_a = websocket.WebSocket()
    ws_a.connect("ws://{}/ws?clientId={}".format(server_address, client_id_a))
    ws_a.settimeout(60)

    result_a = {"done": False, "interrupted": False}
    a_executing = threading.Event()

    try:
        queue_prompt(prompt.copy(), prompt_id_a, client_id_a)
        print(f"  [A] Queued prompt_id={prompt_id_a[:8]}...")

        while not a_executing.is_set():
            try:
                out = ws_a.recv()
            except websocket.WebSocketTimeoutException:
                break
            if isinstance(out, str):
                message = json.loads(out)
                if message['type'] in ('execution_start', 'executing'):
                    a_executing.set()

        print(f"  >>> Interrupting A (prompt_id={prompt_id_a[:8]}...)")
        interrupt_prompt(prompt_id_a)

        client_id_b = str(uuid.uuid4())
        prompt_id_b = str(uuid.uuid4())
        queue_prompt(prompt.copy(), prompt_id_b, client_id_b)
        print(f"  [B] Queued prompt_id={prompt_id_b[:8]}... (immediately after interrupt)")

        while True:
            try:
                out = ws_a.recv()
            except websocket.WebSocketTimeoutException:
                break
            if isinstance(out, str):
                message = json.loads(out)
                if message['type'] == 'execution_interrupted':
                    if message['data']['prompt_id'] == prompt_id_a:
                        result_a["interrupted"] = True
                        result_a["done"] = True
                        break
                elif message['type'] == 'executing':
                    data = message['data']
                    if data['node'] is None and data['prompt_id'] == prompt_id_a:
                        result_a["done"] = True
                        break
    finally:
        ws_a.close()

    test_passed = result_a["interrupted"]
    print(f"\n  A was interrupted (not silently cleared): {'✅' if result_a['interrupted'] else '❌'}")
    print(f"  {'🟢 PASS' if test_passed else '🔴 FAIL'}: Interrupt not eaten by new prompt")
    return test_passed


def test_parallel_basic():
    prompt = json.load(open(workflow_path))

    print(f"=== ComfyUI Parallel Test ===")
    print(f"Server: {server_address}")
    print(f"Parallel workflows: {parallel_count}")
    print(f"Workflow: {workflow_path}")
    print()

    results = [None] * parallel_count
    threads = []

    global_start = time.time()

    for i in range(parallel_count):
        prompt_i = prompt.copy() 
        # prompt_i["5"]["inputs"]["seed"] = int(time.time()) + random.randint(0, 100000) + i
        t = threading.Thread(target=run_single_workflow, args=(i, prompt_i, results))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    global_duration = time.time() - global_start

    print()
    print(f"=== Results ===")
    success_count = 0
    for i, r in enumerate(results):
        if r is None:
            print(f"[Worker {i}] NO RESULT")
            continue
        status_icon = "✅" if r["success"] else "❌"
        print(f"[Worker {i}] {status_icon} prompt_id={r['prompt_id'][:8]}... duration={r['duration']:.1f}s", end="")
        if r["success"]:
            print(f" images={r['images_count']}")
            for f in r.get("output_files", []):
                size_kb = f["size_bytes"] / 1024
                subfolder = f"/{f['subfolder']}" if f['subfolder'] else ''
                print(f"         └─ node={f['node_id']}  {f['type']}{subfolder}/{f['filename']}  ({size_kb:.1f} KB)")
            success_count += 1
        else:
            print(f" error={r.get('error', 'unknown')}")

    sequential_time = sum(r['duration'] for r in results if r is not None)
    speedup = sequential_time / global_duration if global_duration > 0 else 0

    print()
    print(f"Total wall time: {global_duration:.1f}s")
    print(f"Sum of individual times: {sequential_time:.1f}s")
    print(f"Speedup: {speedup:.2f}x")
    print(f"Success: {success_count}/{parallel_count}")

    if success_count == parallel_count and speedup > 1.3:
        print("\n🟢 PASS: All workflows succeeded and ran in parallel")
    elif success_count == parallel_count:
        print("\n🟡 PASS: All workflows succeeded but parallel speedup is low")
    else:
        print("\n🔴 FAIL: Some workflows did not succeed")

    return success_count == parallel_count


def main():
    mode = sys.argv[2] if len(sys.argv) > 2 else "interrupt"
    all_passed = True

    if mode == "parallel":
        all_passed = test_parallel_basic()
    elif mode == "interrupt":
        results = []
        results.append(("Targeted interrupt", test_parallel_interrupt()))
        results.append(("Global interrupt", test_global_interrupt()))
        results.append(("Interrupt not eaten", test_interrupt_no_race()))

        print("\n" + "=" * 60)
        print("INTERRUPT TEST SUMMARY")
        print("=" * 60)
        for name, passed in results:
            icon = "🟢" if passed else "🔴"
            print(f"  {icon} {name}: {'PASS' if passed else 'FAIL'}")

        all_passed = all(passed for _, passed in results)
    elif mode == "all":
        r1 = test_parallel_basic()
        r2 = test_parallel_interrupt()
        r3 = test_global_interrupt()
        r4 = test_interrupt_no_race()

        print("\n" + "=" * 60)
        print("FULL TEST SUMMARY")
        print("=" * 60)
        for name, passed in [("Parallel basic", r1), ("Targeted interrupt", r2), ("Global interrupt", r3), ("Interrupt not eaten", r4)]:
            icon = "🟢" if passed else "🔴"
            print(f"  {icon} {name}: {'PASS' if passed else 'FAIL'}")

        all_passed = r1 and r2 and r3 and r4
    else:
        print(f"Unknown mode: {mode}")
        print("Usage: python comfyui-parallel-workers-example.py [parallel_count] [mode]")
        print("  mode: parallel | interrupt | all")
        return 1

    return 0 if all_passed else 1


if __name__ == "__main__":
    sys.exit(main())

The contents of workflow_1.json are as follows:

{
    "2": {
        "inputs": {
            "preview_markdown": "[\"https://bizyair-dev.oss-cn-shanghai.aliyuncs.com/outputs/pm6l09Y1bxEAv1IL.jpg\"]",
            "preview_text": "[\"https://bizyair-dev.oss-cn-shanghai.aliyuncs.com/outputs/pm6l09Y1bxEAv1IL.jpg\"]",
            "previewMode": null,
            "source": [
                "5",
                2
            ]
        },
        "class_type": "PreviewAny",
        "_meta": {
            "title": "预览任意"
        }
    },
    "3": {
        "inputs": {
            "image": "836668f9-322f-4fc9-a0a8-4e813ae71619.jpg"
        },
        "class_type": "LoadImage",
        "_meta": {
            "title": "加载图像"
        }
    },
    "5": {
        "inputs": {
            "prompt": "一个女生在遛猫",
            "operation": "generate",
            "temperature": 1,
            "top_p": 0.95,
            "seed": 1514601691,
            "max_tokens": 32768,
            "aspect_ratio": "auto",
            "resolution": "auto",
            "quality": "high",
            "character_consistency": true,
            "inputcount": 1,
            "Update inputs(支持多图点我)": null,
            "mode": "official",
            "images": [
                "3",
                0
            ]
        },
        "class_type": "BizyAir_NanoBananaPro",
        "_meta": {
            "title": "☁️BizyAir NanoBananaPro"
        }
    },
    "7": {
        "inputs": {
            "filename_prefix": "ComfyUI",
            "images": [
                "5",
                0
            ]
        },
        "class_type": "SaveImage",
        "_meta": {
            "title": "保存图像"
        }
    }
}

@ruabbit233
Collaborator Author

ruabbit233 commented Apr 17, 2026

The following is the comfyagent test. First, check whether comfyagent rewrites the SaveImage node's filename_prefix so that output files do not collide under parallel execution:
[screenshot]

Then run the parallel comfyagent test script; its output is shown below:

[Worker 2] 
[Worker 1] : keep_alive
[Worker 1] 
[Worker 0] : keep_alive
[Worker 0] 
………………
[Worker 1] 
[Worker 0] : keep_alive
[Worker 0] 
[Worker 1] event: websocket
[Worker 1] data: {"type": "progress_state", "data": {"prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99", "nodes": {"3": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "3", "prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99", "display_node_id": "3", "parent_node_id": null, "real_node_id": "3"}, "5": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "5", "prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99", "display_node_id": "5", "parent_node_id": null, "real_node_id": "5"}}}}
[Worker 1] 
[Worker 1] event: websocket
[Worker 1] data: {"type": "progress_state", "data": {"prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99", "nodes": {"3": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "3", "prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99", "display_node_id": "3", "parent_node_id": null, "real_node_id": "3"}, "5": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "5", "prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99", "display_node_id": "5", "parent_node_id": null, "real_node_id": "5"}, "2": {"value": 0.0, "max": 1.0, "state": "running", "node_id": "2", "prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99", "display_node_id": "2", "parent_node_id": null, "real_node_id": "2"}}}}
[Worker 1] 
[Worker 1] event: websocket
[Worker 1] data: {"type": "executing", "data": {"node": "2", "display_node": "2", "prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99"}}
[Worker 1] 
[Worker 1] event: websocket
[Worker 1] data: {"type": "executed", "data": {"node": "2", "display_node": "2", "output": {"text": ["[\"https://bizyair-prod.oss-cn-shanghai.aliyuncs.com/outputs/KfcH25jmcjTXR0nx.jpg\"]"]}, "prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99"}}
[Worker 1] 
[Worker 1] event: websocket
[Worker 1] data: {"type": "progress_state", "data": {"prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99", "nodes": {"3": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "3", "prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99", "display_node_id": "3", "parent_node_id": null, "real_node_id": "3"}, "5": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "5", "prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99", "display_node_id": "5", "parent_node_id": null, "real_node_id": "5"}, "2": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "2", "prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99", "display_node_id": "2", "parent_node_id": null, "real_node_id": "2"}}}}
[Worker 1] 
[Worker 1] event: websocket
[Worker 1] data: {"type": "progress_state", "data": {"prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99", "nodes": {"3": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "3", "prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99", "display_node_id": "3", "parent_node_id": null, "real_node_id": "3"}, "5": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "5", "prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99", "display_node_id": "5", "parent_node_id": null, "real_node_id": "5"}, "2": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "2", "prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99", "display_node_id": "2", "parent_node_id": null, "real_node_id": "2"}, "7": {"value": 0.0, "max": 1.0, "state": "running", "node_id": "7", "prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99", "display_node_id": "7", "parent_node_id": null, "real_node_id": "7"}}}}
[Worker 1] 
[Worker 1] event: websocket
[Worker 1] data: {"type": "executing", "data": {"node": "7", "display_node": "7", "prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99"}}
[Worker 1] 
[Worker 0] event: websocket
[Worker 2] event: websocket
[Worker 0] data: {"type": "status", "data": {"status": {"exec_info": {"queue_remaining": 2}}}}
[Worker 0] 
[Worker 2] data: {"type": "status", "data": {"status": {"exec_info": {"queue_remaining": 2}}}}
[Worker 2] 
[Worker 1] event: websocket
[Worker 1] data: {"type": "executed", "data": {"node": "7", "display_node": "7", "output": {"images": [{"filename": "https://sc-maas.oss-cn-shanghai.aliyuncs.com/outputs%2F45998f06-2463-4930-9d9f-207a1667691b_923af0d56bd5d799cc5440832dc17007_ComfyUI_d67e095f_00001_.png?OSSAccessKeyId=LTAI5tQnPSzwAnR8NmMzoQq4&Expires=1776398991&Signature=qLhRpCsnWFgiQzXw3SGP%2F9xKeYo%3D", "subfolder": "", "type": "output"}]}, "prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99"}}
[Worker 1] 
[Worker 1] event: websocket
[Worker 1] data: {"type": "progress_state", "data": {"prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99", "nodes": {"3": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "3", "prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99", "display_node_id": "3", "parent_node_id": null, "real_node_id": "3"}, "5": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "5", "prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99", "display_node_id": "5", "parent_node_id": null, "real_node_id": "5"}, "2": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "2", "prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99", "display_node_id": "2", "parent_node_id": null, "real_node_id": "2"}, "7": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "7", "prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99", "display_node_id": "7", "parent_node_id": null, "real_node_id": "7"}}}}
[Worker 1] 
[Worker 1] event: websocket
[Worker 1] data: {"type": "execution_success", "data": {"prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99", "timestamp": 1776395390335}}
[Worker 1] 
[Worker 1] event: websocket
[Worker 1] data: {"type": "status", "data": {"status": {"exec_info": {"queue_remaining": 2}}}}
[Worker 1] 
[Worker 1] event: websocket
[Worker 1] data: {"type": "executing", "data": {"node": null, "prompt_id": "8174959b-ccf8-491a-b519-0b8d13bc3a99"}}
[Worker 1] 
[Worker 1] event: history
[Worker 1] data: {"outputs": {"2": {"text": ["[\"https://bizyair-prod.oss-cn-shanghai.aliyuncs.com/outputs/KfcH25jmcjTXR0nx.jpg\"]"]}, "7": {"images": [{"filename": "https://sc-maas.oss-cn-shanghai.aliyuncs.com/outputs%2F45998f06-2463-4930-9d9f-207a1667691b_923af0d56bd5d799cc5440832dc17007_ComfyUI_d67e095f_00001_.png?OSSAccessKeyId=LTAI5tQnPSzwAnR8NmMzoQq4&Expires=1776398991&Signature=qLhRpCsnWFgiQzXw3SGP%2F9xKeYo%3D", "subfolder": "", "type": "output"}]}}, "metrics": {"pre_process_time": 159.3, "call_comfyui_time": 76762.2, "post_process_time": 679.9, "inference_time": 77602.4}}
[Worker 1] 
[Worker 1] event: end
[Worker 1] data: {"trace_id": "45998f06-2463-4930-9d9f-207a1667691b"}
[Worker 1] 
[Worker 0] : keep_alive
[Worker 0] 
[Worker 2] : keep_alive
[Worker 2] 
[Worker 2] event: websocket
[Worker 2] data: {"type": "progress_state", "data": {"prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f", "nodes": {"3": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "3", "prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f", "display_node_id": "3", "parent_node_id": null, "real_node_id": "3"}, "5": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "5", "prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f", "display_node_id": "5", "parent_node_id": null, "real_node_id": "5"}}}}
[Worker 2] 
[Worker 2] event: websocket
[Worker 2] data: {"type": "progress_state", "data": {"prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f", "nodes": {"3": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "3", "prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f", "display_node_id": "3", "parent_node_id": null, "real_node_id": "3"}, "5": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "5", "prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f", "display_node_id": "5", "parent_node_id": null, "real_node_id": "5"}, "2": {"value": 0.0, "max": 1.0, "state": "running", "node_id": "2", "prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f", "display_node_id": "2", "parent_node_id": null, "real_node_id": "2"}}}}
[Worker 2] 
[Worker 2] event: websocket
[Worker 2] data: {"type": "executing", "data": {"node": "2", "display_node": "2", "prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f"}}
[Worker 2] 
[Worker 2] event: websocket
[Worker 2] data: {"type": "executed", "data": {"node": "2", "display_node": "2", "output": {"text": ["[\"https://bizyair-prod.oss-cn-shanghai.aliyuncs.com/outputs/exJN9NW0vjLBIKp6.jpg\"]"]}, "prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f"}}
[Worker 2] 
[Worker 2] event: websocket
[Worker 2] data: {"type": "progress_state", "data": {"prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f", "nodes": {"3": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "3", "prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f", "display_node_id": "3", "parent_node_id": null, "real_node_id": "3"}, "5": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "5", "prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f", "display_node_id": "5", "parent_node_id": null, "real_node_id": "5"}, "2": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "2", "prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f", "display_node_id": "2", "parent_node_id": null, "real_node_id": "2"}}}}
[Worker 2] 
[Worker 2] event: websocket
[Worker 2] data: {"type": "progress_state", "data": {"prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f", "nodes": {"3": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "3", "prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f", "display_node_id": "3", "parent_node_id": null, "real_node_id": "3"}, "5": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "5", "prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f", "display_node_id": "5", "parent_node_id": null, "real_node_id": "5"}, "2": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "2", "prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f", "display_node_id": "2", "parent_node_id": null, "real_node_id": "2"}, "7": {"value": 0.0, "max": 1.0, "state": "running", "node_id": "7", "prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f", "display_node_id": "7", "parent_node_id": null, "real_node_id": "7"}}}}
[Worker 2] 
[Worker 2] event: websocket
[Worker 2] data: {"type": "executing", "data": {"node": "7", "display_node": "7", "prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f"}}
[Worker 2] 
[Worker 0] event: websocket
[Worker 0] data: {"type": "status", "data": {"status": {"exec_info": {"queue_remaining": 1}}}}
[Worker 0] 
[Worker 2] event: websocket
[Worker 2] data: {"type": "executed", "data": {"node": "7", "display_node": "7", "output": {"images": [{"filename": "https://sc-maas.oss-cn-shanghai.aliyuncs.com/outputs%2F93c8eabf-4174-43fe-85d9-fb4db95c4b7e_da1a27370d6e4e5b66cca1ab3de59a7c_ComfyUI_7683a397_00001_.png?OSSAccessKeyId=LTAI5tQnPSzwAnR8NmMzoQq4&Expires=1776398992&Signature=N5gIzXOUwC4QpqY9YhpqmWSA1gE%3D", "subfolder": "", "type": "output"}]}, "prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f"}}
[Worker 2] 
[Worker 2] event: websocket
[Worker 2] data: {"type": "progress_state", "data": {"prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f", "nodes": {"3": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "3", "prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f", "display_node_id": "3", "parent_node_id": null, "real_node_id": "3"}, "5": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "5", "prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f", "display_node_id": "5", "parent_node_id": null, "real_node_id": "5"}, "2": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "2", "prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f", "display_node_id": "2", "parent_node_id": null, "real_node_id": "2"}, "7": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "7", "prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f", "display_node_id": "7", "parent_node_id": null, "real_node_id": "7"}}}}
[Worker 2] 
[Worker 2] event: websocket
[Worker 2] data: {"type": "execution_success", "data": {"prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f", "timestamp": 1776395391761}}
[Worker 2] 
[Worker 2] event: websocket
[Worker 2] data: {"type": "status", "data": {"status": {"exec_info": {"queue_remaining": 1}}}}
[Worker 2] 
[Worker 2] event: websocket
[Worker 2] data: {"type": "executing", "data": {"node": null, "prompt_id": "ef7de9bd-a8e8-4476-af4f-e70d7121957f"}}
[Worker 2] 
[Worker 2] event: history
[Worker 2] data: {"outputs": {"2": {"text": ["[\"https://bizyair-prod.oss-cn-shanghai.aliyuncs.com/outputs/exJN9NW0vjLBIKp6.jpg\"]"]}, "7": {"images": [{"filename": "https://sc-maas.oss-cn-shanghai.aliyuncs.com/outputs%2F93c8eabf-4174-43fe-85d9-fb4db95c4b7e_da1a27370d6e4e5b66cca1ab3de59a7c_ComfyUI_7683a397_00001_.png?OSSAccessKeyId=LTAI5tQnPSzwAnR8NmMzoQq4&Expires=1776398992&Signature=N5gIzXOUwC4QpqY9YhpqmWSA1gE%3D", "subfolder": "", "type": "output"}]}}, "metrics": {"pre_process_time": 148.7, "call_comfyui_time": 78191.2, "post_process_time": 419.2, "inference_time": 78759.7}}
[Worker 2] 
[Worker 2] event: end
[Worker 2] data: {"trace_id": "93c8eabf-4174-43fe-85d9-fb4db95c4b7e"}
[Worker 2] 
[Worker 0] : keep_alive
[Worker 0] 
[Worker 0] : keep_alive
[Worker 0] 
[Worker 0] : keep_alive
[Worker 0] 
………………
[Worker 0] : keep_alive
[Worker 0] 
[Worker 0] event: websocket
[Worker 0] data: {"type": "progress_state", "data": {"prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76", "nodes": {"3": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "3", "prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76", "display_node_id": "3", "parent_node_id": null, "real_node_id": "3"}, "5": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "5", "prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76", "display_node_id": "5", "parent_node_id": null, "real_node_id": "5"}}}}
[Worker 0] 
[Worker 0] event: websocket
[Worker 0] data: {"type": "progress_state", "data": {"prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76", "nodes": {"3": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "3", "prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76", "display_node_id": "3", "parent_node_id": null, "real_node_id": "3"}, "5": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "5", "prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76", "display_node_id": "5", "parent_node_id": null, "real_node_id": "5"}, "2": {"value": 0.0, "max": 1.0, "state": "running", "node_id": "2", "prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76", "display_node_id": "2", "parent_node_id": null, "real_node_id": "2"}}}}
[Worker 0] 
[Worker 0] event: websocket
[Worker 0] data: {"type": "executing", "data": {"node": "2", "display_node": "2", "prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76"}}
[Worker 0] 
[Worker 0] event: websocket
[Worker 0] data: {"type": "executed", "data": {"node": "2", "display_node": "2", "output": {"text": ["[\"https://bizyair-prod.oss-cn-shanghai.aliyuncs.com/outputs/jDEgPFxr4SIK24UL.jpg\"]"]}, "prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76"}}
[Worker 0] 
[Worker 0] event: websocket
[Worker 0] data: {"type": "progress_state", "data": {"prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76", "nodes": {"3": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "3", "prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76", "display_node_id": "3", "parent_node_id": null, "real_node_id": "3"}, "5": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "5", "prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76", "display_node_id": "5", "parent_node_id": null, "real_node_id": "5"}, "2": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "2", "prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76", "display_node_id": "2", "parent_node_id": null, "real_node_id": "2"}}}}
[Worker 0] 
[Worker 0] event: websocket
[Worker 0] data: {"type": "progress_state", "data": {"prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76", "nodes": {"3": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "3", "prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76", "display_node_id": "3", "parent_node_id": null, "real_node_id": "3"}, "5": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "5", "prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76", "display_node_id": "5", "parent_node_id": null, "real_node_id": "5"}, "2": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "2", "prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76", "display_node_id": "2", "parent_node_id": null, "real_node_id": "2"}, "7": {"value": 0.0, "max": 1.0, "state": "running", "node_id": "7", "prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76", "display_node_id": "7", "parent_node_id": null, "real_node_id": "7"}}}}
[Worker 0] 
[Worker 0] event: websocket
[Worker 0] data: {"type": "executing", "data": {"node": "7", "display_node": "7", "prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76"}}
[Worker 0] 
[Worker 0] event: websocket
[Worker 0] data: {"type": "executed", "data": {"node": "7", "display_node": "7", "output": {"images": [{"filename": "https://sc-maas.oss-cn-shanghai.aliyuncs.com/outputs%2F26f3bb08-7f38-4f98-84d9-540169a5d04f_8122efb9a8bb08fe73bf5aa7eae12b8d_ComfyUI_d541fdcc_00001_.png?OSSAccessKeyId=LTAI5tQnPSzwAnR8NmMzoQq4&Expires=1776399063&Signature=121hJppKkIo86RrwsABhd4wNd%2F4%3D", "subfolder": "", "type": "output"}]}, "prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76"}}
[Worker 0] 
[Worker 0] event: websocket
[Worker 0] data: {"type": "progress_state", "data": {"prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76", "nodes": {"3": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "3", "prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76", "display_node_id": "3", "parent_node_id": null, "real_node_id": "3"}, "5": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "5", "prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76", "display_node_id": "5", "parent_node_id": null, "real_node_id": "5"}, "2": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "2", "prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76", "display_node_id": "2", "parent_node_id": null, "real_node_id": "2"}, "7": {"value": 1.0, "max": 1.0, "state": "finished", "node_id": "7", "prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76", "display_node_id": "7", "parent_node_id": null, "real_node_id": "7"}}}}
[Worker 0] 
[Worker 0] event: websocket
[Worker 0] data: {"type": "execution_success", "data": {"prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76", "timestamp": 1776395462336}}
[Worker 0] 
[Worker 0] event: websocket
[Worker 0] data: {"type": "status", "data": {"status": {"exec_info": {"queue_remaining": 0}}}}
[Worker 0] 
[Worker 0] event: websocket
[Worker 0] data: {"type": "executing", "data": {"node": null, "prompt_id": "3e99ac29-93d7-4e47-bc20-3f9959c0dd76"}}
[Worker 0] 
[Worker 0] event: history
[Worker 0] data: {"outputs": {"2": {"text": ["[\"https://bizyair-prod.oss-cn-shanghai.aliyuncs.com/outputs/jDEgPFxr4SIK24UL.jpg\"]"]}, "7": {"images": [{"filename": "https://sc-maas.oss-cn-shanghai.aliyuncs.com/outputs%2F26f3bb08-7f38-4f98-84d9-540169a5d04f_8122efb9a8bb08fe73bf5aa7eae12b8d_ComfyUI_d541fdcc_00001_.png?OSSAccessKeyId=LTAI5tQnPSzwAnR8NmMzoQq4&Expires=1776399063&Signature=121hJppKkIo86RrwsABhd4wNd%2F4%3D", "subfolder": "", "type": "output"}]}}, "metrics": {"pre_process_time": 142.5, "call_comfyui_time": 148780.2, "post_process_time": 690.3, "inference_time": 149613.3}}
[Worker 0] 
[Worker 0] event: end
[Worker 0] data: {"trace_id": "26f3bb08-7f38-4f98-84d9-540169a5d04f"}
[Worker 0] 

=== Results ===
[Worker 0] ✅ seed=1514601691 duration=149.7s outputs={"2": {"text": ["[\"https://bizyair-prod.oss-cn-shanghai.aliyuncs.com/outputs/jDEgPFxr4SIK24UL.jpg\"]"]}, "7": {"images": [{"filename": "https://sc-maas.oss-cn-shanghai.aliyuncs.com/outputs%2F26f3bb08-7f38-4f98-84d9-540169a5d04f_8122efb9a8bb08fe73bf5aa7eae12b8d_ComfyUI_d541fdcc_00001_.png?OSSAccessKeyId=LTAI5tQnPSzwAnR8NmMzoQq4&Expires=1776399063&Signature=121hJppKkIo86RrwsABhd4wNd%2F4%3D", "subfolder": "", "type": "output"}]}}
[Worker 1] ✅ seed=1514601691 duration=77.6s outputs={"2": {"text": ["[\"https://bizyair-prod.oss-cn-shanghai.aliyuncs.com/outputs/KfcH25jmcjTXR0nx.jpg\"]"]}, "7": {"images": [{"filename": "https://sc-maas.oss-cn-shanghai.aliyuncs.com/outputs%2F45998f06-2463-4930-9d9f-207a1667691b_923af0d56bd5d799cc5440832dc17007_ComfyUI_d67e095f_00001_.png?OSSAccessKeyId=LTAI5tQnPSzwAnR8NmMzoQq4&Expires=1776398991&Signature=qLhRpCsnWFgiQzXw3SGP%2F9xKeYo%3D", "subfolder": "", "type": "output"}]}}
[Worker 2] ✅ seed=1514601691 duration=78.8s outputs={"2": {"text": ["[\"https://bizyair-prod.oss-cn-shanghai.aliyuncs.com/outputs/exJN9NW0vjLBIKp6.jpg\"]"]}, "7": {"images": [{"filename": "https://sc-maas.oss-cn-shanghai.aliyuncs.com/outputs%2F93c8eabf-4174-43fe-85d9-fb4db95c4b7e_da1a27370d6e4e5b66cca1ab3de59a7c_ComfyUI_7683a397_00001_.png?OSSAccessKeyId=LTAI5tQnPSzwAnR8NmMzoQq4&Expires=1776398992&Signature=N5gIzXOUwC4QpqY9YhpqmWSA1gE%3D", "subfolder": "", "type": "output"}]}}

Total wall time: 149.7s
Sum of individual times: 306.1s
Speedup: 2.05x
Success: 3/3

🟢 PASS: All workflows succeeded and ran in parallel

From this we can reasonably conclude that comfyagent needs no modification at all to be compatible with ComfyUI's new parallel feature. Even when the comfyagent test script is configured with a larger number of concurrent requests, the final speedup still roughly equals the number of parallel workers configured on the ComfyUI side:

=== Results ===
[Worker 0] ✅ seed=1514601691 duration=67.9s outputs={"2": {"text": ["[\"https://bizyair-prod.oss-cn-shanghai.aliyuncs.com/outputs/RwCOFD8uudDEv9Aa.jpg\"]"]}, "7": {"images": [{"filename": "https://sc-maas.oss-cn-shanghai.aliyuncs.com/outputs%2Fc79d9ead-e026-455d-85ed-53be13f48716_13c1d53f8712030d41d97a2561045a93_ComfyUI_5016f204_00001_.png?OSSAccessKeyId=LTAI5tQnPSzwAnR8NmMzoQq4&Expires=1776399528&Signature=wWdXxWWBKkREk%2BI04iN2Ad4NeDE%3D", "subfolder": "", "type": "output"}]}}
[Worker 1] ✅ seed=1514601691 duration=78.7s outputs={"2": {"text": ["[\"https://bizyair-prod.oss-cn-shanghai.aliyuncs.com/outputs/81ErGGdKxVRyDvz5.jpg\"]"]}, "7": {"images": [{"filename": "https://sc-maas.oss-cn-shanghai.aliyuncs.com/outputs%2F7263227c-bf58-47b4-9e97-4e8f2d099c34_b9e8b6bacc503172daf40fb140222ddf_ComfyUI_7b96eec9_00001_.png?OSSAccessKeyId=LTAI5tQnPSzwAnR8NmMzoQq4&Expires=1776399539&Signature=xswoyAnTq4QhaZyTlIKI9LP%2FhM4%3D", "subfolder": "", "type": "output"}]}}
[Worker 2] ✅ seed=1514601691 duration=133.7s outputs={"2": {"text": ["[\"https://bizyair-prod.oss-cn-shanghai.aliyuncs.com/outputs/l96fRMsfHzXkwglO.jpg\"]"]}, "7": {"images": [{"filename": "https://sc-maas.oss-cn-shanghai.aliyuncs.com/outputs%2F37c9367d-05e3-4a04-8c75-4e849d0cae39_eaeefb2625f2d193ff92e1cadfc1cc66_ComfyUI_9cee9d28_00001_.png?OSSAccessKeyId=LTAI5tQnPSzwAnR8NmMzoQq4&Expires=1776399594&Signature=TLpn7m5HX%2Fp%2BDAfGzSdtM74XOeI%3D", "subfolder": "", "type": "output"}]}}
[Worker 3] ✅ seed=1514601691 duration=132.3s outputs={"2": {"text": ["[\"https://bizyair-prod.oss-cn-shanghai.aliyuncs.com/outputs/JOxsSYbBXGW4y08g.jpg\"]"]}, "7": {"images": [{"filename": "https://sc-maas.oss-cn-shanghai.aliyuncs.com/outputs%2F78cccc79-9453-45c2-88c2-5dfe4561cbfb_2a1fb26c7537cd487f6ef9712d17e870_ComfyUI_4081f2aa_00001_.png?OSSAccessKeyId=LTAI5tQnPSzwAnR8NmMzoQq4&Expires=1776399592&Signature=w8LZEHWPPZ160ssPHQjmidv7a4U%3D", "subfolder": "", "type": "output"}]}}
[Worker 4] ✅ seed=1514601691 duration=67.3s outputs={"2": {"text": ["[\"https://bizyair-prod.oss-cn-shanghai.aliyuncs.com/outputs/NDVRRL39SM0sVkoJ.jpg\"]"]}, "7": {"images": [{"filename": "https://sc-maas.oss-cn-shanghai.aliyuncs.com/outputs%2F5a4c566c-437a-4a12-8713-600af24cc1ec_4ca96472b0a5b9e273b5ee71ec3fc8aa_ComfyUI_0db59e55_00001_.png?OSSAccessKeyId=LTAI5tQnPSzwAnR8NmMzoQq4&Expires=1776399527&Signature=fwGTEcwnzTSecd%2FjK9X6EhijKnI%3D", "subfolder": "", "type": "output"}]}}
[Worker 5] ✅ seed=1514601691 duration=77.5s outputs={"2": {"text": ["[\"https://bizyair-prod.oss-cn-shanghai.aliyuncs.com/outputs/9ls2JUoufC5f0BN0.jpg\"]"]}, "7": {"images": [{"filename": "https://sc-maas.oss-cn-shanghai.aliyuncs.com/outputs%2F98afb385-30be-4abf-ac62-a82874fcc4cb_67c852742cfce7f952f4d448e327cac4_ComfyUI_667fac5b_00001_.png?OSSAccessKeyId=LTAI5tQnPSzwAnR8NmMzoQq4&Expires=1776399537&Signature=eUV8asq7qTJppiRM%2BNrDsmvh79w%3D", "subfolder": "", "type": "output"}]}}

Total wall time: 133.7s
Sum of individual times: 557.5s
Speedup: 4.17x
Success: 6/6

🟢 PASS: All workflows succeeded and ran in parallel
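For reference, the speedup the script reports is just the sum of per-request durations divided by the batch's wall-clock time; recomputing it from the 6-request run above:

```python
# Speedup metric used by the test script: sum of the per-request
# durations divided by the wall-clock time of the whole batch.
def speedup(durations, wall_time):
    return round(sum(durations) / wall_time, 2)

# Per-worker durations and wall time from the 6-request run above
# (the log rounds to one decimal, so the last digit may drift slightly).
print(speedup([67.9, 78.7, 133.7, 132.3, 67.3, 77.5], 133.7))  # → 4.17
```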

@ruabbit233
Collaborator Author

The comfyagent test script is shown below:

import argparse
import copy
import json
import random
import sys
import threading
import time
from pathlib import Path

import requests


def run_worker(index, workflow, url, results):
    """Run a single workflow request against ComfyAgent's /prompt/stream SSE endpoint.

    Args:
        index: Worker index for identification.
        workflow: A deep-copied workflow dict (seed will be randomized).
        url: ComfyAgent server address (e.g. "127.0.0.1:8000").
        results: Shared list to store result dict at results[index].
    """
    if not url.startswith("http"):
        url = f"http://{url}"

    seed = random.randint(0, 1000000)
    workflow["5"]["inputs"]["seed"] = seed
    request_data = {"prompt": workflow}
    headers = {"Content-Type": "application/json"}

    start_time = time.time()
    # Initialized here so the except handler below can reference `outputs`
    # even when the request fails before the streaming loop assigns it.
    outputs = ""

    try:
        with requests.post(
            f"{url}/prompt/stream", json=request_data, headers=headers, stream=True
        ) as response:
            if response.status_code != 200:
                results[index] = {
                    "success": False,
                    "index": index,
                    "seed": workflow["5"]["inputs"]["seed"],
                    "error": f"HTTP {response.status_code}: {response.text[:200]}",
                    "duration": time.time() - start_time,
                    "outputs": "",
                }
                return

            has_error = False
            outputs = ""
            for line in response.iter_lines(decode_unicode=True):
                if line is None:
                    continue
                # Detect error events from ComfyAgent SSE stream
                if line.startswith("event:") and "error" in line.lower():
                    has_error = True
                if line.startswith("data:"):
                    try:
                        data_str = line[len("data:"):].strip()
                        data = json.loads(data_str)
                        if "outputs" in data:
                            outputs += json.dumps(data["outputs"])
                        # Check for execution_error in websocket events
                        if isinstance(data, dict):
                            if data.get("type") == "execution_error":
                                has_error = True
                            if isinstance(data.get("data"), dict) and data["data"].get(
                                "exception_message"
                            ):
                                has_error = True
                    except (json.JSONDecodeError, AttributeError):
                        pass
                print(f"[Worker {index}] {line}")

        duration = time.time() - start_time
        results[index] = {
            "success": not has_error,
            "index": index,
            "seed": workflow["5"]["inputs"]["seed"],
            "error": None if not has_error else "Error event detected in SSE stream",
            "duration": duration,
            "outputs": outputs,
        }
    except Exception as e:
        results[index] = {
            "success": False,
            "index": index,
            "seed": workflow["5"]["inputs"]["seed"],
            "error": str(e),
            "duration": time.time() - start_time,
            "outputs": outputs,
        }


def main():
    parser = argparse.ArgumentParser(
        description="Test ComfyAgent + ComfyUI parallel workflow execution via /prompt/stream SSE endpoint"
    )
    parser.add_argument("--url", required=True, help="ComfyAgent server address (e.g. 127.0.0.1:8000)")
    parser.add_argument(
        "--parallel",
        type=int,
        default=4,
        metavar="N",
        help="Number of concurrent workflow requests (default: 4)",
    )
    args = parser.parse_args()

    workflow = json.loads((Path(__file__).parent / "workflow_1.json").read_text())

    print("=== ComfyAgent Parallel Test ===")
    print(f"Server: {args.url}")
    print(f"Parallel workflows: {args.parallel}")
    print(f"Workflow: workflow_1.json")
    print()

    results = [None] * args.parallel
    threads = []

    global_start = time.time()

    for i in range(args.parallel):
        workflow_copy = copy.deepcopy(workflow)
        t = threading.Thread(target=run_worker, args=(i, workflow_copy, args.url, results))
        threads.append(t)
        t.start()

    for t in threads:
        t.join()

    global_duration = time.time() - global_start

    print()
    print("=== Results ===")
    success_count = 0
    for r in results:
        if r is None:
            print(f"[Worker ?] NO RESULT")
            continue
        idx = r["index"]
        status_icon = "✅" if r["success"] else "❌"
        print(
            f"[Worker {idx}] {status_icon} seed={r['seed']} duration={r['duration']:.1f}s outputs={(r['outputs'])}",
            end="",
        )
        if r["success"]:
            print()
            success_count += 1
        else:
            print(f" error={r.get('error', 'unknown')}")

    sequential_time = sum(r["duration"] for r in results if r is not None)
    speedup = sequential_time / global_duration if global_duration > 0 else 0

    print()
    print(f"Total wall time: {global_duration:.1f}s")
    print(f"Sum of individual times: {sequential_time:.1f}s")
    print(f"Speedup: {speedup:.2f}x")
    print(f"Success: {success_count}/{args.parallel}")

    if success_count == args.parallel and speedup > 1.3:
        print("\n🟢 PASS: All workflows succeeded and ran in parallel")
    elif success_count == args.parallel:
        print("\n🟡 PASS: All workflows succeeded but parallel speedup is low")
    else:
        print("\n🔴 FAIL: Some workflows did not succeed")

    return 0 if success_count == args.parallel else 1


if __name__ == "__main__":
    sys.exit(main())


Copilot AI left a comment


Pull request overview

This PR adds parallel workflow execution to ComfyUI: a new --parallel N CLI flag starts multiple prompt_worker threads to improve throughput in network/IO-bound scenarios. It also changes progress state from a global singleton to per-prompt_id isolation so that progress/preview events are routed correctly under parallel execution.

Changes:

  • Add the --parallel flag and spawn that many prompt_worker threads at startup.
  • Store and clean up the progress registry per prompt_id, and pass client_id through the execution chain for targeted message routing.
  • Adjust execution and progress reporting so they no longer rely on the single server.client_id for message routing (though some global state writes remain).

Reviewed changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 5 comments.

Show a summary per file

| File | Description |
| --- | --- |
| main.py | Start N workers based on --parallel; the progress hook fetches the registry per prompt and sends by client_id |
| execution.py | Pass client_id explicitly through the execution chain; use and clean up the progress registry per prompt_id |
| comfy_execution/progress.py | Replace the global progress registry with a per-prompt dict and add remove_progress_state |
| comfy_api/latest/init.py | set_progress uses the executing context's prompt_id to update the matching registry |
| comfy/cli_args.py | Add the --parallel N argument definition and help text |


Comment thread main.py
Comment on lines +484 to +487
num_workers = max(1, args.parallel)
logging.info(f"Starting {num_workers} prompt worker(s)")
for i in range(num_workers):
    threading.Thread(target=prompt_worker, daemon=True, args=(prompt_server.prompt_queue, prompt_server,), name=f"prompt_worker-{i}").start()

Copilot AI Apr 17, 2026


The new --parallel option changes runtime behavior (multiple prompt worker threads), but there are no integration tests exercising parallel execution (e.g., starting the server with --parallel 2 and verifying two prompts from different clients both complete and don’t cross-contaminate events/progress). Adding a test would help prevent regressions in this concurrency-sensitive path.

Comment thread comfy_execution/progress.py Outdated
Comment on lines +354 to +356
with _progress_registries_lock:
    if _progress_registries:
        return next(iter(_progress_registries.values()))

Copilot AI Apr 17, 2026


get_progress_state() falls back to returning an arbitrary existing registry (next(iter(_progress_registries.values()))) when prompt_id is None or not found. In parallel execution this can route progress updates/handlers to the wrong prompt/client and cause cross-contamination. Suggestion: if prompt_id is None or missing, return a new empty/dummy registry (no handlers) or raise/return None and require callers to pass a valid prompt_id (so you never pick a different prompt’s registry).

Suggested change
with _progress_registries_lock:
    if _progress_registries:
        return next(iter(_progress_registries.values()))

Comment on lines 53 to 83
@@ -78,7 +79,7 @@ async def set_progress(
preview_size = None if ignore_size_limit else args.preview_size
to_display = (image_format, to_display, preview_size)

get_progress_state().update_progress(
get_progress_state(prompt_id).update_progress(
node_id=node_id,

Copilot AI Apr 17, 2026


set_progress() derives prompt_id from executing_context, but when there is no executing context it becomes None and the code still calls get_progress_state(prompt_id). With the new per-prompt registry design this risks updating the wrong prompt’s registry (or a dummy/no-op registry) and makes behavior dependent on internal fallback logic. Consider requiring an executing context (raise if missing), or extending the API to accept an explicit prompt_id and using that instead of None.

Comment thread execution.py
Comment on lines +719 to +720
self.client_id = extra_data.get("client_id", None)
self.server.client_id = self.client_id

Copilot AI Apr 17, 2026


PromptExecutor.execute_async() still writes self.server.client_id = self.client_id. With multiple prompt workers this global field will be raced/overwritten by concurrent executions, which can break any server logic that relies on a single “currently executing client” (e.g., WebSocket reconnect sending the current node). Consider removing this global mutation in parallel mode and instead tracking executing state per prompt/client (e.g., a mapping on the server keyed by client_id/prompt_id).

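A minimal sketch of the per-prompt tracking suggested above (hypothetical class name; the real PromptServer carries far more state):

```python
import threading

class ExecutionClientMap:
    """Tracks which client owns each in-flight prompt, instead of one global client_id."""
    def __init__(self):
        self._lock = threading.Lock()
        self._clients = {}  # prompt_id -> client_id

    def start(self, prompt_id, client_id):
        with self._lock:
            self._clients[prompt_id] = client_id

    def client_for(self, prompt_id):
        with self._lock:
            return self._clients.get(prompt_id)

    def finish(self, prompt_id):
        with self._lock:
            self._clients.pop(prompt_id, None)
```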
Comment thread main.py
hijack_progress(prompt_server)

threading.Thread(target=prompt_worker, daemon=True, args=(prompt_server.prompt_queue, prompt_server,)).start()
num_workers = max(1, args.parallel)

Copilot AI Apr 17, 2026


Starting multiple prompt_worker threads introduces true concurrent prompt execution, but several server fields used elsewhere as fallbacks (e.g., PromptServer.client_id / last_node_id / last_prompt_id) are global and not worker-safe. This can cause incorrect routing of progress/executing status in edge cases (notably reconnect logic that assumes a single executing client). Before enabling parallelism by default, consider refactoring those fields to be per-client/per-prompt (or guarding with locks) so parallel mode doesn’t produce cross-client state leakage.

Suggested change

```python
requested_workers = max(1, args.parallel)
num_workers = requested_workers  # default: honor the request
if requested_workers > 1:
    logging.warning(
        "Parallel prompt workers requested (%s) but are disabled because PromptServer "
        "execution fallback state is shared and not worker-safe; starting a single prompt worker instead.",
        requested_workers,
    )
    num_workers = 1
```

… when prompt_id is None, another workflow's registry is returned. This can cause:

- progress updates being written to the wrong workflow's state
- WebSocket messages being delivered to the wrong client
@ruabbit233
Collaborator Author

Concurrency-safety issues introduced by parallel workflow execution

Background

Commit 2a1f338c added the ability to run multiple workflows in parallel (multiple prompt_worker threads), which significantly improves throughput in the API-server scenario. However, the change introduces several concurrency-safety problems on top of the existing architecture, mainly race conditions on globally shared state and collateral damage from the interrupt mechanism.

This document lays out the issues found so far, the proposed fixes, and their trade-offs, for discussion.


Issue overview

| ID | Severity | Issue | Root cause |
| --- | --- | --- | --- |
| P0-1 | 🔴 Critical | /interrupt interrupts the wrong workflow or misses the target | Global boolean interrupt flag shared by all workers |
| P0-2 | 🔴 Critical | Starting a new workflow clears other workflows' interrupt signal | execute_async resets the global interrupt flag on entry |
| P0-3 | 🔴 Critical | server.client_id overwritten by concurrent workers | Multiple workers write to a PromptServer singleton attribute |
| P1 | 🟠 High | Races on server.last_prompt_id / server.last_node_id | Shared state read and written by multiple workers concurrently |

🔴 P0-1: /interrupt interrupts a non-target workflow / misses the target workflow

Current behavior

The interrupt mechanism relies on a module-level global boolean flag, interrupt_processing (comfy/model_management.py:1809):

```python
interrupt_processing = False

def interrupt_current_processing(value=True):
    global interrupt_processing  # without this, the assignment would create a local
    with interrupt_processing_mutex:
        interrupt_processing = value  # shared by all threads

def throw_exception_if_processing_interrupted():
    global interrupt_processing
    with interrupt_processing_mutex:
        if interrupt_processing:
            interrupt_processing = False  # reset right after reading; only one thread ever sees it
            raise InterruptProcessingException()
```

Although the /interrupt endpoint (server.py:991-1012) accepts a prompt_id for targeted interruption, it ultimately sets this same global flag.

Problem scenarios

Scenario A (a targeted interrupt kills the wrong workflow): Worker A runs prompt-A, Worker B runs prompt-B. /interrupt is called for prompt-A and sets the global flag to True. But Worker B may reach throw_exception_if_processing_interrupted() first, so Worker B is interrupted while Worker A keeps running.

Scenario B (a global interrupt stops only one workflow): three workflows run in parallel and /interrupt is called without a prompt_id. The global flag is set to True, but throw_exception_if_processing_interrupted() resets it to False immediately after checking it, so only the first worker to observe the flag is interrupted; the other two are unaffected.

Scenario C (API nodes interrupted by mistake): ComfyUI's core API nodes (comfy_api_nodes/) check the global flag via is_processing_interrupted() while polling, downloading, and uploading:

| Call site | Impact |
| --- | --- |
| comfy_api_nodes/util/client.py:288,590,769 | API polling interrupted unexpectedly |
| comfy_api_nodes/util/download_helpers.py:91,155 | Downloads interrupted unexpectedly |
| comfy_api_nodes/util/upload_helpers.py:268 | Uploads interrupted unexpectedly |
| comfy_api_nodes/nodes_sonilo.py:196 | Sonilo tasks terminated unexpectedly |
| bizyengine/bizyengine/misc/nodes.py:18 | BizyAir ProgressCallback interrupted unexpectedly |

BizyAir's ProgressCallback calls the same global throw_exception_if_processing_interrupted(), so it can be killed by mistake as well.
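Scenario B's check-and-reset behavior can be reproduced with a minimal stand-in for the global flag (simplified names, not the actual ComfyUI functions): the first worker to observe the flag consumes it, so with N parallel workers only one is ever interrupted.

```python
import threading

# A global interrupt flag with check-and-reset semantics, mimicking
# throw_exception_if_processing_interrupted().
interrupt_processing = False
interrupt_processing_mutex = threading.Lock()

def check_and_reset(worker_name, interrupted):
    global interrupt_processing
    with interrupt_processing_mutex:
        if interrupt_processing:
            interrupt_processing = False  # consumed by the first reader
            interrupted.append(worker_name)

with interrupt_processing_mutex:
    interrupt_processing = True  # a global /interrupt was requested

interrupted = []
threads = [threading.Thread(target=check_and_reset, args=(f"worker-{i}", interrupted))
           for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

assert len(interrupted) == 1  # exactly one of the three workers stopped
```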

Fix: per-executor interrupt flag

Core idea: give each PromptExecutor its own _interrupted flag, bind the current thread's interrupt context to its executor via threading.local(), and have the interrupt checks consult the per-executor flag first, falling back to the global flag for compatibility with legacy code.

Change 1 (comfy/model_management.py):

```python
# New: thread-local interrupt context
_interrupt_context = threading.local()

def set_interrupt_context(executor_interrupt_flag):
    """Bind the current thread's interrupt flag to its executor."""
    _interrupt_context.flag = executor_interrupt_flag

def clear_interrupt_context():
    if hasattr(_interrupt_context, 'flag'):
        del _interrupt_context.flag

# Modified: processing_interrupted() consults the per-executor flag first
def processing_interrupted():
    if hasattr(_interrupt_context, 'flag') and _interrupt_context.flag is not None:
        if _interrupt_context.flag[0]:
            return True
    global interrupt_processing
    with interrupt_processing_mutex:
        return interrupt_processing

# Modified: throw_exception_if_processing_interrupted() consults the per-executor flag first
def throw_exception_if_processing_interrupted():
    if hasattr(_interrupt_context, 'flag') and _interrupt_context.flag is not None:
        if _interrupt_context.flag[0]:
            _interrupt_context.flag[0] = False
            raise InterruptProcessingException()
    global interrupt_processing
    with interrupt_processing_mutex:
        if interrupt_processing:
            interrupt_processing = False
            raise InterruptProcessingException()
```

Change 2 (execution.py, PromptExecutor):

```python
class PromptExecutor:
    def __init__(self, ...):
        self._interrupted = [False]  # wrapped in a list so the reference can be shared

    def interrupt(self):
        self._interrupted[0] = True

# New: registry of running executors
_running_executors: Dict[str, PromptExecutor] = {}
_running_executors_lock = threading.Lock()
```

Change 3 (server.py, the /interrupt endpoint):

```python
@routes.post("/interrupt")
async def post_interrupt(request):
    json_data = await request.json()
    prompt_id = json_data.get('prompt_id')

    if prompt_id:
        executor = _get_executor_by_prompt_id(prompt_id)
        if executor is not None:
            executor.interrupt()  # interrupt exactly the target
        else:
            logging.info(f"Prompt {prompt_id} is not currently running")
    else:
        # Global interrupt: stop every running executor
        for executor in _get_all_running_executors():
            executor.interrupt()
        # Also set the global flag (compatibility with legacy code that bypasses the executor)
        nodes.interrupt_processing()

    return web.Response(status=200)
```

Trade-offs

| Pros | Cons / risks |
| --- | --- |
| Interrupts are isolated precisely to the target workflow | The new _running_executors registry has to be maintained |
| A global interrupt genuinely interrupts all parallel workflows | Binding _interrupt_context via threading.local() adds conceptual overhead |
| Every interrupt check in comfy_api_nodes is isolated automatically (the processing_interrupted() they call is modified along the way) | Custom nodes that call interrupt_current_processing() directly to set the global flag are invisible to the per-executor flag, but the global flag remains as a fallback |
| Fully backward compatible: when no per-executor context is bound, the checks fall back to the global flag | |
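The isolation property claimed above can be sanity-checked with a self-contained sketch; set_interrupt_context and processing_interrupted here are simplified stand-ins for the proposed functions, not the real implementations:

```python
import threading

# Thread-local binding from the running thread to its executor's flag.
_interrupt_context = threading.local()

def set_interrupt_context(flag):
    _interrupt_context.flag = flag

def processing_interrupted():
    flag = getattr(_interrupt_context, "flag", None)
    return bool(flag and flag[0])

class Executor:
    def __init__(self):
        self._interrupted = [False]  # list-wrapped so the reference is shared

    def interrupt(self):
        self._interrupted[0] = True

barrier = threading.Barrier(3)
results = {}

def worker(executor, name):
    set_interrupt_context(executor._interrupted)  # bind this thread to its executor
    barrier.wait()                                # wait until the interrupt was issued
    results[name] = processing_interrupted()

a, b = Executor(), Executor()
ta = threading.Thread(target=worker, args=(a, "A"))
tb = threading.Thread(target=worker, args=(b, "B"))
ta.start(); tb.start()
a.interrupt()   # targeted interrupt for executor A only
barrier.wait()  # release both workers to perform their check
ta.join(); tb.join()

assert results == {"A": True, "B": False}  # only A observes the interrupt
```

Each worker thread sees only its own executor's flag, which is exactly the property the real fix needs for /interrupt with a prompt_id.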

🔴 P0-2: interrupt_processing(False) clears other workflows' interrupt signal

Current behavior

execution.py:717:

```python
async def execute_async(self, prompt, prompt_id, extra_data={}, execute_outputs=[]):
    nodes.interrupt_processing(False)  # reset the global flag at the start of every execution
```

Problem scenario

  1. The user calls /interrupt to interrupt Worker A.
  2. The global flag becomes interrupt_processing = True.
  3. Worker B happens to start a new prompt at the same moment, and its interrupt_processing(False) clears the flag first.
  4. Worker A's nodes never see the interrupt signal.
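The race in steps 2 and 3 can be shown deterministically with a minimal model of the global flag (illustrative names, not ComfyUI's API):

```python
import threading

# A single shared interrupt flag, as in the current architecture.
interrupt_flag = {"value": False}
lock = threading.Lock()

def interrupt():
    with lock:
        interrupt_flag["value"] = True

def start_new_prompt():
    # Mimics execute_async() resetting the global flag on entry.
    with lock:
        interrupt_flag["value"] = False

def check_interrupted():
    with lock:
        return interrupt_flag["value"]

interrupt()          # user interrupts Worker A
start_new_prompt()   # Worker B starts and clears the shared flag
assert check_interrupted() is False  # Worker A's interrupt is lost
```

With a per-executor flag, Worker B's reset touches only its own state and Worker A's interrupt survives.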

Fix

Fixed together with P0-1: replace the global reset with a per-executor reset:

```python
async def execute_async(self, prompt, prompt_id, extra_data={}, execute_outputs=[]):
    # removed: nodes.interrupt_processing(False)
    self._interrupted[0] = False  # reset only this executor's flag
    set_interrupt_context(self._interrupted)
```

Trade-offs

| Pros | Cons / risks |
| --- | --- |
| Eliminates the race condition | If a custom node calls interrupt_processing(False) mid-execution to cancel an interrupt, the global flag no longer resets automatically; but that was already incorrect behavior |
| Minimal change (one line removed, one added) | |

🔴 P0-3: server.client_id overwritten by multiple threads

Current behavior

execution.py:719-720:

```python
self.client_id = extra_data.get("client_id", None)
self.server.client_id = self.client_id  # writes to the shared singleton
```

The commit already made client_id an instance variable on PromptExecutor, but it still writes server.client_id, which is shared by all workers.

Problem scenario

On WebSocket reconnect (server.py:276):

```python
if self.client_id == sid and self.last_node_id is not None:
    await self.send("executing", {"node": self.last_node_id}, sid)
```

By then self.client_id may have been overwritten by another worker, so:

  • when client A reconnects, self.client_id may happen to equal client B's sid, and client A receives B's node status
  • or self.client_id equals no reconnecting client's sid, and nothing is delivered after reconnect

Fix: stop writing server.client_id; use an executing_clients mapping instead

Change 1 (execution.py): delete `self.server.client_id = self.client_id`

Change 2 (server.py): add an executing_clients dict replacing the client_id + last_node_id pair:

```python
# Added in PromptServer.__init__:
self.executing_clients: Dict[str, str] = {}  # client_id -> last_node_id

# Reconnect logic in websocket_handler becomes:
if sid in self.executing_clients:
    node_id = self.executing_clients[sid]
    if node_id is not None:
        await self.send("executing", {"node": node_id}, sid)
```

Change 3 (execution.py): update executing_clients as nodes execute:

```python
# replaces server.last_node_id = display_node_id
if client_id is not None:
    server.executing_clients[client_id] = display_node_id
```

Change 4 (prompt_worker): clean up after execution completes:

```python
if e.client_id is not None:
    server_instance.executing_clients.pop(e.client_id, None)
    server_instance.send_sync("executing", {"node": None, "prompt_id": prompt_id}, e.client_id)
```

Trade-offs

| Pros | Cons / risks |
| --- | --- |
| Completely eliminates the client_id race | The new executing_clients dict must be maintained, and cleanup must be guaranteed on abnormal exits |
| When one client submits multiple workflows, the most recently executed node is still tracked | One client_id can hold only one last_node_id, so of several parallel workflows from the same client only the last is visible; this is reasonable under the current architecture, since the frontend keeps a single WebSocket connection |
| Does not affect BizyAir (BizyAir neither reads nor writes server.client_id) | |
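The lifecycle of the proposed executing_clients mapping can be sketched without the server machinery (a simplified model; the real state would live on PromptServer and be updated from the worker threads):

```python
# Per-client tracking of the last executed node, replacing the shared
# client_id / last_node_id pair. Plain dict for illustration.
executing_clients = {}  # client_id -> last executed node id

def on_node_executing(client_id, node_id):
    # Called as a worker starts executing a node for this client.
    if client_id is not None:
        executing_clients[client_id] = node_id

def on_prompt_done(client_id):
    # Called when the prompt finishes (normally or with an error).
    executing_clients.pop(client_id, None)

def on_reconnect(sid):
    # Returns the node to replay to a reconnecting client, if any.
    return executing_clients.get(sid)

on_node_executing("client-A", "KSampler")   # worker 1
on_node_executing("client-B", "VAEDecode")  # worker 2, concurrently
assert on_reconnect("client-A") == "KSampler"  # A sees its own node, not B's
on_prompt_done("client-A")
assert on_reconnect("client-A") is None
```

Because lookups are keyed by sid, a reconnecting client can never be handed another client's node state, regardless of how many workers are running.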

🟠 P1: races on server.last_prompt_id / server.last_node_id

Current behavior

  • main.py:305: `server_instance.last_prompt_id = prompt_id` (overwritten whenever a worker starts a prompt)
  • execution.py:484: `server.last_node_id = display_node_id` (overwritten as nodes execute)
  • execution.py:805: `self.server.last_node_id = None` (cleared when execution ends, possibly clearing a value set by another worker)

Affected parties

  1. hijack_progress() (main.py:384-387): uses these as fallbacks for prompt_id / node_id. The commit already propagates both values via CurrentNodeContext (a ContextVar), so the fallback is rarely reached.
  2. BizyAir nodes_base.py:56-60:

```python
if PromptServer.instance is not None and PromptServer.instance.last_prompt_id is not None:
    extra_data["prompt_id"] = PromptServer.instance.last_prompt_id
```

In non-server mode, BizyAir reads last_prompt_id as an API request tracking ID. In parallel mode it may read another workflow's ID.

Fix

last_prompt_id

  • Remove main.py:305: `server_instance.last_prompt_id = prompt_id`
  • (The P0-3 fix already covers last_node_id via executing_clients)
  • Remove execution.py:805: `self.server.last_node_id = None`

hijack_progress()

  • Remove the fallbacks to server.last_prompt_id / server.last_node_id and rely entirely on CurrentNodeContext

BizyAir compatibility

  • Impact is low: in the API-server scenario BIZYAIR_SERVER_MODE is usually True and prompt_id travels via _meta, so this branch is not taken
  • If compatibility is needed, a PR can be sent to BizyAir switching it to get_executing_context():

```python
from comfy_execution.utils import get_executing_context
context = get_executing_context()
if context is not None:
    extra_data["prompt_id"] = context.prompt_id
```

Trade-offs

| Pros | Cons / risks |
| --- | --- |
| Eliminates the shared-state race | last_prompt_id is no longer reliable for BizyAir in non-server mode; the API-server scenario is unaffected |
| Simplifies the code by removing unnecessary fallbacks | Need to confirm CurrentNodeContext is set correctly on all code paths |
| Consistent with the P0-3 fix | |

Summary of change scope

| File | Issues | Changes |
| --- | --- | --- |
| comfy/model_management.py | P0-1, P0-2 | Add _interrupt_context; modify processing_interrupted() and throw_exception_if_processing_interrupted() |
| execution.py | P0-1, P0-2, P0-3, P1 | Add _interrupted / interrupt() / the registry to PromptExecutor; remove the global reset and shared-state writes from execute_async; remove `server.last_node_id = None` |
| server.py | P0-1, P0-3, P1 | Make /interrupt per-executor; add executing_clients; update the WebSocket reconnect logic |
| main.py | P1 | Remove the hijack_progress fallbacks; remove the server.last_prompt_id write |
| bizyengine/core/nodes_base.py | P1 (optional) | Use get_executing_context() instead of PromptServer.instance.last_prompt_id |

BizyAir requires no mandatory changes: everything stays backward compatible, and the throw_exception_if_processing_interrupted() called by BizyAir's ProgressCallback automatically takes the per-executor path.


Open questions for discussion

  1. Compatibility strategy for per-executor vs. global interrupts: should the global flag be kept as a fallback? Keeping it leaves old custom nodes' interrupt behavior unchanged, but it also means a global interrupt can still "leak" into non-target workflows.

  2. executing_clients edge case: when one client_id submits several parallel workflows, only the last node_id is recorded. Is that acceptable? In the frontend UI a client usually has one visible workflow, but in the API-server scenario one client_id may be associated with several.

  3. Should BizyAir be changed in lockstep? The last_prompt_id read in nodes_base.py is unreliable in parallel mode, but the API-server scenario takes the _meta branch and is unaffected. Fix it now, or wait until BizyAir hits the problem?

  4. Semantics of /interrupt without a prompt_id: should a global interrupt stop all parallel workflows, or keep the old behavior of stopping only the "current" one? Interrupting all is recommended, but it changes the API behavior.

  5. Updating is_processing_interrupted() for comfy_api_nodes: this is fixed together with P0-1; once processing_interrupted() is modified it takes effect automatically, with no per-file changes. Confirm this approach is acceptable.

