diff --git a/src/core/src/bootstrap/ConfigurationFactory.py b/src/core/src/bootstrap/ConfigurationFactory.py index 545d2921..6609d64c 100644 --- a/src/core/src/bootstrap/ConfigurationFactory.py +++ b/src/core/src/bootstrap/ConfigurationFactory.py @@ -37,6 +37,7 @@ from core.src.local_loggers.CompositeLogger import CompositeLogger from core.src.package_managers.AptitudePackageManager import AptitudePackageManager +from core.src.package_managers.TdnfPackageManager import TdnfPackageManager from core.src.package_managers.YumPackageManager import YumPackageManager from core.src.package_managers.ZypperPackageManager import ZypperPackageManager @@ -69,14 +70,17 @@ def __init__(self, log_file_path, real_record_path, recorder_enabled, emulator_e self.configurations = { 'apt_prod_config': self.new_prod_configuration(Constants.APT, AptitudePackageManager), + 'tdnf_prod_config': self.new_prod_configuration(Constants.TDNF, TdnfPackageManager), 'yum_prod_config': self.new_prod_configuration(Constants.YUM, YumPackageManager), 'zypper_prod_config': self.new_prod_configuration(Constants.ZYPPER, ZypperPackageManager), 'apt_dev_config': self.new_dev_configuration(Constants.APT, AptitudePackageManager), + 'tdnf_dev_config': self.new_dev_configuration(Constants.TDNF, TdnfPackageManager), 'yum_dev_config': self.new_dev_configuration(Constants.YUM, YumPackageManager), 'zypper_dev_config': self.new_dev_configuration(Constants.ZYPPER, ZypperPackageManager), 'apt_test_config': self.new_test_configuration(Constants.APT, AptitudePackageManager), + 'tdnf_test_config': self.new_test_configuration(Constants.TDNF, TdnfPackageManager), 'yum_test_config': self.new_test_configuration(Constants.YUM, YumPackageManager), 'zypper_test_config': self.new_test_configuration(Constants.ZYPPER, ZypperPackageManager) } @@ -112,7 +116,7 @@ def get_configuration(self, env, package_manager_name): print ("Error: Environment configuration not supported - " + str(env)) return None - if str(package_manager_name) not in [Constants.APT, Constants.YUM, Constants.ZYPPER]: + if str(package_manager_name) not in [Constants.APT, Constants.TDNF, Constants.YUM, Constants.ZYPPER]: print ("Error: Package manager configuration not supported - " + str(package_manager_name)) return None diff --git a/src/core/src/bootstrap/Constants.py b/src/core/src/bootstrap/Constants.py index bce9d0ef..c73b45d0 100644 --- a/src/core/src/bootstrap/Constants.py +++ b/src/core/src/bootstrap/Constants.py @@ -197,9 +197,11 @@ class StatusTruncationConfig(EnumBackport): RED_HAT = 'Red Hat' SUSE = 'SUSE' CENTOS = 'CentOS' + AZURE_LINUX = 'Microsoft Azure Linux' # Package Managers APT = 'apt' + TDNF = 'tdnf' YUM = 'yum' ZYPPER = 'zypper' @@ -350,7 +352,7 @@ class EnvLayer(EnumBackport): PRIVILEGED_OP_REBOOT = PRIVILEGED_OP_MARKER + "Reboot_Exception" PRIVILEGED_OP_EXIT = PRIVILEGED_OP_MARKER + "Exit_" - # Supported Package Architectures - if this is changed, review YumPackageManage + # Supported Package Architectures - if this is changed, review TdnfPackageManager and YumPackageManager SUPPORTED_PACKAGE_ARCH = ['.x86_64', '.noarch', '.i686', '.aarch64'] # Package / Patch State Ordering Constants diff --git a/src/core/src/bootstrap/EnvLayer.py b/src/core/src/bootstrap/EnvLayer.py index 6de1bd77..4f20677d 100644 --- a/src/core/src/bootstrap/EnvLayer.py +++ b/src/core/src/bootstrap/EnvLayer.py @@ -37,7 +37,7 @@ class EnvLayer(object): def __init__(self, real_record_path=None, recorder_enabled=False, emulator_enabled=False): # Recorder / emulator storage self.__real_record_path = 
real_record_path - self.__real_record_pointer_path = real_record_path + ".pt" + self.__real_record_pointer_path = real_record_path + ".pt" if real_record_path is not None else None self.__real_record_handle = None self.__real_record_pointer = 0 @@ -55,7 +55,7 @@ def __init__(self, real_record_path=None, recorder_enabled=False, emulator_enabl self.platform = self.Platform(recorder_enabled, emulator_enabled, self.__write_record, self.__read_record) self.datetime = self.DateTime(recorder_enabled, emulator_enabled, self.__write_record, self.__read_record) self.file_system = self.FileSystem(recorder_enabled, emulator_enabled, self.__write_record, self.__read_record, - emulator_root_path=os.path.dirname(self.__real_record_path)) + emulator_root_path=os.path.dirname(self.__real_record_path) if self.__real_record_path is not None else self.__real_record_path) # Constant paths self.etc_environment_file_path = "/etc/environment" @@ -64,20 +64,27 @@ def get_package_manager(self): """ Detects package manager type """ ret = None - # choose default - almost surely one will match. - for b in ('apt-get', 'yum', 'zypper'): - code, out = self.run_command_output('which ' + b, False, False) + if self.platform.linux_distribution()[0] == Constants.AZURE_LINUX: + code, out = self.run_command_output('which tdnf', False, False) if code == 0: - ret = b - if ret == 'apt-get': - ret = Constants.APT - break - if ret == 'yum': - ret = Constants.YUM - break - if ret == 'zypper': - ret = Constants.ZYPPER - break + ret = Constants.TDNF + else: + print("Error: Expected package manager tdnf not found on this Azure Linux VM") + else: + # choose default - almost surely one will match. + for b in ('apt-get', 'yum', 'zypper'): + code, out = self.run_command_output('which ' + b, False, False) + if code == 0: + ret = b + if ret == 'apt-get': + ret = Constants.APT + break + if ret == 'yum': + ret = Constants.YUM + break + if ret == 'zypper': + ret = Constants.ZYPPER + break if ret is None and platform.system() == 'Windows': ret = Constants.APT diff --git a/src/core/src/core_logic/PatchAssessor.py b/src/core/src/core_logic/PatchAssessor.py index 50d49584..d8220333 100644 --- a/src/core/src/core_logic/PatchAssessor.py +++ b/src/core/src/core_logic/PatchAssessor.py @@ -51,7 +51,7 @@ def start_assessment(self): self.lifecycle_manager.lifecycle_status_check() return True - self.composite_logger.log("\nStarting patch assessment... [MachineId: " + self.env_layer.platform.node() +"][ActivityId: " + self.execution_config.activity_id +"][StartTime: " + self.execution_config.start_time +"]") + self.composite_logger.log("\nStarting patch assessment... [MachineId: " + self.env_layer.platform.node() + "][ActivityId: " + self.execution_config.activity_id + "][StartTime: " + self.execution_config.start_time + "]") self.write_assessment_state() # success / failure does not matter, only that an attempt started self.stopwatch.start() diff --git a/src/core/src/package_managers/TdnfPackageManager.py b/src/core/src/package_managers/TdnfPackageManager.py new file mode 100644 index 00000000..334e54f4 --- /dev/null +++ b/src/core/src/package_managers/TdnfPackageManager.py @@ -0,0 +1,691 @@ +# Copyright 2025 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Requires Python 2.7+ + +"""TdnfPackageManager for Azure Linux""" +import json +import os +import re + +from core.src.core_logic.VersionComparator import VersionComparator +from core.src.package_managers.PackageManager import PackageManager +from core.src.bootstrap.Constants import Constants + + +class TdnfPackageManager(PackageManager): + """Implementation of Azure Linux package management operations""" + + def __init__(self, env_layer, execution_config, composite_logger, telemetry_writer, status_handler): + super(TdnfPackageManager, self).__init__(env_layer, execution_config, composite_logger, telemetry_writer, status_handler) + # Repo refresh + self.cmd_clean_cache = "sudo tdnf clean expire-cache" + self.cmd_repo_refresh = "sudo tdnf -q list updates" + + # Support to get updates and their dependencies + self.tdnf_check = 'sudo tdnf -q list updates' + self.single_package_check_versions = 'sudo tdnf list available <PACKAGE-NAME>' + self.single_package_check_installed = 'sudo tdnf list installed <PACKAGE-NAME>' + self.single_package_upgrade_simulation_cmd = 'sudo tdnf install --assumeno --skip-broken ' + + # Install update + self.single_package_upgrade_cmd = 'sudo tdnf -y install --skip-broken ' + + # Package manager exit code(s) + self.tdnf_exitcode_ok = 0 + self.tdnf_exitcode_on_no_action_for_install_update = 8 + self.commands_expecting_no_action_exitcode = [self.single_package_upgrade_simulation_cmd] + + # Support to check for processes requiring restart + self.dnf_utils_prerequisite = 'sudo tdnf -y install dnf-utils' + self.needs_restarting_with_flag = 'sudo LANG=en_US.UTF8 needs-restarting -r' + + # auto OS updates + self.current_auto_os_update_service = None + self.os_patch_configuration_settings_file_path = '' + self.auto_update_service_enabled = False + self.auto_update_config_pattern_match_text = "" + self.download_updates_identifier_text = "" + self.apply_updates_identifier_text = "" + self.enable_on_reboot_identifier_text = "" + self.enable_on_reboot_check_cmd = '' + self.installation_state_identifier_text = "" + self.install_check_cmd = "" + self.apply_updates_enabled = "Enabled" + self.apply_updates_disabled = "Disabled" + self.apply_updates_unknown = "Unknown" + + # commands for DNF Automatic updates service + self.__init_constants_for_dnf_automatic() + + # Miscellaneous + self.set_package_manager_setting(Constants.PKG_MGR_SETTING_IDENTITY, Constants.TDNF) + self.STR_TOTAL_DOWNLOAD_SIZE = "Total download size: " + self.version_comparator = VersionComparator() + + # if an Auto Patching request comes in on an Azure Linux machine with Security and/or Critical classifications selected, we need to install all patches, since classifications aren't available in the Azure Linux repository + installation_included_classifications = [] if execution_config.included_classifications_list is None else execution_config.included_classifications_list + if execution_config.health_store_id is not str() and execution_config.operation.lower() == Constants.INSTALLATION.lower() \ + and Constants.AZURE_LINUX in str(env_layer.platform.linux_distribution()) \ + and 'Critical' in installation_included_classifications and 
'Security' in installation_included_classifications: + self.composite_logger.log_debug("Updating classifications list to install all patches for the Auto Patching request since classification based patching is not available on Azure Linux machines") + execution_config.included_classifications_list = [Constants.PackageClassification.CRITICAL, Constants.PackageClassification.SECURITY, Constants.PackageClassification.OTHER] + + self.package_install_expected_avg_time_in_seconds = 90 # Setting a default value of 90 seconds as the avg time to install a package using tdnf, might be changed later if needed. + + def refresh_repo(self): + self.composite_logger.log("[TDNF] Refreshing local repo...") + self.invoke_package_manager(self.cmd_clean_cache) + self.invoke_package_manager(self.cmd_repo_refresh) + + # region Get Available Updates + def invoke_package_manager_advanced(self, command, raise_on_exception=True): + """Get missing updates using the command input""" + self.composite_logger.log_verbose("[TDNF] Invoking package manager. [Command={0}]".format(str(command))) + code, out = self.env_layer.run_command_output(command, False, False) + + if code is self.tdnf_exitcode_ok or \ + (any(command_expecting_no_action_exitcode in command for command_expecting_no_action_exitcode in self.commands_expecting_no_action_exitcode) and + code is self.tdnf_exitcode_on_no_action_for_install_update): + self.composite_logger.log_debug('[TDNF] Invoked package manager. [Command={0}][Code={1}][Output={2}]'.format(command, str(code), str(out))) + else: + self.composite_logger.log_warning('[ERROR] Customer environment error. [Command={0}][Code={1}][Output={2}]'.format(command, str(code), str(out))) + error_msg = "Customer environment error: Investigate and resolve unexpected return code ({0}) from package manager on command: {1}".format(str(code), command) + self.status_handler.add_error_to_status(error_msg, Constants.PatchOperationErrorCodes.PACKAGE_MANAGER_FAILURE) + if raise_on_exception: + raise Exception(error_msg, "[{0}]".format(Constants.ERROR_ADDED_TO_STATUS)) + + return out, code + + # region Classification-based (incl. All) update check + def get_all_updates(self, cached=False): + """Get all missing updates""" + self.composite_logger.log_verbose("[TDNF] Discovering all packages...") + if cached and not len(self.all_updates_cached) == 0: + self.composite_logger.log_debug("[TDNF] Get all updates : [Cached={0}][PackagesCount={1}]]".format(str(cached), len(self.all_updates_cached))) + return self.all_updates_cached, self.all_update_versions_cached # allows for high performance reuse in areas of the code explicitly aware of the cache + + out = self.invoke_package_manager(self.tdnf_check) + self.all_updates_cached, self.all_update_versions_cached = self.extract_packages_and_versions(out) + + self.composite_logger.log_debug("[TDNF] Get all updates : [Cached={0}][PackagesCount={1}]]".format(str(False), len(self.all_updates_cached))) + return self.all_updates_cached, self.all_update_versions_cached + + def get_security_updates(self): + """Get missing security updates. NOTE: Classification based categorization of patches is not available in Azure Linux as of now""" + self.composite_logger.log_verbose("[TDNF] Discovering 'security' packages...") + security_packages, security_package_versions = [], [] + self.composite_logger.log_debug("[TDNF] Discovered 'security' packages. 
[Count={0}]".format(len(security_packages))) + return security_packages, security_package_versions + + def get_other_updates(self): + """Get missing other updates. + NOTE: This function will return all available packages since Azure Linux does not support package classification in it's repository""" + self.composite_logger.log_verbose("[TDNF] Discovering 'other' packages...") + other_packages, other_package_versions = [], [] + + all_packages, all_package_versions = self.get_all_updates(True) + + self.composite_logger.log_debug("[TDNF] Discovered 'other' packages. [Count={0}]".format(len(other_packages))) + return all_packages, all_package_versions + + def set_max_patch_publish_date(self, max_patch_publish_date=str()): + pass + # endregion + + # region Output Parser(s) + def extract_packages_and_versions(self, output): + """Returns packages and versions from given output""" + packages, versions = self.extract_packages_and_versions_including_duplicates(output) + packages, versions = self.dedupe_update_packages_to_get_latest_versions(packages, versions) + return packages, versions + + def extract_packages_and_versions_including_duplicates(self, output): + """Returns packages and versions from given output""" + self.composite_logger.log_verbose("[TDNF] Extracting package and version data...") + packages, versions = [], [] + + lines = output.strip().split('\n') + + for line_index in range(0, len(lines)): + # Do not install Obsoleting Packages. The obsoleting packages list comes towards end in the output. + if lines[line_index].strip().startswith("Obsoleting"): + break + + line = re.split(r'\s+', lines[line_index].strip()) + + # If we run into a length of 3, we'll accept it and continue + if len(line) == 3 and self.__is_package(line[0]): + packages.append(self.get_product_name(line[0])) + versions.append(line[1]) + else: + self.composite_logger.log_verbose("[TDNF] > Inapplicable line (" + str(line_index) + "): " + lines[line_index]) + + return packages, versions + + def dedupe_update_packages_to_get_latest_versions(self, packages, package_versions): + """Remove duplicate packages and returns the latest/highest version of each package """ + deduped_packages = [] + deduped_package_versions = [] + + for index, package in enumerate(packages): + if package in deduped_packages: + deduped_package_version = deduped_package_versions[deduped_packages.index(package)] + duplicate_package_version = package_versions[index] + # use custom comparator output 0 (equal), -1 (deduped package version is the lower one), +1 (deduped package version is the greater one) + is_deduped_package_latest = self.version_comparator.compare_versions(deduped_package_version, duplicate_package_version) + if is_deduped_package_latest < 0: + deduped_package_versions[deduped_packages.index(package)] = duplicate_package_version + continue + + deduped_packages.append(package) + deduped_package_versions.append(package_versions[index]) + + return deduped_packages, deduped_package_versions + + @staticmethod + def __is_package(chunk): + # Using a list comprehension to determine if chunk is a package + package_extensions = Constants.SUPPORTED_PACKAGE_ARCH + return len([p for p in package_extensions if p in chunk]) == 1 + # endregion + # endregion + + # region Install Updates + def get_composite_package_identifier(self, package, package_version): + package_without_arch, arch = self.get_product_name_and_arch(package) + package_identifier = package_without_arch + '-' + str(package_version) + if arch is not None: + package_identifier += arch + 
return package_identifier + + def install_updates_fail_safe(self, excluded_packages): + return + + def install_security_updates_azgps_coordinated(self): + pass + # endregion + + # region Package Information + def get_all_available_versions_of_package(self, package_name): + """ Returns a list of all the available versions of a package """ + # Sample output format + # Loaded plugin: tdnfrepogpgcheck + # azurelinux-repos-shared.noarch 3.0-3.azl3 azurelinux-official-base + # azurelinux-repos-shared.noarch 3.0-4.azl3 azurelinux-official-base + cmd = self.single_package_check_versions.replace('<PACKAGE-NAME>', package_name) + output = self.invoke_package_manager(cmd) + packages, package_versions = self.extract_packages_and_versions_including_duplicates(output) + return package_versions + + def is_package_version_installed(self, package_name, package_version): + """ Returns true if the specific package version is installed """ + # Sample output format + # Loaded plugin: tdnfrepogpgcheck + # azurelinux-repos-shared.noarch 3.0-3.azl3 @System + self.composite_logger.log_verbose("[TDNF] Checking package install status. [PackageName={0}][PackageVersion={1}]".format(str(package_name), str(package_version))) + cmd = self.single_package_check_installed.replace('<PACKAGE-NAME>', package_name) + output = self.invoke_package_manager(cmd) + packages, package_versions = self.extract_packages_and_versions_including_duplicates(output) + + for index, package in enumerate(packages): + if package == package_name and (package_versions[index] == package_version): + self.composite_logger.log_debug("[TDNF] > Installed version match found. [PackageName={0}][PackageVersion={1}]".format(str(package_name), str(package_version))) + return True + else: + self.composite_logger.log_verbose("[TDNF] > Did not match: " + package + " (" + package_versions[index] + ")") + + # sometimes packages are removed entirely from the system during installation of other packages + # so let's check that the package is still needed before + self.composite_logger.log_debug("[TDNF] > Installed version match NOT found. [PackageName={0}][PackageVersion={1}]".format(str(package_name), str(package_version))) + return False + + def extract_dependencies(self, output, packages): + # Extracts dependent packages from output. + # sample output + # Loaded plugin: tdnfrepogpgcheck + # + # Upgrading: + # python3 x86_64 3.12.3-5.azl3 azurelinux-official-base 44.51k 36.89k + # python3-curses x86_64 3.12.3-5.azl3 azurelinux-official-base 165.62k 71.64k + # python3-libs x86_64 3.12.3-5.azl3 azurelinux-official-base 36.05M 10.52M + # + # Total installed size: 36.26M + # Total download size: 10.62M + # Error(1032) : Operation aborted. 
+ dependencies = [] + package_arch_to_look_for = ["x86_64", "noarch", "i686", "aarch64"] # if this is changed, review Constants + + lines = output.strip().splitlines() + + for line_index in range(0, len(lines)): + line = re.split(r'\s+', lines[line_index].strip()) + dependent_package_name = "" + + if self.is_valid_update(line, package_arch_to_look_for): + dependent_package_name = self.get_product_name_with_arch(line, package_arch_to_look_for) + else: + self.composite_logger.log_verbose("[TDNF] > Inapplicable line: " + str(line)) + continue + + if len(dependent_package_name) != 0 and dependent_package_name not in packages and dependent_package_name not in dependencies: + self.composite_logger.log_verbose("[TDNF] > Dependency detected: " + dependent_package_name) + dependencies.append(dependent_package_name) + + return dependencies + + def add_arch_dependencies(self, package_manager, package, version, packages, package_versions, package_and_dependencies, package_and_dependency_versions): + """ + Add the packages with same name as that of input parameter package but with different architectures from packages list to the list package_and_dependencies. + Parameters: + package_manager (PackageManager): Package manager used. + package (string): Input package for which same package name but different architecture need to be added in the list package_and_dependencies. + version (string): version of the package. + packages (List of strings): List of all packages selected by user to install. + package_versions (List of strings): Versions of packages in packages list. + package_and_dependencies (List of strings): List of packages along with dependencies. This function adds packages with same name as input parameter package + but different architecture in this list. + package_and_dependency_versions (List of strings): Versions of packages in package_and_dependencies. + """ + package_name_without_arch = package_manager.get_product_name_without_arch(package) + for possible_arch_dependency, possible_arch_dependency_version in zip(packages, package_versions): + if package_manager.get_product_name_without_arch(possible_arch_dependency) == package_name_without_arch and possible_arch_dependency not in package_and_dependencies and possible_arch_dependency_version == version: + package_and_dependencies.append(possible_arch_dependency) + package_and_dependency_versions.append(possible_arch_dependency_version) + + def is_valid_update(self, package_details_in_output, package_arch_to_look_for): + # Verifies whether the line under consideration (i.e. package_details_in_output) contains relevant package details. + # package_details_in_output will be of the following format if it is valid + # Sample package details in TDNF: + # python3-libs x86_64 3.12.3-5.azl3 azurelinux-official-base 36.05M 10.52M + return len(package_details_in_output) == 6 and self.is_arch_in_package_details(package_details_in_output[1], package_arch_to_look_for) + + @staticmethod + def is_arch_in_package_details(package_detail, package_arch_to_look_for): + # Using a list comprehension to determine if chunk is a package + return len([p for p in package_arch_to_look_for if p in package_detail]) == 1 + + def get_dependent_list(self, packages): + """Returns dependent List for the list of packages""" + package_names = "" + for index, package in enumerate(packages): + if index != 0: + package_names += ' ' + package_names += package + + self.composite_logger.log_verbose("[TDNF] Resolving dependencies. 
[Command={0}]".format(str(self.single_package_upgrade_simulation_cmd + package_names))) + output = self.invoke_package_manager(self.single_package_upgrade_simulation_cmd + package_names) + dependencies = self.extract_dependencies(output, packages) + self.composite_logger.log_verbose("[TDNF] Resolved dependencies. [Packages={0}][DependencyCount={1}]".format(str(packages), len(dependencies))) + return dependencies + + def get_product_name(self, package_name): + """Retrieve package name """ + return package_name + + def get_product_name_and_arch(self, package_name): + """Splits out product name and architecture - if this is changed, modify in PackageFilter also""" + architectures = Constants.SUPPORTED_PACKAGE_ARCH + for arch in architectures: + if package_name.endswith(arch): + return package_name[:-len(arch)], arch + return package_name, None + + def get_product_name_without_arch(self, package_name): + """Retrieve product name only""" + product_name, arch = self.get_product_name_and_arch(package_name) + return product_name + + def get_product_arch(self, package_name): + """Retrieve product architecture only""" + product_name, arch = self.get_product_name_and_arch(package_name) + return arch + + def get_product_name_with_arch(self, package_detail, package_arch_to_look_for): + """Retrieve product name with arch separated by '.'. Note: This format is default in tdnf. Refer samples noted within func extract_dependencies() for more clarity""" + return package_detail[0] + "." + package_detail[1] if package_detail[1] in package_arch_to_look_for else package_detail[1] + + def get_package_size(self, output): + """Retrieve package size from installation output string""" + # Sample output line: + # Total download size: 15 M + if "Nothing to do" not in output: + lines = output.strip().split('\n') + for line in lines: + if line.find(self.STR_TOTAL_DOWNLOAD_SIZE) >= 0: + return line.replace(self.STR_TOTAL_DOWNLOAD_SIZE, "") + + return Constants.UNKNOWN_PACKAGE_SIZE + # endregion + + # region auto OS updates + def __init_constants_for_dnf_automatic(self): + self.dnf_automatic_configuration_file_path = '/etc/dnf/automatic.conf' + self.dnf_automatic_install_check_cmd = 'systemctl list-unit-files --type=service | grep dnf-automatic.service' # list-unit-files returns installed services, ref: https://www.freedesktop.org/software/systemd/man/systemctl.html#Unit%20File%20Commands + self.dnf_automatic_enable_on_reboot_check_cmd = 'systemctl is-enabled dnf-automatic.timer' + self.dnf_automatic_disable_on_reboot_cmd = 'systemctl disable dnf-automatic.timer' + self.dnf_automatic_config_pattern_match_text = ' = (no|yes)' + self.dnf_automatic_download_updates_identifier_text = 'download_updates' + self.dnf_automatic_apply_updates_identifier_text = 'apply_updates' + self.dnf_automatic_enable_on_reboot_identifier_text = "enable_on_reboot" + self.dnf_automatic_installation_state_identifier_text = "installation_state" + self.dnf_auto_os_update_service = "dnf-automatic" + + def get_current_auto_os_patch_state(self): + """ Gets the current auto OS update patch state on the machine """ + self.composite_logger.log("Fetching the current automatic OS patch state on the machine...") + + current_auto_os_patch_state_for_dnf_automatic = self.__get_current_auto_os_patch_state_for_dnf_automatic() + + self.composite_logger.log("OS patch state per auto OS update service: [dnf-automatic={0}]".format(str(current_auto_os_patch_state_for_dnf_automatic))) + + if current_auto_os_patch_state_for_dnf_automatic == 
Constants.AutomaticOSPatchStates.ENABLED: + current_auto_os_patch_state = Constants.AutomaticOSPatchStates.ENABLED + elif current_auto_os_patch_state_for_dnf_automatic == Constants.AutomaticOSPatchStates.DISABLED: + current_auto_os_patch_state = Constants.AutomaticOSPatchStates.DISABLED + else: + current_auto_os_patch_state = Constants.AutomaticOSPatchStates.UNKNOWN + + self.composite_logger.log_debug("Overall Auto OS Patch State based on all auto OS update service states [OverallAutoOSPatchState={0}]".format(str(current_auto_os_patch_state))) + return current_auto_os_patch_state + + def __get_current_auto_os_patch_state_for_dnf_automatic(self): + """ Gets current auto OS update patch state for dnf-automatic """ + self.composite_logger.log_debug("Fetching current automatic OS patch state in dnf-automatic service. This includes checks on whether the service is installed, current auto patch enable state and whether it is set to enable on reboot") + self.__init_auto_update_for_dnf_automatic() + is_service_installed, enable_on_reboot_value, download_updates_value, apply_updates_value = self.__get_current_auto_os_updates_setting_on_machine() + + apply_updates = self.__get_extension_standard_value_for_apply_updates(apply_updates_value) + + if apply_updates == self.apply_updates_enabled or enable_on_reboot_value: + return Constants.AutomaticOSPatchStates.ENABLED + # OS patch state is considered to be disabled: a) if it was successfully disabled or b) if the service is not installed + elif not is_service_installed or (apply_updates == self.apply_updates_disabled and not enable_on_reboot_value): + return Constants.AutomaticOSPatchStates.DISABLED + else: + return Constants.AutomaticOSPatchStates.UNKNOWN + + def __init_auto_update_for_dnf_automatic(self): + """ Initializes all generic auto OS update variables with the config values for dnf automatic service """ + self.os_patch_configuration_settings_file_path = self.dnf_automatic_configuration_file_path + self.download_updates_identifier_text = self.dnf_automatic_download_updates_identifier_text + self.apply_updates_identifier_text = self.dnf_automatic_apply_updates_identifier_text + self.enable_on_reboot_identifier_text = self.dnf_automatic_enable_on_reboot_identifier_text + self.installation_state_identifier_text = self.dnf_automatic_installation_state_identifier_text + self.auto_update_config_pattern_match_text = self.dnf_automatic_config_pattern_match_text + self.enable_on_reboot_check_cmd = self.dnf_automatic_enable_on_reboot_check_cmd + self.install_check_cmd = self.dnf_automatic_install_check_cmd + self.current_auto_os_update_service = self.dnf_auto_os_update_service + + def __get_current_auto_os_updates_setting_on_machine(self): + """ Gets all the update settings related to auto OS updates currently set on the machine """ + try: + download_updates_value = "" + apply_updates_value = "" + is_service_installed = False + enable_on_reboot_value = False + + # get install state + if not self.is_auto_update_service_installed(self.install_check_cmd): + return is_service_installed, enable_on_reboot_value, download_updates_value, apply_updates_value + + is_service_installed = True + enable_on_reboot_value = self.is_service_set_to_enable_on_reboot(self.enable_on_reboot_check_cmd) + + self.composite_logger.log_debug("[TDNF] Checking if auto updates are currently enabled...") + image_default_patch_configuration = self.env_layer.file_system.read_with_retry(self.os_patch_configuration_settings_file_path, raise_if_not_found=False) + if 
image_default_patch_configuration is not None: + settings = image_default_patch_configuration.strip().split('\n') + for setting in settings: + match = re.search(self.download_updates_identifier_text + self.auto_update_config_pattern_match_text, str(setting)) + if match is not None: + download_updates_value = match.group(1) + + match = re.search(self.apply_updates_identifier_text + self.auto_update_config_pattern_match_text, str(setting)) + if match is not None: + apply_updates_value = match.group(1) + + if download_updates_value == "": + self.composite_logger.log_debug("[TDNF] Machine did not have any value set for [Setting={0}]".format(str(self.download_updates_identifier_text))) + else: + self.composite_logger.log_verbose("[TDNF] Current value set for [{0}={1}]".format(str(self.download_updates_identifier_text), str(download_updates_value))) + + if apply_updates_value == "": + self.composite_logger.log_debug("[TDNF] Machine did not have any value set for [Setting={0}]".format(str(self.apply_updates_identifier_text))) + else: + self.composite_logger.log_verbose("[TDNF] Current value set for [{0}={1}]".format(str(self.apply_updates_identifier_text), str(apply_updates_value))) + + return is_service_installed, enable_on_reboot_value, download_updates_value, apply_updates_value + + except Exception as error: + raise Exception("[TDNF] Error occurred in fetching current auto OS update settings from the machine. [Exception={0}]".format(repr(error))) + + def is_auto_update_service_installed(self, install_check_cmd): + """ Checks if the auto update service is enable_on_reboot on the VM """ + code, out = self.env_layer.run_command_output(install_check_cmd, False, False) + self.composite_logger.log_debug("[TDNF] Checked if auto update service is installed. [Command={0}][Code={1}][Output={2}]".format(install_check_cmd, str(code), out)) + if len(out.strip()) > 0 and code == 0: + self.composite_logger.log_debug("[TDNF] > Auto OS update service is installed on the machine") + return True + else: + self.composite_logger.log_debug("[TDNF] > Auto OS update service is NOT installed on the machine") + return False + + def is_service_set_to_enable_on_reboot(self, command): + """ Checking if auto update is enable_on_reboot on the machine. An enable_on_reboot service will be activated (if currently inactive) on machine reboot """ + code, out = self.env_layer.run_command_output(command, False, False) + self.composite_logger.log_debug("[TDNF] Checked if auto update service is set to enable on reboot. 
[Code={0}][Out={1}]".format(str(code), out)) + if len(out.strip()) > 0 and code == 0 and 'enabled' in out: + self.composite_logger.log_debug("[TDNF] > Auto OS update service will enable on reboot") + return True + self.composite_logger.log_debug("[TDNF] > Auto OS update service will NOT enable on reboot") + return False + + def __get_extension_standard_value_for_apply_updates(self, apply_updates_value): + if apply_updates_value.lower() == 'yes' or apply_updates_value.lower() == 'true': + return self.apply_updates_enabled + elif apply_updates_value.lower() == 'no' or apply_updates_value.lower() == 'false': + return self.apply_updates_disabled + else: + return self.apply_updates_unknown + + def disable_auto_os_update(self): + """ Disables auto OS updates on the machine only if they are enabled and logs the default settings the machine comes with """ + try: + self.composite_logger.log_verbose("[TDNF] Disabling auto OS updates in all identified services...") + self.disable_auto_os_update_for_dnf_automatic() + self.composite_logger.log_debug("[TDNF] Successfully disabled auto OS updates") + + except Exception as error: + self.composite_logger.log_error("[TDNF] Could not disable auto OS updates. [Error={0}]".format(repr(error))) + raise + + def disable_auto_os_update_for_dnf_automatic(self): + """ Disables auto OS updates, using dnf-automatic service, and logs the default settings the machine comes with """ + self.composite_logger.log_verbose("[TDNF] Disabling auto OS updates using dnf-automatic") + self.__init_auto_update_for_dnf_automatic() + + self.backup_image_default_patch_configuration_if_not_exists() + + if not self.is_auto_update_service_installed(self.dnf_automatic_install_check_cmd): + self.composite_logger.log_debug("[TDNF] Cannot disable as dnf-automatic is not installed on the machine") + return + + self.composite_logger.log_verbose("[TDNF] Preemptively disabling auto OS updates using dnf-automatic") + self.update_os_patch_configuration_sub_setting(self.download_updates_identifier_text, "no", self.dnf_automatic_config_pattern_match_text) + self.update_os_patch_configuration_sub_setting(self.apply_updates_identifier_text, "no", self.dnf_automatic_config_pattern_match_text) + self.disable_auto_update_on_reboot(self.dnf_automatic_disable_on_reboot_cmd) + + self.composite_logger.log_debug("[TDNF] Successfully disabled auto OS updates using dnf-automatic") + + def disable_auto_update_on_reboot(self, command): + self.composite_logger.log_verbose("[TDNF] Disabling auto update on reboot. [Command={0}] ".format(command)) + code, out = self.env_layer.run_command_output(command, False, False) + + if code != 0: + self.composite_logger.log_error("[TDNF][ERROR] Error disabling auto update on reboot. [Command={0}][Code={1}][Output={2}]".format(command, str(code), out)) + error_msg = 'Unexpected return code (' + str(code) + ') on command: ' + command + self.status_handler.add_error_to_status(error_msg, Constants.PatchOperationErrorCodes.OPERATION_FAILED) + raise Exception(error_msg, "[{0}]".format(Constants.ERROR_ADDED_TO_STATUS)) + else: + self.composite_logger.log_debug("[TDNF] Disabled auto update on reboot. [Command={0}][Code={1}][Output={2}]".format(command, str(code), out)) + + def backup_image_default_patch_configuration_if_not_exists(self): + """ Records the default system settings for auto OS updates within patch extension artifacts for future reference. 
+ We only log the default system settings a VM comes with, any subsequent updates will not be recorded""" + """ JSON format for backup file: + { + "dnf-automatic": { + "apply_updates": "yes/no/empty string", + "download_updates": "yes/no/empty string", + "enable_on_reboot": true/false, + "install_state": true/false + } + } """ + try: + self.composite_logger.log_debug("[TDNF] Ensuring there is a backup of the default patch state for [AutoOSUpdateService={0}]".format(str(self.current_auto_os_update_service))) + image_default_patch_configuration_backup = {} + + # read existing backup since it also contains backup from other update services. We need to preserve any existing data within the backup file + if self.image_default_patch_configuration_backup_exists(): + try: + image_default_patch_configuration_backup = json.loads(self.env_layer.file_system.read_with_retry(self.image_default_patch_configuration_backup_path)) + except Exception as error: + self.composite_logger.log_error("Unable to read backup for default patch state. Will attempt to re-write. [Exception={0}]".format(repr(error))) + + # verify if existing backup is valid if not, write to backup + is_backup_valid = self.is_image_default_patch_configuration_backup_valid(image_default_patch_configuration_backup) + if is_backup_valid: + self.composite_logger.log_debug("[TDNF] Since extension has a valid backup, no need to log the current settings again. [Default Auto OS update settings={0}] [File path={1}]" + .format(str(image_default_patch_configuration_backup), self.image_default_patch_configuration_backup_path)) + else: + self.composite_logger.log_debug("[TDNF] Since the backup is invalid, will add a new backup with the current auto OS update settings") + self.composite_logger.log_debug("[TDNF] Fetching current auto OS update settings for [AutoOSUpdateService={0}]".format(str(self.current_auto_os_update_service))) + is_service_installed, enable_on_reboot_value, download_updates_value, apply_updates_value = self.__get_current_auto_os_updates_setting_on_machine() + + backup_image_default_patch_configuration_json_to_add = { + self.current_auto_os_update_service: { + self.download_updates_identifier_text: download_updates_value, + self.apply_updates_identifier_text: apply_updates_value, + self.enable_on_reboot_identifier_text: enable_on_reboot_value, + self.installation_state_identifier_text: is_service_installed + } + } + + image_default_patch_configuration_backup.update(backup_image_default_patch_configuration_json_to_add) + + self.composite_logger.log_debug("[TDNF] Logging default system configuration settings for auto OS updates. [Settings={0}] [Log file path={1}]" + .format(str(image_default_patch_configuration_backup), self.image_default_patch_configuration_backup_path)) + self.env_layer.file_system.write_with_retry(self.image_default_patch_configuration_backup_path, '{0}'.format(json.dumps(image_default_patch_configuration_backup)), mode='w+') + except Exception as error: + error_message = "[TDNF] Exception during fetching and logging default auto update settings on the machine. 
[Exception={0}]".format(repr(error)) + self.composite_logger.log_error(error_message) + self.status_handler.add_error_to_status(error_message, Constants.PatchOperationErrorCodes.DEFAULT_ERROR) + raise + + def is_image_default_patch_configuration_backup_valid(self, image_default_patch_configuration_backup): + """ Verifies if default auto update configurations, for a service under consideration, are saved in backup """ + return self.is_backup_valid_for_dnf_automatic(image_default_patch_configuration_backup) + + def is_backup_valid_for_dnf_automatic(self, image_default_patch_configuration_backup): + if self.dnf_auto_os_update_service in image_default_patch_configuration_backup \ + and self.dnf_automatic_download_updates_identifier_text in image_default_patch_configuration_backup[self.dnf_auto_os_update_service] \ + and self.dnf_automatic_apply_updates_identifier_text in image_default_patch_configuration_backup[self.dnf_auto_os_update_service] \ + and self.dnf_automatic_enable_on_reboot_identifier_text in image_default_patch_configuration_backup[self.dnf_auto_os_update_service] \ + and self.dnf_automatic_installation_state_identifier_text in image_default_patch_configuration_backup[self.dnf_auto_os_update_service]: + self.composite_logger.log_debug("[TDNF] Extension has a valid backup for default dnf-automatic configuration settings") + return True + else: + self.composite_logger.log_debug("[TDNF] Extension does not have a valid backup for default dnf-automatic configuration settings") + return False + + def update_os_patch_configuration_sub_setting(self, patch_configuration_sub_setting, value="no", config_pattern_match_text=""): + """ Updates (or adds if it doesn't exist) the given patch_configuration_sub_setting with the given value in os_patch_configuration_settings_file """ + try: + # note: adding space between the patch_configuration_sub_setting and value since, we will have to do that if we have to add a patch_configuration_sub_setting that did not exist before + self.composite_logger.log_debug("[TDNF] Updating system configuration settings for auto OS updates. [Patch Configuration Sub Setting={0}] [Value={1}]".format(str(patch_configuration_sub_setting), value)) + os_patch_configuration_settings = self.env_layer.file_system.read_with_retry(self.os_patch_configuration_settings_file_path) + patch_configuration_sub_setting_to_update = patch_configuration_sub_setting + ' = ' + value + patch_configuration_sub_setting_found_in_file = False + updated_patch_configuration_sub_setting = "" + settings = os_patch_configuration_settings.strip().split('\n') + + # update value of existing setting + for i in range(len(settings)): + match = re.search(patch_configuration_sub_setting + config_pattern_match_text, settings[i]) + if match is not None: + settings[i] = patch_configuration_sub_setting_to_update + patch_configuration_sub_setting_found_in_file = True + updated_patch_configuration_sub_setting += settings[i] + "\n" + + # add setting to configuration file, since it doesn't exist + if not patch_configuration_sub_setting_found_in_file: + updated_patch_configuration_sub_setting += patch_configuration_sub_setting_to_update + "\n" + + self.env_layer.file_system.write_with_retry(self.os_patch_configuration_settings_file_path, '{0}'.format(updated_patch_configuration_sub_setting.lstrip()), mode='w+') + except Exception as error: + error_msg = "[TDNF] Error occurred while updating system configuration settings for auto OS updates. 
[Patch Configuration={0}] [Error={1}]".format(str(patch_configuration_sub_setting), repr(error)) + self.composite_logger.log_error(error_msg) + self.status_handler.add_error_to_status(error_msg, Constants.PatchOperationErrorCodes.DEFAULT_ERROR) + raise + + # endregion + + # region Reboot Management + def is_reboot_pending(self): + """ Checks if there is a pending reboot on the machine. """ + try: + pending_file_exists = os.path.isfile(self.REBOOT_PENDING_FILE_PATH) + pending_processes_exist = self.do_processes_require_restart() + self.composite_logger.log_debug("[TDNF] > Reboot required debug flags (tdnf): " + str(pending_file_exists) + ", " + str(pending_processes_exist) + ".") + return pending_file_exists or pending_processes_exist + except Exception as error: + self.composite_logger.log_error('[TDNF] Error while checking for reboot pending (tdnf): ' + repr(error)) + return True # defaults for safety + + def do_processes_require_restart(self): + """Signals whether processes require a restart due to updates""" + self.composite_logger.log_verbose("[TDNF] Checking if process requires reboot") + # Checking using dnf-utils + code, out = self.env_layer.run_command_output(self.dnf_utils_prerequisite, False, False) # idempotent, doesn't install if already present + self.composite_logger.log_verbose("[TDNF] Idempotent dnf-utils existence check. [Code={0}][Out={1}]".format(str(code), out)) + + # Checking for restart for distros with -r flag + code, out = self.env_layer.run_command_output(self.needs_restarting_with_flag, False, False) + self.composite_logger.log_verbose("[TDNF] > Code: " + str(code) + ", Output: \n|\t" + "\n|\t".join(out.splitlines())) + if out.find("Reboot is required") < 0: + self.composite_logger.log_debug("[TDNF] > Reboot not detected to be required (L1).") + else: + self.composite_logger.log_debug("[TDNF] > Reboot is detected to be required (L1).") + return True + + return False + # endregion + + def set_security_esm_package_status(self, operation, packages): + """ Set the security-ESM classification for the esm packages. Only needed for apt. No-op for tdnf, yum and zypper.""" + pass + + def separate_out_esm_packages(self, packages, package_versions): + """Filter out packages from the list where the version matches the UA_ESM_REQUIRED string. + Only needed for apt. No-op for tdnf, yum and zypper""" + esm_packages = [] + esm_package_versions = [] + esm_packages_found = False + + return packages, package_versions, esm_packages, esm_package_versions, esm_packages_found + + def get_package_install_expected_avg_time_in_seconds(self): + return self.package_install_expected_avg_time_in_seconds + diff --git a/src/core/src/package_managers/YumPackageManager.py b/src/core/src/package_managers/YumPackageManager.py index 60df32c0..1773fb75 100644 --- a/src/core/src/package_managers/YumPackageManager.py +++ b/src/core/src/package_managers/YumPackageManager.py @@ -1018,14 +1018,14 @@ def add_arch_dependencies(self, package_manager, package, version, packages, pac def set_security_esm_package_status(self, operation, packages): """ - Set the security-ESM classification for the esm packages. Only needed for apt. No-op for yum and zypper. + Set the security-ESM classification for the esm packages. Only needed for apt. No-op for tdnf, yum and zypper. """ pass def separate_out_esm_packages(self, packages, package_versions): """ Filter out packages from the list where the version matches the UA_ESM_REQUIRED string. - Only needed for apt. No-op for yum and zypper + Only needed for apt. 
No-op for tdnf, yum and zypper """ esm_packages = [] esm_package_versions = [] diff --git a/src/core/src/package_managers/ZypperPackageManager.py b/src/core/src/package_managers/ZypperPackageManager.py index 1b6d2c60..574d3fd3 100644 --- a/src/core/src/package_managers/ZypperPackageManager.py +++ b/src/core/src/package_managers/ZypperPackageManager.py @@ -821,14 +821,14 @@ def add_arch_dependencies(self, package_manager, package, version, packages, pac def set_security_esm_package_status(self, operation, packages): """ - Set the security-ESM classification for the esm packages. Only needed for apt. No-op for yum and zypper. + Set the security-ESM classification for the esm packages. Only needed for apt. No-op for tdnf, yum and zypper. """ pass def separate_out_esm_packages(self, packages, package_versions): """ Filter out packages from the list where the version matches the UA_ESM_REQUIRED string. - Only needed for apt. No-op for yum and zypper + Only needed for apt. No-op for tdnf, yum and zypper """ esm_packages = [] esm_package_versions = [] diff --git a/src/core/tests/Test_CoreMain.py b/src/core/tests/Test_CoreMain.py index 682787d2..daee38d3 100644 --- a/src/core/tests/Test_CoreMain.py +++ b/src/core/tests/Test_CoreMain.py @@ -47,6 +47,9 @@ def mock_linux_distribution_to_return_centos(self): def mock_linux_distribution_to_return_redhat(self): return ['Red Hat Enterprise Linux Server', '7.5', 'Maipo'] + def mock_linux_distribution_to_return_azure_linux(self): + return ['Microsoft Azure Linux', '3.0', ''] + def mock_os_remove(self, file_to_remove): raise Exception("File could not be deleted") @@ -637,6 +640,56 @@ def test_install_only_critical_and_security_packages_for_redhat_autopatching_war LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution = backup_envlayer_platform_linux_distribution + def test_install_all_packages_for_azure_linux_autopatching(self): + """Unit test for auto patching request on Azure Linux, should install all patches irrespective of classification""" + + backup_envlayer_platform_linux_distribution = LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution + LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution = self.mock_linux_distribution_to_return_azure_linux + + argument_composer = ArgumentComposer() + classifications_to_include = ["Security", "Critical"] + argument_composer.health_store_id = str("pub_off_sku_2025.03.24") + argument_composer.classifications_to_include = classifications_to_include + argument_composer.reboot_setting = 'Always' + runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.TDNF) + runtime.set_legacy_test_type("HappyPath") + CoreMain(argument_composer.get_composed_arguments()) + + # check telemetry events + self.__check_telemetry_events(runtime) + + # check status file + with runtime.env_layer.file_system.open(runtime.execution_config.status_file_path, 'r') as file_handle: + substatus_file_data = json.load(file_handle)[0]["status"]["substatus"] + self.assertEqual(len(substatus_file_data), 4) + self.assertTrue(substatus_file_data[0]["name"] == Constants.PATCH_ASSESSMENT_SUMMARY) + self.assertTrue(substatus_file_data[0]["status"].lower() == Constants.STATUS_SUCCESS.lower()) + self.assertTrue(substatus_file_data[1]["name"] == Constants.PATCH_INSTALLATION_SUMMARY) + self.assertTrue(substatus_file_data[1]["status"].lower() == Constants.STATUS_SUCCESS.lower()) + self.assertTrue(json.loads(substatus_file_data[1]["formattedMessage"]["message"])["installedPatchCount"] == 9) + 
self.assertEqual(json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][1]["name"], "azurelinux-repos-ms-oss.noarch") + self.assertTrue("Other" in str(json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][1]["classifications"])) + self.assertTrue("Installed" == json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][1]["patchInstallationState"]) + self.assertEqual(json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][2]["name"], "libseccomp.x86_64") + self.assertTrue("Other" in str(json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][2]["classifications"])) + self.assertTrue("Installed" == json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][2]["patchInstallationState"]) + self.assertEqual(json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][0]["name"], "azurelinux-release.noarch") + self.assertTrue("Other" in str(json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][0]["classifications"])) + self.assertTrue("Installed" == json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][0]["patchInstallationState"]) + self.assertEqual(json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][3]["name"], "python3.x86_64") + self.assertTrue("Other" in str(json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][3]["classifications"])) + self.assertTrue("Installed" == json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][3]["patchInstallationState"]) + self.assertTrue(substatus_file_data[2]["name"] == Constants.PATCH_METADATA_FOR_HEALTHSTORE) + self.assertTrue(substatus_file_data[2]["status"].lower() == Constants.STATUS_SUCCESS.lower()) + substatus_file_data_patch_metadata_summary = json.loads(substatus_file_data[2]["formattedMessage"]["message"]) + self.assertEqual(substatus_file_data_patch_metadata_summary["patchVersion"], "pub_off_sku_2025.03.24") + self.assertTrue(substatus_file_data_patch_metadata_summary["shouldReportToHealthStore"]) + self.assertTrue(substatus_file_data[3]["name"] == Constants.CONFIGURE_PATCHING_SUMMARY) + self.assertTrue(substatus_file_data[3]["status"].lower() == Constants.STATUS_SUCCESS.lower()) + runtime.stop() + + LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution = backup_envlayer_platform_linux_distribution + # test with both assessment mode and patch mode set in configure patching or install patches or assess patches or auto assessment def test_auto_assessment_success_with_configure_patching_in_prev_operation_on_same_sequence(self): """Unit test for auto assessment request with configure patching completed on the sequence before. Result: should retain prev substatus and update only PatchAssessmentSummary""" diff --git a/src/core/tests/Test_EnvLayer.py b/src/core/tests/Test_EnvLayer.py new file mode 100644 index 00000000..54ad3eb4 --- /dev/null +++ b/src/core/tests/Test_EnvLayer.py @@ -0,0 +1,87 @@ +# Copyright 2025 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +# Requires Python 2.7+ +import platform +import unittest +from core.src.bootstrap.EnvLayer import EnvLayer +from core.src.bootstrap.Constants import Constants + + +class TestExecutionConfig(unittest.TestCase): + def setUp(self): + self.envlayer = EnvLayer() + + def tearDown(self): + pass + + # region setup mocks + def mock_platform_system(self): + return 'Linux' + + def mock_linux_distribution(self): + return ['test', 'test', 'test'] + + def mock_linux_distribution_to_return_azure_linux(self): + return ['Microsoft Azure Linux', '3.0', ''] + + def mock_run_command_for_apt(self, cmd, no_output=False, chk_err=False): + if cmd.find("which apt-get") > -1: + return 0, '' + return -1, '' + + def mock_run_command_for_yum(self, cmd, no_output=False, chk_err=False): + if cmd.find("which yum") > -1: + return 0, '' + return -1, '' + + def mock_run_command_for_zypper(self, cmd, no_output=False, chk_err=False): + if cmd.find("which zypper") > -1: + return 0, '' + return -1, '' + + def mock_run_command_for_tdnf(self, cmd, no_output=False, chk_err=False): + if cmd.find("which tdnf") > -1: + return 0, '' + return -1, '' + # endregion + + def test_get_package_manager(self): + self.backup_platform_system = platform.system() + platform.system = self.mock_platform_system + self.backup_linux_distribution = self.envlayer.platform.linux_distribution + self.envlayer.platform.linux_distribution = self.mock_linux_distribution + self.backup_run_command_output = self.envlayer.run_command_output + + test_input_output_table = [ + [self.mock_run_command_for_apt, self.mock_linux_distribution, Constants.APT], + [self.mock_run_command_for_tdnf, self.mock_linux_distribution_to_return_azure_linux, Constants.TDNF], + [self.mock_run_command_for_yum, self.mock_linux_distribution_to_return_azure_linux, None], # check for Azure Linux machine which does not have tdnf + [self.mock_run_command_for_yum, self.mock_linux_distribution, Constants.YUM], + [self.mock_run_command_for_zypper, self.mock_linux_distribution, Constants.ZYPPER], + ] + + for row in test_input_output_table: + self.envlayer.run_command_output = row[0] + self.envlayer.platform.linux_distribution = row[1] + package_manager = self.envlayer.get_package_manager() + self.assertTrue(package_manager is row[2]) + + self.envlayer.run_command_output = self.backup_run_command_output + self.envlayer.platform.linux_distribution = self.backup_linux_distribution + platform.system = self.backup_platform_system + + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/src/core/tests/Test_TdnfPackageManager.py b/src/core/tests/Test_TdnfPackageManager.py new file mode 100644 index 00000000..ddaa9852 --- /dev/null +++ b/src/core/tests/Test_TdnfPackageManager.py @@ -0,0 +1,653 @@ +# Copyright 2025 Microsoft Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +# Requires Python 2.7+ +import json +import os +import unittest +from core.src.bootstrap.Constants import Constants +from core.tests.library.LegacyEnvLayerExtensions import LegacyEnvLayerExtensions +from core.tests.library.ArgumentComposer import ArgumentComposer +from core.tests.library.RuntimeCompositor import RuntimeCompositor + + +class TestTdnfPackageManager(unittest.TestCase): + def setUp(self): + self.runtime = RuntimeCompositor(ArgumentComposer().get_composed_arguments(), True, Constants.TDNF) + self.container = self.runtime.container + + def tearDown(self): + self.runtime.stop() + + # region Mocks + def mock_do_processes_require_restart_raise_exception(self): + raise Exception + + def mock_linux_distribution_to_return_azure_linux(self): + return ['Microsoft Azure Linux', '3.0', ''] + + def mock_write_with_retry_raise_exception(self, file_path_or_handle, data, mode='a+'): + raise Exception + # endregion + + def test_do_processes_require_restart(self): + """Unit test for tdnf package manager""" + # Restart required + self.runtime.set_legacy_test_type('HappyPath') + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager) + self.assertTrue(package_manager.is_reboot_pending()) + + # Restart not required + self.runtime.set_legacy_test_type('SadPath') + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager is not None) + self.assertFalse(package_manager.is_reboot_pending()) + + # Fake exception + self.runtime.set_legacy_test_type('SadPath') + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager is not None) + backup_do_processes_require_restart = package_manager.do_processes_require_restart + package_manager.do_processes_require_restart = self.mock_do_processes_require_restart_raise_exception + self.assertTrue(package_manager.is_reboot_pending()) # returns true because the safe default if a failure occurs is 'true' + package_manager.do_processes_require_restart = backup_do_processes_require_restart + + def test_package_manager_no_updates(self): + """Unit test for tdnf package manager with no updates""" + # Path change + self.runtime.set_legacy_test_type('SadPath') + + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager is not None) + package_filter = self.container.get('package_filter') + self.assertTrue(package_filter is not None) + + available_updates, package_versions = package_manager.get_available_updates(package_filter) + self.assertEqual(len(available_updates), 0) + self.assertEqual(len(package_versions), 0) + + def test_package_manager_unaligned_updates(self): + # Path change + self.runtime.set_legacy_test_type('UnalignedPath') + + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager is not None) + package_filter = self.container.get('package_filter') + self.assertTrue(package_filter is not None) + + try: + package_manager.get_available_updates(package_filter) + except Exception as exception: + self.assertTrue(str(exception)) + else: + self.assertFalse(1 != 2, 'Exception did not occur and test failed.') + + def test_package_manager(self): + """Unit test for tdnf package manager""" + self.runtime.set_legacy_test_type('HappyPath') + + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager is not None) + package_filter = self.container.get('package_filter') + self.assertTrue(package_filter is not None) + + # test for get_available_updates + available_updates, 
package_versions = package_manager.get_available_updates(package_filter) + self.assertTrue(available_updates is not None) + self.assertTrue(package_versions is not None) + self.assertEqual(9, len(available_updates)) + self.assertEqual(9, len(package_versions)) + self.assertEqual("azurelinux-release.noarch", available_updates[0]) + self.assertEqual("azurelinux-repos-ms-oss.noarch", available_updates[1]) + self.assertEqual("3.0-16.azl3", package_versions[0]) + self.assertEqual("3.0-3.azl3", package_versions[1]) + + # test for get_package_size when size is available + cmd = package_manager.single_package_upgrade_cmd + "curl" + code, out = self.runtime.env_layer.run_command_output(cmd, False, False) + size = package_manager.get_package_size(out) + self.assertEqual(size, "661.34k") + + # test for get_package_size when size is not available + cmd = package_manager.single_package_upgrade_cmd + "systemd" + code, out = self.runtime.env_layer.run_command_output(cmd, False, False) + size = package_manager.get_package_size(out) + self.assertEqual(size, Constants.UNKNOWN_PACKAGE_SIZE) + + # test for all available versions + package_versions = package_manager.get_all_available_versions_of_package("python3") + self.assertEqual(len(package_versions), 6) + self.assertEqual(package_versions[0], '3.12.3-1.azl3') + self.assertEqual(package_versions[1], '3.12.3-2.azl3') + self.assertEqual(package_versions[2], '3.12.3-4.azl3') + self.assertEqual(package_versions[3], '3.12.3-5.azl3') + self.assertEqual(package_versions[4], '3.12.3-6.azl3') + self.assertEqual(package_versions[5], '3.12.9-1.azl3') + + # test for get_dependent_list + dependent_list = package_manager.get_dependent_list(["hyperv-daemons.x86_64"]) + self.assertTrue(dependent_list is not None) + self.assertEqual(len(dependent_list), 4) + self.assertEqual(dependent_list[0], "hyperv-daemons-license.noarch") + self.assertEqual(dependent_list[1], "hypervvssd.x86_64") + self.assertEqual(dependent_list[2], "hypervkvpd.x86_64") + self.assertEqual(dependent_list[3], "hypervfcopyd.x86_64") + + # test install cmd + packages = ['kernel.x86_64', 'selinux-policy-targeted.noarch'] + package_versions = ['2.02.177-4.el7', '3.10.0-862.el7'] + cmd = package_manager.get_install_command('sudo tdnf -y install --skip-broken ', packages, package_versions) + self.assertEqual(cmd, 'sudo tdnf -y install --skip-broken kernel-2.02.177-4.el7.x86_64 selinux-policy-targeted-3.10.0-862.el7.noarch') + packages = ['kernel.x86_64'] + package_versions = ['2.02.177-4.el7'] + cmd = package_manager.get_install_command('sudo tdnf -y install --skip-broken ', packages, package_versions) + self.assertEqual(cmd, 'sudo tdnf -y install --skip-broken kernel-2.02.177-4.el7.x86_64') + packages = ['kernel.x86_64', 'kernel.i686'] + package_versions = ['2.02.177-4.el7', '2.02.177-4.el7'] + cmd = package_manager.get_install_command('sudo tdnf -y install --skip-broken ', packages, package_versions) + self.assertEqual(cmd, 'sudo tdnf -y install --skip-broken kernel-2.02.177-4.el7.x86_64 kernel-2.02.177-4.el7.i686') + + self.runtime.stop() + self.runtime = RuntimeCompositor(ArgumentComposer().get_composed_arguments(), True, Constants.TDNF) + self.container = self.runtime.container + self.runtime.set_legacy_test_type('ExceptionPath') + + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager is not None) + package_filter = self.container.get('package_filter') + self.assertTrue(package_filter is not None) + # test for get_available_updates + try: + 
package_manager.get_available_updates(package_filter) + except Exception as exception: + self.assertTrue(str(exception)) + else: + self.assertFalse(1 != 2, 'Exception did not occur and test failed.') + + # test for get_dependent_list + try: + package_manager.get_dependent_list(["man"]) + except Exception as exception: + self.assertTrue(str(exception)) + else: + self.assertFalse(1 != 2, 'Exception did not occur and test failed.') + + def test_install_package_success(self): + """Unit test for install package success""" + self.runtime.set_legacy_test_type('SuccessInstallPath') + + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager is not None) + package_filter = self.container.get('package_filter') + self.assertTrue(package_filter is not None) + + # test for successfully installing a package + self.assertEqual(package_manager.install_update_and_dependencies_and_get_status('hyperv-daemons-license.noarch', '6.6.78.1-1.azl3', simulate=True), Constants.INSTALLED) + + def test_install_package_failure(self): + """Unit test for install package failure""" + self.runtime.set_legacy_test_type('FailInstallPath') + + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager is not None) + package_filter = self.container.get('package_filter') + self.assertTrue(package_filter is not None) + + # test for unsuccessfully installing a package + self.assertEqual(package_manager.install_update_and_dependencies_and_get_status('hyperv-daemons-license.noarch', '6.6.78.1-1.azl3', simulate=True), Constants.FAILED) + + def test_get_product_name(self): + """Unit test for retrieving product Name""" + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager is not None) + package_filter = self.container.get('package_filter') + self.assertTrue(package_filter is not None) + self.assertEqual(package_manager.get_product_name("bash.x86_64"), "bash.x86_64") + self.assertEqual(package_manager.get_product_name("firefox.x86_64"), "firefox.x86_64") + self.assertEqual(package_manager.get_product_name("test.noarch"), "test.noarch") + self.assertEqual(package_manager.get_product_name("noextension"), "noextension") + self.assertEqual(package_manager.get_product_name("noextension.ext"), "noextension.ext") + + def test_get_product_name_without_arch(self): + """Unit test for retrieving product Name""" + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager is not None) + package_filter = self.container.get('package_filter') + self.assertTrue(package_filter is not None) + self.assertEqual(package_manager.get_product_name_without_arch("bash.x86_64"), "bash") + self.assertEqual(package_manager.get_product_name_without_arch("firefox.x86_64"), "firefox") + self.assertEqual(package_manager.get_product_name_without_arch("test.noarch"), "test") + self.assertEqual(package_manager.get_product_name_without_arch("noextension"), "noextension") + self.assertEqual(package_manager.get_product_name_without_arch("noextension.ext"), "noextension.ext") + + def test_get_product_arch(self): + """Unit test for retrieving product arch""" + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager is not None) + package_filter = self.container.get('package_filter') + self.assertTrue(package_filter is not None) + self.assertEqual(package_manager.get_product_arch("bash.x86_64"), ".x86_64") + self.assertEqual(package_manager.get_product_arch("firefox.x86_64"), ".x86_64") + 
self.assertEqual(package_manager.get_product_arch("test.noarch"), ".noarch") + self.assertEqual(package_manager.get_product_arch("noextension"), None) + self.assertEqual(package_manager.get_product_arch("noextension.ext"), None) + + def test_inclusion_type_all(self): + """Unit test for tdnf package manager Classification = all and IncludedPackageNameMasks not specified.""" + self.runtime.set_legacy_test_type('HappyPath') + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager is not None) + self.runtime.stop() + + argument_composer = ArgumentComposer() + argument_composer.classifications_to_include = [Constants.PackageClassification.UNCLASSIFIED] + argument_composer.patches_to_exclude = ["ssh*", "test"] + self.runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.TDNF) + self.container = self.runtime.container + + package_filter = self.container.get('package_filter') + + # test for get_available_updates + available_updates, package_versions = package_manager.get_available_updates(package_filter) + self.assertTrue(available_updates is not None) + self.assertTrue(package_versions is not None) + self.assertEqual(9, len(available_updates)) + self.assertEqual(9, len(package_versions)) + self.assertEqual("azurelinux-release.noarch", available_updates[0]) + self.assertEqual("3.0-16.azl3", package_versions[0]) + self.assertEqual("azurelinux-repos-ms-oss.noarch", available_updates[1]) + self.assertEqual("3.0-3.azl3", package_versions[1]) + self.assertEqual("libseccomp.x86_64", available_updates[2]) + self.assertEqual("2.5.4-1.azl3", package_versions[2]) + self.assertEqual("python3.x86_64", available_updates[3]) + self.assertEqual("3.12.3-6.azl3", package_versions[3]) + self.assertEqual("libxml2.x86_64", available_updates[4]) + self.assertEqual("2.11.5-1.azl3", package_versions[4]) + self.assertEqual("dracut.x86_64", available_updates[5]) + self.assertEqual("102-7.azl3", package_versions[5]) + self.assertEqual("hyperv-daemons-license.noarch", available_updates[6]) + self.assertEqual("6.6.78.1-1.azl3", package_versions[6]) + self.assertEqual("hypervvssd.x86_64", available_updates[7]) + self.assertEqual("6.6.78.1-1.azl3", package_versions[7]) + self.assertEqual("hypervkvpd.x86_64", available_updates[8]) + self.assertEqual("6.6.78.1-1.azl3", package_versions[8]) + + def test_inclusion_type_critical(self): + """Unit test for tdnf package manager with inclusion and Classification = Critical. 
Returns no packages since classifications are not available in Azure Linux"""
+        self.runtime.set_legacy_test_type('HappyPath')
+        package_manager = self.container.get('package_manager')
+        self.assertTrue(package_manager is not None)
+        self.runtime.stop()
+
+        argument_composer = ArgumentComposer()
+        argument_composer.classifications_to_include = [Constants.PackageClassification.CRITICAL]
+        argument_composer.patches_to_exclude = ["ssh*", "test"]
+        argument_composer.patches_to_include = ["ssh", "tar*"]
+        self.runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.TDNF)
+        self.container = self.runtime.container
+
+        package_filter = self.container.get('package_filter')
+        self.assertTrue(package_filter is not None)
+
+        # test for get_available_updates
+        available_updates, package_versions = package_manager.get_available_updates(package_filter)
+        self.assertTrue(available_updates == [])
+        self.assertTrue(package_versions == [])
+
+    def test_inclusion_type_other(self):
+        """Unit test for tdnf package manager with inclusion and Classification = Other. All packages are considered 'Other' since Azure Linux does not have patch classifications"""
+        self.runtime.set_legacy_test_type('HappyPath')
+        package_manager = self.container.get('package_manager')
+        self.assertTrue(package_manager is not None)
+        self.runtime.stop()
+
+        argument_composer = ArgumentComposer()
+        argument_composer.classifications_to_include = [Constants.PackageClassification.OTHER]
+        argument_composer.patches_to_include = ["ssh", "tcpdump"]
+        argument_composer.patches_to_exclude = ["ssh*", "test"]
+        self.runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.TDNF)
+        self.container = self.runtime.container
+
+        package_filter = self.container.get('package_filter')
+        self.assertTrue(package_filter is not None)
+
+        # test for get_available_updates
+        available_updates, package_versions = package_manager.get_available_updates(package_filter)
+        self.assertTrue(available_updates is not None)
+        self.assertTrue(package_versions is not None)
+        self.assertEqual(9, len(available_updates))
+        self.assertEqual(9, len(package_versions))
+        self.assertEqual("azurelinux-release.noarch", available_updates[0])
+        self.assertEqual("3.0-16.azl3", package_versions[0])
+        self.assertEqual("azurelinux-repos-ms-oss.noarch", available_updates[1])
+        self.assertEqual("3.0-3.azl3", package_versions[1])
+        self.assertEqual("libseccomp.x86_64", available_updates[2])
+        self.assertEqual("2.5.4-1.azl3", package_versions[2])
+        self.assertEqual("python3.x86_64", available_updates[3])
+        self.assertEqual("3.12.3-6.azl3", package_versions[3])
+        self.assertEqual("libxml2.x86_64", available_updates[4])
+        self.assertEqual("2.11.5-1.azl3", package_versions[4])
+        self.assertEqual("dracut.x86_64", available_updates[5])
+        self.assertEqual("102-7.azl3", package_versions[5])
+        self.assertEqual("hyperv-daemons-license.noarch", available_updates[6])
+        self.assertEqual("6.6.78.1-1.azl3", package_versions[6])
+        self.assertEqual("hypervvssd.x86_64", available_updates[7])
+        self.assertEqual("6.6.78.1-1.azl3", package_versions[7])
+        self.assertEqual("hypervkvpd.x86_64", available_updates[8])
+        self.assertEqual("6.6.78.1-1.azl3", package_versions[8])
+
+    def test_inclusion_only(self):
+        """Unit test for tdnf package manager with inclusion only and NotSelected Classifications"""
+        self.runtime.set_legacy_test_type('HappyPath')
+        package_manager = self.container.get('package_manager')
+        self.assertTrue(package_manager is not None)
self.runtime.stop() + + argument_composer = ArgumentComposer() + argument_composer.classifications_to_include = [Constants.PackageClassification.UNCLASSIFIED] + argument_composer.patches_to_include = ["azurelinux-release.noarch", "lib*"] + argument_composer.patches_to_exclude = ["ssh*", "test"] + self.runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.TDNF) + self.container = self.runtime.container + + package_filter = self.container.get('package_filter') + self.assertTrue(package_filter is not None) + + # test for get_available_updates + available_updates, package_versions = package_manager.get_available_updates(package_filter) + self.assertTrue(available_updates is not None) + self.assertTrue(package_versions is not None) + self.assertEqual(3, len(available_updates)) + self.assertEqual(3, len(package_versions)) + self.assertEqual("azurelinux-release.noarch", available_updates[0]) + self.assertEqual("3.0-16.azl3", package_versions[0]) + self.assertEqual("libseccomp.x86_64", available_updates[1]) + self.assertEqual("2.5.4-1.azl3", package_versions[1]) + self.assertEqual("libxml2.x86_64", available_updates[2]) + self.assertEqual("2.11.5-1.azl3", package_versions[2]) + + def test_inclusion_dependency_only(self): + """Unit test for tdnf with test dependencies in Inclusion & NotSelected Classifications""" + self.runtime.set_legacy_test_type('HappyPath') + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager is not None) + self.runtime.stop() + + argument_composer = ArgumentComposer() + argument_composer.classifications_to_include = [Constants.PackageClassification.UNCLASSIFIED] + argument_composer.patches_to_include = ["ssh", "hypervvssd.x86_64"] + argument_composer.patches_to_exclude = ["ssh*", "test"] + self.runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.TDNF) + self.container = self.runtime.container + + package_filter = self.container.get('package_filter') + self.assertTrue(package_filter is not None) + + # test for get_available_updates + available_updates, package_versions = package_manager.get_available_updates(package_filter) + self.assertTrue(available_updates is not None) + self.assertTrue(package_versions is not None) + self.assertEqual(len(available_updates), 1) + self.assertEqual(len(package_versions), 1) + self.assertEqual(available_updates[0], "hypervvssd.x86_64") + self.assertEqual(package_versions[0], "6.6.78.1-1.azl3") + + def test_inclusion_notexist(self): + """Unit test for tdnf with Inclusion which does not exist & NotSelected Classifications""" + self.runtime.set_legacy_test_type('HappyPath') + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager is not None) + self.runtime.stop() + + argument_composer = ArgumentComposer() + argument_composer.classifications_to_include = [Constants.PackageClassification.UNCLASSIFIED] + argument_composer.patches_to_include = ["ssh"] + argument_composer.patches_to_exclude = ["ssh*", "test"] + self.runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.TDNF) + self.container = self.runtime.container + + package_filter = self.container.get('package_filter') + self.assertTrue(package_filter is not None) + + # test for get_available_updates + available_updates, package_versions = package_manager.get_available_updates(package_filter) + self.assertTrue(available_updates is not None) + self.assertTrue(package_versions is not None) + 
        self.assertEqual(len(available_updates), 0)
+        self.assertEqual(len(package_versions), 0)
+
+    def test_dedupe_update_packages_to_get_latest_versions(self):
+        packages = []
+        package_versions = []
+
+        package_manager = self.container.get('package_manager')
+        self.assertTrue(package_manager is not None)
+        deduped_packages, deduped_package_versions = package_manager.dedupe_update_packages_to_get_latest_versions(packages, package_versions)
+        self.assertTrue(deduped_packages == [])
+        self.assertTrue(deduped_package_versions == [])
+
+        packages = ['python3.x86_64', 'dracut.x86_64', 'libxml2.x86_64', 'azurelinux-release.noarch', 'python3.noarch', 'python3.x86_64', 'python3.x86_64', 'hypervvssd.x86_64', 'python3.x86_64', 'python3.x86_64']
+        package_versions = ['3.12.3-1.azl3', '102-7.azl3 ', '2.11.5-1.azl3', '3.0-16.azl3', '3.12.9-2.azl3', '3.12.9-1.azl3', '3.12.3-4.azl3', '6.6.78.1-1.azl3', '3.12.3-5.azl3', '3.12.3-5.azl3']
+        deduped_packages, deduped_package_versions = package_manager.dedupe_update_packages_to_get_latest_versions(packages, package_versions)
+        self.assertTrue(deduped_packages is not None and deduped_packages != [])
+        self.assertTrue(deduped_package_versions is not None and deduped_package_versions != [])
+        self.assertTrue(len(deduped_packages) == 6)
+        self.assertTrue(deduped_packages[0] == 'python3.x86_64')
+        self.assertTrue(deduped_package_versions[0] == '3.12.9-1.azl3')
+
+    def test_obsolete_packages_should_not_be_considered_in_available_updates(self):
+        self.runtime.set_legacy_test_type('ObsoletePackages')
+        package_manager = self.container.get('package_manager')
+        package_filter = self.container.get('package_filter')
+
+        # test that obsoleting packages are not included in the available versions
+        package_versions = package_manager.get_all_available_versions_of_package("python3")
+        self.assertEqual(len(package_versions), 6)
+        self.assertEqual(package_versions[0], '3.12.3-1.azl3')
+        self.assertEqual(package_versions[1], '3.12.3-2.azl3')
+        self.assertEqual(package_versions[2], '3.12.3-4.azl3')
+        self.assertEqual(package_versions[3], '3.12.3-5.azl3')
+        self.assertEqual(package_versions[4], '3.12.3-6.azl3')
+        self.assertEqual(package_versions[5], '3.12.9-1.azl3')
+
+    def test_all_classification_selected_for_auto_patching_request(self):
+        """Unit test for tdnf package manager for auto patching request where all classifications are selected since Azure Linux does not have classifications"""
+        backup_envlayer_platform_linux_distribution = LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution
+        LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution = self.mock_linux_distribution_to_return_azure_linux
+
+        self.runtime.set_legacy_test_type('HappyPath')
+        package_manager = self.container.get('package_manager')
+        self.assertTrue(package_manager is not None)
+        self.runtime.stop()
+
+        argument_composer = ArgumentComposer()
+        argument_composer.classifications_to_include = [Constants.PackageClassification.SECURITY, Constants.PackageClassification.CRITICAL]
+        argument_composer.health_store_id = "pub_off_sku_2025.03.24"
+        argument_composer.operation = Constants.INSTALLATION
+        self.runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.TDNF)
+        self.container = self.runtime.container
+
+        execution_config = self.container.get('execution_config')
+        self.assertTrue(execution_config.included_classifications_list is not None)
+        self.assertTrue(execution_config.included_classifications_list == [Constants.PackageClassification.CRITICAL, Constants.PackageClassification.SECURITY,
Constants.PackageClassification.OTHER]) + + LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution = backup_envlayer_platform_linux_distribution + + def test_refresh_repo(self): + self.runtime.set_legacy_test_type('HappyPath') + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager is not None) + package_manager.refresh_repo_safely() + + def test_disable_auto_os_updates_with_uninstalled_services(self): + # no services are installed on the machine. expected o/p: function will complete successfully. Backup file will be created with default values, no auto OS update configuration settings will be updated as there are none + self.runtime.set_legacy_test_type('SadPath') + package_manager = self.container.get('package_manager') + package_manager.disable_auto_os_update() + self.assertTrue(package_manager.image_default_patch_configuration_backup_exists()) + image_default_patch_configuration_backup = json.loads(self.runtime.env_layer.file_system.read_with_retry(package_manager.image_default_patch_configuration_backup_path)) + self.assertTrue(image_default_patch_configuration_backup is not None) + + # validating backup for dnf-automatic + self.assertTrue(package_manager.dnf_auto_os_update_service in image_default_patch_configuration_backup) + self.assertEqual(image_default_patch_configuration_backup[package_manager.dnf_auto_os_update_service][package_manager.dnf_automatic_download_updates_identifier_text], "") + self.assertEqual(image_default_patch_configuration_backup[package_manager.dnf_auto_os_update_service][package_manager.dnf_automatic_apply_updates_identifier_text], "") + self.assertEqual(image_default_patch_configuration_backup[package_manager.dnf_auto_os_update_service][package_manager.dnf_automatic_enable_on_reboot_identifier_text], False) + self.assertEqual(image_default_patch_configuration_backup[package_manager.dnf_auto_os_update_service][package_manager.dnf_automatic_installation_state_identifier_text], False) + + def test_disable_auto_os_updates_with_installed_services(self): + # all services are installed and contain valid configurations. 
expected o/p All services will be disabled and backup file should reflect default settings for all + self.runtime.set_legacy_test_type('HappyPath') + package_manager = self.container.get('package_manager') + + package_manager.dnf_automatic_configuration_file_path = os.path.join(self.runtime.execution_config.config_folder, "automatic.conf") + dnf_automatic_os_patch_configuration_settings = 'apply_updates = yes\ndownload_updates = yes\n' + self.runtime.write_to_file(package_manager.dnf_automatic_configuration_file_path, dnf_automatic_os_patch_configuration_settings) + + package_manager.disable_auto_os_update() + self.assertTrue(package_manager.image_default_patch_configuration_backup_exists()) + image_default_patch_configuration_backup = json.loads(self.runtime.env_layer.file_system.read_with_retry(package_manager.image_default_patch_configuration_backup_path)) + self.assertTrue(image_default_patch_configuration_backup is not None) + + # validating backup for dnf-automatic + self.assertTrue(package_manager.dnf_auto_os_update_service in image_default_patch_configuration_backup) + self.assertEqual(image_default_patch_configuration_backup[package_manager.dnf_auto_os_update_service][package_manager.dnf_automatic_download_updates_identifier_text], "yes") + self.assertEqual(image_default_patch_configuration_backup[package_manager.dnf_auto_os_update_service][package_manager.dnf_automatic_apply_updates_identifier_text], "yes") + self.assertEqual(image_default_patch_configuration_backup[package_manager.dnf_auto_os_update_service][package_manager.dnf_automatic_enable_on_reboot_identifier_text], False) + self.assertEqual(image_default_patch_configuration_backup[package_manager.dnf_auto_os_update_service][package_manager.dnf_automatic_installation_state_identifier_text], True) + + def test_disable_auto_os_update_failure(self): + # disable with non existing log file + package_manager = self.container.get('package_manager') + + self.assertRaises(Exception, package_manager.disable_auto_os_update) + self.assertTrue(package_manager.image_default_patch_configuration_backup_exists()) + + def test_update_image_default_patch_mode(self): + package_manager = self.container.get('package_manager') + package_manager.os_patch_configuration_settings_file_path = package_manager.dnf_automatic_configuration_file_path = os.path.join(self.runtime.execution_config.config_folder, "automatic.conf") + + # disable apply_updates when enabled by default + dnf_automatic_os_patch_configuration_settings = 'apply_updates = yes\ndownload_updates = yes\n' + self.runtime.write_to_file(package_manager.dnf_automatic_configuration_file_path, dnf_automatic_os_patch_configuration_settings) + + package_manager.update_os_patch_configuration_sub_setting(package_manager.dnf_automatic_apply_updates_identifier_text, "no", package_manager.dnf_automatic_config_pattern_match_text) + dnf_automatic_os_patch_configuration_settings_file_path_read = self.runtime.env_layer.file_system.read_with_retry(package_manager.os_patch_configuration_settings_file_path) + self.assertTrue(dnf_automatic_os_patch_configuration_settings_file_path_read is not None) + self.assertTrue('apply_updates = no' in dnf_automatic_os_patch_configuration_settings_file_path_read) + self.assertTrue('download_updates = yes' in dnf_automatic_os_patch_configuration_settings_file_path_read) + + # disable download_updates when enabled by default + dnf_automatic_os_patch_configuration_settings = 'apply_updates = yes\ndownload_updates = yes\n' + 
self.runtime.write_to_file(package_manager.os_patch_configuration_settings_file_path, dnf_automatic_os_patch_configuration_settings) + package_manager.update_os_patch_configuration_sub_setting(package_manager.dnf_automatic_download_updates_identifier_text, "no", package_manager.dnf_automatic_config_pattern_match_text) + dnf_automatic_os_patch_configuration_settings_file_path_read = self.runtime.env_layer.file_system.read_with_retry(package_manager.os_patch_configuration_settings_file_path) + self.assertTrue(dnf_automatic_os_patch_configuration_settings_file_path_read is not None) + self.assertTrue('apply_updates = yes' in dnf_automatic_os_patch_configuration_settings_file_path_read) + self.assertTrue('download_updates = no' in dnf_automatic_os_patch_configuration_settings_file_path_read) + + # disable apply_updates when default patch mode settings file is empty + dnf_automatic_os_patch_configuration_settings = '' + self.runtime.write_to_file(package_manager.os_patch_configuration_settings_file_path, dnf_automatic_os_patch_configuration_settings) + package_manager.update_os_patch_configuration_sub_setting(package_manager.dnf_automatic_apply_updates_identifier_text, "no", package_manager.dnf_automatic_config_pattern_match_text) + dnf_automatic_os_patch_configuration_settings_file_path_read = self.runtime.env_layer.file_system.read_with_retry(package_manager.os_patch_configuration_settings_file_path) + self.assertTrue(dnf_automatic_os_patch_configuration_settings_file_path_read is not None) + self.assertTrue('download_updates' not in dnf_automatic_os_patch_configuration_settings_file_path_read) + self.assertTrue('apply_updates = no' in dnf_automatic_os_patch_configuration_settings_file_path_read) + + def test_update_image_default_patch_mode_raises_exception(self): + package_manager = self.container.get('package_manager') + package_manager.dnf_automatic_configuration_file_path = os.path.join(self.runtime.execution_config.config_folder, "automatic.conf") + dnf_automatic_os_patch_configuration_settings = 'apply_updates = yes\ndownload_updates = yes\n' + self.runtime.write_to_file(package_manager.dnf_automatic_configuration_file_path, dnf_automatic_os_patch_configuration_settings) + self.runtime.env_layer.file_system.write_with_retry = self.mock_write_with_retry_raise_exception + self.assertRaises(Exception, package_manager.update_os_patch_configuration_sub_setting) + + def test_get_current_auto_os_patch_state_with_uninstalled_services(self): + # no services are installed on the machine. expected o/p: function will complete successfully, backup file is not created and function returns current_auto_os_patch_state as disabled + self.runtime.set_legacy_test_type('SadPath') + package_manager = self.container.get('package_manager') + package_manager.get_current_auto_os_patch_state = self.runtime.backup_get_current_auto_os_patch_state + current_auto_os_patch_state = package_manager.get_current_auto_os_patch_state() + + self.assertFalse(package_manager.image_default_patch_configuration_backup_exists()) + self.assertEqual(current_auto_os_patch_state, Constants.AutomaticOSPatchStates.DISABLED) + + def test_get_current_auto_os_patch_state_with_installed_services_and_state_disabled(self): + # dnf-automatic is installed on the machine. 
expected o/p: function will complete successfully, backup file is NOT created and function returns current_auto_os_patch_state as disabled + self.runtime.set_legacy_test_type('HappyPath') + package_manager = self.container.get('package_manager') + package_manager.get_current_auto_os_patch_state = self.runtime.backup_get_current_auto_os_patch_state + + package_manager.dnf_automatic_configuration_file_path = os.path.join(self.runtime.execution_config.config_folder, "automatic.conf") + dnf_automatic_os_patch_configuration_settings = 'apply_updates = no\ndownload_updates = yes\n' + self.runtime.write_to_file(package_manager.dnf_automatic_configuration_file_path, dnf_automatic_os_patch_configuration_settings) + + current_auto_os_patch_state = package_manager.get_current_auto_os_patch_state() + + self.assertFalse(package_manager.image_default_patch_configuration_backup_exists()) + self.assertEqual(current_auto_os_patch_state, Constants.AutomaticOSPatchStates.DISABLED) + + def test_get_current_auto_os_patch_state_with_installed_services_and_state_enabled(self): + # dnf-automatic is installed on the machine. expected o/p: function will complete successfully, backup file is NOT created and function returns current_auto_os_patch_state as enabled + + # with enable on reboot set to false + self.runtime.set_legacy_test_type('HappyPath') + package_manager = self.container.get('package_manager') + package_manager.get_current_auto_os_patch_state = self.runtime.backup_get_current_auto_os_patch_state + + package_manager.dnf_automatic_configuration_file_path = os.path.join(self.runtime.execution_config.config_folder, "automatic.conf") + dnf_automatic_os_patch_configuration_settings = 'apply_updates = yes\ndownload_updates = yes\n' + self.runtime.write_to_file(package_manager.dnf_automatic_configuration_file_path, dnf_automatic_os_patch_configuration_settings) + + current_auto_os_patch_state = package_manager.get_current_auto_os_patch_state() + + self.assertFalse(package_manager.image_default_patch_configuration_backup_exists()) + self.assertEqual(current_auto_os_patch_state, Constants.AutomaticOSPatchStates.ENABLED) + + # with enable on reboot set to true + self.runtime.set_legacy_test_type('AnotherSadPath') + package_manager = self.container.get('package_manager') + package_manager.get_current_auto_os_patch_state = self.runtime.backup_get_current_auto_os_patch_state + + package_manager.dnf_automatic_configuration_file_path = os.path.join(self.runtime.execution_config.config_folder, "automatic.conf") + dnf_automatic_os_patch_configuration_settings = 'apply_updates = no\ndownload_updates = yes\n' + self.runtime.write_to_file(package_manager.dnf_automatic_configuration_file_path, dnf_automatic_os_patch_configuration_settings) + + current_auto_os_patch_state = package_manager.get_current_auto_os_patch_state() + + self.assertFalse(package_manager.image_default_patch_configuration_backup_exists()) + self.assertEqual(current_auto_os_patch_state, Constants.AutomaticOSPatchStates.ENABLED) + + def test_get_current_auto_os_patch_state_with_installed_services_and_state_unknown(self): + # dnf-automatic is installed on the machine. 
expected o/p: function will complete successfully, backup file is NOT created and function returns current_auto_os_patch_state as unknown + + self.runtime.set_legacy_test_type('HappyPath') + package_manager = self.container.get('package_manager') + package_manager.get_current_auto_os_patch_state = self.runtime.backup_get_current_auto_os_patch_state + + package_manager.dnf_automatic_configuration_file_path = os.path.join(self.runtime.execution_config.config_folder, "automatic.conf") + dnf_automatic_os_patch_configuration_settings = 'apply_updates = abc\ndownload_updates = yes\n' + self.runtime.write_to_file(package_manager.dnf_automatic_configuration_file_path, dnf_automatic_os_patch_configuration_settings) + + current_auto_os_patch_state = package_manager.get_current_auto_os_patch_state() + + self.assertFalse(package_manager.image_default_patch_configuration_backup_exists()) + self.assertEqual(current_auto_os_patch_state, Constants.AutomaticOSPatchStates.UNKNOWN) + + +if __name__ == '__main__': + unittest.main() + diff --git a/src/core/tests/library/LegacyEnvLayerExtensions.py b/src/core/tests/library/LegacyEnvLayerExtensions.py index 55575399..1a1cf24f 100644 --- a/src/core/tests/library/LegacyEnvLayerExtensions.py +++ b/src/core/tests/library/LegacyEnvLayerExtensions.py @@ -538,6 +538,92 @@ def run_command_output(self, cmd, no_output=False, chk_err=True): elif cmd.find('pro security-status --format=json') > -1: code = 0 output = "{\"summary\":{\"ua\":{\"attached\":true}}}" + elif self.legacy_package_manager_name is Constants.TDNF: + if cmd.find("--security list updates") > -1: + code = 0 + output = "\n" + \ + "azurelinux-release.noarch " + \ + "3.0-16.azl3 " + \ + "azurelinux-official-base\n" + elif cmd.find("list updates") > -1: + code = 0 + output = "\n" + \ + "azurelinux-release.noarch 3.0-16.azl3 azurelinux-official-base\n" + \ + "azurelinux-repos-ms-oss.noarch 3.0-3.azl3 azurelinux-official-base\n" + \ + "libseccomp.x86_64 2.5.4-1.azl3 azurelinux-official-base\n" + \ + "python3.x86_64 3.12.3-2.azl3 azurelinux-official-base\n" + \ + "libxml2.x86_64 2.11.5-1.azl3 azurelinux-official-base\n" + \ + "dracut.x86_64 102-7.azl3 azurelinux-official-base\n" + \ + "python3.x86_64 3.12.3-5.azl3 azurelinux-official-base\n" + \ + "python3.x86_64 3.12.3-6.azl3 azurelinux-official-base\n" + \ + "hyperv-daemons-license.noarch 6.6.78.1-1.azl3 azurelinux-official-base\n" + \ + "hypervvssd.x86_64 6.6.78.1-1.azl3 azurelinux-official-base\n" + \ + "python3.x86_64 3.12.3-4.azl3 azurelinux-official-base\n" + \ + "hypervkvpd.x86_64 6.6.78.1-1.azl3 azurelinux-official-base\n" + elif cmd.find("needs-restarting -r") > -1: + code = 0 + output = "Core libraries or services have been updated since boot-up:\n" + \ + " * kernel\n" + \ + "\n" + \ + "Reboot is required to fully utilize these updates.\n" + \ + "More information: https://access.redhat.com/solutions/27943\n" + elif cmd.find("tdnf -y install --skip-broken curl") > -1: + code = 0 + output = "Loaded plugin: tdnfrepogpgcheck\n\n" + \ + "Upgrading:\n" + \ + "curl-libs x86_64 8.11.1-3.azl3 azurelinux-official-base 847.91k 403.29k\n" + \ + "curl x86_ 8.11.1-3.azl3 azurelinux-official-base 382.51k 258.06k\n\n" + \ + "Total installed size: 1.20M\n" + \ + "Total download size: 661.34k\n" + \ + "curl-libs 412964 100%\n" + \ + "curl 264253 100%\n" + \ + "Testing transaction\n" + \ + "Running transaction\n" + \ + "Installing/Updating: curl-libs-8.11.1-3.azl3.x86_64\n" + \ + "Installing/Updating: curl-8.11.1-3.azl3.x86_64\n" + \ + "Removing: 
curl-8.8.0-4.azl3.x86_64\n" + \
+                             "Removing: curl-libs-8.8.0-4.azl3.x86_64\n"
+                elif cmd.find("tdnf list available python3") > -1:
+                    code = 0
+                    output = "Loaded plugin: tdnfrepogpgcheck\n" + \
+                             "python3.x86_64 3.12.3-1.azl3 azurelinux-official-base\n" + \
+                             "python3.x86_64 3.12.3-2.azl3 azurelinux-official-base\n" + \
+                             "python3.x86_64 3.12.3-4.azl3 azurelinux-official-base\n" + \
+                             "python3.x86_64 3.12.3-5.azl3 azurelinux-official-base\n" + \
+                             "python3.x86_64 3.12.3-6.azl3 azurelinux-official-base\n" + \
+                             "python3.x86_64 3.12.9-1.azl3 azurelinux-official-base\n"
+                elif cmd.find("tdnf install --assumeno --skip-broken hyperv-daemons.x86_64") > -1:
+                    code = 8
+                    output = "Loaded plugin: tdnfrepogpgcheck\n\n" + \
+                             "Upgrading:\n" + \
+                             "hyperv-daemons-license noarch 6.6.78.1-1.azl3 azurelinux-official-base 496.00b 20.92k\n" + \
+                             "hypervvssd x86_64 6.6.78.1-1.azl3 azurelinux-official-base 19.70k 28.39k\n" + \
+                             "hypervkvpd x86_64 6.6.78.1-1.azl3 azurelinux-official-base 42.29k 38.72k\n" + \
+                             "hypervfcopyd x86_64 6.6.78.1-1.azl3 azurelinux-official-base 15.69k 26.97k\n" + \
+                             "hyperv-daemons x86_64 6.6.78.1-1.azl3 azurelinux-official-base 0.00b 20.08k\n\n" + \
+                             "Total installed size: 78.16k\n" + \
+                             "Total download size: 135.09k\n" + \
+                             "Error(1032) : Operation aborted.\n"
+                elif cmd.find("list installed") > -1:
+                    code = 0
+                    package = cmd.replace('sudo tdnf list installed ', '')
+                    whitelisted_versions = [
+                        '3.0-16.azl3', '3.0-3.azl3', '2.5.4-1.azl3', '3.12.3-6.azl3', '2.11.5-1.azl3', '102-7.azl3', '6.6.78.1-1.azl3']  # versions reported as installed for any package queried
+                    output = "Loaded plugin: tdnfrepogpgcheck\n"
+                    # build one mock 'tdnf list installed' output line per whitelisted version for the queried package
+                    template = "<package> <version> @System\n"
+                    for version in whitelisted_versions:
+                        entry = template.replace('<package>', package)
+                        entry = entry.replace('<version>', version)
+                        output += entry
+                elif cmd.find("systemctl list-unit-files --type=service") > -1:
+                    code = 0
+                    output = 'Auto update service installed'
+                elif cmd.find("systemctl is-enabled ") > -1:
+                    code = 0
+                    output = 'disabled'
+                elif cmd.find("systemctl disable ") > -1:
+                    code = 0
+                    output = 'Auto update service disabled'
         elif self.legacy_test_type == 'SadPath':
             if cmd.find("cat /proc/cpuinfo | grep name") > -1:
                 code = 0
@@ -580,6 +666,9 @@ def run_command_output(self, cmd, no_output=False, chk_err=True):
             elif cmd.find('sudo LANG=en_US.UTF8 zypper --non-interactive patch --category security') > -1:
                 code = 103
                 output = ''
+            elif self.legacy_package_manager_name is Constants.TDNF:
+                code = 0
+                output = ''
             elif cmd.find("systemctl") > -1:
                 code = 1
                 output = ''
@@ -617,6 +706,15 @@ def run_command_output(self, cmd, no_output=False, chk_err=True):
             elif cmd.find('sudo zypper refresh') > -1:
                 code = 4
                 output = 'System management is locked by the application with pid 7914 (/usr/bin/zypper).'
+ elif self.legacy_package_manager_name is Constants.TDNF: + code = 100 + output = "azurelinux-release.noarch 3.0-16.azl3 \n" + \ + "azurelinux-official-base\n" + \ + "azurelinux-repos-ms-oss.noarch\n" + \ + "3.0-3.azl3 azurelinux-official-base\n" + \ + "libseccomp.x86_64 2.5.4-1.azl3 azurelinux-official-base\n" + \ + "libxml2.x86_64 azurelinux-official-base\n" + \ + "dracut.x86_64 102-7.azl3 azurelinux-official-base\n" elif self.legacy_test_type == 'NonexistentErrorCodePath': if self.legacy_package_manager_name is Constants.ZYPPER: if cmd.find('sudo zypper refresh') > -1: @@ -633,6 +731,13 @@ def run_command_output(self, cmd, no_output=False, chk_err=True): elif cmd.find('sudo LANG=en_US.UTF8 zypper --non-interactive patch --category security') > -1: code = 102 output = '' + if self.legacy_package_manager_name is Constants.TDNF: + if cmd.find("systemctl list-unit-files --type=service") > -1: + code = 0 + output = 'Auto update service installed' + elif cmd.find("systemctl is-enabled ") > -1: + code = 0 + output = 'enabled' elif self.legacy_test_type == 'ExceptionPath': code = -1 output = '' @@ -769,6 +874,23 @@ def run_command_output(self, cmd, no_output=False, chk_err=True): "sudo LANG=en_US.UTF8 zypper --non-interactive update --dry-run") > -1: code = 0 output = "Package sucessfully installed!" + elif self.legacy_package_manager_name is Constants.TDNF: + if cmd.find("simulate-install") > -1 or cmd.find("sudo tdnf install --assumeno --skip-broken hyperv-daemons-license") > -1: + code = 8 + output = "Loaded plugin: tdnfrepogpgcheck\n\n" + \ + "Upgrading:\n" + \ + "hyperv-daemons-license noarch 6.6.78.1-1.azl3 azurelinux-official-base 496.00b 20.92k\n" + \ + "hypervvssd x86_64 6.6.78.1-1.azl3 azurelinux-official-base 19.70k 28.39k\n" + \ + "hypervkvpd x86_64 6.6.78.1-1.azl3 azurelinux-official-base 42.29k 38.72k\n" + \ + "hypervfcopyd x86_64 6.6.78.1-1.azl3 azurelinux-official-base 15.69k 26.97k\n" + \ + "hyperv-daemons x86_64 6.6.78.1-1.azl3 azurelinux-official-base 0.00b 20.08k\n\n" + \ + "Total installed size: 78.16k\n" + \ + "Total download size: 135.09k\n" + \ + "Error(1032) : Operation aborted.\n" + elif cmd.find("sudo tdnf list installed hyperv-daemons-license.noarch") > -1: + code = 0 + output = "Loaded plugin: tdnfrepogpgcheck\n" + \ + "hyperv-daemons-license.noarch 6.6.78.1-1.azl3 @System\n" elif self.legacy_test_type == 'FailInstallPath': if cmd.find("cat /proc/cpuinfo | grep name") > -1: code = 0 @@ -921,6 +1043,10 @@ def run_command_output(self, cmd, no_output=False, chk_err=True): elif cmd.find("force-dpkg-failure") > -1: code = 100 output = "E: dpkg was interrupted, you must manually run 'sudo dpkg --configure -a' to correct the problem." 
+ elif self.legacy_package_manager_name is Constants.TDNF: + if cmd.find("simulate-install") > -1 or cmd.find("sudo tdnf install --assumeno --skip-broken hyperv-daemons-license") > -1: + code = 100 + output = "Failed to install package" elif self.legacy_test_type == 'SSLCertificateIssueType1HappyPathAfterFix': if self.legacy_package_manager_name is Constants.YUM: if cmd.find("yum update -y --disablerepo='*' --enablerepo='*microsoft*'") > -1: @@ -1217,7 +1343,18 @@ def run_command_output(self, cmd, no_output=False, chk_err=True): " grub2-tools.x86_64 " + \ "1:2.02-123.el8 " + \ "@System\n" - + if self.legacy_package_manager_name is Constants.TDNF: + if cmd.find("tdnf list available python3") > -1: + code = 0 + output = "Loaded plugin: tdnfrepogpgcheck\n" + \ + "python3.x86_64 3.12.3-1.azl3 azurelinux-official-base\n" + \ + "python3.x86_64 3.12.3-2.azl3 azurelinux-official-base\n" + \ + "python3.x86_64 3.12.3-4.azl3 azurelinux-official-base\n" + \ + "python3.x86_64 3.12.3-5.azl3 azurelinux-official-base\n" + \ + "python3.x86_64 3.12.3-6.azl3 azurelinux-official-base\n" + \ + "python3.x86_64 3.12.9-1.azl3 azurelinux-official-base\n" + \ + "Obsoleting:\n" + \ + "python.x86_64 2.7.9-1.azl3 azurelinux-official-base\n" elif self.legacy_test_type == 'YumVersion4Dependency': if self.legacy_package_manager_name is Constants.YUM: if cmd.find("--version") > -1: