sync: implement pool.work_on_items to process a list of items in parallel

pull/3937/head
Alexander Medvednikov 2020-03-04 20:28:42 +01:00 committed by GitHub
parent 136aa763a3
commit b0ece3a9d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 423 additions and 215 deletions

View File

@ -5,22 +5,17 @@ import (
term
benchmark
filepath
runtime
sync
v.pref
)
pub struct TestSession {
pub mut:
files []string
vexe string
vargs string
failed bool
benchmark benchmark.Benchmark
ntask int // writing to this should be locked by mu.
ntask_mtx &sync.Mutex
waitgroup &sync.WaitGroup
files []string
vexe string
vargs string
failed bool
benchmark benchmark.Benchmark
show_ok_tests bool
}
@ -28,11 +23,6 @@ pub fn new_test_session(vargs string) TestSession {
return TestSession{
vexe: pref.vexe_path()
vargs: vargs
ntask: 0
ntask_mtx: sync.new_mutex()
waitgroup: sync.new_waitgroup()
show_ok_tests: !vargs.contains('-silent')
}
}
@ -69,109 +59,96 @@ pub fn (ts mut TestSession) test() {
}
remaining_files << dot_relative_file
}
ts.files = remaining_files
ts.benchmark.set_total_expected_steps(remaining_files.len)
mut njobs := runtime.nr_jobs()
mut pool_of_test_runners := sync.new_pool_processor({
callback: worker_trunner
})
pool_of_test_runners.set_shared_context(ts)
$if msvc {
// NB: MSVC can not be launched in parallel, without giving it
// the option /FS because it uses a shared PDB file, which should
// be locked, but that makes writing slower...
// See: https://docs.microsoft.com/en-us/cpp/build/reference/fs-force-synchronous-pdb-writes?view=vs-2019
// Instead, just run tests on 1 core for now.
njobs = 1
pool_of_test_runners.set_max_jobs(1)
}
ts.waitgroup.add( njobs )
for i:=0; i < njobs; i++ {
go process_in_thread(ts)
}
ts.waitgroup.wait()
pool_of_test_runners.work_on_pointers(remaining_files.pointers())
ts.benchmark.stop()
eprintln(term.h_divider('-'))
}
fn process_in_thread(ts mut TestSession){
ts.process_files()
ts.waitgroup.done()
}
fn (ts mut TestSession) process_files() {
fn worker_trunner(p mut sync.PoolProcessor, idx int, thread_id int) voidptr {
mut ts := &TestSession(p.get_shared_context())
tmpd := os.tmpdir()
show_stats := '-stats' in ts.vargs.split(' ')
mut tls_bench := benchmark.new_benchmark() // tls_bench is used to format the step messages/timings
tls_bench.set_total_expected_steps( ts.benchmark.nexpected_steps )
for {
ts.ntask_mtx.lock()
ts.ntask++
idx := ts.ntask-1
ts.ntask_mtx.unlock()
if idx >= ts.files.len { break }
tls_bench.cstep = idx
dot_relative_file := ts.files[ idx ]
relative_file := dot_relative_file.replace('./', '')
file := os.realpath(relative_file)
// Ensure that the generated binaries will be stored in the temporary folder.
// Remove them after a test passes/fails.
fname := filepath.filename(file)
generated_binary_fname := if os.user_os() == 'windows' { fname.replace('.v', '.exe') } else { fname.replace('.v', '') }
generated_binary_fpath := filepath.join(tmpd,generated_binary_fname)
if os.exists(generated_binary_fpath) {
os.rm(generated_binary_fpath)
}
mut cmd_options := [ts.vargs]
if !ts.vargs.contains('fmt') {
cmd_options << ' -o "$generated_binary_fpath"'
}
cmd := '"${ts.vexe}" ' + cmd_options.join(' ') + ' "${file}"'
// eprintln('>>> v cmd: $cmd')
ts.benchmark.step()
tls_bench.step()
if show_stats {
eprintln(term.h_divider('-'))
status := os.system(cmd)
if status == 0 {
ts.benchmark.ok()
tls_bench.ok()
}
else {
ts.failed = true
ts.benchmark.fail()
tls_bench.fail()
continue
}
// tls_bench is used to format the step messages/timings
mut tls_bench := &benchmark.Benchmark(p.get_thread_context(idx))
if isnil(tls_bench) {
tls_bench = benchmark.new_benchmark_pointer()
tls_bench.set_total_expected_steps(ts.benchmark.nexpected_steps)
p.set_thread_context(idx, tls_bench)
}
tls_bench.cstep = idx
dot_relative_file := p.get_string_item(idx)
relative_file := dot_relative_file.replace('./', '')
file := os.realpath(relative_file)
// Ensure that the generated binaries will be stored in the temporary folder.
// Remove them after a test passes/fails.
fname := filepath.filename(file)
generated_binary_fname := if os.user_os() == 'windows' { fname.replace('.v', '.exe') } else { fname.replace('.v', '') }
generated_binary_fpath := filepath.join(tmpd,generated_binary_fname)
if os.exists(generated_binary_fpath) {
os.rm(generated_binary_fpath)
}
mut cmd_options := [ts.vargs]
if !ts.vargs.contains('fmt') {
cmd_options << ' -o "$generated_binary_fpath"'
}
cmd := '"${ts.vexe}" ' + cmd_options.join(' ') + ' "${file}"'
// eprintln('>>> v cmd: $cmd')
ts.benchmark.step()
tls_bench.step()
if show_stats {
eprintln(term.h_divider('-'))
status := os.system(cmd)
if status == 0 {
ts.benchmark.ok()
tls_bench.ok()
}
else {
r := os.exec(cmd) or {
ts.failed = true
ts.benchmark.fail()
tls_bench.fail()
eprintln(tls_bench.step_message_fail(relative_file))
continue
}
if r.exit_code != 0 {
ts.failed = true
ts.benchmark.fail()
tls_bench.fail()
eprintln(tls_bench.step_message_fail('${relative_file}\n`$file`\n (\n$r.output\n)'))
}
else {
ts.benchmark.ok()
tls_bench.ok()
if ts.show_ok_tests {
eprintln(tls_bench.step_message_ok(relative_file))
}
}
}
if os.exists(generated_binary_fpath) {
os.rm(generated_binary_fpath)
ts.failed = true
ts.benchmark.fail()
tls_bench.fail()
return sync.no_result
}
}
else {
r := os.exec(cmd) or {
ts.failed = true
ts.benchmark.fail()
tls_bench.fail()
eprintln(tls_bench.step_message_fail(relative_file))
return sync.no_result
}
if r.exit_code != 0 {
ts.failed = true
ts.benchmark.fail()
tls_bench.fail()
eprintln(tls_bench.step_message_fail('${relative_file}\n`$file`\n (\n$r.output\n)'))
}
else {
ts.benchmark.ok()
tls_bench.ok()
if ts.show_ok_tests {
eprintln(tls_bench.step_message_ok(relative_file))
}
}
}
if os.exists(generated_binary_fpath) {
os.rm(generated_binary_fpath)
}
return sync.no_result
}
pub fn vlib_should_be_present(parent_dir string) {
@ -193,17 +170,17 @@ pub fn v_build_failing(zargs string, folder string) bool {
eprintln('v compiler args: "$vargs"')
mut session := new_test_session(vargs)
files := os.walk_ext(filepath.join(parent_dir,folder), '.v')
mut mains := files.filter(!it.contains('modules') && !it.contains('preludes'))
$if windows {
// skip pico example on windows
// there was a bug using filter here
mut mains_filtered := []string
for file in mains {
if !file.ends_with('examples\\pico\\pico.v') {
mains_filtered << file
mut mains := []string
for f in files {
if !f.contains('modules') && !f.contains('preludes') {
$if windows {
// skip pico example on windows
if f.ends_with('examples\\pico\\pico.v') {
continue
}
}
mains << f
}
mains = mains_filtered
}
session.files << mains
session.test()
@ -257,9 +234,9 @@ pub fn building_any_v_binaries_failed() bool {
}
pub fn eheader(msg string) {
eprintln(term.header(msg,'-'))
eprintln(term.header(msg, '-'))
}
pub fn header(msg string) {
println(term.header(msg,'-'))
println(term.header(msg, '-'))
}

View File

@ -5,47 +5,26 @@ import net.http
import json
import sync
const (
nr_threads = 4
)
struct Story {
title string
url string
}
struct Fetcher {
mut:
mu &sync.Mutex
ids []int
cursor int
wg &sync.WaitGroup
}
fn (f mut Fetcher) fetch() {
for {
if f.cursor >= f.ids.len {
return
}
id := f.ids[f.cursor]
f.mu.lock()
f.cursor++
f.mu.unlock()
cursor := f.cursor
resp := http.get('https://hacker-news.firebaseio.com/v0/item/${id}.json') or {
println('failed to fetch data from /v0/item/${id}.json')
exit(1)
}
story := json.decode(Story,resp.text) or {
println('failed to decode a story')
exit(1)
}
println('#$cursor) $story.title | $story.url')
f.wg.done()
fn worker_fetch(p &sync.PoolProcessor, cursor int, worker_id int) voidptr {
id := p.get_item<int>(cursor)
resp := http.get('https://hacker-news.firebaseio.com/v0/item/${id}.json') or {
println('failed to fetch data from /v0/item/${id}.json')
return sync.no_result
}
story := json.decode(Story,resp.text) or {
println('failed to decode a story')
return sync.no_result
}
println('# $cursor) $story.title | $story.url')
return sync.no_result
}
// Fetches top HN stories in 4 coroutines
// Fetches top HN stories in parallel, depending on how many cores you have
fn main() {
resp := http.get('https://hacker-news.firebaseio.com/v0/topstories.json') or {
println('failed to fetch data from /v0/topstories.json')
@ -56,22 +35,15 @@ fn main() {
return
}
if ids.len > 10 {
// ids = ids[:10]
mut tmp := [0].repeat(10)
for i in 0..10 {
tmp[i] = ids[i]
}
ids = tmp
ids = ids[0..10]
}
mut fetcher := &Fetcher{
ids: ids
mu: sync.new_mutex()
wg: sync.new_waitgroup()
}
fetcher.wg.add(ids.len)
for i in 0..nr_threads {
go fetcher.fetch()
}
fetcher.wg.wait()
mut fetcher_pool := sync.new_pool_processor({
callback: worker_fetch
})
// NB: if you do not call set_max_jobs, the pool will try to use an optimal
// number of threads, one per each core in your system, which in most
// cases is what you want anyway... You can override the automatic choice
// by setting the VJOBS environment variable too.
// fetcher_pool.set_max_jobs( 4 )
fetcher_pool.work_on_items<int>(ids)
}

View File

@ -74,6 +74,13 @@ pub fn new_benchmark() Benchmark {
}
}
pub fn new_benchmark_pointer() &Benchmark {
return &Benchmark{
bench_start_time: benchmark.now()
verbose: true
}
}
pub fn (b mut Benchmark) set_total_expected_steps(n int) {
b.nexpected_steps = n
}

View File

@ -577,3 +577,13 @@ pub fn compare_f32(a, b &f32) int {
}
return 0
}
// a.pointers() returns a new array, where each element
// is the address of the corresponding element in a.
pub fn (a array) pointers() []voidptr {
mut res := []voidptr
for i in 0..a.len {
res << a.data + i * a.element_size
}
return res
}

View File

@ -3,7 +3,6 @@ module main
import os
import compiler.tests.repl.runner
import benchmark
import runtime
import sync
import filepath
@ -28,77 +27,65 @@ fn test_the_v_compiler_can_be_invoked() {
struct Session {
mut:
options runner.RunnerOptions
bmark benchmark.Benchmark
ntask int
ntask_mtx &sync.Mutex
waitgroup &sync.WaitGroup
options runner.RunnerOptions
bmark benchmark.Benchmark
}
fn test_all_v_repl_files() {
mut session := &Session{
options: runner.new_options()
bmark: benchmark.new_benchmark()
ntask: 0
ntask_mtx: sync.new_mutex()
waitgroup: sync.new_waitgroup()
}
// warmup, and ensure that the vrepl is compiled in single threaded mode if it does not exist
runner.run_repl_file(os.cachedir(), session.options.vexec, 'vlib/compiler/tests/repl/nothing.repl') or {
panic(err)
}
session.bmark.set_total_expected_steps( session.options.files.len )
mut ncpus := 0
ncpus = runtime.nr_cpus()
session.bmark.set_total_expected_steps(session.options.files.len)
mut pool_repl := sync.new_pool_processor({
callback: worker_repl
})
pool_repl.set_shared_context(session)
$if windows {
// See: https://docs.microsoft.com/en-us/cpp/build/reference/fs-force-synchronous-pdb-writes?view=vs-2019
ncpus = 1
// See: https://docs.microsoft.com/en-us/cpp/build/reference/fs-force-synchronous-pdb-writes?view=vs-2019
pool_repl.set_max_jobs(1)
}
session.waitgroup.add( ncpus )
for i:=0; i < ncpus; i++ {
go process_in_thread(session,i)
}
session.waitgroup.wait()
pool_repl.work_on_items<string>(session.options.files)
session.bmark.stop()
println(session.bmark.total_message('total time spent running REPL files'))
}
fn process_in_thread( session mut Session, thread_id int ){
fn worker_repl(p mut sync.PoolProcessor, idx int, thread_id int) voidptr {
cdir := os.cachedir()
mut tls_bench := benchmark.new_benchmark()
tls_bench.set_total_expected_steps( session.bmark.nexpected_steps )
for {
session.ntask_mtx.lock()
session.ntask++
idx := session.ntask-1
session.ntask_mtx.unlock()
if idx >= session.options.files.len { break }
tls_bench.cstep = idx
tfolder := filepath.join( cdir, 'vrepl_tests_$idx')
if os.is_dir( tfolder ) {
os.rmdir_all( tfolder )
}
os.mkdir( tfolder ) or { panic(err) }
file := session.options.files[ idx ]
session.bmark.step()
tls_bench.step()
fres := runner.run_repl_file(tfolder, session.options.vexec, file) or {
session.bmark.fail()
tls_bench.fail()
os.rmdir_all( tfolder )
eprintln(tls_bench.step_message_fail(err))
assert false
continue
}
session.bmark.ok()
tls_bench.ok()
os.rmdir_all( tfolder )
println(tls_bench.step_message_ok(fres))
assert true
mut session := &Session(p.get_shared_context())
mut tls_bench := &benchmark.Benchmark(p.get_thread_context(idx))
if isnil(tls_bench) {
tls_bench = benchmark.new_benchmark_pointer()
tls_bench.set_total_expected_steps(session.bmark.nexpected_steps)
p.set_thread_context(idx, tls_bench)
}
session.waitgroup.done()
tls_bench.cstep = idx
tfolder := filepath.join(cdir,'vrepl_tests_$idx')
if os.is_dir(tfolder) {
os.rmdir_all(tfolder)
}
os.mkdir(tfolder) or {
panic(err)
}
file := p.get_string_item(idx)
session.bmark.step()
tls_bench.step()
fres := runner.run_repl_file(tfolder, session.options.vexec, file) or {
session.bmark.fail()
tls_bench.fail()
os.rmdir_all(tfolder)
eprintln(tls_bench.step_message_fail(err))
assert false
return sync.no_result
}
session.bmark.ok()
tls_bench.ok()
os.rmdir_all(tfolder)
println(tls_bench.step_message_ok(fres))
assert true
return sync.no_result
}

197
vlib/sync/pool.v 100644
View File

@ -0,0 +1,197 @@
module sync
// * Goal: this file provides a convenient way to run identical tasks over a list
// * of items in parallel, without worrying about waitgroups, mutexes and so on.
// *
// * Usage example:
// * pool := sync.new_pool_processor({ callback: worker_cb })
// * //pool.work_on_items<string>(['a','b','c']) // TODO: vfmt and generics
// * pool.work_on_pointers(['a','b','c'].pointers())
// * // optionally, you can iterate over the results too:
// * for x in pool.get_results<IResult>() {
// * // do stuff with x
// * }
// *
// * See https://github.com/vlang/v/blob/master/vlib/sync/pool_test.v for a
// * more detailed usage example.
// *
// * After all the work is done in parallel by the worker threads in the pool,
// * pool.work_on_items will return, and you can then call
// * pool.get_results<Result>() to retrieve a list of all the results,
// * that the worker callbacks returned for each item that you passed.
// * The parameters of new_pool_processor are:
// * context.maxjobs: when 0 (the default), the PoolProcessor will use an
// * optimal for your system number of threads to process your items
// * context.callback: this should be a callback function, that each worker
// * thread in the pool will run for each item.
// * The callback function will receive as parameters:
// * 1) the PoolProcessor instance, so it can call
// * p.get_item<int>(idx) to get the actual item at index idx
// * NB: for now, you are better off calling p.get_string_item(idx)
// * or p.get_int_item(idx) ; TODO: vfmt and generics
// * 2) idx - the index of the currently processed item
// * 3) task_id - the index of the worker thread in which the callback
// * function is running.
import runtime
pub const (
no_result = voidptr(0)
)
pub struct PoolProcessor {
thread_cb voidptr
mut:
njobs int
items []voidptr
results []voidptr
ntask int // writing to this should be locked by ntask_mtx.
ntask_mtx &sync.Mutex
waitgroup &sync.WaitGroup
shared_context voidptr
thread_contexts []voidptr
}
pub type ThreadCB fn(p &PoolProcessor, idx int, task_id int)voidptr
pub struct PoolProcessorConfig {
maxjobs int
callback ThreadCB
}
// new_pool_processor returns a new PoolProcessor instance.
pub fn new_pool_processor(context PoolProcessorConfig) &PoolProcessor {
if isnil(context.callback) {
panic('You need to pass a valid callback to new_pool_processor.')
}
// TODO: remove this call.
// It prevents a V warning about unused module runtime.
runtime.nr_jobs()
pool := &PoolProcessor {
items: []
results: []
shared_context: voidptr(0)
thread_contexts: []
njobs: context.maxjobs
ntask: 0
ntask_mtx: sync.new_mutex()
waitgroup: sync.new_waitgroup()
thread_cb: context.callback
}
return pool
}
// set_max_jobs gives you the ability to override the number
// of jobs *after* the PoolProcessor had been created already.
pub fn (pool mut PoolProcessor) set_max_jobs(njobs int) {
pool.njobs = njobs
}
// work_on_items receives a list of items of type T,
// then starts a work pool of pool.njobs threads, each running
// pool.thread_cb in a loop, untill all items in the list,
// are processed.
// When pool.njobs is 0, the number of jobs is determined
// by the number of available cores on the system.
// work_on_items returns *after* all threads finish.
// You can optionally call get_results after that.
pub fn (pool mut PoolProcessor) work_on_items<T>(items []T) {
pool.work_on_pointers( items.pointers() )
}
pub fn (pool mut PoolProcessor) work_on_pointers(items []voidptr) {
mut njobs := runtime.nr_jobs()
if pool.njobs > 0 {
njobs = pool.njobs
}
pool.items = []
pool.results = []
pool.thread_contexts = []
pool.items << items
pool.results = [voidptr(0)].repeat(pool.items.len)
pool.thread_contexts << [voidptr(0)].repeat(pool.items.len)
pool.waitgroup.add(njobs)
for i := 0; i < njobs; i++ {
go process_in_thread(pool,i)
}
pool.waitgroup.wait()
}
// process_in_thread does the actual work of worker thread.
// It is a workaround for the current inability to pass a
// method in a callback.
fn process_in_thread(pool mut PoolProcessor, task_id int) {
cb := ThreadCB(pool.thread_cb)
mut idx := 0
ilen := pool.items.len
for {
if pool.ntask >= ilen {
break
}
pool.ntask_mtx.lock()
idx = pool.ntask
pool.ntask++
pool.ntask_mtx.unlock()
pool.results[idx] = cb(pool, idx, task_id)
}
pool.waitgroup.done()
}
// get_item - called by the worker callback.
// Retrieves a type safe instance of the currently processed item
pub fn (pool &PoolProcessor) get_item<T>(idx int) T {
return *(&T(pool.items[idx]))
}
// get_string_item - called by the worker callback.
// It does not use generics so it does not mess up vfmt.
// TODO: remove the need for this when vfmt becomes smarter.
pub fn (pool &PoolProcessor) get_string_item(idx int) string {
return *(&string(pool.items[idx]))
}
// get_int_item - called by the worker callback.
// It does not use generics so it does not mess up vfmt.
// TODO: remove the need for this when vfmt becomes smarter.
pub fn (pool &PoolProcessor) get_int_item(idx int) int {
return *(&int(pool.items[idx]))
}
pub fn (pool &PoolProcessor) get_result<T>(idx int) T {
return *(&T(pool.results[idx]))
}
// get_results - can be called to get a list of type safe results.
pub fn (pool &PoolProcessor) get_results<T>() []T {
mut res := []T
for i in 0 .. pool.results.len {
res << *(&T(pool.results[i]))
}
return res
}
// set_shared_context - can be called during the setup so that you can
// provide a context that is shared between all worker threads, like
// common options/settings.
pub fn (pool mut PoolProcessor) set_shared_context(context voidptr) {
pool.shared_context = context
}
// get_shared_context - can be called in each worker callback, to get
// the context set by pool.set_shared_context
pub fn (pool &PoolProcessor) get_shared_context() voidptr {
return pool.shared_context
}
// set_thread_context - can be called during the setup at the start of
// each worker callback, so that the worker callback can have some thread
// local storage area where it can write/read information that is private
// to the given thread, without worrying that it will get overwritten by
// another thread
pub fn (pool mut PoolProcessor) set_thread_context(idx int, context voidptr) {
pool.thread_contexts[idx] = context
}
// get_thread_context - returns a pointer, that was set with
// pool.set_thread_context . This pointer is private to each thread.
pub fn (pool &PoolProcessor) get_thread_context(idx int) voidptr {
return pool.thread_contexts[idx]
}

View File

@ -0,0 +1,58 @@
import sync
import time
import rand
struct SResult {
s string
}
fn worker_s(p &sync.PoolProcessor, idx int, worker_id int) voidptr {
// TODO: this works, but confuses vfmt. It should be used instead of
// p.get_int_item when vfmt becomes smarter.
// item := p.get_item<string>(idx)
item := p.get_string_item(idx)
println('worker_s worker_id: $worker_id | idx: $idx | item: ${item}')
time.sleep_ms(rand.next(3))
return &SResult{item + item}
}
struct IResult {
i int
}
fn worker_i(p &sync.PoolProcessor, idx int, worker_id int) voidptr {
// TODO: this works, but confuses vfmt. See the comment above.
// item := p.get_item<int>(idx)
item := p.get_int_item(idx)
println('worker_i worker_id: $worker_id | idx: $idx | item: ${item}')
time.sleep_ms(rand.next(5))
return &IResult{item * 1000}
}
fn test_work_on_strings() {
rand.seed(0)
mut pool_s := sync.new_pool_processor({
callback: worker_s
maxjobs: 8
})
pool_s.work_on_items<string>(['a','b','c','d','e','f','g','h','i','j'])
for x in pool_s.get_results<SResult>() {
println( x.s )
assert x.s.len > 1
}
}
fn test_work_on_ints() {
rand.seed(0)
// NB: since maxjobs is left empty here,
// the pool processor will use njobs = runtime.nr_jobs so that
// it will work optimally without overloading the system
mut pool_i := sync.new_pool_processor({
callback: worker_i
})
pool_i.work_on_items<int>([1,2,3,4,5,6,7,8])
for x in pool_i.get_results<IResult>() {
println( x.i )
assert x.i > 100
}
}