From c817bb337e8acdc932f7029936af6b0c4d7dc9bb Mon Sep 17 00:00:00 2001 From: Lars Erik Wik Date: Wed, 11 Mar 2026 13:48:59 +0100 Subject: [PATCH 1/4] Add extraPaths to pyrightconfig for local library resolution Signed-off-by: Lars Erik Wik --- pyrightconfig.json | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pyrightconfig.json b/pyrightconfig.json index 389015c..3ab6871 100644 --- a/pyrightconfig.json +++ b/pyrightconfig.json @@ -1,3 +1,4 @@ { - "reportMissingImports": "none" + "reportMissingImports": "none", + "extraPaths": ["libraries/python"] } From 489311a7a3ba5eba3d35ca2494eab720cc80825a Mon Sep 17 00:00:00 2001 From: Lars Erik Wik Date: Thu, 12 Mar 2026 10:37:40 +0100 Subject: [PATCH 2/4] Custom promise type for managing global sshd configuration Ticket: ENT-13797 Signed-off-by: Lars Erik Wik --- cfbs.json | 9 + promise-types/sshd/LICENSE | 21 ++ promise-types/sshd/README.md | 63 ++++ promise-types/sshd/enable.cf | 6 + promise-types/sshd/example.cf | 33 ++ promise-types/sshd/sshd.py | 570 ++++++++++++++++++++++++++++++++++ 6 files changed, 702 insertions(+) create mode 100644 promise-types/sshd/LICENSE create mode 100644 promise-types/sshd/README.md create mode 100644 promise-types/sshd/enable.cf create mode 100644 promise-types/sshd/example.cf create mode 100644 promise-types/sshd/sshd.py diff --git a/cfbs.json b/cfbs.json index 9a1f191..cc2c295 100644 --- a/cfbs.json +++ b/cfbs.json @@ -302,6 +302,15 @@ "append enable.cf services/init.cf" ] }, + "promise-type-sshd": { + "description": "Promise type to configure sshd.", + "subdirectory": "promise-types/sshd", + "dependencies": ["library-for-promise-types-in-python"], + "steps": [ + "copy sshd.py modules/promises/", + "append enable.cf services/init.cf" + ] + }, "promise-type-systemd": { "description": "Promise type to manage systemd services.", "subdirectory": "promise-types/systemd", diff --git a/promise-types/sshd/LICENSE b/promise-types/sshd/LICENSE new file mode 100644 index 0000000..6e1016e --- /dev/null +++ b/promise-types/sshd/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2025 Northern.tech + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/promise-types/sshd/README.md b/promise-types/sshd/README.md new file mode 100644 index 0000000..4f47b01 --- /dev/null +++ b/promise-types/sshd/README.md @@ -0,0 +1,63 @@ +# `sshd` promise type + +Configures sshd and restarts the service when configuration changes. + +## Promiser +An arbitrary human-readable label that appears in log messages and reports. +Since there is only one global sshd configuration, the promiser is not used to identify a resource. +Example: `"global sshd config"`. + +## Attributes +- Named using sshd's native directive names (e.g. `PermitRootLogin`, not `permit_root_login`) +- Values can be strings or slists +- Validated against `sshd -t` during promise evaluation + +## What the module manages internally +1. **Include directive** — ensures the base `sshd_config` includes the drop-in directory (`sshd_config.d/`) as its first non-comment directive +2. **Drop-in directory** — creates the drop-in directory if it doesn't exist +3. **Drop-in file** — writes directives to `sshd_config.d/00-cfengine.conf` +4. **Service restart** — restarts sshd if configuration was changed and the service is already running +5. **Verification** — verifies the desired attributes appear in the effective sshd config (`sshd -T`) + +## What the module does NOT do +- Install sshd — that is a `packages:` promise +- Ensure sshd is running — that is a `services:` promise +- Manage match blocks — those are a policy-level concern + +## Policy +```cf3 +bundle agent sshd_config +{ + packages: + "openssh-server" + policy => "present"; + + services: + "sshd" + service_policy => "start"; + + vars: + "allowed_users" slist => { "alice", "bob" }; + + sshd: + "global" + PermitRootLogin => "no", + PasswordAuthentication => "no", + Port => "22", + AllowUsers => @(allowed_users); +} +``` + +## Authors + +This software was created by the team at [Northern.tech](https://northern.tech), with many contributions from the community. +Thanks everyone! + +## Contribute + +Feel free to open pull requests to expand this documentation, add features, or fix problems. +You can also pick up an existing task or file an issue in [our bug tracker](https://northerntech.atlassian.net/). + +## License + +This software is licensed under the MIT License. See LICENSE in the root of the repository for the full license text. diff --git a/promise-types/sshd/enable.cf b/promise-types/sshd/enable.cf new file mode 100644 index 0000000..89b2c9a --- /dev/null +++ b/promise-types/sshd/enable.cf @@ -0,0 +1,6 @@ +promise agent sshd +# @brief Define sshd promise type +{ + path => "$(sys.workdir)/modules/promises/sshd.py"; + interpreter => "/usr/bin/python3"; +} diff --git a/promise-types/sshd/example.cf b/promise-types/sshd/example.cf new file mode 100644 index 0000000..4070cdb --- /dev/null +++ b/promise-types/sshd/example.cf @@ -0,0 +1,33 @@ +promise agent sshd +# @brief Define sshd promise type +{ + path => "$(sys.workdir)/modules/promises/sshd.py"; + interpreter => "/usr/bin/python3"; +} + +bundle agent example +{ + packages: + "openssh-server" + policy => "present"; + + services: + "sshd" + service_policy => "start"; + + vars: + "allowed_users" slist => { "alice", "bob" }; + + sshd: + "global" + PermitRootLogin => "no", + PasswordAuthentication => "no", + Port => "22", + AllowUsers => @(allowed_users); +} + +bundle agent __main__ +{ + methods: + "example"; +} diff --git a/promise-types/sshd/sshd.py b/promise-types/sshd/sshd.py new file mode 100644 index 0000000..c0c882e --- /dev/null +++ b/promise-types/sshd/sshd.py @@ -0,0 +1,570 @@ +import os +import re +import sys +import subprocess +import tempfile + +try: + from cfengine_module_library import PromiseModule, ValidationError, Result +except ImportError: + sys.path.append(os.path.join(os.path.dirname(__file__), "../../libraries/python")) + from cfengine_module_library import PromiseModule, ValidationError, Result + + +BASE_CONFIG = "/etc/ssh/sshd_config" +DROP_IN_DIR = "/etc/ssh/sshd_config.d/" +CFE_CONFIG = os.path.join(DROP_IN_DIR, "00-cfengine.conf") + + +def sshd_quote(value: str) -> str: + """Quote a string for sshd_config. Values containing whitespace, '#', or + '\"' are wrapped in double quotes, with internal backslashes and double + quotes escaped.""" + if not value: + return '""' + if re.search(r'[\s#"]', value): + escaped = value.replace("\\", "\\\\").replace('"', '\\"') + return f'"{escaped}"' + return value + + +def to_sshd_value(value: str | list[str]) -> str: + """Convert a Python value to an sshd config value. Lists are space-joined, + individual strings are quoted when necessary.""" + if isinstance(value, list): + return " ".join(sshd_quote(v) for v in value) + if isinstance(value, str): + return sshd_quote(value) + raise TypeError(f"Expected str or list[str], got {type(value).__name__}") + + +def try_unlink(path: str): + """Remove a file, ignoring errors if it no longer exists.""" + try: + os.unlink(path) + except OSError: + pass + + +def safe_write(path: str, lines: list[str]): + """Atomically write lines to a file via a temporary file in the same directory.""" + dir = os.path.dirname(path) + base = os.path.basename(path) + prefix, suffix = os.path.splitext(base) + + fd, tmp_path = tempfile.mkstemp(prefix=prefix, suffix=suffix, dir=dir) + try: + os.fchmod(fd, 0o600) # rw------- + with os.fdopen(fd, "w") as f: + f.writelines(lines) + os.replace(tmp_path, path) + except BaseException: + # BaseException (not Exception) to also clean up on KeyboardInterrupt/SystemExit + try_unlink(tmp_path) + raise + + +def is_drop_in_directive(directive: str) -> bool: + """Check if a directive is an Include for the drop-in config directory.""" + m = re.match( + rf"include(\s+|\s*=\s*){re.escape(DROP_IN_DIR)}\*\.conf", + directive.strip(), + re.IGNORECASE, + ) + return m is not None + + +def update_result(old: str, new: str) -> str: + """Return the worst of two results. Severity: KEPT < REPAIRED < NOT_KEPT.""" + if old == Result.NOT_KEPT or new == Result.NOT_KEPT: + return Result.NOT_KEPT + if old == Result.REPAIRED or new == Result.REPAIRED: + return Result.REPAIRED + return Result.KEPT + + +def strip_trailing_comment(directive: str) -> str: + """Remove trailing comment from a directive, respecting double-quoted strings.""" + in_quotes = False + i = 0 + while i < len(directive): + c = directive[i] + if c == "\\" and in_quotes: + i += 2 # skip escaped character + continue + if c == '"': + in_quotes = not in_quotes + elif c == "#" and not in_quotes: + return directive[:i].rstrip() + i += 1 + return directive + + +def normalize_directive(directive: str) -> str: + """Normalize a directive by removing trailing comments and replacing = with a space.""" + directive = strip_trailing_comment(directive) + # Normalize separator (= or whitespace) to a single space + directive = re.sub(r"\s*=\s*", " ", directive, count=1) + return directive.strip().lower() + + +def get_directives(lines: list[str]) -> set[str]: + """Extract and normalize all non-comment, non-empty directives from lines.""" + return { + normalize_directive(line) + for line in lines + if line.strip() and not line.strip().startswith("#") + } + + +def has_same_directives(a: list[str], b: list[str]) -> bool: + """Check if two sets of lines contain the same directives, ignoring comments and order.""" + return get_directives(a) == get_directives(b) + + +def get_first_directive(lines: list[str]) -> str | None: + """Return the first non-comment, non-empty directive, or None if not found.""" + for line in lines: + stripped = line.strip() + if stripped and not stripped.startswith("#"): + return stripped + return None + + +class SshdPromiseTypeModule(PromiseModule): + + def __init__(self): + super().__init__("sshd_promise_module", "0.0.0") + + def validate_promise( # pyright: ignore[reportImplicitOverride] + self, promiser: str, attributes: dict[str, object], metadata: dict[str, str] + ): + for attr, value in attributes.items(): + if not isinstance(value, (str, list)): + raise ValidationError(f"Attribute '{attr}' must be a string or slist") + + def ensure_include_directive(self) -> str: + """Ensure the base sshd config includes the drop-in directory.""" + try: + with open(BASE_CONFIG, "r") as f: + lines = f.readlines() + except FileNotFoundError: + self.log_error( # pyright: ignore[reportUnknownMemberType] + f"Base configuration file '{BASE_CONFIG}' does not exist" + ) + return Result.NOT_KEPT + + first_directive = get_first_directive(lines) + + if first_directive is None or not is_drop_in_directive(first_directive): + include_directive = f"Include {DROP_IN_DIR}*.conf" + + self.log_debug( # pyright: ignore[reportUnknownMemberType] + f"Expected first directive in '{BASE_CONFIG}' to be '{include_directive}'" + ) + + lines.insert(0, f"{include_directive} # Added by CFEngine\n") + try: + safe_write(BASE_CONFIG, lines) + except Exception as e: + self.log_error( # pyright: ignore[reportUnknownMemberType] + f"Failed to write '{BASE_CONFIG}': {e}" + ) + return Result.NOT_KEPT + + self.log_info( # pyright: ignore[reportUnknownMemberType] + f"Added include directive to '{BASE_CONFIG}'" + ) + return Result.REPAIRED + + return Result.KEPT + + def ensure_drop_in_dir(self) -> str: + """Ensure the drop-in config directory exists.""" + if os.path.isdir(DROP_IN_DIR): + return Result.KEPT + + try: + os.makedirs(DROP_IN_DIR, mode=0o755) # rwxr-xr-x + except Exception as e: + self.log_error( # pyright: ignore[reportUnknownMemberType] + f"Failed to create drop-in directory '{DROP_IN_DIR}': {e}" + ) + return Result.NOT_KEPT + + self.log_info( # pyright: ignore[reportUnknownMemberType] + f"Created drop-in directory '{DROP_IN_DIR}'" + ) + return Result.REPAIRED + + def ensure_drop_in_config(self, attributes: dict[str, object]) -> str: + """Write the CFEngine drop-in config file with the given attributes.""" + lines = ["# Managed by CFEngine\n"] + for attr, value in attributes.items(): + # Ensured by validate_promise + assert isinstance(value, (str, list)) + lines.append( + f"{attr} {to_sshd_value(value)}\n" # pyright: ignore[reportUnknownArgumentType] + ) + + try: + with open(CFE_CONFIG, "r") as f: + existing = f.readlines() + except FileNotFoundError: + existing = [] + + if has_same_directives(lines, existing): + return Result.KEPT + + try: + safe_write(CFE_CONFIG, lines) + except Exception as e: + self.log_error( # pyright: ignore[reportUnknownMemberType] + f"Failed to write drop-in config '{CFE_CONFIG}': {e}" + ) + return Result.NOT_KEPT + + self.log_info( # pyright: ignore[reportUnknownMemberType] + f"Updated drop-in config '{CFE_CONFIG}'" + ) + return Result.REPAIRED + + def validate_config(self) -> str: + """Validate the sshd configuration using sshd -t.""" + r = subprocess.run( + ["/usr/sbin/sshd", "-t"], + capture_output=True, + text=True, + ) + if r.returncode != 0: + self.log_error( # pyright: ignore[reportUnknownMemberType] + f"Configuration validation failed: {r.stderr.strip()}" + ) + return Result.NOT_KEPT + return Result.KEPT + + def restart_sshd(self) -> str: + """Restart the sshd service if it is currently running.""" + r = subprocess.run( + ["systemctl", "is-active", "--quiet", "sshd"], + ) + if r.returncode != 0: + # If sshd is not running, do nothing + self.log_debug( # pyright: ignore[reportUnknownMemberType] + "The service sshd is not running" + ) + return Result.KEPT + + r = subprocess.run( + ["systemctl", "restart", "--quiet", "sshd"], + ) + if r.returncode != 0: + self.log_error( # pyright: ignore[reportUnknownMemberType] + "Failed to restart sshd service" + ) + return Result.NOT_KEPT + + self.log_info( # pyright: ignore[reportUnknownMemberType] + "Restarted sshd service" + ) + return Result.REPAIRED + + def verify_effective_config(self, attributes: dict[str, object]) -> str: + """Verify that the desired attributes appear in the effective sshd config.""" + r = subprocess.run( + ["/usr/sbin/sshd", "-T"], + capture_output=True, + text=True, + ) + if r.returncode != 0: + self.log_error( # pyright: ignore[reportUnknownMemberType] + "Failed to retrieve effective sshd configuration" + ) + return Result.NOT_KEPT + + effective = get_directives(r.stdout.splitlines()) + + desired: list[str] = [] + for attr, value in attributes.items(): + # Ensured by validate_promise + assert isinstance(value, (str, list)) + if isinstance(value, list): + # sshd -T splits multi-argument keywords into separate + # lines (e.g. "AllowUsers user1 user2" becomes two lines: + # "allowusers user1" and "allowusers user2"), so we must + # expand list values into individual directives to match + # the effective config format for set comparison. + for item in value: + desired.append(f"{attr} {to_sshd_value(item)}") + else: + desired.append(f"{attr} {to_sshd_value(value)}") + + missing = get_directives(desired) - effective + if missing: + self.log_error( # pyright: ignore[reportUnknownMemberType] + f"Missing directives in effective sshd config: {missing}" + ) + return Result.NOT_KEPT + + return Result.KEPT + + def evaluate_promise( # pyright: ignore[reportImplicitOverride] + self, promiser: str, attributes: dict[str, object], metadata: dict[str, str] + ) -> str: + result = Result.KEPT + + # Step 1: Ensure the base config includes the drop-in directory + result = update_result(result, self.ensure_include_directive()) + + # Step 2: Ensure the drop-in directory exists + result = update_result(result, self.ensure_drop_in_dir()) + + # Step 3: Ensure the drop-in config file contains the desired attributes + result = update_result(result, self.ensure_drop_in_config(attributes)) + + # Step 4: Validate config before restarting sshd + result = update_result(result, self.validate_config()) + + # Step 5: Restart sshd only if configuration was changed + if result == Result.REPAIRED: + result = update_result(result, self.restart_sshd()) + + # Step 6: Verify the effective config matches the desired attributes + if result != Result.NOT_KEPT: + result = update_result(result, self.verify_effective_config(attributes)) + + return result + + +def test_sshd_quote_simple(): + assert sshd_quote("no") == "no" + + +def test_sshd_quote_empty(): + assert sshd_quote("") == '""' + + +def test_sshd_quote_space(): + assert sshd_quote("some value") == '"some value"' + + +def test_sshd_quote_tab(): + assert sshd_quote("some\tvalue") == '"some\tvalue"' + + +def test_sshd_quote_hash(): + assert sshd_quote("before#after") == '"before#after"' + + +def test_sshd_quote_double_quote(): + assert sshd_quote('say "hello"') == '"say \\"hello\\""' + + +def test_sshd_quote_backslash(): + assert sshd_quote("path\\to") == "path\\to" + + +def test_sshd_quote_backslash_and_space(): + assert sshd_quote("path\\to dir") == '"path\\\\to dir"' + + +def test_to_sshd_value_str(): + assert to_sshd_value("no") == "no" + + +def test_to_sshd_value_str_with_spaces(): + assert to_sshd_value("some value") == '"some value"' + + +def test_to_sshd_value_list(): + assert to_sshd_value(["user1", "user2"]) == "user1 user2" + + +def test_to_sshd_value_list_with_quoting(): + assert to_sshd_value(["user1", "user 2"]) == 'user1 "user 2"' + + +def test_get_first_directive(): + lines = ["# comment\n", "PermitRootLogin no\n", "Port 22\n"] + assert get_first_directive(lines) == "PermitRootLogin no" + + +def test_get_first_directive_no_comments(): + lines = ["PermitRootLogin no\n", "Port 22\n"] + assert get_first_directive(lines) == "PermitRootLogin no" + + +def test_get_first_directive_all_comments(): + lines = ["# comment\n", "# another comment\n"] + assert get_first_directive(lines) is None + + +def test_get_first_directive_empty(): + assert get_first_directive([]) is None + + +def test_get_first_directive_extra_whitespace(): + lines = ["# comment\n", "PermitRootLogin no\n"] + assert get_first_directive(lines) == "PermitRootLogin no" + + +def test_get_first_directive_equal_sign(): + lines = ["# comment\n", "PermitRootLogin=no\n"] + assert get_first_directive(lines) == "PermitRootLogin=no" + + +def test_get_first_directive_blank_lines(): + lines = ["\n", " \n", "# comment\n", "Port 22\n"] + assert get_first_directive(lines) == "Port 22" + + +def test_normalize_directive_simple(): + assert normalize_directive("permitrootlogin no") == "permitrootlogin no" + + +def test_normalize_directive_trailing_comment(): + assert normalize_directive("PermitRootLogin no # comment") == "permitrootlogin no" + + +def test_normalize_directive_equal_sign(): + assert normalize_directive("PermitRootLogin=no") == "permitrootlogin no" + + +def test_normalize_directive_space_equal_space(): + assert normalize_directive("PermitRootLogin = no") == "permitrootlogin no" + + +def test_normalize_directive_equal_and_comment(): + assert normalize_directive("PermitRootLogin=no # comment") == "permitrootlogin no" + + +def test_normalize_directive_leading_trailing_whitespace(): + assert normalize_directive(" PermitRootLogin no ") == "permitrootlogin no" + + +def test_normalize_directive_hash_in_quoted_value(): + assert ( + normalize_directive('Banner "/etc/banner #info"') + == 'banner "/etc/banner #info"' + ) + + +def test_get_directives_filters_comments(): + lines = ["# comment\n", "PermitRootLogin no\n", "Port 22\n"] + assert get_directives(lines) == {"permitrootlogin no", "port 22"} + + +def test_get_directives_filters_blank_lines(): + lines = ["\n", "PermitRootLogin no\n", " \n"] + assert get_directives(lines) == {"permitrootlogin no"} + + +def test_get_directives_normalizes(): + lines = ["PermitRootLogin=no # managed\n", "Port 22\n"] + assert get_directives(lines) == {"permitrootlogin no", "port 22"} + + +def test_get_directives_empty(): + assert get_directives([]) == set() + + +def test_has_same_directives_same(): + a = ["# comment\n", "PermitRootLogin no\n", "Port 22\n"] + b = ["Port 22\n", "PermitRootLogin no\n"] + assert has_same_directives(a, b) + + +def test_has_same_directives_different_comments(): + a = ["# managed by X\n", "PermitRootLogin no\n"] + b = ["# managed by Y\n", "PermitRootLogin no\n"] + assert has_same_directives(a, b) + + +def test_has_same_directives_different_format(): + a = ["PermitRootLogin=no # comment\n"] + b = ["PermitRootLogin no\n"] + assert has_same_directives(a, b) + + +def test_has_same_directives_missing(): + a = ["PermitRootLogin no\n", "Port 22\n"] + b = ["PermitRootLogin no\n"] + assert not has_same_directives(a, b) + + +def test_has_same_directives_extra(): + a = ["PermitRootLogin no\n"] + b = ["PermitRootLogin no\n", "Port 22\n"] + assert not has_same_directives(a, b) + + +def test_is_drop_in_directive_space(): + assert is_drop_in_directive(f"Include {DROP_IN_DIR}*.conf") + + +def test_is_drop_in_directive_equal(): + assert is_drop_in_directive(f"Include={DROP_IN_DIR}*.conf") + + +def test_is_drop_in_directive_space_equal_space(): + assert is_drop_in_directive(f"Include = {DROP_IN_DIR}*.conf") + + +def test_is_drop_in_directive_case_insensitive(): + assert is_drop_in_directive(f"include {DROP_IN_DIR}*.conf") + + +def test_is_drop_in_directive_extra_files(): + assert is_drop_in_directive(f"Include {DROP_IN_DIR}*.conf /other/path") + + +def test_is_drop_in_directive_wrong_path(): + assert not is_drop_in_directive("Include /other/path/*.conf") + + +def test_is_drop_in_directive_no_separator(): + assert not is_drop_in_directive(f"Include{DROP_IN_DIR}*.conf") + + +def test_is_drop_in_directive_not_include(): + assert not is_drop_in_directive("permitrootlogin no") + + +def test_update_result_kept_kept(): + assert update_result(Result.KEPT, Result.KEPT) == Result.KEPT + + +def test_update_result_kept_repaired(): + assert update_result(Result.KEPT, Result.REPAIRED) == Result.REPAIRED + + +def test_update_result_kept_not_kept(): + assert update_result(Result.KEPT, Result.NOT_KEPT) == Result.NOT_KEPT + + +def test_update_result_repaired_kept(): + assert update_result(Result.REPAIRED, Result.KEPT) == Result.REPAIRED + + +def test_update_result_repaired_repaired(): + assert update_result(Result.REPAIRED, Result.REPAIRED) == Result.REPAIRED + + +def test_update_result_repaired_not_kept(): + assert update_result(Result.REPAIRED, Result.NOT_KEPT) == Result.NOT_KEPT + + +def test_update_result_not_kept_kept(): + assert update_result(Result.NOT_KEPT, Result.KEPT) == Result.NOT_KEPT + + +def test_update_result_not_kept_repaired(): + assert update_result(Result.NOT_KEPT, Result.REPAIRED) == Result.NOT_KEPT + + +def test_update_result_not_kept_not_kept(): + assert update_result(Result.NOT_KEPT, Result.NOT_KEPT) == Result.NOT_KEPT + + +if __name__ == "__main__": + SshdPromiseTypeModule().start() # pyright: ignore[reportUnknownMemberType] From a30017072287a632385f52f71fa16462cc7b6823 Mon Sep 17 00:00:00 2001 From: Lars Erik Wik Date: Tue, 24 Mar 2026 16:59:44 +0100 Subject: [PATCH 3/4] Rewrite sshd promise type to use the keyword as the promiser Instead of passing sshd directives as named attributes on a single promise, each directive is now its own promise where the promiser is the sshd keyword and "value" is the sole attribute. This simplifies validation and aligns with a one-promise-per-directive model. Ticket: ENT-13797 Signed-off-by: Lars Erik Wik Co-Authored-By: Claude Opus 4.6 (1M context) --- cfbs.json | 2 +- promise-types/sshd/LICENSE | 2 +- promise-types/sshd/README.md | 28 +- promise-types/sshd/enable.cf | 2 +- promise-types/sshd/example.cf | 17 +- promise-types/sshd/sshd.py | 570 ------------------------ promise-types/sshd/sshd_promise_type.py | 470 +++++++++++++++++++ 7 files changed, 491 insertions(+), 600 deletions(-) delete mode 100644 promise-types/sshd/sshd.py create mode 100644 promise-types/sshd/sshd_promise_type.py diff --git a/cfbs.json b/cfbs.json index cc2c295..a957671 100644 --- a/cfbs.json +++ b/cfbs.json @@ -307,7 +307,7 @@ "subdirectory": "promise-types/sshd", "dependencies": ["library-for-promise-types-in-python"], "steps": [ - "copy sshd.py modules/promises/", + "copy sshd_promise_type.py modules/promises/", "append enable.cf services/init.cf" ] }, diff --git a/promise-types/sshd/LICENSE b/promise-types/sshd/LICENSE index 6e1016e..eb2ada6 100644 --- a/promise-types/sshd/LICENSE +++ b/promise-types/sshd/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2025 Northern.tech +Copyright (c) 2026 Northern.tech Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/promise-types/sshd/README.md b/promise-types/sshd/README.md index 4f47b01..841d3ae 100644 --- a/promise-types/sshd/README.md +++ b/promise-types/sshd/README.md @@ -3,21 +3,18 @@ Configures sshd and restarts the service when configuration changes. ## Promiser -An arbitrary human-readable label that appears in log messages and reports. -Since there is only one global sshd configuration, the promiser is not used to identify a resource. -Example: `"global sshd config"`. +The sshd configuration keyword to manage (e.g. `PermitRootLogin`, `AllowUsers`). +Each promise manages a single directive in the drop-in config file. ## Attributes -- Named using sshd's native directive names (e.g. `PermitRootLogin`, not `permit_root_login`) -- Values can be strings or slists -- Validated against `sshd -t` during promise evaluation +- `value` (required) — the value for the directive, either a string or an slist ## What the module manages internally 1. **Include directive** — ensures the base `sshd_config` includes the drop-in directory (`sshd_config.d/`) as its first non-comment directive 2. **Drop-in directory** — creates the drop-in directory if it doesn't exist 3. **Drop-in file** — writes directives to `sshd_config.d/00-cfengine.conf` 4. **Service restart** — restarts sshd if configuration was changed and the service is already running -5. **Verification** — verifies the desired attributes appear in the effective sshd config (`sshd -T`) +5. **Verification** — verifies the desired directive appears in the effective sshd config (`sshd -T`) ## What the module does NOT do - Install sshd — that is a `packages:` promise @@ -29,22 +26,19 @@ Example: `"global sshd config"`. bundle agent sshd_config { packages: - "openssh-server" - policy => "present"; + "openssh-server" policy => "present"; services: - "sshd" - service_policy => "start"; + "sshd" service_policy => "start"; vars: - "allowed_users" slist => { "alice", "bob" }; + "allowed_users" slist => { "alice", "bob" }; sshd: - "global" - PermitRootLogin => "no", - PasswordAuthentication => "no", - Port => "22", - AllowUsers => @(allowed_users); + "PermitRootLogin" value => "no"; + "PasswordAuthentication" value => "no"; + "Port" value => "22"; + "AllowUsers" value => @(allowed_users); } ``` diff --git a/promise-types/sshd/enable.cf b/promise-types/sshd/enable.cf index 89b2c9a..bc3ea19 100644 --- a/promise-types/sshd/enable.cf +++ b/promise-types/sshd/enable.cf @@ -1,6 +1,6 @@ promise agent sshd # @brief Define sshd promise type { - path => "$(sys.workdir)/modules/promises/sshd.py"; + path => "$(sys.workdir)/modules/promises/sshd_promise_type.py"; interpreter => "/usr/bin/python3"; } diff --git a/promise-types/sshd/example.cf b/promise-types/sshd/example.cf index 4070cdb..3e83b4b 100644 --- a/promise-types/sshd/example.cf +++ b/promise-types/sshd/example.cf @@ -1,29 +1,26 @@ promise agent sshd # @brief Define sshd promise type { - path => "$(sys.workdir)/modules/promises/sshd.py"; + path => "$(sys.workdir)/modules/promises/sshd_promise_type.py"; interpreter => "/usr/bin/python3"; } bundle agent example { packages: - "openssh-server" - policy => "present"; + "openssh-server" policy => "present"; services: - "sshd" - service_policy => "start"; + "sshd" service_policy => "start"; vars: "allowed_users" slist => { "alice", "bob" }; sshd: - "global" - PermitRootLogin => "no", - PasswordAuthentication => "no", - Port => "22", - AllowUsers => @(allowed_users); + "PermitRootLogin" value => "no"; + "PasswordAuthentication" value => "no"; + "Port" value => "22"; + "AllowUsers" value => @(allowed_users); } bundle agent __main__ diff --git a/promise-types/sshd/sshd.py b/promise-types/sshd/sshd.py deleted file mode 100644 index c0c882e..0000000 --- a/promise-types/sshd/sshd.py +++ /dev/null @@ -1,570 +0,0 @@ -import os -import re -import sys -import subprocess -import tempfile - -try: - from cfengine_module_library import PromiseModule, ValidationError, Result -except ImportError: - sys.path.append(os.path.join(os.path.dirname(__file__), "../../libraries/python")) - from cfengine_module_library import PromiseModule, ValidationError, Result - - -BASE_CONFIG = "/etc/ssh/sshd_config" -DROP_IN_DIR = "/etc/ssh/sshd_config.d/" -CFE_CONFIG = os.path.join(DROP_IN_DIR, "00-cfengine.conf") - - -def sshd_quote(value: str) -> str: - """Quote a string for sshd_config. Values containing whitespace, '#', or - '\"' are wrapped in double quotes, with internal backslashes and double - quotes escaped.""" - if not value: - return '""' - if re.search(r'[\s#"]', value): - escaped = value.replace("\\", "\\\\").replace('"', '\\"') - return f'"{escaped}"' - return value - - -def to_sshd_value(value: str | list[str]) -> str: - """Convert a Python value to an sshd config value. Lists are space-joined, - individual strings are quoted when necessary.""" - if isinstance(value, list): - return " ".join(sshd_quote(v) for v in value) - if isinstance(value, str): - return sshd_quote(value) - raise TypeError(f"Expected str or list[str], got {type(value).__name__}") - - -def try_unlink(path: str): - """Remove a file, ignoring errors if it no longer exists.""" - try: - os.unlink(path) - except OSError: - pass - - -def safe_write(path: str, lines: list[str]): - """Atomically write lines to a file via a temporary file in the same directory.""" - dir = os.path.dirname(path) - base = os.path.basename(path) - prefix, suffix = os.path.splitext(base) - - fd, tmp_path = tempfile.mkstemp(prefix=prefix, suffix=suffix, dir=dir) - try: - os.fchmod(fd, 0o600) # rw------- - with os.fdopen(fd, "w") as f: - f.writelines(lines) - os.replace(tmp_path, path) - except BaseException: - # BaseException (not Exception) to also clean up on KeyboardInterrupt/SystemExit - try_unlink(tmp_path) - raise - - -def is_drop_in_directive(directive: str) -> bool: - """Check if a directive is an Include for the drop-in config directory.""" - m = re.match( - rf"include(\s+|\s*=\s*){re.escape(DROP_IN_DIR)}\*\.conf", - directive.strip(), - re.IGNORECASE, - ) - return m is not None - - -def update_result(old: str, new: str) -> str: - """Return the worst of two results. Severity: KEPT < REPAIRED < NOT_KEPT.""" - if old == Result.NOT_KEPT or new == Result.NOT_KEPT: - return Result.NOT_KEPT - if old == Result.REPAIRED or new == Result.REPAIRED: - return Result.REPAIRED - return Result.KEPT - - -def strip_trailing_comment(directive: str) -> str: - """Remove trailing comment from a directive, respecting double-quoted strings.""" - in_quotes = False - i = 0 - while i < len(directive): - c = directive[i] - if c == "\\" and in_quotes: - i += 2 # skip escaped character - continue - if c == '"': - in_quotes = not in_quotes - elif c == "#" and not in_quotes: - return directive[:i].rstrip() - i += 1 - return directive - - -def normalize_directive(directive: str) -> str: - """Normalize a directive by removing trailing comments and replacing = with a space.""" - directive = strip_trailing_comment(directive) - # Normalize separator (= or whitespace) to a single space - directive = re.sub(r"\s*=\s*", " ", directive, count=1) - return directive.strip().lower() - - -def get_directives(lines: list[str]) -> set[str]: - """Extract and normalize all non-comment, non-empty directives from lines.""" - return { - normalize_directive(line) - for line in lines - if line.strip() and not line.strip().startswith("#") - } - - -def has_same_directives(a: list[str], b: list[str]) -> bool: - """Check if two sets of lines contain the same directives, ignoring comments and order.""" - return get_directives(a) == get_directives(b) - - -def get_first_directive(lines: list[str]) -> str | None: - """Return the first non-comment, non-empty directive, or None if not found.""" - for line in lines: - stripped = line.strip() - if stripped and not stripped.startswith("#"): - return stripped - return None - - -class SshdPromiseTypeModule(PromiseModule): - - def __init__(self): - super().__init__("sshd_promise_module", "0.0.0") - - def validate_promise( # pyright: ignore[reportImplicitOverride] - self, promiser: str, attributes: dict[str, object], metadata: dict[str, str] - ): - for attr, value in attributes.items(): - if not isinstance(value, (str, list)): - raise ValidationError(f"Attribute '{attr}' must be a string or slist") - - def ensure_include_directive(self) -> str: - """Ensure the base sshd config includes the drop-in directory.""" - try: - with open(BASE_CONFIG, "r") as f: - lines = f.readlines() - except FileNotFoundError: - self.log_error( # pyright: ignore[reportUnknownMemberType] - f"Base configuration file '{BASE_CONFIG}' does not exist" - ) - return Result.NOT_KEPT - - first_directive = get_first_directive(lines) - - if first_directive is None or not is_drop_in_directive(first_directive): - include_directive = f"Include {DROP_IN_DIR}*.conf" - - self.log_debug( # pyright: ignore[reportUnknownMemberType] - f"Expected first directive in '{BASE_CONFIG}' to be '{include_directive}'" - ) - - lines.insert(0, f"{include_directive} # Added by CFEngine\n") - try: - safe_write(BASE_CONFIG, lines) - except Exception as e: - self.log_error( # pyright: ignore[reportUnknownMemberType] - f"Failed to write '{BASE_CONFIG}': {e}" - ) - return Result.NOT_KEPT - - self.log_info( # pyright: ignore[reportUnknownMemberType] - f"Added include directive to '{BASE_CONFIG}'" - ) - return Result.REPAIRED - - return Result.KEPT - - def ensure_drop_in_dir(self) -> str: - """Ensure the drop-in config directory exists.""" - if os.path.isdir(DROP_IN_DIR): - return Result.KEPT - - try: - os.makedirs(DROP_IN_DIR, mode=0o755) # rwxr-xr-x - except Exception as e: - self.log_error( # pyright: ignore[reportUnknownMemberType] - f"Failed to create drop-in directory '{DROP_IN_DIR}': {e}" - ) - return Result.NOT_KEPT - - self.log_info( # pyright: ignore[reportUnknownMemberType] - f"Created drop-in directory '{DROP_IN_DIR}'" - ) - return Result.REPAIRED - - def ensure_drop_in_config(self, attributes: dict[str, object]) -> str: - """Write the CFEngine drop-in config file with the given attributes.""" - lines = ["# Managed by CFEngine\n"] - for attr, value in attributes.items(): - # Ensured by validate_promise - assert isinstance(value, (str, list)) - lines.append( - f"{attr} {to_sshd_value(value)}\n" # pyright: ignore[reportUnknownArgumentType] - ) - - try: - with open(CFE_CONFIG, "r") as f: - existing = f.readlines() - except FileNotFoundError: - existing = [] - - if has_same_directives(lines, existing): - return Result.KEPT - - try: - safe_write(CFE_CONFIG, lines) - except Exception as e: - self.log_error( # pyright: ignore[reportUnknownMemberType] - f"Failed to write drop-in config '{CFE_CONFIG}': {e}" - ) - return Result.NOT_KEPT - - self.log_info( # pyright: ignore[reportUnknownMemberType] - f"Updated drop-in config '{CFE_CONFIG}'" - ) - return Result.REPAIRED - - def validate_config(self) -> str: - """Validate the sshd configuration using sshd -t.""" - r = subprocess.run( - ["/usr/sbin/sshd", "-t"], - capture_output=True, - text=True, - ) - if r.returncode != 0: - self.log_error( # pyright: ignore[reportUnknownMemberType] - f"Configuration validation failed: {r.stderr.strip()}" - ) - return Result.NOT_KEPT - return Result.KEPT - - def restart_sshd(self) -> str: - """Restart the sshd service if it is currently running.""" - r = subprocess.run( - ["systemctl", "is-active", "--quiet", "sshd"], - ) - if r.returncode != 0: - # If sshd is not running, do nothing - self.log_debug( # pyright: ignore[reportUnknownMemberType] - "The service sshd is not running" - ) - return Result.KEPT - - r = subprocess.run( - ["systemctl", "restart", "--quiet", "sshd"], - ) - if r.returncode != 0: - self.log_error( # pyright: ignore[reportUnknownMemberType] - "Failed to restart sshd service" - ) - return Result.NOT_KEPT - - self.log_info( # pyright: ignore[reportUnknownMemberType] - "Restarted sshd service" - ) - return Result.REPAIRED - - def verify_effective_config(self, attributes: dict[str, object]) -> str: - """Verify that the desired attributes appear in the effective sshd config.""" - r = subprocess.run( - ["/usr/sbin/sshd", "-T"], - capture_output=True, - text=True, - ) - if r.returncode != 0: - self.log_error( # pyright: ignore[reportUnknownMemberType] - "Failed to retrieve effective sshd configuration" - ) - return Result.NOT_KEPT - - effective = get_directives(r.stdout.splitlines()) - - desired: list[str] = [] - for attr, value in attributes.items(): - # Ensured by validate_promise - assert isinstance(value, (str, list)) - if isinstance(value, list): - # sshd -T splits multi-argument keywords into separate - # lines (e.g. "AllowUsers user1 user2" becomes two lines: - # "allowusers user1" and "allowusers user2"), so we must - # expand list values into individual directives to match - # the effective config format for set comparison. - for item in value: - desired.append(f"{attr} {to_sshd_value(item)}") - else: - desired.append(f"{attr} {to_sshd_value(value)}") - - missing = get_directives(desired) - effective - if missing: - self.log_error( # pyright: ignore[reportUnknownMemberType] - f"Missing directives in effective sshd config: {missing}" - ) - return Result.NOT_KEPT - - return Result.KEPT - - def evaluate_promise( # pyright: ignore[reportImplicitOverride] - self, promiser: str, attributes: dict[str, object], metadata: dict[str, str] - ) -> str: - result = Result.KEPT - - # Step 1: Ensure the base config includes the drop-in directory - result = update_result(result, self.ensure_include_directive()) - - # Step 2: Ensure the drop-in directory exists - result = update_result(result, self.ensure_drop_in_dir()) - - # Step 3: Ensure the drop-in config file contains the desired attributes - result = update_result(result, self.ensure_drop_in_config(attributes)) - - # Step 4: Validate config before restarting sshd - result = update_result(result, self.validate_config()) - - # Step 5: Restart sshd only if configuration was changed - if result == Result.REPAIRED: - result = update_result(result, self.restart_sshd()) - - # Step 6: Verify the effective config matches the desired attributes - if result != Result.NOT_KEPT: - result = update_result(result, self.verify_effective_config(attributes)) - - return result - - -def test_sshd_quote_simple(): - assert sshd_quote("no") == "no" - - -def test_sshd_quote_empty(): - assert sshd_quote("") == '""' - - -def test_sshd_quote_space(): - assert sshd_quote("some value") == '"some value"' - - -def test_sshd_quote_tab(): - assert sshd_quote("some\tvalue") == '"some\tvalue"' - - -def test_sshd_quote_hash(): - assert sshd_quote("before#after") == '"before#after"' - - -def test_sshd_quote_double_quote(): - assert sshd_quote('say "hello"') == '"say \\"hello\\""' - - -def test_sshd_quote_backslash(): - assert sshd_quote("path\\to") == "path\\to" - - -def test_sshd_quote_backslash_and_space(): - assert sshd_quote("path\\to dir") == '"path\\\\to dir"' - - -def test_to_sshd_value_str(): - assert to_sshd_value("no") == "no" - - -def test_to_sshd_value_str_with_spaces(): - assert to_sshd_value("some value") == '"some value"' - - -def test_to_sshd_value_list(): - assert to_sshd_value(["user1", "user2"]) == "user1 user2" - - -def test_to_sshd_value_list_with_quoting(): - assert to_sshd_value(["user1", "user 2"]) == 'user1 "user 2"' - - -def test_get_first_directive(): - lines = ["# comment\n", "PermitRootLogin no\n", "Port 22\n"] - assert get_first_directive(lines) == "PermitRootLogin no" - - -def test_get_first_directive_no_comments(): - lines = ["PermitRootLogin no\n", "Port 22\n"] - assert get_first_directive(lines) == "PermitRootLogin no" - - -def test_get_first_directive_all_comments(): - lines = ["# comment\n", "# another comment\n"] - assert get_first_directive(lines) is None - - -def test_get_first_directive_empty(): - assert get_first_directive([]) is None - - -def test_get_first_directive_extra_whitespace(): - lines = ["# comment\n", "PermitRootLogin no\n"] - assert get_first_directive(lines) == "PermitRootLogin no" - - -def test_get_first_directive_equal_sign(): - lines = ["# comment\n", "PermitRootLogin=no\n"] - assert get_first_directive(lines) == "PermitRootLogin=no" - - -def test_get_first_directive_blank_lines(): - lines = ["\n", " \n", "# comment\n", "Port 22\n"] - assert get_first_directive(lines) == "Port 22" - - -def test_normalize_directive_simple(): - assert normalize_directive("permitrootlogin no") == "permitrootlogin no" - - -def test_normalize_directive_trailing_comment(): - assert normalize_directive("PermitRootLogin no # comment") == "permitrootlogin no" - - -def test_normalize_directive_equal_sign(): - assert normalize_directive("PermitRootLogin=no") == "permitrootlogin no" - - -def test_normalize_directive_space_equal_space(): - assert normalize_directive("PermitRootLogin = no") == "permitrootlogin no" - - -def test_normalize_directive_equal_and_comment(): - assert normalize_directive("PermitRootLogin=no # comment") == "permitrootlogin no" - - -def test_normalize_directive_leading_trailing_whitespace(): - assert normalize_directive(" PermitRootLogin no ") == "permitrootlogin no" - - -def test_normalize_directive_hash_in_quoted_value(): - assert ( - normalize_directive('Banner "/etc/banner #info"') - == 'banner "/etc/banner #info"' - ) - - -def test_get_directives_filters_comments(): - lines = ["# comment\n", "PermitRootLogin no\n", "Port 22\n"] - assert get_directives(lines) == {"permitrootlogin no", "port 22"} - - -def test_get_directives_filters_blank_lines(): - lines = ["\n", "PermitRootLogin no\n", " \n"] - assert get_directives(lines) == {"permitrootlogin no"} - - -def test_get_directives_normalizes(): - lines = ["PermitRootLogin=no # managed\n", "Port 22\n"] - assert get_directives(lines) == {"permitrootlogin no", "port 22"} - - -def test_get_directives_empty(): - assert get_directives([]) == set() - - -def test_has_same_directives_same(): - a = ["# comment\n", "PermitRootLogin no\n", "Port 22\n"] - b = ["Port 22\n", "PermitRootLogin no\n"] - assert has_same_directives(a, b) - - -def test_has_same_directives_different_comments(): - a = ["# managed by X\n", "PermitRootLogin no\n"] - b = ["# managed by Y\n", "PermitRootLogin no\n"] - assert has_same_directives(a, b) - - -def test_has_same_directives_different_format(): - a = ["PermitRootLogin=no # comment\n"] - b = ["PermitRootLogin no\n"] - assert has_same_directives(a, b) - - -def test_has_same_directives_missing(): - a = ["PermitRootLogin no\n", "Port 22\n"] - b = ["PermitRootLogin no\n"] - assert not has_same_directives(a, b) - - -def test_has_same_directives_extra(): - a = ["PermitRootLogin no\n"] - b = ["PermitRootLogin no\n", "Port 22\n"] - assert not has_same_directives(a, b) - - -def test_is_drop_in_directive_space(): - assert is_drop_in_directive(f"Include {DROP_IN_DIR}*.conf") - - -def test_is_drop_in_directive_equal(): - assert is_drop_in_directive(f"Include={DROP_IN_DIR}*.conf") - - -def test_is_drop_in_directive_space_equal_space(): - assert is_drop_in_directive(f"Include = {DROP_IN_DIR}*.conf") - - -def test_is_drop_in_directive_case_insensitive(): - assert is_drop_in_directive(f"include {DROP_IN_DIR}*.conf") - - -def test_is_drop_in_directive_extra_files(): - assert is_drop_in_directive(f"Include {DROP_IN_DIR}*.conf /other/path") - - -def test_is_drop_in_directive_wrong_path(): - assert not is_drop_in_directive("Include /other/path/*.conf") - - -def test_is_drop_in_directive_no_separator(): - assert not is_drop_in_directive(f"Include{DROP_IN_DIR}*.conf") - - -def test_is_drop_in_directive_not_include(): - assert not is_drop_in_directive("permitrootlogin no") - - -def test_update_result_kept_kept(): - assert update_result(Result.KEPT, Result.KEPT) == Result.KEPT - - -def test_update_result_kept_repaired(): - assert update_result(Result.KEPT, Result.REPAIRED) == Result.REPAIRED - - -def test_update_result_kept_not_kept(): - assert update_result(Result.KEPT, Result.NOT_KEPT) == Result.NOT_KEPT - - -def test_update_result_repaired_kept(): - assert update_result(Result.REPAIRED, Result.KEPT) == Result.REPAIRED - - -def test_update_result_repaired_repaired(): - assert update_result(Result.REPAIRED, Result.REPAIRED) == Result.REPAIRED - - -def test_update_result_repaired_not_kept(): - assert update_result(Result.REPAIRED, Result.NOT_KEPT) == Result.NOT_KEPT - - -def test_update_result_not_kept_kept(): - assert update_result(Result.NOT_KEPT, Result.KEPT) == Result.NOT_KEPT - - -def test_update_result_not_kept_repaired(): - assert update_result(Result.NOT_KEPT, Result.REPAIRED) == Result.NOT_KEPT - - -def test_update_result_not_kept_not_kept(): - assert update_result(Result.NOT_KEPT, Result.NOT_KEPT) == Result.NOT_KEPT - - -if __name__ == "__main__": - SshdPromiseTypeModule().start() # pyright: ignore[reportUnknownMemberType] diff --git a/promise-types/sshd/sshd_promise_type.py b/promise-types/sshd/sshd_promise_type.py new file mode 100644 index 0000000..34a7c96 --- /dev/null +++ b/promise-types/sshd/sshd_promise_type.py @@ -0,0 +1,470 @@ +import os +import re +import sys +import subprocess +import tempfile + +# When run by CFEngine, the module library is installed. For local development +# and testing, fall back to the in-tree copy. +try: + from cfengine_module_library import PromiseModule, ValidationError, Result +except ImportError: + sys.path.append(os.path.join(os.path.dirname(__file__), "../../libraries/python")) + from cfengine_module_library import PromiseModule, ValidationError, Result + + +BASE_CONFIG = "/etc/ssh/sshd_config" +DROP_IN_DIR = "/etc/ssh/sshd_config.d/" +CFE_CONFIG = os.path.join(DROP_IN_DIR, "00-cfengine.conf") + + +# TODO: Add a "restart" attribute (default: True) to allow overriding the +# automatic restart of sshd after configuration changes. +# TODO: Add a "start" attribute (default: False) to optionally start the sshd +# service if it is not already running. +# TODO: Append the policy comment (e.g. "# Promised by CFEngine") as a trailing +# comment on each directive written to the drop-in file. + +REQUIRED_ATTRIBUTES = ("value",) # Add required attributes here +ACCEPTED_ATTRIBUTES = REQUIRED_ATTRIBUTES + () # Add optional attributes here + + +def sshd_quote(value: str) -> str: + """Quote a string for sshd_config. Values containing whitespace, '#', or + '\"' are wrapped in double quotes, with internal backslashes and double + quotes escaped.""" + if not value: + return '""' + if re.search(r'[\s#"]', value): + escaped = value.replace("\\", "\\\\").replace('"', '\\"') + return f'"{escaped}"' + return value + + +def to_sshd_value(value) -> str: + """Convert a Python value to an sshd config value. Lists are space-joined, + individual strings are quoted when necessary.""" + if isinstance(value, list): + return " ".join(sshd_quote(v) for v in value) + if isinstance(value, str): + return sshd_quote(value) + raise TypeError(f"Expected str or list[str], got {type(value).__name__}") + + +def try_unlink(path: str): + """Remove a file, ignoring errors if it no longer exists.""" + try: + os.unlink(path) + except OSError: + pass + + +def is_drop_in_directive(directive: str) -> bool: + """Check if a directive is an Include for the drop-in config directory.""" + m = re.match( + rf"include(\s+|\s*=\s*){re.escape(DROP_IN_DIR)}\*\.conf", + directive.strip(), + re.IGNORECASE, + ) + return m is not None + + +def update_result(old: str, new: str) -> str: + """Return the worst of two results. Severity: KEPT < REPAIRED < NOT_KEPT.""" + if old == Result.NOT_KEPT or new == Result.NOT_KEPT: + return Result.NOT_KEPT + if old == Result.REPAIRED or new == Result.REPAIRED: + return Result.REPAIRED + return Result.KEPT + + +def get_first_directive(lines: list[str]) -> str | None: + """Return the first non-comment, non-empty directive, or None if not found.""" + for line in lines: + stripped = line.strip() + if stripped and not stripped.startswith("#"): + return stripped + return None + + +class SshdPromiseTypeModule(PromiseModule): + def __init__(self): + super().__init__("sshd_promise_module", "0.0.0") + + def validate_promise( + self, promiser: str, attributes: dict[str, object], metadata: dict[str, str] + ): + # Check for unknown attributes + for attr in attributes: + if attr not in ACCEPTED_ATTRIBUTES: + raise ValidationError(f"Attribute '{attr}' is NOT accepted") + + # Check for any missing required attributes + for attr in REQUIRED_ATTRIBUTES: + if attr not in attributes: + raise ValidationError(f"Missing required attribute '{attr}'") + + # Check type of 'value' attributes + value = attributes.get("value") + if not isinstance(value, (str, list)): + raise ValidationError("Attribute 'value' must be a string or an slist") + + # Make sure 'value' attribute is not empty + if not value: + raise ValidationError("Attribute 'value' cannot be empty") + + def validate_config(self, filename: str) -> bool: + """Validate the sshd syntax on a file""" + r = subprocess.run( + ["/usr/sbin/sshd", "-t", "-f", filename], + capture_output=True, + text=True, + ) + if r.returncode != 0: + self.log_error(f"Configuration validation failed: {r.stderr.strip()}") + return False + return True + + def safe_write_config(self, path: str, lines: list[str]) -> bool: + """Atomically write config lines to a temporary file and replace the + target only if sshd validates the syntax successfully.""" + directory = os.path.dirname(path) + base = os.path.basename(path) + prefix, suffix = os.path.splitext(base) + + fd, tmp_path = tempfile.mkstemp(prefix=prefix, suffix=suffix, dir=directory) + try: + os.fchmod(fd, 0o600) # rw------- + with os.fdopen(fd, "w") as f: + f.writelines(lines) + f.flush() # Push data to kernel buffer so sshd can read it + success = self.validate_config(tmp_path) + if success: + os.replace(tmp_path, path) + return success + finally: + try_unlink(tmp_path) + + def ensure_include_directive(self) -> str: + """Ensure the base sshd config includes the drop-in directory.""" + try: + with open(BASE_CONFIG, "r") as f: + lines = f.readlines() + except FileNotFoundError: + self.log_error(f"Base configuration file '{BASE_CONFIG}' does not exist") + return Result.NOT_KEPT + + first_directive = get_first_directive(lines) + + if (first_directive is None) or (not is_drop_in_directive(first_directive)): + include_directive = f"Include {DROP_IN_DIR}*.conf" + self.log_debug( + f"Expected first directive in '{BASE_CONFIG}' to be '{include_directive}'" + ) + + lines.insert(0, f"{include_directive} # Added by CFEngine\n") + try: + if not self.safe_write_config(BASE_CONFIG, lines): + # Error already logged + return Result.NOT_KEPT + except Exception as e: + self.log_error(f"Failed to write '{BASE_CONFIG}': {e}") + return Result.NOT_KEPT + + self.log_info(f"Added include directive to '{BASE_CONFIG}'") + return Result.REPAIRED + + return Result.KEPT + + def ensure_drop_in_dir(self) -> str: + """Ensure the drop-in config directory exists.""" + if os.path.isdir(DROP_IN_DIR): + return Result.KEPT + + try: + os.makedirs(DROP_IN_DIR, mode=0o755) # rwxr-xr-x + except Exception as e: + self.log_error(f"Failed to create drop-in directory '{DROP_IN_DIR}': {e}") + return Result.NOT_KEPT + + self.log_info(f"Created drop-in directory '{DROP_IN_DIR}'") + return Result.REPAIRED + + def ensure_drop_in_config(self, keyword: str, value: str | list[str]) -> str: + """Write the CFEngine drop-in config file with the given attribute""" + + try: + with open(CFE_CONFIG, "r") as f: + lines = f.readlines() + except FileNotFoundError: + lines = [] + + # Remove conflicting directives + lines = [line for line in lines if not line.lower().startswith(keyword.lower())] + + # Remove the disclaimer so that we can put it back on top + lines = [line for line in lines if not line.startswith("# ")] + + # Add the promised directive to the top (just after the disclaimer) + disclaimer = ["# Managed by CFEngine\n", "# Do NOT manually edit this file\n"] + lines = disclaimer + [f"{keyword} {to_sshd_value(value)}\n"] + lines + + try: + if not self.safe_write_config(CFE_CONFIG, lines): + return Result.NOT_KEPT + except Exception as e: + self.log_error(f"Failed to write drop-in config '{CFE_CONFIG}': {e}") + return Result.NOT_KEPT + + self.log_info(f"Updated drop-in config '{CFE_CONFIG}'") + return Result.REPAIRED + + def restart_sshd(self) -> str: + """Restart the sshd service if it is currently running.""" + r = subprocess.run( + ["systemctl", "is-active", "--quiet", "sshd"], + ) + if r.returncode != 0: + # If sshd is not running, do nothing + self.log_debug("The service sshd is not running") + return Result.KEPT + + r = subprocess.run( + ["systemctl", "restart", "--quiet", "sshd"], + ) + if r.returncode != 0: + self.log_error("Failed to restart sshd service") + return Result.NOT_KEPT + + self.log_info("Restarted sshd service") + return Result.REPAIRED + + def effective_config_has_directive( + self, keyword: str, values: str | list[str] + ) -> bool: + """Check if the running sshd effective configuration (via sshd -T) + contains the given directive(s) for a keyword.""" + r = subprocess.run( + ["/usr/sbin/sshd", "-T"], + capture_output=True, + text=True, + ) + if r.returncode != 0: + self.log_error("Failed to get effective sshd config") + return False + effective = r.stdout.strip().splitlines() + + # sshd -T splits multi-argument keywords into separate + # lines (e.g. "AllowUsers user1 user2" becomes two lines: + # "allowusers user1" and "allowusers user2"), so we must + # expand list values into individual directives to match + # the effective config format for set comparison. + + for value in values if isinstance(values, list) else [values]: + assert isinstance(value, str) + directive = f"{keyword.lower()} {to_sshd_value(value)}" + if directive in effective: + self.log_debug( + f"Directive '{directive}' is present in effective sshd config" + ) + else: + self.log_debug( + f"Directive '{directive}' is NOT present in effective sshd config" + ) + return False + + return True + + def verify_effective_config(self, keyword: str, values: str | list[str]) -> str: + """Verify the effective sshd config contains the expected directive, + returning KEPT on success or NOT_KEPT on failure.""" + if self.effective_config_has_directive(keyword, values): + self.log_verbose("Successfully verified effective sshd config") + return Result.KEPT + + self.log_error("Failed to verify effective sshd config") + return Result.NOT_KEPT + + def evaluate_promise( + self, promiser: str, attributes: dict[str, object], metadata: dict[str, str] + ) -> str: + assert "value" in attributes, "expected 'value' in attributes" + value = attributes["value"] + assert isinstance(value, (str, list)), "expected type str or list" + assert value, "expected non-empty str or list" + + # Check if the effective config already has the desired state + if self.effective_config_has_directive(promiser, value): + return Result.KEPT + + # Ensure the base config includes the drop-in directory + result = update_result(Result.KEPT, self.ensure_include_directive()) + + # Ensure the drop-in directory exists + result = update_result(result, self.ensure_drop_in_dir()) + + # Ensure the drop-in config file contains the desired directive + result = update_result(result, self.ensure_drop_in_config(promiser, value)) + + # Restart sshd only if configuration was changed + if result == Result.REPAIRED: + result = update_result(result, self.restart_sshd()) + + # Verify the effective config matches the desired state + result = update_result(result, self.verify_effective_config(promiser, value)) + + return result + + +def test_sshd_quote_simple(): + assert sshd_quote("no") == "no" + + +def test_sshd_quote_empty(): + assert sshd_quote("") == '""' + + +def test_sshd_quote_space(): + assert sshd_quote("some value") == '"some value"' + + +def test_sshd_quote_tab(): + assert sshd_quote("some\tvalue") == '"some\tvalue"' + + +def test_sshd_quote_hash(): + assert sshd_quote("before#after") == '"before#after"' + + +def test_sshd_quote_double_quote(): + assert sshd_quote('say "hello"') == '"say \\"hello\\""' + + +def test_sshd_quote_backslash(): + assert sshd_quote("path\\to") == "path\\to" + + +def test_sshd_quote_backslash_and_space(): + assert sshd_quote("path\\to dir") == '"path\\\\to dir"' + + +def test_to_sshd_value_str(): + assert to_sshd_value("no") == "no" + + +def test_to_sshd_value_str_with_spaces(): + assert to_sshd_value("some value") == '"some value"' + + +def test_to_sshd_value_list(): + assert to_sshd_value(["user1", "user2"]) == "user1 user2" + + +def test_to_sshd_value_list_with_quoting(): + assert to_sshd_value(["user1", "user 2"]) == 'user1 "user 2"' + + +def test_get_first_directive(): + lines = ["# comment\n", "PermitRootLogin no\n", "Port 22\n"] + assert get_first_directive(lines) == "PermitRootLogin no" + + +def test_get_first_directive_no_comments(): + lines = ["PermitRootLogin no\n", "Port 22\n"] + assert get_first_directive(lines) == "PermitRootLogin no" + + +def test_get_first_directive_all_comments(): + lines = ["# comment\n", "# another comment\n"] + assert get_first_directive(lines) is None + + +def test_get_first_directive_empty(): + assert get_first_directive([]) is None + + +def test_get_first_directive_extra_whitespace(): + lines = ["# comment\n", "PermitRootLogin no\n"] + assert get_first_directive(lines) == "PermitRootLogin no" + + +def test_get_first_directive_equal_sign(): + lines = ["# comment\n", "PermitRootLogin=no\n"] + assert get_first_directive(lines) == "PermitRootLogin=no" + + +def test_get_first_directive_blank_lines(): + lines = ["\n", " \n", "# comment\n", "Port 22\n"] + assert get_first_directive(lines) == "Port 22" + + +def test_is_drop_in_directive_space(): + assert is_drop_in_directive(f"Include {DROP_IN_DIR}*.conf") + + +def test_is_drop_in_directive_equal(): + assert is_drop_in_directive(f"Include={DROP_IN_DIR}*.conf") + + +def test_is_drop_in_directive_space_equal_space(): + assert is_drop_in_directive(f"Include = {DROP_IN_DIR}*.conf") + + +def test_is_drop_in_directive_case_insensitive(): + assert is_drop_in_directive(f"include {DROP_IN_DIR}*.conf") + + +def test_is_drop_in_directive_extra_files(): + assert is_drop_in_directive(f"Include {DROP_IN_DIR}*.conf /other/path") + + +def test_is_drop_in_directive_wrong_path(): + assert not is_drop_in_directive("Include /other/path/*.conf") + + +def test_is_drop_in_directive_no_separator(): + assert not is_drop_in_directive(f"Include{DROP_IN_DIR}*.conf") + + +def test_is_drop_in_directive_not_include(): + assert not is_drop_in_directive("permitrootlogin no") + + +def test_update_result_kept_kept(): + assert update_result(Result.KEPT, Result.KEPT) == Result.KEPT + + +def test_update_result_kept_repaired(): + assert update_result(Result.KEPT, Result.REPAIRED) == Result.REPAIRED + + +def test_update_result_kept_not_kept(): + assert update_result(Result.KEPT, Result.NOT_KEPT) == Result.NOT_KEPT + + +def test_update_result_repaired_kept(): + assert update_result(Result.REPAIRED, Result.KEPT) == Result.REPAIRED + + +def test_update_result_repaired_repaired(): + assert update_result(Result.REPAIRED, Result.REPAIRED) == Result.REPAIRED + + +def test_update_result_repaired_not_kept(): + assert update_result(Result.REPAIRED, Result.NOT_KEPT) == Result.NOT_KEPT + + +def test_update_result_not_kept_kept(): + assert update_result(Result.NOT_KEPT, Result.KEPT) == Result.NOT_KEPT + + +def test_update_result_not_kept_repaired(): + assert update_result(Result.NOT_KEPT, Result.REPAIRED) == Result.NOT_KEPT + + +def test_update_result_not_kept_not_kept(): + assert update_result(Result.NOT_KEPT, Result.NOT_KEPT) == Result.NOT_KEPT + + +if __name__ == "__main__": + SshdPromiseTypeModule().start() From f024caf6014412869e9ea247e7119d7250f86887 Mon Sep 17 00:00:00 2001 From: Lars Erik Wik Date: Tue, 24 Mar 2026 19:07:03 +0100 Subject: [PATCH 4/4] Added note about conflicting sshd promises Ticket: ENT-13797 Signed-off-by: Lars Erik Wik --- promise-types/sshd/README.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/promise-types/sshd/README.md b/promise-types/sshd/README.md index 841d3ae..c7eac1b 100644 --- a/promise-types/sshd/README.md +++ b/promise-types/sshd/README.md @@ -16,6 +16,12 @@ Each promise manages a single directive in the drop-in config file. 4. **Service restart** — restarts sshd if configuration was changed and the service is already running 5. **Verification** — verifies the desired directive appears in the effective sshd config (`sshd -T`) +## Conflicting promisers +Having multiple promises with the same sshd keyword is not recommended. +In case of conflicting promisers, the agent will attempt to converge the correct state for each one in the order they are evaluated. +This means the last promise wins and determines the final value in the configuration file. +It will also cause multiple restarts of the sshd service, which may be disruptive. + ## What the module does NOT do - Install sshd — that is a `packages:` promise - Ensure sshd is running — that is a `services:` promise