Defining a new training pipeline

Defining a new training pipeline, or even a new learning algorithm, is straightforward thanks to AllenAct's modular design.

A convenience Builder object allows us to defer the instantiation of objects of the class passed as its first argument, while still letting us pass additional keyword arguments to its initializer.
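
For example, the optimizer used in the pipelines below is wrapped in a Builder so that it is only constructed later by the training engine, once the model parameters are available. A minimal sketch of this deferred construction (the import path and the exact call made by the engine are assumptions that may vary across AllenAct versions):

import torch.optim as optim

from allenact.utils.experiment_utils import Builder  # path in recent AllenAct versions

# Nothing is instantiated here; Builder only records the class and its kwargs.
optimizer_builder = Builder(optim.Adam, dict(lr=2.5e-4))

# Later, the training engine constructs the optimizer, roughly as:
# optimizer = optimizer_builder(params=actor_critic.parameters())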

On-policy

We can implement a training pipeline that trains with a single PPO stage:

class ObjectNavThorPPOExperimentConfig(ExperimentConfig):
    ...
    @classmethod
    def training_pipeline(cls, **kwargs):
        ppo_steps = int(1e6)
        lr = 2.5e-4
        num_mini_batch = 2 if not torch.cuda.is_available() else 6  # minibatches per update repeat
        update_repeats = 4  # passes (PPO epochs) over each collected rollout
        num_steps = 128  # rollout length: steps collected per sampler between updates
        metric_accumulate_interval = cls.MAX_STEPS * 10  # Log every 10 max length tasks
        save_interval = 10000
        gamma = 0.99
        use_gae = True
        gae_lambda = 1.0
        max_grad_norm = 0.5

        return TrainingPipeline(
            save_interval=save_interval,
            metric_accumulate_interval=metric_accumulate_interval,
            optimizer_builder=Builder(optim.Adam, dict(lr=lr)),
            num_mini_batch=num_mini_batch,
            update_repeats=update_repeats,
            max_grad_norm=max_grad_norm,
            num_steps=num_steps,
            named_losses={
                "ppo_loss": PPO(clip_decay=LinearDecay(ppo_steps), **PPOConfig),
            },
            gamma=gamma,
            use_gae=use_gae,
            gae_lambda=gae_lambda,
            advance_scene_rollout_period=cls.ADVANCE_SCENE_ROLLOUT_PERIOD,
            pipeline_stages=[
                PipelineStage(loss_names=["ppo_loss"], max_stage_steps=ppo_steps,),
            ],
            lr_scheduler_builder=Builder(
                LambdaLR, {"lr_lambda": LinearDecay(steps=ppo_steps)}
            ),
        )
    ...
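
For reference, the lr_scheduler_builder above pairs LambdaLR with LinearDecay; assuming LinearDecay's defaults of startp=1.0 and endp=0.0, the learning rate is scaled by a factor that anneals linearly from 1 to 0 over ppo_steps. A rough sketch of the resulting schedule:

# Hedged sketch of the schedule produced by LambdaLR + LinearDecay(steps=ppo_steps),
# assuming the multiplicative factor falls linearly from 1.0 to 0.0:
ppo_steps = int(1e6)
lr = 2.5e-4

def effective_lr(t: int) -> float:
    # LambdaLR multiplies the base lr by the schedule's value at step t.
    return lr * max(0.0, 1.0 - t / ppo_steps)

# effective_lr(0)          -> 2.5e-4
# effective_lr(500_000)    -> 1.25e-4
# effective_lr(1_000_000)  -> 0.0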

Alternatively, we could use a more complex pipeline that includes dataset aggregation (DAgger). This requires an expert (implemented in the task definition) that provides optimal actions to the agent. We have implemented such a pipeline by extending the above configuration as follows:

class ObjectNavThorDaggerThenPPOExperimentConfig(ExperimentConfig):
    ...
    SENSORS = [
        ...
        ExpertActionSensor(nactions=6), # Notice that we have added
                                        # an expert action sensor.
    ]
    ...
    @classmethod
    def training_pipeline(cls, **kwargs):
        dagger_steps = int(1e4) # Much smaller number of steps as we're using imitation learning
        ppo_steps = int(1e6)
        lr = 2.5e-4
        num_mini_batch = 1 if not torch.cuda.is_available() else 6
        update_repeats = 4
        num_steps = 128
        metric_accumulate_interval = cls.MAX_STEPS * 10  # Log every 10 max length tasks
        save_interval = 10000
        gamma = 0.99
        use_gae = True
        gae_lambda = 1.0
        max_grad_norm = 0.5

        return TrainingPipeline(
            save_interval=save_interval,
            metric_accumulate_interval=metric_accumulate_interval,
            optimizer_builder=Builder(optim.Adam, dict(lr=lr)),
            num_mini_batch=num_mini_batch,
            update_repeats=update_repeats,
            max_grad_norm=max_grad_norm,
            num_steps=num_steps,
            named_losses={
                "ppo_loss": PPO(clip_decay=LinearDecay(ppo_steps), **PPOConfig),
                "imitation_loss": Imitation(), # We add an imitation loss.
            },
            gamma=gamma,
            use_gae=use_gae,
            gae_lambda=gae_lambda,
            advance_scene_rollout_period=cls.ADVANCE_SCENE_ROLLOUT_PERIOD,
            pipeline_stages=[ # The pipeline now has two stages, in the first
                              # we use DAgger (imitation loss + teacher forcing).
                              # In the second stage we no longer use teacher
                              # forcing and add in the ppo loss.
                PipelineStage(
                    loss_names=["imitation_loss"],
                    teacher_forcing=LinearDecay(
                        startp=1.0, endp=0.0, steps=dagger_steps,
                    ),
                    max_stage_steps=dagger_steps,
                ),
                PipelineStage(loss_names=["ppo_loss"], max_stage_steps=ppo_steps,),
            ],
            lr_scheduler_builder=Builder(
                LambdaLR, {"lr_lambda": LinearDecay(steps=ppo_steps)}
            ),
        )
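
Here teacher_forcing controls the probability with which the expert's action replaces the policy's sampled action during the DAgger stage. A minimal sketch of that schedule, assuming LinearDecay interpolates linearly between startp and endp when called with the current step count:

from allenact.utils.experiment_utils import LinearDecay  # path in recent AllenAct versions

dagger_steps = int(1e4)
teacher_forcing = LinearDecay(startp=1.0, endp=0.0, steps=dagger_steps)

# teacher_forcing(0)                  -> 1.0   always follow the expert
# teacher_forcing(dagger_steps // 2)  -> ~0.5  expert and policy actions are mixed
# teacher_forcing(dagger_steps)       -> 0.0   the policy acts entirely on its own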

Off-policy

We can also define off-policy stages in which an external dataset is used, in this case for Behavior Cloning:

class BCOffPolicyBabyAIGoToLocalExperimentConfig(ExperimentConfig):
    ...
    @classmethod
    def training_pipeline(cls, **kwargs):
        total_train_steps = int(1e7)
        num_steps = 128
        return TrainingPipeline(
            save_interval=10000,  # Save every 10000 steps (approximately)
            metric_accumulate_interval=1,
            optimizer_builder=Builder(optim.Adam, dict(lr=2.5e-4)),
            num_mini_batch=0,  # no on-policy training
            update_repeats=0,  # no on-policy training
            num_steps=num_steps // 4,  # rollouts from environment tasks
            named_losses={
                "offpolicy_expert_ce_loss": MiniGridOffPolicyExpertCELoss(
                    total_episodes_in_epoch=int(1e6)  # dataset contains 1M episodes
                ),
            },
            gamma=0.99,
            use_gae=True,
            gae_lambda=1.0,
            max_grad_norm=0.5,
            advance_scene_rollout_period=None,
            pipeline_stages=[
                PipelineStage(
                    loss_names=[],  # no on-policy losses
                    max_stage_steps=total_train_steps,
                    # We only train from off-policy data:
                    offpolicy_component=OffPolicyPipelineComponent(
                        data_iterator_builder=lambda **kwargs: create_minigrid_offpolicy_data_iterator(
                            path=DATASET_PATH,  # external dataset
                            nrollouts=128,  # per trainer batch size
                            rollout_len=num_steps,  # For truncated-BPTT
                            instr_len=5,
                            **kwargs,
                        ),
                        loss_names=["offpolicy_expert_ce_loss"],  # off-policy losses
                        updates=16,  # 16 batches per rollout
                    ),
                ),
            ],
        )

Note that, in this example, 128 / 4 = 32 steps are sampled from tasks in a MiniGrid environment during each rollout (which can be useful for tracking the agent's performance), while a subgraph of the model (in this case the entire actor) is trained on batches of 128-step truncated episodes sampled from the offline dataset stored under DATASET_PATH.
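
As a rough per-rollout accounting of the configuration above (a sketch assuming each off-policy batch holds nrollouts sequences of rollout_len steps and that updates counts off-policy batches per on-policy rollout):

# Hedged per-rollout accounting for the off-policy configuration above:
num_steps = 128

on_policy_env_steps = num_steps // 4       # steps gathered from MiniGrid tasks
off_policy_frames = 16 * 128 * num_steps   # updates * nrollouts * rollout_len

assert on_policy_env_steps == 32
assert off_policy_frames == 262_144        # frames read from the offline dataset per rollout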