Building an MCP server for Tripwire IP360 vulnerability information
If you've been experimenting with AI recently, you've likely come across the Model Context Protocol (MCP). For those who haven't, it is effectively a clean way to expose operational systems to an LLM as a structured toolset. There are lots of use cases for this, but in practice it means you can let a Large Language Model (LLM) query something like Tripwire IP360 (our Vulnerability Management tool) for details of assets, audit snapshots, audited hosts, and vulnerability metadata without hard-coding one-off scripts into every workflow, and build "human" runbooks quickly and easily.
For Tripwire IP360, we have an OpenAPI (Swagger) specification describing a REST API rooted at /rest/v1, served over HTTPS with HTTP Basic authentication, which is a perfect entry point for an MCP server. The API includes endpoints for assets, audits, audited hosts, and the ASPL vulnerability catalogue, which gives us enough surface area to build something useful. So I figured I'd spend a bit of my bank holiday (Easter!) Monday geeking out and seeing what we can build on this. Let's have a look at how you too could power up your VM with some AI power.
A basic MCP
On the MCP side, FastMCP is well suited to this because it lets you expose ordinary Python functions as tools with @mcp.tool, and run them over the default STDIO transport with a simple mcp.run().
For our IP360 scenario we'll make an MCP server to expose tools to:
- list IP360 assets / hosts
- get a single asset
- list audit snapshots
- list audited hosts for an audit
- list vulnerability definitions from the ASPL catalogue
- search vulnerabilities by CVE, name, or minimum score
- return a small “posture summary” from recent audits
In effect, that means the following API endpoints:
/assets for tracked assets / hosts
/audit/hosts for audited hosts
/audits for scan snapshots
/aspl/vulns for system vulnerabilities IP360 can detect - note this isn't quite as detailed as our actual vulnerability info database, but we can hopefully supplement this information with details our LLM knows about vulnerabilities!
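As a rough illustration of how those paths sit under the /rest/v1 root, here is a minimal sketch using only the standard library. The host name ip360.example and the helper endpoint_url are assumptions made for the example, not part of IP360 or its API.

```python
from urllib.parse import urljoin

# Hypothetical host name, used for illustration only.
BASE_URL = "https://ip360.example/rest/v1/"

# The four endpoint paths listed above, relative to the REST root.
ENDPOINTS = ["assets", "audit/hosts", "audits", "aspl/vulns"]


def endpoint_url(base_url: str, path: str) -> str:
    """Join a relative endpoint path onto the REST root."""
    if not base_url.endswith("/"):
        # Without the trailing slash, urljoin replaces the last path segment.
        base_url += "/"
    return urljoin(base_url, path.lstrip("/"))


for path in ENDPOINTS:
    print(endpoint_url(BASE_URL, path))
```

The trailing slash matters: joining "assets" onto a base of ".../rest/v1" (no slash) would yield ".../rest/assets", which is why the helper normalises the base first.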
Ok, with that plan in mind, let's look at getting started building!
Project layout
You only need two files:
ip360_mcp/
-- server.py
-- requirements.txt
In requirements.txt we need just two packages:
fastmcp
requests
Once that's in place, you can install them with:
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
or with Windows PowerShell:
python -m venv .venv
.venv\Scripts\Activate.ps1
pip install -r requirements.txt
Then we can put together our MCP server quite easily:
from __future__ import annotations
import os
import json
from typing import Any, Dict, List, Optional
from urllib.parse import urljoin
import requests
from requests.auth import HTTPBasicAuth
from fastmcp import FastMCP
# -----------------------------------------------------------------------------
# Configuration
# -----------------------------------------------------------------------------
IP360_BASE_URL = os.getenv("IP360_BASE_URL", "https://127.0.0.1/rest/v1/")
IP360_USERNAME = os.getenv("IP360_USERNAME", "")
IP360_PASSWORD = os.getenv("IP360_PASSWORD", "")
IP360_VERIFY_TLS = os.getenv("IP360_VERIFY_TLS", "true").lower() in ("1", "true", "yes")
IP360_TIMEOUT = int(os.getenv("IP360_TIMEOUT", "30"))
mcp = FastMCP(
    "Tripwire IP360 MCP",
    instructions=(
        "Use these tools to query Tripwire IP360 assets, audits, audited hosts, "
        "and vulnerability catalogue data. Prefer narrow filters where possible."
    ),
)
# -----------------------------------------------------------------------------
# Low-level client helpers
# -----------------------------------------------------------------------------
class IP360Client:
    def __init__(
        self,
        base_url: str,
        username: str,
        password: str,
        verify_tls: bool = True,
        timeout: int = 30,
    ) -> None:
        # urljoin needs a trailing slash to keep the last path segment
        if not base_url.endswith("/"):
            base_url += "/"
        self.base_url = base_url
        self.verify_tls = verify_tls
        self.timeout = timeout
        self.session = requests.Session()
        self.session.auth = HTTPBasicAuth(username, password)
        self.session.headers.update(
            {
                "Accept": "application/json",
                "User-Agent": "tripwire-ip360-mcp/1.0",
            }
        )

    def _request(
        self,
        method: str,
        path: str,
        params: Optional[Dict[str, Any]] = None,
    ) -> Any:
        url = urljoin(self.base_url, path.lstrip("/"))
        response = self.session.request(
            method=method.upper(),
            url=url,
            params=params,
            verify=self.verify_tls,
            timeout=self.timeout,
        )
        content_type = response.headers.get("Content-Type", "")
        if not response.ok:
            # Prefer the JSON error body, fall back to raw text
            try:
                detail = response.json()
            except Exception:
                detail = response.text
            raise RuntimeError(
                f"IP360 API request failed: {response.status_code} {response.reason} | "
                f"URL={url} | Detail={detail}"
            )
        if "application/json" in content_type:
            return response.json()
        return response.text

    def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Any:
        return self._request("GET", path, params=params)
def get_client() -> IP360Client:
    if not IP360_USERNAME or not IP360_PASSWORD:
        raise RuntimeError(
            "IP360 credentials not configured. "
            "Set IP360_USERNAME and IP360_PASSWORD environment variables."
        )
    return IP360Client(
        base_url=IP360_BASE_URL,
        username=IP360_USERNAME,
        password=IP360_PASSWORD,
        verify_tls=IP360_VERIFY_TLS,
        timeout=IP360_TIMEOUT,
    )


def compact(obj: Dict[str, Any]) -> Dict[str, Any]:
    """Remove None values from query dicts."""
    return {k: v for k, v in obj.items() if v is not None}
def paginate_all(
    client: IP360Client,
    path: str,
    params: Optional[Dict[str, Any]] = None,
    max_pages: int = 10,
) -> Dict[str, Any]:
    """
    Collect paginated DRF-style responses:
    {
        "count": int,
        "next": url|null,
        "previous": url|null,
        "results": [...]
    }
    Falls back cleanly if the endpoint returns a single object instead.
    """
    params = dict(params or {})
    results: List[Any] = []
    page = 0
    offset = int(params.get("offset", 0))
    limit = int(params.get("limit", 100))
    while page < max_pages:
        page_params = dict(params)
        page_params["limit"] = limit
        page_params["offset"] = offset
        data = client.get(path, params=page_params)
        if isinstance(data, dict) and "results" in data:
            results.extend(data.get("results", []))
            if not data.get("next"):
                return {
                    "count": data.get("count", len(results)),
                    "returned": len(results),
                    "results": results,
                }
            offset += limit
            page += 1
            continue
        # Non-paginated response
        return {
            "count": 1 if data else 0,
            "returned": 1 if data else 0,
            "results": [data] if data else [],
        }
    return {
        "count": len(results),
        "returned": len(results),
        "results": results,
        "warning": f"Pagination stopped after {max_pages} pages",
    }
# -----------------------------------------------------------------------------
# Formatting helpers
# -----------------------------------------------------------------------------
def slim_asset(asset: Dict[str, Any]) -> Dict[str, Any]:
    return {
        "id": asset.get("id") or asset.get("pk"),
        "ip_address": asset.get("ip_address"),
        "dns_name": asset.get("dns_name"),
        "netbios_name": asset.get("netbios_name"),
        "os": (
            asset.get("os", {}).get("name")
            if isinstance(asset.get("os"), dict)
            else asset.get("os")
        ),
        "last_seen": asset.get("last_seen"),
        "in_tripwire_enterprise": asset.get("in_tripwire_enterprise"),
        "score": asset.get("score"),
        "url": asset.get("url"),
    }


def slim_audit(audit: Dict[str, Any]) -> Dict[str, Any]:
    return {
        "id": audit.get("id") or audit.get("pk"),
        "name": audit.get("name"),
        "status": audit.get("status"),
        "scan_type": audit.get("scan_type"),
        "host_count": audit.get("host_count"),
        "vuln_count": audit.get("vuln_count"),
        "average_host_score": audit.get("average_host_score"),
        "start_date": audit.get("start_date"),
        "end_date": audit.get("end_date"),
        "url": audit.get("url"),
    }


def slim_audit_host(host: Dict[str, Any]) -> Dict[str, Any]:
    os_name = None
    if isinstance(host.get("operating_system"), dict):
        os_name = host["operating_system"].get("name")
    return {
        "id": host.get("id") or host.get("pk"),
        "ip_address": host.get("ip_address"),
        "dns_name": host.get("dns_name"),
        "netbios_name": host.get("netbios_name"),
        "domain_name": host.get("domain_name"),
        "operating_system": os_name,
        "score": host.get("score"),
        "timestamp": host.get("timestamp"),
        "audit": host.get("audit"),
        "url": host.get("url"),
    }


def slim_vuln(vuln: Dict[str, Any]) -> Dict[str, Any]:
    risk = vuln.get("risk")
    risk_name = risk.get("name") if isinstance(risk, dict) else risk
    return {
        "id": vuln.get("id") or vuln.get("pk"),
        "name": vuln.get("name"),
        "cve": vuln.get("cve"),
        "score": vuln.get("score"),
        "cvssv3": vuln.get("cvssv3"),
        "cvssv2": vuln.get("cvssv2"),
        "risk": risk_name,
        "publish_date": vuln.get("publish_date"),
        "description": vuln.get("description"),
        "url": vuln.get("url"),
    }
# -----------------------------------------------------------------------------
# MCP tools
# -----------------------------------------------------------------------------
@mcp.tool
def health_check() -> Dict[str, Any]:
    """
    Validate that the MCP server is configured to talk to Tripwire IP360.
    """
    return {
        "server": "Tripwire IP360 MCP",
        "base_url": IP360_BASE_URL,
        "username_configured": bool(IP360_USERNAME),
        "password_configured": bool(IP360_PASSWORD),
        "verify_tls": IP360_VERIFY_TLS,
        "timeout_seconds": IP360_TIMEOUT,
    }


@mcp.tool
def list_assets(
    search: Optional[str] = None,
    ip_address: Optional[str] = None,
    dns_name: Optional[str] = None,
    os_name_contains: Optional[str] = None,
    in_tripwire_enterprise: Optional[bool] = None,
    limit: int = 50,
    max_pages: int = 2,
) -> Dict[str, Any]:
    """
    List IP360 assets/hosts tracked across scans.
    """
    client = get_client()
    params = compact(
        {
            "search": search,
            "ip_address": ip_address,
            "dns_name": dns_name,
            "os__name__icontains": os_name_contains,
            "in_tripwire_enterprise": str(in_tripwire_enterprise).lower()
            if in_tripwire_enterprise is not None
            else None,
            "limit": limit,
            "offset": 0,
        }
    )
    data = paginate_all(client, "/assets", params=params, max_pages=max_pages)
    data["results"] = [slim_asset(x) for x in data["results"]]
    return data


@mcp.tool
def get_asset(persistent_host_id: int) -> Dict[str, Any]:
    """
    Get a single asset by persistent host ID.
    """
    client = get_client()
    asset = client.get(f"/assets/{persistent_host_id}")
    return slim_asset(asset)
@mcp.tool
def list_audits(
    name_contains: Optional[str] = None,
    status: Optional[str] = None,
    network_name: Optional[str] = None,
    scan_profile_name: Optional[str] = None,
    started_after: Optional[str] = None,
    started_before: Optional[str] = None,
    limit: int = 50,
    max_pages: int = 2,
) -> Dict[str, Any]:
    """
    List IP360 audit snapshots.
    Date strings should be ISO-like values accepted by the IP360 API.
    """
    client = get_client()
    params = compact(
        {
            "name__icontains": name_contains,
            "status": status,
            "network__name__icontains": network_name,
            "scan_profile__name__icontains": scan_profile_name,
            "start_date__gte": started_after,
            "start_date__lte": started_before,
            "ordering": "-start_date",
            "limit": limit,
            "offset": 0,
        }
    )
    data = paginate_all(client, "/audits", params=params, max_pages=max_pages)
    data["results"] = [slim_audit(x) for x in data["results"]]
    return data


@mcp.tool
def list_audited_hosts(
    audit: Optional[str] = None,
    ip_address: Optional[str] = None,
    dns_name: Optional[str] = None,
    domain_name: Optional[str] = None,
    operating_system_contains: Optional[str] = None,
    min_score: Optional[int] = None,
    limit: int = 100,
    max_pages: int = 2,
) -> Dict[str, Any]:
    """
    List hosts discovered in an audit snapshot.
    The 'audit' argument is the audit filter value exposed by the API.
    """
    client = get_client()
    params = compact(
        {
            "audit": audit,
            "ip_address": ip_address,
            "dns_name": dns_name,
            "domain_name": domain_name,
            "operating_system__name__icontains": operating_system_contains,
            "score__gte": min_score,
            "ordering": "-score",
            "limit": limit,
            "offset": 0,
        }
    )
    data = paginate_all(client, "/audit/hosts", params=params, max_pages=max_pages)
    data["results"] = [slim_audit_host(x) for x in data["results"]]
    return data
@mcp.tool
def list_vulnerability_catalog(
    search: Optional[str] = None,
    cve: Optional[str] = None,
    name_contains: Optional[str] = None,
    min_score: Optional[float] = None,
    min_cvssv3: Optional[float] = None,
    risk_level: Optional[str] = None,
    published_after: Optional[str] = None,
    limit: int = 100,
    max_pages: int = 2,
) -> Dict[str, Any]:
    """
    List vulnerability definitions from the IP360 ASPL catalogue.
    This is the catalogue of vulnerabilities IP360 can detect.
    """
    client = get_client()
    params = compact(
        {
            "search": search,
            "cve": cve,
            "name__icontains": name_contains,
            "score__gte": min_score,
            "cvssv3__gte": min_cvssv3,
            "risk__level": risk_level,
            "publish_date__gte": published_after,
            "ordering": "-score",
            "limit": limit,
            "offset": 0,
        }
    )
    data = paginate_all(client, "/aspl/vulns", params=params, max_pages=max_pages)
    data["results"] = [slim_vuln(x) for x in data["results"]]
    return data


@mcp.tool
def get_vulnerability(vuln_id: str) -> Dict[str, Any]:
    """
    Get a single vulnerability definition by ID from the ASPL catalogue.
    """
    client = get_client()
    vuln = client.get(f"/aspl/vulns/{vuln_id}")
    return slim_vuln(vuln)
@mcp.tool
def recent_posture_summary(
    audit_name_contains: Optional[str] = None,
    limit: int = 10,
) -> Dict[str, Any]:
    """
    Return a compact summary of recent audits with host and vulnerability counts.
    """
    # Note: depending on the installed FastMCP version, @mcp.tool may return a
    # tool object rather than the plain function, in which case this direct
    # call fails. Moving the shared query into an undecorated helper avoids that.
    audits = list_audits(
        name_contains=audit_name_contains,
        limit=limit,
        max_pages=1,
    )
    results = audits.get("results", [])
    if not results:
        return {
            "message": "No audits found",
            "audits": [],
            "totals": {
                "host_count": 0,
                "vuln_count": 0,
            },
        }
    # Treat missing or null counts as zero
    total_hosts = sum((x.get("host_count") or 0) for x in results)
    total_vulns = sum((x.get("vuln_count") or 0) for x in results)
    return {
        "audits": results,
        "totals": {
            "host_count": total_hosts,
            "vuln_count": total_vulns,
        },
    }
@mcp.tool
def raw_get(
    endpoint: str,
    query_json: Optional[str] = None,
) -> Dict[str, Any]:
    """
    Escape hatch for querying arbitrary GET endpoints exposed by IP360.
    Example endpoint:
        /assets
        /audits
        /audit/hosts
        /aspl/vulns
    Example query_json:
        {"limit": 20, "ordering": "-start_date"}
    """
    client = get_client()
    params = json.loads(query_json) if query_json else None
    data = client.get(endpoint, params=params)
    if isinstance(data, dict):
        return data
    return {"result": data}


if __name__ == "__main__":
    mcp.run()
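To see how the limit/offset walk in paginate_all behaves without a live IP360 instance, here is a minimal sketch. FakeClient and collect are hypothetical stand-ins written for this example; they assume the paginated response shape described in the paginate_all docstring and are not part of server.py.

```python
from typing import Any, Dict, List, Optional


class FakeClient:
    """Hypothetical stand-in for IP360Client that serves records from memory."""

    def __init__(self, total: int = 5) -> None:
        self.records = [{"id": i} for i in range(total)]
        self.calls = 0

    def get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        self.calls += 1
        params = params or {}
        limit, offset = params.get("limit", 100), params.get("offset", 0)
        has_more = offset + limit < len(self.records)
        return {
            "count": len(self.records),
            "next": "more" if has_more else None,  # placeholder for the next-page URL
            "previous": None,
            "results": self.records[offset:offset + limit],
        }


def collect(client: FakeClient, path: str, limit: int, max_pages: int) -> List[Dict[str, Any]]:
    """Same limit/offset walk as paginate_all, reduced to the essentials."""
    results: List[Dict[str, Any]] = []
    offset = 0
    for _ in range(max_pages):
        data = client.get(path, params={"limit": limit, "offset": offset})
        results.extend(data["results"])
        if not data["next"]:
            break
        offset += limit
    return results


client = FakeClient(total=5)
print(len(collect(client, "/assets", limit=2, max_pages=10)), client.calls)
print(len(collect(FakeClient(total=5), "/assets", limit=2, max_pages=2)))
```

With five records and a page size of two, the first call needs three requests to fetch everything, while the second stops after two pages with only four records. The same cap applies to the tools above: with the defaults of limit=50 and max_pages=2, list_assets returns at most 100 results and flags the truncation through the "warning" key.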
How this maps to the Swagger
For those following closely, note that this implementation is intentionally quite simple and aligned to the parts of the Swagger API that are most useful for our scenario. You may want to expand it if you need different functionality (agent details, for example), but for now our server just wraps these endpoints:
- GET /assets and GET /assets/{persistent_host_id} for tracked assets / hosts.
- GET /audits for audit snapshots. Audit objects expose host_count and vuln_count, which is useful for summary tools.
- GET /audit/hosts for audited hosts and their score, timestamp, names, and OS.
- GET /aspl/vulns and GET /aspl/vulns/{vuln_id} for the catalogue of vulnerabilities IP360 can detect.
Environment variables
We need to set a few environment variables before running the server:
Linux/Bash
export IP360_BASE_URL="https://your-ip360-host/rest/v1/"
export IP360_USERNAME="apiuser"
export IP360_PASSWORD="supersecret"
export IP360_VERIFY_TLS="true"
export IP360_TIMEOUT="30"
Windows/PowerShell
$env:IP360_BASE_URL = "https://your-ip360-host/rest/v1/"
$env:IP360_USERNAME = "apiuser"
$env:IP360_PASSWORD = "supersecret"
$env:IP360_VERIFY_TLS = "true"
$env:IP360_TIMEOUT = "30"
And if you use a lab system with a self-signed cert, you can temporarily set:
export IP360_VERIFY_TLS="false"
Of course, use that sparingly, and never in your production deployments!
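If you would rather not switch verification off at all, one option is to point requests at the lab CA certificate instead, since its verify argument accepts either a boolean or a path to a CA bundle. The sketch below assumes a hypothetical IP360_CA_BUNDLE variable and a hypothetical resolve_verify helper; neither exists in server.py as written.

```python
import os
from typing import Mapping, Union


def resolve_verify(env: Mapping[str, str] = os.environ) -> Union[bool, str]:
    """Return a value suitable for the `verify` argument of requests.

    IP360_CA_BUNDLE is a hypothetical variable, not one server.py reads.
    """
    ca_bundle = env.get("IP360_CA_BUNDLE", "").strip()
    if ca_bundle:
        # requests accepts a path to a CA bundle in place of True/False
        return ca_bundle
    # Same truthy parsing as IP360_VERIFY_TLS in server.py
    return env.get("IP360_VERIFY_TLS", "true").lower() in ("1", "true", "yes")


print(resolve_verify({"IP360_VERIFY_TLS": "false"}))
print(resolve_verify({"IP360_CA_BUNDLE": "/etc/ssl/lab-ca.pem"}))
print(resolve_verify({}))
```

The returned value could be passed wherever verify_tls is used today, which keeps certificate checking on while still trusting a self-signed lab certificate.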
Running the MCP server
FastMCP supports the normal python server.py pattern via mcp.run(), which uses STDIO by default. You can also launch it with the FastMCP CLI.
python server.py
Or:
fastmcp run server.py:mcp
Example MCP queries
Once connected from an MCP-aware client, these are the kinds of prompts that become possible:
Return all Windows hosts seen in recent scans
List assets where the OS name contains Windows, return the first 20.
That would map naturally to the list_assets tool with:
{
"os_name_contains": "Windows",
"limit": 20
}
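To make the mapping concrete, here is a small sketch of how those tool arguments would end up as a query string, following the same compact-then-send pattern as list_assets. It only builds the string with the standard library and does not contact IP360; the filter names are the ones used in server.py and are assumed to match your IP360 version.

```python
from typing import Any, Dict
from urllib.parse import urlencode


def compact(obj: Dict[str, Any]) -> Dict[str, Any]:
    """Remove None values from query dicts (same helper as in server.py)."""
    return {k: v for k, v in obj.items() if v is not None}


# Arguments the MCP client passes to the list_assets tool
tool_args = {"os_name_contains": "Windows", "limit": 20}

# Unset filters are None and get dropped before the request is built
params = compact(
    {
        "search": tool_args.get("search"),
        "ip_address": tool_args.get("ip_address"),
        "os__name__icontains": tool_args.get("os_name_contains"),
        "limit": tool_args.get("limit", 50),
        "offset": 0,
    }
)

query = urlencode(params)
print("/assets?" + query)
```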
Find a host by IP
Find the asset for 10.20.30.40.
Mapped call to list_assets:
{
"ip_address": "10.20.30.40",
"limit": 10
}
Show recent audit posture
Show the last 5 audits and the total hosts and vulnerabilities found.
Mapped call:
{
"limit": 5
}
against recent_posture_summary.
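The totals in that summary are plain sums over the returned audits, with missing or null counts treated as zero. A minimal sketch with made-up sample data (the audit names and numbers below are invented for illustration):

```python
from typing import Any, Dict, List


def totals(audits: List[Dict[str, Any]]) -> Dict[str, int]:
    """Sum host and vulnerability counts, treating missing or None as 0."""
    return {
        "host_count": sum((a.get("host_count") or 0) for a in audits),
        "vuln_count": sum((a.get("vuln_count") or 0) for a in audits),
    }


# Made-up sample values, not real IP360 output
sample = [
    {"name": "Weekly Internal Scan", "host_count": 120, "vuln_count": 950},
    {"name": "DMZ Scan", "host_count": 15, "vuln_count": None},
    {"name": "Lab Scan"},
]

print(totals(sample))
```

Bear in mind that a host scanned in several audits is counted once per audit, so the host total is a sum of per-audit counts rather than a count of unique hosts.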
Return audited hosts for a given audit
Show audited hosts for audit Weekly Internal Scan, sorted by highest score.
Mapped call to list_audited_hosts:
{
"audit": "Weekly Internal Scan",
"limit": 50
}
Return vulnerability definitions for a CVE
Find the IP360 vulnerability entry for CVE-2025-12345.
Mapped call:
{
"cve": "CVE-2025-12345",
"limit": 20
}
against list_vulnerability_catalog.
Return high-risk detectable vulnerabilities
List detectable vulnerabilities with CVSSv3 >= 9.0.
Mapped call to list_vulnerability_catalog:
{
"min_cvssv3": 9.0,
"limit": 50
}
Search vulnerability descriptions
Search the vulnerability catalogue for OpenSSL issues.
Mapped call to list_vulnerability_catalog:
{
"search": "OpenSSL",
"limit": 50
}
A basic MCP - but a powerful tool for vulnerability management
There are three practical design choices here that I'd call out:
- First, the server keeps one small, explicit wrapper per API area instead of trying to auto-generate hundreds of MCP tools from Swagger. That makes the tool surface stable and usable for an LLM.
- Second, each tool exposes the filters that matter most operationally: IP, DNS name, OS, score, CVE, CVSS, dates, and audit name. Those align well with the filterable fields visible in IP360's API for assets, audit hosts, audits, and ASPL vulnerabilities.
- Third, the raw_get tool gives us an escape hatch. That is useful when/if you later decide to expose extra IP360 endpoints without changing the server structure.
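One caveat with an escape hatch like raw_get: urljoin returns an absolute URL unchanged when it is given one as the second argument, so an endpoint such as "https://some-other-host/..." would be requested with the session's Basic credentials attached. A possible guard is sketched below; safe_endpoint is a hypothetical helper, not part of server.py, and it is a starting point rather than a complete validation scheme.

```python
from urllib.parse import urlsplit


def safe_endpoint(endpoint: str) -> str:
    """Return a relative path under the REST root, or raise ValueError."""
    parts = urlsplit(endpoint)
    # Reject anything that names its own scheme or host
    if parts.scheme or parts.netloc:
        raise ValueError("endpoint must be a relative path, not a full URL")
    segments = [s for s in parts.path.split("/") if s]
    # Reject empty paths and attempts to climb out of /rest/v1
    if not segments or any(s in (".", "..") for s in segments):
        raise ValueError("endpoint must not be empty or contain '.' or '..' segments")
    return "/".join(segments)


print(safe_endpoint("/audit/hosts"))
for candidate in ("https://evil.example/x", "../admin"):
    try:
        safe_endpoint(candidate)
    except ValueError as exc:
        print(f"rejected {candidate!r}: {exc}")
```

Calling something like this at the top of raw_get, before client.get, would keep the tool confined to the configured IP360 host. The query string is deliberately dropped from the returned path, since raw_get takes its parameters separately through query_json.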
Final thoughts
This is the sweet spot for MCP in security operations for me at this point: take a well-defined security tool API, wrap the parts analysts might actually want to use, and expose them as a compact set of typed tools that let you do useful things.
For Tripwire IP360, the Swagger API gives you a solid foundation: tracked assets, audit snapshots, audited hosts, and vulnerability catalogue data. That is enough to make an LLM genuinely useful for host lookups, scan summaries, vulnerability searches, and quick posture interrogation without building a heavyweight middleware tier.