# File contents
# src/founder_signal/setup.py
"""Canonical Founder Signal setup config intake, validation, and import."""
from __future__ import annotations
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from .config import _PROFILE_ID_PATTERN, _validate_profile
from .models import FounderSignalConfig
from .runtime_paths import imported_config_path, runtime_profile_path
# Discovery providers used for the v2ex platform when a config lists none.
_DEFAULT_V2EX_PROVIDERS = ["sov2ex", "node_latest", "configured_seed_urls"]
# Platform names accepted under "platforms" and in verified evidence snapshots.
_KNOWN_PLATFORM_NAMES = {"reddit", "v2ex"}
# Retired config field names mapped to the migration hint reported when one of
# them is found in a user config.
_LEGACY_FIELD_HINTS = {
"subreddits": "Use platforms.reddit.communities instead of subreddits.",
"seed_reddit_urls": "Use platforms.reddit.seed_urls instead of seed_reddit_urls.",
"excluded_reddit_urls": "Use platforms.reddit.excluded_urls instead of excluded_reddit_urls.",
"draft_publish_command": "Remove draft_publish_command; Draft CLI handoff uses the canonical skill contract.",
}
@dataclass(frozen=True)
class DoctorResult:
    """Immutable outcome of validating one user config."""

    # "ready" when no warnings were collected, otherwise "ready_with_warnings".
    status: str
    # Identifier of the validated profile.
    profile_id: str
    # Canonical user-facing config after normalization.
    normalized_config: dict[str, Any]
    # Payload derived from the normalized config for the runtime profile file.
    internal_profile: dict[str, Any]
    # One human-readable summary line per platform.
    platform_summaries: list[str]
    # Non-fatal findings gathered while summarizing platforms.
    warnings: list[str]
    # Shell command suggested as the next step.
    next_command: str
@dataclass(frozen=True)
class ImportResult:
    """Immutable record of where an imported profile bundle was written."""

    # Identifier of the imported profile.
    profile_id: str
    # Location of the written runtime profile JSON.
    profile_path: Path
    # Location of the written normalized config JSON.
    normalized_config_path: Path
    # Validation outcome associated with this import.
    doctor_result: DoctorResult
def load_user_config(config_path: Path) -> dict[str, Any]:
    """Read ``config_path`` as UTF-8 and return the JSON object it holds.

    Raises:
        FileNotFoundError: The file does not exist.
        ValueError: The content is not valid JSON, or its top-level value is
            not a JSON object.
    """
    try:
        raw_text = config_path.read_text(encoding="utf-8")
    except FileNotFoundError:
        # Re-raise with a friendlier message and without the OS-level context.
        raise FileNotFoundError(f"Config file not found: {config_path}") from None

    try:
        document = json.loads(raw_text)
    except json.JSONDecodeError as exc:
        raise ValueError(f"Config file is not valid JSON: {config_path}: {exc}") from exc

    if isinstance(document, dict):
        return document
    raise ValueError(f"Config file must contain one JSON object: {config_path}")
def doctor_user_config(*, root_dir: Path, config_path: Path) -> DoctorResult:
    """Validate the user config at ``config_path`` and describe the outcome.

    The file is loaded, normalized, converted into an internal profile and
    validated; one summary line (plus any warnings) is then collected for each
    platform of the resulting config, in sorted name order.

    ``root_dir`` is accepted but not read by this function.

    Returns:
        A ``DoctorResult`` whose status is ``"ready"`` when no warnings were
        collected and ``"ready_with_warnings"`` otherwise.
    """
    normalized = _normalize_user_config(load_user_config(config_path))
    internal_profile = _build_internal_profile(normalized)
    config = FounderSignalConfig.from_dict(internal_profile)
    _validate_profile(config=config, profile_path=config_path)

    summaries: list[str] = []
    collected_warnings: list[str] = []
    for name in sorted(config.platforms):
        summary_line, notes = _platform_summary(
            platform_name=name,
            platform_payload=normalized["platforms"][name],
            snapshots=normalized["verified_evidence_snapshots"],
        )
        summaries.append(summary_line)
        collected_warnings += notes

    return DoctorResult(
        status="ready_with_warnings" if collected_warnings else "ready",
        profile_id=config.profile_id,
        normalized_config=normalized,
        internal_profile=internal_profile,
        platform_summaries=summaries,
        warnings=collected_warnings,
        next_command=f"python3 -m founder_signal config import {config_path}",
    )
