← Kembali ke Blog

Bikin RAG Pipeline Production dengan pgvector, Python, dan Anthropic API

Kebanyakan demo RAG itu bukan RAG. Mereka tempel dokumen ke prompt, sebut itu retrieval, lalu lanjut. Modelnya tetep halusinasi karena chunk yang salah masuk ke konteks. RAG beneran itu pipeline: chunk sumber, embed chunk-nya, simpan di vector index, retrieve top-k berdasarkan similarity, baru minta model jawab. Setiap langkah penting. Skip satu dan jawabannya nyimpang.

Artikel ini bikin pipeline itu pakai komponen yang beneran bisa di-ship. Postgres dengan ekstensi pgvector urus storage dan similarity search. Voyage AI produksi embedding 1024 dimensi. Anthropic API generate jawaban akhirnya. Semuanya muat di 200 baris Python, jalan lokal pake Docker, dan scale ke jutaan chunk tanpa ubah kode.

Prerequisites

Langkah 1: Postgres dengan pgvector

Setup lokal paling bersih pake image pgvector/pgvector:pg16. pgvector udah preinstalled.

docker run -d --name pgvector-demo \
  -e POSTGRES_PASSWORD=*** \
  -e POSTGRES_DB=rag \
  -p 5432:5432 \
  pgvector/pgvector:pg16

Verifikasi ekstensi bisa di-load:

docker exec -it pgvector-demo psql -U postgres -d rag -c "CREATE EXTENSION IF NOT EXISTS vector;"
docker exec -it pgvector-demo psql -U postgres -d rag -c "SELECT extversion FROM pg_extension WHERE extname='vector';"

Kamu bakal lihat versi kayak 0.8.0 (versi bundled di image saat artikel ini ditulis). Rilis pgvector saat ini di lini 0.8.x per rilis GitHub pgvector. Pake pgvector/pgvector:pg17 kalau mau Postgres 17.

Langkah 2: Schema dan Index

Satu tabel buat simpan chunk. Satu kolom buat embedding sebagai vector dengan ukuran tetap. HNSW kasih kamu query milidetik dengan harga build lebih lambat dan RAM lebih besar. IVFFlat build lebih cepet tapi butuh parameter lists yang di-tune ke jumlah row. Buat di bawah satu juta chunk, HNSW adalah pilihan default.

CREATE TABLE chunks (
  id BIGSERIAL PRIMARY KEY,
  source TEXT NOT NULL,
  chunk_index INT NOT NULL,
  content TEXT NOT NULL,
  embedding vector(1024) NOT NULL,
  created_at TIMESTAMPTZ NOT NULL DEFAULT now()
);

CREATE INDEX chunks_embedding_hnsw ON chunks
  USING hnsw (embedding vector_cosine_ops);

vector_cosine_ops milih cosine distance, metrik yang tepat buat embedding normalisasi dari model modern. Dokumentasi pgvector bahas tipe index dan tradeoff secara detail.

Langkah 3: Dependencies

python -m venv .venv && source .venv/bin/activate
pip install anthropic voyageai psycopg[binary] python-dotenv

SDK Anthropic dan Voyage kecil. psycopg adalah driver Postgres yang maintained. psycopg2 lama juga jalan, tapi psycopg 3 punya connection pooling dan async support yang lebih baik kalau nanti kamu butuh. Taruh key di .env:

ANTHROPIC_API_KEY=sk-ant...
VOYAGE_API_KEY=pa-...
DATABASE_URL=postgresql://postgres:***@localhost:5432/rag

Langkah 4: Chunk Sumber

Chunking adalah langkah yang paling banyak di-skip di tutorial RAG, dan ini langkah yang nentuin sistemnya kepake atau nggak. Kekecilan chunk dan context-nya kurang. Kebesaran dan embedding-nya nge-average arti. Titik mulai yang solid: 512 token per chunk dengan 64 token overlap.

# chunker.py
from dataclasses import dataclass

@dataclass
class Chunk:
    source: str
    index: int
    content: str

def chunk_text(text: str, source: str, size: int = 512, overlap: int = 64) -> list[Chunk]:
    tokens = text.split()
    out: list[Chunk] = []
    i = 0
    idx = 0
    while i < len(tokens):
        piece = " ".join(tokens[i : i + size])
        out.append(Chunk(source=source, index=idx, content=piece))
        idx += 1
        i += size - overlap
    return out

Tokenisasi whitespace itu pendekatan kasar. Buat pipeline beneran pake tokenizer asli (tiktoken buat OpenAI-compatible, atau transformers AutoTokenizer buat yang lain). Intinya: batasin ukuran chunk dalam token, bukan karakter.

Langkah 5: Embed dan Simpan

Voyage voyage-3.5 produksi vector 1024-dim dan skor-nya bagus di benchmark retrieval. voyage-3 kakaknya yang sedikit lebih tua dan masih solid. Flag input_type ngubah profil embedding. Pake document buat indexing, query saat search. Itu bukan vector yang sama.

