Extending MSTICPy with Pivot functions#
Extending MSTICPy with Pivot functions
Infosec Jupyterthon 2022
Ian Hellen (@ianhellen Twitter/ @ianhelle GitHub)#
Principal Software Engineer
Microsoft Security Research
# MSTICPy initialization
import msticpy as mp
mp.init_notebook()
Pivot function definition
A pivot function is a contextual function thatlets you invoke operations on different entity types.
Pivot functions are wrappers around other functions and
are exposed as methods of entity classes:
Examples:
> Dns.util.dns_resolve("www.python.org")
> Host.MSSentinel.get_processes(host_name="MyPC")
1. Adding a simple pivot function
Dns.pivots()
['MSSentinel_cybersecuritysoc.dns_queries',
'MSSentinel_cybersecuritysoc.sent_bookmarks',
'MSSentinel_cybersecuritysoc.ti_list_indicators_by_domain',
'RiskIQ.articles',
'RiskIQ.artifacts',
'RiskIQ.certificates',
'RiskIQ.components',
'RiskIQ.cookies',
'RiskIQ.hostpair_children',
'RiskIQ.hostpair_parents',
'RiskIQ.malware',
'RiskIQ.projects',
'RiskIQ.reputation',
'RiskIQ.resolutions',
'RiskIQ.summary',
'RiskIQ.trackers',
'RiskIQ.whois',
'VT.vt_communicating_files',
'VT.vt_historical_ssl_certificates',
'VT.vt_historical_whois',
'VT.vt_parent',
'VT.vt_resolutions',
'VT.vt_subdomains',
'bye',
'defang2',
'dns_is_resolvable',
'dns_resolve',
'hello',
'other.bye',
'other.defang2',
'other.hello',
'ti.lookup_dns',
'tilookup_dns',
'util.defang',
'util.dns_components',
'util.dns_in_abuse_list',
'util.dns_is_resolvable',
'util.dns_resolve',
'util.dns_validate_tld']
Here is the function that we want to add to some entities#
Note it takes a string parameter for the entity value input and returns a string
def defang_ioc(ioc: str, ioc_type: str = None) -> str:
"""
Return de-fanged observable.
Parameters
----------
ioc : str
The observable.
ioc_type : str
The type of IoC. If URL or Email it will do
extra processing to neuter the URL protocol and email @ symbol
Returns
-------
str
The de-fanged observable.
"""
de_fanged = ioc
if ioc_type == "email":
de_fanged = de_fanged.replace("@", "AT")
elif ioc_type == "url":
de_fanged = de_fanged.replace("http", "hXXp").replace("ftp", "fXp")
return de_fanged.replace(".", "[.]")
# Check that our function works as expected
defang_ioc("192.168.1.1")
'192[.]168[.]1[.]1'
Call Pivot.add_pivot_function
to add the function to a couple of entities#
mp.Pivot.add_pivot_function(
func=defang_ioc,
container="util",
input_type="value",
entity_map={
"IpAddress": "Address",
"Dns": "Domain",
},
func_input_value_arg="ioc",
func_new_name="defang",
)
Now we can defang IP addresses and DNS names#
it accepts lists and DataFrames as inputs.
IpAddress.util.defang("192.1.1.1")
ioc | result | src_row_index | |
---|---|---|---|
0 | 192.1.1.1 | 192[.]1[.]1[.]1 | 0 |
Dns.util.defang("www.infosecjupyterthon.com")
ioc | result | src_row_index | |
---|---|---|---|
0 | www.infosecjupyterthon.com | www[.]infosecjupyterthon[.]com | 0 |
IpAddress.util.defang(["54.69.246.204", "104.73.1.162"])
ioc | result | src_row_index | |
---|---|---|---|
0 | 54.69.246.204 | 54[.]69[.]246[.]204 | 0 |
1 | 104.73.1.162 | 104[.]73[.]1[.]162 | 1 |
For URLs we want to also set the ioc_type
parameter#
We can add that as a registration parameter - func_static_params
.
This is a dict of {param_name: param_value}
# Adding static parameters to supply ioc_type param
mp.Pivot.add_pivot_function(
func=defang_ioc,
container="util",
input_type="value",
entity_map={
"Url": "Url",
},
func_input_value_arg="ioc",
func_new_name="defang",
func_static_params={"ioc_type": "url"}
)
We should now have a defang
method on the Url
class
Url.util.defang("https://python.org")
ioc | result | src_row_index | |
---|---|---|---|
0 | https://python.org | hXXps://python[.]org | 0 |
2. Adding functions from modules and packages
Define a Python module and write it to a file#
%%writefile ioc_utils.py
"""IoC Utility functions"""
def defang_ioc(ioc: str, ioc_type: str = None) -> str:
"""
Return de-fanged observable.
Parameters
----------
ioc : str
The observable.
ioc_type : str
The type of IoC. If URL or Email it will do
extra processing to neuter the URL protocol and email @ symbol
Returns
-------
str
The de-fanged observable.
"""
de_fanged = ioc
if ioc_type == "email":
de_fanged = de_fanged.replace("@", "AT")
elif ioc_type == "url":
de_fanged = de_fanged.replace("http", "hXXp").replace("ftp", "fXp")
return de_fanged.replace(".", "[.]")
def ioc_hello(ioc: str, lang: str="en"):
if lang == "en":
return f"Hello {ioc}!"
if lang == "es":
return f"Hola {ioc}!"
if lang == "it":
return f"Ciao {ioc}!"
def ioc_goodbye(ioc: str, lang: str="en"):
if lang == "en":
return f"Goodbye {ioc}!"
if lang == "es":
return f"Adios {ioc}!"
if lang == "it":
return f"Ciao {ioc}!"
Overwriting ioc_utils.py
Pivot definition file#
%%writefile pivot_funcs.yaml
pivot_providers:
# Defang function for IPs and DNS
defang_ioc:
src_module: ioc_utils
src_func_name: defang_ioc
func_new_name: defang2
input_type: value
entity_map:
IpAddress: Address
Dns: Domain
func_input_value_arg: ioc
create_shortcut: True
# Defang function for IPs and DNS
defang_ioc_url:
src_module: ioc_utils
src_func_name: defang_ioc
func_new_name: defang2
input_type: value
entity_map:
Account: qualified_name
Mailbox: MailboxPrimaryAddress
func_input_value_arg: ioc
create_shortcut: True
func_static_params:
ioc_type: email
# Other "utility" functions
hello:
src_module: ioc_utils
src_func_name: ioc_hello
func_new_name: hello
input_type: value
entity_map:
IpAddress: Address
Dns: Domain
Account: qualified_name
Mailbox: MailboxPrimaryAddress
func_input_value_arg: ioc
create_shortcut: True
goodbye:
src_module: ioc_utils
src_func_name: ioc_goodbye
func_new_name: bye
input_type: value
entity_map:
IpAddress: Address
Dns: Domain
Account: qualified_name
Mailbox: MailboxPrimaryAddress
func_input_value_arg: ioc
create_shortcut: True
quote:
src_module: urllib.parse
src_func_name: quote
func_new_name: quote
input_type: value
entity_map:
Url: Url
func_input_value_arg: string
Overwriting pivot_funcs.yaml
# Good to check that your file reads as valid YAML
# - no exceptions thrown!
import yaml
from pathlib import Path
yaml.safe_load(Path("./pivot_funcs.yaml").read_text());
help(mp.Pivot.register_pivot_providers)
Help on function register_pivot_providers in module msticpy.init.pivot:
register_pivot_providers(pivot_reg_path: str, namespace: Dict[str, Any] = None, def_container: str = 'custom', force_container: bool = False)
Register pivot functions from configuration file.
Parameters
----------
pivot_reg_path : str
Path to config yaml file
namespace : Dict[str, Any], optional
Namespace to search for existing instances of classes, by default None
def_container : str, optional
Container name to use for entity pivot functions, by default "other"
force_container : bool, optional
Force `container` value to be used even if entity definitions have
specific setting for a container name, by default False
Raises
------
ValueError
An entity specified in the config file is not recognized.
mp.Pivot.register_pivot_providers("pivot_funcs.yaml")
Mailbox.pivots()
['bye', 'defang2', 'hello', 'other.bye', 'other.defang2', 'other.hello']
Mailbox.hello("ian@infosecjupyterthon.com")
ioc | result | src_row_index | |
---|---|---|---|
0 | ian@infosecjupyterthon.com | Hello ian@infosecjupyterthon.com! | 0 |
Mailbox.hello("ian@infosecjupyterthon.com", lang="it")
ioc | result | src_row_index | |
---|---|---|---|
0 | ian@infosecjupyterthon.com | Ciao ian@infosecjupyterthon.com! | 0 |
Mailbox.hello(
("ian@infosecjupyterthon.com", "roberto@infosecjupyterthon.com"),
lang="es"
)
ioc | result | src_row_index | |
---|---|---|---|
0 | ian@infosecjupyterthon.com | Hola ian@infosecjupyterthon.com! | 0 |
1 | roberto@infosecjupyterthon.com | Hola roberto@infosecjupyterthon.com! | 1 |
Works with functions from other libraries#
Python’s urllib.parse.quote
function
Url.other.quote("https://myspacey.dom/the path/to mañana")
string | result | src_row_index | |
---|---|---|---|
0 | https://myspacey.dom/the path/to mañana | https%3A//myspacey.dom/the%20path/to%20ma%C3%B1ana | 0 |
What are the benefits?
1. Entity-specific functionality all in one place - no searching, no imports#
2. Flexible input types#
- Can also wrap functions that expect list
or dataframe
input#
3. Standardized output types#
3. Queries as Pivot functions
Starting Query#
SecurityEvent
| where TimeGenerated > ago(1d)
| where EventID == 4625
| where Computer has "infected_pc"
Parameterize the query#
SecurityEvent
| where TimeGenerated between (datetime({start}) .. datetime({end}))
| where EventID == 4624
| where Computer has "{host_name}"
Example query definition file#
Add query to query definition file#
metadata:
version: 1
description: My Event Queries
data_environments: [MSSentinel]
data_families: [Events]
defaults:
parameters:
start:
description: Query start time
type: datetime
end:
description: Query end time
type: datetime
add_query_items:
description: Additional query clauses
type: str
default: ""
sources:
get_alerts_for_host:
description: Retrieves list of alerts for host
args:
query: '
SecurityEvent
| where TimeGenerated between (datetime({start}) .. datetime({end}))
| where EventID == 4624
| where Computer has "{host_name}"
{add_query_items}'
uri: None
parameters:
host_name:
description: Name of host
type: str
# Create a subfolder to hold our query files
!mkdir queries
A subdirectory or file queries already exists.
Write our query definitions to a file#
%%writefile queries/my_queries.yaml
metadata:
version: 1
description: My Event Queries
data_environments: [MSSentinel]
data_families: [WinEvents]
defaults:
parameters:
start:
description: Query start time
type: datetime
end:
description: Query end time
type: datetime
add_query_items:
description: Additional query clauses
type: str
default: ""
sources:
get_logons_for_host_jt:
description: Retrieves list of alerts for Jupyterthon
args:
query: '
SecurityEvent
| where TimeGenerated between (datetime({start}) .. datetime({end}))
| where EventID == 4624
| where Computer has "{host_name}"
| limit 3
{add_query_items}'
parameters:
host_name:
description: Name of host
type: str
Overwriting queries/my_queries.yaml
Create a query provider
and tell it to look in the ./queries folder for additional queries#
qry_prov = mp.QueryProvider("MSSentinel", query_paths=["./queries"])
# Authenticate/connect
qry_prov.connect(workspace="CyberSecuritySOC")
Connecting... connected
# Check that our query has been added to the provider
qry_prov.WinEvents.get_logons_for_host_jt("?")
Query: get_logons_for_host_jt
Data source: MSSentinel
Retrieves list of alerts for Jupyterthon
Parameters
----------
add_query_items: str (optional)
Additional query clauses
end: datetime
Query end time
host_name: str
Name of host
start: datetime
Query start time
Query:
SecurityEvent | where TimeGenerated between (datetime({start}) .. datetime({end})) | where EventID == 4624 | where Computer has "{host_name}" | limit 3 {add_query_items}
Since it has a “host_name” parameter it should also be
available as a pivot function on the Host class.#
# Host.MSSentinel_cybersecuritysoc()
Host
CommonSecurityLog_host_connections_csl (pivot function)
CommonSecurityLog_ips_csl (pivot function)
DeviceNetworkEvents_host_connections (pivot function)
DeviceProcessEvents_list_host_processes (pivot function)
VMComputer_vmcomputer (pivot function)
auditd_auditd_all (pivot function)
az_nsg_interface (pivot function)
az_nsg_net_flows (pivot function)
az_nsg_net_flows_depr (pivot function)
get_alerts_for_host (pivot function)
get_logons_for_host (pivot function)
get_logons_for_host_jt (pivot function)
heartbeat (pivot function)
heartbeat_for_host_depr (pivot function)
sec_alerts (pivot function)
sent_bookmarks (pivot function)
syslog_all_syslog (pivot function)
syslog_cron_activity (pivot function)
syslog_logon_failures (pivot function)
syslog_logons (pivot function)
syslog_notable_events (pivot function)
syslog_squid_activity (pivot function)
syslog_sudo_activity (pivot function)
syslog_summarize_events (pivot function)
syslog_user_group_activity (pivot function)
syslog_user_logon (pivot function)
sysmon_process_events (pivot function)
uncommon_powershell (pivot function)
wevt_account_change_events (pivot function)
wevt_all_events (pivot function)
wevt_events_by_id (pivot function)
wevt_get_process_tree (pivot function)
wevt_list_other_events (pivot function)
wevt_logon_attempts (pivot function)
wevt_logon_failures (pivot function)
wevt_logon_session (pivot function)
wevt_logons (pivot function)
wevt_notable_events (pivot function)
wevt_parent_process (pivot function)
wevt_process_session (pivot function)
wevt_processes (pivot function)
wevt_schdld_tasks_and_services (pivot function)
wevt_summarize_events (pivot function)
Host.MSSentinel_cybersecuritysoc.get_logons_for_host_jt(host_name="Workstation6")
TenantId | TimeGenerated | SourceSystem | Account | AccountType | Computer | EventSourceName | Channel | Task | Level | EventData | EventID | Activity | SourceComputerId | EventOriginId | MG | TimeCollected | ManagementGroupName | AccessList | AccessMask | AccessReason | AccountDomain | AccountExpires | AccountName | AccountSessionIdentifier | ... | TargetUserName | TargetUserSid | TemplateContent | TemplateDSObjectFQDN | TemplateInternalName | TemplateOID | TemplateSchemaVersion | TemplateVersion | TokenElevationType | TransmittedServices | UserAccountControl | UserParameters | UserPrincipalName | UserWorkstations | VirtualAccount | VendorIds | Workstation | WorkstationName | PartitionKey | RowKey | StorageAccount | AzureDeploymentID | AzureTableName | Type | _ResourceId | |
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
0 | 8ecf8077-cf51-4820-aadd-14040956f35d | 2022-12-01 17:49:47.817824+00:00 | OpsManager | NT AUTHORITY\SYSTEM | Machine | Workstation6.seccxp.ninja | Microsoft-Windows-Security-Auditing | Security | 12544 | 8 | 4624 | 4624 - An account was successfully logged on. | 6cff374c-b8ff-4495-9470-930d981b217a | 1afd56e9-75e6-4218-9421-076406ac08ff | 00000000-0000-0000-0000-000000000001 | 2022-12-01 17:50:09.566740+00:00 | AOI-8ecf8077-cf51-4820-aadd-14040956f35d | ... | SYSTEM | S-1-5-18 | - | %%1843 | - | SecurityEvent | /subscriptions/d1d8779d-38d7-4f06-91db-9cbc8de0176f/resourcegroups/simuland/providers/microsoft.... | ||||||||||||||||||||||||||
1 | 8ecf8077-cf51-4820-aadd-14040956f35d | 2022-12-01 17:55:23.462961800+00:00 | OpsManager | NT AUTHORITY\SYSTEM | Machine | Workstation6.seccxp.ninja | Microsoft-Windows-Security-Auditing | Security | 12544 | 8 | 4624 | 4624 - An account was successfully logged on. | 6cff374c-b8ff-4495-9470-930d981b217a | 590a20a0-305e-4668-bc37-721e2b38c89c | 00000000-0000-0000-0000-000000000001 | 2022-12-01 17:55:54.552881700+00:00 | AOI-8ecf8077-cf51-4820-aadd-14040956f35d | ... | SYSTEM | S-1-5-18 | - | %%1843 | - | SecurityEvent | /subscriptions/d1d8779d-38d7-4f06-91db-9cbc8de0176f/resourcegroups/simuland/providers/microsoft.... | ||||||||||||||||||||||||||
2 | 8ecf8077-cf51-4820-aadd-14040956f35d | 2022-12-01 17:44:42.875699300+00:00 | OpsManager | NT AUTHORITY\SYSTEM | Machine | Workstation6.seccxp.ninja | Microsoft-Windows-Security-Auditing | Security | 12544 | 8 | 4624 | 4624 - An account was successfully logged on. | 6cff374c-b8ff-4495-9470-930d981b217a | b5e7ba10-38b5-41b1-99b7-20f6cf531510 | 00000000-0000-0000-0000-000000000001 | 2022-12-01 17:44:55.554926+00:00 | AOI-8ecf8077-cf51-4820-aadd-14040956f35d | ... | SYSTEM | S-1-5-18 | - | %%1843 | - | SecurityEvent | /subscriptions/d1d8779d-38d7-4f06-91db-9cbc8de0176f/resourcegroups/simuland/providers/microsoft.... |
3 rows × 225 columns
References
Pivot Functions#
https://msticpy.readthedocs.io/en/latest/data_analysis/PivotFunctions.html
Entity to parameter mapping
https://msticpy.readthedocs.io/en/latest/data_analysis/PivotFunctions.html#how-are-queries-assigned-to-specific-entities
Creating custom Pivot functions
https://msticpy.readthedocs.io/en/latest/data_analysis/PivotFunctions.html#customizing-and-managing-pivots
MSTICPy Queries#
https://msticpy.readthedocs.io/en/latest/data_acquisition/DataProviders.html
Creating custom queries
https://msticpy.readthedocs.io/en/latest/data_acquisition/DataProviders.html#creating-new-queries