Compare commits

..

18 Commits
main ... main

@ -49,15 +49,16 @@ bashcompinit
source <(knotctl completion)
```
## Usage
```
usage: knotctl [-h] [--json | --no-json] {add,completion,config,delete,list,update} ...
usage: knotctl [-h] [--json | --no-json]
{add,auditlog,changelog,completion,config,delete,list,update}
...
Manage DNS records with knot dns rest api:
* https://gitlab.nic.cz/knot/knot-dns-rest
positional arguments:
{add,completion,config,delete,list,update}
{add,auditlog,changelog,completion,config,delete,list,update}
options:
-h, --help show this help message and exit
@ -134,6 +135,29 @@ options:
-s SHELL, --shell SHELL
```
### AUDITLOG
```
usage: knotctl auditlog [-h]
Audit the log file for errors.
options:
-h, --help show this help message and exit
```
### CHANGELOG
```
usage: knotctl changelog [-h] -z ZONE
View the changelog of a zone.
options:
-h, --help show this help message and exit
-z ZONE, --zone ZONE
```
### CONFIG
```
@ -167,9 +191,9 @@ options:
### LIST
```
usage: knotctl list [-h] [-d DATA] [-n NAME] [-r RTYPE] -z ZONE
usage: knotctl list [-h] [-d DATA] [-n NAME] [-r RTYPE] [-z ZONE]
List records in the zone.
List records.
options:
-h, --help show this help message and exit
@ -207,3 +231,26 @@ Available arguments are:
rtype: New record type.
ttl: New record time to live (TTL).
```
### USER
```
usage: knotctl user [-h] [-u USERNAME]
View user information.
options:
-h, --help show this help message and exit
-u USERNAME, --username USERNAME
```
### ZONE
```
usage: knotctl zone
List zones.
options:
-h, --help show this help message and exit
```

