import can import socket import threading import time class TCPServer: def __init__(self, host='0.0.0.0', port=3169): self.host = host self.port = port self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.is_running = False self.clients = [] # List of connected clients self.lock = threading.Lock() # Lock for thread-safe client list self.no_clients_logged = False def _log(self, message): """ Custom logging with a dynamic header. """ timestamp = time.strftime('%H:%M:%S') client_count = len(self.clients) header_state = 'BROADCASTING' if client_count > 0 else 'WAITING FOR CLIENTS' if client_count == 0 else 'UNKNOWN' header = f'[{timestamp}] ||| Connected clients: {client_count} ||| {header_state}' print(f'{header}\n{message}\n') def setup(self): """ Set up the server by binding to the host and port and start to listen """ self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server_socket.bind((self.host, self.port)) self.server_socket.listen(5) self.is_running = True self._log(f'The server is set up and listening on {self.host}:{self.port}') def shutdown(self): """ Shut down the server and disconnect all clients. """ self.is_running = False for client_socket in self.clients: try: client_socket.close() except Exception: pass self.clients.clear() self.server_socket.close() self._log(f'The server that was running on {self.host}:{self.port} has been shut down.', 'SHUTDOWN') def run(self): """ Run the server and accept clients. """ self._log('The server is running. Press CTRL+C to stop.') try: while self.is_running: client_socket, client_address = self.server_socket.accept() client_thread = threading.Thread( target = self.handle_client, args = (client_socket, client_address), daemon = True ) client_thread.start() except KeyboardInterrupt: self._log('The server is shutting down...', 'SHUTDOWN') finally: self.shutdown() def handle_client(self, client_socket, client_address): """ Handle new client connections and add them to the client list. """ with self.lock: client_socket.settimeout(30) self.clients.append(client_socket) self._log(f'Client connected: {client_address}') try: while self.is_running: data = client_socket.recv(1024) if not data: break message = data.decode().strip() self._log(f'Received message from {client_address}: {message}') if message.lower() == 'ping': response = 'pong' client_socket.sendall('pong'.encode()) self._log(f'Sent response to {client_address}: {response}') except Exception as e: self._log(f'Error with client {client_address}: {e}') finally: self._log(f'Client disconnected: {client_address}') with self.lock: self.clients.remove(client_socket) client_socket.close() def broadcast_data(self, data): """Send the {data} to all connected clients, disconnecting those that are inactive.""" with self.lock: if not self.clients: if not self.no_clients_logged: # Log the message once self._log('No connected clients, pausing data broadcast.') self.no_clients_logged = True return # Skip broadcasting if no clients are connected self.no_clients_logged = False self._log(f'Sending data to {len(self.clients)} clients.') for client_socket in list(self.clients): try: # Set a 100ms timeout for the socket, attempt to send the data and reset timeout after successful send #client_socket.settimeout(0.1) client_socket.sendall(data.encode()) self._log(f'Data sent to client: {client_socket.getpeername()} -- {data}') #client_socket.settimeout(None) except (socket.timeout, BrokenPipeError, ConnectionResetError) as e: self._log(f'Disconnecting client due to inactivity or error: {e}') self.clients.remove(client_socket) try: client_socket.close() except Exception: pass def dump_can_messages(interface: str, server: TCPServer): """ Reads and dumps CAN messages from the specified interface. :param interface: The name of the CAN interface (e.g., 'can0'). """ bus = None try: bus = can.interface.Bus(channel=interface, interface='socketcan') print(f"Listening for messages on {interface}...") while True: message = bus.recv() # Blocks until a message is received if message: msg = f"Timestamp: {message.timestamp:.6f}, ID: {message.arbitration_id:03X}, Data: {message.data.hex()}\n" server.broadcast_data(msg) except KeyboardInterrupt: print("\nExiting...") except can.CanError as e: print(f"CAN Error: {e}") finally: if bus is not None: bus.shutdown() print("CAN bus shut down cleanly.") if __name__ == "__main__": server = TCPServer() server.setup() server_thread = threading.Thread(target = server.run, daemon = True) server_thread.start() dump_can_messages("can1", server)