Simplified pipe test case, fixes tests on Windows

This commit is contained in:
Philipp Wolfer
2023-08-13 15:54:58 +02:00
parent b0f734abaf
commit 38ea9d5902

View File

@@ -3,6 +3,7 @@
# Picard, the next-generation MusicBrainz tagger
#
# Copyright (C) 2022 skelly37
# Copyright (C) 2023 Philipp Wolfer
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
@@ -34,16 +35,6 @@ def pipe_listener(pipe_handler):
return message
def pipe_writer(pipe_handler, to_send):
if not to_send:
return False
while not pipe_handler.send_to_pipe(to_send):
pass
return True
class TestPipe(PicardTestCase):
# some random name that is different on each run
NAME = str(uuid.uuid4())
@@ -68,27 +59,21 @@ class TestPipe(PicardTestCase):
"https://musicbrainz.org/recording/7cd3782d-86dc-4dd1-8d9b-e37f9cbe6b94",
)
pipe_listener_handler = pipe.Pipe(self.NAME, self.VERSION)
if pipe_listener_handler.path_was_forced:
pipe_writer_handler = pipe.Pipe(self.NAME, self.VERSION, args=None, forced_path=pipe_listener_handler.path)
else:
pipe_writer_handler = pipe.Pipe(self.NAME, self.VERSION)
__pool = concurrent.futures.ThreadPoolExecutor()
for message in to_send:
plistener = __pool.submit(pipe_listener, pipe_listener_handler)
pwriter = __pool.submit(pipe_writer, pipe_writer_handler, message)
__pool = concurrent.futures.ThreadPoolExecutor()
pipe_handler = pipe.Pipe(self.NAME, self.VERSION)
plistener = __pool.submit(pipe_listener, pipe_handler)
res = ""
# handle the write/read processes
try:
pipe_handler.send_to_pipe(message)
res = plistener.result(timeout=6)
except concurrent.futures._base.TimeoutError:
pipe_writer_handler.send_to_pipe(pipe_writer_handler.MESSAGE_TO_IGNORE)
try:
pwriter.result(timeout=0.01)
except concurrent.futures._base.TimeoutError:
pipe_listener_handler.read_from_pipe()
pass
finally:
pipe_handler.stop()
__pool.shutdown()
self.assertEqual(res, message,
"Data is sent and read correctly")