Integrating IP360 with AWS Lambda

I built the script below to let users easily scale our vulnerability-management monitoring of servers deployed in AWS, with exceptions pulled automatically from an S3 bucket and added to Tripwire IP360 via an AWS Lambda call:

import boto3
from botocore.config import Config
import os
from base64 import b64decode
from botocore.exceptions import ClientError
import requests
import urllib3
import json
import uuid
import ipaddress
import struct
import time
import os
import sys
import argparse
"""
Functions - AWS
"""
def GET_EXCEPTION_AWS_S3(s3_bucket,filename):
    """Fetch a line-separated IP list from S3 and keep only valid addresses.

    Args:
        s3_bucket: Name of the bucket that holds the exception file.
        filename: Object key of the exception file inside the bucket.

    Returns:
        A list of the entries that parse as IPv4/IPv6 addresses (possibly
        empty), or ``None`` when the object could not be retrieved or decoded.
    """
    # NOTE: verify=False turns off TLS certificate validation for S3; kept
    # from the original, which did this to cope with a self-signed cert.
    s3 = boto3.resource('s3', verify=False)
    # The account id is only used to make the failure message actionable.
    account_id = boto3.client("sts").get_caller_identity()["Account"]

    try:
        body = s3.Object(s3_bucket, filename).get()['Body'].read()
        text = body.decode("utf-8")
    except Exception:
        # Best-effort boundary: any retrieval/decoding problem is reported
        # and signalled to the caller with None.
        print("Failed to retrieve data as " + str(account_id))
        return None
    print("Retrieved")

    verified_ip_list = []
    # splitlines() copes with both CRLF and LF files; blank lines are ignored.
    for line in text.splitlines():
        candidate = line.strip()
        if not candidate:
            continue
        try:
            ipaddress.ip_address(candidate)
        except ValueError:
            print("Not a valid IP")
            continue
        verified_ip_list.append(candidate)
    return verified_ip_list
"""
Functions - IP360
"""
# Module-wide requests session used by the scan helpers in this file.
# Certificate verification is off (and urllib3's warnings are silenced)
# because of the self-signed cert.
urllib3.disable_warnings()
session = requests.Session()
session.verify = False
session.headers["accept"] = "application/json, text/plain"
session.headers["X-Requested-With"] = "required"
def ip360baseurl(ip360vnehost):
    """Return the REST v1 root URL (``https://<host>/rest/v1``) for a VnE host."""
    return "".join(("https://", ip360vnehost, "/rest/v1"))
def GET_IP360_NETWORK(network_name,ip360vnehost,ip360username,ip360password):
    """Look up IP360 networks by name.

    Posts the credentials to ``/index.ice`` and then, on the same session,
    queries ``/rest/v1/networks`` filtered on ``name``.

    Args:
        network_name: Network name to filter on.
        ip360vnehost: Host name or IP of the IP360 VnE.
        ip360username: Login name.
        ip360password: Login password.

    Returns:
        The ``requests.Response`` of the lookup (not parsed JSON), or ``None``
        if the lookup request could not be sent.
    """
    with requests.Session() as s:
        # !!! Disables SSL validation for self-signed certs - NOT RECOMMENDED !!!
        s.verify = False
        urllib3.disable_warnings()
        s.headers.update({
            "accept": "application/json, text/plain",
            "X-Requested-With": "required",
            "Referer": ip360baseurl(ip360vnehost),
        })
        # Login
        login_payload = {
            "login": ip360username,
            "password": ip360password,
            "auth_type": "internal",
            "auth_server_id": "0",
            "do": "auth",
        }
        try:
            s.post(url="https://%s/index.ice" % (ip360vnehost), data=login_payload)
        except requests.RequestException:
            # Best effort, as before: report and still attempt the lookup.
            print("Failed log in - check credentials and network access")
        # Look up the network; params= makes requests URL-encode names that
        # contain spaces, '&', '#', etc.
        try:
            return s.get(
                url="https://%s/rest/v1/networks" % (ip360vnehost),
                params={"name": network_name},
            )
        except requests.RequestException:
            print("Network not found")
            return None