def import_user_config(*, root_dir: Path, config_path: Path) -> ImportResult:
    """Validate the config at ``config_path`` and persist it under ``root_dir``.

    Validation errors raised by ``doctor_user_config`` propagate before
    anything is written.
    """
    report = doctor_user_config(root_dir=root_dir, config_path=config_path)
    bundle_inputs = {
        "normalized_config": report.normalized_config,
        "internal_profile": report.internal_profile,
        "doctor_result": report,
    }
    return save_profile_bundle(root_dir=root_dir, **bundle_inputs)
def save_profile_bundle(
    *,
    root_dir: Path,
    normalized_config: dict[str, Any],
    internal_profile: dict[str, Any] | None = None,
    doctor_result: DoctorResult | None = None,
) -> ImportResult:
    """Validate a profile and write its runtime and normalized JSON files.

    Args:
        root_dir: Directory under which ``profiles`` and ``config-imports``
            are created when missing.
        normalized_config: Normalized user config to persist.
        internal_profile: Runtime profile payload; derived from
            ``normalized_config`` when omitted.
        doctor_result: Validation result to attach; a warning-free ``"ready"``
            result is synthesized when omitted.

    Returns:
        An ``ImportResult`` holding the two written paths.
    """

    def _write_json(target: Path, document: dict[str, Any]) -> None:
        # Two-space indented JSON followed by a trailing newline.
        target.write_text(json.dumps(document, indent=2) + "\n", encoding="utf-8")

    profile_payload = (
        _build_internal_profile(normalized_config)
        if internal_profile is None
        else internal_profile
    )
    config = FounderSignalConfig.from_dict(profile_payload)
    # Validate before touching the filesystem.
    _validate_profile(
        config=config,
        profile_path=runtime_profile_path(root_dir, config.profile_id),
    )

    (root_dir / "profiles").mkdir(parents=True, exist_ok=True)
    profile_path = runtime_profile_path(root_dir, config.profile_id)
    _write_json(profile_path, profile_payload)

    (root_dir / "config-imports").mkdir(parents=True, exist_ok=True)
    normalized_path = imported_config_path(root_dir, config.profile_id)
    _write_json(normalized_path, normalized_config)

    report = doctor_result
    if report is None:
        report = DoctorResult(
            status="ready",
            profile_id=config.profile_id,
            normalized_config=normalized_config,
            internal_profile=profile_payload,
            platform_summaries=[],
            warnings=[],
            next_command=f"python3 -m founder_signal run {config.profile_id}",
        )

    return ImportResult(
        profile_id=config.profile_id,
        profile_path=profile_path,
        normalized_config_path=normalized_path,
        doctor_result=report,
    )
def normalized_config_from_runtime_profile(payload: dict[str, Any]) -> dict[str, Any]:
    """Rebuild the normalized user config from a runtime profile payload.

    The payload is parsed and validated first. ``max_action_cards`` is always
    emitted as 1 and both ``draft`` flags are always emitted as true,
    regardless of the payload.
    """
    config = FounderSignalConfig.from_dict(payload)
    _validate_profile(config=config, profile_path=Path(f"{config.profile_id}.json"))

    platforms: dict[str, dict[str, Any]] = {}
    for name, platform in sorted(config.platforms.items()):
        entry: dict[str, Any] = {
            "enabled": bool(platform.enabled),
            "communities": list(platform.communities),
            "seed_urls": list(platform.seed_urls),
            "excluded_urls": list(platform.excluded_urls),
        }
        if name == "v2ex":
            # Fall back to the default providers when the profile lists none.
            providers = list(platform.discovery_providers)
            entry["discovery_providers"] = providers if providers else list(_DEFAULT_V2EX_PROVIDERS)
        platforms[name] = entry

    # Keys are inserted in the same order as the canonical normalized config.
    document: dict[str, Any] = {
        "profile_id": config.profile_id,
        "enabled": bool(config.enabled),
    }
    for text_field in ("product_name", "product_one_liner", "target_audience"):
        document[text_field] = getattr(config, text_field)
    for list_field in (
        "keywords",
        "scoring_terms",
        "negative_scoring_terms",
        "discovery_terms",
        "live_discovery_terms",
        "research_terms",
    ):
        document[list_field] = list(getattr(config, list_field))
    document["platforms"] = platforms

    snapshots: list[dict[str, Any]] = []
    for snapshot in config.verified_evidence_snapshots:
        snapshots.append(
            {
                "platform": snapshot.platform,
                "source_url": snapshot.source_url,
                "verification_method": snapshot.verification_method,
                "verified_by": snapshot.verified_by,
                "text_snapshot": snapshot.text_snapshot,
            }
        )
    document["verified_evidence_snapshots"] = snapshots

    document["discovery_mode"] = config.discovery_mode
    document["max_candidates"] = int(config.max_candidates)
    document["max_action_cards"] = 1
    for int_field in (
        "max_post_age_days",
        "preferred_post_age_hours",
        "min_comment_count",
        "max_comment_count",
        "history_ttl_days",
    ):
        document[int_field] = int(getattr(config, int_field))
    document["draft"] = {
        "generate_publish_intent": True,
        "require_confirmation_before_public_publish": True,
    }
    return document
