From 5f0b5be03c3ff822712271c1d1db5577ac9313ae Mon Sep 17 00:00:00 2001 From: Micke Nordin Date: Mon, 24 Oct 2022 11:11:41 +0200 Subject: [PATCH] Mostly working --- knotctl | 119 +++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 78 insertions(+), 41 deletions(-) diff --git a/knotctl b/knotctl index 45feb76..2b482c2 100644 --- a/knotctl +++ b/knotctl @@ -1,6 +1,8 @@ #!/usr/bin/env python3 import argparse +import json +import sys import urllib.parse from collections.abc import Sequence from typing import Union @@ -22,6 +24,7 @@ headers = {"Authorization": "Bearer {}".format(token)} # Parse out information for each command line option parser = argparse.ArgumentParser() +parser.add_argument("--json", action=argparse.BooleanOptionalAction) subparsers = parser.add_subparsers(dest="command") add = subparsers.add_parser("add") add.add_argument( @@ -44,58 +47,97 @@ delete.add_argument("--name") delete.add_argument("--rtype") delete.add_argument("--zone", required=True) -list = subparsers.add_parser("list") -list.add_argument( +listcmd = subparsers.add_parser("list") +listcmd.add_argument( "--data", nargs="*", help="Specify any number of key - value pairs: name=dns1.example.com.", ) -list.add_argument("--name") -list.add_argument("--rtype") -list.add_argument("--zone") +listcmd.add_argument("--name") +listcmd.add_argument("--rtype") +listcmd.add_argument("--zone") update = subparsers.add_parser("update") update.add_argument( "--data", nargs="*", help="Specify any number of key - value pairs: name=dns1.example.com.", + required=True, ) -update.add_argument("--name") -update.add_argument("--rtype") +update.add_argument("--name", required=True) +update.add_argument("--rtype", required=True) update.add_argument("--zone", required=True) argcomplete.autocomplete(parser) args = parser.parse_args() +def error(description: str, error: str) -> Sequence[dict]: + response = [] + reply = {} + # https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/406 + reply["Code"] = 406 + reply["Description"] = description + reply["Error"] = error + response.append(reply) + return response + + +def output(response: Sequence[dict], jsonout: bool = False): + try: + if jsonout: + print(json.dumps(response)) + else: + for entry in response: + print(nested_out(entry)) + except BrokenPipeError: + pass + + +def nested_out(input, tabs="") -> str: + string = "" + if isinstance(input, str): + string += "{}\n".format(input) + elif isinstance(input, dict): + for key, value in input.items(): + string += "{}{}: {}".format(tabs, key, nested_out(value, tabs + " ")) + elif isinstance(input, list): + for entry in input: + string += "{}\n{}".format(tabs, nested_out(entry, tabs + " ")) + return string + + # Define the parser for each command -def parse_add( - data: Union[None, Sequence[str]], - name: Union[None, str], - rtype: Union[None, str], - zone: Union[None, str], -): - print(args) +def run_add(url: str, jsonout: bool): + response = requests.put(url, headers=headers) + output(response.json(), jsonout) -def parse_delete( - data: Union[None, Sequence[str]], - name: Union[None, str], - rtype: Union[None, str], - zone: Union[None, str], -): - print(args) +def run_delete(url: str, jsonout: bool): + response = requests.delete(url, headers=headers) + output(response.json(), jsonout) + +def run_list(url: str, jsonout: bool): + response = requests.get(url, headers=headers) + output(response.json(), jsonout) -def parse_list( + +def run_update(url: str, jsonout: bool): + response = requests.patch(url, headers=headers) + output(response.json(), jsonout) + + +# Set up the url +def setup_url( data: Union[None, Sequence[str]], name: Union[None, str], rtype: Union[None, str], zone: Union[None, str], -): +) -> str: url = base_url + "/zones" if zone: - url += '/{}'.format(zone) + url += "/{}".format(zone) if name and zone: url += "/records/{}".format(name) if zone and name and rtype: @@ -106,27 +148,22 @@ def parse_list( if not url.endswith("?"): url += "&" url += urllib.parse.quote_plus(arg) - - print(url) - response = requests.get(url, headers=headers) - print(response.json()) - - -def parse_update( - data: Union[None, Sequence[str]], - name: Union[None, str], - rtype: Union[None, str], - zone: Union[None, str], -): - print(args) + if rtype and (not name or not zone): + output(error("rtype only makes sense with name and zone", "Missing parameter")) + sys.exit(1) + if name and not zone: + output(error("name only makes sense with a zone", "Missing parameter")) + sys.exit(1) + return url # Route based on command +url = setup_url(args.data, args.name, args.rtype, args.zone) if args.command == "add": - parse_add(args.data, args.name, args.rtype, args.zone) + run_add(url, args.json) elif args.command == "delete": - parse_delete(args.data, args.name, args.rtype, args.zone) + run_delete(url, args.json) elif args.command == "list": - parse_list(args.data, args.name, args.rtype, args.zone) + run_list(url, args.json) elif args.command == "update": - parse_update(args.data, args.name, args.rtype, args.zone) + run_update(url, args.json)