Estimator API Reference¶
User Guide |
---|
determined.estimator.EstimatorTrial
¶
- class determined.estimator.EstimatorTrial(context: determined.estimator._estimator_context.EstimatorTrialContext)¶
By default, experiments run with TensorFlow 1.x. To configure your trial to use TensorFlow 2.x, set a TF 2.x image in the experiment configuration (e.g.
determinedai/environments:cuda-11.3-pytorch-1.10-tf-2.8-gpu-0.19.10
).EstimatorTrial
supports TF 2.x; however it uses TensorFlow V1 behavior. We have disabled TensorFlow V2 behavior forEstimatorTrial
, so there is no need for you to disable it.- trial_context_class¶
alias of
determined.estimator._estimator_context.EstimatorTrialContext
- __init__(context: determined.estimator._estimator_context.EstimatorTrialContext)¶
Initializes a trial using the provided
context
.This method should typically be overridden by trial definitions: at minimum, it is important to store
context
as an instance variable so that it can be accessed by other methods of the trial class. This can also be a convenient place to initialize other state that is shared between the estimator, train spec, and/or validation spec.
- abstract build_estimator() tensorflow_estimator.python.estimator.estimator.EstimatorV2 ¶
Specifies the tf.estimator.Estimator instance to be used during training and validation. This may be an instance of a Premade Estimator provided by the TensorFlow team, or a Custom Estimator created by the user.
- abstract build_train_spec() tensorflow_estimator.python.estimator.training.TrainSpec ¶
Specifies the tf.estimator.TrainSpec to be used for training steps. This training specification will contain a TensorFlow
input_fn
which constructs the input data for a training step. Unlike the standard TensorFlowinput_fn
interface,EstimatorTrial
only supports aninput_fn
that returns atf.data.Dataset
object. A function that returns a tuple of features and labels is currently not supported byEstimatorTrial
. Additionally, themax_steps
attribute of the training specification will be ignored; instead, thescheduling_unit
option in the experiment configuration is used to determine how many batches each training workload uses.
- abstract build_validation_spec() tensorflow_estimator.python.estimator.training.EvalSpec ¶
Specifies the tf.estimator.EvalSpec to be used for validation steps. This evaluation spec will contain a TensorFlow
input_fn
which constructs the input data for a validation step. The validation step will evaluatesteps
batches, or evaluate until theinput_fn
raises an end-of-input exception ifsteps
isNone
.
- build_serving_input_receiver_fns() Dict[str, Callable[[...], Union[tensorflow_estimator.python.estimator.export.export.ServingInputReceiver, tensorflow_estimator.python.estimator.export.export.TensorServingInputReceiver]]] ¶
Optionally returns a Python dictionary mapping string names to serving_input_receiver_fn s. If specified, each serving input receiver function will be used to export a distinct SavedModel inference graph when a Determined checkpoint is saved, using Estimator.export_saved_model. The exported models are saved under subdirectories named by the keys of the respective serving input receiver functions. For example, returning
{ "raw": tf.estimator.export.build_raw_serving_input_receiver_fn(...), "parsing": tf.estimator.export.build_parsing_serving_input_receiver_fn(...) }
from this function would configure Determined to export two
SavedModel
inference graphs in every checkpoint underraw
andparsing
subdirectories, respectively. By default, this function returns an empty dictionary and the Determined checkpoint directory only contains metadata associated with the training graph.
determined.estimator.EstimatorTrialContext
¶
- class determined.estimator.EstimatorTrialContext(*arg: Any, **kwarg: Any)¶
Bases:
determined._trial_context.TrialContext
,determined.estimator._reducer._EstimatorReducerContext
Base context class that contains runtime information for any Determined workflow that uses the
tf.estimator
API.EstimatorTrialContext always has a
DistributedContext
accessible viacontext.distributed
for information related to distributed training.EstimatorTrialContext always has a
EstimatorExperimentalContext
accessible viacontext.experimental
for information related to experimental features.- get_global_batch_size() int ¶
Return the global batch size.
- get_per_slot_batch_size() int ¶
Return the per-slot batch size. When a model is trained with a single GPU, this is equal to the global batch size. When multi-GPU training is used, this is equal to the global batch size divided by the number of GPUs used to train the model.
- wrap_optimizer(optimizer: Any) Any ¶
This should be used to wrap optimizer objects immediately after they have been created. Users should use the output of this wrapper as the new instance of their optimizer. For example, if users create their optimizer within
build_estimator()
, they should calloptimizer = wrap_optimizer(optimizer)
prior to passing the optimizer into their Estimator.
- wrap_dataset(dataset: Any, shard_dataset: bool = True) Any ¶
This should be used to wrap
tf.data.Dataset
objects immediately after they have been created. Users should use the output of this wrapper as the new instance of their dataset. If users create multiple datasets (e.g., one for training and one for testing), users should wrap each dataset independently. E.g., If users instantiate their training dataset withinbuild_train_spec()
, they should calldataset = wrap_dataset(dataset)
prior to passing it intotf.estimator.TrainSpec
.- Parameters
dataset – tf.data.Dataset
shard_dataset – When performing multi-slot (distributed) training, this controls whether the dataset is sharded so that each training process (one per slot) sees unique data. If set to False, users must manually configure each process to use unique data.
- classmethod from_config(config: Dict[str, Any]) determined._trial_context.TrialContext ¶
Create a context object suitable for debugging outside of Determined.
An example for a subclass of
PyTorchTrial
:config = { ... } context = det.pytorch.PyTorchTrialContext.from_config(config) my_trial = MyPyTorchTrial(context) train_ds = my_trial.build_training_data_loader() for epoch_idx in range(3): for batch_idx, batch in enumerate(train_ds): metrics = my_trial.train_batch(batch, epoch_idx, batch_idx) ...
An example for a subclass of
TFKerasTrial
:config = { ... } context = det.keras.TFKerasTrialContext.from_config(config) my_trial = tf_keras_one_var_model.OneVarTrial(context) model = my_trial.build_model() model.fit(my_trial.build_training_data_loader()) eval_metrics = model.evaluate(my_trial.build_validation_data_loader())
- Parameters
config – An experiment config file, in dictionary form.
- get_data_config() Dict[str, Any] ¶
Return the data configuration.
- get_experiment_config() Dict[str, Any] ¶
Return the experiment configuration.
- get_experiment_id() int ¶
Return the experiment ID of the current trial.
- get_hparam(name: str) Any ¶
Return the current value of the hyperparameter with the given name.
- get_hparams() Dict[str, Any] ¶
Return a dictionary of hyperparameter names to values.
- get_stop_requested() bool ¶
Return whether a trial stoppage has been requested.
- get_tensorboard_path() pathlib.Path ¶
Get the path where files for consumption by TensorBoard should be written
- get_trial_id() int ¶
Return the trial ID of the current trial.
- make_metric(metric: Any, reducer: Union[Callable[[List[Any]], Any], determined.estimator._reducer.MetricReducer], numpy_dtype: Any) Tuple[tensorflow.python.framework.ops.Operation, tensorflow.python.framework.ops.Operation] ¶
Return an estimator-compatible validation metric which will be calculated properly, even during distributed evaluation.
During distributed evaluation, many types of metrics calculated via
tf.metrics
ortf.keras.metrics
cannot be aggregated properly from the per-slot final metrics calculated by each separate Estimator replica. One example istf.metrics.auc
, where the ROC AUC calculated over predictions and labels from a full dataset cannot be derived from the individual ROC AUC metrics evaluated over several shards of a dataset.Determined solves this problem by offering customizable metrics which are Estimator-compatible. For example, ROC AUC could be properly calculated during distributed evaluation by calling
sklearn.metrics.roc_auc_score
in a customreducer
function passed tomake_metric
.The
metric
input can be a tensor, a list of tensors, or a dictionary of tensors. Nested structures are not supported.The
reducer
should be either a single function that can calculate the metric from a list of the per-batch values ofmetric
, or it can be an instance of adet.estimator.MetricReducer
.The
numpy_dtype
must be a numpy dtype. It is used internally to determine the output type of the TensorFlowpy_func
to report the final metric result to the Estimator API. The format ofnumpy_dtype
should be anything thatnp.dtype()
accepts.The primary motivation for passing a function as the reducer is simplicity. Metrics from all batches will be buffered in memory and passed over the network where they will be reduced all at once. This introduces some overhead, but it is likely unnoticeable for scalar metrics or on validation datasets of small or medium size. This single function strategy may also be desirable for quick prototyping or for calculating metrics that are difficult or impossible to calculate incrementally.
The primary motivation for passing a
det.estimator.MetricsReducer
as the reducer is performance.det.estimator.MetricsReducer
allows the user to incrementally calculate the partial metric on each slot, taking advantage of distributed computation, minimizing memory usage, and minimizing the network communication before the finalcross_slot_reduce
operation.Evaluation performance may be improved by precomputing as much as possible in the graph so that less computation on the
metric
value is required within the reducer.Example usage where
reducer
is a function:def my_mean_reducer(all_batch_metrics): # Use hstack in case not all batches are equal length. return np.mean(np.hstack(all_batch_metrics)) def my_estimator_model_function(features, labels, mode): ... if mode == tf.estimator.ModeKeys.EVAL: my_avg_prediction = context.make_metric( metric=predictions, reducer=my_mean_reducer, numpy_dtype=np.float32 ) return tf.estimator.EstimatorSpec( mode, loss=loss, eval_metric_ops={"my_avg_prediction": my_avg_prediction}, )
- set_stop_requested(stop_requested: bool) None ¶
Set a flag to request a trial stoppage. When this flag is set to True, we finish the step, checkpoint, then exit.
determined.estimator.EstimatorTrialContext.distributed
¶
- class determined.core._distributed.DistributedContext(*, rank: int, size: int, local_rank: int, local_size: int, cross_rank: int, cross_size: int, chief_ip: Optional[str] = None, pub_port: int = 12360, pull_port: int = 12376, port_offset: int = 0, force_tcp: bool = False)
DistributedContext provides useful methods for effective distributed training.
- A DistributedContext has the following required args:
rank: the index of this worker in the entire job
size: the number of workers in the entire job
local_rank: the index of this worker on this machine
local_size: the number of workers on this machine
cross_rank: the index of this machine in the entire job
cross_size: the number of machines in the entire job
- Additionally, any time that cross_size > 1, you must also provide:
chief_ip: the ip address to reach the chief worker (where rank==0)
Note
DistributedContext has
.allgather()
,.gather()
, and.broadcast()
methods, which are easy to use and which can be useful for coordinating work across workers, but it is not a replacement for the allgather/gather/broadcast operations in your particular distributed training framework.- classmethod from_horovod(hvd: Any, chief_ip: Optional[str] = None) determined.core._distributed.DistributedContext
Create a
DistributedContext
using the providedhvd
module to determine rank information.Example:
import horovod.torch as hvd hvd.init() distributed = DistributedContext.from_horovod(hvd)
The IP address for the chief worker is required whenever
hvd.cross_size() > 1
. The value may be provided using thechief_ip
argument or theDET_CHIEF_IP
environment variable.
- classmethod from_deepspeed(chief_ip: Optional[str] = None) determined.core._distributed.DistributedContext
Create a
DistributedContext
using the standard deepspeed environment variables to determine rank information.The IP address for the chief worker is required whenever CROSS_SIZE > 1. The value may be provided using the chief_ip argument or the DET_CHIEF_IP environment variable.
- classmethod from_torch_distributed(chief_ip: Optional[str] = None) determined.core._distributed.DistributedContext
Create a DistributedContext using the standard torch distributed environment variables to determine rank information.
The IP address for the chief worker is required whenever CROSS_SIZE > 1. The value may be provided via the chief_ip argument or the DET_CHIEF_IP environment variable.
- get_rank() int
Return the rank of the process in the trial. The rank of a process is a unique ID within the trial. That is, no two processes in the same trial are assigned the same rank.
- get_local_rank() int
Return the rank of the process on the agent. The local rank of a process is a unique ID within a given agent and trial; that is, no two processes in the same trial that are executing on the same agent are assigned the same rank.
- get_size() int
Return the number of slots this trial is running on.
- get_num_agents() int
Return the number of agents this trial is running on.
- gather(stuff: Any) Optional[List]
Gather
stuff
to the chief. The chief returns a list of all stuff, and workers returnNone
.gather()
is not a replacement for the gather functionality of your distributed training framework.
- gather_local(stuff: Any) Optional[List]
Gather
stuff
to the local chief. The local chief returns a list of all stuff, and local workers returnNone
.gather_local()
is not a replacement for the gather functionality of your distributed training framework.
- allgather(stuff: Any) List
Gather
stuff
to the chief and broadcast all of it back to the workers.allgather()
is not a replacement for the allgather functionality of your distributed training framework.
- allgather_local(stuff: Any) List
Gather
stuff
to the local chief and broadcast all of it back to the local workers.allgather_local()
is not a replacement for the allgather functionality of your distributed training framework.
- broadcast(stuff: Any) Any
Every worker gets the
stuff
sent by the chief.broadcast()
is not a replacement for the broadcast functionality of your distributed training framework.
- broadcast_local(stuff: Optional[Any] = None) Any
Every worker gets the
stuff
sent by the local chief.broadcast_local()
is not a replacement for the broadcast functionality of your distributed training framework.
determined.estimator.EstimatorExperimentalContext
¶
- class determined.estimator.EstimatorExperimentalContext¶
Context class that contains experimental runtime information and features for any Determined workflow that uses the
tf.estimator
API.EstimatorExperimentalContext
extendsEstimatorTrialContext
under thecontext.experimental
namespace.
determined.estimator.MetricReducer
¶
- class determined.estimator.MetricReducer¶
Efficiently aggregating validation metrics across a multi-slot distributed evaluation is done in two steps:
Accumulate metrics from each batch on each slot. In the case of calculating a mean, this might mean keeping a running sum and a count of metrics received.
Reduce metrics from each slot to calculate the final metric. In the case of calculating a mean, this might mean adding up the per-slot sums and dividing the result by the per-slot counts.
Example implementation and usage:
class MyAvgMetricReducer(estimator.MetricReducer): def __init__(self): self.sum = 0 self.counts = 0 def accumulate(self, metric): self.sum += sum(metric) self.counts += 1 return self.sum, self.counts def cross_slot_reduce(self, per_slot_metrics): # per_slot_metrics is a list of (sum, counts) tuples # returned by the final self.accumulate() on each slot sums, counts = zip(*per_slot_metrics) return sum(sums) / sum(counts) def my_estimator_model_function(features, labels, mode): ... if mode == tf.estimator.ModeKeys.EVAL: my_avg_prediction = context.make_metric( metric=predictions, reducer=MyAvgMetricReducer(), numpy_dtype=np.float32 ) return tf.estimator.EstimatorSpec( mode, loss=loss, eval_metric_ops={"my_avg_prediction": my_avg_prediction}, )
See also:
context.make_metric()
.- abstract accumulate(metric: Any) Any ¶
accumulate is called for each batch in the evaluation dataset. Batches will be distributed across slots, so accumulate will be called many times on each slot.
accumulate should return the accumulated state. After evaluation is complete, the final return value of accumulate will become an element of the per_slot_metrics argument to cross_slot_reduce.
In the example of the calculating a distributed mean, accumulate might keep a running sum and a count of metrics received:
def accumulate(self, metric): self.sum += metric self.count += 1 return self.sum, self.count
- abstract cross_slot_reduce(per_slot_metrics: List[Any]) Any ¶
cross_slot_reduce is called on the list of results from the final call to accumulate on each slot. per_slot_metrics will be a list of length N, where N is the number of slots in the trial (or 1 in non-distributed training). cross_slot_reduce must return the final metric.
In the example of calculating a distributed mean, cross_slot_reduce might receive a list of (sum, count) tuples and it would calculate the overall mean.
def cross_slot_reduce(self, per_slot_metrics): sums, counts = zip(*per_slot_metrics) return np.array(sum(sums) / sum(counts))
determined.estimator.RunHook
¶
- class determined.estimator.RunHook¶
Abstract base class which extends SessionRunHook and is used to define callbacks that should execute during the lifetime of a EstimatorTrial.
Hooks should be passed in to Train Spec.
- on_checkpoint_end(checkpoint_dir: str) None ¶
Run after every checkpoint.
Warning
If distributed or parallel training is enabled, this callback is executed only on the chief GPU (rank = 0) which performs the checkpoint.
- on_checkpoint_load(checkpoint_dir: str) None ¶
Run at startup when the task environment starts up. If not resuming from checkpoint this is never called.
- on_trial_close() None ¶
Run when the trial close. This is the place users should execute post-trial cleanup.
determined.estimator.load_estimator_from_checkpoint_path
¶
- class determined.estimator.load_estimator_from_checkpoint_path(path: str, tags: Optional[List[str]] = None)¶
Loads a checkpoint written by an EstimatorTrial.
You should have already downloaded the checkpoint files, likely with
Checkpoint.download()
.The return type is a TensorFlow AutoTrackable object.
- Parameters
path (string) – Top level directory to load the checkpoint from.
tags (list string, optional) – Specifies which tags are loaded from the TensorFlow SavedModel. See documentation for tf.compat.v1.saved_model.load_v2.