Source code for utopya.task

"""The Task class supplies a container for all information needed for a task.

The WorkerTask and ProcessTask classes specialize on tasks for the
WorkerManager that work on subprocesses or multiprocessing processes.
"""

import copy
import io
import logging
import multiprocessing
import os
import queue
import re
import signal
import subprocess
import threading
import time
import uuid
import warnings
from functools import partial
from typing import (
    Callable,
    Dict,
    Generator,
    List,
    Sequence,
    Set,
    TextIO,
    Tuple,
    Union,
)

import numpy as np

from ._signal import SIGMAP
from .tools import yaml

log = logging.getLogger(__name__)

_ANSI_ESCAPE = re.compile(r"\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])")
"""A regex pattern to remove ANSI escape characters, needed for stream saving

From: https://stackoverflow.com/a/14693789/1827608
"""


# -----------------------------------------------------------------------------
# Helper methods
# These solely relate to the WorkerTask and similar classes, and thus are not
# implemented in the tools module.


def _follow(
    f: io.TextIOWrapper,
    delay: float = 0.05,
    should_stop: Callable = lambda: False,
) -> Generator[str, None, None]:
    """Generator that follows the output written to the given stream object
    and yields each new line written to it.

    If no output is retrieved, there will be a delay to reduce processor
    load.

    The ``should_stop`` argument may be a callable that will lead to breaking
    out of the waiting loop. If it is not given, the loop will only break if
    reading from the stream ``f`` is no longer possible, e.g. because it was
    closed.
    """
    while not should_stop():
        try:
            line = f.readline()
        except:
            # Stream was closed or is otherwise not readable; end generator
            return

        if not line:
            time.sleep(delay)
            continue

        yield line

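
# A minimal usage sketch of ``_follow`` (illustrative only, not part of the
# module API): a writer thread appends lines to a temporary file while the
# main thread follows it, similar to ``tail -f``. All names local to this
# sketch are hypothetical; POSIX file semantics are assumed.
def _demo_follow():
    import tempfile

    tmp = tempfile.NamedTemporaryFile(mode="w", delete=False)

    def writer():
        for i in range(3):
            tmp.write(f"line {i}\n")
            tmp.flush()
            time.sleep(0.1)
        tmp.close()

    threading.Thread(target=writer, daemon=True).start()

    seen = []
    with open(tmp.name) as f:
        # Stop following once three lines have arrived
        for line in _follow(f, should_stop=lambda: len(seen) >= 3):
            seen.append(line.rstrip())

    print(seen)  # -> ['line 0', 'line 1', 'line 2']
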
def enqueue_lines(
    *,
    queue: queue.Queue,
    stream: TextIO,
    follow: bool = False,
    parse_func: Callable = None,
) -> None:
    """From the given text stream, read line-buffered lines and add them to
    the provided queue as 2-tuples ``(line, parsed object)``.

    This function is meant to be passed to an individual thread in which it
    can read individual lines separately from the main thread.

    Args:
        queue (queue.Queue): The queue object to put the read lines and
            parsed objects into.
        stream (TextIO): The stream to read from. If this is not a text
            stream, be aware that the elements added to the queue might need
            decoding.
        follow (bool, optional): Whether to use the
            :py:func:`~utopya.task._follow` function instead of
            ``iter(stream.readline)``. This should be selected if the stream
            is file-like instead of ``sys.stdout``-like.
        parse_func (Callable, optional): A parse function that each read line
            is passed through. This should be a unary function that either
            returns a successfully parsed object or None.
    """
    # Define a pass-through parse function, if none was given
    parse_func = parse_func if parse_func else lambda _: None

    # If this is a buffered stream (like subprocess.Popen.stdout), we can use
    # a simple iterator that will not hang up. If it is a file-based stream
    # (e.g. when reading from a file), we need to follow the file similar to
    # how `tail -f` does it ...
    if follow:
        # Get the current thread to allow stopping to follow
        ct = threading.current_thread()
        should_stop = lambda: getattr(ct, "stop_follow", False)
        it = _follow(stream, should_stop=should_stop)
    else:
        it = iter(stream.readline, "")

    # Read the lines and put them into the queue
    for line in it:  # <-- thread waits here for a new line, w/o idle looping
        # Strip the whitespace on the right (e.g. the newline character)
        line = line.rstrip()

        # Add it to the queue as a tuple (string, parsed object), where the
        # parsed object may also be None
        queue.put_nowait((line, parse_func(line)))

    # Thread dies here.

# Custom parse methods ........................................................
def parse_yaml_dict(
    line: str, *, start_str: str = "!!map"
) -> Union[None, dict]:
    """A YAML parse function that can be passed to
    :py:func:`~utopya.task.enqueue_lines`.

    Parsing is only attempted if the line starts with the provided start
    string; the line is then loaded as YAML and converted into a dict.

    Args:
        line (str): The line to parse
        start_str (str, optional): Only lines starting with this string are
            attempted to be parsed as a YAML mapping.

    Returns:
        Union[None, dict]: The parsed dict, or None if the line did not start
            with ``start_str`` or parsing failed.
    """
    # Check if it should be attempted to parse this line
    if not line.startswith(start_str):
        # Nope, return None
        return None

    # Try to load the object, ensuring it is a dict
    try:
        obj = dict(yaml.load(line))

    except Exception as err:
        # Failed to do that, regardless why; be verbose about it
        log.warning(
            "Got %s while trying to parse line '%s': %s",
            err.__class__.__name__,
            line,
            err,
        )
        return None

    # Was able to parse it. Return the parsed object.
    return obj

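
# A brief usage sketch (illustrative, not part of the module API) that wires
# ``enqueue_lines`` and ``parse_yaml_dict`` to a subprocess whose stdout
# emits a YAML mapping line. All local names are hypothetical.
def _demo_enqueue_lines():
    import sys

    proc = subprocess.Popen(
        [sys.executable, "-c", "print('!!map {progress: 0.5}')"],
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        encoding="utf8",
        bufsize=1,
    )

    q = queue.Queue()
    t = threading.Thread(
        target=enqueue_lines,
        kwargs=dict(queue=q, stream=proc.stdout, parse_func=parse_yaml_dict),
        daemon=True,
    )
    t.start()
    proc.wait()

    line, obj = q.get(timeout=2.0)
    print(line)  # -> !!map {progress: 0.5}
    print(obj)   # -> {'progress': 0.5}
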
# -----------------------------------------------------------------------------
# -----------------------------------------------------------------------------
class Task:
    """The Task is a container for a task handled by the WorkerManager.

    It aims to provide the necessary interfaces for the WorkerManager to
    easily associate tasks with the corresponding workers and vice versa.
    """

    __slots__ = (
        "_name",
        "_priority",
        "_uid",
        "_progress_func",
        "_stop_conditions",
        "callbacks",
    )

    def __init__(
        self,
        *,
        name: str = None,
        priority: float = None,
        callbacks: Dict[str, Callable] = None,
        progress_func: Callable = None,
    ):
        """Initialize a Task object.

        Args:
            name (str, optional): The task's name. If none is given, the
                generated uuid will be used.
            priority (float, optional): The priority of this task; if None,
                the default is +np.inf, i.e. the lowest priority. If two
                priority values are the same, the task created earlier has a
                higher priority.
            callbacks (Dict[str, Callable], optional): A dict of callback
                functions that are called at different points in the life of
                this task. Each function is passed this task object as its
                only argument.
            progress_func (Callable, optional): Invoked by the ``progress``
                property and used to calculate the progress, given the
                current task object as argument.
        """
        self._name = str(name) if name is not None else None
        self._priority = priority if priority is not None else np.inf
        self._uid = uuid.uuid1()

        self.callbacks = callbacks
        self._progress_func = progress_func
        self._stop_conditions = set()

        log.debug(
            "Initialized Task '%s'.\n Priority: %s, UID: %s.",
            self.name,
            self.priority,
            self.uid,
        )

    # Properties ..............................................................

    @property
    def name(self) -> str:
        """The task's name, if given; else the uid."""
        if self._name is not None:
            return self._name
        return str(self.uid)

    @property
    def uid(self) -> uuid.UUID:
        """The task's unique ID"""
        return self._uid

    @property
    def priority(self) -> float:
        """The task's priority. Default is +inf, which is the lowest
        priority."""
        return self._priority

    @property
    def order_tuple(self) -> tuple:
        """Returns the ordering tuple ``(priority, uid.time)``"""
        return (self.priority, self.uid.time)

    @property
    def progress(self) -> float:
        """If a progress function is given, invokes it; otherwise returns 0.

        This also checks that the progress value is in [0, 1].
        """
        if self._progress_func is None:
            return 0.0

        progress = self._progress_func(self)
        if 0 <= progress <= 1:
            return progress

        raise ValueError(
            f"The progress function {self._progress_func.__name__} of "
            f"task '{self.name}' returned a value outside of the "
            "allowed range [0, 1]!"
        )

    @property
    def fulfilled_stop_conditions(self) -> Set["StopCondition"]:
        """The set of *fulfilled* stop conditions for this task. Typically,
        this is set by the
        :py:class:`~utopya.stop_conditions.StopCondition` itself as part of
        its evaluation in its
        :py:meth:`~utopya.stop_conditions.StopCondition.fulfilled` method.
        """
        return self._stop_conditions

    # Magic methods ...........................................................
    # ... including rich comparisons, needed in PriorityQueue

    def __hash__(self) -> int:
        return hash(self.uid)

    def __str__(self) -> str:
        return f"Task<uid: {self.uid}, priority: {self.priority}>"

    def __lt__(self, other) -> bool:
        return bool(self.order_tuple < other.order_tuple)

    def __le__(self, other) -> bool:
        return bool(self.order_tuple <= other.order_tuple)

    def __eq__(self, other) -> bool:
        """Evaluates equality of two tasks: returns True only if identical.

        .. note::

            We trust that the unique ID of each task (generated with
            ``uuid``) is really unique, therefore different tasks can never
            be fully equivalent.
        """
        return bool(self is other)

# Private methods .........................................................

    def _invoke_callback(self, name: str):
        """If given, invokes the callback function with the name ``name``.

        .. note::

            For higher flexibility, this will *not* raise errors or warnings
            if no callback function was specified under the given name.
        """
        if self.callbacks and name in self.callbacks:
            self.callbacks[name](self)

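
# A minimal usage sketch (illustrative, not part of the module API): tasks
# with a lower priority value are retrieved first from a priority queue;
# ties in ``order_tuple`` are broken by creation time.
def _demo_task_ordering():
    pq = queue.PriorityQueue()
    for t in (Task(name="low", priority=2.0), Task(name="high", priority=1.0)):
        pq.put_nowait((t.order_tuple, t))

    _, first = pq.get_nowait()
    print(first.name)      # -> high
    print(first.progress)  # -> 0.0 (no progress function given)
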
# -----------------------------------------------------------------------------
# ... working with subprocess
class WorkerTask(Task):
    """A specialisation of :py:class:`~utopya.task.Task` for use in the
    :py:class:`~utopya.workermanager.WorkerManager`.

    It is able to spawn a worker process using ``subprocess.Popen``,
    executing the task in a non-blocking manner. At the same time, the
    worker's stream can be read in via another non-blocking thread and stream
    information can be parsed.

    Furthermore, this class provides most of the interface for signalling the
    spawned process.

    For an equivalent class that uses ``multiprocessing`` instead of
    ``subprocess``, see the derived :py:class:`~utopya.task.MPProcessTask`.
    """

    # Extend the slots of the Task class with some WorkerTask-specific slots
    __slots__ = (
        "setup_func",
        "setup_kwargs",
        "worker_kwargs",
        "_worker",
        "_worker_pid",
        "_worker_status",
        "streams",
        "profiling",
    )

    # Stream parser functions, used to generate Python objects from the
    # streams read from the worker. Resolved objects are stored in the
    # corresponding entry of the ``streams`` attribute.
    STREAM_PARSE_FUNCS = dict(default=None, yaml_dict=parse_yaml_dict)

    def __init__(
        self,
        *,
        setup_func: Callable = None,
        setup_kwargs: dict = None,
        worker_kwargs: dict = None,
        **task_kwargs,
    ):
        """Initialize a WorkerTask.

        This is a specialization of :py:class:`~utopya.task.Task` for use in
        the :py:class:`~utopya.workermanager.WorkerManager`.

        Args:
            setup_func (Callable, optional): The setup function to use before
                this task is spawned; this allows dynamic handling of the
                worker arguments. It is called with the ``worker_kwargs``
                keyword argument, containing the dict passed here.
                Additionally, ``setup_kwargs`` are unpacked into the function
                call. The function should return a dict that is then used as
                ``worker_kwargs`` for the individual task.
            setup_kwargs (dict, optional): The keyword arguments unpacked
                into the ``setup_func`` call.
            worker_kwargs (dict, optional): The keyword arguments needed to
                spawn the worker. Note that these are also passed to
                ``setup_func`` and, if a ``setup_func`` is given, the return
                value of that function will be used for the
                ``worker_kwargs``.
            **task_kwargs: Arguments to be passed to
                :py:meth:`~utopya.task.Task.__init__`, including the
                callbacks dictionary among other things.

        Raises:
            ValueError: If neither ``setup_func`` nor ``worker_kwargs`` were
                given, thus lacking information on how to spawn the worker.
        """
        super().__init__(**task_kwargs)

        if setup_func:
            setup_kwargs = setup_kwargs if setup_kwargs else dict()

        elif worker_kwargs:
            if setup_kwargs:
                warnings.warn(
                    "`worker_kwargs` given but also `setup_kwargs` "
                    "specified; the latter will be ignored. Did "
                    "you mean to call a setup function? If yes, "
                    "pass it via the `setup_func` argument.",
                    UserWarning,
                )
        else:
            raise ValueError(
                "Need either argument `setup_func` or "
                "`worker_kwargs`, got none of those."
            )

        self.setup_func = setup_func
        self.setup_kwargs = copy.deepcopy(setup_kwargs)
        self.worker_kwargs = copy.deepcopy(worker_kwargs)

        self._worker = None
        self._worker_pid = None
        self._worker_status = None
        self.streams = dict()
        self.profiling = dict()

        log.debug(
            "Finished setting up task '%s' as a %s.",
            self.name,
            self.__class__.__name__,
        )
        log.debug(" With setup function? %s", bool(setup_func))

    # Properties ..............................................................

    @property
    def worker(self) -> subprocess.Popen:
        """The associated worker process object or None, if not yet created."""
        return self._worker

    @worker.setter
    def worker(self, proc: subprocess.Popen):
        """Set the associated worker process of this task.

        This can only be done once.

        Args:
            proc (subprocess.Popen): The process to associate with this task.

        Raises:
            RuntimeError: If a process was already associated.
        """
        if self.worker is not None:
            raise RuntimeError(
                "A worker process was already associated with "
                "this task; cannot change it!"
            )

        self._worker = proc
        self._worker_pid = proc.pid

        log.debug(
            "Task %s: associated with worker process %d.",
            self.name,
            self.worker_pid,
        )

    @property
    def worker_pid(self) -> int:
        """The process ID of the associated worker process"""
        return self._worker_pid

    @property
    def worker_status(self) -> Union[int, None]:
        """The worker process's current status or False, if there is no
        worker spawned yet.

        Note that this invokes a poll to the worker process if one was
        spawned.

        Returns:
            Union[int, None]: Current worker status. False, if there was no
                worker associated yet.
        """
        if not self.worker:
            return False

        if self._worker_status is None:
            # No cached value yet; poll the worker
            poll_res = self.worker.poll()

            if poll_res is not None:
                # The worker finished. Save the exit status and finish up ...
                self._worker_status = poll_res
                self._finished()

            return poll_res
        return self._worker_status

    @property
    def outstream_objs(self) -> list:
        """Returns the list of objects parsed from the 'out' stream"""
        return self.streams["out"]["log_parsed"]

    # Magic methods ...........................................................

    def __str__(self) -> str:
        """Return basic WorkerTask information."""
        return "{}<uid: {}, priority: {}, worker_status: {}>".format(
            self.__class__.__name__,
            self.uid,
            self.priority,
            self.worker_status,
        )

# Public API ..............................................................

    def spawn_worker(self) -> subprocess.Popen:
        """Spawn a worker process using ``subprocess.Popen`` and manage the
        corresponding queue and thread for reading the stdout stream.

        If there is a ``setup_func``, this function will be called first.

        Afterwards, the worker process is spawned using the ``worker_kwargs``
        returned by that function or, if no ``setup_func`` was given, using
        those passed during initialization; it is then associated with this
        task.

        Returns:
            subprocess.Popen: The created process object

        Raises:
            RuntimeError: If a worker was already spawned for this task.
            TypeError: For an invalid ``args`` argument
        """
        if self.worker:
            raise RuntimeError("Can only spawn one worker per task!")

        # If a setup function is available, call it with the given kwargs
        if self.setup_func:
            log.debug("Calling a setup function ...")
            worker_kwargs = self.setup_func(
                worker_kwargs=self.worker_kwargs, **self.setup_kwargs
            )
        else:
            log.debug("No setup function given; using given `worker_kwargs`")
            worker_kwargs = self.worker_kwargs

        # Start the subprocess and associate it with this WorkerTask
        self.worker = self._spawn_worker(**worker_kwargs)

        # ... and take care of stdout stream reading.
        if worker_kwargs.get("read_stdout", True):
            self._setup_stream_reader(
                "out",
                stream=self.worker.stdout,
                parser=worker_kwargs.pop("stdout_parser", "default"),
                **worker_kwargs,
            )

        # Done with spawning.
        self._invoke_callback("spawn")

        return self.worker

    def read_streams(
        self,
        stream_names: list = "all",
        *,
        max_num_reads: int = 10,
        forward_directly: bool = False,
    ) -> None:
        """Read the streams associated with this task's worker.

        Args:
            stream_names (list, optional): The list of stream names to read.
                If ``all`` (default), will read all streams.
            max_num_reads (int, optional): How many lines should be read from
                the buffer. For -1, reads the whole buffer.

                .. warning::

                    Do not make this value too large as it could block the
                    whole reader thread of this worker.

            forward_directly (bool, optional): Whether to call the
                :py:meth:`~utopya.task.WorkerTask.forward_streams` method;
                this is done before the callback and can be useful if the
                callback should not happen before the streams are forwarded.

        Returns:
            None
        """

        def read_single_stream(
            stream: dict, stream_name: str, max_num_reads=max_num_reads
        ) -> bool:
            """A function to read a single stream.

            Returns True if a parsed object was among the read stream
            entries.
            """
            log.trace("Reading stream '%s' ...", stream_name)

            q = stream["queue"]

            # The flag that is set if there was a parsed object in the queue
            contained_parsed_obj = False

            # In certain cases, read as many entries as the queue reports
            if max_num_reads == -1:
                max_num_reads = q.qsize()
                # NOTE This value is approximate; thus, this should only be
                #      used if it is reasonably certain that the queue size
                #      will not change.

            # Perform the read operations
            for _ in range(max_num_reads):
                # Try to read a single entry, i.e. the tuple enqueued by
                # enqueue_lines: (decoded string, parsed object)
                try:
                    line, obj = q.get_nowait()

                except queue.Empty:
                    break

                else:
                    stream["log_raw"].append(line)

                    # Check for a parsed object
                    if obj is not None:
                        stream["log_parsed"].append(obj)
                        contained_parsed_obj = True

                    else:
                        # Write the line to the regular log. This way, the
                        # regular log only contains this entry if no object
                        # could be parsed from it.
                        stream["log"].append(line)

            return contained_parsed_obj

        if not self.streams:
            log.trace(
                "No streams to read for WorkerTask '%s' (uid: %s).",
                self.name,
                self.uid,
            )
            return

        elif stream_names == "all":
            stream_names = list(self.streams.keys())

        # Now have the stream names set properly.
        # Set the flag that determines whether there will be a callback
        got_parsed_obj = False

        # Loop over stream names and call the function to read a single
        # stream for each of them
        for stream_name in stream_names:
            stream = self.streams[stream_name]
            # NOTE This way, a non-existent stream_name will not pass
            #      silently but raise a KeyError, as it should.

            rv = read_single_stream(stream, stream_name)
            if rv:
                got_parsed_obj = True

        if forward_directly:
            self.forward_streams()

        if got_parsed_obj:
            self._invoke_callback("parsed_object_in_stream")

        return

    def save_streams(self, stream_names: list = "all", *, final: bool = False):
        """For each stream, checks if it is to be saved, and if yes: saves it.

        The saving location is stored in the streams dict. The relevant keys
        are the ``save`` flag and the ``save_path`` string.

        Note that this function does not save the whole stream log, but only
        those parts of the stream log that have not been saved yet. The
        position up to which the stream was saved is stored under the
        ``lines_saved`` key in the stream dict.

        Whether the raw log is saved and whether ANSI escape characters are
        removed before saving is controlled by the ``save_raw`` and
        ``remove_ansi`` entries of the stream dict, respectively, which are
        set during stream reader setup.

        Args:
            stream_names (list, optional): The list of stream names to
                *check*. If 'all' (default), will check all streams whether
                the ``save`` flag is set.
            final (bool, optional): If True, this is regarded as the final
                save operation for the stream, which will lead to additional
                information being saved to the end of the log.

        Returns:
            None
        """
        if not self.streams:
            log.debug(
                "No streams to save for WorkerTask '%s' (uid: %s).",
                self.name,
                self.uid,
            )
            return

        elif stream_names == "all":
            stream_names = list(self.streams.keys())

        # Go over all streams and check if they were configured to be saved
        for stream_name in stream_names:
            stream = self.streams[stream_name]

            if not stream.get("save"):
                log.debug("Not saving stream '%s' ...", stream_name)
                continue
            # else: this stream is to be saved

            # Determine the lines to save; if the raw log is saved, lines
            # that were parseable are included as well
            save_raw = stream["save_raw"]
            stream_log = stream["log_raw"] if save_raw else stream["log"]
            lines_to_save = stream_log[slice(stream["lines_saved"], None)]

            if not lines_to_save:
                log.debug(
                    "No lines to save for stream '%s'. Lines already "
                    "saved: %d / %d.",
                    stream_name,
                    stream["lines_saved"],
                    len(stream["log"]),
                )
                continue

            log.debug(
                "Saving the log of stream '%s' to %s, starting from "
                "line %d ...",
                stream_name,
                stream["save_path"],
                stream["lines_saved"],
            )

            # Open the file and append the lines not yet saved
            with open(stream["save_path"], "a") as f:
                # Write header, if not already done
                if stream["lines_saved"] == 0:
                    f.write(
                        "Log of '{}' stream of {} '{}'\n---\n\n"
                        "".format(stream_name, type(self).__name__, self.name)
                    )

                # Prepare the string that is to be saved, potentially removing
                # ANSI escape characters (e.g. from colored logging) ...
                s = "\n".join(lines_to_save)
                if stream["remove_ansi"]:
                    s = _ANSI_ESCAPE.sub("", s)

                # ... and write it.
                f.write(s)

                # If this is the final save call, add information on the exit
                # status to the end
                if final:
                    f.write(
                        "\n"
                        "\n---"
                        f"\nend of log. exit code: {self.worker_status}\n"
                    )

                    if self.fulfilled_stop_conditions:
                        _fsc = "\n - ".join(
                            [str(sc) for sc in self.fulfilled_stop_conditions]
                        )
                        f.write(
                            "\nFulfilled stop condition(s):\n"
                            f" - {_fsc}\n"
                        )

                # Ensure a new line at the end
                f.write("\n")

            stream["lines_saved"] += len(lines_to_save)

            log.debug(
                "Saved %d lines of stream '%s'.",
                len(lines_to_save),
                stream_name,
            )

    def forward_streams(
        self, stream_names: list = "all", forward_raw: bool = True
    ) -> bool:
        """Forwards the streams to stdout, either via the logging module or
        the print method.

        This function can be called periodically to forward those parts of
        the stream logs that have not been forwarded to stdout yet. The
        information needed for that is stored in the stream dict. The
        ``log_level`` entry is used to determine whether the logging module
        should be used or (in case of None) the print method.

        Args:
            stream_names (list, optional): The list of streams to forward
            forward_raw (bool, optional): The fallback value for the
                ``forward_raw`` entry of a stream dict, deciding whether the
                raw log or the regular log is forwarded.

        Returns:
            bool: whether there was any output
        """

        def print_lines(lines: List[str], *, log_level: int):
            """Prints the lines to stdout via print or log.log"""
            prefix = f" {self.name} {stream_name}: "

            if log_level is None:
                # print it to the parent process's stdout
                for line in lines:
                    print(prefix, line)
            else:
                for line in lines:
                    log.log(log_level, "%s %s", prefix, line)

        # Check whether there are streams that could be printed
        if not self.streams:
            log.trace(
                "No streams to print for WorkerTask '%s' (uid: %s).",
                self.name,
                self.uid,
            )
            return False

        elif stream_names == "all":
            stream_names = list(self.streams.keys())

        rv = False
        for stream_name in stream_names:
            stream = self.streams[stream_name]

            if not stream.get("forward"):
                log.trace("Not forwarding stream '%s' ...", stream_name)
                continue
            # else: this stream is to be forwarded

            # Determine lines to write
            fwd_raw = stream.get("forward_raw", forward_raw)
            stream_log = stream["log_raw"] if fwd_raw else stream["log"]
            lines = stream_log[stream["lines_forwarded"] :]
            if not lines:
                continue

            print_lines(lines, log_level=stream.get("log_level"))
            stream["lines_forwarded"] += len(lines)

            # There was output -> set flag
            rv = True
            log.trace(
                "Forwarded %d lines for stream '%s' of WorkerTask '%s'.",
                len(lines),
                stream_name,
                self.name,
            )

        return rv

    def signal_worker(self, signal: str) -> tuple:
        """Sends a signal to this WorkerTask's worker.

        Args:
            signal (str): The signal to send. Needs to be a valid signal
                name, i.e. available in the Python signal module.

        Raises:
            ValueError: When an invalid ``signal`` argument was given

        Returns:
            tuple: (signal: str, signum: int) sent to the worker
        """
        # Determine the signal number
        try:
            signum = SIGMAP[signal]
        except KeyError as err:
            _valid_sigs = ", ".join(SIGMAP.keys())
            raise ValueError(
                f"No signal named '{signal}' available! Valid signals "
                f"are: {_valid_sigs}"
            ) from err

        # Handle some specific cases, then all the other signals ...
        if signal == "SIGTERM":
            log.debug("Terminating worker of task %s ...", self.name)
            self.worker.terminate()

        elif signal == "SIGKILL":
            log.debug("Killing worker of task %s ...", self.name)
            self.worker.kill()

        elif signal == "SIGINT":
            log.debug("Interrupting worker of task %s ...", self.name)
            self.worker.send_signal(SIGMAP["SIGINT"])

        else:
            log.debug(
                "Sending %s (%d) to worker of task %s ...",
                signal,
                signum,
                self.name,
            )
            self.worker.send_signal(
                SIGMAP[signal] if isinstance(signal, str) else signal
            )

        self._invoke_callback("after_signal")
        return signal, signum

# Private API .............................................................

    def _prepare_process_args(
        self, *, args: tuple, read_stdout: bool, **kwargs
    ) -> Tuple[tuple, dict]:
        """Prepares the arguments that will be passed to subprocess.Popen"""
        # Set the encoding such that stream reading is in text mode; this
        # provides backwards-compatibility for cases where popen_kwargs is
        # empty.
        kwargs["encoding"] = kwargs.get("encoding", "utf8")

        # Set the buffer size
        kwargs["bufsize"] = kwargs.get("bufsize", 1)
        # NOTE bufsize = 1 (line-buffered) is important as the default here,
        #      as lines should usually not be interrupted. As line buffering
        #      only works in text mode, the encoding set above is crucial.

        # Depending on whether stdout should be read, set up the pipe objects
        if read_stdout:
            # Create a new pipe for STDOUT and _forward_ STDERR into that
            # same pipe. For the specification of that syntax, see the
            # subprocess.Popen docs:
            #   docs.python.org/3/library/subprocess.html#subprocess.Popen
            kwargs["stdout"] = subprocess.PIPE
            kwargs["stderr"] = subprocess.STDOUT

        else:
            # No stream-reading is taking place; forward all streams to
            # devnull
            kwargs["stdout"] = kwargs["stderr"] = subprocess.DEVNULL

        return args, kwargs

    def _spawn_process(self, args, **popen_kwargs):
        """This helper takes care *only* of spawning the actual process and
        potential error handling.

        It can be subclassed to spawn a different kind of process.
        """
        try:
            return subprocess.Popen(args, **popen_kwargs)

        except FileNotFoundError as err:
            raise FileNotFoundError(
                f"No executable found for task '{self.name}'! "
                f"Process arguments: {repr(args)}"
            ) from err

    def _spawn_worker(
        self,
        *,
        args: tuple,
        popen_kwargs: dict = None,
        read_stdout: bool = True,
        **_,
    ) -> subprocess.Popen:
        """Helper function to spawn the worker subprocess"""
        args, popen_kwargs = self._prepare_process_args(
            args=args,
            read_stdout=read_stdout,
            **(popen_kwargs if popen_kwargs else {}),
        )

        if not isinstance(args, tuple):
            raise TypeError(
                f"Need argument `args` to be of type tuple, got {type(args)} "
                f"with value {args}. Refusing to even try to spawn a worker "
                "process."
            )

        log.debug("Spawning worker process with args:\n  %s", args)
        proc = self._spawn_process(args, **popen_kwargs)
        # ... it is running now.

        # Save the approximate creation time (as soon as possible)
        self.profiling["create_time"] = time.time()
        log.debug("Spawned worker process with PID %s.", proc.pid)

        return proc

    def _setup_stream_reader(
        self,
        stream_name: str,
        *,
        stream,
        parser: str = "default",
        follow: bool = False,
        save_streams: bool = False,
        save_streams_to: str = None,
        save_raw: bool = True,
        remove_ansi: bool = False,
        forward_streams: bool = False,
        forward_raw: bool = True,
        streams_log_lvl: int = None,
        **_,
    ):
        """Sets up the stream reader thread"""
        q = queue.Queue()  # will contain the stream's content

        log.debug("Using stream parse function: %s", parser)
        parse_func = self.STREAM_PARSE_FUNCS[parser]

        enqueue_func = partial(
            enqueue_lines, parse_func=parse_func, follow=follow
        )

        # Generate the thread that reads the stream and populates the queue
        t = threading.Thread(
            target=enqueue_func, kwargs=dict(queue=q, stream=stream)
        )
        t.daemon = True  # ==> will die with the parent thread

        # Start the thread; this will lead to enqueue_func being called
        t.start()

        # Save the stream information in the WorkerTask object.
        # This includes two counters for the number of lines saved and
        # forwarded, which are used by the save_/forward_streams methods
        self.streams[stream_name] = dict(
            queue=q,
            thread=t,
            stream=stream,
            log=[],
            log_raw=[],
            log_parsed=[],
            save=save_streams,
            save_path=None,
            save_raw=save_raw,
            remove_ansi=remove_ansi,
            forward=forward_streams,
            forward_raw=forward_raw,
            log_level=streams_log_lvl,
            lines_saved=0,
            lines_forwarded=0,
        )

        log.debug(
            "Added thread to read worker %s's %s stream.",
            self.name,
            stream_name,
        )

        if save_streams:
            if not save_streams_to:
                raise ValueError(
                    "Was told to `save_streams` but did not find a "
                    "`save_streams_to` argument in `worker_kwargs`!"
                )

            save_path = save_streams_to.format(name=stream_name)
            self.streams[stream_name]["save_path"] = save_path

    def _stop_stream_reader(self, name: str):
        """Stops the stream reader with the given name by closing the
        associated stream's file handle.
        """
        self.streams[name]["stream"].close()

    def _finished(self) -> None:
        """Is called once the worker has finished working on this task.

        It takes care that a profiling time is saved and that the remaining
        stream information is logged.
        """
        # Update profiling info
        self.profiling["end_time"] = time.time()
        self.profiling["run_time"] = (
            self.profiling["end_time"] - self.profiling["create_time"]
        )
        # NOTE These are both approximate values, as the worker process must
        #      have ended prior to the call to this method.

        # Read all remaining stream lines, then forward remaining and save all
        self.read_streams(max_num_reads=-1, forward_directly=True)
        self.save_streams(final=True)

        # Stop the stream-reading threads
        for stream_name in self.streams:
            self._stop_stream_reader(stream_name)

        self._invoke_callback("finished")

        log.debug(
            "Task %s: worker finished with status %s.",
            self.name,
            self.worker_status,
        )

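
# A minimal usage sketch (illustrative, not part of the module API): spawning
# a WorkerTask around a short shell command and polling it until it finishes.
# The command and callback are hypothetical; normally, the WorkerManager
# drives this loop.
def _demo_worker_task():
    task = WorkerTask(
        name="demo",
        worker_kwargs=dict(
            args=("echo", "hello"),
            read_stdout=True,
            forward_streams=True,
        ),
        callbacks=dict(finished=lambda t: print(f"finished: {t.name}")),
    )
    task.spawn_worker()

    # worker_status polls the process; None means it is still running
    while task.worker_status is None:
        task.read_streams()
        time.sleep(0.05)

    print(task.worker_status)  # -> 0 on success
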
# -----------------------------------------------------------------------------
class NoWorkTask(WorkerTask):
    """A WorkerTask specialization that does not spawn a worker.

    It is mostly equivalent to :py:class:`~utopya.task.WorkerTask` but
    adjusts the private methods that take care of spawning the actual process
    and skips the actual work.
    """

    @property
    def worker(self) -> subprocess.Popen:
        """The associated worker process object or None, if not yet created."""
        return self._worker

    @worker.setter
    def worker(self, proc: subprocess.Popen):
        """Set the associated worker process of this task."""
        if proc is not None:
            raise RuntimeError("NoWorkTask does not accept an active worker!")

    @property
    def worker_pid(self) -> int:
        """The process ID of the associated worker process"""
        raise RuntimeError("NoWorkTask does not have an active worker!")

    @property
    def worker_status(self) -> Union[int, None]:
        """The worker process's current status or False, if the task was not
        "spawned" yet.

        Note that a NoWorkTask has no active worker; its status is set
        directly when :py:meth:`spawn_worker` is called.

        Returns:
            Union[int, None]: Current worker status. False, if there was no
                worker associated yet.
        """
        if self._worker_status is None:
            return False
        return self._worker_status

    @property
    def outstream_objs(self) -> list:
        """Returns the list of objects parsed from the 'out' stream"""
        if not self.streams:
            return []
        return self.streams["out"]["log_parsed"]

    def spawn_worker(self) -> None:
        """Spawn a void process.

        Returns:
            None

        Raises:
            RuntimeError: If a worker was already spawned for this task.
        """
        if self._worker_status is not None:
            raise RuntimeError("Can only spawn one worker per task!")

        # If a setup function is available, call it with the given kwargs
        if self.setup_func:
            log.debug("Calling a setup function ...")
            worker_kwargs = self.setup_func(
                worker_kwargs=self.worker_kwargs, **self.setup_kwargs
            )
        else:
            log.debug("No setup function given; using given `worker_kwargs`")
            worker_kwargs = self.worker_kwargs

        self.profiling["create_time"] = time.time()

        log.debug("This is a NoWorkTask: Skipping to work on task.")
        self._worker_status = 0
        self._finished()

        return None

    def _setup_stream_reader(self, stream_name: str, **_):
        raise RuntimeError("NoWorkTasks cannot have a stream!")

    def signal_worker(self, signal: str) -> tuple:
        """Overwrites signal_worker from WorkerTask.

        Raises:
            RuntimeError: It is not possible to signal a NoWorkTask.
        """
        raise RuntimeError("Cannot signal to a terminated worker!")

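
# A minimal usage sketch (illustrative, not part of the module API): a
# NoWorkTask "finishes" immediately upon spawning, with exit status 0.
def _demo_no_work_task():
    task = NoWorkTask(
        name="noop",
        worker_kwargs=dict(args=()),
        callbacks=dict(finished=lambda t: print(f"finished: {t.name}")),
    )
    task.spawn_worker()        # skips the actual work, finishes right away
    print(task.worker_status)  # -> 0
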
# -----------------------------------------------------------------------------
# ... working with the multiprocessing module
def _target_wrapper(target, streams: dict, *args, **kwargs):
    """A wrapper around the multiprocessing.Process target function which
    takes care of stream handling.
    """
    import logging
    import sys

    log = logging.getLogger(__name__)

    # Stream handling . . . . . . . . . . . . . . . . . . . . . . . . . . . .
    # For stdout, there are the following options:
    #   - Leave it as it is, which may lead to forwarding to the parent
    #     process
    #   - Redirect to a file (which may be os.devnull)
    if streams["stdout"] is not None:
        sys.stdout = open(streams["stdout"], mode="w+")
        log.debug("Using file-based custom stdout: %s", sys.stdout.name)

    # For stderr, there is one additional option: redirecting to stdout
    if streams["stderr"] is not None:
        if streams["stderr"] == streams["stdout"]:
            sys.stderr = sys.stdout
            log.debug("Redirecting stderr to stdout now.")
        else:
            sys.stderr = open(streams["stderr"], mode="w+")
            log.debug("Using file-based custom stderr: %s", sys.stderr.name)

    # Target invocation . . . . . . . . . . . . . . . . . . . . . . . . . . .
    log.progress("Now invoking process target ...")

    try:
        target(*args, **kwargs)

    except Exception:
        log.error("Process target invocation failed!")
        raise

    else:
        log.success("Process target returned successfully.")

class PopenMPProcess:
    """A wrapper around multiprocessing.Process that replicates (wide parts
    of) the interface of subprocess.Popen.
    """

    def __init__(
        self,
        args: tuple,
        kwargs: dict = {},
        stdin=None,
        stdout=None,
        stderr=None,
        bufsize: int = -1,
        encoding: str = "utf8",
    ):
        """Creates a :py:class:`multiprocessing.Process` and starts it.

        The interface here is a subset of :py:class:`subprocess.Popen` that
        makes those features available that make sense for a
        :py:class:`multiprocessing.Process`, mainly: stream reading.

        Consequently, the interface differs quite a bit from that of
        :py:class:`multiprocessing.Process` itself. The most important
        arguments of that interface are ``target``, ``args``, and ``kwargs``,
        which can be set as follows:

            - ``target`` will be ``args[0]``
            - ``args`` will be ``args[1:]``
            - ``kwargs`` is an additional keyword argument that is not
              typically part of the :py:class:`subprocess.Popen` interface.

        Regarding the stream arguments: if a stream argument is
        ``subprocess.PIPE`` or another stream specifier that is *not*
        ``subprocess.DEVNULL``, a file-based stream is attached, to which the
        child process redirects its output and from which the stream can then
        be read.

        .. warning::

            This will *always* use ``spawn`` as the start method for the
            process!

        Args:
            args (tuple): The ``target`` callable (``args[0]``) and
                subsequent positional arguments.
            kwargs (dict, optional): Keyword arguments for the ``target``.
            stdin (None, optional): The stdin stream
            stdout (None, optional): The stdout stream
            stderr (None, optional): The stderr stream
            bufsize (int, optional): The buffer size to use.
            encoding (str, optional): The encoding to use for the streams;
                should typically remain ``utf8``, using other values is not
                encouraged!
        """
        self._args = args
        self._kwargs = copy.deepcopy(kwargs)
        self._bufsize = bufsize
        self._encoding = encoding

        self._stdin = None
        self._stdout = None
        self._stderr = None

        # Always use spawn as the start method; the streams could not be
        # read properly otherwise.
        # FIXME On macOS, this causes issues downstream depending on args;
        #       but so does forking.
        #       See https://gitlab.com/utopia-project/utopya/-/issues/53
        start_method = "spawn"

        # Prepare the target and positional arguments, then spawn the process
        # in a custom context that uses an OS-independent start method.
        target, args = self._prepare_target_args(
            args,
            stdin=stdin,
            stdout=stdout,
            stderr=stderr,
        )
        _ctx = multiprocessing.get_context(start_method)
        self._proc = _ctx.Process(
            target=_target_wrapper, args=args, kwargs=self.kwargs, daemon=True
        )

        log.debug("Starting multiprocessing.Process for target %s ...", target)
        self._proc.start()

    def _prepare_target_args(
        self,
        args: tuple,
        *,
        stdin,
        stdout,
        stderr,
    ) -> Tuple[Callable, tuple]:
        """Prepares the target callable and stream objects"""
        # Extract the target and the actual positional arguments
        target, args = args[0], args[1:]

        if not callable(target):
            raise TypeError(f"Given target {target} is not callable!")

        # Prepare the streams . . . . . . . . . . . . . . . . . . . . . . . .
        if stdin is not None:
            raise NotImplementedError("stdin is not supported!")

        # Create lambdas that create file descriptors for the streams
        import tempfile

        File = tempfile.NamedTemporaryFile
        get_tempfile = lambda: File(
            mode="x+",  # exclusive creation
            buffering=self._bufsize,
            encoding=self._encoding,
            delete=False,
        )

        # Need to map certain subprocess module flags to the stream creators
        get_stream = {
            None: lambda: None,
            subprocess.DEVNULL: lambda: open(os.devnull, mode="w"),
            True: get_tempfile,
            subprocess.PIPE: get_tempfile,
            subprocess.STDOUT: get_tempfile,
        }

        # Depending on the setting for stdout, let it create a file
        # descriptor, which is kept here in the parent process. To the child
        # process, only None or a string can be passed; the string denotes
        # the path to the file (i.e. `File.name`) that should be used for
        # this stream ...
        self._stdout = get_stream[stdout]()
        stdout = getattr(self._stdout, "name", self._stdout)

        # Same for stderr, but need to allow a shared pipe with stdout
        if stderr is subprocess.STDOUT and stdout is not None:
            self._stderr = self._stdout
            stderr = stdout
        else:
            self._stderr = get_stream[stderr]()
            stderr = getattr(self._stderr, "name", self._stderr)

        # Prepare arguments . . . . . . . . . . . . . . . . . . . . . . . . .
        # Prepend those positional arguments that are used up by the wrapper
        wrapper_args = (
            target,
            dict(stdin=stdin, stdout=stdout, stderr=stderr),
        )

        return target, (wrapper_args + args)

    def __del__(self):
        """Custom destructor that closes the process and file descriptors"""
        try:
            self._proc.close()
        except:
            pass

        try:
            self._stdout.close()
        except:
            pass

        try:
            self._stderr.close()
        except:
            pass

    def __str__(self) -> str:
        return f"<PopenMPProcess for process: {self._proc}>"

    # .. subprocess.Popen interface ...........................................

    def poll(self) -> Union[int, None]:
        """Checks if the child process has terminated and returns the
        ``returncode`` attribute, or None if it is still running.

        With the underlying process being a multiprocessing.Process, this
        method is equivalent to the ``returncode`` property.
        """
        return self.returncode

    def wait(self, timeout=None):
        """Wait for the process to finish; blocking call.

        This method is not yet implemented, but will be!
        """
        raise NotImplementedError("PopenMPProcess.wait")

    def communicate(self, input=None, timeout=None):
        """Communicate with the process.

        This method is not yet implemented! Not sure if it will be ...
        """
        raise NotImplementedError("PopenMPProcess.communicate")

    def send_signal(self, signal: int):
        """Send a signal to the process. Only works for SIGKILL and
        SIGTERM."""
        if signal == SIGMAP["SIGTERM"]:
            return self.terminate()
        elif signal == SIGMAP["SIGKILL"]:
            return self.kill()

        raise NotImplementedError(
            f"Cannot send signal {signal} to multiprocessing.Process! "
            "The only supported signals are SIGTERM and SIGKILL."
        )

    def terminate(self):
        """Sends ``SIGTERM`` to the process"""
        self._proc.terminate()

    def kill(self):
        """Sends ``SIGKILL`` to the process"""
        self._proc.kill()

    @property
    def args(self) -> tuple:
        """The ``args`` argument to this process, with the target callable
        *included* as the first entry of the returned tuple.

        Note that these have already been passed to the process; changing
        them has no effect.
        """
        return self._args

    @property
    def kwargs(self):
        """Keyword arguments passed to the target callable.

        Note that these have already been passed to the process; changing
        them has no effect.
        """
        return self._kwargs

    @property
    def stdin(self):
        """The attached ``stdin`` stream"""
        return self._stdin

    @property
    def stdout(self):
        """The attached ``stdout`` stream"""
        return self._stdout

    @property
    def stderr(self):
        """The attached ``stderr`` stream"""
        return self._stderr

    @property
    def pid(self):
        """Process ID of the child process"""
        return self._proc.pid

    @property
    def returncode(self) -> Union[int, None]:
        """The child's return code, set by ``poll()`` and ``wait()`` (and
        indirectly by ``communicate()``). A None value indicates that the
        process hasn't terminated yet.

        A negative value ``-N`` indicates that the child was terminated by
        signal ``N`` (POSIX only).
        """
        return self._proc.exitcode

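
# A minimal usage sketch (illustrative, not part of the module API): running
# a plain Python callable through PopenMPProcess, with its stdout going to a
# temporary file as per the subprocess.PIPE semantics above. The target is
# hypothetical and defined at module level so that it is picklable for the
# ``spawn`` start method.
def _demo_print_target(msg: str):
    print(msg)


def _demo_popen_mp_process():
    proc = PopenMPProcess(
        args=(_demo_print_target, "hello from the child process"),
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        bufsize=1,
    )

    # Same polling semantics as subprocess.Popen: None while still running
    while proc.poll() is None:
        time.sleep(0.05)

    # The attached stdout is a regular (temporary) file object
    proc.stdout.seek(0)
    print(proc.stdout.read())  # -> hello from the child process
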
# .............................................................................
class MPProcessTask(WorkerTask):
    """A WorkerTask specialization that uses multiprocessing.Process instead
    of subprocess.Popen.

    It is mostly equivalent to :py:class:`~utopya.task.WorkerTask` but
    adjusts the private methods that take care of spawning the actual process
    and setting up the stream readers, such that the particularities of the
    :py:class:`~utopya.task.PopenMPProcess` wrapper are accounted for.
    """

    def _spawn_process(self, args, **popen_kwargs) -> PopenMPProcess:
        """This helper takes care *only* of spawning the actual process and
        potential error handling.

        It returns a PopenMPProcess instance, which has the same interface
        as subprocess.Popen.
        """
        return PopenMPProcess(args, **popen_kwargs)

    def _setup_stream_reader(self, *args, **kwargs):
        """Sets up the stream reader with ``follow=True``, such that the
        file-like streams that PopenMPProcess uses can be read properly.
        """
        return super()._setup_stream_reader(*args, follow=True, **kwargs)

    def _stop_stream_reader(self, name: str):
        """Stops the stream reader thread with the given name by telling its
        follow function to stop, thus ending the iteration.
        """
        self.streams[name].get("thread").stop_follow = True
        super()._stop_stream_reader(name)

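
# A minimal usage sketch (illustrative, not part of the module API): an
# MPProcessTask is used like a WorkerTask, except that ``args`` starts with a
# Python callable instead of an executable. The module-level target defined
# above (``_demo_print_target``) is reused here; its output appears in the
# stream logs once the follower thread has caught up.
def _demo_mp_process_task():
    task = MPProcessTask(
        name="mp_demo",
        worker_kwargs=dict(
            args=(_demo_print_target, "working in a child process"),
            read_stdout=True,
        ),
    )
    task.spawn_worker()

    while task.worker_status is None:
        task.read_streams()
        time.sleep(0.05)

    print(task.streams["out"]["log_raw"])  # the child's printed output
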
# -----------------------------------------------------------------------------
class TaskList:
    """The TaskList stores Task objects, ensuring that none is in there
    twice, and allowing it to be locked to prevent adding new tasks.
    """

    def __init__(self):
        """Initialize an empty TaskList."""
        self._l = []
        self._locked = False

    def __len__(self) -> int:
        """The length of the TaskList."""
        return len(self._l)

    def __contains__(self, val: Task) -> bool:
        """Checks if the given object is contained in this TaskList."""
        if not isinstance(val, Task):
            # Cannot be part of this TaskList
            return False
        return val in self._l

    def __getitem__(self, idx: int) -> Task:
        """Returns the item at the given index in the TaskList."""
        return self._l[idx]

    def __iter__(self):
        """Iterate over the TaskList"""
        return iter(self._l)

    def __eq__(self, other) -> bool:
        """Tests for equality of the task list by forwarding to the _l
        attribute"""
        return bool(self._l == other)

    def lock(self):
        """If called, the TaskList becomes locked and allows no further calls
        to the append method.
        """
        self._locked = True

    def append(self, val: Task):
        """Append a Task object to this TaskList.

        Args:
            val (Task): The task to add

        Raises:
            RuntimeError: If the TaskList was locked
            TypeError: If a non-Task type object was to be added
            ValueError: If the task was already added to this TaskList
        """
        if self._locked:
            raise RuntimeError("TaskList locked! Cannot append further tasks.")

        elif not isinstance(val, Task):
            raise TypeError(
                "TaskList can only be filled with Task objects, got object of "
                f"type {type(val)}, value {repr(val)}"
            )

        elif val in self:
            raise ValueError(
                f"Task '{val.name}' (uid: {val.uid}) was already added to "
                "this TaskList, cannot be added again."
            )

        self._l.append(val)

    def __add__(self, tasks: Sequence[Task]):
        """Appends all the tasks in the given iterable to the task list"""
        for t in tasks:
            self.append(t)
        return self
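
# A minimal usage sketch (illustrative, not part of the module API): tasks
# can only be appended once, and locking prevents any further appends.
def _demo_task_list():
    tasks = TaskList()
    t = Task(name="only_once")

    tasks += [t]
    print(len(tasks), t in tasks)  # -> 1 True

    try:
        tasks.append(t)  # appending the same task again raises
    except ValueError as err:
        print(err)

    tasks.lock()
    try:
        tasks.append(Task(name="too_late"))  # locked now, raises
    except RuntimeError as err:
        print(err)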