RFC: Simplifying and generalising pmap #14843

Closed. samoconnor opened this issue on Jan 28 · 33 comments
Projects: none · Labels: parallel · Milestone: none · Assignees: none
8 participants: @samoconnor @bjarthur @amitmurthy @blakejohnson @tkelman @StefanKarpinski @tanmaykm @kshyatt

@samoconnor commented on Jan 28

[ Update: #15409 implements most of this and is now merged. ]

This issue covers some of the same ground as #12943 (stale), #14515 and #14736. My intention is to present an overall pmap refactoring plan here for comment, and to submit new PRs if it is seen as useful.

pmap Features

The current pmap implementation has the following features:

1. Using @sync/@async to run the mapped function in parallel and collect the results.
2. Automatic allocation of idle workers to remotecall_wait calls.
3. Not reusing workers believed to be faulty.
4. Option to retry on error.
5. Option to continue mapping despite errors (returning exceptions in the result array).

These are all useful features. However, it seems to me that pmap currently tries to do too much, and that many of these features would be useful in contexts other than pmap:

- "1." is useful when using RPC mechanisms other than remotecall (e.g. HTTP, readstring(::Cmd) or AWS Lambda).
- "2." and "3." are probably useful to other users of the remotecall mechanism.
- If "4." and "5." are useful with pmap, they should also be useful with ordinary map.

Proposed Separation of Features

asyncmap (renamed from amap) #15058

asyncmap adds @sync/@async to regular map (feature "1.").

```julia
# Run `::Cmd`s in parallel...
julia> @time map(t -> run(`sleep $t`), 1:3)
6 seconds

julia> @time asyncmap(t -> run(`sleep $t`), 1:3)
3 seconds

# Run downloads in parallel...
julia> url_data = asyncmap(download, urls)
```

WorkerPool and remote #15073

A WorkerPool keeps track of which workers are busy.
take!(default_worker_pool()) yields a worker that is not already busy doing remotecall_wait, waiting if needed (feature "2."). If there is a reliable way to identify "faulty" workers (feature "3."), then it will not return faulty workers. (Partial implementation in comments of: #14736)

remote takes a function and returns a lambda that executes the function on a remote worker.

Using asyncmap and remote together...

```julia
pmap(f, c...) = asyncmap(remote(f), c...)

# Proposed new method: run f on a free worker taken from the default pool.
function remotecall_fetch(f, args...)
    remotecall_fetch(f, default_worker_pool(), args...)
end

remote(f::Function) = (args...) -> remotecall_fetch(f, args...)
```

@catch #15409

@catch takes a function and returns a lambda that catches any exceptions thrown by the function and returns the exception object. pmap(f, v; err_stop=false) can be replaced with pmap(@catch(f), v) (feature "5.").

```julia
macro catch(f)
    # esc(f) makes f refer to the caller's function, not a name in the macro's module.
    :((args...) -> try
          $(esc(f))(args...)
      catch ex
          ex
      end)
end
```

retry #15409

retry takes a function and returns a lambda that retries the function if an error occurs (feature "4."). pmap(f, v; err_retry=true) can be replaced with pmap(retry(f), v), or, for more granular error handling, pmap(retry(f, e -> isa(e, NetworkTimeoutError)), v).

```julia
function retry(f::Function, condition::Function=(e)->true, n::Integer=3)
    (args...) -> @repeat n try
        f(args...)
    catch e
        @retry if condition(e) end
    end
end
```

(@repeat and @retry are implemented in https://github.com/samoconnor/Retry.jl)

pmap Defaults

pmap currently has some unexpected (at least to me) defaults:

- The default behaviour is to re-execute the mapped function on error, irrespective of the type of error (i.e. the function is assumed to be idempotent).
- When the function throws an error, the worker is removed from the pool irrespective of the type of error (i.e. it is assumed that all errors are due to a faulty worker).
- By default, errors thrown by the mapped function are returned as part of the result array rather than being re-thrown. This is inconsistent with regular map.
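The wrapper style can be tried without any workers. The sketch below assumes Julia 1.x: `catch_errors` is a hypothetical function standing in for the proposed @catch macro, and Base's `retry` (whose `delays`/`check` keywords differ from the retry(f, condition, n) signature proposed above) replaces the Retry.jl version. Plain `map` stands in for pmap, so everything runs in a single process.

```julia
# Hypothetical stand-in for the proposed @catch: wrap f so that any exception
# is returned as a value instead of being thrown.
function catch_errors(f)
    return (args...) -> begin
        try
            f(args...)
        catch ex
            ex  # hand back the exception object
        end
    end
end

# pmap(f, v; err_stop=false) becomes map(catch_errors(f), v):
checked_sqrt(x) = x < 0 ? throw(DomainError(x, "negative input")) : sqrt(x)
results = map(catch_errors(checked_sqrt), [4.0, -1.0, 9.0])
# results[2] holds the DomainError; the other entries are ordinary values.

# pmap(f, v; err_retry=true) becomes map(retry(f), v). Base.retry takes an
# iterable of delays (one entry per retry) and an optional check(state, e).
attempts = Ref(0)
function flaky(x)
    attempts[] += 1
    attempts[] < 3 && error("transient failure")  # fail the first two calls
    return 2x
end
retrying = retry(flaky; delays = fill(0.0, 2),
                 check = (state, e) -> e isa ErrorException)
value = retrying(21)  # two failures are retried; the third call succeeds
```

Keeping these as wrappers means the same error policy can be reused unchanged with map, asyncmap or pmap.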
This was referenced on Jan 28:
- Closed: `pmap()` cleanup. Fixes #12908. #12943
- Closed: map() using @async #14515
- Closed: idleworker() function. #14736

@kshyatt added the parallel label on Jan 29

@bjarthur commented on Feb 8

how easy would it be to have pmap dynamically adjust to a changing number of workers in the pool? my dream is to have a non-blocking addprocs for use on my private shared SGE cluster. when the load is high, qsub'ing a large number of procs can take a while to completely finish. it would be huge if ClusterManagers.launch was asynchronous, and could add to the pool as soon as each proc started, instead of waiting until they all started. pmap could then get started right away, at first using just a few workers, and adding more as they became available.

@amitmurthy commented on Feb 8

@async addprocs(....) should work. Changing pmap to use a shared queue, as discussed in #14736 (comment), should allow us to start using workers as soon as they have started.

@amitmurthy commented on Feb 9

To expand on the above:

- The SGE cluster manager creates a WorkerPool consisting of the workers it has launched.
- The SGE manager's `manage` method, called with op `:register`, will add newly launched workers to the worker pool as and when they come online; `:deregister` will remove them.
- pmap has an option to use a WorkerPool for distributing work.

User code would be something like:

```julia
cman = SGEManager(np, queue)
@async addprocs(cman)
# worker_pool is a method implemented by the SGE cluster manager; it returns its pool of workers.
pmap(f, jobs; pids=worker_pool(cman))
```

@bjarthur commented on Feb 9

thanks amit. question: in your proposed interface, can a worker which comes online during the execution of a particular pmap be utilized by that same call to pmap? that is what i am hoping for.

@amitmurthy commented on Feb 9

Yes.
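A rough local simulation of the shared-queue idea, in which a worker that comes online mid-map is used by that same map. It assumes Julia 1.x; a `Channel{Int}` of worker ids is a hypothetical stand-in for a WorkerPool, and `sleep` stands in for both the remote call and the slow SGE startup. Nothing here uses ClusterManagers or Distributed.

```julia
pool = Channel{Int}(16)   # hypothetical stand-in for a WorkerPool
put!(pool, 1)             # start with a single "worker"

# Borrow a worker id from the pool, run f, then hand the id back.
function run_on_pool(f, pool::Channel{Int}, x)
    w = take!(pool)       # blocks until some worker is free
    try
        sleep(0.01)       # stand-in for remote-call latency
        return (f(x), w)
    finally
        put!(pool, w)     # return the worker for the next job
    end
end

# Simulate workers coming online one at a time, like a non-blocking addprocs.
@async for w in 2:4
    sleep(0.02)
    put!(pool, w)
end

square(x) = x^2
out = asyncmap(x -> run_on_pool(square, pool, x), 1:20)
squares = first.(out)      # results, in input order
used = unique(last.(out))  # worker ids that actually served jobs
```

Because every job blocks on `take!` rather than being assigned to a fixed worker up front, ids put into the channel after the map has started are picked up by later jobs; exactly which ids get used depends on timing.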
@amitmurthy commented on Feb 10

@samoconnor, the proposed changes look good. Look forward to the PRs.

@samoconnor commented on Feb 10

@amitmurthy, in the interests of avoiding one big PR, I plan to start by submitting a PR that just adds amap (and its underlying iterator version, imap) without changing any existing code, as follows. Do you agree with this approach?

amap(f, c...; async_max=100) → collection

Transform collection c by applying f to each element using at most async_max (default 100) asynchronous tasks. For multiple collection arguments, apply f elementwise. Note: amap(f, c...; async_max=1) is equivalent to map(f, c...).

Implementation:

```julia
amap(f, c...; kv...) = collect(imap(f, c...; kv...))
```

imap(f, c...; async_max=100) → iterator

Apply f to each element of c using at most async_max (default 100) asynchronous tasks. For multiple collection arguments, apply f elementwise. Note: collect(imap(f, c...; async_max=1)) is equivalent to map(f, c...).

Implementation using StreamMapItr:

```julia
imap(f, c...; async_max=nothing) = StreamMapItr(f, c...; async_max=async_max)
```

amap!(function, collection; async_max=100)

In-place version of amap().

amap!(function, destination, collection...; async_max=100)

Like amap(), but stores the result in destination rather than a new collection. destination must be at least as large as the first collection.

Implementation using AsyncMapItr:

```julia
function amap!(f, c...; async_max=nothing)
    destination = c[1]
    if length(c) > 1
        c = c[2:end]
    end
    # Iterating AsyncMapItr runs f over the inputs; the results land in destination.
    for task in AsyncMapItr(f, destination, c..., async_max=async_max)
    end
    return destination
end
```

@bjarthur commented on Feb 11

one benefit of a single large PR is that we could see all the proposed changes to the API at once. for example, i'm curious whether you plan to include an iterator-returning version of pmap. i proposed this myself once long ago: #4601

@blakejohnson commented on Feb 11

:+1: I definitely like the separation into composable chunks.
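A minimal sketch of the amap(f, c; async_max) semantics for a single collection, assuming Julia 1.x. `amap_sketch` is a hypothetical name; it is not the StreamMapItr-based implementation proposed above. Each element runs in its own task, and a `Channel` used as a counting semaphore keeps at most `async_max` of them in flight.

```julia
function amap_sketch(f, c; async_max::Integer = 100)
    async_max >= 1 || throw(ArgumentError("async_max must be at least 1"))
    items = collect(c)
    results = Vector{Any}(undef, length(items))
    slots = Channel{Nothing}(async_max)  # counting semaphore with async_max slots
    @sync for (i, x) in enumerate(items)
        put!(slots, nothing)             # wait here while async_max tasks are running
        @async try
            results[i] = f(x)            # store by index to keep input order
        finally
            take!(slots)                 # free the slot for the next element
        end
    end
    return results
end

# Usage: track how many calls are in flight at once.
active = Ref(0)
peak = Ref(0)
function tracked(x)
    active[] += 1
    peak[] = max(peak[], active[])
    sleep(0.01)                          # yield, so other tasks can start
    active[] -= 1
    return x + 1
end

r = amap_sketch(tracked, 1:10; async_max = 3)
```

With `async_max = 1` the calls run one at a time, matching the note that amap(f, c...; async_max=1) behaves like map. In current Julia, `asyncmap(f, c; ntasks=n)` provides this bound directly.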
@tkelman commented on Feb 11

Please don't open a PR that tries to change too many things at once. Incremental changes are much smoother to get reviewed and merged. The end vision can be seen "at once" in a package or separate branch, but PRs should be broken into smaller, gradual chunks whenever possible.

@samoconnor commented on Feb 12

@bjarthur: If pmap(f, c...) = amap(remote(f), c...), then an iterator version of pmap would simply be ipmap(f, c...) = imap(remote(f), c...). I think this is most definitely a useful thing. I don't know if ipmap is the right name for it... (it sounds like it has something to do with IP addresses).

@amitmurthy commented on Feb 12

Some thoughts. Apologies for the delayed response.

My understanding is that the suggested amap parallelizes over tasks and pmap over workers. Why don't we just have a single pmap that uses tasks, workers or a worker pool depending on a keyword argument? pmap as a parallel map is well understood. Async execution, on the other hand, is typically understood to be non-blocking, i.e., the call returns immediately with the expression/function being executed in a different task.

Putting up some thoughts for discussion:

pmap(f, c...; ntasks=N, pids=<pid_array | worker_pool>)

where
- ntasks is the number of concurrent tasks per pid, defaulting to 100
- pids identifies either specific workers to distribute these tasks over, or a worker pool

This will also help optimize pmap in cases where the data to be mapped is much larger than the number of workers: having pmap process chunks of the list at a time will reduce network roundtrips. @tanmaykm and @ViralBShah saw this issue for some of their workloads.

This version of pmap should also use threads when they become fully available. The list to be mapped should be distributed across remote workers, followed by threads and finally tasks.
samoconnor added a commit to samoconnor/julia that referenced this issue on Feb 12: "New functions imap and amap." (4b38b92)
samoconnor referenced this issue on Feb 12: Merged StreamMapIterator (was asyncmap) #15058

@samoconnor commented on Feb 13

@amitmurthy, I like the idea of having pmap parallelise over tasks and workers. I think the first step is still to add the underlying imap and amap functions that the eventual pmap would rely on. See PR #15058.

(I guess if pmap eventually provides an interface to amap that is just as convenient, then amap could be relegated to being an un-exported implementation function.)

@samoconnor commented on Feb 13

> ... having pmap process chunks of the list at a time will reduce network roundtrips. @tanmaykm, @ViralBShah saw this issue for some of their workloads.

@tanmaykm, @ViralBShah, can you post test cases that cover the issues that @amitmurthy describes above?

@bjarthur commented on Feb 13

in regard to the request for clearer names in #15058 (comment), it's not obvious to me, from the name alone, that the proposed imap is asynchronous. how about the following refactoring of the interface: get rid of amap and amap!, and rename the proposed imap to amap. to get a collection, the user would have to manually say collect(amap(...)), just like they have to do for a dict's keys() and values(). then later, refactor pmap to also return an iterator, which gets asynchronously (and lazily?) filled, just like amap. so only map returns a collection, which minimizes breakage.
so in the end, we have this:

```
map    -> collection
amap   -> iterator
pmap   -> iterator
remote -> function
@catch -> function
retry  -> function
```

@samoconnor commented on Feb 14

@bjarthur: I've taken your suggestion, together with @StefanKarpinski's request for clearer names, and revised #15058 (comment) to have just one function:

```julia
asyncmap(f, c...; ntasks=0) = StreamMapIterator(f, c...; ntasks=ntasks)
```

@samoconnor commented on Feb 14

Reading the code for remotecall -> future and remotecall_fetch -> value leaves me asking: are these names the wrong way around? Should it be remotecall_future -> future and remotecall -> value? remotecall(f, id, args...) -> value would be more consistent with call(f, args...) -> value.

The usual pattern in Julia seems to be that non-blocking implementations are hidden and public interfaces are blocking, e.g. #14546 (comment) [in the context of ::IO]:

> Everything in Julia always blocks the task it's called from until it's done. Under the hood it's all non-blocking, but that's exposed to the programmer via task-level concurrency.

Perhaps the preferred approach should be to use @async with remotecall -> value instead of remotecall -> future.

In Base there are only 4 calls to remotecall vs 25-ish to remotecall_fetch. A GitHub-wide search finds 359 matches for remotecall_fetch, 115 matches for remotecall_wait and 458 matches for remotecall (the 458 includes remotecall_* because of the way GitHub text search works).

samoconnor referenced this issue on Feb 14: Merged WorkerPool and remote() #15073

@samoconnor commented on Feb 14

@amitmurthy: see PR #15073, WorkerPool and remote().

@amitmurthy commented on Feb 14

I would rather not export asyncmap in #15058 if we are going to support task-asynchronous execution via pmap itself.

I agree with your reasoning w.r.t. remotecall* names. It may be a bit late to change them... What do others think?
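The blocking-call argument can be illustrated locally: given a call that blocks and returns a value, `@async` turns it into a `Task`, which already behaves like a future. `slow_square` is a hypothetical stand-in for remotecall_fetch (Julia 1.x assumed), with `sleep` simulating network latency.

```julia
# Hypothetical blocking call that returns a value, like remotecall_fetch.
function slow_square(x)
    sleep(0.05)  # simulated network latency; yields to other tasks
    return x^2
end

# Blocking style: the caller simply gets the value.
v = slow_square(3)

# Future style, built from the blocking call: @async returns a Task
# immediately, and fetch(task) waits for it and returns its value.
tasks = [@async slow_square(x) for x in 1:3]
vals = fetch.(tasks)
```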
samoconnor added a commit to samoconnor/julia that referenced this issue on Feb 14: "dont export asyncmap per JuliaLang#14843 (comment)" (e263ad6)

@samoconnor commented on Feb 14

> I would rather not export asyncmap in #15058

OK, done: samoconnor@e263ad6

@samoconnor commented on Feb 14

> I agree with your reasoning w.r.t. remotecall* names. It may be a bit late to change them...

Maybe if remote(f)(args...) per #15073 works well, then remotecall* could eventually be deprecated.

samoconnor added a commit to samoconnor/AsyncMap.jl that referenced this issue on Feb 14: "reference JuliaLang/julia#14843" (c78370a)

@StefanKarpinski commented on Feb 14

Could pmap support both a list of workers (or a count) and a number of tasks per worker? Then we could have a single function for this.

@samoconnor commented on Feb 14

> @StefanKarpinski: Could pmap support both a list of workers (or a count) and a number of tasks per worker?

Yes, Amit suggested that in a previous comment.

Given Amit's goal of "having pmap process chunks of the list at a time [to] reduce network roundtrips", I'm thinking of something simple like this to start with:

```julia
asyncpmap(f, v) = flatten(pmap(chunk -> asyncmap(f, chunk), eachchunk(v)))
```

This assumes a reasonably balanced workload, which seems fine for now.

Another possible scheme would be to treat all "task slots" across all workers as a single pool. This would imply network communication for each task, but would allow for unbalanced IO-bound workloads (e.g. downloading and indexing a large number of arbitrary URLs).
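A sketch of the chunked asyncpmap idea, assuming Julia 1.x. `eachchunk` is not a real function; `Iterators.partition` plays its role here. So that it runs without worker processes, the outer map is a keyword argument defaulting to plain `map` (`outer_map` and `chunk_size` are hypothetical parameters).

```julia
# Split v into chunks, map each chunk as one unit of work (one round trip
# per chunk when the outer map is a real pmap), run f concurrently within
# each chunk, then flatten the per-chunk results back into one vector.
function asyncpmap_sketch(f, v; chunk_size::Integer = 4, outer_map = map)
    chunks = collect(Iterators.partition(v, chunk_size))
    per_chunk = outer_map(chunk -> asyncmap(f, chunk), chunks)
    return collect(Iterators.flatten(per_chunk))
end

out = asyncpmap_sketch(x -> 10x, 1:10; chunk_size = 3)
```

In a session with `using Distributed` and workers added, passing `outer_map = pmap` should send one chunk per remote call, provided `f` is defined on the workers (for example via `@everywhere`); that variant is not exercised here.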
@tanmaykm commented on Feb 17

@samoconnor The observation that @amitmurthy was referring to in #14843 (comment) was abhijithch/RecSys.jl#22 (comment). A simple test would be:

```julia
# $ julia -p 8
julia> f = (x)->nothing;

julia> @time @parallel for i in 1:10^4 f(i) end;
  0.117559 seconds (180.03 k allocations: 7.235 MB)

julia> @time pmap(f, 1:10^4);
  1.196159 seconds (2.87 M allocations: 77.740 MB, 1.11% gc time)
```

samoconnor added a commit to samoconnor/julia that referenced this issue on Mar 7: "New type MapIterator, AsyncMapIterator and StreamMapIterator" (cd6b48b)

@samoconnor commented on Mar 7

@amitmurthy, @StefanKarpinski: I intend to open a third PR (in addition to #15058 and #15073) to implement @catch and retry as described above. The retry function above relies on the @repeat n try... macro from here: https://github.com/samoconnor/Retry.jl/blob/master/src/repeat_try.jl.

Question: should the PR...
- include repeat_try.jl as-is (with comment and code formatting cleaned up a little to match Base conventions)? or,
- implement retry without using @repeat, i.e. by manually copy/pasting the logic from @repeat?

(Note: at this point I think Retry.jl is reasonably well field-tested. It is used in many places in the AWS* packages, which I've been using quite heavily in code for a client.)

samoconnor added a commit to samoconnor/julia that referenced this issue on Mar 8: "add retry and @catch per" (b16feaa)
samoconnor referenced this issue on Mar 8: Merged @catch, retry, partition, asyncmap and pmap #15409

@samoconnor commented on Mar 8

@amitmurthy I have submitted #15409 with @catch and a lightweight implementation of retry (not dependent on Retry.jl).
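One way retry could be written without @repeat, mirroring the proposed retry(f, condition, n) signature. This is a sketch for Julia 1.x under a hypothetical name, `retry_on`, chosen to avoid clashing with Base.retry; unlike Retry.jl it waits for no delay between attempts.

```julia
function retry_on(f, condition = (e -> true), n::Integer = 3)
    n >= 1 || throw(ArgumentError("n must be at least 1"))
    return (args...) -> begin
        for attempt in 1:n
            try
                return f(args...)
            catch e
                # Rethrow on the final attempt, or if this error is not retryable.
                (attempt == n || !condition(e)) && rethrow()
            end
        end
    end
end

# A retryable failure: fails twice, then succeeds on the third attempt.
calls = Ref(0)
function sometimes_times_out(x)
    calls[] += 1
    calls[] < 3 && throw(ErrorException("timeout"))
    return x
end
r = retry_on(sometimes_times_out, e -> e isa ErrorException, 3)(:ok)

# A non-retryable failure: the condition rejects it, so it is thrown at once.
bad_calls = Ref(0)
always_bad(x) = (bad_calls[] += 1; throw(ArgumentError("bad input")))
nonretryable = try
    retry_on(always_bad, e -> e isa ErrorException, 3)(1)
catch e
    e
end
```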
samoconnor added a commit to samoconnor/julia that referenced this issue on Mar 8: "add retry and @catch per" (41a317d)

@samoconnor commented on Mar 8

Amit's notes pasted from #15073:

> We will also need to:
> - export WorkerPool and related methods
> - document the concept and usage in the manual
> - mention remote and WorkerPool in NEWS.md

samoconnor added a commit to samoconnor/julia that referenced this issue on Mar 9: "add retry and @catch per" (2ca8142)

@samoconnor commented on Mar 10

Status

- #15058 StreamMapIterator (was asyncmap) is merged.
- #15073 WorkerPool and remote() is merged.
- #15409 @catch, retry and split(c, n) is waiting for review.

Next steps

The new pmap might look something like this:

```julia
asyncmap(f, c...) = collect(StreamMapIterator(f, c...))

function pmap(f, c...; err_retry=nothing, err_stop=nothing, pids=nothing)
    if err_retry !== nothing
        depwarn("`err_retry` is deprecated, use `pmap(retry(f), c...)` or `asyncmap(remote(retry(f)), c...)`.", :pmap)
        if err_retry == true
            return asyncmap(retry(remote(f)), c...)
        end
    end
    if err_stop !== nothing
        depwarn("`err_stop` is deprecated, use `pmap(@catch(f), c...)`.", :pmap)
        # err_stop=false meant "keep going and return exceptions in the result",
        # which is what @catch provides.
        if err_stop == false
            return asyncmap(remote(@catch(f)), c...)
        end
    end
    if pids !== nothing
        depwarn("`pids` is deprecated. It no longer has any effect.", :pmap)
    end
    return asyncmap(remote(f), c...)
end
```

Perhaps there should be a keyword argument pool= to replace pids=, e.g.:

```julia
remote(f, pool) = (args...) -> remotecall_fetch(f, pool, args...)
pmap(f, c...; pool=default_worker_pool()) = asyncmap(remote(f, pool), c...)
```
samoconnor added a commit to samoconnor/julia that referenced this issue on Mar 11: "add retry and @catch per" (ee52fa6)
amitmurthy referenced this issue on Mar 16: Open `pmap` with `nprocs() == 1` #15532
samoconnor added a commit to samoconnor/julia that referenced this issue on Mar 22: "add retry and @catch per" (a28a914)
samoconnor added a commit to samoconnor/julia that referenced this issue on Mar 24: "add retry and @catch per" (cd0c486)
samoconnor added a commit to samoconnor/julia that referenced this issue on Mar 24: "add retry and @catch per" (6a438ac)

@samoconnor commented on Mar 24

Refactored pmap is now in #15409.

```julia
function pgenerate(f, c)
    # Split c into at least nworkers() * 3 batches.
    batches = batchsplit(c, min_batch_count = nworkers() * 3)
    # Run each batch on a worker (asyncmap within the batch), then flatten the batch results.
    return flatten(asyncgenerate(remote(b -> asyncmap(f, b)), batches))
end

pmap(f, c) = collect(pgenerate(f, c))
```

samoconnor added a commit to samoconnor/julia that referenced this issue on Mar 24: "add retry and @catch per" (8561984)

@samoconnor commented on Mar 28

#15409 introduces 3 un-exported generate functions that might be generally useful:

- generate is like map but returns an iterator
- asyncgenerate is like asyncmap but returns an iterator
- pgenerate is like pmap but returns an iterator

Should these be exported for general use in a separate PR? They are quite useful for chaining things together:

```julia
# For one...
result_url = upload_results(crunch_numbers(download_data(data_url)))

# For many in parallel...
result_urls = asyncmap(upload_results,
                       pgenerate(crunch_numbers,
                                 asyncgenerate(download_data, url_list)))
```

@StefanKarpinski commented on Mar 28

I like where this is headed, but... any time we use naming to express combinations of behaviors, I feel like we're missing something. In this case, we have map vs. generate, sync vs. async, and local vs. distributed (which is somewhat unfortunately called parallel rather than distributed). It seems like an ideal case for something compositional rather than having some subset of the eight possible names.

@samoconnor commented on Mar 28

FWIW #15409 tries to make things compositional under the covers, e.g. asyncmap(f, c) = collect(asyncgenerate(f, c)), pgenerate(f, c) = asyncgenerate(remote(f), c), and pmap(f, c) = collect(pgenerate(f, c)).

My preference would be to make map and generate async by default (removing the need for asyncmap and asyncgenerate). There could still be special short-cut methods of map for cases where the types or size of the collection mean that a non-async implementation is faster. Maybe there could eventually be a static analysis that proves f never yields in some cases, thereby allowing the faster non-async map to be used.

I'd also remove pmap and just tell people to use map(remote(f), c) instead. Then I'd make map return an iterator (removing the need for generate). This would require collect(map(f, c)) at some, but not all, call sites.

That would leave just: map and remote.

... maybe too extreme?
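The generate/map split can be sketched with plain generators, assuming Julia 1.x: `generate` below is a hypothetical one-liner returning a lazy `Base.Generator`, and map is recovered as `collect` applied to `generate`. The three stage functions are toy stand-ins for download_data, crunch_numbers and upload_results.

```julia
# Hypothetical: `generate` is a lazy map that returns an iterator.
generate(f, c) = (f(x) for x in c)

# map recovered as collect applied to generate.
map_via_generate(f, c) = collect(generate(f, c))

# Toy stage functions (stand-ins for download_data, crunch_numbers, upload_results).
calls = Ref(0)
download_data(url) = (calls[] += 1; length(url))  # counts how often it runs
crunch_numbers(n) = 2n
upload_results(n) = "result-$n"

urls = ["a", "bb", "ccc"]

# Building the pipeline does no work yet...
pipeline = generate(upload_results, generate(crunch_numbers, generate(download_data, urls)))
calls_before = calls[]

# ...collect drives every stage, one element at a time.
result_urls = collect(pipeline)
```

Nothing runs until `collect` pulls values through the chain; that laziness is the property that would let an individual stage be swapped for an async or remote variant without changing the shape of the pipeline.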
samoconnor added a commit to samoconnor/julia that referenced this issue on Mar 29: "add retry and @catch per" (653fa50)
samoconnor added a commit to samoconnor/julia that referenced this issue on Mar 30: "add retry and @catch per" (d9d9ae2)
samoconnor added a commit to samoconnor/julia that referenced this issue on Apr 12: "add retry and @catch per" (31f99be)
samoconnor added a commit to samoconnor/julia that referenced this issue on Apr 12: "@catch, retry, partition, asyncmap and refactored pmap (Seep #15409 and" (808e868)
amitmurthy added a commit that referenced this issue on Apr 14: "@catch, retry, partition, asyncmap and pmap (#15409)" by @samoconnor (cf52a34)

@amitmurthy commented on Apr 18

Closed by #15409

amitmurthy closed this on Apr 18
samoconnor referenced this issue on Apr 27: Merged Configure pmap behavior via keyword args #15975