def GET_IP360_POOL(pool_name,ip360vnehost,ip360username,ip360password):
    """Look up IP360 appliance pools by name.

    Posts the credentials to ``/index.ice`` and then, on the same session,
    queries ``/rest/v1/appliance_pools`` filtered on ``name``.

    Args:
        pool_name: Appliance pool name to filter on.
        ip360vnehost: Host name or IP of the IP360 VnE.
        ip360username: Login name.
        ip360password: Login password.

    Returns:
        The ``requests.Response`` of the lookup (not parsed JSON), or ``None``
        if the lookup request could not be sent.
    """
    with requests.Session() as s:
        # !!! Disables SSL validation for self-signed certs - NOT RECOMMENDED !!!
        s.verify = False
        urllib3.disable_warnings()
        s.headers.update({
            "accept": "application/json, text/plain",
            "X-Requested-With": "required",
            "Referer": ip360baseurl(ip360vnehost),
        })
        # Login
        login_payload = {
            "login": ip360username,
            "password": ip360password,
            "auth_type": "internal",
            "auth_server_id": "0",
            "do": "auth",
        }
        try:
            s.post(url="https://%s/index.ice" % (ip360vnehost), data=login_payload)
        except requests.RequestException:
            # Best effort, as before: report and still attempt the lookup.
            print("Failed log in - check credentials and network access")
        # Look up the appliance pool; params= makes requests URL-encode the name.
        try:
            return s.get(
                url="https://%s/rest/v1/appliance_pools" % (ip360vnehost),
                params={"name": pool_name},
            )
        except requests.RequestException:
            print("Pool not found")
            return None
def GET_IP360_SCAN_PROFILE(profile_name,ip360vnehost,ip360username,ip360password):
    """Search IP360 scan profiles for ``profile_name``.

    Posts the credentials to ``/index.ice`` and then, on the same session,
    queries ``/rest/v1/scan_profiles`` with the ``search`` parameter (unlike
    the other lookup helpers, which filter on ``name``).

    Args:
        profile_name: Scan profile name to search for.
        ip360vnehost: Host name or IP of the IP360 VnE.
        ip360username: Login name.
        ip360password: Login password.

    Returns:
        The ``requests.Response`` of the lookup (not parsed JSON), or ``None``
        if the lookup request could not be sent.
    """
    with requests.Session() as s:
        # !!! Disables SSL validation for self-signed certs - NOT RECOMMENDED !!!
        s.verify = False
        urllib3.disable_warnings()
        s.headers.update({
            "accept": "application/json, text/plain",
            "X-Requested-With": "required",
            "Referer": ip360baseurl(ip360vnehost),
        })
        # Login
        login_payload = {
            "login": ip360username,
            "password": ip360password,
            "auth_type": "internal",
            "auth_server_id": "0",
            "do": "auth",
        }
        try:
            s.post(url="https://%s/index.ice" % (ip360vnehost), data=login_payload)
        except requests.RequestException:
            # Best effort, as before: report and still attempt the lookup.
            print("Failed log in - check credentials and network access")
        # Look up the scan profile; params= makes requests URL-encode the name.
        try:
            return s.get(
                url="https://%s/rest/v1/scan_profiles" % (ip360vnehost),
                params={"search": profile_name},
            )
        except requests.RequestException:
            print("Scan Profile not found")
            return None
def GET_IP360_CREDENTIAL(credential_name,ip360vnehost,ip360username,ip360password):
    """Look up IP360 scan credentials by name.

    Posts the login details to ``/index.ice`` and then, on the same session,
    queries ``/rest/v1/credentials`` filtered on ``name``.

    Args:
        credential_name: Credential name to filter on.
        ip360vnehost: Host name or IP of the IP360 VnE.
        ip360username: Login name.
        ip360password: Login password.

    Returns:
        The ``requests.Response`` of the lookup (not parsed JSON), or ``None``
        if the lookup request could not be sent.
    """
    with requests.Session() as s:
        # !!! Disables SSL validation for self-signed certs - NOT RECOMMENDED !!!
        s.verify = False
        urllib3.disable_warnings()
        s.headers.update({
            "accept": "application/json, text/plain",
            "X-Requested-With": "required",
            "Referer": ip360baseurl(ip360vnehost),
        })
        # Login
        login_payload = {
            "login": ip360username,
            "password": ip360password,
            "auth_type": "internal",
            "auth_server_id": "0",
            "do": "auth",
        }
        try:
            s.post(url="https://%s/index.ice" % (ip360vnehost), data=login_payload)
        except requests.RequestException:
            # Best effort, as before: report and still attempt the lookup.
            print("Failed log in - check credentials and network access")
        # Look up the credential; params= makes requests URL-encode the name.
        try:
            return s.get(
                url="https://%s/rest/v1/credentials" % (ip360vnehost),
                params={"name": credential_name},
            )
        except requests.RequestException:
            # The original printed the scan-profile message here (copy/paste slip).
            print("Credential not found")
            return None
