refactored rate_limiting - switched to limiter library

api-breakage
Irishery 4 years ago
parent 3846a7ad83
commit 195c47d732
  1. 1
      src/cryptocom/exchange/__init__.py
  2. 58
      src/cryptocom/exchange/api.py
  3. 136
      src/cryptocom/exchange/rate_limiter.py
  4. 63
      tests/test_api.py

@ -9,7 +9,6 @@ from .structs import (
from .market import Exchange
from .private import Account
from .api import ApiError, ApiProvider
from .rate_limiter import RateLimiterError, RateLimiter
from . import pairs, coins
if platform.system() == 'Windows':

@ -1,5 +1,6 @@
import os
import json
from re import S
import time
import hmac
import random
@ -7,7 +8,8 @@ import asyncio
import hashlib
from urllib.parse import urljoin
from .rate_limiter import RateLimiter
from aiolimiter import AsyncLimiter
import aiohttp
@ -32,6 +34,8 @@ class ApiProvider:
self.ws_root_url = ws_root_url
self.timeout = timeout
self.retries = retries
self.limiter = AsyncLimiter(1, 1)
self.last_request = ''
self.limits = {
# method: (req_limit, period)
'private/create-order': (15, 0.1),
@ -50,11 +54,6 @@ class ApiProvider:
'private/margin/get-order-history': (1, 1)
}
self.rate_limiter = RateLimiter(self.limits, 1)
# NOTE: do not change this, due to crypto.com rate-limits
# TODO: add more strict settings, req/per second or milliseconds
if not auth_required:
return
@ -96,29 +95,48 @@ class ApiProvider:
).hexdigest()
return data
def set_limit(self, url):
if not(url in self.limits.keys()):
if url.startswith('private'):
rate_limit, period = 3, 0.1
elif url.startswith('public'):
rate_limit, period = 100, 1
else:
raise ApiError(f'Wrong path: {url}')
else:
rate_limit, period = self.limits[url]
self.limiter.max_rate = rate_limit
self.limiter.time_period = period
async def request(self, method, path, params=None, data=None, sign=False):
original_data = data
timeout = aiohttp.ClientTimeout(total=self.timeout)
self.rate_limiter.set_config(path)
if not (path == self.last_request):
self.set_limit(path)
self.last_request = path
for count in range(self.retries + 1):
if sign:
data = self._sign(path, original_data)
try:
async with self.rate_limiter:
async with aiohttp.ClientSession(timeout=timeout) as session:
async with self.rate_limiter.throttle():
resp = await session.request(
method, urljoin(self.root_url, path),
params=params, json=data,
headers={'content-type': 'application/json'}
)
resp_json = await resp.json()
if resp.status != 200:
raise ApiError(
f"Error: {resp_json}. "
f"Status: {resp.status}. Json params: {data}")
async with aiohttp.ClientSession(timeout=timeout) as session:
async with self.limiter:
resp = await session.request(
method, urljoin(self.root_url, path),
params=params, json=data,
headers={'content-type': 'application/json'}
)
resp_json = await resp.json()
if resp.status != 200:
raise ApiError(
f"Error: {resp_json}. "
f"Status: {resp.status}. Json params: {data}")
except aiohttp.ClientConnectorError:
raise ApiError(f"Cannot connect to host {self.root_url}")
except asyncio.TimeoutError:

