trading-algo/main.py
Gal Podlipnik 69462cf3e0 first
2025-07-17 02:30:21 +02:00

346 lines
13 KiB
Python

#!/usr/bin/env python3
"""
Main Trading Application
Entry point for the automated trading system.
"""
import sys
import time
import signal
import argparse
from datetime import datetime, timedelta
import logging
# Add src directory to path for imports
sys.path.append('src')
sys.path.append('config')
from src.logger_setup import setup_logging, log_trading_action, log_performance_metric
from src.trading_engine import TradingEngine
from config.trading_config import trading_config
# Module-level state shared by the signal handler and the live trading loop.
# `running` is set to False by signal_handler() (and on KeyboardInterrupt in
# run_live_trading_mode()) to make the live loop exit.
running = True
# Assigned by run_live_trading_mode() once it constructs the engine; None until then.
trading_engine = None
def signal_handler(signum, frame):
    """Request a graceful shutdown when a signal is delivered.

    Logs the received signal number and clears the module-level ``running``
    flag. ``frame`` is part of the signal-handler signature and is not used.
    """
    global running
    logging.getLogger(__name__).info(
        f"Received signal {signum}, shutting down gracefully..."
    )
    running = False
def run_backtesting_mode(strategy_type=None):
    """Run a backtest on historical data and print the results.

    Args:
        strategy_type: Strategy identifier to backtest. When falsy, the
            value of ``trading_config.strategy_type`` is used instead.

    Returns:
        None. If no historical data is available an error is logged and the
        function returns early. Any exception raised along the way is logged
        (with traceback) and swallowed rather than propagated.
    """
    logger = logging.getLogger(__name__)
    strategy_name = strategy_type or trading_config.strategy_type
    logger.info(f"Starting backtesting mode with {strategy_name} strategy")
    try:
        # Imported lazily, inside the try, so an import failure is logged
        # like any other backtest error.
        from src.data_handler import DataHandler
        from src.strategy_factory import get_strategy

        data_handler = DataHandler()
        # Build the strategy from the *resolved* name so the strategy that
        # runs is always the one announced in the log line above, even when
        # no explicit strategy_type was supplied.
        strategy = get_strategy(strategy_name)

        # 180 days of history for a comprehensive backtest.
        bars = data_handler.get_historical_data(days=180)
        if bars.empty:
            logger.error("No historical data available for backtesting")
            return

        # Pipeline: indicators -> signals -> simulation -> report.
        bars = data_handler.calculate_technical_indicators(bars)
        bars = strategy.generate_signals(bars)
        results = run_backtest_simulation(bars, strategy)
        display_backtest_results(results, strategy.name)
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"Error in backtesting mode: {e}")
def run_backtest_simulation(bars, strategy):
    """Simulate a long-only position over ``bars`` and compute metrics.

    Walks the bars from index 1 onward. While flat (position 0) a position is
    opened when the bar's ``buy_signal`` is truthy and
    ``strategy.get_entry_conditions(bars, i)`` agrees. While long (position 1)
    ``strategy.should_exit_position(...)`` decides whether to close.

    ``bars`` is modified in place: the columns ``position``,
    ``market_return``, ``strategy_return``, ``position_change``, ``costs``,
    ``cum_market`` and ``cum_strategy`` are written.

    Returns:
        The result of ``calculate_performance_metrics(bars)``.
    """
    log = logging.getLogger(__name__)
    log.info(f"Running backtest simulation with {strategy.name}")

    # Per-trade state; the price fields are only meaningful while holding == 1.
    holding = 0
    entry = None
    peak = None
    held_bars = 0

    bars['position'] = 0
    for idx in range(1, len(bars)):
        price = bars['close'].iloc[idx]
        wants_entry = bars['buy_signal'].iloc[idx] and holding == 0

        if wants_entry:
            # The strategy gets the final say on whether to enter.
            if strategy.get_entry_conditions(bars, idx):
                holding = 1
                entry = price
                peak = price
                held_bars = 0
        elif holding == 1:
            held_bars += 1
            # Track the highest price seen since entry.
            if price > peak:
                peak = price
            should_exit, _reason = strategy.should_exit_position(
                current_price=price,
                entry_price=entry,
                highest_price=peak,
                bars_since_entry=held_bars,
                sell_signal=bars['sell_signal'].iloc[idx],
                ema_trend=bars['ema_trend'].iloc[idx]
            )
            if should_exit:
                holding = 0
                entry = None
                peak = None
                held_bars = 0

        # Written on every bar so the strategy sees positions up to this bar.
        bars.iloc[idx, bars.columns.get_loc('position')] = holding

    # Returns: the strategy earns the market return only on bars where the
    # previous bar's position was held.
    market_ret = bars['close'].pct_change()
    bars['market_return'] = market_ret
    bars['strategy_return'] = bars['position'].shift(1) * market_ret

    # Trading costs are charged on every bar where the position changes.
    cost_per_change = trading_config.trade_cost
    turnover = bars['position'].diff().abs()
    bars['position_change'] = turnover
    bars['costs'] = turnover * cost_per_change
    bars['strategy_return'] = bars['strategy_return'] - bars['costs']

    # Compounded cumulative returns.
    for source, target in (('market_return', 'cum_market'),
                           ('strategy_return', 'cum_strategy')):
        bars[target] = (1 + bars[source]).cumprod() - 1

    return calculate_performance_metrics(bars)