def SET_IP360_NETWORK(network_name,included_ips,ip360vnehost,ip360username,ip360password):
    """Create an IP360 network, or append IPs to an existing one with that name.

    If a network called exactly ``network_name`` already exists, it is PATCHed
    with ``included_ips`` appended to its current ``include_ips`` (and an empty
    ``exclude_ips`` is sent). Otherwise a new network is POSTed.

    Args:
        network_name: Name of the network to create or update.
        included_ips: Comma-separated IPs/ranges to include (joined to the
            existing value with a comma).
        ip360vnehost: Host name or IP of the IP360 VnE.
        ip360username: Login name, also used for per-request basic auth.
        ip360password: Login password.

    Returns:
        None. Progress and failures are reported with ``print``.
    """
    with requests.Session() as s:
        # !!! Disables SSL validation for self signed certs- NOT RECOMMENDED !!!
        s.verify = False
        urllib3.disable_warnings()
        s.headers.update({
            "accept": "application/json, text/plain",
            "X-Requested-With": "required",
            "Referer": ip360baseurl(ip360vnehost),
        })
        auth = (ip360username, ip360password)
        # Login
        login_payload = {
            "login": ip360username,
            "password": ip360password,
            "auth_type": "internal",
            "auth_server_id": "0",
            "do": "auth",
        }
        try:
            s.post(url="https://%s/index.ice" % (ip360vnehost), data=login_payload)
        except requests.RequestException:
            print("Failed log in - check credentials and network access")

        # Check for an existing network; params= URL-encodes the name.
        networks = s.get(
            url="https://%s/rest/v1/networks" % (ip360vnehost),
            params={"search": network_name},
        )
        found = json.loads(networks.content)
        network_to_update = None
        if found["count"] > 0:
            # `search` presumably also returns partial matches, so only an
            # exact name match is treated as "the same network".
            network_to_update = next(
                (net for net in found["results"] if net["name"] == network_name),
                None,
            )

        if network_to_update is not None:
            print("Existing network with matching name found, updating")
            # Skip empty parts so an empty existing list yields no leading comma.
            merged_ips = ",".join(
                part for part in (network_to_update["include_ips"], included_ips) if part
            )
            payload = {
                "url": network_to_update["url"],
                "name": network_to_update["name"],
                "asset_value": network_to_update["asset_value"],
                "include_ips": merged_ips,
                "exclude_ips": "",
                "virtual_hosts": [],
                "network_type": "default",
                "effective_ranges": network_to_update["effective_ranges"],
            }
            try:
                response = s.patch(url=network_to_update["url"], data=payload, auth=auth)
                # Treat HTTP 4xx/5xx as a failure instead of silently passing.
                response.raise_for_status()
            except requests.RequestException:
                print("Failed to update existing network: "+ network_name +", check permissions/network configuration")
        else:
            # Post a new network. host_tracking is sent as a JSON string.
            host_tracking = json.dumps({
                "correlation_threshold": 90,
                "correlation_method": "Static IP",
                "correlation_values": {
                    "DNS Name": 0.0,
                    "OS Fingerprint": 0.0,
                    "NetBIOS Name": 0.0,
                    "Apparent MAC Address": 0.0,
                    "Port Signature": 0.0,
                    "IP Address": 100.0,
                    "Operating System": 0.0,
                },
            })
            payload = {
                "name": network_name,
                "active": True,
                "notes": "Created by API",
                "include_ips": included_ips,
                "exclude_ips": "",
                "host_tracking": host_tracking,
            }
            print("Network config:")
            print(payload)
            try:
                response = s.post(
                    url="%s/networks" % (ip360baseurl(ip360vnehost)),
                    data=payload,
                    auth=auth,
                )
                response.raise_for_status()
            except requests.RequestException:
                print("Failed to add a new network - check access and network parameters")
            else:
                print("Added new network: "+network_name)
        print("Adding/updating network complete for "+network_name)
