Use QThreadPool for threading

This commit is contained in:
Michael Wiencek
2013-07-04 19:35:20 -05:00
parent 72dce65f09
commit e40e1f393b
10 changed files with 76 additions and 268 deletions

View File

@@ -22,7 +22,7 @@ from functools import partial
from PyQt4 import QtCore
from picard import config, log
from picard.const import ACOUSTID_KEY, FPCALC_NAMES
from picard.util import call_next, find_executable
from picard.util import find_executable
from picard.webservice import XmlNode

View File

@@ -31,7 +31,7 @@ from picard.file import File
from picard.track import Track
from picard.script import ScriptParser
from picard.ui.item import Item
from picard.util import format_time, queue, mbid_validate, asciipunct
from picard.util import format_time, mbid_validate, asciipunct
from picard.cluster import Cluster
from picard.collection import Collection, user_collections, load_user_collections
from picard.mbxml import (
@@ -60,7 +60,7 @@ class Album(DataObject, Item):
self._requests = 0
self._tracks_loaded = False
self._discid = discid
self._after_load_callbacks = queue.Queue()
self._after_load_callbacks = []
self.unmatched_files = Cluster(_("Unmatched Files"), special=True, related_album=self, hide_if_empty=True)
self.errors = []
@@ -265,9 +265,9 @@ class Album(DataObject, Item):
self.match_files(self.unmatched_files.files)
self.update()
self.tagger.window.set_statusbar_message(_('Album %s loaded'), self.id, timeout=3000)
while self._after_load_callbacks.qsize() > 0:
func = self._after_load_callbacks.get()
for func in self._after_load_callbacks:
func()
self._after_load_callbacks = []
def load(self):
if self._requests:
@@ -310,7 +310,7 @@ class Album(DataObject, Item):
if self.loaded:
func()
else:
self._after_load_callbacks.put(func)
self._after_load_callbacks.append(func)
def stop_loading(self):
if self.load_task:

View File

@@ -35,7 +35,6 @@ from picard.ui.item import Item
from picard.script import ScriptParser
from picard.similarity import similarity2
from picard.util import (
call_next,
decode_filename,
encode_filename,
make_short_filename,
@@ -45,8 +44,9 @@ from picard.util import (
unaccent,
format_time,
pathcmp,
mimetype
)
mimetype,
thread
)
class File(QtCore.QObject, Item):
@@ -89,14 +89,13 @@ class File(QtCore.QObject, Item):
def __repr__(self):
return '<File %r>' % self.base_filename
def load(self, next):
self.tagger.load_queue.put((
def load(self, callback):
thread.run_task(
partial(self._load, self.filename),
partial(self._loading_finished, next),
QtCore.Qt.LowEventPriority + 1))
partial(self._loading_finished, callback),
priority=1)
@call_next
def _loading_finished(self, next, result=None, error=None):
def _loading_finished(self, callback, result=None, error=None):
if self.state != self.PENDING:
return
if error is not None:
@@ -107,7 +106,7 @@ class File(QtCore.QObject, Item):
self.state = self.NORMAL
self._copy_loaded_metadata(result)
self.update()
return self
callback(self)
def _copy_loaded_metadata(self, metadata):
filename, _ = os.path.splitext(self.base_filename)
@@ -153,14 +152,14 @@ class File(QtCore.QObject, Item):
"""Load metadata from the file."""
raise NotImplementedError
def save(self, next):
def save(self):
self.set_pending()
metadata = Metadata()
metadata.copy(self.metadata)
self.tagger.save_queue.put((
thread.run_task(
partial(self._save_and_rename, self.filename, metadata),
partial(self._saving_finished, next),
QtCore.Qt.LowEventPriority + 2))
self._saving_finished,
priority=2)
def _save_and_rename(self, old_filename, metadata):
"""Save the metadata."""
@@ -209,8 +208,7 @@ class File(QtCore.QObject, Item):
else:
raise OSError
@call_next
def _saving_finished(self, next, result=None, error=None):
def _saving_finished(self, result=None, error=None):
old_filename = new_filename = self.filename
if error is not None:
self.error = str(error)
@@ -235,7 +233,9 @@ class File(QtCore.QObject, Item):
self.error = None
self.clear_pending()
self._add_path_to_metadata(self.orig_metadata)
return self, old_filename, new_filename
del self.tagger.files[old_filename]
self.tagger.files[new_filename] = self
def _save(self, filename, metadata):
"""Save the metadata."""

View File

@@ -42,7 +42,7 @@ def unregister_receiver(receiver):
_receivers.remove(receiver)
def _message(level, message, args, kwargs):
def _message(level, message, *args):
if not log_levels & level:
return
if not (isinstance(message, str) or isinstance(message, unicode)):
@@ -56,26 +56,26 @@ def _message(level, message, args, kwargs):
entries.append((level, time, message))
for func in _receivers:
try:
func(level, time, message)
thread.to_main(func, level, time, message)
except Exception, e:
import traceback
traceback.print_exc()
def debug(message, *args, **kwargs):
thread.proxy_to_main(_message, LOG_DEBUG, message, args, kwargs)
def debug(message, *args):
_message(LOG_DEBUG, message, *args)
def info(message, *args, **kwargs):
thread.proxy_to_main(_message, LOG_INFO, message, args, kwargs)
def info(message, *args):
_message(LOG_INFO, message, *args)
def warning(message, *args, **kwargs):
thread.proxy_to_main(_message, LOG_WARNING, message, args, kwargs)
def warning(message, *args):
_message(LOG_WARNING, message, *args)
def error(message, *args, **kwargs):
thread.proxy_to_main(_message, LOG_ERROR, message, args, kwargs)
def error(message, *args):
_message(LOG_ERROR, message, *args)
_log_prefixes = {

View File

@@ -91,24 +91,9 @@ class Tagger(QtGui.QApplication):
self._args = args
self._autoupdate = autoupdate
# Initialize threading and allocate threads
self.thread_pool = thread.ThreadPool(self)
self.load_queue = queue.Queue()
self.save_queue = queue.Queue()
self.analyze_queue = queue.Queue()
self.other_queue = queue.Queue()
threads = self.thread_pool.threads
for i in range(4):
threads.append(thread.Thread(self.thread_pool, self.load_queue))
threads.append(thread.Thread(self.thread_pool, self.save_queue))
threads.append(thread.Thread(self.thread_pool, self.other_queue))
threads.append(thread.Thread(self.thread_pool, self.other_queue))
threads.append(thread.Thread(self.thread_pool, self.analyze_queue))
self.thread_pool.start()
self.stopping = False
# FIXME: Figure out what's wrong with QThreadPool.globalInstance().
# It's a valid reference, but its start() doesn't work.
self.thread_pool = QtCore.QThreadPool(self)
# Setup logging
if debug or "PICARD_DEBUG" in os.environ:
@@ -249,7 +234,7 @@ class Tagger(QtGui.QApplication):
def exit(self):
self.stopping = True
self._acoustid.done()
self.thread_pool.stop()
self.thread_pool.waitForDone()
self.browser_integration.stop()
self.xmlws.stop()
@@ -274,7 +259,9 @@ class Tagger(QtGui.QApplication):
return res
def event(self, event):
if event.type() == QtCore.QEvent.FileOpen:
if isinstance(event, thread.ProxyToMainEvent):
event.run()
elif event.type() == QtCore.QEvent.FileOpen:
f = str(event.file())
self.add_files([f])
# We should just return True here, except that seems to
@@ -283,9 +270,8 @@ class Tagger(QtGui.QApplication):
return 1
return QtGui.QApplication.event(self, event)
def _file_loaded(self, target, result=None, error=None):
file = result
if file is not None and error is None and not file.has_error():
def _file_loaded(self, file, target=None):
if file is not None and not file.has_error():
trackid = file.metadata['musicbrainz_trackid']
if target is not None:
self.move_files([file], target)
@@ -331,7 +317,7 @@ class Tagger(QtGui.QApplication):
self.unmatched_files.add_files(new_files)
target = None
for file in new_files:
file.load(partial(self._file_loaded, target))
file.load(partial(self._file_loaded, target=target))
def add_directory(self, path):
walk = os.walk(unicode(path))
@@ -349,7 +335,7 @@ class Tagger(QtGui.QApplication):
if result:
if error is None:
self.add_files(result)
self.other_queue.put((get_files, process, QtCore.Qt.LowEventPriority))
thread.run_task(get_files, process)
process(True, False)
@@ -388,17 +374,11 @@ class Tagger(QtGui.QApplication):
"""Return list of files from list of albums, clusters, tracks or files."""
return uniqify(chain(*[obj.iterfiles(save) for obj in objects]))
def _file_saved(self, result=None, error=None):
if error is None:
file, old_filename, new_filename = result
del self.files[old_filename]
self.files[new_filename] = file
def save(self, objects):
"""Save the specified objects."""
files = self.get_files_from_objects(objects, save=True)
for file in files:
file.save(self._file_saved)
file.save()
def load_album(self, id, discid=None):
id = self.mbid_redirects.get(id, id)
@@ -501,10 +481,9 @@ class Tagger(QtGui.QApplication):
disc = Disc()
self.set_wait_cursor()
self.other_queue.put((
thread.run_task(
partial(disc.read, encode_filename(device)),
partial(self._lookup_disc, disc),
QtCore.Qt.LowEventPriority))
partial(self._lookup_disc, disc))
@property
def use_acoustid(self):

View File

@@ -22,6 +22,7 @@ from PyQt4 import QtCore, QtGui
import sys
import os.path
from functools import partial
from picard import config, log
from picard.file import File
from picard.track import Track
@@ -36,7 +37,7 @@ from picard.ui.options.dialog import OptionsDialog
from picard.ui.infodialog import FileInfoDialog, AlbumInfoDialog
from picard.ui.infostatus import InfoStatus
from picard.ui.passworddialog import PasswordDialog
from picard.util import icontheme, webbrowser2, find_existing_path, throttle
from picard.util import icontheme, webbrowser2, find_existing_path, throttle, thread
from picard.util.cdrom import get_cdrom_drives
from picard.plugin import ExtensionPoint
@@ -242,21 +243,17 @@ class MainWindow(QtGui.QMainWindow):
def set_statusbar_message(self, message, *args, **kwargs):
"""Set the status bar message."""
try:
if message:
log.debug(repr(message.replace('%%s', '%%r')), *args)
except:
pass
self.tagger.thread_pool.call_from_thread(
self._set_statusbar_message, message, *args, **kwargs)
def _set_statusbar_message(self, message, *args, **kwargs):
if message:
try:
log.debug(repr(message.replace('%%s', '%%r')), *args)
except:
pass
if args:
message = _(message) % args
else:
message = _(message)
self.statusBar().showMessage(message, kwargs.get('timeout', 0))
thread.to_main(self.statusBar().showMessage, message,
kwargs.get("timeout", 0))
def _on_submit(self):
if self.tagger.use_acoustid:

View File

@@ -26,7 +26,7 @@ from picard.album import Album
from picard.cluster import Cluster
from picard.track import Track
from picard.file import File
from picard.util import format_time, throttle
from picard.util import format_time, throttle, thread
from picard.util.tags import display_tag_name
from picard.ui.edittagdialog import EditTagDialog
from picard.metadata import MULTI_VALUED_JOINER
@@ -332,12 +332,9 @@ class MetadataBox(QtGui.QTableWidget):
def update(self):
if self.editing:
return
if self.selection_dirty:
self._update_selection()
self.tagger.other_queue.put((
self._update_tags, self._update_items, QtCore.Qt.LowEventPriority))
thread.run_task(self._update_tags, self._update_items)
def _update_tags(self):
self.selection_mutex.lock()

View File

@@ -288,23 +288,6 @@ def find_executable(*executables):
return f
def call_next(func):
def func_wrapper(self, *args, **kwargs):
next = args[0]
result = None
try:
result = func(self, *args, **kwargs)
except:
import traceback
from picard import log
log.error(traceback.format_exc())
next(error=sys.exc_info()[1])
else:
next(result=result)
func_wrapper.__name__ = func.__name__
return func_wrapper
_mbid_format = Template('$h{8}-$h$l-$h$l-$h$l-$h{12}').safe_substitute(h='[0-9a-fA-F]', l='{4}')
_re_mbid_val = re.compile(_mbid_format)
def mbid_validate(string):

View File

@@ -1,100 +0,0 @@
"""A multi-producer, multi-consumer queue."""
from collections import deque
from PyQt4 import QtCore
class Queue:
"""Create a queue object with a given maximum size.
If maxsize is <= 0, the queue size is infinite.
"""
def __init__(self, maxsize=0):
self._init(maxsize)
# mutex must be held whenever the queue is mutating. All methods
# that acquire mutex must release it before returning. mutex
# is shared between the two conditions, so acquiring and
# releasing the conditions also acquires and releases mutex.
self.mutex = QtCore.QMutex()
# Notify not_empty whenever an item is added to the queue; a
# thread waiting to get is notified then.
self.not_empty = QtCore.QWaitCondition()
# Notify not_full whenever an item is removed from the queue;
# a thread waiting to put is notified then.
self.not_full = QtCore.QWaitCondition()
# Notify all_tasks_done whenever the number of unfinished tasks
# drops to zero; thread waiting to join() is notified to resume
self.all_tasks_done = QtCore.QWaitCondition()
self.unfinished_tasks = 0
def unlock(self):
self.mutex.lock()
self.maxsize = 0
self.mutex.unlock()
self.not_full.wakeAll()
def qsize(self):
"""Return the approximate size of the queue (not reliable!)."""
self.mutex.lock()
n = self._qsize()
self.mutex.unlock()
return n
def put(self, item):
"""Put an item into the queue."""
self.mutex.lock()
try:
while self._full():
self.not_full.wait(self.mutex)
self._put(item)
self.not_empty.wakeOne()
finally:
self.mutex.unlock()
def remove(self, item):
"""Remove an item from the queue."""
self.mutex.lock()
try:
self.queue.remove(item)
except ValueError:
pass
else:
self.not_full.wakeOne()
finally:
self.mutex.unlock()
def get(self):
"""Remove and return an item from the queue."""
self.mutex.lock()
try:
while self._empty():
self.not_empty.wait(self.mutex)
item = self._get()
self.not_full.wakeOne()
return item
finally:
self.mutex.unlock()
# Initialize the queue representation
def _init(self, maxsize):
self.maxsize = maxsize
self.queue = deque()
def _qsize(self):
return len(self.queue)
# Check whether the queue is empty
def _empty(self):
return not self.queue
# Check whether the queue is full
def _full(self):
return self.maxsize > 0 and len(self.queue) == self.maxsize
# Put a new item in the queue
def _put(self, item):
self.queue.append(item)
# Get an item from the queue
def _get(self):
return self.queue.popleft()

View File

@@ -19,92 +19,44 @@
import sys
import traceback
from PyQt4 import QtCore
from PyQt4.QtCore import QThreadPool, QRunnable, QCoreApplication, QEvent
class ProxyToMainEvent(QtCore.QEvent):
class ProxyToMainEvent(QEvent):
def __init__(self, func, args, kwargs):
QtCore.QEvent.__init__(self, QtCore.QEvent.User)
def __init__(self, func, *args, **kwargs):
QEvent.__init__(self, QEvent.User)
self.func = func
self.args = args
self.kwargs = kwargs
def call(self):
def run(self):
self.func(*self.args, **self.kwargs)
class Thread(QtCore.QThread):
class Runnable(QRunnable):
def __init__(self, parent, queue):
QtCore.QThread.__init__(self, parent)
self.queue = queue
self.stopping = False
def stop(self):
self.stopping = True
self.queue.put(None)
def __init__(self, func, next):
QRunnable.__init__(self)
self.func = func
self.next = next
def run(self):
while not self.stopping:
item = self.queue.get()
if item is None:
continue
self.run_item(item)
def run_item(self, item):
func, next, priority = item
try:
result = func()
result = self.func()
except:
from picard import log
log.error(traceback.format_exc())
self.to_main(next, priority, error=sys.exc_info()[1])
to_main(self.next, error=sys.exc_info()[1])
else:
self.to_main(next, priority, result=result)
def to_main(self, func, priority, *args, **kwargs):
event = ProxyToMainEvent(func, args, kwargs)
QtCore.QCoreApplication.postEvent(self.parent(), event, priority)
to_main(self.next, result=result)
class ThreadPool(QtCore.QObject):
instance = None
def __init__(self, parent=None):
QtCore.QObject.__init__(self, parent)
self.threads = []
ThreadPool.instance = self
def start(self):
for thread in self.threads:
thread.start(QtCore.QThread.LowPriority)
def stop(self):
queues = set()
for thread in self.threads:
thread.stop()
queues.add(thread.queue)
for queue in queues:
queue.unlock()
def event(self, event):
if isinstance(event, ProxyToMainEvent):
try:
event.call()
except:
from picard import log
log.error(traceback.format_exc())
return True
return False
def call_from_thread(self, handler, *args, **kwargs):
priority = kwargs.pop('priority', QtCore.Qt.LowEventPriority)
event = ProxyToMainEvent(handler, args, kwargs)
QtCore.QCoreApplication.postEvent(self, event, priority)
def run_task(func, next, priority=0):
QCoreApplication.instance().thread_pool.start(
Runnable(func, next), priority)
# REMOVEME
def proxy_to_main(handler, *args, **kwargs):
ThreadPool.instance.call_from_thread(handler, *args, **kwargs)
def to_main(func, *args, **kwargs):
QCoreApplication.postEvent(QCoreApplication.instance(),
ProxyToMainEvent(func, *args, **kwargs))