Add support for listing zones and userinfo

main
Micke Nordin 2 weeks ago
parent 3fff646945
commit 1cdb6472fa

@ -7,7 +7,7 @@ import os
import sys import sys
import urllib.parse import urllib.parse
from os import environ, mkdir from os import environ, mkdir
from os.path import isdir, isfile, islink, join, split from os.path import isdir, isfile, join
from typing import Union from typing import Union
from urllib.parse import urlparse from urllib.parse import urlparse
@ -36,6 +36,9 @@ def error(description: str, error: str) -> list[dict]:
def get_config(config_filename: str): def get_config(config_filename: str):
if not isfile(config_filename):
print("You need to configure knotctl before proceeding")
run_config(config_filename)
with open(config_filename, "r") as fh: with open(config_filename, "r") as fh:
return yaml.safe_load(fh.read()) return yaml.safe_load(fh.read())
@ -46,7 +49,8 @@ def nested_out(input, tabs="") -> str:
string += "{}\n".format(input) string += "{}\n".format(input)
elif isinstance(input, dict): elif isinstance(input, dict):
for key, value in input.items(): for key, value in input.items():
string += "{}{}: {}".format(tabs, key, nested_out(value, tabs + " ")) string += "{}{}: {}".format(tabs, key,
nested_out(value, tabs + " "))
elif isinstance(input, list): elif isinstance(input, list):
for entry in input: for entry in input:
string += "{}\n{}".format(tabs, nested_out(entry, tabs + " ")) string += "{}\n{}".format(tabs, nested_out(entry, tabs + " "))
@ -70,11 +74,9 @@ def run_add(url: str, jsonout: bool, headers: dict):
out = response.json() out = response.json()
if isinstance(out, list): if isinstance(out, list):
for record in out: for record in out:
if ( if (record["data"] == parsed["data"]
record["data"] == parsed["data"] and record["name"] == parsed["name"]
and record["name"] == parsed["name"] and record["rtype"] == parsed["rtype"]):
and record["rtype"] == parsed["rtype"]
):
output(record, jsonout) output(record, jsonout)
break break
else: else:
@ -94,29 +96,28 @@ def run_log(url: str, jsonout: bool, headers: dict):
line = lines[index] line = lines[index]
index += 1 index += 1
cur_has_timestamp = line.startswith("[") cur_has_timestamp = line.startswith("[")
next_has_timestamp = index < len(lines) and lines[index].startswith( next_has_timestamp = index < len(
"[" lines) and lines[index].startswith("[")
)
# Simple case, just one line with timestamp # Simple case, just one line with timestamp
if cur_has_timestamp and next_has_timestamp: if cur_has_timestamp and next_has_timestamp:
timestamp = line.split(']')[0].split('[')[1] timestamp = line.split("]")[0].split("[")[1]
text = line.split(']')[1].lstrip(':').strip() text = line.split("]")[1].lstrip(":").strip()
out.append({'timestamp': timestamp, 'text': text}) out.append({"timestamp": timestamp, "text": text})
text = "" text = ""
timestamp = "" timestamp = ""
# Start of multiline # Start of multiline
elif cur_has_timestamp: elif cur_has_timestamp:
timestamp = line.split(']')[0].split('[')[1] timestamp = line.split("]")[0].split("[")[1]
text = line.split(']')[1].lstrip(':').strip() text = line.split("]")[1].lstrip(":").strip()
# End of multiline # End of multiline
elif next_has_timestamp: elif next_has_timestamp:
text += f'\n{line.strip()}' text += f"\n{line.strip()}"
out.append({'timestamp': timestamp, 'text': text}) out.append({"timestamp": timestamp, "text": text})
text = "" text = ""
timestamp = "" timestamp = ""
# Middle of multiline # Middle of multiline
else: else:
text += f'\n{line.strip()}' text += f"\n{line.strip()}"
else: else:
out = string out = string
@ -144,7 +145,7 @@ def run_config(
if current: if current:
if os.path.islink(config_filename): if os.path.islink(config_filename):
actual_path = os.readlink(config_filename) actual_path = os.readlink(config_filename)
print(actual_path.split('-')[-1]) print(actual_path.split("-")[-1])
else: else:
print("none") print("none")
return return
@ -171,8 +172,7 @@ def run_config(
error( error(
"Can not configure without {}".format(need), "Can not configure without {}".format(need),
"No {}".format(need), "No {}".format(need),
) ))
)
sys.exit(1) sys.exit(1)
config[need] = input("Enter {}: ".format(need)) config[need] = input("Enter {}: ".format(need))
@ -196,7 +196,10 @@ def run_delete(url: str, jsonout: bool, headers: dict):
output(reply, jsonout) output(reply, jsonout)
def run_list(url: str, jsonout: bool, headers: dict, ret=False) -> Union[None, str]: def run_list(url: str,
jsonout: bool,
headers: dict,
ret=False) -> Union[None, str]:
response = requests.get(url, headers=headers) response = requests.get(url, headers=headers)
string = response.json() string = response.json()
if ret: if ret:
@ -210,6 +213,22 @@ def run_update(url: str, jsonout: bool, headers: dict):
output(response.json(), jsonout) output(response.json(), jsonout)
def run_zone(url: str,
jsonout: bool,
headers: dict,
ret=False) -> Union[None, str]:
response = requests.get(url, headers=headers)
zones = response.json()
for zone in zones:
del zone["records"]
string = zones
if ret:
return string
else:
output(string, jsonout)
# Set up the url # Set up the url
def setup_url( def setup_url(
baseurl: str, baseurl: str,
@ -248,16 +267,14 @@ def setup_url(
error( error(
"ttl only makes sense with rtype, name and zone", "ttl only makes sense with rtype, name and zone",
"Missing parameter", "Missing parameter",
) ))
)
sys.exit(1) sys.exit(1)
if rtype and (not name or not zone): if rtype and (not name or not zone):
output( output(
error( error(
"rtype only makes sense with name and zone", "rtype only makes sense with name and zone",
"Missing parameter", "Missing parameter",
) ))
)
sys.exit(1) sys.exit(1)
if name and not zone: if name and not zone:
output(error("name only makes sense with a zone", "Missing parameter")) output(error("name only makes sense with a zone", "Missing parameter"))
@ -297,8 +314,7 @@ def split_url(url: str) -> dict:
} }
# Entry point to program def get_parser() -> dict:
def main() -> int:
description = """Manage DNS records with knot dns rest api: description = """Manage DNS records with knot dns rest api:
* https://gitlab.nic.cz/knot/knot-dns-rest""" * https://gitlab.nic.cz/knot/knot-dns-rest"""
@ -366,18 +382,22 @@ def main() -> int:
subparsers.add_parser("auditlog", description=auditlog_description) subparsers.add_parser("auditlog", description=auditlog_description)
changelog_description = "View the changelog of a zone." changelog_description = "View the changelog of a zone."
changelogcmd = subparsers.add_parser("changelog", description=changelog_description) changelogcmd = subparsers.add_parser("changelog",
description=changelog_description)
changelogcmd.add_argument("-z", "--zone", required=True) changelogcmd.add_argument("-z", "--zone", required=True)
complete_description = "Generate shell completion script." complete_description = "Generate shell completion script."
completecmd = subparsers.add_parser("completion", description=complete_description) completecmd = subparsers.add_parser("completion",
description=complete_description)
completecmd.add_argument("-s", "--shell") completecmd.add_argument("-s", "--shell")
config_description = "Configure access to knot-dns-rest-api." config_description = "Configure access to knot-dns-rest-api."
configcmd = subparsers.add_parser("config", description=config_description) configcmd = subparsers.add_parser("config", description=config_description)
configcmd.add_argument("-b", "--baseurl") configcmd.add_argument("-b", "--baseurl")
configcmd.add_argument("-c", "--context") configcmd.add_argument("-c", "--context")
configcmd.add_argument("-C", "--current", action=argparse.BooleanOptionalAction) configcmd.add_argument("-C",
"--current",
action=argparse.BooleanOptionalAction)
configcmd.add_argument("-p", "--password") configcmd.add_argument("-p", "--password")
configcmd.add_argument("-u", "--username") configcmd.add_argument("-u", "--username")
@ -395,15 +415,16 @@ def main() -> int:
listcmd.add_argument("-r", "--rtype") listcmd.add_argument("-r", "--rtype")
listcmd.add_argument("-z", "--zone", required=True) listcmd.add_argument("-z", "--zone", required=True)
user_description = "View user information."
usercmd = subparsers.add_parser("user", description=user_description)
usercmd.add_argument("-u", "--username", default=None)
update_description = ( update_description = (
"Update a record in the zone. The record must exist in the zone.\n" "Update a record in the zone. The record must exist in the zone.\n")
)
update_description += ( update_description += (
"In this case --data, --name, --rtype and --ttl switches are used\n" "In this case --data, --name, --rtype and --ttl switches are used\n")
)
update_description += ( update_description += (
"for searching for the appropriate record, while the --argument\n" "for searching for the appropriate record, while the --argument\n")
)
update_description += "switches are used for updating the record." update_description += "switches are used for updating the record."
update_epilog = """Available arguments are: update_epilog = """Available arguments are:
data: New record data. data: New record data.
@ -421,7 +442,8 @@ def main() -> int:
"--argument", "--argument",
action="append", action="append",
metavar="KEY=VALUE", metavar="KEY=VALUE",
help="Specify key - value pairs to be updated: name=dns1.example.com. or data=127.0.0.1 for example. --argument can be repeated", help="Specify key - value pairs to be updated: name=dns1.example.com."
+ " or data=127.0.0.1 for example. --argument can be repeated",
required=True, required=True,
) )
updatecmd.add_argument("-d", "--data", required=True) updatecmd.add_argument("-d", "--data", required=True)
@ -430,7 +452,71 @@ def main() -> int:
updatecmd.add_argument("-t", "--ttl") updatecmd.add_argument("-t", "--ttl")
updatecmd.add_argument("-z", "--zone", required=True) updatecmd.add_argument("-z", "--zone", required=True)
zone_description = "View zone information."
zonecmd = subparsers.add_parser("zone", description=zone_description)
zonecmd.add_argument("-z", "--zone", default=None)
argcomplete.autocomplete(parser) argcomplete.autocomplete(parser)
return parser
def get_token(config) -> str:
# Authenticate
baseurl = config["baseurl"]
username = config["username"]
password = config["password"]
basic = HTTPBasicAuth(username, password)
response = requests.get(baseurl + "/user/login", auth=basic)
token = ""
try:
token = response.json()["token"]
except KeyError:
output(response.json())
except requests.exceptions.JSONDecodeError:
output(
error("Could not decode api response as JSON", "Could not decode"))
return token
def run(url, args, headers, baseurl, parser, username):
try:
if args.command == "add":
run_add(url, args.json, headers)
elif args.command == "delete":
run_delete(url, args.json, headers)
elif args.command == "list":
run_list(url, args.json, headers)
elif args.command == "update":
run_update(url, args.json, headers)
elif args.command == "user":
url = baseurl + f"/user/info/{username}"
run_list(url, args.json, headers)
elif args.command == "auditlog":
url = baseurl + "/user/auditlog"
run_log(url, args.json, headers)
elif args.command == "changelog":
url = baseurl + f"/zones/changelog/{args.zone.rstrip('.')}"
run_log(url, args.json, headers)
elif args.command == "zone":
url = baseurl + "/zones"
if args.zone:
url = url + f"/{args.zone.rstrip('.')}"
run_zone(url, args.json, headers)
else:
parser.print_help(sys.stderr)
return 2
except requests.exceptions.RequestException as e:
output(error(e, "Could not connect to server"))
except (RequestsJSONDecodeError, SimplejsonJSONDecodeError):
output(
error("Could not decode api response as JSON", "Could not decode"))
return 0
# Entry point to program
def main() -> int:
parser = get_parser()
args = parser.parse_args() args = parser.parse_args()
if args.command == "completion": if args.command == "completion":
run_complete(args.shell) run_complete(args.shell)
@ -445,35 +531,27 @@ def main() -> int:
if args.command == "config": if args.command == "config":
run_config( run_config(
config_filename, args.context, args.baseurl, args.username, args.password, args.current config_filename,
args.context,
args.baseurl,
args.username,
args.password,
args.current,
) )
return 0 return 0
if not isfile(config_filename):
print("You need to configure knotctl before proceeding")
run_config(config_filename)
config = get_config(config_filename) config = get_config(config_filename)
baseurl = config["baseurl"] baseurl = config["baseurl"]
username = config["username"] token = get_token(config)
password = config["password"] if token == "":
print("Could not get token, exiting")
# Authenticate
basic = HTTPBasicAuth(username, password)
response = requests.get(baseurl + "/user/login", auth=basic)
try:
token = response.json()["token"]
except KeyError:
output(response.json())
return 1
except requests.exceptions.JSONDecodeError:
output(error("Could not decode api response as JSON", "Could not decode"))
return 1 return 1
headers = {"Authorization": "Bearer {}".format(token)} headers = {"Authorization": "Bearer {}".format(token)}
# Route based on command # Route based on command
url = "" url = ""
ttl = None ttl = None
user = config["username"]
if "ttl" in args: if "ttl" in args:
ttl = args.ttl ttl = args.ttl
if args.command != "update": if args.command != "update":
@ -486,7 +564,10 @@ def main() -> int:
soa_url = setup_url(baseurl, None, None, zname, "SOA", None, args.zone) soa_url = setup_url(baseurl, None, None, zname, "SOA", None, args.zone)
soa_json = run_list(soa_url, True, headers, ret=True) soa_json = run_list(soa_url, True, headers, ret=True)
ttl = soa_json[0]["ttl"] ttl = soa_json[0]["ttl"]
if args.command in ["auditlog", "changelog"]: if args.command == "user":
if args.username:
user = args.username
if args.command in ["auditlog", "changelog", "user", "zone"]:
pass pass
else: else:
try: try:
@ -503,29 +584,7 @@ def main() -> int:
parser.print_help(sys.stderr) parser.print_help(sys.stderr)
return 1 return 1
try: return run(url, args, headers, baseurl, parser, user)
if args.command == "add":
run_add(url, args.json, headers)
elif args.command == "delete":
run_delete(url, args.json, headers)
elif args.command == "list":
run_list(url, args.json, headers)
elif args.command == "update":
run_update(url, args.json, headers)
elif args.command == "auditlog":
url = baseurl + "/user/auditlog"
run_log(url, args.json, headers)
elif args.command == "changelog":
url = baseurl + f"/zones/changelog/{args.zone.rstrip('.')}"
run_log(url, args.json, headers)
else:
parser.print_help(sys.stderr)
return 2
except requests.exceptions.RequestException as e:
output(error(e, "Could not connect to server"))
except (RequestsJSONDecodeError, SimplejsonJSONDecodeError):
output(error("Could not decode api response as JSON", "Could not decode"))
return 0
if __name__ == "__main__": if __name__ == "__main__":

Loading…
Cancel
Save