Skip to content

Testing Infrastructure Guide

Cracktrader provides comprehensive testing infrastructure with 89 test files covering unit, integration, and end-to-end testing. This guide covers testing strategies, fixtures, mocking policies, and best practices for testing trading strategies.

Testing Overview

Test Architecture

tests/
├── unit/                    # Isolated component tests (71 files)
│   ├── broker/             # Order management, cash handling
│   ├── feed/               # Data feed functionality
│   ├── store/              # Exchange connectivity
│   ├── cerebro/            # Strategy execution engine
│   ├── commission/         # Fee calculations
│   └── web/                # API and web interface
├── integration/            # Cross-component tests (12 files)
│   ├── data_pipeline/      # End-to-end data flow
│   ├── multi_exchange/     # Cross-exchange integration
│   └── web/                # Web API workflows
├── e2e/                    # End-to-end scenarios (6 files)
│   ├── binance_specific/   # Exchange-specific tests
│   ├── live_data/          # Real market data tests
│   └── strategy_execution/ # Complete strategy workflows
├── fixtures/               # Reusable test components
│   ├── fake_exchange.py    # Mock exchange implementation
│   ├── broker_factory.py   # Test broker creation
│   └── common.py           # Shared test utilities
└── conftest.py             # Global test configuration

Testing Philosophy

Behavior-Driven Testing: - Tests focus on user behavior, not implementation details - Clear test names describing business scenarios - Comprehensive edge case coverage

Isolation and Mocking: - Unit tests are completely isolated using mocks - Integration tests use minimal real dependencies - End-to-end tests can optionally use live exchanges

Production Readiness: - 2:1 test-to-source ratio (89 test files, 44 source files) - Comprehensive coverage of trading scenarios - Performance and stress testing included

Getting Started with Testing

Running Tests

# Run all tests
pytest

# Run specific test categories
pytest tests/unit/                    # Unit tests only
pytest tests/integration/             # Integration tests
pytest tests/e2e/                     # End-to-end tests

# Run tests with coverage
pytest --cov=cracktrader --cov-report=html

# Run tests with detailed output
pytest -v -s

# Run specific test file
pytest tests/unit/broker/test_broker_order_lifecycle.py

# Run specific test method
pytest tests/unit/broker/test_broker_order_lifecycle.py::TestOrderLifecycle::test_market_order_execution

Test Configuration

# conftest.py - Global test configuration
import pytest
from tests.fixtures.fake_exchange import FakeExchange
from tests.fixtures.broker_factory import create_test_broker

@pytest.fixture(scope="session")
def fake_exchange():
    """Shared fake exchange for all tests."""
    return FakeExchange()

@pytest.fixture
def test_broker():
    """Create isolated test broker."""
    return create_test_broker()

@pytest.fixture(autouse=True)
def setup_test_logging():
    """Configure logging for tests."""
    import logging
    logging.getLogger('cracktrader').setLevel(logging.WARNING)

Unit Testing

Testing Strategies

import unittest
from unittest.mock import Mock, patch, MagicMock
import backtrader as bt
from cracktrader import Cerebro
from tests.fixtures.fake_exchange import FakeExchange

