# This file is part of fm-weck: executing fm-tools in containerized environments.
# https://gitlab.com/sosy-lab/software/fm-weck
#
# SPDX-FileCopyrightText: 2024 Dirk Beyer <https://www.sosy-lab.org>
#
# SPDX-License-Identifier: Apache-2.0
import contextlib
import multiprocessing
import multiprocessing.synchronize
import os
import threading
import uuid
from pathlib import Path
from shutil import rmtree
from typing import TYPE_CHECKING, Generator, Optional, Tuple, Union
from fm_tools.benchexec_helper import DataModel
from fm_weck.config import Config
from fm_weck.exceptions import Failure, RunFailedError, failure_from_exception
from fm_weck.resources import fm_tools_choice_map, property_choice_map
from fm_weck.serve import run_guided, run_manual
from .server_utils import TMP_DIR
if TYPE_CHECKING:
from fm_weck.grpc_service.proto.fm_weck_service_pb2 import File, RunRequest
from fm_weck.run_result import RunResult
def _get_unique_id() -> str:
    """Return a freshly generated random UUID (version 4) in its string form."""
    fresh_uuid = uuid.uuid4()
    return str(fresh_uuid)
[docs]
class StillRunningError(Exception):
    """Raised when an operation requires a finished run but the run is still in progress."""
[docs]
def worker(setup_complete_flag, queue: multiprocessing.SimpleQueue, initializer, initargs, target, args, kwargs):
    """
    Entry point of the child process: run ``target`` and report the outcome over ``queue``.

    Exactly one ``(success, payload)`` tuple is put on the queue on every path handled here:
    ``(True, result)`` when the target returned normally, ``(False, failure)`` otherwise.

    :param setup_complete_flag: Event that is set once the early SIGTERM handler is installed.
    :param queue: Queue used to send the ``(success, payload)`` tuple back.
    :param initializer: Optional callable invoked with ``*initargs`` before the target.
    :param initargs: Positional arguments for ``initializer``.
    :param target: The callable doing the actual work.
    :param args: Positional arguments for ``target``.
    :param kwargs: Keyword arguments for ``target``.
    """
    import signal
    import sys

    # In the rare case that a cancel happens before the target function overrides the
    # signal handler, we need to make sure that the process still sends something over the queue and exits.
    def _abort_before_setup(signum, frame):
        early_cancel = InterruptedError("Run was canceled before setup complete.")
        queue.put((False, early_cancel))
        sys.exit(1)

    signal.signal(signal.SIGTERM, _abort_before_setup)
    setup_complete_flag.set()

    try:
        if initializer:
            initializer(*initargs)

        outcome = target(*args, **kwargs)

        # Results without an ``exit_code`` attribute are treated as successful.
        if getattr(outcome, "exit_code", 0) != 0:
            raw_output = getattr(outcome, "raw_output", None)
            raise RunFailedError(outcome.exit_code, outcome.command, raw_output)

        # Kept inside the ``try`` so that a failing put is reported as a failure as well.
        queue.put((True, outcome))
    except Exception as error:
        queue.put((False, failure_from_exception(error)))
