From 578de634fee5f805f0abbc829452bef0fa4b9fa6 Mon Sep 17 00:00:00 2001 From: Delyan Angelov Date: Thu, 11 Feb 2021 10:55:23 +0200 Subject: [PATCH] sync: move pool related code to `sync.pool`, cleanup, add a README.md --- cmd/tools/modules/testing/common.v | 26 +-- examples/news_fetcher.v | 22 +-- vlib/sync/pool.v | 254 ----------------------------- vlib/sync/pool/README.md | 36 ++++ vlib/sync/pool/pool.v | 172 +++++++++++++++++++ vlib/sync/pool/pool_test.v | 52 ++++++ vlib/sync/pool_test.v | 63 ------- vlib/v/tests/repl/repl_test.v | 14 +- 8 files changed, 292 insertions(+), 347 deletions(-) delete mode 100644 vlib/sync/pool.v create mode 100644 vlib/sync/pool/README.md create mode 100644 vlib/sync/pool/pool.v create mode 100644 vlib/sync/pool/pool_test.v delete mode 100644 vlib/sync/pool_test.v diff --git a/cmd/tools/modules/testing/common.v b/cmd/tools/modules/testing/common.v index 27889914bc..0634b91754 100644 --- a/cmd/tools/modules/testing/common.v +++ b/cmd/tools/modules/testing/common.v @@ -4,7 +4,7 @@ import os import time import term import benchmark -import sync +import sync.pool import v.pref import v.util.vtest @@ -198,7 +198,7 @@ pub fn (mut ts TestSession) test() { remaining_files = vtest.filter_vtest_only(remaining_files, fix_slashes: false) ts.files = remaining_files ts.benchmark.set_total_expected_steps(remaining_files.len) - mut pool_of_test_runners := sync.new_pool_processor(callback: worker_trunner) + mut pool_of_test_runners := pool.new_pool_processor(callback: worker_trunner) // for handling messages across threads ts.nmessages = chan LogMessage{cap: 10000} ts.nprint_ended = chan int{cap: 0} @@ -218,7 +218,7 @@ pub fn (mut ts TestSession) test() { } } -fn worker_trunner(mut p sync.PoolProcessor, idx int, thread_id int) voidptr { +fn worker_trunner(mut p pool.PoolProcessor, idx int, thread_id int) voidptr { mut ts := &TestSession(p.get_shared_context()) tmpd := ts.vtmp_dir show_stats := '-stats' in ts.vargs.split(' ') @@ -230,7 +230,7 @@ fn worker_trunner(mut p sync.PoolProcessor, idx int, thread_id int) voidptr { p.set_thread_context(idx, tls_bench) } tls_bench.no_cstep = true - dot_relative_file := p.get_string_item(idx) + dot_relative_file := p.get_item(idx) mut relative_file := dot_relative_file.replace('./', '') if ts.root_relative { relative_file = relative_file.replace(ts.vroot + os.path_separator, '') @@ -239,8 +239,11 @@ fn worker_trunner(mut p sync.PoolProcessor, idx int, thread_id int) voidptr { // Ensure that the generated binaries will be stored in the temporary folder. // Remove them after a test passes/fails. fname := os.file_name(file) - generated_binary_fname := if os.user_os() == 'windows' { fname.replace('.v', '.exe') } else { fname.replace('.v', - '') } + generated_binary_fname := if os.user_os() == 'windows' { + fname.replace('.v', '.exe') + } else { + fname.replace('.v', '') + } generated_binary_fpath := os.join_path(tmpd, generated_binary_fname) if os.exists(generated_binary_fpath) { if ts.rm_binaries { @@ -258,7 +261,7 @@ fn worker_trunner(mut p sync.PoolProcessor, idx int, thread_id int) voidptr { ts.benchmark.skip() tls_bench.skip() ts.append_message(.skip, tls_bench.step_message_skip(relative_file)) - return sync.no_result + return pool.no_result } if show_stats { ts.append_message(.ok, term.h_divider('-')) @@ -270,7 +273,7 @@ fn worker_trunner(mut p sync.PoolProcessor, idx int, thread_id int) voidptr { ts.failed = true ts.benchmark.fail() tls_bench.fail() - return sync.no_result + return pool.no_result } } else { if testing.show_start { @@ -281,7 +284,7 @@ fn worker_trunner(mut p sync.PoolProcessor, idx int, thread_id int) voidptr { ts.benchmark.fail() tls_bench.fail() ts.append_message(.fail, tls_bench.step_message_fail(relative_file)) - return sync.no_result + return pool.no_result } if r.exit_code != 0 { ts.failed = true @@ -300,7 +303,7 @@ fn worker_trunner(mut p sync.PoolProcessor, idx int, thread_id int) voidptr { os.rm(generated_binary_fpath) or { panic(err) } } } - return sync.no_result + return pool.no_result } pub fn vlib_should_be_present(parent_dir string) { @@ -343,8 +346,7 @@ pub fn prepare_test_session(zargs string, folder string, oskipped []string, main } $if windows { // skip pico and process/command examples on windows - if f.ends_with('examples\\pico\\pico.v') - || f.ends_with('examples\\process\\command.v') { + if f.ends_with('examples\\pico\\pico.v') || f.ends_with('examples\\process\\command.v') { continue } } diff --git a/examples/news_fetcher.v b/examples/news_fetcher.v index 19d42068d4..e724fbad99 100644 --- a/examples/news_fetcher.v +++ b/examples/news_fetcher.v @@ -3,25 +3,25 @@ // that can be found in the LICENSE file. import net.http import json -import sync +import sync.pool struct Story { title string url string } -fn worker_fetch(p &sync.PoolProcessor, cursor int, worker_id int) voidptr { - id := p.get_int_item(cursor) +fn worker_fetch(p &pool.PoolProcessor, cursor int, worker_id int) voidptr { + id := p.get_item(cursor) resp := http.get('https://hacker-news.firebaseio.com/v0/item/${id}.json') or { println('failed to fetch data from /v0/item/${id}.json') - return sync.no_result + return pool.no_result } - story := json.decode(Story,resp.text) or { + story := json.decode(Story, resp.text) or { println('failed to decode a story') - return sync.no_result + return pool.no_result } println('# $cursor) $story.title | $story.url') - return sync.no_result + return pool.no_result } // Fetches top HN stories in parallel, depending on how many cores you have @@ -30,20 +30,20 @@ fn main() { println('failed to fetch data from /v0/topstories.json') return } - mut ids := json.decode([]int,resp.text) or { + mut ids := json.decode([]int, resp.text) or { println('failed to decode topstories.json') return } if ids.len > 10 { ids = ids[0..10] } - mut fetcher_pool := sync.new_pool_processor({ + mut fetcher_pool := pool.new_pool_processor( callback: worker_fetch - }) + ) // NB: if you do not call set_max_jobs, the pool will try to use an optimal // number of threads, one per each core in your system, which in most // cases is what you want anyway... You can override the automatic choice // by setting the VJOBS environment variable too. // fetcher_pool.set_max_jobs( 4 ) - fetcher_pool.work_on_items_i(ids) + fetcher_pool.work_on_items(ids) } diff --git a/vlib/sync/pool.v b/vlib/sync/pool.v deleted file mode 100644 index 06ea156a12..0000000000 --- a/vlib/sync/pool.v +++ /dev/null @@ -1,254 +0,0 @@ -module sync - -import runtime - -// * Goal: this file provides a convenient way to run identical tasks over a list -// * of items in parallel, without worrying about waitgroups, mutexes and so on. -// * -// * Usage example: -// * struct SResult{ s string } -// * fn sprocess(p &sync.PoolProcessor, idx, wid int) voidptr { -// * item := p.get_item(idx) -// * println('idx: $idx, wid: $wid, item: ' + item) -// * return &SResult{ item.reverse() } -// * } -// * pool := sync.new_pool_processor({ callback: sprocess }) -// * pool.work_on_items(['a','b','c','d','e','f','g']) -// * // optionally, you can iterate over the results too: -// * for x in pool.get_results() { -// * println('result: $x.s') -// * } -// * -// * See https://github.com/vlang/v/blob/master/vlib/sync/pool_test.v for a -// * more detailed usage example. -// * -// * After all the work is done in parallel by the worker threads in the pool, -// * pool.work_on_items will return, and you can then call -// * pool.get_results() to retrieve a list of all the results, -// * that the worker callbacks returned for each item that you passed. -// * The parameters of new_pool_processor are: -// * context.maxjobs: when 0 (the default), the PoolProcessor will use an -// * optimal for your system number of threads to process your items -// * context.callback: this should be a callback function, that each worker -// * thread in the pool will run for each item. -// * The callback function will receive as parameters: -// * 1) the PoolProcessor instance, so it can call -// * p.get_item(idx) to get the actual item at index idx -// * NB: for now, you are better off calling p.get_string_item(idx) -// * or p.get_int_item(idx) ; TODO: vfmt and generics -// * 2) idx - the index of the currently processed item -// * 3) task_id - the index of the worker thread in which the callback -// * function is running. - -pub const ( - no_result = voidptr(0) -) - -pub struct PoolProcessor { - thread_cb voidptr -mut: - njobs int - items []voidptr - results []voidptr - ntask int // writing to this should be locked by ntask_mtx. - ntask_mtx &Mutex - waitgroup &WaitGroup - shared_context voidptr - thread_contexts []voidptr -} - -pub type ThreadCB = fn (p &PoolProcessor, idx int, task_id int) voidptr - -pub struct PoolProcessorConfig { - maxjobs int - callback ThreadCB -} - -// new_pool_processor returns a new PoolProcessor instance. -pub fn new_pool_processor(context PoolProcessorConfig) &PoolProcessor { - if isnil(context.callback) { - panic('You need to pass a valid callback to new_pool_processor.') - } - // TODO: remove this call. - // It prevents a V warning about unused module runtime. - runtime.nr_jobs() - pool := &PoolProcessor { - items: [] - results: [] - shared_context: voidptr(0) - thread_contexts: [] - njobs: context.maxjobs - ntask: 0 - ntask_mtx: new_mutex() - waitgroup: new_waitgroup() - thread_cb: voidptr(context.callback) - } - return pool -} - -// set_max_jobs gives you the ability to override the number -// of jobs *after* the PoolProcessor had been created already. -pub fn (mut pool PoolProcessor) set_max_jobs(njobs int) { - pool.njobs = njobs -} - -// work_on_items receives a list of items of type T, -// then starts a work pool of pool.njobs threads, each running -// pool.thread_cb in a loop, untill all items in the list, -// are processed. -// When pool.njobs is 0, the number of jobs is determined -// by the number of available cores on the system. -// work_on_items returns *after* all threads finish. -// You can optionally call get_results after that. -// TODO: uncomment, when generics work again -//pub fn (mut pool PoolProcessor) work_on_items(items []T) { -// pool.work_on_pointers( items.pointers() ) -//} - -pub fn (mut pool PoolProcessor) work_on_pointers(items []voidptr) { - mut njobs := runtime.nr_jobs() - if pool.njobs > 0 { - njobs = pool.njobs - } - pool.items = [] - pool.results = [] - pool.thread_contexts = [] - pool.items << items - pool.results = []voidptr{len:(pool.items.len)} - pool.thread_contexts << []voidptr{len:(pool.items.len)} - pool.waitgroup.add(njobs) - for i := 0; i < njobs; i++ { - if njobs > 1 { - go process_in_thread(mut pool,i) - } else { - // do not run concurrently, just use the same thread: - process_in_thread(mut pool,i) - } - } - pool.waitgroup.wait() -} - -// process_in_thread does the actual work of worker thread. -// It is a workaround for the current inability to pass a -// method in a callback. -fn process_in_thread(mut pool PoolProcessor, task_id int) { - cb := ThreadCB(pool.thread_cb) - mut idx := 0 - ilen := pool.items.len - for { - if pool.ntask >= ilen { - break - } - pool.ntask_mtx.@lock() - idx = pool.ntask - pool.ntask++ - pool.ntask_mtx.unlock() - if idx >= ilen { - break - } - pool.results[idx] = cb(pool, idx, task_id) - } - pool.waitgroup.done() -} - -// get_item - called by the worker callback. -// Retrieves a type safe instance of the currently processed item -// TODO: uncomment, when generics work again -//pub fn (pool &PoolProcessor) get_item(idx int) T { -// return *(&T(pool.items[idx])) -//} - -// get_string_item - called by the worker callback. -// It does not use generics so it does not mess up vfmt. -// TODO: remove the need for this when vfmt becomes smarter. -pub fn (pool &PoolProcessor) get_string_item(idx int) string { - // return *(&string(pool.items[idx])) - // TODO: the below is a hack, remove it when v2 casting works again - return *unsafe {&string( pool.items[idx] )} -} - -// get_int_item - called by the worker callback. -// It does not use generics so it does not mess up vfmt. -// TODO: remove the need for this when vfmt becomes smarter. -pub fn (pool &PoolProcessor) get_int_item(idx int) int { - item := pool.items[idx] - return *unsafe {&int(item)} -} - -// TODO: uncomment, when generics work again -//pub fn (pool &PoolProcessor) get_result(idx int) T { -// return *(&T(pool.results[idx])) -//} - -// TODO: uncomment, when generics work again -// get_results - can be called to get a list of type safe results. -//pub fn (pool &PoolProcessor) get_results() []T { -// mut res := []T{} -// for i in 0 .. pool.results.len { -// res << *(&T(pool.results[i])) -// } -// return res -//} - -// set_shared_context - can be called during the setup so that you can -// provide a context that is shared between all worker threads, like -// common options/settings. -pub fn (mut pool PoolProcessor) set_shared_context(context voidptr) { - pool.shared_context = context -} - -// get_shared_context - can be called in each worker callback, to get -// the context set by pool.set_shared_context -pub fn (pool &PoolProcessor) get_shared_context() voidptr { - return pool.shared_context -} - -// set_thread_context - can be called during the setup at the start of -// each worker callback, so that the worker callback can have some thread -// local storage area where it can write/read information that is private -// to the given thread, without worrying that it will get overwritten by -// another thread -pub fn (mut pool PoolProcessor) set_thread_context(idx int, context voidptr) { - pool.thread_contexts[idx] = context -} - -// get_thread_context - returns a pointer, that was set with -// pool.set_thread_context . This pointer is private to each thread. -pub fn (pool &PoolProcessor) get_thread_context(idx int) voidptr { - return pool.thread_contexts[idx] -} - -// TODO: remove everything below this line after generics are fixed: -pub struct SResult { -pub: - s string -} -pub struct IResult { -pub: - i int -} - -// - -pub fn (mut pool PoolProcessor) work_on_items_s(items []string) { - pool.work_on_pointers( items.pointers() ) -} - -pub fn (mut pool PoolProcessor) work_on_items_i(items []int) { - pool.work_on_pointers( items.pointers() ) -} - -pub fn (pool &PoolProcessor) get_results_s() []SResult { - mut res := []SResult{} - for i in 0 .. pool.results.len { - res << *unsafe {&SResult(pool.results[i])} - } - return res -} -pub fn (pool &PoolProcessor) get_results_i() []IResult { - mut res := []IResult{} - for i in 0 .. pool.results.len { - res << *unsafe {&IResult(pool.results[i])} - } - return res -} diff --git a/vlib/sync/pool/README.md b/vlib/sync/pool/README.md new file mode 100644 index 0000000000..bdea5b37c7 --- /dev/null +++ b/vlib/sync/pool/README.md @@ -0,0 +1,36 @@ + +The `sync.pool` module provides a convenient way to run identical tasks over +an array of items *in parallel*, without worrying about thread synchronization, +waitgroups, mutexes etc.., you just need to supply a callback function, that +will be called once per each item in your input array. + +After all the work is done in parallel by the worker threads in the pool, +pool.work_on_items will return. You can then call pool.get_results() +to retrieve a list of all the results, that the worker callbacks returned +for each input item. Example: + +```v +import sync.pool + +struct SResult { + s string +} + +fn sprocess(pp &pool.PoolProcessor, idx int, wid int) &SResult { + item := pp.get_item(idx) + println('idx: $idx, wid: $wid, item: ' + item) + return &SResult{item.reverse()} +} + +fn main() { + mut pp := pool.new_pool_processor(callback: sprocess) + pp.work_on_items(['1abc', '2abc', '3abc', '4abc', '5abc', '6abc', '7abc']) + // optionally, you can iterate over the results too: + for x in pp.get_results() { + println('result: $x.s') + } +} +``` + +See https://github.com/vlang/v/blob/master/vlib/sync/pool/pool_test.v for a +more detailed usage example. diff --git a/vlib/sync/pool/pool.v b/vlib/sync/pool/pool.v new file mode 100644 index 0000000000..594d1ba484 --- /dev/null +++ b/vlib/sync/pool/pool.v @@ -0,0 +1,172 @@ +module pool + +import sync + +import runtime + +pub const ( + no_result = voidptr(0) +) + +pub struct PoolProcessor { + thread_cb voidptr +mut: + njobs int + items []voidptr + results []voidptr + ntask int // writing to this should be locked by ntask_mtx. + ntask_mtx &sync.Mutex + waitgroup &sync.WaitGroup + shared_context voidptr + thread_contexts []voidptr +} + +pub type ThreadCB = fn (p &PoolProcessor, idx int, task_id int) voidptr + +pub struct PoolProcessorConfig { + maxjobs int + callback ThreadCB +} + +// new_pool_processor returns a new PoolProcessor instance. +// The parameters of new_pool_processor are: +// context.maxjobs: when 0 (the default), the PoolProcessor will use a +// number of threads, that is optimal for your system to process your items. +// context.callback: this should be a callback function, that each worker +// thread in the pool will run for each item. +// The callback function will receive as parameters: +// 1) the PoolProcessor instance, so it can call +// p.get_item(idx) to get the actual item at index idx +// 2) idx - the index of the currently processed item +// 3) task_id - the index of the worker thread in which the callback +// function is running. +pub fn new_pool_processor(context PoolProcessorConfig) &PoolProcessor { + if isnil(context.callback) { + panic('You need to pass a valid callback to new_pool_processor.') + } + pool := &PoolProcessor { + items: [] + results: [] + shared_context: voidptr(0) + thread_contexts: [] + njobs: context.maxjobs + ntask: 0 + ntask_mtx: sync.new_mutex() + waitgroup: sync.new_waitgroup() + thread_cb: voidptr(context.callback) + } + return pool +} + +// set_max_jobs gives you the ability to override the number +// of jobs *after* the PoolProcessor had been created already. +pub fn (mut pool PoolProcessor) set_max_jobs(njobs int) { + pool.njobs = njobs +} + +// work_on_items receives a list of items of type T, +// then starts a work pool of pool.njobs threads, each running +// pool.thread_cb in a loop, untill all items in the list, +// are processed. +// When pool.njobs is 0, the number of jobs is determined +// by the number of available cores on the system. +// work_on_items returns *after* all threads finish. +// You can optionally call get_results after that. +pub fn (mut pool PoolProcessor) work_on_items(items []T) { + pool.work_on_pointers( items.pointers() ) +} + +pub fn (mut pool PoolProcessor) work_on_pointers(items []voidptr) { + mut njobs := runtime.nr_jobs() + if pool.njobs > 0 { + njobs = pool.njobs + } + pool.items = [] + pool.results = [] + pool.thread_contexts = [] + pool.items << items + pool.results = []voidptr{len:(pool.items.len)} + pool.thread_contexts << []voidptr{len:(pool.items.len)} + pool.waitgroup.add(njobs) + for i := 0; i < njobs; i++ { + if njobs > 1 { + go process_in_thread(mut pool,i) + } else { + // do not run concurrently, just use the same thread: + process_in_thread(mut pool,i) + } + } + pool.waitgroup.wait() +} + +// process_in_thread does the actual work of worker thread. +// It is a workaround for the current inability to pass a +// method in a callback. +fn process_in_thread(mut pool PoolProcessor, task_id int) { + cb := ThreadCB(pool.thread_cb) + mut idx := 0 + ilen := pool.items.len + for { + if pool.ntask >= ilen { + break + } + pool.ntask_mtx.@lock() + idx = pool.ntask + pool.ntask++ + pool.ntask_mtx.unlock() + if idx >= ilen { + break + } + pool.results[idx] = cb(pool, idx, task_id) + } + pool.waitgroup.done() +} + +// get_item - called by the worker callback. +// Retrieves a type safe instance of the currently processed item +pub fn (pool &PoolProcessor) get_item(idx int) T { + return *(&T(pool.items[idx])) +} + +// get_result - called by the main thread to get a specific result. +// Retrieves a type safe instance of the produced result. +pub fn (pool &PoolProcessor) get_result(idx int) T { + return *(&T(pool.results[idx])) +} + +// get_results - get a list of type safe results in the main thread. +pub fn (pool &PoolProcessor) get_results() []T { + mut res := []T{} + for i in 0 .. pool.results.len { + res << *(&T(pool.results[i])) + } + return res +} + +// set_shared_context - can be called during the setup so that you can +// provide a context that is shared between all worker threads, like +// common options/settings. +pub fn (mut pool PoolProcessor) set_shared_context(context voidptr) { + pool.shared_context = context +} + +// get_shared_context - can be called in each worker callback, to get +// the context set by pool.set_shared_context +pub fn (pool &PoolProcessor) get_shared_context() voidptr { + return pool.shared_context +} + +// set_thread_context - can be called during the setup at the start of +// each worker callback, so that the worker callback can have some thread +// local storage area where it can write/read information that is private +// to the given thread, without worrying that it will get overwritten by +// another thread +pub fn (mut pool PoolProcessor) set_thread_context(idx int, context voidptr) { + pool.thread_contexts[idx] = context +} + +// get_thread_context - returns a pointer, that was set with +// pool.set_thread_context . This pointer is private to each thread. +pub fn (pool &PoolProcessor) get_thread_context(idx int) voidptr { + return pool.thread_contexts[idx] +} diff --git a/vlib/sync/pool/pool_test.v b/vlib/sync/pool/pool_test.v new file mode 100644 index 0000000000..17da78d347 --- /dev/null +++ b/vlib/sync/pool/pool_test.v @@ -0,0 +1,52 @@ +import time +import sync.pool + +pub struct SResult { + s string +} + +pub struct IResult { + i int +} + +fn worker_s(p &pool.PoolProcessor, idx int, worker_id int) &SResult { + item := p.get_item(idx) + println('worker_s worker_id: $worker_id | idx: $idx | item: $item') + time.sleep_ms(3) + return &SResult{'$item $item'} +} + +fn worker_i(p &pool.PoolProcessor, idx int, worker_id int) &IResult { + item := p.get_item(idx) + println('worker_i worker_id: $worker_id | idx: $idx | item: $item') + time.sleep_ms(5) + return &IResult{item * 1000} +} + +fn test_work_on_strings() { + mut pool_s := pool.new_pool_processor( + callback: worker_s + maxjobs: 8 + ) + + pool_s.work_on_items(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']) + for x in pool_s.get_results() { + println(x.s) + assert x.s.len > 1 + } +} + +fn test_work_on_ints() { + // NB: since maxjobs is left empty here, + // the pool processor will use njobs = runtime.nr_jobs so that + // it will work optimally without overloading the system + mut pool_i := pool.new_pool_processor( + callback: worker_i + ) + + pool_i.work_on_items([1, 2, 3, 4, 5, 6, 7, 8]) + for x in pool_i.get_results() { + println(x.i) + assert x.i > 100 + } +} diff --git a/vlib/sync/pool_test.v b/vlib/sync/pool_test.v deleted file mode 100644 index 4a65a13d28..0000000000 --- a/vlib/sync/pool_test.v +++ /dev/null @@ -1,63 +0,0 @@ -import sync -import time - -fn worker_s(p &sync.PoolProcessor, idx int, worker_id int) voidptr { - // TODO: this works, but confuses vfmt. It should be used instead of - // p.get_int_item when vfmt becomes smarter. - // item := p.get_item(idx) - item := p.get_string_item(idx) - println('worker_s worker_id: $worker_id | idx: $idx | item: ${item}') - time.sleep_ms(3) - return voidptr( &sync.SResult{ '${item} ${item}' } ) -} - -fn worker_i(p &sync.PoolProcessor, idx int, worker_id int) voidptr { - // TODO: this works, but confuses vfmt. See the comment above. - // item := p.get_item(idx) - item := p.get_int_item(idx) - println('worker_i worker_id: $worker_id | idx: $idx | item: ${item}') - time.sleep_ms(5) - return voidptr( &sync.IResult{ item * 1000 } ) -} - -fn test_work_on_strings() { - mut pool_s := sync.new_pool_processor({ - callback: worker_s - maxjobs: 8 - }) - - // TODO: uncomment this when generics work again - //pool_s.work_on_items(['a','b','c','d','e','f','g','h','i','j']) - //for x in pool_s.get_results() { - // println( x.s ) - // assert x.s.len > 1 - //} - - pool_s.work_on_items_s(['a','b','c','d','e','f','g','h','i','j']) - for x in pool_s.get_results_s() { - println( x.s ) - assert x.s.len > 1 - } -} - -fn test_work_on_ints() { - // NB: since maxjobs is left empty here, - // the pool processor will use njobs = runtime.nr_jobs so that - // it will work optimally without overloading the system - mut pool_i := sync.new_pool_processor({ - callback: worker_i - }) - - // TODO: uncomment this when generics work again - //pool_i.work_on_items([1,2,3,4,5,6,7,8]) - //for x in pool_i.get_results() { - // println( x.i ) - // assert x.i > 100 - //} - - pool_i.work_on_items_i([1,2,3,4,5,6,7,8]) - for x in pool_i.get_results_i() { - println( x.i ) - assert x.i > 100 - } -} diff --git a/vlib/v/tests/repl/repl_test.v b/vlib/v/tests/repl/repl_test.v index a285da7396..7f277ce1b8 100644 --- a/vlib/v/tests/repl/repl_test.v +++ b/vlib/v/tests/repl/repl_test.v @@ -3,7 +3,7 @@ module main import os import v.tests.repl.runner import benchmark -import sync +import sync.pool const turn_off_vcolors = os.setenv('VCOLORS', 'never', true) @@ -39,7 +39,7 @@ fn test_all_v_repl_files() { panic(err) } session.bmark.set_total_expected_steps(session.options.files.len) - mut pool_repl := sync.new_pool_processor( + mut pool_repl := pool.new_pool_processor( callback: worker_repl ) pool_repl.set_shared_context(session) @@ -47,12 +47,12 @@ fn test_all_v_repl_files() { // See: https://docs.microsoft.com/en-us/cpp/build/reference/fs-force-synchronous-pdb-writes?view=vs-2019 pool_repl.set_max_jobs(1) } - pool_repl.work_on_items_s(session.options.files) + pool_repl.work_on_items(session.options.files) session.bmark.stop() println(session.bmark.total_message('total time spent running REPL files')) } -fn worker_repl(mut p sync.PoolProcessor, idx int, thread_id int) voidptr { +fn worker_repl(mut p pool.PoolProcessor, idx int, thread_id int) voidptr { cdir := os.cache_dir() mut session := &Session(p.get_shared_context()) mut tls_bench := &benchmark.Benchmark(p.get_thread_context(idx)) @@ -67,7 +67,7 @@ fn worker_repl(mut p sync.PoolProcessor, idx int, thread_id int) voidptr { os.rmdir_all(tfolder) or { panic(err) } } os.mkdir(tfolder) or { panic(err) } - file := p.get_string_item(idx) + file := p.get_item(idx) session.bmark.step() tls_bench.step() fres := runner.run_repl_file(tfolder, session.options.vexec, file) or { @@ -76,12 +76,12 @@ fn worker_repl(mut p sync.PoolProcessor, idx int, thread_id int) voidptr { os.rmdir_all(tfolder) or { panic(err) } eprintln(tls_bench.step_message_fail(err)) assert false - return sync.no_result + return pool.no_result } session.bmark.ok() tls_bench.ok() os.rmdir_all(tfolder) or { panic(err) } println(tls_bench.step_message_ok(fres)) assert true - return sync.no_result + return pool.no_result }