class TestMyStrategy(unittest.TestCase):
    """Unit tests for strategy logic."""

    def setUp(self):
        """Set up test environment."""
        self.cerebro = Cerebro()
        self.strategy_class = MyStrategy

        # Create mock data
        self.mock_data = Mock()
        self.mock_data.close = Mock()
        self.mock_data.volume = Mock()

        # Create mock broker
        self.mock_broker = Mock()
        self.mock_broker.getvalue.return_value = 10000.0
        self.mock_broker.getcash.return_value = 5000.0

    def test_strategy_initialization(self):
        """Test strategy initializes correctly."""
        strategy = self.strategy_class()
        strategy.data = self.mock_data
        strategy.broker = self.mock_broker

        # Test parameter validation
        self.assertIsNotNone(strategy.params.period)
        self.assertGreater(strategy.params.period, 0)

    def test_buy_signal_generation(self):
        """Test buy signal logic."""
        strategy = self.strategy_class()
        strategy.data = self.mock_data

        # Mock indicator values for buy signal
        strategy.sma = Mock()
        strategy.sma.__getitem__ = Mock(return_value=100.0)
        self.mock_data.close.__getitem__ = Mock(return_value=105.0)

        # Test buy signal
        should_buy = strategy._should_buy()
        self.assertTrue(should_buy)

    def test_position_sizing(self):
        """Test position size calculation."""
        strategy = self.strategy_class()
        strategy.broker = self.mock_broker
        strategy.data = self.mock_data

        self.mock_data.close.__getitem__ = Mock(return_value=50000.0)

        position_size = strategy._calculate_position_size()

        # Verify position size is reasonable
        self.assertGreater(position_size, 0)
        self.assertLess(position_size, 1.0)  # Less than 100% of portfolio

    def test_risk_management(self):
        """Test risk management rules."""
        strategy = self.strategy_class()
        strategy.broker = self.mock_broker
        strategy.data = self.mock_data

        # Test stop loss
        strategy.buyprice = 50000.0
        self.mock_data.close.__getitem__ = Mock(return_value=47500.0)  # 5% loss

        should_stop = strategy._should_stop_loss()
        self.assertTrue(should_stop)

        # Test take profit
        self.mock_data.close.__getitem__ = Mock(return_value=57500.0)  # 15% gain

        should_take_profit = strategy._should_take_profit()
        self.assertTrue(should_take_profit)

    @patch('cracktrader.utils.log_trade')
    def test_trade_logging(self, mock_log_trade):
        """Test that trades are logged correctly."""
        strategy = self.strategy_class()

        # Simulate trade completion
        mock_trade = Mock()
        mock_trade.isclosed = True
        mock_trade.pnl = 500.0
        mock_trade.size = 0.1

        strategy.notify_trade(mock_trade)

        # Verify logging was called
        mock_log_trade.assert_called_once()

Testing Data Feeds

class TestCCXTDataFeed(unittest.TestCase):
    """Test CCXT data feed functionality."""

    def setUp(self):
        """Set up test environment."""
        self.fake_exchange = FakeExchange()
        self.store = Mock()
        self.store.exchange = self.fake_exchange

    def test_historical_data_loading(self):
        """Test loading historical OHLCV data."""
        from cracktrader.feeds import CCXTDataFeed

        feed = CCXTDataFeed(
            store=self.store,
            symbol='BTC/USDT',
            timeframe='1h',
            start_date='2024-01-01T00:00:00Z',
            end_date='2024-01-02T00:00:00Z'
        )

        # Mock data return
        self.fake_exchange.set_ohlcv_data([
            [1704067200000, 45000, 45500, 44500, 45200, 1000],  # Sample data
            [1704070800000, 45200, 45800, 45100, 45600, 1200],
        ])

        data = feed._load_data()

        # Verify data structure
        self.assertIsNotNone(data)
        self.assertEqual(len(data), 2)
        self.assertIn('open', data.columns)
        self.assertIn('high', data.columns)
        self.assertIn('low', data.columns)
        self.assertIn('close', data.columns)
        self.assertIn('volume', data.columns)

    def test_data_validation(self):
        """Test data validation and cleaning."""
        from cracktrader.feeds import CCXTDataFeed

        feed = CCXTDataFeed(store=self.store, symbol='BTC/USDT', timeframe='1h')

        # Test with invalid data
        invalid_data = [
            [1704067200000, 45000, 44500, 45500, 45200, 1000],  # high < low
            [1704070800000, 45200, 45800, 45100, 45600, -100],  # negative volume
        ]

        self.fake_exchange.set_ohlcv_data(invalid_data)

        # Should handle invalid data gracefully
        data = feed._load_data()
        self.assertIsNotNone(data)

    def test_timeframe_mapping(self):
        """Test timeframe conversion between CCXT and Backtrader."""
        from cracktrader.feeds.ccxt import CCXT_TO_BT_TIMEFRAME

        # Test common timeframes
        self.assertEqual(CCXT_TO_BT_TIMEFRAME['1m'], (bt.TimeFrame.Minutes, 1))
        self.assertEqual(CCXT_TO_BT_TIMEFRAME['1h'], (bt.TimeFrame.Minutes, 60))
        self.assertEqual(CCXT_TO_BT_TIMEFRAME['1d'], (bt.TimeFrame.Days, 1))

