Source code for dyndns.dns

"""Query the DSN server using the package “dnspython”."""

import random
import string
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any

import dns.exception
import dns.message
import dns.name
import dns.query
import dns.rdatatype
import dns.resolver
import dns.rrset
import dns.tsig
import dns.tsigkeyring
import dns.update

from dyndns.config import RecordType
from dyndns.exceptions import CheckError, DNSServerError
from dyndns.log import LogLevel, logger

if TYPE_CHECKING:
    from dyndns.zones import Zone


[docs] @dataclass class DnsChangeMessage: fqdn: str old: str | None new: str | None record_type: RecordType @property def changed(self) -> bool: return self.old != self.new
[docs] class DnsZone: _nameserver: str """The ip address of the nameserver, for example ``127.0.0.1``.""" _port: int _zone: "Zone" _keyring: dict[dns.name.Name, dns.tsig.Key] __resolver: dns.resolver.Resolver def __init__(self, nameserver: str, port: int, zone: "Zone") -> None: self._nameserver = nameserver self._port = port self._zone = zone self._keyring = dns.tsigkeyring.from_text({zone.name: zone.tsig_key}) @property def _resolver(self) -> dns.resolver.Resolver: if not hasattr(self, "__resolver"): self.__resolver = dns.resolver.Resolver() self.__resolver.nameservers = [self._nameserver] self.__resolver.port = self._port return self.__resolver def _create_update_message(self) -> dns.update.UpdateMessage: return dns.update.UpdateMessage( self._zone.name, keyring=self._keyring, keyalgorithm=dns.tsig.HMAC_SHA512, ) def _query(self, message: dns.message.Message) -> dns.message.Message: """Catch some errors and convert this errors to dyndns specific errors.""" try: return dns.query.tcp( message, where=self._nameserver, port=self._port, timeout=5 ) except dns.tsig.PeerBadKey: raise DNSServerError( f'The peer "{self._nameserver}" didn\'t know the tsig key ' f'we used for the zone "{self._zone.name}".' ) except dns.exception.Timeout: raise DNSServerError( f'The DNS operation to the nameserver "{self._nameserver}" timed out.' ) def _normalize_name(self, name: str) -> str: """ :param name: A record name (e. g. ``dyndns``) or a fully qualified domain name (e. g. ``dyndns.example.com``). :return: A fully qualified domain name (e. g. ``dyndns.example.com.``). """ return self._zone.get_fqdn(name)
[docs] def add_record( self, name: str, record_type: RecordType, content: str, ttl: int = 300 ) -> DnsChangeMessage: """ Add one record. All existing records with the same name and same record type are deleted before a new record is added. :param name: A record name (e. g. ``dyndns``) or a fully qualified domain name (e. g. ``dyndns.example.com``). :param record_type: The type of the resource record. ``dyndns`` supports only ``A``, ``AAAA`` and ``TXT`` record types. """ fqdn = self._normalize_name(name) old = self.read_record(fqdn, record_type) self._delete_record(fqdn, record_type) message: dns.update.UpdateMessage = self._create_update_message() message.add(fqdn, ttl, record_type, content) self._query(message) new = self.read_record(fqdn, record_type) return DnsChangeMessage(fqdn=fqdn, old=old, new=new, record_type=record_type)
[docs] def read_resource_record_set( self, name: str, record_type: RecordType ) -> dns.rrset.RRset | None: """ :param name: A record name (e. g. ``dyndns``) or a fully qualified domain name (e. g. ``dyndns.example.com``). :param record_type: The type of the resource record. ``dyndns`` supports only ``A``, ``AAAA`` and ``TXT`` record types. """ result: dns.resolver.Answer = self._resolver.resolve( self._normalize_name(name), record_type ) return result.rrset
[docs] def read_record(self, name: str, record_type: RecordType) -> str | None: """ Read one record. :param name: A record name (e. g. ``dyndns``) or a fully qualified domain name (e. g. ``dyndns.example.com``). :param record_type: The type of the resource record. ``dyndns`` supports only ``A``, ``AAAA`` and ``TXT`` record types. """ try: result: Any = self.read_resource_record_set(name, record_type) if result and len(result) > 0: if record_type == "TXT": element = result.pop() result = element.strings[0].decode() if isinstance(result, str): return result raise ValueError("The record could not be read.") else: return str(result[0]) except (dns.resolver.NoAnswer, dns.resolver.NXDOMAIN): pass return None
[docs] def read_a_record(self, name: str) -> str | None: """ Read an IPv4 address record. :param name: A record name (e. g. ``dyndns``) or a fully qualified domain name (e. g. ``dyndns.example.com``). :return: An IPv4 address. """ return self.read_record(name, "A")
[docs] def read_aaaa_record(self, name: str) -> str | None: """ Read an IPv6 address record. :param name: A record name (e. g. ``dyndns``) or a fully qualified domain name (e. g. ``dyndns.example.com``). :return: An IPv6 address. """ return self.read_record(name, "AAAA")
[docs] def is_a_record(self, name: str) -> bool: """ Return ``True`` if the specified name has an A record (IPv4). :param name: A record name (e. g. ``dyndns``) or a fully qualified domain name (e. g. ``dyndns.example.com``). :return: `True`` if the specified name has an A record (IPv4). """ return self.read_a_record(name) is not None
[docs] def is_aaaa_record(self, name: str) -> bool: """ Return ``True`` if the specified name has an AAAA record (IPv6). :param name: A record name (e. g. ``dyndns``) or a fully qualified domain name (e. g. ``dyndns.example.com``). :return: `True`` if the specified name has an AAAA record (IPv6). """ return self.read_aaaa_record(name) is not None
def _delete_record( self, name: str, record_type: RecordType = "A" ) -> dns.message.Message: """ Delete one record or multiple records of a specific record type. :param name: A record name (e. g. ``dyndns``) or a fully qualified domain name (e. g. ``dyndns.example.com``). :param record_type: The type of the resource record. ``dyndns`` supports only ``A``, ``AAAA`` and ``TXT`` record types. """ message: dns.update.UpdateMessage = self._create_update_message() message.delete(self._normalize_name(name), record_type) return self._query(message)
[docs] def delete_record( self, name: str, record_type: RecordType = "A" ) -> DnsChangeMessage: """ Delete one record or multiple records of a specific record type. :param name: A record name (e. g. ``dyndns``) or a fully qualified domain name (e. g. ``dyndns.example.com``). :param record_type: The type of the resource record. ``dyndns`` supports only ``A``, ``AAAA`` and ``TXT`` record types. """ fqdn: str = self._normalize_name(name) old = self.read_record(fqdn, record_type) self._delete_record(fqdn, record_type) return DnsChangeMessage(fqdn=fqdn, old=old, new=None, record_type=record_type)
[docs] def delete_records(self, name: str) -> None: """Delete all A and the AAAA records. :param name: A record name (e. g. ``dyndns``) or a fully qualified domain name (e. g. ``dyndns.example.com``). """ self._delete_record(name, "A") self._delete_record(name, "AAAA")
[docs] def check(self) -> str: """Check the functionality of the DNS server by creating a temporary text record.""" check_record_name = "dyndns-check-tmp-a841278b-f089-4164-b8e6-f90514e573ec" random_content: str = "".join( random.choices(string.ascii_uppercase + string.digits, k=8) ) self._delete_record(check_record_name, "TXT") self.add_record(check_record_name, "TXT", random_content) result: str | None = self.read_record(check_record_name, "TXT") self._delete_record(check_record_name, "TXT") if not result: raise CheckError("no response") if result != random_content: raise CheckError("check failed") else: return logger.log( LogLevel.INFO, "The update check passed: " f"A TXT record '{check_record_name}' with the content '{random_content}' " f"could be updated on the zone '{self._zone.name}'.", )