julia-users › moving data to workers for distributed workloads
9 posts by 4 authors

Michael Eastwood — May 18

Hi julia-users!

I need some help with distributing a workload across tens of workers on several machines.

The problem I am trying to solve involves calculating the elements of a large block-diagonal matrix. The total size of the blocks is >500 GB, so I cannot store the entire thing in memory. The way the calculation works, I do a bunch of spherical harmonic transforms and the results give me one row in each block of the matrix.

The following code illustrates what I am doing currently. I am distributing the spherical harmonic transforms amongst all the workers and bringing the data back to the master process to write the results to disk (the master process has each matrix block mmapped to disk).

```julia
idx = 1
limit = 10000
nextidx() = (myidx = idx; idx += 1; myidx)
@sync for worker in workers()
    @async while true
        myidx = nextidx()
        myidx ≤ limit || break
        # Julia 0.4 argument order: remotecall_fetch(pid, f, args...)
        coefficients = remotecall_fetch(worker, spherical_harmonic_transforms, input[myidx])
        write_results_to_disk(coefficients)
    end
end
```

Each spherical harmonic transform takes O(10 seconds), so I thought the data movement cost would be negligible in comparison. However, if I have three machines each with 16 workers, machine 1 will have all 16 workers working hard (the master process is on machine 1) while machines 2 and 3 have most of their workers idling. My hypothesis is that the cost of moving the data to and from the workers is preventing machines 2 and 3 from being fully utilized.

- `coefficients` is a vector of a million Complex128s (16 MB).
- `input` is composed of two parts: 1) a vector of 10 million Float64s (100 MB), and 2) a small amount of additional information that is negligibly small compared to the first part.

The trick is that the first part of `input` (the 100 MB vector) doesn't change between iterations, so I could alleviate most of the data movement problem by moving that part to each worker once.
Problem is that I can't seem to figure out how to do that. The manual (http://docs.julialang.org/en/release-0.4/manual/parallel-computing/#remoterefs-and-abstractchannels) is a little thin on how to use RemoteRefs.

So how do you move data to workers in a way that it can be re-used on subsequent iterations? An example in the manual would be very helpful!

Thanks,
Michael

Fabian Gans — May 19

Hi Michael,

I recently had a similar problem, and this SO thread helped me a lot: http://stackoverflow.com/questions/27677399/julia-how-to-copy-data-to-another-processor-in-julia

As a side question: which code are you using to calculate spherical harmonic transforms? I was looking for a Julia package some time ago and did not find any, so if your code is publicly available, could you point me to it?

Thanks,
Fabian

Matthew Pearce — May 19

Hi Michael,

Your current code looks like it will pull `coefficients` back across the network (a 500 GB transfer) and, as you point out, transfer `input` each time. I wrote a package, ClusterUtils.jl, to handle my own problems (MCMC sampling), which were somewhat similar. Roughly, given the available info, if I were trying to do something similar I'd do:

```julia
using Compat
using ClusterUtils

sow(pids, :input, input)

@everywhere function dostuff(input, myidxs)
    for myidx in myidxs
        coefficients = spherical_harmonic_transforms(input[myidx])
        write_results_to_disk(coefficients) # probably needs myidx as an arg too
    end
end

idxs = chunkit(limit, length(pids))
sow(pids, :work, :(Dict(zip($pids, $idxs))))
reap(pids, :(dostuff(input, $work[myid()])))
```

This transfers `input` once, and writes the results to disk from the remote process.

Matthew Pearce — May 19

Also... the above doesn't respect the ordering of the `myidx` variable. I'm not sure how your problem domain operates, so it could get more complicated if things like order of execution matter. ;)
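[Editor's note: for reference, the pattern from the StackOverflow thread Fabian linked (binding a value to a global on a remote worker once, then referring to it by name in later calls) looks roughly like the sketch below. The 0.4-style `@spawnat`/`eval(Main, ...)` usage follows that answer; `input_vector` and the anonymous-function call at the end are hypothetical names for this thread's data.]

```julia
# Bind keyword arguments as globals in Main on worker p (one-time transfer).
function sendto(p::Int; args...)
    for (nm, val) in args
        @spawnat(p, eval(Main, Expr(:(=), nm, val)))
    end
end

# One-time broadcast of the constant 100 MB part of `input`:
for p in workers()
    sendto(p, input_vector = input_vector)
end

# Later calls reference the worker-local global instead of shipping 100 MB
# per call; only the small index crosses the network:
# remotecall_fetch(p, idx -> spherical_harmonic_transforms(Main.input_vector, idx), myidx)
```

Note that `Main.input_vector` is a non-const global on the workers, which is exactly the performance concern Michael raises in his reply below; wrapping the lookup in a function barrier, or Greg's `const`-in-a-module approach, avoids most of that cost.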
Michael Eastwood — May 19

Hi Fabian,

It looks like that SO answer is moving data into the global scope of each worker. It is probably worth experimenting with, but I'd be worried about the performance implications of non-const global variables. It's probably still a win for my use case, though. Thanks for the link.

I'm using alm2map and map2alm from LibHealpix.jl (https://github.com/mweastwood/LibHealpix.jl) to do the spherical harmonic transforms.

Michael

Michael Eastwood — May 19

Hi Matthew,

ClusterUtils.jl looks very useful. I will definitely try it out. Am I correct in reading that the trick to moving `input` to the workers is here?

You're also correct that `write_results_to_disk` does actually depend on `myidx`. I might have somewhat oversimplified the example.

Thanks,
Michael

Matthew Pearce — May 19

Michael,

That's right. With `sow`, the `mod.eval` of the third argument gets bound to the second, where `mod` defaults to `Main`. Maybe someone could think of a cleaner way to make values available for later work, but it seems to do the trick. It seems best to avoid using the `sow` function heavily.

`reap` returns the `mod.eval` of the second argument, with no assignment, on each pid.

Good luck!
Matthew

Greg Plowman — May 19

> It looks like that SO answer is moving data into the global scope of each worker. It is probably worth experimenting with but I'd be worried about performance implications of non-const global variables. It's probably the case that this is still a win for my use case though. Thanks for the link.

Why not declare the input vector as const?

I have a similar requirement for a simulation. I found it convenient to wrap everything that is required on the workers into a module:

```julia
module SphericalHarmonicTransforms

export spherical_harmonic_transforms

const input = Vector{Float64}(10^7)
...

function spherical_harmonic_transforms(idx)
    coefficients = Vector{Complex128}(10^6)
    ...
    # reference input vector here
    return coefficients
end

end
```

Then to propagate it to all workers, just write `using SphericalHarmonicTransforms`:

```julia
function sim()
    idx = 1
    limit = 10000
    nextidx() = (myidx = idx; idx += 1; myidx)
    @sync for worker in workers()
        @async while true
            myidx = nextidx()
            myidx ≤ limit || break
            coefficients = remotecall_fetch(worker, spherical_harmonic_transforms, myidx)
            write_results_to_disk(coefficients)
        end
    end
end

addprocs()
using SphericalHarmonicTransforms
sim()
```

Matthew Pearce — May 20

Greg, interesting suggestion about const; I should use that more. In that setup we'd still be transferring a lot of big coefficient vectors around (workarounds possible).
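[Editor's note: one such workaround, sketched below under the assumption that the mmapped block files live on a filesystem visible to every worker and that each `myidx` owns a distinct, non-overlapping row of a block; `block_path` and `write_row` are hypothetical names, not part of any package in this thread. Each worker writes its own rows directly, so the 16 MB coefficient vectors never cross the network and only the small `myidx` does.]

```julia
# Write one row of a matrix block at its byte offset in the shared file.
# Safe without locking only because each myidx maps to a distinct row.
function write_row(block_path, myidx, coefficients::Vector{Complex128})
    nbytes_per_row = length(coefficients) * sizeof(Complex128)
    open(block_path, "r+") do io
        seek(io, (myidx - 1) * nbytes_per_row)  # hypothetical row-major layout
        write(io, coefficients)
    end
end
```

The remote call would then run the transform and `write_row` together on the worker, returning nothing to the master.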