Testing Brokers

class TestCCXTBroker(unittest.TestCase):
    """Test broker order management."""

    def setUp(self):
        """Set up test broker."""
        from tests.fixtures.broker_factory import create_test_broker
        self.broker = create_test_broker(mode='test', initial_cash=10000.0)

    def test_market_order_execution(self):
        """Test market order execution."""
        # Create market buy order
        order = self.broker.buy(size=0.1)

        # Verify order created
        self.assertIsNotNone(order)
        self.assertEqual(order.exectype, bt.Order.Market)
        self.assertEqual(order.size, 0.1)

        # Simulate order execution
        self.broker._execute_order(order, price=50000.0)

        # Verify execution
        self.assertEqual(order.status, bt.Order.Completed)
        self.assertEqual(order.executed.price, 50000.0)
        self.assertEqual(order.executed.size, 0.1)

    def test_limit_order_execution(self):
        """Test limit order execution."""
        # Create limit buy order
        order = self.broker.buy(size=0.1, exectype=bt.Order.Limit, price=49000.0)

        # Test order doesn't execute above limit price
        self.broker._try_execute_order(order, current_price=50000.0)
        self.assertEqual(order.status, bt.Order.Accepted)

        # Test order executes at limit price
        self.broker._try_execute_order(order, current_price=48000.0)
        self.assertEqual(order.status, bt.Order.Completed)

    def test_stop_order_execution(self):
        """Test stop order execution."""
        # Create stop loss order
        order = self.broker.sell(size=0.1, exectype=bt.Order.Stop, price=48000.0)

        # Test order doesn't execute above stop price
        self.broker._try_execute_order(order, current_price=49000.0)
        self.assertEqual(order.status, bt.Order.Accepted)

        # Test order executes below stop price
        self.broker._try_execute_order(order, current_price=47000.0)
        self.assertEqual(order.status, bt.Order.Completed)

    def test_commission_calculation(self):
        """Test commission calculation."""
        # Set commission rate
        self.broker.setcommission(commission=0.001)  # 0.1%

        # Execute order
        order = self.broker.buy(size=0.1)
        self.broker._execute_order(order, price=50000.0)

        # Verify commission
        expected_commission = 0.1 * 50000.0 * 0.001  # size * price * rate
        self.assertAlmostEqual(order.executed.comm, expected_commission, places=2)

    def test_cash_and_value_tracking(self):
        """Test cash and portfolio value tracking."""
        initial_cash = self.broker.getcash()
        initial_value = self.broker.getvalue()

        # Execute buy order
        order = self.broker.buy(size=0.1)
        self.broker._execute_order(order, price=50000.0)

        # Verify cash reduction
        expected_cash = initial_cash - (0.1 * 50000.0) - order.executed.comm
        self.assertAlmostEqual(self.broker.getcash(), expected_cash, places=2)

        # Verify value remains approximately the same (minus commission)
        current_value = self.broker.getvalue()
        self.assertAlmostEqual(current_value, initial_value - order.executed.comm, places=2)

Integration Testing

Data Pipeline Integration

