commit 391a2fa35d91bb6ccbf0d875da38ba9c07135b9a Author: jeirmeister Date: Sun Nov 10 16:10:27 2024 -0800 Initial commit: Blockchain fraud detection analyzer Initial implementation of a tool to analyze Ethereum smart contracts for fraudulent patterns. Currently supports: - Contract deployment analysis - Early actor identification - Exchange interaction tracking - Wash trading detection - Suspicious pattern recognition Known issues: - Directory structure needs cleanup - Some indentation errors to fix - Missing proper error handling - Needs better report formatting diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a0e254c --- /dev/null +++ b/.gitignore @@ -0,0 +1,6 @@ +.env +.venv +analysis* +combined_analysis* +raw_transactions_* +*.log \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..e69de29 diff --git a/blockchain_analyzer.py b/blockchain_analyzer.py new file mode 100644 index 0000000..fef1bc4 --- /dev/null +++ b/blockchain_analyzer.py @@ -0,0 +1,428 @@ +# blockchain_analyzer.py +from dotenv import load_dotenv +import os +import json +import requests +import logging +from datetime import datetime +from typing import Dict, List, Optional +from time import sleep +from report_generator import BlockchainReportGenerator +import time + +class BlockchainAnalyzer: + def __init__(self): + load_dotenv() + self.logger = logging.getLogger(__name__) + self.api_key = os.getenv('ETHERSCAN_API_KEY') + if not self.api_key: + raise ValueError("ETHERSCAN_API_KEY not found in environment variables") + + self.base_url = 'https://api.etherscan.io/api' + + # Directory setup + self.base_dir = os.path.dirname(os.path.abspath(__file__)) + self.data_dir = os.path.join(self.base_dir, 'data') + self.directories = { + # 'raw': os.path.join(self.data_dir, 'raw'), + # 'analysis': os.path.join(self.data_dir, 'analysis'), + 'reports': os.path.join(self.data_dir, 'reports'), + 'input': os.path.join(self.base_dir, 'input') + } + + # Create directories + for directory in self.directories.values(): + os.makedirs(directory, exist_ok=True) + + # Load known addresses + self.known_addresses = self._load_known_addresses() + + # Rate limiting + self.last_request_time = 0 + self.request_interval = 0.2 # 5 requests per second max + + + def _load_known_addresses(self) -> Dict: + """Load known addresses from JSON file""" + try: + with open(os.path.join(self.directories['input'], 'known_addresses.json')) as f: + return json.load(f) + except FileNotFoundError: + self.logger.warning("known_addresses.json not found") + return { + 'exchanges': {}, + 'protocols': { + '0x7a250d5630b4cf539739df2c5dacb4c659f2488d': 'Uniswap_Router', + '0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f': 'Uniswap_Factory' + } + } + + def analyze_contract(self, contract_address: str) -> Dict: + """Main analysis starting from contract address""" + self.logger.info(f"Starting analysis for contract: {contract_address}") + + # Get contract transactions + contract_txs = self.fetch_transactions(contract_address) + if not contract_txs: + return None + + # Find deployer + deployer = self._find_deployer(contract_txs['result']) + self.logger.info(f"Found deployer: {deployer}") + + # Find key actors + key_actors = self._find_key_actors(contract_txs['result'], deployer) + self.logger.info(f"Found key actors: {[a['address'] for a in key_actors]}") + + # Get creation time + creation_time = int(contract_txs['result'][0]['timeStamp']) + + # Analyze critical period + critical_period = self._analyze_critical_period(contract_txs['result'], creation_time) + + # Analyze each key actor + analysis = { + 'contract_address': contract_address, + 'deployer': deployer, + 'creation_date': datetime.fromtimestamp(creation_time).strftime('%Y-%m-%d %H:%M:%S'), + 'key_actors': key_actors, + 'critical_period': critical_period, + 'exchange_interactions': {}, + 'wash_trading': self._analyze_wash_trading(contract_txs['result']) + } + + # Analyze exchange interactions for each actor + for actor in [deployer] + [a['address'] for a in key_actors]: + actor_txs = self.fetch_transactions(actor) + if actor_txs and actor_txs['result']: + exchange_interactions = self._analyze_exchange_interactions(actor_txs['result']) + if exchange_interactions['incoming'] or exchange_interactions['outgoing']: + analysis['exchange_interactions'][actor] = exchange_interactions + + return analysis + + def fetch_transactions(self, address: str) -> Dict: + """Fetch all transactions for an address""" + self._rate_limit() + + params = { + 'module': 'account', + 'action': 'txlist', + 'address': address, + 'startblock': 0, + 'endblock': 99999999, + 'sort': 'asc', + 'apikey': self.api_key + } + + try: + response = requests.get(self.base_url, params=params) + response.raise_for_status() + data = response.json() + + if data.get('status') == '0': + self.logger.error(f"API Error: {data.get('message')}") + return None + + return data + + except Exception as e: + self.logger.error(f"Error fetching transactions for {address}: {str(e)}") + return None + + def _rate_limit(self): + """Implement rate limiting""" + current_time = time.time() + elapsed = current_time - self.last_request_time + if elapsed < self.request_interval: + sleep(self.request_interval - elapsed) + self.last_request_time = time.time() + + def _find_deployer(self, transactions: List[Dict]) -> str: + """Find contract deployer from first transaction""" + creation_tx = next( + (tx for tx in transactions if tx['input'].startswith('0x60806040')), + None + ) + return creation_tx['from'] if creation_tx else None + + def _find_key_actors(self, transactions: List[Dict], deployer: str) -> List[Dict]: + """Find first 3 unique addresses interacting with contract""" + key_actors = [] + seen_addresses = { + addr.lower() for addr in + [deployer] + list(self.known_addresses['protocols'].keys()) + } + + for tx in transactions: + if len(key_actors) >= 3: + break + + for addr in [tx['from'], tx['to']]: + if (addr and + addr.lower() not in seen_addresses): + key_actors.append({ + 'address': addr, + 'first_interaction': tx['timeStamp'], + 'transaction_hash': tx['hash'] + }) + seen_addresses.add(addr.lower()) + + return key_actors + + def _is_liquidity_provider(self, tx: Dict) -> bool: + """Check if transaction is adding liquidity""" + return 'addLiquidityETH' in tx.get('input', '') + + def _is_suspicious_actor(self, tx: Dict) -> bool: + """Check if transaction contains suspicious patterns""" + return ( + 'pac0as' in tx.get('input', '') or + float(tx.get('value', '0')) > 1e18 or # > 1 ETH + self._is_wash_trade(tx, [tx]) + ) + + def _analyze_exchange_interactions(self, transactions: List[Dict]) -> Dict: + """Analyze interactions with known exchanges""" + interactions = { + 'incoming': [], + 'outgoing': [] + } + + exchange_addrs = {addr.lower(): label + for addr, label in self.known_addresses['exchanges'].items()} + + for tx in transactions: + from_addr = tx['from'].lower() + to_addr = tx['to'].lower() + + if from_addr in exchange_addrs: + interactions['incoming'].append({ + 'exchange': exchange_addrs[from_addr], + 'transaction': tx + }) + + if to_addr in exchange_addrs: + interactions['outgoing'].append({ + 'exchange': exchange_addrs[to_addr], + 'transaction': tx + }) + + return interactions + + def _is_wash_trade(self, tx: Dict, all_txs: List[Dict]) -> bool: + """ + Detect potential wash trading by looking for: + 1. Back-and-forth transfers between same addresses + 2. Similar amounts in short time windows + 3. Circular trading patterns + """ + WASH_TIME_WINDOW = 300 # 5 minutes + AMOUNT_SIMILARITY_THRESHOLD = 0.1 # 10% difference + + tx_timestamp = int(tx['timeStamp']) + tx_value = float(tx['value']) if tx['value'] != '0' else 0 + from_addr = tx['from'].lower() + to_addr = tx['to'].lower() + + # Skip if transaction has no value + if tx_value == 0: + return False + + # Look for related transactions in time window + related_txs = [ + t for t in all_txs + if abs(int(t['timeStamp']) - tx_timestamp) <= WASH_TIME_WINDOW + and t['hash'] != tx['hash'] + ] + + for related_tx in related_txs: + related_value = float(related_tx['value']) if related_tx['value'] != '0' else 0 + + # Skip zero-value transactions + if related_value == 0: + continue + + # Check for back-and-forth transfers + if (related_tx['from'].lower() == to_addr and + related_tx['to'].lower() == from_addr): + return True + + # Check for similar amounts + value_diff = abs(tx_value - related_value) / max(tx_value, related_value) + if value_diff <= AMOUNT_SIMILARITY_THRESHOLD: + # Check for circular pattern + if (related_tx['from'].lower() == to_addr or + related_tx['to'].lower() == from_addr): + return True + + # Check for multiple transfers with same amount + similar_amount_txs = [ + t for t in related_txs + if abs(float(t['value']) - tx_value) / tx_value <= AMOUNT_SIMILARITY_THRESHOLD + ] + if len(similar_amount_txs) >= 3: # Multiple similar transfers in short time + return True + + return False + + def _analyze_wash_trading(self, transactions: List[Dict]) -> Dict: + """Analyze wash trading patterns across all transactions""" + wash_trading_info = { + 'instances': [], + 'total_volume': 0, + 'addresses_involved': set(), + 'time_periods': [] + } + + current_period = None + + for tx in transactions: + if self._is_wash_trade(tx, transactions): + wash_trading_info['instances'].append(tx) + wash_trading_info['total_volume'] += float(tx['value']) / 1e18 # Convert to ETH + wash_trading_info['addresses_involved'].update([ + tx['from'].lower(), + tx['to'].lower() + ]) + + # Track continuous wash trading periods + timestamp = int(tx['timeStamp']) + if not current_period: + current_period = { + 'start': timestamp, + 'end': timestamp, + 'transactions': [] + } + elif timestamp - current_period['end'] > 300: # New period if gap > 5 min + wash_trading_info['time_periods'].append(current_period) + current_period = { + 'start': timestamp, + 'end': timestamp, + 'transactions': [] + } + current_period['end'] = timestamp + current_period['transactions'].append(tx) + + if current_period: + wash_trading_info['time_periods'].append(current_period) + + return wash_trading_info + + def _analyze_critical_period(self, transactions: List[Dict], creation_time: int) -> Dict: + """Analyze first hour after contract deployment""" + one_hour_later = creation_time + 3600 + critical_events = { + 'setup_phase': { + 'contract_creation': None, + 'trading_enabled': None, + 'ownership_renounced': None, + 'initial_liquidity': [] + }, + 'suspicious_patterns': { + 'rapid_transfers': [], + 'large_transfers': [], + 'wash_trading': [], + 'suspicious_functions': [] + }, + 'timeline': [] + } + + for tx in transactions: + if int(tx['timeStamp']) > one_hour_later: + break + + # Track critical events + if tx['input'].startswith('0x60806040'): + critical_events['setup_phase']['contract_creation'] = tx + elif 'enableTrading' in tx['input']: + critical_events['setup_phase']['trading_enabled'] = tx + elif 'renounceOwnership' in tx['input']: + critical_events['setup_phase']['ownership_renounced'] = tx + elif 'addLiquidityETH' in tx['input']: + critical_events['setup_phase']['initial_liquidity'].append(tx) + + # Track suspicious patterns + if float(tx['value']) > 1e18: # > 1 ETH + critical_events['suspicious_patterns']['large_transfers'].append(tx) + if 'pac0as' in tx['input']: + critical_events['suspicious_patterns']['suspicious_functions'].append(tx) + + # Detect wash trading + if self._is_wash_trade(tx, transactions): + critical_events['suspicious_patterns']['wash_trading'].append(tx) + + critical_events['timeline'].append(tx) + + return critical_events + + def _detect_suspicious_patterns(self, transactions: List[Dict]) -> List[Dict]: + """Detect suspicious transaction patterns""" + patterns = [] + + # Quick setup pattern (< 5 minutes) + setup_time = self._calculate_time_between_events( + {'critical_period': {'timeline': transactions}}, + '0x60806040', 'enableTrading' + ) + if setup_time and setup_time < 300: + patterns.append({ + 'type': 'quick_setup', + 'severity': 'high', + 'description': f'Contract enabled trading within {setup_time/60:.1f} minutes of deployment' + }) + + # Large initial liquidity followed by removal + liquidity_events = [tx for tx in transactions if 'addLiquidityETH' in tx['input']] + if len(liquidity_events) > 3: + patterns.append({ + 'type': 'liquidity_manipulation', + 'severity': 'critical', + 'description': f'Multiple liquidity additions ({len(liquidity_events)} events)' + }) + + # Suspicious function calls + suspicious_calls = [tx for tx in transactions if 'pac0as' in tx['input']] + if suspicious_calls: + patterns.append({ + 'type': 'suspicious_functions', + 'severity': 'critical', + 'description': f'Suspicious function calls detected ({len(suspicious_calls)} instances)' + }) + + return patterns + + def _calculate_time_between_events(self, data: Dict, event1: str, event2: str) -> Optional[int]: + """Calculate time between two events in seconds""" + event1_tx = next((tx for tx in data['critical_period']['timeline'] + if event1 in tx['input']), None) + event2_tx = next((tx for tx in data['critical_period']['timeline'] + if event2 in tx['input']), None) + + if event1_tx and event2_tx: + return int(event2_tx['timeStamp']) - int(event1_tx['timeStamp']) + return None + + def _calculate_value_flow(self, transactions: List[Dict]) -> Dict: + """Calculate value flow between addresses""" + value_flow = { + 'total_in': 0, + 'total_out': 0, + 'by_address': {} + } + + for tx in transactions: + value = float(tx['value']) / 1e18 # Convert to ETH + if value > 0: + from_addr = tx['from'].lower() + to_addr = tx['to'].lower() + + if to_addr not in value_flow['by_address']: + value_flow['by_address'][to_addr] = {'in': 0, 'out': 0} + if from_addr not in value_flow['by_address']: + value_flow['by_address'][from_addr] = {'in': 0, 'out': 0} + + value_flow['by_address'][to_addr]['in'] += value + value_flow['by_address'][from_addr]['out'] += value + + return value_flow diff --git a/example.env b/example.env new file mode 100644 index 0000000..7350ba5 --- /dev/null +++ b/example.env @@ -0,0 +1,5 @@ +ETHERSCAN_API_KEY= +ETH_NODE_URL=https://eth.public-rpc.com +DATA_DIR=data +READ_DIR=input +RESULTS_DIR=results diff --git a/input/known_addresses.json b/input/known_addresses.json new file mode 100644 index 0000000..f03aa75 --- /dev/null +++ b/input/known_addresses.json @@ -0,0 +1,26 @@ +{ + "exchanges": { + "0x21a31ee1afc51d94c2efccaa2092ad1028285549": "Binance_1", + "0x28c6c06298d514db089934071355e5743bf21d60": "Binance_2", + "0x3a3c006053a9b40286b9951a11be4c5808c11dc8": "Binance_3", + "0x4976a4a02f38326660d17bf34b431dc6e2eb2327": "Binance_4", + "0x4a9e49a45a4b2545cb177f79c7381a30e1dc261f": "Binance_5", + "0x4aefa39caeadd662ae31ab0ce7c8c2c9c0a013e8": "Binance_6", + "0x4fdfe365436b5273a42f135c6a6244a20404271e": "Binance_7", + "0x56eddb7aa87536c09ccc2793473599fd21a8b17f": "Binance_8", + "0x5a52e96bacdabb82fd05763e25335261b270efcb": "Binance_9", + "0x835678a611b28684005a5e2233695fb6cbbb0007": "Binance_10", + "0x9696f59e4d72e237be84ffd425dcad154bf96976": "Binance_11", + "0xa7c0d36c4698981fab42a7d8c783674c6fe2592d": "Binance_12", + "0xbe0eb53f46cd790cd13851d5eff43d12404d33e8": "Binance_13", + "0xd3a22590f8243f8e83ac230d1842c9af0404c4a1": "Binance_14", + "0xdfd5293d8e347dfe59e90efd55b2956a1343963d": "Binance_15", + "0xf977814e90da44bfa03b6295a0616a897441acec": "Binance_16", + "0x030e37ddd7df1b43db172b23916d523f1599c6cb": "Binance_17", + "0x1b46970cfe6a271e884f636663c257a5a571fb2c": "Binance_18" + }, + "protocols": { + "0x7a250d5630b4cf539739df2c5dacb4c659f2488d": "Uniswap_Router", + "0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f": "Uniswap_Factory" + } +} \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..5a1ba3d --- /dev/null +++ b/main.py @@ -0,0 +1,39 @@ +from blockchain_analyzer import BlockchainAnalyzer +from report_generator import BlockchainReportGenerator +import argparse +import logging +import os + +def main(): + parser = argparse.ArgumentParser(description='Analyze blockchain contract for suspicious activity') + parser.add_argument('contract_address', help='The contract address to analyze') + parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose logging') + + args = parser.parse_args() + + # Setup logging + log_level = logging.DEBUG if args.verbose else logging.INFO + logging.basicConfig( + level=log_level, + format='%(asctime)s - %(levelname)s - %(message)s' + ) + + try: + analyzer = BlockchainAnalyzer() + analysis = analyzer.analyze_contract(args.contract_address) + + if analysis: + report_generator = BlockchainReportGenerator() + report_path = report_generator.generate_report(analysis) + print(f"\nAnalysis complete!") + print(f"Report generated at: {report_path}") + print(f"Raw data and analysis files in: {os.path.join(os.getcwd(), 'data')}") + else: + logging.error(f"Failed to analyze contract {args.contract_address}") + + except Exception as e: + logging.error(f"Error during analysis: {str(e)}") + raise + +if __name__ == "__main__": + main() diff --git a/report_generator.py b/report_generator.py new file mode 100644 index 0000000..aec8260 --- /dev/null +++ b/report_generator.py @@ -0,0 +1,212 @@ +# report_generator.py +from string import Template +from mdutils import MdUtils +from datetime import datetime +import os +import json +from typing import Dict, List + + +class BlockchainReportGenerator: + def __init__(self, data_dir: str = 'data'): + self.data_dir = data_dir + self.template_dir = os.path.join(os.path.dirname(__file__), 'templates') + self.schema_dir = os.path.join(os.path.dirname(__file__), 'schema') + + # Create directories if they don't exist + for directory in [self.data_dir, self.template_dir, self.schema_dir]: + os.makedirs(directory, exist_ok=True) + + # Load known addresses + self.known_addresses = self._load_known_addresses() + + def _load_known_addresses(self) -> Dict: + """Load known addresses from JSON file""" + try: + with open(os.path.join('input', 'known_addresses.json')) as f: + return json.load(f) + except FileNotFoundError: + return {'exchanges': {}, 'protocols': {}} + + def generate_report(self, analysis_data: Dict) -> str: + """Generate markdown report from analysis data""" + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + report_name = f"analysis_report_{timestamp}" + + md_file = MdUtils(file_name=os.path.join(self.data_dir, 'reports', report_name)) + + # Add title + md_file.new_header(level=1, title="Blockchain Fraud Analysis Report") + md_file.new_paragraph(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + + # Add table of contents + md_file.new_table_of_contents(table_title='Contents', depth=2) + + # Add sections + self._add_executive_summary(md_file, analysis_data) + self._add_key_actors_section(md_file, analysis_data) + self._add_critical_events(md_file, analysis_data) + self._add_exchange_interactions(md_file, analysis_data) + self._add_risk_assessment(md_file, analysis_data) + self._add_technical_details(md_file, analysis_data) + self._add_transaction_legend(md_file) + + md_file.create_md_file() + return os.path.join(self.data_dir, 'reports', f"{report_name}.md") + + def _format_address(self, address: str, analysis_data: Dict) -> str: + """Format address with label if known""" + address = address.lower() + + # Check if contract or deployer + if address == analysis_data['contract_address'].lower(): + return "Contract" + if address == analysis_data['deployer'].lower(): + return "Deployer" + + # Check known exchanges + if address in self.known_addresses['exchanges']: + return f"{self.known_addresses['exchanges'][address]}" + + # Check known protocols + if address in self.known_addresses['protocols']: + return f"{self.known_addresses['protocols'][address]}" + + # For unknown addresses, show shortened version + return f"`{address[:6]}...{address[-4:]}`" + + def _add_executive_summary(self, md_file: MdUtils, data: Dict): + """Add executive summary section""" + md_file.new_header(level=2, title="Executive Summary") + + summary = [ + f"**Contract Address**: `{data['contract_address']}`", + f"**Deployment Date**: {data['creation_date']}", + f"**Deployer**: `{data['deployer']}`", + f"**Analysis Period**: First hour after deployment", + f"**Exchange Interactions**: {len(data['exchange_interactions'])}" + ] + + md_file.new_paragraph("\n".join(summary)) + + def _add_key_actors_section(self, md_file: MdUtils, data: Dict): + """Add key actors analysis section""" + md_file.new_header(level=2, title="Key Actors Analysis") + + # Add deployer info + md_file.new_header(level=3, title="Deployer") + md_file.new_paragraph(f"Address: `{data['deployer']}`") + + # Add early interactors + if data['key_actors']: + md_file.new_header(level=3, title="Early Interactors") + for actor in data['key_actors']: + md_file.new_paragraph( + f"- Address: `{actor['address']}`\n" + f" First Interaction: {datetime.fromtimestamp(int(actor['first_interaction'])).strftime('%Y-%m-%d %H:%M:%S')}" + ) + + def _add_critical_events(self, md_file: MdUtils, data: Dict): + """Add critical events timeline""" + md_file.new_header(level=2, title="Critical Events Timeline") + + critical_period = data['critical_period'] + + # Setup phase events + setup_events = [] + if critical_period['setup_phase']['contract_creation']: + setup_events.append("🔨 Contract Creation") + if critical_period['setup_phase']['trading_enabled']: + setup_events.append("🚀 Trading Enabled") + if critical_period['setup_phase']['ownership_renounced']: + setup_events.append("🔑 Ownership Renounced") + + if setup_events: + md_file.new_header(level=3, title="Setup Phase") + for event in setup_events: + md_file.new_paragraph(f"- {event}") + + def _add_exchange_interactions(self, md_file: MdUtils, data: Dict): + """Add exchange interactions section""" + md_file.new_header(level=2, title="Exchange Interactions") + + if not data['exchange_interactions']: + md_file.new_paragraph("No exchange interactions detected") + return + + for address, interactions in data['exchange_interactions'].items(): + md_file.new_header(level=3, title=f"Address: `{address}`") + + if interactions['incoming']: + md_file.new_paragraph("**Incoming Transfers**") + for tx in interactions['incoming']: + md_file.new_paragraph( + f"- From {tx['exchange']}: " + f"{float(tx['transaction']['value'])/1e18:.4f} ETH" + ) + + if interactions['outgoing']: + md_file.new_paragraph("**Outgoing Transfers**") + for tx in interactions['outgoing']: + md_file.new_paragraph( + f"- To {tx['exchange']}: " + f"{float(tx['transaction']['value'])/1e18:.4f} ETH" + ) + + def _add_risk_assessment(self, md_file: MdUtils, data: Dict): + """Add risk assessment section""" + md_file.new_header(level=2, title="Risk Assessment") + + risk_factors = [] + risk_score = 0 + + # Check for quick trading enable + if (data['critical_period']['setup_phase']['trading_enabled'] and + data['critical_period']['setup_phase']['contract_creation']): + creation_time = int(data['critical_period']['setup_phase']['contract_creation']['timeStamp']) + trading_time = int(data['critical_period']['setup_phase']['trading_enabled']['timeStamp']) + setup_time = (trading_time - creation_time) / 60 + + if setup_time < 30: + risk_factors.append(f"⚠️ CRITICAL: Quick Trading Enable ({setup_time:.1f} minutes)") + risk_score += 30 + + # Add other risk factors... + + # Overall risk rating + risk_rating = "LOW" if risk_score < 30 else "MEDIUM" if risk_score < 60 else "HIGH" if risk_score < 90 else "CRITICAL" + md_file.new_paragraph(f"**Overall Risk Rating**: {risk_rating} ({risk_score}/100)") + + if risk_factors: + for factor in risk_factors: + md_file.new_paragraph(f"- {factor}") + + def _add_technical_details(self, md_file: MdUtils, data: Dict): + """Add technical details section""" + md_file.new_header(level=2, title="Technical Details") + + # Add wash trading info if present + if 'wash_trading' in data: + wash_trading = data['wash_trading'] + md_file.new_header(level=3, title="Wash Trading Analysis") + md_file.new_paragraph( + f"- Total Instances: {len(wash_trading['instances'])}\n" + f"- Total Volume: {wash_trading['total_volume']:.4f} ETH\n" + f"- Addresses Involved: {len(wash_trading['addresses_involved'])}" + ) + + def _add_transaction_legend(self, md_file: MdUtils): + """Add transaction type explanations""" + md_file.new_header(level=2, title="Transaction Legend") + + legend = { + "Transfer": "Direct ETH transfer between addresses", + "approve": "Grants permission for tokens to be spent by another address", + "addLiquidityETH": "Adds tokens and ETH to create a trading pair", + "enableTrading": "Enables trading functionality in the contract", + "renounceOwnership": "Permanently removes owner privileges", + "pac0as": "Suspicious custom function" + } + + for action, description in legend.items(): + md_file.new_paragraph(f"**{action}**: {description}") diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..ff958dc --- /dev/null +++ b/requirements.txt @@ -0,0 +1,5 @@ +python-dotenv +web3 +pandas +mdutils +requests diff --git a/shell.nix b/shell.nix new file mode 100644 index 0000000..a328071 --- /dev/null +++ b/shell.nix @@ -0,0 +1,23 @@ +# shell.nix +{ pkgs ? import {} }: + +pkgs.mkShell { + buildInputs = with pkgs; [ + python312 + python312Packages.pip + python312Packages.virtualenv + ]; + + shellHook = '' + # Create and activate virtual environment + python -m venv .venv + source .venv/bin/activate + + # Install requirements if they exist + if [ -f requirements.txt ]; then + pip install -r requirements.txt + fi + + echo "Python virtual environment activated" + ''; +} diff --git a/templates/report_template.md b/templates/report_template.md new file mode 100644 index 0000000..25adcb8 --- /dev/null +++ b/templates/report_template.md @@ -0,0 +1,34 @@ +# Blockchain Fraud Analysis Report +Generated on: ${TIMESTAMP} + +## Table of Contents +${TOC} + +## Executive Summary +**Contract Address**: `${CONTRACT_ADDRESS}` +**Deployment Date**: ${CREATION_DATE} +**Deployer**: `${DEPLOYER_ADDRESS}` +**Analysis Period**: First hour after deployment + +## Key Actors Analysis +${ACTORS_ANALYSIS} + +## Critical Events Timeline +${CRITICAL_EVENTS} + +## Exchange Interactions +${EXCHANGE_INTERACTIONS} + +## Risk Assessment +${RISK_ASSESSMENT} + +## Technical Details +${TECHNICAL_DETAILS} + +## Transaction Legend +- **Transfer**: Direct ETH transfer between addresses +- **approve**: Grants permission for tokens to be spent by another address +- **addLiquidityETH**: Adds tokens and ETH to create a trading pair +- **enableTrading**: Enables trading functionality in the contract +- **renounceOwnership**: Permanently removes owner privileges +- **pac0as**: Suspicious custom function potentially used for manipulation diff --git a/templates/schema/blockchain_analysis_schema.json b/templates/schema/blockchain_analysis_schema.json new file mode 100644 index 0000000..91c507b --- /dev/null +++ b/templates/schema/blockchain_analysis_schema.json @@ -0,0 +1,85 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Blockchain Analysis Report", + "description": "Analysis of suspicious blockchain activity", + "type": "object", + "properties": { + "contract_address": { + "type": "string", + "description": "The analyzed contract address" + }, + "deployer": { + "type": "string", + "description": "Contract deployer address" + }, + "creation_date": { + "type": "string", + "description": "Contract creation timestamp" + }, + "key_actors": { + "type": "array", + "items": { + "type": "object", + "properties": { + "address": {"type": "string"}, + "first_interaction": {"type": "string"}, + "transaction_hash": {"type": "string"} + }, + "required": ["address", "first_interaction"] + } + }, + "critical_period": { + "type": "object", + "properties": { + "setup_phase": { + "type": "object", + "properties": { + "contract_creation": {"type": ["object", "null"]}, + "trading_enabled": {"type": ["object", "null"]}, + "ownership_renounced": {"type": ["object", "null"]}, + "initial_liquidity": { + "type": "array", + "items": {"type": "object"} + } + } + }, + "suspicious_patterns": { + "type": "object", + "properties": { + "rapid_transfers": {"type": "array"}, + "large_transfers": {"type": "array"}, + "wash_trading": {"type": "array"}, + "suspicious_functions": {"type": "array"} + } + }, + "timeline": { + "type": "array", + "items": {"type": "object"} + } + } + }, + "exchange_interactions": { + "type": "object", + "additionalProperties": { + "type": "object", + "properties": { + "incoming": {"type": "array"}, + "outgoing": {"type": "array"} + } + } + }, + "wash_trading": { + "type": "object", + "properties": { + "instances": {"type": "array"}, + "total_volume": {"type": "number"}, + "addresses_involved": { + "type": "array", + "items": {"type": "string"} + }, + "time_periods": {"type": "array"} + } + } + }, + "required": ["contract_address", "deployer", "creation_date", "critical_period"] +}