blockchain-risk-analyzer/report_generator.py
jeirmeister 391a2fa35d Initial commit: Blockchain fraud detection analyzer
Initial implementation of a tool to analyze Ethereum smart contracts for fraudulent patterns.
Currently supports:
- Contract deployment analysis
- Early actor identification
- Exchange interaction tracking
- Wash trading detection
- Suspicious pattern recognition

Known issues:
- Directory structure needs cleanup
- Some indentation errors to fix
- Missing proper error handling
- Needs better report formatting
2024-11-10 16:10:27 -08:00

213 lines
8.8 KiB
Python

# report_generator.py
from string import Template
from mdutils import MdUtils
from datetime import datetime
import os
import json
from typing import Dict, List
class BlockchainReportGenerator:
def __init__(self, data_dir: str = 'data'):
self.data_dir = data_dir
self.template_dir = os.path.join(os.path.dirname(__file__), 'templates')
self.schema_dir = os.path.join(os.path.dirname(__file__), 'schema')
# Create directories if they don't exist
for directory in [self.data_dir, self.template_dir, self.schema_dir]:
os.makedirs(directory, exist_ok=True)
# Load known addresses
self.known_addresses = self._load_known_addresses()
def _load_known_addresses(self) -> Dict:
"""Load known addresses from JSON file"""
try:
with open(os.path.join('input', 'known_addresses.json')) as f:
return json.load(f)
except FileNotFoundError:
return {'exchanges': {}, 'protocols': {}}
def generate_report(self, analysis_data: Dict) -> str:
"""Generate markdown report from analysis data"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
report_name = f"analysis_report_{timestamp}"
md_file = MdUtils(file_name=os.path.join(self.data_dir, 'reports', report_name))
# Add title
md_file.new_header(level=1, title="Blockchain Fraud Analysis Report")
md_file.new_paragraph(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# Add table of contents
md_file.new_table_of_contents(table_title='Contents', depth=2)
# Add sections
self._add_executive_summary(md_file, analysis_data)
self._add_key_actors_section(md_file, analysis_data)
self._add_critical_events(md_file, analysis_data)
self._add_exchange_interactions(md_file, analysis_data)
self._add_risk_assessment(md_file, analysis_data)
self._add_technical_details(md_file, analysis_data)
self._add_transaction_legend(md_file)
md_file.create_md_file()
return os.path.join(self.data_dir, 'reports', f"{report_name}.md")
def _format_address(self, address: str, analysis_data: Dict) -> str:
"""Format address with label if known"""
address = address.lower()
# Check if contract or deployer
if address == analysis_data['contract_address'].lower():
return "Contract"
if address == analysis_data['deployer'].lower():
return "Deployer"
# Check known exchanges
if address in self.known_addresses['exchanges']:
return f"{self.known_addresses['exchanges'][address]}"
# Check known protocols
if address in self.known_addresses['protocols']:
return f"{self.known_addresses['protocols'][address]}"
# For unknown addresses, show shortened version
return f"`{address[:6]}...{address[-4:]}`"
def _add_executive_summary(self, md_file: MdUtils, data: Dict):
"""Add executive summary section"""
md_file.new_header(level=2, title="Executive Summary")
summary = [
f"**Contract Address**: `{data['contract_address']}`",
f"**Deployment Date**: {data['creation_date']}",
f"**Deployer**: `{data['deployer']}`",
f"**Analysis Period**: First hour after deployment",
f"**Exchange Interactions**: {len(data['exchange_interactions'])}"
]
md_file.new_paragraph("\n".join(summary))
def _add_key_actors_section(self, md_file: MdUtils, data: Dict):
"""Add key actors analysis section"""
md_file.new_header(level=2, title="Key Actors Analysis")
# Add deployer info
md_file.new_header(level=3, title="Deployer")
md_file.new_paragraph(f"Address: `{data['deployer']}`")
# Add early interactors
if data['key_actors']:
md_file.new_header(level=3, title="Early Interactors")
for actor in data['key_actors']:
md_file.new_paragraph(
f"- Address: `{actor['address']}`\n"
f" First Interaction: {datetime.fromtimestamp(int(actor['first_interaction'])).strftime('%Y-%m-%d %H:%M:%S')}"
)
def _add_critical_events(self, md_file: MdUtils, data: Dict):
"""Add critical events timeline"""
md_file.new_header(level=2, title="Critical Events Timeline")
critical_period = data['critical_period']
# Setup phase events
setup_events = []
if critical_period['setup_phase']['contract_creation']:
setup_events.append("🔨 Contract Creation")
if critical_period['setup_phase']['trading_enabled']:
setup_events.append("🚀 Trading Enabled")
if critical_period['setup_phase']['ownership_renounced']:
setup_events.append("🔑 Ownership Renounced")
if setup_events:
md_file.new_header(level=3, title="Setup Phase")
for event in setup_events:
md_file.new_paragraph(f"- {event}")
def _add_exchange_interactions(self, md_file: MdUtils, data: Dict):
"""Add exchange interactions section"""
md_file.new_header(level=2, title="Exchange Interactions")
if not data['exchange_interactions']:
md_file.new_paragraph("No exchange interactions detected")
return
for address, interactions in data['exchange_interactions'].items():
md_file.new_header(level=3, title=f"Address: `{address}`")
if interactions['incoming']:
md_file.new_paragraph("**Incoming Transfers**")
for tx in interactions['incoming']:
md_file.new_paragraph(
f"- From {tx['exchange']}: "
f"{float(tx['transaction']['value'])/1e18:.4f} ETH"
)
if interactions['outgoing']:
md_file.new_paragraph("**Outgoing Transfers**")
for tx in interactions['outgoing']:
md_file.new_paragraph(
f"- To {tx['exchange']}: "
f"{float(tx['transaction']['value'])/1e18:.4f} ETH"
)
def _add_risk_assessment(self, md_file: MdUtils, data: Dict):
"""Add risk assessment section"""
md_file.new_header(level=2, title="Risk Assessment")
risk_factors = []
risk_score = 0
# Check for quick trading enable
if (data['critical_period']['setup_phase']['trading_enabled'] and
data['critical_period']['setup_phase']['contract_creation']):
creation_time = int(data['critical_period']['setup_phase']['contract_creation']['timeStamp'])
trading_time = int(data['critical_period']['setup_phase']['trading_enabled']['timeStamp'])
setup_time = (trading_time - creation_time) / 60
if setup_time < 30:
risk_factors.append(f"⚠️ CRITICAL: Quick Trading Enable ({setup_time:.1f} minutes)")
risk_score += 30
# Add other risk factors...
# Overall risk rating
risk_rating = "LOW" if risk_score < 30 else "MEDIUM" if risk_score < 60 else "HIGH" if risk_score < 90 else "CRITICAL"
md_file.new_paragraph(f"**Overall Risk Rating**: {risk_rating} ({risk_score}/100)")
if risk_factors:
for factor in risk_factors:
md_file.new_paragraph(f"- {factor}")
def _add_technical_details(self, md_file: MdUtils, data: Dict):
"""Add technical details section"""
md_file.new_header(level=2, title="Technical Details")
# Add wash trading info if present
if 'wash_trading' in data:
wash_trading = data['wash_trading']
md_file.new_header(level=3, title="Wash Trading Analysis")
md_file.new_paragraph(
f"- Total Instances: {len(wash_trading['instances'])}\n"
f"- Total Volume: {wash_trading['total_volume']:.4f} ETH\n"
f"- Addresses Involved: {len(wash_trading['addresses_involved'])}"
)
def _add_transaction_legend(self, md_file: MdUtils):
"""Add transaction type explanations"""
md_file.new_header(level=2, title="Transaction Legend")
legend = {
"Transfer": "Direct ETH transfer between addresses",
"approve": "Grants permission for tokens to be spent by another address",
"addLiquidityETH": "Adds tokens and ETH to create a trading pair",
"enableTrading": "Enables trading functionality in the contract",
"renounceOwnership": "Permanently removes owner privileges",
"pac0as": "Suspicious custom function"
}
for action, description in legend.items():
md_file.new_paragraph(f"**{action}**: {description}")