def calculate_performance_metrics(bars):
    """Compute summary performance metrics from a simulated backtest.

    Args:
        bars: DataFrame with the columns ``close``, ``position``,
            ``strategy_return``, ``cum_strategy`` and ``cum_market``.

    Returns:
        dict with the keys ``total_return``, ``market_return``,
        ``outperformance``, ``sharpe_ratio``, ``max_drawdown``,
        ``total_trades``, ``win_rate``, ``profit_factor``, ``avg_win``,
        ``avg_loss``, ``max_win``, ``max_loss``, ``time_in_market``,
        ``buy_signals`` and ``sell_signals``. Return, drawdown, win/loss and
        time-in-market figures are percentages.

    Raises:
        IndexError: if ``bars`` has no rows (the last row is read).
    """
    import numpy as np

    # Final cumulative returns, as percentages.
    total_return = bars['cum_strategy'].iloc[-1] * 100
    market_return_pct = bars['cum_market'].iloc[-1] * 100

    # +1 marks an entry, -1 an exit.
    position_changes = bars['position'].diff()
    buys = (position_changes == 1).sum()
    sells = (position_changes == -1).sum()

    # Pair each entry with the next exit; the cost is charged on both legs.
    trade_returns = []
    entry_price = None
    for change, close in zip(position_changes.to_numpy(), bars['close'].to_numpy()):
        if change == 1:
            entry_price = close
        elif change == -1 and entry_price is not None:
            trade_returns.append(
                (close / entry_price - 1) - (2 * trading_config.trade_cost)
            )
            entry_price = None

    if trade_returns:
        trade_returns = np.array(trade_returns)
        wins = trade_returns[trade_returns > 0]
        losses = trade_returns[trade_returns < 0]
        win_rate = (trade_returns > 0).mean() * 100
        avg_win = wins.mean() * 100 if wins.size else 0
        avg_loss = losses.mean() * 100 if losses.size else 0
        max_win = trade_returns.max() * 100
        max_loss = trade_returns.min() * 100
        total_losses = abs(losses.sum())
        # No losing trades -> profit factor is reported as infinity.
        profit_factor = wins.sum() / total_losses if total_losses > 0 else float('inf')
    else:
        win_rate = avg_win = avg_loss = max_win = max_loss = profit_factor = 0

    # Sharpe ratio scaled by sqrt(365 * 24), i.e. the number of hours in a
    # year -- presumably the bars are hourly; confirm against the data feed.
    returns_std = bars['strategy_return'].std()
    if returns_std > 0:
        sharpe = bars['strategy_return'].mean() / returns_std * np.sqrt(365 * 24)
    else:
        sharpe = 0

    # Max drawdown is the largest percentage decline of the equity curve from
    # its running peak. Subtracting cumulative returns (the previous approach)
    # overstates it once equity is above 1: equity 1 -> 2 -> 1 is a 50%
    # drawdown, not 100%. The peak is floored at the starting equity of 1.0 so
    # that losses from the very first bar are counted too.
    equity = 1 + bars['cum_strategy']
    peak = equity.cummax().clip(lower=1.0)
    max_drawdown = (equity / peak - 1).min() * 100

    # Share of bars spent holding a position.
    time_in_market = (bars['position'] > 0).mean() * 100

    return {
        'total_return': total_return,
        'market_return': market_return_pct,
        'outperformance': total_return - market_return_pct,
        'sharpe_ratio': sharpe,
        'max_drawdown': max_drawdown,
        'total_trades': len(trade_returns),
        'win_rate': win_rate,
        'profit_factor': profit_factor,
        'avg_win': avg_win,
        'avg_loss': avg_loss,
        'max_win': max_win,
        'max_loss': max_loss,
        'time_in_market': time_in_market,
        'buy_signals': buys,
        'sell_signals': sells
    }
def display_backtest_results(results, strategy_name):
    """Print the backtest metrics in ``results`` as a framed report.

    Args:
        results: Mapping holding the metric keys listed in the table below.
        strategy_name: Name shown (upper-cased) in the report header.
    """
    rule = "=" * 60
    # (label, key in results, format spec, suffix) -- one report line each.
    report_rows = (
        ("Strategy Return:", 'total_return', ">8.2f", "%"),
        ("Market Return:", 'market_return', ">8.2f", "%"),
        ("Outperformance:", 'outperformance', ">8.2f", "%"),
        ("Sharpe Ratio:", 'sharpe_ratio', ">8.2f", ""),
        ("Max Drawdown:", 'max_drawdown', ">8.2f", "%"),
        ("Total Trades:", 'total_trades', ">8", ""),
        ("Win Rate:", 'win_rate', ">8.2f", "%"),
        ("Profit Factor:", 'profit_factor', ">8.2f", ""),
        ("Avg Win:", 'avg_win', ">8.2f", "%"),
        ("Avg Loss:", 'avg_loss', ">8.2f", "%"),
        ("Max Win:", 'max_win', ">8.2f", "%"),
        ("Max Loss:", 'max_loss', ">8.2f", "%"),
        ("Time in Market:", 'time_in_market', ">8.1f", "%"),
    )

    print("\n" + rule)
    print(f"BACKTEST RESULTS - {strategy_name.upper()}")
    print(rule)
    for label, key, spec, suffix in report_rows:
        print(f"{label} {results[key]:{spec}}{suffix}")
    print(rule)
