reimplement metric diagnosis, combine step with tensor metrics #1525
base: master
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files

@@            Coverage Diff             @@
##           master    #1525      +/-   ##
==========================================
+ Coverage   83.62%   83.75%   +0.13%
==========================================
  Files         267      268       +1
  Lines       27672    27868     +196
==========================================
+ Hits        23140    23342     +202
+ Misses       4532     4526       -6

☔ View full report in Codecov by Sentry.
@@ -68,6 +70,23 @@ def get_first_step_event(self):
        key = keys[0]
        return self._step_events[key]

    def last_steps_avg_time(self, last_steps):
steps -> step
        elif event.type == EventTypeName.END:
            if not len(keys):
                logger.error(f"invalid ckpt step without BEGIN: {event}")
replace with warning
                )
                return
            if event.timestamp < last_event.end_timestamp:
                logger.error(
replace with warning
logger.error(f"invalid ckpt step: {step_event}, {event}") | ||
return | ||
if step_event.begin_timestamp > event.timestamp: | ||
logger.error( |
There are too many error logs of this type. There should be a unified event validation process here, where validation failures are logged as warnings.
BTW, the current convention is that only events impacting core processes are logged at the error level.
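One possible shape for that unified validation, as a minimal sketch: `validate_end_event` is a hypothetical helper, and the field names are taken from the surrounding diff.

```python
import logging

logger = logging.getLogger(__name__)


def validate_end_event(step_event, event) -> bool:
    """Run all END-event sanity checks in one place; validation
    failures are logged as warnings rather than errors."""
    if step_event is None:
        logger.warning(f"invalid ckpt step without BEGIN: {event}")
        return False
    if event.timestamp < step_event.begin_timestamp:
        logger.warning(f"invalid ckpt step: {step_event}, {event}")
        return False
    return True
```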
                )
            step_event.step = event.step
            step_event.event_state = TrainEventState.TRAIN_EVT_END
            step_event.localtime = int(datetime.now().timestamp())
int(datetime.now().timestamp()) -> int(time.time())
Since we are already using Unix timestamps here, there's no need to add an extra conversion through datetime.
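For reference, the two forms produce the same Unix timestamp; `time.time()` simply skips the intermediate `datetime` object:

```python
import time
from datetime import datetime

ts_direct = int(time.time())                       # direct Unix timestamp
ts_via_datetime = int(datetime.now().timestamp())  # same value, extra hop
assert abs(ts_direct - ts_via_datetime) <= 1
```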
            and event_type == EventTypeName.BEGIN
        ):
            logger.info(
                f"Collect first step since last rendezvous: {step}"
There are currently too many similar logs. Many of these should use the debug level; only essential points, such as output, should use the info level.
Has this actually been tested? It's worth evaluating whether the increase in info logs might reduce efficiency during routine log investigations.
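A minimal sketch of the suggested demotion, assuming a module-level logger (`on_first_step` is a hypothetical name for the collection path):

```python
import logging

logger = logging.getLogger(__name__)


def on_first_step(step: int) -> None:
    # Routine per-step bookkeeping goes to debug so everyday log
    # investigations stay fast; reserve info for key milestones.
    logger.debug(f"Collect first step since last rendezvous: {step}")
```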
        hang_downtime = _dlrover_context.hang_downtime
        if self._is_observing_paused:
            logger.info(
                f"Pause _metric_diagnose thread due to "
Pause _metric_diagnose thread due to paused status
                action_type=DiagnosisActionType.RESTART_WORKER,
                instance=DiagnosisConstant.ANY_INSTANCE,
        if step_hang is True:
            logger.info("Restart worker-0 all processes")
Why does only worker-0 need to be restarted?
@@ -300,6 +300,11 @@ def _join_rendezvous(self, request: comm.JoinRendezvousRequest):
            RendezvousName.ELASTIC_TRAINING
        ]
        training_manager.clear_waiting_nodes()

        # Pause hang diagnosis during rendezvous
        if node_rank == 0:
This implementation should not be placed here; it should be implemented directly under rdzv_manager.
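One way to read that suggestion, as a hypothetical sketch rather than the project's actual API: the rendezvous manager pauses diagnosis itself when a join starts, so the RPC handler no longer carries the special case. `pause_observing` is assumed here as the counterpart of the `continue_observing` call shown below.

```python
class RendezvousManager:
    """Sketch: own the diagnosis pause/resume around a rendezvous."""

    def __init__(self, diagnosis_manager):
        self._diagnosis_manager = diagnosis_manager

    def join_rendezvous(self, node_rank: int):
        if node_rank == 0:
            # Pause hang diagnosis for the duration of the rendezvous,
            # instead of special-casing rank 0 in the RPC handler.
            self._diagnosis_manager.pause_observing()
        ...
```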
@@ -334,6 +339,9 @@ def _get_comm_world(self, request: comm.CommWorldRequest):
        rdzv_round = rdzv_manager.get_rdzv_round()
        metrics = {CustomMetricKeys.RDZV_ROUND: rdzv_round}
        self._job_metric_collector.collect_custom_data(metrics)
        # Finish elastic training rendezvous so we continue diagnosis
        self._diagnosis_manager.continue_observing()
Same comment as above applies to this part.
What changes were proposed in this pull request?
We enable the step collector for the atorch trainer, so the start and end of each step are reported to the master.
If a step has begun but not finished before the timeout, the job might be hanging.
We also keep collecting key metrics such as tensor utilization; if tensor utilization drops to zero for a period of time while the step
is also stuck, the job can be regarded as a hang job and we restart the processes to retry.
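The decision rule the description outlines, as a minimal sketch; the names and threshold here are illustrative, not the PR's actual code. Only when it returns True are the worker processes restarted to retry.

```python
def is_job_hang(step_stuck: bool, tensor_util_window: list,
                zero_threshold: float = 0.01) -> bool:
    """Treat the job as hanging only when both signals agree: the
    current step has exceeded its timeout AND tensor utilization has
    stayed at (effectively) zero for the whole observation window."""
    tensor_idle = all(u <= zero_threshold for u in tensor_util_window)
    return step_stuck and tensor_idle
```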
Why are the changes needed?
To improve training availability
Does this PR introduce any user-facing change?
No
How was this patch tested?
UT