Changes to lookup/request queuing based on recent mb-devel discussion.

This commit is contained in:
Michael Wiencek
2011-07-05 19:40:57 -05:00
parent c1d6b2e0b3
commit 7b23daee47
4 changed files with 153 additions and 188 deletions

View File

@@ -40,7 +40,8 @@ class Cluster(QtCore.QObject, Item):
self.hide_if_empty = hide_if_empty
self.related_album = related_album
self.files = []
self.lookup_queued = False
self.lookup_task = None
# Weights for different elements when comparing a cluster to a release
self.comparison_weights = { 'album' : 17, 'artist' : 6, 'totaltracks' : 5, 'releasecountry': 2, 'format': 2 }
@@ -152,7 +153,7 @@ class Cluster(QtCore.QObject, Item):
return reduce(lambda x, y: x + y[0] * y[1] / total, parts, 0.0)
def _lookup_finished(self, document, http, error):
self._signal_lookup_finished()
self.lookup_task = None
try:
releases = document.metadata[0].release_list[0].release
@@ -169,7 +170,7 @@ class Cluster(QtCore.QObject, Item):
for release in releases:
matches.append((self._compare_to_release(release), release))
matches.sort(reverse=True)
self.log.debug("Matches: %r", matches)
#self.log.debug("Matches: %r", matches)
if matches[0][0] < self.config.setting['cluster_lookup_threshold']:
self.tagger.window.set_statusbar_message(N_("No matching releases for cluster %s"), self.metadata['album'], timeout=3000)
@@ -177,21 +178,20 @@ class Cluster(QtCore.QObject, Item):
self.tagger.window.set_statusbar_message(N_("Cluster %s identified!"), self.metadata['album'], timeout=3000)
self.tagger.move_files_to_album(self.files, matches[0][1].id)
def _signal_lookup_finished(self):
if self.lookup_queued:
self.lookup_queued = False
self.emit(QtCore.SIGNAL("lookup_finished"))
def lookup_metadata(self):
""" Try to identify the cluster using the existing metadata. """
self.tagger.window.set_statusbar_message(N_("Looking up the metadata for cluster %s..."), self.metadata['album'])
QtCore.QTimer.singleShot(10000, self._signal_lookup_finished)
self.tagger.xmlws.find_releases(self._lookup_finished,
self.lookup_task = self.tagger.xmlws.find_releases(self._lookup_finished,
artist=self.metadata.get('artist', ''),
release=self.metadata.get('album', ''),
tracks=str(len(self.files)),
limit=25)
def clear_lookup_task(self):
    """Withdraw this cluster's stored lookup task, if there is one.

    The task handle kept in ``self.lookup_task`` is handed to the web
    service's ``remove_task`` and the attribute is then reset to ``None``.
    Nothing happens when no task is stored.
    """
    pending = self.lookup_task
    if not pending:
        return
    self.tagger.xmlws.remove_task(pending)
    self.lookup_task = None
@staticmethod
def cluster(files, threshold):
artistDict = ClusterDict()
@@ -260,8 +260,6 @@ class UnmatchedFiles(Cluster):
self.tagger.window.enable_cluster(self.get_num_files() > 0)
def lookup_metadata(self):
self.lookup_queued = False
self.emit(QtCore.SIGNAL("lookup_finished"))
self.tagger.autotag(self.files)

View File

@@ -84,7 +84,8 @@ class File(LockableObject, Item):
self.similarity = 1.0
self.parent = None
self.lookup_queued = False
self.lookup_task = None
self.comparison_weights = {"title": 13, "artist": 4, "album": 5,
"length": 10, "totaltracks": 4, "releasetype": 20,
@@ -327,6 +328,7 @@ class File(LockableObject, Item):
def move(self, parent):
if parent != self.parent:
self.log.debug("Moving %r from %r to %r", self, self.parent, parent)
self.clear_lookup_task()
if self.parent:
self.clear_pending()
self.parent.remove_file(self)
@@ -487,7 +489,7 @@ class File(LockableObject, Item):
return max(scores, key=lambda x: x[0])
def _lookup_finished(self, lookuptype, document, http, error):
self._signal_lookup_finished()
self.lookup_task = None
if self.state == File.REMOVED:
return
@@ -515,7 +517,7 @@ class File(LockableObject, Item):
score, release = self._compare_to_track(track)
matches.append((score, track, release))
matches.sort(reverse=True)
self.log.debug("Track matches: %r", matches)
#self.log.debug("Track matches: %r", matches)
if lookuptype != 'puid':
threshold = self.config.setting['file_lookup_threshold']
@@ -535,25 +537,22 @@ class File(LockableObject, Item):
else:
self.tagger.move_file_to_nat(self, track.id, node=track)
def _signal_lookup_finished(self):
if self.lookup_queued:
self.lookup_queued = False
self.emit(QtCore.SIGNAL("lookup_finished"))
def lookup_trackid(self, trackid):
""" Try to identify the file using the trackid. """
self.tagger.xmlws.get_track_by_id(trackid, partial(self._lookup_finished, 'trackid'))
self.clear_lookup_task()
self.lookup_task = self.tagger.xmlws.get_track_by_id(trackid, partial(self._lookup_finished, 'trackid'))
def lookup_puid(self, puid):
""" Try to identify the file using the PUID. """
self.tagger.window.set_statusbar_message(N_("Looking up the PUID for file %s..."), self.filename)
self.tagger.xmlws.lookup_puid(puid, partial(self._lookup_finished, 'puid'))
self.clear_lookup_task()
self.lookup_task = self.tagger.xmlws.lookup_puid(puid, partial(self._lookup_finished, 'puid'))
def lookup_metadata(self):
""" Try to identify the file using the existing metadata. """
self.tagger.window.set_statusbar_message(N_("Looking up the metadata for file %s..."), self.filename)
QtCore.QTimer.singleShot(10000, self._signal_lookup_finished)
self.tagger.xmlws.find_tracks(partial(self._lookup_finished, 'metadata'),
self.clear_lookup_task()
self.lookup_task = self.tagger.xmlws.find_tracks(partial(self._lookup_finished, 'metadata'),
track=self.metadata.get('title', ''),
artist=self.metadata.get('artist', ''),
release=self.metadata.get('album', ''),
@@ -562,6 +561,11 @@ class File(LockableObject, Item):
qdur=str(self.metadata.length / 2000),
limit=25)
def clear_lookup_task(self):
    """Withdraw this file's stored lookup task, if there is one.

    Passes the handle in ``self.lookup_task`` to the web service's
    ``remove_task`` and resets the attribute to ``None``; a file without
    a stored task is left unchanged.
    """
    task = self.lookup_task
    if not task:
        # No lookup is registered for this file.
        return
    self.tagger.xmlws.remove_task(task)
    self.lookup_task = None
def set_pending(self):
if self.state == File.REMOVED:
return

View File

@@ -209,9 +209,6 @@ class Tagger(QtGui.QApplication):
self.nats = None
self.lookup_queue = queue.Queue()
self.lookup_running = False
def setup_gettext(self, localedir):
"""Setup locales, load translations, install gettext functions."""
if self.config.setting["ui_language"]:
@@ -498,7 +495,7 @@ class Tagger(QtGui.QApplication):
"""Remove files from the tagger."""
for file in files:
if self.files.has_key(file.filename):
self.lookup_queue.remove(file)
file.clear_lookup_task()
self.analyze_queue.remove(file.filename)
del self.files[file.filename]
file.remove(from_parent)
@@ -516,7 +513,7 @@ class Tagger(QtGui.QApplication):
self.log.debug("Removing %r", cluster)
files = list(cluster.files)
cluster.files = []
self.lookup_queue.remove(cluster)
cluster.clear_lookup_task()
self.remove_files(files, from_parent=False)
self.clusters.remove(cluster)
self.emit(QtCore.SIGNAL("cluster_removed"), cluster)
@@ -582,20 +579,8 @@ class Tagger(QtGui.QApplication):
def autotag(self, objects):
for obj in objects:
if isinstance(obj, (File, Cluster)):
if not obj.lookup_queued:
obj.lookup_queued = True
self.lookup_queue.put(obj)
self.connect(obj, QtCore.SIGNAL("lookup_finished"), self._run_next_lookup)
if not self.lookup_running:
self.lookup_running = True
self._run_next_lookup()
def _run_next_lookup(self):
if self.lookup_queue.qsize() > 0:
self.lookup_queue.get().lookup_metadata()
else:
self.lookup_running = False
if isinstance(obj, (File, Cluster)) and not obj.lookup_task:
obj.lookup_metadata()
# =======================================================================
# Clusters

View File

@@ -27,13 +27,14 @@ import os
import sys
import re
import traceback
from collections import deque, defaultdict
from PyQt4 import QtCore, QtNetwork, QtXml
from picard import version_string
from picard.util import partial
from picard.const import PUID_SUBMIT_HOST, PUID_SUBMIT_PORT
REQUEST_DELAY = 1000
REQUEST_DELAY = defaultdict(lambda: 1000)
USER_AGENT_STRING = 'MusicBrainz%%20Picard-%s' % version_string
@@ -41,10 +42,6 @@ def _escape_lucene_query(text):
return re.sub(r'([+\-&|!(){}\[\]\^"~*?:\\])', r'\\\1', text)
def _node_name(name):
return re.sub('[^a-zA-Z0-9]', '_', unicode(name))
def _wrap_xml_metadata(data):
return ('<?xml version="1.0" encoding="UTF-8"?>' +
'<metadata xmlns="http://musicbrainz.org/ns/mmd-2.0#">%s</metadata>' % data)
@@ -75,13 +72,15 @@ class XmlHandler(QtXml.QXmlDefaultHandler):
def init(self):
self.document = XmlNode()
self.node = self.document
_node_name_re = re.compile('[^a-zA-Z0-9]')
self._node_name = lambda n: _node_name_re.sub('_', unicode(n))
self.path = []
def startElement(self, namespace, name, qname, attrs):
node = XmlNode()
for i in xrange(attrs.count()):
node.attribs[_node_name(attrs.localName(i))] = unicode(attrs.value(i))
self.node.children.setdefault(_node_name(name), []).append(node)
node.attribs[self._node_name(attrs.localName(i))] = unicode(attrs.value(i))
self.node.children.setdefault(self._node_name(name), []).append(node)
self.path.append(self.node)
self.node = node
return True
@@ -95,19 +94,6 @@ class XmlHandler(QtXml.QXmlDefaultHandler):
return True
class XmlWebServiceRequest(object):
def __init__(self, request, reply, handler, xml=True):
self.request = request
self.reply = reply
self.handler = handler
self.xml = xml
self.finished = False
def errorString(self):
return str(self.reply.errorString())
class XmlWebService(QtCore.QObject):
"""
Signals:
@@ -122,9 +108,10 @@ class XmlWebService(QtCore.QObject):
self.manager.connect(self.manager, QtCore.SIGNAL("authenticationRequired(QNetworkReply *, QAuthenticator *)"), self._site_authenticate)
self.manager.connect(self.manager, QtCore.SIGNAL("proxyAuthenticationRequired(QNetworkProxy *, QAuthenticator *)"), self._proxy_authenticate)
self._last_request_times = {}
self._active_hosts = set()
self._active_requests = {}
self._queue = []
self._high_priority_queues = {}
self._low_priority_queues = {}
self._hosts = []
self._timer = QtCore.QTimer(self)
self._timer.setSingleShot(True)
self._timer.timeout.connect(self._run_next_task)
@@ -139,93 +126,63 @@ class XmlWebService(QtCore.QObject):
self.proxy.setPassword(self.config.setting["proxy_password"])
self.manager.setProxy(self.proxy)
def _prepare_request(self, method, host, port, path, username=None, password=None):
def _start_request(self, method, send, host, port, path, data, handler, xml, mblogin=False):
self.log.debug("%s http://%s:%d%s", method, host, port, path)
self.url = QtCore.QUrl.fromEncoded("http://%s:%d%s" % (host, port, path))
if username:
self.url.setUserName(username)
self.url.setPassword(password)
self.genrequest = QtNetwork.QNetworkRequest(self.url)
self.genrequest.setRawHeader("User-Agent", "MusicBrainz-Picard/%s" % version_string)
if method == "POST":
contenttype = "application/x-www-form-urlencoded" if host == "ofa.musicdns.org" else "application/xml; charset=utf-8"
self.genrequest.setHeader(QtNetwork.QNetworkRequest.ContentTypeHeader, contenttype)
return self.genrequest
def _start_request(self, host, port, request):
key = host, port
url = QtCore.QUrl.fromEncoded("http://%s:%d%s" % (host, port, path))
if mblogin:
url.setUserName(self.config.setting["username"])
url.setPassword(self.config.setting["password"])
request = QtNetwork.QNetworkRequest(url)
request.setRawHeader("User-Agent", "MusicBrainz-Picard/%s" % version_string)
if method == "POST" and host == self.config.setting["server_host"]:
request.setHeader(QtNetwork.QNetworkRequest.ContentTypeHeader, "application/xml; charset=utf-8")
reply = send(request, data) if data is not None else send(request)
key = (host, port)
self._last_request_times[key] = QtCore.QTime.currentTime()
#print "starting request", key, request.reply, self._last_request_times[key]
request.key = key
self._active_requests[request.reply] = request
self._active_hosts.add(key)
def _finish_request(self):
for reply, request in self._active_requests.items():
if request.finished:
self._active_hosts.remove(request.key)
del self._active_requests[reply]
self._timer.start(0)
self._active_requests[reply] = (request, handler, xml)
return True
def _process_reply(self, reply):
try:
#print "finishing request", reply
request = self._active_requests.get(reply)
if request is None:
print "**** request not found", reply.request().url(), reply
return
request.finished = True
error = int(reply.error())
if request.handler is not None:
if error:
#print "ERROR", reply.error(), reply.errorString()
#for name in reply.rawHeaderList():
# print name, reply.rawHeader(name)
self.log.debug("HTTP Error: %d", error)
if request.xml:
xml_handler = XmlHandler()
xml_handler.init()
xml_reader = QtXml.QXmlSimpleReader()
xml_reader.setContentHandler(xml_handler)
xml_input = QtXml.QXmlInputSource(reply)
xml_reader.parse(xml_input)
request.handler(xml_handler.document, request, error)
else:
request.handler(str(reply.readAll()), request, error)
reply.close()
finally:
QtCore.QTimer.singleShot(0, self._finish_request)
request, handler, xml = self._active_requests.pop(reply)
except KeyError:
self.log.error("Error: Request not found for %s" % str(reply.request().url().toString()))
return
error = int(reply.error())
if handler is not None:
if error:
#print "ERROR", reply.error(), reply.errorString()
#for name in reply.rawHeaderList():
# print name, reply.rawHeader(name)
self.log.debug("HTTP Error: %d", error)
if xml:
xml_handler = XmlHandler()
xml_handler.init()
xml_reader = QtXml.QXmlSimpleReader()
xml_reader.setContentHandler(xml_handler)
xml_input = QtXml.QXmlInputSource(reply)
xml_reader.parse(xml_input)
handler(xml_handler.document, request, error)
else:
handler(str(reply.readAll()), request, error)
reply.close()
def _get(self, host, port, path, handler, xml=True, mblogin=False):
if mblogin:
self.username = self.config.setting["username"]
self.password = self.config.setting["password"]
request = self._prepare_request("GET", host, port, path, self.username, self.password)
else:
request = self._prepare_request("GET", host, port, path)
reply = self.manager.get(request)
self._start_request(host, port, XmlWebServiceRequest(request, reply, handler, xml))
return True
def get(self, host, port, path, handler, xml=True, priority=False, important=False, mblogin=False):
func = partial(self._start_request, "GET", self.manager.get, host, port, path, None, handler, xml, mblogin)
return self.add_task(func, host, port, priority, important=important)
def _post(self, host, port, path, data, handler, mblogin=True):
def post(self, host, port, path, data, handler, xml=True, priority=False, important=False, mblogin=True):
self.log.debug("POST-DATA %r", data)
if mblogin:
self.username = self.config.setting["username"]
self.password = self.config.setting["password"]
request = self._prepare_request("POST", host, port, path, self.username, self.password)
else:
request = self._prepare_request("POST", host, port, path)
reply = self.manager.post(request, data)
self._start_request(host, port, XmlWebServiceRequest(request, reply, handler))
return True
func = partial(self._start_request, "POST", self.manager.post, host, port, path, data, handler, xml, mblogin)
return self.add_task(func, host, port, priority, important=important)
def get(self, host, port, path, handler, xml=True, position=None, mblogin=False):
func = partial(self._get, host, port, path, handler, xml, mblogin)
self.add_task(func, host, port, position)
def put(self, host, port, path, data, handler, priority=False, important=False, mblogin=True):
func = partial(self._start_request, "PUT", self.manager.put, host, port, path, data, handler, False, mblogin)
return self.add_task(func, host, port, priority, important=important)
def post(self, host, port, path, data, handler, position=None, mblogin=True):
func = partial(self._post, host, port, path, data, handler, mblogin)
self.add_task(func, host, port, position)
def delete(self, host, port, path, handler, priority=False, important=False, mblogin=True):
func = partial(self._start_request, "DELETE", self.manager.deleteResource, host, port, path, None, handler, False, mblogin)
return self.add_task(func, host, port, priority, important=important)
def _site_authenticate(self, reply, authenticator):
self.emit(QtCore.SIGNAL("authentication_required"), reply, authenticator)
@@ -234,63 +191,85 @@ class XmlWebService(QtCore.QObject):
self.emit(QtCore.SIGNAL("proxyAuthentication_required"), proxy, authenticator)
def stop(self):
    """Drop every queued task and abort all requests currently in flight."""
    self._high_priority_queues = {}
    self._low_priority_queues = {}
    # _active_requests is keyed by the reply object; its values are
    # (request, handler, xml) tuples, which have no ``reply`` attribute,
    # so the objects to abort are the keys.  Iterate over a snapshot:
    # aborting may finish a reply, and _process_reply pops finished
    # replies out of this dict.
    for reply in list(self._active_requests):
        reply.abort()
def _run_next_task(self):
delay, index, key = sys.maxint, None, None
now = QtCore.QTime.currentTime()
for i, (k, task) in enumerate(self._queue):
if k == key or k in self._active_hosts:
delay = sys.maxint
for key in self._hosts:
queue = self._high_priority_queues.get(key) or self._low_priority_queues.get(key)
if not queue:
continue
last = self._last_request_times.get(k)
last_ms = last.msecsTo(now) if last is not None else REQUEST_DELAY
if last_ms >= REQUEST_DELAY:
self.log.debug("Last request to %s was %d ms ago, starting another one", k, last_ms)
del self._queue[i]
task()
return
d = REQUEST_DELAY - last_ms
now = QtCore.QTime.currentTime()
last = self._last_request_times.get(key)
request_delay = REQUEST_DELAY[key]
last_ms = last.msecsTo(now) if last is not None else request_delay
if last_ms >= request_delay:
self.log.debug("Last request to %s was %d ms ago, starting another one", key, last_ms)
d = request_delay
queue.popleft()()
else:
d = request_delay - last_ms
self.log.debug("Waiting %d ms before starting another request to %s", d, key)
if d < delay:
delay, index, key = d, i, k
if index is not None and not self._timer.isActive():
self.log.debug("Waiting %d ms before starting another request to %s",
delay, key)
delay = d
if delay < sys.maxint:
self._timer.start(delay)
def add_task(self, func, host, port, position=None):
def add_task(self, func, host, port, priority, important=False):
key = (host, port)
if position is None:
self._queue.append((key, func))
if key not in self._hosts:
self._hosts.append(key)
if priority:
queues = self._high_priority_queues
else:
self._queue.insert(position, (key, func))
if key not in self._active_hosts:
queues = self._low_priority_queues
queues.setdefault(key, deque())
if important:
queues[key].appendleft(func)
else:
queues[key].append(func)
if not self._timer.isActive():
self._timer.start(0)
return (key, func, priority)
def _get_by_id(self, entitytype, entityid, handler, inc=[], mblogin=False):
def remove_task(self, task):
    """Remove a queued task that has not been started yet.

    ``task`` is the ``(key, func, priority)`` tuple returned by
    ``add_task``.  Removal is best-effort: the call is a no-op when the
    function is no longer in its queue or when the queue for that host
    does not exist.
    """
    key, func, priority = task
    if priority:
        queues = self._high_priority_queues
    else:
        queues = self._low_priority_queues
    queue = queues.get(key)
    if queue is None:
        # stop() replaces both queue dicts with empty ones, so a task
        # handle can outlive the queue it was placed in.
        return
    try:
        queue.remove(func)
    except ValueError:
        # The function was already taken off the queue.
        pass
def _get_by_id(self, entitytype, entityid, handler, inc=[], params=[], priority=False, important=False, mblogin=False):
host = self.config.setting["server_host"]
port = self.config.setting["server_port"]
path = "/ws/2/%s/%s?inc=%s" % (entitytype, entityid, "+".join(inc))
if entitytype == "discid": path += "&cdstubs=no"
self.get(host, port, path, handler, mblogin=mblogin)
path = "/ws/2/%s/%s?inc=%s&%s" % (entitytype, entityid, "+".join(inc), "&".join(params))
return self.get(host, port, path, handler, priority=priority, important=important, mblogin=mblogin)
def get_release_group_by_id(self, releasegroupid, handler):
def get_release_group_by_id(self, releasegroupid, handler, priority=True, important=False):
inc = ['releases', 'media']
self._get_by_id('release-group', releasegroupid, handler, inc)
return self._get_by_id('release-group', releasegroupid, handler, inc, priority=priority, important=important)
def get_release_by_id(self, releaseid, handler, inc=[], mblogin=False):
self._get_by_id('release', releaseid, handler, inc, mblogin=mblogin)
def get_release_by_id(self, releaseid, handler, inc=[], priority=True, important=False, mblogin=False):
return self._get_by_id('release', releaseid, handler, inc, priority=priority, important=important, mblogin=mblogin)
def get_track_by_id(self, trackid, handler):
def get_track_by_id(self, trackid, handler, priority=False, important=False):
inc = ['releases', 'release-groups', 'media', 'artist-credits']
self._get_by_id('recording', trackid, handler, inc)
return self._get_by_id('recording', trackid, handler, inc, priority=priority, important=important)
def lookup_puid(self, puid, handler, priority=False, important=False):
    """Queue a lookup of ``puid`` and return the result of ``_get_by_id``.

    ``priority`` and ``important`` are forwarded unchanged to
    ``_get_by_id`` so callers can actually influence the queuing, in the
    same way as the other ``*_by_id`` helpers.
    """
    inc = ['releases', 'release-groups', 'media', 'artist-credits']
    return self._get_by_id('puid', puid, handler, inc, priority=priority, important=important)
def lookup_discid(self, discid, handler):
self._get_by_id('discid', discid, handler, ['artist-credits', 'labels'])
def lookup_discid(self, discid, handler, priority=True, important=True):
inc = ['artist-credits', 'labels']
return self._get_by_id('discid', discid, handler, inc, params=["cdstubs=no"], priority=priority, important=important)
def _find(self, entitytype, handler, kwargs):
host = self.config.setting["server_host"]
@@ -309,19 +288,19 @@ class XmlWebService(QtCore.QObject):
value = str(QtCore.QUrl.toPercentEncoding(QtCore.QString(value)))
params.append('%s=%s' % (str(name), value))
path = "/ws/2/%s/?%s" % (entitytype, "&".join(params))
self.get(host, port, path, handler)
return self.get(host, port, path, handler)
def find_releases(self, handler, **kwargs):
self._find('release', handler, kwargs)
return self._find('release', handler, kwargs)
def find_tracks(self, handler, **kwargs):
self._find('recording', handler, kwargs)
return self._find('recording', handler, kwargs)
def submit_puids(self, puids, handler):
path = '/ws/2/recording/?client=' + USER_AGENT_STRING
recordings = ''.join(['<recording id="%s"><puid-list><puid id="%s"/></puid-list></recording>' % i for i in puids.items()])
data = _wrap_xml_metadata('<recording-list>%s</recording-list>' % recordings)
self.post(PUID_SUBMIT_HOST, PUID_SUBMIT_PORT, path, data, handler)
return self.post(PUID_SUBMIT_HOST, PUID_SUBMIT_PORT, path, data, handler, priority=True, important=True)
def submit_ratings(self, ratings, handler):
host = self.config.setting['server_host']
@@ -330,16 +309,15 @@ class XmlWebService(QtCore.QObject):
recordings = (''.join(['<recording id="%s"><user-rating>%s</user-rating></recording>' %
(i[1], j*20) for i, j in ratings.items() if i[0] == 'recording']))
data = _wrap_xml_metadata('<recording-list>%s</recording-list>' % recordings)
self.post(host, port, path, data, handler)
return self.post(host, port, path, data, handler, priority=True, important=True)
def query_musicdns(self, handler, **kwargs):
host = 'ofa.musicdns.org'
port = 80
host, port = 'ofa.musicdns.org', 80
filters = []
for name, value in kwargs.items():
value = str(QtCore.QUrl.toPercentEncoding(value))
filters.append('%s=%s' % (str(name), value))
self.post(host, port, '/ofa/1/track/', '&'.join(filters), handler, mblogin = False)
return self.post(host, port, '/ofa/1/track/', '&'.join(filters), handler, mblogin=False)
def download(self, host, port, path, handler, position=None):
self.get(host, port, path, handler, xml=False, position=position)
def download(self, host, port, path, handler, priority=False, important=False):
return self.get(host, port, path, handler, xml=False, priority=priority, important=important)