def _sleep_while_running(seconds):
    """Sleep for up to ``seconds`` seconds, checking ``running`` every second."""
    for _ in range(seconds):
        if not running:
            break
        time.sleep(1)


def run_live_trading_mode():
    """Run the live trading loop until a shutdown is requested.

    Creates a ``TradingEngine`` with ``paper_trading=True``, stores it in the
    module-level ``trading_engine`` and then repeatedly:

    1. runs one trading cycle and logs its outcome,
    2. roughly once per hour logs the unrealized P&L of an open position,
    3. waits one hour before the next cycle.

    The loop stops when the module-level ``running`` flag becomes False.
    Errors inside a cycle are logged and retried after one minute; errors
    outside the loop (for example during engine construction) are logged and
    end the function. Nothing is returned.
    """
    global trading_engine, running
    logger = logging.getLogger(__name__)
    logger.info("Starting live trading mode")
    try:
        trading_engine = TradingEngine(paper_trading=True)

        cycle_count = 0
        last_summary_time = datetime.now()
        while running:
            try:
                cycle_results = trading_engine.run_trading_cycle()
                cycle_count += 1

                if cycle_results['success']:
                    logger.info(f"Cycle {cycle_count} completed successfully")
                    if cycle_results['actions_taken']:
                        log_trading_action("trading_cycle", {
                            'cycle': cycle_count,
                            'actions': cycle_results['actions_taken'],
                            'price': cycle_results['current_price'],
                            'signals': cycle_results['signals']
                        })

                    # Log performance metrics periodically (at most hourly).
                    if datetime.now() - last_summary_time > timedelta(hours=1):
                        summary = trading_engine.get_trading_summary()
                        if summary['current_position'] > 0 and summary['entry_price']:
                            current_profit = (
                                cycle_results['current_price'] / summary['entry_price'] - 1
                            ) * 100
                            log_performance_metric("unrealized_pnl_pct", current_profit, {
                                'position': summary['current_position'],
                                'entry_price': summary['entry_price'],
                                'current_price': cycle_results['current_price']
                            })
                        last_summary_time = datetime.now()
                else:
                    logger.error(f"Cycle {cycle_count} failed")

                # Wait one hour (hourly timeframe) in 1-second slices so a
                # shutdown request is noticed promptly.
                _sleep_while_running(3600)
            except KeyboardInterrupt:
                logger.info("Received keyboard interrupt")
                running = False
                break
            except Exception as e:
                # Keep the traceback, and make the one-minute retry delay
                # interruptible too: a plain time.sleep(60) here would hold
                # up a graceful shutdown for up to a minute.
                logger.exception(f"Error in trading loop: {e}")
                _sleep_while_running(60)
        logger.info("Live trading mode stopped")
    except Exception as e:
        logger.exception(f"Error in live trading mode: {e}")
def main(argv=None):
    """Parse the command line, configure logging and run the selected mode.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) makes
            argparse read ``sys.argv[1:]``, as before.

    Returns:
        int: 0 when the selected mode returns normally, 1 when it raises.
    """
    parser = argparse.ArgumentParser(description='Automated Trading System')
    parser.add_argument('--mode', choices=['live', 'backtest'], default='backtest',
                        help='Trading mode: live or backtest (default: backtest)')
    parser.add_argument('--strategy', choices=['conservative', 'enhanced'],
                        default=None, help='Strategy type (default: from config)')
    parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
                        default='INFO', help='Logging level (default: INFO)')
    args = parser.parse_args(argv)

    setup_logging()
    logger = logging.getLogger(__name__)
    # Apply the command-line level after setup_logging() so it takes effect
    # on the root logger.
    logging.getLogger().setLevel(getattr(logging, args.log_level))

    # Route SIGINT/SIGTERM through signal_handler for a graceful shutdown.
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    logger.info(f"Starting trading system in {args.mode} mode")
    if args.strategy:
        logger.info(f"Strategy override: {args.strategy}")
        if args.mode == 'live':
            # Only the backtest entry point receives the override; make that
            # visible instead of silently dropping the flag.
            logger.warning("--strategy is not forwarded to live mode; the override is ignored")

    try:
        if args.mode == 'backtest':
            run_backtesting_mode(args.strategy)
        elif args.mode == 'live':
            run_live_trading_mode()
    except Exception as e:
        # Top-level boundary: log with traceback and report failure.
        logger.exception(f"Critical error in main: {e}")
        return 1

    logger.info("Trading system shutdown complete")
    return 0
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    sys.exit(main())