Skip to content

Commit c89a578

Browse files
author
Luc Duponcheel
committed
final release: that's it folks!
1 parent 1e61cd5 commit c89a578

9 files changed

+207
-253
lines changed

src/main/scala/bescala/Active.scala

+74
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
package bescala
2+
3+
import java.util.concurrent.TimeUnit
4+
5+
import bescala.Util.{Atomic, async}
6+
7+
import scala.language.{higherKinds, implicitConversions}
8+
9+
object Active extends Common {
10+
11+
override type M[A] = java.util.concurrent.Future[A]
12+
13+
// actively blocking get
14+
override def fromM[A](ma: M[A]): A = ma.get
15+
16+
// we do not care to much about timeout
17+
override def toM[A](a: => A): M[A] = new M[A] {
18+
def get = a
19+
20+
def get(timeout: Long, units: TimeUnit) = get
21+
22+
def isDone = true
23+
24+
def isCancelled = false
25+
26+
def cancel(evenIfRunning: Boolean): Boolean = false
27+
}
28+
29+
override def map[A, B](pa: Par[A])(a2b: A => B): Par[B] =
30+
es => {
31+
val ma: M[A] = pa(es)
32+
toM(a2b(fromM(ma)))
33+
}
34+
35+
// it is *very* important to define `ma` and `mb` outside of `toM(ab2c(fromM(ma), fromM(mb)))`
36+
// override def map2[A, B, C](pa: Par[A], pb: Par[B])(ab2c: (A, B) => C): Par[C] =
37+
// es => {
38+
// val ma = pa(es)
39+
// val mb = pb(es)
40+
// toM(ab2c(fromM(ma), fromM(mb)))
41+
// }
42+
43+
override def map2[A, B, C](pa: Par[A], pb: Par[B])(ab2c: (A, B) => C): Par[C] =
44+
es => {
45+
var optionalMa: Option[M[A]] = None
46+
var optionalMb: Option[M[B]] = None
47+
val atomicC: Atomic[M[C]] = new Atomic[M[C]]
48+
val combinerActor = new Actor[Either[M[A], M[B]]](es)({
49+
case Left(ma) =>
50+
if (optionalMb.isDefined) atomicC.setValue(toM(ab2c(ma.get, optionalMb.get.get)))
51+
else optionalMa = Some(ma)
52+
case Right(mb) =>
53+
if (optionalMa.isDefined) atomicC.setValue(toM(ab2c(optionalMa.get.get, mb.get)))
54+
else optionalMb = Some(mb)
55+
})
56+
combinerActor ! Left(pa(es))
57+
combinerActor ! Right(pb(es))
58+
59+
atomicC.getValue
60+
}
61+
62+
override def flatMap[A, B](pa: Par[A])(a2pb: A => Par[B]): Par[B] =
63+
es => {
64+
val ma: M[A] = pa(es)
65+
a2pb(fromM(ma))(es)
66+
}
67+
68+
override def fork[A](pa: => Par[A]): Par[A] =
69+
es => {
70+
val ma: M[A] = pa(es)
71+
async(es)(fromM(ma))
72+
}
73+
74+
}

src/main/scala/bescala/ActiveCommon.scala

-40
This file was deleted.

src/main/scala/bescala/Actor.scala

+2-3
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,9 @@ package bescala
33
import java.util.concurrent.ExecutorService
44
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
55

6-
import scala.annotation.tailrec
76
import bescala.Util.async
87

9-
import scala.util.{Left, Right}
8+
import scala.annotation.tailrec
109

1110

1211
// node
@@ -26,7 +25,7 @@ class Actor[M]
2625

2726
val nullM = null.asInstanceOf[M]
2827

29-
private val maxNumberOfProcessedMessages = 2
28+
private val maxNumberOfProcessedMessages = 20
3029

3130
private val suspended = new AtomicInteger(1)
3231

src/main/scala/bescala/Common.scala

+57-44
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,7 @@ package bescala
22

33
import java.util.concurrent.ExecutorService
44

5-
import language.{higherKinds, implicitConversions}
6-
7-
import Util.async
5+
import scala.language.{higherKinds, implicitConversions}
86

