DroneManager

DroneManager module

class dronemanager.core.DMConfig(drone_configs: DroneConfigs, mav_system_id: int = 246, mav_component_id: int = 190, default_plugins: list[str] | None = None, testing: bool = False, plugin_settings: dict[str, dict[str, Any]] | None = None, **kwargs)

Bases: object

Configuration class for DroneManager

CORE_ENTRIES = ['drones', 'mav_system_id', 'mav_component_id', 'plugin_settings', 'default_plugins', 'testing']
classmethod from_file(filepath: str)
to_file(filepath: str)
class dronemanager.core.DroneManager(drone_class, logger=None, log_to_console=True, console_log_level=logging.INFO)

Bases: object

Core class of the library.

drones: dict[str, Drone]
plugins: set[str]
async connect_to_drone(name: str, mavsdk_server_address: str | None = None, mavsdk_server_port: int | None = None, drone_address: str | None = None, timeout: float = 30, telemetry_frequency: float | None = None, log_telemetry=True)
async disconnect(names: str | Collection[str], force=False)
async arm(names: str | Collection[str], schedule=False)
async disarm(names: str | Collection[str], schedule=False)
async takeoff(names: str | Collection[str], altitude=2.0, schedule=False)
async change_flightmode(names: str | Collection[str], flightmode: str, schedule=False)
async land(names: str | Collection[str], schedule=False)
set_fence(names: str | Collection[str], fence: Fence)

Set a fence on drones

pause(names: str | Collection[str])
resume(names: str | Collection[str])
async yaw_to(names: str | Collection[str], yaw: Collection[float] | float, yaw_rate: Collection[float] | float | None = None, local: Collection[float] | None = None, tol: float | Collection[float] = 2, schedule: bool = True)
async fly_to(names: str | Collection[str], local: Collection[float] | None = None, gps: Collection[float] | None = None, waypoint: list[Waypoint] | None = None, yaw: Collection[float] | float | None = None, tol: float | Collection[float] = 0.25, schedule=True)
async move(names: str | Collection[str], offset: Collection[float], yaw: Collection[float] | float | None = None, use_gps: bool | Collection[bool] = True, tol: float | Collection[float] = 0.25, schedule: bool = True)

Move the drones by offsets meters from their current positions. Which coordinate system is used depends on no_gps.

Parameters:
  • names

  • offset – An array with the offsets for the drones. Should contain the number of meters to move in NED.

  • yaw

  • use_gps – If False, use the local coordinate system, otherwise use GPS.

  • tol

  • schedule

Returns:

async wait(names: str | Collection[str], delay: float | Collection[float], schedule=True)
async orbit(name, radius, velocity, center_lat, center_long, amsl)
async go_to(names: str | Collection[str], local: Collection[float] | None = None, gps: Collection[float] | None = None, waypoint: list[Waypoint] | None = None, yaw: Collection[float] | float | None = None, tol: float | Collection[float] = 0.25, schedule=True)

Note that this uses the GO TO Mavlink command instead of offboard mode.

async action_stop(names)
async kill(names)
add_remove_func(func)
add_connect_func(func)
async close()
plugin_options()
currently_loaded_plugins()
add_plugin_load_func(func)
add_plugin_unload_func(func)
async load_plugin(plugin_module: str, plugin_name: str | None = None, options: list[str] | None = None, class_getter: callable = None)
async unload_plugin(plugin_name)

Drone module

class dronemanager.drone.Battery

Bases: object

class dronemanager.drone.DroneConfig(drone_name: str, address: str | None, position_rate: float = 5.0, log_telemetry: bool = False, max_h_vel: float = 10.0, max_down_vel: float = 1.0, max_up_vel: float = 3.0, max_h_acc: float = 1.5, max_v_acc: float = 0.5, max_h_jerk: float = 0.5, max_v_jerk: float = 0.5, max_yaw_vel: float = 60, max_yaw_acc: float = 30, max_yaw_jerk: float = 30, size: float = 1.0, rtsp: str | None = None, **kwargs)

Bases: object

Convenience class for drone configurations.

These exist for convenience purposes, to allow people to define drone objects with fixed parameters for easy reuse. They can be saved to and loaded from files. A given configuration is used when dm.connect_to_drone is called with the drone name matching the configuration.

class dronemanager.drone.DroneConfigs(configs: list[DroneConfig])

Bases: object

class dronemanager.drone.DroneParams(raw=None)

