Table of Contents
Concept 01
Chat Completions API Anatomy — What Every Field Means
Every major LLM provider's API follows the same basic shape: you send a list of messages (the conversation so far), and get back a completion. Understanding each field prevents the bugs that waste hours of debugging.
The messages array is the most important parameter. Each message has a role and content. There are three roles:
- system — Instructions to the model. Sets tone, persona, constraints, output format. Processed before any user input. This is your most powerful tool.
- user — The human's message. What the user typed, or what you're programmatically sending as the task.
- assistant — The model's previous responses. When you include these, you're giving the model its own conversation history to maintain context.
# The messages array structure — memorize this
messages = [
    # System message comes first: it frames every later turn.
    {
        "role": "system",
        "content": "You are a senior Python developer. Answer concisely with code examples."
    },
    {
        "role": "user",
        "content": "How do I read a CSV file?"
    },
    # Prior assistant turns are replayed verbatim so the model keeps context —
    # the API is stateless; history only exists if you resend it.
    {
        "role": "assistant",
        "content": "Use pandas: `df = pd.read_csv('file.csv')`"
    },
    # This follow-up only makes sense because the history above is included.
    {
        "role": "user",
        "content": "What if the CSV has no header row?"
    }
    # The next assistant response will be generated by the API
]
The response object is equally important to understand. Here's what comes back:
# What the response object looks like (OpenAI format)
# response = client.chat.completions.create(...)
{
"id": "chatcmpl-abc123",
"object": "chat.completion",
"created": 1712345678,
"model": "gpt-4o-mini",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "Use `header=None` parameter: pd.read_csv('file.csv', header=None)"
},
"finish_reason": "stop", # "stop", "length", "content_filter", "tool_calls"
"logprobs": null
}
],
"usage": {
"prompt_tokens": 89, # tokens in your input
"completion_tokens": 23, # tokens in the response
"total_tokens": 112 # total billed tokens
}
}
The finish_reason field is critical for production code. It tells you why the model stopped generating:
| finish_reason | Meaning | What to do |
|---|---|---|
| stop | Model finished naturally | Normal — use the output |
| length | Hit max_tokens limit | Response is truncated — increase max_tokens or handle partial output |
| content_filter | Output was filtered | Handle gracefully — don't show empty content to users |
| tool_calls | Model wants to use a tool | Execute the tool and continue the conversation |
Concept 02
Complete OpenAI Integration — Every Option You'll Actually Use
import os
from openai import OpenAI
from typing import Optional

# Initialize client — reads OPENAI_API_KEY from environment.
# (Passing api_key explicitly is shown for clarity; the SDK also reads the
# env var on its own when api_key is omitted.)
client = OpenAI(
    api_key=os.environ.get("OPENAI_API_KEY"),
    # Optional: set a custom timeout (default is 10 minutes)
    timeout=30.0,
    # Optional: set max retries (default is 2)
    max_retries=3,
)
def chat(
    user_message: str,
    system_prompt: str = "You are a helpful assistant.",
    model: str = "gpt-4o-mini",
    temperature: float = 0.7,
    max_tokens: int = 1000,
    conversation_history: Optional[list] = None,
) -> dict:
    """Send one OpenAI chat completion and return a normalized summary.

    Args:
        user_message: the new user turn to send.
        system_prompt: system instruction placed first in the messages array.
        model: model identifier to request.
        temperature: sampling temperature.
        max_tokens: cap on generated tokens.
        conversation_history: optional prior turns as {"role", "content"} dicts.

    Returns:
        Dict with "content", "finish_reason", "model", and a "usage" sub-dict
        of prompt/completion/total token counts.
    """
    # Assemble the request: system prompt first, then any prior turns,
    # then the new user message.
    convo = [{"role": "system", "content": system_prompt}]
    convo.extend(conversation_history or [])
    convo.append({"role": "user", "content": user_message})

    response = client.chat.completions.create(
        model=model,
        messages=convo,
        temperature=temperature,
        max_tokens=max_tokens,
        # Other knobs exist (top_p, presence/frequency penalties, stop,
        # seed, response_format) — omitted here; add them as needed.
    )

    top = response.choices[0]
    usage = response.usage
    return {
        "content": top.message.content,
        "finish_reason": top.finish_reason,
        "model": response.model,
        "usage": {
            "prompt_tokens": usage.prompt_tokens,
            "completion_tokens": usage.completion_tokens,
            "total_tokens": usage.total_tokens,
        },
    }
