Julia & Spark

In this notebook we will explore how to use Julia as an interface to Apache Spark. We will walk through setting up a Spark cluster and then use Julia to analyze a large electronic medical records dataset.


What is Apache Spark?

Apache Spark is an open-source cluster computing framework. It enables distributed data analysis with implicit data parallelism and fault tolerance. Spark quickly became the go-to framework for big data analysis. It is currently used at the following companies:

  • Amazon
  • Baidu
  • eBay
  • Groupon
  • OpenTable
  • Tencent
  • ...

In October 2014, Spark broke the data sorting world record by sorting 100TB of data in 23 minutes using 207 EC2 AWS instances. The previous record holder – Hadoop MapReduce – needed 2100 machines and 72 minutes to sort the same dataset.

Sparking with Julia

Out of the box, Spark supports Scala, Java, Python, and R, but not Julia. To use Julia, we will rely on a package called Spark.jl, written by Andrei Zhabinski.

Spark.jl only supports the following commands on the cluster:

  • SparkConfig
  • SparkContext
  • RDD
  • JuliaRDD
  • JavaRDD
  • text_file
  • parallelize
  • map
  • reduce
  • filter
  • map_partitions
  • map_partitions_with_index
  • collect
  • count
  • close
  • typehint!
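To make the difference between map, map_partitions, and map_partitions_with_index concrete, here is a minimal local sketch in plain Julia. It does not call Spark.jl: a vector of vectors stands in for a partitioned RDD, the records are made up, and the local_* helper names are hypothetical. It only illustrates the general Spark semantics (one call per record versus one call per partition), not how Spark.jl implements them.

```julia
# Local stand-in for an RDD: a vector of partitions, each a vector of records.
# The data and the local_* helpers are hypothetical; no Spark is involved.
partitions = [["a,1", "b,2"], ["c,3"], ["d,4", "e,5", "f,6"]]

# collect: flatten all partitions into one array, preserving order.
local_collect(parts) = vcat(parts...)

# map: f is called once per record.
local_map(f, parts) = [[f(r) for r in p] for p in parts]

# map_partitions: f is called once per partition and returns that
# partition's new records.
local_map_partitions(f, parts) = [f(p) for p in parts]

# map_partitions_with_index: as above, but f also receives the partition
# index (Spark numbers partitions from 0).
local_map_partitions_with_index(f, parts) =
    [f(i - 1, p) for (i, p) in enumerate(parts)]

# First comma-separated field of every record.
keys_per_record = local_collect(local_map(r -> split(r, ',')[1], partitions))

# One value per partition: its size.
sizes = local_collect(local_map_partitions(p -> [length(p)], partitions))

# Tag every record with the index of the partition it lives in.
tagged = local_collect(
    local_map_partitions_with_index((i, p) -> [(i, r) for r in p], partitions))

# Counting records expressed as map-then-reduce.
total = reduce(+, local_collect(local_map(r -> 1, partitions)))

println(keys_per_record)
println(sizes)
println(total)
```

The per-partition variants are useful when some setup cost (parsing a schema, opening a connection) should be paid once per partition rather than once per record.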


Starting a Cluster

We can start a cluster by navigating to aws.amazon.com and clicking on the EMR option:

Then click on the "Create" button to start the cluster configuration:

Select the following options then click on "Create Cluster":

Note: AWS takes around 5 minutes to spin up the cluster.

Setting up Julia & Spark.jl

Once the cluster is up and running, we can install Julia and the Spark.jl package.

We connect to the EMR master node over SSH using the following command:

  • ssh -i {private_key_file}.pem ec2-user@{master_node_ip}

Once logged in, we can proceed to install the needed packages:

  • Install Maven (needed to build Spark.jl):

    • sudo wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo
    • sudo sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo
    • sudo yum install -y apache-maven
    • then validate the installation by running: mvn --version
  • Install Julia:

    • wget https://julialang.s3.amazonaws.com/bin/linux/x64/0.5/julia-0.5.0-linux-x86_64.tar.gz
    • tar -zxvf julia-0.5.0-linux-x86_64.tar.gz
    • mv julia-3c9d75391c/ julia-0.5
    • cd julia-0.5
    • cd bin
  • Install Spark.jl:

    • ./julia
    • Pkg.clone("https://github.com/dfdx/Spark.jl")
    • Pkg.build("Spark")
  • Add Julia to the PATH by appending to ~/.bashrc:

    • echo "export PATH=\"/home/ec2-user/julia-0.5/bin:\$PATH\"" >> ~/.bashrc
    • source ~/.bashrc

Data Analysis

Dataset Description

The dataset we will be using is an obfuscated medical dataset. We will focus specifically on a file called diagnosis_list.csv. It contains the diagnosis information for 20,000 patients, each identified by a randomized medical record number (MRN).

Uploading Dataset

We will need to upload the diagnosis_list.csv file to /usr/local/main_dataset/diagnosis_list.csv on the master cluster node.

Example Map Reduce Functions

We can now start Julia on the master node and begin testing. We can validate that the map/reduce functionality is operational by running the following simple queries:

In [62]:
Here we encounter the first area for improvement in this plugin: verbosity. The output is too verbose, and the actual results are buried among the logging statements. These statements should be logged at debug level and shown only if the user changes the log level. The lines that we care about the most from the above output are the answers to each query:

16/11/01 03:05:32 INFO DAGScheduler: Job 1 finished: collect at JuliaRDD.scala:44, took 27.444345 s
16/11/01 03:06:09 INFO DAGScheduler: Job 2 finished: collect at JuliaRDD.scala:44, took 25.485222 s
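One possible workaround, assuming the session output has been captured as text, is to filter it after the fact. The sketch below is plain Julia and not part of Spark.jl; parse_job_summary is a hypothetical helper, and the regular expression is only fitted to the shape of the two log lines above. The third entry in log_lines is a made-up placeholder for unrelated output.

```julia
# The first two lines are copied from the output above; the third is a
# made-up placeholder standing in for any unrelated log statement.
log_lines = [
    "16/11/01 03:05:32 INFO DAGScheduler: Job 1 finished: collect at JuliaRDD.scala:44, took 27.444345 s",
    "16/11/01 03:06:09 INFO DAGScheduler: Job 2 finished: collect at JuliaRDD.scala:44, took 25.485222 s",
    "some other line that is not a job summary",
]

# Hypothetical helper: return (job_id, seconds) for a job-summary line,
# or nothing for any other line.
function parse_job_summary(line)
    m = match(r"Job (\d+) finished: .*, took ([0-9.]+) s", line)
    m === nothing && return nothing
    return (parse(Int, m.captures[1]), parse(Float64, m.captures[2]))
end

# Keep only the lines that parsed as job summaries.
summaries = filter(s -> s !== nothing, map(parse_job_summary, log_lines))

for (job, seconds) in summaries
    println("Job ", job, " took ", seconds, " s")
end
```

This only hides the noise on the reading side; it does not reduce what Spark itself writes to the log.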

Comorbidity List

To demonstrate a longer example, we can run the following code to retrieve the list of comorbidity for a specific disease:

In [ ]:
using Spark
using DataFrames

sc = SparkContext(master="local")
path = "/usr/local/main_dataset/diagnosis_list.csv"
txt = text_file(sc, path)

# Get list of patients
patients_rdd = map(txt, line -> begin
           x = split(line, ',')
           x[1]  # ASSUMPTION: the MRN is the first column (this line is missing from the original cell)
       end)

patients = collect(patients_rdd)

# Get dates
dates_rdd = map(txt, line -> begin
           x = split(line, ',')
           Dates.DateTime(x[3], "yyyy-mm-dd HH:MM:SS")
       end)

dates = collect(dates_rdd)

# Get diags
diags_rdd = map(txt, line -> begin
           x = split(line, ',')
           x[4]  # ASSUMPTION: the diagnosis follows the date column (this line is missing from the original cell); adjust to the file layout
       end)

diags = collect(diags_rdd)

# Construct data frame from data returned from Spark
df = DataFrame()
df[:mrn] = patients
df[:dates] = dates
df[:diags] = diags

# Sort by patient, then by date, so each patient's diagnoses are in chronological order
sort!(df, cols = [:mrn, :dates])

diags_array = convert(Array, df)

disease = "Aniridia"

# Count every diagnosis recorded for a patient after their first occurrence of `disease`
current_mrn = "None"
d = unique(diags_array[:,3])
l = Dict(i => 0 for i = d)
disease_found = false
for i in 1:size(diags_array, 1)
    row = diags_array[i,:]
    if current_mrn != row[1]
        # New patient: reset the flag
        current_mrn = row[1]
        disease_found = false
    end

    if row[3] == disease
        disease_found = true
    end
    if disease_found && row[3] != disease
        l[row[3]] += 1
    end
end

# Print only the diagnoses that were seen at least once
for (key, value) in l
    if value != 0
        println(key, "=>", value)
    end
end

By running the above code, we can get the following list of diseases that are associated with "Aniridia":

Subtrochanteric fx-open=>1
Mening in oth fungal dis=>1
Hypermobility of coccyx=>1
C5-c7 fx-op/cord inj NOS=>1
Neoplasm other sites NOS=>1
Burn NOS hand-multiple=>1
Puerp endomet del w p/p=>1
Ac eustachian salping=>1
TB sp crd absc-no exam=>1
Post-void dribbling=>1
Epimembranous nephritis=>1
Vasc lesion cord-deliver=>1
Serous choroid detachmnt=>1
Malig neo soft palate=>1
Candidal esophagitis=>1
Measles keratitis=>1
Toxic effect of asbestos=>1
Inj thoracic vessel NOS=>1
"Panniculitis=>1
Flu w manifestation NEC=>1
RR acc w explos-pasngr=>1
Ice Skating=>1
Pulmonary AV malformatn=>1
Mach acc-occ power boat=>1
Toxic effect strychnine=>1
"Fx mid/prx phal=>1
Open wound vulva-compl=>1
Hpt C w/o hepat coma NOS=>1
TB urinary NEC-unspec=>1
Carotid sinus syndrome=>1
Chr otitis externa NEC=>1
Burn NOS upper arm=>1
Ben carcinoid sig colon=>1
Meconium staining=>1
Tubrclma sp crd-histo dx=>1
Ac stomach ulc NOS-obstr=>1
Late ef-spch/lang df NEC=>1

This longer example exposes a big gap in the implementation of this library: it is missing the group by and sort by functions that are crucial for performing basic analysis on Spark. Without those functions, we had to collect the data and rely on DataFrames to perform the intended computation.
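To show what those two operations contribute, here is a minimal sketch of the same comorbidity count written around an explicit group by and sort by. It is plain Julia running on already-collected records, not Spark.jl functionality. The records are toy data made up for illustration, and group_by and comorbidity_counts are hypothetical helper names.

```julia
# Toy records (made up for illustration): (mrn, date, diagnosis).
# ISO-formatted date strings sort chronologically as plain strings.
records = [
    ("p2", "2016-01-05", "B"),
    ("p1", "2016-03-01", "C"),
    ("p1", "2016-01-01", "A"),
    ("p1", "2016-02-01", "X"),
    ("p2", "2016-01-01", "X"),
    ("p2", "2016-02-01", "C"),
    ("p3", "2016-01-01", "C"),
]

# Hypothetical helper: a local "group by" that buckets records by key.
function group_by(keyfn, recs)
    groups = Dict()
    for r in recs
        push!(get!(groups, keyfn(r), []), r)
    end
    return groups
end

# Count the diagnoses recorded after each patient's first occurrence
# of `disease`.
function comorbidity_counts(recs, disease)
    counts = Dict()
    for (mrn, visits) in group_by(r -> r[1], recs)
        seen = false
        # Local "sort by": put this patient's visits in date order.
        for v in sort(visits, by = r -> r[2])
            diag = v[3]
            if diag == disease
                seen = true
            elseif seen
                counts[diag] = get(counts, diag, 0) + 1
            end
        end
    end
    return counts
end

counts = comorbidity_counts(records, "X")
println(counts)
```

In the toy data, patient p1 has A before X, so A is not counted, and patient p3 never has X, so none of their diagnoses are counted. With group by and sort by available on the RDD itself, the grouping and ordering steps could run on the cluster instead of on the collected data.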

Conclusion & Next Steps

We can see from this analysis that even though we can perform mapping and reduction using Spark.jl, the lack of group by and sort by functions severely limits the possible solutions.

Plugin issues:

  • No group by function
  • No sort by function
  • Verbosity

We believe that tackling these three pain points will dramatically increase the usability of this plugin.