From e40e1f393be96cc63445a89a84ea9e89a3fa7279 Mon Sep 17 00:00:00 2001 From: Michael Wiencek Date: Thu, 4 Jul 2013 19:35:20 -0500 Subject: [PATCH] Use QThreadPool for threading --- picard/acoustid.py | 2 +- picard/album.py | 10 ++-- picard/file.py | 34 ++++++------- picard/log.py | 20 ++++---- picard/tagger.py | 49 ++++++------------- picard/ui/mainwindow.py | 19 ++++---- picard/ui/metadatabox.py | 7 +-- picard/util/__init__.py | 17 ------- picard/util/queue.py | 100 --------------------------------------- picard/util/thread.py | 86 ++++++++------------------------- 10 files changed, 76 insertions(+), 268 deletions(-) delete mode 100644 picard/util/queue.py diff --git a/picard/acoustid.py b/picard/acoustid.py index dd10e9e12..4d0858e1c 100644 --- a/picard/acoustid.py +++ b/picard/acoustid.py @@ -22,7 +22,7 @@ from functools import partial from PyQt4 import QtCore from picard import config, log from picard.const import ACOUSTID_KEY, FPCALC_NAMES -from picard.util import call_next, find_executable +from picard.util import find_executable from picard.webservice import XmlNode diff --git a/picard/album.py b/picard/album.py index d7afd67d5..d7d77d2e5 100644 --- a/picard/album.py +++ b/picard/album.py @@ -31,7 +31,7 @@ from picard.file import File from picard.track import Track from picard.script import ScriptParser from picard.ui.item import Item -from picard.util import format_time, queue, mbid_validate, asciipunct +from picard.util import format_time, mbid_validate, asciipunct from picard.cluster import Cluster from picard.collection import Collection, user_collections, load_user_collections from picard.mbxml import ( @@ -60,7 +60,7 @@ class Album(DataObject, Item): self._requests = 0 self._tracks_loaded = False self._discid = discid - self._after_load_callbacks = queue.Queue() + self._after_load_callbacks = [] self.unmatched_files = Cluster(_("Unmatched Files"), special=True, related_album=self, hide_if_empty=True) self.errors = [] @@ -265,9 +265,9 @@ class Album(DataObject, Item): self.match_files(self.unmatched_files.files) self.update() self.tagger.window.set_statusbar_message(_('Album %s loaded'), self.id, timeout=3000) - while self._after_load_callbacks.qsize() > 0: - func = self._after_load_callbacks.get() + for func in self._after_load_callbacks: func() + self._after_load_callbacks = [] def load(self): if self._requests: @@ -310,7 +310,7 @@ class Album(DataObject, Item): if self.loaded: func() else: - self._after_load_callbacks.put(func) + self._after_load_callbacks.append(func) def stop_loading(self): if self.load_task: diff --git a/picard/file.py b/picard/file.py index a699086f4..a287adae7 100644 --- a/picard/file.py +++ b/picard/file.py @@ -35,7 +35,6 @@ from picard.ui.item import Item from picard.script import ScriptParser from picard.similarity import similarity2 from picard.util import ( - call_next, decode_filename, encode_filename, make_short_filename, @@ -45,8 +44,9 @@ from picard.util import ( unaccent, format_time, pathcmp, - mimetype - ) + mimetype, + thread +) class File(QtCore.QObject, Item): @@ -89,14 +89,13 @@ class File(QtCore.QObject, Item): def __repr__(self): return '' % self.base_filename - def load(self, next): - self.tagger.load_queue.put(( + def load(self, callback): + thread.run_task( partial(self._load, self.filename), - partial(self._loading_finished, next), - QtCore.Qt.LowEventPriority + 1)) + partial(self._loading_finished, callback), + priority=1) - @call_next - def _loading_finished(self, next, result=None, error=None): + def _loading_finished(self, callback, result=None, error=None): if self.state != self.PENDING: return if error is not None: @@ -107,7 +106,7 @@ class File(QtCore.QObject, Item): self.state = self.NORMAL self._copy_loaded_metadata(result) self.update() - return self + callback(self) def _copy_loaded_metadata(self, metadata): filename, _ = os.path.splitext(self.base_filename) @@ -153,14 +152,14 @@ class File(QtCore.QObject, Item): """Load metadata from the file.""" raise NotImplementedError - def save(self, next): + def save(self): self.set_pending() metadata = Metadata() metadata.copy(self.metadata) - self.tagger.save_queue.put(( + thread.run_task( partial(self._save_and_rename, self.filename, metadata), - partial(self._saving_finished, next), - QtCore.Qt.LowEventPriority + 2)) + self._saving_finished, + priority=2) def _save_and_rename(self, old_filename, metadata): """Save the metadata.""" @@ -209,8 +208,7 @@ class File(QtCore.QObject, Item): else: raise OSError - @call_next - def _saving_finished(self, next, result=None, error=None): + def _saving_finished(self, result=None, error=None): old_filename = new_filename = self.filename if error is not None: self.error = str(error) @@ -235,7 +233,9 @@ class File(QtCore.QObject, Item): self.error = None self.clear_pending() self._add_path_to_metadata(self.orig_metadata) - return self, old_filename, new_filename + + del self.tagger.files[old_filename] + self.tagger.files[new_filename] = self def _save(self, filename, metadata): """Save the metadata.""" diff --git a/picard/log.py b/picard/log.py index 111709fb9..d9e40560f 100644 --- a/picard/log.py +++ b/picard/log.py @@ -42,7 +42,7 @@ def unregister_receiver(receiver): _receivers.remove(receiver) -def _message(level, message, args, kwargs): +def _message(level, message, *args): if not log_levels & level: return if not (isinstance(message, str) or isinstance(message, unicode)): @@ -56,26 +56,26 @@ def _message(level, message, args, kwargs): entries.append((level, time, message)) for func in _receivers: try: - func(level, time, message) + thread.to_main(func, level, time, message) except Exception, e: import traceback traceback.print_exc() -def debug(message, *args, **kwargs): - thread.proxy_to_main(_message, LOG_DEBUG, message, args, kwargs) +def debug(message, *args): + _message(LOG_DEBUG, message, *args) -def info(message, *args, **kwargs): - thread.proxy_to_main(_message, LOG_INFO, message, args, kwargs) +def info(message, *args): + _message(LOG_INFO, message, *args) -def warning(message, *args, **kwargs): - thread.proxy_to_main(_message, LOG_WARNING, message, args, kwargs) +def warning(message, *args): + _message(LOG_WARNING, message, *args) -def error(message, *args, **kwargs): - thread.proxy_to_main(_message, LOG_ERROR, message, args, kwargs) +def error(message, *args): + _message(LOG_ERROR, message, *args) _log_prefixes = { diff --git a/picard/tagger.py b/picard/tagger.py index d562d36d9..54ce315a4 100644 --- a/picard/tagger.py +++ b/picard/tagger.py @@ -91,24 +91,9 @@ class Tagger(QtGui.QApplication): self._args = args self._autoupdate = autoupdate - # Initialize threading and allocate threads - self.thread_pool = thread.ThreadPool(self) - - self.load_queue = queue.Queue() - self.save_queue = queue.Queue() - self.analyze_queue = queue.Queue() - self.other_queue = queue.Queue() - - threads = self.thread_pool.threads - for i in range(4): - threads.append(thread.Thread(self.thread_pool, self.load_queue)) - threads.append(thread.Thread(self.thread_pool, self.save_queue)) - threads.append(thread.Thread(self.thread_pool, self.other_queue)) - threads.append(thread.Thread(self.thread_pool, self.other_queue)) - threads.append(thread.Thread(self.thread_pool, self.analyze_queue)) - - self.thread_pool.start() - self.stopping = False + # FIXME: Figure out what's wrong with QThreadPool.globalInstance(). + # It's a valid reference, but its start() doesn't work. + self.thread_pool = QtCore.QThreadPool(self) # Setup logging if debug or "PICARD_DEBUG" in os.environ: @@ -249,7 +234,7 @@ class Tagger(QtGui.QApplication): def exit(self): self.stopping = True self._acoustid.done() - self.thread_pool.stop() + self.thread_pool.waitForDone() self.browser_integration.stop() self.xmlws.stop() @@ -274,7 +259,9 @@ class Tagger(QtGui.QApplication): return res def event(self, event): - if event.type() == QtCore.QEvent.FileOpen: + if isinstance(event, thread.ProxyToMainEvent): + event.run() + elif event.type() == QtCore.QEvent.FileOpen: f = str(event.file()) self.add_files([f]) # We should just return True here, except that seems to @@ -283,9 +270,8 @@ class Tagger(QtGui.QApplication): return 1 return QtGui.QApplication.event(self, event) - def _file_loaded(self, target, result=None, error=None): - file = result - if file is not None and error is None and not file.has_error(): + def _file_loaded(self, file, target=None): + if file is not None and not file.has_error(): trackid = file.metadata['musicbrainz_trackid'] if target is not None: self.move_files([file], target) @@ -331,7 +317,7 @@ class Tagger(QtGui.QApplication): self.unmatched_files.add_files(new_files) target = None for file in new_files: - file.load(partial(self._file_loaded, target)) + file.load(partial(self._file_loaded, target=target)) def add_directory(self, path): walk = os.walk(unicode(path)) @@ -349,7 +335,7 @@ class Tagger(QtGui.QApplication): if result: if error is None: self.add_files(result) - self.other_queue.put((get_files, process, QtCore.Qt.LowEventPriority)) + thread.run_task(get_files, process) process(True, False) @@ -388,17 +374,11 @@ class Tagger(QtGui.QApplication): """Return list of files from list of albums, clusters, tracks or files.""" return uniqify(chain(*[obj.iterfiles(save) for obj in objects])) - def _file_saved(self, result=None, error=None): - if error is None: - file, old_filename, new_filename = result - del self.files[old_filename] - self.files[new_filename] = file - def save(self, objects): """Save the specified objects.""" files = self.get_files_from_objects(objects, save=True) for file in files: - file.save(self._file_saved) + file.save() def load_album(self, id, discid=None): id = self.mbid_redirects.get(id, id) @@ -501,10 +481,9 @@ class Tagger(QtGui.QApplication): disc = Disc() self.set_wait_cursor() - self.other_queue.put(( + thread.run_task( partial(disc.read, encode_filename(device)), - partial(self._lookup_disc, disc), - QtCore.Qt.LowEventPriority)) + partial(self._lookup_disc, disc)) @property def use_acoustid(self): diff --git a/picard/ui/mainwindow.py b/picard/ui/mainwindow.py index ada47174e..c6cf3c81b 100644 --- a/picard/ui/mainwindow.py +++ b/picard/ui/mainwindow.py @@ -22,6 +22,7 @@ from PyQt4 import QtCore, QtGui import sys import os.path +from functools import partial from picard import config, log from picard.file import File from picard.track import Track @@ -36,7 +37,7 @@ from picard.ui.options.dialog import OptionsDialog from picard.ui.infodialog import FileInfoDialog, AlbumInfoDialog from picard.ui.infostatus import InfoStatus from picard.ui.passworddialog import PasswordDialog -from picard.util import icontheme, webbrowser2, find_existing_path, throttle +from picard.util import icontheme, webbrowser2, find_existing_path, throttle, thread from picard.util.cdrom import get_cdrom_drives from picard.plugin import ExtensionPoint @@ -242,21 +243,17 @@ class MainWindow(QtGui.QMainWindow): def set_statusbar_message(self, message, *args, **kwargs): """Set the status bar message.""" - try: - if message: - log.debug(repr(message.replace('%%s', '%%r')), *args) - except: - pass - self.tagger.thread_pool.call_from_thread( - self._set_statusbar_message, message, *args, **kwargs) - - def _set_statusbar_message(self, message, *args, **kwargs): if message: + try: + log.debug(repr(message.replace('%%s', '%%r')), *args) + except: + pass if args: message = _(message) % args else: message = _(message) - self.statusBar().showMessage(message, kwargs.get('timeout', 0)) + thread.to_main(self.statusBar().showMessage, message, + kwargs.get("timeout", 0)) def _on_submit(self): if self.tagger.use_acoustid: diff --git a/picard/ui/metadatabox.py b/picard/ui/metadatabox.py index 5e22a8b05..6f8a93090 100644 --- a/picard/ui/metadatabox.py +++ b/picard/ui/metadatabox.py @@ -26,7 +26,7 @@ from picard.album import Album from picard.cluster import Cluster from picard.track import Track from picard.file import File -from picard.util import format_time, throttle +from picard.util import format_time, throttle, thread from picard.util.tags import display_tag_name from picard.ui.edittagdialog import EditTagDialog from picard.metadata import MULTI_VALUED_JOINER @@ -332,12 +332,9 @@ class MetadataBox(QtGui.QTableWidget): def update(self): if self.editing: return - if self.selection_dirty: self._update_selection() - - self.tagger.other_queue.put(( - self._update_tags, self._update_items, QtCore.Qt.LowEventPriority)) + thread.run_task(self._update_tags, self._update_items) def _update_tags(self): self.selection_mutex.lock() diff --git a/picard/util/__init__.py b/picard/util/__init__.py index b08deed90..9e7263de0 100644 --- a/picard/util/__init__.py +++ b/picard/util/__init__.py @@ -288,23 +288,6 @@ def find_executable(*executables): return f -def call_next(func): - def func_wrapper(self, *args, **kwargs): - next = args[0] - result = None - try: - result = func(self, *args, **kwargs) - except: - import traceback - from picard import log - log.error(traceback.format_exc()) - next(error=sys.exc_info()[1]) - else: - next(result=result) - func_wrapper.__name__ = func.__name__ - return func_wrapper - - _mbid_format = Template('$h{8}-$h$l-$h$l-$h$l-$h{12}').safe_substitute(h='[0-9a-fA-F]', l='{4}') _re_mbid_val = re.compile(_mbid_format) def mbid_validate(string): diff --git a/picard/util/queue.py b/picard/util/queue.py deleted file mode 100644 index c6d286307..000000000 --- a/picard/util/queue.py +++ /dev/null @@ -1,100 +0,0 @@ -"""A multi-producer, multi-consumer queue.""" - -from collections import deque -from PyQt4 import QtCore - - -class Queue: - """Create a queue object with a given maximum size. - - If maxsize is <= 0, the queue size is infinite. - """ - def __init__(self, maxsize=0): - self._init(maxsize) - # mutex must be held whenever the queue is mutating. All methods - # that acquire mutex must release it before returning. mutex - # is shared between the two conditions, so acquiring and - # releasing the conditions also acquires and releases mutex. - self.mutex = QtCore.QMutex() - # Notify not_empty whenever an item is added to the queue; a - # thread waiting to get is notified then. - self.not_empty = QtCore.QWaitCondition() - # Notify not_full whenever an item is removed from the queue; - # a thread waiting to put is notified then. - self.not_full = QtCore.QWaitCondition() - # Notify all_tasks_done whenever the number of unfinished tasks - # drops to zero; thread waiting to join() is notified to resume - self.all_tasks_done = QtCore.QWaitCondition() - self.unfinished_tasks = 0 - - def unlock(self): - self.mutex.lock() - self.maxsize = 0 - self.mutex.unlock() - self.not_full.wakeAll() - - def qsize(self): - """Return the approximate size of the queue (not reliable!).""" - self.mutex.lock() - n = self._qsize() - self.mutex.unlock() - return n - - def put(self, item): - """Put an item into the queue.""" - self.mutex.lock() - try: - while self._full(): - self.not_full.wait(self.mutex) - self._put(item) - self.not_empty.wakeOne() - finally: - self.mutex.unlock() - - def remove(self, item): - """Remove an item from the queue.""" - self.mutex.lock() - try: - self.queue.remove(item) - except ValueError: - pass - else: - self.not_full.wakeOne() - finally: - self.mutex.unlock() - - def get(self): - """Remove and return an item from the queue.""" - self.mutex.lock() - try: - while self._empty(): - self.not_empty.wait(self.mutex) - item = self._get() - self.not_full.wakeOne() - return item - finally: - self.mutex.unlock() - - # Initialize the queue representation - def _init(self, maxsize): - self.maxsize = maxsize - self.queue = deque() - - def _qsize(self): - return len(self.queue) - - # Check whether the queue is empty - def _empty(self): - return not self.queue - - # Check whether the queue is full - def _full(self): - return self.maxsize > 0 and len(self.queue) == self.maxsize - - # Put a new item in the queue - def _put(self, item): - self.queue.append(item) - - # Get an item from the queue - def _get(self): - return self.queue.popleft() diff --git a/picard/util/thread.py b/picard/util/thread.py index a3088c831..967a986ed 100644 --- a/picard/util/thread.py +++ b/picard/util/thread.py @@ -19,92 +19,44 @@ import sys import traceback -from PyQt4 import QtCore +from PyQt4.QtCore import QThreadPool, QRunnable, QCoreApplication, QEvent -class ProxyToMainEvent(QtCore.QEvent): +class ProxyToMainEvent(QEvent): - def __init__(self, func, args, kwargs): - QtCore.QEvent.__init__(self, QtCore.QEvent.User) + def __init__(self, func, *args, **kwargs): + QEvent.__init__(self, QEvent.User) self.func = func self.args = args self.kwargs = kwargs - def call(self): + def run(self): self.func(*self.args, **self.kwargs) -class Thread(QtCore.QThread): +class Runnable(QRunnable): - def __init__(self, parent, queue): - QtCore.QThread.__init__(self, parent) - self.queue = queue - self.stopping = False - - def stop(self): - self.stopping = True - self.queue.put(None) + def __init__(self, func, next): + QRunnable.__init__(self) + self.func = func + self.next = next def run(self): - while not self.stopping: - item = self.queue.get() - if item is None: - continue - self.run_item(item) - - def run_item(self, item): - func, next, priority = item try: - result = func() + result = self.func() except: from picard import log log.error(traceback.format_exc()) - self.to_main(next, priority, error=sys.exc_info()[1]) + to_main(self.next, error=sys.exc_info()[1]) else: - self.to_main(next, priority, result=result) - - def to_main(self, func, priority, *args, **kwargs): - event = ProxyToMainEvent(func, args, kwargs) - QtCore.QCoreApplication.postEvent(self.parent(), event, priority) + to_main(self.next, result=result) -class ThreadPool(QtCore.QObject): - - instance = None - - def __init__(self, parent=None): - QtCore.QObject.__init__(self, parent) - self.threads = [] - ThreadPool.instance = self - - def start(self): - for thread in self.threads: - thread.start(QtCore.QThread.LowPriority) - - def stop(self): - queues = set() - for thread in self.threads: - thread.stop() - queues.add(thread.queue) - for queue in queues: - queue.unlock() - - def event(self, event): - if isinstance(event, ProxyToMainEvent): - try: - event.call() - except: - from picard import log - log.error(traceback.format_exc()) - return True - return False - - def call_from_thread(self, handler, *args, **kwargs): - priority = kwargs.pop('priority', QtCore.Qt.LowEventPriority) - event = ProxyToMainEvent(handler, args, kwargs) - QtCore.QCoreApplication.postEvent(self, event, priority) +def run_task(func, next, priority=0): + QCoreApplication.instance().thread_pool.start( + Runnable(func, next), priority) -# REMOVEME -def proxy_to_main(handler, *args, **kwargs): - ThreadPool.instance.call_from_thread(handler, *args, **kwargs) +def to_main(func, *args, **kwargs): + QCoreApplication.postEvent(QCoreApplication.instance(), + ProxyToMainEvent(func, *args, **kwargs))