blockchain-risk-analyzer/blockchain_analyzer.py
jeirmeister 391a2fa35d Initial commit: Blockchain fraud detection analyzer
Initial implementation of a tool to analyze Ethereum smart contracts for fraudulent patterns.
Currently supports:
- Contract deployment analysis
- Early actor identification
- Exchange interaction tracking
- Wash trading detection
- Suspicious pattern recognition

Known issues:
- Directory structure needs cleanup
- Some indentation errors to fix
- Missing proper error handling
- Needs better report formatting
2024-11-10 16:10:27 -08:00

429 lines
16 KiB
Python

# blockchain_analyzer.py
from dotenv import load_dotenv
import os
import json
import requests
import logging
from datetime import datetime
from typing import Dict, List, Optional
from time import sleep
from report_generator import BlockchainReportGenerator
import time
class BlockchainAnalyzer:
def __init__(self):
    """Prepare credentials, working directories, the address book and throttling state.

    Reads ``ETHERSCAN_API_KEY`` from the environment (after loading a .env
    file), creates the report/input directories next to this module, and
    loads the known-address book.

    Raises:
        ValueError: if ``ETHERSCAN_API_KEY`` is not set.
    """
    load_dotenv()
    self.logger = logging.getLogger(__name__)

    self.api_key = os.getenv('ETHERSCAN_API_KEY')
    if not self.api_key:
        raise ValueError("ETHERSCAN_API_KEY not found in environment variables")
    self.base_url = 'https://api.etherscan.io/api'

    # All working directories live next to this source file.
    module_path = os.path.abspath(__file__)
    self.base_dir = os.path.dirname(module_path)
    self.data_dir = os.path.join(self.base_dir, 'data')

    # The 'raw' and 'analysis' sub-directories of data/ are currently disabled.
    reports_dir = os.path.join(self.data_dir, 'reports')
    input_dir = os.path.join(self.base_dir, 'input')
    self.directories = {'reports': reports_dir, 'input': input_dir}
    for path in self.directories.values():
        os.makedirs(path, exist_ok=True)

    # The address book is read from the input directory created above.
    self.known_addresses = self._load_known_addresses()

    # Throttle state: 0.2 s between calls, i.e. at most 5 requests per second.
    self.last_request_time = 0
    self.request_interval = 0.2
def _load_known_addresses(self) -> Dict:
"""Load known addresses from JSON file"""
try:
with open(os.path.join(self.directories['input'], 'known_addresses.json')) as f:
return json.load(f)
except FileNotFoundError:
self.logger.warning("known_addresses.json not found")
return {
'exchanges': {},
'protocols': {
'0x7a250d5630b4cf539739df2c5dacb4c659f2488d': 'Uniswap_Router',
'0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f': 'Uniswap_Factory'
}
}
def analyze_contract(self, contract_address: str) -> Dict:
"""Main analysis starting from contract address"""
self.logger.info(f"Starting analysis for contract: {contract_address}")
# Get contract transactions
contract_txs = self.fetch_transactions(contract_address)
if not contract_txs:
return None
# Find deployer
deployer = self._find_deployer(contract_txs['result'])
self.logger.info(f"Found deployer: {deployer}")
# Find key actors
key_actors = self._find_key_actors(contract_txs['result'], deployer)
self.logger.info(f"Found key actors: {[a['address'] for a in key_actors]}")
# Get creation time
creation_time = int(contract_txs['result'][0]['timeStamp'])
# Analyze critical period
critical_period = self._analyze_critical_period(contract_txs['result'], creation_time)
# Analyze each key actor
analysis = {
'contract_address': contract_address,
'deployer': deployer,
'creation_date': datetime.fromtimestamp(creation_time).strftime('%Y-%m-%d %H:%M:%S'),
'key_actors': key_actors,
'critical_period': critical_period,
'exchange_interactions': {},
'wash_trading': self._analyze_wash_trading(contract_txs['result'])
}
# Analyze exchange interactions for each actor
for actor in [deployer] + [a['address'] for a in key_actors]:
actor_txs = self.fetch_transactions(actor)
if actor_txs and actor_txs['result']:
exchange_interactions = self._analyze_exchange_interactions(actor_txs['result'])
if exchange_interactions['incoming'] or exchange_interactions['outgoing']:
analysis['exchange_interactions'][actor] = exchange_interactions
return analysis
def fetch_transactions(self, address: str, timeout: float = 30.0) -> Optional[Dict]:
    """Fetch the normal-transaction list of an address from the Etherscan API.

    A single ``txlist`` request is made, sorted ascending by block.
    NOTE(review): no pagination is performed; confirm whether the API's
    per-request result cap matters for busy addresses.

    Args:
        address: Address whose transactions are requested.
        timeout: Seconds to wait for the HTTP request before giving up.
            Without a timeout ``requests`` may block indefinitely.

    Returns:
        The decoded JSON response, or ``None`` if the request failed, the
        body was not a JSON object, or the API reported ``status == '0'``.
    """
    self._rate_limit()
    params = {
        'module': 'account',
        'action': 'txlist',
        'address': address,
        'startblock': 0,
        'endblock': 99999999,
        'sort': 'asc',
        'apikey': self.api_key
    }
    try:
        response = requests.get(self.base_url, params=params, timeout=timeout)
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # RequestException: connection/HTTP/timeout errors.
        # ValueError: body was not valid JSON.
        self.logger.error(f"Error fetching transactions for {address}: {str(e)}")
        return None

    if not isinstance(data, dict):
        self.logger.error(
            f"Error fetching transactions for {address}: unexpected response format"
        )
        return None
    if data.get('status') == '0':
        self.logger.error(f"API Error: {data.get('message')}")
        return None
    return data
