How to integrate TCP/IP communication with python computer vision program and send the data to doosan robot

First you must create your own computer vision code (in this case we using YOLO v8) and then you can create a socket server on your comvi program.

Socket Server (Laptop Side)
You need to create socket server that can run without interfere the comvi. Note that the data type have to be ‘string’. So here is the example.

from ultralytics import YOLO
import cv2
import numpy as np
import math
import time
import threading
import socket

CONFIDENCE_THRESHOLD = 0.90

model = YOLO('Pizza&Crates_nano.pt')

cap = cv2.VideoCapture(1, cv2.CAP_DSHOW)

if not cap.isOpened():
    print("Cannot open camera")
    exit()
    
def get_rotated_rectangle_points(x_center, y_center, width, height, angle_degrees):
    # Convert angle to radians
    angle_radians = math.radians(angle_degrees)

    # Half width and height
    half_width = width / 2
    half_height = height / 2

    # Calculate the unrotated corner points relative to the center
    corners = np.array([
        [-half_width, -half_height],  # Bottom-left
        [ half_width, -half_height],  # Bottom-right
        [ half_width,  half_height],  # Top-right
        [-half_width,  half_height]   # Top-left
    ])

    # Rotation matrix for 2D rotation
    rotation_matrix = np.array([
        [math.cos(angle_radians), -math.sin(angle_radians)],
        [math.sin(angle_radians),  math.cos(angle_radians)]
    ])

    # Rotate each corner point
    rotated_corners = np.dot(corners, rotation_matrix)

    # Translate the rotated points back to the rectangle's center
    rotated_corners[:, 0] += x_center
    rotated_corners[:, 1] += y_center

    return rotated_corners.astype(int)

def draw_circle(circle, img, inside):
    if inside:
        colour = (0,255,0) # green
    if not inside:
        colour = (0,0,255) # red
    cv2.circle(img,(circle[0],circle[1]), circle[2], colour, 2) # draw circel with colour

def draw_rotated_rectangle(image, rectangle_params, color=(255, 0, 0), thickness=2):
    """
    Draw a rotated rectangle on the image.
    
    image: The image on which to draw.
    rectangle_params: List [x_center, y_center, width, height, rotation (in degrees)].
    color: Color of the rectangle (default is green).
    thickness: Thickness of the rectangle edges (default is 2).
    """
    # Unpack the rectangle parameters
    x_center, y_center, width, height, rotation = rectangle_params
    
    # Get the four corner points of the rotated rectangle
    points = get_rotated_rectangle_points(x_center, y_center, width, height, -rotation)
    
    # Draw the rotated rectangle using polylines
    points = points.reshape((-1, 1, 2))  # Reshape for polylines function
    cv2.polylines(image, [points], isClosed=True, color=color, thickness=thickness)

    
def rotate_point(px, py, cx, cy, angle):
    """Rotate point (px, py) around (cx, cy) by 'angle' radians."""
    # Translate point to origin
    px -= cx
    py -= cy
    
    # Apply rotation
    x_new = px * math.cos(angle) - py * math.sin(angle)
    y_new = px * math.sin(angle) + py * math.cos(angle)
    
    # Translate back
    x_new += cx
    y_new += cy
    
    return x_new, y_new

def is_circle_in_rectangle(circle, rectangle):
    """
    Check if the circle is entirely within the rotated rectangle.
    
    circle: List containing the circle's properties in the form:
            [circle_x, circle_y, radius].
    rectangle: List containing the rectangle's properties in the form:
               [rect_x, rect_y, rect_width, rect_height, rect_angle].
    """
    # Unpack circle parameters from the array
    circle_x, circle_y, radius = circle
    
    # Unpack rectangle parameters from the array
    rect_x, rect_y, rect_width, rect_height, rect_angle_deg = rectangle
    
    # Convert the rectangle's rotation from degrees to radians
    rect_angle = math.radians(rect_angle_deg)
    
    # Step 1: Rotate the circle's center point to align with the rectangle's axis
    rotated_circle_x, rotated_circle_y = rotate_point(circle_x, circle_y, rect_x, rect_y, -rect_angle)
    
    # Step 2: Get the rectangle's boundaries
    half_width = rect_width / 2
    half_height = rect_height / 2
    
    # Step 3: Check if the circle lies within the rectangle's boundaries (accounting for the radius)
    if (rotated_circle_x - radius >= rect_x - half_width and
        rotated_circle_x + radius <= rect_x + half_width and
        rotated_circle_y - radius >= rect_y - half_height and
        rotated_circle_y + radius <= rect_y + half_height):
        return True
    else:
        return False
    