# Usage — low temperature (0.3) keeps the teaching answer focused and factual
result = chat(
    user_message="Explain list comprehensions in Python",
    system_prompt="You are a Python teacher. Use simple examples.",
    temperature=0.3,
)
print(result["content"])
print(f"Used {result['usage']['total_tokens']} tokens")
Concept 03
Complete Anthropic Integration — The Key Differences
Anthropic's API has a different structure than OpenAI's. The biggest difference: the system prompt is a separate top-level parameter, not part of the messages array.
import os
import anthropic

# Anthropic client — reads ANTHROPIC_API_KEY from the environment.
# NOTE(review): chat_anthropic below annotates with Optional, which this
# snippet does not import — add `from typing import Optional` if run standalone.
client = anthropic.Anthropic(
    api_key=os.environ.get("ANTHROPIC_API_KEY"),
)
def chat_anthropic(
    user_message: str,
    system_prompt: str = "You are a helpful assistant.",
    model: str = "claude-3-5-sonnet-20241022",
    temperature: float = 0.7,
    max_tokens: int = 1000,
    conversation_history: Optional[list] = None,
) -> dict:
    """Send one Anthropic chat completion and return a normalized summary.

    Key API differences from OpenAI: the system prompt is a top-level
    parameter (never inside the messages array) and max_tokens is mandatory.
    """
    convo = list(conversation_history) if conversation_history else []
    convo.append({"role": "user", "content": user_message})

    response = client.messages.create(
        model=model,
        system=system_prompt,  # KEY DIFFERENCE: top-level, not a message
        messages=convo,
        temperature=temperature,
        max_tokens=max_tokens,  # REQUIRED by Anthropic (optional in OpenAI)
    )

    # The reply arrives as a list of typed content blocks; join just the
    # text blocks into a single string.
    text_content = "".join(
        block.text for block in response.content if block.type == "text"
    )

    usage = response.usage
    return {
        "content": text_content,
        "finish_reason": response.stop_reason,  # "end_turn", "max_tokens", "stop_sequence"
        "model": response.model,
        "usage": {
            "prompt_tokens": usage.input_tokens,
            "completion_tokens": usage.output_tokens,
            "total_tokens": usage.input_tokens + usage.output_tokens,
        },
    }
# Usage — identical interface to our OpenAI function, so callers don't
# need to know which provider is behind it
result = chat_anthropic(
    user_message="What's the difference between a list and a tuple?",
    system_prompt="You are a Python teacher. Be concise.",
    temperature=0.3,
)
print(result["content"])
Concept 04
Complete Gemini Integration — Google's Different API Style
import os
import google.generativeai as genai

# One-time global SDK configuration — reads GOOGLE_API_KEY from the environment
genai.configure(api_key=os.environ.get("GOOGLE_API_KEY"))
def chat_gemini(
    user_message: str,
    system_prompt: str = "You are a helpful assistant.",
    model: str = "gemini-1.5-flash",
    temperature: float = 0.7,
    max_tokens: int = 1000,
    conversation_history: Optional[list] = None,
) -> dict:
    """Send one Gemini chat completion and return a normalized summary.

    Gemini's role vocabulary is 'user' and 'model' (there is no 'assistant'
    role), so OpenAI-style history entries are translated before the chat
    session starts.
    """
    llm = genai.GenerativeModel(
        model_name=model,
        system_instruction=system_prompt,
        generation_config=genai.GenerationConfig(
            temperature=temperature,
            max_output_tokens=max_tokens,
        ),
    )

    # Translate history to Gemini's format: "assistant" -> "model",
    # content string -> a "parts" list.
    history = [
        {
            "role": "model" if turn["role"] == "assistant" else turn["role"],
            "parts": [turn["content"]],
        }
        for turn in (conversation_history or [])
    ]

    session = llm.start_chat(history=history)
    response = session.send_message(user_message)

    meta = response.usage_metadata
    return {
        "content": response.text,
        "finish_reason": str(response.candidates[0].finish_reason),
        "model": model,
        "usage": {
            "prompt_tokens": meta.prompt_token_count,
            "completion_tokens": meta.candidates_token_count,
            "total_tokens": meta.total_token_count,
        },
    }
