server.py
import socket
import time
from abc import ABC, abstractmethod
import logging
import traceback
import math
import csv
from metrics import FlowMetricsManager
import threading
import os


class BaseServer(ABC):
    """Common setup and request handling shared by the traffic servers."""

    def __init__(self, port=12345, ip=None, congestion_control='cubic', exp_id=''):
        self.port = port
        self.congestion_control = congestion_control
        self.exp_id = exp_id
        # self.ip = self.get_host_ip()
        if ip:
            self.ip = ip
        else:
            raise ValueError("IP address must be specified")
        self.flowtracker = FlowMetricsManager(self.exp_id)

    def handle_request(self, conn):
        try:
            # Receive data in chunks
            data = conn.recv(1024)
            logging.info(f"[{self.ip}]: Received {len(data)} bytes of data")
            # Echo back an acknowledgment of the received data
            if data:
                conn.sendall(b"ACK")  # Acknowledge the receipt of data
        except Exception as e:
            logging.error(f"[{self.ip}]: Error handling request: {e}")
        finally:
            logging.info(f"[{self.ip}]: Closing connection")
            conn.close()
    def start(self):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(('0.0.0.0', self.port))
            s.listen()
            logging.info(f"[{self.ip}]: Server listening on port {self.port}")
            while True:
                try:
                    conn, addr = s.accept()
                    # logging.info(f"[{self.ip}]: Accepted connection from {addr}")
                    with conn:
                        print(f"Connected by {addr}")
                        self.handle_request(conn)
                except Exception as e:
                    logging.error(f"[{self.ip}]: Error accepting connection: {e}")
                    logging.error(traceback.format_exc())


class BurstyServer(BaseServer):
    def __init__(self, ip=None, reply_size=40000, port=12346, exp_id='', cong_control='cubic'):
        super().__init__(port, ip, exp_id=exp_id, congestion_control=cong_control)
        self.reply_size = reply_size

    def start(self):
        """Start the server with multi-threading support."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.bind(('0.0.0.0', self.port))
            s.listen()
            logging.info(f"[{self.ip}]: Server listening on port {self.port}")
            while True:
                conn, addr = s.accept()
                # logging.info(f"[{self.ip}]: Accepted connection from {addr}")
                # Start a new thread for each client connection
                client_thread = threading.Thread(target=self.handle_request, args=(conn, addr))
                client_thread.daemon = True  # Daemon threads exit when the main program exits
                client_thread.start()

    def handle_request(self, conn, addr):
        """Handle a client request and send the response."""
        try:
            data = conn.recv(1024)
            if data:
                # Decode flow ID (optional, for metrics tracking)
                flow_id = data[:8].decode('utf-8') if len(data) >= 8 else None
                # TODO: This is not entirely accurate, since the QCT is computed on the client
                # side from the minimum FCT; in other words, the flow start time should be
                # recorded at the client. The problem is that the servers would need to send
                # their flows at the same time to properly simulate an incast event.
                # TODO: Check whether the incast event still occurs without server synchronization.
                self.flowtracker.start_flow(flow_id, self.ip, addr[0], self.reply_size, flow_type='bursty')
                # Send the full reply size as one data stream
                conn.sendall(b'x' * self.reply_size)
        except Exception as e:
            logging.error(f"[{self.ip}]: Error handling request from {addr[0]}:{addr[1]}: {e}")
            print(traceback.format_exc())
        finally:
            conn.close()


class BackgroundServer(BaseServer):
    def __init__(self, ip=None, port=12345, exp_id='', cong_control='cubic'):
        super().__init__(port, ip, cong_control, exp_id)

    def start(self):
        """Start the server with multi-threading support."""
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.setsockopt(socket.IPPROTO_TCP, socket.TCP_CONGESTION, self.congestion_control.encode())
            s.bind(('0.0.0.0', self.port))
            s.listen()
            # logging.info(f"[{self.ip}]: Server listening on port {self.port}")
            while True:
                conn, addr = s.accept()
                # logging.info(f"[{self.ip}]: Accepted connection from {addr}")
                # Start a new thread for each client connection
                client_thread = threading.Thread(target=self.handle_request, args=(conn, addr))
                client_thread.daemon = True  # Daemon threads exit when the main program exits
                client_thread.start()

    def handle_request(self, conn, addr):
        """Handle incoming data, compute metrics, and send acknowledgment."""
        total_bytes_received = 0
        flow_id = None  # Each connection carries a unique flow_id assigned by the client
        try:
            data = conn.recv(4096)
            if data:
                # The first 8 bytes of the stream carry the flow ID; the rest is payload
                flow_id = data[:8].decode('utf-8').strip().split()[0]
                total_bytes_received += len(data) - 8
                # print(f"Received data from {addr[0]}:{addr[1]} | Flow ID: {flow_id} | Bytes: {len(data) - 8}")
                # logging.info(f"Started Flow ID: {flow_id} | Bytes received: {total_bytes_received}")
            while data:
                data = conn.recv(4096)
                if not data:
                    break
                total_bytes_received += len(data)
            # Mark the flow as complete in FlowMetricsManager
            if flow_id is not None:
                self.flowtracker.complete_flow(flow_id)
                # logging.info(f"Flow {flow_id} from {addr} completed. Total bytes received: {total_bytes_received}")
        except Exception as e:
            logging.error(f"[{self.ip}]: Error receiving data from {addr[0]}:{addr[1]}: {e}")

    def get_metrics(self):
        """Retrieve collected flow metrics."""
        return self.flowtracker.get_metrics()


class IperfServer(BaseServer):
    def __init__(self, port=5201):
        self.port = port

    def start(self):
        # Launch iperf3 in server mode in the background on the configured port
        cmd = f'iperf3 -s -p {self.port} &'
        os.system(cmd)
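

# Illustrative usage sketch, not part of the original module: the host IP,
# port, and experiment id below are assumed values chosen for demonstration.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    # Run a background-traffic server that accepts flows and records per-flow
    # metrics via FlowMetricsManager until the process is interrupted.
    server = BackgroundServer(ip='10.0.0.1', port=12345, exp_id='demo', cong_control='cubic')
    server.start()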