Build a Production RAG Pipeline with pgvector, Python, and the Anthropic API

Most RAG demos are not RAG. They paste a doc into the prompt, call it retrieval, and move on. The model hallucinates anyway because the wrong chunks make it into the context. Real RAG is a pipeline: chunk the source, embed chunks, store them in a vector index, retrieve the top-k by similarity, and only then ask the model to answer. Every step matters. Skip one and the answers drift.

This article builds that pipeline with parts you can actually ship. Postgres with the pgvector extension handles storage and similarity search. Voyage AI produces embeddings at 1024 dimensions. The Anthropic API generates the final answer. The whole thing fits in 200 lines of Python, runs locally with Docker, and scales to millions of chunks without changing the code.

Prerequisites

Step 1: Postgres with pgvector

The cleanest local setup is the pgvector/pgvector:pg16 image. It ships pgvector preinstalled.

docker run -d --name pgvector-demo \
  -e POSTGRES_PASSWORD=*** \
  -e POSTGRES_DB=rag \
  -p 5432:5432 \
  pgvector/pgvector:pg16

Verify the extension is loadable:

docker exec -it pgvector-demo psql -U postgres -d rag -c "CREATE EXTENSION IF NOT EXISTS vector;"
docker exec -it pgvector-demo psql -U postgres -d rag -c "SELECT extversion FROM pg_extension WHERE extname='vector';"

You should see something like 0.8.0 (the version bundled with the image at the time of writing). The current pgvector release line is 0.8.x per the pgvector GitHub releases. Use pgvector/pgvector:pg17 if you want Postgres 17.

Step 2: Schema and Index

One table holds the chunks. One column holds the embedding as a fixed-size vector. HNSW gives you millisecond queries at the cost of a slower build and more RAM. IVFFlat is faster to build but needs a lists parameter tuned to your row count. For under one million chunks, HNSW is the default choice.

CREATE TABLE chunks (
  id BIGSERIAL PRIMARY KEY,
  source TEXT NOT NULL,
  chunk_index INT NOT NULL,
  content TEXT NOT NULL,
  embedding vector(1024) NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX chunks_embedding_hnsw ON chunks
  USING hnsw (embedding vector_cosine_ops);

vector_cosine_ops picks cosine distance, the right metric for normalized embeddings from modern models. The pgvector docs cover the index types and tradeoffs in detail.
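
As a rough illustration of the tuning knobs behind those tradeoffs, the sketch below builds the DDL for both index types. The `m`, `ef_construction`, and `lists` options are pgvector index options. The sizing rule (rows / 1000 up to one million rows, square root of the row count beyond that) is the starting point suggested in the pgvector README, not a guarantee. The helper names and the `chunks_embedding_ivfflat` index name are hypothetical.

```python
import math


def suggest_ivfflat_lists(row_count: int) -> int:
    """Starting value for IVFFlat `lists`: rows / 1000 up to 1M rows, sqrt(rows) beyond."""
    if row_count <= 1_000_000:
        return max(1, row_count // 1000)
    return max(1, int(math.sqrt(row_count)))


def hnsw_index_sql(m: int = 16, ef_construction: int = 64) -> str:
    """DDL for the HNSW index; 16 and 64 are pgvector's documented defaults."""
    return (
        "CREATE INDEX chunks_embedding_hnsw ON chunks "
        "USING hnsw (embedding vector_cosine_ops) "
        f"WITH (m = {m}, ef_construction = {ef_construction});"
    )


def ivfflat_index_sql(row_count: int) -> str:
    """DDL for an IVFFlat index sized for the given row count (hypothetical index name)."""
    lists = suggest_ivfflat_lists(row_count)
    return (
        "CREATE INDEX chunks_embedding_ivfflat ON chunks "
        "USING ivfflat (embedding vector_cosine_ops) "
        f"WITH (lists = {lists});"
    )
```

At query time, HNSW recall can be traded against latency with `SET hnsw.ef_search = 100;` (the default is 40). Larger values examine more candidates per query.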

Step 3: Dependencies

python -m venv .venv && source .venv/bin/activate
pip install anthropic voyageai "psycopg[binary]" python-dotenv

The Anthropic and Voyage SDKs are small. psycopg is the maintained Postgres driver. The older psycopg2 works too, but psycopg 3 has better connection pooling and async support if you want it later. Put keys in .env:

ANTHROPIC_API_KEY=sk-ant...
VOYAGE_API_KEY=pa-...
DATABASE_URL=postgresql://postgres:***@localhost:5432/rag
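
A missing key otherwise surfaces late, as an authentication error halfway through an indexing run. A minimal fail-fast check, assuming the three variable names above, might look like the following. `REQUIRED_VARS` and `missing_settings` are hypothetical names, not part of any SDK.

```python
import os

# The three settings the scripts in this article read from the environment.
REQUIRED_VARS = ("ANTHROPIC_API_KEY", "VOYAGE_API_KEY", "DATABASE_URL")


def missing_settings(env=None) -> list[str]:
    """Return the names of required settings that are absent or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

Call it right after `load_dotenv()` and exit with a clear message if the returned list is not empty.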

Step 4: Chunk the Source

Chunking is the step most RAG tutorials gloss over, and it is the step that decides whether the system is useful. Too small and chunks lack context. Too big and the embedding averages out the meaning. A solid starting point: 512 tokens per chunk with 64 tokens of overlap.

# chunker.py
from dataclasses import dataclass

@dataclass
class Chunk:
    source: str
    index: int
    content: str

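def chunk_text(text: str, source: str, size: int = 512, overlap: int = 64) -> list[Chunk]:
    # Guard against a non-positive step, which would loop forever.
    if not 0 <= overlap < size:
        raise ValueError("overlap must be >= 0 and smaller than size")
    tokens = text.split()  # whitespace "tokens": a rough stand-in for model tokens
    out: list[Chunk] = []
    i = 0
    idx = 0
    while i < len(tokens):
        piece = " ".join(tokens[i : i + size])
        out.append(Chunk(source=source, index=idx, content=piece))
        idx += 1
        if i + size >= len(tokens):
            break  # this window reached the end; skip an overlap-only tail chunk
        i += size - overlap
    return out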

Whitespace tokenization is a rough approximation. For a real pipeline use a real tokenizer (tiktoken for OpenAI-compatible, or transformers AutoTokenizer for the rest). The point is to bound chunk size in tokens, not characters.
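
One way to bound chunks by real tokens is to window over token IDs instead of words. The sketch below is tokenizer-agnostic and assumes only a pair of `encode` and `decode` callables. `chunk_token_ids` is a hypothetical helper, not part of any library.

```python
from typing import Callable


def chunk_token_ids(
    text: str,
    encode: Callable[[str], list[int]],
    decode: Callable[[list[int]], str],
    size: int = 512,
    overlap: int = 64,
) -> list[str]:
    """Split text into windows of at most `size` tokens that share `overlap` tokens."""
    if not 0 <= overlap < size:
        raise ValueError("overlap must be >= 0 and smaller than size")
    ids = encode(text)
    step = size - overlap
    pieces: list[str] = []
    for start in range(0, len(ids), step):
        pieces.append(decode(ids[start : start + size]))
        if start + size >= len(ids):
            break  # the window already reached the end; avoid an overlap-only tail
    return pieces
```

With tiktoken this would be called as `chunk_token_ids(text, enc.encode, enc.decode)` after `enc = tiktoken.get_encoding("cl100k_base")`. Cutting on token boundaries can split a multi-byte character, so a real pipeline may prefer to snap windows to sentence or paragraph breaks.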

Step 5: Embed and Store

Voyage voyage-3.5 produces 1024-dim vectors and scores well on retrieval benchmarks. voyage-3 is the slightly older sibling and still solid. The input_type flag changes the embedding profile. Use document for indexing, query at search time. They are not the same vectors.

# indexer.py
import os
import voyageai
import psycopg
from dotenv import load_dotenv

from chunker import Chunk, chunk_text  # reuse the Step 4 dataclass and chunker

load_dotenv()
client = voyageai.Client()  # reads VOYAGE_API_KEY from the environment
MODEL = "voyage-3.5"

def embed_chunks(chunks: list[Chunk], batch_size: int = 64) -> list[list[float]]:
    vectors: list[list[float]] = []
    for i in range(0, len(chunks), batch_size):
        batch = [c.content for c in chunks[i : i + batch_size]]
        result = client.embed(
            batch,
            model=MODEL,
            input_type="document",
        )
        vectors.extend(result.embeddings)
    return vectors

def store(chunks: list[Chunk], vectors: list[list[float]]) -> None:
    assert len(chunks) == len(vectors)
    rows = [
        (c.source, c.index, c.content, v)
        for c, v in zip(chunks, vectors)
    ]
    with psycopg.connect(os.environ["DATABASE_URL"]) as conn:
        with conn.cursor() as cur:
            cur.executemany(
                "INSERT INTO chunks (source, chunk_index, content, embedding) "
                "VALUES (%s, %s, %s, %s)",
                rows,
            )
        conn.commit()

if __name__ == "__main__":
    import sys
    for path in sys.argv[1:]:
        with open(path, encoding="utf-8") as f:
            text = f.read()
        chunks = chunk_text(text, source=path)
        vectors = embed_chunks(chunks)
        store(chunks, vectors)
        print(f"indexed {len(chunks)} chunks from {path}")

Voyage batches up to 128 inputs per call and charges per token. At voyage-3.5 pricing ($0.06 per million tokens as of the Voyage pricing page), indexing a million tokens costs about 6 cents.

Step 6: Retrieve and Generate

The retrieval step mirrors the indexing step but uses input_type="query". Cosine similarity via the HNSW index returns the top-k chunks in a few milliseconds even with a million rows.

# rag.py
import os
import voyageai
import anthropic
import psycopg
from dotenv import load_dotenv

load_dotenv()
voyage = voyageai.Client()
claude = anthropic.Anthropic()
EMBED_MODEL = "voyage-3.5"
GEN_MODEL = "claude-sonnet-4-5"

SYSTEM = """You are a support assistant. Answer the user question using only the provided context. If the context does not contain the answer, say so plainly. Quote relevant phrases and name the source document. Keep answers under 200 words."""

def retrieve(question: str, k: int = 5) -> list[dict]:
    qvec = voyage.embed(
        [question], model=EMBED_MODEL, input_type="query"
    ).embeddings[0]
    with psycopg.connect(os.environ["DATABASE_URL"]) as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT source, chunk_index, content,
                       1 - (embedding <=> %s::vector) AS score
                FROM chunks
                ORDER BY embedding <=> %s::vector
                LIMIT %s
                """,
                (qvec, qvec, k),
            )
            return [
                {"source": r[0], "chunk_index": r[1], "content": r[2], "score": float(r[3])}
                for r in cur.fetchall()
            ]

def answer(question: str) -> tuple[str, list[dict]]:
    hits = retrieve(question)
    context = "\n\n".join(
        f"[source: {h['source']}#{h['chunk_index']} score={h['score']:.3f}]\n{h['content']}"
        for h in hits
    )
    msg = claude.messages.create(
        model=GEN_MODEL,
        max_tokens=1024,
        system=SYSTEM,
        messages=[
            {
                "role": "user",
                "content": (
                    f"Context:\n{context}\n\n"
                    f"Question: {question}"
                ),
            }
        ],
    )
    return msg.content[0].text, hits

if __name__ == "__main__":
    import sys
    q = " ".join(sys.argv[1:]) or "What is the refund policy?"
    text, hits = answer(q)
    top = f"{hits[0]['score']:.3f}" if hits else "n/a"  # avoid IndexError on an empty table
    print(f"\nRetrieved {len(hits)} chunks (top score: {top})\n")
    print(text)

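The <=> operator is pgvector's cosine distance, and 1 - distance gives cosine similarity. Cosine similarity can in principle range from -1 to 1, but text embeddings rarely score below 0, so in practice expect values between 0 and 1. As a rough rule of thumb for voyage-3.5, anything above 0.7 is usually a strong match, 0.5 to 0.7 is mixed, and below 0.5 is a stretch.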
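
To make the relationship between distance and similarity concrete, here is a pure-Python illustration of what the cosine distance operator computes. It is only a sketch for intuition: pgvector does this in C inside the database, and the function names are hypothetical.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Dot product divided by the product of the two vector lengths."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)


def cosine_distance(a: list[float], b: list[float]) -> float:
    """The quantity the query orders by: 1 - cosine similarity."""
    return 1.0 - cosine_similarity(a, b)
```

Identical directions give distance 0, orthogonal vectors give 1, and opposite vectors give 2. That last case is why the similarity can in principle drop below 0, even though text embeddings rarely do.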

A few details worth pointing out:

The system prompt is doing two jobs. It tells the model to refuse to guess when context is missing, and it tells it to cite sources. Both behaviors are easier to get right at the system level than in the user message. The Anthropic prompt engineering guide covers why explicit structure helps.

The retrieval query keeps the source path and chunk index on the result. The model uses those to cite, and you use them to debug when an answer is wrong. The most common failure mode in RAG is "the right chunk was in the database, but the wrong one got retrieved." Knowing which chunks were considered is the only way to diagnose it.
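
A small formatter makes that diagnosis quicker. The sketch below assumes the hit dictionaries returned by `retrieve` (keys `source`, `chunk_index`, `content`, `score`). `format_hits` is a hypothetical helper.

```python
def format_hits(hits: list[dict], preview_chars: int = 60) -> list[str]:
    """One line per retrieved chunk: rank, score, source#index, and a short preview."""
    lines = []
    for rank, h in enumerate(hits, start=1):
        # Collapse newlines and repeated spaces so each hit stays on one line.
        preview = " ".join(h["content"].split())[:preview_chars]
        lines.append(
            f"{rank:>2}. {h['score']:.3f}  {h['source']}#{h['chunk_index']}  {preview}"
        )
    return lines
```

Printing `"\n".join(format_hits(hits))` next to a wrong answer shows at a glance whether the right chunk was retrieved at all.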

Step 7: An Evaluation Loop

A RAG pipeline without evaluation is just a guess. You need a small set of question-answer pairs and a way to score the system on them. The minimum viable setup: ten to twenty realistic questions, a script that runs the system against each one, and a manual grading pass for the first version.

# eval.py
import json
from rag import answer

CASES = [
    {
        "q": "How long do refunds take?",
        "expected_keywords": ["refund", "days", "billing"],
    },
    {
        "q": "Can I export my data?",
        "expected_keywords": ["export", "csv", "settings"],
    },
    # ...add 10-20
]

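def score(answer_text: str, keywords: list[str]) -> float:
    # Fraction of expected keywords that appear in the answer (case-insensitive).
    if not keywords:
        return 0.0
    lower = answer_text.lower()
    matched = sum(1 for k in keywords if k.lower() in lower)
    return matched / len(keywords)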

if __name__ == "__main__":
    results = []
    for c in CASES:
        text, hits = answer(c["q"])
        s = score(text, c["expected_keywords"])
        results.append({"q": c["q"], "score": s, "answer": text, "hits": hits})
        print(f"[{s:.2f}] {c['q']}")
    avg = sum(r["score"] for r in results) / len(results)
    print(f"\nMean keyword coverage: {avg:.2f}")
    with open("eval_results.json", "w") as f:
        json.dump(results, f, indent=2)

Keyword coverage is a weak metric. Better is an LLM-as-judge that scores the answer for faithfulness to the retrieved context (not for correctness in an absolute sense). The Anthropic evaluation docs have working examples. The key idea: separate "did we retrieve the right chunks" from "did the model use them well." Those are different problems with different fixes.
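
The retrieval half of that split can be scored without calling the model at all. A minimal sketch, assuming each eval case is extended with a hypothetical `expected_source` field naming the document that should be retrieved:

```python
def retrieval_hit(hits: list[dict], expected_source: str) -> bool:
    """True if any retrieved chunk comes from the expected source document."""
    return any(h["source"] == expected_source for h in hits)


def hit_rate(results: list[tuple[list[dict], str]]) -> float:
    """Fraction of (hits, expected_source) pairs where retrieval found the source."""
    if not results:
        return 0.0
    return sum(retrieval_hit(hits, src) for hits, src in results) / len(results)
```

A low hit rate points at chunking, embeddings, or k. A high hit rate combined with poor answers points at the prompt or the model.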

Cost and Performance

A representative query on a 5-chunk context of about 3000 input tokens plus a 400-token answer costs roughly 1.5 cents with claude-sonnet-4-5 ($3 per million input tokens, $15 per million output tokens per the Anthropic pricing page). Embedding the query with Voyage costs well under a thousandth of a cent. Storage is a rounding error compared to the generation step. If costs matter, prompt caching cuts the input token cost by up to 90% on repeated context, which happens naturally in follow-up questions.
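
The arithmetic behind that estimate can be written out as a small function. The default prices are the ones quoted above. The cache multiplier of 0.1 is an assumption derived from the "up to 90%" figure, and the sketch ignores the surcharge for writing to the cache. `query_cost_usd` is a hypothetical helper.

```python
def query_cost_usd(
    input_tokens: int,
    output_tokens: int,
    input_price_per_m: float = 3.0,
    output_price_per_m: float = 15.0,
    cached_input_tokens: int = 0,
    cache_read_multiplier: float = 0.1,  # assumption: cached reads bill at 10% of input price
) -> float:
    """Estimated generation cost in USD for one request."""
    uncached = input_tokens - cached_input_tokens
    billable_input = uncached + cached_input_tokens * cache_read_multiplier
    input_cost = billable_input * input_price_per_m / 1_000_000
    output_cost = output_tokens * output_price_per_m / 1_000_000
    return input_cost + output_cost
```

`query_cost_usd(3000, 400)` works out to 0.009 for input plus 0.006 for output, or $0.015. With 2500 of the 3000 input tokens served from cache, the same query comes to about $0.00825 under these assumptions.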

For latency, the local Postgres HNSW query is sub-10-millisecond on a few million rows. Embedding the query is about 100 milliseconds. Generation is 1 to 2 seconds. Most of the time is generation, and there is not much you can do about that without changing models.

Common Pitfalls

Embedding the whole document at once. A 50-page PDF produces one vector. The model has to compress everything into 1024 dimensions and the result is useless for retrieval. Always chunk.

Mixing input_type flags. Indexing with query and querying with document (or vice versa) quietly degrades retrieval. The two modes are tuned for different distributions. The Voyage docs warn about this and the SDK does not enforce it.
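
Since nothing enforces the flag, one option is a thin wrapper that makes the wrong combination impossible to type. This is a sketch, assuming a client object with the same `embed(texts, model=..., input_type=...)` call used in Steps 5 and 6. `TypedEmbedder` is a hypothetical name.

```python
class TypedEmbedder:
    """Expose two methods so the input_type can never be mixed up by the caller."""

    def __init__(self, client, model: str = "voyage-3.5"):
        self._client = client  # e.g. voyageai.Client()
        self._model = model

    def embed_documents(self, texts: list[str]) -> list[list[float]]:
        """Embed chunks for indexing; always uses input_type='document'."""
        result = self._client.embed(texts, model=self._model, input_type="document")
        return result.embeddings

    def embed_query(self, question: str) -> list[float]:
        """Embed one search query; always uses input_type='query'."""
        result = self._client.embed([question], model=self._model, input_type="query")
        return result.embeddings[0]
```

The indexer would call `embed_documents` and the retriever `embed_query`, so the flag lives in exactly one place.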

Skipping overlap. Adjacent chunks with no overlap lose information at the boundary. A sentence that spans the cut gets split in two and neither half is a good embedding. Keep overlap at 10 to 20% of chunk size.

Trusting the top-1 result. Always retrieve top-k (5 is a reasonable default) and let the model use what it needs. The top-1 hit is sometimes the wrong one, especially for short questions.

No source citation in the output. A RAG answer without a source the user can click is not really an answer. The model will hallucinate citations if you do not include the source path in the context. The system prompt here forces it.

Next Steps

  • Add prompt caching for the system prompt and retrieved context. Cuts repeated-query cost by up to 90%.
  • Add a reranker. A cross-encoder like cross-encoder/ms-marco-MiniLM-L-6-v2 reranks the top-20 retrieved chunks and usually lifts answer quality by 10 to 20 percent on hard queries.
  • Wrap the pipeline in an MCP server so any MCP-compatible agent can call it. The MCP TypeScript and Python SDKs both expose a clean tool interface.
  • Move from cosine to hybrid search. Postgres tsvector for keyword plus pgvector for semantic, combined with reciprocal rank fusion. The pgvector hybrid search docs cover the pattern.
  • Add observability. Log the question, retrieved chunks (with scores), and the final answer. A handful of these traces is the only way to find the failure modes that matter.
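
For the hybrid search item, reciprocal rank fusion is small enough to sketch in full. The version below assumes each search returns an ordered list of chunk ids, best first. The constant k = 60 is the value commonly used with this method, and the function name is hypothetical.

```python
def reciprocal_rank_fusion(rankings: list[list[int]], k: int = 60) -> list[int]:
    """Merge several ranked id lists; each appearance adds 1 / (k + rank) to an id's score."""
    scores: dict[int, float] = {}
    for ranking in rankings:
        for rank, chunk_id in enumerate(ranking, start=1):
            scores[chunk_id] = scores.get(chunk_id, 0.0) + 1.0 / (k + rank)
    # Highest fused score first.
    return sorted(scores, key=lambda cid: scores[cid], reverse=True)
```

Given a semantic ranking `[1, 2, 3]` and a keyword ranking `[3, 1, 4]`, chunks 1 and 3 rise to the top because both searches returned them. Fusion uses only ranks, so the cosine scores and tsvector scores never need to be put on a common scale.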
