Trace stolen funds across a blockchain
Tracing blockchain transactions and wallets is critical in the aftermath of a hack, enabling investigators to follow stolen funds, uncover illicit activity, and improve transparency. Kaiko Blockchain Monitoring provides direct, UI-free access to on-chain data, ensuring full control over data extraction and analysis. Unlike proprietary interfaces that can be restrictive, this approach allows for flexible, scalable investigations tailored to evolving threats.
This guide demonstrates how to:
Trace fund movements across Ethereum wallets
Aggregate and analyze on-chain flows
Conduct scalable investigations without the constraints of a predefined UI
Request information on a single address or transaction
To see the balances and transactions for a single Ethereum wallet, use this request example.
# Query the wallet audit endpoint for the Ethereum blockchain.
# Replace <client-api-key> with your own Kaiko API key before running.
curl --compressed -H "Accept: application/json" -H "X-Api-Key: <client-api-key>" \
"https://eu.market-api.kaiko.io/v2/data/wallet.v1/audit?blockchain=ethereum"
import http.client
import json
# Enter your Kaiko API Key
api_key = "KAIKO_API_KEY"
api_host = "us.market-api.kaiko.io"
api_base_endpoint = "/v2/data/wallet.v1/audit"

# Start of parameter configuration
optional_params = {
    "blockchain": "ethereum",
    "start_time": "2024-01-01T00:00:00.000Z",
    "end_time": "2024-01-02T00:00:00.000Z",
}
# End of parameter configuration

headers = {
    "X-Api-Key": api_key,
    "Accept": "application/json",
}

# Build the query string for the first page from the configured parameters.
url_params = "&".join(
    f"{param}={value}" for param, value in optional_params.items()
)

# Pagination: keep requesting pages until the API stops returning a next_url.
all_data = []
next_url = f"{api_base_endpoint}?{url_params}"

conn = http.client.HTTPSConnection(api_host)
try:
    while next_url:
        conn.request("GET", next_url, headers=headers)
        response = conn.getresponse()
        body = response.read().decode("utf-8")
        if response.status != 200:
            # Fail loudly instead of reporting an error page as "0 datapoints".
            raise RuntimeError(
                f"Kaiko API returned HTTP {response.status}: {body[:200]}"
            )
        data = json.loads(body)

        page = data.get("data") or []
        all_data.extend(page)
        print(f"Fetched {len(page)} datapoints. Total: {len(all_data)}")

        # next_url can be missing or null on the last page. It is an absolute
        # URL, so strip the scheme and host to get a path for conn.request().
        next_url = (data.get("next_url") or "").replace(f"https://{api_host}", "")
finally:
    # Always release the connection, even if a request or JSON parse fails.
    conn.close()

print(f" datapoints fetched: {(all_data)}")
Trace funds through several paths
To automatically trace the destination of funds, it's wise to use a custom Python script that:
Queries multiple wallets in a batch request
Automatically requests information on subsequent "hops" or flows
Consolidates the results into a final list of destination addresses
This script dynamically traces fund flows across multiple paths, quickly delivering a list of all destination wallets. You can re-run the script any time to get an instantly up-to-date picture of the stolen funds and any new associated wallets.
import pandas as pd
import requests
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
import time
# Endpoint of the Kaiko Blockchain Monitoring product (wallet audit)
URL = 'https://us.market-api.kaiko.io/v2/data/wallet.v1/audit'

# Headers sent with every API request; put your own key in 'X-Api-Key'
headers_dict = {'Accept': 'application/json', 'X-Api-Key': 'KAIKO_API_KEY'}
def get_data(url, headers, params=None, retries=5, delay=1, timeout=30):
    """
    Fetches data from the given URL with retry logic.

    Parameters:
    url (str): The URL to fetch data from.
    headers (dict): The headers for the request.
    params (dict): The parameters for the request.
    retries (int): The number of retry attempts.
    delay (int): The delay between retry attempts in seconds.
    timeout (float): Seconds to wait for the server before the attempt is
        treated as failed. Without a timeout a stalled connection would
        block the calling thread indefinitely.

    Returns:
    dict: The JSON response from the API. If retries is less than 1, no
        request is made and None is returned.

    Raises:
    requests.exceptions.RequestException: Re-raised from the last attempt
        when every attempt failed.
    """
    for attempt in range(retries):
        try:
            response = requests.get(
                url, headers=headers, params=params, timeout=timeout
            )
            response.raise_for_status()  # Raise an exception for HTTP errors
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == retries - 1:
                # Out of attempts: surface the last error to the caller.
                raise
            time.sleep(delay)