def render_doctor_report(result: DoctorResult, *, config_path: Path) -> str:
    """Format ``result`` as a plain-text report ending in a newline.

    Sections appear in a fixed order: status, profile, platforms, safety,
    warnings (only when there are any), validated config path, next command.
    """
    headline = "Ready" if result.status == "ready" else "Ready with warnings"
    report = [
        "Founder Signal Doctor",
        "",
        f"Status: {headline}",
        "",
        "Profile:",
        f"- {result.profile_id}",
        "",
        "Platforms:",
    ]
    report += [f"- {summary}" for summary in result.platform_summaries]

    if result.normalized_config["draft"]["generate_publish_intent"]:
        intent_line = "- Draft publish intent enabled"
    else:
        intent_line = "- Draft publish intent disabled"
    report += [
        "",
        "Safety:",
        intent_line,
        "- Draft-hosted review/public preview page publishing is automatic after successful runs",
        "- External publication outside the Draft review surface requires explicit confirmation",
    ]

    if result.warnings:
        report.append("")
        report.append("Warnings:")
        report += [f"- {warning}" for warning in result.warnings]

    report += [
        "",
        "Validated config:",
        f"- {config_path}",
        "",
        "Next command:",
        result.next_command,
    ]
    return "\n".join(report) + "\n"
def _normalize_user_config(payload: dict[str, Any]) -> dict[str, Any]:
    """Validate a raw user config and return its canonical normalized form.

    Checks run in a fixed order, so the first problem found is the one
    reported: legacy top-level fields, ``profile_id``, platforms, the
    ``draft`` section, the remaining fields, and finally the comment-count
    range.

    Raises:
        ValueError: Any check fails.
    """
    present_legacy = [name for name in _LEGACY_FIELD_HINTS if name in payload]
    if present_legacy:
        listed = ", ".join(sorted(present_legacy))
        hint_text = " ".join(_LEGACY_FIELD_HINTS[name] for name in present_legacy)
        raise ValueError(f"Legacy config fields detected: {listed}. {hint_text}")

    profile_id = _require_string(payload, "profile_id")
    if not _PROFILE_ID_PATTERN.match(profile_id):
        raise ValueError(
            "profile_id must use lowercase letters, digits, '-' or '_'."
        )

    platforms = _normalize_platforms(payload.get("platforms"))
    if not any(entry["enabled"] for entry in platforms.values()):
        raise ValueError("At least one platform must be enabled in platforms.")

    # A missing or null "draft" section is treated as an empty object.
    draft_section = payload.get("draft")
    if draft_section is None:
        draft_section = {}
    elif not isinstance(draft_section, dict):
        raise ValueError("draft must be a JSON object when provided.")
    if "draft_publish_command" in draft_section:
        raise ValueError(
            "draft.draft_publish_command is not allowed; Draft CLI handoff uses the canonical skill contract."
        )
    # Both flags may be omitted, but when present they must be exactly true.
    locked_flags = (
        (
            "generate_publish_intent",
            "draft.generate_publish_intent must stay true because Draft handoff is part of the Founder Signal review flow.",
        ),
        (
            "require_confirmation_before_public_publish",
            "draft.require_confirmation_before_public_publish must stay true so any publication outside the Draft review surface remains confirmation-gated.",
        ),
    )
    for flag, message in locked_flags:
        if draft_section.get(flag, True) is not True:
            raise ValueError(message)

    normalized: dict[str, Any] = {
        "profile_id": profile_id,
        "enabled": bool(payload.get("enabled", True)),
    }
    for key in ("product_name", "product_one_liner", "target_audience"):
        normalized[key] = _require_string(payload, key)
    for key in ("keywords", "scoring_terms"):
        normalized[key] = _require_string_list(payload, key, min_items=1)
    for key in (
        "negative_scoring_terms",
        "discovery_terms",
        "live_discovery_terms",
        "research_terms",
    ):
        normalized[key] = _optional_string_list(payload, key)
    normalized["platforms"] = platforms
    normalized["verified_evidence_snapshots"] = _normalize_verified_snapshots(
        payload.get("verified_evidence_snapshots", [])
    )
    normalized["discovery_mode"] = _normalized_choice(
        payload.get("discovery_mode", "research"),
        field_name="discovery_mode",
        allowed={"live", "research"},
    )
    normalized["max_candidates"] = _require_int(payload, "max_candidates", minimum=1)
    normalized["max_action_cards"] = 1
    # (field, default, minimum) for every optional integer setting.
    optional_ints = (
        ("max_post_age_days", 7, 0),
        ("preferred_post_age_hours", 72, 0),
        ("min_comment_count", 0, 0),
        ("max_comment_count", 250, 0),
        ("history_ttl_days", 45, 1),
    )
    for key, default, minimum in optional_ints:
        normalized[key] = _optional_int(payload, key, default=default, minimum=minimum)
    normalized["draft"] = {
        "generate_publish_intent": bool(draft_section.get("generate_publish_intent", True)),
        "require_confirmation_before_public_publish": True,
    }

    if normalized["max_comment_count"] < normalized["min_comment_count"]:
        raise ValueError("min_comment_count must be less than or equal to max_comment_count.")
    return normalized
