Shortcuts

determined.estimator

determined.estimator.EstimatorTrial

class determined.estimator.EstimatorTrial(context: determined.estimator._estimator_context.EstimatorTrialContext)

By default, experiments run with TensorFlow 1.x. To configure your trial to use TensorFlow 2.x, set a TF 2.x image in the experiment configuration (e.g. determinedai/environments:cuda-10.1-pytorch-1.4-tf-2.2-gpu-0.7.0).

EstimatorTrial supports TF 2.x; however it uses TensorFlow V1 behavior. We have disabled TensorFlow V2 behavior for EstimatorTrial, so there is no need for you to disable it.

trial_context_class

alias of determined.estimator._estimator_context.EstimatorTrialContext

__init__(context: determined.estimator._estimator_context.EstimatorTrialContext)

Initializes a trial using the provided context.

This method should typically be overridden by trial definitions: at minimum, it is important to store context as an instance variable so that it can be accessed by other methods of the trial class. This can also be a convenient place to initialize other state that is shared between the estimator, train spec, and/or validation spec.

abstract build_estimator() → tensorflow_estimator.python.estimator.estimator.Estimator

Specifies the tf.estimator.Estimator instance to be used during training and validation. This may be an instance of a Premade Estimator provided by the TensorFlow team, or a Custom Estimator created by the user.

abstract build_train_spec() → tensorflow_estimator.python.estimator.training.TrainSpec

Specifies the tf.estimator.TrainSpec to be used for training steps. This training specification will contain a TensorFlow input_fn which constructs the input data for a training step. Unlike the standard Tensorflow input_fn interface, EstimatorTrial only supports an input_fn that returns a tf.data.Dataset object. A function that returns a tuple of features and labels is currently not supported by EstimatorTrial. Additionally, the max_steps attribute of the training specification will be ignored; instead, the scheduling_unit option in the experiment configuration is used to determine how many batches each training workload uses.

abstract build_validation_spec() → tensorflow_estimator.python.estimator.training.EvalSpec

Specifies the tf.estimator.EvalSpec to be used for validation steps. This evaluation spec will contain a TensorFlow input_fn which constructs the input data for a validation step. The validation step will evaluate steps batches, or evaluate until the input_fn raises an end-of-input exception if steps is None.

build_serving_input_receiver_fns() → Dict[str, Callable[..., Union[tensorflow_estimator.python.estimator.export.export.ServingInputReceiver, tensorflow_estimator.python.estimator.export.export.TensorServingInputReceiver]]]

Optionally returns a Python dictionary mapping string names to serving_input_receiver_fn s. If specified, each serving input receiver function will be used to export a distinct SavedModel inference graph when a Determined checkpoint is saved, using Estimator.export_saved_model. The exported models are saved under subdirectories named by the keys of the respective serving input receiver functions. For example, returning

{
    "raw": tf.estimator.export.build_raw_serving_input_receiver_fn(...),
    "parsing": tf.estimator.export.build_parsing_serving_input_receiver_fn(...)
}

from this function would configure Determined to export two SavedModel inference graphs in every checkpoint under raw and parsing subdirectories, respectively. By default, this function returns an empty dictionary and the Determined checkpoint directory only contains metadata associated with the training graph.

determined.experimental.estimator.init()

determined.experimental.estimator.init(config: Optional[Dict[str, Any]] = None, local: bool = False, test: bool = False, context_dir: str = '', command: Optional[List[str]] = None, master_url: Optional[str] = None) → determined.estimator._estimator_context.EstimatorNativeContext

Create a tf.estimator experiment using the Native API.

