diff --git a/src/core/src/bootstrap/EnvLayer.py b/src/core/src/bootstrap/EnvLayer.py index 5bb598c7..f79f7b2c 100644 --- a/src/core/src/bootstrap/EnvLayer.py +++ b/src/core/src/bootstrap/EnvLayer.py @@ -15,6 +15,7 @@ # Requires Python 2.7+ from __future__ import print_function + import datetime import glob import os @@ -46,6 +47,15 @@ def __init__(self): def is_distro_azure_linux(distro_name): return any(x in distro_name for x in Constants.AZURE_LINUX) + def is_distro_azure_linux_3_or_beyond(self): + # type: () -> bool + """ Checks if the current distro is Azure Linux 3 """ + if self.is_distro_azure_linux(self.platform.linux_distribution()): + version = distro.os_release_attr('version') + major = version.split('.')[0] if version else None + return major is not None and int(major) >= 3 + return False + def get_package_manager(self): # type: () -> str """ Detects package manager type """ @@ -242,6 +252,7 @@ def cpu_arch(): # architecture @staticmethod def vm_name(): # machine name return platform.node() + # endregion - Platform extensions # region - File system extensions @@ -396,4 +407,18 @@ def utc_to_standard_datetime(utc_datetime): def standard_datetime_to_utc(std_datetime): """ Converts datetime object to string of format '"%Y-%m-%dT%H:%M:%SZ"' """ return std_datetime.strftime("%Y-%m-%dT%H:%M:%SZ") + + @staticmethod + def datetime_string_to_posix_time(datetime_string, format_string): + # type: (str, str) -> int + """ Converts string of given format to posix datetime string. """ + # eg: Input: datetime_string: 20241220T000000Z (str), format_string: '%Y%m%dT%H%M%SZ' -> Output: 1734681600 (str) + + # Parse the datetime string + dt = datetime.datetime.strptime(datetime_string, format_string) + + # Convert to POSIX time assuming UTC + epoch = datetime.datetime(1970, 1, 1) + return int((dt - epoch).total_seconds()) + # endregion - DateTime emulator and extensions diff --git a/src/core/src/core_logic/PatchInstaller.py b/src/core/src/core_logic/PatchInstaller.py index 5af1b8df..16a26adc 100644 --- a/src/core/src/core_logic/PatchInstaller.py +++ b/src/core/src/core_logic/PatchInstaller.py @@ -80,7 +80,7 @@ def start_installation(self, simulate=False): if self.execution_config.max_patch_publish_date != str(): self.package_manager.set_max_patch_publish_date(self.execution_config.max_patch_publish_date) - if self.package_manager.max_patch_publish_date != str(): + if self.package_manager.max_patch_publish_date != str() and self.package_manager.try_meet_azgps_coordinated_requirements(): """ Strict SDP with the package manager that supports it """ installed_update_count, update_run_successful, maintenance_window_exceeded = self.install_updates_azgps_coordinated(maintenance_window, package_manager, simulate) package_manager.set_package_manager_setting(Constants.PACKAGE_MGR_SETTING_REPEAT_PATCH_OPERATION, bool(not update_run_successful)) diff --git a/src/core/src/package_managers/AptitudePackageManager.py b/src/core/src/package_managers/AptitudePackageManager.py index a16852a1..cdc1b2ec 100644 --- a/src/core/src/package_managers/AptitudePackageManager.py +++ b/src/core/src/package_managers/AptitudePackageManager.py @@ -488,6 +488,10 @@ def install_security_updates_azgps_coordinated(self): command = self.__generate_command_with_custom_sources(self.install_security_updates_azgps_coordinated_cmd, source_parts=source_parts, source_list=source_list) out, code = self.invoke_package_manager_advanced(command, raise_on_exception=False) return code, out + + def try_meet_azgps_coordinated_requirements(self): + # type: () -> bool + return True # endregion # region Package Information diff --git a/src/core/src/package_managers/PackageManager.py b/src/core/src/package_managers/PackageManager.py index b0fa6efe..d2bb57d3 100644 --- a/src/core/src/package_managers/PackageManager.py +++ b/src/core/src/package_managers/PackageManager.py @@ -346,6 +346,12 @@ def install_update_and_dependencies_and_get_status(self, package_and_dependencie @abstractmethod def install_security_updates_azgps_coordinated(self): pass + + @abstractmethod + def try_meet_azgps_coordinated_requirements(self): + # type: () -> bool + """ Returns true if the package manager meets the requirements for azgps coordinated security updates """ + return False # endregion # region Package Information diff --git a/src/core/src/package_managers/TdnfPackageManager.py b/src/core/src/package_managers/TdnfPackageManager.py index f5b699ca..9c380548 100644 --- a/src/core/src/package_managers/TdnfPackageManager.py +++ b/src/core/src/package_managers/TdnfPackageManager.py @@ -34,13 +34,14 @@ def __init__(self, env_layer, execution_config, composite_logger, telemetry_writ self.cmd_repo_refresh = "sudo tdnf -q list updates" # Support to get updates and their dependencies - self.tdnf_check = 'sudo tdnf -q list updates' - self.single_package_check_versions = 'sudo tdnf list available ' - self.single_package_check_installed = 'sudo tdnf list installed ' + self.tdnf_check = 'sudo tdnf -q list updates ' + self.single_package_check_versions = 'sudo tdnf list available ' + self.single_package_check_installed = 'sudo tdnf list installed ' self.single_package_upgrade_simulation_cmd = 'sudo tdnf install --assumeno --skip-broken ' # Install update self.single_package_upgrade_cmd = 'sudo tdnf -y install --skip-broken ' + self.install_security_updates_azgps_coordinated_cmd = 'sudo tdnf -y upgrade --skip-broken ' # Package manager exit code(s) self.tdnf_exitcode_ok = 0 @@ -70,12 +71,15 @@ def __init__(self, env_layer, execution_config, composite_logger, telemetry_writ # commands for DNF Automatic updates service self.__init_constants_for_dnf_automatic() + # AzLinux3 Package Manager. + self.azl3_tdnf_packagemanager = self.AzL3TdnfPackageManager() + # Miscellaneous self.set_package_manager_setting(Constants.PKG_MGR_SETTING_IDENTITY, Constants.TDNF) self.STR_TOTAL_DOWNLOAD_SIZE = "Total download size: " self.version_comparator = VersionComparator() - # if an Auto Patching request comes in on a Azure Linux machine with Security and/or Critical classifications selected, we need to install all patches, since classifications aren't available in Azure Linux repository + # if an Auto Patching request comes in on an Azure Linux machine with Security and/or Critical classifications selected, we need to install all patches, since classifications aren't available in Azure Linux repository installation_included_classifications = [] if execution_config.included_classifications_list is None else execution_config.included_classifications_list if execution_config.health_store_id is not str() and execution_config.operation.lower() == Constants.INSTALLATION.lower() \ and (env_layer.is_distro_azure_linux(str(env_layer.platform.linux_distribution()))) \ @@ -90,6 +94,16 @@ def refresh_repo(self): self.invoke_package_manager(self.cmd_clean_cache) self.invoke_package_manager(self.cmd_repo_refresh) + # region Strict SDP using SnapshotTime + @staticmethod + def __generate_command_with_snapshotposixtime_if_specified(command_template, snapshot_posix_time=str()): + # type: (str, str) -> str + if snapshot_posix_time == str(): + return command_template.replace('', str()) + else: + return command_template.replace('', ('--snapshottime={0}'.format(str(snapshot_posix_time)))) + # endregion + # region Get Available Updates def invoke_package_manager_advanced(self, command, raise_on_exception=True): """Get missing updates using the command input""" @@ -117,32 +131,28 @@ def get_all_updates(self, cached=False): self.composite_logger.log_debug("[TDNF] Get all updates : [Cached={0}][PackagesCount={1}]]".format(str(cached), len(self.all_updates_cached))) return self.all_updates_cached, self.all_update_versions_cached # allows for high performance reuse in areas of the code explicitly aware of the cache - out = self.invoke_package_manager(self.tdnf_check) + out = self.invoke_package_manager(self.__generate_command_with_snapshotposixtime_if_specified(self.tdnf_check, self.max_patch_publish_date)) self.all_updates_cached, self.all_update_versions_cached = self.extract_packages_and_versions(out) - self.composite_logger.log_debug("[TDNF] Get all updates : [Cached={0}][PackagesCount={1}]]".format(str(False), len(self.all_updates_cached))) return self.all_updates_cached, self.all_update_versions_cached def get_security_updates(self): - """Get missing security updates. NOTE: Classification based categorization of patches is not available in Azure Linux as of now""" - self.composite_logger.log_verbose("[TDNF] Discovering 'security' packages...") - security_packages, security_package_versions = [], [] + """Get missing security updates. NOTE: Classification based categorization of patches is not available in TDNF as of now""" + self.composite_logger.log_verbose("[TDNF] Discovering all packages as 'security' packages, since TDNF does not support package classification...") + security_packages, security_package_versions = self.get_all_updates(cached=False) self.composite_logger.log_debug("[TDNF] Discovered 'security' packages. [Count={0}]".format(len(security_packages))) return security_packages, security_package_versions def get_other_updates(self): - """Get missing other updates. - NOTE: This function will return all available packages since Azure Linux does not support package classification in it's repository""" + """Get missing other updates.""" self.composite_logger.log_verbose("[TDNF] Discovering 'other' packages...") - other_packages, other_package_versions = [], [] - - all_packages, all_package_versions = self.get_all_updates(True) - - self.composite_logger.log_debug("[TDNF] Discovered 'other' packages. [Count={0}]".format(len(other_packages))) - return all_packages, all_package_versions + return [], [] def set_max_patch_publish_date(self, max_patch_publish_date=str()): - pass + """Set the max patch publish date in POSIX time for strict SDP""" + self.composite_logger.log_debug("[TDNF] Setting max patch publish date. [MaxPatchPublishDate={0}]".format(str(max_patch_publish_date))) + self.max_patch_publish_date = str(self.env_layer.datetime.datetime_string_to_posix_time(max_patch_publish_date, '%Y%m%dT%H%M%SZ')) if max_patch_publish_date != str() else max_patch_publish_date + self.composite_logger.log_debug("[TDNF] Set max patch publish date. [MaxPatchPublishDatePosixTime={0}]".format(str(self.max_patch_publish_date))) # endregion # region Output Parser(s) @@ -215,7 +225,78 @@ def install_updates_fail_safe(self, excluded_packages): return def install_security_updates_azgps_coordinated(self): - pass + """Install security updates in Azure Linux following strict SDP""" + command = self.__generate_command_with_snapshotposixtime_if_specified(self.install_security_updates_azgps_coordinated_cmd, self.max_patch_publish_date) + out, code = self.invoke_package_manager_advanced(command, raise_on_exception=False) + return code, out + + def try_meet_azgps_coordinated_requirements(self): + # type: () -> bool + """ Check if the system meets the requirements for Azure Linux strict safe deployment and attempt to update TDNF if necessary """ + self.composite_logger.log_debug("[TDNF] Checking if system meets Azure Linux security updates requirements...") + # Check if the system is Azure Linux 3.0 or beyond + if not self.env_layer.is_distro_azure_linux_3_or_beyond(): + self.composite_logger.log_error("[TDNF] The system does not meet minimum Azure Linux requirement of 3.0 or above for strict safe deployment. Defaulting to regular upgrades.") + self.set_max_patch_publish_date() # fall-back + return False + else: + if self.is_minimum_tdnf_version_for_strict_sdp_installed(): + self.composite_logger.log_debug("[TDNF] Minimum tdnf version for strict safe deployment is installed.") + return True + else: + if not self.try_tdnf_update_to_meet_strict_sdp_requirements(): + error_msg = "Failed to meet minimum TDNF version requirement for strict safe deployment. Defaulting to regular upgrades." + self.composite_logger.log_error(error_msg + "[Error={0}]".format(repr(error_msg))) + self.status_handler.add_error_to_status(error_msg) + self.set_max_patch_publish_date() # fall-back + return False + return True + + def is_minimum_tdnf_version_for_strict_sdp_installed(self): + # type: () -> bool + """Check if at least the minimum required version of TDNF is installed""" + self.composite_logger.log_debug("[TDNF] Checking if minimum TDNF version required for strict safe deployment is installed...") + tdnf_version = self.get_tdnf_version() + minimum_tdnf_version_for_strict_sdp = self.azl3_tdnf_packagemanager.TDNF_MINIMUM_VERSION_FOR_STRICT_SDP + distro_from_minimum_tdnf_version_for_strict_sdp = re.match(r".*-\d+\.([a-zA-Z0-9]+)$", minimum_tdnf_version_for_strict_sdp).group(1) + if tdnf_version is None: + self.composite_logger.log_error("[TDNF] Failed to get TDNF version. Cannot proceed with strict safe deployment. Defaulting to regular upgrades.") + return False + elif re.match(r".*-\d+\.([a-zA-Z0-9]+)$", tdnf_version).group(1) != distro_from_minimum_tdnf_version_for_strict_sdp: + self.composite_logger.log_warning("[TDNF] TDNF version installed is not from the same Azure Linux distribution as the minimum required version for strict SDP. [InstalledVersion={0}][MinimumRequiredVersion={1}]".format(tdnf_version, self.azl3_tdnf_packagemanager.TDNF_MINIMUM_VERSION_FOR_STRICT_SDP)) + return False + elif not self.version_comparator.compare_versions(tdnf_version, minimum_tdnf_version_for_strict_sdp) >= 0: + self.composite_logger.log_warning("[TDNF] TDNF version installed is less than the minimum required version for strict SDP. [InstalledVersion={0}][MinimumRequiredVersion={1}]".format(tdnf_version, self.azl3_tdnf_packagemanager.TDNF_MINIMUM_VERSION_FOR_STRICT_SDP)) + return False + return True + + def get_tdnf_version(self): + # type: () -> any + """Get the version of TDNF installed on the system""" + self.composite_logger.log_debug("[TDNF] Getting tdnf version...") + cmd = "rpm -q tdnf | sed -E 's/^tdnf-([0-9]+\\.[0-9]+\\.[0-9]+-[0-9]+\\.[a-zA-Z0-9]+).*/\\1/'" + code, output = self.env_layer.run_command_output(cmd, False, False) + if code == 0: + # Sample output: 3.5.8-3-azl3 + version = output.split()[0] if output else None + self.composite_logger.log_debug("[TDNF] TDNF version detected. [Version={0}]".format(version)) + return version + else: + self.composite_logger.log_error("[TDNF] Failed to get TDNF version. [Command={0}][Code={1}][Output={2}]".format(cmd, code, output)) + return None + + def try_tdnf_update_to_meet_strict_sdp_requirements(self): + # type: () -> bool + """Attempt to update TDNF to meet the minimum version required for strict SDP""" + self.composite_logger.log_debug("[TDNF] Attempting to update TDNF to meet strict safe deployment requirements...") + cmd = "sudo tdnf -y install tdnf-" + self.azl3_tdnf_packagemanager.TDNF_MINIMUM_VERSION_FOR_STRICT_SDP + code, output = self.env_layer.run_command_output(cmd, no_output=True, chk_err=False) + if code == 0: + self.composite_logger.log_debug("[TDNF] Successfully updated TDNF for Strict SDP. [Command={0}][Code={1}]".format(cmd, code)) + return True + else: + self.composite_logger.log_error("[TDNF] Failed to update TDNF for Strict SDP. [Command={0}][Code={1}][Output={2}]".format(cmd, code, output)) + return False # endregion # region Package Information @@ -325,10 +406,10 @@ def get_dependent_list(self, packages): package_names += ' ' package_names += package - self.composite_logger.log_verbose("[TDNF] Resolving dependencies. [Command={0}]".format(str(self.single_package_upgrade_simulation_cmd + package_names))) - output = self.invoke_package_manager(self.single_package_upgrade_simulation_cmd + package_names) + cmd = self.single_package_upgrade_simulation_cmd + package_names + output = self.invoke_package_manager(cmd) dependencies = self.extract_dependencies(output, packages) - self.composite_logger.log_verbose("[TDNF] Resolved dependencies. [Packages={0}][DependencyCount={1}]".format(str(packages), len(dependencies))) + self.composite_logger.log_verbose("[TDNF] Resolved dependencies. [Command={0}][Packages={1}][DependencyCount={2}]".format(str(cmd),str(packages), len(dependencies))) return dependencies def get_product_name(self, package_name): @@ -748,3 +829,9 @@ def separate_out_esm_packages(self, packages, package_versions): def get_package_install_expected_avg_time_in_seconds(self): return self.package_install_expected_avg_time_in_seconds + # region - AzLinux specializations + class AzL3TdnfPackageManager(object): + """AzLinux Package Manager class for TDNF package manager.""" + def __init__(self): + self.TDNF_MINIMUM_VERSION_FOR_STRICT_SDP = "3.5.8-3.azl3" # minimum version of tdnf required to support Strict SDP in Azure Linux + diff --git a/src/core/src/package_managers/YumPackageManager.py b/src/core/src/package_managers/YumPackageManager.py index dc888dd8..9e99cd41 100644 --- a/src/core/src/package_managers/YumPackageManager.py +++ b/src/core/src/package_managers/YumPackageManager.py @@ -266,6 +266,10 @@ def install_updates_fail_safe(self, excluded_packages): def install_security_updates_azgps_coordinated(self): pass + + def try_meet_azgps_coordinated_requirements(self): + # type: () -> bool + return False # endregion # region Package Information diff --git a/src/core/src/package_managers/ZypperPackageManager.py b/src/core/src/package_managers/ZypperPackageManager.py index 6fe5c4d0..365eb1da 100644 --- a/src/core/src/package_managers/ZypperPackageManager.py +++ b/src/core/src/package_managers/ZypperPackageManager.py @@ -420,6 +420,10 @@ def install_updates_fail_safe(self, excluded_packages): def install_security_updates_azgps_coordinated(self): pass + + def try_meet_azgps_coordinated_requirements(self): + # type: () -> bool + return False # endregion # region Package Information diff --git a/src/core/tests/Test_CoreMain.py b/src/core/tests/Test_CoreMain.py index e25f5e99..2071171e 100644 --- a/src/core/tests/Test_CoreMain.py +++ b/src/core/tests/Test_CoreMain.py @@ -24,6 +24,7 @@ from core.src.CoreMain import CoreMain from core.src.bootstrap.Constants import Constants +from core.src.external_dependencies import distro from core.tests.library.ArgumentComposer import ArgumentComposer from core.tests.library.LegacyEnvLayerExtensions import LegacyEnvLayerExtensions from core.tests.library.RuntimeCompositor import RuntimeCompositor @@ -56,6 +57,9 @@ def mock_os_remove(self, file_to_remove): def mock_os_path_exists(self, patch_to_validate): return False + def mock_distro_os_release_attr_return_azure_linux_3(self, attribute): + return '3.0.0' + def test_operation_fail_for_non_autopatching_request(self): # Test for non auto patching request argument_composer = ArgumentComposer() @@ -668,16 +672,16 @@ def test_install_all_packages_for_azure_linux_autopatching(self): self.assertTrue(substatus_file_data[1]["status"].lower() == Constants.STATUS_SUCCESS.lower()) self.assertTrue(json.loads(substatus_file_data[1]["formattedMessage"]["message"])["installedPatchCount"] == 9) self.assertEqual(json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][1]["name"], "azurelinux-repos-ms-oss.noarch") - self.assertTrue("Other" in str(json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][1]["classifications"])) + self.assertTrue("Security" in str(json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][1]["classifications"])) self.assertTrue("Installed" == json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][1]["patchInstallationState"]) self.assertEqual(json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][2]["name"], "libseccomp.x86_64") - self.assertTrue("Other" in str(json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][2]["classifications"])) + self.assertTrue("Security" in str(json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][2]["classifications"])) self.assertTrue("Installed" == json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][2]["patchInstallationState"]) self.assertEqual(json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][0]["name"], "azurelinux-release.noarch") - self.assertTrue("Other" in str(json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][0]["classifications"])) + self.assertTrue("Security" in str(json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][0]["classifications"])) self.assertTrue("Installed" == json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][0]["patchInstallationState"]) self.assertEqual(json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][3]["name"], "python3.x86_64") - self.assertTrue("Other" in str(json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][3]["classifications"])) + self.assertTrue("Security" in str(json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][3]["classifications"])) self.assertTrue("Installed" == json.loads(substatus_file_data[1]["formattedMessage"]["message"])["patches"][3]["patchInstallationState"]) self.assertTrue(substatus_file_data[2]["name"] == Constants.PATCH_METADATA_FOR_HEALTHSTORE) self.assertTrue(substatus_file_data[2]["status"].lower() == Constants.STATUS_SUCCESS.lower()) @@ -690,6 +694,49 @@ def test_install_all_packages_for_azure_linux_autopatching(self): LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution = backup_envlayer_platform_linux_distribution + def test_install_all_packages_for_azure_linux_strict_sdp(self): + # backups + backup_envlayer_platform_linux_distribution = LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution + backup_distro_os_release_attr = distro.os_release_attr + + LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution = self.mock_linux_distribution_to_return_azure_linux + distro.os_release_attr = self.mock_distro_os_release_attr_return_azure_linux_3 + + argument_composer = ArgumentComposer() + classifications_to_include = ["Security", "Critical"] + argument_composer.maximum_duration = "PT3H" + argument_composer.classifications_to_include = classifications_to_include + argument_composer.patches_to_include = ["MaxPatchPublishDate=20250210T000000Z", "AzGPS_Mitigation_Mode_No_SLA"] + argument_composer.reboot_setting = 'Always' + runtime = RuntimeCompositor(argument_composer.get_composed_arguments(), True, Constants.TDNF) + runtime.set_legacy_test_type("HappyPath") + CoreMain(argument_composer.get_composed_arguments()) + + self.assertEqual(runtime.package_manager.max_patch_publish_date, "1739145600") + + # check telemetry events + self.__check_telemetry_events(runtime) + + # check status file + with runtime.env_layer.file_system.open(runtime.execution_config.status_file_path, 'r') as file_handle: + substatus_file_data = json.load(file_handle)[0]["status"]["substatus"] + self.assertEqual(len(substatus_file_data), 4) + self.assertTrue(substatus_file_data[0]["name"] == Constants.PATCH_ASSESSMENT_SUMMARY) + self.assertTrue(substatus_file_data[0]["status"].lower() == Constants.STATUS_SUCCESS.lower()) + self.assertTrue(substatus_file_data[1]["name"] == Constants.PATCH_INSTALLATION_SUMMARY) + self.assertTrue(substatus_file_data[1]["status"].lower() == Constants.STATUS_SUCCESS.lower()) + self.assertTrue(substatus_file_data[2]["name"] == Constants.PATCH_METADATA_FOR_HEALTHSTORE) + self.assertTrue(substatus_file_data[2]["status"].lower() == Constants.STATUS_SUCCESS.lower()) + substatus_file_data_patch_metadata_summary = json.loads(substatus_file_data[2]["formattedMessage"]["message"]) + self.assertEqual(substatus_file_data_patch_metadata_summary["patchVersion"], "") + self.assertTrue(substatus_file_data_patch_metadata_summary["shouldReportToHealthStore"]) + self.assertTrue(substatus_file_data[3]["name"] == Constants.CONFIGURE_PATCHING_SUMMARY) + self.assertTrue(substatus_file_data[3]["status"].lower() == Constants.STATUS_SUCCESS.lower()) + runtime.stop() + + LegacyEnvLayerExtensions.LegacyPlatform.linux_distribution = backup_envlayer_platform_linux_distribution + distro.os_release_attr = backup_distro_os_release_attr + # test with both assessment mode and patch mode set in configure patching or install patches or assess patches or auto assessment def test_auto_assessment_success_with_configure_patching_in_prev_operation_on_same_sequence(self): """Unit test for auto assessment request with configure patching completed on the sequence before. Result: should retain prev substatus and update only PatchAssessmentSummary""" diff --git a/src/core/tests/Test_EnvLayer.py b/src/core/tests/Test_EnvLayer.py index 24bf5c73..37db4055 100644 --- a/src/core/tests/Test_EnvLayer.py +++ b/src/core/tests/Test_EnvLayer.py @@ -17,6 +17,7 @@ import unittest from core.src.bootstrap.EnvLayer import EnvLayer from core.src.bootstrap.Constants import Constants +from core.src.external_dependencies import distro class TestExecutionConfig(unittest.TestCase): @@ -61,6 +62,15 @@ def mock_run_command_for_tdnf(self, cmd, no_output=False, chk_err=False): if cmd.find("which tdnf") > -1: return 0, '' return -1, '' + + def mock_distro_os_release_attr_return_azure_linux_3(self, attribute): + return '3.0.0' + + def mock_distro_os_release_attr_return_azure_linux_2(self, attribute): + return '2.9.0' + + def mock_distro_os_release_attr_return_none(self, attribute): + return None # endregion def test_get_package_manager(self): @@ -95,6 +105,26 @@ def test_get_package_manager(self): self.envlayer.platform.linux_distribution = self.backup_linux_distribution platform.system = self.backup_platform_system + def test_is_distro_azure_linux_3_or_beyond(self): + self.backup_linux_distribution = self.envlayer.platform.linux_distribution + self.backup_envlayer_distro_os_release_attr = distro.os_release_attr + + test_input_output_table = [ + [self.mock_linux_distribution_to_return_azure_linux_3, self.mock_distro_os_release_attr_return_azure_linux_3, True], + [self.mock_linux_distribution_to_return_azure_linux_2, self.mock_distro_os_release_attr_return_azure_linux_2, False], + [self.mock_linux_distribution_to_return_azure_linux_3, self.mock_distro_os_release_attr_return_none, False] + ] + + for row in test_input_output_table: + self.envlayer.platform.linux_distribution = row[0] + distro.os_release_attr = row[1] + result = self.envlayer.is_distro_azure_linux_3_or_beyond() + self.assertEqual(result, row[2]) + + # restore original methods + distro.os_release_attr = self.backup_envlayer_distro_os_release_attr + self.envlayer.platform.linux_distribution = self.backup_linux_distribution + def test_filesystem(self): # only validates if these invocable without exceptions backup_retry_count = Constants.MAX_FILE_OPERATION_RETRY_COUNT diff --git a/src/core/tests/Test_TdnfPackageManager.py b/src/core/tests/Test_TdnfPackageManager.py index f7f53922..c05275de 100644 --- a/src/core/tests/Test_TdnfPackageManager.py +++ b/src/core/tests/Test_TdnfPackageManager.py @@ -27,6 +27,7 @@ from core.tests.library.LegacyEnvLayerExtensions import LegacyEnvLayerExtensions from core.tests.library.ArgumentComposer import ArgumentComposer from core.tests.library.RuntimeCompositor import RuntimeCompositor +from core.src.external_dependencies import distro class TestTdnfPackageManager(unittest.TestCase): @@ -44,8 +45,46 @@ def mock_do_processes_require_restart_raise_exception(self): def mock_linux_distribution_to_return_azure_linux(self): return ['Microsoft Azure Linux', '3.0', ''] + def mock_linux_distribution_to_return_azure_linux_2(self): + return ['Common Base Linux Mariner', '2.0', ''] + def mock_write_with_retry_raise_exception(self, file_path_or_handle, data, mode='a+'): raise Exception + + def mock_run_command_output_return_tdnf_3(self, cmd, no_output=False, chk_err=True): + """ Mock for run_command_output to return tdnf 3 """ + return 0, "3.5.8-3\n" + + def mock_run_command_output_return_1(self, cmd, no_output=False, chk_err=True): + """ Mock for run_command_output to return None """ + return 1, "No output available\n" + + def mock_run_command_output_return_0(self, cmd, no_output=False, chk_err=True): + return 0, "Successfully executed command\n" + + def mock_get_tdnf_version_return_tdnf_3_5_8_3(self): + return "3.5.8-3.azl3" + + def mock_get_tdnf_version_return_tdnf_4_0(self): + return "4.0.0-1.azl3" + + def mock_get_tdnf_version_return_tdnf_2_5(self): + return "2.5.0-1.cm2" + + def mock_get_tdnf_version_return_tdnf_3_5_8_2(self): + return "3.5.8-2.azl3" + + def mock_get_tdnf_version_return_tdnf_3_5_8_6_cm2(self): + return "3.5.8-6.cm2" + + def mock_get_tdnf_version_return_None(self): + return None + + def mock_distro_os_release_attr_return_azure_linux_3(self, attribute): + return '3.0.0' + + def mock_distro_os_release_attr_return_azure_linux_2(self, attribute): + return '2.9.0' # endregion # region Utility Functions @@ -347,7 +386,7 @@ def test_inclusion_type_all(self): self.assertEqual("6.6.78.1-1.azl3", package_versions[8]) def test_inclusion_type_critical(self): - """Unit test for tdnf package manager with inclusion and Classification = Critical. Returns no packages since classifications are not available in Azure Linux""" + """Unit test for tdnf package manager with inclusion and Classification = Critical. Returns all packages since classifications are not available in Azure Linux, hence everything is considered as Critical.""" self.runtime.set_legacy_test_type('HappyPath') package_manager = self.container.get('package_manager') self.assertTrue(package_manager is not None) @@ -365,11 +404,11 @@ def test_inclusion_type_critical(self): # test for get_available_updates available_updates, package_versions = package_manager.get_available_updates(package_filter) - self.assertTrue(available_updates == []) - self.assertTrue(package_versions == []) + self.assertEqual(9, len(available_updates)) + self.assertEqual(9, len(package_versions)) def test_inclusion_type_other(self): - """Unit test for tdnf package manager with inclusion and Classification = Other. All packages are considered are 'Other' since AzLinux does not have patch classification""" + """Unit test for tdnf package manager with inclusion and Classification = Other. All packages are considered are 'Security' since TDNF does not have patch classification""" self.runtime.set_legacy_test_type('HappyPath') package_manager = self.container.get('package_manager') self.assertTrue(package_manager is not None) @@ -389,26 +428,8 @@ def test_inclusion_type_other(self): available_updates, package_versions = package_manager.get_available_updates(package_filter) self.assertTrue(available_updates is not None) self.assertTrue(package_versions is not None) - self.assertEqual(9, len(available_updates)) - self.assertEqual(9, len(package_versions)) - self.assertEqual("azurelinux-release.noarch", available_updates[0]) - self.assertEqual("3.0-16.azl3", package_versions[0]) - self.assertEqual("azurelinux-repos-ms-oss.noarch", available_updates[1]) - self.assertEqual("3.0-3.azl3", package_versions[1]) - self.assertEqual("libseccomp.x86_64", available_updates[2]) - self.assertEqual("2.5.4-1.azl3", package_versions[2]) - self.assertEqual("python3.x86_64", available_updates[3]) - self.assertEqual("3.12.3-6.azl3", package_versions[3]) - self.assertEqual("libxml2.x86_64", available_updates[4]) - self.assertEqual("2.11.5-1.azl3", package_versions[4]) - self.assertEqual("dracut.x86_64", available_updates[5]) - self.assertEqual("102-7.azl3", package_versions[5]) - self.assertEqual("hyperv-daemons-license.noarch", available_updates[6]) - self.assertEqual("6.6.78.1-1.azl3", package_versions[6]) - self.assertEqual("hypervvssd.x86_64", available_updates[7]) - self.assertEqual("6.6.78.1-1.azl3", package_versions[7]) - self.assertEqual("hypervkvpd.x86_64", available_updates[8]) - self.assertEqual("6.6.78.1-1.azl3", package_versions[8]) + self.assertEqual(0, len(available_updates)) + self.assertEqual(0, len(package_versions)) def test_inclusion_only(self): """Unit test for tdnf package manager with inclusion only and NotSelected Classifications""" @@ -871,6 +892,122 @@ def test_revert_auto_os_update_to_system_default(self): self.__assert_std_io(captured_output=captured_output, expected_output=testcase["stdio"]["expected_output"]) self.__assert_reverted_automatic_patch_configuration_settings(package_manager, config_exists=bool(testcase["assertions"]["config_exists"]), config_value_expected=testcase["assertions"]["config_value_expected"]) + def test_set_max_patch_publish_date(self): + """Unit test for tdnf package manager set_max_patch_publish_date method""" + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager is not None) + + input_output_table_for_successful_cases = [ + ["20240702T000000Z", "1719878400"], + ["", ""] + ] + for row in input_output_table_for_successful_cases: + package_manager.set_max_patch_publish_date(row[0]) + self.assertEqual(package_manager.max_patch_publish_date, row[1]) + + # posix time computation throws an exception if the date is not in the correct format + self.assertRaises(ValueError, package_manager.set_max_patch_publish_date, "2024-07-02T00:00:00Z") + + def test_get_tdnf_version(self): + """Unit test for tdnf package manager get_tdnf_version method""" + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager is not None) + self.backup_run_command_output = self.runtime.env_layer.run_command_output + + test_input_output_table = [ + [self.mock_run_command_output_return_tdnf_3, "3.5.8-3"], + [self.mock_run_command_output_return_1, None], + ] + + for row in test_input_output_table: + self.runtime.env_layer.run_command_output = row[0] + version = package_manager.get_tdnf_version() + self.assertEqual(version, row[1]) + + self.runtime.env_layer.run_command_output = self.backup_run_command_output + + def test_is_mininum_tdnf_version_for_strict_sdp_installed(self): + """Unit test for tdnf package manager is_minimum_tdnf_version method""" + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager is not None) + + self.backup_get_tdnf_version = package_manager.get_tdnf_version + + test_input_output_table = [ + [self.mock_get_tdnf_version_return_None, False], + [self.mock_get_tdnf_version_return_tdnf_2_5, False], + [self.mock_get_tdnf_version_return_tdnf_3_5_8_2, False], + [self.mock_get_tdnf_version_return_tdnf_3_5_8_6_cm2, False], + [self.mock_get_tdnf_version_return_tdnf_3_5_8_3, True], + [self.mock_get_tdnf_version_return_tdnf_4_0, True] + ] + + for row in test_input_output_table: + package_manager.get_tdnf_version = row[0] + result = package_manager.is_minimum_tdnf_version_for_strict_sdp_installed() + self.assertEqual(result, row[1]) + + package_manager.get_tdnf_version = self.backup_get_tdnf_version + + def test_try_tdnf_update_to_meet_strict_sdp_requirements(self): + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager is not None) + + self.backup_run_command_output = self.runtime.env_layer.run_command_output + + input_output_table = [ + [self.mock_run_command_output_return_0, True], + [self.mock_run_command_output_return_1, False], + ] + + for row in input_output_table: + self.runtime.env_layer.run_command_output = row[0] + result = package_manager.try_tdnf_update_to_meet_strict_sdp_requirements() + self.assertEqual(result, row[1]) + + self.runtime.env_layer.run_command_output = self.backup_run_command_output + + def test_try_meet_azgps_coordinated_requirements(self): + package_manager = self.container.get('package_manager') + self.assertTrue(package_manager is not None) + + # backup methods + self.backup_linux_distribution = self.runtime.env_layer.platform.linux_distribution + self.backup_distro_os_release_attr = distro.os_release_attr + self.backup_get_tdnf_version = package_manager.get_tdnf_version + self.backup_run_command_output = self.runtime.env_layer.run_command_output + + """ test cases: + 1. Azure Linux 3 with tdnf version > 3.5.8-3 + 2. Azure Linux 3 with tdnf version = 3.5.8-3 + 3. Azure Linux 3 with tdnf version < 3.5.8-3, will be updated to 3.5.8-3 successfully + 4. Azure Linux 3 with tdnf version < 3.5.8-3, will not be updated to 3.5.8-3 + 5. Azure Linux 2""" + test_input_output_table = [ + [self.mock_linux_distribution_to_return_azure_linux, self.mock_distro_os_release_attr_return_azure_linux_3, self.mock_get_tdnf_version_return_tdnf_4_0, self.backup_run_command_output, True], + [self.mock_linux_distribution_to_return_azure_linux, self.mock_distro_os_release_attr_return_azure_linux_3, self.mock_get_tdnf_version_return_tdnf_3_5_8_3, self.backup_run_command_output, True], + [self.mock_linux_distribution_to_return_azure_linux, self.mock_distro_os_release_attr_return_azure_linux_3, self.mock_get_tdnf_version_return_tdnf_2_5, self.mock_run_command_output_return_0, True], + [self.mock_linux_distribution_to_return_azure_linux, self.mock_distro_os_release_attr_return_azure_linux_3, self.mock_get_tdnf_version_return_tdnf_2_5, self.mock_run_command_output_return_1, False], + [self.mock_linux_distribution_to_return_azure_linux_2, self.mock_distro_os_release_attr_return_azure_linux_2, self.backup_distro_os_release_attr, self.backup_run_command_output, False] + ] + + for row in test_input_output_table: + # set test case values + self.runtime.env_layer.platform.linux_distribution = row[0] + distro.os_release_attr = row[1] + package_manager.get_tdnf_version = row[2] + self.runtime.env_layer.run_command_output = row[3] + + # run test case + result = package_manager.try_meet_azgps_coordinated_requirements() + self.assertEqual(result, row[4]) + + # restore original methods + self.runtime.env_layer.platform.linux_distribution = self.backup_linux_distribution + distro.os_release_attr = self.backup_distro_os_release_attr + package_manager.get_tdnf_version = self.backup_get_tdnf_version + self.runtime.env_layer.run_command_output = self.backup_run_command_output + if __name__ == '__main__': unittest.main() diff --git a/src/core/tests/library/LegacyEnvLayerExtensions.py b/src/core/tests/library/LegacyEnvLayerExtensions.py index ede2bf81..967ab32e 100644 --- a/src/core/tests/library/LegacyEnvLayerExtensions.py +++ b/src/core/tests/library/LegacyEnvLayerExtensions.py @@ -14,6 +14,7 @@ # # Requires Python 2.7+ import os +import re import sys from core.src.bootstrap.Constants import Constants @@ -606,6 +607,7 @@ def run_command_output(self, cmd, no_output=False, chk_err=True): "Error(1032) : Operation aborted.\n" elif cmd.find("list installed") > -1: code = 0 + cmd = re.sub(r"--snapshottime=\d+", '', cmd) package = cmd.replace('sudo tdnf list installed ', '') whitelisted_versions = [ '3.0-16.azl3', '3.0-3.azl3', '2.5.4-1.azl3', '3.12.3-6.azl3', '2.11.5-1.azl3', '102-7.azl3', '6.6.78.1-1.azl3'] # any list of versions you want to work for *any* package @@ -624,6 +626,24 @@ def run_command_output(self, cmd, no_output=False, chk_err=True): elif cmd.find("systemctl disable ") > -1: code = 0 output = 'Auto update service disabled' + elif cmd.find("rpm -q tdnf") > -1: + code = 0 + output = '3.5.8-3.azl3' + elif cmd.find('tdnf -y upgrade') > -1: + code = 0 + output = "Loaded plugin: tdnfrepogpgcheck\n" + \ + "Upgrading:\n" + \ + "azurelinux-release noarch 3.0-16.azl3 azurelinux-official-base 847.91k 403.29k\n" + \ + "azurelinux-repos-ms-oss noarch 3.0-3.azl3 azurelinux-official-base 382.51k 258.06k\n\n" + \ + "libseccomp x86_64 2.5.4-1.azl3 azurelinux-official-base 847.91k 403.29k\n" + \ + "python3 x86_64 3.12.3-6.azl3 azurelinux-official-base 382.51k 258.06k\n\n" + \ + "libxml2 x86_64 2.11.5-1.azl3 azurelinux-official-base 847.91k 403.29k\n" + \ + "dracut x86_64 102-7.azl3 azurelinux-official-base 382.51k 258.06k\n\n" + \ + "hyperv-daemons-license noarch 6.6.78.1-1.azl3 azurelinux-official-base 847.91k 403.29k\n" + \ + "hypervvssd x86_64 6.6.78.1-1.azl3 azurelinux-official-base 382.51k 258.06k\n\n" + \ + "hypervkvpd x86_64 6.6.78.1-1.azl3 azurelinux-official-base 847.91k 403.29k\n" + \ + "Total installed size: 1.20M\n" + \ + "Total download size: 661.34k\n" elif self.legacy_test_type == 'SadPath': if cmd.find("cat /proc/cpuinfo | grep name") > -1: code = 0