# blockchain_analyzer.py
from dotenv import load_dotenv
import os
import json
import requests
import logging
from datetime import datetime
from typing import Dict, List, Optional
from time import sleep
from report_generator import BlockchainReportGenerator
import time


class BlockchainAnalyzer:
    def __init__(self):
        load_dotenv()
        self.logger = logging.getLogger(__name__)
        self.api_key = os.getenv('ETHERSCAN_API_KEY')
        if not self.api_key:
            raise ValueError("ETHERSCAN_API_KEY not found in environment variables")

        self.base_url = 'https://api.etherscan.io/api'

        # Directory setup
        self.base_dir = os.path.dirname(os.path.abspath(__file__))
        self.data_dir = os.path.join(self.base_dir, 'data')
        self.directories = {
            # 'raw': os.path.join(self.data_dir, 'raw'),
            # 'analysis': os.path.join(self.data_dir, 'analysis'),
            'reports': os.path.join(self.data_dir, 'reports'),
            'input': os.path.join(self.base_dir, 'input')
        }

        # Create directories
        for directory in self.directories.values():
            os.makedirs(directory, exist_ok=True)

        # Load known addresses
        self.known_addresses = self._load_known_addresses()

        # Rate limiting
        self.last_request_time = 0
        self.request_interval = 0.2  # 5 requests per second max

    def _load_known_addresses(self) -> Dict:
        """Load known addresses from JSON file"""
        try:
            with open(os.path.join(self.directories['input'], 'known_addresses.json')) as f:
                return json.load(f)
        except FileNotFoundError:
            self.logger.warning("known_addresses.json not found")
            return {
                'exchanges': {},
                'protocols': {
                    '0x7a250d5630b4cf539739df2c5dacb4c659f2488d': 'Uniswap_Router',
                    '0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f': 'Uniswap_Factory'
                }
            }

    def analyze_contract(self, contract_address: str) -> Dict:
        """Main analysis starting from contract address"""
        self.logger.info(f"Starting analysis for contract: {contract_address}")

        # Get contract transactions
        contract_txs = self.fetch_transactions(contract_address)
        if not contract_txs:
            return None

        # Find deployer
        deployer = self._find_deployer(contract_txs['result'])
        self.logger.info(f"Found deployer: {deployer}")

        # Find key actors
        key_actors = self._find_key_actors(contract_txs['result'], deployer)
        self.logger.info(f"Found key actors: {[a['address'] for a in key_actors]}")

        # Get creation time
        creation_time = int(contract_txs['result'][0]['timeStamp'])

        # Analyze critical period
        critical_period = self._analyze_critical_period(contract_txs['result'], creation_time)

        # Analyze each key actor
        analysis = {
            'contract_address': contract_address,
            'deployer': deployer,
            'creation_date': datetime.fromtimestamp(creation_time).strftime('%Y-%m-%d %H:%M:%S'),
            'key_actors': key_actors,
            'critical_period': critical_period,
            'exchange_interactions': {},
            'wash_trading': self._analyze_wash_trading(contract_txs['result'])
        }

        # Analyze exchange interactions for each actor
        for actor in [deployer] + [a['address'] for a in key_actors]:
            actor_txs = self.fetch_transactions(actor)
            if actor_txs and actor_txs['result']:
                exchange_interactions = self._analyze_exchange_interactions(actor_txs['result'])
                if exchange_interactions['incoming'] or exchange_interactions['outgoing']:
                    analysis['exchange_interactions'][actor] = exchange_interactions

        return analysis
    def fetch_transactions(self, address: str) -> Dict:
        """Fetch all transactions for an address"""
        self._rate_limit()

        params = {
            'module': 'account',
            'action': 'txlist',
            'address': address,
            'startblock': 0,
            'endblock': 99999999,
            'sort': 'asc',
            'apikey': self.api_key
        }

        try:
            response = requests.get(self.base_url, params=params)
            response.raise_for_status()
            data = response.json()

            if data.get('status') == '0':
                self.logger.error(f"API Error: {data.get('message')}")
                return None

            return data
        except Exception as e:
            self.logger.error(f"Error fetching transactions for {address}: {str(e)}")
            return None

    def _rate_limit(self):
        """Implement rate limiting"""
        current_time = time.time()
        elapsed = current_time - self.last_request_time
        if elapsed < self.request_interval:
            sleep(self.request_interval - elapsed)
        self.last_request_time = time.time()

    def _find_deployer(self, transactions: List[Dict]) -> str:
        """Find contract deployer from first transaction"""
        creation_tx = next(
            (tx for tx in transactions if tx['input'].startswith('0x60806040')),
            None
        )
        return creation_tx['from'] if creation_tx else None

    def _find_key_actors(self, transactions: List[Dict], deployer: str) -> List[Dict]:
        """Find first 3 unique addresses interacting with contract"""
        key_actors = []
        # Guard against a missing deployer so the set comprehension never calls .lower() on None
        seen_addresses = {
            addr.lower() for addr in
            ([deployer] if deployer else []) + list(self.known_addresses['protocols'].keys())
        }

        for tx in transactions:
            if len(key_actors) >= 3:
                break

            for addr in [tx['from'], tx['to']]:
                if addr and addr.lower() not in seen_addresses:
                    key_actors.append({
                        'address': addr,
                        'first_interaction': tx['timeStamp'],
                        'transaction_hash': tx['hash']
                    })
                    seen_addresses.add(addr.lower())

        return key_actors

    def _is_liquidity_provider(self, tx: Dict) -> bool:
        """Check if transaction is adding liquidity"""
        return 'addLiquidityETH' in tx.get('input', '')

    def _is_suspicious_actor(self, tx: Dict) -> bool:
        """Check if transaction contains suspicious patterns"""
        return (
            'pac0as' in tx.get('input', '') or
            float(tx.get('value', '0')) > 1e18 or  # > 1 ETH
            self._is_wash_trade(tx, [tx])
        )

    def _analyze_exchange_interactions(self, transactions: List[Dict]) -> Dict:
        """Analyze interactions with known exchanges"""
        interactions = {
            'incoming': [],
            'outgoing': []
        }

        exchange_addrs = {addr.lower(): label
                          for addr, label in self.known_addresses['exchanges'].items()}

        for tx in transactions:
            from_addr = tx['from'].lower()
            to_addr = tx['to'].lower()

            if from_addr in exchange_addrs:
                interactions['incoming'].append({
                    'exchange': exchange_addrs[from_addr],
                    'transaction': tx
                })
            if to_addr in exchange_addrs:
                interactions['outgoing'].append({
                    'exchange': exchange_addrs[to_addr],
                    'transaction': tx
                })

        return interactions
    def _is_wash_trade(self, tx: Dict, all_txs: List[Dict]) -> bool:
        """
        Detect potential wash trading by looking for:
        1. Back-and-forth transfers between same addresses
        2. Similar amounts in short time windows
        3. Circular trading patterns
        """
        WASH_TIME_WINDOW = 300  # 5 minutes
        AMOUNT_SIMILARITY_THRESHOLD = 0.1  # 10% difference

        tx_timestamp = int(tx['timeStamp'])
        tx_value = float(tx['value']) if tx['value'] != '0' else 0
        from_addr = tx['from'].lower()
        to_addr = tx['to'].lower()

        # Skip if transaction has no value
        if tx_value == 0:
            return False

        # Look for related transactions in time window
        related_txs = [
            t for t in all_txs
            if abs(int(t['timeStamp']) - tx_timestamp) <= WASH_TIME_WINDOW
            and t['hash'] != tx['hash']
        ]

        for related_tx in related_txs:
            related_value = float(related_tx['value']) if related_tx['value'] != '0' else 0

            # Skip zero-value transactions
            if related_value == 0:
                continue

            # Check for back-and-forth transfers
            if (related_tx['from'].lower() == to_addr and
                    related_tx['to'].lower() == from_addr):
                return True

            # Check for similar amounts
            value_diff = abs(tx_value - related_value) / max(tx_value, related_value)
            if value_diff <= AMOUNT_SIMILARITY_THRESHOLD:
                # Check for circular pattern
                if (related_tx['from'].lower() == to_addr or
                        related_tx['to'].lower() == from_addr):
                    return True

        # Check for multiple transfers with same amount
        similar_amount_txs = [
            t for t in related_txs
            if abs(float(t['value']) - tx_value) / tx_value <= AMOUNT_SIMILARITY_THRESHOLD
        ]
        if len(similar_amount_txs) >= 3:  # Multiple similar transfers in short time
            return True

        return False

    def _analyze_wash_trading(self, transactions: List[Dict]) -> Dict:
        """Analyze wash trading patterns across all transactions"""
        wash_trading_info = {
            'instances': [],
            'total_volume': 0,
            'addresses_involved': set(),
            'time_periods': []
        }

        current_period = None

        for tx in transactions:
            if self._is_wash_trade(tx, transactions):
                wash_trading_info['instances'].append(tx)
                wash_trading_info['total_volume'] += float(tx['value']) / 1e18  # Convert to ETH
                wash_trading_info['addresses_involved'].update([
                    tx['from'].lower(),
                    tx['to'].lower()
                ])

                # Track continuous wash trading periods
                timestamp = int(tx['timeStamp'])
                if not current_period:
                    current_period = {
                        'start': timestamp,
                        'end': timestamp,
                        'transactions': []
                    }
                elif timestamp - current_period['end'] > 300:  # New period if gap > 5 min
                    wash_trading_info['time_periods'].append(current_period)
                    current_period = {
                        'start': timestamp,
                        'end': timestamp,
                        'transactions': []
                    }

                current_period['end'] = timestamp
                current_period['transactions'].append(tx)

        if current_period:
            wash_trading_info['time_periods'].append(current_period)

        return wash_trading_info

    def _analyze_critical_period(self, transactions: List[Dict], creation_time: int) -> Dict:
        """Analyze first hour after contract deployment"""
        one_hour_later = creation_time + 3600

        critical_events = {
            'setup_phase': {
                'contract_creation': None,
                'trading_enabled': None,
                'ownership_renounced': None,
                'initial_liquidity': []
            },
            'suspicious_patterns': {
                'rapid_transfers': [],
                'large_transfers': [],
                'wash_trading': [],
                'suspicious_functions': []
            },
            'timeline': []
        }

        for tx in transactions:
            if int(tx['timeStamp']) > one_hour_later:
                break

            # Track critical events
            if tx['input'].startswith('0x60806040'):
                critical_events['setup_phase']['contract_creation'] = tx
            elif 'enableTrading' in tx['input']:
                critical_events['setup_phase']['trading_enabled'] = tx
            elif 'renounceOwnership' in tx['input']:
                critical_events['setup_phase']['ownership_renounced'] = tx
            elif 'addLiquidityETH' in tx['input']:
                critical_events['setup_phase']['initial_liquidity'].append(tx)

            # Track suspicious patterns
            if float(tx['value']) > 1e18:  # > 1 ETH
                critical_events['suspicious_patterns']['large_transfers'].append(tx)
            if 'pac0as' in tx['input']:
                critical_events['suspicious_patterns']['suspicious_functions'].append(tx)

            # Detect wash trading
            if self._is_wash_trade(tx, transactions):
                critical_events['suspicious_patterns']['wash_trading'].append(tx)

            critical_events['timeline'].append(tx)

        return critical_events
    def _detect_suspicious_patterns(self, transactions: List[Dict]) -> List[Dict]:
        """Detect suspicious transaction patterns"""
        patterns = []

        # Quick setup pattern (< 5 minutes)
        setup_time = self._calculate_time_between_events(
            {'critical_period': {'timeline': transactions}},
            '0x60806040',
            'enableTrading'
        )
        if setup_time and setup_time < 300:
            patterns.append({
                'type': 'quick_setup',
                'severity': 'high',
                'description': f'Contract enabled trading within {setup_time/60:.1f} minutes of deployment'
            })

        # Repeated liquidity additions (possible manipulation)
        liquidity_events = [tx for tx in transactions if 'addLiquidityETH' in tx['input']]
        if len(liquidity_events) > 3:
            patterns.append({
                'type': 'liquidity_manipulation',
                'severity': 'critical',
                'description': f'Multiple liquidity additions ({len(liquidity_events)} events)'
            })

        # Suspicious function calls
        suspicious_calls = [tx for tx in transactions if 'pac0as' in tx['input']]
        if suspicious_calls:
            patterns.append({
                'type': 'suspicious_functions',
                'severity': 'critical',
                'description': f'Suspicious function calls detected ({len(suspicious_calls)} instances)'
            })

        return patterns

    def _calculate_time_between_events(self, data: Dict, event1: str, event2: str) -> Optional[int]:
        """Calculate time between two events in seconds"""
        event1_tx = next((tx for tx in data['critical_period']['timeline']
                          if event1 in tx['input']), None)
        event2_tx = next((tx for tx in data['critical_period']['timeline']
                          if event2 in tx['input']), None)

        if event1_tx and event2_tx:
            return int(event2_tx['timeStamp']) - int(event1_tx['timeStamp'])
        return None

    def _calculate_value_flow(self, transactions: List[Dict]) -> Dict:
        """Calculate value flow between addresses"""
        value_flow = {
            'total_in': 0,
            'total_out': 0,
            'by_address': {}
        }

        for tx in transactions:
            value = float(tx['value']) / 1e18  # Convert to ETH
            if value > 0:
                from_addr = tx['from'].lower()
                to_addr = tx['to'].lower()

                if to_addr not in value_flow['by_address']:
                    value_flow['by_address'][to_addr] = {'in': 0, 'out': 0}
                if from_addr not in value_flow['by_address']:
                    value_flow['by_address'][from_addr] = {'in': 0, 'out': 0}

                value_flow['by_address'][to_addr]['in'] += value
                value_flow['by_address'][from_addr]['out'] += value

        return value_flow
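

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module logic): shows one way
# the analyzer could be driven from the command line. The output path and the
# choice to dump raw JSON into the 'reports' directory are assumptions; in
# practice the analysis dict would likely be handed to
# BlockchainReportGenerator, whose interface is defined elsewhere.
if __name__ == '__main__':
    import argparse

    logging.basicConfig(level=logging.INFO)

    parser = argparse.ArgumentParser(description='Analyze an ERC-20 contract via Etherscan')
    parser.add_argument('contract_address', help='Contract address to analyze (0x...)')
    args = parser.parse_args()

    analyzer = BlockchainAnalyzer()
    analysis = analyzer.analyze_contract(args.contract_address)

    if analysis:
        # Sets (e.g. wash_trading['addresses_involved']) are not JSON-serializable,
        # so convert them to lists via default=list.
        out_path = os.path.join(analyzer.directories['reports'],
                                f"{args.contract_address}_analysis.json")
        with open(out_path, 'w') as f:
            json.dump(analysis, f, indent=2, default=list)
        print(f"Analysis written to {out_path}")
    else:
        print("Analysis failed; see log output for details.")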