Initial commit: Blockchain fraud detection analyzer

Initial implementation of a tool to analyze Ethereum smart contracts for fraudulent patterns.
Currently supports:
- Contract deployment analysis
- Early actor identification
- Exchange interaction tracking
- Wash trading detection
- Suspicious pattern recognition

Known issues:
- Directory structure needs cleanup
- Some indentation errors to fix
- Missing proper error handling
- Needs better report formatting
This commit is contained in:
jeirmeister 2024-11-10 16:10:27 -08:00
commit 391a2fa35d
11 changed files with 863 additions and 0 deletions

6
.gitignore vendored Normal file
View File

@ -0,0 +1,6 @@
.env
.venv
analysis*
combined_analysis*
raw_transactions_*
*.log

0
README.md Normal file
View File

428
blockchain_analyzer.py Normal file
View File

@ -0,0 +1,428 @@
# blockchain_analyzer.py
from dotenv import load_dotenv
import os
import json
import requests
import logging
from datetime import datetime
from typing import Dict, List, Optional
from time import sleep
from report_generator import BlockchainReportGenerator
import time
class BlockchainAnalyzer:
def __init__(self):
load_dotenv()
self.logger = logging.getLogger(__name__)
self.api_key = os.getenv('ETHERSCAN_API_KEY')
if not self.api_key:
raise ValueError("ETHERSCAN_API_KEY not found in environment variables")
self.base_url = 'https://api.etherscan.io/api'
# Directory setup
self.base_dir = os.path.dirname(os.path.abspath(__file__))
self.data_dir = os.path.join(self.base_dir, 'data')
self.directories = {
# 'raw': os.path.join(self.data_dir, 'raw'),
# 'analysis': os.path.join(self.data_dir, 'analysis'),
'reports': os.path.join(self.data_dir, 'reports'),
'input': os.path.join(self.base_dir, 'input')
}
# Create directories
for directory in self.directories.values():
os.makedirs(directory, exist_ok=True)
# Load known addresses
self.known_addresses = self._load_known_addresses()
# Rate limiting
self.last_request_time = 0
self.request_interval = 0.2 # 5 requests per second max
def _load_known_addresses(self) -> Dict:
"""Load known addresses from JSON file"""
try:
with open(os.path.join(self.directories['input'], 'known_addresses.json')) as f:
return json.load(f)
except FileNotFoundError:
self.logger.warning("known_addresses.json not found")
return {
'exchanges': {},
'protocols': {
'0x7a250d5630b4cf539739df2c5dacb4c659f2488d': 'Uniswap_Router',
'0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f': 'Uniswap_Factory'
}
}
def analyze_contract(self, contract_address: str) -> Dict:
"""Main analysis starting from contract address"""
self.logger.info(f"Starting analysis for contract: {contract_address}")
# Get contract transactions
contract_txs = self.fetch_transactions(contract_address)
if not contract_txs:
return None
# Find deployer
deployer = self._find_deployer(contract_txs['result'])
self.logger.info(f"Found deployer: {deployer}")
# Find key actors
key_actors = self._find_key_actors(contract_txs['result'], deployer)
self.logger.info(f"Found key actors: {[a['address'] for a in key_actors]}")
# Get creation time
creation_time = int(contract_txs['result'][0]['timeStamp'])
# Analyze critical period
critical_period = self._analyze_critical_period(contract_txs['result'], creation_time)
# Analyze each key actor
analysis = {
'contract_address': contract_address,
'deployer': deployer,
'creation_date': datetime.fromtimestamp(creation_time).strftime('%Y-%m-%d %H:%M:%S'),
'key_actors': key_actors,
'critical_period': critical_period,
'exchange_interactions': {},
'wash_trading': self._analyze_wash_trading(contract_txs['result'])
}
# Analyze exchange interactions for each actor
for actor in [deployer] + [a['address'] for a in key_actors]:
actor_txs = self.fetch_transactions(actor)
if actor_txs and actor_txs['result']:
exchange_interactions = self._analyze_exchange_interactions(actor_txs['result'])
if exchange_interactions['incoming'] or exchange_interactions['outgoing']:
analysis['exchange_interactions'][actor] = exchange_interactions
return analysis
def fetch_transactions(self, address: str) -> Dict:
"""Fetch all transactions for an address"""
self._rate_limit()
params = {
'module': 'account',
'action': 'txlist',
'address': address,
'startblock': 0,
'endblock': 99999999,
'sort': 'asc',
'apikey': self.api_key
}
try:
response = requests.get(self.base_url, params=params)
response.raise_for_status()
data = response.json()
if data.get('status') == '0':
self.logger.error(f"API Error: {data.get('message')}")
return None
return data
except Exception as e:
self.logger.error(f"Error fetching transactions for {address}: {str(e)}")
return None
def _rate_limit(self):
"""Implement rate limiting"""
current_time = time.time()
elapsed = current_time - self.last_request_time
if elapsed < self.request_interval:
sleep(self.request_interval - elapsed)
self.last_request_time = time.time()
def _find_deployer(self, transactions: List[Dict]) -> str:
"""Find contract deployer from first transaction"""
creation_tx = next(
(tx for tx in transactions if tx['input'].startswith('0x60806040')),
None
)
return creation_tx['from'] if creation_tx else None
def _find_key_actors(self, transactions: List[Dict], deployer: str) -> List[Dict]:
"""Find first 3 unique addresses interacting with contract"""
key_actors = []
seen_addresses = {
addr.lower() for addr in
[deployer] + list(self.known_addresses['protocols'].keys())
}
for tx in transactions:
if len(key_actors) >= 3:
break
for addr in [tx['from'], tx['to']]:
if (addr and
addr.lower() not in seen_addresses):
key_actors.append({
'address': addr,
'first_interaction': tx['timeStamp'],
'transaction_hash': tx['hash']
})
seen_addresses.add(addr.lower())
return key_actors
def _is_liquidity_provider(self, tx: Dict) -> bool:
"""Check if transaction is adding liquidity"""
return 'addLiquidityETH' in tx.get('input', '')
def _is_suspicious_actor(self, tx: Dict) -> bool:
"""Check if transaction contains suspicious patterns"""
return (
'pac0as' in tx.get('input', '') or
float(tx.get('value', '0')) > 1e18 or # > 1 ETH
self._is_wash_trade(tx, [tx])
)
def _analyze_exchange_interactions(self, transactions: List[Dict]) -> Dict:
"""Analyze interactions with known exchanges"""
interactions = {
'incoming': [],
'outgoing': []
}
exchange_addrs = {addr.lower(): label
for addr, label in self.known_addresses['exchanges'].items()}
for tx in transactions:
from_addr = tx['from'].lower()
to_addr = tx['to'].lower()
if from_addr in exchange_addrs:
interactions['incoming'].append({
'exchange': exchange_addrs[from_addr],
'transaction': tx
})
if to_addr in exchange_addrs:
interactions['outgoing'].append({
'exchange': exchange_addrs[to_addr],
'transaction': tx
})
return interactions
def _is_wash_trade(self, tx: Dict, all_txs: List[Dict]) -> bool:
"""
Detect potential wash trading by looking for:
1. Back-and-forth transfers between same addresses
2. Similar amounts in short time windows
3. Circular trading patterns
"""
WASH_TIME_WINDOW = 300 # 5 minutes
AMOUNT_SIMILARITY_THRESHOLD = 0.1 # 10% difference
tx_timestamp = int(tx['timeStamp'])
tx_value = float(tx['value']) if tx['value'] != '0' else 0
from_addr = tx['from'].lower()
to_addr = tx['to'].lower()
# Skip if transaction has no value
if tx_value == 0:
return False
# Look for related transactions in time window
related_txs = [
t for t in all_txs
if abs(int(t['timeStamp']) - tx_timestamp) <= WASH_TIME_WINDOW
and t['hash'] != tx['hash']
]
for related_tx in related_txs:
related_value = float(related_tx['value']) if related_tx['value'] != '0' else 0
# Skip zero-value transactions
if related_value == 0:
continue
# Check for back-and-forth transfers
if (related_tx['from'].lower() == to_addr and
related_tx['to'].lower() == from_addr):
return True
# Check for similar amounts
value_diff = abs(tx_value - related_value) / max(tx_value, related_value)
if value_diff <= AMOUNT_SIMILARITY_THRESHOLD:
# Check for circular pattern
if (related_tx['from'].lower() == to_addr or
related_tx['to'].lower() == from_addr):
return True
# Check for multiple transfers with same amount
similar_amount_txs = [
t for t in related_txs
if abs(float(t['value']) - tx_value) / tx_value <= AMOUNT_SIMILARITY_THRESHOLD
]
if len(similar_amount_txs) >= 3: # Multiple similar transfers in short time
return True
return False
def _analyze_wash_trading(self, transactions: List[Dict]) -> Dict:
"""Analyze wash trading patterns across all transactions"""
wash_trading_info = {
'instances': [],
'total_volume': 0,
'addresses_involved': set(),
'time_periods': []
}
current_period = None
for tx in transactions:
if self._is_wash_trade(tx, transactions):
wash_trading_info['instances'].append(tx)
wash_trading_info['total_volume'] += float(tx['value']) / 1e18 # Convert to ETH
wash_trading_info['addresses_involved'].update([
tx['from'].lower(),
tx['to'].lower()
])
# Track continuous wash trading periods
timestamp = int(tx['timeStamp'])
if not current_period:
current_period = {
'start': timestamp,
'end': timestamp,
'transactions': []
}
elif timestamp - current_period['end'] > 300: # New period if gap > 5 min
wash_trading_info['time_periods'].append(current_period)
current_period = {
'start': timestamp,
'end': timestamp,
'transactions': []
}
current_period['end'] = timestamp
current_period['transactions'].append(tx)
if current_period:
wash_trading_info['time_periods'].append(current_period)
return wash_trading_info
def _analyze_critical_period(self, transactions: List[Dict], creation_time: int) -> Dict:
"""Analyze first hour after contract deployment"""
one_hour_later = creation_time + 3600
critical_events = {
'setup_phase': {
'contract_creation': None,
'trading_enabled': None,
'ownership_renounced': None,
'initial_liquidity': []
},
'suspicious_patterns': {
'rapid_transfers': [],
'large_transfers': [],
'wash_trading': [],
'suspicious_functions': []
},
'timeline': []
}
for tx in transactions:
if int(tx['timeStamp']) > one_hour_later:
break
# Track critical events
if tx['input'].startswith('0x60806040'):
critical_events['setup_phase']['contract_creation'] = tx
elif 'enableTrading' in tx['input']:
critical_events['setup_phase']['trading_enabled'] = tx
elif 'renounceOwnership' in tx['input']:
critical_events['setup_phase']['ownership_renounced'] = tx
elif 'addLiquidityETH' in tx['input']:
critical_events['setup_phase']['initial_liquidity'].append(tx)
# Track suspicious patterns
if float(tx['value']) > 1e18: # > 1 ETH
critical_events['suspicious_patterns']['large_transfers'].append(tx)
if 'pac0as' in tx['input']:
critical_events['suspicious_patterns']['suspicious_functions'].append(tx)
# Detect wash trading
if self._is_wash_trade(tx, transactions):
critical_events['suspicious_patterns']['wash_trading'].append(tx)
critical_events['timeline'].append(tx)
return critical_events
def _detect_suspicious_patterns(self, transactions: List[Dict]) -> List[Dict]:
"""Detect suspicious transaction patterns"""
patterns = []
# Quick setup pattern (< 5 minutes)
setup_time = self._calculate_time_between_events(
{'critical_period': {'timeline': transactions}},
'0x60806040', 'enableTrading'
)
if setup_time and setup_time < 300:
patterns.append({
'type': 'quick_setup',
'severity': 'high',
'description': f'Contract enabled trading within {setup_time/60:.1f} minutes of deployment'
})
# Large initial liquidity followed by removal
liquidity_events = [tx for tx in transactions if 'addLiquidityETH' in tx['input']]
if len(liquidity_events) > 3:
patterns.append({
'type': 'liquidity_manipulation',
'severity': 'critical',
'description': f'Multiple liquidity additions ({len(liquidity_events)} events)'
})
# Suspicious function calls
suspicious_calls = [tx for tx in transactions if 'pac0as' in tx['input']]
if suspicious_calls:
patterns.append({
'type': 'suspicious_functions',
'severity': 'critical',
'description': f'Suspicious function calls detected ({len(suspicious_calls)} instances)'
})
return patterns
def _calculate_time_between_events(self, data: Dict, event1: str, event2: str) -> Optional[int]:
"""Calculate time between two events in seconds"""
event1_tx = next((tx for tx in data['critical_period']['timeline']
if event1 in tx['input']), None)
event2_tx = next((tx for tx in data['critical_period']['timeline']
if event2 in tx['input']), None)
if event1_tx and event2_tx:
return int(event2_tx['timeStamp']) - int(event1_tx['timeStamp'])
return None
def _calculate_value_flow(self, transactions: List[Dict]) -> Dict:
"""Calculate value flow between addresses"""
value_flow = {
'total_in': 0,
'total_out': 0,
'by_address': {}
}
for tx in transactions:
value = float(tx['value']) / 1e18 # Convert to ETH
if value > 0:
from_addr = tx['from'].lower()
to_addr = tx['to'].lower()
if to_addr not in value_flow['by_address']:
value_flow['by_address'][to_addr] = {'in': 0, 'out': 0}
if from_addr not in value_flow['by_address']:
value_flow['by_address'][from_addr] = {'in': 0, 'out': 0}
value_flow['by_address'][to_addr]['in'] += value
value_flow['by_address'][from_addr]['out'] += value
return value_flow