class TestDataPipelineIntegration(unittest.TestCase):
    """Test complete data pipeline integration."""

    def test_store_to_feed_integration(self):
        """Test data flow from store to feed to strategy."""
        from cracktrader import Cerebro, CCXTStore, CCXTDataFeed
        from tests.fixtures.fake_exchange import FakeExchange

        # Create fake exchange with test data
        fake_exchange = FakeExchange()
        fake_exchange.set_ohlcv_data([
            [1704067200000, 45000, 45500, 44500, 45200, 1000],
            [1704070800000, 45200, 45800, 45100, 45600, 1200],
            [1704074400000, 45600, 46100, 45300, 45900, 1500],
        ])

        # Create components
        store = CCXTStore(exchange='fake', exchange_instance=fake_exchange)
        feed = CCXTDataFeed(store=store, symbol='BTC/USDT', ccxt_timeframe='1h')

        # Test strategy that captures data
        class DataCaptureStrategy(bt.Strategy):
            def __init__(self):
                self.data_points = []

            def next(self):
                self.data_points.append({
                    'datetime': self.data.datetime.datetime(),
                    'open': self.data.open[0],
                    'high': self.data.high[0],
                    'low': self.data.low[0],
                    'close': self.data.close[0],
                    'volume': self.data.volume[0]
                })

        # Run integration
        cerebro = Cerebro()
        cerebro.adddata(feed)
        cerebro.addstrategy(DataCaptureStrategy)

        results = cerebro.run()
        strategy = results[0]

        # Verify data flow
        self.assertEqual(len(strategy.data_points), 3)
        self.assertEqual(strategy.data_points[0]['open'], 45000)
        self.assertEqual(strategy.data_points[-1]['close'], 45900)

    def test_multi_feed_integration(self):
        """Test multiple data feeds working together."""
        from cracktrader import Cerebro, CCXTStore, CCXTDataFeed
        from tests.fixtures.fake_exchange import FakeExchange

        # Create fake exchange
        fake_exchange = FakeExchange()

        # Set up different symbols
        btc_data = [[1704067200000, 45000, 45500, 44500, 45200, 1000]]
        eth_data = [[1704067200000, 2500, 2550, 2450, 2520, 500]]

        fake_exchange.set_symbol_data('BTC/USDT', btc_data)
        fake_exchange.set_symbol_data('ETH/USDT', eth_data)

        # Create components
        store = CCXTStore(exchange='fake', exchange_instance=fake_exchange)

        btc_feed = CCXTDataFeed(store=store, symbol='BTC/USDT', ccxt_timeframe='1h')
        eth_feed = CCXTDataFeed(store=store, symbol='ETH/USDT', ccxt_timeframe='1h')

        class MultiAssetStrategy(bt.Strategy):
            def next(self):
                btc_price = self.datas[0].close[0]
                eth_price = self.datas[1].close[0]

                # Simple ratio strategy
                ratio = btc_price / eth_price
                if ratio > 18:  # BTC expensive relative to ETH
                    self.sell(data=self.datas[0])  # Sell BTC
                    self.buy(data=self.datas[1])   # Buy ETH

        # Run integration
        cerebro = Cerebro()
        cerebro.adddata(btc_feed, name='BTC')
        cerebro.adddata(eth_feed, name='ETH')
        cerebro.addstrategy(MultiAssetStrategy)

        results = cerebro.run()
        self.assertIsNotNone(results)

Web API Integration

class TestWebAPIIntegration(unittest.TestCase):
    """Test web API integration workflows."""

    def setUp(self):
        """Set up test client."""
        from cracktrader.web.server.main import create_app
        from fastapi.testclient import TestClient

        app = create_app()
        self.client = TestClient(app)

    def test_complete_backtest_workflow(self):
        """Test complete backtest via API."""
        # 1. Check API health
        response = self.client.get("/api/v1/health")
        self.assertEqual(response.status_code, 200)

        # 2. List available strategies
        response = self.client.get("/api/v1/strategies")
        self.assertEqual(response.status_code, 200)
        strategies = response.json()
        self.assertIn("TestStrategy", strategies["strategies"])

        # 3. Start backtest
        backtest_request = {
            "strategy_name": "TestStrategy",
            "symbols": ["BTC/USDT"],
            "timeframe": "1h",
            "initial_cash": 10000.0
        }

        response = self.client.post("/api/v1/run/backtest", json=backtest_request)
        self.assertEqual(response.status_code, 200)

        result = response.json()
        run_id = result["run_id"]
        self.assertIsNotNone(run_id)

        # 4. Check status
        response = self.client.get("/api/v1/status")
        self.assertEqual(response.status_code, 200)

        status = response.json()
        self.assertEqual(status["run_id"], run_id)
        self.assertIn(status["status"], ["starting", "running", "completed"])

        # 5. Get results (may need to wait for completion)
        import time
        max_wait = 30  # seconds
        start_time = time.time()

        while time.time() - start_time < max_wait:
            response = self.client.get("/api/v1/status")
            status = response.json()

            if status["status"] == "completed":
                break

            time.sleep(1)

        # Get final results
        response = self.client.get("/api/v1/results")
        self.assertEqual(response.status_code, 200)

        results = response.json()
        self.assertEqual(results["run_id"], run_id)
        self.assertEqual(results["status"], "completed")

