Skip to content

Commit 0c85e9a

Browse files
nesitorodesenfanshohAndres D. Molins
authored
Implement VM snapshots (#379)
* Problem: Instances loose data between nodes, so we need a way to handle and share this data. Feature: Implement VM snapshots that will be managed independently. * Fix: Added some refactor for snapshots and handled some errors. * Fix: Split snapshot class in two. * Fix: Make it work well with threads and asyncio. * Fix: Added code improvements with threading and fixed PR comments. * Fix: Fixed check_disk method error. * Fix: Fixed more snapshot errors with multithreading * Fix: CI for Ubuntu was failing too often because of apt/dpkg locks (#380) Problem: the CI is looking for only one lock file, but apt/dpkg use several. Furthermore, `lslocks --json` seems to cut its output to a specific width, and configuration options (`--notruncate`) do not appear to have an impact. Solution: repeat calls to apt-get update until we get the lock. This is not perfect but increases the rate of success. * Create Ubuntu rootfs (#370) Updated creation script to create a BTRFS image of Ubuntu filesystem. * Fix: Use SDK version 0.7.0 (#371) * Support BTRFS filesystem for instances (#373) Added support for BTRFS to execute a VM instance. * Feature: Delete old snapshots after do a new one. * add dependency to python3-schedule * Fix latest_snapshot attribute error * default snapshot frequency 1 hour --------- Co-authored-by: Olivier Desenfans <[email protected]> Co-authored-by: Hugo Herter <[email protected]> Co-authored-by: Andres D. Molins <[email protected]>
1 parent 84ed7d6 commit 0c85e9a

File tree

8 files changed

+292
-4
lines changed

8 files changed

+292
-4
lines changed

packaging/aleph-vm/DEBIAN/control

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@ Version: 0.1.8
33
Architecture: all
44
Maintainer: Aleph.im
55
Description: Aleph.im VM execution engine
6-
Depends: python3,python3-pip,python3-aiohttp,python3-msgpack,python3-aiodns,python3-alembic,python3-sqlalchemy,python3-setproctitle,redis,python3-aioredis,python3-psutil,sudo,acl,curl,systemd-container,squashfs-tools,debootstrap,python3-packaging,python3-cpuinfo,python3-nftables,python3-jsonschema,cloud-image-utils,ndppd,python3-yaml,python3-dotenv
6+
Depends: python3,python3-pip,python3-aiohttp,python3-msgpack,python3-aiodns,python3-alembic,python3-sqlalchemy,python3-setproctitle,redis,python3-aioredis,python3-psutil,sudo,acl,curl,systemd-container,squashfs-tools,debootstrap,python3-packaging,python3-cpuinfo,python3-nftables,python3-jsonschema,cloud-image-utils,ndppd,python3-yaml,python3-dotenv,python3-schedule
77
Section: aleph-im
88
Priority: Extra

vm_supervisor/conf.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ class IPv6AllocationPolicy(str, Enum):
2828
dynamic = "dynamic" # Assign an available IP address.
2929

3030

31+
class SnapshotCompressionAlgorithm(str, Enum):
32+
gz = "gzip"
33+
34+
3135
def etc_resolv_conf_dns_servers():
3236
with open("/etc/resolv.conf", "r") as resolv_file:
3337
for line in resolv_file.readlines():
@@ -146,6 +150,16 @@ class Settings(BaseSettings):
146150
MAX_PROGRAM_ARCHIVE_SIZE = 10_000_000 # 10 MB
147151
MAX_DATA_ARCHIVE_SIZE = 10_000_000 # 10 MB
148152

153+
SNAPSHOT_FREQUENCY: int = Field(
154+
default=60,
155+
description="Snapshot frequency interval in minutes. It will create a VM snapshot every X minutes.",
156+
)
157+
158+
SNAPSHOT_COMPRESSION_ALGORITHM: SnapshotCompressionAlgorithm = Field(
159+
default=SnapshotCompressionAlgorithm.gz,
160+
description="Snapshot compression algorithm.",
161+
)
162+
149163
# hashlib.sha256(b"secret-token").hexdigest()
150164
ALLOCATION_TOKEN_HASH = (
151165
"151ba92f2eb90bce67e912af2f7a5c17d8654b3d29895b042107ea312a7eebda"

vm_supervisor/pool.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from .conf import settings
1010
from .models import ExecutableContent, VmExecution
11+
from .snapshot_manager import SnapshotManager
1112
from .vm.vm_type import VmType
1213

1314
logger = logging.getLogger(__name__)
@@ -26,6 +27,7 @@ class VmPool:
2627
executions: Dict[ItemHash, VmExecution]
2728
message_cache: Dict[str, ExecutableMessage] = {}
2829
network: Optional[Network]
30+
snapshot_manager: SnapshotManager
2931

3032
def __init__(self):
3133
self.counter = settings.START_ID_INDEX
@@ -47,6 +49,9 @@ def __init__(self):
4749
if settings.ALLOW_VM_NETWORKING
4850
else None
4951
)
52+
self.snapshot_manager = SnapshotManager()
53+
logger.debug("Initializing SnapshotManager ...")
54+
self.snapshot_manager.run_snapshots()
5055

5156
async def create_a_vm(
5257
self, vm_hash: ItemHash, message: ExecutableContent, original: ExecutableContent
@@ -64,6 +69,10 @@ async def create_a_vm(
6469
tap_interface = None
6570

6671
await execution.create(vm_id=vm_id, tap_interface=tap_interface)
72+
73+
# Start VM snapshots automatically
74+
await self.snapshot_manager.start_for(execution=execution)
75+
6776
return execution
6877

6978
def get_unique_vm_id(self) -> int:
@@ -125,6 +134,8 @@ async def stop(self):
125134
*(execution.stop() for vm_hash, execution in self.executions.items())
126135
)
127136

137+
await self.snapshot_manager.stop_all()
138+
128139
def get_persistent_executions(self) -> Iterable[VmExecution]:
129140
for vm_hash, execution in self.executions.items():
130141
if execution.persistent and execution.is_running:

vm_supervisor/snapshot_manager.py

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
import asyncio
2+
import logging
3+
import threading
4+
from time import sleep
5+
from typing import Dict, Optional
6+
7+
from aleph_message.models import ItemHash
8+
from schedule import Job, Scheduler
9+
10+
from .conf import settings
11+
from .models import VmExecution
12+
from .snapshots import CompressedDiskVolumeSnapshot
13+
14+
logger = logging.getLogger(__name__)
15+
16+
17+
def wrap_async_snapshot(execution):
18+
asyncio.run(do_execution_snapshot(execution))
19+
20+
21+
def run_threaded_snapshot(execution):
22+
job_thread = threading.Thread(target=wrap_async_snapshot, args=(execution,))
23+
job_thread.start()
24+
25+
26+
async def do_execution_snapshot(execution: VmExecution) -> CompressedDiskVolumeSnapshot:
27+
try:
28+
logger.debug(f"Starting new snapshot for VM {execution.vm_hash}")
29+
assert execution.vm, "VM execution not set"
30+
31+
snapshot = await execution.vm.create_snapshot()
32+
await snapshot.upload()
33+
34+
logger.debug(
35+
f"New snapshots for VM {execution.vm_hash} created in {snapshot.path}"
36+
)
37+
return snapshot
38+
except ValueError:
39+
raise ValueError("Something failed taking an snapshot")
40+
41+
42+
def infinite_run_scheduler_jobs(scheduler: Scheduler) -> None:
43+
while True:
44+
scheduler.run_pending()
45+
sleep(1)
46+
47+
48+
class SnapshotExecution:
49+
vm_hash: ItemHash
50+
execution: VmExecution
51+
frequency: int
52+
_scheduler: Scheduler
53+
_job: Job
54+
55+
def __init__(
56+
self,
57+
scheduler: Scheduler,
58+
vm_hash: ItemHash,
59+
execution: VmExecution,
60+
frequency: int,
61+
):
62+
self.vm_hash = vm_hash
63+
self.execution = execution
64+
self.frequency = frequency
65+
self._scheduler = scheduler
66+
67+
async def start(self) -> None:
68+
logger.debug(
69+
f"Starting snapshots for VM {self.vm_hash} every {self.frequency} minutes"
70+
)
71+
job = self._scheduler.every(self.frequency).minutes.do(
72+
run_threaded_snapshot, self.execution
73+
)
74+
self._job = job
75+
76+
async def stop(self) -> None:
77+
logger.debug(f"Stopping snapshots for VM {self.vm_hash}")
78+
self._scheduler.cancel_job(self._job)
79+
80+
81+
class SnapshotManager:
82+
"""
83+
Manage VM snapshots.
84+
"""
85+
86+
executions: Dict[ItemHash, SnapshotExecution]
87+
_scheduler: Scheduler
88+
89+
def __init__(self):
90+
self.executions = {}
91+
self._scheduler = Scheduler()
92+
93+
def run_snapshots(self) -> None:
94+
job_thread = threading.Thread(
95+
target=infinite_run_scheduler_jobs,
96+
args=[self._scheduler],
97+
daemon=True,
98+
name="SnapshotManager",
99+
)
100+
job_thread.start()
101+
102+
async def start_for(
103+
self, execution: VmExecution, frequency: Optional[int] = None
104+
) -> None:
105+
if not execution.is_instance:
106+
raise TypeError("VM execution should be an Instance only")
107+
108+
if not frequency:
109+
frequency = settings.SNAPSHOT_FREQUENCY
110+
111+
vm_hash = execution.vm_hash
112+
snapshot_execution = SnapshotExecution(
113+
scheduler=self._scheduler,
114+
vm_hash=vm_hash,
115+
execution=execution,
116+
frequency=frequency,
117+
)
118+
self.executions[vm_hash] = snapshot_execution
119+
await snapshot_execution.start()
120+
121+
async def stop_for(self, vm_hash: ItemHash) -> None:
122+
if not self.executions[vm_hash]:
123+
raise ValueError(f"Snapshot execution not running for VM {vm_hash}")
124+
125+
await self.executions[vm_hash].stop()
126+
127+
async def stop_all(self) -> None:
128+
await asyncio.gather(
129+
*(self.stop_for(vm_hash) for vm_hash, execution in self.executions)
130+
)

vm_supervisor/snapshots.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import logging
2+
from pathlib import Path
3+
from typing import Optional
4+
5+
from aleph_message.models import ItemHash
6+
7+
from .conf import SnapshotCompressionAlgorithm
8+
from .storage import compress_volume_snapshot, create_volume_snapshot
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
class DiskVolumeFile:
14+
path: Path
15+
size: int
16+
17+
def __init__(self, path: Path):
18+
self.path = path
19+
self.size = path.stat().st_size
20+
21+
22+
class CompressedDiskVolumeSnapshot(DiskVolumeFile):
23+
algorithm: SnapshotCompressionAlgorithm
24+
25+
def __init__(self, path: Path, algorithm: SnapshotCompressionAlgorithm):
26+
super().__init__(path=path)
27+
self.algorithm = algorithm
28+
29+
def delete(self) -> None:
30+
self.path.unlink(missing_ok=True)
31+
32+
async def upload(self) -> ItemHash:
33+
# TODO: Upload snapshots to Aleph Network
34+
pass
35+
36+
37+
class DiskVolumeSnapshot(DiskVolumeFile):
38+
compressed: Optional[CompressedDiskVolumeSnapshot]
39+
40+
def delete(self) -> None:
41+
if self.compressed:
42+
self.compressed.delete()
43+
44+
self.path.unlink(missing_ok=True)
45+
46+
async def compress(
47+
self, algorithm: SnapshotCompressionAlgorithm
48+
) -> CompressedDiskVolumeSnapshot:
49+
compressed_snapshot = await compress_volume_snapshot(self.path, algorithm)
50+
compressed = CompressedDiskVolumeSnapshot(
51+
path=compressed_snapshot, algorithm=algorithm
52+
)
53+
self.compressed = compressed
54+
return compressed
55+
56+
57+
class DiskVolume(DiskVolumeFile):
58+
async def take_snapshot(self) -> DiskVolumeSnapshot:
59+
snapshot = await create_volume_snapshot(self.path)
60+
return DiskVolumeSnapshot(snapshot)

vm_supervisor/storage.py

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
import logging
99
import re
1010
import sys
11+
from datetime import datetime
12+
from enum import Enum
1113
from pathlib import Path
12-
from shutil import make_archive
14+
from shutil import copy2, disk_usage, make_archive
1315
from typing import Union
1416

1517
import aiohttp
@@ -28,14 +30,18 @@
2830
VolumePersistence,
2931
)
3032

31-
from .conf import settings
33+
from .conf import SnapshotCompressionAlgorithm, settings
3234
from .utils import fix_message_validation, run_in_subprocess
3335

3436
logger = logging.getLogger(__name__)
3537

3638
DEVICE_MAPPER_DIRECTORY = "/dev/mapper"
3739

3840

41+
class NotEnoughDiskSpace(Exception):
42+
pass
43+
44+
3945
async def chown_to_jailman(path: Path) -> None:
4046
"""Changes ownership of the target when running firecracker inside jailer isolation."""
4147
if not path.exists():
@@ -315,3 +321,33 @@ async def get_volume_path(volume: MachineVolume, namespace: str) -> Path:
315321
return volume_path
316322
else:
317323
raise NotImplementedError("Only immutable volumes are supported")
324+
325+
326+
async def create_volume_snapshot(path: Path) -> Path:
327+
new_path = Path(f"{path}.{datetime.today().strftime('%d%m%Y-%H%M%S')}.bak")
328+
copy2(path, new_path)
329+
return new_path
330+
331+
332+
async def compress_volume_snapshot(
333+
path: Path,
334+
algorithm: SnapshotCompressionAlgorithm = SnapshotCompressionAlgorithm.gz,
335+
) -> Path:
336+
if algorithm != SnapshotCompressionAlgorithm.gz:
337+
raise NotImplementedError
338+
339+
new_path = Path(f"{path}.gz")
340+
341+
await run_in_subprocess(
342+
[
343+
"gzip",
344+
str(path),
345+
]
346+
)
347+
348+
return new_path
349+
350+
351+
def check_disk_space(bytes_to_use: int) -> bool:
352+
host_disk_usage = disk_usage("/")
353+
return host_disk_usage.free >= bytes_to_use

vm_supervisor/vm/firecracker/executable.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
from vm_supervisor.models import ExecutableContent
2929
from vm_supervisor.network.firewall import teardown_nftables_for_vm
3030
from vm_supervisor.network.interfaces import TapInterface
31+
from vm_supervisor.snapshots import CompressedDiskVolumeSnapshot
3132
from vm_supervisor.storage import get_volume_path
3233

3334
logger = logging.getLogger(__name__)
@@ -287,3 +288,6 @@ async def teardown(self):
287288
teardown_nftables_for_vm(self.vm_id)
288289
await self.tap_interface.delete()
289290
await self.stop_guest_api()
291+
292+
async def create_snapshot(self) -> CompressedDiskVolumeSnapshot:
293+
raise NotImplementedError()

0 commit comments

Comments
 (0)