Plugins

Plugins exist to extend the functionality of DroneManager in a straightforward way. They define a list of commands that they make available to the user interface and a list of background functions that should run continuously while the plugin is loaded. They can also have dependencies on other plugins, which are then loaded automatically when the plugin is loaded.

They are implemented as special classes in modules. Each module in the plugins folder is inspected for potential plugin classes. There can be one plugin per module. The name of the module defines the name of the plugin. For the command line, each plugin additionally provides a prefix, which is prepended to its commands to prevent collisions; for example, multiple plugins can have a connect command. For a plugin with the prefix abc, this command becomes abc-connect.
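The naming rule above can be sketched as follows. This is only an illustration of the prefix scheme: the helper cli_command_name and the plugin prefixes are hypothetical and not part of DroneManager.

```python
def cli_command_name(prefix: str, command: str) -> str:
    """Join a plugin prefix and a command name with a hyphen."""
    return f"{prefix}-{command}"


# Two hypothetical plugins that both expose a "connect" command.
plugins = {"abc": ["connect", "status"], "xyz": ["connect"]}

full_names = [
    cli_command_name(prefix, command)
    for prefix, commands in plugins.items()
    for command in commands
]
print(full_names)
```

Because the prefix is part of the final name, both connect commands stay distinct on the command line.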

There are two special types of plugins: Missions and Sensors.

Plugin Base Class

Class for extra, loadable plugins.

Plugins extend the functionality of DroneManager or Drone Classes by providing extra functions. They can also register their own commands to the CLI.

class dronemanager.plugin.Plugin(dm, logger, name, *args, **kwargs)

Bases: ABC

Generic plugin class.

The attribute cli_commands is called by the DroneManager CLI (and could be called by other UIs) to populate their interfaces. This is a dictionary with coroutines as values and human-readable names as keys. In DroneManager the names are used together with the class prefix to determine the command input on the command line, while the signature of the function is used to populate the CLI parser. The attribute background_functions should list coroutines that will run indefinitely, for example those polling for status updates from a camera. They will be started during construction of the class object, usually when the module is loaded. Note that these must be coroutines.

A common kwarg is “name”, for plugins of which multiple copies may be loaded, in which case the name acts as the unique identifier.
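As a rough illustration of how a function signature can populate a command-line parser, the sketch below uses inspect.signature and argparse from the standard library. It is an assumption about the general technique, not DroneManager's actual parser code; build_parser and the set_zoom coroutine are hypothetical names.

```python
import argparse
import asyncio
import inspect


async def set_zoom(drone: str, zoom: float = 1.0):
    """Hypothetical command coroutine, used only for illustration."""
    return f"{drone} zoom set to {zoom}"


def build_parser(prefix: str, name: str, func) -> argparse.ArgumentParser:
    """Create a parser whose arguments mirror the parameters of func."""
    parser = argparse.ArgumentParser(prog=f"{prefix}-{name}")
    for param in inspect.signature(func).parameters.values():
        # Fall back to str when a parameter has no type annotation.
        if param.annotation is inspect.Parameter.empty:
            arg_type = str
        else:
            arg_type = param.annotation
        if param.default is inspect.Parameter.empty:
            # Parameters without a default become required positionals.
            parser.add_argument(param.name, type=arg_type)
        else:
            # Parameters with a default become optional flags.
            parser.add_argument(f"--{param.name}", type=arg_type, default=param.default)
    return parser


parser = build_parser("cam", "zoom", set_zoom)
args = parser.parse_args(["drone1", "--zoom", "2.5"])
result = asyncio.run(set_zoom(**vars(args)))
print(result)
```

The sketch only handles simple annotations such as str, int and float; how DroneManager treats other annotations is not covered here.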

PREFIX = 'abc'
DEPENDENCIES = []
dm: dronemanager.dronemanager.DroneManager
start_background_functions()
async start()

Starts any background functions.

async close()

Ends all running tasks.
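The start and close lifecycle described above might look roughly like the following. SketchPlugin is a stand-in written for this example and does not inherit from the real dronemanager.plugin.Plugin; the attribute _running_tasks follows the name mentioned in the Mission and Sensor documentation, and everything else is an assumption.

```python
import asyncio


class SketchPlugin:
    """Stand-in for a plugin; NOT the real dronemanager.plugin.Plugin."""

    PREFIX = "sketch"

    def __init__(self):
        self.poll_count = 0
        self.background_functions = [self._poll]
        self._running_tasks = set()

    async def _poll(self):
        # Runs until cancelled, like a status poller would.
        while True:
            self.poll_count += 1
            await asyncio.sleep(0.01)

    async def start(self):
        # Schedule every background function as an asyncio task.
        for func in self.background_functions:
            self._running_tasks.add(asyncio.create_task(func()))

    async def close(self):
        for task in self._running_tasks:
            task.cancel()
        # Wait for the cancellations to be processed before returning.
        await asyncio.gather(*self._running_tasks, return_exceptions=True)
        self._running_tasks.clear()


async def demo():
    plugin = SketchPlugin()
    await plugin.start()
    await asyncio.sleep(0.05)
    await plugin.close()
    return plugin


plugin = asyncio.run(demo())
print(plugin.poll_count > 0)
```

Keeping references to the created tasks makes it possible to cancel them in close() and prevents them from being garbage collected while they run.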

Plugin list

Camera

Plugin for controlling MAVSDK Cameras

class dronemanager.plugins.camera.CameraPlugin(dm, logger, name)

Bases: Plugin

PREFIX = 'cam'
async start()

Starts any background functions.

async close()

Removes all cameras

check_has_camera(drone)
async add_camera(drone: str, camera_id: int = 100)

Add cameras from/for a given drone to the plugin

async remove_camera(drone: str)

Remove a camera from the plugin

async status(drone: str)
async parameters(drone: str)
async set_parameter(drone: str, param_name: str, param_value: str)
async take_picture(drone: str)
async start_video(drone: str)
async stop_video(drone: str)
async set_zoom(drone: str, zoom: float)
class dronemanager.plugins.camera.ParameterOption(name, value, excludes)

Bases: object

name: str
value: int | float
excludes: list[str]
class dronemanager.plugins.camera.CameraParameter(name, param_type, default, control: bool, description: str, updates: list[str], options: list[ParameterOption], min_value: float | None, max_value: float | None, step_size: float | None)

Bases: object

value: int | float
property is_range
property is_bool
property is_option
check_option_valid(value)
get_current_otion()
get_option_by_name(option_name)
get_option_by_value(option_value)
get_options()
to_json_dict()
classmethod from_json_dict(json_dict)
class dronemanager.plugins.camera.Camera(logger, dm, drone_name: str, camera_id: int = 100)

Bases: object

cam_def_uri: str | None
parameters: dict[str, CameraParameter]
property drone
async start()
async close()
property params_loaded
log_status()
async take_picture()
async start_video()
async stop_video()
async set_zoom(zoom)
async init_cam_info()
async get_cam_param_definition()
async print_parameters()
parse_param_value(param_name: str, param_value: str) → bool | int | float | None
async set_parameter(param_name: str, param_value: bool | int | float)

Controllers

Plugin for using controllers and joysticks to control drones with DM

class dronemanager.plugins.controllers.InputMapping

Bases: object

Map actions to controller axes and buttons.

thrust_axis: int = None
yaw_axis: int = None
forward_axis: int = None
right_axis: int = None
arm_button: int = None
disarm_button: int = None
land_button: int = None
takeoff_button: int = None
control_button: int = None
arm_hold_duration: float = 1.0
extra_button_inputs: dict[int, set[Callable]] = {}
extra_axis_inputs: dict[Callable, list[int]] = {}
classmethod add_method_to_button(button: int, method: Callable)
classmethod add_axis_method(method: Callable, axes: list[int])
classmethod remove_method_from_button(button: int, method: Callable)
classmethod remove_axis_method(method: Callable)
class dronemanager.plugins.controllers.PS4Mapping

Bases: InputMapping

thrust_axis: int = 1
yaw_axis: int = 0
forward_axis: int = -3
right_axis: int = 2
arm_button: int = 0
disarm_button: int = 1
land_button: int = 12
takeoff_button: int = 11
control_button: int = 5
class dronemanager.plugins.controllers.ControllerPlugin(dm, logger, name, auto_set=False, auto_drone=False)

Bases: Plugin

PREFIX = 'control'
controller: pygame.joystick.JoystickType | None
async add_controller(dev_id: int)

Set which controller to use, matching the ID from check.

async remove_controller()

Remove the current controller.

async status()

Log current configuration of the controller plugin.

async set_drone(drone: str)

Set which drone is controlled by the controller.

stick_response(axis: int) → float

Linear stick response with a dead zone from -10% to 10%.

Axis should be the joystick axis. A negative number means that the response is inverted.
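A dead-zone response of this kind can be sketched as below. This is not the plugin's implementation: the function name response_with_dead_zone is hypothetical, it takes the raw reading as an argument instead of querying a controller, and rescaling the output so that it rises from 0 at the dead-zone edge to 1 at full deflection is an assumption.

```python
def response_with_dead_zone(raw: float, axis: int, dead_zone: float = 0.1) -> float:
    """Map a raw axis reading in [-1, 1] to a response in [-1, 1].

    A negative axis number inverts the response, following the
    convention described above.
    """
    if abs(raw) < dead_zone:
        value = 0.0
    else:
        # Assumed rescaling: linear from the dead-zone edge to full deflection.
        sign = 1.0 if raw > 0 else -1.0
        value = sign * (abs(raw) - dead_zone) / (1.0 - dead_zone)
    return -value if axis < 0 else value


print(response_with_dead_zone(0.05, axis=1))
print(response_with_dead_zone(1.0, axis=-3))
```

Small readings inside the dead zone produce no response, which keeps a stick that does not rest exactly at zero from commanding movement.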

async close()

Ends all running tasks.

External

Plugins for communication to other software

Currently only features a basic UDP server which sends data on connected drones and running missions in JSON format.

class dronemanager.plugins.external.UDPClient(ip, port, frequency, duration)

Bases: object

class dronemanager.plugins.external.UDPPlugin(dm, logger, name, server_port=SERVER_PORT, max_frequency: float = MAX_FREQUENCY, min_frequency: float = MIN_FREQUENCY, max_duration: float = MAX_DURATION)

Bases: Plugin

Communication happens over port 31659. A client sends a JSON message with the desired frequency (in Hz) and duration (in seconds) to this port, and the server then starts answering. The frequency is limited to the range from 1/60 Hz to 20 Hz.

Example message from client:

{
  "duration": 30,
  "frequency": 5
}
PREFIX = 'UDP'
async close()

Ends all running tasks.
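A client request like the example message above could be built and sent roughly as follows. The port and frequency limits are taken from the description; the helper names build_request and send_request are hypothetical, and clamping the frequency on the client side is an extra precaution in this sketch, since the server applies its own limits. The format of the server's replies is not described here, so the sketch does not try to parse them.

```python
import json
import socket

SERVER_PORT = 31659     # port stated in the description above
MIN_FREQUENCY = 1 / 60  # Hz, lower limit stated above
MAX_FREQUENCY = 20      # Hz, upper limit stated above


def build_request(duration: float, frequency: float) -> bytes:
    """Encode a request; the frequency is clamped to the documented range."""
    frequency = max(MIN_FREQUENCY, min(MAX_FREQUENCY, frequency))
    message = {"duration": duration, "frequency": frequency}
    return json.dumps(message).encode("utf-8")


def send_request(host: str, duration: float, frequency: float) -> bytes:
    """Send one request datagram and return the bytes that were sent."""
    payload = build_request(duration, frequency)
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(payload, (host, SERVER_PORT))
    return payload


payload = build_request(duration=30, frequency=5)
print(payload.decode("utf-8"))
```

With a server running locally, send_request("127.0.0.1", 30, 5) would transmit the same payload; replies would then have to be read from the same socket before it is closed.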

Gimbal

class dronemanager.plugins.gimbal.GimbalPlugin(dm, logger, name)

Bases: Plugin

PREFIX = 'gimbal'
async start()

Starts any background functions.

async close()

Removes all gimbals

check_has_gimbal(drone)
async add_gimbals(drone: str, device_id: int = 154)

Add Gimbals from/for a given drone to the plugin

async remove_gimbal(drone: str)

Remove a gimbal from the plugin

async status(drone: str)
async take_control(drone: str)
async release_control(drone: str)
async set_gimbal_angles(drone: str, pitch: float, yaw: float)
async set_gimbal_rate(drone: str, pitch_rate: float, yaw_rate: float)
async point_gimbal_at(drone: str, x1: float, x2: float, x3: float, relative: bool = False)
async set_gimbal_mode(drone: str, mode: str)
class dronemanager.plugins.gimbal.Gimbal(logger, dm, drone, device_id: int = 154)

Bases: object

roll: float
pitch: float
yaw: float
yaw_absolute: float
mode: GimbalMode
primary_control: tuple[float, float]
secondary_control: tuple[float, float]
start()
async close()
property in_control
log_status()
async take_control()
async release_control()
async point_gimbal_at(lat, long, amsl)
async point_gimbal_at_relative(x, y, z)
async set_gimbal_angles(pitch, yaw)
async set_gimbal_rates(pitch_rate, yaw_rate)
async set_gimbal_mode(mode)
class dronemanager.plugins.gimbal.GimbalMulti(logger, dm, drone)

Bases: object

Should work with properly implemented gimbal managers, but those seem rare.

gimbal_list: set[int]
roll: dict[int, float]
pitch: dict[int, float]
yaw: dict[int, float]
mode: dict[int, GimbalMode]
primary_control: dict[int, tuple[float, float]]
secondary_control: dict[int, tuple[float, float]]
async close()
log_status()
in_control(gimbal_id: int | None)
async take_control(gimbal_id: int | None)
async release_control(gimbal_id: int | None)
async point_gimbal_at(gimbal_id: int | None, lat, long, amsl)
async point_gimbal_at_relative(gimbal_id: int | None, x, y, z)
async set_gimbal_angles(gimbal_id: int | None, pitch, yaw)
async set_gimbal_rates(gimbal_id: int | None, pitch_rate, yaw_rate)
async set_gimbal_mode(gimbal_id: int | None, mode)

Mission

class dronemanager.plugins.mission.MissionPlugin(dm, logger, name)

Bases: Plugin

PREFIX = 'mission'
missions: dict[str, Mission]
async close()

Stop all missions.

mission_options()
async load(mission_module: str, name: str | None = None)

Load a new mission. Missions work like plugins, with the name taking the role of the prefix.

async status()

Status of running missions and missions that could be loaded.

class dronemanager.plugins.mission.MissionStage(new_class_name, /, names, *, module=None, qualname=None, type=None, start=1, boundary=None)

Bases: Enum

class dronemanager.plugins.mission.FlightArea(*args, **kwargs)

Bases: ABC

abstract property x_min
abstract property x_max
abstract property y_min
abstract property y_max
abstract property z_min
abstract property z_max
bounding_box()
class dronemanager.plugins.mission.Mission(dm, logger, name='YOUDIDSOMETHINGWRONG')

Bases: Plugin, ABC

PREFIX = 'YOUDIDSOMETHINGWRONG'
current_stage: MissionStage | None
flight_area: FlightArea | None
async start()

This function is called when the mission is loaded to start all the necessary processes asynchronously.

It is NOT a “start this mission” function. By default, launches any background processes, like starting a plugin.

async close()

Shutdown function for the script. It should end any running tasks and clear any resources.

By default, it cancels any tasks tracked in self._running_tasks.

abstractmethod async reset()

Resets the mission back to the initial position.

Keep safety in mind when this requires moving drones.

abstractmethod async status()

Should write information about the current status of the mission to the logger under INFO.

abstractmethod async add_drones(names: list[str])

Add drones to the mission. Implementations should check that the drones are capable and meet mission requirements.

abstractmethod async remove_drones(names: list[str])

Remove drones from the mission. Implementations must take measures to prevent missions from running with too few drones.

abstractmethod async mission_ready(drone: str)

Check whether any given drone is ready to keep going, i.e. is still connected etc.

Optitrack

Plugin for forwarding OptiTrack tracking data from a NatNet server to drones with DM

class dronemanager.plugins.optitrack.CoordinateConversion(n_axis: str, e_axis: str, d_axis: str)

Bases: object

convert_euler(tracking_pos, tracking_euler, out_sequence='XYZ', degrees=False, in_degrees=True)
convert_quat(tracking_pos, tracking_quat, out_sequence='XYZ', degrees=False)
make_rotation()
class dronemanager.plugins.optitrack.OptitrackPlugin(dm, logger, name, server_ip: str | None = None, local_ip: str | None = None, axes: list[str] | None = None, log_frames: bool = False)

Bases: Plugin

PREFIX = 'opti'
async connect_server(remote: str = None, local: str = None)

Connect to a NatNet server using the given remote and local IP addresses. Both default to localhost.

async add_drone(name: str, track_id: int)

Add a drone to the data forwarding system by name and track ID

async remove_drone(name: str)

Remove a drone from the data forwarding system by name

async log_available_bodies()

Print available rigid bodies on the NatNet server

async status()
async close()

Ends all running tasks.

Scripts

class dronemanager.plugins.scripts.ScriptsPlugin(dm, logger, name)

Bases: Plugin

PREFIX = 'script'
async start()

Starts any background functions.

async close()

Ends all running tasks.

async execute_script(script_name: str)

Run the script with the given name from ./Scripts

dronemanager.plugins.scripts.script_function(script_path)

Sensor

Plugin and ABC for external sensors, such as weather sensors.

class dronemanager.plugins.sensor.SensorPlugin(dm, logger, name)

Bases: Plugin

PREFIX = 'sensor'
async close()

Stop all sensors.

sensor_options()
async load(mission_module: str, name: str | None = None)

Load a new sensor. Sensors work like plugins, with the name taking the role of the prefix.

async status()

Status of running sensors and sensors that could be loaded.

class dronemanager.plugins.sensor.Sensor(dm, logger, name='YOUDIDSOMETHINGWRONG')

Bases: Plugin, ABC

PREFIX = 'YOUDIDSOMETHINGWRONG'
async start()

This function is called when the sensor plugin is loaded to automatically start any background functions.

async close()

This function must end all running asyncio tasks. By default, all tasks in self._running_tasks are cancelled.

abstractmethod async connect(*args, **kwargs)

Connect to a sensor.

abstractmethod async get_data()

Should return whatever information the sensor provides.

async log_data()
abstractmethod async status()

Should write information about the current status of the sensor to the logger under INFO.

abstractmethod async disconnect()

Disconnect from a sensor. Should handle any socket clearing etc.

async reconnect()
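The shape of a sensor implementation can be sketched as below. SensorSketch is a stand-in that mirrors some of the documented method names and does not inherit from the real dronemanager.plugins.sensor.Sensor; FakeWeatherSensor, its fixed reading, and the behaviour of reconnect (disconnect, then connect again) are assumptions made for this example.

```python
import asyncio
from abc import ABC, abstractmethod


class SensorSketch(ABC):
    """Stand-in mirroring documented Sensor methods; NOT the real class."""

    @abstractmethod
    async def connect(self, *args, **kwargs): ...

    @abstractmethod
    async def get_data(self): ...

    @abstractmethod
    async def disconnect(self): ...

    async def reconnect(self):
        # Assumed behaviour: drop the connection, then open it again.
        await self.disconnect()
        await self.connect()


class FakeWeatherSensor(SensorSketch):
    """Hypothetical sensor that produces a fixed reading."""

    def __init__(self):
        self.connected = False

    async def connect(self, *args, **kwargs):
        self.connected = True

    async def get_data(self):
        # Return nothing while disconnected instead of raising.
        if not self.connected:
            return None
        return {"temperature_c": 21.5}

    async def disconnect(self):
        self.connected = False


async def demo():
    sensor = FakeWeatherSensor()
    await sensor.connect()
    reading = await sensor.get_data()
    await sensor.reconnect()
    await sensor.disconnect()
    return reading, sensor.connected


reading, connected = asyncio.run(demo())
print(reading, connected)
```

A real sensor would additionally implement status() and pass dm, logger and name to the base class constructor, as listed in the class signature above.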

Stream

class dronemanager.plugins.stream.StreamPlugin(dm, logger, name, ip='127.0.0.1', port=5000, **kwargs)

Bases: Plugin

Plugin to receive video stream from Unity via TCP.

Parameters:
  • ip – Default IP, can be set via config.json ‘plugin_settings’.

  • port – Default Port, can be set via config.json ‘plugin_settings’.

PREFIX = 'stream'
async start_stream(ip: str = None, port: int = None)

Starts the Video Stream.

Parameters:
  • ip – Override the default IP (optional).

  • port – Override the default Port (optional).

async stop_stream()

Stops the running stream and closes the window.

async display()

Toggles the display of the stream.

async close()

Cleanup when plugin is unloaded.

add_callback(callback_function: Callable)
remove_callback(callback_function: Callable)