# This file is part of fm-weck: executing fm-tools in containerized environments.
# https://gitlab.com/sosy-lab/software/fm-weck
#
# SPDX-FileCopyrightText: 2024 Dirk Beyer <https://www.sosy-lab.org>
#
# SPDX-License-Identifier: Apache-2.0
import dbm
import json
import logging
import os
import sys
from pathlib import Path
from typing import Optional, Tuple, Union, cast
from fm_tools.benchexec_helper import DataModel
from fm_tools.files import unzip
from fm_tools.fmtoolversion import FmToolVersion
from fm_weck.run_result import RunResult
from fm_weck.tmp_file import NTempFile
from .config import Config, parse_fm_data
from .engine import Engine
from .file_util import copy_ensuring_unix_line_endings
logger = logging.getLogger(__name__)
[docs]
def check_cache_entry(shelve_space: Path, checksum: str, config: Config) -> bool:
    """
    Report whether the checksum database holds ``checksum`` for a shelve space.

    The database returned by ``config.get_checksum_db()`` is opened read-only
    and queried with ``shelve_space.name`` as the key.

    :param shelve_space: Path whose final component is used as the database key.
    :param checksum: Expected checksum; compared in its UTF-8 encoded form.
    :param config: Configuration that provides the checksum database location.
    :return: ``True`` only if an entry exists and equals the encoded checksum;
        ``False`` if the database cannot be opened, the key is missing, or the
        stored value differs.
    """
    db_location = config.get_checksum_db()
    if sys.version_info < (3, 11):
        # Before Python 3.11, dbm.open only accepts a string path.
        db_location = str(db_location.resolve())

    try:
        with dbm.open(db_location, "r") as db:
            stored_value = db[shelve_space.name]
            # dbm hands back bytes, so the comparison is done on the UTF-8
            # encoded checksum (the same encoding is used when writing).
            return stored_value == checksum.encode("utf-8")
    except KeyError:
        logger.debug("Checksum does not exist")
    except dbm.error:
        logger.debug("Checksum file does not exist")
    return False
[docs]
def update_checksum(shelve_space: Path, checksum: str, config: Config):
    """
    Record ``checksum`` for a shelve space in the checksum database.

    The database returned by ``config.get_checksum_db()`` is opened in ``"c"``
    mode (created if it does not exist yet) and the entry keyed by
    ``shelve_space.name`` is set to the UTF-8 encoded checksum.

    :param shelve_space: Path whose final component is used as the database key.
    :param checksum: Checksum to store.
    :param config: Configuration that provides the checksum database location.
    """
    db_location = config.get_checksum_db()
    if sys.version_info < (3, 11):
        # Before Python 3.11, dbm.open only accepts a string path.
        db_location = str(db_location.resolve())

    with dbm.open(db_location, "c") as db:
        entry_key = shelve_space.name
        logger.debug("Updating checksum for %s", entry_key)
        logger.debug("Checksum: %s", checksum)
        # dbm stores bytes only; UTF-8 keeps the encoding consistent with
        # the lookup side.
        db[entry_key] = checksum.encode("utf-8")
[docs]
def run_guided(
    fm_tool: Union[Path, FmToolVersion],
    version: Optional[str],
    configuration: Config,
    prop: Optional[Path],
    program_files: list[Path],
    additional_args: list[str],
    witness: Optional[Path] = None,
    data_model: Optional[DataModel] = None,
    offline_mode: bool = False,
    log_output_to: Optional[Path] = None,
    output_files_to: Optional[Path] = None,
    timeout_sec: Optional[float] = None,
    print_tool_output_to_console: bool = True,
) -> RunResult:
    """
    Run a tool with a command line assembled from its fm-data description.

    :param fm_tool: Tool description, forwarded to ``setup_fm_tool``.
    :param version: Tool version, forwarded to ``setup_fm_tool``.
    :param configuration: Configuration used to shelve the property, make the
        script available, set up the tool and build the engine.
    :param prop: Property file; if given it is copied (with unix line endings)
        to the shelve path the configuration assigns to it.
    :param program_files: Input files passed to the command-line builder.
    :param additional_args: Extra options passed to the command-line builder.
    :param witness: If given, replaces every ``${witness}`` placeholder in the
        assembled command.
    :param data_model: Data model passed to the command-line builder.
    :param offline_mode: Forwarded to ``setup_fm_tool``.
    :param log_output_to: If given, passed to ``engine.set_log_file``.
    :param output_files_to: If given, passed to ``engine.set_output_dir``.
    :param timeout_sec: Timeout forwarded to ``engine.run``.
    :param print_tool_output_to_console: Stored in
        ``engine.print_output_to_stdout``.
    :return: The result of ``engine.run``; or a ``RunResult`` with exit code 1
        and an empty command if the property lookup raises ``KeyError``.
    :raises AssertionError: If a witness is given but the assembled command
        contains no ``${witness}`` placeholder.
    """
    property_path = None
    if prop is not None:
        try:
            # The source path might not be mounted in the container, so the
            # property is copied to the weck_cache, which should be mounted.
            property_path = configuration.get_shelve_path_for_property(prop)
            copy_ensuring_unix_line_endings(prop, property_path)
        except KeyError:
            logger.error("Unknown property %s", prop)
            return RunResult(command=[], exit_code=1, raw_output="Unknown property")

    configuration.make_script_available()

    fm_data, shelve_space = setup_fm_tool(fm_tool, version, configuration, offline_mode)
    engine = Engine.from_config(fm_data, configuration)

    if log_output_to is not None:
        engine.set_log_file(log_output_to)
    if output_files_to is not None:
        engine.set_output_dir(output_files_to)
    engine.print_output_to_stdout = print_tool_output_to_console

    # The command line is assembled relative to the shelve space, so the
    # working directory is switched temporarily. The finally clause makes sure
    # the previous directory is restored even if the assembly fails.
    current_dir = Path.cwd().resolve()
    os.chdir(shelve_space)
    try:
        command = fm_data.command_line(
            Path("."),
            input_files=program_files,
            working_dir=engine.get_workdir(),
            property=property_path,
            data_model=data_model,
            options=additional_args,
            add_options_from_fm_data=True,
        )
    finally:
        os.chdir(current_dir)

    # There are two ways to pass the witness
    # 1. Through the task options in BenchExec using {"witness": filename} and adding the witness to the input files
    # 2. Replacing the placeholder in FM-Data "${witness}" with the correct path
    #
    # We are using the second approach here, in order to avoid modifying FM-Data
    if witness is not None:
        # Raised explicitly (instead of using an assert statement) so the check
        # is not stripped when Python runs with -O; the exception type and
        # message are the same as before.
        if "${witness}" not in command:
            raise AssertionError("The version given does not support witness files")
        command = [str(witness) if c == "${witness}" else c for c in command]

    logger.debug("Assembled command from fm-tools: %s", command)
    engine.use_overlay(shelve_space.name)
    return engine.run(*command, timeout_sec=timeout_sec)
