trading-algo/main.py
Gal Podlipnik 761e595389 docker
2025-07-17 02:43:19 +02:00

390 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Main Trading Application
Entry point for the automated trading system.
Supports Docker deployment with health monitoring.
"""
import sys
import time
import signal
import argparse
from datetime import datetime, timedelta
import logging
# Add src directory to path for imports
sys.path.append('src')
sys.path.append('config')
from src.logger_setup import setup_logging, log_trading_action, log_performance_metric
from src.trading_engine import TradingEngine
from config.trading_config import trading_config
# Global variables for graceful shutdown
# `running` is set to False by signal_handler() and is the loop condition of
# the live trading loop in run_live_trading_mode().
running = True
# Rebound to a TradingEngine instance when run_live_trading_mode() starts.
trading_engine = None
# Initialised at import time; health_check() refreshes it after each
# successful price fetch.
last_health_check = datetime.now()
def signal_handler(signum, frame):
    """Request a graceful shutdown when a termination signal arrives.

    Logs the received signal number and clears the module-level ``running``
    flag. The handler itself does not stop anything; code that polls
    ``running`` is expected to wind down on its own.

    Args:
        signum: Number of the signal that was delivered.
        frame: Current stack frame supplied by the ``signal`` module (unused).
    """
    global running

    logging.getLogger(__name__).info(
        f"Received signal {signum}, shutting down gracefully..."
    )
    running = False
def health_check():
    """Probe market-data access and publish the result for Docker monitoring.

    Fetches the latest price for ``trading_config.symbol`` through a fresh
    ``DataHandler``. On success the module-level ``last_health_check``
    timestamp is refreshed and ``OK:<iso timestamp>:<price>`` is written to
    ``/tmp/health``.

    Returns:
        True when the price fetch and the file write both succeed; False when
        any exception is raised (the error is logged and the health file is
        left as it was).
    """
    global last_health_check

    try:
        # Imported lazily so that an import failure is reported as an
        # unhealthy status instead of propagating to the caller.
        from src.data_handler import DataHandler

        latest_price = DataHandler().get_latest_price(trading_config.symbol)
        last_health_check = datetime.now()

        # Status file read by the container health probe.
        with open('/tmp/health', 'w') as status_file:
            status_file.write(f"OK:{datetime.now().isoformat()}:{latest_price}")
    except Exception as exc:
        logging.getLogger(__name__).error(f"Health check failed: {exc}")
        return False

    return True
def run_backtesting_mode(strategy_type=None, days=180):
    """Run a backtest over historical data and print the results.

    The pipeline is: load history, compute technical indicators, let the
    strategy generate signals, simulate positions, then display the metrics.

    Args:
        strategy_type: Strategy identifier passed unchanged to
            ``get_strategy``. When falsy, the start-up log line shows
            ``trading_config.strategy_type`` instead.
        days: Number of days of history requested from the data handler.
            Defaults to 180, the value that was previously hard-coded.

    Returns:
        None. Failures are logged (with traceback) rather than raised, and an
        empty history aborts the run with an error log entry.
    """
    logger = logging.getLogger(__name__)
    strategy_name = strategy_type or trading_config.strategy_type
    logger.info(f"Starting backtesting mode with {strategy_name} strategy")

    try:
        # Imported lazily so an import problem is handled like any other
        # backtest failure.
        from src.data_handler import DataHandler
        from src.strategy_factory import get_strategy

        data_handler = DataHandler()
        strategy = get_strategy(strategy_type)

        bars = data_handler.get_historical_data(days=days)
        if bars.empty:
            logger.error("No historical data available for backtesting")
            return

        bars = data_handler.calculate_technical_indicators(bars)
        bars = strategy.generate_signals(bars)

        results = run_backtest_simulation(bars, strategy)
        display_backtest_results(results, strategy.name)
    except Exception as e:
        # logger.exception keeps the traceback, which a plain error message
        # would drop.
        logger.exception(f"Error in backtesting mode: {e}")
def run_backtest_simulation(bars, strategy):
    """Simulate a long/flat position series and derive return columns.

    Walks the bars from the second row onward. While flat, a row whose
    ``buy_signal`` is truthy opens a position if the strategy's
    ``get_entry_conditions`` agrees. While long, the strategy's
    ``should_exit_position`` decides whether to close. The resulting 0/1
    position is written into ``bars['position']`` row by row.

    The function then adds ``market_return``, ``strategy_return`` (previous
    bar's position times the market return, minus trading costs),
    ``position_change``, ``costs``, ``cum_market`` and ``cum_strategy`` to
    ``bars`` in place.

    Args:
        bars: DataFrame providing ``close``, ``buy_signal``, ``sell_signal``
            and ``ema_trend`` columns. It is modified in place.
        strategy: Object exposing ``name``, ``get_entry_conditions`` and
            ``should_exit_position``.

    Returns:
        Whatever ``calculate_performance_metrics(bars)`` returns.
    """
    log = logging.getLogger(__name__)
    log.info(f"Running backtest simulation with {strategy.name}")

    # Simulation state: 0 = flat, 1 = long.
    position = 0
    entry_price = None
    peak_price = None
    held_bars = 0

    bars['position'] = 0
    position_col = bars.columns.get_loc('position')

    for row in range(1, len(bars)):
        price = bars['close'].iloc[row]

        if bars['buy_signal'].iloc[row] and position == 0:
            # The raw signal is only a candidate; the strategy has the
            # final say on entry.
            if strategy.get_entry_conditions(bars, row):
                position = 1
                entry_price = price
                peak_price = price
                held_bars = 0
        elif position == 1:
            held_bars += 1

            # Track the highest price seen since entry.
            if price > peak_price:
                peak_price = price

            should_exit, exit_reason = strategy.should_exit_position(
                current_price=price,
                entry_price=entry_price,
                highest_price=peak_price,
                bars_since_entry=held_bars,
                sell_signal=bars['sell_signal'].iloc[row],
                ema_trend=bars['ema_trend'].iloc[row],
            )
            if should_exit:
                position = 0
                entry_price = None
                peak_price = None
                held_bars = 0

        bars.iloc[row, position_col] = position

    # The position held on the previous bar earns the current bar's return.
    bars['market_return'] = bars['close'].pct_change()
    bars['strategy_return'] = bars['position'].shift(1) * bars['market_return']

    # Charge the configured cost on every bar where the position changes.
    cost_per_change = trading_config.trade_cost
    bars['position_change'] = bars['position'].diff().abs()
    bars['costs'] = bars['position_change'] * cost_per_change
    bars['strategy_return'] = bars['strategy_return'] - bars['costs']

    # Compound the per-bar returns.
    bars['cum_market'] = (1 + bars['market_return']).cumprod() - 1
    bars['cum_strategy'] = (1 + bars['strategy_return']).cumprod() - 1

    return calculate_performance_metrics(bars)
def calculate_performance_metrics(bars, trade_cost=None, periods_per_year=365 * 24):
    """Summarise a simulated backtest as a dictionary of metrics.

    Args:
        bars: DataFrame with ``close``, ``position``, ``strategy_return``,
            ``cum_strategy`` and ``cum_market`` columns.
        trade_cost: Cost charged per side of a round trip when computing
            individual trade returns. Defaults to
            ``trading_config.trade_cost``.
        periods_per_year: Number of bars per year used to annualise the
            Sharpe ratio. The default, 365 * 24, is the constant that was
            previously hard-coded.

    Returns:
        dict with the keys ``total_return``, ``market_return``,
        ``outperformance``, ``sharpe_ratio``, ``max_drawdown``,
        ``total_trades``, ``win_rate``, ``profit_factor``, ``avg_win``,
        ``avg_loss``, ``max_win``, ``max_loss``, ``time_in_market``,
        ``buy_signals`` and ``sell_signals``. Return-like values are in
        percent.
    """
    import numpy as np

    if trade_cost is None:
        trade_cost = trading_config.trade_cost

    # Headline returns, in percent.
    total_return = bars['cum_strategy'].iloc[-1] * 100
    market_return_pct = bars['cum_market'].iloc[-1] * 100

    # A +1 step in position is an entry, a -1 step is an exit.
    position_changes = bars['position'].diff()
    buys = (position_changes == 1).sum()
    sells = (position_changes == -1).sum()

    # Pair each entry with the next exit to get per-trade returns.
    trade_returns = []
    entry_price = None
    for change, close in zip(position_changes, bars['close']):
        if change == 1:
            entry_price = close
        elif change == -1 and entry_price is not None:
            # Cost is paid once on entry and once on exit.
            trade_returns.append((close / entry_price - 1) - (2 * trade_cost))
            entry_price = None

    if trade_returns:
        returns = np.array(trade_returns)
        wins = returns[returns > 0]
        losses = returns[returns < 0]

        win_rate = (returns > 0).mean() * 100
        avg_win = wins.mean() * 100 if wins.size else 0
        avg_loss = losses.mean() * 100 if losses.size else 0
        max_win = returns.max() * 100
        max_loss = returns.min() * 100

        total_losses = abs(losses.sum())
        profit_factor = wins.sum() / total_losses if total_losses > 0 else float('inf')
    else:
        win_rate = avg_win = avg_loss = max_win = max_loss = profit_factor = 0

    # Annualised Sharpe ratio; a zero or NaN deviation yields 0.
    return_std = bars['strategy_return'].std()
    if return_std > 0:
        sharpe = bars['strategy_return'].mean() / return_std * np.sqrt(periods_per_year)
    else:
        sharpe = 0

    # Drawdown is the percentage decline of equity from its running peak.
    # Subtracting cumulative returns instead would mis-state it whenever the
    # peak equity differs from 1 (e.g. equity 1 -> 2 -> 1 is -50%, not -100%).
    equity = 1 + bars['cum_strategy']
    drawdown = equity / equity.cummax() - 1
    max_drawdown = drawdown.min() * 100

    # Share of bars spent holding a position.
    time_in_market = (bars['position'] > 0).mean() * 100

    return {
        'total_return': total_return,
        'market_return': market_return_pct,
        'outperformance': total_return - market_return_pct,
        'sharpe_ratio': sharpe,
        'max_drawdown': max_drawdown,
        'total_trades': len(trade_returns),
        'win_rate': win_rate,
        'profit_factor': profit_factor,
        'avg_win': avg_win,
        'avg_loss': avg_loss,
        'max_win': max_win,
        'max_loss': max_loss,
        'time_in_market': time_in_market,
        'buy_signals': buys,
        'sell_signals': sells
    }
def display_backtest_results(results, strategy_name):
    """Print a backtest summary to stdout.

    Args:
        results: Mapping holding the metric keys listed in the report table
            below (``total_return`` through ``time_in_market``).
        strategy_name: Name shown, upper-cased, in the report header.
    """
    rule = "=" * 60

    # (label, results key, format spec, suffix) for each report line, in
    # display order.
    report_rows = (
        ("Strategy Return:", 'total_return', ">8.2f", "%"),
        ("Market Return:", 'market_return', ">8.2f", "%"),
        ("Outperformance:", 'outperformance', ">8.2f", "%"),
        ("Sharpe Ratio:", 'sharpe_ratio', ">8.2f", ""),
        ("Max Drawdown:", 'max_drawdown', ">8.2f", "%"),
        ("Total Trades:", 'total_trades', ">8", ""),
        ("Win Rate:", 'win_rate', ">8.2f", "%"),
        ("Profit Factor:", 'profit_factor', ">8.2f", ""),
        ("Avg Win:", 'avg_win', ">8.2f", "%"),
        ("Avg Loss:", 'avg_loss', ">8.2f", "%"),
        ("Max Win:", 'max_win', ">8.2f", "%"),
        ("Max Loss:", 'max_loss', ">8.2f", "%"),
        ("Time in Market:", 'time_in_market', ">8.1f", "%"),
    )

    print("\n" + rule)
    print(f"BACKTEST RESULTS - {strategy_name.upper()}")
    print(rule)
    for label, key, spec, suffix in report_rows:
        print(f"{label} {results[key]:{spec}}{suffix}")
    print(rule)
def _sleep_while_running(seconds, health_interval=None):
    """Sleep in one-second steps, returning early once shutdown is requested.

    Args:
        seconds: Maximum number of one-second steps to sleep.
        health_interval: When set, ``health_check()`` is called on every step
            whose index is a multiple of this value (including step 0).
    """
    for elapsed in range(seconds):
        if not running:
            break
        if health_interval and elapsed % health_interval == 0:
            health_check()
        time.sleep(1)


def run_live_trading_mode():
    """Run the trading loop until shutdown is requested.

    Creates a ``TradingEngine`` with ``paper_trading=True``, stores it in the
    module-level ``trading_engine``, then repeats: run one trading cycle, log
    the outcome, and wait one hour with a health check every five minutes.
    Once more than an hour has passed since the last summary, a successful
    cycle also logs unrealised P&L for an open position, runs a health check
    and logs uptime.

    An exception inside a cycle is logged with its traceback and followed by
    a 60-second back-off. Both waits poll the ``running`` flag every second,
    so a shutdown request is noticed promptly in either one.

    Returns:
        None. Errors are logged rather than raised.
    """
    global trading_engine, running
    logger = logging.getLogger(__name__)
    logger.info("🚀 Starting live trading mode for 24/7 operation")

    try:
        trading_engine = TradingEngine(paper_trading=True)

        # Reference point for the uptime figure in the hourly status line.
        start_time = datetime.now()
        cycle_count = 0
        last_summary_time = datetime.now()

        # Initial health check
        health_check()
        logger.info("📊 Entering main trading loop...")

        while running:
            try:
                cycle_results = trading_engine.run_trading_cycle()
                cycle_count += 1

                if cycle_results['success']:
                    logger.info(f"✅ Cycle {cycle_count} completed successfully")

                    if cycle_results['actions_taken']:
                        log_trading_action("trading_cycle", {
                            'cycle': cycle_count,
                            'actions': cycle_results['actions_taken'],
                            'price': cycle_results['current_price'],
                            'signals': cycle_results['signals']
                        })

                    # Hourly summary: performance metric, health check, uptime.
                    if datetime.now() - last_summary_time > timedelta(hours=1):
                        summary = trading_engine.get_trading_summary()
                        if summary['current_position'] > 0 and summary['entry_price']:
                            current_profit = (cycle_results['current_price'] / summary['entry_price'] - 1) * 100
                            log_performance_metric("unrealized_pnl_pct", current_profit, {
                                'position': summary['current_position'],
                                'entry_price': summary['entry_price'],
                                'current_price': cycle_results['current_price']
                            })

                        health_check()
                        last_summary_time = datetime.now()
                        logger.info(f"🐳 Docker Status - Cycle: {cycle_count}, Uptime: {datetime.now() - start_time}")
                else:
                    logger.error(f"Cycle {cycle_count} failed")

                # Wait one hour before the next cycle (hourly timeframe),
                # with a health check every 5 minutes.
                _sleep_while_running(3600, health_interval=300)

            except KeyboardInterrupt:
                logger.info("Received keyboard interrupt")
                running = False
                break
            except Exception as e:
                # Keep the traceback; the message alone rarely identifies
                # the failing call.
                logger.exception(f"Error in trading loop: {e}")
                # Back off for a minute, but stay responsive to shutdown
                # instead of blocking in a single sleep(60).
                _sleep_while_running(60)

        logger.info("Live trading mode stopped")
    except Exception as e:
        logger.exception(f"Error in live trading mode: {e}")
def main():
    """Parse the command line, configure logging and signals, run a mode.

    ``--mode backtest`` (the default) calls ``run_backtesting_mode`` with the
    ``--strategy`` value; ``live`` and ``paper`` both call
    ``run_live_trading_mode()`` with no arguments.

    NOTE(review): ``--days`` is parsed but its value is not forwarded to any
    function here.

    Returns:
        0 after a normal run, 1 if the selected mode raised an exception.
    """
    cli = argparse.ArgumentParser(description='Automated Trading System')
    cli.add_argument(
        '--mode',
        choices=['live', 'paper', 'backtest'],
        default='backtest',
        help='Trading mode: live, paper, or backtest (default: backtest)',
    )
    cli.add_argument(
        '--strategy',
        choices=['conservative', 'enhanced'],
        default=None,
        help='Strategy type (default: from config)',
    )
    cli.add_argument(
        '--log-level',
        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
        default='INFO',
        help='Logging level (default: INFO)',
    )
    cli.add_argument(
        '--days',
        type=int,
        default=180,
        help='Days of historical data for backtesting (default: 180)',
    )
    args = cli.parse_args()

    # Configure logging first, then apply the requested level to the root
    # logger.
    setup_logging()
    logger = logging.getLogger(__name__)
    logging.getLogger().setLevel(getattr(logging, args.log_level))

    # Route SIGINT and SIGTERM to the graceful-shutdown handler.
    for shutdown_signal in (signal.SIGINT, signal.SIGTERM):
        signal.signal(shutdown_signal, signal_handler)

    logger.info(f"Starting trading system in {args.mode} mode")

    if args.strategy:
        # Only logged here; the value reaches the backtest via the call below.
        logger.info(f"Strategy override: {args.strategy}")

    try:
        if args.mode == 'backtest':
            run_backtesting_mode(args.strategy)
        elif args.mode in ('live', 'paper'):
            # Both modes share one entry point.
            mode_label = 'PAPER' if args.mode == 'paper' else 'LIVE'
            logger.info(f"🛡️ Running in {mode_label} trading mode")
            run_live_trading_mode()
    except Exception as exc:
        logger.error(f"Critical error in main: {exc}")
        return 1

    logger.info("Trading system shutdown complete")
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())