forked from micke/knotctl
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
281 lines
8.5 KiB
281 lines
8.5 KiB
#!/usr/bin/env python3
|
|
|
|
import argparse
|
|
import getpass
|
|
import json
|
|
import os
|
|
import sys
|
|
import urllib.parse
|
|
from os import environ, mkdir
|
|
from os.path import isdir, isfile, join
|
|
from typing import Union
|
|
|
|
import argcomplete
|
|
import requests
|
|
import yaml
|
|
from requests.exceptions import JSONDecodeError as RequestsJSONDecodeError
|
|
from requests.models import HTTPBasicAuth
|
|
from simplejson.errors import JSONDecodeError as SimplejsonJSONDecodeError
|
|
|
|
|
|
# Helper functions
|
|
def error(description: str, error: str) -> list[dict]:
|
|
response = []
|
|
reply = {}
|
|
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/406
|
|
reply["Code"] = 406
|
|
reply["Description"] = description
|
|
reply["Error"] = error
|
|
response.append(reply)
|
|
return response
|
|
|
|
|
|
def get_config(config_filename: str):
|
|
with open(config_filename, "r") as fh:
|
|
return yaml.safe_load(fh.read())
|
|
|
|
|
|
def nested_out(input, tabs="") -> str:
|
|
string = ""
|
|
if isinstance(input, str) or isinstance(input, int):
|
|
string += "{}\n".format(input)
|
|
elif isinstance(input, dict):
|
|
for key, value in input.items():
|
|
string += "{}{}: {}".format(tabs, key,
|
|
nested_out(value, tabs + " "))
|
|
elif isinstance(input, list):
|
|
for entry in input:
|
|
string += "{}\n{}".format(tabs, nested_out(entry, tabs + " "))
|
|
return string
|
|
|
|
|
|
def output(response: list[dict], jsonout: bool = False):
|
|
try:
|
|
if jsonout:
|
|
print(json.dumps(response))
|
|
else:
|
|
print(nested_out(response))
|
|
except BrokenPipeError:
|
|
pass
|
|
|
|
|
|
# Define the runner for each command
|
|
def run_add(url: str, jsonout: bool, headers: dict):
|
|
print(url)
|
|
response = requests.put(url, headers=headers)
|
|
output(response.json(), jsonout)
|
|
|
|
|
|
def run_complete(shell: Union[None, str]):
|
|
if not shell or shell in ["bash", "zsh"]:
|
|
os.system("register-python-argcomplete knotctl")
|
|
elif shell == "fish":
|
|
os.system("register-python-argcomplete --shell fish knotctl")
|
|
elif shell == "tcsh":
|
|
os.system("register-python-argcomplete --shell tcsh knotctl")
|
|
|
|
|
|
def run_config(
|
|
config_filename: str,
|
|
baseurl: Union[None, str] = None,
|
|
username: Union[None, str] = None,
|
|
password: Union[None, str] = None,
|
|
):
|
|
config = {"baseurl": baseurl, "username": username, "password": password}
|
|
needed = []
|
|
if not baseurl:
|
|
needed.append("baseurl")
|
|
if not username:
|
|
needed.append("username")
|
|
for need in needed:
|
|
if need == "":
|
|
output(
|
|
error("Can not configure without {}".format(need),
|
|
"No {}".format(need)))
|
|
sys.exit(1)
|
|
config[need] = input("Enter {}:".format(need))
|
|
|
|
if not password:
|
|
try:
|
|
config["password"] = getpass.getpass()
|
|
except EOFError:
|
|
output(error("Can not configure without password", "No password"))
|
|
sys.exit(1)
|
|
|
|
with open(config_filename, "w") as fh:
|
|
fh.write(yaml.dump(config))
|
|
|
|
|
|
def run_delete(url: str, jsonout: bool, headers: dict):
|
|
response = requests.delete(url, headers=headers)
|
|
reply = response.json()
|
|
if not reply and response.status_code == requests.codes.ok:
|
|
reply = [{"Code": 200, "Description": "{} deleted".format(url)}]
|
|
|
|
output(reply, jsonout)
|
|
|
|
|
|
def run_list(url: str, jsonout: bool, headers: dict):
|
|
response = requests.get(url, headers=headers)
|
|
output(response.json(), jsonout)
|
|
|
|
|
|
def run_update(url: str, jsonout: bool, headers: dict):
|
|
response = requests.patch(url, headers=headers)
|
|
output(response.json(), jsonout)
|
|
|
|
|
|
# Set up the url
|
|
def setup_url(
|
|
baseurl: str,
|
|
arguments: Union[None, list[str]],
|
|
data: Union[None, str],
|
|
name: Union[None, str],
|
|
rtype: Union[None, str],
|
|
ttl: Union[None, str],
|
|
zone: Union[None, str],
|
|
) -> str:
|
|
url = baseurl + "/zones"
|
|
if zone:
|
|
if not zone.endswith("."):
|
|
zone += "."
|
|
url += "/{}".format(zone)
|
|
if name and zone:
|
|
url += "/records/{}".format(name)
|
|
if zone and name and rtype:
|
|
url += "/{}".format(rtype)
|
|
if data and zone and name and rtype:
|
|
url += "/{}".format(data)
|
|
if ttl and data and zone and name and rtype:
|
|
url += "/{}".format(ttl)
|
|
if data and zone and name and rtype and arguments:
|
|
url += "?"
|
|
for arg in arguments:
|
|
if not url.endswith("?"):
|
|
url += "&"
|
|
key, value = arg.split("=")
|
|
url += key + "=" + urllib.parse.quote_plus(value)
|
|
|
|
if ttl and (not rtype or not name or not zone):
|
|
output(
|
|
error("ttl only makes sense with rtype, name and zone",
|
|
"Missing parameter"))
|
|
sys.exit(1)
|
|
if rtype and (not name or not zone):
|
|
output(
|
|
error("rtype only makes sense with name and zone",
|
|
"Missing parameter"))
|
|
sys.exit(1)
|
|
if name and not zone:
|
|
output(error("name only makes sense with a zone", "Missing parameter"))
|
|
sys.exit(1)
|
|
return url
|
|
|
|
|
|
# Entry point to program
|
|
def main() -> int:
|
|
# Grab user input
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument("--json", action=argparse.BooleanOptionalAction)
|
|
subparsers = parser.add_subparsers(dest="command")
|
|
addcmd = subparsers.add_parser("add")
|
|
addcmd.add_argument("-d", "--data", required=True)
|
|
addcmd.add_argument("-n", "--name", required=True)
|
|
addcmd.add_argument("-r", "--rtype", required=True)
|
|
addcmd.add_argument("-t", "--ttl", required=True)
|
|
addcmd.add_argument("-z", "--zone", required=True)
|
|
|
|
completecmd = subparsers.add_parser("completion")
|
|
completecmd.add_argument("-s", "--shell")
|
|
|
|
configcmd = subparsers.add_parser("config")
|
|
configcmd.add_argument("-b", "--baseurl")
|
|
configcmd.add_argument("-p", "--password")
|
|
configcmd.add_argument("-u", "--username")
|
|
|
|
deletecmd = subparsers.add_parser("delete")
|
|
deletecmd.add_argument("-d", "--data")
|
|
deletecmd.add_argument("-n", "--name")
|
|
deletecmd.add_argument("-r", "--rtype")
|
|
deletecmd.add_argument("-z", "--zone", required=True)
|
|
|
|
listcmd = subparsers.add_parser("list")
|
|
listcmd.add_argument("-d", "--data")
|
|
listcmd.add_argument("-n", "--name")
|
|
listcmd.add_argument("-r", "--rtype")
|
|
listcmd.add_argument("-z", "--zone")
|
|
|
|
updatecmd = subparsers.add_parser("update")
|
|
updatecmd.add_argument(
|
|
"-a",
|
|
"--argument",
|
|
nargs="*",
|
|
help="Specify key - value pairs to be updated: name=dns1.example.com.",
|
|
required=True,
|
|
)
|
|
updatecmd.add_argument("-d", "--data", required=True)
|
|
updatecmd.add_argument("-n", "--name", required=True)
|
|
updatecmd.add_argument("-r", "--rtype", required=True)
|
|
updatecmd.add_argument("-t", "--ttl")
|
|
updatecmd.add_argument("-z", "--zone", required=True)
|
|
|
|
argcomplete.autocomplete(parser)
|
|
args = parser.parse_args()
|
|
if args.command == "completion":
|
|
run_complete(args.shell)
|
|
return 0
|
|
|
|
# Make sure we have config
|
|
config_basepath = join(environ["HOME"], ".knot")
|
|
config_filename = join(config_basepath, "config")
|
|
|
|
if not isdir(config_basepath):
|
|
mkdir(config_basepath)
|
|
|
|
if args.command == "config":
|
|
run_config(args.baseurl, args.username, args.password)
|
|
return 0
|
|
|
|
if not isfile(config_filename):
|
|
print("You need to configure knotctl before proceeding")
|
|
run_config(config_filename)
|
|
|
|
config = get_config(config_filename)
|
|
baseurl = config["baseurl"]
|
|
username = config["username"]
|
|
password = config["password"]
|
|
|
|
# Authenticate
|
|
basic = HTTPBasicAuth(username, password)
|
|
response = requests.get(baseurl + "/user/login", auth=basic)
|
|
try:
|
|
token = response.json()["token"]
|
|
except KeyError:
|
|
output(response.json())
|
|
return 1
|
|
headers = {"Authorization": "Bearer {}".format(token)}
|
|
|
|
# Route based on command
|
|
ttl = None
|
|
if 'ttl' in args:
|
|
ttl = args.ttl
|
|
url = setup_url(baseurl, args.argument, args.data, args.name, args.rtype,
|
|
ttl, args.zone)
|
|
try:
|
|
if args.command == "add":
|
|
run_add(url, args.json, headers)
|
|
elif args.command == "delete":
|
|
run_delete(url, args.json, headers)
|
|
elif args.command == "list":
|
|
run_list(url, args.json, headers)
|
|
elif args.command == "update":
|
|
run_update(url, args.json, headers)
|
|
except (RequestsJSONDecodeError, SimplejsonJSONDecodeError):
|
|
output(
|
|
error("Could not decode api response as JSON", "Could not decode"))
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|