Default gid & uid for folder creation

This commit is contained in:
Spitap
2025-11-01 17:41:02 +01:00
parent a81cd4196c
commit 39e78649aa

View File

@@ -24,7 +24,6 @@ BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
class FatalError(Exception):
"""A simple exception."""
pass
@@ -77,16 +76,14 @@ def dist_info():
info = {}
with open(path) as fp:
for line in fp.readlines():
if line == '\n':
if line == "\n":
continue
key, value = line.split("=")
value = value.rstrip('"\n')
value = value.strip('"')
info[key] = value
return info["NAME"], info["VERSION_ID"]
printcolor(
"Failed to retrieve information about your system, aborting.",
RED)
printcolor("Failed to retrieve information about your system, aborting.", RED)
sys.exit(1)
@@ -99,14 +96,27 @@ def is_dist_debian_based() -> (bool, str):
"""Check if current OS is Debian based or not."""
status, codename = exec_cmd("lsb_release -c -s")
codename = codename.decode().strip().lower()
return codename in [
"bionic", "bookworm", "bullseye", "buster",
"focal", "jammy", "jessie", "sid", "stretch",
"trusty", "wheezy", "xenial"
], codename
return (
codename
in [
"bionic",
"bookworm",
"bullseye",
"buster",
"focal",
"jammy",
"jessie",
"sid",
"stretch",
"trusty",
"wheezy",
"xenial",
],
codename,
)
def mkdir(path, mode, uid, gid):
def mkdir(path, mode=0o777, uid=os.getuid(), gid=os.getgid()):
"""Create a directory."""
if not os.path.exists(path):
os.mkdir(path, mode)
@@ -115,7 +125,7 @@ def mkdir(path, mode, uid, gid):
os.chown(path, uid, gid)
def mkdir_safe(path, mode, uid, gid):
def mkdir_safe(path, mode=0o777, uid=os.getuid(), gid=os.getgid()):
"""Create a directory. Safe way (-p)"""
if not os.path.exists(path):
os.makedirs(os.path.abspath(path), mode)
@@ -125,8 +135,9 @@ def mkdir_safe(path, mode, uid, gid):
def make_password(length=16):
"""Create a random password."""
return "".join(
random.SystemRandom().choice(
string.ascii_letters + string.digits) for _ in range(length))
random.SystemRandom().choice(string.ascii_letters + string.digits)
for _ in range(length)
)
@contextlib.contextmanager
@@ -149,8 +160,7 @@ def backup_file(fname):
"""Create a backup of a given file."""
for f in glob.glob("{}.old.*".format(fname)):
os.unlink(f)
bak_name = "{}.old.{}".format(
fname, datetime.datetime.now().isoformat())
bak_name = "{}.old.{}".format(fname, datetime.datetime.now().isoformat())
shutil.copy(fname, bak_name)
@@ -171,17 +181,13 @@ def copy_from_template(template, dest, context):
if os.path.isfile(dest):
backup_file(dest)
with open(dest, "w") as fp:
fp.write(
"# This file was automatically installed on {}\n"
.format(now))
fp.write("# This file was automatically installed on {}\n".format(now))
fp.write(ConfigFileTemplate(buf).substitute(context))
def check_config_file(dest,
interactive=False,
upgrade=False,
backup=False,
restore=False):
def check_config_file(
dest, interactive=False, upgrade=False, backup=False, restore=False
):
"""Create a new installer config file if needed."""
is_present = True
if os.path.exists(dest):
@@ -189,22 +195,25 @@ def check_config_file(dest,
if upgrade:
error(
"You cannot upgrade an existing installation without a "
"configuration file.")
"configuration file."
)
sys.exit(1)
elif backup:
is_present = False
error(
"Your configuration file hasn't been found. A new one will be generated. "
"Please edit it with correct password for the databases !")
"Please edit it with correct password for the databases !"
)
elif restore:
error(
"You cannot restore an existing installation without a "
f"configuration file. (file : {dest} has not been found...")
f"configuration file. (file : {dest} has not been found..."
)
sys.exit(1)
printcolor(
"Configuration file {} not found, creating new one."
.format(dest), YELLOW)
"Configuration file {} not found, creating new one.".format(dest), YELLOW
)
gen_config(dest, interactive)
return is_present, None
@@ -217,6 +226,7 @@ def has_colours(stream):
return False # auto color only on TTYs
try:
import curses
curses.setupterm()
return curses.tigetnum("colors") > 2
except:
@@ -251,8 +261,9 @@ def convert_version_to_int(version):
numbers = [int(number_string) for number_string in version.split(".")]
if len(numbers) > len(number_bits):
raise NotImplementedError(
"Versions with more than {0} decimal places are not supported"
.format(len(number_bits) - 1)
"Versions with more than {0} decimal places are not supported".format(
len(number_bits) - 1
)
)
# add 0s for missing numbers
numbers.extend([0] * (len(number_bits) - len(numbers)))
@@ -263,8 +274,9 @@ def convert_version_to_int(version):
max_num = (bits + 1) - 1
if num >= 1 << max_num:
raise ValueError(
"Number {0} cannot be stored with only {1} bits. Max is {2}"
.format(num, bits, max_num)
"Number {0} cannot be stored with only {1} bits. Max is {2}".format(
num, bits, max_num
)
)
number += num << total_bits
total_bits += bits
@@ -315,7 +327,9 @@ def validate(value, config_entry):
return True
def get_entry_value(entry: dict, interactive: bool, config: configparser.ConfigParser) -> string:
def get_entry_value(
entry: dict, interactive: bool, config: configparser.ConfigParser
) -> string:
default_entry = entry["default"]
if type(default_entry) is type(list()):
default_value = str(check_if_condition(config, default_entry)).lower()
@@ -325,7 +339,7 @@ def get_entry_value(entry: dict, interactive: bool, config: configparser.ConfigP
default_value = default_entry
user_value = None
if entry.get("customizable") and interactive:
while (user_value != '' and not validate(user_value, entry)):
while user_value != "" and not validate(user_value, entry):
question = entry.get("question")
if entry.get("values"):
question += " from the list"
@@ -366,14 +380,10 @@ def load_config_template(interactive):
config.add_section(section["name"])
for config_entry in section["values"]:
if config_entry.get("if") is not None:
interactive_section = (interactive_section and
check_if_condition(
config, config_entry["if"]
)
)
value = get_entry_value(config_entry,
interactive_section,
config)
interactive_section = interactive_section and check_if_condition(
config, config_entry["if"]
)
value = get_entry_value(config_entry, interactive_section, config)
config.set(section["name"], config_entry["option"], value)
return config
@@ -393,10 +403,11 @@ def update_config(path, apply_update=True):
dropped_sections = list(set(old_sections) - set(new_sections))
added_sections = list(set(new_sections) - set(old_sections))
if len(dropped_sections) > 0 and apply_update:
printcolor("Following section(s) will not be ported "
"due to being deleted or renamed: " +
', '.join(dropped_sections),
RED)
printcolor(
"Following section(s) will not be ported "
"due to being deleted or renamed: " + ", ".join(dropped_sections),
RED,
)
if len(dropped_sections) + len(added_sections) > 0:
update = True
@@ -409,11 +420,12 @@ def update_config(path, apply_update=True):
dropped_options = list(set(old_options) - set(new_options))
added_options = list(set(new_options) - set(old_options))
if len(dropped_options) > 0 and apply_update:
printcolor(f"Following option(s) from section: {section}, "
"will not be ported due to being "
"deleted or renamed: " +
', '.join(dropped_options),
RED)
printcolor(
f"Following option(s) from section: {section}, "
"will not be ported due to being "
"deleted or renamed: " + ", ".join(dropped_options),
RED,
)
if len(dropped_options) + len(added_options) > 0:
update = True
@@ -466,29 +478,27 @@ def validate_backup_path(path: str, silent_mode: bool):
"""Check if provided backup path is valid or not."""
path_exists = os.path.exists(path)
if path_exists and os.path.isfile(path):
printcolor(
"Error, you provided a file instead of a directory!", RED)
printcolor("Error, you provided a file instead of a directory!", RED)
return None
if not path_exists:
if not silent_mode:
create_dir = input(
f"\"{path}\" doesn't exist, would you like to create it? [y/N]\n"
f'"{path}" doesn\'t exist, would you like to create it? [y/N]\n'
).lower()
if silent_mode or (not silent_mode and create_dir.startswith("y")):
pw = pwd.getpwnam("root")
mkdir_safe(path, stat.S_IRWXU | stat.S_IRWXG, pw[2], pw[3])
else:
printcolor(
"Error, backup directory not present.", RED
)
printcolor("Error, backup directory not present.", RED)
return None
if len(os.listdir(path)) != 0:
if not silent_mode:
delete_dir = input(
"Warning: backup directory is not empty, it will be purged if you continue... [y/N]\n").lower()
"Warning: backup directory is not empty, it will be purged if you continue... [y/N]\n"
).lower()
if silent_mode or (not silent_mode and delete_dir.startswith("y")):
try:
@@ -496,22 +506,19 @@ def validate_backup_path(path: str, silent_mode: bool):
except FileNotFoundError:
pass
shutil.rmtree(os.path.join(path, "custom"),
ignore_errors=False)
shutil.rmtree(os.path.join(path, "custom"), ignore_errors=False)
shutil.rmtree(os.path.join(path, "mails"), ignore_errors=False)
shutil.rmtree(os.path.join(path, "databases"),
ignore_errors=False)
shutil.rmtree(os.path.join(path, "databases"), ignore_errors=False)
else:
printcolor(
"Error: backup directory not clean.", RED
)
printcolor("Error: backup directory not clean.", RED)
return None
backup_path = path
pw = pwd.getpwnam("root")
for dir in ["custom/", "databases/"]:
mkdir_safe(os.path.join(backup_path, dir),
stat.S_IRWXU | stat.S_IRWXG, pw[2], pw[3])
mkdir_safe(
os.path.join(backup_path, dir), stat.S_IRWXU | stat.S_IRWXG, pw[2], pw[3]
)
return backup_path
@@ -539,7 +546,9 @@ def check_app_compatibility(section, config):
if section in APP_INCOMPATIBILITY.keys():
for app in APP_INCOMPATIBILITY[section]:
if config.getboolean(app, "enabled"):
error(f"{section} cannot be installed if {app} is enabled. "
"Please disable one of them.")
error(
f"{section} cannot be installed if {app} is enabled. "
"Please disable one of them."
)
incompatible_app.append(app)
return len(incompatible_app) == 0