Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ycabled][active-active] Fix in gRPC channel callback logic by creating swsscommon table within the context #509

Merged
merged 7 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sonic-ycabled/ycable/ycable.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def handle_state_update_task(op, port, fvp_dict, y_cable_presence, port_tbl, por
port_dict[port] = SFP_STATUS_REMOVED

y_cable_helper.change_ports_status_for_y_cable_change_event(
port_dict, y_cable_presence, port_tbl, port_tbl_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stopping_event)
port_dict, y_cable_presence, port_tbl, port_tbl_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, state_db, stopping_event)

#
# Helper classes ===============================================================
Expand Down
97 changes: 56 additions & 41 deletions sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,7 @@ def wrapper(*args, **kwargs):

return wrapper

def retry_setup_grpc_channel_for_port(port, asic_index, port_tbl, grpc_client, fwd_state_response_tbl):
def retry_setup_grpc_channel_for_port(port, asic_index, port_tbl, grpc_client):

global grpc_port_stubs
global grpc_port_channels
Expand All @@ -366,7 +366,7 @@ def retry_setup_grpc_channel_for_port(port, asic_index, port_tbl, grpc_client, f
if soc_ipv4_full is not None:
soc_ipv4 = soc_ipv4_full.split('/')[0]

channel, stub = setup_grpc_channel_for_port(port, soc_ipv4, asic_index, grpc_client, fwd_state_response_tbl, False)
channel, stub = setup_grpc_channel_for_port(port, soc_ipv4, asic_index, grpc_client, False)
if channel is None or stub is None:
helper_logger.log_notice(
"stub is None, while reattempt setting up channels did not work {}".format(port))
Expand Down Expand Up @@ -458,34 +458,48 @@ def connect_channel(channel, stub, port):
else:
break

def create_channel(type_chan, level, kvp, soc_ip, port, asic_index, fwd_state_response_tbl, is_async):
def wait_for_state_change(channel_connectivity, port):
# Initialize YcableChannelStateTableHelper only once
if not hasattr(wait_for_state_change, 'table_helper'):
wait_for_state_change.table_helper = {}

# Helper callback to get an channel connectivity state
def wait_for_state_change(channel_connectivity):
if channel_connectivity == grpc.ChannelConnectivity.TRANSIENT_FAILURE:
helper_logger.log_notice("gRPC port {} state changed to TRANSIENT_FAILURE".format(port))
# for connectivity state to FAILURE/IDLE report a failure
fvs_updated = swsscommon.FieldValuePairs([('response', 'failure')])
fwd_state_response_tbl[asic_index].set(port, fvs_updated)
grpc_port_connectivity[port] = "TRANSIENT_FAILURE"

if channel_connectivity == grpc.ChannelConnectivity.CONNECTING:
helper_logger.log_notice("gRPC port {} state changed to CONNECTING".format(port))
grpc_port_connectivity[port] = "CONNECTING"
if channel_connectivity == grpc.ChannelConnectivity.READY:
helper_logger.log_notice("gRPC port {} state changed to READY".format(port))
grpc_port_connectivity[port] = "READY"
if channel_connectivity == grpc.ChannelConnectivity.IDLE:
helper_logger.log_notice("gRPC port {} state changed to IDLE".format(port))
# for connectivity state to FAILURE/IDLE report a failure
fvs_updated = swsscommon.FieldValuePairs([('response', 'failure')])
fwd_state_response_tbl[asic_index].set(port, fvs_updated)
grpc_port_connectivity[port] = "IDLE"

if channel_connectivity == grpc.ChannelConnectivity.SHUTDOWN:
helper_logger.log_notice("gRPC port {} state changed to SHUTDOWN".format(port))
grpc_port_connectivity[port] = "SHUTDOWN"
if wait_for_state_change.table_helper.get(port, None) is None:
wait_for_state_change.table_helper[port] = y_cable_table_helper.YcableChannelStateTableHelper()

# Use the initialized table_helper
table_helper = wait_for_state_change.table_helper[port]

# get the appropriate table_helper
fwd_state_response_tbl = table_helper.get_fwd_state_response_tbl()
asic_index = multi_asic.get_asic_index_from_namespace(DEFAULT_NAMESPACE)

if channel_connectivity == grpc.ChannelConnectivity.TRANSIENT_FAILURE:
helper_logger.log_notice("gRPC port {} state changed to TRANSIENT_FAILURE".format(port))
# for connectivity state to FAILURE/IDLE report a failure
fvs_updated = swsscommon.FieldValuePairs([('response', 'failure')])
fwd_state_response_tbl[asic_index].set(port, fvs_updated)
grpc_port_connectivity[port] = "TRANSIENT_FAILURE"

if channel_connectivity == grpc.ChannelConnectivity.CONNECTING:
helper_logger.log_notice("gRPC port {} state changed to CONNECTING".format(port))
grpc_port_connectivity[port] = "CONNECTING"
if channel_connectivity == grpc.ChannelConnectivity.READY:
helper_logger.log_notice("gRPC port {} state changed to READY".format(port))
grpc_port_connectivity[port] = "READY"
if channel_connectivity == grpc.ChannelConnectivity.IDLE:
helper_logger.log_notice("gRPC port {} state changed to IDLE".format(port))
# for connectivity state to FAILURE/IDLE report a failure
fvs_updated = swsscommon.FieldValuePairs([('response', 'failure')])
fwd_state_response_tbl[asic_index].set(port, fvs_updated)
grpc_port_connectivity[port] = "IDLE"

if channel_connectivity == grpc.ChannelConnectivity.SHUTDOWN:
helper_logger.log_notice("gRPC port {} state changed to SHUTDOWN".format(port))
grpc_port_connectivity[port] = "SHUTDOWN"



def create_channel(type_chan, level, kvp, soc_ip, port, asic_index, is_async):

if type_chan == "secure":
credential = get_grpc_credentials(level, kvp)
Expand Down Expand Up @@ -517,7 +531,8 @@ def wait_for_state_change(channel_connectivity):


if not is_async and channel is not None:
channel.subscribe(wait_for_state_change)
channel.subscribe(lambda channel_connectivity: wait_for_state_change(channel_connectivity, port))


#connect_channel(channel, stub, port)
"""
Expand All @@ -528,7 +543,7 @@ def wait_for_state_change(channel_connectivity):

return channel, stub

def setup_grpc_channel_for_port(port, soc_ip, asic_index, grpc_config, fwd_state_response_tbl, is_async):
def setup_grpc_channel_for_port(port, soc_ip, asic_index, grpc_config, is_async):

"""
Dummy values for lab for now
Expand Down Expand Up @@ -570,7 +585,7 @@ def setup_grpc_channel_for_port(port, soc_ip, asic_index, grpc_config, fwd_state
kvp = dict(fvs)


channel, stub = create_channel(type_chan, level, kvp, soc_ip, port, asic_index, fwd_state_response_tbl, is_async)
channel, stub = create_channel(type_chan, level, kvp, soc_ip, port, asic_index, is_async)

if stub is None:
helper_logger.log_warning("stub was not setup for gRPC soc ip {} port {}, no gRPC soc server running ?".format(soc_ip, port))
Expand Down Expand Up @@ -641,7 +656,7 @@ def process_loopback_interface_and_get_read_side(loopback_keys):
return -1


def check_identifier_presence_and_setup_channel(logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client, fwd_state_response_tbl):
def check_identifier_presence_and_setup_channel(logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client):
global grpc_port_stubs
global grpc_port_channels

Expand Down Expand Up @@ -678,7 +693,7 @@ def check_identifier_presence_and_setup_channel(logical_port_name, port_tbl, hw_
if prev_channel is not None and prev_stub is not None:
return

channel, stub = setup_grpc_channel_for_port(logical_port_name, soc_ipv4, asic_index, grpc_client, fwd_state_response_tbl, False)
channel, stub = setup_grpc_channel_for_port(logical_port_name, soc_ipv4, asic_index, grpc_client, False)
post_port_mux_info_to_db(logical_port_name, mux_tbl, asic_index, hw_mux_cable_tbl, 'pseudo-cable')
if channel is not None:
grpc_port_channels[logical_port_name] = channel
Expand All @@ -703,7 +718,7 @@ def check_identifier_presence_and_setup_channel(logical_port_name, port_tbl, hw_
"DAC cable logical to physical port mapping returned more than one physical ports while Channel setup Port {}".format(logical_port_name))


def setup_grpc_channels(stop_event, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, port_tbl, loopback_tbl, port_table_keys, grpc_client, fwd_state_response_tbl):
def setup_grpc_channels(stop_event, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, port_tbl, loopback_tbl, port_table_keys, grpc_client):

global read_side
helper_logger.log_debug("Y_CABLE_DEBUG:setting up channels for active-active")
Expand Down Expand Up @@ -753,7 +768,7 @@ def setup_grpc_channels(stop_event, loopback_keys, hw_mux_cable_tbl, hw_mux_cabl

if logical_port_name in port_table_keys[asic_index]:
check_identifier_presence_and_setup_channel(
logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client, fwd_state_response_tbl)
logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client)
else:
# This port does not exist in Port table of config but is present inside
# logical_ports after loading the port_mappings from port_config_file
Expand Down Expand Up @@ -1381,7 +1396,7 @@ def init_ports_status_for_y_cable(platform_sfp, platform_chassis, y_cable_presen
if status and cable_type == "active-active":
grpc_port_stats[logical_port_name] = {}
check_identifier_presence_and_setup_channel(
logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client, fwd_state_response_tbl)
logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client)
else:
# This port does not exist in Port table of config but is present inside
# logical_ports after loading the port_mappings from port_config_file
Expand All @@ -1390,7 +1405,7 @@ def init_ports_status_for_y_cable(platform_sfp, platform_chassis, y_cable_presen
"Could not retreive port inside config_db PORT table {} for Y-Cable initiation".format(logical_port_name))


def change_ports_status_for_y_cable_change_event(port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stop_event=threading.Event()):
def change_ports_status_for_y_cable_change_event(port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, state_db, stop_event=threading.Event()):

global read_side
delete_change_event = [False]
Expand Down Expand Up @@ -1422,7 +1437,7 @@ def change_ports_status_for_y_cable_change_event(port_dict, y_cable_presence, po
state_db, port_tbl, y_cable_tbl, static_tbl, mux_tbl, asic_index, logical_port_name, y_cable_presence)
if status and cable_type == "active-active":
check_identifier_presence_and_setup_channel(
logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client, fwd_state_response_tbl)
logical_port_name, port_tbl, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, asic_index, read_side, mux_tbl, y_cable_presence, grpc_client)
elif value == SFP_STATUS_REMOVED:
helper_logger.log_info("Got SFP deleted ycable event")
check_identifier_presence_and_delete_mux_table_entry(
Expand Down Expand Up @@ -3415,7 +3430,7 @@ def handle_fwd_state_command_grpc_notification(fvp_m, hw_mux_cable_tbl, fwd_stat
stub = grpc_port_stubs.get(port, None)
if stub is None:
helper_logger.log_notice("stub is None for getting admin port forwarding state RPC port {}".format(port))
retry_setup_grpc_channel_for_port(port, asic_index, port_tbl, grpc_client, fwd_state_response_tbl)
retry_setup_grpc_channel_for_port(port, asic_index, port_tbl, grpc_client)
stub = grpc_port_stubs.get(port, None)
if stub is None:
helper_logger.log_warning(
Expand Down Expand Up @@ -3497,7 +3512,7 @@ def handle_hw_mux_cable_table_grpc_notification(fvp, hw_mux_cable_tbl, asic_inde
stub = grpc_port_stubs.get(port, None)
if stub is None:
helper_logger.log_debug("Y_CABLE_DEBUG:stub is None for performing hw mux RPC port {}".format(port))
retry_setup_grpc_channel_for_port(port, asic_index, port_tbl, grpc_client, fwd_state_response_tbl)
retry_setup_grpc_channel_for_port(port, asic_index, port_tbl, grpc_client)
stub = grpc_port_stubs.get(port, None)
if stub is None:
helper_logger.log_warning(
Expand Down Expand Up @@ -4108,7 +4123,7 @@ async def task_worker(self):
if soc_ipv4_full is not None:
soc_ipv4 = soc_ipv4_full.split('/')[0]

channel, stub = setup_grpc_channel_for_port(logical_port_name, soc_ipv4, asic_index, self.table_helper.get_grpc_config_tbl(), self.table_helper.get_fwd_state_response_tbl(), True)
channel, stub = setup_grpc_channel_for_port(logical_port_name, soc_ipv4, asic_index, self.table_helper.get_grpc_config_tbl(), True)

client = GracefulRestartClient(logical_port_name, channel, read_side)
tasks.append(asyncio.create_task(client.send_request_and_get_response()))
Expand Down
24 changes: 23 additions & 1 deletion sonic-ycabled/ycable/ycable_utilities/y_cable_table_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@
"""


from sonic_py_common import daemon_base
from sonic_py_common import daemon_base, logger
from sonic_py_common import multi_asic
from swsscommon import swsscommon


SYSLOG_IDENTIFIER = "y_cable_table_helper"

helper_logger = logger.Logger(SYSLOG_IDENTIFIER)

MUX_CABLE_STATIC_INFO_TABLE = "MUX_CABLE_STATIC_INFO"
MUX_CABLE_INFO_TABLE = "MUX_CABLE_INFO"
TRANSCEIVER_INFO_TABLE = 'TRANSCEIVER_INFO'
Expand Down Expand Up @@ -532,3 +537,20 @@ def get_grpc_config_tbl(self):

def get_fwd_state_response_tbl(self):
return self.fwd_state_response_tbl

class YcableChannelStateTableHelper(object):
def __init__(self):

self.appl_db = {}
self.fwd_state_response_tbl = {}

# Get the namespaces in the platform
namespaces = multi_asic.get_front_end_namespaces()
for namespace in namespaces:
asic_id = multi_asic.get_asic_index_from_namespace(namespace)
self.appl_db[asic_id] = daemon_base.db_connect("APPL_DB", namespace)
self.fwd_state_response_tbl[asic_id] = swsscommon.Table(
self.appl_db[asic_id], "FORWARDING_STATE_RESPONSE")

def get_fwd_state_response_tbl(self):
return self.fwd_state_response_tbl
Loading