From 1cdb6472fa3efc23b7e3dffa9047d789b51c45c4 Mon Sep 17 00:00:00 2001 From: Micke Nordin Date: Fri, 20 Dec 2024 10:35:06 +0100 Subject: [PATCH] Add support for listing zones and userinfo --- knotctl/__init__.py | 219 ++++++++++++++++++++++++++++---------------- 1 file changed, 139 insertions(+), 80 deletions(-) diff --git a/knotctl/__init__.py b/knotctl/__init__.py index 3757e10..4ab4779 100755 --- a/knotctl/__init__.py +++ b/knotctl/__init__.py @@ -7,7 +7,7 @@ import os import sys import urllib.parse from os import environ, mkdir -from os.path import isdir, isfile, islink, join, split +from os.path import isdir, isfile, join from typing import Union from urllib.parse import urlparse @@ -36,6 +36,9 @@ def error(description: str, error: str) -> list[dict]: def get_config(config_filename: str): + if not isfile(config_filename): + print("You need to configure knotctl before proceeding") + run_config(config_filename) with open(config_filename, "r") as fh: return yaml.safe_load(fh.read()) @@ -46,7 +49,8 @@ def nested_out(input, tabs="") -> str: string += "{}\n".format(input) elif isinstance(input, dict): for key, value in input.items(): - string += "{}{}: {}".format(tabs, key, nested_out(value, tabs + " ")) + string += "{}{}: {}".format(tabs, key, + nested_out(value, tabs + " ")) elif isinstance(input, list): for entry in input: string += "{}\n{}".format(tabs, nested_out(entry, tabs + " ")) @@ -70,11 +74,9 @@ def run_add(url: str, jsonout: bool, headers: dict): out = response.json() if isinstance(out, list): for record in out: - if ( - record["data"] == parsed["data"] - and record["name"] == parsed["name"] - and record["rtype"] == parsed["rtype"] - ): + if (record["data"] == parsed["data"] + and record["name"] == parsed["name"] + and record["rtype"] == parsed["rtype"]): output(record, jsonout) break else: @@ -94,29 +96,28 @@ def run_log(url: str, jsonout: bool, headers: dict): line = lines[index] index += 1 cur_has_timestamp = line.startswith("[") - next_has_timestamp = index < len(lines) and lines[index].startswith( - "[" - ) + next_has_timestamp = index < len( + lines) and lines[index].startswith("[") # Simple case, just one line with timestamp if cur_has_timestamp and next_has_timestamp: - timestamp = line.split(']')[0].split('[')[1] - text = line.split(']')[1].lstrip(':').strip() - out.append({'timestamp': timestamp, 'text': text}) + timestamp = line.split("]")[0].split("[")[1] + text = line.split("]")[1].lstrip(":").strip() + out.append({"timestamp": timestamp, "text": text}) text = "" timestamp = "" # Start of multiline elif cur_has_timestamp: - timestamp = line.split(']')[0].split('[')[1] - text = line.split(']')[1].lstrip(':').strip() + timestamp = line.split("]")[0].split("[")[1] + text = line.split("]")[1].lstrip(":").strip() # End of multiline elif next_has_timestamp: - text += f'\n{line.strip()}' - out.append({'timestamp': timestamp, 'text': text}) + text += f"\n{line.strip()}" + out.append({"timestamp": timestamp, "text": text}) text = "" timestamp = "" # Middle of multiline else: - text += f'\n{line.strip()}' + text += f"\n{line.strip()}" else: out = string @@ -144,7 +145,7 @@ def run_config( if current: if os.path.islink(config_filename): actual_path = os.readlink(config_filename) - print(actual_path.split('-')[-1]) + print(actual_path.split("-")[-1]) else: print("none") return @@ -171,8 +172,7 @@ def run_config( error( "Can not configure without {}".format(need), "No {}".format(need), - ) - ) + )) sys.exit(1) config[need] = input("Enter {}: ".format(need)) @@ -196,7 +196,10 @@ def run_delete(url: str, jsonout: bool, headers: dict): output(reply, jsonout) -def run_list(url: str, jsonout: bool, headers: dict, ret=False) -> Union[None, str]: +def run_list(url: str, + jsonout: bool, + headers: dict, + ret=False) -> Union[None, str]: response = requests.get(url, headers=headers) string = response.json() if ret: @@ -210,6 +213,22 @@ def run_update(url: str, jsonout: bool, headers: dict): output(response.json(), jsonout) +def run_zone(url: str, + jsonout: bool, + headers: dict, + ret=False) -> Union[None, str]: + response = requests.get(url, headers=headers) + zones = response.json() + for zone in zones: + del zone["records"] + string = zones + + if ret: + return string + else: + output(string, jsonout) + + # Set up the url def setup_url( baseurl: str, @@ -248,16 +267,14 @@ def setup_url( error( "ttl only makes sense with rtype, name and zone", "Missing parameter", - ) - ) + )) sys.exit(1) if rtype and (not name or not zone): output( error( "rtype only makes sense with name and zone", "Missing parameter", - ) - ) + )) sys.exit(1) if name and not zone: output(error("name only makes sense with a zone", "Missing parameter")) @@ -297,8 +314,7 @@ def split_url(url: str) -> dict: } -# Entry point to program -def main() -> int: +def get_parser() -> dict: description = """Manage DNS records with knot dns rest api: * https://gitlab.nic.cz/knot/knot-dns-rest""" @@ -366,18 +382,22 @@ def main() -> int: subparsers.add_parser("auditlog", description=auditlog_description) changelog_description = "View the changelog of a zone." - changelogcmd = subparsers.add_parser("changelog", description=changelog_description) + changelogcmd = subparsers.add_parser("changelog", + description=changelog_description) changelogcmd.add_argument("-z", "--zone", required=True) complete_description = "Generate shell completion script." - completecmd = subparsers.add_parser("completion", description=complete_description) + completecmd = subparsers.add_parser("completion", + description=complete_description) completecmd.add_argument("-s", "--shell") config_description = "Configure access to knot-dns-rest-api." configcmd = subparsers.add_parser("config", description=config_description) configcmd.add_argument("-b", "--baseurl") configcmd.add_argument("-c", "--context") - configcmd.add_argument("-C", "--current", action=argparse.BooleanOptionalAction) + configcmd.add_argument("-C", + "--current", + action=argparse.BooleanOptionalAction) configcmd.add_argument("-p", "--password") configcmd.add_argument("-u", "--username") @@ -395,15 +415,16 @@ def main() -> int: listcmd.add_argument("-r", "--rtype") listcmd.add_argument("-z", "--zone", required=True) + user_description = "View user information." + usercmd = subparsers.add_parser("user", description=user_description) + usercmd.add_argument("-u", "--username", default=None) + update_description = ( - "Update a record in the zone. The record must exist in the zone.\n" - ) + "Update a record in the zone. The record must exist in the zone.\n") update_description += ( - "In this case --data, --name, --rtype and --ttl switches are used\n" - ) + "In this case --data, --name, --rtype and --ttl switches are used\n") update_description += ( - "for searching for the appropriate record, while the --argument\n" - ) + "for searching for the appropriate record, while the --argument\n") update_description += "switches are used for updating the record." update_epilog = """Available arguments are: data: New record data. @@ -421,7 +442,8 @@ def main() -> int: "--argument", action="append", metavar="KEY=VALUE", - help="Specify key - value pairs to be updated: name=dns1.example.com. or data=127.0.0.1 for example. --argument can be repeated", + help="Specify key - value pairs to be updated: name=dns1.example.com." + + " or data=127.0.0.1 for example. --argument can be repeated", required=True, ) updatecmd.add_argument("-d", "--data", required=True) @@ -430,7 +452,71 @@ def main() -> int: updatecmd.add_argument("-t", "--ttl") updatecmd.add_argument("-z", "--zone", required=True) + zone_description = "View zone information." + zonecmd = subparsers.add_parser("zone", description=zone_description) + zonecmd.add_argument("-z", "--zone", default=None) + argcomplete.autocomplete(parser) + + return parser + + +def get_token(config) -> str: + # Authenticate + baseurl = config["baseurl"] + username = config["username"] + password = config["password"] + basic = HTTPBasicAuth(username, password) + response = requests.get(baseurl + "/user/login", auth=basic) + token = "" + try: + token = response.json()["token"] + except KeyError: + output(response.json()) + except requests.exceptions.JSONDecodeError: + output( + error("Could not decode api response as JSON", "Could not decode")) + return token + + +def run(url, args, headers, baseurl, parser, username): + try: + if args.command == "add": + run_add(url, args.json, headers) + elif args.command == "delete": + run_delete(url, args.json, headers) + elif args.command == "list": + run_list(url, args.json, headers) + elif args.command == "update": + run_update(url, args.json, headers) + elif args.command == "user": + url = baseurl + f"/user/info/{username}" + run_list(url, args.json, headers) + elif args.command == "auditlog": + url = baseurl + "/user/auditlog" + run_log(url, args.json, headers) + elif args.command == "changelog": + url = baseurl + f"/zones/changelog/{args.zone.rstrip('.')}" + run_log(url, args.json, headers) + elif args.command == "zone": + url = baseurl + "/zones" + if args.zone: + url = url + f"/{args.zone.rstrip('.')}" + run_zone(url, args.json, headers) + else: + parser.print_help(sys.stderr) + return 2 + except requests.exceptions.RequestException as e: + output(error(e, "Could not connect to server")) + except (RequestsJSONDecodeError, SimplejsonJSONDecodeError): + output( + error("Could not decode api response as JSON", "Could not decode")) + return 0 + + +# Entry point to program +def main() -> int: + parser = get_parser() args = parser.parse_args() if args.command == "completion": run_complete(args.shell) @@ -445,35 +531,27 @@ def main() -> int: if args.command == "config": run_config( - config_filename, args.context, args.baseurl, args.username, args.password, args.current + config_filename, + args.context, + args.baseurl, + args.username, + args.password, + args.current, ) return 0 - if not isfile(config_filename): - print("You need to configure knotctl before proceeding") - run_config(config_filename) - config = get_config(config_filename) baseurl = config["baseurl"] - username = config["username"] - password = config["password"] - - # Authenticate - basic = HTTPBasicAuth(username, password) - response = requests.get(baseurl + "/user/login", auth=basic) - try: - token = response.json()["token"] - except KeyError: - output(response.json()) - return 1 - except requests.exceptions.JSONDecodeError: - output(error("Could not decode api response as JSON", "Could not decode")) + token = get_token(config) + if token == "": + print("Could not get token, exiting") return 1 headers = {"Authorization": "Bearer {}".format(token)} # Route based on command url = "" ttl = None + user = config["username"] if "ttl" in args: ttl = args.ttl if args.command != "update": @@ -486,7 +564,10 @@ def main() -> int: soa_url = setup_url(baseurl, None, None, zname, "SOA", None, args.zone) soa_json = run_list(soa_url, True, headers, ret=True) ttl = soa_json[0]["ttl"] - if args.command in ["auditlog", "changelog"]: + if args.command == "user": + if args.username: + user = args.username + if args.command in ["auditlog", "changelog", "user", "zone"]: pass else: try: @@ -503,29 +584,7 @@ def main() -> int: parser.print_help(sys.stderr) return 1 - try: - if args.command == "add": - run_add(url, args.json, headers) - elif args.command == "delete": - run_delete(url, args.json, headers) - elif args.command == "list": - run_list(url, args.json, headers) - elif args.command == "update": - run_update(url, args.json, headers) - elif args.command == "auditlog": - url = baseurl + "/user/auditlog" - run_log(url, args.json, headers) - elif args.command == "changelog": - url = baseurl + f"/zones/changelog/{args.zone.rstrip('.')}" - run_log(url, args.json, headers) - else: - parser.print_help(sys.stderr) - return 2 - except requests.exceptions.RequestException as e: - output(error(e, "Could not connect to server")) - except (RequestsJSONDecodeError, SimplejsonJSONDecodeError): - output(error("Could not decode api response as JSON", "Could not decode")) - return 0 + return run(url, args, headers, baseurl, parser, user) if __name__ == "__main__":