Merge pull request #2225 from zas/ws_url

Introduce WebService (get|post|put|delete|download)_url() methods
This commit is contained in:
Laurent Monin
2023-06-11 17:09:30 +02:00
committed by GitHub
20 changed files with 504 additions and 312 deletions

View File

@@ -628,7 +628,7 @@ class Album(DataObject, Item):
self.load_task = self.tagger.mb_api.get_release_by_id(
self.id,
self._release_request_finished,
inc=tuple(inc),
inc=inc,
mblogin=require_authentication,
priority=priority,
refresh=refresh

View File

@@ -53,8 +53,7 @@ CACHE_SIZE_IN_BYTES = 100*1000*1000
# AcoustID client API key
ACOUSTID_KEY = 'v8pQ6oyB'
ACOUSTID_HOST = 'api.acoustid.org'
ACOUSTID_PORT = 443
ACOUSTID_URL = 'https://api.acoustid.org/v2'
FPCALC_NAMES = ['fpcalc', 'pyfpcalc']
DEFAULT_FPCALC_THREADS = 2
@@ -62,9 +61,8 @@ DEFAULT_FPCALC_THREADS = 2
MUSICBRAINZ_OAUTH_CLIENT_ID = 'ACa9wsDX19cLp-AeEP-vVw'
MUSICBRAINZ_OAUTH_CLIENT_SECRET = 'xIsvXbIuntaLuRRhzuazOA'
# Cover art archive URL and port
CAA_HOST = "coverartarchive.org"
CAA_PORT = 443
# Cover art archive URL
CAA_URL = 'https://coverartarchive.org'
# Prepare documentation URLs
if PICARD_VERSION.identifier == 'final':
@@ -144,14 +142,13 @@ MUSICBRAINZ_SERVERS = [
]
# Plugins and Release Versions API
PLUGINS_API_BASE_URL = 'https://picard.musicbrainz.org/api/v2/'
PLUGINS_API = {
'host': 'picard.musicbrainz.org',
'port': 443,
'endpoint': {
'plugins': '/api/v2/plugins/',
'download': '/api/v2/download/',
'releases': '/api/v2/releases',
}
'urls': {
'plugins': PLUGINS_API_BASE_URL + 'plugins/',
'download': PLUGINS_API_BASE_URL + 'download/',
'releases': PLUGINS_API_BASE_URL + 'releases',
},
}
# Default query limit

View File

@@ -104,7 +104,7 @@ class CoverArt:
{
'type': coverartimage.types_as_string(),
'albumid': self.album.id,
'host': coverartimage.host
'host': coverartimage.url.host(),
},
echo=None
)
@@ -190,19 +190,15 @@ class CoverArt:
{
'type': coverartimage.types_as_string(),
'albumid': self.album.id,
'host': coverartimage.host
'host': coverartimage.url.host(),
},
echo=None
)
log.debug("Downloading %r", coverartimage)
self.album.tagger.webservice.download(
coverartimage.host,
coverartimage.port,
coverartimage.path,
partial(self._coverart_downloaded, coverartimage),
queryargs=coverartimage.queryargs,
self.album.tagger.webservice.download_url(
url=coverartimage.url,
handler=partial(self._coverart_downloaded, coverartimage),
priority=True,
important=False
)
self.album._requests += 1

View File

@@ -37,7 +37,6 @@ from PyQt5.QtCore import (
QMutex,
QObject,
QUrl,
QUrlQuery,
)
from picard import log
@@ -160,7 +159,10 @@ class CoverArtImage:
else:
self.types = types
if url is not None:
self.parse_url(url)
if not isinstance(url, QUrl):
self.url = QUrl(url)
else:
self.url = url
else:
self.url = None
self.comment = comment
@@ -178,17 +180,6 @@ class CoverArtImage:
if data is not None:
self.set_data(data)
def parse_url(self, url):
self.url = QUrl(url)
self.host = self.url.host()
self.port = self.url.port(443 if self.url.scheme() == 'https' else 80)
self.path = self.url.path(QUrl.ComponentFormattingOption.FullyEncoded)
if self.url.hasQuery():
query = QUrlQuery(self.url.query())
self.queryargs = dict(query.queryItems())
else:
self.queryargs = None
@property
def source(self):
if self.url is not None:

View File

@@ -52,10 +52,7 @@ from picard.config import (
ListOption,
get_config,
)
from picard.const import (
CAA_HOST,
CAA_PORT,
)
from picard.const import CAA_URL
from picard.coverart.image import (
CaaCoverArtImage,
CaaThumbnailCoverArtImage,
@@ -92,8 +89,8 @@ _CAA_IMAGE_SIZE_DEFAULT = 500
_CAA_IMAGE_TYPE_DEFAULT_INCLUDE = ['front']
_CAA_IMAGE_TYPE_DEFAULT_EXCLUDE = ['matrix/runout', 'raw/unedited', 'watermark']
ratecontrol.set_minimum_delay((CAA_HOST, CAA_PORT), 0)
ratecontrol.set_minimum_delay(('archive.org', 443), 0)
ratecontrol.set_minimum_delay_for_url(CAA_URL, 0)
ratecontrol.set_minimum_delay_for_url('https://archive.org', 0)
def caa_url_fallback_list(desired_size, thumbnails):
@@ -581,14 +578,12 @@ class CoverArtProviderCaa(CoverArtProvider):
return "/release/%s/" % self.metadata["musicbrainz_albumid"]
def queue_images(self):
self.album.tagger.webservice.get(
CAA_HOST,
CAA_PORT,
self._caa_path,
self._caa_json_downloaded,
self.album.tagger.webservice.get_url(
url=CAA_URL + self._caa_path,
handler=self._caa_json_downloaded,
priority=True,
important=False,
cacheloadcontrol=QNetworkRequest.CacheLoadControl.PreferNetwork
cacheloadcontrol=QNetworkRequest.CacheLoadControl.PreferNetwork,
)
self.album._requests += 1
# we will call next_in_queue() after json parsing

View File

@@ -150,6 +150,12 @@ class OAuthManager(object):
self.forget_access_token()
self.refresh_access_token(callback)
def url(self, path=None, params=None):
return build_qurl(
self.host, self.port, path=path,
queryargs=params
)
def get_authorization_url(self, scopes):
params = {
"response_type": "code",
@@ -157,11 +163,7 @@ class OAuthManager(object):
"redirect_uri": "urn:ietf:wg:oauth:2.0:oob",
"scope": scopes,
}
url = build_qurl(
self.host, self.port, path="/oauth2/authorize",
queryargs=params
)
return bytes(url.toEncoded()).decode()
return bytes(self.url(path="/oauth2/authorize", params=params).toEncoded()).decode()
def set_refresh_token(self, refresh_token, scopes):
log.debug("OAuth: got refresh_token %s with scopes %s", refresh_token, scopes)
@@ -179,18 +181,20 @@ class OAuthManager(object):
def refresh_access_token(self, callback):
log.debug("OAuth: refreshing access_token with a refresh_token %s", self.refresh_token)
path = "/oauth2/token"
params = {
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
"client_id": MUSICBRAINZ_OAUTH_CLIENT_ID,
"client_secret": MUSICBRAINZ_OAUTH_CLIENT_SECRET,
}
self.webservice.post(
self.host, self.port, path, self._query_data(params),
partial(self.on_refresh_access_token_finished, callback),
mblogin=True, priority=True, important=True,
request_mimetype="application/x-www-form-urlencoded"
self.webservice.post_url(
url=self.url(path="/oauth2/token"),
data=self._query_data(params),
handler=partial(self.on_refresh_access_token_finished, callback),
mblogin=True,
priority=True,
important=True,
request_mimetype="application/x-www-form-urlencoded",
)
def on_refresh_access_token_finished(self, callback, data, http, error):
@@ -212,7 +216,6 @@ class OAuthManager(object):
def exchange_authorization_code(self, authorization_code, scopes, callback):
log.debug("OAuth: exchanging authorization_code %s for an access_token", authorization_code)
path = "/oauth2/token"
params = {
"grant_type": "authorization_code",
"code": authorization_code,
@@ -220,11 +223,14 @@ class OAuthManager(object):
"client_secret": MUSICBRAINZ_OAUTH_CLIENT_SECRET,
"redirect_uri": "urn:ietf:wg:oauth:2.0:oob",
}
self.webservice.post(
self.host, self.port, path, self._query_data(params),
partial(self.on_exchange_authorization_code_finished, scopes, callback),
mblogin=True, priority=True, important=True,
request_mimetype="application/x-www-form-urlencoded"
self.webservice.post_url(
url=self.url(path="/oauth2/token"),
data=self._query_data(params),
handler=partial(self.on_exchange_authorization_code_finished, scopes, callback),
mblogin=True,
priority=True,
important=True,
request_mimetype="application/x-www-form-urlencoded",
)
def on_exchange_authorization_code_finished(self, scopes, callback, data, http, error):
@@ -246,11 +252,12 @@ class OAuthManager(object):
def fetch_username(self, callback):
log.debug("OAuth: fetching username")
path = "/oauth2/userinfo"
self.webservice.get(
self.host, self.port, path,
partial(self.on_fetch_username_finished, callback),
mblogin=True, priority=True, important=True
self.webservice.get_url(
url=self.url(path="/oauth2/userinfo"),
handler=partial(self.on_fetch_username_finished, callback),
mblogin=True,
priority=True,
important=True,
)
def on_fetch_username_finished(self, callback, data, http, error):

View File

@@ -414,13 +414,11 @@ class PluginManager(QtCore.QObject):
self.plugin_updated.emit(plugin_name, False)
def query_available_plugins(self, callback=None):
self.tagger.webservice.get(
PLUGINS_API['host'],
PLUGINS_API['port'],
PLUGINS_API['endpoint']['plugins'],
partial(self._plugins_json_loaded, callback=callback),
self.tagger.webservice.get_url(
url=PLUGINS_API['urls']['plugins'],
handler=partial(self._plugins_json_loaded, callback=callback),
priority=True,
important=True
important=True,
)
def is_available(self, plugin_name):

View File

@@ -408,7 +408,7 @@ class NonAlbumTrack(Track):
self.tagger.mb_api.get_track_by_id(
self.id,
self._recording_request_finished,
inc=tuple(inc),
inc=inc,
mblogin=require_authentication,
priority=priority,
refresh=refresh

View File

@@ -465,20 +465,12 @@ class CoverArtBox(QtWidgets.QGroupBox):
self.load_remote_image(url, fallback_data)
if url.scheme() in {'http', 'https'}:
path = url.path()
if url.hasQuery():
query = QtCore.QUrlQuery(url.query())
queryargs = dict(query.queryItems())
else:
queryargs = {}
if url.scheme() == 'https':
port = 443
else:
port = 80
self.tagger.webservice.get(url.host(), url.port(port), path,
partial(self.on_remote_image_fetched, url, fallback_data=fallback_data),
parse_response_type=None, queryargs=queryargs,
priority=True, important=True)
self.tagger.webservice.download_url(
url=url,
handler=partial(self.on_remote_image_fetched, url, fallback_data=fallback_data),
priority=True,
important=True,
)
elif url.scheme() == 'file':
path = normpath(url.toLocalFile().rstrip("\0"))
if path and os.path.exists(path):

View File

@@ -657,15 +657,13 @@ class PluginsOptionsPage(OptionsPage):
def download_plugin(self, item, update=False):
plugin = item.plugin
self.tagger.webservice.get(
PLUGINS_API['host'],
PLUGINS_API['port'],
PLUGINS_API['endpoint']['download'],
partial(self.download_handler, update, plugin=plugin),
self.tagger.webservice.get_url(
url=PLUGINS_API['urls']['download'],
handler=partial(self.download_handler, update, plugin=plugin),
parse_response_type=None,
priority=True,
important=True,
queryargs={"id": plugin.module_name, "version": plugin.version.to_string(short=True)}
unencoded_queryargs={"id": plugin.module_name, "version": plugin.version.to_string(short=True)},
)
def download_handler(self, update, response, reply, error, plugin):

View File

@@ -35,11 +35,7 @@ from picard.config import (
Option,
get_config,
)
from picard.const import (
CAA_HOST,
CAA_PORT,
)
from picard.coverart.image import CaaThumbnailCoverArtImage
from picard.const import CAA_URL
from picard.mbjson import (
countries_from_node,
media_formats_from_node,
@@ -248,12 +244,10 @@ class AlbumSearchDialog(SearchDialog):
if not cell.is_visible():
return
cell.fetched = True
caa_path = "/release/%s" % cell.release["musicbrainz_albumid"]
cell.fetch_task = self.tagger.webservice.get(
CAA_HOST,
CAA_PORT,
caa_path,
partial(self._caa_json_downloaded, cell)
mbid = cell.release["musicbrainz_albumid"]
cell.fetch_task = self.tagger.webservice.get_url(
url=f'{CAA_URL}/release/{mbid}',
handler=partial(self._caa_json_downloaded, cell),
)
def _caa_json_downloaded(self, cover_cell, data, http, error):
@@ -275,13 +269,9 @@ class AlbumSearchDialog(SearchDialog):
break
if front:
url = front["thumbnails"]["small"]
coverartimage = CaaThumbnailCoverArtImage(url=url)
cover_cell.fetch_task = self.tagger.webservice.download(
coverartimage.host,
coverartimage.port,
coverartimage.path,
partial(self._cover_downloaded, cover_cell)
cover_cell.fetch_task = self.tagger.webservice.download_url(
url=front["thumbnails"]["small"],
handler=partial(self._cover_downloaded, cover_cell)
)
else:
cover_cell.not_found()

View File

@@ -676,6 +676,17 @@ def album_artist_from_path(filename, album, artist):
return album, artist
def encoded_queryargs(queryargs):
"""
Percent-encode all values from passed dictionary
Keys are left unmodified
"""
return {
name: bytes(QtCore.QUrl.toPercentEncoding(str(value))).decode()
for name, value in queryargs.items()
}
def build_qurl(host, port=80, path=None, queryargs=None):
"""
Builds and returns a QUrl object from `host`, `port` and `path` and

View File

@@ -87,12 +87,10 @@ class UpdateCheckManager(QtCore.QObject):
def _query_available_updates(self, callback=None):
"""Gets list of releases from specified website api."""
log.debug("Getting Picard release information from %s", PLUGINS_API['host'])
self.tagger.webservice.get(
PLUGINS_API['host'],
PLUGINS_API['port'],
PLUGINS_API['endpoint']['releases'],
partial(self._releases_json_loaded, callback=callback),
log.debug("Getting Picard release information from %s", PLUGINS_API['urls']['releases'])
self.tagger.webservice.get_url(
url=PLUGINS_API['urls']['releases'],
handler=partial(self._releases_json_loaded, callback=callback),
priority=True,
important=True
)
@@ -105,9 +103,8 @@ class UpdateCheckManager(QtCore.QObject):
QMessageBox.information(
self._parent,
_("Picard Update"),
_("Unable to retrieve the latest version information from the website.\n(https://{url}{endpoint})").format(
url=PLUGINS_API['host'],
endpoint=PLUGINS_API['endpoint']['releases'],
_("Unable to retrieve the latest version information from the website.\n({url})").format(
url=PLUGINS_API['urls']['releases'],
),
QMessageBox.StandardButton.Ok, QMessageBox.StandardButton.Ok)
else:

View File

@@ -60,10 +60,12 @@ from picard.oauth import OAuthManager
from picard.util import (
build_qurl,
bytes2human,
encoded_queryargs,
parse_json,
)
from picard.util.xml import parse_xml
from picard.webservice import ratecontrol
from picard.webservice.utils import port_from_qurl
COUNT_REQUESTS_DELAY_MS = 250
@@ -74,9 +76,7 @@ USER_AGENT_STRING = '%s-%s/%s (%s;%s-%s)' % (PICARD_ORG_NAME, PICARD_APP_NAME,
platform.platform(),
platform.python_implementation(),
platform.python_version())
CLIENT_STRING = bytes(QUrl.toPercentEncoding('%s %s-%s' % (PICARD_ORG_NAME,
PICARD_APP_NAME,
PICARD_VERSION_STR))).decode()
CLIENT_STRING = '%s %s-%s' % (PICARD_ORG_NAME, PICARD_APP_NAME, PICARD_VERSION_STR)
DEFAULT_RESPONSE_PARSER_TYPE = "json"
@@ -115,6 +115,8 @@ class WSRequest(QNetworkRequest):
important=False,
request_mimetype=None,
url=None,
queryargs=None,
unencoded_queryargs=None,
):
"""
Args:
@@ -134,6 +136,8 @@ class WSRequest(QNetworkRequest):
important: Indicates that this is an important request.
request_mimetype: Set the Content-Type header.
url: URL passed as a string or as a QUrl to use for this request
queryargs: Encoded query arguments, a dictionary mapping field names to values
unencoded_queryargs: Unencoded query arguments, a dictionary mapping field names to values
"""
# mandatory parameters
self.method = method
@@ -149,6 +153,17 @@ class WSRequest(QNetworkRequest):
if not isinstance(url, QUrl):
url = QUrl(url)
if queryargs is not None or unencoded_queryargs is not None:
if queryargs is None:
queryargs = {}
if unencoded_queryargs:
queryargs.update(encoded_queryargs(unencoded_queryargs))
query = QtCore.QUrlQuery(url)
for k, v in queryargs.items():
query.addQueryItem(k, str(v))
url.setQuery(query)
super().__init__(url)
# optional parameters
@@ -204,11 +219,7 @@ class WSRequest(QNetworkRequest):
@property
def port(self):
"""Returns QUrl port or default ports (443 for https, 80 for http)"""
url = self.url()
if url.scheme() == 'https':
return url.port(443)
return url.port(80)
return port_from_qurl(self.url())
@property
def path(self):
@@ -340,8 +351,8 @@ class WebService(QtCore.QObject):
return reply.attribute(QNetworkRequest.Attribute.HttpReasonPhraseAttribute)
@staticmethod
def http_response_safe_url(reply):
return reply.request().url().toString(QUrl.UrlFormattingOption.RemoveUserInfo)
def display_url(url):
return url.toDisplayString(QUrl.UrlFormattingOption.RemoveUserInfo | QUrl.ComponentFormattingOption.EncodeSpaces)
def _network_accessible_changed(self, accessible):
# Qt's network accessibility check sometimes fails, e.g. with VPNs on Windows.
@@ -454,13 +465,16 @@ class WebService(QtCore.QObject):
def _handle_redirect(self, reply, request, redirect):
error = int(reply.error())
# merge with base url (to cover the possibility of the URL being relative)
redirect_qurl = request.url().resolved(redirect)
if not WebService.urls_equivalent(redirect_qurl, reply.request().url()):
log.debug("Redirect to %s requested", redirect_qurl.toString(QUrl.UrlFormattingOption.RemoveUserInfo))
redirect_url = request.url().resolved(redirect)
reply_url = reply.request().url()
display_redirect_url = self.display_url(redirect_url)
display_reply_url = self.display_url(reply_url)
if not WebService.urls_equivalent(redirect_url, reply_url):
log.debug("Redirect to %s requested", display_redirect_url)
redirect_request = WSRequest(
method='GET',
url=redirect_qurl,
url=redirect_url,
handler=request.handler,
parse_response_type=request.parse_response_type,
priority=True,
@@ -477,9 +491,7 @@ class WebService(QtCore.QObject):
self.add_request(redirect_request)
else:
log.error("Redirect loop: %s",
self.http_response_safe_url(reply)
)
log.error("Redirect loop: %s", display_reply_url)
request.handler(reply.readAll(), reply, error)
def _handle_reply(self, reply, request):
@@ -493,11 +505,11 @@ class WebService(QtCore.QObject):
error = int(reply.error())
handler = request.handler
response_code = self.http_response_code(reply)
url = self.http_response_safe_url(reply)
display_reply_url = self.display_url(reply.request().url())
if error:
errstr = reply.errorString()
log.error("Network request error for %s: %s (QT code %d, HTTP code %d)",
url, errstr, error, response_code)
log.error("Network request error for %s -> %s (QT code %d, HTTP code %d)",
display_reply_url, errstr, error, response_code)
if (not request.max_retries_reached()
and (response_code == 503
or response_code == 429
@@ -508,7 +520,7 @@ class WebService(QtCore.QObject):
)):
slow_down = True
retries = request.mark_for_retry()
log.debug("Retrying %s (#%d)", url, retries)
log.debug("Retrying %s (#%d)", display_reply_url, retries)
self.add_request(request)
elif handler is not None:
@@ -520,8 +532,8 @@ class WebService(QtCore.QObject):
redirect = reply.attribute(QNetworkRequest.Attribute.RedirectionTargetAttribute)
from_cache = reply.attribute(QNetworkRequest.Attribute.SourceIsFromCacheAttribute)
cached = ' (CACHED)' if from_cache else ''
log.debug("Received reply for %s: HTTP %d (%s) %s",
url,
log.debug("Received reply for %s -> HTTP %d (%s) %s",
display_reply_url,
response_code,
self.http_response_phrase(reply),
cached
@@ -535,7 +547,7 @@ class WebService(QtCore.QObject):
document = request.response_parser(reply)
log.debug("Response received: %s", document)
except Exception as e:
log.error("Unable to parse the response for %s: %s", url, e)
log.error("Unable to parse the response for %s -> %s", display_reply_url, e)
document = reply.readAll()
error = e
finally:
@@ -549,7 +561,8 @@ class WebService(QtCore.QObject):
try:
request = self._active_requests.pop(reply)
except KeyError:
log.error("Request not found for %s", self.http_response_safe_url(reply))
display_reply_url = self.display_url(reply.request().url())
log.error("Request not found for %s", display_reply_url)
return
try:
self._handle_reply(reply, request)
@@ -560,6 +573,7 @@ class WebService(QtCore.QObject):
def get(self, host, port, path, handler, parse_response_type=DEFAULT_RESPONSE_PARSER_TYPE,
priority=False, important=False, mblogin=False, cacheloadcontrol=None, refresh=False,
queryargs=None):
log.warning("This method is deprecated, use WebService.get_url() instead")
request = WSRequest(
method='GET',
url=build_qurl(host, port, path=path, queryargs=queryargs),
@@ -575,6 +589,7 @@ class WebService(QtCore.QObject):
def post(self, host, port, path, data, handler, parse_response_type=DEFAULT_RESPONSE_PARSER_TYPE,
priority=False, important=False, mblogin=True, queryargs=None, request_mimetype=None):
log.warning("This method is deprecated, use WebService.post_url() instead")
request = WSRequest(
method='POST',
url=build_qurl(host, port, path=path, queryargs=queryargs),
@@ -591,6 +606,7 @@ class WebService(QtCore.QObject):
def put(self, host, port, path, data, handler, priority=True, important=False, mblogin=True,
queryargs=None, request_mimetype=None):
log.warning("This method is deprecated, use WebService.put_url() instead")
request = WSRequest(
method='PUT',
url=build_qurl(host, port, path=path, queryargs=queryargs),
@@ -605,6 +621,7 @@ class WebService(QtCore.QObject):
def delete(self, host, port, path, handler, priority=True, important=False, mblogin=True,
queryargs=None):
log.warning("This method is deprecated, use WebService.delete_url() instead")
request = WSRequest(
method='DELETE',
url=build_qurl(host, port, path=path, queryargs=queryargs),
@@ -618,6 +635,7 @@ class WebService(QtCore.QObject):
def download(self, host, port, path, handler, priority=False,
important=False, cacheloadcontrol=None, refresh=False,
queryargs=None):
log.warning("This method is deprecated, use WebService.download_url() instead")
request = WSRequest(
method='GET',
url=build_qurl(host, port, path=path, queryargs=queryargs),
@@ -629,6 +647,34 @@ class WebService(QtCore.QObject):
)
return self.add_request(request)
def get_url(self, **kwargs):
kwargs['method'] = 'GET'
kwargs['parse_response_type'] = kwargs.get('parse_response_type', DEFAULT_RESPONSE_PARSER_TYPE)
return self.add_request(WSRequest(**kwargs))
def post_url(self, **kwargs):
kwargs['method'] = 'POST'
kwargs['parse_response_type'] = kwargs.get('parse_response_type', DEFAULT_RESPONSE_PARSER_TYPE)
kwargs['mblogin'] = kwargs.get('mblogin', True)
log.debug("POST-DATA %r", kwargs['data'])
return self.add_request(WSRequest(**kwargs))
def put_url(self, **kwargs):
kwargs['method'] = 'PUT'
kwargs['priority'] = kwargs.get('priority', True)
kwargs['mblogin'] = kwargs.get('mblogin', True)
return self.add_request(WSRequest(**kwargs))
def delete_url(self, **kwargs):
kwargs['method'] = 'DELETE'
kwargs['priority'] = kwargs.get('priority', True)
kwargs['mblogin'] = kwargs.get('mblogin', True)
return self.add_request(WSRequest(**kwargs))
def download_url(self, **kwargs):
kwargs['method'] = 'GET'
return self.add_request(WSRequest(**kwargs))
def stop(self):
for reply in list(self._active_requests):
reply.abort()

View File

@@ -3,8 +3,8 @@
# Picard, the next-generation MusicBrainz tagger
#
# Copyright (C) 2017 Sambhav Kothari
# Copyright (C) 2018, 2020-2021 Laurent Monin
# Copyright (C) 2018-2022 Philipp Wolfer
# Copyright (C) 2018, 2020-2021, 2023 Laurent Monin
# Copyright (C) 2018-2023 Philipp Wolfer
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
@@ -29,18 +29,18 @@ from PyQt5.QtCore import QUrl
from picard import PICARD_VERSION_STR
from picard.config import get_config
from picard.const import (
ACOUSTID_HOST,
ACOUSTID_KEY,
ACOUSTID_PORT,
ACOUSTID_URL,
)
from picard.util import encoded_queryargs
from picard.webservice import (
CLIENT_STRING,
DEFAULT_RESPONSE_PARSER_TYPE,
ratecontrol,
)
from picard.webservice.utils import host_port_to_url
ratecontrol.set_minimum_delay((ACOUSTID_HOST, ACOUSTID_PORT), 333)
ratecontrol.set_minimum_delay_for_url(ACOUSTID_URL, 333)
def escape_lucene_query(text):
@@ -59,91 +59,84 @@ def _wrap_xml_metadata(data):
class APIHelper(object):
_base_url = None
def __init__(self, host, port, api_path, webservice):
self._host = host
self._port = port
self.api_path = api_path
def __init__(self, webservice, base_url=None):
self._webservice = webservice
if base_url is not None:
self.base_url = base_url
@property
def base_url(self):
if self._base_url is None:
raise ValueError("base_url undefined")
return self._base_url
@base_url.setter
def base_url(self, url):
if not isinstance(url, QUrl):
url = QUrl(url)
self._base_url = url
@property
def webservice(self):
return self._webservice
@property
def host(self):
return self._host
def url_from_path(self, path):
url = QUrl(self.base_url)
url.setPath(url.path() + path)
return url
@property
def port(self):
return self._port
def get(self, path, handler, **kwargs):
kwargs['url'] = self.url_from_path(path)
kwargs['handler'] = handler
return self._webservice.get_url(**kwargs)
def get(self, path_list, handler, priority=False, important=False, mblogin=False,
cacheloadcontrol=None, refresh=False, queryargs=None, parse_response_type=DEFAULT_RESPONSE_PARSER_TYPE):
path = self.api_path + "/".join(path_list)
return self._webservice.get(self.host, self.port, path, handler,
priority=priority, important=important, mblogin=mblogin,
refresh=refresh, queryargs=queryargs, parse_response_type=parse_response_type)
def post(self, path, data, handler, **kwargs):
kwargs['url'] = self.url_from_path(path)
kwargs['handler'] = handler
kwargs['data'] = data
kwargs['mblogin'] = kwargs.get('mblogin', True)
return self._webservice.post_url(**kwargs)
def post(self, path_list, data, handler, priority=False, important=False,
mblogin=True, queryargs=None, parse_response_type=DEFAULT_RESPONSE_PARSER_TYPE,
request_mimetype=None):
path = self.api_path + "/".join(path_list)
return self._webservice.post(self.host, self.port, path, data, handler,
priority=priority, important=important, mblogin=mblogin,
queryargs=queryargs, parse_response_type=parse_response_type,
request_mimetype=request_mimetype)
def put(self, path, data, handler, **kwargs):
kwargs['url'] = self.url_from_path(path)
kwargs['handler'] = handler
kwargs['data'] = data
kwargs['priority'] = kwargs.get('priority', True)
kwargs['mblogin'] = kwargs.get('mblogin', True)
return self._webservice.put_url(**kwargs)
def put(self, path_list, data, handler, priority=True, important=False,
mblogin=True, queryargs=None, request_mimetype=None):
path = self.api_path + "/".join(path_list)
return self._webservice.put(self.host, self.port, path, data, handler,
priority=priority, important=important, mblogin=mblogin,
queryargs=queryargs, request_mimetype=request_mimetype)
def delete(self, path_list, handler, priority=True, important=False,
mblogin=True, queryargs=None):
path = self.api_path + "/".join(path_list)
return self._webservice.delete(self.host, self.port, path, handler,
priority=priority, important=important, mblogin=mblogin,
queryargs=queryargs)
def delete(self, path, handler, **kwargs):
kwargs['url'] = self.url_from_path(path)
kwargs['handler'] = handler
kwargs['priority'] = kwargs.get('priority', True)
kwargs['mblogin'] = kwargs.get('mblogin', True)
return self._webservice.delete_url(**kwargs)
class MBAPIHelper(APIHelper):
def __init__(self, webservice):
super().__init__(None, None, "/ws/2/", webservice)
@property
def host(self):
def base_url(self):
# we have to keep it dynamic since host/port can be changed via options
config = get_config()
return config.setting['server_host']
host = config.setting['server_host']
port = config.setting['server_port']
self._base_url = host_port_to_url(host, port)
self._base_url.setPath('/ws/2')
return self._base_url
@property
def port(self):
config = get_config()
return config.setting['server_port']
def _get_by_id(self, entitytype, entityid, handler, inc=None, queryargs=None,
priority=False, important=False, mblogin=False, refresh=False):
path_list = (entitytype, entityid)
if queryargs is None:
queryargs = {}
def _get_by_id(self, entitytype, entityid, handler, inc=None, **kwargs):
if inc:
queryargs["inc"] = "+".join(inc)
return self.get(path_list, handler,
priority=priority, important=important, mblogin=mblogin,
refresh=refresh, queryargs=queryargs)
kwargs['unencoded_queryargs'] = kwargs.get('queryargs', {})
kwargs['unencoded_queryargs']['inc'] = self._make_inc_arg(inc)
return self.get(f"/{entitytype}/{entityid}", handler, **kwargs)
def get_release_by_id(self, releaseid, handler, inc=None,
priority=False, important=False, mblogin=False, refresh=False):
return self._get_by_id('release', releaseid, handler, inc,
priority=priority, important=important, mblogin=mblogin, refresh=refresh)
def get_release_by_id(self, releaseid, handler, inc=None, **kwargs):
return self._get_by_id('release', releaseid, handler, inc, **kwargs)
def get_track_by_id(self, trackid, handler, inc=None,
priority=False, important=False, mblogin=False, refresh=False):
return self._get_by_id('recording', trackid, handler, inc,
priority=priority, important=important, mblogin=mblogin, refresh=refresh)
def get_track_by_id(self, trackid, handler, inc=None, **kwargs):
return self._get_by_id('recording', trackid, handler, inc, **kwargs)
def lookup_discid(self, discid, handler, priority=True, important=True, refresh=False):
inc = ('artist-credits', 'labels')
@@ -151,11 +144,11 @@ class MBAPIHelper(APIHelper):
priority=priority, important=important, refresh=refresh)
def _find(self, entitytype, handler, **kwargs):
filters = []
filters = {}
limit = kwargs.pop("limit")
if limit:
filters.append(("limit", limit))
filters['limit'] = limit
is_search = kwargs.pop("search", False)
if is_search:
@@ -165,20 +158,14 @@ class MBAPIHelper(APIHelper):
query = kwargs["query"]
else:
query = escape_lucene_query(kwargs["query"]).strip().lower()
filters.append(("dismax", 'true'))
filters['dismax'] = 'true'
else:
query = build_lucene_query(kwargs)
if query:
filters.append(("query", query))
filters['query'] = query
queryargs = {
name: bytes(QUrl.toPercentEncoding(str(value))).decode()
for name, value in filters
}
path_list = (entitytype, )
return self.get(path_list, handler, queryargs=queryargs,
return self.get(f"/{entitytype}", handler, unencoded_queryargs=filters,
priority=True, important=True, mblogin=False,
refresh=False)
@@ -191,13 +178,22 @@ class MBAPIHelper(APIHelper):
def find_artists(self, handler, **kwargs):
return self._find('artist', handler, **kwargs)
@staticmethod
def _make_inc_arg(inc):
"""
Convert an iterable to a string to be passed as inc paramater to MB
It drops non-unique and empty elements, and sort them before joining
them as a '+'-separated string
"""
return '+'.join(sorted(set(str(e) for e in inc if e)))
def _browse(self, entitytype, handler, inc=None, queryargs=None, mblogin=False):
path_list = (entitytype, )
if queryargs is None:
queryargs = {}
if inc:
queryargs["inc"] = "+".join(inc)
return self.get(path_list, handler, queryargs=queryargs,
queryargs["inc"] = self._make_inc_arg(inc)
return self.get(f"/{entitytype}", handler, unencoded_queryargs=queryargs,
priority=True, important=True, mblogin=mblogin,
refresh=False)
@@ -217,27 +213,26 @@ class MBAPIHelper(APIHelper):
return _wrap_xml_metadata('<recording-list>%s</recording-list>' % recordings)
def submit_ratings(self, ratings, handler):
path_list = ('rating', )
params = {"client": CLIENT_STRING}
data = self._xml_ratings(ratings)
return self.post(path_list, data, handler, priority=True,
queryargs=params, parse_response_type="xml",
return self.post("/rating", data, handler, priority=True,
unencoded_queryargs=params, parse_response_type="xml",
request_mimetype="application/xml; charset=utf-8")
def get_collection(self, collection_id, handler, limit=100, offset=0):
if collection_id is not None:
inc = ("releases", "artist-credits", "media")
path_list = ('collection', collection_id, "releases")
path = f"/collection/{collection_id}/releases"
queryargs = {
"inc": "+".join(inc),
"inc": self._make_inc_arg(inc),
"limit": limit,
"offset": offset,
}
else:
path_list = ('collection', )
path = '/collection'
queryargs = None
return self.get(tuple(path_list), handler, priority=True, important=True,
mblogin=True, queryargs=queryargs)
return self.get(path, handler, priority=True, important=True,
mblogin=True, unencoded_queryargs=queryargs)
def get_collection_list(self, handler):
return self.get_collection(None, handler)
@@ -246,50 +241,41 @@ class MBAPIHelper(APIHelper):
def _collection_request(collection_id, releases, batchsize=400):
for i in range(0, len(releases), batchsize):
ids = ";".join(releases[i:i+batchsize])
yield ("collection", collection_id, "releases", ids)
yield f"/collection/{collection_id}/releases/{ids}"
@staticmethod
def _get_client_queryarg():
return {"client": CLIENT_STRING}
def put_to_collection(self, collection_id, releases, handler):
for path_list in self._collection_request(collection_id, releases):
self.put(path_list, "", handler,
queryargs=self._get_client_queryarg())
for path in self._collection_request(collection_id, releases):
self.put(path, "", handler,
unencoded_queryargs=self._get_client_queryarg())
def delete_from_collection(self, collection_id, releases, handler):
for path_list in self._collection_request(collection_id, releases):
self.delete(path_list, handler,
queryargs=self._get_client_queryarg())
for path in self._collection_request(collection_id, releases):
self.delete(path, handler,
unencoded_queryargs=self._get_client_queryarg())
class AcoustIdAPIHelper(APIHelper):
acoustid_host = ACOUSTID_HOST
acoustid_port = ACOUSTID_PORT
client_key = ACOUSTID_KEY
client_version = PICARD_VERSION_STR
def __init__(self, webservice):
super().__init__(
self.acoustid_host, self.acoustid_port, '/v2/', webservice
)
super().__init__(webservice, base_url=ACOUSTID_URL)
def _encode_acoustid_args(self, args):
filters = []
args['client'] = self.client_key
args['clientversion'] = self.client_version
args['format'] = 'json'
for name, value in args.items():
value = bytes(QUrl.toPercentEncoding(value)).decode()
filters.append('%s=%s' % (name, value))
return '&'.join(filters)
return '&'.join((k + '=' + v for k, v in encoded_queryargs(args).items()))
def query_acoustid(self, handler, **args):
path_list = ('lookup', )
body = self._encode_acoustid_args(args)
return self.post(
path_list, body, handler, priority=False, important=False,
"/lookup", body, handler, priority=False, important=False,
mblogin=False, request_mimetype="application/x-www-form-urlencoded"
)
@@ -304,10 +290,9 @@ class AcoustIdAPIHelper(APIHelper):
return args
def submit_acoustid_fingerprints(self, submissions, handler):
path_list = ('submit', )
args = self._submissions_to_args(submissions)
body = self._encode_acoustid_args(args)
return self.post(
path_list, body, handler, priority=True, important=False,
"/submit", body, handler, priority=True, important=False,
mblogin=False, request_mimetype="application/x-www-form-urlencoded"
)

View File

@@ -29,6 +29,7 @@ import sys
import time
from picard import log
from picard.webservice.utils import hostkey_from_url
# ============================================================================
@@ -85,6 +86,14 @@ def set_minimum_delay(hostkey, delay_ms):
REQUEST_DELAY_MINIMUM[hostkey] = delay_ms
def set_minimum_delay_for_url(url, delay_ms):
"""Set the minimun delay between requests
url will be converted to an unique key (host, port)
delay_ms is the delay in milliseconds
"""
set_minimum_delay(hostkey_from_url(url), delay_ms)
def current_delay(hostkey):
"""Returns the current delay (adaptive) between requests for this hostkey
hostkey is an unique key, for example (host, port)

View File

@@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
#
# Picard, the next-generation MusicBrainz tagger
#
# Copyright (C) 2007 Lukáš Lalinský
# Copyright (C) 2009 Carlin Mangar
# Copyright (C) 2017 Sambhav Kothari
# Copyright (C) 2018-2022 Philipp Wolfer
# Copyright (C) 2018-2023 Laurent Monin
# Copyright (C) 2021 Tche333
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
Asynchronous web service utilities.
"""
from PyQt5.QtCore import QUrl
def port_from_qurl(qurl):
"""Returns QUrl port or default ports (443 for https, 80 for http)"""
if qurl.scheme() == 'https':
return qurl.port(443)
return qurl.port(80)
def hostkey_from_url(url):
"""Returns (host, port) from passed url (as string or QUrl)"""
if not isinstance(url, QUrl):
url = QUrl(url)
return (url.host(), port_from_qurl(url))
def host_port_to_url(host, port, path=None, scheme=None, as_string=False):
"""Convert host & port (with optional path and scheme) to an URL"""
url = QUrl()
if scheme is None:
if port == 443:
scheme = 'https'
else:
scheme = 'http'
url.setScheme(scheme)
if ((scheme == 'https' and port != 443)
or (scheme == 'http' and port != 80)):
url.setPort(port)
url.setHost(host)
if path is not None:
url.setPath(path)
return url.toString() if as_string else url

View File

@@ -4,8 +4,8 @@
#
# Copyright (C) 2017 Sambhav Kothari
# Copyright (C) 2018 Wieland Hoffmann
# Copyright (C) 2018, 2020-2021 Laurent Monin
# Copyright (C) 2019-2022 Philipp Wolfer
# Copyright (C) 2018, 2020-2021, 2023 Laurent Monin
# Copyright (C) 2019-2023 Philipp Wolfer
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
@@ -24,6 +24,8 @@
from unittest.mock import MagicMock
from PyQt5.QtCore import QUrl
from test.picardtestcase import PicardTestCase
from picard.acoustid.manager import Submission
@@ -42,35 +44,31 @@ class APITest(PicardTestCase):
def setUp(self):
super().setUp()
self.host = "abc.com"
self.port = 80
self.api_path = "/v1/"
self.path_list = ['test', 'more', 'test']
self.complete_path = "/v1/test/more/test"
base_url = "http://abc.com/v1"
self.path = "/test/more/test"
self.complete_url = QUrl(base_url + self.path)
self.ws = MagicMock(auto_spec=WebService)
self.api = APIHelper(self.host, self.port, self.api_path, self.ws)
self.api = APIHelper(self.ws, base_url=base_url)
def _test_ws_function_args(self, ws_function):
self.assertGreater(ws_function.call_count, 0)
self.assertEqual(ws_function.call_args[0][0], self.host)
self.assertEqual(ws_function.call_args[0][1], self.port)
self.assertEqual(ws_function.call_args[0][2], self.complete_path)
self.assertEqual(ws_function.call_args[1]['url'], self.complete_url)
def test_get(self):
self.api.get(self.path_list, None)
self._test_ws_function_args(self.ws.get)
self.api.get(self.path, None)
self._test_ws_function_args(self.ws.get_url)
def test_post(self):
self.api.post(self.path_list, None, None)
self._test_ws_function_args(self.ws.post)
self.api.post(self.path, None, None)
self._test_ws_function_args(self.ws.post_url)
def test_put(self):
self.api.put(self.path_list, None, None)
self._test_ws_function_args(self.ws.put)
self.api.put(self.path, None, None)
self._test_ws_function_args(self.ws.put_url)
def test_delete(self):
self.api.delete(self.path_list, None)
self._test_ws_function_args(self.ws.delete)
self.api.delete(self.path, None)
self._test_ws_function_args(self.ws.delete_url)
class MBAPITest(PicardTestCase):
@@ -84,64 +82,64 @@ class MBAPITest(PicardTestCase):
def _test_ws_function_args(self, ws_function):
self.assertGreater(ws_function.call_count, 0)
self.assertEqual(ws_function.call_args[0][0], self.config['server_host'])
self.assertEqual(ws_function.call_args[0][1], self.config['server_port'])
self.assertIn("/ws/2/", ws_function.call_args[0][2])
url = ws_function.call_args[1]['url']
self.assertTrue(url.toString().startswith("https://mb.org/"))
self.assertTrue(url.path().startswith("/ws/2/"))
def assertInPath(self, ws_function, path):
self.assertIn(path, ws_function.call_args[0][2])
self.assertIn(path, ws_function.call_args[1]['url'].path())
def assertNotInPath(self, ws_function, path):
self.assertNotIn(path, ws_function.call_args[0][2])
self.assertNotIn(path, ws_function.call_args[1]['url'].path())
def assertInQuery(self, ws_function, argname, value=None):
query_args = ws_function.call_args[1]['queryargs']
self.assertIn(argname, query_args)
self.assertEqual(value, query_args[argname])
unencoded_query_args = ws_function.call_args[1]['unencoded_queryargs']
self.assertIn(argname, unencoded_query_args)
self.assertEqual(value, unencoded_query_args[argname])
def _test_inc_args(self, ws_function, arg_list):
self.assertInQuery(self.ws.get, 'inc', "+".join(arg_list))
self.assertInQuery(self.ws.get_url, 'inc', self.api._make_inc_arg(arg_list))
def test_get_release(self):
inc_args_list = ['test']
self.api.get_release_by_id("1", None, inc=inc_args_list)
self._test_ws_function_args(self.ws.get)
self.assertInPath(self.ws.get, "/release/1")
self._test_inc_args(self.ws.get, inc_args_list)
self._test_ws_function_args(self.ws.get_url)
self.assertInPath(self.ws.get_url, "/release/1")
self._test_inc_args(self.ws.get_url, inc_args_list)
def test_get_track(self):
inc_args_list = ['test']
self.api.get_track_by_id("1", None, inc=inc_args_list)
self._test_ws_function_args(self.ws.get)
self.assertInPath(self.ws.get, "/recording/1")
self._test_inc_args(self.ws.get, inc_args_list)
self._test_ws_function_args(self.ws.get_url)
self.assertInPath(self.ws.get_url, "/recording/1")
self._test_inc_args(self.ws.get_url, inc_args_list)
def test_get_collection(self):
inc_args_list = ["releases", "artist-credits", "media"]
self.api.get_collection("1", None)
self._test_ws_function_args(self.ws.get)
self.assertInPath(self.ws.get, "collection")
self.assertInPath(self.ws.get, "1/releases")
self._test_inc_args(self.ws.get, inc_args_list)
self._test_ws_function_args(self.ws.get_url)
self.assertInPath(self.ws.get_url, "collection")
self.assertInPath(self.ws.get_url, "1/releases")
self._test_inc_args(self.ws.get_url, inc_args_list)
def test_get_collection_list(self):
self.api.get_collection_list(None)
self._test_ws_function_args(self.ws.get)
self.assertInPath(self.ws.get, "collection")
self.assertNotInPath(self.ws.get, "releases")
self._test_ws_function_args(self.ws.get_url)
self.assertInPath(self.ws.get_url, "collection")
self.assertNotInPath(self.ws.get_url, "releases")
def test_put_collection(self):
self.api.put_to_collection("1", ["1", "2", "3"], None)
self._test_ws_function_args(self.ws.put)
self.assertInPath(self.ws.put, "collection/1/releases/1;2;3")
self._test_ws_function_args(self.ws.put_url)
self.assertInPath(self.ws.put_url, "collection/1/releases/1;2;3")
def test_delete_collection(self):
self.api.delete_from_collection("1", ["1", "2", "3", "4"] * 200, None)
collection_string = ";".join(["1", "2", "3", "4"] * 100)
self._test_ws_function_args(self.ws.delete)
self.assertInPath(self.ws.delete, "collection/1/releases/" + collection_string)
self.assertNotInPath(self.ws.delete, collection_string + ";" + collection_string)
self.assertEqual(self.ws.delete.call_count, 2)
self._test_ws_function_args(self.ws.delete_url)
self.assertInPath(self.ws.delete_url, "collection/1/releases/" + collection_string)
self.assertNotInPath(self.ws.delete_url, collection_string + ";" + collection_string)
self.assertEqual(self.ws.delete_url.call_count, 2)
def test_xml_ratings_empty(self):
ratings = dict()
@@ -206,14 +204,19 @@ class MBAPITest(PicardTestCase):
releases = tuple("r"+str(i) for i in range(13))
generator = self.api._collection_request("test", releases, batchsize=5)
batch = next(generator)
self.assertEqual(batch, ('collection', 'test', 'releases', 'r0;r1;r2;r3;r4'))
self.assertEqual(batch, '/collection/test/releases/r0;r1;r2;r3;r4')
batch = next(generator)
self.assertEqual(batch, ('collection', 'test', 'releases', 'r5;r6;r7;r8;r9'))
self.assertEqual(batch, '/collection/test/releases/r5;r6;r7;r8;r9')
batch = next(generator)
self.assertEqual(batch, ('collection', 'test', 'releases', 'r10;r11;r12'))
self.assertEqual(batch, '/collection/test/releases/r10;r11;r12')
with self.assertRaises(StopIteration):
next(generator)
def test_make_inc_arg(self):
result = self.api._make_inc_arg(['b', 'a', '', 1, (), 0])
expected = '1+a+b'
self.assertEqual(result, expected)
class AcoustdIdAPITest(PicardTestCase):

View File

@@ -58,6 +58,7 @@ from picard.util import (
any_exception_isinstance,
build_qurl,
detect_unicode_encoding,
encoded_queryargs,
extract_year_from_date,
find_best_match,
is_absolute_path,
@@ -755,6 +756,15 @@ class BuildQUrlTest(PicardTestCase):
self.assertEqual(expected, build_qurl(host, port=443).toDisplayString())
self.assertEqual(expected, build_qurl(host, port=8080).toDisplayString())
def test_encoded_queryargs(self):
query = encoded_queryargs({'foo': ' %20&;', 'bar': '=%+?abc'})
self.assertEqual('%20%2520%26%3B', query['foo'])
self.assertEqual('%3D%25%2B%3Fabc', query['bar'])
# spaces are decoded in displayed string
expected = 'http://example.com?foo= %2520%26%3B&bar=%3D%25%2B%3Fabc'
result = build_qurl('example.com', queryargs=query).toDisplayString()
self.assertEqual(expected, result)
class NormpathTest(PicardTestCase):

View File

@@ -46,6 +46,11 @@ from picard.webservice import (
WSRequest,
ratecontrol,
)
from picard.webservice.utils import (
host_port_to_url,
hostkey_from_url,
port_from_qurl,
)
PROXY_SETTINGS = {
@@ -100,6 +105,30 @@ class WebServiceTest(PicardTestCase):
self.assertIn("GET", get_wsreq(mock_add_task).method)
self.assertEqual(5, mock_add_task.call_count)
@patch.object(WebService, 'add_task')
def test_webservice_url_method_calls(self, mock_add_task):
url = "http://abc.xyz"
handler = dummy_handler
data = None
def get_wsreq(mock_add_task):
return mock_add_task.call_args[0][1]
self.ws.get_url(url=url, handler=handler)
self.assertEqual(1, mock_add_task.call_count)
self.assertEqual('abc.xyz', get_wsreq(mock_add_task).host)
self.assertEqual(80, get_wsreq(mock_add_task).port)
self.assertIn("GET", get_wsreq(mock_add_task).method)
self.ws.post_url(url=url, data=data, handler=handler)
self.assertIn("POST", get_wsreq(mock_add_task).method)
self.ws.put_url(url=url, data=data, handler=handler)
self.assertIn("PUT", get_wsreq(mock_add_task).method)
self.ws.delete_url(url=url, handler=handler)
self.assertIn("DELETE", get_wsreq(mock_add_task).method)
self.ws.download_url(url=url, handler=handler)
self.assertIn("GET", get_wsreq(mock_add_task).method)
self.assertEqual(5, mock_add_task.call_count)
class WebServiceTaskTest(PicardTestCase):
@@ -497,3 +526,74 @@ class WSRequestTest(PicardTestCase):
self.assertTrue(TEMP_ERRORS_RETRIES > 1)
self.assertEqual(request.mark_for_retry(), 1)
self.assertFalse(request.max_retries_reached())
def test_queryargs(self):
request = WSRequest(
url='http://example.org/path?a=1',
method='GET',
handler=dummy_handler,
queryargs={'a': 2, 'b': 'x%20x', 'c': '1+2', 'd': '&', 'e': '?'},
)
expected = 'http://example.org/path?a=1&a=2&b=x x&c=1+2&d=%26&e=?'
self.assertEqual(request.url().toString(), expected)
def test_unencoded_queryargs(self):
request = WSRequest(
url='http://example.org/path?a=1',
method='GET',
handler=dummy_handler,
unencoded_queryargs={'a': 2, 'b': 'x%20x', 'c': '1+2', 'd': '&', 'e': '?'},
)
expected = 'http://example.org/path?a=1&a=2&b=x%2520x&c=1%2B2&d=%26&e=%3F'
self.assertEqual(request.url().toString(), expected)
def test_mixed_queryargs(self):
request = WSRequest(
url='http://example.org/path?a=1',
method='GET',
handler=dummy_handler,
queryargs={'a': '2&', 'b': '1&', 'c': '&'},
unencoded_queryargs={'a': '1&', 'b': '2&', 'd': '&'},
)
expected = 'http://example.org/path?a=1&a=1%26&b=2%26&c=%26&d=%26'
self.assertEqual(request.url().toString(), expected)
class WebServiceUtilsTest(PicardTestCase):
def test_port_from_qurl_http(self):
self.assertEqual(port_from_qurl(QUrl('http://example.org')), 80)
def test_port_from_qurl_http_other(self):
self.assertEqual(port_from_qurl(QUrl('http://example.org:666')), 666)
def test_port_from_qurl_https(self):
self.assertEqual(port_from_qurl(QUrl('https://example.org')), 443)
def test_port_from_qurl_https_other(self):
self.assertEqual(port_from_qurl(QUrl('https://example.org:666')), 666)
def test_port_from_qurl_exception(self):
with self.assertRaises(AttributeError):
port_from_qurl('xxx')
def test_hostkey_from_qurl_http(self):
self.assertEqual(hostkey_from_url(QUrl('http://example.org')), ('example.org', 80))
def test_hostkey_from_url_https_other(self):
self.assertEqual(hostkey_from_url('https://example.org:666'), ('example.org', 666))
def test_host_port_to_url_http_80(self):
self.assertEqual(host_port_to_url('example.org', 80, as_string=True), 'http://example.org')
def test_host_port_to_url_http_80_qurl(self):
self.assertEqual(host_port_to_url('example.org', 80).toString(), 'http://example.org')
def test_host_port_to_url_https_443(self):
self.assertEqual(host_port_to_url('example.org', 443, as_string=True), 'https://example.org')
def test_host_port_to_url_https_scheme_80(self):
self.assertEqual(host_port_to_url('example.org', 80, scheme='https', as_string=True), 'https://example.org:80')
def test_host_port_to_url_http_666_with_path(self):
self.assertEqual(host_port_to_url('example.org', 666, path='/abc', as_string=True), 'http://example.org:666/abc')