文件预览

batch.py

查看 bizyair-skill 技能包中的文件内容。

文件内容

scripts/batch.py

"""批量参数卡 / 批量执行的处理。

两个主入口:
  - handle_batch_prefill: --batch-prefill。串联多个任务的预填卡,**走路 B 渲染**
                          (remote.build_remote_info_output → build_remote_prefilled_card)。
                          per-batch 缓存:同一 model id 在一批里只 fetch 一次 contract。
  - handle_batch_run:     --batch-run。把每个任务写成独立子进程
                          (`python3 dispatch.py --model X --confirm-run ...`),
                          异步起一个 worker 调度它们,主进程立刻返回 status JSON。
                          worker 本身也是一个 dispatch.py 子进程
                          (`python3 dispatch.py --batch-worker-file <file>`)。

【⚠️ subprocess 入口都是 dispatch.py,不是 batch.py 自身】
build_batch_child_command 和 handle_batch_run 里启动 worker 的地方,都用
dispatch_common.DISPATCH_SCRIPT 而不是 Path(__file__)。原因:batch.py 没有 main(),
之前曾误用 __file__ 导致整个 --batch-run 静默退出(进程起来 0 秒就死)。

并发 / 任务数都受 config.json 的 batch.max_concurrency / max_tasks + 系统硬上限双重限制
(见 resolve_batch_policy 和 enforce_batch_task_limits)。
"""
from __future__ import annotations
import argparse, json, os, subprocess, sys, time, traceback
from pathlib import Path
from paths import resolve_batch_runs_dir
from typing import Any

import card
import dispatch_common
import remote
from dispatch_common import (
    COMPILER_RULES, CONFIGURED_MAX_BATCH_CONCURRENCY_FALLBACK,
    CONFIGURED_MAX_BATCH_TASKS_FALLBACK, ROUTE_TABLE,
    RUNTIME_STATE_DISABLED_ENV, SCENE_NUMBER_MAP,
    SYSTEM_MAX_BATCH_CONCURRENCY, SYSTEM_MAX_BATCH_TASKS,
)

# 搜索路由(菜单 6 / 7 / v6 / v7)是交互式 picker,不是可执行模型。batch 不允许跑这类 slug。
SEARCH_FLOW_SLUGS = {
    'remote-search-flow', 'remote-video-search-flow',
    'modelzoo-search-flow', 'modelzoo-video-search-flow',
}


def reject_search_flow_target(target: str, *, task_index: int, phase: str) -> None:
    """批量任务里如果出现搜索路由 slug(用户写 scene: 6 / 7 会触发),直接返回友好错误。"""
    if target not in SEARCH_FLOW_SLUGS:
        return
    print(json.dumps({
        'error': 'BATCH_SEARCH_FLOW_NOT_SUPPORTED',
        'message': (
            f'批量任务里第 {task_index} 个解析出来的是搜索入口(菜单 6 / 7),不是可执行模型。'
            '搜索入口属于交互式查找,无法直接 batch。'
            '请先单独跑搜索拿到具体的模型 slug 或 ModelZoo endpoint,再放进 batch。'
        ),
        'slug': target,
        'task_index': task_index,
        'phase': phase,
    }, ensure_ascii=False, indent=2), file=sys.stderr)
    sys.exit(2)

def resolve_batch_policy() -> dict:
    config = dispatch_common.load_skill_config()
    batch_cfg = config.get('batch') if isinstance(config.get('batch'), dict) else {}
    configured_max_concurrency = batch_cfg.get('max_concurrency', CONFIGURED_MAX_BATCH_CONCURRENCY_FALLBACK)
    configured_max_tasks = batch_cfg.get('max_tasks', CONFIGURED_MAX_BATCH_TASKS_FALLBACK)
    try:
        configured_max_concurrency = int(configured_max_concurrency)
    except Exception:
        configured_max_concurrency = CONFIGURED_MAX_BATCH_CONCURRENCY_FALLBACK
    try:
        configured_max_tasks = int(configured_max_tasks)
    except Exception:
        configured_max_tasks = CONFIGURED_MAX_BATCH_TASKS_FALLBACK
    if configured_max_concurrency < 1:
        configured_max_concurrency = CONFIGURED_MAX_BATCH_CONCURRENCY_FALLBACK
    if configured_max_tasks < 1:
        configured_max_tasks = CONFIGURED_MAX_BATCH_TASKS_FALLBACK
    configured_max_concurrency = min(configured_max_concurrency, SYSTEM_MAX_BATCH_CONCURRENCY)
    configured_max_tasks = min(configured_max_tasks, SYSTEM_MAX_BATCH_TASKS)
    return {'max_concurrency': configured_max_concurrency, 'system_max_concurrency': SYSTEM_MAX_BATCH_CONCURRENCY, 'max_tasks': configured_max_tasks, 'system_max_tasks': SYSTEM_MAX_BATCH_TASKS}

