diff --git a/aws/lambda/oss_ci_job_queue_time/lambda_function.py b/aws/lambda/oss_ci_job_queue_time/lambda_function.py index be39ecc4ec..7f3d9884ef 100644 --- a/aws/lambda/oss_ci_job_queue_time/lambda_function.py +++ b/aws/lambda/oss_ci_job_queue_time/lambda_function.py @@ -31,7 +31,9 @@ def get_clickhouse_client( host: str, user: str, password: str ) -> clickhouse_connect.driver.client.Client: # for local testing only, disable SSL verification - # return clickhouse_connect.get_client(host=host, user=user, password=password, secure=True, verify=False) + return clickhouse_connect.get_client( + host=host, user=user, password=password, secure=True, verify=False + ) return clickhouse_connect.get_client( host=host, user=user, password=password, secure=True @@ -98,7 +100,7 @@ def write_to_file(data: Any, filename="", path=""): # --------- Github Config File Methods Start---- class LazyFileHistory: """ - Reads the content of a file from a GitHub repository on the version that it was on a specific time and date provided. It then caches the commits and file contents avoiding unnecessary requests to the GitHub API. + Reads the content of a file from a GitHub repository on the version that it was closest to a specific time and date provided. It then caches the commits and file contents avoiding unnecessary requests to the GitHub API. All public methods are thread-safe. """ @@ -110,7 +112,9 @@ def __init__(self, repo: Any, path: str) -> None: self._fetched_all_commits = False self._lock = threading.RLock() - def get_version_after_timestamp(self, timestamp: str | datetime) -> Optional[str]: + def get_version_close_to_timestamp( + self, timestamp: str | datetime + ) -> Optional[str]: try: with self._lock: if not isinstance(timestamp, datetime): @@ -120,31 +124,34 @@ def get_version_after_timestamp(self, timestamp: str | datetime) -> Optional[str ) else: timestamp = parse(timestamp) - commit = self._find_earliest_after_in_cache(timestamp) + + info( + f" [LazyFileHistory] Try fetch cached content for {self.repo} : {self.path} at {timestamp.isoformat()}" + ) + commit = self._find_closest_before_or_equal_in_cache(timestamp) if commit: return self._fetch_content_for_commit(commit) if not self._fetched_all_commits: + info( + f" [LazyFileHistory] Nothing found in cache, fetching all commit includes {self.repo}/{self.path} close/equal to {timestamp.isoformat()}" + ) commit = self._fetch_until_timestamp(timestamp) if commit: return self._fetch_content_for_commit(commit) + info( + f" [LazyFileHistory] No config file found in cache and in commits for {self.repo}/{self.path}." + ) + return None except Exception as e: warning( - f"Error fetching content for {self.repo} : {self.path} at {timestamp}: {e}" + f" [LazyFileHistory] Error fetching content for {self.repo} : {self.path} at {timestamp}: {e}" ) - - return None - - def _find_earliest_after_in_cache(self, timestamp: datetime) -> Optional[str]: - commits_after = [ - c for c in self._commits_cache if c.commit.author.date > timestamp - ] - - if not commits_after: return None - return commits_after[-1] def _fetch_until_timestamp(self, timestamp: datetime) -> Optional[str]: + # call github api, with path parameter + # Only commits containing this file path will be returned. all_commits = self.repo.get_commits(path=self.path) known_shas = {c.sha for c in self._commits_cache} @@ -158,13 +165,29 @@ def _fetch_until_timestamp(self, timestamp: datetime) -> Optional[str]: if commit.commit.author.date <= timestamp: break + info(f" [LazyFileHistory] Fetched new commits {len(newly_fetched)}") + if len(newly_fetched) > 0: + newly_fetched.sort(key=lambda c: c.commit.author.date) + info( + f" [LazyFileHistory] Fetched new commits with latest: {newly_fetched[-1].commit.author.date}, oldest:{newly_fetched[-1].commit.author.date}" + ) + self._commits_cache.extend(newly_fetched) - self._commits_cache.sort(key=lambda c: c.commit.author.date, reverse=True) + self._commits_cache.sort(key=lambda c: c.commit.author.date) if not newly_fetched: self._fetched_all_commits = True - return self._find_earliest_after_in_cache(timestamp) + return self._find_closest_before_or_equal_in_cache(timestamp) + + def _find_closest_before_or_equal_in_cache( + self, timestamp: datetime + ) -> Optional[str]: + commits_before_equal = [ + c for c in self._commits_cache if c.commit.author.date <= timestamp + ] + commits_before_equal.sort(key=lambda c: c.commit.author.date) + return commits_before_equal[-1] if commits_before_equal else None def _fetch_content_for_commit(self, commit: Any) -> str: if commit.sha not in self._content_cache: @@ -286,7 +309,7 @@ def create_runner_labels( ) if len(all_runners_configs.keys()) == 0: - warning( + raise ValueError( " [method: create_runner_labels] No runner labels found in the github config files, something is wrong, please investigate." ) @@ -324,7 +347,7 @@ def create_runner_labels( def get_runner_config( retriever: LazyFileHistory, start_time: str | datetime ) -> Dict[str, Dict[str, Any]]: - contents = retriever.get_version_after_timestamp(start_time) + contents = retriever.get_version_close_to_timestamp(start_time) if contents: return explode_runner_variants(yaml.safe_load(contents)) return {"runner_types": {}} @@ -590,7 +613,6 @@ def _add_runner_labels( old_lf_lf_runner_config_retriever, ) -> None: # create dictionary of tags with set of targeting machine types - lf_runner_config = get_runner_config(lf_runner_config_retriever, start_time) if not lf_runner_config or not lf_runner_config["runner_types"]: lf_runner_config = get_runner_config( @@ -1170,7 +1192,9 @@ def get_latest_queue_time_histogram_table( query = """ SELECT toUnixTimestamp(MAX(time)) as latest FROM fortesting.oss_ci_queue_time_histogram """ - info(" Getting lastest timestamp from misc.oss_ci_queue_time_histogram....") + info( + " Getting lastest timestamp from fortesting.oss_ci_queue_time_histogram...." + ) res = cc.query(query, {}) if res.row_count != 1: diff --git a/aws/lambda/tests/test_lambda_oss_ci_job_queue_time.py b/aws/lambda/tests/test_lambda_oss_ci_job_queue_time.py index 904fcde37d..f0fab0de11 100644 --- a/aws/lambda/tests/test_lambda_oss_ci_job_queue_time.py +++ b/aws/lambda/tests/test_lambda_oss_ci_job_queue_time.py @@ -238,13 +238,14 @@ def setUp(self) -> None: self.mock_get_config_retrievers = patcher4.start() self.mock_envs = envs_patcher.start() - self.mock_get_runner_config.return_value = {"runner_types": {}} + self.mock_get_runner_config.return_value = { + "runner_types": {"pet": {"os": "linux", "is_ephemeral": "false"}} + } self.mock_get_config_retrievers.return_value = { "meta": MagicMock(), "lf": MagicMock(), "old_lf": MagicMock(), } - self.addCleanup(patcher2.stop) self.addCleanup(patcher3.stop) self.addCleanup(patcher4.stop)