# Demo — relies entirely on chat_gemini's defaults (flash model, temp 0.7)
result = chat_gemini("Summarize what transformers do in 2 sentences")
print(result["content"])
Concept 05
Streaming — Because Nobody Wants to Stare at a Spinner
Streaming is non-negotiable for any user-facing feature. Without streaming, the user sees nothing for 5–15 seconds, then the full response appears. With streaming, tokens appear as they're generated — which feels instant even on slow models.
from openai import OpenAI
client = OpenAI()
def stream_chat(user_message: str, system_prompt: str = "You are a helpful assistant."):
    """Yield response tokens as they arrive from the API.

    Args:
        user_message: the user's prompt.
        system_prompt: system instruction prepended to the conversation.

    Yields:
        str: each partial content chunk, in arrival order — ideal for SSE
        or WebSocket endpoints.

    Note:
        This is a generator, so the final ``return`` value (the assembled
        full response) is only available as ``StopIteration.value`` to
        callers that drive the generator manually or use ``yield from``;
        a plain ``for`` loop sees only the yielded chunks.
    """
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_message},
        ],
        stream=True,  # The magic flag: ask the API for incremental chunks
    )
    full_response = ""
    for chunk in stream:
        # Each chunk carries a delta (partial content). The final chunk
        # typically has content=None and carries the finish_reason instead.
        delta = chunk.choices[0].delta
        if delta.content is not None:
            full_response += delta.content
            yield delta.content  # Yield each token as it arrives
    # (Removed a dead `finish_reason = ...` assignment that was never read.)
    return full_response  # becomes StopIteration.value for advanced callers
# Terminal demo — watch tokens appear one by one.
# flush=True forces each token to render immediately instead of waiting
# for a newline to flush stdout's buffer.
print("Streaming response: ", end="", flush=True)
for token in stream_chat("Count from 1 to 10 slowly, one number per line"):
    print(token, end="", flush=True)
print()  # Final newline
# FastAPI SSE streaming endpoint
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json
app = FastAPI()
@app.post("/stream")
async def stream_endpoint(request: dict):
    """Server-Sent Events endpoint for streaming LLM responses."""
    # NOTE(review): the create(..., stream=True) call and the sync
    # `for chunk in stream` loop below perform blocking network I/O inside
    # an async def, which stalls the event loop for all other requests.
    # Consider AsyncOpenAI with `async for` — confirm before production use.
    async def generate():
        stream = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": request["message"]}],
            stream=True,
        )
        for chunk in stream:
            delta = chunk.choices[0].delta
            if delta.content:
                # SSE format: data: {json}\n\n
                data = json.dumps({"token": delta.content})
                yield f"data: {data}\n\n"
        # Signal end of stream
        yield "data: [DONE]\n\n"
    return StreamingResponse(
        generate(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Important for nginx: disable response buffering
        }
    )
Concept 06
Error Handling and Retry Logic — What Breaks in Production
LLM APIs fail in predictable ways. Rate limits, timeouts, and transient server errors are all common. Naive code that doesn't handle these will have production incidents. Here's the robust pattern:
import time
import random
from openai import OpenAI, RateLimitError, APITimeoutError, APIConnectionError, APIStatusError
client = OpenAI()
def _backoff_delay(base_delay: float, attempt: int, jitter: bool = False) -> float:
    """Return the exponential-backoff delay for a 0-based attempt number.

    With jitter=True a random 0–1s offset is added, which de-synchronizes
    retry storms when many clients hit the same rate limit at once.
    """
    delay = base_delay * (2 ** attempt)
    if jitter:
        delay += random.uniform(0, 1)
    return delay


