文件预览

langgraph_workflow_orch_worker.py

查看 langgraph-for-agents 技能包中的文件内容。

文件内容

references/langgraph_workflow_orch_worker.py

import os
import operator
import json
from typing import TypedDict, Annotated, List
from dotenv import load_dotenv
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langgraph.types import Send
from langchain_core.messages import SystemMessage, HumanMessage

# ignore env load and llm call

# =========================
# Prompts
# =========================
planner_prompt = """
You are a well-known storyteller. 
Your task is to create a compelling story plan based on a given topic. 
Break the story down into 3 distinct chapters. 
For each chapter, provide a short name and a one-sentence description of the key events that should happen. 
Respond with nothing but the JSON object.

Here is an example:
{
    "chapters": [
        {"name": "Chapter 1", "description": "The beginning of the story"},
        {"name": "Chapter 2", "description": "The middle of the story"},
        {"name": "Chapter 3", "description": "The conclusion of the story"}
    ]
}
"""

writer_prompt = """
You are a creative writer. 
Your task is to write a chapter of a story based on a given topic and chapter description. 
The chapter should be 4-5 sentences long. 
Respond with nothing but the chapter content.
"""


# =========================
# TypedDicts for State
# =========================
class Chapter(TypedDict):
    name: str
    description: str


# =========================
# Create State Schema
# =========================
# Orchestrator State
class OrchestratorState(TypedDict):
    topic: str  # The initial story prompt
    story_plan: dict  # The generated plan for the story
    completed_chapters: Annotated[
        list[str], operator.add
    ]  # Chapters written by workers
    final_story: str  # The final completed story


# Worker State
class WorkerState(TypedDict):
    chapter: Chapter  # The plan for a single chapter
    completed_chapters: Annotated[
        list[str], operator.add
    ]  # The list to append the written chapter to


# =========================
# Create Nodes
# =========================
def orchestrator(state: OrchestratorState):
    response = planner_llm.invoke(
        [
            SystemMessage(
                content=planner_prompt
            ),
            HumanMessage(content=f"Story Topic: {state['topic']}"),
        ]
    )
    
    content = response.content
    json_str = content[content.find('{') : content.rfind('}')+1]
    plan = json.loads(json_str)

    return {"story_plan": plan}

def llm_call(state: WorkerState):
    chapter_name = state["chapter"]["name"]
    chapter_description = state["chapter"]["description"]

    chapter_content = writer_llm.invoke(
        [
            SystemMessage(
                content=writer_prompt
            ),
            HumanMessage(
                content=f"Chapter Name: {chapter_name}\nChapter Description: {chapter_description}"
            ),
        ]
    ).content

    return {"completed_chapters": [f"## {chapter_name}\n\n{chapter_content}"]}


def synthesizer(state: OrchestratorState):
    final_story = "\n\n".join(state["completed_chapters"])

    return {"final_story": final_story}

def assign_workers(state: OrchestratorState):
    return [
        Send("llm_call", {"chapter": chapter})
        for chapter in state["story_plan"]["chapters"]
    ]


# =========================
# Build Workflow
# =========================
workflow = StateGraph(OrchestratorState)

workflow.add_node("orchestrator", orchestrator)
workflow.add_node("llm_call", llm_call)
workflow.add_node("synthesizer", synthesizer)

workflow.add_edge(START, "orchestrator")
workflow.add_conditional_edges("orchestrator", assign_workers, ["llm_call"])
workflow.add_edge("llm_call", "synthesizer")
workflow.add_edge("synthesizer", END)

graph = workflow.compile()


if __name__ == "__main__":
    # =========================
    # Execute Workflow
    # =========================
    story_topic = "A group of high school students wake up in an unfamiliar school and are told they must participate in a deadly game."

    initial_state = {"topic": story_topic, "completed_chapters": []}

    final_state = graph.invoke(initial_state)

    print("\n\n--- FINAL STORY ---")
    print(final_state["final_story"])