Files
picard/picard/pluginmanager.py
Philipp Wolfer c90c6c7efd Refactor log string substitution
Rely on the string substitutions of the logging module to avoid
unnecessary substitutions if log output never gets written
2023-04-28 09:57:04 +02:00

450 lines
17 KiB
Python

# -*- coding: utf-8 -*-
#
# Picard, the next-generation MusicBrainz tagger
#
# Copyright (C) 2007 Lukáš Lalinský
# Copyright (C) 2014 Shadab Zafar
# Copyright (C) 2015-2021 Laurent Monin
# Copyright (C) 2019 Wieland Hoffmann
# Copyright (C) 2019-2020 Philipp Wolfer
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from functools import partial
import imp
import importlib
import json
import os.path
import shutil
import tempfile
import zipfile
import zipimport
from PyQt5 import QtCore
from picard import log
from picard.const import (
PLUGINS_API,
USER_PLUGIN_DIR,
)
from picard.plugin import (
_PLUGIN_MODULE_PREFIX,
PluginData,
PluginWrapper,
_unregister_module_extensions,
)
import picard.plugins
from picard.version import (
Version,
VersionError,
)
_SUFFIXES = tuple(importlib.machinery.all_suffixes())
_PACKAGE_ENTRIES = ("__init__.py", "__init__.pyc", "__init__.pyo")
_PLUGIN_PACKAGE_SUFFIX = ".picard"
_PLUGIN_PACKAGE_SUFFIX_LEN = len(_PLUGIN_PACKAGE_SUFFIX)
_FILEEXTS = ('.py', '.pyc', '.pyo', '.zip')
_UPDATE_SUFFIX = '.update'
_UPDATE_SUFFIX_LEN = len(_UPDATE_SUFFIX)
_extension_points = []
def is_update(path):
return path.endswith(_UPDATE_SUFFIX)
def strip_update_suffix(path):
if not is_update(path):
return path
return path[:-_UPDATE_SUFFIX_LEN]
def is_zip(path):
return path.endswith('.zip')
def strip_zip_suffix(path):
if not is_zip(path):
return path
return path[:-4]
def is_package(path):
return path.endswith(_PLUGIN_PACKAGE_SUFFIX)
def strip_package_suffix(path):
if not is_package(path):
return path
return path[:-_PLUGIN_PACKAGE_SUFFIX_LEN]
def is_zipped_package(path):
return path.endswith(_PLUGIN_PACKAGE_SUFFIX + '.zip')
def _plugin_name_from_path(path):
path = os.path.normpath(path)
if is_zip(path):
name = os.path.basename(strip_zip_suffix(path))
if is_package(name):
return strip_package_suffix(name)
else:
return name
elif os.path.isdir(path):
for entry in _PACKAGE_ENTRIES:
if os.path.isfile(os.path.join(path, entry)):
return os.path.basename(path)
else:
file = os.path.basename(path)
if file in _PACKAGE_ENTRIES:
return None
name, ext = os.path.splitext(file)
if ext in _SUFFIXES:
return name
return None
def load_manifest(archive_path):
archive = zipfile.ZipFile(archive_path)
manifest_data = None
with archive.open('MANIFEST.json') as f:
manifest_data = json.loads(str(f.read().decode()))
return manifest_data
def zip_import(path):
if (not is_zip(path) or not os.path.isfile(path)):
return (None, None, None)
try:
zip_importer = zipimport.zipimporter(path)
plugin_name = _plugin_name_from_path(path)
manifest_data = None
if is_zipped_package(path):
try:
manifest_data = load_manifest(path)
except Exception as why:
log.warning("Failed to load manifest data from json: %s", why)
return (zip_importer, plugin_name, manifest_data)
except zipimport.ZipImportError as why:
log.error("ZIP import error: %s", why)
return (None, None, None)
def _compatible_api_versions(api_versions):
versions = [Version.from_string(v) for v in list(api_versions)]
return set(versions) & set(picard.api_versions_tuple)
class PluginManager(QtCore.QObject):
plugin_installed = QtCore.pyqtSignal(PluginWrapper, bool)
plugin_updated = QtCore.pyqtSignal(str, bool)
plugin_removed = QtCore.pyqtSignal(str, bool)
plugin_errored = QtCore.pyqtSignal(str, str, bool)
def __init__(self, plugins_directory=None):
super().__init__()
self.plugins = []
self._available_plugins = None # None=never loaded, [] = empty
if plugins_directory is None:
plugins_directory = USER_PLUGIN_DIR
self.plugins_directory = os.path.normpath(plugins_directory)
@property
def available_plugins(self):
return self._available_plugins
def plugin_error(self, name, error, *args, **kwargs):
"""Log a plugin loading error for the plugin `name` and signal the
error via the `plugin_errored` signal.
A string consisting of all `args` interpolated into `error` will be
passed to the function given via the `log_func` keyword argument
(default: log.error) and as the error message to the `plugin_errored`
signal."""
error = error % args
log_func = kwargs.get('log_func', log.error)
log_func(error)
self.plugin_errored.emit(name, error, False)
def _marked_for_update(self):
for file in os.listdir(self.plugins_directory):
if file.endswith(_UPDATE_SUFFIX):
source_path = os.path.join(self.plugins_directory, file)
target_path = strip_update_suffix(source_path)
plugin_name = _plugin_name_from_path(target_path)
if plugin_name:
yield (source_path, target_path, plugin_name)
else:
log.error('Cannot get plugin name from %r', source_path)
def handle_plugin_updates(self):
for source_path, target_path, plugin_name in self._marked_for_update():
self._remove_plugin(plugin_name)
os.rename(source_path, target_path)
log.debug('Updating plugin %r (%r))', plugin_name, target_path)
def load_plugins_from_directory(self, plugindir):
plugindir = os.path.normpath(plugindir)
if not os.path.isdir(plugindir):
log.info("Plugin directory %r doesn't exist", plugindir)
return
if plugindir == self.plugins_directory:
# .update trick is only for plugins installed through the Picard UI
# and only for plugins in plugins_directory (USER_PLUGIN_DIR by default)
self.handle_plugin_updates()
# now load found plugins
names = set()
for path in (os.path.join(plugindir, file) for file in os.listdir(plugindir)):
name = _plugin_name_from_path(path)
if name:
names.add(name)
log.debug("Looking for plugins in directory %r, %d names found",
plugindir,
len(names))
for name in sorted(names):
try:
self._load_plugin_from_directory(name, plugindir)
except Exception:
self.plugin_error(name, _("Unable to load plugin '%s'"), name, log_func=log.exception)
def _get_plugin_index_by_name(self, name):
for index, plugin in enumerate(self.plugins):
if name == plugin.module_name:
return (plugin, index)
return (None, None)
def _load_plugin_from_directory(self, name, plugindir):
module_file = None
zipfilename = os.path.join(plugindir, name + '.zip')
(zip_importer, module_name, manifest_data) = zip_import(zipfilename)
if zip_importer:
name = module_name
if not zip_importer.find_module(name):
error = _("Failed loading zipped plugin %r from %r")
self.plugin_error(name, error, name, zipfilename)
return None
module_pathname = zip_importer.get_filename(name)
else:
try:
info = imp.find_module(name, [plugindir])
module_file = info[0]
module_pathname = info[1]
except ImportError:
error = _("Failed loading plugin %r in %r")
self.plugin_error(name, error, name, [plugindir])
return None
plugin = None
try:
existing_plugin, existing_plugin_index = self._get_plugin_index_by_name(name)
if existing_plugin:
log.warning("Module %r conflict: unregistering previously"
" loaded %r version %s from %r",
existing_plugin.module_name,
existing_plugin.name,
existing_plugin.version,
existing_plugin.file)
_unregister_module_extensions(name)
full_module_name = _PLUGIN_MODULE_PREFIX + name
if zip_importer:
plugin_module = zip_importer.load_module(full_module_name)
else:
plugin_module = imp.load_module(full_module_name, *info)
plugin = PluginWrapper(plugin_module, plugindir,
file=module_pathname, manifest_data=manifest_data)
compatible_versions = _compatible_api_versions(plugin.api_versions)
if compatible_versions:
log.debug("Loading plugin %r version %s, compatible with API: %s",
plugin.name,
plugin.version,
", ".join([v.to_string(short=True) for v in
sorted(compatible_versions)]))
plugin.compatible = True
setattr(picard.plugins, name, plugin_module)
if existing_plugin:
self.plugins[existing_plugin_index] = plugin
else:
self.plugins.append(plugin)
else:
error = _("Plugin '%s' from '%s' is not compatible with this "
"version of Picard.") % (plugin.name, plugin.file)
self.plugin_error(plugin.name, error, log_func=log.warning)
except VersionError as e:
error = _("Plugin %r has an invalid API version string : %s")
self.plugin_error(name, error, name, e)
except BaseException:
error = _("Plugin %r")
self.plugin_error(name, error, name, log_func=log.exception)
if module_file is not None:
module_file.close()
return plugin
def _get_existing_paths(self, plugin_name, fileexts):
dirpath = os.path.join(self.plugins_directory, plugin_name)
if not os.path.isdir(dirpath):
dirpath = None
filenames = {plugin_name + ext for ext in fileexts}
filepaths = [os.path.join(self.plugins_directory, f)
for f in os.listdir(self.plugins_directory)
if f in filenames
]
return (dirpath, filepaths)
def _remove_plugin_files(self, plugin_name, with_update=False):
plugin_name = strip_zip_suffix(plugin_name)
log.debug("Remove plugin files and dirs : %r", plugin_name)
dirpath, filepaths = self._get_existing_paths(plugin_name, _FILEEXTS)
if dirpath:
if os.path.islink(dirpath):
log.debug("Removing symlink %r", dirpath)
os.remove(dirpath)
elif os.path.isdir(dirpath):
log.debug("Removing directory %r", dirpath)
shutil.rmtree(dirpath)
if filepaths:
for filepath in filepaths:
log.debug("Removing file %r", filepath)
os.remove(filepath)
if with_update:
update = filepath + _UPDATE_SUFFIX
if os.path.isfile(update):
log.debug("Removing file %r", update)
os.remove(update)
def _remove_plugin(self, plugin_name, with_update=False):
self._remove_plugin_files(plugin_name, with_update)
_unregister_module_extensions(plugin_name)
self.plugins = [p for p in self.plugins if p.module_name != plugin_name]
def remove_plugin(self, plugin_name, with_update=False):
self._remove_plugin(plugin_name, with_update=with_update)
self.plugin_removed.emit(plugin_name, False)
def _install_plugin_zip(self, plugin_name, plugin_data, update=False):
# zipped module from download
zip_plugin = plugin_name + '.zip'
dst = os.path.join(self.plugins_directory, zip_plugin)
if update:
dst += _UPDATE_SUFFIX
if os.path.isfile(dst):
os.remove(dst)
with tempfile.NamedTemporaryFile(dir=self.plugins_directory) as zipfile:
zipfile.write(plugin_data)
zipfile.flush()
os.fsync(zipfile.fileno())
try:
os.link(zipfile.name, dst)
except OSError:
with open(dst, 'wb') as dstfile:
zipfile.seek(0)
shutil.copyfileobj(zipfile, dstfile)
log.debug("Plugin (zipped) saved to %r", dst)
def _install_plugin_file(self, path, update=False):
dst = os.path.join(self.plugins_directory, os.path.basename(path))
if update:
dst += _UPDATE_SUFFIX
if os.path.isfile(dst):
os.remove(dst)
shutil.copy2(path, dst)
log.debug("Plugin (file) saved to %r", dst)
def _install_plugin_dir(self, plugin_name, path, update=False):
dst = os.path.join(self.plugins_directory, plugin_name)
if update:
dst += _UPDATE_SUFFIX
if os.path.isdir(dst):
shutil.rmtree(dst)
shutil.copytree(path, dst)
log.debug("Plugin (directory) saved to %r", dst)
def install_plugin(self, path, update=False, plugin_name=None, plugin_data=None):
"""
path is either:
1) /some/dir/name.py
2) /some/dir/name (directory containing __init__.py)
3) /some/dir/name.zip (containing either 1 or 2)
"""
assert path or plugin_name, "path is required if plugin_name is empty"
if not plugin_name:
plugin_name = _plugin_name_from_path(path)
if plugin_name:
try:
if plugin_data:
self._install_plugin_zip(plugin_name, plugin_data, update=update)
elif os.path.isfile(path):
self._install_plugin_file(path, update=update)
elif os.path.isdir(path):
self._install_plugin_dir(plugin_name, path, update=update)
except OSError as why:
log.error("Unable to copy plugin '%s' to %r: %s", plugin_name, self.plugins_directory, why)
return
if not update:
try:
installed_plugin = self._load_plugin_from_directory(plugin_name, self.plugins_directory)
if not installed_plugin:
raise RuntimeError("Failed loading newly installed plugin %s" % plugin_name)
except Exception as e:
log.error("Unable to load plugin '%s': %s", plugin_name, e)
self._remove_plugin(plugin_name)
else:
self.plugin_installed.emit(installed_plugin, False)
else:
self.plugin_updated.emit(plugin_name, False)
def query_available_plugins(self, callback=None):
self.tagger.webservice.get(
PLUGINS_API['host'],
PLUGINS_API['port'],
PLUGINS_API['endpoint']['plugins'],
partial(self._plugins_json_loaded, callback=callback),
priority=True,
important=True
)
def is_available(self, plugin_name):
return any(p.module_name == plugin_name for p in self._available_plugins)
def _plugins_json_loaded(self, response, reply, error, callback=None):
if error:
self.tagger.window.set_statusbar_message(
N_("Error loading plugins list: %(error)s"),
{'error': reply.errorString()},
echo=log.error
)
self._available_plugins = []
else:
try:
self._available_plugins = [PluginData(data, key) for key, data in
response['plugins'].items()
if _compatible_api_versions(data['api_versions'])]
except (AttributeError, KeyError, TypeError):
self._available_plugins = []
if callback:
callback()
# pylint: disable=no-self-use
def enabled(self, name):
return True