cgen, sync: implement separate read/write locks for `rwshared` types (#5687)
							parent
							
								
									3cd9e2cab7
								
							
						
					
					
						commit
						c3614c0e38
					
				|  | @ -389,12 +389,24 @@ fn C.ReleaseMutex(voidptr) bool | |||
| fn C.CreateEvent(int, bool, bool, byteptr) voidptr | ||||
| fn C.SetEvent(voidptr) int | ||||
| 
 | ||||
| fn C.InitializeSRWLock(voidptr) | ||||
| fn C.AcquireSRWLockShared(voidptr) | ||||
| fn C.AcquireSRWLockExclusive(voidptr) | ||||
| fn C.ReleaseSRWLockShared(voidptr) | ||||
| fn C.ReleaseSRWLockExclusive(voidptr) | ||||
| 
 | ||||
| // pthread.h
 | ||||
| fn C.pthread_mutex_init(voidptr, voidptr) int | ||||
| fn C.pthread_mutex_lock(voidptr) int | ||||
| fn C.pthread_mutex_unlock(voidptr) int | ||||
| 
 | ||||
| fn C.pthread_rwlockattr_init(voidptr) int | ||||
| fn C.pthread_rwlockattr_setkind_np(voidptr, int) int | ||||
| fn C.pthread_rwlock_init(voidptr, voidptr) int | ||||
| fn C.pthread_rwlock_rdlock(voidptr) int | ||||
| fn C.pthread_rwlock_wrlock(voidptr) int | ||||
| fn C.pthread_rwlock_unlock(voidptr) int | ||||
| 
 | ||||
| fn C.read(fd int, buf voidptr, count size_t) int | ||||
| fn C.write(fd int, buf voidptr, count size_t) int | ||||
| fn C.close(fd int) int | ||||
|  |  | |||
|  | @ -11,12 +11,32 @@ pub struct Mutex { | |||
| 	mutex C.pthread_mutex_t | ||||
| } | ||||
| 
 | ||||
| [ref_only] | ||||
| pub struct RwMutex { | ||||
| 	mutex C.pthread_rwlock_t | ||||
| } | ||||
| 
 | ||||
| [ref_only] | ||||
| struct RwMutexAttr { | ||||
| 	attr C.pthread_rwlockattr_t | ||||
| } | ||||
| 
 | ||||
| pub fn new_mutex() &Mutex { | ||||
| 	m := &Mutex{} | ||||
| 	C.pthread_mutex_init(&m.mutex, C.NULL) | ||||
| 	return m | ||||
| } | ||||
| 
 | ||||
| pub fn new_rwmutex() &RwMutex { | ||||
| 	m := &RwMutex{} | ||||
| 	a := &RwMutexAttr{} | ||||
| 	C.pthread_rwlockattr_init(&a.attr) | ||||
| 	// Give writer priority over readers
 | ||||
| 	C.pthread_rwlockattr_setkind_np(&a.attr, C.PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP) | ||||
| 	C.pthread_rwlock_init(&m.mutex, &a.attr) | ||||
| 	return m | ||||
| } | ||||
| 
 | ||||
| // m_lock(), for *manual* mutex handling, since `lock` is a keyword
 | ||||
| pub fn (mut m Mutex) m_lock() { | ||||
| 	C.pthread_mutex_lock(&m.mutex) | ||||
|  | @ -25,3 +45,22 @@ pub fn (mut m Mutex) m_lock() { | |||
| pub fn (mut m Mutex) unlock() { | ||||
| 	C.pthread_mutex_unlock(&m.mutex) | ||||
| } | ||||
| 
 | ||||
| // RwMutex has separate read- and write locks
 | ||||
| pub fn (mut m RwMutex) r_lock() { | ||||
| 	C.pthread_rwlock_rdlock(&m.mutex) | ||||
| } | ||||
| 
 | ||||
| pub fn (mut m RwMutex) w_lock() { | ||||
| 	C.pthread_rwlock_wrlock(&m.mutex) | ||||
| } | ||||
| 
 | ||||
| // Windows SRWLocks have different function to unlock
 | ||||
| // So provide two functions here, too, to have a common interface
 | ||||
| pub fn (mut m RwMutex) r_unlock() { | ||||
| 	C.pthread_rwlock_unlock(&m.mutex) | ||||
| } | ||||
| 
 | ||||
| pub fn (mut m RwMutex) w_unlock() { | ||||
| 	C.pthread_rwlock_unlock(&m.mutex) | ||||
| } | ||||
|  |  | |||
|  | @ -22,6 +22,12 @@ mut: | |||
| 	writer_sem   u32        // writer semaphones
 | ||||
| } | ||||
| 
 | ||||
| [ref_only] | ||||
| pub struct RwMutex { | ||||
| mut: | ||||
| 	mx C.SRWLOCK    // mutex handle
 | ||||
| } | ||||
| 
 | ||||
| enum MutexState { | ||||
| 	broken | ||||
| 	waiting | ||||
|  | @ -43,6 +49,12 @@ pub fn new_mutex() &Mutex { | |||
| 	return sm | ||||
| } | ||||
| 
 | ||||
| pub fn new_rwmutex() &RwMutex { | ||||
| 	m := &RwMutex{} | ||||
| 	C.InitializeSRWLock(&m.mx) | ||||
| 	return m | ||||
| } | ||||
| 
 | ||||
| pub fn (mut m Mutex) m_lock() { | ||||
| 	// if mutex handle not initalized
 | ||||
| 	if isnil(m.mx) { | ||||
|  | @ -80,6 +92,25 @@ pub fn (mut m Mutex) unlock() { | |||
| 	m.state = .released | ||||
| } | ||||
| 
 | ||||
| // RwMutex has separate read- and write locks
 | ||||
| pub fn (mut m RwMutex) r_lock() { | ||||
| 	C.AcquireSRWLockShared(&m.mx) | ||||
| } | ||||
| 
 | ||||
| pub fn (mut m RwMutex) w_lock() { | ||||
| 	C.AcquireSRWLockExclusive(&m.mx) | ||||
| } | ||||
| 
 | ||||
| // Windows SRWLocks have different function to unlock
 | ||||
| // So provide two functions here, too, to have a common interface
 | ||||
| pub fn (mut m RwMutex) r_unlock() { | ||||
| 	C.ReleaseSRWLockShared(&m.mx) | ||||
| } | ||||
| 
 | ||||
| pub fn (mut m RwMutex) w_unlock() { | ||||
| 	C.ReleaseSRWLockExclusive(&m.mx) | ||||
| } | ||||
| 
 | ||||
| pub fn (mut m Mutex) destroy() { | ||||
| 	if m.state == .waiting { | ||||
| 		m.unlock() // unlock mutex before destroying
 | ||||
|  |  | |||
|  | @ -402,8 +402,7 @@ fn (mut g Gen) find_or_register_shared(t table.Type, base string) string { | |||
| 	if (is_rw && t_idx in g.rwshareds) || (!is_rw && t_idx in g.shareds) { | ||||
| 		return sh_typ | ||||
| 	} | ||||
| 	// TODO: These two should become different...
 | ||||
| 	mtx_typ := if is_rw { 'sync__Mutex' } else { 'sync__Mutex' } | ||||
| 	mtx_typ := if is_rw { 'sync__RwMutex' } else { 'sync__Mutex' } | ||||
| 	g.hotcode_definitions.writeln('struct $sh_typ { $base val; $mtx_typ* mtx; };') | ||||
| 	g.typedefs2.writeln('typedef struct $sh_typ $sh_typ;') | ||||
| 	// println('registered shared type $sh_typ')
 | ||||
|  | @ -2058,32 +2057,31 @@ fn (mut g Gen) infix_expr(node ast.InfixExpr) { | |||
| } | ||||
| 
 | ||||
| fn (mut g Gen) lock_expr(node ast.LockExpr) { | ||||
| 	mut lock_prefixes := []byte{len: 0, cap: node.lockeds.len} | ||||
| 	for id in node.lockeds { | ||||
| 		name := id.name | ||||
| 		deref := if id.is_mut { '->' } else { '.' } | ||||
| 		// TODO: use 3 different locking functions
 | ||||
| 		if node.is_rlock { | ||||
| 			g.writeln('sync__Mutex_m_lock(${name}${deref}mtx);') | ||||
| 		} else if id.var_info().typ.has_flag(.atomic_or_rw) { | ||||
| 			g.writeln('sync__Mutex_m_lock(${name}${deref}mtx);') | ||||
| 		} else { | ||||
| 			g.writeln('sync__Mutex_m_lock(${name}${deref}mtx);') | ||||
| 		// fields of `id` itself have no valid information, so look up type(flags) in scope
 | ||||
| 		obj := g.file.scope.innermost(id.pos.pos).find(name) or { | ||||
| 			verror('cgen: unable to get type for lock variable $name') | ||||
| 			ast.ScopeObject{} | ||||
| 		} | ||||
| 		typ := if obj is ast.Var { (*(obj as ast.Var)).typ } else { table.Type(0) } | ||||
| 		deref := if id.is_mut { '->' } else { '.' } | ||||
| 		lock_prefix := if node.is_rlock { `r` } else { if typ.has_flag(.atomic_or_rw) { `w` } else { `m` } } | ||||
| 		lock_prefixes << lock_prefix // keep for unlock
 | ||||
| 		mut_prefix := if lock_prefix == `m` { '' } else { 'Rw' } | ||||
| 		g.writeln('sync__${mut_prefix}Mutex_${lock_prefix:c}_lock(${name}${deref}mtx);') | ||||
| 	} | ||||
| 	g.stmts(node.stmts) | ||||
| 	// unlock in reverse order
 | ||||
| 	for i := node.lockeds.len-1; i >= 0; i-- { | ||||
| 		id := node.lockeds[i] | ||||
| 		lock_prefix := lock_prefixes[i] | ||||
| 		name := id.name | ||||
| 		deref := if id.is_mut { '->' } else { '.' } | ||||
| 		// TODO: use 3 different unlocking functions
 | ||||
| 		if node.is_rlock { | ||||
| 			g.writeln('sync__Mutex_unlock(${name}${deref}mtx);') | ||||
| 		} else if id.var_info().typ.has_flag(.atomic_or_rw) { | ||||
| 			g.writeln('sync__Mutex_unlock(${name}${deref}mtx);') | ||||
| 		} else { | ||||
| 			g.writeln('sync__Mutex_unlock(${name}${deref}mtx);') | ||||
| 		} | ||||
| 		mut_prefix := if lock_prefix == `m` { '' } else { 'Rw' } | ||||
| 		unlock_type := if lock_prefix == `m` { '' } else { '${lock_prefix:c}_' } | ||||
| 		g.writeln('sync__${mut_prefix}Mutex_${unlock_type}unlock(${name}${deref}mtx);') | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
|  | @ -2752,6 +2750,7 @@ const ( | |||
| fn (mut g Gen) struct_init(struct_init ast.StructInit) { | ||||
| 	styp := g.typ(struct_init.typ) | ||||
| 	mut shared_styp := '' // only needed for shared &St{...
 | ||||
| 	mut share_prefix := '' // 'rw' for `rwshared`
 | ||||
| 	if styp in skip_struct_init { | ||||
| 		g.go_back_out(3) | ||||
| 		return | ||||
|  | @ -2765,6 +2764,7 @@ fn (mut g Gen) struct_init(struct_init ast.StructInit) { | |||
| 			mut shared_typ := struct_init.typ.set_flag(.shared_f) | ||||
| 			if g.is_rwshared { | ||||
| 				shared_typ = shared_typ.set_flag(.atomic_or_rw) | ||||
| 				share_prefix = 'rw' | ||||
| 			} | ||||
| 			shared_styp = g.typ(shared_typ) | ||||
| 			g.writeln('($shared_styp*)memdup(&($shared_styp){.val = ($styp){') | ||||
|  | @ -2870,7 +2870,7 @@ fn (mut g Gen) struct_init(struct_init ast.StructInit) { | |||
| 	} | ||||
| 	g.write('}') | ||||
| 	if g.is_shared { | ||||
| 		g.write(', .mtx = sync__new_mutex()}') | ||||
| 		g.write(', .mtx = sync__new_${share_prefix}mutex()}') | ||||
| 		if is_amp { | ||||
| 			g.write(', sizeof($shared_styp))') | ||||
| 		} | ||||
|  |  | |||
|  | @ -195,10 +195,22 @@ $c_common_macros | |||
| #pragma comment(lib, "Dbghelp.lib") | ||||
| 
 | ||||
| extern wchar_t **_wenviron; | ||||
| #elif !defined(SRWLOCK_INIT) | ||||
| // these seem to be missing on Windows tcc
 | ||||
| typedef struct SRWLOCK { void* SRWLOCK; } SRWLOCK; | ||||
| void InitializeSRWLock(void*); | ||||
| void AcquireSRWLockShared(void*); | ||||
| void AcquireSRWLockExclusive(void*); | ||||
| void ReleaseSRWLockShared(void*); | ||||
| void ReleaseSRWLockExclusive(void*); | ||||
| #endif | ||||
| 
 | ||||
| #else | ||||
| #include <pthread.h> | ||||
| #ifndef PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP | ||||
| // musl does not have that
 | ||||
| #define pthread_rwlockattr_setkind_np(a, b) | ||||
| #endif | ||||
| #endif | ||||
| 
 | ||||
| // g_live_info is used by live.info()
 | ||||
|  |  | |||
|  | @ -0,0 +1,58 @@ | |||
| import sync | ||||
| import time | ||||
| 
 | ||||
| struct St { | ||||
| mut: | ||||
| 	a int | ||||
| } | ||||
| 
 | ||||
| fn f(rwshared x St, shared z St) { | ||||
| 	for _ in 0..reads_per_thread { | ||||
| 		rlock x { // other instances may read at the same time
 | ||||
| 			time.sleep_ms(1) | ||||
| 			assert x.a == 7 || x.a == 5 | ||||
| 		} | ||||
| 	} | ||||
| 	lock z { | ||||
| 		z.a-- | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| const ( | ||||
| 	reads_per_thread = 30 | ||||
| 	read_threads = 10 | ||||
| 	writes = 5 | ||||
| ) | ||||
| 
 | ||||
| fn test_shared_lock() { | ||||
| 	// object with separate read/write lock
 | ||||
| 	rwshared x := &St{ | ||||
| 		a: 5 | ||||
| 	} | ||||
| 	shared z := &St{ | ||||
| 		a: read_threads | ||||
| 	} | ||||
| 	for _ in 0..read_threads { | ||||
| 		go f(rwshared x, shared z) | ||||
| 	} | ||||
| 	for i in 0..writes { | ||||
| 		lock x { // wait for ongoing reads to finish, don't start new ones
 | ||||
| 			x.a = 17 // this should never be read
 | ||||
| 			time.sleep_ms(50) | ||||
| 			x.a = if (i&1) == 0 { 7 } else { 5 } | ||||
| 		} // now new reads are possible again
 | ||||
| 		time.sleep_ms(20) | ||||
| 	} | ||||
| 	// wait until all read threads are finished
 | ||||
| 	for finished := false; ; { | ||||
| 		mut rr := 0 | ||||
| 		lock z { | ||||
| 			rr = z.a | ||||
| 			finished = z.a == 0 | ||||
| 		} | ||||
| 		if finished { | ||||
| 			break | ||||
| 		} | ||||
| 		time.sleep_ms(100) | ||||
| 	} | ||||
| } | ||||
|  | @ -0,0 +1,58 @@ | |||
| import sync | ||||
| import time | ||||
| 
 | ||||
| struct St { | ||||
| mut: | ||||
| 	a int | ||||
| } | ||||
| 
 | ||||
| fn (rwshared x St) f(shared z St) { | ||||
| 	for _ in 0..reads_per_thread { | ||||
| 		rlock x { // other instances may read at the same time
 | ||||
| 			time.sleep_ms(1) | ||||
| 			assert x.a == 7 || x.a == 5 | ||||
| 		} | ||||
| 	} | ||||
| 	lock z { | ||||
| 		z.a-- | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| const ( | ||||
| 	reads_per_thread = 30 | ||||
| 	read_threads = 10 | ||||
| 	writes = 5 | ||||
| ) | ||||
| 
 | ||||
| fn test_shared_lock() { | ||||
| 	// object with separate read/write lock
 | ||||
| 	rwshared x := &St{ | ||||
| 		a: 5 | ||||
| 	} | ||||
| 	shared z := &St{ | ||||
| 		a: read_threads | ||||
| 	} | ||||
| 	for _ in 0..read_threads { | ||||
| 		go x.f(shared z) | ||||
| 	} | ||||
| 	for i in 0..writes { | ||||
| 		lock x { // wait for ongoing reads to finish, don't start new ones
 | ||||
| 			x.a = 17 // this value should never be read
 | ||||
| 			time.sleep_ms(50) | ||||
| 			x.a = if (i&1) == 0 { 7 } else { 5 } | ||||
| 		} // now new reads are possible again
 | ||||
| 		time.sleep_ms(20) | ||||
| 	} | ||||
| 	// wait until all read threads are finished
 | ||||
| 	for finished := false; ; { | ||||
| 		mut rr := 0 | ||||
| 		lock z { | ||||
| 			rr = z.a | ||||
| 			finished = z.a == 0 | ||||
| 		} | ||||
| 		if finished { | ||||
| 			break | ||||
| 		} | ||||
| 		time.sleep_ms(100) | ||||
| 	} | ||||
| } | ||||
		Loading…
	
		Reference in New Issue