det.core
determined.core.init
- determined.core.init(*, distributed: Optional[DistributedContext] = None, checkpoint_storage: Optional[Union[str, Dict[str, Any]]] = None, preempt_mode: PreemptMode = PreemptMode.WorkersAskChief, tensorboard_mode: TensorboardMode = TensorboardMode.AUTO) -> Context
core.init() builds a core.Context for use with the Core API.

Always use with core.init() as context instead of instantiating a core.Context directly. Certain components of the Core API may be configured by passing arguments to core.init(). The only argument that is ever required is a DistributedContext, and even that is only required for multi-slot tasks.

All of your training must occur within the scope of with core.init() as core_context, because resources necessary for training start in the core.Context's __enter__() method and must be cleaned up in its __exit__() method.
- Parameters:
distributed (core.DistributedContext, optional) – Passing a DistributedContext is required for multi-slot training, but unnecessary for single-slot training. Defaults to None.
preempt_mode (core.PreemptMode, optional) – Configure the calling pattern for the core_context.preempt.should_preempt() method. See PreemptMode for more detail. Defaults to WorkersAskChief.
checkpoint_storage (Union[str, dict], optional) – A directory path or a cloud storage URI of the form s3://<bucket>[/<prefix>] (AWS) or gs://<bucket>[/<prefix>] (GCP). This should only be used when IAM permissions can be assumed. You may also pass a dictionary matching the checkpoint_storage field of the experiment config, with the exception that type: shared_fs configs are not allowed.
tensorboard_mode (core.TensorboardMode, optional) – Define how Tensorboard metrics and profiling data are retained. See TensorboardMode for more detail. Defaults to AUTO.
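The checkpoint_storage rules above can be made concrete with a small helper. classify_checkpoint_storage is hypothetical and not part of the Determined API; it only sorts a value into the categories the parameter description lists (a plain directory path, an s3:// or gs:// URI, or a config dictionary whose type is not shared_fs) and does not check permissions or contact any storage.

```python
from typing import Any, Dict, Union
from urllib.parse import urlparse


def classify_checkpoint_storage(storage: Union[str, Dict[str, Any]]) -> str:
    """Hypothetical helper: classify a core.init() checkpoint_storage value."""
    if isinstance(storage, dict):
        # Dictionaries mirror the experiment config's checkpoint_storage field,
        # except that shared_fs configs are rejected.
        if storage.get("type") == "shared_fs":
            raise ValueError("type: shared_fs is not allowed for core.init()")
        return f"config:{storage.get('type')}"

    parsed = urlparse(storage)
    if parsed.scheme in ("s3", "gs"):
        # s3://<bucket>[/<prefix>] (AWS) or gs://<bucket>[/<prefix>] (GCP)
        prefix = parsed.path.lstrip("/")
        return f"{parsed.scheme}:bucket={parsed.netloc},prefix={prefix}"
    if parsed.scheme == "":
        # No URI scheme: treat the string as a directory path.
        return f"directory:{storage}"
    raise ValueError(f"unsupported checkpoint storage URI scheme: {parsed.scheme!r}")


print(classify_checkpoint_storage("s3://my-bucket/experiments"))  # s3:bucket=my-bucket,prefix=experiments
print(classify_checkpoint_storage("/mnt/checkpoints"))  # directory:/mnt/checkpoints
```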
determined.core.Context
- class determined.core.Context(checkpoint: CheckpointContext, _session: Optional[Session] = None, distributed: Optional[DistributedContext] = None, preempt: Optional[PreemptContext] = None, train: Optional[TrainContext] = None, searcher: Optional[SearcherContext] = None, info: Optional[ClusterInfo] = None, experimental: Optional[ExperimentalCoreContext] = None, profiler: Optional[ProfilerContext] = None, _metrics: Optional[_MetricsContext] = None, _tensorboard_manager: Optional[TensorboardManager] = None, _heartbeat: Optional[_Heartbeat] = None, _log_shipper: Optional[_LogShipper] = None)
core.Context is a simple composition of several component APIs, with the following public members:
- .checkpoint, a CheckpointContext
- .distributed, a DistributedContext
- .preempt, a PreemptContext
- .searcher, a SearcherContext
- .train, a TrainContext
- .profiler, a ProfilerContext

core.Context is a tool for integrating arbitrary distributed tasks into a Determined cluster.

You should always use core.init() instead of creating a core.Context manually.
determined.core.DistributedContext
- class determined.core.DistributedContext(*, rank: int, size: int, local_rank: int, local_size: int, cross_rank: int, cross_size: int, chief_ip: Optional[str] = None, pub_port: int = 12360, pull_port: int = 12376, port_offset: int = 0, force_tcp: bool = False)
DistributedContext provides useful methods for effective distributed training.
- A DistributedContext has the following required args:
rank: the index of this worker in the entire job
size: the number of workers in the entire job
local_rank: the index of this worker on this machine
local_size: the number of workers on this machine
cross_rank: the index of this machine in the entire job
cross_size: the number of machines in the entire job
- Additionally, any time that cross_size > 1, you must also provide:
chief_ip: the IP address to reach the chief worker (where rank==0)
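To see how the six required values relate, here is a sketch that computes them for every worker of a job. It assumes each machine runs the same number of workers and that global ranks are assigned machine by machine (rank = cross_rank * local_size + local_rank); that layout is a common convention, not something stated above. rank_layout and check_chief_ip are hypothetical helpers, but the dictionary keys match the keyword names of the DistributedContext constructor.

```python
from typing import Dict, List, Optional


def rank_layout(num_machines: int, workers_per_machine: int) -> List[Dict[str, int]]:
    """Compute DistributedContext-style rank info for every worker (uniform layout assumed)."""
    size = num_machines * workers_per_machine
    layout = []
    for cross_rank in range(num_machines):
        for local_rank in range(workers_per_machine):
            layout.append(
                {
                    "rank": cross_rank * workers_per_machine + local_rank,
                    "size": size,
                    "local_rank": local_rank,
                    "local_size": workers_per_machine,
                    "cross_rank": cross_rank,
                    "cross_size": num_machines,
                }
            )
    return layout


def check_chief_ip(info: Dict[str, int], chief_ip: Optional[str]) -> None:
    """chief_ip is mandatory whenever the job spans more than one machine."""
    if info["cross_size"] > 1 and chief_ip is None:
        raise ValueError("chief_ip is required when cross_size > 1")


workers = rank_layout(num_machines=2, workers_per_machine=4)
print(workers[5])
# {'rank': 5, 'size': 8, 'local_rank': 1, 'local_size': 4, 'cross_rank': 1, 'cross_size': 2}
```

With the real class, a worker would pass its own entry as keyword arguments, for example DistributedContext(**workers[5], chief_ip="10.0.0.1"), where the IP address is a placeholder.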
Note
DistributedContext has .allgather(), .gather(), and .broadcast() methods, which are easy to use and can be useful for coordinating work across workers, but they are not a replacement for the allgather/gather/broadcast operations in your particular distributed training framework.
- classmethod from_horovod(hvd: Any, chief_ip: Optional[str] = None) -> DistributedContext
Create a DistributedContext using the provided hvd module to determine rank information.
Example:

    import horovod.torch as hvd
    from determined.core import DistributedContext

    hvd.init()
    distributed = DistributedContext.from_horovod(hvd)

The IP address for the chief worker is required whenever hvd.cross_size() > 1. The value may be provided using the chief_ip argument or the DET_CHIEF_IP environment variable.
- classmethod from_deepspeed(chief_ip: Optional[str] = None) -> DistributedContext
Create a DistributedContext using the standard DeepSpeed environment variables to determine rank information.
The IP address for the chief worker is required whenever CROSS_SIZE > 1. The value may be provided using the chief_ip argument or the DET_CHIEF_IP environment variable.
- classmethod from_torch_distributed(chief_ip: Optional[str] = None) -> DistributedContext
Create a DistributedContext using the standard torch distributed environment variables to determine rank information.
The IP address for the chief worker is required whenever CROSS_SIZE > 1. The value may be provided via the chief_ip argument or the DET_CHIEF_IP environment variable.
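The from_* constructors share one recipe: read rank information from the launcher's environment, then resolve the chief IP from the chief_ip argument or DET_CHIEF_IP. Below is a hypothetical re-creation of that recipe for torch distributed launchers, not the code behind from_torch_distributed(). It assumes the variables torchrun exports (RANK, WORLD_SIZE, LOCAL_RANK, LOCAL_WORLD_SIZE, GROUP_RANK), a uniform number of workers per machine, and that an explicit chief_ip takes precedence over DET_CHIEF_IP (the text above does not state the precedence).

```python
import os
from typing import Any, Dict, Mapping, Optional


def rank_info_from_torchrun_env(
    env: Mapping[str, str] = os.environ, chief_ip: Optional[str] = None
) -> Dict[str, Any]:
    """Hypothetical sketch: derive DistributedContext arguments from torchrun variables."""
    size = int(env["WORLD_SIZE"])
    local_size = int(env["LOCAL_WORLD_SIZE"])
    cross_size = size // local_size  # assumes every machine runs local_size workers

    # Assumed precedence: explicit argument first, then the DET_CHIEF_IP variable.
    resolved_ip = chief_ip if chief_ip is not None else env.get("DET_CHIEF_IP")
    if cross_size > 1 and resolved_ip is None:
        raise ValueError("chief_ip or DET_CHIEF_IP is required when CROSS_SIZE > 1")

    return {
        "rank": int(env["RANK"]),
        "size": size,
        "local_rank": int(env["LOCAL_RANK"]),
        "local_size": local_size,
        "cross_rank": int(env["GROUP_RANK"]),  # torchrun's index of this machine
        "cross_size": cross_size,
        "chief_ip": resolved_ip,
    }


# Example values for worker 5 of a 2-machine x 4-worker job (10.0.0.1 is a placeholder).
fake_env = {
    "RANK": "5",
    "WORLD_SIZE": "8",
    "LOCAL_RANK": "1",
    "LOCAL_WORLD_SIZE": "4",
    "GROUP_RANK": "1",
    "DET_CHIEF_IP": "10.0.0.1",
}
print(rank_info_from_torchrun_env(fake_env))
```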
- get_rank() -> int
Return the rank of the process in the trial. The rank of a process is a unique ID within the trial. That is, no two processes in the same trial are assigned the same rank.
- get_local_rank() -> int
Return the rank of the process on the agent. The local rank of a process is a unique ID within a given agent and trial; that is, no two processes in the same trial that are executing on the same agent are assigned the same rank.
- get_size() -> int
Return the number of slots this trial is running on.
- get_num_agents() -> int
Return the number of agents this trial is running on.
- gather(stuff: Any) -> Optional[List]
Gather stuff to the chief. The chief returns a list of all stuff, and workers return None.
gather() is not a replacement for the gather functionality of your distributed training framework.
- gather_local(stuff: Any) -> Optional[List]
Gather stuff to the local chief. The local chief returns a list of all stuff, and local workers return None.
gather_local() is not a replacement for the gather functionality of your distributed training framework.
- allgather(stuff: Any) -> List
Gather stuff to the chief and broadcast all of it back to the workers.
allgather() is not a replacement for the allgather functionality of your distributed training framework.
- allgather_local(stuff: Any) -> List
Gather stuff to the local chief and broadcast all of it back to the local workers.
allgather_local() is not a replacement for the allgather functionality of your distributed training framework.
- broadcast(stuff: Any) -> Any
Every worker gets the stuff sent by the chief.
broadcast() is not a replacement for the broadcast functionality of your distributed training framework.
- broadcast_local(stuff: Optional[Any] = None) -> Any
Every worker gets the stuff sent by the local chief.
broadcast_local() is not a replacement for the broadcast functionality of your distributed training framework.
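The return-value contract of these helpers is easy to misremember, so here is a single-process model of it. Each function takes the value contributed by every worker (indexed by rank) and returns what each worker would get back. It models only the documented results, not how Determined moves the data, and the _local variant assumes ranks are grouped machine by machine; all function names are hypothetical.

```python
from typing import Any, List, Optional


def model_gather(values: List[Any]) -> List[Optional[List[Any]]]:
    """Chief (rank 0) receives everything; other workers receive None."""
    return [list(values) if rank == 0 else None for rank in range(len(values))]


def model_allgather(values: List[Any]) -> List[List[Any]]:
    """Every worker receives the full list, in rank order."""
    return [list(values) for _ in values]


def model_broadcast(values: List[Any]) -> List[Any]:
    """Every worker receives whatever the chief passed in."""
    return [values[0] for _ in values]


def model_gather_local(values: List[Any], local_size: int) -> List[Optional[List[Any]]]:
    """Per machine: the local chief (local_rank 0) receives that machine's values."""
    results: List[Optional[List[Any]]] = []
    for rank in range(len(values)):
        machine_start = (rank // local_size) * local_size
        if rank == machine_start:
            results.append(values[machine_start : machine_start + local_size])
        else:
            results.append(None)
    return results


per_worker = ["a", "b", "c", "d"]
print(model_gather(per_worker))  # [['a', 'b', 'c', 'd'], None, None, None]
print(model_broadcast(per_worker))  # ['a', 'a', 'a', 'a']
print(model_gather_local(per_worker, local_size=2))  # [['a', 'b'], None, ['c', 'd'], None]
```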
determined.core.CheckpointContext
- class determined.core.CheckpointContext(dist: DistributedContext, storage_manager: StorageManager, session: Session, task_id: str, allocation_id: Optional[str], tbd_sync_mode: TensorboardMode, tensorboard_manager: Optional[TensorboardManager], storage_backend_id: Optional[int])
CheckpointContext gives access to checkpoint-related features of a Determined cluster.
- upload(ckpt_dir: Optional[Union[str, PathLike]], metadata: Optional[Dict[str, Any]] = None, *, shard: bool = False, selector: Optional[Callable[[str], bool]] = None) -> str
upload() chooses a random storage_id, then uploads the contents of ckpt_dir to checkpoint storage into a directory by the name of the storage_id. The name of the ckpt_dir is not preserved.
When shard=False, only the chief worker (distributed.rank==0) may call upload().
When shard=True, upload() becomes a synchronization point between workers, so all workers must call upload(). Those workers with nothing to upload may pass ckpt_dir=None. The final checkpoint stored in checkpoint storage will contain a union of the contents from each ckpt_dir.
Each worker may optionally provide a selector that accepts a path relative to the checkpoint root, and returns True for paths that should be uploaded.
Returns: The storage_id for this checkpoint.
Example:

    if core_context.distributed.rank == 0:
        storage_id = core_context.checkpoint.upload(ckpt_dir, shard=False)
        print(f"done uploading checkpoint {storage_id}")
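With shard=True, the uploaded checkpoint is the union of what every worker contributes after its selector is applied. The sketch below models that merge on plain dictionaries mapping relative paths to file contents. It is purely illustrative: merge_sharded_upload, the file lists, and the selectors are made up, and since the text above does not say how two workers uploading the same path are resolved, the model simply refuses that case.

```python
from typing import Callable, Dict, List, Optional

Selector = Callable[[str], bool]


def merge_sharded_upload(
    shards: List[Optional[Dict[str, bytes]]],
    selectors: List[Optional[Selector]],
) -> Dict[str, bytes]:
    """Union of every worker's selected files; None means 'nothing to upload'."""
    merged: Dict[str, bytes] = {}
    for rank, (files, selector) in enumerate(zip(shards, selectors)):
        if files is None:  # corresponds to ckpt_dir=None
            continue
        for rel_path, data in files.items():
            if selector is not None and not selector(rel_path):
                continue  # selector returned False: skip this path
            if rel_path in merged:
                raise ValueError(f"rank {rank} duplicates {rel_path!r} (not modeled here)")
            merged[rel_path] = data
    return merged


shards = [
    {"metadata.json": b"{}", "model/shard-0.bin": b"\x00"},
    {"model/shard-1.bin": b"\x01", "tmp/scratch.txt": b"..."},
    None,  # a worker with nothing to upload
]
selectors = [None, lambda path: path.startswith("model/"), None]
print(sorted(merge_sharded_upload(shards, selectors)))
# ['metadata.json', 'model/shard-0.bin', 'model/shard-1.bin']
```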
- download(storage_id: str, ckpt_dir: Union[str, PathLike], download_mode: DownloadMode = DownloadMode.LocalWorkersShareDownload, *, selector: Optional[Callable[[str], bool]] = None) -> None
Download the contents of a checkpoint from checkpoint storage into a directory specified by ckpt_dir, which is created if it does not exist.
Note
This .download() method is similar to but less flexible than the .download() method of the Checkpoint class in the Determined Python SDK. This .download() is here as a convenience.
- get_metadata(storage_id: str) -> Dict[str, Any]
Returns the current metadata associated with the checkpoint.
- store_path(metadata: Optional[Dict[str, Any]] = None, *, shard: bool = False) -> Iterator[Tuple[Path, str]]
store_path() is a context manager which chooses a random path and prepares a directory you should save your model to. When the context manager exits, the model is automatically uploaded (at least, for cloud-backed checkpoint storage backends).
Note
metadata must include a ‘steps_completed’ key in the current implementation. Raises ValueError if the ‘steps_completed’ key is not present in the metadata dictionary.
When shard=False, only the chief worker (distributed.rank==0) may call store_path().
When shard=True, store_path() becomes a synchronization point between workers, so all workers must call store_path(), even workers which will not write any checkpoint files.
Example:

    if core_context.distributed.rank == 0:
        metadata = {"steps_completed": steps_completed}
        with core_context.checkpoint.store_path(metadata, shard=False) as (path, storage_id):
            my_save_model(my_model, path)
            print(f"done saving checkpoint {storage_id}")
        print(f"done uploading checkpoint {storage_id}")
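Because store_path() currently rejects metadata without a ‘steps_completed’ key, it can help to build checkpoint metadata through one function that enforces the rule before any files are written. make_checkpoint_metadata and check_metadata are hypothetical helpers; the ValueError mirrors the documented behavior rather than calling into Determined, and the non-negative check is an extra sanity check, not a documented requirement.

```python
from typing import Any, Dict


def make_checkpoint_metadata(steps_completed: int, **extra: Any) -> Dict[str, Any]:
    """Hypothetical helper: build store_path() metadata that always has steps_completed."""
    # Extra sanity check (not a documented requirement).
    if not isinstance(steps_completed, int) or steps_completed < 0:
        raise ValueError("steps_completed must be a non-negative int")
    metadata: Dict[str, Any] = dict(extra)
    metadata["steps_completed"] = steps_completed
    return metadata


def check_metadata(metadata: Dict[str, Any]) -> None:
    """Same check the note above describes: 'steps_completed' must be present."""
    if "steps_completed" not in metadata:
        raise ValueError("metadata must include a 'steps_completed' key")


meta = make_checkpoint_metadata(1200, epoch=3)
check_metadata(meta)
print(meta)  # {'epoch': 3, 'steps_completed': 1200}
```

In a training script, the result would then be passed as core_context.checkpoint.store_path(meta, shard=False).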
- restore_path(storage_id: str, download_mode: DownloadMode = DownloadMode.LocalWorkersShareDownload, *, selector: Optional[Callable[[str], bool]] = None) -> Iterator[Path]
restore_path() is a context manager which downloads a checkpoint (if required by the storage backend) and cleans up the temporary files afterwards (if applicable).
In multi-worker scenarios, with the default download_mode (LocalWorkersShareDownload), all workers must call restore_path() but only the local chief worker on each node (distributed.local_rank==0) actually downloads data.
Example:

    with core_context.checkpoint.restore_path(my_checkpoint_uuid) as path:
        my_model = my_load_model(path)
- delete(storage_id: str) -> None
Delete a checkpoint from the storage backend and notify the master.
determined.core.PreemptContext
- class determined.core.PreemptContext(session: Session, allocation_id: str, dist: DistributedContext, preempt_mode: PreemptMode = PreemptMode.WorkersAskChief)
PreemptContext gives access to preemption signals that originate from a user action, such as pausing an experiment using the WebUI or the CLI, or from the Determined scheduler.
- should_preempt(auto_ack: bool = True) -> bool
should_preempt() returns True if the task should shut itself down, or False otherwise.
The requirements on the caller and the synchronization between workers during a call to should_preempt() are defined by the preempt_mode argument passed to the PreemptContext constructor.
- Parameters:
auto_ack (bool, optional) – In order for the task to be restarted by the Determined master after shutting down due to preemption, the task must acknowledge the preemption signal to the Determined master. When auto_ack is True, this acknowledgement is automatically sent the first time that should_preempt() returns True. If you might choose not to exit after receiving the preemption signal (but you still want to check the signal for some purpose), then you should set auto_ack to False. Then, if you later decide to comply with the preemption signal, it is your responsibility to call acknowledge_preemption_signal() manually any time before exiting. Defaults to True.
Note
Currently, only blocking behavior is supported when checking should_preempt(), so it is not performant enough to call on every batch.
- acknowledge_preemption_signal() -> None
acknowledge_preemption_signal() tells the Determined master that you are shutting down, but you have not finished your work and you expect to be restarted later to complete it.
It is important to tell the master explicitly because otherwise, if the Python process exits with a zero exit code, the master interprets that as a completed task, and the task does not get rescheduled.
By default, acknowledge_preemption_signal() is called automatically the first time that should_preempt() returns True, unless should_preempt() is called with auto_ack=False.
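Since should_preempt() blocks, a common pattern is to poll it only every N batches and to checkpoint before exiting. The loop below sketches that control flow with stand-ins: should_preempt and save_checkpoint are plain callables you would wire to core_context.preempt.should_preempt and your own checkpointing code, and check_every=100 is an arbitrary illustrative value, not a recommended setting.

```python
from typing import Callable, Dict, List


def train_with_preemption(
    total_batches: int,
    should_preempt: Callable[[], bool],
    save_checkpoint: Callable[[int], None],
    check_every: int = 100,
) -> int:
    """Train until done or preempted; return the number of batches completed."""
    batches_done = 0
    while batches_done < total_batches:
        batches_done += 1  # stand-in for training one batch
        # should_preempt() blocks, so only poll it periodically.
        if batches_done % check_every == 0 and should_preempt():
            # Save progress so the task can resume after it is rescheduled.
            save_checkpoint(batches_done)
            return batches_done
    return batches_done


saved: List[int] = []
calls: Dict[str, int] = {"n": 0}


def fake_should_preempt() -> bool:
    calls["n"] += 1
    return calls["n"] >= 3  # pretend the signal arrives on the third check


done = train_with_preemption(1000, fake_should_preempt, saved.append, check_every=100)
print(done, saved)  # 300 [300]
```

With the default auto_ack=True, the first True returned by the real should_preempt() already acknowledges the signal, so exiting right after the checkpoint is enough.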
determined.core.TrainContext
- class determined.core.TrainContext(session: Session, trial_id: int, exp_id: int, metrics: _MetricsContext, distributed: DistributedContext, tensorboard_mode: TensorboardMode, tensorboard_manager: Optional[TensorboardManager], tbd_writer: Optional[BatchMetricWriter])
TrainContext gives access to reporting training and validation metrics to the Determined master during trial tasks.
- set_status(status: str) -> None
Report a short user-facing string that the WebUI can render to indicate what a trial is working on.
- set_metadata(metadata: Dict[str, Any]) -> Optional[Dict[str, Any]]
Set the metadata on the current run in the Determined master, overwriting any existing metadata on the current run. Returns the metadata that was set.
The metadata is a dictionary of key-value pairs that can be used for analysis, post-processing, or debugging.
- get_metadata() -> Optional[Dict[str, Any]]
Get the metadata of the current run from the Determined master.
- report_training_metrics(steps_completed: int, metrics: Dict[str, Any], batch_metrics: Optional[List[Dict[str, Any]]] = None) -> None
Report training metrics to the master.
You can include a list of batch_metrics. Batch metrics are not shown in the WebUI but may be accessed from the master using the CLI for post-processing.
- report_validation_metrics(steps_completed: int, metrics: Dict[str, Any]) -> None
Report validation metrics to the master. Note that for hyperparameter search, this is independent of the need to report the searcher metric using SearcherOperation.report_completed() in the Searcher API.
- report_metrics(group: str, steps_completed: int, metrics: Dict[str, Any]) -> None
Report metrics data to the master.
- Parameters:
group (string) – metrics group name. Can be used to partition metrics into different logical groups or time series. “training” and “validation” group names map to built-in training and validation time series. Note: group cannot contain the . character.
steps_completed (int) – global step number, e.g. the number of batches processed.
metrics (Dict[str, Any]) – metrics data dictionary. Must be JSON-serializable. When reporting metrics with the same group and steps_completed values, the dictionary keys must not overlap.
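The constraints on report_metrics() (no "." in group, JSON-serializable metrics, and no overlapping keys for the same group and steps_completed) can be checked locally before anything is sent. MetricsLedger is a hypothetical client-side guard, not part of TrainContext; it uses json.dumps only as a serializability test.

```python
import json
from typing import Any, Dict, Set, Tuple


class MetricsLedger:
    """Hypothetical guard that enforces report_metrics() argument rules locally."""

    def __init__(self) -> None:
        # (group, steps_completed) -> metric keys already reported
        self._seen: Dict[Tuple[str, int], Set[str]] = {}

    def check(self, group: str, steps_completed: int, metrics: Dict[str, Any]) -> None:
        if "." in group:
            raise ValueError(f"group {group!r} must not contain '.'")
        json.dumps(metrics)  # raises TypeError if not JSON-serializable
        key = (group, steps_completed)
        overlap = self._seen.get(key, set()) & set(metrics)
        if overlap:
            raise ValueError(f"keys {sorted(overlap)} already reported for {key}")
        self._seen.setdefault(key, set()).update(metrics)


ledger = MetricsLedger()
ledger.check("training", 100, {"loss": 0.5})
ledger.check("training", 100, {"accuracy": 0.9})  # different keys: allowed
ledger.check("inference", 100, {"loss": 0.4})  # different group: allowed
```

A further call such as ledger.check("training", 100, {"loss": 0.6}) would raise ValueError, matching the rule that keys must not overlap for the same group and steps_completed.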
- get_tensorboard_path() -> Path
Get TensorBoard log directory path.
- upload_tensorboard_files(selector: Callable[[Path], bool] = <function TrainContext.<lambda>>, mangler: Callable[[Path, int], Path] = <function TrainContext.<lambda>>) -> None
Upload files generated for consumption by Tensorboard to checkpoint storage.
- Parameters:
selector – optional function returning True for a file that should be included. If not provided, all files are uploaded.
mangler – optional function modifying the destination file names based on rank.
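upload_tensorboard_files() takes a selector over pathlib.Path objects and a mangler that receives a path and the worker's rank. Here are two illustrative callables with matching signatures: one keeps only TensorBoard event files (matched by the usual "events.out.tfevents" filename prefix, an assumption about how your writer names files), the other appends the rank to the file name so that files from different workers cannot collide. Whether the mangler receives a relative or absolute path is not specified above, so the sketch only changes the final name component.

```python
from pathlib import Path


def only_event_files(path: Path) -> bool:
    """Selector: keep TensorBoard event files, skip everything else."""
    return path.name.startswith("events.out.tfevents")


def add_rank_suffix(path: Path, rank: int) -> Path:
    """Mangler: append the worker's rank to the file name."""
    return path.with_name(f"{path.name}.rank{rank}")


p = Path("logs/events.out.tfevents.1700000000.host")
print(only_event_files(p))  # True
print(add_rank_suffix(p, 3).as_posix())  # logs/events.out.tfevents.1700000000.host.rank3
```

They would be passed as core_context.train.upload_tensorboard_files(selector=only_event_files, mangler=add_rank_suffix).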
- report_early_exit(reason: EarlyExitReason) -> None
Report an early exit reason to the Determined master.
Currently, the only meaningful value to report is EarlyExitReason.INVALID_HP, which is reported automatically if core.Context.__exit__() detects an exception of type det.InvalidHP.
- get_experiment_best_validation() -> Optional[float]
Get the best validation metric reported so far, across the whole experiment.
The returned value is the highest or lowest reported validation metric value, using the searcher.metric field of the experiment config as the key and searcher.smaller_is_better for the comparison.
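The comparison described above can be written out directly: take the minimum when searcher.smaller_is_better is true, otherwise the maximum, and return None before anything has been reported. best_validation is a hypothetical local equivalent over a list of already-reported values, useful for reasoning about the return value; it does not query the master.

```python
from typing import Optional, Sequence


def best_validation(values: Sequence[float], smaller_is_better: bool) -> Optional[float]:
    """Best searcher-metric value so far, or None if nothing was reported yet."""
    if not values:
        return None
    return min(values) if smaller_is_better else max(values)


losses = [0.9, 0.42, 0.57]
print(best_validation(losses, smaller_is_better=True))  # 0.42
print(best_validation(losses, smaller_is_better=False))  # 0.9
print(best_validation([], smaller_is_better=True))  # None
```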
determined.core.SearcherContext
- class determined.core.SearcherContext(session: Session, dist: DistributedContext, trial_id: int, run_id: int, allocation_id: str, units: Optional[Unit] = None)
SearcherContext gives direct access to operations emitted by the search algorithm in the master. Each SearcherOperation emitted has a (unitless) length that you should train for, then you complete the op by reporting the validation metric you are searching over.
It is the user’s responsibility to execute the required training. Because the user configured the length of the searcher in the experiment configuration, the user should know if the unitless length represents epochs, batches, records, etc.
It is also the user’s responsibility to evaluate the model after training and report the correct metric; if you intend to search over a metric called val_accuracy, you should report val_accuracy.
Lastly, it is recommended (not required) to report progress periodically, so that the WebUI can accurately reflect current progress. Progress is another unitless length.
Example:

    # Assuming you configured the searcher in terms of batches,
    # the op.length is also interpreted as a batch count.
    # Note that you'll have to load your starting point from a
    # checkpoint if you want to support pausing/continuing training.
    batches_trained = 0

    for op in core_context.searcher.operations():
        # Train for however long the op requires you to.
        # Note that op.length is an absolute length, not an
        # incremental length:
        while batches_trained < op.length:
            my_train_batch()

            batches_trained += 1

            # Reporting progress every batch would be expensive,
            # so only report every 1000 batches:
            if batches_trained % 1000 == 0:
                op.report_progress(batches_trained)

        # After training the required amount, pass your searcher
        # metric to op.report_completed():
        val_metrics = my_validate()
        op.report_completed(val_metrics["my_searcher_metric"])
Note that reporting metrics with core_context.train.report_training_metrics() or core_context.train.report_validation_metrics() is completely independent of the SearcherContext API.
- operations(searcher_mode: SearcherMode = SearcherMode.WorkersAskChief, auto_ack: bool = True) -> Iterator[SearcherOperation]
Iterate through all the operations this searcher has to offer.
See SearcherMode for details about calling requirements in distributed training scenarios.
After training to the point specified by each SearcherOperation, the chief, and only the chief, must call op.report_completed() on each operation. This is true regardless of the searcher_mode setting because the Determined master needs a clear, unambiguous report of when an operation is completed.
- acknowledge_out_of_ops() -> None
acknowledge_out_of_ops() tells the Determined master that you are shutting down because you have recognized the searcher has no more operations for you to complete at this time.
This is important for the Determined master to know that it is safe to restart this process should new operations be assigned to this trial.
acknowledge_out_of_ops() is normally called automatically just before operations() raises a StopIteration, unless operations() is called with auto_ack=False.
- get_configured_units() -> Optional[Unit]
get_configured_units() reports what units were used in the searcher field of the experiment config. If no units were configured, None is returned.
An experiment configured like this causes get_configured_units() to return EPOCHS:

    searcher:
      name: single
      max_length:
        epochs: 50

An experiment configured like this causes get_configured_units() to return None:

    searcher:
      name: single
      max_length: 50
determined.core.ProfilerContext
- class determined.core.ProfilerContext(agent_id: str, metrics: _MetricsContext, distributed: DistributedContext)
Gives access to the system profiling feature within Determined.
It is responsible for collecting system metrics at specified time intervals and reporting them to the master. When it is turned on, it spawns two background threads that collect profiling metrics and send them to a Determined master; these threads are cleaned up when the core.Context exits or the profiler is turned off.
This class is automatically created when the core.Context is initialized and can be turned on/off as such:

    import determined as det

    with det.core.init() as core_context:
        core_context.profiler.on()
        ...
        core_context.profiler.off()
- on(sampling_interval: int = 1, samples_per_report: int = 10) -> None
Turns system profiling functionality on.
This method creates two threads, one that collects system metrics at specified time intervals, and another that ships them to the master.
These metrics are persisted in the database and can be viewed in the Determined web UI for the associated trial.
Note
This method is idempotent: if profiling is already on, this method is effectively a no-op.
- Parameters:
sampling_interval – time (in seconds) between each metric collection.
samples_per_report – number of samples to collect before aggregating for report.
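From the two parameter descriptions, a report is produced after samples_per_report samples taken sampling_interval seconds apart, so the defaults give roughly one report every 1 × 10 = 10 seconds. The sketch below reproduces that arithmetic and groups a list of samples into report-sized batches. Both functions are hypothetical, and averaging each batch is an assumption made for illustration, since the description above does not say how samples are aggregated.

```python
from statistics import mean
from typing import List


def seconds_per_report(sampling_interval: int = 1, samples_per_report: int = 10) -> int:
    """Approximate time between reports for ProfilerContext.on() arguments."""
    return sampling_interval * samples_per_report


def batch_samples(samples: List[float], samples_per_report: int = 10) -> List[float]:
    """Group samples into full reports and average each (aggregation method assumed)."""
    full = len(samples) - len(samples) % samples_per_report  # drop an incomplete tail
    return [
        mean(samples[i : i + samples_per_report]) for i in range(0, full, samples_per_report)
    ]


print(seconds_per_report())  # 10
print(seconds_per_report(5, 12))  # 60
print(batch_samples([10.0, 20.0, 30.0, 40.0, 50.0], samples_per_report=2))  # [15.0, 35.0]
```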
- off() -> None
Turns off profiling.
Sets the internal state of this class and stops any threads that were created.
Note
This method is idempotent: if profiling is already off, this method is effectively a no-op.
determined.core.SearcherOperation
- class determined.core.SearcherOperation(session: Session, trial_id: int, length: int, is_chief: bool)
A SearcherOperation is a request from the hyperparameter-search logic for the training script to execute one train-validate-report cycle.
Some searchers, such as single, random, or grid, pass only a single SearcherOperation to each trial, while others may pass many SearcherOperations.
Each SearcherOperation has a length attribute representing the cumulative training that should be completed before the validate-report steps of the cycle. The length attribute is absolute, not incremental, meaning that if the searcher wants you to train for 10 units and validate, then train for 10 more units and validate, it emits one SearcherOperation with .length=10 followed by a second SearcherOperation with .length=20. Using absolute lengths instead of incremental lengths makes restarting after crashes simple and robust.
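Because op.length is absolute, the amount of new training each operation requires depends on how far the trial has already progressed, which is what makes resuming simple. remaining_work is a hypothetical helper, not part of the Determined API; it converts a sequence of absolute lengths into per-operation increments given a starting point, such as a value restored from a checkpoint.

```python
from typing import List


def remaining_work(op_lengths: List[int], already_trained: int = 0) -> List[int]:
    """Units to train before each op's validate-report step (absolute lengths)."""
    increments = []
    trained = already_trained
    for length in op_lengths:
        step = max(0, length - trained)  # nothing to do if already past this length
        increments.append(step)
        trained += step
    return increments


print(remaining_work([10, 20]))  # [10, 10]
print(remaining_work([10, 20], already_trained=15))  # [0, 5]
```

The second call models a trial that crashed after 15 units and resumed from a checkpoint: the first operation is already satisfied, and the second needs only 5 more units.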
determined.core.PreemptMode
- class determined.core.PreemptMode(value)
PreemptMode defines the calling behavior of the PreemptContext.should_preempt() call.
When mode is WorkersAskChief (the default), all workers must call should_preempt() in step. Only the chief actually communicates with the master, then the chief broadcasts its decision to all workers. This guarantees that all workers decide to preempt at the exact same time.
When mode is ChiefOnly, only the chief is allowed to call PreemptContext.should_preempt(). Usually this implies you must manually inform the workers if they should preempt or not.
When mode is WorkersAskMaster, each worker contacts the master independently in order to decide to preempt or not. Each worker receives the preemption signal at roughly the same time, but it becomes your responsibility to tolerate situations where some workers have exited due to preemption and others have not.
determined.core.SearcherMode
- class determined.core.SearcherMode(value)
SearcherMode defines the calling behavior of the SearcherContext.operations() call.
When mode is WorkersAskChief (the default), all workers must call SearcherContext.operations() in step with each other. The chief iterates through searcher operations from the master and then propagates the operations to each worker, introducing a synchronization point between workers.
When mode is ChiefOnly, only the chief may call SearcherContext.operations(). Usually this implies you must manually inform the workers of what work to do next.
determined.core.TensorboardMode
- class determined.core.TensorboardMode(value)
TensorboardMode defines how Tensorboard artifacts are handled.
In AUTO mode the chief automatically writes any reported training or validation metrics to the Tensorboard path (see TrainContext.get_tensorboard_path()), and automatically uploads all of its own tensorboard artifacts to checkpoint storage. Tensorboard artifacts written by non-chief workers will not be uploaded at all. This is the same behavior that existed prior to 0.18.3.
In MANUAL mode no Tensorboard artifacts are written or uploaded at all. It is entirely up to the user to write their desired metrics and upload them with calls to TrainContext.upload_tensorboard_files().
determined.TrialInfo
- class determined.TrialInfo(trial_id: int, experiment_id: int, trial_seed: int, hparams: Dict[str, Any], config: Dict[str, Any], steps_completed: int, trial_run_id: int, debug: bool, inter_node_network_interface: Optional[str])
- experiment_id
The Experiment ID for the current task.
- hparams
The hyperparameter values selected for the current Trial.
- trial_id
The Trial ID for the current task.
- trial_seed
The random seed for the current Trial.