Movie Finder Chain

LangGraph pipeline that powers the Movie Finder conversational AI. The chain takes a natural-language plot description, searches a Qdrant vector store of Wikipedia movie plots (populated by rag_ingestion), enriches the results with live IMDb metadata, and guides the user through a discovery → confirmation → Q&A flow.


Architecture

User message
[route_by_phase]
     ├── phase="discovery" ──► rag_search ──► imdb_enrichment ──► validation ──► presentation ──► END
     │                                                                                  ▲
     │                                                              (refinement loop) ──┘
     ├── phase="confirmation" ──► confirmation ──► confirmed? ──► qa_agent ──► END
     │                                          ├── not found? ──► refinement ──► rag_search (loop)
     │                                          ├── exhausted? ──► dead_end ──► END
     │                                          └── unclear? ──► END (wait for next message)
     └── phase="qa" ──► qa_agent ──► END (loop per user question)
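The confirmation branch above can be sketched as a conditional-edge function. This is a minimal illustration, not the repo's code: the node names come from the diagram, while the state keys (`confirmation_outcome`, `refinement_count`) are assumptions.

```python
# Sketch of the post-confirmation routing shown in the diagram.
# State keys here are illustrative assumptions, not the real MovieFinderState.
MAX_REFINEMENTS = 3  # matches the documented default


def route_after_confirmation(state: dict) -> str:
    outcome = state.get("confirmation_outcome")
    if outcome == "confirmed":
        return "qa_agent"            # confirmed? -> Q&A
    if outcome == "not_found":
        if state.get("refinement_count", 0) >= MAX_REFINEMENTS:
            return "dead_end"        # exhausted? -> graceful exit
        return "refinement"          # not found? -> refine and re-search
    return "END"                     # unclear? -> wait for the next message
```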

Nodes

Node             Model                        Responsibility
rag_search                                    Embed query with OpenAI, search Qdrant, return top-k candidates
imdb_enrichment                               Parallel IMDb search + batch_get for full metadata
validation                                    Filter by confidence, deduplicate by IMDb ID, cap at 5
presentation                                  Format the candidate pool as an AI message for the user
confirmation     Claude Haiku                 Classify the user response: confirmed / not_found / unclear
refinement       Claude Sonnet                Extract new plot details from the conversation, build a richer query
qa_agent         Claude Sonnet + IMDb tools   ReAct agent for open-ended movie Q&A
dead_end                                      Graceful exit after max refinement cycles
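The validation step lends itself to a small self-contained sketch. The threshold, the IMDb-ID dedupe, and the cap of 5 come from the table; the candidate dict shape (`imdb_id`, `confidence`) is an assumption for illustration.

```python
# Illustrative sketch of the validation node's filtering logic.
# Field names ("imdb_id", "confidence") are assumptions; the threshold and
# cap follow the documented defaults.
CONFIDENCE_THRESHOLD = 0.3
MAX_CANDIDATES = 5


def validate_candidates(candidates: list[dict]) -> list[dict]:
    """Filter by confidence, deduplicate by IMDb ID, cap at MAX_CANDIDATES."""
    seen: set[str] = set()
    kept: list[dict] = []
    # Sort best-first so the highest-confidence duplicate wins.
    for c in sorted(candidates, key=lambda c: c["confidence"], reverse=True):
        if c["confidence"] < CONFIDENCE_THRESHOLD:
            continue
        if c["imdb_id"] in seen:
            continue
        seen.add(c["imdb_id"])
        kept.append(c)
    return kept[:MAX_CANDIDATES]
```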

Event-driven design

Each user message triggers a new graph.ainvoke() call with the same thread_id. State is persisted by the checkpointer. Use checkpoint_lifespan() with DATABASE_URL for a shared Postgres-backed checkpointer in production; local tests can still use the in-memory MemorySaver. No interrupt() / resume complexity — the graph reads phase from state to decide which branch runs.
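Because the graph simply reads phase from state, the entry router can be as small as the sketch below. The function name `route_by_phase` comes from the diagram; the state shape is an assumption (the real one is the MovieFinderState TypedDict in src/chain/state.py).

```python
from typing import Literal

# Hypothetical sketch of phase-based entry routing; the state dict stands in
# for the real MovieFinderState TypedDict.
def route_by_phase(state: dict) -> Literal["rag_search", "confirmation", "qa_agent"]:
    """Pick the entry branch from the persisted phase (defaults to discovery)."""
    phase = state.get("phase", "discovery")
    if phase == "confirmation":
        return "confirmation"
    if phase == "qa":
        return "qa_agent"
    return "rag_search"
```

No interrupt/resume machinery is needed precisely because this read is cheap: each ainvoke() re-enters at the top and the checkpointer supplies the phase.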


Project structure

chain/
├── Makefile
├── pyproject.toml
├── Dockerfile
├── docker-compose.yml
├── .pre-commit-config.yaml
├── README.md
├── CONTRIBUTING.md
├── examples/
│   ├── basic_usage.py          ← discovery + confirmation + Q&A walk-through
│   └── streaming_example.py    ← token-level streaming demo
├── prompts/                    ← originals (kept for reference)
└── src/chain/
    ├── __init__.py             ← exports compile_graph()
    ├── config.py               ← Pydantic Settings (env vars)
    ├── state.py                ← MovieFinderState TypedDict
    ├── graph.py                ← compile_graph()
    ├── prompts/                ← prompt files (inside package for importlib.resources)
    │   ├── confirmation.md
    │   ├── refinement.md
    │   └── qa_context.md
    ├── models/
    │   └── output.py           ← Pydantic output models
    ├── rag/
    │   └── service.py          ← MovieSearchService (OpenAI embed + Qdrant search)
    └── nodes/
        ├── rag_search.py
        ├── imdb_enrichment.py
        ├── validation.py
        ├── presentation.py
        ├── confirmation.py
        ├── refinement.py
        ├── qa_agent.py
        └── dead_end.py

Local workflow

movie-finder-chain is a child repo inside the backend workspace, but its local contributor contract is Docker-only and runs from backend/chain/.

Prerequisites:

  • Docker with the Compose plugin
  • make

Initialize, then start the persistent dev container:

make init       # build image, create .env from template, install git pre-commit hook
make editor-up  # start container for VS Code attach

make editor-up keeps the chain container running for attached-container editing. In a second terminal, use the repo-local targets:

make lint           # ruff check (report only)
make fix            # ruff check --fix + ruff format (auto-apply)
make format         # ruff format (apply)
make typecheck      # mypy --strict
make test           # pytest
make test-coverage  # pytest + coverage XML/HTML report
make pre-commit     # all hooks (full gate — also runs on git commit)
make check          # lint + typecheck + test-coverage

VS Code is configured for this workflow: after make editor-up, attach to the running chain container and use the committed .vscode/ launch/tasks files.

Configuration

Copy .env.example to .env and fill in the values needed for live examples or interactive runs:

cp .env.example .env

Variable                 Required    Description
QDRANT_URL               live runs   Qdrant Cloud cluster URL
QDRANT_API_KEY_RO        live runs   Read-only Qdrant API key
QDRANT_COLLECTION_NAME   optional    Collection name (default: movies)
OPENAI_API_KEY           live runs   OpenAI embeddings for RAG queries
ANTHROPIC_API_KEY        live runs   Claude models for confirmation, refinement, and Q&A
DATABASE_URL             optional    Postgres URL for persistent LangGraph checkpoints
CLASSIFIER_MODEL         optional    Default: claude-haiku-4-5-20251001
REASONING_MODEL          optional    Default: claude-sonnet-4-6
RAG_TOP_K                optional    Qdrant result count (default: 8)
MAX_REFINEMENTS          optional    Max refinement cycles before dead-end (default: 3)
CONFIDENCE_THRESHOLD     optional    Minimum IMDb match confidence (default: 0.3)
LANGSMITH_TRACING        optional    true to enable LangSmith tracing
LANGSMITH_API_KEY        optional    LangSmith API key
LANGSMITH_PROJECT        optional    LangSmith project name (default: movie-finder)

make test and make test-coverage do not require live Qdrant or LLM credentials because the test suite fully stubs those integrations.

QDRANT_API_KEY_RW is intentionally absent from this repo. The chain is a read-only Qdrant consumer and never writes to the collection.
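The optional variables above fall back to the listed defaults. The real config.py uses pydantic-settings; the stdlib-only sketch below just mirrors how a few of those defaults resolve, so the class and field names here are illustrative, not the actual Settings API.

```python
import os
from dataclasses import dataclass, field

# Stdlib-only illustration of default resolution for the optional settings.
# The real src/chain/config.py uses pydantic-settings instead of dataclasses.
@dataclass
class SettingsSketch:
    qdrant_collection_name: str = field(
        default_factory=lambda: os.getenv("QDRANT_COLLECTION_NAME", "movies"))
    rag_top_k: int = field(
        default_factory=lambda: int(os.getenv("RAG_TOP_K", "8")))
    max_refinements: int = field(
        default_factory=lambda: int(os.getenv("MAX_REFINEMENTS", "3")))
    confidence_threshold: float = field(
        default_factory=lambda: float(os.getenv("CONFIDENCE_THRESHOLD", "0.3")))
```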


Usage

Minimal example

import asyncio
from langchain_core.messages import HumanMessage
from chain import checkpoint_lifespan, compile_graph

async def main():
    async with checkpoint_lifespan() as checkpointer:
        graph = compile_graph(checkpointer=checkpointer)
        config = {"configurable": {"thread_id": "session-1"}}

        # Turn 1 — Discovery
        state = await graph.ainvoke(
            {"messages": [HumanMessage("A heist movie where they steal dreams")]},
            config=config,
        )
        print(state["messages"][-1].content)   # candidate pool

        # Turn 2 — Confirmation
        state = await graph.ainvoke(
            {"messages": [HumanMessage("Yes, the first one!")]},
            config=config,
        )
        print(state["messages"][-1].content)   # confirmation ack

        # Turn 3 — Q&A
        state = await graph.ainvoke(
            {"messages": [HumanMessage("Is it suitable for kids?")]},
            config=config,
        )
        print(state["messages"][-1].content)   # IMDb-grounded answer

asyncio.run(main())

Streaming responses

async for event in graph.astream_events(
    {"messages": [HumanMessage("A movie about dreams within dreams")]},
    config=config,
    version="v2",
):
    if event["event"] == "on_chat_model_stream":
        chunk = event["data"].get("chunk")
        if chunk:
            print(chunk.content, end="", flush=True)

FastAPI integration

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from fastapi import FastAPI
from langchain_core.messages import AIMessage, HumanMessage

from chain import checkpoint_lifespan, compile_graph


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    async with checkpoint_lifespan() as checkpointer:
        app.state.graph = compile_graph(checkpointer=checkpointer)
        yield


app = FastAPI(lifespan=lifespan)


@app.post("/chat/{session_id}")
async def chat(session_id: str, message: str):
    config = {"configurable": {"thread_id": session_id}}
    state = await app.state.graph.ainvoke(
        {"messages": [HumanMessage(content=message)]},
        config=config,
    )
    last_ai = next(
        (m for m in reversed(state["messages"]) if isinstance(m, AIMessage)),
        None,
    )
    return {"reply": last_ai.content if last_ai else ""}

Testing

All tests run without real API calls (OpenAI, IMDb, Qdrant, and Anthropic are all mocked via unittest.mock).

make test
make test-coverage

Test coverage targets

Module What's tested
test_rag_service.py MovieSearchService.search(), _to_list() edge cases
test_nodes.py Every node function (mocked LLM, IMDB, and RAG deps)
test_graph.py Routing functions, graph compilation, node registration
test_models.py Pydantic models: defaults, validation, round-trip serialisation
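To illustrate the mocking style used across test_nodes.py, here is a minimal sketch: the node's async dependency is replaced with an AsyncMock so no live service is touched. The node signature and the injected `search` callable are assumptions for the example, not the repo's actual node API.

```python
import asyncio
from unittest.mock import AsyncMock

# Hypothetical node with its RAG dependency injected, so tests can stub it.
async def rag_search_node(state: dict, search) -> dict:
    hits = await search(state["query"])
    return {"candidates": hits}


def test_rag_search_node() -> None:
    # AsyncMock stands in for the real embed-and-search call.
    fake_search = AsyncMock(return_value=[{"title": "Inception"}])
    state = asyncio.run(rag_search_node({"query": "dreams"}, fake_search))
    fake_search.assert_awaited_once_with("dreams")
    assert state["candidates"][0]["title"] == "Inception"
```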

Development

Use the repo-local Docker targets for all quality checks:

make lint           # ruff check (report only)
make fix            # ruff check --fix + ruff format (auto-apply)
make format         # ruff format
make typecheck      # mypy --strict
make pre-commit     # full hook suite (also enforced on git commit)
make check          # lint + typecheck + test-coverage (CI gate)

Hooks enforced on every git commit via the hook installed by make init: trailing-whitespace, end-of-file-fixer, check-yaml, check-toml, check-json, check-ast, debug-statements, detect-private-key, detect-secrets, ruff-check --fix, ruff-format, mypy.

Examples

make example-basic
make example-streaming

Container workflow

make editor-up starts a persistent chain container for attached-container editing, interactive debugging, and ad hoc shell access via make shell.

docker-compose.yml intentionally defines only the chain container. Qdrant remains external-only; this repo no longer ships or documents a local Qdrant developer flow.


LangSmith Monitoring

Set the following environment variables to enable end-to-end tracing in LangSmith:

LANGSMITH_TRACING=true
LANGSMITH_ENDPOINT=https://api.smith.langchain.com
LANGSMITH_API_KEY=<your key>
LANGSMITH_PROJECT=movie-finder

The graph mirrors these to the legacy LANGCHAIN_* aliases automatically. Each graph invocation produces a run trace showing all nodes, LLM calls, tool invocations, and their latencies.
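The alias mirroring can be pictured as the small mapping below. This is a sketch of the behavior described above, not the graph's actual implementation; the mapping targets the legacy LANGCHAIN_* variable names.

```python
# Illustrative sketch of LANGSMITH_* -> legacy LANGCHAIN_* mirroring.
_ALIASES = {
    "LANGSMITH_TRACING": "LANGCHAIN_TRACING_V2",
    "LANGSMITH_ENDPOINT": "LANGCHAIN_ENDPOINT",
    "LANGSMITH_API_KEY": "LANGCHAIN_API_KEY",
    "LANGSMITH_PROJECT": "LANGCHAIN_PROJECT",
}


def mirror_langsmith_env(env: dict[str, str]) -> dict[str, str]:
    """Copy LANGSMITH_* values onto the legacy names unless already set."""
    for src, dst in _ALIASES.items():
        if src in env and dst not in env:
            env[dst] = env[src]
    return env
```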


Dependencies

Package Purpose
langgraph Stateful graph runtime
langchain-anthropic Claude models (classifier + Q&A agent)
langchain-core Message types, tool protocol
langsmith Observability / tracing
imdbapi-client IMDb REST API client (path dep from ./imdbapi — nested submodule)
openai text-embedding-3-large for RAG query embedding
qdrant-client Vector search against the movie plot collection
pydantic-settings Env-var configuration with validation
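The way openai and qdrant-client combine in rag/service.py can be sketched as an embed-then-search pipeline with injected clients, which is also what makes it testable without live services. Everything here except the MovieSearchService name is an assumption for illustration.

```python
# Sketch of the embed-then-search flow implied by the dependency list.
# The embed and search callables stand in for the OpenAI and Qdrant clients.
class MovieSearchService:
    def __init__(self, embed, search, top_k: int = 8) -> None:
        self._embed = embed      # e.g. an OpenAI embeddings call
        self._search = search    # e.g. a Qdrant query by vector
        self._top_k = top_k      # documented RAG_TOP_K default is 8

    def find(self, query: str) -> list[dict]:
        vector = self._embed(query)
        return self._search(vector, limit=self._top_k)
```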