knotctl/knotctl/__init__.py

513 lines
17 KiB
Python
Raw Normal View History

2022-10-23 10:19:03 +02:00
#!/usr/bin/env python3
import argparse
2022-10-24 13:24:38 +02:00
import getpass
2022-10-24 11:11:41 +02:00
import json
2022-10-24 19:13:24 +02:00
import os
2022-10-24 11:11:41 +02:00
import sys
2022-10-27 15:47:54 +02:00
import urllib.parse
2022-10-24 13:24:38 +02:00
from os import environ, mkdir
2024-09-30 16:34:34 +02:00
from os.path import isdir, isfile, join, split
2022-10-23 10:19:03 +02:00
from typing import Union
from urllib.parse import urlparse
2022-10-23 10:19:03 +02:00
import argcomplete
import requests
2022-10-24 13:24:38 +02:00
import yaml
2022-10-23 10:19:03 +02:00
from requests.models import HTTPBasicAuth
2022-10-25 09:10:11 +02:00
from simplejson.errors import JSONDecodeError as SimplejsonJSONDecodeError
2024-09-30 16:34:34 +02:00
try:
from requests.exceptions import JSONDecodeError as RequestsJSONDecodeError
except ImportError:
from requests.exceptions import InvalidJSONError as RequestsJSONDecodeError
2022-10-23 10:19:03 +02:00
2022-10-24 19:13:24 +02:00
# Helper functions
2022-10-27 15:47:54 +02:00
def error(description: str, error: str) -> list[dict]:
2022-10-24 11:11:41 +02:00
response = []
reply = {}
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/406
reply["Code"] = 406
reply["Description"] = description
reply["Error"] = error
response.append(reply)
return response
2022-10-24 13:24:38 +02:00
def get_config(config_filename: str):
with open(config_filename, "r") as fh:
return yaml.safe_load(fh.read())
2022-10-24 11:11:41 +02:00
def nested_out(input, tabs="") -> str:
string = ""
2022-10-24 17:49:22 +02:00
if isinstance(input, str) or isinstance(input, int):
2022-10-24 11:11:41 +02:00
string += "{}\n".format(input)
elif isinstance(input, dict):
for key, value in input.items():
2024-09-30 16:34:34 +02:00
string += "{}{}: {}".format(tabs, key, nested_out(value, tabs + " "))
2022-10-24 11:11:41 +02:00
elif isinstance(input, list):
for entry in input:
string += "{}\n{}".format(tabs, nested_out(entry, tabs + " "))
return string
2022-10-27 15:47:54 +02:00
def output(response: list[dict], jsonout: bool = False):
2022-10-24 19:13:24 +02:00
try:
if jsonout:
print(json.dumps(response))
else:
print(nested_out(response))
except BrokenPipeError:
pass
# Define the runner for each command
2022-10-24 13:24:38 +02:00
def run_add(url: str, jsonout: bool, headers: dict):
parsed = split_url(url)
2022-10-24 11:11:41 +02:00
response = requests.put(url, headers=headers)
out = response.json()
if isinstance(out, list):
for record in out:
2024-09-30 16:34:34 +02:00
if (
record["data"] == parsed["data"]
and record["name"] == parsed["name"]
and record["rtype"] == parsed["rtype"]
):
output(record, jsonout)
break
else:
output(out, jsonout)
2022-10-23 10:19:03 +02:00
2024-09-30 16:34:34 +02:00
def run_audit(url: str, jsonout: bool, headers: dict):
response = requests.get(url, headers=headers)
string = response.content.decode("utf-8")
if jsonout:
out = []
lines = string.splitlines()
index = 0
text = ""
timestamp = ""
while index < len(lines):
line = lines[index]
index += 1
cur_has_timestamp = line.startswith("[")
next_has_timestamp = index < len(lines) and lines[index].startswith(
"["
)
# Simple case, just one line with timestamp
if cur_has_timestamp and next_has_timestamp:
text = ":". join(line.split(':')[1:])
timestamp = line.split(']')[0].split('[')[1]
out.append({'timestamp': timestamp, 'text': text})
text = ""
timestamp = ""
elif cur_has_timestamp:
timestamp = line.split(']')[0].split('[')[1]
text = ":". join(line.split(':')[1:])
elif next_has_timestamp:
text += f'\n{line}'
out.append({'timestamp': timestamp, 'text': text})
text = ""
timestamp = ""
else:
text += f'\n{line}'
else:
out = string
output(out, jsonout)
2022-10-24 19:13:24 +02:00
def run_complete(shell: Union[None, str]):
if not shell or shell in ["bash", "zsh"]:
os.system("register-python-argcomplete knotctl")
elif shell == "fish":
os.system("register-python-argcomplete --shell fish knotctl")
elif shell == "tcsh":
2022-10-25 08:27:48 +02:00
os.system("register-python-argcomplete --shell tcsh knotctl")
2022-10-24 19:13:24 +02:00
2022-10-24 13:24:38 +02:00
def run_config(
config_filename: str,
context: Union[None, str] = None,
2022-10-24 13:24:38 +02:00
baseurl: Union[None, str] = None,
username: Union[None, str] = None,
password: Union[None, str] = None,
):
config = {"baseurl": baseurl, "username": username, "password": password}
needed = []
if context:
2024-09-30 16:34:34 +02:00
symlink = f"{config_filename}-{context}"
found = os.path.isfile(symlink)
if os.path.islink(config_filename):
os.remove(config_filename)
elif os.path.isfile(config_filename):
os.rename(config_filename, symlink)
os.symlink(symlink, config_filename)
config_filename = symlink
if found:
return
2022-10-24 13:24:38 +02:00
if not baseurl:
needed.append("baseurl")
if not username:
needed.append("username")
for need in needed:
if need == "":
output(
2023-01-09 08:38:09 +01:00
error(
"Can not configure without {}".format(need),
"No {}".format(need),
2024-09-30 16:34:34 +02:00
)
)
2022-10-24 13:24:38 +02:00
sys.exit(1)
2023-01-09 08:38:44 +01:00
config[need] = input("Enter {}: ".format(need))
2022-10-24 13:24:38 +02:00
if not password:
try:
config["password"] = getpass.getpass()
except EOFError:
output(error("Can not configure without password", "No password"))
sys.exit(1)
with open(config_filename, "w") as fh:
fh.write(yaml.dump(config))
def run_delete(url: str, jsonout: bool, headers: dict):
2022-10-24 11:11:41 +02:00
response = requests.delete(url, headers=headers)
2022-10-24 17:49:22 +02:00
reply = response.json()
if not reply and response.status_code == requests.codes.ok:
reply = [{"Code": 200, "Description": "{} deleted".format(url)}]
output(reply, jsonout)
2022-10-24 11:11:41 +02:00
2022-10-23 10:19:03 +02:00
2024-09-30 16:34:34 +02:00
def run_list(url: str, jsonout: bool, headers: dict, ret=False) -> Union[None, str]:
2022-10-24 11:11:41 +02:00
response = requests.get(url, headers=headers)
2022-11-10 12:50:18 +01:00
string = response.json()
if ret:
return string
else:
output(string, jsonout)
2022-10-23 10:19:03 +02:00
2022-10-24 11:11:41 +02:00
2022-10-24 13:24:38 +02:00
def run_update(url: str, jsonout: bool, headers: dict):
2022-10-24 11:11:41 +02:00
response = requests.patch(url, headers=headers)
output(response.json(), jsonout)
# Set up the url
def setup_url(
2022-10-24 13:24:38 +02:00
baseurl: str,
2022-10-27 15:47:54 +02:00
arguments: Union[None, list[str]],
2022-10-24 17:49:22 +02:00
data: Union[None, str],
2022-10-23 10:19:03 +02:00
name: Union[None, str],
rtype: Union[None, str],
2022-10-24 17:49:22 +02:00
ttl: Union[None, str],
2022-10-23 10:19:03 +02:00
zone: Union[None, str],
2022-10-24 11:11:41 +02:00
) -> str:
2022-10-24 13:24:38 +02:00
url = baseurl + "/zones"
2022-10-23 10:19:03 +02:00
if zone:
2022-10-25 10:55:34 +02:00
if not zone.endswith("."):
zone += "."
2022-10-24 11:11:41 +02:00
url += "/{}".format(zone)
2022-10-23 10:19:03 +02:00
if name and zone:
if name.endswith(zone.rstrip(".")):
2024-09-30 16:34:34 +02:00
name += "."
2022-10-23 10:19:03 +02:00
url += "/records/{}".format(name)
if zone and name and rtype:
url += "/{}".format(rtype)
if data and zone and name and rtype:
2022-10-24 17:49:22 +02:00
url += "/{}".format(data)
if ttl and data and zone and name and rtype:
url += "/{}".format(ttl)
2022-10-27 15:47:54 +02:00
if data and zone and name and rtype and arguments:
url += "?"
for arg in arguments:
if not url.endswith("?"):
url += "&"
key, value = arg.split("=")
url += key + "=" + urllib.parse.quote_plus(value)
2022-10-24 17:49:22 +02:00
if ttl and (not rtype or not name or not zone):
output(
2023-01-09 08:38:09 +01:00
error(
"ttl only makes sense with rtype, name and zone",
"Missing parameter",
2024-09-30 16:34:34 +02:00
)
)
2022-10-24 17:49:22 +02:00
sys.exit(1)
2022-10-24 11:11:41 +02:00
if rtype and (not name or not zone):
2022-10-24 17:49:22 +02:00
output(
2023-01-09 08:38:09 +01:00
error(
"rtype only makes sense with name and zone",
"Missing parameter",
2024-09-30 16:34:34 +02:00
)
)
2022-10-24 11:11:41 +02:00
sys.exit(1)
if name and not zone:
output(error("name only makes sense with a zone", "Missing parameter"))
sys.exit(1)
return url
2022-10-23 10:19:03 +02:00
def split_url(url: str) -> dict:
parsed = urlparse(url, allow_fragments=False)
path = parsed.path
query = parsed.query
arguments: Union[None, list[str]] = query.split("&")
path_arr = path.split("/")
data: Union[None, str] = None
name: Union[None, str] = None
rtype: Union[None, str] = None
ttl: Union[None, str] = None
zone: Union[None, str] = None
if len(path_arr) > 2:
zone = path_arr[2]
if len(path_arr) > 4:
name = path_arr[4]
if len(path_arr) > 5:
rtype = path_arr[5]
if len(path_arr) > 6:
data = path_arr[6]
if len(path_arr) > 7:
ttl = path_arr[7]
return {
"arguments": arguments,
"data": data,
"name": name,
"rtype": rtype,
"ttl": ttl,
"zone": zone,
}
2022-10-24 19:13:24 +02:00
# Entry point to program
2022-10-24 13:24:38 +02:00
def main() -> int:
description = """Manage DNS records with knot dns rest api:
* https://gitlab.nic.cz/knot/knot-dns-rest"""
epilog = """
The Domain Name System specifies a database of information
elements for network resources. The types of information
elements are categorized and organized with a list of DNS
record types, the resource records (RRs). Each record has a
name, a type, an expiration time (time to live), and
type-specific data.
The following is a list of terms used in this program:
----------------------------------------------------------------
| Vocabulary | Description |
----------------------------------------------------------------
| zone | A DNS zone is a specific portion of the DNS |
| | namespace in the Domain Name System (DNS), |
| | which a specific organization or administrator |
| | manages. |
----------------------------------------------------------------
| name | In the Internet, a domain name is a string that |
| | identifies a realm of administrative autonomy, |
| | authority or control. Domain names are often |
| | used to identify services provided through the |
| | Internet, such as websites, email services and |
| | more. |
----------------------------------------------------------------
| rtype | A record type indicates the format of the data |
| | and it gives a hint of its intended use. For |
| | example, the A record is used to translate from |
| | a domain name to an IPv4 address, the NS record |
| | lists which name servers can answer lookups on |
| | a DNS zone, and the MX record specifies the |
| | mail server used to handle mail for a domain |
| | specified in an e-mail address. |
----------------------------------------------------------------
| data | A records data is of type-specific relevance, |
| | such as the IP address for address records, or |
| | the priority and hostname for MX records. |
----------------------------------------------------------------
This information was compiled from Wikipedia:
* https://en.wikipedia.org/wiki/DNS_zone
* https://en.wikipedia.org/wiki/Domain_Name_System
* https://en.wikipedia.org/wiki/Zone_file
"""
2022-10-24 13:24:38 +02:00
# Grab user input
2024-09-30 16:34:34 +02:00
parser = argparse.ArgumentParser(
description=description,
epilog=epilog,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
2022-10-24 13:24:38 +02:00
parser.add_argument("--json", action=argparse.BooleanOptionalAction)
subparsers = parser.add_subparsers(dest="command")
add_description = "Add a new record to the zone."
addcmd = subparsers.add_parser("add", description=add_description)
2022-10-24 17:49:22 +02:00
addcmd.add_argument("-d", "--data", required=True)
addcmd.add_argument("-n", "--name", required=True)
addcmd.add_argument("-r", "--rtype", required=True)
2022-11-10 12:50:18 +01:00
addcmd.add_argument("-t", "--ttl")
2022-10-24 17:49:22 +02:00
addcmd.add_argument("-z", "--zone", required=True)
2024-09-30 16:34:34 +02:00
audit_description = "Audit the log file for errors."
subparsers.add_parser("audit", description=audit_description)
complete_description = "Generate shell completion script."
completecmd = subparsers.add_parser("completion", description=complete_description)
2022-10-24 19:13:24 +02:00
completecmd.add_argument("-s", "--shell")
config_description = "Configure access to knot-dns-rest-api."
configcmd = subparsers.add_parser("config", description=config_description)
2022-10-24 17:49:22 +02:00
configcmd.add_argument("-b", "--baseurl")
configcmd.add_argument("-c", "--context")
2022-10-24 17:49:22 +02:00
configcmd.add_argument("-p", "--password")
configcmd.add_argument("-u", "--username")
delete_description = "Delete a record from the zone."
deletecmd = subparsers.add_parser("delete", description=delete_description)
2022-10-24 17:49:22 +02:00
deletecmd.add_argument("-d", "--data")
deletecmd.add_argument("-n", "--name")
deletecmd.add_argument("-r", "--rtype")
deletecmd.add_argument("-z", "--zone", required=True)
list_description = "List records in the zone."
listcmd = subparsers.add_parser("list", description=list_description)
2022-10-24 17:49:22 +02:00
listcmd.add_argument("-d", "--data")
listcmd.add_argument("-n", "--name")
listcmd.add_argument("-r", "--rtype")
2024-06-24 16:06:49 +02:00
listcmd.add_argument("-z", "--zone", required=True)
2022-10-24 17:49:22 +02:00
2024-09-30 16:34:34 +02:00
update_description = (
"Update a record in the zone. The record must exist in the zone.\n"
)
update_description += (
"In this case --data, --name, --rtype and --ttl switches are used\n"
)
update_description += (
"for searching for the appropriate record, while the --argument\n"
)
update_description += "switches are used for updating the record."
update_epilog = """Available arguments are:
data: New record data.
name: New record domain name.
rtype: New record type.
ttl: New record time to live (TTL)."""
2024-09-30 16:34:34 +02:00
updatecmd = subparsers.add_parser(
"update",
description=update_description,
epilog=update_epilog,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
2022-10-27 15:47:54 +02:00
updatecmd.add_argument(
"-a",
"--argument",
nargs="*",
action="append",
metavar="KEY=VALUE",
2024-09-30 16:34:34 +02:00
help="Specify key - value pairs to be updated: name=dns1.example.com. or data=127.0.0.1 for example. --argument can be repeated",
2022-10-27 15:47:54 +02:00
required=True,
)
2022-10-24 17:49:22 +02:00
updatecmd.add_argument("-d", "--data", required=True)
updatecmd.add_argument("-n", "--name", required=True)
updatecmd.add_argument("-r", "--rtype", required=True)
2022-10-27 15:47:54 +02:00
updatecmd.add_argument("-t", "--ttl")
2022-10-24 17:49:22 +02:00
updatecmd.add_argument("-z", "--zone", required=True)
2022-10-24 13:24:38 +02:00
argcomplete.autocomplete(parser)
args = parser.parse_args()
if args.command == "completion":
2022-10-24 19:13:24 +02:00
run_complete(args.shell)
return 0
# Make sure we have config
config_basepath = join(environ["HOME"], ".knot")
config_filename = join(config_basepath, "config")
if not isdir(config_basepath):
mkdir(config_basepath)
2022-10-25 08:27:48 +02:00
if args.command == "config":
2024-09-30 16:34:34 +02:00
run_config(
config_filename, args.context, args.baseurl, args.username, args.password
)
2022-10-25 08:27:48 +02:00
return 0
2022-10-24 19:13:24 +02:00
if not isfile(config_filename):
2022-10-25 08:27:48 +02:00
print("You need to configure knotctl before proceeding")
run_config(config_filename)
2022-10-24 19:13:24 +02:00
config = get_config(config_filename)
baseurl = config["baseurl"]
username = config["username"]
password = config["password"]
# Authenticate
basic = HTTPBasicAuth(username, password)
response = requests.get(baseurl + "/user/login", auth=basic)
try:
token = response.json()["token"]
except KeyError:
output(response.json())
return 1
2023-06-26 10:54:49 +02:00
except requests.exceptions.JSONDecodeError:
2024-09-30 16:34:34 +02:00
output(error("Could not decode api response as JSON", "Could not decode"))
2023-06-26 09:18:26 +02:00
return 1
2022-10-24 19:13:24 +02:00
headers = {"Authorization": "Bearer {}".format(token)}
2022-10-24 13:24:38 +02:00
# Route based on command
2022-10-24 17:49:22 +02:00
ttl = None
2023-01-09 08:38:09 +01:00
if "ttl" in args:
2022-10-24 17:49:22 +02:00
ttl = args.ttl
2022-11-10 09:54:38 +01:00
if args.command != "update":
args.argument = None
2022-11-10 12:50:18 +01:00
if args.command == "add" and not ttl:
2023-01-09 08:38:09 +01:00
if args.zone.endswith("."):
2022-11-10 12:50:18 +01:00
zname = args.zone
else:
2023-01-09 08:38:09 +01:00
zname = args.zone + "."
soa_url = setup_url(baseurl, None, None, zname, "SOA", None, args.zone)
2022-11-10 12:50:18 +01:00
soa_json = run_list(soa_url, True, headers, ret=True)
ttl = soa_json[0]["ttl"]
2024-09-30 16:34:34 +02:00
if args.command == "audit":
url = baseurl + "/user/auditlog"
else:
try:
url = setup_url(
baseurl,
args.argument,
args.data,
args.name,
args.rtype,
ttl,
args.zone,
)
except AttributeError:
parser.print_help(sys.stderr)
return 1
2022-11-10 09:06:25 +01:00
2022-10-25 09:10:11 +02:00
try:
if args.command == "add":
run_add(url, args.json, headers)
elif args.command == "delete":
run_delete(url, args.json, headers)
elif args.command == "list":
run_list(url, args.json, headers)
elif args.command == "update":
run_update(url, args.json, headers)
2024-09-30 16:34:34 +02:00
elif args.command == "audit":
run_audit(url, args.json, headers)
else:
parser.print_help(sys.stderr)
return 1
except requests.exceptions.RequestException as e:
output(error(e, "Could not connect to server"))
2022-10-25 09:10:11 +02:00
except (RequestsJSONDecodeError, SimplejsonJSONDecodeError):
2024-09-30 16:34:34 +02:00
output(error("Could not decode api response as JSON", "Could not decode"))
2022-10-24 13:24:38 +02:00
return 0
if __name__ == "__main__":
sys.exit(main())