Thursday, May 16, 2024

3 Steps to Navigate All Adverse Event Reports Registered at EMA

Three steps to navigate all adverse event reports published by the European Medicines Agency from 2002 until March 27, 2023, covering Case Safety Reports submitted by patients and healthcare providers both inside and outside the European Economic Area.

1. Install a 14-day free trial of Tableau Desktop on your computer.

https://www.tableau.com/products/desktop/download

(Do not use the Tableau Public version as it has limited functionality and cannot handle the EMA Dashboard file)


2. Download the eudravigilance_march-2023.twbx file to your computer via the link below (or with the script sketched after step 3):

https://drive.google.com/file/d/16RRvQJZ6VEyAF_Gm4DDPtE200esXTlSM/view?usp=share_link


3. Start Tableau and open eudravigilance_march-2023.twbx

(This is a big dashboard, please allow some time for loading and processing)
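
If you prefer to script step 2, here is a minimal sketch that fetches the workbook from the Google Drive link with the gdown package (assuming gdown is installed via "pip install gdown"; the file ID is taken from the link in step 2):

import gdown

# Sketch: download the Tableau workbook from Google Drive (assumes gdown is installed)
file_id = "16RRvQJZ6VEyAF_Gm4DDPtE200esXTlSM"  # ID from the share link in step 2
url = "https://drive.google.com/uc?id=" + file_id
gdown.download(url, "eudravigilance_march-2023.twbx", quiet=False)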



Friday, April 19, 2024

EMA Eudravigilance - Dashboard and collector tool


In the recent interview with Jim Ferguson, we discussed a Tableau dashboard I had generated from pharmacovigilance data published by the European Medicines Agency (https://www.adrreports.eu/).



For those who wish to download the data, below is a Python script I developed with Twan van der Poel (https://vanderpoelsoftware.com/).

The tool downloads everything: both serious and non-serious line listings, and both products and substances.

Later I will post the shell scripts and SQL statements that process the CSV files into PostgreSQL.
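
As a rough illustration of that last step (a sketch only, not the final scripts), a batch of downloaded line listings could be bulk-loaded into PostgreSQL with COPY from Python. The table name, connection string and CSV options below are assumptions; adjust them to your own schema and to how the exports actually look on disk.

import glob
import psycopg2  # assumption: psycopg2 is installed

# Placeholder connection string; the "line_listings" table must already exist
# with columns matching the header of the exported CSV files.
conn = psycopg2.connect("dbname=ema user=postgres")
cur = conn.cursor()
for path in glob.glob("dl/*.csv"):  # "dl/" is where the crawler stores its downloads
    with open(path, encoding="utf-8") as f:
        # Stream the whole file into PostgreSQL in one round trip;
        # add e.g. DELIMITER E'\t' to the options if the exports are tab-separated.
        cur.copy_expert("COPY line_listings FROM STDIN WITH (FORMAT csv, HEADER true)", f)
conn.commit()
cur.close()
conn.close()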

If you have questions, please feel free to leave a comment.

Download the Python script here: https://drive.google.com/file/d/1Zs8HiB3W-bd77OspdTrnhb8WiXsskfTM/view?usp=sharing

The Tableau dashboard is a work in progress but can be downloaded here: https://drive.google.com/file/d/16RRvQJZ6VEyAF_Gm4DDPtE200esXTlSM/view?usp=share_link

(download a 14-day trial of Tableau Desktop to open the dashboard)

import errno, json, hashlib, logging, pprint, urllib.request, re, requests, os, ssl, sys, time, threading, queue

from alive_progress import alive_bar
from concurrent.futures import ThreadPoolExecutor
from itertools import product
from pathlib import Path
from multiprocessing.pool import ThreadPool
from urllib.parse import urlparse, unquote, urlencode
from requests.utils import requote_uri
from termcolor import colored
"""
# EMA Crawler
Crawls data from adrreports.eu and converts it into CSV and TXT files.
The script operates from its working directory and creates "temp/",
"output/" and "downloads/" subdirectories: "temp/" holds cached remote
pages and "output/" holds the generated output files. Downloads, if
performed, are stored in the "downloads/" directory, which holds hash
references to keep track of their origins.
* Authors: In alphabetical order:
*          Twan van der Poel <twan@vanderpoelsoftware.com>
*          Wouter Aukema <waukema@gmail.com>
* Usage:   python3 ema-crawler.py
* Version: 1.98
# Changelog
## 1.98
* Used new session for each executor.map iteration
* Further integration of multithreading
* Temporary removal of hash-subdirectory
* Added stopwatch
## 1.97
* Dynamic non-configurable download directory
* Recycled session for each .get call
* Removed last bits of BIN_CHROMEDRIVER
* Dedicated function for remote calls
* Further integration of remote HTTP errors
* Resolved several encoding issues
## 1.96
* Simplified target directory based on files' md5 hash
* Wrapped executor in progress-bar
* Added parameter VERBOSE_DEBUGGING
## 1.95
* Added setting for verbose chromedriver logging
* Removed Selenium approach
* Integrated requests.Session()
## 1.94
* Integrated dynamic download storage
* Feature which moves downloads to their final location
* Added setting USE_DOWNLOADS_CACHE
## 1.93
* Integrated generic fatal_shutdown handler
## 1.92
* Headless support for Chromedriver
* Integrated storage filenames in list of downloads
* Integrated filters to capture downloads of more than 250K of lines properly
* Added parameter "MAX_PRODUCT_COUNT" for testing small chunks
* Dynamic construction of URL (incl. filters)
* Cache integration in download directory
* Dynamic construction of filter combinations for Covid-related HLCs
## Todo
* Replace hard-coded IDs with detection of the 250K limitation
"""
# Settings
MAX_PRODUCT_COUNT    = None
MAX_THREADS          = 20
USE_DOWNLOADS_CACHE  = False
VERBOSE_DEBUGGING    = False
connect_timeout: int = 40
download_timeout: int = 100

# Variables used to split up large downloads
variables = {
    'Seriousness':                                   'Serious',
    'Regulatory Primary Source Country EEA/Non EEA': 'European Economic Area',
    'Primary Source Qualification':                  'Healthcare Professional',
    'Sex':                                           'Female',
}
# Years used to split up large downloads
years = {
    '2017': '2017',
    '2018': '2018',
    '2019': '2019',
    '2020': '2020',
    '2021': '2021',
    '2022': '2022',
    '2023': '2023',
    '2024': '2024',
}
# Timing
executionStart = time.time()
print("Version 1.98")
# Function which handles fatal shutdowns
def fatal_shutdown(errorMessage, identifier):
    print(colored('+--------------------------------------------------+', 'red'))
    print(colored('| EMA crawler crashed, please read the error below |', 'red'))
    print(colored('+--------------------------------------------------+', 'red'))
    print(colored(errorMessage, 'yellow')+" "+colored("("+identifier+")", "white"))
    sys.exit()
# Function which prints a verbose debug message (when enabled)
def verbose_debug(logMessage):
    if not VERBOSE_DEBUGGING:
        return
    print(logMessage)
# Helper to exit execution
def exit():
    sys.exit()
# Determine downloads directory
downloadsPath = ("%s%sdl" % (os.getcwd(), os.sep))
# Default EMA DPA base URL
emaBaseUrl = 'https://dap.ema.europa.eu/analytics/saw.dll?Go'
# Prepare thread storage
threadStorage = threading.local()
# Pre-run validation
if MAX_PRODUCT_COUNT is not None and not isinstance(MAX_PRODUCT_COUNT, int):
    fatal_shutdown("MAX_PRODUCT_COUNT must be an integer or None", "V01")
if not isinstance(MAX_THREADS, int) or MAX_THREADS < 1:
    fatal_shutdown("MAX_THREADS must be an integer of at least 1", "V02")
if not os.path.exists(downloadsPath):
    os.mkdir(downloadsPath)
if not os.path.exists(downloadsPath):
    fatal_shutdown("Downloads directory does not exist (and could not be created)", "V03")
# Ensure working directories
basepath = os.path.dirname(os.path.abspath(sys.argv[0]))+os.sep
workdirs = ['temp'+os.sep, 'output'+os.sep, 'downloads'+os.sep]
for workdir in workdirs:
    fullpath = basepath+workdir
    if not os.path.exists(fullpath):
        os.mkdir(fullpath)
    if not os.access(fullpath, os.W_OK):
        fatal_shutdown(("Working directory %s is not writable" % fullpath), "P01")
# Define important directories
CACHEDIR = basepath+workdirs[0]
OUTPUTDIR = basepath+workdirs[1]
# Function which creates a directory
def create_directory(directory):
    try:
        os.mkdir(directory)
    except OSError as exc:
        if exc.errno != errno.EEXIST:
            raise
        pass
# Function which initiates the thread executor
def initiate_thread_executor(resources):
    # Determine index
    threadStorage.threadIndex = resources.get(False)
    # Open initial session and authenticate
    threadStorage.localSession = requests.Session()
    # Prepare params
    params = {
        'Path':      '/shared/PHV DAP/DAP/Run Line Listing Report',
        'Format':    'txt',
        'Extension': '.csv',
        'Action':    'Extract',
        'P0':         0
    }
    # Initial request
    ema_get_remote(threadStorage.localSession, params)
# Function which fetches a URL and caches it by a hash
def fetch_remote_url(url):
    hash = hashlib.md5(url.encode()).hexdigest()
    cacheFile = CACHEDIR+hash+".html"
    ctx = ssl.create_default_context()
    ctx.check_hostname = False
    ctx.verify_mode = ssl.CERT_NONE
    if os.path.exists(cacheFile):
        verbose_debug("Fetching local from %s" % url)
        contents = Path(cacheFile).read_text()
        return contents.encode('utf-8')
    else:
        verbose_debug("Fetching remote from %s" % url)
        try:
            with urllib.request.urlopen(url, context=ctx) as response:
                content = response.read()
            if not content:
                fatal_shutdown("Unable to fetch remote data from '%s'" % url, "R01")
        except Exception:
            # A bare "except:" would also swallow the SystemExit raised by fatal_shutdown above
            verbose_debug("Remote fetch failed or returned no data for '%s'" % url)
            content = "".encode('utf-8')
        with open(cacheFile, "wb") as f:
            f.write(content)
        return fetch_remote_url(url)
# Function which creates BI filtered URLs
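# Example of the resulting parameter layout for two filters (as built by the loop below):
#   P0=2
#   P1=eq   P2="Line Listing Objects"."Substance High Level Code"   P3=1 40983312
#   P4=eq   P5="Line Listing Objects"."Seriousness"                 P6=1 Serious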
def create_filtered_url(combination):
    filters = combination['filters']
    params = {}
    params['Path']      = '/shared/PHV DAP/DAP/Run Line Listing Report'
    params['Format']    = 'txt'
    params['Extension'] = '.csv'
    params['Action']    = 'Extract'
    paramCounter = 0
    if len(filters) > 0:
        keyName = ('P%d' % paramCounter)
        params[keyName] = len(filters)
        paramCounter = paramCounter + 1
        for filter in filters:
            keyName = ('P%d' % paramCounter)
            params[keyName] = filter[0]
            paramCounter = paramCounter + 1
            keyName = ('P%d' % paramCounter)
            familyName = ('"Line Listing Objects"."%s"' % filter[1])
            params[keyName] = familyName
            paramCounter = paramCounter + 1
            keyName = ('P%d' % paramCounter)
            params[keyName] = (("1 %s" % filter[2]))
            paramCounter = paramCounter + 1
    baseUrl = emaBaseUrl
    url = baseUrl
    for property, value in params.items():
        url += ('&%s=%s' % (property, value))
    return {
        'baseUrl': baseUrl,
        'params': params,
        'url': url
    }
# Function which calls remote with the right session and headers
def ema_get_remote(localSession, localParams):
    # Prepare statics
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
    }
    errorPattern = r'<div class="ErrorMessage">(.*?)</div>'
    # Perform request
    response = localSession.get(emaBaseUrl, params=localParams, headers=headers, timeout=(connect_timeout, download_timeout))
    # Non 200 code?
    if response.status_code != 200:
        print(localParams)
        fatal_shutdown(f"Non remote 200 HTTP code, got {response.status_code}", "EGR01")
    # Explicitly specify the encoding if needed
    responseText = response.content.decode('utf-16', errors='replace')
    # Extract remote error?
    match = re.search(errorPattern, responseText, re.DOTALL)
    if match:
        print(localParams)
        fatal_shutdown(("Remote URL got error-response: %s" % match.group(1)), "EGR02")
    # Here you go, sir.
    return responseText
# Function which processes compiled (downloadable) items
def process_compiled_item(item):
    # Prepare target
    hash = hashlib.md5(item['DownloadUrl'].encode()).hexdigest()
    #targetDirectory = "%s%s%s" % (downloadsPath, os.sep, hash)
    targetDirectory = "%s%s" % (downloadsPath, os.sep)
    targetFilename = "%s%s%s" % (targetDirectory, os.sep, item['OutputFile'])
    # Apply cache?
    if USE_DOWNLOADS_CACHE and os.path.exists(targetFilename):
        if not VERBOSE_DEBUGGING:
            bar()
        return
    # Create target directory
    create_directory(targetDirectory)
    # Debug
    verbose_debug("\nProcessing download")
    verbose_debug("-------------------")
    verbose_debug("-> HLC:  "+item['HighLevelCode'])
    verbose_debug("-> URL:  "+item['DownloadUrl'])
    verbose_debug("-> Into: "+targetDirectory)
    verbose_debug("-> File: "+targetFilename)
    # Perform the request
    responseText = ema_get_remote(threadStorage.localSession, item['Params'])
    # Write response body to file
    with open(targetFilename, 'w', encoding='utf-8') as file:
        file.write(responseText)
    verbose_debug("-> READY")
    if not VERBOSE_DEBUGGING:
        bar()
# Function which generates a matrix
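# Example: for a single entry {'Sex': 'Female'} it returns
# [['Sex=Female'], ['Sex!=Female']]; the four variables defined above
# therefore expand to 2^4 = 16 filter combinations.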
def generate_matrix(variables):
    options = [[]]
    for name, value in variables.items():
        newOptions = []
        for option in options:
            newOptions.append(option + [f"{name}={value}"])
            newOptions.append(option + [f"{name}!={value}"])
        options = newOptions
    return options
# Fetch product- and substances list
collection = []
productCounter = 0
breakIteration = False
baseUrls = {}
baseUrls['products']   = 'https://www.adrreports.eu/tables/product/'
baseUrls['substances'] = 'https://www.adrreports.eu/tables/substance/'
chars = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z', '0-9']
for type in baseUrls:
    baseUrl = baseUrls[type]
    baseType = type[:-1].capitalize()
    for char in chars:
        indexUrl = "%s%s.html" % (baseUrl, char)
        content = fetch_remote_url(indexUrl)
        pattern = r"<a\s+.*?</a>"
        matches = re.findall(pattern, str(content))
        for match in matches:
            # Extract Centrally Authorized flag
            centrallyAuthorized = "1"
            if type == 'substances':
                centrallyAuthorized = "0"
                if re.search(r"\*", match):
                    centrallyAuthorized = "1"
            # Extract URL
            words = match.strip().split('"')
            url = words[1]
            # Extract code
            parsed = urlparse(unquote(url))
            words = parsed.query.split('+')
            code = words[-1]
            # Extract name
            parsed = match.split(">")
            parsed = parsed[1].split("<")
            name = parsed[0]
            # Construct filters:
            #   First parameter is the total count of parameters
            #   Then, groups of 3 parameters are added
            #   Parameters above are respectively: operator, var1, var2 or value
            combinations = []
            # Default definition
            defaultFilters = []
            defaultFilters.append(['eq', baseType+' High Level Code', code])
            defaultFilters.append(['eq', 'Seriousness', 'Serious'])
            # Prepare outputfile
            filterString = ""
            # Covid-related HLCs exceed the 250K line listing limit, so we chop the result up
            # by year and by the variable matrix (8 years x 16 combinations = 128 slices per HLC)
            if type == 'substances' and code in ['40983312', '40995439', '42325700', '21755']:
                # Generate matrix
                matrix = generate_matrix(variables)
                # Construct combo
                comboCounter = 0
                for year in years:
                    for square in matrix:
                        filterString = ""
                        filters = defaultFilters.copy()
                        filters.append(['eq', 'Gateway Year', year])
                        for point in square:
                            if re.search('!=', point):
                                variable, value = point.split('!=')
                                operator = 'neq'
                                filterString += "_Not"
                            else:
                                variable, value = point.split('=')
                                operator = 'eq'
                            filters.append([operator, variable, value])
                            filterString += ("_%s" % value)
                        comboCounter = comboCounter + 1
                        outputFile = ("%s_%s_%s%s.csv" % (type, code, year, filterString))
                        targetfile = downloadsPath + '/' + outputFile
                        if not os.path.exists(targetfile):
                            #print(f"{outputFile} exists")
                            #print(f"Working on {outputFile}")
                            combinations.append({'filters': filters, 'outputFile': outputFile, 'code': code, 'type': type})
            else:
                # Default ID match filter
                filters = defaultFilters.copy()
                outputFile = ("%s_%s.csv" % (type, code))
                targetfile = downloadsPath + '/' + outputFile
                if not os.path.exists(targetfile):
                    #print(f"Working on {outputFile}")
                    combinations.append({'filters': filters, 'outputFile': outputFile, 'code': code, 'type': type})
            # Raise product-counter
            productCounter += 1
            # Run every combination
            for combination in combinations:
                # Create URL
                result = create_filtered_url(combination)
                # Replace placeholders
                downloadUrl = result['url']
                downloadUrl = downloadUrl.replace('$hlc', code)
                downloadUrl = downloadUrl.replace('$type', type)
                # Collection storage
                item = {}
                item['HighLevelCode'] = code
                item['ProductName'] = name
                item['CentrallyAuthorized'] = centrallyAuthorized
                item['Type'] = type
                item['BaseUrl'] = result['baseUrl']
                item['IndexUrl'] = indexUrl
                item['DownloadUrl'] = downloadUrl
                item['OutputFile'] = combination['outputFile']
                item['Filters'] = combination['filters']
                item['Params'] = result['params']
                collection.append(item)
                #print(f"HLC: {item['HighLevelCode']}")
                # this is where the list of HLCs gets assembled
            # Max product counter
            if MAX_PRODUCT_COUNT and productCounter == MAX_PRODUCT_COUNT:
                verbose_debug(("--> Warning: MAX_PRODUCT_COUNT reached = %d" % MAX_PRODUCT_COUNT))
                breakIteration = True
                break
        if breakIteration:
            break
    if breakIteration:
        break
# Preformat output
output = {}
for item in collection:
    type = item['Type']
    if type not in output:
        output[type] = []
    output[type].append([str(item['HighLevelCode']), str(item['ProductName']), str(item['CentrallyAuthorized'])])
# Generate indexed output files
for type in output:
    resultFile = "%s%s.txt" % (OUTPUTDIR, type)
    verbose_debug("Generating output file %s" % resultFile)
    if os.path.exists(resultFile):
        os.remove(resultFile)
    lines = output[type]
    content = ""
    for line in lines:
        content += "\t".join(line)+"\n"
    with open(resultFile, "wb") as f:
        f.write(content.encode('utf-8'))
# Debug
resultFile = "%surls.txt" % (OUTPUTDIR)
if os.path.exists(resultFile):
    os.remove(resultFile)
verbose_debug("Generating URL output file %s" % resultFile)
content = ""
for item in collection:
    content += "\n"+item['OutputFile']+"\n"+item['DownloadUrl']+"\n"
with open(resultFile, "wb") as f:
    f.write(content.encode('utf-8'))
# Run executor
if VERBOSE_DEBUGGING:
    resources = queue.Queue(MAX_THREADS)
    for threadIndex in range(MAX_THREADS):
        resources.put(threadIndex, False)
    with ThreadPool(MAX_THREADS, initiate_thread_executor, [resources]) as pool:
        pool.map(process_compiled_item, collection)
else:
    collectionLength = len(collection)
    with alive_bar(collectionLength) as bar:
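        # alive_bar binds 'bar' at module level here, so process_compiled_item()
        # can call bar() from the worker threads to advance the progress bar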
        resources = queue.Queue(MAX_THREADS)
        for threadIndex in range(MAX_THREADS):
            resources.put(threadIndex, False)
        with ThreadPool(MAX_THREADS, initiate_thread_executor, [resources]) as pool:
            pool.map(process_compiled_item, collection)
# Fin
if MAX_PRODUCT_COUNT is not None:
    print(f"Requested {MAX_PRODUCT_COUNT} files")
runtime = round(time.time() - executionStart)
print(f"Execution finished in ~ {runtime} seconds")