Skip to content

Commit fa60a8a

Browse files
committed
Add manage_flow to superfluid.py
1 parent 5fd68b4 commit fa60a8a

File tree

5 files changed

+119
-11
lines changed

5 files changed

+119
-11
lines changed

src/aleph/sdk/chains/ethereum.py

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@
2222
BALANCEOF_ABI,
2323
MIN_ETH_BALANCE,
2424
MIN_ETH_BALANCE_WEI,
25+
FlowUpdate,
26+
from_wei_token,
2527
get_chain_id,
2628
get_chains_with_super_token,
2729
get_rpc,
2830
get_super_token_address,
2931
get_token_address,
30-
to_human_readable_token,
3132
)
3233
from ..exceptions import BadSignatureError
3334
from ..utils import bytes_from_hex
@@ -107,7 +108,7 @@ def can_transact(self, block=True) -> bool:
107108
if not valid and block:
108109
raise InsufficientFundsError(
109110
required_funds=MIN_ETH_BALANCE,
110-
available_funds=to_human_readable_token(balance),
111+
available_funds=float(from_wei_token(balance)),
111112
)
112113
return valid
113114

@@ -162,32 +163,55 @@ def get_super_token_balance(self) -> Decimal:
162163
return Decimal(contract.functions.balanceOf(self.get_address()).call())
163164
return Decimal(0)
164165

166+
@property
167+
def has_superfluid_connector(self) -> bool:
168+
return self.superfluid_connector is not None
169+
170+
def can_start_flow(self, flow: Decimal) -> Awaitable[bool]:
171+
"""Check if the account has enough funds to start a Superfluid flow of the given size."""
172+
if not self.has_superfluid_connector:
173+
raise ValueError("Superfluid connector is required to check a flow")
174+
return self.superfluid_connector.can_start_flow(flow)
175+
165176
def create_flow(self, receiver: str, flow: Decimal) -> Awaitable[str]:
166177
"""Creat a Superfluid flow between this account and the receiver address."""
167-
if not self.superfluid_connector:
178+
if not self.has_superfluid_connector:
168179
raise ValueError("Superfluid connector is required to create a flow")
169180
return self.superfluid_connector.create_flow(receiver=receiver, flow=flow)
170181

171182
def get_flow(self, receiver: str) -> Awaitable[Web3FlowInfo]:
172183
"""Get the Superfluid flow between this account and the receiver address."""
173-
if not self.superfluid_connector:
184+
if not self.has_superfluid_connector:
174185
raise ValueError("Superfluid connector is required to get a flow")
175186
return self.superfluid_connector.get_flow(
176187
sender=self.get_address(), receiver=receiver
177188
)
178189

179190
def update_flow(self, receiver: str, flow: Decimal) -> Awaitable[str]:
180191
"""Update the Superfluid flow between this account and the receiver address."""
181-
if not self.superfluid_connector:
192+
if not self.has_superfluid_connector:
182193
raise ValueError("Superfluid connector is required to update a flow")
183194
return self.superfluid_connector.update_flow(receiver=receiver, flow=flow)
184195

185196
def delete_flow(self, receiver: str) -> Awaitable[str]:
186197
"""Delete the Superfluid flow between this account and the receiver address."""
187-
if not self.superfluid_connector:
198+
if not self.has_superfluid_connector:
188199
raise ValueError("Superfluid connector is required to delete a flow")
189200
return self.superfluid_connector.delete_flow(receiver=receiver)
190201

202+
def manage_flow(
203+
self,
204+
receiver: str,
205+
flow: Decimal,
206+
update_type: FlowUpdate,
207+
) -> Awaitable[Optional[str]]:
208+
"""Manage the Superfluid flow between this account and the receiver address."""
209+
if not self.has_superfluid_connector:
210+
raise ValueError("Superfluid connector is required to manage a flow")
211+
return self.superfluid_connector.manage_flow(
212+
receiver=receiver, flow=flow, update_type=update_type
213+
)
214+
191215