Parameters
  • config – A dictionary representing the experiment configuration to be associated with the experiment.

  • local – A boolean indicating if training will happen locally. When False, the experiment will be submitted to the Determined cluster. Defaults to False.

  • test – A boolean indicating if the experiment should be shortened to a minimal loop of training, validation, and checkpointing. test=True is useful quick iterating during model porting or debugging because common errors will surface more quickly. Defaults to False.

  • context_dir

    A string filepath that defines the context directory. All model code will be executed with this as the current working directory.

    When local=False, this argument is required. All files in this directory will be uploaded to the Determined cluster. The total size of this directory must be under 96 MB.

    When local=True, this argument is optional and assumed to be the current working directory by default.

  • command – A list of strings that is used as the entrypoint of the training script in the Determined task environment. When executing this function via a python script, this argument is inferred to be sys.argv by default. When executing this function via IPython or Jupyter notebook, this argument is required.

  • master_url – An optional string to use as the Determined master URL when local=False. If not specified, will be inferred from the environment variable DET_MASTER.

Returns

determined.estimator.EstimatorNativeContext

determined.estimator.EstimatorContext

To use tf.estimator models with Determined, users need to wrap their optimizer and datasets using the following functions inherited from determined.estimator.EstimatorContext. Note that the concrete context object where these functions will be found will be in determined.estimator.EstimatorTrialContext.

class determined.estimator.EstimatorContext(env: determined._env_context.EnvContext, hvd_config: determined.horovod.HorovodContext)

Base context class that contains runtime information for any Determined workflow that uses the tf.estimator API.

EstimatorTrialContext always has a DistributedContext accessible via context.distributed for information related to distributed training.

EstimatorTrialContext always has a EstimatorExperimentalContext accessible via context.experimental for information related to experimental features.

wrap_optimizer(optimizer: Any) → Any

This should be used to wrap optimizer objects immediately after they have been created. Users should use the output of this wrapper as the new instance of their optimizer. For example, if users create their optimizer within build_estimator(), they should call optimizer = wrap_optimizer(optimzer) prior to passing the optimizer into their Estimator.

wrap_dataset(dataset: Any, shard_dataset: bool = True) → Any

This should be used to wrap tf.data.Dataset objects immediately after they have been created. Users should use the output of this wrapper as the new instance of their dataset. If users create multiple datasets (e.g., one for training and one for testing), users should wrap each dataset independently. E.g., If users instantiate their training dataset within build_train_spec(), they should call dataset = wrap_dataset(dataset) prior to passing it into tf.estimator.TrainSpec.

Parameters
  • dataset – tf.data.Dataset

  • shard_dataset – When performing multi-slot (distributed) training, this controls whether the dataset is sharded so that each training process (one per slot) sees unique data. If set to False, users must manually configure each process to use unique data.

class determined.estimator.EstimatorExperimentalContext(env: determined._env_context.EnvContext, hvd_config: determined.horovod.HorovodContext)

Context class that contains experimental runtime information and features for any Determined workflow that uses the tf.estimator API.

EstimatorExperimentalContext extends EstimatorTrialContext under the context.experimental namespace.

make_metric(metric: Any, reducer: Union[Callable[List[Any], Any], estimator.MetricReducer], numpy_dtype: Any) → Tuple[tensorflow.python.framework.ops.Operation, tensorflow.python.framework.ops.Operation]

Return an estimator-compatible validation metric which will be calculated properly, even during distributed evaluation.

During distributed evaluation, many types of metrics calculated via tf.metrics or tf.keras.metrics cannot be aggregated properly from the per-slot final metrics calculated by each separate Estimator replica. One example is tf.metrics.auc, where the ROC AUC calculated over predictions and labels from a full dataset cannot be derived from the individual ROC AUC metrics evaluated over several shards of a dataset.

Determined solves this problem by offering customizable metrics which are Estimator-compatible. For example, ROC AUC could be properly calculated during distributed evaluation by calling sklearn.metrics.roc_auc_score in a custom reducer function passed to make_metric.

The metric input can be a tensor, a list of tensors, or a dictionary of tensors. Nested structures are not supported.

The reducer should be either a single function that can calculate the metric from a list of the per-batch values of metric, or it can be an instance of a det.estimator.MetricReducer.

