thank you turk
This commit is contained in:
parent
0211a9b73e
commit
e4b5bcfb79
93
client_emulator.py
Normal file
93
client_emulator.py
Normal file
@ -0,0 +1,93 @@
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
import random
|
||||
|
||||
class TCPClient:
|
||||
def __init__(self, host='127.0.0.1', port=3169):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.client_socket = None
|
||||
self.is_running = False # Control flag for background tasks
|
||||
|
||||
def connect(self):
|
||||
"""Connect to the server."""
|
||||
try:
|
||||
self.client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.client_socket.connect((self.host, self.port))
|
||||
self.is_running = True
|
||||
print(f"Connected to server at {self.host}:{self.port}")
|
||||
except Exception as e:
|
||||
print(f"Error connecting to server: {e}")
|
||||
|
||||
def listen(self):
|
||||
"""Listen for messages from the server."""
|
||||
try:
|
||||
while self.is_running:
|
||||
data = self.client_socket.recv(1024)
|
||||
if not data:
|
||||
break
|
||||
print(f"Received: {data.decode()}")
|
||||
except Exception as e:
|
||||
print(f"Error receiving data: {e}")
|
||||
finally:
|
||||
self.is_running = False
|
||||
print("Disconnected from server.")
|
||||
|
||||
def send_ping(self):
|
||||
"""Background task to send 'ping' messages."""
|
||||
try:
|
||||
while self.is_running:
|
||||
if self.client_socket:
|
||||
self.client_socket.sendall("ping".encode())
|
||||
print(f"Sent: ping")
|
||||
time.sleep(10) # Send 'ping' every 10 seconds
|
||||
except Exception as e:
|
||||
print(f"Error sending ping: {e}")
|
||||
|
||||
def disconnect(self):
|
||||
"""Disconnect from the server."""
|
||||
self.is_running = False
|
||||
if self.client_socket:
|
||||
try:
|
||||
self.client_socket.close()
|
||||
except Exception:
|
||||
pass
|
||||
print("Disconnected from server.")
|
||||
|
||||
def client_behaviour(client_id, host, port):
|
||||
"""Simulate a client's behaviour."""
|
||||
client = TCPClient(host, port)
|
||||
|
||||
while True:
|
||||
client.connect()
|
||||
|
||||
# Start listening and ping threads
|
||||
listen_thread = threading.Thread(target=client.listen, daemon=True)
|
||||
listen_thread.start()
|
||||
|
||||
ping_thread = threading.Thread(target=client.send_ping, daemon=True)
|
||||
ping_thread.start()
|
||||
|
||||
# Stay connected for a random duration
|
||||
time.sleep(random.randint(60, 180))
|
||||
|
||||
client.disconnect()
|
||||
# Wait for a random interval before reconnecting
|
||||
time.sleep(random.randint(5, 60))
|
||||
|
||||
def generate_clients(host='127.0.0.1', port=3169, num_clients=5):
|
||||
"""Generate multiple clients using threads."""
|
||||
threads = []
|
||||
for i in range(num_clients):
|
||||
client_thread = threading.Thread(target=client_behaviour, args=(i, host, port), daemon=True)
|
||||
threads.append(client_thread)
|
||||
client_thread.start()
|
||||
print(f"Client {i} started.")
|
||||
|
||||
# Keep the main thread alive
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
|
||||
if __name__ == "__main__":
|
||||
generate_clients(num_clients=10)
|
149
main.py
Normal file
149
main.py
Normal file
@ -0,0 +1,149 @@
|
||||
import can
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
|
||||
|
||||
class TCPServer:
|
||||
def __init__(self, host='0.0.0.0', port=3169):
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.is_running = False
|
||||
self.clients = [] # List of connected clients
|
||||
self.lock = threading.Lock() # Lock for thread-safe client list
|
||||
self.no_clients_logged = False
|
||||
|
||||
def _log(self, message):
|
||||
""" Custom logging with a dynamic header. """
|
||||
timestamp = time.strftime('%H:%M:%S')
|
||||
client_count = len(self.clients)
|
||||
header_state = 'BROADCASTING' if client_count > 0 else 'WAITING FOR CLIENTS' if client_count == 0 else 'UNKNOWN'
|
||||
header = f'[{timestamp}] ||| Connected clients: {client_count} ||| {header_state}'
|
||||
print(f'{header}\n{message}\n')
|
||||
|
||||
def setup(self):
|
||||
""" Set up the server by binding to the host and port and start to listen """
|
||||
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
self.server_socket.bind((self.host, self.port))
|
||||
self.server_socket.listen(5)
|
||||
self.is_running = True
|
||||
self._log(f'The server is set up and listening on {self.host}:{self.port}')
|
||||
|
||||
def shutdown(self):
|
||||
""" Shut down the server and disconnect all clients. """
|
||||
self.is_running = False
|
||||
for client_socket in self.clients:
|
||||
try:
|
||||
client_socket.close()
|
||||
except Exception:
|
||||
pass
|
||||
self.clients.clear()
|
||||
self.server_socket.close()
|
||||
self._log(f'The server that was running on {self.host}:{self.port} has been shut down.', 'SHUTDOWN')
|
||||
|
||||
def run(self):
|
||||
""" Run the server and accept clients. """
|
||||
self._log('The server is running. Press CTRL+C to stop.')
|
||||
try:
|
||||
while self.is_running:
|
||||
client_socket, client_address = self.server_socket.accept()
|
||||
client_thread = threading.Thread(
|
||||
target = self.handle_client,
|
||||
args = (client_socket, client_address),
|
||||
daemon = True
|
||||
)
|
||||
client_thread.start()
|
||||
except KeyboardInterrupt:
|
||||
self._log('The server is shutting down...', 'SHUTDOWN')
|
||||
finally:
|
||||
self.shutdown()
|
||||
|
||||
def handle_client(self, client_socket, client_address):
|
||||
""" Handle new client connections and add them to the client list. """
|
||||
with self.lock:
|
||||
client_socket.settimeout(30)
|
||||
self.clients.append(client_socket)
|
||||
self._log(f'Client connected: {client_address}')
|
||||
try:
|
||||
while self.is_running:
|
||||
data = client_socket.recv(1024)
|
||||
if not data:
|
||||
break
|
||||
message = data.decode().strip()
|
||||
self._log(f'Received message from {client_address}: {message}')
|
||||
if message.lower() == 'ping':
|
||||
response = 'pong'
|
||||
client_socket.sendall('pong'.encode())
|
||||
self._log(f'Sent response to {client_address}: {response}')
|
||||
except Exception as e:
|
||||
self._log(f'Error with client {client_address}: {e}')
|
||||
finally:
|
||||
self._log(f'Client disconnected: {client_address}')
|
||||
with self.lock:
|
||||
self.clients.remove(client_socket)
|
||||
client_socket.close()
|
||||
|
||||
def broadcast_data(self, data):
|
||||
"""Send the {data} to all connected clients, disconnecting those that are inactive."""
|
||||
with self.lock:
|
||||
if not self.clients:
|
||||
if not self.no_clients_logged: # Log the message once
|
||||
self._log('No connected clients, pausing data broadcast.')
|
||||
self.no_clients_logged = True
|
||||
return # Skip broadcasting if no clients are connected
|
||||
|
||||
self.no_clients_logged = False
|
||||
self._log(f'Sending data to {len(self.clients)} clients.')
|
||||
|
||||
for client_socket in list(self.clients):
|
||||
try:
|
||||
# Set a 100ms timeout for the socket, attempt to send the data and reset timeout after successful send
|
||||
#client_socket.settimeout(0.1)
|
||||
client_socket.sendall(data.encode())
|
||||
self._log(f'Data sent to client: {client_socket.getpeername()} -- {data}')
|
||||
#client_socket.settimeout(None)
|
||||
except (socket.timeout, BrokenPipeError, ConnectionResetError) as e:
|
||||
self._log(f'Disconnecting client due to inactivity or error: {e}')
|
||||
self.clients.remove(client_socket)
|
||||
try:
|
||||
client_socket.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def dump_can_messages(interface: str, server: TCPServer):
|
||||
"""
|
||||
Reads and dumps CAN messages from the specified interface.
|
||||
|
||||
:param interface: The name of the CAN interface (e.g., 'can0').
|
||||
"""
|
||||
bus = None
|
||||
try:
|
||||
bus = can.interface.Bus(channel=interface, interface='socketcan')
|
||||
print(f"Listening for messages on {interface}...")
|
||||
while True:
|
||||
message = bus.recv() # Blocks until a message is received
|
||||
if message:
|
||||
msg = f"Timestamp: {message.timestamp:.6f}, ID: {message.arbitration_id:03X}, Data: {message.data.hex()}\n"
|
||||
server.broadcast_data(msg)
|
||||
except KeyboardInterrupt:
|
||||
print("\nExiting...")
|
||||
|
||||
except can.CanError as e:
|
||||
print(f"CAN Error: {e}")
|
||||
|
||||
finally:
|
||||
if bus is not None:
|
||||
bus.shutdown()
|
||||
print("CAN bus shut down cleanly.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
server = TCPServer()
|
||||
server.setup()
|
||||
|
||||
server_thread = threading.Thread(target = server.run, daemon = True)
|
||||
server_thread.start()
|
||||
|
||||
dump_can_messages("can1", server)
|
Loading…
Reference in New Issue
Block a user