Code examples

H.264 frames to NumPy arrays

Demonstrates decoding pre-captured video frames (of my cat, Marble) and loading them into NumPy arrays suitable for analysis.

#!/usr/bin/env python3

from ast import literal_eval
from jetson_tello import h264_frame_to_numpy_array, FrameDecodeError

# Each line of frames.txt is parsed as a Python literal holding one frame.
with open("frames.txt", "r") as frames_file:
    frames = [literal_eval(line) for line in frames_file]

# Frames are numbered from 1 and the number advances for every frame,
# including the ones that fail to decode.
for frame_number, frame in enumerate(frames, start=1):
    try:
        decoded = h264_frame_to_numpy_array(frame)
        array, width, height = decoded

        print(f'frame {frame_number} size: {width} x {height}')
        print(array)
    except FrameDecodeError:
        print(f'frame {frame_number} - (decode error)')
      

H.264 frames to CUDA

The same video frames, but this time loaded into CUDA memory for GPU processing. The images are saved back out to JPEG files just so you can see that they are valid.

#!/usr/bin/env python3

from pathlib import Path
import jetson.utils
from jetson_tello import h264_frame_to_cuda, FrameDecodeError

from ast import literal_eval
# Each line of frames.txt is parsed as a Python literal holding one frame.
with open("frames.txt", "r") as frames_file:
    frames = [literal_eval(line) for line in frames_file]

# Output directory for the JPEG copies of the decoded frames.
Path("h264_frames_to_cuda").mkdir(exist_ok=True)

# Only frames that decode and save successfully consume a number, so the
# saved files are numbered contiguously from 1.
saved_count = 0
for frame in frames:
    try:
        cuda_image, width, height = h264_frame_to_cuda(frame)
        frame_number = saved_count + 1

        print(f'frame {frame_number}:')
        print(cuda_image)

        output_path = f'h264_frames_to_cuda/frame-{frame_number}.jpg'
        jetson.utils.saveImageRGBA(output_path, cuda_image, width, height)
        print(f'saved as {output_path}')
        saved_count = frame_number
    except FrameDecodeError:
        print('(decode error, frame skipped)')
    # Separator printed after every frame, decoded or not.
    print('-----------------------------------------------------------------')

Object detection

Runs the frames through the ssd-mobilenet-v2 detector and (hopefully) finds the cat.

#!/usr/bin/env python3

import jetson.inference
from jetson_tello import h264_frame_to_cuda, FrameDecodeError

from ast import literal_eval
# Each line of frames.txt is parsed as a Python literal holding one frame.
with open("frames.txt", "r") as frames_file:
    frames = [literal_eval(line) for line in frames_file]

net = jetson.inference.detectNet("ssd-mobilenet-v2", threshold=0.5)

# Only successfully processed frames consume a number; frames that fail to
# decode are skipped without any output.
processed_count = 0
for frame in frames:
    try:
        cuda_image, width, height = h264_frame_to_cuda(frame)

        detections = net.Detect(cuda_image)

        frame_number = processed_count + 1
        print(f'frame {frame_number} detections:')
        for detection in detections:
            print(detection)

        processed_count = frame_number
    except FrameDecodeError:
        continue

Face detection

Pipes video frames captured from the flying Tello drone through the facenet detector and reports any human faces it sees.

#!/usr/bin/env python3

import asyncio
import jetson.inference
from jetson_tello import h264_frame_to_cuda, FrameDecodeError
from tello_asyncio import Tello

# Face detector shared by every call to process_frame().
net = jetson.inference.detectNet("facenet", threshold=0.5)


async def process_frame(frame):
    """Decode one H.264 frame, run the face detector on it and print each detection.

    Frames that raise FrameDecodeError are skipped without any output.
    """
    try:
        cuda_image, _width, _height = h264_frame_to_cuda(frame)

        for detection in net.Detect(cuda_image):
            print(detection)
    except FrameDecodeError:
        return

