summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/thread/atomic-impl+plan9-x64.s23
-rw-r--r--lib/thread/atomic-impl+x64.s40
-rw-r--r--lib/thread/atomic.myr14
-rw-r--r--lib/thread/bld.sub4
-rw-r--r--lib/thread/do.myr17
-rw-r--r--lib/thread/future.myr60
-rw-r--r--lib/thread/test/do.myr30
-rw-r--r--lib/thread/test/future.myr53
8 files changed, 164 insertions, 77 deletions
diff --git a/lib/thread/atomic-impl+plan9-x64.s b/lib/thread/atomic-impl+plan9-x64.s
index 22318eb..06224d2 100644
--- a/lib/thread/atomic-impl+plan9-x64.s
+++ b/lib/thread/atomic-impl+plan9-x64.s
@@ -1,3 +1,7 @@
+// get variants
+TEXT thread$xget8+0(SB),1,$0
+ MOVB (DI), AX
+ RET
TEXT thread$xget32+0(SB),1,$0
MOVL (DI), AX
RET
@@ -8,6 +12,10 @@ TEXT thread$xgetp+0(SB),1,$0
MOVQ (DI), AX
RET
+// set variants
+TEXT thread$xset8+0(SB),1,$0
+ MOVB SI, (DI)
+ RET
TEXT thread$xset32+0(SB),1,$0
MOVL SI, (DI)
RET
@@ -18,6 +26,11 @@ TEXT thread$xsetp+0(SB),1,$0
MOVQ SI, (DI)
RET
+// add variants
+TEXT thread$xadd8+0(SB),1,$0
+ LOCK; XADDB SI, (DI)
+ MOVL SI, AX
+ RET
TEXT thread$xadd32+0(SB),1,$0
LOCK; XADDL SI, (DI)
MOVL SI, AX
@@ -31,6 +44,11 @@ TEXT thread$xaddp+0(SB),1,$0
MOVQ SI, AX
RET
+// cas variants
+TEXT thread$xcas8+0(SB),1,$0
+ MOVL SI, AX
+ LOCK; CMPXCHGB DX, (DI)
+ RET
TEXT thread$xcas32+0(SB),1,$0
MOVL SI, AX
LOCK; CMPXCHGL DX, (DI)
@@ -44,6 +62,11 @@ TEXT thread$xcasp+0(SB),1,$0
LOCK; CMPXCHGQ DX, (DI)
RET
+// xchg variants
+TEXT thread$xchg8+0(SB),1,$0
+ MOVL SI, AX
+ LOCK; XCHGB (DI), AX
+ RET
TEXT thread$xchg32+0(SB),1,$0
MOVL SI, AX
LOCK; XCHGL (DI), AX
diff --git a/lib/thread/atomic-impl+x64.s b/lib/thread/atomic-impl+x64.s
index 82dbd78..3f28277 100644
--- a/lib/thread/atomic-impl+x64.s
+++ b/lib/thread/atomic-impl+x64.s
@@ -1,3 +1,10 @@
+# get variants
+.globl thread$xget8
+.globl _thread$xget8
+thread$xget8:
+_thread$xget8:
+ movb (%rdi), %al
+ ret
.globl thread$xget32
.globl _thread$xget32
thread$xget32:
@@ -15,6 +22,13 @@ _thread$xgetp:
movq (%rdi), %rax
ret
+# set variants
+.globl thread$xset8
+.globl _thread$xset8
+thread$xset8:
+_thread$xset8:
+ movl %esi, (%rdi)
+ ret
.globl thread$xset32
.globl _thread$xset32
thread$xset32:
@@ -32,6 +46,14 @@ _thread$xsetp:
movq %rsi, (%rdi)
ret
+# add variants
+.globl thread$xadd8
+.globl _thread$xadd8
+thread$xadd8:
+_thread$xadd8:
+ lock xaddb %sil, (%rdi)
+ movb %sil,%al
+ ret
.globl thread$xadd32
.globl _thread$xadd32
thread$xadd32:
@@ -51,6 +73,14 @@ _thread$xaddp:
movq %rsi,%rax
ret
+# cas variants
+.globl thread$xcas8
+.globl _thread$xcas8
+thread$xcas8:
+_thread$xcas8:
+ movb %sil, %al
+ lock cmpxchgb %dl, (%rdi)
+ ret
.globl thread$xcas32
.globl _thread$xcas32
thread$xcas32:
@@ -66,10 +96,18 @@ thread$xcas64:
thread$xcasp:
_thread$xcas64:
_thread$xcasp:
- movq %rsi, %rax
+ movq %rsi, %rax
lock cmpxchgq %rdx, (%rdi)
ret
+# xchg variants
+.globl thread$xchg8
+.globl _thread$xchg8
+thread$xchg8:
+_thread$xchg8:
+ movb %sil, %al
+ lock xchgb (%rdi), %al
+ ret
.globl thread$xchg32
.globl _thread$xchg32
thread$xchg32:
diff --git a/lib/thread/atomic.myr b/lib/thread/atomic.myr
index f71528f..04a103d 100644
--- a/lib/thread/atomic.myr
+++ b/lib/thread/atomic.myr
@@ -10,6 +10,7 @@ pkg thread =
xchg : (p : @a#, new : @a -> @a)
;;
+ impl atomic bool
impl atomic int32
impl atomic int64
impl atomic uint32
@@ -20,27 +21,40 @@ pkg thread =
generic xcasptr : (p : @a##, old : std.option(@a#), new : std.option(@a#) -> std.option(@a#))
generic xchgptr : (p : @a##, new : std.option(@a#) -> std.option(@a#))
+ pkglocal extern const xget8 : (p : uint8# -> uint8)
pkglocal extern const xget32 : (p : uint32# -> uint32)
pkglocal extern const xget64 : (p : uint64# -> uint64)
pkglocal extern const xgetp : (p : std.intptr# -> std.intptr)
+ pkglocal extern const xset8 : (p : uint8#, v : uint8 -> void)
pkglocal extern const xset32 : (p : uint32#, v : uint32 -> void)
pkglocal extern const xset64 : (p : uint64#, v : uint64 -> void)
pkglocal extern const xsetp : (p : std.intptr#, v : std.intptr -> void)
+ pkglocal extern const xadd8 : (p : uint8#, v : uint8 -> uint8)
pkglocal extern const xadd32 : (p : uint32#, v : uint32 -> uint32)
pkglocal extern const xadd64 : (p : uint64#, v : uint64 -> uint64)
pkglocal extern const xaddp : (p : std.intptr#, v : std.intptr -> std.intptr)
+ pkglocal extern const xcas8 : (p : uint8#, old: uint8, new : uint8 -> uint8)
pkglocal extern const xcas32 : (p : uint32#, old: uint32, new : uint32 -> uint32)
pkglocal extern const xcas64 : (p : uint64#, old: uint64, new : uint64 -> uint64)
pkglocal extern const xcasp : (p : std.intptr#, old: std.intptr, new : std.intptr -> std.intptr)
+ pkglocal extern const xchg8 : (p : uint8#, v : uint8 -> uint8)
pkglocal extern const xchg32 : (p : uint32#, v : uint32 -> uint32)
pkglocal extern const xchg64 : (p : uint64#, v : uint64 -> uint64)
pkglocal extern const xchgp : (p : std.intptr#, v : std.intptr -> std.intptr)
;;
+impl atomic bool =
+ xget = {p; -> (xget8((p : uint8#)) : bool)}
+ xset = {p, v; xset8((p : uint8#), (v : uint8))}
+ xadd = {p, v; -> (xadd8((p : uint8#), (v : uint8)) : bool)}
+ xcas = {p, old, new; -> (xcas8((p : uint8#), (old : uint8), (new : uint8)) : bool)}
+ xchg = {p, v; -> (xchg8((p : uint8#), (v : uint8)) : bool)}
+;;
+
impl atomic int32 =
xget = {p; -> (xget32((p : uint32#)) : int32)}
xset = {p, v; xset32((p : uint32#), (v : uint32))}
diff --git a/lib/thread/bld.sub b/lib/thread/bld.sub
index eff3c5b..c380091 100644
--- a/lib/thread/bld.sub
+++ b/lib/thread/bld.sub
@@ -2,6 +2,10 @@ lib thread =
common.myr
hookstd.myr # install thread hooks
+ # higher level apis
+ future.myr
+ do.myr
+
# generic fallbacks
condvar.myr
mutex.myr
diff --git a/lib/thread/do.myr b/lib/thread/do.myr
new file mode 100644
index 0000000..3a4449f
--- /dev/null
+++ b/lib/thread/do.myr
@@ -0,0 +1,17 @@
+use std
+
+use "future"
+use "spawn"
+
+pkg thread =
+ generic do : (fn : (-> @a) -> future(@a)#)
+;;
+
+generic do = {fn
+ var r
+
+ r = mkfut()
+ spawn({; futput(r, fn()) })
+ -> r
+}
+
diff --git a/lib/thread/future.myr b/lib/thread/future.myr
index 5637e6d..ada1d19 100644
--- a/lib/thread/future.myr
+++ b/lib/thread/future.myr
@@ -1,63 +1,55 @@
use std
use "mutex"
+use "condvar"
+use "atomic"
pkg thread =
type future(@a) = struct
mtx : mutex
+ cv : cond
set : bool
val : @a
;;
- generic mkfut : (-> future(@a))
- generic futset : (fut : future(@a)#, val : @a -> bool)
+ generic mkfut : (-> future(@a)#)
+ generic futput : (fut : future(@a)#, val : @a -> void)
generic futget : (fut : future(@a)# -> @a)
- generic futtryget : (fut : future(@a)# -> std.option(@a))
- generic futclear : (fut : future(@a)# -> void)
+ generic futpeek : (fut : future(@a)# -> @a)
;;
-const Unset = 0
-const Waiting = 1
-const Set = 2
-
generic mkfut = {
var fut
- fut = [.mtx = mkmtx() ]
- mtxlock(&fut.mtx)
+ fut = std.alloc()
+ fut.mtx = mkmtx()
+ fut.cv = mkcond(&fut.mtx)
+ fut.set = false
-> fut
}
-generic futset = {fut, val
- if fut.set
- -> false
- ;;
- /* compiler doesn't reorder shit */
+generic futput = {fut, val
fut.val = val
- fut.set = true
- mtxunlock(&fut.mtx)
- -> true
-}
-
-generic futtryget = {fut
- var val
-
- if !fut.set
- -> `std.None
- ;;
- mtxlock(&fut.mtx)
- val = fut.val
- mtxunlock(&fut.mtx)
- -> `std.Some val
+ xset(&fut.set, true)
+ condsignal(&fut.cv)
}
generic futget = {fut
var val
-
- mtxlock(&fut.mtx)
- val = fut.val
- mtxunlock(&fut.mtx)
+
+ val = futpeek(fut)
+ std.free(fut)
-> val
}
+generic futpeek = {fut
+ if !xget(&fut.set)
+ mtxlock(&fut.mtx)
+ if !xget(&fut.set)
+ condwait(&fut.cv)
+ ;;
+ mtxunlock(&fut.mtx)
+ ;;
+ -> fut.val
+}
diff --git a/lib/thread/test/do.myr b/lib/thread/test/do.myr
new file mode 100644
index 0000000..e2e64e8
--- /dev/null
+++ b/lib/thread/test/do.myr
@@ -0,0 +1,30 @@
+use std
+use thread
+
+const main = {
+ match std.espork(["echo", "hello"][:])
+ | `std.Err e:
+ std.fatal("could not spork\n")
+ | `std.Ok (pid, in, out, err):
+ std.close(in)
+ var w = thread.do({;-> std.wait(pid)})
+ var o = thread.do({;-> std.fslurp(out)})
+ var e = thread.do({;-> std.fslurp(err)})
+
+ match thread.futget(w)
+ | `std.Wsuccess: /* ok */
+ | bad: std.fatal("bad wait: {}\n", bad)
+ ;;
+
+ match thread.futget(o)
+ | `std.Ok "hello\n": /* ok */
+ | bad: std.fatal("bad out: {}\n", bad)
+ ;;
+
+ match thread.futget(e)
+ | `std.Ok "": /* ok */
+ | bad: std.fatal("bad err: {}\n", bad)
+ ;;
+ ;;
+}
+
diff --git a/lib/thread/test/future.myr b/lib/thread/test/future.myr
index e69e1af..760404b 100644
--- a/lib/thread/test/future.myr
+++ b/lib/thread/test/future.myr
@@ -2,49 +2,18 @@ use std
use sys
use thread
-use "util"
-
-var fut
-var nready : int32
-var ndone : int32
-
const main = {
- nready = 0
- ndone = 0
- fut = thread.mkfut()
- /* set after we have some waiters */
- mkherd(100, getfuture)
- while nready != 100
- /* spin */
- ;;
- std.put("done waiting for ready\n")
- std.assert(ndone == 0, "thread proceeded too soon\n")
- thread.futset(&fut, 666)
- std.assert(thread.futset(&fut, 1) == false, "double set future\n")
- while ndone != 100
- /* spin */
- ;;
- std.put("double set future ok")
- /* start up a few more to make sure we can still read */
- mkherd(50, getfuture)
- while ndone != 150
- /* spin */
- ;;
+ var f
-
- /* set ahead of time */
- ndone = 0
- fut = thread.mkfut()
- thread.futset(&fut, 666)
- std.assert(thread.futset(&fut, 666) == false, "double set future\n")
- mkherd(100, getfuture)
- while ndone != 100
- /* spin */
- ;;
-}
+ f = thread.mkfut()
+ thread.futput(f, 123)
+ std.assert(thread.futget(f) == 123, "the future is broken\n")
-const getfuture = {
- thread.xadd(&nready, 1)
- std.assert(thread.futget(&fut) == 666, "wrong value gotten from future")
- thread.xadd(&ndone, 1)
+ f = thread.mkfut()
+ thread.spawn({
+ std.usleep(10_000)
+ thread.futput(f, 321)
+ })
+ std.assert(thread.futget(f) == 321, "the future is broken\n")
}
+