# Defining a new task
In order to use new tasks in our experiments, we need to define two classes:

- A `Task`, including, among others, a `step` implementation providing an `RLStepResult`, a `metrics` method providing quantitative performance measurements for agents, and, optionally, a `query_expert` method that can be used e.g. with an imitation loss during training.
- A `TaskSampler` that allows instantiating new `Task`s for the agents to solve during training, validation and testing.
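Before diving in, the following minimal sketch shows how the two classes fit together. It is a schematic outline only: the class names `MyTask`/`MyTaskSampler` are placeholders, only a subset of the required methods is shown, and the import paths assume a recent AllenAct version (adjust them if needed). Every method is implemented properly in the rest of this tutorial.

```python
# Schematic outline only: the real implementations are developed step by step below.
from typing import Any, Dict, Optional, Tuple

from allenact.base_abstractions.misc import RLStepResult
from allenact.base_abstractions.task import Task, TaskSampler


class MyTask(Task):
    def _step(self, action) -> RLStepResult: ...            # advance the episode by one action
    def reached_terminal_state(self) -> bool: ...           # has the episode ended?
    def metrics(self) -> Dict[str, Any]: ...                # quantitative evaluation of the episode
    def query_expert(self, **kwargs) -> Tuple[int, bool]: ...  # optional: expert action for imitation


class MyTaskSampler(TaskSampler):
    def next_task(self) -> Optional[MyTask]: ...            # instantiate a new task for the agent
    def reset(self) -> None: ...                             # restart sampling (e.g. for a new evaluation run)
```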
## Task
Let's define a semantic navigation task in which agents have to navigate from a starting point in an environment to an object of a specific class, using a minimal number of steps and deciding on their own when the goal has been reached.
We need to define the methods `action_space`, `render`, `_step`, `reached_terminal_state`, `class_action_names`, `close`, `metrics`, and `query_expert` from the base `Task` definition.
### Initialization, action space and termination
Let's start with the definition of the action space and task initialization:
```python
...
from allenact_plugins.ithor_plugin.ithor_constants import (
    MOVE_AHEAD,
    ROTATE_LEFT,
    ROTATE_RIGHT,
    LOOK_DOWN,
    LOOK_UP,
    END,
)
...

class ObjectNaviThorGridTask(Task[IThorEnvironment]):
    _actions = (MOVE_AHEAD, ROTATE_LEFT, ROTATE_RIGHT, LOOK_DOWN, LOOK_UP, END)

    def __init__(
        self,
        env: IThorEnvironment,
        sensors: List[Sensor],
        task_info: Dict[str, Any],
        max_steps: int,
        **kwargs
    ) -> None:
        super().__init__(
            env=env, sensors=sensors, task_info=task_info, max_steps=max_steps, **kwargs
        )
        self._took_end_action: bool = False
        self._success: Optional[bool] = False

    @property
    def action_space(self):
        return gym.spaces.Discrete(len(self._actions))

    @classmethod
    def class_action_names(cls) -> Tuple[str, ...]:
        return cls._actions

    def reached_terminal_state(self) -> bool:
        return self._took_end_action

    def close(self) -> None:
        self.env.stop()
...
```
### Step method

Next, we define the main method `_step` that will be called every time the agent produces a new action:
```python
class ObjectNaviThorGridTask(Task[IThorEnvironment]):
    ...
    def _step(self, action: Union[int, Sequence[int]]) -> RLStepResult:
        assert isinstance(action, int)
        action = cast(int, action)

        action_str = self.class_action_names()[action]

        if action_str == END:
            # The agent declares the episode finished; success depends on the goal being visible.
            self._took_end_action = True
            self._success = self.is_goal_object_visible()
            self.last_action_success = self._success
        else:
            self.env.step({"action": action_str})
            self.last_action_success = self.env.last_action_success

        step_result = RLStepResult(
            observation=self.get_observations(),
            reward=self.judge(),
            done=self.is_done(),
            info={"last_action_success": self.last_action_success},
        )
        return step_result
    ...
    def is_goal_object_visible(self) -> bool:
        return any(
            o["objectType"] == self.task_info["object_type"]
            for o in self.env.visible_objects()
        )

    def judge(self) -> float:
        reward = -0.01  # small per-step penalty to encourage short episodes

        if not self.last_action_success:
            reward += -0.03  # extra penalty for failed actions

        if self._took_end_action:
            reward += 1.0 if self._success else -1.0  # terminal reward for (un)successful END

        return float(reward)
```
### Metrics, rendering and expert actions
Finally, we define methods to render and evaluate the current task, and optionally generate expert actions to be used e.g. for DAgger training.
```python
class ObjectNaviThorGridTask(Task[IThorEnvironment]):
    ...
    def render(self, mode: str = "rgb", *args, **kwargs) -> numpy.ndarray:
        assert mode == "rgb", "only rgb rendering is implemented"
        return self.env.current_frame

    def metrics(self) -> Dict[str, Any]:
        if not self.is_done():
            return {}
        else:
            return {"success": self._success, "ep_length": self.num_steps_taken()}

    def query_expert(self, **kwargs) -> Tuple[int, bool]:
        return my_objnav_expert_implementation(self)
```
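The expert itself is task-specific and not part of this tutorial; `my_objnav_expert_implementation` above is a placeholder. Purely as an illustration (this is not AllenAct's actual expert), a greedy expert could return `END` as soon as the goal object is visible and otherwise follow some planner. In the sketch below, `shortest_path_next_action` is a hypothetical helper that does not exist under this name in AllenAct or AI2-THOR:

```python
def my_objnav_expert_implementation(task: ObjectNaviThorGridTask) -> Tuple[int, bool]:
    """Toy expert sketch returning (action_index, expert_action_is_valid)."""
    if task.is_goal_object_visible():
        # The goal is already in view: the expert ends the episode.
        return task.class_action_names().index(END), True

    action_str = shortest_path_next_action(task)  # hypothetical planner call
    if action_str is None:
        # No path to the goal was found: report that no expert action is available.
        return 0, False
    return task.class_action_names().index(action_str), True
```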
## TaskSampler

We also need to define the corresponding `TaskSampler`, which must contain implementations for `length`, `total_unique`, `last_sampled_task`, `next_task`, `close`, `reset`, and `set_seed`. Currently, an additional property `all_observation_spaces_equal` is used to ensure compatibility with the current `RolloutStorage`.

Let's define a task sampler able to provide an infinite number of object navigation tasks for AI2-THOR.
### Initialization and termination
```python
class ObjectNavTaskSampler(TaskSampler):
    def __init__(
        self,
        scenes: List[str],
        object_types: List[str],
        sensors: List[Sensor],
        max_steps: int,
        env_args: Dict[str, Any],
        action_space: gym.Space,
        seed: Optional[int] = None,
        deterministic_cudnn: bool = False,
        *args,
        **kwargs
    ) -> None:
        self.env_args = env_args
        self.scenes = scenes
        self.object_types = object_types
        self.grid_size = 0.25
        self.env: Optional[IThorEnvironment] = None
        self.sensors = sensors
        self.max_steps = max_steps
        self._action_space = action_space

        self.scene_id: Optional[int] = None
        self._last_sampled_task: Optional[ObjectNaviThorGridTask] = None

        set_seed(seed)
        self.reset()

    def close(self) -> None:
        if self.env is not None:
            self.env.stop()

    def reset(self):
        self.scene_id = 0

    def _create_environment(self) -> IThorEnvironment:
        env = IThorEnvironment(
            make_agents_visible=False,
            object_open_speed=0.05,
            restrict_to_initially_reachable_points=True,
            **self.env_args,
        )
        return env
```
### Task sampling
Finally, we need to define methods to determine the number of available tasks (possibly infinite) and sample tasks:
```python
class ObjectNavTaskSampler(TaskSampler):
    ...
    @property
    def length(self) -> Union[int, float]:
        return float("inf")

    @property
    def total_unique(self) -> Optional[Union[int, float]]:
        return None

    @property
    def last_sampled_task(self) -> Optional[ObjectNaviThorGridTask]:
        return self._last_sampled_task

    @property
    def all_observation_spaces_equal(self) -> bool:
        return True

    def next_task(self) -> Optional[ObjectNaviThorGridTask]:
        # Pick a random scene and make sure the environment is running it.
        self.scene_id = random.randint(0, len(self.scenes) - 1)
        scene = self.scenes[self.scene_id]

        if self.env is not None:
            if scene != self.env.scene_name:
                self.env.reset(scene)
        else:
            self.env = self._create_environment()
            self.env.reset(scene_name=scene)

        self.env.randomize_agent_location()

        # Pick a random goal object type for this episode.
        task_info = {"object_type": random.choice(self.object_types)}

        self._last_sampled_task = ObjectNaviThorGridTask(
            env=self.env,
            sensors=self.sensors,
            task_info=task_info,
            max_steps=self.max_steps,
            action_space=self._action_space,
        )
        return self._last_sampled_task
```
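With both classes in place, a quick sanity check is to sample a single task and drive it with random actions. The snippet below is only a sketch, assuming the classes and imports defined above are in scope; the scene name, object types, and empty `env_args` are placeholder values that depend on your AI2-THOR installation:

```python
# Minimal smoke test: sample one task and act randomly until it finishes.
sampler = ObjectNavTaskSampler(
    scenes=["FloorPlan1"],       # placeholder: any valid iTHOR scene name
    object_types=["Tomato", "Mug"],
    sensors=[],                  # no sensors -> empty observations, fine for a smoke test
    max_steps=128,
    env_args={},                 # placeholder: your AI2-THOR settings go here
    action_space=gym.spaces.Discrete(
        len(ObjectNaviThorGridTask.class_action_names())
    ),
)

task = sampler.next_task()
while not task.is_done():
    # Task.step(...) from the base class calls the _step(...) we defined above.
    task.step(int(task.action_space.sample()))

print(task.metrics())  # e.g. {"success": False, "ep_length": 128}
sampler.close()
```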