# indexer.py
import os
from dataclasses import dataclass
import voyageai
import psycopg
from dotenv import load_dotenv

load_dotenv()
client = voyageai.Client()
MODEL = "voyage-3.5"

@dataclass
class Chunk:
    source: str
    index: int
    content: str

def embed_chunks(chunks: list[Chunk], batch_size: int = 64) -> list[list[float]]:
    vectors: list[list[float]] = []
    for i in range(0, len(chunks), batch_size):
        batch = [c.content for c in chunks[i : i + batch_size]]
        result = client.embed(
            batch,
            model=MODEL,
            input_type="document",
        )
        vectors.extend(result.embeddings)
    return vectors

def store(chunks: list[Chunk], vectors: list[list[float]]) -> None:
    assert len(chunks) == len(vectors)
    rows = [
        (c.source, c.index, c.content, v)
        for c, v in zip(chunks, vectors)
    ]
    with psycopg.connect(os.environ["DATABASE_URL"]) as conn:
        with conn.cursor() as cur:
            cur.executemany(
                "INSERT INTO chunks (source, chunk_index, content, embedding) "
                "VALUES (%s, %s, %s, %s)",
                rows,
            )
        conn.commit()

if __name__ == "__main__":
    import sys
    for path in sys.argv[1:]:
        text = open(path).read()
        chunks = chunk_text(text, source=path)
        vectors = embed_chunks(chunks)
        store(chunks, vectors)
        print(f"indexed {len(chunks)} chunks from {path}")

Voyage batch sampai 128 input per call dan charge per token. Di harga voyage-3.5 ($0.18 per juta token per halaman harga Voyage), indexing sejuta token biaya sekitar 18 sen.

Langkah 6: Retrieve dan Generate

Langkah retrieve mirror langkah indexing tapi pake input_type="query". Cosine similarity lewat index HNSW balikin top-k chunk dalam beberapa milidetik bahkan dengan jutaan row.

# rag.py
import os
import voyageai
import anthropic
import psycopg
from dotenv import load_dotenv

load_dotenv()
voyage = voyageai.Client()
claude = anthropic.Anthropic()
EMBED_MODEL = "voyage-3.5"
GEN_MODEL = "claude-sonnet-4-5"

SYSTEM = """Kamu adalah support assistant. Jawab pertanyaan user pakai context yang diberikan aja. Kalau context nggak punya jawabannya, bilang terus terang. Kutip frasa yang relevan dan sebut nama dokumen sumbernya. Jawabnya di bawah 200 kata."""

def retrieve(question: str, k: int = 5) -> list[dict]:
    qvec = voyage.embed(
        [question], model=EMBED_MODEL, input_type="query"
    ).embeddings[0]
    with psycopg.connect(os.environ["DATABASE_URL"]) as conn:
        with conn.cursor() as cur:
            cur.execute(
                """
                SELECT source, chunk_index, content,
                       1 - (embedding <=> %s::vector) AS score
                FROM chunks
                ORDER BY embedding <=> %s::vector
                LIMIT %s
                """,
                (qvec, qvec, k),
            )
            return [
                {"source": r[0], "chunk_index": r[1], "content": r[2], "score": float(r[3])}
                for r in cur.fetchall()
            ]

def answer(question: str) -> str:
    hits = retrieve(question)
    context = "\n\n".join(
        f"[source: {h['source']}#{h['chunk_index']} score={h['score']:.3f}]\n{h['content']}"
        for h in hits
    )
    msg = claude.messages.create(
        model=GEN_MODEL,
        max_tokens=1024,
        system=SYSTEM,
        messages=[
            {
                "role": "user",
                "content": (
                    f"Context:\n{context}\n\n"
                    f"Question: {question}"
                ),
            }
        ],
    )
    return msg.content[0].text, hits

if __name__ == "__main__":
    import sys
    q = " ".join(sys.argv[1:]) or "What is the refund policy?"
    text, hits = answer(q)
    print(f"\nRetrieved {len(hits)} chunks (top score: {hits[0]['score']:.3f})\n")
    print(text)

Operator <=> adalah cosine distance pgvector. 1 - distance kasih similarity. Range score 0 sampai 1; buat voyage-3.5 apa pun di atas 0.7 biasanya match kuat, 0.5 sampai 0.7 campuran, di bawah 0.5 maksa.

Beberapa detail yang perlu dicatet:

System prompt itu ngerjain dua hal. Nyuruh model nolak nebak kalau context kurang, dan nyuruh model nyitasi sumber. Dua perilaku itu lebih gampang dapetnya di level system daripada di user message. Panduan prompt engineering Anthropic bahas kenapa struktur eksplisit ngebantu.

Query retrieval nyimpen path sumber dan index chunk di hasil. Model pake itu buat kutip, dan kamu pake itu buat debug waktu jawabannya salah. Failure mode paling umum di RAG adalah "chunk yang bener ada di database, tapi yang ke-retrieve salah." Tau chunk mana yang dipertimbangkan adalah satu-satunya cara ngediagnosa.