The numpy_dtype must be a numpy dtype. It is used internally to determined the output type of the TensorFlow py_func to report the final metric result to the Estimator API. The format of numpy_dtype should be anything that np.dtype() accepts.

The primary motivation for passing a function as the reducer is simplicity. Metrics from all batches will be buffered in memory and passed over the network where they will be reduced all at once. This introduces some overhead, but it is likely unnoticeable for scalar metrics or on validation datasets of small or medium size. This single function strategy may also be desirable for quick prototyping or for calculating metrics that are difficult or impossible to calculate incrementally.

The primary motivation for passing a det.estimator.MetricsReducer as the reducer is performance. det.estimator.MetricsReducer allows the user to incrementally calculate the partial metric on each slot, taking advantage of distributed computation, minimizing memory usage, and minimizing the network communication before the final cross_slot_reduce operation.

Evaluation performance may be improved by precomputing as much as possible in the graph so that less computation on the metric value is required within the reducer.

Example usage where reducer is a function:

def my_mean_reducer(all_batch_metrics):
    # Use hstack in case not all batches are equal length.
    return np.mean(np.hstack(all_batch_metrics))

def my_estimator_model_function(features, labels, mode):
    ...
    if mode == tf.estimator.ModeKeys.EVAL:

        my_avg_prediction = context.experimental.make_metric(
             metric=predictions, reducer=my_mean_reducer, numpy_dtype=np.float32
        )

        return tf.estimator.EstimatorSpec(
            mode,
            loss=loss,
            eval_metric_ops={"my_avg_prediction": my_avg_prediction},
        )
cache_train_dataset(dataset_id: str, dataset_version: str, shuffle: bool = False, skip_shuffle_at_epoch_end: bool = False) → Callable

cache_train_dataset is a decorator for creating your training dataset. It should decorate a function that outputs a tf.data.Dataset object. The dataset will be stored in a cache, keyed by dataset_id and dataset_version. The cache is re-used in subsequent calls.

Parameters
  • dataset_id – A string that will be used as part of the unique identifier for this dataset.

  • dataset_version – A string that will be used as part of the unique identifier for this dataset.

  • shuffle – A bool indicating if the dataset should be shuffled. Shuffling will be performed with the trial’s random seed which can be set in Experiment Configuration.

  • skip_shuffle_at_epoch_end – A bool indicating if shuffling should be skipped at the end of epochs.

Example Usage:

def make_train_dataset(self):
    @self.context.experimental.cache_train_dataset("range_dataset", "v1")
    def make_dataset():
        ds = tf.data.Dataset.range(10)
        return ds

    dataset = make_dataset()
    dataset = dataset.batch(self.context.get_per_slot_batch_size())
    dataset = dataset.map(...)
    return dataset

Note

dataset.batch() and runtime augmentation should be done after caching. Additionally, users should never need to call dataset.repeat().

cache_validation_dataset(dataset_id: str, dataset_version: str, shuffle: bool = False) → Callable

cache_validation_dataset is a decorator for creating your validation dataset. It should decorate a function that outputs a tf.data.Dataset object. The dataset will be stored in a cache, keyed by dataset_id and dataset_version. The cache is re-used in subsequent calls.

Parameters
  • dataset_id – A string that will be used as part of the unique identifier for this dataset.

  • dataset_version – A string that will be used as part of the unique identifier for this dataset.

  • shuffle – A bool indicating if the dataset should be shuffled. Shuffling will be performed with the trial’s random seed which can be set in Experiment Configuration.

Reducing Metrics

Determined supports proper reduction of arbitrary validation metrics during distributed training by allowing users to define custom reducers for their metrics. Custom reducers can be either a function or an implementation of the determined.estimator.MetricReducer interface.

See determined.estimator.EstimatorExperimentalContext.make_metric() for more details.

class determined.estimator.MetricReducer

Efficiently aggregating validation metrics across a multi-slot distributed evaluation is done in two steps:

  1. Accumulate metrics from each batch on each slot. In the case of calculating a mean, this might mean keeping a running sum and a count of metrics received.

  2. Reduce metrics from each slot to calculate the final metric. In the case of calculating a mean, this might mean adding up the per-slot sums and dividing the result by the per-slot counts.

