This module covers the design decisions that go into a production-grade operational MCP server. We use Harrier EMR MCP as the running example, but the patterns apply to any MCP server that reads AWS operational context.
The core design problem
MCP gives your agent a set of callable tools. The agent decides when to call them, what arguments to pass, and how to use the results. This is powerful — and dangerous if the tools are not carefully scoped.
The failure mode is an agent that calls list_all_s3_objects on a 20TB bucket, or describe_all_ec2_instances across every region, or worse — calls a tool that accidentally has write access.
The design goal is: make the right thing easy and the wrong thing impossible.
Tool design
One tool per concern
Each MCP tool should do exactly one thing. Resist the temptation to build a diagnose_everything tool that tries to cover every scenario.
# Bad: too broad, unpredictable cost
@server.call_tool()
async def diagnose_everything(cluster_id: str) -> str: ...
# Good: bounded scope, predictable cost
@server.call_tool()
async def get_emr_job_run_state(application_id: str, job_run_id: str) -> str: ...
@server.call_tool()
async def get_emr_driver_logs(application_id: str, job_run_id: str) -> str: ...
@server.call_tool()
async def classify_spark_failure(evidence_json: str) -> str: ...
Tool descriptions are instructions
The description field in your MCP tool definition is how the agent decides when to call the tool. Be specific about preconditions and output format.
@server.call_tool()
async def get_emr_job_run_state(application_id: str, job_run_id: str) -> str:
    """
    Fetch the current state and failure reason for an EMR Serverless job run.

    Returns JSON with: state, stateDetails, failureReason, totalResourceUtilization,
    executionTimeoutMinutes, networkConfiguration, and configurationOverrides.

    Use this as the first step in any EMR Serverless diagnosis: it establishes
    whether the job failed and surfaces the top-level failure reason before
    inspecting logs.

    Prerequisites: application_id and job_run_id are required. Both are visible
    in the EMR Serverless console and in CloudWatch logs.
    """
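To make the promised output format concrete, here is a minimal sketch of how a tool might trim a job-run response down to the documented fields. The helper name `summarize_job_run` and the sample response are hypothetical. The sketch assumes the response is a dict with a top-level "jobRun" key, as boto3's `emr-serverless` `get_job_run` call returns, and that failureReason is derived from stateDetails when the run failed. That derivation is an assumption, not a statement of how Harrier fills the field.

```python
import json

# Fields the tool description promises; everything else in the response is dropped.
SUMMARY_FIELDS = (
    "state",
    "stateDetails",
    "totalResourceUtilization",
    "executionTimeoutMinutes",
    "networkConfiguration",
    "configurationOverrides",
)


def summarize_job_run(response: dict) -> str:
    """Reduce a GetJobRun-style response to a small, stable JSON summary."""
    job_run = response.get("jobRun", {})
    summary = {field: job_run.get(field) for field in SUMMARY_FIELDS}
    # Assumption: failureReason mirrors stateDetails when the run failed.
    failed = job_run.get("state") == "FAILED"
    summary["failureReason"] = job_run.get("stateDetails") if failed else None
    # default=str keeps datetimes and other non-JSON values from raising.
    return json.dumps(summary, default=str, sort_keys=True)


# Hypothetical response, shaped like the get_job_run output described above.
sample = {
    "jobRun": {
        "applicationId": "app-123",
        "jobRunId": "run-456",
        "state": "FAILED",
        "stateDetails": "Job failed, please check complete logs.",
        "executionTimeoutMinutes": 720,
        "createdBy": "arn:aws:iam::111122223333:user/example",
    }
}
print(summarize_job_run(sample))
```

Returning a fixed set of keys, with nulls for anything missing, lets the agent rely on the shape the description advertises instead of guessing at it.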
Evidence collection
Pull only what you need
EMR jobs generate a lot of log data. Driver logs can be gigabytes. CloudWatch can have thousands of log streams. Your tool should pull bounded, targeted evidence — not everything.
MAX_LOG_BYTES = 256 * 1024  # 256 KB per log source

async def collect_driver_logs(s3_prefix: str) -> str:
    # Find only the most recent driver log
    recent_key = await find_most_recent_log(s3_prefix, pattern="stderr.gz")
    if not recent_key:
        return "No driver logs found at prefix."
    # Tail the end of the file, where failures typically appear.
    # stderr.gz is gzip-compressed, and a byte range cut from the middle of a
    # gzip stream cannot be decompressed on its own, so s3_read_range is assumed
    # to return the last tail_bytes of the *decompressed* log.
    raw = await s3_read_range(recent_key, tail_bytes=MAX_LOG_BYTES)
    return redact(raw.decode("utf-8", errors="replace"))
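Tailing a compressed log takes some care, because the last bytes of a .gz object are not readable text by themselves. The sketch below shows one way to keep the output bounded: decompress the stream chunk by chunk and retain only the final bytes. The function name `tail_of_gzip_stream` is hypothetical, the chunk source is left abstract (any iterable of bytes, for example a streamed S3 body), and a single-member gzip file is assumed.

```python
import gzip
import zlib
from typing import Iterable

MAX_LOG_BYTES = 256 * 1024  # same cap as the collector above


def tail_of_gzip_stream(chunks: Iterable[bytes], tail_bytes: int = MAX_LOG_BYTES) -> bytes:
    """Decompress a gzip stream chunk by chunk, keeping only the last tail_bytes."""
    if tail_bytes <= 0:
        return b""
    # wbits=31 tells zlib to expect a gzip header and trailer.
    decompressor = zlib.decompressobj(wbits=31)
    tail = bytearray()
    for chunk in chunks:
        tail.extend(decompressor.decompress(chunk))
        # Trim after every chunk so memory stays near tail_bytes plus one
        # decompressed chunk, regardless of the total log size.
        if len(tail) > tail_bytes:
            del tail[:-tail_bytes]
    tail.extend(decompressor.flush())
    if len(tail) > tail_bytes:
        del tail[:-tail_bytes]
    return bytes(tail)


# Usage with an in-memory payload standing in for a streamed download.
payload = gzip.compress(b"".join(b"line %d\n" % i for i in range(10_000)))
chunks = (payload[i:i + 4096] for i in range(0, len(payload), 4096))
print(tail_of_gzip_stream(chunks, tail_bytes=64).decode("utf-8", errors="replace"))
```

The trade-off is that the whole compressed object is still read. Only the memory held and the text passed on to the agent are bounded.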
Annotate, don’t just relay
Raw log text is noisy. Good evidence collection annotates what it found:
def format_evidence(raw_logs: str, classified_pattern: str | None) -> str:
    lines = [
        "=== Driver Log Evidence ===",
        f"Classified pattern: {classified_pattern or 'UNCLASSIFIED'}",
        "",
        "--- Relevant excerpts ---",
    ]
    lines.extend(extract_relevant_lines(raw_logs, max_lines=60))
    return "\n".join(lines)
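The formatter relies on an `extract_relevant_lines` helper that is not shown. One plausible shape for it, offered as a sketch and not as Harrier's implementation, keeps lines that match a small set of error markers plus a line of trailing context, then returns the most recent ones. The marker list and the `context` parameter are assumptions.

```python
import re

# Hypothetical markers; tune these to the failures you actually see.
RELEVANT = re.compile(r"ERROR|FATAL|Exception|Caused by:", re.IGNORECASE)


def extract_relevant_lines(raw_logs: str, max_lines: int = 60, context: int = 1) -> list[str]:
    """Return up to max_lines of the most recent lines that match a marker
    or follow a matching line within `context` lines."""
    if max_lines <= 0:
        return []
    lines = raw_logs.splitlines()
    keep: set[int] = set()
    for i, line in enumerate(lines):
        if RELEVANT.search(line):
            # Keep the hit and a little trailing context (e.g. the first stack frame).
            keep.update(range(i, min(i + context + 1, len(lines))))
    selected = [lines[i] for i in sorted(keep)]
    # Failures tend to cluster at the end, so prefer the latest excerpts.
    return selected[-max_lines:]


log = "INFO start\nERROR boom\n  at Foo.bar\nINFO next\nINFO done"
print(extract_relevant_lines(log))
```

Capping the excerpt count keeps the evidence block small even when the tail of the log is one long stack trace.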
Pattern classification
Classification turns raw evidence into a named signal. A named signal is what the agent can reason about.
from dataclasses import dataclass

@dataclass
class ClassificationResult:
    pattern: str
    confidence: str  # HIGH, MEDIUM_HIGH, MEDIUM, LOW
    matched_signals: list[str]

# Checked in insertion order; the first pattern with any matching signal wins.
FAILURE_PATTERNS = {
    "SHUFFLE_SPILL": {
        "signals": ["task.result.serialize", "ExternalSorter", "bytes spilled to disk"],
        "confidence": "MEDIUM_HIGH",
    },
    "OOM_DRIVER": {
        "signals": ["java.lang.OutOfMemoryError", "GC overhead limit", "Java heap space"],
        "confidence": "HIGH",
    },
    "MISSING_INPUT": {
        "signals": ["Path does not exist", "FileNotFoundException", "NoSuchKey"],
        "confidence": "HIGH",
    },
    "NETWORK_TIMEOUT": {
        "signals": ["Connection timed out", "SocketTimeoutException", "connection reset"],
        "confidence": "MEDIUM",
    },
}

def classify(log_text: str) -> ClassificationResult | None:
    # Match case-insensitively: the JVM logs "Connection reset", not "connection reset".
    haystack = log_text.lower()
    for pattern_name, config in FAILURE_PATTERNS.items():
        matched = [s for s in config["signals"] if s.lower() in haystack]
        if matched:
            return ClassificationResult(
                pattern=pattern_name,
                confidence=config["confidence"],
                matched_signals=matched,
            )
    return None
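The classifier above stops at the first pattern with a matching signal, so the result depends on the order of the table. An alternative worth considering, sketched here under assumptions, is to collect every matching pattern and rank them by how many signals matched. The names `Match`, `PATTERNS`, and `classify_all` are hypothetical, and the table is a cut-down stand-in for FAILURE_PATTERNS.

```python
from dataclasses import dataclass


@dataclass
class Match:
    pattern: str
    matched_signals: list[str]


# Cut-down, hypothetical table for illustration.
PATTERNS = {
    "OOM_DRIVER": ["java.lang.OutOfMemoryError", "GC overhead limit"],
    "MISSING_INPUT": ["Path does not exist", "FileNotFoundException"],
}


def classify_all(log_text: str, patterns: dict[str, list[str]] = PATTERNS) -> list[Match]:
    """Return every matching pattern, the one with the most matched signals first."""
    haystack = log_text.lower()
    matches = []
    for name, signals in patterns.items():
        hit = [s for s in signals if s.lower() in haystack]
        if hit:
            matches.append(Match(pattern=name, matched_signals=hit))
    # sorted() is stable, so patterns with equal counts keep table order.
    return sorted(matches, key=lambda m: len(m.matched_signals), reverse=True)


log = (
    "java.io.FileNotFoundException: Path does not exist: s3://bucket/key\n"
    "java.lang.OutOfMemoryError: Java heap space"
)
for match in classify_all(log):
    print(match.pattern, match.matched_signals)
```

Returning the runners-up as well gives the agent a way to notice when two failure modes appear in the same log.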
Secrets handling
Before any log text reaches the language model, run it through a redaction pass.
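import re

# Order matters: the Bearer rule runs before the generic secret rule, so that
# "token: Bearer abc123" cannot have only its "Bearer" word replaced while the
# token itself is left in place.
REDACT_PATTERNS = [
    (r"(?:AKIA|ASIA)[0-9A-Z]{16}", "[AWS_ACCESS_KEY]"),  # long-term and temporary key IDs
    (r"(?i)Bearer\s+[A-Za-z0-9._~+/-]+=*", "Bearer [TOKEN]"),
    (r"(?i)(password|passwd|pwd)\s*[=:]\s*\S+", "[PASSWORD]"),
    (r"(?i)(secret|token|api_?key)\s*[=:]\s*\S+", "[SECRET]"),
    # Add your own patterns for internal secrets
]

def redact(text: str) -> str:
    for pattern, replacement in REDACT_PATTERNS:
        text = re.sub(pattern, replacement, text)
    return text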
Treat this as a best-effort safety net, not a compliance guarantee. For high-sensitivity environments, review the security model.
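One way to keep a best-effort net honest is a canary check: feed the redactor lines that contain known fake secrets and confirm that none survive. The sketch below is illustrative only. `CANARIES` and `leaked_canaries` are hypothetical names, and the pattern table is a cut-down copy so the example runs on its own.

```python
import re

# Cut-down copy of the redaction table, for illustration only.
PATTERNS = [
    (re.compile(r"AKIA[0-9A-Z]{16}"), "[AWS_ACCESS_KEY]"),
    (re.compile(r"(?i)(password|passwd|pwd)\s*[=:]\s*\S+"), "[PASSWORD]"),
]


def redact(text: str) -> str:
    for pattern, replacement in PATTERNS:
        text = pattern.sub(replacement, text)
    return text


# Fake secrets mapped to a log line containing them. AKIAIOSFODNN7EXAMPLE is the
# placeholder key used in AWS documentation, not a real credential.
CANARIES = {
    "AKIAIOSFODNN7EXAMPLE": "conf: aws_access_key_id AKIAIOSFODNN7EXAMPLE",
    "hunter2": "jdbc password=hunter2 host=db",
}


def leaked_canaries(redactor) -> list[str]:
    """Return the canary secrets still visible after redaction."""
    return [secret for secret, line in CANARIES.items() if secret in redactor(line)]


print(leaked_canaries(redact))
```

Running a check like this in CI turns a new secret format into a failing test: add a canary for it first, then add the pattern that makes the test pass.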
IAM scoping
Your MCP server should run with the minimum IAM permissions needed. For Harrier, that means:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "EMRReadOnly",
      "Effect": "Allow",
      "Action": [
        "emr-serverless:GetApplication",
        "emr-serverless:GetJobRun",
        "emr-serverless:ListJobRuns"
      ],
      "Resource": "arn:aws:emr-serverless:*:*:/applications/*"
    },
    {
      "Sid": "LogsReadOnly",
      "Effect": "Allow",
      "Action": [
        "logs:GetLogEvents",
        "logs:FilterLogEvents",
        "logs:DescribeLogGroups",
        "logs:DescribeLogStreams"
      ],
      "Resource": "*"
    },
    {
      "Sid": "S3LogsReadOnly",
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:ListBucket"],
      "Resource": [
        "arn:aws:s3:::your-emr-logs-bucket",
        "arn:aws:s3:::your-emr-logs-bucket/*"
      ]
    }
  ]
}
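No write permissions. No wildcard on sensitive services. No cross-account access unless explicitly required. Two details matter when you adapt this policy. First, s3:ListBucket is evaluated against the bucket ARN while s3:GetObject is evaluated against object ARNs (the bucket ARN followed by /*), so an S3 statement that grants both actions needs both resources. Second, "Resource": "*" on the logs statement is broader than it needs to be; in production, scope it to the log groups your EMR applications write to.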
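A policy like this is easy to loosen by accident later on, so it can help to lint it automatically. The sketch below flags Allow statements whose actions contain a wildcard or do not start with a read-style verb. The function name `policy_violations` and the prefix list are assumptions. This is a naming heuristic, not an authoritative read-only check, and it ignores NotAction and condition blocks.

```python
# Verb prefixes treated as read-only. A heuristic based on IAM naming
# conventions, not a guarantee.
READ_ONLY_PREFIXES = ("Get", "List", "Describe", "Filter")


def policy_violations(policy: dict) -> list[str]:
    """Flag Allow statements whose actions are wildcards or do not look read-only."""
    problems = []
    statements = policy.get("Statement", [])
    if isinstance(statements, dict):  # IAM also accepts a single statement object
        statements = [statements]
    for stmt in statements:
        if stmt.get("Effect") != "Allow":
            continue
        sid = stmt.get("Sid", "?")
        actions = stmt.get("Action", [])
        if isinstance(actions, str):
            actions = [actions]
        for action in actions:
            _, _, verb = action.partition(":")
            if "*" in action:
                problems.append(f"{sid}: wildcard action {action}")
            elif not verb.startswith(READ_ONLY_PREFIXES):
                problems.append(f"{sid}: non-read action {action}")
    return problems


example = {
    "Statement": [
        {"Sid": "Logs", "Effect": "Allow", "Action": ["logs:GetLogEvents"], "Resource": "*"},
        {"Sid": "Oops", "Effect": "Allow", "Action": ["s3:PutObject", "logs:*"], "Resource": "*"},
    ]
}
for problem in policy_violations(example):
    print(problem)
```

A check like this fits naturally in the deployment pipeline for the server's role, where a failing lint blocks the change before it reaches an agent.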
Next steps
- Read the Harrier architecture decisions for the full rationale behind these patterns
- Review the MCP Tool Contracts for the complete Harrier tool surface
- Apply these patterns to your own operational MCP servers