End-to-End Testing

Live Exchange Testing

class TestLiveExchangeIntegration(unittest.TestCase):
    """Test with real exchange connections (optional)."""

    def setUp(self):
        """Set up live exchange connection."""
        import os

        # Skip if no API credentials
        self.api_key = os.getenv('BINANCE_TESTNET_API_KEY')
        self.secret = os.getenv('BINANCE_TESTNET_SECRET')

        if not self.api_key or not self.secret:
            self.skipTest("Live exchange credentials not available")

    def test_live_data_feed(self):
        """Test live data feed with real exchange."""
        from cracktrader import CCXTStore, CCXTDataFeed

        store = CCXTStore(
            exchange_id='binance',
            config={
                'apiKey': self.api_key,
                'secret': self.secret,
                'sandbox': True  # Use testnet
            }
        )

        # Test market data access
        markets = store.exchange.load_markets()
        self.assertIn('BTC/USDT', markets)

        # Test historical data
        feed = CCXTDataFeed(
            store=store,
            symbol='BTC/USDT',
            timeframe='1h',
            start_date='2024-01-01T00:00:00Z',
            end_date='2024-01-02T00:00:00Z'
        )

        data = feed._load_data()
        self.assertIsNotNone(data)
        self.assertGreater(len(data), 0)

    def test_live_order_execution(self):
        """Test order execution on testnet."""
        from cracktrader import Cerebro, CCXTStore, CCXTDataFeed
        from cracktrader.broker import CCXTLiveBroker

        store = CCXTStore(
            exchange_id='binance',
            config={
                'apiKey': self.api_key,
                'secret': self.secret,
                'sandbox': True
            }
        )

        # Simple test strategy
        class TestOrderStrategy(bt.Strategy):
            def __init__(self):
                self.order_placed = False

            def next(self):
                if not self.order_placed and len(self.data) > 5:
                    # Place small test order
                    self.buy(size=0.001)  # Small BTC amount
                    self.order_placed = True

        # Create live trading setup
        cerebro = Cerebro()

        feed = CCXTDataFeed(store=store, symbol='BTC/USDT', timeframe='1m')
        cerebro.adddata(feed)

        broker = CCXTLiveBroker(store=store, mode='paper')
        cerebro.broker = broker

        cerebro.addstrategy(TestOrderStrategy)

        # Run for limited time
        import signal
        import time

        def timeout_handler(signum, frame):
            raise TimeoutError("Test timeout")

        signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(30)  # 30 second timeout

        try:
            results = cerebro.run()
            self.assertIsNotNone(results)
        except TimeoutError:
            pass  # Expected timeout
        finally:
            signal.alarm(0)

Performance Testing

