Split out runners

kano-refactor-lib
Micke Nordin 2 weeks ago
parent d1aec6bbb0
commit a43df8f86c
Signed by: micke
GPG Key ID: 0DA0A7A5708FE257

@ -1,20 +1,16 @@
#!/usr/bin/env python3
import argparse
import getpass
import os
import sys
from typing import Union
import argcomplete
import requests
from requests.models import HTTPBasicAuth
from simplejson.errors import JSONDecodeError as SimplejsonJSONDecodeError
from .config import Config
from .openstack import get_openstack_addresses
from .utils import error, output, setup_url, split_url
from .runners import Run
from .utils import error, get_parser, output, setup_url
try:
from requests.exceptions import JSONDecodeError as RequestsJSONDecodeError
@ -28,6 +24,7 @@ class Knotctl:
self.conf = Config()
self.config = self.get_config()
self.config_filename = self.conf.config_filename
self.runner = Run()
def get_config(self):
config = self.conf.get_config()
@ -37,63 +34,42 @@ class Knotctl:
return config
# Define the runner for each command
def run_add(url: str, jsonout: bool, headers: dict):
parsed = split_url(url)
response = requests.put(url, headers=headers)
out = response.json()
if isinstance(out, list):
for record in out:
if (record["data"] == parsed["data"]
and record["name"] == parsed["name"]
and record["rtype"] == parsed["rtype"]):
output(record, jsonout)
break
else:
output(out, jsonout)
def run_log(url: str, jsonout: bool, headers: dict):
response = requests.get(url, headers=headers)
string = response.content.decode("utf-8")
if jsonout:
out = []
lines = string.splitlines()
index = 0
text = ""
timestamp = ""
while index < len(lines):
line = lines[index]
index += 1
cur_has_timestamp = line.startswith("[")
next_has_timestamp = index < len(
lines) and lines[index].startswith("[")
# Simple case, just one line with timestamp
if cur_has_timestamp and next_has_timestamp:
timestamp = line.split("]")[0].split("[")[1]
text = line.split("]")[1].lstrip(":").strip()
out.append({"timestamp": timestamp, "text": text})
text = ""
timestamp = ""
# Start of multiline
elif cur_has_timestamp:
timestamp = line.split("]")[0].split("[")[1]
text = line.split("]")[1].lstrip(":").strip()
# End of multiline
elif next_has_timestamp:
text += f"\n{line.strip()}"
out.append({"timestamp": timestamp, "text": text})
text = ""
timestamp = ""
# Middle of multiline
else:
text += f"\n{line.strip()}"
def run(self, url: str, args: dict, baseurl: str, parser: dict,
username: str):
try:
if args.command == "add":
self.runner.add(url, args.json)
elif args.command == "delete":
self.runner.delete(url, args.json)
elif args.command == "list":
self.runner.lister(url, args.json)
elif args.command == "update":
self.runner.update(url, args.json)
elif args.command == "user":
url = baseurl + f"/user/info/{username}"
self.runner.lister(url, args.json)
elif args.command == "auditlog":
url = baseurl + "/user/auditlog"
self.runner.log(url, args.json)
elif args.command == "changelog":
url = baseurl + f"/zones/changelog/{args.zone.rstrip('.')}"
self.runner.log(url, args.json)
elif args.command == "zone":
url = baseurl + "/zones"
self.runner.zone(url, args.json)
elif args.command == "openstack-sync":
self.runner.openstack_sync(args.cloud, args.name, args.zone,
baseurl, args.json)
else:
out = string
output(out, jsonout)
parser.print_help(sys.stderr)
return 2
except requests.exceptions.RequestException as e:
output(error(e, "Could not connect to server"))
except (RequestsJSONDecodeError, SimplejsonJSONDecodeError):
output(
error("Could not decode api response as JSON",
"Could not decode"))
return 0
def run_complete(shell: Union[None, str]):
@ -151,368 +127,6 @@ def run_config(
conf.set_config(config)
def run_delete(url: str, jsonout: bool, headers: dict):
response = requests.delete(url, headers=headers)
reply = response.json()
if not reply and response.status_code == requests.codes.ok:
reply = [{"Code": 200, "Description": "{} deleted".format(url)}]
output(reply, jsonout)
def run_list(url: str,
jsonout: bool,
headers: dict,
ret=False) -> Union[None, str]:
response = requests.get(url, headers=headers)
string = response.json()
if ret:
return string
else:
output(string, jsonout)
def run_openstack_sync(cloud: str, name: str, zone: str, headers: dict,
baseurl: str, jsonout: bool):
url = setup_url(
baseurl,
None, # arguments,
None, # data,
name,
None, # rtype,
None, # ttl,
zone,
)
current_records = run_list(url, jsonout=True, headers=headers, ret=True)
openstack_addresses = get_openstack_addresses(cloud, name)
if current_records["Code"] == 404:
for address in openstack_addresses:
rtype = None
if address["version"] == 4:
rtype = "A"
elif address["version"] == 6:
rtype = "AAAA"
if rtype:
url = setup_url(
baseurl,
None, # arguments,
address["addr"], # data,
name,
rtype,
None, # ttl,
zone,
)
run_add(url, jsonout, headers)
else:
previpv4 = False
previpv6 = False
curripv4 = False
curripv6 = False
for record in current_records:
if record.type == "A":
previpv4 = record.data
elif record.type == "AAAA":
previpv6 = record.data
for address in openstack_addresses:
rtype = None
if address.version == 4:
rtype = "A"
curripv4 = True
elif address.version == 6:
rtype = "AAAA"
curripv6 = True
if rtype and record.type == rtype:
if record.data == address.addr:
continue
else:
url = setup_url(
baseurl,
None, # arguments,
address.addr, # data,
name,
record.type,
None, # ttl,
zone,
)
run_update(url, jsonout, headers)
if previpv4 and not curripv4:
url = setup_url(
baseurl,
None, # arguments,
previpv4, # data,
name,
"A",
None, # ttl,
zone,
)
run_delete(url, jsonout, headers)
if previpv6 and not curripv6:
url = setup_url(
baseurl,
None, # arguments,
previpv6, # data,
name,
"AAAA",
None, # ttl,
zone,
)
run_delete(url, jsonout, headers)
if curripv4 and not previpv4:
url = setup_url(
baseurl,
None, # arguments,
curripv4, # data,
name,
"A",
None, # ttl,
zone,
)
run_add(url, jsonout, headers)
if curripv6 and not previpv6:
url = setup_url(
baseurl,
None, # arguments,
curripv6, # data,
name,
"AAAA",
None, # ttl,
zone,
)
run_add(url, jsonout, headers)
def run_update(url: str, jsonout: bool, headers: dict):
response = requests.patch(url, headers=headers)
output(response.json(), jsonout)
def run_zone(url: str,
jsonout: bool,
headers: dict,
ret=False) -> Union[None, str]:
response = requests.get(url, headers=headers)
zones = response.json()
for zone in zones:
del zone["records"]
string = zones
if ret:
return string
else:
output(string, jsonout)
def get_parser() -> dict:
description = """Manage DNS records with knot dns rest api:
* https://gitlab.nic.cz/knot/knot-dns-rest"""
epilog = """
The Domain Name System specifies a database of information
elements for network resources. The types of information
elements are categorized and organized with a list of DNS
record types, the resource records (RRs). Each record has a
name, a type, an expiration time (time to live), and
type-specific data.
The following is a list of terms used in this program:
----------------------------------------------------------------
| Vocabulary | Description |
----------------------------------------------------------------
| zone | A DNS zone is a specific portion of the DNS |
| | namespace in the Domain Name System (DNS), |
| | which a specific organization or administrator |
| | manages. |
----------------------------------------------------------------
| name | In the Internet, a domain name is a string that |
| | identifies a realm of administrative autonomy, |
| | authority or control. Domain names are often |
| | used to identify services provided through the |
| | Internet, such as websites, email services and |
| | more. |
----------------------------------------------------------------
| rtype | A record type indicates the format of the data |
| | and it gives a hint of its intended use. For |
| | example, the A record is used to translate from |
| | a domain name to an IPv4 address, the NS record |
| | lists which name servers can answer lookups on |
| | a DNS zone, and the MX record specifies the |
| | mail server used to handle mail for a domain |
| | specified in an e-mail address. |
----------------------------------------------------------------
| data | A records data is of type-specific relevance, |
| | such as the IP address for address records, or |
| | the priority and hostname for MX records. |
----------------------------------------------------------------
This information was compiled from Wikipedia:
* https://en.wikipedia.org/wiki/DNS_zone
* https://en.wikipedia.org/wiki/Domain_Name_System
* https://en.wikipedia.org/wiki/Zone_file
"""
# Grab user input
parser = argparse.ArgumentParser(
description=description,
epilog=epilog,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("--json", action=argparse.BooleanOptionalAction)
subparsers = parser.add_subparsers(dest="command")
add_description = "Add a new record to the zone."
addcmd = subparsers.add_parser("add", description=add_description)
addcmd.add_argument("-d", "--data", required=True)
addcmd.add_argument("-n", "--name", required=True)
addcmd.add_argument("-r", "--rtype", required=True)
addcmd.add_argument("-t", "--ttl")
addcmd.add_argument("-z", "--zone", required=True)
auditlog_description = "Audit the log file for errors."
subparsers.add_parser("auditlog", description=auditlog_description)
changelog_description = "View the changelog of a zone."
changelogcmd = subparsers.add_parser("changelog",
description=changelog_description)
changelogcmd.add_argument("-z", "--zone", required=True)
complete_description = "Generate shell completion script."
completecmd = subparsers.add_parser("completion",
description=complete_description)
completecmd.add_argument("-s", "--shell")
config_description = "Configure access to knot-dns-rest-api."
configcmd = subparsers.add_parser("config", description=config_description)
configcmd.add_argument("-b", "--baseurl")
configcmd.add_argument("-c", "--context")
configcmd.add_argument("-C",
"--current",
action=argparse.BooleanOptionalAction)
configcmd.add_argument("-l",
"--list",
action=argparse.BooleanOptionalAction,
dest="list_config")
configcmd.add_argument("-p", "--password")
configcmd.add_argument("-u", "--username")
delete_description = "Delete a record from the zone."
deletecmd = subparsers.add_parser("delete", description=delete_description)
deletecmd.add_argument("-d", "--data")
deletecmd.add_argument("-n", "--name")
deletecmd.add_argument("-r", "--rtype")
deletecmd.add_argument("-z", "--zone", required=True)
list_description = "List records."
listcmd = subparsers.add_parser("list", description=list_description)
listcmd.add_argument("-d", "--data")
listcmd.add_argument("-n", "--name")
listcmd.add_argument("-r", "--rtype")
listcmd.add_argument("-z", "--zone", required=False)
openstack_description = "Sync records with openstack."
openstackcmd = subparsers.add_parser("openstack-sync",
description=openstack_description)
openstackcmd.add_argument("-n", "--name", required=True)
openstackcmd.add_argument("-c", "--cloud", required=True)
openstackcmd.add_argument("-z", "--zone", required=True)
user_description = "View user information."
usercmd = subparsers.add_parser("user", description=user_description)
usercmd.add_argument("-u", "--username", default=None)
update_description = (
"Update a record in the zone. The record must exist in the zone.\n")
update_description += (
"In this case --data, --name, --rtype and --ttl switches are used\n")
update_description += (
"for searching for the appropriate record, while the --argument\n")
update_description += "switches are used for updating the record."
update_epilog = """Available arguments are:
data: New record data.
name: New record domain name.
rtype: New record type.
ttl: New record time to live (TTL)."""
updatecmd = subparsers.add_parser(
"update",
description=update_description,
epilog=update_epilog,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
updatecmd.add_argument(
"-a",
"--argument",
action="append",
metavar="KEY=VALUE",
help="Specify key - value pairs to be updated: name=dns1.example.com."
+ " or data=127.0.0.1 for example. --argument can be repeated",
required=True,
)
updatecmd.add_argument("-d", "--data", required=True)
updatecmd.add_argument("-n", "--name", required=True)
updatecmd.add_argument("-r", "--rtype", required=True)
updatecmd.add_argument("-t", "--ttl")
updatecmd.add_argument("-z", "--zone", required=True)
zone_description = "View zones."
subparsers.add_parser("zone", description=zone_description)
argcomplete.autocomplete(parser)
return parser
def get_token(config) -> str:
# Authenticate
baseurl = config["baseurl"]
username = config["username"]
password = config["password"]
basic = HTTPBasicAuth(username, password)
response = requests.get(baseurl + "/user/login", auth=basic)
token = ""
try:
token = response.json()["token"]
except KeyError:
output(response.json())
except requests.exceptions.JSONDecodeError:
output(
error("Could not decode api response as JSON", "Could not decode"))
return token
def run(url, args, headers, baseurl, parser, username):
try:
if args.command == "add":
run_add(url, args.json, headers)
elif args.command == "delete":
run_delete(url, args.json, headers)
elif args.command == "list":
run_list(url, args.json, headers)
elif args.command == "update":
run_update(url, args.json, headers)
elif args.command == "user":
url = baseurl + f"/user/info/{username}"
run_list(url, args.json, headers)
elif args.command == "auditlog":
url = baseurl + "/user/auditlog"
run_log(url, args.json, headers)
elif args.command == "changelog":
url = baseurl + f"/zones/changelog/{args.zone.rstrip('.')}"
run_log(url, args.json, headers)
elif args.command == "zone":
url = baseurl + "/zones"
run_zone(url, args.json, headers)
elif args.command == "openstack-sync":
run_openstack_sync(args.cloud, args.name, args.zone, headers,
baseurl, args.json)
else:
parser.print_help(sys.stderr)
return 2
except requests.exceptions.RequestException as e:
output(error(e, "Could not connect to server"))
except (RequestsJSONDecodeError, SimplejsonJSONDecodeError):
output(
error("Could not decode api response as JSON", "Could not decode"))
return 0
# Entry point to program
def main() -> int:
parser = get_parser()
@ -536,11 +150,10 @@ def main() -> int:
config = knotctl.get_config()
baseurl = config["baseurl"]
token = get_token(config)
token = knotctl.conf.get_token()
if token == "":
print("Could not get token, exiting")
return 1
headers = {"Authorization": "Bearer {}".format(token)}
# Route based on command
url = ""
@ -556,7 +169,7 @@ def main() -> int:
else:
zname = args.zone + "."
soa_url = setup_url(baseurl, None, None, zname, "SOA", None, args.zone)
soa_json = run_list(soa_url, True, headers, ret=True)
soa_json = knotctl.runner.lister(soa_url, True, ret=True)
ttl = soa_json[0]["ttl"]
if args.command == "user":
if args.username:
@ -580,7 +193,7 @@ def main() -> int:
parser.print_help(sys.stderr)
return 1
return run(url, args, headers, baseurl, parser, user)
return knotctl.run(url, args, baseurl, parser, user)
if __name__ == "__main__":

@ -4,7 +4,11 @@ from os import mkdir
from os.path import isdir, isfile, join
from typing import Union
import requests
import yaml
from requests.models import HTTPBasicAuth
from ..utils import error, output
class Config:
@ -34,6 +38,25 @@ class Config:
else:
return "none"
def get_token(self) -> str:
# Authenticate
config = self.get_config()
baseurl = config["baseurl"]
username = config["username"]
password = config["password"]
basic = HTTPBasicAuth(username, password)
response = requests.get(baseurl + "/user/login", auth=basic)
token = ""
try:
token = response.json()["token"]
except KeyError:
output(response.json())
except requests.exceptions.JSONDecodeError:
output(
error("Could not decode api response as JSON",
"Could not decode"))
return token
def set_context(self, context) -> bool:
symlink = f"{self.config_filename}-{context}"
found = os.path.isfile(symlink)

@ -0,0 +1,219 @@
from typing import Union
import requests
from ..config import Config
from ..openstack import get_openstack_addresses
from ..utils import output, setup_url, split_url
class Run():
def __init__(self):
conf = Config()
self.headers = {"Authorization": f"Bearer {conf.get_token()}"}
def add(self, url: str, jsonout: bool):
parsed = split_url(url)
response = requests.put(url, headers=self.headers)
out = response.json()
if isinstance(out, list):
for record in out:
if (record["data"] == parsed["data"]
and record["name"] == parsed["name"]
and record["rtype"] == parsed["rtype"]):
output(record, jsonout)
break
else:
output(out, jsonout)
def delete(self, url: str, jsonout: bool):
response = requests.delete(url, headers=self.headers)
reply = response.json()
if not reply and response.status_code == requests.codes.ok:
reply = [{"Code": 200, "Description": "{} deleted".format(url)}]
output(reply, jsonout)
def log(self, url: str, jsonout: bool):
response = requests.get(url, headers=self.headers)
string = response.content.decode("utf-8")
if jsonout:
out = []
lines = string.splitlines()
index = 0
text = ""
timestamp = ""
while index < len(lines):
line = lines[index]
index += 1
cur_has_timestamp = line.startswith("[")
next_has_timestamp = index < len(
lines) and lines[index].startswith("[")
# Simple case, just one line with timestamp
if cur_has_timestamp and next_has_timestamp:
timestamp = line.split("]")[0].split("[")[1]
text = line.split("]")[1].lstrip(":").strip()
out.append({"timestamp": timestamp, "text": text})
text = ""
timestamp = ""
# Start of multiline
elif cur_has_timestamp:
timestamp = line.split("]")[0].split("[")[1]
text = line.split("]")[1].lstrip(":").strip()
# End of multiline
elif next_has_timestamp:
text += f"\n{line.strip()}"
out.append({"timestamp": timestamp, "text": text})
text = ""
timestamp = ""
# Middle of multiline
else:
text += f"\n{line.strip()}"
else:
out = string
output(out, jsonout)
def update(self, url: str, jsonout: bool):
response = requests.patch(url, headers=self.headers)
output(response.json(), jsonout)
def zone(self, url: str, jsonout: bool, ret=False) -> Union[None, str]:
response = requests.get(url, headers=self.headers)
zones = response.json()
for zone in zones:
del zone["records"]
string = zones
if ret:
return string
else:
output(string, jsonout)
def lister(self, url: str, jsonout: bool, ret=False) -> Union[None, str]:
response = requests.get(url, headers=self.headers)
string = response.json()
if ret:
return string
else:
output(string, jsonout)
def add_records(self, openstack_addresses, baseurl, name, zone, url,
jsonout):
for address in openstack_addresses:
rtype = None
if address["version"] == 4:
rtype = "A"
elif address["version"] == 6:
rtype = "AAAA"
if rtype:
url = setup_url(
baseurl,
None, # arguments,
address["addr"], # data,
name,
rtype,
None, # ttl,
zone,
)
self.add(url, jsonout)
def update_records(self, openstack_addresses, current_records, baseurl,
name, zone, url, jsonout):
previpv4 = False
previpv6 = False
curripv4 = False
curripv6 = False
for record in current_records:
if record.type == "A":
previpv4 = record.data
elif record.type == "AAAA":
previpv6 = record.data
for address in openstack_addresses:
rtype = None
if address.version == 4:
rtype = "A"
curripv4 = True
elif address.version == 6:
rtype = "AAAA"
curripv6 = True
if rtype and record.type == rtype:
if record.data == address.addr:
continue
else:
url = setup_url(
baseurl,
None, # arguments,
address.addr, # data,
name,
record.type,
None, # ttl,
zone,
)
self.update(url, jsonout)
if previpv4 and not curripv4:
url = setup_url(
baseurl,
None, # arguments,
previpv4, # data,
name,
"A",
None, # ttl,
zone,
)
self.delete(url, jsonout)
if previpv6 and not curripv6:
url = setup_url(
baseurl,
None, # arguments,
previpv6, # data,
name,
"AAAA",
None, # ttl,
zone,
)
self.delete(url, jsonout)
if curripv4 and not previpv4:
url = setup_url(
baseurl,
None, # arguments,
curripv4, # data,
name,
"A",
None, # ttl,
zone,
)
self.add(url, jsonout)
if curripv6 and not previpv6:
url = setup_url(
baseurl,
None, # arguments,
curripv6, # data,
name,
"AAAA",
None, # ttl,
zone,
)
self.add(url, jsonout)
def openstack_sync(self, cloud: str, name: str, zone: str, baseurl: str,
jsonout: bool):
url = setup_url(
baseurl,
None, # arguments,
None, # data,
name,
None, # rtype,
None, # ttl,
zone,
)
current_records = self.lister(url, jsonout=True, ret=True)
openstack_addresses = get_openstack_addresses(cloud, name)
if current_records["Code"] == 404:
self.add_records(openstack_addresses, baseurl, name, zone, url,
jsonout)
else:
self.update_records(openstack_addresses, current_records, baseurl,
name, zone, url, jsonout)

@ -1,3 +1,5 @@
import argparse
import argcomplete
import json
import sys
import urllib.parse
@ -5,6 +7,163 @@ from typing import Union
from urllib.parse import urlparse
def get_parser() -> dict:
description = """Manage DNS records with knot dns rest api:
* https://gitlab.nic.cz/knot/knot-dns-rest"""
epilog = """
The Domain Name System specifies a database of information
elements for network resources. The types of information
elements are categorized and organized with a list of DNS
record types, the resource records (RRs). Each record has a
name, a type, an expiration time (time to live), and
type-specific data.
The following is a list of terms used in this program:
----------------------------------------------------------------
| Vocabulary | Description |
----------------------------------------------------------------
| zone | A DNS zone is a specific portion of the DNS |
| | namespace in the Domain Name System (DNS), |
| | which a specific organization or administrator |
| | manages. |
----------------------------------------------------------------
| name | In the Internet, a domain name is a string that |
| | identifies a realm of administrative autonomy, |
| | authority or control. Domain names are often |
| | used to identify services provided through the |
| | Internet, such as websites, email services and |
| | more. |
----------------------------------------------------------------
| rtype | A record type indicates the format of the data |
| | and it gives a hint of its intended use. For |
| | example, the A record is used to translate from |
| | a domain name to an IPv4 address, the NS record |
| | lists which name servers can answer lookups on |
| | a DNS zone, and the MX record specifies the |
| | mail server used to handle mail for a domain |
| | specified in an e-mail address. |
----------------------------------------------------------------
| data | A records data is of type-specific relevance, |
| | such as the IP address for address records, or |
| | the priority and hostname for MX records. |
----------------------------------------------------------------
This information was compiled from Wikipedia:
* https://en.wikipedia.org/wiki/DNS_zone
* https://en.wikipedia.org/wiki/Domain_Name_System
* https://en.wikipedia.org/wiki/Zone_file
"""
# Grab user input
parser = argparse.ArgumentParser(
description=description,
epilog=epilog,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument("--json", action=argparse.BooleanOptionalAction)
subparsers = parser.add_subparsers(dest="command")
add_description = "Add a new record to the zone."
addcmd = subparsers.add_parser("add", description=add_description)
addcmd.add_argument("-d", "--data", required=True)
addcmd.add_argument("-n", "--name", required=True)
addcmd.add_argument("-r", "--rtype", required=True)
addcmd.add_argument("-t", "--ttl")
addcmd.add_argument("-z", "--zone", required=True)
auditlog_description = "Audit the log file for errors."
subparsers.add_parser("auditlog", description=auditlog_description)
changelog_description = "View the changelog of a zone."
changelogcmd = subparsers.add_parser("changelog",
description=changelog_description)
changelogcmd.add_argument("-z", "--zone", required=True)
complete_description = "Generate shell completion script."
completecmd = subparsers.add_parser("completion",
description=complete_description)
completecmd.add_argument("-s", "--shell")
config_description = "Configure access to knot-dns-rest-api."
configcmd = subparsers.add_parser("config", description=config_description)
configcmd.add_argument("-b", "--baseurl")
configcmd.add_argument("-c", "--context")
configcmd.add_argument("-C",
"--current",
action=argparse.BooleanOptionalAction)
configcmd.add_argument("-l",
"--list",
action=argparse.BooleanOptionalAction,
dest="list_config")
configcmd.add_argument("-p", "--password")
configcmd.add_argument("-u", "--username")
delete_description = "Delete a record from the zone."
deletecmd = subparsers.add_parser("delete", description=delete_description)
deletecmd.add_argument("-d", "--data")
deletecmd.add_argument("-n", "--name")
deletecmd.add_argument("-r", "--rtype")
deletecmd.add_argument("-z", "--zone", required=True)
list_description = "List records."
listcmd = subparsers.add_parser("list", description=list_description)
listcmd.add_argument("-d", "--data")
listcmd.add_argument("-n", "--name")
listcmd.add_argument("-r", "--rtype")
listcmd.add_argument("-z", "--zone", required=False)
openstack_description = "Sync records with openstack."
openstackcmd = subparsers.add_parser("openstack-sync",
description=openstack_description)
openstackcmd.add_argument("-n", "--name", required=True)
openstackcmd.add_argument("-c", "--cloud", required=True)
openstackcmd.add_argument("-z", "--zone", required=True)
user_description = "View user information."
usercmd = subparsers.add_parser("user", description=user_description)
usercmd.add_argument("-u", "--username", default=None)
update_description = (
"Update a record in the zone. The record must exist in the zone.\n")
update_description += (
"In this case --data, --name, --rtype and --ttl switches are used\n")
update_description += (
"for searching for the appropriate record, while the --argument\n")
update_description += "switches are used for updating the record."
update_epilog = """Available arguments are:
data: New record data.
name: New record domain name.
rtype: New record type.
ttl: New record time to live (TTL)."""
updatecmd = subparsers.add_parser(
"update",
description=update_description,
epilog=update_epilog,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
updatecmd.add_argument(
"-a",
"--argument",
action="append",
metavar="KEY=VALUE",
help="Specify key - value pairs to be updated: name=dns1.example.com."
+ " or data=127.0.0.1 for example. --argument can be repeated",
required=True,
)
updatecmd.add_argument("-d", "--data", required=True)
updatecmd.add_argument("-n", "--name", required=True)
updatecmd.add_argument("-r", "--rtype", required=True)
updatecmd.add_argument("-t", "--ttl")
updatecmd.add_argument("-z", "--zone", required=True)
zone_description = "View zones."
subparsers.add_parser("zone", description=zone_description)
argcomplete.autocomplete(parser)
return parser
def error(description: str, error: str):
response = []
reply = {}

Loading…
Cancel
Save