文件内容
scripts/contract.py
"""contract.py — BizyAir 远端对象的输入字段契约 / 节点判定。
存在的原因:BizyAir 上不同 app/workflow 的字段名极度不一致(prompt / text /
CLIPTextEncode.text / 27:text / BizyAir_Seedream4.prompt 等),需要一层规则
把字段类型分清楚,用于 info 输出和预填卡的展示标注。
注意:prompt 识别相关函数(is_remote_prompt_input_node / get_supported_remote_prompt_binding_keys 等)
现在仅用于 info 输出和预填卡的展示标注,不参与实际执行路径的字段映射。
执行时的 prompt 写入由 api.build_remote_run_payload 中的 --prompt / --param 逻辑处理。
主要 API:
- resolve_remote_input_contract(execution_target, webapp_detail, workflow): 主入口,
把多个数据源的 input_nodes 合并成一份 resolved_contract dict
- is_negative_prompt_key: 判定是否为负向提示词字段
- is_text_input_field: 判定 field_type 是不是文本输入类型
- classify_remote_media_slot: 判定字段是否为媒体上传槽
- remote_contract_supports(contract, logical_name): 卡片渲染时检查"该字段在不在 contract 里"
"""
from __future__ import annotations
import argparse, json, re
from typing import Any
import api
import common
from common import (
EXPLICIT_PROMPT_LABELS, KNOWN_PROMPT_FIELD_NAMES,
KNOWN_PROMPT_NODE_MARKERS, PROMPT_EXCLUDE_EN, PROMPT_EXCLUDE_ZH,
REMOTE_CONTRACT_SOURCE_PRIORITY, REMOTE_EXPOSED_CONTRACT_SOURCES,
REMOTE_GENERIC_PROMPT_ALIASES, TEXT_INPUT_FIELD_TYPES,
add_unique_key,
)
def is_negative_prompt_key(field_name: str | None, field_label: str | None=None) -> bool:
raw = ' '.join([str(field_name or '').strip(), str(field_label or '').strip()])
lowered = raw.lower()
if 'negative' in lowered and 'prompt' in lowered:
return True
# 中文负向提示词识别
chinese_negatives = ('反向提示词', '负向提示词', '负面提示词', '反向', '负向')
return any(marker in raw for marker in chinese_negatives)
def normalize_prompt_label(value: Any) -> str:
text = str(value or '').strip()
text = re.sub('[\\s::_\\-()\\[\\]{}]+', '', text)
return text.lower()
def has_prompt_excluded_marker(*parts: Any) -> bool:
raw_parts = [str(part or '').strip() for part in parts if str(part or '').strip()]
if not raw_parts:
return False
joined = ' '.join(raw_parts)
lowered = joined.lower()
if any((marker in lowered for marker in PROMPT_EXCLUDE_EN)):
return True
return any((marker in joined for marker in PROMPT_EXCLUDE_ZH))
def is_explicit_prompt_label(label: Any) -> bool:
normalized = normalize_prompt_label(label)
return normalized in EXPLICIT_PROMPT_LABELS
def is_known_prompt_node_type(node_type: Any) -> bool:
lowered = str(node_type or '').strip().lower()
if not lowered or has_prompt_excluded_marker(lowered) or is_negative_prompt_key(lowered):
return False
return any((marker in lowered for marker in KNOWN_PROMPT_NODE_MARKERS)) or lowered == 'prompt' or lowered.endswith('.prompt')
def is_known_prompt_binding_key(value: Any) -> bool:
text = str(value or '').strip()
lowered = text.lower()
if not lowered or has_prompt_excluded_marker(text) or is_negative_prompt_key(lowered):
return False
if lowered in KNOWN_PROMPT_FIELD_NAMES:
return True
if any((token in lowered for token in ['.prompt', ':prompt', '.user_prompt', ':user_prompt', '.positive_prompt', ':positive_prompt'])):
return True
if 'cliptextencode.text' in lowered or 'cliptextencode:text' in lowered:
return True
if 'primitivestringmultiline.value' in lowered or 'primitivestringmultiline:value' in lowered:
return True
if 'primitivestring.value' in lowered or 'primitivestring:value' in lowered:
return True
if lowered in {'text', 'value'}:
return False
if (lowered.endswith('.text') or lowered.endswith(':text')) and is_known_prompt_node_type(lowered):
return True
if (lowered.endswith('.value') or lowered.endswith(':value')) and any((marker in lowered for marker in {'primitivestringmultiline', 'primitivestring'})):
return True
return False
def split_remote_binding_key(value: Any) -> tuple[str, str]:
text = str(value or '').strip()
if not text:
return ('', '')
if ':' not in text:
return ('', text)
(node_id, suffix) = text.split(':', 1)
return (str(node_id).strip(), str(suffix).strip())
def collect_remote_binding_aliases(*values: Any) -> set[str]:
aliases: set[str] = set()
for raw in values:
text = str(raw or '').strip()
if not text:
continue
variants = [text]
if ':' in text:
(_, suffix) = split_remote_binding_key(text)
if suffix:
variants.append(suffix)
for variant in variants:
normalized = normalize_prompt_label(variant)
if normalized:
aliases.add(normalized)
dot_parts = [part for part in re.split('[.:]', variant) if str(part).strip()]
if dot_parts:
aliases.add(normalize_prompt_label(dot_parts[-1]))
aliases.add(normalize_prompt_label('.'.join(dot_parts[-2:])))
for part in dot_parts:
aliases.add(normalize_prompt_label(part))
return {alias for alias in aliases if alias}
REMOTE_LOGICAL_FIELD_ALIASES = {
'seed': {'seed', 'randomseed', 'fixedseed'},
'steps': {'steps', 'step', 'samplingsteps', 'samplingstep', 'numsteps'},
'duration': {'duration', 'seconds', 'second', 'videoduration'},
'aspect_ratio': {'aspectratio', 'ratio'},
'resolution': {'resolution', 'maxresolution', 'size'},
'width': {'width', 'imagewidth', 'outputwidth', 'targetwidth'},
'height': {'height', 'imageheight', 'outputheight', 'targetheight'},
'model_name': {'model', 'modelname', 'checkpoint'},
}
REMOTE_LOGICAL_FIELD_TEXT_MARKERS = {
'seed': {'随机'},
'steps': {'步数', '采样步数'},
'duration': {'时长'},
'aspect_ratio': {'比例', '画幅'},
'resolution': {'分辨率', '清晰度', '规格'},
'width': {'宽度'},
'height': {'高度'},
'model_name': {'模型档位', '档位'},
}
def is_remote_logical_field(logical_name: str, *values: Any) -> bool:
logical = str(logical_name or '').strip()
if not logical:
return False
raw_values = [str(value or '').strip() for value in values if str(value or '').strip()]
aliases = collect_remote_binding_aliases(*raw_values)
if aliases & (REMOTE_LOGICAL_FIELD_ALIASES.get(logical) or set()):
return True
markers = REMOTE_LOGICAL_FIELD_TEXT_MARKERS.get(logical) or set()
return any((marker in value for marker in markers for value in raw_values))
def is_remote_seed_field(*values: Any) -> bool:
return is_remote_logical_field('seed', *values)
def is_text_input_field(field_type: str | None) -> bool:
return str(field_type or '').strip().lower() in TEXT_INPUT_FIELD_TYPES
def is_remote_prompt_input_node(node: dict[str, Any]) -> bool:
field_type = str(node.get('field_type') or '').strip().lower()
field_name = str(node.get('field_name') or '').strip()
field_label = str(node.get('field_label') or '').strip()
variable_name = str(node.get('variable_name') or '').strip()
node_type = str(node.get('node_type') or '').strip()
if not is_text_input_field(field_type):
return False
if has_prompt_excluded_marker(field_name, field_label, variable_name, node_type):
return False
if is_negative_prompt_key(field_name, field_label):
return False
if is_explicit_prompt_label(field_label):
return True
if is_known_prompt_binding_key(field_name) or is_known_prompt_binding_key(variable_name):
return True
if normalize_prompt_label(field_name) == 'text' and is_known_prompt_node_type(node_type):
return True
return False
def infer_remote_contract_logical_name(*, field_name: str='', field_label: str='', variable_name: str='', node_type: str='', media_type: str | None=None, prompt_like: bool=False, negative_prompt_like: bool=False) -> str:
if negative_prompt_like:
return 'negative_prompt'
if prompt_like:
return 'prompt'
if media_type:
return media_type
(_, variable_suffix) = split_remote_binding_key(variable_name)
strict_values = (field_name, field_label, variable_suffix)
joined = ' '.join([field_name, field_label, variable_suffix, node_type]).lower()
if 'batch_size' in joined or 'batch size' in joined:
return 'batch_size'
if is_remote_logical_field('aspect_ratio', *strict_values):
return 'aspect_ratio'
if is_remote_logical_field('resolution', *strict_values):
return 'resolution'
if is_remote_logical_field('width', *strict_values):
return 'width'
if is_remote_logical_field('height', *strict_values):
return 'height'
if is_remote_logical_field('duration', *strict_values):
return 'duration'
if is_remote_logical_field('seed', *strict_values):
return 'seed'
if is_remote_logical_field('steps', *strict_values):
return 'steps'
if is_remote_logical_field('model_name', *strict_values):
return 'model_name'
if 'cfg_scale' in joined or ('cfg' in joined and 'scale' in joined):
return 'cfg_scale'
if 'strength' in joined or 'denoise' in joined:
return 'strength'
if 'sound' in joined or 'audio' in joined:
return 'sound'
raw = str(field_name or variable_name or field_label or node_type or '').strip()
if not raw:
return 'unknown'
normalized = re.sub('[^A-Za-z0-9_]+', '_', raw).strip('_').lower()
return normalized or 'unknown'
def normalize_remote_field_options(field_options: Any) -> list[dict[str, Any]]:
normalized: list[dict[str, Any]] = []
metadata_keys = {'max', 'min', 'precision', 'step', 'step2', 'hideonzoom', 'minnodesize'}
if isinstance(field_options, str):
text = field_options.strip()
if text.startswith('{') or text.startswith('['):
try:
field_options = json.loads(text)
except Exception:
return normalized
else:
return normalized
if isinstance(field_options, dict):
if set((str(key).strip().lower() for key in field_options.keys())).issubset(metadata_keys):
return normalized
iterable = field_options.items()
elif isinstance(field_options, list):
iterable = enumerate(field_options)
else:
return normalized
for (raw_key, raw_value) in iterable:
if isinstance(raw_value, list):
for item in raw_value:
normalized.append({'value': item, 'label': str(item)})
continue
if isinstance(raw_value, dict):
value = raw_value.get('value', raw_value.get('id', raw_key))
label = raw_value.get('label', raw_value.get('name', value))
else:
value = raw_value
label = raw_value
normalized.append({'value': value, 'label': str(label)})
return normalized
def build_remote_contract_field_from_node(node: dict[str, Any], *, source: str, priority: int) -> dict[str, Any]:
field_name = str(node.get('field_name') or '').strip()
field_label = str(node.get('field_label') or '').strip()
variable_name = str(node.get('variable_name') or '').strip()
node_type = str(node.get('node_type') or '').strip()
field_type = str(node.get('field_type') or '').strip().lower()
media_type = classify_remote_media_slot(node)
prompt_like = is_remote_prompt_input_node(node)
negative_prompt_like = is_negative_prompt_key(field_name, field_label)
if not prompt_like and (not negative_prompt_like) and is_text_input_field(field_type) and (not has_prompt_excluded_marker(field_name, field_label, variable_name, node_type)):
normalized_hint = normalize_prompt_label(field_name or field_label)
if normalized_hint in {'text', 'prompt'} or is_known_prompt_binding_key(variable_name):
prompt_like = True
logical_name = infer_remote_contract_logical_name(field_name=field_name, field_label=field_label, variable_name=variable_name, node_type=node_type, media_type=media_type, prompt_like=prompt_like, negative_prompt_like=negative_prompt_like)
display_name = humanize_remote_prefill_label(node)
return {'source': source, 'priority': priority, 'support_level': 'supported', 'logical_name': logical_name, 'display_name': display_name, 'field_name': field_name, 'field_label': field_label, 'variable_name': variable_name, 'node_type': node_type, 'field_type': field_type, 'field_value': node.get('field_value'), 'field_options': normalize_remote_field_options(node.get('field_options')), 'user_input': field_type != 'hidden' or media_type is not None or prompt_like or negative_prompt_like, 'system_inject': negative_prompt_like, 'media_type': media_type, 'prompt_like': prompt_like, 'negative_prompt_like': negative_prompt_like, 'execution_binding': {'write_key': variable_name or None}, 'raw_node': node}
def build_remote_contract_field_from_hint(logical_name: str, binding_key: str, *, source: str, priority: int, support_level: str='hint_only', user_input: bool=True) -> dict[str, Any]:
return {'source': source, 'priority': priority, 'support_level': support_level, 'logical_name': logical_name, 'display_name': logical_name, 'field_name': logical_name, 'field_label': logical_name, 'variable_name': str(binding_key), 'node_type': 'workflow_hint', 'field_type': 'hint', 'field_value': None, 'field_options': [], 'user_input': user_input, 'system_inject': logical_name == 'negative_prompt', 'media_type': logical_name if logical_name in {'image', 'audio', 'video'} else None, 'prompt_like': logical_name == 'prompt', 'negative_prompt_like': logical_name == 'negative_prompt', 'execution_binding': {'write_key': str(binding_key)}, 'raw_node': None}
def empty_remote_contract_bindings() -> dict[str, Any]:
return {'prompt_keys': [], 'supported_prompt_keys': [], 'hint_prompt_keys': [], 'negative_prompt_keys': [], 'logical': {}, 'media_slots': {'image': [], 'audio': [], 'video': []}}
def collect_remote_contract_field_aliases(field: dict[str, Any]) -> set[str]:
field_name = str(field.get('field_name') or '').strip()
field_label = str(field.get('field_label') or '').strip()
variable_name = str(field.get('variable_name') or '').strip()
node_type = str(field.get('node_type') or '').strip()
aliases = collect_remote_binding_aliases(field_name, field_label, variable_name, node_type)
(_, variable_suffix) = split_remote_binding_key(variable_name)
if variable_suffix:
aliases.update(collect_remote_binding_aliases(variable_suffix))
if node_type and field_name:
aliases.update(collect_remote_binding_aliases(f'{node_type}.{field_name}'))
if node_type and field_label:
aliases.update(collect_remote_binding_aliases(f'{node_type}.{field_label}'))
return aliases
def choose_safe_prompt_upgrade_field(contract: dict[str, Any], hint_key: str) -> dict[str, Any] | None:
(hint_node_id, hint_suffix) = split_remote_binding_key(hint_key)
if not hint_node_id or not hint_suffix:
return None
hint_aliases = collect_remote_binding_aliases(hint_key, hint_suffix)
exact_candidates: list[dict[str, Any]] = []
structural_candidates: list[dict[str, Any]] = []
for field in contract.get('fields') or []:
if not is_real_remote_text_input_contract_field(field):
continue
variable_name = str(field.get('variable_name') or '').strip()
(field_node_id, _) = split_remote_binding_key(variable_name)
if field_node_id != hint_node_id:
continue
field_aliases = collect_remote_contract_field_aliases(field)
if hint_aliases & field_aliases:
exact_candidates.append(field)
continue
if is_structurally_prompt_like_remote_text_field(field):
structural_candidates.append(field)
deduped_exact = {str(item.get('variable_name') or '').strip(): item for item in exact_candidates}
if len(deduped_exact) == 1:
return next(iter(deduped_exact.values()))
if len(deduped_exact) > 1:
return None
deduped_structural = {str(item.get('variable_name') or '').strip(): item for item in structural_candidates}
if len(deduped_structural) == 1:
return next(iter(deduped_structural.values()))
return None
def promote_supported_prompt_fields_from_hints(contract: dict[str, Any], hint_keys: list[str] | None) -> list[dict[str, Any]]:
promotions: list[dict[str, Any]] = []
seen_promotions: set[tuple[str, str]] = set()
for hint_key in hint_keys or []:
hint_text = str(hint_key or '').strip()
if not hint_text:
continue
candidate = choose_safe_prompt_upgrade_field(contract, hint_text)
if not candidate:
continue
write_key = str((candidate.get('execution_binding') or {}).get('write_key') or candidate.get('variable_name') or '').strip()
if not write_key:
continue
already_supported_prompt = bool(candidate.get('prompt_like')) and str(candidate.get('logical_name') or '').strip() == 'prompt' and (str(candidate.get('support_level') or 'supported').strip().lower() == 'supported')
if already_supported_prompt:
continue
dedupe = (hint_text, write_key)
if dedupe in seen_promotions:
continue
seen_promotions.add(dedupe)
candidate['prompt_like'] = True
candidate['logical_name'] = 'prompt'
candidate['support_level'] = 'supported'
candidate['support_reason'] = 'workflow_hint_promoted'
hint_binding_keys = candidate.setdefault('hint_binding_keys', [])
add_unique_key(hint_binding_keys, hint_text)
promotions.append({'hint_key': hint_text, 'promoted_to': write_key, 'source': str(candidate.get('source') or ''), 'reason': 'same_node_exposed_text_input'})
return promotions
def rebuild_remote_contract_bindings(contract: dict[str, Any]) -> None:
rebuilt = empty_remote_contract_bindings()
contract['bindings'] = rebuilt
for field in contract.get('fields') or []:
binding_key = str((field.get('execution_binding') or {}).get('write_key') or '').strip()
logical_name = str(field.get('logical_name') or '').strip()
media_type = field.get('media_type')
support_level = str(field.get('support_level') or 'supported').strip().lower()
if binding_key and field.get('prompt_like'):
if support_level == 'supported':
add_unique_key(rebuilt['prompt_keys'], binding_key)
add_unique_key(rebuilt['supported_prompt_keys'], binding_key)
else:
add_unique_key(rebuilt['hint_prompt_keys'], binding_key)
if binding_key and field.get('negative_prompt_like'):
add_unique_key(rebuilt['negative_prompt_keys'], binding_key)
if binding_key:
add_to_logical = bool(logical_name)
if field.get('prompt_like') and logical_name == 'prompt' and (support_level != 'supported'):
add_to_logical = False
if field.get('negative_prompt_like') and logical_name == 'negative_prompt' and (support_level != 'supported'):
add_to_logical = False
if add_to_logical:
rebuilt['logical'].setdefault(logical_name, [])
add_unique_key(rebuilt['logical'][logical_name], binding_key)
if media_type in {'image', 'audio', 'video'}:
current_slots = rebuilt['media_slots'][str(media_type)]
node_prefix = binding_key.split(':', 1)[0]
if not any((str(existing).split(':', 1)[0] == node_prefix for existing in current_slots)):
add_unique_key(current_slots, binding_key)
def add_remote_contract_field(contract: dict[str, Any], field: dict[str, Any]) -> None:
fields = contract.setdefault('fields', [])
bindings = contract.setdefault('bindings', empty_remote_contract_bindings())
seen_keys = contract.setdefault('_seen_binding_keys', set())
binding_key = str((field.get('execution_binding') or {}).get('write_key') or '').strip()
source = str(field.get('source') or '')
dedupe_key = binding_key or f"{source}:{field.get('logical_name')}:{field.get('field_name')}:{field.get('field_label')}"
if dedupe_key in seen_keys:
return
seen_keys.add(dedupe_key)
fields.append(field)
logical_name = str(field.get('logical_name') or '').strip()
media_type = field.get('media_type')
support_level = str(field.get('support_level') or 'supported').strip().lower()
if binding_key and media_type in {'image', 'audio', 'video'}:
current_slots = bindings['media_slots'][str(media_type)]
node_prefix = binding_key.split(':', 1)[0]
if any((str(existing).split(':', 1)[0] == node_prefix for existing in current_slots)):
return
if binding_key:
if field.get('prompt_like'):
if support_level == 'supported':
add_unique_key(bindings['prompt_keys'], binding_key)
add_unique_key(bindings['supported_prompt_keys'], binding_key)
else:
add_unique_key(bindings['hint_prompt_keys'], binding_key)
if field.get('negative_prompt_like'):
add_unique_key(bindings['negative_prompt_keys'], binding_key)
add_to_logical = bool(logical_name)
if field.get('prompt_like') and logical_name == 'prompt' and (support_level != 'supported'):
add_to_logical = False
if field.get('negative_prompt_like') and logical_name == 'negative_prompt' and (support_level != 'supported'):
add_to_logical = False
if add_to_logical:
bindings['logical'].setdefault(logical_name, [])
add_unique_key(bindings['logical'][logical_name], binding_key)
if media_type in {'image', 'audio', 'video'}:
add_unique_key(bindings['media_slots'][str(media_type)], binding_key)
def get_supported_remote_prompt_binding_keys(contract: dict[str, Any] | None) -> list[str]:
bindings = (contract or {}).get('bindings') or {}
supported = bindings.get('supported_prompt_keys')
if supported is not None:
return list(supported or [])
return list(bindings.get('prompt_keys') or [])
def build_remote_required_inputs_from_contract(contract: dict[str, Any]) -> list[dict[str, Any]]:
required_inputs: list[dict[str, Any]] = []
bindings = contract.get('bindings') or {}
prompt_keys = get_supported_remote_prompt_binding_keys(contract)
if prompt_keys:
required_inputs.append({'type': 'prompt', 'keys': list(prompt_keys), 'required': True})
media_slots = bindings.get('media_slots') or {}
for media_type in ['image', 'audio', 'video']:
slots = media_slots.get(media_type) or []
if slots:
required_inputs.append({'type': media_type, 'count': len(slots), 'required': True})
return required_inputs
def get_remote_contract_binding_keys(contract_data: dict[str, Any], logical_name: str) -> list[str]:
if str(logical_name) == 'prompt':
return get_supported_remote_prompt_binding_keys(contract_data)
logical = ((contract_data or {}).get('bindings') or {}).get('logical') or {}
return list(logical.get(str(logical_name), []) or [])
def remote_contract_supports(contract: dict[str, Any], logical_name: str) -> bool:
return bool(get_remote_contract_binding_keys(contract, logical_name))
def contract_field_by_binding_key(contract: dict[str, Any]) -> dict[str, dict[str, Any]]:
mapping: dict[str, dict[str, Any]] = {}
for field in contract.get('fields') or []:
binding = str((field.get('execution_binding') or {}).get('write_key') or '').strip()
if not binding:
continue
mapping.setdefault(binding, field)
return mapping
def resolve_remote_input_contract(*, execution_target: dict[str, Any] | None=None, webapp_detail: dict[str, Any] | None=None, workflow: dict[str, Any] | None=None) -> dict[str, Any]:
execution = execution_target or {}
runtime_webapp_detail = execution.get('webapp_detail') or {}
runtime_webapp_detail_data = api.extract_result_data(runtime_webapp_detail)
webapp_detail_data = api.extract_result_data(webapp_detail)
workflow_data = api.extract_result_data(workflow)
input_hints = collect_required_input_hints(workflow_data)
contract: dict[str, Any] = {'source_priority': list(REMOTE_CONTRACT_SOURCE_PRIORITY), 'fields': [], 'bindings': empty_remote_contract_bindings(), 'required_inputs': [], 'candidate_fields': [], 'hint_promotions': [], 'source_status': {'execution_target.webapp_detail.input_nodes': {'available': bool(runtime_webapp_detail_data.get('input_nodes')), 'count': len(runtime_webapp_detail_data.get('input_nodes', []) or [])}, 'webapp_detail.input_nodes': {'available': bool(webapp_detail_data.get('input_nodes')), 'count': len(webapp_detail_data.get('input_nodes', []) or [])}, 'workflow_hints': {'available': bool(input_hints.get('prompt_keys') or input_hints.get('candidate_fields')), 'count': len(input_hints.get('prompt_keys', []) or []) + len(input_hints.get('candidate_fields', []) or [])}}}
for (priority, (source_name, nodes)) in enumerate([('execution_target.webapp_detail.input_nodes', runtime_webapp_detail_data.get('input_nodes', []) or []), ('webapp_detail.input_nodes', webapp_detail_data.get('input_nodes', []) or [])], start=1):
for node in nodes:
add_remote_contract_field(contract, build_remote_contract_field_from_node(node, source=source_name, priority=priority))
workflow_priority = len(REMOTE_CONTRACT_SOURCE_PRIORITY)
contract['hint_promotions'] = promote_supported_prompt_fields_from_hints(contract, list(input_hints.get('prompt_keys') or []))
for key in input_hints.get('prompt_keys', []) or []:
add_remote_contract_field(contract, build_remote_contract_field_from_hint('prompt', str(key), source='workflow_hints', priority=workflow_priority))
for (media_type, keys) in [('image', list(input_hints.get('image_keys') or [])), ('audio', list(input_hints.get('audio_keys') or [])), ('video', list(input_hints.get('video_keys') or []))]:
for (index, binding_key) in enumerate(keys):
add_remote_contract_field(contract, build_remote_contract_field_from_hint(media_type, str(binding_key), source='workflow_hints', priority=workflow_priority))
rebuild_remote_contract_bindings(contract)
contract['source_status']['workflow_hint_safe_promotions'] = {'available': bool(contract.get('hint_promotions')), 'count': len(contract.get('hint_promotions') or [])}
contract['required_inputs'] = build_remote_required_inputs_from_contract(contract)
contract['candidate_fields'] = list(input_hints.get('candidate_fields') or [])
contract['binding_by_logical_name'] = {key: list(value) for (key, value) in (contract.get('bindings', {}).get('logical') or {}).items()}
contract.pop('_seen_binding_keys', None)
return contract
def find_prompt_keys_in_workflow(workflow_data: dict[str, Any]) -> list[str]:
keys: list[str] = []
if not isinstance(workflow_data, dict):
return keys
graph = workflow_data.get('graph') if isinstance(workflow_data.get('graph'), dict) else workflow_data
nodes = graph.get('nodes', []) if isinstance(graph, dict) else []
for node in nodes:
node_id = node.get('id')
node_type_name = str(node.get('type') or '')
node_type = str(node.get('type') or '').lower()
inputs = node.get('inputs') or []
widget_values = node.get('widgets_values') or []
if not isinstance(node_id, (int, str)):
continue
for (idx, inp) in enumerate(inputs):
input_name = str((inp or {}).get('name') or '').strip()
input_label = str((inp or {}).get('label') or '').strip()
name = str(input_name or input_label).lower()
if has_prompt_excluded_marker(node_type_name, input_name, input_label) or is_negative_prompt_key(name):
continue
explicit_label = is_explicit_prompt_label(input_label) or is_explicit_prompt_label(input_name)
known_binding_name = is_known_prompt_binding_key(input_name) or is_known_prompt_binding_key(input_label)
if not explicit_label and (not known_binding_name) and (not (normalize_prompt_label(input_name) == 'text' and is_known_prompt_node_type(node_type_name))):
continue
if idx < len(widget_values):
value = widget_values[idx]
if should_strip_prompt_default(value):
keys.append(f'{node_id}:{input_name or input_label}')
continue
keys.append(f'{node_id}:{input_name or input_label}')
if is_known_prompt_node_type(node_type_name):
has_string_widget = any((isinstance(value, str) and value.strip() for value in widget_values))
if not has_string_widget:
continue
if 'textencode' in node_type or node_type.endswith('text') or '.text' in node_type:
keys.append(f'{node_id}:{node_type_name}.text')
keys.append(f'{node_id}:text')
elif 'primitivestringmultiline' in node_type or 'primitivestring' in node_type:
keys.append(f'{node_id}:{node_type_name}.value')
keys.append(f'{node_id}:value')
else:
keys.append(f'{node_id}:{node_type_name}.prompt')
keys.append(f'{node_id}:prompt')
deduped: list[str] = []
seen: set[str] = set()
for key in keys:
if key and key not in seen:
seen.add(key)
deduped.append(key)
return deduped
def find_media_binding_keys_in_workflow(workflow_data: dict[str, Any]) -> dict[str, list[str]]:
result = {'image': [], 'audio': [], 'video': []}
if not isinstance(workflow_data, dict):
return result
graph = workflow_data.get('graph') if isinstance(workflow_data.get('graph'), dict) else workflow_data
nodes = graph.get('nodes', []) if isinstance(graph, dict) else []
for node in nodes:
node_id = node.get('id')
node_type = str(node.get('type') or '').lower()
if not isinstance(node_id, (int, str)):
continue
if 'loadimage' in node_type:
add_unique_key(result['image'], f'{node_id}:image')
if 'audio' in node_type and ('load' in node_type or 'input' in node_type):
add_unique_key(result['audio'], f'{node_id}:audio')
if 'video' in node_type and ('load' in node_type or 'input' in node_type):
add_unique_key(result['video'], f'{node_id}:video')
return result
def collect_required_input_hints(workflow_data: dict[str, Any]) -> dict[str, Any]:
media_binding_keys = find_media_binding_keys_in_workflow(workflow_data)
result = {'prompt_keys': find_prompt_keys_in_workflow(workflow_data), 'image_slots': len(media_binding_keys['image']), 'audio_slots': len(media_binding_keys['audio']), 'video_slots': len(media_binding_keys['video']), 'image_keys': list(media_binding_keys['image']), 'audio_keys': list(media_binding_keys['audio']), 'video_keys': list(media_binding_keys['video']), 'candidate_fields': []}
if not isinstance(workflow_data, dict):
return result
graph = workflow_data.get('graph') if isinstance(workflow_data.get('graph'), dict) else workflow_data
nodes = graph.get('nodes', []) if isinstance(graph, dict) else []
candidate_fields: list[str] = []
for node in nodes:
node_type = str(node.get('type') or '')
lowered = node_type.lower()
node_id = node.get('id')
for inp in node.get('inputs') or []:
name = str((inp or {}).get('name') or (inp or {}).get('label') or '').strip()
low = name.lower()
if not name or low in {'prompt', 'text'} or 'negative' in low:
continue
if any((k in low for k in ['seed', 'step', 'cfg', 'duration', 'ratio', 'size', 'width', 'height', 'image', 'audio', 'video', 'model'])):
candidate_fields.append(f'{node_id}:{name}')
deduped: list[str] = []
seen: set[str] = set()
for item in candidate_fields:
if item not in seen:
seen.add(item)
deduped.append(item)
result['candidate_fields'] = deduped[:20]
return result
def should_strip_prompt_default(value: Any) -> bool:
if not isinstance(value, str):
return False
text = common.normalized_text(value)
if not text:
return False
suspicious_markers = ['example', 'default', 'preset', 'sample', 'template', '示例', '默认', '预设', '样例', '模板']
obvious_prompt_markers = ['masterpiece', 'best quality', 'ultra detailed', 'cinematic', 'poster', 'girl', 'boy', '1girl', '1boy']
return any((x in text for x in suspicious_markers)) or any((x in text for x in obvious_prompt_markers))
def is_real_remote_text_input_contract_field(field: dict[str, Any]) -> bool:
source = str(field.get('source') or '').strip()
field_type = str(field.get('field_type') or '').strip().lower()
field_name = str(field.get('field_name') or '').strip()
field_label = str(field.get('field_label') or '').strip()
variable_name = str(field.get('variable_name') or '').strip()
node_type = str(field.get('node_type') or '').strip()
if source not in REMOTE_EXPOSED_CONTRACT_SOURCES:
return False
if not is_text_input_field(field_type):
return False
if field_type == 'hidden':
return False
if not bool(field.get('user_input', True)):
return False
if is_negative_prompt_key(field_name, field_label):
return False
if has_prompt_excluded_marker(field_name, field_label, variable_name, node_type):
return False
return True
def is_structurally_prompt_like_remote_text_field(field: dict[str, Any]) -> bool:
aliases = collect_remote_contract_field_aliases(field)
if aliases & REMOTE_GENERIC_PROMPT_ALIASES:
return True
node_type = str(field.get('node_type') or '').strip()
return is_known_prompt_node_type(node_type)
# ---- 从 remote.py 移入的字段识别函数 ----
def _split_machine_words(*parts: str) -> list[str]:
raw = ' '.join((str(part or '') for part in parts))
raw = re.sub(r'([a-z0-9])([A-Z])', r'\1 \2', raw)
tokens = re.split(r'[^A-Za-z0-9]+', raw.lower())
return [token for token in tokens if token]
def _contains_cjk(text: str) -> bool:
return bool(re.search(r'[一-鿿]', text or ''))
def classify_remote_media_slot(node: dict[str, Any]) -> str | None:
variable_name = str(node.get('variable_name') or '').lower()
field_name = str(node.get('field_name') or '').lower()
field_label = str(node.get('field_label') or '').lower()
node_type = str(node.get('node_type') or '').lower()
field_type = str(node.get('field_type') or '').lower()
field_value = node.get('field_value')
load_like_node = any((token in node_type for token in {'load', 'input'}))
# Toggle / 数值 / 枚举类字段不可能是上传媒体槽,先一刀切排除。
# 否则像 "generate_audio"、"image_count"、"video_speed" 这种 label 里含 audio/image/video
# 关键词的 boolean / int / 枚举开关,会被误判成需要上传文件的槽。
non_media_field_types = {'boolean', 'bool', 'toggle', 'switch', 'checkbox', 'combo', 'select', 'enum', 'choice', 'option', 'int', 'integer', 'float', 'number', 'slider', 'dropdown'}
if field_type in non_media_field_types:
return None
if isinstance(field_value, bool):
return None
if field_name in {'image', 'images'} or 'image' in field_label or 'loadimage' in node_type or ('image' in variable_name and field_type == 'hidden') or ('image' in node_type and load_like_node and (field_type == 'hidden')):
return 'image'
if field_name in {'audio', 'audios'} or 'audio' in field_label or ('audio' in node_type and load_like_node and (field_type == 'hidden')) or ('audio' in variable_name and field_type == 'hidden'):
return 'audio'
if field_name in {'video', 'videos'} or 'video' in field_label or ('video' in node_type and load_like_node and (field_type == 'hidden')) or ('video' in variable_name and field_type == 'hidden'):
return 'video'
return None
def humanize_remote_prefill_label(node: dict[str, Any]) -> str:
field_label = str(node.get('field_label') or '').strip()
field_name = str(node.get('field_name') or '').strip()
variable_name = str(node.get('variable_name') or '').strip()
field_type = str(node.get('field_type') or '').strip().lower()
joined = ' '.join([field_label.lower(), field_name.lower(), variable_name.lower()])
tokens = _split_machine_words(field_label, field_name, variable_name)
if 'negative' in tokens and 'prompt' in tokens:
return '反向提示词(不想出现什么)'
# 中文负向提示词识别
raw_text = ' '.join([field_label, field_name])
if any(marker in raw_text for marker in ('反向提示词', '负向提示词', '负面提示词', '反向', '负向')):
return '反向提示词(不想出现什么)'
if any((token in joined for token in ['prompt', 'text', '描述', '文本', '内容'])):
return '文本指令(告诉模型这轮要做什么)'
if 'batch_size' in joined:
return '生成数量(一次出几张)'
if any((token in joined for token in ['aspect_ratio', 'aspect-ratio', ' ratio', '比例', '画幅'])):
return '比例 / 画幅(横竖和构图比例)'
if any((token in joined for token in ['resolution', 'size', '规格', '分辨率', '清晰度'])):
return '尺寸 / 规格(清晰度或出图档位)'
if 'width' in joined:
return '宽度(像素)'
if 'height' in joined:
return '高度(像素)'
if any((token in joined for token in ['duration', '时长'])):
return '时长(秒)'
if 'seed' in joined or '随机' in joined:
return '随机性 / Seed(想复现时再固定)'
if any((token in joined for token in ['steps', 'step', '采样步数'])):
return '采样步数(细化强度)'
if 'cfg' in joined:
return '提示词强度(贴合文本的力度)'
if 'guidance' in tokens and 'scale' in tokens:
return '提示词强度(贴合文本的力度)'
if any((token in joined for token in ['strength', 'denoise', '重绘强度'])):
return '重绘强度(改动原图的力度)'
if 'sampler' in tokens:
return '采样方式(生成策略)'
if 'quality' in tokens:
return '画质档位'
if 'weight' in tokens:
return '权重设置(影响力度)'
if 'temperature' in tokens:
return '随机发散度'
if 'top' in tokens and 'p' in tokens:
return '采样范围(Top P)'
if any((token in joined for token in ['model', 'checkpoint', '档位'])):
return '模型档位'
if field_label:
if _contains_cjk(field_label):
return field_label
if field_name and _contains_cjk(field_name):
return field_name
token_labels = {'image': '图片', 'audio': '音频', 'video': '视频', 'prompt': '提示词', 'text': '文本', 'seed': 'Seed', 'random': '随机', 'ratio': '比例', 'aspect': '画幅', 'size': '规格', 'resolution': '分辨率', 'width': '宽度', 'height': '高度', 'duration': '时长', 'step': '步数', 'steps': '步数', 'cfg': '提示词强度', 'guidance': '引导强度', 'scale': '强度', 'strength': '重绘强度', 'denoise': '降噪强度', 'model': '模型', 'checkpoint': '模型', 'sampler': '采样', 'quality': '画质', 'weight': '权重', 'temperature': '随机度', 'count': '数量', 'batch': '批量'}
translated_tokens: list[str] = []
for token in tokens:
label = token_labels.get(token)
if label and label not in translated_tokens:
translated_tokens.append(label)
if translated_tokens:
return f"其他可控参数({' / '.join(translated_tokens[:3])})"
type_labels = {'number': '数值项', 'slider': '数值项', 'select': '选项', 'dropdown': '选项', 'radio': '选项', 'checkbox': '开关项', 'switch': '开关项', 'boolean': '开关项', 'text': '文本项', 'textarea': '文本项', 'customtext': '文本项'}
type_label = type_labels.get(field_type)
if type_label:
return f'其他可控参数({type_label})'
return '其他可控参数(需确认)'