Compare commits

..

9 Commits
main ... main

@ -49,15 +49,16 @@ bashcompinit
source <(knotctl completion)
```
## Usage
```
usage: knotctl [-h] [--json | --no-json] {add,completion,config,delete,list,update} ...
usage: knotctl [-h] [--json | --no-json]
{add,auditlog,changelog,completion,config,delete,list,update}
...
Manage DNS records with knot dns rest api:
* https://gitlab.nic.cz/knot/knot-dns-rest
positional arguments:
{add,completion,config,delete,list,update}
{add,auditlog,changelog,completion,config,delete,list,update}
options:
-h, --help show this help message and exit
@ -134,6 +135,29 @@ options:
-s SHELL, --shell SHELL
```
### AUDITLOG
```
usage: knotctl auditlog [-h]
Audit the log file for errors.
options:
-h, --help show this help message and exit
```
### CHANGELOG
```
usage: knotctl changelog [-h] -z ZONE
View the changelog of a zone.
options:
-h, --help show this help message and exit
-z ZONE, --zone ZONE
```
### CONFIG
```

@ -7,7 +7,7 @@ import os
import sys
import urllib.parse
from os import environ, mkdir
from os.path import isdir, isfile, join
from os.path import isdir, isfile, islink, join, split
from typing import Union
from urllib.parse import urlparse
@ -16,6 +16,7 @@ import requests
import yaml
from requests.models import HTTPBasicAuth
from simplejson.errors import JSONDecodeError as SimplejsonJSONDecodeError
try:
from requests.exceptions import JSONDecodeError as RequestsJSONDecodeError
except ImportError:
@ -45,8 +46,7 @@ def nested_out(input, tabs="") -> str:
string += "{}\n".format(input)
elif isinstance(input, dict):
for key, value in input.items():
string += "{}{}: {}".format(tabs, key,
nested_out(value, tabs + " "))
string += "{}{}: {}".format(tabs, key, nested_out(value, tabs + " "))
elif isinstance(input, list):
for entry in input:
string += "{}\n{}".format(tabs, nested_out(entry, tabs + " "))
@ -70,15 +70,60 @@ def run_add(url: str, jsonout: bool, headers: dict):
out = response.json()
if isinstance(out, list):
for record in out:
if (record["data"] == parsed["data"]
if (
record["data"] == parsed["data"]
and record["name"] == parsed["name"]
and record["rtype"] == parsed["rtype"]):
and record["rtype"] == parsed["rtype"]
):
output(record, jsonout)
break
else:
output(out, jsonout)
def run_log(url: str, jsonout: bool, headers: dict):
response = requests.get(url, headers=headers)
string = response.content.decode("utf-8")
if jsonout:
out = []
lines = string.splitlines()
index = 0
text = ""
timestamp = ""
while index < len(lines):
line = lines[index]
index += 1
cur_has_timestamp = line.startswith("[")
next_has_timestamp = index < len(lines) and lines[index].startswith(
"["
)
# Simple case, just one line with timestamp
if cur_has_timestamp and next_has_timestamp:
timestamp = line.split(']')[0].split('[')[1]
text = line.split(']')[1].lstrip(':').strip()
out.append({'timestamp': timestamp, 'text': text})
text = ""
timestamp = ""
# Start of multiline
elif cur_has_timestamp:
timestamp = line.split(']')[0].split('[')[1]
text = line.split(']')[1].lstrip(':').strip()
# End of multiline
elif next_has_timestamp:
text += f'\n{line.strip()}'
out.append({'timestamp': timestamp, 'text': text})
text = ""
timestamp = ""
# Middle of multiline
else:
text += f'\n{line.strip()}'
else:
out = string
output(out, jsonout)
def run_complete(shell: Union[None, str]):
if not shell or shell in ["bash", "zsh"]:
os.system("register-python-argcomplete knotctl")
@ -94,11 +139,19 @@ def run_config(
baseurl: Union[None, str] = None,
username: Union[None, str] = None,
password: Union[None, str] = None,
current: Union[None, str] = None,
):
if current:
if os.path.islink(config_filename):
actual_path = os.readlink(config_filename)
print(actual_path.split('-')[-1])
else:
print("none")
return
config = {"baseurl": baseurl, "username": username, "password": password}
needed = []
if context:
symlink = f'{config_filename}-{context}'
symlink = f"{config_filename}-{context}"
found = os.path.isfile(symlink)
if os.path.islink(config_filename):
os.remove(config_filename)
@ -118,7 +171,8 @@ def run_config(
error(
"Can not configure without {}".format(need),
"No {}".format(need),
))
)
)
sys.exit(1)
config[need] = input("Enter {}: ".format(need))
@ -142,10 +196,7 @@ def run_delete(url: str, jsonout: bool, headers: dict):
output(reply, jsonout)
def run_list(url: str,
jsonout: bool,
headers: dict,
ret=False) -> Union[None, str]:
def run_list(url: str, jsonout: bool, headers: dict, ret=False) -> Union[None, str]:
response = requests.get(url, headers=headers)
string = response.json()
if ret:
@ -176,7 +227,7 @@ def setup_url(
url += "/{}".format(zone)
if name and zone:
if name.endswith(zone.rstrip(".")):
name += '.'
name += "."
url += "/records/{}".format(name)
if zone and name and rtype:
url += "/{}".format(rtype)
@ -197,14 +248,16 @@ def setup_url(
error(
"ttl only makes sense with rtype, name and zone",
"Missing parameter",
))
)
)
sys.exit(1)
if rtype and (not name or not zone):
output(
error(
"rtype only makes sense with name and zone",
"Missing parameter",
))
)
)
sys.exit(1)
if name and not zone:
output(error("name only makes sense with a zone", "Missing parameter"))
@ -293,7 +346,11 @@ def main() -> int:
* https://en.wikipedia.org/wiki/Zone_file
"""
# Grab user input
parser = argparse.ArgumentParser(description=description, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter)
parser = argparse.ArgumentParser(
description=description,
epilog=epilog,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("--json", action=argparse.BooleanOptionalAction)
subparsers = parser.add_subparsers(dest="command")
@ -305,6 +362,13 @@ def main() -> int:
addcmd.add_argument("-t", "--ttl")
addcmd.add_argument("-z", "--zone", required=True)
auditlog_description = "Audit the log file for errors."
subparsers.add_parser("auditlog", description=auditlog_description)
changelog_description = "View the changelog of a zone."
changelogcmd = subparsers.add_parser("changelog", description=changelog_description)
changelogcmd.add_argument("-z", "--zone", required=True)
complete_description = "Generate shell completion script."
completecmd = subparsers.add_parser("completion", description=complete_description)
completecmd.add_argument("-s", "--shell")
@ -313,6 +377,7 @@ def main() -> int:
configcmd = subparsers.add_parser("config", description=config_description)
configcmd.add_argument("-b", "--baseurl")
configcmd.add_argument("-c", "--context")
configcmd.add_argument("-C", "--current", action=argparse.BooleanOptionalAction)
configcmd.add_argument("-p", "--password")
configcmd.add_argument("-u", "--username")
@ -330,24 +395,33 @@ def main() -> int:
listcmd.add_argument("-r", "--rtype")
listcmd.add_argument("-z", "--zone", required=True)
update_description = "Update a record in the zone. The record must exist in the zone.\n"
update_description += "In this case --data, --name, --rtype and --ttl switches are used\n"
update_description += "for searching for the appropriate record, while the --argument\n"
update_description = (
"Update a record in the zone. The record must exist in the zone.\n"
)
update_description += (
"In this case --data, --name, --rtype and --ttl switches are used\n"
)
update_description += (
"for searching for the appropriate record, while the --argument\n"
)
update_description += "switches are used for updating the record."
update_epilog = """Available arguments are:
data: New record data.
name: New record domain name.
rtype: New record type.
ttl: New record time to live (TTL)."""
updatecmd = subparsers.add_parser("update", description=update_description, epilog=update_epilog, formatter_class=argparse.RawDescriptionHelpFormatter )
updatecmd = subparsers.add_parser(
"update",
description=update_description,
epilog=update_epilog,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
updatecmd.add_argument(
"-a",
"--argument",
nargs="*",
action="append",
metavar="KEY=VALUE",
help=
"Specify key - value pairs to be updated: name=dns1.example.com. or data=127.0.0.1 for example. --argument can be repeated",
help="Specify key - value pairs to be updated: name=dns1.example.com. or data=127.0.0.1 for example. --argument can be repeated",
required=True,
)
updatecmd.add_argument("-d", "--data", required=True)
@ -370,8 +444,9 @@ def main() -> int:
mkdir(config_basepath)
if args.command == "config":
run_config(config_filename, args.context, args.baseurl, args.username,
args.password)
run_config(
config_filename, args.context, args.baseurl, args.username, args.password, args.current
)
return 0
if not isfile(config_filename):
@ -392,12 +467,12 @@ def main() -> int:
output(response.json())
return 1
except requests.exceptions.JSONDecodeError:
output(
error("Could not decode api response as JSON", "Could not decode"))
output(error("Could not decode api response as JSON", "Could not decode"))
return 1
headers = {"Authorization": "Bearer {}".format(token)}
# Route based on command
url = ""
ttl = None
if "ttl" in args:
ttl = args.ttl
@ -411,7 +486,9 @@ def main() -> int:
soa_url = setup_url(baseurl, None, None, zname, "SOA", None, args.zone)
soa_json = run_list(soa_url, True, headers, ret=True)
ttl = soa_json[0]["ttl"]
if args.command in ["auditlog", "changelog"]:
pass
else:
try:
url = setup_url(
baseurl,
@ -435,9 +512,19 @@ def main() -> int:
run_list(url, args.json, headers)
elif args.command == "update":
run_update(url, args.json, headers)
elif args.command == "auditlog":
url = baseurl + "/user/auditlog"
run_log(url, args.json, headers)
elif args.command == "changelog":
url = baseurl + f"/zones/changelog/{args.zone.rstrip('.')}"
run_log(url, args.json, headers)
else:
parser.print_help(sys.stderr)
return 2
except requests.exceptions.RequestException as e:
output(error(e, "Could not connect to server"))
except (RequestsJSONDecodeError, SimplejsonJSONDecodeError):
output(
error("Could not decode api response as JSON", "Could not decode"))
output(error("Could not decode api response as JSON", "Could not decode"))
return 0

@ -16,7 +16,7 @@ classifiers=[
"Operating System :: OS Independent",
]
requires-python= ">=3.9"
version = "0.0.7"
version = "0.1.0"
dependencies = [
"argcomplete==2.0.0",

Loading…
Cancel
Save