Trace stolen funds across a blockchain
Request information on a single address or transaction
curl --compressed -H "Accept: application/json" -H "X-Api-Key: <client-api-key>" \
"https://eu.market-api.kaiko.io/v2/data/wallet.v1/audit?blockchain=ethereum"
import pandas as pd
import requests
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
import time
# Wallet addresses whose audit events are requested.
list_wallets = [
    '0xf977814e90da44bfa03b6295a0616a897441acec',
    '0xbe0eb53f46cd790cd13851d5eff43d12404d33e8',
]

# Sent to the API as the `start_time` and `blockchain` query parameters.
start_time = '2025-02-27T00:00:00.000Z'
blockchain = 'ethereum'

# HTTP headers attached to every request; replace the placeholder with a real API key.
headers_dict = {
    'Accept': 'application/json',
    'X-Api-Key': 'YOUR_API_KEY',
}

# Kaiko wallet audit endpoint queried by this script.
URL = 'https://us.market-api.kaiko.io/v2/data/wallet.v1/audit'
def get_data(url, headers, params=None, retries=5, delay=1, timeout=60):
    """
    Fetches JSON from the given URL, retrying on request errors.
    Parameters:
    url (str): The URL to fetch data from.
    headers (dict): The headers for the request.
    params (dict): The query parameters for the request.
    retries (int): The maximum number of attempts. If it is less than 1,
    no request is made and None is returned.
    delay (int): The delay between attempts in seconds.
    timeout (float or tuple): Seconds to wait for the server before the
    attempt is treated as failed. Without it a stalled connection
    would block the calling thread indefinitely.
    Returns:
    dict: The decoded JSON response from the API.
    Raises:
    requests.exceptions.RequestException: If the last attempt also fails.
    """
    for attempt in range(retries):
        try:
            response = requests.get(url, headers=headers, params=params, timeout=timeout)
            response.raise_for_status()  # Raise an exception for HTTP errors
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < retries - 1:
                time.sleep(delay)
            else:
                # Out of attempts: let the caller see the last error.
                raise
def get_all_events_for_address(address, start_time, blockchain):
    """
    Fetches all events for a given address, following `next_url` pagination.
    Parameters:
    address (str): The user address to fetch events for (lower-cased before being sent).
    start_time (str): Value sent as the `start_time` query parameter.
    blockchain (str): Value sent as the `blockchain` query parameter.
    Returns:
    pd.DataFrame: The rows of every page that could be fetched. If a follow-up
    page cannot be fetched, the error is printed and the pages collected
    so far are returned.
    """
    params_dict = {
        'blockchain': blockchain,
        'user_address': address.lower(),
        'start_time': start_time
    }
    res = get_data(URL, headers_dict, params_dict)

    # Collect one frame per page and concatenate once at the end; growing a
    # DataFrame with pd.concat inside the loop copies all earlier rows each time.
    frames = []
    try:
        frames.append(pd.DataFrame(res['data']))
    except Exception as e:
        print(e)

    try:
        while res.get('next_url') is not None:
            try:
                res = get_data(res['next_url'], headers_dict)
            except Exception as e:
                # `res` still points at the previous page, so looping again
                # would request the same failing URL forever. Stop instead.
                print(e)
                break
            try:
                frames.append(pd.DataFrame(res['data']))
            except Exception as e:
                # Unusable payload on this page: skip it, keep paginating.
                print(e)
    except KeyboardInterrupt:
        print("Exit")

    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)
def get_all_events_for_addresses(addresses, start_time, blockchain='ethereum'):
    """
    Fetches all events for multiple addresses using a thread pool.
    Parameters:
    addresses (iterable): The user addresses to fetch events for.
    start_time (str): Value sent as the `start_time` query parameter.
    blockchain (str): Value sent as the `blockchain` query parameter.
    Accepted as an argument so that callers passing `blockchain=...`
    no longer fail with a TypeError.
    Returns:
    pd.DataFrame: All events for the given addresses; an empty DataFrame
    when no addresses are given.
    """
    addresses = list(addresses)
    if not addresses:
        # pd.concat raises ValueError on an empty list of frames.
        return pd.DataFrame()
    with ThreadPoolExecutor(max_workers=20) as executor:
        results = list(tqdm(
            executor.map(
                lambda addr: get_all_events_for_address(addr, start_time, blockchain),
                addresses,
            ),
            total=len(addresses),
        ))
    return pd.concat(results, ignore_index=True)
