determined.estimator
determined.estimator.EstimatorTrial
- class determined.estimator.EstimatorTrial(context: determined.estimator._estimator_context.EstimatorTrialContext)
By default, experiments run with TensorFlow 1.x. To configure your trial to use TensorFlow 2.x, set a TF 2.x image in the experiment configuration (e.g. determinedai/environments:cuda-10.1-pytorch-1.4-tf-2.2-gpu-0.7.0).
EstimatorTrial supports TF 2.x; however, it uses TensorFlow V1 behavior. We have disabled TensorFlow V2 behavior for EstimatorTrial, so there is no need for you to disable it.
- trial_context_class
alias of determined.estimator._estimator_context.EstimatorTrialContext
- __init__(context: determined.estimator._estimator_context.EstimatorTrialContext)
Initializes a trial using the provided context.
This method should typically be overridden by trial definitions: at minimum, it is important to store context as an instance variable so that it can be accessed by other methods of the trial class. This can also be a convenient place to initialize other state that is shared between the estimator, train spec, and/or validation spec.
- abstract build_estimator() → tensorflow_estimator.python.estimator.estimator.Estimator
Specifies the tf.estimator.Estimator instance to be used during training and validation. This may be an instance of a Premade Estimator provided by the TensorFlow team, or a Custom Estimator created by the user.
- abstract build_train_spec() → tensorflow_estimator.python.estimator.training.TrainSpec
Specifies the tf.estimator.TrainSpec to be used for training steps. This training specification will contain a TensorFlow input_fn which constructs the input data for a training step. Unlike the standard TensorFlow input_fn interface, EstimatorTrial only supports an input_fn that returns a tf.data.Dataset object. A function that returns a tuple of features and labels is currently not supported by EstimatorTrial. Additionally, the max_steps attribute of the training specification will be ignored; instead, the scheduling_unit option in the experiment configuration is used to determine how many batches each training workload uses.
- abstract build_validation_spec() → tensorflow_estimator.python.estimator.training.EvalSpec
Specifies the tf.estimator.EvalSpec to be used for validation steps. This evaluation spec will contain a TensorFlow input_fn which constructs the input data for a validation step. The validation step will evaluate steps batches, or evaluate until the input_fn raises an end-of-input exception if steps is None.
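The stopping rule for a validation step can be pictured with a small pure-Python sketch. It uses neither TensorFlow nor Determined: `evaluate_batches` and `input_batches` are hypothetical stand-ins, and iterator exhaustion plays the role of the end-of-input exception.

```python
from itertools import islice
from typing import Iterable, List, Optional


def evaluate_batches(batches: Iterable[int], steps: Optional[int]) -> List[int]:
    """Hypothetical stand-in for a validation step (not Determined's code).

    Consumes `steps` batches, or every batch until the input is exhausted
    when `steps` is None.
    """
    if steps is None:
        return list(batches)
    return list(islice(batches, steps))


def input_batches():
    # Stand-in for an input_fn: yields five "batches" and then stops.
    yield from range(5)


limited = evaluate_batches(input_batches(), steps=3)
full = evaluate_batches(input_batches(), steps=None)
```

With `steps=3` only the first three batches are consumed; with `steps=None` the whole input is evaluated.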
- build_serving_input_receiver_fns() → Dict[str, Callable[..., Union[tensorflow_estimator.python.estimator.export.export.ServingInputReceiver, tensorflow_estimator.python.estimator.export.export.TensorServingInputReceiver]]]
Optionally returns a Python dictionary mapping string names to serving_input_receiver_fns. If specified, each serving input receiver function will be used to export a distinct SavedModel inference graph when a Determined checkpoint is saved, using Estimator.export_saved_model. The exported models are saved under subdirectories named by the keys of the respective serving input receiver functions. For example, returning
    {
        "raw": tf.estimator.export.build_raw_serving_input_receiver_fn(...),
        "parsing": tf.estimator.export.build_parsing_serving_input_receiver_fn(...)
    }
from this function would configure Determined to export two SavedModel inference graphs in every checkpoint under raw and parsing subdirectories, respectively. By default, this function returns an empty dictionary and the Determined checkpoint directory only contains metadata associated with the training graph.
determined.experimental.estimator.init()
- determined.experimental.estimator.init(config: Optional[Dict[str, Any]] = None, local: bool = False, test: bool = False, context_dir: str = '', command: Optional[List[str]] = None, master_url: Optional[str] = None) → determined.estimator._estimator_context.EstimatorNativeContext
Create a tf.estimator experiment using the Native API.
- Parameters
config – A dictionary representing the experiment configuration to be associated with the experiment.
local – A boolean indicating if training will happen locally. When False, the experiment will be submitted to the Determined cluster. Defaults to False.
test – A boolean indicating if the experiment should be shortened to a minimal loop of training, validation, and checkpointing. test=True is useful for quick iteration during model porting or debugging because common errors will surface more quickly. Defaults to False.
context_dir – A string filepath that defines the context directory. All model code will be executed with this as the current working directory. When local=False, this argument is required. All files in this directory will be uploaded to the Determined cluster. The total size of this directory must be under 96 MB. When local=True, this argument is optional and assumed to be the current working directory by default.
command – A list of strings that is used as the entrypoint of the training script in the Determined task environment. When executing this function via a Python script, this argument is inferred to be sys.argv by default. When executing this function via IPython or a Jupyter notebook, this argument is required.
master_url – An optional string to use as the Determined master URL when local=False. If not specified, it will be inferred from the environment variable DET_MASTER.
- Returns
determined.estimator.EstimatorNativeContext
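Because context_dir must stay under 96 MB when local=False, it can help to measure the directory before submitting. The following is a minimal sketch using only the standard library; `context_dir_size` and `fits_context_limit` are hypothetical helpers that are not part of Determined, and treating 1 MB as 1024 * 1024 bytes is an assumption of this sketch.

```python
import os
import tempfile

# 96 MB limit taken from the context_dir description; the byte conversion
# (1 MB = 1024 * 1024 bytes) is an assumption of this sketch.
MAX_CONTEXT_BYTES = 96 * 1024 * 1024


def context_dir_size(context_dir: str) -> int:
    """Return the total size in bytes of all regular files under context_dir."""
    total = 0
    for root, _dirs, files in os.walk(context_dir):
        for name in files:
            path = os.path.join(root, name)
            if os.path.isfile(path):
                total += os.path.getsize(path)
    return total


def fits_context_limit(context_dir: str, limit: int = MAX_CONTEXT_BYTES) -> bool:
    """Hypothetical pre-flight check to run before init(local=False, ...)."""
    return context_dir_size(context_dir) < limit


# Demonstration on a throwaway directory holding one 1000-byte file.
with tempfile.TemporaryDirectory() as tmp:
    with open(os.path.join(tmp, "model_def.py"), "w") as fp:
        fp.write("x" * 1000)
    demo_size = context_dir_size(tmp)
    demo_ok = fits_context_limit(tmp)
```

Large datasets or checkpoints stored next to the model code are a likely cause of exceeding the limit, so a check like this is most useful when the context directory is also a working directory.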
determined.estimator.EstimatorContext
To use tf.estimator models with Determined, users need to wrap their optimizer and datasets using the following functions inherited from determined.estimator.EstimatorContext. Note that the concrete context object that exposes these functions is determined.estimator.EstimatorTrialContext.
- class determined.estimator.EstimatorContext(env: determined._env_context.EnvContext, hvd_config: determined.horovod.HorovodContext)
Base context class that contains runtime information for any Determined workflow that uses the tf.estimator API.
EstimatorTrialContext always has a DistributedContext accessible via context.distributed for information related to distributed training.
EstimatorTrialContext always has an EstimatorExperimentalContext accessible via context.experimental for information related to experimental features.
- wrap_optimizer(optimizer: Any) → Any
This should be used to wrap optimizer objects immediately after they have been created. Users should use the output of this wrapper as the new instance of their optimizer. For example, if users create their optimizer within build_estimator(), they should call optimizer = wrap_optimizer(optimizer) prior to passing the optimizer into their Estimator.
- wrap_dataset(dataset: Any, shard_dataset: bool = True) → Any
This should be used to wrap tf.data.Dataset objects immediately after they have been created. Users should use the output of this wrapper as the new instance of their dataset. If users create multiple datasets (e.g., one for training and one for testing), users should wrap each dataset independently. For example, if users instantiate their training dataset within build_train_spec(), they should call dataset = wrap_dataset(dataset) prior to passing it into tf.estimator.TrainSpec.
- Parameters
dataset – tf.data.Dataset
shard_dataset – When performing multi-slot (distributed) training, this controls whether the dataset is sharded so that each training process (one per slot) sees unique data. If set to False, users must manually configure each process to use unique data.
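To make "each training process sees unique data" concrete, here is a conceptual sketch of index-based sharding in plain Python. It is not Determined's implementation; the `shard` helper, the slot count, and the data are hypothetical. Users who set shard_dataset=False would need to arrange something equivalent themselves.

```python
from typing import List, Sequence


def shard(items: Sequence[int], num_slots: int, rank: int) -> List[int]:
    """Conceptual sharding: slot `rank` keeps every num_slots-th element."""
    return list(items[rank::num_slots])


data = list(range(10))
num_slots = 4  # hypothetical number of training processes
shards = [shard(data, num_slots, rank) for rank in range(num_slots)]
```

Every element lands in exactly one shard, and the shards may differ in length by one element when the dataset size is not a multiple of the slot count.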
- class determined.estimator.EstimatorExperimentalContext(env: determined._env_context.EnvContext, hvd_config: determined.horovod.HorovodContext)
Context class that contains experimental runtime information and features for any Determined workflow that uses the tf.estimator API.
EstimatorExperimentalContext extends EstimatorTrialContext under the context.experimental namespace.
- make_metric(metric: Any, reducer: Union[Callable[List[Any], Any], estimator.MetricReducer], numpy_dtype: Any) → Tuple[tensorflow.python.framework.ops.Operation, tensorflow.python.framework.ops.Operation]
Return an estimator-compatible validation metric which will be calculated properly, even during distributed evaluation.
During distributed evaluation, many types of metrics calculated via tf.metrics or tf.keras.metrics cannot be aggregated properly from the per-slot final metrics calculated by each separate Estimator replica. One example is tf.metrics.auc, where the ROC AUC calculated over predictions and labels from a full dataset cannot be derived from the individual ROC AUC metrics evaluated over several shards of a dataset.
Determined solves this problem by offering customizable metrics which are Estimator-compatible. For example, ROC AUC could be properly calculated during distributed evaluation by calling sklearn.metrics.roc_auc_score in a custom reducer function passed to make_metric.
The metric input can be a tensor, a list of tensors, or a dictionary of tensors. Nested structures are not supported.
The reducer should be either a single function that can calculate the metric from a list of the per-batch values of metric, or an instance of a det.estimator.MetricReducer.
The numpy_dtype must be a numpy dtype. It is used internally to determine the output type of the TensorFlow py_func that reports the final metric result to the Estimator API. The format of numpy_dtype should be anything that np.dtype() accepts.
The primary motivation for passing a function as the reducer is simplicity. Metrics from all batches will be buffered in memory and passed over the network, where they will be reduced all at once. This introduces some overhead, but it is likely unnoticeable for scalar metrics or on validation datasets of small or medium size. This single function strategy may also be desirable for quick prototyping or for calculating metrics that are difficult or impossible to calculate incrementally.
The primary motivation for passing a det.estimator.MetricReducer as the reducer is performance. det.estimator.MetricReducer allows the user to incrementally calculate the partial metric on each slot, taking advantage of distributed computation, minimizing memory usage, and minimizing the network communication before the final cross_slot_reduce operation.
Evaluation performance may be improved by precomputing as much as possible in the graph so that less computation on the metric value is required within the reducer.
Example usage where reducer is a function:
    import numpy as np
    import tensorflow as tf

    def my_mean_reducer(all_batch_metrics):
        # Use hstack in case not all batches are equal length.
        return np.mean(np.hstack(all_batch_metrics))

    def my_estimator_model_function(features, labels, mode):
        ...
        if mode == tf.estimator.ModeKeys.EVAL:
            my_avg_prediction = context.experimental.make_metric(
                metric=predictions, reducer=my_mean_reducer, numpy_dtype=np.float32
            )
            return tf.estimator.EstimatorSpec(
                mode,
                loss=loss,
                eval_metric_ops={"my_avg_prediction": my_avg_prediction},
            )
- cache_train_dataset(dataset_id: str, dataset_version: str, shuffle: bool = False, skip_shuffle_at_epoch_end: bool = False) → Callable
cache_train_dataset is a decorator for creating your training dataset. It should decorate a function that outputs a tf.data.Dataset object. The dataset will be stored in a cache, keyed by dataset_id and dataset_version. The cache is re-used in subsequent calls.
- Parameters
dataset_id – A string that will be used as part of the unique identifier for this dataset.
dataset_version – A string that will be used as part of the unique identifier for this dataset.
shuffle – A bool indicating if the dataset should be shuffled. Shuffling will be performed with the trial’s random seed which can be set in Experiment Configuration.
skip_shuffle_at_epoch_end – A bool indicating if shuffling should be skipped at the end of epochs.
Example Usage:
    def make_train_dataset(self):
        @self.context.experimental.cache_train_dataset("range_dataset", "v1")
        def make_dataset():
            ds = tf.data.Dataset.range(10)
            return ds

        dataset = make_dataset()
        dataset = dataset.batch(self.context.get_per_slot_batch_size())
        dataset = dataset.map(...)
        return dataset
Note
dataset.batch() and runtime augmentation should be done after caching. Additionally, users should never need to call dataset.repeat().
- cache_validation_dataset(dataset_id: str, dataset_version: str, shuffle: bool = False) → Callable
cache_validation_dataset is a decorator for creating your validation dataset. It should decorate a function that outputs a tf.data.Dataset object. The dataset will be stored in a cache, keyed by dataset_id and dataset_version. The cache is re-used in subsequent calls.
- Parameters
dataset_id – A string that will be used as part of the unique identifier for this dataset.
dataset_version – A string that will be used as part of the unique identifier for this dataset.
shuffle – A bool indicating if the dataset should be shuffled. Shuffling will be performed with the trial’s random seed which can be set in Experiment Configuration.
Reducing Metrics
Determined supports proper reduction of arbitrary validation metrics during distributed training by allowing users to define custom reducers for their metrics. Custom reducers can be either a function or an implementation of the determined.estimator.MetricReducer interface.
See determined.estimator.EstimatorExperimentalContext.make_metric() for more details.
- class determined.estimator.MetricReducer
Efficiently aggregating validation metrics across a multi-slot distributed evaluation is done in two steps:
1. Accumulate metrics from each batch on each slot. In the case of calculating a mean, this might mean keeping a running sum and a count of metrics received.
2. Reduce metrics from each slot to calculate the final metric. In the case of calculating a mean, this might mean adding up the per-slot sums and dividing the result by the sum of the per-slot counts.
Example implementation and usage:
    import numpy as np
    import tensorflow as tf
    from determined import estimator

    class MyAvgMetricReducer(estimator.MetricReducer):
        def __init__(self):
            self.sum = 0
            self.counts = 0

        def accumulate(self, metric):
            self.sum += sum(metric)
            # Count elements, not batches, so the result is a true mean.
            self.counts += len(metric)
            return self.sum, self.counts

        def cross_slot_reduce(self, per_slot_metrics):
            # per_slot_metrics is a list of (sum, counts) tuples
            # returned by the final self.accumulate() on each slot
            sums, counts = zip(*per_slot_metrics)
            return sum(sums) / sum(counts)

    def my_estimator_model_function(features, labels, mode):
        ...
        if mode == tf.estimator.ModeKeys.EVAL:
            my_avg_prediction = context.experimental.make_metric(
                metric=predictions, reducer=MyAvgMetricReducer(), numpy_dtype=np.float32
            )
            return tf.estimator.EstimatorSpec(
                mode,
                loss=loss,
                eval_metric_ops={"my_avg_prediction": my_avg_prediction},
            )
See also: determined.estimator.EstimatorExperimentalContext.make_metric().
- abstract accumulate(metric: Any) → Any
accumulate is called for each batch in the evaluation dataset. Batches will be distributed across slots, so accumulate will be called many times on each slot.
accumulate should return the accumulated state. After evaluation is complete, the final return value of accumulate will become an element of the per_slot_metrics argument to cross_slot_reduce.
In the example of calculating a distributed mean, accumulate might keep a running sum and a count of metrics received:
    def accumulate(self, metric):
        self.sum += metric
        self.count += 1
        return self.sum, self.count
- abstract cross_slot_reduce(per_slot_metrics: List[Any]) → Any
cross_slot_reduce is called on the list of results from the final call to accumulate on each slot. per_slot_metrics will be a list of length N, where N is the number of slots in the trial (or 1 in non-distributed training). cross_slot_reduce must return the final metric.
In the example of calculating a distributed mean, cross_slot_reduce might receive a list of (sum, count) tuples and would calculate the overall mean:
    def cross_slot_reduce(self, per_slot_metrics):
        sums, counts = zip(*per_slot_metrics)
        return np.array(sum(sums) / sum(counts))
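The two-step accumulate and cross_slot_reduce flow can be simulated without a cluster. In the sketch below, the `MetricReducer` base class is a local stand-in that only mirrors the two abstract methods documented here, and the slot loop is a hypothetical approximation of what happens across processes; real slots run in separate processes and Determined gathers their final states.

```python
from abc import ABC, abstractmethod
from typing import Any, List, Tuple


class MetricReducer(ABC):
    """Stand-in for determined.estimator.MetricReducer (same two methods)."""

    @abstractmethod
    def accumulate(self, metric: Any) -> Any: ...

    @abstractmethod
    def cross_slot_reduce(self, per_slot_metrics: List[Any]) -> Any: ...


class MeanReducer(MetricReducer):
    def __init__(self) -> None:
        self.sum = 0.0
        self.count = 0

    def accumulate(self, metric: List[float]) -> Tuple[float, int]:
        # Track element totals so batches of unequal length are weighted correctly.
        self.sum += sum(metric)
        self.count += len(metric)
        return self.sum, self.count

    def cross_slot_reduce(self, per_slot_metrics: List[Tuple[float, int]]) -> float:
        sums, counts = zip(*per_slot_metrics)
        return sum(sums) / sum(counts)


# Simulate two slots, each with its own reducer instance and its own batches.
slot_batches = [[[1.0, 2.0], [3.0]], [[4.0, 5.0, 6.0]]]
per_slot_metrics = []
for batches in slot_batches:
    reducer = MeanReducer()
    state = None
    for batch in batches:
        state = reducer.accumulate(batch)
    per_slot_metrics.append(state)  # final return value of accumulate

final_metric = MeanReducer().cross_slot_reduce(per_slot_metrics)
```

Only one small (sum, count) tuple per slot crosses the network in this design, which is the performance argument for a MetricReducer over a function reducer.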
Callbacks
To execute arbitrary Python code during the lifecycle of an EstimatorTrial, use determined.estimator.RunHook, which extends tf.estimator.SessionRunHook. When using determined.estimator.RunHook, users can use native estimator hooks such as before_run() and Determined hooks such as on_checkpoint_end().
- class determined.estimator.RunHook
Abstract base class which extends SessionRunHook and is used to define callbacks that should execute during the lifetime of an EstimatorTrial.
Hooks should be passed in to the TrainSpec.
- on_checkpoint_end(checkpoint_dir: str) → None
Run after every checkpoint.
Warning
If distributed or parallel training is enabled, this callback is executed only on the chief GPU (rank = 0) which performs the checkpoint.
- on_checkpoint_load(checkpoint_dir: str) → None
Run at startup when the task environment starts up. If not resuming from a checkpoint, this is never called.
- on_trial_close() → None
Run when the trial closes. This is the place where users should execute post-trial cleanup.
Example usage of determined.estimator.RunHook which adds custom metadata to checkpoints:
    import os

    import determined.estimator
    import tensorflow as tf

    class MyHook(determined.estimator.RunHook):
        def __init__(self, context, metadata) -> None:
            self._context = context
            self._metadata = metadata

        def on_checkpoint_end(self, checkpoint_dir) -> None:
            # Write the metadata next to the checkpoint files.
            with open(os.path.join(checkpoint_dir, "metadata.txt"), "w") as fp:
                fp.write(self._metadata)

    class MyEstimatorTrial(determined.estimator.EstimatorTrial):
        ...

        def build_train_spec(self) -> tf.estimator.TrainSpec:
            # make_input_fn is assumed to be defined elsewhere by the user.
            return tf.estimator.TrainSpec(
                make_input_fn(),
                hooks=[MyHook(self.context, "my_metadata")],
            )
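A hook can also read back what it wrote when a trial resumes. The sketch below pairs on_checkpoint_end with on_checkpoint_load; to keep it runnable without Determined or TensorFlow, `RunHook` here is a local stand-in for determined.estimator.RunHook, and the `metadata.json` file name and `MetadataHook` class are hypothetical.

```python
import json
import os
import tempfile


class RunHook:
    """Stand-in for determined.estimator.RunHook so the sketch runs alone."""

    def on_checkpoint_end(self, checkpoint_dir: str) -> None:
        pass

    def on_checkpoint_load(self, checkpoint_dir: str) -> None:
        pass


class MetadataHook(RunHook):
    FILENAME = "metadata.json"  # hypothetical file name

    def __init__(self, metadata: dict) -> None:
        self.metadata = metadata
        self.restored = None

    def on_checkpoint_end(self, checkpoint_dir: str) -> None:
        # Save custom metadata alongside the checkpoint.
        with open(os.path.join(checkpoint_dir, self.FILENAME), "w") as fp:
            json.dump(self.metadata, fp)

    def on_checkpoint_load(self, checkpoint_dir: str) -> None:
        # Older checkpoints may not contain the file, so check first.
        path = os.path.join(checkpoint_dir, self.FILENAME)
        if os.path.exists(path):
            with open(path) as fp:
                self.restored = json.load(fp)


# Simulate one checkpoint being written and later loaded by a new hook.
with tempfile.TemporaryDirectory() as checkpoint_dir:
    MetadataHook({"note": "my_metadata"}).on_checkpoint_end(checkpoint_dir)
    resumed = MetadataHook({})
    resumed.on_checkpoint_load(checkpoint_dir)
    restored = resumed.restored
```

In a real trial the same two methods would be defined on a subclass of determined.estimator.RunHook and the hook passed through the hooks argument of tf.estimator.TrainSpec, as in the example above.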