async def main():
    """Connect to the drone, take off and process its video stream.

    Calls the drone's wifi_wait_for_network(), connect() and start_video()
    in turn, then runs takeoff concurrently with a loop that passes every
    frame from drone.video_stream to process_frame(). The video is stopped
    and the drone disconnected on the way out, even if waiting fails.
    """
    drone = Tello()

    await drone.wifi_wait_for_network()
    await drone.connect()
    await drone.start_video()

    async def fly():
        await drone.takeoff()

    async def process_video():
        async for frame in drone.video_stream:
            await process_frame(frame)

    try:
        # asyncio.wait() must be given tasks/futures: passing bare coroutine
        # objects was deprecated in Python 3.8 and raises TypeError from 3.11.
        tasks = [
            asyncio.ensure_future(fly()),
            asyncio.ensure_future(process_video()),
        ]
        await asyncio.wait(tasks)
    finally:
        await drone.stop_video()
        await drone.disconnect()

# Create the event loop explicitly: calling asyncio.get_event_loop() when no
# loop is current is deprecated and raises RuntimeError on Python 3.14.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(main())
finally:
    # Release the loop's resources once main() has finished or failed.
    loop.close()

Face and object detection

Like the face detection example above, but also detects objects in view at the same time.

#!/usr/bin/env python3

import asyncio
import jetson.inference
from jetson_tello import h264_frame_to_cuda, FrameDecodeError
from tello_asyncio import Tello

# Detectors shared by every call to process_frame().
face_detector = jetson.inference.detectNet("facenet", threshold=0.5)
object_detector = jetson.inference.detectNet("ssd-mobilenet-v2", threshold=0.5)


async def process_frame(frame):
    """Decode one H.264 frame and print the face and object detections for it.

    Both detectors run on the same decoded image before anything is printed.
    Frames that raise FrameDecodeError are skipped without any output.
    """
    try:
        cuda_image, _width, _height = h264_frame_to_cuda(frame)

        faces = face_detector.Detect(cuda_image)
        objects = object_detector.Detect(cuda_image)

        # Report faces first, then objects, each under its own heading.
        for heading, detections in (('faces:', faces), ('objects:', objects)):
            print(heading)
            for detection in detections:
                print(detection)
    except FrameDecodeError:
        return

async def main():
    """Connect to the drone, take off and process its video stream.

    Calls the drone's wifi_wait_for_network(), connect() and start_video()
    in turn, then runs takeoff concurrently with a loop that passes every
    frame from drone.video_stream to process_frame(). The video is stopped
    and the drone disconnected on the way out, even if waiting fails.
    """
    drone = Tello()

    await drone.wifi_wait_for_network()
    await drone.connect()
    await drone.start_video()

    async def fly():
        await drone.takeoff()

    async def process_video():
        async for frame in drone.video_stream:
            await process_frame(frame)

    try:
        # asyncio.wait() must be given tasks/futures: passing bare coroutine
        # objects was deprecated in Python 3.8 and raises TypeError from 3.11.
        tasks = [
            asyncio.ensure_future(fly()),
            asyncio.ensure_future(process_video()),
        ]
        await asyncio.wait(tasks)
    finally:
        await drone.stop_video()
        await drone.disconnect()

# Create the event loop explicitly: calling asyncio.get_event_loop() when no
# loop is current is deprecated and raises RuntimeError on Python 3.14.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(main())
finally:
    # Release the loop's resources once main() has finished or failed.
    loop.close()

Example output:

faces:
<detectNet.Detection object>
   -- ClassID: 0
   -- Confidence: 0.809878
   -- Left:    434.667
   -- Top:     0
   -- Right:   702.267
   -- Bottom:  302.5
   -- Width:   267.6
   -- Height:  302.5
   -- Area:    80949
   -- Center:  (568.467, 151.25)
objects:
<detectNet.Detection object>
   -- ClassID: 7
   -- Confidence: 0.500977
   -- Left:    0
   -- Top:     7.30054
   -- Right:   959
   -- Bottom:  719.04
   -- Width:   959
   -- Height:  711.74
   -- Area:    682559
   -- Center:  (479.5, 363.171)