@ -1,136 +0,0 @@
import asyncio
import math
import time
from contextlib import asynccontextmanager
class RateLimiterError(Exception):
pass
class RateLimiter:
def __init__(self,
limits,
concurrency_limit) -> None:
if not concurrency_limit or concurrency_limit < 1:
raise ValueError('concurrent limit must be non zero positive number')
self.rate_limit = int
self.period = float or int # takes seconds
self.tokens_queue = object # asyncio.Queue expecting
self.tokens_consumer_task = object # asyncio.create_task expecting
self.semaphore = object # asyncio.Semaphore expecting
self.config_setted = False
self.concurrency_limit = concurrency_limit
self.limits = limits
def get_url_config_data(self, url):
request_type = url.split('/')[0]
if not(url in self.limits.keys()):
if request_type == 'public':
rate_limit, period = 100, 1
elif request_type == 'private':
rate_limit, period = 3, 0.1
elif not (url in self.limits.keys()):
raise RateLimiterError(f'Wrong path: {url}')
else:
rate_limit, period = self.limits[url]
return rate_limit, period
def set_config(self, url):
rate_limit, period = self.get_url_config_data(url)
if not rate_limit or rate_limit < 1:
raise ValueError('rate limit must be non zero positive number')
self.rate_limit = rate_limit
self.period = period
self.tokens_queue = asyncio.Queue(rate_limit)
self.tokens_consumer_task = asyncio.create_task(self.consume_tokens())
self.semaphore = asyncio.Semaphore(self.concurrency_limit)
self.config_setted = True
async def add_token(self) -> None:
await self.tokens_queue.put(1)
return None
async def consume_tokens(self):
try:
consumption_rate = self.period / self.rate_limit
last_consumption_time = 0
while True:
if self.tokens_queue.empty():
await asyncio.sleep(consumption_rate)
continue
current_consumption_time = time.monotonic()
total_tokens = self.tokens_queue.qsize()
tokens_to_consume = self.get_tokens_amount_to_consume(
consumption_rate,
current_consumption_time,
last_consumption_time,
total_tokens
)
for _ in range(0, tokens_to_consume):
self.tokens_queue.get_nowait()
last_consumption_time = time.monotonic()
await asyncio.sleep(consumption_rate)
except asyncio.CancelledError:
raise
except Exception as e:
raise
@staticmethod
def get_tokens_amount_to_consume(consumption_rate, current_consumption_time,
last_consumption_time, total_tokens):
time_from_last_consumption = current_consumption_time - last_consumption_time
calculated_tokens_to_consume = math.floor(time_from_last_consumption / consumption_rate)
tokens_to_consume = min(total_tokens, calculated_tokens_to_consume)
return tokens_to_consume
@asynccontextmanager
async def throttle(self):
if not self.config_setted:
raise RateLimiterError('Config is not setted. You need to set it via set_config() before throttling')
await self.semaphore.acquire()
await self.add_token()
try:
yield
finally:
self.semaphore.release()
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if exc_type:
pass
await self.close()
async def close(self) -> None:
if self.tokens_consumer_task and not self.tokens_consumer_task.cancelled():
try:
self.tokens_consumer_task.cancel()
await self.tokens_consumer_task
except asyncio.CancelledError:
pass
except Exception:
raise

@ -1,11 +1,8 @@
import os
import time
import pytest
import aiohttp
import asyncio
import cryptocom.exchange as cro
from cryptocom.exchange import rate_limiter
def test_timeframe():
@ -52,28 +49,56 @@ def test_api_args(monkeypatch):
async def test_wrong_api_response():
api = cro.ApiProvider(from_env=True)
with pytest.raises(cro.RateLimiterError):
with pytest.raises(cro.ApiError):
await api.get('somepath')
api = cro.ApiProvider(auth_required=False)
with pytest.raises(cro.RateLimiterError):
with pytest.raises(cro.ApiError):
await api.post('account')
# @pytest.mark.asyncio
# async def test_api_rate_limits():
# api = cro.ApiProvider(from_env=True)
# account = cro.Account(from_env=True)
@pytest.mark.asyncio
async def test_api_rate_limits():
api = cro.ApiProvider(from_env=True)
pair = cro.pairs.CRO_USDT
page = 0
page_size = 50
params = {'page_size': page_size, 'page': page}
if pair:
params['instrument_name'] = pair.name
start_time = time.time()
await api.post('private/get-order-history', {'params': params})
await api.post('private/get-order-history', {'params': params})
finish_time = (time.time() - start_time)
assert finish_time > 1
start_time = time.time()
await api.post('private/get-order-history', {'params': params})
await api.post('private/get-order-history', {'params': params})
await api.post('private/get-order-history', {'params': params})
await api.post('private/get-order-history', {'params': params})
finish_time = time.time() - start_time
assert finish_time > 4
start_time = time.time()
await api.get('public/get-instruments')
await api.get('public/get-instruments')
await api.get('public/get-instruments')
await api.get('public/get-instruments')
# rate_limiter = cro.RateLimiter(cro.api.limits)
finish_time = time.time() - start_time
assert finish_time < 4
# for _ in range(0, 100):
# print(await account.get_balance())
start_time = time.time()
await api.post('private/get-order-history', {'params': params})
await api.post('private/get-order-history', {'params': params})
await api.post('private/get-order-history', {'params': params})
# for _ in range(0, 100):
# await account.get_orders_history(cro.pairs.CRO_USDT, page_size=50)
# for _ in range(0, 100):
# await api.get('public/get-ticker')
# async with
finish_time = time.time() - start_time
assert finish_time > 3

Loading…
Cancel
Save