Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
start implementing TX PDOs
  • Loading branch information
ebroecker committed Jan 17, 2025
1 parent a72fb2c commit b8d2156
Showing 1 changed file with 120 additions and 79 deletions.
199 changes: 120 additions & 79 deletions src/canmatrix/formats/eds.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import canopen.objectdictionary.datatypes
import codecs
import copy
import re

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -33,23 +34,27 @@
0x82: 'Reset Communication'}


def get_signals(parent_object, signal_receiver):
signals = []
position = 0
for sub in range(1, len(parent_object)):
name = parent_object[sub].name
size = datatype_mapping[parent_object[sub].data_type][1]
factor = parent_object[sub].factor
unsigned = "UNSIGNED" in datatype_mapping[parent_object[sub].data_type][0]
signal = canmatrix.Signal(name = name, size=size, start_bit = position, factor=factor, is_signed = not unsigned)
signal.receivers = []
if len(signal_receiver) > 0:
signal.receivers.append(signal_receiver)
if parent_object[sub].min is not None:
signal.offset = parent_object[sub].min
position += size
signals.append(signal)
return signals
def name_cleanup(in_str):
rets_str = re.sub("[^A-Za-z0-9]", '_', in_str)
return rets_str

# def get_signals(parent_object, signal_receiver):
# signals = []
# position = 0
# for sub in range(1, len(parent_object)):
# name = parent_object[sub].name
# size = datatype_mapping[parent_object[sub].data_type][1]
# factor = parent_object[sub].factor
# unsigned = "UNSIGNED" in datatype_mapping[parent_object[sub].data_type][0]
# signal = canmatrix.Signal(name = name, size=size, start_bit = position, factor=factor, is_signed = not unsigned)
# signal.receivers = []
# if len(signal_receiver) > 0:
# signal.receivers.append(signal_receiver)
# if parent_object[sub].min is not None:
# signal.offset = parent_object[sub].min
# position += size
# signals.append(signal)
# return signals


def get_bit_length(data_type_code):
Expand All @@ -62,14 +67,16 @@ def format_index(index, subindex):

def load(f, **options): # type: (typing.IO, **typing.Any) -> canmatrix.CanMatrix
eds_import_encoding = options.get("edsImportEncoding", 'iso-8859-1')
node_id = options.get("eds_node_id", 1)
node_id = options.get("eds_node_id", 0)
generic = options.get("generic", False)
fp = codecs.getreader(eds_import_encoding)(f)
od = canopen.objectdictionary.eds.import_eds(fp, node_id)
db = canmatrix.CanMatrix()
signal_group_counter = 1

node_name = od.device_information.product_name
if len(node_name) == 0:
node_name = "DUMMY"
plc_name = "PLC"
if generic is True:
nm_out = canmatrix.canmatrix.Frame(name="NMT_Out_Request", size=2, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0), transmitters=[plc_name])
Expand Down Expand Up @@ -109,20 +116,6 @@ def load(f, **options): # type: (typing.IO, **typing.Any) -> canmatrix.CanMatri
index.mux_val_grp.append([ 2, 2])
index.muxer_for_signal = "sdo_down_CMD"
sdo_down.add_signal(index)

# sdo_down.add_signal(canmatrix.canmatrix.Signal(name="sdo_down_SUBIDX", size=8, start_bit=24, receivers=[plc_name]))

# bla = canmatrix.canmatrix.Signal(name="test_it", size=8, start_bit=32, receivers=[plc_name])
# bla.muxer_for_signal = "sdo_down_IDX"

#sig_cmd.add_values(0x2f, "8_bytes")
#sdo_down.add_signal(canmatrix.canmatrix.Signal(name="data16", size=16, start_bit=32, receivers=[plc_name], multiplex=0x2b))
#sig_cmd.add_values(0x2b, "16_bytes")
#sdo_down.add_signal(canmatrix.canmatrix.Signal(name="data24", size=24, start_bit=32, receivers=[plc_name], multiplex=0x27))
#sig_cmd.add_values(0x27, "3_bytes")
#sdo_down.add_signal(canmatrix.canmatrix.Signal(name="data32", size=32, start_bit=32, receivers=[plc_name], multiplex=0x23))
#sig_cmd.add_values(0x23, "4_bytes")
#sig_cmd.add_values(0x40, "upload_request")
db.add_frame(sdo_down)