def chat_with_retry(
    messages: list,
    model: str = "gpt-4o-mini",
    max_retries: int = 3,
    base_delay: float = 1.0,
) -> str:
    """
    Chat completion with exponential backoff retry logic.
    Handles the errors that actually happen in production.

    Args:
        messages: full messages array (system/user/assistant dicts).
        model: model identifier to request.
        max_retries: total attempts before the last error is re-raised.
        base_delay: backoff base in seconds (doubles each attempt).

    Returns:
        The assistant's message content (possibly truncated — see warning).

    Raises:
        ValueError: for non-retryable 400/401 errors, or max_retries < 1.
        RateLimitError / APITimeoutError / APIConnectionError / APIStatusError:
            re-raised once retries are exhausted.
    """
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model=model,
                messages=messages,
                timeout=30.0,
            )
            # Truncated output is a common silent bug: warn but still
            # return what we got.
            finish_reason = response.choices[0].finish_reason
            if finish_reason == "length":
                print("WARNING: Response truncated (finish_reason=length). "
                      "Consider increasing max_tokens.")
            return response.choices[0].message.content
        except RateLimitError as e:
            # HTTP 429 — we're sending too many requests
            if attempt == max_retries - 1:
                raise  # Re-raise on final attempt
            # Honor the server's retry-after hint when available.
            # NOTE(review): presumably the SDK exposes this as `retry_after`;
            # confirm the attribute name against the installed openai version.
            retry_after = getattr(e, 'retry_after', None)
            wait_time = retry_after if retry_after else _backoff_delay(base_delay, attempt, jitter=True)
            print(f"Rate limit hit. Waiting {wait_time:.1f}s before retry {attempt + 1}/{max_retries}")
            time.sleep(wait_time)
        except APITimeoutError:
            # Request timed out — usually safe to retry
            if attempt == max_retries - 1:
                raise
            wait_time = _backoff_delay(base_delay, attempt)
            print(f"Timeout. Retrying in {wait_time:.1f}s...")
            time.sleep(wait_time)
        except APIConnectionError:
            # Network connectivity issue — retry with backoff
            if attempt == max_retries - 1:
                raise
            wait_time = _backoff_delay(base_delay, attempt)
            print(f"Connection error. Retrying in {wait_time:.1f}s...")
            time.sleep(wait_time)
        except APIStatusError as e:
            # HTTP 4xx/5xx errors
            if e.status_code == 400:
                # Bad request — input is malformed; retrying cannot help
                raise ValueError(f"Bad request: {e.message}") from e
            elif e.status_code == 401:
                raise ValueError("Invalid API key") from e
            elif e.status_code >= 500:
                # Server error — safe to retry
                if attempt == max_retries - 1:
                    raise
                wait_time = _backoff_delay(base_delay, attempt)
                print(f"Server error {e.status_code}. Retrying in {wait_time:.1f}s...")
                time.sleep(wait_time)
            else:
                raise  # Don't retry other 4xx errors
    # Defensive: reachable only when max_retries <= 0 (loop body never ran);
    # previously this implicitly returned None despite the -> str annotation.
    raise ValueError("max_retries must be >= 1")
Concept 07
The Provider-Agnostic Wrapper — Write Once, Swap Providers Freely
This is the most important pattern in this entire course. If your application code calls OpenAI directly, switching to Claude requires rewriting hundreds of lines. If you use a thin wrapper, switching takes 10 seconds. Here's the full implementation:
import os
from typing import Optional, Generator
from dataclasses import dataclass
from enum import Enum
class LLMProvider(Enum):
    """Supported LLM backends; the string values double as config identifiers."""

    OPENAI = "openai"
    ANTHROPIC = "anthropic"
    GEMINI = "gemini"
@dataclass
class LLMResponse:
    """Normalized result of a single chat call, independent of provider."""

    content: str            # assistant reply text
    finish_reason: str      # provider-native stop reason string
    model: str              # model identifier reported by (or passed to) the API
    prompt_tokens: int      # tokens in the request
    completion_tokens: int  # tokens generated
    total_tokens: int       # prompt + completion

    @property
    def cost_estimate_usd(self) -> float:
        """Very rough cost estimate — update with current pricing."""
        # Blended per-1k-token rates; unknown models fall back to a
        # mid-range default of $0.002 / 1k tokens.
        per_1k = {
            "gpt-4o": 0.0025,
            "gpt-4o-mini": 0.00015,
            "claude-3-5-sonnet-20241022": 0.003,
            "claude-3-haiku-20240307": 0.00025,
            "gemini-1.5-flash": 0.000075,
            "gemini-1.5-pro": 0.00125,
        }
        return (self.total_tokens / 1000) * per_1k.get(self.model, 0.002)