192216
def get_fallback_account(
193217
path: Optional[Path] = None, chain: Optional[Chain] = None

src/aleph/sdk/chains/evm.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from aleph_message.models import Chain
66
from eth_account import Account # type: ignore
77

8+
from ..evm_utils import FlowUpdate
89
from .common import get_fallback_private_key
910
from .ethereum import ETHAccount
1011

@@ -29,6 +30,9 @@ def get_token_balance(self) -> Decimal:
2930
def get_super_token_balance(self) -> Decimal:
3031
raise ValueError(f"Super token not implemented for this chain {self.CHAIN}")
3132

33+
def can_start_flow(self, flow: Decimal) -> Awaitable[bool]:
34+
raise ValueError(f"Flow checking not implemented for this chain {self.CHAIN}")
35+
3236
def create_flow(self, receiver: str, flow: Decimal) -> Awaitable[str]:
3337
raise ValueError(f"Flow creation not implemented for this chain {self.CHAIN}")
3438

@@ -41,6 +45,11 @@ def update_flow(self, receiver: str, flow: Decimal) -> Awaitable[str]:
4145
def delete_flow(self, receiver: str) -> Awaitable[str]:
4246
raise ValueError(f"Flow deletion not implemented for this chain {self.CHAIN}")
4347

48+
def manage_flow(
49+
self, receiver: str, flow: Decimal, update_type: FlowUpdate
50+
) -> Awaitable[Optional[str]]:
51+
raise ValueError(f"Flow management not implemented for this chain {self.CHAIN}")
52+
4453

4554
def get_fallback_account(
4655
path: Optional[Path] = None, chain: Optional[Chain] = None

src/aleph/sdk/connectors/superfluid.py

Lines changed: 57 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
11
from __future__ import annotations
22

33
from decimal import Decimal
4-
from typing import TYPE_CHECKING
4+
from typing import TYPE_CHECKING, Optional
55

66
from eth_utils import to_normalized_address
77
from superfluid import CFA_V1, Operation, Web3FlowInfo
88

99
from aleph.sdk.exceptions import InsufficientFundsError
1010

11-
from ..evm_utils import get_super_token_address, to_human_readable_token, to_wei_token
11+
from ..evm_utils import (
12+
FlowUpdate,
13+
from_wei_token,
14+
get_super_token_address,
15+
to_wei_token,
16+
)
1217

1318
if TYPE_CHECKING:
1419
from aleph.sdk.chains.ethereum import ETHAccount
@@ -44,6 +49,7 @@ async def _execute_operation_with_account(self, operation: Operation) -> str:
4449
return await self.account._sign_and_send_transaction(populated_transaction)
4550

4651
def can_start_flow(self, flow: Decimal, block=True) -> bool:
52+
"""Check if the account has enough funds to start a Superfluid flow of the given size."""
4753
valid = False
4854
if self.account.can_transact(block=block):
4955
balance = self.account.get_super_token_balance()
@@ -52,7 +58,7 @@ def can_start_flow(self, flow: Decimal, block=True) -> bool:
5258
if not valid and block:
5359
raise InsufficientFundsError(
5460
required_funds=float(MIN_FLOW_4H),
55-
available_funds=to_human_readable_token(balance),
61+
available_funds=float(from_wei_token(balance)),
5662
)
5763
return valid
5864

@@ -96,3 +102,51 @@ async def update_flow(self, receiver: str, flow: Decimal) -> str:
96102
flow_rate=int(to_wei_token(flow)),
97103
),
98104
)
105+
106+
async def manage_flow(
107+
self,
108+
receiver: str,
109+
flow: Decimal,
110+
update_type: FlowUpdate,
111+
) -> Optional[str]:
112+
"""
113+
Update the flow of a Superfluid stream between a sender and receiver.
114+
This function either increases or decreases the flow rate between the sender and receiver,
115+
based on the update_type. If no flow exists and the update type is augmentation, it creates a new flow
116+
with the specified rate. If the update type is reduction and the reduction amount brings the flow to zero
117+
or below, the flow is deleted.
118+
119+
:param receiver: Address of the receiver in hexadecimal format.
120+
:param flow: The flow rate to be added or removed (in ether).
121+
:param update_type: The type of update to perform (augmentation or reduction).
122+
:return: The transaction hash of the executed operation (create, update, or delete flow).
123+
"""
124+
125+
# Retrieve current flow info
126+
flow_info: Web3FlowInfo = await self.account.get_flow(receiver)
127+
128+
current_flow_rate_wei: Decimal = Decimal(flow_info["flowRate"] or "0")
129+
flow_rate_wei: int = int(to_wei_token(flow))
130+
131+
if update_type == FlowUpdate.INCREASE:
132+
if current_flow_rate_wei > 0:
133+
# Update existing flow by increasing the rate
134+
new_flow_rate_wei = current_flow_rate_wei + flow_rate_wei
135+
new_flow_rate_ether = from_wei_token(new_flow_rate_wei)
136+
return await self.account.update_flow(receiver, new_flow_rate_ether)
137+
else:
138+
# Create a new flow if none exists
139+
return await self.account.create_flow(receiver, flow)
140+
else:
141+
if current_flow_rate_wei > 0:
142+
# Reduce the existing flow
143+
new_flow_rate_wei = current_flow_rate_wei - flow_rate_wei
144+
# Ensure to not leave infinitesimal flows
145+
# Often, there were 1-10 wei remaining in the flow rate, which prevented the flow from being deleted
146+
if new_flow_rate_wei > 99:
147+
new_flow_rate_ether = from_wei_token(new_flow_rate_wei)
148+
return await self.account.update_flow(receiver, new_flow_rate_ether)
149+
else:
150+
# Delete the flow if the new flow rate is zero or negative
151+
return await self.account.delete_flow(receiver)
152+
return None