# Download the events of every wallet in `list_wallets`.
data = get_all_events_for_addresses(
    addresses=list_wallets,
    start_time=start_time,
    blockchain=blockchain,
)
Trace funds through several paths
import pandas as pd
import requests
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
import time
# HTTP headers attached to every Kaiko API call; replace the placeholder with a real API key.
KAIKO_HEADERS_DICT = {
    'Accept': 'application/json',
    'X-Api-Key': 'YOUR_API_KEY',
}
def get_data(url, headers, params=None, retries=10, delay=2, timeout=60):
    """
    Fetches data from the given URL with retry logic.
    Parameters:
    url (str): The URL to fetch data from.
    headers (dict): The headers for the request.
    params (dict): The parameters for the request.
    retries (int): The number of retry attempts. If it is less than 1,
    no request is made and None is returned.
    delay (int): The delay between retry attempts in seconds.
    timeout (float or tuple): Seconds to wait for the server before the
    attempt is treated as failed. Without it a stalled connection
    would block the calling thread indefinitely.
    Returns:
    dict: The JSON response from the API.
    Raises:
    requests.exceptions.RequestException: If the last attempt also fails.
    """
    for attempt in range(retries):
        try:
            response = requests.get(url, headers=headers, params=params, timeout=timeout)
            response.raise_for_status()  # Raise an exception for HTTP errors
            return response.json()
        except requests.exceptions.RequestException:
            if attempt < retries - 1:
                time.sleep(delay)
            else:
                # Out of attempts: report and let the caller see the last error.
                print('max retries exceeded')
                raise
def get_all_events_for_address(address, start_time='2024-01-01T00:00:00.000Z'):
    """
    Fetches all events for a given address starting from a specified time.
    Parameters:
    address (str): The user address to fetch events for (lower-cased before being sent).
    start_time (str): The start time for fetching events in ISO 8601 format.
    Returns:
    pd.DataFrame: A DataFrame containing the events of every page that could
    be fetched. Pagination stops early, keeping what was collected, when a
    follow-up page cannot be fetched or once more than 100 follow-up
    pages have been requested.
    """
    URL = 'https://us.market-api.kaiko.io/v2/data/wallet.v1/audit'
    params_dict = {
        'blockchain': 'ethereum',
        'user_address': address.lower(),
        'start_time': start_time,
        'page_size': 400
    }
    res = get_data(URL, KAIKO_HEADERS_DICT, params_dict)

    # Collect one frame per page and concatenate once at the end; growing a
    # DataFrame with pd.concat inside the loop copies all earlier rows each time.
    frames = []
    try:
        frames.append(pd.DataFrame(res['data']))
    except Exception as e:
        print(e)

    count = 0
    try:
        while res.get('next_url') is not None:
            count += 1
            try:
                res = get_data(res['next_url'], KAIKO_HEADERS_DICT)
            except Exception as e:
                # `res` still points at the previous page, so looping again
                # would request the same failing URL forever. Stop instead.
                print(e)
                break
            try:
                frames.append(pd.DataFrame(res['data']))
            except Exception as e:
                # Unusable payload on this page: skip it, keep paginating.
                print(e)
            if count > 100:
                # Page cap reached: report the address and stop.
                print(address, count)
                break
    except KeyboardInterrupt:
        print("Exit")

    if not frames:
        return pd.DataFrame()
    return pd.concat(frames, ignore_index=True)
def get_all_events_for_addresses(addresses, start_time='2024-01-01T00:00:00.000Z'):
    """
    Fetches all events for multiple addresses starting from a specified time using multithreading.
    Parameters:
    addresses (list): A list of user addresses to fetch events for.
    start_time (str): The start time for fetching events in ISO 8601 format.
    Returns:
    pd.DataFrame: A DataFrame containing all events for the given addresses;
    an empty DataFrame when no addresses are given.
    """
    addresses = list(addresses)
    if not addresses:
        # pd.concat raises ValueError on an empty list of frames.
        return pd.DataFrame()
    with ThreadPoolExecutor(max_workers=20) as executor:
        results = list(tqdm(
            executor.map(lambda addr: get_all_events_for_address(addr, start_time), addresses),
            total=len(addresses),
        ))
    return pd.concat(results, ignore_index=True)
