DataLayer MCP Server

MCP Protocol 2025-11-25 · Streamable HTTP · FastMCP + Starlette · Session-scoped dynamic tools


Architecture Overview

graph TD
    subgraph CLIENT["🖥️ AI Client (LangGraph / Claude)"]
        C1["tools/list"]
        C2["tools/call"]
    end
    subgraph MCP_LAYER["MCP Layer · :8080/mcp"]
        MW["_McpToolFilterMiddleware<br/>─────────────────<br/>• tools/list → filter by session<br/>• tools/call → add namespace prefix<br/>• everything else → pass-through"]
        FM["FastMCP Server<br/>─────────────────<br/>• Streamable HTTP<br/>• Bearer token auth<br/>• InMemoryEventStore"]
    end
    subgraph ADMIN_LAYER["Admin Layer · :8080/admin"]
        A1["POST /session/init"]
        A2["POST /tools/register"]
        A3["POST /tools/unregister"]
        A4["POST /session/cleanup"]
        A5["GET /tools/list"]
        SEC["🔐 X-Admin-Secret header"]
    end
    subgraph STATE["In-Process State"]
        DT["_DYNAMIC_TOOLS<br/>{session → {name → def}}"]
        DH["_DYNAMIC_HANDLERS<br/>{session → {name → fn}}"]
        ST["_SESSION_TOKENS<br/>{session → bearer}"]
        KT["_KNOWN_TOKENS<br/>set of valid tokens"]
    end
    subgraph BACKEND["Backend · DataPlayer API"]
        BE["POST /fetch<br/>POST /action_handler<br/>POST /catalog"]
    end
    C1 -->|Bearer token| MW
    C2 -->|Bearer token| MW
    MW --> FM
    FM -->|namespaced call| DH
    DH -->|HTTP POST + Bearer| BACKEND
    ADMIN_LAYER --> STATE
    STATE --> MW
    STATE --> FM
    style CLIENT fill:#e8f4fd,stroke:#1976d2
    style MCP_LAYER fill:#f3e5f5,stroke:#7b1fa2
    style ADMIN_LAYER fill:#e8f5e9,stroke:#388e3c
    style STATE fill:#fff3e0,stroke:#f57c00
    style BACKEND fill:#fce4ec,stroke:#c62828

Session & Tool Lifecycle

graph TD
    subgraph PHASE1["① Session Initialization"]
        direction LR
        P1A["⚙️ DataPlayer Backend<br/>POST /session/init"] -->|"session_id · user_token · user_id"| P1B["Admin API"] --> P1C["In-Process State<br/>_KNOWN_TOKENS ← token<br/>_SESSION_USER_IDS ← uid"]
    end
    subgraph PHASE2["② Tool Registration"]
        direction LR
        P2A["⚙️ DataPlayer Backend<br/>POST /tools/register"] -->|"tools: [...]"| P2B["Admin API<br/>register_session_tools()"] --> P2C["In-Process State<br/>_DYNAMIC_TOOLS[sid][name] = def"] -->|"mcp.add_tool()<br/>name: {sid}__tool"| P2D["FastMCP"]
    end
    subgraph PHASE3["③ Client Connects"]
        direction LR
        P3A["🤖 AI Client<br/>GET /mcp + Bearer token"] --> P3B["FastMCP<br/>_KnownTokenVerifier<br/>verify_token()"] -->|"200 / SSE stream"| P3C["🤖 AI Client<br/>connected ✓"]
    end
    subgraph PHASE4["④ tools/list: Filtering"]
        direction LR
        P4A["🤖 AI Client<br/>tools/list"] --> P4B["Middleware<br/>filter by session<br/>strip {sid}__ prefix"] -->|"[tool_a, tool_b, ...]"| P4C["🤖 AI Client<br/>sees clean names"]
    end
    subgraph PHASE5["⑤ tools/call: Dispatch"]
        direction LR
        P5A["🤖 AI Client<br/>tools/call: tool_a"] --> P5B["Middleware<br/>add prefix →<br/>{sid}__tool_a"] --> P5C["Handler<br/>HTTP POST + Bearer<br/>→ DataPlayer API"] -->|"JSON result"| P5D["🤖 AI Client"]
    end
    subgraph PHASE6["⑥ Session Cleanup"]
        direction LR
        P6A["⚙️ DataPlayer Backend<br/>POST /session/cleanup"] --> P6B["Admin API<br/>cleanup_session()"] --> P6C["State + FastMCP<br/>remove_tool() × N<br/>discard token"]
    end
    PHASE1 --> PHASE2 --> PHASE3 --> PHASE4 --> PHASE5 --> PHASE6
    style PHASE1 fill:#e8f5e9,stroke:#388e3c
    style PHASE2 fill:#e3f2fd,stroke:#1565c0
    style PHASE3 fill:#f3e5f5,stroke:#7b1fa2
    style PHASE4 fill:#fff3e0,stroke:#f57c00
    style PHASE5 fill:#fce4ec,stroke:#c62828
    style PHASE6 fill:#f5f5f5,stroke:#616161

Security Model

graph LR
    subgraph REQUESTS["Incoming Requests"]
        R1["🤖 AI Client<br/>→ /mcp"]
        R2["⚙️ Backend Service<br/>→ /admin/*"]
        R3["🔧 Tool Execution<br/>→ DataPlayer API"]
    end
    subgraph AUTH["Authentication Layers"]
        A1["Bearer Token<br/>Authorization: Bearer {token}"]
        A2["Admin Secret<br/>X-Admin-Secret: {secret}"]
        A3["Session Bearer<br/>Authorization: Bearer {session_token}"]
    end
    subgraph VERIFY["Verification"]
        V1["_KnownTokenVerifier<br/>token ∈ _KNOWN_TOKENS?"]
        V2["_require_admin()<br/>secret == MCP_ADMIN_SECRET?"]
        V3["_SESSION_TOKENS[session_id]"]
    end
    R1 --> A1 --> V1
    R2 --> A2 --> V2
    R3 --> A3 --> V3
    V1 -->|"❌ 401"| DENY["Denied"]
    V2 -->|"❌ 401"| DENY
    V1 -->|"✅"| ALLOW["Access granted"]
    V2 -->|"✅"| ALLOW
    V3 -->|"✅"| PROXY["Proxy to API"]
    style REQUESTS fill:#e3f2fd,stroke:#1565c0
    style AUTH fill:#f3e5f5,stroke:#6a1b9a
    style VERIFY fill:#e8f5e9,stroke:#2e7d32
    style DENY fill:#ffebee,stroke:#b71c1c
    style ALLOW fill:#e8f5e9,stroke:#1b5e20
    style PROXY fill:#fff8e1,stroke:#f57f17
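
The first verification path in the diagram reduces to a set-membership test on the bearer token. The sketch below assumes `_KNOWN_TOKENS` is a plain Python set and returns a simple boolean. The real `_KnownTokenVerifier` plugs into FastMCP and is not reproduced here, and `extract_bearer` and `is_known_token` are hypothetical helper names.

```python
from typing import Optional

# Stand-in for the server's token set (assumed to be a plain set of strings).
_KNOWN_TOKENS = {"tok_abc"}


def extract_bearer(authorization: Optional[str]) -> Optional[str]:
    """Return the token from an 'Authorization: Bearer <token>' header value."""
    if not authorization:
        return None
    scheme, _, token = authorization.partition(" ")
    token = token.strip()
    if scheme.lower() != "bearer" or not token:
        return None
    return token


def is_known_token(authorization: Optional[str]) -> bool:
    """True when the header carries a token that is present in _KNOWN_TOKENS."""
    token = extract_bearer(authorization)
    return token is not None and token in _KNOWN_TOKENS
```

A request that fails this check corresponds to the 401 branch in the diagram.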

Admin REST API

| Method | Endpoint | Purpose | Request body |
|---|---|---|---|
| POST | /admin/session/init | Create session, register token | {session_id, user_token, user_id} |
| POST | /admin/tools/register | Register tools in FastMCP | {session_id, tools: [...], user_token?, user_id?} |
| POST | /admin/tools/unregister | Remove a single tool | {session_id, name} |
| POST | /admin/session/cleanup | Remove all tools for a session | {session_id} |
| GET | /admin/tools/list | List tools for a session | ?session_id= (query string) |

All Admin endpoints require the X-Admin-Secret header. Its value is derived from XPEPPER, the platform's master encryption key.
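
A header check of this kind could look roughly like the following. This is a sketch, not the server's `_require_admin()`: the function name, the dict-based headers and the constant-time comparison are assumptions, and the derivation of the secret from XPEPPER is not shown because the document does not describe it.

```python
import hmac


def require_admin(headers: dict, expected_secret: str) -> int:
    """Return 200 when X-Admin-Secret matches the expected secret, else 401."""
    supplied = headers.get("X-Admin-Secret", "")
    # An empty configured secret never authorizes anything.
    if not expected_secret:
        return 401
    # compare_digest avoids leaking how many leading bytes matched.
    matches = hmac.compare_digest(supplied.encode("utf-8"), expected_secret.encode("utf-8"))
    return 200 if matches else 401
```

Real HTTP frameworks treat header names case-insensitively. The plain dict used here does not, which is acceptable for illustration only.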


Tool Definition Schema

{
  "name": "query_table",
  "title": "Query Table",
  "description": "Execute a SQL query on the connected datasource",
  "url": "http://localhost:8866/fetch",
  "action": "open_table",
  "inputSchema": {
    "type": "object",
    "properties": {
      "query":   { "type": "string",  "description": "SQL query to execute" },
      "limit":   { "type": "integer", "description": "Max rows to return" },
      "schema":  { "type": "string",  "description": "Target schema name" }
    },
    "required": ["query"]
  },
  "annotations": {
    "readOnlyHint": true,
    "destructiveHint": false
  },
  "fixed_params": {
    "connector_id": 42
  }
}
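
The schema does not spell out how `fixed_params` and `action` reach the backend. One plausible reading is that the handler merges the fixed parameters into the caller's arguments before posting to `url`. The sketch below shows that reading only; the payload layout (`action` plus `params`) and the function name are hypothetical.

```python
def build_backend_payload(tool_def: dict, arguments: dict) -> dict:
    """Combine call arguments with the tool's fixed_params (assumed layout)."""
    # Drop optional arguments the caller left unset.
    params = {key: value for key, value in arguments.items() if value is not None}
    # Fixed parameters are applied last, so a caller cannot override them.
    params.update(tool_def.get("fixed_params", {}))
    return {"action": tool_def.get("action"), "params": params}
```

Applying `fixed_params` last is a deliberate choice in this sketch: values such as `connector_id` are set at registration time and stay out of the model's control.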

JSON Schema โ†’ Python Type Mapping

| JSON Schema | Python | Example |
|---|---|---|
| string | str | table names, queries |
| integer | int | limit, offset, ID |
| number | float | thresholds, metrics |
| boolean | bool | flags |
| array | list | column lists |
| object | dict | nested structures |

Optional parameters are wrapped in Optional[T] with default=None. FastMCP reads the signature and generates the JSON Schema automatically.


Dynamic Signature Generation

The server builds complete Python functions at runtime, so FastMCP cannot tell that the tools were registered dynamically:

import inspect
from typing import Optional


def _make_signature(properties: dict, required: list) -> inspect.Signature:
    # Leading ctx parameter, then one keyword-only parameter per schema property
    params = [inspect.Parameter("ctx", inspect.Parameter.POSITIONAL_OR_KEYWORD)]
    for pname, pdef in properties.items():
        is_required = pname in required
        # A property without a "type" is treated as a string
        annotation = _json_type_to_python(pdef.get("type", "string"))
        if not is_required:
            annotation = Optional[annotation]
        params.append(inspect.Parameter(
            pname,
            inspect.Parameter.KEYWORD_ONLY,
            # Required parameters have no default; optional ones default to None
            default=inspect.Parameter.empty if is_required else None,
            annotation=annotation,
        ))
    return inspect.Signature(params)

handler.__signature__ is replaced with the generated signature, so FastMCP sees (ctx, *, query: str, limit: Optional[int] = None) instead of (**kwargs). The bare * appears because the generated parameters are keyword-only.
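
The effect of replacing `__signature__` can be checked with the standard `inspect` module alone. The sketch below repeats `_make_signature` so that it runs on its own. The type lookup table and the `handler` function are stand-ins, not the server's code.

```python
import inspect
from typing import Optional

# Minimal stand-in for the server's type mapping.
_TYPES = {"string": str, "integer": int, "number": float,
          "boolean": bool, "array": list, "object": dict}


def _make_signature(properties: dict, required: list) -> inspect.Signature:
    params = [inspect.Parameter("ctx", inspect.Parameter.POSITIONAL_OR_KEYWORD)]
    for pname, pdef in properties.items():
        is_required = pname in required
        annotation = _TYPES.get(pdef.get("type", "string"), str)
        if not is_required:
            annotation = Optional[annotation]
        params.append(inspect.Parameter(
            pname,
            inspect.Parameter.KEYWORD_ONLY,
            default=inspect.Parameter.empty if is_required else None,
            annotation=annotation,
        ))
    return inspect.Signature(params)


async def handler(ctx, **kwargs):
    """Generic handler; its real signature is hidden behind __signature__."""
    return kwargs


handler.__signature__ = _make_signature(
    {"query": {"type": "string"}, "limit": {"type": "integer"}},
    required=["query"],
)

# inspect.signature() honours __signature__, which is what introspecting
# frameworks rely on when they build a schema from a function.
sig = inspect.signature(handler)
print(list(sig.parameters))
```

`sig.parameters["limit"].default` is `None`, while `sig.parameters["query"].default` is `inspect.Parameter.empty`. That distinction is how a reader of the signature tells optional parameters from required ones.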


Middleware: Transparent Namespacing

graph LR
    subgraph CLIENT_VIEW["What the client sees"]
        CT1["query_table"]
        CT2["list_schemas"]
        CT3["execute_sql"]
    end
    subgraph SERVER_VIEW["What the server stores"]
        ST1["abc123__query_table"]
        ST2["abc123__list_schemas"]
        ST3["abc123__execute_sql"]
    end
    subgraph MIDDLEWARE["_McpToolFilterMiddleware"]
        ML["tools/list response:<br/>strip '{sid}__' prefix"]
        MC["tools/call request:<br/>add '{sid}__' prefix"]
    end
    CLIENT_VIEW -->|"tools/call: query_table"| MC
    MC -->|"tools/call: abc123__query_table"| SERVER_VIEW
    SERVER_VIEW -->|"tools: abc123__query_table, ..."| ML
    ML -->|"tools: query_table, ..."| CLIENT_VIEW
    style CLIENT_VIEW fill:#e8f4fd,stroke:#1976d2
    style SERVER_VIEW fill:#fce4ec,stroke:#c62828
    style MIDDLEWARE fill:#fff3e0,stroke:#f57c00

Multiple sessions can register tools with identical names without any conflicts.
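
The two rewrites performed by the middleware can be sketched as pure functions over JSON-RPC-shaped dictionaries. The function names are hypothetical, and the real `_McpToolFilterMiddleware` works on HTTP request and response bodies, which this sketch does not attempt to reproduce.

```python
from typing import Any, Dict, List

SEPARATOR = "__"  # matches the {sid}__tool naming shown in the diagram


def filter_tools_for_session(session_id: str, tools: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """tools/list: keep this session's tools and strip the '{sid}__' prefix."""
    prefix = f"{session_id}{SEPARATOR}"
    return [
        {**tool, "name": tool["name"][len(prefix):]}
        for tool in tools
        if tool["name"].startswith(prefix)
    ]


def namespace_call(session_id: str, request: Dict[str, Any]) -> Dict[str, Any]:
    """tools/call: add the '{sid}__' prefix; every other method passes through."""
    if request.get("method") != "tools/call":
        return request
    params = dict(request.get("params", {}))
    params["name"] = f"{session_id}{SEPARATOR}{params['name']}"
    # Return a copy so the caller's request object is left untouched.
    return {**request, "params": params}
```

The prefix match assumes session IDs never contain the `__` separator themselves. Otherwise one session's prefix could match another session's tools.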


Destructive Operation Confirmation (MCP Elicitation)

When a tool is marked destructiveHint: true and readOnlyHint: false, the server requests explicit user confirmation via the standard MCP Elicitation mechanism:

elicit_result = await ctx.elicit(
    message=f"Confirm execution of '{tool_name}':\n\n{field_value}",
    schema=_ConfirmSchema,   # {"confirmed": bool}
)
if elicit_result.action != "accept" or not elicit_result.data.confirmed:
    return "Command execution cancelled by user."

flowchart TD
    CALL["tools/call: drop_table"]
    CHECK{"destructiveHint AND<br/>NOT readOnlyHint?"}
    ELICIT["ctx.elicit()<br/>ask the user"]
    ACCEPT{"action == 'accept'<br/>AND confirmed == true?"}
    CANCEL["↩ 'Cancelled by user'"]
    EXEC["HTTP POST → Backend API"]
    RESULT["→ Result to client"]
    CALL --> CHECK
    CHECK -->|"No"| EXEC
    CHECK -->|"Yes"| ELICIT
    ELICIT --> ACCEPT
    ACCEPT -->|"No"| CANCEL
    ACCEPT -->|"Yes"| EXEC
    EXEC --> RESULT
    style CANCEL fill:#ffebee,stroke:#b71c1c
    style EXEC fill:#e8f5e9,stroke:#2e7d32
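
The two decision nodes in the flowchart are simple predicates. The sketch below expresses them without any MCP dependency. The function names are hypothetical, and treating a missing hint as false follows the flowchart rather than any documented default.

```python
from typing import Optional


def needs_confirmation(annotations: Optional[dict]) -> bool:
    """First diamond: destructiveHint AND NOT readOnlyHint."""
    hints = annotations or {}
    return bool(hints.get("destructiveHint")) and not bool(hints.get("readOnlyHint"))


def is_confirmed(action: str, confirmed: Optional[bool]) -> bool:
    """Second diamond: the user accepted the prompt and ticked 'confirmed'."""
    return action == "accept" and confirmed is True
```

A tool such as `drop_table` with `{"destructiveHint": true, "readOnlyHint": false}` triggers the prompt, while the read-only `query_table` definition shown earlier goes straight to the backend.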

InMemoryEventStore: Session Resumability

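import uuid

from mcp.server.streamable_http import EventMessage, EventStore


class InMemoryEventStore(EventStore):
    """SSE Last-Event-ID replay for reconnect without losing events."""

    def __init__(self) -> None:
        self._events = []  # (event_id, stream_id, message), oldest first

    async def store_event(self, stream_id: str, message) -> str:
        event_id = str(uuid.uuid4())
        self._events.append((event_id, stream_id, message))
        return event_id

    async def replay_events_after(self, last_event_id: str, send_callback):
        # Replay every stored event that follows last_event_id; unknown IDs replay nothing
        ids = [eid for eid, _, _ in self._events]
        start = ids.index(last_event_id) + 1 if last_event_id in ids else len(ids)
        for eid, sid, msg in self._events[start:]:
            await send_callback(EventMessage(message=msg, event_id=eid))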

Limitation: events are stored in process memory. They are lost on server restart. For production reliability, swap in a Redis-backed implementation.


Performance & Concurrency

{
  "title": { "text": "Server Characteristics", "left": "center", "textStyle": { "fontSize": 14 } },
  "radar": {
    "indicator": [
      { "name": "Session\nConcurrency",   "max": 100 },
      { "name": "Tool\nIsolation",        "max": 100 },
      { "name": "Security",               "max": 100 },
      { "name": "MCP\nCompliance",        "max": 100 },
      { "name": "Extensibility",          "max": 100 },
      { "name": "Performance",            "max": 100 }
    ],
    "radius": 110
  },
  "series": [{
    "type": "radar",
    "data": [{
      "value": [95, 100, 88, 100, 92, 85],
      "name": "DataLayer MCP",
      "areaStyle": { "opacity": 0.25 },
      "lineStyle": { "width": 2, "color": "#1976d2" },
      "itemStyle": { "color": "#1976d2" }
    }]
  }]
}

| Characteristic | Value |
|---|---|
| Session idle timeout | 1800 s (30 min) |
| HTTP timeout per call | 50 s |
| Mutation lock | asyncio.Lock on all _DYNAMIC_TOOLS operations |
| Transport | Streamable HTTP (SSE + JSON-RPC) |
| Concurrency | Fully async (uvicorn + asyncio) |
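
The mutation lock listed in the table can be illustrated with a small registry class. This is a sketch of the pattern only: `ToolRegistry` and its methods are hypothetical names, and the real server keeps its state in module-level dictionaries rather than a class.

```python
import asyncio


class ToolRegistry:
    """Per-session tool store whose mutations are serialized by one lock."""

    def __init__(self) -> None:
        self._tools = {}              # {session_id: {tool_name: definition}}
        self._lock = asyncio.Lock()   # created inside the running event loop

    async def register(self, session_id: str, name: str, definition: dict) -> None:
        async with self._lock:
            session = self._tools.setdefault(session_id, {})
            # An await inside the critical section is where interleaving
            # would otherwise be possible; the lock prevents it.
            await asyncio.sleep(0)
            session[name] = definition

    async def cleanup(self, session_id: str) -> int:
        async with self._lock:
            return len(self._tools.pop(session_id, {}))

    async def names(self, session_id: str) -> list:
        async with self._lock:
            return sorted(self._tools.get(session_id, {}))


async def demo():
    registry = ToolRegistry()
    # Five concurrent registrations for the same session.
    await asyncio.gather(
        *(registry.register("sess_42", f"tool_{i}", {}) for i in range(5))
    )
    before = await registry.names("sess_42")
    removed = await registry.cleanup("sess_42")
    after = await registry.names("sess_42")
    return before, removed, after
```

Running `asyncio.run(demo())` registers five tools concurrently and then removes them in one step. The lock ensures a cleanup never observes a half-finished registration.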

Startup Configuration

# Environment variables
HOST=0.0.0.0      # bind address
PORT=8080         # MCP server port

# In dbwebtool/settings.py
MCP_ADMIN_SECRET   # secret for /admin/* endpoints
DATAPLAYER_API_URL # backend base URL (for AuthSettings)
XPEPPER            # master key (MCP_ADMIN_SECRET is derived from it)

# Run directly
python dataplayer_mcp_server.py

# Or via uvicorn
uvicorn dataplayer_mcp_server:create_app --host 0.0.0.0 --port 8080 --factory

The backend starts the MCP server as a subprocess automatically when MCP_AUTOSTART=True during startup_handler().
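
Reading the HOST and PORT variables listed above might look like this. The function name is hypothetical, and treating 0.0.0.0 and 8080 as fallback defaults is an assumption based on the values shown in the configuration block.

```python
import os
from typing import Mapping, Optional, Tuple


def load_bind_config(env: Optional[Mapping[str, str]] = None) -> Tuple[str, int]:
    """Return (host, port) from the environment, with assumed defaults."""
    source = os.environ if env is None else env
    host = source.get("HOST", "0.0.0.0")
    port = int(source.get("PORT", "8080"))
    if not 0 < port < 65536:
        raise ValueError(f"PORT out of range: {port}")
    return host, port
```

Passing an explicit mapping instead of relying on `os.environ` keeps the function easy to test.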


Example: Full Registration Cycle via Admin API

# 1. Initialize session
curl -X POST http://localhost:8080/admin/session/init \
  -H "X-Admin-Secret: <secret>" \
  -H "Content-Type: application/json" \
  -d '{"session_id": "sess_42", "user_token": "tok_abc", "user_id": 7}'

# 2. Register tools
curl -X POST http://localhost:8080/admin/tools/register \
  -H "X-Admin-Secret: <secret>" \
  -H "Content-Type: application/json" \
  -d '{
    "session_id": "sess_42",
    "tools": [{
      "name": "run_query",
      "description": "Run a SQL query",
      "url": "http://localhost:8866/fetch",
      "action": "open_table",
      "inputSchema": {
        "type": "object",
        "properties": { "query": { "type": "string" } },
        "required": ["query"]
      },
      "annotations": { "readOnlyHint": true }
    }]
  }'

# 3. AI client connects to /mcp
# Authorization: Bearer tok_abc
# → sees tool "run_query" (no prefix)

# 4. Cleanup after session ends
curl -X POST http://localhost:8080/admin/session/cleanup \
  -H "X-Admin-Secret: <secret>" \
  -H "Content-Type: application/json" \
  -d '{"session_id": "sess_42"}'
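
The same admin calls can be prepared from Python with the standard library. The sketch below only builds the requests, so it runs without a server. `build_admin_request` is a hypothetical helper, and the base URL and secret are placeholders.

```python
import json
import urllib.request


def build_admin_request(base_url: str, path: str, secret: str, body: dict) -> urllib.request.Request:
    """Build a POST request for an /admin/* endpoint with a JSON body."""
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}{path}",
        data=json.dumps(body).encode("utf-8"),
        method="POST",
        headers={
            "X-Admin-Secret": secret,
            "Content-Type": "application/json",
        },
    )


init_request = build_admin_request(
    "http://localhost:8080",
    "/admin/session/init",
    "<secret>",  # placeholder, as in the curl examples
    {"session_id": "sess_42", "user_token": "tok_abc", "user_id": 7},
)
```

Against a running server, the request would be sent with `urllib.request.urlopen(init_request, timeout=50)`. The timeout value here mirrors the per-call HTTP timeout listed earlier and is otherwise arbitrary.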

Integration with DataPlayer

graph TB
    subgraph DATAPLAYER["DataPlayer Backend · :8866"]
        DS["webserver_starter.py<br/>startup_handler()"]
        AI["ai_handler/<br/>langgraph_mcp.py"]
        SK["skill_loader.py<br/>mcp_client_http.py"]
        DIS["ai_dispatcher.py"]
    end
    subgraph MCP_PROC["MCP Server · :8080"]
        MCP["dataplayer_mcp_server.py"]
    end
    subgraph QDRANT["Vector DB · :6333"]
        QD["Tool embeddings<br/>search_tools()"]
    end
    DS -->|"subprocess start<br/>MCP_AUTOSTART=True"| MCP_PROC
    DS -->|"POST /admin/session/init<br/>POST /admin/tools/register"| MCP_PROC
    AI --> DIS
    DIS --> SK
    SK -->|"semantic search"| QD
    QD -->|"top-k tools"| SK
    SK -->|"Bearer token"| MCP
    style DATAPLAYER fill:#e3f2fd,stroke:#1565c0
    style MCP_PROC fill:#f3e5f5,stroke:#7b1fa2
    style QDRANT fill:#fff8e1,stroke:#f57f17

Request flow for the LangGraph AI agent:

1. The agent receives a user request.
2. It uses search_tools(semantic_query) to find relevant tools in Qdrant.
3. It connects to the MCP server with the session Bearer token.
4. It invokes tools, which proxy the calls over HTTP to the DataPlayer API.
5. Results are streamed back to the client via WebSocket (/ws/).


Key Files

| File | Role |
|---|---|
| server/MCP/dataplayer_mcp_server.py | The entire MCP server in a single file |
| ai_handler/langgraph_mcp.py | LangGraph pipeline, MCP client |
| ai_handler/skill_loader.py | Tool loading and search |
| ai_handler/mcp_client_http.py | HTTP client to MCP |
| settings.py | MCP_ADMIN_SECRET, DATAPLAYER_API_URL |
| webserver_starter.py | MCP subprocess launch |