97
trait Common {
108

@@ -24,97 +22,112 @@ trait Common {
2422
// abstract M[A] related
2523
//
2624

27-
def fromM[A](ma: M[A]): A
25+
def fromM[A](ma: M[A]): A
2826

29-
def toM[A](a: => A): M[A]
27+
def toM[A](a: => A): M[A]
3028

3129
//
32-
// abstract applicative and monad Par[A] features
30+
// abstract applicative and monadic Par[A] computational features
3331
//
3432

35-
def map[A, B](parA: Par[A])(a2b: A => B): Par[B]
33+
def map[A, B](pa: Par[A])(a2b: A => B): Par[B]
3634

37-
def map2[A, B, C](parA: Par[A], parB: Par[B])(ab2c: (A, B) => C): Par[C]
35+
def map2[A, B, C](pa: Par[A], pb: Par[B])(ab2c: (A, B) => C): Par[C]
3836

39-
def flatMap[A, B](parA: Par[A])(a2pb: A => Par[B]): Par[B]
37+
def flatMap[A, B](pa: Par[A])(a2pb: A => Par[B]): Par[B]
4038

4139
//
42-
// only extra abstract Par[A] feature
40+
// only extra abstract Par[A] computational feature
4341
//
4442

45-
def fork[A](parA: => Par[A]): Par[A]
43+
def fork[A](pa: => Par[A]): Par[A]
4644

4745
//
4846
// concrete
4947
//
5048

49+
// monadic unit in terms of toM
5150
def unit[A](a: => A): Par[A] =
52-
es =>
53-
toM(a)
51+
es =>
52+
toM(a)
53+
54+
// monadic run in terms of fromM
55+
def run[A](es: ExecutorService)(pa: => Par[A]): A =
56+
fromM(pa(es))
5457

55-
def run[A](es: ExecutorService)(parA: => Par[A]): A =
56-
fromM(parA(es))
58+
//
59+
// more concrete stuff
60+
//
5761

58-
def join[A](parParA: Par[Par[A]]): Par[A] =
59-
flatMap(parParA)(identity)
62+
def join[A](ppa: Par[Par[A]]): Par[A] =
63+
flatMap(ppa)(identity)
6064

61-
def choice[A](parBoolean: Par[Boolean])(trueParA: Par[A], falseParA: Par[A]): Par[A] =
62-
flatMap(parBoolean)(b => if (b) trueParA else falseParA)
65+
def choice[A](pb: Par[Boolean])(tpa: Par[A], fpa: Par[A]): Par[A] =
66+
flatMap(pb)(b => if (b) tpa else fpa)
6367

64-
def choiceN[A](parInt: Par[Int])(parAs: List[Par[A]]): Par[A] =
65-
flatMap(parInt)(parAs(_))
68+
def choiceN[A](pi: Par[Int])(pas: List[Par[A]]): Par[A] =
69+
flatMap(pi)(pas(_))
6670

67-
def choiceMap[K,V](parK: Par[K])(choices: Map[K, Par[V]]): Par[V] =
68-
flatMap(parK)(choices)
71+
def choiceMap[K, V](pk: Par[K])(choices: Map[K, Par[V]]): Par[V] =
72+
flatMap(pk)(choices)
6973

70-
def sequence[A](parAs: List[Par[A]]): Par[List[A]] =
71-
parAs.foldRight[Par[List[A]]](unit(List()))(map2(_, _)(_ :: _))
74+
// quadratic instead of linear `pas.foldRight[Par[List[A]]](unit(List()))(map2(_, _)(_ :: _))`
75+
def sequence[A](pas: List[Par[A]]): Par[List[A]] = {
76+
if (pas.isEmpty) unit(List())
77+
else if (pas.length == 1) map(pas.head)(List(_))
78+
else {
79+
val (lpas, rpas) = pas.splitAt(pas.length / 2)
80+
val plas = sequence(lpas)
81+
val pras = sequence(rpas)
82+
map2(plas, pras)(_ ++ _)
83+
}
84+
}
7285

7386
def mapList[A, B](as: List[A])(a2b: A => B): Par[List[B]] =
7487
sequence(as.map(forkedFunction(a2b)))
7588

76-
def filterList[A](as: List[A])(predicateA: A => Boolean): Par[List[A]] =
77-
map(sequence(as map (a => unit(if (predicateA(a)) List(a) else List()))))(_.flatten)
89+
def filterList[A](as: List[A])(a2b: A => Boolean): Par[List[A]] =
90+
map(sequence(as map (a => unit(if (a2b(a)) List(a) else List()))))(_.flatten)
7891

7992
//
80-
// forked versions
93+
// some forked versions
8194
//
8295

8396
def forkedUnit[A](a: => A): Par[A] =
8497
fork(unit(a))
8598

86-
def forkedMap[A, B](parA: Par[A])(a2b: A => B): Par[B] =
87-
map(fork(parA))(a2b)
88-
// fork(map(fork(parA))(a2b))
99+
def forkedMap[A, B](pa: Par[A])(a2b: A => B): Par[B] =
100+
map(fork(pa))(a2b)
89101

90-
def forkedMap2[A,B,C](parA: Par[A], parB: Par[B])(ab2c: (A,B) => C): Par[C] =
91-
map2(fork(parA), fork(parB))(ab2c)
92-
// fork(map2(fork(parA), fork(parB))(ab2c))
102+
def forkedMap2[A, B, C](pa: Par[A], pb: Par[B])(ab2c: (A, B) => C): Par[C] =
103+
map2(fork(pa), fork(pb))(ab2c)
93104

94-
def forkedSequence[A](parAs: List[Par[A]]): Par[List[A]] = fork {
95-
if (parAs.isEmpty) unit(List())
96-
else if (parAs.length == 1) forkedMap(parAs.head)(List(_))
105+
// also quadratic
106+
def forkedSequence[A](pas: List[Par[A]]): Par[List[A]] = {
107+
if (pas.isEmpty) unit(List())
108+
else if (pas.length == 1) forkedMap(pas.head)(List(_))
97109
else {
98-
val (leftParAs, rightParAs) = parAs.splitAt(parAs.length/2)
99-
forkedMap2(forkedSequence(leftParAs), forkedSequence(rightParAs))(_ ++ _)
110+
val (lpas, rpas) = pas.splitAt(pas.length / 2)
111+
forkedMap2(forkedSequence(lpas), forkedSequence(rpas))(_ ++ _)
100112
}
101113
}
102114

103115
def forkedFunction[A, B](a2b: A => B): A => Par[B] =
104-
a => forkedUnit(a2b(a))
116+
a =>
117+
forkedUnit(a2b(a))
105118

106119
def forkedMapList[A, B](as: List[A])(a2b: A => B): Par[List[B]] =
107120
forkedSequence(as.map(forkedFunction(a2b)))
108121

109-
def forkedFilterList[A](as: List[A])(predicateA: A => Boolean): Par[List[A]] =
110-
forkedMap(forkedSequence(as map (a => forkedUnit(if (predicateA(a)) List(a) else List()))))(_.flatten)
122+
def forkedFilterList[A](as: List[A])(a2b: A => Boolean): Par[List[A]] =
123+
forkedMap(forkedSequence(as map (a => forkedUnit(if (a2b(a)) List(a) else List()))))(_.flatten)
111124

112125
//
113126
// example: sorting
114127
//
115128

116-
def sort[A : Ordering](parAs: Par[List[A]]) = map(parAs)(_.sorted)
129+
def sort[A: Ordering](pas: Par[List[A]]) = map(pas)(_.sorted)
117130

118-
def forkedSort[A : Ordering](parAs: Par[List[A]]) = forkedMap(parAs)(_.sorted)
131+
def forkedSort[A: Ordering](pas: Par[List[A]]) = forkedMap(pas)(_.sorted)
119132

120133
}

src/main/scala/bescala/FutureActive.scala

-36
This file was deleted.

src/main/scala/bescala/IdentityActive.scala

-42
This file was deleted.

0 commit comments

Comments
 (0)