def _build_internal_profile(normalized: dict[str, Any]) -> dict[str, Any]:
    """Derive the runtime profile payload from a normalized config.

    Values are referenced, not copied. ``max_action_cards`` is always 1, and
    the three optional term lists are included only when non-empty.
    """
    # Listed in output order; the order is kept so serialized files stay stable.
    copied_keys = (
        "profile_id",
        "enabled",
        "product_name",
        "product_one_liner",
        "target_audience",
        "keywords",
        "platforms",
        "verified_evidence_snapshots",
        "discovery_mode",
        "max_post_age_days",
        "preferred_post_age_hours",
        "min_comment_count",
        "max_comment_count",
        "history_ttl_days",
        "scoring_terms",
        "negative_scoring_terms",
        "max_candidates",
    )
    profile: dict[str, Any] = {key: normalized[key] for key in copied_keys}
    profile["max_action_cards"] = 1
    profile["draft"] = normalized["draft"]

    for term_key in ("discovery_terms", "live_discovery_terms", "research_terms"):
        terms = normalized[term_key]
        if terms:
            profile[term_key] = terms
    return profile
def _normalize_platforms(value: Any) -> dict[str, dict[str, Any]]:
    """Validate the ``platforms`` section and return it in canonical form.

    Platform names are trimmed and lower-cased. Each entry gets ``enabled``,
    ``communities``, ``seed_urls`` and ``excluded_urls``; v2ex additionally
    gets ``discovery_providers``, defaulting to the built-in provider list.

    Raises:
        ValueError: The section or an entry is not a JSON object, a platform
            name is unsupported, or an entry contains a legacy field.
    """
    if not isinstance(value, dict):
        raise ValueError("platforms is required and must be a JSON object.")

    result: dict[str, dict[str, Any]] = {}
    for raw_name, raw_payload in value.items():
        name = str(raw_name).strip().lower()
        if name not in _KNOWN_PLATFORM_NAMES:
            raise ValueError(f"Unsupported platform '{raw_name}'. Supported platforms: reddit, v2ex.")
        if not isinstance(raw_payload, dict):
            raise ValueError(f"platforms.{name} must be a JSON object.")
        for legacy_field in _LEGACY_FIELD_HINTS:
            if legacy_field in raw_payload:
                raise ValueError(
                    f"Legacy config fields detected under platforms.{name}; use communities, seed_urls, and excluded_urls."
                )

        entry: dict[str, Any] = {"enabled": bool(raw_payload.get("enabled", True))}
        for list_key in ("communities", "seed_urls", "excluded_urls"):
            entry[list_key] = _optional_string_list(raw_payload, list_key)
        if name == "v2ex":
            configured = _optional_string_list(raw_payload, "discovery_providers")
            entry["discovery_providers"] = configured if configured else list(_DEFAULT_V2EX_PROVIDERS)
        result[name] = entry
    return result
