Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 179 additions & 0 deletions e3dc/_e3dc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import datetime
import hashlib
import json
import struct
import time
import uuid
from calendar import monthrange
Expand Down Expand Up @@ -1082,6 +1083,184 @@ def get_system_status(self, keepAlive=False):
outObj = {k: SystemStatusBools[v] for k, v in outObj.items()}
return outObj

def get_wallbox_data(self, wbIndex=0, keepAlive=False):
"""Polls the wallbox status via rscp protocol locally.

Args:
wbIndex (Optional[int]): Index of the wallbox to poll data for
keepAlive (Optional[bool]): True to keep connection alive

Returns:
dict: Dictionary containing the wallbox status structured as follows::

{
"appSoftware": <version of the app>,
"chargingActive": <true if charging is currently active, otherwise false>,
"chargingCanceled": <true if charging was manually canceled, otherwise false>,
"consumptionNet": <power currently consumed by the wallbox, provided by the grid in watts>,
"consumptionSun": <power currently consumed by the wallbox, provided by the solar panels in watts>,
"energyAll": <total consumed energy this month in watthours>,
"energyNet": <consumed net energy this month in watthours>,
"energySun": <consumed solar energy this month in watthours>,
"index": <index of the requested wallbox>,
"keyState": <state of the key switch at the wallbox>,
"maxChargeCurrent": <configured maximum charge current in A>,
"phases": <number of phases used for charging>,
"schukoOn": <true if the connected schuko of the wallbox is on, otherwise false>,
"soc": <state of charge>,
"sunModeOn": <true if sun-only-mode is active, false if mixed mode is active>
}
"""
req = self.sendRequest(
(
"WB_REQ_DATA",
"Container",
[
("WB_INDEX", "UChar8", wbIndex),
("WB_REQ_EXTERN_DATA_ALG", "None", None),
("WB_REQ_EXTERN_DATA_SUN", "None", None),
("WB_REQ_EXTERN_DATA_NET", "None", None),
("WB_REQ_APP_SOFTWARE", "None", None),
("WB_REQ_KEY_STATE", "None", None),
],
),
keepAlive=keepAlive,
)