Bases: object

class dronemanager.drone.Drone(name, *args, log_to_file=True, config: DroneConfig | None = None, **kwargs)

Bases: ABC, Thread

VALID_FLIGHTMODES = {}
VALID_SETPOINT_TYPES = {}
run()

Method representing the thread’s activity.

You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.

schedule_task(coro) Future
execute_task(coro) Future
add_handler(handler)
abstractmethod async stop_execution()

Stops the thread. This function should be called at the end of any implementing function.

Returns:

pause()

Pause task execution by setting self.is_paused to True.

Note that it is not possible to “pause” what the drone is doing in a general way. What “pausing” a task does or if a task can even be paused depends on the specific task and implementation. Subclasses must define and implement this behaviour themselves. However, pausing is always possible between tasks, and this is the default behaviour for subclasses that do not implement any of their own: When paused, drones will finish their current task and then wait until unpaused before beginning the next task.

resume()

Resume executing tasks.

abstract property is_connected: bool
abstract property is_armed: bool
abstract property flightmode: FlightMode
abstract property in_air: bool
property autopilot: str
abstract property fix_type: FixType
abstract property position_global: ndarray

Array with the GPS coordinates [latitude, longitude, AMSL]

Type:

return

abstract property position_ned: ndarray
abstract property velocity: ndarray
abstract property speed: float
abstract property attitude: ndarray

RPY in degrees

abstract property batteries: dict[int, Battery]
property parameters_loaded: bool
abstractmethod async connect(drone_addr, *args, **kwargs)
abstractmethod async load_parameters()
abstractmethod async disconnect(force=False) bool
abstractmethod async arm() bool
abstractmethod async disarm() bool
abstractmethod async takeoff(altitude=2.0) bool

Takes off to the specified altitude above current position.

Note that altitude is positive.

Parameters:

altitude – Takeoff altitude above.

Returns:

abstractmethod async change_flight_mode(flightmode) bool
is_at_waypoint(waypoint: Waypoint, pos_tolerance=0.25, vel_tolerance=0.1, yaw_tolerance=1) bool

Definition of “is at” depends on the waypoint type. At most checks position, yaw and velocity.

Parameters:
  • waypoint

  • pos_tolerance – In meters

  • vel_tolerance – In m/s

  • yaw_tolerance – In degrees

Returns:

is_at_pos(target_pos, tolerance=0.25) bool
Parameters:
  • target_pos – Array with target position. If a yaw is also passed (i.e. array length 4), it is ignored.

  • tolerance – How close we have to be to the target position to be considered “at” it.

Returns:

is_at_heading(target_heading, tolerance=1) bool
is_at_gps(target_gps, tolerance=0.25) bool
is_at_vel(target_vel, tolerance=0.1)
abstractmethod async yaw_to(target_yaw, yaw_rate=30, local=None, tolerance=2)
abstractmethod async spin_at_rate(yaw_rate, duration, direction='cw') bool
set_fence(fence_type: type[Fence], *args, **kwargs)
check_waypoint(waypoint: Waypoint)

Check if a waypoint is valid and within any geofence (if such a fence is set)

abstractmethod async set_setpoint(setpoint: Waypoint) bool
async wait(delay: float)

Wait delay seconds.

This function is useful with scheduling to schedule short waits between moves.

abstractmethod async fly_to(local: ndarray | None = None, gps: ndarray | None = None, yaw: float | None = None, waypoint: Waypoint | None = None, tolerance=0.25)

Fly to the specified position.

Parameters:
  • local

  • gps

  • yaw

  • waypoint

  • tolerance

Returns:

abstractmethod async move(offset: ndarray, yaw: float | None = None, use_gps=True, tolerance=0.25)

Move from the current position by the specified distances.

Parameters:
  • offset – A numpy array with the information how much to move along each axis in meters.

  • yaw

  • use_gps

  • tolerance

Returns:

abstractmethod async orbit(radius, velocity, latitude, longitude, amsl) bool
abstractmethod async land() bool
abstractmethod async stop() bool
abstractmethod async kill() bool
clear_queue() None

Clears the action queue.

Does not cancel the current action.

Returns:

cancel_action() None

Cancels the current action task

Returns:

class dronemanager.drone.DroneMAVSDK(name, mavsdk_server_address: str | None = None, mavsdk_server_port: int = 50051, config: DroneConfig | None = None)

Bases: Drone