def _normalize_verified_snapshots(value: Any) -> list[dict[str, str]]:
    """Validate ``verified_evidence_snapshots`` and return canonical entries.

    ``None`` is treated as an empty list. Each entry must be a JSON object
    with a non-blank ``source_url`` (the older ``reddit_url`` key is accepted
    as a fallback) and a non-blank ``text_snapshot``. ``platform`` defaults to
    ``"reddit"`` and ``verification_method`` to ``"agent_browser"``.

    Raises:
        ValueError: The value is not a list, an entry is not an object, the
            platform is unknown, or a required field is missing or blank.
    """
    if value is None:
        return []
    if not isinstance(value, list):
        raise ValueError("verified_evidence_snapshots must be an array when provided.")

    normalized: list[dict[str, str]] = []
    for index, item in enumerate(value):
        if not isinstance(item, dict):
            raise ValueError(f"verified_evidence_snapshots[{index}] must be a JSON object.")
        platform = _normalized_choice(
            item.get("platform", "reddit"),
            field_name=f"verified_evidence_snapshots[{index}].platform",
            allowed=_KNOWN_PLATFORM_NAMES,
        )
        source_url = str(item.get("source_url") or item.get("reddit_url") or "").strip()
        if not source_url:
            raise ValueError(f"verified_evidence_snapshots[{index}].source_url is required.")
        text_snapshot = str(item.get("text_snapshot") or "").strip()
        if not text_snapshot:
            raise ValueError(
                f"verified_evidence_snapshots[{index}].text_snapshot is required."
            )
        # Use the same `or ""` guard as the required fields so an explicit
        # JSON null is not stringified into the literal text "None".
        verification_method = (
            str(item.get("verification_method") or "").strip() or "agent_browser"
        )
        verified_by = str(item.get("verified_by") or "").strip()
        normalized.append(
            {
                "platform": platform,
                "source_url": source_url,
                "verification_method": verification_method,
                "verified_by": verified_by,
                "text_snapshot": text_snapshot,
            }
        )
    return normalized
def _platform_summary(
    *,
    platform_name: str,
    platform_payload: dict[str, Any],
    snapshots: list[dict[str, str]],
) -> tuple[str, list[str]]:
    """Build the one-line summary and warnings for a single platform.

    Args:
        platform_name: Canonical platform name; ``"reddit"`` gets the Reddit
            layout, any other name gets the V2EX layout.
        platform_payload: Normalized platform entry. A missing ``enabled`` key
            is treated as enabled.
        snapshots: Verified evidence snapshots; only those whose ``platform``
            equals ``platform_name`` are counted.

    Returns:
        ``(summary, warnings)``. A warning is produced only for an enabled
        platform that has no communities, seed URLs, or matching snapshots.
    """
    communities = platform_payload.get("communities", [])
    seed_urls = platform_payload.get("seed_urls", [])
    # Report the real state rather than assuming every platform is enabled.
    is_enabled = bool(platform_payload.get("enabled", True))
    state = "enabled" if is_enabled else "disabled"
    snapshot_count = sum(
        1 for snapshot in snapshots if snapshot.get("platform") == platform_name
    )

    warnings: list[str] = []
    # A disabled platform with no sources is not a problem worth flagging.
    if is_enabled and not communities and not seed_urls and snapshot_count == 0:
        warnings.append(
            f"{platform_name} is enabled but has no communities, seed_urls, or verified_evidence_snapshots."
        )

    if platform_name == "reddit":
        return (
            f"Reddit: {state}, {len(communities)} communities, {len(seed_urls)} seed URLs, {snapshot_count} verified snapshots",
            warnings,
        )

    providers = platform_payload.get("discovery_providers", [])
    return (
        f"V2EX: {state}, "
        f"{len(communities)} communities, "
        f"{len(providers)} discovery providers, "
        f"{len(seed_urls)} seed URLs, "
        f"{snapshot_count} verified snapshots",
        warnings,
    )
