Code Examples¶
Practical code examples and patterns for common Django Chain use cases.
🏗️ Basic Patterns¶
Simple Prompt and Workflow¶
from django_chain.models import Prompt, Workflow
# Create a basic prompt
prompt = Prompt.objects.create(
name="greeting",
prompt_template={
"langchain_type": "PromptTemplate",
"template": "Say hello to {name} in a {style} way.",
"input_variables": ["name", "style"]
},
input_variables=["name", "style"],
description="Generate personalized greetings"
)
# Create a simple workflow
workflow = Workflow.objects.create(
name="greeting_workflow",
prompt=prompt,
workflow_definition=[
{"type": "prompt", "name": "greeting"},
{"type": "llm", "provider": "openai"},
{"type": "parser", "parser_type": "str"}
]
)
# Execute the workflow
chain = workflow.to_langchain_chain()
result = chain.invoke({
"name": "Alice",
"style": "professional"
})
print(result) # "Good morning Alice, I hope you're having a productive day."
Chat Prompt Template¶
# More complex chat-based prompt
chat_prompt = Prompt.objects.create(
name="customer_service_chat",
prompt_template={
"langchain_type": "ChatPromptTemplate",
"messages": [
{
"role": "system",
"content": "You are a helpful customer service representative for {company}. Be polite, professional, and helpful."
},
{
"role": "human",
"content": "{customer_message}"
}
]
},
input_variables=["company", "customer_message"],
description="Customer service chat template"
)
# Use in workflow
workflow = Workflow.objects.create(
name="customer_service",
prompt=chat_prompt,
workflow_definition=[
{"type": "prompt", "name": "customer_service_chat"},
{"type": "llm", "provider": "openai"},
{"type": "parser", "parser_type": "str"}
]
)
result = chain.invoke({
"company": "TechCorp",
"customer_message": "I'm having trouble logging into my account"
})
🔄 Advanced Workflow Patterns¶
Multi-Step Workflow with JSON Output¶
# Create a prompt that expects JSON output
analysis_prompt = Prompt.objects.create(
name="sentiment_analysis",
prompt_template={
"langchain_type": "PromptTemplate",
"template": """Analyze the sentiment of this text and return a JSON response.
Text: {text}
Return your analysis in this exact JSON format:
{
"sentiment": "positive|negative|neutral",
"confidence": 0.95,
"emotions": ["happy", "excited"],
"summary": "Brief explanation"
}""",
"input_variables": ["text"]
},
input_variables=["text"]
)
# Workflow with JSON parser
sentiment_workflow = Workflow.objects.create(
name="sentiment_analysis",
prompt=analysis_prompt,
workflow_definition=[
{"type": "prompt", "name": "sentiment_analysis"},
{"type": "llm", "provider": "openai"},
{"type": "parser", "parser_type": "json"}
]
)
# Execute and get structured data
chain = sentiment_workflow.to_langchain_chain()
result = chain.invoke({
"text": "I absolutely love this new feature! It's amazing!"
})
print(result["sentiment"]) # "positive"
print(result["confidence"]) # 0.95
print(result["emotions"]) # ["happy", "excited"]
Conditional Workflow Logic¶
def create_smart_routing_workflow():
"""Create a workflow that routes queries to different specialists."""
# Classification prompt
classifier_prompt = Prompt.objects.create(
name="query_classifier",
prompt_template={
"langchain_type": "PromptTemplate",
"template": """Classify this customer query into one of these categories:
- technical: Technical support issues
- billing: Billing and payment questions
- general: General information requests
Query: {query}
Category:""",
"input_variables": ["query"]
},
input_variables=["query"]
)
# Technical support prompt
tech_prompt = Prompt.objects.create(
name="technical_support",
prompt_template={
"langchain_type": "PromptTemplate",
"template": """You are a technical support specialist. Help with this issue:
{query}
Provide step-by-step troubleshooting instructions.""",
"input_variables": ["query"]
},
input_variables=["query"]
)
# Billing support prompt
billing_prompt = Prompt.objects.create(
name="billing_support",
prompt_template={
"langchain_type": "PromptTemplate",
"template": """You are a billing specialist. Help with this billing question:
{query}
Provide clear information about billing policies and next steps.""",
"input_variables": ["query"]
},
input_variables=["query"]
)
# Create classification workflow
classifier_workflow = Workflow.objects.create(
name="query_classifier",
prompt=classifier_prompt,
workflow_definition=[
{"type": "prompt", "name": "query_classifier"},
{"type": "llm", "provider": "openai", "temperature": 0.1},
{"type": "parser", "parser_type": "str"}
]
)
return classifier_workflow, tech_prompt, billing_prompt
def route_customer_query(query):
"""Route customer query to appropriate specialist."""
classifier_workflow, tech_prompt, billing_prompt = create_smart_routing_workflow()
# Classify the query
classifier_chain = classifier_workflow.to_langchain_chain()
category = classifier_chain.invoke({"query": query}).strip().lower()
# Route to appropriate specialist
if "technical" in category:
specialist_workflow = Workflow.objects.create(
name=f"tech_response_{uuid.uuid4()}",
prompt=tech_prompt,
workflow_definition=[
{"type": "prompt", "name": "technical_support"},
{"type": "llm", "provider": "openai"},
{"type": "parser", "parser_type": "str"}
]
)
elif "billing" in category:
specialist_workflow = Workflow.objects.create(
name=f"billing_response_{uuid.uuid4()}",
prompt=billing_prompt,
workflow_definition=[
{"type": "prompt", "name": "billing_support"},
{"type": "llm", "provider": "openai"},
{"type": "parser", "parser_type": "str"}
]
)
else:
# Use general support
specialist_workflow = Workflow.objects.get(name="customer_support_workflow")
# Get specialist response
specialist_chain = specialist_workflow.to_langchain_chain()
response = specialist_chain.invoke({"query": query})
return {
"category": category,
"response": response,
"specialist": specialist_workflow.name
}
💬 Chat and Memory Patterns¶
Conversation with Memory¶
from django_chain.utils.memory_manager import get_langchain_memory, save_messages_to_session
from django_chain.models import ChatSession, ChatHistory
def create_conversational_workflow():
"""Create a workflow that maintains conversation context."""
conversation_prompt = Prompt.objects.create(
name="conversational_assistant",
prompt_template={
"langchain_type": "ChatPromptTemplate",
"messages": [
{
"role": "system",
"content": "You are a helpful assistant. Use the conversation history to provide contextual responses."
},
{
"role": "human",
"content": "{input}"
}
]
},
input_variables=["input"]
)
return Workflow.objects.create(
name="conversational_assistant",
prompt=conversation_prompt,
workflow_definition=[
{"type": "prompt", "name": "conversational_assistant"},
{"type": "llm", "provider": "openai"},
{"type": "parser", "parser_type": "str"}
]
)
def chat_with_memory(user, message, session_id=None):
"""Chat with persistent memory using Django models."""
# Get or create session
if session_id:
try:
session = ChatSession.objects.get(session_id=session_id, user=user)
except ChatSession.DoesNotExist:
session = ChatSession.objects.create(
session_id=session_id,
user=user,
title=f"Chat Session {str(session_id)[:8]}"
)
else:
session = ChatSession.objects.create(
user=user,
title="New Chat Session"
)
# Add user message to session
session.add_message(message, role="USER")
# Get workflow
workflow = Workflow.objects.get(name="conversational_assistant", is_active=True)
# Execute with memory - memory is automatically loaded from Django models
chain = workflow.to_langchain_chain(
session_id=session.session_id,
chat_input="input",
history="chat_history"
)
# Execute the chain
response = chain.invoke({"input": message})
# Add AI response to session
session.add_message(str(response), role="ASSISTANT")
return {
"response": str(response),
"session_id": str(session.session_id),
"message_count": session.get_message_count()
}
# Alternative: Using the memory manager directly
def chat_with_direct_memory(session_id, message):
"""Example using memory manager directly."""
# Get Django-backed memory (default provider)
memory = get_langchain_memory(session_id, memory_type="buffer")
# Add user message
memory.chat_memory.add_user_message(message)
# Get LLM response (simplified)
from django_chain.utils.llm_client import create_llm_chat_client
llm = create_llm_chat_client("openai")
# Use memory in conversation
conversation_history = memory.chat_memory.messages
response = llm.invoke([
*conversation_history,
{"role": "user", "content": message}
])
# Add AI response to memory
memory.chat_memory.add_ai_message(response.content)
return response.content
Multi-User Chat Rooms¶
class ChatRoomManager:
"""Manage multi-user chat rooms with LLM moderation."""
def __init__(self):
self.moderation_workflow = self.create_moderation_workflow()
self.summary_workflow = self.create_summary_workflow()
def create_moderation_workflow(self):
"""Create a workflow to moderate chat messages."""
moderation_prompt = Prompt.objects.create(
name="chat_moderation",
prompt_template={
"langchain_type": "PromptTemplate",
"template": """Analyze this chat message for inappropriate content:
Message: {message}
User: {username}
Context: {context}
Return JSON:
{
"appropriate": true/false,
"reason": "explanation if inappropriate",
"severity": "low|medium|high",
"suggested_action": "none|warn|timeout|ban"
}""",
"input_variables": ["message", "username", "context"]
},
input_variables=["message", "username", "context"]
)
return Workflow.objects.create(
name="chat_moderation",
prompt=moderation_prompt,
workflow_definition=[
{"type": "prompt", "name": "chat_moderation"},
{"type": "llm", "provider": "openai", "temperature": 0.1},
{"type": "parser", "parser_type": "json"}
]
)
def create_summary_workflow(self):
"""Create a workflow to summarize chat conversations."""
summary_prompt = Prompt.objects.create(
name="chat_summary",
prompt_template={
"langchain_type": "PromptTemplate",
"template": """Summarize this chat conversation:
{conversation}
Provide a brief summary of:
1. Main topics discussed
2. Key decisions or conclusions
3. Action items (if any)
4. Overall sentiment
Summary:""",
"input_variables": ["conversation"]
},
input_variables=["conversation"]
)
return Workflow.objects.create(
name="chat_summary",
prompt=summary_prompt,
workflow_definition=[
{"type": "prompt", "name": "chat_summary"},
{"type": "llm", "provider": "openai"},
{"type": "parser", "parser_type": "str"}
]
)
def moderate_message(self, message, user, context=""):
"""Moderate a chat message."""
chain = self.moderation_workflow.to_langchain_chain()
result = chain.invoke({
"message": message,
"username": user.username,
"context": context
})
return result
def summarize_conversation(self, session_id, hours=24):
"""Summarize recent conversation in a chat room."""
since = timezone.now() - timedelta(hours=hours)
messages = ChatHistory.objects.filter(
session__session_id=session_id,
created_at__gte=since
).order_by('created_at')
conversation = "\n".join([
f"{msg.role}: {msg.content}"
for msg in messages
])
chain = self.summary_workflow.to_langchain_chain()
summary = chain.invoke({"conversation": conversation})
return summary
🔍 Vector Store and RAG Patterns¶
Document Processing Pipeline¶
from django_chain.services.vector_store_manager import VectorStoreManager
from django_chain.utils.llm_client import create_llm_embedding_client
import os
from pathlib import Path
class DocumentProcessor:
"""Process and index documents for RAG."""
def __init__(self):
self.vector_manager = VectorStoreManager()
self.embedding_client = create_llm_embedding_client()
def process_text_file(self, file_path, metadata=None):
"""Process a text file and add to vector store."""
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
# Split into chunks
chunks = self.split_text(content, chunk_size=1000, overlap=200)
# Prepare metadata
base_metadata = {
"source": str(file_path),
"filename": Path(file_path).name,
"type": "text_file"
}
if metadata:
base_metadata.update(metadata)
# Add to vector store
metadatas = [base_metadata.copy() for _ in chunks]
for i, meta in enumerate(metadatas):
meta["chunk_id"] = i
meta["total_chunks"] = len(chunks)
self.vector_manager.add_documents_to_store(
texts=chunks,
metadatas=metadatas,
embedding_function=self.embedding_client
)
return len(chunks)
def split_text(self, text, chunk_size=1000, overlap=200):
"""Split text into overlapping chunks."""
words = text.split()
chunks = []
for i in range(0, len(words), chunk_size - overlap):
chunk = " ".join(words[i:i + chunk_size])
chunks.append(chunk)
if i + chunk_size >= len(words):
break
return chunks
def process_directory(self, directory_path, file_patterns=None):
"""Process all files in a directory."""
if file_patterns is None:
file_patterns = ["*.txt", "*.md", "*.py", "*.js"]
directory = Path(directory_path)
processed_files = 0
for pattern in file_patterns:
for file_path in directory.rglob(pattern):
if file_path.is_file():
try:
chunks = self.process_text_file(
file_path,
metadata={"category": pattern[2:]} # Remove *.
)
processed_files += 1
print(f"Processed {file_path}: {chunks} chunks")
except Exception as e:
print(f"Error processing {file_path}: {e}")
return processed_files
# Usage
processor = DocumentProcessor()
processor.process_directory("/path/to/docs", ["*.md", "*.txt"])
Advanced RAG with Re-ranking¶
class AdvancedRAGSystem:
"""Advanced RAG system with re-ranking and source attribution."""
def __init__(self):
self.vector_manager = VectorStoreManager()
self.embedding_client = create_llm_embedding_client()
self.rerank_workflow = self.create_rerank_workflow()
self.rag_workflow = self.create_rag_workflow()
def create_rerank_workflow(self):
"""Create workflow to re-rank search results."""
rerank_prompt = Prompt.objects.create(
name="rerank_documents",
prompt_template={
"langchain_type": "PromptTemplate",
"template": """Given a query and a list of documents, rank the documents by relevance to the query.
Query: {query}
Documents:
{documents}
Return a JSON list of document indices ordered by relevance (most relevant first):
[2, 0, 1, 3]""",
"input_variables": ["query", "documents"]
},
input_variables=["query", "documents"]
)
return Workflow.objects.create(
name="rerank_documents",
prompt=rerank_prompt,
workflow_definition=[
{"type": "prompt", "name": "rerank_documents"},
{"type": "llm", "provider": "openai", "temperature": 0.1},
{"type": "parser", "parser_type": "json"}
]
)
def create_rag_workflow(self):
"""Create RAG workflow with source attribution."""
rag_prompt = Prompt.objects.create(
name="rag_with_sources",
prompt_template={
"langchain_type": "PromptTemplate",
"template": """Use the following context to answer the question. Include specific source references in your answer.
Context:
{context}
Question: {question}
Instructions:
1. Answer the question using the provided context
2. Include source references like [Source: filename.txt]
3. If the context doesn't contain enough information, say so
4. Be specific and cite relevant sources
Answer:""",
"input_variables": ["context", "question"]
},
input_variables=["context", "question"]
)
return Workflow.objects.create(
name="rag_with_sources",
prompt=rag_prompt,
workflow_definition=[
{"type": "prompt", "name": "rag_with_sources"},
{"type": "llm", "provider": "openai"},
{"type": "parser", "parser_type": "str"}
]
)
def search_and_rerank(self, query, k=10, rerank_top=5):
"""Search documents and re-rank results."""
# Initial vector search
results = self.vector_manager.search_documents(
query=query,
k=k,
embedding_function=self.embedding_client
)
if not results:
return []
# Prepare documents for re-ranking
documents_text = "\n\n".join([
f"Document {i}: {result.page_content[:500]}..."
for i, result in enumerate(results)
])
# Re-rank using LLM
rerank_chain = self.rerank_workflow.to_langchain_chain()
rerank_indices = rerank_chain.invoke({
"query": query,
"documents": documents_text
})
# Reorder results
reranked_results = []
for idx in rerank_indices[:rerank_top]:
if 0 <= idx < len(results):
reranked_results.append(results[idx])
return reranked_results
def answer_with_sources(self, question, k=10):
"""Answer question with source attribution."""
# Search and re-rank
results = self.search_and_rerank(question, k=k, rerank_top=5)
if not results:
return {
"answer": "I don't have enough information to answer this question.",
"sources": [],
"confidence": 0.0
}
# Prepare context with source information
context_parts = []
sources = []
for i, result in enumerate(results):
source_info = {
"filename": result.metadata.get("filename", "unknown"),
"source": result.metadata.get("source", "unknown"),
"chunk_id": result.metadata.get("chunk_id", 0)
}
sources.append(source_info)
context_parts.append(
f"[Source: {source_info['filename']}]\n{result.page_content}"
)
context = "\n\n".join(context_parts)
# Generate answer
rag_chain = self.rag_workflow.to_langchain_chain()
answer = rag_chain.invoke({
"context": context,
"question": question
})
return {
"answer": answer,
"sources": sources,
"confidence": len(results) / k # Simple confidence score
}
# Usage
rag_system = AdvancedRAGSystem()
result = rag_system.answer_with_sources("How do I configure Django settings?")
print(f"Answer: {result['answer']}")
print(f"Sources: {[s['filename'] for s in result['sources']]}")
print(f"Confidence: {result['confidence']:.2f}")
📊 Analytics and Monitoring Patterns¶
Custom Analytics Dashboard¶
from django.db.models import Count, Avg, Sum, Q
from django.utils import timezone
from datetime import timedelta
import json
class LLMAnalytics:
"""Comprehensive LLM usage analytics."""
def get_usage_metrics(self, days=30):
"""Get comprehensive usage metrics."""
since = timezone.now() - timedelta(days=days)
logs = InteractionLog.objects.filter(created_at__gte=since)
metrics = {
"overview": self._get_overview_metrics(logs),
"performance": self._get_performance_metrics(logs),
"costs": self._get_cost_metrics(logs),
"errors": self._get_error_metrics(logs),
"workflows": self._get_workflow_metrics(logs),
"users": self._get_user_metrics(logs)
}
return metrics
def _get_overview_metrics(self, logs):
"""Get overview metrics."""
total = logs.count()
successful = logs.filter(status="SUCCESS").count()
failed = logs.filter(status="FAILURE").count()
return {
"total_requests": total,
"successful_requests": successful,
"failed_requests": failed,
"success_rate": (successful / total * 100) if total > 0 else 0,
"total_tokens": logs.aggregate(
total=Sum('input_tokens') + Sum('output_tokens')
)['total'] or 0
}
def _get_performance_metrics(self, logs):
"""Get performance metrics."""
return {
"average_latency": logs.aggregate(Avg('latency_ms'))['latency_ms__avg'] or 0,
"p95_latency": self._percentile(logs, 'latency_ms', 95),
"p99_latency": self._percentile(logs, 'latency_ms', 99),
"requests_per_hour": self._get_requests_per_hour(logs)
}
def _get_cost_metrics(self, logs):
"""Calculate cost metrics."""
# OpenAI pricing (update as needed)
pricing = {
"gpt-3.5-turbo": {"input": 0.0015, "output": 0.002},
"gpt-4": {"input": 0.03, "output": 0.06},
"text-embedding-ada-002": {"input": 0.0001, "output": 0}
}
total_cost = 0
model_costs = {}
for log in logs:
model = log.model_name
if model in pricing:
input_cost = (log.input_tokens / 1000) * pricing[model]["input"]
output_cost = (log.output_tokens / 1000) * pricing[model]["output"]
cost = input_cost + output_cost
total_cost += cost
model_costs[model] = model_costs.get(model, 0) + cost
return {
"total_cost": total_cost,
"cost_by_model": model_costs,
"average_cost_per_request": total_cost / logs.count() if logs.count() > 0 else 0
}
def _get_error_metrics(self, logs):
"""Get error analysis."""
error_logs = logs.filter(status="FAILURE")
error_types = error_logs.values('error_message').annotate(
count=Count('id')
).order_by('-count')[:10]
return {
"total_errors": error_logs.count(),
"error_rate": (error_logs.count() / logs.count() * 100) if logs.count() > 0 else 0,
"top_error_types": list(error_types)
}
def _get_workflow_metrics(self, logs):
"""Get workflow usage metrics."""
workflow_stats = logs.values('workflow__name').annotate(
count=Count('id'),
avg_latency=Avg('latency_ms'),
success_rate=Count('id', filter=Q(status="SUCCESS")) * 100.0 / Count('id')
).order_by('-count')
return list(workflow_stats)
def _get_user_metrics(self, logs):
"""Get user usage metrics."""
user_stats = logs.filter(user__isnull=False).values('user__username').annotate(
count=Count('id'),
total_tokens=Sum('input_tokens') + Sum('output_tokens')
).order_by('-count')[:20]
return list(user_stats)
def _percentile(self, queryset, field, percentile):
"""Calculate percentile for a field."""
values = list(queryset.values_list(field, flat=True))
if not values:
return 0
values.sort()
k = (len(values) - 1) * percentile / 100
f = int(k)
c = k - f
if f + 1 < len(values):
return values[f] * (1 - c) + values[f + 1] * c
else:
return values[f]
def _get_requests_per_hour(self, logs):
"""Get requests per hour over time."""
# Group by hour
from django.db.models import DateTimeField
from django.db.models.functions import TruncHour
hourly_stats = logs.annotate(
hour=TruncHour('created_at')
).values('hour').annotate(
count=Count('id')
).order_by('hour')
return list(hourly_stats)
# Usage in a view
def analytics_api(request):
"""API endpoint for analytics data."""
days = int(request.GET.get('days', 7))
analytics = LLMAnalytics()
metrics = analytics.get_usage_metrics(days=days)
return JsonResponse(metrics)
Real-time Monitoring¶
from django.core.cache import cache
from django.utils import timezone
import json
class RealTimeMonitor:
"""Real-time monitoring for LLM operations."""
def __init__(self):
self.cache_timeout = 300 # 5 minutes
def track_request(self, workflow_name, latency_ms, tokens, status):
"""Track a request in real-time."""
now = timezone.now()
minute_key = now.strftime("%Y%m%d%H%M")
# Update minute-level metrics
metrics_key = f"llm_metrics_{minute_key}"
metrics = cache.get(metrics_key, {
"requests": 0,
"total_latency": 0,
"total_tokens": 0,
"errors": 0,
"workflows": {}
})
metrics["requests"] += 1
metrics["total_latency"] += latency_ms
metrics["total_tokens"] += tokens
if status == "FAILURE":
metrics["errors"] += 1
# Track workflow-specific metrics
if workflow_name not in metrics["workflows"]:
metrics["workflows"][workflow_name] = {
"requests": 0,
"total_latency": 0,
"errors": 0
}
workflow_metrics = metrics["workflows"][workflow_name]
workflow_metrics["requests"] += 1
workflow_metrics["total_latency"] += latency_ms
if status == "FAILURE":
workflow_metrics["errors"] += 1
cache.set(metrics_key, metrics, self.cache_timeout)
# Update current metrics
self._update_current_metrics(metrics)
def _update_current_metrics(self, metrics):
"""Update current real-time metrics."""
current = {
"requests_per_minute": metrics["requests"],
"average_latency": metrics["total_latency"] / metrics["requests"] if metrics["requests"] > 0 else 0,
"tokens_per_minute": metrics["total_tokens"],
"error_rate": (metrics["errors"] / metrics["requests"] * 100) if metrics["requests"] > 0 else 0,
"top_workflows": sorted(
metrics["workflows"].items(),
key=lambda x: x[1]["requests"],
reverse=True
)[:5]
}
cache.set("llm_current_metrics", current, 60) # 1 minute cache
def get_current_metrics(self):
"""Get current real-time metrics."""
return cache.get("llm_current_metrics", {
"requests_per_minute": 0,
"average_latency": 0,
"tokens_per_minute": 0,
"error_rate": 0,
"top_workflows": []
})
def get_recent_activity(self, minutes=60):
"""Get recent activity over the last N minutes."""
now = timezone.now()
activity = []
for i in range(minutes):
minute = now - timedelta(minutes=i)
minute_key = minute.strftime("%Y%m%d%H%M")
metrics_key = f"llm_metrics_{minute_key}"
metrics = cache.get(metrics_key, {
"requests": 0,
"total_latency": 0,
"errors": 0
})
activity.append({
"timestamp": minute.isoformat(),
"requests": metrics["requests"],
"average_latency": metrics["total_latency"] / metrics["requests"] if metrics["requests"] > 0 else 0,
"errors": metrics["errors"]
})
return list(reversed(activity))
# Integration with LoggingHandler
class EnhancedLoggingHandler(LoggingHandler):
"""Enhanced logging handler with real-time monitoring."""
def __init__(self):
super().__init__()
self.monitor = RealTimeMonitor()
def on_llm_end(self, response, **kwargs):
"""Enhanced LLM end callback with monitoring."""
# Call parent method
super().on_llm_end(response, **kwargs)
# Track in real-time monitor
workflow_name = kwargs.get('workflow_name', 'unknown')
latency_ms = kwargs.get('latency_ms', 0)
tokens = kwargs.get('total_tokens', 0)
self.monitor.track_request(
workflow_name=workflow_name,
latency_ms=latency_ms,
tokens=tokens,
status="SUCCESS"
)
def on_llm_error(self, error, **kwargs):
"""Enhanced LLM error callback with monitoring."""
# Call parent method
super().on_llm_error(error, **kwargs)
# Track error in real-time monitor
workflow_name = kwargs.get('workflow_name', 'unknown')
self.monitor.track_request(
workflow_name=workflow_name,
latency_ms=0,
tokens=0,
status="FAILURE"
)
🎯 Next Steps¶
These examples demonstrate common patterns and advanced use cases for Django Chain. To continue learning:
- Study the Vanilla Django Example for complete implementations
- Read the How-to Guides for specific problem-solving approaches
- Explore Advanced Topics for customization
- Check the API Reference for detailed documentation
💡 Best Practices¶
- Use descriptive names for prompts and workflows
- Version your prompts to track changes
- Monitor token usage to control costs
- Implement proper error handling for production use
- Test with fake providers before using real LLMs
- Cache responses where appropriate to reduce costs
- Sanitize sensitive data in logs and outputs