def build_cli_arg_list(flag: str, values: list[str] | None) -> list[str]:
    output: list[str] = []
    for value in values or []:
        output += [flag, str(value)]
    return output

def listify_task_value(value) -> list[str]:
    if value is None:
        return []
    if isinstance(value, list):
        return [str(item) for item in value if str(item or '').strip()]
    text = str(value).strip()
    return [text] if text else []

def task_label(task: dict, index: int) -> str:
    label = str(task.get('label') or task.get('title') or '').strip()
    return label if label else f'任务 {index}'

def parse_batch_payload(raw: str) -> tuple[list[dict], int | None]:
    parsed = json.loads(raw)
    concurrency = None
    tasks = parsed
    if isinstance(parsed, dict):
        tasks = parsed.get('tasks')
        concurrency = parsed.get('concurrency')
    if not isinstance(tasks, list) or not tasks:
        raise ValueError('batch_json must be a non-empty JSON array or an object with a tasks array')
    normalized: list[dict] = []
    for (index, item) in enumerate(tasks, start=1):
        if isinstance(item, str):
            normalized.append({'prompt': item})
            continue
        if isinstance(item, dict):
            normalized.append(dict(item))
            continue
        raise ValueError(f'batch task #{index} must be an object or string')
    return (normalized, concurrency)

def build_task_args(base_args: argparse.Namespace, task: dict) -> argparse.Namespace:
    prompt = task.get('prompt')
    card_input = task.get('card_input')
    task_args = argparse.Namespace(api_key=base_args.api_key, check=False, wallet=False, browse=False, search=None, modality=None, stability=None, info=None, task=task.get('task') or getattr(base_args, 'task', None), model=task.get('model') or base_args.model, scene=task.get('scene') or getattr(base_args, 'scene', None), image_menu=False, video_menu=False, prompt='' if prompt is None else str(prompt), param_help=bool(task.get('param_help', False)), prefill_card=False, card_input='' if card_input is None else str(card_input), run_with_defaults=bool(task.get('run_with_defaults', getattr(base_args, 'run_with_defaults', False))), confirm_run=bool(task.get('confirm_run', False)), image=listify_task_value(task.get('image')), audio=listify_task_value(task.get('audio')), video=listify_task_value(task.get('video')), aspect_ratio=task.get('aspect_ratio'), resolution=task.get('resolution'), steps=task.get('steps'), duration=task.get('duration'), seed=task.get('seed'), random_seed=bool(task.get('random_seed', False)), model_name=task.get('model_name'), width=task.get('width'), height=task.get('height'), param=listify_task_value(task.get('param')), output=task.get('output'), sync=bool(task.get('sync', getattr(base_args, 'sync', False))), batch_json=None, batch_prefill=False, batch_run=False, batch_concurrency=None, batch_concurrency_approved=False, batch_task_count_approved=False, batch_worker_file=None)
    return task_args

def should_batch_use_defaults(task_args: argparse.Namespace, model: str | None) -> bool:
    if task_args.run_with_defaults or card.allows_explicit_defaults(task_args):
        return True
    if model in COMPILER_RULES and (not card.has_explicit_tuning(task_args)):
        return True
    return False

def build_batch_prefill_reply(tasks: list[dict]) -> str:
    lines = ['我先把这批任务的参数确认卡分开整理好了:', '']
    for item in tasks:
        lines.append(f"### {item['label']}")
        lines.append(item['card_markdown'].strip())
        lines.append('')
    lines.append('你可以逐条改,也可以直接说“全部开跑”,我再一起提交。')
    return '\n'.join(lines).strip()

def build_batch_run_reply(tasks: list[dict]) -> str:
    lines = ['这批任务我已经按独立任务分别提交了:', '']
    for item in tasks:
        lines.append(f"- **{item['label']}**:已进入批量队列,日志在 `{item['log_path']}`")
    lines.append('')
    lines.append('我会按每个任务各自的结果继续往下接,不会把它们揉成一条。')
    return '\n'.join(lines).strip()