def _require_string(payload: dict[str, Any], field_name: str) -> str:
    """Return ``payload[field_name]`` as a stripped, non-empty string.

    Falsy values (missing, ``None``, ``""``, ``0``) count as absent; any other
    value is converted with ``str()`` before stripping.

    Raises:
        ValueError: The field is absent or blank after stripping.
    """
    raw = payload.get(field_name)
    text = str(raw).strip() if raw else ""
    if text:
        return text
    raise ValueError(f"{field_name} is required.")
def _require_string_list(payload: dict[str, Any], field_name: str, *, min_items: int = 0) -> list[str]:
    """Return the cleaned string list for ``field_name``, enforcing a minimum size.

    Raises:
        ValueError: Fewer than ``min_items`` entries remain after cleaning.
    """
    items = _optional_string_list(payload, field_name)
    if len(items) >= min_items:
        return items
    raise ValueError(f"{field_name} must contain at least {min_items} item(s).")
def _optional_string_list(payload: dict[str, Any], field_name: str) -> list[str]:
    """Return ``payload[field_name]`` as a list of stripped, non-empty strings.

    A missing or ``None`` field yields an empty list. Items are converted with
    ``str()`` and stripped; blank results and ``None`` items are dropped.

    Raises:
        ValueError: The field is present but is not a list.
    """
    value = payload.get(field_name)
    if value is None:
        return []
    if not isinstance(value, list):
        raise ValueError(f"{field_name} must be an array when provided.")
    # Skip JSON null entries: str(None) would otherwise add the literal
    # text "None" to the list.
    cleaned = (str(item).strip() for item in value if item is not None)
    return [item for item in cleaned if item]
def _require_int(payload: dict[str, Any], field_name: str, *, minimum: int | None = None) -> int:
    """Return the integer stored under a mandatory ``field_name``.

    Raises:
        ValueError: The key is absent, or the value fails integer coercion or
            the ``minimum`` bound.
    """
    if field_name in payload:
        return _coerce_int(payload[field_name], field_name=field_name, minimum=minimum)
    raise ValueError(f"{field_name} is required.")
def _optional_int(
    payload: dict[str, Any],
    field_name: str,
    *,
    default: int,
    minimum: int | None = None,
) -> int:
    """Return the integer stored under ``field_name``, or ``default``.

    A missing key and an explicit ``None`` both yield ``default``, matching
    how the other optional fields in this module treat JSON null.

    Raises:
        ValueError: A provided value fails integer coercion or the
            ``minimum`` bound.
    """
    value = payload.get(field_name)
    if value is None:
        return default
    return _coerce_int(value, field_name=field_name, minimum=minimum)
def _coerce_int(value: Any, *, field_name: str, minimum: int | None = None) -> int:
    """Convert ``value`` to an ``int`` and enforce an optional lower bound.

    Integers, integral floats (``3.0``) and numeric strings (``"7"``) are
    accepted. Booleans, fractional floats, NaN and infinity are rejected
    rather than silently coerced or truncated.

    Raises:
        ValueError: The value is not an integer, or is below ``minimum``.
    """
    message = f"{field_name} must be an integer."
    # bool is an int subclass; int(True) == 1 would hide a config mistake.
    if isinstance(value, bool):
        raise ValueError(message)
    # int(2.9) truncates to 2; is_integer() is also False for NaN and infinity.
    if isinstance(value, float) and not value.is_integer():
        raise ValueError(message)
    try:
        result = int(value)
    except (TypeError, ValueError, OverflowError) as exc:
        raise ValueError(message) from exc
    if minimum is not None and result < minimum:
        raise ValueError(f"{field_name} must be at least {minimum}.")
    return result
def _normalized_choice(value: Any, *, field_name: str, allowed: set[str]) -> str:
    """Return ``value`` trimmed and lower-cased when it is one of ``allowed``.

    Falsy values are treated as the empty string before the membership check.

    Raises:
        ValueError: The cleaned value is not in ``allowed``; the message lists
            the allowed values in sorted order.
    """
    candidate = str(value).strip().lower() if value else ""
    if candidate in allowed:
        return candidate
    options = ", ".join(sorted(allowed))
    raise ValueError(f"{field_name} must be one of: {options}.")