def get_all_events_for_address(address, start_time='2024-01-01T00:00:00.000Z'):
    """
    Fetches all events for a given address starting from a specified time.

    Follows the 'next_url' field of each response until it is missing or
    empty. If a later page cannot be fetched, pagination stops and the
    events collected so far are returned (best effort).

    Parameters:
    address (str): The user address to fetch events for (lower-cased before
        being sent to the API).
    start_time (str): The start time for fetching events in ISO 8601 format.

    Returns:
    pd.DataFrame: A DataFrame containing all events for the given address;
        empty when no page contained data.
    """
    params_dict = {
        'blockchain': 'ethereum',
        'user_address': address.lower(),
        'start_time': start_time
    }

    # Collect one frame per page and concatenate once at the end, rather
    # than re-copying the growing result on every page.
    frames = []
    res = get_data(URL, headers_dict, params_dict)
    while True:
        try:
            frames.append(pd.DataFrame(res['data']))
        except (KeyError, TypeError, ValueError) as e:
            # Page without usable 'data': report it and keep paginating.
            print(e)

        next_url = res.get('next_url')
        if not next_url:
            break

        try:
            res = get_data(next_url, headers_dict)
        except KeyboardInterrupt:
            print("Exit")
            break
        except Exception as e:
            # get_data already retried. Retrying the same URL here would
            # loop forever on a persistent failure, so stop and return the
            # pages fetched so far.
            print(e)
            break

    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)
def get_all_events_for_addresses(addresses, start_time='2024-01-01T00:00:00.000Z', max_workers=20):
    """
    Fetches all events for multiple addresses starting from a specified time using multithreading.

    Parameters:
    addresses (list): A list of user addresses to fetch events for.
    start_time (str): The start time for fetching events in ISO 8601 format.
    max_workers (int): Maximum number of threads used to query addresses
        in parallel.

    Returns:
    pd.DataFrame: A DataFrame containing all events for the given addresses;
        empty when no addresses are given.
    """
    # pd.concat raises ValueError on an empty list, so handle "nothing to
    # fetch" explicitly.
    if len(addresses) == 0:
        return pd.DataFrame()

    def fetch(address):
        return get_all_events_for_address(address, start_time)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(tqdm(executor.map(fetch, addresses), total=len(addresses)))
    return pd.concat(results, ignore_index=True)
def get_all_events_for_addresses_with_time(addresses_and_time, max_workers=20):
    """
    Fetches all events for multiple addresses with different start times using multithreading.

    Parameters:
    addresses_and_time (list): A list of [address, start_time] pairs
        (lists or tuples).
        example:
        addresses_and_time = [
            ["0x47666fab8bd0ac7003bce3f5c3585383f09486e2", '2025-01-01T04:00:00.000Z'],
            ["0xaf620e6d32b1c67f3396ef5d2f7d7642dc2e6ce9", '2021-02-21T01:00:00.000Z']
        ]
    max_workers (int): Maximum number of threads used to query addresses
        in parallel.

    Returns:
    pd.DataFrame: A DataFrame containing all events for the given addresses
        and start times; empty when the input list is empty.
    """
    # A hop can legitimately produce no receivers (for example when all of
    # them are filtered out). pd.concat raises ValueError on an empty list,
    # so return an empty frame instead of crashing the trace.
    if len(addresses_and_time) == 0:
        return pd.DataFrame()

    def fetch(address_and_time):
        address, start_time = address_and_time[0], address_and_time[1]
        return get_all_events_for_address(address, start_time)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(tqdm(executor.map(fetch, addresses_and_time), total=len(addresses_and_time)))
    return pd.concat(results, ignore_index=True)
# Addresses excluded from the traced receivers
addresses_to_remove = set()

# Null address and the wallets directly involved in the incident
addresses_to_remove.update((
    "0x0000000000000000000000000000000000000000",  # null address
    "0x47666fab8bd0ac7003bce3f5c3585383f09486e2",  # exploiter
    "0xf89d7b9c864f589bbf53a82105107622b35eaa40",  # bybit hot wallet
))

