Source code for base.management.commands.ws_klines

import logging

from django.conf import settings
from websocketclient.management.commands import runwebsocketclient

from base.models import Symbol
from base.websocket_client import BinanceWebSocketClient

logger = logging.getLogger(__name__)


class Command(runwebsocketclient.Command):
    """Run the websocket client against a combined kline stream.

    Before the client starts, the command optionally resets stored symbol
    state, warms up and refreshes indicators through the ``Symbol`` model,
    and points the client at a ``/stream?streams=...`` path built from the
    current top symbols.
    """

    # Accepted spellings (compared case-insensitively) for boolean options.
    _TRUE_STRINGS = frozenset({"1", "true", "t", "yes", "y", "on"})
    _FALSE_STRINGS = frozenset({"", "0", "false", "f", "no", "n", "off"})

    def __init__(self, websocket_client=None, *args, **kwargs):
        """Create the command.

        Args:
            websocket_client: Optional pre-built client to use. When omitted
                (or falsy) a ``BinanceWebSocketClient`` writing to this
                command's ``stdout`` is created.
            *args: Forwarded to the parent command.
            **kwargs: Forwarded to the parent command.
        """
        super().__init__(*args, **kwargs)
        # The parent initialiser runs first because the default client is
        # built from ``self.stdout`` (presumably set up by the base command).
        self.client = websocket_client or BinanceWebSocketClient(
            stdout=self.stdout
        )

    @staticmethod
    def _str_to_bool(value: str) -> bool:
        """Convert a command-line string into a real boolean.

        ``type=bool`` must not be used with argparse: ``bool("False")`` is
        ``True`` because any non-empty string is truthy, which made it
        impossible to switch an option off from the command line.

        Raises:
            ValueError: If ``value`` is not a recognised boolean spelling.
                argparse turns this into a regular usage error.
        """
        normalized = value.strip().lower()
        if normalized in Command._TRUE_STRINGS:
            return True
        if normalized in Command._FALSE_STRINGS:
            return False
        raise ValueError(f"not a boolean value: {value!r}")

    def add_arguments(self, parser):
        """Register the ``--periods``, ``--reset-ms`` and ``--clean`` options."""
        parser.add_argument(
            "--periods",
            action="store",
            default=settings.WARM_UP_PERIODS,
            help="Periods to do request",
            type=int,
        )
        parser.add_argument(
            "--reset-ms",
            action="store",
            default=False,
            help="Set model_score to None on every symbol first (true/false)",
            type=self._str_to_bool,
        )
        parser.add_argument(
            "--clean",
            action="store",
            default=True,
            help="Call Symbol.reset_symbols() before warming up (true/false)",
            type=self._str_to_bool,
        )

    def configure_options(self, **options):
        """Prepare symbol data and set the stream path on the client.

        Args:
            **options: Parsed command options; ``reset_ms``, ``clean`` and
                ``periods`` are read here, and everything is forwarded to
                the parent implementation first.
        """
        super().configure_options(**options)

        if options["reset_ms"]:
            Symbol.objects.all().update(model_score=None)
        if options["clean"]:
            Symbol.reset_symbols()

        # The order of these calls is kept exactly as it was: warm up, then
        # a refresh without pushing, then a refresh of the top symbols only.
        Symbol.general_warm_up(n_periods=options["periods"])
        Symbol.update_all_indicators(push=False)
        Symbol.update_all_indicators(only_top=True)

        symbols = Symbol.objects.top_symbols().values_list("symbol", flat=True)
        # One "<symbol>@kline_<interval>m" entry per symbol, joined with "/";
        # sorting keeps the resulting path stable between runs.
        streams = "/".join(
            f"{s.lower()}@kline_{settings.TIME_INTERVAL}m"
            for s in sorted(symbols)
        )
        self.client.path = "/stream?streams=" + streams