def REPLACE_IP360_NETWORK(network_name,included_ips,excluded_ips,ip360vnehost,ip360username,ip360password):
    """Delete any network called ``network_name`` and create it afresh.

    Args:
        network_name: Name of the network to replace.
        included_ips: Value sent as the new network's ``include_ips``.
        excluded_ips: Value sent as the new network's ``exclude_ips``.
        ip360vnehost: Host name or IP of the IP360 VnE.
        ip360username: Login name, also used for per-request basic auth.
        ip360password: Login password.

    Returns:
        None. Progress and failures are reported with ``print``. As before, a
        failed delete does not stop the create attempt.
    """
    with requests.Session() as s:
        # !!! Disables SSL validation for self-signed certs - NOT RECOMMENDED !!!
        s.verify = False
        urllib3.disable_warnings()
        s.headers.update({
            "accept": "application/json, text/plain",
            "X-Requested-With": "required",
            "Referer": ip360baseurl(ip360vnehost),
        })
        auth = (ip360username, ip360password)
        login_payload = {
            "login": ip360username,
            "password": ip360password,
            "auth_type": "internal",
            "auth_server_id": "0",
            "do": "auth",
        }
        try:
            s.post(url="https://%s/index.ice" % (ip360vnehost), data=login_payload)
        except requests.RequestException:
            print("Failed log in - check credentials and network access")

        # params= URL-encodes the name.
        networks = s.get(
            url="https://%s/rest/v1/networks" % (ip360vnehost),
            params={"search": network_name},
            auth=auth,
        )
        found = json.loads(networks.content)
        network_to_remove = None
        if found["count"] > 0:
            # `search` presumably also returns partial matches; only delete a
            # network whose name is exactly the one being replaced.
            network_to_remove = next(
                (net for net in found["results"] if net["name"] == network_name),
                None,
            )

        if network_to_remove is not None:
            # network exists - so remove it first
            print("Removing existing duplicate network")
            payload = {
                "url": network_to_remove["url"],
                "name": network_to_remove["name"],
            }
            try:
                response = s.delete(url=network_to_remove["url"], data=payload, auth=auth)
                # Treat HTTP 4xx/5xx as a failure instead of silently passing.
                response.raise_for_status()
            except requests.RequestException:
                print("Failed to remove existing network: "+ network_name +", check scan task is not in progress / locked")

        # host_tracking is sent as a JSON string.
        host_tracking = json.dumps({
            "correlation_threshold": 90,
            "correlation_method": "Static IP",
            "correlation_values": {
                "DNS Name": 0.0,
                "OS Fingerprint": 0.0,
                "NetBIOS Name": 0.0,
                "Apparent MAC Address": 0.0,
                "Port Signature": 0.0,
                "IP Address": 100.0,
                "Operating System": 0.0,
            },
        })
        payload = {
            "name": network_name,
            "active": True,
            "notes": "Created by API",
            "include_ips": included_ips,
            "exclude_ips": excluded_ips,
            "host_tracking": host_tracking,
        }
        print(payload)
        try:
            response = s.post(
                url="%s/networks" % (ip360baseurl(ip360vnehost)),
                data=payload,
                auth=auth,
            )
            response.raise_for_status()
        except requests.RequestException:
            print("Failed to add a new network - check access and network parameters")
        else:
            print("Added new network: "+network_name)
def EXECUTE_IP360_ADHOC_SCAN(network,pool,scanprofile,ip360vnehost,ip360username,ip360password):
    """Start an ad-hoc audit of a network.

    Resolves the network, appliance pool and scan profile names through the
    GET_IP360_* helpers, takes the ``url`` of the first result of each lookup,
    and POSTs the audit request to ``/audits`` on the module-level session.
    The raw response body is printed.
    """
    network_lookup = GET_IP360_NETWORK(network, ip360vnehost, ip360username, ip360password)
    pool_lookup = GET_IP360_POOL(pool, ip360vnehost, ip360username, ip360password)
    profile_lookup = GET_IP360_SCAN_PROFILE(scanprofile, ip360vnehost, ip360username, ip360password)

    base_url = ip360baseurl(ip360vnehost)
    print("Sending ahoc scan request to %s as %s" % (base_url, ip360username))

    def first_url(lookup):
        # Each lookup is a Response whose JSON carries a "results" list.
        return json.loads(lookup.content)["results"][0]["url"]

    audit_request = {
        "debug": False,
        "name": "Adhoc Automated Scan",
        "network": first_url(network_lookup),
        "pool": first_url(pool_lookup),
        "scan_profile": first_url(profile_lookup),
        "special_scan": False,
    }
    postscan = session.post(
        url="%s/audits" % (base_url),
        auth=(ip360username, ip360password),
        json=audit_request,
    )
    print(postscan.content)
    postscan.connection.close()
