# RFC: Simplifying and generalising pmap #14843

**Closed** · samoconnor opened this issue on Jan 28 · 33 comments · Label: **parallel** · 8 participants: samoconnor, bjarthur, amitmurthy, blakejohnson, tkelman, StefanKarpinski, tanmaykm, kshyatt

---

**samoconnor commented on Jan 28**

**Update:** #15409 implements most of this and is now merged.

This issue covers some of the same ground as #12943 (stale), #14515 and #14736. My intention is to present an overall `pmap` refactoring plan here for comment, and to submit new PRs if it is seen as useful.

### pmap Features

The current `pmap` implementation has the following features:

1. Using `@sync`/`@async` to run the mapped function in parallel and collect results.
2. Automatic allocation of idle workers to `remotecall_wait` calls.
3. Not reusing workers believed to be faulty.
4. Option to retry on error.
5. Option to continue mapping despite errors (returning exceptions in the result array).

These are all useful features. However, it seems to me that `pmap` currently tries to do too much, and that many of these features would be useful in contexts other than `pmap`:

- 1 is useful when using RPC mechanisms other than `remotecall` (e.g. HTTP, `readstring(::Cmd)` or AWS Lambda).
- 2 and 3 are probably useful to other users of the `remotecall` mechanism.
- If 4 and 5 are useful with `pmap`, they should also be useful with ordinary `map`.

### Proposed Separation of Features

#### asyncmap (#15058)

`asyncmap` adds `@sync`/`@async` to regular `map` (feature 1).

Run `Cmd`s in parallel...

```julia
julia> @time map(t -> run(`sleep $t`), 1:3);       # ~6 seconds
julia> @time asyncmap(t -> run(`sleep $t`), 1:3);  # ~3 seconds
```

Run downloads in parallel...

```julia
julia> url_data = asyncmap(download, urls)
```

#### WorkerPool and remote (#15073)

A `WorkerPool` keeps track of which workers are busy. `take!(default_worker_pool())` yields a worker that is not already busy doing `remotecall_wait`, waiting if needed (feature 2). If there is a reliable way to identify faulty workers (feature 3),
then `WorkerPool` will not return faulty workers. Partial implementation in the comments of #14736.

`remote` takes a function and returns a lambda that executes the function on a remote worker. Using `asyncmap` and `remote` together:

```julia
pmap(f, c...) = asyncmap(remote(f), c...)

function remotecall_fetch(f, args...)
    remotecall_fetch(f, default_worker_pool(), args...)
end

remote(f::Function) = (args...) -> remotecall_fetch(f, args...)
```

#### catch (#15409)

`catch` takes a function and returns a lambda that catches any exceptions thrown by the function and returns the exception object. `pmap(f, v; err_stop=false)` can be replaced with `pmap(@catch(f), v)` (feature 5).

```julia
macro catch(f)
    :((args...) -> try $f(args...) catch ex; ex end)
end
```

#### retry (#15409)

`retry` takes a function and returns a lambda that retries the function if an error occurs (feature 4). `pmap(f, v; err_retry=true)` can be replaced with `pmap(retry(f), v)`, or, for more granular error handling, `pmap(retry(f, e -> isa(e, NetworkTimeoutError)), v)`.

```julia
function retry(f::Function, condition::Function = e -> true, n::Integer = 3)
    (args...) -> @repeat n try
        f(args...)
    catch e
        @retry if condition(e) end
    end
end
```

`@repeat` and `@retry` are implemented in https://github.com/samoconnor/Retry.jl.

### pmap Defaults

`pmap` currently has some unexpected (at least to me) defaults:

- The default behaviour is to re-execute the mapped function on error, irrespective of the type of error, i.e. the function is assumed to be idempotent.
- When the function throws an error, the worker is removed from the pool, irrespective of the type of error, i.e. it is assumed that all errors are due to a faulty worker.
- By default, errors thrown by the mapped function are returned as part of the result array rather than being re-thrown. This is inconsistent with regular `map`.

---

**This was referenced on Jan 28:**

- `pmap()` cleanup. Fixes #12908. #12943 (closed)
- map using @async #14515 (closed)
- idleworker function (closed)
  (#14736)

---

**kshyatt** added the **parallel** label on Jan 29

---

**bjarthur commented on Feb 8**

How easy would it be to have `pmap` dynamically adjust to a changing number of workers in the pool? My dream is to have a non-blocking `addprocs` for use on my private shared SGE cluster. When the load is high, qsub'ing a large number of procs can take a while to finish completely. It would be huge if `ClusterManagers.launch` were asynchronous and could add to the pool as soon as each proc started, instead of waiting until they all started. `pmap` could then get started right away, at first using just a few workers, and adding more as they became available.

---

**amitmurthy (The Julia Language member) commented on Feb 8**

`@async addprocs(...)` should work. Changing `pmap` to use a shared queue, as discussed in #14736, should allow us to start using newly started workers right away.

---

**amitmurthy commented on Feb 9**

To expand on the above:

- The SGE cluster manager creates a `WorkerPool` consisting of the workers it has launched.
- `SGE manage` with `op` as `:register` will add newly launched workers to the worker pool as and when they come online. `:deregister` will remove them.
- `pmap` has an option to use a `WorkerPool` for distributing work.

User code would be something like:

```julia
cman = SGEManager(np, queue)
@async addprocs(cman)
pmap(f, jobs; pids = worker_pool(cman))
```

where `worker_pool` is a method implemented by the SGE cluster manager that returns its pool of workers.

---

**bjarthur commented on Feb 9**

Thanks Amit. Question: in your proposed interface, can a worker which comes online during the execution of a particular `pmap` be utilized by that same call to `pmap`? That is what I am hoping for.

---

**amitmurthy commented on Feb 9**

Yes.

---

**amitmurthy commented on Feb 10**

@samoconnor, the proposed changes look good. Looking forward to the PRs.
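To make the shared-queue idea above concrete, here is a minimal sketch (in modern `Distributed` spelling, not the actual Base implementation) of scheduling each task against the next free worker taken from a channel, so that workers added mid-run are picked up as soon as something puts them into the queue. The names `free_workers` and `queued_pmap` are illustrative, not an existing API.

```julia
using Distributed

# Channel acting as the shared queue of idle worker ids.
const free_workers = Channel{Int}(128)
foreach(p -> put!(free_workers, p), workers())
# A cluster manager could call put!(free_workers, new_pid) as each
# newly launched worker comes online, even while a map is running.

function queued_pmap(f, xs)
    asyncmap(xs) do x
        p = take!(free_workers)        # block until some worker is free
        try
            remotecall_fetch(f, p, x)
        finally
            put!(free_workers, p)      # hand the worker back to the queue
        end
    end
end
```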
---

**samoconnor commented on Feb 10**

@amitmurthy, in the interests of avoiding one big PR, I plan to start by submitting a PR that adds just `amap` and its underlying iterator version `imap`, without changing any existing code, as follows. Do you agree with this approach?

`amap(f, c...; async_max=100) → collection`

Transform collection `c` by applying `f` to each element using at most 100 asynchronous tasks. For multiple collection arguments, apply `f` elementwise. Note: `amap(f, c...; async_max=1)` is equivalent to `map(f, c...)`.

Implementation:

```julia
amap(f, c...; kv...) = collect(imap(f, c...; kv...))
```

`imap(f, c...; async_max=100) → iterator`

Apply `f` to each element of `c` using at most 100 asynchronous tasks. For multiple collection arguments, apply `f` elementwise. Note: `collect(imap(f, c...; async_max=1))` is equivalent to `map(f, c...)`.

Implementation using `StreamMapItr`:

```julia
imap(f, c...; async_max=nothing) = StreamMapItr(f, c...; async_max=async_max)
```

`amap!(function, collection; async_max=100)`

In-place version of `amap`.

`amap!(function, destination, collection...; async_max=100)`

Like `amap`, but stores the result in `destination` rather than a new collection. `destination` must be at least as large as the first collection.

Implementation using `AsyncMapItr`:

```julia
function amap!(f, c...; async_max=nothing)
    destination = c[1]
    if length(c) > 1
        c = c[2:end]
    end
    for task in AsyncMapItr(f, destination, c..., async_max=async_max)
    end
    return destination
end
```

---

**bjarthur commented on Feb 11**

One benefit of a single large PR is that we could see all the proposed changes to the API at once. For example, I'm curious whether you plan to include an iterator-returning version of `pmap`. I proposed this myself once long ago: #4601.

---

**blakejohnson commented on Feb 11**

:+1: I definitely like the separation into composable chunks.

---

**tkelman (The Julia Language member) commented on Feb 11**

Please don't open a PR that tries to change too many things at once. Incremental changes are much smoother to get reviewed and merged.
The end vision can be seen at once in a package or a separate branch, but PRs should be broken into smaller, gradual chunks whenever possible.

---

**samoconnor commented on Feb 12**

@bjarthur: If `pmap(f, c...) = amap(remote(f), c...)`, then an iterator version of `pmap` would simply be `ipmap(f, c...) = imap(remote(f), c...)`. I think this is most definitely a useful thing. I don't know if `ipmap` is the right name for it... sounds like it has something to do with IP addresses.

---

**amitmurthy commented on Feb 12**

Some thoughts. Apologies for the delayed response.

My understanding is that the suggested `amap` parallelizes over tasks and `pmap` over workers. Why don't we just have `pmap`, which would use tasks, workers or a worker pool depending on a keyword arg? `pmap` as a parallel map is well understood. Async execution is typically understood to be non-blocking, i.e., it returns immediately with the expression/function being executed in a different task.

Putting up some thoughts for discussion:

```julia
pmap(f, c...; ntasks=N, pids=pid_array | worker_pool)
```

where

- `ntasks` is the number of concurrent tasks per pid, defaulting to 100
- `pids` identifies either specific workers to distribute these tasks over, or a worker pool

This will also help optimize `pmap` in cases where the data to be mapped is much larger than the number of workers: having `pmap` process chunks of the list at a time will reduce network roundtrips. @tanmaykm, @ViralBShah saw this issue for some of their workloads.

This version of `pmap` should also use threads when they become fully available. The list to be mapped should be distributed across remote workers, followed by threads and finally tasks.

---

**samoconnor** added a commit to samoconnor/julia that referenced this issue on Feb 12: "New functions imap and amap."
(4b38b92)

---

**samoconnor** referenced this issue on Feb 12: StreamMapIterator (was asyncmap) #15058 (merged)

---

**samoconnor commented on Feb 13**

@amitmurthy, I like the idea of having `pmap` parallelise over tasks and workers. I think the first step is still to add the underlying `imap` and `amap` functions that the eventual `pmap` would rely on. See PR #15058. I guess if `pmap` eventually provides an interface to `amap` that is just as convenient, then `amap` could be relegated to being an un-exported implementation function.

---

**samoconnor commented on Feb 13**

> ... having `pmap` process chunks of the list at a time will reduce network roundtrips. @tanmaykm, @ViralBShah saw this issue for some of their workloads.

@tanmaykm, @ViralBShah, can you post test cases that cover the issues that @amitmurthy describes above?

---

**bjarthur commented on Feb 13**

In regards to the request for clearer names in #15058, it's not obvious to me from the name alone that the proposed `imap` is asynchronous. How about the following refactoring of the interface: get rid of `amap` and `amap!`, and rename the proposed `imap` to `amap`. To get a collection, the user would have to manually say `collect(amap(...))`, just like they have to do for a dict's `keys` and `values`. Then later, re-factor `pmap` to also return an iterator, which gets asynchronously (and lazily?) filled, just like `amap`. So only `map` returns a collection, which minimizes breakage. In the end, we have this:

- `map` → collection
- `amap` → iterator
- `pmap` → iterator
- `remote` → function
- `catch` → function
- `retry` → function

---

**samoconnor commented on Feb 14**

@bjarthur: I've taken your suggestion, together with @StefanKarpinski's request for clearer names, and revised #15058 to have just one function:

```julia
asyncmap(f, c...; ntasks=0) = StreamMapIterator(f, c...; ntasks=ntasks)
```

---

**samoconnor commented on Feb 14**

Reading the code for `remotecall → future` and `remotecall_fetch → value` leaves me asking: are these names the wrong way around?
Should it be `remotecall_future → future` and `remotecall → value`? `remotecall(f, id, args...) → value` would be more consistent with `call(f, args...) → value`.

The usual pattern in Julia seems to be that non-blocking implementations are hidden and public interfaces are blocking, e.g. #14546 in the context of `::IO`:

> Everything in Julia always blocks the task it's called from until it's done. Under the hood it's all non-blocking, but that's exposed to the programmer via task-level concurrency.

Perhaps the preferred approach should be to use `@async` with `remotecall → value` instead of `remotecall → future`. In Base there are only 4 calls to `remotecall` vs 25-ish to `remotecall_fetch`. A GitHub-wide search finds 359 matches for `remotecall_fetch`, 115 matches for `remotecall_wait` and 458 matches for `remotecall` (the 458 includes `remotecall_*` due to the way GitHub text search works).

---

**samoconnor** referenced this issue on Feb 14: WorkerPool and remote #15073 (merged)

---

**samoconnor commented on Feb 14**

@amitmurthy: see PR #15073, WorkerPool and remote.

---

**amitmurthy commented on Feb 14**

I would rather not export `asyncmap` in #15058 if we are going to support task-asynchronous execution via `pmap` itself.

I agree with your reasoning w.r.t. the `remotecall` names. It may be a bit late to change them... What do others think?

---

**samoconnor** added a commit to samoconnor/julia that referenced this issue on Feb 14: "dont export asyncmap per JuliaLang#14843" (e263ad6)

---

**samoconnor commented on Feb 14**

> I would rather not export asyncmap in #15058

OK, done: samoconnor@e263ad6

---

**samoconnor commented on Feb 14**

> I agree with your reasoning w.r.t. remotecall names. It may be a bit late to change them...

Maybe if `remote(f)(args...)` per #15073 works well, then `remotecall` could eventually be deprecated.
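To make the naming question concrete, a minimal sketch contrasting the call forms under discussion, written with the current names as they exist in the `Distributed` stdlib (the `remote` line is the #15073 proposal):

```julia
using Distributed

f(x) = x + 1

fut = remotecall(f, 2, 1)        # non-blocking: returns a Future immediately
val = fetch(fut)                 # block until the value is available

val = remotecall_fetch(f, 2, 1)  # blocking: returns the value directly

val = remote(f)(1)               # #15073: blocking call on a pooled worker
```

The asymmetry samoconnor points at: the short name `remotecall` is the non-blocking variant, while the common blocking case needs the longer `remotecall_fetch`.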
---

**samoconnor** added a commit to samoconnor/AsyncMap.jl that referenced this issue on Feb 14: "reference JuliaLang/julia#14843" (c78370a)

---

**StefanKarpinski (The Julia Language member) commented on Feb 14**

Could `pmap` support both a list of workers (or a count) and a number of tasks per worker? Then we could have a single function for this.

---

**samoconnor commented on Feb 14**

@StefanKarpinski:

> Could pmap support both a list of workers or a count and a number of tasks per worker?

Yes, Amit suggested that in a previous comment. Given Amit's goal of having `pmap` process chunks of the list at a time to reduce network roundtrips, I'm thinking of something simple like this to start with:

```julia
asyncpmap(f, v) = flatten(pmap(chunk -> asyncmap(f, chunk), eachchunk(v)))
```

This assumes a reasonably balanced workload, which seems fine for now. Another possible scheme would be to treat all task slots across all workers as a single pool. This would imply network comms for each task, but would allow for unbalanced IO-bound workloads (e.g. downloading and indexing a large number of arbitrary URLs).

---

**tanmaykm (The Julia Language member) commented on Feb 17**

@samoconnor The observation that @amitmurthy was referring to was abhijithch/RecSys.jl#22. A simple test would be:

```julia
$ julia -p 8
julia> f(x) = nothing;
julia> @time @parallel for i in 1:10^4 f(i) end;
  0.117559 seconds (180.03 k allocations: 7.235 MB)
julia> @time pmap(f, 1:10^4);
  1.196159 seconds (2.87 M allocations: 77.740 MB, 1.11% gc time)
```

---

**samoconnor** added a commit to samoconnor/julia that referenced this issue on Mar 7: "New type MapIterator, AsyncMapIterator and StreamMapIterator" (cd6b48b)

---

**samoconnor commented on Mar 7**

@amitmurthy, @StefanKarpinski: I intend to open a third PR (in addition to #15058 and #15073) to implement `catch` and `retry` as described above. The `retry` function above relies on the `@repeat n try ...`
macro from https://github.com/samoconnor/Retry.jl/blob/master/src/repeat_try.jl.

Question: should the PR...

- include `repeat_try.jl` as-is (with comments and code formatting cleaned up a little to match Base conventions)? or,
- implement `retry` without using `@repeat`, i.e. manually copy-pasting the logic from `@repeat`?

Note: at this point I think Retry.jl is reasonably well field-tested. It is used in many places in the AWS packages, which I've been using quite heavily in code for a client.

---

**samoconnor** added a commit to samoconnor/julia that referenced this issue on Mar 8: "add retry and catch" (b16feaa)

---

**samoconnor** referenced this issue on Mar 8: catch, retry, partition, asyncmap and pmap #15409 (merged)

---

**samoconnor commented on Mar 8**

@amitmurthy, I have submitted #15409 with `catch` and a lightweight implementation of `retry` (not dependent on Retry.jl).

---

**samoconnor commented on Mar 8**

Amit's notes pasted from #15073. We will also need to:

- export `WorkerPool` and related methods
- document the concept and usage in the manual
- mention `remote` and `WorkerPool` in NEWS.md

---

**samoconnor** added a commit to samoconnor/julia that referenced this issue on Mar 9: "add retry and catch" (2ca8142)

---

**samoconnor commented on Mar 10**

Status:

- #15058 StreamMapIterator (was asyncmap) is merged.
- #15073 WorkerPool and remote is merged.
- #15409 catch, retry and split(c, n) is waiting for review.

Next steps: the new `pmap` might look something like this:

```julia
asyncmap(f, c...) = collect(StreamMapIterator(f, c...))

function pmap(f, c...; err_retry=nothing, err_stop=nothing, pids=nothing)

    if err_retry != nothing
        depwarn("`err_retry` is deprecated, use `pmap(retry(f), c...)` " *
                "or `asyncmap(remote(retry(f)), c...)`.", :pmap)
        if err_retry == true
            return asyncmap(retry(remote(f)), c...)
        end
    end

    if err_stop != nothing
        depwarn("`err_stop` is deprecated, use `pmap(@catch(f), c...)`.", :pmap)
        if err_stop == true
            return asyncmap(remote(@catch(f)), c...)
        end
    end

    if pids != nothing
        depwarn("`pids` is deprecated. It no longer has any effect.", :pmap)
    end

    return asyncmap(remote(f), c...)
end
```

Perhaps there should be a kw arg `pool` to replace `pids`, e.g.:

```julia
remote(f, pool) = (args...) -> remotecall_fetch(f, pool, args...)

pmap(f, c...; pool = default_worker_pool()) = asyncmap(remote(f, pool), c...)
```

---

**samoconnor** added a commit to samoconnor/julia that referenced this issue on Mar 11: "add retry and catch" (ee52fa6)

---

**amitmurthy** referenced this issue on Mar 16: `pmap` with `nprocs() == 1` #15532 (open)

---

**samoconnor** added commits to samoconnor/julia that referenced this issue: "add retry and catch" on Mar 22 (a28a914) and on Mar 24 (cd0c486, 6a438ac)

---

**samoconnor commented on Mar 24**

Refactored `pmap` is now in #15409:

```julia
function pgenerate(f, c)
    batches = batchsplit(c, min_batch_count = nworkers() * 3)
    return flatten(asyncgenerate(remote(b -> asyncmap(f, b)), batches))
end

pmap(f, c) = collect(pgenerate(f, c))
```

---

**samoconnor** added a commit to samoconnor/julia that referenced this issue on Mar 24: "add retry and catch" (8561984)

---

**samoconnor commented on Mar 28**

#15409 introduces 3 un-exported `generate` functions that might be generally useful:

- `generate` is like `map` but returns an iterator
- `asyncgenerate` is like `asyncmap` but returns an iterator
- `pgenerate` is like `pmap` but returns an iterator

Should these be exported for general use in a separate PR? They are quite useful for chaining things together.

For one...

```julia
result_url = upload_results(crunch_numbers(download_data(data_url)))
```

For many in parallel...

```julia
result_urls = asyncmap(upload_results,
                       pgenerate(crunch_numbers,
                                 asyncgenerate(download_data, url_list)))
```

---

**StefanKarpinski (The Julia Language member) commented on Mar 28**

I like where this is headed, but... any time we use naming to express combinations of behaviors, I feel like we're missing something. In this case, we have map vs. generate, sync vs. async, and local vs. distributed (which is somewhat unfortunately called "parallel" rather than "distributed"). It seems like an ideal case for something compositional, rather than having some subset of the eight possible names.

---

**samoconnor commented on Mar 28**

FWIW #15409 tries to make things compositional under the covers, e.g. `asyncmap(f, c) = collect(asyncgenerate(f, c))`, `pgenerate(f, c) = asyncgenerate(remote(f), c)`, and `pmap(f, c) = collect(pgenerate(f, c))`.

My preference would be to make `map` and `generate` async by default (removing the need for `asyncmap` and `asyncgenerate`). There could still be special short-cut methods of `map` for cases where the types or size of the collection mean that a non-async implementation is faster. Maybe there could eventually be a static analysis proving that `f` never yields in some cases, thereby allowing the faster non-async `map` to be used.

I'd also remove `pmap` and just tell people to do `map(remote(f), c)` instead. Then I'd make `map` return an iterator (removing the need for `generate`). This would cause the need for `collect(map(f, c))` at some, but not all, call sites. That would leave just: `map` and `remote`.

... maybe too extreme?
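To make the composability concrete, here is a sketch of how the #15409 building blocks replace the old keyword options, reusing `f` and `v` from the earlier examples (`@catch` is the proposed name for the exception-capturing wrapper; the expansion assumes the definitions quoted above, not a guaranteed implementation detail):

```julia
# Old, monolithic interface:
#     pmap(f, v; err_retry = true, err_stop = false)

# Composable equivalent, nesting the proposed wrappers:
results = pmap(retry(@catch(f)), v)

# which, given pmap(f, c)      = collect(pgenerate(f, c))   and
#              pgenerate(f, c) = asyncgenerate(remote(f), c),
# expands to:
results = collect(asyncgenerate(remote(retry(@catch(f))), v))
```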
---

**samoconnor** added commits to samoconnor/julia that referenced this issue: "add retry and catch" on Mar 29 (653fa50), on Mar 30 (d9d9ae2) and on Apr 12 (31f99be); and "catch, retry, partition, asyncmap and refactored pmap. See #15409" on Apr 12 (808e868)

---

**amitmurthy** added a commit that referenced this issue on Apr 14: "catch, retry, partition, asyncmap and pmap (#15409)" (cf52a34)

---

**amitmurthy commented on Apr 18**

Closed by #15409.

---

**amitmurthy** closed this on Apr 18

---

**samoconnor** referenced this issue on Apr 27: Configure pmap behavior via keyword args #15975 (merged)