
Building Model Context Protocol Servers: A Deep Dive

What I learned building production MCP servers in Python: the architecture, caching, and security patterns shared by gopher-mcp and openzim-mcp.

After more than a decade architecting distributed systems across enterprise environments, I see the Model Context Protocol as a paradigm shift that addresses fundamental challenges in AI tooling infrastructure. While developing production-grade MCP servers, including gopher-mcp and openzim-mcp, I’ve identified architectural patterns and implementation strategies that show how MCP could change the way AI systems interact with external resources.

Update (June 2025): I’ve split this comprehensive guide into two focused articles for better readability.

Understanding the Model Context Protocol Architecture

The Model Context Protocol addresses a critical gap in AI system architecture: the secure, standardized integration of external resources without compromising system integrity or performance. This protocol establishes a formal contract between AI models and external data sources, eliminating the ad-hoc integration patterns that have plagued enterprise AI deployments.

MCP functions as an abstraction layer that enables AI models to interact with heterogeneous external resources—from legacy protocol implementations to modern API endpoints—through a unified interface. This architectural approach reflects decades of distributed systems engineering principles applied to the unique challenges of AI tooling.

Strategic Advantages of MCP Implementation

  • Zero-Trust Security Model: Implements capability-based security with explicit permission boundaries, eliminating the attack vectors inherent in traditional plugin architectures
  • Protocol Standardization: Establishes consistent interaction patterns that reduce integration complexity and maintenance overhead across diverse resource types
  • Horizontal Scalability: Designed for extensibility without architectural debt, enabling rapid capability expansion without system redesign
  • Performance Optimization: Native support for caching, connection pooling, and resource lifecycle management that scales with enterprise workloads

Visualizing the 'Protocol Abstraction Layer' pattern, showing how the system separates high-level requests from low-level resource handling.

Architectural Patterns for Production MCP Systems

Through the implementation of multiple production-grade MCP servers, several critical architectural patterns have emerged that address scalability, maintainability, and operational concerns. These patterns reflect established principles from distributed systems engineering, adapted for the unique requirements of AI resource integration:

1. Resource-Centric Design

The pattern looks like this—Pydantic models for the data contract, a Protocol for the behavior:

from typing import Protocol

from pydantic import BaseModel


class Resource(BaseModel):
    uri: str
    name: str
    description: str | None = None
    mime_type: str | None = None


class ResourceProvider(Protocol):
    async def list_resources(self) -> list[Resource]: ...
    async def read_resource(self, uri: str) -> bytes: ...

This abstraction implements the Strategy pattern at the infrastructure level, enabling runtime backend substitution without affecting core business logic. The separation of concerns between resource discovery and access provides the foundation for implementing sophisticated caching strategies, load balancing, and failover mechanisms essential for production deployments.

2. Protocol Abstraction Layer

The gopher-mcp implementation required supporting multiple protocol families (Gopher and Gemini), presenting an opportunity to demonstrate protocol abstraction at scale. Rather than implementing protocol-specific handlers in isolation, a unified abstraction layer enables consistent behavior across diverse protocol implementations. Both clients expose the same async surface and inherit shared caching behavior, and each is registered as a FastMCP tool (condensed from server.py and the client modules):

class GopherClient(TTLCacheMixin[GopherFetchResponse]):
    """Async Gopher protocol client with caching and safety features."""

    async def fetch(self, url: str) -> GopherFetchResponse: ...


class GeminiClient(TTLCacheMixin[GeminiFetchResponse]):
    """Async Gemini protocol client with TLS, caching and safety features."""

    async def fetch(self, url: str) -> GeminiFetchResponse: ...


@mcp.tool(title="Fetch Gopher resource")
async def gopher_fetch(url: str) -> dict[str, Any]:
    client = await client_manager.get_gopher_client()
    response = await client.fetch(url)
    return response.model_dump()

This architectural approach demonstrates the Open/Closed Principle in practice—the system remains open for extension while closed for modification. Protocol addition becomes a matter of implementing the same client interface rather than core system modification, ensuring system stability while enabling rapid capability expansion.
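
One way to picture "extension without modification" is a registry that dispatches on URL scheme. This is a sketch under assumptions rather than how gopher-mcp routes requests (the excerpt above registers one tool per protocol); `ProtocolClient`, `EchoClient`, and `ClientRegistry` are hypothetical names.

```python
import asyncio
from typing import Protocol
from urllib.parse import urlsplit


class ProtocolClient(Protocol):
    async def fetch(self, url: str) -> dict[str, str]: ...


class EchoClient:
    """Hypothetical stand-in for a real protocol client."""

    def __init__(self, label: str) -> None:
        self.label = label

    async def fetch(self, url: str) -> dict[str, str]:
        return {"protocol": self.label, "url": url}


class ClientRegistry:
    """Maps URL schemes to clients; a new protocol registers itself, nothing else changes."""

    def __init__(self) -> None:
        self._clients: dict[str, ProtocolClient] = {}

    def register(self, scheme: str, client: ProtocolClient) -> None:
        self._clients[scheme.lower()] = client

    async def fetch(self, url: str) -> dict[str, str]:
        scheme = urlsplit(url).scheme.lower()
        try:
            client = self._clients[scheme]
        except KeyError:
            raise ValueError(f"Unsupported scheme: {scheme!r}") from None
        return await client.fetch(url)
```

Adding a third protocol in this sketch is a single `register` call; the dispatch code in `ClientRegistry.fetch` stays untouched.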

3. Async-First Architecture

Production MCP servers must handle concurrent request loads while maintaining sub-millisecond response times for cached resources. Blocking I/O operations represent a fundamental scalability bottleneck that can cascade through the entire system. Python’s asyncio event loop provides the foundation here: every I/O path in gopher-mcp is async end to end. The cache methods themselves are synchronous and never await, so the event loop cannot switch to another task in the middle of a cache read or write, and the cache layer needs no locking at all. Both protocol clients inherit the same TTL + LRU cache behavior from a shared mixin (condensed from cache.py):

import time
from collections import OrderedDict
from typing import Generic, TypeVar

V = TypeVar("V")


class TTLCacheMixin(Generic[V]):
    """Shared TTL + LRU cache behavior for the Gopher and Gemini clients."""

    _cache: "OrderedDict[str, _BaseCacheEntry[V]]"

    def _get_cached_response(self, url: str) -> V | None:
        entry = self._cache.get(url)
        if entry is None:
            return None
        if entry.is_expired(time.time()):
            del self._cache[url]
            return None
        self._cache.move_to_end(url)  # mark as most recently used
        return entry.value

    def _cache_response(self, url: str, response: V) -> None:
        if len(self._cache) >= self.max_cache_entries:
            self._cache.popitem(last=False)  # evict the LRU entry
        self._cache[url] = self._cache_entry_cls(
            key=url,
            value=response,
            timestamp=time.time(),
            ttl=self.cache_ttl_seconds,
        )

Abstract representation of searching within compressed data structures, relevant to the OpenZIM case study.

Case Study: OpenZIM MCP Server Architecture

The openzim-mcp implementation addresses the complex challenge of providing sub-second search capabilities across compressed knowledge bases containing millions of articles. This represents a classic systems engineering problem: optimizing for both storage efficiency and query performance while maintaining memory constraints suitable for edge deployment scenarios.

ZIM File Handling

The fundamental challenge involves implementing efficient search algorithms over compressed data structures without incurring the computational overhead of full decompression. This requires sophisticated indexing strategies that balance memory utilization against query performance—a problem domain that intersects information retrieval, data compression theory, and systems optimization. The ZIM format addresses this by shipping an embedded Xapian full-text index inside the archive, which libzim exposes directly—so the server’s job is orchestration rather than index construction. The core search path (condensed from openzim-mcp’s zim/search.py):

from typing import Any

from libzim.reader import Archive
from libzim.search import Query, Searcher


class _SearchMixin:
    def _perform_search(
        self, archive: Archive, query: str, limit: int, offset: int
    ) -> dict[str, Any]:
        searcher = Searcher(archive)
        search = searcher.search(Query().set_query(query))
        total_results = search.getEstimatedMatches()

        results = []
        for entry_id in search.getResults(offset, limit):
            entry = archive.get_entry_by_path(entry_id)
            results.append({"path": entry_id, "title": entry.title})

        return {"query": query, "total": total_results, "results": results}

Performance Tricks I Discovered

The optimization strategy implements several critical performance patterns:

  1. Demand-Driven Resource Loading: Implements lazy evaluation patterns to minimize memory footprint and initialization overhead
  2. Inverted Index Architecture: Leverages the Xapian full-text index embedded in ZIM archives—exposed through libzim—for fast search across massive document collections without building a separate index
  3. Memory-Mapped I/O: Delegates page cache management to the kernel, enabling efficient memory utilization without explicit cache implementation
  4. Result Caching: Caches paginated search results to amortize expensive query costs across request lifecycles
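
Items 1 and 4 in the list above can be sketched together without libzim. In this illustration the expensive backend is opened on first use and each result page is memoized by `(query, limit, offset)`. `LazySearchService` and its `open_backend` parameter are hypothetical, and the page cache is left unbounded for brevity; a production cache would need a size limit and expiry.

```python
from functools import cached_property
from typing import Callable


class LazySearchService:
    """Opens the expensive backend on first use and memoizes result pages."""

    def __init__(self, open_backend: Callable[[], Callable[[str], list[str]]]) -> None:
        self._open_backend = open_backend
        self._pages: dict[tuple[str, int, int], list[str]] = {}
        self.backend_opens = 0    # counters exist only to make the behavior observable
        self.backend_queries = 0

    @cached_property
    def _search_fn(self) -> Callable[[str], list[str]]:
        # Runs once, on the first search; later accesses reuse the stored result.
        self.backend_opens += 1
        return self._open_backend()

    def search(self, query: str, limit: int, offset: int) -> list[str]:
        key = (query, limit, offset)
        if key not in self._pages:
            self.backend_queries += 1
            hits = self._search_fn(query)
            self._pages[key] = hits[offset:offset + limit]
        return self._pages[key]
```

Constructing the service costs nothing until the first query arrives, and repeating a page request never reaches the backend again.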

A visual metaphor for the Gopher MCP server: wrapping 1990s internet protocols in modern server architecture.

Case Study: Gopher MCP Server Implementation

The gopher-mcp server demonstrates how legacy protocol implementations can provide valuable insights into minimalist system design. The Gopher protocol’s simplicity—predating the complexity layers that characterize modern web protocols—offers architectural lessons about the relationship between protocol complexity and system reliability.

Protocol Implementation

The transport layer (condensed from gopher_transport.py) reflects the protocol’s simplicity—a TCP connection, a selector line, and a bounded read:

import asyncio


async def fetch_gopher(
    host: str,
    port: int,
    selector: str,
    *,
    search: str | None = None,
    max_bytes: int = DEFAULT_MAX_RESPONSE_SIZE,
    timeout: float = DEFAULT_TIMEOUT_SECONDS,
) -> bytes:
    async def _io() -> bytes:
        reader, writer = await asyncio.open_connection(host, port)
        try:
            # "selector[<TAB>search]<CR><LF>", UTF-8 encoded
            writer.write(build_request(selector, search))
            await writer.drain()

            chunks: list[bytes] = []
            total = 0
            while True:
                chunk = await reader.read(min(READ_CHUNK, max_bytes - total + 1))
                if not chunk:
                    break
                total += len(chunk)
                if total > max_bytes:
                    raise GopherProtocolError("Response exceeded size limit")
                chunks.append(chunk)
            return b"".join(chunks)
        finally:
            writer.close()
            await writer.wait_closed()

    # One deadline covers connect, send, and read
    return await asyncio.wait_for(_io(), timeout=timeout)

Content Type Detection

Gopher uses a simple but effective type system. In gopher-mcp, the mapping lives in mime.py as plain data:

type_mappings = {
    "0": "text/plain",                # text file
    "1": "text/gopher-menu",          # menu / directory
    "4": "application/mac-binhex40",  # BinHex file
    "7": "text/gopher-menu",          # search server
    "9": "application/octet-stream",  # binary file
    "g": "image/gif",
    "I": "image/jpeg",
    # ... more mappings
}


def guess_mime_type(gopher_type: str, selector: str = "") -> str:
    """Map a Gopher item type to a MIME type.

    The full implementation also refines ambiguous types using the
    selector's file extension (.html, .png, .pdf, ...).
    """
    return type_mappings.get(gopher_type, "application/octet-stream")
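
The docstring mentions refining ambiguous types by file extension. Here is one way that refinement could work, as a sketch only: the tables are trimmed-down illustrations, and `EXTENSION_OVERRIDES` and `AMBIGUOUS_TYPES` are hypothetical names whose contents are my assumptions, not the actual data in mime.py.

```python
from pathlib import PurePosixPath

# Hypothetical tables for illustration; the real mappings live in mime.py.
TYPE_MAPPINGS = {
    "0": "text/plain",
    "1": "text/gopher-menu",
    "9": "application/octet-stream",
    "g": "image/gif",
    "I": "image/jpeg",
}
EXTENSION_OVERRIDES = {
    ".html": "text/html",
    ".png": "image/png",
    ".pdf": "application/pdf",
}
AMBIGUOUS_TYPES = {"9", "I"}  # assumed: item types whose payload format varies


def guess_mime_type(gopher_type: str, selector: str = "") -> str:
    """Map an item type to a MIME type, refining ambiguous types by extension."""
    base = TYPE_MAPPINGS.get(gopher_type, "application/octet-stream")
    if gopher_type in AMBIGUOUS_TYPES:
        suffix = PurePosixPath(selector).suffix.lower()
        return EXTENSION_OVERRIDES.get(suffix, base)
    return base
```

Restricting the override to a small set of item types keeps a menu whose selector happens to end in `.html` from being misreported as an HTML document.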

Best Practices for MCP Server Development

1. Error Handling

Implement comprehensive error handling with context. gopher-mcp draws a line between internal exceptions and what crosses the tool boundary:

from typing import Literal

from pydantic import BaseModel


class GopherProtocolError(Exception):
    """Raised internally when a Gopher request cannot be completed."""


class ErrorResult(BaseModel):
    """Structured error payload returned to the model."""

    kind: Literal["error"] = "error"
    error: dict[str, str]

The split matters: transport failures raise GopherProtocolError inside the client, but the tool boundary converts everything into a structured ErrorResult. An exception that escapes an MCP tool is an opaque failure to the calling model; a structured error is something it can reason about and recover from.
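
The conversion at the tool boundary can be expressed as a decorator. The sketch below uses plain dicts instead of the Pydantic model so it runs without dependencies; `structured_errors`, `fetch_tool`, and the `code`/`message` keys are hypothetical choices for illustration, not the payload shape gopher-mcp actually emits.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable


class GopherProtocolError(Exception):
    """Raised internally when a Gopher request cannot be completed."""


def structured_errors(
    tool: Callable[..., Awaitable[dict[str, Any]]],
) -> Callable[..., Awaitable[dict[str, Any]]]:
    """Convert exceptions escaping a tool into an error payload (hypothetical helper)."""

    @functools.wraps(tool)
    async def wrapper(*args: Any, **kwargs: Any) -> dict[str, Any]:
        try:
            return await tool(*args, **kwargs)
        except GopherProtocolError as exc:
            # Expected failure: safe to pass the message through to the model.
            return {"kind": "error", "error": {"code": "protocol_error", "message": str(exc)}}
        except Exception as exc:
            # Unexpected failure: report only the exception type, not its details.
            return {"kind": "error", "error": {"code": "internal_error", "message": type(exc).__name__}}

    return wrapper


@structured_errors
async def fetch_tool(url: str) -> dict[str, Any]:
    if not url.startswith("gopher://"):
        raise GopherProtocolError(f"Unsupported URL: {url}")
    return {"kind": "text", "text": "hello"}
```

Catching `Exception` rather than `BaseException` leaves task cancellation alone, so a cancelled tool call still unwinds normally.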

2. Configuration Management

Use structured configuration with validation. gopher-mcp builds its config from pydantic-settings models keyed by environment prefix:

from pydantic_settings import BaseSettings, SettingsConfigDict


class GopherConfig(BaseSettings):
    """Gopher client settings, read from GOPHER_* environment variables."""

    model_config = SettingsConfigDict(env_prefix="GOPHER_", env_file=".env")

    max_response_size: int = 1024 * 1024  # 1MB
    timeout_seconds: float = 30.0
    cache_enabled: bool = True
    cache_ttl_seconds: int = 300
    max_cache_entries: int = 1000
    allowed_hosts: list[str] | None = None  # None = allow all
    allow_local_hosts: bool = False  # SSRF protection

Each surface gets its own settings class—GOPHER_*, GEMINI_*, and GOPHER_MCP_* for the server itself—with field validators that parse comma-separated environment values into typed lists.

3. Testing Strategy

Implement comprehensive testing including integration tests. With pytest and pytest-asyncio, the shape looks like this:

import pytest


def test_gopher_url_parsing() -> None:
    parsed = parse_gopher_url("gopher://example.com/0/about.txt")
    assert parsed.host == "example.com"
    assert parsed.port == 70
    assert parsed.gopher_type == "0"


@pytest.mark.asyncio
async def test_fetch_returns_menu_result() -> None:
    client = GopherClient(cache_enabled=False)
    result = await client.fetch("gopher://gopher.example/")
    assert result.kind == "menu"

Performance Considerations

Memory Management

  • Use streaming for large resources
  • Implement proper caching strategies
  • Monitor memory usage in production

Concurrency

  • Design for high concurrency from the start
  • Use appropriate synchronization primitives
  • Consider backpressure mechanisms
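
As a small illustration of the backpressure point, an `asyncio.Semaphore` can cap how many fetches are in flight at once. This is a generic sketch, not code from either server; `fetch_all_bounded` and its `max_in_flight` parameter are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Iterable


async def fetch_all_bounded(
    urls: Iterable[str],
    fetch: Callable[[str], Awaitable[str]],
    max_in_flight: int = 4,
) -> list[str]:
    """Run fetch over urls with at most max_in_flight calls active at once."""
    gate = asyncio.Semaphore(max_in_flight)

    async def guarded(url: str) -> str:
        async with gate:  # callers beyond the limit wait here
            return await fetch(url)

    # gather preserves input order in its result list.
    return await asyncio.gather(*(guarded(u) for u in urls))
```

The limit bounds memory and socket usage under bursty load, at the cost of queueing excess requests instead of starting them immediately.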

Network Efficiency

  • Implement connection pooling
  • Use compression when appropriate
  • Handle network timeouts gracefully

Deployment and Monitoring

Docker Deployment

# Build stage: produce a wheel from the source tree
FROM python:3.14-slim AS build
RUN pip install --no-cache-dir uv
WORKDIR /src
COPY . .
RUN uv build --wheel --out-dir /dist

# Runtime stage: install just the wheel, run as a non-root user
FROM python:3.14-slim
RUN useradd --create-home --uid 10001 app
COPY --from=build /dist/*.whl /tmp/
RUN pip install --no-cache-dir /tmp/*.whl && rm -f /tmp/*.whl
USER app

EXPOSE 8000
ENTRYPOINT ["gopher-mcp"]
CMD ["--transport", "streamable-http", "--host", "0.0.0.0", "--port", "8000"]

Health Checks

Health checks look different for MCP servers than for web services: over the stdio transport there is no HTTP surface to probe, so it pays to expose health as a first-class tool the client can call like any other—openzim-mcp ships one as zim_health. The pattern looks like this:

@mcp.tool()
async def health_check() -> dict[str, str]:
    """Lightweight liveness probe callable by any MCP client."""
    return {"status": "ok"}

Future Directions in MCP Architecture

The MCP ecosystem represents an emerging infrastructure layer with significant implications for enterprise AI deployment strategies. Several architectural evolution paths warrant investigation:

  • Streaming Protocol Extensions: Implementing backpressure-aware streaming for large dataset processing without memory exhaustion
  • Zero-Trust Authentication Models: Developing capability-based security frameworks that scale across federated MCP deployments
  • Distributed MCP Federations: Architecting service mesh patterns for MCP server orchestration and load distribution
  • Observability Infrastructure: Implementing distributed tracing and metrics collection for complex MCP interaction patterns

Strategic Implications and Future Outlook

The development of production-grade MCP servers reveals fundamental patterns that will shape the next generation of AI infrastructure. These implementations demonstrate that the Model Context Protocol represents more than a technical specification: it embodies an architectural philosophy that prioritizes security, scalability, and operational excellence.

The strategic insight emerging from this work centers on progressive complexity management: begin with minimal viable implementations, establish comprehensive observability, and iterate based on production feedback. The Model Context Protocol’s maturation trajectory suggests it will become foundational infrastructure for enterprise AI deployments, requiring the same engineering rigor applied to other critical system components.

The architectural patterns documented here provide a foundation for building AI systems that are not merely functional, but operationally excellent—systems that scale gracefully, fail safely, and evolve sustainably as requirements change.

Dive Deeper

For more focused, practical guides on building specific types of MCP servers, check out the detailed follow-up articles.


Want to explore these concepts further? Check out the gopher-mcp and openzim-mcp repositories for complete implementations.
