Refactor WS by adding WSRequest class and retries for temp failures

Code borrowed from PR #473
Co-Authors - @zas and @rbeasley
This commit is contained in:
Sambhav Kothari
2017-07-20 19:49:46 +05:30
parent 599294762b
commit d2247eb30e

View File

@@ -44,6 +44,7 @@ from picard.util.xml import parse_xml
COUNT_REQUESTS_DELAY_MS = 250
REQUEST_DELAY = defaultdict(lambda: 1000)
TEMP_ERRORS_RETRIES = 5
USER_AGENT_STRING = '%s-%s/%s (%s;%s-%s)' % (PICARD_ORG_NAME, PICARD_APP_NAME,
PICARD_VERSION_STR,
platform.platform(),
@@ -65,6 +66,136 @@ class UnknownResponseParserError(Exception):
super().__init__(message)
class WSRequest(QtNetwork.QNetworkRequest):
"""Represents a single HTTP request.
:param method: HTTP method. One of ``GET``, ``POST``, ``PUT``, or ``DELETE``.
:param host: Hostname.
:param port: TCP port number (80 or 443).
:param path: Path component.
:param handler: Callback which takes a 3-tuple of `(str:document,
QNetworkReply:reply, QNetworkReply.Error:error)`.
:param parse_response_type: Specifies that request either sends or accepts
data as ``application/{{response_mimetype}}``.
:param data: Data to include with ``PUT`` or ``POST`` requests.
:param mblogin: Hints that this request should be tied to a MusicBrainz
account, requiring that we obtain an OAuth token first.
:param cacheloadcontrol: See `QNetworkRequest.CacheLoadControlAttribute`.
:param refresh: Indicates a user-specified resource refresh, such as when
the user wishes to reload a release. Marks the request as high priority
and disables caching.
:param access_token: OAuth token.
:param queryargs: `dict` of query arguments.
:param retries: Current retry attempt number.
:param priority: Indicates that this is a high priority request. (See
`WebService._run_next_task`.)
:param important: Indicates that this is an important request. (Ditto.)
"""
def __init__(self, method, host, port, path, handler, parse_response_type=None, data=None,
mblogin=False, cacheloadcontrol=None, refresh=None,
queryargs=None, priority=False, important=False):
url = build_qurl(host, port, path=path, queryargs=queryargs)
super().__init__(url)
# These two are codependent (see _update_authorization_header) and must
# be initialized explicitly.
self._access_token = None
self._mblogin = None
self._retries = 0
self.method = method
self.host = host
self.port = port
self.path = path
self.handler = handler
self.parse_response_type = parse_response_type
self.response_parser = None
self.response_mimetype = None
self.data = data
self.mblogin = mblogin
self.cacheloadcontrol = cacheloadcontrol
self.refresh = refresh
self.queryargs = queryargs
self.priority = priority
self.important = important
self.access_token = None
self._init_headers()
def _init_headers(self):
self.setRawHeader(b"User-Agent", USER_AGENT_STRING.encode('utf-8'))
if self.method == "GET":
self.setAttribute(QtNetwork.QNetworkRequest.HttpPipeliningAllowedAttribute,
True)
if self.mblogin or (self.method == "GET" and self.refresh):
self.setPriority(QtNetwork.QNetworkRequest.HighPriority)
self.setAttribute(QtNetwork.QNetworkRequest.CacheLoadControlAttribute,
QtNetwork.QNetworkRequest.AlwaysNetwork)
elif self.method in ("PUT", "DELETE"):
self.setPriority(QtNetwork.QNetworkRequest.HighPriority)
elif self.cacheloadcontrol is not None:
self.setAttribute(QtNetwork.QNetworkRequest.CacheLoadControlAttribute,
self.cacheloadcontrol)
if self.parse_response_type:
try:
self.response_mimetype = WebService.get_response_mimetype(self.parse_response_type)
self.response_parser = WebService.get_response_parser(self.parse_response_type)
except UnknownResponseParserError as e:
log.error(e.args[0])
else:
self.setRawHeader(b"Accept", self.response_mimetype.encode('utf-8'))
if self.data is not None:
if (self.method == "POST"
and self.host == config.setting["server_host"]
and self.response_mimetype):
self.setHeader(QtNetwork.QNetworkRequest.ContentTypeHeader, "%s; charset=utf-8" % self.response_mimetype)
else:
self.setHeader(QtNetwork.QNetworkRequest.ContentTypeHeader, "application/x-www-form-urlencoded")
def _update_authorization_header(self):
if self.mblogin and self.access_token:
self.setRawHeader(b"Authorization", ("Bearer %s" % string_(self.access_token)).encode('utf-8'))
else:
self.setRawHeader(b"Authorization", b"")
@property
def access_token(self):
return self._access_token
@access_token.setter
def access_token(self, access_token):
self._access_token = access_token
self._update_authorization_header()
@property
def mblogin(self):
return self._mblogin
@mblogin.setter
def mblogin(self, mblogin):
self._mblogin = mblogin
self._update_authorization_header()
def get_host_key(self):
return (self.host, self.port)
def max_retries_reached(self):
return self._retries >= TEMP_ERRORS_RETRIES
def mark_for_retry(self, important=True, priority=True):
# Put retries at the head of the list in order to not penalize
# the load an album unlucky enough to hit a temporary service
# snag.
self.important = important
self.priority = priority
self._retries += 1
return self._retries
class WebService(QtCore.QObject):
PARSERS = dict()
@@ -120,53 +251,19 @@ class WebService(QtCore.QObject):
proxy.setPassword(config.setting["proxy_password"])
self.manager.setProxy(proxy)
def _start_request_continue(self, method, host, port, path, data, handler, parse_response_type,
mblogin=False, cacheloadcontrol=None, refresh=None,
access_token=None, queryargs=None):
url = build_qurl(host, port, path=path, queryargs=queryargs)
request = QtNetwork.QNetworkRequest(url)
if mblogin and access_token:
# access_token must not be unicode - PyQt5 doesn't like it.
request.setRawHeader(b"Authorization", ("Bearer %s" % string_(access_token)).encode('utf-8'))
if mblogin or (method == "GET" and refresh):
request.setPriority(QtNetwork.QNetworkRequest.HighPriority)
request.setAttribute(QtNetwork.QNetworkRequest.CacheLoadControlAttribute,
QtNetwork.QNetworkRequest.AlwaysNetwork)
elif method == "PUT" or method == "DELETE":
request.setPriority(QtNetwork.QNetworkRequest.HighPriority)
elif cacheloadcontrol is not None:
request.setAttribute(QtNetwork.QNetworkRequest.CacheLoadControlAttribute,
cacheloadcontrol)
request.setRawHeader(b"User-Agent", USER_AGENT_STRING.encode('utf-8'))
response_mimetype = ""
if parse_response_type:
try:
response_mimetype = self.get_response_mimetype(parse_response_type)
request.setRawHeader(b"Accept", response_mimetype.encode('utf-8'))
except UnknownResponseParserError as e:
log.error(e.args[0])
if data is not None:
if method == "POST" and host == config.setting["server_host"] and response_mimetype:
request.setHeader(QtNetwork.QNetworkRequest.ContentTypeHeader, "%s; charset=utf-8" % response_mimetype)
else:
request.setHeader(QtNetwork.QNetworkRequest.ContentTypeHeader, "application/x-www-form-urlencoded")
send = self._request_methods[method]
def _send_request(self, request, access_token=None):
request.access_token = access_token
send = self._request_methods[request.method]
data = request.data
reply = send(request, data.encode('utf-8')) if data is not None else send(request)
self._remember_request_time((host, port))
self._active_requests[reply] = (request, handler, parse_response_type, refresh)
self._remember_request_time(request.get_host_key())
self._active_requests[reply] = request
def _start_request(self, method, host, port, path, data, handler, parse_response_type,
mblogin=False, cacheloadcontrol=None, refresh=None,
queryargs=None):
def start_request_continue(access_token=None):
self._start_request_continue(
method, host, port, path, data, handler, parse_response_type,
mblogin=mblogin, cacheloadcontrol=cacheloadcontrol, refresh=refresh,
access_token=access_token, queryargs=queryargs)
if mblogin and path != "/oauth2/token":
self.oauth_manager.get_access_token(start_request_continue)
def _start_request(self, request):
if request.mblogin and request.path != "/oauth2/token":
self.oauth_manager.get_access_token(partial(self._send_request, request))
else:
start_request_continue()
self._send_request(request)
@staticmethod
def urls_equivalent(leftUrl, rightUrl):
@@ -184,17 +281,65 @@ class WebService(QtCore.QObject):
return url.port(443)
return url.port(80)
def _handle_reply(self, reply, request, handler, parse_response_type, refresh):
def _handle_redirect(self, reply, request, redirect):
url = request.url()
error = int(reply.error())
if error:
log.error("Network request error for %s: %s (QT code %d, HTTP code %s)",
reply.request().url().toString(QUrl.RemoveUserInfo),
reply.errorString(),
error,
repr(reply.attribute(QtNetwork.QNetworkRequest.HttpStatusCodeAttribute))
# merge with base url (to cover the possibility of the URL being relative)
redirect = url.resolved(redirect)
if not WebService.urls_equivalent(redirect, reply.request().url()):
log.debug("Redirect to %s requested", redirect.toString(QUrl.RemoveUserInfo))
redirect_host = string_(redirect.host())
redirect_port = self.url_port(redirect)
redirect_query = dict(QUrlQuery(redirect).queryItems(QUrl.FullyEncoded))
redirect_path = redirect.path()
original_host = string_(url.host())
original_port = self.url_port(url)
if ((original_host, original_port) in REQUEST_DELAY
and (redirect_host, redirect_port) not in REQUEST_DELAY):
log.debug("Setting rate limit for %s:%i to %i" %
(redirect_host, redirect_port,
REQUEST_DELAY[(original_host, original_port)]))
REQUEST_DELAY[(redirect_host, redirect_port)] =\
REQUEST_DELAY[(original_host, original_port)]
self.get(redirect_host,
redirect_port,
redirect_path,
request.handler, request.parse_response_type, priority=True, important=True,
refresh=request.refresh, queryargs=redirect_query,
cacheloadcontrol=request.attribute(QtNetwork.QNetworkRequest.CacheLoadControlAttribute))
else:
log.error("Redirect loop: %s",
reply.request().url().toString(QUrl.RemoveUserInfo)
)
if handler is not None:
handler(string_(reply.readAll()), reply, error)
handler(reply.readAll(), reply, error)
def _handle_reply(self, reply, request):
error = int(reply.error())
handler = request.handler
if error:
code = reply.attribute(QtNetwork.QNetworkRequest.HttpStatusCodeAttribute)
code = int(code) if code else 0
errstr = reply.errorString()
url = reply.request().url().toString(QUrl.RemoveUserInfo)
log.error("Network request error for %s: %s (QT code %d, HTTP code %d)",
url, errstr, error, code)
if (not request.max_retries_reached()
and (code == 503
or code == 429
# following line is a workaround for Picard-809
or errstr.endswith("Service Temporarily Unavailable")
)
):
retries = request.mark_for_retry()
log.debug("Retrying %s (#%d)", url, retries)
self.add_task(partial(self._start_request, request), request)
elif handler is not None:
handler(reply.readAll(), reply, error)
else:
redirect = reply.attribute(QtNetwork.QNetworkRequest.RedirectionTargetAttribute)
fromCache = reply.attribute(QtNetwork.QNetworkRequest.SourceIsFromCacheAttribute)
@@ -208,49 +353,13 @@ class WebService(QtCore.QObject):
if handler is not None:
# Redirect if found and not infinite
if redirect:
url = request.url()
# merge with base url (to cover the possibility of the URL being relative)
redirect = url.resolved(redirect)
if not WebService.urls_equivalent(redirect, reply.request().url()):
log.debug("Redirect to %s requested", redirect.toString(QUrl.RemoveUserInfo))
redirect_host = string_(redirect.host())
redirect_port = self.url_port(redirect)
redirect_query = dict(QUrlQuery(redirect).queryItems(QUrl.FullyEncoded))
redirect_path = redirect.path()
original_host = string_(url.host())
original_port = self.url_port(url)
if ((original_host, original_port) in REQUEST_DELAY
and (redirect_host, redirect_port) not in REQUEST_DELAY):
log.debug("Setting rate limit for %s:%i to %i" %
(redirect_host, redirect_port,
REQUEST_DELAY[(original_host, original_port)]))
REQUEST_DELAY[(redirect_host, redirect_port)] =\
REQUEST_DELAY[(original_host, original_port)]
self.get(redirect_host,
redirect_port,
redirect_path,
handler, parse_response_type, priority=True, important=True, refresh=refresh, queryargs=redirect_query,
cacheloadcontrol=request.attribute(QtNetwork.QNetworkRequest.CacheLoadControlAttribute))
else:
log.error("Redirect loop: %s",
reply.request().url().toString(QUrl.RemoveUserInfo)
)
handler(reply.readAll(), reply, error)
elif parse_response_type:
self._handle_redirect(reply, request, redirect)
elif request.response_parser:
try:
response_parser = self.get_response_parser(parse_response_type)
except UnknownResponseParserError as e:
log.error(e.args[0])
document = request.response_parser(reply)
except Exception as e:
log.error("Unable to parse the response. %s", e)
document = reply.readAll()
else:
try:
document = response_parser(reply)
except Exception as e:
log.error("Unable to parse the response. %s", e)
document = reply.readAll()
finally:
handler(document, reply, error)
else:
@@ -258,34 +367,47 @@ class WebService(QtCore.QObject):
def _process_reply(self, reply):
try:
request, handler, parse_response_type, refresh = self._active_requests.pop(reply)
request = self._active_requests.pop(reply)
except KeyError:
log.error("Request not found for %s" % reply.request().url().toString(QUrl.RemoveUserInfo))
return
try:
self._handle_reply(reply, request, handler, parse_response_type, refresh)
self._handle_reply(reply, request)
finally:
reply.close()
reply.deleteLater()
def get(self, host, port, path, handler, parse_response_type=DEFAULT_RESPONSE_PARSER_TYPE, priority=False,
important=False, mblogin=False, cacheloadcontrol=None, refresh=False, queryargs=None):
func = partial(self._start_request, "GET", host, port, path, None,
handler, parse_response_type, mblogin, cacheloadcontrol=cacheloadcontrol, refresh=refresh, queryargs=queryargs)
return self.add_task(func, host, port, priority, important=important)
def get(self, host, port, path, handler, parse_response_type=DEFAULT_RESPONSE_PARSER_TYPE,
priority=False, important=False, mblogin=False, cacheloadcontrol=None, refresh=False,
queryargs=None):
request = WSRequest("GET", host, port, path, handler, parse_response_type=parse_response_type,
mblogin=mblogin, cacheloadcontrol=cacheloadcontrol, refresh=refresh,
queryargs=queryargs, priority=priority, important=important)
func = partial(self._start_request, request)
return self.add_task(func, request)
def post(self, host, port, path, data, handler, parse_response_type=DEFAULT_RESPONSE_PARSER_TYPE, priority=False, important=False, mblogin=True, queryargs=None):
def post(self, host, port, path, data, handler, parse_response_type=DEFAULT_RESPONSE_PARSER_TYPE,
priority=False, important=False, mblogin=True, queryargs=None):
request = WSRequest("POST", host, port, path, handler, parse_response_type=parse_response_type,
data=data, mblogin=mblogin, queryargs=queryargs,
priority=priority, important=important)
log.debug("POST-DATA %r", data)
func = partial(self._start_request, "POST", host, port, path, data, handler, parse_response_type, mblogin, queryargs=queryargs)
return self.add_task(func, host, port, priority, important=important)
func = partial(self._start_request, request)
return self.add_task(func, request)
def put(self, host, port, path, data, handler, priority=True, important=False, mblogin=True, queryargs=None):
func = partial(self._start_request, "PUT", host, port, path, data, handler, False, mblogin, queryargs=queryargs)
return self.add_task(func, host, port, priority, important=important)
def put(self, host, port, path, data, handler, priority=True, important=False, mblogin=True,
queryargs=None):
request = WSRequest("PUT", host, port, path, handler, data=data, mblogin=mblogin,
queryargs=queryargs, priority=priority, important=important)
func = partial(self._start_request, request)
return self.add_task(func, request)
def delete(self, host, port, path, handler, priority=True, important=False, mblogin=True, queryargs=None):
func = partial(self._start_request, "DELETE", host, port, path, None, handler, False, mblogin, queryargs=queryargs)
return self.add_task(func, host, port, priority, important=important)
def delete(self, host, port, path, handler, priority=True, important=False, mblogin=True,
queryargs=None):
request = WSRequest("DELETE", host, port, path, handler, mblogin=mblogin,
queryargs=queryargs, priority=priority, important=important)
func = partial(self._start_request, request)
return self.add_task(func, request)
def download(self, host, port, path, handler, priority=False,
important=False, cacheloadcontrol=None, refresh=False,
@@ -361,17 +483,20 @@ class WebService(QtCore.QObject):
if delay < sys.maxsize:
self._timer_run_next_task.start(delay)
def add_task(self, func, host, port, priority, important=False):
hostkey = (host, port)
prio = int(priority) # priority is a boolean
if important:
def add_task(self, func, request):
hostkey = request.get_host_key()
prio = int(request.priority) # priority is a boolean
if request.important:
self._queues[prio][hostkey].appendleft(func)
else:
self._queues[prio][hostkey].append(func)
if not self._timer_run_next_task.isActive():
self._timer_run_next_task.start(0)
if not self._timer_count_pending_requests.isActive():
self._timer_count_pending_requests.start(0)
return (hostkey, func, prio)
def remove_task(self, task):