5
example.env Normal file
View File

@ -0,0 +1,5 @@
ETHERSCAN_API_KEY=
ETH_NODE_URL=https://eth.public-rpc.com
DATA_DIR=data
READ_DIR=input
RESULTS_DIR=results

View File

@ -0,0 +1,26 @@
{
"exchanges": {
"0x21a31ee1afc51d94c2efccaa2092ad1028285549": "Binance_1",
"0x28c6c06298d514db089934071355e5743bf21d60": "Binance_2",
"0x3a3c006053a9b40286b9951a11be4c5808c11dc8": "Binance_3",
"0x4976a4a02f38326660d17bf34b431dc6e2eb2327": "Binance_4",
"0x4a9e49a45a4b2545cb177f79c7381a30e1dc261f": "Binance_5",
"0x4aefa39caeadd662ae31ab0ce7c8c2c9c0a013e8": "Binance_6",
"0x4fdfe365436b5273a42f135c6a6244a20404271e": "Binance_7",
"0x56eddb7aa87536c09ccc2793473599fd21a8b17f": "Binance_8",
"0x5a52e96bacdabb82fd05763e25335261b270efcb": "Binance_9",
"0x835678a611b28684005a5e2233695fb6cbbb0007": "Binance_10",
"0x9696f59e4d72e237be84ffd425dcad154bf96976": "Binance_11",
"0xa7c0d36c4698981fab42a7d8c783674c6fe2592d": "Binance_12",
"0xbe0eb53f46cd790cd13851d5eff43d12404d33e8": "Binance_13",
"0xd3a22590f8243f8e83ac230d1842c9af0404c4a1": "Binance_14",
"0xdfd5293d8e347dfe59e90efd55b2956a1343963d": "Binance_15",
"0xf977814e90da44bfa03b6295a0616a897441acec": "Binance_16",
"0x030e37ddd7df1b43db172b23916d523f1599c6cb": "Binance_17",
"0x1b46970cfe6a271e884f636663c257a5a571fb2c": "Binance_18"
},
"protocols": {
"0x7a250d5630b4cf539739df2c5dacb4c659f2488d": "Uniswap_Router",
"0x5C69bEe701ef814a2B6a3EDD4B1652CB9cc5aA6f": "Uniswap_Factory"
}
}