[docs]
def run_manual(
    fm_tool: Union[Path, FmToolVersion],
    version: Optional[str],
    configuration: Config,
    command: list[str],
    offline_mode: bool = False,
    use_overlay: bool = False,
    log_output_to: Optional[Path] = None,
    output_files_to: Optional[Path] = None,
    timeout_sec: Optional[float] = None,
    print_tool_output_to_console: bool = True,
) -> RunResult:
    """
    Run a tool's executable with a caller-supplied argument list.

    :param fm_tool: Tool description, forwarded to ``setup_fm_tool``.
    :param version: Tool version, forwarded to ``setup_fm_tool``.
    :param configuration: Configuration used to set up the tool and the engine.
    :param command: Arguments appended after the executable path.
    :param offline_mode: Forwarded to ``setup_fm_tool``.
    :param use_overlay: If true, the executable path is resolved relative to
        the shelve space and the engine is told to use an overlay named after
        the shelve space; otherwise the executable path is resolved against
        the shelve space path itself.
    :param log_output_to: If given, passed to ``engine.set_log_file``.
    :param output_files_to: If given, passed to ``engine.set_output_dir``.
    :param timeout_sec: Timeout forwarded to ``engine.run``.
    :param print_tool_output_to_console: Stored in
        ``engine.print_output_to_stdout``.
    :return: The result of ``engine.run``.
    """
    fm_data, shelve_space = setup_fm_tool(fm_tool, version, configuration, offline_mode)
    engine = Engine.from_config(fm_data, configuration)

    # Configure the engine once, in the same order as run_guided does.
    # NOTE(review): the previous version called set_log_file/set_output_dir a
    # second time right before running; this assumes they are plain setters.
    if log_output_to is not None:
        engine.set_log_file(log_output_to)
    if output_files_to is not None:
        engine.set_output_dir(output_files_to)
    engine.print_output_to_stdout = print_tool_output_to_console

    if use_overlay:
        configuration.make_script_available()
        # The executable is resolved relative to the shelve space. The finally
        # clause restores the previous working directory even if that fails.
        current_dir = Path.cwd().resolve()
        os.chdir(shelve_space)
        try:
            executable = fm_data.get_executable_path(Path("."))
        finally:
            os.chdir(current_dir)
        logger.debug("Running with overlay...")
        engine.use_overlay(shelve_space.name)
    else:
        executable = fm_data.get_executable_path(shelve_space)

    return engine.run(str(executable), *command, timeout_sec=timeout_sec)
### Move to cache_mgr.py after merge to main ###
[docs]
def map_doi(fm_data: FmToolVersion, tool_path: Path):
    """
    Register a tool version under its directory name in ``doi_map.json``.

    The map file lives next to ``tool_path`` (in ``tool_path.parent``) and maps
    the final component of ``tool_path`` to a list of the strings returned by
    ``fm_data.get_tool_name_with_version()``. An entry is appended only if it
    is not listed yet; the file is (re)written on every call.

    :param fm_data: Tool description providing the name-with-version string.
    :param tool_path: Tool directory; its final path component is the map key.
    """
    doi_map = tool_path.parent / "doi_map.json"

    try:
        with doi_map.open("r", encoding="utf-8") as file:
            data = json.load(file)
    except FileNotFoundError:
        # No map has been written yet; start with an empty one.
        data = {}

    # Path.name is separator-agnostic; splitting the string form on "/" would
    # return the entire path on platforms that use a different separator.
    doi = tool_path.name
    tool_name = fm_data.get_tool_name_with_version()

    known_versions = data.setdefault(doi, [])
    if tool_name not in known_versions:
        known_versions.append(tool_name)

    with doi_map.open("w", encoding="utf-8") as file:
        json.dump(data, file, indent=2)
### Move to cache_mgr.py after merge to main ###