Allow for configuration

main
Micke Nordin 2 years ago
parent 80c4a1b0d9
commit 448306869f
Signed by untrusted user: micke
GPG Key ID: 0DA0A7A5708FE257

@ -1,76 +1,20 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import argparse import argparse
import getpass
import json import json
import sys import sys
import urllib.parse import urllib.parse
from collections.abc import Sequence from collections.abc import Sequence
from os import environ, mkdir
from os.path import isdir, isfile, join
from typing import Union from typing import Union
import argcomplete import argcomplete
import requests import requests
from argcomplete.completers import subprocess import yaml
from requests.models import HTTPBasicAuth from requests.models import HTTPBasicAuth
base_url = "https://knotapitest.sunet.se"
user = "kano"
password = subprocess.getoutput("pass show knotapitest.sunet.se/{}".format(user))
# Authenticate
basic = HTTPBasicAuth(user, password)
response = requests.get(base_url + "/user/login", auth=basic)
token = response.json()["token"]
headers = {"Authorization": "Bearer {}".format(token)}
# Parse out information for each command line option
parser = argparse.ArgumentParser()
parser.add_argument("--json", action=argparse.BooleanOptionalAction)
subparsers = parser.add_subparsers(dest="command")
add = subparsers.add_parser("add")
add.add_argument(
"-d",
"--data",
nargs="*",
help="Specify any number of key - value pairs: name=dns1.example.com.",
)
add.add_argument("--name")
add.add_argument("--rtype")
add.add_argument("--zone", required=True)
delete = subparsers.add_parser("delete")
delete.add_argument(
"--data",
nargs="*",
help="Specify any number of key - value pairs: name=dns1.example.com.",
)
delete.add_argument("--name")
delete.add_argument("--rtype")
delete.add_argument("--zone", required=True)
listcmd = subparsers.add_parser("list")
listcmd.add_argument(
"--data",
nargs="*",
help="Specify any number of key - value pairs: name=dns1.example.com.",
)
listcmd.add_argument("--name")
listcmd.add_argument("--rtype")
listcmd.add_argument("--zone")
update = subparsers.add_parser("update")
update.add_argument(
"--data",
nargs="*",
help="Specify any number of key - value pairs: name=dns1.example.com.",
required=True,
)
update.add_argument("--name", required=True)
update.add_argument("--rtype", required=True)
update.add_argument("--zone", required=True)
argcomplete.autocomplete(parser)
args = parser.parse_args()
def error(description: str, error: str) -> Sequence[dict]: def error(description: str, error: str) -> Sequence[dict]:
response = [] response = []
@ -83,6 +27,11 @@ def error(description: str, error: str) -> Sequence[dict]:
return response return response
def get_config(config_filename: str):
with open(config_filename, "r") as fh:
return yaml.safe_load(fh.read())
def output(response: Sequence[dict], jsonout: bool = False): def output(response: Sequence[dict], jsonout: bool = False):
try: try:
if jsonout: if jsonout:
@ -108,34 +57,66 @@ def nested_out(input, tabs="") -> str:
# Define the parser for each command # Define the parser for each command
def run_add(url: str, jsonout: bool): def run_add(url: str, jsonout: bool, headers: dict):
response = requests.put(url, headers=headers) response = requests.put(url, headers=headers)
output(response.json(), jsonout) output(response.json(), jsonout)
def run_delete(url: str, jsonout: bool): def run_config(
config_filename: str,
baseurl: Union[None, str] = None,
username: Union[None, str] = None,
password: Union[None, str] = None,
):
config = {"baseurl": baseurl, "username": username, "password": password}
needed = []
if not baseurl:
needed.append("baseurl")
if not username:
needed.append("username")
for need in needed:
if need == "":
output(
error("Can not configure without {}".format(need), "No {}".format(need))
)
sys.exit(1)
config[need] = input("Enter {}:".format(need))
if not password:
try:
config["password"] = getpass.getpass()
except EOFError:
output(error("Can not configure without password", "No password"))
sys.exit(1)
with open(config_filename, "w") as fh:
fh.write(yaml.dump(config))
def run_delete(url: str, jsonout: bool, headers: dict):
response = requests.delete(url, headers=headers) response = requests.delete(url, headers=headers)
output(response.json(), jsonout) output(response.json(), jsonout)
def run_list(url: str, jsonout: bool): def run_list(url: str, jsonout: bool, headers: dict):
response = requests.get(url, headers=headers) response = requests.get(url, headers=headers)
output(response.json(), jsonout) output(response.json(), jsonout)
def run_update(url: str, jsonout: bool): def run_update(url: str, jsonout: bool, headers: dict):
response = requests.patch(url, headers=headers) response = requests.patch(url, headers=headers)
output(response.json(), jsonout) output(response.json(), jsonout)
# Set up the url # Set up the url
def setup_url( def setup_url(
baseurl: str,
data: Union[None, Sequence[str]], data: Union[None, Sequence[str]],
name: Union[None, str], name: Union[None, str],
rtype: Union[None, str], rtype: Union[None, str],
zone: Union[None, str], zone: Union[None, str],
) -> str: ) -> str:
url = base_url + "/zones" url = baseurl + "/zones"
if zone: if zone:
url += "/{}".format(zone) url += "/{}".format(zone)
if name and zone: if name and zone:
@ -157,13 +138,92 @@ def setup_url(
return url return url
def main() -> int:
config_basepath = join(environ["HOME"], ".knot")
config_filename = join(config_basepath, "config")
if not isdir(config_basepath):
mkdir(config_basepath)
if not isfile(config_filename):
print("You need to configure knotctl before proceeding")
run_config(config_filename)
config = get_config(config_filename)
baseurl = config["baseurl"]
username = config["username"]
password = config["password"]
# Authenticate
basic = HTTPBasicAuth(username, password)
response = requests.get(baseurl + "/user/login", auth=basic)
try:
token = response.json()["token"]
except KeyError:
output(response.json())
return 1
headers = {"Authorization": "Bearer {}".format(token)}
# Grab user input
parser = argparse.ArgumentParser()
parser.add_argument("--json", action=argparse.BooleanOptionalAction)
subparsers = parser.add_subparsers(dest="command")
for sub in ["add", "delete"]:
subparser = subparsers.add_parser(sub)
subparser.add_argument(
"-d",
"--data",
nargs="*",
help="Specify any number of key - value pairs: name=dns1.example.com.",
)
subparser.add_argument("-n", "--name")
subparser.add_argument("-r", "--rtype")
subparser.add_argument("-z", "--zone", required=True)
list = subparsers.add_parser("list")
list.add_argument(
"-d",
"--data",
nargs="*",
help="Specify any number of key - value pairs: name=dns1.example.com.",
)
list.add_argument("-n", "--name")
list.add_argument("-r", "--rtype")
list.add_argument("-z", "--zone")
config = subparsers.add_parser("config")
config.add_argument("-b", "--baseurl")
config.add_argument("-p", "--password")
config.add_argument("-u", "--username")
update = subparsers.add_parser("update")
update.add_argument(
"-d",
"--data",
nargs="*",
help="Specify any number of key - value pairs: name=dns1.example.com.",
required=True,
)
update.add_argument("-n", "--name", required=True)
update.add_argument("-r", "--rtype", required=True)
update.add_argument("-z", "--zone", required=True)
argcomplete.autocomplete(parser)
args = parser.parse_args()
# Route based on command # Route based on command
url = setup_url(args.data, args.name, args.rtype, args.zone) url = setup_url(baseurl, args.data, args.name, args.rtype, args.zone)
if args.command == "add": if args.command == "add":
run_add(url, args.json) run_add(url, args.json, headers)
elif args.command == "config":
run_config(args.baseurl, args.username, args.password)
elif args.command == "delete": elif args.command == "delete":
run_delete(url, args.json) run_delete(url, args.json, headers)
elif args.command == "list": elif args.command == "list":
run_list(url, args.json) run_list(url, args.json, headers)
elif args.command == "update": elif args.command == "update":
run_update(url, args.json) run_update(url, args.json, headers)
return 0
if __name__ == "__main__":
sys.exit(main())

Loading…
Cancel
Save