Trace stolen funds across a blockchain

Tracing blockchain transactions and wallets is critical in the aftermath of a hack, enabling investigators to follow stolen funds, uncover illicit activity, and improve transparency. Kaiko Blockchain Monitoring provides direct, UI-free access to on-chain data, ensuring full control over data extraction and analysis. Unlike proprietary interfaces that can be restrictive, this approach allows for flexible, scalable investigations tailored to evolving threats.

This guide demonstrates how to:

  • Trace fund movements across Ethereum wallets

  • Aggregate and analyze on-chain flows

  • Conduct scalable investigations without the constraints of a predefined UI

Request information on a single address or transaction

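To see the balances and transactions for a single Ethereum wallet, send a request like the following, replacing `<client-api-key>` with your API key and `<wallet-address>` with the address you want to inspect. The `user_address` parameter is the same one the Python script later in this guide uses.

```bash
curl --compressed -H "Accept: application/json" -H "X-Api-Key: <client-api-key>" \
  "https://eu.market-api.kaiko.io/v2/data/wallet.v1/audit?blockchain=ethereum&user_address=<wallet-address>"
```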
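If you prefer to assemble the request in Python, the following is a minimal sketch that only builds the URL and does not send anything. It assumes the `blockchain`, `user_address`, and `start_time` query parameters behave as they do in the script later in this guide; `build_audit_url` is a hypothetical helper name, not part of any Kaiko client.

```python
from urllib.parse import urlencode

# Base URL and parameter names are copied from the examples in this guide;
# check them against your own Kaiko region and plan before relying on them.
AUDIT_URL = "https://eu.market-api.kaiko.io/v2/data/wallet.v1/audit"


def build_audit_url(address, blockchain="ethereum", start_time=None):
    """Return the audit URL for one wallet (hypothetical helper)."""
    params = {"blockchain": blockchain, "user_address": address.lower()}
    if start_time is not None:
        # ISO 8601 timestamp; urlencode percent-encodes the colons.
        params["start_time"] = start_time
    return f"{AUDIT_URL}?{urlencode(params)}"


url = build_audit_url("0x47666fab8bd0ac7003bce3f5c3585383f09486e2")
print(url)
```

Pass the resulting URL to any HTTP client together with the `Accept` and `X-Api-Key` headers shown in the curl example.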

Trace funds through several paths

To trace the destination of funds automatically, use a custom Python script that:

  • Queries multiple wallets in a batch request

  • Automatically requests information on subsequent "hops" or flows

  • Consolidates the results into a final list of destination addresses

This script traces fund flows across multiple paths and generates a CSV file of all wallets associated with the hack, up to four hops from the exploiter. You can re-run the script at any time to get an up-to-date picture of the stolen funds and any newly associated wallets. You can also add extra steps to trace across as many hops as required.
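The hop-by-hop idea can be sketched independently of any API as a level-by-level walk over outgoing transfers. The example below is only an illustration under stated assumptions: `trace_levels`, `fetch_outgoing`, and the toy transfer graph are hypothetical names and made-up data, and unlike the full script it skips addresses that were already visited.

```python
def trace_levels(start, fetch_outgoing, max_levels=4, ignore=frozenset()):
    """Walk outgoing transfers level by level.

    fetch_outgoing(address) must return an iterable of receiver addresses.
    Returns {address: level}, where the start address is level 1.
    """
    levels = {start: 1}
    frontier = [start]
    for level in range(2, max_levels + 1):
        next_frontier = []
        for sender in frontier:
            for receiver in fetch_outgoing(sender):
                # Skip filtered addresses (routers, builders) and ones already seen.
                if receiver in ignore or receiver in levels:
                    continue
                levels[receiver] = level
                next_frontier.append(receiver)
        if not next_frontier:
            break  # nothing new to follow
        frontier = next_frontier
    return levels


# Toy transfer graph standing in for API responses (made-up labels).
TOY_TRANSFERS = {
    "exploiter": ["a", "b", "router"],
    "a": ["c"],
    "b": ["c", "exploiter"],
    "c": ["d"],
    "d": ["e"],
}

levels = trace_levels(
    "exploiter",
    lambda addr: TOY_TRANSFERS.get(addr, []),
    max_levels=4,
    ignore={"router"},
)
print(levels)
```

Raising `max_levels` extends the walk to further hops without adding code. The full script that follows applies the same idea against the Kaiko API, one level at a time.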

```python
import pandas as pd
import requests
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
import time

# Headers for the API request
KAIKO_HEADERS_DICT = {
    'Accept': 'application/json',
    'X-Api-Key': 'YOUR_API_KEY'
}

def get_data(url, headers, params=None, retries=10, delay=2):
    """
    Fetches data from the given URL with retry logic.

    Parameters:
    url (str): The URL to fetch data from.
    headers (dict): The headers for the request.
    params (dict): The parameters for the request.
    retries (int): The number of retry attempts.
    delay (int): The delay between retry attempts in seconds.

    Returns:
    dict: The JSON response from the API.
    """
    for attempt in range(retries):
        try:
            # A timeout stops a stalled connection from blocking a worker thread indefinitely
            response = requests.get(url, headers=headers, params=params, timeout=30)
            response.raise_for_status()  # Raise an exception for HTTP errors
            return response.json()
        except requests.exceptions.RequestException:
            if attempt < retries - 1:
                time.sleep(delay)
            else:
                print('max retries exceeded')
                raise


def get_all_events_for_address(address, start_time='2024-01-01T00:00:00.000Z'):
    """
    Fetches all events for a given address starting from a specified time.
    
    Parameters:
    address (str): The user address to fetch events for.
    start_time (str): The start time for fetching events in ISO 8601 format.
    
    Returns:
    pd.DataFrame: A DataFrame containing all events for the given address.
    """
    URL = 'https://us.market-api.kaiko.io/v2/data/wallet.v1/audit'  
    data = pd.DataFrame()
    params_dict = {
        'blockchain': 'ethereum',
        'user_address': address.lower(),
        'start_time': start_time,
        'page_size':400
    }
    res = get_data(URL, KAIKO_HEADERS_DICT, params_dict)
    try:
        data = pd.concat([data, pd.DataFrame(res['data'])], ignore_index=True)
    except Exception as e:
        print(e)
    count = 0
    # Follow pagination links until no further page is returned
    while res.get('next_url'):
        count += 1
        try:
            res = get_data(res['next_url'], KAIKO_HEADERS_DICT)
            data = pd.concat([data, pd.DataFrame(res['data'])], ignore_index=True)
        except KeyboardInterrupt:
            print("Exit")
            break
        except Exception as e:
            # get_data has already retried; stop paging rather than request the same URL forever
            print(e)
            break
        if count > 100:
            # Safety cap on the number of pages fetched per address
            print(address, count)
            break
    return data


def get_all_events_for_addresses(addresses, start_time='2024-01-01T00:00:00.000Z'):
    """
    Fetches all events for multiple addresses starting from a specified time using multithreading.
    
    Parameters:
    addresses (list): A list of user addresses to fetch events for.
    start_time (str): The start time for fetching events in ISO 8601 format.
    
    Returns:
    pd.DataFrame: A DataFrame containing all events for the given addresses.
    """
    with ThreadPoolExecutor(max_workers=20) as executor:
        results = list(tqdm(executor.map(lambda addr: get_all_events_for_address(addr, start_time), addresses), total=len(addresses)))
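    # pd.concat raises ValueError on an empty list, so return an empty DataFrame instead
    return pd.concat(results, ignore_index=True) if results else pd.DataFrame()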


def get_all_events_for_addresses_with_time(addresses_and_time):
    """
    Fetches all events for multiple addresses with different start times using multithreading.
    
    Parameters:
    addresses_and_time (list of tuples): A list of tuples where each tuple contains an address and a start time.
    example:
        addresses_and_time = [["0x47666fab8bd0ac7003bce3f5c3585383f09486e2", '2025-01-01T04:00:00.000Z'],
        ["0xaf620e6d32b1c67f3396ef5d2f7d7642dc2e6ce9", '2021-02-21T01:00:00.000Z']]
    
    Returns:
    pd.DataFrame: A DataFrame containing all events for the given addresses and start times.
    """
    with ThreadPoolExecutor(max_workers=20) as executor:
        results = list(tqdm(executor.map(lambda addr: get_all_events_for_address(addr[0], addr[1]), addresses_and_time), total=len(addresses_and_time)))
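    # pd.concat raises ValueError on an empty list, so return an empty DataFrame instead
    return pd.concat(results, ignore_index=True) if results else pd.DataFrame()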

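def get_all_pools():
    """
    Fetches the addresses of all Ethereum pools in Kaiko's reference data.

    Returns:
    list: A list of pool addresses (empty if the response contains no pool data).
    """
    URL = 'https://reference-data-api.kaiko.io/v1/pools'
    params_dict = {'blockchain': 'ethereum'}
    res = get_data(URL, KAIKO_HEADERS_DICT, params_dict)
    data = pd.DataFrame(res.get('data', []))
    # Avoid a KeyError when the response has no 'address' column
    if 'address' not in data.columns:
        return []
    return data['address'].tolist()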

def get_receiver_addresses_filtered_with_time(events, filter_out):
    """
    Filters and returns a list of receiver addresses with their earliest transaction timestamps,
    excluding specified addresses and keeping only those with a total amount_usd > 10.

    Parameters:
    events (pd.DataFrame): DataFrame containing transaction events.
    filter_out (list or set): Addresses to be excluded from the results.

    Returns:
    list: A list of [receiver_address, timestamp] pairs, where timestamp is the earliest outgoing transfer to that address.
    """
    if events.empty:
        return []
    filter_out = set(filter_out)  # set lookups are much faster than scanning a long list
    outgoing_events = events[events["direction"] == "out"]
    amount_sums = outgoing_events.groupby('receiver_address')['amount_usd'].sum()
    valid_addresses = amount_sums[amount_sums > 10].index
    # .copy() so the timestamp conversion below does not write to a view of `events`
    receiver_addresses_list_with_time = outgoing_events[outgoing_events['receiver_address'].isin(valid_addresses)][["receiver_address", "timestamp"]].copy()
    receiver_addresses_list_with_time['timestamp'] = pd.to_datetime(receiver_addresses_list_with_time['timestamp'], unit='ns').dt.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
    receiver_addresses_list_with_time = receiver_addresses_list_with_time.groupby('receiver_address')['timestamp'].min().reset_index().values.tolist()
    receiver_addresses_list_with_time = [addr for addr in receiver_addresses_list_with_time if addr[0] not in filter_out]
    return receiver_addresses_list_with_time

# ----------------------------

# Addresses to be removed from the results
addresses_to_remove = [
    "0x0000000000000000000000000000000000000000", # null address
    '', # empty address
    "0x47666fab8bd0ac7003bce3f5c3585383f09486e2", # exploiter
    "0xf89d7b9c864f589bbf53a82105107622b35eaa40", # bybit hot wallet
    "0x1f9090aae28b8a3dceadf281b0f12828e676c326", # block builder
    "0x95222290dd7278aa3ddd389cc1e1d165cc4bafe5", # block builder
    "0x4838b106fce9647bdf1e7877bf73ce8b0bad5f97", # block builder
    "0x388c818ca8b9251b393131c08a736a67ccb19297", # Lido execution builder
    "0x7e2a2fa2a064f693f0a55c5639476d913ff12d05", # mev block builder
    "0x6be457e04092b28865e0cba84e3b2cfa0f871e67", # mev block builder
    "0x3bee5122e2a2fbe11287aafb0cb918e22abb5436", # mev block builder
    "0xdadb0d80178819f2319190d340ce9a924f783711", # block builder
    "0xe688b84b23f322a994a53dbf8e15fa82cdb71127", # block fee recipient
    "0xd11d7d2cb0aff72a61df37fd016ee1bd9f180633", # mev block builder
    "0x4675c7e5baafbffbca748158becba61ef3b0a263", # mev block builder
    "0x7adc0e867ebc337e2d20c44db181c067fa08637b", # block builder
    "0x98ed2d46a27afeead62a5ea39d022a33ea4d25c1", # ?
    "0x00000000219ab540356cbb839cbe05303d7705fa", # Beacon chain deposit contract
    "0x0000000000bbf5c5fd284e657f01bd000933c96d", # Paraswap delta v2
    "0x6a000f20005980200259b80c5102003040001068", # Paraswap augustus v6.2
    "0x3fc91a3afd70395cd496c647d5a6cc9d4b2b7fad", # Uniswap universal router
    "0x1231deb6f5749ef6ce6943a275a1d3e7486f4eae", # li.fi diamond (router)
    "0x74de5d4fcbf63e00296fd95d33236b9794016631", # metamask swap
    "0x7d0ccaa3fac1e5a943c5168b6ced828691b46b36", # OKX DEX router
    "0x9008d19f58aabd9ed0d60971565aa8510560ab41", # Cow protocol settlement
    "0xd37bbe5744d730a1d98d8dc97c42f0ca46ad7146", # THORCHAIN router
]

# Get all pools on Ethereum in Kaiko's reference data (to be filtered out because they're not receivers from the exploiter)
all_pools = get_all_pools()
addresses_to_remove = addresses_to_remove + all_pools

# Level 1: Get all events from the exploiter
exploiter_address = "0x47666fab8bd0ac7003bce3f5c3585383f09486e2"
hack_time = '2025-02-21T14:00:00.000Z'
exploiter_events = get_all_events_for_address(exploiter_address, start_time=hack_time)

# Level 2: Get all events for all addresses that received funds from the exploiter
receiver_addresses_list_with_time = get_receiver_addresses_filtered_with_time(exploiter_events, addresses_to_remove)
receiver_addresses_2_events = get_all_events_for_addresses_with_time(receiver_addresses_list_with_time)

# Level 3: Get all events for all addresses that received funds from addresses that received funds from the exploiter
receiver_addresses_3_list_with_time = get_receiver_addresses_filtered_with_time(receiver_addresses_2_events, addresses_to_remove)
receiver_addresses_3_events = get_all_events_for_addresses_with_time(receiver_addresses_3_list_with_time)

# Level 4: Get all events for all addresses that received funds from addresses that received funds from the addresses that received funds from the exploiter
receiver_addresses_4_list_with_time = get_receiver_addresses_filtered_with_time(receiver_addresses_3_events, addresses_to_remove)
receiver_addresses_4_events = get_all_events_for_addresses_with_time(receiver_addresses_4_list_with_time)

# ----------------------------

# Combine the events from every level into one DataFrame, labelling each row with its level.
df1 = exploiter_events.copy()
df2 = receiver_addresses_2_events.copy()
df3 = receiver_addresses_3_events.copy()
df4 = receiver_addresses_4_events.copy()

# Add the 'level' column
df1["level"] = "Level 1"
df2["level"] = "Level 2"
df3["level"] = "Level 3"
df4["level"] = "Level 4"

# Concatenate the DataFrames
all_transfers = pd.concat([df1, df2, df3, df4], ignore_index=True)
all_transfers.to_csv("all_exploiters_and_receivers_transfers.csv", index=False)

list_addresses = all_transfers[all_transfers["direction"] == "out"].groupby(['user_address', 'level'])['amount_usd'].sum()
list_addresses.to_csv("list_addresses.csv", index=True)
```
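Once the transfers are collected, one way to consolidate them into destination addresses is to keep the receivers that never appear as a sender in the traced data. The sketch below works on plain dictionaries and assumes the column names used in the script above (`user_address`, `receiver_address`, `direction`, `amount_usd`); `frontier_destinations` and the toy rows are illustrative only. An address counts as a destination here simply because no outgoing transfer from it was captured within the traced levels.

```python
from collections import defaultdict


def frontier_destinations(rows):
    """Sum the USD received by addresses that never appear as a sender.

    rows: iterable of dicts with the keys user_address, receiver_address,
    direction and amount_usd (the columns assumed from the script above).
    Returns a list of (address, total_usd) pairs, largest total first.
    """
    outgoing = [r for r in rows if r["direction"] == "out"]
    senders = {r["user_address"] for r in outgoing}
    totals = defaultdict(float)
    for r in outgoing:
        receiver = r["receiver_address"]
        if receiver not in senders:
            # Treat a missing or empty amount as zero.
            totals[receiver] += float(r["amount_usd"] or 0)
    return sorted(totals.items(), key=lambda kv: kv[1], reverse=True)


# Toy rows with made-up labels instead of real addresses.
rows = [
    {"user_address": "exploiter", "receiver_address": "a", "direction": "out", "amount_usd": 1000},
    {"user_address": "a", "receiver_address": "b", "direction": "out", "amount_usd": 600},
    {"user_address": "a", "receiver_address": "c", "direction": "out", "amount_usd": 400},
    {"user_address": "b", "receiver_address": "c", "direction": "out", "amount_usd": 250},
    {"user_address": "b", "receiver_address": "d", "direction": "out", "amount_usd": 100},
    {"user_address": "b", "receiver_address": "a", "direction": "in", "amount_usd": 600},
]

destinations = frontier_destinations(rows)
print(destinations)
```

To run it on real output, the rows can be read from `all_exploiters_and_receivers_transfers.csv` with `csv.DictReader`, which yields dictionaries keyed by the same column names.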