39
main.py Normal file
View File

@ -0,0 +1,39 @@
from blockchain_analyzer import BlockchainAnalyzer
from report_generator import BlockchainReportGenerator
import argparse
import logging
import os
def main():
parser = argparse.ArgumentParser(description='Analyze blockchain contract for suspicious activity')
parser.add_argument('contract_address', help='The contract address to analyze')
parser.add_argument('--verbose', '-v', action='store_true', help='Enable verbose logging')
args = parser.parse_args()
# Setup logging
log_level = logging.DEBUG if args.verbose else logging.INFO
logging.basicConfig(
level=log_level,
format='%(asctime)s - %(levelname)s - %(message)s'
)
try:
analyzer = BlockchainAnalyzer()
analysis = analyzer.analyze_contract(args.contract_address)
if analysis:
report_generator = BlockchainReportGenerator()
report_path = report_generator.generate_report(analysis)
print(f"\nAnalysis complete!")
print(f"Report generated at: {report_path}")
print(f"Raw data and analysis files in: {os.path.join(os.getcwd(), 'data')}")
else:
logging.error(f"Failed to analyze contract {args.contract_address}")
except Exception as e:
logging.error(f"Error during analysis: {str(e)}")
raise
if __name__ == "__main__":
main()

212
report_generator.py Normal file
View File

