Exploratory Threat Analytics using Jupyter Notebooks#
Whoami ?#
Saksham Tushar - Bangalore/India - Lead Threat Researcher/Detection Engineer @ CRED
Loves Tactical Security (All Things Threat: Detection/Analytics/Intelligence/Hunting)
Seeks automation & Coffee Everywhere!!
Why Notebooks & not SIEM/EDR or any other Tool?#
SIEM is for Log Collection & Detection, Case Management is for Notes & Annotation, Let’s use Jupyter for Investigation#
Vendor-agnostic query language: a programming language
Intersection of Code/Investigation/Annotation-notes
Programmatic control over Data/Logs
Enrichment and Context on the Fly!!
Orchestrated Approach and Flow
Let’s explore a simple investigation: finding suspicious PowerShell executions.#
I have two common use cases:
Import the bulk alerts triggered over the last day, for example after a spike in data volume, and analyze them statistically so they can be investigated in bulk.
Or bulk-analyze the alerts from a new detection to find where it needs fine-tuning or more context.
Use-Case : Data from Wherever you want (Security Tools/S3/Online Datasets/Git repo)#
from elasticsearch import Elasticsearch
from elasticsearch import RequestsHttpConnection
from elasticsearch_dsl import Search,A
Use-Case : Data Analysis Capabilities#
Need Python Data Analysis capabilities ?
import pandas as pd
Need More ?? , SQL, Graphs, ML, Threat Intelligence, Alerts, Datasets, Visualization… ?#
Gather analytical capabilities (PySpark, Seaborn, Plotly, GraphFrames)
Let me also import PySpark, you know, for SQL capabilities#
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.caseSensitive", "true")
pd.set_option('display.max_columns',None)
pd.set_option('display.max_rows',None)
pd.set_option('display.max_colwidth',None)
Let’s write a function that queries Elastic and pulls the data. I can call it whenever I need to, so getting the data under investigation is sorted!#
# Also, let's suppress SSL warnings, as I'm making unverified HTTPS requests in my isolated environment.
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
es2 = Elasticsearch(['https://192.168.0.107:9200'], connection_class=RequestsHttpConnection, http_auth=('elastic', 'MyPassword'), use_ssl=True, verify_certs=False)
searchContext = Search(using=es2, index='logs-endpoint.events*', doc_type='doc')
def queryes(query):
    print('Running Query : ' + query)
    s = searchContext.query('query_string', query=query).filter('range', **{'@timestamp': {'gte': "now-120d/d", 'lt': "now/d", 'format': 'basic_date'}})
    response = s.execute()
    if response.success():
        # scan() pages through every matching document, not just the first batch
        df = pd.json_normalize([d.to_dict() for d in s.scan()])
        print("data fetched Parsing...")
        # cast everything to str so Spark does not have to infer mixed column types
        sdf = spark.createDataFrame(df.astype(str))
        # data sanitization: replace '.' and drop '@' so the columns are easy to reference in Spark SQL
        clean_df = sdf.toDF(*(c.replace('.', '_') for c in sdf.columns))
        clean_df = clean_df.toDF(*(c.replace('@', '') for c in clean_df.columns))
        print("Done!!!")
        return clean_df
    else:
        print("Es query Failed")
        return None
Pull all Elasticsearch events from my SIEM (Elastic) to investigate and hunt for ‘Command and Scripting Interpreter: PowerShell’ https://attack.mitre.org/techniques/T1059/001/#
power_events= queryes("data_stream.dataset:endpoint.events.process AND process.name:powershell.exe")
power_events.createOrReplaceTempView('powershell_events')
Running Query : data_stream.dataset:endpoint.events.process AND process.name:powershell.exe
data fetched Parsing...
Done!!!
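The queries that follow refer to columns such as process_parent_name, which come from the renaming step inside queryes. As a minimal sketch of that renaming on its own (the sample field names are hypothetical, and sanitize_column is a helper name introduced here for illustration, not part of the original notebook):

```python
def sanitize_column(name):
    """Mirror the two renaming passes in queryes(): '.' becomes '_' and '@' is dropped."""
    return name.replace('.', '_').replace('@', '')

# Hypothetical sample of flattened ECS-style field names.
raw_columns = ['@timestamp', 'process.name', 'process.parent.name', 'process.command_line']
clean_columns = [sanitize_column(c) for c in raw_columns]

# Renaming can in principle make two different fields collide, so it is worth checking.
assert len(set(clean_columns)) == len(clean_columns), 'renaming produced duplicate column names'
print(clean_columns)
# ['timestamp', 'process_name', 'process_parent_name', 'process_command_line']
```

Dotted names can be kept in Spark if every reference is quoted with backticks, so flattening them is a convenience choice rather than a requirement.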
Use-Case : Statistical Capabilities at your disposal#
display(spark.sql('select count(*),process_parent_name from powershell_events group by process_parent_name order by count(*) asc').show(1000,truncate=200, vertical=False))
+--------+-------------------+
|count(1)|process_parent_name|
+--------+-------------------+
| 4| java.exe|
| 4| npcap.exe|
| 4| mshta.exe|
| 7| explorer.exe|
| 8| RuntimeBroker.exe|
| 8| Code.exe|
| 8| cmd.exe|
| 12| nan|
| 16| powershell.exe|
| 112|CompatTelRunner.exe|
+--------+-------------------+
None
display(spark.sql('select count(*),process_command_line,process_parent_name from powershell_events group by process_command_line,process_parent_name order by count(*) asc').show(1000,truncate=200, vertical=False))
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
|count(1)| process_command_line|process_parent_name|
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
| 2| Powershell -NoProfile -NonInteractive -NoLogo -ExecutionPolicy Bypass -File "C:\Users\stushar\Downloads\neo4j-community-4.4.11\bin\neo4j.ps1" install-service| cmd.exe|
| 2|powershell.exe -NoProfile -WindowStyle Hidden -NonInteractive -Command "Register-ScheduledTask -Force -TaskName 'npcapwatchdog' -Description 'Ensure Npcap service is configured to start at boot' -A...| npcap.exe|
| 2|powershell.exe -NoProfile -ExecutionPolicy unrestricted -Command "& { [Net.ServicePointManager]::SecurityProtocol = [Net.ServicePointManager]::SecurityProtocol -bor [Net.SecurityProtocolType]::Tls...| cmd.exe|
| 2| powershell.exe -OutputFormat Text -ExecutionPolicy Bypass -Command "Get-Service neo4j | Format-Table -AutoSize"| java.exe|
| 2| "C:\Windows\SysWOW64\WindowsPowerShell\v1.0\powershell.exe" | RuntimeBroker.exe|
| 2| powershell.exe -NoProfile -WindowStyle Hidden -NonInteractive -Command "Start-Service -Name npcap -PassThru | Stop-Service -PassThru | Start-Service"| npcap.exe|
| 2|powershell.exe -OutputFormat Text -ExecutionPolicy Bypass -Command "& 'C:\Users\stushar\Downloads\neo4j-community-4.4.11\bin\tools\prunsrv-amd64.exe'" "'//IS//neo4j' '--StartMode=jvm' '--StartMetho...| java.exe|
| 2| powershell | cmd.exe|
| 2| powershell -nop .\Desktop\Initial_Dropper.ps1| cmd.exe|
| 4|"C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe" -encodedcommand JABoAD0AKABnAHAAIABIAEsATABNADoAXABTAFkAUwBUAEUATQBcAEMAdQByAHIAZQBuAHQAQwBvAG4AdAByAG8AbABTAGUAdABcAFMAZQByAHYAaQBjAGUAc...| mshta.exe|
| 6| "C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe" | RuntimeBroker.exe|
| 7| "C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe" | explorer.exe|
| 8| C:\WINDOWS\System32\WindowsPowerShell\v1.0\powershell.exe| Code.exe|
| 12|"C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe" -encodedcommand JABoAD0AKABnAHAAIABIAEsATABNADoAXABTAFkAUwBUAEUATQBcAEMAdQByAHIAZQBuAHQAQwBvAG4AdAByAG8AbABTAGUAdABcAFMAZQByAHYAaQBjAGUAc...| nan|
| 16|"C:\WINDOWS\System32\WindowsPowerShell\v1.0\powershell.exe" -encodedCommand JABoAD0AKABnAHAAIABIAEsATABNADoAXABTAFkAUwBUAEUATQBcAEMAdQByAHIAZQBuAHQAQwBvAG4AdAByAG8AbABTAGUAdABcAFMAZQByAHYAaQBjAGUAc...| powershell.exe|
| 112| powershell.exe -ExecutionPolicy Restricted -Command Write-Host 'Final result: 1';|CompatTelRunner.exe|
+--------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
None
display(spark.sql('select process_parent_name,process_name,process_command_line from powershell_events where process_parent_name in ("mshta.exe","cmd.exe") group by process_parent_name,process_name,process_command_line').show(1000,truncate=0, vertical=True))
-RECORD 0-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
process_parent_name | mshta.exe
process_name | powershell.exe
process_command_line | "C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe" -encodedcommand JABoAD0AKABnAHAAIABIAEsATABNADoAXABTAFkAUwBUAEUATQBcAEMAdQByAHIAZQBuAHQAQwBvAG4AdAByAG8AbABTAGUAdABcAFMAZQByAHYAaQBjAGUAcwBcAFgAIAAiAHMAIgApAC4AcwA7ACQAaAAuAFMAcABsAGkAdAAoACIAIAAiACkAfABmAG8AcgBFAGEAYwBoAHsAWwBjAGgAYQByAF0AKABbAGMAbwBuAHYAZQByAHQAXQA6ADoAdABvAGkAbgB0ADEANgAoACQAXwAsADEANgApACkAfQB8AGYAbwByAEUAYQBjAGgAewAkAHIAPQAkAHIAKwAkAF8AfQA7AGkAZQB4ACAAJAByADsA
-RECORD 1-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
process_parent_name | cmd.exe
process_name | powershell.exe
process_command_line | powershell.exe -NoProfile -ExecutionPolicy unrestricted -Command "& { [Net.ServicePointManager]::SecurityProtocol = [Net.ServicePointManager]::SecurityProtocol -bor [Net.SecurityProtocolType]::Tls12 ; & 'c:\Users\Leolabs-win\.vscode\extensions\ms-dotnettools.vscode-dotnet-runtime-1.5.0\dist\install scripts\dotnet-install.ps1' -InstallDir 'c:\Users\Leolabs-win\AppData\Roaming\Code\User\globalStorage\ms-dotnettools.vscode-dotnet-runtime\.dotnet\6.0.9' -Version 6.0.9 -Runtime dotnet }
-RECORD 2-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
process_parent_name | cmd.exe
process_name | powershell.exe
process_command_line | powershell
-RECORD 3-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
process_parent_name | cmd.exe
process_name | powershell.exe
process_command_line | powershell -nop .\Desktop\Initial_Dropper.ps1
-RECORD 4-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
process_parent_name | cmd.exe
process_name | powershell.exe
process_command_line | Powershell -NoProfile -NonInteractive -NoLogo -ExecutionPolicy Bypass -File "C:\Users\stushar\Downloads\neo4j-community-4.4.11\bin\neo4j.ps1" install-service
None
import base64
def base64ToString(b):
    # PowerShell's -EncodedCommand payload is Base64 over UTF-16LE text
    return base64.b64decode(b).decode('utf-16-le')
base64ToString("JABoAD0AKABnAHAAIABIAEsATABNADoAXABTAFkAUwBUAEUATQBcAEMAdQByAHIAZQBuAHQAQwBvAG4AdAByAG8AbABTAGUAdABcAFMAZQByAHYAaQBjAGUAcwBcAFgAIAAiAHMAIgApAC4AcwA7ACQAaAAuAFMAcABsAGkAdAAoACIAIAAiACkAfABmAG8AcgBFAGEAYwBoAHsAWwBjAGgAYQByAF0AKABbAGMAbwBuAHYAZQByAHQAXQA6ADoAdABvAGkAbgB0ADEANgAoACQAXwAsADEANgApACkAfQB8AGYAbwByAEUAYQBjAGgAewAkAHIAPQAkAHIAKwAkAF8AfQA7AGkAZQB4ACAAJAByADsA")
'$h=(gp HKLM:\\SYSTEM\\CurrentControlSet\\Services\\X "s").s;$h.Split(" ")|forEach{[char]([convert]::toint16($_,16))}|forEach{$r=$r+$_};iex $r;'
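The decoding step above can be extended to pull the Base64 token straight out of a command line, and to mimic what the decoded one-liner does next (split a registry value on spaces and turn each hex word into a character). This is a rough sketch under assumptions: the helper names are made up for illustration, the regex only covers the -e, -enc and -EncodedCommand spellings, and the sample inputs are built here rather than taken from the logs.

```python
import base64
import re

# Matches -e / -enc / -encodedcommand followed by a Base64 token.
# Other abbreviations that PowerShell accepts are deliberately not handled here.
ENCODED_RE = re.compile(r'(?<!\S)-e(?:nc(?:odedcommand)?)?\s+([A-Za-z0-9+/=]+)', re.IGNORECASE)

def decode_encoded_command(command_line):
    """Return the decoded script from an encoded command line, or None if there is none."""
    match = ENCODED_RE.search(command_line)
    if match is None:
        return None
    return base64.b64decode(match.group(1)).decode('utf-16-le')

def decode_hex_payload(hex_words):
    """Python equivalent of the decoded loop: each space-separated hex word becomes one character."""
    return ''.join(chr(int(word, 16)) for word in hex_words.split(' ') if word)

# Build a harmless sample the same way PowerShell encodes its payload.
token = base64.b64encode("Write-Host 'hi'".encode('utf-16-le')).decode('ascii')
sample = 'powershell.exe -encodedcommand ' + token

print(decode_encoded_command(sample))            # Write-Host 'hi'
print(decode_hex_payload('77 68 6f 61 6d 69'))   # whoami
```

Applied to a whole column, decode_encoded_command could feed a new "decoded" field next to process_command_line, so encoded and plain executions can be reviewed side by side.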
Use-Case : SuperCharged API Utilization#
Use the API powers of your tools, here the SQL API from Elasticsearch
import requests
from requests.auth import HTTPBasicAuth
import json
headers = {'Content-Type': 'application/json',}
query = {'query': ''' SELECT "@timestamp", "process.name","source.ip","source.port","destination.ip","destination.port" FROM "logs-endpoint.events*" where "process.name" = 'powershell.exe' and "destination.ip" IS NOT NULL and "@timestamp" > TODAY() - INTERVAL 90 DAY LIMIT 10000'''}
response = requests.post('https://192.168.0.107:9200/_sql?format=json', headers=headers, data=json.dumps(query) ,auth=HTTPBasicAuth('elastic', 'MyPassword'),verify=False)
powershel_network_events=pd.DataFrame(json.loads(response.text)['rows'],columns=['Timestamp','Process','Source_ip','Source_port','Destination_ip','Destination_port'])
len(powershel_network_events)
1000
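The count of 1000, despite LIMIT 10000 in the query, is consistent with the Elasticsearch SQL API returning results one page at a time (its documented default fetch_size is 1000) together with a cursor for the next page. That reading of this particular run is an assumption. Under it, a minimal paging sketch looks like the following, where fetch_all_rows and make_fake_post are hypothetical names and the fake stands in for the real HTTP call so the example runs offline:

```python
def fetch_all_rows(post, query, fetch_size=1000, max_pages=None):
    """Collect rows across pages. `post` takes a payload dict and returns the parsed JSON reply."""
    rows = []
    payload = {'query': query, 'fetch_size': fetch_size}
    pages = 0
    while True:
        page = post(payload)
        rows.extend(page.get('rows', []))
        pages += 1
        cursor = page.get('cursor')
        if not cursor or (max_pages is not None and pages >= max_pages):
            break
        # Follow-up requests carry only the cursor from the previous reply.
        payload = {'cursor': cursor}
    return rows

def make_fake_post(total_rows):
    """Stand-in for the HTTP call: serves `total_rows` fake rows in pages of fetch_size."""
    state = {'sent': 0, 'size': 0}
    def post(payload):
        if 'query' in payload:
            state['sent'] = 0
            state['size'] = payload['fetch_size']
        start = state['sent']
        end = min(start + state['size'], total_rows)
        state['sent'] = end
        page = {'rows': [[i] for i in range(start, end)]}
        if end < total_rows:
            page['cursor'] = 'cursor-%d' % end
        return page
    return post

all_rows = fetch_all_rows(make_fake_post(2500), 'SELECT ...')
print(len(all_rows))  # 2500
```

With the real cluster, `post` would wrap the same requests.post call used above and return response.json().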
powershel_network_events.head()
(index) | Timestamp | Process | Source_ip | Source_port | Destination_ip | Destination_port |
---|---|---|---|---|---|---|
0 | 2022-09-30T21:02:02.508Z | powershell.exe | 192.168.1.104 | 58100 | 58.158.177.102 | 80 |
1 | 2022-11-18T05:58:53.998Z | powershell.exe | 192.168.1.104 | 55563 | 58.158.177.102 | 80 |
2 | 2022-09-30T21:02:02.751Z | powershell.exe | 192.168.1.104 | 58101 | 58.158.177.102 | 80 |
3 | 2022-11-18T05:58:53.998Z | powershell.exe | 192.168.1.104 | 55563 | 58.158.177.102 | 80 |
4 | 2022-11-18T05:58:54.236Z | powershell.exe | 192.168.1.104 | 55564 | 58.158.177.102 | 80 |
Popular threat hunting techniques like stack counting, grouping, and clustering are a breeze!#
Stack counting to check unique IPs and connection counts
powershel_network_events.groupby(['Source_ip','Destination_ip']).size()
Source_ip Destination_ip
192.168.1.104 58.158.177.102 1000
dtype: int64
Interestingly, all 1000 connections in this result set go to the same destination IP
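For readers without pandas at hand, stack counting boils down to counting distinct value combinations and reading the list from the rare end. A minimal standard-library sketch, with hypothetical events shaped like the parent-process counts seen earlier (stack_count is a name introduced here):

```python
from collections import Counter

def stack_count(events, *fields):
    """Count each distinct combination of the given fields, rarest first."""
    counts = Counter(tuple(event.get(f) for f in fields) for event in events)
    return sorted(counts.items(), key=lambda item: item[1])

# Hypothetical events; only the field used for stacking matters here.
events = (
    [{'parent': 'CompatTelRunner.exe', 'process': 'powershell.exe'}] * 112
    + [{'parent': 'powershell.exe', 'process': 'powershell.exe'}] * 16
    + [{'parent': 'mshta.exe', 'process': 'powershell.exe'}] * 4
)

for key, count in stack_count(events, 'parent'):
    print(count, key[0])
# 4 mshta.exe
# 16 powershell.exe
# 112 CompatTelRunner.exe
```

Sorting rarest first is a design choice: in a hunt, the short stacks are usually the ones worth opening.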
Use-Case - Data Enrichment#
Let’s enrich the data with reputation data from VirusTotal
Gather data and intelligence: there are product APIs, webhooks (SIEM, case management, threat intelligence platforms, EDRs, Git, Slack), and service APIs (VirusTotal, curl against websites, scraped data). The possibilities for gathering data are endless.
# Let's correlate data from VirusTotal:
def check_virustotal(ip):
    headers = {
        'x-apikey': '360523cac7446ee2bde736c004c72661718185c985d192d7e91f4a71fa8cedfc',
    }
    response = requests.get('https://www.virustotal.com/api/v3/ip_addresses/' + ip, headers=headers)
    # last_analysis_stats holds the per-verdict engine counts for this IP
    return response.json()['data']['attributes']['last_analysis_stats']
print("Malicious Score " + str(check_virustotal(powershel_network_events['Destination_ip'].iloc[0])))
# Loop over as many IPs as you want.
Malicious Score {'harmless': 69, 'malicious': 14, 'suspicious': 0, 'undetected': 13, 'timeout': 0}
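When looping over many rows, it helps to look up each distinct IP only once, since reputation services are typically rate limited. A small sketch of that idea, where enrich_ips and detection_ratio are hypothetical helpers and fake_lookup replaces the real check_virustotal call so the example runs offline (it returns the stats printed above):

```python
def detection_ratio(stats):
    """Share of engines that flagged the IP as malicious, from a last_analysis_stats dict."""
    total = sum(stats.values())
    return stats.get('malicious', 0) / total if total else 0.0

def enrich_ips(ips, lookup):
    """Call `lookup` once per distinct IP and return {ip: stats}."""
    results = {}
    for ip in ips:
        if ip not in results:
            results[ip] = lookup(ip)
    return results

calls = []

def fake_lookup(ip):
    # Stand-in for check_virustotal(ip); records how often it is invoked.
    calls.append(ip)
    return {'harmless': 69, 'malicious': 14, 'suspicious': 0, 'undetected': 13, 'timeout': 0}

# 1000 events to the same destination should cost a single lookup.
enriched = enrich_ips(['58.158.177.102'] * 1000, fake_lookup)
print(len(calls))                                             # 1
print(round(detection_ratio(enriched['58.158.177.102']), 3))  # 0.146
```

In the notebook, the first argument could be powershel_network_events['Destination_ip'] and the second check_virustotal.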
Use-Case - Data Visualization powers (use your favourite library: Matplotlib, Seaborn, Plotly, etc.)#
Exploratory analysis of process events using Plotly
Calling in data from EDR logs. This could be your EDR of choice: Defender, CrowdStrike, Carbon Black, SentinelOne, Elastic EDR, osquery, etc.
I’m using Elastic-EDR along with Elastic SIEM for this Case study.
query = {'query': ''' SELECT "@timestamp", "process.name","process.command_line" FROM "logs-endpoint.events*" where "process.name" = 'powershell.exe' AND "process.command_line" IS NOT NULL AND "@timestamp" > TODAY() - INTERVAL 90 DAY LIMIT 10000'''}
Explo_analysis_example_response = requests.post('https://192.168.0.107:9200/_sql?format=json', headers=headers, data=json.dumps(query) ,auth=HTTPBasicAuth('elastic', 'MyPassword'),verify=False)
Use-Case - Programmatic control over data: wrangling, tuning, sanitization, enrichment, whatever you need!#
Truly a canvas limited only by the artist’s creativity.
from datetime import datetime
# Load the results of the SQL search into a DataFrame
Explo_analysis_example_df=pd.DataFrame(json.loads(Explo_analysis_example_response.text)['rows'],columns=['Timestamp','Process','Commandline'])
# Create a new column holding the parsed datetime object
Explo_analysis_example_df['Timestamp_parsed']=Explo_analysis_example_df['Timestamp'].apply(lambda x : datetime.strptime(x,"%Y-%m-%dT%H:%M:%S.%fZ"))
# Create a column holding the date of each event
Explo_analysis_example_df['Timestamp_date']=Explo_analysis_example_df['Timestamp_parsed'].apply(lambda x: x.date())
# Group by date and command line, then reset the index; the unnamed count column ends up labelled 0
plot_df=Explo_analysis_example_df.groupby(['Timestamp_date','Commandline']).size().reset_index()
plot_df.head()
(index) | Timestamp_date | Commandline | 0 |
---|---|---|---|
0 | 2022-09-01 | powershell.exe -ExecutionPolicy Restricted -Command Write-Host 'Final result: 1'; | 4 |
1 | 2022-09-02 | powershell.exe -ExecutionPolicy Restricted -Command Write-Host 'Final result: 1'; | 2 |
2 | 2022-09-03 | powershell.exe -ExecutionPolicy Restricted -Command Write-Host 'Final result: 1'; | 2 |
3 | 2022-09-04 | "C:\Windows\SysWOW64\WindowsPowerShell\v1.0\powershell.exe" | 2 |
4 | 2022-09-04 | "C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe" | 2 |
import plotly.express as px
fig = px.bar(plot_df, x="Timestamp_date", y=0, color="Commandline", title="Exploring command-line execution occurrences to identify outliers")
fig.update_layout(yaxis={'visible': True, 'showticklabels': False})
fig.update_layout(xaxis={'visible': True, 'showticklabels': True})
fig.layout.showlegend = False
fig.show()
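The chart makes outliers visible by eye; the same question can also be asked in code. One possible heuristic, sketched here as an assumption rather than the method used in the talk, is to flag command lines that appear on very few distinct days. The helper name and the sample rows are hypothetical (the last row's date is invented for illustration):

```python
from collections import defaultdict

def rare_commandlines(rows, max_days=1):
    """Return {commandline: total_count} for command lines seen on at most `max_days` distinct dates."""
    days = defaultdict(set)
    totals = defaultdict(int)
    for date, commandline, count in rows:
        days[commandline].add(date)
        totals[commandline] += count
    return {cmd: totals[cmd] for cmd in days if len(days[cmd]) <= max_days}

# Rows shaped like plot_df: (date, command line, count).
baseline = "powershell.exe -ExecutionPolicy Restricted -Command Write-Host 'Final result: 1';"
rows = [
    ('2022-09-01', baseline, 4),
    ('2022-09-02', baseline, 2),
    ('2022-09-03', baseline, 2),
    ('2022-09-04', 'powershell -nop .\\Desktop\\Initial_Dropper.ps1', 2),
]

print(rare_commandlines(rows))
```

With the DataFrame from above, the rows could be supplied as plot_df.itertuples(index=False, name=None), since plot_df has exactly these three columns.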
Use-Case - Case-Management- bleeding into the Lifecycle !!!#
We’ll use TheHive case management solution for the demo
# Import the required libraries. You can use Jira, ServiceNow, or any other case management tool and call its API to perform the same functions.
import sys  # used by the sys.exit() calls in the error branches
from thehive4py.api import TheHiveApi
from thehive4py.models import Alert, AlertArtifact, CustomFieldHelper
from thehive4py.models import Case, CaseObservable
THEHIVE_URL = 'http://192.168.0.107:9000'
THEHIVE_API_KEY = '6EyENjxqrFATV0S9zU99jxxjCAARFzCj'
api = TheHiveApi(THEHIVE_URL, THEHIVE_API_KEY)
#creating the case
print('Pushing to Create a new case')
print('-----------------------------')
case = Case(title='Hunt: Suspicious Powershell Observation', description='Based on the Hunt, we observed suspicious Powershell COmmandline, malicious IP address communication and Deviation from the Baseline activity.', tlp=2, tags=['Jupyterthon2022,Hunt,Powershell'])
print(case.jsonify())
response = api.create_case(case)
if response.status_code == 201:
    print(json.dumps(response.json(), indent=4, sort_keys=True))
    print('')
    id = response.json()['id']
else:
    print('ko: {}/{}'.format(response.status_code, response.text))
    sys.exit(0)
print('Create observable IP')
print('-----------------------------')
domain = CaseObservable(dataType='ip',
data=['58.158.177.102'],
tlp=1,
ioc=True,
tags=['Hunt - Powershell, Malicious IP'],
message='test'
)
response = api.create_case_observable(id, domain)
if response.status_code == 201:
    print(json.dumps(response.json(), indent=4, sort_keys=True))
    print('')
else:
    print('ko: {}/{}'.format(response.status_code, response.text))
    sys.exit(0)
print('Create observable Other Details')
print('-----------------------------')
domain = CaseObservable(dataType='other',
data=['Suspicious IP Connection pattern to 58.158.177.102 is observed-IP reputation is Poor','Susspisi'],
tlp=1,
ioc=True,
tags=['Hunt - Powershell, Malicious IP'],
message='test'
)
response = api.create_case_observable(id, domain)
if response.status_code == 201:
    print(json.dumps(response.json(), indent=4, sort_keys=True))
    print('')
else:
    print('ko: {}/{}'.format(response.status_code, response.text))
    sys.exit(0)
Pushing to Create a new case
-----------------------------
{
"customFields": {},
"description": "Based on the Hunt, we observed suspicious Powershell COmmandline, malicious IP address communication and Deviation from the Baseline activity.",
"flag": false,
"id": null,
"metrics": {},
"owner": null,
"pap": 2,
"severity": 2,
"startDate": 1669825472000,
"tags": [
"Jupyterthon2022,Hunt,Powershell"
],
"tasks": [],
"template": null,
"title": "Hunt: Suspicious Powershell Observation",
"tlp": 2
}
{
"_id": "~49216",
"_type": "case",
"caseId": 3,
"createdAt": 1669825472677,
"createdBy": "sakshamtushar@gmail.com",
"customFields": {},
"description": "Based on the Hunt, we observed suspicious Powershell COmmandline, malicious IP address communication and Deviation from the Baseline activity.",
"endDate": null,
"flag": false,
"id": "~49216",
"impactStatus": null,
"owner": "sakshamtushar@gmail.com",
"pap": 2,
"permissions": [
"manageShare",
"manageAnalyse",
"manageTask",
"manageCaseTemplate",
"manageCase",
"manageUser",
"manageProcedure",
"managePage",
"manageObservable",
"manageTag",
"manageConfig",
"manageAlert",
"accessTheHiveFS",
"manageAction"
],
"resolutionStatus": null,
"severity": 2,
"startDate": 1669825472000,
"stats": {},
"status": "Open",
"summary": null,
"tags": [
"Jupyterthon2022,Hunt,Powershell"
],
"title": "Hunt: Suspicious Powershell Observation",
"tlp": 2,
"updatedAt": null,
"updatedBy": null
}
Create observable
-----------------------------
[
{
"_id": "~24728",
"_type": "case_artifact",
"createdAt": 1669825474375,
"createdBy": "sakshamtushar@gmail.com",
"data": "58.158.177.102",
"dataType": "ip",
"id": "~24728",
"ioc": true,
"message": "test",
"reports": {},
"sighted": false,
"startDate": 1669825474375,
"stats": {},
"tags": [
"Hunt - Powershell, Malicious IP"
],
"tlp": 1
}
]
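In the output above, the tags arrive in TheHive as a single comma-joined string ('Jupyterthon2022,Hunt,Powershell'). If separate, individually searchable tags are wanted, one option is to split them before building the Case or CaseObservable. A minimal sketch, where split_tags is a hypothetical helper and not part of thehive4py:

```python
def split_tags(raw_tags):
    """Split comma-joined tag strings into individual tags, trimming spaces and dropping duplicates."""
    tags = []
    for raw in raw_tags:
        for part in raw.split(','):
            part = part.strip()
            if part and part not in tags:
                tags.append(part)
    return tags

print(split_tags(['Jupyterthon2022,Hunt,Powershell']))
# ['Jupyterthon2022', 'Hunt', 'Powershell']
print(split_tags(['Hunt - Powershell, Malicious IP']))
# ['Hunt - Powershell', 'Malicious IP']
```

The resulting list could then be passed as the tags argument in place of the single-element list.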
More Tips & use-cases#
Tip: Wrap all your reusable functions into a separate Python file, then import and call them in any notebook that needs them
Tip: Schedule your notebooks to run periodic hunts and data analysis reports
Tip: Write a web server to call notebooks on demand, or use cloud services like AWS SageMaker to make it API driven
Use case: Correlation - bring in more events from your other security layers: Zeek/Suricata, threat intelligence platform, MITRE, firewall
Use case: Containment action - call your EDR/tool APIs to contain a host or perform network isolation
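For the scheduling tip, one lightweight option (an assumption, not something the talk prescribes) is to execute the notebook from the command line with jupyter nbconvert and keep a dated copy of each run; a cron job or any scheduler can then call the script. The notebook name and both function names below are hypothetical:

```python
import datetime
import subprocess

def build_nbconvert_command(notebook, run_date):
    """Build the CLI call that executes a notebook and saves a dated copy of the result."""
    base_name = notebook.rsplit('.ipynb', 1)[0]
    output_name = '{}-{}'.format(base_name, run_date.isoformat())
    return ['jupyter', 'nbconvert', '--to', 'notebook', '--execute', notebook, '--output', output_name]

def run_notebook(notebook, run_date=None):
    """Execute the notebook now; raises CalledProcessError if the run fails."""
    command = build_nbconvert_command(notebook, run_date or datetime.date.today())
    subprocess.run(command, check=True)

# Only build and show the command here; run_notebook() would actually execute it.
command = build_nbconvert_command('powershell_hunt.ipynb', datetime.date(2022, 11, 30))
print(' '.join(command))
# jupyter nbconvert --to notebook --execute powershell_hunt.ipynb --output powershell_hunt-2022-11-30
```

Keeping one executed copy per run doubles as the dated investigation report described in the closing section.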
What you’ve achieved by using Jupyter Notebooks to conduct this analysis:#
A record of what investigation was performed.
The notebook as a tactical investigation report
A reusable notebook: variables, not constants
You can draft your:
Hunting Notebook
Data Analysis Notebook
Investigation Notebook
Response Notebook
Detection Notebooks
Threat Intelligence Tracking Notebooks