Langkah 7: Eval Loop

Pipeline RAG tanpa evaluasi cuma tebakan. Kamu butuh set kecil pasangan pertanyaan-jawaban dan cara nge-skor sistem di situ. Setup minimum viable: sepuluh sampai dua puluh pertanyaan realistis, script yang jalanin sistem di tiap pertanyaan, dan grading manual buat versi pertama.

# eval.py
import json
from rag import answer

CASES = [
    {
        "q": "Berapa lama proses refund?",
        "expected_keywords": ["refund", "hari", "billing"],
    },
    {
        "q": "Bisa export data saya nggak?",
        "expected_keywords": ["export", "csv", "pengaturan"],
    },
    # ...tambah 10-20
]

def score(answer_text: str, keywords: list[str]) -> float:
    lower = answer_text.lower()
    hits = sum(1 for k in keywords if k in lower)
    return hits / len(keywords)

if __name__ == "__main__":
    results = []
    for c in CASES:
        text, hits = answer(c["q"])
        s = score(text, c["expected_keywords"])
        results.append({"q": c["q"], "score": s, "answer": text, "hits": hits})
        print(f"[{s:.2f}] {c['q']}")
    avg = sum(r["score"] for r in results) / len(results)
    print(f"\nMean keyword coverage: {avg:.2f}")
    with open("eval_results.json", "w") as f:
        json.dump(results, f, indent=2)

Keyword coverage itu metrik lemah. Lebih baik pake LLM-as-judge yang nge-skor jawaban buat faithfulness ke context yang di-retrieve (bukan buat kebenaran absolut). Dokumentasi evaluasi Anthropic punya contoh jalan. Ide kuncinya: pisahin "apa kita retrieve chunk yang bener" sama "apa model make mereka dengan bener." Itu masalah beda dengan fix beda.

Biaya dan Performa

Query representatif di context 5 chunk sekitar 3000 input token plus jawaban 400 token biaya kira-kira 1.5 sen pake claude-sonnet-4-5 ($3 per juta input token, \$15 per juta output token per halaman harga Anthropic). Embedding query di Voyage itu sepersejutaan sen. Storage itu rounding error dibanding langkah generate. Kalau biaya penting, prompt caching motong biaya input token sampai 90% di context yang berulang, yang kejadian secara natural di follow-up question.

Buat latensi, query HNSW Postgres lokal di bawah 10 milidetik di beberapa juta row. Embedding query sekitar 100 milidetik. Generate 1 sampai 2 detik. Sebagian besar waktu adalah generate, dan nggak banyak yang bisa kamu lakuin tanpa ganti model.

Jebakan yang Sering Kejadian

Embed seluruh dokumen sekaligus. PDF 50 halaman produksi satu vector. Model harus kompres semuanya ke 1024 dimensi dan hasilnya useless buat retrieval. Selalu chunk.

Campur flag input_type. Indexing pake query dan querying pake document (atau sebaliknya) diem-diem nurunin retrieval. Dua mode itu di-tune buat distribusi beda. Dokumen Voyage peringatin ini dan SDK nggak enforce.

Skip overlap. Chunk yang bersebelahan tanpa overlap kehilangan informasi di boundary. Kalimat yang nyebrang batasannya kepecah dua dan nggak ada setengahnya yang jadi embedding bagus. Keep overlap di 10 sampai 20% dari ukuran chunk.

Percaya sama hasil top-1. Selalu retrieve top-k (5 default yang masuk akal) dan biarin model pake yang dia butuhin. Top-1 kadang salah, apalagi buat pertanyaan pendek.

Nggak ada kutipan sumber di output. Jawaban RAG tanpa sumber yang bisa di-click user itu bukan jawaban beneran. Model bakal halusinasi kutipan kalau kamu nggak masukin path sumber di context. System prompt di sini maksa itu.

Mau Lanjut ke Mana

  • Tambahin prompt caching buat system prompt dan retrieved context. Motong biaya query berulang sampai 90%.
  • Tambahin reranker. Cross-encoder kayak cross-encoder/ms-marco-MiniLM-L-6-v2 rerank top-20 chunk yang di-retrieve dan biasanya naikin kualitas jawaban 10 sampai 20 persen di query susah.
  • Bungkus server-nya jadi MCP server biar agent yang MCP-compatible bisa manggil dia. SDK MCP TypeScript dan Python dua-duanya expose tool interface yang bersih.
  • Pindah dari cosine ke hybrid search. Postgres tsvector buat keyword plus pgvector buat semantic, digabung pake reciprocal rank fusion. Dokumen pgvector hybrid search bahas polanya.
  • Tambahin observability. Log pertanyaan, chunk yang di-retrieve (dengan score), dan jawaban akhirnya. Beberapa trace aja itu satu-satunya cara nemuin failure mode yang penting.

Butuh Bantuan Implementasi?

Saya membantu tim mendesain dan membangun infrastruktur cloud scalable, pipeline DevOps, dan sistem production-grade.

Konsultasi Gratis