mirror of
https://github.com/fergalmoran/picard.git
synced 2025-12-22 09:18:18 +00:00
Refactor to create new RemoteCommands class
This commit is contained in:
189
picard/tagger.py
189
picard/tagger.py
@@ -50,9 +50,7 @@ from hashlib import md5
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
import queue
|
||||
import re
|
||||
import shlex
|
||||
import shutil
|
||||
import signal
|
||||
import sys
|
||||
@@ -137,6 +135,10 @@ from picard.util import (
|
||||
)
|
||||
from picard.util.cdrom import get_cdrom_drives
|
||||
from picard.util.checkupdate import UpdateCheckManager
|
||||
from picard.util.remotecommands import (
|
||||
REMOTE_COMMANDS,
|
||||
RemoteCommands,
|
||||
)
|
||||
from picard.webservice import WebService
|
||||
from picard.webservice.api_helpers import (
|
||||
AcoustIdAPIHelper,
|
||||
@@ -179,25 +181,6 @@ def plugin_dirs():
|
||||
yield USER_PLUGIN_DIR
|
||||
|
||||
|
||||
class CommandFiles:
|
||||
_command_files = set()
|
||||
|
||||
# Maintain a flag to indicate whether a 'QUIT' command has been queued
|
||||
quit_flag = False
|
||||
|
||||
@classmethod
|
||||
def contains(cls, filepath):
|
||||
return filepath in cls._command_files
|
||||
|
||||
@classmethod
|
||||
def add(cls, filepath):
|
||||
cls._command_files.add(filepath)
|
||||
|
||||
@classmethod
|
||||
def remove(cls, filepath):
|
||||
cls._command_files.discard(filepath)
|
||||
|
||||
|
||||
class ParseItemsToLoad:
|
||||
|
||||
WINDOWS_DRIVE_TEST = re.compile(r"^[a-z]\:", re.IGNORECASE)
|
||||
@@ -241,100 +224,6 @@ class ParseItemsToLoad:
|
||||
return f"files: {repr(self.files)} mbids: f{repr(self.mbids)} urls: {repr(self.urls)} commands: {repr(self.commands)}"
|
||||
|
||||
|
||||
class RemoteCommand:
|
||||
def __init__(self, method_name, help_text=None, help_args=None):
|
||||
self.method_name = method_name
|
||||
self.help_text = help_text or ""
|
||||
self.help_args = help_args or ""
|
||||
|
||||
|
||||
REMOTE_COMMANDS = {
|
||||
"CLEAR_LOGS": RemoteCommand(
|
||||
"handle_command_clear_logs",
|
||||
help_text="Clear the Picard logs",
|
||||
),
|
||||
"CLUSTER": RemoteCommand(
|
||||
"handle_command_cluster",
|
||||
help_text="Cluster all files in the cluster pane.",
|
||||
),
|
||||
"FINGERPRINT": RemoteCommand(
|
||||
"handle_command_fingerprint",
|
||||
help_text="Calculate acoustic fingerprints for all (matched) files in the album pane.",
|
||||
),
|
||||
"FROM_FILE": RemoteCommand(
|
||||
"handle_command_from_file",
|
||||
help_text="Load command pipeline from a file.",
|
||||
help_args="[Absolute path to a file containing command pipeline]",
|
||||
),
|
||||
"LOAD": RemoteCommand(
|
||||
"handle_command_load",
|
||||
help_text="Load 1 or more files/MBIDs/URLs to Picard.",
|
||||
help_args="[supported MBID/URL or absolute path to a file]",
|
||||
),
|
||||
"LOOKUP": RemoteCommand(
|
||||
"handle_command_lookup",
|
||||
help_text="Lookup files in the clustering pane. Defaults to all files.",
|
||||
help_args="[clustered|unclustered|all]"
|
||||
),
|
||||
"LOOKUP_CD": RemoteCommand(
|
||||
"handle_command_lookup_cd",
|
||||
help_text="Read CD from the selected drive and lookup on MusicBrainz. "
|
||||
"Without argument, it defaults to the first (alphabetically) available disc drive",
|
||||
help_args="[device/log file]",
|
||||
),
|
||||
"QUIT": RemoteCommand(
|
||||
"handle_command_quit",
|
||||
help_text="Exit the running instance of Picard.",
|
||||
),
|
||||
"REMOVE": RemoteCommand(
|
||||
"handle_command_remove",
|
||||
help_text="Remove the file from Picard. Do nothing if no arguments provided.",
|
||||
help_args="[absolute path to 1 or more files]",
|
||||
),
|
||||
"REMOVE_ALL": RemoteCommand(
|
||||
"handle_command_remove_all",
|
||||
help_text="Remove all files from Picard.",
|
||||
),
|
||||
"REMOVE_EMPTY": RemoteCommand(
|
||||
"handle_command_remove_empty",
|
||||
help_text="Remove all empty clusters and albums.",
|
||||
),
|
||||
"REMOVE_SAVED": RemoteCommand(
|
||||
"handle_command_remove_saved",
|
||||
help_text="Remove all saved releases from the album pane.",
|
||||
),
|
||||
"REMOVE_UNCLUSTERED": RemoteCommand(
|
||||
"handle_command_remove_unclustered",
|
||||
help_text="Remove all unclustered files from the cluster pane.",
|
||||
),
|
||||
"SAVE_MATCHED": RemoteCommand(
|
||||
"handle_command_save_matched",
|
||||
help_text="Save all matched releases from the album pane."
|
||||
),
|
||||
"SAVE_MODIFIED": RemoteCommand(
|
||||
"handle_command_save_modified",
|
||||
help_text="Save all modified files from the album pane.",
|
||||
),
|
||||
"SCAN": RemoteCommand(
|
||||
"handle_command_scan",
|
||||
help_text="Scan all files in the cluster pane.",
|
||||
),
|
||||
"SHOW": RemoteCommand(
|
||||
"handle_command_show",
|
||||
help_text="Make the running instance the currently active window.",
|
||||
),
|
||||
"SUBMIT_FINGERPRINTS": RemoteCommand(
|
||||
"handle_command_submit_fingerprints",
|
||||
help_text="Submit outstanding acoustic fingerprints for all (matched) files in the album pane.",
|
||||
),
|
||||
"WRITE_LOGS": RemoteCommand(
|
||||
"handle_command_write_logs",
|
||||
help_text="Write Picard logs to a given path.",
|
||||
help_args="[absolute path to 1 file]",
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
class Tagger(QtWidgets.QApplication):
|
||||
|
||||
tagger_stats_changed = QtCore.pyqtSignal()
|
||||
@@ -349,9 +238,6 @@ class Tagger(QtWidgets.QApplication):
|
||||
_debug = False
|
||||
_no_restore = False
|
||||
|
||||
command_queue = queue.Queue()
|
||||
command_running = False
|
||||
|
||||
def __init__(self, picard_args, localedir, autoupdate, pipe_handler=None):
|
||||
|
||||
super().__init__(sys.argv)
|
||||
@@ -506,7 +392,7 @@ class Tagger(QtWidgets.QApplication):
|
||||
log.debug('pipe server stopped')
|
||||
|
||||
def clear_command_running(self, *args, **kwargs):
|
||||
self.command_running = False
|
||||
RemoteCommands.set_running(False)
|
||||
|
||||
def run_commands(self):
|
||||
# Provide a set of commands that should not automatically release the 'self.command_running'
|
||||
@@ -517,13 +403,13 @@ class Tagger(QtWidgets.QApplication):
|
||||
# no_auto_complete = {'LOAD', 'CLUSTER'}
|
||||
|
||||
while not self.stopping:
|
||||
if not self.command_queue.empty() and not self.command_running:
|
||||
(cmd, arg) = self.command_queue.get()
|
||||
if not RemoteCommands.command_queue.empty() and not RemoteCommands.get_running():
|
||||
(cmd, arg) = RemoteCommands.command_queue.get()
|
||||
cmd = cmd.upper()
|
||||
arg = arg.strip()
|
||||
if cmd in self.commands:
|
||||
log.debug("Executing command: %s %r", cmd, arg)
|
||||
self.command_running = True
|
||||
RemoteCommands.set_running(True)
|
||||
|
||||
# FIXME: Find a way to execute each command using a block to ensure completion
|
||||
# before executing the next command. Perhaps track completion of all threads
|
||||
@@ -537,7 +423,7 @@ class Tagger(QtWidgets.QApplication):
|
||||
self.clear_command_running()
|
||||
else:
|
||||
log.error("Unknown command: %r", cmd)
|
||||
self.command_queue.task_done()
|
||||
RemoteCommands.command_queue.task_done()
|
||||
|
||||
def _run_commands_finished(self, result=None, error=None):
|
||||
if error:
|
||||
@@ -545,51 +431,13 @@ class Tagger(QtWidgets.QApplication):
|
||||
else:
|
||||
log.debug('command executor stopped')
|
||||
|
||||
def load_to_picard(self, items):
|
||||
@staticmethod
|
||||
def load_to_picard(items):
|
||||
commands = []
|
||||
for item in items:
|
||||
parts = str(item).split(maxsplit=1)
|
||||
commands.append((parts[0], parts[1:] or ['']))
|
||||
self.parse_commands_to_queue(commands)
|
||||
|
||||
def parse_commands_to_queue(self, commands):
|
||||
# Don't queue any more commands after a QUIT command.
|
||||
if CommandFiles.quit_flag:
|
||||
return
|
||||
for (cmd, cmdargs) in commands:
|
||||
cmd = cmd.upper()
|
||||
if cmd not in REMOTE_COMMANDS:
|
||||
log.error("Unknown command: %s", cmd)
|
||||
continue
|
||||
for cmd_arg in cmdargs or ['']:
|
||||
if cmd == 'FROM_FILE':
|
||||
self.handle_command_from_file(cmd_arg)
|
||||
else:
|
||||
log.debug(f"Queueing command: {cmd} {repr(cmd_arg)}")
|
||||
self.command_queue.put([cmd, cmd_arg])
|
||||
# Set flag so as to not queue any more commands after a QUIT command.
|
||||
if cmd == 'QUIT':
|
||||
CommandFiles.quit_flag = True
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def _read_commands_from_file(filepath):
|
||||
commands = []
|
||||
try:
|
||||
lines = open(filepath).readlines()
|
||||
except Exception as e:
|
||||
log.error("Error reading command file '%s': %s" % (filepath, e))
|
||||
return commands
|
||||
for line in lines:
|
||||
line = line.strip()
|
||||
if not line or line.startswith('#'):
|
||||
continue
|
||||
elements = shlex.split(line)
|
||||
if not elements:
|
||||
continue
|
||||
command_args = elements[1:] or ['']
|
||||
commands.append((elements[0], command_args))
|
||||
return commands
|
||||
RemoteCommands.parse_commands_to_queue(commands)
|
||||
|
||||
def iter_album_files(self):
|
||||
for album in self.albums.values():
|
||||
@@ -625,19 +473,10 @@ class Tagger(QtWidgets.QApplication):
|
||||
self.analyze(self.albums[album_name].iterfiles())
|
||||
|
||||
def handle_command_from_file(self, argstring):
|
||||
log.debug("Reading commands from: %r", argstring)
|
||||
if not os.path.exists(argstring):
|
||||
log.error("Missing command file: '%s'", argstring)
|
||||
return
|
||||
filepath = os.path.abspath(argstring)
|
||||
if CommandFiles.contains(filepath):
|
||||
log.warning("Circular command file reference ignored: '%s'", argstring)
|
||||
return
|
||||
CommandFiles.add(filepath)
|
||||
self.parse_commands_to_queue(self._read_commands_from_file(filepath))
|
||||
CommandFiles.remove(filepath)
|
||||
RemoteCommands.get_commands_from_file(argstring)
|
||||
|
||||
def handle_command_load(self, argstring):
|
||||
# TODO: Remove this check if "command://" no longer used.
|
||||
if argstring.startswith("command://"):
|
||||
log.error("Cannot LOAD a command: %s", argstring)
|
||||
return
|
||||
|
||||
255
picard/util/remotecommands.py
Normal file
255
picard/util/remotecommands.py
Normal file
@@ -0,0 +1,255 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Picard, the next-generation MusicBrainz tagger
|
||||
#
|
||||
# Copyright (C) 2022 Bob Swift
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU General Public License
|
||||
# as published by the Free Software Foundation; either version 2
|
||||
# of the License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, write to the Free Software
|
||||
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
|
||||
import datetime
|
||||
import os
|
||||
import queue
|
||||
import shlex
|
||||
import threading
|
||||
import time
|
||||
|
||||
from picard import log
|
||||
|
||||
|
||||
class RemoteCommand:
|
||||
def __init__(self, method_name, help_text=None, help_args=None):
|
||||
self.method_name = method_name
|
||||
self.help_text = help_text or ""
|
||||
self.help_args = help_args or ""
|
||||
|
||||
|
||||
REMOTE_COMMANDS = {
|
||||
"CLEAR_LOGS": RemoteCommand(
|
||||
"handle_command_clear_logs",
|
||||
help_text="Clear the Picard logs",
|
||||
),
|
||||
"CLUSTER": RemoteCommand(
|
||||
"handle_command_cluster",
|
||||
help_text="Cluster all files in the cluster pane.",
|
||||
),
|
||||
"FINGERPRINT": RemoteCommand(
|
||||
"handle_command_fingerprint",
|
||||
help_text="Calculate acoustic fingerprints for all (matched) files in the album pane.",
|
||||
),
|
||||
"FROM_FILE": RemoteCommand(
|
||||
"handle_command_from_file",
|
||||
help_text="Load command pipeline from a file.",
|
||||
help_args="[Absolute path to a file containing command pipeline]",
|
||||
),
|
||||
"LOAD": RemoteCommand(
|
||||
"handle_command_load",
|
||||
help_text="Load 1 or more files/MBIDs/URLs to Picard.",
|
||||
help_args="[supported MBID/URL or absolute path to a file]",
|
||||
),
|
||||
"LOOKUP": RemoteCommand(
|
||||
"handle_command_lookup",
|
||||
help_text="Lookup files in the clustering pane. Defaults to all files.",
|
||||
help_args="[clustered|unclustered|all]"
|
||||
),
|
||||
"LOOKUP_CD": RemoteCommand(
|
||||
"handle_command_lookup_cd",
|
||||
help_text="Read CD from the selected drive and lookup on MusicBrainz. "
|
||||
"Without argument, it defaults to the first (alphabetically) available disc drive",
|
||||
help_args="[device/log file]",
|
||||
),
|
||||
"QUIT": RemoteCommand(
|
||||
"handle_command_quit",
|
||||
help_text="Exit the running instance of Picard.",
|
||||
),
|
||||
"REMOVE": RemoteCommand(
|
||||
"handle_command_remove",
|
||||
help_text="Remove the file from Picard. Do nothing if no arguments provided.",
|
||||
help_args="[absolute path to 1 or more files]",
|
||||
),
|
||||
"REMOVE_ALL": RemoteCommand(
|
||||
"handle_command_remove_all",
|
||||
help_text="Remove all files from Picard.",
|
||||
),
|
||||
"REMOVE_EMPTY": RemoteCommand(
|
||||
"handle_command_remove_empty",
|
||||
help_text="Remove all empty clusters and albums.",
|
||||
),
|
||||
"REMOVE_SAVED": RemoteCommand(
|
||||
"handle_command_remove_saved",
|
||||
help_text="Remove all saved releases from the album pane.",
|
||||
),
|
||||
"REMOVE_UNCLUSTERED": RemoteCommand(
|
||||
"handle_command_remove_unclustered",
|
||||
help_text="Remove all unclustered files from the cluster pane.",
|
||||
),
|
||||
"SAVE_MATCHED": RemoteCommand(
|
||||
"handle_command_save_matched",
|
||||
help_text="Save all matched releases from the album pane."
|
||||
),
|
||||
"SAVE_MODIFIED": RemoteCommand(
|
||||
"handle_command_save_modified",
|
||||
help_text="Save all modified files from the album pane.",
|
||||
),
|
||||
"SCAN": RemoteCommand(
|
||||
"handle_command_scan",
|
||||
help_text="Scan all files in the cluster pane.",
|
||||
),
|
||||
"SHOW": RemoteCommand(
|
||||
"handle_command_show",
|
||||
help_text="Make the running instance the currently active window.",
|
||||
),
|
||||
"SUBMIT_FINGERPRINTS": RemoteCommand(
|
||||
"handle_command_submit_fingerprints",
|
||||
help_text="Submit outstanding acoustic fingerprints for all (matched) files in the album pane.",
|
||||
),
|
||||
"WRITE_LOGS": RemoteCommand(
|
||||
"handle_command_write_logs",
|
||||
help_text="Write Picard logs to a given path.",
|
||||
help_args="[absolute path to 1 file]",
|
||||
),
|
||||
}
|
||||
|
||||
|
||||
class RemoteCommands:
|
||||
# Collection of command files currently being parsed
|
||||
_command_files = set()
|
||||
|
||||
# Flag to indicate whether a 'QUIT' command has been queued
|
||||
_has_quit = False
|
||||
|
||||
# Flag to indicate whether a command is currently running
|
||||
_command_running = False
|
||||
|
||||
_lock = threading.Lock()
|
||||
command_queue = queue.Queue()
|
||||
_threads = set()
|
||||
|
||||
@classmethod
|
||||
def cmd_files_contains(cls, filepath):
|
||||
with cls._lock:
|
||||
return filepath in cls._command_files
|
||||
|
||||
@classmethod
|
||||
def cmd_files_add(cls, filepath):
|
||||
with cls._lock:
|
||||
cls._command_files.add(filepath)
|
||||
|
||||
@classmethod
|
||||
def cmd_files_remove(cls, filepath):
|
||||
with cls._lock:
|
||||
cls._command_files.discard(filepath)
|
||||
|
||||
@classmethod
|
||||
def has_quit(cls):
|
||||
with cls._lock:
|
||||
return cls._has_quit
|
||||
|
||||
@classmethod
|
||||
def set_quit(cls, value):
|
||||
with cls._lock:
|
||||
cls._has_quit = value
|
||||
|
||||
@classmethod
|
||||
def thread_add(cls, thread_id):
|
||||
with cls._lock:
|
||||
cls._threads.add(thread_id)
|
||||
|
||||
@classmethod
|
||||
def thread_remove(cls, thread_id):
|
||||
with cls._lock:
|
||||
cls._threads.discard(thread_id)
|
||||
|
||||
@classmethod
|
||||
def processing(cls):
|
||||
with cls._lock:
|
||||
return (len(cls._threads) > 0)
|
||||
|
||||
@classmethod
|
||||
def get_running(cls):
|
||||
with cls._lock:
|
||||
return cls._command_running
|
||||
|
||||
@classmethod
|
||||
def set_running(cls, value):
|
||||
with cls._lock:
|
||||
cls._command_running = value
|
||||
|
||||
@classmethod
|
||||
def clear_command_running(cls, force=False, timeout=None):
|
||||
end_time = datetime.datetime.now() + datetime.timedelta(seconds=timeout) if timeout else None
|
||||
while True:
|
||||
if not cls.processing() or force or (timeout and datetime.datetime.now() > end_time):
|
||||
with cls._lock:
|
||||
cls._command_running = False
|
||||
cls._threads = set()
|
||||
break
|
||||
time.sleep(.01)
|
||||
|
||||
@classmethod
|
||||
def parse_commands_to_queue(cls, commands):
|
||||
if cls.has_quit():
|
||||
# Don't queue any more commands after a QUIT command.
|
||||
print(f"has_quit = {cls.has_quit()}\n\n")
|
||||
return
|
||||
|
||||
for (cmd, cmdargs) in commands:
|
||||
cmd = cmd.upper()
|
||||
if cmd not in REMOTE_COMMANDS:
|
||||
log.error("Unknown command: %s", cmd)
|
||||
continue
|
||||
for cmd_arg in cmdargs or ['']:
|
||||
if cmd == 'FROM_FILE':
|
||||
cls.get_commands_from_file(cmd_arg)
|
||||
else:
|
||||
log.debug(f"Queueing command: {cmd} {repr(cmd_arg)}")
|
||||
cls.command_queue.put([cmd, cmd_arg])
|
||||
|
||||
# Set flag so as to not queue any more commands after a QUIT command.
|
||||
if cmd == 'QUIT':
|
||||
cls.set_quit(True)
|
||||
return
|
||||
|
||||
@staticmethod
|
||||
def _read_commands_from_file(filepath):
|
||||
commands = []
|
||||
try:
|
||||
lines = open(filepath).readlines()
|
||||
except Exception as e:
|
||||
log.error("Error reading command file '%s': %s" % (filepath, e))
|
||||
return commands
|
||||
for line in lines:
|
||||
line = line.strip()
|
||||
if not line or line.startswith('#'):
|
||||
continue
|
||||
elements = shlex.split(line)
|
||||
if not elements:
|
||||
continue
|
||||
command_args = elements[1:] or ['']
|
||||
commands.append((elements[0], command_args))
|
||||
return commands
|
||||
|
||||
@classmethod
|
||||
def get_commands_from_file(cls, argstring):
|
||||
log.debug("Reading commands from: %r", argstring)
|
||||
if not os.path.exists(argstring):
|
||||
log.error("Missing command file: '%s'", argstring)
|
||||
return
|
||||
filepath = os.path.abspath(argstring)
|
||||
if cls.cmd_files_contains(filepath):
|
||||
log.warning("Circular command file reference ignored: '%s'", argstring)
|
||||
return
|
||||
cls.cmd_files_add(filepath)
|
||||
cls.parse_commands_to_queue(cls._read_commands_from_file(filepath))
|
||||
cls.cmd_files_remove(filepath)
|
||||
@@ -1,11 +1,15 @@
|
||||
# should be split into 2 commands
|
||||
LOAD file1.mp3 file2.mp3
|
||||
|
||||
# should be added as one
|
||||
LOAD file3.mp3
|
||||
FROM_FILE command_file.txt
|
||||
CLUSTER unclustered
|
||||
FINGERPRINT
|
||||
|
||||
# should be ignored because circular reference
|
||||
FROM_FILE test/data/test-command-file-1.txt
|
||||
|
||||
# should be ignored
|
||||
|
||||
|
||||
#commented command
|
||||
|
||||
FROM_FILE test/data/test-command-file-2.txt
|
||||
|
||||
10
test/data/test-command-file-2.txt
Normal file
10
test/data/test-command-file-2.txt
Normal file
@@ -0,0 +1,10 @@
|
||||
# should be ignored because missing
|
||||
FROM_FILE command_file.txt
|
||||
|
||||
CLUSTER
|
||||
FINGERPRINT
|
||||
LOOKUP unclustered
|
||||
QUIT
|
||||
|
||||
# should be ignored because after QUIT command
|
||||
LOOKUP clustered
|
||||
@@ -1,6 +1,27 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
# Picard, the next-generation MusicBrainz tagger
|
||||
#
|
||||
# Copyright (C) 2022 skelly37
|
||||
# Copyright (C) 2022 Bob Swift
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or
|
||||
# modify it under the terms of the GNU General Public License
|
||||
# as published by the Free Software Foundation; either version 2
|
||||
# of the License, or (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this program; if not, write to the Free Software
|
||||
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
|
||||
from test.picardtestcase import PicardTestCase
|
||||
|
||||
from picard.tagger import Tagger
|
||||
from picard.util.remotecommands import RemoteCommands
|
||||
|
||||
|
||||
class TestParsingFilesWithCommands(PicardTestCase):
|
||||
@@ -10,28 +31,38 @@ class TestParsingFilesWithCommands(PicardTestCase):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.result = []
|
||||
for (cmd, cmdargs) in Tagger._read_commands_from_file(self.TEST_FILE):
|
||||
for cmd_arg in cmdargs or ['']:
|
||||
self.result.append(f"{cmd} {cmd_arg}")
|
||||
RemoteCommands.set_quit(False)
|
||||
RemoteCommands.get_commands_from_file(self.TEST_FILE)
|
||||
while not RemoteCommands.command_queue.empty():
|
||||
(cmd, arg) = RemoteCommands.command_queue.get()
|
||||
self.result.append(f"{cmd} {arg}")
|
||||
RemoteCommands.command_queue.task_done()
|
||||
|
||||
def test_no_argument_command(self):
|
||||
self.assertIn("CLUSTER unclustered", self.result)
|
||||
self.assertIn("CLUSTER ", self.result)
|
||||
|
||||
def test_no_argument_command_stripped_correctly(self):
|
||||
self.assertIn("FINGERPRINT ", self.result)
|
||||
|
||||
def test_single_argument_command(self):
|
||||
self.assertIn("FROM_FILE command_file.txt", self.result)
|
||||
self.assertIn("LOAD file3.mp3", self.result)
|
||||
|
||||
def test_multiple_arguments_command(self):
|
||||
self.assertIn("LOAD file1.mp3", self.result)
|
||||
self.assertIn("LOAD file2.mp3", self.result)
|
||||
|
||||
def test_from_file_command_parsed(self):
|
||||
self.assertNotIn("FROM_FILE command_file.txt", self.result)
|
||||
self.assertNotIn("FROM_FILE test/data/test-command-file-1.txt", self.result)
|
||||
self.assertNotIn("FROM_FILE test/data/test-command-file-2.txt", self.result)
|
||||
|
||||
def test_noting_added_after_quit(self):
|
||||
self.assertNotIn("LOOKUP clustered", self.result)
|
||||
|
||||
def test_empty_lines(self):
|
||||
self.assertNotIn(" ", self.result)
|
||||
self.assertNotIn("", self.result)
|
||||
self.assertEqual(len(self.result), 6)
|
||||
self.assertEqual(len(self.result), 7)
|
||||
|
||||
def test_commented_lines(self):
|
||||
self.assertNotIn("#commented command", self.result)
|
||||
|
||||
Reference in New Issue
Block a user