[docs]
class RunHandler:
    """
    Executes the run described by a ``RunRequest`` in a separate process and tracks its outcome.

    Each handler owns a fresh directory below ``TMP_DIR`` (named after its ``run_id``). The run
    itself is executed by :func:`worker` in a child process; a listener thread waits for the
    ``(success, payload)`` tuple the worker sends over a queue and stores it on the handler.

    NOTE(review): ``get_tool`` is called by ``start`` and ``start_expert`` but is not defined in
    the part of the class visible here -- confirm that it is provided elsewhere.
    """

    # Child processes are always created with the "spawn" start method.
    mp = multiprocessing.get_context("spawn")

    _success: bool = False
    _result: Optional[Union["RunResult", "Failure"]] = None
    _setup_complete: Optional[multiprocessing.synchronize.Event] = None

    def __init__(self, request: "RunRequest"):
        """
        Create the handler and its run directory.

        :param request: The request describing the run.
        :raises FileExistsError: If the run directory already exists.
        """
        self.request = request
        self.run_id = _get_unique_id()
        self.run_path = TMP_DIR / self.run_id
        self._is_cancelled: bool = False

        self._output_log = self.run_path / "output" / "output.txt"
        self._output_dir = self.run_path / "output"

        self.run_path.mkdir(parents=True, exist_ok=False)

        self._process = None
        self._done = threading.Event()
        self._queue = self.mp.SimpleQueue()
        self._result_listener: Optional[threading.Thread] = None

    def _set(self, result: Tuple[bool, object]):
        """Store the ``(success, payload)`` tuple received from the worker and mark the run as done."""
        self._success, self._result = result
        # A Failure payload always counts as an unsuccessful run, regardless of the flag sent.
        if isinstance(self._result, Failure):
            self._success = False
        self._done.set()

    def ready(self) -> bool:
        """Return True once a result has been received and stored."""
        return self._done.is_set()

    @property
    def output(self) -> Optional[str]:
        """
        The textual output of the run.

        While the run is in progress this is the current content of the output log (an empty
        string if the log cannot be opened). After a successful run it is the ``raw_output``
        of the result. In every other case it is None.
        """
        if self.is_running():
            try:
                with self._output_log.open("r") as output_file:
                    return output_file.read()
            except (FileNotFoundError, PermissionError):
                return ""

        if self.ready() and self.successful():
            return self._result.raw_output

        return None

    def successful(self) -> bool:
        """
        Returns True if the run was successful, False otherwise.

        :raises StillRunningError: If the run is still running.
        :raises ValueError: If the run has not been started yet.
        """
        if self.ready():
            return self._success

        if self._process is None:
            raise ValueError("Run has not been started yet.")

        raise StillRunningError("The run is still running.")

    @property
    def output_files(self) -> Generator[str, None, None]:
        """
        Names of the files produced by the run, as paths relative to the output directory.
        """
        for root, _, files in os.walk(self._output_dir):
            for file in files:
                yield os.path.relpath(os.path.join(root, file), self._output_dir)

    def is_running(self) -> bool:
        """
        A run handler is running, if it has been started and is not finished yet.
        """
        return self._process is not None and not self.ready()

    def is_canceled(self) -> bool:
        """
        Returns True if the run was really canceled, i.e. the underlying process terminated, False otherwise.
        Since cancelling a finished run has no effect, calling `cancel_run` will not necessarily
        result in is_canceled() returning True.
        """
        return self._is_cancelled

    def _apply(self, func, args, kwds):
        """Launch ``func(*args, **kwds)`` in a child process whose working directory is the run directory."""
        self._setup_complete = self.mp.Event()
        self._process = self.mp.Process(
            target=worker,
            # os.chdir is passed as the worker's initializer, so the target runs inside the run directory.
            args=(self._setup_complete, self._queue, os.chdir, (str(self.run_path.absolute()),), func, args, kwds),
            daemon=True,
        )

        self._result_listener = threading.Thread(target=self._listen_for_result, daemon=True)
        self._result_listener.start()
        self._process.start()

    def _listen_for_result(self):
        """Block until the worker reports its result, release the process resources and store the result."""
        result = self._queue.get()
        self._process.join()
        self._process.close()
        self._queue.close()

        # Make sure resources are released before
        # setting the result.
        self._set(result)

    def _requested_tool_version(self):
        """Return the tool version named in the request, or None if the request does not set one."""
        if self.request.tool.HasField("tool_version"):
            return self.request.tool.tool_version
        return None

    @staticmethod
    def _loaded_config() -> "Config":
        """Create a ``Config`` and call ``load()`` on it."""
        config = Config()
        config.load()
        return config

    def start(self):
        """Write the C program of the request into the run directory and launch ``run_guided`` on it."""
        c_program = self.get_c_program(self.request)
        data_model = self.request.data_model
        fm_data = self.get_tool(self.request)
        property_path = self.get_property(self.request)
        tool_version = self._requested_tool_version()
        config = self._loaded_config()

        self._apply(
            func=run_guided,
            args=(
                fm_data.absolute(),
                tool_version,
                config,
                property_path.absolute(),
                # Relative file name; the worker changes into the run directory first.
                [c_program],
            ),
            kwds=dict(
                additional_args=[],
                data_model=DataModel[data_model],
                log_output_to=self._output_log.absolute(),
                output_files_to=self._output_dir.absolute(),
            ),
        )

    def start_expert(self, command: str):
        """
        Launch ``run_manual`` with the command stored in the request.

        :param command: Currently ignored; the command is always taken from ``self.request.command``.
        """
        fm_data = self.get_tool(self.request)
        tool_version = self._requested_tool_version()
        command_args = list(self.request.command)
        config = self._loaded_config()

        self._apply(
            func=run_manual,
            args=(
                fm_data.absolute(),
                tool_version,
                config,
                command_args,
            ),
            kwds=dict(
                log_output_to=self._output_log.absolute(),
                output_files_to=self._output_dir.absolute(),
            ),
        )

    def join(self, timeout=None):
        """
        Wait until the result of the run is available.

        :param timeout: Maximum number of seconds to wait, or None to wait indefinitely.
        :raises TimeoutError: If the result is not available within ``timeout``.
        """
        done = self._done.wait(timeout)
        if not done:
            raise TimeoutError(f"Timeout while joining run {self.run_id}.")

    def cleanup(self):
        """
        Remove the run directory, ignoring errors during removal.

        :raises StillRunningError: If the run is still running.
        """
        if self.is_running():
            raise StillRunningError("The run is still running.")

        rmtree(self.run_path, ignore_errors=True)

    def get_property(self, request: "RunRequest") -> Path:
        """Return the path of the property: a known property looked up by id, or the uploaded custom file."""
        prop = request.property

        if prop.HasField("property_id"):
            return property_choice_map()[prop.property_id]
        return self.get_custom_property(prop.property_file)

    def get_custom_property(self, property_file: "File") -> Path:
        """Write the content of ``property_file`` to ``_custom_property.prp`` in the run directory."""
        custom_property_path = self.run_path / "_custom_property.prp"
        custom_property_path.write_bytes(property_file.file)
        return custom_property_path

    def get_c_program(self, request) -> str:
        """
        Write the C program of the request to ``_c_program.c`` in the run directory.

        :return: The file name relative to the run directory (not an absolute path).
        """
        c_program = "_c_program.c"
        c_program_path = self.run_path / c_program
        c_program_path.parent.mkdir(parents=True, exist_ok=True)
        c_program_path.write_bytes(request.c_program)
        return c_program

    def cancel_run(self):
        """
        Terminate the underlying process if it is still alive.

        :raises RuntimeError: If the run has not been started yet.
        """
        if self.ready():
            # Canceling a run that is already finished has no effect.
            return

        if self._setup_complete is None:
            raise RuntimeError("Run has not been started yet.")

        # Wait until the worker has installed its early SIGTERM handler.
        self._setup_complete.wait()
        try:
            if self._process.is_alive():
                self._process.terminate()
                self._is_cancelled = True
        except ValueError:
            # The listener thread closed the process object in the meantime, i.e. the run
            # finished on its own; there is nothing left to cancel.
            return

    def get_file(self, file_name: str) -> Path:
        """
        Finds the file with the given name in the output directory of the run.

        :param file_name: The name of the file.
        :return: The path to the file.
        :raises FileNotFoundError: If the file does not exist.
        """
        file_path = self._output_dir / file_name
        if not file_path.exists():
            raise FileNotFoundError(f"File {file_name} not found.")
        return file_path

    def glob(self, name_pattern: str) -> Generator[Path, None, None]:
        """
        Finds all files in the output directory of the run that match the given pattern.

        :param name_pattern: The pattern to match.
        :return: The paths to the files.
        """
        return self._output_dir.glob(name_pattern)

    def close(self):
        """
        Close the process and the result listener.
        It is a StillRunningError to call this method if the process is still running.
        """
        if self._process is None or self.ready():
            return

        try:
            alive = self._process.is_alive()
        except ValueError:
            # The process object has already been closed by the result listener.
            return

        if alive:
            raise StillRunningError("Process is still running.")

        # Should normally not occur, as this would mean, that the process is not alive,
        # but the result listener is still running.
        with contextlib.suppress(ValueError):
            self._process.close()

    def failed(self) -> bool:
        """Return True if the run has finished and was not successful."""
        return self.ready() and not self._success

    def failure(self):
        """Return the error object/text when failed, else None."""
        if self.failed():
            return self._result
        return None