Stage2024/tracking_sources/tracking.py

258 lines
7.7 KiB
Python
Raw Normal View History

2024-06-28 14:08:49 +02:00
import cv2
import urllib.request
import requests
import numpy as np
import math
import pandas as pd
import asyncio
import time
import os
import glob
from core_functions import read_table
from ultralytics import YOLO
def read_image_from_url(url):
try:
req = urllib.request.urlopen(url)
img_arr = np.asarray(bytearray(req.read()), dtype=np.uint8)
return cv2.imdecode(img_arr, -1)
except Exception as _:
print("URL Error")
exit(1)
def read_image_from_fs(path):
return cv2.imread(path)
async def analyze(model, img, name):
# Run YOLOv8 tracking on the frame, persisting tracks between frames
# results = model(img, conf=conf, classes=classes)
# results = model.track(img, conf=conf, classes=classes, persist=True)
results = model.track(img, conf=conf, classes=classes, persist=True, max_det=1, imgsz=1600)
if draw:
# Visualize the results on the frame
annotated_frame = results[0].plot()
# Display the annotated frame
cv2.imshow(name, annotated_frame)
# Gather the coordinates of the box
boxes = results[0].boxes
if len(boxes) == 0:
print("No objects detected in " + name)
return -1, -1
x1, _, x2, _ = boxes.xyxyn[0].numpy()
x = (x1 + x2) / 2
width = x2 - x1
return x, width
def interpolate(ratio):
# N = 9
# cal_ratio = [0.06, 0.18, 0.28, 0.39, 0.49, 0.60, 0.70, 0.82, 0.92]
# cal_angledeg = [-25.27, -19.20, -13.46, -6.68, -0.58, 6.40, 12.37, 18.86, 24.24]
N, cal_ratio, cal_angledeg = read_table("configs/config4.csv")
if trace:
print(f"Interpolate {ratio}")
for i in range(1, N):
ratiomin = cal_ratio[i - 1]
ratiomax = cal_ratio[i]
anglemin = cal_angledeg[i - 1]
anglemax = cal_angledeg[i]
if ratiomin <= ratio < ratiomax:
if trace:
print(f" i={i}, Ratio[{i - 1}]={ratiomin}, Ratio[{i}]={ratiomax}")
print(f" i={i}, Angledeg[{i - 1}]={anglemin}, Angledeg[{i}]={anglemax}")
deltay = anglemax - anglemin
deltax = ratiomax - ratiomin
slope = deltay / deltax
if trace:
print(f" deltax={deltax}, deltay={deltay}, slope={slope}°/m")
delta = ratio - ratiomin
angle = anglemin + slope * delta
if trace:
print(f" delta={delta}, angle={angle}°")
return angle
print(f"Error: unable to interpolate {ratio}")
exit(0)
def estimate(leftd, rightd, alphar, betar, widthr, width):
alphadeg, betadeg, thetadeg = 0.0, 0.0, 0.0
alpharad, betarad, thetarad = 0.0, 0.0, 0.0
alphatan, betatan, thetatan = 0.0, 0.0, 0.0
gammadeg, gammarad, gammatan = 0.0, 0.0, 0.0
distance = 0.0
if trace:
print("Estimation")
# interpolate to get angles
alphadeg = -interpolate(alphar)
if trace:
print(f" interpolate: alphar={alphar} (ratio) -> alphadeg={alphadeg}°")
betadeg = interpolate(betar)
if trace:
print(f"interpolate: betar={betar} (ratio) -> betadeg={betadeg}°")
gammadeg = interpolate(0.5 + widthr)
if trace:
print(f"interpolate: widthr={widthr} (ratio) -> gammadeg={gammadeg}°")
# convert to radians
alpharad = math.radians(alphadeg)
if trace:
print(f"to radians: alphadeg={alphadeg} -> alpharad={alpharad}")
betarad = math.radians(betadeg)
if trace:
print(f"to radians: betadeg={betadeg} -> betarad={betarad}")
# get tan
alphatan = math.tan(alpharad)
betatan = math.tan(betarad)
if trace:
print(f"tan(): alphatan={alphatan}, betatan={betatan}")
# (x, y)
y = (leftd + rightd) / (alphatan + betatan)
x = y * alphatan - leftd
if trace:
print(f"position: x={x}, y={y}")
# (distance, angle)
distance_1 = math.sqrt(x * x + y * y)
thetatan = x / y
thetarad = math.atan(thetatan)
thetadeg = math.degrees(thetarad)
print(f"distance={distance_1}, thetatan={thetatan}, theta={thetadeg}, thetarad={thetarad}")
# print(f"distance = {distance}")
gammarad = math.radians(gammadeg)
gammatan = math.tan(gammarad)
distance_2 = width / gammatan
print(f"distance={distance_2}, gammatan={gammatan}, gamma={gammadeg}, gammarad={gammarad}")
return distance_1, distance_2, thetadeg
async def get_image_stream(cap):
ret, img = cap.read()
if not ret:
print("Error: Unable to read frame.")
exit()
return img
async def get_images_stream(l_cap, r_cap, l_imgs_buffer, r_imgs_buffer, n_image):
current_img = n_image % 2
l_img = await l_imgs_buffer[current_img]
r_img = await r_imgs_buffer[current_img]
l_imgs_buffer[current_img] = asyncio.create_task(get_image_stream(l_cap))
r_imgs_buffer[current_img] = asyncio.create_task(get_image_stream(r_cap))
return l_img, r_img
def init_stream(url):
# Set quality
requests.get(url + "/control?var=framesize&val=13")
# requests.get(url + "/control?var=framesize&val=8")
# Open stream
stream_url = url + ":81/stream"
print(stream_url)
cap = cv2.VideoCapture(stream_url)
if not cap.isOpened():
print("Error: Unable to open the stream.")
exit()
return cap
async def main():
print("Loading models")
l_model = YOLO('yolov8n.pt')
r_model = YOLO('yolov8n.pt')
l_model()
r_model()
print("Init streams")
l_cap = init_stream(l_url)
r_cap = init_stream(r_url)
ti1 = asyncio.create_task(get_image_stream(l_cap))
ti2 = asyncio.create_task(get_image_stream(r_cap))
n_images = -1
results = (0, 0, 0)
l_img_save = None
r_img_save = None
print("Loop start")
init_time = time.time()
with open("frametime.txt", 'w') as file:
while True:
n_images += 1
st_request = time.time()
l_img = await ti1
r_img = await ti2
ti1 = asyncio.create_task(get_image_stream(l_cap))
ti2 = asyncio.create_task(get_image_stream(r_cap))
dt_request = time.time() - st_request
print(f"imgs read in {dt_request}")
st_analyze = time.time()
t1 = asyncio.create_task(analyze(l_model, l_img, "left"))
t2 = asyncio.create_task(analyze(r_model, r_img, "right"))
lx, l_width = await t1
rx, r_width = await t2
if cv2.waitKey(1) & 0xFF == ord("q"):
break
if n_images == 100:
break
width = (l_width + r_width) / 2
if lx != -1 and rx != -1:
results = estimate(0, 0.72, lx, rx, width, obj_size)
l_img_save = l_img
r_img_save = r_img
dt_analyze = time.time() - st_analyze
print(f"analyse done in {dt_analyze}")
dt_total = time.time() - st_request
print(f"total time {dt_total}")
file.write(f"{dt_request} {dt_analyze} {dt_total}\n")
print("\n\n\n")
l_cap.release()
r_cap.release()
cv2.destroyAllWindows()
print("\n\n\n")
total_time = time.time() - init_time
print(f"total time {total_time}")
print(f"avrg time {total_time / n_images}")
print(f"resutlts {results}")
folder_path = 'imgs/'
num_files = len(glob.glob(os.path.join(folder_path, '*'))) / 2
cv2.imwrite(folder_path + str(num_files) + "_left.png", l_img_save)
cv2.imwrite(folder_path + str(num_files) + "_right.png", r_img_save)
print("done saving")
draw = True;
trace = False;
# classes = None
# classes = [32] # Sport ball
classes = [39] # Sport ball
# classes = [64] # Mouse
conf = 0.05
obj_size = 0.08
l_url = "http://192.168.0.151"
r_url = "http://192.168.0.152"
asyncio.run(main())