Get Started with Core API#
Learn how to get started with the Core API by incrementing a single integer in a loop.
Note
You can also visit the Core API User Guide where you’ll how to adapt model training code to use the Core API that uses the PyTorch MNIST model as an example.
Visit the API reference |
---|
With the Core API you can train arbitrary models on the Determined platform with seamless access to the the following capabilities:
metrics tracking
checkpoint tracking and preemption support
hyperparameter search
distributing work across multiple GPUs and/or nodes
These features are also available in the higher-level PyTorchTrial and DeepSpeedTrial APIs, both of which are built on top of the Core API.
This user guide shows you how to get started using the Core API.
Get the Tutorial Files#
Access the tutorial files via the core_api.tgz
download or
directly from the Github repository.
Getting Started#
As a simple introduction, this example training script increments a single integer in a loop, instead of training a model with machine learning. The changes shown for the example model should be similar to the changes you make in your actual model.
The 0_start.py
training script used in this example contains your simple “model”:
import logging
import sys
import time
def main(increment_by):
x = 0
for batch in range(100):
x += increment_by
time.sleep(0.1)
print(f"x is now {x}")
if __name__ == "__main__":
main(increment_by=1)
To run this script, create a configuration file with at least the following values:
name: core-api-stage-0
entrypoint: python3 0_start.py
# Use the single-searcher to run just one instance of the training script
searcher:
name: single
# metric is required but it shouldn't hurt to ignore it at this point.
metric: x
max_restarts: 0
The actual configuration file can have any name, but this example uses 0_start.yaml
.
Run the code using the command:
det e create 0_start.yaml . -f
If you navigate to this experiment in the WebUI no metrics are displayed because you have not yet reported them to the master using the Core API.
Report Metrics#
The Core API makes it easy to report training and validation metrics to the master during training with only a few new lines of code.
For this example, create a new training script called
1_metrics.py
by copying the0_start.py
script from Core API User Guide.Begin by importing import the
determined
module:import determined as det
Enable
logging
, using thedet.LOG_FORMAT
for logs. This enables useful log messages from thedetermined
library, anddet.LOG_FORMAT
enables filter-by-level in the WebUI.logging.basicConfig(level=logging.DEBUG, format=det.LOG_FORMAT) # Log at different levels to demonstrate filter-by-level in the WebUI. logging.debug("debug-level message") logging.info("info-level message") logging.warning("warning-level message") logging.error("error-level message")
In your
if __name__ == "__main__"
block, wrap the entire execution ofmain()
within the scope ofdetermined.core.init()
, which prepares resources for training and cleans them up afterward. Add thecore_context
as a new argument tomain()
because the Core API is accessed through thecore_context
object.with det.core.init() as core_context: main(core_context=core_context, increment_by=1)
Within
main()
, add two calls: (1) report training metrics periodically during training and (2) report validation metrics every time a validation runs.def main(core_context, increment_by): x = 0 max_length = 100 for batch in range(max_length): x += increment_by steps_completed = batch + 1 time.sleep(0.1) logging.info(f"x is now {x}") # NEW: report training metrics. if steps_completed % 10 == 0: core_context.train.report_training_metrics( steps_completed=steps_completed, metrics={"x": x} ) # NEW: report training progress. core_context.train.report_progress(steps_completed / float(max_length)) # NEW: report a "validation" metric at the end. core_context.train.report_validation_metrics(steps_completed=steps_completed, metrics={"x": x})
The
report_validation_metrics()
call typically happens after the validation step, however, actual validation is not demonstrated by this example.Create a
1_metrics.yaml
file with anentrypoint
invoking the new1_metrics.py
file. You can copy the0_start.yaml
configuration file and change the first couple of lines:name: core-api-stage-1 entrypoint: python3 1_metrics.py
Run the code using the command:
det e create 1_metrics.yaml . -f
You can now navigate to the new experiment in the WebUI and view the plot populated with the training and validation metrics.
The complete 1_metrics.py
and 1_metrics.yaml
listings used in this example can be found in
the core_api.tgz
download or in the Github repository.
Report Checkpoints#
By checkpointing periodically during training and reporting those checkpoints to the master, you can stop and restart training in two different ways: either by pausing and reactivating training using the WebUI, or by clicking the Continue Trial button after the experiment completes.
These two types of continuations have different behaviors. While you always want to preserve the value you are incrementing (the “model weight”), you do not always want to preserve the batch index. When you pause and reactivate you want training to continue from the same batch index, but when starting a fresh experiment you want training to start with a fresh batch index. You can save the trial ID in the checkpoint and use it to distinguish the two types of continues.
Create a new
2_checkpoints.py
training script called by copying the1_metrics.py
script from Report Metrics.Write save and load methods for your model:
def save_state(x, steps_completed, trial_id, checkpoint_directory): with checkpoint_directory.joinpath("state").open("w") as f: f.write(f"{x},{steps_completed},{trial_id}")
def load_state(trial_id, checkpoint_directory): checkpoint_directory = pathlib.Path(checkpoint_directory) with checkpoint_directory.joinpath("state").open("r") as f: x, steps_completed, ckpt_trial_id = [int(field) for field in f.read().split(",")] if ckpt_trial_id == trial_id: return x, steps_completed else: # This is a new trial; load the "model weight" but not the batch count. return x, 0
In your
if __name__ == "__main__"
block, use the ClusterInfo API to gather additional information about the task running on the cluster, specifically a checkpoint to load from and the trial ID, which you also pass tomain()
.info = det.get_cluster_info() assert info is not None, "this example only runs on-cluster" latest_checkpoint = info.latest_checkpoint trial_id = info.trial.trial_id with det.core.init() as core_context: main( core_context=core_context, latest_checkpoint=latest_checkpoint, trial_id=trial_id, increment_by=1, )
It is recommended that you always follow this pattern of extracting values from the ClusterInfo API and passing the values to lower layers of your code, instead of accessing the ClusterInfo API directly in the lower layers. In this way the lower layer can be written to run on or off of the Determined cluster.
Within
main()
, add logic to continue from a checkpoint, when a checkpoint is provided:def main(core_context, latest_checkpoint, trial_id, increment_by): x = 0 max_length = 100 # NEW: load a checkpoint if one was provided. starting_batch = 0 if latest_checkpoint is not None: with core_context.checkpoint.restore_path(latest_checkpoint) as path: x, starting_batch = load_state(trial_id, path) for batch in range(starting_batch, max_length):
You can checkpoint your model as frequently as you like. For this exercise, save a checkpoint after each training report, and check for a preemption signal after each checkpoint:
if steps_completed % 10 == 0: core_context.train.report_training_metrics( steps_completed=steps_completed, metrics={"x": x} ) core_context.train.report_progress(steps_completed / float(max_length)) # NEW: write checkpoints at regular intervals to limit lost progress # in case of a crash during training. checkpoint_metadata = {"steps_completed": steps_completed} with core_context.checkpoint.store_path(checkpoint_metadata) as (path, uuid): save_state(x, steps_completed, trial_id, path) # NEW: check for a preemption signal. This could originate from a # higher-priority task bumping us off the cluster, from the hpsearch # algorithm, or from a user pausing in the WebUI or CLI. if core_context.preempt.should_preempt(): # At this point, a checkpoint was just saved, so training can exit # immediately and resume when the trial is reactivated. return
Create a
2_checkpoints.yaml
file by copying the0_start.yaml
file and changing the first couple of lines:name: core-api-stage-2 entrypoint: python3 2_checkpoints.py
Run the code using the command:
det e create 2_checkpoints.yaml . -f
You can navigate to the experiment in the WebUI and pause it mid-training. The trial shuts down and stops producing logs. If you reactivate training it resumes where it stopped. After training is completed, click Continue Trial to see that fresh training is started but that the model weight continues from where previous training finished.
The complete 2_checkpoints.py
and 2_checkpoints.yaml
listings used in this example can be
found in the core_api.tgz
download or in the Github repository.
Hyperparameter Search#
With the Core API you can run advanced hyperparameter searches with arbitrary training code. The hyperparameter search logic is in the master, which coordinates many different Trials. Each trial runs a train-validate-report loop:
Train |
Train until a point chosen by the hyperparameter search algorithm and obtained via the Core API. The length of training is absolute, so you have to keep track of how much you have already trained to know how much more to train. |
Validate |
Validate your model to obtain the metric you configured in the
|
Report |
Use the Core API to report results to the master. |
Create a
3_hpsearch.py
training script by copying the2_checkpoints.py
script you created in Report Checkpoints.In your
if __name__ == "__main__"
block, access the hyperparameter values chosen for this trial using the ClusterInfo API and configure the training loop accordingly:hparams = info.trial.hparams with det.core.init() as core_context: main( core_context=core_context, latest_checkpoint=latest_checkpoint, trial_id=trial_id, # NEW: configure the "model" using hparams. increment_by=hparams["increment_by"], )
Create a new
3_hpsearch.yaml
file and add anentrypoint
that invokes3_hpsearch.py
:name: core-api-stage-3 entrypoint: python3 3_hpsearch.py
Add a
hyperparameters
section with the integer-typeincrement_by
hyperparameter value that referenced in the training script:hyperparameters: increment_by: type: int minval: 1 maxval: 8
Run the code using the command:
det e create 3_hpsearch.yaml . -f
The complete 3_hpsearch.py
and 3_hpsearch.yaml
listings used in this example can be found in
the core_api.tgz
download or in the Github repository.
Distributed Training#
The Core API has special considerations for running distributed training. Some of the more important considerations are:
Access to all IP addresses of every node in the Trial (through the ClusterInfo API).
Communication primitives such as
allgather()
,gather()
, andbroadcast()
to give you out-of-the-box coordination between workers.Since many distributed training frameworks expect all workers in training to operate in-step, the
should_preempt()
call is automatically synchronized across workers so that all workers decide to preempt or continue as a unit.
Create a
4_distributed.py
training script by copying the3_hpsearch.py
from Hyperparameter Search.Add launcher logic to execute one worker subprocess per slot.
Start with a
launcher_main()
function that executes one worker subprocess per slot.def launcher_main(slots_per_node, num_nodes, cross_rank): # Use subprocess to start one worker process per node. procs = [] for local_rank in range(slots_per_node): rank = cross_rank * slots_per_node + local_rank cmd = [ # Use the determined.launch.wrap_rank to wrap the worker process. # This ensures logs from each worker can be filtered by rank in the WebUI. "python3", "-m", "determined.launch.wrap_rank", str(rank), "--", # Re-invoke this script but as a worker. "python3", __file__, "worker", str(rank), str(local_rank), ] procs.append(subprocess.Popen(cmd)) # A good launcher normally waits for all workers to finish, but cleans up and exits # nonzero immediately if any worker fails to prevent distributed training jobs from # hanging. One way to do this by managing each worker process in a thread and sending # exit codes over a Queue as workers complete. q = queue.Queue() def wait_for_worker(proc): worker_exit = proc.wait() q.put((proc, worker_exit)) threads = [threading.Thread(target=wait_for_worker, args=(proc,)) for proc in procs] for t in threads: t.start() first_failed_exit = 0 for i in range(slots_per_node): proc, worker_exit = q.get() procs.remove(proc) if worker_exit != 0 and first_failed_exit == 0: # When the first worker crashes, preempt the others. first_failed_exit = worker_exit for proc in procs: proc.kill() for t in threads: t.join() return first_failed_exit
Typically, you do not have to write your own launcher. Determined provides launchers for Horovod,
torch.distributed
, and DeepSpeed. Additionally, there are third-party launchers available, such asmpirun
. When using a custom or third-party launcher, wrap your worker script in thepython -m determined.launcher.wrap_rank
wrapper script so the WebUI log viewer can filter logs by rank.Also add a
worker_main()
that will run training on each slot:def worker_main(slots_per_node, num_nodes, cross_rank, chief_ip, rank, local_rank): # In the absence of a distributed training framework that might define the # rank/local_rank/cross_rank, you can derive them from the ClusterInfo API. distributed = det.core.DistributedContext( rank=rank, size=num_nodes * slots_per_node, local_rank=local_rank, local_size=slots_per_node, cross_rank=cross_rank, cross_size=num_nodes, chief_ip=chief_ip, ) with det.core.init(distributed=distributed) as core_context: main( core_context=core_context, latest_checkpoint=latest_checkpoint, trial_id=trial_id, increment_by=hparams["increment_by"], )
Then modify your
if __name__ == "__main__"
block to invoke the correct*_main()
based on command-line arguments:slots_per_node = len(info.slot_ids) num_nodes = len(info.container_addrs) cross_rank = info.container_rank chief_ip = info.container_addrs[0] # NEW: This script is invoked both as a launcher-of-workers, and again as each worker. if sys.argv[1] == "launcher": # Usage: SCRIPT launcher exitcode = launcher_main(slots_per_node, num_nodes, cross_rank) sys.exit(exitcode) if sys.argv[1] == "worker": # Usage: SCRIPT worker $RANK $LOCAL_RANK logging.info(f"worker starting") rank = int(sys.argv[2]) local_rank = int(sys.argv[3]) exitcode = worker_main(slots_per_node, num_nodes, cross_rank, chief_ip, rank, local_rank) sys.exit(exitcode) raise ValueError(f"unrecognized first argument: {sys.argv[1]}")
In the training code, use the
allgather
primitive to do a “distributed” increment, to gain experience using the communication primitives:all_increment_bys = core_context.distributed.allgather(increment_by) x += sum(all_increment_bys)
Usually, trial logs are easier to read when status is only printed on the chief worker:
if core_context.distributed.rank == 0: logging.info(f"x is now {x}")
Only the chief worker is permitted to report metrics, upload checkpoints, or report progress. This rule applies to the steps you take periodically during training:
if steps_completed % 10 == 0: # NEW: only the chief may report metrics and progress, # or upload checkpoints. if core_context.distributed.rank == 0: core_context.train.report_training_metrics( steps_completed=steps_completed, metrics={"x": x} ) core_context.train.report_progress(steps_completed / float(max_length)) core_context.train.report_validation_metrics( steps_completed=steps_completed, metrics={"x": x} ) checkpoint_metadata = {"steps_completed": steps_completed} with core_context.checkpoint.store_path(checkpoint_metadata) as (path, uuid): save_state(x, steps_completed, trial_id, path) if core_context.preempt.should_preempt(): return
Create a
4_distributed.yaml
file by copying the3_distributed.yaml
file and changing the first couple of lines:name: core-api-stage-4 entrypoint: python3 4_distributed.py launcher
Set the
resources.slots_per_trial
field to the number of GPUs you want:resources: slots_per_trial: 8
You can return to using the
single
searcher instead of anadaptive_asha
hyperparameter search:searcher: name: single metric: x
Run the code using the Determined CLI with the following command:
det e create 4_distributed.yaml . -f
The complete 4_distributed.py
and 3_hpsearch.yaml
listings used in this example can be found
in the core_api.tgz
download or in the Github repository.