sync: move pool related code to `sync.pool`, cleanup, add a README.md
parent
93c1c1cec3
commit
578de634fe
|
@ -4,7 +4,7 @@ import os
|
||||||
import time
|
import time
|
||||||
import term
|
import term
|
||||||
import benchmark
|
import benchmark
|
||||||
import sync
|
import sync.pool
|
||||||
import v.pref
|
import v.pref
|
||||||
import v.util.vtest
|
import v.util.vtest
|
||||||
|
|
||||||
|
@ -198,7 +198,7 @@ pub fn (mut ts TestSession) test() {
|
||||||
remaining_files = vtest.filter_vtest_only(remaining_files, fix_slashes: false)
|
remaining_files = vtest.filter_vtest_only(remaining_files, fix_slashes: false)
|
||||||
ts.files = remaining_files
|
ts.files = remaining_files
|
||||||
ts.benchmark.set_total_expected_steps(remaining_files.len)
|
ts.benchmark.set_total_expected_steps(remaining_files.len)
|
||||||
mut pool_of_test_runners := sync.new_pool_processor(callback: worker_trunner)
|
mut pool_of_test_runners := pool.new_pool_processor(callback: worker_trunner)
|
||||||
// for handling messages across threads
|
// for handling messages across threads
|
||||||
ts.nmessages = chan LogMessage{cap: 10000}
|
ts.nmessages = chan LogMessage{cap: 10000}
|
||||||
ts.nprint_ended = chan int{cap: 0}
|
ts.nprint_ended = chan int{cap: 0}
|
||||||
|
@ -218,7 +218,7 @@ pub fn (mut ts TestSession) test() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn worker_trunner(mut p sync.PoolProcessor, idx int, thread_id int) voidptr {
|
fn worker_trunner(mut p pool.PoolProcessor, idx int, thread_id int) voidptr {
|
||||||
mut ts := &TestSession(p.get_shared_context())
|
mut ts := &TestSession(p.get_shared_context())
|
||||||
tmpd := ts.vtmp_dir
|
tmpd := ts.vtmp_dir
|
||||||
show_stats := '-stats' in ts.vargs.split(' ')
|
show_stats := '-stats' in ts.vargs.split(' ')
|
||||||
|
@ -230,7 +230,7 @@ fn worker_trunner(mut p sync.PoolProcessor, idx int, thread_id int) voidptr {
|
||||||
p.set_thread_context(idx, tls_bench)
|
p.set_thread_context(idx, tls_bench)
|
||||||
}
|
}
|
||||||
tls_bench.no_cstep = true
|
tls_bench.no_cstep = true
|
||||||
dot_relative_file := p.get_string_item(idx)
|
dot_relative_file := p.get_item<string>(idx)
|
||||||
mut relative_file := dot_relative_file.replace('./', '')
|
mut relative_file := dot_relative_file.replace('./', '')
|
||||||
if ts.root_relative {
|
if ts.root_relative {
|
||||||
relative_file = relative_file.replace(ts.vroot + os.path_separator, '')
|
relative_file = relative_file.replace(ts.vroot + os.path_separator, '')
|
||||||
|
@ -239,8 +239,11 @@ fn worker_trunner(mut p sync.PoolProcessor, idx int, thread_id int) voidptr {
|
||||||
// Ensure that the generated binaries will be stored in the temporary folder.
|
// Ensure that the generated binaries will be stored in the temporary folder.
|
||||||
// Remove them after a test passes/fails.
|
// Remove them after a test passes/fails.
|
||||||
fname := os.file_name(file)
|
fname := os.file_name(file)
|
||||||
generated_binary_fname := if os.user_os() == 'windows' { fname.replace('.v', '.exe') } else { fname.replace('.v',
|
generated_binary_fname := if os.user_os() == 'windows' {
|
||||||
'') }
|
fname.replace('.v', '.exe')
|
||||||
|
} else {
|
||||||
|
fname.replace('.v', '')
|
||||||
|
}
|
||||||
generated_binary_fpath := os.join_path(tmpd, generated_binary_fname)
|
generated_binary_fpath := os.join_path(tmpd, generated_binary_fname)
|
||||||
if os.exists(generated_binary_fpath) {
|
if os.exists(generated_binary_fpath) {
|
||||||
if ts.rm_binaries {
|
if ts.rm_binaries {
|
||||||
|
@ -258,7 +261,7 @@ fn worker_trunner(mut p sync.PoolProcessor, idx int, thread_id int) voidptr {
|
||||||
ts.benchmark.skip()
|
ts.benchmark.skip()
|
||||||
tls_bench.skip()
|
tls_bench.skip()
|
||||||
ts.append_message(.skip, tls_bench.step_message_skip(relative_file))
|
ts.append_message(.skip, tls_bench.step_message_skip(relative_file))
|
||||||
return sync.no_result
|
return pool.no_result
|
||||||
}
|
}
|
||||||
if show_stats {
|
if show_stats {
|
||||||
ts.append_message(.ok, term.h_divider('-'))
|
ts.append_message(.ok, term.h_divider('-'))
|
||||||
|
@ -270,7 +273,7 @@ fn worker_trunner(mut p sync.PoolProcessor, idx int, thread_id int) voidptr {
|
||||||
ts.failed = true
|
ts.failed = true
|
||||||
ts.benchmark.fail()
|
ts.benchmark.fail()
|
||||||
tls_bench.fail()
|
tls_bench.fail()
|
||||||
return sync.no_result
|
return pool.no_result
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if testing.show_start {
|
if testing.show_start {
|
||||||
|
@ -281,7 +284,7 @@ fn worker_trunner(mut p sync.PoolProcessor, idx int, thread_id int) voidptr {
|
||||||
ts.benchmark.fail()
|
ts.benchmark.fail()
|
||||||
tls_bench.fail()
|
tls_bench.fail()
|
||||||
ts.append_message(.fail, tls_bench.step_message_fail(relative_file))
|
ts.append_message(.fail, tls_bench.step_message_fail(relative_file))
|
||||||
return sync.no_result
|
return pool.no_result
|
||||||
}
|
}
|
||||||
if r.exit_code != 0 {
|
if r.exit_code != 0 {
|
||||||
ts.failed = true
|
ts.failed = true
|
||||||
|
@ -300,7 +303,7 @@ fn worker_trunner(mut p sync.PoolProcessor, idx int, thread_id int) voidptr {
|
||||||
os.rm(generated_binary_fpath) or { panic(err) }
|
os.rm(generated_binary_fpath) or { panic(err) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return sync.no_result
|
return pool.no_result
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn vlib_should_be_present(parent_dir string) {
|
pub fn vlib_should_be_present(parent_dir string) {
|
||||||
|
@ -343,8 +346,7 @@ pub fn prepare_test_session(zargs string, folder string, oskipped []string, main
|
||||||
}
|
}
|
||||||
$if windows {
|
$if windows {
|
||||||
// skip pico and process/command examples on windows
|
// skip pico and process/command examples on windows
|
||||||
if f.ends_with('examples\\pico\\pico.v')
|
if f.ends_with('examples\\pico\\pico.v') || f.ends_with('examples\\process\\command.v') {
|
||||||
|| f.ends_with('examples\\process\\command.v') {
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,25 +3,25 @@
|
||||||
// that can be found in the LICENSE file.
|
// that can be found in the LICENSE file.
|
||||||
import net.http
|
import net.http
|
||||||
import json
|
import json
|
||||||
import sync
|
import sync.pool
|
||||||
|
|
||||||
struct Story {
|
struct Story {
|
||||||
title string
|
title string
|
||||||
url string
|
url string
|
||||||
}
|
}
|
||||||
|
|
||||||
fn worker_fetch(p &sync.PoolProcessor, cursor int, worker_id int) voidptr {
|
fn worker_fetch(p &pool.PoolProcessor, cursor int, worker_id int) voidptr {
|
||||||
id := p.get_int_item(cursor)
|
id := p.get_item<int>(cursor)
|
||||||
resp := http.get('https://hacker-news.firebaseio.com/v0/item/${id}.json') or {
|
resp := http.get('https://hacker-news.firebaseio.com/v0/item/${id}.json') or {
|
||||||
println('failed to fetch data from /v0/item/${id}.json')
|
println('failed to fetch data from /v0/item/${id}.json')
|
||||||
return sync.no_result
|
return pool.no_result
|
||||||
}
|
}
|
||||||
story := json.decode(Story,resp.text) or {
|
story := json.decode(Story, resp.text) or {
|
||||||
println('failed to decode a story')
|
println('failed to decode a story')
|
||||||
return sync.no_result
|
return pool.no_result
|
||||||
}
|
}
|
||||||
println('# $cursor) $story.title | $story.url')
|
println('# $cursor) $story.title | $story.url')
|
||||||
return sync.no_result
|
return pool.no_result
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fetches top HN stories in parallel, depending on how many cores you have
|
// Fetches top HN stories in parallel, depending on how many cores you have
|
||||||
|
@ -30,20 +30,20 @@ fn main() {
|
||||||
println('failed to fetch data from /v0/topstories.json')
|
println('failed to fetch data from /v0/topstories.json')
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
mut ids := json.decode([]int,resp.text) or {
|
mut ids := json.decode([]int, resp.text) or {
|
||||||
println('failed to decode topstories.json')
|
println('failed to decode topstories.json')
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if ids.len > 10 {
|
if ids.len > 10 {
|
||||||
ids = ids[0..10]
|
ids = ids[0..10]
|
||||||
}
|
}
|
||||||
mut fetcher_pool := sync.new_pool_processor({
|
mut fetcher_pool := pool.new_pool_processor(
|
||||||
callback: worker_fetch
|
callback: worker_fetch
|
||||||
})
|
)
|
||||||
// NB: if you do not call set_max_jobs, the pool will try to use an optimal
|
// NB: if you do not call set_max_jobs, the pool will try to use an optimal
|
||||||
// number of threads, one per each core in your system, which in most
|
// number of threads, one per each core in your system, which in most
|
||||||
// cases is what you want anyway... You can override the automatic choice
|
// cases is what you want anyway... You can override the automatic choice
|
||||||
// by setting the VJOBS environment variable too.
|
// by setting the VJOBS environment variable too.
|
||||||
// fetcher_pool.set_max_jobs( 4 )
|
// fetcher_pool.set_max_jobs( 4 )
|
||||||
fetcher_pool.work_on_items_i(ids)
|
fetcher_pool.work_on_items(ids)
|
||||||
}
|
}
|
||||||
|
|
254
vlib/sync/pool.v
254
vlib/sync/pool.v
|
@ -1,254 +0,0 @@
|
||||||
module sync
|
|
||||||
|
|
||||||
import runtime
|
|
||||||
|
|
||||||
// * Goal: this file provides a convenient way to run identical tasks over a list
|
|
||||||
// * of items in parallel, without worrying about waitgroups, mutexes and so on.
|
|
||||||
// *
|
|
||||||
// * Usage example:
|
|
||||||
// * struct SResult{ s string }
|
|
||||||
// * fn sprocess(p &sync.PoolProcessor, idx, wid int) voidptr {
|
|
||||||
// * item := p.get_item<string>(idx)
|
|
||||||
// * println('idx: $idx, wid: $wid, item: ' + item)
|
|
||||||
// * return &SResult{ item.reverse() }
|
|
||||||
// * }
|
|
||||||
// * pool := sync.new_pool_processor({ callback: sprocess })
|
|
||||||
// * pool.work_on_items(['a','b','c','d','e','f','g'])
|
|
||||||
// * // optionally, you can iterate over the results too:
|
|
||||||
// * for x in pool.get_results<SResult>() {
|
|
||||||
// * println('result: $x.s')
|
|
||||||
// * }
|
|
||||||
// *
|
|
||||||
// * See https://github.com/vlang/v/blob/master/vlib/sync/pool_test.v for a
|
|
||||||
// * more detailed usage example.
|
|
||||||
// *
|
|
||||||
// * After all the work is done in parallel by the worker threads in the pool,
|
|
||||||
// * pool.work_on_items will return, and you can then call
|
|
||||||
// * pool.get_results<Result>() to retrieve a list of all the results,
|
|
||||||
// * that the worker callbacks returned for each item that you passed.
|
|
||||||
// * The parameters of new_pool_processor are:
|
|
||||||
// * context.maxjobs: when 0 (the default), the PoolProcessor will use an
|
|
||||||
// * optimal for your system number of threads to process your items
|
|
||||||
// * context.callback: this should be a callback function, that each worker
|
|
||||||
// * thread in the pool will run for each item.
|
|
||||||
// * The callback function will receive as parameters:
|
|
||||||
// * 1) the PoolProcessor instance, so it can call
|
|
||||||
// * p.get_item<int>(idx) to get the actual item at index idx
|
|
||||||
// * NB: for now, you are better off calling p.get_string_item(idx)
|
|
||||||
// * or p.get_int_item(idx) ; TODO: vfmt and generics
|
|
||||||
// * 2) idx - the index of the currently processed item
|
|
||||||
// * 3) task_id - the index of the worker thread in which the callback
|
|
||||||
// * function is running.
|
|
||||||
|
|
||||||
pub const (
|
|
||||||
no_result = voidptr(0)
|
|
||||||
)
|
|
||||||
|
|
||||||
pub struct PoolProcessor {
|
|
||||||
thread_cb voidptr
|
|
||||||
mut:
|
|
||||||
njobs int
|
|
||||||
items []voidptr
|
|
||||||
results []voidptr
|
|
||||||
ntask int // writing to this should be locked by ntask_mtx.
|
|
||||||
ntask_mtx &Mutex
|
|
||||||
waitgroup &WaitGroup
|
|
||||||
shared_context voidptr
|
|
||||||
thread_contexts []voidptr
|
|
||||||
}
|
|
||||||
|
|
||||||
pub type ThreadCB = fn (p &PoolProcessor, idx int, task_id int) voidptr
|
|
||||||
|
|
||||||
pub struct PoolProcessorConfig {
|
|
||||||
maxjobs int
|
|
||||||
callback ThreadCB
|
|
||||||
}
|
|
||||||
|
|
||||||
// new_pool_processor returns a new PoolProcessor instance.
|
|
||||||
pub fn new_pool_processor(context PoolProcessorConfig) &PoolProcessor {
|
|
||||||
if isnil(context.callback) {
|
|
||||||
panic('You need to pass a valid callback to new_pool_processor.')
|
|
||||||
}
|
|
||||||
// TODO: remove this call.
|
|
||||||
// It prevents a V warning about unused module runtime.
|
|
||||||
runtime.nr_jobs()
|
|
||||||
pool := &PoolProcessor {
|
|
||||||
items: []
|
|
||||||
results: []
|
|
||||||
shared_context: voidptr(0)
|
|
||||||
thread_contexts: []
|
|
||||||
njobs: context.maxjobs
|
|
||||||
ntask: 0
|
|
||||||
ntask_mtx: new_mutex()
|
|
||||||
waitgroup: new_waitgroup()
|
|
||||||
thread_cb: voidptr(context.callback)
|
|
||||||
}
|
|
||||||
return pool
|
|
||||||
}
|
|
||||||
|
|
||||||
// set_max_jobs gives you the ability to override the number
|
|
||||||
// of jobs *after* the PoolProcessor had been created already.
|
|
||||||
pub fn (mut pool PoolProcessor) set_max_jobs(njobs int) {
|
|
||||||
pool.njobs = njobs
|
|
||||||
}
|
|
||||||
|
|
||||||
// work_on_items receives a list of items of type T,
|
|
||||||
// then starts a work pool of pool.njobs threads, each running
|
|
||||||
// pool.thread_cb in a loop, untill all items in the list,
|
|
||||||
// are processed.
|
|
||||||
// When pool.njobs is 0, the number of jobs is determined
|
|
||||||
// by the number of available cores on the system.
|
|
||||||
// work_on_items returns *after* all threads finish.
|
|
||||||
// You can optionally call get_results after that.
|
|
||||||
// TODO: uncomment, when generics work again
|
|
||||||
//pub fn (mut pool PoolProcessor) work_on_items<T>(items []T) {
|
|
||||||
// pool.work_on_pointers( items.pointers() )
|
|
||||||
//}
|
|
||||||
|
|
||||||
pub fn (mut pool PoolProcessor) work_on_pointers(items []voidptr) {
|
|
||||||
mut njobs := runtime.nr_jobs()
|
|
||||||
if pool.njobs > 0 {
|
|
||||||
njobs = pool.njobs
|
|
||||||
}
|
|
||||||
pool.items = []
|
|
||||||
pool.results = []
|
|
||||||
pool.thread_contexts = []
|
|
||||||
pool.items << items
|
|
||||||
pool.results = []voidptr{len:(pool.items.len)}
|
|
||||||
pool.thread_contexts << []voidptr{len:(pool.items.len)}
|
|
||||||
pool.waitgroup.add(njobs)
|
|
||||||
for i := 0; i < njobs; i++ {
|
|
||||||
if njobs > 1 {
|
|
||||||
go process_in_thread(mut pool,i)
|
|
||||||
} else {
|
|
||||||
// do not run concurrently, just use the same thread:
|
|
||||||
process_in_thread(mut pool,i)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pool.waitgroup.wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
// process_in_thread does the actual work of worker thread.
|
|
||||||
// It is a workaround for the current inability to pass a
|
|
||||||
// method in a callback.
|
|
||||||
fn process_in_thread(mut pool PoolProcessor, task_id int) {
|
|
||||||
cb := ThreadCB(pool.thread_cb)
|
|
||||||
mut idx := 0
|
|
||||||
ilen := pool.items.len
|
|
||||||
for {
|
|
||||||
if pool.ntask >= ilen {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
pool.ntask_mtx.@lock()
|
|
||||||
idx = pool.ntask
|
|
||||||
pool.ntask++
|
|
||||||
pool.ntask_mtx.unlock()
|
|
||||||
if idx >= ilen {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
pool.results[idx] = cb(pool, idx, task_id)
|
|
||||||
}
|
|
||||||
pool.waitgroup.done()
|
|
||||||
}
|
|
||||||
|
|
||||||
// get_item - called by the worker callback.
|
|
||||||
// Retrieves a type safe instance of the currently processed item
|
|
||||||
// TODO: uncomment, when generics work again
|
|
||||||
//pub fn (pool &PoolProcessor) get_item<T>(idx int) T {
|
|
||||||
// return *(&T(pool.items[idx]))
|
|
||||||
//}
|
|
||||||
|
|
||||||
// get_string_item - called by the worker callback.
|
|
||||||
// It does not use generics so it does not mess up vfmt.
|
|
||||||
// TODO: remove the need for this when vfmt becomes smarter.
|
|
||||||
pub fn (pool &PoolProcessor) get_string_item(idx int) string {
|
|
||||||
// return *(&string(pool.items[idx]))
|
|
||||||
// TODO: the below is a hack, remove it when v2 casting works again
|
|
||||||
return *unsafe {&string( pool.items[idx] )}
|
|
||||||
}
|
|
||||||
|
|
||||||
// get_int_item - called by the worker callback.
|
|
||||||
// It does not use generics so it does not mess up vfmt.
|
|
||||||
// TODO: remove the need for this when vfmt becomes smarter.
|
|
||||||
pub fn (pool &PoolProcessor) get_int_item(idx int) int {
|
|
||||||
item := pool.items[idx]
|
|
||||||
return *unsafe {&int(item)}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: uncomment, when generics work again
|
|
||||||
//pub fn (pool &PoolProcessor) get_result<T>(idx int) T {
|
|
||||||
// return *(&T(pool.results[idx]))
|
|
||||||
//}
|
|
||||||
|
|
||||||
// TODO: uncomment, when generics work again
|
|
||||||
// get_results - can be called to get a list of type safe results.
|
|
||||||
//pub fn (pool &PoolProcessor) get_results<T>() []T {
|
|
||||||
// mut res := []T{}
|
|
||||||
// for i in 0 .. pool.results.len {
|
|
||||||
// res << *(&T(pool.results[i]))
|
|
||||||
// }
|
|
||||||
// return res
|
|
||||||
//}
|
|
||||||
|
|
||||||
// set_shared_context - can be called during the setup so that you can
|
|
||||||
// provide a context that is shared between all worker threads, like
|
|
||||||
// common options/settings.
|
|
||||||
pub fn (mut pool PoolProcessor) set_shared_context(context voidptr) {
|
|
||||||
pool.shared_context = context
|
|
||||||
}
|
|
||||||
|
|
||||||
// get_shared_context - can be called in each worker callback, to get
|
|
||||||
// the context set by pool.set_shared_context
|
|
||||||
pub fn (pool &PoolProcessor) get_shared_context() voidptr {
|
|
||||||
return pool.shared_context
|
|
||||||
}
|
|
||||||
|
|
||||||
// set_thread_context - can be called during the setup at the start of
|
|
||||||
// each worker callback, so that the worker callback can have some thread
|
|
||||||
// local storage area where it can write/read information that is private
|
|
||||||
// to the given thread, without worrying that it will get overwritten by
|
|
||||||
// another thread
|
|
||||||
pub fn (mut pool PoolProcessor) set_thread_context(idx int, context voidptr) {
|
|
||||||
pool.thread_contexts[idx] = context
|
|
||||||
}
|
|
||||||
|
|
||||||
// get_thread_context - returns a pointer, that was set with
|
|
||||||
// pool.set_thread_context . This pointer is private to each thread.
|
|
||||||
pub fn (pool &PoolProcessor) get_thread_context(idx int) voidptr {
|
|
||||||
return pool.thread_contexts[idx]
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: remove everything below this line after generics are fixed:
|
|
||||||
pub struct SResult {
|
|
||||||
pub:
|
|
||||||
s string
|
|
||||||
}
|
|
||||||
pub struct IResult {
|
|
||||||
pub:
|
|
||||||
i int
|
|
||||||
}
|
|
||||||
|
|
||||||
//
|
|
||||||
|
|
||||||
pub fn (mut pool PoolProcessor) work_on_items_s(items []string) {
|
|
||||||
pool.work_on_pointers( items.pointers() )
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn (mut pool PoolProcessor) work_on_items_i(items []int) {
|
|
||||||
pool.work_on_pointers( items.pointers() )
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn (pool &PoolProcessor) get_results_s() []SResult {
|
|
||||||
mut res := []SResult{}
|
|
||||||
for i in 0 .. pool.results.len {
|
|
||||||
res << *unsafe {&SResult(pool.results[i])}
|
|
||||||
}
|
|
||||||
return res
|
|
||||||
}
|
|
||||||
pub fn (pool &PoolProcessor) get_results_i() []IResult {
|
|
||||||
mut res := []IResult{}
|
|
||||||
for i in 0 .. pool.results.len {
|
|
||||||
res << *unsafe {&IResult(pool.results[i])}
|
|
||||||
}
|
|
||||||
return res
|
|
||||||
}
|
|
|
@ -0,0 +1,36 @@
|
||||||
|
|
||||||
|
The `sync.pool` module provides a convenient way to run identical tasks over
|
||||||
|
an array of items *in parallel*, without worrying about thread synchronization,
|
||||||
|
waitgroups, mutexes etc.., you just need to supply a callback function, that
|
||||||
|
will be called once per each item in your input array.
|
||||||
|
|
||||||
|
After all the work is done in parallel by the worker threads in the pool,
|
||||||
|
pool.work_on_items will return. You can then call pool.get_results<Result>()
|
||||||
|
to retrieve a list of all the results, that the worker callbacks returned
|
||||||
|
for each input item. Example:
|
||||||
|
|
||||||
|
```v
|
||||||
|
import sync.pool
|
||||||
|
|
||||||
|
struct SResult {
|
||||||
|
s string
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sprocess(pp &pool.PoolProcessor, idx int, wid int) &SResult {
|
||||||
|
item := pp.get_item<string>(idx)
|
||||||
|
println('idx: $idx, wid: $wid, item: ' + item)
|
||||||
|
return &SResult{item.reverse()}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
mut pp := pool.new_pool_processor(callback: sprocess)
|
||||||
|
pp.work_on_items(['1abc', '2abc', '3abc', '4abc', '5abc', '6abc', '7abc'])
|
||||||
|
// optionally, you can iterate over the results too:
|
||||||
|
for x in pp.get_results<SResult>() {
|
||||||
|
println('result: $x.s')
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
See https://github.com/vlang/v/blob/master/vlib/sync/pool/pool_test.v for a
|
||||||
|
more detailed usage example.
|
|
@ -0,0 +1,172 @@
|
||||||
|
module pool
|
||||||
|
|
||||||
|
import sync
|
||||||
|
|
||||||
|
import runtime
|
||||||
|
|
||||||
|
pub const (
|
||||||
|
no_result = voidptr(0)
|
||||||
|
)
|
||||||
|
|
||||||
|
pub struct PoolProcessor {
|
||||||
|
thread_cb voidptr
|
||||||
|
mut:
|
||||||
|
njobs int
|
||||||
|
items []voidptr
|
||||||
|
results []voidptr
|
||||||
|
ntask int // writing to this should be locked by ntask_mtx.
|
||||||
|
ntask_mtx &sync.Mutex
|
||||||
|
waitgroup &sync.WaitGroup
|
||||||
|
shared_context voidptr
|
||||||
|
thread_contexts []voidptr
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type ThreadCB = fn (p &PoolProcessor, idx int, task_id int) voidptr
|
||||||
|
|
||||||
|
pub struct PoolProcessorConfig {
|
||||||
|
maxjobs int
|
||||||
|
callback ThreadCB
|
||||||
|
}
|
||||||
|
|
||||||
|
// new_pool_processor returns a new PoolProcessor instance.
|
||||||
|
// The parameters of new_pool_processor are:
|
||||||
|
// context.maxjobs: when 0 (the default), the PoolProcessor will use a
|
||||||
|
// number of threads, that is optimal for your system to process your items.
|
||||||
|
// context.callback: this should be a callback function, that each worker
|
||||||
|
// thread in the pool will run for each item.
|
||||||
|
// The callback function will receive as parameters:
|
||||||
|
// 1) the PoolProcessor instance, so it can call
|
||||||
|
// p.get_item<int>(idx) to get the actual item at index idx
|
||||||
|
// 2) idx - the index of the currently processed item
|
||||||
|
// 3) task_id - the index of the worker thread in which the callback
|
||||||
|
// function is running.
|
||||||
|
pub fn new_pool_processor(context PoolProcessorConfig) &PoolProcessor {
|
||||||
|
if isnil(context.callback) {
|
||||||
|
panic('You need to pass a valid callback to new_pool_processor.')
|
||||||
|
}
|
||||||
|
pool := &PoolProcessor {
|
||||||
|
items: []
|
||||||
|
results: []
|
||||||
|
shared_context: voidptr(0)
|
||||||
|
thread_contexts: []
|
||||||
|
njobs: context.maxjobs
|
||||||
|
ntask: 0
|
||||||
|
ntask_mtx: sync.new_mutex()
|
||||||
|
waitgroup: sync.new_waitgroup()
|
||||||
|
thread_cb: voidptr(context.callback)
|
||||||
|
}
|
||||||
|
return pool
|
||||||
|
}
|
||||||
|
|
||||||
|
// set_max_jobs gives you the ability to override the number
|
||||||
|
// of jobs *after* the PoolProcessor had been created already.
|
||||||
|
pub fn (mut pool PoolProcessor) set_max_jobs(njobs int) {
|
||||||
|
pool.njobs = njobs
|
||||||
|
}
|
||||||
|
|
||||||
|
// work_on_items receives a list of items of type T,
|
||||||
|
// then starts a work pool of pool.njobs threads, each running
|
||||||
|
// pool.thread_cb in a loop, untill all items in the list,
|
||||||
|
// are processed.
|
||||||
|
// When pool.njobs is 0, the number of jobs is determined
|
||||||
|
// by the number of available cores on the system.
|
||||||
|
// work_on_items returns *after* all threads finish.
|
||||||
|
// You can optionally call get_results after that.
|
||||||
|
pub fn (mut pool PoolProcessor) work_on_items<T>(items []T) {
|
||||||
|
pool.work_on_pointers( items.pointers() )
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn (mut pool PoolProcessor) work_on_pointers(items []voidptr) {
|
||||||
|
mut njobs := runtime.nr_jobs()
|
||||||
|
if pool.njobs > 0 {
|
||||||
|
njobs = pool.njobs
|
||||||
|
}
|
||||||
|
pool.items = []
|
||||||
|
pool.results = []
|
||||||
|
pool.thread_contexts = []
|
||||||
|
pool.items << items
|
||||||
|
pool.results = []voidptr{len:(pool.items.len)}
|
||||||
|
pool.thread_contexts << []voidptr{len:(pool.items.len)}
|
||||||
|
pool.waitgroup.add(njobs)
|
||||||
|
for i := 0; i < njobs; i++ {
|
||||||
|
if njobs > 1 {
|
||||||
|
go process_in_thread(mut pool,i)
|
||||||
|
} else {
|
||||||
|
// do not run concurrently, just use the same thread:
|
||||||
|
process_in_thread(mut pool,i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pool.waitgroup.wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// process_in_thread does the actual work of worker thread.
|
||||||
|
// It is a workaround for the current inability to pass a
|
||||||
|
// method in a callback.
|
||||||
|
fn process_in_thread(mut pool PoolProcessor, task_id int) {
|
||||||
|
cb := ThreadCB(pool.thread_cb)
|
||||||
|
mut idx := 0
|
||||||
|
ilen := pool.items.len
|
||||||
|
for {
|
||||||
|
if pool.ntask >= ilen {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
pool.ntask_mtx.@lock()
|
||||||
|
idx = pool.ntask
|
||||||
|
pool.ntask++
|
||||||
|
pool.ntask_mtx.unlock()
|
||||||
|
if idx >= ilen {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
pool.results[idx] = cb(pool, idx, task_id)
|
||||||
|
}
|
||||||
|
pool.waitgroup.done()
|
||||||
|
}
|
||||||
|
|
||||||
|
// get_item - called by the worker callback.
|
||||||
|
// Retrieves a type safe instance of the currently processed item
|
||||||
|
pub fn (pool &PoolProcessor) get_item<T>(idx int) T {
|
||||||
|
return *(&T(pool.items[idx]))
|
||||||
|
}
|
||||||
|
|
||||||
|
// get_result - called by the main thread to get a specific result.
|
||||||
|
// Retrieves a type safe instance of the produced result.
|
||||||
|
pub fn (pool &PoolProcessor) get_result<T>(idx int) T {
|
||||||
|
return *(&T(pool.results[idx]))
|
||||||
|
}
|
||||||
|
|
||||||
|
// get_results - get a list of type safe results in the main thread.
|
||||||
|
pub fn (pool &PoolProcessor) get_results<T>() []T {
|
||||||
|
mut res := []T{}
|
||||||
|
for i in 0 .. pool.results.len {
|
||||||
|
res << *(&T(pool.results[i]))
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
// set_shared_context - can be called during the setup so that you can
|
||||||
|
// provide a context that is shared between all worker threads, like
|
||||||
|
// common options/settings.
|
||||||
|
pub fn (mut pool PoolProcessor) set_shared_context(context voidptr) {
|
||||||
|
pool.shared_context = context
|
||||||
|
}
|
||||||
|
|
||||||
|
// get_shared_context - can be called in each worker callback, to get
|
||||||
|
// the context set by pool.set_shared_context
|
||||||
|
pub fn (pool &PoolProcessor) get_shared_context() voidptr {
|
||||||
|
return pool.shared_context
|
||||||
|
}
|
||||||
|
|
||||||
|
// set_thread_context - can be called during the setup at the start of
|
||||||
|
// each worker callback, so that the worker callback can have some thread
|
||||||
|
// local storage area where it can write/read information that is private
|
||||||
|
// to the given thread, without worrying that it will get overwritten by
|
||||||
|
// another thread
|
||||||
|
pub fn (mut pool PoolProcessor) set_thread_context(idx int, context voidptr) {
|
||||||
|
pool.thread_contexts[idx] = context
|
||||||
|
}
|
||||||
|
|
||||||
|
// get_thread_context - returns a pointer, that was set with
|
||||||
|
// pool.set_thread_context . This pointer is private to each thread.
|
||||||
|
pub fn (pool &PoolProcessor) get_thread_context(idx int) voidptr {
|
||||||
|
return pool.thread_contexts[idx]
|
||||||
|
}
|
|
@ -0,0 +1,52 @@
|
||||||
|
import time
|
||||||
|
import sync.pool
|
||||||
|
|
||||||
|
pub struct SResult {
|
||||||
|
s string
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct IResult {
|
||||||
|
i int
|
||||||
|
}
|
||||||
|
|
||||||
|
fn worker_s(p &pool.PoolProcessor, idx int, worker_id int) &SResult {
|
||||||
|
item := p.get_item<string>(idx)
|
||||||
|
println('worker_s worker_id: $worker_id | idx: $idx | item: $item')
|
||||||
|
time.sleep_ms(3)
|
||||||
|
return &SResult{'$item $item'}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn worker_i(p &pool.PoolProcessor, idx int, worker_id int) &IResult {
|
||||||
|
item := p.get_item<int>(idx)
|
||||||
|
println('worker_i worker_id: $worker_id | idx: $idx | item: $item')
|
||||||
|
time.sleep_ms(5)
|
||||||
|
return &IResult{item * 1000}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn test_work_on_strings() {
|
||||||
|
mut pool_s := pool.new_pool_processor(
|
||||||
|
callback: worker_s
|
||||||
|
maxjobs: 8
|
||||||
|
)
|
||||||
|
|
||||||
|
pool_s.work_on_items(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j'])
|
||||||
|
for x in pool_s.get_results<SResult>() {
|
||||||
|
println(x.s)
|
||||||
|
assert x.s.len > 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn test_work_on_ints() {
|
||||||
|
// NB: since maxjobs is left empty here,
|
||||||
|
// the pool processor will use njobs = runtime.nr_jobs so that
|
||||||
|
// it will work optimally without overloading the system
|
||||||
|
mut pool_i := pool.new_pool_processor(
|
||||||
|
callback: worker_i
|
||||||
|
)
|
||||||
|
|
||||||
|
pool_i.work_on_items([1, 2, 3, 4, 5, 6, 7, 8])
|
||||||
|
for x in pool_i.get_results<IResult>() {
|
||||||
|
println(x.i)
|
||||||
|
assert x.i > 100
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,63 +0,0 @@
|
||||||
import sync
|
|
||||||
import time
|
|
||||||
|
|
||||||
fn worker_s(p &sync.PoolProcessor, idx int, worker_id int) voidptr {
|
|
||||||
// TODO: this works, but confuses vfmt. It should be used instead of
|
|
||||||
// p.get_int_item when vfmt becomes smarter.
|
|
||||||
// item := p.get_item<string>(idx)
|
|
||||||
item := p.get_string_item(idx)
|
|
||||||
println('worker_s worker_id: $worker_id | idx: $idx | item: ${item}')
|
|
||||||
time.sleep_ms(3)
|
|
||||||
return voidptr( &sync.SResult{ '${item} ${item}' } )
|
|
||||||
}
|
|
||||||
|
|
||||||
fn worker_i(p &sync.PoolProcessor, idx int, worker_id int) voidptr {
|
|
||||||
// TODO: this works, but confuses vfmt. See the comment above.
|
|
||||||
// item := p.get_item<int>(idx)
|
|
||||||
item := p.get_int_item(idx)
|
|
||||||
println('worker_i worker_id: $worker_id | idx: $idx | item: ${item}')
|
|
||||||
time.sleep_ms(5)
|
|
||||||
return voidptr( &sync.IResult{ item * 1000 } )
|
|
||||||
}
|
|
||||||
|
|
||||||
fn test_work_on_strings() {
|
|
||||||
mut pool_s := sync.new_pool_processor({
|
|
||||||
callback: worker_s
|
|
||||||
maxjobs: 8
|
|
||||||
})
|
|
||||||
|
|
||||||
// TODO: uncomment this when generics work again
|
|
||||||
//pool_s.work_on_items(['a','b','c','d','e','f','g','h','i','j'])
|
|
||||||
//for x in pool_s.get_results<SResult>() {
|
|
||||||
// println( x.s )
|
|
||||||
// assert x.s.len > 1
|
|
||||||
//}
|
|
||||||
|
|
||||||
pool_s.work_on_items_s(['a','b','c','d','e','f','g','h','i','j'])
|
|
||||||
for x in pool_s.get_results_s() {
|
|
||||||
println( x.s )
|
|
||||||
assert x.s.len > 1
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn test_work_on_ints() {
|
|
||||||
// NB: since maxjobs is left empty here,
|
|
||||||
// the pool processor will use njobs = runtime.nr_jobs so that
|
|
||||||
// it will work optimally without overloading the system
|
|
||||||
mut pool_i := sync.new_pool_processor({
|
|
||||||
callback: worker_i
|
|
||||||
})
|
|
||||||
|
|
||||||
// TODO: uncomment this when generics work again
|
|
||||||
//pool_i.work_on_items([1,2,3,4,5,6,7,8])
|
|
||||||
//for x in pool_i.get_results<IResult>() {
|
|
||||||
// println( x.i )
|
|
||||||
// assert x.i > 100
|
|
||||||
//}
|
|
||||||
|
|
||||||
pool_i.work_on_items_i([1,2,3,4,5,6,7,8])
|
|
||||||
for x in pool_i.get_results_i() {
|
|
||||||
println( x.i )
|
|
||||||
assert x.i > 100
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -3,7 +3,7 @@ module main
|
||||||
import os
|
import os
|
||||||
import v.tests.repl.runner
|
import v.tests.repl.runner
|
||||||
import benchmark
|
import benchmark
|
||||||
import sync
|
import sync.pool
|
||||||
|
|
||||||
const turn_off_vcolors = os.setenv('VCOLORS', 'never', true)
|
const turn_off_vcolors = os.setenv('VCOLORS', 'never', true)
|
||||||
|
|
||||||
|
@ -39,7 +39,7 @@ fn test_all_v_repl_files() {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
session.bmark.set_total_expected_steps(session.options.files.len)
|
session.bmark.set_total_expected_steps(session.options.files.len)
|
||||||
mut pool_repl := sync.new_pool_processor(
|
mut pool_repl := pool.new_pool_processor(
|
||||||
callback: worker_repl
|
callback: worker_repl
|
||||||
)
|
)
|
||||||
pool_repl.set_shared_context(session)
|
pool_repl.set_shared_context(session)
|
||||||
|
@ -47,12 +47,12 @@ fn test_all_v_repl_files() {
|
||||||
// See: https://docs.microsoft.com/en-us/cpp/build/reference/fs-force-synchronous-pdb-writes?view=vs-2019
|
// See: https://docs.microsoft.com/en-us/cpp/build/reference/fs-force-synchronous-pdb-writes?view=vs-2019
|
||||||
pool_repl.set_max_jobs(1)
|
pool_repl.set_max_jobs(1)
|
||||||
}
|
}
|
||||||
pool_repl.work_on_items_s(session.options.files)
|
pool_repl.work_on_items<string>(session.options.files)
|
||||||
session.bmark.stop()
|
session.bmark.stop()
|
||||||
println(session.bmark.total_message('total time spent running REPL files'))
|
println(session.bmark.total_message('total time spent running REPL files'))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn worker_repl(mut p sync.PoolProcessor, idx int, thread_id int) voidptr {
|
fn worker_repl(mut p pool.PoolProcessor, idx int, thread_id int) voidptr {
|
||||||
cdir := os.cache_dir()
|
cdir := os.cache_dir()
|
||||||
mut session := &Session(p.get_shared_context())
|
mut session := &Session(p.get_shared_context())
|
||||||
mut tls_bench := &benchmark.Benchmark(p.get_thread_context(idx))
|
mut tls_bench := &benchmark.Benchmark(p.get_thread_context(idx))
|
||||||
|
@ -67,7 +67,7 @@ fn worker_repl(mut p sync.PoolProcessor, idx int, thread_id int) voidptr {
|
||||||
os.rmdir_all(tfolder) or { panic(err) }
|
os.rmdir_all(tfolder) or { panic(err) }
|
||||||
}
|
}
|
||||||
os.mkdir(tfolder) or { panic(err) }
|
os.mkdir(tfolder) or { panic(err) }
|
||||||
file := p.get_string_item(idx)
|
file := p.get_item<string>(idx)
|
||||||
session.bmark.step()
|
session.bmark.step()
|
||||||
tls_bench.step()
|
tls_bench.step()
|
||||||
fres := runner.run_repl_file(tfolder, session.options.vexec, file) or {
|
fres := runner.run_repl_file(tfolder, session.options.vexec, file) or {
|
||||||
|
@ -76,12 +76,12 @@ fn worker_repl(mut p sync.PoolProcessor, idx int, thread_id int) voidptr {
|
||||||
os.rmdir_all(tfolder) or { panic(err) }
|
os.rmdir_all(tfolder) or { panic(err) }
|
||||||
eprintln(tls_bench.step_message_fail(err))
|
eprintln(tls_bench.step_message_fail(err))
|
||||||
assert false
|
assert false
|
||||||
return sync.no_result
|
return pool.no_result
|
||||||
}
|
}
|
||||||
session.bmark.ok()
|
session.bmark.ok()
|
||||||
tls_bench.ok()
|
tls_bench.ok()
|
||||||
os.rmdir_all(tfolder) or { panic(err) }
|
os.rmdir_all(tfolder) or { panic(err) }
|
||||||
println(tls_bench.step_message_ok(fres))
|
println(tls_bench.step_message_ok(fres))
|
||||||
assert true
|
assert true
|
||||||
return sync.no_result
|
return pool.no_result
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue