diff --git a/client_emulator.py b/client_emulator.py new file mode 100644 index 0000000..7792492 --- /dev/null +++ b/client_emulator.py @@ -0,0 +1,93 @@ +import socket +import threading +import time +import random + +class TCPClient: + def __init__(self, host='127.0.0.1', port=3169): + self.host = host + self.port = port + self.client_socket = None + self.is_running = False # Control flag for background tasks + + def connect(self): + """Connect to the server.""" + try: + self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.client_socket.connect((self.host, self.port)) + self.is_running = True + print(f"Connected to server at {self.host}:{self.port}") + except Exception as e: + print(f"Error connecting to server: {e}") + + def listen(self): + """Listen for messages from the server.""" + try: + while self.is_running: + data = self.client_socket.recv(1024) + if not data: + break + print(f"Received: {data.decode()}") + except Exception as e: + print(f"Error receiving data: {e}") + finally: + self.is_running = False + print("Disconnected from server.") + + def send_ping(self): + """Background task to send 'ping' messages.""" + try: + while self.is_running: + if self.client_socket: + self.client_socket.sendall("ping".encode()) + print(f"Sent: ping") + time.sleep(10) # Send 'ping' every 10 seconds + except Exception as e: + print(f"Error sending ping: {e}") + + def disconnect(self): + """Disconnect from the server.""" + self.is_running = False + if self.client_socket: + try: + self.client_socket.close() + except Exception: + pass + print("Disconnected from server.") + +def client_behaviour(client_id, host, port): + """Simulate a client's behaviour.""" + client = TCPClient(host, port) + + while True: + client.connect() + + # Start listening and ping threads + listen_thread = threading.Thread(target=client.listen, daemon=True) + listen_thread.start() + + ping_thread = threading.Thread(target=client.send_ping, daemon=True) + ping_thread.start() + + # Stay connected for a random duration + time.sleep(random.randint(60, 180)) + + client.disconnect() + # Wait for a random interval before reconnecting + time.sleep(random.randint(5, 60)) + +def generate_clients(host='127.0.0.1', port=3169, num_clients=5): + """Generate multiple clients using threads.""" + threads = [] + for i in range(num_clients): + client_thread = threading.Thread(target=client_behaviour, args=(i, host, port), daemon=True) + threads.append(client_thread) + client_thread.start() + print(f"Client {i} started.") + + # Keep the main thread alive + for thread in threads: + thread.join() + +if __name__ == "__main__": + generate_clients(num_clients=10) diff --git a/main.py b/main.py new file mode 100644 index 0000000..0ac11ee --- /dev/null +++ b/main.py @@ -0,0 +1,149 @@ +import can +import socket +import threading +import time + + +class TCPServer: + def __init__(self, host='0.0.0.0', port=3169): + self.host = host + self.port = port + self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.is_running = False + self.clients = [] # List of connected clients + self.lock = threading.Lock() # Lock for thread-safe client list + self.no_clients_logged = False + + def _log(self, message): + """ Custom logging with a dynamic header. """ + timestamp = time.strftime('%H:%M:%S') + client_count = len(self.clients) + header_state = 'BROADCASTING' if client_count > 0 else 'WAITING FOR CLIENTS' if client_count == 0 else 'UNKNOWN' + header = f'[{timestamp}] ||| Connected clients: {client_count} ||| {header_state}' + print(f'{header}\n{message}\n') + + def setup(self): + """ Set up the server by binding to the host and port and start to listen """ + self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.server_socket.bind((self.host, self.port)) + self.server_socket.listen(5) + self.is_running = True + self._log(f'The server is set up and listening on {self.host}:{self.port}') + + def shutdown(self): + """ Shut down the server and disconnect all clients. """ + self.is_running = False + for client_socket in self.clients: + try: + client_socket.close() + except Exception: + pass + self.clients.clear() + self.server_socket.close() + self._log(f'The server that was running on {self.host}:{self.port} has been shut down.', 'SHUTDOWN') + + def run(self): + """ Run the server and accept clients. """ + self._log('The server is running. Press CTRL+C to stop.') + try: + while self.is_running: + client_socket, client_address = self.server_socket.accept() + client_thread = threading.Thread( + target = self.handle_client, + args = (client_socket, client_address), + daemon = True + ) + client_thread.start() + except KeyboardInterrupt: + self._log('The server is shutting down...', 'SHUTDOWN') + finally: + self.shutdown() + + def handle_client(self, client_socket, client_address): + """ Handle new client connections and add them to the client list. """ + with self.lock: + client_socket.settimeout(30) + self.clients.append(client_socket) + self._log(f'Client connected: {client_address}') + try: + while self.is_running: + data = client_socket.recv(1024) + if not data: + break + message = data.decode().strip() + self._log(f'Received message from {client_address}: {message}') + if message.lower() == 'ping': + response = 'pong' + client_socket.sendall('pong'.encode()) + self._log(f'Sent response to {client_address}: {response}') + except Exception as e: + self._log(f'Error with client {client_address}: {e}') + finally: + self._log(f'Client disconnected: {client_address}') + with self.lock: + self.clients.remove(client_socket) + client_socket.close() + + def broadcast_data(self, data): + """Send the {data} to all connected clients, disconnecting those that are inactive.""" + with self.lock: + if not self.clients: + if not self.no_clients_logged: # Log the message once + self._log('No connected clients, pausing data broadcast.') + self.no_clients_logged = True + return # Skip broadcasting if no clients are connected + + self.no_clients_logged = False + self._log(f'Sending data to {len(self.clients)} clients.') + + for client_socket in list(self.clients): + try: + # Set a 100ms timeout for the socket, attempt to send the data and reset timeout after successful send + #client_socket.settimeout(0.1) + client_socket.sendall(data.encode()) + self._log(f'Data sent to client: {client_socket.getpeername()} -- {data}') + #client_socket.settimeout(None) + except (socket.timeout, BrokenPipeError, ConnectionResetError) as e: + self._log(f'Disconnecting client due to inactivity or error: {e}') + self.clients.remove(client_socket) + try: + client_socket.close() + except Exception: + pass + + +def dump_can_messages(interface: str, server: TCPServer): + """ + Reads and dumps CAN messages from the specified interface. + + :param interface: The name of the CAN interface (e.g., 'can0'). + """ + bus = None + try: + bus = can.interface.Bus(channel=interface, interface='socketcan') + print(f"Listening for messages on {interface}...") + while True: + message = bus.recv() # Blocks until a message is received + if message: + msg = f"Timestamp: {message.timestamp:.6f}, ID: {message.arbitration_id:03X}, Data: {message.data.hex()}\n" + server.broadcast_data(msg) + except KeyboardInterrupt: + print("\nExiting...") + + except can.CanError as e: + print(f"CAN Error: {e}") + + finally: + if bus is not None: + bus.shutdown() + print("CAN bus shut down cleanly.") + + +if __name__ == "__main__": + server = TCPServer() + server.setup() + + server_thread = threading.Thread(target = server.run, daemon = True) + server_thread.start() + + dump_can_messages("can1", server)