Source code for base.models

import json
import logging
import sys
from concurrent.futures import ThreadPoolExecutor
from datetime import timezone as tz
from decimal import Decimal

import pandas as pd
from asgiref.sync import async_to_sync
from binance.spot import Spot
from celery import chord
from channels.layers import get_channel_layer
from django.conf import settings
from django.contrib.auth.models import AbstractUser
from django.core.cache import cache
from django.core.exceptions import ValidationError
from django.core.validators import MinValueValidator
from django.db import models, transaction
from django.template.loader import render_to_string
from django.urls import reverse
from django.utils import timezone
from django.utils.module_loading import import_string
from django.utils.safestring import mark_safe
from django_ai.supervised_learning.models import HGBTreeRegressor, OneClassSVC
from encore.concurrent.futures.synchronous import SynchronousExecutor
from requests.adapters import HTTPAdapter
from sklearn.model_selection import GridSearchCV

from .client import TraderoClient
from .exceptions import BotQuotaExceeded
from .indicators import get_indicators
from .strategies import get_strategies
from .utils import datetime_minutes_rounder

channel_layer = get_channel_layer()
logger = logging.getLogger(__name__)


def thread_pool_executor(
    threads=settings.EXECUTOR_THREADS,
):  # pragma: no cover
    if settings.SYNC_EXECUTION:
        # Meant to be used in testing
        return SynchronousExecutor()
    else:
        return ThreadPoolExecutor(max_workers=threads)


class User(AbstractUser):
    """
    Custom User Model for tradero
    """

    _client = None

    api_key = models.CharField(
        "Binance's API key", max_length=255, blank=True, null=True
    )
    api_secret = models.CharField(
        "Binance's API key secret", max_length=255, blank=True, null=True
    )
    trading_active = models.BooleanField("Is Trading Active?", default=True)
    checkpoint = models.DateTimeField(
        "Checkpoint",
        blank=True,
        null=True,
    )
    bot_quota = models.PositiveIntegerField(
        "Bots' Quota",
        default=settings.BOT_USER_QUOTA,
        help_text="Maximum Bots for the User (0 for No Quota)",
    )
    others = models.JSONField("Others", default=dict)

    class Meta:
        verbose_name = "User"
        verbose_name_plural = "Users"

    def get_client(self, reinit=False):  # pragma: no cover
        if not self._client or reinit:
            self._client = TraderoClient(self)
        return self._client

    @property
    def trade_summary(self):
        return TradeHistory.summary_for_object(self)

    @property
    def bot_status(self):
        return TraderoBot.status_summary(self.bots.all())

    @property
    def bot_count(self):
        return self.bots.all().count()

    @property
    def valuation_current(self):
        return TraderoBot.aggregate_valuation(self.bots.all(), "current")

    @property
    def valuation_initial(self):
        return TraderoBot.aggregate_valuation(self.bots.all(), "initial")

    def save(self, *args, **kwargs):
        created = True if not self.pk else False
        super().save(*args, **kwargs)
        if created:
            TraderoBotGroup.objects.create(user=self, name="Golfinhos")
            TraderoBotGroup.objects.create(user=self, name="Galinhas")


class WSClient(models.Model):
    channel_group = models.CharField(max_length=256)
    channel_name = models.CharField(max_length=256)
    time_connect = models.DateTimeField("Time - Connection", auto_now_add=True)
    time_disconnect = models.DateTimeField(
        "Time - Disconnection",
        blank=True,
        null=True,
    )

    class Meta:
        verbose_name = "WS Client"
        verbose_name_plural = "WS Clients"

    def __str__(self):
        return self.channel_name

    @property
    def is_open(self):
        return not self.time_disconnect


class SymbolManager(models.Manager):
    def available(self):
        return self.filter(
            status="TRADING",
            is_enabled=True,
        )

    def top_symbols(self, n=settings.SYMBOLS_QUANTITY):
        qs1 = self.filter(
            status="TRADING",
            is_enabled=True,
            model_score__gte=settings.MODEL_SCORE_THRESHOLD,
            volume_quote_asset__gte=settings.MARKET_SIZE_THRESHOLD,
        ).prefetch_related(
            models.Prefetch(
                "bots",
                queryset=TraderoBot.objects.enabled(),
                to_attr="bots_prefetched",
            ),
        )
        qs2 = self.filter(
            bots__status__gt=TraderoBot.Status.INACTIVE,
            bots__user__trading_active=True,
        ).prefetch_related(
            models.Prefetch(
                "bots",
                queryset=TraderoBot.objects.enabled(),
                to_attr="bots_prefetched",
            ),
        )
        return qs1.order_by("-model_score")[:n].union(qs2)

    def all_top_symbols(self, n=settings.SYMBOLS_QUANTITY):
        qs1 = self.filter(
            status="TRADING",
            is_enabled=True,
            volume_quote_asset__gte=settings.MARKET_SIZE_THRESHOLD,
        ).prefetch_related(
            models.Prefetch(
                "bots",
                queryset=TraderoBot.objects.enabled(),
                to_attr="bots_prefetched",
            ),
        )
        qs2 = self.filter(
            bots__status__gt=TraderoBot.Status.INACTIVE,
            bots__user__trading_active=True,
        ).prefetch_related(
            models.Prefetch(
                "bots",
                queryset=TraderoBot.objects.enabled(),
                to_attr="bots_prefetched",
            ),
        )
        return qs1.order_by("-model_score")[:n].union(qs2)