@ -0,0 +1,212 @@
# report_generator.py
from string import Template
from mdutils import MdUtils
from datetime import datetime
import os
import json
from typing import Dict, List
class BlockchainReportGenerator:
def __init__(self, data_dir: str = 'data'):
self.data_dir = data_dir
self.template_dir = os.path.join(os.path.dirname(__file__), 'templates')
self.schema_dir = os.path.join(os.path.dirname(__file__), 'schema')
# Create directories if they don't exist
for directory in [self.data_dir, self.template_dir, self.schema_dir]:
os.makedirs(directory, exist_ok=True)
# Load known addresses
self.known_addresses = self._load_known_addresses()
def _load_known_addresses(self) -> Dict:
"""Load known addresses from JSON file"""
try:
with open(os.path.join('input', 'known_addresses.json')) as f:
return json.load(f)
except FileNotFoundError:
return {'exchanges': {}, 'protocols': {}}
def generate_report(self, analysis_data: Dict) -> str:
"""Generate markdown report from analysis data"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
report_name = f"analysis_report_{timestamp}"
md_file = MdUtils(file_name=os.path.join(self.data_dir, 'reports', report_name))
# Add title
md_file.new_header(level=1, title="Blockchain Fraud Analysis Report")
md_file.new_paragraph(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# Add table of contents
md_file.new_table_of_contents(table_title='Contents', depth=2)
# Add sections
self._add_executive_summary(md_file, analysis_data)
self._add_key_actors_section(md_file, analysis_data)
self._add_critical_events(md_file, analysis_data)
self._add_exchange_interactions(md_file, analysis_data)
self._add_risk_assessment(md_file, analysis_data)
self._add_technical_details(md_file, analysis_data)
self._add_transaction_legend(md_file)
md_file.create_md_file()
return os.path.join(self.data_dir, 'reports', f"{report_name}.md")
def _format_address(self, address: str, analysis_data: Dict) -> str:
"""Format address with label if known"""
address = address.lower()
# Check if contract or deployer
if address == analysis_data['contract_address'].lower():
return "Contract"
if address == analysis_data['deployer'].lower():
return "Deployer"
# Check known exchanges
if address in self.known_addresses['exchanges']:
return f"{self.known_addresses['exchanges'][address]}"
# Check known protocols
if address in self.known_addresses['protocols']:
return f"{self.known_addresses['protocols'][address]}"
# For unknown addresses, show shortened version
return f"`{address[:6]}...{address[-4:]}`"
def _add_executive_summary(self, md_file: MdUtils, data: Dict):
"""Add executive summary section"""
md_file.new_header(level=2, title="Executive Summary")
summary = [
f"**Contract Address**: `{data['contract_address']}`",
f"**Deployment Date**: {data['creation_date']}",
f"**Deployer**: `{data['deployer']}`",
f"**Analysis Period**: First hour after deployment",
f"**Exchange Interactions**: {len(data['exchange_interactions'])}"
]
md_file.new_paragraph("\n".join(summary))
def _add_key_actors_section(self, md_file: MdUtils, data: Dict):
"""Add key actors analysis section"""
md_file.new_header(level=2, title="Key Actors Analysis")
# Add deployer info
md_file.new_header(level=3, title="Deployer")
md_file.new_paragraph(f"Address: `{data['deployer']}`")
# Add early interactors
if data['key_actors']:
md_file.new_header(level=3, title="Early Interactors")
for actor in data['key_actors']:
md_file.new_paragraph(
f"- Address: `{actor['address']}`\n"
f" First Interaction: {datetime.fromtimestamp(int(actor['first_interaction'])).strftime('%Y-%m-%d %H:%M:%S')}"
)
def _add_critical_events(self, md_file: MdUtils, data: Dict):
"""Add critical events timeline"""
md_file.new_header(level=2, title="Critical Events Timeline")
critical_period = data['critical_period']
# Setup phase events
setup_events = []
if critical_period['setup_phase']['contract_creation']:
setup_events.append("🔨 Contract Creation")
if critical_period['setup_phase']['trading_enabled']:
setup_events.append("🚀 Trading Enabled")
if critical_period['setup_phase']['ownership_renounced']:
setup_events.append("🔑 Ownership Renounced")
if setup_events:
md_file.new_header(level=3, title="Setup Phase")
for event in setup_events:
md_file.new_paragraph(f"- {event}")
def _add_exchange_interactions(self, md_file: MdUtils, data: Dict):
"""Add exchange interactions section"""
md_file.new_header(level=2, title="Exchange Interactions")
if not data['exchange_interactions']:
md_file.new_paragraph("No exchange interactions detected")
return
for address, interactions in data['exchange_interactions'].items():
md_file.new_header(level=3, title=f"Address: `{address}`")
if interactions['incoming']:
md_file.new_paragraph("**Incoming Transfers**")
for tx in interactions['incoming']:
md_file.new_paragraph(
f"- From {tx['exchange']}: "
f"{float(tx['transaction']['value'])/1e18:.4f} ETH"
)
if interactions['outgoing']:
md_file.new_paragraph("**Outgoing Transfers**")
for tx in interactions['outgoing']:
md_file.new_paragraph(
f"- To {tx['exchange']}: "
f"{float(tx['transaction']['value'])/1e18:.4f} ETH"
)
def _add_risk_assessment(self, md_file: MdUtils, data: Dict):
"""Add risk assessment section"""
md_file.new_header(level=2, title="Risk Assessment")
risk_factors = []
risk_score = 0
# Check for quick trading enable
if (data['critical_period']['setup_phase']['trading_enabled'] and
data['critical_period']['setup_phase']['contract_creation']):
creation_time = int(data['critical_period']['setup_phase']['contract_creation']['timeStamp'])
trading_time = int(data['critical_period']['setup_phase']['trading_enabled']['timeStamp'])
setup_time = (trading_time - creation_time) / 60
if setup_time < 30:
risk_factors.append(f"⚠️ CRITICAL: Quick Trading Enable ({setup_time:.1f} minutes)")
risk_score += 30
# Add other risk factors...
# Overall risk rating
risk_rating = "LOW" if risk_score < 30 else "MEDIUM" if risk_score < 60 else "HIGH" if risk_score < 90 else "CRITICAL"
md_file.new_paragraph(f"**Overall Risk Rating**: {risk_rating} ({risk_score}/100)")
if risk_factors:
for factor in risk_factors:
md_file.new_paragraph(f"- {factor}")
def _add_technical_details(self, md_file: MdUtils, data: Dict):
"""Add technical details section"""
md_file.new_header(level=2, title="Technical Details")
# Add wash trading info if present
if 'wash_trading' in data:
wash_trading = data['wash_trading']
md_file.new_header(level=3, title="Wash Trading Analysis")
md_file.new_paragraph(
f"- Total Instances: {len(wash_trading['instances'])}\n"
f"- Total Volume: {wash_trading['total_volume']:.4f} ETH\n"
f"- Addresses Involved: {len(wash_trading['addresses_involved'])}"
)
def _add_transaction_legend(self, md_file: MdUtils):
"""Add transaction type explanations"""
md_file.new_header(level=2, title="Transaction Legend")
legend = {
"Transfer": "Direct ETH transfer between addresses",
"approve": "Grants permission for tokens to be spent by another address",
"addLiquidityETH": "Adds tokens and ETH to create a trading pair",
"enableTrading": "Enables trading functionality in the contract",
"renounceOwnership": "Permanently removes owner privileges",
"pac0as": "Suspicious custom function"
}
for action, description in legend.items():
md_file.new_paragraph(f"**{action}**: {description}")

5
requirements.txt Normal file
View File

@ -0,0 +1,5 @@
python-dotenv
web3
pandas
mdutils
requests

23
shell.nix Normal file
View File

@ -0,0 +1,23 @@
# shell.nix
{ pkgs ? import <nixpkgs> {} }:
pkgs.mkShell {
buildInputs = with pkgs; [
python312
python312Packages.pip
python312Packages.virtualenv
];
shellHook = ''
# Create and activate virtual environment
python -m venv .venv
source .venv/bin/activate
# Install requirements if they exist
if [ -f requirements.txt ]; then
pip install -r requirements.txt
fi
echo "Python virtual environment activated"
'';
}

View File

@ -0,0 +1,34 @@
# Blockchain Fraud Analysis Report
Generated on: ${TIMESTAMP}
## Table of Contents
${TOC}
## Executive Summary
**Contract Address**: `${CONTRACT_ADDRESS}`
**Deployment Date**: ${CREATION_DATE}
**Deployer**: `${DEPLOYER_ADDRESS}`
**Analysis Period**: First hour after deployment
## Key Actors Analysis
${ACTORS_ANALYSIS}
## Critical Events Timeline
${CRITICAL_EVENTS}
## Exchange Interactions
${EXCHANGE_INTERACTIONS}
## Risk Assessment
${RISK_ASSESSMENT}
## Technical Details
${TECHNICAL_DETAILS}
## Transaction Legend
- **Transfer**: Direct ETH transfer between addresses
- **approve**: Grants permission for tokens to be spent by another address
- **addLiquidityETH**: Adds tokens and ETH to create a trading pair
- **enableTrading**: Enables trading functionality in the contract
- **renounceOwnership**: Permanently removes owner privileges
- **pac0as**: Suspicious custom function potentially used for manipulation

View File

@ -0,0 +1,85 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Blockchain Analysis Report",
"description": "Analysis of suspicious blockchain activity",
"type": "object",
"properties": {
"contract_address": {
"type": "string",
"description": "The analyzed contract address"
},
"deployer": {
"type": "string",
"description": "Contract deployer address"
},
"creation_date": {
"type": "string",
"description": "Contract creation timestamp"
},
"key_actors": {
"type": "array",
"items": {
"type": "object",
"properties": {
"address": {"type": "string"},
"first_interaction": {"type": "string"},
"transaction_hash": {"type": "string"}
},
"required": ["address", "first_interaction"]
}
},
"critical_period": {
"type": "object",
"properties": {
"setup_phase": {
"type": "object",
"properties": {
"contract_creation": {"type": ["object", "null"]},
"trading_enabled": {"type": ["object", "null"]},
"ownership_renounced": {"type": ["object", "null"]},
"initial_liquidity": {
"type": "array",
"items": {"type": "object"}
}
}
},
"suspicious_patterns": {
"type": "object",
"properties": {
"rapid_transfers": {"type": "array"},
"large_transfers": {"type": "array"},
"wash_trading": {"type": "array"},
"suspicious_functions": {"type": "array"}
}
},
"timeline": {
"type": "array",
"items": {"type": "object"}
}
}
},
"exchange_interactions": {
"type": "object",
"additionalProperties": {
"type": "object",
"properties": {
"incoming": {"type": "array"},
"outgoing": {"type": "array"}
}
}
},
"wash_trading": {
"type": "object",
"properties": {
"instances": {"type": "array"},
"total_volume": {"type": "number"},
"addresses_involved": {
"type": "array",
"items": {"type": "string"}
},
"time_periods": {"type": "array"}
}
}
},
"required": ["contract_address", "deployer", "creation_date", "critical_period"]
}