Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 45 additions & 21 deletions aws/lambda/oss_ci_job_queue_time/lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ def get_clickhouse_client(
host: str, user: str, password: str
) -> clickhouse_connect.driver.client.Client:
# for local testing only, disable SSL verification
# return clickhouse_connect.get_client(host=host, user=user, password=password, secure=True, verify=False)
return clickhouse_connect.get_client(
host=host, user=user, password=password, secure=True, verify=False
)

return clickhouse_connect.get_client(
host=host, user=user, password=password, secure=True
Expand Down Expand Up @@ -98,7 +100,7 @@ def write_to_file(data: Any, filename="", path=""):
# --------- Github Config File Methods Start----
class LazyFileHistory:
"""
Reads the content of a file from a GitHub repository on the version that it was on a specific time and date provided. It then caches the commits and file contents avoiding unnecessary requests to the GitHub API.
Reads the content of a file from a GitHub repository as of the version closest to (at or before) a given time and date. It caches the commits and file contents to avoid unnecessary requests to the GitHub API.
All public methods are thread-safe.
"""

Expand All @@ -110,7 +112,9 @@ def __init__(self, repo: Any, path: str) -> None:
self._fetched_all_commits = False
self._lock = threading.RLock()

def get_version_after_timestamp(self, timestamp: str | datetime) -> Optional[str]:
def get_version_close_to_timestamp(
self, timestamp: str | datetime
) -> Optional[str]:
try:
with self._lock:
if not isinstance(timestamp, datetime):
Expand All @@ -120,31 +124,34 @@ def get_version_after_timestamp(self, timestamp: str | datetime) -> Optional[str
)
else:
timestamp = parse(timestamp)
commit = self._find_earliest_after_in_cache(timestamp)

info(
f" [LazyFileHistory] Try fetch cached content for {self.repo} : {self.path} at {timestamp.isoformat()}"
)
commit = self._find_closest_before_or_equal_in_cache(timestamp)
if commit:
return self._fetch_content_for_commit(commit)

if not self._fetched_all_commits:
info(
f" [LazyFileHistory] Nothing found in cache, fetching all commit includes {self.repo}/{self.path} close/equal to {timestamp.isoformat()}"
)
commit = self._fetch_until_timestamp(timestamp)
if commit:
return self._fetch_content_for_commit(commit)
info(
f" [LazyFileHistory] No config file found in cache and in commits for {self.repo}/{self.path}."
)
return None
except Exception as e:
warning(
f"Error fetching content for {self.repo} : {self.path} at {timestamp}: {e}"
f" [LazyFileHistory] Error fetching content for {self.repo} : {self.path} at {timestamp}: {e}"
)

return None

def _find_earliest_after_in_cache(self, timestamp: datetime) -> Optional[str]:
commits_after = [
c for c in self._commits_cache if c.commit.author.date > timestamp
]

if not commits_after:
return None
return commits_after[-1]

def _fetch_until_timestamp(self, timestamp: datetime) -> Optional[str]:
# call github api, with path parameter
# Only commits containing this file path will be returned.
all_commits = self.repo.get_commits(path=self.path)
known_shas = {c.sha for c in self._commits_cache}

Expand All @@ -158,13 +165,29 @@ def _fetch_until_timestamp(self, timestamp: datetime) -> Optional[str]:
if commit.commit.author.date <= timestamp:
break

info(f" [LazyFileHistory] Fetched new commits {len(newly_fetched)}")
if len(newly_fetched) > 0:
newly_fetched.sort(key=lambda c: c.commit.author.date)
info(
f" [LazyFileHistory] Fetched new commits with latest: {newly_fetched[-1].commit.author.date}, oldest:{newly_fetched[-1].commit.author.date}"
)

self._commits_cache.extend(newly_fetched)
self._commits_cache.sort(key=lambda c: c.commit.author.date, reverse=True)
self._commits_cache.sort(key=lambda c: c.commit.author.date)

if not newly_fetched:
self._fetched_all_commits = True

return self._find_earliest_after_in_cache(timestamp)
return self._find_closest_before_or_equal_in_cache(timestamp)

def _find_closest_before_or_equal_in_cache(
self, timestamp: datetime
) -> Optional[str]:
commits_before_equal = [
c for c in self._commits_cache if c.commit.author.date <= timestamp
]
commits_before_equal.sort(key=lambda c: c.commit.author.date)
return commits_before_equal[-1] if commits_before_equal else None

def _fetch_content_for_commit(self, commit: Any) -> str:
if commit.sha not in self._content_cache:
Expand Down Expand Up @@ -286,7 +309,7 @@ def create_runner_labels(
)

if len(all_runners_configs.keys()) == 0:
warning(
raise ValueError(
" [method: create_runner_labels] No runner labels found in the github config files, something is wrong, please investigate."
)

Expand Down Expand Up @@ -324,7 +347,7 @@ def create_runner_labels(
def get_runner_config(
    retriever: LazyFileHistory, start_time: str | datetime
) -> Dict[str, Dict[str, Any]]:
    """
    Load the runner config that `retriever` resolves for `start_time`.

    Args:
        retriever: file-history reader, queried through
            `get_version_close_to_timestamp`.
        start_time: timestamp (string or datetime) handed to the retriever.

    Returns:
        The result of `explode_runner_variants` applied to the YAML-parsed
        file content, or `{"runner_types": {}}` when the retriever yields no
        content.
    """
    # Query the retriever once, using the current lookup method only; a
    # second lookup whose result is discarded would just cost an extra call.
    contents = retriever.get_version_close_to_timestamp(start_time)
    if not contents:
        return {"runner_types": {}}
    return explode_runner_variants(yaml.safe_load(contents))
Expand Down Expand Up @@ -590,7 +613,6 @@ def _add_runner_labels(
old_lf_lf_runner_config_retriever,
) -> None:
# create dictionary of tags with set of targeting machine types

lf_runner_config = get_runner_config(lf_runner_config_retriever, start_time)
if not lf_runner_config or not lf_runner_config["runner_types"]:
lf_runner_config = get_runner_config(
Expand Down Expand Up @@ -1170,7 +1192,9 @@ def get_latest_queue_time_histogram_table(
query = """
SELECT toUnixTimestamp(MAX(time)) as latest FROM fortesting.oss_ci_queue_time_histogram
"""
info(" Getting lastest timestamp from misc.oss_ci_queue_time_histogram....")
info(
" Getting lastest timestamp from fortesting.oss_ci_queue_time_histogram...."
)
res = cc.query(query, {})

if res.row_count != 1:
Expand Down
5 changes: 3 additions & 2 deletions aws/lambda/tests/test_lambda_oss_ci_job_queue_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,13 +238,14 @@ def setUp(self) -> None:
self.mock_get_config_retrievers = patcher4.start()
self.mock_envs = envs_patcher.start()

self.mock_get_runner_config.return_value = {"runner_types": {}}
self.mock_get_runner_config.return_value = {
"runner_types": {"pet": {"os": "linux", "is_ephemeral": "false"}}
}
self.mock_get_config_retrievers.return_value = {
"meta": MagicMock(),
"lf": MagicMock(),
"old_lf": MagicMock(),
}

self.addCleanup(patcher2.stop)
self.addCleanup(patcher3.stop)
self.addCleanup(patcher4.stop)
Expand Down