Core API Reference

User Guide: Core API

determined.core.init

determined.core.init(*, distributed: Optional[determined.core._distributed.DistributedContext] = None, storage_manager: Optional[determined.common.storage.base.StorageManager] = None, preempt_mode: determined.core._preempt.PreemptMode = PreemptMode.WorkersAskChief, tensorboard_mode: determined.core._tensorboard_mode.TensorboardMode = TensorboardMode.AUTO) determined.core._context.Context

core.init() builds a core.Context for use with the Core API.

Always use with core.init() as core_context instead of instantiating a core.Context directly. Certain components of the Core API may be configured by passing arguments to core.init(). No argument is strictly required: a DistributedContext is needed only for multi-slot tasks.

All of your training must occur within the scope of with core.init() as core_context, because resources necessary for training are started in the core.Context’s __enter__() method and must be cleaned up in its __exit__() method.

Parameters
  • distributed (core.DistributedContext, optional) – Passing a DistributedContext is required for multi-slot training, but unnecessary for single-slot training. Defaults to None.

  • preempt_mode (core.PreemptMode, optional) – Configure the calling pattern for the core_context.preempt.should_preempt() method. See PreemptMode for more detail. Defaults to WorkersAskChief.

  • storage_manager – Internal use only.

  • tensorboard_mode (core.TensorboardMode, optional) – Define how Tensorboard metrics and profiling data are retained. See TensorboardMode for more detail. Defaults to AUTO.

determined.core.Context

class determined.core.Context(checkpoint: determined.core._checkpoint.CheckpointContext, distributed: Optional[determined.core._distributed.DistributedContext] = None, preempt: Optional[determined.core._preempt.PreemptContext] = None, train: Optional[determined.core._train.TrainContext] = None, searcher: Optional[determined.core._searcher.SearcherContext] = None)

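core.Context is a tool for integrating arbitrary distributed tasks into a Determined cluster.

core.Context is a simple composition of several component APIs, with the following public members:
  • checkpoint: the CheckpointContext

  • distributed: the DistributedContext

  • preempt: the PreemptContext

  • train: the TrainContext

  • searcher: the SearcherContext

You should always use core.init() instead of creating a core.Context manually.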

determined.core.DistributedContext

class determined.core.DistributedContext(*, rank: int, size: int, local_rank: int, local_size: int, cross_rank: int, cross_size: int, chief_ip: Optional[str] = None, pub_port: int = 12360, pull_port: int = 12376, port_offset: int = 0, force_tcp: bool = False)

DistributedContext provides useful methods for effective distributed training.

A DistributedContext has the following required args:
  • rank: the index of this worker in the entire job

  • size: the number of workers in the entire job

  • local_rank: the index of this worker on this machine

  • local_size: the number of workers on this machine

  • cross_rank: the index of this machine in the entire job

  • cross_size: the number of machines in the entire job

Additionally, any time that cross_size > 1, you must also provide:
  • chief_ip: the IP address to reach the chief worker (where rank==0)
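To make the relationships between these values concrete, the sketch below enumerates them for every worker in a homogeneous cluster. It assumes each machine runs the same number of workers and that global ranks are assigned contiguously, machine by machine (rank = cross_rank * local_size + local_rank); real launchers may order ranks differently. The WorkerRanks class, the layout() helper, and the IP address are hypothetical.

```python
import dataclasses
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class WorkerRanks:
    """Hypothetical record of the values a DistributedContext requires."""
    rank: int
    size: int
    local_rank: int
    local_size: int
    cross_rank: int
    cross_size: int
    chief_ip: Optional[str] = None


def layout(num_machines: int, workers_per_machine: int,
           chief_ip: Optional[str] = None) -> List[WorkerRanks]:
    """Enumerate rank info for every worker, assuming a homogeneous cluster
    with global ranks assigned contiguously, machine by machine."""
    if num_machines > 1 and chief_ip is None:
        # Mirrors the rule above: chief_ip is required whenever cross_size > 1.
        raise ValueError("chief_ip is required when cross_size > 1")
    size = num_machines * workers_per_machine
    workers = []
    for cross_rank in range(num_machines):
        for local_rank in range(workers_per_machine):
            workers.append(WorkerRanks(
                rank=cross_rank * workers_per_machine + local_rank,
                size=size,
                local_rank=local_rank,
                local_size=workers_per_machine,
                cross_rank=cross_rank,
                cross_size=num_machines,
                chief_ip=chief_ip,
            ))
    return workers


# Two machines with four workers each (placeholder chief IP).
for w in layout(2, 4, chief_ip="10.0.0.1"):
    print(dataclasses.asdict(w))
```

Because the field names match the keyword-only arguments of the DistributedContext constructor, each entry could in principle be passed as DistributedContext(**dataclasses.asdict(w)); in practice, the from_horovod(), from_deepspeed(), and from_torch_distributed() constructors read these values for you.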

Note

DistributedContext has .allgather(), .gather(), and .broadcast() methods, which are easy to use and can be useful for coordinating work across workers, but they are not a replacement for the allgather/gather/broadcast operations in your particular distributed training framework.

classmethod from_horovod(hvd: Any, chief_ip: Optional[str] = None) determined.core._distributed.DistributedContext

Create a DistributedContext using the provided hvd module to determine rank information.

Example:

import horovod.torch as hvd
from determined.core import DistributedContext

hvd.init()
distributed = DistributedContext.from_horovod(hvd)

The IP address for the chief worker is required whenever hvd.cross_size() > 1. The value may be provided using the chief_ip argument or the DET_CHIEF_IP environment variable.

classmethod from_deepspeed(chief_ip: Optional[str] = None) determined.core._distributed.DistributedContext

Create a DistributedContext using the standard deepspeed environment variables to determine rank information.

The IP address for the chief worker is required whenever CROSS_SIZE > 1. The value may be provided using the chief_ip argument or the DET_CHIEF_IP environment variable.

classmethod from_torch_distributed(chief_ip: Optional[str] = None) determined.core._distributed.DistributedContext

Create a DistributedContext using the standard torch distributed environment variables to determine rank information.

The IP address for the chief worker is required whenever CROSS_SIZE > 1. The value may be provided via the chief_ip argument or the DET_CHIEF_IP environment variable.
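The chief-IP rule shared by the three constructors above can be summarized as a small function. This is a hypothetical sketch, not Determined's implementation; in particular, it assumes an explicit chief_ip argument takes precedence over DET_CHIEF_IP, which this reference does not state.

```python
import os
from typing import Mapping, Optional


def resolve_chief_ip(chief_ip: Optional[str], cross_size: int,
                     environ: Mapping[str, str] = os.environ) -> Optional[str]:
    """Hypothetical helper: pick the chief IP from the argument or DET_CHIEF_IP."""
    # Assumption: an explicit argument takes precedence over the env variable.
    ip = chief_ip if chief_ip is not None else environ.get("DET_CHIEF_IP")
    if cross_size > 1 and ip is None:
        raise ValueError(
            "the chief IP is required when cross_size > 1: "
            "pass chief_ip or set DET_CHIEF_IP"
        )
    return ip
```

On a single machine (cross_size == 1) the function may return None, matching the rule that the chief IP is only required for multi-machine jobs.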

get_rank() int

Return the rank of the process in the trial. The rank of a process is a unique ID within the trial. That is, no two processes in the same trial are assigned the same rank.

get_local_rank() int

Return the rank of the process on the agent. The local rank of a process is a unique ID within a given agent and trial; that is, no two processes in the same trial that are executing on the same agent are assigned the same local rank.

get_size() int

Return the number of slots this trial is running on.

get_num_agents() int

Return the number of agents this trial is running on.

gather(stuff: Any) Optional[List]

Gather stuff to the chief. The chief returns a list of all stuff, and workers return None.

gather() is not a replacement for the gather functionality of your distributed training framework.

gather_local(stuff: Any) Optional[List]

Gather stuff to the local chief. The local chief returns a list of all stuff, and local workers return None.

gather_local() is not a replacement for the gather functionality of your distributed training framework.

allgather(stuff: Any) List

Gather stuff to the chief and broadcast all of it back to the workers.

allgather() is not a replacement for the allgather functionality of your distributed training framework.

allgather_local(stuff: Any) List

Gather stuff to the local chief and broadcast all of it back to the local workers.

allgather_local() is not a replacement for the allgather functionality of your distributed training framework.

broadcast(stuff: Any) Any

Every worker gets the stuff sent by the chief.

broadcast() is not a replacement for the broadcast functionality of your distributed training framework.

broadcast_local(stuff: Optional[Any] = None) Any

Every worker gets the stuff sent by the local chief.

broadcast_local() is not a replacement for the broadcast functionality of your distributed training framework.
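The single-process model below shows what each rank would receive from each of the helpers above, given the stuff passed by every rank. It illustrates the documented semantics only; it is not how DistributedContext actually communicates. It assumes gathered lists are ordered by rank and that each machine's workers occupy a contiguous block of global ranks, with the local chief at local_rank 0. The simulate() function is hypothetical.

```python
from typing import Any, Dict, List


def simulate(values: List[Any], local_size: int) -> Dict[str, List[Any]]:
    """Return, per helper, the value each global rank would receive.

    values[r] is the `stuff` passed by global rank r.
    """
    names = ("gather", "allgather", "broadcast",
             "gather_local", "allgather_local", "broadcast_local")
    results: Dict[str, List[Any]] = {name: [] for name in names}
    for rank in range(len(values)):
        machine, local_rank = divmod(rank, local_size)
        local_vals = values[machine * local_size:(machine + 1) * local_size]
        # Chief-level helpers: only rank 0 receives the gathered list.
        results["gather"].append(list(values) if rank == 0 else None)
        results["allgather"].append(list(values))
        results["broadcast"].append(values[0])
        # Local helpers: the same pattern, scoped to one machine.
        results["gather_local"].append(list(local_vals) if local_rank == 0 else None)
        results["allgather_local"].append(list(local_vals))
        results["broadcast_local"].append(local_vals[0])
    return results


print(simulate(["a", "b", "c", "d"], local_size=2))
```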

determined.core.CheckpointContext

class determined.core.CheckpointContext(dist: determined.core._distributed.DistributedContext, storage_manager: determined.common.storage.base.StorageManager, session: determined.common.api._session.Session, task_id: str, allocation_id: str, tbd_sync_mode: determined.core._tensorboard_mode.TensorboardMode, tensorboard_manager: determined.tensorboard.base.TensorboardManager)

CheckpointContext gives access to checkpoint-related features of a Determined cluster.

upload(ckpt_dir: Optional[Union[str, os.PathLike]], metadata: Optional[Dict[str, Any]] = None, *, shard: bool = False) str

upload() chooses a random storage_id, then uploads the contents of ckpt_dir to checkpoint storage into a directory by the name of the storage_id. The name of the ckpt_dir is not preserved.

When shard=False, only the chief worker (distributed.rank==0) may call upload().

When shard=True, upload() becomes a synchronization point between workers, so all workers must call upload(). Those workers with nothing to upload may pass ckpt_dir=None. The final checkpoint stored in checkpoint storage will contain a union of the contents from each ckpt_dir.

Returns: The storage_id for this checkpoint.
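With shard=True, the stored checkpoint is the union of every worker's ckpt_dir. The sketch below models that merge over relative file paths so that a sharding scheme can be checked before running it. This reference does not describe what happens when two workers write the same relative path, so the hypothetical merged_checkpoint() helper treats an overlap as an error; giving each rank its own file names (for example, shard-{rank}.pt) sidesteps the question.

```python
from typing import Dict, Iterable, Set


def merged_checkpoint(files_by_rank: Dict[int, Iterable[str]]) -> Set[str]:
    """Model the union of per-worker ckpt_dir contents for shard=True.

    files_by_rank maps a worker rank to the relative paths it wrote; a worker
    that passed ckpt_dir=None simply maps to an empty list.
    """
    seen: Dict[str, int] = {}
    for rank, paths in files_by_rank.items():
        for path in paths:
            if path in seen:
                # Overlap behavior is undocumented here, so refuse it.
                raise ValueError(f"{path!r} written by ranks {seen[path]} and {rank}")
            seen[path] = rank
    return set(seen)


print(sorted(merged_checkpoint({0: ["metadata.json", "shard-0.pt"],
                                1: ["shard-1.pt"],
                                2: []})))
```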

Example:

if core_context.distributed.rank == 0:
    storage_id = core_context.checkpoint.upload(ckpt_dir, shard=False)
    print(f"done uploading checkpoint {storage_id}")

download(storage_id: str, ckpt_dir: Union[str, os.PathLike], download_mode: determined.core._checkpoint.DownloadMode = DownloadMode.LocalWorkersShareDownload, *, selector: Optional[Callable[[str], bool]] = None) None

Download the contents of a checkpoint from checkpoint storage into a directory specified by ckpt_dir, which is created if it does not exist.

Note

This .download() method is similar to but less flexible than the .download() method of the Checkpoint class in the Determined Python SDK. This .download() is here as a convenience.
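Both download() and restore_path() accept a selector callable for fetching only part of a checkpoint. A minimal sketch of building one from glob patterns follows. It assumes the selector is called with each file's path relative to the checkpoint root, which this reference does not spell out; make_selector() and preview() are hypothetical helpers.

```python
import fnmatch
from typing import Callable, Iterable, List


def make_selector(*patterns: str) -> Callable[[str], bool]:
    """Build a selector that keeps only paths matching any glob pattern."""
    def selector(path: str) -> bool:
        return any(fnmatch.fnmatch(path, p) for p in patterns)
    return selector


def preview(selector: Callable[[str], bool], paths: Iterable[str]) -> List[str]:
    """Show which files a selector would keep, without downloading anything."""
    return [p for p in paths if selector(p)]


sel = make_selector("*.pt", "metadata.json")
print(preview(sel, ["model.pt", "opt/state.pt", "logs/a.txt", "metadata.json"]))
```

The selector could then be passed as, for example, core_context.checkpoint.download(storage_id, ckpt_dir, selector=make_selector("*.pt")). Note that fnmatch's * also matches path separators, so "*.pt" matches files in subdirectories too.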

get_metadata(storage_id: str) Dict[str, Any]

Returns the current metadata associated with the checkpoint.

store_path(metadata: Optional[Dict[str, Any]] = None, *, shard: bool = False) Iterator[Tuple[pathlib.Path, str]]

store_path() is a context manager which chooses a random path and prepares a directory you should save your model to. When the context manager exits, the model is automatically uploaded (at least, for cloud-backed checkpoint storage backends).

When shard=False, only the chief worker (distributed.rank==0) may call store_path().

When shard=True, store_path() becomes a synchronization point between workers, so all workers must call store_path(), even workers which will not write any checkpoint files.

Example:

if core_context.distributed.rank == 0:
    with core_context.checkpoint.store_path(shard=False) as (path, storage_id):
        my_save_model(my_model, path)
        print(f"done saving checkpoint {storage_id}")
    print(f"done uploading checkpoint {storage_id}")

restore_path(storage_id: str, download_mode: determined.core._checkpoint.DownloadMode = DownloadMode.LocalWorkersShareDownload, *, selector: Optional[Callable[[str], bool]] = None) Iterator[pathlib.Path]

restore_path() is a context manager which downloads a checkpoint (if required by the storage backend) and cleans up the temporary files afterwards (if applicable).

In multi-worker scenarios, with the default download_mode (LocalWorkersShareDownload), all workers must call restore_path() but only the local chief worker on each node (distributed.local_rank==0) actually downloads data.

Example:

with core_context.checkpoint.restore_path(my_checkpoint_uuid) as path:
    my_model = my_load_model(path)

delete(storage_id: str) None

Delete a checkpoint from the storage backend.

determined.core.PreemptContext

class determined.core.PreemptContext(session: determined.common.api._session.Session, allocation_id: str, dist: determined.core._distributed.DistributedContext, preempt_mode: determined.core._preempt.PreemptMode = PreemptMode.WorkersAskChief)

PreemptContext gives access to preemption signals that originate from a user action, such as pausing an experiment using the WebUI or the CLI, or from the Determined scheduler.

should_preempt(auto_ack: bool = True) bool

should_preempt() returns True if the task should shut itself down, or False otherwise.

The requirements on the caller and the synchronization between workers during a call to should_preempt() are defined by the preempt_mode argument passed to the PreemptContext constructor.

Parameters

auto_ack (bool, optional) – In order for the task to be restarted by the Determined master after shutting down due to preemption, the task must acknowledge the preemption signal to the Determined master. When auto_ack is True this acknowledgement is automatically sent the first time that should_preempt() returns True. If you might choose not to exit after receiving the preemption signal (but you still want to check the signal for some purpose), then you should set auto_ack to False. Then if you later do decide to comply with the preemption signal, it is your responsibility to call acknowledge_preemption_signal() manually any time before exiting. Defaults to True.

Note

Currently, only blocking behavior is supported when checking should_preempt(), so it is too slow to call on every batch.

acknowledge_preemption_signal() None

acknowledge_preemption_signal() tells the Determined master that you are shutting down, but you have not finished your work and you expect to be restarted later to complete it.

This is important to tell the master explicitly because, otherwise, if the Python process exits with a zero exit code, the master interprets that as a completed task, and the task does not get rescheduled.

By default, acknowledge_preemption_signal() is called automatically the first time that should_preempt() returns True, unless should_preempt() is called with auto_ack=False.
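Because should_preempt() blocks, a common pattern is to poll it every N batches rather than every batch. The sketch below uses a plain callable as a stand-in for core_context.preempt.should_preempt so it runs without a cluster; train_with_preemption_checks(), check_every, and the loop body are hypothetical. With the default WorkersAskChief mode every worker must reach the call in step, which a fixed, batch-count-based cadence like this one provides when all workers train the same number of batches.

```python
from typing import Callable


def train_with_preemption_checks(
    total_batches: int,
    should_preempt: Callable[[], bool],
    check_every: int = 100,
) -> int:
    """Run a hypothetical training loop, polling for preemption every N batches.

    Returns the number of batches completed before stopping.
    """
    batches = 0
    while batches < total_batches:
        batches += 1  # stand-in for one training step
        if batches % check_every == 0 and should_preempt():
            # A real script would save a checkpoint here and then exit; with
            # auto_ack=True the preemption signal has already been acknowledged.
            break
    return batches


# Stand-in signal that turns True on the third poll.
calls = []
def fake_should_preempt() -> bool:
    calls.append(1)
    return len(calls) >= 3

print(train_with_preemption_checks(1000, fake_should_preempt, check_every=100))
```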

determined.core.TrainContext

class determined.core.TrainContext(session: determined.common.api._session.Session, trial_id: int, run_id: int, exp_id: int, distributed: determined.core._distributed.DistributedContext, tensorboard_mode: determined.core._tensorboard_mode.TensorboardMode, tensorboard_manager: determined.tensorboard.base.TensorboardManager, tbd_writer: Optional[determined.tensorboard.metric_writers.callback.BatchMetricWriter])

TrainContext lets trial tasks report training and validation metrics to the Determined master.

set_status(status: str) None

Report a short user-facing string that the WebUI can render to indicate what a trial is working on.

report_training_metrics(steps_completed: int, metrics: Dict[str, Any], batch_metrics: Optional[List[Dict[str, Any]]] = None) None

Report training metrics to the master.

You can include a list of batch_metrics. Batch metrics are not shown in the WebUI but may be accessed from the master using the CLI for post-processing.

get_tensorboard_path() pathlib.Path

Get TensorBoard log directory path.

upload_tensorboard_files(selector: Callable[[pathlib.Path], bool] = <function TrainContext.<lambda>>, mangler: Callable[[pathlib.Path, int], pathlib.Path] = <function TrainContext.<lambda>>) None

Upload files generated for consumption by Tensorboard to checkpoint storage.

Parameters
  • selector – optional function returning True for a file that should be included. If not provided, all files are uploaded.

  • mangler – optional function modifying the destination file names based on rank.
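Example selector and mangler functions for upload_tensorboard_files() might look like the following. These are hypothetical: the exact paths the callables receive are not specified here, so the sketch only assumes they are pathlib.Path objects and that the mangler's int argument is the worker rank, as its type signature and description suggest.

```python
from pathlib import Path


def tfevents_only(path: Path) -> bool:
    """Selector: keep only TensorBoard event files (hypothetical filter)."""
    return "tfevents" in path.name


def add_rank_suffix(path: Path, rank: int) -> Path:
    """Mangler: append the worker rank so files from different ranks differ."""
    return path.with_name(f"{path.name}.rank{rank}")


print(add_rank_suffix(Path("logs/events.out.tfevents.1"), 3))
```

These could then be passed as core_context.train.upload_tensorboard_files(selector=tfevents_only, mangler=add_rank_suffix).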

report_validation_metrics(steps_completed: int, metrics: Dict[str, Any]) None

Report validation metrics to the master.

Note that for hyperparameter search, this is independent of the need to report the searcher metric using SearcherOperation.report_completed() in the Searcher API.

report_early_exit(reason: determined.core._train.EarlyExitReason) None

Report an early exit reason to the Determined master.

Currently, the only meaningful value to report is EarlyExitReason.INVALID_HP, which is reported automatically when core.Context.__exit__() detects an exception of type det.InvalidHP.

get_experiment_best_validation() Optional[float]

Get the best validation metric reported so far, across the whole experiment.

The returned value is the highest or lowest reported validation metric value, using the searcher.metric field of the experiment config as the key and searcher.smaller_is_better for the comparison.
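The comparison described above amounts to a min or max over the reported values. A minimal sketch, with the values and the smaller_is_better flag passed in explicitly; the best_validation() function is hypothetical.

```python
from typing import Iterable, Optional


def best_validation(values: Iterable[float], smaller_is_better: bool) -> Optional[float]:
    """Pick the best value the way searcher.smaller_is_better describes.

    Returns None when nothing has been reported yet, mirroring the
    Optional[float] return type of get_experiment_best_validation().
    """
    vals = list(values)
    if not vals:
        return None
    return min(vals) if smaller_is_better else max(vals)


print(best_validation([0.5, 0.3, 0.4], smaller_is_better=True))
```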

determined.core.SearcherContext

class determined.core.SearcherContext(session: determined.common.api._session.Session, dist: determined.core._distributed.DistributedContext, trial_id: int, run_id: int, allocation_id: str, units: Optional[determined.core._searcher.Unit] = None)

SearcherContext gives direct access to operations emitted by the search algorithm in the master. Each SearcherOperation emitted has a (unitless) length that you should train for, then you complete the op by reporting the validation metric you are searching over.

It is the user’s responsibility to execute the required training. Because the user configured the length of the searcher in the experiment configuration, the user should know if the unitless length represents epochs, batches, records, etc.

It is also the user’s responsibility to evaluate the model after training and report the correct metric; if you intend to search over a metric called val_accuracy, you should report val_accuracy.

Lastly, it is recommended (but not required) to report progress periodically, so that the WebUI can accurately reflect current progress. Progress is another unitless length.

Example:

# Assuming you configured the searcher in terms of batches,
# op.length is also interpreted as a batch count.
# Note that you'll have to load your starting point from a
# checkpoint if you want to support pausing/continuing training.
batches_trained = 0
is_chief = core_context.distributed.rank == 0

for op in core_context.searcher.operations():
    # Train for however long the op requires you to.
    # Note that op.length is an absolute length, not an
    # incremental length:
    while batches_trained < op.length:
        my_train_batch()

        batches_trained += 1

        # Reporting progress every batch would be expensive, so only
        # report every 1000 batches, and only from the chief:
        if is_chief and batches_trained % 1000 == 0:
            op.report_progress(batches_trained)

    # After training the required amount, the chief passes your
    # searcher metric to op.report_completed():
    val_metrics = my_validate()
    if is_chief:
        op.report_completed(val_metrics["my_searcher_metric"])

Note that reporting metrics with core_context.train.report_training_metrics() or core_context.train.report_validation_metrics() is completely independent of the SearcherContext API.

operations(searcher_mode: determined.core._searcher.SearcherMode = SearcherMode.WorkersAskChief, auto_ack: bool = True) Iterator[determined.core._searcher.SearcherOperation]

Iterate through all the operations this searcher has to offer.

See SearcherMode for details about calling requirements in distributed training scenarios.

After training to the point specified by each SearcherOperation, the chief, and only the chief, must call op.report_completed() on each operation. This is true regardless of the searcher_mode setting because the Determined master needs a clear, unambiguous report of when an operation is completed.

acknowledge_out_of_ops() None

acknowledge_out_of_ops() tells the Determined master that you are shutting down because you have recognized the searcher has no more operations for you to complete at this time.

This is important for the Determined master to know that it is safe to restart this process should new operations be assigned to this trial.

acknowledge_out_of_ops() is normally called automatically just before operations() raises a StopIteration, unless operations() is called with auto_ack=False.

get_configured_units() Optional[determined.core._searcher.Unit]

get_configured_units() reports what units were used in the searcher field of the experiment config. If no units were configured, None is returned.

An experiment configured like this causes get_configured_units() to return EPOCHS:

searcher:
  name: single
  max_length:
    epochs: 50

An experiment configured like this causes get_configured_units() to return None:

searcher:
  name: single
  max_length: 50
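The rule illustrated by these two configurations can be written as a small function over the parsed searcher section. This hypothetical sketch returns the unit name as an upper-case string rather than a Unit member, and assumes the unit is the single key under max_length.

```python
import collections.abc
from typing import Any, Mapping, Optional


def configured_units(searcher_cfg: Mapping[str, Any]) -> Optional[str]:
    """Hypothetical re-implementation of the rule shown in the YAML above."""
    max_length = searcher_cfg.get("max_length")
    if isinstance(max_length, collections.abc.Mapping) and len(max_length) == 1:
        (unit,) = max_length.keys()
        return unit.upper()  # e.g. "epochs" -> "EPOCHS"
    return None  # a bare integer carries no unit


print(configured_units({"name": "single", "max_length": {"epochs": 50}}))
```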

determined.core.SearcherOperation

class determined.core.SearcherOperation(session: determined.common.api._session.Session, trial_id: int, length: int, is_chief: bool)

A SearcherOperation is a request from the hyperparameter-search logic for the training script to execute one train-validate-report cycle.

Some searchers, such as single, random, or grid, pass only a single SearcherOperation to each trial, while others may pass many SearcherOperations.

Each SearcherOperation has a length attribute representing the cumulative training that should be completed before the validate-report steps of the cycle. The length attribute is absolute, not incremental, meaning that if the searcher wants you to train for 10 units and validate, then train for 10 more units and validate, it emits one SearcherOperation with .length=10 followed by a second SearcherOperation with .length=20. Using absolute lengths instead of incremental lengths makes restarting after crashes simple and robust.
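To see why absolute lengths make restarts simple, consider a trial that resumes from a checkpoint partway through its operations. The hypothetical plan_work() helper below computes, for each operation, the range of batches still to train; an operation already covered by the restored progress needs no extra training, only validation and a report.

```python
from typing import List, Tuple


def plan_work(op_lengths: List[int], batches_trained: int) -> List[Tuple[int, int]]:
    """Return the (start, stop) batch range still needed for each operation.

    op_lengths are absolute (cumulative) lengths, e.g. [10, 20];
    batches_trained is how far training got before a restart.
    """
    plan = []
    for length in op_lengths:
        start = batches_trained
        stop = max(length, batches_trained)  # never train "backwards"
        plan.append((start, stop))
        batches_trained = stop
    return plan


# Restarted after 15 batches: the first op needs no more training.
print(plan_work([10, 20], batches_trained=15))
```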

determined.core.PreemptMode

class determined.core.PreemptMode(value)

PreemptMode defines the calling behavior of the PreemptContext.should_preempt() call.

When mode is WorkersAskChief (the default), all workers must call should_preempt() in step. Only the chief actually communicates with the master, then the chief broadcasts its decision to all workers. This guarantees that all workers decide to preempt at the exact same time.

When mode is ChiefOnly, only the chief is allowed to call PreemptContext.should_preempt(). Usually this implies you must manually inform the workers if they should preempt or not.

When mode is WorkersAskMaster, each worker contacts the master independently in order to decide to preempt or not. Each worker receives the preemption signal at roughly the same time, but it becomes your responsibility to tolerate situations where some workers have exited due to preemption and others have not.

determined.core.SearcherMode

class determined.core.SearcherMode(value)

SearcherMode defines the calling behavior of the SearcherContext.operations() call.

When mode is WorkersAskChief (the default), all workers must call SearcherContext.operations() in step with each other. The chief iterates through searcher operations from the master and then propagates the operations to each worker, introducing a synchronization point between workers.

When mode is ChiefOnly, only the chief may call SearcherContext.operations(). Usually this implies you must manually inform the workers of what work to do next.

determined.core.TensorboardMode

class determined.core.TensorboardMode(value)

TensorboardMode defines how Tensorboard artifacts are handled.

In AUTO mode the chief automatically writes any reported training or validation metrics to the Tensorboard path (see TrainContext.get_tensorboard_path()), and automatically uploads all of its own tensorboard artifacts to checkpoint storage. Tensorboard artifacts written by non-chief workers will not be uploaded at all. This is the same behavior that existed prior to 0.18.3.

In MANUAL mode no Tensorboard artifacts are written or uploaded at all. It is entirely up to the user to write their desired metrics and upload them with calls to TrainContext.upload_tensorboard_files().

determined.ClusterInfo

class determined.ClusterInfo(master_url: str, cluster_id: str, agent_id: str, slot_ids: List[int], task_id: str, allocation_id: str, session_token: str, task_type: str, master_cert_name: Optional[str] = None, master_cert_file: Optional[str] = None, latest_checkpoint: Optional[str] = None, trial_info: Optional[determined._info.TrialInfo] = None, rendezvous_info: Optional[determined._info.RendezvousInfo] = None, resources_info: Optional[determined._info.ResourcesInfo] = None)

ClusterInfo exposes various properties that are set for tasks while running on the cluster.

Examples:

import determined as det

info = det.get_cluster_info()
assert info is not None, "this code only runs on-cluster!"

print("master_url", info.master_url)
print("task_id", info.task_id)
print("allocation_id", info.allocation_id)
print("session_token", info.session_token)

print("container_addrs", info.container_addrs)
print("container_rank", info.container_rank)

if info.task_type == "TRIAL":
    print("trial.id", info.trial.id)
    print("trial.hparams", info.trial.hparams)

Warning

Be careful with this object! If you depend on a ClusterInfo object during training for anything more than e.g. informational logging, you run the risk of making your training code unable to run outside of Determined. ClusterInfo is meant to be most useful to custom launch layers, which likely are not able to run outside of Determined anyway.

agent_id

The identifier of the Determined agent this container is running on.

allocation_id

The unique identifier for the current allocation.

cluster_id

The unique identifier for this cluster.

property container_addrs: List[str]

A list of addresses for all containers in the allocation, ordered by rank.

property container_rank: int

The rank assigned to this container.

When using a distributed training framework, the framework may choose a different rank for this container.

property gpu_uuids: List[str]

The UUIDs of the GPUs assigned to this container.

property latest_checkpoint: Optional[str]

The checkpoint ID of the most recent checkpoint that should be loaded.

Since non-trial-type tasks cannot currently save checkpoints, .latest_checkpoint is always None for non-trial-type tasks.

master_cert_file

The file location for the master certificate, if present, or “noverify” if it has been configured not to verify the master cert.

master_cert_name

The name on the master certificate, when using TLS.

master_url

The URL for reaching the master.

session_token

The Determined login session token created for the current task.

slot_ids

The slot IDs assigned to this container.

task_id

The unique identifier for the current task.

task_type

The type of task. Currently one of the following string literals:
  • "TRIAL"

  • "NOTEBOOK"

  • "SHELL"

  • "COMMAND"

  • "TENSORBOARD"

  • "CHECKPOINT_GC"

Additional values may be added in the future.

property trial: determined._info.TrialInfo

The TrialInfo sub-info object for the current trial task.

Attempting to read .trial in a non-trial task type will raise a RuntimeError.

property user_data: Dict[str, Any]

The content of the data field of the experiment configuration.

Since other types of configuration files don’t allow a data field, accessing user_data from non-trial-type tasks will always return an empty dictionary.

determined.TrialInfo

class determined.TrialInfo(trial_id: int, experiment_id: int, trial_seed: int, hparams: Dict[str, Any], config: Dict[str, Any], steps_completed: int, trial_run_id: int, debug: bool, unique_port_offset: int, inter_node_network_interface: Optional[str])

experiment_id

The Experiment ID for the current task.

hparams

The hyperparameter values selected for the current Trial.

trial_id

The Trial ID for the current task.

trial_seed

The random seed for the current Trial.