jeirmeister
391a2fa35d
Initial implementation of a tool to analyze Ethereum smart contracts for fraudulent patterns.

Currently supports:
- Contract deployment analysis
- Early actor identification
- Exchange interaction tracking
- Wash trading detection
- Suspicious pattern recognition

Known issues:
- Directory structure needs cleanup
- Some indentation errors to fix
- Missing proper error handling
- Needs better report formatting
429 lines
16 KiB
Python
# blockchain_analyzer.py

import json
import logging
import os
import time
from datetime import datetime
from typing import Dict, List, Optional

import requests
from dotenv import load_dotenv

from report_generator import BlockchainReportGenerator


class BlockchainAnalyzer:
    def __init__(self):
        load_dotenv()
        self.logger = logging.getLogger(__name__)
        self.api_key = os.getenv('ETHERSCAN_API_KEY')
        if not self.api_key:
            raise ValueError("ETHERSCAN_API_KEY not found in environment variables")

        self.base_url = 'https://api.etherscan.io/api'

        # Directory setup
        self.base_dir = os.path.dirname(os.path.abspath(__file__))
        self.data_dir = os.path.join(self.base_dir, 'data')
        self.directories = {
            # 'raw': os.path.join(self.data_dir, 'raw'),
            # 'analysis': os.path.join(self.data_dir, 'analysis'),
            'reports': os.path.join(self.data_dir, 'reports'),
            'input': os.path.join(self.base_dir, 'input')
        }

        # Create directories
        for directory in self.directories.values():
            os.makedirs(directory, exist_ok=True)

        # Load known addresses
        self.known_addresses = self._load_known_addresses()

        # Rate limiting
        self.last_request_time = 0.0
        self.request_interval = 0.2  # 5 requests per second max

    def _load_known_addresses(self) -> Dict:
        """Load known addresses from JSON file, falling back to built-in defaults"""
        try:
            with open(os.path.join(self.directories['input'], 'known_addresses.json')) as f:
                return json.load(f)
        except (FileNotFoundError, json.JSONDecodeError) as e:
            self.logger.warning(f"known_addresses.json not usable ({e}); using defaults")
            return {
                'exchanges': {},
                'protocols': {
                    '0x7a250d5630b4cf539739df2c5dacb4c659f2488d': 'Uniswap_Router',
                    '0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f': 'Uniswap_Factory'
                }
            }

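    # A minimal sketch of the known_addresses.json layout this loader assumes
    # (both maps are address -> label; the protocol addresses below are the
    # real Uniswap V2 router/factory, the exchange entry is a hypothetical
    # placeholder):
    #
    # {
    #     "exchanges": {
    #         "0x...exchange_hot_wallet...": "SomeExchange"
    #     },
    #     "protocols": {
    #         "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": "Uniswap_Router",
    #         "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f": "Uniswap_Factory"
    #     }
    # }
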
    def analyze_contract(self, contract_address: str) -> Optional[Dict]:
        """Main analysis starting from contract address"""
        self.logger.info(f"Starting analysis for contract: {contract_address}")

        # Get contract transactions; bail out if the fetch failed or returned none
        contract_txs = self.fetch_transactions(contract_address)
        if not contract_txs or not contract_txs['result']:
            return None

        # Find deployer
        deployer = self._find_deployer(contract_txs['result'])
        self.logger.info(f"Found deployer: {deployer}")

        # Find key actors
        key_actors = self._find_key_actors(contract_txs['result'], deployer)
        self.logger.info(f"Found key actors: {[a['address'] for a in key_actors]}")

        # Get creation time
        creation_time = int(contract_txs['result'][0]['timeStamp'])

        # Analyze critical period
        critical_period = self._analyze_critical_period(contract_txs['result'], creation_time)

        # Assemble the analysis result
        analysis = {
            'contract_address': contract_address,
            'deployer': deployer,
            'creation_date': datetime.fromtimestamp(creation_time).strftime('%Y-%m-%d %H:%M:%S'),
            'key_actors': key_actors,
            'critical_period': critical_period,
            'exchange_interactions': {},
            'wash_trading': self._analyze_wash_trading(contract_txs['result'])
        }

        # Analyze exchange interactions for the deployer and each key actor
        for actor in [deployer] + [a['address'] for a in key_actors]:
            actor_txs = self.fetch_transactions(actor)
            if actor_txs and actor_txs['result']:
                exchange_interactions = self._analyze_exchange_interactions(actor_txs['result'])
                if exchange_interactions['incoming'] or exchange_interactions['outgoing']:
                    analysis['exchange_interactions'][actor] = exchange_interactions

        return analysis

    def fetch_transactions(self, address: str) -> Optional[Dict]:
        """Fetch all transactions for an address"""
        self._rate_limit()

        params = {
            'module': 'account',
            'action': 'txlist',
            'address': address,
            'startblock': 0,
            'endblock': 99999999,
            'sort': 'asc',
            'apikey': self.api_key
        }

        try:
            response = requests.get(self.base_url, params=params, timeout=30)
            response.raise_for_status()
            data = response.json()

            # Etherscan signals errors (including "No transactions found")
            # with status '0'
            if data.get('status') == '0':
                self.logger.error(f"API Error: {data.get('message')}")
                return None

            return data

        except Exception as e:
            self.logger.error(f"Error fetching transactions for {address}: {e}")
            return None

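    # For reference, each entry in data['result'] from the Etherscan txlist
    # endpoint is a dict of strings; the fields this analyzer relies on look
    # roughly like this (abridged sketch, values are illustrative):
    #
    # {
    #     "timeStamp": "1700000000",       # unix seconds
    #     "hash": "0xabc...",
    #     "from": "0xsender...",
    #     "to": "0xrecipient...",          # empty string for contract creation
    #     "value": "1000000000000000000",  # wei; 1e18 wei == 1 ETH
    #     "input": "0x60806040..."         # calldata / creation bytecode
    # }
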
    def _rate_limit(self):
        """Implement rate limiting"""
        current_time = time.time()
        elapsed = current_time - self.last_request_time
        if elapsed < self.request_interval:
            time.sleep(self.request_interval - elapsed)
        self.last_request_time = time.time()

    def _find_deployer(self, transactions: List[Dict]) -> Optional[str]:
        """Find contract deployer from the creation transaction"""
        creation_tx = next(
            (tx for tx in transactions if tx['input'].startswith('0x60806040')),
            None
        )
        return creation_tx['from'] if creation_tx else None

    def _find_key_actors(self, transactions: List[Dict], deployer: str) -> List[Dict]:
        """Find first 3 unique addresses interacting with contract"""
        key_actors = []
        seen_addresses = {
            addr.lower() for addr in
            [deployer] + list(self.known_addresses.get('protocols', {}).keys())
            if addr
        }

        for tx in transactions:
            if len(key_actors) >= 3:
                break

            for addr in [tx['from'], tx['to']]:
                if addr and addr.lower() not in seen_addresses:
                    key_actors.append({
                        'address': addr,
                        'first_interaction': tx['timeStamp'],
                        'transaction_hash': tx['hash']
                    })
                    seen_addresses.add(addr.lower())

        return key_actors

    def _is_liquidity_provider(self, tx: Dict) -> bool:
        """Check if transaction is adding liquidity"""
        return 'addLiquidityETH' in tx.get('input', '')

    def _is_suspicious_actor(self, tx: Dict) -> bool:
        """Check if transaction contains suspicious patterns"""
        # Note: the wash-trade check below only sees this single transaction,
        # so it has no peers to compare against and currently never matches;
        # callers that hold the full list should use _is_wash_trade directly.
        return (
            'pac0as' in tx.get('input', '') or
            float(tx.get('value', '0')) > 1e18 or  # > 1 ETH (value is in wei)
            self._is_wash_trade(tx, [tx])
        )

    def _analyze_exchange_interactions(self, transactions: List[Dict]) -> Dict:
        """Analyze interactions with known exchanges"""
        interactions = {
            'incoming': [],
            'outgoing': []
        }

        exchange_addrs = {addr.lower(): label
                          for addr, label in self.known_addresses.get('exchanges', {}).items()}

        for tx in transactions:
            from_addr = tx['from'].lower()
            to_addr = (tx['to'] or '').lower()  # 'to' is empty for contract creation

            if from_addr in exchange_addrs:
                interactions['incoming'].append({
                    'exchange': exchange_addrs[from_addr],
                    'transaction': tx
                })

            if to_addr in exchange_addrs:
                interactions['outgoing'].append({
                    'exchange': exchange_addrs[to_addr],
                    'transaction': tx
                })

        return interactions

    def _is_wash_trade(self, tx: Dict, all_txs: List[Dict]) -> bool:
        """
        Detect potential wash trading by looking for:
        1. Back-and-forth transfers between same addresses
        2. Similar amounts in short time windows
        3. Circular trading patterns
        """
        WASH_TIME_WINDOW = 300  # 5 minutes
        AMOUNT_SIMILARITY_THRESHOLD = 0.1  # 10% difference

        tx_timestamp = int(tx['timeStamp'])
        tx_value = float(tx['value']) if tx['value'] != '0' else 0
        from_addr = tx['from'].lower()
        to_addr = (tx['to'] or '').lower()

        # Skip if transaction has no value
        if tx_value == 0:
            return False

        # Look for related transactions in time window
        related_txs = [
            t for t in all_txs
            if abs(int(t['timeStamp']) - tx_timestamp) <= WASH_TIME_WINDOW
            and t['hash'] != tx['hash']
        ]

        for related_tx in related_txs:
            related_value = float(related_tx['value']) if related_tx['value'] != '0' else 0

            # Skip zero-value transactions
            if related_value == 0:
                continue

            # Check for back-and-forth transfers
            if (related_tx['from'].lower() == to_addr and
                    (related_tx['to'] or '').lower() == from_addr):
                return True

            # Check for similar amounts
            value_diff = abs(tx_value - related_value) / max(tx_value, related_value)
            if value_diff <= AMOUNT_SIMILARITY_THRESHOLD:
                # Check for circular pattern
                if (related_tx['from'].lower() == to_addr or
                        (related_tx['to'] or '').lower() == from_addr):
                    return True

        # Check for multiple transfers with a similar amount in the window
        similar_amount_txs = [
            t for t in related_txs
            if abs(float(t['value']) - tx_value) / tx_value <= AMOUNT_SIMILARITY_THRESHOLD
        ]
        if len(similar_amount_txs) >= 3:  # Multiple similar transfers in short time
            return True

        return False

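    # Illustration of the rules above (hypothetical values): a 1.0 ETH transfer
    # A -> B at t=0 is flagged if B -> A sends anything back within 300s
    # (rule 1), if a transfer within 10% of 1.0 ETH touches A or B in that
    # window (rules 2 and 3), or if three or more ~1.0 ETH transfers cluster
    # in the same window regardless of direction.
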
    def _analyze_wash_trading(self, transactions: List[Dict]) -> Dict:
        """Analyze wash trading patterns across all transactions"""
        wash_trading_info = {
            'instances': [],
            'total_volume': 0,
            'addresses_involved': set(),
            'time_periods': []
        }

        current_period = None

        for tx in transactions:
            if self._is_wash_trade(tx, transactions):
                wash_trading_info['instances'].append(tx)
                wash_trading_info['total_volume'] += float(tx['value']) / 1e18  # Convert wei to ETH
                wash_trading_info['addresses_involved'].update([
                    tx['from'].lower(),
                    (tx['to'] or '').lower()
                ])

                # Track continuous wash trading periods
                timestamp = int(tx['timeStamp'])
                if not current_period:
                    current_period = {
                        'start': timestamp,
                        'end': timestamp,
                        'transactions': []
                    }
                elif timestamp - current_period['end'] > 300:  # New period if gap > 5 min
                    wash_trading_info['time_periods'].append(current_period)
                    current_period = {
                        'start': timestamp,
                        'end': timestamp,
                        'transactions': []
                    }
                current_period['end'] = timestamp
                current_period['transactions'].append(tx)

        if current_period:
            wash_trading_info['time_periods'].append(current_period)

        return wash_trading_info

    def _analyze_critical_period(self, transactions: List[Dict], creation_time: int) -> Dict:
        """Analyze first hour after contract deployment"""
        one_hour_later = creation_time + 3600
        critical_events = {
            'setup_phase': {
                'contract_creation': None,
                'trading_enabled': None,
                'ownership_renounced': None,
                'initial_liquidity': []
            },
            'suspicious_patterns': {
                'rapid_transfers': [],
                'large_transfers': [],
                'wash_trading': [],
                'suspicious_functions': []
            },
            'timeline': []
        }

        for tx in transactions:
            # Transactions are sorted ascending, so stop past the first hour
            if int(tx['timeStamp']) > one_hour_later:
                break

            # Track critical events
            if tx['input'].startswith('0x60806040'):
                critical_events['setup_phase']['contract_creation'] = tx
            elif 'enableTrading' in tx['input']:
                critical_events['setup_phase']['trading_enabled'] = tx
            elif 'renounceOwnership' in tx['input']:
                critical_events['setup_phase']['ownership_renounced'] = tx
            elif 'addLiquidityETH' in tx['input']:
                critical_events['setup_phase']['initial_liquidity'].append(tx)

            # Track suspicious patterns
            if float(tx['value']) > 1e18:  # > 1 ETH
                critical_events['suspicious_patterns']['large_transfers'].append(tx)
            if 'pac0as' in tx['input']:
                critical_events['suspicious_patterns']['suspicious_functions'].append(tx)

            # Detect wash trading
            if self._is_wash_trade(tx, transactions):
                critical_events['suspicious_patterns']['wash_trading'].append(tx)

            critical_events['timeline'].append(tx)

        return critical_events

    def _detect_suspicious_patterns(self, transactions: List[Dict]) -> List[Dict]:
        """Detect suspicious transaction patterns"""
        patterns = []

        # Quick setup pattern (trading enabled < 5 minutes after deployment)
        setup_time = self._calculate_time_between_events(
            {'critical_period': {'timeline': transactions}},
            '0x60806040', 'enableTrading'
        )
        if setup_time and setup_time < 300:
            patterns.append({
                'type': 'quick_setup',
                'severity': 'high',
                'description': f'Contract enabled trading within {setup_time/60:.1f} minutes of deployment'
            })

        # Repeated liquidity additions (possible liquidity manipulation)
        liquidity_events = [tx for tx in transactions if 'addLiquidityETH' in tx['input']]
        if len(liquidity_events) > 3:
            patterns.append({
                'type': 'liquidity_manipulation',
                'severity': 'critical',
                'description': f'Multiple liquidity additions ({len(liquidity_events)} events)'
            })

        # Suspicious function calls
        suspicious_calls = [tx for tx in transactions if 'pac0as' in tx['input']]
        if suspicious_calls:
            patterns.append({
                'type': 'suspicious_functions',
                'severity': 'critical',
                'description': f'Suspicious function calls detected ({len(suspicious_calls)} instances)'
            })

        return patterns

    def _calculate_time_between_events(self, data: Dict, event1: str, event2: str) -> Optional[int]:
        """Calculate time between two events in seconds"""
        event1_tx = next((tx for tx in data['critical_period']['timeline']
                          if event1 in tx['input']), None)
        event2_tx = next((tx for tx in data['critical_period']['timeline']
                          if event2 in tx['input']), None)

        if event1_tx and event2_tx:
            return int(event2_tx['timeStamp']) - int(event1_tx['timeStamp'])
        return None

    def _calculate_value_flow(self, transactions: List[Dict]) -> Dict:
        """Calculate value flow between addresses"""
        value_flow = {
            'total_in': 0,
            'total_out': 0,
            'by_address': {}
        }

        for tx in transactions:
            value = float(tx['value']) / 1e18  # Convert wei to ETH
            if value > 0:
                from_addr = tx['from'].lower()
                to_addr = (tx['to'] or '').lower()

                if to_addr not in value_flow['by_address']:
                    value_flow['by_address'][to_addr] = {'in': 0, 'out': 0}
                if from_addr not in value_flow['by_address']:
                    value_flow['by_address'][from_addr] = {'in': 0, 'out': 0}

                value_flow['by_address'][to_addr]['in'] += value
                value_flow['by_address'][from_addr]['out'] += value

                # Keep the aggregate counters in sync with per-address totals
                value_flow['total_in'] += value
                value_flow['total_out'] += value

        return value_flow
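

# A minimal usage sketch, assuming a valid ETHERSCAN_API_KEY in .env and a
# known_addresses.json under ./input; the contract address below is a
# hypothetical placeholder.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    analyzer = BlockchainAnalyzer()
    result = analyzer.analyze_contract('0x0000000000000000000000000000000000000000')
    if result:
        # default=str handles non-JSON types such as the addresses_involved set
        print(json.dumps(result, indent=2, default=str))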