Compare commits

...

12 commits

Author SHA1 Message Date
3c9293417e Fix openstack sync
Add options to skip sync of IPv4 and/or IPv6 addresses.
2025-01-30 12:14:53 +01:00
Micke Nordin
3c52eb2a81 Update 'README.md' 2025-01-23 08:03:57 +00:00
Micke Nordin
8119459d2f Just skightky tested, does not work 2025-01-10 05:32:22 +01:00
Micke Nordin
8d4c9bc470 Rename variable 2025-01-10 05:11:23 +01:00
Micke Nordin
5ea14a1a38 Merge branch 'add_config_list' 2025-01-10 04:42:43 +01:00
Micke Nordin
f711265629 UNTESTED: Support for syncing with openstack 2025-01-10 04:25:06 +01:00
Micke Nordin
c90ee01b93 New version: 0.1.1 includes new user and zone command
Also allows list without zone for listing all records in all zones
2024-12-21 15:58:21 +01:00
Micke Nordin
9bdaa81aca New version: 0.1.1 includes new user and zone command
Also allows list without zone for listing all records in all zones
2024-12-21 15:57:04 +01:00
Micke Nordin
1cdb6472fa Add support for listing zones and userinfo 2024-12-20 10:35:06 +01:00
Micke Nordin
3fff646945 Bump version to 0.1.0 2024-10-02 10:54:25 +02:00
9a2fcd8a94
rm print statement 2024-08-30 12:21:49 +02:00
69a70bcd0e
Add config --list 2024-08-30 11:21:27 +02:00
4 changed files with 292 additions and 86 deletions

View file

@ -3,11 +3,15 @@
This is a commandline tool for knotapi: https://gitlab.nic.cz/knot/knot-dns-rest
## Build and install
The preffered method of installation is via pipx:
```
pipx install knotctl
```
To install using pip, run the following command in a virtual envrionment.
```
python -m pip install "knotctl @ git+https://code.smolnet.org/micke/knotctl
python -m pip install knotctl
```
To build and install as a deb-package
@ -191,9 +195,9 @@ options:
### LIST
```
usage: knotctl list [-h] [-d DATA] [-n NAME] [-r RTYPE] -z ZONE
usage: knotctl list [-h] [-d DATA] [-n NAME] [-r RTYPE] [-z ZONE]
List records in the zone.
List records.
options:
-h, --help show this help message and exit
@ -231,3 +235,26 @@ Available arguments are:
rtype: New record type.
ttl: New record time to live (TTL).
```
### USER
```
usage: knotctl user [-h] [-u USERNAME]
View user information.
options:
-h, --help show this help message and exit
-u USERNAME, --username USERNAME
```
### ZONE
```
usage: knotctl zone
List zones.
options:
-h, --help show this help message and exit
```

View file

