det.core API Reference#

determined.core.init#

determined.core.init(*, distributed: Optional[DistributedContext] = None, checkpoint_storage: Optional[Union[str, Dict[str, Any]]] = None, preempt_mode: PreemptMode = PreemptMode.WorkersAskChief, tensorboard_mode: TensorboardMode = TensorboardMode.AUTO) Context#

core.init() builds a core.Context for use with the Core API.

Always use with core.init() as context instead of instantiating a core.Context directly. Certain components of the Core API may be configured by passing arguments to core.init(). The only required argument is a DistributedContext, and even that is required only for multi-slot tasks.

All of your training must occur within the scope of the with core.init() as core_context block, because resources necessary for training are started in the core.Context’s __enter__() method and must be cleaned up in its __exit__() method.

Parameters:
  • distributed (core.DistributedContext, optional) – Passing a DistributedContext is required for multi-slot training, but unnecessary for single-slot training. Defaults to None.

  • preempt_mode (core.PreemptMode, optional) – Configure the calling pattern for the core_context.preempt.should_preempt() method. See PreemptMode for more detail. Defaults to WorkersAskChief.

  • checkpoint_storage (Union[str, dict], optional) – A directory path or a cloud storage URI of the form s3://<bucket>[/<prefix>] (AWS) or gs://<bucket>[/<prefix>] (GCP). This should only be used when IAM permissions can be assumed. You may also pass a dictionary matching the checkpoint_storage field of the experiment config, with the exception that type: shared_fs configs are not allowed.

  • tensorboard_mode (core.TensorboardMode, optional) – Define how Tensorboard metrics and profiling data are retained. See TensorboardMode for more detail. Defaults to AUTO.
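
The accepted forms of checkpoint_storage can be summarized with a small classifier. This is only a sketch of the rules listed above: describe_checkpoint_storage is a hypothetical helper, the bucket names are placeholders, and core.init() performs its own parsing that may differ in detail.

```python
from typing import Any, Dict, Union


def describe_checkpoint_storage(value: Union[str, Dict[str, Any]]) -> str:
    """Classify a checkpoint_storage value (hypothetical helper, not part of determined)."""
    if isinstance(value, dict):
        # Dictionaries mirror the experiment config, except shared_fs is rejected.
        if value.get("type") == "shared_fs":
            raise ValueError("type: shared_fs configs are not allowed")
        return "dict"
    if value.startswith("s3://"):
        return "s3"
    if value.startswith("gs://"):
        return "gs"
    # Anything else is treated as a directory path in this sketch.
    return "directory"


for candidate in ["s3://my-bucket/ckpts", "gs://my-bucket", "/mnt/checkpoints"]:
    print(candidate, "->", describe_checkpoint_storage(candidate))
```

Any of the string values above could then be passed as core.init(checkpoint_storage=...), provided IAM permissions can be assumed for the cloud URIs.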

determined.core.Context#

class determined.core.Context(checkpoint: CheckpointContext, _session: Optional[Session] = None, distributed: Optional[DistributedContext] = None, preempt: Optional[PreemptContext] = None, train: Optional[TrainContext] = None, searcher: Optional[SearcherContext] = None, info: Optional[ClusterInfo] = None, experimental: Optional[ExperimentalCoreContext] = None, profiler: Optional[ProfilerContext] = None, _metrics: Optional[_MetricsContext] = None, _tensorboard_manager: Optional[TensorboardManager] = None, _heartbeat: Optional[_Heartbeat] = None, _log_shipper: Optional[_LogShipper] = None)#

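core.Context is a simple composition of several component APIs, with the following public members:
  • .checkpoint, a CheckpointContext

  • .distributed, a DistributedContext

  • .preempt, a PreemptContext

  • .searcher, a SearcherContext

  • .train, a TrainContext

  • .profiler, a ProfilerContext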

core.Context is a tool for integrating arbitrary distributed tasks into a Determined cluster.

You should always use core.init() instead of creating a core.Context manually.

determined.core.DistributedContext#

class determined.core.DistributedContext(*, rank: int, size: int, local_rank: int, local_size: int, cross_rank: int, cross_size: int, chief_ip: Optional[str] = None, pub_port: int = 12360, pull_port: int = 12376, port_offset: int = 0, force_tcp: bool = False)#

DistributedContext provides useful methods for effective distributed training.

A DistributedContext has the following required args:
  • rank: the index of this worker in the entire job

  • size: the number of workers in the entire job

  • local_rank: the index of this worker on this machine

  • local_size: the number of workers on this machine

  • cross_rank: the index of this machine in the entire job

  • cross_size: the number of machines in the entire job

Additionally, any time that cross_size > 1, you must also provide:
  • chief_ip: the ip address to reach the chief worker (where rank==0)
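
To make the relationship between the six required args concrete, the sketch below derives them for one worker. It assumes a homogeneous job (every machine runs the same number of workers) with ranks assigned contiguously per machine; layout_for is a hypothetical helper, and real launchers may lay ranks out differently.

```python
from typing import Dict


def layout_for(rank: int, local_size: int, cross_size: int) -> Dict[str, int]:
    """Derive the six required args for one worker (hypothetical helper).

    Assumes `local_size` workers on every machine and contiguous rank
    assignment per machine.
    """
    size = local_size * cross_size
    if not 0 <= rank < size:
        raise ValueError(f"rank {rank} is outside 0..{size - 1}")
    return {
        "rank": rank,
        "size": size,
        "local_rank": rank % local_size,
        "local_size": local_size,
        "cross_rank": rank // local_size,
        "cross_size": cross_size,
    }


# Worker 5 in a job of 2 machines with 4 workers each.
args = layout_for(rank=5, local_size=4, cross_size=2)
print(args)
```

The dictionary keys match the keyword names of the constructor, so under these assumptions the result could be passed as DistributedContext(**args, chief_ip=...); chief_ip is needed here because cross_size > 1.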

Note

DistributedContext has .allgather(), .gather(), and .broadcast() methods, which are easy to use and which can be useful for coordinating work across workers, but it is not a replacement for the allgather/gather/broadcast operations in your particular distributed training framework.

classmethod from_horovod(hvd: Any, chief_ip: Optional[str] = None) DistributedContext#

Create a DistributedContext using the provided hvd module to determine rank information.

Example:

import horovod.torch as hvd
hvd.init()
distributed = DistributedContext.from_horovod(hvd)

The IP address for the chief worker is required whenever hvd.cross_size() > 1. The value may be provided using the chief_ip argument or the DET_CHIEF_IP environment variable.

classmethod from_deepspeed(chief_ip: Optional[str] = None) DistributedContext#

Create a DistributedContext using the standard deepspeed environment variables to determine rank information.

The IP address for the chief worker is required whenever CROSS_SIZE > 1. The value may be provided using the chief_ip argument or the DET_CHIEF_IP environment variable.

classmethod from_torch_distributed(chief_ip: Optional[str] = None) DistributedContext#

Create a DistributedContext using the standard torch distributed environment variables to determine rank information.

The IP address for the chief worker is required whenever CROSS_SIZE > 1. The value may be provided via the chief_ip argument or the DET_CHIEF_IP environment variable.
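
The chief_ip rule shared by the three classmethods above can be sketched as follows. resolve_chief_ip is a hypothetical helper, and giving the explicit argument precedence over DET_CHIEF_IP is an assumption of this sketch rather than documented behavior.

```python
import os
from typing import Mapping, Optional


def resolve_chief_ip(
    chief_ip: Optional[str],
    cross_size: int,
    env: Optional[Mapping[str, str]] = None,
) -> Optional[str]:
    """Pick the chief IP from the argument or DET_CHIEF_IP (hypothetical helper)."""
    env = os.environ if env is None else env
    # Assumption: an explicit argument wins over the environment variable.
    resolved = chief_ip if chief_ip is not None else env.get("DET_CHIEF_IP")
    if cross_size > 1 and not resolved:
        raise ValueError("chief_ip or DET_CHIEF_IP is required when cross_size > 1")
    return resolved


# Single-machine job: no chief IP is needed.
print(resolve_chief_ip(None, cross_size=1, env={}))
# Two-machine job: the value comes from the environment mapping.
print(resolve_chief_ip(None, cross_size=2, env={"DET_CHIEF_IP": "10.0.0.2"}))
```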

get_rank() int#

Return the rank of the process in the trial. The rank of a process is a unique ID within the trial. That is, no two processes in the same trial are assigned the same rank.

get_local_rank() int#

Return the rank of the process on the agent. The local rank of a process is a unique ID within a given agent and trial; that is, no two processes in the same trial that are executing on the same agent are assigned the same rank.

get_size() int#

Return the number of slots this trial is running on.

get_num_agents() int#

Return the number of agents this trial is running on.

gather(stuff: Any) Optional[List]#

Gather stuff to the chief. The chief returns a list of all stuff, and workers return None.

gather() is not a replacement for the gather functionality of your distributed training framework.

gather_local(stuff: Any) Optional[List]#

Gather stuff to the local chief. The local chief returns a list of all stuff, and local workers return None.

gather_local() is not a replacement for the gather functionality of your distributed training framework.

allgather(stuff: Any) List#

Gather stuff to the chief and broadcast all of it back to the workers.

allgather() is not a replacement for the allgather functionality of your distributed training framework.

allgather_local(stuff: Any) List#

Gather stuff to the local chief and broadcast all of it back to the local workers.

allgather_local() is not a replacement for the allgather functionality of your distributed training framework.

broadcast(stuff: Any) Any#

Every worker gets the stuff sent by the chief.

broadcast() is not a replacement for the broadcast functionality of your distributed training framework.

broadcast_local(stuff: Optional[Any] = None) Any#

Every worker gets the stuff sent by the local chief.

broadcast_local() is not a replacement for the broadcast functionality of your distributed training framework.
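
The return values of the collective helpers above are easy to mix up, so here is a single-process model of what each rank gets back. The simulate_* functions are hypothetical stand-ins that involve no networking, and ordering the gathered list by rank is an assumption of this sketch.

```python
from typing import Any, List, Optional


def simulate_gather(values: List[Any]) -> List[Optional[List[Any]]]:
    """values[i] is what rank i passes in; result[i] is what rank i gets back."""
    # Only the chief (rank 0) receives the full list; workers receive None.
    return [list(values) if rank == 0 else None for rank in range(len(values))]


def simulate_allgather(values: List[Any]) -> List[List[Any]]:
    # Every rank receives the full list.
    return [list(values) for _ in values]


def simulate_broadcast(values: List[Any]) -> List[Any]:
    # Every rank receives the chief's value; non-chief inputs are ignored.
    return [values[0] for _ in values]


inputs = ["a", "b", "c"]  # one value per rank
print(simulate_gather(inputs))
print(simulate_allgather(inputs))
print(simulate_broadcast(inputs))
```

The *_local variants follow the same pattern, with the local chief and the workers on one machine taking the place of the chief and the whole job.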

determined.core.CheckpointContext#

class determined.core.CheckpointContext(dist: DistributedContext, storage_manager: StorageManager, session: Session, task_id: str, allocation_id: Optional[str], tbd_sync_mode: TensorboardMode, tensorboard_manager: Optional[TensorboardManager], storage_backend_id: Optional[int])#

CheckpointContext gives access to checkpoint-related features of a Determined cluster.

upload(ckpt_dir: Optional[Union[str, PathLike]], metadata: Optional[Dict[str, Any]] = None, *, shard: bool = False, selector: Optional[Callable[[str], bool]] = None) str#

upload() chooses a random storage_id, then uploads the contents of ckpt_dir to checkpoint storage into a directory by the name of the storage_id. The name of the ckpt_dir is not preserved.

When shard=False, only the chief worker (distributed.rank==0) may call upload().

When shard=True, upload() becomes a synchronization point between workers, so all workers must call upload(). Those workers with nothing to upload may pass ckpt_dir=None. The final checkpoint stored in checkpoint storage will contain a union of the contents from each ckpt_dir.

Each worker may optionally provide a selector that accepts a path relative to the checkpoint root, and returns True for paths that should be uploaded.
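
A selector is an ordinary function from a relative path to a bool. The sketch below builds one that keeps only files with given suffixes; make_suffix_selector is a hypothetical helper, and how directory entries are presented to the selector is not covered here.

```python
from typing import Callable


def make_suffix_selector(*suffixes: str) -> Callable[[str], bool]:
    """Build a selector that accepts paths ending in any of `suffixes`."""

    def selector(path: str) -> bool:
        # `path` is relative to the checkpoint root, e.g. "model/weights.pt".
        return path.endswith(suffixes)

    return selector


keep_weights = make_suffix_selector(".pt", ".json")
print(keep_weights("model/weights.pt"))
print(keep_weights("logs/train.log"))
```

Such a function could then be passed as upload(ckpt_dir, selector=keep_weights); the same signature is accepted by download() and restore_path().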

Returns: The storage_id for this checkpoint.

Example:

if core_context.distributed.rank == 0:
    storage_id = core_context.checkpoint.upload(ckpt_dir, shard=False)
    print(f"done uploading checkpoint {storage_id}")

download(storage_id: str, ckpt_dir: Union[str, PathLike], download_mode: DownloadMode = DownloadMode.LocalWorkersShareDownload, *, selector: Optional[Callable[[str], bool]] = None) None#

Download the contents of a checkpoint from checkpoint storage into a directory specified by ckpt_dir, which is created if it does not exist.

Note

This .download() method is similar to but less flexible than the .download() method of the Checkpoint class in the Determined Python SDK. This .download() is here as a convenience.

get_metadata(storage_id: str) Dict[str, Any]#

Returns the current metadata associated with the checkpoint.

store_path(metadata: Optional[Dict[str, Any]] = None, *, shard: bool = False) Iterator[Tuple[Path, str]]#

store_path() is a context manager which chooses a random path and prepares a directory you should save your model to. When the context manager exits, the model is automatically uploaded (at least, for cloud-backed checkpoint storage backends).

Note

metadata must include a ‘steps_completed’ key in the current implementation. Raises ValueError if the ‘steps_completed’ key is not present in the metadata dictionary.

When shard=False, only the chief worker (distributed.rank==0) may call store_path().

When shard=True, store_path() becomes a synchronization point between workers, so all workers must call store_path(), even workers which will not write any checkpoint files.

Example:

if core_context.distributed.rank == 0:
    with core_context.checkpoint.store_path(shard=False) as (path, storage_id):
        my_save_model(my_model, path)
        print(f"done saving checkpoint {storage_id}")
    print(f"done uploading checkpoint {storage_id}")

restore_path(storage_id: str, download_mode: DownloadMode = DownloadMode.LocalWorkersShareDownload, *, selector: Optional[Callable[[str], bool]] = None) Iterator[Path]#

restore_path() is a context manager which downloads a checkpoint (if required by the storage backend) and cleans up the temporary files afterwards (if applicable).

In multi-worker scenarios, with the default download_mode (LocalWorkersShareDownload), all workers must call restore_path() but only the local chief worker on each node (distributed.local_rank==0) actually downloads data.

Example:

with core_context.checkpoint.restore_path(my_checkpoint_uuid) as path:
    my_model = my_load_model(path)

delete(storage_id: str) None#

Delete a checkpoint from the storage backend and notify the master.

determined.core.PreemptContext#

class determined.core.PreemptContext(session: Session, allocation_id: str, dist: DistributedContext, preempt_mode: PreemptMode = PreemptMode.WorkersAskChief)#

PreemptContext gives access to preemption signals that originate either from a user action, such as pausing an experiment using the WebUI or the CLI, or from the Determined scheduler.

should_preempt(auto_ack: bool = True) bool#

should_preempt() returns True if the task should shut itself down, or False otherwise.

The requirements on the caller and the synchronization between workers during a call to should_preempt() are defined by the preempt_mode argument passed to the PreemptContext constructor.

Parameters:

auto_ack (bool, optional) – In order for the task to be restarted by the Determined master after shutting down due to preemption, the task must acknowledge the preemption signal to the Determined master. When auto_ack is True this acknowledgement is automatically sent the first time that should_preempt() returns True. If you might choose not to exit after receiving the preemption signal (but you still want to check the signal for some purpose), then you should set auto_ack to False. Then if you later do decide to comply with the preemption signal, it is your responsibility to call acknowledge_preemption_signal() manually any time before exiting. Defaults to True.

Note

Currently, only blocking behavior is supported when checking should_preempt(), so it is not performant enough to call every batch.
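
Because each check blocks, a common pattern is to poll only every N batches. The loop below is a minimal sketch of that pattern: train_with_preemption_checks and its arguments are hypothetical, and a stub stands in for core_context.preempt.should_preempt so the example runs without a cluster.

```python
from typing import Callable


def train_with_preemption_checks(
    total_batches: int,
    check_every: int,
    train_batch: Callable[[], None],
    should_preempt: Callable[[], bool],
) -> int:
    """Train until done or preempted; return the number of batches trained."""
    batches_trained = 0
    while batches_trained < total_batches:
        train_batch()
        batches_trained += 1
        # Poll the (blocking) preemption check only once per `check_every` batches.
        if batches_trained % check_every == 0 and should_preempt():
            # A real script would save a checkpoint here before exiting.
            break
    return batches_trained


calls = []


def fake_should_preempt() -> bool:
    # Stub: signals preemption on the second check.
    calls.append(1)
    return len(calls) >= 2


trained = train_with_preemption_checks(1000, 100, lambda: None, fake_should_preempt)
print(trained, len(calls))
```

In a real task the stub would be replaced by core_context.preempt.should_preempt, called according to the configured preempt_mode.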

acknowledge_preemption_signal() None#

acknowledge_preemption_signal() tells the Determined master that you are shutting down, but you have not finished your work and you expect to be restarted later to complete it.

This is important to tell the master explicitly because otherwise if the python process exits with a zero exit code, the master interprets that as a completed task, and the task does not get rescheduled.

By default, acknowledge_preemption_signal() is called automatically the first time that should_preempt() returns True, unless should_preempt() is called with auto_ack=False.

determined.core.TrainContext#

class determined.core.TrainContext(session: Session, trial_id: int, exp_id: int, metrics: _MetricsContext, distributed: DistributedContext, tensorboard_mode: TensorboardMode, tensorboard_manager: Optional[TensorboardManager], tbd_writer: Optional[BatchMetricWriter])#

TrainContext lets you report training and validation metrics to the Determined master during trial tasks.

set_status(status: str) None#

Report a short user-facing string that the WebUI can render to indicate what a trial is working on.

set_metadata(metadata: Dict[str, Any]) Optional[Dict[str, Any]]#

Set the metadata of the current run on the Determined master, overwriting any existing metadata on that run. Returns the metadata that was set.

The metadata is a dictionary of key-value pairs that can be used for analysis, post-processing, or debugging.

get_metadata() Optional[Dict[str, Any]]#

Get the metadata of the current run from the Determined master.

report_training_metrics(steps_completed: int, metrics: Dict[str, Any], batch_metrics: Optional[List[Dict[str, Any]]] = None) None#

Report training metrics to the master.

You can include a list of batch_metrics. Batch metrics are not shown in the WebUI but may be accessed from the master using the CLI for post-processing.

report_validation_metrics(steps_completed: int, metrics: Dict[str, Any]) None#

Report validation metrics to the master. Note that for hyperparameter search, this is independent of the need to report the searcher metric using SearcherOperation.report_completed() in the Searcher API.

report_metrics(group: str, steps_completed: int, metrics: Dict[str, Any]) None#

Report metrics data to the master.

Parameters:
  • group (string) – metrics group name. Can be used to partition metrics into different logical groups or time series. “training” and “validation” group names map to built-in training and validation time series. Note: Group cannot contain . character.

  • steps_completed (int) – global step number, e.g. the number of batches processed.

  • metrics (Dict[str, Any]) – metrics data dictionary. Must be JSON-serializable. When reporting metrics with the same group and steps_completed values, the dictionary keys must not overlap.
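
The constraints on group and metrics can be checked locally before reporting. The class below is a sketch of such a pre-flight check: MetricsGuard is a hypothetical helper, and the master performs its own validation that may differ.

```python
import json
from typing import Any, Dict, Set, Tuple


class MetricsGuard:
    """Pre-flight checks mirroring the documented report_metrics() constraints."""

    def __init__(self) -> None:
        # Keys already reported for each (group, steps_completed) pair.
        self._seen: Dict[Tuple[str, int], Set[str]] = {}

    def check(self, group: str, steps_completed: int, metrics: Dict[str, Any]) -> None:
        if "." in group:
            raise ValueError(f"group {group!r} must not contain '.'")
        # json.dumps raises TypeError for values that are not JSON-serializable.
        json.dumps(metrics)
        seen = self._seen.setdefault((group, steps_completed), set())
        overlap = seen.intersection(metrics)
        if overlap:
            raise ValueError(f"keys already reported for this step: {sorted(overlap)}")
        seen.update(metrics)


guard = MetricsGuard()
guard.check("training", 10, {"loss": 0.5})
guard.check("training", 10, {"accuracy": 0.9})  # same step, different key: accepted
print("checks passed")
```

After a successful check, the same arguments could be forwarded to core_context.train.report_metrics(group, steps_completed, metrics).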

get_tensorboard_path() Path#

Get TensorBoard log directory path.

upload_tensorboard_files(selector: Callable[[Path], bool] = <function TrainContext.<lambda>>, mangler: Callable[[Path, int], Path] = <function TrainContext.<lambda>>) None#

Upload files generated for consumption by Tensorboard to checkpoint storage.

Parameters:
  • selector – optional function returning True for a file that should be included. If not provided, all files are uploaded.

  • mangler – optional function modifying the destination file names based on rank.
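
Both arguments are plain callables over pathlib.Path. The sketch below shows one possible pair; only_event_files and tag_with_rank are hypothetical names, and whether the mangler receives an absolute or a relative path is not specified above, so the example only rewrites the file name.

```python
from pathlib import Path


def only_event_files(path: Path) -> bool:
    """Selector: keep TensorBoard event files, which contain 'tfevents' in their name."""
    return "tfevents" in path.name


def tag_with_rank(path: Path, rank: int) -> Path:
    """Mangler: append the worker rank so files from different workers do not collide."""
    return path.with_name(f"{path.name}.rank{rank}")


example = Path("logs/events.out.tfevents.123")
print(only_event_files(example))
print(tag_with_rank(example, 2))
```

These could be passed as upload_tensorboard_files(selector=only_event_files, mangler=tag_with_rank), for example when using MANUAL mode.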

report_early_exit(reason: EarlyExitReason) None#

Report an early exit reason to the Determined master.

Currently, the only meaningful value to report is EarlyExitReason.INVALID_HP, which is reported automatically when core.Context.__exit__() detects an exception of type det.InvalidHP.

get_experiment_best_validation() Optional[float]#

Get the best validation metric reported so far, across the whole experiment.

The returned value is the highest or lowest reported validation metric value, using the searcher.metric field of the experiment config as the key and searcher.smaller_is_better for the comparison.
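
The comparison rule amounts to taking a minimum or a maximum depending on searcher.smaller_is_better. The function below is an illustration only: best_validation is a hypothetical name, the real value is computed by the master, and returning None when nothing has been reported is an assumption based on the Optional[float] return type.

```python
from typing import Iterable, Optional


def best_validation(values: Iterable[float], smaller_is_better: bool) -> Optional[float]:
    """Pick the best reported searcher metric under the configured direction."""
    reported = list(values)
    if not reported:
        # Assumption: no validations reported yet means there is no best value.
        return None
    return min(reported) if smaller_is_better else max(reported)


losses = [0.9, 0.4, 0.6]
print(best_validation(losses, smaller_is_better=True))
print(best_validation(losses, smaller_is_better=False))
```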

determined.core.SearcherContext#

class determined.core.SearcherContext(session: Session, dist: DistributedContext, trial_id: int, run_id: int, allocation_id: str, units: Optional[Unit] = None)#

SearcherContext gives direct access to operations emitted by the search algorithm in the master. Each SearcherOperation emitted has a (unitless) length that you should train for, then you complete the op by reporting the validation metric you are searching over.

It is the user’s responsibility to execute the required training. Because the user configured the length of the searcher in the experiment configuration, the user should know if the unitless length represents epochs, batches, records, etc.

It is also the user’s responsibility to evaluate the model after training and report the correct metric; if you intend to search over a metric called val_accuracy, you should report val_accuracy.

Lastly, it is recommended (not required) to report progress periodically, so that the WebUI can accurately reflect current progress. Progress is another unitless length.

Example:

# Assuming you configured the searcher in terms of batches,
# the op.length is also interpreted as a batch count.
# Note that you'll have to load your starting point from a
# checkpoint if you want to support pausing/continuing training.
batches_trained = 0

for op in core_context.searcher.operations():
    # Train for however long the op requires you to.
    # Note that op.length is an absolute length, not an
    # incremental length:
    while batches_trained < op.length:
        my_train_batch()

        batches_trained += 1

        # Reporting progress every batch would be expensive,
        # so report once every 1000 batches:
        if batches_trained % 1000 == 0:
            op.report_progress(batches_trained)

    # After training the required amount, pass your searcher
    # metric to op.report_completed():
    val_metrics = my_validate()
    op.report_completed(val_metrics["my_searcher_metric"])

Note that reporting metrics is completely independent of the SearcherContext API; use core_context.train.report_training_metrics() or core_context.train.report_validation_metrics() for that.

operations(searcher_mode: SearcherMode = SearcherMode.WorkersAskChief, auto_ack: bool = True) Iterator[SearcherOperation]#

Iterate through all the operations this searcher has to offer.

See SearcherMode for details about calling requirements in distributed training scenarios.

After training to the point specified by each SearcherOperation, the chief, and only the chief, must call op.report_completed() on each operation. This is true regardless of the searcher_mode setting because the Determined master needs a clear, unambiguous report of when an operation is completed.

acknowledge_out_of_ops() None#

acknowledge_out_of_ops() tells the Determined master that you are shutting down because you have recognized the searcher has no more operations for you to complete at this time.

This tells the Determined master that it is safe to restart this process should new operations be assigned to this trial.

acknowledge_out_of_ops() is normally called automatically just before operations() raises a StopIteration, unless operations() is called with auto_ack=False.

get_configured_units() Optional[Unit]#

get_configured_units() reports what units were used in the searcher field of the experiment config. If no units were configured, None is returned.

An experiment configured like this causes get_configured_units() to return EPOCHS:

searcher:
  name: single
  max_length:
    epochs: 50

An experiment configured like this causes get_configured_units() to return None:

searcher:
  name: single
  max_length: 50

determined.core.ProfilerContext#

class determined.core.ProfilerContext(agent_id: str, metrics: _MetricsContext, distributed: DistributedContext)#

Gives access to the system profiling feature within Determined.

It is responsible for collecting system metrics at specified time intervals and reporting them to the master. When it is turned on, it spawns two background threads that collect profiling metrics and send them to the Determined master. The threads are cleaned up when the core.Context exits or the profiler is turned off.

This class is automatically created when the core.Context is initialized and can be turned on and off as follows:

with det.core.init() as core_context:
    core_context.profiler.on()
    ...
    core_context.profiler.off()

on(sampling_interval: int = 1, samples_per_report: int = 10) None#

Turns system profiling functionality on.

This method creates two threads, one that collects system metrics at specified time intervals, and another that ships them to the master.

These metrics are persisted in the database and can be viewed in the Determined web UI for the associated trial.

Note

This method is idempotent: if profiling is already on, this method is effectively a no-op.

Parameters:
  • sampling_interval – time (in seconds) between each metric collection.

  • samples_per_report – number of samples to collect before aggregating for report.
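
Taken together, the two parameters suggest roughly how often a report is shipped. The arithmetic below is a sketch that assumes one report per samples_per_report samples and ignores collection and network overhead; seconds_between_reports is a hypothetical name.

```python
def seconds_between_reports(sampling_interval: int = 1, samples_per_report: int = 10) -> int:
    """Approximate time between reports, using the same defaults as on()."""
    return sampling_interval * samples_per_report


print(seconds_between_reports())       # defaults: 1 s interval, 10 samples
print(seconds_between_reports(5, 12))  # 5 s interval, 12 samples
```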

off() None#

Turns off profiling.

Sets the internal state of this class and stops any threads that were created.

Note

This method is idempotent: if profiling is already off, this method is effectively a no-op.

determined.core.SearcherOperation#

class determined.core.SearcherOperation(session: Session, trial_id: int, length: int, is_chief: bool)#

A SearcherOperation is a request from the hyperparameter-search logic for the training script to execute one train-validate-report cycle.

Some searchers, such as single, random, or grid, pass only a single SearcherOperation to each trial, while others may pass many SearcherOperations.

Each SearcherOperation has a length attribute representing the cumulative training that should be completed before the validate-report steps of the cycle. The length attribute is absolute, not incremental, meaning that if the searcher wants you to train for 10 units and validate, then train for 10 more units and validate, it emits one SearcherOperation with .length=10 followed by a second SearcherOperation with .length=20. Using absolute lengths instead of incremental lengths makes restarting after crashes simple and robust.
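
The benefit of absolute lengths after a restart can be seen by converting them into the amount of training still required. This is a minimal sketch: incremental_lengths is a hypothetical helper, and already_trained stands for whatever progress a script restored from its checkpoint.

```python
from typing import Iterable, List


def incremental_lengths(absolute_lengths: Iterable[int], already_trained: int = 0) -> List[int]:
    """Convert absolute op lengths into the units still to train for each op."""
    remaining = []
    trained = already_trained
    for length in absolute_lengths:
        # Nothing is left to train for an op whose target was already reached.
        remaining.append(max(0, length - trained))
        trained = max(trained, length)
    return remaining


# Fresh start: train 10 units, then 10 more.
print(incremental_lengths([10, 20]))
# Restarted from a checkpoint taken at 15 units: only 5 units remain.
print(incremental_lengths([10, 20], already_trained=15))
```

With incremental lengths the script would need to know which operations it had already consumed; with absolute lengths, comparing against the restored progress is enough.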

determined.core.PreemptMode#

class determined.core.PreemptMode(value)#

PreemptMode defines the calling behavior of the PreemptContext.should_preempt() call.

When mode is WorkersAskChief (the default), all workers must call should_preempt() in step. Only the chief actually communicates with the master, then the chief broadcasts its decision to all workers. This guarantees that all workers decide to preempt at the exact same time.

When mode is ChiefOnly, only the chief is allowed to call PreemptContext.should_preempt(). Usually this implies you must manually inform the workers if they should preempt or not.

When mode is WorkersAskMaster, each worker contacts the master independently in order to decide to preempt or not. Each worker receives the preemption signal at roughly the same time, but it becomes your responsibility to tolerate situations where some workers have exited due to preemption and others have not.

determined.core.SearcherMode#

class determined.core.SearcherMode(value)#

SearcherMode defines the calling behavior of the SearcherContext.operations() call.

When mode is WorkersAskChief (the default), all workers must call SearcherContext.operations() in step with each other. The chief iterates through searcher operations from the master and then propagates the operations to each worker, introducing a synchronization point between workers.

When mode is ChiefOnly, only the chief may call SearcherContext.operations(). Usually this implies you must manually inform the workers of what work to do next.

determined.core.TensorboardMode#

class determined.core.TensorboardMode(value)#

TensorboardMode defines how Tensorboard artifacts are handled.

In AUTO mode the chief automatically writes any reported training or validation metrics to the Tensorboard path (see TrainContext.get_tensorboard_path()), and automatically uploads all of its own tensorboard artifacts to checkpoint storage. Tensorboard artifacts written by non-chief workers will not be uploaded at all. This is the same behavior that existed prior to 0.18.3.

In MANUAL mode no Tensorboard artifacts are written or uploaded at all. It is entirely up to the user to write their desired metrics and upload them with calls to TrainContext.upload_tensorboard_files().

determined.TrialInfo#

class determined.TrialInfo(trial_id: int, experiment_id: int, trial_seed: int, hparams: Dict[str, Any], config: Dict[str, Any], steps_completed: int, trial_run_id: int, debug: bool, inter_node_network_interface: Optional[str])#

experiment_id#

The Experiment ID for the current task.

hparams#

The hyperparameter values selected for the current Trial.

trial_id#

The Trial ID for the current task.

trial_seed#

The random seed for the current Trial.