Julia & Spark

In this notebook we will explore how to leverage Julia as an interface to Apache Spark. We will show how to set up a Spark cluster and then use Julia to perform an analysis on a large electronic medical records dataset.

Introduction

What is Apache Spark?

Apache Spark is an open-source cluster computing framework. It enables distributed data analysis with implicit data parallelism and fault tolerance. Spark quickly became the go-to framework for big data analysis. It is currently used at the following companies:

  • Amazon
  • Baidu
  • eBay
  • Groupon
  • OpenTable
  • Tencent
  • ...

In October 2014, Spark broke the data sorting world record by sorting 100TB of data in 23 minutes using 207 AWS EC2 instances. The previous record holder – Hadoop MapReduce – needed 2100 machines and 72 minutes to sort the same dataset.

Sparking with Julia

Out of the box, Spark supports Scala, Java, Python, and R – but not Julia. To use Julia, we will therefore rely on a package called Spark.jl, written by Andrei Zhabinski.

Spark.jl supports only the following commands on the cluster (a short usage sketch follows the list):

  • SparkConfig
  • SparkContext
  • RDD
  • JuliaRDD
  • JavaRDD
  • text_file
  • parallelize
  • map
  • reduce
  • filter
  • map_partitions
  • map_partitions_with_index
  • collect
  • count
  • close
  • typehint!
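
To make the shape of this API concrete, below is a minimal sketch of the basic RDD pipeline. It assumes a local master; the map and reduce signatures mirror the ones used later in this notebook:

using Spark

sc = SparkContext(master="local")

# Distribute a small in-memory collection as an RDD
nums = parallelize(sc, [1, 2, 3, 4, 5])

# Transform every element, then aggregate the results
squares = map(nums, x -> x * x)
total = reduce(squares, +)    # 55

close(sc)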

Setup

Starting a Cluster

We can start a cluster by navigating to aws.amazon.com and clicking on the EMR option:

Then click on the "Create" button to start the cluster configuration:

Select the following options then click on "Create Cluster":

Note: it will take AWS around 5 minutes to spin up the cluster.

Setting up Julia & Spark.jl

Once the cluster is up and running, we can install Julia and the Spark.jl package.

We connect to the EMR master node over SSH using the following command:

  • ssh -i {private_key_file}.pem ec2-user@{master_node_ip}

Once logged in, we can proceed to install the needed packages; a quick smoke test follows these steps:

  • Install Maven (needed for building Spark.jl):

    • sudo wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo
    • sudo sed -i s/\$releasever/6/g /etc/yum.repos.d/epel-apache-maven.repo
    • sudo yum install -y apache-maven
    • then validate the install by issuing: mvn --version
  • Install Julia:

    • wget https://julialang.s3.amazonaws.com/bin/linux/x64/0.5/julia-0.5.0-linux-x86_64.tar.gz
    • tar -zxvf julia-0.5.0-linux-x86_64.tar.gz
    • mv julia-3c9d75391c/ julia-0.5
    • cd julia-0.5
    • cd bin
  • Install Spark.jl:

    • ./julia
    • Pkg.clone("https://github.com/dfdx/Spark.jl")
    • Pkg.build("Spark")
  • Add Julia to Path

    • echo "export PATH=\"/home/ec2-user/julia-0.5/bin:\$PATH\"" > ~/.bashrc
    • source ~/.bashrc
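
Before moving on, we can sanity-check the installation by starting Julia and creating a throwaway context – a minimal smoke test, assuming the Pkg.build step above completed without errors:

using Spark                        # prints the path of the loaded libjvm
sc = SparkContext(master="local")  # starts a local JVM-backed context
close(sc)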

Data Analysis

Dataset Description

The dataset we will be using is an obfuscated medical dataset. We will focus specifically on a file called diagnosis_list.csv. It contains the diagnosis information for 20,000 patients identifiable using their – randomized – medical record number.
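
The exact schema is not documented. Judging from the field indices used in the code later in this notebook, we assume a comma-separated layout along the following lines (the values below are made up for illustration):

# Hypothetical row: id, mrn, admission datetime, diagnosis code, diagnosis description
line = "1,MRN0001,2012-03-14 08:30:00,D001,Aniridia"
split(line, ',')[2]    # -> "MRN0001" (randomized medical record number)
split(line, ',')[3]    # -> "2012-03-14 08:30:00" (visit date)
split(line, ',')[5]    # -> "Aniridia" (diagnosis description)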

Uploading Dataset

We will need to upload the diagnosis_list.csv file to /usr/local/main_dataset/diagnosis_list.csv on the master cluster node.

Example Map Reduce Functions

We can now start Julia on the master node and begin our testing. We can validate that the map/reduce functionality is operational by running the following simple queries:

In [62]:
using Spark
sc = SparkContext(master="local")
path = "/usr/local/main_dataset/diagnosis_list.csv"
txt = text_file(sc, path)
rdd = map(txt, line -> length(split(line, ',')[5]))
reduce(rdd, min)
reduce(rdd, max)
julia> using Spark
Loaded /usr/lib/jvm/java/jre/lib/amd64/server/libjvm.so

julia> sc = SparkContext(master="local")
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/11/01 03:04:54 INFO SparkContext: Running Spark version 1.5.2
16/11/01 03:04:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/11/01 03:04:54 INFO SecurityManager: Changing view acls to: ec2-user
16/11/01 03:04:54 INFO SecurityManager: Changing modify acls to: ec2-user
16/11/01 03:04:54 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(ec2-user); users with modify permissions: Set(ec2-user)
16/11/01 03:04:55 INFO Slf4jLogger: Slf4jLogger started
16/11/01 03:04:55 INFO Remoting: Starting remoting
16/11/01 03:04:55 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@172.30.4.204:39028]
16/11/01 03:04:55 INFO Utils: Successfully started service 'sparkDriver' on port 39028.
16/11/01 03:04:55 INFO SparkEnv: Registering MapOutputTracker
16/11/01 03:04:55 INFO SparkEnv: Registering BlockManagerMaster
16/11/01 03:04:55 INFO DiskBlockManager: Created local directory at /mnt/tmp/blockmgr-b3477f02-e933-4b49-9140-f7c349b0d8d3
16/11/01 03:04:55 INFO MemoryStore: MemoryStore started with capacity 491.7 MB
16/11/01 03:04:55 INFO HttpFileServer: HTTP File server directory is /mnt/tmp/spark-adfac224-9f64-44dc-ab6c-19abf5215c9f/httpd-fbe0eeeb-da0b-48a0-95b5-eb9c1e952e4c
16/11/01 03:04:55 INFO HttpServer: Starting HTTP Server
16/11/01 03:04:55 INFO Utils: Successfully started service 'HTTP file server' on port 39366.
16/11/01 03:04:55 INFO SparkEnv: Registering OutputCommitCoordinator
16/11/01 03:04:56 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/11/01 03:04:56 INFO SparkUI: Started SparkUI at http://172.30.4.204:4040
16/11/01 03:04:56 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
16/11/01 03:04:56 INFO Executor: Starting executor ID driver on host localhost
16/11/01 03:04:56 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 36140.
16/11/01 03:04:56 INFO NettyBlockTransferService: Server created on 36140
16/11/01 03:04:56 INFO BlockManagerMaster: Trying to register BlockManager
16/11/01 03:04:56 INFO BlockManagerMasterEndpoint: Registering block manager localhost:36140 with 491.7 MB RAM, BlockManagerId(driver, localhost, 36140)
16/11/01 03:04:56 INFO BlockManagerMaster: Registered BlockManager
16/11/01 03:04:56 INFO SparkContext: Added JAR /home/ec2-user/.julia/v0.5/Spark/src/../jvm/sparkjl/target/sparkjl-0.1.jar at http://172.30.4.204:39366/jars/sparkjl-0.1.jar with timestamp 1477969496556
SparkContext(Julia App on Spark)

julia> path = "/usr/local/main_dataset/diagnosis_list.csv"
"/usr/local/main_dataset/diagnosis_list.csv"

julia> txt = text_file(sc, path)
16/11/01 03:04:57 INFO MemoryStore: ensureFreeSpace(136784) called with curMem=0, maxMem=515553361
16/11/01 03:04:57 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 133.6 KB, free 491.5 MB)
16/11/01 03:04:57 INFO MemoryStore: ensureFreeSpace(12207) called with curMem=136784, maxMem=515553361
16/11/01 03:04:57 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 11.9 KB, free 491.5 MB)
16/11/01 03:04:57 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:36140 (size: 11.9 KB, free: 491.7 MB)
16/11/01 03:04:57 INFO SparkContext: Created broadcast 0 from textFile at :0
PipelinedRDD(JavaRDD{String}())

julia> rdd = map(txt, line -> length(split(line, ',')[5]))
PipelinedRDD(JavaRDD{String}())

julia> reduce(rdd, min)
16/11/01 03:04:59 INFO FileInputFormat: Total input paths to process : 1
16/11/01 03:04:59 INFO SparkContext: Starting job: first at :0
16/11/01 03:04:59 INFO DAGScheduler: Got job 0 (first at :0) with 1 output partitions
16/11/01 03:04:59 INFO DAGScheduler: Final stage: ResultStage 0(first at :0)
16/11/01 03:04:59 INFO DAGScheduler: Parents of final stage: List()
16/11/01 03:04:59 INFO DAGScheduler: Missing parents: List()
16/11/01 03:04:59 INFO DAGScheduler: Submitting ResultStage 0 (JuliaRDD[5] at RDD at JuliaRDD.scala:15), which has no missing parents
16/11/01 03:04:59 INFO MemoryStore: ensureFreeSpace(4208) called with curMem=148991, maxMem=515553361
16/11/01 03:04:59 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.1 KB, free 491.5 MB)
16/11/01 03:04:59 INFO MemoryStore: ensureFreeSpace(2467) called with curMem=153199, maxMem=515553361
16/11/01 03:04:59 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.4 KB, free 491.5 MB)
16/11/01 03:04:59 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:36140 (size: 2.4 KB, free: 491.7 MB)
16/11/01 03:04:59 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:861
16/11/01 03:04:59 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (JuliaRDD[5] at RDD at JuliaRDD.scala:15)
16/11/01 03:04:59 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/11/01 03:04:59 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 2212 bytes)
16/11/01 03:04:59 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/11/01 03:04:59 INFO Executor: Fetching http://172.30.4.204:39366/jars/sparkjl-0.1.jar with timestamp 1477969496556
16/11/01 03:04:59 INFO Utils: Fetching http://172.30.4.204:39366/jars/sparkjl-0.1.jar to /mnt/tmp/spark-adfac224-9f64-44dc-ab6c-19abf5215c9f/userFiles-5ac41ca6-dbdf-442a-82e3-f9d226ea03b5/fetchFileTemp3192813694756357817.tmp
16/11/01 03:04:59 INFO Executor: Adding file:/mnt/tmp/spark-adfac224-9f64-44dc-ab6c-19abf5215c9f/userFiles-5ac41ca6-dbdf-442a-82e3-f9d226ea03b5/sparkjl-0.1.jar to class loader
Loaded /usr/lib/jvm/java/jre/lib/amd64/server/libjvm.so
16/11/01 03:05:02 INFO HadoopRDD: Input split: file:/usr/local/main_dataset/diagnosis_list.csv:0+33554432
16/11/01 03:05:02 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
16/11/01 03:05:02 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
16/11/01 03:05:02 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
16/11/01 03:05:02 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
16/11/01 03:05:02 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
16/11/01 03:05:04 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2053 bytes result sent to driver
16/11/01 03:05:04 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 4745 ms on localhost (1/1)
16/11/01 03:05:04 INFO DAGScheduler: ResultStage 0 (first at :0) finished in 4.758 s
16/11/01 03:05:04 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/11/01 03:05:04 INFO DAGScheduler: Job 0 finished: first at :0, took 4.826125 s
16/11/01 03:05:04 INFO SparkContext: Starting job: collect at JuliaRDD.scala:44
16/11/01 03:05:04 INFO DAGScheduler: Got job 1 (collect at JuliaRDD.scala:44) with 2 output partitions
16/11/01 03:05:04 INFO DAGScheduler: Final stage: ResultStage 1(collect at JuliaRDD.scala:44)
16/11/01 03:05:04 INFO DAGScheduler: Parents of final stage: List()
16/11/01 03:05:04 INFO DAGScheduler: Missing parents: List()
16/11/01 03:05:04 INFO DAGScheduler: Submitting ResultStage 1 (JuliaRDD[4] at RDD at JuliaRDD.scala:15), which has no missing parents
16/11/01 03:05:04 INFO MemoryStore: ensureFreeSpace(4272) called with curMem=155666, maxMem=515553361
16/11/01 03:05:04 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 4.2 KB, free 491.5 MB)
16/11/01 03:05:04 INFO MemoryStore: ensureFreeSpace(2457) called with curMem=159938, maxMem=515553361
16/11/01 03:05:04 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 2.4 KB, free 491.5 MB)
16/11/01 03:05:04 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:36140 (size: 2.4 KB, free: 491.7 MB)
16/11/01 03:05:04 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:861
16/11/01 03:05:04 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (JuliaRDD[4] at RDD at JuliaRDD.scala:15)
16/11/01 03:05:04 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
16/11/01 03:05:04 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 2212 bytes)
16/11/01 03:05:04 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
Loaded /usr/lib/jvm/java/jre/lib/amd64/server/libjvm.so
16/11/01 03:05:07 INFO HadoopRDD: Input split: file:/usr/local/main_dataset/diagnosis_list.csv:0+33554432
16/11/01 03:05:07 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:36140 in memory (size: 2.4 KB, free: 491.7 MB)
16/11/01 03:05:07 INFO ContextCleaner: Cleaned accumulator 1
(Exception thrown after task completion (likely due to cleanup),java.lang.NullPointerException)

 in yieldto(::Task, ::ANY) at ./event.jl:136
 in wait() at ./event.jl:169
 in stream_wait(::Task) at ./stream.jl:44
 in uv_write(::TCPSocket, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:820
 in unsafe_write(::TCPSocket, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:830
 in unsafe_write(::TCPSocket, ::Base.RefValue{Int32}, ::Int64) at ./io.jl:155
 in writeobj at /home/ec2-user/.julia/v0.5/Spark/src/worker.jl:28 [inlined]
 in dump_stream(::TCPSocket, ::Iterators.IMap) at /home/ec2-user/.julia/v0.5/Spark/src/worker.jl:51
 in launch_worker() at /home/ec2-user/.julia/v0.5/Spark/src/worker.jl:76
 in eval(::Module, ::Any) at ./boot.jl:234
 in process_options(::Base.JLOptions) at ./client.jl:239
 in _start() at ./client.jl:318ERROR: write: broken pipe (EPIPE)
 in yieldto(::Task, ::ANY) at ./event.jl:136
 in wait() at ./event.jl:169
 in stream_wait(::Task) at ./stream.jl:44
 in uv_write(::TCPSocket, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:820
 in unsafe_write(::TCPSocket, ::Ptr{UInt8}, ::UInt64) at ./stream.jl:830
 in unsafe_write(::TCPSocket, ::Base.RefValue{Int32}, ::Int64) at ./io.jl:155
 in launch_worker() at /home/ec2-user/.julia/v0.5/Spark/src/worker.jl:83
 in eval(::Module, ::Any) at ./boot.jl:234
 in process_options(::Base.JLOptions) at ./client.jl:239
 in _start() at ./client.jl:318
16/11/01 03:05:22 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 2059 bytes result sent to driver
16/11/01 03:05:22 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 2, localhost, PROCESS_LOCAL, 2212 bytes)
16/11/01 03:05:22 INFO Executor: Running task 1.0 in stage 1.0 (TID 2)
16/11/01 03:05:22 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 17438 ms on localhost (1/2)
Loaded /usr/lib/jvm/java/jre/lib/amd64/server/libjvm.so
16/11/01 03:05:24 INFO HadoopRDD: Input split: file:/usr/local/main_dataset/diagnosis_list.csv:33554432+12768524
16/11/01 03:05:32 INFO Executor: Finished task 1.0 in stage 1.0 (TID 2). 2059 bytes result sent to driver
16/11/01 03:05:32 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 2) in 9986 ms on localhost (2/2)
16/11/01 03:05:32 INFO DAGScheduler: ResultStage 1 (collect at JuliaRDD.scala:44) finished in 27.420 s
16/11/01 03:05:32 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/11/01 03:05:32 INFO DAGScheduler: Job 1 finished: collect at JuliaRDD.scala:44, took 27.444345 s
3

julia> reduce(rdd, max)
16/11/01 03:05:44 INFO SparkContext: Starting job: collect at JuliaRDD.scala:44
16/11/01 03:05:44 INFO DAGScheduler: Got job 2 (collect at JuliaRDD.scala:44) with 2 output partitions
16/11/01 03:05:44 INFO DAGScheduler: Final stage: ResultStage 2(collect at JuliaRDD.scala:44)
16/11/01 03:05:44 INFO DAGScheduler: Parents of final stage: List()
16/11/01 03:05:44 INFO DAGScheduler: Missing parents: List()
16/11/01 03:05:44 INFO DAGScheduler: Submitting ResultStage 2 (JuliaRDD[6] at RDD at JuliaRDD.scala:15), which has no missing parents
16/11/01 03:05:44 INFO MemoryStore: ensureFreeSpace(4272) called with curMem=155720, maxMem=515553361
16/11/01 03:05:44 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 4.2 KB, free 491.5 MB)
16/11/01 03:05:44 INFO MemoryStore: ensureFreeSpace(2457) called with curMem=159992, maxMem=515553361
16/11/01 03:05:44 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.4 KB, free 491.5 MB)
16/11/01 03:05:44 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:36140 (size: 2.4 KB, free: 491.7 MB)
16/11/01 03:05:44 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:861
16/11/01 03:05:44 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 2 (JuliaRDD[6] at RDD at JuliaRDD.scala:15)
16/11/01 03:05:44 INFO TaskSchedulerImpl: Adding task set 2.0 with 2 tasks
16/11/01 03:05:44 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 3, localhost, PROCESS_LOCAL, 2212 bytes)
16/11/01 03:05:44 INFO Executor: Running task 0.0 in stage 2.0 (TID 3)
Loaded /usr/lib/jvm/java/jre/lib/amd64/server/libjvm.so
16/11/01 03:05:46 INFO HadoopRDD: Input split: file:/usr/local/main_dataset/diagnosis_list.csv:0+33554432
16/11/01 03:05:46 INFO BlockManagerInfo: Removed broadcast_2_piece0 on localhost:36140 in memory (size: 2.4 KB, free: 491.7 MB)
16/11/01 03:05:46 INFO ContextCleaner: Cleaned accumulator 2
16/11/01 03:06:01 INFO Executor: Finished task 0.0 in stage 2.0 (TID 3). 2059 bytes result sent to driver
16/11/01 03:06:01 INFO TaskSetManager: Starting task 1.0 in stage 2.0 (TID 4, localhost, PROCESS_LOCAL, 2212 bytes)
16/11/01 03:06:01 INFO Executor: Running task 1.0 in stage 2.0 (TID 4)
16/11/01 03:06:01 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 3) in 16595 ms on localhost (1/2)
Loaded /usr/lib/jvm/java/jre/lib/amd64/server/libjvm.so
16/11/01 03:06:03 INFO HadoopRDD: Input split: file:/usr/local/main_dataset/diagnosis_list.csv:33554432+12768524
16/11/01 03:06:09 INFO Executor: Finished task 1.0 in stage 2.0 (TID 4). 2059 bytes result sent to driver
16/11/01 03:06:09 INFO TaskSetManager: Finished task 1.0 in stage 2.0 (TID 4) in 8882 ms on localhost (2/2)
16/11/01 03:06:09 INFO DAGScheduler: ResultStage 2 (collect at JuliaRDD.scala:44) finished in 25.474 s
16/11/01 03:06:09 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
16/11/01 03:06:09 INFO DAGScheduler: Job 2 finished: collect at JuliaRDD.scala:44, took 25.485222 s
27

Here we encounter the first area for improvement in this plugin: verbosity. The commands are far too verbose, and the actual results are hidden among the logging statements. The logs should be emitted at debug level and shown only if the user raises the log level. The lines that we care about most in the above output are the answers to each query:

16/11/01 03:05:32 INFO DAGScheduler: Job 1 finished: collect at JuliaRDD.scala:44, took 27.444345 s
3
16/11/01 03:06:09 INFO DAGScheduler: Job 2 finished: collect at JuliaRDD.scala:44, took 25.485222 s
27
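
One workaround – pending a fix in Spark.jl itself – is to lower Spark's own log level. On a stock Spark install this is controlled by conf/log4j.properties (the path may differ on an EMR node); changing the root category from INFO to WARN silences most of the noise above:

# conf/log4j.properties
log4j.rootCategory=WARN, console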

Comorbidity List

To demonstrate a longer example, we can run the following code to retrieve the list of comorbidities for a specific disease:

In [ ]:
Pkg.add("DataFrames")
using Spark
using DataFrames
sc = SparkContext(master="local")
path = "/usr/local/main_dataset/diagnosis_list.csv"
txt = text_file(sc, path)


# Get list of patients 
patients_rdd = map(txt, line -> begin
           x = split(line, ',')
           x[2]
      end)

patients = collect(patients_rdd)

# Get dates
dates_rdd = map(txt, line -> begin
           x = split(line, ',')
           Dates.DateTime(x[3], "yyyy-mm-dd HH:MM:SS")
      end)

dates = collect(dates_rdd)

# Get diags
diags_rdd = map(txt, line -> begin
           x = split(line, ',')
           x[5]
      end)


diags = collect(diags_rdd)
# Construct data frame from data returned from Spark
df = DataFrame()
df[:mrn] = patients
df[:dates] = dates
df[:diags] = diags

sort!(df, cols = [:mrn, :dates])

diags_array = convert(Array, df)

disease = "Aniridia"

current_mrn = "None"
d = unique(diags_array[:,3])
l = Dict(i => 0 for i = d)
disease_found = false
for i in 1:size(diags_array, 1)
    row = diags_array[i,:]
    # New patient: reset the disease flag
    if current_mrn != row[1]
        current_mrn = row[1]
        disease_found = false
    end

    if row[3] == disease
        disease_found = true
    end

    # Count every other diagnosis recorded for this patient
    # once the target disease has been seen
    if disease_found && row[3] != disease
        l[row[3]] += 1
    end
end


for (key, value) in l 
    if value != 0 
        println(key, "=>", value)
    end
end
    
close(sc)

Running the above code yields the following list of diseases associated with "Aniridia":

Subtrochanteric fx-open=>1
Mening in oth fungal dis=>1
Hypermobility of coccyx=>1
C5-c7 fx-op/cord inj NOS=>1
Neoplasm other sites NOS=>1
Burn NOS hand-multiple=>1
Puerp endomet del w p/p=>1
Ac eustachian salping=>1
TB sp crd absc-no exam=>1
Post-void dribbling=>1
Epimembranous nephritis=>1
Vasc lesion cord-deliver=>1
Serous choroid detachmnt=>1
Malig neo soft palate=>1
Candidal esophagitis=>1
Measles keratitis=>1
Toxic effect of asbestos=>1
Inj thoracic vessel NOS=>1
"Panniculitis=>1
Flu w manifestation NEC=>1
RR acc w explos-pasngr=>1
Ice Skating=>1
Pulmonary AV malformatn=>1
Mach acc-occ power boat=>1
Toxic effect strychnine=>1
"Fx mid/prx phal=>1
Open wound vulva-compl=>1
Hpt C w/o hepat coma NOS=>1
TB urinary NEC-unspec=>1
Carotid sinus syndrome=>1
Chr otitis externa NEC=>1
Burn NOS upper arm=>1
Ben carcinoid sig colon=>1
Meconium staining=>1
Tubrclma sp crd-histo dx=>1
Ac stomach ulc NOS-obstr=>1
Late ef-spch/lang df NEC=>1

This longer example exposes a big gap in the implementation of this library: it is missing the group-by and sort-by functions that are crucial for performing basic analysis on Spark. Without those functions, we had to rely on DataFrames to perform the intended computation.
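
As an illustration of the kind of primitive that is missing, here is a driver-side group-by-and-count in plain Julia (a sketch; diags is the array collected from diags_rdd above). It works, but every record must fit in driver memory, which defeats the point of Spark:

# Driver-side stand-in for the missing group-by: count each distinct diagnosis
counts = Dict{String,Int}()
for diag in diags
    counts[diag] = get(counts, diag, 0) + 1
end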

Conclusion & Next Steps

We can see from this analysis that even though we can perform mapping and reduction using Spark.jl, the lack of group-by and sort-by functions severely limits the possible solutions.

Plugin Issues:

  • No group-by function
  • No sort-by function
  • Verbosity

We believe that tackling these 3 pain points will dramatically increase the usability of this plugin.