@ -7,11 +7,13 @@ import os
import sys
import urllib.parse
from os import environ, mkdir
from os.path import isdir, isfile, islink, join, split
from os.path import isdir, isfile, join
from typing import Union
from urllib.parse import urlparse
import argcomplete
import openstack
import openstack.config.loader
import requests
import yaml
from requests.models import HTTPBasicAuth
@ -36,17 +38,36 @@ def error(description: str, error: str) -> list[dict]:
def get_config(config_filename: str):
if not isfile(config_filename):
print("You need to configure knotctl before proceeding")
run_config(config_filename)
with open(config_filename, "r") as fh:
return yaml.safe_load(fh.read())
def get_openstack_addresses(cloud: str, name: str):
conn = openstack.connect(cloud=cloud)
# List the servers
server = conn.compute.find_server(name)
if server is None:
print("Server not found")
exit(1)
openstack_addresses = []
for network in server.addresses:
for address in server.addresses[network]:
openstack_addresses.append(address)
return openstack_addresses
def nested_out(input, tabs="") -> str:
string = ""
if isinstance(input, str) or isinstance(input, int):
string += "{}\n".format(input)
elif isinstance(input, dict):
for key, value in input.items():
string += "{}{}: {}".format(tabs, key, nested_out(value, tabs + " "))
string += "{}{}: {}".format(tabs, key,
nested_out(value, tabs + " "))
elif isinstance(input, list):
for entry in input:
string += "{}\n{}".format(tabs, nested_out(entry, tabs + " "))
@ -70,11 +91,9 @@ def run_add(url: str, jsonout: bool, headers: dict):
out = response.json()
if isinstance(out, list):
for record in out:
if (
record["data"] == parsed["data"]
and record["name"] == parsed["name"]
and record["rtype"] == parsed["rtype"]
):
if (record["data"] == parsed["data"]
and record["name"] == parsed["name"]
and record["rtype"] == parsed["rtype"]):
output(record, jsonout)
break
else:
@ -94,29 +113,28 @@ def run_log(url: str, jsonout: bool, headers: dict):
line = lines[index]
index += 1
cur_has_timestamp = line.startswith("[")
next_has_timestamp = index < len(lines) and lines[index].startswith(
"["
)
next_has_timestamp = index < len(
lines) and lines[index].startswith("[")
# Simple case, just one line with timestamp
if cur_has_timestamp and next_has_timestamp:
timestamp = line.split(']')[0].split('[')[1]
text = line.split(']')[1].lstrip(':').strip()
out.append({'timestamp': timestamp, 'text': text})
timestamp = line.split("]")[0].split("[")[1]
text = line.split("]")[1].lstrip(":").strip()
out.append({"timestamp": timestamp, "text": text})
text = ""
timestamp = ""
# Start of multiline
elif cur_has_timestamp:
timestamp = line.split(']')[0].split('[')[1]
text = line.split(']')[1].lstrip(':').strip()
timestamp = line.split("]")[0].split("[")[1]
text = line.split("]")[1].lstrip(":").strip()
# End of multiline
elif next_has_timestamp:
text += f'\n{line.strip()}'
out.append({'timestamp': timestamp, 'text': text})
text += f"\n{line.strip()}"
out.append({"timestamp": timestamp, "text": text})
text = ""
timestamp = ""
# Middle of multiline
else:
text += f'\n{line.strip()}'
text += f"\n{line.strip()}"
else:
out = string
@ -137,6 +155,7 @@ def run_config(
config_filename: str,
context: Union[None, str] = None,
baseurl: Union[None, str] = None,
list_config: bool = False,
username: Union[None, str] = None,
password: Union[None, str] = None,
current: Union[None, str] = None,
@ -144,7 +163,7 @@ def run_config(
if current:
if os.path.islink(config_filename):
actual_path = os.readlink(config_filename)
print(actual_path.split('-')[-1])
print(actual_path.split("-")[-1])
else:
print("none")
return
@ -161,6 +180,11 @@ def run_config(
config_filename = symlink
if found:
return
if list_config:
config_data = get_config(config_filename)
config_data.pop("password", None)
output(config_data)
return
if not baseurl:
needed.append("baseurl")
if not username:
@ -171,8 +195,7 @@ def run_config(
error(
"Can not configure without {}".format(need),
"No {}".format(need),
)
)
))
sys.exit(1)
config[need] = input("Enter {}: ".format(need))
@ -196,7 +219,10 @@ def run_delete(url: str, jsonout: bool, headers: dict):
output(reply, jsonout)
def run_list(url: str, jsonout: bool, headers: dict, ret=False) -> Union[None, str]:
def run_list(url: str,
jsonout: bool,
headers: dict,
ret=False) -> Union[None, str]:
response = requests.get(url, headers=headers)
string = response.json()
if ret:
@ -205,11 +231,106 @@ def run_list(url: str, jsonout: bool, headers: dict, ret=False) -> Union[None, s
output(string, jsonout)
def run_openstack_sync(cloud: str, name: str, zone: str, headers: dict,
baseurl: str, jsonout: bool, ipv4: bool, ipv6: bool):
# List DNS records for name
url = setup_url(
baseurl,
None, # arguments,
None, # data,
name,
None, # rtype,
None, # ttl,
zone,
)
response = run_list(url, jsonout=True, headers=headers, ret=True)
if isinstance(response, dict):
if response["Code"] == 404:
response = []
else:
output(error(f'{name}: could not list records', 'API Error'))
sys.exit(1)
# List of DNS records for name
dns_records = []
for record in response:
rtype = record['rtype']
rname = record['name'].rstrip('.')
raddr = record['data']
if rtype not in ('A', 'AAAA'):
continue
if f'{name}.{zone}' != rname:
output(error(f'{rname}: not the DNS name asked for', 'API Error'))
sys.exit(1)
dns_records.append((rname, rtype, raddr))
# List instance addresses in openstack
openstack_addresses = get_openstack_addresses(cloud, name)
# Add missing DNS records
for address in openstack_addresses:
rtype = None
if address["version"] == 4 and ipv4:
rtype = "A"
elif address["version"] == 6 and ipv6:
rtype = "AAAA"
if not rtype:
continue
raddr = address['addr']
wanted_record = (f'{name}.{zone}', rtype, raddr)
if wanted_record in dns_records:
dns_records.remove(wanted_record)
else:
url = setup_url(
baseurl,
None, # arguments,
raddr, # data,
name,
rtype, # rtype,
None, # ttl,
zone,
)
run_add(url, jsonout, headers)
# Remove obsolete DNS records
for record in dns_records:
rname, rtype, raddr = record
url = setup_url(
baseurl,
None, # arguments,
raddr, # data,
name,
rtype, # rtype,
None, # ttl,
zone,
)
run_delete(url, jsonout, headers)
def run_update(url: str, jsonout: bool, headers: dict):
response = requests.patch(url, headers=headers)
output(response.json(), jsonout)
def run_zone(url: str,
jsonout: bool,
headers: dict,
ret=False) -> Union[None, str]:
response = requests.get(url, headers=headers)
zones = response.json()
for zone in zones:
del zone["records"]
string = zones
if ret:
return string
else:
output(string, jsonout)
# Set up the url
def setup_url(
baseurl: str,
@ -248,16 +369,14 @@ def setup_url(
error(
"ttl only makes sense with rtype, name and zone",
"Missing parameter",
)
)
))
sys.exit(1)
if rtype and (not name or not zone):
output(
error(
"rtype only makes sense with name and zone",
"Missing parameter",
)
)
))
sys.exit(1)
if name and not zone:
output(error("name only makes sense with a zone", "Missing parameter"))
@ -297,8 +416,7 @@ def split_url(url: str) -> dict:
}
# Entry point to program
def main() -> int:
def get_parser() -> dict:
description = """Manage DNS records with knot dns rest api:
* https://gitlab.nic.cz/knot/knot-dns-rest"""
@ -366,18 +484,26 @@ def main() -> int:
subparsers.add_parser("auditlog", description=auditlog_description)
changelog_description = "View the changelog of a zone."
changelogcmd = subparsers.add_parser("changelog", description=changelog_description)
changelogcmd = subparsers.add_parser("changelog",
description=changelog_description)
changelogcmd.add_argument("-z", "--zone", required=True)
complete_description = "Generate shell completion script."
completecmd = subparsers.add_parser("completion", description=complete_description)
completecmd = subparsers.add_parser("completion",
description=complete_description)
completecmd.add_argument("-s", "--shell")
config_description = "Configure access to knot-dns-rest-api."
configcmd = subparsers.add_parser("config", description=config_description)
configcmd.add_argument("-b", "--baseurl")
configcmd.add_argument("-c", "--context")
configcmd.add_argument("-C", "--current", action=argparse.BooleanOptionalAction)
configcmd.add_argument("-C",
"--current",
action=argparse.BooleanOptionalAction)
configcmd.add_argument("-l",
"--list",
action=argparse.BooleanOptionalAction,
dest="list_config")
configcmd.add_argument("-p", "--password")
configcmd.add_argument("-u", "--username")
@ -388,22 +514,32 @@ def main() -> int:
deletecmd.add_argument("-r", "--rtype")
deletecmd.add_argument("-z", "--zone", required=True)
list_description = "List records in the zone."
list_description = "List records."
listcmd = subparsers.add_parser("list", description=list_description)
listcmd.add_argument("-d", "--data")
listcmd.add_argument("-n", "--name")
listcmd.add_argument("-r", "--rtype")
listcmd.add_argument("-z", "--zone", required=True)
listcmd.add_argument("-z", "--zone", required=False)
openstack_description = "Sync records with openstack."
openstackcmd = subparsers.add_parser("openstack-sync",
description=openstack_description)
openstackcmd.add_argument("-n", "--name", required=True)
openstackcmd.add_argument("-c", "--cloud", required=True)
openstackcmd.add_argument("-z", "--zone", required=True)
openstackcmd.add_argument("--no-ipv4", action='store_true')
openstackcmd.add_argument("--no-ipv6", action='store_true')
user_description = "View user information."
usercmd = subparsers.add_parser("user", description=user_description)
usercmd.add_argument("-u", "--username", default=None)
update_description = (
"Update a record in the zone. The record must exist in the zone.\n"
)
"Update a record in the zone. The record must exist in the zone.\n")
update_description += (
"In this case --data, --name, --rtype and --ttl switches are used\n"
)
"In this case --data, --name, --rtype and --ttl switches are used\n")
update_description += (
"for searching for the appropriate record, while the --argument\n"
)
"for searching for the appropriate record, while the --argument\n")
update_description += "switches are used for updating the record."
update_epilog = """Available arguments are:
data: New record data.
@ -421,7 +557,8 @@ def main() -> int:
"--argument",
action="append",
metavar="KEY=VALUE",
help="Specify key - value pairs to be updated: name=dns1.example.com. or data=127.0.0.1 for example. --argument can be repeated",
help="Specify key - value pairs to be updated: name=dns1.example.com."
+ " or data=127.0.0.1 for example. --argument can be repeated",
required=True,
)
updatecmd.add_argument("-d", "--data", required=True)
@ -430,7 +567,72 @@ def main() -> int:
updatecmd.add_argument("-t", "--ttl")
updatecmd.add_argument("-z", "--zone", required=True)
zone_description = "View zones."
subparsers.add_parser("zone", description=zone_description)
argcomplete.autocomplete(parser)
return parser
def get_token(config) -> str:
# Authenticate
baseurl = config["baseurl"]
username = config["username"]
password = config["password"]
basic = HTTPBasicAuth(username, password)
response = requests.get(baseurl + "/user/login", auth=basic)
token = ""
try:
token = response.json()["token"]
except KeyError:
output(response.json())
except requests.exceptions.JSONDecodeError:
output(
error("Could not decode api response as JSON", "Could not decode"))
return token
def run(url, args, headers, baseurl, parser, username):
try:
if args.command == "add":
run_add(url, args.json, headers)
elif args.command == "delete":
run_delete(url, args.json, headers)
elif args.command == "list":
run_list(url, args.json, headers)
elif args.command == "update":
run_update(url, args.json, headers)
elif args.command == "user":
url = baseurl + f"/user/info/{username}"
run_list(url, args.json, headers)
elif args.command == "auditlog":
url = baseurl + "/user/auditlog"
run_log(url, args.json, headers)
elif args.command == "changelog":
url = baseurl + f"/zones/changelog/{args.zone.rstrip('.')}"
run_log(url, args.json, headers)
elif args.command == "zone":
url = baseurl + "/zones"
run_zone(url, args.json, headers)
elif args.command == "openstack-sync":
run_openstack_sync(args.cloud, args.name, args.zone, headers,
baseurl, args.json,
args.no_ipv4 is False, args.no_ipv6 is False)
else:
parser.print_help(sys.stderr)
return 2
except requests.exceptions.RequestException as e:
output(error(e, "Could not connect to server"))
except (RequestsJSONDecodeError, SimplejsonJSONDecodeError):
output(
error("Could not decode api response as JSON", "Could not decode"))
return 0
# Entry point to program
def main() -> int:
parser = get_parser()
args = parser.parse_args()
if args.command == "completion":
run_complete(args.shell)
@ -445,35 +647,28 @@ def main() -> int:
if args.command == "config":
run_config(
config_filename, args.context, args.baseurl, args.username, args.password, args.current
config_filename,
args.context,
args.baseurl,
args.list_config,
args.username,
args.password,
args.current,
)
return 0
if not isfile(config_filename):
print("You need to configure knotctl before proceeding")
run_config(config_filename)
config = get_config(config_filename)
baseurl = config["baseurl"]
username = config["username"]
password = config["password"]
# Authenticate
basic = HTTPBasicAuth(username, password)
response = requests.get(baseurl + "/user/login", auth=basic)
try:
token = response.json()["token"]
except KeyError:
output(response.json())
return 1
except requests.exceptions.JSONDecodeError:
output(error("Could not decode api response as JSON", "Could not decode"))
token = get_token(config)
if token == "":
print("Could not get token, exiting")
return 1
headers = {"Authorization": "Bearer {}".format(token)}
# Route based on command
url = ""
ttl = None
user = config["username"]
if "ttl" in args:
ttl = args.ttl
if args.command != "update":
@ -486,7 +681,12 @@ def main() -> int:
soa_url = setup_url(baseurl, None, None, zname, "SOA", None, args.zone)
soa_json = run_list(soa_url, True, headers, ret=True)
ttl = soa_json[0]["ttl"]
if args.command in ["auditlog", "changelog"]:
if args.command == "user":
if args.username:
user = args.username
if args.command in [
"auditlog", "changelog", "openstack-sync", "user", "zone"
]:
pass
else:
try:
@ -503,29 +703,7 @@ def main() -> int:
parser.print_help(sys.stderr)
return 1
try:
if args.command == "add":
run_add(url, args.json, headers)
elif args.command == "delete":
run_delete(url, args.json, headers)
elif args.command == "list":
run_list(url, args.json, headers)
elif args.command == "update":
run_update(url, args.json, headers)
elif args.command == "auditlog":
url = baseurl + "/user/auditlog"
run_log(url, args.json, headers)
elif args.command == "changelog":
url = baseurl + f"/zones/changelog/{args.zone.rstrip('.')}"
run_log(url, args.json, headers)
else:
parser.print_help(sys.stderr)
return 2
except requests.exceptions.RequestException as e:
output(error(e, "Could not connect to server"))
except (RequestsJSONDecodeError, SimplejsonJSONDecodeError):
output(error("Could not decode api response as JSON", "Could not decode"))
return 0
return run(url, args, headers, baseurl, parser, user)
if __name__ == "__main__":

View file

@ -16,7 +16,7 @@ classifiers=[
"Operating System :: OS Independent",
]
requires-python= ">=3.9"
version = "0.0.9"
version = "0.1.1"
dependencies = [
"argcomplete==2.0.0",

View file

@ -2,3 +2,4 @@ argcomplete==2.0.0
pyyaml==6.0.1
requests==2.27.1
simplejson==3.17.6
openstacksdk==4.2.0