outObj = {
"index": rscpFindTagIndex(req, "WB_INDEX"),
"appSoftware": rscpFindTagIndex(req, "WB_APP_SOFTWARE"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is 0 for my E3DC Easy Connect WB.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here. Is that expected behaviour?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is 0 for my E3DC Easy Connect WB.

Same here. Is that expected behaviour?

I don't know. I guess this field is not supported / filled currently. Luckily, it is of little interest.

}

extern_data_alg = rscpFindTag(req, "WB_EXTERN_DATA_ALG")
if extern_data_alg is not None:
extern_data = rscpFindTagIndex(extern_data_alg, "WB_EXTERN_DATA")
status_byte = extern_data[2]
outObj["sunModeOn"] = (status_byte & 128) != 0
outObj["chargingCanceled"] = (status_byte & 64) != 0
outObj["chargingActive"] = (status_byte & 32) != 0
outObj["plugLocked"] = (status_byte & 16) != 0
outObj["plugged"] = (status_byte & 8) != 0
outObj["soc"] = extern_data[0]
outObj["phases"] = extern_data[1]
outObj["maxChargeCurrent"] = extern_data[3]
outObj["schukoOn"] = extern_data[5] != 0

extern_data_sun = rscpFindTag(req, "WB_EXTERN_DATA_SUN")
if extern_data_sun is not None:
extern_data = rscpFindTagIndex(extern_data_sun, "WB_EXTERN_DATA")
outObj["consumptionSun"] = struct.unpack("h", extern_data[0:2])[0]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The total consumption from the solar panels in watthours is returned in extern_data[2:6].
The value seems to be reset by the WB only if no car is connected and nobody requests this value for a few minutes (or hours).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should I implement something here? or just a note?

This comment was marked as resolved.

outObj["energySun"] = struct.unpack("i", extern_data[2:6])[0]

extern_data_net = rscpFindTag(req, "WB_EXTERN_DATA_NET")
if extern_data_net is not None:
extern_data = rscpFindTagIndex(extern_data_net, "WB_EXTERN_DATA")
outObj["consumptionNet"] = struct.unpack("h", extern_data[0:2])[0]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The total consumption from the grid in watthours is returned in extern_data[2:6].
The value seems to be reset by the WB only if no car is connected and nobody requests this value for a few minutes (or hours).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you mean instead of using extern_data[0:2], I should unpack extern_data[2:6]?

This comment was marked as resolved.

outObj["energyNet"] = struct.unpack("i", extern_data[2:6])[0]

if "energySun" in outObj and "energyNet" in outObj:
outObj["energyAll"] = outObj["energyNet"] + outObj["energySun"]

key_state = rscpFindTag(req, "WB_KEY_STATE")
if key_state is not None:
outObj["keyState"] = rscpFindTagIndex(key_state, "WB_KEY_STATE")

outObj = {k: v for k, v in sorted(outObj.items())}
return outObj

def set_wallbox_sunmode(self, enable: bool, wbIndex=0, keepAlive=False):
"""Sets the sun mode of the wallbox via rscp protocol locally.

Args:
enable (bool): True to enable sun mode, otherwise false,
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive
"""
return self.__wallbox_set_extern(0, 1 if enable else 2, wbIndex, keepAlive)

def set_wallbox_schuko(self, on: bool, wbIndex=0, keepAlive=False):
"""Sets the Schuko of the wallbox via rscp protocol locally.

Args:
on (bool): True to activate the Schuko, otherwise false
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive
"""
return self.__wallbox_set_extern(5, 1 if on else 0, wbIndex, keepAlive)

def set_wallbox_max_charge_current(
self, max_charge_current: int, wbIndex=0, keepAlive=False
):
"""Sets the maximum charge current of the wallbox via rscp protocol locally.

Args:
max_charge_current (int): maximum allowed charge current in A
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive
"""
barry = bytearray([0, 0, max_charge_current, 0, 0, 0])
return self.sendRequest(
(
"WB_REQ_DATA",
"Container",
[
("WB_INDEX", "UChar8", wbIndex),
(
"WB_REQ_SET_PARAM_1",
"Container",
[
("WB_EXTERN_DATA", "ByteArray", barry),
("WB_EXTERN_DATA_LEN", "UChar8", 6),
],
),
],
),
keepAlive=keepAlive,
)

def set_wallbox_phases(self, phases: int, wbIndex=0, keepAlive=False):
"""Sets the number of phases used for charging on the wallbox via rscp protocol locally.

Args:
phases (int): number of phases used, valid values are 1 or 3
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive
"""
if phases not in [1, 3]:
raise Exception("Invalid phase given, valid values are 1 or 3")
return self.__wallbox_set_extern(3, phases, wbIndex, keepAlive)

def toggle_wallbox_charging(self, wbIndex=0, keepAlive=False):
"""Toggles charging of the wallbox via rscp protocol locally.

Args:
wbIndex (Optional[int]): index of the requested wallbox,
keepAlive (Optional[bool]): True to keep connection alive
"""
return self.__wallbox_set_extern(4, 1, wbIndex, keepAlive)

def __wallbox_set_extern(self, index: int, value: int, wbIndex, keepAlive=False):
barry = bytearray([0, 0, 0, 0, 0, 0])
barry[index] = value
self.sendRequest(
(
"WB_REQ_DATA",
"Container",
[
("WB_INDEX", "UChar8", wbIndex),
(
"WB_REQ_SET_EXTERN",
"Container",
[
("WB_EXTERN_DATA", "ByteArray", barry),
("WB_EXTERN_DATA_LEN", "UChar8", 6),
],
),
],
),
keepAlive=keepAlive,
)

def get_battery_data(self, batIndex=None, dcbs=None, keepAlive=False):
"""Polls the battery data via rscp protocol locally.

Expand Down