diff --git a/picard/cluster.py b/picard/cluster.py index 1f0320b5b..aa06d9f60 100644 --- a/picard/cluster.py +++ b/picard/cluster.py @@ -40,7 +40,8 @@ class Cluster(QtCore.QObject, Item): self.hide_if_empty = hide_if_empty self.related_album = related_album self.files = [] - self.lookup_queued = False + + self.lookup_task = None # Weights for different elements when comparing a cluster to a release self.comparison_weights = { 'album' : 17, 'artist' : 6, 'totaltracks' : 5, 'releasecountry': 2, 'format': 2 } @@ -152,7 +153,7 @@ class Cluster(QtCore.QObject, Item): return reduce(lambda x, y: x + y[0] * y[1] / total, parts, 0.0) def _lookup_finished(self, document, http, error): - self._signal_lookup_finished() + self.lookup_task = None try: releases = document.metadata[0].release_list[0].release @@ -169,7 +170,7 @@ class Cluster(QtCore.QObject, Item): for release in releases: matches.append((self._compare_to_release(release), release)) matches.sort(reverse=True) - self.log.debug("Matches: %r", matches) + #self.log.debug("Matches: %r", matches) if matches[0][0] < self.config.setting['cluster_lookup_threshold']: self.tagger.window.set_statusbar_message(N_("No matching releases for cluster %s"), self.metadata['album'], timeout=3000) @@ -177,21 +178,20 @@ class Cluster(QtCore.QObject, Item): self.tagger.window.set_statusbar_message(N_("Cluster %s identified!"), self.metadata['album'], timeout=3000) self.tagger.move_files_to_album(self.files, matches[0][1].id) - def _signal_lookup_finished(self): - if self.lookup_queued: - self.lookup_queued = False - self.emit(QtCore.SIGNAL("lookup_finished")) - def lookup_metadata(self): """ Try to identify the cluster using the existing metadata. """ self.tagger.window.set_statusbar_message(N_("Looking up the metadata for cluster %s..."), self.metadata['album']) - QtCore.QTimer.singleShot(10000, self._signal_lookup_finished) - self.tagger.xmlws.find_releases(self._lookup_finished, + self.lookup_task = self.tagger.xmlws.find_releases(self._lookup_finished, artist=self.metadata.get('artist', ''), release=self.metadata.get('album', ''), tracks=str(len(self.files)), limit=25) + def clear_lookup_task(self): + if self.lookup_task: + self.tagger.xmlws.remove_task(self.lookup_task) + self.lookup_task = None + @staticmethod def cluster(files, threshold): artistDict = ClusterDict() @@ -260,8 +260,6 @@ class UnmatchedFiles(Cluster): self.tagger.window.enable_cluster(self.get_num_files() > 0) def lookup_metadata(self): - self.lookup_queued = False - self.emit(QtCore.SIGNAL("lookup_finished")) self.tagger.autotag(self.files) diff --git a/picard/file.py b/picard/file.py index e7bda162a..b08cf6949 100644 --- a/picard/file.py +++ b/picard/file.py @@ -84,7 +84,8 @@ class File(LockableObject, Item): self.similarity = 1.0 self.parent = None - self.lookup_queued = False + + self.lookup_task = None self.comparison_weights = {"title": 13, "artist": 4, "album": 5, "length": 10, "totaltracks": 4, "releasetype": 20, @@ -327,6 +328,7 @@ class File(LockableObject, Item): def move(self, parent): if parent != self.parent: self.log.debug("Moving %r from %r to %r", self, self.parent, parent) + self.clear_lookup_task() if self.parent: self.clear_pending() self.parent.remove_file(self) @@ -487,7 +489,7 @@ class File(LockableObject, Item): return max(scores, key=lambda x: x[0]) def _lookup_finished(self, lookuptype, document, http, error): - self._signal_lookup_finished() + self.lookup_task = None if self.state == File.REMOVED: return @@ -515,7 +517,7 @@ class File(LockableObject, Item): score, release = self._compare_to_track(track) matches.append((score, track, release)) matches.sort(reverse=True) - self.log.debug("Track matches: %r", matches) + #self.log.debug("Track matches: %r", matches) if lookuptype != 'puid': threshold = self.config.setting['file_lookup_threshold'] @@ -535,25 +537,22 @@ class File(LockableObject, Item): else: self.tagger.move_file_to_nat(self, track.id, node=track) - def _signal_lookup_finished(self): - if self.lookup_queued: - self.lookup_queued = False - self.emit(QtCore.SIGNAL("lookup_finished")) - def lookup_trackid(self, trackid): """ Try to identify the file using the trackid. """ - self.tagger.xmlws.get_track_by_id(trackid, partial(self._lookup_finished, 'trackid')) + self.clear_lookup_task() + self.lookup_task = self.tagger.xmlws.get_track_by_id(trackid, partial(self._lookup_finished, 'trackid')) def lookup_puid(self, puid): """ Try to identify the file using the PUID. """ self.tagger.window.set_statusbar_message(N_("Looking up the PUID for file %s..."), self.filename) - self.tagger.xmlws.lookup_puid(puid, partial(self._lookup_finished, 'puid')) + self.clear_lookup_task() + self.lookup_task = self.tagger.xmlws.lookup_puid(puid, partial(self._lookup_finished, 'puid')) def lookup_metadata(self): """ Try to identify the file using the existing metadata. """ self.tagger.window.set_statusbar_message(N_("Looking up the metadata for file %s..."), self.filename) - QtCore.QTimer.singleShot(10000, self._signal_lookup_finished) - self.tagger.xmlws.find_tracks(partial(self._lookup_finished, 'metadata'), + self.clear_lookup_task() + self.lookup_task = self.tagger.xmlws.find_tracks(partial(self._lookup_finished, 'metadata'), track=self.metadata.get('title', ''), artist=self.metadata.get('artist', ''), release=self.metadata.get('album', ''), @@ -562,6 +561,11 @@ class File(LockableObject, Item): qdur=str(self.metadata.length / 2000), limit=25) + def clear_lookup_task(self): + if self.lookup_task: + self.tagger.xmlws.remove_task(self.lookup_task) + self.lookup_task = None + def set_pending(self): if self.state == File.REMOVED: return diff --git a/picard/tagger.py b/picard/tagger.py index 1eaa667c7..5d5d49665 100644 --- a/picard/tagger.py +++ b/picard/tagger.py @@ -209,9 +209,6 @@ class Tagger(QtGui.QApplication): self.nats = None - self.lookup_queue = queue.Queue() - self.lookup_running = False - def setup_gettext(self, localedir): """Setup locales, load translations, install gettext functions.""" if self.config.setting["ui_language"]: @@ -498,7 +495,7 @@ class Tagger(QtGui.QApplication): """Remove files from the tagger.""" for file in files: if self.files.has_key(file.filename): - self.lookup_queue.remove(file) + file.clear_lookup_task() self.analyze_queue.remove(file.filename) del self.files[file.filename] file.remove(from_parent) @@ -516,7 +513,7 @@ class Tagger(QtGui.QApplication): self.log.debug("Removing %r", cluster) files = list(cluster.files) cluster.files = [] - self.lookup_queue.remove(cluster) + cluster.clear_lookup_task() self.remove_files(files, from_parent=False) self.clusters.remove(cluster) self.emit(QtCore.SIGNAL("cluster_removed"), cluster) @@ -582,20 +579,8 @@ class Tagger(QtGui.QApplication): def autotag(self, objects): for obj in objects: - if isinstance(obj, (File, Cluster)): - if not obj.lookup_queued: - obj.lookup_queued = True - self.lookup_queue.put(obj) - self.connect(obj, QtCore.SIGNAL("lookup_finished"), self._run_next_lookup) - if not self.lookup_running: - self.lookup_running = True - self._run_next_lookup() - - def _run_next_lookup(self): - if self.lookup_queue.qsize() > 0: - self.lookup_queue.get().lookup_metadata() - else: - self.lookup_running = False + if isinstance(obj, (File, Cluster)) and not obj.lookup_task: + obj.lookup_metadata() # ======================================================================= # Clusters diff --git a/picard/webservice.py b/picard/webservice.py index 05d7298c7..a68169ee0 100644 --- a/picard/webservice.py +++ b/picard/webservice.py @@ -27,13 +27,14 @@ import os import sys import re import traceback +from collections import deque, defaultdict from PyQt4 import QtCore, QtNetwork, QtXml from picard import version_string from picard.util import partial from picard.const import PUID_SUBMIT_HOST, PUID_SUBMIT_PORT -REQUEST_DELAY = 1000 +REQUEST_DELAY = defaultdict(lambda: 1000) USER_AGENT_STRING = 'MusicBrainz%%20Picard-%s' % version_string @@ -41,10 +42,6 @@ def _escape_lucene_query(text): return re.sub(r'([+\-&|!(){}\[\]\^"~*?:\\])', r'\\\1', text) -def _node_name(name): - return re.sub('[^a-zA-Z0-9]', '_', unicode(name)) - - def _wrap_xml_metadata(data): return ('' + '%s' % data) @@ -75,13 +72,15 @@ class XmlHandler(QtXml.QXmlDefaultHandler): def init(self): self.document = XmlNode() self.node = self.document + _node_name_re = re.compile('[^a-zA-Z0-9]') + self._node_name = lambda n: _node_name_re.sub('_', unicode(n)) self.path = [] def startElement(self, namespace, name, qname, attrs): node = XmlNode() for i in xrange(attrs.count()): - node.attribs[_node_name(attrs.localName(i))] = unicode(attrs.value(i)) - self.node.children.setdefault(_node_name(name), []).append(node) + node.attribs[self._node_name(attrs.localName(i))] = unicode(attrs.value(i)) + self.node.children.setdefault(self._node_name(name), []).append(node) self.path.append(self.node) self.node = node return True @@ -95,19 +94,6 @@ class XmlHandler(QtXml.QXmlDefaultHandler): return True -class XmlWebServiceRequest(object): - - def __init__(self, request, reply, handler, xml=True): - self.request = request - self.reply = reply - self.handler = handler - self.xml = xml - self.finished = False - - def errorString(self): - return str(self.reply.errorString()) - - class XmlWebService(QtCore.QObject): """ Signals: @@ -122,9 +108,10 @@ class XmlWebService(QtCore.QObject): self.manager.connect(self.manager, QtCore.SIGNAL("authenticationRequired(QNetworkReply *, QAuthenticator *)"), self._site_authenticate) self.manager.connect(self.manager, QtCore.SIGNAL("proxyAuthenticationRequired(QNetworkProxy *, QAuthenticator *)"), self._proxy_authenticate) self._last_request_times = {} - self._active_hosts = set() self._active_requests = {} - self._queue = [] + self._high_priority_queues = {} + self._low_priority_queues = {} + self._hosts = [] self._timer = QtCore.QTimer(self) self._timer.setSingleShot(True) self._timer.timeout.connect(self._run_next_task) @@ -139,93 +126,63 @@ class XmlWebService(QtCore.QObject): self.proxy.setPassword(self.config.setting["proxy_password"]) self.manager.setProxy(self.proxy) - def _prepare_request(self, method, host, port, path, username=None, password=None): + def _start_request(self, method, send, host, port, path, data, handler, xml, mblogin=False): self.log.debug("%s http://%s:%d%s", method, host, port, path) - self.url = QtCore.QUrl.fromEncoded("http://%s:%d%s" % (host, port, path)) - if username: - self.url.setUserName(username) - self.url.setPassword(password) - self.genrequest = QtNetwork.QNetworkRequest(self.url) - self.genrequest.setRawHeader("User-Agent", "MusicBrainz-Picard/%s" % version_string) - if method == "POST": - contenttype = "application/x-www-form-urlencoded" if host == "ofa.musicdns.org" else "application/xml; charset=utf-8" - self.genrequest.setHeader(QtNetwork.QNetworkRequest.ContentTypeHeader, contenttype) - return self.genrequest - - def _start_request(self, host, port, request): - key = host, port + url = QtCore.QUrl.fromEncoded("http://%s:%d%s" % (host, port, path)) + if mblogin: + url.setUserName(self.config.setting["username"]) + url.setPassword(self.config.setting["password"]) + request = QtNetwork.QNetworkRequest(url) + request.setRawHeader("User-Agent", "MusicBrainz-Picard/%s" % version_string) + if method == "POST" and host == self.config.setting["server_host"]: + request.setHeader(QtNetwork.QNetworkRequest.ContentTypeHeader, "application/xml; charset=utf-8") + reply = send(request, data) if data is not None else send(request) + key = (host, port) self._last_request_times[key] = QtCore.QTime.currentTime() - #print "starting request", key, request.reply, self._last_request_times[key] - request.key = key - self._active_requests[request.reply] = request - self._active_hosts.add(key) - - def _finish_request(self): - for reply, request in self._active_requests.items(): - if request.finished: - self._active_hosts.remove(request.key) - del self._active_requests[reply] - self._timer.start(0) + self._active_requests[reply] = (request, handler, xml) + return True def _process_reply(self, reply): try: - #print "finishing request", reply - request = self._active_requests.get(reply) - if request is None: - print "**** request not found", reply.request().url(), reply - return - request.finished = True - error = int(reply.error()) - if request.handler is not None: - if error: - #print "ERROR", reply.error(), reply.errorString() - #for name in reply.rawHeaderList(): - # print name, reply.rawHeader(name) - self.log.debug("HTTP Error: %d", error) - if request.xml: - xml_handler = XmlHandler() - xml_handler.init() - xml_reader = QtXml.QXmlSimpleReader() - xml_reader.setContentHandler(xml_handler) - xml_input = QtXml.QXmlInputSource(reply) - xml_reader.parse(xml_input) - request.handler(xml_handler.document, request, error) - else: - request.handler(str(reply.readAll()), request, error) - reply.close() - finally: - QtCore.QTimer.singleShot(0, self._finish_request) + request, handler, xml = self._active_requests.pop(reply) + except KeyError: + self.log.error("Error: Request not found for %s" % str(reply.request().url().toString())) + return + error = int(reply.error()) + if handler is not None: + if error: + #print "ERROR", reply.error(), reply.errorString() + #for name in reply.rawHeaderList(): + # print name, reply.rawHeader(name) + self.log.debug("HTTP Error: %d", error) + if xml: + xml_handler = XmlHandler() + xml_handler.init() + xml_reader = QtXml.QXmlSimpleReader() + xml_reader.setContentHandler(xml_handler) + xml_input = QtXml.QXmlInputSource(reply) + xml_reader.parse(xml_input) + handler(xml_handler.document, request, error) + else: + handler(str(reply.readAll()), request, error) + reply.close() - def _get(self, host, port, path, handler, xml=True, mblogin=False): - if mblogin: - self.username = self.config.setting["username"] - self.password = self.config.setting["password"] - request = self._prepare_request("GET", host, port, path, self.username, self.password) - else: - request = self._prepare_request("GET", host, port, path) - reply = self.manager.get(request) - self._start_request(host, port, XmlWebServiceRequest(request, reply, handler, xml)) - return True + def get(self, host, port, path, handler, xml=True, priority=False, important=False, mblogin=False): + func = partial(self._start_request, "GET", self.manager.get, host, port, path, None, handler, xml, mblogin) + return self.add_task(func, host, port, priority, important=important) - def _post(self, host, port, path, data, handler, mblogin=True): + def post(self, host, port, path, data, handler, xml=True, priority=False, important=False, mblogin=True): self.log.debug("POST-DATA %r", data) - if mblogin: - self.username = self.config.setting["username"] - self.password = self.config.setting["password"] - request = self._prepare_request("POST", host, port, path, self.username, self.password) - else: - request = self._prepare_request("POST", host, port, path) - reply = self.manager.post(request, data) - self._start_request(host, port, XmlWebServiceRequest(request, reply, handler)) - return True + func = partial(self._start_request, "POST", self.manager.post, host, port, path, data, handler, xml, mblogin) + return self.add_task(func, host, port, priority, important=important) - def get(self, host, port, path, handler, xml=True, position=None, mblogin=False): - func = partial(self._get, host, port, path, handler, xml, mblogin) - self.add_task(func, host, port, position) + def put(self, host, port, path, data, handler, priority=False, important=False, mblogin=True): + func = partial(self._start_request, "PUT", self.manager.put, host, port, path, data, handler, False, mblogin) + return self.add_task(func, host, port, priority, important=important) - def post(self, host, port, path, data, handler, position=None, mblogin=True): - func = partial(self._post, host, port, path, data, handler, mblogin) - self.add_task(func, host, port, position) + def delete(self, host, port, path, handler, priority=False, important=False, mblogin=True): + func = partial(self._start_request, "DELETE", self.manager.deleteResource, host, port, path, None, handler, False, mblogin) + return self.add_task(func, host, port, priority, important=important) def _site_authenticate(self, reply, authenticator): self.emit(QtCore.SIGNAL("authentication_required"), reply, authenticator) @@ -234,63 +191,85 @@ class XmlWebService(QtCore.QObject): self.emit(QtCore.SIGNAL("proxyAuthentication_required"), proxy, authenticator) def stop(self): - for request in self._active_requests.itervalues(): + self._high_priority_queues = {} + self._low_priority_queues = {} + for request in self._active_requests.values(): request.reply.abort() def _run_next_task(self): - delay, index, key = sys.maxint, None, None - now = QtCore.QTime.currentTime() - for i, (k, task) in enumerate(self._queue): - if k == key or k in self._active_hosts: + delay = sys.maxint + for key in self._hosts: + queue = self._high_priority_queues.get(key) or self._low_priority_queues.get(key) + if not queue: continue - last = self._last_request_times.get(k) - last_ms = last.msecsTo(now) if last is not None else REQUEST_DELAY - if last_ms >= REQUEST_DELAY: - self.log.debug("Last request to %s was %d ms ago, starting another one", k, last_ms) - del self._queue[i] - task() - return - d = REQUEST_DELAY - last_ms + now = QtCore.QTime.currentTime() + last = self._last_request_times.get(key) + request_delay = REQUEST_DELAY[key] + last_ms = last.msecsTo(now) if last is not None else request_delay + if last_ms >= request_delay: + self.log.debug("Last request to %s was %d ms ago, starting another one", key, last_ms) + d = request_delay + queue.popleft()() + else: + d = request_delay - last_ms + self.log.debug("Waiting %d ms before starting another request to %s", d, key) if d < delay: - delay, index, key = d, i, k - if index is not None and not self._timer.isActive(): - self.log.debug("Waiting %d ms before starting another request to %s", - delay, key) + delay = d + if delay < sys.maxint: self._timer.start(delay) - def add_task(self, func, host, port, position=None): + def add_task(self, func, host, port, priority, important=False): key = (host, port) - if position is None: - self._queue.append((key, func)) + if key not in self._hosts: + self._hosts.append(key) + if priority: + queues = self._high_priority_queues else: - self._queue.insert(position, (key, func)) - if key not in self._active_hosts: + queues = self._low_priority_queues + queues.setdefault(key, deque()) + if important: + queues[key].appendleft(func) + else: + queues[key].append(func) + if not self._timer.isActive(): self._timer.start(0) + return (key, func, priority) - def _get_by_id(self, entitytype, entityid, handler, inc=[], mblogin=False): + def remove_task(self, task): + key, func, priority = task + if priority: + queue = self._high_priority_queues[key] + else: + queue = self._low_priority_queues[key] + try: + queue.remove(func) + except: + pass + + def _get_by_id(self, entitytype, entityid, handler, inc=[], params=[], priority=False, important=False, mblogin=False): host = self.config.setting["server_host"] port = self.config.setting["server_port"] - path = "/ws/2/%s/%s?inc=%s" % (entitytype, entityid, "+".join(inc)) - if entitytype == "discid": path += "&cdstubs=no" - self.get(host, port, path, handler, mblogin=mblogin) + path = "/ws/2/%s/%s?inc=%s&%s" % (entitytype, entityid, "+".join(inc), "&".join(params)) + return self.get(host, port, path, handler, priority=priority, important=important, mblogin=mblogin) - def get_release_group_by_id(self, releasegroupid, handler): + def get_release_group_by_id(self, releasegroupid, handler, priority=True, important=False): inc = ['releases', 'media'] - self._get_by_id('release-group', releasegroupid, handler, inc) + return self._get_by_id('release-group', releasegroupid, handler, inc, priority=priority, important=important) - def get_release_by_id(self, releaseid, handler, inc=[], mblogin=False): - self._get_by_id('release', releaseid, handler, inc, mblogin=mblogin) + def get_release_by_id(self, releaseid, handler, inc=[], priority=True, important=False, mblogin=False): + return self._get_by_id('release', releaseid, handler, inc, priority=priority, important=important, mblogin=mblogin) - def get_track_by_id(self, trackid, handler): + def get_track_by_id(self, trackid, handler, priority=False, important=False): inc = ['releases', 'release-groups', 'media', 'artist-credits'] - self._get_by_id('recording', trackid, handler, inc) + return self._get_by_id('recording', trackid, handler, inc, priority=priority, important=important) - def lookup_puid(self, puid, handler): + def lookup_puid(self, puid, handler, priority=False, important=False): inc = ['releases', 'release-groups', 'media', 'artist-credits'] - self._get_by_id('puid', puid, handler, inc) + return self._get_by_id('puid', puid, handler, inc, priority=False, important=False) - def lookup_discid(self, discid, handler): - self._get_by_id('discid', discid, handler, ['artist-credits', 'labels']) + def lookup_discid(self, discid, handler, priority=True, important=True): + inc = ['artist-credits', 'labels'] + return self._get_by_id('discid', discid, handler, inc, params=["cdstubs=no"], priority=priority, important=important) def _find(self, entitytype, handler, kwargs): host = self.config.setting["server_host"] @@ -309,19 +288,19 @@ class XmlWebService(QtCore.QObject): value = str(QtCore.QUrl.toPercentEncoding(QtCore.QString(value))) params.append('%s=%s' % (str(name), value)) path = "/ws/2/%s/?%s" % (entitytype, "&".join(params)) - self.get(host, port, path, handler) + return self.get(host, port, path, handler) def find_releases(self, handler, **kwargs): - self._find('release', handler, kwargs) + return self._find('release', handler, kwargs) def find_tracks(self, handler, **kwargs): - self._find('recording', handler, kwargs) + return self._find('recording', handler, kwargs) def submit_puids(self, puids, handler): path = '/ws/2/recording/?client=' + USER_AGENT_STRING recordings = ''.join(['' % i for i in puids.items()]) data = _wrap_xml_metadata('%s' % recordings) - self.post(PUID_SUBMIT_HOST, PUID_SUBMIT_PORT, path, data, handler) + return self.post(PUID_SUBMIT_HOST, PUID_SUBMIT_PORT, path, data, handler, priority=True, important=True) def submit_ratings(self, ratings, handler): host = self.config.setting['server_host'] @@ -330,16 +309,15 @@ class XmlWebService(QtCore.QObject): recordings = (''.join(['%s' % (i[1], j*20) for i, j in ratings.items() if i[0] == 'recording'])) data = _wrap_xml_metadata('%s' % recordings) - self.post(host, port, path, data, handler) + return self.post(host, port, path, data, handler, priority=True, important=True) def query_musicdns(self, handler, **kwargs): - host = 'ofa.musicdns.org' - port = 80 + host, port = 'ofa.musicdns.org', 80 filters = [] for name, value in kwargs.items(): value = str(QtCore.QUrl.toPercentEncoding(value)) filters.append('%s=%s' % (str(name), value)) - self.post(host, port, '/ofa/1/track/', '&'.join(filters), handler, mblogin = False) + return self.post(host, port, '/ofa/1/track/', '&'.join(filters), handler, mblogin=False) - def download(self, host, port, path, handler, position=None): - self.get(host, port, path, handler, xml=False, position=position) + def download(self, host, port, path, handler, priority=False, important=False): + return self.get(host, port, path, handler, xml=False, priority=priority, important=important)