Skip to content

feat: Implement CursorReplayStrategy with Visual Feedback and Self-Correction #952

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions experiments/cursor/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Package containing cursor movement experiments."""
207 changes: 207 additions & 0 deletions experiments/cursor/grid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
"""Grid-based cursor movement experiment.

This approach divides the screen into a grid and uses AI feedback to identify
which cell contains the target, then refines the position within that cell.
"""

import cv2
import numpy as np

from openadapt import models
from openadapt.custom_logger import logger
from openadapt.strategies.cursor import CursorReplayStrategy


class GridCursorStrategy(CursorReplayStrategy):
    """Grid-based cursor movement strategy.

    The screenshot is overlaid with a ``rows x cols`` grid, the cell that
    contains the target is identified, and that cell is subdivided again up to
    ``refinement_steps`` times before a click is issued at the last position.

    All geometry in ``current_grid`` is expressed in absolute screenshot
    pixels: the dict stores the top-left corner of the active region
    (``offset_x``/``offset_y``) next to its size, so cell centres stay correct
    after any number of refinements.
    """

    def __init__(
        self,
        recording: models.Recording,
        grid_size: tuple[int, int] = (4, 4),  # 4x4 grid by default
        refinement_steps: int = 2,  # Number of times to subdivide target cell
    ) -> None:
        """Initialize the GridCursorStrategy.

        Args:
            recording (models.Recording): The recording object.
            grid_size (tuple[int, int]): Number of rows and columns in the grid.
            refinement_steps (int): Number of times to subdivide the target cell.

        Raises:
            ValueError: If ``grid_size`` contains a value smaller than 1.
        """
        rows, cols = grid_size
        if rows < 1 or cols < 1:
            # Fail early with a clear message instead of a ZeroDivisionError
            # later, when cell sizes are computed.
            raise ValueError(
                f"grid_size must contain positive values, got {grid_size!r}"
            )

        super().__init__(recording, approach="grid")
        self.grid_size = grid_size
        self.refinement_steps = refinement_steps
        self.current_grid = None
        self.current_cell = None
        self.refinement_step = 0

    def _init_grid_approach(
        self, screenshot: models.Screenshot, window_event: models.WindowEvent
    ) -> models.ActionEvent:
        """Initialize the grid-based cursor movement approach.

        Builds a grid that spans the whole screenshot, resets the refinement
        counter and moves the cursor to the centre of the identified cell.

        Args:
            screenshot (models.Screenshot): The current screenshot.
            window_event (models.WindowEvent): The current window event.

        Returns:
            models.ActionEvent: The initial action for the grid approach.
        """
        height, width = screenshot.image.shape[:2]

        # A fresh grid starts a fresh refinement sequence.
        self.refinement_step = 0
        self.current_grid = self._make_grid(
            offset_x=0, offset_y=0, width=width, height=height
        )
        return self._move_to_target_cell(screenshot, window_event)

    def _next_grid_action(
        self, screenshot: models.Screenshot, window_event: models.WindowEvent
    ) -> models.ActionEvent:
        """Get the next action for the grid-based approach.

        While refinement steps remain, the current cell is subdivided and the
        cursor is moved to the centre of the newly identified sub-cell. Once
        all steps are used, a click is issued at the last recorded position.

        Args:
            screenshot (models.Screenshot): The current screenshot.
            window_event (models.WindowEvent): The current window event.

        Returns:
            models.ActionEvent: The next action for the grid approach.
        """
        if self.refinement_step >= self.refinement_steps:
            # We've finished refining, perform the click where the cursor
            # was last moved to.
            last_action = self.action_history[-1]
            return models.ActionEvent(
                name="mouse_click",
                mouse_x=last_action.mouse_x,
                mouse_y=last_action.mouse_y,
                window_event=window_event,
            )

        # Subdivide current cell into smaller grid
        self._refine_grid()
        self.refinement_step += 1

        return self._move_to_target_cell(screenshot, window_event)

    def _move_to_target_cell(
        self, screenshot: models.Screenshot, window_event: models.WindowEvent
    ) -> models.ActionEvent:
        """Identify the target cell of the current grid and move to its centre.

        Args:
            screenshot (models.Screenshot): The current screenshot.
            window_event (models.WindowEvent): The current window event.

        Returns:
            models.ActionEvent: A ``mouse_move`` action to the cell centre.
        """
        # Draw on a copy so the screenshot itself is left untouched.
        img_with_grid = self._draw_grid(screenshot.image.copy())

        target_cell = self._identify_target_cell(img_with_grid, window_event)
        self.current_cell = target_cell
        (
            self.current_grid['target_row'],
            self.current_grid['target_col'],
        ) = target_cell

        x, y = self._get_cell_center(target_cell)
        return models.ActionEvent(
            name="mouse_move",
            mouse_x=x,
            mouse_y=y,
            window_event=window_event,
        )

    def _make_grid(
        self, offset_x: int, offset_y: int, width: int, height: int
    ) -> dict:
        """Build the grid description for a rectangular screenshot region.

        Args:
            offset_x (int): Absolute x coordinate of the region's left edge.
            offset_y (int): Absolute y coordinate of the region's top edge.
            width (int): Width of the region in pixels.
            height (int): Height of the region in pixels.

        Returns:
            dict: Grid description with the same keys for every grid level.
        """
        rows, cols = self.grid_size
        return {
            'height': height,
            'width': width,
            'rows': rows,
            'cols': cols,
            'cell_height': height // rows,
            'cell_width': width // cols,
            'offset_x': offset_x,
            'offset_y': offset_y,
            'target_row': None,
            'target_col': None,
        }

    def _draw_grid(self, img: np.ndarray) -> np.ndarray:
        """Draw the current grid on the image.

        The lines cover the region described by ``current_grid``; when no grid
        has been created yet, the configured grid is drawn over the whole
        image.

        Args:
            img (np.ndarray): The image to draw on (modified in place).

        Returns:
            np.ndarray: The image with grid lines drawn.
        """
        grid = self.current_grid
        if grid is None:
            height, width = img.shape[:2]
            rows, cols = self.grid_size
            left = top = 0
        else:
            height, width = grid['height'], grid['width']
            rows, cols = grid['rows'], grid['cols']
            left = grid.get('offset_x', 0)
            top = grid.get('offset_y', 0)

        right = left + width
        bottom = top + height

        # Draw vertical lines
        for i in range(cols + 1):
            x = left + (width * i) // cols
            cv2.line(img, (x, top), (x, bottom), (0, 255, 0), 1)

        # Draw horizontal lines
        for i in range(rows + 1):
            y = top + (height * i) // rows
            cv2.line(img, (left, y), (right, y), (0, 255, 0), 1)

        return img

    def _identify_target_cell(
        self, img_with_grid: np.ndarray, window_event: models.WindowEvent
    ) -> tuple[int, int]:
        """Ask the model to identify which grid cell contains the target.

        Currently a placeholder: no model is consulted and both arguments are
        ignored.

        Args:
            img_with_grid (np.ndarray): Screenshot with grid overlay.
            window_event (models.WindowEvent): Current window event.

        Returns:
            tuple[int, int]: (row, col) of the identified target cell.
        """
        # TODO: Implement model prompting to identify target cell
        # For now, return center cell
        return (self.grid_size[0] // 2, self.grid_size[1] // 2)

    def _cell_origin(self, cell: tuple[int, int]) -> tuple[int, int]:
        """Get the absolute top-left corner of a cell in the current grid.

        Args:
            cell (tuple[int, int]): (row, col) of the cell.

        Returns:
            tuple[int, int]: (x, y) coordinates of the cell's top-left corner.
        """
        row, col = cell
        grid = self.current_grid
        # ``.get`` keeps grids without offset keys working (offset of zero).
        x = grid.get('offset_x', 0) + col * grid['cell_width']
        y = grid.get('offset_y', 0) + row * grid['cell_height']
        return x, y

    def _get_cell_center(self, cell: tuple[int, int]) -> tuple[int, int]:
        """Get the center coordinates of a grid cell.

        The result is in absolute screenshot coordinates, i.e. it includes the
        offset of the current (possibly refined) grid.

        Args:
            cell (tuple[int, int]): (row, col) of the cell.

        Returns:
            tuple[int, int]: (x, y) coordinates of cell center.
        """
        x, y = self._cell_origin(cell)
        return (
            x + self.current_grid['cell_width'] // 2,
            y + self.current_grid['cell_height'] // 2,
        )

    def _refine_grid(self) -> None:
        """Subdivide the current cell into a finer grid.

        The new grid covers exactly the current cell; its offset is the cell's
        absolute top-left corner, so offsets accumulate across refinements.
        """
        offset_x, offset_y = self._cell_origin(self.current_cell)

        # Update grid to focus on current cell
        self.current_grid = self._make_grid(
            offset_x=offset_x,
            offset_y=offset_y,
            width=self.current_grid['cell_width'],
            height=self.current_grid['cell_height'],
        )
184 changes: 184 additions & 0 deletions experiments/cursor/test_grid.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
"""Test script for evaluating the grid-based cursor movement approach."""

import time
from pathlib import Path
from typing import Optional

import cv2
import numpy as np

from experiments.cursor.grid import GridCursorStrategy
from openadapt import models, replay
from openadapt.custom_logger import logger


def create_test_recording(
    target_x: int,
    target_y: int,
    window_width: int = 800,
    window_height: int = 600,
) -> models.Recording:
    """Build a one-frame recording whose screenshot shows a single target dot.

    The frame is also written to
    ``experiments/cursor/test_data/test_target.png`` (relative to the current
    working directory).

    Args:
        target_x (int): Target X coordinate.
        target_y (int): Target Y coordinate.
        window_width (int): Width of the test window.
        window_height (int): Height of the test window.

    Returns:
        models.Recording: A recording with one screenshot, one window event
            and no action events.
    """
    # Black canvas; numpy images are indexed (row, column) = (height, width).
    canvas = np.zeros((window_height, window_width, 3), dtype=np.uint8)

    # Filled circle (thickness -1) of radius 5 marking the target in red.
    centre = (target_x, target_y)
    cv2.circle(canvas, centre, 5, (0, 0, 255), -1)

    # Persist the frame to disk.
    output_dir = Path("experiments/cursor/test_data")
    output_dir.mkdir(parents=True, exist_ok=True)
    cv2.imwrite(str(output_dir / "test_target.png"), canvas)

    frame = models.Screenshot(image=canvas, timestamp=time.time())

    # The window starts at the origin and has the same size as the canvas.
    window = models.WindowEvent(
        left=0,
        top=0,
        width=window_width,
        height=window_height,
        timestamp=time.time(),
    )

    # No actions needed for testing
    return models.Recording(
        screenshots=[frame],
        window_events=[window],
        action_events=[],
        timestamp=time.time(),
    )


def evaluate_grid_strategy(
    target_positions: list[tuple[int, int]],
    grid_sizes: Optional[list[tuple[int, int]]] = None,
    refinement_steps: Optional[list[int]] = None,
) -> dict:
    """Evaluate the grid-based cursor movement strategy.

    Every combination of grid size, refinement step count and target position
    is run once. Combinations whose run raises are logged and left out of the
    results, so all result columns always have the same length.

    Args:
        target_positions: List of (x, y) target positions to test.
        grid_sizes: List of (rows, cols) grid sizes to test. Defaults to
            ``[(2, 2), (4, 4), (8, 8)]``.
        refinement_steps: List of refinement step counts to test. Defaults to
            ``[1, 2, 3]``.

    Returns:
        dict: Evaluation results including accuracy and timing metrics, as
            parallel lists keyed by column name.
    """
    # Defaults are created per call rather than shared mutable objects in the
    # signature.
    if grid_sizes is None:
        grid_sizes = [(2, 2), (4, 4), (8, 8)]
    if refinement_steps is None:
        refinement_steps = [1, 2, 3]

    results = {
        'grid_size': [],
        'refinement_steps': [],
        'target_x': [],
        'target_y': [],
        'final_x': [],
        'final_y': [],
        'distance_error': [],
        'num_actions': [],
        'time_taken': [],
    }

    for grid_size in grid_sizes:
        for steps in refinement_steps:
            for target_x, target_y in target_positions:
                # Create test recording
                recording = create_test_recording(target_x, target_y)

                # Initialize strategy
                strategy = GridCursorStrategy(
                    recording=recording,
                    grid_size=grid_size,
                    refinement_steps=steps,
                )

                # Time only the strategy run, with a monotonic clock.
                start_time = time.perf_counter()

                try:
                    strategy.run()
                    time_taken = time.perf_counter() - start_time

                    # Get final position
                    final_action = strategy.action_history[-1]
                    final_x = final_action.mouse_x
                    final_y = final_action.mouse_y
                    num_actions = len(strategy.action_history)
                except Exception as e:
                    logger.exception(f"Error evaluating grid {grid_size} with {steps} "
                                     f"refinement steps at target ({target_x}, {target_y}): {e}")
                    continue

                # Euclidean distance between final cursor position and target.
                distance_error = np.sqrt(
                    (final_x - target_x) ** 2 +
                    (final_y - target_y) ** 2
                )

                # Record results only once every value is known, so a failed
                # run can never leave the columns with unequal lengths.
                results['grid_size'].append(f"{grid_size[0]}x{grid_size[1]}")
                results['refinement_steps'].append(steps)
                results['target_x'].append(target_x)
                results['target_y'].append(target_y)
                results['final_x'].append(final_x)
                results['final_y'].append(final_y)
                results['distance_error'].append(distance_error)
                results['num_actions'].append(num_actions)
                results['time_taken'].append(time_taken)

    return results


def main() -> None:
    """Run the grid strategy evaluation and print a summary.

    Prints overall averages followed by a per-grid-size breakdown. When no
    test case completed, only the case count and a notice are printed.
    """
    # Define test cases. The region labels assume the default 800x600 window
    # used by create_test_recording.
    target_positions = [
        (100, 100),  # Top-left region
        (700, 100),  # Top-right region
        (400, 300),  # Center region
        (100, 500),  # Bottom-left region
        (700, 500),  # Bottom-right region
    ]

    # Run evaluation
    results = evaluate_grid_strategy(target_positions)

    # Print summary
    print("\nGrid Strategy Evaluation Results:")
    print("---------------------------------")
    print(f"Total test cases: {len(results['grid_size'])}")

    if not results['grid_size']:
        # Averaging empty lists would only yield nan plus a RuntimeWarning.
        print("No test cases completed successfully; nothing to summarize.")
        return

    print(f"Average distance error: {np.mean(results['distance_error']):.2f} pixels")
    print(f"Average actions per target: {np.mean(results['num_actions']):.2f}")
    print(f"Average time per target: {np.mean(results['time_taken']):.2f} seconds")

    # Group by grid size in a single pass over the parallel result columns.
    by_grid_size = {}
    for grid_size, error, num_actions, time_taken in zip(
        results['grid_size'],
        results['distance_error'],
        results['num_actions'],
        results['time_taken'],
    ):
        errors, actions, times = by_grid_size.setdefault(grid_size, ([], [], []))
        errors.append(error)
        actions.append(num_actions)
        times.append(time_taken)

    print("\nResults by grid size:")
    for grid_size in sorted(by_grid_size):
        errors, actions, times = by_grid_size[grid_size]

        print(f"\nGrid size: {grid_size}")
        print(f"  Average error: {np.mean(errors):.2f} pixels")
        print(f"  Average actions: {np.mean(actions):.2f}")
        print(f"  Average time: {np.mean(times):.2f} seconds")


# Run the evaluation only when executed as a script, not on import.
if __name__ == "__main__":
    main()
Loading