Skip to content

Chunk-Level Versioning (v0.10.0)

Overview

Chunk-level versioning is a game-changing feature for RAG applications that enables 80-95% embedding cost reduction by tracking and re-embedding only changed chunks instead of entire documents.

The Problem

Traditional RAG systems re-embed entire documents when any change is detected, leading to: - Massive embedding costs for large documents with small changes - Unnecessary API calls to embedding providers - Wasted compute resources on unchanged content - Slower update cycles due to full document processing

The Solution

RAGVersion's chunk-level versioning: 1. Splits documents into chunks using configurable strategies 2. Hashes each chunk for O(1) change detection 3. Detects chunk-level changes (added, removed, unchanged, reordered) 4. Only re-embeds changed chunks when syncing to vector stores 5. Tracks cost savings metrics for visibility


Quick Start

1. Enable Chunk Tracking

Via Configuration File (ragversion.yaml):

chunk_tracking:
  enabled: true
  chunk_size: 500
  chunk_overlap: 50
  splitter_type: "recursive"
  store_chunk_content: true

Via Environment Variables:

export RAGVERSION_CHUNK_ENABLED=true
export RAGVERSION_CHUNK_CHUNK_SIZE=500
export RAGVERSION_CHUNK_CHUNK_OVERLAP=50
export RAGVERSION_CHUNK_SPLITTER_TYPE=recursive

Via Python Code:

from ragversion import AsyncVersionTracker
from ragversion.storage.sqlite import SQLiteStorage
from ragversion.models import ChunkingConfig

# Configure chunk tracking
chunk_config = ChunkingConfig(
    enabled=True,
    chunk_size=500,
    chunk_overlap=50,
    splitter_type="recursive",
    store_chunk_content=True
)

# Initialize tracker with chunk tracking
storage = SQLiteStorage(db_path="ragversion.db")
tracker = AsyncVersionTracker(
    storage=storage,
    chunk_tracking_enabled=True,
    chunk_config=chunk_config
)

await tracker.initialize()

2. Track Documents with Chunks

# Track a document and create chunks
event, chunk_diff = await tracker.track_with_chunks("document.txt")

if chunk_diff:
    print(f"Created {len(chunk_diff.added_chunks)} chunks")
    print(f"Savings: {chunk_diff.savings_percentage:.1f}%")

3. Get Chunk Diff Between Versions

from uuid import UUID

# Get chunk-level diff
chunk_diff = await tracker.get_chunk_diff(
    document_id=UUID("..."),
    from_version=1,
    to_version=2
)

if chunk_diff:
    print(f"Added: {len(chunk_diff.added_chunks)}")
    print(f"Removed: {len(chunk_diff.removed_chunks)}")
    print(f"Unchanged: {len(chunk_diff.unchanged_chunks)}")
    print(f"Embedding cost savings: {chunk_diff.savings_percentage:.1f}%")

4. Smart Vector Store Updates

LangChain Integration:

from ragversion.integrations.langchain import LangChainSync
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Setup components
embeddings = OpenAIEmbeddings()
vectorstore = Chroma(embedding_function=embeddings)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500)

# Enable smart chunk updates
sync = LangChainSync(
    tracker=tracker,
    text_splitter=text_splitter,
    embeddings=embeddings,
    vectorstore=vectorstore,
    enable_chunk_tracking=True  # Enable smart updates
)

# Sync directory - only changed chunks will be re-embedded
await sync.sync_directory("./docs")

LlamaIndex Integration:

from ragversion.integrations.llamaindex import LlamaIndexSync
from llama_index.core import VectorStoreIndex
from llama_index.embeddings.openai import OpenAIEmbedding

# Setup index
embed_model = OpenAIEmbedding()
index = VectorStoreIndex.from_documents([], embed_model=embed_model)

# Enable smart chunk updates
sync = LlamaIndexSync(
    tracker=tracker,
    index=index,
    enable_chunk_tracking=True  # Enable smart updates
)