def SET_IP360_SCHEDULED_SCAN(network,pool,scanprofile,ip360vnehost,ip360username,ip360password):
    """Create a scheduled scan config for a network unless one already exists.

    The config is named ``"Auto-" + <network name> + <profile name> + " Scan"``.
    Existing scan configs are fetched and compared by exact name; a new config
    is POSTed to ``/scan_configs`` only when none matches.

    Args:
        network: Network name to resolve via GET_IP360_NETWORK.
        pool: Appliance pool name to resolve via GET_IP360_POOL.
        scanprofile: Scan profile name to resolve via GET_IP360_SCAN_PROFILE.
        ip360vnehost: Host name or IP of the IP360 VnE.
        ip360username: User name for basic auth.
        ip360password: Password for basic auth.

    Returns:
        None.
    """
    # Get network, appliance pool and scan profile
    network_set = GET_IP360_NETWORK(network,ip360vnehost,ip360username,ip360password)
    pool_set = GET_IP360_POOL(pool,ip360vnehost,ip360username,ip360password)
    profile_set = GET_IP360_SCAN_PROFILE(scanprofile,ip360vnehost,ip360username,ip360password)

    network_info = json.loads(network_set.content)["results"][0]
    profile_info = json.loads(profile_set.content)["results"][0]
    config_name = "Auto-" + network_info["name"] + profile_info["name"] + " Scan"

    # Check for an existing scan. The name contains spaces, so it is passed via
    # params= and URL-encoded by requests.
    scheduledscans = session.get(
        url="https://%s/rest/v1/scan_configs" % (ip360vnehost),
        params={"name": config_name},
        auth=(ip360username, ip360password),
    )
    # Unfortunately filtering by name isn't consistent in older versions of
    # IP360, so compare the returned names ourselves.
    existing_scans = json.loads(scheduledscans.content)["results"]
    if any(scan["name"] == config_name for scan in existing_scans):
        print("Scheduled Scan already exists - no action taken")
        return

    # No match found (or no configs at all): post the scheduled scan once.
    print("Sending scan scheduling request to %s as %s" % (ip360baseurl(ip360vnehost), ip360username))
    payload = {
        "active": True,
        "name": config_name,
        "network": network_info["url"],
        "pool": json.loads(pool_set.content)["results"][0]["url"],
        "scan_profile": profile_info["url"],
    }
    postscan = session.post(
        url="%s/scan_configs" % (ip360baseurl(ip360vnehost)),
        auth=(ip360username, ip360password),
        json=payload,
    )
    postscan.connection.close()
def SET_IP360_CREDENTIAL(credential_name,credential_username,credential_password,credential_domain,
                         credential_auth_algo,ip360network,exceptionip,ip360vnehost,ip360username,ip360password):
    """Create a Windows credential in IP360 and bind it to a network/IP range.

    Args:
        credential_name: Name for the new credential.
        credential_username: User name stored in the credential.
        credential_password: Password stored in the credential.
        credential_domain: Domain stored in the credential.
        credential_auth_algo: Sent as ``minimum_auth_algorithm``.
        ip360network: Sent as the binding's ``network`` value.
        exceptionip: Sent as the binding's ``ip_ranges`` value.
        ip360vnehost: Host name or IP of the IP360 VnE.
        ip360username: Login name, also used for per-request basic auth.
        ip360password: Login password.

    Returns:
        None. Failures are reported with ``print``; "Finished!" is always printed.
    """
    with requests.Session() as s:
        # !!! Disables SSL validation for self signed certs- NOT RECOMMENDED !!!
        s.verify = False
        urllib3.disable_warnings()
        s.headers.update({
            "accept": "application/json, text/plain",
            "X-Requested-With": "required",
            "Referer": ip360baseurl(ip360vnehost),
        })
        auth = (ip360username, ip360password)
        login_payload = {
            "login": ip360username,
            "password": ip360password,
            "auth_type": "internal",
            "auth_server_id": "0",
            "do": "auth",
        }
        s.post(url="https://%s/index.ice" % (ip360vnehost), data=login_payload)

        credential_payload = {
            "name": credential_name,
            "username": credential_username,
            "password": credential_password,
            "domain": credential_domain,
            "minimum_auth_algorithm": credential_auth_algo,
        }
        # NOTE(review): the lookup helper uses /credentials (plural); confirm
        # the singular /credential/... paths against the IP360 REST API.
        try:
            created = s.post(
                url="%s/credential/windows" % (ip360baseurl(ip360vnehost)),
                data=credential_payload,
                auth=auth,
            )
            # HTTP 4xx/5xx, a non-JSON body, or a body without "url" all mean
            # the credential was not created.
            created.raise_for_status()
            credential_url = json.loads(created.text)["url"]
        except (requests.RequestException, ValueError, KeyError, TypeError):
            print("Failed to add credential to IP360")
        else:
            binding_payload = {
                "credential": credential_url,
                "network": ip360network,
                "ip_ranges": exceptionip,
            }
            try:
                bound = s.post(
                    url="%s/credential/bindings" % (ip360baseurl(ip360vnehost)),
                    data=binding_payload,
                    auth=auth,
                )
                bound.raise_for_status()
            except requests.RequestException:
                print("Failed to bind credential to a network")
        print("Finished!")
