100 lines
2.9 KiB
Python
100 lines
2.9 KiB
Python
#!/usr/bin/env python3
|
|
|
|
# SPDX-FileCopyrightText: 2023 fosslinux <fosslinux@aussies.space>
|
|
# SPDX-License-Identifier: GPL-3.0-or-later
|
|
|
|
"""
|
|
Contains a class that represents a tmpdir
|
|
"""
|
|
|
|
import enum
|
|
import getpass
|
|
import os
|
|
import shutil
|
|
|
|
from lib.utils import mount, umount, create_disk, run
|
|
|
|
class TmpType(enum.Enum):
|
|
"""Different types of tmpdirs we can have"""
|
|
NONE = 0
|
|
TMPFS = 1
|
|
|
|
class Tmpdir:
|
|
"""
|
|
Represents a tmpdir
|
|
"""
|
|
|
|
_syses = {}
|
|
_disks = {}
|
|
_disk_filesystems = {}
|
|
_mountpoints = {}
|
|
|
|
def __init__(self, preserve, path="tmp"):
|
|
self.path = os.path.abspath(path)
|
|
self.preserve = preserve
|
|
self._type = TmpType.NONE
|
|
|
|
if not os.path.exists(self.path):
|
|
os.mkdir(self.path)
|
|
|
|
def __del__(self):
|
|
for path in self._mountpoints:
|
|
print(f"Unmounting {path}")
|
|
umount(path)
|
|
|
|
if not self.preserve:
|
|
for disk in self._disks.values():
|
|
print(f"Detaching {disk}")
|
|
run("sudo", "losetup", "-d", disk)
|
|
|
|
if self._type == TmpType.TMPFS:
|
|
print(f"Unmounting tmpdir from {self.path}")
|
|
umount(self.path)
|
|
|
|
print(f"Removing {self.path}")
|
|
shutil.rmtree(self.path, ignore_errors=True)
|
|
|
|
def tmpfs(self, size="8G"):
|
|
"""Mount a tmpfs"""
|
|
print(f"Mounting tmpfs on {self.path}")
|
|
mount("tmpfs", self.path, "tmpfs", f"size={size}")
|
|
self._type = TmpType.TMPFS
|
|
|
|
def add_sys(self, name, subdir=None):
|
|
"""Create a subdirectory and register a sys"""
|
|
if subdir is None:
|
|
subdir = name
|
|
sys_path = os.path.join(self.path, name)
|
|
if not os.path.exists(sys_path):
|
|
os.mkdir(sys_path)
|
|
return sys_path
|
|
|
|
def add_disk(self, name, size="8G", filesystem="ext4"):
|
|
"""Add a disk"""
|
|
disk_path = os.path.join(self.path, f"{name}.img")
|
|
self._disks[name] = create_disk(disk_path, "msdos", filesystem, size)
|
|
self._disk_filesystems[name] = filesystem
|
|
# Allow executing user to access it
|
|
run("sudo", "chown", getpass.getuser(), self._disks[name])
|
|
|
|
def mount_disk(self, name, mountpoint=None):
|
|
"""Mount the disk"""
|
|
if mountpoint is None:
|
|
mountpoint = f"{name}_mnt"
|
|
mountpoint = os.path.join(self.path, mountpoint)
|
|
os.mkdir(mountpoint)
|
|
mount(self._disks[name] + "p1", mountpoint, self._disk_filesystems[name])
|
|
# Allow executing user to access it
|
|
run("sudo", "chown", getpass.getuser(), mountpoint)
|
|
self._mountpoints[name] = mountpoint
|
|
return mountpoint
|
|
|
|
def umount_disk(self, name):
|
|
"""Unmount a disk"""
|
|
umount(self._mountpoints[name])
|
|
del self._mountpoints[name]
|
|
|
|
def get_disk(self, name):
|
|
"""Get the path to a device of a disk"""
|
|
return self._disks[name]
|