Distributed Training
The Core API has special considerations for running distributed training. Some of the more important considerations are:
- Access to all IP addresses of every node in the Trial (through the ClusterInfo API); see the sketch just after this list.
- Communication primitives such as allgather(), gather(), and broadcast() to give you out-of-the-box coordination between workers.
- Since many distributed training frameworks expect all workers in training to operate in-step, the should_preempt() call is automatically synchronized across workers so that all workers decide to preempt or continue as a unit.
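For example, here is a minimal sketch of reading the cluster topology through the ClusterInfo API; it uses the same info object and fields that the launcher and worker code below rely on, and the printed comments are only illustrative:

    import determined as det

    info = det.get_cluster_info()
    assert info is not None, "this script must be run on a Determined cluster"

    # Topology of the current trial, as used by the launcher and worker code below.
    print(info.container_addrs)  # IP addresses of every node in the Trial
    print(info.container_rank)   # which node this process is running on
    print(info.slot_ids)         # slots (e.g. GPUs) assigned to this node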
Create a 4_distributed.py training script by copying the 3_hpsearch.py script from Hyperparameter Search.

Add launcher logic to execute one worker subprocess per slot.
Start with a launcher_main() function that executes one worker subprocess per slot:

    def launcher_main(slots_per_node, num_nodes, cross_rank):
        # Use subprocess to start one worker process per slot on this node.
        procs = []
        for local_rank in range(slots_per_node):
            rank = cross_rank * slots_per_node + local_rank
            cmd = [
                # Use determined.launch.wrap_rank to wrap the worker process.
                # This ensures logs from each worker can be filtered by rank in the WebUI.
                "python3",
                "-m",
                "determined.launch.wrap_rank",
                str(rank),
                "--",
                # Re-invoke this script, but as a worker.
                "python3",
                __file__,
                "worker",
                str(rank),
                str(local_rank),
            ]
            procs.append(subprocess.Popen(cmd))

        # A good launcher normally waits for all workers to finish, but cleans up and
        # exits nonzero immediately if any worker fails, to prevent distributed training
        # jobs from hanging.  One way to do this is to manage each worker process in a
        # thread and send exit codes over a Queue as workers complete.
        q = queue.Queue()

        def wait_for_worker(proc):
            worker_exit = proc.wait()
            q.put((proc, worker_exit))

        threads = [threading.Thread(target=wait_for_worker, args=(proc,)) for proc in procs]

        for t in threads:
            t.start()

        first_failed_exit = 0
        for i in range(slots_per_node):
            proc, worker_exit = q.get()
            procs.remove(proc)
            if worker_exit != 0 and first_failed_exit == 0:
                # When the first worker crashes, preempt the others.
                first_failed_exit = worker_exit
                for proc in procs:
                    proc.kill()

        for t in threads:
            t.join()

        return first_failed_exit
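If you are building 4_distributed.py up from 3_hpsearch.py, note that the launcher and the __main__ block shown in this section assume a few standard-library imports near the top of the file (a minimal sketch; the complete listing linked at the end of this page has the full set):

    # Standard-library modules used by the launcher and __main__ logic in this
    # section; determined itself is already imported if you copied 3_hpsearch.py.
    import logging
    import queue
    import subprocess
    import sys
    import threading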
Typically, you do not have to write your own launcher. Determined provides launchers for Horovod, torch.distributed, and DeepSpeed. Additionally, there are third-party launchers available, such as mpirun. When using a custom or third-party launcher, wrap your worker script in the python -m determined.launch.wrap_rank wrapper script so the WebUI log viewer can filter logs by rank.
Also add a worker_main() function that will run training on each slot:

    def worker_main(slots_per_node, num_nodes, cross_rank, chief_ip, rank, local_rank):
        # In the absence of a distributed training framework that might define the
        # rank/local_rank/cross_rank, you can derive them from the ClusterInfo API.
        distributed = det.core.DistributedContext(
            rank=rank,
            size=num_nodes * slots_per_node,
            local_rank=local_rank,
            local_size=slots_per_node,
            cross_rank=cross_rank,
            cross_size=num_nodes,
            chief_ip=chief_ip,
        )

        with det.core.init(distributed=distributed) as core_context:
            main(
                core_context=core_context,
                latest_checkpoint=latest_checkpoint,
                trial_id=trial_id,
                increment_by=hparams["increment_by"],
            )
Then modify your if __name__ == "__main__" block to invoke the correct *_main() based on command-line arguments:

    slots_per_node = len(info.slot_ids)
    num_nodes = len(info.container_addrs)
    cross_rank = info.container_rank
    chief_ip = info.container_addrs[0]

    # NEW: This script is invoked both as a launcher-of-workers, and again as each worker.
    if sys.argv[1] == "launcher":
        # Usage: SCRIPT launcher
        exitcode = launcher_main(slots_per_node, num_nodes, cross_rank)
        sys.exit(exitcode)

    if sys.argv[1] == "worker":
        # Usage: SCRIPT worker $RANK $LOCAL_RANK
        logging.info("worker starting")
        rank = int(sys.argv[2])
        local_rank = int(sys.argv[3])
        exitcode = worker_main(slots_per_node, num_nodes, cross_rank, chief_ip, rank, local_rank)
        sys.exit(exitcode)

    raise ValueError(f"unrecognized first argument: {sys.argv[1]}")
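To make the launcher/worker control flow concrete, here is a purely illustrative sketch (not part of 4_distributed.py) of how the invocations line up for a hypothetical 2-node, 2-slots-per-node trial, using the same rank arithmetic as launcher_main():

    # Hypothetical 2-node, 2-slots-per-node layout: the entrypoint runs
    # "python3 4_distributed.py launcher" once per node, and each launcher spawns
    # one wrap_rank-wrapped worker per slot.
    slots_per_node, num_nodes = 2, 2
    for cross_rank in range(num_nodes):
        for local_rank in range(slots_per_node):
            rank = cross_rank * slots_per_node + local_rank
            print(f"node {cross_rank}: python3 4_distributed.py worker {rank} {local_rank}")
    # node 0: python3 4_distributed.py worker 0 0
    # node 0: python3 4_distributed.py worker 1 1
    # node 1: python3 4_distributed.py worker 2 0
    # node 1: python3 4_distributed.py worker 3 1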
In the training code, use the allgather() primitive to do a “distributed” increment and gain experience using the communication primitives:

    all_increment_bys = core_context.distributed.allgather(increment_by)
    x += sum(all_increment_bys)
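As an illustrative walk-through (not extra code to add): allgather() returns a list containing each worker's value, so with slots_per_trial set to 8 and increment_by equal to 1 on every worker, each worker computes the same global sum and x advances by 8 per step:

    # Illustrative only: every worker passes its local value and receives the
    # same gathered list back, so all workers advance x identically.
    all_increment_bys = core_context.distributed.allgather(1)
    assert all_increment_bys == [1] * 8
    assert sum(all_increment_bys) == 8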
Usually, trial logs are easier to read when status is only printed on the chief worker:
    if core_context.distributed.rank == 0:
        logging.info(f"x is now {x}")
Only the chief worker is permitted to report training metrics, report validation metrics, upload checkpoints, or report searcher operations completed. This rule applies to the steps you take periodically during training:
    if steps_completed % 10 == 0:
        # NEW: only the chief may report training metrics and progress,
        # or upload checkpoints.
        if core_context.distributed.rank == 0:
            core_context.train.report_training_metrics(
                steps_completed=steps_completed, metrics={"x": x}
            )
            op.report_progress(steps_completed)
            checkpoint_metadata = {"steps_completed": steps_completed}
            with core_context.checkpoint.store_path(checkpoint_metadata) as (
                checkpoint_directory,
                uuid,
            ):
                save_state(x, steps_completed, trial_id, checkpoint_directory)
            last_checkpoint_batch = steps_completed
        if core_context.preempt.should_preempt():
            return
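If each worker had computed its own partial result, you would typically gather those values to the chief before reporting. Here is a minimal sketch using the gather() primitive; the mean_x metric name is invented for illustration and is not part of the tutorial script:

    # Sketch only: gather() returns the full list of values on the chief and
    # None on all other workers, so only the chief can aggregate and report.
    all_x = core_context.distributed.gather(x)
    if core_context.distributed.rank == 0:
        core_context.train.report_training_metrics(
            steps_completed=steps_completed,
            metrics={"mean_x": sum(all_x) / len(all_x)},
        )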
The rule also applies to the steps you take after validating:
    if core_context.distributed.rank == 0:
        core_context.train.report_validation_metrics(
            steps_completed=steps_completed, metrics={"x": x}
        )
        op.report_completed(x)
The rule also applies to the conditional save after the main loop completes:
    # NEW: again, only the chief may upload checkpoints.
    if core_context.distributed.rank == 0 and last_checkpoint_batch != steps_completed:
        checkpoint_metadata = {"steps_completed": steps_completed}
        with core_context.checkpoint.store_path(checkpoint_metadata) as (path, uuid):
            save_state(x, steps_completed, trial_id, path)
Create a 4_distributed.yaml file by copying the 3_hpsearch.yaml file and changing the first couple of lines:

    name: core-api-stage-4
    entrypoint: python3 4_distributed.py launcher
Set the resources.slots_per_trial field to the number of GPUs you want:

    resources:
      slots_per_trial: 8
You can return to using the single searcher instead of an adaptive_asha hyperparameter search:

    searcher:
      name: single
      metric: x
      max_length: 100
Run the code using the Determined CLI with the following command:
    det e create 4_distributed.yaml . -f
The complete 4_distributed.py and 4_distributed.yaml listings used in this example can be found in the core_api.tgz download or in the GitHub repository.