Core API

In this guide, you’ll learn how to use the Core API.

Visit the API reference

det.core API Reference

With the Core API you can train arbitrary models on the Determined platform with seamless access to the the following capabilities:

  • metrics tracking

  • checkpoint tracking and preemption support

  • hyperparameter search

  • distributing work across multiple GPUs and/or nodes

These are the same features provided by the higher-level PyTorchTrial, DeepSpeedTrial, and TFKerasTrial APIs (the higher-level APIs are implemented using the Core API).

This user guide shows you how to get started using the Core API.

Getting Started

As a simple introduction, this example training script increments a single integer in a loop, instead of training a model with machine learning. The changes shown for the example model should be similar to the changes you make in your actual model.

The 0_start.py training script used in this example contains your simple “model”:

import logging
import sys
import time


def main(increment_by):
    x = 0
    for batch in range(100):
        x += increment_by
        time.sleep(0.1)
        print(f"x is now {x}")


if __name__ == "__main__":
    main(increment_by=1)

To run this script, create a configuration file with at least the following values:

name: core-api-stage-0
entrypoint: python3 0_start.py

# Use the single-searcher to run just one instance of the training script
searcher:
   name: single
   # metric is required but it shouldn't hurt to ignore it at this point.
   metric: x
   # max_length is ignored if the training script ignores it.
   max_length: 1

max_restarts: 0

The actual configuration file can have any name, but this example uses 0_start.yaml.

Run the code using the command:

det e create 0_start.yaml . -f

If you navigate to this experiment in the WebUI no metrics are displayed because you have not yet reported them to the master using the Core API.

Report Metrics

The Core API makes it easy to report training and validation metrics to the master during training with only a few new lines of code.

  1. For this example, create a new training script called 1_metrics.py by copying the 0_start.py script from Getting Started.

  2. Begin by importing import the determined module:

    import determined as det
    
    
    
  3. Enable logging, using the det.LOG_FORMAT for logs. This enables useful log messages from the determined library, and det.LOG_FORMAT enables filter-by-level in the WebUI.

        logging.basicConfig(level=logging.DEBUG, format=det.LOG_FORMAT)
        # Log at different levels to demonstrate filter-by-level in the WebUI.
        logging.debug("debug-level message")
        logging.info("info-level message")
        logging.warning("warning-level message")
        logging.error("error-level message")
    
  4. In your if __name__ == "__main__" block, wrap the entire execution of main() within the scope of determined.core.init(), which prepares resources for training and cleans them up afterward. Add the core_context as a new argument to main() because the Core API is accessed through the core_context object.

        with det.core.init() as core_context:
            main(core_context=core_context, increment_by=1)
    
  5. Within main(), add two calls: (1) report training metrics periodically during training and (2) report validation metrics every time a validation runs.

    def main(core_context, increment_by):
        x = 0
        for batch in range(100):
            x += increment_by
            steps_completed = batch + 1
            time.sleep(0.1)
            logging.info(f"x is now {x}")
            # NEW: report training metrics.
            if steps_completed % 10 == 0:
                core_context.train.report_training_metrics(
                    steps_completed=steps_completed, metrics={"x": x}
                )
        # NEW: report a "validation" metric at the end.
        core_context.train.report_validation_metrics(steps_completed=steps_completed, metrics={"x": x})
    

    The report_validation_metrics() call typically happens after the validation step, however, actual validation is not demonstrated by this example.

  6. Create a 1_metrics.yaml file with an entrypoint invoking the new 1_metrics.py file. You can copy the 0_start.yaml configuration file and change the first couple of lines:

    name: core-api-stage-1
    entrypoint: python3 1_metrics.py
    
  7. Run the code using the command:

    det e create 1_metrics.yaml . -f
    
  8. You can now navigate to the new experiment in the WebUI and view the plot populated with the training and validation metrics.

The complete 1_metrics.py and 1_metrics.yaml listings used in this example can be found in the core_api.tgz download or in the Github repository.

Report Checkpoints

By checkpointing periodically during training and reporting those checkpoints to the master, you can stop and restart training in two different ways: either by pausing and reactivating training using the WebUI, or by clicking the Continue Trial button after the experiment completes.

