Model Context Protocol — Advanced Topics Study Guide
Course: Anthropic “MCP Advanced Topics” | 4 Modules | Last updated: 2026-04-26
Table of Contents
- Module 1 — Introduction & Prerequisites
- Module 2 — Core MCP Features
- Module 3 — Transports & Communication
- Module 4 — Assessment & Conclusion
- Quick Reference
Module 1 — Introduction & Prerequisites
Prerequisites
This course builds on the MCP Introduction course. You must already know:
- Basic MCP concepts: tools, resources, prompts
- Python async/await patterns
- JSON data format and HTTP basics
- How Claude Desktop or Claude Code connects to an MCP server
What This Course Adds
| Topic | Why It Matters |
|---|---|
| Sampling | Servers can request LLM completions — enables AI-assisted tools |
| Notifications | Real-time progress and state change events between client and server |
| Roots | Controlled filesystem access with security boundaries |
| JSON-RPC 2.0 | Wire format and message types used by all MCP transports |
| STDIO | Local transport for desktop and development use |
| Streamable HTTP | Remote transport for cloud and multi-client deployments |
| Production ops | Scaling, auth, health, error recovery for real deployments |
Mental Model
MCP Intro course: Tools + Resources + Prompts (what servers expose)
MCP Advanced: Sampling + Notifications + Roots + Transports (how they communicate)
The advanced topics focus on the communication layer — how client and server exchange messages, signal state changes, and share access to external systems securely.
Architecture Overview
┌─────────────────────────────────────────────────────────────────┐
│ MCP Architecture │
│ │
│ ┌──────────┐ Transport ┌──────────────┐ │
│ │ Client │◄──────────────►│ Server │ │
│ │ (Claude) │ JSON-RPC 2.0 │ (your tools) │ │
│ └──────────┘ └──────────────┘ │
│ │ │ │
│ Capabilities: Capabilities: │
│ - sampling (gatekeeper) - tools │
│ - roots (declares paths) - resources │
│ - notifications - prompts │
│ - human-in-the-loop - notifications │
└─────────────────────────────────────────────────────────────────┘
Protocol Version
The examples throughout this guide use MCP protocol version 2024-11-05. Client and server agree on a version during the initialize handshake: the client proposes one in its request, and the server answers with the version it will use.
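The negotiation can be pictured with a small sketch. It assumes the usual rule that a server echoes the requested version when it supports it and otherwise answers with one it does support (typically its latest). The names `SUPPORTED_VERSIONS` and `negotiate_version` are illustrative and not part of any SDK.

```python
# Hypothetical list of protocol versions this server implements, oldest first.
SUPPORTED_VERSIONS = ["2024-11-05"]


def negotiate_version(requested: str, supported: list = SUPPORTED_VERSIONS) -> str:
    """Return the protocolVersion the server would place in its initialize response."""
    if requested in supported:
        return requested      # the server speaks what the client asked for
    return supported[-1]      # otherwise offer the newest version the server knows


print(negotiate_version("2024-11-05"))
```

If the server answers with a version the client cannot speak, the client would normally disconnect rather than continue.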
Module 2 — Core MCP Features
2.1 Sampling
What Is Sampling?
Sampling allows an MCP server to request an LLM completion through the client. The server does not call the LLM directly — it asks the client to do it on its behalf.
┌────────────────────────────────────────────────────────┐
│ Sampling Data Flow │
│ │
│ Server Client LLM (Claude API) │
│ │ │ │ │
│ │─sampling/──────► │ │
│ │ createMessage │ │ │
│ │ {messages, │ │ │
│ │ maxTokens} │ [optional: show │ │
│ │ │ to user for │ │
│ │ │ approval] │ │
│ │ │─messages.create──► │
│ │ │◄─response───────── │
│ │◄─sampling ─────│ │ │
│ │ result │ │ │
└────────────────────────────────────────────────────────┘
Why This Architecture?
- Security: The client controls which models, token budgets, and parameters are allowed
- Human-in-the-loop: The client (and therefore the user) can review, modify, or reject sampling requests before they are sent to the LLM
- Policy enforcement: Organizations can restrict which models servers may invoke
- Cost control: Clients can cap token usage, preventing runaway server costs
Use Cases
| Use Case | Example |
|---|---|
| Natural language to structured data | Database server generates SQL from plain English |
| Code explanation | File analysis server explains a function’s purpose |
| Data summarization | Log analysis server summarizes an error cluster |
| Decision making | Workflow server asks LLM whether a condition is met |
| Content classification | Document server categorizes files by type and relevance |
| Error diagnosis | Monitoring server interprets a stack trace and suggests fixes |
Sampling Request Format (Complete)
The server sends a sampling/createMessage request to the client:
{
"jsonrpc": "2.0",
"id": 7,
"method": "sampling/createMessage",
"params": {
"messages": [
{
"role": "user",
"content": {
"type": "text",
"text": "Convert this English query to SQL: find all users who joined last month and have made at least one purchase"
}
}
],
"modelPreferences": {
"hints": [{"name": "claude-sonnet"}],
"intelligencePriority": 0.8,
"speedPriority": 0.2,
"costPriority": 0.3
},
"systemPrompt": "You are a SQL expert. Return only valid SQL, no explanation. Use standard ANSI SQL.",
"maxTokens": 500,
"temperature": 0.1,
"stopSequences": [";"]
}
}
Sampling Response Format
The client sends back the LLM’s response:
{
"jsonrpc": "2.0",
"id": 7,
"result": {
"role": "assistant",
"content": {
"type": "text",
"text": "SELECT u.* FROM users u JOIN orders o ON u.id = o.user_id WHERE u.created_at >= DATE_TRUNC('month', CURRENT_DATE - INTERVAL '1 month') AND u.created_at < DATE_TRUNC('month', CURRENT_DATE) GROUP BY u.id HAVING COUNT(o.id) >= 1"
},
"model": "claude-sonnet-4-5",
"stopReason": "endTurn"
}
}
Python Implementation — Server Side
from mcp.server import Server
from mcp.types import CreateMessageResult, SamplingMessage, TextContent
server = Server("sql-assistant")
@server.call_tool()
async def handle_natural_language_query(name: str, arguments: dict) -> list:
    if name != "nl-to-sql":
        raise ValueError(f"Unknown tool: {name}")
    nl_query = arguments["query"]
    schema_hint = arguments.get("schema", "")
    # Build the sampling request
    messages = [
        SamplingMessage(
            role="user",
            content=TextContent(
                type="text",
                text=f"Convert to SQL: {nl_query}\n\nSchema context: {schema_hint}",
            ),
        )
    ]
    # Request LLM completion through the client
    # This is the key: server does NOT call LLM directly
    result: CreateMessageResult = await server.request_context.session.create_message(
        messages=messages,
        max_tokens=500,
        system_prompt="Return only valid SQL. No explanations or markdown.",
        temperature=0.1,
    )
    # The returned content may be non-text (for example an image), so check before reading .text
    if result.content.type != "text":
        raise ValueError(f"Expected text content, got {result.content.type}")
    return [TextContent(type="text", text=result.content.text)]
Human-in-the-Loop Approval Flow
┌────────────────────────────────────────────────────────────┐
│ Human-in-the-Loop Decision Points │
│ │
│ 1. Server sends sampling/createMessage │
│ │ │
│ ▼ │
│ 2. Client INTERCEPTS the request │
│ - Reads messages and parameters │
│ - Optionally shows UI to user: │
│ "Server X wants to call Claude with this prompt" │
│ [Approve] [Modify] [Reject] │
│ │ │
│ ┌────────┼────────┐ │
│ │ │ │ │
│ Approve Modify Reject │
│ │ │ │ │
│ │ Edit prompt │ │
│ │ │ Return error to server │
│ └────────┘ │
│ │ │
│ 3. Client forwards approved request to LLM │
│ 4. LLM response returned to server │
└────────────────────────────────────────────────────────────┘
Model Preferences and Hints
Servers can suggest (not mandate) which model to use:
"modelPreferences": {
"hints": [
{"name": "claude-opus"},
{"name": "claude-sonnet"}
],
"intelligencePriority": 0.9,
"speedPriority": 0.1,
"costPriority": 0.2
}
- hints: ordered list of preferred model names; the client may ignore them
- intelligencePriority: weight for model capability (0.0–1.0)
- speedPriority: weight for response latency
- costPriority: weight for cost efficiency
- Client makes the FINAL decision on which model to use
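One way a client might combine hints and priority weights is sketched below. The catalogue, its scores, and the function `choose_model` are hypothetical; real clients are free to use any selection logic, including ignoring the preferences entirely.

```python
# Hypothetical catalogue: model name -> (intelligence, speed, cost-efficiency), each 0.0-1.0.
MODELS = {
    "claude-opus-example": (0.95, 0.3, 0.2),
    "claude-sonnet-example": (0.8, 0.6, 0.6),
    "claude-haiku-example": (0.5, 0.9, 0.9),
}


def choose_model(prefs: dict, catalogue: dict = MODELS) -> str:
    """Pick a model: the first matching hint wins, otherwise the best weighted score."""
    # 1. Hints are tried in order and matched as substrings of available model names.
    for hint in prefs.get("hints", []):
        wanted = hint.get("name", "")
        for name in catalogue:
            if wanted and wanted in name:
                return name
    # 2. No usable hint: score each model by the three priority weights.
    weights = (
        prefs.get("intelligencePriority", 0.0),
        prefs.get("speedPriority", 0.0),
        prefs.get("costPriority", 0.0),
    )

    def score(name: str) -> float:
        return sum(w * v for w, v in zip(weights, catalogue[name]))

    return max(catalogue, key=score)


print(choose_model({"hints": [{"name": "claude-sonnet"}]}))
```

Treating hints as substrings lets a server ask for a model family without knowing the exact identifiers the client has available.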
Security Controls in Sampling
| Control | Who Enforces | Description |
|---|---|---|
| Model allowlist | Client | Only approved models can be invoked |
| Max token cap | Client | Client can reduce maxTokens requested by server |
| System prompt filtering | Client | Client can review/block harmful system prompts |
| Human review gate | Client | User must approve before forwarding to LLM |
| Request logging | Client | All sampling requests logged for audit |
| Rate limiting | Client | Client throttles server’s sampling requests |
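Two of the controls in the table, the model allowlist and the token cap, can be sketched as a pure function over the request params. The constants and the name `apply_sampling_policy` are assumptions made for illustration; a real client would also add the review, logging, and rate-limiting steps.

```python
from copy import deepcopy

ALLOWED_MODEL_HINTS = {"claude-sonnet", "claude-haiku"}  # hypothetical allowlist
MAX_TOKENS_CAP = 1000                                    # hypothetical per-request budget


def apply_sampling_policy(params: dict) -> dict:
    """Return a copy of sampling/createMessage params adjusted to client policy."""
    adjusted = deepcopy(params)  # never mutate the server's original request
    # Token cap: the client may lower, but never raise, what the server asked for.
    adjusted["maxTokens"] = min(params.get("maxTokens", MAX_TOKENS_CAP), MAX_TOKENS_CAP)
    # Allowlist: drop hints that name models the client does not permit.
    prefs = adjusted.get("modelPreferences", {})
    if prefs.get("hints"):
        prefs["hints"] = [h for h in prefs["hints"] if h.get("name") in ALLOWED_MODEL_HINTS]
    return adjusted


request_params = {
    "maxTokens": 5000,
    "modelPreferences": {"hints": [{"name": "claude-opus"}, {"name": "claude-sonnet"}]},
}
print(apply_sampling_policy(request_params))
```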
Key Exam Points
- Sampling requests flow: server → client → LLM → client → server
- Method name is sampling/createMessage (not just “sampling”)
- The client is the gatekeeper: it controls all LLM access
- Servers cannot bypass client policy by calling LLMs directly
- Human-in-the-loop is a client-side feature, not server-side
- Model preferences are hints: the client has final say on model choice
- stopReason in the response indicates why the LLM stopped: endTurn, maxTokens, stopSequence
2.2 Notifications
What Are Notifications?
Notifications are one-way messages with no response expected. Either side can send them. The defining characteristic: no id field.
// Request has id → expects response
{"jsonrpc": "2.0", "id": 5, "method": "tools/call", "params": {...}}
// Notification has NO id → no response expected
{"jsonrpc": "2.0", "method": "notifications/progress", "params": {...}}
Server Notifications (Server → Client)
| Notification | Trigger | When to Use |
|---|---|---|
| notifications/resources/updated | A specific resource’s content has changed | File changed, DB row updated |
| notifications/resources/list_changed | The list of available resources has changed | Files added/removed |
| notifications/tools/list_changed | Available tools have been added or removed | Plugin loaded/unloaded |
| notifications/prompts/list_changed | Available prompts have been added or removed | Template added/removed |
| notifications/progress | Progress update for a long-running operation | File indexing, migrations, builds |
| notifications/message | Structured log message with severity level | Debug output, warnings, errors |
Client Notifications (Client → Server)
| Notification | Trigger | When to Use |
|---|---|---|
| notifications/roots/list_changed | Available filesystem roots have changed | User opened/closed a workspace |
| notifications/cancelled | Client is cancelling a previously sent request | User pressed Ctrl+C |
Progress Reporting — Complete Flow
Used for long-running tool operations (file indexing, database migrations, test runs):
┌──────────────────────────────────────────────────────────────┐
│ Progress Token Flow │
│ │
│ 1. Client includes progressToken in original request: │
│ tools/call { _meta: { progressToken: "job-42" } } │
│ │ │
│ 2. Server starts long operation and sends notifications: │
│ notifications/progress { progressToken: "job-42", progress: 25 } │
│ notifications/progress { progressToken: "job-42", progress: 50 } │
│ notifications/progress { progressToken: "job-42", progress: 75 } │
│ │ │
│ 3. Server sends final result when done: │
│ tools/call response { result: {...} } │
└──────────────────────────────────────────────────────────────┘
# 1. Client includes progressToken in the original request
request = {
"jsonrpc": "2.0",
"id": 42,
"method": "tools/call",
"params": {
"name": "index-files",
"arguments": {"path": "/src"},
"_meta": {"progressToken": "index-job-42"}
}
}
# 2. Server sends progress notifications during execution
progress_notification = {
"jsonrpc": "2.0",
"method": "notifications/progress",
"params": {
"progressToken": "index-job-42",
"progress": 65, # current value
"total": 100, # optional: total for percentage
"message": "Indexed 650 of 1000 files" # optional human-readable
}
}
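On the receiving side, a client can key incoming notifications by progressToken and derive a percentage when total is present. The sketch below is one possible shape; `percent_complete` and `ProgressTracker` are illustrative names, not SDK classes.

```python
from typing import Optional


def percent_complete(params: dict) -> Optional[float]:
    """Return completion as a percentage, or None when no usable total was sent."""
    total = params.get("total")
    if not total:  # missing or zero total: a percentage is undefined
        return None
    return 100.0 * params["progress"] / total


class ProgressTracker:
    """Keeps the most recent progress params for each progressToken."""

    def __init__(self) -> None:
        self.latest: dict = {}

    def handle(self, notification: dict) -> None:
        if notification.get("method") != "notifications/progress":
            return  # not a progress notification, ignore it
        params = notification["params"]
        self.latest[params["progressToken"]] = params


tracker = ProgressTracker()
tracker.handle({
    "jsonrpc": "2.0",
    "method": "notifications/progress",
    "params": {"progressToken": "index-job-42", "progress": 65, "total": 100},
})
print(percent_complete(tracker.latest["index-job-42"]))
```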
Python Server — Sending Progress Notifications
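from mcp.server import Server
from mcp.types import TextContent
server = Server("file-indexer")
@server.call_tool()
async def handle_index_files(name: str, arguments: dict) -> list:
    path = arguments["path"]
    ctx = server.request_context
    # ctx.meta is None when the request carried no _meta; without a token, skip progress updates
    progress_token = ctx.meta.progressToken if ctx.meta else None
    files = list_all_files(path)  # helper assumed to be defined elsewhere
    total = len(files)
    for i, file_path in enumerate(files, start=1):
        # Process file
        index_file(file_path)  # helper assumed to be defined elsewhere
        # Send a progress notification every 10 files and after the last file
        if progress_token is not None and (i % 10 == 0 or i == total):
            # The message argument assumes an SDK version that supports the optional message field
            await ctx.session.send_progress_notification(
                progress_token=progress_token,
                progress=i,
                total=total,
                message=f"Indexed {i} of {total} files: {file_path}",
            )
    return [TextContent(type="text", text=f"Indexed {total} files successfully")]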
Logging Notifications — Severity Levels
Structured log messages with syslog-compatible severity levels:
log_notification = {
"jsonrpc": "2.0",
"method": "notifications/message",
"params": {
"level": "warning",
"logger": "database-connector",
"data": {
"query": "SELECT * FROM users WHERE id = ?",
"duration_ms": 4500,
"message": "Slow query detected — consider adding index on users.id"
}
}
}
Log Severity Levels (Syslog Order, Most Severe Last)
| Level | Numeric | Use When |
|---|---|---|
| debug | 7 | Verbose diagnostic information for development |
| info | 6 | Normal operational messages |
| notice | 5 | Significant but normal conditions |
| warning | 4 | Unexpected condition that may need attention |
| error | 3 | Error condition: operation failed |
| critical | 2 | Critical failure: system component failing |
| alert | 1 | Immediate action required |
| emergency | 0 | System is unusable |
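Because lower numbers mean higher severity, a minimum-level filter compares in the opposite direction from what many readers expect. A minimal sketch, assuming the server keeps a configured minimum level (the name `should_emit` is illustrative):

```python
# Numeric values copied from the table above: lower number = more severe.
SEVERITY = {
    "debug": 7, "info": 6, "notice": 5, "warning": 4,
    "error": 3, "critical": 2, "alert": 1, "emergency": 0,
}


def should_emit(level: str, minimum: str) -> bool:
    """True when `level` is at least as severe as the configured minimum."""
    return SEVERITY[level] <= SEVERITY[minimum]


print(should_emit("error", "warning"))
print(should_emit("debug", "info"))
```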
Resource Updated Notification
{
"jsonrpc": "2.0",
"method": "notifications/resources/updated",
"params": {
"uri": "file:///Users/project/src/config.json"
}
}
After receiving this, the client should re-fetch the resource if it has it cached.
Cancellation Notification
{
"jsonrpc": "2.0",
"method": "notifications/cancelled",
"params": {
"requestId": 42,
"reason": "User cancelled the operation"
}
}
Servers should stop processing the request with id 42 upon receiving this.
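How a server stops work is left to the implementation. One common approach in asyncio code is to keep a registry of in-flight tasks keyed by request id and cancel the matching task when the notification arrives. The class below is a sketch of that idea with hypothetical names; it is not part of the MCP SDK.

```python
import asyncio


class InFlightRequests:
    """Tracks running request handlers so they can be cancelled by request id."""

    def __init__(self) -> None:
        self._tasks: dict = {}

    def start(self, request_id, coro) -> asyncio.Task:
        task = asyncio.create_task(coro)
        self._tasks[request_id] = task
        # Forget the task once it finishes, whether normally or by cancellation.
        task.add_done_callback(lambda _: self._tasks.pop(request_id, None))
        return task

    def cancel(self, request_id) -> bool:
        """Handle notifications/cancelled; unknown or finished ids are ignored."""
        task = self._tasks.get(request_id)
        if task is None or task.done():
            return False
        task.cancel()
        return True


async def demo() -> bool:
    registry = InFlightRequests()
    task = registry.start(42, asyncio.sleep(60))  # stand-in for a long tool call
    registry.cancel(42)                           # params.requestId from the notification
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


print(asyncio.run(demo()))
```

Ignoring unknown ids matters because a cancellation can race with the response: the request may already have completed by the time the notification is processed.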
Context Objects in Notifications
When a server tool handler runs, it receives a context object giving access to notification sending:
@server.call_tool()
async def handle_tool(name: str, arguments: dict) -> list:
    ctx = server.request_context  # context provided by the framework
    # Send a log message
    await ctx.session.send_log_message(
        level="info",
        data={"message": "Starting analysis", "tool": name},
        logger="my-tool",
    )
    # Send progress (if progressToken was provided); ctx.meta is None when no _meta was sent
    progress_token = ctx.meta.progressToken if ctx.meta else None
    if progress_token is not None:
        await ctx.session.send_progress_notification(
            progress_token=progress_token,
            progress=50,
            total=100,
        )
    return []  # placeholder: a real handler returns its content items here
Key Exam Points
- Notifications have NO id field: no response is expected or sent
- Progress tokens link notifications to their originating request via _meta.progressToken
- Both client and server can send notifications
- Log severity follows syslog levels: 8 levels from debug (least severe) to emergency (most severe)
- notifications/cancelled is client → server; notifications/progress is server → client
- Resource update notifications carry the resource URI that changed
2.3 Roots (Filesystem Access)
What Are Roots?
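Roots define directories that MCP servers are allowed to access. The client declares which paths are available, and the server is expected to confine its file access to those boundaries. The protocol does not sandbox the server process, so the server's own checks are what make the boundary real.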
┌──────────────────────────────────────────────────────────────┐
│ Roots Security Model │
│ │
│ Client declares roots: │
│ file:///Users/alice/project/src ← allowed │
│ file:///Users/alice/project/docs ← allowed │
│ │
│ Server CAN access: │
│ /Users/alice/project/src/auth/login.ts ✓ │
│ /Users/alice/project/src/services/user.ts ✓ │
│ /Users/alice/project/docs/api.md ✓ │
│ │
│ Server CANNOT access: │
│ /Users/alice/.ssh/id_rsa ✗ │
│ /Users/alice/.env ✗ │
│ /etc/passwd ✗ │
│ /Users/alice/project/tests/fixtures/ ✗ (not in roots)
└──────────────────────────────────────────────────────────────┘
Root URI Format
file:///absolute/path/to/directory
- Always file:// scheme
- Always absolute path (three slashes: file:/// = file:// + /absolute/path)
- No relative paths (no file://./relative/path)
- No trailing slash required but allowed
- Directories only (roots are directories, not individual files)
Examples:
file:///Users/alice/project/src ✓ macOS/Linux absolute path
file:///C:/Users/alice/project/src ✓ Windows absolute path
file:///home/alice/workspace ✓ Linux home directory
file://./relative/path ✗ relative paths are invalid
file:///Users/alice/project/src/main.ts ✗ files are invalid (use directories)
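The syntactic rules above can be checked with the standard library. This sketch covers only the scheme, empty-authority, and absolute-path rules; telling a directory from a file needs a filesystem lookup and is left out. The function name `is_valid_root_uri` is illustrative.

```python
from urllib.parse import urlparse


def is_valid_root_uri(uri: str) -> bool:
    """Check the syntactic root rules: file scheme, empty authority, absolute path."""
    parsed = urlparse(uri)
    return (
        parsed.scheme == "file"
        and parsed.netloc == ""          # three slashes means the authority part is empty
        and parsed.path.startswith("/")  # absolute path
    )


for candidate in (
    "file:///Users/alice/project/src",
    "file:///C:/Users/alice/project/src",
    "file://./relative/path",
):
    print(candidate, is_valid_root_uri(candidate))
```

Note that `file://./relative/path` fails because the parser reads `.` as the authority, which is exactly why the two-slash relative form is not a valid root.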
Roots Declaration Flow
┌────────────────────────────────────────────────────────────┐
│ Roots Initialization Flow │
│ │
│ 1. During initialize, client declares roots capability: │
│ "capabilities": { "roots": { "listChanged": true } } │
│ │ │
│ 2. Server calls roots/list to discover available roots: │
│ → Request: { method: "roots/list" } │
│ ← Response: { roots: [ │
│ { uri: "file:///project/src", name: "Source" }, │
│ { uri: "file:///project/docs", name: "Docs" } │
│ ]} │
│ │ │
│ 3. Server uses roots to scope file access │
│ │ │
│ 4. If roots change, client sends notification: │
│ notifications/roots/list_changed │
│ │ │
│ 5. Server calls roots/list again to get updated list │
└────────────────────────────────────────────────────────────┘
Roots List Response Format
{
"jsonrpc": "2.0",
"id": 3,
"result": {
"roots": [
{
"uri": "file:///Users/alice/project/src",
"name": "Source Code"
},
{
"uri": "file:///Users/alice/project/docs",
"name": "Documentation"
}
]
}
}
Python Server — Discovering Files Within Roots
import os
from urllib.parse import unquote, urlparse
from mcp.server import Server
from mcp.types import TextContent
server = Server("file-explorer")
async def get_roots() -> list[str]:
    """Fetch the current list of allowed root directories from the client."""
    result = await server.request_context.session.list_roots()
    # root.uri is a URL object in the SDK, so convert it to str before parsing;
    # unquote decodes percent-escapes such as %20 in directory names
    return [unquote(urlparse(str(root.uri)).path) for root in result.roots]
def is_within_roots(file_path: str, roots: list[str]) -> bool:
    """Check if a file path is within any of the declared roots."""
    resolved = os.path.realpath(file_path)
    for root in roots:
        resolved_root = os.path.realpath(root)
        if resolved.startswith(resolved_root + os.sep) or resolved == resolved_root:
            return True
    return False
@server.call_tool()
async def handle_read_file(name: str, arguments: dict) -> list:
    file_path = arguments["path"]
    roots = await get_roots()
    # Enforce root boundaries before accessing the file
    if not is_within_roots(file_path, roots):
        raise PermissionError(
            f"Access denied: {file_path} is outside declared roots"
        )
    # Open the resolved path so the file that is read is the one that was checked
    with open(os.path.realpath(file_path), "r") as f:
        content = f.read()
    return [TextContent(type="text", text=content)]
Dynamic Roots — Runtime Changes
Roots can change during a session (user opens a new workspace folder):
# Client sends this when user opens a new project folder
roots_changed_notification = {
"jsonrpc": "2.0",
"method": "notifications/roots/list_changed"
# No params needed — server must call roots/list to get the update
}
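After receiving this notification, the server clears its cached roots and re-fetches them on the next request:
from mcp.types import RootsListChangedNotification
# server_state and logger are assumed to be defined elsewhere in the server module
async def handle_roots_changed(notification: RootsListChangedNotification) -> None:
    # Invalidate cached roots; the notification itself carries no root data
    server_state.cached_roots = None
# Assumption: the low-level Server dispatches incoming notifications through its
# notification_handlers mapping, keyed by notification type
server.notification_handlers[RootsListChangedNotification] = handle_roots_changed
async def current_roots():
    # Call this from inside a request handler, where request_context is available
    if server_state.cached_roots is None:
        result = await server.request_context.session.list_roots()
        server_state.cached_roots = result.roots
        logger.info(f"Roots updated: {[str(r.uri) for r in result.roots]}")
    return server_state.cached_roots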
User Approval for Root Access
Some clients (like Claude Desktop) require explicit user consent before granting a server access to a root:
User action: "Connect to MCP server 'code-analyzer'"
Client shows dialog:
"code-analyzer wants to access:
- /Users/alice/project/src
- /Users/alice/project/docs
[Allow] [Allow Once] [Deny]"
This is implementation-defined but strongly recommended for security-sensitive directories.
Path Traversal Prevention
Servers MUST prevent path traversal attacks even within declared roots:
# DANGEROUS: does not prevent traversal
def read_file_unsafe(path: str) -> str:
    return open(path).read()
# SAFE: resolves symlinks and checks real path
def read_file_safe(path: str, roots: list[str]) -> str:
    real_path = os.path.realpath(path)  # resolves symlinks, "..", etc.
    if not is_within_roots(real_path, roots):
        raise PermissionError("Path traversal detected")
    with open(real_path) as f:
        return f.read()
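The same check can be written with pathlib, which avoids string-prefix comparisons altogether. This is an alternative sketch rather than the guide's own helper: `resolve_inside` is a hypothetical name, and it treats the requested path as relative to a single root.

```python
from pathlib import Path


def resolve_inside(root: str, requested: str) -> Path:
    """Resolve `requested` against `root`; raise PermissionError if it escapes the root."""
    root_path = Path(root).resolve()
    # resolve() collapses ".." segments and follows symlinks before the comparison.
    # An absolute `requested` replaces root_path entirely, so it is caught below too.
    candidate = (root_path / requested).resolve()
    if candidate != root_path and root_path not in candidate.parents:
        raise PermissionError(f"{requested!r} escapes {root_path}")
    return candidate


try:
    resolve_inside("/Users/alice/project/src", "../../.ssh/id_rsa")
except PermissionError as exc:
    print("blocked:", exc)
```

Comparing against `candidate.parents` sidesteps the classic prefix bug where `/project/src-old` would pass a naive `startswith("/project/src")` test.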
Roots vs Resources — Key Distinction
| Concept | Who Declares | Direction | Purpose |
|---|---|---|---|
| Roots | Client | Client → Server | Defines filesystem boundaries |
| Resources | Server | Server → Client | Exposes specific data the server has |
Roots say “you can look here.” Resources say “here is what I have.”
Key Exam Points
- Roots are declared by the client, not requested by the server
- URI format: file:///absolute/path (three slashes for absolute Unix path)
- Servers MUST enforce root boundaries: accessing outside is a protocol violation
- Roots can change dynamically during a session via notifications/roots/list_changed
- After roots/list_changed, server MUST call roots/list to get the updated list
- Use os.path.realpath() to resolve symlinks before checking root membership
Module 3 — Transports & Communication
3.1 JSON-RPC 2.0 Message Architecture
All MCP communication uses JSON-RPC 2.0 as the message format. It is transport-agnostic — the same message format works over STDIO, HTTP, or any future transport.
Three Message Types
| Type | Has id? | Has method? | Has result/error? | Direction |
|---|---|---|---|---|
| Request | Yes | Yes | No | Either direction |
| Response | Yes (same) | No | Yes | Reply to request |
| Notification | No | Yes | No | Either direction |
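The table translates directly into a classifier that inspects which fields are present. A minimal sketch (the function name `classify_message` is illustrative):

```python
def classify_message(msg: dict) -> str:
    """Classify a decoded JSON-RPC 2.0 message as request, response, or notification."""
    if msg.get("jsonrpc") != "2.0":
        raise ValueError("not a JSON-RPC 2.0 message")
    has_id = "id" in msg
    if "method" in msg:
        # A method with an id expects a reply; without an id it is fire-and-forget.
        return "request" if has_id else "notification"
    # A response carries an id and exactly one of result / error.
    if has_id and (("result" in msg) != ("error" in msg)):
        return "response"
    raise ValueError("malformed message")


print(classify_message({"jsonrpc": "2.0", "id": 1, "method": "tools/call", "params": {}}))
print(classify_message({"jsonrpc": "2.0", "method": "notifications/progress", "params": {}}))
print(classify_message({"jsonrpc": "2.0", "id": 1, "result": {}}))
```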
Complete Message Format Examples
// ── REQUEST ──────────────────────────────────────────────────────────────
// Client calls a tool
{
"jsonrpc": "2.0",
"id": 1,
"method": "tools/call",
"params": {
"name": "read-file",
"arguments": {"path": "/src/main.ts"}
}
}
// Server requests sampling (server-initiated request!)
{
"jsonrpc": "2.0",
"id": 7,
"method": "sampling/createMessage",
"params": {
"messages": [{"role": "user", "content": {"type": "text", "text": "..."}}],
"maxTokens": 500
}
}
// ── SUCCESS RESPONSE ──────────────────────────────────────────────────────
{
"jsonrpc": "2.0",
"id": 1,
"result": {
"content": [{"type": "text", "text": "file contents here..."}]
}
}
// ── ERROR RESPONSE ────────────────────────────────────────────────────────
{
"jsonrpc": "2.0",
"id": 1,
"error": {
"code": -32602,
"message": "Invalid params",
"data": {
"field": "path",
"reason": "Path must be an absolute file path"
}
}
}
// ── NOTIFICATION ──────────────────────────────────────────────────────────
// No id field — no response expected
{
"jsonrpc": "2.0",
"method": "notifications/progress",
"params": {
"progressToken": "job-42",
"progress": 50,
"total": 100
}
}
Standard Error Codes
| Code | Name | When to Use |
|---|---|---|
| -32700 | Parse error | JSON could not be parsed at all |
| -32600 | Invalid request | JSON-RPC envelope structure is invalid |
| -32601 | Method not found | Method name is unknown |
| -32602 | Invalid params | Parameters are wrong type, missing, or extra |
| -32603 | Internal error | Unhandled server-side exception |
| -32000 to -32099 | Server errors | Application-defined errors (custom codes) |
Memory trick: -32700 = parse, -32600 = invalid request, -32601 = method, -32602 = params, -32603 = internal.
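To see where each code arises, the sketch below walks one raw message through parsing, envelope validation, method lookup, and invocation. `make_error` and `handle_raw` are hypothetical helpers, and the mapping of `TypeError` to -32602 is a simplification: a `TypeError` raised inside a handler would be reported the same way. Notifications (messages without an id) are not handled here.

```python
import json


def make_error(request_id, code: int, message: str, data=None) -> dict:
    """Build a JSON-RPC error response envelope."""
    error = {"code": code, "message": message}
    if data is not None:
        error["data"] = data
    return {"jsonrpc": "2.0", "id": request_id, "error": error}


def handle_raw(raw: str, methods: dict) -> dict:
    """Dispatch one raw request string and map each failure stage to its standard code."""
    try:
        msg = json.loads(raw)
    except json.JSONDecodeError:
        return make_error(None, -32700, "Parse error")           # no id can be recovered
    if not isinstance(msg, dict) or msg.get("jsonrpc") != "2.0" or "method" not in msg:
        request_id = msg.get("id") if isinstance(msg, dict) else None
        return make_error(request_id, -32600, "Invalid request")
    handler = methods.get(msg["method"])
    if handler is None:
        return make_error(msg.get("id"), -32601, "Method not found")
    try:
        result = handler(**msg.get("params", {}))
    except TypeError as exc:                                      # wrong or missing arguments
        return make_error(msg.get("id"), -32602, "Invalid params", {"reason": str(exc)})
    except Exception as exc:                                      # anything else the handler raised
        return make_error(msg.get("id"), -32603, "Internal error", {"reason": str(exc)})
    return {"jsonrpc": "2.0", "id": msg.get("id"), "result": result}


methods = {"add": lambda a, b: a + b}
print(handle_raw('{"jsonrpc":"2.0","id":1,"method":"add","params":{"a":1,"b":2}}', methods))
print(handle_raw('{"jsonrpc":"2.0","id":2,"method":"missing"}', methods))
```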
Bidirectional Communication — Full Flow
Both sides can initiate requests. This is what makes MCP powerful — it’s not just client → server.
┌────────────────────────────────────────────────────────────────────┐
│ Bidirectional JSON-RPC Communication │
│ │
│ Client Server │
│ │ │ │
│ │── tools/call (request id:1) ────►│ Client-initiated request │
│ │◄── result (response id:1) ───────│ Server replies │
│ │ │ │
│ │◄── notifications/progress ───────│ Server-initiated notif │
│ │ (no id, no reply needed) │ │
│ │ │ │
│ │◄── sampling/createMessage ────────│ Server-initiated request │
│ │ (request id:7) │ (server wants LLM call) │
│ │── result (response id:7) ───────►│ Client replies │
│ │ │ │
│ │── notifications/cancelled ──────►│ Client-initiated notif │
│ │ (no id, no reply needed) │ │
└────────────────────────────────────────────────────────────────────┘
Request ID Rules
- IDs must be unique within a session
- IDs can be numbers or strings: "id": 1 or "id": "req-abc"
- Numeric IDs are conventional; sequential integers are common
- Response id MUST match the request id exactly
- A notification has no id; a message that carries both a method and an id is treated as a request
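These rules are usually implemented with a counter for outgoing ids plus a table of pending requests that responses are matched against. A minimal sketch with hypothetical names:

```python
import itertools


class PendingRequests:
    """Allocates unique request ids and matches responses back to their requests."""

    def __init__(self) -> None:
        self._next_id = itertools.count(1)  # sequential integers, unique per session
        self._pending: dict = {}

    def create(self, method: str, params=None) -> dict:
        request_id = next(self._next_id)
        self._pending[request_id] = method
        message = {"jsonrpc": "2.0", "id": request_id, "method": method}
        if params is not None:
            message["params"] = params
        return message

    def resolve(self, response: dict) -> str:
        """Return the method a response answers; raises KeyError for an unknown id."""
        return self._pending.pop(response["id"])


pending = PendingRequests()
first = pending.create("tools/list")
second = pending.create("resources/list")
# Responses may arrive out of order; the id is what ties them back together.
print(pending.resolve({"jsonrpc": "2.0", "id": second["id"], "result": {}}))
print(pending.resolve({"jsonrpc": "2.0", "id": first["id"], "result": {}}))
```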
Batch Requests
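JSON-RPC 2.0 itself allows several requests to be sent together as one array. Whether an MCP peer accepts batches depends on the protocol revision in use, so confirm support before relying on it:
[
{"jsonrpc": "2.0", "id": 1, "method": "tools/list", "params": {}},
{"jsonrpc": "2.0", "id": 2, "method": "resources/list", "params": {}}
]
The response is also an array, and its entries may arrive in a different order, so match them to requests by id.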
Key Exam Points
- All three message types have "jsonrpc": "2.0", always
- Request: has id + method + optional params
- Response: has id + either result OR error (never both)
- Notification: has method + optional params, NO id
- Standard error codes: -32700 (parse), -32600 (invalid), -32601 (method), -32602 (params), -32603 (internal)
- Both sides can initiate requests — true bidirectional RPC
3.2 STDIO Transport
How It Works
The server runs as a child process of the client. All JSON-RPC communication flows through standard I/O streams:
┌─────────────────────────────────────────────────────────────────┐
│ STDIO Transport Architecture │
│ │
│ ┌─────────────────────┐ ┌─────────────────────────┐ │
│ │ Client Process │ │ Server Process │ │
│ │ (Claude Desktop/ │ │ (your MCP server) │ │
│ │ Claude Code) │ │ │ │
│ │ │──stdin─►│ reads JSON-RPC messages │ │
│ │ │◄stdout──│ writes JSON-RPC messages │ │
│ │ │ │──stderr──► logs only │ │
│ └─────────────────────┘ └─────────────────────────┘ │
│ │
│ Rules: │
│ • stdin/stdout: JSON-RPC protocol messages │
│ • stderr: diagnostic logs ONLY — NEVER protocol messages │
│ • Messages are newline-delimited (one JSON object per line) │
└─────────────────────────────────────────────────────────────────┘
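Newline-delimited framing is simple to implement because JSON string escaping guarantees that a serialized message never contains a raw newline. The helpers below are a sketch with illustrative names; `decode_messages` returns any trailing partial line so the caller can prepend it to the next read.

```python
import json


def encode_message(msg: dict) -> bytes:
    """Serialize one message as a single line terminated by a newline."""
    # json.dumps escapes newlines inside strings, so the only raw "\n" is the delimiter.
    return (json.dumps(msg, separators=(",", ":")) + "\n").encode("utf-8")


def decode_messages(buffer: bytes) -> tuple:
    """Split a byte buffer into complete messages plus the unfinished remainder."""
    *lines, rest = buffer.split(b"\n")
    return [json.loads(line) for line in lines if line.strip()], rest


wire = encode_message({"jsonrpc": "2.0", "id": 1, "method": "tools/list"})
wire += encode_message({"jsonrpc": "2.0", "method": "notifications/initialized"})
wire += b'{"jsonrpc":"2.0","id":2'  # a partial message still in flight
messages, remainder = decode_messages(wire)
print(len(messages), remainder)
```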
Initialization Handshake — Complete Sequence
┌──────────────────────────────────────────────────────────────────┐
│ STDIO Initialization Handshake │
│ │
│ Step 1: Client launches server subprocess │
│ $ python my_mcp_server.py │
│ │
│ Step 2: Client sends initialize request │
│ Client ──► { "jsonrpc": "2.0", "id": 0, │
│ "method": "initialize", │
│ "params": { │
│ "protocolVersion": "2024-11-05", │
│ "capabilities": { │
│ "roots": { "listChanged": true }, │
│ "sampling": {} │
│ }, │
│ "clientInfo": { │
│ "name": "claude-code", │
│ "version": "1.5.0" │
│ } │
│ }} │
│ │
│ Step 3: Server responds with capabilities │
│ Server ──► { "jsonrpc": "2.0", "id": 0, │
│ "result": { │
│ "protocolVersion": "2024-11-05", │
│ "capabilities": { │
│ "tools": { "listChanged": true }, │
│ "resources": { "subscribe": true }, │
│ "logging": {} │
│ }, │
│ "serverInfo": { │
│ "name": "my-mcp-server", │
│ "version": "0.1.0" │
│ } │
│ }} │
│ │
│ Step 4: Client sends initialized notification │
│ Client ──► { "jsonrpc": "2.0", "method": "notifications/initialized" } │
│ │
│ Step 5: Normal communication begins │
│ Client can now call tools, list resources, etc. │
└──────────────────────────────────────────────────────────────────┘
Complete Handshake JSON Examples
// Step 2: Client → Server (initialize request)
{
"jsonrpc": "2.0",
"id": 0,
"method": "initialize",
"params": {
"protocolVersion": "2024-11-05",
"capabilities": {
"roots": { "listChanged": true },
"sampling": {}
},
"clientInfo": {
"name": "claude-code",
"version": "1.5.0"
}
}
}
// Step 3: Server → Client (initialize response)
{
"jsonrpc": "2.0",
"id": 0,
"result": {
"protocolVersion": "2024-11-05",
"capabilities": {
"tools": { "listChanged": true },
"resources": {
"subscribe": true,
"listChanged": true
},
"logging": {}
},
"serverInfo": {
"name": "my-mcp-server",
"version": "0.1.0"
}
}
}
// Step 4: Client → Server (initialized notification — no id)
{
"jsonrpc": "2.0",
"method": "notifications/initialized"
}
Stderr for Logging
Stderr is the ONLY place for diagnostic output in STDIO transport:
import sys
import logging
# Configure logging to stderr — NEVER stdout
logging.basicConfig(
stream=sys.stderr,
level=logging.DEBUG,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
logger = logging.getLogger("my-mcp-server")
# stdout is used ONLY for JSON-RPC messages
# stderr is used ONLY for human-readable logs
logger.info("Server started") # goes to stderr ✓
print('{"jsonrpc": ...}') # goes to stdout ✓
logger.error("Connection failed") # goes to stderr ✓
print("Debug: processing...") # BREAKS PROTOCOL ✗
Process Lifecycle
Client starts → subprocess spawned → handshake → session active → client exits → subprocess killed
- Server lifetime is tied to client process
- No persistent daemon — fresh process every session
- Clean shutdown when client disconnects
Python Server Implementation (STDIO)
import asyncio
from mcp.server import Server
from mcp.server.stdio import stdio_server
from mcp.types import Tool, TextContent
server = Server("my-server")
@server.list_tools()
async def list_tools() -> list[Tool]:
    return [
        Tool(
            name="greet",
            description="Returns a greeting",
            inputSchema={
                "type": "object",
                "properties": {
                    "name": {"type": "string", "description": "Name to greet"}
                },
                "required": ["name"],
            },
        )
    ]
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list:
    if name == "greet":
        return [TextContent(type="text", text=f"Hello, {arguments['name']}!")]
    raise ValueError(f"Unknown tool: {name}")
async def main():
    # stdio_server wires the process's stdin/stdout to the read and write streams
    async with stdio_server() as (read_stream, write_stream):
        await server.run(read_stream, write_stream, server.create_initialization_options())
if __name__ == "__main__":
    asyncio.run(main())
Advantages and Limitations
| Advantages | Limitations |
|---|---|
| Zero network overhead | Only one client per server instance |
| Simple process lifecycle | Cannot run remotely |
| No authentication required | Restarts when client process ends |
| Easy to debug (logs to stderr) | Not suitable for shared/multi-user use |
| Works offline | No persistent state between sessions |
| OS-level process isolation | Scaling requires multiple processes |
Claude Desktop Configuration (STDIO)
{
"mcpServers": {
"my-server": {
"command": "python",
"args": ["/Users/alice/my-mcp-server/server.py"],
"env": {
"DATABASE_URL": "postgresql://localhost/mydb"
}
}
}
}
Key Exam Points
- stdin/stdout for JSON-RPC; stderr for logs only — mixing these corrupts the protocol
- Initialization: initialize request → initialize response → initialized notification (3 steps)
- The initialized notification (method notifications/initialized) is step 4 of the launch sequence: it has NO id (it’s a notification)
- Best for local, single-client, development use
- Server lifetime = client lifetime
3.3 Streamable HTTP Transport
How It Works
- Client sends HTTP POST requests containing JSON-RPC messages
- Server responds with either a single JSON body or a Server-Sent Events (SSE) stream; the SSE form carries streaming responses and notifications
- Sessions are managed via session tokens passed in HTTP headers
┌──────────────────────────────────────────────────────────────────┐
│ Streamable HTTP Transport Architecture │
│ │
│ Client Server (HTTP) │
│ │ │ │
│ │── POST /mcp (initialize) ────────►│ │
│ │ Content-Type: application/json │ │
│ │◄── HTTP 200 ──────────────────────│ │
│ │ Content-Type: text/event-stream │ │
│ │ Mcp-Session-Id: sess-abc123 │ │
│ │ │ │
│ │── POST /mcp (tools/call) ────────►│ │
│ │ Mcp-Session-Id: sess-abc123 │ │
│ │◄── SSE: progress notification ────│ streaming events │
│ │◄── SSE: progress notification ────│ │
│ │◄── SSE: result ───────────────────│ final result │
│ │ │ │
│ │── DELETE /mcp ───────────────────►│ end session │
│ │ Mcp-Session-Id: sess-abc123 │ │
└──────────────────────────────────────────────────────────────────┘
SSE Event Format
Server-Sent Events have a specific wire format — each field on its own line, events separated by blank lines:
event: message
data: {"jsonrpc":"2.0","id":1,"result":{"content":[{"type":"text","text":"done"}]}}
event: message
data: {"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"job-1","progress":75}}
event: message
data: {"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"job-1","progress":100}}
Rules:
- event: line: the event type (always message for MCP)
- data: line: the JSON-RPC message as a single JSON string
- Blank line (\n\n) terminates each event
- Multiple events can stream in one HTTP response
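The rules above can be sketched as a pair of helpers. The names `format_sse` and `parse_sse` are illustrative, and the parser assumes the whole response body is already buffered as a string; a production client would parse incrementally as bytes arrive.

```python
import json


def format_sse(message: dict, event: str = "message") -> str:
    """Serialize one JSON-RPC message as a single SSE event."""
    # json.dumps emits no raw newlines by default, so one data: line suffices
    return f"event: {event}\ndata: {json.dumps(message)}\n\n"


def parse_sse(stream: str) -> list[dict]:
    """Split a buffered SSE body into events and decode each data payload."""
    messages = []
    for block in stream.split("\n\n"):  # a blank line terminates each event
        data_lines = [
            line[len("data:"):].strip()
            for line in block.splitlines()
            if line.startswith("data:")
        ]
        if data_lines:
            messages.append(json.loads("\n".join(data_lines)))
    return messages
```

For example, concatenating `format_sse(...)` for a progress notification and a result, then passing the string to `parse_sse`, returns the two original dictionaries in order.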
Session Management
┌──────────────────────────────────────────────────────────────────┐
│ Session Lifecycle │
│ │
│ 1. Client sends POST /mcp with initialize request │
│ 2. Server creates session, returns Mcp-Session-Id header │
│ 3. Client stores session ID │
│ 4. ALL subsequent requests include: Mcp-Session-Id: <id> │
│ 5. Server looks up session state using the ID │
│ 6. Client ends session with DELETE /mcp + session ID │
│ (or session expires after inactivity timeout) │
└──────────────────────────────────────────────────────────────────┘
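The lifecycle above, including the inactivity timeout, might look like the following in-memory sketch. `SessionStore` and its injectable `clock` parameter are assumptions made for illustration; a multi-instance deployment would keep this state in an external store instead.

```python
import time
import uuid
from typing import Callable, Optional


class SessionStore:
    """In-memory sessions that expire after a period of inactivity."""

    def __init__(self, timeout_seconds: float,
                 clock: Callable[[], float] = time.monotonic):
        self.timeout = timeout_seconds
        self.clock = clock  # injectable so expiry can be tested without sleeping
        self._sessions: dict[str, dict] = {}

    def create(self) -> str:
        """Step 2: create a session and return the ID for Mcp-Session-Id."""
        session_id = str(uuid.uuid4())
        self._sessions[session_id] = {"state": {}, "last_seen": self.clock()}
        return session_id

    def get(self, session_id: str) -> Optional[dict]:
        """Step 5: look up a session; None means unknown or expired."""
        session = self._sessions.get(session_id)
        if session is None:
            return None
        if self.clock() - session["last_seen"] > self.timeout:
            del self._sessions[session_id]  # stale: clean up on access
            return None
        session["last_seen"] = self.clock()  # activity resets the timer
        return session

    def delete(self, session_id: str) -> bool:
        """Step 6: explicit termination; True if a session was removed."""
        return self._sessions.pop(session_id, None) is not None
```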
Request headers example:
POST /mcp HTTP/1.1
Host: api.myserver.com
Content-Type: application/json
Authorization: Bearer eyJhbGc...
Mcp-Session-Id: sess-abc123
Accept: application/json, text/event-stream

{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"search","arguments":{"q":"python"}}}
Authentication Options
| Method | Header Format | Use Case |
|---|---|---|
| Bearer tokens | Authorization: Bearer <token> | API-key style auth, most common |
| OAuth 2.0 | Authorization: Bearer <oauth-token> | User-delegated access |
| API keys | X-Api-Key: <key> | Simple server-to-server integration |
| mTLS | Client certificate in TLS handshake | High-security environments |
| Basic auth | Authorization: Basic <b64> | Internal/dev use only |
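As a rough illustration of the Bearer row in the table, the sketch below parses an Authorization header and compares the token in constant time. It assumes a single shared static token, which is a simplification; OAuth deployments would validate the token against the issuer instead. Both function names are hypothetical.

```python
import hmac
from typing import Optional


def extract_bearer_token(authorization: Optional[str]) -> Optional[str]:
    """Return the token from 'Bearer <token>', or None if malformed."""
    if not authorization:
        return None
    scheme, _, token = authorization.partition(" ")
    if scheme.lower() != "bearer" or not token.strip():
        return None
    return token.strip()


def is_authorized(authorization: Optional[str], expected_token: str) -> bool:
    """Check the header against an expected token (assumed static secret)."""
    token = extract_bearer_token(authorization)
    if token is None:
        return False
    # compare_digest avoids leaking how many leading characters matched
    return hmac.compare_digest(token.encode(), expected_token.encode())
```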
Python Server Implementation (HTTP + FastAPI)
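from fastapi import FastAPI, Request, Response
from fastapi.responses import StreamingResponse
import json
import uuid

app = FastAPI()
sessions: dict[str, dict] = {}


async def process_request(body: dict, session: dict) -> dict:
    # Placeholder dispatcher: replace with real JSON-RPC method handling
    return {"jsonrpc": "2.0", "id": body.get("id"), "result": {}}


@app.post("/mcp")
async def handle_mcp(request: Request):
    session_id = request.headers.get("Mcp-Session-Id")
    body = await request.json()
    is_initialize = body.get("method") == "initialize"

    if is_initialize:
        # New session: generate an ID and allocate its state
        session_id = str(uuid.uuid4())
        sessions[session_id] = {"state": {}}
    elif session_id is None:
        # Every non-initialize request must carry a session ID
        return Response(status_code=400)
    elif session_id not in sessions:
        # Unknown or expired session: the client must re-initialize
        return Response(status_code=404)

    session = sessions[session_id]

    async def event_stream():
        # Process the request and yield the result as one SSE event
        result = await process_request(body, session)
        yield f"event: message\ndata: {json.dumps(result)}\n\n"

    response = StreamingResponse(event_stream(), media_type="text/event-stream")
    if is_initialize:
        response.headers["Mcp-Session-Id"] = session_id
    return response


@app.delete("/mcp")
async def end_session(request: Request):
    # Client-initiated shutdown: drop the session state if it exists
    session_id = request.headers.get("Mcp-Session-Id")
    if session_id and session_id in sessions:
        del sessions[session_id]
    return Response(status_code=200)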
Connection Lifecycle
┌──────────────────────────────────────────────────────────────────┐
│ Streamable HTTP Connection Lifecycle │
│ │
│ ┌──────────┐ │
│ │ CONNECT │ POST /mcp initialize │
│ └────┬─────┘ │
│ │ │
│ ┌────▼─────┐ │
│ │ SESSION │ Mcp-Session-Id assigned, state stored │
│ │ ACTIVE │ │
│ └────┬─────┘ │
│ │ ← multiple POST requests with session ID │
│ │ ← each may return SSE stream of events │
│ │ │
│ ┌────▼──────────────────┐ │
│ │ DISCONNECT (one of): │ │
│ │ • DELETE /mcp │ client-initiated clean shutdown │
│ │ • Inactivity timeout │ server cleans up stale sessions │
│ │ • Network error │ triggers reconnection logic │
│ └───────────────────────┘ │
└──────────────────────────────────────────────────────────────────┘
Comparison: STDIO vs Streamable HTTP
| Dimension | STDIO | Streamable HTTP |
|---|---|---|
| Transport | Process stdin/stdout | HTTP POST + SSE |
| Clients | Single (the parent process) | Multiple concurrent |
| Network | None (local only) | Required |
| Authentication | OS process permissions | Bearer token / OAuth / mTLS |
| Scaling | One process per client | Load balanced across instances |
| Session state | In-process memory | HTTP session tokens + store |
| Best for | Local tools, Claude Desktop | Cloud servers, enterprise |
| Debugging | Easy (stderr logs) | Harder (distributed logs) |
| Reconnection | Restart process | Resume session with token |
| Protocol overhead | Near zero | HTTP framing per request |
Key Exam Points
- HTTP POST for requests; SSE for streaming responses and notifications
- Mcp-Session-Id header carries session state between requests
- SSE format: event: message\ndata: {json}\n\n
- Session ends with DELETE /mcp or inactivity timeout
- Supports multiple concurrent clients (unlike STDIO)
- Authentication via Authorization header (Bearer, OAuth, etc.)
3.4 Production Considerations
Stateless vs Stateful Servers
| Dimension | Stateless | Stateful |
|---|---|---|
| Session state | None — each request is independent | In-memory or Redis per session |
| Load balancing | Any LB works (round-robin, etc.) | Sticky sessions required |
| Horizontal scaling | Add instances freely | Instances are not interchangeable; each session is pinned to one |
| Failure recovery | Any instance handles retry | Losing an instance loses session |
| Complexity | Low — no state synchronization | High — state management required |
| Use when | Tools with no conversation memory | Tools that maintain context or cache |
| Example | File reader, code formatter | Long-running analysis with history |
Scaling Patterns
┌──────────────────────────────────────────────────────────────────┐
│ Horizontal Scaling Patterns │
│ │
│ STATELESS (preferred): │
│ │
│ Client A ─┐ │
│ Client B ─┤── Load Balancer ─┬─ Instance A │
│ Client C ─┘ (round robin) ├─ Instance B │
│ └─ Instance C │
│ Any request can go to any instance — no routing constraints. │
│ │
│ STATEFUL (sticky sessions): │
│ │
│ Client A ─┐ │
│ Client B ─┤── Load Balancer ─┬─ Instance A ← Client A always │
│ Client C ─┘ (session-aware) ├─ Instance B ← Client B always │
│ └─ Instance C ← Client C always │
│ Session hash routes each client to its designated instance. │
└──────────────────────────────────────────────────────────────────┘
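The "session hash" routing in the stateful pattern can be sketched as below. This is a simple modulo scheme chosen for illustration, not a description of any particular load balancer; `pick_instance` and the instance names are hypothetical.

```python
import hashlib


def pick_instance(session_id: str, instances: list[str]) -> str:
    """Map a session ID to one instance using a stable hash."""
    if not instances:
        raise ValueError("no instances available")
    # sha256 is stable across processes, unlike Python's built-in hash()
    digest = hashlib.sha256(session_id.encode("utf-8")).digest()
    index = int.from_bytes(digest[:8], "big") % len(instances)
    return instances[index]
```

The same session ID always lands on the same instance while the instance list is unchanged. Adding or removing an instance remaps most sessions under this scheme, which is one reason consistent hashing or an external session store is often preferred.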
Health Check Endpoint
Every production MCP server should expose a /health endpoint:
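import time

START_TIME = time.monotonic()


def get_uptime() -> float:
    """Seconds since this process started."""
    return time.monotonic() - START_TIME


@app.get("/health")
async def health_check():
    # app and sessions are the objects from the server implementation above
    return {
        "status": "healthy",  # healthy | degraded | unhealthy
        "uptime_seconds": get_uptime(),
        "active_sessions": len(sessions),
        "version": "1.2.3",
        # Placeholder results; a real server would probe each dependency
        "checks": {
            "database": "ok",
            "cache": "ok",
            "external_api": "ok"
        }
    }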
Load balancers ping /health every 5–30 seconds. Unhealthy instances are removed from rotation.
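One way to derive the top-level status from the individual checks is sketched below. The rule that a failing critical dependency means unhealthy while any other failure means degraded is an assumption for illustration; the guide does not define how the three states are computed.

```python
def overall_status(checks: dict[str, str], critical: set[str]) -> str:
    """Collapse per-dependency results into healthy, degraded, or unhealthy."""
    failed = {name for name, state in checks.items() if state != "ok"}
    if failed & critical:
        return "unhealthy"  # a dependency the server cannot work without
    if failed:
        return "degraded"   # still serving, but with reduced functionality
    return "healthy"
```

A handler could return HTTP 503 when the status is unhealthy so that the load balancer removes the instance from rotation.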
Error Recovery and Reconnection
import asyncio
import random


async def connect_with_backoff(client, server_url: str, max_attempts: int = 5):
    """Exponential backoff reconnection strategy.

    client is a placeholder for any object with an async connect(url) method.
    """
    base_delay = 1.0   # seconds
    max_delay = 60.0   # cap at 60 seconds
    for attempt in range(max_attempts):
        try:
            return await client.connect(server_url)
        except ConnectionError as e:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            # Exponential backoff with ±10% jitter
            delay = min(base_delay * (2 ** attempt), max_delay)
            total_delay = delay + random.uniform(-0.1, 0.1) * delay
            print(f"Connection failed (attempt {attempt+1}): {e}")
            print(f"Retrying in {total_delay:.1f}s...")
            await asyncio.sleep(total_delay)
Reconnection Strategy
Client detects disconnect:
1. SSE stream closes unexpectedly
2. HTTP request times out
3. Network error received
Response:
1. Detect disconnect (event)
2. Wait: 1s → 2s → 4s → 8s → 16s (exponential backoff)
3. Add jitter (±10%) to avoid thundering herd
4. Re-establish session:
   - If session token still valid: resume with same Mcp-Session-Id
   - If session expired: new initialize handshake
5. Re-send any in-flight requests that had no confirmed response
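Steps 2 and 3 of the response can be made concrete by computing the wait schedule on its own. This is a sketch; `backoff_delays` and its parameters are illustrative names, and the 60-second cap is taken from the reconnection code earlier in this section.

```python
import random
from typing import Optional


def backoff_delays(attempts: int, base: float = 1.0, cap: float = 60.0,
                   jitter_fraction: float = 0.1,
                   rng: Optional[random.Random] = None) -> list[float]:
    """Return the wait before each retry: base * 2^n, capped, with jitter."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(attempts):
        delay = min(base * (2 ** attempt), cap)
        # Symmetric jitter spreads clients out so they do not retry in lockstep
        jitter = rng.uniform(-jitter_fraction, jitter_fraction) * delay
        delays.append(delay + jitter)
    return delays
```

With `jitter_fraction=0` and five attempts the schedule is exactly 1, 2, 4, 8, 16 seconds; with the default 10% jitter each value moves by at most a tenth of itself.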
Rate Limiting
from collections import defaultdict
from time import time


class RateLimiter:
    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window = window_seconds
        self.requests: dict[str, list[float]] = defaultdict(list)

    def is_allowed(self, client_id: str) -> bool:
        now = time()
        window_start = now - self.window
        # Remove expired timestamps
        self.requests[client_id] = [
            ts for ts in self.requests[client_id] if ts > window_start
        ]
        if len(self.requests[client_id]) >= self.max_requests:
            return False
        self.requests[client_id].append(now)
        return True


# Usage: 100 requests per 60-second window per session
limiter = RateLimiter(max_requests=100, window_seconds=60)
Security Checklist for Production
- TLS enforced on all connections (HTTPS only, redirect HTTP → HTTPS)
- Authentication on every request (validate token before processing any message)
- Input validation on all tool arguments (type checking, length limits, injection prevention)
- Rate limiting per client/session (prevent abuse and runaway costs)
- Audit logging of all tool calls (who called what, when, with what arguments)
- Roots boundaries enforced server-side with real path resolution (not just client-side)
- Secrets in environment variables or secret manager, never hardcoded
- Session token expiry and rotation policy
- Error messages sanitized (no stack traces or internal details exposed to clients)
- Dependency scanning and regular updates
Deployment Checklist
Before deploying to production:
[ ] Health endpoint returns structured JSON
[ ] Graceful shutdown: drain active sessions before stopping
[ ] Structured JSON logging (not plaintext — feeds into log aggregation)
[ ] Metrics exposed: request count, latency, error rate, active sessions
[ ] Distributed tracing headers forwarded (X-Request-ID, trace-id)
[ ] Load balancer health check configured (GET /health)
[ ] Session store external (Redis/Postgres) for stateful servers
[ ] Auto-scaling policy defined (CPU/memory/session thresholds)
[ ] Circuit breaker for downstream dependencies
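The structured-logging item in the checklist could be approached with the standard logging module, as in the sketch below. The field names and the `request_id` extra are assumptions for illustration; real deployments usually follow whatever schema their log aggregator expects.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        # Optional field supplied via logger.info(..., extra={"request_id": ...})
        if hasattr(record, "request_id"):
            entry["request_id"] = record.request_id
        return json.dumps(entry)


def build_logger(stream) -> logging.Logger:
    """Create a logger that writes JSON lines to the given stream."""
    logger = logging.getLogger("mcp.server")
    logger.handlers.clear()   # avoid duplicate handlers on repeated calls
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep output out of the root logger
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    return logger
```

For a STDIO server the stream should be sys.stderr, since stdout is reserved for the protocol.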
Module 4 — Assessment & Conclusion
Key Takeaways by Topic
Sampling
- Server requests LLM completions through the client via sampling/createMessage
- Client controls model, token budget, and human-in-the-loop approval
- Flow: server → client → LLM → client → server
- Model preferences are hints only; client has final authority
Notifications
- One-way messages, no id field, no response expected
- Progress tokens (_meta.progressToken) link notifications to originating requests
- Both sides can send notifications (server and client)
- Log severity: 8 syslog levels, debug = least severe, emergency = most severe
Roots
- Client-declared filesystem boundaries for server access
- URI format: file:///absolute/path (three slashes)
- Dynamic: can change during session with notifications/roots/list_changed
- After change notification, server must call roots/list to get updated list
JSON-RPC 2.0
- Three types: Request (has id + method), Response (has id + result/error), Notification (no id)
- Bidirectional: both client and server can initiate requests
- Standard error codes: -32700 (parse error) and -32600 through -32603 (invalid request through internal error)
- Response id must match request id exactly
STDIO Transport
- stdin/stdout for JSON-RPC; stderr for logs only
- Initialization: initialize request → response → initialized notification (3 steps, 3 messages)
- Best for local, single-client, development use
- Server lifetime tied to client process
Streamable HTTP Transport
- HTTP POST for requests; SSE (text/event-stream) for streaming responses
- Mcp-Session-Id header manages session state
- Best for remote, multi-client, production use
- Session ends with DELETE /mcp or timeout
Production Operations
- Stateless servers preferred: horizontal scaling without routing constraints
- Stateful servers require sticky sessions (session-aware load balancing)
- Health endpoint (/health) required for load balancer integration
- Exponential backoff with jitter for reconnection
Certification Prep Checklist
Sampling
- Method name: sampling/createMessage (not just “sampling”)
- Flow direction: server → client → LLM → client → server
- Human-in-the-loop is a client-side feature
- modelPreferences.hints are hints only; client decides
- stopReason values: end_turn, max_tokens, stop_sequence
Notifications
- No id field = notification (no response expected)
- progressToken lives in _meta of the original request
- Both sides can send notifications
- 8 log severity levels: debug, info, notice, warning, error, critical, alert, emergency
- notifications/cancelled is client → server
Roots
- Declared by CLIENT not server
- URI format: file:///absolute/path (3 slashes for absolute Unix paths)
- After roots/list_changed, server calls roots/list
- Use os.path.realpath() to prevent symlink traversal
JSON-RPC 2.0
- All messages have "jsonrpc": "2.0"
- Request: id + method + optional params
- Response: id + result OR error (never both)
- Notification: method + optional params, NO id
- Error codes: -32700, -32600, -32601, -32602, -32603
STDIO
- stdin/stdout = protocol; stderr = logs only
- 3-step init: initialize (request) → initialize (response) → initialized (notification)
- One client per server instance
Streamable HTTP
- HTTP POST requests + SSE responses
- Mcp-Session-Id header for session tracking
- SSE format: event: message\ndata: {json}\n\n
- Session ends with DELETE /mcp
Production
- Stateless = any load balancer; stateful = sticky sessions required
- Exponential backoff for reconnection (1s, 2s, 4s, 8s…)
- /health endpoint required
- TLS + auth + rate limiting + audit logging for production
Practice Exercises
Exercise 1 — Identify the Message Type
Classify each message as Request, Response, or Notification:
A: {"jsonrpc":"2.0","method":"notifications/progress","params":{"progress":50}}
B: {"jsonrpc":"2.0","id":3,"result":{"content":[]}}
C: {"jsonrpc":"2.0","id":5,"method":"tools/call","params":{}}
D: {"jsonrpc":"2.0","id":5,"error":{"code":-32602,"message":"Invalid params"}}
E: {"jsonrpc":"2.0","method":"notifications/resources/updated","params":{"uri":"file:///a"}}
Answers: A=Notification, B=Response(success), C=Request, D=Response(error), E=Notification
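The classification rule behind these answers fits in a few lines. The sketch below uses only the field-presence rules from this guide (id, method, result, error); the function name `classify` is illustrative.

```python
def classify(message: dict) -> str:
    """Classify a JSON-RPC 2.0 message by which fields are present."""
    has_id = "id" in message
    if "method" in message:
        # A method with an id expects a reply; without one it is fire-and-forget
        return "Request" if has_id else "Notification"
    # A response carries exactly one of result or error, never both
    if has_id and ("result" in message) != ("error" in message):
        return "Response(error)" if "error" in message else "Response(success)"
    return "Invalid"
```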
Exercise 2 — Sampling Security
A server’s sampling request specifies "modelPreferences": {"hints": [{"name": "claude-opus"}]}. The client’s policy only allows claude-sonnet. What happens?
Answer: The client uses claude-sonnet and ignores the hint. Model preferences are hints only.
Exercise 3 — Root Validation
A server receives path /Users/alice/project/src/../../../../etc/passwd. Roots declare only file:///Users/alice/project/src. Should the server allow access?
Answer: No. After os.path.realpath() resolves the traversal, the real path is /etc/passwd (or /private/etc/passwd on macOS, where /etc is a symlink), which is outside the declared root.
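A server-side check along these lines is sketched below. It assumes POSIX paths and file:// root URIs; `is_within_roots` and `root_uri_to_path` are hypothetical helper names, not part of any MCP SDK.

```python
import os
from urllib.parse import unquote, urlparse


def root_uri_to_path(root_uri: str) -> str:
    """Convert a file:// root URI to a filesystem path (POSIX assumed)."""
    parsed = urlparse(root_uri)
    if parsed.scheme != "file":
        raise ValueError(f"unsupported root scheme: {parsed.scheme!r}")
    return unquote(parsed.path)


def is_within_roots(path: str, root_uris: list[str]) -> bool:
    """True if the fully resolved path lies inside one of the declared roots."""
    real = os.path.realpath(path)  # collapses '..' and follows symlinks
    for uri in root_uris:
        root = os.path.realpath(root_uri_to_path(uri))
        # commonpath compares whole components, so /src-old never matches /src
        if os.path.commonpath([real, root]) == root:
            return True
    return False
```

Comparing with commonpath rather than a string prefix avoids accepting sibling directories whose names merely start with the root's name.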
Exercise 4 — Transport Choice
Match each scenario to the correct transport:
- Multi-tenant SaaS tool serving 500 concurrent enterprise users
- Claude Desktop plugin for local file analysis
- CI/CD integration running on build agents
Answers: HTTP/Stateless, STDIO, HTTP/Stateless
Quick Reference
JSON-RPC Message Templates
// Request (expects response)
{"jsonrpc":"2.0","id":1,"method":"METHOD","params":{}}
// Success Response
{"jsonrpc":"2.0","id":1,"result":{}}
// Error Response
{"jsonrpc":"2.0","id":1,"error":{"code":-32602,"message":"Invalid params","data":{}}}
// Notification (no response)
{"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"t","progress":50}}
Transport Decision Tree
Is the server running locally on the same machine?
  Yes ──► STDIO
          Simple setup, no auth needed, Claude Desktop compatible
  No ──► Streamable HTTP
          Does it need stateful sessions?
            No ──► Stateless HTTP (any load balancer, prefer this)
            Yes ──► Stateful HTTP (sticky sessions, more complex)
Sampling Flow
Server ──sampling/createMessage──► Client ──messages.create──► LLM
Server ◄──sampling result────────── Client ◄──response────────── LLM
Error Code Reference
| Code | Meaning | Example Trigger |
|---|---|---|
| -32700 | Parse error | {"broken json |
| -32600 | Invalid request | Missing jsonrpc field |
| -32601 | Method not found | "method": "tools/unknownMethod" |
| -32602 | Invalid params | Required param missing or wrong type |
| -32603 | Internal error | Unhandled exception in server |
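The codes in this table can be kept in one place with a small enum and a response builder. This is a sketch; `JsonRpcError` and `error_response` are illustrative names rather than SDK types.

```python
from enum import IntEnum
from typing import Any, Optional, Union


class JsonRpcError(IntEnum):
    """Standard JSON-RPC 2.0 error codes from the table above."""
    PARSE_ERROR = -32700
    INVALID_REQUEST = -32600
    METHOD_NOT_FOUND = -32601
    INVALID_PARAMS = -32602
    INTERNAL_ERROR = -32603


def error_response(request_id: Optional[Union[int, str]], code: JsonRpcError,
                   message: str, data: Any = None) -> dict:
    """Build an error response; the id echoes the request's id.

    Pass request_id=None when the id could not be read (e.g. a parse error).
    """
    error: dict = {"code": int(code), "message": message}
    if data is not None:
        error["data"] = data  # optional extra detail, omitted when absent
    return {"jsonrpc": "2.0", "id": request_id, "error": error}
```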
MCP Initialization Sequence
Client Server
│── initialize (request, id:0) ────►│
│ {protocolVersion, capabilities} │
│ │
│◄── initialize (response, id:0) ───│
│ {protocolVersion, capabilities}│
│ │
│── initialized (notification) ─────►│
│ (no id — this is a notification) │
│ │
│ [session now active] │
Notification Reference Card
| Notification | Direction | Trigger |
|---|---|---|
| notifications/progress | Server→Client | Long-running op update |
| notifications/message | Server→Client | Log message |
| notifications/resources/updated | Server→Client | Resource content changed |
| notifications/resources/list_changed | Server→Client | Resource list changed |
| notifications/tools/list_changed | Server→Client | Tool list changed |
| notifications/prompts/list_changed | Server→Client | Prompt list changed |
| notifications/roots/list_changed | Client→Server | Roots changed (user action) |
| notifications/cancelled | Client→Server | Request cancelled |
Production Readiness Checklist
Security: [ ] TLS [ ] Auth [ ] Rate limit [ ] Input validation [ ] Audit log
Operations: [ ] /health [ ] Structured logs [ ] Metrics [ ] Graceful shutdown
Scaling: [ ] Stateless preferred [ ] Sticky sessions if stateful [ ] Backoff reconnect