150 lines
5.9 KiB
Python
150 lines
5.9 KiB
Python
import can
|
|
import socket
|
|
import threading
|
|
import time
|
|
|
|
|
|
class TCPServer:
    """
    Threaded TCP server that pushes text data to every connected client.

    Each accepted connection is served by its own daemon thread running
    ``handle_client``, which answers ``ping`` with ``pong``. Any thread may
    call ``broadcast_data`` to send a string to all clients. ``self.lock``
    guards every mutation of the shared ``self.clients`` list.
    """

    def __init__(self, host='0.0.0.0', port=3169):
        """ Create the (still unbound) listening socket and the shared state. """
        self.host = host
        self.port = port
        self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.is_running = False
        self.clients = []  # List of connected clients
        self.lock = threading.Lock()  # Lock for thread-safe client list
        self.no_clients_logged = False  # True once the "no clients" notice was printed

    def _log(self, message, state=None):
        """
        Custom logging with a dynamic header.

        :param message: Text printed below the header line.
        :param state: Optional label shown in the header (e.g. 'SHUTDOWN').
            When omitted, the label is derived from the number of clients.
        """
        timestamp = time.strftime('%H:%M:%S')
        client_count = len(self.clients)
        if state is None:
            state = 'BROADCASTING' if client_count > 0 else 'WAITING FOR CLIENTS'
        header = f'[{timestamp}] ||| Connected clients: {client_count} ||| {state}'
        print(f'{header}\n{message}\n')

    def setup(self):
        """ Set up the server by binding to the host and port and start to listen """
        self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server_socket.bind((self.host, self.port))
        self.server_socket.listen(5)
        self.is_running = True
        self._log(f'The server is set up and listening on {self.host}:{self.port}')

    def shutdown(self):
        """ Shut down the server and disconnect all clients. """
        self.is_running = False
        # Client threads and broadcast_data() mutate the list concurrently,
        # so it must only be walked and cleared while holding the lock.
        with self.lock:
            for client_socket in self.clients:
                try:
                    client_socket.close()
                except OSError:
                    pass  # Best effort: the peer may already be gone
            self.clients.clear()
        self.server_socket.close()
        self._log(f'The server that was running on {self.host}:{self.port} has been shut down.', 'SHUTDOWN')

    def run(self):
        """
        Run the server and accept clients.

        Blocks until ``is_running`` becomes false or CTRL+C is pressed, then
        calls ``shutdown``. Every accepted connection gets its own daemon
        thread running ``handle_client``.
        """
        self._log('The server is running. Press CTRL+C to stop.')
        try:
            while self.is_running:
                try:
                    client_socket, client_address = self.server_socket.accept()
                except OSError:
                    if self.is_running:
                        raise
                    break  # shutdown() closed the listening socket under us
                client_thread = threading.Thread(
                    target=self.handle_client,
                    args=(client_socket, client_address),
                    daemon=True,
                )
                client_thread.start()
        except KeyboardInterrupt:
            self._log('The server is shutting down...', 'SHUTDOWN')
        finally:
            self.shutdown()

    def handle_client(self, client_socket, client_address):
        """
        Handle new client connections and add them to the client list.

        Serves the connection until the peer closes it, an error occurs
        (including the 30 second receive timeout) or the server stops.

        :param client_socket: Connected socket returned by ``accept``.
        :param client_address: Peer address, used only for log output.
        """
        client_socket.settimeout(30)
        with self.lock:
            self.clients.append(client_socket)
        self._log(f'Client connected: {client_address}')
        try:
            while self.is_running:
                data = client_socket.recv(1024)
                if not data:
                    break  # Orderly close by the peer
                message = data.decode().strip()
                self._log(f'Received message from {client_address}: {message}')
                if message.lower() == 'ping':
                    response = 'pong'
                    client_socket.sendall(response.encode())
                    self._log(f'Sent response to {client_address}: {response}')
        except Exception as e:
            self._log(f'Error with client {client_address}: {e}')
        finally:
            self._log(f'Client disconnected: {client_address}')
            with self.lock:
                # broadcast_data() or shutdown() may already have dropped this
                # socket; an unconditional remove() would raise ValueError and
                # skip the close() below.
                if client_socket in self.clients:
                    self.clients.remove(client_socket)
            client_socket.close()

    def broadcast_data(self, data):
        """
        Send the {data} to all connected clients, disconnecting those that fail.

        :param data: Text to send; it is encoded once and sent to every client.
        """
        with self.lock:
            if not self.clients:
                if not self.no_clients_logged:  # Log the message once
                    self._log('No connected clients, pausing data broadcast.')
                    self.no_clients_logged = True
                return  # Skip broadcasting if no clients are connected

            self.no_clients_logged = False
            self._log(f'Sending data to {len(self.clients)} clients.')

            payload = data.encode()
            # Iterate over a copy because failing clients are removed in the loop.
            for client_socket in list(self.clients):
                try:
                    client_socket.sendall(payload)
                    self._log(f'Data sent to client: {client_socket.getpeername()} -- {data}')
                except OSError as e:
                    # OSError covers socket.timeout, BrokenPipeError and
                    # ConnectionResetError plus every other socket failure, so
                    # one bad client cannot abort the caller's loop.
                    self._log(f'Disconnecting client due to inactivity or error: {e}')
                    self.clients.remove(client_socket)
                    try:
                        client_socket.close()
                    except OSError:
                        pass
|
|
|
|
|
|
def dump_can_messages(interface: str, server: TCPServer):
    """
    Relay every CAN message read from an interface to the server's clients.

    Opens the interface through socketcan, then loops forever: each received
    message is rendered as one text line (timestamp, hexadecimal arbitration
    ID, hexadecimal payload) and handed to ``server.broadcast_data``. The
    loop ends on CTRL+C or on a CAN error; the bus is shut down in both cases
    if it was opened.

    :param interface: The name of the CAN interface (e.g., 'can0').
    :param server: Server whose ``broadcast_data`` receives each formatted line.
    """
    bus = None
    try:
        bus = can.interface.Bus(channel=interface, interface='socketcan')
        print(f"Listening for messages on {interface}...")
        while True:
            frame = bus.recv()  # Blocks until a message is received
            if not frame:
                continue
            server.broadcast_data(
                f"Timestamp: {frame.timestamp:.6f}, "
                f"ID: {frame.arbitration_id:03X}, "
                f"Data: {frame.data.hex()}\n"
            )
    except KeyboardInterrupt:
        print("\nExiting...")
    except can.CanError as err:
        print(f"CAN Error: {err}")
    finally:
        # bus stays None when opening the interface itself failed.
        if bus is not None:
            bus.shutdown()
            print("CAN bus shut down cleanly.")
|
|
|
|
|
|
if __name__ == "__main__":
    # Bind and listen before any CAN message can be broadcast.
    server = TCPServer()
    server.setup()

    # Accept clients in a background daemon thread so the main thread is
    # free to run the CAN read loop below.
    server_thread = threading.Thread(target=server.run, daemon=True)
    server_thread.start()

    # Runs until CTRL+C or a CAN error ends the read loop.
    dump_can_messages("can1", server)
|