# Block builders and fee recipients
addresses_to_remove.update((
    "0x1f9090aae28b8a3dceadf281b0f12828e676c326",  # block builder
    "0x95222290dd7278aa3ddd389cc1e1d165cc4bafe5",  # block builder
    "0x4838b106fce9647bdf1e7877bf73ce8b0bad5f97",  # block builder
    "0x388c818ca8b9251b393131c08a736a67ccb19297",  # Lido execution builder
    "0x7e2a2fa2a064f693f0a55c5639476d913ff12d05",  # mev block builder
    "0x6be457e04092b28865e0cba84e3b2cfa0f871e67",  # mev block builder
    "0xdadb0d80178819f2319190d340ce9a924f783711",  # block builder
    "0xe688b84b23f322a994a53dbf8e15fa82cdb71127",  # block fee recipient
    "0xd11d7d2cb0aff72a61df37fd016ee1bd9f180633",  # mev block builder
    "0x7adc0e867ebc337e2d20c44db181c067fa08637b",  # block builder
))

# Not labeled in the original list
addresses_to_remove.add("0x98ed2d46a27afeead62a5ea39d022a33ea4d25c1")
def get_receiver_addresses_filtered_with_time(events, filter_out):
    """
    Filters and returns a list of receiver addresses with their earliest transaction timestamps,
    excluding specified addresses.

    Only rows whose "direction" is "out" are considered.

    Parameters:
    events (pd.DataFrame): DataFrame containing transaction events. Uses the
        "direction", "receiver_address" and "timestamp" columns; "timestamp"
        is parsed as nanoseconds since the Unix epoch.
    filter_out (set): A set of addresses to be excluded from the results.

    Returns:
    list: A list of [receiver_address, timestamp] pairs, ordered by address,
        where timestamp is the earliest outgoing transfer formatted as
        '%Y-%m-%dT%H:%M:%S.%fZ'. Empty when there are no outgoing events or
        the required columns are missing.
    """
    required_columns = {"direction", "receiver_address", "timestamp"}
    # A hop that returned no events yields a frame without these columns;
    # treat that as "no receivers" instead of raising KeyError.
    if events is None or events.empty or not required_columns.issubset(events.columns):
        return []

    # Keep only outgoing transfers; .loc returns a new frame, so nothing
    # below writes into a slice of the caller's DataFrame.
    outgoing = events.loc[events["direction"] == "out", ["receiver_address", "timestamp"]]
    if outgoing.empty:
        return []

    # Earliest outgoing transfer per receiver, computed on real datetimes
    # and formatted afterwards.
    earliest = (
        pd.to_datetime(outgoing["timestamp"], unit="ns")
        .groupby(outgoing["receiver_address"])
        .min()
        .dt.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
    )

    # Remove specified addresses from the list
    return [
        [address, timestamp]
        for address, timestamp in earliest.items()
        if address not in filter_out
    ]
# ----------------------------
# Level 1: every event of the exploiter wallet since the time of the hack
exploiter_address = "0x47666fab8bd0ac7003bce3f5c3585383f09486e2"
hack_time = '2025-02-21T13:00:00.000Z'
exploiter_events = get_all_events_for_address(exploiter_address, start_time=hack_time)

# ----------------------------
# Level 2: events of every address that received funds from the exploiter
receiver_addresses_list_with_time = get_receiver_addresses_filtered_with_time(
    exploiter_events,
    addresses_to_remove,
)
receiver_addresses_events = get_all_events_for_addresses_with_time(
    receiver_addresses_list_with_time
)

# ----------------------------
# Level 3: events of every address that received funds from a level 2 address
receiver_addresses_2_list_with_time = get_receiver_addresses_filtered_with_time(
    receiver_addresses_events,
    addresses_to_remove,
)
receiver_addresses_2_events = get_all_events_for_addresses_with_time(
    receiver_addresses_2_list_with_time
)

# ----------------------------
# Tag each hop with its "Level" on a copy of its events (assign returns a
# new DataFrame, so the originals stay untouched), then stack them.
df1 = exploiter_events.assign(Level="Level 1")
df2 = receiver_addresses_events.assign(Level="Level 2")
df3 = receiver_addresses_2_events.assign(Level="Level 3")

# Write the combined trace to disk
df_combined = pd.concat([df1, df2, df3], ignore_index=True)
df_combined.to_csv("combined_data_2.csv", index=False)
Last updated
Was this helpful?