diff --git a/src/cryptocom/exchange/__init__.py b/src/cryptocom/exchange/__init__.py index 947080a..30a3ba4 100644 --- a/src/cryptocom/exchange/__init__.py +++ b/src/cryptocom/exchange/__init__.py @@ -9,7 +9,6 @@ from .structs import ( from .market import Exchange from .private import Account from .api import ApiError, ApiProvider -from .rate_limiter import RateLimiterError, RateLimiter from . import pairs, coins if platform.system() == 'Windows': diff --git a/src/cryptocom/exchange/api.py b/src/cryptocom/exchange/api.py index 4c395ab..17aa007 100644 --- a/src/cryptocom/exchange/api.py +++ b/src/cryptocom/exchange/api.py @@ -1,5 +1,6 @@ import os import json +from re import S import time import hmac import random @@ -7,7 +8,8 @@ import asyncio import hashlib from urllib.parse import urljoin -from .rate_limiter import RateLimiter +from aiolimiter import AsyncLimiter + import aiohttp @@ -32,6 +34,8 @@ class ApiProvider: self.ws_root_url = ws_root_url self.timeout = timeout self.retries = retries + self.limiter = AsyncLimiter(1, 1) + self.last_request = '' self.limits = { # method: (req_limit, period) 'private/create-order': (15, 0.1), @@ -50,11 +54,6 @@ class ApiProvider: 'private/margin/get-order-history': (1, 1) } - self.rate_limiter = RateLimiter(self.limits, 1) - - # NOTE: do not change this, due to crypto.com rate-limits - # TODO: add more strict settings, req/per second or milliseconds - if not auth_required: return @@ -96,29 +95,48 @@ class ApiProvider: ).hexdigest() return data + def set_limit(self, url): + + if not(url in self.limits.keys()): + if url.startswith('private'): + rate_limit, period = 3, 0.1 + + elif url.startswith('public'): + rate_limit, period = 100, 1 + + else: + raise ApiError(f'Wrong path: {url}') + + else: + rate_limit, period = self.limits[url] + + self.limiter.max_rate = rate_limit + self.limiter.time_period = period + async def request(self, method, path, params=None, data=None, sign=False): original_data = data timeout = aiohttp.ClientTimeout(total=self.timeout) - self.rate_limiter.set_config(path) + if not (path == self.last_request): + self.set_limit(path) + self.last_request = path for count in range(self.retries + 1): if sign: data = self._sign(path, original_data) try: - async with self.rate_limiter: - async with aiohttp.ClientSession(timeout=timeout) as session: - async with self.rate_limiter.throttle(): - resp = await session.request( - method, urljoin(self.root_url, path), - params=params, json=data, - headers={'content-type': 'application/json'} - ) - resp_json = await resp.json() - if resp.status != 200: - raise ApiError( - f"Error: {resp_json}. " - f"Status: {resp.status}. Json params: {data}") + async with aiohttp.ClientSession(timeout=timeout) as session: + async with self.limiter: + resp = await session.request( + method, urljoin(self.root_url, path), + params=params, json=data, + headers={'content-type': 'application/json'} + ) + resp_json = await resp.json() + if resp.status != 200: + raise ApiError( + f"Error: {resp_json}. " + f"Status: {resp.status}. Json params: {data}") except aiohttp.ClientConnectorError: raise ApiError(f"Cannot connect to host {self.root_url}") except asyncio.TimeoutError: diff --git a/src/cryptocom/exchange/rate_limiter.py b/src/cryptocom/exchange/rate_limiter.py deleted file mode 100644 index 38b3cb1..0000000 --- a/src/cryptocom/exchange/rate_limiter.py +++ /dev/null @@ -1,136 +0,0 @@ -import asyncio -import math -import time - -from contextlib import asynccontextmanager - - -class RateLimiterError(Exception): - pass - - -class RateLimiter: - def __init__(self, - limits, - concurrency_limit) -> None: - - if not concurrency_limit or concurrency_limit < 1: - raise ValueError('concurrent limit must be non zero positive number') - - self.rate_limit = int - self.period = float or int # takes seconds - self.tokens_queue = object # asyncio.Queue expecting - self.tokens_consumer_task = object # asyncio.create_task expecting - self.semaphore = object # asyncio.Semaphore expecting - - self.config_setted = False - self.concurrency_limit = concurrency_limit - self.limits = limits - - def get_url_config_data(self, url): - request_type = url.split('/')[0] - - if not(url in self.limits.keys()): - - if request_type == 'public': - rate_limit, period = 100, 1 - - elif request_type == 'private': - rate_limit, period = 3, 0.1 - - elif not (url in self.limits.keys()): - raise RateLimiterError(f'Wrong path: {url}') - - else: - rate_limit, period = self.limits[url] - - return rate_limit, period - - def set_config(self, url): - rate_limit, period = self.get_url_config_data(url) - - if not rate_limit or rate_limit < 1: - raise ValueError('rate limit must be non zero positive number') - - self.rate_limit = rate_limit - self.period = period - self.tokens_queue = asyncio.Queue(rate_limit) - self.tokens_consumer_task = asyncio.create_task(self.consume_tokens()) - self.semaphore = asyncio.Semaphore(self.concurrency_limit) - - self.config_setted = True - - async def add_token(self) -> None: - await self.tokens_queue.put(1) - return None - - async def consume_tokens(self): - try: - consumption_rate = self.period / self.rate_limit - last_consumption_time = 0 - - while True: - if self.tokens_queue.empty(): - await asyncio.sleep(consumption_rate) - continue - - current_consumption_time = time.monotonic() - total_tokens = self.tokens_queue.qsize() - tokens_to_consume = self.get_tokens_amount_to_consume( - consumption_rate, - current_consumption_time, - last_consumption_time, - total_tokens - ) - - for _ in range(0, tokens_to_consume): - self.tokens_queue.get_nowait() - - last_consumption_time = time.monotonic() - - await asyncio.sleep(consumption_rate) - except asyncio.CancelledError: - raise - except Exception as e: - raise - - @staticmethod - def get_tokens_amount_to_consume(consumption_rate, current_consumption_time, - last_consumption_time, total_tokens): - time_from_last_consumption = current_consumption_time - last_consumption_time - calculated_tokens_to_consume = math.floor(time_from_last_consumption / consumption_rate) - tokens_to_consume = min(total_tokens, calculated_tokens_to_consume) - - return tokens_to_consume - - @asynccontextmanager - async def throttle(self): - if not self.config_setted: - raise RateLimiterError('Config is not setted. You need to set it via set_config() before throttling') - - await self.semaphore.acquire() - await self.add_token() - try: - yield - finally: - self.semaphore.release() - - async def __aenter__(self): - return self - - async def __aexit__(self, exc_type, exc_val, exc_tb): - if exc_type: - pass - - await self.close() - - async def close(self) -> None: - if self.tokens_consumer_task and not self.tokens_consumer_task.cancelled(): - try: - self.tokens_consumer_task.cancel() - await self.tokens_consumer_task - except asyncio.CancelledError: - pass - - except Exception: - raise diff --git a/tests/test_api.py b/tests/test_api.py index 8b7d4fb..aad16da 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -1,11 +1,8 @@ import os import time import pytest -import aiohttp -import asyncio import cryptocom.exchange as cro -from cryptocom.exchange import rate_limiter def test_timeframe(): @@ -52,28 +49,56 @@ def test_api_args(monkeypatch): async def test_wrong_api_response(): api = cro.ApiProvider(from_env=True) - with pytest.raises(cro.RateLimiterError): + with pytest.raises(cro.ApiError): await api.get('somepath') api = cro.ApiProvider(auth_required=False) - with pytest.raises(cro.RateLimiterError): + with pytest.raises(cro.ApiError): await api.post('account') -# @pytest.mark.asyncio -# async def test_api_rate_limits(): -# api = cro.ApiProvider(from_env=True) -# account = cro.Account(from_env=True) +@pytest.mark.asyncio +async def test_api_rate_limits(): + api = cro.ApiProvider(from_env=True) + pair = cro.pairs.CRO_USDT + + page = 0 + page_size = 50 + + params = {'page_size': page_size, 'page': page} + + if pair: + params['instrument_name'] = pair.name + + start_time = time.time() + await api.post('private/get-order-history', {'params': params}) + await api.post('private/get-order-history', {'params': params}) + finish_time = (time.time() - start_time) + + assert finish_time > 1 + + start_time = time.time() + await api.post('private/get-order-history', {'params': params}) + await api.post('private/get-order-history', {'params': params}) + await api.post('private/get-order-history', {'params': params}) + await api.post('private/get-order-history', {'params': params}) + + finish_time = time.time() - start_time + assert finish_time > 4 + + start_time = time.time() + await api.get('public/get-instruments') + await api.get('public/get-instruments') + await api.get('public/get-instruments') + await api.get('public/get-instruments') -# rate_limiter = cro.RateLimiter(cro.api.limits) + finish_time = time.time() - start_time + assert finish_time < 4 -# for _ in range(0, 100): -# print(await account.get_balance()) + start_time = time.time() + await api.post('private/get-order-history', {'params': params}) + await api.post('private/get-order-history', {'params': params}) + await api.post('private/get-order-history', {'params': params}) -# for _ in range(0, 100): -# await account.get_orders_history(cro.pairs.CRO_USDT, page_size=50) - -# for _ in range(0, 100): -# await api.get('public/get-ticker') - -# async with + finish_time = time.time() - start_time + assert finish_time > 3