A modern reactive programming library for Java, designed to be simpler and easier to use than RxJava, while maintaining all essential features.
- 4 Reactive Types: Observable, Single, Maybe, and Completable
- Simple and Intuitive API: Easier to learn than RxJava
- 60+ Operators: map, filter, flatMap, merge, zip, concat, retry, and more
- Error Handling: onErrorReturn, onErrorResumeNext, retry
- Schedulers: Support for asynchronous execution (io, computation, newThread)
- Seamless Conversions: Complete interoperability between reactive types
- Type-Safe: Leverages Java's type system
- Zero Dependencies: Standalone library using only standard Java
- Java 11 or higher
- Maven 3.6+ or Gradle 7.0+ (optional for build)
Add the GitHub Packages repository to your pom.xml:
<repositories>
<repository>
<id>github</id>
<url>https://maven.pkg.github.com/yasmramos/jreactive</url>
</repository>
</repositories>
<dependency>
<groupId>com.reactive</groupId>
<artifactId>jreactive</artifactId>
<version>1.0.0-alpha</version>
</dependency>Configure authentication in your ~/.m2/settings.xml:
<servers>
<server>
<id>github</id>
<username>YOUR_GITHUB_USERNAME</username>
<password>YOUR_GITHUB_TOKEN</password>
</server>
</servers>Add to your build.gradle:
repositories {
maven {
url = uri("https://maven.pkg.github.com/yasmramos/jreactive")
credentials {
username = project.findProperty("gpr.user") ?: System.getenv("GITHUB_USERNAME")
password = project.findProperty("gpr.key") ?: System.getenv("GITHUB_TOKEN")
}
}
}
dependencies {
implementation 'com.reactive:jreactive:1.0.0-alpha'
}Download the latest release directly from GitHub:
| File | Description |
|---|---|
| jreactive-1.0.0-alpha.jar | Main library JAR |
| jreactive-1.0.0-alpha-sources.jar | Source code JAR |
Add to your classpath:
java -cp jreactive-1.0.0-alpha.jar:. YourApplicationgit clone https://github.com/yasmramos/jreactive.git
cd jreactive
mvn clean installAn Observable is a stream that can emit 0 or more elements, followed by a completion or error signal.
Observable<String> observable = Observable.just("Hello", "World");An Observer consumes events emitted by an Observable:
observable.subscribe(
item -> System.out.println("Received: " + item), // onNext
error -> System.err.println("Error: " + error), // onError
() -> System.out.println("Completed!") // onComplete
);Represents a subscription that can be cancelled:
Disposable subscription = observable.subscribe(item -> System.out.println(item));
subscription.dispose(); // Cancel the subscriptionThe library offers 4 reactive types for different use cases:
Use Observable when you have multiple elements or a data stream:
Observable.just(1, 2, 3, 4, 5)
.filter(x -> x % 2 == 0)
.subscribe(System.out::println);Use Single when there's always a single result:
Single.fromCallable(() -> fetchUser(123))
.map(user -> user.name)
.subscribe(
name -> System.out.println("User: " + name),
error -> System.err.println("Error: " + error)
);Use Maybe for lookups that might not have a result:
Maybe.fromCallable(() -> cache.get("key"))
.defaultIfEmpty("default-value")
.subscribe(System.out::println);Use Completable for operations without a result:
Completable.fromRunnable(() -> saveToDatabase(data))
.retry(3)
.subscribe(
() -> System.out.println("β Saved"),
error -> System.err.println("β Error")
);π Complete guide: See SINGLE_MAYBE_COMPLETABLE.md
Observable.just("A", "B", "C")
.subscribe(System.out::println);
// Output: A B CObservable.range(1, 5)
.map(n -> n * 2)
.subscribe(System.out::println);
// Output: 2 4 6 8 10Observable.range(1, 10)
.filter(n -> n % 2 == 0)
.subscribe(System.out::println);
// Output: 2 4 6 8 10Observable.just("Hello", "World")
.flatMap(word -> Observable.fromIterable(Arrays.asList(word.split(""))))
.subscribe(System.out::println);
// Output: H e l l o W o r l dObservable.create(emitter -> {
emitter.onNext("Item 1");
throw new RuntimeException("Error!");
})
.onErrorReturn(error -> "Default value")
.subscribe(System.out::println);
// Output: Item 1, Default valueObservable.just("Task")
.subscribeOn(Schedulers.io()) // Execute on I/O thread
.observeOn(Schedulers.computation()) // Observe on computation thread
.subscribe(item -> System.out.println(
item + " on " + Thread.currentThread().getName()
));Observable<String> obs1 = Observable.just("A", "B", "C");
Observable<Integer> obs2 = Observable.just(1, 2, 3);
Observable.zip(obs1, obs2, (letter, number) -> letter + number)
.subscribe(System.out::println);
// Output: A1 B2 C3int[] attempt = {0};
Observable.create(emitter -> {
if (++attempt[0] < 3) {
throw new RuntimeException("Failed");
}
emitter.onNext("Success!");
emitter.onComplete();
})
.retry(5)
.subscribe(System.out::println);
// Output: Success! (after 3 attempts)| Method | Description |
|---|---|
just(T...) |
Creates an Observable that emits the specified elements |
fromIterable(Iterable<T>) |
Creates an Observable from an Iterable |
range(int, int) |
Emits a range of numbers |
create(OnSubscribe) |
Creates a custom Observable |
empty() |
Observable that completes immediately |
error(Throwable) |
Observable that emits an error |
interval(long, TimeUnit) |
Emits incremental numbers periodically |
| Method | Description |
|---|---|
map(Function) |
Transforms each element |
flatMap(Function) |
Transforms each element into an Observable and flattens |
concatMap(Function) |
Like flatMap but maintains order |
switchMap(Function) |
Cancels the previous Observable on switch |
| Method | Description |
|---|---|
filter(Predicate) |
Filters elements based on a condition |
take(long) |
Takes only the first n elements |
skip(long) |
Skips the first n elements |
distinctUntilChanged() |
Filters consecutive duplicate elements |
first(T) |
Emits only the first element |
last(T) |
Emits only the last element |
| Method | Description |
|---|---|
concat(Observable...) |
Concatenates Observables sequentially |
merge(Observable...) |
Merges Observables concurrently |
zip(Observable, Observable, BiFunction) |
Combines pairs of elements |
defaultIfEmpty(T) |
Emits default value if empty |
| Method | Description |
|---|---|
doOnNext(Consumer) |
Executes action for each element |
doOnError(Consumer) |
Executes action on error |
doOnComplete(Runnable) |
Executes action on completion |
doOnSubscribe(Consumer) |
Executes action on subscription |
doOnDispose(Runnable) |
Executes action on disposal |
| Method | Description |
|---|---|
onErrorReturn(Function) |
Emits default value on error |
onErrorResumeNext(Function) |
Continues with another Observable on error |
retry() |
Retries infinitely |
retry(long) |
Retries n times |
| Method | Description |
|---|---|
subscribeOn(Scheduler) |
Specifies where subscription executes |
observeOn(Scheduler) |
Specifies where events are observed |
Schedulers.io()- For I/O operations (cached thread pool)Schedulers.computation()- For computations (fixed pool based on CPU cores)Schedulers.newThread()- Creates a new thread per taskSchedulers.immediate()- Executes immediately on current thread
# With Maven
mvn exec:java -Dexec.mainClass="com.reactive.examples.BasicExamples"
mvn exec:java -Dexec.mainClass="com.reactive.examples.AdvancedExamples"
# With Gradle
gradle run
gradle runAdvancedExamples
# Build and run JAR
mvn package
java -jar target/jreactive-1.0.0-jar-with-dependencies.jarjreactive/
βββ src/
β βββ main/java/com/reactive/
β β βββ core/ # Core classes
β β β βββ Observable.java
β β β βββ Observer.java
β β β βββ Disposable.java
β β β βββ Emitter.java
β β β βββ ...
β β βββ operators/ # Operator implementations
β β β βββ ObservableMap.java
β β β βββ ObservableFilter.java
β β β βββ ObservableFlatMap.java
β β β βββ ...
β β βββ schedulers/ # Scheduler system
β β βββ Scheduler.java
β β βββ Schedulers.java
β βββ examples/java/com/reactive/examples/
β βββ BasicExamples.java
β βββ AdvancedExamples.java
βββ pom.xml # Maven configuration
βββ build.gradle # Gradle configuration
βββ README.md # This file
| Feature | JReactive | RxJava |
|---|---|---|
| Learning curve | ββ Low | ββββ High |
| API | Simplified | Complete |
| Operators | Essential | All |
| Backpressure | Basic | Advanced |
| Size | Lightweight | Large |
| Dependencies | None | Several |
| Use case | Medium projects | Large projects |
Contributions are welcome! For major changes:
- Fork the project
- Create a feature branch (
git checkout -b feature/AmazingFeature) - Commit your changes (
git commit -m 'Add some AmazingFeature') - Push to the branch (
git push origin feature/AmazingFeature) - Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.
Yasmany Ramos GarcΓa
- Inspired by RxJava and Project Reactor
- Designed to be more accessible for developers learning reactive programming
- Focused on simplicity without sacrificing essential functionality
- Single, Maybe & Completable - Guide to specialized reactive types
- Subjects - Hot Observables and multicasting
- Specialized Types - Overview of all specialized types
- Benchmarks - Methodology and setup
- Benchmark Results - Performance vs RxJava
- Complete Documentation - Full documentation index
- ReactiveX - ReactiveX Specification
- Reactive Streams - Reactive Streams Specification
- Examples included in
src/examples/java/com/reactive/examples/
Questions or suggestions? Open an issue in the repository.