class TestPerformance(unittest.TestCase):
    """Test performance characteristics."""

    def test_backtest_performance(self):
        """Test backtest execution time."""
        import time
        from cracktrader import Cerebro, CCXTStore, CCXTDataFeed
        from tests.fixtures.fake_exchange import FakeExchange

        # Create large dataset
        fake_exchange = FakeExchange()
        large_dataset = []

        # Generate 1000 candles
        base_time = 1704067200000  # 2024-01-01
        for i in range(1000):
            timestamp = base_time + (i * 3600000)  # 1 hour intervals
            ohlcv = [timestamp, 45000, 45500, 44500, 45200, 1000]
            large_dataset.append(ohlcv)

        fake_exchange.set_ohlcv_data(large_dataset)

        # Set up backtest
        store = CCXTStore(exchange_id='fake', exchange=fake_exchange)
        feed = CCXTDataFeed(store=store, symbol='BTC/USDT', timeframe='1h')

        cerebro = Cerebro()
        cerebro.adddata(feed)
        cerebro.addstrategy(bt.strategies.BuyAndHold)

        # Measure execution time
        start_time = time.time()
        results = cerebro.run()
        execution_time = time.time() - start_time

        # Performance assertions
        self.assertLess(execution_time, 10.0)  # Should complete in under 10 seconds
        self.assertIsNotNone(results)

    def test_memory_usage(self):
        """Test memory usage during backtests."""
        import psutil
        import os

        process = psutil.Process(os.getpid())
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB

        # Run memory-intensive backtest
        from cracktrader import Cerebro, CCXTStore, CCXTDataFeed
        from tests.fixtures.fake_exchange import FakeExchange

        # Large dataset with multiple feeds
        fake_exchange = FakeExchange()
        dataset = []
        for i in range(5000):  # 5000 candles
            timestamp = 1704067200000 + (i * 3600000)
            ohlcv = [timestamp, 45000, 45500, 44500, 45200, 1000]
            dataset.append(ohlcv)

        fake_exchange.set_ohlcv_data(dataset)

        cerebro = Cerebro()
        store = CCXTStore(exchange_id='fake', exchange=fake_exchange)

        # Add multiple feeds
        for symbol in ['BTC/USDT', 'ETH/USDT', 'ADA/USDT']:
            feed = CCXTDataFeed(store=store, symbol=symbol, timeframe='1h')
            cerebro.adddata(feed)

        cerebro.addstrategy(bt.strategies.BuyAndHold)

        # Run backtest
        results = cerebro.run()

        # Check memory usage
        final_memory = process.memory_info().rss / 1024 / 1024  # MB
        memory_increase = final_memory - initial_memory

        # Memory should not increase excessively (less than 500MB for this test)
        self.assertLess(memory_increase, 500)

Test Fixtures and Utilities

Fake Exchange Implementation

# tests/fixtures/fake_exchange.py
class FakeExchange:
    """Mock exchange for testing."""

    def __init__(self):
        self.id = 'fake'
        self.name = 'Fake Exchange'
        self.markets = {
            'BTC/USDT': {'symbol': 'BTC/USDT', 'base': 'BTC', 'quote': 'USDT'},
            'ETH/USDT': {'symbol': 'ETH/USDT', 'base': 'ETH', 'quote': 'USDT'},
        }
        self.timeframes = {'1m': '1m', '5m': '5m', '1h': '1h', '1d': '1d'}
        self._ohlcv_data = {}
        self._symbol_data = {}

        # Default realistic data
        self._set_default_data()

    def _set_default_data(self):
        """Set default realistic OHLCV data."""
        base_time = 1704067200000  # 2024-01-01

        for i, symbol in enumerate(['BTC/USDT', 'ETH/USDT']):
            data = []
            base_price = 45000 if 'BTC' in symbol else 2500

            for j in range(100):  # 100 candles
                timestamp = base_time + (j * 3600000)  # 1 hour intervals

                # Generate realistic OHLCV
                open_price = base_price + (j * 10) + (i * 1000)
                high_price = open_price + 200
                low_price = open_price - 150
                close_price = open_price + 50
                volume = 1000 + (j * 10)

                data.append([timestamp, open_price, high_price, low_price, close_price, volume])

            self._symbol_data[symbol] = data

    def set_ohlcv_data(self, data):
        """Set OHLCV data for default symbol."""
        self._ohlcv_data['BTC/USDT'] = data

    def set_symbol_data(self, symbol, data):
        """Set OHLCV data for specific symbol."""
        self._symbol_data[symbol] = data

    def load_markets(self):
        """Return available markets."""
        return self.markets

    def fetch_ohlcv(self, symbol, timeframe, since=None, limit=None):
        """Fetch OHLCV data."""
        # Check specific symbol data first
        if symbol in self._symbol_data:
            data = self._symbol_data[symbol]
        elif symbol in self._ohlcv_data:
            data = self._ohlcv_data[symbol]
        else:
            # Return default data
            data = self._symbol_data.get('BTC/USDT', [])

        # Apply time filtering
        if since:
            data = [candle for candle in data if candle[0] >= since]

        # Apply limit
        if limit:
            data = data[:limit]

        return data

    def create_order(self, symbol, type, side, amount, price=None):
        """Create mock order."""
        import uuid

        order = {
            'id': str(uuid.uuid4()),
            'symbol': symbol,
            'type': type,
            'side': side,
            'amount': amount,
            'price': price,
            'status': 'open',
            'filled': 0,
            'remaining': amount,
            'timestamp': 1704067200000,
        }

        # Simulate immediate execution for market orders
        if type == 'market':
            order['status'] = 'closed'
            order['filled'] = amount
            order['remaining'] = 0

            # Use realistic prices
            if 'BTC' in symbol:
                order['price'] = 45000
            elif 'ETH' in symbol:
                order['price'] = 2500

        return order

    def fetch_balance(self):
        """Return mock balance."""
        return {
            'USDT': {'free': 10000, 'used': 0, 'total': 10000},
            'BTC': {'free': 0, 'used': 0, 'total': 0},
            'ETH': {'free': 0, 'used': 0, 'total': 0},
        }