class LLMClient:
    """
    Provider-agnostic LLM client.

    Change the provider/model in config — not in application code. Every
    backend is normalized to the same chat() interface and LLMResponse
    return type, so swapping providers is a one-line change at construction.
    """
    def __init__(
        self,
        provider: LLMProvider = LLMProvider.OPENAI,
        model: Optional[str] = None,
        default_system_prompt: str = "You are a helpful assistant.",
        default_temperature: float = 0.7,
        default_max_tokens: int = 1000,
    ):
        self.provider = provider
        self.default_system_prompt = default_system_prompt
        self.default_temperature = default_temperature
        self.default_max_tokens = default_max_tokens
        # Pick a cheap default model when the caller doesn't name one.
        self.model = model or {
            LLMProvider.OPENAI: "gpt-4o-mini",
            LLMProvider.ANTHROPIC: "claude-3-haiku-20240307",
            LLMProvider.GEMINI: "gemini-1.5-flash",
        }[provider]
        # Import lazily so only the selected provider's SDK must be installed.
        if provider == LLMProvider.OPENAI:
            from openai import OpenAI as _OpenAI
            self._client = _OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
        elif provider == LLMProvider.ANTHROPIC:
            import anthropic as _anthropic
            self._client = _anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY"))
        elif provider == LLMProvider.GEMINI:
            import google.generativeai as genai
            genai.configure(api_key=os.environ.get("GOOGLE_API_KEY"))
            self._client = genai

    def chat(
        self,
        user_message: str,
        system_prompt: Optional[str] = None,
        temperature: Optional[float] = None,
        max_tokens: Optional[int] = None,
        conversation_history: Optional[list] = None,
    ) -> LLMResponse:
        """Single interface for all providers.

        Arguments fall back to the defaults given at construction; history
        is a list of OpenAI-style {"role", "content"} dicts.
        """
        system = system_prompt or self.default_system_prompt
        # `is not None` rather than truthiness: temperature=0.0 is valid.
        temp = temperature if temperature is not None else self.default_temperature
        tokens = max_tokens or self.default_max_tokens
        history = conversation_history or []
        if self.provider == LLMProvider.OPENAI:
            return self._chat_openai(user_message, system, temp, tokens, history)
        elif self.provider == LLMProvider.ANTHROPIC:
            return self._chat_anthropic(user_message, system, temp, tokens, history)
        elif self.provider == LLMProvider.GEMINI:
            return self._chat_gemini(user_message, system, temp, tokens, history)
        # Previously this fell through and returned None; fail loudly instead
        # so a newly added provider without a branch is caught immediately.
        raise ValueError(f"Unsupported provider: {self.provider}")

    def _chat_openai(self, user_message, system, temperature, max_tokens, history):
        # OpenAI: the system prompt travels inside the messages array.
        messages = [{"role": "system", "content": system}]
        messages.extend(history)
        messages.append({"role": "user", "content": user_message})
        resp = self._client.chat.completions.create(
            model=self.model, messages=messages,
            temperature=temperature, max_tokens=max_tokens
        )
        return LLMResponse(
            content=resp.choices[0].message.content,
            finish_reason=resp.choices[0].finish_reason,
            model=resp.model,
            prompt_tokens=resp.usage.prompt_tokens,
            completion_tokens=resp.usage.completion_tokens,
            total_tokens=resp.usage.total_tokens,
        )

    def _chat_anthropic(self, user_message, system, temperature, max_tokens, history):
        # Anthropic: the system prompt is a top-level parameter, not a message.
        messages = list(history)
        messages.append({"role": "user", "content": user_message})
        resp = self._client.messages.create(
            model=self.model, system=system, messages=messages,
            temperature=temperature, max_tokens=max_tokens
        )
        # FIX: the reply is a list of typed content blocks. Concatenate all
        # text blocks (same pattern as chat_anthropic earlier in the file)
        # instead of assuming resp.content[0] is a text block.
        text = "".join(block.text for block in resp.content if block.type == "text")
        return LLMResponse(
            content=text,
            finish_reason=resp.stop_reason,
            model=resp.model,
            prompt_tokens=resp.usage.input_tokens,
            completion_tokens=resp.usage.output_tokens,
            total_tokens=resp.usage.input_tokens + resp.usage.output_tokens,
        )

    def _chat_gemini(self, user_message, system, temperature, max_tokens, history):
        import google.generativeai as genai
        # Gemini: the system prompt is bound to the model object, and the
        # assistant role is called "model".
        gemini_model = genai.GenerativeModel(
            model_name=self.model, system_instruction=system,
            generation_config=genai.GenerationConfig(
                temperature=temperature, max_output_tokens=max_tokens
            )
        )
        gemini_history = [
            {"role": "model" if m["role"] == "assistant" else m["role"],
             "parts": [m["content"]]} for m in history
        ]
        chat_session = gemini_model.start_chat(history=gemini_history)
        resp = chat_session.send_message(user_message)
        return LLMResponse(
            content=resp.text,
            finish_reason=str(resp.candidates[0].finish_reason),
            model=self.model,
            prompt_tokens=resp.usage_metadata.prompt_token_count,
            completion_tokens=resp.usage_metadata.candidates_token_count,
            total_tokens=resp.usage_metadata.total_token_count,
        )