def redact_command_args(args: list[str]) -> list[str]:
    redacted = list(args)
    sensitive_flags = {'--api-key'}
    for (idx, value) in enumerate(redacted[:-1]):
        if value in sensitive_flags:
            redacted[idx + 1] = '***'
    return redacted

def resolve_batch_concurrency(base_args: argparse.Namespace, requested: int | None, task_count: int) -> tuple[int, int, dict]:
    policy = resolve_batch_policy()
    cli_value = getattr(base_args, 'batch_concurrency', None)
    raw_value = cli_value if cli_value is not None else requested
    if raw_value is None:
        raw_value = policy['max_concurrency']
    try:
        normalized = int(raw_value)
    except Exception as exc:
        raise ValueError('batch concurrency must be an integer') from exc
    if normalized < 1:
        raise ValueError('batch concurrency must be at least 1')
    effective = min(normalized, policy['system_max_concurrency'], max(task_count, 1))
    return (normalized, effective, policy)

def write_batch_status_file(status_path: Path, payload: dict) -> None:
    status_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + '\n', encoding='utf-8')

def build_public_batch_payload(payload: dict[str, Any]) -> dict[str, Any]:
    public_payload = dict(payload)
    public_payload.pop('system_max_concurrency', None)
    public_payload.pop('system_max_tasks', None)
    return public_payload

def enforce_batch_task_limits(base_args: argparse.Namespace, *, task_count: int, policy: dict, phase: str) -> None:
    if task_count > policy['system_max_tasks']:
        print(json.dumps({'error': 'BATCH_TASK_LIMIT_EXCEEDED', 'message': f"Batch task count {task_count} exceeds hard system max {policy['system_max_tasks']} during {phase}. Split into smaller batches.", 'phase': phase, 'task_count': task_count, 'configured_max_tasks': policy['max_tasks'], 'system_max_tasks': policy['system_max_tasks']}, ensure_ascii=False, indent=2), file=sys.stderr)
        sys.exit(2)
    if task_count > policy['max_tasks'] and (not getattr(base_args, 'batch_task_count_approved', False)):
        print(json.dumps({'error': 'BATCH_TASK_COUNT_REQUIRES_APPROVAL', 'message': f"Batch task count {task_count} exceeds configured max_tasks {policy['max_tasks']} during {phase}. Ask for explicit user approval, then rerun with --batch-task-count-approved.", 'phase': phase, 'task_count': task_count, 'configured_max_tasks': policy['max_tasks'], 'system_max_tasks': policy['system_max_tasks']}, ensure_ascii=False, indent=2), file=sys.stderr)
        sys.exit(2)

def build_batch_child_command(task: dict) -> list[str]:
    """拼一个 batch 子任务的 dispatch.py 子进程命令。
    必须用 DISPATCH_SCRIPT,不要用 Path(__file__)(batch.py 没有 main,会静默退出)。
    """
    command = [sys.executable, str(dispatch_common.DISPATCH_SCRIPT), '--model', str(task['model'])]
    if task.get('prompt'):
        command += ['--prompt', str(task['prompt'])]
    command += build_cli_arg_list('--image', task.get('image'))
    command += build_cli_arg_list('--audio', task.get('audio'))
    command += build_cli_arg_list('--video', task.get('video'))
    if task.get('aspect_ratio'):
        command += ['--aspect-ratio', str(task['aspect_ratio'])]
    if task.get('resolution'):
        command += ['--resolution', str(task['resolution'])]
    if task.get('steps') is not None:
        command += ['--steps', str(task['steps'])]
    if task.get('duration') is not None:
        command += ['--duration', str(task['duration'])]
    if task.get('seed') is not None:
        command += ['--seed', str(task['seed'])]
    if task.get('random_seed'):
        command += ['--random-seed']
    if task.get('model_name'):
        command += ['--model-name', str(task['model_name'])]
    if task.get('width') is not None:
        command += ['--width', str(task['width'])]
    if task.get('height') is not None:
        command += ['--height', str(task['height'])]
    command += build_cli_arg_list('--param', task.get('param'))
    command += ['--output', str(task['output_path'])]
    if task.get('sync'):
        command += ['--sync']
    if task.get('run_mode') == 'run_with_defaults':
        command += ['--run-with-defaults']
    else:
        command += ['--confirm-run']
    return command

