Trace stolen funds across a blockchain
Tracing blockchain transactions and wallets is critical in the aftermath of a hack, enabling investigators to follow stolen funds, uncover illicit activity, and improve transparency. Kaiko Blockchain Monitoring provides direct, UI-free access to on-chain data, ensuring full control over data extraction and analysis. Unlike proprietary interfaces that can be restrictive, this approach allows for flexible, scalable investigations tailored to evolving threats.
This guide demonstrates how to:
Trace fund movements across Ethereum wallets
Aggregate and analyze on-chain flows
Conduct scalable investigations without the constraints of a predefined UI
Request information on a single address or transaction
To see the balances and transactions for a single Ethereum wallet, use this request example.
# Replace <client-api-key> with your API key and <wallet-address> with the
# address to inspect; user_address is the same query parameter the Python
# examples below send to this endpoint.
curl --compressed -H "Accept: application/json" -H "X-Api-Key: <client-api-key>" \
  "https://eu.market-api.kaiko.io/v2/data/wallet.v1/audit?blockchain=ethereum&user_address=<wallet-address>"
import pandas as pd
import requests
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
import time
# Query settings: the chain to inspect, the earliest event time to request
# (ISO 8601 string) and the wallets whose events are fetched.
blockchain = 'ethereum'
start_time = '2025-02-27T00:00:00.000Z'
list_wallets = [
    '0xf977814e90da44bfa03b6295a0616a897441acec',
    '0xbe0eb53f46cd790cd13851d5eff43d12404d33e8',
]

# Wallet audit endpoint that every request in this script is sent to.
URL = 'https://us.market-api.kaiko.io/v2/data/wallet.v1/audit'

# HTTP headers sent with each request; replace YOUR_API_KEY with a real key.
headers_dict = {'Accept': 'application/json', 'X-Api-Key': 'YOUR_API_KEY'}
def get_data(url, headers, params=None, retries=5, delay=1, timeout=30):
    """
    Fetches JSON from the given URL with a GET request, retrying on request errors.

    Parameters:
    url (str): The URL to fetch data from.
    headers (dict): The headers for the request.
    params (dict): The query parameters for the request (optional).
    retries (int): The maximum number of attempts.
    delay (int): The pause between attempts in seconds.
    timeout (int): Seconds to wait on the server before an attempt is treated as failed.

    Returns:
    dict: The decoded JSON body of the first successful response. If retries
    is 0 or negative no request is made and None is returned.

    Raises:
    requests.exceptions.RequestException: Re-raised when the last attempt fails.
    """
    for attempt in range(retries):
        try:
            # Without a timeout a stalled connection would block this call
            # (and the worker thread running it) indefinitely.
            response = requests.get(url, headers=headers, params=params, timeout=timeout)
            response.raise_for_status()  # Raise an exception for HTTP errors
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt == retries - 1:
                raise
            time.sleep(delay)
def get_all_events_for_address(address, start_time, blockchain):
    """
    Fetches all events for one address, following the API's pagination links.

    Parameters:
    address (str): The user address to fetch events for; it is lower-cased before being sent.
    start_time (str): Sent to the API as the 'start_time' query parameter.
    blockchain (str): Sent to the API as the 'blockchain' query parameter.

    Returns:
    pd.DataFrame: The rows found under 'data' on every page that was fetched,
    or an empty DataFrame when no page carried any data.
    """
    params_dict = {
        'blockchain': blockchain,
        'user_address': address.lower(),
        'start_time': start_time
    }
    res = get_data(URL, headers_dict, params_dict)

    # Pages are collected and concatenated once at the end instead of
    # re-copying the growing DataFrame on every iteration.
    pages = []
    try:
        pages.append(pd.DataFrame(res['data']))
    except Exception as e:
        print(e)

    # Keep requesting the next page until the response has no 'next_url'.
    while res.get('next_url') is not None:
        fetched = False
        try:
            res = get_data(res['next_url'], headers_dict)
            fetched = True
            pages.append(pd.DataFrame(res['data']))
        except KeyboardInterrupt:
            print("Exit")
            break
        except Exception as e:
            print(e)
            if not fetched:
                # get_data already retried and gave up, and `res` still holds
                # the previous page: looping again would request the same URL
                # forever, so stop and return what has been gathered so far.
                break

    if not pages:
        return pd.DataFrame()
    return pd.concat(pages, ignore_index=True)
def get_all_events_for_addresses(addresses, start_time, blockchain='ethereum'):
    """
    Fetches all events for several addresses in parallel using a thread pool.

    Parameters:
    addresses (list): The user addresses to fetch events for.
    start_time (str): The start time passed on for every address.
    blockchain (str): The blockchain passed on for every address. The script
    calls this function with blockchain=..., so the keyword has to be accepted here.

    Returns:
    pd.DataFrame: The events of all addresses concatenated into one DataFrame
    (empty when no address is given).
    """
    addresses = list(addresses)
    if not addresses:
        # pd.concat raises ValueError on an empty list of frames.
        return pd.DataFrame()

    def fetch(addr):
        return get_all_events_for_address(addr, start_time, blockchain)

    with ThreadPoolExecutor(max_workers=20) as executor:
        # tqdm wraps the result iterator to show progress as workers finish.
        results = list(tqdm(executor.map(fetch, addresses), total=len(addresses)))
    return pd.concat(results, ignore_index=True)
# Collect the events of every wallet in list_wallets into a single DataFrame.
data = get_all_events_for_addresses(addresses=list_wallets, start_time=start_time, blockchain=blockchain)
Trace funds through several paths
To automatically trace the destination of funds, it's wise to use a custom Python script that:
Queries multiple wallets in a batch request
Automatically requests information on subsequent "hops" or flows
Consolidates the results into a final list of destination addresses
This script dynamically traces fund flows across multiple paths, quickly generating a CSV file with all wallets associated with the hack up to 4 hops. You can re-run the script at any time to get an instantly up-to-date picture of the stolen funds and any new associated wallets. You can also add extra steps to trace across as many hops as required.
import pandas as pd
import requests
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
import time
# HTTP headers sent with every API request; replace YOUR_API_KEY with a real key.
KAIKO_HEADERS_DICT = {'Accept': 'application/json', 'X-Api-Key': 'YOUR_API_KEY'}
def get_data(url, headers, params=None, retries=10, delay=2, timeout=30):
    """
    Fetches data from the given URL with retry logic.

    Parameters:
    url (str): The URL to fetch data from.
    headers (dict): The headers for the request.
    params (dict): The parameters for the request.
    retries (int): The number of retry attempts.
    delay (int): The delay between retry attempts in seconds.
    timeout (int): Seconds to wait on the server before an attempt is treated as failed.

    Returns:
    dict: The JSON response from the API. If retries is 0 or negative no
    request is made and None is returned.

    Raises:
    requests.exceptions.RequestException: Re-raised when the last attempt fails.
    """
    for attempt in range(retries):
        try:
            # Without a timeout a stalled connection would block this call
            # (and the worker thread running it) indefinitely.
            response = requests.get(url, headers=headers, params=params, timeout=timeout)
            response.raise_for_status()  # Raise an exception for HTTP errors
            return response.json()
        except requests.exceptions.RequestException:
            if attempt == retries - 1:
                print('max retries exceeded')
                raise
            time.sleep(delay)
def get_all_events_for_address(address, start_time='2024-01-01T00:00:00.000Z'):
    """
    Fetches all events for a given address starting from a specified time.

    Parameters:
    address (str): The user address to fetch events for; it is lower-cased before being sent.
    start_time (str): The start time for fetching events in ISO 8601 format.

    Returns:
    pd.DataFrame: A DataFrame containing the events of every page that was
    fetched, or an empty DataFrame when no page carried any data. Pagination
    stops once more than 100 follow-up pages have been read.
    """
    url = 'https://us.market-api.kaiko.io/v2/data/wallet.v1/audit'
    params_dict = {
        'blockchain': 'ethereum',
        'user_address': address.lower(),
        'start_time': start_time,
        'page_size': 400
    }
    res = get_data(url, KAIKO_HEADERS_DICT, params_dict)

    # Pages are collected and concatenated once at the end instead of
    # re-copying the growing DataFrame on every iteration.
    pages = []
    try:
        pages.append(pd.DataFrame(res['data']))
    except Exception as e:
        print(e)

    count = 0
    # Keep requesting the next page until the response has no 'next_url'.
    while res.get('next_url') is not None:
        count += 1
        fetched = False
        try:
            res = get_data(res['next_url'], KAIKO_HEADERS_DICT)
            fetched = True
            pages.append(pd.DataFrame(res['data']))
            if count > 100:
                # Page cap: report the address that hit it and stop paging.
                print(address, count)
                break
        except KeyboardInterrupt:
            print("Exit")
            break
        except Exception as e:
            print(e)
            if not fetched:
                # get_data already retried and gave up, and `res` still holds
                # the previous page: looping again would request the same URL
                # forever, so stop and return what has been gathered so far.
                break

    if not pages:
        return pd.DataFrame()
    return pd.concat(pages, ignore_index=True)
def get_all_events_for_addresses(addresses, start_time='2024-01-01T00:00:00.000Z'):
    """
    Fetches all events for multiple addresses starting from a specified time using multithreading.

    Parameters:
    addresses (list): A list of user addresses to fetch events for.
    start_time (str): The start time for fetching events in ISO 8601 format.

    Returns:
    pd.DataFrame: A DataFrame containing all events for the given addresses
    (empty when no address is given).
    """
    addresses = list(addresses)
    if not addresses:
        # pd.concat raises ValueError on an empty list of frames.
        return pd.DataFrame()

    def fetch(addr):
        return get_all_events_for_address(addr, start_time)

    with ThreadPoolExecutor(max_workers=20) as executor:
        # tqdm wraps the result iterator to show progress as workers finish.
        results = list(tqdm(executor.map(fetch, addresses), total=len(addresses)))
    return pd.concat(results, ignore_index=True)
def get_all_events_for_addresses_with_time(addresses_and_time):
    """
    Fetches all events for multiple addresses with different start times using multithreading.

    Parameters:
    addresses_and_time (list): A list of [address, start_time] pairs.
    example:
    addresses_and_time = [["0x47666fab8bd0ac7003bce3f5c3585383f09486e2", '2025-01-01T04:00:00.000Z'],
                          ["0xaf620e6d32b1c67f3396ef5d2f7d7642dc2e6ce9", '2021-02-21T01:00:00.000Z']]

    Returns:
    pd.DataFrame: A DataFrame containing all events for the given addresses
    and start times (empty when the list is empty).
    """
    addresses_and_time = list(addresses_and_time)
    if not addresses_and_time:
        # A hop with no qualifying receivers yields an empty list, and
        # pd.concat raises ValueError on an empty list of frames.
        return pd.DataFrame()

    def fetch(entry):
        # entry[0] is the address, entry[1] its start time.
        return get_all_events_for_address(entry[0], entry[1])

    with ThreadPoolExecutor(max_workers=20) as executor:
        # tqdm wraps the result iterator to show progress as workers finish.
        results = list(tqdm(executor.map(fetch, addresses_and_time), total=len(addresses_and_time)))
    return pd.concat(results, ignore_index=True)
def get_all_pools():
    """
    Fetches the Ethereum pools from the reference data API.

    Returns:
    list: The values of the 'address' column of the returned pools.
    """
    endpoint = 'https://reference-data-api.kaiko.io/v1/pools'
    query = {'blockchain': 'ethereum'}
    payload = get_data(endpoint, KAIKO_HEADERS_DICT, query)

    pools = pd.DataFrame()
    try:
        pools = pd.DataFrame(payload['data'])
    except Exception as err:
        # Problems reading the payload are only reported; the lookup below
        # then runs against the empty frame.
        print(err)
    return pools['address'].tolist()
def get_receiver_addresses_filtered_with_time(events, filter_out, min_amount_usd=10):
    """
    Filters and returns a list of receiver addresses with their earliest transaction timestamps,
    excluding specified addresses and keeping only those with a total outgoing amount_usd above
    min_amount_usd.

    Parameters:
    events (pd.DataFrame): DataFrame containing transaction events. The columns
    'direction', 'receiver_address', 'amount_usd' and 'timestamp' are read.
    filter_out (set): Addresses to be excluded from the results (any iterable of addresses works).
    min_amount_usd (float): A receiver is kept only when the sum of its 'out' amounts is strictly
    greater than this value.

    Returns:
    list: A list of [receiver_address, timestamp] pairs, where timestamp is the earliest 'out'
    event for that receiver formatted as '%Y-%m-%dT%H:%M:%S.%fZ'. Empty when nothing qualifies.
    """
    if events.empty:
        # An empty frame (for example from a hop that returned nothing) may
        # not even have the columns read below.
        return []

    outgoing_events = events[events["direction"] == "out"]
    amount_sums = outgoing_events.groupby('receiver_address')['amount_usd'].sum()
    valid_addresses = amount_sums[amount_sums > min_amount_usd].index

    # .copy() so the timestamp rewrite below works on an independent frame
    # rather than a view of `events`.
    selected = outgoing_events.loc[
        outgoing_events['receiver_address'].isin(valid_addresses),
        ["receiver_address", "timestamp"],
    ].copy()
    if selected.empty:
        return []

    selected['timestamp'] = (
        pd.to_datetime(selected['timestamp'], unit='ns')
        .dt.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
    )
    # The fixed-width format sorts chronologically, so min() on the strings
    # yields the earliest transfer per receiver.
    earliest = selected.groupby('receiver_address')['timestamp'].min().reset_index().values.tolist()

    # Build the set once: filter_out can be a long list (it includes every pool address).
    excluded = set(filter_out)
    return [pair for pair in earliest if pair[0] not in excluded]
# ----------------------------
# Addresses to be removed from the results.
# This list is passed as `filter_out` when selecting the receivers to follow at
# each level, so none of these addresses is traced further. It contains the
# exploiter itself, placeholder addresses, block builders / fee recipients and
# swap routers, each labelled on its own line.
addresses_to_remove = [
"0x0000000000000000000000000000000000000000", # null address
'', # empty address
"0x47666fab8bd0ac7003bce3f5c3585383f09486e2", # exploiter
"0xf89d7b9c864f589bbf53a82105107622b35eaa40", # bybit hot wallet
"0x1f9090aae28b8a3dceadf281b0f12828e676c326", # block builder
"0x95222290dd7278aa3ddd389cc1e1d165cc4bafe5", # block builder
"0x4838b106fce9647bdf1e7877bf73ce8b0bad5f97", # block builder
"0x388c818ca8b9251b393131c08a736a67ccb19297", # Lido execution builder
"0x7e2a2fa2a064f693f0a55c5639476d913ff12d05", # mev block builder
"0x6be457e04092b28865e0cba84e3b2cfa0f871e67", # mev block builder
"0x3bee5122e2a2fbe11287aafb0cb918e22abb5436", # mev block builder
"0xdadb0d80178819f2319190d340ce9a924f783711", # block builder
"0xe688b84b23f322a994a53dbf8e15fa82cdb71127", # block fee recipient
"0xd11d7d2cb0aff72a61df37fd016ee1bd9f180633", # mev block builder
"0x4675c7e5baafbffbca748158becba61ef3b0a263", # mev block builder
"0x7adc0e867ebc337e2d20c44db181c067fa08637b", # block builder
"0x98ed2d46a27afeead62a5ea39d022a33ea4d25c1", # ? (not labelled by the author - TODO identify)
"0x00000000219ab540356cbb839cbe05303d7705fa", # Beacon chain deposit contract
"0x0000000000bbf5c5fd284e657f01bd000933c96d", # Paraswap delta v2
"0x6a000f20005980200259b80c5102003040001068", # Paraswap augustus v6.2
"0x3fc91a3afd70395cd496c647d5a6cc9d4b2b7fad", # Uniswap universal router
"0x1231deb6f5749ef6ce6943a275a1d3e7486f4eae", # li.fi diamond (router)
"0x74de5d4fcbf63e00296fd95d33236b9794016631", # metamask swap
"0x7d0ccaa3fac1e5a943c5168b6ced828691b46b36", # OKX DEX router
"0x9008d19f58aabd9ed0d60971565aa8510560ab41", # Cow protocol settlement
"0xd37bbe5744d730a1d98d8dc97c42f0ca46ad7146", # THORCHAIN router
]
# Get all pools on Ethereum in Kaiko's reference data (to be filtered out because they're not receivers from the exploiter)
# NOTE: this performs a network request as soon as the script runs.
all_pools = get_all_pools()
# Extend the manual list with every pool address so both are filtered together.
addresses_to_remove = addresses_to_remove + all_pools
# Each level below repeats the same two steps: pick the receivers of the
# previous level's outgoing transfers (skipping addresses_to_remove), paired
# with the timestamp of the earliest transfer to them, then fetch each
# receiver's events starting from that timestamp.
# Level 1: Get all events from the exploiter
exploiter_address = "0x47666fab8bd0ac7003bce3f5c3585383f09486e2"
# Events are requested from this time onward (used as start_time for the exploiter).
hack_time = '2025-02-21T14:00:00.000Z'
exploiter_events = get_all_events_for_address(exploiter_address, start_time=hack_time)
# Level 2: Get all events for all addresses that received funds from the exploiter
receiver_addresses_list_with_time = get_receiver_addresses_filtered_with_time(exploiter_events, addresses_to_remove)
receiver_addresses_2_events = get_all_events_for_addresses_with_time(receiver_addresses_list_with_time)
# Level 3: Get all events for all addresses that received funds from addresses that received funds from the exploiter
receiver_addresses_3_list_with_time = get_receiver_addresses_filtered_with_time(receiver_addresses_2_events, addresses_to_remove)
receiver_addresses_3_events = get_all_events_for_addresses_with_time(receiver_addresses_3_list_with_time)
# Level 4: Get all events for all addresses that received funds from addresses that received funds from the addresses that received funds from the exploiter
receiver_addresses_4_list_with_time = get_receiver_addresses_filtered_with_time(receiver_addresses_3_events, addresses_to_remove)
receiver_addresses_4_events = get_all_events_for_addresses_with_time(receiver_addresses_4_list_with_time)
# ----------------------------
# Tag a copy of each level's events with a "level" label; the fetched
# DataFrames themselves are left untouched.
df1 = exploiter_events.assign(level="Level 1")
df2 = receiver_addresses_2_events.assign(level="Level 2")
df3 = receiver_addresses_3_events.assign(level="Level 3")
df4 = receiver_addresses_4_events.assign(level="Level 4")

# Stack all levels into one table and save every transfer.
all_transfers = pd.concat([df1, df2, df3, df4], ignore_index=True)
all_transfers.to_csv("all_exploiters_and_receivers_transfers.csv", index=False)

# Total USD sent out per (address, level) pair, saved with the group keys as index columns.
list_addresses = (
    all_transfers[all_transfers["direction"] == "out"]
    .groupby(['user_address', 'level'])['amount_usd']
    .sum()
)
list_addresses.to_csv("list_addresses.csv", index=True)
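Example output (`list_addresses.csv`): each entry lists the address, its level, and the total USD amount it sent out.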
0x000010036c0190e009a000d0fc3541100a07380a
Level 4
165439929.37011100
0x000949aef11d7b124a3c333e737af450fc70682a
Level 4
528853.1734133920
0x0014462b38e67e6c1f5e0385fbbd298abf182722
Level 4
686812.705417298
0x001c0cf0aba3614e650d591ef222ccb0f8e3a0ee
Level 4
90055.16267862360
0x00214cb533e66d062f420125b99fc60b6b3069a0
Level 3
239743.47125701400
0x00214cb533e66d062f420125b99fc60b6b3069a0
Level 4
239743.47125701400
0x0026b786684690772d87a448f9ff909669c83649
Level 4
152745.4588658880
0x002b8edf90443fca65241075709db049187b9603
Level 4
327137.64441197500
0x002fd753417a7348fdd84b4be390e399515fc488
Level 4
264196.3332642850
Last updated
Was this helpful?