def BIND_IP360_CREDENTIAL(credential_name,ip360network,ipbind,ip360vnehost,ip360username,ip360password):
    """Bind an existing IP360 credential to a network and IP range.

    The credential and network are resolved by name through
    GET_IP360_CREDENTIAL / GET_IP360_NETWORK, and the first result of each
    lookup is used.

    Args:
        credential_name: Name of the credential to bind.
        ip360network: Name of the network to bind it to.
        ipbind: Sent as the binding's ``ip_ranges`` value.
        ip360vnehost: Host name or IP of the IP360 VnE.
        ip360username: Login name, also used for per-request basic auth.
        ip360password: Login password.

    Returns:
        None. A failed bind is reported with ``print``; "Finished!" is always
        printed afterwards.
    """
    with requests.Session() as s:
        # !!! Disables SSL validation for self signed certs- NOT RECOMMENDED !!!
        s.verify = False
        urllib3.disable_warnings()
        s.headers.update({
            "accept": "application/json, text/plain",
            "X-Requested-With": "required",
            "Referer": ip360baseurl(ip360vnehost),
        })
        login_payload = {
            "login": ip360username,
            "password": ip360password,
            "auth_type": "internal",
            "auth_server_id": "0",
            "do": "auth",
        }
        s.post(url="https://%s/index.ice" % (ip360vnehost), data=login_payload)

        credential = GET_IP360_CREDENTIAL(credential_name,ip360vnehost,ip360username,ip360password)
        network = GET_IP360_NETWORK(ip360network,ip360vnehost,ip360username,ip360password)
        network_info = json.loads(network.text)["results"][0]
        credential_info = json.loads(credential.text)["results"][0]
        payload = {
            "credential": credential_info["url"],
            "network": network_info["url"],
            "ip_ranges": ipbind,
        }
        try:
            bound = s.post(
                url="%s/credential/bindings" % (ip360baseurl(ip360vnehost)),
                data=payload,
                auth=(ip360username, ip360password),
            )
            # Treat HTTP 4xx/5xx as a failure instead of silently passing.
            bound.raise_for_status()
        except requests.RequestException:
            print("Failed to post credential")
        print("Finished!")
def initiate_adhoc_scan(network,pool,range,scanprofile,ip360baseurl,ip360username,ip360password):
    """POST a partial discovery audit for ``range`` to ``<ip360baseurl>/audits``.

    Args:
        network: Currently unused (see note).
        pool: Currently unused (see note).
        range: IP range to scan, sent as the payload's ``range``.
        scanprofile: Currently unused (see note).
        ip360baseurl: REST root URL, e.g. ``https://<host>/rest/v1``.
        ip360username: User name for basic auth.
        ip360password: Password for basic auth.

    NOTE(review): the payload targets hard-coded resources (``networks/3``,
    ``appliance_pools/1``, ``scan_profiles/5``) and the name ``Test100``; the
    ``network``, ``pool`` and ``scanprofile`` arguments are ignored.
    """
    # Log the target and user only. The original interpolated the password
    # into this message, leaking it to the logs.
    print("Sending ahoc scan request to %s as %s" % (ip360baseurl, ip360username))
    PAYLOAD = {
        "debug": False,
        "name": "Test100",
        "network": ip360baseurl + "/networks/3",
        "pool": ip360baseurl + "/appliance_pools/1",
        "range": range,
        "scan_profile": ip360baseurl + "/scan_profiles/5",
        "scan_profile_type": "Discovery",
        "scan_type": "Partial",
        "special_scan": False,
    }
    postscan = session.post(
        url="%s/audits" % (ip360baseurl),
        auth=(ip360username, ip360password),
        json=PAYLOAD,
    )
    print(postscan.content)
    postscan.connection.close()
