From f48c78aab78aa87c2b8222ebe608cdbb58edcec8 Mon Sep 17 00:00:00 2001 From: Sambhav Kothari Date: Fri, 21 Jul 2017 16:06:34 +0530 Subject: [PATCH] Add advanced rate limiting to WS Based on code by @rbeasley and PR #473 --- picard/webservice/__init__.py | 133 ++++++++++++++++++++++++++++--- picard/webservice/api_helpers.py | 6 +- 2 files changed, 127 insertions(+), 12 deletions(-) diff --git a/picard/webservice/__init__.py b/picard/webservice/__init__.py index a2f484553..1e6eecede 100644 --- a/picard/webservice/__init__.py +++ b/picard/webservice/__init__.py @@ -43,7 +43,7 @@ from picard.util import build_qurl, parse_json from picard.util.xml import parse_xml COUNT_REQUESTS_DELAY_MS = 250 -REQUEST_DELAY = defaultdict(lambda: 1000) + TEMP_ERRORS_RETRIES = 5 USER_AGENT_STRING = '%s-%s/%s (%s;%s-%s)' % (PICARD_ORG_NAME, PICARD_APP_NAME, PICARD_VERSION_STR, @@ -54,6 +54,44 @@ CLIENT_STRING = string_(QUrl.toPercentEncoding('%s %s-%s' % (PICARD_ORG_NAME, PICARD_APP_NAME, PICARD_VERSION_STR))) +# ============================================================================ +# Throttling/congestion avoidance +# ============================================================================ + +#: Throttles requests to a given hostkey by assigning a minimum delay between +#: requests in milliseconds. +#: +#: Plugins may assign limits to their associated service(s) like so: +#: +#: >>> from picard.webservice import REQUEST_DELAY_MINIMUM +#: >>> REQUEST_DELAY_MINIMUM[('myservice.org', 80)] = 100 # 10 requests/second +REQUEST_DELAY_MINIMUM = defaultdict(lambda: 1000) + +#: Current delay (adaptive) between requests to a given hostkey. +REQUEST_DELAY = defaultdict(lambda: 1000) # Conservative initial value. + +#: Determines delay during exponential backoff phase. +REQUEST_DELAY_EXPONENT = defaultdict(lambda: 0) + +#: Unacknowledged request counter. +#: +#: Bump this when handing a request to QNetworkAccessManager and trim when receiving +#: a response. 
+CONGESTION_UNACK = defaultdict(lambda: 0) + +#: Congestion window size in terms of unacked requests. +#: +#: We're allowed to send up to `int(this)` many requests at a time. +CONGESTION_WINDOW_SIZE = defaultdict(lambda: 1.0) + +#: Slow start threshold. +#: +#: After placing this many unacknowledged requests on the wire, switch from +#: slow start to congestion avoidance. (See `_adjust_throttle`.) Initialized +#: upon encountering a temporary error. +CONGESTION_SSTHRESH = defaultdict(lambda: 0) + + DEFAULT_RESPONSE_PARSER_TYPE = "json" Parser = namedtuple('Parser', 'mimetype parser') @@ -289,7 +327,68 @@ class WebService(QtCore.QObject): proxy.setPassword(config.setting["proxy_password"]) self.manager.setProxy(proxy) + @staticmethod + def _adjust_throttle(hostkey, slow_down): + """Adjust `REQUEST` and `CONGESTION` metrics when an HTTP request completes. + :param hostkey: `(host, port)`. + :param slow_down: `True` if we encountered intermittent server trouble + and need to slow down. + """ + def in_backoff_phase(hostkey): + return CONGESTION_UNACK[hostkey] > CONGESTION_WINDOW_SIZE[hostkey] + + if slow_down: + # Back off exponentially until ~30 seconds between requests. + delay = max(pow(2, REQUEST_DELAY_EXPONENT[hostkey]) * 1000, + REQUEST_DELAY_MINIMUM[hostkey]) + log.debug('WebService: %s: delay: %dms -> %dms.', hostkey, REQUEST_DELAY[hostkey], + delay) + REQUEST_DELAY[hostkey] = delay + + REQUEST_DELAY_EXPONENT[hostkey] = min(REQUEST_DELAY_EXPONENT[hostkey] + 1, 5) + + # Slow start threshold is ~1/2 of the window size up until we saw + # trouble. Shrink the new window size back to 1. 
+ CONGESTION_SSTHRESH[hostkey] = int(CONGESTION_WINDOW_SIZE[hostkey] / 2.0) + log.debug('WebService: %s: ssthresh: %d.', hostkey, CONGESTION_SSTHRESH[hostkey]) + + CONGESTION_WINDOW_SIZE[hostkey] = 1.0 + log.debug('WebService: %s: cws: %.3f.', hostkey, CONGESTION_WINDOW_SIZE[hostkey]) + + elif not in_backoff_phase(hostkey): + REQUEST_DELAY_EXPONENT[hostkey] = 0 # Coming out of backoff, so reset. + + # Shrink the delay between requests with each successive reply to + # converge on maximum throughput. + delay = max(int(REQUEST_DELAY[hostkey] / 2), REQUEST_DELAY_MINIMUM[hostkey]) + if delay != REQUEST_DELAY[hostkey]: + log.debug('WebService: %s: delay: %dms -> %dms.', hostkey, REQUEST_DELAY[hostkey], + delay) + REQUEST_DELAY[hostkey] = delay + + cws = CONGESTION_WINDOW_SIZE[hostkey] + sst = CONGESTION_SSTHRESH[hostkey] + + if sst and cws >= sst: + # Analogous to TCP's congestion avoidance phase. Window growth is linear. + phase = 'congestion avoidance' + cws = cws + (1.0 / cws) + else: + # Analogous to TCP's slow start phase. Window growth is exponential. 
+ phase = 'slow start' + cws += 1 + + if CONGESTION_WINDOW_SIZE[hostkey] != cws: + log.debug('WebService: %s: %s: window size %.3f -> %.3f', hostkey, phase, + CONGESTION_WINDOW_SIZE[hostkey], cws) + CONGESTION_WINDOW_SIZE[hostkey] = cws + def _send_request(self, request, access_token=None): + hostkey = request.get_host_key() + # Increment the number of unack'd requests on sending a new one + CONGESTION_UNACK[hostkey] += 1 + log.debug("WebService: %s: outstanding reqs: %d", hostkey, CONGESTION_UNACK[hostkey]) + request.access_token = access_token send = self._request_methods[request.method] data = request.data @@ -333,14 +432,13 @@ class WebService(QtCore.QObject): original_host = string_(url.host()) original_port = self.url_port(url) - - if ((original_host, original_port) in REQUEST_DELAY - and (redirect_host, redirect_port) not in REQUEST_DELAY): - log.debug("Setting rate limit for %s:%i to %i" % - (redirect_host, redirect_port, - REQUEST_DELAY[(original_host, original_port)])) - REQUEST_DELAY[(redirect_host, redirect_port)] =\ - REQUEST_DELAY[(original_host, original_port)] + original_host_key = (original_host, original_port) + redirect_host_key = (redirect_host, redirect_port) + if (original_host_key in REQUEST_DELAY_MINIMUM + and redirect_host_key not in REQUEST_DELAY_MINIMUM): + log.debug("Setting the minimum rate limit for %s to %i" % + (redirect_host_key, REQUEST_DELAY_MINIMUM[original_host_key])) + REQUEST_DELAY_MINIMUM[redirect_host_key] = REQUEST_DELAY_MINIMUM[original_host_key] self.get(redirect_host, redirect_port, @@ -355,6 +453,12 @@ class WebService(QtCore.QObject): handler(reply.readAll(), reply, error) def _handle_reply(self, reply, request): + hostkey = request.get_host_key() + CONGESTION_UNACK[hostkey] -= 1 + log.debug("WebService: %s: outstanding reqs: %d", hostkey, CONGESTION_UNACK[hostkey]) + self._timer_run_next_task.start(0) + + slow_down = False error = int(reply.error()) handler = request.handler @@ -380,6 +484,8 @@ class 
WebService(QtCore.QObject): elif handler is not None: handler(reply.readAll(), reply, error) + + slow_down = True else: redirect = reply.attribute(QtNetwork.QNetworkRequest.RedirectionTargetAttribute) fromCache = reply.attribute(QtNetwork.QNetworkRequest.SourceIsFromCacheAttribute) @@ -405,6 +511,8 @@ class WebService(QtCore.QObject): else: handler(reply.readAll(), reply, error) + self._adjust_throttle(hostkey, slow_down) + def _process_reply(self, reply): try: request = self._active_requests.pop(reply) @@ -480,6 +588,13 @@ class WebService(QtCore.QObject): wait is True if a delay is needed delay is the delay in milliseconds to next request """ + + if CONGESTION_UNACK[hostkey] >= int(CONGESTION_WINDOW_SIZE[hostkey]): + # We've maxed out the number of requests to `hostkey`, so wait + # until responses begin to come back. (See `_timer_run_next_task` + # strobe in `_handle_reply`.) + return (True, sys.maxsize) + interval = REQUEST_DELAY[hostkey] if not interval: log.debug("WSREQ: Starting another request to %s without delay", hostkey) diff --git a/picard/webservice/api_helpers.py b/picard/webservice/api_helpers.py index 45835e992..5d08174b5 100644 --- a/picard/webservice/api_helpers.py +++ b/picard/webservice/api_helpers.py @@ -26,10 +26,10 @@ from picard.const import (ACOUSTID_KEY, CAA_HOST, CAA_PORT) -from picard.webservice import CLIENT_STRING, REQUEST_DELAY +from picard.webservice import CLIENT_STRING, REQUEST_DELAY_MINIMUM -REQUEST_DELAY[(ACOUSTID_HOST, ACOUSTID_PORT)] = 333 -REQUEST_DELAY[(CAA_HOST, CAA_PORT)] = 0 +REQUEST_DELAY_MINIMUM[(ACOUSTID_HOST, ACOUSTID_PORT)] = 333 +REQUEST_DELAY_MINIMUM[(CAA_HOST, CAA_PORT)] = 0 def escape_lucene_query(text):