Add advanced rate limiting to WS

Based on code by @rbeasley and PR #473
This commit is contained in:
Sambhav Kothari
2017-07-21 16:06:34 +05:30
parent bef4ad0425
commit f48c78aab7
2 changed files with 127 additions and 12 deletions

View File

@@ -43,7 +43,7 @@ from picard.util import build_qurl, parse_json
from picard.util.xml import parse_xml
COUNT_REQUESTS_DELAY_MS = 250
REQUEST_DELAY = defaultdict(lambda: 1000)
TEMP_ERRORS_RETRIES = 5
USER_AGENT_STRING = '%s-%s/%s (%s;%s-%s)' % (PICARD_ORG_NAME, PICARD_APP_NAME,
PICARD_VERSION_STR,
@@ -54,6 +54,44 @@ CLIENT_STRING = string_(QUrl.toPercentEncoding('%s %s-%s' % (PICARD_ORG_NAME,
PICARD_APP_NAME,
PICARD_VERSION_STR)))
# ============================================================================
# Throttling/congestion avoidance
# ============================================================================
# All tables below are keyed by hostkey, i.e. a `(host, port)` tuple, and are
# `defaultdict`s, so merely reading an unknown hostkey creates its entry with
# the default value.
#: Throttles requests to a given hostkey by assigning a minimum delay between
#: requests in milliseconds. Hostkeys without an explicit entry default to
#: 1000 ms. `_adjust_throttle` never lets `REQUEST_DELAY` drop below this
#: value.
#:
#: Plugins may assign limits to their associated service(s) like so:
#:
#: >>> from picard.webservice import REQUEST_DELAY_MINIMUM
#: >>> REQUEST_DELAY_MINIMUM[('myservice.org', 80)] = 100 # 10 requests/second
REQUEST_DELAY_MINIMUM = defaultdict(lambda: 1000)
#: Current (adaptive) delay in milliseconds between requests to a given
#: hostkey. `_adjust_throttle` halves it after a trouble-free reply (down to
#: the hostkey's minimum) and replaces it with the backoff delay when asked to
#: slow down.
REQUEST_DELAY = defaultdict(lambda: 1000) # Conservative initial value.
#: Determines delay during exponential backoff phase: the backoff delay is
#: `2 ** exponent * 1000` ms. The exponent is incremented (capped at 5) on
#: every slow-down and reset to 0 once a reply arrives outside of backoff.
REQUEST_DELAY_EXPONENT = defaultdict(lambda: 0)
#: Unacknowledged request counter.
#:
#: Incremented in `_send_request` when a request is handed to the network
#: manager and decremented in `_handle_reply` when its reply arrives.
CONGESTION_UNACK = defaultdict(lambda: 0)
#: Congestion window size in terms of unacked requests.
#:
#: We're allowed to send up to `int(this)` many requests at a time. Kept as a
#: float because the congestion avoidance phase grows it by fractional steps
#: (`1.0 / window`); reset to 1.0 on every slow-down.
CONGESTION_WINDOW_SIZE = defaultdict(lambda: 1.0)
#: Slow start threshold.
#:
#: Once the congestion window has grown to at least this size, switch from
#: slow start to congestion avoidance. (See `_adjust_throttle`.) The default
#: of 0 means "not set yet" (slow start is used); it is initialized to half
#: of the current window size upon encountering a temporary error.
CONGESTION_SSTHRESH = defaultdict(lambda: 0)
# NOTE(review): presumably the key of the response parser used when a request
# does not name one -- confirm against the callers.
DEFAULT_RESPONSE_PARSER_TYPE = "json"
# Simple record with two fields, `mimetype` and `parser`.
Parser = namedtuple('Parser', 'mimetype parser')
@@ -289,7 +327,68 @@ class WebService(QtCore.QObject):
proxy.setPassword(config.setting["proxy_password"])
self.manager.setProxy(proxy)
@staticmethod
def _adjust_throttle(hostkey, slow_down):
    """Re-tune the request delay and congestion window once a reply is in.

    :param hostkey: `(host, port)` tuple used as key into the throttle tables.
    :param slow_down: `True` if the reply indicated intermittent server
        trouble, in which case we back off instead of speeding up.
    """
    if slow_down:
        # Exponential backoff: 2**exponent seconds, but never less than the
        # host's configured minimum. The exponent is capped at 5, so the
        # delay tops out at ~30 seconds between requests.
        exponent = REQUEST_DELAY_EXPONENT[hostkey]
        backoff_ms = max(pow(2, exponent) * 1000, REQUEST_DELAY_MINIMUM[hostkey])
        log.debug('WebService: %s: delay: %dms -> %dms.', hostkey, REQUEST_DELAY[hostkey],
                  backoff_ms)
        REQUEST_DELAY[hostkey] = backoff_ms
        REQUEST_DELAY_EXPONENT[hostkey] = min(exponent + 1, 5)

        # Remember roughly half of the window we had when trouble started
        # as the new slow start threshold, then collapse the window to 1.
        CONGESTION_SSTHRESH[hostkey] = int(CONGESTION_WINDOW_SIZE[hostkey] / 2.0)
        log.debug('WebService: %s: ssthresh: %d.', hostkey, CONGESTION_SSTHRESH[hostkey])
        CONGESTION_WINDOW_SIZE[hostkey] = 1.0
        log.debug('WebService: %s: cws: %.3f.', hostkey, CONGESTION_WINDOW_SIZE[hostkey])
        return

    if CONGESTION_UNACK[hostkey] > CONGESTION_WINDOW_SIZE[hostkey]:
        # Still in the backoff phase (more requests outstanding than the
        # window allows): leave every metric untouched.
        return

    # Out of backoff, so the next slow-down starts from the smallest delay.
    REQUEST_DELAY_EXPONENT[hostkey] = 0

    # Each good reply halves the delay (bounded below by the host minimum)
    # so that we converge on maximum throughput.
    current_ms = REQUEST_DELAY[hostkey]
    halved_ms = max(int(current_ms / 2), REQUEST_DELAY_MINIMUM[hostkey])
    if halved_ms != current_ms:
        log.debug('WebService: %s: delay: %dms -> %dms.', hostkey, current_ms,
                  halved_ms)
        REQUEST_DELAY[hostkey] = halved_ms

    window = CONGESTION_WINDOW_SIZE[hostkey]
    threshold = CONGESTION_SSTHRESH[hostkey]
    if threshold and window >= threshold:
        # Like TCP congestion avoidance: grow by a fraction per reply.
        phase = 'congestion avoidance'
        grown = window + (1.0 / window)
    else:
        # Like TCP slow start: grow by one full slot per reply.
        phase = 'slow start'
        grown = window + 1

    if grown != window:
        log.debug('WebService: %s: %s: window size %.3f -> %.3f', hostkey, phase,
                  window, grown)
        CONGESTION_WINDOW_SIZE[hostkey] = grown
def _send_request(self, request, access_token=None):
hostkey = request.get_host_key()
# Increment the number of unack'd requests on sending a new one
CONGESTION_UNACK[hostkey] += 1
log.debug("WebService: %s: outstanding reqs: %d", hostkey, CONGESTION_UNACK[hostkey])
request.access_token = access_token
send = self._request_methods[request.method]
data = request.data
@@ -333,14 +432,13 @@ class WebService(QtCore.QObject):
original_host = string_(url.host())
original_port = self.url_port(url)
if ((original_host, original_port) in REQUEST_DELAY
and (redirect_host, redirect_port) not in REQUEST_DELAY):
log.debug("Setting rate limit for %s:%i to %i" %
(redirect_host, redirect_port,
REQUEST_DELAY[(original_host, original_port)]))
REQUEST_DELAY[(redirect_host, redirect_port)] =\
REQUEST_DELAY[(original_host, original_port)]
original_host_key = (original_host, original_port)
redirect_host_key = (redirect_host, redirect_port)
if (original_host_key in REQUEST_DELAY_MINIMUM
and redirect_host_key not in REQUEST_DELAY_MINIMUM):
log.debug("Setting the minimum rate limit for %s to %i" %
(redirect_host_key, REQUEST_DELAY_MINIMUM[original_host_key]))
REQUEST_DELAY_MINIMUM[redirect_host_key] = REQUEST_DELAY_MINIMUM[original_host_key]
self.get(redirect_host,
redirect_port,
@@ -355,6 +453,12 @@ class WebService(QtCore.QObject):
handler(reply.readAll(), reply, error)
def _handle_reply(self, reply, request):
hostkey = request.get_host_key()
CONGESTION_UNACK[hostkey] -= 1
log.debug("WebService: %s: outstanding reqs: %d", hostkey, CONGESTION_UNACK[hostkey])
self._timer_run_next_task.start(0)
slow_down = False
error = int(reply.error())
handler = request.handler
@@ -380,6 +484,8 @@ class WebService(QtCore.QObject):
elif handler is not None:
handler(reply.readAll(), reply, error)
slow_down = True
else:
redirect = reply.attribute(QtNetwork.QNetworkRequest.RedirectionTargetAttribute)
fromCache = reply.attribute(QtNetwork.QNetworkRequest.SourceIsFromCacheAttribute)
@@ -405,6 +511,8 @@ class WebService(QtCore.QObject):
else:
handler(reply.readAll(), reply, error)
self._adjust_throttle(hostkey, slow_down)
def _process_reply(self, reply):
try:
request = self._active_requests.pop(reply)
@@ -480,6 +588,13 @@ class WebService(QtCore.QObject):
wait is True if a delay is needed
delay is the delay in milliseconds to next request
"""
if CONGESTION_UNACK[hostkey] >= int(CONGESTION_WINDOW_SIZE[hostkey]):
# We've maxed out the number of requests to `hostkey`, so wait
# until responses begin to come back. (See `_timer_run_next_task`
# strobe in `_handle_reply`.)
return (True, sys.maxsize)
interval = REQUEST_DELAY[hostkey]
if not interval:
log.debug("WSREQ: Starting another request to %s without delay", hostkey)

View File

@@ -26,10 +26,10 @@ from picard.const import (ACOUSTID_KEY,
CAA_HOST,
CAA_PORT)
from picard.webservice import CLIENT_STRING, REQUEST_DELAY
from picard.webservice import CLIENT_STRING, REQUEST_DELAY_MINIMUM
REQUEST_DELAY[(ACOUSTID_HOST, ACOUSTID_PORT)] = 333
REQUEST_DELAY[(CAA_HOST, CAA_PORT)] = 0
REQUEST_DELAY_MINIMUM[(ACOUSTID_HOST, ACOUSTID_PORT)] = 333
REQUEST_DELAY_MINIMUM[(CAA_HOST, CAA_PORT)] = 0
def escape_lucene_query(text):