I have instructed a LLM to add synthetic 50Hz POS-Messages with a static position using a python script, which is slow, but works well:
#!/usr/bin/env python3
"""
Inject synthetic POS messages into ArduPilot DataFlash .bin logs.
Adds POS messages at 10Hz frequency while preserving all original messages.
Uses pymavlink for robust log parsing and writing.
save as inject_pos.py
Usage:
python inject_pos.py input.bin output.bin --lat 47.6205 --lon -122.3493 --alt 100.0
Optional relative altitudes:
python inject_pos.py input.bin output.bin --lat 53.156324 --lon 8.164749 --alt 10.0 --rel-home-alt 5.0 --rel-origin-alt 5.0
"""
import sys
import struct
import argparse
from typing import List, Tuple
try:
from pymavlink import mavutil
except ImportError:
print("Error: pymavlink not installed. Install with: pip install pymavlink")
sys.exit(1)
class POSInjector:
"""Handles injection of POS messages into ArduPilot logs."""
# POS message definition
POS_MSG_ID = 74
POS_NAME = "POS"
POS_FORMAT = "QLLfff"
POS_COLUMNS = "TimeUS,Lat,Lng,Alt,RelHomeAlt,RelOriginAlt"
POS_INTERVAL_US = 100000 # 10Hz = 100ms = 100000 microseconds
def __init__(self, input_path: str, output_path: str,
lat: float, lon: float, alt: float,
rel_home_alt: float = 0.0, rel_origin_alt: float = 0.0):
"""
Initialize the POS injector.
Args:
input_path: Path to input .bin log file
output_path: Path to output .bin log file
lat: Latitude in degrees (will be converted to 1e7 format)
lon: Longitude in degrees (will be converted to 1e7 format)
alt: Altitude in meters
rel_home_alt: Relative altitude from home in meters
rel_origin_alt: Relative altitude from origin in meters
"""
self.input_path = input_path
self.output_path = output_path
self.lat_1e7 = int(lat * 1e7) # Convert to ArduPilot's 1e7 format
self.lon_1e7 = int(lon * 1e7) # Convert to ArduPilot's 1e7 format
self.alt = float(alt)
self.rel_home_alt = float(rel_home_alt)
self.rel_origin_alt = float(rel_origin_alt)
def create_pos_message(self, time_us: int) -> bytes:
"""
Create a POS message with the given timestamp.
Args:
time_us: Timestamp in microseconds
Returns:
Raw bytes for the POS message
"""
# Message header: 0xA3, 0x95 (ArduPilot log header)
# Message ID: 74 (POS)
# Payload: QLLfff format
header = struct.pack('BBB', 0xA3, 0x95, self.POS_MSG_ID)
# Pack the payload according to format QLLfff
# Q = unsigned long long (8 bytes) - TimeUS
# L = unsigned long (4 bytes) - Lat (in 1e7 format)
# L = unsigned long (4 bytes) - Lng (in 1e7 format)
# f = float (4 bytes) - Alt
# f = float (4 bytes) - RelHomeAlt
# f = float (4 bytes) - RelOriginAlt
payload = struct.pack('<QLLfff',
time_us,
self.lat_1e7,
self.lon_1e7,
self.alt,
self.rel_home_alt,
self.rel_origin_alt)
return header + payload
def inject_pos_messages(self) -> Tuple[int, int]:
"""
Read input log, inject POS messages at 10Hz, and write to output.
Returns:
Tuple of (total_messages_written, pos_messages_added)
"""
try:
# Open input log
mlog = mavutil.mavlink_connection(self.input_path)
except Exception as e:
print(f"Error opening input log: {e}")
sys.exit(1)
# Open output file in binary write mode
try:
outfile = open(self.output_path, 'wb')
except Exception as e:
print(f"Error opening output file: {e}")
sys.exit(1)
# Track statistics
total_messages = 0
pos_messages_added = 0
next_pos_time = 0
has_fmt_pos = False
print(f"Processing log: {self.input_path}")
print(f"Output file: {self.output_path}")
print(f"Position: Lat={self.lat_1e7/1e7:.6f}, Lon={self.lon_1e7/1e7:.6f}, Alt={self.alt:.2f}m")
print(f"POS frequency: 10Hz (every {self.POS_INTERVAL_US/1000:.0f}ms)")
print()
# Read through the log
while True:
# Get the raw message bytes
msg = mlog.recv_msg()
if msg is None:
break
# Check if this is a FMT message for POS
if msg.get_type() == 'FMT' and hasattr(msg, 'Name') and msg.Name == self.POS_NAME:
has_fmt_pos = True
# Get the current message timestamp if available
current_time = getattr(msg, 'TimeUS', None)
# If we have a timestamp, inject POS messages as needed
if current_time is not None:
# Initialize next_pos_time on first timestamped message
if next_pos_time == 0:
next_pos_time = (current_time // self.POS_INTERVAL_US) * self.POS_INTERVAL_US
# Inject all POS messages that should come before this message
while next_pos_time <= current_time:
pos_msg = self.create_pos_message(next_pos_time)
outfile.write(pos_msg)
pos_messages_added += 1
next_pos_time += self.POS_INTERVAL_US
# Write the original message
outfile.write(msg.get_msgbuf())
total_messages += 1
# Progress indicator
if total_messages % 10000 == 0:
print(f"Processed {total_messages} messages, added {pos_messages_added} POS messages...", end='\r')
outfile.close()
print() # New line after progress indicator
if not has_fmt_pos:
print("\nWarning: No FMT definition for POS found in original log.")
print("The output log may need a FMT definition for message ID 74 (POS).")
return total_messages, pos_messages_added
def main():
parser = argparse.ArgumentParser(
description='Inject synthetic POS messages into ArduPilot .bin logs at 10Hz',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s input.bin output.bin --lat 47.6205 --lon -122.3493 --alt 100.0
%(prog)s flight.bin flight_with_pos.bin --lat 53.156324 --lon 8.164749 --alt 10.0 --rel-home-alt 5.0
"""
)
parser.add_argument('input', help='Input .bin log file')
parser.add_argument('output', help='Output .bin log file')
parser.add_argument('--lat', type=float, required=True,
help='Latitude in degrees (e.g., 47.6205)')
parser.add_argument('--lon', type=float, required=True,
help='Longitude in degrees (e.g., -122.3493)')
parser.add_argument('--alt', type=float, required=True,
help='Altitude in meters')
parser.add_argument('--rel-home-alt', type=float, default=0.0,
help='Relative altitude from home in meters (default: 0.0)')
parser.add_argument('--rel-origin-alt', type=float, default=0.0,
help='Relative altitude from origin in meters (default: 0.0)')
args = parser.parse_args()
# Validate inputs
if not (-90 <= args.lat <= 90):
print("Error: Latitude must be between -90 and 90 degrees")
sys.exit(1)
if not (-180 <= args.lon <= 180):
print("Error: Longitude must be between -180 and 180 degrees")
sys.exit(1)
# Create injector and process
injector = POSInjector(
args.input,
args.output,
args.lat,
args.lon,
args.alt,
args.rel_home_alt,
args.rel_origin_alt
)
try:
total, pos_added = injector.inject_pos_messages()
print()
print("=" * 60)
print("Injection complete!")
print("=" * 60)
print(f"Total messages written: {total}")
print(f"POS messages added: {pos_added}")
print(f"Output saved to: {args.output}")
except Exception as e:
print(f"\nError during processing: {e}")
import traceback
traceback.print_exc()
sys.exit(1)
if __name__ == '__main__':
main()
It’s just a workaround, but for my purposes it works good enough