Test Data Generators

# tests/fixtures/data_generators.py
import pandas as pd
import numpy as np
from datetime import datetime, timedelta

def generate_realistic_ohlcv(
    symbol='BTC/USDT',
    start_date='2024-01-01',
    end_date='2024-01-31',
    timeframe='1h',
    initial_price=45000,
    volatility=0.02
):
    """Generate realistic OHLCV data for testing."""

    # Create date range
    freq_map = {'1m': '1T', '5m': '5T', '1h': '1H', '1d': '1D'}
    freq = freq_map.get(timeframe, '1H')

    dates = pd.date_range(start=start_date, end=end_date, freq=freq)

    # Generate price series using random walk
    returns = np.random.normal(0, volatility, len(dates))
    prices = [initial_price]

    for ret in returns[1:]:
        new_price = prices[-1] * (1 + ret)
        prices.append(max(new_price, 1))  # Prevent negative prices

    # Generate OHLCV data
    ohlcv_data = []

    for i, (date, price) in enumerate(zip(dates, prices)):
        timestamp = int(date.timestamp() * 1000)

        # Generate OHLC from close price
        close = price
        open_price = prices[i-1] if i > 0 else price

        # Add some intrabar movement
        high = max(open_price, close) * (1 + np.random.uniform(0, 0.01))
        low = min(open_price, close) * (1 - np.random.uniform(0, 0.01))

        # Generate volume (correlated with volatility)
        base_volume = 1000
        volume_multiplier = 1 + abs(returns[i]) * 10
        volume = base_volume * volume_multiplier

        ohlcv_data.append([timestamp, open_price, high, low, close, volume])

    return ohlcv_data

def generate_market_scenarios():
    """Generate different market scenarios for testing."""
    scenarios = {}

    # Bull market
    scenarios['bull_market'] = generate_realistic_ohlcv(
        initial_price=45000,
        volatility=0.015,
        start_date='2024-01-01',
        end_date='2024-03-01'
    )

    # Bear market
    scenarios['bear_market'] = generate_realistic_ohlcv(
        initial_price=45000,
        volatility=0.025,
        start_date='2024-01-01',
        end_date='2024-03-01'
    )

    # Sideways market
    scenarios['sideways_market'] = generate_realistic_ohlcv(
        initial_price=45000,
        volatility=0.01,
        start_date='2024-01-01',
        end_date='2024-03-01'
    )

    # High volatility
    scenarios['high_volatility'] = generate_realistic_ohlcv(
        initial_price=45000,
        volatility=0.05,
        start_date='2024-01-01',
        end_date='2024-03-01'
    )

    return scenarios