def launch_logged_process(command: list[str], log_path: Path, *, env: dict[str, str] | None=None) -> tuple[subprocess.Popen, object]:
    log_handle = open(log_path, 'w', encoding='utf-8')
    proc = subprocess.Popen(command, stdout=log_handle, stderr=subprocess.STDOUT, text=True, encoding='utf-8', errors='replace', stdin=subprocess.DEVNULL, env=env)
    return (proc, log_handle)

def append_supervisor_log(log_path: Path, message: str) -> None:
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
    log_path.parent.mkdir(parents=True, exist_ok=True)
    with open(log_path, 'a', encoding='utf-8') as handle:
        handle.write(f'[{timestamp}] {message}\n')

def mark_batch_tasks_failed(payload: dict, *, reason: str, running_only: bool=False) -> None:
    for task in payload.get('tasks') or []:
        if not isinstance(task, dict):
            continue
        if running_only and task.get('status') != 'running':
            continue
        if not running_only and task.get('status') not in {'queued', 'running'}:
            continue
        task['status'] = 'failed'
        task.setdefault('failure_reason', reason)

def handle_batch_worker(worker_file: str) -> None:
    worker_path = Path(worker_file)
    payload = json.loads(worker_path.read_text(encoding='utf-8'))
    tasks = payload.get('tasks') or []
    concurrency = int(payload.get('effective_concurrency') or CONFIGURED_MAX_BATCH_CONCURRENCY_FALLBACK)
    status_path = Path(payload['status_path'])
    supervisor_log = Path(payload.get('supervisor_log') or status_path.with_name('batch-supervisor.log'))
    payload['scheduler_status'] = 'running'
    payload['started_at'] = int(time.time())
    write_batch_status_file(status_path, payload)
    append_supervisor_log(supervisor_log, f'worker started with {len(tasks)} task(s), concurrency={concurrency}')
    active: list[dict] = []
    next_index = 0
    try:
        while next_index < len(tasks) or active:
            while next_index < len(tasks) and len(active) < concurrency:
                task = tasks[next_index]
                child_env = dict(os.environ)
                child_env[RUNTIME_STATE_DISABLED_ENV] = '1'
                (proc, log_handle) = launch_logged_process(build_batch_child_command(task), Path(task['log_path']), env=child_env)
                task['pid'] = proc.pid
                task['status'] = 'running'
                append_supervisor_log(supervisor_log, f"launched task#{task.get('index')} pid={proc.pid} label={task.get('label')}")
                active.append({'proc': proc, 'log_handle': log_handle, 'task': task})
                next_index += 1
                write_batch_status_file(status_path, payload)
            if not active:
                break
            time.sleep(1)
            still_active: list[dict] = []
            for entry in active:
                proc = entry['proc']
                task = entry['task']
                ret = proc.poll()
                if ret is None:
                    still_active.append(entry)
                    continue
                entry['log_handle'].close()
                task['exit_code'] = ret
                task['status'] = 'completed' if ret == 0 else 'failed'
                append_supervisor_log(supervisor_log, f"task#{task.get('index')} pid={task.get('pid')} finished status={task['status']} exit_code={ret}")
                write_batch_status_file(status_path, payload)
            active = still_active
    except Exception:
        mark_batch_tasks_failed(payload, reason='worker_crashed')
        payload['scheduler_status'] = 'failed'
        payload['worker_error'] = traceback.format_exc()
        write_batch_status_file(status_path, payload)
        append_supervisor_log(supervisor_log, payload['worker_error'].rstrip())
        raise
    finally:
        for entry in active:
            try:
                entry['log_handle'].close()
            except Exception:
                pass
    payload['scheduler_status'] = 'completed'
    payload['finished_at'] = int(time.time())
    write_batch_status_file(status_path, payload)
    append_supervisor_log(supervisor_log, 'worker finished')

