canporter/main.py
2024-12-11 19:38:55 +00:00

150 lines
5.9 KiB
Python

import can
import socket
import threading
import time
class TCPServer:
def __init__(self, host='0.0.0.0', port=3169):
self.host = host
self.port = port
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.is_running = False
self.clients = [] # List of connected clients
self.lock = threading.Lock() # Lock for thread-safe client list
self.no_clients_logged = False
def _log(self, message):
""" Custom logging with a dynamic header. """
timestamp = time.strftime('%H:%M:%S')
client_count = len(self.clients)
header_state = 'BROADCASTING' if client_count > 0 else 'WAITING FOR CLIENTS' if client_count == 0 else 'UNKNOWN'
header = f'[{timestamp}] ||| Connected clients: {client_count} ||| {header_state}'
print(f'{header}\n{message}\n')
def setup(self):
""" Set up the server by binding to the host and port and start to listen """
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(5)
self.is_running = True
self._log(f'The server is set up and listening on {self.host}:{self.port}')
def shutdown(self):
""" Shut down the server and disconnect all clients. """
self.is_running = False
for client_socket in self.clients:
try:
client_socket.close()
except Exception:
pass
self.clients.clear()
self.server_socket.close()
self._log(f'The server that was running on {self.host}:{self.port} has been shut down.', 'SHUTDOWN')
def run(self):
""" Run the server and accept clients. """
self._log('The server is running. Press CTRL+C to stop.')
try:
while self.is_running:
client_socket, client_address = self.server_socket.accept()
client_thread = threading.Thread(
target = self.handle_client,
args = (client_socket, client_address),
daemon = True
)
client_thread.start()
except KeyboardInterrupt:
self._log('The server is shutting down...', 'SHUTDOWN')
finally:
self.shutdown()
def handle_client(self, client_socket, client_address):
""" Handle new client connections and add them to the client list. """
with self.lock:
client_socket.settimeout(30)
self.clients.append(client_socket)
self._log(f'Client connected: {client_address}')
try:
while self.is_running:
data = client_socket.recv(1024)
if not data:
break
message = data.decode().strip()
self._log(f'Received message from {client_address}: {message}')
if message.lower() == 'ping':
response = 'pong'
client_socket.sendall('pong'.encode())
self._log(f'Sent response to {client_address}: {response}')
except Exception as e:
self._log(f'Error with client {client_address}: {e}')
finally:
self._log(f'Client disconnected: {client_address}')
with self.lock:
self.clients.remove(client_socket)
client_socket.close()
def broadcast_data(self, data):
"""Send the {data} to all connected clients, disconnecting those that are inactive."""
with self.lock:
if not self.clients:
if not self.no_clients_logged: # Log the message once
self._log('No connected clients, pausing data broadcast.')
self.no_clients_logged = True
return # Skip broadcasting if no clients are connected
self.no_clients_logged = False
self._log(f'Sending data to {len(self.clients)} clients.')
for client_socket in list(self.clients):
try:
# Set a 100ms timeout for the socket, attempt to send the data and reset timeout after successful send
#client_socket.settimeout(0.1)
client_socket.sendall(data.encode())
self._log(f'Data sent to client: {client_socket.getpeername()} -- {data}')
#client_socket.settimeout(None)
except (socket.timeout, BrokenPipeError, ConnectionResetError) as e:
self._log(f'Disconnecting client due to inactivity or error: {e}')
self.clients.remove(client_socket)
try:
client_socket.close()
except Exception:
pass
def dump_can_messages(interface: str, server: TCPServer):
"""
Reads and dumps CAN messages from the specified interface.
:param interface: The name of the CAN interface (e.g., 'can0').
"""
bus = None
try:
bus = can.interface.Bus(channel=interface, interface='socketcan')
print(f"Listening for messages on {interface}...")
while True:
message = bus.recv() # Blocks until a message is received
if message:
msg = f"Timestamp: {message.timestamp:.6f}, ID: {message.arbitration_id:03X}, Data: {message.data.hex()}\n"
server.broadcast_data(msg)
except KeyboardInterrupt:
print("\nExiting...")
except can.CanError as e:
print(f"CAN Error: {e}")
finally:
if bus is not None:
bus.shutdown()
print("CAN bus shut down cleanly.")
if __name__ == "__main__":
server = TCPServer()
server.setup()
server_thread = threading.Thread(target = server.run, daemon = True)
server_thread.start()
dump_can_messages("can1", server)