diff --git a/CHANGELOG b/CHANGELOG new file mode 100644 index 0000000..e69de29 diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..5874837 --- /dev/null +++ b/Makefile @@ -0,0 +1,13 @@ +.PHONY: publish +publish: + flit publish + +.PHONY: deb +deb: + briefcase update linux system --target debian:testing + briefcase build linux system --target debian:testing + briefcase package linux system --target debian:testing + +.PHONY: clean +clean: + rm -rf build dist diff --git a/knotctl/__init__.py b/knotctl/__init__.py deleted file mode 100755 index a768fd2..0000000 --- a/knotctl/__init__.py +++ /dev/null @@ -1,737 +0,0 @@ -#!/usr/bin/env python3 - -import argparse -import getpass -import json -import os -import sys -import urllib.parse -from os import environ, mkdir -from os.path import isdir, isfile, join -from typing import Union -from urllib.parse import urlparse - -import argcomplete -import openstack -import openstack.config.loader -import requests -import yaml -from requests.models import HTTPBasicAuth -from simplejson.errors import JSONDecodeError as SimplejsonJSONDecodeError - -try: - from requests.exceptions import JSONDecodeError as RequestsJSONDecodeError -except ImportError: - from requests.exceptions import InvalidJSONError as RequestsJSONDecodeError - - -# Helper functions -def error(description: str, error: str) -> list[dict]: - response = [] - reply = {} - # https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/406 - reply["Code"] = 406 - reply["Description"] = description - reply["Error"] = error - response.append(reply) - return response - - -def get_config(config_filename: str): - if not isfile(config_filename): - print("You need to configure knotctl before proceeding") - run_config(config_filename) - with open(config_filename, "r") as fh: - return yaml.safe_load(fh.read()) - - -def get_openstack_addresses(cloud: str, name: str): - conn = openstack.connect(cloud=cloud) - - # List the servers - server = conn.compute.find_server(name) - if server is None: - print("Server not found") - exit(1) - openstack_addresses = [] - for network in server.addresses: - for address in server.addresses[network]: - openstack_addresses.append(address) - return openstack_addresses - - -def nested_out(input, tabs="") -> str: - string = "" - if isinstance(input, str) or isinstance(input, int): - string += "{}\n".format(input) - elif isinstance(input, dict): - for key, value in input.items(): - string += "{}{}: {}".format(tabs, key, - nested_out(value, tabs + " ")) - elif isinstance(input, list): - for entry in input: - string += "{}\n{}".format(tabs, nested_out(entry, tabs + " ")) - return string - - -def output(response: list[dict], jsonout: bool = False): - try: - if jsonout: - print(json.dumps(response)) - else: - print(nested_out(response)) - except BrokenPipeError: - pass - - -# Define the runner for each command -def run_add(url: str, jsonout: bool, headers: dict): - parsed = split_url(url) - response = requests.put(url, headers=headers) - out = response.json() - if isinstance(out, list): - for record in out: - if (record["data"] == parsed["data"] - and record["name"] == parsed["name"] - and record["rtype"] == parsed["rtype"]): - output(record, jsonout) - break - else: - output(out, jsonout) - - -def run_log(url: str, jsonout: bool, headers: dict): - response = requests.get(url, headers=headers) - string = response.content.decode("utf-8") - if jsonout: - out = [] - lines = string.splitlines() - index = 0 - text = "" - timestamp = "" - while index < len(lines): - line = lines[index] - index += 1 - cur_has_timestamp = line.startswith("[") - next_has_timestamp = index < len( - lines) and lines[index].startswith("[") - # Simple case, just one line with timestamp - if cur_has_timestamp and next_has_timestamp: - timestamp = line.split("]")[0].split("[")[1] - text = line.split("]")[1].lstrip(":").strip() - out.append({"timestamp": timestamp, "text": text}) - text = "" - timestamp = "" - # Start of multiline - elif cur_has_timestamp: - timestamp = line.split("]")[0].split("[")[1] - text = line.split("]")[1].lstrip(":").strip() - # End of multiline - elif next_has_timestamp: - text += f"\n{line.strip()}" - out.append({"timestamp": timestamp, "text": text}) - text = "" - timestamp = "" - # Middle of multiline - else: - text += f"\n{line.strip()}" - - else: - out = string - - output(out, jsonout) - - -def run_complete(shell: Union[None, str]): - if not shell or shell in ["bash", "zsh"]: - os.system("register-python-argcomplete knotctl") - elif shell == "fish": - os.system("register-python-argcomplete --shell fish knotctl") - elif shell == "tcsh": - os.system("register-python-argcomplete --shell tcsh knotctl") - - -def run_config( - config_filename: str, - context: Union[None, str] = None, - baseurl: Union[None, str] = None, - list_config: bool = False, - username: Union[None, str] = None, - password: Union[None, str] = None, - current: Union[None, str] = None, -): - if current: - if os.path.islink(config_filename): - actual_path = os.readlink(config_filename) - print(actual_path.split("-")[-1]) - else: - print("none") - return - config = {"baseurl": baseurl, "username": username, "password": password} - needed = [] - if context: - symlink = f"{config_filename}-{context}" - found = os.path.isfile(symlink) - if os.path.islink(config_filename): - os.remove(config_filename) - elif os.path.isfile(config_filename): - os.rename(config_filename, symlink) - os.symlink(symlink, config_filename) - config_filename = symlink - if found: - return - if list_config: - config_data = get_config(config_filename) - config_data.pop("password", None) - output(config_data) - return - if not baseurl: - needed.append("baseurl") - if not username: - needed.append("username") - for need in needed: - if need == "": - output( - error( - "Can not configure without {}".format(need), - "No {}".format(need), - )) - sys.exit(1) - config[need] = input("Enter {}: ".format(need)) - - if not password: - try: - config["password"] = getpass.getpass() - except EOFError: - output(error("Can not configure without password", "No password")) - sys.exit(1) - - with open(config_filename, "w") as fh: - fh.write(yaml.dump(config)) - - -def run_delete(url: str, jsonout: bool, headers: dict): - response = requests.delete(url, headers=headers) - reply = response.json() - if not reply and response.status_code == requests.codes.ok: - reply = [{"Code": 200, "Description": "{} deleted".format(url)}] - - output(reply, jsonout) - - -def run_list(url: str, - jsonout: bool, - headers: dict, - ret=False) -> Union[None, str]: - response = requests.get(url, headers=headers) - string = response.json() - if ret: - return string - else: - output(string, jsonout) - - -def run_openstack_sync(cloud: str, name: str, zone: str, headers: dict, - baseurl: str, jsonout: bool): - url = setup_url( - baseurl, - None, # arguments, - None, # data, - name, - None, # rtype, - None, # ttl, - zone, - ) - current_records = run_list(url, jsonout=True, headers=headers, ret=True) - openstack_addresses = get_openstack_addresses(cloud, name) - if current_records["Code"] == 404: - for address in openstack_addresses: - rtype = None - if address["version"] == 4: - rtype = "A" - elif address["version"] == 6: - rtype = "AAAA" - if rtype: - url = setup_url( - baseurl, - None, # arguments, - address["addr"], # data, - name, - rtype, - None, # ttl, - zone, - ) - run_add(url, jsonout, headers) - else: - previpv4 = False - previpv6 = False - curripv4 = False - curripv6 = False - for record in current_records: - if record.type == "A": - previpv4 = record.data - elif record.type == "AAAA": - previpv6 = record.data - for address in openstack_addresses: - rtype = None - if address.version == 4: - rtype = "A" - curripv4 = True - elif address.version == 6: - rtype = "AAAA" - curripv6 = True - if rtype and recor.type == rtype: - if record.data == address.addr: - continue - else: - url = setup_url( - baseurl, - None, # arguments, - address.addr, # data, - name, - record.type, - None, # ttl, - zone, - ) - run_update(url, jsonout, headers) - if previpv4 and not curripv4: - url = setup_url( - baseurl, - None, # arguments, - previpv4, # data, - name, - "A", - None, # ttl, - zone, - ) - run_delete(url, jsonout, headers) - if previpv6 and not curripv6: - url = setup_url( - baseurl, - None, # arguments, - previpv6, # data, - name, - "AAAA", - None, # ttl, - zone, - ) - run_delete(url, jsonout, headers) - if curripv4 and not previpv4: - url = setup_url( - baseurl, - None, # arguments, - curripv4, # data, - name, - "A", - None, # ttl, - zone, - ) - run_add(url, jsonout, headers) - if curripv6 and not previpv6: - url = setup_url( - baseurl, - None, # arguments, - curripv6, # data, - name, - "AAAA", - None, # ttl, - zone, - ) - run_add(url, jsonout, headers) - - -def run_update(url: str, jsonout: bool, headers: dict): - response = requests.patch(url, headers=headers) - output(response.json(), jsonout) - - -def run_zone(url: str, - jsonout: bool, - headers: dict, - ret=False) -> Union[None, str]: - response = requests.get(url, headers=headers) - zones = response.json() - for zone in zones: - del zone["records"] - string = zones - - if ret: - return string - else: - output(string, jsonout) - - -# Set up the url -def setup_url( - baseurl: str, - arguments: Union[None, list[str]], - data: Union[None, str], - name: Union[None, str], - rtype: Union[None, str], - ttl: Union[None, str], - zone: Union[None, str], -) -> str: - url = baseurl + "/zones" - if zone: - if not zone.endswith("."): - zone += "." - url += "/{}".format(zone) - if name and zone: - if name.endswith(zone.rstrip(".")): - name += "." - url += "/records/{}".format(name) - if zone and name and rtype: - url += "/{}".format(rtype) - if data and zone and name and rtype: - url += "/{}".format(data) - if ttl and data and zone and name and rtype: - url += "/{}".format(ttl) - if data and zone and name and rtype and arguments: - url += "?" - for arg in arguments: - if not url.endswith("?"): - url += "&" - key, value = arg.split("=") - url += key + "=" + urllib.parse.quote_plus(value) - - if ttl and (not rtype or not name or not zone): - output( - error( - "ttl only makes sense with rtype, name and zone", - "Missing parameter", - )) - sys.exit(1) - if rtype and (not name or not zone): - output( - error( - "rtype only makes sense with name and zone", - "Missing parameter", - )) - sys.exit(1) - if name and not zone: - output(error("name only makes sense with a zone", "Missing parameter")) - sys.exit(1) - return url - - -def split_url(url: str) -> dict: - parsed = urlparse(url, allow_fragments=False) - path = parsed.path - query = parsed.query - arguments: Union[None, list[str]] = query.split("&") - path_arr = path.split("/") - data: Union[None, str] = None - name: Union[None, str] = None - rtype: Union[None, str] = None - ttl: Union[None, str] = None - zone: Union[None, str] = None - if len(path_arr) > 2: - zone = path_arr[2] - if len(path_arr) > 4: - name = path_arr[4] - if len(path_arr) > 5: - rtype = path_arr[5] - if len(path_arr) > 6: - data = path_arr[6] - if len(path_arr) > 7: - ttl = path_arr[7] - - return { - "arguments": arguments, - "data": data, - "name": name, - "rtype": rtype, - "ttl": ttl, - "zone": zone, - } - - -def get_parser() -> dict: - description = """Manage DNS records with knot dns rest api: - * https://gitlab.nic.cz/knot/knot-dns-rest""" - - epilog = """ - The Domain Name System specifies a database of information - elements for network resources. The types of information - elements are categorized and organized with a list of DNS - record types, the resource records (RRs). Each record has a - name, a type, an expiration time (time to live), and - type-specific data. - - The following is a list of terms used in this program: - ---------------------------------------------------------------- - | Vocabulary | Description | - ---------------------------------------------------------------- - | zone | A DNS zone is a specific portion of the DNS | - | | namespace in the Domain Name System (DNS), | - | | which a specific organization or administrator | - | | manages. | - ---------------------------------------------------------------- - | name | In the Internet, a domain name is a string that | - | | identifies a realm of administrative autonomy, | - | | authority or control. Domain names are often | - | | used to identify services provided through the | - | | Internet, such as websites, email services and | - | | more. | - ---------------------------------------------------------------- - | rtype | A record type indicates the format of the data | - | | and it gives a hint of its intended use. For | - | | example, the A record is used to translate from | - | | a domain name to an IPv4 address, the NS record | - | | lists which name servers can answer lookups on | - | | a DNS zone, and the MX record specifies the | - | | mail server used to handle mail for a domain | - | | specified in an e-mail address. | - ---------------------------------------------------------------- - | data | A records data is of type-specific relevance, | - | | such as the IP address for address records, or | - | | the priority and hostname for MX records. | - ---------------------------------------------------------------- - - This information was compiled from Wikipedia: - * https://en.wikipedia.org/wiki/DNS_zone - * https://en.wikipedia.org/wiki/Domain_Name_System - * https://en.wikipedia.org/wiki/Zone_file - """ - # Grab user input - parser = argparse.ArgumentParser( - description=description, - epilog=epilog, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - parser.add_argument("--json", action=argparse.BooleanOptionalAction) - subparsers = parser.add_subparsers(dest="command") - - add_description = "Add a new record to the zone." - addcmd = subparsers.add_parser("add", description=add_description) - addcmd.add_argument("-d", "--data", required=True) - addcmd.add_argument("-n", "--name", required=True) - addcmd.add_argument("-r", "--rtype", required=True) - addcmd.add_argument("-t", "--ttl") - addcmd.add_argument("-z", "--zone", required=True) - - auditlog_description = "Audit the log file for errors." - subparsers.add_parser("auditlog", description=auditlog_description) - - changelog_description = "View the changelog of a zone." - changelogcmd = subparsers.add_parser("changelog", - description=changelog_description) - changelogcmd.add_argument("-z", "--zone", required=True) - - complete_description = "Generate shell completion script." - completecmd = subparsers.add_parser("completion", - description=complete_description) - completecmd.add_argument("-s", "--shell") - - config_description = "Configure access to knot-dns-rest-api." - configcmd = subparsers.add_parser("config", description=config_description) - configcmd.add_argument("-b", "--baseurl") - configcmd.add_argument("-c", "--context") - configcmd.add_argument("-C", - "--current", - action=argparse.BooleanOptionalAction) - configcmd.add_argument("-l", - "--list", - action=argparse.BooleanOptionalAction, - dest="list_config") - configcmd.add_argument("-p", "--password") - configcmd.add_argument("-u", "--username") - - delete_description = "Delete a record from the zone." - deletecmd = subparsers.add_parser("delete", description=delete_description) - deletecmd.add_argument("-d", "--data") - deletecmd.add_argument("-n", "--name") - deletecmd.add_argument("-r", "--rtype") - deletecmd.add_argument("-z", "--zone", required=True) - - list_description = "List records." - listcmd = subparsers.add_parser("list", description=list_description) - listcmd.add_argument("-d", "--data") - listcmd.add_argument("-n", "--name") - listcmd.add_argument("-r", "--rtype") - listcmd.add_argument("-z", "--zone", required=False) - - openstack_description = "Sync records with openstack." - openstackcmd = subparsers.add_parser("openstack-sync", - description=openstack_description) - openstackcmd.add_argument("-n", "--name", required=True) - openstackcmd.add_argument("-c", "--cloud", required=True) - openstackcmd.add_argument("-z", "--zone", required=True) - - user_description = "View user information." - usercmd = subparsers.add_parser("user", description=user_description) - usercmd.add_argument("-u", "--username", default=None) - - update_description = ( - "Update a record in the zone. The record must exist in the zone.\n") - update_description += ( - "In this case --data, --name, --rtype and --ttl switches are used\n") - update_description += ( - "for searching for the appropriate record, while the --argument\n") - update_description += "switches are used for updating the record." - update_epilog = """Available arguments are: - data: New record data. - name: New record domain name. - rtype: New record type. - ttl: New record time to live (TTL).""" - updatecmd = subparsers.add_parser( - "update", - description=update_description, - epilog=update_epilog, - formatter_class=argparse.RawDescriptionHelpFormatter, - ) - updatecmd.add_argument( - "-a", - "--argument", - action="append", - metavar="KEY=VALUE", - help="Specify key - value pairs to be updated: name=dns1.example.com." - + " or data=127.0.0.1 for example. --argument can be repeated", - required=True, - ) - updatecmd.add_argument("-d", "--data", required=True) - updatecmd.add_argument("-n", "--name", required=True) - updatecmd.add_argument("-r", "--rtype", required=True) - updatecmd.add_argument("-t", "--ttl") - updatecmd.add_argument("-z", "--zone", required=True) - - zone_description = "View zones." - subparsers.add_parser("zone", description=zone_description) - - argcomplete.autocomplete(parser) - - return parser - - -def get_token(config) -> str: - # Authenticate - baseurl = config["baseurl"] - username = config["username"] - password = config["password"] - basic = HTTPBasicAuth(username, password) - response = requests.get(baseurl + "/user/login", auth=basic) - token = "" - try: - token = response.json()["token"] - except KeyError: - output(response.json()) - except requests.exceptions.JSONDecodeError: - output( - error("Could not decode api response as JSON", "Could not decode")) - return token - - -def run(url, args, headers, baseurl, parser, username): - try: - if args.command == "add": - run_add(url, args.json, headers) - elif args.command == "delete": - run_delete(url, args.json, headers) - elif args.command == "list": - run_list(url, args.json, headers) - elif args.command == "update": - run_update(url, args.json, headers) - elif args.command == "user": - url = baseurl + f"/user/info/{username}" - run_list(url, args.json, headers) - elif args.command == "auditlog": - url = baseurl + "/user/auditlog" - run_log(url, args.json, headers) - elif args.command == "changelog": - url = baseurl + f"/zones/changelog/{args.zone.rstrip('.')}" - run_log(url, args.json, headers) - elif args.command == "zone": - url = baseurl + "/zones" - run_zone(url, args.json, headers) - elif args.command == "openstack-sync": - run_openstack_sync(args.cloud, args.name, args.zone, headers, - baseurl, args.json) - else: - parser.print_help(sys.stderr) - return 2 - except requests.exceptions.RequestException as e: - output(error(e, "Could not connect to server")) - except (RequestsJSONDecodeError, SimplejsonJSONDecodeError): - output( - error("Could not decode api response as JSON", "Could not decode")) - return 0 - - -# Entry point to program -def main() -> int: - parser = get_parser() - args = parser.parse_args() - if args.command == "completion": - run_complete(args.shell) - return 0 - - # Make sure we have config - config_basepath = join(environ["HOME"], ".knot") - config_filename = join(config_basepath, "config") - - if not isdir(config_basepath): - mkdir(config_basepath) - - if args.command == "config": - run_config( - config_filename, - args.context, - args.baseurl, - args.list_config, - args.username, - args.password, - args.current, - ) - return 0 - - config = get_config(config_filename) - baseurl = config["baseurl"] - token = get_token(config) - if token == "": - print("Could not get token, exiting") - return 1 - headers = {"Authorization": "Bearer {}".format(token)} - - # Route based on command - url = "" - ttl = None - user = config["username"] - if "ttl" in args: - ttl = args.ttl - if args.command != "update": - args.argument = None - if args.command == "add" and not ttl: - if args.zone.endswith("."): - zname = args.zone - else: - zname = args.zone + "." - soa_url = setup_url(baseurl, None, None, zname, "SOA", None, args.zone) - soa_json = run_list(soa_url, True, headers, ret=True) - ttl = soa_json[0]["ttl"] - if args.command == "user": - if args.username: - user = args.username - if args.command in [ - "auditlog", "changelog", "openstack-sync", "user", "zone" - ]: - pass - else: - try: - url = setup_url( - baseurl, - args.argument, - args.data, - args.name, - args.rtype, - ttl, - args.zone, - ) - except AttributeError: - parser.print_help(sys.stderr) - return 1 - - return run(url, args, headers, baseurl, parser, user) - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/pyproject.toml b/pyproject.toml index cef6614..5313ea5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,13 +16,14 @@ classifiers=[ "Operating System :: OS Independent", ] requires-python= ">=3.9" -version = "0.1.1" +version = "0.1.2" dependencies = [ "argcomplete==2.0.0", "pyyaml==6.0.1", "requests==2.27.1", "simplejson==3.17.6", + "openstacksdk==4.2.0", ] [project.urls] @@ -34,4 +35,28 @@ knotctl="knotctl:main" [tool.flit.sdist] -include = ["LICENSE",] +include = ["LICENSE", "README.md"] + +[tool.briefcase] +project_name = "knotctl" +bundle = "org.smolnet" +version = "0.1.2" + +[tool.briefcase.app.knotctl] +formal_name = "knotctl" +description = "A CLI for knotapi." +long_description = "A CLI for knotapi." +sources = ['src/knotctl'] +console_app = "True" +requires = [ + "argcomplete==2.0.0", + "pyyaml==6.0.1", + "requests==2.27.1", + "simplejson==3.17.6", + "openstacksdk==4.2.0", +] + +[tool.briefcase.app.knotctl.linux.system.debian] +system_runtime_requires = [ + "libpython3.13", +] diff --git a/src/knotctl/__init__.py b/src/knotctl/__init__.py new file mode 100755 index 0000000..aec9e03 --- /dev/null +++ b/src/knotctl/__init__.py @@ -0,0 +1,200 @@ +#!/usr/bin/env python3 + +import getpass +import os +import sys +from typing import Union + +import requests +from simplejson.errors import JSONDecodeError as SimplejsonJSONDecodeError + +from .config import Config +from .runners import Run +from .utils import error, get_parser, output, setup_url + +try: + from requests.exceptions import JSONDecodeError as RequestsJSONDecodeError +except ImportError: + from requests.exceptions import InvalidJSONError as RequestsJSONDecodeError + + +class Knotctl: + + def __init__(self): + self.conf = Config() + self.config = self.get_config() + self.config_filename = self.conf.config_filename + self.runner = Run() + + def get_config(self): + config = self.conf.get_config() + if not config: + print("You need to configure knotctl before proceeding") + run_config() + + return config + + def run(self, url: str, args: dict, baseurl: str, parser: dict, + username: str): + try: + if args.command == "add": + self.runner.add(url, args.json) + elif args.command == "delete": + self.runner.delete(url, args.json) + elif args.command == "list": + self.runner.lister(url, args.json) + elif args.command == "update": + self.runner.update(url, args.json) + elif args.command == "user": + url = baseurl + f"/user/info/{username}" + self.runner.lister(url, args.json) + elif args.command == "auditlog": + url = baseurl + "/user/auditlog" + self.runner.log(url, args.json) + elif args.command == "changelog": + url = baseurl + f"/zones/changelog/{args.zone.rstrip('.')}" + self.runner.log(url, args.json) + elif args.command == "zone": + url = baseurl + "/zones" + self.runner.zone(url, args.json) + elif args.command == "openstack-sync": + self.runner.openstack_sync(args.cloud, args.name, args.zone, + baseurl, args.json) + else: + parser.print_help(sys.stderr) + return 2 + except requests.exceptions.RequestException as e: + output(error(e, "Could not connect to server")) + except (RequestsJSONDecodeError, SimplejsonJSONDecodeError): + output( + error("Could not decode api response as JSON", + "Could not decode")) + return 0 + + +def run_complete(shell: Union[None, str]): + if not shell or shell in ["bash", "zsh"]: + os.system("register-python-argcomplete knotctl") + elif shell == "fish": + os.system("register-python-argcomplete --shell fish knotctl") + elif shell == "tcsh": + os.system("register-python-argcomplete --shell tcsh knotctl") + + +def run_config( + context: Union[None, str] = None, + baseurl: Union[None, str] = None, + list_config: bool = False, + username: Union[None, str] = None, + password: Union[None, str] = None, + current: Union[None, str] = None, +): + conf = Config() + if current: + print(conf.get_current()) + return + config = {"baseurl": baseurl, "username": username, "password": password} + needed = [] + if context: + found = conf.set_context(context) + if found: + return + if list_config: + config_data = conf.get_config_data() + output(config_data) + return + if not baseurl: + needed.append("baseurl") + if not username: + needed.append("username") + for need in needed: + if need == "": + output( + error( + "Can not configure without {}".format(need), + "No {}".format(need), + )) + sys.exit(1) + config[need] = input("Enter {}: ".format(need)) + + if not password: + try: + config["password"] = getpass.getpass() + except EOFError: + output(error("Can not configure without password", "No password")) + sys.exit(1) + + conf.set_config(config) + + +# Entry point to program +def main() -> int: + parser = get_parser() + args = parser.parse_args() + if args.command == "completion": + run_complete(args.shell) + return 0 + + knotctl = Knotctl() + + if args.command == "config": + run_config( + args.context, + args.baseurl, + args.list_config, + args.username, + args.password, + args.current, + ) + return 0 + + config = knotctl.get_config() + baseurl = config["baseurl"] + token = knotctl.conf.get_token() + if token == "": + print("Could not get token, exiting") + return 1 + + # Route based on command + url = "" + ttl = None + user = config["username"] + if "ttl" in args: + ttl = args.ttl + if args.command != "update": + args.argument = None + if args.command == "add" and not ttl: + if args.zone.endswith("."): + zname = args.zone + else: + zname = args.zone + "." + soa_url = setup_url(baseurl, None, None, zname, "SOA", None, args.zone) + soa_json = knotctl.runner.lister(soa_url, True, ret=True) + ttl = soa_json[0]["ttl"] + if args.command == "user": + if args.username: + user = args.username + if args.command in [ + "auditlog", "changelog", "openstack-sync", "user", "zone" + ]: + pass + else: + try: + url = setup_url( + baseurl, + args.argument, + args.data, + args.name, + args.rtype, + ttl, + args.zone, + ) + except AttributeError: + parser.print_help(sys.stderr) + return 1 + + return knotctl.run(url, args, baseurl, parser, user) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/src/knotctl/__main__.py b/src/knotctl/__main__.py new file mode 100644 index 0000000..488386d --- /dev/null +++ b/src/knotctl/__main__.py @@ -0,0 +1,4 @@ +from knotctl import main + +if __name__ == "__main__": + main() diff --git a/src/knotctl/config/__init__.py b/src/knotctl/config/__init__.py new file mode 100755 index 0000000..134e1db --- /dev/null +++ b/src/knotctl/config/__init__.py @@ -0,0 +1,84 @@ +#!/usr/bin/env python3 +import os +from os import mkdir +from os.path import isdir, isfile, join +from typing import Union + +import requests +import yaml +from requests.models import HTTPBasicAuth + +from ..utils import error, output + + +class Config: + + def __init__(self): + # Make sure we have config + self.config_basepath = join(os.environ["HOME"], ".knot") + self.config_filename = join(self.config_basepath, "config") + if not isdir(self.config_basepath): + mkdir(self.config_basepath) + + def get_config(self) -> Union[None, dict]: + if not isfile(self.config_filename): + return None + with open(self.config_filename, "r") as fh: + return yaml.safe_load(fh.read()) + + def get_config_data(self) -> dict: + config_data = self.get_config() + config_data.pop("password", None) + return config_data + + def get_current(self) -> str: + if os.path.islink(self.config_filename): + actual_path = os.readlink(self.config_filename) + return actual_path.split("-")[-1] + else: + return "none" + + def get_token(self) -> str: + # Authenticate + config = self.get_config() + baseurl = config["baseurl"] + username = config["username"] + password = config["password"] + basic = HTTPBasicAuth(username, password) + response = requests.get(baseurl + "/user/login", auth=basic) + token = "" + try: + token = response.json()["token"] + except KeyError: + output(response.json()) + except requests.exceptions.JSONDecodeError: + output( + error("Could not decode api response as JSON", + "Could not decode")) + return token + + def set_context(self, context) -> bool: + symlink = f"{self.config_filename}-{context}" + found = os.path.isfile(symlink) + if os.path.islink(self.config_filename): + os.remove(self.config_filename) + elif os.path.isfile(self.config_filename): + os.rename(self.config_filename, symlink) + os.symlink(symlink, self.config_filename) + self.config_filename = symlink + return found + + def set_config( + self, + baseurl: str, + username: str, + password: str, + ): + config = { + "baseurl": baseurl, + "username": username, + "password": password + } + + with open(self.config_filename, "w") as fh: + fh.write(yaml.dump(config)) diff --git a/src/knotctl/openstack/__init__.py b/src/knotctl/openstack/__init__.py new file mode 100644 index 0000000..537dde3 --- /dev/null +++ b/src/knotctl/openstack/__init__.py @@ -0,0 +1,17 @@ +import openstack +import openstack.config.loader + + +def get_openstack_addresses(cloud: str, name: str): + conn = openstack.connect(cloud=cloud) + + # List the servers + server = conn.compute.find_server(name) + if server is None: + print("Server not found") + exit(1) + openstack_addresses = [] + for network in server.addresses: + for address in server.addresses[network]: + openstack_addresses.append(address) + return openstack_addresses diff --git a/src/knotctl/runners/__init__.py b/src/knotctl/runners/__init__.py new file mode 100644 index 0000000..5b99cbb --- /dev/null +++ b/src/knotctl/runners/__init__.py @@ -0,0 +1,219 @@ +from typing import Union + +import requests + +from ..config import Config +from ..openstack import get_openstack_addresses +from ..utils import output, setup_url, split_url + + +class Run(): + + def __init__(self): + conf = Config() + self.headers = {"Authorization": f"Bearer {conf.get_token()}"} + + def add(self, url: str, jsonout: bool): + parsed = split_url(url) + response = requests.put(url, headers=self.headers) + out = response.json() + if isinstance(out, list): + for record in out: + if (record["data"] == parsed["data"] + and record["name"] == parsed["name"] + and record["rtype"] == parsed["rtype"]): + output(record, jsonout) + break + else: + output(out, jsonout) + + def delete(self, url: str, jsonout: bool): + response = requests.delete(url, headers=self.headers) + reply = response.json() + if not reply and response.status_code == requests.codes.ok: + reply = [{"Code": 200, "Description": "{} deleted".format(url)}] + + output(reply, jsonout) + + def log(self, url: str, jsonout: bool): + response = requests.get(url, headers=self.headers) + string = response.content.decode("utf-8") + if jsonout: + out = [] + lines = string.splitlines() + index = 0 + text = "" + timestamp = "" + while index < len(lines): + line = lines[index] + index += 1 + cur_has_timestamp = line.startswith("[") + next_has_timestamp = index < len( + lines) and lines[index].startswith("[") + # Simple case, just one line with timestamp + if cur_has_timestamp and next_has_timestamp: + timestamp = line.split("]")[0].split("[")[1] + text = line.split("]")[1].lstrip(":").strip() + out.append({"timestamp": timestamp, "text": text}) + text = "" + timestamp = "" + # Start of multiline + elif cur_has_timestamp: + timestamp = line.split("]")[0].split("[")[1] + text = line.split("]")[1].lstrip(":").strip() + # End of multiline + elif next_has_timestamp: + text += f"\n{line.strip()}" + out.append({"timestamp": timestamp, "text": text}) + text = "" + timestamp = "" + # Middle of multiline + else: + text += f"\n{line.strip()}" + + else: + out = string + + output(out, jsonout) + + def update(self, url: str, jsonout: bool): + response = requests.patch(url, headers=self.headers) + output(response.json(), jsonout) + + def zone(self, url: str, jsonout: bool, ret=False) -> Union[None, str]: + response = requests.get(url, headers=self.headers) + zones = response.json() + for zone in zones: + del zone["records"] + string = zones + + if ret: + return string + else: + output(string, jsonout) + + def lister(self, url: str, jsonout: bool, ret=False) -> Union[None, str]: + response = requests.get(url, headers=self.headers) + string = response.json() + if ret: + return string + else: + output(string, jsonout) + + def add_records(self, openstack_addresses, baseurl, name, zone, url, + jsonout): + for address in openstack_addresses: + rtype = None + if address["version"] == 4: + rtype = "A" + elif address["version"] == 6: + rtype = "AAAA" + if rtype: + url = setup_url( + baseurl, + None, # arguments, + address["addr"], # data, + name, + rtype, + None, # ttl, + zone, + ) + self.add(url, jsonout) + + def update_records(self, openstack_addresses, current_records, baseurl, + name, zone, url, jsonout): + previpv4 = False + previpv6 = False + curripv4 = False + curripv6 = False + for record in current_records: + if record.type == "A": + previpv4 = record.data + elif record.type == "AAAA": + previpv6 = record.data + for address in openstack_addresses: + rtype = None + if address.version == 4: + rtype = "A" + curripv4 = True + elif address.version == 6: + rtype = "AAAA" + curripv6 = True + if rtype and record.type == rtype: + if record.data == address.addr: + continue + else: + url = setup_url( + baseurl, + None, # arguments, + address.addr, # data, + name, + record.type, + None, # ttl, + zone, + ) + self.update(url, jsonout) + if previpv4 and not curripv4: + url = setup_url( + baseurl, + None, # arguments, + previpv4, # data, + name, + "A", + None, # ttl, + zone, + ) + self.delete(url, jsonout) + if previpv6 and not curripv6: + url = setup_url( + baseurl, + None, # arguments, + previpv6, # data, + name, + "AAAA", + None, # ttl, + zone, + ) + self.delete(url, jsonout) + if curripv4 and not previpv4: + url = setup_url( + baseurl, + None, # arguments, + curripv4, # data, + name, + "A", + None, # ttl, + zone, + ) + self.add(url, jsonout) + if curripv6 and not previpv6: + url = setup_url( + baseurl, + None, # arguments, + curripv6, # data, + name, + "AAAA", + None, # ttl, + zone, + ) + self.add(url, jsonout) + + def openstack_sync(self, cloud: str, name: str, zone: str, baseurl: str, + jsonout: bool): + url = setup_url( + baseurl, + None, # arguments, + None, # data, + name, + None, # rtype, + None, # ttl, + zone, + ) + current_records = self.lister(url, jsonout=True, ret=True) + openstack_addresses = get_openstack_addresses(cloud, name) + if current_records["Code"] == 404: + self.add_records(openstack_addresses, baseurl, name, zone, url, + jsonout) + else: + self.update_records(openstack_addresses, current_records, baseurl, + name, zone, url, jsonout) diff --git a/src/knotctl/utils/__init__.py b/src/knotctl/utils/__init__.py new file mode 100644 index 0000000..c2444bc --- /dev/null +++ b/src/knotctl/utils/__init__.py @@ -0,0 +1,282 @@ +import argparse +import argcomplete +import json +import sys +import urllib.parse +from typing import Union +from urllib.parse import urlparse + + +def get_parser() -> dict: + description = """Manage DNS records with knot dns rest api: + * https://gitlab.nic.cz/knot/knot-dns-rest""" + + epilog = """ + The Domain Name System specifies a database of information + elements for network resources. The types of information + elements are categorized and organized with a list of DNS + record types, the resource records (RRs). Each record has a + name, a type, an expiration time (time to live), and + type-specific data. + + The following is a list of terms used in this program: + ---------------------------------------------------------------- + | Vocabulary | Description | + ---------------------------------------------------------------- + | zone | A DNS zone is a specific portion of the DNS | + | | namespace in the Domain Name System (DNS), | + | | which a specific organization or administrator | + | | manages. | + ---------------------------------------------------------------- + | name | In the Internet, a domain name is a string that | + | | identifies a realm of administrative autonomy, | + | | authority or control. Domain names are often | + | | used to identify services provided through the | + | | Internet, such as websites, email services and | + | | more. | + ---------------------------------------------------------------- + | rtype | A record type indicates the format of the data | + | | and it gives a hint of its intended use. For | + | | example, the A record is used to translate from | + | | a domain name to an IPv4 address, the NS record | + | | lists which name servers can answer lookups on | + | | a DNS zone, and the MX record specifies the | + | | mail server used to handle mail for a domain | + | | specified in an e-mail address. | + ---------------------------------------------------------------- + | data | A records data is of type-specific relevance, | + | | such as the IP address for address records, or | + | | the priority and hostname for MX records. | + ---------------------------------------------------------------- + + This information was compiled from Wikipedia: + * https://en.wikipedia.org/wiki/DNS_zone + * https://en.wikipedia.org/wiki/Domain_Name_System + * https://en.wikipedia.org/wiki/Zone_file + """ + # Grab user input + parser = argparse.ArgumentParser( + description=description, + epilog=epilog, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("--json", action=argparse.BooleanOptionalAction) + subparsers = parser.add_subparsers(dest="command") + + add_description = "Add a new record to the zone." + addcmd = subparsers.add_parser("add", description=add_description) + addcmd.add_argument("-d", "--data", required=True) + addcmd.add_argument("-n", "--name", required=True) + addcmd.add_argument("-r", "--rtype", required=True) + addcmd.add_argument("-t", "--ttl") + addcmd.add_argument("-z", "--zone", required=True) + + auditlog_description = "Audit the log file for errors." + subparsers.add_parser("auditlog", description=auditlog_description) + + changelog_description = "View the changelog of a zone." + changelogcmd = subparsers.add_parser("changelog", + description=changelog_description) + changelogcmd.add_argument("-z", "--zone", required=True) + + complete_description = "Generate shell completion script." + completecmd = subparsers.add_parser("completion", + description=complete_description) + completecmd.add_argument("-s", "--shell") + + config_description = "Configure access to knot-dns-rest-api." + configcmd = subparsers.add_parser("config", description=config_description) + configcmd.add_argument("-b", "--baseurl") + configcmd.add_argument("-c", "--context") + configcmd.add_argument("-C", + "--current", + action=argparse.BooleanOptionalAction) + configcmd.add_argument("-l", + "--list", + action=argparse.BooleanOptionalAction, + dest="list_config") + configcmd.add_argument("-p", "--password") + configcmd.add_argument("-u", "--username") + + delete_description = "Delete a record from the zone." + deletecmd = subparsers.add_parser("delete", description=delete_description) + deletecmd.add_argument("-d", "--data") + deletecmd.add_argument("-n", "--name") + deletecmd.add_argument("-r", "--rtype") + deletecmd.add_argument("-z", "--zone", required=True) + + list_description = "List records." + listcmd = subparsers.add_parser("list", description=list_description) + listcmd.add_argument("-d", "--data") + listcmd.add_argument("-n", "--name") + listcmd.add_argument("-r", "--rtype") + listcmd.add_argument("-z", "--zone", required=False) + + openstack_description = "Sync records with openstack." + openstackcmd = subparsers.add_parser("openstack-sync", + description=openstack_description) + openstackcmd.add_argument("-n", "--name", required=True) + openstackcmd.add_argument("-c", "--cloud", required=True) + openstackcmd.add_argument("-z", "--zone", required=True) + + user_description = "View user information." + usercmd = subparsers.add_parser("user", description=user_description) + usercmd.add_argument("-u", "--username", default=None) + + update_description = ( + "Update a record in the zone. The record must exist in the zone.\n") + update_description += ( + "In this case --data, --name, --rtype and --ttl switches are used\n") + update_description += ( + "for searching for the appropriate record, while the --argument\n") + update_description += "switches are used for updating the record." + update_epilog = """Available arguments are: + data: New record data. + name: New record domain name. + rtype: New record type. + ttl: New record time to live (TTL).""" + updatecmd = subparsers.add_parser( + "update", + description=update_description, + epilog=update_epilog, + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + updatecmd.add_argument( + "-a", + "--argument", + action="append", + metavar="KEY=VALUE", + help="Specify key - value pairs to be updated: name=dns1.example.com." + + " or data=127.0.0.1 for example. --argument can be repeated", + required=True, + ) + updatecmd.add_argument("-d", "--data", required=True) + updatecmd.add_argument("-n", "--name", required=True) + updatecmd.add_argument("-r", "--rtype", required=True) + updatecmd.add_argument("-t", "--ttl") + updatecmd.add_argument("-z", "--zone", required=True) + + zone_description = "View zones." + subparsers.add_parser("zone", description=zone_description) + + argcomplete.autocomplete(parser) + + return parser + + +def error(description: str, error: str): + response = [] + reply = {} + # https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/406 + reply["Code"] = 406 + reply["Description"] = description + reply["Error"] = error + response.append(reply) + return response + + +def nested_out(input, tabs="") -> str: + string = "" + if isinstance(input, str) or isinstance(input, int): + string += f"{input}\n" + elif isinstance(input, dict): + for key, value in input.items(): + string += f"{tabs}{key}: {nested_out(value, tabs + " ")}" + elif isinstance(input, list): + for entry in input: + string += f"{tabs}\n{nested_out(entry, tabs + ' ')}" + return string + + +def output(response: list[dict], jsonout: bool = False): + try: + if jsonout: + print(json.dumps(response)) + else: + print(nested_out(response)) + except BrokenPipeError: + pass + + +def setup_url( + baseurl: str, + arguments: Union[None, list[str]], + data: Union[None, str], + name: Union[None, str], + rtype: Union[None, str], + ttl: Union[None, str], + zone: Union[None, str], +) -> str: + url = baseurl + "/zones" + if zone: + if not zone.endswith("."): + zone += "." + url += "/{}".format(zone) + if name and zone: + if name.endswith(zone.rstrip(".")): + name += "." + url += "/records/{}".format(name) + if zone and name and rtype: + url += "/{}".format(rtype) + if data and zone and name and rtype: + url += "/{}".format(data) + if ttl and data and zone and name and rtype: + url += "/{}".format(ttl) + if data and zone and name and rtype and arguments: + url += "?" + for arg in arguments: + if not url.endswith("?"): + url += "&" + key, value = arg.split("=") + url += key + "=" + urllib.parse.quote_plus(value) + + if ttl and (not rtype or not name or not zone): + output( + error( + "ttl only makes sense with rtype, name and zone", + "Missing parameter", + )) + sys.exit(1) + if rtype and (not name or not zone): + output( + error( + "rtype only makes sense with name and zone", + "Missing parameter", + )) + sys.exit(1) + if name and not zone: + output(error("name only makes sense with a zone", "Missing parameter")) + sys.exit(1) + return url + + +def split_url(url: str) -> dict: + parsed = urlparse(url, allow_fragments=False) + path = parsed.path + query = parsed.query + arguments: Union[None, list[str]] = query.split("&") + path_arr = path.split("/") + data: Union[None, str] = None + name: Union[None, str] = None + rtype: Union[None, str] = None + ttl: Union[None, str] = None + zone: Union[None, str] = None + if len(path_arr) > 2: + zone = path_arr[2] + if len(path_arr) > 4: + name = path_arr[4] + if len(path_arr) > 5: + rtype = path_arr[5] + if len(path_arr) > 6: + data = path_arr[6] + if len(path_arr) > 7: + ttl = path_arr[7] + + return { + "arguments": arguments, + "data": data, + "name": name, + "rtype": rtype, + "ttl": ttl, + "zone": zone, + }