src/aleph/sdk/evm_utils.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from decimal import Decimal
2+
from enum import Enum
23
from typing import List, Optional, Union
34

45
from aleph_message.models import Chain
@@ -21,11 +22,18 @@
2122
}]"""
2223

2324

24-
def to_human_readable_token(amount: Decimal) -> float:
25-
return float(amount / (Decimal(10) ** Decimal(settings.TOKEN_DECIMALS)))
25+
class FlowUpdate(str, Enum):
26+
REDUCE = "reduce"
27+
INCREASE = "increase"
28+
29+
30+
def from_wei_token(amount: Decimal) -> Decimal:
31+
"""Converts the given wei value to ether."""
32+
return amount / Decimal(10) ** Decimal(settings.TOKEN_DECIMALS)
2633

2734

2835
def to_wei_token(amount: Decimal) -> Decimal:
36+
"""Converts the given ether value to wei."""
2937
return amount * Decimal(10) ** Decimal(settings.TOKEN_DECIMALS)
3038

3139

tests/unit/test_superfluid.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from eth_utils import to_checksum_address
88

99
from aleph.sdk.chains.ethereum import ETHAccount
10+
from aleph.sdk.evm_utils import FlowUpdate
1011

1112

1213
def generate_fake_eth_address():
@@ -24,6 +25,7 @@ def mock_superfluid():
2425
mock_superfluid.create_flow = AsyncMock(return_value="0xTransactionHash")
2526
mock_superfluid.delete_flow = AsyncMock(return_value="0xTransactionHash")
2627
mock_superfluid.update_flow = AsyncMock(return_value="0xTransactionHash")
28+
mock_superfluid.manage_flow = AsyncMock(return_value="0xTransactionHash")
2729

2830
# Mock get_flow to return a mock Web3FlowInfo
2931
mock_flow_info = {"timestamp": 0, "flowRate": 0, "deposit": 0, "owedDeposit": 0}
@@ -98,3 +100,14 @@ async def test_get_flow(eth_account, mock_superfluid):
98100
assert flow_info["flowRate"] == 0
99101
assert flow_info["deposit"] == 0
100102
assert flow_info["owedDeposit"] == 0
103+
104+
105+
@pytest.mark.asyncio
106+
async def test_manage_flow(eth_account, mock_superfluid):
107+
receiver = generate_fake_eth_address()
108+
flow = Decimal("0.005")
109+
110+
tx_hash = await eth_account.manage_flow(receiver, flow, FlowUpdate.INCREASE)
111+
112+
assert tx_hash == "0xTransactionHash"
113+
mock_superfluid.manage_flow.assert_awaited_once()

0 commit comments

Comments
 (0)