import cv2 import urllib.request import requests import numpy as np import math import pandas as pd import asyncio import time import os import glob from core_functions import read_table from ultralytics import YOLO def read_image_from_url(url): try: req = urllib.request.urlopen(url) img_arr = np.asarray(bytearray(req.read()), dtype=np.uint8) return cv2.imdecode(img_arr, -1) except Exception as _: print("URL Error") exit(1) def read_image_from_fs(path): return cv2.imread(path) async def analyze(model, img, name): # Run YOLOv8 tracking on the frame, persisting tracks between frames # results = model(img, conf=conf, classes=classes) # results = model.track(img, conf=conf, classes=classes, persist=True) results = model.track(img, conf=conf, classes=classes, persist=True, max_det=1, imgsz=1600) if draw: # Visualize the results on the frame annotated_frame = results[0].plot() # Display the annotated frame cv2.imshow(name, annotated_frame) # Gather the coordinates of the box boxes = results[0].boxes if len(boxes) == 0: print("No objects detected in " + name) return -1, -1 x1, _, x2, _ = boxes.xyxyn[0].numpy() x = (x1 + x2) / 2 width = x2 - x1 return x, width def interpolate(ratio): # N = 9 # cal_ratio = [0.06, 0.18, 0.28, 0.39, 0.49, 0.60, 0.70, 0.82, 0.92] # cal_angledeg = [-25.27, -19.20, -13.46, -6.68, -0.58, 6.40, 12.37, 18.86, 24.24] N, cal_ratio, cal_angledeg = read_table("configs/config4.csv") if trace: print(f"Interpolate {ratio}") for i in range(1, N): ratiomin = cal_ratio[i - 1] ratiomax = cal_ratio[i] anglemin = cal_angledeg[i - 1] anglemax = cal_angledeg[i] if ratiomin <= ratio < ratiomax: if trace: print(f" i={i}, Ratio[{i - 1}]={ratiomin}, Ratio[{i}]={ratiomax}") print(f" i={i}, Angledeg[{i - 1}]={anglemin}, Angledeg[{i}]={anglemax}") deltay = anglemax - anglemin deltax = ratiomax - ratiomin slope = deltay / deltax if trace: print(f" deltax={deltax}, deltay={deltay}, slope={slope}°/m") delta = ratio - ratiomin angle = anglemin + slope * delta if trace: print(f" delta={delta}, angle={angle}°") return angle print(f"Error: unable to interpolate {ratio}") exit(0) def estimate(leftd, rightd, alphar, betar, widthr, width): alphadeg, betadeg, thetadeg = 0.0, 0.0, 0.0 alpharad, betarad, thetarad = 0.0, 0.0, 0.0 alphatan, betatan, thetatan = 0.0, 0.0, 0.0 gammadeg, gammarad, gammatan = 0.0, 0.0, 0.0 distance = 0.0 if trace: print("Estimation") # interpolate to get angles alphadeg = -interpolate(alphar) if trace: print(f" interpolate: alphar={alphar} (ratio) -> alphadeg={alphadeg}°") betadeg = interpolate(betar) if trace: print(f"interpolate: betar={betar} (ratio) -> betadeg={betadeg}°") gammadeg = interpolate(0.5 + widthr) if trace: print(f"interpolate: widthr={widthr} (ratio) -> gammadeg={gammadeg}°") # convert to radians alpharad = math.radians(alphadeg) if trace: print(f"to radians: alphadeg={alphadeg} -> alpharad={alpharad}") betarad = math.radians(betadeg) if trace: print(f"to radians: betadeg={betadeg} -> betarad={betarad}") # get tan alphatan = math.tan(alpharad) betatan = math.tan(betarad) if trace: print(f"tan(): alphatan={alphatan}, betatan={betatan}") # (x, y) y = (leftd + rightd) / (alphatan + betatan) x = y * alphatan - leftd if trace: print(f"position: x={x}, y={y}") # (distance, angle) distance_1 = math.sqrt(x * x + y * y) thetatan = x / y thetarad = math.atan(thetatan) thetadeg = math.degrees(thetarad) print(f"distance={distance_1}, thetatan={thetatan}, theta={thetadeg}, thetarad={thetarad}") # print(f"distance = {distance}") gammarad = math.radians(gammadeg) gammatan = math.tan(gammarad) distance_2 = width / gammatan print(f"distance={distance_2}, gammatan={gammatan}, gamma={gammadeg}, gammarad={gammarad}") return distance_1, distance_2, thetadeg async def get_image_stream(cap): ret, img = cap.read() if not ret: print("Error: Unable to read frame.") exit() return img async def get_images_stream(l_cap, r_cap, l_imgs_buffer, r_imgs_buffer, n_image): current_img = n_image % 2 l_img = await l_imgs_buffer[current_img] r_img = await r_imgs_buffer[current_img] l_imgs_buffer[current_img] = asyncio.create_task(get_image_stream(l_cap)) r_imgs_buffer[current_img] = asyncio.create_task(get_image_stream(r_cap)) return l_img, r_img def init_stream(url): # Set quality requests.get(url + "/control?var=framesize&val=13") # requests.get(url + "/control?var=framesize&val=8") # Open stream stream_url = url + ":81/stream" print(stream_url) cap = cv2.VideoCapture(stream_url) if not cap.isOpened(): print("Error: Unable to open the stream.") exit() return cap async def main(): print("Loading models") l_model = YOLO('yolov8n.pt') r_model = YOLO('yolov8n.pt') l_model() r_model() print("Init streams") l_cap = init_stream(l_url) r_cap = init_stream(r_url) ti1 = asyncio.create_task(get_image_stream(l_cap)) ti2 = asyncio.create_task(get_image_stream(r_cap)) n_images = -1 results = (0, 0, 0) l_img_save = None r_img_save = None print("Loop start") init_time = time.time() with open("frametime.txt", 'w') as file: while True: n_images += 1 st_request = time.time() l_img = await ti1 r_img = await ti2 ti1 = asyncio.create_task(get_image_stream(l_cap)) ti2 = asyncio.create_task(get_image_stream(r_cap)) dt_request = time.time() - st_request print(f"imgs read in {dt_request}") st_analyze = time.time() t1 = asyncio.create_task(analyze(l_model, l_img, "left")) t2 = asyncio.create_task(analyze(r_model, r_img, "right")) lx, l_width = await t1 rx, r_width = await t2 if cv2.waitKey(1) & 0xFF == ord("q"): break if n_images == 100: break width = (l_width + r_width) / 2 if lx != -1 and rx != -1: results = estimate(0, 0.72, lx, rx, width, obj_size) l_img_save = l_img r_img_save = r_img dt_analyze = time.time() - st_analyze print(f"analyse done in {dt_analyze}") dt_total = time.time() - st_request print(f"total time {dt_total}") file.write(f"{dt_request} {dt_analyze} {dt_total}\n") print("\n\n\n") l_cap.release() r_cap.release() cv2.destroyAllWindows() print("\n\n\n") total_time = time.time() - init_time print(f"total time {total_time}") print(f"avrg time {total_time / n_images}") print(f"resutlts {results}") folder_path = 'imgs/' num_files = len(glob.glob(os.path.join(folder_path, '*'))) / 2 cv2.imwrite(folder_path + str(num_files) + "_left.png", l_img_save) cv2.imwrite(folder_path + str(num_files) + "_right.png", r_img_save) print("done saving") draw = True; trace = False; # classes = None # classes = [32] # Sport ball classes = [39] # Sport ball # classes = [64] # Mouse conf = 0.05 obj_size = 0.08 l_url = "http://192.168.0.151" r_url = "http://192.168.0.152" asyncio.run(main())