From 8df1e78af485daf3b3d75237a5320c155996facb Mon Sep 17 00:00:00 2001 From: Laurent Monin Date: Fri, 26 Jan 2018 19:02:45 +0100 Subject: [PATCH] Move throttling code to its own file API is modified to use methods to get/set delays, instead of setting global variables --- picard/webservice/__init__.py | 153 ++------------------- picard/webservice/api_helpers.py | 10 +- picard/webservice/ratecontrol.py | 229 +++++++++++++++++++++++++++++++ test/test_webservice.py | 9 +- 4 files changed, 252 insertions(+), 149 deletions(-) create mode 100644 picard/webservice/ratecontrol.py diff --git a/picard/webservice/__init__.py b/picard/webservice/__init__.py index 887ec2646..62e776021 100644 --- a/picard/webservice/__init__.py +++ b/picard/webservice/__init__.py @@ -41,6 +41,7 @@ from picard import (PICARD_APP_NAME, from picard.oauth import OAuthManager from picard.util import build_qurl, parse_json from picard.util.xml import parse_xml +from picard.webservice import ratecontrol COUNT_REQUESTS_DELAY_MS = 250 @@ -54,42 +55,6 @@ CLIENT_STRING = string_(QUrl.toPercentEncoding('%s %s-%s' % (PICARD_ORG_NAME, PICARD_APP_NAME, PICARD_VERSION_STR))) -# ============================================================================ -# Throttling/congestion avoidance -# ============================================================================ - -# Throttles requests to a given hostkey by assigning a minimum delay between -# requests in milliseconds. -# -# Plugins may assign limits to their associated service(s) like so: -# -# >>> from picard.webservice import REQUEST_DELAY_MINIMUM -# >>> REQUEST_DELAY_MINIMUM[('myservice.org', 80)] = 100 # 10 requests/second -REQUEST_DELAY_MINIMUM = defaultdict(lambda: 1000) - -# Current delay (adaptive) between requests to a given hostkey. -REQUEST_DELAY = defaultdict(lambda: 1000) # Conservative initial value. - -# Determines delay during exponential backoff phase. -REQUEST_DELAY_EXPONENT = defaultdict(lambda: 0) - -# Unacknowledged request counter. 
-# -# Bump this when handing a request to QNetworkManager and trim when receiving -# a response. -CONGESTION_UNACK = defaultdict(lambda: 0) - -# Congestion window size in terms of unacked requests. -# -# We're allowed to send up to `int(this)` many requests at a time. -CONGESTION_WINDOW_SIZE = defaultdict(lambda: 1.0) - -# Slow start threshold. -# -# After placing this many unacknowledged requests on the wire, switch from -# slow start to congestion avoidance. (See `_adjust_throttle`.) Initialized -# upon encountering a temporary error. -CONGESTION_SSTHRESH = defaultdict(lambda: 0) DEFAULT_RESPONSE_PARSER_TYPE = "json" @@ -282,7 +247,6 @@ class WebService(QtCore.QObject): self.set_cache() self.setup_proxy() self.manager.finished.connect(self._process_reply) - self._last_request_times = defaultdict(lambda: 0) self._request_methods = { "GET": self.manager.get, "POST": self.manager.post, @@ -326,75 +290,15 @@ class WebService(QtCore.QObject): proxy.setPassword(config.setting["proxy_password"]) self.manager.setProxy(proxy) - @staticmethod - def _adjust_throttle(hostkey, slow_down): - """Adjust `REQUEST` and `CONGESTION` metrics when a HTTP request completes. - - Args: - hostkey: `(host, port)`. - slow_down: `True` if we encountered intermittent server trouble - and need to slow down. - """ - def in_backoff_phase(hostkey): - return CONGESTION_UNACK[hostkey] > CONGESTION_WINDOW_SIZE[hostkey] - - if slow_down: - # Backoff exponentially until ~30 seconds between requests. - delay = max(pow(2, REQUEST_DELAY_EXPONENT[hostkey]) * 1000, - REQUEST_DELAY_MINIMUM[hostkey]) - log.debug('WebService: %s: delay: %dms -> %dms.', hostkey, REQUEST_DELAY[hostkey], - delay) - REQUEST_DELAY[hostkey] = delay - - REQUEST_DELAY_EXPONENT[hostkey] = min(REQUEST_DELAY_EXPONENT[hostkey] + 1, 5) - - # Slow start threshold is ~1/2 of the window size up until we saw - # trouble. Shrink the new window size back to 1. 
- CONGESTION_SSTHRESH[hostkey] = int(CONGESTION_WINDOW_SIZE[hostkey] / 2.0) - log.debug('WebService: %s: ssthresh: %d.', hostkey, CONGESTION_SSTHRESH[hostkey]) - - CONGESTION_WINDOW_SIZE[hostkey] = 1.0 - log.debug('WebService: %s: cws: %.3f.', hostkey, CONGESTION_WINDOW_SIZE[hostkey]) - - elif not in_backoff_phase(hostkey): - REQUEST_DELAY_EXPONENT[hostkey] = 0 # Coming out of backoff, so reset. - - # Shrink the delay between requests with each successive reply to - # converge on maximum throughput. - delay = max(int(REQUEST_DELAY[hostkey] / 2), REQUEST_DELAY_MINIMUM[hostkey]) - if delay != REQUEST_DELAY[hostkey]: - log.debug('WebService: %s: delay: %dms -> %dms.', hostkey, REQUEST_DELAY[hostkey], - delay) - REQUEST_DELAY[hostkey] = delay - - cws = CONGESTION_WINDOW_SIZE[hostkey] - sst = CONGESTION_SSTHRESH[hostkey] - - if sst and cws >= sst: - # Analogous to TCP's congestion avoidance phase. Window growth is linear. - phase = 'congestion avoidance' - cws = cws + (1.0 / cws) - else: - # Analogous to TCP's slow start phase. Window growth is exponential. 
- phase = 'slow start' - cws += 1 - - if CONGESTION_WINDOW_SIZE[hostkey] != cws: - log.debug('WebService: %s: %s: window size %.3f -> %.3f', hostkey, phase, - CONGESTION_WINDOW_SIZE[hostkey], cws) - CONGESTION_WINDOW_SIZE[hostkey] = cws def _send_request(self, request, access_token=None): hostkey = request.get_host_key() - # Increment the number of unack'd requests on sending a new one - CONGESTION_UNACK[hostkey] += 1 - log.debug("WebService: %s: outstanding reqs: %d", hostkey, CONGESTION_UNACK[hostkey]) + ratecontrol.increment_requests(hostkey) request.access_token = access_token send = self._request_methods[request.method] data = request.data reply = send(request, data.encode('utf-8')) if data is not None else send(request) - self._remember_request_time(request.get_host_key()) self._active_requests[reply] = request def _start_request(self, request): @@ -435,11 +339,7 @@ class WebService(QtCore.QObject): original_port = self.url_port(url) original_host_key = (original_host, original_port) redirect_host_key = (redirect_host, redirect_port) - if (original_host_key in REQUEST_DELAY_MINIMUM - and redirect_host_key not in REQUEST_DELAY_MINIMUM): - log.debug("Setting the minimum rate limit for %s to %i" % - (redirect_host_key, REQUEST_DELAY_MINIMUM[original_host_key])) - REQUEST_DELAY_MINIMUM[redirect_host_key] = REQUEST_DELAY_MINIMUM[original_host_key] + ratecontrol.copy_minimal_delay(original_host_key, redirect_host_key) self.get(redirect_host, redirect_port, @@ -455,8 +355,8 @@ class WebService(QtCore.QObject): def _handle_reply(self, reply, request): hostkey = request.get_host_key() - CONGESTION_UNACK[hostkey] -= 1 - log.debug("WebService: %s: outstanding reqs: %d", hostkey, CONGESTION_UNACK[hostkey]) + ratecontrol.decrement_requests(hostkey) + self._timer_run_next_task.start(0) slow_down = False @@ -514,13 +414,13 @@ class WebService(QtCore.QObject): else: handler(reply.readAll(), reply, error) - self._adjust_throttle(hostkey, slow_down) + 
ratecontrol.adjust(hostkey, slow_down) def _process_reply(self, reply): try: request = self._active_requests.pop(reply) except KeyError: - log.error("Request not found for %s" % reply.request().url().toString(QUrl.RemoveUserInfo)) + log.error("Request not found for %s", reply.request().url().toString(QUrl.RemoveUserInfo)) return try: self._handle_reply(reply, request) @@ -585,41 +485,6 @@ class WebService(QtCore.QObject): if count: self._timer_count_pending_requests.start(COUNT_REQUESTS_DELAY_MS) - def _get_delay_to_next_request(self, hostkey): - """Calculate delay to next request to hostkey (host, port) - returns a tuple (wait, delay) where: - wait is True if a delay is needed - delay is the delay in milliseconds to next request - """ - - if CONGESTION_UNACK[hostkey] >= int(CONGESTION_WINDOW_SIZE[hostkey]): - # We've maxed out the number of requests to `hostkey`, so wait - # until responses begin to come back. (See `_timer_run_next_task` - # strobe in `_handle_reply`.) - return (True, sys.maxsize) - - interval = REQUEST_DELAY[hostkey] - if not interval: - log.debug("WSREQ: Starting another request to %s without delay", hostkey) - return (False, 0) - last_request = self._last_request_times[hostkey] - if not last_request: - log.debug("WSREQ: First request to %s", hostkey) - self._remember_request_time(hostkey) # set it on first run - return (False, interval) - elapsed = (time.time() - last_request) * 1000 - if elapsed >= interval: - log.debug("WSREQ: Last request to %s was %d ms ago, starting another one", hostkey, elapsed) - return (False, interval) - delay = int(math.ceil(interval - elapsed)) - log.debug("WSREQ: Last request to %s was %d ms ago, waiting %d ms before starting another one", - hostkey, elapsed, delay) - return (True, delay) - - def _remember_request_time(self, hostkey): - if REQUEST_DELAY[hostkey]: - self._last_request_times[hostkey] = time.time() - def _run_next_task(self): delay = sys.maxsize for prio in sorted(self._queues.keys(), reverse=True): 
@@ -628,12 +493,12 @@ class WebService(QtCore.QObject): del(self._queues[prio]) continue for hostkey in sorted(prio_queue.keys(), - key=lambda hostkey: REQUEST_DELAY[hostkey]): + key=ratecontrol.current_delay): queue = self._queues[prio][hostkey] if not queue: del(self._queues[prio][hostkey]) continue - wait, d = self._get_delay_to_next_request(hostkey) + wait, d = ratecontrol.get_delay_to_next_request(hostkey) if not wait: queue.popleft()() if d < delay: diff --git a/picard/webservice/api_helpers.py b/picard/webservice/api_helpers.py index 3ee2e726a..6276bc6fb 100644 --- a/picard/webservice/api_helpers.py +++ b/picard/webservice/api_helpers.py @@ -26,10 +26,14 @@ from picard.const import (ACOUSTID_KEY, CAA_HOST, CAA_PORT) -from picard.webservice import CLIENT_STRING, REQUEST_DELAY_MINIMUM, DEFAULT_RESPONSE_PARSER_TYPE +from picard.webservice import ( + CLIENT_STRING, + DEFAULT_RESPONSE_PARSER_TYPE, + ratecontrol, +) -REQUEST_DELAY_MINIMUM[(ACOUSTID_HOST, ACOUSTID_PORT)] = 333 -REQUEST_DELAY_MINIMUM[(CAA_HOST, CAA_PORT)] = 0 +ratecontrol.set_minimum_delay((ACOUSTID_HOST, ACOUSTID_PORT), 333) +ratecontrol.set_minimum_delay((CAA_HOST, CAA_PORT), 0) def escape_lucene_query(text): diff --git a/picard/webservice/ratecontrol.py b/picard/webservice/ratecontrol.py new file mode 100644 index 000000000..dd13988c0 --- /dev/null +++ b/picard/webservice/ratecontrol.py @@ -0,0 +1,229 @@ +# -*- coding: utf-8 -*- +# +# Picard, the next-generation MusicBrainz tagger +# Copyright (C) 2007 Lukáš Lalinský +# Copyright (C) 2009 Carlin Mangar +# Copyright (C) 2017 Sambhav Kothari +# Copyright (C) 2018 Laurent Monin +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. 
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+
+import sys
+import math
+import time
+
+from collections import defaultdict
+from picard import log
+
+# ============================================================================
+# Throttling/congestion avoidance
+# ============================================================================
+
+# Throttles requests to a given hostkey by assigning a minimum delay between
+# requests in milliseconds.
+#
+
+# Plugins may assign limits to their associated service(s) like so:
+#
+# >>> from picard.webservice import ratecontrol
+# >>> ratecontrol.set_minimum_delay(('myservice.org', 80), 100) # 10 requests/second
+
+
+# Minimum delay for the given hostkey (in milliseconds), can be set using
+# set_minimum_delay()
+REQUEST_DELAY_MINIMUM = defaultdict(lambda: 1000)
+
+# Current delay (adaptive) between requests to a given hostkey.
+REQUEST_DELAY = defaultdict(lambda: 1000) # Conservative initial value.
+
+# Determines delay during exponential backoff phase.
+REQUEST_DELAY_EXPONENT = defaultdict(lambda: 0)
+
+# Unacknowledged request counter.
+#
+# Bump this when handing a request to QNetworkManager and trim when receiving
+# a response.
+CONGESTION_UNACK = defaultdict(lambda: 0)
+
+# Congestion window size in terms of unacked requests.
+#
+# We're allowed to send up to `int(this)` many requests at a time.
+CONGESTION_WINDOW_SIZE = defaultdict(lambda: 1.0)
+
+# Slow start threshold.
+#
+# After placing this many unacknowledged requests on the wire, switch from
+# slow start to congestion avoidance. (See `_adjust_throttle`.) Initialized
+# upon encountering a temporary error.
+CONGESTION_SSTHRESH = defaultdict(lambda: 0)
+
+# Storage of last request times per host key
+LAST_REQUEST_TIMES = defaultdict(lambda: 0)
+
+
+def _dbug(message, hostkey, *args):
+    log.debug('ratecontrol: %s: '+ message, hostkey, *args)
+
+
+def set_minimum_delay(hostkey, delay_ms):
+    """Set the minimum delay between requests
+    hostkey is a unique key, for example (host, port)
+    delay_ms is the delay in milliseconds
+    """
+    REQUEST_DELAY_MINIMUM[hostkey] = delay_ms
+
+
+def current_delay(hostkey):
+    """Returns the current delay (adaptive) between requests for this hostkey
+    hostkey is a unique key, for example (host, port)
+    """
+    return REQUEST_DELAY[hostkey]
+
+
+def get_delay_to_next_request(hostkey):
+    """Calculate delay to next request to hostkey (host, port)
+    returns a tuple (wait, delay) where:
+        wait is True if a delay is needed
+        delay is the delay in milliseconds to next request
+    """
+    if CONGESTION_UNACK[hostkey] >= int(CONGESTION_WINDOW_SIZE[hostkey]):
+        # We've maxed out the number of requests to `hostkey`, so wait
+        # until responses begin to come back. (See `_timer_run_next_task`
+        # strobe in `_handle_reply`.)
+        return (True, sys.maxsize)
+
+    interval = REQUEST_DELAY[hostkey]
+    if not interval:
+        _dbug("Starting another request without delay", hostkey)
+        return (False, 0)
+    last_request = LAST_REQUEST_TIMES[hostkey]
+    if not last_request:
+        _dbug("First request", hostkey)
+        _remember_request_time(hostkey) # set it on first run
+        return (False, interval)
+    elapsed = (time.time() - last_request) * 1000
+    if elapsed >= interval:
+        _dbug("Last request was %d ms ago, starting another one", hostkey, elapsed)
+        return (False, interval)
+    delay = int(math.ceil(interval - elapsed))
+    _dbug("Last request was %d ms ago, waiting %d ms before starting another one",
+          hostkey, elapsed, delay)
+    return (True, delay)
+
+
+def _remember_request_time(hostkey):
+    if REQUEST_DELAY[hostkey]:
+        LAST_REQUEST_TIMES[hostkey] = time.time()
+
+
+def increment_requests(hostkey):
+    """Store the request time for this hostkey, and increment counter
+    It has to be called on each request
+    """
+    _remember_request_time(hostkey)
+    # Increment the number of unack'd requests on sending a new one
+    CONGESTION_UNACK[hostkey] += 1
+    _dbug("Incrementing requests to: %d", hostkey, CONGESTION_UNACK[hostkey])
+
+
+def decrement_requests(hostkey):
+    """Decrement counter, it has to be called on each reply
+    """
+    assert(CONGESTION_UNACK[hostkey] > 0)
+    CONGESTION_UNACK[hostkey] -= 1
+    _dbug("Decrementing requests to: %d", hostkey, CONGESTION_UNACK[hostkey])
+
+
+def copy_minimal_delay(from_hostkey, to_hostkey):
+    """Copy minimal delay from one hostkey to another
+    Useful for redirections
+    """
+    if (from_hostkey in REQUEST_DELAY_MINIMUM
+            and to_hostkey not in REQUEST_DELAY_MINIMUM):
+        REQUEST_DELAY_MINIMUM[to_hostkey] = REQUEST_DELAY_MINIMUM[from_hostkey]
+        _dbug("Copy minimum delay from %s, setting it to %dms",
+              to_hostkey, from_hostkey, REQUEST_DELAY_MINIMUM[to_hostkey])
+
+
+def adjust(hostkey, slow_down):
+    """Adjust `REQUEST` and `CONGESTION` metrics when an HTTP request completes.
+ + Args: + hostkey: `(host, port)`. + slow_down: `True` if we encountered intermittent server trouble + and need to slow down. + """ + if slow_down: + _slow_down(hostkey) + elif CONGESTION_UNACK[hostkey] <= CONGESTION_WINDOW_SIZE[hostkey]: + # not in backoff phase anymore + _out_of_backoff(hostkey) + + +def _slow_down(hostkey): + # Backoff exponentially until ~30 seconds between requests. + delay = max(pow(2, REQUEST_DELAY_EXPONENT[hostkey]) * 1000, + REQUEST_DELAY_MINIMUM[hostkey]) + + REQUEST_DELAY_EXPONENT[hostkey] = min( + REQUEST_DELAY_EXPONENT[hostkey] + 1, 5) + + # Slow start threshold is ~1/2 of the window size up until we saw + # trouble. Shrink the new window size back to 1. + CONGESTION_SSTHRESH[hostkey] = int(CONGESTION_WINDOW_SIZE[hostkey] / 2.0) + CONGESTION_WINDOW_SIZE[hostkey] = 1.0 + + _dbug('slowdown; delay: %dms -> %dms; ssthresh: %d; cws: %.3f', + hostkey, + REQUEST_DELAY[hostkey], + delay, + CONGESTION_SSTHRESH[hostkey], + CONGESTION_WINDOW_SIZE[hostkey]) + + REQUEST_DELAY[hostkey] = delay + + +def _out_of_backoff(hostkey): + REQUEST_DELAY_EXPONENT[hostkey] = 0 # Coming out of backoff, so reset. + + # Shrink the delay between requests with each successive reply to + # converge on maximum throughput. + delay = max(int(REQUEST_DELAY[hostkey] / 2), + REQUEST_DELAY_MINIMUM[hostkey]) + + cws = CONGESTION_WINDOW_SIZE[hostkey] + sst = CONGESTION_SSTHRESH[hostkey] + + if sst and cws >= sst: + # Analogous to TCP's congestion avoidance phase. Window growth is linear. + phase = 'congestion avoidance' + cws = cws + (1.0 / cws) + else: + # Analogous to TCP's slow start phase. Window growth is exponential. 
+ phase = 'slow start' + cws += 1 + + if (REQUEST_DELAY[hostkey] != delay + or CONGESTION_WINDOW_SIZE[hostkey] != cws): + _dbug('oobackoff; delay: %dms -> %dms; %s; window size %.3f -> %.3f', + hostkey, + REQUEST_DELAY[hostkey], + delay, + phase, + CONGESTION_WINDOW_SIZE[hostkey], + cws) + + CONGESTION_WINDOW_SIZE[hostkey] = cws + REQUEST_DELAY[hostkey] = delay diff --git a/test/test_webservice.py b/test/test_webservice.py index 41476a354..484c1293f 100644 --- a/test/test_webservice.py +++ b/test/test_webservice.py @@ -2,7 +2,12 @@ import unittest from picard import config -from picard.webservice import WebService, UnknownResponseParserError, WSRequest +from picard.webservice import ( + WebService, + UnknownResponseParserError, + WSRequest, + ratecontrol, +) from unittest.mock import patch, MagicMock PROXY_SETTINGS = { @@ -126,7 +131,7 @@ class WebServiceTaskTest(unittest.TestCase): mock_task = MagicMock() mock_task2 = MagicMock() - delay_func = self.ws._get_delay_to_next_request = MagicMock() + delay_func = ratecontrol.get_delay_to_next_request = MagicMock() # Patching the get delay function to delay the 2nd task on queue to the next call delay_func.side_effect = [(False, 0), (True, 0), (False, 0), (False, 0), (False, 0), (False, 0)]