These two types of continuations have different behaviors. While you always want to preserve the value you are incrementing (the “model weight”), you do not always want to preserve the batch index. When you pause and reactivate you want training to continue from the same batch index, but when starting a fresh experiment you want training to start with a fresh batch index. You can save the trial ID in the checkpoint and use it to distinguish the two types of continues.

  1. Create a new 2_checkpoints.py training script called by copying the 1_metrics.py script from Report Metrics.

  2. Write save and load methods for your model:

    def save_state(x, steps_completed, trial_id, checkpoint_directory):
        with checkpoint_directory.joinpath("state").open("w") as f:
            f.write(f"{x},{steps_completed},{trial_id}")
    
    def load_state(trial_id, checkpoint_directory):
        checkpoint_directory = pathlib.Path(checkpoint_directory)
        with checkpoint_directory.joinpath("state").open("r") as f:
            x, steps_completed, ckpt_trial_id = [int(field) for field in f.read().split(",")]
        if ckpt_trial_id == trial_id:
            return x, steps_completed
        else:
            # This is a new trial; load the "model weight" but not the batch count.
            return x, 0
    
  3. In your if __name__ == "__main__" block, use the ClusterInfo API to gather additional information about the task running on the cluster, specifically a checkpoint to load from and the trial ID, which you also pass to main().

        info = det.get_cluster_info()
        assert info is not None, "this example only runs on-cluster"
        latest_checkpoint = info.latest_checkpoint
        trial_id = info.trial.trial_id
    
        with det.core.init() as core_context:
            main(
                core_context=core_context,
                latest_checkpoint=latest_checkpoint,
                trial_id=trial_id,
                increment_by=1,
            )
    

    It is recommended that you always follow this pattern of extracting values from the ClusterInfo API and passing the values to lower layers of your code, instead of accessing the ClusterInfo API directly in the lower layers. In this way the lower layer can be written to run on or off of the Determined cluster.

  4. Within main(), add logic to continue from a checkpoint, when a checkpoint is provided:

    def main(core_context, latest_checkpoint, trial_id, increment_by):
        x = 0
    
        # NEW: load a checkpoint if one was provided.
        starting_batch = 0
        if latest_checkpoint is not None:
            with core_context.checkpoint.restore_path(latest_checkpoint) as path:
                x, starting_batch = load_state(trial_id, path)
    
        for batch in range(starting_batch, 100):
    
  5. You can checkpoint your model as frequently as you like. For this exercise, save a checkpoint after each training report, and check for a preemption signal after each checkpoint:

    if steps_completed % 10 == 0:
        core_context.train.report_training_metrics(
            steps_completed=steps_completed, metrics={"x": x}
        )
    
        # NEW: write checkpoints at regular intervals to limit lost progress
        # in case of a crash during training.
        checkpoint_metadata = {"steps_completed": steps_completed}
        with core_context.checkpoint.store_path(checkpoint_metadata) as (path, uuid):
            save_state(x, steps_completed, trial_id, path)
    
        # NEW: check for a preemption signal.  This could originate from a
        # higher-priority task bumping us off the cluster, or for a user pausing
        # the experiment via the WebUI or CLI.
        if core_context.preempt.should_preempt():
            # At this point, a checkpoint was just saved, so training can exit
            # immediately and resume when the trial is reactivated.
            return
    
  6. Create a 2_checkpoints.yaml file by copying the 0_start.yaml file and changing the first couple of lines:

    name: core-api-stage-2
    entrypoint: python3 2_checkpoints.py
    
  7. Run the code using the command:

    det e create 2_checkpoints.yaml . -f
    
  8. You can navigate to the experiment in the WebUI and pause it mid-training. The trial shuts down and stops producing logs. If you reactivate training it resumes where it stopped. After training is completed, click Continue Trial to see that fresh training is started but that the model weight continues from where previous training finished.

The complete 2_checkpoints.py and 2_checkpoints.yaml listings used in this example can be found in the core_api.tgz download or in the Github repository.

Distributed Training