# Sync directory - only changed chunks will be re-embedded
await sync.sync_directory("./docs")

Architecture

Component Overview

┌─────────────────────────────────────────────────────────────┐
│                     AsyncVersionTracker                      │
│  ┌───────────────────────────────────────────────────────┐  │
│  │           ChunkChangeDetector                         │  │
│  │  ┌─────────────────────────────────────────────────┐ │  │
│  │  │  Hash-based O(1) Chunk Comparison              │ │  │
│  │  │  • Build hash maps (old_chunks, new_chunks)   │ │  │
│  │  │  • Categorize: ADDED, REMOVED, UNCHANGED      │ │  │
│  │  │  • Detect REORDERED chunks                    │ │  │
│  │  └─────────────────────────────────────────────────┘ │  │
│  └───────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│                    ChunkerRegistry                           │
│  ┌──────────────────┐  ┌───────────────────┐               │
│  │ RecursiveChunker │  │ CharacterChunker  │               │
│  │  • LangChain     │  │  • Fixed-size     │               │
│  │  • Semantic      │  │  • Simple         │               │
│  │  • Smart splits  │  │  • Fast           │               │
│  └──────────────────┘  └───────────────────┘               │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│                    Storage Layer                             │
│  ┌──────────────┐  ┌──────────────┐                         │
│  │   SQLite     │  │   Supabase   │                         │
│  │ • Batch ops  │  │ • Batch ops  │                         │
│  │ • Compressed │  │ • Compressed │                         │
│  └──────────────┘  └──────────────┘                         │
└─────────────────────────────────────────────────────────────┘

Change Detection Algorithm

# Simplified pseudocode
def detect_chunk_changes(old_chunks, new_chunks):
    # Build hash maps for O(1) lookup
    old_map = {chunk.content_hash: chunk for chunk in old_chunks}
    new_map = {chunk.content_hash: chunk for chunk in new_chunks}

    added = []
    removed = []
    unchanged = []
    reordered = []

    # Check new chunks
    for new_chunk in new_chunks:
        if new_chunk.content_hash not in old_map:
            added.append(new_chunk)  # New content
        else:
            old_chunk = old_map[new_chunk.content_hash]
            if old_chunk.chunk_index == new_chunk.chunk_index:
                unchanged.append(new_chunk)  # Same content, same position
            else:
                reordered.append(new_chunk)  # Same content, different position

    # Check old chunks for removals
    for old_chunk in old_chunks:
        if old_chunk.content_hash not in new_map:
            removed.append(old_chunk)  # Content was deleted

    return ChunkDiff(
        added_chunks=added,
        removed_chunks=removed,
        unchanged_chunks=unchanged,
        reordered_chunks=reordered
    )

Time Complexity: O(n + m) where n = old chunks, m = new chunks


Configuration Reference

ChunkingConfig Parameters

Parameter Type Default Description
enabled bool False Enable chunk-level tracking (opt-in)
chunk_size int 500 Target size per chunk (tokens or characters)
chunk_overlap int 50 Overlap between chunks for context preservation
splitter_type str "recursive" Chunking strategy: "recursive", "character"
store_chunk_content bool True Store chunk content in database

Chunking Strategies

# Uses LangChain's RecursiveCharacterTextSplitter
chunk_config = ChunkingConfig(
    splitter_type="recursive",
    chunk_size=500,
    chunk_overlap=50
)

Pros: - Respects semantic boundaries (paragraphs, sentences) - Better chunk quality for embeddings - Integrates with LangChain ecosystem

Cons: - Slightly slower than character-based - Requires LangChain installation

Fallback: If LangChain is unavailable, falls back to paragraph-based splitting.

2. Character Chunker

# Simple character-based chunking
chunk_config = ChunkingConfig(
    splitter_type="character",
    chunk_size=500,
    chunk_overlap=50
)

