In the recent interview with Jim Ferguson, we discussed a Tableau dashboard I had generated from pharmacovigilance data from the European Medicines Agency (https://www.adrreports.eu/).
For those who wish to download the data, below is a Python script I developed with Twan van der Poel (https://vanderpoelsoftware.com/).
The tool downloads everything, both serious and non-serious line listings and both products and substances.
Later I will post the shell scripts and SQL statements to process the CSV files into PostgreSQL.
If you have questions, please feel free to leave a comment.
Download python script here: https://drive.google.com/file/d/1Zs8HiB3W-bd77OspdTrnhb8WiXsskfTM/view?usp=sharing
The Tableau dashboard is a work in progress but can be downloaded here: https://drive.google.com/file/d/16RRvQJZ6VEyAF_Gm4DDPtE200esXTlSM/view?usp=share_link
(download a 14-day trial of Tableau to open the dashboard)
import errno, json, hashlib, logging, pprint, urllib.request, re, requests, os, ssl, sys, time, threading, queue
from alive_progress import alive_bar;
from concurrent.futures import ThreadPoolExecutor
from itertools import product
from pathlib import Path
from multiprocessing.pool import ThreadPool
from urllib.parse import urlparse, unquote, urlencode
from requests.utils import requote_uri
from termcolor import colored
"""
# EMA Crawler
Crawls data from adrreports.eu and converts it into
data CSV and TXT files. The script operates from
the working directory and will create a "temp/", "output/" and "downloads/" subdirectory.
The "temp/" subdirectory will contain remote cache files; the
"output/" subdirectory will contain the output files.
If downloads are performed, they are stored in a "downloads/" directory; this
directory holds hash references to keep track of their origins.
* Authors: In alphabetical order:
* Twan van der Poel <twan@vanderpoelsoftware.com>
* Wouter Aukema <waukema@gmail.com>
* Usage: python3 ema-crawler.py
* Version: 1.98
# Changelog
## 1.98
* Used new session for each executor.map iteration
* Further integration of multithreading
* Temporary removal of hash-subdirectory
* Added stopwatch
## 1.97
* Dynamic non-configurable download directory
* Recycled session for each .get call
* Removed last bits of BIN_CHROMEDRIVER
* Dedicated function for remote calls
* Further integration of remote HTTP errors
* Resolved several encoding issues
## 1.96
* Simplified target directory based on files' md5 hash
* Wrapped executor in progress-bar
* Added parameter VERBOSE_DEBUGGING
## 1.95
* Added setting for verbose chromedriver logging
* Removed Selenium approach
* Integrated requests.Session()
## 1.94
* Integrated dynamic download storage
* Feature which moves downloads to their final location
* Added setting USE_DOWNLOADS_CACHE
## 1.93
* Integrated generic fatal_shutdown handler
## 1.92
* Headless support for Chromedriver
* Integrated storage filenames in list of downloads
* Integrated filters to capture downloads of more than 250K of lines properly
* Added parameter "MAX_PRODUCT_COUNT" for testing small chunks
* Dynamic construction of URL (incl. filters)
* Cache integration in download directory
* Dynamic construction of filter combinations for Covid related HLC's
## Todo
* Replace hard-coded ID's with detection of 250k limitation
"""
# Settings
# Stop crawling after this many products (None disables the limit)
MAX_PRODUCT_COUNT = None
# Size of the download thread pool
MAX_THREADS = 20
# When True, a download whose target file already exists is skipped
USE_DOWNLOADS_CACHE = False
# When True, verbose_debug() prints its messages
VERBOSE_DEBUGGING = False
# Passed to requests as the (connect, read) timeout pair, in seconds
connect_timeout: int = 40
download_timeout: int = 100

# Variables used to split up large downloads; each entry is applied
# once as "equals" and once as "not equals" when the matrix is generated
variables = {
    'Seriousness': 'Serious',
    'Regulatory Primary Source Country EEA/Non EEA': 'European Economic Area',
    'Primary Source Qualification': 'Healthcare Professional',
    'Sex': 'Female',
}

# Years used to split up large downloads (2017 up to and including 2024)
years = {str(year): str(year) for year in range(2017, 2025)}

# Timing: reference point for the runtime reported at the end
executionStart = time.time()
print("Version 1.98")
# Function which handles fatal shutdowns
def fatal_shutdown(errorMessage, identifier):
    """Print a crash banner with the error and stop with a failure status.

    Args:
        errorMessage: Human-readable description of what went wrong.
        identifier: Short error code shown in parentheses (e.g. "V01").

    Raises:
        SystemExit: Always, carrying exit status 1.
    """
    banner = (
        '+--------------------------------------------------+',
        '| EMA crawler crashed, please read the error below |',
        '+--------------------------------------------------+',
    )
    for bannerLine in banner:
        print(colored(bannerLine, 'red'))
    print(colored(errorMessage, 'yellow')+" "+colored("("+identifier+")", "white"))
    # A bare sys.exit() reports success (status 0); a crash must be
    # visible to the calling shell as a failure.
    sys.exit(1)
# Function which debugs verbose logging (if enabled)
def verbose_debug(logMessage):
    """Print logMessage, but only while VERBOSE_DEBUGGING is switched on."""
    if VERBOSE_DEBUGGING:
        print(logMessage)
# Helper to exit execution
def exit():
    """Stop the script by raising SystemExit without an exit status."""
    raise SystemExit
# Downloads go into a "dl" folder below the current working directory
downloadsPath = os.sep.join([os.getcwd(), "dl"])

# Default EMA DPA base URL
emaBaseUrl = 'https://dap.ema.europa.eu/analytics/saw.dll?Go'

# Per-thread storage (each pool worker keeps its own index and session here)
threadStorage = threading.local()

# Validate the settings before doing any work
if not (MAX_PRODUCT_COUNT is None or isinstance(MAX_PRODUCT_COUNT, int)):
    fatal_shutdown("MAX_PRODUCT_COUNT must be one of; numeric, None", "V01")
if not (isinstance(MAX_THREADS, int) and MAX_THREADS >= 1):
    fatal_shutdown("MAX_THREADS must be numeric, at least 1", "V02")

# Create the downloads directory when missing, then verify it really exists
if not os.path.exists(downloadsPath):
    os.mkdir(downloadsPath)
if not os.path.exists(downloadsPath):
    fatal_shutdown("Downloads directory does not exist (and could not be created)", "V03")

# Working directories live next to the script itself (not the cwd)
basepath = os.path.dirname(os.path.abspath(sys.argv[0])) + os.sep
workdirs = [name + os.sep for name in ('temp', 'output', 'downloads')]
for workdir in workdirs:
    fullpath = basepath + workdir
    if not os.path.exists(fullpath):
        os.mkdir(fullpath)
    if not os.access(fullpath, os.W_OK):
        fatal_shutdown(("Working directory %s is not writable" % fullpath), "P01")

# Important directories: remote-page cache and generated output files
CACHEDIR, OUTPUTDIR = basepath + workdirs[0], basepath + workdirs[1]
# Function which creates a directory
def create_directory(directory):
    """Create `directory`, treating "already exists" as success.

    Only a single level is created (os.mkdir); any OSError other than
    EEXIST is passed on to the caller.
    """
    try:
        os.mkdir(directory)
    except OSError as exc:
        alreadyThere = exc.errno == errno.EEXIST
        if not alreadyThere:
            raise
# Function which initiates the thread executor
def initiate_thread_executor(resources):
    """Prepare the calling worker thread: claim an index and open a session.

    Args:
        resources: Queue of thread indices; one is taken without blocking.

    The index and a fresh requests.Session are stored on `threadStorage`,
    after which one request without filters (P0 = 0) is sent through
    that session.
    """
    # Claim this thread's index (non-blocking get)
    threadStorage.threadIndex = resources.get(False)

    # One session per thread, reused for every download of that thread
    session = requests.Session()
    threadStorage.localSession = session

    # Initial request with zero filters
    # NOTE(review): presumably this primes the session (cookies/auth) before
    # the real downloads start - confirm against the remote service.
    initialParams = dict(
        Path='/shared/PHV DAP/DAP/Run Line Listing Report',
        Format='txt',
        Extension='.csv',
        Action='Extract',
        P0=0,
    )
    ema_get_remote(session, initialParams)
# Function which fetches an URL and caches it by a hash
def fetch_remote_url(url):
    """Return the body of `url` as bytes, using a file cache in CACHEDIR.

    The cache file is named after the MD5 hex digest of the URL. A cache
    hit is returned as raw bytes. On a miss the page is downloaded and,
    when the download succeeded, stored in the cache (an empty body is
    stored too).

    A failed download is not fatal: it is reported through verbose_debug
    and yields empty bytes. Failures are deliberately NOT cached, so a
    temporary network problem cannot leave an empty page in the cache
    for all later runs.

    Args:
        url: Absolute URL to fetch.

    Returns:
        bytes: The page body, or b"" when the fetch failed or was empty.
    """
    digest = hashlib.md5(url.encode()).hexdigest()
    cacheFile = Path(CACHEDIR+digest+".html")

    if cacheFile.exists():
        verbose_debug("Fetching local from %s" % url)
        # Raw bytes: no decode/re-encode round trip through the locale encoding
        return cacheFile.read_bytes()

    verbose_debug("Fetching remote from %s" % url)
    # NOTE(review): certificate and hostname verification are switched off
    # here, exactly as before; enable them if the remote certificates allow.
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    try:
        with urllib.request.urlopen(url, context=ctx) as response:
            content = response.read()
    except Exception as exc:
        # Best effort: any fetch error counts as "no data". Unlike a bare
        # except, this lets KeyboardInterrupt and SystemExit through.
        verbose_debug("Remote data is empty '%s'" % url)
        verbose_debug("-> %r" % (exc,))
        return b""

    if not content:
        verbose_debug("Remote data is empty '%s'" % url)
    cacheFile.write_bytes(content)
    return content
# Function which creates BI filtered URL's
def create_filtered_url(combination):
    """Build the request parameters and a plain URL for one filter set.

    Args:
        combination: Dict whose 'filters' entry is a list of
            [operator, column, value] triples.

    Returns:
        dict with 'baseUrl' (the EMA base URL), 'params' (the parameter
        dict) and 'url' (base URL followed by '&key=value' for every
        parameter, without any escaping).
    """
    filters = combination['filters']
    params = {
        'Path': '/shared/PHV DAP/DAP/Run Line Listing Report',
        'Format': 'txt',
        'Extension': '.csv',
        'Action': 'Extract',
    }

    if filters:
        # P0 carries the number of filters; every filter then occupies
        # three consecutive slots: operator, column reference, value.
        params['P0'] = len(filters)
        for index, entry in enumerate(filters):
            slot = 3 * index + 1
            params['P%d' % slot] = entry[0]
            params['P%d' % (slot + 1)] = '"Line Listing Objects"."%s"' % entry[1]
            params['P%d' % (slot + 2)] = "1 %s" % entry[2]

    queryString = "".join('&%s=%s' % (key, value) for key, value in params.items())
    return {
        'baseUrl': emaBaseUrl,
        'params': params,
        'url': emaBaseUrl + queryString,
    }
# Function which calls remote with the right session and headers
def ema_get_remote(localSession, localParams):
    """GET the EMA base URL with `localParams` and return the decoded body.

    Args:
        localSession: Session object whose .get() performs the request.
        localParams: Query parameters for the request.

    Returns:
        str: Response body decoded as UTF-16 (undecodable bytes replaced).

    A non-200 status, or a body containing an ErrorMessage <div>, prints
    the parameters and ends in fatal_shutdown.
    """
    browserHeaders = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    }
    timeouts = (connect_timeout, download_timeout)
    response = localSession.get(emaBaseUrl, params=localParams, headers=browserHeaders, timeout=timeouts)

    # Anything but HTTP 200 is fatal
    if response.status_code != 200:
        print(localParams)
        fatal_shutdown(f"Non remote 200 HTTP code, got {response.status_code}", "EGR01")

    # Decode explicitly instead of relying on the detected encoding
    responseText = response.content.decode('utf-16', errors='replace')

    # The remote may answer 200 and still embed an error message
    remoteError = re.search(r'<div class="ErrorMessage">(.*?)</div>', responseText, re.DOTALL)
    if remoteError is not None:
        print(localParams)
        fatal_shutdown(("Remote URL got error-response: %s" % remoteError.group(1)), "EGR02")

    return responseText
# Function which process compiled (downloadable) items
def process_compiled_item(item):
    """Download one compiled item and store it in the downloads directory.

    Args:
        item: Dict with at least 'OutputFile', 'Params', 'HighLevelCode'
            and 'DownloadUrl'.

    With USE_DOWNLOADS_CACHE enabled, an item whose target file already
    exists is skipped. Unless VERBOSE_DEBUGGING is on, the module-level
    progress bar `bar` is advanced once per item (skipped or downloaded).
    """
    targetDirectory = downloadsPath
    targetFilename = os.path.join(targetDirectory, item['OutputFile'])

    # Apply cache?
    if USE_DOWNLOADS_CACHE and os.path.exists(targetFilename):
        if not VERBOSE_DEBUGGING:
            bar()
        return

    # Create target directory
    create_directory(targetDirectory)

    # Debug
    verbose_debug("\nProcessing download")
    verbose_debug("-------------------")
    verbose_debug("-> HLC: "+item['HighLevelCode'])
    verbose_debug("-> URL: "+item['DownloadUrl'])
    verbose_debug("-> Into: "+targetDirectory)
    verbose_debug("-> File: "+targetFilename)

    # Perform the request with this thread's own session
    responseText = ema_get_remote(threadStorage.localSession, item['Params'])

    # Write to a temporary name first and rename afterwards: the crawl
    # skips files that already exist, so a half-written file left behind
    # by an interrupted run must never appear under the final name.
    # newline='' keeps the line endings exactly as received (text mode
    # would otherwise turn "\r\n" into "\r\r\n" on Windows).
    partialFilename = targetFilename + ".part"
    with open(partialFilename, 'w', encoding='utf-8', newline='') as file:
        file.write(responseText)
    os.replace(partialFilename, targetFilename)

    verbose_debug("-> READY")
    if not VERBOSE_DEBUGGING:
        bar()
# Function which generates a matrix
def generate_matrix(variables):
    """Return every equals / not-equals combination of the given variables.

    Args:
        variables: Mapping of variable name to value.

    Returns:
        list[list[str]]: One row per combination; each row holds one
        "name=value" or "name!=value" term per variable, in mapping
        order. The last variable alternates fastest. An empty mapping
        gives a single empty row.
    """
    rows = [[]]
    for name, value in variables.items():
        terms = (f"{name}={value}", f"{name}!={value}")
        rows = [row + [term] for row in rows for term in terms]
    return rows
# Fetch product- and substances list
# Walks the A-Z / 0-9 index pages for products and substances, extracts
# every anchor (code, name, centrally-authorised flag) and appends one
# download item per filter combination to `collection`.
collection = []
productCounter = 0
breakIteration = False
baseUrls = {
    'products': 'https://www.adrreports.eu/tables/product/',
    'substances': 'https://www.adrreports.eu/tables/substance/',
}
chars = list('abcdefghijklmnopqrstuvwxyz') + ['0-9']
# Compiled once; DOTALL lets an anchor span several lines of the page
anchorPattern = re.compile(r"<a\s+.*?</a>", re.DOTALL)
# High level codes whose listing is split up (250K line listing limit)
splitCodes = ('40983312', '40995439', '42325700', '21755')

for listType in baseUrls:
    baseUrl = baseUrls[listType]
    baseType = listType[:-1].capitalize()
    for char in chars:
        indexUrl = "%s%s.html" % (baseUrl, char)
        content = fetch_remote_url(indexUrl)
        # Decode the page instead of parsing the repr() of a bytes object,
        # which turned non-ASCII characters and apostrophes in product
        # names into backslash escape sequences.
        pageText = content.decode('utf-8', errors='replace')
        for match in anchorPattern.findall(pageText):
            # Extract CA: products always count as centrally authorised,
            # substances only when the anchor contains an asterisk
            if listType == 'substances' and '*' not in match:
                centrallyAuthorized = "0"
            else:
                centrallyAuthorized = "1"

            # Extract URL (first double-quoted attribute value)
            quotedParts = match.strip().split('"')
            if len(quotedParts) < 2:
                # Anchor without any quoted attribute: nothing to extract
                continue
            url = quotedParts[1]

            # Extract code (last '+'-separated piece of the query string)
            code = urlparse(unquote(url)).query.split('+')[-1]

            # Extract name (text between the first '>' and the next '<')
            name = match.split(">")[1].split("<")[0]

            # Construct filters:
            # First parameter is the total count of parameters
            # Then, groups of 3 parameters are added
            # Parameters above are respectively: operator, var1, var2 or value
            combinations = []
            defaultFilters = [
                ['eq', baseType+' High Level Code', code],
                ['eq', 'Seriousness', 'Serious'],
            ]

            # For Covid related HLC's we chop up the result because of a 250K line listing limit
            if listType == 'substances' and code in splitCodes:
                # NOTE(review): defaultFilters already pins Seriousness to
                # "Serious", so matrix rows with Seriousness!=Serious carry
                # two contradictory filters - confirm this is intended.
                matrix = generate_matrix(variables)
                for year in years:
                    for square in matrix:
                        filterString = ""
                        filters = defaultFilters.copy()
                        filters.append(['eq', 'Gateway Year', year])
                        for point in square:
                            if '!=' in point:
                                variable, value = point.split('!=')
                                operator = 'neq'
                                filterString += "_Not"
                            else:
                                variable, value = point.split('=')
                                operator = 'eq'
                            filters.append([operator, variable, value])
                            filterString += ("_%s" % value)
                        outputFile = ("%s_%s_%s%s.csv" % (listType, code, year, filterString))
                        # Only queue what is not on disk yet
                        if not os.path.exists(os.path.join(downloadsPath, outputFile)):
                            combinations.append({'filters': filters, 'outputFile': outputFile, 'code': code, 'type': listType})
            else:
                # Default ID match filter
                filters = defaultFilters.copy()
                outputFile = ("%s_%s.csv" % (listType, code))
                # Only queue what is not on disk yet
                if not os.path.exists(os.path.join(downloadsPath, outputFile)):
                    combinations.append({'filters': filters, 'outputFile': outputFile, 'code': code, 'type': listType})

            # Raise product-counter
            # NOTE(review): counted once per listed product, whether or not
            # its file still has to be downloaded - confirm against the
            # original indentation.
            productCounter += 1

            # Run every combination
            for combination in combinations:
                # Create URL
                result = create_filtered_url(combination)
                # Replace placeholders
                downloadUrl = result['url'].replace('$hlc', code).replace('$type', listType)
                # Collection storage: this is where the list of HLCs is assembled
                collection.append({
                    'HighLevelCode': code,
                    'ProductName': name,
                    'CentrallyAuthorized': centrallyAuthorized,
                    'Type': listType,
                    'BaseUrl': result['baseUrl'],
                    'IndexUrl': indexUrl,
                    'DownloadUrl': downloadUrl,
                    'OutputFile': combination['outputFile'],
                    'Filters': combination['filters'],
                    'Params': result['params'],
                })

            # Max product counter
            if MAX_PRODUCT_COUNT and productCounter == MAX_PRODUCT_COUNT:
                verbose_debug(("--> Warning: MAX_PRODUCT_COUNT reached = %d" % MAX_PRODUCT_COUNT))
                breakIteration = True
                break
        if breakIteration:
            break
    if breakIteration:
        break
# Preformat output
# Group the collected items per type; every item becomes one row of
# [high level code, product name, centrally-authorised flag].
output = {}
for item in collection:
    row = [str(item['HighLevelCode']), str(item['ProductName']), str(item['CentrallyAuthorized'])]
    output.setdefault(item['Type'], []).append(row)

# Generate indexed output files
# One tab-separated "<type>.txt" per type in OUTPUTDIR, replacing any
# file left over from an earlier run.
for outputType, lines in output.items():
    resultFile = "%s%s.txt" % (OUTPUTDIR, outputType)
    verbose_debug("Generating output file %s" % resultFile)
    if os.path.exists(resultFile):
        os.remove(resultFile)
    content = "".join("\t".join(line)+"\n" for line in lines)
    # The context manager closes the handle even when the write fails
    with open(resultFile, "wb") as f:
        f.write(content.encode('utf-8'))
# Debug
# Write "urls.txt" to OUTPUTDIR: for every collected item a blank line,
# the output file name and the download URL.
resultFile = "%surls.txt" % (OUTPUTDIR)
if os.path.exists(resultFile):
    os.remove(resultFile)
verbose_debug("Generating URL output file %s" % resultFile)
content = "".join(
    "\n"+entry['OutputFile']+"\n"+entry['DownloadUrl']+"\n"
    for entry in collection
)
# The context manager closes the handle even when the write fails
with open(resultFile, "wb") as f:
    f.write(content.encode('utf-8'))
# Run executor
def _run_download_pool():
    """Process the whole collection on a pool of MAX_THREADS threads."""
    # Every worker claims one index from this queue in its initializer
    resources = queue.Queue(MAX_THREADS)
    for threadIndex in range(MAX_THREADS):
        resources.put(threadIndex, False)
    with ThreadPool(MAX_THREADS, initiate_thread_executor, [resources]) as pool:
        pool.map(process_compiled_item, collection)


if VERBOSE_DEBUGGING:
    # Verbose mode prints per-download details, so no progress bar
    _run_download_pool()
else:
    collectionLength = len(collection)
    # `bar` has to stay a module-level name: process_compiled_item calls
    # it to advance the progress bar after every item.
    with alive_bar(collectionLength) as bar:
        _run_download_pool()
# Fin
# Report the configured product limit (if any) and the total runtime
if MAX_PRODUCT_COUNT is not None:
    print(f"Requested {MAX_PRODUCT_COUNT} files")
elapsedSeconds = time.time() - executionStart
runtime = round(elapsedSeconds)
print(f"Execution finished in ~ {runtime} seconds")