Skip to content

Commit 716cb38

Browse files
jschefflmajorosdonat
authored andcommitted
Log message source details are grouped (#43681) (#44070)
* Log message source details are grouped (#43681) * Log message source details are grouped * fix static checks * fix pytests * Another pytest fix --------- Co-authored-by: Majoros Donat (XC-DX/EET2-Bp) <[email protected]> (cherry picked from commit 9d18772) * Fix pytest --------- Co-authored-by: majorosdonat <[email protected]>
1 parent 04a9e2d commit 716cb38

File tree

7 files changed

+55
-69
lines changed

7 files changed

+55
-69
lines changed

airflow/utils/log/file_task_handler.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,11 @@ def _read(
416416
)
417417
)
418418
log_pos = len(logs)
419-
messages = "".join([f"*** {x}\n" for x in messages_list])
419+
# Log message source details are grouped: they are not relevant for most users and can
420+
# distract them from finding the root cause of their errors
421+
messages = " INFO - ::group::Log message source details\n"
422+
messages += "".join([f"*** {x}\n" for x in messages_list])
423+
messages += " INFO - ::endgroup::\n"
420424
end_of_log = ti.try_number != try_number or ti.state not in (
421425
TaskInstanceState.RUNNING,
422426
TaskInstanceState.DEFERRED,

tests/api_connexion/endpoints/test_log_endpoint.py

+10-13
Original file line numberDiff line numberDiff line change
@@ -188,10 +188,10 @@ def test_should_respond_200_json(self, try_number):
188188
)
189189
expected_filename = f"{self.log_dir}/dag_id={self.DAG_ID}/run_id={self.RUN_ID}/task_id={self.TASK_ID}/attempt={try_number}.log"
190190
log_content = "Log for testing." if try_number == 1 else "Log for testing 2."
191-
assert (
192-
response.json["content"]
193-
== f"[('localhost', '*** Found local files:\\n*** * {expected_filename}\\n{log_content}')]"
194-
)
191+
assert "[('localhost'," in response.json["content"]
192+
assert f"*** Found local files:\\n*** * {expected_filename}\\n" in response.json["content"]
193+
assert f"{log_content}')]" in response.json["content"]
194+
195195
info = serializer.loads(response.json["continuation_token"])
196196
assert info == {"end_of_log": True, "log_pos": 16 if try_number == 1 else 18}
197197
assert 200 == response.status_code
@@ -244,11 +244,9 @@ def test_should_respond_200_text_plain(
244244
assert 200 == response.status_code
245245

246246
log_content = "Log for testing." if try_number == 1 else "Log for testing 2."
247-
248-
assert (
249-
response.data.decode("utf-8")
250-
== f"localhost\n*** Found local files:\n*** * {expected_filename}\n{log_content}\n"
251-
)
247+
assert "localhost\n" in response.data.decode("utf-8")
248+
assert f"*** Found local files:\n*** * {expected_filename}\n" in response.data.decode("utf-8")
249+
assert f"{log_content}\n" in response.data.decode("utf-8")
252250

253251
@pytest.mark.parametrize(
254252
"request_url, expected_filename, extra_query_string, try_number",
@@ -302,10 +300,9 @@ def test_get_logs_of_removed_task(self, request_url, expected_filename, extra_qu
302300
assert 200 == response.status_code
303301

304302
log_content = "Log for testing." if try_number == 1 else "Log for testing 2."
305-
assert (
306-
response.data.decode("utf-8")
307-
== f"localhost\n*** Found local files:\n*** * {expected_filename}\n{log_content}\n"
308-
)
303+
assert "localhost\n" in response.data.decode("utf-8")
304+
assert f"*** Found local files:\n*** * {expected_filename}\n" in response.data.decode("utf-8")
305+
assert f"{log_content}\n" in response.data.decode("utf-8")
309306

310307
@pytest.mark.parametrize("try_number", [1, 2])
311308
def test_get_logs_response_with_ti_equal_to_none(self, try_number):

tests/providers/amazon/aws/log/test_s3_task_handler.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,8 @@ def test_read(self):
127127
ti.state = TaskInstanceState.SUCCESS
128128
log, metadata = self.s3_task_handler.read(ti)
129129
actual = log[0][0][-1]
130-
expected = "*** Found logs in s3:\n*** * s3://bucket/remote/log/location/1.log\nLog line"
131-
assert actual == expected
130+
assert "*** Found logs in s3:\n*** * s3://bucket/remote/log/location/1.log\n" in actual
131+
assert actual.endswith("Log line")
132132
assert metadata == [{"end_of_log": True, "log_pos": 8}]
133133

134134
def test_read_when_s3_log_missing(self):
@@ -140,7 +140,7 @@ def test_read_when_s3_log_missing(self):
140140
assert len(log) == len(metadata)
141141
actual = log[0][0][-1]
142142
expected = "*** No logs found on s3 for ti=<TaskInstance: dag_for_testing_s3_task_handler.task_for_testing_s3_log_handler test [success]>\n"
143-
assert actual == expected
143+
assert expected in actual
144144
assert {"end_of_log": True, "log_pos": 0} == metadata[0]
145145

146146
def test_s3_read_when_log_missing(self):

tests/providers/google/cloud/log/test_gcs_task_handler.py

+4-3
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,8 @@ def test_should_read_logs_from_remote(self, mock_blob, mock_client, mock_creds,
106106
mock_blob.from_string.assert_called_once_with(
107107
"gs://bucket/remote/log/location/1.log", mock_client.return_value
108108
)
109-
assert logs == "*** Found remote logs:\n*** * gs://bucket/remote/log/location/1.log\nCONTENT"
109+
assert "*** Found remote logs:\n*** * gs://bucket/remote/log/location/1.log\n" in logs
110+
assert logs.endswith("CONTENT")
110111
assert {"end_of_log": True, "log_pos": 7} == metadata
111112

112113
@mock.patch(
@@ -126,13 +127,13 @@ def test_should_read_from_local_on_logs_read_error(self, mock_blob, mock_client,
126127
ti.state = TaskInstanceState.SUCCESS
127128
log, metadata = self.gcs_task_handler._read(ti, self.ti.try_number)
128129

129-
assert log == (
130+
assert (
130131
"*** Found remote logs:\n"
131132
"*** * gs://bucket/remote/log/location/1.log\n"
132133
"*** Unable to read remote log Failed to connect\n"
133134
"*** Found local files:\n"
134135
f"*** * {self.gcs_task_handler.local_base}/1.log\n"
135-
)
136+
) in log
136137
assert metadata == {"end_of_log": True, "log_pos": 0}
137138
mock_blob.from_string.assert_called_once_with(
138139
"gs://bucket/remote/log/location/1.log", mock_client.return_value

tests/providers/microsoft/azure/log/test_wasb_task_handler.py

+6-11
Original file line numberDiff line numberDiff line change
@@ -111,18 +111,13 @@ def test_wasb_read(self, mock_hook_cls, ti):
111111
assert self.wasb_task_handler.wasb_read(self.remote_log_location) == "Log line"
112112
ti = copy.copy(ti)
113113
ti.state = TaskInstanceState.SUCCESS
114-
assert self.wasb_task_handler.read(ti) == (
115-
[
116-
[
117-
(
118-
"localhost",
119-
"*** Found remote logs:\n"
120-
"*** * https://wasb-container.blob.core.windows.net/abc/hello.log\nLog line",
121-
)
122-
]
123-
],
124-
[{"end_of_log": True, "log_pos": 8}],
114+
assert self.wasb_task_handler.read(ti)[0][0][0][0] == "localhost"
115+
assert (
116+
"*** Found remote logs:\n*** * https://wasb-container.blob.core.windows.net/abc/hello.log\n"
117+
in self.wasb_task_handler.read(ti)[0][0][0][1]
125118
)
119+
assert "Log line" in self.wasb_task_handler.read(ti)[0][0][0][1]
120+
assert self.wasb_task_handler.read(ti)[1][0] == {"end_of_log": True, "log_pos": 8}
126121

127122
@mock.patch(
128123
"airflow.providers.microsoft.azure.hooks.wasb.WasbHook",

tests/utils/log/test_log_reader.py

+17-34
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,10 @@ def test_test_read_log_chunks_should_read_one_try(self):
128128
assert logs[0] == [
129129
(
130130
"localhost",
131+
" INFO - ::group::Log message source details\n"
131132
"*** Found local files:\n"
132133
f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
134+
" INFO - ::endgroup::\n"
133135
"try_number=1.",
134136
)
135137
]
@@ -141,32 +143,13 @@ def test_test_read_log_chunks_should_read_all_files(self):
141143
ti.state = TaskInstanceState.SUCCESS
142144
logs, metadatas = task_log_reader.read_log_chunks(ti=ti, try_number=None, metadata={})
143145

144-
assert logs == [
145-
[
146-
(
147-
"localhost",
148-
"*** Found local files:\n"
149-
f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
150-
"try_number=1.",
151-
)
152-
],
153-
[
154-
(
155-
"localhost",
156-
"*** Found local files:\n"
157-
f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/2.log\n"
158-
f"try_number=2.",
159-
)
160-
],
161-
[
162-
(
163-
"localhost",
164-
"*** Found local files:\n"
165-
f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log\n"
166-
f"try_number=3.",
167-
)
168-
],
169-
]
146+
for i in range(0, 3):
147+
assert logs[i][0][0] == "localhost"
148+
assert (
149+
"*** Found local files:\n"
150+
f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/{i + 1}.log\n"
151+
) in logs[i][0][1]
152+
assert f"try_number={i + 1}." in logs[i][0][1]
170153
assert metadatas == {"end_of_log": True, "log_pos": 13}
171154

172155
def test_test_test_read_log_stream_should_read_one_try(self):
@@ -175,27 +158,27 @@ def test_test_test_read_log_stream_should_read_one_try(self):
175158
ti.state = TaskInstanceState.SUCCESS
176159
stream = task_log_reader.read_log_stream(ti=ti, try_number=1, metadata={})
177160
assert list(stream) == [
178-
"localhost\n*** Found local files:\n"
161+
"localhost\n INFO - ::group::Log message source details\n*** Found local files:\n"
179162
f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
180-
"try_number=1.\n"
163+
" INFO - ::endgroup::\ntry_number=1.\n"
181164
]
182165

183166
def test_test_test_read_log_stream_should_read_all_logs(self):
184167
task_log_reader = TaskLogReader()
185168
self.ti.state = TaskInstanceState.SUCCESS # Ensure mocked instance is completed to return stream
186169
stream = task_log_reader.read_log_stream(ti=self.ti, try_number=None, metadata={})
187170
assert list(stream) == [
188-
"localhost\n*** Found local files:\n"
171+
"localhost\n INFO - ::group::Log message source details\n*** Found local files:\n"
189172
f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/1.log\n"
190-
"try_number=1."
173+
" INFO - ::endgroup::\ntry_number=1."
191174
"\n",
192-
"localhost\n*** Found local files:\n"
175+
"localhost\n INFO - ::group::Log message source details\n*** Found local files:\n"
193176
f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/2.log\n"
194-
"try_number=2."
177+
" INFO - ::endgroup::\ntry_number=2."
195178
"\n",
196-
"localhost\n*** Found local files:\n"
179+
"localhost\n INFO - ::group::Log message source details\n*** Found local files:\n"
197180
f"*** * {self.log_dir}/dag_log_reader/task_log_reader/2017-09-01T00.00.00+00.00/3.log\n"
198-
"try_number=3."
181+
" INFO - ::endgroup::\ntry_number=3."
199182
"\n",
200183
]
201184

tests/utils/test_log_handlers.py

+10-4
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,9 @@ def test__read_when_local(self, mock_read_local, create_task_instance):
272272
fth = FileTaskHandler("")
273273
actual = fth._read(ti=local_log_file_read, try_number=1)
274274
mock_read_local.assert_called_with(path)
275-
assert actual == ("*** the messages\nthe log", {"end_of_log": True, "log_pos": 7})
275+
assert "*** the messages\n" in actual[0]
276+
assert actual[0].endswith("the log")
277+
assert actual[1] == {"end_of_log": True, "log_pos": 7}
276278

277279
def test__read_from_local(self, tmp_path):
278280
"""Tests the behavior of method _read_from_local"""
@@ -333,9 +335,11 @@ def test__read_for_celery_executor_fallbacks_to_worker(self, create_task_instanc
333335

334336
fth._read_from_logs_server = mock.Mock()
335337
fth._read_from_logs_server.return_value = ["this message"], ["this\nlog\ncontent"]
336-
actual = fth._read(ti=ti, try_number=1)
338+
actual_text, actual_meta = fth._read(ti=ti, try_number=1)
337339
fth._read_from_logs_server.assert_called_once()
338-
assert actual == ("*** this message\nthis\nlog\ncontent", {"end_of_log": True, "log_pos": 16})
340+
assert "*** this message" in actual_text
341+
assert "this\nlog\ncontent" in actual_text
342+
assert actual_meta == {"end_of_log": True, "log_pos": 16}
339343

340344
@pytest.mark.parametrize(
341345
"remote_logs, local_logs, served_logs_checked",
@@ -379,7 +383,9 @@ def test__read_served_logs_checked_when_done_and_no_local_or_remote_logs(
379383
actual = fth._read(ti=ti, try_number=1)
380384
if served_logs_checked:
381385
fth._read_from_logs_server.assert_called_once()
382-
assert actual == ("*** this message\nthis\nlog\ncontent", {"end_of_log": True, "log_pos": 16})
386+
assert "*** this message\n" in actual[0]
387+
assert actual[0].endswith("this\nlog\ncontent")
388+
assert actual[1] == {"end_of_log": True, "log_pos": 16}
383389
else:
384390
fth._read_from_logs_server.assert_not_called()
385391
assert actual[0]

0 commit comments

Comments
 (0)