A practical guide to building an observable Phishing Triage Assistant with MCP and structured logging
TL;DR: We show how structured logging of AI agents over MCP supports phishing triage, enabling continuous security monitoring in a SIEM and automated remediation in a SOAR.
As enterprises race to deploy AI agents, they are creating a powerful new workforce capable of automating complex tasks. Yet, this speed and autonomy introduce a critical challenge: a new, unmonitored attack surface. How can you ensure these autonomous agents are operating safely? How do you provide a clear, defensible audit trail for compliance?
The core problem is an operational blind spot. When an AI agent interacts with a suite of tools – accessing APIs, querying databases, or analyzing files – its decision-making process is often scattered across different systems or lost entirely. Traditional application logging methods fail to capture the agent’s chain of thought and action, making it very challenging to perform effective security monitoring or forensic analysis when something goes wrong.
This post introduces a solution: using client-side logging with the Model Context Protocol (MCP) to create a unified flight data recorder for every agent action. This approach provides a complete, contextualized, and immutable audit trail of an agent’s behavior, directly from the agent’s perspective.
To demonstrate the power of this technique, we build a practical AI Phishing Triage Assistant. We show how to instrument this agent to produce rich, structured security logs that can be directly fed into a SIEM, turning your AI agents from opaque black boxes into fully observable, auditable assets.
AI Phishing Triage Assistant
Phishing is a top-tier challenge for virtually every Security Operations Center (SOC). Reviewing the high volume of suspicious emails is time-consuming, so let’s use an AI agent to automate the process! The workflow involves ingesting a reported email, extracting Indicators of Compromise (IOCs) like URLs and file hashes, enriching these with threat intelligence, and executing a response. We want to reduce the Mean Time to Respond (MTTR) by automating the IOC extraction and enrichment, freeing up valuable analyst time for high-level threat hunting.
For this example we implement a ReAct agent using LangGraph, which combines chain-of-thought reasoning with MCP tools exposed by FastMCP. See the full example code on GitHub.
MCP Tool Server
Let’s give our agent the tools necessary to perform a phishing triage. We expose 3 tools to the agent:
1) extract_iocs_from_email - parses the email and gathers IOCs
2) get_url_reputation - calls threat intelligence APIs to determine URL reputation
3) get_file_hash_reputation - calls threat intelligence APIs to determine attachment reputation
For this demonstration, we mock calls to external threat intelligence APIs (e.g. VirusTotal, AbuseIPDB, etc.).
def mock_threat_intel_api_url(url: str) -> dict:
    if "evil-phish.com" in url:
        return {"verdict": "malicious", "threat_type": "phishing"}
    return {"verdict": "benign", "threat_type": None}
def mock_threat_intel_api_hash(file_hash: str) -> dict:
    if file_hash.startswith("e88482"):
        return {"verdict": "malicious", "threat_type": "malware"}
    return {"verdict": "benign", "threat_type": None}
Using these APIs, we expose the 3 tools through FastMCP.
import re
from fastmcp import FastMCP, Context
mcp = FastMCP("PhishingTriage")
@mcp.tool
async def extract_iocs_from_email(email_body: str, ctx: Context) -> dict:
    """
    Extracts Indicators of Compromise (IOCs) such as URLs, domains, IP addresses,
    and file hashes from the body of an email.
    """
    await ctx.info("Starting IOC extraction from email body.")
    # Simple regex for demo purposes. Production use requires more robust parsing.
    urls = re.findall(r'https?://\S+', email_body)
    hashes_md5 = re.findall(r'\b[a-fA-F0-9]{32}\b', email_body)
    iocs = {"urls": urls, "hashes_md5": hashes_md5}
    await ctx.info(f"Extracted IOCs: {iocs}")
    return {"success": True, "iocs": iocs}
@mcp.tool
async def get_url_reputation(url: str, ctx: Context) -> dict:
    """
    Checks the reputation of a given URL against a threat intelligence source.
    """
    await ctx.info(f"Querying threat intelligence for URL: {url}")
    try:
        result = mock_threat_intel_api_url(url)
        # Note the trailing space so the two f-string parts do not run together
        await ctx.info(
            f"Reputation for {url} is '{result['verdict']}' "
            f"with threat type '{result['threat_type']}'."
        )
        return {"success": True, "url": url, **result}
    except Exception as e:
        await ctx.error(f"Failed to get URL reputation for {url}: {str(e)}")
        raise
@mcp.tool
async def get_file_hash_reputation(file_hash: str, ctx: Context) -> dict:
    """
    Checks the reputation of a given MD5 file hash against a threat intelligence source.
    """
    await ctx.info(f"Querying threat intelligence for file hash: {file_hash}")
    try:
        result = mock_threat_intel_api_hash(file_hash)
        # Note the trailing space so the two f-string parts do not run together
        await ctx.info(
            f"Reputation for {file_hash} is '{result['verdict']}' "
            f"with threat type '{result['threat_type']}'."
        )
        return {"success": True, "file_hash": file_hash, **result}
    except Exception as e:
        await ctx.error(f"Failed to get file hash reputation for {file_hash}: {str(e)}")
        raise
Each tool uses the ctx (Context) to record the logs through the client-side API. This implements the MCP standard definition for sending structured logs to clients. Critically, client-side logging allows us to centralize all records for the agent in one unified stream, instead of having to stitch the logs together after the fact.
if __name__ == "__main__":
    mcp.run(transport="streamable-http")
We expose the MCP server through Streamable HTTP instead of stdio, to simulate a realistic microservices architecture, where multiple agents consume tools over a network. In a production setting, TLS encryption and private networks (e.g. VPC) are important security considerations to ensure access is restricted to trusted applications and agents.
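The regular expressions in extract_iocs_from_email are deliberately minimal. As a rough sketch of what slightly more defensive parsing might look like, the standalone helper below strips punctuation that trails a URL and de-duplicates results while preserving order. The name extract_iocs and the SHA-256 pattern are our own assumptions for illustration and are not part of the example repository.

```python
import re

URL_RE = re.compile(r"https?://\S+")
MD5_RE = re.compile(r"\b[a-fA-F0-9]{32}\b")
SHA256_RE = re.compile(r"\b[a-fA-F0-9]{64}\b")  # assumption: not in the original tool


def _unique(items):
    """De-duplicate while preserving first-seen order."""
    return list(dict.fromkeys(items))


def extract_iocs(email_body: str) -> dict:
    """Hypothetical helper: extract URLs and file hashes from an email body."""
    # Strip punctuation that commonly trails a URL at the end of a sentence.
    urls = [u.rstrip(".,;:!?)>\"'") for u in URL_RE.findall(email_body)]
    return {
        "urls": _unique(urls),
        "hashes_md5": _unique(h.lower() for h in MD5_RE.findall(email_body)),
        "hashes_sha256": _unique(h.lower() for h in SHA256_RE.findall(email_body)),
    }


sample = (
    "Please verify at http://evil-phish.com/login. "
    "File hash: e88482b4026343e36bf48c1579a3f033"
)
print(extract_iocs(sample))
```

Even with these tweaks, regular expressions will miss obfuscated or HTML-encoded indicators, so a production parser would likely work on the parsed MIME structure rather than the raw body.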
Structured Logging
Instead of traditional unstructured application logs, we use structured logging to make the data actionable. This allows us to route the logs through our Security Data Pipeline Platform (SDPP) to our SIEM, MDR provider, and compliance archive. SDPPs, like Realm.Security, allow the AI audit logs to be filtered for specific destinations, reducing costs for expensive SIEM storage/compute by keeping only the most relevant logs there. All logs are simultaneously routed to an archive for compliance and later forensic analysis.
The following fields cover the core telemetry necessary to track each agent individually. We can correlate logs across a given agent, task, and MCP server, providing a clear audit trail that describes the actions taken by AI.
| Field Name | Data Type | Description | Example |
|---|---|---|---|
| timestamp | String (ISO 8601) | UTC timestamp of when the event occurred | "2025-07-24T10:30:55.123Z" |
| log_level | String | Severity of the log event (e.g. INFO, WARNING, ERROR) | "INFO" |
| message | String | Human-readable summary of the event | "Querying threat intelligence for URL" |
| agent_id | String | Unique identifier for the specific AI agent instance generating the log | "phishing-triage-agent-01" |
| agent_name | String | Functional name of the agent | "PhishingTriageAssistant" |
| model_id | String | Unique identifier for the LLM model used by the AI agent | "us.anthropic.claude-3-7-sonnet-20250219-v1:0" |
| principal_user_id | String | ID of the user or service account that invoked the agent’s task (for accountability) | "soc_analyst_jane_doe" |
| event_correlation_id | String | Unique ID for the entire agent task, allowing correlation across multiple log entries | "c4a2b8e0-1b1f-4a1e-9c4c-2b8e0a1b1f4a" |
| event_action | String | Specific action or tool being used by the agent | "get_url_reputation" |
| source_mcp_server | String | Name of the MCP server that provided the tool | "PhishingTriage" |
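To keep producers honest about this schema, a pipeline could check each event for the core fields before forwarding it. The sketch below is one hypothetical way to do that in plain Python. The helper name missing_core_fields is our own, and which fields are truly mandatory for a given event type is a policy decision that this example does not settle.

```python
from datetime import datetime, timezone

# Core fields from the table above.
CORE_FIELDS = (
    "timestamp", "log_level", "message", "agent_id", "agent_name",
    "model_id", "principal_user_id", "event_correlation_id",
    "event_action", "source_mcp_server",
)


def missing_core_fields(event: dict) -> list:
    """Return the core schema fields that are absent or empty in `event`."""
    return [name for name in CORE_FIELDS if event.get(name) in (None, "")]


event = {
    "timestamp": datetime.now(timezone.utc).isoformat(),
    "log_level": "INFO",
    "message": "Querying threat intelligence for URL",
    "agent_id": "phishing-triage-agent-01",
    "agent_name": "PhishingTriageAssistant",
    "model_id": "us.anthropic.claude-3-7-sonnet-20250219-v1:0",
    "principal_user_id": "soc_analyst_jane_doe",
    "event_correlation_id": "c4a2b8e0-1b1f-4a1e-9c4c-2b8e0a1b1f4a",
    "event_action": "get_url_reputation",
    "source_mcp_server": "PhishingTriage",
}
print(missing_core_fields(event))

# Dropping a field makes the gap visible before the event reaches the SIEM.
incomplete = {k: v for k, v in event.items() if k != "principal_user_id"}
print(missing_core_fields(incomplete))
```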
For our AI Phishing Triage Agent, we include additional fields relevant to the task. These fields enable our SOAR to automatically alert the SOC team, block URLs at the firewall, and create a ticket to track follow up actions. This greatly improves the speed of remediation and mitigation, preventing a phishing campaign from growing across the organization, while keeping the SOC team informed and in control of the next steps.
| Field Name | Data Type | Description | Example |
|---|---|---|---|
| threat_indicator_type | String | Type of Indicator of Compromise (IOC) being analyzed | "url" |
| threat_indicator_value | String | Actual value of the IOC | "http://evil-phish.com/login" |
| threat_verdict | String | Outcome of the threat analysis (e.g. benign, suspicious, malicious) – used for alerting | "malicious" |
| threat_summary | String | Summary explaining the reasoning for the threat verdict | "Based on the investigation..." |
| threat_type | String | Specific category of threat identified | "phishing" |
The structured logs provide a clear audit trail for each AI agent that can be retained for compliance and forensic purposes. Sending these logs to an archive allows for low-cost storage of the data.
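The routing idea described in this section (archive everything, keep only the most relevant events in the SIEM) can be illustrated with a few lines of Python. This is only a sketch under our own assumptions: a real SDPP would express such rules in its own configuration, and the destination names, the severity threshold, and the route function are hypothetical.

```python
# Numeric severities mirroring the Python logging module.
LEVELS = {"DEBUG": 10, "INFO": 20, "WARNING": 30, "ERROR": 40, "CRITICAL": 50}

# Assumption: only these verdicts justify SIEM storage.
SIEM_VERDICTS = {"suspicious", "malicious"}


def route(event: dict) -> list:
    """Hypothetical rule: archive every event, forward a subset to the SIEM."""
    destinations = ["archive"]
    severity = LEVELS.get(str(event.get("log_level", "INFO")).upper(), 20)
    if severity >= LEVELS["WARNING"] or event.get("threat_verdict") in SIEM_VERDICTS:
        destinations.append("siem")
    return destinations


print(route({"log_level": "INFO", "message": "Starting IOC extraction from email body."}))
print(route({"log_level": "INFO", "threat_verdict": "malicious"}))
```

Filtering on structured fields like these is what keeps the rule short; the same decision on unstructured text would need brittle pattern matching.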
Agent Client
Let’s set up our agent to produce structured logs using our schema above. There are multiple structured logging libraries, including structlog, python-json-logger, and Loguru. We choose python-json-logger for simplicity, since it integrates with the existing logging module.
The logging module in the Python standard library accepts an extra argument on each log call, which attaches additional key-value pairs to the log record. We create a custom formatter that outputs JSON and passes the extra fields through to the top level, matching our schema.
from pythonjsonlogger import jsonlogger
class CustomJsonFormatter(jsonlogger.JsonFormatter):
    def add_fields(self, log_record, record, message_dict):
        super().add_fields(log_record, record, message_dict)
        # Rename standard fields for consistency
        log_record['timestamp'] = log_record.pop('asctime')
        log_record['log_level'] = log_record.pop('levelname')
        # Ensure all 'extra' fields are at the top level
        if 'extra' in log_record:
            for key, value in log_record['extra'].items():
                log_record[key] = value
            del log_record['extra']
From there, we define a convenience function to get the structured logger.
import logging
def get_structured_logger(name, level=logging.INFO):
    logger = logging.getLogger(name)
    logger.setLevel(level)
    if not logger.handlers:
        console_handler = logging.StreamHandler()
        # Use our custom formatter
        formatter = CustomJsonFormatter('%(asctime)s %(levelname)s %(message)s')
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)
    return logger
This allows us to create loggers for both the agent and the MCP server.
MCP_SERVER_NAME = "phishing_triage_mcp"
AGENT_ID = "phishing-triage-agent-01"
AGENT_NAME = "PhishingTriageAssistant"
MODEL_ID = "us.anthropic.claude-3-7-sonnet-20250219-v1:0"
# Get loggers for the agent's own actions and for the MCP tool logs
agent_logger = get_structured_logger(__name__)
mcp_logger = get_structured_logger(MCP_SERVER_NAME)
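To make the role of the extra argument concrete, the stdlib-only sketch below shows that keys passed via extra become plain attributes on the LogRecord, which is what allows a JSON formatter to lift them to the top level. MinimalJsonFormatter is a hypothetical stand-in written for this illustration. It is not the python-json-logger implementation, and the logger name extra_demo is arbitrary.

```python
import io
import json
import logging

# Attribute names present on every LogRecord, plus the two that
# Formatter.format() may add; anything else came from `extra`.
_STANDARD_ATTRS = set(vars(logging.makeLogRecord({}))) | {"message", "asctime"}


class MinimalJsonFormatter(logging.Formatter):
    """Hypothetical stand-in for a JSON formatter, stdlib only."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "timestamp": self.formatTime(record),
            "log_level": record.levelname,
            "message": record.getMessage(),
        }
        # Fields passed via `extra` are ordinary attributes on the record.
        for key, value in vars(record).items():
            if key not in _STANDARD_ATTRS:
                payload[key] = value
        return json.dumps(payload, default=str)


stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(MinimalJsonFormatter())

demo_logger = logging.getLogger("extra_demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False
demo_logger.addHandler(handler)

demo_logger.info(
    "Querying threat intelligence",
    extra={"agent_id": "phishing-triage-agent-01"},
)
print(stream.getvalue())
```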
On the agent, we define a callback function that handles the incoming log messages from the MCP server.
from fastmcp.client.logging import LogMessage
# Level names known to the Python logging module (requires Python 3.11+)
LOGGING_LEVEL_MAP = logging.getLevelNamesMapping()
async def log_handler(message: LogMessage, correlation_id: str, user_id: str):
    # Map the MCP level string to a Python integer level; MCP levels without a
    # Python counterpart (e.g. "notice") fall back to INFO instead of raising
    level = LOGGING_LEVEL_MAP.get(message.level.upper(), logging.INFO)
    # Populate the structured log schema using 'extra'
    extra_context = {
        "agent_id": AGENT_ID,
        "agent_name": AGENT_NAME,
        "model_id": MODEL_ID,
        "principal_user_id": user_id,
        "event_correlation_id": correlation_id,
        "source_mcp_server": MCP_SERVER_NAME
    }
    mcp_logger.log(level, message.data, extra=extra_context)
Since the MCP standard only defines log levels as strings, we map them to the values appropriate for the Python standard library. The additional fields are passed through the extra argument.
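MCP defines eight syslog-style levels (debug, info, notice, warning, error, critical, alert, emergency), and three of them (notice, alert, emergency) have no entry in Python’s level-name mapping. An explicit table is one way to make the translation visible. The sketch below shows the idea; the choices for those three levels and the to_python_level helper are our own assumptions rather than anything prescribed by MCP or FastMCP.

```python
import logging

# Levels without a direct Python counterpart are folded into the nearest
# Python level; these three choices are assumptions made for illustration.
MCP_TO_PYTHON_LEVEL = {
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "notice": logging.INFO,         # assumption
    "warning": logging.WARNING,
    "error": logging.ERROR,
    "critical": logging.CRITICAL,
    "alert": logging.CRITICAL,      # assumption
    "emergency": logging.CRITICAL,  # assumption
}


def to_python_level(mcp_level: str) -> int:
    """Translate an MCP level name to a Python level, defaulting to INFO."""
    return MCP_TO_PYTHON_LEVEL.get(mcp_level.lower(), logging.INFO)


for name in ("info", "notice", "emergency"):
    print(name, logging.getLevelName(to_python_level(name)))
```

Folding alert and emergency into CRITICAL loses some granularity, so a stricter setup might preserve the original MCP level as an additional field in the log record.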
We connect the agent to the MCP server using the langchain_mcp_adapters package, which provides a convenient wrapper for integrating with our agent, since we are working in the LangChain ecosystem. There are a number of alternatives for connecting to the MCP server depending on the agent framework, including the FastMCP client and mcpadapt (used by CrewAI).
The logging_callback argument lets us specify a callback function to handle incoming log messages. Since the MultiServerMCPClient constructor doesn’t directly accept the necessary session arguments for the callback, we explicitly define the StreamableHttpConnection to inject the handler.
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain_mcp_adapters.sessions import StreamableHttpConnection
client = MultiServerMCPClient()
client.connections = {
    MCP_SERVER_NAME: StreamableHttpConnection(
        transport="streamable_http",
        url="http://localhost:8000/mcp",
        session_kwargs={
            # Set the logging callback to handle client-side logs
            "logging_callback": lambda msg: log_handler(msg, correlation_id, user_id)
        }
    )
}
The client generates the tool definitions that we pass to our ReAct agent.
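The logging callback closes over correlation_id and user_id, which have to exist before the connection is created. One way to mint them per task and bind them to the handler is sketched below. The helper make_logging_callback is hypothetical, and log_handler is replaced by a simplified stand-in so that the snippet runs without an MCP server.

```python
import asyncio
import uuid
from functools import partial


async def log_handler(message, correlation_id: str, user_id: str):
    """Simplified stand-in for the real handler; returns what it would log."""
    return {
        "message": message,
        "event_correlation_id": correlation_id,
        "principal_user_id": user_id,
    }


def make_logging_callback(user_id: str):
    """Hypothetical factory: create one correlation ID per task and bind it."""
    correlation_id = str(uuid.uuid4())
    callback = partial(log_handler, correlation_id=correlation_id, user_id=user_id)
    return correlation_id, callback


correlation_id, callback = make_logging_callback("soc_analyst_jane_doe")
result = asyncio.run(callback("Starting IOC extraction from email body."))
print(result)
```

Generating the ID once per task, rather than once per process, is what lets every tool log and the final verdict be joined on event_correlation_id later.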
To ensure the agent’s output is reliable and consistently structured, we use a Pydantic model. This forces the LLM to generate a response that conforms to our desired schema, which is critical for programmatic use and logging.
from enum import Enum
from pydantic import BaseModel, Field
class ThreatVerdict(str, Enum):
    """Enumeration for allowed threat verdicts."""
    BENIGN = "benign"
    SUSPICIOUS = "suspicious"
    MALICIOUS = "malicious"
class PhishingAnalysis(BaseModel):
    verdict: ThreatVerdict
    summary: str = Field(description="Short summary explaining the reasoning for the threat verdict")
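Two properties of the enumeration matter for logging, and both can be checked with the standard library alone. The sketch below redefines ThreatVerdict without Pydantic purely for illustration: values outside the enumeration are rejected, and because members are also str instances they serialize to JSON as plain strings.

```python
import json
from enum import Enum


class ThreatVerdict(str, Enum):
    """Enumeration for allowed threat verdicts (copied from the model above)."""
    BENIGN = "benign"
    SUSPICIOUS = "suspicious"
    MALICIOUS = "malicious"


# Values outside the enumeration are rejected with a ValueError.
try:
    ThreatVerdict("probably fine")
except ValueError as err:
    print(f"rejected: {err}")

# Members are str instances, so they serialize as plain JSON strings.
serialized = json.dumps({"threat_verdict": ThreatVerdict.MALICIOUS})
print(serialized)
```

This is why the verdict can be passed straight into a JSON log record without an explicit conversion.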
With our MCP tools and output structure defined, we construct the ReAct agent. ChatBedrockConverse in our example can be replaced with any of the other LangChain model adapters.
from langgraph.prebuilt import create_react_agent
from langchain_aws import ChatBedrockConverse
model = ChatBedrockConverse(...)
tools = await client.get_tools()
agent_prompt = """
You are a Tier 1 SOC Analyst AI Assistant. Your job is to analyze the user-reported email
and determine if it is malicious, suspicious, or benign.
1. First, extract all indicators of compromise (IOCs) from the email.
2. For each IOC, use your tools to check its reputation.
3. Based on the reputation of the IOCs, provide a final verdict and a summary of your findings.
"""
react_agent = create_react_agent(
    model,
    tools,
    prompt=agent_prompt,
    response_format=PhishingAnalysis,
)
The prompt instructs the agent to use the tools and follow the steps to determine a final verdict and summary of the findings. We invoke the agent with the email content to produce the structured output analysis.
response = await react_agent.ainvoke(
    {"messages": [{"role": "user", "content": email_content}]}
)
analysis = response['structured_response']
In addition to the client-side logging, we also log the activity on the agent.
agent_logger.info(
    "Completed phishing triage task.",
    extra={
        "agent_id": AGENT_ID,
        "agent_name": AGENT_NAME,
        "model_id": MODEL_ID,
        "principal_user_id": user_id,
        "event_correlation_id": correlation_id,
        "event_action": "complete_triage_task",
        "threat_verdict": analysis.verdict,
        "threat_summary": analysis.summary,
    }
)
End-to-end Example
Now that we have instrumented our AI agent and MCP server with structured logging, let’s see it in action on an example phishing email. For demonstration purposes, we have simplified the email content to reflect the key components.
From: security@yourbank-logins.com
Subject: Urgent: Suspicious Account Activity
We have detected unusual activity on your account. Please verify your identity immediately
by clicking here: http://evil-phish.com/login
Also, please review the attached report of your activity.
File hash: e88482b4026343e36bf48c1579a3f033
Using the full example code, the MCP server is run:
uv run -- python mcp_server.py
In a separate terminal, the AI agent is run:
uv run -- python agent_client.py
The agent produces multiple log events, both from the agent itself and from the MCP client-side logging.
{
    "message": "Completed phishing triage task.",
    "agent_id": "phishing-triage-agent-01",
    "agent_name": "PhishingTriageAssistant",
    "model_id": "us.anthropic.claude-3-7-sonnet-20250219-v1:0",
    "principal_user_id": "soc_analyst_jane_doe",
    "event_correlation_id": "ff54d007-a6d4-4d06-9cb5-4253fa97003f",
    "event_action": "complete_triage_task",
    "threat_verdict": "malicious",
    "threat_summary": "This email contains multiple indicators of a phishing attack: (1) The sender domain \"yourbank-logins.com\" is suspicious and likely impersonating a legitimate bank, (2) The URL \"http://evil-phish.com/login\" is clearly malicious and designed for credential theft, (3) The email references an attachment with hash e88482b4026343e36bf48c1579a3f033 which is associated with malware, and (4) The message uses urgency and fear tactics to manipulate the recipient into taking immediate action. This is a sophisticated attack combining phishing and malware delivery techniques.",
    "timestamp": "2025-07-30 14:18:38,564",
    "log_level": "INFO"
}
Our AI agent has produced a threat verdict and summary findings that correctly identify the email as a phishing attempt, providing a clear explanation of the key elements. This structured log can now trigger the downstream remediation and mitigation techniques to stop the phishing campaign before it spreads.
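As a rough illustration of how a downstream consumer might act on such an event, the sketch below maps the verdict to a list of follow-up actions. Everything here is hypothetical: the action names, the PLAYBOOK table, and the plan_response function are placeholders and do not correspond to any particular SOAR product or API.

```python
import json

# Hypothetical playbook; action names are illustrative placeholders.
PLAYBOOK = {
    "malicious": ["alert_soc", "block_indicators", "open_ticket"],
    "suspicious": ["alert_soc", "open_ticket"],
    "benign": [],
}


def plan_response(raw_event: str) -> list:
    """Choose follow-up actions for a completed triage event (JSON string)."""
    event = json.loads(raw_event)
    if event.get("event_action") != "complete_triage_task":
        return []  # only act on final verdicts, not intermediate tool logs
    # Unknown or missing verdicts fall back to human review.
    return PLAYBOOK.get(event.get("threat_verdict"), ["alert_soc"])


final_event = json.dumps({
    "event_action": "complete_triage_task",
    "threat_verdict": "malicious",
    "event_correlation_id": "ff54d007-a6d4-4d06-9cb5-4253fa97003f",
})
print(plan_response(final_event))
```

Keying the decision on event_action and threat_verdict, rather than on the free-text summary, keeps the automation independent of how the model happens to phrase its explanation.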
Room for improvement
The frameworks for building Agents and MCP servers are rapidly maturing, and have room for further improvement. We see the following opportunities:
1) Adding support for the extra argument in the log message (see context.py:167) for FastMCP would allow client-side logging to send structured information. In FastMCP v2.10.6, the message argument was required to be a str type, limiting the logs to messages only. We introduced Pull Request #1326 to add support for the extra argument, which is now available in FastMCP v2.11.0.
2) Introducing client-side logging support for a log function analogous to exception in the Python standard library would improve error visibility. The exception handling log provides more detailed traceback information that is critical for understanding the error in depth beyond the string message.
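Until such a function exists, one possible workaround is to flatten the exception into log-friendly fields on the server and include them in the message passed to ctx.error, or in the extra argument where the installed FastMCP version supports it. The helper below is a stdlib-only sketch; describe_exception and its field names are our own invention.

```python
import traceback


def describe_exception(exc: BaseException) -> dict:
    """Hypothetical helper: flatten an exception into log-friendly fields."""
    return {
        "error_type": type(exc).__name__,
        "error_message": str(exc),
        "error_traceback": "".join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        ),
    }


try:
    {}["verdict"]  # simulate a malformed threat intelligence response
except KeyError as e:
    details = describe_exception(e)

print(details["error_type"])
print(details["error_traceback"])
```

Tracebacks can reveal file paths and internal values, so it may be worth reviewing what is forwarded before sending them to a shared log destination.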
As active open-source contributors, we are excited to continue to jump in and help move these frameworks forward!
Conclusion
We began with a critical question for the AI era: how do we turn powerful but opaque AI agents into transparent, auditable, and secure components of our enterprise systems?
For developers and data scientists, we have demonstrated a low-friction method using FastMCP and standard Python logging libraries to instrument AI agents. By embedding simple log calls within our agent’s tools, you can produce a rich, structured data stream that provides crucial visibility into the agent’s operational lifecycle. This isn’t an extra burden; it’s a best practice for building robust, maintainable, and secure-by-design AI systems.
For CISOs and security analysts, this visibility is the cornerstone of AI governance. The audit logs we generated for our Phishing Triage Assistant transform agents from a source of risk into a monitored asset. By feeding this structured, SIEM-ready data into your security ecosystem, you can effectively monitor anomalous behavior, ensure compliance with audit requirements, and dramatically accelerate incident response. This not only enables better monitoring but also opens the door to fully automated defense, where a malicious action detected from an agent can trigger an immediate, orchestrated response, drastically reducing MTTR for AI-driven operations.
As AI becomes more integrated into business processes, the ability to trust and verify its actions will be paramount. The techniques shown here provide a practical and powerful foundation for building that trust.