Site URL Integrity Check
Published 11 months ago on July 24, 2023

I recently had a client who wanted a simple Python script that can be run on a schedule to check a series of static websites and detect changes. My understanding was that they wanted an automated way to check whether or not a website had been defaced.


Let’s Dive Right In

First of all, it needs a convenient way of reading URLs from a file so they aren’t hard-coded. To do this we look for a file called url-list.txt in the root directory, or a command-line argument called --url-list. If neither is found, the script prints an error and exits. We check for their existence with this code:

def validate_url_file(path):
    if path is None:
        if Path(default_url_file).is_file():
            return default_url_file
        else:
            return False
    elif Path(path).is_file():
        return path
    else:
        return validate_url_file(None)

path is an optional argparse parameter and if it is None (i.e. it isn’t provided), we look for the default file. If that is found, we return the path, otherwise we return False. Similarly, if the path parameter is provided, we check if that exists and return the path if it does, or re-run the function with the path param set to None to force a check for the default url-list.txt file if the user-provided file does not exist.

This function is called in the main entry point of the script and handled like so:
url_list_path = validate_url_file(args.url_list)
if url_list_path is False:
    print(f"{ERROR}url-list.txt not found in root directory and alternative file list not found or not provided.")
    exit(-1)

If there are no URLs, there’s nothing to do, so we exit.

Once we have established which path to use for the url-list.txt file, we pass that path to another function to extract all the URLs from the file. We validate the file’s presence here too:

def prepare_urls(path):
    urls = []
    if not Path(path).is_file():
        print(f"{ERROR}URL file: {path} does not exist. Exiting.")
        exit(-1)
    with open(path) as file:
        urls = file.read().splitlines()
    return urls

This function returns a list with all the URLs found in the file.
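One thing to be aware of is that splitlines() keeps every line as it is, so a blank line or a note in url-list.txt would be treated as a URL. Below is a minimal sketch of a more forgiving reader. The name read_url_list and the '#' comment convention are my own assumptions for illustration; they are not part of the script.

```python
from pathlib import Path


def read_url_list(path):
    """Return the URLs in `path`, skipping blank lines and '#' comments."""
    urls = []
    for line in Path(path).read_text(encoding="utf-8").splitlines():
        line = line.strip()
        # Keep only non-empty lines that are not comments.
        if line and not line.startswith("#"):
            urls.append(line)
    return urls
```

With this variant, a url-list.txt can contain blank separator lines or commented-out sites without those entries being requested.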

We then ensure that the directories we need exist. These are the logs and tmp directories, which store the integrity log and temporary files, respectively:

def init_dirs():
    if not Path(log_path).is_dir():
        Path(log_path).mkdir(parents=True, exist_ok=True)
    if not Path(tmp_path).is_dir():
        Path(tmp_path).mkdir(parents=True, exist_ok=True)

We’re simply checking whether or not some pre-defined paths exist as directories, and creating them if they don’t.
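Since mkdir() is called with exist_ok=True, the is_dir() checks are not strictly required: the call is already a no-op when the directory exists. A shorter sketch of the same idea follows; ensure_dirs is a hypothetical name, not a function from the script.

```python
from pathlib import Path


def ensure_dirs(*paths):
    """Create each directory (and any missing parents); existing ones are left alone."""
    for path in paths:
        # exist_ok=True means no error is raised if the directory is already there.
        Path(path).mkdir(parents=True, exist_ok=True)
```

It could be called as ensure_dirs(log_path, tmp_path), and calling it repeatedly is safe.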

Finally we’re getting to the juicy part. The function below is the final function called in main, and it calls a couple of other functions too, so let’s walk through it:

def check_urls(sites, verbose):
    hashes = load_hashes()
    timestamp = give_me_a_timestamp()
    error_sites = []
    errors = False
    log = open(log_path + 'integrity.log', 'a+')
    write_log(log, f"Integrity Check: {timestamp}")
    for i, site in enumerate(sites):
        data = read_site(site)
        write_to_file('file_' + str(i), data)
        h = md5('file_' + str(i))
        os.remove(tmp_path + 'file_' + str(i))
        if site in hashes:
            if compare_hash(hashes.get(site), h):
                write_log(log, "OK:\t" + site)
                if verbose:
                    print(f"{OK}OK:\t{site}")
            else:
                write_log(log, "ERROR:\t" + site)
                error_sites.append(site)
                errors = True
                if verbose:
                    print(f"{ERROR}ERROR:\t{site}")
        else:
            hashes[site] = h
            write_log(log, "ADD:\t" + site)
    if verbose:
        if errors:
            print(f"{ERROR}", 50 * "*")
            print(f"{ERROR}HASH MISMATCH DETECTED")
        else:
            print(50 * "*")
            print(f"{OK}All sites OK")
    log.write("\n")
    log.close()
    save_hashes(hashes)
    if errors:
        dispatch_alert(timestamp, error_sites)

The first thing we do is call a load_hashes() function which uses some JSON voodoo to convert the contents of a file from JSON to a dictionary. This file contains the md5 hash(es) of the sites that have been checked. If the file doesn’t exist, we just return an empty dictionary.

def load_hashes():
    if Path(hash_file).is_file():
        d = json.load(open(hash_file))
        return d
    else:
        return {}


The Hash File

The hash file is simply a JSON object that maps each URL to its hash (url: hash), where the hash is generated from the content of that URL.
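To make the format concrete, here is a minimal sketch of the save/load round trip. It differs from the script in two assumed ways: the file path is passed in as a parameter rather than read from a global, and the files are opened in with blocks so they are always closed.

```python
import json
from pathlib import Path


def save_hashes(hashes, hash_file):
    """Write the URL -> hash dictionary to `hash_file` as JSON."""
    with open(hash_file, "w", encoding="utf-8") as f:
        json.dump(hashes, f, indent=2)


def load_hashes(hash_file):
    """Read the dictionary back, or return an empty one on the first run."""
    if not Path(hash_file).is_file():
        return {}
    with open(hash_file, encoding="utf-8") as f:
        return json.load(f)
```

After one run against a single site, the file would hold one entry whose key is the URL and whose value is the 32-character MD5 hex digest.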

Each URL is visited, the contents saved, a hash generated and then the saved file deleted. If a site hasn’t been checked yet, its hash is added to the hash file; if it has, the new hash is compared to the stored value, and that’s the foundation of the check. Obviously, this check would be useless if the page contained even a single piece of dynamic content, such as the current time or a randomly loaded image, because that would change the hash every time the page was checked. It is also a poor fit for a website that is rendered in the browser, such as a React or Angular app, because the HTML we fetch is typically little more than a shell that loads the scripts, not the content a visitor sees. Of course, it would probably be better to evaluate the actual content of the site and make sure it matches an expected structure, but this was an “I need this ASAP, it’s for multiple WP sites, the content is static” kind of job.
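If a page did contain one small, predictable piece of dynamic content, one possible workaround would be to blank it out before hashing. The sketch below assumes a hypothetical footer of the form "Generated at 2023-07-24 10:00:00"; the pattern and the name stable_hash are illustrative only and are not part of the script.

```python
import hashlib
import re

# Hypothetical pattern for a timestamp footer; adjust it to the real page.
DYNAMIC_PARTS = re.compile(r"Generated at \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}")


def stable_hash(html):
    """Hash the page after removing the known-dynamic fragment."""
    normalised = DYNAMIC_PARTS.sub("", html)
    return hashlib.md5(normalised.encode("utf-8")).hexdigest()
```

The trade-off is that anything matched by the pattern is no longer covered by the check, so the pattern should be as narrow as possible.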

Next, we define a couple of variables that we’ll need, including a timestamp and errors flag, and we open the log file and write the current timestamp to it:

timestamp = give_me_a_timestamp()
error_sites = []
errors = False
log = open(log_path + 'integrity.log', 'a+')
write_log(log, f"Integrity Check: {timestamp}")

Then we iterate through the sites list that we prepared earlier, and passed to this check_urls() function:

for i, site in enumerate(sites):
    data = read_site(site)
    write_to_file('file_' + str(i), data)
    h = md5('file_' + str(i))
    os.remove(tmp_path + 'file_' + str(i))
    if site in hashes:
        if compare_hash(hashes.get(site), h):
            write_log(log, "OK:\t" + site)
            if verbose:
                print(f"{OK}OK:\t{site}")
        else:
            write_log(log, "ERROR:\t" + site)
            error_sites.append(site)
            errors = True
            if verbose:
                print(f"{ERROR}ERROR:\t{site}")
    else:
        hashes[site] = h
        write_log(log, "ADD:\t" + site)

We use enumerate() here so we can access the loop iteration index and use that for naming temporary files, although as I read it now, that’s not strictly necessary since the file is deleted almost immediately after.

So the first thing we do is attempt to read the site with the read_site() function. This function is only two lines, so I won’t show it right now; it just uses urllib3 to make a GET request to the given URL and then returns the data. Once we have that data, we write it to a file and then compute the MD5 hash of that file with the md5() function, which looks like this:

def md5(fname):
    hash_md5 = hashlib.md5()
    with open(tmp_path + fname, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()

We then delete the temporary file: os.remove(tmp_path + 'file_' + str(i)) because we no longer need it.
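Since the temporary file only exists long enough to be hashed, the same result could be reached by hashing the downloaded text in memory. This is a minimal sketch of that alternative, not what the script does; md5_of_text is a hypothetical name, and it assumes the page text is encoded as UTF-8 before hashing.

```python
import hashlib


def md5_of_text(text):
    """Return the MD5 hex digest of `text`, encoded as UTF-8."""
    return hashlib.md5(text.encode("utf-8")).hexdigest()
```

If the temporary file was written with a different encoding or newline handling, the digests would differ from the ones already stored, so switching approaches may mean regenerating the hash file once.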

Next, we compare the hash we just generated from the temporary file to the hash stored in the hashes dictionary that we prepared at the start of the function. If the dictionary contains the current site URL, we do the comparison and if it’s a match, great! But if not, we store the site URL and set the errors flag to True so that we know an error has occurred. If the site URL doesn’t exist within the dictionary, we add it to the hash dictionary. We also write the outcome to the log file via the write_log() function.

if site in hashes:
    if compare_hash(hashes.get(site), h):
        write_log(log, "OK:\t" + site)
        if verbose:
            print(f"{OK}OK:\t{site}")
    else:
        write_log(log, "ERROR:\t" + site)
        error_sites.append(site)
        errors = True
        if verbose:
            print(f"{ERROR}ERROR:\t{site}")
else:
    hashes[site] = h
    write_log(log, "ADD:\t" + site)

compare_hash() returns true or false based on the comparison, and write_log() takes a file handle and a message and writes the message to the given file handle. It doesn’t open or close the file, it assumes that the file is already open.
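For reference, both helpers are tiny. The sketch below is a condensed version of each, and it shows that write_log() works with any open, writable handle; an in-memory io.StringIO is used here purely for demonstration.

```python
import io


def compare_hash(stored, given):
    """Return True when the stored and freshly computed hashes match."""
    return stored == given


def write_log(f, message):
    """Write `message` plus a newline to an already-open file handle."""
    f.write(message + "\n")


# Demonstration with an in-memory handle instead of a real log file.
log = io.StringIO()
write_log(log, "OK:\thttps://example.com/")
```

Because write_log() never opens or closes anything, the caller stays in control of the log file’s lifetime.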

You’ll notice some if verbose: statements too; those allow progress to be printed to the console if the -v argument is passed to the script when executing it.
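The flags themselves come from argparse. The sketch below mirrors the script’s prepare_args() function, with one assumed addition: an optional argv parameter so that the parser can be exercised without touching the real command line.

```python
import argparse


def prepare_args(argv=None):
    """Parse the command line; argv=None means argparse reads sys.argv."""
    parser = argparse.ArgumentParser(description="URL Integrity Check")
    # -v is a simple on/off switch, exposed as args.v
    parser.add_argument("-v", help="verbose mode", default=False, action="store_true")
    # --url-list is exposed as args.url_list and is None when omitted
    parser.add_argument("-u", "--url-list", help="Path to URL list file")
    return parser.parse_args(argv)
```

Calling prepare_args(["-v", "-u", "sites.txt"]) would give args.v set to True and args.url_list set to "sites.txt".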

Finally, once that for-loop has run its course, we output the result if verbosity is requested, close the log, save the hashes and dispatch an alert if there are any errors:

if verbose:
    if errors:
        print(f"{ERROR}", 50 * "*")
        print(f"{ERROR}HASH MISMATCH DETECTED")
    else:
        print(50 * "*")
        print(f"{OK}All sites OK")
log.write("\n")
log.close()
save_hashes(hashes)
if errors:
    dispatch_alert(timestamp, error_sites)

dispatch_alert() makes use of the messagebird library to send a text message to a given number. The code looks like this:

def dispatch_alert(timestamp, error_sites):
    originator = ""  # pre-registered originator
    target = ""  # phone number
    # len() gives the number of failed sites (list.count() needs an argument)
    if len(error_sites) > 1:
        message = f"Integrity Check: Irregularity detected at {timestamp} on multiple sites"
    elif len(error_sites) == 1:
        message = f"Integrity Check: Irregularity detected at {timestamp} on {error_sites[0]}"
    else:
        message = f"Integrity Check: Irregularity detected at {timestamp}"
    try:
        client = messagebird.Client(MESSAGEBIRD_API_KEY)
        client.message_create(originator, target, message)
    except messagebird.client.ErrorException as e:
        # append mode, so the error log can be written to and is created if missing
        error_log = open(log_path + 'error_log', 'a')
        print(f'An SMS dispatch error occurred at {timestamp}')
        for error in e.errors:
            error_log.write(f"code: {error.code}\tdescription: {error.description}\tparameter: {error.parameter}\n")
        error_log.close()

The originator is the name that appears as the sender of the text message – this must be pre-registered with the API provider. The target is the target phone number to send the message to, and the message is, of course, the message to send. This function takes a timestamp and a list of error sites. If there is only one error site, we include the URL in the message content, otherwise we simply state that an irregularity was detected on multiple sites.

We wrap the messagebird code in a try-except block to handle any exceptions and in the event of an exception, we log it to a separate error log so that the issue can be resolved in the future. Generally speaking, if an error occurs here, it’s because of an issue with the API provider, such as a rate limit, regional restriction or running out of message credits.
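The message-building part can be tried out on its own, without an API key. Here is a minimal sketch that pulls it into a separate function; build_alert_message is a hypothetical name, and the script itself builds the message inline.

```python
def build_alert_message(timestamp, error_sites):
    """Return the SMS text for the given timestamp and list of failed sites."""
    prefix = f"Integrity Check: Irregularity detected at {timestamp}"
    if len(error_sites) > 1:
        # Several URLs would make the text message too long, so stay generic.
        return f"{prefix} on multiple sites"
    if len(error_sites) == 1:
        return f"{prefix} on {error_sites[0]}"
    return prefix
```

Keeping this logic separate from the API call means the wording can be checked without sending a real text message.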


The Complete Code

import os
import urllib3
import hashlib
from pathlib import Path
from datetime import datetime
import argparse
import json
import messagebird

MESSAGEBIRD_API_KEY = ""

http = urllib3.PoolManager()

# ANSI colour codes for console output
OK = '\033[92m'
ERROR = '\033[91m'
WARNING = '\033[93m'

# all paths are relative to the directory containing this script
REAL_PATH = os.path.dirname(os.path.realpath(__file__))
tmp_path = REAL_PATH + "/tmp/"
log_path = REAL_PATH + "/logs/"
hash_file = REAL_PATH + "/hashes.data"
default_url_file = REAL_PATH + "/url-list.txt"


def md5(fname):
    hash_md5 = hashlib.md5()
    with open(tmp_path + fname, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()


def write_to_file(name, data):
    f = open(tmp_path + name, "w")
    f.write(data)
    f.close()


def read_site(site):
    response = http.request("GET", site)
    return response.data.decode("utf-8")


def init_dirs():
    if not Path(log_path).is_dir():
        Path(log_path).mkdir(parents=True, exist_ok=True)
    if not Path(tmp_path).is_dir():
        Path(tmp_path).mkdir(parents=True, exist_ok=True)


def compare_hash(stored, given):
    if stored == given:
        return True
    else:
        return False


def give_me_a_timestamp():
    timestamp = datetime.now().timestamp()
    date_time = datetime.fromtimestamp(timestamp)
    return date_time.strftime("%Y-%m-%d %H:%M:%S")


def write_log(f, message):
    to_write = message + "\n"
    f.write(to_write)


def save_hashes(hashes):
    json.dump(hashes, open(hash_file, 'w'))


def load_hashes():
    if Path(hash_file).is_file():
        d = json.load(open(hash_file))
        return d
    else:
        return {}


def prepare_urls(path):
    urls = []
    if not Path(path).is_file():
        print(f"{ERROR}URL file: {path} does not exist. Exiting.")
        exit(-1)
    with open(path) as file:
        urls = file.read().splitlines()
    return urls


def prepare_args():
    parser = argparse.ArgumentParser(description="URL Integrity Check")
    parser.add_argument("-v", help="verbose mode", default=False, action='store_true')
    parser.add_argument('-u', '--url-list', help="Path to URL list file")
    return parser.parse_args()


def validate_url_file(path):
    if path is None:
        if Path(default_url_file).is_file():
            return default_url_file
        else:
            return False
    elif Path(path).is_file():
        return path
    else:
        return validate_url_file(None)


def check_urls(sites, verbose):
    hashes = load_hashes()
    timestamp = give_me_a_timestamp()
    error_sites = []
    errors = False
    log = open(log_path + 'integrity.log', 'a+')
    write_log(log, f"Integrity Check: {timestamp}")
    for i, site in enumerate(sites):
        data = read_site(site)
        write_to_file('file_' + str(i), data)
        h = md5('file_' + str(i))
        os.remove(tmp_path + 'file_' + str(i))
        if site in hashes:
            if compare_hash(hashes.get(site), h):
                write_log(log, "OK:\t" + site)
                if verbose:
                    print(f"{OK}OK:\t{site}")
            else:
                write_log(log, "ERROR:\t" + site)
                error_sites.append(site)
                errors = True
                if verbose:
                    print(f"{ERROR}ERROR:\t{site}")
        else:
            hashes[site] = h
            write_log(log, "ADD:\t" + site)
    if verbose:
        if errors:
            print(f"{ERROR}", 50 * "*")
            print(f"{ERROR}HASH MISMATCH DETECTED")
        else:
            print(50 * "*")
            print(f"{OK}All sites OK")
    log.write("\n")
    log.close()
    save_hashes(hashes)
    if errors:
        dispatch_alert(timestamp, error_sites)


def dispatch_alert(timestamp, error_sites):
    originator = ""  # pre-registered originator
    target = ""  # phone number
    # len() gives the number of failed sites (list.count() needs an argument)
    if len(error_sites) > 1:
        message = f"Integrity Check: Irregularity detected at {timestamp} on multiple sites"
    elif len(error_sites) == 1:
        message = f"Integrity Check: Irregularity detected at {timestamp} on {error_sites[0]}"
    else:
        message = f"Integrity Check: Irregularity detected at {timestamp}"
    try:
        client = messagebird.Client(MESSAGEBIRD_API_KEY)
        client.message_create(originator, target, message)
    except messagebird.client.ErrorException as e:
        # append mode, so the error log can be written to and is created if missing
        error_log = open(log_path + 'error_log', 'a')
        print(f'An SMS dispatch error occurred at {timestamp}')
        for error in e.errors:
            error_log.write(f"code: {error.code}\tdescription: {error.description}\tparameter: {error.parameter}\n")
        error_log.close()


if __name__ == '__main__':
    args = prepare_args()
    verbose = args.v
    url_list_path = validate_url_file(args.url_list)
    if url_list_path is False:
        print(f"{ERROR}url-list.txt not found in root directory and alternative file list not found or not provided.")
        exit(-1)
    sites = prepare_urls(url_list_path)
    init_dirs()
    check_urls(sites, verbose)


Summary

Here we looked at reading the data from a static website and generating a hash from that data in order to compare it to a previously generated hash to detect whether or not the content has changed. If the content has changed, we use the messagebird API to dispatch a text message to a designated number to alert somebody of the change.

This script is scheduled to run once an hour with cron. If you’re unfamiliar with cron, it’s a command-line task scheduler service for unix-based systems. I’ve heard rumours that systemd will replace it eventually, but I think cron is so popular and so widespread that it’s here to stay for at least a little while longer!

You can schedule this script with cron by running crontab -e in your unix terminal and then adding the following line to the bottom:

0 * * * * /usr/bin/python /path/to/integrity-check.py

and that’ll execute every hour, on the hour. The syntax for this is:

minute   hour   day of month   month   day of week   command
0-59     0-23   1-31           1-12    0-7           /path/to/script

(In the day of week field, 0 and 7 both mean Sunday.)

That’s the simple version, but there are more advanced forms too, which use ranges, lists, step values and so on. Cron is quite the versatile tool!