# Usage — switching providers is ONE config change
client = LLMClient(provider=LLMProvider.OPENAI)
# client = LLMClient(provider=LLMProvider.ANTHROPIC) # swap here
# client = LLMClient(provider=LLMProvider.GEMINI) # or here
response = client.chat("What is gradient descent?")
print(response.content)
# cost_estimate_usd is a rough blended estimate — not a billing source of truth
print(f"Cost estimate: ${response.cost_estimate_usd:.6f}")
Concept 08
Cost Tracking and Environment Variables — Production Hygiene
Never hardcode API keys. Use environment variables, always. Here's the correct pattern for managing credentials across dev, staging, and production:
# .env file (never commit this)
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...
GOOGLE_API_KEY=AIza...
LLM_PROVIDER=openai
LLM_MODEL=gpt-4o-mini
LLM_MAX_TOKENS=1000
LLM_TEMPERATURE=0.7
# config.py
import os
from dotenv import load_dotenv
load_dotenv() # pip install python-dotenv
class LLMConfig:
    """Central LLM settings, read from environment variables with safe defaults."""

    # Read once at import time; defaults keep local dev working out of the box.
    PROVIDER = os.getenv("LLM_PROVIDER", "openai")
    MODEL = os.getenv("LLM_MODEL", "gpt-4o-mini")
    MAX_TOKENS = int(os.getenv("LLM_MAX_TOKENS", "1000"))
    TEMPERATURE = float(os.getenv("LLM_TEMPERATURE", "0.7"))

    @classmethod
    def validate(cls):
        """Fail fast if required env vars are missing."""
        # Each provider name maps to the env var that must hold its API key.
        key_for_provider = {
            "openai": "OPENAI_API_KEY",
            "anthropic": "ANTHROPIC_API_KEY",
            "gemini": "GOOGLE_API_KEY",
        }
        key_name = key_for_provider.get(cls.PROVIDER)
        if key_name and not os.getenv(key_name):
            raise ValueError(
                f"Missing required environment variable: {key_name}\n"
                f"Set it with: export {key_name}=your-key-here"
            )
# Call at app startup
LLMConfig.validate()
- The messages array (system/user/assistant roles) is the core primitive of all LLM APIs
- Always check `finish_reason` — truncated responses (`length`) are a common silent bug
- Streaming is essential for UX; use `stream=True` and yield chunks via SSE
- Retry with exponential backoff for rate limits and timeouts; don't retry 400s
- Build the provider-agnostic `LLMClient` wrapper — swap providers in one line
- All API keys in environment variables; validate at startup with fail-fast logic