def handle_batch_prefill(base_args: argparse.Namespace) -> None:
    """批量预填卡。每个任务都通过路 B 渲染(远端 contract 驱动)。

    per-batch contract 缓存(_info_cache):同一 model id 在一批里只 fetch 一次。
    比如 5 个任务都用 ChatGPT Image2,只打一次 BizyAir API 拿 contract。
    """
    (tasks, _) = parse_batch_payload(base_args.batch_json)
    policy = resolve_batch_policy()
    enforce_batch_task_limits(base_args, task_count=len(tasks), policy=policy, phase='batch_prefill')
    dispatch_common.ensure_api_key_ready(base_args.api_key)
    _info_cache: dict[str, dict] = {}
    payload_tasks = []
    for (index, task) in enumerate(tasks, start=1):
        task_args = build_task_args(base_args, task)
        model = task_args.model
        if task_args.scene:
            model = SCENE_NUMBER_MAP.get(task_args.scene, task_args.scene)
        target = dispatch_common.resolve_model(task_args.task, model)
        reject_search_flow_target(target, task_index=index, phase='batch_prefill')
        card.apply_compiled_selection(task_args, target)
        if target in ROUTE_TABLE:
            # 固定模型:用 ModelZoo detail 渲染 prefill card
            import modelzoo
            import api as _api
            endpoint = ROUTE_TABLE[target]['endpoint']
            cache_key = endpoint
            if cache_key not in _info_cache:
                key = _api.require_api_key(base_args.api_key)
                detail_result = modelzoo.get_detail(key, endpoint)
                detail_data = (detail_result.get('data') or {}).get('data') or detail_result.get('data') or {}
                _info_cache[cache_key] = detail_data
            detail_data = _info_cache[cache_key]
            input_params = detail_data.get('input_params') or []
            param_summary = ', '.join(f"{p.get('field_label', p.get('field_name'))}" for p in input_params if p.get('field_name'))
            card_markdown = f"**{dispatch_common.display_model_name(target)}** (ModelZoo: {endpoint})\n参数:{param_summary}"
        else:
            # 非固定模型:旧 webapp 路径
            app_id = target
            if app_id not in _info_cache:
                _info_cache[app_id] = remote.build_remote_info_output(app_id, api_key_arg=base_args.api_key, include_workflow=True)
            info_out = _info_cache[app_id]
            summary = info_out.get('summary') or {}
            card_markdown = remote.build_remote_prefilled_card(summary, task_args, resolved_object_id=app_id)
        payload_tasks.append({'index': index, 'label': task_label(task, index), 'model': target, 'display_name': dispatch_common.display_model_name(target), 'card_markdown': card_markdown})
    print(json.dumps({'mode': 'batch_prefill', 'task_count': len(payload_tasks), 'tasks': payload_tasks, 'reply_markdown': build_batch_prefill_reply(payload_tasks)}, ensure_ascii=False, indent=2))

