scalability problem in parallel julia #10427

Status: Closed · Labels: parallel, performance · 9 comments

**@epn** commented on Mar 6, 2015

This issue describes a scalability problem in parallel Julia. The code below solves Ax = b in parallel, where A is a symmetric tridiagonal matrix, using DArrays.

The following figure shows the theoretical expected scalability of this code for a matrix of size 1 billion.

[figure: expected scalability]

The following figure shows the actual scalability of this code on julia.mit.edu for a matrix of size 1 billion.

[figure: actual scalability]

I am not sure why the actual scalability is poor beyond 8 processors. I suspect the spawn/fetch overhead is growing significantly, but I am not sure how to show that.

At a higher level, the code divides the matrix A into p submatrices, where p is the number of processors. It forward-solves the p submatrices in parallel, then solves a smaller reduced system of size p in serial, and finally back-solves the p submatrices in parallel. Only 5 scalars are communicated between each worker and the master, so there is no significant communication overhead.
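One rough way to read the expected curve (a modeling assumption added here, not stated in the issue; c_1 and c_2 are unknown constants for the per-element and reduced-solve costs): each worker touches about n/p entries in `lsolve`/`usolve`, while the serial `mylu` on the master grows with p, so

```latex
T(p) \approx c_1 \frac{n}{p} + c_2\, p + c_{\mathrm{overhead}}(p),
\qquad S(p) = \frac{T(1)}{T(p)} .
```

Losing scalability beyond 8 processors would then mean the overhead term (spawn/fetch latency, memory and cache contention) is growing much faster than the c_2·p term alone. The code under discussion follows.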
```julia
# solves Ax = b where A is a symmetric tridiagonal matrix
function mylu(a, b, c)
    n = size(a, 1)
    for i = 2:n
        l = c[i - 1] / a[i - 1]
        a[i] -= c[i - 1] * l
        b[i] -= l * b[i - 1]
    end
    b[n] /= a[n]
    for i = n - 1 : -1 : 1
        b[i] = (b[i] - c[i] * b[i + 1]) / a[i]
    end
end

# back solve
function usolve(Dc, Dd, Dl, Dx, x1, x2)
    c = localpart(Dc)
    d = localpart(Dd)
    l = localpart(Dl)
    x = localpart(Dx)
    n = size(x, 1)
    # x1, x2 are solutions at the next and previous separators
    x_ = x[n] = x1
    for i = n - 1 : -1 : 1
        x_ = x[i] = (x[i] - c[i + 1] * x_ - l[i] * x2) / d[i]   # 5 flops
    end
end

# forward solve
function lsolve(Da, Db, Dc, Dd, Dl, Dx)
    a = localpart(Da)
    b = localpart(Db)
    c = localpart(Dc)
    d = localpart(Dd)
    l = localpart(Dl)
    x = localpart(Dx)
    n = size(x, 1)
    d_ = d[1] = a[1]
    x_ = x[1] = b[1]
    f = c[1]
    l[1] = f
    r = f / d_
    s1 = f * r
    x1 = x_ * r
    for i = 2 : n - 1
        c_ = c[i]
        l_ = c_ / d_          # 1
        d_ = a[i] - c_ * l_   # 2
        x_ = b[i] - x_ * l_   # 2
        f *= -l_              # 1, excluding negation, for fill-in
        r = f / d_            # 1
        s1 += f * r           # 2
        x1 += x_ * r          # 2
        l[i] = f              # store fill-in
        d[i] = d_
        x[i] = x_
    end
    l_ = c[n] / d_
    d[n] = a[n] - c[n] * l_
    x[n] = b[n] - x_ * l_
    fp = -r * c[n]            # fill-in between previous and next separators
    s1, x1, fp, d[n], x[n]
end

function nd_p_s(Da, Db, Dc, Dd, Dl, Dx, procs)
    np = size(procs, 1)       # number of processors
    assert(np > 0)
    rf = Array(Any, np)
    for i = 1:np
        p = procs[i]
        rf[i] = @spawnat p lsolve(Da, Db, Dc, Dd, Dl, Dx)
    end
    a = zeros(np)
    c = zeros(np - 1)
    x = zeros(np)
    (s1, x1, f, a[1], x[1]) = fetch(rf[1])
    for i = 2:np
        (s1, x1, c[i - 1], a[i], x[i]) = fetch(rf[i])
        a[i - 1] -= s1
        x[i - 1] -= x1
    end
    mylu(a, x, c)
    p = procs[1]
    @sync begin
        @spawnat p usolve(Dc, Dd, Dl, Dx, x[1], 0)
        for i = 2:np
            p = procs[i]
            @spawnat p usolve(Dc, Dd, Dl, Dx, x[i], x[i - 1])
        end
    end
end

# auxiliary function to partition all vectors but c
function f1(dims, procs)
    n = dims[1]
    P = size(procs, 1)
    leaf_size::Int64 = max(floor(n / P), 2)   # size of a leaf
    cuts = ones(Int, P + 1) * leaf_size
    cuts[1] = 1
    cuts = cumsum(cuts)
    cuts[end] = n + 1
    indexes = Array(Any, P)
    for i = 1:P
        indexes[i] = (cuts[i] : cuts[i + 1] - 1,)
    end
    indexes, Any[cuts]
end

# auxiliary function to partition c
function f2(dims, procs)
    n = dims[1]
    P = size(procs, 1)
    leaf_size::Int64 = max(floor((n - 1) / P), 2)   # size of a leaf
    cuts = ones(Int, P + 1) * leaf_size
    cuts[1] = 1
    cuts = cumsum(cuts)
    cuts[end] = n + 1
    indexes = Array(Any, P)
    for i = 1:P
        indexes[i] = (cuts[i] : cuts[i + 1] - 1,)
    end
    indexes, Any[cuts]
end

function test_nd(n, procs::Array{Int64, 1})
    P = size(procs, 1)   # number of processors
    Da = DArray(I -> 100 * rand(map(length, I)), (n,), procs, P, f1)
    Db = DArray(I -> rand(map(length, I)), (n,), procs, P, f1)
    Dc = DArray(I -> rand(map(length, I)), (n + 1,), procs, P, f2)
    Dx = DArray(I -> zeros(map(length, I)), (n,), procs, P, f1)
    Dd = DArray(I -> zeros(map(length, I)), (n,), procs, P, f1)
    Dl = DArray(I -> zeros(map(length, I)), (n,), procs, P, f1)
    elapsed_time = @elapsed begin
        nd_p_s(Da, Db, Dc, Dd, Dl, Dx, procs)
    end
    println("n ", n, " # processors ", P, " elapsed time ", elapsed_time)
end
```

To be able to run this code, you need to add the following function to `base/darray.jl`:

```julia
function DArray(init, dims, procs, dist, distfunc::Function)
    np = prod(dist)
    procs = procs[1:np]
    idxs, cuts = distfunc([dims...], procs)
    chunks = Array(RemoteRef, dist...)
    for i = 1:np
        chunks[i] = remotecall(procs[i], init, idxs[i])
    end
    p = max(1, localpartindex(procs))
    A = remotecall_fetch(procs[p], r -> typeof(fetch(r)), chunks[p])
    DArray{eltype(A),length(dims),A}(dims, chunks, procs, idxs, cuts)
end
```

The code can be tested as follows:

```julia
addprocs(p)                               # where p is the number of processors
require("julia file with the above code")
test_nd(n, workers())                     # where n is the size of the matrix to solve
```

I would appreciate it if you could throw some light on this problem. Thanks.

*edit: jwn added code fences for improved readability*

**@kmsquire** (The Julia Language member) commented on Mar 6, 2015

Was anything else running on julia.mit.edu when you ran this?

**@epn** commented on Mar 7, 2015

No.

**@kmsquire** (The Julia Language member) commented on Mar 7, 2015

Okay, more obvious questions:

1. What version of Julia was this (what's the output of `versioninfo()`)?
2. `DArray` was recently moved to a DistributedArrays package (where it will still be developed). Have you tried using `SharedArray`s instead? One thing you gain with `SharedArray`s is that you don't have to actually copy the data to each process, which might explain some of the slowdown.

BTW, the `DArray` constructor code you added to `base/darray.jl` could have just been part of your original code, as long as you first called `import Base.DArray`.

**@epn** commented on Mar 11, 2015

Thanks for the inputs.

1. I don't have the correct version info for the above experiment, since I updated Julia after I ran it. However, I think the results are similar even on the latest version.
2. We can't use `SharedArray`s since this code is supposed to run in distributed memory. On a related note, do you know how to run parallel code in a distributed-memory setting, for example on a cluster?

I tried adding the `DArray` constructor code to my original code with `import Base.DArray`, but it doesn't work: I get an error that `localpartindex` is undefined. The `DArray` constructor calls `localpartindex`.

**@kmsquire** (The Julia Language member) commented on Mar 11, 2015

For `localpartindex`, you can change the call to `Base.localpartindex` (it's not exported). The same should be true for any other functions not exported from Base but used in your constructor.

Regarding distributing to a cluster, your best bet is to read the documentation (http://julia.readthedocs.org/en/latest/manual/parallel-computing/). It's not something I'm very familiar with. I also have the impression that the original API is going to change (especially since DArrays were moved out of Base), and that it hasn't been developed as much as the rest of Julia.

@tanmaykm might have some additional thoughts on the slowdown.
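A minimal sketch of that suggestion, applied to the constructor posted above: the body is unchanged, only the unexported helper is qualified with `Base.`, so it can live entirely in user code instead of a patch to `base/darray.jl` (nothing here is new API).

```julia
import Base.DArray   # extend the constructor rather than patching base/darray.jl

function DArray(init, dims, procs, dist, distfunc::Function)
    np = prod(dist)
    procs = procs[1:np]
    idxs, cuts = distfunc([dims...], procs)
    chunks = Array(RemoteRef, dist...)
    for i = 1:np
        chunks[i] = remotecall(procs[i], init, idxs[i])
    end
    p = max(1, Base.localpartindex(procs))   # qualified: localpartindex is not exported
    A = remotecall_fetch(procs[p], r -> typeof(fetch(r)), chunks[p])
    DArray{eltype(A),length(dims),A}(dims, chunks, procs, idxs, cuts)
end
```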
**@kmsquire** (The Julia Language member) commented on Mar 11, 2015

Also cc: @amitmurthy

**@amitmurthy** (The Julia Language member) commented on Mar 26, 2015

This looks to me like a cache contention issue. I tested two setups: (1) a regular `addprocs`, and (2) `addprocs` with `taskset` to distribute the workers better across cores. My understanding is that julia.mit.edu is an 8-processor machine with 10 cores per processor, for a total of 80 cores, with hyperthreading disabled.

Code for the regular run is at https://gist.github.com/amitmurthy/b6d1e8e43742f58042ad
Code with processor affinity set is at https://gist.github.com/amitmurthy/05ca93b3eb1e20bfb1af

Regular run results:

```
n np | mode elapsed %comp [lsolve_delays usolve_delays]
1000000000 32 | SEND 2.90s 64.87% [0.0151 0.1849 0.0874],[0.0077 0.1081 0.0542]
1000000000 31 | SEND 3.03s 61.84% [0.0134 0.1646 0.0865],[0.0092 0.1191 0.0612]
1000000000 30 | SEND 3.45s 61.71% [0.0120 0.1552 0.0758],[0.0076 0.0949 0.0499]
1000000000 29 | SEND 3.48s 61.70% [0.0117 0.1330 0.0731],[0.0090 0.0890 0.0486]
1000000000 28 | SEND 3.36s 64.60% [0.0117 0.1474 0.0758],[0.0094 0.0799 0.0434]
1000000000 27 | SEND 3.10s 64.99% [0.0139 0.1198 0.0652],[0.0075 0.0811 0.0446]
1000000000 26 | SEND 3.65s 64.29% [0.0135 0.1081 0.0595],[0.0067 0.0735 0.0410]
1000000000 25 | SEND 3.52s 58.32% [0.0139 0.1063 0.0581],[0.0090 0.0773 0.0447]
1000000000 24 | SEND 3.59s 66.27% [0.0116 0.0938 0.0526],[0.0111 0.0672 0.0387]
1000000000 23 | SEND 3.57s 66.41% [0.0095 0.1102 0.0582],[0.0053 0.0633 0.0367]
1000000000 22 | SEND 3.89s 64.45% [0.0159 0.1013 0.0537],[0.0133 0.0668 0.0419]
1000000000 21 | SEND 4.29s 67.07% [0.0097 0.0737 0.0426],[0.0069 0.0544 0.0316]
1000000000 20 | SEND 4.16s 70.46% [0.0107 0.0645 0.0375],[0.0070 0.0517 0.0298]
1000000000 19 | SEND 4.61s 72.60% [0.0083 0.0732 0.0389],[0.0059 0.0463 0.0271]
1000000000 18 | SEND 4.29s 73.76% [0.0076 0.0704 0.0360],[0.0065 0.0449 0.0248]
1000000000 17 | SEND 3.87s 78.88% [0.0101 0.0603 0.0356],[0.0123 0.0562 0.0370]
1000000000 16 | SEND 5.12s 76.88% [0.0118 0.0546 0.0331],[0.0057 0.0346 0.0200]
1000000000 15 | SEND 5.45s 73.18% [0.0087 0.0429 0.0276],[0.0065 0.0342 0.0219]
1000000000 14 | SEND 5.96s 73.23% [0.0077 0.0391 0.0253],[0.0055 0.0297 0.0175]
1000000000 13 | SEND 5.93s 74.55% [0.0078 0.0424 0.0235],[0.0061 0.0253 0.0166]
1000000000 12 | SEND 6.44s 76.73% [0.0060 0.0291 0.0181],[0.0042 0.0214 0.0141]
1000000000 11 | SEND 6.76s 81.71% [0.0086 0.0282 0.0187],[0.0050 0.0183 0.0123]
1000000000 10 | SEND 7.03s 82.68% [0.0043 0.0245 0.0149],[0.0043 0.0213 0.0152]
1000000000  9 | SEND 8.24s 74.90% [0.0065 0.0245 0.0177],[0.0039 0.0174 0.0117]
1000000000  8 | SEND 9.40s 98.96% [0.0062 0.0204 0.0137],[0.0033 0.0120 0.0078]
1000000000  7 | SEND 10.56s 89.49% [0.0046 0.0155 0.0100],[0.0045 0.0124 0.0086]
1000000000  6 | SEND 11.83s 87.03% [0.0044 0.0113 0.0079],[0.0036 0.0119 0.0085]
1000000000  5 | SEND 14.22s 77.26% [0.0051 0.0132 0.0098],[0.0051 0.0161 0.0112]
1000000000  4 | SEND 12.03s 93.52% [0.0053 0.0074 0.0061],[0.0039 0.0108 0.0065]
1000000000  3 | SEND 18.09s 83.10% [0.0034 0.0059 0.0049],[0.0030 0.0054 0.0042]
1000000000  2 | SEND 21.43s 92.68% [0.0068 0.0079 0.0073],[0.0025 0.0035 0.0030]
1000000000  1 | SEND 36.13s 99.98% [0.0021 0.0021 0.0021],[0.0019 0.0019 0.0019]
```
Processor affinity run results:

```
n np | mode elapsed %comp [lsolve_delays usolve_delays]
1000000000 32 | SEND 1.87s 79.12% [0.0090 0.2107 0.1099],[0.0089 0.0986 0.0537]
1000000000 31 | SEND 1.79s 83.55% [0.0089 0.1680 0.0888],[0.0119 0.1061 0.0537]
1000000000 30 | SEND 1.99s 78.17% [0.0101 0.1445 0.0765],[0.0108 0.1246 0.0738]
1000000000 29 | SEND 1.85s 83.87% [0.0081 0.1460 0.0750],[0.0087 0.0877 0.0484]
1000000000 28 | SEND 1.89s 83.19% [0.0076 0.1315 0.0806],[0.0086 0.0897 0.0492]
1000000000 27 | SEND 2.00s 81.88% [0.0096 0.1255 0.0644],[0.0102 0.1111 0.0622]
1000000000 26 | SEND 2.02s 81.33% [0.0073 0.1204 0.0634],[0.0080 0.1059 0.0530]
1000000000 25 | SEND 2.01s 83.06% [0.0070 0.1197 0.0588],[0.0086 0.0853 0.0449]
1000000000 24 | SEND 2.06s 84.76% [0.0084 0.0998 0.0500],[0.0076 0.0650 0.0330]
1000000000 23 | SEND 1.98s 90.61% [0.0063 0.0957 0.0474],[0.0082 0.0840 0.0451]
1000000000 22 | SEND 2.25s 84.54% [0.0085 0.1534 0.0517],[0.0075 0.0576 0.0318]
1000000000 21 | SEND 2.08s 92.52% [0.0059 0.0724 0.0424],[0.0076 0.0558 0.0342]
1000000000 20 | SEND 2.17s 92.20% [0.0058 0.0773 0.0418],[0.0106 0.0532 0.0333]
1000000000 19 | SEND 2.31s 90.76% [0.0082 0.0658 0.0364],[0.0064 0.0979 0.0354]
1000000000 18 | SEND 2.44s 90.68% [0.0077 0.0599 0.0367],[0.0062 0.0921 0.0604]
1000000000 17 | SEND 2.48s 93.40% [0.0059 0.0549 0.0333],[0.0065 0.0389 0.0230]
1000000000 16 | SEND 2.64s 93.53% [0.0051 0.0465 0.0255],[0.0059 0.0379 0.0236]
1000000000 15 | SEND 2.71s 96.15% [0.0045 0.0592 0.0503],[0.0055 0.0340 0.0204]
1000000000 14 | SEND 2.98s 94.33% [0.0060 0.1004 0.0313],[0.0042 0.0303 0.0185]
1000000000 13 | SEND 3.08s 97.46% [0.0064 0.0376 0.0263],[0.0052 0.0281 0.0172]
1000000000 12 | SEND 3.37s 96.47% [0.0054 0.0291 0.0182],[0.0039 0.0743 0.0368]
1000000000 11 | SEND 3.66s 96.48% [0.0035 0.0889 0.0311],[0.0056 0.0209 0.0136]
1000000000 10 | SEND 4.02s 96.39% [0.0050 0.0985 0.0332],[0.0045 0.0214 0.0131]
1000000000  9 | SEND 4.40s 98.03% [0.0043 0.0198 0.0127],[0.0035 0.0169 0.0109]
1000000000  8 | SEND 4.91s 98.16% [0.0041 0.0233 0.0118],[0.0041 0.0133 0.0089]
1000000000  7 | SEND 5.60s 98.62% [0.0029 0.0558 0.0355],[0.0035 0.0139 0.0085]
1000000000  6 | SEND 6.57s 96.21% [0.0037 0.0286 0.0119],[0.0038 0.0667 0.0544]
1000000000  5 | SEND 7.75s 99.45% [0.0030 0.0310 0.0131],[0.0024 0.0088 0.0057]
1000000000  4 | SEND 9.72s 97.09% [0.0046 0.0828 0.0381],[0.0026 0.0055 0.0041]
1000000000  3 | SEND 12.89s 99.90% [0.0023 0.0059 0.0042],[0.0021 0.0048 0.0034]
1000000000  2 | SEND 19.37s 99.89% [0.0022 0.0040 0.0031],[0.0019 0.0029 0.0024]
1000000000  1 | SEND 36.09s 99.99% [0.0018 0.0018 0.0018],[0.0016 0.0016 0.0016]
```

Columns to observe:

- the 2nd column (`np`) is the number of workers,
- the 4th column (`elapsed`) is the elapsed time,
- the 5th column (`%comp`) is (time spent in `usolve` and `lsolve` across all workers) / (elapsed time × number of workers).

With processor affinity, the scaling is near linear from 1 to 8 workers, and beyond that it is still much better than the regular run.

Setting processor affinity is not straightforward with `addprocs` currently. I used something like

```julia
function launch_n_procs(n)
    r = n == 1 ? n : linrange(0, Sys.CPU_CORES - 1, n)
    for i in r
        p = Int(round(i))
        addprocs(1; exename=`taskset -c $p $(joinpath(JULIA_HOME, Base.julia_exename()))`)
    end
end
```

i.e., calling `addprocs` serially, which is a very slow way of adding 32 workers. We should make this a feature of `addprocs` itself.

Also, performance issues due to cache contention have cropped up before on the mailing lists. Is there a way of detecting excessive cache misses and providing that information as part of the Profile package?

cc @andreasnoack, @jakebolewski, @ViralBShah
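Profile has no such feature today; the following is only a sketch of how one could look for cache contention by hand, assuming a Linux host with `perf` installed. The idea is to collect each worker's OS pid from Julia and attach `perf stat` to it from the shell while the solve is running.

```julia
# Sketch (not an existing Profile/Base API): collect worker pids so hardware
# counters can be attached externally while nd_p_s() runs, e.g.
#   perf stat -e cache-misses,cache-references -p <pid> -- sleep 10
# Uses the remotecall_fetch(worker_id, f) form used elsewhere in this thread.
worker_pids = [remotecall_fetch(p, getpid) for p in workers()]
println(worker_pids)
```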
*This was referenced on Mar 26, 2015:*

- option to set processor affinity in addprocs (#10645, closed)
- support basic core affinities in LocalManager and SSHManager (#10733, closed)

**@amitmurthy** (The Julia Language member) commented on Apr 30, 2015

@epn I have added a provision to start local workers with CPU affinity in JuliaParallel/ClusterManagers.jl@9afe252. Could you run your tests with this and close this issue? When I tested with 32 workers distributed across all processor sockets, I found near-linear scaling.

*@simonster referenced this issue on May 19, 2015: Problems with scaling pmap beyond 6-7 cores (#11354, closed)*

*@simonster added the parallel and performance labels on May 19, 2015*

**@ViralBShah** (The Julia Language member) commented on Aug 7, 2015

This is definitely a processor affinity issue. I believe the machine (julia.mit.edu) consists of two multi-socket boards. If worker placement requires communication across boards (using the slower PCI transport), scalability will suffer. This has been seen on many problem sets. I think it is worth reopening this if the same problem is run on a different machine with careful placement. One could argue that Julia should figure out the placement automatically.

*@ViralBShah closed this on Aug 7, 2015*
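For anyone reproducing this on a similar multi-socket machine, here is a hedged variant of the launch helper above that pins each worker's CPU and memory to one socket with `numactl`, so no solve traffic has to cross the board interconnect. The name `addprocs_numa` and its node-count argument are made up for illustration (this is not an existing API), and it assumes a Linux host with `numactl` installed.

```julia
# Sketch: spread n workers across nnodes NUMA nodes/sockets, binding both the
# CPUs and the memory of each worker to its node.
function addprocs_numa(n, nnodes)
    julia = joinpath(JULIA_HOME, Base.julia_exename())
    for i in 0:n-1
        node = i % nnodes                       # round-robin over sockets
        addprocs(1; exename=`numactl --cpunodebind=$node --membind=$node $julia`)
    end
end

# e.g. addprocs_numa(32, 8) on an 8-socket machine
```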