def get_all_events_for_addresses_with_time(addresses_and_time):
    """
    Fetches all events for multiple addresses with different start times using multithreading.
    Parameters:
    addresses_and_time (list): A list of (address, start_time) pairs; each pair
    may be a list or a tuple.
    example:
    addresses_and_time = [["0x47666fab8bd0ac7003bce3f5c3585383f09486e2", '2025-01-01T04:00:00.000Z'],
    ["0xaf620e6d32b1c67f3396ef5d2f7d7642dc2e6ce9", '2021-02-21T01:00:00.000Z']]
    Returns:
    pd.DataFrame: A DataFrame containing all events for the given addresses and
    start times; an empty DataFrame when the list is empty.
    """
    addresses_and_time = list(addresses_and_time)
    if not addresses_and_time:
        # A tracing level with no receivers would otherwise crash here,
        # because pd.concat raises ValueError on an empty list of frames.
        return pd.DataFrame()
    with ThreadPoolExecutor(max_workers=20) as executor:
        results = list(tqdm(
            executor.map(lambda addr: get_all_events_for_address(addr[0], addr[1]), addresses_and_time),
            total=len(addresses_and_time),
        ))
    return pd.concat(results, ignore_index=True)
def get_all_pools():
    """
    Fetches the Ethereum pools from Kaiko's reference data API.
    Returns:
    list: The values of the `address` column of the returned pools.
    """
    endpoint = 'https://reference-data-api.kaiko.io/v1/pools'
    payload = get_data(endpoint, KAIKO_HEADERS_DICT, {'blockchain': 'ethereum'})
    pools = pd.DataFrame()
    try:
        pools = pd.concat([pools, pd.DataFrame(payload['data'])], ignore_index=True)
    except Exception as err:
        print(err)
    return pools['address'].tolist()
def get_receiver_addresses_filtered_with_time(events, filter_out, min_amount_usd=10):
    """
    Filters and returns a list of receiver addresses with their earliest transaction timestamps,
    excluding specified addresses and keeping only those whose outgoing transfers
    sum to more than `min_amount_usd`.
    Parameters:
    events (pd.DataFrame): DataFrame containing transaction events. The columns
    `direction`, `receiver_address`, `amount_usd` and `timestamp` are read;
    `timestamp` is parsed as nanoseconds since the epoch.
    filter_out (iterable): Addresses to be excluded from the results. Any
    iterable of hashable values works; it is converted to a set once.
    min_amount_usd (float): A receiver is kept only if its total `amount_usd`
    is strictly greater than this value.
    Returns:
    list: A list of [receiver_address, earliest_timestamp] pairs (each pair is a
    list), with the timestamp formatted as '%Y-%m-%dT%H:%M:%S.%fZ'. Empty when
    `events` is empty or no receiver passes the filters.
    """
    if events.empty:
        # An empty frame may have no columns at all; avoid a KeyError below.
        return []

    # Set membership keeps the final filter cheap even when thousands of
    # pool addresses are excluded.
    excluded = set(filter_out)

    outgoing_events = events[events["direction"] == "out"]
    amount_sums = outgoing_events.groupby('receiver_address')['amount_usd'].sum()
    valid_addresses = amount_sums[amount_sums > min_amount_usd].index

    # .copy() so the timestamp rewrite below does not write into a slice of `events`.
    receivers = outgoing_events.loc[
        outgoing_events['receiver_address'].isin(valid_addresses),
        ["receiver_address", "timestamp"],
    ].copy()
    if receivers.empty:
        return []

    receivers['timestamp'] = pd.to_datetime(
        receivers['timestamp'], unit='ns'
    ).dt.strftime('%Y-%m-%dT%H:%M:%S.%fZ')

    # The fixed-width ISO format sorts lexicographically in time order, so the
    # string minimum is the earliest transfer.
    earliest = receivers.groupby('receiver_address')['timestamp'].min().reset_index().values.tolist()
    return [pair for pair in earliest if pair[0] not in excluded]
