# Copyright (C) Internet Systems Consortium, Inc. ("ISC")
#
# SPDX-License-Identifier: MPL-2.0
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, you can obtain one at https://mozilla.org/MPL/2.0/.
#
# See the COPYRIGHT file distributed with this work for additional
# information regarding copyright ownership.

from collections import defaultdict

import dns.message
import dns.query
import dns.rcode

# Timeout, in seconds, passed to every DNS query sent by this module.
TIMEOUT = 10

# Width, in bytes, of one message-size bucket.  Bucket labels have the form
# "<low>-<high>" with low a multiple of this value and high = low + width - 1.
# NOTE(review): presumably mirrors the bucket width used by the server's
# size histograms -- confirm against the server implementation.
_BUCKET_SIZE = 16

# Names of the traffic-size histograms tracked by these helpers.  The order
# is the insertion order of the mapping built by create_expected().
_TRAFFIC_KEYS = (
    "dns-tcp-requests-sizes-received-ipv4",
    "dns-tcp-responses-sizes-sent-ipv4",
    "dns-tcp-requests-sizes-received-ipv6",
    "dns-tcp-responses-sizes-sent-ipv6",
    "dns-udp-requests-sizes-received-ipv4",
    "dns-udp-requests-sizes-received-ipv6",
    "dns-udp-responses-sizes-sent-ipv4",
    "dns-udp-responses-sizes-sent-ipv6",
)


def create_msg(qname, qtype):
    """Build a DNS query for *qname*/*qtype*.

    The query requests DNSSEC records and uses EDNS version 0 with an
    advertised payload size of 4096.
    """
    msg = dns.message.make_query(
        qname, qtype, want_dnssec=True, use_edns=0, payload=4096
    )

    return msg


def udp_query(ip, port, msg):
    """Send *msg* to *ip*:*port* over UDP and return the response.

    Asserts that the response code is NOERROR.
    """
    ans = dns.query.udp(msg, ip, TIMEOUT, port=port)
    assert ans.rcode() == dns.rcode.NOERROR

    return ans


def tcp_query(ip, port, msg):
    """Send *msg* to *ip*:*port* over TCP and return the response.

    Asserts that the response code is NOERROR.
    """
    ans = dns.query.tcp(msg, ip, TIMEOUT, port=port)
    assert ans.rcode() == dns.rcode.NOERROR

    return ans


def create_expected(data):
    """Return the expected-counters mapping seeded from *data*.

    The result maps each tracked histogram name to a ``defaultdict(int)``
    of bucket label -> count.  Every count found in *data* (a mapping of
    histogram name -> mapping of bucket label -> count) is added in.

    Raises KeyError if *data* contains a histogram name that is not one
    of the tracked ones.
    """
    expected = {key: defaultdict(int) for key in _TRAFFIC_KEYS}

    for k, v in data.items():
        for kk, vv in v.items():
            expected[k][kk] += vv

    return expected


def update_expected(expected, key, msg):
    """Count one message of the wire size of *msg* in ``expected[key]``.

    *msg* must provide ``to_wire()``; the length of the returned wire
    data selects the bucket whose counter is incremented.
    """
    msg_len = len(msg.to_wire())
    # Round the size down to the start of its bucket.
    bucket_num = (msg_len // _BUCKET_SIZE) * _BUCKET_SIZE
    bucket = "{}-{}".format(bucket_num, bucket_num + _BUCKET_SIZE - 1)

    expected[key][bucket] += 1


def check_traffic(data, expected):
    """Assert that the fetched statistics *data* equal *expected*.

    Both mappings are first converted, recursively, into sorted lists of
    (key, value) pairs so that neither the mapping type (dict versus
    defaultdict) nor the key order influences the comparison.
    """

    def ordered(obj):
        if isinstance(obj, dict):
            return sorted((k, ordered(v)) for k, v in obj.items())
        if isinstance(obj, list):
            return sorted(ordered(x) for x in obj)
        return obj

    ordered_data = ordered(data)
    ordered_expected = ordered(expected)

    # Both sides must contain exactly the tracked histograms.
    assert len(ordered_data) == len(_TRAFFIC_KEYS)
    assert len(ordered_expected) == len(_TRAFFIC_KEYS)
    assert len(data) == len(ordered_data)
    assert len(expected) == len(ordered_expected)

    assert ordered_data == ordered_expected


def test_traffic(fetch_traffic, **kwargs):
    """Check that the traffic-size statistics follow the queries sent.

    *fetch_traffic* is called as ``fetch_traffic(statsip, statsport)``
    and must return the current histograms in the shape accepted by
    create_expected().  Required keyword arguments:

    statsip   -- address used both for fetching statistics and as the
                 destination of the DNS queries
    statsport -- port passed to *fetch_traffic*
    port      -- port the DNS queries are sent to

    After each query the statistics are fetched again and compared with
    the running expectation.
    """
    statsip = kwargs["statsip"]
    statsport = kwargs["statsport"]
    port = kwargs["port"]

    data = fetch_traffic(statsip, statsport)
    exp = create_expected(data)

    # (query function, query name, request histogram, response histogram),
    # exercised in this order.
    exchanges = (
        (
            udp_query,
            "short.example.",
            "dns-udp-requests-sizes-received-ipv4",
            "dns-udp-responses-sizes-sent-ipv4",
        ),
        (
            udp_query,
            "long.example.",
            "dns-udp-requests-sizes-received-ipv4",
            "dns-udp-responses-sizes-sent-ipv4",
        ),
        (
            tcp_query,
            "short.example.",
            "dns-tcp-requests-sizes-received-ipv4",
            "dns-tcp-responses-sizes-sent-ipv4",
        ),
        (
            tcp_query,
            "long.example.",
            "dns-tcp-requests-sizes-received-ipv4",
            "dns-tcp-responses-sizes-sent-ipv4",
        ),
    )

    for query, qname, request_key, response_key in exchanges:
        msg = create_msg(qname, "TXT")
        update_expected(exp, request_key, msg)
        ans = query(statsip, port, msg)
        update_expected(exp, response_key, ans)
        data = fetch_traffic(statsip, statsport)

        check_traffic(data, exp)