[docs]class Symbol(models.Model): _indicators = None _prediction_model_class = None _outliers_model_class = None _last_td = None _serializer_class = None _client = None symbol = models.CharField("Symbol", max_length=20) status = models.CharField("Status", max_length=20) is_enabled = models.BooleanField("Is Enabled?", default=True) base_asset = models.CharField("Base Asset", max_length=20) quote_asset = models.CharField("Quote Asset", max_length=20) volume_quote_asset = models.DecimalField( "Quote Asset Volume (24h)", max_digits=40, decimal_places=8, blank=True, null=True, ) last_value = models.DecimalField( "Last Value", max_digits=16, decimal_places=8, blank=True, null=True, ) last_value_time = models.DateTimeField( "Last Value (time)", blank=True, null=True, ) last_variation = models.DecimalField( "Last Variation", max_digits=16, decimal_places=8, blank=True, null=True, ) last_variation_24h = models.DecimalField( "Last Variation - 24 hs", max_digits=16, decimal_places=8, blank=True, null=True, ) variation_range_24h = models.DecimalField( "Variation Range - 24 hs", max_digits=16, decimal_places=8, blank=True, null=True, ) model_score = models.DecimalField( "Model Score", max_digits=16, decimal_places=8, blank=True, null=True, ) prediction_time_interval = models.CharField( "Prediction (time interval)", max_length=2, blank=True, null=True, ) prediction_time = models.DateTimeField( "Prediction (time)", blank=True, null=True, ) prediction_value = models.DecimalField( "Prediction (value)", max_digits=16, decimal_places=8, blank=True, null=True, ) prediction_variation = models.DecimalField( "Prediction (variation)", max_digits=16, decimal_places=8, blank=True, null=True, ) others = models.JSONField("Others", default=dict) info = models.JSONField("Symmbol's Binance's Info", default=dict) last_updated = models.DateTimeField( "Last Updated (timestamp)", blank=True, null=True, auto_now=True, ) objects = SymbolManager() class Meta: verbose_name = "Symbol" verbose_name_plural = "Symbols" ordering = ["symbol"] def __str__(self): return self.symbol @property def step_size(self): return Decimal(self.info["filters"][1]["stepSize"]) @classmethod def load_all_data( cls, start_time=None, end_time=None, threads=settings.EXECUTOR_THREADS ): symbols = cls.objects.available() with thread_pool_executor(threads) as executor: for symbol in symbols: executor.submit( symbol.load_data(start_time=start_time, end_time=end_time) ) logger.warning("-> Data loading complete <-") @classmethod def load_all_data_sync(cls): # pragma: no cover """ For debugging purposes """ symbols = cls.objects.all() for s in symbols: s.load_data() @classmethod def get_client(cls): # pragma: no cover if not cls._client: cls._client = Spot() cls._client.session.mount("https://", HTTPAdapter(pool_maxsize=36)) return cls._client def load_data(self, start_time=None, end_time=None): Kline.load_klines(self, start_time=start_time, end_time=end_time) td = TrainingData.from_klines(self) logger.warning(f"{self}: Data loaded") return td def get_prediction_model(self): if not self._prediction_model_class: self._prediction_model_class = import_string( settings.PREDICTION_MODEL_CLASS ) r, c = self._prediction_model_class.objects.get_or_create( symbol=self, name=self.symbol, data_model="base.TrainingData", ) return r def get_indicators(self): if not self._indicators: self._indicators = get_indicators() return self._indicators def get_serializer_class(self): if not self._serializer_class: # pragma: no cover self._serializer_class = import_string(settings.SERIALIZER_CLASS) return self._serializer_class def get_outlier_classifiers(self): if not self._outliers_model_class: self._outliers_model_class = import_string( settings.OUTLIERS_MODEL_CLASS ) o_m = self._outliers_model_class o1, _ = o_m.objects.get_or_create( symbol=self, name=f"{self.symbol}_o1", data_model="base.TrainingData", window=1, defaults={ o_m.OUTLIERS_PROPORTION_PARAMETER: settings.OUTLIERS_THRESHOLD, }, ) o2, _ = o_m.objects.get_or_create( symbol=self, name=f"{self.symbol}_o2", data_model="base.TrainingData", window=2, defaults={ o_m.OUTLIERS_PROPORTION_PARAMETER: settings.OUTLIERS_THRESHOLD, }, ) o3, _ = o_m.objects.get_or_create( symbol=self, name=f"{self.symbol}_o3", data_model="base.TrainingData", window=3, defaults={ o_m.OUTLIERS_PROPORTION_PARAMETER: settings.OUTLIERS_THRESHOLD, }, ) return [o1, o2, o3] def update_and_classify_outliers(self): os = self.get_outlier_classifiers() os_res = {} for o in os: o.perform_inference() os_res[f"o{o.window}"] = o.classify_current() return os_res def update_prediction_model(self): predictor = self.get_prediction_model() predictor.perform_inference() self.model_score = predictor.metadata["inference"]["current"][ "scores" ]["r2"] return predictor def update_indicators( self, push=True, last_td=None, bot_early_notification=False ): self._last_td = ( last_td or self.training_data.first() # negative ordering) ) klines_24h = self.klines.order_by("time_close").filter( time_close__gte=timezone.now() - timezone.timedelta(days=1) ) self.volume_quote_asset = klines_24h.aggregate( vol=models.Sum("volume_quote_asset") )["vol"] self.last_variation = self._last_td.variation if klines_24h: # pragma: no cover self.last_variation_24h = ( self._last_td.price_close / klines_24h[0].price_close * 100 ) - 100 ceil_floor = klines_24h.aggregate( models.Min("price_low", default=1), models.Max("price_high", default=1), ) self.variation_range_24h = ( ceil_floor["price_high__max"] / ceil_floor["price_low__min"] * 100 ) - 100 self.last_value = self._last_td.price_close self.last_value_time = self._last_td.time + timezone.timedelta( seconds=1 ) self.others["last_3"] = [ float(d) for d in [ getattr(self._last_td, f"variation_{i:02d}") for i in range(1, 4) ] ] if not self.model_score: # There should always be a model score for the symbol, even if # prediction is not enabled predictor = self.update_prediction_model() if settings.PREDICTION_ENABLED: predictor = self.update_prediction_model() self.prediction_variation = Decimal(predictor.predict_next()) self.prediction_value = self.last_value * ( Decimal("1") + self.prediction_variation / 100 ) self.prediction_time_interval = settings.TIME_INTERVAL # TODO: Better parametrize the prediction time self.prediction_time = self._last_td.time + timezone.timedelta( minutes=settings.TIME_INTERVAL, seconds=1 ) if settings.OUTLIERS_ENABLED: self.others["outliers"] = self.update_and_classify_outliers() for indicator in self.get_indicators(): self.others[indicator] = self.get_indicators()[indicator]( self ).calculate() self.render_html_snippet(set_cache=True) self.render_json_snippet(set_cache=True) self.save() # Early bot notification if available if bot_early_notification: # pragma: no cover if getattr(self, "bots_prefetched", []): price = self.get_client().ticker_price(symbol=self.symbol)[ "price" ] # Re-fetch to avoid errors for bot in self.bots.enabled(): bot.price_current = Decimal(price) bot.symbol = self bot.decide() if push: async_to_sync(self.push_to_ws)() # Use warning to make sure it goes logger.warning(f"{self}: Indicators Updated.") async def push_to_ws(self): await channel_layer.group_send( "symbols_html", { "type": "symbol.html.message", "message": { "type": "symbol_update", "symbol": self.symbol, "text": self.render_html_snippet(), }, }, ) await channel_layer.group_send( "symbols_json", { "type": "symbol.json.message", "message": self.render_json_snippet(), }, ) def render_html_snippet(self, set_cache=False): cache_key = f"{self.symbol}_html" html = cache.get(cache_key) if not html or set_cache: html = render_to_string( "base/symbol_snippet.html", { "symbol": self, "settings": { "PREDICTION_ENABLED": settings.PREDICTION_ENABLED, "OUTLIERS_ENABLED": settings.OUTLIERS_ENABLED, }, }, ) cache.set(cache_key, html, settings.TIME_INTERVAL * 60 + 9) return html def render_json_snippet(self, set_cache=False): cache_key = f"{self.symbol}_json" j = cache.get(cache_key) if not j or set_cache: serializer = self.get_serializer_class() j = json.dumps( serializer().to_representation(self, set_cache=True) ) cache.set(cache_key, j, settings.TIME_INTERVAL * 60 + 9) return j def retrieve_and_update(self, push=False): cache_key = ( f"{settings.SYMBOLS_UPDATE_ALL_INDICATORS_KEY}_{self.symbol}" ) if not cache.get(cache_key, False) or "pytest" in sys.modules: cache.set(cache_key, True, 300) tds = self.load_data() last_td = tds[0] if tds else None self.update_indicators(push=push, last_td=last_td) cache.set(cache_key, False) message = f"{self.symbol}: Retrieved and Updated" else: # pragma: no cover message = f"Other process updating {self.symbol} is running, please wait." logger.warning(message) return message @classmethod def datarotate(cls): date_threshold = timezone.now() - timezone.timedelta( minutes=settings.TIME_INTERVAL * settings.CLEANING_WINDOW ) k = Kline.objects.filter(time_close__lt=date_threshold).delete() td = TrainingData.objects.filter(time__lt=date_threshold).delete() message = f"SYMBOLS DATAROTATE: {k[0]} Klines and {td[0]} cleaned " logger.warning(message) return message @classmethod def update_all_indicators( cls, only_top=False, push=True, model_score_threshold=settings.MODEL_SCORE_THRESHOLD, threads=settings.EXECUTOR_THREADS, ): if only_top: symbols = cls.objects.all_top_symbols() else: symbols = cls.objects.available() if settings.USE_TASKS: from base.tasks import ( retrieve_and_update_symbol, symbols_datarotate, ) header = [ retrieve_and_update_symbol.s(s.pk, push=push) for s in symbols ] callback = symbols_datarotate.si() chord(header, callback).apply_async() message = f"UPDATE ALL SYMBOLS' INDICATORS: {len(header)} Tasks submitted" else: cache_key = settings.SYMBOLS_UPDATE_ALL_INDICATORS_KEY if not cache.get(cache_key, False) or "pytest" in sys.modules: cache.set(cache_key, True, 2400) timestamp = timezone.now() with thread_pool_executor(threads) as executor: for symbol in symbols: executor.submit(symbol.retrieve_and_update, push=push) logger.warning( f"-> UPDATE ALL PREDICTIONS DONE (MST: {model_score_threshold}) <-" ) cls.datarotate() message = ( f"---> Elapsed Time: " f"{ (timezone.now() - timestamp).total_seconds() } " f"({len(symbols)} Symbols) <----" ) cache.set(cache_key, False) else: # pragma: no cover message = ( "Other process updating all indicators is running, please " "wait." ) logger.warning(message) return message @classmethod def general_warm_up( cls, n_periods=10, symbols=None, threads=settings.EXECUTOR_THREADS, ): now = timezone.now() for i in range(n_periods, 0, -1): logger.info(f"Period: {i}") start_time = now - timezone.timedelta( minutes=i * settings.TIME_INTERVAL * 1000 ) start_time = start_time.replace( minute=( (start_time.minute // settings.TIME_INTERVAL) * settings.TIME_INTERVAL ), second=0, microsecond=0, ) end_time = ( start_time + timezone.timedelta(minutes=settings.TIME_INTERVAL * 1000) - timezone.timedelta(seconds=1) ) Kline.load_all_klines(start_time=start_time, end_time=end_time) symbols = symbols or cls.objects.available() with thread_pool_executor(threads) as executor: for symbol in symbols: executor.submit(TrainingData.from_klines, symbol) logger.warning("-> WARMING UP COMPLETE <-") @classmethod def calibrate_all_windows( cls, symbols=None, threads=settings.EXECUTOR_THREADS, ): symbols = symbols or cls.objects.available() with thread_pool_executor(threads) as executor: for symbol in symbols: pm = symbol.get_prediction_model() executor.submit(pm.calibrate_window) logger.warning("-> ALL WINDOWS CALIBRATION COMPLETE <-") @classmethod def calibrate_all_models( cls, symbols=None, only_top=False, threads=settings.EXECUTOR_THREADS, ): symbols = symbols or cls.objects.available() with thread_pool_executor(threads) as executor: for symbol in symbols: pm = symbol.get_prediction_model() executor.submit(pm.calibrate_model) logger.warning("-> ALL MODEL CALIBRATION COMPLETE <-") @classmethod def reset_symbols( cls, symbols=None, ): symbols = symbols or cls.objects.available() for symbol in symbols: symbol.training_data.all().delete() symbol.klines.all().delete() logger.warning(f"{symbol}: RESET") @classmethod def load_symbols(cls): cache_key = settings.SYMBOLS_UPDATE_ALL_INDICATORS_KEY if not cache.get(cache_key, False) or "pytest" in sys.modules: cache.set(cache_key, True, 2400) client = cls.get_client() ei = client.exchange_info() symbols_processed, symbols_created = 0, 0 for symbol in ei["symbols"]: if symbol["symbol"].endswith(settings.QUOTE_ASSET): s, c = cls.objects.update_or_create( symbol=symbol["symbol"], defaults={ "status": symbol["status"], "base_asset": symbol["baseAsset"], "quote_asset": symbol["quoteAsset"], "info": symbol, }, ) symbols_processed += 1 if c: symbols_created += 1 message = ( f"Successfully processed {symbols_processed} symbols " f"({symbols_created} created)" ) cache.set(cache_key, False) else: # pragma: no cover message = ( "Other process updating all indicators is running, please " "wait." ) logger.warning(message) return message
[docs]class Kline(models.Model): _client = None symbol = models.ForeignKey( Symbol, verbose_name="Symbol", related_name="klines", on_delete=models.PROTECT, ) time_open = models.DateTimeField("Open Time") time_close = models.DateTimeField("Close Time") time_interval = models.CharField("Time Interval", max_length=2) price_open = models.DecimalField( "Price Open", max_digits=40, decimal_places=8 ) price_high = models.DecimalField( "Price High", max_digits=40, decimal_places=8 ) price_low = models.DecimalField( "Price Low", max_digits=40, decimal_places=8 ) price_close = models.DecimalField( "Price Close", max_digits=40, decimal_places=8 ) volume = models.DecimalField("Volume", max_digits=40, decimal_places=8) volume_quote_asset = models.DecimalField( "Quote Asset Volume", max_digits=40, decimal_places=8 ) volume_tb_base_asset = models.DecimalField( "Taker buy base asset volume", max_digits=40, decimal_places=8 ) volume_tb_quote_asset = models.DecimalField( "Taker buy quote asset volume Price", max_digits=40, decimal_places=8 ) number_of_trades = models.DecimalField( "Number of Trades", max_digits=40, decimal_places=8 ) variation = models.DecimalField( "Variation", max_digits=40, decimal_places=8 ) variation_range = models.DecimalField( "Variation Range", max_digits=40, decimal_places=8 ) class Meta: verbose_name = "Kline" verbose_name_plural = "Klines" ordering = ["-time_close"] # Descending ordering constraints = [ models.UniqueConstraint( fields=[ "symbol", "time_close", "time_interval", ], # leverage index on time_close name="klines_unique_symbol_time", ) ] def __str__(self): f_str = ( f"[{self.symbol}] {self.time_open.strftime('%Y-%m-%d')} | " f"{self.time_open.strftime('%H:%M:%S')} - " f"{self.time_close.strftime('%H:%M:%S')} | {self.price_open} - " f"{self.price_close} | {self.variation}%" ) return f_str @classmethod def get_client(cls): if not cls._client: cls._client = Spot() cls._client.session.mount("https://", HTTPAdapter(pool_maxsize=36)) return cls._client @classmethod def from_binance_kline(cls, symbol, time_interval, b_kline): """ Binance's response: [ 1499040000000, // Kline open time "0.01634790", // Open price "0.80000000", // High price "0.01575800", // Low price "0.01577100", // Close price "148976.11427815", // Volume 1499644799999, // Kline Close time "2434.19055334", // Quote asset volume 308, // Number of trades "1756.87402397", // Taker buy base asset volume "28.46694368", // Taker buy quote asset volume "0" // Unused field, ignore. ] """ kline = cls( symbol=symbol, time_open=timezone.datetime.fromtimestamp( b_kline[0] // 1000, tz=tz.utc ), time_close=timezone.datetime.fromtimestamp( b_kline[6] // 1000, tz=tz.utc ), time_interval=time_interval, price_open=b_kline[1], price_high=b_kline[2], price_low=b_kline[3], price_close=b_kline[4], volume=b_kline[5], volume_quote_asset=b_kline[7], number_of_trades=b_kline[8], volume_tb_base_asset=b_kline[9], volume_tb_quote_asset=b_kline[10], variation=((Decimal(b_kline[4]) / Decimal(b_kline[1])) * 100) - 100, variation_range=((Decimal(b_kline[2]) / Decimal(b_kline[3])) * 100) - 100, ) return kline @classmethod def from_binance_kline_ws( cls, symbol, ws_kline, save=True ): # pragma: no cover """ Binance's response: { "e": "kline", // Event type "E": 123456789, // Event time "s": "BNBBTC", // Symbol "k": { "t": 123400000, // Kline start time "T": 123460000, // Kline close time "s": "BNBBTC", // Symbol "i": "1m", // Interval "f": 100, // First trade ID "L": 200, // Last trade ID "o": "0.0010", // Open price "c": "0.0020", // Close price "h": "0.0025", // High price "l": "0.0015", // Low price "v": "1000", // Base asset volume "n": 100, // Number of trades "x": false, // Is this kline closed? "q": "1.0000", // Quote asset volume "V": "500", // Taker buy base asset volume "Q": "0.500", // Taker buy quote asset volume "B": "123456" // Ignore } } """ kline = cls( symbol=symbol, time_open=timezone.datetime.fromtimestamp( ws_kline["k"]["t"] // 1000, tz=tz.utc ), time_close=timezone.datetime.fromtimestamp( ws_kline["k"]["T"] // 1000, tz=tz.utc ), time_interval=ws_kline["k"]["i"], price_open=ws_kline["k"]["o"], price_high=ws_kline["k"]["h"], price_low=ws_kline["k"]["l"], price_close=ws_kline["k"]["c"], volume=ws_kline["k"]["v"], volume_quote_asset=ws_kline["k"]["q"], number_of_trades=ws_kline["k"]["n"], volume_tb_base_asset=ws_kline["k"]["V"], volume_tb_quote_asset=ws_kline["k"]["Q"], variation=( (Decimal(ws_kline["k"]["c"]) / Decimal(ws_kline["k"]["o"])) * 100 ) - 100, variation_range=( (Decimal(ws_kline["k"]["h"]) / Decimal(ws_kline["k"]["l"])) * 100 ) - 100, ) if save: kline.save() logger.warning(f"{symbol}: Created Kline: {kline}.") return kline @classmethod def load_klines( cls, symbol, interval=settings.TIME_INTERVAL, start_time=None, end_time=None, limit=1000, ): last_kline = symbol.klines.first() if last_kline and not start_time: logger.warning(f"{symbol}: Last kline is {last_kline}") start_time = datetime_minutes_rounder(last_kline.time_close) if not start_time: start_time = timezone.now() - timezone.timedelta( minutes=settings.TIME_INTERVAL * 1000 ) start_time = start_time.replace( minute=( (start_time.minute // settings.TIME_INTERVAL) * settings.TIME_INTERVAL ), second=0, microsecond=0, ) if not end_time: end_time = timezone.now() end_time = end_time.replace( minute=( (end_time.minute // settings.TIME_INTERVAL) * settings.TIME_INTERVAL ), second=0, microsecond=0, ) - timezone.timedelta(seconds=1) logger.warning( f"{symbol}: Requesting klines from {start_time} to {end_time}." ) client = cls.get_client() klines_binance = client.klines( symbol.symbol, interval=f"{interval}m", startTime=int(start_time.timestamp() * 1000), endTime=int(end_time.timestamp() * 1000), limit=1000, ) klines, times = [], set() for k_b in klines_binance: kline = cls.from_binance_kline(symbol, f"{interval}m", k_b) klines.append(kline) times.add(kline.time_open.replace(tzinfo=tz.utc)) existing_data = [ t.replace(tzinfo=tz.utc) for t in cls.objects.filter( symbol=symbol, time_open__in=times, time_interval=f"{interval}m", ).values_list("time_open", flat=True) ] klines = [k for k in klines if k.time_open not in existing_data] created = cls.objects.bulk_create(klines) logger.warning( f"{symbol}: Received {len(klines_binance)} klines, " f"created {len(created)}." ) return sorted(created, key=lambda x: x.time_close, reverse=True) @classmethod def load_all_klines( cls, interval=settings.TIME_INTERVAL, start_time=None, end_time=None, model_score=settings.MODEL_SCORE_THRESHOLD, symbols=None, threads=settings.EXECUTOR_THREADS, ): symbols = symbols or Symbol.objects.available() with thread_pool_executor(threads) as executor: for symbol in symbols: executor.submit( cls.load_klines, symbol, interval=settings.TIME_INTERVAL, start_time=start_time, end_time=end_time, )
[docs]class TrainingData(models.Model): WINDOW = 20 LEARNING_TARGET = "variation" LEARNING_FIELDS = [ "variation_01", "variation_02", "variation_03", "variation_04", "variation_05", "variation_06", "variation_07", "variation_08", "variation_09", "variation_10", "variation_11", "variation_12", "variation_13", "variation_14", "variation_15", "variation_16", "variation_17", "variation_18", "variation_19", "variation_20", "variation_range_01", "variation_range_02", "variation_range_03", "variation_range_04", "variation_range_05", "variation_range_06", "variation_range_07", "variation_range_08", "variation_range_09", "variation_range_10", "variation_range_11", "variation_range_12", "variation_range_13", "variation_range_14", "variation_range_15", "variation_range_16", "variation_range_17", "variation_range_18", "variation_range_19", "variation_range_20", ] symbol = models.ForeignKey( Symbol, verbose_name="Symbol", related_name="training_data", on_delete=models.PROTECT, ) time = models.DateTimeField("Time (End of Interval)") time_interval = models.CharField("Time Interval", max_length=2) price_close = models.DecimalField( "Price - End of Interval", max_digits=16, decimal_places=8, ) variation = models.DecimalField( "Variation (t)", max_digits=40, decimal_places=8 ) variation_01 = models.DecimalField( "Variation (t-1)", max_digits=40, decimal_places=8 ) variation_02 = models.DecimalField( "Variation (t-2)", max_digits=40, decimal_places=8 ) variation_03 = models.DecimalField( "Variation (t-3)", max_digits=40, decimal_places=8 ) variation_04 = models.DecimalField( "Variation (t-4)", max_digits=40, decimal_places=8 ) variation_05 = models.DecimalField( "Variation (t-5)", max_digits=40, decimal_places=8 ) variation_06 = models.DecimalField( "Variation (t-6)", max_digits=40, decimal_places=8 ) variation_07 = models.DecimalField( "Variation (t-7)", max_digits=40, decimal_places=8 ) variation_08 = models.DecimalField( "Variation (t-8)", max_digits=40, decimal_places=8 ) variation_09 = models.DecimalField( "Variation (t-9)", max_digits=40, decimal_places=8 ) variation_10 = models.DecimalField( "Variation (t-10)", max_digits=40, decimal_places=8 ) variation_11 = models.DecimalField( "Variation (t-11)", max_digits=40, decimal_places=8 ) variation_12 = models.DecimalField( "Variation (t-12)", max_digits=40, decimal_places=8 ) variation_13 = models.DecimalField( "Variation (t-13)", max_digits=40, decimal_places=8 ) variation_14 = models.DecimalField( "Variation (t-14)", max_digits=40, decimal_places=8 ) variation_15 = models.DecimalField( "Variation (t-15)", max_digits=40, decimal_places=8 ) variation_16 = models.DecimalField( "Variation (t-16)", max_digits=40, decimal_places=8 ) variation_17 = models.DecimalField( "Variation (t-17)", max_digits=40, decimal_places=8 ) variation_18 = models.DecimalField( "Variation (t-18)", max_digits=40, decimal_places=8 ) variation_19 = models.DecimalField( "Variation (t-19)", max_digits=40, decimal_places=8 ) variation_20 = models.DecimalField( "Variation (t-20)", max_digits=40, decimal_places=8 ) variation_range_01 = models.DecimalField( "Variation Range (t-1)", max_digits=40, decimal_places=8 ) variation_range_02 = models.DecimalField( "Variation Range (t-2)", max_digits=40, decimal_places=8 ) variation_range_03 = models.DecimalField( "Variation Range (t-3)", max_digits=40, decimal_places=8 ) variation_range_04 = models.DecimalField( "Variation Range (t-4)", max_digits=40, decimal_places=8 ) variation_range_05 = models.DecimalField( "Variation Range (t-5)", max_digits=40, decimal_places=8 ) variation_range_06 = models.DecimalField( "Variation Range (t-6)", max_digits=40, decimal_places=8 ) variation_range_07 = models.DecimalField( "Variation Range (t-7)", max_digits=40, decimal_places=8 ) variation_range_08 = models.DecimalField( "Variation Range (t-8)", max_digits=40, decimal_places=8 ) variation_range_09 = models.DecimalField( "Variation Range (t-9)", max_digits=40, decimal_places=8 ) variation_range_10 = models.DecimalField( "Variation Range (t-10)", max_digits=40, decimal_places=8 ) variation_range_11 = models.DecimalField( "Variation Range (t-11)", max_digits=40, decimal_places=8 ) variation_range_12 = models.DecimalField( "Variation Range (t-12)", max_digits=40, decimal_places=8 ) variation_range_13 = models.DecimalField( "Variation Range (t-13)", max_digits=40, decimal_places=8 ) variation_range_14 = models.DecimalField( "Variation Range (t-14)", max_digits=40, decimal_places=8 ) variation_range_15 = models.DecimalField( "Variation Range (t-15)", max_digits=40, decimal_places=8 ) variation_range_16 = models.DecimalField( "Variation Range (t-16)", max_digits=40, decimal_places=8 ) variation_range_17 = models.DecimalField( "Variation Range (t-17)", max_digits=40, decimal_places=8 ) variation_range_18 = models.DecimalField( "Variation Range (t-18)", max_digits=40, decimal_places=8 ) variation_range_19 = models.DecimalField( "Variation Range (t-19)", max_digits=40, decimal_places=8 ) variation_range_20 = models.DecimalField( "Variation Range (t-20)", max_digits=40, decimal_places=8 ) class Meta: verbose_name = "Training Data" verbose_name_plural = "Training Data" ordering = ["-time"] # Descending ordering constraints = [ models.UniqueConstraint( fields=["symbol", "time", "time_interval"], name="unique_symbol_time", ) ] def __str__(self): return ( f"[{self.symbol}] {self.time} ({self.time_interval}): " f"{self.variation}% [Window: {self.WINDOW}]" ) @classmethod def from_klines(cls, symbol): """ Only one Training Data will be generated for each kline """ columns = [ "time_close", "time_interval", "price_close", "variation", "variation_range", ] last_td = symbol.training_data.order_by("time").last() if not last_td: ts = ( symbol.klines.all() .order_by("-time_close") .values_list(*columns) ) else: ts = ( symbol.klines.filter( time_open__gte=last_td.time - timezone.timedelta( minutes=settings.TIME_INTERVAL * (cls.WINDOW + 1), seconds=1, ) ) .order_by("-time_close") .values_list(*columns) ) df = pd.DataFrame(ts, columns=columns) for i in range(1, cls.WINDOW + 1): df[f"variation_{i:02d}"] = df["variation"].shift(-i) # for i in range(1, cls.WINDOW + 1): df[f"variation_range_{i:02d}"] = df["variation_range"].shift(-i) # Drop rows where there is a NaN - TODO: slow, check df = df.dropna(axis=0) tds, times, time_intervals = [], set(), set() for row in df.values.tolist(): tds.append( cls( symbol=symbol, time=row[0], time_interval=row[1], price_close=row[2], variation=row[3], variation_01=row[4], variation_02=row[5], variation_03=row[6], variation_04=row[7], variation_05=row[8], variation_06=row[9], variation_07=row[10], variation_08=row[11], variation_09=row[12], variation_10=row[13], variation_11=row[14], variation_12=row[15], variation_13=row[16], variation_14=row[17], variation_15=row[18], variation_16=row[19], variation_17=row[20], variation_18=row[21], variation_19=row[22], variation_20=row[23], variation_range_01=row[24], variation_range_02=row[25], variation_range_03=row[26], variation_range_04=row[27], variation_range_05=row[28], variation_range_06=row[29], variation_range_07=row[30], variation_range_08=row[31], variation_range_09=row[32], variation_range_10=row[33], variation_range_11=row[34], variation_range_12=row[35], variation_range_13=row[36], variation_range_14=row[37], variation_range_15=row[38], variation_range_16=row[39], variation_range_17=row[40], variation_range_18=row[41], variation_range_19=row[42], variation_range_20=row[43], ) ) times.add(row[0]) time_intervals.add(row[1]) existing_data = cls.objects.filter( symbol=symbol, time__in=times, time_interval__in=time_intervals ).values_list("time", flat=True) tds_to_create = [td for td in tds if td.time not in existing_data] created = cls.objects.bulk_create(tds_to_create) if len(created) > 0: logger.warning(f"{symbol}: Created {len(created)} Training Data.") else: logger.warning( f"{symbol}: TD already exist: {tds} {tds_to_create}" ) return sorted(created, key=lambda x: x.time, reverse=True)
class TraderoMixin: # Hack in the meantime, review LearningTechnique prediction for regression def h_predict(self, obs): eo = self.get_engine_object() if isinstance(obs, dict): obs = [[obs[key] for key in obs]] else: obs = [[o[key] for key in o] for o in obs] return eo.predict(obs) def td_to_dict(self, td): obs = {"variation": td.variation} for i in range(1, self.window): obs[f"variation_{i:02d}"] = getattr(td, f"variation_{i:02d}") return obs def _get_data_queryset(self): data_model = self._get_data_model() qs = data_model.objects.filter(symbol=self.symbol) return qs class PredictionModel(TraderoMixin, models.Model): CALIBRATION_PARAMS = {} symbol = models.ForeignKey( "Symbol", verbose_name="Symbol", related_name="%(app_label)s_%(class)s_related", related_query_name="%(app_label)s_%(class)ss", on_delete=models.PROTECT, ) def get_targets(self): targets = super().get_targets() return [float(t) for t in targets] def predict_next(self): last_td = self.symbol.training_data.first() obs = { "variation_01": last_td.variation, "variation_range_01": None, } for i in range(2, self.get_window_size() + 1): obs[f"variation_{i:02d}"] = getattr( last_td, f"variation_{(i - 1):02d}" ) obs[f"variation_range_{i:02d}"] = getattr( last_td, f"variation_range_{(i - 1):02d}" ) pred = self.h_predict(obs) return pred[0] def predict_next_times(self, n): """ Assumption: n < self.WINDOW """ last_td = self.symbol.training_data.first() preds = [] obss = [] preds.append(self.predict_next()) for i in range(2, n + 1): obs = {} for w in range(1, self.get_window_size() + 1): try: obs[f"variation_{w:02d}"] = preds[w - 1] obs[f"variation_range_{w:02d}"] = None except Exception: obs[f"variation_{w:02d}"] = getattr( last_td, f"variation_{w - i + 1:02d}" ) obs[f"variation_range_{w:02d}"] = getattr( last_td, f"variation_range_{w - i + 1:02d}" ) obss.append(obs) preds.append(self.h_predict(obs)[0]) return (preds, obss) def calibrate_window(self, save=True): scores = [] for i in range(1, TrainingData.WINDOW + 1): self.learning_fields = ", ".join( [f"variation_{j:02d}" for j in range(1, i + 1)] ) self.learning_fields = ( self.learning_fields + ", " + ", ".join( [f"variation_range_{j:02d}" for j in range(1, i + 1)] ) ) self.perform_inference(save=False) score = self.metadata["inference"]["current"]["scores"]["r2"] scores.append(score) logger.warning(f"[{self.symbol}] Window size: {i}: {score}") win_size = scores.index(max(scores)) + 1 logger.warning( f"[{self.symbol}] Best available window size: {win_size} - {scores}" ) self.learning_fields = ", ".join( [f"variation_{j:02d}" for j in range(1, win_size + 1)] ) self.learning_fields = ( self.learning_fields + ", " + ", ".join( [f"variation_range_{j:02d}" for j in range(1, win_size + 1)] ) ) if save: # pragma: no cover self.save() def calibrate_model(self, save=True): """ TOOD: Review this """ print("Before:", self.symbol.model_score) gs_eo = GridSearchCV(self.get_engine_object(), self.CALIBRATION_PARAMS) data, targets = self.get_data(), self.get_targets() gs_eo.fit(data, targets) for param, value in gs_eo.best_params_.items(): setattr(self, param, value) logger.warning(f"{self.symbol}: Best params: {gs_eo.best_params_}") self.perform_inference(save=False) print("After:", self.metadata["inference"]["current"]["scores"]["r2"]) if save: # pragma: no cover self.save() return gs_eo.best_params_ def get_window_size(self): return TrainingData.WINDOW class Meta: abstract = True
[docs]class DecisionTreeRegressor(PredictionModel, HGBTreeRegressor): """ https://scikit-learn.org/stable/modules/ensemble.html#histogram-based-gradient-boosting """ CALIBRATION_PARAMS = { "loss": ["squared_error", "absolute_error"], "learning_rate": [0.1, 0.2, 0.3], # "max_iter": [100, 200, ], # "max_leaf_nodes": [31, None, ], "l2_regularization": [ 0, 0.1, 0.2, 0.4, ], } class Meta: verbose_name = "Decision Trees Regressor" verbose_name_plural = "Decision Trees Regressors"
class OutlierDetectionModel(TraderoMixin, models.Model): OUTLIERS_PROPORTION_PARAMETER = None symbol = models.ForeignKey( "Symbol", verbose_name="Symbol", related_name="%(app_label)s_%(class)s_related", related_query_name="%(app_label)s_%(class)ss", on_delete=models.PROTECT, ) window = models.IntegerField( "Window", default=1, help_text=("Window to be used to consider outlier"), ) def save(self, *args, **kwargs): self.learning_fields = ", ".join( ["variation"] + [f"variation_{j:02d}" for j in range(1, self.window)] ) super().save(*args, **kwargs) def classify_current(self): last_td = self.symbol.training_data.last() pred = self.h_predict(self.td_to_dict(last_td)) return bool(pred[0] < 0) class Meta: abstract = True
[docs]class OutliersSVC(OutlierDetectionModel, OneClassSVC): """ https://scikit-learn.org/stable/modules/generated/sklearn.svm.OneClassSVM.html#sklearn.svm.OneClassSVM """ OUTLIERS_PROPORTION_PARAMETER = "nu" class Meta: verbose_name = "Outliers SVC" verbose_name_plural = "Outliers SVCs"
class TraderoBotGroupManager(models.Manager): def get_queryset(self): return ( super() .get_queryset() .prefetch_related( models.Prefetch( "bots", queryset=TraderoBot.objects.order_by("name") ) ) ) class TraderoBotGroup(models.Model): user = models.ForeignKey( User, verbose_name="User", related_name="botgroups", on_delete=models.PROTECT, ) name = models.CharField( "Name", max_length=255, blank=True, null=True, ) others = models.JSONField("Others", default=dict, blank=True) objects = TraderoBotGroupManager() class Meta: verbose_name = "Tradero Bots Group" verbose_name_plural = "Tradero Bots Groups" def __str__(self): return f"{self.name}" def get_absolute_url(self): return reverse("base:botzinhos-group-detail", kwargs={"pk": self.pk}) @property def are_all_off(self): for bot in self.bots.all(): if bot.status != TraderoBot.Status.INACTIVE: return False return True def on(self): for bot in self.bots.all(): if bot.status == TraderoBot.Status.INACTIVE: bot.on() return True def off(self): for bot in self.bots.all(): if bot.status != TraderoBot.Status.INACTIVE: bot.off() return True def liquidate(self): for bot in self.bots.all(): if bot.status == TraderoBot.Status.SELLING: bot.sell() bot.off() elif bot.status == TraderoBot.Status.BUYING: bot.off() return True def jump(self, to_symbol): for bot in self.bots.all(): if not bot.price_buying: bot.jump(to_symbol) return True def reset_soft(self): for bot in self.bots.all(): bot.reset_soft() return True @property def valuation_current(self): return TraderoBot.aggregate_valuation(self.bots.all(), "current") @property def valuation_initial(self): return TraderoBot.aggregate_valuation(self.bots.all(), "initial") @property def bot_status(self): bots = self.bots.all() # prefetched return TraderoBot.status_summary(bots) def update_bots(self): cache_key = f"{settings.BOTS_UPDATE_GROUP_KEY}_{self.pk}" if not cache.get(cache_key, False) or "pytest" in sys.modules: cache.set(cache_key, True, 60) client = Spot() bots = self.bots.enabled() if bots: symbols = list( Symbol.objects.filter(bots__in=bots) .distinct() .values_list("symbol", flat=True) ) prices = { t["symbol"]: t["price"] for t in client.ticker_price(symbols=symbols) } for bot in bots: bot.price_current = Decimal(prices[bot.symbol.symbol]) bot.decide() message = ( f"GROUP [{self.pk}] {self}: {len(bots)} bots updated." ) else: # pragma: no cover message = f"GROUP [{self.pk}] {self}: No bots were found." cache.set(cache_key, False) else: # pragma: no cover message = ( f"GROUP [{self.pk}] {self}: Other process updating bots for " "the group is running, please wait." ) logger.warning(message) return message class TraderoBotManager(models.Manager): def enabled(self): return self.get_queryset().exclude( models.Q(status=TraderoBot.Status.INACTIVE) | models.Q(user__trading_active=False) ) def get_queryset(self): return super().get_queryset().select_related("symbol", "user")
[docs]class TraderoBot(models.Model): _client = None _strategies = get_strategies() _last_klines = None class Status(models.IntegerChoices): INACTIVE = 0, "Inactive" BUYING = 1, "Buying" SELLING = 2, "Selling" class Action(models.IntegerChoices): ERROR = -1, "Error" HOLD = 0, "Hold" BUY = 1, "Buy" SELL = 2, "Sell" JUMP = 3, "Jump" TURN_ON = 4, "Turn ON" TURN_OFF = 5, "Turn OFF" RESET = 6, "Reset" user = models.ForeignKey( User, verbose_name="User", related_name="bots", on_delete=models.PROTECT, ) group = models.ForeignKey( TraderoBotGroup, verbose_name="Group", related_name="bots", on_delete=models.PROTECT, ) symbol = models.ForeignKey( Symbol, verbose_name="Symbol", related_name="bots", on_delete=models.PROTECT, ) name = models.CharField( "Name", max_length=255, blank=True, null=True, ) strategy = models.CharField("Strategy", max_length=50, default="acmadness") strategy_params = models.CharField( "Strategy parameters", max_length=510, blank=True, null=True, help_text=mark_safe( "(<a href='https://tradero.readthedocs.io/en/latest" "/trading_bots.html' target='_blank'>Strategy docs</a>)" ), ) is_jumpy = models.BooleanField("Jumpy?", default=False) jumpy_whitelist = models.CharField( "Jumpy Symbols' Whitelist", max_length=1024, blank=True, null=True ) jumpy_blacklist = models.CharField( "Jumpy Symbols' Blacklist", max_length=1024, blank=True, null=True ) should_reinvest = models.BooleanField("Should Reinvest?", default=True) should_stop = models.BooleanField( "Should Stop After Selling?", default=False ) is_dummy = models.BooleanField("Dummy?", default=True) status = models.SmallIntegerField( "Bot Status", choices=Status.choices, default=Status.INACTIVE ) fund_base_asset = models.DecimalField( "Fund (Base Asset)", max_digits=40, decimal_places=8, blank=True, null=True, ) fund_quote_asset = models.DecimalField( "Fund (Quote Asset)", max_digits=40, decimal_places=8, validators=[MinValueValidator(Decimal(15))], blank=True, null=True, ) fund_quote_asset_initial = models.DecimalField( "Initial Fund (Quote Asset)", max_digits=40, decimal_places=8, validators=[MinValueValidator(Decimal(15))], ) price_buying = models.DecimalField( "Buying Price the Base Asset", max_digits=40, decimal_places=8, blank=True, null=True, ) price_current = models.DecimalField( "Current Price of the Base Asset", max_digits=40, decimal_places=8, blank=True, null=True, ) price_selling = models.DecimalField( "Selling Price of the Base Asset", max_digits=40, decimal_places=8, blank=True, null=True, ) timestamp_start = models.DateTimeField( "Timestamp of Start Buying", blank=True, null=True, ) timestamp_buying = models.DateTimeField( "Timestamp of Buying", blank=True, null=True, ) timestamp_selling = models.DateTimeField( "Timestamp of Selling", blank=True, null=True, ) receipt_buying = models.JSONField( "Receipt - Buying", default=dict, blank=True, null=True ) receipt_selling = models.JSONField( "Receipt - Selling", default=dict, blank=True, null=True ) others = models.JSONField("Others", default=dict) class Meta: verbose_name = "Tradero Bot" verbose_name_plural = "Tradero Bots" objects = TraderoBotManager() def __str__(self): return ( f"[{self.id:03d}] {self.name}: {self.symbol.symbol} | " f"{self.get_status_display()}" ) def get_absolute_url(self): return reverse("base:botzinhos-detail", kwargs={"pk": self.pk}) def save(self, *args, **kwargs): if not self.pk: if ( self.user.bot_quota > 0 and self.user.bot_count >= self.user.bot_quota ): raise BotQuotaExceeded self.others["ws_group_name"] = f"bots_html_{self.user.username}" if self.jumpy_whitelist: self.jumpy_whitelist = self.jumpy_whitelist.upper() if self.jumpy_blacklist: self.jumpy_blacklist = self.jumpy_blacklist.upper() super().save(*args, **kwargs) if not self.name: self.name = f"{settings.BOT_DEFAULT_NAME}-{self.pk:03d}" super().save(*args, **kwargs) self.render_html_snippet(set_cache=True) async_to_sync(self.push_to_ws)() @property def valuation_current(self): if self.receipt_buying and self.fund_base_asset and self.price_current: return self.fund_base_asset * Decimal(self.price_current) else: return self.fund_quote_asset or self.fund_quote_asset_initial @property def valuation_initial(self): return self.fund_quote_asset_initial @property def fund_quote_asset_exec(self): if self.receipt_buying: return Decimal(self.receipt_buying["cummulativeQuoteQty"]) return None @property def fund_base_asset_executable(self): # Executable valuated if self.fund_base_asset: return ( self.fund_base_asset // self.symbol.step_size ) * self.symbol.step_size return None @property def time_selling(self): # pragma: no cover if self.timestamp_buying: return timezone.now() - self.timestamp_buying return None @property def time_buying(self): if self.timestamp_start and not self.timestamp_buying: return timezone.now() - self.timestamp_start return None def get_client(self, reinit=False): # pragma: no cover if not self._client or reinit: self._client = self.user.get_client() return self._client def get_strategy(self, symbol=None): return self._strategies[self.strategy]( self, symbol, **self.get_strategy_params() ) def get_strategy_params(self): params = {} if self.strategy_params: for pv in self.strategy_params.split(","): pv = pv.split("=") params[pv[0]] = pv[1] return params def on(self): if self.price_buying: self.status = self.Status.SELLING else: self.timestamp_start = self.timestamp_start or timezone.now() self.status = self.Status.BUYING self.local_memory_reset() self.log_trade() self.log(self.Action.TURN_ON, "Turned ON") self.save() return True def off(self): self.status = self.Status.INACTIVE self.local_memory_reset() self.log(self.Action.TURN_OFF, "Turned OFF") self.save() return True def reset_hard(self): self.log_trade(cancelled=True) self.status = self.Status.INACTIVE self.receipt_buying, self.receipt_selling = None, None self.fund_base_asset, self.fund_quote_asset = None, None self.price_buying, self.price_selling, self.price_current = ( None, None, None, ) self.timestamp_start, self.timestamp_buying, self.timestamp_selling = ( None, None, None, ) self.log(self.Action.RESET, "RESET - HARD") self.save() return True def reset_soft(self): self.log_trade(cancelled=True) if self.receipt_buying: rb = self.get_client().parse_receipt(self.receipt_buying) self.symbol = Symbol.objects.get(symbol=rb["symbol"]) # Reset self.timestamp_buying = rb["timestamp"] self.price_buying = rb["price_net"] self.price_current = self.get_current_price() self.fund_base_asset = rb["quantity_rec"] self.fund_quote_asset = None if self.receipt_selling: rs = self.get_client().parse_receipt(self.receipt_selling) if rs["symbol"] != self.symbol.symbol: self.log( self.Action.ERROR, "ERROR - Hard reset or manual intervention required", ) self.off() return False self.log(self.Action.RESET, "RESET - SOFT") if self.receipt_buying and not self.receipt_selling: self.buy() # Resume from receipt elif self.receipt_buying and self.receipt_selling: self.sell() # Resume from receipt else: self.fund_base_asset = None self.price_buying, self.price_selling, self.price_current = ( None, None, None, ) self.timestamp_buying, self.timestamp_selling = ( None, None, ) if not self.timestamp_start: self.timestamp_start = timezone.now() self.save() self.on() return True def buy(self): client = self.get_client() if not self.receipt_buying: amount = self.fund_quote_asset or self.fund_quote_asset_initial success, receipt, message = client.tradero_buy( self.symbol, amount, dummy=self.is_dummy, ) if success: self.receipt_buying = receipt self.log( self.Action.BUY, message="Buying Transaction done sucessfully", ) self.save() else: self.log(self.Action.ERROR, message=message) self.save() return False else: self.log( self.Action.BUY, message="Buying Transaction already done, resuming from receipt", ) self.save() try: with transaction.atomic(): r = client.parse_receipt(self.receipt_buying) self.timestamp_buying = r["timestamp"] if not self.timestamp_start: # pragma: no cover self.timestamp_start = self.timestamp_buying self.price_buying = r["price_net"] self.fund_base_asset = r["quantity_rec"] self.log(self.Action.BUY) self.log_trade() # Set new state self.fund_quote_asset = ( None # Unexecuted FQA is not logged when Buying ) if self.status != self.Status.INACTIVE: self.status = self.Status.SELLING self.save() return True except Exception as e: message = str(e) self.log(self.Action.ERROR, message=message) self.log( self.Action.ERROR, message=( "Problems encountered, turning the bot off, " "please check the logs and either reset or adjust manually" ), ) self.off() self.save() return False def sell(self): client = self.get_client() if not self.receipt_selling: success, receipt, message = client.tradero_sell( self.symbol, self.fund_base_asset, dummy=self.is_dummy, ) if success: self.receipt_selling = receipt self.save() self.log( self.Action.SELL, message="Selling Transaction done sucessfully", ) self.save() else: self.log(self.Action.ERROR, message=message) self.save() return False else: self.log( self.Action.SELL, message="Selling Transaction already done, resuming from receipt", ) self.save() try: with transaction.atomic(): r = client.parse_receipt(self.receipt_selling) self.price_selling = r["price_net"] self.fund_quote_asset = r["quantity_rec"] self.timestamp_selling = r["timestamp"] self.log(self.Action.SELL) self.log_trade() # Set new state self.fund_base_asset = ( self.fund_base_asset - r["quantity_exec"] ) if not self.should_reinvest: self.fund_quote_asset = self.fund_quote_asset_initial self.timestamp_selling, self.timestamp_buying = None, None self.receipt_selling, self.receipt_buying = None, None self.price_selling, self.price_buying = None, None if self.status != self.Status.INACTIVE: self.status = self.Status.BUYING if self.should_stop: self.status = self.Status.INACTIVE self.timestamp_start = None else: self.timestamp_start = timezone.now() self.save() return True except Exception as e: message = str(e) self.log(self.Action.ERROR, message=message) self.log( self.Action.ERROR, message=( "Problems encountered, turning the bot off, " "please check and either reset or adjust manually" ), ) self.off() self.save() return False def jump(self, to_symbol): if self.receipt_buying: self.log( self.Action.HOLD, f"Holding JUMP to {to_symbol} due to already have bought", ) self.save() return False current_symbol = self.symbol fba = self.fund_base_asset fba_msg = ( f" (leaving {fba:.6f} {current_symbol.symbol} behind)" if fba else "" ) self.symbol = to_symbol self.fund_base_asset = None self.price_current = self.get_current_price() self.local_memory_reset() self.log( self.Action.JUMP, f"Jumped from {current_symbol} to {to_symbol}{fba_msg}", ) self.save() return True def decide(self): strategy = self.get_strategy() self.local_memory_update(strategy) if self.status == self.Status.BUYING: should_buy, message = strategy.evaluate_buy() if should_buy: self.buy() return if self.is_jumpy: should_jump, symbol = strategy.evaluate_jump() if should_jump: self.jump(symbol) self.get_strategy() # Update values self.decide() return message += " and no other symbol to go was found." if self.status == self.Status.SELLING: should_sell, message = strategy.evaluate_sell() if should_sell: self.sell() return self.log(self.Action.HOLD, message) self.save() def local_memory_update(self, strategy=None): strategy = strategy or self.get_strategy() strategy.local_memory_update() def has_local_memory(self, symbol=None, strategy=None): symbol = symbol or self.symbol strategy = strategy or self.get_strategy() return strategy.has_local_memory(symbol) def local_memory_reset(self): self.others["local_memory"] = {} def get_local_memory(self, symbol=None): symbol = symbol or self.symbol if self.others.get("local_memory", {}): return self.others["local_memory"].get(symbol.symbol, {}) else: self.others["local_memory"] = {} return self.others["local_memory"] def set_local_memory(self, symbol=None, value={}): symbol = symbol or self.symbol self.others["local_memory"][self.symbol.symbol] = value def get_current_price(self): client = self.get_client() price = client.ticker_price(symbol=self.symbol.symbol)["price"] return Decimal(price) def log(self, action, message=None): log = TraderoBotLog( bot=self, is_dummy=self.is_dummy, status=self.status, action=action, fund_base_asset=self.fund_base_asset, fund_quote_asset=self.fund_quote_asset, price_buying=self.price_buying, price_current=self.price_current, message=message, ) if action == self.Action.BUY and not message: log.message = ( f"Bought {self.fund_base_asset:.3f} of {self.symbol} at " f"{self.price_buying:.6f} (" f"{self.fund_base_asset * self.price_buying:.6f} " f"{self.symbol.quote_asset})" ) if action == self.Action.SELL and not message: log.price_selling = self.price_selling log.variation = (self.price_selling / self.price_buying - 1) * 100 log.message = ( f"BOOM! Sold {self.fund_base_asset:.3f} of {self.symbol} at " f"{self.price_selling:.6f} - VAR: {log.variation:.3f}%" ) log.save() last_logs = self.others.get("last_logs", []) last_logs.append(f"{log.timestamp:%Y-%m-%d %H:%M:%S}| {log.message}") self.others["last_logs"] = last_logs[-4:] def log_trade(self, cancelled=False): trade_history, _ = self.trades.update_or_create( user=self.user, timestamp_start=self.timestamp_start, defaults={ "is_dummy": self.is_dummy, "symbol": self.symbol, "strategy": self.strategy, "strategy_params": self.strategy_params, "timestamp_start": self.timestamp_start, "timestamp_buying": self.timestamp_buying, "timestamp_selling": self.timestamp_selling, "timestamp_cancelled": (timezone.now() if cancelled else None), "fund_base_asset": self.fund_base_asset, "price_buying": self.price_buying, "price_selling": self.price_selling, "receipt_buying": self.receipt_buying, "receipt_selling": self.receipt_selling, }, ) trade_history.save() return trade_history def get_last_log_message(self): last_log = self.logs.last() if last_log: return last_log.message return None def get_jumpy_blacklist(self): if self.jumpy_blacklist: return [s.strip().upper() for s in self.jumpy_blacklist.split(",")] return [] def get_jumpy_whitelist(self): if self.jumpy_whitelist: return [s.strip().upper() for s in self.jumpy_whitelist.split(",")] return [] def render_html_snippet(self, set_cache=False): cache_key = f"bot_{self.pk}_html" html = cache.get(cache_key) if not html or set_cache: html = render_to_string( "base/bot_snippet.html", { "bot": self, }, ) cache.set(cache_key, html, settings.TIME_INTERVAL_BOTS * 60 + 9) return html async def push_to_ws(self): bot = f"botzinho-{self.pk}" text = self.render_html_snippet() await channel_layer.group_send( self.others["ws_group_name"], { "type": "bot.html.message", "message": { "type": "bot_update", "bot": bot, "text": text, }, }, ) def clean(self): super().clean() # Check validity of jumpy_whitelist jumpy_whitelist = self.get_jumpy_whitelist() if jumpy_whitelist: if len(Symbol.objects.filter(symbol__in=jumpy_whitelist)) < len( jumpy_whitelist ): raise ValidationError( {"jumpy_whitelist": "Unrecognized Symbols."} ) # Check validity of jumpy_blacklist jumpy_blacklist = self.get_jumpy_blacklist() if jumpy_blacklist: if len(Symbol.objects.filter(symbol__in=jumpy_blacklist)) < len( jumpy_blacklist ): raise ValidationError( {"jumpy_blacklist": "Unrecognized Symbols."} ) # Check vailidity of strategy params try: strategy = self.get_strategy() except Exception: raise ValidationError( { "strategy_params": ( "Unrecognized parameters or format for the strategy " "- or missing symbol for the bot" ) } ) for param in self.get_strategy_params(): if param not in strategy.params: raise ValidationError( { "strategy_params": ( "Unrecognized parameters or format for the strategy" ) } ) @classmethod def update_all_bots(cls, groups=None, threads=settings.EXECUTOR_THREADS): timestamp = timezone.now() groups = groups or TraderoBotGroup.objects.all() if settings.USE_TASKS: from base.tasks import bots_logrotate, update_bots_group_job header = [update_bots_group_job.s(group.pk) for group in groups] callback = bots_logrotate.si() chord(header, callback).apply_async() message = "UPDATE ALL BOTS: Tasks submitted" else: with thread_pool_executor(threads) as executor: for group in groups: executor.submit(group.update_bots) cls.logrotate() logger.warning("-> UPDATE ALL BOTS DONE <-") message = ( f"---> Elapsed Time: " f"{ (timezone.now() - timestamp).total_seconds() } " f"({len(groups)} Groups) <----" ) logger.warning(message) return message @classmethod def status_summary(cls, bots_qs): bots = bots_qs.all() status = { "BUYING": 0, "SELLING": 0, "INACTIVE": 0, "TOTAL": 0, } for bot in bots: status[bot.get_status_display().upper()] += 1 status["TOTAL"] += 1 return status @classmethod def aggregate_valuation(cls, bots_qs, type="current"): valuations = [ getattr(bot, f"valuation_{type}", 0) for bot in bots_qs.all() ] return sum(valuations) @classmethod def logrotate(cls): if settings.CLEANING_WINDOW_BOTS_LOGS > 0: date_threshold = timezone.now() - timezone.timedelta( minutes=settings.TIME_INTERVAL_BOTS * settings.CLEANING_WINDOW_BOTS_LOGS ) r = TraderoBotLog.objects.filter( timestamp__lt=date_threshold ).delete() message = f"BOTS LOGROTATE: {r[0]} logs cleaned " logger.warning(message) return message return False # pragma: no cover
class TraderoBotLog(models.Model): bot = models.ForeignKey( TraderoBot, verbose_name="User", related_name="logs", on_delete=models.PROTECT, ) is_dummy = models.BooleanField("Dummy?") timestamp = models.DateTimeField("Timestamp", auto_now_add=True) status = models.SmallIntegerField( "Bot Status", choices=TraderoBot.Status.choices, ) action = models.SmallIntegerField( "Bot Action", choices=TraderoBot.Action.choices, ) fund_base_asset = models.DecimalField( "Fund (Base Asset)", max_digits=40, decimal_places=8, blank=True, null=True, ) fund_quote_asset = models.DecimalField( "Fund (Quote Asset)", max_digits=40, decimal_places=8, blank=True, null=True, ) price_buying = models.DecimalField( "Buying Price the Base Asset (Net)", max_digits=40, decimal_places=8, blank=True, null=True, ) price_current = models.DecimalField( "Current Market Price of the Base Asset", max_digits=40, decimal_places=8, blank=True, null=True, ) price_selling = models.DecimalField( "Selling Price of the Base Asset (Net)", max_digits=40, decimal_places=8, blank=True, null=True, ) variation = models.DecimalField( "Porcentual Variation between Buying and Selling Price", max_digits=40, decimal_places=8, blank=True, null=True, ) message = models.CharField( "Extra message (others)", max_length=2048, blank=True, null=True, ) class Meta: verbose_name = "Tradero Bot Log" verbose_name_plural = "Tradero Bots Logs" def __str__(self): return ( f"{self.bot} {'[[DUMMY]]' if self.is_dummy else ''} | " f"[{self.get_action_display()}] | {self.timestamp} " f"|| {self.message}" ) def save(self, *args, **kwargs): super().save(*args, **kwargs) logger.warning(str(self)) class TradeHistoryManager(models.Manager): def get_queryset(self): return super().get_queryset().select_related("symbol", "user", "bot") class TradeHistory(models.Model): """ Highly denormalized on purpose """ _client = None user = models.ForeignKey( User, verbose_name="User", related_name="trades", on_delete=models.PROTECT, ) bot = models.ForeignKey( TraderoBot, verbose_name="Bot", related_name="trades", on_delete=models.PROTECT, ) is_dummy = models.BooleanField("Dummy?") symbol = models.ForeignKey( Symbol, verbose_name="Symbol", related_name="trades_history", on_delete=models.PROTECT, ) strategy = models.CharField( "Strategy", max_length=50, blank=True, null=True, ) strategy_params = models.CharField( "Strategy parameters", max_length=255, blank=True, null=True, ) timestamp_start = models.DateTimeField( "Timestamp - Start", blank=True, null=True, ) timestamp_buying = models.DateTimeField( "Timestamp - Buying", blank=True, null=True, ) timestamp_selling = models.DateTimeField( "Timestamp - Selling", blank=True, null=True, ) timestamp_cancelled = models.DateTimeField( "Timestamp - Cancelled", blank=True, null=True, ) fund_base_asset = models.DecimalField( "Fund (Base Asset)", max_digits=40, decimal_places=8, blank=True, null=True, ) fund_base_asset_exec = models.DecimalField( "Fund (Base Asset) Executed", max_digits=40, decimal_places=8, blank=True, null=True, ) fund_base_asset_unexec = models.DecimalField( "Fund (Base Asset) Unexecuted", max_digits=40, decimal_places=8, blank=True, null=True, ) fund_quote_asset_exec = models.DecimalField( "Fund (Quote Asset) Executed", max_digits=40, decimal_places=8, blank=True, null=True, ) fund_quote_asset_return = models.DecimalField( "Fund (Quote Asset) Returned", max_digits=40, decimal_places=8, blank=True, null=True, ) price_buying = models.DecimalField( "Net Buying Price the Base Asset", max_digits=40, decimal_places=8, blank=True, null=True, ) price_selling = models.DecimalField( "Net Selling Price of the Base Asset", max_digits=40, decimal_places=8, blank=True, null=True, ) commission_buying = models.DecimalField( "Comission (Buying)", max_digits=40, decimal_places=8, blank=True, null=True, ) commission_buying_asset = models.CharField( "Comission Asset (Buying)", max_length=10, blank=True, null=True, ) commission_selling = models.DecimalField( "Comission (Selling)", max_digits=40, decimal_places=8, blank=True, null=True, ) commission_selling_asset = models.CharField( "Comission Asset (Selling)", max_length=10, blank=True, null=True, ) receipt_buying = models.JSONField( "Receipt - Buying", default=dict, blank=True, null=True ) receipt_selling = models.JSONField( "Receipt - Selling", default=dict, blank=True, null=True ) is_complete = models.BooleanField("Is Trade Complete?", default=False) variation_price = models.DecimalField( "Porcentual Variation between Buying and Selling Price", max_digits=40, decimal_places=8, blank=True, null=True, ) variation_quote_asset = models.DecimalField( "Variation of the Quote Asset", max_digits=40, decimal_places=8, blank=True, null=True, ) gain_quote_asset = models.DecimalField( "Gain of Quote Asset", max_digits=40, decimal_places=8, blank=True, null=True, ) duration_seeking = models.DurationField( "Elapsed time looking for buy", blank=True, null=True, ) duration_trade = models.DurationField( "Elapsed time between Buying and Selling", blank=True, null=True, ) duration_total = models.DurationField( "Total Elapsed time since start", blank=True, null=True, ) class Meta: verbose_name = "Trade History" verbose_name_plural = "Trades History" indexes = [ models.Index(fields=["-timestamp_selling"]), ] constraints = [ models.UniqueConstraint( fields=["user", "bot", "timestamp_start"], name="unique_user_bot_timestamp_start", ) ] objects = TradeHistoryManager() def __str__(self): return ( f"{self.user}: #{self.bot.id} {'[[DUMMY]]' if self.is_dummy else ''} |" f" {self.symbol.symbol} | {self.timestamp_start} - " f"{self.timestamp_selling or '...'}:: Var: {self.variation}% " f"Var FQA: {self.variation_quote_asset}" ) def save(self, *args, **kwargs): if self.receipt_buying: rb = self.get_client().parse_receipt(self.receipt_buying) self.timestamp_buying = rb["timestamp"] self.fund_quote_asset_exec = rb["quantity_exec"] self.commission_buying = rb["commission"] self.commission_buying_asset = rb["commission_asset"] self.fund_base_asset = rb["quantity_rec"] if self.receipt_buying and self.receipt_selling: self.is_complete = True rs = self.get_client().parse_receipt(self.receipt_selling) self.timestamp_selling = rs["timestamp"] self.fund_base_asset_exec = rs["quantity_exec"] self.fund_base_asset_unexec = ( self.fund_base_asset - self.fund_base_asset_exec ) self.commission_selling = rs["commission"] self.commission_selling_asset = rs["commission_asset"] self.fund_quote_asset_return = rs["quantity_rec"] self.gain_quote_asset = ( self.fund_quote_asset_return - self.fund_quote_asset_exec ) self.variation_quote_asset = ( self.fund_quote_asset_return / self.fund_quote_asset_exec - 1 ) * 100 self.variation_price = ( rs["price_net"] / rb["price_net"] - 1 ) * 100 if self.timestamp_buying and self.timestamp_start: self.duration_seeking = ( self.timestamp_buying - self.timestamp_start ) if ( self.timestamp_selling and self.timestamp_buying and self.timestamp_start ): self.duration_trade = ( self.timestamp_selling - self.timestamp_buying ) self.duration_total = self.timestamp_selling - self.timestamp_start super().save(*args, **kwargs) def get_client(self, reinit=False): # pragma: no cover if not self._client or reinit: self._client = self.user.get_client() return self._client @classmethod def summary(cls, trades_qs, checkpoint=None): result = {"rows": {}} dates = { "24h": timezone.now() - timezone.timedelta(days=1), "1w": timezone.now() - timezone.timedelta(days=7), "checkpoint": checkpoint, "alltime": None, } for label, t in dates.items(): t_qs = trades_qs.filter(is_complete=True) if t: t_qs = t_qs.filter(timestamp_selling__gte=t) result["rows"][label] = t_qs.aggregate( gain_quote_asset_total=models.Sum("gain_quote_asset"), variation_average=models.Avg("variation_quote_asset"), duration_total_average=models.Avg("duration_total"), trades_quantity=models.Count("pk"), ) result["meta"] = { "descriptions": { "rows": { "24h": "Last 24 Hours", "1w": "Last Week", "checkpoint": "Checkpoint", "alltime": "All-Time", }, "cols": { "gain_quote_asset_total": "Total Gains", "variation_average": "Avg. Var", "duration_total_average": "Avg. Duration", "trades_quantity": "#Trades", }, } } return result @classmethod def summary_for_object(cls, obj=None): if isinstance(obj, TraderoBot): qs = cls.objects.filter(bot=obj) checkpoint = obj.user.checkpoint elif isinstance(obj, User): qs = cls.objects.filter(user=obj) checkpoint = obj.checkpoint else: # TraderoBotGroup object qs = cls.objects.filter(bot__group=obj) checkpoint = obj.user.checkpoint result = { "object": obj, "cp": checkpoint, "dummy": cls.summary(qs.filter(is_dummy=True), checkpoint), "real": cls.summary(qs.filter(is_dummy=False), checkpoint), } return result