Core API User Guide#
This guide will help you get up and running with the Core API.
Visit the API reference.
In this user guide, we’ll show you how to adapt model training code to use the Core API. As an example, we’ll be working with the PyTorch MNIST model.
Objectives#
These step-by-step instructions walk you through modifying a script for the purpose of performing the following functions:
Report metrics
Report checkpoints
Perform a hyperparameter search
Perform distributed training
After completing the steps in this user guide, you will be able to:
Understand the minimum requirements for running an experiment
Modify a script and an experiment configuration file
Understand how to convert model code
Use the Core API to train a model
Prerequisites#
Required
A Determined cluster
Recommended
Step 1: Get the Tutorial Files & Run the Experiment#
To run an experiment, you need, at minimum, a script and an experiment configuration (YAML) file.
Create a new directory.
Access the tutorial files via the core_api_pytorch_mnist.tgz download link or directly from the GitHub repository.
These scripts have already been modified to fit the steps outlined in this tutorial.
In this initial step, we’ll run our experiment using the model_def.py script and its accompanying const.yaml experiment configuration file.
Change into the directory and run this command:
det e create const.yaml . -f
Note
The -f flag in det e create const.yaml . -f instructs Determined to follow the logs of the first trial that is created as part of the experiment. The command stays active and displays the live output from the logs of the first trial as it progresses.
Open the Determined WebUI by navigating to the master URL. One way to do this is to navigate to http://localhost:8080/, accept the default Determined username, leave the password empty, and then click Sign In.
Note
This tutorial provides instructions for running a local distributed training job. Your setup may be different. For example, for instructions on how to run a remote distributed training job, visit the Quickstart for Model Developers.
In the WebUI, select your experiment. You’ll notice the tabs do not yet contain any information. In the next section, we’ll report training and validation metrics.
Step 2: Report Metrics#
To report training and validation metrics to the Determined master, we’ll add a few lines of code to our script. More specifically, we’ll create a Context object to allow interaction with the master. Then, we’ll pass the core_context as an argument into main(), train(), and test() and modify the function headers accordingly.
To run our experiment, we’ll use the model_def_metrics.py script and its accompanying metrics.yaml experiment configuration file.
Note
For this tutorial, we’ve already created the script for you. Take a moment to review the changes we’ve made to the provided script so that you’ll know how to modify your own script.
To run the experiment, you can either use the provided scripts, which have already been modified, or you can build your own file by making changes to “model_def.py”.
Begin by importing Determined:
import determined as det
Step 2.1: Modify the Main Loop#
We’ll need a Context object for interacting with the master. To accomplish this, we’ll modify the __main__ block to create core_context and pass it to main():
Note
Refer to the if __name__ == "__main__": block in model_def_metrics.py.
if __name__ == "__main__":
    # NEW: Establish new determined.core.Context and pass to main
    # function.
    with det.core.init() as core_context:
        main(core_context=core_context)
Step 2.2: Modify the Train Method#
Use core_context.train to report training and validation metrics.
Begin by reporting training metrics with core_context.train.report_training_metrics():
# NEW: Report training metrics to Determined
# master via core_context.
# Index by batches_completed + epoch_idx * len(train_loader)
# to continuously plot loss on one graph for consecutive
# epochs.
core_context.train.report_training_metrics(
    steps_completed=batches_completed + epoch_idx * len(train_loader),
    metrics={"train_loss": loss.item()},
)
Then report validation metrics with core_context.train.report_validation_metrics():
# NEW: Report validation metrics to Determined master
# via core_context.
core_context.train.report_validation_metrics(
    steps_completed=steps_completed,
    metrics={"test_loss": test_loss},
)
Step 2.3: Modify the Test Method#
Modify the test() function header to include args and other elements you’ll need during the evaluation loop. The args variable lets you pass configuration settings such as batch size and learning rate. In addition, pass the newly created core_context into both train() and test(). Passing core_context enables reporting of metrics to the Determined master.
# NEW: Pass core_context into train() and test().
train(args, model, device, train_loader, optimizer, epoch_idx, core_context)
# NEW: Pass args, test_loader, epoch, and steps_completed into
# test().
test(
    args,
    model,
    device,
    test_loader,
    epoch_idx,
    core_context,
    steps_completed=steps_completed,
)
scheduler.step()
Create a steps_completed variable to plot metrics on a graph in the WebUI:
# NEW: Calculate steps_completed for plotting test metrics.
steps_completed = epoch_idx * len(train_loader)
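The step indexing used when reporting metrics can be checked on its own, without a cluster. The following is a minimal sketch, not part of the tutorial files: the helper name global_step and the sample sizes are hypothetical, and it assumes epoch_idx is zero-based and batches_completed counts the batches finished in the current epoch.

```python
def global_step(epoch_idx: int, batches_completed: int, batches_per_epoch: int) -> int:
    """Return a step count that keeps increasing across epochs."""
    return batches_completed + epoch_idx * batches_per_epoch


# Hypothetical run: 3 epochs of 4 batches each.
batches_per_epoch = 4
steps = [
    global_step(epoch_idx, batch_idx + 1, batches_per_epoch)
    for epoch_idx in range(3)
    for batch_idx in range(batches_per_epoch)
]
print(steps)
```

Because each value is strictly larger than the previous one, metrics reported with this index land on a single continuous curve instead of restarting at every epoch.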
Step 2.4: Run the Experiment#
Run the following command to run the experiment:
det e create metrics.yaml .
Open the Determined WebUI again and go to the Overview tab.
The WebUI now displays metrics. In this step, you learned how to add a few new lines of code in order to report training and validation metrics to the Determined master. Next, we’ll modify our script to report checkpoints.
Step 3: Checkpointing#
Checkpointing periodically during training and reporting the checkpoints to the master gives us the ability to stop and restart training. In this section, we’ll modify our script for the purpose of checkpointing.
In this step, we’ll run our experiment using the model_def_checkpoints.py script and its accompanying checkpoints.yaml experiment configuration file.
Note
For this tutorial, we’ve already created the script for you. Take a moment to review the changes we’ve made to the provided script so that you’ll know how to modify your own script.
To run the experiment, you can either use the provided scripts, which have already been modified, or you can build your own file by making changes to “model_def.py”.
Step 3.1: Save Checkpoints#
To save checkpoints, call core_context.checkpoint.store_path in your script:
# NEW: Save checkpoint.
checkpoint_metadata_dict = {"steps_completed": steps_completed}
# NEW: Here we are saving multiple files to our checkpoint
# directory: 1) a model state file and 2) a file that includes
# information about the training loop state.
with core_context.checkpoint.store_path(checkpoint_metadata_dict) as (path, storage_id):
    torch.save(model.state_dict(), path / "checkpoint.pt")
    with path.joinpath("state").open("w") as f:
        f.write(f"{epochs_completed},{info.trial.trial_id}")
Step 3.2: Continuations#
There are two types of continuations: pausing and reactivating training using the WebUI, and clicking Continue Trial after the experiment completes.
These two types of continuations behave differently. While you always want to preserve the model’s state, you do not always want to preserve the batch index. When you pause and reactivate, you want training to continue from the same batch index, but when you start a fresh experiment, you want training to start with a fresh batch index. To distinguish between the two types of continuations, you can save the trial ID in the checkpoint.
Enable Pausing and Resuming an Experiment
To enable pausing an experiment, enable preemption:
# NEW: Detect when the experiment is paused by the WebUI.
if core_context.preempt.should_preempt():
    return
Define a load_state function for restarting model training from an existing checkpoint:
# NEW: Define load_state function for restarting model training from
# existing checkpoint. Returns (.pt, int).
# Also update load_state header to take trial info object as an argument.
def load_state(checkpoint_directory, trial_id):
    checkpoint_directory = pathlib.Path(checkpoint_directory)
    with checkpoint_directory.joinpath("checkpoint.pt").open("rb") as f:
        # checkpoint.pt was written with torch.save(model.state_dict(), ...),
        # so this value is the saved state dict.
        model = torch.load(f)
    with checkpoint_directory.joinpath("state").open("r") as f:
        epochs_completed, ckpt_trial_id = [int(field) for field in f.read().split(",")]
    # Docs snippet start: compare checkpoint and current trial IDs
    # If trial ID does not match our current trial ID, we'll ignore
    # epochs completed and start training from epoch_idx = 0
    if ckpt_trial_id != trial_id:
        epochs_completed = 0
    # Docs snippet end: compare checkpoint and current trial IDs
    return model, epochs_completed
If a checkpoint exists, load it and assign it to the model state prior to resuming training:
# NEW: If checkpoint exists, load it and assign it to model state
# prior to resuming training.
info = det.get_cluster_info()
assert info is not None, "this example only runs on-cluster"
latest_checkpoint = info.latest_checkpoint
if latest_checkpoint is None:
    epochs_completed = 0
else:
    with core_context.checkpoint.restore_path(latest_checkpoint) as path:
        model, epochs_completed = load_state(path, info.trial.trial_id)
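The trial ID comparison can be tried locally with only the standard library. This is a sketch under stated assumptions: the helper names save_state and load_epochs_completed are hypothetical, the model file is left out, and a temporary directory stands in for the checkpoint directory that store_path and restore_path would provide.

```python
import pathlib
import tempfile


def save_state(checkpoint_directory, epochs_completed, trial_id):
    """Write the training-loop state as 'epochs_completed,trial_id'."""
    path = pathlib.Path(checkpoint_directory)
    with path.joinpath("state").open("w") as f:
        f.write(f"{epochs_completed},{trial_id}")


def load_epochs_completed(checkpoint_directory, trial_id):
    """Read the state file and reset progress if the trial ID differs."""
    path = pathlib.Path(checkpoint_directory)
    with path.joinpath("state").open("r") as f:
        epochs_completed, ckpt_trial_id = [int(field) for field in f.read().split(",")]
    if ckpt_trial_id != trial_id:
        # The checkpoint came from another trial: restart the epoch count.
        epochs_completed = 0
    return epochs_completed


with tempfile.TemporaryDirectory() as tmp:
    save_state(tmp, epochs_completed=3, trial_id=7)
    resumed = load_epochs_completed(tmp, trial_id=7)    # same trial ID
    continued = load_epochs_completed(tmp, trial_id=8)  # different trial ID
print(resumed, continued)
```

With a matching trial ID the saved epoch count is kept, which is the pause-and-reactivate case. With a different trial ID the count resets to 0 while the saved weights could still be reused.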
Enable Continuing the Trial
To enable continuing the trial after the experiment completes, save the trial ID. One way to do this is to write the trial ID to a file in the checkpoint directory when you save the checkpoint.
When loading, open the checkpoint.pt file in binary mode, read ckpt_trial_id from the state file, and compare it with the current trial_id:
# If trial ID does not match our current trial ID, we'll ignore
# epochs completed and start training from epoch_idx = 0
if ckpt_trial_id != trial_id:
    epochs_completed = 0
Save the checkpoint in the checkpoint.pt file:
# NEW: Save checkpoint.
checkpoint_metadata_dict = {"steps_completed": steps_completed}
# NEW: Here we are saving multiple files to our checkpoint
# directory: 1) a model state file and 2) a file that includes
# information about the training loop state.
with core_context.checkpoint.store_path(checkpoint_metadata_dict) as (path, storage_id):
    torch.save(model.state_dict(), path / "checkpoint.pt")
    with path.joinpath("state").open("w") as f:
        f.write(f"{epochs_completed},{info.trial.trial_id}")
Detect when the experiment is paused by the WebUI:
# NEW: Detect when the experiment is paused by the WebUI.
if core_context.preempt.should_preempt():
    return
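To see how checkpointing and the preemption check fit together in a loop, here is a small simulation that runs without a cluster. Everything in it is a stand-in: FakePreemptContext is a hypothetical replacement for core_context.preempt, and appending to a list takes the place of saving a checkpoint with store_path. It only illustrates the control flow and assumes the check happens once per epoch, after the checkpoint is saved.

```python
class FakePreemptContext:
    """Hypothetical stand-in for core_context.preempt (illustration only)."""

    def __init__(self, preempt_after_calls):
        self._calls = 0
        self._preempt_after_calls = preempt_after_calls

    def should_preempt(self):
        self._calls += 1
        return self._calls >= self._preempt_after_calls


def train(preempt, start_epoch, num_epochs, saved_checkpoints):
    """Run epochs, checkpoint after each one, and stop early if preempted."""
    for epoch_idx in range(start_epoch, num_epochs):
        # ... train and validate for one epoch here ...
        epochs_completed = epoch_idx + 1
        saved_checkpoints.append(epochs_completed)  # stands in for store_path()
        if preempt.should_preempt():
            return epochs_completed
    return num_epochs


checkpoints = []
# First run is "paused" after the second epoch.
stopped_at = train(FakePreemptContext(2), 0, 5, checkpoints)
# Second run resumes from the saved epoch count and is never preempted.
finished_at = train(FakePreemptContext(100), stopped_at, 5, checkpoints)
print(stopped_at, finished_at, checkpoints)
```

Saving before checking for preemption means the resumed run starts exactly where the paused run stopped, so no epoch is repeated or skipped.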
Step 3.3: Run the Experiment#
Run the following command to run the experiment:
det e create checkpoints.yaml . -f
In the Determined WebUI, navigate to the Checkpoints tab.
Checkpoints are saved and deleted according to the default Checkpoint Policy. You can modify the checkpoint policy in the experiment configuration file.
Step 4: Hyperparameter Search#
With the Core API you can run advanced hyperparameter searches with arbitrary training code. The hyperparameter search logic is in the master, which coordinates many different Trials. Each trial runs a train-validate-report loop:
Train: Train until a point chosen by the hyperparameter search algorithm and obtained via the Core API. The length of training is absolute, so you have to keep track of how much you have already trained to know how much more to train.
Validate: Validate your model to obtain the metric you configured in the experiment configuration.
Report: Use the Core API to report results to the master.
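Because the training length is absolute, the trial has to subtract what it has already trained from each new target. The arithmetic can be sketched as follows; the function name epochs_to_train and the list of targets are hypothetical, and in a real trial the targets would come from the searcher through the Core API rather than from a hardcoded list.

```python
def epochs_to_train(target_length, epochs_completed):
    """Return how many more epochs are needed to reach an absolute target."""
    return max(target_length - epochs_completed, 0)


# Hypothetical sequence of absolute targets handed to one trial.
targets = [1, 4, 16]
epochs_completed = 0
increments = []
for target in targets:
    remaining = epochs_to_train(target, epochs_completed)
    increments.append(remaining)
    epochs_completed += remaining  # train, then validate and report
print(increments)
```

A target of 4 after 1 completed epoch therefore means 3 more epochs of training, not 4.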
To perform a hyperparameter search, we’ll update our script to define the hyperparameter search settings we want to use for our experiment. More specifically, we’ll need to define the following settings in our experiment configuration file:
name: adaptive_asha (The name of our searcher. For all options, visit Search Methods.)
metric: test_loss
smaller_is_better: True (This is equivalent to minimization vs. maximization of the objective.)
max_trials: 500 (This is the maximum number of trials the searcher should run.)
max_length: 20 epochs (The maximum length of a trial. For more information, visit Adaptive ASHA in the Experiment Configuration Reference.)
In addition, we also need to define the hyperparameters themselves. Adaptive ASHA will pick values between the minval and maxval for each hyperparameter for each trial.
Note
To see early stopping in action, try setting max_trials to over 500 and playing around with the hyperparameter search values.
In this step, we’ll run our experiment using the model_def_adaptive.py script and its accompanying adaptive.yaml experiment configuration file.
Note
For this tutorial, we’ve already created the script for you. Take a moment to review the changes we’ve made to the provided script so that you’ll know how to modify your own script.
To run the experiment, you can either use the provided scripts, which have already been modified, or you can build your own file by making changes to “model_def.py”.
Begin by accessing the hyperparameters in your code:
# NEW: Get hparams chosen for this trial from cluster info object.
hparams = info.trial.hparams
Then, pass the hyperparameters into your model and optimizer:
# NEW: Pass relevant hparams to model and optimizer.
model = Net(hparams).to(device)
optimizer = optim.Adadelta(model.parameters(), lr=hparams["learning_rate"])
Ensure your model is set to use the selected values on a per-trial basis rather than your previously hardcoded values:
# NEW: Add hparams to __init__.
def __init__(self, hparams):
    # NEW: Read hyperparameters provided for this trial.
    super(Net, self).__init__()
    self.conv1 = nn.Conv2d(1, hparams["n_filters1"], 3, 1)
    self.conv2 = nn.Conv2d(hparams["n_filters1"], hparams["n_filters2"], 3, 1)
    self.dropout1 = nn.Dropout(hparams["dropout1"])
    self.dropout2 = nn.Dropout(hparams["dropout2"])
    self.fc1 = nn.Linear(144 * hparams["n_filters2"], 128)
    self.fc2 = nn.Linear(128, 10)
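The factor 144 in fc1 is what lets the layer size follow n_filters2 from trial to trial. A quick way to sanity-check it is to trace the spatial size of the feature map. This sketch assumes 28x28 MNIST inputs and a 2x2 max-pool after conv2, as in the standard PyTorch MNIST example; the forward pass is not shown in this guide, so the pooling step is an assumption.

```python
def conv_output_size(size, kernel_size, stride=1):
    """Spatial size after an unpadded convolution."""
    return (size - kernel_size) // stride + 1


size = 28                          # MNIST images are 28x28
size = conv_output_size(size, 3)   # conv1: 3x3 kernel, stride 1
size = conv_output_size(size, 3)   # conv2: 3x3 kernel, stride 1
size = size // 2                   # assumed 2x2 max-pool
features_per_filter = size * size  # flattened features per conv2 filter
print(features_per_filter)
```

Under these assumptions, the flattened input to fc1 has features_per_filter * n_filters2 elements, which matches the 144 * hparams["n_filters2"] expression above.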
Step 4.1: Run the Experiment#
Run the following command to run the experiment:
det e create adaptive.yaml .
In the Determined WebUI, navigate to the Hyperparameters tab.
You should see a graph in the WebUI that displays the various trials initiated by the Adaptive ASHA hyperparameter search algorithm.
Step 5: Distributed Training#
The Core API has special features for running distributed training. Some of the more important features are:
Access to all IP addresses of every node in the Trial (through the ClusterInfo API).
Communication primitives such as allgather(), gather(), and broadcast() to give you out-of-the-box coordination between workers.
Since many distributed training frameworks expect all workers in training to operate in-step, the should_preempt() call is automatically synchronized across workers so that all workers decide to preempt or continue as a unit.
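To build intuition for the communication primitives listed above, the following single-process simulation shows what each worker would end up holding. It does not call the Core API: the simulate_* functions are hypothetical, each list index plays the role of a worker rank, and rank 0 is assumed to be the chief.

```python
def simulate_allgather(values):
    """Every worker receives the full list of all workers' values."""
    return [list(values) for _ in values]


def simulate_gather(values, chief_rank=0):
    """Only the chief receives the list; other workers receive None."""
    return [list(values) if rank == chief_rank else None for rank in range(len(values))]


def simulate_broadcast(values, chief_rank=0):
    """Every worker receives the value held by the chief."""
    return [values[chief_rank] for _ in values]


per_worker_loss = [0.5, 0.7, 0.6, 0.8]  # index = rank
print(simulate_allgather(per_worker_loss)[3])
print(simulate_gather(per_worker_loss))
print(simulate_broadcast(per_worker_loss))
```

A common pattern is to gather per-worker values on the chief, combine them there, and report a single metric from rank 0.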
Tip
Launchers
Typically, you do not have to write your own launcher. Determined provides launchers for Horovod, torch.distributed, and DeepSpeed. For more information about launcher options, visit Submit Experiment.
In this example, we’ll be using PyTorch’s DistributedDataParallel. We’ll also need to make specific changes to our experiment configuration file.
In this step, we’ll run our experiment using the model_def_distributed.py script and its accompanying distributed.yaml experiment configuration file.
Note
For this tutorial, we’ve already created the script for you. Take a moment to review the changes we’ve made to the provided script so that you’ll know how to modify your own script.
To run the experiment, you can either use the provided scripts, which have already been modified, or you can build your own file by making changes to “model_def.py”.
Step 5.1: Edit Your Experiment Configuration File#
Edit your experiment configuration file to point to a launch script:
entrypoint: >-
  python3 -m determined.launch.torch_distributed
  python3 model_def_distributed.py
Then set slots_per_trial (under resources) to the number of GPUs you want to distribute the training across:
resources:
  slots_per_trial: 4
Step 5.2: Modify Your Training Script#
Add a few more imports to your training script:
# NEW: Import torch distributed libraries.
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.optim.lr_scheduler import StepLR
from torch.utils.data.distributed import DistributedSampler
from torchvision import datasets, transforms
import determined as det
Initialize a process group using torch. After initializing a process group, initialize a Determined distributed context using from_torch_distributed:
if __name__ == "__main__":
    # NEW: Initialize process group using torch.
    dist.init_process_group("nccl")
    # NEW: Initialize distributed context using from_torch_distributed
    # (obtains info such as rank, size, etc. from default torch
    # environment variables).
    distributed = det.core.DistributedContext.from_torch_distributed()
    with det.core.init(distributed=distributed) as core_context:
        main(core_context)
In main, set your selected device to the device with index of local_rank. This is a best practice even if you only have a single GPU-per-node setup:
Note
Refer to the if use_cuda: block in model_def_distributed.py.
if use_cuda:
    # NEW: Change selected device to the one with index of local_rank.
    device = torch.device(core_context.distributed.local_rank)
elif use_mps:
    device = torch.device("mps")
else:
    device = torch.device("cpu")
Shard the data into num_replicas non-overlapping parts. num_replicas is equal to core_context.distributed.size, or the number of slots:
# NEW: Create DistributedSampler object for sharding data into
# core_context.distributed.size parts.
train_sampler = DistributedSampler(
    train_dataset,
    num_replicas=core_context.distributed.size,
    rank=core_context.distributed.rank,
    shuffle=True,
)
test_sampler = DistributedSampler(
    test_dataset,
    num_replicas=core_context.distributed.size,
    rank=core_context.distributed.rank,
    shuffle=True,
)
# NEW: Shard data.
train_loader = torch.utils.data.DataLoader(train_dataset, sampler=train_sampler, **train_kwargs)
test_loader = torch.utils.data.DataLoader(test_dataset, sampler=test_sampler, **test_kwargs)
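The idea behind sharding can be illustrated without torch. The sketch below is a simplified model of what a distributed sampler does: the helper shard_indices is hypothetical, and it deliberately leaves out shuffling and the padding applied when the dataset size is not divisible by the number of replicas.

```python
def shard_indices(dataset_size, num_replicas, rank):
    """Return the sample indices one worker would process (no shuffle, no padding)."""
    return list(range(rank, dataset_size, num_replicas))


# Hypothetical setup: 12 samples split across 4 slots.
shards = [shard_indices(12, num_replicas=4, rank=r) for r in range(4)]
print(shards)

# Each sample is seen by exactly one worker.
all_indices = sorted(i for shard in shards for i in shard)
print(all_indices == list(range(12)))
```

Each rank strides through the dataset starting at its own offset, so the shards never overlap and together cover every sample.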
Wrap your model with torch’s DistributedDataParallel:
model = Net(hparams).to(device)
# NEW: Wrap model with DDP. Aggregates gradients and synchronizes
# model training across slots.
model = DDP(model, device_ids=[device], output_device=device)
Finally, at each place in the code where you upload checkpoints, report training metrics, or report progress to the master, make sure this is done only on rank 0. For example:
# NEW: Report metrics only on rank 0: only the chief worker
# may report training metrics and progress, or upload checkpoints.
if core_context.distributed.rank == 0:
    core_context.train.report_training_metrics(
        steps_completed=(batch_idx + 1) + epoch_idx * len(train_loader),
        metrics={"train_loss": loss.item()},
    )
Step 5.3: Run the Experiment#
Run the following command to run the experiment:
det e create distributed.yaml .
In the Determined WebUI, go to the Cluster pane.
You should be able to see multiple active slots, corresponding to the value you set for slots_per_trial in distributed.yaml, as well as logs appearing from multiple ranks.
Next Steps#
In this user guide, you learned how to use the Core API to integrate a model into Determined. You also saw how to modify a training script and use the appropriate configuration file to report metrics and checkpoints, perform a hyperparameter search, and run distributed training.
Note
To learn more about distributed training with Determined, visit the conceptual overview or the intro to implementing distributed training.