VALID_FLIGHTMODES = {'altitude', 'hold', 'land', 'offboard', 'position', 'return', 'takeoff'}
VALID_SETPOINT_TYPES = {WayPointType.POS_GLOBAL, WayPointType.POS_NED, WayPointType.POS_VEL_ACC_NED, WayPointType.POS_VEL_NED, WayPointType.VEL_NED}
system: System | None
mav_conn: MAVPassthrough
property is_connected: bool
property is_armed: bool
property flightmode: FlightMode
property in_air: bool
property fix_type: FixType
property position_global: ndarray

Array with the GPS coordinates [latitude, longitude, AMSL]

Type:

return

property altitude_above_takeoff: float
property position_ned: ndarray
property velocity: ndarray
property speed: float
property attitude: ndarray

RPY in degrees

property heading: float
property batteries: dict[int, Battery]
async connect(drone_address, system_id=0, component_id=0, log_telemetry=None) bool
async load_parameters()
async disconnect(force=False)
async arm()
async disarm()
async takeoff(altitude=2.0) bool
Parameters:

altitude

Returns:

async change_flight_mode(flightmode: str, timeout: float = 5)
async set_setpoint(setpoint: Waypoint)
async yaw_to(target_yaw, yaw_rate=30, local=None, tolerance=2)

Yawing to the target heading as you do so at the specified rate, maintaining current position.

Uses the local coordinate system for to determine and maintain position. Pausable.

Parameters:
  • target_yaw – Heading as a degree fom -180 to 180, right positive, 0 forward.

  • yaw_rate

  • local – Position setpoint during yaw.

  • tolerance – How close we have to get to the heading before this function returns.

Returns:

async spin_at_rate(yaw_rate, duration, direction='cw')

Spin in place at the given rate for the given duration.

Pausable.

Parameters:
  • yaw_rate

  • duration

  • direction

Returns:

async fly_to(local: ndarray | None = None, gps: ndarray | None = None, yaw: float | None = None, waypoint: Waypoint | None = None, tolerance=0.25, put_into_offboard=True, log=True)

Fly to a specified point in offboard mode. Uses path generators and followers to get there.

If multiple target are provided (for example GPS and local coordinates), we prefer coordinates in this fashion: Waypoint > GPS > local, i.e. in the example, the local coordinates would be ignored.

Parameters:
  • local

  • gps

  • yaw

  • waypoint

  • tolerance

  • put_into_offboard

  • log

Returns:

async move(offset, yaw: float | None = None, use_gps=True, tolerance=0.25)

Move from the current position by the specified distances.

Parameters:
  • offset – A numpy array with the information how much to move along each axis in meters.

  • yaw

  • use_gps

  • tolerance

Returns:

async go_to(local: ndarray | None = None, gps: ndarray | None = None, yaw: float | None = None, waypoint: Waypoint | None = None, tolerance=0.25)
async orbit(radius, velocity, center_lat, center_long, amsl)
async land()
async manual_control_position()
async manual_control_altitude()
async set_manual_control_input(x, y, z, r)
async stop_execution()

Stops all coroutines, closes all connections, etc.

Returns:

async stop()
async kill()

MAVLink connection

class dronemanager.mavpassthrough.MAVPassthrough(dialect=None, loggername='passthrough', log_messages=True)

Bases: object

connect_gcs(address)
connect_drone(loc, appendix, scheme='udp')
connected_to_drone()
connected_to_gcs()
send_as_gcs(msg)
listen_ack(command_id, target_component) Future
listen_message(message_id, target_component) Future
send_cmd_long(target_component, cmd, param1=math.nan, param2=math.nan, param3=math.nan, param4=math.nan, param5=math.nan, param6=math.nan, param7=math.nan) Future
send_request_message(target_component, message_id, param1=math.nan, param2=math.nan, param3=math.nan, param4=math.nan, param5=math.nan, response_target=1)
async request_message(target_component, message_id, param1=math.nan, param2=math.nan, param3=math.nan, param4=math.nan, param5=math.nan, response_target=1, timeout=5)
send_param_ext_request_list(target_component)
send_param_ext_set(target_component, param_id, param_value: int | float, param_type: int)
send_param_ext_request_read(target_component, param_id: str, param_index: int = None)
add_drone_message_callback(message_id: int, func: Callable[[any], Coroutine])
remove_drone_message_callback(message_id: int, func: Callable[[any], Coroutine])
async stop()
async dronemanager.mavpassthrough.main()