"""Query the DSN server using the package “dnspython”."""
from __future__ import annotations
import typing
import dns.exception
import dns.name
import dns.query
import dns.resolver
import dns.tsig
import dns.tsigkeyring
import dns.update
from dyndns.exceptions import DNSServerError, DyndnsError
from dyndns.ipaddresses import IpAddressContainer
from dyndns.log import UpdatesDB
from dyndns.names import Names
from dyndns.types import IpVersion, RecordType, UpdateRecord
[docs]
class DnsUpdate:
"""
Update the DNS server
"""
nameserver: str
"""for example ``127.0.0.1``"""
names: Names
ipaddresses: IpAddressContainer | None
ttl: int
def __init__(
self,
nameserver: str,
names: Names,
ipaddresses: IpAddressContainer | None = None,
ttl: str | int | None = None,
) -> None:
self.nameserver = nameserver #: The nameserver
self.names = names
self.ipaddresses = ipaddresses
if not ttl:
self.ttl = 300
else:
self.ttl = int(ttl)
self._tsigkeyring = self._build_tsigkeyring(
self.names.zone_name,
self.names.tsig_key,
)
self._dns_update: dns.update.Update = dns.update.Update(
self.names.zone_name,
keyring=self._tsigkeyring,
keyalgorithm=dns.tsig.HMAC_SHA512,
)
self._updates_db = UpdatesDB()
self.log_update = self._updates_db.log_update
@staticmethod
def _build_tsigkeyring(zone_name: str, tsig_key: str):
"""
:param zone: A zone name object
:param tsig_key: A TSIG key
"""
keyring: dict[str, str] = {}
keyring[zone_name] = tsig_key
return dns.tsigkeyring.from_text(keyring)
@staticmethod
def _convert_record_type(ip_version: IpVersion = 4) -> RecordType:
if ip_version == 4:
return "a"
elif ip_version == 6:
return "aaaa"
else:
raise ValueError("“ip_version” must be 4 or 6")
def _resolve(self, ip_version: IpVersion = 4):
resolver = dns.resolver.Resolver()
resolver.nameservers = [self.nameserver]
try:
ip: dns.resolver.Answer = resolver.query(
self.names.fqdn,
self._convert_record_type(ip_version),
)
return str(ip[0])
except dns.exception.DNSException:
return ""
def _query_tcp(self, dns_update: dns.update.Update) -> None:
"""Catch some errors and convert this errors to dyndns specific
errors."""
try:
dns.query.tcp(dns_update, where=self.nameserver, timeout=5)
except dns.tsig.PeerBadKey:
raise DNSServerError(
'The peer "{}" didn\'t know the tsig key '
'we used for the zone "{}".'.format(
self.nameserver,
self.names.zone_name,
)
)
except dns.exception.Timeout:
raise DNSServerError(
"The DNS operation to the nameserver " '"{}" timed out.'.format(
self.nameserver
)
)
def _set_record(self, new_ip: str, ip_version: IpVersion = 4) -> UpdateRecord:
out = {}
out["ip_version"] = ip_version
out["new_ip"] = new_ip
old_ip: str = self._resolve(ip_version)
out["old_ip"] = old_ip
rdtype = self._convert_record_type(ip_version)
if new_ip == old_ip:
out["status"] = "UNCHANGED"
self.log_update(False, self.names.fqdn, rdtype, new_ip)
else:
self._dns_update.delete(self.names.fqdn, rdtype)
# If the client (a notebook) moves in a network without ipv6
# support, we have to delete the 'aaaa' record.
if rdtype == "a":
self._dns_update.delete(self.names.fqdn, "aaaa")
self._dns_update.add(self.names.fqdn, self.ttl, rdtype, new_ip)
self._query_tcp(self._dns_update)
checked_ip = self._resolve(ip_version)
if new_ip == checked_ip:
out["status"] = "UPDATED"
self.log_update(True, self.names.fqdn, rdtype, new_ip)
else:
out["status"] = "DNS_SERVER_ERROR"
return typing.cast(UpdateRecord, out)
[docs]
def delete(self):
self._dns_update.delete(self.names.fqdn, "a")
self._dns_update.delete(self.names.fqdn, "aaaa")
self._query_tcp(self._dns_update)
return True
[docs]
def update(self) -> list[UpdateRecord]:
results: list[UpdateRecord] = []
if not self.ipaddresses:
raise DyndnsError("No ip addresses set.")
if self.ipaddresses.ipv4:
results.append(self._set_record(new_ip=self.ipaddresses.ipv4, ip_version=4))
if self.ipaddresses.ipv6:
results.append(self._set_record(new_ip=self.ipaddresses.ipv6, ip_version=6))
return results