Compare commits

...

48 commits
v0.0.2 ... main

Author SHA1 Message Date
Micke Nordin
70e82692d4 Merge pull request 'Refactor code to to be usable as lib' (#12) from kano-refactor-lib into main
Reviewed-on: #12
2025-01-30 11:38:09 +00:00
Micke Nordin
9c31d7f386 Merge branch 'main' into kano-refactor-lib 2025-01-30 11:37:52 +00:00
Micke Nordin
b12471087d
Add makefile 2025-01-30 12:24:04 +01:00
Micke Nordin
21bf26519d
Bump version 2025-01-30 12:10:07 +01:00
Micke Nordin
f63c4a3cee
Add main script 2025-01-30 12:08:55 +01:00
Micke Nordin
28bf4a4ac1
Try to support briefcase 2025-01-30 11:29:25 +01:00
Micke Nordin
3c52eb2a81 Update 'README.md' 2025-01-23 08:03:57 +00:00
Micke Nordin
a43df8f86c
Split out runners 2025-01-17 15:49:08 +01:00
Micke Nordin
d1aec6bbb0
Begin refactor 2025-01-17 13:32:13 +01:00
Micke Nordin
33a65181a5
Add deps 2025-01-17 09:18:06 +01:00
Micke Nordin
8119459d2f Just skightky tested, does not work 2025-01-10 05:32:22 +01:00
Micke Nordin
8d4c9bc470 Rename variable 2025-01-10 05:11:23 +01:00
Micke Nordin
5ea14a1a38 Merge branch 'add_config_list' 2025-01-10 04:42:43 +01:00
Micke Nordin
f711265629 UNTESTED: Support for syncing with openstack 2025-01-10 04:25:06 +01:00
Micke Nordin
c90ee01b93 New version: 0.1.1 includes new user and zone command
Also allows list without zone for listing all records in all zones
2024-12-21 15:58:21 +01:00
Micke Nordin
9bdaa81aca New version: 0.1.1 includes new user and zone command
Also allows list without zone for listing all records in all zones
2024-12-21 15:57:04 +01:00
Micke Nordin
1cdb6472fa Add support for listing zones and userinfo 2024-12-20 10:35:06 +01:00
Micke Nordin
3fff646945 Bump version to 0.1.0 2024-10-02 10:54:25 +02:00
Micke Nordin
41c5efa97e Remove nargs=* to flatten list 2024-10-02 10:53:37 +02:00
Micke Nordin
1c04ebcc18 Bump version to 0.0.9 2024-10-01 11:25:12 +02:00
Micke Nordin
cd32772689 Fix url unbound bug 2024-10-01 11:23:54 +02:00
Micke Nordin
a4dd0912fc Add usage for experimental features 2024-10-01 10:55:11 +02:00
Micke Nordin
35593bb503 Rename audit -> auditlog and add support for changelog 2024-10-01 10:15:36 +02:00
Micke Nordin
f6915b3e4a Add feature to show current context 2024-09-30 17:00:38 +02:00
Micke Nordin
fb89d01be1 Fix text in multiline case 2024-09-30 16:43:35 +02:00
Micke Nordin
c26a46e9a4 Early support for auditlog 2024-09-30 16:34:34 +02:00
9a2fcd8a94
rm print statement 2024-08-30 12:21:49 +02:00
69a70bcd0e
Add config --list 2024-08-30 11:21:27 +02:00
Micke Nordin
3787ad12df Update help/docs and actually make it possible to add more than one argument 2024-07-03 10:28:17 +02:00
Micke Nordin
2346c2de2e Merge pull request 'Uses flit and pyproject.toml' (#7) from pyproject into main
Reviewed-on: #7
2024-06-27 10:10:49 +00:00
Kushal Das
093511b597
Updates README 2024-06-27 11:26:12 +02:00
Kushal Das
4d92a08eab
Uses flit for building the project 2024-06-27 11:22:10 +02:00
Micke Nordin
f68bfc3768 Automatically append a dot if the name ends with the zone name without dot. Fixes #3 2024-06-24 16:23:00 +02:00
Micke Nordin
287d9a6c2d Get the older error if the new one does not exsist
Fixes: #2
2024-06-24 16:14:22 +02:00
Micke Nordin
d88ad607c9 Make zone required for list. Fixes #6 2024-06-24 16:06:49 +02:00
Micke Nordin
6c59202263 Make it possible to have multiple contexts 2024-06-24 16:04:52 +02:00
Micke Nordin
5433992395
Fix error type, thanks patlu 2023-06-26 10:54:49 +02:00
Micke Nordin
5b039dc8da
Format 2023-06-26 09:18:54 +02:00
Micke Nordin
f16eb88eab
Catch json decode error 2023-06-26 09:18:26 +02:00
Micke Nordin
665942653e
Only output a single record on successfull add
Fixes #1
2023-01-10 17:24:23 +01:00
Micke Nordin
7caed48746
V 0.0.6 2023-01-09 08:40:29 +01:00
Micke Nordin
5eeca61e09
Add space before input 2023-01-09 08:38:44 +01:00
Micke Nordin
570b6e0ff8
Format 2023-01-09 08:38:09 +01:00
Micke Nordin
d5902b57be
Fix config bug 2023-01-09 08:37:41 +01:00
Micke Nordin
7b9d05f5a8 Update 'README.md' 2022-11-10 12:53:44 +00:00
Micke Nordin
7817cb43b8
Make ttl optional for add 2022-11-10 12:50:18 +01:00
Micke Nordin
33a21cb7cb
Fix AttributeError for attribute flag 2022-11-10 09:54:38 +01:00
Micke Nordin
5baba47b9c
Show help when run without arguments 2022-11-10 09:09:56 +01:00
13 changed files with 1035 additions and 317 deletions

0
CHANGELOG Normal file
View file

13
Makefile Normal file
View file

@ -0,0 +1,13 @@
.PHONY: publish
publish:
flit publish
.PHONY: deb
deb:
briefcase update linux system --target debian:testing
briefcase build linux system --target debian:testing
briefcase package linux system --target debian:testing
.PHONY: clean
clean:
rm -rf build dist

162
README.md
View file

@ -3,34 +3,50 @@
This is a commandline tool for knotapi: https://gitlab.nic.cz/knot/knot-dns-rest
## Build and install
To install using pip, run the following command:
The preffered method of installation is via pipx:
```
pip3 install git+https://code.smolnet.org/micke/knotctl
pipx install knotctl
```
To install using pip, run the following command in a virtual envrionment.
```
python -m pip install knotctl
```
To build and install as a deb-package
```
sudo apt install python3-stdeb
git clone https://code.smolnet.org/micke/knotctl
cd knotctl
python3 setup.py --command-packages=stdeb.command bdist_deb
sudo dpkg -i deb_dist/knotctl_0.0.2-1_all.deb
sudo dpkg -i deb_dist/knotctl_*_all.deb
```
A prebuilt deb-package is also available from the release page: https://code.smolnet.org/micke/knotctl/releases/
## Shell completion
For bash: add this to .bashrc
```
source <(knotctl completion)
```
For fish, run:
```
knotctl completion --shell fish > ~/.config/fish/completions/knotctl.fish
```
For tcsh: add this to .cshrc
```
complete "knotctl" 'p@*@`python-argcomplete-tcsh "knotctl"`@' ;
```
For zsh: add this to .zshrc
```
autoload -U bashcompinit
bashcompinit
@ -39,18 +55,68 @@ source <(knotctl completion)
## Usage
```
usage: knotctl [-h] [--json | --no-json]
{add,completion,config,delete,list,update} ...
{add,auditlog,changelog,completion,config,delete,list,update}
...
Manage DNS records with knot dns rest api:
* https://gitlab.nic.cz/knot/knot-dns-rest
positional arguments:
{add,completion,config,delete,list,update}
{add,auditlog,changelog,completion,config,delete,list,update}
options:
-h, --help show this help message and exit
--json, --no-json
The Domain Name System specifies a database of information
elements for network resources. The types of information
elements are categorized and organized with a list of DNS
record types, the resource records (RRs). Each record has a
name, a type, an expiration time (time to live), and
type-specific data.
The following is a list of terms used in this program:
----------------------------------------------------------------
| Vocabulary | Description |
----------------------------------------------------------------
| zone | A DNS zone is a specific portion of the DNS |
| | namespace in the Domain Name System (DNS), |
| | which a specific organization or administrator |
| | manages. |
----------------------------------------------------------------
| name | In the Internet, a domain name is a string that |
| | identifies a realm of administrative autonomy, |
| | authority or control. Domain names are often |
| | used to identify services provided through the |
| | Internet, such as websites, email services and |
| | more. |
----------------------------------------------------------------
| rtype | A record type indicates the format of the data |
| | and it gives a hint of its intended use. For |
| | example, the A record is used to translate from |
| | a domain name to an IPv4 address, the NS record |
| | lists which name servers can answer lookups on |
| | a DNS zone, and the MX record specifies the |
| | mail server used to handle mail for a domain |
| | specified in an e-mail address. |
----------------------------------------------------------------
| data | A records data is of type-specific relevance, |
| | such as the IP address for address records, or |
| | the priority and hostname for MX records. |
----------------------------------------------------------------
This information was compiled from Wikipedia:
* https://en.wikipedia.org/wiki/DNS_zone
* https://en.wikipedia.org/wiki/Domain_Name_System
* https://en.wikipedia.org/wiki/Zone_file
```
### ADD
```
usage: knotctl add [-h] -d DATA -n NAME -r RTYPE -t TTL -z ZONE
usage: knotctl add [-h] -d DATA -n NAME -r RTYPE [-t TTL] -z ZONE
Add a new record to the zone.
options:
-h, --help show this help message and exit
@ -60,28 +126,64 @@ options:
-t TTL, --ttl TTL
-z ZONE, --zone ZONE
```
### COMPLETION
```
usage: knotctl completion [-h] [-s SHELL]
Generate shell completion script.
options:
-h, --help show this help message and exit
-s SHELL, --shell SHELL
```
### CONFIG
### AUDITLOG
```
usage: knotctl config [-h] [-b BASEURL] [-p PASSWORD] [-u USERNAME]
usage: knotctl auditlog [-h]
Audit the log file for errors.
options:
-h, --help show this help message and exit
```
### CHANGELOG
```
usage: knotctl changelog [-h] -z ZONE
View the changelog of a zone.
options:
-h, --help show this help message and exit
-z ZONE, --zone ZONE
```
### CONFIG
```
usage: knotctl config [-h] [-b BASEURL] [-c CONTEXT] [-p PASSWORD] [-u USERNAME]
Configure access to knot-dns-rest-api.
options:
-h, --help show this help message and exit
-b BASEURL, --baseurl BASEURL
-c CONTEXT, --context CONTEXT
-p PASSWORD, --password PASSWORD
-u USERNAME, --username USERNAME
```
### DELETE
```
usage: knotctl delete [-h] [-d DATA] [-n NAME] [-r RTYPE] -z ZONE
Delete a record from the zone.
options:
-h, --help show this help message and exit
-d DATA, --data DATA
@ -89,10 +191,14 @@ options:
-r RTYPE, --rtype RTYPE
-z ZONE, --zone ZONE
```
### LIST
```
usage: knotctl list [-h] [-d DATA] [-n NAME] [-r RTYPE] [-z ZONE]
List records.
options:
-h, --help show this help message and exit
-d DATA, --data DATA
@ -100,19 +206,55 @@ options:
-r RTYPE, --rtype RTYPE
-z ZONE, --zone ZONE
```
### UPDATE
```
usage: knotctl update [-h] -a [ARGUMENT ...] -d DATA -n NAME -r RTYPE [-t TTL]
-z ZONE
Update a record in the zone. The record must exist in the zone.
In this case --data, --name, --rtype and --ttl switches are used
for searching for the appropriate record, while the --argument
switches are used for updating the record.
options:
-h, --help show this help message and exit
-a [ARGUMENT ...], --argument [ARGUMENT ...]
-a [KEY=VALUE ...], --argument [KEY=VALUE ...]
Specify key - value pairs to be updated:
name=dns1.example.com.
name=dns1.example.com. or data=127.0.0.1 for example.
--argument can be repeated
-d DATA, --data DATA
-n NAME, --name NAME
-r RTYPE, --rtype RTYPE
-t TTL, --ttl TTL
-z ZONE, --zone ZONE
Available arguments are:
data: New record data.
name: New record domain name.
rtype: New record type.
ttl: New record time to live (TTL).
```
### USER
```
usage: knotctl user [-h] [-u USERNAME]
View user information.
options:
-h, --help show this help message and exit
-u USERNAME, --username USERNAME
```
### ZONE
```
usage: knotctl zone
List zones.
options:
-h, --help show this help message and exit
```

62
pyproject.toml Normal file
View file

@ -0,0 +1,62 @@
[build-system]
requires = ["flit_core >=3.2,<4"]
build-backend = "flit_core.buildapi"
[project]
name="knotctl"
description="A CLI for knotapi."
authors = [
{name = "Micke Nordin", email = "hej@mic.ke"},
]
license= { file="LICENSE" }
readme= "README.md"
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: GNU General Public License v3 (GPLv3)",
"Operating System :: OS Independent",
]
requires-python= ">=3.9"
version = "0.1.2"
dependencies = [
"argcomplete==2.0.0",
"pyyaml==6.0.1",
"requests==2.27.1",
"simplejson==3.17.6",
"openstacksdk==4.2.0",
]
[project.urls]
Source="https://code.smolnet.org/micke/knotctl"
Documentation = "https://code.smolnet.org/micke/knotctl"
[project.scripts]
knotctl="knotctl:main"
[tool.flit.sdist]
include = ["LICENSE", "README.md"]
[tool.briefcase]
project_name = "knotctl"
bundle = "org.smolnet"
version = "0.1.2"
[tool.briefcase.app.knotctl]
formal_name = "knotctl"
description = "A CLI for knotapi."
long_description = "A CLI for knotapi."
sources = ['src/knotctl']
console_app = "True"
requires = [
"argcomplete==2.0.0",
"pyyaml==6.0.1",
"requests==2.27.1",
"simplejson==3.17.6",
"openstacksdk==4.2.0",
]
[tool.briefcase.app.knotctl.linux.system.debian]
system_runtime_requires = [
"libpython3.13",
]

View file

@ -1,4 +1,5 @@
argcomplete==2.0.0
pyyaml==5.4.1
pyyaml==6.0.1
requests==2.27.1
simplejson==3.17.6
openstacksdk==4.2.0

View file

@ -1,280 +0,0 @@
#!/usr/bin/env python3
import argparse
import getpass
import json
import os
import sys
import urllib.parse
from os import environ, mkdir
from os.path import isdir, isfile, join
from typing import Union
import argcomplete
import requests
import yaml
from requests.exceptions import JSONDecodeError as RequestsJSONDecodeError
from requests.models import HTTPBasicAuth
from simplejson.errors import JSONDecodeError as SimplejsonJSONDecodeError
# Helper functions
def error(description: str, error: str) -> list[dict]:
response = []
reply = {}
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/406
reply["Code"] = 406
reply["Description"] = description
reply["Error"] = error
response.append(reply)
return response
def get_config(config_filename: str):
with open(config_filename, "r") as fh:
return yaml.safe_load(fh.read())
def nested_out(input, tabs="") -> str:
string = ""
if isinstance(input, str) or isinstance(input, int):
string += "{}\n".format(input)
elif isinstance(input, dict):
for key, value in input.items():
string += "{}{}: {}".format(tabs, key,
nested_out(value, tabs + " "))
elif isinstance(input, list):
for entry in input:
string += "{}\n{}".format(tabs, nested_out(entry, tabs + " "))
return string
def output(response: list[dict], jsonout: bool = False):
try:
if jsonout:
print(json.dumps(response))
else:
print(nested_out(response))
except BrokenPipeError:
pass
# Define the runner for each command
def run_add(url: str, jsonout: bool, headers: dict):
print(url)
response = requests.put(url, headers=headers)
output(response.json(), jsonout)
def run_complete(shell: Union[None, str]):
if not shell or shell in ["bash", "zsh"]:
os.system("register-python-argcomplete knotctl")
elif shell == "fish":
os.system("register-python-argcomplete --shell fish knotctl")
elif shell == "tcsh":
os.system("register-python-argcomplete --shell tcsh knotctl")
def run_config(
config_filename: str,
baseurl: Union[None, str] = None,
username: Union[None, str] = None,
password: Union[None, str] = None,
):
config = {"baseurl": baseurl, "username": username, "password": password}
needed = []
if not baseurl:
needed.append("baseurl")
if not username:
needed.append("username")
for need in needed:
if need == "":
output(
error("Can not configure without {}".format(need),
"No {}".format(need)))
sys.exit(1)
config[need] = input("Enter {}:".format(need))
if not password:
try:
config["password"] = getpass.getpass()
except EOFError:
output(error("Can not configure without password", "No password"))
sys.exit(1)
with open(config_filename, "w") as fh:
fh.write(yaml.dump(config))
def run_delete(url: str, jsonout: bool, headers: dict):
response = requests.delete(url, headers=headers)
reply = response.json()
if not reply and response.status_code == requests.codes.ok:
reply = [{"Code": 200, "Description": "{} deleted".format(url)}]
output(reply, jsonout)
def run_list(url: str, jsonout: bool, headers: dict):
response = requests.get(url, headers=headers)
output(response.json(), jsonout)
def run_update(url: str, jsonout: bool, headers: dict):
response = requests.patch(url, headers=headers)
output(response.json(), jsonout)
# Set up the url
def setup_url(
baseurl: str,
arguments: Union[None, list[str]],
data: Union[None, str],
name: Union[None, str],
rtype: Union[None, str],
ttl: Union[None, str],
zone: Union[None, str],
) -> str:
url = baseurl + "/zones"
if zone:
if not zone.endswith("."):
zone += "."
url += "/{}".format(zone)
if name and zone:
url += "/records/{}".format(name)
if zone and name and rtype:
url += "/{}".format(rtype)
if data and zone and name and rtype:
url += "/{}".format(data)
if ttl and data and zone and name and rtype:
url += "/{}".format(ttl)
if data and zone and name and rtype and arguments:
url += "?"
for arg in arguments:
if not url.endswith("?"):
url += "&"
key, value = arg.split("=")
url += key + "=" + urllib.parse.quote_plus(value)
if ttl and (not rtype or not name or not zone):
output(
error("ttl only makes sense with rtype, name and zone",
"Missing parameter"))
sys.exit(1)
if rtype and (not name or not zone):
output(
error("rtype only makes sense with name and zone",
"Missing parameter"))
sys.exit(1)
if name and not zone:
output(error("name only makes sense with a zone", "Missing parameter"))
sys.exit(1)
return url
# Entry point to program
def main() -> int:
# Grab user input
parser = argparse.ArgumentParser()
parser.add_argument("--json", action=argparse.BooleanOptionalAction)
subparsers = parser.add_subparsers(dest="command")
addcmd = subparsers.add_parser("add")
addcmd.add_argument("-d", "--data", required=True)
addcmd.add_argument("-n", "--name", required=True)
addcmd.add_argument("-r", "--rtype", required=True)
addcmd.add_argument("-t", "--ttl", required=True)
addcmd.add_argument("-z", "--zone", required=True)
completecmd = subparsers.add_parser("completion")
completecmd.add_argument("-s", "--shell")
configcmd = subparsers.add_parser("config")
configcmd.add_argument("-b", "--baseurl")
configcmd.add_argument("-p", "--password")
configcmd.add_argument("-u", "--username")
deletecmd = subparsers.add_parser("delete")
deletecmd.add_argument("-d", "--data")
deletecmd.add_argument("-n", "--name")
deletecmd.add_argument("-r", "--rtype")
deletecmd.add_argument("-z", "--zone", required=True)
listcmd = subparsers.add_parser("list")
listcmd.add_argument("-d", "--data")
listcmd.add_argument("-n", "--name")
listcmd.add_argument("-r", "--rtype")
listcmd.add_argument("-z", "--zone")
updatecmd = subparsers.add_parser("update")
updatecmd.add_argument(
"-a",
"--argument",
nargs="*",
help="Specify key - value pairs to be updated: name=dns1.example.com.",
required=True,
)
updatecmd.add_argument("-d", "--data", required=True)
updatecmd.add_argument("-n", "--name", required=True)
updatecmd.add_argument("-r", "--rtype", required=True)
updatecmd.add_argument("-t", "--ttl")
updatecmd.add_argument("-z", "--zone", required=True)
argcomplete.autocomplete(parser)
args = parser.parse_args()
if args.command == "completion":
run_complete(args.shell)
return 0
# Make sure we have config
config_basepath = join(environ["HOME"], ".knot")
config_filename = join(config_basepath, "config")
if not isdir(config_basepath):
mkdir(config_basepath)
if args.command == "config":
run_config(args.baseurl, args.username, args.password)
return 0
if not isfile(config_filename):
print("You need to configure knotctl before proceeding")
run_config(config_filename)
config = get_config(config_filename)
baseurl = config["baseurl"]
username = config["username"]
password = config["password"]
# Authenticate
basic = HTTPBasicAuth(username, password)
response = requests.get(baseurl + "/user/login", auth=basic)
try:
token = response.json()["token"]
except KeyError:
output(response.json())
return 1
headers = {"Authorization": "Bearer {}".format(token)}
# Route based on command
ttl = None
if 'ttl' in args:
ttl = args.ttl
url = setup_url(baseurl, args.argument, args.data, args.name, args.rtype,
ttl, args.zone)
try:
if args.command == "add":
run_add(url, args.json, headers)
elif args.command == "delete":
run_delete(url, args.json, headers)
elif args.command == "list":
run_list(url, args.json, headers)
elif args.command == "update":
run_update(url, args.json, headers)
except (RequestsJSONDecodeError, SimplejsonJSONDecodeError):
output(
error("Could not decode api response as JSON", "Could not decode"))
return 0
if __name__ == "__main__":
sys.exit(main())

View file

@ -1,26 +0,0 @@
import setuptools
with open("README.md", "r", encoding="utf-8") as fh:
long_description = fh.read()
setuptools.setup(
name="knotctl",
version="0.0.2",
packages = setuptools.find_packages(),
author="Micke Nordin",
author_email="hej@mic.ke",
description="A cli for knotapi.",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://code.smolnet.org/micke/knotctl",
project_urls={
"Bug Tracker": "https://code.smolnet.org/micke/knotctl/issues",
},
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: GPL-3.0",
"Operating System :: OS Independent",
],
python_requires=">=3.9",
scripts=["scripts/knotctl"],
)

200
src/knotctl/__init__.py Executable file
View file

@ -0,0 +1,200 @@
#!/usr/bin/env python3
import getpass
import os
import sys
from typing import Union
import requests
from simplejson.errors import JSONDecodeError as SimplejsonJSONDecodeError
from .config import Config
from .runners import Run
from .utils import error, get_parser, output, setup_url
try:
from requests.exceptions import JSONDecodeError as RequestsJSONDecodeError
except ImportError:
from requests.exceptions import InvalidJSONError as RequestsJSONDecodeError
class Knotctl:
def __init__(self):
self.conf = Config()
self.config = self.get_config()
self.config_filename = self.conf.config_filename
self.runner = Run()
def get_config(self):
config = self.conf.get_config()
if not config:
print("You need to configure knotctl before proceeding")
run_config()
return config
def run(self, url: str, args: dict, baseurl: str, parser: dict,
username: str):
try:
if args.command == "add":
self.runner.add(url, args.json)
elif args.command == "delete":
self.runner.delete(url, args.json)
elif args.command == "list":
self.runner.lister(url, args.json)
elif args.command == "update":
self.runner.update(url, args.json)
elif args.command == "user":
url = baseurl + f"/user/info/{username}"
self.runner.lister(url, args.json)
elif args.command == "auditlog":
url = baseurl + "/user/auditlog"
self.runner.log(url, args.json)
elif args.command == "changelog":
url = baseurl + f"/zones/changelog/{args.zone.rstrip('.')}"
self.runner.log(url, args.json)
elif args.command == "zone":
url = baseurl + "/zones"
self.runner.zone(url, args.json)
elif args.command == "openstack-sync":
self.runner.openstack_sync(args.cloud, args.name, args.zone,
baseurl, args.json)
else:
parser.print_help(sys.stderr)
return 2
except requests.exceptions.RequestException as e:
output(error(e, "Could not connect to server"))
except (RequestsJSONDecodeError, SimplejsonJSONDecodeError):
output(
error("Could not decode api response as JSON",
"Could not decode"))
return 0
def run_complete(shell: Union[None, str]):
if not shell or shell in ["bash", "zsh"]:
os.system("register-python-argcomplete knotctl")
elif shell == "fish":
os.system("register-python-argcomplete --shell fish knotctl")
elif shell == "tcsh":
os.system("register-python-argcomplete --shell tcsh knotctl")
def run_config(
context: Union[None, str] = None,
baseurl: Union[None, str] = None,
list_config: bool = False,
username: Union[None, str] = None,
password: Union[None, str] = None,
current: Union[None, str] = None,
):
conf = Config()
if current:
print(conf.get_current())
return
config = {"baseurl": baseurl, "username": username, "password": password}
needed = []
if context:
found = conf.set_context(context)
if found:
return
if list_config:
config_data = conf.get_config_data()
output(config_data)
return
if not baseurl:
needed.append("baseurl")
if not username:
needed.append("username")
for need in needed:
if need == "":
output(
error(
"Can not configure without {}".format(need),
"No {}".format(need),
))
sys.exit(1)
config[need] = input("Enter {}: ".format(need))
if not password:
try:
config["password"] = getpass.getpass()
except EOFError:
output(error("Can not configure without password", "No password"))
sys.exit(1)
conf.set_config(config)
# Entry point to program
def main() -> int:
parser = get_parser()
args = parser.parse_args()
if args.command == "completion":
run_complete(args.shell)
return 0
knotctl = Knotctl()
if args.command == "config":
run_config(
args.context,
args.baseurl,
args.list_config,
args.username,
args.password,
args.current,
)
return 0
config = knotctl.get_config()
baseurl = config["baseurl"]
token = knotctl.conf.get_token()
if token == "":
print("Could not get token, exiting")
return 1
# Route based on command
url = ""
ttl = None
user = config["username"]
if "ttl" in args:
ttl = args.ttl
if args.command != "update":
args.argument = None
if args.command == "add" and not ttl:
if args.zone.endswith("."):
zname = args.zone
else:
zname = args.zone + "."
soa_url = setup_url(baseurl, None, None, zname, "SOA", None, args.zone)
soa_json = knotctl.runner.lister(soa_url, True, ret=True)
ttl = soa_json[0]["ttl"]
if args.command == "user":
if args.username:
user = args.username
if args.command in [
"auditlog", "changelog", "openstack-sync", "user", "zone"
]:
pass
else:
try:
url = setup_url(
baseurl,
args.argument,
args.data,
args.name,
args.rtype,
ttl,
args.zone,
)
except AttributeError:
parser.print_help(sys.stderr)
return 1
return knotctl.run(url, args, baseurl, parser, user)
if __name__ == "__main__":
sys.exit(main())

4
src/knotctl/__main__.py Normal file
View file

@ -0,0 +1,4 @@
from knotctl import main
if __name__ == "__main__":
main()

84
src/knotctl/config/__init__.py Executable file
View file

@ -0,0 +1,84 @@
#!/usr/bin/env python3
import os
from os import mkdir
from os.path import isdir, isfile, join
from typing import Union
import requests
import yaml
from requests.models import HTTPBasicAuth
from ..utils import error, output
class Config:
def __init__(self):
# Make sure we have config
self.config_basepath = join(os.environ["HOME"], ".knot")
self.config_filename = join(self.config_basepath, "config")
if not isdir(self.config_basepath):
mkdir(self.config_basepath)
def get_config(self) -> Union[None, dict]:
if not isfile(self.config_filename):
return None
with open(self.config_filename, "r") as fh:
return yaml.safe_load(fh.read())
def get_config_data(self) -> dict:
config_data = self.get_config()
config_data.pop("password", None)
return config_data
def get_current(self) -> str:
if os.path.islink(self.config_filename):
actual_path = os.readlink(self.config_filename)
return actual_path.split("-")[-1]
else:
return "none"
def get_token(self) -> str:
# Authenticate
config = self.get_config()
baseurl = config["baseurl"]
username = config["username"]
password = config["password"]
basic = HTTPBasicAuth(username, password)
response = requests.get(baseurl + "/user/login", auth=basic)
token = ""
try:
token = response.json()["token"]
except KeyError:
output(response.json())
except requests.exceptions.JSONDecodeError:
output(
error("Could not decode api response as JSON",
"Could not decode"))
return token
def set_context(self, context) -> bool:
symlink = f"{self.config_filename}-{context}"
found = os.path.isfile(symlink)
if os.path.islink(self.config_filename):
os.remove(self.config_filename)
elif os.path.isfile(self.config_filename):
os.rename(self.config_filename, symlink)
os.symlink(symlink, self.config_filename)
self.config_filename = symlink
return found
def set_config(
self,
baseurl: str,
username: str,
password: str,
):
config = {
"baseurl": baseurl,
"username": username,
"password": password
}
with open(self.config_filename, "w") as fh:
fh.write(yaml.dump(config))

View file

@ -0,0 +1,17 @@
import openstack
import openstack.config.loader
def get_openstack_addresses(cloud: str, name: str):
conn = openstack.connect(cloud=cloud)
# List the servers
server = conn.compute.find_server(name)
if server is None:
print("Server not found")
exit(1)
openstack_addresses = []
for network in server.addresses:
for address in server.addresses[network]:
openstack_addresses.append(address)
return openstack_addresses

View file

@ -0,0 +1,219 @@
from typing import Union
import requests
from ..config import Config
from ..openstack import get_openstack_addresses
from ..utils import output, setup_url, split_url
class Run():
def __init__(self):
conf = Config()
self.headers = {"Authorization": f"Bearer {conf.get_token()}"}
def add(self, url: str, jsonout: bool):
parsed = split_url(url)
response = requests.put(url, headers=self.headers)
out = response.json()
if isinstance(out, list):
for record in out:
if (record["data"] == parsed["data"]
and record["name"] == parsed["name"]
and record["rtype"] == parsed["rtype"]):
output(record, jsonout)
break
else:
output(out, jsonout)
def delete(self, url: str, jsonout: bool):
response = requests.delete(url, headers=self.headers)
reply = response.json()
if not reply and response.status_code == requests.codes.ok:
reply = [{"Code": 200, "Description": "{} deleted".format(url)}]
output(reply, jsonout)
def log(self, url: str, jsonout: bool):
response = requests.get(url, headers=self.headers)
string = response.content.decode("utf-8")
if jsonout:
out = []
lines = string.splitlines()
index = 0
text = ""
timestamp = ""
while index < len(lines):
line = lines[index]
index += 1
cur_has_timestamp = line.startswith("[")
next_has_timestamp = index < len(
lines) and lines[index].startswith("[")
# Simple case, just one line with timestamp
if cur_has_timestamp and next_has_timestamp:
timestamp = line.split("]")[0].split("[")[1]
text = line.split("]")[1].lstrip(":").strip()
out.append({"timestamp": timestamp, "text": text})
text = ""
timestamp = ""
# Start of multiline
elif cur_has_timestamp:
timestamp = line.split("]")[0].split("[")[1]
text = line.split("]")[1].lstrip(":").strip()
# End of multiline
elif next_has_timestamp:
text += f"\n{line.strip()}"
out.append({"timestamp": timestamp, "text": text})
text = ""
timestamp = ""
# Middle of multiline
else:
text += f"\n{line.strip()}"
else:
out = string
output(out, jsonout)
def update(self, url: str, jsonout: bool):
response = requests.patch(url, headers=self.headers)
output(response.json(), jsonout)
def zone(self, url: str, jsonout: bool, ret=False) -> Union[None, str]:
response = requests.get(url, headers=self.headers)
zones = response.json()
for zone in zones:
del zone["records"]
string = zones
if ret:
return string
else:
output(string, jsonout)
def lister(self, url: str, jsonout: bool, ret=False) -> Union[None, str]:
response = requests.get(url, headers=self.headers)
string = response.json()
if ret:
return string
else:
output(string, jsonout)
def add_records(self, openstack_addresses, baseurl, name, zone, url,
jsonout):
for address in openstack_addresses:
rtype = None
if address["version"] == 4:
rtype = "A"
elif address["version"] == 6:
rtype = "AAAA"
if rtype:
url = setup_url(
baseurl,
None, # arguments,
address["addr"], # data,
name,
rtype,
None, # ttl,
zone,
)
self.add(url, jsonout)
def update_records(self, openstack_addresses, current_records, baseurl,
name, zone, url, jsonout):
previpv4 = False
previpv6 = False
curripv4 = False
curripv6 = False
for record in current_records:
if record.type == "A":
previpv4 = record.data
elif record.type == "AAAA":
previpv6 = record.data
for address in openstack_addresses:
rtype = None
if address.version == 4:
rtype = "A"
curripv4 = True
elif address.version == 6:
rtype = "AAAA"
curripv6 = True
if rtype and record.type == rtype:
if record.data == address.addr:
continue
else:
url = setup_url(
baseurl,
None, # arguments,
address.addr, # data,
name,
record.type,
None, # ttl,
zone,
)
self.update(url, jsonout)
if previpv4 and not curripv4:
url = setup_url(
baseurl,
None, # arguments,
previpv4, # data,
name,
"A",
None, # ttl,
zone,
)
self.delete(url, jsonout)
if previpv6 and not curripv6:
url = setup_url(
baseurl,
None, # arguments,
previpv6, # data,
name,
"AAAA",
None, # ttl,
zone,
)
self.delete(url, jsonout)
if curripv4 and not previpv4:
url = setup_url(
baseurl,
None, # arguments,
curripv4, # data,
name,
"A",
None, # ttl,
zone,
)
self.add(url, jsonout)
if curripv6 and not previpv6:
url = setup_url(
baseurl,
None, # arguments,
curripv6, # data,
name,
"AAAA",
None, # ttl,
zone,
)
self.add(url, jsonout)
def openstack_sync(self, cloud: str, name: str, zone: str, baseurl: str,
jsonout: bool):
url = setup_url(
baseurl,
None, # arguments,
None, # data,
name,
None, # rtype,
None, # ttl,
zone,
)
current_records = self.lister(url, jsonout=True, ret=True)
openstack_addresses = get_openstack_addresses(cloud, name)
if current_records["Code"] == 404:
self.add_records(openstack_addresses, baseurl, name, zone, url,
jsonout)
else:
self.update_records(openstack_addresses, current_records, baseurl,
name, zone, url, jsonout)

View file

@ -0,0 +1,282 @@
import argparse
import argcomplete
import json
import sys
import urllib.parse
from typing import Union
from urllib.parse import urlparse
def get_parser() -> dict:
description = """Manage DNS records with knot dns rest api:
* https://gitlab.nic.cz/knot/knot-dns-rest"""
epilog = """
The Domain Name System specifies a database of information
elements for network resources. The types of information
elements are categorized and organized with a list of DNS
record types, the resource records (RRs). Each record has a
name, a type, an expiration time (time to live), and
type-specific data.
The following is a list of terms used in this program:
----------------------------------------------------------------
| Vocabulary | Description |
----------------------------------------------------------------
| zone | A DNS zone is a specific portion of the DNS |
| | namespace in the Domain Name System (DNS), |
| | which a specific organization or administrator |
| | manages. |
----------------------------------------------------------------
| name | In the Internet, a domain name is a string that |
| | identifies a realm of administrative autonomy, |
| | authority or control. Domain names are often |
| | used to identify services provided through the |
| | Internet, such as websites, email services and |
| | more. |
----------------------------------------------------------------
| rtype | A record type indicates the format of the data |
| | and it gives a hint of its intended use. For |
| | example, the A record is used to translate from |
| | a domain name to an IPv4 address, the NS record |
| | lists which name servers can answer lookups on |
| | a DNS zone, and the MX record specifies the |
| | mail server used to handle mail for a domain |
| | specified in an e-mail address. |
----------------------------------------------------------------
| data | A records data is of type-specific relevance, |
| | such as the IP address for address records, or |
| | the priority and hostname for MX records. |
----------------------------------------------------------------
This information was compiled from Wikipedia:
* https://en.wikipedia.org/wiki/DNS_zone
* https://en.wikipedia.org/wiki/Domain_Name_System
* https://en.wikipedia.org/wiki/Zone_file
"""
# Grab user input
parser = argparse.ArgumentParser(
description=description,
epilog=epilog,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("--json", action=argparse.BooleanOptionalAction)
subparsers = parser.add_subparsers(dest="command")
add_description = "Add a new record to the zone."
addcmd = subparsers.add_parser("add", description=add_description)
addcmd.add_argument("-d", "--data", required=True)
addcmd.add_argument("-n", "--name", required=True)
addcmd.add_argument("-r", "--rtype", required=True)
addcmd.add_argument("-t", "--ttl")
addcmd.add_argument("-z", "--zone", required=True)
auditlog_description = "Audit the log file for errors."
subparsers.add_parser("auditlog", description=auditlog_description)
changelog_description = "View the changelog of a zone."
changelogcmd = subparsers.add_parser("changelog",
description=changelog_description)
changelogcmd.add_argument("-z", "--zone", required=True)
complete_description = "Generate shell completion script."
completecmd = subparsers.add_parser("completion",
description=complete_description)
completecmd.add_argument("-s", "--shell")
config_description = "Configure access to knot-dns-rest-api."
configcmd = subparsers.add_parser("config", description=config_description)
configcmd.add_argument("-b", "--baseurl")