From d2247eb30e83c1da565e41dab581fd66ee6a1774 Mon Sep 17 00:00:00 2001
From: Sambhav Kothari
Date: Thu, 20 Jul 2017 19:49:46 +0530
Subject: [PATCH] Refactor WS by adding WSRequest class and retries for temp failures

Code borrowed from PR #473
Co-Authors - @zas and @rbeasley
---
 picard/webservice/__init__.py | 353 +++++++++++++++++++++++-----------
 1 file changed, 239 insertions(+), 114 deletions(-)

diff --git a/picard/webservice/__init__.py b/picard/webservice/__init__.py
index 5efed1c30..54417192a 100644
--- a/picard/webservice/__init__.py
+++ b/picard/webservice/__init__.py
@@ -44,6 +44,7 @@ from picard.util.xml import parse_xml
 
 COUNT_REQUESTS_DELAY_MS = 250
 REQUEST_DELAY = defaultdict(lambda: 1000)
+TEMP_ERRORS_RETRIES = 5
 USER_AGENT_STRING = '%s-%s/%s (%s;%s-%s)' % (PICARD_ORG_NAME, PICARD_APP_NAME,
                                              PICARD_VERSION_STR,
                                              platform.platform(),
@@ -65,6 +66,136 @@ class UnknownResponseParserError(Exception):
         super().__init__(message)
 
 
+class WSRequest(QtNetwork.QNetworkRequest):
+    """Represents a single HTTP request.
+
+    :param method: HTTP method. One of ``GET``, ``POST``, ``PUT``, or ``DELETE``.
+    :param host: Hostname.
+    :param port: TCP port number (80 or 443).
+    :param path: Path component.
+    :param handler: Callback invoked as `handler(document, reply, error)`
+        with the response document, the `QNetworkReply` and its error code.
+    :param parse_response_type: Specifies that request either sends or accepts
+        data as ``application/{{response_mimetype}}``.
+    :param data: Data to include with ``PUT`` or ``POST`` requests.
+    :param mblogin: Hints that this request should be tied to a MusicBrainz
+        account, requiring that we obtain an OAuth token first.
+    :param cacheloadcontrol: See `QNetworkRequest.CacheLoadControlAttribute`.
+    :param refresh: Indicates a user-specified resource refresh, such as when
+        the user wishes to reload a release. Marks the request as high priority
+        and disables caching.
+    :ivar access_token: OAuth token; assigned by `WebService._send_request`.
+    :param queryargs: `dict` of query arguments.
+    :ivar _retries: Number of retries recorded so far by `mark_for_retry`.
+    :param priority: Indicates that this is a high priority request. (See
+        `WebService._run_next_task`.)
+    :param important: Indicates that this is an important request. (Ditto.)
+    """
+
+    def __init__(self, method, host, port, path, handler, parse_response_type=None, data=None,
+                 mblogin=False, cacheloadcontrol=None, refresh=None,
+                 queryargs=None, priority=False, important=False):
+        url = build_qurl(host, port, path=path, queryargs=queryargs)
+        super().__init__(url)
+
+        # These two are codependent (see _update_authorization_header) and must
+        # be initialized explicitly.
+        self._access_token = None
+        self._mblogin = None
+
+        self._retries = 0
+
+        self.method = method
+        self.host = host
+        self.port = port
+        self.path = path
+        self.handler = handler
+        self.parse_response_type = parse_response_type
+        self.response_parser = None
+        self.response_mimetype = None
+        self.data = data
+        self.mblogin = mblogin
+        self.cacheloadcontrol = cacheloadcontrol
+        self.refresh = refresh
+        self.queryargs = queryargs
+        self.priority = priority
+        self.important = important
+
+        self.access_token = None
+        self._init_headers()
+
+    def _init_headers(self):
+        self.setRawHeader(b"User-Agent", USER_AGENT_STRING.encode('utf-8'))
+
+        if self.method == "GET":
+            self.setAttribute(QtNetwork.QNetworkRequest.HttpPipeliningAllowedAttribute,
+                              True)
+
+        if self.mblogin or (self.method == "GET" and self.refresh):
+            self.setPriority(QtNetwork.QNetworkRequest.HighPriority)
+            self.setAttribute(QtNetwork.QNetworkRequest.CacheLoadControlAttribute,
+                              QtNetwork.QNetworkRequest.AlwaysNetwork)
+        elif self.method in ("PUT", "DELETE"):
+            self.setPriority(QtNetwork.QNetworkRequest.HighPriority)
+        elif self.cacheloadcontrol is not None:
+            self.setAttribute(QtNetwork.QNetworkRequest.CacheLoadControlAttribute,
+                              self.cacheloadcontrol)
+        if self.parse_response_type:
+            try:
+                self.response_mimetype = WebService.get_response_mimetype(self.parse_response_type)
+                self.response_parser = WebService.get_response_parser(self.parse_response_type)
+            except UnknownResponseParserError as e:
+                log.error(e.args[0])
+            else:
+                self.setRawHeader(b"Accept", self.response_mimetype.encode('utf-8'))
+        if self.data is not None:
+            if (self.method == "POST"
+                and self.host == config.setting["server_host"]
+                and self.response_mimetype):
+                self.setHeader(QtNetwork.QNetworkRequest.ContentTypeHeader, "%s; charset=utf-8" % self.response_mimetype)
+            else:
+                self.setHeader(QtNetwork.QNetworkRequest.ContentTypeHeader, "application/x-www-form-urlencoded")
+
+    def _update_authorization_header(self):
+        if self.mblogin and self.access_token:
+            self.setRawHeader(b"Authorization", ("Bearer %s" % string_(self.access_token)).encode('utf-8'))
+        else:
+            self.setRawHeader(b"Authorization", b"")
+
+    @property
+    def access_token(self):
+        return self._access_token
+
+    @access_token.setter
+    def access_token(self, access_token):
+        self._access_token = access_token
+        self._update_authorization_header()
+
+    @property
+    def mblogin(self):
+        return self._mblogin
+
+    @mblogin.setter
+    def mblogin(self, mblogin):
+        self._mblogin = mblogin
+        self._update_authorization_header()
+
+    def get_host_key(self):
+        return (self.host, self.port)
+
+    def max_retries_reached(self):
+        return self._retries >= TEMP_ERRORS_RETRIES
+
+    def mark_for_retry(self, important=True, priority=True):
+        # Put retries at the head of the list in order to not penalize
+        # the load of an album unlucky enough to hit a temporary service
+        # snag.
+        self.important = important
+        self.priority = priority
+        self._retries += 1
+        return self._retries
+
+
 class WebService(QtCore.QObject):
 
     PARSERS = dict()
@@ -120,53 +251,19 @@ class WebService(QtCore.QObject):
             proxy.setPassword(config.setting["proxy_password"])
         self.manager.setProxy(proxy)
 
-    def _start_request_continue(self, method, host, port, path, data, handler, parse_response_type,
-                                mblogin=False, cacheloadcontrol=None, refresh=None,
-                                access_token=None, queryargs=None):
-        url = build_qurl(host, port, path=path, queryargs=queryargs)
-        request = QtNetwork.QNetworkRequest(url)
-        if mblogin and access_token:
-            # access_token must not be unicode - PyQt5 doesn't like it.
-            request.setRawHeader(b"Authorization", ("Bearer %s" % string_(access_token)).encode('utf-8'))
-        if mblogin or (method == "GET" and refresh):
-            request.setPriority(QtNetwork.QNetworkRequest.HighPriority)
-            request.setAttribute(QtNetwork.QNetworkRequest.CacheLoadControlAttribute,
-                                 QtNetwork.QNetworkRequest.AlwaysNetwork)
-        elif method == "PUT" or method == "DELETE":
-            request.setPriority(QtNetwork.QNetworkRequest.HighPriority)
-        elif cacheloadcontrol is not None:
-            request.setAttribute(QtNetwork.QNetworkRequest.CacheLoadControlAttribute,
-                                 cacheloadcontrol)
-        request.setRawHeader(b"User-Agent", USER_AGENT_STRING.encode('utf-8'))
-        response_mimetype = ""
-        if parse_response_type:
-            try:
-                response_mimetype = self.get_response_mimetype(parse_response_type)
-                request.setRawHeader(b"Accept", response_mimetype.encode('utf-8'))
-            except UnknownResponseParserError as e:
-                log.error(e.args[0])
-        if data is not None:
-            if method == "POST" and host == config.setting["server_host"] and response_mimetype:
-                request.setHeader(QtNetwork.QNetworkRequest.ContentTypeHeader, "%s; charset=utf-8" % response_mimetype)
-            else:
-                request.setHeader(QtNetwork.QNetworkRequest.ContentTypeHeader, "application/x-www-form-urlencoded")
-        send = self._request_methods[method]
+    def _send_request(self, request, access_token=None):
+        request.access_token = access_token
+        send = self._request_methods[request.method]
+        data = request.data
         reply = send(request, data.encode('utf-8')) if data is not None else send(request)
-        self._remember_request_time((host, port))
-        self._active_requests[reply] = (request, handler, parse_response_type, refresh)
+        self._remember_request_time(request.get_host_key())
+        self._active_requests[reply] = request
 
-    def _start_request(self, method, host, port, path, data, handler, parse_response_type,
-                       mblogin=False, cacheloadcontrol=None, refresh=None,
-                       queryargs=None):
-        def start_request_continue(access_token=None):
-            self._start_request_continue(
-                method, host, port, path, data, handler, parse_response_type,
-                mblogin=mblogin, cacheloadcontrol=cacheloadcontrol, refresh=refresh,
-                access_token=access_token, queryargs=queryargs)
-        if mblogin and path != "/oauth2/token":
-            self.oauth_manager.get_access_token(start_request_continue)
+    def _start_request(self, request):
+        if request.mblogin and request.path != "/oauth2/token":
+            self.oauth_manager.get_access_token(partial(self._send_request, request))
         else:
-            start_request_continue()
+            self._send_request(request)
 
     @staticmethod
     def urls_equivalent(leftUrl, rightUrl):
@@ -184,17 +281,65 @@ class WebService(QtCore.QObject):
             return url.port(443)
         return url.port(80)
 
-    def _handle_reply(self, reply, request, handler, parse_response_type, refresh):
+    def _handle_redirect(self, reply, request, redirect):
+        url = request.url()
         error = int(reply.error())
-        if error:
-            log.error("Network request error for %s: %s (QT code %d, HTTP code %s)",
-                      reply.request().url().toString(QUrl.RemoveUserInfo),
-                      reply.errorString(),
-                      error,
-                      repr(reply.attribute(QtNetwork.QNetworkRequest.HttpStatusCodeAttribute))
+        # merge with base url (to cover the possibility of the URL being relative)
+        redirect = url.resolved(redirect)
+        if not WebService.urls_equivalent(redirect, reply.request().url()):
+            log.debug("Redirect to %s requested", redirect.toString(QUrl.RemoveUserInfo))
+            redirect_host = string_(redirect.host())
+            redirect_port = self.url_port(redirect)
+            redirect_query = dict(QUrlQuery(redirect).queryItems(QUrl.FullyEncoded))
+            redirect_path = redirect.path()
+
+            original_host = string_(url.host())
+            original_port = self.url_port(url)
+
+            if ((original_host, original_port) in REQUEST_DELAY
+                    and (redirect_host, redirect_port) not in REQUEST_DELAY):
+                log.debug("Setting rate limit for %s:%i to %i" %
+                          (redirect_host, redirect_port,
+                           REQUEST_DELAY[(original_host, original_port)]))
+                REQUEST_DELAY[(redirect_host, redirect_port)] =\
+                    REQUEST_DELAY[(original_host, original_port)]
+
+            self.get(redirect_host,
+                     redirect_port,
+                     redirect_path,
+                     request.handler, request.parse_response_type, priority=True, important=True,
+                     refresh=request.refresh, queryargs=redirect_query,
+                     cacheloadcontrol=request.attribute(QtNetwork.QNetworkRequest.CacheLoadControlAttribute))
+        else:
+            log.error("Redirect loop: %s",
+                      reply.request().url().toString(QUrl.RemoveUserInfo)
                       )
-            if handler is not None:
-                handler(string_(reply.readAll()), reply, error)
+            request.handler(reply.readAll(), reply, error)
+
+    def _handle_reply(self, reply, request):
+
+        error = int(reply.error())
+        handler = request.handler
+        if error:
+            code = reply.attribute(QtNetwork.QNetworkRequest.HttpStatusCodeAttribute)
+            code = int(code) if code else 0
+            errstr = reply.errorString()
+            url = reply.request().url().toString(QUrl.RemoveUserInfo)
+            log.error("Network request error for %s: %s (QT code %d, HTTP code %d)",
+                      url, errstr, error, code)
+            if (not request.max_retries_reached()
+                and (code == 503
+                     or code == 429
+                     # following line is a workaround for Picard-809
+                     or errstr.endswith("Service Temporarily Unavailable")
+                    )
+               ):
+                retries = request.mark_for_retry()
+                log.debug("Retrying %s (#%d)", url, retries)
+                self.add_task(partial(self._start_request, request), request)
+
+            elif handler is not None:
+                handler(reply.readAll(), reply, error)
         else:
             redirect = reply.attribute(QtNetwork.QNetworkRequest.RedirectionTargetAttribute)
             fromCache = reply.attribute(QtNetwork.QNetworkRequest.SourceIsFromCacheAttribute)
@@ -208,49 +353,13 @@ class WebService(QtCore.QObject):
             if handler is not None:
                 # Redirect if found and not infinite
                 if redirect:
-                    url = request.url()
-                    # merge with base url (to cover the possibility of the URL being relative)
-                    redirect = url.resolved(redirect)
-                    if not WebService.urls_equivalent(redirect, reply.request().url()):
-                        log.debug("Redirect to %s requested", redirect.toString(QUrl.RemoveUserInfo))
-                        redirect_host = string_(redirect.host())
-                        redirect_port = self.url_port(redirect)
-                        redirect_query = dict(QUrlQuery(redirect).queryItems(QUrl.FullyEncoded))
-                        redirect_path = redirect.path()
-
-                        original_host = string_(url.host())
-                        original_port = self.url_port(url)
-
-                        if ((original_host, original_port) in REQUEST_DELAY
-                                and (redirect_host, redirect_port) not in REQUEST_DELAY):
-                            log.debug("Setting rate limit for %s:%i to %i" %
-                                      (redirect_host, redirect_port,
-                                       REQUEST_DELAY[(original_host, original_port)]))
-                            REQUEST_DELAY[(redirect_host, redirect_port)] =\
-                                REQUEST_DELAY[(original_host, original_port)]
-
-                        self.get(redirect_host,
-                                 redirect_port,
-                                 redirect_path,
-                                 handler, parse_response_type, priority=True, important=True, refresh=refresh, queryargs=redirect_query,
-                                 cacheloadcontrol=request.attribute(QtNetwork.QNetworkRequest.CacheLoadControlAttribute))
-                    else:
-                        log.error("Redirect loop: %s",
-                                  reply.request().url().toString(QUrl.RemoveUserInfo)
-                                  )
-                        handler(reply.readAll(), reply, error)
-                elif parse_response_type:
+                    self._handle_redirect(reply, request, redirect)
+                elif request.response_parser:
                     try:
-                        response_parser = self.get_response_parser(parse_response_type)
-                    except UnknownResponseParserError as e:
-                        log.error(e.args[0])
+                        document = request.response_parser(reply)
+                    except Exception as e:
+                        log.error("Unable to parse the response. %s", e)
                         document = reply.readAll()
-                    else:
-                        try:
-                            document = response_parser(reply)
-                        except Exception as e:
-                            log.error("Unable to parse the response. %s", e)
-                            document = reply.readAll()
                     finally:
                         handler(document, reply, error)
                 else:
@@ -258,34 +367,47 @@ class WebService(QtCore.QObject):
 
     def _process_reply(self, reply):
         try:
-            request, handler, parse_response_type, refresh = self._active_requests.pop(reply)
+            request = self._active_requests.pop(reply)
         except KeyError:
             log.error("Request not found for %s" % reply.request().url().toString(QUrl.RemoveUserInfo))
             return
         try:
-            self._handle_reply(reply, request, handler, parse_response_type, refresh)
+            self._handle_reply(reply, request)
         finally:
             reply.close()
             reply.deleteLater()
 
-    def get(self, host, port, path, handler, parse_response_type=DEFAULT_RESPONSE_PARSER_TYPE, priority=False,
-            important=False, mblogin=False, cacheloadcontrol=None, refresh=False, queryargs=None):
-        func = partial(self._start_request, "GET", host, port, path, None,
-                       handler, parse_response_type, mblogin, cacheloadcontrol=cacheloadcontrol, refresh=refresh, queryargs=queryargs)
-        return self.add_task(func, host, port, priority, important=important)
+    def get(self, host, port, path, handler, parse_response_type=DEFAULT_RESPONSE_PARSER_TYPE,
+            priority=False, important=False, mblogin=False, cacheloadcontrol=None, refresh=False,
+            queryargs=None):
+        request = WSRequest("GET", host, port, path, handler, parse_response_type=parse_response_type,
+                            mblogin=mblogin, cacheloadcontrol=cacheloadcontrol, refresh=refresh,
+                            queryargs=queryargs, priority=priority, important=important)
+        func = partial(self._start_request, request)
+        return self.add_task(func, request)
 
-    def post(self, host, port, path, data, handler, parse_response_type=DEFAULT_RESPONSE_PARSER_TYPE, priority=False, important=False, mblogin=True, queryargs=None):
+    def post(self, host, port, path, data, handler, parse_response_type=DEFAULT_RESPONSE_PARSER_TYPE,
+             priority=False, important=False, mblogin=True, queryargs=None):
+        request = WSRequest("POST", host, port, path, handler, parse_response_type=parse_response_type,
+                            data=data, mblogin=mblogin, queryargs=queryargs,
+                            priority=priority, important=important)
         log.debug("POST-DATA %r", data)
-        func = partial(self._start_request, "POST", host, port, path, data, handler, parse_response_type, mblogin, queryargs=queryargs)
-        return self.add_task(func, host, port, priority, important=important)
+        func = partial(self._start_request, request)
+        return self.add_task(func, request)
 
-    def put(self, host, port, path, data, handler, priority=True, important=False, mblogin=True, queryargs=None):
-        func = partial(self._start_request, "PUT", host, port, path, data, handler, False, mblogin, queryargs=queryargs)
-        return self.add_task(func, host, port, priority, important=important)
+    def put(self, host, port, path, data, handler, priority=True, important=False, mblogin=True,
+            queryargs=None):
+        request = WSRequest("PUT", host, port, path, handler, data=data, mblogin=mblogin,
+                            queryargs=queryargs, priority=priority, important=important)
+        func = partial(self._start_request, request)
+        return self.add_task(func, request)
 
-    def delete(self, host, port, path, handler, priority=True, important=False, mblogin=True, queryargs=None):
-        func = partial(self._start_request, "DELETE", host, port, path, None, handler, False, mblogin, queryargs=queryargs)
-        return self.add_task(func, host, port, priority, important=important)
+    def delete(self, host, port, path, handler, priority=True, important=False, mblogin=True,
+               queryargs=None):
+        request = WSRequest("DELETE", host, port, path, handler, mblogin=mblogin,
+                            queryargs=queryargs, priority=priority, important=important)
+        func = partial(self._start_request, request)
+        return self.add_task(func, request)
 
     def download(self, host, port, path, handler, priority=False,
                  important=False, cacheloadcontrol=None, refresh=False,
@@ -361,17 +483,20 @@ class WebService(QtCore.QObject):
         if delay < sys.maxsize:
             self._timer_run_next_task.start(delay)
 
-    def add_task(self, func, host, port, priority, important=False):
-        hostkey = (host, port)
-        prio = int(priority)  # priority is a boolean
-        if important:
+    def add_task(self, func, request):
+        hostkey = request.get_host_key()
+        prio = int(request.priority)  # priority is a boolean
+        if request.important:
             self._queues[prio][hostkey].appendleft(func)
         else:
             self._queues[prio][hostkey].append(func)
+
         if not self._timer_run_next_task.isActive():
             self._timer_run_next_task.start(0)
+
         if not self._timer_count_pending_requests.isActive():
             self._timer_count_pending_requests.start(0)
+
         return (hostkey, func, prio)
 
     def remove_task(self, task):