The Core API has special considerations for running distributed training. Some of the more important considerations are:

  • Access to all IP addresses of every node in the Trial (through the ClusterInfo API).

  • Communication primitives such as allgather(), gather(), and broadcast() to give you out-of-the-box coordination between workers.

  • Since many distributed training frameworks expect all workers in training to operate in-step, the should_preempt() call is automatically synchronized across workers so that all workers decide to preempt or continue as a unit.

  1. Create a 4_distributed.py training script by copying the 3_hpsearch.py from Hyperparameter Search.

  2. Add launcher logic to execute one worker subprocess per slot.

    Start with a launcher_main() function that executes one worker subprocess per slot.

    def launcher_main(slots_per_node, num_nodes, cross_rank):
        # Use subprocess to start one worker process per node.
        procs = []
        for local_rank in range(slots_per_node):
            rank = cross_rank * slots_per_node + local_rank
            cmd = [
                # Use the determined.launch.wrap_rank to wrap the worker process.
                # This ensures logs from each worker can be filtered by rank in the WebUI.
                "python3",
                "-m",
                "determined.launch.wrap_rank",
                str(rank),
                "--",
                # Re-invoke this script but as a worker.
                "python3",
                __file__,
                "worker",
                str(rank),
                str(local_rank),
            ]
            procs.append(subprocess.Popen(cmd))
    
        # A good launcher normally waits for all workers to finish, but cleans up and exits
        # nonzero immediately if any worker fails to prevent distributed training jobs from
        # hanging.  One way to do this by managing each worker process in a thread and sending
        # exit codes over a Queue as workers complete.
        q = queue.Queue()
    
        def wait_for_worker(proc):
            worker_exit = proc.wait()
            q.put((proc, worker_exit))
    
        threads = [threading.Thread(target=wait_for_worker, args=(proc,)) for proc in procs]
    
        for t in threads:
            t.start()
    
        first_failed_exit = 0
        for i in range(slots_per_node):
            proc, worker_exit = q.get()
            procs.remove(proc)
            if worker_exit != 0 and first_failed_exit == 0:
                # When the first worker crashes, preempt the others.
                first_failed_exit = worker_exit
                for proc in procs:
                    proc.kill()
    
        for t in threads:
            t.join()
    
        return first_failed_exit
    

    Typically, you do not have to write your own launcher. Determined provides launchers for Horovod, torch.distributed, and DeepSpeed. Additionally, there are third-party launchers available, such as mpirun. When using a custom or third-party launcher, wrap your worker script in the python -m determined.launcher.wrap_rank wrapper script so the WebUI log viewer can filter logs by rank.

    Also add a worker_main() that will run training on each slot:

    def worker_main(slots_per_node, num_nodes, cross_rank, chief_ip, rank, local_rank):
        # In the absence of a distributed training framework that might define the
        # rank/local_rank/cross_rank, you can derive them from the ClusterInfo API.
        distributed = det.core.DistributedContext(
            rank=rank,
            size=num_nodes * slots_per_node,
            local_rank=local_rank,
            local_size=slots_per_node,
            cross_rank=cross_rank,
            cross_size=num_nodes,
            chief_ip=chief_ip,
        )
    
        with det.core.init(distributed=distributed) as core_context:
            main(
                core_context=core_context,
                latest_checkpoint=latest_checkpoint,
                trial_id=trial_id,
                increment_by=hparams["increment_by"],
            )
    

    Then modify your if __name__ == "__main__" block to invoke the correct *_main() based on command-line arguments:

    slots_per_node = len(info.slot_ids)
    num_nodes = len(info.container_addrs)
    cross_rank = info.container_rank
    chief_ip = info.container_addrs[0]
    
    # NEW: This script is invoked both as a launcher-of-workers, and again as each worker.
    if sys.argv[1] == "launcher":
        # Usage: SCRIPT launcher
        exitcode = launcher_main(slots_per_node, num_nodes, cross_rank)
        sys.exit(exitcode)
    
    if sys.argv[1] == "worker":
        # Usage: SCRIPT worker $RANK $LOCAL_RANK
        logging.info(f"worker starting")
        rank = int(sys.argv[2])
        local_rank = int(sys.argv[3])
        exitcode = worker_main(slots_per_node, num_nodes, cross_rank, chief_ip, rank, local_rank)
        sys.exit(exitcode)
    
    raise ValueError(f"unrecognized first argument: {sys.argv[1]}")
    
  3. In the training code, use the allgather primitive to do a “distributed” increment, to gain experience using the communication primitives:

    all_increment_bys = core_context.distributed.allgather(increment_by)
    x += sum(all_increment_bys)
    
  4. Usually, trial logs are easier to read when status is only printed on the chief worker:

    if core_context.distributed.rank == 0:
        logging.info(f"x is now {x}")
    
  5. Only the chief worker is permitted to report training metrics, report validation metrics, upload checkpoints, or report searcher operations completed. This rule applies to the steps you take periodically during training:

    if steps_completed % 10 == 0:
        # NEW: only the chief may report training metrics and progress,
        # or upload checkpoints.
        if core_context.distributed.rank == 0:
            core_context.train.report_training_metrics(
                steps_completed=steps_completed, metrics={"x": x}
            )
            op.report_progress(steps_completed)
            checkpoint_metadata = {"steps_completed": steps_completed}
            with core_context.checkpoint.store_path(checkpoint_metadata) as (
                checkpoint_directory,
                uuid,
            ):
                save_state(x, steps_completed, trial_id, checkpoint_directory)
            last_checkpoint_batch = steps_completed
        if core_context.preempt.should_preempt():
            return
    

    The rule also applies to the steps you take after validating:

    if core_context.distributed.rank == 0:
        core_context.train.report_validation_metrics(
            steps_completed=steps_completed, metrics={"x": x}
        )
        op.report_completed(x)
    

    The rule also applies to the conditional save after the main loop completes:

    # NEW: again, only the chief may upload checkpoints.
    if core_context.distributed.rank == 0 and last_checkpoint_batch != steps_completed:
        checkpoint_metadata = {"steps_completed": steps_completed}
        with core_context.checkpoint.store_path(checkpoint_metadata) as (path, uuid):
            save_state(x, steps_completed, trial_id, path)
    
  6. Create a 4_distributed.yaml file by copying the 3_distributed.yaml file and changing the first couple of lines:

    name: core-api-stage-4
    entrypoint: python3 4_distributed.py launcher
    

    Set the resources.slots_per_trial field to the number of GPUs you want:

    resources:
      slots_per_trial: 8
    

    You can return to using the single searcher instead of an adaptive_asha hyperparameter search:

    searcher:
       name: single
       metric: x
       max_length: 100
    
  7. Run the code using the Determined CLI with the following command:

    det e create 4_distributed.yaml . -f
    

The complete 4_distributed.py and 3_hpsearch.yaml listings used in this example can be found in the core_api.tgz download or in the Github repository.