Defining an experiment#
Let's look at an example experiment configuration for object navigation with an actor-critic agent that observes
RGB images from the environment and target object classes from the task. This is a simplified setting in which the
agent is confined to a single iTHOR scene (FloorPlan1) and needs to find a single object (a tomato). To see how one
might run a "full"/"hard" version of navigation within AI2-THOR, see our tutorials
PointNav in RoboTHOR and
Swapping in a new environment.
The interface to be implemented by the experiment specification is defined in allenact.base_abstractions.experiment_config. If you'd like to skip ahead and see the finished configuration, see here. We begin by making the following imports:
from math import ceil
from typing import Dict, Any, List, Optional
import gym
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import LambdaLR
from allenact.algorithms.onpolicy_sync.losses import PPO
from allenact.algorithms.onpolicy_sync.losses.ppo import PPOConfig
from allenact.algorithms.onpolicy_sync.losses.imitation import Imitation  # used by the DAgger variant below
from allenact.base_abstractions.experiment_config import ExperimentConfig
from allenact.base_abstractions.sensor import ExpertActionSensor, SensorSuite  # ExpertActionSensor is used by the DAgger variant below
from allenact.base_abstractions.task import TaskSampler
from allenact_plugins.ithor_plugin.ithor_sensors import RGBSensorThor, GoalObjectTypeThorSensor
from allenact_plugins.ithor_plugin.ithor_task_samplers import ObjectNavTaskSampler
from allenact_plugins.ithor_plugin.ithor_tasks import ObjectNaviThorGridTask
from projects.objectnav_baselines.models.object_nav_models import (
ObjectNavBaselineActorCritic,
)
from allenact.utils.experiment_utils import Builder, PipelineStage, TrainingPipeline, LinearDecay
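Before filling in the configuration, it may help to see the overall shape of the interface we are about to implement. The following is a condensed, non-authoritative sketch: the method names match those used in this tutorial, but the signatures are abbreviated, and the real abstract class in allenact.base_abstractions.experiment_config contains additional details.

# Condensed sketch of the interface this tutorial implements (illustration only;
# see ExperimentConfig in allenact.base_abstractions.experiment_config for the
# authoritative definitions).
from abc import ABC, abstractmethod

class ExperimentConfigSketch(ABC):
    @classmethod
    @abstractmethod
    def tag(cls) -> str:
        """A short string uniquely identifying the experiment."""

    @classmethod
    @abstractmethod
    def create_model(cls, **kwargs) -> nn.Module:
        """Instantiate the agent's actor-critic model."""

    @classmethod
    @abstractmethod
    def training_pipeline(cls, **kwargs) -> TrainingPipeline:
        """Define losses, hyperparameters, and training stages."""

    @classmethod
    @abstractmethod
    def machine_params(cls, mode: str = "train", **kwargs) -> Dict[str, Any]:
        """Number of processes and GPU ids to use in each mode."""

    @classmethod
    @abstractmethod
    def make_sampler_fn(cls, **kwargs) -> TaskSampler:
        """Construct a TaskSampler that generates tasks for the agent."""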
The first method to implement is tag, which returns a string identifying the experiment:
class ObjectNavThorPPOExperimentConfig(ExperimentConfig):
    ...

    @classmethod
    def tag(cls):
        return "ObjectNavThorPPO"

    ...
Model creation#
Next, create_model will be used to instantiate a baseline object-navigation actor-critic model:
class ObjectNavThorPPOExperimentConfig(ExperimentConfig):
    ...

    # A simple setting, train/valid/test are all the same single scene
    # and we're looking for a single object
    OBJECT_TYPES = ["Tomato"]
    TRAIN_SCENES = ["FloorPlan1_physics"]
    VALID_SCENES = ["FloorPlan1_physics"]
    TEST_SCENES = ["FloorPlan1_physics"]

    # Setting up sensors and basic environment details
    SCREEN_SIZE = 224
    SENSORS = [
        RGBSensorThor(
            height=SCREEN_SIZE, width=SCREEN_SIZE, use_resnet_normalization=True,
        ),
        GoalObjectTypeThorSensor(object_types=OBJECT_TYPES),
    ]

    ...

    @classmethod
    def create_model(cls, **kwargs) -> nn.Module:
        return ObjectNavBaselineActorCritic(
            action_space=gym.spaces.Discrete(
                len(ObjectNaviThorGridTask.class_action_names())
            ),
            observation_space=SensorSuite(cls.SENSORS).observation_spaces,
            rgb_uuid=cls.SENSORS[0].uuid,
            depth_uuid=None,
            goal_sensor_uuid="goal_object_type_ind",
            hidden_size=512,
            object_type_embedding_dim=8,
        )

    ...
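To make the observation_space argument concrete: SensorSuite combines the individual sensors' spaces into a single gym.spaces.Dict keyed by each sensor's uuid, and the model looks up observations by those keys. The following is a minimal sketch of what such a space looks like; the uuids "rgb" and "goal_object_type_ind" and the Box bounds are illustrative assumptions, not the exact spaces produced by the sensors above.

import gym
import numpy as np

# Illustrative sketch of a sensor-suite observation space: a dictionary space keyed
# by sensor uuid. The real spaces come from SensorSuite(cls.SENSORS).
sketch_observation_space = gym.spaces.Dict(
    {
        "rgb": gym.spaces.Box(low=-3.0, high=3.0, shape=(224, 224, 3), dtype=np.float32),
        "goal_object_type_ind": gym.spaces.Discrete(1),  # a single target type ("Tomato")
    }
)

# The model indexes observations by these uuids, e.g. observations["rgb"].
print(list(sketch_observation_space.spaces.keys()))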
Training pipeline#
We now implement a training pipeline that trains with a single stage using PPO.
Below we use Builder objects, which allow us to defer the instantiation
of objects of the class passed as their first argument while still passing additional keyword arguments to their
initializers. This is necessary when instantiating things like PyTorch optimizers, which take as input the list of
parameters associated with our agent's model (something we can't know until the create_model
function has been called).
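As a quick illustration of this deferral pattern, here is a minimal, toy stand-in for Builder (not allenact's actual implementation): the class and the fixed keyword arguments are stored at configuration time, and the remaining arguments, such as the model's parameters, are supplied when the object is finally constructed.

import torch.nn as nn
import torch.optim as optim

class DeferredBuilder:
    """Toy stand-in for allenact's Builder (illustration only): store a class and
    some keyword arguments now, instantiate later once the remaining keyword
    arguments (e.g. the model's parameters) are known."""

    def __init__(self, class_type, kwargs=None):
        self.class_type = class_type
        self.kwargs = kwargs or {}

    def __call__(self, **extra_kwargs):
        return self.class_type(**{**self.kwargs, **extra_kwargs})

# At configuration time we only know the learning rate...
optimizer_builder = DeferredBuilder(optim.Adam, dict(lr=2.5e-4))

# ...and only once create_model() has run do we know which parameters to optimize.
model = nn.Linear(4, 2)
optimizer = optimizer_builder(params=model.parameters())

With that pattern in mind, the training pipeline for this experiment is defined as follows: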
class ObjectNavThorPPOExperimentConfig(ExperimentConfig):
    ...

    @classmethod
    def training_pipeline(cls, **kwargs):
        ppo_steps = int(1e6)
        lr = 2.5e-4
        num_mini_batch = 2 if not torch.cuda.is_available() else 6
        update_repeats = 4
        num_steps = 128
        metric_accumulate_interval = cls.MAX_STEPS * 10  # Log every 10 max length tasks
        save_interval = 10000
        gamma = 0.99
        use_gae = True
        gae_lambda = 1.0
        max_grad_norm = 0.5
        return TrainingPipeline(
            save_interval=save_interval,
            metric_accumulate_interval=metric_accumulate_interval,
            optimizer_builder=Builder(optim.Adam, dict(lr=lr)),
            num_mini_batch=num_mini_batch,
            update_repeats=update_repeats,
            max_grad_norm=max_grad_norm,
            num_steps=num_steps,
            named_losses={
                "ppo_loss": PPO(clip_decay=LinearDecay(ppo_steps), **PPOConfig),
            },
            gamma=gamma,
            use_gae=use_gae,
            gae_lambda=gae_lambda,
            advance_scene_rollout_period=cls.ADVANCE_SCENE_ROLLOUT_PERIOD,
            pipeline_stages=[
                PipelineStage(loss_names=["ppo_loss"], max_stage_steps=ppo_steps,),
            ],
            lr_scheduler_builder=Builder(
                LambdaLR, {"lr_lambda": LinearDecay(steps=ppo_steps)}
            ),
        )

    ...
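A note on the lr_scheduler_builder above: LinearDecay(steps=ppo_steps) is used as the lr_lambda of PyTorch's LambdaLR, so the learning rate is scaled linearly from its initial value towards zero over training. The snippet below sketches the same kind of schedule using only PyTorch, with a hand-written linear decay standing in for allenact's LinearDecay; the stepping granularity here is illustrative only.

import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import LambdaLR

ppo_steps = int(1e6)
model = nn.Linear(4, 2)
optimizer = optim.Adam(model.parameters(), lr=2.5e-4)

# Hand-written linear decay from 1.0 to 0.0 over ppo_steps scheduler updates; in the
# pipeline above, allenact's LinearDecay(steps=ppo_steps) plays this role as lr_lambda.
scheduler = LambdaLR(
    optimizer, lr_lambda=lambda step: max(0.0, 1.0 - step / ppo_steps)
)

for _ in range(3):
    optimizer.step()   # (gradients omitted in this sketch)
    scheduler.step()   # the learning rate shrinks toward zero as training progresses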
Alternatively, we could use a more sophisticated pipeline that begins training with dataset aggregation (DAgger) before moving to training with PPO. This requires the existence of an expert (implemented in the task definition) that provides optimal actions to the agent. We have implemented such a pipeline by extending the above configuration as follows:
class ObjectNavThorDaggerThenPPOExperimentConfig(ObjectNavThorPPOExperimentConfig):
    ...

    SENSORS = [
        RGBSensorThor(
            # Note: constants defined on the parent class must be referenced
            # explicitly inside a subclass body.
            height=ObjectNavThorPPOExperimentConfig.SCREEN_SIZE,
            width=ObjectNavThorPPOExperimentConfig.SCREEN_SIZE,
            use_resnet_normalization=True,
        ),
        GoalObjectTypeThorSensor(
            object_types=ObjectNavThorPPOExperimentConfig.OBJECT_TYPES
        ),
        ExpertActionSensor(nactions=6),  # Notice that we have added an expert action sensor.
    ]

    ...

    @classmethod
    def training_pipeline(cls, **kwargs):
        dagger_steps = int(1e4)  # Much smaller number of steps as we're using imitation learning
        ppo_steps = int(1e6)
        lr = 2.5e-4
        num_mini_batch = 1 if not torch.cuda.is_available() else 6
        update_repeats = 4
        num_steps = 128
        metric_accumulate_interval = cls.MAX_STEPS * 10  # Log every 10 max length tasks
        save_interval = 10000
        gamma = 0.99
        use_gae = True
        gae_lambda = 1.0
        max_grad_norm = 0.5
        return TrainingPipeline(
            save_interval=save_interval,
            metric_accumulate_interval=metric_accumulate_interval,
            optimizer_builder=Builder(optim.Adam, dict(lr=lr)),
            num_mini_batch=num_mini_batch,
            update_repeats=update_repeats,
            max_grad_norm=max_grad_norm,
            num_steps=num_steps,
            named_losses={
                "ppo_loss": PPO(clip_decay=LinearDecay(ppo_steps), **PPOConfig),
                "imitation_loss": Imitation(),  # We add an imitation loss.
            },
            gamma=gamma,
            use_gae=use_gae,
            gae_lambda=gae_lambda,
            advance_scene_rollout_period=cls.ADVANCE_SCENE_ROLLOUT_PERIOD,
            # The pipeline now has two stages. In the first we use DAgger
            # (imitation loss + teacher forcing); in the second stage we no
            # longer use teacher forcing and train with the PPO loss instead.
            pipeline_stages=[
                PipelineStage(
                    loss_names=["imitation_loss"],
                    teacher_forcing=LinearDecay(
                        startp=1.0, endp=0.0, steps=dagger_steps,
                    ),
                    max_stage_steps=dagger_steps,
                ),
                PipelineStage(loss_names=["ppo_loss"], max_stage_steps=ppo_steps,),
            ],
            lr_scheduler_builder=Builder(
                LambdaLR, {"lr_lambda": LinearDecay(steps=ppo_steps)}
            ),
        )
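The teacher_forcing argument deserves a brief note: during the DAgger stage, the expert's action replaces the agent's sampled action with a probability that LinearDecay(startp=1.0, endp=0.0, steps=dagger_steps) anneals from 1 to 0, so the agent starts by always following the expert and finishes the stage acting on its own. The following sketch illustrates the schedule's semantics with a hand-rolled interpolation; it is illustrative only and not allenact's implementation.

import random

dagger_steps = int(1e4)

def teacher_forcing_prob(step: int, startp: float = 1.0, endp: float = 0.0) -> float:
    """Linear interpolation from startp to endp over dagger_steps (illustrative only)."""
    frac = min(max(step / dagger_steps, 0.0), 1.0)
    return startp + (endp - startp) * frac

def choose_action(step: int, expert_action: int, policy_action: int) -> int:
    # With probability teacher_forcing_prob(step) the expert's action is used.
    if random.random() < teacher_forcing_prob(step):
        return expert_action
    return policy_action

for step in (0, dagger_steps // 2, dagger_steps):
    print(step, teacher_forcing_prob(step))  # 1.0 at the start, 0.5 midway, 0.0 at the end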
A version of our experiment config file for which we have implemented this two-stage training can be found here.
This two-stage configuration, ObjectNavThorDaggerThenPPOExperimentConfig, is actually implemented as a subclass of
ObjectNavThorPPOExperimentConfig.
This is a common pattern used in AllenAct and lets one skip a great deal of boilerplate when defining a new
experiment as a slight modification of an old one. Of course, one must then be careful: changes to the superclass
configuration will propagate to all subclassed configurations.
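To illustrate how little code such a modification can require, a hypothetical variant that only changes the scenes used for training, validation, and testing could look like the sketch below; the scene name FloorPlan2_physics and the new tag are illustrative assumptions.

# Hypothetical variant (illustration only): identical to the PPO config except that
# training, validation, and testing all use a different iTHOR scene. The model,
# training pipeline, machine parameters, and sensors are all inherited unchanged.
class ObjectNavThorPPOFloorPlan2ExperimentConfig(ObjectNavThorPPOExperimentConfig):
    TRAIN_SCENES = ["FloorPlan2_physics"]
    VALID_SCENES = ["FloorPlan2_physics"]
    TEST_SCENES = ["FloorPlan2_physics"]

    @classmethod
    def tag(cls):
        return "ObjectNavThorPPO-FloorPlan2"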
Machine configuration#
In machine_params we define machine configuration parameters that will be used for training, validation, and testing:
class ObjectNavThorPPOExperimentConfig(ExperimentConfig):
    ...

    @classmethod
    def machine_params(cls, mode="train", **kwargs):
        num_gpus = torch.cuda.device_count()
        has_gpu = num_gpus != 0

        if mode == "train":
            nprocesses = 20 if has_gpu else 4
            gpu_ids = [0] if has_gpu else []
        elif mode == "valid":
            nprocesses = 1
            gpu_ids = [1 % num_gpus] if has_gpu else []
        elif mode == "test":
            nprocesses = 1
            gpu_ids = [0] if has_gpu else []
        else:
            raise NotImplementedError("mode must be 'train', 'valid', or 'test'.")

        return {"nprocesses": nprocesses, "gpu_ids": gpu_ids}

    ...
In the above we use the availability of CUDA (torch.cuda.device_count() != 0) to determine whether
we should use parameters appropriate for a local machine or for a server. We might optionally add a list of
sampler_devices to assign devices (likely those not used for running our agent's model) to the task-sampling workers,
as sketched below.
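For example, on a machine with several GPUs we might reserve GPU 0 for the agent's model and spread the task-sampling (environment) workers over the remaining devices. The sketch below shows how the train branch of machine_params could be extended; the "sampler_devices" key is an assumption about the machine-parameter format accepted by the training runner, so consult the ExperimentConfig documentation for the exact fields it supports.

import torch

# Sketch only: reserve GPU 0 for the agent's model and hand the remaining GPUs to
# the task-sampling workers.
def train_machine_params_sketch():
    num_gpus = torch.cuda.device_count()
    if num_gpus == 0:
        return {"nprocesses": 4, "gpu_ids": []}
    return {
        "nprocesses": 20,
        "gpu_ids": [0],                                      # device(s) running the model
        "sampler_devices": list(range(1, num_gpus)) or [0],  # devices for environments
    }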
Task sampling#
The above has defined the model we'd like to use, the types of losses we wish to use during training,
and the machine-specific parameters that should be used during training. Critically, we have not yet
defined which task we wish to train our agent to complete. This is done by implementing the
ExperimentConfig.make_sampler_fn function:
class ObjectNavThorPPOExperimentConfig(ExperimentConfig):
    ...

    @classmethod
    def make_sampler_fn(cls, **kwargs) -> TaskSampler:
        return ObjectNavTaskSampler(**kwargs)

    ...
Now, before training starts, our trainer will know to generate a collection of task
samplers using make_sampler_fn for training (and possibly validation or testing).
The kwargs parameters used in the above function call can be different for each
training process; we implement such differences using the
ExperimentConfig.train_task_sampler_args function:
class ObjectNavThorPPOExperimentConfig(ExperimentConfig):
    ...

    def train_task_sampler_args(
        self,
        process_ind: int,
        total_processes: int,
        devices: Optional[List[int]] = None,
        seeds: Optional[List[int]] = None,
        deterministic_cudnn: bool = False,
    ) -> Dict[str, Any]:
        res = self._get_sampler_args_for_scene_split(
            self.TRAIN_SCENES,
            process_ind,
            total_processes,
            seeds=seeds,
            deterministic_cudnn=deterministic_cudnn,
        )
        res["scene_period"] = "manual"
        res["env_args"] = {}
        res["env_args"].update(self.ENV_ARGS)
        res["env_args"]["x_display"] = (
            ("0.%d" % devices[process_ind % len(devices)])
            if devices is not None and len(devices) > 0
            else None
        )
        return res

    ...
Now training process i out of n total processes will be instantiated with the parameters
ObjectNavThorPPOExperimentConfig.train_task_sampler_args(i, n, ...). Similar functions
(valid_task_sampler_args and test_task_sampler_args) exist for generating validation
and test parameters. Note also that through this function we can assign the devices on which
each worker's environment runs. See the documentation of ExperimentConfig for more information.
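For instance, a validation counterpart might reuse the same scene-splitting helper with VALID_SCENES, as in the sketch below. This is an illustrative sketch rather than the implementation from the linked tutorial file; additional keys (such as a cap on the number of sampled tasks) depend on what ObjectNavTaskSampler accepts and are omitted here.

class ObjectNavThorPPOExperimentConfig(ExperimentConfig):
    ...

    def valid_task_sampler_args(
        self,
        process_ind: int,
        total_processes: int,
        devices: Optional[List[int]] = None,
        seeds: Optional[List[int]] = None,
        deterministic_cudnn: bool = False,
    ) -> Dict[str, Any]:
        # Reuse the same scene-splitting helper, but with the validation scenes.
        res = self._get_sampler_args_for_scene_split(
            self.VALID_SCENES,
            process_ind,
            total_processes,
            seeds=seeds,
            deterministic_cudnn=deterministic_cudnn,
        )
        res["env_args"] = {**self.ENV_ARGS}
        return res

    ...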
Running the experiment#
We are now in a position to run the experiment (with seed 12345) using the command:
python main.py object_nav_ithor_ppo_one_object -b projects/tutorials -s 12345