diff options
-rw-r--r-- | bld.tags | 4 | ||||
-rw-r--r-- | lib/std/errno.myr | 1 | ||||
-rw-r--r-- | lib/sys/sys+freebsd-x64.myr | 7 | ||||
-rw-r--r-- | lib/sys/sys+osx-x64.myr | 23 | ||||
-rw-r--r-- | lib/sys/syserrno+linux.myr | 1 | ||||
-rw-r--r-- | lib/sys/syserrno+osx.myr | 1 | ||||
-rw-r--r-- | lib/thread/bld.sub | 11 | ||||
-rw-r--r-- | lib/thread/futex+freebsd.myr | 25 | ||||
-rw-r--r-- | lib/thread/futex+linux.myr | 16 | ||||
-rw-r--r-- | lib/thread/futex+openbsd:6.2.myr | 16 | ||||
-rw-r--r-- | lib/thread/futex+osx.myr | 44 | ||||
-rw-r--r-- | lib/thread/mutex+freebsd.myr | 81 | ||||
-rw-r--r-- | lib/thread/mutex+futex.myr (renamed from lib/thread/mutex+linux.myr) | 9 | ||||
-rw-r--r-- | lib/thread/mutex+openbsd:6.2.myr | 76 | ||||
-rw-r--r-- | lib/thread/sem+futex.myr | 55 | ||||
-rw-r--r-- | lib/thread/sem+plan9.myr | 54 | ||||
-rw-r--r-- | lib/thread/sem.myr | 66 | ||||
-rw-r--r-- | lib/thread/spawn+osx.myr | 1 | ||||
-rw-r--r-- | lib/thread/test/sem.myr | 42 |
19 files changed, 367 insertions, 166 deletions
diff --git a/bld.tags b/bld.tags new file mode 100644 index 0000000..ce47dd1 --- /dev/null +++ b/bld.tags @@ -0,0 +1,4 @@ +futex: freebsd +futex: linux +futex: openbsd:6.2 +futex: osx diff --git a/lib/std/errno.myr b/lib/std/errno.myr index 78e1d0a..9455fa9 100644 --- a/lib/std/errno.myr +++ b/lib/std/errno.myr @@ -38,4 +38,5 @@ pkg std = const Epipe : errno = (sys.Epipe : errno) const Edom : errno = (sys.Edom : errno) const Erange : errno = (sys.Erange : errno) + const Etimedout : errno = (sys.Etimedout : errno) ;; diff --git a/lib/sys/sys+freebsd-x64.myr b/lib/sys/sys+freebsd-x64.myr index 440f396..a140f0a 100644 --- a/lib/sys/sys+freebsd-x64.myr +++ b/lib/sys/sys+freebsd-x64.myr @@ -581,6 +581,13 @@ pkg sys = node : uint8[_Uuidnodesz]; ;; + const Umtxabstime = 1 + type _umtx_time = struct + _timeout : timespec + _flags : uint32 + _clockid : uint32 + ;; + /* open options */ const Ordonly : fdopt = 0x0 const Owronly : fdopt = 0x1 diff --git a/lib/sys/sys+osx-x64.myr b/lib/sys/sys+osx-x64.myr index 3de3875..bd1f427 100644 --- a/lib/sys/sys+osx-x64.myr +++ b/lib/sys/sys+osx-x64.myr @@ -18,6 +18,7 @@ pkg sys = type machport = int32 type signo = int32 type sigflags = int32 + type ulockop = uint32 type fdset = struct bits : int32[1024/4] @@ -388,6 +389,14 @@ pkg sys = const Sigusr1 : signo = 30 /* user defined signal 1 */ const Sigusr2 : signo = 31 /* user defined signal 2 */ + /* ulock ops */ + const Ulockcompareandwait : ulockop = 0x00000001 + const Ulockunfairlock : ulockop = 0x00000002 + const Ulockulfwakeall : ulockop = 0x00000100 + const Ulockulfwakethread : ulockop = 0x00000200 + const Ulockwaitworkqdatacontention : ulockop = 0x00010000 + const Ulocknoerrno : ulockop = 0x01000000 + /* syscalls. note, creat() implemented as open(path, Creat|Trunc|Wronly) */ const Syssyscall : scno = 0x2000000 @@ -737,6 +746,8 @@ pkg sys = const Syspid_resume : scno = 0x20001af const Sysfileport_makeport : scno = 0x20001b0 const Sysfileport_makefd : scno = 0x20001b1 + const Sysulock_wait : scno = 0x2000203 + const Sysulock_wake : scno = 0x2000204 extern const syscall : (sc:scno, args:... -> int64) @@ -823,6 +834,10 @@ pkg sys = new : void#, newsz : size# \ -> int) + /* ulock */ + const ulock_wait : (op : ulockop, uaddr : uint64#, val : uint64, timeout : uint32 -> int) + const ulock_wake : (op : ulockop, uaddr : uint64#, wakeval : uint64 -> int) + /* filled by start code */ extern var __cenvp : byte## ;; @@ -1081,6 +1096,14 @@ const sysctl = {mib, old, oldsz, new, newsz (mib : int#), a(mib.len), old, oldsz, new, newsz) : int) } +const ulock_wait = {op, uaddr, val, timeout + -> (syscall(Sysulock_wait, a(op), uaddr, val, a(timeout)) : int) +} + +const ulock_wake = {op, uaddr, wakeval + -> (syscall(Sysulock_wake, a(op), uaddr, a(wakeval)) : int) +} + const waitstatus = {st if st < 0 -> `Waitfail st diff --git a/lib/sys/syserrno+linux.myr b/lib/sys/syserrno+linux.myr index 18a3fc2..a1bf690 100644 --- a/lib/sys/syserrno+linux.myr +++ b/lib/sys/syserrno+linux.myr @@ -35,4 +35,5 @@ pkg sys = const Epipe : errno = -32 /* Broken pipe */ const Edom : errno = -33 /* Math argument out of domain of func */ const Erange : errno = -34 /* Math result not representable */ + const Etimedout : errno = -110 /* Operation timed out */ ;; diff --git a/lib/sys/syserrno+osx.myr b/lib/sys/syserrno+osx.myr index 7b5d888..3544308 100644 --- a/lib/sys/syserrno+osx.myr +++ b/lib/sys/syserrno+osx.myr @@ -51,4 +51,5 @@ pkg sys = const Eprototype : errno = -41 /* Protocol wrong type for socket */ const Enoprotoopt : errno = -42 /* Protocol not available */ const Eprotonosupport : errno = -43 /* Protocol not supported */ + const Etimedout : errno = -60 /* Operation timed out */ ;; diff --git a/lib/thread/bld.sub b/lib/thread/bld.sub index f386162..72cda98 100644 --- a/lib/thread/bld.sub +++ b/lib/thread/bld.sub @@ -1,7 +1,10 @@ lib thread = common.myr hookstd.myr # install thread hooks + mutex+futex.myr + sem+futex.myr mutex.myr # fallback, for unimplemented platforms + sem.myr # fallback, for unimplemented platforms #generic fallbacks ncpu.myr @@ -9,14 +12,14 @@ lib thread = # linux impl of basic thread primitives #condvar+linux.myr exit+linux-x64.s - mutex+linux.myr + futex+linux.myr ncpu+linux.myr spawn+linux.myr # freebsd impl of thread primitives #condvar+freebsd.myr exit+freebsd-x64.s - mutex+freebsd.myr + futex+freebsd.myr ncpu+freebsd.myr spawn+freebsd.myr @@ -29,6 +32,7 @@ lib thread = # osx impl of thread primitives #condvar+osx.myr + futex+osx.myr spawn+osx.myr start+osx-x64.s @@ -37,11 +41,12 @@ lib thread = atomic-impl+plan9-x64.s mutex+plan9.myr ncpu+plan9.myr + sem+plan9.myr spawn+plan9.myr # openbsd impl of thread primitives exit+openbsd-x64.s - mutex+openbsd:6.2.myr + futex+openbsd:6.2.myr ncpu+openbsd.myr spawn+openbsd.myr diff --git a/lib/thread/futex+freebsd.myr b/lib/thread/futex+freebsd.myr new file mode 100644 index 0000000..1ca52c7 --- /dev/null +++ b/lib/thread/futex+freebsd.myr @@ -0,0 +1,25 @@ +use sys + +use "common" + +pkg thread = + const ftxwait : (uaddr : uint64#, val : uint64, timeout : sys.timespec# -> int) + const ftxwake : (uaddr : uint64# -> int) +;; + +const ftxwait = {uaddr, val, timeout + if timeout == Zptr + -> sys.umtx_op((uaddr : void#), sys.Umtxwaituintpriv, (val : uint64), Zptr, Zptr) + ;; + + var ut : sys._umtx_time = [ + ._timeout = timeout# + ._flags = sys.Umtxabstime + ._clockid = 1 /* CLOCK_MONOTONIC. Not exported from sys. */ + ] + -> sys.umtx_op((uaddr : void#), sys.Umtxwaituintpriv, (val : uint64), (sys.sizeof(sys._umtx_time) : void#), &ut) +} + +const ftxwake = {uaddr + -> sys.umtx_op((uaddr : void#), sys.Umtxwakepriv, 1, Zptr, Zptr) +} diff --git a/lib/thread/futex+linux.myr b/lib/thread/futex+linux.myr new file mode 100644 index 0000000..e6cec24 --- /dev/null +++ b/lib/thread/futex+linux.myr @@ -0,0 +1,16 @@ +use sys + +use "common" + +pkg thread = + const ftxwait : (uaddr : uint64#, val : uint64, timeout : sys.timespec# -> int) + const ftxwake : (uaddr : uint64# -> int) +;; + +const ftxwait = {uaddr, val, timeout + -> sys.futex((uaddr : int32#), sys.Futexwait | sys.Futexpriv, val, timeout, Zptr, 0) +} + +const ftxwake = {uaddr + -> sys.futex((uaddr : int32#), sys.Futexwake | sys.Futexpriv, 1, Zptr, Zptr, 0) +} diff --git a/lib/thread/futex+openbsd:6.2.myr b/lib/thread/futex+openbsd:6.2.myr new file mode 100644 index 0000000..bbe2c17 --- /dev/null +++ b/lib/thread/futex+openbsd:6.2.myr @@ -0,0 +1,16 @@ +use sys + +use "common" + +pkg thread = + const ftxwait : (uaddr : uint64#, val : uint64, timeout : sys.timespec# -> int) + const ftxwake : (uaddr : uint64# -> int) +;; + +const ftxwait = {uaddr, val, timeout + -> sys.futex((uaddr : uint32#), sys.Futexwait, val, timeout, Zptr) +} + +const ftxwake = {uaddr + -> sys.futex((uaddr : uint32#), sys.Futexwake, 1, Zptr, Zptr) +} diff --git a/lib/thread/futex+osx.myr b/lib/thread/futex+osx.myr new file mode 100644 index 0000000..c2b47d9 --- /dev/null +++ b/lib/thread/futex+osx.myr @@ -0,0 +1,44 @@ +use std +use sys + +use "common" + +pkg thread = + const ftxwait : (uaddr : uint64#, val : uint64, timeout : sys.timespec# -> int) + const ftxwake : (uaddr : uint64# -> int) +;; + +/* + * The ulock_ functions are undocumented but the relevant source can be found at + * https://github.com/apple/darwin-xnu/blob/0a798f6738bc1db01281fc08ae024145e84df927/bsd/kern/sys_ulock.c + */ +const ftxwait = {uaddr, val, timeout + if timeout == Zptr + -> sys.ulock_wait(sys.Ulockcompareandwait, uaddr, val, 0) + ;; + + var ts + var err = sys.clock_gettime(`sys.Clockmonotonic, &ts) + std.assert(err == 0, "error: clock_gettime returned {}\n", err) + + var usec = 0 + if timeout.sec > ts.sec + var sec = (timeout.sec - ts.sec) * 1000 + std.assert(sec <= 0xffffffff, "error: maximum futex timeout exceeded\n") + usec = (sec : uint32) + ;; + if timeout.nsec > ts.nsec + var nsec = (timeout.nsec - ts.nsec) / 1000 + std.assert(usec + nsec > usec, "error: maximum futex timeout exceeded\n") + usec += nsec + ;; + + if usec == 0 + -> (std.Etimedout : int) + ;; + -> sys.ulock_wait(sys.Ulockcompareandwait, uaddr, val, 0) +} + +const ftxwake = {uaddr + -> sys.ulock_wake(sys.Ulockcompareandwait, uaddr, 0) +} diff --git a/lib/thread/mutex+freebsd.myr b/lib/thread/mutex+freebsd.myr deleted file mode 100644 index fc1f8c2..0000000 --- a/lib/thread/mutex+freebsd.myr +++ /dev/null @@ -1,81 +0,0 @@ -use std -use sys - -use "atomic" -use "common" - -pkg thread = - type mutex = struct - _state : uint32 - ;; - - const mkmtx : (-> mutex) - const mtxlock : (mtx : mutex# -> void) - const mtxtrylock : (mtx : mutex# -> bool) - const mtxunlock : (mtx : mutex# -> void) - - pkglocal const Unlocked = 0 - pkglocal const Locked = 1 - pkglocal const Contended = 2 -;; - -var nspin = 10 /* FIXME: pick a sane number, based on CPU count */ - -const mkmtx = { - -> [._state = Unlocked] -} - -const mtxlock = {mtx - var c - - /* - Uncontended case: we get an unlocked mutex, and we lock it. - */ - c = Locked - for var i = 0; i < nspin; i++ - c = xcas(&mtx._state, Unlocked, Locked) - if c == Unlocked - -> void - ;; - sys.sched_yield() - ;; - - /* - Contended case: we set the lock state to Contended. This indicates that there - the lock is locked, and we potentially have threads waiting on it, which means - that we will need to wake them up. - */ - if c == Locked - c = xchg(&mtx._state, Contended) - ;; - - while c != Unlocked - sys.umtx_op( \ - (&mtx._state : void#), \ - sys.Umtxwaituintpriv, \ - (Contended : uint64), \ - Zptr, Zptr) - c = xchg(&mtx._state, Contended) - ;; -} - -const mtxtrylock = {mtx - -> xcas(&mtx._state, Unlocked, Locked) == Unlocked -} - -const mtxunlock = {mtx - /* - Uncontended case: If the mutex state is not contended, and we still - are uncontended by the xchg() call, then it's safe to simply return; - nobody was waiting for us. - */ - if mtx._state == Contended - mtx._state = Unlocked - elif xchg(&mtx._state, Unlocked) == Locked - -> void - ;; - - /* wake all threads: for some reason nwake */ - sys.umtx_op((&mtx._state : void#), sys.Umtxwakepriv, 1, Zptr, Zptr) -} - diff --git a/lib/thread/mutex+linux.myr b/lib/thread/mutex+futex.myr index 22b9904..298285d 100644 --- a/lib/thread/mutex+linux.myr +++ b/lib/thread/mutex+futex.myr @@ -1,12 +1,12 @@ -use std use sys use "atomic" use "common" +use "futex" pkg thread = type mutex = struct - _state : int32 + _state : uint64 ;; const mkmtx : (-> mutex) @@ -49,7 +49,7 @@ const mtxlock = {mtx ;; while c != Unlocked - sys.futex(&mtx._state, sys.Futexwait | sys.Futexpriv, Contended, Zptr, Zptr, 0) + ftxwait(&mtx._state, Contended, Zptr) c = xchg(&mtx._state, Contended) ;; } @@ -71,6 +71,5 @@ const mtxunlock = {mtx ;; /* wake one thread */ - sys.futex(&mtx._state, sys.Futexwake | sys.Futexpriv, 1, Zptr, Zptr, 0) + ftxwake(&mtx._state) } - diff --git a/lib/thread/mutex+openbsd:6.2.myr b/lib/thread/mutex+openbsd:6.2.myr deleted file mode 100644 index f2648f5..0000000 --- a/lib/thread/mutex+openbsd:6.2.myr +++ /dev/null @@ -1,76 +0,0 @@ -use std -use sys - -use "atomic" -use "common" - -pkg thread = - type mutex = struct - _state : uint32 - ;; - - const mkmtx : (-> mutex) - const mtxlock : (mtx : mutex# -> void) - const mtxtrylock : (mtx : mutex# -> bool) - const mtxunlock : (mtx : mutex# -> void) - - pkglocal const Unlocked : uint32 = 0 - pkglocal const Locked : uint32 = 1 - pkglocal const Contended : uint32 = 2 -;; - -var nspin = 10 /* FIXME: pick a sane number, based on CPU count */ - -const mkmtx = { - -> [._state = Unlocked] -} - -const mtxlock = {mtx - var c - - /* - Uncontended case: we get an unlocked mutex, and we lock it. - */ - c = Locked - for var i = 0; i < nspin; i++ - c = xcas(&mtx._state, Unlocked, Locked) - if c == Unlocked - -> void - ;; - ;; - - /* - Contended case: we set the lock state to Contended. This indicates that there - the lock is locked, and we potentially have threads waiting on it, which means - that we will need to wake them up. - */ - if c == Locked - c = xchg(&mtx._state, Contended) - ;; - - while c != Unlocked - sys.futex(&mtx._state, sys.Futexwait, (Contended : int), Zptr, Zptr) - c = xchg(&mtx._state, Contended) - ;; -} - -const mtxtrylock = {mtx - -> xcas(&mtx._state, Unlocked, Locked) == Unlocked -} - -const mtxunlock = {mtx - /* - Uncontended case: If the mutex state is not contended, and we still - are uncontended by the xchg() call, then it's safe to simply return; - nobody was waiting for us. - */ - if mtx._state == Contended - mtx._state = Unlocked - elif xchg(&mtx._state, Unlocked) == Locked - -> void - ;; - - /* wake one thread */ - sys.futex(&mtx._state, sys.Futexwake, 1, Zptr, Zptr) -} - diff --git a/lib/thread/sem+futex.myr b/lib/thread/sem+futex.myr new file mode 100644 index 0000000..aa00f4c --- /dev/null +++ b/lib/thread/sem+futex.myr @@ -0,0 +1,55 @@ +use std +use sys + +use "atomic" +use "common" +use "futex" + +pkg thread = + type sem = struct + _val : uint64 + ;; + + const mksem : (v : uint32 -> sem) + const semwait : (s : sem# -> void) + const semtrywait : (s : sem# -> bool) + const sempost : (s : sem# -> void) +;; + +const mksem = {v + -> [._val = (v : uint64)] +} + +const semwait = {s + var v = 0 + + for ; ; + while (v = s._val) > 0 + if xcas(&s._val, v, v - 1) == v + -> void + ;; + ;; + ftxwait(&s._val, v, Zptr) + ;; + -> void /* Unreachable */ +} + +const semtrywait = {s + for ; ; + var v = xget(&s._val) + if v == 0 + -> false + ;; + if xcas(&s._val, v, v - 1) == v + -> true + ;; + ;; + -> false /* Unreachable */ +} + +const sempost = {s + std.assert((xadd(&s._val, 1) : uint32) != ~0x0, "error: semaphore overflowed\n") + + /* Unconditionally wake one waiter */ + ftxwake(&s._val) +} diff --git a/lib/thread/sem+plan9.myr b/lib/thread/sem+plan9.myr new file mode 100644 index 0000000..84ecce7 --- /dev/null +++ b/lib/thread/sem+plan9.myr @@ -0,0 +1,54 @@ +use std +use sys + +use "atomic" +use "common" + +pkg thread = + type sem = struct + _user : int32 + _kern : int32 + ;; + + const mksem : (v : uint32 -> sem) + const semwait : (s : sem# -> void) + const semtrywait : (s : sem# -> bool) + const sempost : (s : sem# -> void) +;; + +const mksem = {v + -> [._user = v, ._kern = 0] +} + +const semwait = {s + var u = xadd(&s._user, -1) + std.assert(u != 0xffffffff, "error: semaphore underflowed\n") + + /* When the userspace value is negative we fall back on the kernel semaphore */ + if u <= 0 + while sys.semacquire((&s._kern : uint32), 1) < 0 + /* Interrupted, retry */ + ;; + ;; +} + +const semtrywait = {s + for ; ; + var u = xget(&s._user) + if u <= 0 + -> false + ;; + if xcas(&s._user, u, u - 1) == u + -> true + ;; + ;; + -> false /* Unreachable */ +} + +const sempost = {s + var u = xadd(&s._user, 1) + std.assert(u != 0x7fffffff, "error: semaphore overflowed\n") + if u < 0 + sys.semrelease((&s._kern : uint32), 1) + ;; +} diff --git a/lib/thread/sem.myr b/lib/thread/sem.myr new file mode 100644 index 0000000..7aea35f --- /dev/null +++ b/lib/thread/sem.myr @@ -0,0 +1,66 @@ +use std + +use "atomic" + +pkg thread = + type sem = struct + _val : uint32 + ;; + + const mksem : (v : uint32 -> sem) + const semwait : (s : sem# -> void) + const semtrywait : (s : sem# -> bool) + const sempost : (s : sem# -> void) +;; + +const mksem = {v + -> [._val = v] +} + +const semwait = {s + var v = 0 + + for var i = 0; i < 1000; i++ + if (v = xget(&s._val)) != 0 && xcas(&s._val, v, v - 1) == v + -> void + ;; + ;; + + for var i = 0; i < 1000; i++ + if (v = xget(&s._val)) != 0 && xcas(&s._val, v, v - 1) == v + -> void + ;; + std.nanosleep(10_000) + ;; + + for var i = 0; i < 1000; i++ + if (v = xget(&s._val)) != 0 && xcas(&s._val, v, v - 1) == v + -> void + ;; + std.nanosleep(100_000) + ;; + + for ; ; + if (v = xget(&s._val)) != 0 && xcas(&s._val, v, v - 1) == v + -> void + ;; + std.nanosleep(1_000_000) + ;; +} + +const semtrywait = {s + for ; ; + var v = xget(&s._val) + if v == 0 + -> false + ;; + if xcas(&s._val, v, v - 1) == v + -> true + ;; + ;; + -> false /* Unreachable */ +} + +const sempost = {s + std.assert(xadd(&s._val, 1) != ~0x0, "error: semaphore overflowed\n") +} diff --git a/lib/thread/spawn+osx.myr b/lib/thread/spawn+osx.myr index 4e99598..417e64a 100644 --- a/lib/thread/spawn+osx.myr +++ b/lib/thread/spawn+osx.myr @@ -79,7 +79,6 @@ const spawnstk = {fn, sz const getstk = {sz var p, m - std.put("allocating stack {x}\n", sz) p = sys.mmap((0 : byte#), sz, sys.Mprotrw, sys.Mpriv | sys.Manon, -1, 0) if p == sys.Mapbad -> p diff --git a/lib/thread/test/sem.myr b/lib/thread/test/sem.myr new file mode 100644 index 0000000..8569738 --- /dev/null +++ b/lib/thread/test/sem.myr @@ -0,0 +1,42 @@ +use std +use thread + +const Cap = 4 + +var write : uint32 = 0, read : uint32 = 0 +var lock +var len +var cap +var buf : int[Cap] +var done : uint32 = 0 + +const main = { + lock = thread.mkmtx() + len = thread.mksem(0) + cap = thread.mksem(Cap) + + thread.spawn({ + for var i = 0; i < 100000; i++ + thread.semwait(&cap) + thread.mtxlock(&lock) + buf[write++ & (Cap - 1)] = i + thread.mtxunlock(&lock) + thread.sempost(&len) + ;; + }) + + thread.spawn({ + for var i = 0; i < 100000; i++ + thread.semwait(&len) + thread.mtxlock(&lock) + std.assert(i == buf[read++ & (Cap - 1)], "semaphores are broken\n") + thread.mtxunlock(&lock) + thread.sempost(&cap) + ;; + thread.xset(&done, 1) + }) + + while thread.xget(&done) == 0 + std.nanosleep(1_000_000) + ;; +} |