Estimator API Reference

User Guide: Estimator API

determined.estimator.EstimatorTrial

class determined.estimator.EstimatorTrial(context: determined.estimator._estimator_context.EstimatorTrialContext)

By default, experiments run with TensorFlow 1.x. To configure your trial to use TensorFlow 2.x, set a TF 2.x image in the experiment configuration (e.g. determinedai/environments:cuda-11.3-pytorch-1.10-tf-2.8-gpu-0.19.10).

EstimatorTrial supports TF 2.x; however, it uses TensorFlow V1 behavior. Determined has already disabled TensorFlow V2 behavior for EstimatorTrial, so there is no need for you to disable it.

trial_context_class

alias of determined.estimator._estimator_context.EstimatorTrialContext

__init__(context: determined.estimator._estimator_context.EstimatorTrialContext)

Initializes a trial using the provided context.

This method should typically be overridden by trial definitions: at minimum, it is important to store context as an instance variable so that it can be accessed by other methods of the trial class. This can also be a convenient place to initialize other state that is shared between the estimator, train spec, and/or validation spec.

abstract build_estimator() tensorflow_estimator.python.estimator.estimator.EstimatorV2

Specifies the tf.estimator.Estimator instance to be used during training and validation. This may be an instance of a Premade Estimator provided by the TensorFlow team, or a Custom Estimator created by the user.

abstract build_train_spec() tensorflow_estimator.python.estimator.training.TrainSpec

Specifies the tf.estimator.TrainSpec to be used for training steps. This training specification will contain a TensorFlow input_fn which constructs the input data for a training step. Unlike the standard TensorFlow input_fn interface, EstimatorTrial only supports an input_fn that returns a tf.data.Dataset object. A function that returns a tuple of features and labels is currently not supported by EstimatorTrial. Additionally, the max_steps attribute of the training specification will be ignored; instead, the scheduling_unit option in the experiment configuration is used to determine how many batches each training workload uses.

abstract build_validation_spec() tensorflow_estimator.python.estimator.training.EvalSpec

Specifies the tf.estimator.EvalSpec to be used for validation steps. This evaluation spec will contain a TensorFlow input_fn which constructs the input data for a validation step. The validation step will evaluate steps batches, or evaluate until the input_fn raises an end-of-input exception if steps is None.

build_serving_input_receiver_fns() Dict[str, Callable[[...], Union[tensorflow_estimator.python.estimator.export.export.ServingInputReceiver, tensorflow_estimator.python.estimator.export.export.TensorServingInputReceiver]]]

Optionally returns a Python dictionary mapping string names to serving_input_receiver_fn functions. If specified, each serving input receiver function will be used to export a distinct SavedModel inference graph when a Determined checkpoint is saved, using Estimator.export_saved_model. The exported models are saved under subdirectories named by the keys of the respective serving input receiver functions. For example, returning

{
    "raw": tf.estimator.export.build_raw_serving_input_receiver_fn(...),
    "parsing": tf.estimator.export.build_parsing_serving_input_receiver_fn(...)
}

from this function would configure Determined to export two SavedModel inference graphs in every checkpoint under raw and parsing subdirectories, respectively. By default, this function returns an empty dictionary and the Determined checkpoint directory only contains metadata associated with the training graph.

determined.estimator.EstimatorTrialContext

class determined.estimator.EstimatorTrialContext(*arg: Any, **kwarg: Any)

Bases: determined._trial_context.TrialContext, determined.estimator._reducer._EstimatorReducerContext

Base context class that contains runtime information for any Determined workflow that uses the tf.estimator API.

EstimatorTrialContext always has a DistributedContext accessible via context.distributed for information related to distributed training.

EstimatorTrialContext always has an EstimatorExperimentalContext accessible via context.experimental for information related to experimental features.

get_global_batch_size() int

Return the global batch size.

get_per_slot_batch_size() int

Return the per-slot batch size. When a model is trained with a single GPU, this is equal to the global batch size. When multi-GPU training is used, this is equal to the global batch size divided by the number of GPUs used to train the model.
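As a quick illustration of the arithmetic described above, the following sketch computes a per-slot batch size from a global batch size and a slot count. The helper name per_slot_batch_size is hypothetical and not part of Determined, and the sketch assumes the global batch size divides evenly across slots.

```python
def per_slot_batch_size(global_batch_size: int, num_slots: int) -> int:
    """Hypothetical helper: split a global batch evenly across slots."""
    if num_slots < 1:
        raise ValueError("num_slots must be at least 1")
    if global_batch_size % num_slots != 0:
        # Assumption for this sketch: only even splits are handled.
        raise ValueError("global_batch_size must be divisible by num_slots")
    return global_batch_size // num_slots


# Single-GPU training: the per-slot size equals the global size.
single_gpu = per_slot_batch_size(64, 1)

# Eight-GPU training: each slot processes one eighth of the global batch.
eight_gpus = per_slot_batch_size(64, 8)
```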

wrap_optimizer(optimizer: Any) Any

This should be used to wrap optimizer objects immediately after they have been created. Users should use the output of this wrapper as the new instance of their optimizer. For example, if users create their optimizer within build_estimator(), they should call optimizer = wrap_optimizer(optimizer) prior to passing the optimizer into their Estimator.

wrap_dataset(dataset: Any, shard_dataset: bool = True) Any

This should be used to wrap tf.data.Dataset objects immediately after they have been created. Users should use the output of this wrapper as the new instance of their dataset. If users create multiple datasets (e.g., one for training and one for testing), they should wrap each dataset independently. For example, if users instantiate their training dataset within build_train_spec(), they should call dataset = wrap_dataset(dataset) prior to passing it into tf.estimator.TrainSpec.

Parameters
  • dataset – tf.data.Dataset

  • shard_dataset – When performing multi-slot (distributed) training, this controls whether the dataset is sharded so that each training process (one per slot) sees unique data. If set to False, users must manually configure each process to use unique data.
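To make the idea of sharding concrete, here is a conceptual sketch in plain Python. It does not call Determined or TensorFlow. The strided scheme (each process keeps every N-th record) is an assumption chosen for illustration, in the spirit of tf.data.Dataset.shard; the helper name shard is hypothetical, and nothing here describes how wrap_dataset is implemented.

```python
from typing import List, Sequence


def shard(records: Sequence[int], num_shards: int, index: int) -> List[int]:
    """Keep every num_shards-th record, starting at position `index`."""
    return [r for i, r in enumerate(records) if i % num_shards == index]


records = list(range(10))
num_slots = 4

# One shard per training process (one process per slot).
shards = [shard(records, num_slots, rank) for rank in range(num_slots)]

# Every record lands in exactly one shard, so no two processes see the same data.
all_seen = sorted(r for s in shards for r in s)
```

When shard_dataset is set to False, the user is responsible for arranging an equivalent split so that each process reads unique data.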

classmethod from_config(config: Dict[str, Any]) determined._trial_context.TrialContext

Create a context object suitable for debugging outside of Determined.

An example for a subclass of PyTorchTrial:

config = { ... }
context = det.pytorch.PyTorchTrialContext.from_config(config)
my_trial = MyPyTorchTrial(context)

train_ds = my_trial.build_training_data_loader()
for epoch_idx in range(3):
    for batch_idx, batch in enumerate(train_ds):
        metrics = my_trial.train_batch(batch, epoch_idx, batch_idx)
        ...

An example for a subclass of TFKerasTrial:

config = { ... }
context = det.keras.TFKerasTrialContext.from_config(config)
my_trial = tf_keras_one_var_model.OneVarTrial(context)

model = my_trial.build_model()
model.fit(my_trial.build_training_data_loader())
eval_metrics = model.evaluate(my_trial.build_validation_data_loader())

Parameters
  • config – An experiment config file, in dictionary form.

get_data_config() Dict[str, Any]

Return the data configuration.

get_experiment_config() Dict[str, Any]

Return the experiment configuration.

get_experiment_id() int

Return the experiment ID of the current trial.

get_hparam(name: str) Any

Return the current value of the hyperparameter with the given name.

get_hparams() Dict[str, Any]

Return a dictionary of hyperparameter names to values.

get_stop_requested() bool

Return whether a trial stoppage has been requested.

get_tensorboard_path() pathlib.Path

Get the path where files for consumption by TensorBoard should be written.

get_trial_id() int

Return the trial ID of the current trial.

make_metric(metric: Any, reducer: Union[Callable[[List[Any]], Any], determined.estimator._reducer.MetricReducer], numpy_dtype: Any) Tuple[tensorflow.python.framework.ops.Operation, tensorflow.python.framework.ops.Operation]

Return an estimator-compatible validation metric which will be calculated properly, even during distributed evaluation.

During distributed evaluation, many types of metrics calculated via tf.metrics or tf.keras.metrics cannot be aggregated properly from the per-slot final metrics calculated by each separate Estimator replica. One example is tf.metrics.auc, where the ROC AUC calculated over predictions and labels from a full dataset cannot be derived from the individual ROC AUC metrics evaluated over several shards of a dataset.

Determined solves this problem by offering customizable metrics which are Estimator-compatible. For example, ROC AUC could be properly calculated during distributed evaluation by calling sklearn.metrics.roc_auc_score in a custom reducer function passed to make_metric.
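The aggregation problem is easiest to see with a simpler metric than ROC AUC. The sketch below swaps in a plain mean (so that no third-party library is needed) and uses made-up per-slot values; the name pooled_mean_reducer is hypothetical. It shows that averaging the per-slot results gives a different answer from reducing over all of the values at once when slots hold unequal amounts of data.

```python
from statistics import mean

# Hypothetical per-slot values; the slots hold unequal amounts of data.
slot_values = [
    [1.0, 2.0, 3.0, 4.0],  # slot 0
    [10.0],                # slot 1
]

# Naive aggregation: compute the metric on each slot, then average the results.
naive = mean(mean(values) for values in slot_values)


def pooled_mean_reducer(per_slot_values):
    """Pool every value from every slot, then compute the metric once."""
    pooled = [v for values in per_slot_values for v in values]
    return sum(pooled) / len(pooled)


# Custom reduction over the full set of values.
exact = pooled_mean_reducer(slot_values)
```

Here naive is 6.25 while exact is 4.0, which is the kind of discrepancy a custom reducer is meant to avoid.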

The metric input can be a tensor, a list of tensors, or a dictionary of tensors. Nested structures are not supported.

The reducer should be either a single function that can calculate the metric from a list of the per-batch values of metric, or it can be an instance of a det.estimator.MetricReducer.

The numpy_dtype must be a numpy dtype. It is used internally to determine the output type of the TensorFlow py_func to report the final metric result to the Estimator API. The format of numpy_dtype should be anything that np.dtype() accepts.

The primary motivation for passing a function as the reducer is simplicity. Metrics from all batches will be buffered in memory and passed over the network where they will be reduced all at once. This introduces some overhead, but it is likely unnoticeable for scalar metrics or on validation datasets of small or medium size. This single function strategy may also be desirable for quick prototyping or for calculating metrics that are difficult or impossible to calculate incrementally.

The primary motivation for passing a det.estimator.MetricReducer as the reducer is performance. det.estimator.MetricReducer allows the user to incrementally calculate the partial metric on each slot, taking advantage of distributed computation, minimizing memory usage, and minimizing the network communication before the final cross_slot_reduce operation.

Evaluation performance may be improved by precomputing as much as possible in the graph so that less computation on the metric value is required within the reducer.

Example usage where reducer is a function:

import numpy as np
import tensorflow as tf

def my_mean_reducer(all_batch_metrics):
    # Use hstack in case not all batches are equal length.
    return np.mean(np.hstack(all_batch_metrics))

def my_estimator_model_function(features, labels, mode):
    ...
    if mode == tf.estimator.ModeKeys.EVAL:

        my_avg_prediction = context.make_metric(
             metric=predictions, reducer=my_mean_reducer, numpy_dtype=np.float32
        )

        return tf.estimator.EstimatorSpec(
            mode,
            loss=loss,
            eval_metric_ops={"my_avg_prediction": my_avg_prediction},
        )

set_stop_requested(stop_requested: bool) None

Set a flag to request a trial stoppage. When this flag is set to True, we finish the step, checkpoint, then exit.

determined.estimator.EstimatorTrialContext.distributed

class determined.core._distributed.DistributedContext(*, rank: int, size: int, local_rank: int, local_size: int, cross_rank: int, cross_size: int, chief_ip: Optional[str] = None, pub_port: int = 12360, pull_port: int = 12376, port_offset: int = 0, force_tcp: bool = False)

DistributedContext provides useful methods for effective distributed training.

A DistributedContext has the following required args:
  • rank: the index of this worker in the entire job

  • size: the number of workers in the entire job

  • local_rank: the index of this worker on this machine

  • local_size: the number of workers on this machine

  • cross_rank: the index of this machine in the entire job

  • cross_size: the number of machines in the entire job

Additionally, any time that cross_size > 1, you must also provide:
  • chief_ip: the IP address to reach the chief worker (where rank==0)
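The relationship between the six required values can be sketched by enumerating every worker in a small job. This is an illustration only: the helper enumerate_workers is hypothetical, and it assumes that every machine runs the same number of workers and that ranks are assigned contiguously, machine by machine. Neither assumption is stated by the reference above.

```python
from typing import Dict, List


def enumerate_workers(cross_size: int, local_size: int) -> List[Dict[str, int]]:
    """List the six required DistributedContext values for every worker.

    Assumption for this sketch: each machine runs `local_size` workers and
    ranks are laid out contiguously, one machine after another.
    """
    workers = []
    for cross_rank in range(cross_size):
        for local_rank in range(local_size):
            workers.append(
                {
                    "rank": cross_rank * local_size + local_rank,
                    "size": cross_size * local_size,
                    "local_rank": local_rank,
                    "local_size": local_size,
                    "cross_rank": cross_rank,
                    "cross_size": cross_size,
                }
            )
    return workers


# Two machines with four workers each.
workers = enumerate_workers(cross_size=2, local_size=4)

# The third worker on the second machine.
example = workers[6]
```

Because cross_size is greater than 1 in this layout, a chief_ip would also have to be supplied.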

Note

DistributedContext has .allgather(), .gather(), and .broadcast() methods, which are easy to use and which can be useful for coordinating work across workers, but it is not a replacement for the allgather/gather/broadcast operations in your particular distributed training framework.

classmethod from_horovod(hvd: Any, chief_ip: Optional[str] = None) determined.core._distributed.DistributedContext

Create a DistributedContext using the provided hvd module to determine rank information.

Example:

import horovod.torch as hvd
hvd.init()
distributed = DistributedContext.from_horovod(hvd)

The IP address for the chief worker is required whenever hvd.cross_size() > 1. The value may be provided using the chief_ip argument or the DET_CHIEF_IP environment variable.

classmethod from_deepspeed(chief_ip: Optional[str] = None) determined.core._distributed.DistributedContext

Create a DistributedContext using the standard deepspeed environment variables to determine rank information.

The IP address for the chief worker is required whenever CROSS_SIZE > 1. The value may be provided using the chief_ip argument or the DET_CHIEF_IP environment variable.

classmethod from_torch_distributed(chief_ip: Optional[str] = None) determined.core._distributed.DistributedContext

Create a DistributedContext using the standard torch distributed environment variables to determine rank information.

The IP address for the chief worker is required whenever CROSS_SIZE > 1. The value may be provided via the chief_ip argument or the DET_CHIEF_IP environment variable.
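The rule shared by the three constructors above can be sketched as a small lookup. The helper resolve_chief_ip is hypothetical and is not Determined's implementation; in particular, giving the explicit argument precedence over DET_CHIEF_IP is an assumption, since the text only says that either source may be used.

```python
import os
from typing import Mapping, Optional


def resolve_chief_ip(
    cross_size: int,
    chief_ip: Optional[str] = None,
    environ: Optional[Mapping[str, str]] = None,
) -> Optional[str]:
    """Hypothetical helper mirroring the chief IP rule described above."""
    env = os.environ if environ is None else environ
    # Assumption: an explicit argument wins over the environment variable.
    resolved = chief_ip if chief_ip is not None else env.get("DET_CHIEF_IP")
    if cross_size > 1 and not resolved:
        raise ValueError("chief_ip or DET_CHIEF_IP is required when cross_size > 1")
    return resolved


# Single-machine job: no chief IP is needed.
single = resolve_chief_ip(1, environ={})

# Multi-machine job: the value is taken from the environment mapping.
multi = resolve_chief_ip(2, environ={"DET_CHIEF_IP": "10.0.0.1"})

# Multi-machine job: the value is passed explicitly.
explicit = resolve_chief_ip(2, chief_ip="10.0.0.2", environ={})
```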

get_rank() int

Return the rank of the process in the trial. The rank of a process is a unique ID within the trial. That is, no two processes in the same trial are assigned the same rank.

get_local_rank() int

Return the rank of the process on the agent. The local rank of a process is a unique ID within a given agent and trial; that is, no two processes in the same trial that are executing on the same agent are assigned the same rank.

get_size() int

Return the number of slots this trial is running on.

get_num_agents() int

Return the number of agents this trial is running on.

gather(stuff: Any) Optional[List]

Gather stuff to the chief. The chief returns a list of all stuff, and workers return None.

gather() is not a replacement for the gather functionality of your distributed training framework.

gather_local(stuff: Any) Optional[List]

Gather stuff to the local chief. The local chief returns a list of all stuff, and local workers return None.

gather_local() is not a replacement for the gather functionality of your distributed training framework.

allgather(stuff: Any) List

Gather stuff to the chief and broadcast all of it back to the workers.

allgather() is not a replacement for the allgather functionality of your distributed training framework.

allgather_local(stuff: Any) List

Gather stuff to the local chief and broadcast all of it back to the local workers.

allgather_local() is not a replacement for the allgather functionality of your distributed training framework.

broadcast(stuff: Any) Any

Every worker gets the stuff sent by the chief.

broadcast() is not a replacement for the broadcast functionality of your distributed training framework.

broadcast_local(stuff: Optional[Any] = None) Any

Every worker gets the stuff sent by the local chief.

broadcast_local() is not a replacement for the broadcast functionality of your distributed training framework.
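The return values of gather(), allgather(), and broadcast() can be compared side by side with a single-process simulation. The simulate_* functions below are hypothetical stand-ins that do no networking and do not use Determined; each takes a list in which entry i is the value passed in by the worker with rank i, and returns a list in which entry i is what that worker would get back.

```python
from typing import Any, List, Optional


def simulate_gather(values: List[Any]) -> List[Optional[List[Any]]]:
    """The chief (rank 0) receives every value; all other workers receive None."""
    return [list(values) if rank == 0 else None for rank in range(len(values))]


def simulate_allgather(values: List[Any]) -> List[List[Any]]:
    """Every worker receives the full list of values."""
    return [list(values) for _ in values]


def simulate_broadcast(values: List[Any]) -> List[Any]:
    """Every worker receives the value sent by the chief (rank 0)."""
    return [values[0] for _ in values]


# Three workers, each contributing one value.
inputs = ["a", "b", "c"]

gathered = simulate_gather(inputs)
allgathered = simulate_allgather(inputs)
broadcasted = simulate_broadcast(inputs)
```

The _local variants follow the same pattern, with the local chief and the workers on one machine taking the place of the chief and the whole job.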

determined.estimator.EstimatorExperimentalContext

class determined.estimator.EstimatorExperimentalContext

Context class that contains experimental runtime information and features for any Determined workflow that uses the tf.estimator API.

EstimatorExperimentalContext extends EstimatorTrialContext under the context.experimental namespace.

determined.estimator.MetricReducer

class determined.estimator.MetricReducer

Efficiently aggregating validation metrics across a multi-slot distributed evaluation is done in two steps:

  1. Accumulate metrics from each batch on each slot. In the case of calculating a mean, this might mean keeping a running sum and a count of metrics received.

  2. Reduce metrics from each slot to calculate the final metric. In the case of calculating a mean, this might mean adding up the per-slot sums and dividing the result by the sum of the per-slot counts.

Example implementation and usage:

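import numpy as np
import tensorflow as tf
from determined import estimator

class MyAvgMetricReducer(estimator.MetricReducer):
    def __init__(self):
        self.sum = 0
        self.counts = 0

    def accumulate(self, metric):
        # metric holds the values for one batch; count every element
        # so that the final result is the mean over all predictions.
        self.sum += sum(metric)
        self.counts += len(metric)
        return self.sum, self.counts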

    def cross_slot_reduce(self, per_slot_metrics):
        # per_slot_metrics is a list of (sum, counts) tuples
        # returned by the final self.accumulate() on each slot
        sums, counts = zip(*per_slot_metrics)
        return sum(sums) / sum(counts)

def my_estimator_model_function(features, labels, mode):
    ...
    if mode == tf.estimator.ModeKeys.EVAL:

        my_avg_prediction = context.make_metric(
             metric=predictions, reducer=MyAvgMetricReducer(), numpy_dtype=np.float32
        )

        return tf.estimator.EstimatorSpec(
            mode,
            loss=loss,
            eval_metric_ops={"my_avg_prediction": my_avg_prediction},
        )

See also: context.make_metric().

abstract accumulate(metric: Any) Any

accumulate is called for each batch in the evaluation dataset. Batches will be distributed across slots, so accumulate will be called many times on each slot.

accumulate should return the accumulated state. After evaluation is complete, the final return value of accumulate will become an element of the per_slot_metrics argument to cross_slot_reduce.

In the example of calculating a distributed mean, accumulate might keep a running sum and a count of metrics received:

def accumulate(self, metric):
    self.sum += metric
    self.count += 1
    return self.sum, self.count

abstract cross_slot_reduce(per_slot_metrics: List[Any]) Any

cross_slot_reduce is called on the list of results from the final call to accumulate on each slot. per_slot_metrics will be a list of length N, where N is the number of slots in the trial (or 1 in non-distributed training). cross_slot_reduce must return the final metric.

In the example of calculating a distributed mean, cross_slot_reduce might receive a list of (sum, count) tuples and it would calculate the overall mean.

def cross_slot_reduce(self, per_slot_metrics):
    sums, counts = zip(*per_slot_metrics)
    return np.array(sum(sums) / sum(counts))
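The calling sequence described for accumulate and cross_slot_reduce can be traced with a small single-process harness. This is a sketch only: MeanReducer is a plain-Python stand-in that follows the same two-method protocol without subclassing det.estimator.MetricReducer, and run_evaluation is a hypothetical driver that mimics the documented order of calls. Calling cross_slot_reduce on a fresh instance, and requiring at least one batch per slot, are simplifications made for this illustration.

```python
from typing import List, Tuple


class MeanReducer:
    """Plain-Python stand-in following the accumulate / cross_slot_reduce protocol."""

    def __init__(self) -> None:
        self.sum = 0.0
        self.count = 0

    def accumulate(self, metric: List[float]) -> Tuple[float, int]:
        # Called once per batch on a slot; returns the accumulated state.
        self.sum += sum(metric)
        self.count += len(metric)
        return self.sum, self.count

    def cross_slot_reduce(self, per_slot_metrics: List[Tuple[float, int]]) -> float:
        # Receives the final accumulate() result from every slot.
        sums, counts = zip(*per_slot_metrics)
        return sum(sums) / sum(counts)


def run_evaluation(batches_per_slot: List[List[List[float]]]) -> float:
    """Hypothetical driver: one reducer per slot, then a single final reduction."""
    per_slot_metrics = []
    for batches in batches_per_slot:
        reducer = MeanReducer()
        state = None
        for batch in batches:
            state = reducer.accumulate(batch)
        # Only the final return value of accumulate() is kept for each slot.
        per_slot_metrics.append(state)
    return MeanReducer().cross_slot_reduce(per_slot_metrics)


result = run_evaluation(
    [
        [[1.0, 2.0], [3.0]],  # slot 0: two batches
        [[4.0, 5.0, 6.0]],    # slot 1: one batch
    ]
)
```

The result is the mean over all six values, regardless of how the batches were split across slots.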

determined.estimator.RunHook

class determined.estimator.RunHook

Abstract base class which extends SessionRunHook and is used to define callbacks that should execute during the lifetime of an EstimatorTrial.

Hooks should be passed to the tf.estimator.TrainSpec via its hooks argument.

on_checkpoint_end(checkpoint_dir: str) None

Run after every checkpoint.

Warning

If distributed or parallel training is enabled, this callback is executed only on the chief GPU (rank = 0) which performs the checkpoint.

on_checkpoint_load(checkpoint_dir: str) None

Run at startup when the task environment starts up. If the trial is not resuming from a checkpoint, this is never called.

on_trial_close() None

Run when the trial closes. This is the place where users should execute post-trial cleanup.

determined.estimator.load_estimator_from_checkpoint_path

class determined.estimator.load_estimator_from_checkpoint_path(path: str, tags: Optional[List[str]] = None)

Loads a checkpoint written by an EstimatorTrial.

You should have already downloaded the checkpoint files, likely with Checkpoint.download().

The return type is a TensorFlow AutoTrackable object.

Parameters
  • path (string) – Top level directory to load the checkpoint from.

  • tags (list of strings, optional) – Specifies which tags are loaded from the TensorFlow SavedModel. See documentation for tf.compat.v1.saved_model.load_v2.