# ----------------------------
# Addresses that must never be followed as receivers while tracing
addresses_to_remove = [
    "0x0000000000000000000000000000000000000000",  # null address
    "",  # empty address
    "0x47666fab8bd0ac7003bce3f5c3585383f09486e2",  # exploiter
    "0xf89d7b9c864f589bbf53a82105107622b35eaa40",  # bybit hot wallet
    "0x1f9090aae28b8a3dceadf281b0f12828e676c326",  # block builder
    "0x95222290dd7278aa3ddd389cc1e1d165cc4bafe5",  # block builder
    "0x4838b106fce9647bdf1e7877bf73ce8b0bad5f97",  # block builder
    "0x388c818ca8b9251b393131c08a736a67ccb19297",  # Lido execution builder
    "0x7e2a2fa2a064f693f0a55c5639476d913ff12d05",  # mev block builder
    "0x6be457e04092b28865e0cba84e3b2cfa0f871e67",  # mev block builder
    "0x3bee5122e2a2fbe11287aafb0cb918e22abb5436",  # mev block builder
    "0xdadb0d80178819f2319190d340ce9a924f783711",  # block builder
    "0xe688b84b23f322a994a53dbf8e15fa82cdb71127",  # block fee recipient
    "0xd11d7d2cb0aff72a61df37fd016ee1bd9f180633",  # mev block builder
    "0x4675c7e5baafbffbca748158becba61ef3b0a263",  # mev block builder
    "0x7adc0e867ebc337e2d20c44db181c067fa08637b",  # block builder
    "0x98ed2d46a27afeead62a5ea39d022a33ea4d25c1",  # ?
    "0x00000000219ab540356cbb839cbe05303d7705fa",  # Beacon chain deposit contract
    "0x0000000000bbf5c5fd284e657f01bd000933c96d",  # Paraswap delta v2
    "0x6a000f20005980200259b80c5102003040001068",  # Paraswap augustus v6.2
    "0x3fc91a3afd70395cd496c647d5a6cc9d4b2b7fad",  # Uniswap universal router
    "0x1231deb6f5749ef6ce6943a275a1d3e7486f4eae",  # li.fi diamond (router)
    "0x74de5d4fcbf63e00296fd95d33236b9794016631",  # metamask swap
    "0x7d0ccaa3fac1e5a943c5168b6ced828691b46b36",  # OKX DEX router
    "0x9008d19f58aabd9ed0d60971565aa8510560ab41",  # Cow protocol settlement
    "0xd37bbe5744d730a1d98d8dc97c42f0ca46ad7146",  # THORCHAIN router
]
# Every Ethereum pool in Kaiko's reference data is excluded as well: a pool is not a receiver of the exploiter's funds
all_pools = get_all_pools()
addresses_to_remove = [*addresses_to_remove, *all_pools]

# Level 1: events of the exploiter itself, from the time of the hack
exploiter_address = "0x47666fab8bd0ac7003bce3f5c3585383f09486e2"
hack_time = '2025-02-21T14:00:00.000Z'
exploiter_events = get_all_events_for_address(exploiter_address, hack_time)

# Level 2: events of the addresses that received funds from the exploiter
receiver_addresses_list_with_time = get_receiver_addresses_filtered_with_time(
    exploiter_events,
    addresses_to_remove,
)
receiver_addresses_2_events = get_all_events_for_addresses_with_time(receiver_addresses_list_with_time)

# Level 3: events of the addresses that received funds from the level 2 addresses
receiver_addresses_3_list_with_time = get_receiver_addresses_filtered_with_time(
    receiver_addresses_2_events,
    addresses_to_remove,
)
receiver_addresses_3_events = get_all_events_for_addresses_with_time(receiver_addresses_3_list_with_time)

# Level 4: events of the addresses that received funds from the level 3 addresses
receiver_addresses_4_list_with_time = get_receiver_addresses_filtered_with_time(
    receiver_addresses_3_events,
    addresses_to_remove,
)
receiver_addresses_4_events = get_all_events_for_addresses_with_time(receiver_addresses_4_list_with_time)
# ----------------------------
# Tag a copy of each level's events with a "level" column.
df1 = exploiter_events.assign(level="Level 1")
df2 = receiver_addresses_2_events.assign(level="Level 2")
df3 = receiver_addresses_3_events.assign(level="Level 3")
df4 = receiver_addresses_4_events.assign(level="Level 4")

# Stack the four levels into one table and save it.
all_transfers = pd.concat([df1, df2, df3, df4], ignore_index=True)
all_transfers.to_csv("all_exploiters_and_receivers_transfers.csv", index=False)

# Total USD amount sent out, per address and level.
list_addresses = (
    all_transfers[all_transfers["direction"] == "out"]
    .groupby(['user_address', 'level'])['amount_usd']
    .sum()
)
list_addresses.to_csv("list_addresses.csv", index=True)
```
User_address
Level
Amount_usd
Last updated
Was this helpful?
