diff --git a/java/util/concurrent/atomic/annotations.xml b/java/util/concurrent/atomic/annotations.xml
new file mode 100644
index 0000000..271a48e
--- /dev/null
+++ b/java/util/concurrent/atomic/annotations.xml
@@ -0,0 +1,5 @@
+
+ -
+
+
+
\ No newline at end of file
diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java
index b432fea..34ec7f1 100644
--- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java
+++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java
@@ -16,6 +16,8 @@
package cn.nextop.rxjava.share.practices;
+import java.util.concurrent.atomic.AtomicInteger;
+
import cn.nextop.rxjava.share.util.type.Tuple2;
import io.reactivex.Observable;
@@ -30,6 +32,13 @@ public class Practice1 {
* 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始
*/
public Observable> indexable(Observable observable) {
- throw new UnsupportedOperationException("implementation");
+// // Wrong
+// return observable.map( e -> { return new Tuple2<>(e.charAt(0) - "a".charAt(0) + 1, e); } );
+
+// // Bad, has side effect
+// AtomicInteger a = new AtomicInteger(0);
+// return observable.map( e -> { int index = a.addAndGet(1); return new Tuple2<>(index, e); } );
+
+ return observable.map(e -> new Tuple2(1, e)).scan( (pre, cur) -> new Tuple2(pre.getV1() + 1, cur.getV2()));
}
}
diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java
index 08c7dcd..871f174 100644
--- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java
+++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java
@@ -19,9 +19,22 @@
import cn.nextop.rxjava.share.util.type.Tuple2;
import io.reactivex.Observable;
+import io.reactivex.Observer;
+import io.reactivex.Scheduler;
import io.reactivex.Single;
+import io.reactivex.schedulers.Schedulers;
+import io.reactivex.subjects.PublishSubject;
+import javafx.collections.ObservableList;
+import java.util.Dictionary;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.security.auth.Subject;
/**
* @author Baoyi Chen
@@ -34,7 +47,28 @@ public class Practice2 {
* 返回: Observable[("a", 2), ("b", 1), ("c", 2)]
*/
public Observable> wordCount1(Observable words) {
- throw new UnsupportedOperationException("implementation");
+// // Passed, but not good
+// Hashtable info = new Hashtable();
+// words.map( e -> { Integer a = info.get(e); a = (a == null ? 1 : a+1); info.put(e, a); return e; } ).subscribe();
+// System.out.println(info.toString());
+// Observable< Tuple2 > ob = Observable.< Tuple2 >create( e -> {
+// Enumeration en = info.keys();
+// while (en.hasMoreElements()) {
+// String k = en.nextElement(); Integer v = info.get(k);
+// e.onNext(new Tuple2(k, v));
+// }
+// e.onComplete();
+// });
+//
+// return ob;
+
+ //AtomicInteger a = new AtomicInteger(0);
+ Observable< Tuple2 > ob = Observable.< Tuple2 >create( emitter -> {
+ words.groupBy( e -> e ).subscribe( e ->{ e.count().subscribe( count -> { emitter.onNext(new Tuple2(e.getKey(), count.intValue())); } ); });
+ emitter.onComplete();
+ });
+
+ return ob;
}
/*
@@ -43,7 +77,10 @@ public Observable> wordCount1(Observable words)
* 返回: Single[Map{a=2, b=1, c=2}]
*/
public Single