4
4
import pytest
5
5
import os
6
6
import ptf .testutils as testutils
7
+ from scapy .layers .l2 import Ether
8
+ from scapy .contrib .mpls import MPLS
9
+ from scapy .layers .l2 import Dot1Q
10
+ from scapy .layers .vxlan import VXLAN
7
11
from . import everflow_test_utilities as everflow_utils
8
12
9
- from .everflow_test_utilities import BaseEverflowTest , erspan_ip_ver # noqa: F401
13
+ from .everflow_test_utilities import BaseEverflowTest , erspan_ip_ver # noqa: F401
10
14
from .everflow_test_utilities import TEMPLATE_DIR , EVERFLOW_RULE_CREATE_TEMPLATE , \
11
- DUT_RUN_DIR , EVERFLOW_RULE_CREATE_FILE , UP_STREAM
15
+ DUT_RUN_DIR , EVERFLOW_RULE_CREATE_FILE , UP_STREAM
12
16
from tests .common .helpers .assertions import pytest_require
13
17
14
- from .everflow_test_utilities import setup_info , EVERFLOW_DSCP_RULES , STABILITY_BUFFER # noqa: F401
18
+ from .everflow_test_utilities import setup_info , EVERFLOW_DSCP_RULES , STABILITY_BUFFER # noqa: F401
15
19
from tests .common .dualtor .mux_simulator_control import toggle_all_simulator_ports_to_rand_selected_tor # noqa: F401
16
20
17
21
pytestmark = [
@@ -79,7 +83,7 @@ def build_acl_rule_vars(candidate_ports, ip_ver):
79
83
80
84
81
85
@pytest .fixture (scope = 'module' )
82
- def apply_mirror_session (setup_info , erspan_ip_ver ): # noqa F811
86
+ def apply_mirror_session (setup_info , erspan_ip_ver ): # noqa F811
83
87
mirror_session_info = BaseEverflowTest .mirror_session_info (
84
88
EVERFLOW_SESSION_NAME , setup_info [UP_STREAM ]['everflow_dut' ].facts ["asic_type" ])
85
89
logger .info ("Applying mirror session to DUT" )
@@ -93,7 +97,7 @@ def apply_mirror_session(setup_info, erspan_ip_ver): # noqa F811
93
97
94
98
95
99
@pytest .fixture (scope = 'module' )
96
- def setup_mirror_session_dest_ip_route (tbinfo , setup_info , apply_mirror_session , erspan_ip_ver ): # noqa F811
100
+ def setup_mirror_session_dest_ip_route (tbinfo , setup_info , apply_mirror_session , erspan_ip_ver ): # noqa F811
97
101
"""
98
102
Setup the route for mirror session destination ip and update monitor port list.
99
103
Remove the route as part of cleanup.
@@ -124,7 +128,7 @@ def ip_ver(request):
124
128
125
129
126
130
@pytest .fixture (scope = 'module' )
127
- def apply_acl_rule (setup_info , tbinfo , setup_mirror_session_dest_ip_route , ip_ver ): # noqa F811
131
+ def apply_acl_rule (setup_info , tbinfo , setup_mirror_session_dest_ip_route , ip_ver ): # noqa F811
128
132
"""
129
133
Apply ACL rule for matching input_ports
130
134
"""
@@ -148,7 +152,7 @@ def apply_acl_rule(setup_info, tbinfo, setup_mirror_session_dest_ip_route, ip_ve
148
152
dest = os .path .join (DUT_RUN_DIR , EVERFLOW_RULE_CREATE_FILE ))
149
153
logger .info ("Applying acl rule config to DUT" )
150
154
command = "acl-loader update full {} --table_name {} --session_name {}" \
151
- .format (os .path .join (DUT_RUN_DIR , EVERFLOW_RULE_CREATE_FILE ), table_name , EVERFLOW_SESSION_NAME )
155
+ .format (os .path .join (DUT_RUN_DIR , EVERFLOW_RULE_CREATE_FILE ), table_name , EVERFLOW_SESSION_NAME )
152
156
setup_info [UP_STREAM ]['everflow_dut' ].shell (cmd = command )
153
157
ret = {
154
158
"candidate_ports" : candidate_ports ,
@@ -192,7 +196,7 @@ def send_and_verify_packet(ptfadapter, packet, expected_packet, tx_port, rx_port
192
196
testutils .verify_no_packet_any (ptfadapter , pkt = expected_packet , ports = rx_ports )
193
197
194
198
195
- def test_everflow_per_interface (ptfadapter , setup_info , apply_acl_rule , tbinfo , # noqa F811
199
+ def test_everflow_per_interface (ptfadapter , setup_info , apply_acl_rule , tbinfo , # noqa F811
196
200
toggle_all_simulator_ports_to_rand_selected_tor , ip_ver , erspan_ip_ver ): # noqa F811
197
201
"""Verify packet ingress from candidate ports are captured by EVERFLOW, while packets
198
202
ingress from unselected ports are not captured
@@ -213,3 +217,56 @@ def test_everflow_per_interface(ptfadapter, setup_info, apply_acl_rule, tbinfo,
213
217
for port , ptf_idx in list (everflow_config ['unselected_ports' ].items ()):
214
218
logger .info ("Verifying packet ingress from {} is not mirrored" .format (port ))
215
219
send_and_verify_packet (ptfadapter , packet , exp_packet , ptf_idx , uplink_ports , False )
220
+
221
+
222
+ def test_everflow_packet_format (ptfadapter , setup_info , apply_acl_rule , tbinfo , # noqa F811
223
+ toggle_all_simulator_ports_to_rand_selected_tor , ip_ver , erspan_ip_ver ): # noqa F811
224
+ """Verify that mirrored packets do not contain VLAN tags or unexpected fields."""
225
+ everflow_config = apply_acl_rule
226
+ packet , exp_packet = generate_testing_packet (ptfadapter , setup_info [UP_STREAM ]['everflow_dut' ],
227
+ everflow_config ['mirror_session_info' ],
228
+ setup_info [UP_STREAM ]['ingress_router_mac' ], setup_info , ip_ver ,
229
+ erspan_ip_ver )
230
+ uplink_ports = everflow_config ["monitor_port_ptf_ids" ]
231
+
232
+ # Send test packet
233
+ candidate_port , ptf_idx = list (everflow_config ['candidate_ports' ].items ())[0 ]
234
+ logger .info (f"Sending test packet from candidate port { candidate_port } " )
235
+ ptfadapter .dataplane .flush ()
236
+ testutils .send (ptfadapter , pkt = packet , port_id = ptf_idx )
237
+
238
+ # Capture mirrored packet
239
+ logger .info ("Capturing mirrored packet to verify format" )
240
+ res = testutils .verify_packet_any_port (ptfadapter ,
241
+ pkt = exp_packet ,
242
+ ports = uplink_ports ,
243
+ timeout = 5 )
244
+
245
+ # Skip traffic test if the return value is true.
246
+ # See tests.conftest.pytest_runtest_call and tests.common.plugins.ptfadapter.dummy_testutils.wrapped
247
+ if res is True :
248
+ logger .info ("Skipped. Ptf.testutils is set to DummyTestUtils to skip traffic test." )
249
+ return
250
+
251
+ port_idx , raw_captured_packet = res
252
+ # Ensure packet is not empty
253
+ assert raw_captured_packet , "Captured packet is empty or None"
254
+
255
+ captured_packet = Ether (raw_captured_packet )
256
+
257
+ # Debugging: Print packet summary if assertions fail
258
+ packet_summary = captured_packet .summary ()
259
+
260
+ # Ensure no VLAN tag
261
+ assert not captured_packet .haslayer (Dot1Q ), \
262
+ f"Mirrored packet should not contain VLAN tag: { packet_summary } "
263
+
264
+ # Check for unexpected MPLS headers
265
+ assert not captured_packet .haslayer (MPLS ), \
266
+ f"Mirrored packet contains unexpected MPLS label: { packet_summary } "
267
+
268
+ # Check for unexpected VXLAN encapsulation
269
+ assert not captured_packet .haslayer (VXLAN ), \
270
+ f"Mirrored packet should not have VXLAN encapsulation: { packet_summary } "
271
+
272
+ logger .info (f"Mirrored packet format verified: { packet_summary } " )
0 commit comments