Chunk-Level Versioning (v0.10.0)¶
Overview¶
Chunk-level versioning is a game-changing feature for RAG applications that enables 80-95% embedding cost reduction by tracking and re-embedding only changed chunks instead of entire documents.
The Problem¶
Traditional RAG systems re-embed entire documents when any change is detected, leading to: - Massive embedding costs for large documents with small changes - Unnecessary API calls to embedding providers - Wasted compute resources on unchanged content - Slower update cycles due to full document processing
The Solution¶
RAGVersion's chunk-level versioning: 1. Splits documents into chunks using configurable strategies 2. Hashes each chunk for O(1) change detection 3. Detects chunk-level changes (added, removed, unchanged, reordered) 4. Only re-embeds changed chunks when syncing to vector stores 5. Tracks cost savings metrics for visibility
Quick Start¶
1. Enable Chunk Tracking¶
Via Configuration File (ragversion.yaml):
chunk_tracking:
enabled: true
chunk_size: 500
chunk_overlap: 50
splitter_type: "recursive"
store_chunk_content: true
Via Environment Variables:
export RAGVERSION_CHUNK_ENABLED=true
export RAGVERSION_CHUNK_CHUNK_SIZE=500
export RAGVERSION_CHUNK_CHUNK_OVERLAP=50
export RAGVERSION_CHUNK_SPLITTER_TYPE=recursive
Via Python Code:
from ragversion import AsyncVersionTracker
from ragversion.storage.sqlite import SQLiteStorage
from ragversion.models import ChunkingConfig
# Configure chunk tracking
chunk_config = ChunkingConfig(
enabled=True,
chunk_size=500,
chunk_overlap=50,
splitter_type="recursive",
store_chunk_content=True
)
# Initialize tracker with chunk tracking
storage = SQLiteStorage(db_path="ragversion.db")
tracker = AsyncVersionTracker(
storage=storage,
chunk_tracking_enabled=True,
chunk_config=chunk_config
)
await tracker.initialize()
2. Track Documents with Chunks¶
# Track a document and create chunks
event, chunk_diff = await tracker.track_with_chunks("document.txt")
if chunk_diff:
print(f"Created {len(chunk_diff.added_chunks)} chunks")
print(f"Savings: {chunk_diff.savings_percentage:.1f}%")
3. Get Chunk Diff Between Versions¶
from uuid import UUID
# Get chunk-level diff
chunk_diff = await tracker.get_chunk_diff(
document_id=UUID("..."),
from_version=1,
to_version=2
)
if chunk_diff:
print(f"Added: {len(chunk_diff.added_chunks)}")
print(f"Removed: {len(chunk_diff.removed_chunks)}")
print(f"Unchanged: {len(chunk_diff.unchanged_chunks)}")
print(f"Embedding cost savings: {chunk_diff.savings_percentage:.1f}%")
4. Smart Vector Store Updates¶
LangChain Integration:
from ragversion.integrations.langchain import LangChainSync
from langchain_openai import OpenAIEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_text_splitters import RecursiveCharacterTextSplitter
# Setup components
embeddings = OpenAIEmbeddings()
vectorstore = Chroma(embedding_function=embeddings)
text_splitter = RecursiveCharacterTextSplitter(chunk_size=500)
# Enable smart chunk updates
sync = LangChainSync(
tracker=tracker,
text_splitter=text_splitter,
embeddings=embeddings,
vectorstore=vectorstore,
enable_chunk_tracking=True # Enable smart updates
)
# Sync directory - only changed chunks will be re-embedded
await sync.sync_directory("./docs")
LlamaIndex Integration:
from ragversion.integrations.llamaindex import LlamaIndexSync
from llama_index.core import VectorStoreIndex
from llama_index.embeddings.openai import OpenAIEmbedding
# Setup index
embed_model = OpenAIEmbedding()
index = VectorStoreIndex.from_documents([], embed_model=embed_model)
# Enable smart chunk updates
sync = LlamaIndexSync(
tracker=tracker,
index=index,
enable_chunk_tracking=True # Enable smart updates
)
# Sync directory - only changed chunks will be re-embedded
await sync.sync_directory("./docs")
Architecture¶
Component Overview¶
┌─────────────────────────────────────────────────────────────┐
│ AsyncVersionTracker │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ ChunkChangeDetector │ │
│ │ ┌─────────────────────────────────────────────────┐ │ │
│ │ │ Hash-based O(1) Chunk Comparison │ │ │
│ │ │ • Build hash maps (old_chunks, new_chunks) │ │ │
│ │ │ • Categorize: ADDED, REMOVED, UNCHANGED │ │ │
│ │ │ • Detect REORDERED chunks │ │ │
│ │ └─────────────────────────────────────────────────┘ │ │
│ └───────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ ChunkerRegistry │
│ ┌──────────────────┐ ┌───────────────────┐ │
│ │ RecursiveChunker │ │ CharacterChunker │ │
│ │ • LangChain │ │ • Fixed-size │ │
│ │ • Semantic │ │ • Simple │ │
│ │ • Smart splits │ │ • Fast │ │
│ └──────────────────┘ └───────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Storage Layer │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ SQLite │ │ Supabase │ │
│ │ • Batch ops │ │ • Batch ops │ │
│ │ • Compressed │ │ • Compressed │ │
│ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
Change Detection Algorithm¶
# Simplified pseudocode
def detect_chunk_changes(old_chunks, new_chunks):
# Build hash maps for O(1) lookup
old_map = {chunk.content_hash: chunk for chunk in old_chunks}
new_map = {chunk.content_hash: chunk for chunk in new_chunks}
added = []
removed = []
unchanged = []
reordered = []
# Check new chunks
for new_chunk in new_chunks:
if new_chunk.content_hash not in old_map:
added.append(new_chunk) # New content
else:
old_chunk = old_map[new_chunk.content_hash]
if old_chunk.chunk_index == new_chunk.chunk_index:
unchanged.append(new_chunk) # Same content, same position
else:
reordered.append(new_chunk) # Same content, different position
# Check old chunks for removals
for old_chunk in old_chunks:
if old_chunk.content_hash not in new_map:
removed.append(old_chunk) # Content was deleted
return ChunkDiff(
added_chunks=added,
removed_chunks=removed,
unchanged_chunks=unchanged,
reordered_chunks=reordered
)
Time Complexity: O(n + m) where n = old chunks, m = new chunks
Configuration Reference¶
ChunkingConfig Parameters¶
| Parameter | Type | Default | Description |
|---|---|---|---|
enabled |
bool | False |
Enable chunk-level tracking (opt-in) |
chunk_size |
int | 500 |
Target size per chunk (tokens or characters) |
chunk_overlap |
int | 50 |
Overlap between chunks for context preservation |
splitter_type |
str | "recursive" |
Chunking strategy: "recursive", "character" |
store_chunk_content |
bool | True |
Store chunk content in database |
Chunking Strategies¶
1. Recursive Text Splitter (Recommended)¶
# Uses LangChain's RecursiveCharacterTextSplitter
chunk_config = ChunkingConfig(
splitter_type="recursive",
chunk_size=500,
chunk_overlap=50
)
Pros: - Respects semantic boundaries (paragraphs, sentences) - Better chunk quality for embeddings - Integrates with LangChain ecosystem
Cons: - Slightly slower than character-based - Requires LangChain installation
Fallback: If LangChain is unavailable, falls back to paragraph-based splitting.
2. Character Chunker¶
# Simple character-based chunking
chunk_config = ChunkingConfig(
splitter_type="character",
chunk_size=500,
chunk_overlap=50
)
Pros: - Fast - No dependencies - Predictable chunk sizes
Cons: - May split mid-sentence - Lower semantic quality
Integration Patterns¶
Pattern 1: Auto-Sync on File Changes¶
from ragversion import AsyncVersionTracker
from ragversion.integrations.langchain import LangChainSync
# Setup tracker with chunk tracking
tracker = AsyncVersionTracker(
storage=storage,
chunk_tracking_enabled=True
)
# Setup auto-sync with smart chunk updates
sync = LangChainSync(
tracker=tracker,
text_splitter=text_splitter,
embeddings=embeddings,
vectorstore=vectorstore,
enable_chunk_tracking=True
)
# All tracked changes will automatically sync with smart updates
await tracker.track_with_chunks("document.txt")
# → Only changed chunks are re-embedded
Pattern 2: Manual Chunk Diff Analysis¶
# Track document
event, chunk_diff = await tracker.track_with_chunks("document.txt")
if chunk_diff:
# Analyze changes
for added_chunk in chunk_diff.added_chunks:
print(f"New chunk {added_chunk.chunk_index}: {added_chunk.token_count} tokens")
# Calculate cost savings
metrics = tracker.chunk_detector.calculate_savings_metrics(chunk_diff)
print(f"Cost savings: {metrics['savings_percentage']:.1f}%")
print(f"Chunks to embed: {metrics['chunks_to_embed']}")
Pattern 3: Batch Processing with Chunk Tracking¶
# Track entire directory
result = await tracker.track_directory("./docs", max_workers=4)
# Get chunk diffs for all modified documents
for event in result.successful:
if event.change_type == ChangeType.MODIFIED:
chunk_diff = await tracker.get_chunk_diff(
event.document_id,
event.version_number - 1,
event.version_number
)
if chunk_diff:
print(f"{event.file_name}: {chunk_diff.savings_percentage:.1f}% savings")
Cost Savings Analysis¶
Real-World Examples¶
Example 1: Documentation Update¶
Scenario: 100-page technical document, 1-paragraph change
Without chunk tracking:
- Total chunks: 500
- Chunks to embed: 500
- Cost: $2.50 (@ $0.005 per chunk)
With chunk tracking:
- Total chunks: 500
- Chunks to embed: 2 (only changed paragraphs)
- Cost: $0.01
- Savings: 99.6%
Example 2: Code Repository¶
Scenario: 50 Python files, 10 files modified
Without chunk tracking:
- Total files: 50
- Files to re-embed: 50
- Average chunks per file: 20
- Total chunks to embed: 1,000
- Cost: $5.00
With chunk tracking:
- Total files: 50
- Modified files: 10
- Average changed chunks per file: 3
- Total chunks to embed: 30
- Cost: $0.15
- Savings: 97%
Cost Calculation Formula¶
# Embedding cost savings
total_chunks = len(added) + len(unchanged) + len(reordered)
unchanged_count = len(unchanged) + len(reordered)
savings_percentage = (unchanged_count / total_chunks) * 100
# Dollar savings (example with OpenAI ada-002)
cost_per_chunk = 0.0001 # $0.0001 per 1K tokens
average_tokens_per_chunk = 500
total_cost_without_chunks = total_chunks * cost_per_chunk
total_cost_with_chunks = len(added) * cost_per_chunk
savings_dollars = total_cost_without_chunks - total_cost_with_chunks
Database Schema¶
Chunks Table¶
CREATE TABLE chunks (
id TEXT PRIMARY KEY,
document_id TEXT NOT NULL,
version_id TEXT NOT NULL,
chunk_index INTEGER NOT NULL,
content_hash TEXT NOT NULL,
token_count INTEGER NOT NULL,
created_at TEXT NOT NULL,
metadata TEXT DEFAULT '{}',
FOREIGN KEY (document_id) REFERENCES documents(id) ON DELETE CASCADE,
FOREIGN KEY (version_id) REFERENCES versions(id) ON DELETE CASCADE,
UNIQUE(version_id, chunk_index)
);
CREATE INDEX idx_chunks_document_id ON chunks(document_id);
CREATE INDEX idx_chunks_version_id ON chunks(version_id);
CREATE INDEX idx_chunks_content_hash ON chunks(content_hash);
CREATE INDEX idx_chunks_doc_version ON chunks(document_id, version_id);
Chunk Content Table¶
CREATE TABLE chunk_content (
chunk_id TEXT PRIMARY KEY,
content BLOB NOT NULL,
compressed INTEGER DEFAULT 1,
created_at TEXT NOT NULL,
FOREIGN KEY (chunk_id) REFERENCES chunks(id) ON DELETE CASCADE
);
Migration Guide¶
Migrating Existing Documents¶
Step 1: Run Database Migration
For SQLite (automatic):
# Migrations run automatically on initialize()
storage = SQLiteStorage(db_path="ragversion.db")
await storage.initialize() # Creates chunk tables if needed
For Supabase (manual):
Step 2: Enable Chunk Tracking
Update ragversion.yaml:
Step 3: Re-track Documents
# Re-track existing documents to create chunks
documents = await tracker.list_documents(limit=1000)
for doc in documents:
latest_version = await tracker.get_latest_version(doc.id)
if latest_version:
# This will create chunks for the latest version
event, chunk_diff = await tracker.track_with_chunks(doc.file_path)
if chunk_diff:
print(f"Created {len(chunk_diff.added_chunks)} chunks for {doc.file_name}")
Backward Compatibility¶
Chunk tracking is 100% backward compatible:
- Default disabled: Existing installations work unchanged
- Opt-in activation: Enable via configuration
- Graceful degradation: Falls back to document-level tracking if chunks unavailable
- No breaking changes: All existing APIs remain functional
Performance Optimization¶
Batch Operations¶
Chunk operations use optimized batch inserts:
# SQLite: executemany for 1000+ chunks in <1 second
await storage.create_chunks_batch(chunks) # Efficient
# vs. slow individual inserts
for chunk in chunks:
await storage.create_chunk(chunk) # Avoid this
Indexing Strategy¶
The chunk tables use strategic indexes for performance:
-- Primary lookups
idx_chunks_version_id -- Get all chunks for a version
idx_chunks_content_hash -- Find duplicate chunks
-- Composite indexes
idx_chunks_doc_version -- Get chunks for doc + version
Compression¶
Chunk content is compressed by default using gzip:
# Automatic compression (default)
chunk_config = ChunkingConfig(store_chunk_content=True)
# Compression reduces storage by ~70% for text
original_size = 500 bytes
compressed_size = 150 bytes # Typical ratio
Troubleshooting¶
Issue: Chunk tracking not working¶
Symptom: get_chunk_diff() returns None
Solution:
# Check if chunk tracking is enabled
print(tracker.chunk_tracking_enabled) # Should be True
# Check configuration
print(tracker.chunk_config.enabled) # Should be True
# Verify chunks are being created
chunks = await storage.get_chunks_by_version(version_id)
print(f"Found {len(chunks)} chunks")
Issue: Import errors with LangChain¶
Symptom: ImportError: No module named 'langchain_text_splitters'
Solution:
# Install LangChain dependencies
pip install 'ragversion[langchain]'
# Or install manually
pip install langchain-text-splitters
The chunker will automatically fall back to simple splitting if LangChain is unavailable.
Issue: High memory usage with large documents¶
Symptom: Memory errors when tracking large PDFs
Solution:
# Increase chunk size to reduce chunk count
chunk_config = ChunkingConfig(
chunk_size=1000, # Larger chunks
chunk_overlap=100
)
# Or disable chunk content storage
chunk_config = ChunkingConfig(
store_chunk_content=False # Only store hashes
)
API Reference¶
AsyncVersionTracker¶
class AsyncVersionTracker:
def __init__(
self,
storage: BaseStorage,
chunk_tracking_enabled: bool = False,
chunk_config: Optional[ChunkingConfig] = None,
**kwargs
)
async def track_with_chunks(
self,
file_path: str,
metadata: Optional[Dict[str, Any]] = None
) -> tuple[Optional[ChangeEvent], Optional[ChunkDiff]]
async def get_chunk_diff(
self,
document_id: UUID,
from_version: int,
to_version: int
) -> Optional[ChunkDiff]
ChunkDiff¶
class ChunkDiff:
document_id: UUID
from_version: int
to_version: int
added_chunks: List[Chunk]
removed_chunks: List[Chunk]
unchanged_chunks: List[Chunk]
reordered_chunks: List[Chunk]
@property
def total_changes(self) -> int
@property
def savings_percentage(self) -> float
LangChainSync¶
class LangChainSync:
def __init__(
self,
tracker: AsyncVersionTracker,
text_splitter: TextSplitter,
embeddings: Embeddings,
vectorstore: VectorStore,
enable_chunk_tracking: bool = False
)
Best Practices¶
1. Choose Appropriate Chunk Size¶
# For long-form documents (articles, books)
chunk_size = 1000 # Larger context
# For code and structured data
chunk_size = 500 # Smaller, more granular
# For short documents (emails, tweets)
chunk_size = 200 # Very granular
2. Use Overlap for Context Preservation¶
3. Monitor Cost Savings¶
# Log savings for visibility
chunk_diff = await tracker.get_chunk_diff(doc_id, 1, 2)
if chunk_diff:
metrics = tracker.chunk_detector.calculate_savings_metrics(chunk_diff)
logger.info(f"Cost savings: {metrics['savings_percentage']:.1f}%")
4. Test Before Production¶
# Test with dry-run mode
for doc in test_documents:
event, chunk_diff = await tracker.track_with_chunks(doc)
if chunk_diff:
print(f"{doc}: {chunk_diff.savings_percentage:.1f}% savings")
Future Enhancements¶
Planned for v0.11.0¶
- Embedding deduplication: Reuse embeddings for identical chunks across documents
- Semantic chunking: Use embedding similarity for smarter chunk boundaries
- Chunk version compression: Store only deltas between chunk versions
Planned for v0.12.0¶
- A/B testing: Compare chunking strategies with metrics
- Auto-optimization: Automatically tune chunk size based on content type
- Retry logic: Resilient sync with exponential backoff
FAQ¶
Q: Does chunk tracking work with all vector stores?
A: Yes, as long as the vector store supports: - Adding documents with metadata - Deleting documents by metadata filter (for chunk removal)
Tested with: Chroma, Pinecone, Weaviate, Qdrant, FAISS
Q: Can I use chunk tracking without integrations?
A: Yes! Use track_with_chunks() and get_chunk_diff() directly, then handle syncing manually.
Q: What happens if I disable chunk tracking after enabling it?
A: The system gracefully falls back to document-level tracking. Existing chunks remain in the database but are not used.
Q: How much storage does chunk tracking add?
A: Approximately 2x the document size (chunks + chunk_content tables). Compression reduces this significantly.
Q: Does chunk tracking slow down document tracking?
A: Minimal impact (<5% slower) due to: - Batch insert optimizations - Parallel chunk processing - Efficient hash-based comparison
Support¶
- Issues: GitHub Issues
- Discussions: GitHub Discussions
- Documentation: RAGVersion Docs
License¶
MIT License - see LICENSE file for details.