def Boundingbox(results, yolo_image):
    for result in results:
        obb = result.obb.cpu().numpy()

        results_xywhrs = obb.xywhr
        results_class = obb.cls
        circles = []
        rectangles = []
        
        for results_xywhr, cls in zip(results_xywhrs, results_class):
            if cls == 1:
                X_middle = int(results_xywhr[0])
                Y_middle = int(results_xywhr[1])
    
                radius= int((results_xywhr[2] + results_xywhr[3]) / 4)

                circles.append([X_middle, Y_middle, radius])
            if cls == 0:
                X_middle = int(results_xywhr[0])
                Y_middle = int(results_xywhr[1])
    
                rotation = results_xywhr[4]
                rotation = int(rotation * 180 / math.pi)

                rectangles.append([X_middle, Y_middle,results_xywhr[2], results_xywhr[3], rotation])

        return(circles, rectangles)

def amount_in_crate():
    success, yolo_image = cap.read()
    
    #apply moddel on image
    results = model.track(yolo_image, conf=CONFIDENCE_THRESHOLD)
    
    # Get position of the boundingboxes
    circles, rectangles = Boundingbox(results, yolo_image)
    
    # check & count pizza's in crate
    pizzas_in_crate = 0
    for rectangle in rectangles:
        draw_rotated_rectangle(yolo_image, rectangle)
        for cicle in circles:
            is_inside = is_circle_in_rectangle(cicle, rectangle)
            if is_inside:
                pizzas_in_crate += 1
                
            draw_circle(cicle, yolo_image, is_inside)
    
    if (len(rectangles) == 0):
        for cicle in circles:
            draw_circle(cicle, yolo_image, False)
    cv2.imshow('frame', yolo_image)# show image
    
    return(pizzas_in_crate)

class TCPServer:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(1)  # Listen for up to 1 connection
        self.conn = None
        self.address = None

    def handle_client(self):
        while True:
            if self.conn is None:
                self.conn, self.address = self.server_socket.accept()
                print(f"Connection from: {self.address}")

            try:
                print("Calling amount_in_crate()")
                pizzas = amount_in_crate()  # Get the number of pizzas
                print(f"Pizzas in crate: {pizzas}")
                self.conn.send(str(pizzas).encode())  # Send the number to the client

                data = self.conn.recv(1024).decode()  # Receive data from the client
                if not data:
                    break
                print(f"Message from client: {data}")
            except ConnectionResetError:
                print(f"Connection reset by client: {self.address}")
                self.conn.close()
                self.conn = None
                self.address = None
            except Exception as e:
                print(f"Error occurred: {e}")
                self.conn.close()
                self.conn = None
                self.address = None

    def start(self):
        client_thread = threading.Thread(target=self.handle_client)
        client_thread.daemon = True
        client_thread.start()

# Start the TCP server in a separate thread
tcp_server = TCPServer('192.168.123.201', 20002)
tcp_server.start()

while (1):
    start_time = time.time() #start timer
    print(amount_in_crate())
    print("My program took", time.time() - start_time, "to run") # print cycle time
    # end when q is pressed
    if cv2.waitKey(1) == ord('q'):
        break

cv2.destroyAllWindows()

As you can see we created TCPServer class to create function for TCP server, create new thread, and client handling so the main computer vision program still can continue processing the image.

Socket Client (DRL Studio)
You must run the client code on DRL and you can run it on just one laptop (You can do 2 laptop but that is unnecessary). NOTE that DRL Studio has limitied core library because it only support python 3.2.

from DRCF import *

sock = client_socket_open("192.168.123.201", 20002)

    while sock:
        msg = "Robot is ready!"
        client_socket_write(sock, msg.encode())  # request data from camera
        res, rx_data = client_socket_read(sock)  # receive data from camera
        rx_msg = rx_data.decode()  # decode data from server
        tp_log("{0}".format(rx_msg))

If the connection estabilished, the robot will send the message “Robot is ready!” to the server.

2 Likes