def DO_IP360_ADHOC_SCAN(network,iptoscan,pool,scanprofile,ip360vnehost,ip360username,ip360password):
    """Start a partial ad-hoc audit of ``iptoscan`` within a network.

    Resolves the network, appliance pool and scan profile names through the
    GET_IP360_* helpers (first result of each) and POSTs the audit to
    ``/audits`` on the module-level session. The audit is named
    ``"Auto-" + <network name> + <profile name> + " Scan"``.

    Args:
        network: Network name to resolve.
        iptoscan: Sent as the audit's ``range``.
        pool: Appliance pool name to resolve.
        scanprofile: Scan profile name to resolve.
        ip360vnehost: Host name or IP of the IP360 VnE.
        ip360username: User name for basic auth.
        ip360password: Password for basic auth.

    Returns:
        None. A failed POST is reported with ``print``.
    """
    # Get network, appliance pool and scan profile
    network_set = GET_IP360_NETWORK(network,ip360vnehost,ip360username,ip360password)
    pool_set = GET_IP360_POOL(pool,ip360vnehost,ip360username,ip360password)
    profile_set = GET_IP360_SCAN_PROFILE(scanprofile,ip360vnehost,ip360username,ip360password)

    network_info = json.loads(network_set.content)["results"][0]
    profile_info = json.loads(profile_set.content)["results"][0]
    config_name = "Auto-" + network_info["name"] + profile_info["name"] + " Scan"

    print("Sending scan scheduling request to %s as %s" % (ip360baseurl(ip360vnehost), ip360username))
    PAYLOAD = {
        "debug": False,
        "name": config_name,
        "network": network_info["url"],
        "range": iptoscan,
        "pool": json.loads(pool_set.content)["results"][0]["url"],
        "scan_profile": profile_info["url"],
        "scan_type": "Partial",
        "special_scan": False,
    }
    try:
        postscan = session.post(
            url="%s/audits" % (ip360baseurl(ip360vnehost)),
            auth=(ip360username, ip360password),
            json=PAYLOAD,
        )
    except requests.RequestException:
        print("Failed to create Adhoc Scan")
    else:
        # Only close when the POST produced a response; previously this ran
        # after a failure too and raised on the unbound name.
        postscan.connection.close()
def get_secret():
    """Fetch the ``/tripwire/vnepw`` secret from AWS Secrets Manager.

    The client is created for region ``eu-west-1`` and sends HTTPS traffic
    through the proxy named by the ``PROXY`` environment variable.

    Returns:
        The ``SecretString`` value when present, otherwise the base64-decoded
        ``SecretBinary`` value.

    Raises:
        KeyError: If ``PROXY`` is not set in the environment.
        ClientError: If Secrets Manager rejects the request. The error is
            printed and re-raised for every error code, so codes outside the
            original list (e.g. access denied) no longer return ``None``.
    """
    secret_name = "/tripwire/vnepw"
    region_name = "eu-west-1"

    # Create a Secrets Manager client
    sm_session = boto3.session.Session()
    sm_client = sm_session.client(
        service_name='secretsmanager',
        region_name=region_name,
        config=Config(
            proxies={'https': os.environ['PROXY']}
        ),
    )

    # Get the secret from Secrets Manager
    try:
        get_secret_value_response = sm_client.get_secret_value(
            SecretId=secret_name
        )
    except ClientError as e:
        print(e)
        raise

    # Depending on whether the secret is a string or binary, one of these
    # fields will be populated.
    if 'SecretString' in get_secret_value_response:
        return get_secret_value_response['SecretString']
    # The file imports `b64decode` directly; `base64.b64decode` was a NameError.
    return b64decode(get_secret_value_response['SecretBinary'])