sdo_up = canmatrix.canmatrix.Frame(name="SDO_upload", size=8, arbitration_id=canmatrix.canmatrix.ArbitrationId(id=0x580+node_id), transmitters=[plc_name])
Expand All @@ -148,7 +141,7 @@ def load(f, **options): # type: (typing.IO, **typing.Any) -> canmatrix.CanMatri
if isinstance(obj, canopen.objectdictionary.ODVariable):
subindex = 0
combined_value = int(f"{subindex:02X}{obj.index:04X}", 16)
signal_name = obj.name.replace(' ', '_').replace('-','_')
signal_name = name_cleanup(obj.name)
size = get_bit_length(obj.data_type)
if size == 0:
logger.info("Ignoring " + signal_name + " size 0")
Expand All @@ -170,7 +163,7 @@ def load(f, **options): # type: (typing.IO, **typing.Any) -> canmatrix.CanMatri
members = []
for subobj in obj.values():
combined_value = int(f"{subobj.subindex:02X}{obj.index:04X}", 16)
signal_name = subobj.name.replace(' ', '_').replace('-','_')
signal_name = name_cleanup(subobj.name)
size = get_bit_length(subobj.data_type)
if size == 0:
logger.info("Ignoring " + signal_name + " size 0")
Expand All @@ -190,14 +183,14 @@ def load(f, **options): # type: (typing.IO, **typing.Any) -> canmatrix.CanMatri
up_sig.receivers = []
sdo_up.add_signal(up_sig)
if len(members) > 0:
sdo_down.add_signal_group("SG_R_" + obj.name.replace(' ','_').replace('-','_'), signal_group_counter, members)
sdo_down.add_signal_group("SG_R_" + name_cleanup(obj.name), signal_group_counter, members)
signal_group_counter += 1

elif isinstance(obj, canopen.objectdictionary.ODArray):
members = []
for subobj in obj.values():
combined_value = int(f"{subobj.subindex:02X}{obj.index:04X}", 16)
signal_name = subobj.name.replace(' ', '_').replace('-','_')
signal_name = name_cleanup(subobj.name)
size = get_bit_length(subobj.data_type)
if size == 0:
logger.info("Ignoring " + signal_name + " size 0")
Expand All @@ -217,54 +210,102 @@ def load(f, **options): # type: (typing.IO, **typing.Any) -> canmatrix.CanMatri
up_sig.receivers = []
sdo_up.add_signal(up_sig)
if len(members) > 0:
sdo_down.add_signal_group("SG_A_" + obj.name.replace(' ','_').replace('-','_'), signal_group_counter, members)
sdo_down.add_signal_group("SG_A_" + name_cleanup(obj.name), signal_group_counter, members)
signal_group_counter += 1



# RX Can-Ids ...
for index in range(0x1400, 0x1408):
if index in od:
# store canid in object...
od[index+0x200].canid = od[index][1].default

##RX PDOs
for index in range(0x1600, 0x1608):
if index in od:
pdo_name = od[index].name.replace(" ", "_")
frame = canmatrix.canmatrix.Frame(name=pdo_name, transmitters=[plc_name])
db.add_frame(frame)
frame_id = od[index].canid
frame.arbitration_id = canmatrix.ArbitrationId(id=frame_id)
frame.size = 8
signals = get_signals(od[index], node_name)
for sig in signals:
frame.add_signal(sig)

# RT Can-Ids ...
for index in range(0x1800, 0x1808):
if index in od:
# store canid in object...
od[index+0x200].canid = od[index][1].default & 0x7FF

#TX
for index in range(0x1A00, 0x1A08):
if index in od:
pdo_name = od[index].name.replace(" ", "_")
frame = canmatrix.canmatrix.Frame(name=pdo_name, transmitters=[node_name])
db.add_frame(frame)
frame_id = od[index].canid
frame.arbitration_id = canmatrix.ArbitrationId(id=frame_id)
frame.size = 8
signals = get_signals(od[index], plc_name)
for sig in signals:
frame.add_signal(sig)
# for index in range(0x1400, 0x1408):
# if index in od:
# # store canid in object...
# od[index+0x200].canid = od[index][1].default