def _rate_limit(self):
"""Implement rate limiting"""
current_time = time.time()
elapsed = current_time - self.last_request_time
if elapsed < self.request_interval:
sleep(self.request_interval - elapsed)
self.last_request_time = time.time()
def _find_deployer(self, transactions: List[Dict]) -> str:
"""Find contract deployer from first transaction"""
creation_tx = next(
(tx for tx in transactions if tx['input'].startswith('0x60806040')),
None
)
return creation_tx['from'] if creation_tx else None
def _find_key_actors(self, transactions: List[Dict], deployer: str) -> List[Dict]:
"""Find first 3 unique addresses interacting with contract"""
key_actors = []
seen_addresses = {
addr.lower() for addr in
[deployer] + list(self.known_addresses['protocols'].keys())
}
for tx in transactions:
if len(key_actors) >= 3:
break
for addr in [tx['from'], tx['to']]:
if (addr and
addr.lower() not in seen_addresses):
key_actors.append({
'address': addr,
'first_interaction': tx['timeStamp'],
'transaction_hash': tx['hash']
})
seen_addresses.add(addr.lower())
return key_actors
def _is_liquidity_provider(self, tx: Dict) -> bool:
"""Check if transaction is adding liquidity"""
return 'addLiquidityETH' in tx.get('input', '')
def _is_suspicious_actor(self, tx: Dict) -> bool:
"""Check if transaction contains suspicious patterns"""
return (
'pac0as' in tx.get('input', '') or
float(tx.get('value', '0')) > 1e18 or # > 1 ETH
self._is_wash_trade(tx, [tx])
)
def _analyze_exchange_interactions(self, transactions: List[Dict]) -> Dict:
"""Analyze interactions with known exchanges"""
interactions = {
'incoming': [],
'outgoing': []
}
exchange_addrs = {addr.lower(): label
for addr, label in self.known_addresses['exchanges'].items()}
for tx in transactions:
from_addr = tx['from'].lower()
to_addr = tx['to'].lower()
if from_addr in exchange_addrs:
interactions['incoming'].append({
'exchange': exchange_addrs[from_addr],
'transaction': tx
})
if to_addr in exchange_addrs:
interactions['outgoing'].append({
'exchange': exchange_addrs[to_addr],
'transaction': tx
})
return interactions
def _is_wash_trade(self, tx: Dict, all_txs: List[Dict]) -> bool:
"""
Detect potential wash trading by looking for:
1. Back-and-forth transfers between same addresses
2. Similar amounts in short time windows
3. Circular trading patterns
"""
WASH_TIME_WINDOW = 300 # 5 minutes
AMOUNT_SIMILARITY_THRESHOLD = 0.1 # 10% difference
tx_timestamp = int(tx['timeStamp'])
tx_value = float(tx['value']) if tx['value'] != '0' else 0
from_addr = tx['from'].lower()
to_addr = tx['to'].lower()
# Skip if transaction has no value
if tx_value == 0:
return False
# Look for related transactions in time window
related_txs = [
t for t in all_txs
if abs(int(t['timeStamp']) - tx_timestamp) <= WASH_TIME_WINDOW
and t['hash'] != tx['hash']
]
for related_tx in related_txs:
related_value = float(related_tx['value']) if related_tx['value'] != '0' else 0
# Skip zero-value transactions
if related_value == 0:
continue
# Check for back-and-forth transfers
if (related_tx['from'].lower() == to_addr and
related_tx['to'].lower() == from_addr):
return True
# Check for similar amounts
value_diff = abs(tx_value - related_value) / max(tx_value, related_value)
if value_diff <= AMOUNT_SIMILARITY_THRESHOLD:
# Check for circular pattern
if (related_tx['from'].lower() == to_addr or
related_tx['to'].lower() == from_addr):
return True
# Check for multiple transfers with same amount
similar_amount_txs = [
t for t in related_txs
if abs(float(t['value']) - tx_value) / tx_value <= AMOUNT_SIMILARITY_THRESHOLD
]
if len(similar_amount_txs) >= 3: # Multiple similar transfers in short time
return True
return False
def _analyze_wash_trading(self, transactions: List[Dict]) -> Dict:
"""Analyze wash trading patterns across all transactions"""
wash_trading_info = {
'instances': [],
'total_volume': 0,
'addresses_involved': set(),
'time_periods': []
}
current_period = None
for tx in transactions:
if self._is_wash_trade(tx, transactions):
wash_trading_info['instances'].append(tx)
wash_trading_info['total_volume'] += float(tx['value']) / 1e18 # Convert to ETH
wash_trading_info['addresses_involved'].update([
tx['from'].lower(),
tx['to'].lower()
])
# Track continuous wash trading periods
timestamp = int(tx['timeStamp'])
if not current_period:
current_period = {
'start': timestamp,
'end': timestamp,
'transactions': []
}
elif timestamp - current_period['end'] > 300: # New period if gap > 5 min
wash_trading_info['time_periods'].append(current_period)
current_period = {
'start': timestamp,
'end': timestamp,
'transactions': []
}
current_period['end'] = timestamp
current_period['transactions'].append(tx)
if current_period:
wash_trading_info['time_periods'].append(current_period)
return wash_trading_info
def _analyze_critical_period(self, transactions: List[Dict], creation_time: int) -> Dict:
"""Analyze first hour after contract deployment"""
one_hour_later = creation_time + 3600
critical_events = {
'setup_phase': {
'contract_creation': None,
'trading_enabled': None,
'ownership_renounced': None,
'initial_liquidity': []
},
'suspicious_patterns': {
'rapid_transfers': [],
'large_transfers': [],
'wash_trading': [],
'suspicious_functions': []
},
'timeline': []
}
for tx in transactions:
if int(tx['timeStamp']) > one_hour_later:
break
# Track critical events
if tx['input'].startswith('0x60806040'):
critical_events['setup_phase']['contract_creation'] = tx
elif 'enableTrading' in tx['input']:
critical_events['setup_phase']['trading_enabled'] = tx
elif 'renounceOwnership' in tx['input']:
critical_events['setup_phase']['ownership_renounced'] = tx
elif 'addLiquidityETH' in tx['input']:
critical_events['setup_phase']['initial_liquidity'].append(tx)
# Track suspicious patterns
if float(tx['value']) > 1e18: # > 1 ETH
critical_events['suspicious_patterns']['large_transfers'].append(tx)
if 'pac0as' in tx['input']:
critical_events['suspicious_patterns']['suspicious_functions'].append(tx)
# Detect wash trading
if self._is_wash_trade(tx, transactions):
critical_events['suspicious_patterns']['wash_trading'].append(tx)
critical_events['timeline'].append(tx)
return critical_events
def _detect_suspicious_patterns(self, transactions: List[Dict]) -> List[Dict]:
"""Detect suspicious transaction patterns"""
patterns = []
# Quick setup pattern (< 5 minutes)
setup_time = self._calculate_time_between_events(
{'critical_period': {'timeline': transactions}},
'0x60806040', 'enableTrading'
)
if setup_time and setup_time < 300:
patterns.append({
'type': 'quick_setup',
'severity': 'high',
'description': f'Contract enabled trading within {setup_time/60:.1f} minutes of deployment'
})
# Large initial liquidity followed by removal
liquidity_events = [tx for tx in transactions if 'addLiquidityETH' in tx['input']]
if len(liquidity_events) > 3:
patterns.append({
'type': 'liquidity_manipulation',
'severity': 'critical',
'description': f'Multiple liquidity additions ({len(liquidity_events)} events)'
})
# Suspicious function calls
suspicious_calls = [tx for tx in transactions if 'pac0as' in tx['input']]
if suspicious_calls:
patterns.append({
'type': 'suspicious_functions',
'severity': 'critical',
'description': f'Suspicious function calls detected ({len(suspicious_calls)} instances)'
})
return patterns
def _calculate_time_between_events(self, data: Dict, event1: str, event2: str) -> Optional[int]:
"""Calculate time between two events in seconds"""
event1_tx = next((tx for tx in data['critical_period']['timeline']
if event1 in tx['input']), None)
event2_tx = next((tx for tx in data['critical_period']['timeline']
if event2 in tx['input']), None)
if event1_tx and event2_tx:
return int(event2_tx['timeStamp']) - int(event1_tx['timeStamp'])
return None
def _calculate_value_flow(self, transactions: List[Dict]) -> Dict:
"""Calculate value flow between addresses"""
value_flow = {
'total_in': 0,
'total_out': 0,
'by_address': {}
}
for tx in transactions:
value = float(tx['value']) / 1e18 # Convert to ETH
if value > 0:
from_addr = tx['from'].lower()
to_addr = tx['to'].lower()
if to_addr not in value_flow['by_address']:
value_flow['by_address'][to_addr] = {'in': 0, 'out': 0}
if from_addr not in value_flow['by_address']:
value_flow['by_address'][from_addr] = {'in': 0, 'out': 0}
value_flow['by_address'][to_addr]['in'] += value
value_flow['by_address'][from_addr]['out'] += value
return value_flow