def lambda_handler(event, context):
    """AWS Lambda entry point: build a scan list and start an IP360 scan.

    Two event shapes are handled:

    * ``{"SingleIP": "<ip>"}`` scans just that address.
    * Otherwise the event is read as an S3 notification. The uploaded object
      is read as lines of ``customer_code,fqdn,ip_address``. A row is kept
      when its customer code (upper-cased) is in ``CUSTOMERS``, its IP is not
      in ``tripwire_exceptions.txt`` (from ``EXCEPTIONLISTBUCKET``) and its
      FQDN is not in ``MPCAWSEXCEPTIONLIST``.

    When ``ENVIRONMENT`` is ``"QA"`` only connectivity to the VnE is checked.
    Otherwise the network ``IP360NETWORK`` is replaced with the collected IPs
    and an ad-hoc scan is started.

    Configuration comes from the environment: DEBUG, VNEIP, IP360DP,
    IP360PROFILE, IP360NETWORK, VNEUSER, CUSTOMERS, MPCAWSEXCEPTIONLIST,
    EXCEPTIONLISTBUCKET, ENVIRONMENT and PROXY.

    Args:
        event: Lambda event (dict) in one of the shapes above.
        context: Lambda context object; unused.

    Returns:
        None.
    """
    from urllib.parse import unquote_plus

    DEBUG = os.environ['DEBUG']
    VNEIP = os.environ['VNEIP']
    IP360DP = os.environ['IP360DP']
    IP360PROFILE = os.environ['IP360PROFILE']
    IP360NETWORK = os.environ['IP360NETWORK']
    VNEUSER = os.environ['VNEUSER']
    CUSTOMERS = os.environ['CUSTOMERS'].split(",")
    MPCAWSEXCEPTIONLIST = os.environ['MPCAWSEXCEPTIONLIST'].split(",")
    EXCEPTIONLISTBUCKET = os.environ['EXCEPTIONLISTBUCKET']
    ENVIRONMENT = os.environ['ENVIRONMENT']
    debug = DEBUG == "True"
    IPSCANLIST = []

    try:
        SINGLE_IP = event['SingleIP']
    except (KeyError, TypeError):
        SINGLE_IP = 'NOIP'
    if debug:
        print(SINGLE_IP)

    if SINGLE_IP == "NOIP":
        # event contains all information about the uploaded object
        if debug:
            print("Event :", event)
        s3_record = event['Records'][0]['s3']
        # Bucket name where the file was uploaded
        source_bucket_name = s3_record['bucket']['name']
        # Object keys in S3 event notifications are URL-encoded
        # (e.g. spaces arrive as '+'), so decode before fetching.
        file_key_name = unquote_plus(s3_record['object']['key'])
        if debug:
            print(source_bucket_name)
            print(file_key_name)

        # One proxied S3 resource serves both reads.
        s3_resource = boto3.resource(
            's3',
            config=Config(
                proxies={'https': os.environ['PROXY']}
            ),
        )

        # Read the exception list
        exceptionlist = s3_resource.Object(
            EXCEPTIONLISTBUCKET, 'tripwire_exceptions.txt'
        ).get()['Body'].read().decode('utf-8').splitlines()
        print(exceptionlist)
        excluded_ips = {entry.strip() for entry in exceptionlist}

        # Read the post patching input file
        post_patching_scanlist = s3_resource.Object(
            source_bucket_name, file_key_name
        ).get()['Body'].read().decode('utf-8').splitlines()

        for post_patching_scanline in post_patching_scanlist:
            if not post_patching_scanline.strip():
                continue
            fields = [field.strip() for field in post_patching_scanline.split(',')]
            if len(fields) < 3:
                # A short row used to abort the whole run with IndexError.
                print("Skipping malformed line: " + post_patching_scanline)
                continue
            CUSTOMERCODE, FQDN, IP_ADDRESS = fields[:3]
            if CUSTOMERCODE.upper() not in CUSTOMERS:
                continue
            if IP_ADDRESS in excluded_ips or FQDN in MPCAWSEXCEPTIONLIST:
                continue
            if debug:
                print("IP: " + IP_ADDRESS + " FQDN: " + FQDN)
            IPSCANLIST.append(IP_ADDRESS)
    else:
        IPSCANLIST.append(SINGLE_IP)

    # Check scan list
    if debug:
        print(IPSCANLIST)
    IPCOMMASEPERATED = ",".join(IPSCANLIST)
    if debug:
        print(IPCOMMASEPERATED)

    if ENVIRONMENT == "QA":
        # Check connectivity to VNE
        response = requests.get('https://' + VNEIP, verify=False)
        print("VNE connection status code: " + str(response.status_code))
    elif not IPSCANLIST:
        # Nothing qualified: don't delete/recreate the network with an empty
        # include list or start a scan of nothing.
        print("No IP addresses to scan - Tripwire scan not initiated")
    else:
        # Get the secret password from Secrets Manager
        ip360password = (json.loads(get_secret()))['VNEPassword']
        REPLACE_IP360_NETWORK(IP360NETWORK,IPCOMMASEPERATED,"",VNEIP,VNEUSER,ip360password)
        EXECUTE_IP360_ADHOC_SCAN(IP360NETWORK,IP360DP,IP360PROFILE,VNEIP,VNEUSER,ip360password)
        print("Tripwire scan initiated")