# ##RX PDOs
# for index in range(0x1600, 0x1608):
# if index in od:
# pdo_name = od[index].name.replace(" ", "_")
# frame = canmatrix.canmatrix.Frame(name=pdo_name, transmitters=[plc_name])
# # db.add_frame(frame)
# frame_id = od[index].canid
# frame.arbitration_id = canmatrix.ArbitrationId(id=frame_id)
# frame.size = 8
# signals = get_signals(od[index], node_name)
# for sig in signals:
# frame.add_signal(sig)

# # RT Can-Ids ...
# for index in range(0x1800, 0x1808):
# if index in od:
# # store canid in object...
# od[index+0x200].canid = od[index][1].default & 0x7FF

# #TX
# for index in range(0x1A00, 0x1A08):
# if index in od:
# pdo_name = od[index].name.replace(" ", "_")
# frame = canmatrix.canmatrix.Frame(name=pdo_name, transmitters=[node_name])
# # db.add_frame(frame)
# frame_id = od[index].canid
# frame.arbitration_id = canmatrix.ArbitrationId(id=frame_id)
# frame.size = 8
# signals = get_signals(od[index], plc_name)
# for sig in signals:
# frame.add_signal(sig)

for comm_index, map_index in zip(range(0x1800, 0x1808), range(0x1A00, 0x1A08)):
if comm_index not in od or map_index not in od:
continue

# Retrieve the COB-ID
comm_param = od[comm_index] #od.get(comm_index)
cob_id_entry = comm_param.get(1) if comm_param else None
if not cob_id_entry or cob_id_entry.default is None:
# print(f" Warning: No valid COB-ID found for {pdo_type} PDO at index 0x{comm_index:04X}. Skipping.")
continue
cob_id = cob_id_entry.default & 0x7FF
pdo_name = name_cleanup(od[comm_index].name)
frame = canmatrix.canmatrix.Frame(name=pdo_name, transmitters=[node_name])
frame_id = cob_id
frame.arbitration_id = canmatrix.ArbitrationId(id=frame_id)
db.add_frame(frame)
mapping_param = od.get(map_index)
if not mapping_param:
# print(f" Warning: No mapping parameter found for {pdo_type} PDO at index 0x{map_index:04X}.")
continue
num_entries = mapping_param[0].default if 0 in mapping_param else 0
current_bit_start = 0
for subindex in range(1, num_entries + 1):
mapping_entry = mapping_param.get(subindex)
if not mapping_entry or mapping_entry.default is None:
#print(f" Warning: Subindex {subindex} missing for mapping parameter at 0x{map_index:04X}.")
continue

# Decode the mapping entry
mapping_value = mapping_entry.default
obj_index = (mapping_value >> 16) & 0xFFFF
obj_subindex = (mapping_value >> 8) & 0xFF
bit_length = mapping_value & 0xFF

# Fetch the mapped object
mapped_obj = od.get_variable(obj_index, obj_subindex)
if not mapped_obj:
#print(f" Warning: Could not find object at Index: 0x{obj_index:04X}, Subindex: {obj_subindex}.")
continue
signal_name = name_cleanup(mapped_obj.name)
new_sig = canmatrix.Signal(signal_name, size=bit_length, start_bit=current_bit_start)
datatype_name = get_data_type_name(mapping_entry.data_type)
if "UNSIGNED" in datatype_name:
new_sig.is_signed = False
new_sig.factor = mapped_obj.factor
if mapped_obj.min is not None:
new_sig.min = mapped_obj.min
new_sig.offset = mapped_obj.min
if mapped_obj.max is not None:
new_sig.max = mapped_obj.max
frame.add_signal(new_sig)
current_bit_start += bit_length

db.update_ecu_list()
for ecu in db.ecus:
if "-" in ecu.name:
db.rename_ecu(ecu.name, ecu.name.replace("-","_").replace(" ", "_"))
else:
db.rename_ecu(ecu.name, ecu.name.replace(" ", "_"))

db.rename_ecu(ecu.name, name_cleanup(ecu.name))
return db

0 comments on commit b8d2156

Please sign in to comment.