@ -12,10 +12,13 @@ from typing import Union
from urllib.parse import urlparse
import argcomplete
import openstack
import openstack.config.loader
import requests
import yaml
from requests.models import HTTPBasicAuth
from simplejson.errors import JSONDecodeError as SimplejsonJSONDecodeError
try:
from requests.exceptions import JSONDecodeError as RequestsJSONDecodeError
except ImportError:
@ -35,10 +38,28 @@ def error(description: str, error: str) -> list[dict]:
def get_config(config_filename: str):
if not isfile(config_filename):
print("You need to configure knotctl before proceeding")
run_config(config_filename)
with open(config_filename, "r") as fh:
return yaml.safe_load(fh.read())
def get_openstack_addresses(cloud: str, name: str):
conn = openstack.connect(cloud=cloud)
# List the servers
server = conn.compute.find_server(name)
if server is None:
print("Server not found")
exit(1)
openstack_addresses = []
for network in server.addresses:
for address in server.addresses[network]:
openstack_addresses.append(address)
return openstack_addresses
def nested_out(input, tabs="") -> str:
string = ""
if isinstance(input, str) or isinstance(input, int):
@ -79,6 +100,48 @@ def run_add(url: str, jsonout: bool, headers: dict):
output(out, jsonout)
def run_log(url: str, jsonout: bool, headers: dict):
response = requests.get(url, headers=headers)
string = response.content.decode("utf-8")
if jsonout:
out = []
lines = string.splitlines()
index = 0
text = ""
timestamp = ""
while index < len(lines):
line = lines[index]
index += 1
cur_has_timestamp = line.startswith("[")
next_has_timestamp = index < len(
lines) and lines[index].startswith("[")
# Simple case, just one line with timestamp
if cur_has_timestamp and next_has_timestamp:
timestamp = line.split("]")[0].split("[")[1]
text = line.split("]")[1].lstrip(":").strip()
out.append({"timestamp": timestamp, "text": text})
text = ""
timestamp = ""
# Start of multiline
elif cur_has_timestamp:
timestamp = line.split("]")[0].split("[")[1]
text = line.split("]")[1].lstrip(":").strip()
# End of multiline
elif next_has_timestamp:
text += f"\n{line.strip()}"
out.append({"timestamp": timestamp, "text": text})
text = ""
timestamp = ""
# Middle of multiline
else:
text += f"\n{line.strip()}"
else:
out = string
output(out, jsonout)
def run_complete(shell: Union[None, str]):
if not shell or shell in ["bash", "zsh"]:
os.system("register-python-argcomplete knotctl")
@ -92,13 +155,22 @@ def run_config(
config_filename: str,
context: Union[None, str] = None,
baseurl: Union[None, str] = None,
list_config: bool = False,
username: Union[None, str] = None,
password: Union[None, str] = None,
current: Union[None, str] = None,
):
if current:
if os.path.islink(config_filename):
actual_path = os.readlink(config_filename)
print(actual_path.split("-")[-1])
else:
print("none")
return
config = {"baseurl": baseurl, "username": username, "password": password}
needed = []
if context:
symlink = f'{config_filename}-{context}'
symlink = f"{config_filename}-{context}"
found = os.path.isfile(symlink)
if os.path.islink(config_filename):
os.remove(config_filename)
@ -108,6 +180,11 @@ def run_config(
config_filename = symlink
if found:
return
if list_config:
config_data = get_config(config_filename)
config_data.pop("password", None)
output(config_data)
return
if not baseurl:
needed.append("baseurl")
if not username:
@ -154,11 +231,136 @@ def run_list(url: str,
output(string, jsonout)
def run_openstack_sync(cloud: str, name: str, zone: str, headers: dict,
baseurl: str, jsonout: bool):
url = setup_url(
baseurl,
None, # arguments,
None, # data,
name,
None, # rtype,
None, # ttl,
zone,
)
current_records = run_list(url, jsonout=True, headers=headers, ret=True)
openstack_addresses = get_openstack_addresses(cloud, name)
if current_records["Code"] == 404:
for address in openstack_addresses:
rtype = None
if address["version"] == 4:
rtype = "A"
elif address["version"] == 6:
rtype = "AAAA"
if rtype:
url = setup_url(
baseurl,
None, # arguments,
address["addr"], # data,
name,
rtype,
None, # ttl,
zone,
)
run_add(url, jsonout, headers)
else:
previpv4 = False
previpv6 = False
curripv4 = False
curripv6 = False
for record in current_records:
if record.type == "A":
previpv4 = record.data
elif record.type == "AAAA":
previpv6 = record.data
for address in openstack_addresses:
rtype = None
if address.version == 4:
rtype = "A"
curripv4 = True
elif address.version == 6:
rtype = "AAAA"
curripv6 = True
if rtype and recor.type == rtype:
if record.data == address.addr:
continue
else:
url = setup_url(
baseurl,
None, # arguments,
address.addr, # data,
name,
record.type,
None, # ttl,
zone,
)
run_update(url, jsonout, headers)
if previpv4 and not curripv4:
url = setup_url(
baseurl,
None, # arguments,
previpv4, # data,
name,
"A",
None, # ttl,
zone,
)
run_delete(url, jsonout, headers)
if previpv6 and not curripv6:
url = setup_url(
baseurl,
None, # arguments,
previpv6, # data,
name,
"AAAA",
None, # ttl,
zone,
)
run_delete(url, jsonout, headers)
if curripv4 and not previpv4:
url = setup_url(
baseurl,
None, # arguments,
curripv4, # data,
name,
"A",
None, # ttl,
zone,
)
run_add(url, jsonout, headers)
if curripv6 and not previpv6:
url = setup_url(
baseurl,
None, # arguments,
curripv6, # data,
name,
"AAAA",
None, # ttl,
zone,
)
run_add(url, jsonout, headers)
def run_update(url: str, jsonout: bool, headers: dict):
response = requests.patch(url, headers=headers)
output(response.json(), jsonout)
def run_zone(url: str,
jsonout: bool,
headers: dict,
ret=False) -> Union[None, str]:
response = requests.get(url, headers=headers)
zones = response.json()
for zone in zones:
del zone["records"]
string = zones
if ret:
return string
else:
output(string, jsonout)
# Set up the url
def setup_url(
baseurl: str,
@ -176,7 +378,7 @@ def setup_url(
url += "/{}".format(zone)
if name and zone:
if name.endswith(zone.rstrip(".")):
name += '.'
name += "."
url += "/records/{}".format(name)
if zone and name and rtype:
url += "/{}".format(rtype)
@ -244,8 +446,7 @@ def split_url(url: str) -> dict:
}
# Entry point to program
def main() -> int:
def get_parser() -> dict:
description = """Manage DNS records with knot dns rest api:
* https://gitlab.nic.cz/knot/knot-dns-rest"""
@ -293,7 +494,11 @@ def main() -> int:
* https://en.wikipedia.org/wiki/Zone_file
"""
# Grab user input
parser = argparse.ArgumentParser(description=description, epilog=epilog, formatter_class=argparse.RawDescriptionHelpFormatter)
parser = argparse.ArgumentParser(
description=description,
epilog=epilog,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("--json", action=argparse.BooleanOptionalAction)
subparsers = parser.add_subparsers(dest="command")
@ -305,14 +510,30 @@ def main() -> int:
addcmd.add_argument("-t", "--ttl")
addcmd.add_argument("-z", "--zone", required=True)
auditlog_description = "Audit the log file for errors."
subparsers.add_parser("auditlog", description=auditlog_description)
changelog_description = "View the changelog of a zone."
changelogcmd = subparsers.add_parser("changelog",
description=changelog_description)
changelogcmd.add_argument("-z", "--zone", required=True)
complete_description = "Generate shell completion script."
completecmd = subparsers.add_parser("completion", description=complete_description)
completecmd = subparsers.add_parser("completion",
description=complete_description)
completecmd.add_argument("-s", "--shell")
config_description = "Configure access to knot-dns-rest-api."
configcmd = subparsers.add_parser("config", description=config_description)
configcmd.add_argument("-b", "--baseurl")
configcmd.add_argument("-c", "--context")
configcmd.add_argument("-C",
"--current",
action=argparse.BooleanOptionalAction)
configcmd.add_argument("-l",
"--list",
action=argparse.BooleanOptionalAction,
dest="list_config")
configcmd.add_argument("-p", "--password")
configcmd.add_argument("-u", "--username")
@ -323,31 +544,49 @@ def main() -> int:
deletecmd.add_argument("-r", "--rtype")
deletecmd.add_argument("-z", "--zone", required=True)
list_description = "List records in the zone."
list_description = "List records."
listcmd = subparsers.add_parser("list", description=list_description)
listcmd.add_argument("-d", "--data")
listcmd.add_argument("-n", "--name")
listcmd.add_argument("-r", "--rtype")
listcmd.add_argument("-z", "--zone", required=True)
update_description = "Update a record in the zone. The record must exist in the zone.\n"
update_description += "In this case --data, --name, --rtype and --ttl switches are used\n"
update_description += "for searching for the appropriate record, while the --argument\n"
listcmd.add_argument("-z", "--zone", required=False)
openstack_description = "Sync records with openstack."
openstackcmd = subparsers.add_parser("openstack-sync",
description=openstack_description)
openstackcmd.add_argument("-n", "--name", required=True)
openstackcmd.add_argument("-c", "--cloud", required=True)
openstackcmd.add_argument("-z", "--zone", required=True)
user_description = "View user information."
usercmd = subparsers.add_parser("user", description=user_description)
usercmd.add_argument("-u", "--username", default=None)
update_description = (
"Update a record in the zone. The record must exist in the zone.\n")
update_description += (
"In this case --data, --name, --rtype and --ttl switches are used\n")
update_description += (
"for searching for the appropriate record, while the --argument\n")
update_description += "switches are used for updating the record."
update_epilog = """Available arguments are:
data: New record data.
name: New record domain name.
rtype: New record type.
ttl: New record time to live (TTL)."""
updatecmd = subparsers.add_parser("update", description=update_description, epilog=update_epilog, formatter_class=argparse.RawDescriptionHelpFormatter )
updatecmd = subparsers.add_parser(
"update",
description=update_description,
epilog=update_epilog,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
updatecmd.add_argument(
"-a",
"--argument",
nargs="*",
action="append",
metavar="KEY=VALUE",
help=
"Specify key - value pairs to be updated: name=dns1.example.com. or data=127.0.0.1 for example. --argument can be repeated",
help="Specify key - value pairs to be updated: name=dns1.example.com."
+ " or data=127.0.0.1 for example. --argument can be repeated",
required=True,
)
updatecmd.add_argument("-d", "--data", required=True)
@ -356,7 +595,71 @@ def main() -> int:
updatecmd.add_argument("-t", "--ttl")
updatecmd.add_argument("-z", "--zone", required=True)
zone_description = "View zones."
subparsers.add_parser("zone", description=zone_description)
argcomplete.autocomplete(parser)
return parser
def get_token(config) -> str:
# Authenticate
baseurl = config["baseurl"]
username = config["username"]
password = config["password"]
basic = HTTPBasicAuth(username, password)
response = requests.get(baseurl + "/user/login", auth=basic)
token = ""
try:
token = response.json()["token"]
except KeyError:
output(response.json())
except requests.exceptions.JSONDecodeError:
output(
error("Could not decode api response as JSON", "Could not decode"))
return token
def run(url, args, headers, baseurl, parser, username):
try:
if args.command == "add":
run_add(url, args.json, headers)
elif args.command == "delete":
run_delete(url, args.json, headers)
elif args.command == "list":
run_list(url, args.json, headers)
elif args.command == "update":
run_update(url, args.json, headers)
elif args.command == "user":
url = baseurl + f"/user/info/{username}"
run_list(url, args.json, headers)
elif args.command == "auditlog":
url = baseurl + "/user/auditlog"
run_log(url, args.json, headers)
elif args.command == "changelog":
url = baseurl + f"/zones/changelog/{args.zone.rstrip('.')}"
run_log(url, args.json, headers)
elif args.command == "zone":
url = baseurl + "/zones"
run_zone(url, args.json, headers)
elif args.command == "openstack-sync":
run_openstack_sync(args.cloud, args.name, args.zone, headers,
baseurl, args.json)
else:
parser.print_help(sys.stderr)
return 2
except requests.exceptions.RequestException as e:
output(error(e, "Could not connect to server"))
except (RequestsJSONDecodeError, SimplejsonJSONDecodeError):
output(
error("Could not decode api response as JSON", "Could not decode"))
return 0
# Entry point to program
def main() -> int:
parser = get_parser()
args = parser.parse_args()
if args.command == "completion":
run_complete(args.shell)
@ -370,35 +673,29 @@ def main() -> int:
mkdir(config_basepath)
if args.command == "config":
run_config(config_filename, args.context, args.baseurl, args.username,
args.password)
run_config(
config_filename,
args.context,
args.baseurl,
args.list_config,
args.username,
args.password,
args.current,
)
return 0
if not isfile(config_filename):
print("You need to configure knotctl before proceeding")
run_config(config_filename)
config = get_config(config_filename)
baseurl = config["baseurl"]
username = config["username"]
password = config["password"]
# Authenticate
basic = HTTPBasicAuth(username, password)
response = requests.get(baseurl + "/user/login", auth=basic)
try:
token = response.json()["token"]
except KeyError:
output(response.json())
return 1
except requests.exceptions.JSONDecodeError:
output(
error("Could not decode api response as JSON", "Could not decode"))
token = get_token(config)
if token == "":
print("Could not get token, exiting")
return 1
headers = {"Authorization": "Bearer {}".format(token)}
# Route based on command
url = ""
ttl = None
user = config["username"]
if "ttl" in args:
ttl = args.ttl
if args.command != "update":
@ -411,34 +708,29 @@ def main() -> int:
soa_url = setup_url(baseurl, None, None, zname, "SOA", None, args.zone)
soa_json = run_list(soa_url, True, headers, ret=True)
ttl = soa_json[0]["ttl"]
try:
url = setup_url(
baseurl,
args.argument,
args.data,
args.name,
args.rtype,
ttl,
args.zone,
)
except AttributeError:
parser.print_help(sys.stderr)
return 1
try:
if args.command == "add":
run_add(url, args.json, headers)
elif args.command == "delete":
run_delete(url, args.json, headers)
elif args.command == "list":
run_list(url, args.json, headers)
elif args.command == "update":
run_update(url, args.json, headers)
except (RequestsJSONDecodeError, SimplejsonJSONDecodeError):
output(
error("Could not decode api response as JSON", "Could not decode"))
return 0
if args.command == "user":
if args.username:
user = args.username
if args.command in [
"auditlog", "changelog", "openstack-sync", "user", "zone"
]:
pass
else:
try:
url = setup_url(
baseurl,
args.argument,
args.data,
args.name,
args.rtype,
ttl,
args.zone,
)
except AttributeError:
parser.print_help(sys.stderr)
return 1
return run(url, args, headers, baseurl, parser, user)
if __name__ == "__main__":

@ -16,7 +16,7 @@ classifiers=[
"Operating System :: OS Independent",
]
requires-python= ">=3.9"
version = "0.0.7"
version = "0.1.1"
dependencies = [
"argcomplete==2.0.0",

@ -2,3 +2,4 @@ argcomplete==2.0.0
pyyaml==6.0.1
requests==2.27.1
simplejson==3.17.6
openstacksdk==4.2.0

Loading…
Cancel
Save