Pros: - Fast - No dependencies - Predictable chunk sizes

Cons: - May split mid-sentence - Lower semantic quality


Integration Patterns

Pattern 1: Auto-Sync on File Changes

from ragversion import AsyncVersionTracker
from ragversion.integrations.langchain import LangChainSync

# Setup tracker with chunk tracking
tracker = AsyncVersionTracker(
    storage=storage,
    chunk_tracking_enabled=True
)

# Setup auto-sync with smart chunk updates
sync = LangChainSync(
    tracker=tracker,
    text_splitter=text_splitter,
    embeddings=embeddings,
    vectorstore=vectorstore,
    enable_chunk_tracking=True
)

# All tracked changes will automatically sync with smart updates
await tracker.track_with_chunks("document.txt")
# → Only changed chunks are re-embedded

Pattern 2: Manual Chunk Diff Analysis

# Track document
event, chunk_diff = await tracker.track_with_chunks("document.txt")

if chunk_diff:
    # Analyze changes
    for added_chunk in chunk_diff.added_chunks:
        print(f"New chunk {added_chunk.chunk_index}: {added_chunk.token_count} tokens")

    # Calculate cost savings
    metrics = tracker.chunk_detector.calculate_savings_metrics(chunk_diff)
    print(f"Cost savings: {metrics['savings_percentage']:.1f}%")
    print(f"Chunks to embed: {metrics['chunks_to_embed']}")

Pattern 3: Batch Processing with Chunk Tracking

# Track entire directory
result = await tracker.track_directory("./docs", max_workers=4)

# Get chunk diffs for all modified documents
for event in result.successful:
    if event.change_type == ChangeType.MODIFIED:
        chunk_diff = await tracker.get_chunk_diff(
            event.document_id,
            event.version_number - 1,
            event.version_number
        )
        if chunk_diff:
            print(f"{event.file_name}: {chunk_diff.savings_percentage:.1f}% savings")

Cost Savings Analysis

Real-World Examples

Example 1: Documentation Update

Scenario: 100-page technical document, 1-paragraph change

Without chunk tracking:
- Total chunks: 500
- Chunks to embed: 500
- Cost: $2.50 (@ $0.005 per chunk)

With chunk tracking:
- Total chunks: 500
- Chunks to embed: 2 (only changed paragraphs)
- Cost: $0.01
- Savings: 99.6%

Example 2: Code Repository

Scenario: 50 Python files, 10 files modified

Without chunk tracking:
- Total files: 50
- Files to re-embed: 50
- Average chunks per file: 20
- Total chunks to embed: 1,000
- Cost: $5.00

With chunk tracking:
- Total files: 50
- Modified files: 10
- Average changed chunks per file: 3
- Total chunks to embed: 30
- Cost: $0.15
- Savings: 97%

Cost Calculation Formula

# Embedding cost savings
total_chunks = len(added) + len(unchanged) + len(reordered)
unchanged_count = len(unchanged) + len(reordered)
savings_percentage = (unchanged_count / total_chunks) * 100

# Dollar savings (example with OpenAI ada-002)
cost_per_chunk = 0.0001  # $0.0001 per 1K tokens
average_tokens_per_chunk = 500
total_cost_without_chunks = total_chunks * cost_per_chunk
total_cost_with_chunks = len(added) * cost_per_chunk
savings_dollars = total_cost_without_chunks - total_cost_with_chunks

Database Schema

Chunks Table

CREATE TABLE chunks (
    id TEXT PRIMARY KEY,
    document_id TEXT NOT NULL,
    version_id TEXT NOT NULL,
    chunk_index INTEGER NOT NULL,
    content_hash TEXT NOT NULL,
    token_count INTEGER NOT NULL,
    created_at TEXT NOT NULL,
    metadata TEXT DEFAULT '{}',
    FOREIGN KEY (document_id) REFERENCES documents(id) ON DELETE CASCADE,
    FOREIGN KEY (version_id) REFERENCES versions(id) ON DELETE CASCADE,
    UNIQUE(version_id, chunk_index)
);