Best Practices

Test Organization

# Organize tests by behavior, not implementation
class TestOrderManagement:
    """Test order management behavior."""

    def test_market_order_immediate_execution(self):
        """Market orders should execute immediately at current price."""
        pass

    def test_limit_order_price_improvement(self):
        """Limit orders should execute when price improves."""
        pass

    def test_stop_order_activation(self):
        """Stop orders should activate when stop price is hit."""
        pass

class TestRiskManagement:
    """Test risk management behavior."""

    def test_position_size_limits(self):
        """Position sizes should respect maximum limits."""
        pass

    def test_stop_loss_execution(self):
        """Stop losses should execute when triggered."""
        pass

class TestDataIntegrity:
    """Test data quality and integrity."""

    def test_no_missing_timestamps(self):
        """Data feeds should not have missing timestamps."""
        pass

    def test_valid_ohlc_relationships(self):
        """OHLC data should maintain valid relationships."""
        pass

Mocking Strategy

# Mock external dependencies, not internal logic
class TestStrategy:
    @patch('cracktrader.store.CCXTStore')  # Mock external exchange
    def test_strategy_with_mocked_exchange(self, mock_store):
        # Set up mock behavior
        mock_store.return_value.fetch_ohlcv.return_value = [
            [1704067200000, 45000, 45500, 44500, 45200, 1000]
        ]

        # Test strategy behavior
        strategy = MyStrategy()
        # ... test logic

    def test_strategy_decision_logic(self):
        # Test strategy logic directly without mocking
        strategy = MyStrategy()
        strategy.sma = Mock(return_value=45000)
        strategy.data.close = Mock(return_value=45500)

        result = strategy._should_buy()
        self.assertTrue(result)

Performance Testing

def test_backtest_performance():
    """Test backtest performance characteristics."""
    import time
    import memory_profiler

    @memory_profiler.profile
    def run_backtest():
        cerebro = Cerebro()
        # ... setup backtest
        return cerebro.run()

    # Measure time
    start_time = time.time()
    results = run_backtest()
    execution_time = time.time() - start_time

    # Assert performance requirements
    assert execution_time < 30.0  # Should complete in under 30 seconds
    assert results is not None

Continuous Integration

GitHub Actions Configuration

# .github/workflows/test.yml
name: Test Suite

on: [push, pull_request]

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: [3.9, 3.10, 3.11, 3.12]

    steps:
    - uses: actions/checkout@v3

    - name: Set up Python ${{ matrix.python-version }}
      uses: actions/setup-python@v4
      with:
        python-version: ${{ matrix.python-version }}

    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -e .'[dev]'

    - name: Run unit tests
      run: |
        pytest tests/unit/ -v --cov=cracktrader --cov-report=xml

    - name: Run integration tests
      run: |
        pytest tests/integration/ -v

    - name: Upload coverage
      uses: codecov/codecov-action@v3
      with:
        file: ./coverage.xml

    - name: Run end-to-end tests (optional)
      if: ${{ github.event_name == 'push' && github.ref == 'refs/heads/main' }}
      env:
        BINANCE_TESTNET_API_KEY: ${{ secrets.BINANCE_TESTNET_API_KEY }}
        BINANCE_TESTNET_SECRET: ${{ secrets.BINANCE_TESTNET_SECRET }}
      run: |
        pytest tests/e2e/ -v --tb=short

Coverage Requirements

# pyproject.toml
[tool.coverage.run]
source = ["src/cracktrader"]
omit = [
    "*/tests/*",
    "*/test_*",
    "*/conftest.py",
]

[tool.coverage.report]
exclude_lines = [
    "pragma: no cover",
    "def __repr__",
    "raise AssertionError",
    "raise NotImplementedError",
]

[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
addopts = [
    "--strict-markers",
    "--strict-config",
    "--cov-fail-under=80",
]

Next Steps: - Strategy Development - Writing testable strategies - Broker Integration - Testing order execution - Advanced Configuration - Performance testing and optimization