
Whitehouse RAG


This post is dedicated to Retrieval Augmented Generation (RAG). I'll be using two separate US Presidential Administrations' policies as a dataset. This dataset was scraped and compiled last year, and I'm finally getting around to making it work.

What is RAG?
#

RAG is a technique in which a Large Language Model (LLM) request is augmented with relevant information retrieved from a set of documents. This makes the model's response far more accurate than simply asking for an answer, since the model alone often doesn't have the context to provide valid information.

The technique is becoming much more popular across the industry and is gaining new capabilities as new models come out. For instance, Vision Language Models will allow context to extend beyond text into multiple modalities.

RAG typically consists of the following stages:

  • Ingesting and preprocessing raw documents
  • Chunking the documents into retrievable pieces
  • Embedding the chunks and storing them in a vector database
  • Retrieving the chunks most relevant to a user's query
  • Generating a response with the retrieved chunks added to the LLM's context

I will be implementing the stages from ingesting raw documents through generating the LLM's response in this notebook, and eventually in a custom Python package.

The repo will be available for those who want it; the dataset may be requested.

In this notebook, I’ll be going through a basic RAG implementation using Weaviate, LangChain, and LlamaIndex.
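To make those stages concrete before touching the real dataset, here's a toy end-to-end sketch of the retrieve-then-generate loop: embed a handful of hard-coded strings, pull back the closest ones with cosine similarity, and stuff them into a prompt. Everything in it (toy_docs, retrieve, build_prompt) is illustrative and not part of the project code.

from sentence_transformers import SentenceTransformer, util

# Toy corpus standing in for the chunked press releases
toy_docs = [
    "The alliance between Japan and the United States is a cornerstone of peace.",
    "The administration announced new infrastructure spending today.",
    "Trade agreements were signed to strengthen economic ties with allies.",
]

embedder = SentenceTransformer("all-MiniLM-L6-v2")
doc_vectors = embedder.encode(toy_docs, convert_to_tensor=True)


def retrieve(query: str, top_k: int = 2) -> list:
    "Return the top_k toy documents most similar to the query."
    query_vector = embedder.encode(query, convert_to_tensor=True)
    scores = util.cos_sim(query_vector, doc_vectors)[0]
    best = scores.argsort(descending=True)[:top_k]
    return [toy_docs[int(i)] for i in best]


def build_prompt(query: str) -> str:
    "Stuff the retrieved chunks into the prompt as context for the LLM."
    context = "\n".join(retrieve(query))
    return f"Answer using only this context:\n{context}\n\nQuestion: {query}"


print(build_prompt("How is the relationship with Japan described?"))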

Ingestion and Preprocessing
#

The first stages of the RAG pipeline will be conducted using LangChain.

The operations are loading the documents into memory, performing any preprocessing, chunking, and creating vector embeddings.

Before we do anything, let’s look at the structure of the data:

import json
from pathlib import Path
from pprint import pprint

data = Path(".")/"../data"

biden = data/"bidenwhreleases.jsonl"
trump = data/"trumpwhreleases.jsonl"

with biden.open("r") as f:
    pprint([json.loads(i) for i in f if i.strip()][0])
{'publish_date': 'April 10, 2024',
 'scrape_date': '07/07/2024',
 'text': 'South Lawn 10:14 A.M. EDT PRESIDENT BIDEN: Mr. Prime Minister, Mrs. '
         'Kishida, welcome. Welcome, welcome, welcome. On behalf of Jill and '
         'me, the Vice President and the Second Gentleman, and all the '
         'American people, welcome to the White House. Sixty-four years ago, '
         'our two nations signed a Treaty of Mutual Cooperation and Security. '
         'President Eisenhower said his goal was to establish an '
         'indestructible partnership between our countries. Today, the world '
         'can see that goal has been achieved and that partnership between us '
         'is unbreakable. ...',
 'title': 'Remarks by PresidentBiden and Prime Minister Kishida Fumio of Japan '
          'at ArrivalCeremony',
 'type': 'speech'}
import uuid
from typing import List, Dict, Any
from langchain_core.documents.base import Document
from langchain.document_loaders import JSONLoader


def extract_wh_metadata(
        record: Dict[str, str],
        additional_fields: Dict[str, Any]
) -> Dict[str, Any]:
    return {
        "title": record.get("title", ""),
        "type": record.get("type", ""),
        "publish_date": record.get("publish_date", ""),
        "scrape_date": record.get("scrape_date", ""),
        "document_id": str(uuid.uuid4()),
        **additional_fields
    }


bdocs = JSONLoader(biden, json_lines=True, jq_schema=".",
                   content_key="text", metadata_func=extract_wh_metadata).load()
tdocs = JSONLoader(trump, json_lines=True, jq_schema=".",
                   content_key="text", metadata_func=extract_wh_metadata).load()
# Visual inspection!
bdocs[0]
Document(metadata={'title': 'Remarks by PresidentBiden and Prime Minister Kishida Fumio of Japan at ArrivalCeremony', 'type': 'speech', 'publish_date': 'April 10, 2024', 'scrape_date': '07/07/2024', 'document_id': '683afd35-6dc9-498b-b09a-93063ba8b9aa', 'source': '/workspaces/whitehouserag/whitehouserag/data/bidenwhreleases.jsonl', 'seq_num': 1}, page_content='South Lawn 10:14 A.M. EDT PRESIDENT BIDEN: Mr. Prime Minister, Mrs. Kishida, welcome. Welcome, welcome, welcome. On behalf of Jill and me, the Vice President and the Second Gentleman, and all the American people, welcome to the White House. Sixty-four years ago, our two nations signed a Treaty of Mutual Cooperation and Security. President Eisenhower said his goal was to establish an indestructible partnership between our countries. Today, the world can see that goal has been achieved and that partnership between us is unbreakable. The alliance between Japan and the United States is a cornerstone of peace, security, prosperity in the in the Indo-Pacific and around the world. Ours is truly a global partnership. For that, Mr. Prime Minister Kishida, I thank you...)

The data is now loaded; it’s time to split and chunk it.

from tqdm.notebook import tqdm
from langchain.text_splitter import (
    RecursiveCharacterTextSplitter,
    CharacterTextSplitter,
    SentenceTransformersTokenTextSplitter
)
# Registry for metadata relationship mapping
document_registry: Dict[str, Dict[str, str]] = {}

CHK_SIZE = 300
CHK_OVERLAP = 50

# Options!
rec = RecursiveCharacterTextSplitter(
    chunk_size=CHK_SIZE,
    chunk_overlap=CHK_OVERLAP
)

char = CharacterTextSplitter(
    separator="\n",
    chunk_size=CHK_SIZE,
    chunk_overlap=CHK_OVERLAP
)

tok = SentenceTransformersTokenTextSplitter(
    chunk_overlap=CHK_OVERLAP,
    tokens_per_chunk=CHK_SIZE
)


def split_and_chunk(docs: List[Document], splitter: object, doctitle: str = "") -> List[Document]:

    chunked_docs = []

    for doc in tqdm(docs, f"Chunking {doctitle} docs"):
        doc_id = doc.metadata["document_id"]
        document_registry[doc.metadata["document_id"]] = {
            "title": doc.metadata.get("title"),
            "source": doc.metadata.get("source"),
            "type": doc.metadata.get("type")
        }
        chunks = splitter.split_text(doc.page_content)
        for i, chunk in enumerate(chunks):
            chunk_doc = Document(
                page_content=chunk,
                metadata={
                    "document_id": doc_id,
                    "chunk_index": i,
                    "source": doc.metadata.get("source"),
                    "title": document_registry[doc_id]["title"],
                }
            )
            chunked_docs.append(chunk_doc)
    return chunked_docs


bchonk = split_and_chunk(bdocs, rec, "Biden")
tchonk = split_and_chunk(tdocs, rec, "Trump")
Chunking Biden docs:   0%|          | 0/10729 [00:00<?, ?it/s]
Chunking Trump docs:   0%|          | 0/8476 [00:00<?, ?it/s]
# Visual inspection!
# Note the chunk's composition; that will change based on the chunking strategy
tchonk[0]
Document(metadata={'document_id': 'a8ae7702-3806-41ca-966b-dd99335497e3', 'chunk_index': 0, 'source': '/workspaces/whitehouserag/whitehouserag/data/trumpwhreleases.jsonl', 'title': 'Remarks by President Trump In Farewell Address to the Nation'}, page_content='The White House THE PRESIDENT: My fellow Americans: Four years ago, we launched a great national effort to rebuild our country, to renew its spirit, and to restore the allegiance of this government to its citizens. In short, we embarked on a mission to make America great again for all Americans. As')

Embeddings
#

Now it’s time to take the chunks and make vector embeddings for the LLM to utilize.

Each embedding is a list of floating-point numbers that captures the meaning of a chunk, as determined by the embedding model. The embedding model will be all-MiniLM-L6-v2, a small, fast model that produces 384-dimensional vectors.
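As a quick illustration (a standalone snippet, separate from the pipeline code below), encoding a short string with all-MiniLM-L6-v2 returns a 384-dimensional array of floats:

from sentence_transformers import SentenceTransformer

demo_model = SentenceTransformer("all-MiniLM-L6-v2")
demo_vector = demo_model.encode("Welcome to the White House.")

print(demo_vector.shape)  # (384,)
print(demo_vector[:5])    # the first few floating point components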

First we'll set up the Weaviate connection and prepare the database to receive the vectors. I've decided to compute the vector embeddings myself, rather than in Weaviate, due to hardware constraints on the devices hosting it; my local GPU can handle the encoding and uploading.

import os
import weaviate
import weaviate.classes.config as wvc

from sentence_transformers import SentenceTransformer
from weaviate.classes.config import Property, DataType
from weaviate.util import generate_uuid5

# Fancy custom Weaviate deployment
client = weaviate.connect_to_custom(
    http_host=os.getenv("WV_URL"),
    http_port=443,
    http_secure=True,
    grpc_host=os.getenv("WV_GRPC_URL"),
    grpc_port=8444,
    grpc_secure=True,
    skip_init_checks=True
)

assert client.is_ready()

# Create Weaviate collections for each vectorstore
for collection_name in ("BidenChunks", "TrumpChunks"):
    if not client.collections.exists(collection_name):
        client.collections.create(
            name=collection_name,
            vectorizer_config=wvc.Configure.Vectorizer.none(),  # vectors are provided client-side
            properties=[
                Property(name="content", data_type=DataType.TEXT),
                Property(name="source", data_type=DataType.TEXT),
                Property(name="document_id", data_type=DataType.TEXT),
                Property(name="title", data_type=DataType.TEXT),
                Property(name="chunk_index", data_type=DataType.INT),
            ],
        )

# Instantiate custom embedding model with the GPU
model = SentenceTransformer("all-MiniLM-L6-v2", device="cuda")

This was my first pass at the embedding process:

from weaviate.collections.classes.data import DataObject


def create_embeddings(docs: List[Document], model: object, vectorstore: object):
    "Basic embedding and upload code, no optimizations other than GPU"
    insertlist = []
    for doc in tqdm(docs):
        # Encode one chunk at a time; batching these would be far more efficient
        vector = model.encode(
            doc.page_content, convert_to_tensor=True, device="cuda").tolist()

        clean_metadata = {
            k: v for k, v in doc.metadata.items()
            if k not in {"uuid", "vector", "id"}
        }

        insertlist.append(DataObject(
            # Deterministic uuid from the content (optional but consistent)
            uuid=generate_uuid5(doc.page_content),
            properties={
                "content": doc.page_content,
                **clean_metadata
            },
            # Custom vectors have to be attached to DataObjects, not plain dicts
            vector=vector
        ))

    vectorstore.data.insert_many(insertlist)


# create_embeddings(bchonk, model, client.collections.get("BidenChunks"))
# create_embeddings(tchonk, model, client.collections.get("TrumpChunks"))

This function got the job done, but its performance left much to be desired.

If I want to iterate quickly on this project, optimizing this step is the most important thing I can do.

Optimizing
#

With two JSONL files totaling ~150MB of data and a 4070 (16GB) doing the encoding, the code above ran for more than an hour on the first file before erroring out, and every embedding had to be held in GPU memory along the way. To fix that, I'm switching to streaming and batched uploads with an adaptive windowing strategy; with that approach, memory usage held steady while GPU utilization fluctuated between 70% and 95%.

The function above is fine for small updates to a vectorstore, but the next one should work for anything, especially larger datasets.

from pynvml import nvmlInit, nvmlDeviceGetHandleByIndex, nvmlDeviceGetMemoryInfo
import time
import json
from pathlib import Path
from typing import Iterator, List
from concurrent.futures import ThreadPoolExecutor, as_completed
from weaviate.collections.classes.data import DataObject
from weaviate.util import generate_uuid5
from tqdm import tqdm


def embed_and_upload(
        batch: List[Document],
        vectors: List[List[float]],
        collection: object,
        retries: int = 3
) -> int:
    # Retries in case of failures
    for attempt in range(1, retries + 1):
        try:
            # Create list of documents for upload with vectors
            objs = []
            for doc, vec in zip(batch, vectors):
                objs.append(
                    # Custom vectors require DataObject
                    DataObject(
                        uuid=generate_uuid5(doc.page_content),
                        vector=vec.tolist() if hasattr(vec, "tolist") else vec,
                        properties={
                            "content": doc.page_content,
                            "source": doc.metadata.get("source", ""),
                            "chunk_index": doc.metadata.get("chunk_index", 0),
                            "document_id": doc.metadata.get("document_id"),
                            "title": doc.metadata.get("title")
                        }))

            # Upload vectors
            collection.data.insert_many(objs)
            return len(objs)
        except Exception as e:
            if attempt == retries:
                log_failed_batch(batch, reason=f"upload_error: {str(e)}")
                return 0
            time.sleep(2 ** attempt)


def log_failed_batch(batch: List[Document], reason: str):
    """Append a failed batch to a JSONL logfile."""
    FAILED_LOG = Path(".")/"../data/logs/failed_batches.jsonl"
    FAILED_LOG.parent.mkdir(parents=True, exist_ok=True)
    with open(FAILED_LOG, "a", encoding="utf-8") as f:
        for doc in batch:
            f.write(json.dumps({
                "reason": reason,
                "content": getattr(doc, "page_content", None),
                "metadata": {
                    "source": getattr(doc, "metadata", {}).get("source"),
                    "chunk_index": getattr(doc, "metadata", {}).get("chunk_index")
                }
            }, ensure_ascii=False) + "\n")


def check_gpu_memory(threshold_mb=1000) -> bool:
    try:
        nvmlInit()
        handle = nvmlDeviceGetHandleByIndex(0)  # use GPU 0
        mem_info = nvmlDeviceGetMemoryInfo(handle)
        free_mb = mem_info.free / 1024 / 1024
        return free_mb > threshold_mb
    except Exception:
        return True  # Fail open


def adaptive_stream_upload(
        doc_stream: Iterator[Document],
        embedder: object,
        collection: object,
        workers: int = 4,
        initial_batch_size: int = 32,
        embed_time_threshold: float = .7,
        upload_time_threshold: float = .5,
        max_batch: int = 128,
        min_batch: int = 8,
        step: int = 8,
):
    batch_size = initial_batch_size
    docs_buffer = []

    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = []
        for doc in tqdm(doc_stream):
            docs_buffer.append(doc)

            if len(docs_buffer) >= batch_size:
                batch = docs_buffer[:batch_size]
                docs_buffer = docs_buffer[batch_size:]

                start_embed = time.perf_counter()
                texts = [d.page_content for d in batch]
                try:
                    vectors = embedder.encode(
                        texts, convert_to_tensor=False)
                except Exception as e:
                    log_failed_batch(
                        batch, reason=f"embedding_error: {str(e)}")
                    continue
                embed_time = time.perf_counter() - start_embed

                # Submit the upload with batch/vectors passed as arguments so each
                # task keeps its own data (a bare closure would see later batches)
                start_upload = time.perf_counter()
                futures.append(executor.submit(
                    embed_and_upload, batch, vectors, collection=collection))
                upload_time = time.perf_counter() - start_upload

                # --- ADAPTIVE LOGIC ---
                gpu_ok = check_gpu_memory(threshold_mb=1000)
                if embed_time < embed_time_threshold and upload_time < upload_time_threshold and gpu_ok:
                    batch_size = min(batch_size + step, max_batch)
                elif embed_time > 2 * embed_time_threshold or not gpu_ok:
                    batch_size = max(batch_size - step, min_batch)

        # Flush remaining
        if docs_buffer:
            vectors = embedder.encode(
                [d.page_content for d in docs_buffer], convert_to_tensor=False)
            embed_and_upload(docs_buffer, vectors, collection=collection)

        # Wait for uploads to finish
        for f in tqdm(as_completed(futures), total=len(futures), desc="Uploading"):
            _ = f.result()

    print("✅ Done with adaptive streaming.")


adaptive_stream_upload(bchonk, model, client.collections.get("BidenChunks"))
adaptive_stream_upload(tchonk, model, client.collections.get("TrumpChunks"))
100%|██████████| 398491/398491 [04:21<00:00, 1524.92it/s]
Uploading: 100%|██████████| 3118/3118 [00:00<00:00, 56593.94it/s]


✅ Done with adaptive streaming.


100%|██████████| 235740/235740 [02:34<00:00, 1521.51it/s]
Uploading: 100%|██████████| 1846/1846 [00:00<00:00, 16121.02it/s]

✅ Done with adaptive streaming.

The adaptive streaming code reduced the vectorization and upload time for the dataset from over 1.5 hours down to 7 minutes.

This was an extremely effective use of streaming, and it worked even on a smaller consumer-grade GPU. It was only ~150MB of data, but it turned into over 600,000 chunks across roughly 19,000 documents.

The key techniques used here were:

  • Batch processing
  • Multiprocessing with workers
  • Dynamic batch sizing and allocation based on GPU usage

The longest part of this optimization exercise was getting Weaviate's gRPC endpoint running on my personal cluster.
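As an aside, the v4 Weaviate Python client ships its own dynamic batching, which handles batch sizing and retries internally. A minimal sketch of that alternative, reusing the bchonk chunks, the model, and the BidenChunks collection from above (and encoding per object purely to keep the sketch short), might look like this:

# Alternative: let the Weaviate client manage batch sizing and retries
collection = client.collections.get("BidenChunks")

with collection.batch.dynamic() as batch:
    for doc in bchonk:
        batch.add_object(
            properties={
                "content": doc.page_content,
                "source": doc.metadata.get("source", ""),
                "chunk_index": doc.metadata.get("chunk_index", 0),
                "document_id": doc.metadata.get("document_id"),
                "title": doc.metadata.get("title"),
            },
            # Per-object encoding keeps the sketch short; batch encoding is faster
            vector=model.encode(doc.page_content).tolist(),
            uuid=generate_uuid5(doc.page_content),
        )

The hand-rolled version above keeps embedding and uploading under explicit control, which is what the adaptive batch sizing relies on.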

Search and Retrieval
#

The next phase of getting this RAG stood up is staging a user's query, performing vector search to return the chunks closest to it, and adding those chunks to the LLM's context.

Originally, the plan was to use LangChain, but LlamaIndex’s LLM clients are a bit easier to work with, in my opinion.
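Before wiring up LlamaIndex, it's worth remembering that retrieval itself is just a nearest-neighbor lookup over the vectors uploaded earlier. A minimal sketch against the raw Weaviate v4 client, reusing the model and collections from the embedding cells (the query text is just an example), looks like this:

# Vector search directly against Weaviate, without LlamaIndex in the loop
query_vector = model.encode("relationship with Japan").tolist()

results = client.collections.get("BidenChunks").query.near_vector(
    near_vector=query_vector,
    limit=5,
)

for obj in results.objects:
    print(obj.properties["title"], "->", obj.properties["content"][:80])

LlamaIndex wraps this same lookup, handles the prompt assembly, and streams the LLM's response.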

import os
import weaviate

from llama_index.vector_stores.weaviate import WeaviateVectorStore
from llama_index.core import VectorStoreIndex, Settings
from llama_index.llms.ollama import Ollama
from whitehouserag.presentation.models.embedding import AllMiniL6V2Embeddings

# Locally hosted phi4
Settings.llm = Ollama(
    model="phi4", base_url="http://phi:11434", request_timeout=120)
# Custom LlamaIndex embedding model class
# Omitted because it's long; a rough sketch follows after this cell
Settings.embed_model = AllMiniL6V2Embeddings()

# Initialize Weaviate client with my cluster address
client = weaviate.connect_to_custom(
    http_host=os.getenv("WV_URL"),
    http_port=443,
    http_secure=True,
    grpc_host=os.getenv("WV_GRPC_URL"),
    grpc_port=8444,
    grpc_secure=True,
    skip_init_checks=True
)

# Connections to vectorstores
bvec = WeaviateVectorStore(
    weaviate_client=client,
    index_name="BidenChunks",
    text_key="content",
)
bvec._is_self_created_weaviate_client = False  # mark the client as externally managed so the store doesn't close it

bstore = VectorStoreIndex.from_vector_store(bvec)
bquery = bstore.as_query_engine(streaming=True, similarity_top_k=10)

tvec = WeaviateVectorStore(
    weaviate_client=client,
    index_name="TrumpChunks",
    text_key="content",
)
tvec._is_self_created_weaviate_client = False  # mark the client as externally managed so the store doesn't close it

tstore = VectorStoreIndex.from_vector_store(tvec)
tquery = tstore.as_query_engine(streaming=True, similarity_top_k=10)
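The omitted AllMiniL6V2Embeddings class isn't reproduced here, but a LlamaIndex embedding wrapper around all-MiniLM-L6-v2 typically only needs the handful of methods BaseEmbedding expects. This is a rough, hypothetical sketch (names and structure are mine, not the project's):

from typing import List
from llama_index.core.embeddings import BaseEmbedding
from sentence_transformers import SentenceTransformer

# Module-level model avoids fighting BaseEmbedding's pydantic field handling
_ST_MODEL = SentenceTransformer("all-MiniLM-L6-v2")


class MiniLMEmbeddingSketch(BaseEmbedding):
    "Hypothetical sketch, not the project's actual AllMiniL6V2Embeddings class."

    def _get_query_embedding(self, query: str) -> List[float]:
        return _ST_MODEL.encode(query).tolist()

    def _get_text_embedding(self, text: str) -> List[float]:
        return _ST_MODEL.encode(text).tolist()

    def _get_text_embeddings(self, texts: List[str]) -> List[List[float]]:
        return _ST_MODEL.encode(texts).tolist()

    async def _aget_query_embedding(self, query: str) -> List[float]:
        return self._get_query_embedding(query)

    async def _aget_text_embedding(self, text: str) -> List[float]:
        return self._get_text_embedding(text)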
from IPython.display import Markdown, display


def ipynb_md_render(query_engine, query):
    # Joining the streamed tokens with a space is what produces the extra
    # spaces visible in the "Raw Output" sections below
    display(
        Markdown(" ".join([i for i in query_engine.query(query).response_gen])))


question = """
    What is the administration's relationship with Japan 
    based on the provided docs?
    """

display(Markdown("### Biden Admin Response: Raw Output"))
ipynb_md_render(bquery, question)
display(Markdown("### Trump Admin Response: Raw Output"))
ipynb_md_render(tquery, question)

Biden Admin Response: Raw Output
#

The administration maintains a strong and proactive relationship with Japan , characterized by deep engagement and collaboration . This is exempl ified through Prime Minister K ish ida F um io ’s visit to the United States , where significant defense commitments were reinforced within the U .S .- Japan bilateral framework . The efforts include high -level engagements across various sectors like defense and development .

A senior administration official has been particularly noted for setting a new standard as an active and passionate advocate of this relationship , unders c oring a determined approach in driving forward a robust partnership between the two nations . This collaboration is seen as pivotal to their strategy in the Indo -Pacific region , align ing with Japan ’s vision for deeper engagement on the global stage . The administration ’s commitment includes modern izing alliance partner posture in Japan , reflecting strategic adjustments and enhancements in command and control within the bilateral context .

Overall , the relationship is portrayed as essential and dynamic , with both countries working shoulder to shoulder to uphold core values and address global challenges together .

Trump Admin Response: Raw Output
#

The administration maintains a close and collaborative relationship with Japan , emphasizing economic ties and shared security interests . This connection has been strengthened through various meetings and agreements between U .S . President Donald Trump and Japanese Prime Minister Shin zo Abe . Both leaders have expressed a joint commitment to fostering a free and open Indo -Pacific region , highlighting the significance of their alliance in addressing global challenges related to peace , stability , and international law .

In economic terms , the relationship has been bolster ed by trade agreements that are designed to be mutually beneficial . These include the U .S .- Japan Trade Agreement and Digital Trade Agreement , which aim to enhance economic cooperation and provide benefits for both nations . The administration ’s engagement with Japan reflects a strategic approach to strengthening alliances , reinforcing security commitments , and promoting economic prosperity through collaborative efforts .

Overall , the relationship between the U .S . administration and Japan is characterized by strong bilateral ties focused on trade , security , and international stability .

Conclusion
#

In conclusion, using custom-scraped data from official White House administration sites, I was able to build a Retrieval Augmented Generation project that can be extended to test new techniques in the field of RAG, and that lets me get answers directly from my data instead of reading every document I scraped.

Personal Thoughts
#

This was a really cool project, and I finally got elbow-deep into RAG, from the data collection stage all the way through receiving LLM outputs, all on my own hardware, and all in code that I can reference at any time.

I’m extremely excited to extend this as my own sandbox, and maybe I’ll even make a general pipeline so I can be more informed on any topic I so desire.

Next Steps
#

The next steps for this project would include the following:

  • Reexamine the chunking strategy to optimize what gets returned
  • Examine the effect of temperature and other generation hyperparameters on output quality
  • Optimize the vector index and retrieval functions in Weaviate
  • Add document titles to the final outputs to increase user confidence
  • Add reranking with a separate model before final output generation to increase precision
  • Add postprocessing to clean up spacing and spelling issues
  • Implement output evaluation metrics to allow directed experiments in improving retrieval

Stay tuned; these will probably show up in follow-on posts!

If this gets really interesting to users, I’d also add these:

  • Svelte JS frontend with two response windows and one input window
  • Database for monitoring and analysis
  • User feedback loop with tuning of application
  • Duel of the Fates soundtrack

Feedback
#

Discussion
#

I’d love to see the comment section fill with thoughts on what’s next for the project, or how I could improve this!

Did you like it?
#

Leave a comment in the thread or react with one of the emojis on the post!

Did I miss something?
#

Also leave a comment in the thread on the post! If I missed something critical, I'd love to know what it is.