Example implementation and usage:

class MyAvgMetricReducer(estimator.MetricReducer):
    def __init__(self):
       self.sum = 0
       self.counts = 0

    def accumulate(self, metric):
        self.sum += sum(metric)
        self.counts += 1
        return self.sum, self.counts

    def cross_slot_reduce(self, per_slot_metrics):
        # per_slot_metrics is a list of (sum, counts) tuples
        # returned by the final self.accumulate() on each slot
        sums, counts = zip(*per_slot_metrics)
        return sum(sums) / sum(counts)

def my_estimator_model_function(features, labels, mode):
    ...
    if mode == tf.estimator.ModeKeys.EVAL:

        my_avg_prediction = context.experimental.make_metric(
             metric=predictions, reducer=MyAvgMetricReducer(), numpy_dtype=np.float32
        )

        return tf.estimator.EstimatorSpec(
            mode,
            loss=loss,
            eval_metric_ops={"my_avg_prediction": my_avg_prediction},
        )

See also: determined.estimator.EstimatorExperimentalContext.make_metric().

abstract accumulate(metric: Any) → Any

accumulate is called for each batch in the evaluation dataset. Batches will be distributed across slots, so accumulate will be called many times on each slot.

accumulate should return the accumulated state. After evaluation is complete, the final return value of accumulate will become an element of the per_slot_metrics argument to cross_slot_reduce.

In the example of the calculating a distributed mean, accumulate might keep a running sum and a count of metrics received:

def accumulate(self, metric):
    self.sum += metric
    self.count += 1
    return self.sum, self.count
abstract cross_slot_reduce(per_slot_metrics: List[Any]) → Any

cross_slot_reduce is called on the list of results from the final call to accumulate on each slot. per_slot_metrics will be a list of length N, where N is the number of slots in the trial (or 1 in non-distributed training). cross_slot_reduce must return the final metric.

In the example of calculating a distributed mean, cross_slot_reduce might recieve a list of (sum, count) tuples and it would calculate the overall mean.

def cross_slot_reduce(self, per_slot_metrics):
    sums, counts = zip(*per_slot_metrics)
    return np.array(sum(sums) / sum(counts))

Callbacks

To execute arbitrary Python code during the lifecycle of a EstimatorTrial, determined.estimator.RunHook extends tf.estimator.SessionRunHook. When utilizing determined.estimator.RunHook, users can use native estimator hooks such as before_run() and Determined hooks such as on_checkpoint_end().

class determined.estimator.RunHook

Abstract base class which extends SessionRunHook and is used to define callbacks that should execute during the lifetime of a EstimatorTrial.

Hooks should be passed in to Train Spec.

on_checkpoint_end(checkpoint_dir: str) → None

Run after every checkpoint.

Warning

If distributed or parallel training is enabled, this callback is executed only on the chief GPU (rank = 0) which performs the checkpoint.

on_checkpoint_load(checkpoint_dir: str) → None

Run at startup when the task environment starts up. If not resuming from checkpoint this is never called.

on_trial_close() → None

Run when the trial close. This is the place users should execute post-trial cleanup.

Example usage of determined.estimator.RunHook which adds custom metadata checkpoints:

class MyHook(determined.estimator.RunHook):
    def __init__(self, context, metadata) -> None:
        self._context = context
        self._metadata = metadata

    def on_checkpoint_end(self, checkpoint_dir) -> None:
        with open(os.path.join(checkpoint_dir, "metadata.txt"), "w") as fp:
            fp.write(self._metadata)


class MyEstimatorTrial(determined.estimator.EstimatorTrial):
    ...

    def build_train_spec(self) -> tf.estimator.TrainSpec:
        return tf.estimator.TrainSpec(
            make_input_fn(),
            hooks=[MyHook(self.context, "my_metadata")],
        )

Examples