diff --git a/doc/scapy/advanced_usage.rst b/doc/scapy/advanced_usage.rst index 56f957e7357..dbb4ddf835e 100644 --- a/doc/scapy/advanced_usage.rst +++ b/doc/scapy/advanced_usage.rst @@ -1570,6 +1570,53 @@ Close the sockets:: socket0.close() socket1.close() +CAN Calibration Protocol (CCP) +------------------------------ + +CCP is derived from CAN. The CAN-header is part of a CCP frame. CCP has to types +of message objects. One is called Command Receive Object (CRO), the other is called +Data Transmission Object (DTO). Usually CROs are sent to an ECU, and DTOs are received +from an ECU. The information, if one DTO answers a CRO is implemented through a counter +field (ctr). If both objects have the same counter value, the payload of a DTO object +can be interpreted from the command of the associated CRO object. + +Creating a CRO message:: + + CCP(identifier=0x700)/CRO(ctr=1)/CONNECT(station_address=0x02) + CCP(identifier=0x711)/CRO(ctr=2)/GET_SEED(resource=2) + CCP(identifier=0x711)/CRO(ctr=3)/UNLOCK(key=b"123456") + +If we aren't interested in the DTO of an ECU, we can just send a CRO message like this: +Sending a CRO message:: + + pkt = CCP(identifier=0x700)/CRO(ctr=1)/CONNECT(station_address=0x02) + sock = CANSocket(iface=can.interface.Bus(bustype='socketcan', channel='vcan0', bitrate=250000)) + sock.send(pkt) + +If we are interested in the DTO of an ECU, we need to set the basecls parameter of the +CANSocket to CCP and we need to use sr1: +Sending a CRO message:: + + cro = CCP(identifier=0x700)/CRO(ctr=0x53)/PROGRAM_6(data=b"\x10\x11\x12\x10\x11\x12") + sock = CANSocket(iface=can.interface.Bus(bustype='socketcan', channel='vcan0', bitrate=250000), basecls=CCP) + dto = sock.sr1(cro) + dto.show() + ###[ CAN Calibration Protocol ]### + flags= + identifier= 0x700 + length= 8 + reserved= 0 + ###[ DTO ]### + packet_id= 0xff + return_code= acknowledge / no error + ctr= 83 + ###[ PROGRAM_6_DTO ]### + MTA0_extension= 2 + MTA0_address= 0x34002006 + +Since sr1 calls the answers function, our payload of the DTO objects gets interpreted with the +command of our cro object. + ISOTP message ------------- diff --git a/scapy/contrib/automotive/ccp.py b/scapy/contrib/automotive/ccp.py new file mode 100644 index 00000000000..0aada0b6d2f --- /dev/null +++ b/scapy/contrib/automotive/ccp.py @@ -0,0 +1,589 @@ +#! /usr/bin/env python + +# This file is part of Scapy +# See http://www.secdev.org/projects/scapy for more information +# Copyright (C) Nils Weiss +# This program is published under a GPLv2 license + +# scapy.contrib.description = CAN Calibration Protocol (CCP) +# scapy.contrib.status = loads + +import struct + +from scapy.packet import Packet, bind_layers, bind_bottom_up +from scapy.fields import XIntField, FlagsField, ByteEnumField, \ + ThreeBytesField, XBitField, ShortField, IntField, XShortField, \ + ByteField, XByteField, StrFixedLenField, LEShortField +from scapy.layers.can import CAN + + +class CCP(CAN): + name = 'CAN Calibration Protocol' + fields_desc = [ + FlagsField('flags', 0, 3, ['error', + 'remote_transmission_request', + 'extended']), + XBitField('identifier', 0, 29), + ByteField('length', 8), + ThreeBytesField('reserved', 0), + ] + + def extract_padding(self, p): + return p, None + + +class CRO(Packet): + commands = { + 0x01: "CONNECT", + 0x1B: "GET_CCP_VERSION", + 0x17: "EXCHANGE_ID", + 0x12: "GET_SEED", + 0x13: "UNLOCK", + 0x02: "SET_MTA", + 0x03: "DNLOAD", + 0x23: "DNLOAD_6", + 0x04: "UPLOAD", + 0x0F: "SHORT_UP", + 0x11: "SELECT_CAL_PAGE", + 0x14: "GET_DAQ_SIZE", + 0x15: "SET_DAQ_PTR", + 0x16: "WRITE_DAQ", + 0x06: "START_STOP", + 0x07: "DISCONNECT", + 0x0C: "SET_S_STATUS", + 0x0D: "GET_S_STATUS", + 0x0E: "BUILD_CHKSUM", + 0x10: "CLEAR_MEMORY", + 0x18: "PROGRAM", + 0x22: "PROGRAM_6", + 0x19: "MOVE", + 0x05: "TEST", + 0x09: "GET_ACTIVE_CAL_PAGE", + 0x08: "START_STOP_ALL", + 0x20: "DIAG_SERVICE", + 0x21: "ACTION_SERVICE" + } + name = 'Command Receive Object' + fields_desc = [ + ByteEnumField('cmd', 0x01, commands), + ByteField('ctr', 0) + ] + + def hashret(self): + return struct.pack('B', self.ctr) + + +# ##### CROs ###### + +class CONNECT(Packet): + fields_desc = [ + LEShortField('station_address', 0), + StrFixedLenField('ccp_reserved', b'\xff' * 4, length=4), + ] + + +bind_layers(CRO, CONNECT, cmd=0x01) + + +class GET_CCP_VERSION(Packet): + fields_desc = [ + XByteField('main_protocol_version', 0), + XByteField('release_version', 0), + StrFixedLenField('ccp_reserved', b'\xff' * 4, length=4) + ] + + +bind_layers(CRO, GET_CCP_VERSION, cmd=0x1B) + + +class EXCHANGE_ID(Packet): + fields_desc = [ + StrFixedLenField('ccp_master_device_id', b'\x00' * 6, length=6) + ] + + +bind_layers(CRO, EXCHANGE_ID, cmd=0x17) + + +class GET_SEED(Packet): + fields_desc = [ + XByteField('resource', 0), + StrFixedLenField('ccp_reserved', b'\xff' * 5, length=5) + ] + + +bind_layers(CRO, GET_SEED, cmd=0x12) + + +class UNLOCK(Packet): + fields_desc = [ + StrFixedLenField('key', b'\x00' * 6, length=6) + ] + + +bind_layers(CRO, UNLOCK, cmd=0x13) + + +class SET_MTA(Packet): + fields_desc = [ + XByteField('mta_num', 0), + XByteField('address_extension', 0), + XIntField('address', 0), + ] + + +bind_layers(CRO, SET_MTA, cmd=0x02) + + +class DNLOAD(Packet): + fields_desc = [ + XByteField('size', 0), + StrFixedLenField('data', b'\x00' * 5, length=5) + ] + + +bind_layers(CRO, DNLOAD, cmd=0x03) + + +class DNLOAD_6(Packet): + fields_desc = [ + StrFixedLenField('data', b'\x00' * 6, length=6) + ] + + +bind_layers(CRO, DNLOAD_6, cmd=0x23) + + +class UPLOAD(Packet): + fields_desc = [ + XByteField('size', 0), + StrFixedLenField('ccp_reserved', b'\xff' * 5, length=5) + ] + + +bind_layers(CRO, UPLOAD, cmd=0x04) + + +class SHORT_UP(Packet): + fields_desc = [ + XByteField('size', 0), + XByteField('address_extension', 0), + XIntField('address', 0), + ] + + +bind_layers(CRO, SHORT_UP, cmd=0x0F) + + +class SELECT_CAL_PAGE(Packet): + fields_desc = [ + StrFixedLenField('ccp_reserved', b'\xff' * 6, length=6) + ] + + +bind_layers(CRO, SELECT_CAL_PAGE, cmd=0x11) + + +class GET_DAQ_SIZE(Packet): + fields_desc = [ + XByteField('DAQ_num', 0), + XByteField('ccp_reserved', 0), + XIntField('DTO_identifier', 0), + ] + + +bind_layers(CRO, GET_DAQ_SIZE, cmd=0x14) + + +class SET_DAQ_PTR(Packet): + fields_desc = [ + XByteField('DAQ_num', 0), + XByteField('ODT_num', 0), + XByteField('ODT_element', 0), + StrFixedLenField('ccp_reserved', b'\xff' * 3, length=3) + ] + + +bind_layers(CRO, SET_DAQ_PTR, cmd=0x15) + + +class WRITE_DAQ(Packet): + fields_desc = [ + XByteField('DAQ_size', 0), + XByteField('address_extension', 0), + XIntField('address', 0), + ] + + +bind_layers(CRO, WRITE_DAQ, cmd=0x16) + + +class START_STOP(Packet): + fields_desc = [ + XByteField('mode', 0), + XByteField('DAQ_num', 0), + XByteField('ODT_num', 0), + XByteField('event_channel', 0), + XShortField('transmission_rate', 0), + ] + + +bind_layers(CRO, START_STOP, cmd=0x06) + + +class DISCONNECT(Packet): + fields_desc = [ + ByteEnumField('type', 0, {0: "temporary", 1: "end_of_session"}), + StrFixedLenField('ccp_reserved0', b'\xff' * 1, length=1), + LEShortField('station_address', 0), + StrFixedLenField('ccp_reserved', b'\xff' * 2, length=2) + ] + + +bind_layers(CRO, DISCONNECT, cmd=0x07) + + +class SET_S_STATUS(Packet): + name = "Set Session Status" + fields_desc = [ + FlagsField("session_status", 0, 8, ["CAL", "DAQ", "RESUME", "RES0", + "RES1", "RES2", "STORE", "RUN"]), + StrFixedLenField('ccp_reserved', b'\xff' * 5, length=5) + ] + + +bind_layers(CRO, SET_S_STATUS, cmd=0x0C) + + +class GET_S_STATUS(Packet): + fields_desc = [ + StrFixedLenField('ccp_reserved', b'\xff' * 6, length=6) + ] + + +bind_layers(CRO, GET_S_STATUS, cmd=0x0D) + + +class BUILD_CHKSUM(Packet): + fields_desc = [ + IntField('size', 0), + StrFixedLenField('ccp_reserved', b'\xff' * 2, length=2) + ] + + +bind_layers(CRO, BUILD_CHKSUM, cmd=0x0E) + + +class CLEAR_MEMORY(Packet): + fields_desc = [ + IntField('size', 0), + StrFixedLenField('ccp_reserved', b'\xff' * 2, length=2) + ] + + +bind_layers(CRO, CLEAR_MEMORY, cmd=0x10) + + +class PROGRAM(Packet): + fields_desc = [ + XByteField('size', 0), + StrFixedLenField('data', b'\x00' * 0, + length_from=lambda pkt: pkt.size), + StrFixedLenField('ccp_reserved', b'\xff' * 5, + length_from=lambda pkt: 5 - pkt.size) + ] + + +bind_layers(CRO, PROGRAM, cmd=0x18) + + +class PROGRAM_6(Packet): + fields_desc = [ + StrFixedLenField('data', b'\x00' * 6, length=6) + ] + + +bind_layers(CRO, PROGRAM_6, cmd=0x22) + + +class MOVE(Packet): + fields_desc = [ + IntField('size', 0), + StrFixedLenField('ccp_reserved', b'\xff' * 2, length=2) + ] + + +bind_layers(CRO, MOVE, cmd=0x19) + + +class TEST(Packet): + fields_desc = [ + LEShortField('station_address', 0), + StrFixedLenField('ccp_reserved', b'\xff' * 4, length=4) + ] + + +bind_layers(CRO, TEST, cmd=0x05) + + +class GET_ACTIVE_CAL_PAGE(Packet): + fields_desc = [ + StrFixedLenField('ccp_reserved', b'\xff' * 6, length=6) + ] + + +bind_layers(CRO, GET_ACTIVE_CAL_PAGE, cmd=0x09) + + +class START_STOP_ALL(Packet): + fields_desc = [ + ByteEnumField('type', 0, {0: "stop", 1: "start"}), + StrFixedLenField('ccp_reserved', b'\xff' * 5, length=5) + + ] + + +bind_layers(CRO, START_STOP_ALL, cmd=0x08) + + +class DIAG_SERVICE(Packet): + fields_desc = [ + ShortField('diag_service', 0), + StrFixedLenField('ccp_reserved', b'\xff' * 4, length=4) + ] + + +bind_layers(CRO, DIAG_SERVICE, cmd=0x20) + + +class ACTION_SERVICE(Packet): + fields_desc = [ + ShortField('action_service', 0), + StrFixedLenField('ccp_reserved', b'\xff' * 4, length=4) + ] + + +bind_layers(CRO, ACTION_SERVICE, cmd=0x21) + + +# ##### DTOs ###### + +class DEFAULT_DTO(Packet): + fields_desc = [ + StrFixedLenField('load', b'\xff' * 5, length=5), + ] + + +class GET_CCP_VERSION_DTO(Packet): + fields_desc = [ + XByteField('main_protocol_version', 0), + XByteField('release_version', 0), + StrFixedLenField('ccp_reserved', b'\x00' * 3, length=3) + ] + + +class EXCHANGE_ID_DTO(Packet): + fields_desc = [ + ByteField('slave_device_ID_length', 0), + ByteField('data_type_qualifier', 0), + ByteField('resource_availability_mask', 0), + ByteField('resource_protection_mask', 0), + StrFixedLenField('ccp_reserved', b'\xff' * 1, length=1), + ] + + +class GET_SEED_DTO(Packet): + fields_desc = [ + XByteField('protection_status', 0), + StrFixedLenField('seed', b'\x00' * 4, length=4) + ] + + +class UNLOCK_DTO(Packet): + fields_desc = [ + ByteField('privilege_status', 0), + StrFixedLenField('ccp_reserved', b'\xff' * 4, length=4), + ] + + +class DNLOAD_DTO(Packet): + fields_desc = [ + XByteField('MTA0_extension', 0), + XIntField('MTA0_address', 0) + ] + + +class DNLOAD_6_DTO(Packet): + fields_desc = [ + XByteField('MTA0_extension', 0), + XIntField('MTA0_address', 0) + ] + + +class UPLOAD_DTO(Packet): + fields_desc = [ + StrFixedLenField('data', b'\x00' * 5, length=5) + ] + + +class SHORT_UP_DTO(Packet): + fields_desc = [ + StrFixedLenField('data', b'\x00' * 5, length=5) + ] + + +class GET_DAQ_SIZE_DTO(Packet): + fields_desc = [ + XByteField('DAQ_list_size', 0), + XByteField('first_pid', 0), + StrFixedLenField('ccp_reserved', b'\xff' * 3, length=3) + ] + + +class GET_S_STATUS_DTO(Packet): + fields_desc = [ + FlagsField("session_status", 0, 8, ["CAL", "DAQ", "RESUME", "RES0", + "RES1", "RES2", "STORE", "RUN"]), + ByteField('information_qualifier', 0), + StrFixedLenField('information', b'\x00' * 3, length=3) + ] + + +class BUILD_CHKSUM_DTO(Packet): + fields_desc = [ + ByteField('checksum_size', 0), + StrFixedLenField('checksum_data', b'\x00' * 4, + length_from=lambda pkt: pkt.checksum_size), + StrFixedLenField('ccp_reserved', b'\xff' * 0, + length_from=lambda pkt: 4 - pkt.checksum_size) + ] + + +class PROGRAM_DTO(Packet): + fields_desc = [ + ByteField('MTA0_extension', 0), + XIntField('MTA0_address', 0) + ] + + +class PROGRAM_6_DTO(Packet): + fields_desc = [ + ByteField('MTA0_extension', 0), + XIntField('MTA0_address', 0) + ] + + +class GET_ACTIVE_CAL_PAGE_DTO(Packet): + fields_desc = [ + XByteField('address_extension', 0), + XIntField('address', 0) + ] + + +class DIAG_SERVICE_DTO(Packet): + fields_desc = [ + ByteField('data_length', 0), + ByteField('data_type', 0), + StrFixedLenField('ccp_reserved', b'\xff' * 3, length=3) + ] + + +class ACTION_SERVICE_DTO(Packet): + fields_desc = [ + ByteField('data_length', 0), + ByteField('data_type', 0), + StrFixedLenField('ccp_reserved', b'\xff' * 3, length=3) + ] + + +class DTO(Packet): + __slots__ = Packet.__slots__ + ["payload_cls"] + + return_codes = { + 0x00: "acknowledge / no error", + 0x01: "DAQ processor overload", + 0x10: "command processor busy", + 0x11: "DAQ processor busy", + 0x12: "internal timeout", + 0x18: "key request", + 0x19: "session status request", + 0x20: "cold start request", + 0x21: "cal. data init. request", + 0x22: "DAQ list init. request", + 0x23: "code update request", + 0x30: "unknown command", + 0x31: "command syntax", + 0x32: "parameter(s) out of range", + 0x33: "access denied", + 0x34: "overload", + 0x35: "access locked", + 0x36: "resource/function not available" + } + fields_desc = [ + XByteField("packet_id", 0xff), + ByteEnumField('return_code', 0x00, return_codes), + ByteField('ctr', 0) + ] + + def __init__(self, *args, **kwargs): + self.payload_cls = DEFAULT_DTO + if "payload_cls" in kwargs: + self.payload_cls = kwargs["payload_cls"] + del kwargs["payload_cls"] + Packet.__init__(self, *args, **kwargs) + + def guess_payload_class(self, payload): + return self.payload_cls + + @staticmethod + def get_dto_cls(cmd): + dto_cls_mapping = { + 0x03: DNLOAD_DTO, + 0x04: UPLOAD_DTO, + 0x09: GET_ACTIVE_CAL_PAGE_DTO, + 0x0D: GET_S_STATUS_DTO, + 0x0E: BUILD_CHKSUM_DTO, + 0x0F: SHORT_UP_DTO, + 0x12: GET_SEED_DTO, + 0x13: UNLOCK_DTO, + 0x14: GET_DAQ_SIZE_DTO, + 0x17: EXCHANGE_ID_DTO, + 0x18: PROGRAM_DTO, + 0x1B: GET_CCP_VERSION_DTO, + 0x20: DIAG_SERVICE_DTO, + 0x21: ACTION_SERVICE_DTO, + 0x22: PROGRAM_6_DTO, + 0x23: DNLOAD_6_DTO + } + return dto_cls_mapping[cmd] if cmd in dto_cls_mapping.keys() \ + else DEFAULT_DTO + + def answers(self, other): + """In CCP, the payload of a DTO packet is dependent on the cmd field + of a corresponding CRO packet. Two packets correspond, if there + ctr field is equal. If answers detect the corresponding CRO, it will + interpret the payload of a DTO with the correct class. In CCP, there is + no other way, to determine the class of a DTO payload. Since answers is + called on sr and sr1, this modification of the original answers + implementation will give a better user experience. """ + if not hasattr(other, "ctr"): + return 0 + if self.ctr != other.ctr: + return 0 + if not hasattr(other, "cmd"): + return 0 + + new_pl_cls = self.get_dto_cls(other.cmd) + if self.payload_cls != new_pl_cls and \ + self.payload_cls == DEFAULT_DTO: + data = bytes(self.load) + self.remove_payload() + self.add_payload(new_pl_cls(data)) + self.payload_cls = new_pl_cls + return 1 + + def hashret(self): + return struct.pack('B', self.ctr) + + +bind_bottom_up(CCP, DTO) diff --git a/scapy/contrib/automotive/ccp.uts b/scapy/contrib/automotive/ccp.uts new file mode 100644 index 00000000000..c5527e16d44 --- /dev/null +++ b/scapy/contrib/automotive/ccp.uts @@ -0,0 +1,998 @@ +% Regression tests for the CCP layer + +# More information at http://www.secdev.org/projects/UTscapy/ + + +############ +############ + ++ Basic operations + += Load module + +load_contrib("automotive.ccp") + += Build CRO CONNECT + +cro = CCP(identifier=0x700)/CRO(ctr=1)/CONNECT(station_address=0x02) + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 1 +assert cro.cmd == 1 +assert cro.station_address == 0x02 +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x01\x01\x02\x00\xff\xff\xff\xff' + += Dissect DTO CONNECT + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x01\xff\xff\xff\xff\xff') + +assert dto.answers(cro) +assert dto.identifier == 0x700 +assert dto.length == 8 +assert dto.flags == 0 +assert dto.ctr == 1 +assert dto.load == b"\xff" * 5 + += Build CRO EXCHANGE_ID + +cro = CCP(identifier=0x700)/CRO(ctr=18)/EXCHANGE_ID(ccp_master_device_id=b'abcdef') + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 18 +assert cro.cmd == 0x17 +assert cro.ccp_master_device_id == b"abcdef" +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x17\x12abcdef' + += Dissect DTO EXCHANGE_ID + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x12\x04\x02\x03\x03\xff') + +assert dto.ctr == 18 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\x04\x02\x03\x03\xff' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 18 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == False +assert dto.slave_device_ID_length == 4 +assert dto.data_type_qualifier == 2 +assert dto.resource_availability_mask == 3 +assert dto.resource_protection_mask == 3 +assert dto.ccp_reserved == b"\xff" + += Build CRO GET_SEED + +cro = CCP(identifier=0x711)/CRO(ctr=19)/GET_SEED(resource=2) + +assert cro.identifier == 0x711 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 19 +assert cro.cmd == 0x12 +assert cro.resource == 2 +assert cro.ccp_reserved == b"\xff" * 5 + +assert bytes(cro) == b'\x00\x00\x07\x11\x08\x00\x00\x00\x12\x13\x02\xff\xff\xff\xff\xff' + += Dissect DTO GET_SEED + +dto = CCP(b'\x00\x00\x07\x11\x08\x00\x00\x00\xff\x00\x13\x01\x14\x15\x16\x17') + +assert dto.ctr == 19 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\x01\x14\x15\x16\x17' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 19 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == False +assert dto.protection_status == 0x1 +assert dto.seed == b'\x14\x15\x16\x17' + += Build CRO UNLOCK + +cro = CCP(identifier=0x711)/CRO(ctr=20)/UNLOCK(key=b"123456") + +assert cro.identifier == 0x711 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 20 +assert cro.cmd == 0x13 +assert cro.key == b"123456" + +assert bytes(cro) == b'\x00\x00\x07\x11\x08\x00\x00\x00\x13\x14123456' + += Dissect DTO UNLOCK + +dto = CCP(b'\x00\x00\x07\x11\x08\x00\x00\x00\xff\x00\x14\x02\xff\xff\xff\xff') + +assert dto.ctr == 20 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\x02\xff\xff\xff\xff' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 20 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == False +assert dto.privilege_status == 0x2 +assert dto.ccp_reserved == b"\xff" * 4 + += Build CRO SET_MTA + +cro = CCP(identifier=0x711)/CRO(ctr=21)/SET_MTA(mta_num=0, address_extension=0x02, address=0x34002000) + +assert cro.identifier == 0x711 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 21 +assert cro.cmd == 0x02 +assert cro.mta_num == 0 +assert cro.address_extension == 2 +assert cro.address == 0x34002000 + +assert bytes(cro) == b'\x00\x00\x07\x11\x08\x00\x00\x00\x02\x15\x00\x02\x34\x00\x20\x00' + += Dissect DTO SET_MTA + +dto = CCP(b'\x00\x00\x07\x11\x08\x00\x00\x00\xff\x00\x15\xff\xff\xff\xff\xff') + +assert dto.ctr == 21 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\xff\xff\xff\xff\xff' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 21 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == True +assert dto.load == b"\xff" * 5 + += Build CRO DNLOAD + +cro = CCP(identifier=0x700)/CRO(ctr=17)/DNLOAD(size=0x05, data=b'abcde') + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 17 +assert cro.cmd == 3 +assert cro.size == 0x05 +assert cro.data == b'abcde' +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x03\x11\x05abcde' + += Dissect DTO DNLOAD + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x11\x02\x34\x00\x20\x05') + +assert dto.ctr == 17 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\x024\x00 \x05' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 17 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == False +assert dto.MTA0_extension == 2 +assert dto.MTA0_address == 0x34002005 + += Build CRO DNLOAD_6 + +cro = CCP(identifier=0x700)/CRO(ctr=0x40)/DNLOAD_6(data=b'abcdef') + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 0x40 +assert cro.cmd == 0x23 +assert cro.data == b'abcdef' +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x23\x40abcdef' + += Dissect DTO DNLOAD_6 + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x40\x02\x34\x00\x20\x06') + +assert dto.ctr == 64 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\x024\x00 \x06' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 64 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == False +assert dto.MTA0_extension == 2 +assert dto.MTA0_address == 0x34002006 + += Build CRO UPLOAD + +cro = CCP(identifier=0x700)/CRO(ctr=0x41)/UPLOAD(size=4) + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 0x41 +assert cro.cmd == 0x04 +assert cro.size == 4 +assert cro.ccp_reserved == b"\xff" * 5 +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x04\x41\x04\xff\xff\xff\xff\xff' + += Dissect DTO UPLOAD + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x41\x10\x11\x12\x13\xff') + +assert dto.ctr == 65 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\x10\x11\x12\x13\xff' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 65 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == False +assert dto.data == b"\x10\x11\x12\x13\xff" + += Build CRO SHORT_UP + +cro = CCP(identifier=0x700)/CRO(ctr=0x42)/SHORT_UP(size=4, address_extension=0, address=0x12345678) + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 0x42 +assert cro.cmd == 0x0f +assert cro.size == 4 +assert cro.address == 0x12345678 +assert cro.address_extension == 0 +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x0f\x42\x04\x00\x12\x34\x56\x78' + += Dissect DTO SHORT_UP + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x42\x10\x11\x12\x13\xff') + +assert dto.ctr == 66 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\x10\x11\x12\x13\xff' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 66 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == False +assert dto.data == b"\x10\x11\x12\x13\xff" + += Build CRO SELECT_CAL_PAGE + +cro = CCP(identifier=0x700)/CRO(ctr=0x43)/SELECT_CAL_PAGE() + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 0x43 +assert cro.cmd == 0x11 +assert cro.ccp_reserved == b"\xff" * 6 +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x11\x43\xff\xff\xff\xff\xff\xff' + += Dissect DTO SELECT_CAL_PAGE + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x43\xff\xff\xff\xff\xff') + +assert dto.ctr == 67 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\xff\xff\xff\xff\xff' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 67 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == True +assert dto.load == b"\xff\xff\xff\xff\xff" + += Build CRO GET_DAQ_SIZE + +cro = CCP(identifier=0x700)/CRO(ctr=0x44)/GET_DAQ_SIZE(DAQ_num=0x03, DTO_identifier=0x1020304) + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 0x44 +assert cro.cmd == 0x14 +assert cro.DAQ_num == 0x03 +assert cro.ccp_reserved == 00 +assert cro.DTO_identifier == 0x01020304 +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x14\x44\x03\x00\x01\x02\x03\x04' + += Dissect DTO GET_DAQ_SIZE + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x44\x10\x08\xff\xff\xff') + +assert dto.ctr == 68 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\x10\x08\xff\xff\xff' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 68 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == False +assert dto.DAQ_list_size == 16 +assert dto.first_pid == 8 +assert dto.ccp_reserved == b"\xff\xff\xff" + += Build CRO SET_DAQ_PTR + +cro = CCP(identifier=0x700)/CRO(ctr=0x45)/SET_DAQ_PTR(DAQ_num=3, ODT_num=5, ODT_element=2) + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 0x45 +assert cro.cmd == 0x15 +assert cro.DAQ_num == 0x03 +assert cro.ODT_num == 5 +assert cro.ODT_element == 2 +assert cro.ccp_reserved == b"\xff\xff\xff" +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x15\x45\x03\x05\x02\xff\xff\xff' + += Dissect DTO SET_DAQ_PTR + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x45\xff\xff\xff\xff\xff') + +assert dto.ctr == 69 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\xff\xff\xff\xff\xff' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 69 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == True +assert dto.load == b'\xff\xff\xff\xff\xff' + += Build CRO WRITE_DAQ + +cro = CCP(identifier=0x700)/CRO(ctr=0x46)/WRITE_DAQ(DAQ_size=2, address_extension=1, address=0x2004200) + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 0x46 +assert cro.cmd == 0x16 +assert cro.DAQ_size == 0x02 +assert cro.address_extension == 1 +assert cro.address == 0x2004200 +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x16\x46\x02\x01\x02\x00\x42\x00' + += Dissect DTO WRITE_DAQ + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x46\xff\xff\xff\xff\xff') + +assert dto.ctr == 70 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\xff\xff\xff\xff\xff' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 70 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == True +assert dto.load == b'\xff\xff\xff\xff\xff' + += Build CRO START_STOP + +cro = CCP(identifier=0x700)/CRO(ctr=0x47)/START_STOP(mode=1, DAQ_num=3, ODT_num=7, event_channel=2, transmission_rate=1) + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 0x47 +assert cro.cmd == 0x06 +assert cro.mode == 0x01 +assert cro.DAQ_num == 3 +assert cro.event_channel == 2 +assert cro.transmission_rate == 1 +assert cro.ODT_num == 7 +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x06\x47\x01\x03\x07\x02\x00\x01' + += Dissect DTO START_STOP + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x47\xff\xff\xff\xff\xff') + +assert dto.ctr == 71 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\xff\xff\xff\xff\xff' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 71 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == True +assert dto.load == b'\xff\xff\xff\xff\xff' + += Build CRO DISCONNECT + +cro = CCP(identifier=0x700)/CRO(ctr=0x48)/DISCONNECT(type="temporary", station_address=0x208) + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 0x48 +assert cro.cmd == 0x07 +assert cro.type == 0x00 +assert cro.station_address == 0x208 +assert cro.ccp_reserved0 == b"\xff" +assert cro.ccp_reserved == b"\xff" * 2 +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x07\x48\x00\xff\x08\x02\xff\xff' + += Dissect DTO DISCONNECT + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x48\xff\xff\xff\xff\xff') + +assert dto.ctr == 72 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\xff\xff\xff\xff\xff' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 72 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == True +assert dto.load == b'\xff\xff\xff\xff\xff' + += Build CRO SET_S_STATUS + +cro = CCP(identifier=0x700)/CRO(ctr=0x49)/SET_S_STATUS(session_status="RUN+CAL") + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 0x49 +assert cro.cmd == 0x0c +assert cro.session_status == 0x81 +assert cro.ccp_reserved == b"\xff" * 5 +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x0c\x49\x81\xff\xff\xff\xff\xff' + += Dissect DTO SET_S_STATUS + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x49\xff\xff\xff\xff\xff') + +assert dto.ctr == 73 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\xff\xff\xff\xff\xff' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 73 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == True +assert dto.load == b'\xff\xff\xff\xff\xff' + += Build CRO GET_S_STATUS + +cro = CCP(identifier=0x700)/CRO(ctr=0x4a)/GET_S_STATUS() + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 0x4a +assert cro.cmd == 0x0D +assert cro.ccp_reserved == b"\xff" * 6 +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x0d\x4a\xff\xff\xff\xff\xff\xff' + += Dissect DTO GET_S_STATUS + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x4a\x81\xff\xff\xff\xff') + +assert dto.ctr == 74 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\x81\xff\xff\xff\xff' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 74 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == False +assert dto.session_status == 0x81 +assert dto.information_qualifier == 0xff +assert dto.information == b"\xff" * 3 + += Build CRO BUILD_CHKSUM + +cro = CCP(identifier=0x700)/CRO(ctr=0x50)/BUILD_CHKSUM(size=0x8000) + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 0x50 +assert cro.cmd == 0x0e +assert cro.size == 0x8000 +assert cro.ccp_reserved == b"\xff" * 2 +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x0e\x50\x00\x00\x80\x00\xff\xff' + += Dissect DTO BUILD_CHKSUM + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x50\x02\x12\x34\xff\xff') + +assert dto.ctr == 80 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\x02\x12\x34\xff\xff' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 80 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == False +assert dto.checksum_size == 2 +assert dto.checksum_data == b'\x12\x34' +assert dto.ccp_reserved == b'\xff\xff' + += Dissect DTO BUILD_CHKSUM2 + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x50\x04\x12\x34\x56\x78') + +assert dto.ctr == 80 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\x04\x12\x34\x56\x78' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 80 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == False +assert dto.checksum_size == 4 +assert dto.checksum_data == b'\x12\x34\x56\x78' +assert dto.ccp_reserved == b'' + += Build CRO CLEAR_MEMORY + +cro = CCP(identifier=0x700)/CRO(ctr=0x51)/CLEAR_MEMORY(size=0x8000) + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 0x51 +assert cro.cmd == 0x10 +assert cro.size == 0x8000 +assert cro.ccp_reserved == b"\xff" * 2 +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x10\x51\x00\x00\x80\x00\xff\xff' + += Dissect DTO CLEAR_MEMORY + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x51\xff\xff\xff\xff\xff') + +assert dto.ctr == 81 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\xff\xff\xff\xff\xff' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 81 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == True +assert dto.load == b'\xff\xff\xff\xff\xff' + += Build CRO PROGRAM + +cro = CCP(identifier=0x700)/CRO(ctr=0x52)/PROGRAM(size=0x3, data=b"\x10\x11\x12") + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 0x52 +assert cro.cmd == 0x18 +assert cro.size == 0x3 +assert cro.data == b"\x10\x11\x12" +assert cro.ccp_reserved == b"\xff" * 5 +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x18\x52\x03\x10\x11\x12\xff\xff' + += Dissect DTO PROGRAM + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x52\x02\x34\x00\x20\x03') + +assert dto.ctr == 82 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\x02\x34\x00\x20\x03' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 82 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == False +assert dto.MTA0_extension == 2 +assert dto.MTA0_address == 0x34002003 + += Build CRO PROGRAM_6 + +cro = CCP(identifier=0x700)/CRO(ctr=0x53)/PROGRAM_6(data=b"\x10\x11\x12\x10\x11\x12") + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 0x53 +assert cro.cmd == 0x22 +assert cro.data == b"\x10\x11\x12\x10\x11\x12" +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x22\x53\x10\x11\x12\x10\x11\x12' + += Dissect DTO PROGRAM_6 + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x53\x02\x34\x00\x20\x06') + +assert dto.ctr == 83 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\x02\x34\x00\x20\x06' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 83 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == False +assert dto.MTA0_extension == 2 +assert dto.MTA0_address == 0x34002006 + += Build CRO MOVE + +cro = CCP(identifier=0x700)/CRO(ctr=0x54)/MOVE(size=0x8000) + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 0x54 +assert cro.cmd == 0x19 +assert cro.size == 0x8000 +assert cro.ccp_reserved == b'\xff\xff' +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x19\x54\x00\x00\x80\x00\xff\xff' + += Dissect DTO MOVE + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x54\xff\xff\xff\xff\xff') + +assert dto.ctr == 84 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\xff\xff\xff\xff\xff' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 84 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == True + += Build CRO DIAG_SERVICE + +cro = CCP(identifier=0x700)/CRO(ctr=0x55)/DIAG_SERVICE(diag_service=0x8000) + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 0x55 +assert cro.cmd == 0x20 +assert cro.diag_service == 0x8000 +assert cro.ccp_reserved == b'\xff\xff\xff\xff' +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x20\x55\x80\x00\xff\xff\xff\xff' + += Dissect DTO DIAG_SERVICE + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x55\x20\x00\xff\xff\xff') + +assert dto.ctr == 85 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\x20\x00\xff\xff\xff' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 85 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == False +assert dto.data_length == 0x20 +assert dto.data_type == 0x00 +assert dto.ccp_reserved == b"\xff\xff\xff" + += Build CRO ACTION_SERVICE + +cro = CCP(identifier=0x700)/CRO(ctr=0x56)/ACTION_SERVICE(action_service=0x8000) + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 0x56 +assert cro.cmd == 0x21 +assert cro.action_service == 0x8000 +assert cro.ccp_reserved == b'\xff\xff\xff\xff' +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x21\x56\x80\x00\xff\xff\xff\xff' + += Dissect DTO ACTION_SERVICE + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x56\x20\x00\xff\xff\xff') + +assert dto.ctr == 86 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\x20\x00\xff\xff\xff' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 86 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == False +assert dto.data_length == 0x20 +assert dto.data_type == 0x00 +assert dto.ccp_reserved == b"\xff\xff\xff" + += Build CRO TEST + +cro = CCP(identifier=0x700)/CRO(ctr=0x60)/TEST(station_address=0x80) + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 0x60 +assert cro.cmd == 0x05 +assert cro.station_address == 0x80 +assert cro.ccp_reserved == b"\xff" * 4 +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x05\x60\x80\x00\xff\xff\xff\xff' + += Dissect DTO TEST + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x60\xff\xff\xff\xff\xff') + +assert dto.ctr == 96 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\xff\xff\xff\xff\xff' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 96 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == True +assert dto.load == b'\xff\xff\xff\xff\xff' + += Build CRO START_STOP_ALL + +cro = CCP(identifier=0x700)/CRO(ctr=0x61)/START_STOP_ALL(type="start") + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 0x61 +assert cro.cmd == 0x08 +assert cro.type == 0x01 +assert cro.ccp_reserved == b"\xff" * 5 +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x08\x61\x01\xff\xff\xff\xff\xff' + += Dissect DTO START_STOP_ALL + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x61\xff\xff\xff\xff\xff') + +assert dto.ctr == 97 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\xff\xff\xff\xff\xff' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 97 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == True +assert dto.load == b'\xff\xff\xff\xff\xff' + += Build CRO GET_ACTIVE_CAL_PAGE + +cro = CCP(identifier=0x700)/CRO(ctr=0x62)/GET_ACTIVE_CAL_PAGE() + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 0x62 +assert cro.cmd == 0x09 +assert cro.ccp_reserved == b"\xff" * 6 +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x09\x62\xff\xff\xff\xff\xff\xff' + += Dissect DTO GET_ACTIVE_CAL_PAGE + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x62\x01\x11\x44\x77\x22') + +assert dto.ctr == 98 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\x01\x11\x44\x77\x22' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 98 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == False +assert dto.address_extension == 1 +assert dto.address == 0x11447722 + += Build CRO GET_CCP_VERSION + +cro = CCP(identifier=0x700)/CRO(ctr=0x63)/GET_CCP_VERSION(main_protocol_version=2, release_version=1) + +assert cro.identifier == 0x700 +assert cro.length == 8 +assert cro.flags == 0 +assert cro.ctr == 0x63 +assert cro.cmd == 0x1b +assert cro.main_protocol_version == 2 +assert cro.release_version == 1 +assert cro.ccp_reserved == b"\xff" * 4 +assert bytes(cro) == b'\x00\x00\x07\x00\x08\x00\x00\x00\x1b\x63\x02\x01\xff\xff\xff\xff' + +assert dto.hashret() != cro.hashret() +assert not dto.answers(cro) + += Dissect DTO GET_CCP_VERSION + +dto = CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x63\x02\x01\xff\xff\xff') + +assert dto.ctr == 99 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert dto.load == b'\x02\x01\xff\xff\xff' +# answers will interpret payload +assert dto.answers(cro) +assert dto.ctr == 99 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == False +assert dto.main_protocol_version == 2 +assert dto.release_version == 1 +assert dto.ccp_reserved == b"\xff" * 3 + +assert dto.hashret() == cro.hashret() + ++ Tests on a virtual CAN-Bus +~ needs_root linux + += Load modules +~ needs_root linux conf +import can +from subprocess import call +import time + +conf.contribs['CANSocket'] = {'use-python-can': True} +load_contrib("cansocket") + +iface0 = "vcan0" + += Initialize a virtual CAN interface +~ needs_root linux conf +if 0 != call("cansend %s 000#" % iface0, shell=True): + # vcan0 is not enabled + if 0 != call("sudo modprobe vcan", shell=True): + raise Exception("modprobe vcan failed") + if 0 != call("sudo ip link add name %s type vcan" % iface0, shell=True): + print("add %s failed: Maybe it was already up?" % iface0) + if 0 != call("sudo ip link set dev %s up" % iface0, shell=True): + raise Exception("could not bring up %s" % iface0) + +if 0 != call("cansend %s 000#" % iface0, shell=True): + raise Exception("cansend doesn't work") + +print("CAN should work now") + += CAN Socket sr1 with dto.ansers(cro) == True +~ needs_root linux + +sock1 = CANSocket(iface=can.interface.Bus(bustype='socketcan', channel='vcan0', bitrate=250000), basecls=CCP) +sock2 = CANSocket(iface=can.interface.Bus(bustype='socketcan', channel='vcan0', bitrate=250000)) + +def ecu(): + pkts = sock2.sniff(count=1, timeout=1) + if len(pkts) == 1: + cro = CRO(pkts[0].data) + assert cro.cmd == 0x22 + assert cro.data == b"\x10\x11\x12\x10\x11\x12" + sock2.send(CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x53\x02\x34\x00\x20\x06')) + + +thread = threading.Thread(target=ecu) +thread.start() +time.sleep(0.1) + +dto = sock1.sr1(CCP(identifier=0x700)/CRO(ctr=0x53)/PROGRAM_6(data=b"\x10\x11\x12\x10\x11\x12"), timeout=1) +thread.join() + +assert dto.ctr == 83 +assert dto.packet_id == 0xff +assert dto.return_code == 0 +assert hasattr(dto, "load") == False +assert dto.MTA0_extension == 2 +assert dto.MTA0_address == 0x34002006 + +sock1.close() +sock2.close() + + += CAN Socket sr1 with dto.ansers(cro) == False +~ needs_root linux + +sock1 = CANSocket(iface=can.interface.Bus(bustype='socketcan', channel='vcan0', bitrate=250000), basecls=CCP) +sock2 = CANSocket(iface=can.interface.Bus(bustype='socketcan', channel='vcan0', bitrate=250000)) + +def ecu(): + pkts = sock2.sniff(count=1, timeout=1) + if len(pkts) == 1: + cro = CRO(pkts[0].data) + assert cro.cmd == 0x22 + assert cro.data == b"\x10\x11\x12\x10\x11\x12" + sock2.send(CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x00\x55\x02\x34\x00\x20\x06')) + + +thread = threading.Thread(target=ecu) +thread.start() +time.sleep(0.1) +gotTimeout = False +try: + dto = sock1.sr1(CCP(identifier=0x700)/CRO(ctr=0x54)/PROGRAM_6(data=b"\x10\x11\x12\x10\x11\x12"), timeout=1) +except CANSocketTimeoutElapsed as e: + gotTimeout = True + +assert gotTimeout +thread.join() + +sock1.close() +sock2.close() + += CAN Socket sr1 with error code +~ needs_root linux + +sock1 = CANSocket(iface=can.interface.Bus(bustype='socketcan', channel='vcan0', bitrate=250000), basecls=CCP) +sock2 = CANSocket(iface=can.interface.Bus(bustype='socketcan', channel='vcan0', bitrate=250000)) + +def ecu(): + pkts = sock2.sniff(count=1, timeout=1) + if len(pkts) == 1: + cro = CRO(pkts[0].data) + assert cro.cmd == 0x22 + assert cro.data == b"\x10\x11\x12\x10\x11\x12" + sock2.send(CCP(b'\x00\x00\x07\x00\x08\x00\x00\x00\xff\x01\x55\xff\xff\xff\xff\xff')) + + +thread = threading.Thread(target=ecu) +thread.start() +time.sleep(0.1) + +dto = sock1.sr1(CCP(identifier=0x700)/CRO(ctr=0x55)/PROGRAM_6(data=b"\x10\x11\x12\x10\x11\x12"), timeout=1) +thread.join() + +assert dto.ctr == 85 +assert dto.packet_id == 0xff +assert dto.return_code == 1 +assert hasattr(dto, "load") == False +assert dto.MTA0_extension == 0xff +assert dto.MTA0_address == 0xffffffff + +sock1.close() +sock2.close() \ No newline at end of file diff --git a/scapy/contrib/cansocket_python_can.py b/scapy/contrib/cansocket_python_can.py index e1bfd5a446a..c62fde289f6 100644 --- a/scapy/contrib/cansocket_python_can.py +++ b/scapy/contrib/cansocket_python_can.py @@ -71,7 +71,7 @@ def send(self, x): is_error_frame=x.flags == 0x1, arbitration_id=x.identifier, dlc=x.length, - data=x.data) + data=bytes(x)[8:]) return self.iface.send(msg) except can_Error as ex: raise ex