Building Trading Bots in Python: From Strategy to Live Execution

Apr 8, 2026

We've been writing trading bots in MQL4 for over a decade, but Python? Python is where we go when we want to build something properly modular without fighting the language. Yes, the GIL and interpreter overhead mean you're not doing nanosecond HFT in pure Python; if that's your game, go write C++. But for everything else? Connecting to exchanges, crunching data with pandas, running strategy logic on minute or hourly bars? Python destroys. It talks to practically every major crypto exchange, data source, and ML library you're likely to need.

And unlike that 3,000-line god script we've all written at 2am — what we're building here has actual architecture. Five separate modules: config, data feed, risk manager, strategy engine, executor. Each one does exactly one job. We've seen too many Python bots that dump everything into a single file and then the developer wonders why they can't swap a strategy without rewriting half the codebase. Not this time.

Architecture Overview

┌─────────────────────────────────────────────┐
│                TRADING BOT                   │
│                                              │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  │
│  │   Data    │→ │ Strategy │→ │Execution │  │
│  │   Feed    │  │  Engine  │  │  Engine  │  │
│  └──────────┘  └──────────┘  └──────────┘  │
│       ↑              ↑              │        │
│  ┌──────────┐  ┌──────────┐       ↓        │
│  │  Config  │  │   Risk   │  ┌──────────┐  │
│  │          │  │ Manager  │  │  Logger  │  │
│  └──────────┘  └──────────┘  └──────────┘  │
└──────────────────────┬──────────────────────┘
                       │
                   Exchange API

Dependencies

pip install ccxt pandas numpy python-dotenv schedule

The Complete Bot

Configuration

# config.py
from dataclasses import dataclass
from dotenv import load_dotenv
import os

load_dotenv()

@dataclass
class BotConfig:
    # Exchange
    exchange_id: str = 'binance'
    api_key: str = os.getenv('API_KEY', '')
    api_secret: str = os.getenv('API_SECRET', '')

    # Strategy
    symbol: str = 'BTC/USDT'
    timeframe: str = '1h'
    fast_ema: int = 21
    slow_ema: int = 55
    atr_period: int = 14
    atr_sl_mult: float = 1.5
    tp_mult: float = 2.0

    # Risk
    risk_percent: float = 1.0
    max_positions: int = 1
    max_daily_trades: int = 5
    max_drawdown_pct: float = 10.0

    # Execution
    dry_run: bool = True    # Paper trading mode
    check_interval: int = 60  # Seconds between checks
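
Because the config is a plain dataclass, you can derive variants without touching the defaults. The sketch below uses a trimmed copy of BotConfig so it runs on its own; the validate helper and its 5% ceiling are our own illustrative assumptions, not part of the bot above.

```python
from dataclasses import dataclass, replace

@dataclass
class BotConfig:
    # Trimmed copy of the config above, just enough for the example
    symbol: str = 'BTC/USDT'
    timeframe: str = '1h'
    fast_ema: int = 21
    slow_ema: int = 55
    risk_percent: float = 1.0
    dry_run: bool = True

def validate(cfg):
    """Hypothetical helper: return a list of human-readable config problems."""
    problems = []
    if cfg.fast_ema >= cfg.slow_ema:
        problems.append("fast_ema must be smaller than slow_ema")
    if not 0 < cfg.risk_percent <= 5:  # 5% ceiling is an arbitrary example
        problems.append("risk_percent must be in (0, 5]")
    return problems

base = BotConfig()
# replace() returns a new object; base keeps its defaults
eth_4h = replace(base, symbol='ETH/USDT', timeframe='4h')
broken = replace(base, fast_ema=60)

print(eth_4h.symbol, eth_4h.timeframe)
print(validate(base))
print(validate(broken))
```

Running a check like this at startup means a typo in the config fails loudly before the first order, not after it.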

Data Feed Module

# data_feed.py
import ccxt
import pandas as pd

class DataFeed:
    """Fetches and processes market data."""

    def __init__(self, config):
        exchange_class = getattr(ccxt, config.exchange_id)
        self.exchange = exchange_class({
            'apiKey': config.api_key,
            'secret': config.api_secret,
            'enableRateLimit': True,
        })
        self.config = config

    def fetch_ohlcv(self, limit=100):
        """Fetch OHLCV candles and return as DataFrame."""
        raw = self.exchange.fetch_ohlcv(
            self.config.symbol,
            self.config.timeframe,
            limit=limit,
        )
        df = pd.DataFrame(raw, columns=[
            'timestamp', 'open', 'high', 'low', 'close', 'volume'
        ])
        df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
        df.set_index('timestamp', inplace=True)
        return df

    def calculate_indicators(self, df):
        """Add strategy indicators to DataFrame."""
        # EMAs
        df['ema_fast'] = df['close'].ewm(
            span=self.config.fast_ema, adjust=False).mean()
        df['ema_slow'] = df['close'].ewm(
            span=self.config.slow_ema, adjust=False).mean()

        # ATR
        high_low = df['high'] - df['low']
        high_close = (df['high'] - df['close'].shift()).abs()
        low_close = (df['low'] - df['close'].shift()).abs()
        tr = pd.concat([high_low, high_close, low_close], axis=1).max(axis=1)
        df['atr'] = tr.rolling(window=self.config.atr_period).mean()

        # Crossover signals
        df['cross_up'] = ((df['ema_fast'] > df['ema_slow']) &
                          (df['ema_fast'].shift() <= df['ema_slow'].shift()))
        df['cross_down'] = ((df['ema_fast'] < df['ema_slow']) &
                            (df['ema_fast'].shift() >= df['ema_slow'].shift()))

        return df

    def get_ticker(self):
        """Get current bid/ask."""
        return self.exchange.fetch_ticker(self.config.symbol)

    def get_balance(self):
        """Get account balance."""
        balance = self.exchange.fetch_balance()
        return balance['total'].get('USDT', 0)
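
If the ewm and shift calls look like magic, here is roughly what they compute, written in plain Python so it runs without pandas or an exchange connection. With adjust=False, the EMA is the recurrence y[t] = alpha * x[t] + (1 - alpha) * y[t-1] with alpha = 2 / (span + 1), seeded with the first value. The function names and the tiny price series are made up for illustration.

```python
def ema(values, span):
    """EMA recurrence matching ewm(span=span, adjust=False).mean()."""
    alpha = 2 / (span + 1)
    out = [values[0]]  # seed with the first observation
    for x in values[1:]:
        out.append(alpha * x + (1 - alpha) * out[-1])
    return out

def cross_up_flags(fast, slow):
    """True where fast is above slow now and was at or below it one bar earlier."""
    flags = [False]  # the first bar has no previous bar to compare against
    for i in range(1, len(fast)):
        flags.append(fast[i] > slow[i] and fast[i - 1] <= slow[i - 1])
    return flags

# Made-up closes: a dip followed by a rally
closes = [10, 9, 8, 7, 8, 10, 12, 13]
fast = ema(closes, span=2)
slow = ema(closes, span=5)
ups = cross_up_flags(fast, slow)

for i, (c, f, s, u) in enumerate(zip(closes, fast, slow, ups)):
    print(i, c, round(f, 3), round(s, 3), u)
```

The fast EMA drops below the slow one during the dip and crosses back above it once the rally has run for a couple of bars. That lag is the price you pay for filtering noise.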

Risk Manager

# risk_manager.py
from datetime import date

class RiskManager:
    """Enforces risk rules before any trade execution."""

    def __init__(self, config):
        self.config = config
        self.daily_trades = 0
        self.last_reset_date = None
        self.peak_balance = 0
        self.initial_balance = None

    def reset_daily_counter(self):
        today = date.today()
        if self.last_reset_date != today:
            self.daily_trades = 0
            self.last_reset_date = today

    def update_peak_balance(self, current_balance):
        if self.initial_balance is None:
            self.initial_balance = current_balance
        if current_balance > self.peak_balance:
            self.peak_balance = current_balance

    def calculate_position_size(self, balance, entry_price, sl_price):
        """Risk-based position sizing."""
        risk_amount = balance * self.config.risk_percent / 100
        sl_distance = abs(entry_price - sl_price)

        if sl_distance == 0:
            return 0

        size = risk_amount / sl_distance
        return round(size, 6)

    def can_trade(self, current_balance, open_positions):
        """Run all risk checks. Returns (allowed, reason)."""
        self.reset_daily_counter()
        self.update_peak_balance(current_balance)

        # Check max positions
        if open_positions >= self.config.max_positions:
            return False, f"Max positions ({self.config.max_positions}) reached"

        # Check daily trade limit
        if self.daily_trades >= self.config.max_daily_trades:
            return False, f"Max daily trades ({self.config.max_daily_trades}) reached"

        # Check drawdown
        if self.peak_balance > 0:
            dd_pct = ((self.peak_balance - current_balance)
                      / self.peak_balance * 100)
            if dd_pct >= self.config.max_drawdown_pct:
                return False, f"Drawdown {dd_pct:.1f}% >= limit {self.config.max_drawdown_pct}%"

        return True, "OK"

    def record_trade(self):
        self.daily_trades += 1
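
The sizing formula is worth running by hand once. The sketch below mirrors calculate_position_size as a standalone function and plugs in made-up numbers. It also shows something the formula hides: a tight stop produces a large position, and the notional value can exceed the whole account. The cap_to_balance helper is our own hypothetical addition, not part of the RiskManager above.

```python
def position_size(balance, risk_percent, entry_price, sl_price):
    """Same arithmetic as RiskManager.calculate_position_size."""
    risk_amount = balance * risk_percent / 100
    sl_distance = abs(entry_price - sl_price)
    if sl_distance == 0:
        return 0
    return round(risk_amount / sl_distance, 6)

def cap_to_balance(size, entry_price, balance, max_leverage=1.0):
    """Hypothetical guard: never size beyond balance * max_leverage in notional."""
    max_size = balance * max_leverage / entry_price
    return round(min(size, max_size), 6)

balance, entry = 10_000, 60_000

# Wide stop: risk 100 USDT over a 1,000 USDT stop distance
size = position_size(balance, 1.0, entry, 59_000)
loss_at_stop = size * (entry - 59_000)

# Tight stop: same 100 USDT risk over a 100 USDT stop distance
tight = position_size(balance, 1.0, entry, 59_900)
tight_notional = tight * entry

capped = cap_to_balance(tight, entry, balance)

print(size, loss_at_stop)
print(tight, tight_notional)
print(capped)
```

With the wide stop the bot buys 0.1 BTC and loses about 100 USDT if stopped out. With the tight stop it asks for 1 BTC, which is 60,000 USDT of exposure on a 10,000 USDT account, so on a spot account that order would simply be rejected unless something caps it.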

Strategy Engine

# strategy.py
import logging
import pandas as pd  # generate_signal() calls pd.isna

logger = logging.getLogger('bot.strategy')

class EMAStrategy:
    """EMA crossover strategy with ATR-based stops."""

    def __init__(self, config):
        self.config = config

    def generate_signal(self, df):
        """
        Analyze latest data and return signal.
        Returns: dict with 'action', 'sl', 'tp', or None
        """
        if len(df) < self.config.slow_ema + 5:
            logger.warning("Insufficient data for indicators")
            return None

        # Use the COMPLETED bar (index -2 = last closed bar)
        # Index -1 is the current forming bar
        last = df.iloc[-2]
        atr = last['atr']

        if pd.isna(atr) or atr <= 0:
            return None

        sl_dist = atr * self.config.atr_sl_mult
        tp_dist = sl_dist * self.config.tp_mult

        if last['cross_up']:
            return {
                'action': 'buy',
                'entry': last['close'],
                'sl': last['close'] - sl_dist,
                'tp': last['close'] + tp_dist,
                'atr': atr,
            }

        if last['cross_down']:
            return {
                'action': 'sell',
                'entry': last['close'],
                'sl': last['close'] + sl_dist,
                'tp': last['close'] - tp_dist,
                'atr': atr,
            }

        return None  # No signal
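
To see what the default multipliers actually do, here is the stop and target arithmetic from generate_signal pulled out into a standalone function, plus the breakeven win rate that a fixed reward-to-risk ratio implies. The function names and the sample close and ATR are illustrative assumptions. The breakeven figure ignores fees and slippage and assumes every trade ends at either the stop or the target.

```python
def bracket_levels(action, close, atr, atr_sl_mult=1.5, tp_mult=2.0):
    """Return (sl, tp) using the same arithmetic as generate_signal."""
    sl_dist = atr * atr_sl_mult
    tp_dist = sl_dist * tp_mult
    if action == 'buy':
        return close - sl_dist, close + tp_dist
    return close + sl_dist, close - tp_dist

def breakeven_win_rate(tp_mult):
    """Win rate at which wins of tp_mult units offset losses of 1 unit."""
    return 1 / (1 + tp_mult)

# Made-up bar: close at 60,000 with an ATR of 400
buy_sl, buy_tp = bracket_levels('buy', close=60_000, atr=400)
sell_sl, sell_tp = bracket_levels('sell', close=60_000, atr=400)

print(buy_sl, buy_tp)
print(sell_sl, sell_tp)
print(round(breakeven_win_rate(2.0), 4))
```

With atr_sl_mult at 1.5 and tp_mult at 2.0, the stop sits 600 away and the target 1,200 away, so the strategy needs to win a bit more than one trade in three before costs.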

Execution Engine

# executor.py
import logging

logger = logging.getLogger('bot.executor')

class Executor:
    """Handles order placement and position management."""

    def __init__(self, data_feed, risk_manager, config):
        self.feed = data_feed
        self.risk = risk_manager
        self.config = config

    def get_open_positions(self):
        """Get current open positions count."""
        try:
            positions = self.feed.exchange.fetch_positions([self.config.symbol])
            # 'contracts' can be None for empty positions; treat that as 0
            return sum(1 for p in positions
                       if abs(float(p.get('contracts') or 0)) > 0)
        except Exception:
            # Spot exchanges don't support fetch_positions
            orders = self.feed.exchange.fetch_open_orders(self.config.symbol)
            return len(orders)

    def execute_signal(self, signal):
        """Execute a trading signal with full risk checks."""
        balance = self.feed.get_balance()
        open_pos = self.get_open_positions()

        # Risk check
        allowed, reason = self.risk.can_trade(balance, open_pos)
        if not allowed:
            logger.info(f"Trade blocked: {reason}")
            return None

        # Position sizing
        size = self.risk.calculate_position_size(
            balance, signal['entry'], signal['sl'])

        if size <= 0:
            logger.warning("Position size calculated as 0")
            return None

        logger.info(f"Signal: {signal['action'].upper()} "
                    f"size={size} SL={signal['sl']:.5f} TP={signal['tp']:.5f}")

        if self.config.dry_run:
            logger.info("[DRY RUN] Would place order — not executing")
            self.risk.record_trade()
            return {'dry_run': True, **signal, 'size': size}

        try:
            # Place market order
            if signal['action'] == 'buy':
                order = self.feed.exchange.create_market_buy_order(
                    self.config.symbol, size)
            else:
                order = self.feed.exchange.create_market_sell_order(
                    self.config.symbol, size)

            logger.info(f"Order placed: {order['id']}")
            self.risk.record_trade()

            # Place SL/TP (if exchange supports)
            self._place_sl_tp(signal, size)

            return order

        except Exception as e:
            logger.error(f"Order failed: {e}")
            return None

    def _place_sl_tp(self, signal, size):
        """Place stop-loss and take-profit orders."""
        try:
            if signal['action'] == 'buy':
                self.feed.exchange.create_order(
                    self.config.symbol, 'stop_market', 'sell',
                    size, None, {'stopPrice': signal['sl']})
                self.feed.exchange.create_order(
                    self.config.symbol, 'limit', 'sell',
                    size, signal['tp'])
            else:
                self.feed.exchange.create_order(
                    self.config.symbol, 'stop_market', 'buy',
                    size, None, {'stopPrice': signal['sl']})
                self.feed.exchange.create_order(
                    self.config.symbol, 'limit', 'buy',
                    size, signal['tp'])

            logger.info("SL/TP orders placed")
        except Exception as e:
            logger.warning(f"SL/TP placement failed: {e}")
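
One thing the executor glosses over: exchanges only accept order sizes in fixed increments, and round(size, 6) does not know what those are. ccxt exposes amount_to_precision(symbol, amount) for this once markets are loaded. The sketch below shows the underlying idea offline: floor the size to a step so you never round up into more risk than you sized for. The floor_to_step name and the 0.001 step are assumptions for illustration, and real step sizes come from the exchange's market metadata.

```python
from decimal import Decimal, ROUND_DOWN

def floor_to_step(amount, step):
    """Round amount DOWN to a whole multiple of step, avoiding float drift."""
    amount_d = Decimal(str(amount))
    step_d = Decimal(str(step))
    steps = (amount_d / step_d).to_integral_value(rounding=ROUND_DOWN)
    return float(steps * step_d)

# Hypothetical lot step of 0.001 BTC
print(floor_to_step(0.123456, 0.001))
print(floor_to_step(0.0004, 0.001))
```

The second call returns 0.0, which the existing size <= 0 check in execute_signal would catch and log instead of sending an order the exchange rejects.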

Main Bot Loop

# bot.py
import logging
import time
import schedule
from config import BotConfig
from data_feed import DataFeed
from risk_manager import RiskManager
from strategy import EMAStrategy
from executor import Executor

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(name)-15s %(levelname)-7s %(message)s',
    handlers=[
        logging.FileHandler('bot.log'),
        logging.StreamHandler(),
    ]
)
logger = logging.getLogger('bot')


class TradingBot:
    def __init__(self, config=None):
        self.config = config or BotConfig()
        self.feed = DataFeed(self.config)
        self.risk = RiskManager(self.config)
        self.strategy = EMAStrategy(self.config)
        self.executor = Executor(self.feed, self.risk, self.config)

    def tick(self):
        """One iteration of the bot's main loop."""
        try:
            # 1. Fetch data
            df = self.feed.fetch_ohlcv(limit=100)
            df = self.feed.calculate_indicators(df)

            # 2. Generate signal
            signal = self.strategy.generate_signal(df)

            if signal is None:
                logger.debug("No signal this bar")
                return

            # 3. Execute
            result = self.executor.execute_signal(signal)

            if result:
                logger.info(f"Trade executed: {result}")

        except Exception as e:
            logger.error(f"Bot error: {e}", exc_info=True)

    def run(self):
        """Start the bot with scheduled execution."""
        logger.info(f"Bot started — {self.config.symbol} "
                    f"{self.config.timeframe} "
                    f"{'[DRY RUN]' if self.config.dry_run else '[LIVE]'}")

        # Run immediately on start
        self.tick()

        # Then on schedule
        schedule.every(self.config.check_interval).seconds.do(self.tick)

        while True:
            schedule.run_pending()
            time.sleep(1)


if __name__ == '__main__':
    bot = TradingBot()
    bot.run()
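
One gotcha with this loop: check_interval is 60 seconds but the timeframe is 1h, so the same closed bar gets evaluated about sixty times. If that bar carries a crossover, every tick sees the signal again, and only max_positions and max_daily_trades stand between you and repeat orders. A small gate keyed on the bar timestamp fixes that. BarGate and last_closed_bar are hypothetical names of ours; in tick() the timestamp would come from df.index[-2].

```python
from datetime import datetime, timedelta

class BarGate:
    """Hypothetical helper: lets each closed bar be acted on at most once."""

    def __init__(self):
        self.last_bar = None

    def is_new(self, bar_timestamp):
        if bar_timestamp == self.last_bar:
            return False
        self.last_bar = bar_timestamp
        return True

def last_closed_bar(now):
    """Open time of the most recently closed 1h bar (stand-in for df.index[-2])."""
    return now.replace(minute=0, second=0, microsecond=0) - timedelta(hours=1)

# Simulate two hours of 60-second ticks
gate = BarGate()
start = datetime(2026, 4, 8, 10, 0, 30)
acted_on = []
for i in range(120):
    now = start + timedelta(seconds=60 * i)
    bar = last_closed_bar(now)
    if gate.is_new(bar):
        acted_on.append(bar)  # this is where signal generation would run

print(len(acted_on))
print(acted_on)
```

Out of 120 ticks, only two get past the gate, one per closed bar. An early return at the top of tick() when is_new is False would be one way to wire it in.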

Running the Bot

Paper Trading (Safe)

# .env file
API_KEY=your_api_key_here
API_SECRET=your_api_secret_here

# Run in dry-run mode (default)
python bot.py

Live Trading

# In config.py, change:
dry_run: bool = False   # LIVE MODE — real money

Production Checklist

We don't care how good your backtest looks. Before you flip dry_run to False, go through every single one of these:

Check | Done?
Tested in dry_run mode for 2+ weeks | [ ]
Risk percent set to ≤ 1% | [ ]
Max drawdown limit configured | [ ]
Daily trade limit set | [ ]
API keys are trade-only (withdrawal permission disabled) | [ ]
Bot running on VPS (not your laptop) | [ ]
Logging to file enabled | [ ]
Error alerts sent to phone/email | [ ]
Exchange rate limits respected (enableRateLimit: True) | [ ]
Tested manual position close while bot is running | [ ]
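
The alerting item is the one people skip, so here is a minimal sketch of one way to do it with the standard logging module: a handler that forwards ERROR records to whatever send function you give it. AlertHandler is a hypothetical class of ours, and the list used as the "sender" stands in for a real email, SMS, or chat webhook call.

```python
import logging

class AlertHandler(logging.Handler):
    """Hypothetical handler: forwards ERROR-and-above records to a callable."""

    def __init__(self, send, level=logging.ERROR):
        super().__init__(level=level)
        self.send = send

    def emit(self, record):
        try:
            self.send(self.format(record))
        except Exception:
            # Never let a failed alert crash the bot
            self.handleError(record)

sent = []  # stand-in for a real notification channel
demo_logger = logging.getLogger('bot.alert_demo')
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False
demo_logger.addHandler(AlertHandler(sent.append))

demo_logger.info("No signal this bar")        # below ERROR, not forwarded
demo_logger.error("Order failed: timeout")    # forwarded

print(sent)
```

Attached to the 'bot' logger, a handler like this would pick up the logger.error calls already present in the executor and main loop without any changes to them.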

Supported Exchanges via ccxt

This is the part that makes Python bots absurdly portable. ccxt wraps 100+ exchanges behind one unified API — swap one string and you're talking to a completely different exchange:

# Just change the exchange_id:
config.exchange_id = 'binance'      # Crypto
config.exchange_id = 'bybit'        # Crypto derivatives
config.exchange_id = 'kraken'       # Crypto
# OANDA (forex) is not a ccxt exchange; it needs its own client such as oandapyV20

For forex brokers that ccxt doesn't cover, you're looking at the broker's native REST API or a bridge like MetaApi. Honestly, for MT4/MT5 brokers we still just write the bot in MQL — right tool for the right job.

This architecture is what we ship to clients at Quantumcona — except with more strategies, more exchanges, proper monitoring, and cloud deployment that actually stays up at 3am when you're not watching. If you want a Python bot built right the first time instead of duct-taping scripts together for six months, we handle the whole stack.


This is Article 17 of 20 in the Algo Trading Masterclass by Quantumcona.
