I had a client recently that wanted a simple python script that can be ran on a schedule to check a series of static websites and detect changes. My understanding was that they wanted to check whether or not a website had been defaced autonomously.
Lets Dive Right In
First of all, it needs a convenient way of reading URLs from a file so they aren’t hard coded. To do this we look for a file called
def validate_url_file(path):
if path is None:
if Path(default_url_file).is_file():
return default_url_file
else:
return False
elif Path(path).is_file():
return path
else:
return validate_url_file(None)
This function is called in the main entry point of the script and handled like so:
url_list_path = validate_url_file(args.url_list)
if url_list_path is False:
print(f"{ERROR}url-list.txt not found in root directory and alternative file list not found or not provided.")
exit(-1)
If there are no URLs, there’s nothing to do, so we exit.
Once we have established which path to use for the url-list.txt file, we pass that path to another function to extract all the URLs from the file. We also validate the file presence here too:
def prepare_urls(path):
urls = []
if not Path(path).is_file():
print(f"{ERROR}URL file: {path} does not exist. Exiting.")
exit(-1)
with open(path) as file:
urls = file.read().splitlines()
return urls
This function returns a list with all the URLs found in the file.
We then ensure the directories we need, exist. These are the
def init_dirs():
if not Path(log_path).is_dir():
Path(log_path).mkdir(parents=True, exist_ok=True)
if not Path(tmp_path).is_dir():
Path(tmp_path).mkdir(parents=True, exist_ok=True)
We’re simply checking whether or not some pre-defined paths exist as directories, and if not, create them.
Finally we’re getting to the juice. This function below is the final function called in main and it calls another couple of functions too so lets walk through it:
def check_urls(sites, verbose):
hashes = load_hashes()
timestamp = give_me_a_timestamp()
error_sites = []
errors = False
log = open(log_path + 'integrity.log', 'a+')
write_log(log, f"Integrity Check: {timestamp}")
for i, site in enumerate(sites):
data = read_site(site)
write_to_file('file_' + str(i), data)
h = md5('file_' + str(i))
os.remove(tmp_path + 'file_' + str(i))
if site in hashes:
if compare_hash(hashes.get(site), h):
write_log(log, "OK:\t" + site)
if verbose:
print(f"{OK}OK:\t{site}")
else:
write_log(log, "ERROR:\t" + site)
error_sites.append(site)
errors = True
if verbose:
print(f"{ERROR}ERROR:\t{site}")
else:
hashes[site] = h
write_log(log, "ADD:\t" + site)
if verbose:
if errors:
print(f"{ERROR}", 50 * "*")
print(f"{ERROR}HASH MISMATCH DETECTED")
else:
print(50 * "*")
print(f"{OK}All sites OK")
log.write("\n")
log.close()
save_hashes(hashes)
if errors:
dispatch_alert(timestamp, error_sites)
The first thing we do is call a
def load_hashes():
if Path(hash_file).is_file():
d = json.load(open(hash_file))
return d
else:
return {}
The Hash File
The hash file simply contains:
Each URL is visited, the contents saved, a hash generated and then the saved file deleted. If a site hasn’t been checked yet, the hash is inserted into the hash file and if it does exist, it’s compared to the stored value, and that’s the foundation of the check. Obviously, this check would be useless if a single piece of dynamic content existed within the page such as the current time, or a randomly loaded image, or if the website is rendered in the browser, such as a React or Angular app, because that would change the value of the hash each time it was checked. Of course, it would probably be better to evaluate the actual content of the site and make sure it matches an expected structure/content, but this was a “I need this ASAP, its for multiple WP sites, the content is static”.
Next, we define a couple of variables that we’ll need, including a timestamp and errors flag, and we open the log file and write the current timestamp to it:
timestamp = give_me_a_timestamp()
error_sites = []
errors = False
log = open(log_path + 'integrity.log', 'a+')
write_log(log, f"Integrity Check: {timestamp}")
Then we iterate through the
for i, site in enumerate(sites):
data = read_site(site)
write_to_file('file_' + str(i), data)
h = md5('file_' + str(i))
os.remove(tmp_path + 'file_' + str(i))
if site in hashes:
if compare_hash(hashes.get(site), h):
write_log(log, "OK:\t" + site)
if verbose:
print(f"{OK}OK:\t{site}")
else:
write_log(log, "ERROR:\t" + site)
error_sites.append(site)
errors = True
if verbose:
print(f"{ERROR}ERROR:\t{site}")
else:
hashes[site] = h
write_log(log, "ADD:\t" + site)
We use
So the first thing we do is attempt to read the site with the
def md5(fname):
hash_md5 = hashlib.md5()
with open(tmp_path + fname, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
We then delete the temporary file:
Next, we compare the hash we just generated from the temporary file to the hash stored in the hashes dictionary that we prepared at the start of the function. If the dictionary contains the current site URL, we do the comparison and if it’s a match, great! But if not, we store the site URL and set the errors flag to true so that we know an error has occurred. If the site URL doesn’t exist within the dictionary, we append it to the hash dictionary. We also write the outcome to the log file via the
if site in hashes:
if compare_hash(hashes.get(site), h):
write_log(log, "OK:\t" + site)
if verbose:
print(f"{OK}OK:\t{site}")
else:
write_log(log, "ERROR:\t" + site)
error_sites.append(site)
errors = True
if verbose:
print(f"{ERROR}ERROR:\t{site}")
else:
hashes[site] = h
write_log(log, "ADD:\t" + site)
You’ll notice some
Finally, once that for-loop has ran its life, we output the result if verbosity is requested, close the log, save the hashes and dispatch an alert if ther are any errors:
if verbose:
if errors:
print(f"{ERROR}", 50 * "*")
print(f"{ERROR}HASH MISMATCH DETECTED")
else:
print(50 * "*")
print(f"{OK}All sites OK")
log.write("\n")
log.close()
save_hashes(hashes)
if errors:
dispatch_alert(timestamp, error_sites)
def dispatch_alert(timestamp, error_sites):
originator = "" # pre-registered originator
target = "" # phone number
if error_sites.count() > 1:
message = f"Integrity Check: Irregularity detected at {timestamp} on multiple sites"
elif error_sites.count() == 1:
message = f"Integrity Check: Irregularity detected at {timestamp} on {error_sites[0]}"
else:
message = f"Integrity Check: Irregularity detected at {timestamp}"
try:
client = messagebird.Client(MESSAGEBIRD_API_KEY)
client.message_create(originator, target, message)
except messagebird.client.ErrorException as e:
error_log = open(log_path + 'error_log')
print(f'An SMS dispatch error occurred at {timestamp}')
for error in e.errors:
error_log.write(f"code: {error.code}\tdescription: {error.description}\tparameter: {error.parameter}")
error_log.close()
The originator is the name that appears as the sender of the text message – this must be pre-registered with the API provider. The target is the target phone number to send the message to, and the message is, of course, the message to send. This function takes a timestamp and a list of error sites. If there is only one error site, we include the URL in the message content, otherwise we simply state that an irregularity was detected on multiple sites.
We wrap the messagebird code in a try-except block to handle any exceptions and in the event of an exception, we log it to a separate error log so that the issue can be resolved in the future. Generally speaking, if an error occurs here, it’s because of an issue with the API provider, such as a rate limit, regional restriction or running out of message credits.
The Complete Code
import os
import urllib3
import hashlib
from pathlib import Path
from datetime import datetime
import argparse
import json
import messagebird
MESSAGEBIRD_API_KEY = ""
http = urllib3.PoolManager()
OK = '\033[92m'
ERROR = '\033[91m'
WARNING = '\033[93m'
REAL_PATH = os.path.dirname(os.path.realpath(__file__))
tmp_path = REAL_PATH + "/tmp/"
log_path = REAL_PATH + "/logs/"
hash_file = REAL_PATH + "/hashes.data"
default_url_file = REAL_PATH + "/url-list.txt"
def md5(fname):
hash_md5 = hashlib.md5()
with open(tmp_path + fname, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def write_to_file(name, data):
f = open(tmp_path + name, "w")
f.write(data)
f.close()
def read_site(site):
response = http.request("GET", site)
return response.data.decode("utf-8")
def init_dirs():
if not Path(log_path).is_dir():
Path(log_path).mkdir(parents=True, exist_ok=True)
if not Path(tmp_path).is_dir():
Path(tmp_path).mkdir(parents=True, exist_ok=True)
def compare_hash(stored, given):
if stored == given:
return True
else:
return False
def give_me_a_timestamp():
timestamp = datetime.now().timestamp()
date_time = datetime.fromtimestamp(timestamp)
return date_time.strftime("%Y-%m-%d %H:%M:%S")
def write_log(f, message):
to_write = message + "\n"
f.write(to_write)
def save_hashes(hashes):
json.dump(hashes, open(hash_file, 'w'))
def load_hashes():
if Path(hash_file).is_file():
d = json.load(open(hash_file))
return d
else:
return {}
def prepare_urls(path):
urls = []
if not Path(path).is_file():
print(f"{ERROR}URL file: {path} does not exist. Exiting.")
exit(-1)
with open(path) as file:
urls = file.read().splitlines()
return urls
def prepare_args():
parser = argparse.ArgumentParser(description="URL Integrity Check")
parser.add_argument("-v", help="verbose mode", default=False, action='store_true')
parser.add_argument('-u', '--url-list', help="Path to URL list file")
return parser.parse_args()
def validate_url_file(path):
if path is None:
if Path(default_url_file).is_file():
return default_url_file
else:
return False
elif Path(path).is_file():
return path
else:
return validate_url_file(None)
def check_urls(sites, verbose):
hashes = load_hashes()
timestamp = give_me_a_timestamp()
error_sites = []
errors = False
log = open(log_path + 'integrity.log', 'a+')
write_log(log, f"Integrity Check: {timestamp}")
for i, site in enumerate(sites):
data = read_site(site)
write_to_file('file_' + str(i), data)
h = md5('file_' + str(i))
os.remove(tmp_path + 'file_' + str(i))
if site in hashes:
if compare_hash(hashes.get(site), h):
write_log(log, "OK:\t" + site)
if verbose:
print(f"{OK}OK:\t{site}")
else:
write_log(log, "ERROR:\t" + site)
error_sites.append(site)
errors = True
if verbose:
print(f"{ERROR}ERROR:\t{site}")
else:
hashes[site] = h
write_log(log, "ADD:\t" + site)
if verbose:
if errors:
print(f"{ERROR}", 50 * "*")
print(f"{ERROR}HASH MISMATCH DETECTED")
else:
print(50 * "*")
print(f"{OK}All sites OK")
log.write("\n")
log.close()
save_hashes(hashes)
if errors:
dispatch_alert(timestamp, error_sites)
def dispatch_alert(timestamp, error_sites):
originator = "" # pre-registered originator
target = "" # phone number
if error_sites.count() > 1:
message = f"Integrity Check: Irregularity detected at {timestamp} on multiple sites"
elif error_sites.count() == 1:
message = f"Integrity Check: Irregularity detected at {timestamp} on {error_sites[0]}"
else:
message = f"Integrity Check: Irregularity detected at {timestamp}"
try:
client = messagebird.Client(MESSAGEBIRD_API_KEY)
client.message_create(originator, target, message)
except messagebird.client.ErrorException as e:
error_log = open(log_path + 'error_log')
print(f'An SMS dispatch error occurred at {timestamp}')
for error in e.errors:
error_log.write(f"code: {error.code}\tdescription: {error.description}\tparameter: {error.parameter}")
error_log.close()
if __name__ == '__main__':
args = prepare_args()
verbose = args.v
url_list_path = validate_url_file(args.url_list)
if url_list_path is False:
print(f"{ERROR}url-list.txt not found in root directory and alternative file list not found or not provided.")
exit(-1)
sites = prepare_urls(url_list_path)
init_dirs()
check_urls(sites, verbose)
Summary
Here we looked at reading the data from a static website and generating a hash from that data in order to compare it to a previously generated hash to detect whether or not the content has changed. If the content has changed, we use the messagebird API to dispatch a text message to a designated number to alert somebody of the change.
This script is scheduled to run once an hour with cron. If you’re unfamiliar with cron, it’s a command-line task scheduler service for unix-based systems. I’ve heard rumours that
You can prepare this script in cron by running
0 * * * * /usr/bin/python /path/to/integrity-check.py
and that’ll execute every hour, on the hour. The syntax for this is:
minute hour day of month month of year day of week /path/to/script
0-59 0-23 1-31 1-12 0-7 (0 and 7 are sunday)
That’s the simple version, but there are more advanced examples too which use asterisks, question marks, ranges etc. Cron is quite the versatile tool!