Source code for dyndns.dns

"""Query the DSN server using the package “dnspython”."""

# third party imports
import dns.exception
import dns.name
import dns.query
import dns.resolver
import dns.tsig
import dns.tsigkeyring
import dns.update

# local imports
from dyndns.exceptions import DNSServerError
from dyndns.log import UpdatesDB


class DnsUpdate:
    """Update the DNS server.

    Builds a TSIG-signed dynamic update message for the zone of ``names``
    and sends it over TCP to ``nameserver``.

    :param nameserver: The nameserver the updates and lookups are sent to.
    :param names: An object providing the attributes ``fqdn``,
      ``zone_name`` and ``tsig_key``.
    :param ipaddresses: An object providing the attributes ``ipv4`` and
      ``ipv6``, or ``None`` if no addresses should be set (for example
      when only :meth:`delete` is used).
    :param ttl: Time to live of the created records. Any falsy value
      (``None``, ``0``, ``''``) selects the default of 300; every other
      value is converted with ``int()``.
    """

    def __init__(self, nameserver, names, ipaddresses=None, ttl=None):
        #: The nameserver
        self.nameserver = nameserver
        self.names = names
        self.ipaddresses = ipaddresses
        self.ttl = int(ttl) if ttl else 300

        self._tsigkeyring = self._build_tsigkeyring(
            self.names.zone_name,
            self.names.tsig_key,
        )
        # NOTE(review): this one update message is reused for every send
        # of this instance, so operations queued by an earlier call appear
        # to be transmitted again by a later one - confirm this is intended.
        self._dns_update = dns.update.Update(
            self.names.zone_name,
            keyring=self._tsigkeyring,
            keyalgorithm=dns.tsig.HMAC_SHA512,
        )
        self._updates_db = UpdatesDB()
        self.log_update = self._updates_db.log_update

    @staticmethod
    def _build_tsigkeyring(zone_name, tsig_key):
        """Build a TSIG keyring that maps the zone name to its key.

        :param zone_name: The zone name, used as the name of the key.
        :param str tsig_key: A TSIG key

        :return: The result of :func:`dns.tsigkeyring.from_text` for the
          single entry ``{zone_name: tsig_key}``.
        """
        return dns.tsigkeyring.from_text({zone_name: tsig_key})

    @staticmethod
    def _convert_record_type(ip_version=4):
        """Translate an IP version into the matching record type.

        :param int ip_version: ``4`` or ``6``

        :return: ``'a'`` for version 4, ``'aaaa'`` for version 6.

        :raises ValueError: If ``ip_version`` is neither 4 nor 6.
        """
        if ip_version == 4:
            return 'a'
        if ip_version == 6:
            return 'aaaa'
        raise ValueError('“ip_version” must be 4 or 6')

    def _resolve(self, ip_version=4):
        """Look up the current address of the fqdn on the nameserver.

        :param int ip_version: ``4`` or ``6``

        :return: The first record of the answer as a string, or an empty
          string if the lookup raised a ``dns.exception.DNSException``.

        :raises ValueError: If ``ip_version`` is neither 4 nor 6.
        """
        resolver = dns.resolver.Resolver()
        # Ask the configured nameserver directly.
        resolver.nameservers = [self.nameserver]
        try:
            answer = resolver.query(
                self.names.fqdn,
                self._convert_record_type(ip_version),
            )
            return str(answer[0])
        except dns.exception.DNSException:
            # Best effort: a failed lookup is reported as "no address".
            return ''

    def _query_tcp(self, dns_update):
        """Send the update message over TCP to the nameserver.

        Some dnspython errors are converted into dyndns specific errors.
        The original exception stays attached as ``__cause__``.

        :param dns_update: The update message to send.

        :raises DNSServerError: If the peer does not know the TSIG key or
          the operation timed out (timeout: 5 seconds).
        """
        try:
            dns.query.tcp(dns_update, where=self.nameserver, timeout=5)
        except dns.tsig.PeerBadKey as error:
            raise DNSServerError(
                'The peer "{}" didn\'t know the tsig key '
                'we used for the zone "{}".'.format(
                    self.nameserver,
                    self.names.zone_name,
                )
            ) from error
        except dns.exception.Timeout as error:
            raise DNSServerError(
                'The DNS operation to the nameserver '
                '"{}" timed out.'.format(self.nameserver)
            ) from error

    def _set_record(self, new_ip, ip_version=4):
        """Set the record of one IP version if its address has changed.

        :param new_ip: The address the record should point to.
        :param int ip_version: ``4`` or ``6``

        :return: A dictionary with the keys ``ip_version``, ``new_ip``,
          ``old_ip`` and ``status``. ``status`` is ``'UNCHANGED'`` if the
          nameserver already returns ``new_ip``, ``'UPDATED'`` if it
          returns ``new_ip`` after the update was sent, and
          ``'DNS_SERVER_ERROR'`` otherwise.

        :raises ValueError: If ``ip_version`` is neither 4 nor 6.
        :raises DNSServerError: If sending the update fails, see
          :meth:`_query_tcp`.
        """
        old_ip = self._resolve(ip_version)
        out = {
            'ip_version': ip_version,
            'new_ip': new_ip,
            'old_ip': old_ip,
        }
        rdtype = self._convert_record_type(ip_version)
        fqdn = self.names.fqdn

        if new_ip == old_ip:
            out['status'] = 'UNCHANGED'
            self.log_update(False, fqdn, rdtype, new_ip)
            return out

        self._dns_update.delete(fqdn, rdtype)
        # If the client (a notebook) moves in a network without ipv6
        # support, we have to delete the 'aaaa' record.
        if rdtype == 'a':
            self._dns_update.delete(fqdn, 'aaaa')
        self._dns_update.add(fqdn, self.ttl, rdtype, new_ip)
        self._query_tcp(self._dns_update)

        # Verify the update by asking the nameserver again.
        checked_ip = self._resolve(ip_version)
        if new_ip == checked_ip:
            out['status'] = 'UPDATED'
            self.log_update(True, fqdn, rdtype, new_ip)
        else:
            out['status'] = 'DNS_SERVER_ERROR'
        return out

    def delete(self):
        """Delete both the ``a`` and the ``aaaa`` record of the fqdn.

        :return: Always ``True``.

        :raises DNSServerError: If sending the update fails, see
          :meth:`_query_tcp`.
        """
        self._dns_update.delete(self.names.fqdn, 'a')
        self._dns_update.delete(self.names.fqdn, 'aaaa')
        self._query_tcp(self._dns_update)
        return True

    def update(self):
        """Set the records for every IP address that is available.

        :return: A list with one result dictionary of
          :meth:`_set_record` per address that is set (ipv4 first, then
          ipv6). The list is empty if no addresses were given.

        :raises DNSServerError: If sending an update fails, see
          :meth:`_query_tcp`.
        """
        results = []
        # "ipaddresses" defaults to None in the constructor; treat that
        # like "no address available" instead of failing on the
        # attribute access below.
        if self.ipaddresses is None:
            return results
        if self.ipaddresses.ipv4:
            results.append(self._set_record(new_ip=self.ipaddresses.ipv4,
                                            ip_version=4))
        if self.ipaddresses.ipv6:
            results.append(self._set_record(new_ip=self.ipaddresses.ipv6,
                                            ip_version=6))
        return results