CREATE INDEX idx_chunks_document_id ON chunks(document_id);
CREATE INDEX idx_chunks_version_id ON chunks(version_id);
CREATE INDEX idx_chunks_content_hash ON chunks(content_hash);
CREATE INDEX idx_chunks_doc_version ON chunks(document_id, version_id);

Chunk Content Table

CREATE TABLE chunk_content (
    chunk_id TEXT PRIMARY KEY,
    content BLOB NOT NULL,
    compressed INTEGER DEFAULT 1,
    created_at TEXT NOT NULL,
    FOREIGN KEY (chunk_id) REFERENCES chunks(id) ON DELETE CASCADE
);

Migration Guide

Migrating Existing Documents

Step 1: Run Database Migration

For SQLite (automatic):

# Migrations run automatically on initialize()
storage = SQLiteStorage(db_path="ragversion.db")
await storage.initialize()  # Creates chunk tables if needed

For Supabase (manual):

-- Run in Supabase SQL Editor
-- See: ragversion/storage/migrations/002_chunk_versioning.sql

Step 2: Enable Chunk Tracking

Update ragversion.yaml:

chunk_tracking:
  enabled: true

Step 3: Re-track Documents

# Re-track existing documents to create chunks
documents = await tracker.list_documents(limit=1000)

for doc in documents:
    latest_version = await tracker.get_latest_version(doc.id)
    if latest_version:
        # This will create chunks for the latest version
        event, chunk_diff = await tracker.track_with_chunks(doc.file_path)
        if chunk_diff:
            print(f"Created {len(chunk_diff.added_chunks)} chunks for {doc.file_name}")

Backward Compatibility

Chunk tracking is 100% backward compatible:

  1. Default disabled: Existing installations work unchanged
  2. Opt-in activation: Enable via configuration
  3. Graceful degradation: Falls back to document-level tracking if chunks unavailable
  4. No breaking changes: All existing APIs remain functional

Performance Optimization

Batch Operations

Chunk operations use optimized batch inserts:

# SQLite: executemany for 1000+ chunks in <1 second
await storage.create_chunks_batch(chunks)  # Efficient

# vs. slow individual inserts
for chunk in chunks:
    await storage.create_chunk(chunk)  # Avoid this

Indexing Strategy

The chunk tables use strategic indexes for performance:

-- Primary lookups
idx_chunks_version_id  -- Get all chunks for a version
idx_chunks_content_hash  -- Find duplicate chunks

-- Composite indexes
idx_chunks_doc_version  -- Get chunks for doc + version

Compression

Chunk content is compressed by default using gzip:

# Automatic compression (default)
chunk_config = ChunkingConfig(store_chunk_content=True)

# Compression reduces storage by ~70% for text
original_size = 500 bytes
compressed_size = 150 bytes  # Typical ratio

Troubleshooting

Issue: Chunk tracking not working

Symptom: get_chunk_diff() returns None

Solution:

# Check if chunk tracking is enabled
print(tracker.chunk_tracking_enabled)  # Should be True

# Check configuration
print(tracker.chunk_config.enabled)  # Should be True

# Verify chunks are being created
chunks = await storage.get_chunks_by_version(version_id)
print(f"Found {len(chunks)} chunks")

Issue: Import errors with LangChain

Symptom: ImportError: No module named 'langchain_text_splitters'

Solution:

# Install LangChain dependencies
pip install 'ragversion[langchain]'

# Or install manually
pip install langchain-text-splitters

The chunker will automatically fall back to simple splitting if LangChain is unavailable.

Issue: High memory usage with large documents

Symptom: Memory errors when tracking large PDFs

Solution:

# Increase chunk size to reduce chunk count
chunk_config = ChunkingConfig(
    chunk_size=1000,  # Larger chunks
    chunk_overlap=100
)

# Or disable chunk content storage
chunk_config = ChunkingConfig(
    store_chunk_content=False  # Only store hashes
)


API Reference

AsyncVersionTracker

class AsyncVersionTracker:
    def __init__(
        self,
        storage: BaseStorage,
        chunk_tracking_enabled: bool = False,
        chunk_config: Optional[ChunkingConfig] = None,
        **kwargs
    )

    async def track_with_chunks(
        self,
        file_path: str,
        metadata: Optional[Dict[str, Any]] = None
    ) -> tuple[Optional[ChangeEvent], Optional[ChunkDiff]]

    async def get_chunk_diff(
        self,
        document_id: UUID,
        from_version: int,
        to_version: int
    ) -> Optional[ChunkDiff]

ChunkDiff

class ChunkDiff:
    document_id: UUID
    from_version: int
    to_version: int
    added_chunks: List[Chunk]
    removed_chunks: List[Chunk]
    unchanged_chunks: List[Chunk]
    reordered_chunks: List[Chunk]

    @property
    def total_changes(self) -> int

    @property
    def savings_percentage(self) -> float

LangChainSync

class LangChainSync:
    def __init__(
        self,
        tracker: AsyncVersionTracker,
        text_splitter: TextSplitter,
        embeddings: Embeddings,
        vectorstore: VectorStore,
        enable_chunk_tracking: bool = False
    )

Best Practices

1. Choose Appropriate Chunk Size

# For long-form documents (articles, books)
chunk_size = 1000  # Larger context

# For code and structured data
chunk_size = 500  # Smaller, more granular

# For short documents (emails, tweets)
chunk_size = 200  # Very granular

2. Use Overlap for Context Preservation

# Recommended overlap: 10-20% of chunk size
chunk_overlap = chunk_size // 10  # 10%

3. Monitor Cost Savings

# Log savings for visibility
chunk_diff = await tracker.get_chunk_diff(doc_id, 1, 2)
if chunk_diff:
    metrics = tracker.chunk_detector.calculate_savings_metrics(chunk_diff)
    logger.info(f"Cost savings: {metrics['savings_percentage']:.1f}%")

4. Test Before Production

# Test with dry-run mode
for doc in test_documents:
    event, chunk_diff = await tracker.track_with_chunks(doc)
    if chunk_diff:
        print(f"{doc}: {chunk_diff.savings_percentage:.1f}% savings")

Future Enhancements

Planned for v0.11.0

  • Embedding deduplication: Reuse embeddings for identical chunks across documents
  • Semantic chunking: Use embedding similarity for smarter chunk boundaries
  • Chunk version compression: Store only deltas between chunk versions

Planned for v0.12.0

  • A/B testing: Compare chunking strategies with metrics
  • Auto-optimization: Automatically tune chunk size based on content type
  • Retry logic: Resilient sync with exponential backoff

FAQ

Q: Does chunk tracking work with all vector stores?

A: Yes, as long as the vector store supports: - Adding documents with metadata - Deleting documents by metadata filter (for chunk removal)

Tested with: Chroma, Pinecone, Weaviate, Qdrant, FAISS

Q: Can I use chunk tracking without integrations?

A: Yes! Use track_with_chunks() and get_chunk_diff() directly, then handle syncing manually.

Q: What happens if I disable chunk tracking after enabling it?

A: The system gracefully falls back to document-level tracking. Existing chunks remain in the database but are not used.

Q: How much storage does chunk tracking add?

A: Approximately 2x the document size (chunks + chunk_content tables). Compression reduces this significantly.

Q: Does chunk tracking slow down document tracking?

A: Minimal impact (<5% slower) due to: - Batch insert optimizations - Parallel chunk processing - Efficient hash-based comparison


Support


License

MIT License - see LICENSE file for details.