From b6ddac9531285bf2368db4425d6da2ee2e652b74 Mon Sep 17 00:00:00 2001 From: Henrique Goncalves Date: Fri, 7 Feb 2025 23:25:30 -0300 Subject: [PATCH] add nextdns script --- scripts/create-nextdns.py | 160 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 160 insertions(+) create mode 100644 scripts/create-nextdns.py diff --git a/scripts/create-nextdns.py b/scripts/create-nextdns.py new file mode 100644 index 0000000..6a90baf --- /dev/null +++ b/scripts/create-nextdns.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python3 + +import json +import os +import random +import sys +import time +import urllib.error +import urllib.request + +# Configuration +CACHE_DOMAINS_JSON = "cache_domains.json" + +# Timing / Retry configuration +DELAY_SECONDS = 1 # delay between requests +MAX_RETRIES = 5 # maximum retries if rate limited +BACKOFF_FACTOR = 2 # exponential backoff multiplier +USER_AGENT = "python-requests/2.31.0" + + +def post_redirect(base_url, api_key, payload): + """Post a redirect entry using urllib.request. Implements retry logic if rate limited (429) is encountered.""" + headers = { + "X-Api-Key": api_key, + "Content-Type": "application/json", + "User-Agent": USER_AGENT, + } + data = json.dumps(payload).encode("utf-8") + retries = 0 + current_delay = DELAY_SECONDS + + while retries <= MAX_RETRIES: + req = urllib.request.Request( + url=base_url, data=data, headers=headers, method="POST" + ) + try: + with urllib.request.urlopen(req) as response: + status_code = response.getcode() + if status_code in [200, 201]: + return True, response.read().decode("utf-8") + else: + # For non-success status codes (other than 429) + return False, f"Unexpected status code: {status_code}" + except urllib.error.HTTPError as e: + if e.code == 429: + print(f"[*] Rate limited, waiting for {current_delay} seconds ...") + time.sleep(current_delay) + retries += 1 + current_delay *= BACKOFF_FACTOR + else: + try: + error_body = e.read().decode("utf-8") + except Exception: + error_body = "No response body" + return False, f"HTTPError {e.code}, response: {error_body}" + except urllib.error.URLError as e: + return False, f"URLError: {e.reason}" + + return False, "Max retries exceeded" + + +def run(profile_id, api_key, redirect_ip): + base_url = f"https://api.nextdns.io/profiles/{profile_id}/rewrites/" + + # Read cache_domains.json + try: + with open(CACHE_DOMAINS_JSON, "r") as f: + cache_data = json.load(f) + except Exception as e: + print(f"[-] Failed to load {CACHE_DOMAINS_JSON}: {e}") + return + + # Set to deduplicate domains + all_domains = set() + + # Process each CDN entry in the JSON + for entry in cache_data.get("cache_domains", []): + domain_files = entry.get("domain_files", []) + for file_name in domain_files: + if os.path.exists(file_name): + with open(file_name, "r") as file: + # Read each line; ignore blank lines or comment lines + for line in file: + line = line.strip() + if not line or line.startswith("#") or line.startswith("//"): + continue + all_domains.add(line.lstrip("*.")) + else: + print(f"[-] File '{file_name}' not found, skipping ...") + + print("[*] Collected domains:") + for domain in sorted(all_domains): + print(f" - {domain}") + + # Retrieve the existing rewrite entries from NextDNS API + headers = { + "X-Api-Key": api_key, + "User-Agent": USER_AGENT, + } + req = urllib.request.Request(url=base_url, headers=headers, method="GET") + try: + with urllib.request.urlopen(req) as response: + if response.getcode() != 200: + resp_body = response.read().decode("utf-8") + print( + f"[-] Failed to get existing redirects, status code: {response.getcode()}, response: {resp_body}" + ) + return + resp_data = json.loads(response.read().decode("utf-8")) + except urllib.error.HTTPError as e: + try: + error_body = e.read().decode("utf-8") + except Exception: + error_body = "No response body" + print( + f"[-] Failed to get existing redirects, status code: {e.code}, response: {error_body}" + ) + return + except urllib.error.URLError as e: + print(f"[-] Failed to get existing redirects, URLError: {e.reason}") + return + + data = resp_data.get("data", []) + existing_domains = {entry.get("name") for entry in data} + print("\n[*] Existing domains:") + for domain in sorted(existing_domains): + print(f" - {domain}") + + # For each domain, if missing in NextDNS, post a new redirect + for domain in all_domains: + if domain in existing_domains: + print(f"[*] Domain '{domain}' already exists, skipping...") + continue + + payload = { + "name": domain, + "content": redirect_ip, + } + + print(f"[+] Adding '{domain}'...") + success, post_resp = post_redirect(base_url, api_key, payload) + if not success: + print(f"[-] Failed to add redirect for '{domain}', response: {post_resp}") + + # Delay between API calls to prevent triggering rate limits + time.sleep(DELAY_SECONDS + random.uniform(0, 1)) + + print("\n[+] Done!") + + +if __name__ == "__main__": + if len(sys.argv) != 4: + print("Usage: create-nextdns.py ") + sys.exit(1) + + profile_id = sys.argv[1] + api_key = sys.argv[2] + redirect_ip = sys.argv[3] + + run(profile_id, api_key, redirect_ip)