DroneManager
DroneManager module
- class dronemanager.core.DMConfig(drone_configs: DroneConfigs, mav_system_id: int = 246, mav_component_id: int = 190, default_plugins: list[str] | None = None, testing: bool = False, plugin_settings: dict[str, dict[str, Any]] | None = None, **kwargs)
Bases:
object
Configuration class for DroneManager.
- CORE_ENTRIES = ['drones', 'mav_system_id', 'mav_component_id', 'plugin_settings', 'default_plugins', 'testing']
- classmethod from_file(filepath: str)
- to_file(filepath: str)
- class dronemanager.core.DroneManager(drone_class, logger=None, log_to_console=True, console_log_level=logging.INFO)
Bases:
object
Core class of the library.
- plugins: set[str]
- async connect_to_drone(name: str, mavsdk_server_address: str | None = None, mavsdk_server_port: int | None = None, drone_address: str | None = None, timeout: float = 30, telemetry_frequency: float | None = None, log_telemetry=True)
- async disconnect(names: str | Collection[str], force=False)
- async arm(names: str | Collection[str], schedule=False)
- async disarm(names: str | Collection[str], schedule=False)
- async takeoff(names: str | Collection[str], altitude=2.0, schedule=False)
- async change_flightmode(names: str | Collection[str], flightmode: str, schedule=False)
- async land(names: str | Collection[str], schedule=False)
- pause(names: str | Collection[str])
- resume(names: str | Collection[str])
- async yaw_to(names: str | Collection[str], yaw: Collection[float] | float, yaw_rate: Collection[float] | float | None = None, local: Collection[float] | None = None, tol: float | Collection[float] = 2, schedule: bool = True)
- async fly_to(names: str | Collection[str], local: Collection[float] | None = None, gps: Collection[float] | None = None, waypoint: list[Waypoint] | None = None, yaw: Collection[float] | float | None = None, tol: float | Collection[float] = 0.25, schedule=True)
- async move(names: str | Collection[str], offset: Collection[float], yaw: Collection[float] | float | None = None, use_gps: bool | Collection[bool] = True, tol: float | Collection[float] = 0.25, schedule: bool = True)
Move the drones by offset meters from their current positions. Which coordinate system is used depends on use_gps.
- Parameters:
names
offset – An array with the offsets for the drones. Should contain the number of meters to move along each NED axis (north, east, down).
yaw
use_gps – If False, use the local coordinate system, otherwise use GPS.
tol
schedule
- Returns:
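As a rough illustration of what an NED offset means, the sketch below adds an offset to a local position. The helper `apply_ned_offset` is hypothetical and not part of dronemanager; it only assumes the usual NED convention (north, east, down), in which climbing corresponds to a negative third component.

```python
def apply_ned_offset(position, offset):
    """Return the local NED position reached by moving `offset` meters.

    Hypothetical helper for illustration; not part of dronemanager.
    Both arguments are (north, east, down) triples in meters.
    """
    if len(position) != 3 or len(offset) != 3:
        raise ValueError("position and offset must each have three components")
    return tuple(p + o for p, o in zip(position, offset))


# Starting 2 m above the local origin (down = -2), move 5 m north and climb 1 m.
start = (0.0, 0.0, -2.0)
target = apply_ned_offset(start, (5.0, 0.0, -1.0))
print(target)
```

Because the third axis points down, the climb shows up as a more negative value in the result.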
- async wait(names: str | Collection[str], delay: float | Collection[float], schedule=True)
- async orbit(name, radius, velocity, center_lat, center_long, amsl)
- async go_to(names: str | Collection[str], local: Collection[float] | None = None, gps: Collection[float] | None = None, waypoint: list[Waypoint] | None = None, yaw: Collection[float] | float | None = None, tol: float | Collection[float] = 0.25, schedule=True)
Note that this uses the GO TO MAVLink command instead of offboard mode.
- async action_stop(names)
- async kill(names)
- add_remove_func(func)
- add_connect_func(func)
- async close()
- plugin_options()
- currently_loaded_plugins()
- add_plugin_load_func(func)
- add_plugin_unload_func(func)
- async load_plugin(plugin_module: str, plugin_name: str | None = None, options: list[str] | None = None, class_getter: callable = None)
- async unload_plugin(plugin_name)
Drone module
- class dronemanager.drone.Battery
Bases:
object
- class dronemanager.drone.DroneConfig(drone_name: str, address: str | None, position_rate: float = 5.0, log_telemetry: bool = False, max_h_vel: float = 10.0, max_down_vel: float = 1.0, max_up_vel: float = 3.0, max_h_acc: float = 1.5, max_v_acc: float = 0.5, max_h_jerk: float = 0.5, max_v_jerk: float = 0.5, max_yaw_vel: float = 60, max_yaw_acc: float = 30, max_yaw_jerk: float = 30, size: float = 1.0, rtsp: str | None = None, **kwargs)
Bases:
object
Convenience class for drone configurations.
These exist for convenience purposes, to allow people to define drone objects with fixed parameters for easy reuse. They can be saved to and loaded from files. A given configuration is used when dm.connect_to_drone is called with the drone name matching the configuration.
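The name-matching behaviour described above can be pictured with a small stand-in. `ExampleDroneConfig` and `find_config` are hypothetical and only mirror a few fields from the signature; the drone names and addresses are made up, and the real lookup inside dronemanager may work differently.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ExampleDroneConfig:
    """Stand-in for illustration; mirrors a few fields from the signature above."""
    drone_name: str
    address: Optional[str]
    position_rate: float = 5.0
    max_h_vel: float = 10.0


def find_config(configs, name):
    """Return the configuration whose drone_name matches, or None."""
    for config in configs:
        if config.drone_name == name:
            return config
    return None


# Example names and addresses are made up for this sketch.
configs = [
    ExampleDroneConfig("scout", "udp://:14540"),
    ExampleDroneConfig("mapper", "udp://:14541", max_h_vel=5.0),
]
match = find_config(configs, "mapper")
print(match.address, match.max_h_vel)
```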
- class dronemanager.drone.DroneConfigs(configs: list[DroneConfig])
Bases:
object
- class dronemanager.drone.DroneParams(raw=None)
Bases:
object
- class dronemanager.drone.Drone(name, *args, log_to_file=True, config: DroneConfig | None = None, **kwargs)
Bases:
ABC, Thread
- VALID_FLIGHTMODES = {}
- VALID_SETPOINT_TYPES = {}
- run()
Method representing the thread’s activity.
You may override this method in a subclass. The standard run() method invokes the callable object passed to the object’s constructor as the target argument, if any, with sequential and keyword arguments taken from the args and kwargs arguments, respectively.
- schedule_task(coro) → Future
- execute_task(coro) → Future
- add_handler(handler)
- abstractmethod async stop_execution()
Stops the thread. This function should be called at the end of any implementing function.
- Returns:
- pause()
Pause task execution by setting self.is_paused to True.
Note that it is not possible to “pause” what the drone is doing in a general way. What “pausing” a task does or if a task can even be paused depends on the specific task and implementation. Subclasses must define and implement this behaviour themselves. However, pausing is always possible between tasks, and this is the default behaviour for subclasses that do not implement any of their own: When paused, drones will finish their current task and then wait until unpaused before beginning the next task.
- resume()
Resume executing tasks.
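The default "pause between tasks" behaviour can be sketched with plain asyncio. `PausableQueue` below is a stand-in written for this illustration, not the dronemanager implementation: it assumes tasks are stored as coroutine functions and uses an `asyncio.Event` as the pause flag.

```python
import asyncio


class PausableQueue:
    """Stand-in for illustration only; not the dronemanager implementation."""

    def __init__(self):
        self.tasks = []            # pending coroutine functions, in order
        self.log = []              # record of what ran, for demonstration
        self._unpaused = asyncio.Event()
        self._unpaused.set()       # start unpaused

    @property
    def is_paused(self):
        return not self._unpaused.is_set()

    def pause(self):
        self._unpaused.clear()

    def resume(self):
        self._unpaused.set()

    async def run(self):
        while self.tasks:
            # Pausing only takes effect here, between tasks.
            await self._unpaused.wait()
            task = self.tasks.pop(0)
            await task()


async def demo():
    queue = PausableQueue()

    async def first():
        queue.pause()              # pause arrives while the first task runs
        await asyncio.sleep(0.01)
        queue.log.append("first done")

    async def second():
        queue.log.append("second done")

    queue.tasks = [first, second]
    runner = asyncio.create_task(queue.run())
    await asyncio.sleep(0.05)
    snapshot = list(queue.log)     # the second task has not started yet
    queue.resume()
    await runner
    return snapshot, queue.log


snapshot, final_log = asyncio.run(demo())
print(snapshot)
print(final_log)
```

The first task runs to completion even though the pause arrives while it is executing; the second task starts only after `resume()` is called.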
- abstract property is_connected: bool
- abstract property is_armed: bool
- abstract property flightmode: FlightMode
- abstract property in_air: bool
- property autopilot: str
- abstract property fix_type: FixType
- abstract property position_global: ndarray
Array with the GPS coordinates [latitude, longitude, AMSL].
- abstract property position_ned: ndarray
- abstract property velocity: ndarray
- abstract property speed: float
- abstract property attitude: ndarray
Roll, pitch and yaw (RPY) in degrees.
- property parameters_loaded: bool
- abstractmethod async connect(drone_addr, *args, **kwargs)
- abstractmethod async load_parameters()
- abstractmethod async disconnect(force=False) → bool
- abstractmethod async arm() → bool
- abstractmethod async disarm() → bool
- abstractmethod async takeoff(altitude=2.0) → bool
Takes off to the specified altitude above current position.
Note that altitude is positive.
- Parameters:
altitude – Takeoff altitude above the current position.
- Returns:
- abstractmethod async change_flight_mode(flightmode) → bool
- is_at_waypoint(waypoint: Waypoint, pos_tolerance=0.25, vel_tolerance=0.1, yaw_tolerance=1) → bool
Definition of “is at” depends on the waypoint type. At most checks position, yaw and velocity.
- Parameters:
waypoint
pos_tolerance – In meters
vel_tolerance – In m/s
yaw_tolerance – In degrees
- Returns:
- is_at_pos(target_pos, tolerance=0.25) → bool
- Parameters:
target_pos – Array with target position. If a yaw is also passed (i.e. array length 4), it is ignored.
tolerance – How close we have to be to the target position to be considered “at” it.
- Returns:
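A minimal stand-alone sketch of such a position check follows. It assumes that "close" means Euclidean distance in meters, which the text above does not state, and the free function takes the current position as an explicit argument, unlike the method.

```python
import math


def is_at_pos(current_pos, target_pos, tolerance=0.25):
    """Hypothetical stand-alone version of a position check.

    Assumes "close" means Euclidean distance in meters; the library may
    use a different metric. A fourth element (yaw) in target_pos is ignored.
    """
    return math.dist(current_pos[:3], target_pos[:3]) < tolerance


current = (1.0, 2.0, -3.0)
print(is_at_pos(current, (1.1, 2.0, -3.0, 90.0)))  # yaw entry is dropped
print(is_at_pos(current, (2.0, 2.0, -3.0)))
```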
- is_at_heading(target_heading, tolerance=1) → bool
- is_at_gps(target_gps, tolerance=0.25) → bool
- is_at_vel(target_vel, tolerance=0.1)
- abstractmethod async yaw_to(target_yaw, yaw_rate=30, local=None, tolerance=2)
- abstractmethod async spin_at_rate(yaw_rate, duration, direction='cw') → bool
- check_waypoint(waypoint: Waypoint)
Check if a waypoint is valid and within any geofence (if such a fence is set)
- async wait(delay: float)
Wait delay seconds.
This function is useful with scheduling to schedule short waits between moves.
- abstractmethod async fly_to(local: ndarray | None = None, gps: ndarray | None = None, yaw: float | None = None, waypoint: Waypoint | None = None, tolerance=0.25)
Fly to the specified position.
- Parameters:
local
gps
yaw
waypoint
tolerance
- Returns:
- abstractmethod async move(offset: ndarray, yaw: float | None = None, use_gps=True, tolerance=0.25)
Move from the current position by the specified distances.
- Parameters:
offset – A numpy array specifying how far to move along each axis, in meters.
yaw
use_gps
tolerance
- Returns:
- abstractmethod async orbit(radius, velocity, latitude, longitude, amsl) → bool
- abstractmethod async land() → bool
- abstractmethod async stop() → bool
- abstractmethod async kill() → bool
- clear_queue() → None
Clears the action queue.
Does not cancel the current action.
- Returns:
- cancel_action() → None
Cancels the current action task.
- Returns:
- class dronemanager.drone.DroneMAVSDK(name, mavsdk_server_address: str | None = None, mavsdk_server_port: int = 50051, config: DroneConfig | None = None)
Bases:
Drone
- VALID_FLIGHTMODES = {'altitude', 'hold', 'land', 'offboard', 'position', 'return', 'takeoff'}
- VALID_SETPOINT_TYPES = {WayPointType.POS_GLOBAL, WayPointType.POS_NED, WayPointType.POS_VEL_ACC_NED, WayPointType.POS_VEL_NED, WayPointType.VEL_NED}
- system: System | None
- mav_conn: MAVPassthrough
- property is_connected: bool
- property is_armed: bool
- property flightmode: FlightMode
- property in_air: bool
- property fix_type: FixType
- property position_global: ndarray
Array with the GPS coordinates [latitude, longitude, AMSL].
- property altitude_above_takeoff: float
- property position_ned: ndarray
- property velocity: ndarray
- property speed: float
- property attitude: ndarray
Roll, pitch and yaw (RPY) in degrees.
- property heading: float
- async connect(drone_address, system_id=0, component_id=0, log_telemetry=None) → bool
- async load_parameters()
- async disconnect(force=False)
- async arm()
- async disarm()
- async takeoff(altitude=2.0) → bool
- Parameters:
altitude
- Returns:
- async change_flight_mode(flightmode: str, timeout: float = 5)
- async yaw_to(target_yaw, yaw_rate=30, local=None, tolerance=2)
Yaw to the target heading at the specified rate, maintaining the current position.
Uses the local coordinate system to determine and maintain position. Pausable.
- Parameters:
target_yaw – Heading in degrees from -180 to 180, right positive, 0 forward.
yaw_rate
local – Position setpoint during yaw.
tolerance – How close we have to get to the heading before this function returns.
- Returns:
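Working with headings in the -180 to 180 range requires wrapping across the ±180 boundary. The helpers below are a hypothetical sketch of that arithmetic, not code from dronemanager; they wrap into the half-open interval [-180, 180) and treat a positive error as a turn to the right.

```python
def wrap_heading(angle_deg):
    """Wrap an angle in degrees into the interval [-180, 180)."""
    return (angle_deg + 180.0) % 360.0 - 180.0


def heading_error(current_deg, target_deg):
    """Signed shortest rotation from current to target, in degrees.

    Positive values mean turning right, matching the "right positive"
    convention described above.
    """
    return wrap_heading(target_deg - current_deg)


def is_at_heading(current_deg, target_deg, tolerance=2.0):
    """Hypothetical tolerance check; the library's criterion may differ."""
    return abs(heading_error(current_deg, target_deg)) < tolerance


print(heading_error(170.0, -170.0))   # shortest way crosses the 180 boundary
print(is_at_heading(179.5, -179.5))
```

Without the wrap, a drone at 170 degrees asked to face -170 degrees would appear to be 340 degrees away rather than 20.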
- async spin_at_rate(yaw_rate, duration, direction='cw')
Spin in place at the given rate for the given duration.
Pausable.
- Parameters:
yaw_rate
duration
direction
- Returns:
- async fly_to(local: ndarray | None = None, gps: ndarray | None = None, yaw: float | None = None, waypoint: Waypoint | None = None, tolerance=0.25, put_into_offboard=True, log=True)
Fly to a specified point in offboard mode. Uses path generators and followers to get there.
If multiple targets are provided (for example GPS and local coordinates), they are prioritized in this order: Waypoint > GPS > local. In the example, the local coordinates would be ignored.
- Parameters:
local
gps
yaw
waypoint
tolerance
put_into_offboard
log
- Returns:
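The target priority described above can be expressed as a short selection function. `select_target` is a hypothetical helper for illustration; in particular, raising an error when no target is given is an assumption, not documented behaviour.

```python
def select_target(local=None, gps=None, waypoint=None):
    """Pick one target following the documented priority.

    Hypothetical helper: returns a (kind, value) pair, preferring
    waypoint over gps over local, and raises if nothing was given.
    """
    if waypoint is not None:
        return "waypoint", waypoint
    if gps is not None:
        return "gps", gps
    if local is not None:
        return "local", local
    raise ValueError("at least one of local, gps or waypoint is required")


# Both GPS and local coordinates are given, so the local ones are ignored.
kind, value = select_target(local=(1.0, 2.0, -3.0), gps=(52.0, 8.0, 120.0))
print(kind)
```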
- async move(offset, yaw: float | None = None, use_gps=True, tolerance=0.25)
Move from the current position by the specified distances.
- Parameters:
offset – A numpy array specifying how far to move along each axis, in meters.
yaw
use_gps
tolerance
- Returns:
- async go_to(local: ndarray | None = None, gps: ndarray | None = None, yaw: float | None = None, waypoint: Waypoint | None = None, tolerance=0.25)
- async orbit(radius, velocity, center_lat, center_long, amsl)
- async land()
- async manual_control_position()
- async manual_control_altitude()
- async set_manual_control_input(x, y, z, r)
- async stop_execution()
Stops all coroutines, closes all connections, etc.
- Returns:
- async stop()
- async kill()
MAVLink connection
- class dronemanager.mavpassthrough.MAVPassthrough(dialect=None, loggername='passthrough', log_messages=True)
Bases:
object
- connect_gcs(address)
- connect_drone(loc, appendix, scheme='udp')
- connected_to_drone()
- connected_to_gcs()
- send_as_gcs(msg)
- listen_ack(command_id, target_component) → Future
- listen_message(message_id, target_component) → Future
- send_cmd_long(target_component, cmd, param1=math.nan, param2=math.nan, param3=math.nan, param4=math.nan, param5=math.nan, param6=math.nan, param7=math.nan) → Future
- send_request_message(target_component, message_id, param1=math.nan, param2=math.nan, param3=math.nan, param4=math.nan, param5=math.nan, response_target=1)
- async request_message(target_component, message_id, param1=math.nan, param2=math.nan, param3=math.nan, param4=math.nan, param5=math.nan, response_target=1, timeout=5)
- send_param_ext_request_list(target_component)
- send_param_ext_set(target_component, param_id, param_value: int | float, param_type: int)
- send_param_ext_request_read(target_component, param_id: str, param_index: int = None)
- add_drone_message_callback(message_id: int, func: Callable[[any], Coroutine])
- remove_drone_message_callback(message_id: int, func: Callable[[any], Coroutine])
- async stop()
- async dronemanager.mavpassthrough.main()