jeirmeister
391a2fa35d
Initial implementation of a tool to analyze Ethereum smart contracts for fraudulent patterns. Currently supports: contract deployment analysis, early actor identification, exchange interaction tracking, wash trading detection, and suspicious pattern recognition. Known issues: the directory structure needs cleanup, some indentation errors need fixing, proper error handling is missing, and report formatting needs improvement.
213 lines
8.8 KiB
Python
213 lines
8.8 KiB
Python
# report_generator.py
|
|
from string import Template
|
|
from mdutils import MdUtils
|
|
from datetime import datetime
|
|
import os
|
|
import json
|
|
from typing import Dict, List
|
|
|
|
|
|
class BlockchainReportGenerator:
    """Render blockchain fraud-analysis results as a Markdown report.

    The generator reads an ``analysis_data`` dictionary and writes a report
    under ``<data_dir>/reports``. The keys it reads are:

    * ``contract_address``, ``deployer``, ``creation_date``
    * ``key_actors``: iterable of dicts with ``address`` and
      ``first_interaction`` (a Unix timestamp)
    * ``critical_period['setup_phase']``: dict that may hold
      ``contract_creation``, ``trading_enabled`` and ``ownership_renounced``
      entries; the first two are read for their ``timeStamp`` field
    * ``exchange_interactions``: dict mapping an address to a dict with
      ``incoming`` / ``outgoing`` lists of ``{'exchange', 'transaction'}``
    * ``wash_trading`` (optional): dict with ``instances``,
      ``total_volume`` and ``addresses_involved``
    """

    # (setup_phase key, timeline label) pairs in display order.
    _SETUP_EVENT_LABELS = (
        ('contract_creation', "🔨 Contract Creation"),
        ('trading_enabled', "🚀 Trading Enabled"),
        ('ownership_renounced', "🔑 Ownership Renounced"),
    )

    # Explanations printed in the "Transaction Legend" section.
    _TRANSACTION_LEGEND = {
        "Transfer": "Direct ETH transfer between addresses",
        "approve": "Grants permission for tokens to be spent by another address",
        "addLiquidityETH": "Adds tokens and ETH to create a trading pair",
        "enableTrading": "Enables trading functionality in the contract",
        "renounceOwnership": "Permanently removes owner privileges",
        "pac0as": "Suspicious custom function",
    }

    # Trading enabled sooner than this many minutes after creation is a risk.
    _QUICK_TRADING_MINUTES = 30
    # Points added to the risk score when the quick-trading rule fires.
    _QUICK_TRADING_PENALTY = 30

    # Markdown hard line break: a bare "\n" is only a soft break and the
    # renderer would merge the surrounding lines into one.
    _HARD_BREAK = "  \n"

    def __init__(self, data_dir: str = 'data'):
        """Prepare working directories and load the known-address labels.

        Args:
            data_dir: Directory under which reports are written.
        """
        module_dir = os.path.dirname(__file__)
        self.data_dir = data_dir
        self.template_dir = os.path.join(module_dir, 'templates')
        self.schema_dir = os.path.join(module_dir, 'schema')

        # Create directories if they don't exist
        for directory in (self.data_dir, self.template_dir, self.schema_dir):
            os.makedirs(directory, exist_ok=True)

        # Load known addresses
        self.known_addresses = self._load_known_addresses()

    @staticmethod
    def _normalize_known_addresses(raw: Dict) -> Dict:
        """Return ``raw`` with lower-cased address keys and both categories.

        ``_format_address`` lower-cases the address it looks up, so labels
        stored under mixed-case (checksummed) keys would never match unless
        the keys are normalised the same way. Missing ``exchanges`` or
        ``protocols`` categories become empty dicts so lookups cannot raise.
        """
        source = raw if isinstance(raw, dict) else {}
        normalized = dict(source)
        for category in ('exchanges', 'protocols'):
            entries = source.get(category) or {}
            normalized[category] = {
                str(address).lower(): label
                for address, label in entries.items()
            }
        return normalized

    def _load_known_addresses(self) -> Dict:
        """Load known addresses from ``input/known_addresses.json``.

        The path is relative to the current working directory.

        Returns:
            A dict with at least ``exchanges`` and ``protocols`` mappings
            (address -> label, keys lower-cased). Both are empty when the
            file does not exist.
        """
        path = os.path.join('input', 'known_addresses.json')
        try:
            with open(path, encoding='utf-8') as f:
                raw = json.load(f)
        except FileNotFoundError:
            raw = {}
        return self._normalize_known_addresses(raw)

    def generate_report(self, analysis_data: Dict) -> str:
        """Generate a markdown report from analysis data.

        Args:
            analysis_data: Analysis results; see the class docstring for the
                keys that are read.

        Returns:
            Path of the written ``.md`` file.
        """
        # One clock reading so the file name and the header always agree.
        now = datetime.now()
        report_name = f"analysis_report_{now.strftime('%Y%m%d_%H%M%S')}"

        # The reports directory is not created by __init__; make sure it
        # exists before the file is written.
        reports_dir = os.path.join(self.data_dir, 'reports')
        os.makedirs(reports_dir, exist_ok=True)
        report_base = os.path.join(reports_dir, report_name)

        md_file = MdUtils(file_name=report_base)

        # Add title
        md_file.new_header(level=1, title="Blockchain Fraud Analysis Report")
        md_file.new_paragraph(
            f"Generated on: {now.strftime('%Y-%m-%d %H:%M:%S')}"
        )

        # Add table of contents
        md_file.new_table_of_contents(table_title='Contents', depth=2)

        # Add sections
        self._add_executive_summary(md_file, analysis_data)
        self._add_key_actors_section(md_file, analysis_data)
        self._add_critical_events(md_file, analysis_data)
        self._add_exchange_interactions(md_file, analysis_data)
        self._add_risk_assessment(md_file, analysis_data)
        self._add_technical_details(md_file, analysis_data)
        self._add_transaction_legend(md_file)

        md_file.create_md_file()
        return f"{report_base}.md"

    def _format_address(self, address: str, analysis_data: Dict) -> str:
        """Format an address with its label if known.

        Args:
            address: Address to label; compared case-insensitively.
            analysis_data: Supplies ``contract_address`` and ``deployer``.

        Returns:
            ``"Contract"``, ``"Deployer"``, the known exchange/protocol
            label, or a shortened back-ticked form of the address.
        """
        address = address.lower()

        # Check if contract or deployer
        if address == analysis_data['contract_address'].lower():
            return "Contract"
        if address == analysis_data['deployer'].lower():
            return "Deployer"

        # Check known exchanges first, then known protocols
        for category in ('exchanges', 'protocols'):
            labels = self.known_addresses.get(category) or {}
            if address in labels:
                return str(labels[address])

        # For unknown addresses, show shortened version
        return f"`{address[:6]}...{address[-4:]}`"

    @staticmethod
    def _format_eth(value) -> str:
        """Convert a wei amount (number or numeric string) to an ETH string."""
        return f"{float(value) / 1e18:.4f} ETH"

    def _add_executive_summary(self, md_file: "MdUtils", data: Dict):
        """Add the executive summary section."""
        md_file.new_header(level=2, title="Executive Summary")

        summary = [
            f"**Contract Address**: `{data['contract_address']}`",
            f"**Deployment Date**: {data['creation_date']}",
            f"**Deployer**: `{data['deployer']}`",
            "**Analysis Period**: First hour after deployment",
            f"**Exchange Interactions**: {len(data['exchange_interactions'])}",
        ]

        # Hard breaks keep each field on its own rendered line.
        md_file.new_paragraph(self._HARD_BREAK.join(summary))

    def _add_key_actors_section(self, md_file: "MdUtils", data: Dict):
        """Add the key actors analysis section."""
        md_file.new_header(level=2, title="Key Actors Analysis")

        # Add deployer info
        md_file.new_header(level=3, title="Deployer")
        md_file.new_paragraph(f"Address: `{data['deployer']}`")

        # Add early interactors
        if not data['key_actors']:
            return

        md_file.new_header(level=3, title="Early Interactors")
        for actor in data['key_actors']:
            # NOTE(review): fromtimestamp() renders in the machine's local
            # time zone; confirm whether reports should use UTC instead.
            first_seen = datetime.fromtimestamp(
                int(actor['first_interaction'])
            ).strftime('%Y-%m-%d %H:%M:%S')
            md_file.new_paragraph(
                f"- Address: `{actor['address']}`{self._HARD_BREAK}"
                f"  First Interaction: {first_seen}"
            )

    def _add_critical_events(self, md_file: "MdUtils", data: Dict):
        """Add the critical events timeline."""
        md_file.new_header(level=2, title="Critical Events Timeline")

        setup_phase = data['critical_period']['setup_phase']

        # Setup phase events: list only those that were actually observed.
        setup_events = [
            label
            for key, label in self._SETUP_EVENT_LABELS
            if setup_phase.get(key)
        ]

        if setup_events:
            md_file.new_header(level=3, title="Setup Phase")
            for event in setup_events:
                md_file.new_paragraph(f"- {event}")

    def _add_exchange_interactions(self, md_file: "MdUtils", data: Dict):
        """Add the exchange interactions section."""
        md_file.new_header(level=2, title="Exchange Interactions")

        if not data['exchange_interactions']:
            md_file.new_paragraph("No exchange interactions detected")
            return

        # (interactions key, sub-heading, line prefix) for each direction.
        directions = (
            ('incoming', "**Incoming Transfers**", "From"),
            ('outgoing', "**Outgoing Transfers**", "To"),
        )

        for address, interactions in data['exchange_interactions'].items():
            md_file.new_header(level=3, title=f"Address: `{address}`")

            for key, heading, prefix in directions:
                transfers = interactions.get(key)
                if not transfers:
                    continue
                md_file.new_paragraph(heading)
                for tx in transfers:
                    amount = self._format_eth(tx['transaction']['value'])
                    md_file.new_paragraph(
                        f"- {prefix} {tx['exchange']}: {amount}"
                    )

    @staticmethod
    def _risk_rating(risk_score) -> str:
        """Map a numeric risk score to LOW / MEDIUM / HIGH / CRITICAL."""
        if risk_score < 30:
            return "LOW"
        if risk_score < 60:
            return "MEDIUM"
        if risk_score < 90:
            return "HIGH"
        return "CRITICAL"

    @classmethod
    def _assess_risk(cls, data: Dict) -> tuple:
        """Compute the risk score and the list of triggered risk factors.

        Returns:
            ``(risk_score, risk_factors)`` where ``risk_factors`` is a list
            of human-readable strings.
        """
        risk_factors = []
        risk_score = 0

        setup_phase = data['critical_period']['setup_phase']
        creation = setup_phase.get('contract_creation')
        trading = setup_phase.get('trading_enabled')

        # Check for quick trading enable
        if trading and creation:
            creation_time = int(creation['timeStamp'])
            trading_time = int(trading['timeStamp'])
            # timeStamp values are in seconds; the threshold is in minutes.
            setup_time = (trading_time - creation_time) / 60

            if setup_time < cls._QUICK_TRADING_MINUTES:
                risk_factors.append(
                    f"⚠️ CRITICAL: Quick Trading Enable ({setup_time:.1f} minutes)"
                )
                risk_score += cls._QUICK_TRADING_PENALTY

        # Add other risk factors...

        return risk_score, risk_factors

    def _add_risk_assessment(self, md_file: "MdUtils", data: Dict):
        """Add the risk assessment section."""
        md_file.new_header(level=2, title="Risk Assessment")

        risk_score, risk_factors = self._assess_risk(data)

        # Overall risk rating
        risk_rating = self._risk_rating(risk_score)
        md_file.new_paragraph(
            f"**Overall Risk Rating**: {risk_rating} ({risk_score}/100)"
        )

        for factor in risk_factors:
            md_file.new_paragraph(f"- {factor}")

    def _add_technical_details(self, md_file: "MdUtils", data: Dict):
        """Add the technical details section."""
        md_file.new_header(level=2, title="Technical Details")

        # Add wash trading info if present
        if 'wash_trading' not in data:
            return

        wash_trading = data['wash_trading']
        md_file.new_header(level=3, title="Wash Trading Analysis")
        md_file.new_paragraph(
            f"- Total Instances: {len(wash_trading['instances'])}\n"
            f"- Total Volume: {wash_trading['total_volume']:.4f} ETH\n"
            f"- Addresses Involved: {len(wash_trading['addresses_involved'])}"
        )

    def _add_transaction_legend(self, md_file: "MdUtils"):
        """Add transaction type explanations."""
        md_file.new_header(level=2, title="Transaction Legend")

        for action, description in self._TRANSACTION_LEGEND.items():
            md_file.new_paragraph(f"**{action}**: {description}")
|