def handle_batch_run(base_args: argparse.Namespace) -> None:
    (tasks, requested_concurrency) = parse_batch_payload(base_args.batch_json)
    (requested_concurrency, effective_concurrency, policy) = resolve_batch_concurrency(base_args, requested_concurrency, len(tasks))
    enforce_batch_task_limits(base_args, task_count=len(tasks), policy=policy, phase='batch_run')
    if requested_concurrency > policy['system_max_concurrency']:
        print(json.dumps({'error': 'BATCH_CONCURRENCY_LIMIT_EXCEEDED', 'message': f"Requested concurrency {requested_concurrency} exceeds hard system max {policy['system_max_concurrency']}. Lower the concurrency and try again.", 'requested_concurrency': requested_concurrency, 'configured_max_concurrency': policy['max_concurrency'], 'system_max_concurrency': policy['system_max_concurrency']}, ensure_ascii=False, indent=2), file=sys.stderr)
        sys.exit(2)
    if requested_concurrency > policy['max_concurrency'] and (not getattr(base_args, 'batch_concurrency_approved', False)):
        print(json.dumps({'error': 'BATCH_CONCURRENCY_REQUIRES_APPROVAL', 'message': f"Requested concurrency {requested_concurrency} exceeds configured max_concurrency {policy['max_concurrency']}. Ask for explicit user approval, then rerun with --batch-concurrency-approved.", 'requested_concurrency': requested_concurrency, 'configured_max_concurrency': policy['max_concurrency'], 'system_max_concurrency': policy['system_max_concurrency']}, ensure_ascii=False, indent=2), file=sys.stderr)
        sys.exit(2)
    batch_dir = resolve_batch_runs_dir() / time.strftime('%Y%m%d-%H%M%S')
    batch_dir.mkdir(parents=True, exist_ok=True)
    status_path = batch_dir / 'batch-status.json'
    worker_file = batch_dir / 'batch-payload.json'
    supervisor_log = batch_dir / 'batch-supervisor.log'
    payload_tasks = []
    for (index, task) in enumerate(tasks, start=1):
        task_args = build_task_args(base_args, task)
        model = task_args.model
        if task_args.scene:
            model = SCENE_NUMBER_MAP.get(task_args.scene, task_args.scene)
        target = dispatch_common.resolve_model(task_args.task, model)
        reject_search_flow_target(target, task_index=index, phase='batch_run')
        card.apply_compiled_selection(task_args, target)
        output_path = str((batch_dir / f'task-{index:02d}-output').resolve())
        log_path = batch_dir / f'task-{index:02d}.log'
        if task_args.sync:
            child_sync = True
        else:
            child_sync = False
        if should_batch_use_defaults(task_args, target):
            run_mode = 'run_with_defaults'
        else:
            run_mode = 'confirm_run'
        payload_tasks.append({'index': index, 'label': task_label(task, index), 'model': target, 'display_name': dispatch_common.display_model_name(target), 'status': 'queued', 'log_path': str(log_path.resolve()), 'output_path': str(Path(task.get('output') or output_path).resolve()), 'prompt': task_args.prompt, 'image': list(task_args.image or []), 'audio': list(task_args.audio or []), 'video': list(task_args.video or []), 'aspect_ratio': task_args.aspect_ratio, 'resolution': task_args.resolution, 'steps': task_args.steps, 'duration': task_args.duration, 'seed': task_args.seed, 'random_seed': bool(task_args.random_seed), 'model_name': task_args.model_name, 'width': task_args.width, 'height': task_args.height, 'param': list(task_args.param or []), 'sync': child_sync, 'run_mode': run_mode, 'command_preview': redact_command_args(build_batch_child_command({'model': target, 'prompt': task_args.prompt, 'image': list(task_args.image or []), 'audio': list(task_args.audio or []), 'video': list(task_args.video or []), 'aspect_ratio': task_args.aspect_ratio, 'resolution': task_args.resolution, 'steps': task_args.steps, 'duration': task_args.duration, 'seed': task_args.seed, 'random_seed': bool(task_args.random_seed), 'model_name': task_args.model_name, 'width': task_args.width, 'height': task_args.height, 'param': list(task_args.param or []), 'output_path': str(Path(task.get('output') or output_path).resolve()), 'sync': child_sync, 'run_mode': run_mode}))})
    payload = {'mode': 'batch_run', 'version': 2, 'task_count': len(payload_tasks), 'requested_concurrency': requested_concurrency, 'effective_concurrency': effective_concurrency, 'configured_max_concurrency': policy['max_concurrency'], 'system_max_concurrency': policy['system_max_concurrency'], 'configured_max_tasks': policy['max_tasks'], 'system_max_tasks': policy['system_max_tasks'], 'batch_dir': str(batch_dir.resolve()), 'status_path': str(status_path.resolve()), 'supervisor_log': str(supervisor_log.resolve()), 'scheduler_status': 'queued', 'tasks': payload_tasks}
    write_batch_status_file(status_path, payload)
    worker_file.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + '\n', encoding='utf-8')
    worker_env = dict(os.environ)
    if base_args.api_key:
        worker_env['BIZYAIR_API_KEY'] = base_args.api_key
    worker_env[RUNTIME_STATE_DISABLED_ENV] = '1'
    log_handle = open(supervisor_log, 'a', encoding='utf-8')
    try:
        worker_proc = subprocess.Popen([sys.executable, str(dispatch_common.DISPATCH_SCRIPT), '--batch-worker-file', str(worker_file.resolve())], stdout=log_handle, stderr=subprocess.STDOUT, stdin=subprocess.DEVNULL, text=True, encoding='utf-8', errors='replace', env=worker_env, start_new_session=True, close_fds=True)
    except Exception as exc:
        log_handle.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] failed to launch batch worker: {exc}\n")
        log_handle.flush()
        payload['scheduler_status'] = 'failed'
        mark_batch_tasks_failed(payload, reason='worker_launch_failed')
        write_batch_status_file(status_path, payload)
        raise
    finally:
        log_handle.close()
    payload['scheduler_pid'] = worker_proc.pid
    payload['scheduler_status'] = 'running'
    payload['reply_markdown'] = f"这批任务我已经按独立任务排进批量执行队列了。\n\n- 当前配置任务上限:{policy['max_tasks']}\n- 这次任务数:{len(payload_tasks)}\n- 当前配置并发上限:{policy['max_concurrency']}\n- 你这次请求并发:{requested_concurrency}\n- 实际并发上限:{effective_concurrency}\n\n" + build_batch_run_reply(payload_tasks)
    write_batch_status_file(status_path, payload)
    print(json.dumps(build_public_batch_payload(payload), ensure_ascii=False, indent=2))