vlib/context: add onecontext as submodule (#12549)

pull/12644/head
Ulises Jeremias Cornejo Fandos 2021-12-02 06:15:07 -03:00 committed by GitHub
parent 2144471ce1
commit f7926ec9a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 369 additions and 3 deletions

View File

@ -8,6 +8,7 @@ const github_job = os.getenv('GITHUB_JOB')
const ( const (
skip_test_files = [ skip_test_files = [
'vlib/context/onecontext/onecontext_test.v',
'vlib/context/deadline_test.v' /* sometimes blocks */, 'vlib/context/deadline_test.v' /* sometimes blocks */,
'vlib/mysql/mysql_orm_test.v' /* mysql not installed */, 'vlib/mysql/mysql_orm_test.v' /* mysql not installed */,
'vlib/pg/pg_orm_test.v' /* pg not installed */, 'vlib/pg/pg_orm_test.v' /* pg not installed */,

View File

@ -51,9 +51,10 @@ mut:
pub fn with_cancel(mut parent Context) (Context, CancelFn) { pub fn with_cancel(mut parent Context) (Context, CancelFn) {
mut c := new_cancel_context(parent) mut c := new_cancel_context(parent)
propagate_cancel(mut parent, mut c) propagate_cancel(mut parent, mut c)
return Context(c), fn [mut c] () { cancel_fn := fn [mut c] () {
c.cancel(true, canceled) c.cancel(true, canceled)
} }
return Context(c), CancelFn(cancel_fn)
} }
// new_cancel_context returns an initialized CancelContext. // new_cancel_context returns an initialized CancelContext.

View File

@ -44,9 +44,10 @@ pub fn with_deadline(mut parent Context, d time.Time) (Context, CancelFn) {
dur := d - time.now() dur := d - time.now()
if dur.nanoseconds() <= 0 { if dur.nanoseconds() <= 0 {
ctx.cancel(true, deadline_exceeded) // deadline has already passed ctx.cancel(true, deadline_exceeded) // deadline has already passed
return Context(ctx), fn [mut ctx] () { cancel_fn := fn [mut ctx] () {
ctx.cancel(true, canceled) ctx.cancel(true, canceled)
} }
return Context(ctx), CancelFn(cancel_fn)
} }
if ctx.err() is none { if ctx.err() is none {
@ -55,9 +56,11 @@ pub fn with_deadline(mut parent Context, d time.Time) (Context, CancelFn) {
ctx.cancel(true, deadline_exceeded) ctx.cancel(true, deadline_exceeded)
}(mut ctx, dur) }(mut ctx, dur)
} }
return Context(ctx), fn [mut ctx] () {
cancel_fn := fn [mut ctx] () {
ctx.cancel(true, canceled) ctx.cancel(true, canceled)
} }
return Context(ctx), CancelFn(cancel_fn)
} }
// with_timeout returns with_deadline(parent, time.now().add(timeout)). // with_timeout returns with_deadline(parent, time.now().add(timeout)).

View File

@ -0,0 +1,40 @@
# onecontext
A library to merge existing V contexts.
## Overview
Have you ever faced the situation where you have to merge multiple existing contexts?
If not, then you might, eventually.
For example, we can face the situation where we are building an application
using a library that gives us a global context.
This context expires once the application is stopped.
Meanwhile, we are exposing a service like this:
```v ignore
fn (f Foo) get(ctx context.Context, bar Bar) ?Baz {
. . .
}
```
Here, we receive another context provided by the service.
Then, in the `get` implementation, we want for example to query a database and
we must provide a context for that.
Ideally, we would like to provide a merged context that would expire either:
- When the application is stopped
- Or when the received service context expires
This is exactly the purpose of this library.
In our case, we can now merge the two contexts in a single one like this:
```v ignore
ctx, cancel := onecontext.merge(ctx1, ctx2)
```
This returns a merged context that we can now propagate

View File

@ -0,0 +1,146 @@
module onecontext
import context
import sync
import time
// canceled is the error returned when the cancel function is called on a merged context
pub const canceled = error('canceled context')
struct OneContext {
mut:
ctx context.Context
ctxs []context.Context
done chan int
err IError = none
err_mutex sync.Mutex
cancel_fn context.CancelFn
cancel_ctx context.Context
}
// merge allows to merge multiple contexts
// it returns the merged context
pub fn merge(ctx context.Context, ctxs ...context.Context) (context.Context, context.CancelFn) {
mut background := context.background()
cancel_ctx, cancel := context.with_cancel(mut &background)
mut octx := &OneContext{
done: chan int{cap: 3}
ctx: ctx
ctxs: ctxs
cancel_fn: cancel
cancel_ctx: cancel_ctx
}
go octx.run()
return context.Context(octx), context.CancelFn(cancel)
}
pub fn (octx OneContext) deadline() ?time.Time {
mut min := time.Time{}
if deadline := octx.ctx.deadline() {
min = deadline
}
for ctx in octx.ctxs {
if deadline := ctx.deadline() {
if min.unix_time() == 0 || deadline < min {
min = deadline
}
}
}
if min.unix_time() == 0 {
return none
}
return min
}
pub fn (octx OneContext) done() chan int {
return octx.done
}
pub fn (mut octx OneContext) err() IError {
octx.err_mutex.@lock()
defer {
octx.err_mutex.unlock()
}
return octx.err
}
pub fn (octx OneContext) value(key context.Key) ?context.Any {
if value := octx.ctx.value(key) {
return value
}
for ctx in octx.ctxs {
if value := ctx.value(key) {
return value
}
}
return none
}
pub fn (mut octx OneContext) run() {
mut wrapped_ctx := &octx.ctx
if octx.ctxs.len == 1 {
mut first_ctx := &octx.ctxs[0]
octx.run_two_contexts(mut wrapped_ctx, mut first_ctx)
return
}
octx.run_multiple_contexts(mut wrapped_ctx)
for mut ctx in octx.ctxs {
octx.run_multiple_contexts(mut &ctx)
}
}
pub fn (octx OneContext) str() string {
return ''
}
pub fn (mut octx OneContext) cancel(err IError) {
octx.cancel_fn()
octx.err_mutex.@lock()
octx.err = err
octx.err_mutex.unlock()
if !octx.done.closed {
octx.done <- 0
octx.done.close()
}
}
pub fn (mut octx OneContext) run_two_contexts(mut ctx1 context.Context, mut ctx2 context.Context) {
go fn (mut octx OneContext, mut ctx1 context.Context, mut ctx2 context.Context) {
octx_cancel_done := octx.cancel_ctx.done()
c1done := ctx1.done()
c2done := ctx2.done()
select {
_ := <-octx_cancel_done {
octx.cancel(onecontext.canceled)
}
_ := <-c1done {
octx.cancel(ctx1.err())
}
_ := <-c2done {
octx.cancel(ctx1.err())
}
}
}(mut &octx, mut &ctx1, mut &ctx2)
}
pub fn (mut octx OneContext) run_multiple_contexts(mut ctx context.Context) {
go fn (mut octx OneContext, mut ctx context.Context) {
octx_cancel_done := octx.cancel_ctx.done()
cdone := ctx.done()
select {
_ := <-octx_cancel_done {
octx.cancel(onecontext.canceled)
}
_ := <-cdone {
octx.cancel(ctx.err())
}
}
}(mut &octx, mut &ctx)
}

View File

@ -0,0 +1,175 @@
module onecontext
import context
import time
fn eventually(ch chan int) bool {
mut background := context.background()
mut timeout, cancel := context.with_timeout(mut &background, 30 * time.millisecond)
defer {
cancel()
}
tdone := timeout.done()
select {
_ := <-ch {
return true
}
_ := <-tdone {
return false
}
}
return false
}
struct Value {
val string
}
fn test_merge_nomilan() {
mut background := context.background()
foo := &Value{
val: 'foo'
}
mut value_ctx1 := context.with_value(background, 'foo', foo)
mut ctx1, cancel := context.with_cancel(mut &value_ctx1)
defer {
cancel()
}
bar := &Value{
val: 'bar'
}
mut value_ctx2 := context.with_value(background, 'bar', bar)
mut ctx2, _ := context.with_cancel(mut &value_ctx2)
mut ctx, cancel2 := merge(ctx1, ctx2)
if deadline := ctx.deadline() {
panic('this should never happen')
}
val1 := ctx.value('foo') or { panic('wrong value access for key `foo`') }
match val1 {
Value {
assert foo == val1
}
else {
assert false
}
}
val2 := ctx.value('bar') or { panic('wrong value access for key `bar`') }
match val2 {
Value {
assert bar == val2
}
else {
assert false
}
}
if _ := ctx.value('baz') {
panic('this should never happen')
}
assert !eventually(ctx.done())
assert ctx.err() is none
cancel2()
assert eventually(ctx.done())
assert ctx.err() is Error
}
fn test_merge_deadline_context_1() {
mut background := context.background()
mut ctx1, cancel := context.with_timeout(mut &background, time.second)
defer {
cancel()
}
ctx2 := context.background()
mut ctx, _ := merge(ctx1, ctx2)
if deadline := ctx.deadline() {
assert deadline.unix_time() != 0
} else {
panic('this should never happen')
}
}
fn test_merge_deadline_context_2() {
mut background := context.background()
ctx1 := context.background()
mut ctx2, cancel := context.with_timeout(mut &background, time.second)
defer {
cancel()
}
mut ctx, _ := merge(ctx1, ctx2)
if deadline := ctx.deadline() {
assert deadline.unix_time() != 0
} else {
panic('this should never happen')
}
}
fn test_merge_deadline_context_n() {
mut background := context.background()
ctx1 := context.background()
mut ctxs := []context.Context{cap: 21}
for i in 0 .. 10 {
ctxs << context.background()
}
mut ctx_n, _ := context.with_timeout(mut &background, time.second)
ctxs << ctx_n
for i in 0 .. 10 {
ctxs << context.background()
}
mut ctx, cancel := merge(ctx1, ...ctxs)
assert !eventually(ctx.done())
assert ctx.err() is none
cancel()
assert eventually(ctx.done())
assert ctx.err() is Error
}
fn test_merge_deadline_none() {
ctx1 := context.background()
ctx2 := context.background()
mut ctx, _ := merge(ctx1, ctx2)
if _ := ctx.deadline() {
panic('this should never happen')
}
}
fn test_merge_cancel_two() {
ctx1 := context.background()
ctx2 := context.background()
mut ctx, cancel := merge(ctx1, ctx2)
cancel()
assert eventually(ctx.done())
assert ctx.err() is Error
assert ctx.err().str() == 'canceled context'
}
fn test_merge_cancel_multiple() {
ctx1 := context.background()
ctx2 := context.background()
ctx3 := context.background()
mut ctx, cancel := merge(ctx1, ctx2, ctx3)
cancel()
assert eventually(ctx.done())
assert ctx.err() is Error
assert ctx.err().str() == 'canceled context'
}