Groups 116 of 99+ julia-users › moving data to workers for distributed workloads 9 posts by 4 authors Michael Eastwood May 18 Hi julia-users! I need some help with distributing a workload across tens of workers across several machines. The problem I am trying to solve involves calculating the elements of a large block diagonal matrix. The total size of the blocks is 500 GB, so I cannot store the entire thing in memory. The way the calculation works, I do a bunch of spherical harmonic transforms and the results give me one row in each block of the matrix. The following code illustrates what I am doing currently. I am distributing the spherical harmonic transforms amongst all the workers and bringing the data back to the master process to write the results to disk the master process has each matrix block mmapped to disk . idx 1 limit 10000 nextidx myidx idx; idx + 1; myidx sync for worker in workers async while true myidx nextidx myidx ≤ limit || break coefficients remotecall_fetch spherical_harmonic_transforms, worker, input myidx write_results_to_disk coefficients end end Each spherical harmonic transform takes O 10 seconds so I thought the data movement cost would be negligible compared to this. However, if I have three machines each with 16 workers, machine 1 will have all 16 workers working hard the master process is on machine 1 and machines 2&3 will have most of their workers idling. My hypothesis is that the cost of moving the data to and from the workers is preventing machines 2&3 from being fully utilized. coefficients is a vector of a million Complex128s 16 MB input is composed of two parts: 1 a vector of 10 million Float64s 100 MB and 2 a small amount of additional information that is negligibly small compared to the first part. The trick is that the first part of input the 100 MB vector doesn't change between iterations. So I could alleviate most of the data movement problem by moving that part to each worker once. Problem is that I can't seem to figure out how to do that. The manual http: docs.julialang.org en release-0.4 manual parallel-computing remoterefs-and-abstractchannels is a little thin on how to use RemoteRefs. So how do you move data to workers in a way that it can be re-used on subsequent iterations? An example in the manual would be very helpful! Thanks, Michael Fabian Gans May 19 Hi Michael, I recently had a similar problem and this SO thread helped me a lot: http: stackoverflow.com questions 27677399 julia-how-to-copy-data-to-another-processor-in-julia As a side question: Which code are you using to calculate spherical harmonics transforms. I was looking for a julia package some time ago and did not find any, so if your code is publicly available, could you point me to it? Thanks Fabian Matthew Pearce May 19 Hi Michael Your current code looks like will pull back the `coefficients` across the network 500 gb transfer and as you point out transfer `input` each time. I wrote a package ClusterUtils-jl to handle my own problems MCMC sampling which were somewhat similar. Roughly - given the available info - if I was trying to do something similar I'd do: ```julia using Compat using ClusterUtils sow pids, :input, input everywhere function dostuff input, myidxs for myidx in myidxs coefficients spherical_harmonic_transforms input myidx write_results_to_disk coefficients needs myidx as arg too probably end end idxs chunkit limit, length pids sow pids, :work, : Dict zip $pids, $idxs reap pids, : dostuff input, $work myid ``` This transfers `input` once, and writes something to disk from the remote process. Matthew Pearce May 19 Also... The above doesn't respect ordering of the `myidx` variable. Not sure how your problem domain operates, so it could get more complicated if things like order of execution matter. ; Michael Eastwood May 19 Hi Fabian, It looks like that SO answer is moving data into the global scope of each worker. It is probably worth experimenting with but I'd be worried about performance implications of non-const global variables. It's probably the case that this is still a win for my use case though. Thanks for the link. I'm using alm2map and map2alm from LibHealpix-jl https: github.com mweastwood LibHealpix-jl to do the spherical harmonic transforms. Michael Michael Eastwood May 19 Hi Matthew, ClusterUtils-jl looks very useful. I will definitely try it out. Am I correct in reading that the trick to moving input to the workers is here? You're also correct that write_results_to_disk does actually depend on myidx. I might have somewhat oversimplified the example. Thanks, Michael Matthew Pearce May 19 Michael That's right. With `sow` the mod.eval of the third argument gets bound to the second; where mod defaults to Main. Maybe someone could think of a cleaner way to make values available for later work, but it seems to do the trick. It seems best to avoid using the `sow` function heavily. `reap` returns the mod.eval of the second argument with no assignment on each pid. Good luck! Matthew Greg Plowman May 19 It looks like that SO answer is moving data into the global scope of each worker. It is probably worth experimenting with but I'd be worried about performance implications of non-const global variables. It's probably the case that this is still a win for my use case though. Thanks for the link. Why not declare input vector as const? I have a similar requirement for a simulation. I found it convenient to wrap everything that is required on workers into a module. module SphericalHarmonicTransforms export spherical_harmonic_transforms const input Vector Float64 10 7 ... function spherical_harmonic_transforms idx coefficients Vector Complex128 10 6 ... reference input vector here return coefficients end end Then to propagate to all workers, just write using SphericalHarmonicTransforms function sim idx 1 limit 10000 nextidx myidx idx; idx + 1; myidx sync for worker in workers async while true myidx nextidx myidx ≤ limit || break coefficients remotecall_fetch worker, spherical_harmonic_transforms, myidx write_results_to_disk coefficients end end end addprocs using SphericalHarmonicTransforms sim Matthew Pearce May 20 Greg, interesting suggestion about const - I should use that more. In that setup we'd still be transferring alot of big coefficient vectors around workarounds possible .