From 12963b27e0d9f6dd379c7024932eb1d28f8a8429 Mon Sep 17 00:00:00 2001 From: Irishery Date: Wed, 10 Nov 2021 23:25:39 +0200 Subject: [PATCH 1/9] added RPS instead semaphore --- src/cryptocom/exchange/api.py | 59 ++++++++++++---- src/cryptocom/exchange/rate_limiter.py | 98 ++++++++++++++++++++++++++ tests/test_api.py | 15 ++++ 3 files changed, 159 insertions(+), 13 deletions(-) create mode 100644 src/cryptocom/exchange/rate_limiter.py diff --git a/src/cryptocom/exchange/api.py b/src/cryptocom/exchange/api.py index ac1cd9e..8bb339d 100644 --- a/src/cryptocom/exchange/api.py +++ b/src/cryptocom/exchange/api.py @@ -7,6 +7,7 @@ import asyncio import hashlib from urllib.parse import urljoin +from .rate_limiter import RateLimiter import aiohttp @@ -24,16 +25,33 @@ class ApiProvider: auth_required=True, timeout=25, retries=6, root_url='https://api.crypto.com/v2/', ws_root_url='wss://stream.crypto.com/v2/', logger=None): + self.api_key = api_key self.api_secret = api_secret self.root_url = root_url self.ws_root_url = ws_root_url self.timeout = timeout self.retries = retries + self.limits = { + # method: (req_limit, period) + 'private/create-order': (15, 0.1), + 'private/cancel-order': (15, 0.1), + 'private/cancel-all-orders': (15, 0.1), + 'private/margin/create-order': (15, 0.1), + 'private/margin/cancel-order': (15, 0.1), + 'private/margin/cancel-all-orders': (15, 0.1), + + 'private/get-order-detail': (30, 0.1), + 'private/margin/get-order-detail': (30, 0.1), + + 'private/get-trades': (1, 1), + 'private/margin/get-trades': (1, 1), + 'private/get-order-history': (1, 1), + 'private/margin/get-order-history': (1, 1) + } # NOTE: do not change this, due to crypto.com rate-limits # TODO: add more strict settings, req/per second or milliseconds - self.semaphore = asyncio.Semaphore(20) if not auth_required: return @@ -79,22 +97,37 @@ class ApiProvider: async def request(self, method, path, params=None, data=None, sign=False): original_data = data timeout = aiohttp.ClientTimeout(total=self.timeout) + request_type = path.split('/')[0] + + if not (path in self.limits.keys()) and request_type == 'public': + rate_limit, period = 100, 1 + elif not (path in self.limits.keys()) and request_type == 'private': + rate_limit, period = 3, 0.1 + elif not (path in self.limits.keys()): + raise ApiError(f'Wrong path: {path}') + else: + rate_limit, period = self.limits[path] + + rate_limiter = RateLimiter(rate_limit=rate_limit, period=period, + concurrency_limit=1) + for count in range(self.retries + 1): if sign: data = self._sign(path, original_data) try: - async with aiohttp.ClientSession(timeout=timeout) as session: - async with self.semaphore: - resp = await session.request( - method, urljoin(self.root_url, path), - params=params, json=data, - headers={'content-type': 'application/json'} - ) - resp_json = await resp.json() - if resp.status != 200: - raise ApiError( - f"Error: {resp_json}. " - f"Status: {resp.status}. Json params: {data}") + async with rate_limiter: + async with aiohttp.ClientSession(timeout=timeout) as session: + async with rate_limiter.throttle(): + resp = await session.request( + method, urljoin(self.root_url, path), + params=params, json=data, + headers={'content-type': 'application/json'} + ) + resp_json = await resp.json() + if resp.status != 200: + raise ApiError( + f"Error: {resp_json}. " + f"Status: {resp.status}. Json params: {data}") except aiohttp.ClientConnectorError: raise ApiError(f"Cannot connect to host {self.root_url}") except asyncio.TimeoutError: diff --git a/src/cryptocom/exchange/rate_limiter.py b/src/cryptocom/exchange/rate_limiter.py new file mode 100644 index 0000000..dc58df6 --- /dev/null +++ b/src/cryptocom/exchange/rate_limiter.py @@ -0,0 +1,98 @@ +import asyncio +import math +import time +import traceback + +from contextlib import asynccontextmanager + + +class RateLimiter: + def __init__(self, + rate_limit: int, + period: float or int, # takes seconds + concurrency_limit: int) -> None: + if not rate_limit or rate_limit < 1: + raise ValueError('rate limit must be non zero positive number') + if not concurrency_limit or concurrency_limit < 1: + raise ValueError('concurrent limit must be non zero positive number') + + self.rate_limit = rate_limit + self.period = period + self.tokens_queue = asyncio.Queue(rate_limit) + self.tokens_consumer_task = asyncio.create_task(self.consume_tokens()) + self.semaphore = asyncio.Semaphore(concurrency_limit) + + async def add_token(self) -> None: + await self.tokens_queue.put(1) + return None + + async def consume_tokens(self): + try: + consumption_rate = self.period / self.rate_limit + last_consumption_time = 0 + + while True: + if self.tokens_queue.empty(): + await asyncio.sleep(consumption_rate) + continue + + current_consumption_time = time.monotonic() + total_tokens = self.tokens_queue.qsize() + tokens_to_consume = self.get_tokens_amount_to_consume( + consumption_rate, + current_consumption_time, + last_consumption_time, + total_tokens + ) + + for _ in range(0, tokens_to_consume): + self.tokens_queue.get_nowait() + + last_consumption_time = time.monotonic() + + await asyncio.sleep(consumption_rate) + except asyncio.CancelledError: + raise + except Exception as e: + raise + + @staticmethod + def get_tokens_amount_to_consume(consumption_rate, current_consumption_time, + last_consumption_time, total_tokens): + time_from_last_consumption = current_consumption_time - last_consumption_time + calculated_tokens_to_consume = math.floor(time_from_last_consumption / consumption_rate) + tokens_to_consume = min(total_tokens, calculated_tokens_to_consume) + + return tokens_to_consume + + @asynccontextmanager + async def throttle(self): + await self.semaphore.acquire() + await self.add_token() + try: + yield + finally: + self.semaphore.release() + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + if exc_type: + pass + # print(traceback.format_exc()) + + await self.close() + + async def close(self) -> None: + if self.tokens_consumer_task and not self.tokens_consumer_task.cancelled(): + try: + self.tokens_consumer_task.cancel() + await self.tokens_consumer_task + except asyncio.CancelledError: + # print(traceback.format_exc()) + pass + + except Exception as e: + # print(traceback.format_exc()) + raise diff --git a/tests/test_api.py b/tests/test_api.py index 541cc4e..aff2ce8 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -55,3 +55,18 @@ async def test_wrong_api_response(): api = cro.ApiProvider(auth_required=False) with pytest.raises(cro.ApiError): await api.post('account') + + +# @pytest.mark.asyncio +# async def test_api_rate_limits(): +# api = cro.ApiProvider(from_env=True) +# account = cro.Account(from_env=True) + +# for _ in range(0, 100): +# await account.get_balance() + +# for _ in range(0, 100): +# await account.get_orders_history(cro.pairs.CRO_USDT, page_size=50) + +# for _ in range(0, 100): +# await api.get('public/get-ticker') From 6734410470615298fe3b3de1923d31e71d204d5a Mon Sep 17 00:00:00 2001 From: Irishery Date: Fri, 26 Nov 2021 23:34:41 +0200 Subject: [PATCH 2/9] update RateLimiter logic --- src/cryptocom/exchange/__init__.py | 1 + src/cryptocom/exchange/api.py | 19 +++----- src/cryptocom/exchange/rate_limiter.py | 60 +++++++++++++++++++++----- tests/test_api.py | 13 ++++-- 4 files changed, 65 insertions(+), 28 deletions(-) diff --git a/src/cryptocom/exchange/__init__.py b/src/cryptocom/exchange/__init__.py index 30a3ba4..947080a 100644 --- a/src/cryptocom/exchange/__init__.py +++ b/src/cryptocom/exchange/__init__.py @@ -9,6 +9,7 @@ from .structs import ( from .market import Exchange from .private import Account from .api import ApiError, ApiProvider +from .rate_limiter import RateLimiterError, RateLimiter from . import pairs, coins if platform.system() == 'Windows': diff --git a/src/cryptocom/exchange/api.py b/src/cryptocom/exchange/api.py index 8bb339d..4c395ab 100644 --- a/src/cryptocom/exchange/api.py +++ b/src/cryptocom/exchange/api.py @@ -50,6 +50,8 @@ class ApiProvider: 'private/margin/get-order-history': (1, 1) } + self.rate_limiter = RateLimiter(self.limits, 1) + # NOTE: do not change this, due to crypto.com rate-limits # TODO: add more strict settings, req/per second or milliseconds @@ -97,27 +99,16 @@ class ApiProvider: async def request(self, method, path, params=None, data=None, sign=False): original_data = data timeout = aiohttp.ClientTimeout(total=self.timeout) - request_type = path.split('/')[0] - - if not (path in self.limits.keys()) and request_type == 'public': - rate_limit, period = 100, 1 - elif not (path in self.limits.keys()) and request_type == 'private': - rate_limit, period = 3, 0.1 - elif not (path in self.limits.keys()): - raise ApiError(f'Wrong path: {path}') - else: - rate_limit, period = self.limits[path] - rate_limiter = RateLimiter(rate_limit=rate_limit, period=period, - concurrency_limit=1) + self.rate_limiter.set_config(path) for count in range(self.retries + 1): if sign: data = self._sign(path, original_data) try: - async with rate_limiter: + async with self.rate_limiter: async with aiohttp.ClientSession(timeout=timeout) as session: - async with rate_limiter.throttle(): + async with self.rate_limiter.throttle(): resp = await session.request( method, urljoin(self.root_url, path), params=params, json=data, diff --git a/src/cryptocom/exchange/rate_limiter.py b/src/cryptocom/exchange/rate_limiter.py index dc58df6..38b3cb1 100644 --- a/src/cryptocom/exchange/rate_limiter.py +++ b/src/cryptocom/exchange/rate_limiter.py @@ -1,26 +1,64 @@ import asyncio import math import time -import traceback from contextlib import asynccontextmanager +class RateLimiterError(Exception): + pass + + class RateLimiter: def __init__(self, - rate_limit: int, - period: float or int, # takes seconds - concurrency_limit: int) -> None: - if not rate_limit or rate_limit < 1: - raise ValueError('rate limit must be non zero positive number') + limits, + concurrency_limit) -> None: + if not concurrency_limit or concurrency_limit < 1: raise ValueError('concurrent limit must be non zero positive number') + + self.rate_limit = int + self.period = float or int # takes seconds + self.tokens_queue = object # asyncio.Queue expecting + self.tokens_consumer_task = object # asyncio.create_task expecting + self.semaphore = object # asyncio.Semaphore expecting + + self.config_setted = False + self.concurrency_limit = concurrency_limit + self.limits = limits + + def get_url_config_data(self, url): + request_type = url.split('/')[0] + + if not(url in self.limits.keys()): + + if request_type == 'public': + rate_limit, period = 100, 1 + + elif request_type == 'private': + rate_limit, period = 3, 0.1 + + elif not (url in self.limits.keys()): + raise RateLimiterError(f'Wrong path: {url}') + + else: + rate_limit, period = self.limits[url] + + return rate_limit, period + + def set_config(self, url): + rate_limit, period = self.get_url_config_data(url) + + if not rate_limit or rate_limit < 1: + raise ValueError('rate limit must be non zero positive number') self.rate_limit = rate_limit self.period = period self.tokens_queue = asyncio.Queue(rate_limit) self.tokens_consumer_task = asyncio.create_task(self.consume_tokens()) - self.semaphore = asyncio.Semaphore(concurrency_limit) + self.semaphore = asyncio.Semaphore(self.concurrency_limit) + + self.config_setted = True async def add_token(self) -> None: await self.tokens_queue.put(1) @@ -67,6 +105,9 @@ class RateLimiter: @asynccontextmanager async def throttle(self): + if not self.config_setted: + raise RateLimiterError('Config is not setted. You need to set it via set_config() before throttling') + await self.semaphore.acquire() await self.add_token() try: @@ -80,7 +121,6 @@ class RateLimiter: async def __aexit__(self, exc_type, exc_val, exc_tb): if exc_type: pass - # print(traceback.format_exc()) await self.close() @@ -90,9 +130,7 @@ class RateLimiter: self.tokens_consumer_task.cancel() await self.tokens_consumer_task except asyncio.CancelledError: - # print(traceback.format_exc()) pass - except Exception as e: - # print(traceback.format_exc()) + except Exception: raise diff --git a/tests/test_api.py b/tests/test_api.py index aff2ce8..8b7d4fb 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -1,8 +1,11 @@ import os import time import pytest +import aiohttp +import asyncio import cryptocom.exchange as cro +from cryptocom.exchange import rate_limiter def test_timeframe(): @@ -49,11 +52,11 @@ def test_api_args(monkeypatch): async def test_wrong_api_response(): api = cro.ApiProvider(from_env=True) - with pytest.raises(cro.ApiError): + with pytest.raises(cro.RateLimiterError): await api.get('somepath') api = cro.ApiProvider(auth_required=False) - with pytest.raises(cro.ApiError): + with pytest.raises(cro.RateLimiterError): await api.post('account') @@ -62,11 +65,15 @@ async def test_wrong_api_response(): # api = cro.ApiProvider(from_env=True) # account = cro.Account(from_env=True) +# rate_limiter = cro.RateLimiter(cro.api.limits) + # for _ in range(0, 100): -# await account.get_balance() +# print(await account.get_balance()) # for _ in range(0, 100): # await account.get_orders_history(cro.pairs.CRO_USDT, page_size=50) # for _ in range(0, 100): # await api.get('public/get-ticker') + +# async with From 195c47d7320d3b939cf95664eb3b5e6f55e25f4c Mon Sep 17 00:00:00 2001 From: Irishery Date: Thu, 2 Dec 2021 18:56:40 +0200 Subject: [PATCH 4/9] refactored rate_limiting - switched to limiter library --- src/cryptocom/exchange/__init__.py | 1 - src/cryptocom/exchange/api.py | 58 +++++++---- src/cryptocom/exchange/rate_limiter.py | 136 ------------------------- tests/test_api.py | 63 ++++++++---- 4 files changed, 82 insertions(+), 176 deletions(-) delete mode 100644 src/cryptocom/exchange/rate_limiter.py diff --git a/src/cryptocom/exchange/__init__.py b/src/cryptocom/exchange/__init__.py index 947080a..30a3ba4 100644 --- a/src/cryptocom/exchange/__init__.py +++ b/src/cryptocom/exchange/__init__.py @@ -9,7 +9,6 @@ from .structs import ( from .market import Exchange from .private import Account from .api import ApiError, ApiProvider -from .rate_limiter import RateLimiterError, RateLimiter from . import pairs, coins if platform.system() == 'Windows': diff --git a/src/cryptocom/exchange/api.py b/src/cryptocom/exchange/api.py index 4c395ab..17aa007 100644 --- a/src/cryptocom/exchange/api.py +++ b/src/cryptocom/exchange/api.py @@ -1,5 +1,6 @@ import os import json +from re import S import time import hmac import random @@ -7,7 +8,8 @@ import asyncio import hashlib from urllib.parse import urljoin -from .rate_limiter import RateLimiter +from aiolimiter import AsyncLimiter + import aiohttp @@ -32,6 +34,8 @@ class ApiProvider: self.ws_root_url = ws_root_url self.timeout = timeout self.retries = retries + self.limiter = AsyncLimiter(1, 1) + self.last_request = '' self.limits = { # method: (req_limit, period) 'private/create-order': (15, 0.1), @@ -50,11 +54,6 @@ class ApiProvider: 'private/margin/get-order-history': (1, 1) } - self.rate_limiter = RateLimiter(self.limits, 1) - - # NOTE: do not change this, due to crypto.com rate-limits - # TODO: add more strict settings, req/per second or milliseconds - if not auth_required: return @@ -96,29 +95,48 @@ class ApiProvider: ).hexdigest() return data + def set_limit(self, url): + + if not(url in self.limits.keys()): + if url.startswith('private'): + rate_limit, period = 3, 0.1 + + elif url.startswith('public'): + rate_limit, period = 100, 1 + + else: + raise ApiError(f'Wrong path: {url}') + + else: + rate_limit, period = self.limits[url] + + self.limiter.max_rate = rate_limit + self.limiter.time_period = period + async def request(self, method, path, params=None, data=None, sign=False): original_data = data timeout = aiohttp.ClientTimeout(total=self.timeout) - self.rate_limiter.set_config(path) + if not (path == self.last_request): + self.set_limit(path) + self.last_request = path for count in range(self.retries + 1): if sign: data = self._sign(path, original_data) try: - async with self.rate_limiter: - async with aiohttp.ClientSession(timeout=timeout) as session: - async with self.rate_limiter.throttle(): - resp = await session.request( - method, urljoin(self.root_url, path), - params=params, json=data, - headers={'content-type': 'application/json'} - ) - resp_json = await resp.json() - if resp.status != 200: - raise ApiError( - f"Error: {resp_json}. " - f"Status: {resp.status}. Json params: {data}") + async with aiohttp.ClientSession(timeout=timeout) as session: + async with self.limiter: + resp = await session.request( + method, urljoin(self.root_url, path), + params=params, json=data, + headers={'content-type': 'application/json'} + ) + resp_json = await resp.json() + if resp.status != 200: + raise ApiError( + f"Error: {resp_json}. " + f"Status: {resp.status}. Json params: {data}") except aiohttp.ClientConnectorError: raise ApiError(f"Cannot connect to host {self.root_url}") except asyncio.TimeoutError: diff --git a/src/cryptocom/exchange/rate_limiter.py b/src/cryptocom/exchange/rate_limiter.py deleted file mode 100644 index 38b3cb1..0000000 --- a/src/cryptocom/exchange/rate_limiter.py +++ /dev/null @@ -1,136 +0,0 @@ -import asyncio -import math -import time - -from contextlib import asynccontextmanager - - -class RateLimiterError(Exception): - pass - - -class RateLimiter: - def __init__(self, - limits, - concurrency_limit) -> None: - - if not concurrency_limit or concurrency_limit < 1: - raise ValueError('concurrent limit must be non zero positive number') - - self.rate_limit = int - self.period = float or int # takes seconds - self.tokens_queue = object # asyncio.Queue expecting - self.tokens_consumer_task = object # asyncio.create_task expecting - self.semaphore = object # asyncio.Semaphore expecting - - self.config_setted = False - self.concurrency_limit = concurrency_limit - self.limits = limits - - def get_url_config_data(self, url): - request_type = url.split('/')[0] - - if not(url in self.limits.keys()): - - if request_type == 'public': - rate_limit, period = 100, 1 - - elif request_type == 'private': - rate_limit, period = 3, 0.1 - - elif not (url in self.limits.keys()): - raise RateLimiterError(f'Wrong path: {url}') - - else: - rate_limit, period = self.limits[url] - - return rate_limit, period - - def set_config(self, url): - rate_limit, period = self.get_url_config_data(url) - - if not rate_limit or rate_limit < 1: - raise ValueError('rate limit must be non zero positive number') - - self.rate_limit = rate_limit - self.period = period - self.tokens_queue = asyncio.Queue(rate_limit) - self.tokens_consumer_task = asyncio.create_task(self.consume_tokens()) - self.semaphore = asyncio.Semaphore(self.concurrency_limit) - - self.config_setted = True - - async def add_token(self) -> None: - await self.tokens_queue.put(1) - return None - - async def consume_tokens(self): - try: - consumption_rate = self.period / self.rate_limit - last_consumption_time = 0 - - while True: - if self.tokens_queue.empty(): - await asyncio.sleep(consumption_rate) - continue - - current_consumption_time = time.monotonic() - total_tokens = self.tokens_queue.qsize() - tokens_to_consume = self.get_tokens_amount_to_consume( - consumption_rate, - current_consumption_time, - last_consumption_time, - total_tokens - ) - - for _ in range(0, tokens_to_consume): - self.tokens_queue.get_nowait() - - last_consumption_time = time.monotonic() - - await asyncio.sleep(consumption_rate) - except asyncio.CancelledError: - raise - except Exception as e: - raise - - @staticmethod - def get_tokens_amount_to_consume(consumption_rate, current_consumption_time, - last_consumption_time, total_tokens): - time_from_last_consumption = current_consumption_time - last_consumption_time - calculated_tokens_to_consume = math.floor(time_from_last_consumption / consumption_rate) - tokens_to_consume = min(total_tokens, calculated_tokens_to_consume) - - return tokens_to_consume - - @asynccontextmanager - async def throttle(self): - if not self.config_setted: - raise RateLimiterError('Config is not setted. You need to set it via set_config() before throttling') - - await self.semaphore.acquire() - await self.add_token() - try: - yield - finally: - self.semaphore.release() - - async def __aenter__(self): - return self - - async def __aexit__(self, exc_type, exc_val, exc_tb): - if exc_type: - pass - - await self.close() - - async def close(self) -> None: - if self.tokens_consumer_task and not self.tokens_consumer_task.cancelled(): - try: - self.tokens_consumer_task.cancel() - await self.tokens_consumer_task - except asyncio.CancelledError: - pass - - except Exception: - raise diff --git a/tests/test_api.py b/tests/test_api.py index 8b7d4fb..aad16da 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -1,11 +1,8 @@ import os import time import pytest -import aiohttp -import asyncio import cryptocom.exchange as cro -from cryptocom.exchange import rate_limiter def test_timeframe(): @@ -52,28 +49,56 @@ def test_api_args(monkeypatch): async def test_wrong_api_response(): api = cro.ApiProvider(from_env=True) - with pytest.raises(cro.RateLimiterError): + with pytest.raises(cro.ApiError): await api.get('somepath') api = cro.ApiProvider(auth_required=False) - with pytest.raises(cro.RateLimiterError): + with pytest.raises(cro.ApiError): await api.post('account') -# @pytest.mark.asyncio -# async def test_api_rate_limits(): -# api = cro.ApiProvider(from_env=True) -# account = cro.Account(from_env=True) +@pytest.mark.asyncio +async def test_api_rate_limits(): + api = cro.ApiProvider(from_env=True) + pair = cro.pairs.CRO_USDT + + page = 0 + page_size = 50 + + params = {'page_size': page_size, 'page': page} + + if pair: + params['instrument_name'] = pair.name + + start_time = time.time() + await api.post('private/get-order-history', {'params': params}) + await api.post('private/get-order-history', {'params': params}) + finish_time = (time.time() - start_time) + + assert finish_time > 1 + + start_time = time.time() + await api.post('private/get-order-history', {'params': params}) + await api.post('private/get-order-history', {'params': params}) + await api.post('private/get-order-history', {'params': params}) + await api.post('private/get-order-history', {'params': params}) + + finish_time = time.time() - start_time + assert finish_time > 4 + + start_time = time.time() + await api.get('public/get-instruments') + await api.get('public/get-instruments') + await api.get('public/get-instruments') + await api.get('public/get-instruments') -# rate_limiter = cro.RateLimiter(cro.api.limits) + finish_time = time.time() - start_time + assert finish_time < 4 -# for _ in range(0, 100): -# print(await account.get_balance()) + start_time = time.time() + await api.post('private/get-order-history', {'params': params}) + await api.post('private/get-order-history', {'params': params}) + await api.post('private/get-order-history', {'params': params}) -# for _ in range(0, 100): -# await account.get_orders_history(cro.pairs.CRO_USDT, page_size=50) - -# for _ in range(0, 100): -# await api.get('public/get-ticker') - -# async with + finish_time = time.time() - start_time + assert finish_time > 3 From ef10e26229872058bab0122fb3e19348d266c79e Mon Sep 17 00:00:00 2001 From: Irishery Date: Thu, 2 Dec 2021 19:21:17 +0200 Subject: [PATCH 5/9] setup update --- setup.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 8d8790f..aafc76e 100644 --- a/setup.py +++ b/setup.py @@ -38,7 +38,8 @@ setup( packages=find_packages('src'), install_requires=[ 'aiohttp', - 'cached-property' + 'cached-property', + 'aiolimiter' ], extras_require={ 'dev': [ From f29a2ccc7dfc47f1e2d36d51e9d9cb7032853a95 Mon Sep 17 00:00:00 2001 From: Irishery Date: Thu, 2 Dec 2021 21:46:42 +0200 Subject: [PATCH 6/9] Refactored RPS --- src/cryptocom/exchange/api.py | 89 +++++++++++++++++++++++------------ tests/test_api.py | 27 +++++------ 2 files changed, 70 insertions(+), 46 deletions(-) diff --git a/src/cryptocom/exchange/api.py b/src/cryptocom/exchange/api.py index 17aa007..c66ad6e 100644 --- a/src/cryptocom/exchange/api.py +++ b/src/cryptocom/exchange/api.py @@ -1,6 +1,6 @@ import os import json -from re import S +import re import time import hmac import random @@ -34,19 +34,18 @@ class ApiProvider: self.ws_root_url = ws_root_url self.timeout = timeout self.retries = retries - self.limiter = AsyncLimiter(1, 1) - self.last_request = '' + self.last_request_path = '' self.limits = { # method: (req_limit, period) - 'private/create-order': (15, 0.1), - 'private/cancel-order': (15, 0.1), - 'private/cancel-all-orders': (15, 0.1), - 'private/margin/create-order': (15, 0.1), - 'private/margin/cancel-order': (15, 0.1), - 'private/margin/cancel-all-orders': (15, 0.1), + 'private/create-order': (14, 0.1), + 'private/cancel-order': (14, 0.1), + 'private/cancel-all-orders': (14, 0.1), + 'private/margin/create-order': (14, 0.1), + 'private/margin/cancel-order': (14, 0.1), + 'private/margin/cancel-all-orders': (14, 0.1), - 'private/get-order-detail': (30, 0.1), - 'private/margin/get-order-detail': (30, 0.1), + 'private/get-order-detail': (29, 0.1), + 'private/margin/get-order-detail': (29, 0.1), 'private/get-trades': (1, 1), 'private/margin/get-trades': (1, 1), @@ -54,6 +53,38 @@ class ApiProvider: 'private/margin/get-order-history': (1, 1) } + # lists for methods - aiolimiter metching + self.order_methods = [ + 'private/create-order', + 'private/cancel-order', + 'private/cancel-all-orders', + 'private/margin/create-order', + 'private/margin/cancel-order', + 'private/margin/cancel-all-orders', + ] + + self.order_methods_limit = AsyncLimiter(14, 0.1) + + self.detail_methods = [ + 'private/get-order-detail', + 'private/margin/get-order-detail', + ] + + self.detail_methods_limit = AsyncLimiter(29, 0.1) + + self.general_trade_methods = [ + 'private/get-trades', + 'private/margin/get-trades', + 'private/get-order-history', + 'private/margin/get-order-history' + ] + + self.general_trade_methods_limit = AsyncLimiter(1, 1) + + # limits for not matched methods + self.general_private_limit = AsyncLimiter(3, 0.1) + self.general_public_limit = AsyncLimiter(100, 1) + if not auth_required: return @@ -95,38 +126,34 @@ class ApiProvider: ).hexdigest() return data - def set_limit(self, url): - - if not(url in self.limits.keys()): - if url.startswith('private'): - rate_limit, period = 3, 0.1 - - elif url.startswith('public'): - rate_limit, period = 100, 1 - - else: - raise ApiError(f'Wrong path: {url}') - - else: - rate_limit, period = self.limits[url] + def set_limit(self, path): + if path in self.order_methods: + return self.order_methods_limit + elif path in self.detail_methods: + return self.detail_methods_limit + elif path in self.general_trade_methods: + return self.general_trade_methods_limit - self.limiter.max_rate = rate_limit - self.limiter.time_period = period + else: + if path.startswith('private'): + return self.general_private_limit + elif path.startswith('public'): + return self.general_public_limit + else: + raise ApiError(f'Wrong path: {path}') async def request(self, method, path, params=None, data=None, sign=False): original_data = data timeout = aiohttp.ClientTimeout(total=self.timeout) - if not (path == self.last_request): - self.set_limit(path) - self.last_request = path + limiter = self.set_limit(path) for count in range(self.retries + 1): if sign: data = self._sign(path, original_data) try: async with aiohttp.ClientSession(timeout=timeout) as session: - async with self.limiter: + async with limiter: resp = await session.request( method, urljoin(self.root_url, path), params=params, json=data, diff --git a/tests/test_api.py b/tests/test_api.py index aad16da..e797411 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -1,3 +1,5 @@ +import asyncio +from asyncio.events import get_running_loop import os import time import pytest @@ -71,34 +73,29 @@ async def test_api_rate_limits(): params['instrument_name'] = pair.name start_time = time.time() - await api.post('private/get-order-history', {'params': params}) - await api.post('private/get-order-history', {'params': params}) - finish_time = (time.time() - start_time) + tasks = [api.post('private/get-order-history', {'params': params}) for i in range(2)] + await asyncio.gather(*tasks) + finish_time = (time.time() - start_time) assert finish_time > 1 start_time = time.time() - await api.post('private/get-order-history', {'params': params}) - await api.post('private/get-order-history', {'params': params}) - await api.post('private/get-order-history', {'params': params}) - await api.post('private/get-order-history', {'params': params}) + tasks = [api.post('private/get-order-history', {'params': params}) for _ in range(5)] + await asyncio.gather(*tasks) finish_time = time.time() - start_time assert finish_time > 4 start_time = time.time() - await api.get('public/get-instruments') - await api.get('public/get-instruments') - await api.get('public/get-instruments') - await api.get('public/get-instruments') + tasks = [api.get('public/get-instruments') for _ in range(200)] + await asyncio.gather(*tasks) finish_time = time.time() - start_time - assert finish_time < 4 + assert finish_time > 1 start_time = time.time() - await api.post('private/get-order-history', {'params': params}) - await api.post('private/get-order-history', {'params': params}) - await api.post('private/get-order-history', {'params': params}) + tasks = [api.post('private/get-order-history', {'params': params}) for _ in range(4)] + await asyncio.gather(*tasks) finish_time = time.time() - start_time assert finish_time > 3 From a930ddd0920818984ddd631bbcdf0a80d2f80b71 Mon Sep 17 00:00:00 2001 From: Irishery Date: Fri, 3 Dec 2021 00:52:37 +0200 Subject: [PATCH 7/9] RPS update --- src/cryptocom/exchange/api.py | 85 ++++++++++++++--------------------- tests/test_api.py | 1 - 2 files changed, 34 insertions(+), 52 deletions(-) diff --git a/src/cryptocom/exchange/api.py b/src/cryptocom/exchange/api.py index c66ad6e..e91971e 100644 --- a/src/cryptocom/exchange/api.py +++ b/src/cryptocom/exchange/api.py @@ -1,6 +1,5 @@ import os import json -import re import time import hmac import random @@ -16,6 +15,33 @@ import aiohttp from aiohttp.client_exceptions import ContentTypeError +RATE_LIMITS = { + # order methods + ( + 'private/create-order', + 'private/cancel-order', + 'private/cancel-all-orders', + 'private/margin/create-order', + 'private/margin/cancel-order', + 'private/margin/cancel-all-orders', + ): (14, 0.1), + + # order detail methods + ( + 'private/get-order-detail', + 'private/margin/get-order-detail', + ): (29, 0.1), + + # general trade methods + ( + 'private/get-trades', + 'private/margin/get-trades', + 'private/get-order-history', + 'private/margin/get-order-history' + ): (1, 1) +} + + class ApiError(Exception): pass @@ -35,51 +61,12 @@ class ApiProvider: self.timeout = timeout self.retries = retries self.last_request_path = '' - self.limits = { - # method: (req_limit, period) - 'private/create-order': (14, 0.1), - 'private/cancel-order': (14, 0.1), - 'private/cancel-all-orders': (14, 0.1), - 'private/margin/create-order': (14, 0.1), - 'private/margin/cancel-order': (14, 0.1), - 'private/margin/cancel-all-orders': (14, 0.1), - - 'private/get-order-detail': (29, 0.1), - 'private/margin/get-order-detail': (29, 0.1), - - 'private/get-trades': (1, 1), - 'private/margin/get-trades': (1, 1), - 'private/get-order-history': (1, 1), - 'private/margin/get-order-history': (1, 1) - } - - # lists for methods - aiolimiter metching - self.order_methods = [ - 'private/create-order', - 'private/cancel-order', - 'private/cancel-all-orders', - 'private/margin/create-order', - 'private/margin/cancel-order', - 'private/margin/cancel-all-orders', - ] - self.order_methods_limit = AsyncLimiter(14, 0.1) + self.rate_limiters = {} - self.detail_methods = [ - 'private/get-order-detail', - 'private/margin/get-order-detail', - ] - - self.detail_methods_limit = AsyncLimiter(29, 0.1) - - self.general_trade_methods = [ - 'private/get-trades', - 'private/margin/get-trades', - 'private/get-order-history', - 'private/margin/get-order-history' - ] - - self.general_trade_methods_limit = AsyncLimiter(1, 1) + for urls in RATE_LIMITS: + for url in urls: + self.rate_limiters[url] = AsyncLimiter(*RATE_LIMITS[urls]) # limits for not matched methods self.general_private_limit = AsyncLimiter(3, 0.1) @@ -127,13 +114,9 @@ class ApiProvider: return data def set_limit(self, path): - if path in self.order_methods: - return self.order_methods_limit - elif path in self.detail_methods: - return self.detail_methods_limit - elif path in self.general_trade_methods: - return self.general_trade_methods_limit - + if path in self.rate_limiters.keys(): + return self.rate_limiters[path] + else: if path.startswith('private'): return self.general_private_limit diff --git a/tests/test_api.py b/tests/test_api.py index e797411..be4e34f 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -1,5 +1,4 @@ import asyncio -from asyncio.events import get_running_loop import os import time import pytest From 4978530a0e93648fc3a2541480fc2d520f943eb5 Mon Sep 17 00:00:00 2001 From: Irishery Date: Fri, 3 Dec 2021 01:32:30 +0200 Subject: [PATCH 8/9] a bit refactoring --- README.md | 1 + src/cryptocom/exchange/api.py | 5 ++--- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 8ec256c..8ebbb75 100644 --- a/README.md +++ b/README.md @@ -27,6 +27,7 @@ Exchange original API docs: [https://exchange-docs.crypto.com](https://exchange- ### Changelog +- **0.9.3** - added RPS limiter by @Irishery - **0.9.2** - fixed event loop import level - **0.9.1** - fixed Windows bug with asyncio event loop - **0.9.0** - updated coins, refactored wallet transactions diff --git a/src/cryptocom/exchange/api.py b/src/cryptocom/exchange/api.py index e91971e..1543e2d 100644 --- a/src/cryptocom/exchange/api.py +++ b/src/cryptocom/exchange/api.py @@ -113,10 +113,9 @@ class ApiProvider: ).hexdigest() return data - def set_limit(self, path): + def get_limit(self, path): if path in self.rate_limiters.keys(): return self.rate_limiters[path] - else: if path.startswith('private'): return self.general_private_limit @@ -129,7 +128,7 @@ class ApiProvider: original_data = data timeout = aiohttp.ClientTimeout(total=self.timeout) - limiter = self.set_limit(path) + limiter = self.get_limit(path) for count in range(self.retries + 1): if sign: From 9c9cb600bacfd5a313babb98dc15e0319511a5e5 Mon Sep 17 00:00:00 2001 From: Irishery Date: Fri, 3 Dec 2021 01:37:10 +0200 Subject: [PATCH 9/9] update version --- src/cryptocom/exchange/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cryptocom/exchange/__init__.py b/src/cryptocom/exchange/__init__.py index 30a3ba4..4d94fae 100644 --- a/src/cryptocom/exchange/__init__.py +++ b/src/cryptocom/exchange/__init__.py @@ -23,4 +23,4 @@ __all__ = [ 'ApiError', 'ApiProvider' ] -__version__ = '0.9.2' +__version__ = '0.9.3'