diff --git a/src/main/java/io/reactivex/subjects/CompletableSubject.java b/src/main/java/io/reactivex/subjects/CompletableSubject.java
new file mode 100644
index 0000000000..dd5394d899
--- /dev/null
+++ b/src/main/java/io/reactivex/subjects/CompletableSubject.java
@@ -0,0 +1,229 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.subjects;
+
+import java.util.concurrent.atomic.*;
+
+import io.reactivex.*;
+import io.reactivex.annotations.*;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.plugins.RxJavaPlugins;
+
+/**
+ * Represents a hot Completable-like source and consumer of events similar to Subjects.
+ *
+ * All methods are thread safe. Calling onComplete multiple
+ * times has no effect. Calling onError multiple times relays the Throwable to
+ * the RxJavaPlugins' error handler.
+ *
+ * The CompletableSubject doesn't store the Disposables coming through onSubscribe but
+ * disposes them once the other onXXX methods were called (terminal state reached).
+ * @since 2.0.5 - experimental
+ */
+@Experimental
+public final class CompletableSubject extends Completable implements CompletableObserver {
+
+ final AtomicReference observers;
+
+ static final CompletableDisposable[] EMPTY = new CompletableDisposable[0];
+
+ static final CompletableDisposable[] TERMINATED = new CompletableDisposable[0];
+
+ final AtomicBoolean once;
+ Throwable error;
+
+ /**
+ * Creates a fresh CompletableSubject.
+ * @return the new CompletableSubject instance
+ */
+ @CheckReturnValue
+ public static CompletableSubject create() {
+ return new CompletableSubject();
+ }
+
+ CompletableSubject() {
+ once = new AtomicBoolean();
+ observers = new AtomicReference(EMPTY);
+ }
+
+ @Override
+ public void onSubscribe(Disposable d) {
+ if (observers.get() == TERMINATED) {
+ d.dispose();
+ }
+ }
+
+ @Override
+ public void onError(Throwable e) {
+ if (e == null) {
+ e = new NullPointerException("Null errors are not allowed in 2.x");
+ }
+ if (once.compareAndSet(false, true)) {
+ this.error = e;
+ for (CompletableDisposable md : observers.getAndSet(TERMINATED)) {
+ md.actual.onError(e);
+ }
+ } else {
+ RxJavaPlugins.onError(e);
+ }
+ }
+
+ @Override
+ public void onComplete() {
+ if (once.compareAndSet(false, true)) {
+ for (CompletableDisposable md : observers.getAndSet(TERMINATED)) {
+ md.actual.onComplete();
+ }
+ }
+ }
+
+ @Override
+ protected void subscribeActual(CompletableObserver observer) {
+ CompletableDisposable md = new CompletableDisposable(observer, this);
+ observer.onSubscribe(md);
+ if (add(md)) {
+ if (md.isDisposed()) {
+ remove(md);
+ }
+ } else {
+ Throwable ex = error;
+ if (ex != null) {
+ observer.onError(ex);
+ } else {
+ observer.onComplete();
+ }
+ }
+ }
+
+ boolean add(CompletableDisposable inner) {
+ for (;;) {
+ CompletableDisposable[] a = observers.get();
+ if (a == TERMINATED) {
+ return false;
+ }
+
+ int n = a.length;
+
+ CompletableDisposable[] b = new CompletableDisposable[n + 1];
+ System.arraycopy(a, 0, b, 0, n);
+ b[n] = inner;
+ if (observers.compareAndSet(a, b)) {
+ return true;
+ }
+ }
+ }
+
+ void remove(CompletableDisposable inner) {
+ for (;;) {
+ CompletableDisposable[] a = observers.get();
+ int n = a.length;
+ if (n == 0) {
+ return;
+ }
+
+ int j = -1;
+
+ for (int i = 0; i < n; i++) {
+ if (a[i] == inner) {
+ j = i;
+ break;
+ }
+ }
+
+ if (j < 0) {
+ return;
+ }
+ CompletableDisposable[] b;
+ if (n == 1) {
+ b = EMPTY;
+ } else {
+ b = new CompletableDisposable[n - 1];
+ System.arraycopy(a, 0, b, 0, j);
+ System.arraycopy(a, j + 1, b, j, n - j - 1);
+ }
+
+ if (observers.compareAndSet(a, b)) {
+ return;
+ }
+ }
+ }
+
+ /**
+ * Returns the terminal error if this CompletableSubject has been terminated with an error, null otherwise.
+ * @return the terminal error or null if not terminated or not with an error
+ */
+ public Throwable getThrowable() {
+ if (observers.get() == TERMINATED) {
+ return error;
+ }
+ return null;
+ }
+
+ /**
+ * Returns true if this CompletableSubject has been terminated with an error.
+ * @return true if this CompletableSubject has been terminated with an error
+ */
+ public boolean hasThrowable() {
+ return observers.get() == TERMINATED && error != null;
+ }
+
+ /**
+ * Returns true if this CompletableSubject has been completed.
+ * @return true if this CompletableSubject has been completed
+ */
+ public boolean hasComplete() {
+ return observers.get() == TERMINATED && error == null;
+ }
+
+ /**
+ * Returns true if this CompletableSubject has observers.
+ * @return true if this CompletableSubject has observers
+ */
+ public boolean hasObservers() {
+ return observers.get().length != 0;
+ }
+
+ /**
+ * Returns the number of current observers.
+ * @return the number of current observers
+ */
+ /* test */ int observerCount() {
+ return observers.get().length;
+ }
+
+ static final class CompletableDisposable
+ extends AtomicReference implements Disposable {
+ private static final long serialVersionUID = -7650903191002190468L;
+
+ final CompletableObserver actual;
+
+ CompletableDisposable(CompletableObserver actual, CompletableSubject parent) {
+ this.actual = actual;
+ lazySet(parent);
+ }
+
+ @Override
+ public void dispose() {
+ CompletableSubject parent = getAndSet(null);
+ if (parent != null) {
+ parent.remove(this);
+ }
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return get() == null;
+ }
+ }
+}
diff --git a/src/main/java/io/reactivex/subjects/MaybeSubject.java b/src/main/java/io/reactivex/subjects/MaybeSubject.java
new file mode 100644
index 0000000000..3d03d5a637
--- /dev/null
+++ b/src/main/java/io/reactivex/subjects/MaybeSubject.java
@@ -0,0 +1,277 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.subjects;
+
+import java.util.concurrent.atomic.*;
+
+import io.reactivex.*;
+import io.reactivex.annotations.*;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.plugins.RxJavaPlugins;
+
+/**
+ * Represents a hot Maybe-like source and consumer of events similar to Subjects.
+ *
+ * All methods are thread safe. Calling onSuccess or onComplete multiple
+ * times has no effect. Calling onError multiple times relays the Throwable to
+ * the RxJavaPlugins' error handler.
+ *
+ * The MaybeSubject doesn't store the Disposables coming through onSubscribe but
+ * disposes them once the other onXXX methods were called (terminal state reached).
+ * @param the value type received and emitted
+ * @since 2.0.5 - experimental
+ */
+@Experimental
+public final class MaybeSubject extends Maybe implements MaybeObserver {
+
+ final AtomicReference[]> observers;
+
+ @SuppressWarnings("rawtypes")
+ static final MaybeDisposable[] EMPTY = new MaybeDisposable[0];
+
+ @SuppressWarnings("rawtypes")
+ static final MaybeDisposable[] TERMINATED = new MaybeDisposable[0];
+
+ final AtomicBoolean once;
+ T value;
+ Throwable error;
+
+ /**
+ * Creates a fresh MaybeSubject.
+ * @param the value type received and emitted
+ * @return the new MaybeSubject instance
+ */
+ @CheckReturnValue
+ public static MaybeSubject create() {
+ return new MaybeSubject();
+ }
+
+ @SuppressWarnings("unchecked")
+ MaybeSubject() {
+ once = new AtomicBoolean();
+ observers = new AtomicReference[]>(EMPTY);
+ }
+
+ @Override
+ public void onSubscribe(Disposable d) {
+ if (observers.get() == TERMINATED) {
+ d.dispose();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void onSuccess(T value) {
+ if (value == null) {
+ onError(new NullPointerException("Null values are not allowed in 2.x"));
+ return;
+ }
+ if (once.compareAndSet(false, true)) {
+ this.value = value;
+ for (MaybeDisposable md : observers.getAndSet(TERMINATED)) {
+ md.actual.onSuccess(value);
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void onError(Throwable e) {
+ if (e == null) {
+ e = new NullPointerException("Null errors are not allowed in 2.x");
+ }
+ if (once.compareAndSet(false, true)) {
+ this.error = e;
+ for (MaybeDisposable md : observers.getAndSet(TERMINATED)) {
+ md.actual.onError(e);
+ }
+ } else {
+ RxJavaPlugins.onError(e);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void onComplete() {
+ if (once.compareAndSet(false, true)) {
+ for (MaybeDisposable md : observers.getAndSet(TERMINATED)) {
+ md.actual.onComplete();
+ }
+ }
+ }
+
+ @Override
+ protected void subscribeActual(MaybeObserver super T> observer) {
+ MaybeDisposable md = new MaybeDisposable(observer, this);
+ observer.onSubscribe(md);
+ if (add(md)) {
+ if (md.isDisposed()) {
+ remove(md);
+ }
+ } else {
+ Throwable ex = error;
+ if (ex != null) {
+ observer.onError(ex);
+ } else {
+ T v = value;
+ if (v == null) {
+ observer.onComplete();
+ } else {
+ observer.onSuccess(v);
+ }
+ }
+ }
+ }
+
+ boolean add(MaybeDisposable inner) {
+ for (;;) {
+ MaybeDisposable[] a = observers.get();
+ if (a == TERMINATED) {
+ return false;
+ }
+
+ int n = a.length;
+ @SuppressWarnings("unchecked")
+ MaybeDisposable[] b = new MaybeDisposable[n + 1];
+ System.arraycopy(a, 0, b, 0, n);
+ b[n] = inner;
+ if (observers.compareAndSet(a, b)) {
+ return true;
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ void remove(MaybeDisposable inner) {
+ for (;;) {
+ MaybeDisposable[] a = observers.get();
+ int n = a.length;
+ if (n == 0) {
+ return;
+ }
+
+ int j = -1;
+
+ for (int i = 0; i < n; i++) {
+ if (a[i] == inner) {
+ j = i;
+ break;
+ }
+ }
+
+ if (j < 0) {
+ return;
+ }
+ MaybeDisposable[] b;
+ if (n == 1) {
+ b = EMPTY;
+ } else {
+ b = new MaybeDisposable[n - 1];
+ System.arraycopy(a, 0, b, 0, j);
+ System.arraycopy(a, j + 1, b, j, n - j - 1);
+ }
+
+ if (observers.compareAndSet(a, b)) {
+ return;
+ }
+ }
+ }
+
+ /**
+ * Returns the success value if this MaybeSubject was terminated with a success value.
+ * @return the success value or null
+ */
+ public T getValue() {
+ if (observers.get() == TERMINATED) {
+ return value;
+ }
+ return null;
+ }
+
+ /**
+ * Returns true if this MaybeSubject was terminated with a success value.
+ * @return true if this MaybeSubject was terminated with a success value
+ */
+ public boolean hasValue() {
+ return observers.get() == TERMINATED && value != null;
+ }
+
+ /**
+ * Returns the terminal error if this MaybeSubject has been terminated with an error, null otherwise.
+ * @return the terminal error or null if not terminated or not with an error
+ */
+ public Throwable getThrowable() {
+ if (observers.get() == TERMINATED) {
+ return error;
+ }
+ return null;
+ }
+
+ /**
+ * Returns true if this MaybeSubject has been terminated with an error.
+ * @return true if this MaybeSubject has been terminated with an error
+ */
+ public boolean hasThrowable() {
+ return observers.get() == TERMINATED && error != null;
+ }
+
+ /**
+ * Returns true if this MaybeSubject has been completed.
+ * @return true if this MaybeSubject has been completed
+ */
+ public boolean hasComplete() {
+ return observers.get() == TERMINATED && value == null && error == null;
+ }
+
+ /**
+ * Returns true if this MaybeSubject has observers.
+ * @return true if this MaybeSubject has observers
+ */
+ public boolean hasObservers() {
+ return observers.get().length != 0;
+ }
+
+ /**
+ * Returns the number of current observers.
+ * @return the number of current observers
+ */
+ /* test */ int observerCount() {
+ return observers.get().length;
+ }
+
+ static final class MaybeDisposable
+ extends AtomicReference> implements Disposable {
+ private static final long serialVersionUID = -7650903191002190468L;
+
+ final MaybeObserver super T> actual;
+
+ MaybeDisposable(MaybeObserver super T> actual, MaybeSubject parent) {
+ this.actual = actual;
+ lazySet(parent);
+ }
+
+ @Override
+ public void dispose() {
+ MaybeSubject parent = getAndSet(null);
+ if (parent != null) {
+ parent.remove(this);
+ }
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return get() == null;
+ }
+ }
+}
diff --git a/src/main/java/io/reactivex/subjects/SingleSubject.java b/src/main/java/io/reactivex/subjects/SingleSubject.java
new file mode 100644
index 0000000000..2f8f076d5c
--- /dev/null
+++ b/src/main/java/io/reactivex/subjects/SingleSubject.java
@@ -0,0 +1,254 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.subjects;
+
+import java.util.concurrent.atomic.*;
+
+import io.reactivex.*;
+import io.reactivex.annotations.*;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.plugins.RxJavaPlugins;
+
+/**
+ * Represents a hot Single-like source and consumer of events similar to Subjects.
+ *
+ * All methods are thread safe. Calling onSuccess multiple
+ * times has no effect. Calling onError multiple times relays the Throwable to
+ * the RxJavaPlugins' error handler.
+ *
+ * The SingleSubject doesn't store the Disposables coming through onSubscribe but
+ * disposes them once the other onXXX methods were called (terminal state reached).
+ * @param the value type received and emitted
+ * @since 2.0.5 - experimental
+ */
+@Experimental
+public final class SingleSubject extends Single implements SingleObserver {
+
+ final AtomicReference[]> observers;
+
+ @SuppressWarnings("rawtypes")
+ static final SingleDisposable[] EMPTY = new SingleDisposable[0];
+
+ @SuppressWarnings("rawtypes")
+ static final SingleDisposable[] TERMINATED = new SingleDisposable[0];
+
+ final AtomicBoolean once;
+ T value;
+ Throwable error;
+
+ /**
+ * Creates a fresh SingleSubject.
+ * @param the value type received and emitted
+ * @return the new SingleSubject instance
+ */
+ @CheckReturnValue
+ public static SingleSubject create() {
+ return new SingleSubject();
+ }
+
+ @SuppressWarnings("unchecked")
+ SingleSubject() {
+ once = new AtomicBoolean();
+ observers = new AtomicReference[]>(EMPTY);
+ }
+
+ @Override
+ public void onSubscribe(Disposable d) {
+ if (observers.get() == TERMINATED) {
+ d.dispose();
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void onSuccess(T value) {
+ if (value == null) {
+ onError(new NullPointerException("Null values are not allowed in 2.x"));
+ return;
+ }
+ if (once.compareAndSet(false, true)) {
+ this.value = value;
+ for (SingleDisposable md : observers.getAndSet(TERMINATED)) {
+ md.actual.onSuccess(value);
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void onError(Throwable e) {
+ if (e == null) {
+ e = new NullPointerException("Null errors are not allowed in 2.x");
+ }
+ if (once.compareAndSet(false, true)) {
+ this.error = e;
+ for (SingleDisposable md : observers.getAndSet(TERMINATED)) {
+ md.actual.onError(e);
+ }
+ } else {
+ RxJavaPlugins.onError(e);
+ }
+ }
+
+ @Override
+ protected void subscribeActual(SingleObserver super T> observer) {
+ SingleDisposable md = new SingleDisposable(observer, this);
+ observer.onSubscribe(md);
+ if (add(md)) {
+ if (md.isDisposed()) {
+ remove(md);
+ }
+ } else {
+ Throwable ex = error;
+ if (ex != null) {
+ observer.onError(ex);
+ } else {
+ observer.onSuccess(value);
+ }
+ }
+ }
+
+ boolean add(SingleDisposable inner) {
+ for (;;) {
+ SingleDisposable[] a = observers.get();
+ if (a == TERMINATED) {
+ return false;
+ }
+
+ int n = a.length;
+ @SuppressWarnings("unchecked")
+ SingleDisposable[] b = new SingleDisposable[n + 1];
+ System.arraycopy(a, 0, b, 0, n);
+ b[n] = inner;
+ if (observers.compareAndSet(a, b)) {
+ return true;
+ }
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ void remove(SingleDisposable inner) {
+ for (;;) {
+ SingleDisposable[] a = observers.get();
+ int n = a.length;
+ if (n == 0) {
+ return;
+ }
+
+ int j = -1;
+
+ for (int i = 0; i < n; i++) {
+ if (a[i] == inner) {
+ j = i;
+ break;
+ }
+ }
+
+ if (j < 0) {
+ return;
+ }
+ SingleDisposable[] b;
+ if (n == 1) {
+ b = EMPTY;
+ } else {
+ b = new SingleDisposable[n - 1];
+ System.arraycopy(a, 0, b, 0, j);
+ System.arraycopy(a, j + 1, b, j, n - j - 1);
+ }
+
+ if (observers.compareAndSet(a, b)) {
+ return;
+ }
+ }
+ }
+
+ /**
+ * Returns the success value if this SingleSubject was terminated with a success value.
+ * @return the success value or null
+ */
+ public T getValue() {
+ if (observers.get() == TERMINATED) {
+ return value;
+ }
+ return null;
+ }
+
+ /**
+ * Returns true if this SingleSubject was terminated with a success value.
+ * @return true if this SingleSubject was terminated with a success value
+ */
+ public boolean hasValue() {
+ return observers.get() == TERMINATED && value != null;
+ }
+
+ /**
+ * Returns the terminal error if this SingleSubject has been terminated with an error, null otherwise.
+ * @return the terminal error or null if not terminated or not with an error
+ */
+ public Throwable getThrowable() {
+ if (observers.get() == TERMINATED) {
+ return error;
+ }
+ return null;
+ }
+
+ /**
+ * Returns true if this SingleSubject has been terminated with an error.
+ * @return true if this SingleSubject has been terminated with an error
+ */
+ public boolean hasThrowable() {
+ return observers.get() == TERMINATED && error != null;
+ }
+
+ /**
+ * Returns true if this SingleSubject has observers.
+ * @return true if this SingleSubject has observers
+ */
+ public boolean hasObservers() {
+ return observers.get().length != 0;
+ }
+
+ /**
+ * Returns the number of current observers.
+ * @return the number of current observers
+ */
+ /* test */ int observerCount() {
+ return observers.get().length;
+ }
+
+ static final class SingleDisposable
+ extends AtomicReference> implements Disposable {
+ private static final long serialVersionUID = -7650903191002190468L;
+
+ final SingleObserver super T> actual;
+
+ SingleDisposable(SingleObserver super T> actual, SingleSubject parent) {
+ this.actual = actual;
+ lazySet(parent);
+ }
+
+ @Override
+ public void dispose() {
+ SingleSubject parent = getAndSet(null);
+ if (parent != null) {
+ parent.remove(this);
+ }
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return get() == null;
+ }
+ }
+}
diff --git a/src/test/java/io/reactivex/subjects/CompletableSubjectTest.java b/src/test/java/io/reactivex/subjects/CompletableSubjectTest.java
new file mode 100644
index 0000000000..e6c953d1e0
--- /dev/null
+++ b/src/test/java/io/reactivex/subjects/CompletableSubjectTest.java
@@ -0,0 +1,226 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.subjects;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.Test;
+
+import io.reactivex.*;
+import io.reactivex.disposables.*;
+import io.reactivex.observers.TestObserver;
+import io.reactivex.plugins.RxJavaPlugins;
+import io.reactivex.schedulers.Schedulers;
+
+public class CompletableSubjectTest {
+
+ @Test
+ public void once() {
+ CompletableSubject cs = CompletableSubject.create();
+
+ TestObserver to = cs.test();
+
+ cs.onComplete();
+
+ List errors = TestHelper.trackPluginErrors();
+ try {
+ cs.onError(new IOException());
+
+ TestHelper.assertError(errors, 0, IOException.class);
+ } finally {
+ RxJavaPlugins.reset();
+ }
+ cs.onComplete();
+
+ to.assertResult();
+ }
+
+ @Test
+ public void error() {
+ CompletableSubject cs = CompletableSubject.create();
+
+ assertFalse(cs.hasComplete());
+ assertFalse(cs.hasThrowable());
+ assertNull(cs.getThrowable());
+ assertFalse(cs.hasObservers());
+ assertEquals(0, cs.observerCount());
+
+ TestObserver to = cs.test();
+
+ to.assertEmpty();
+
+ assertTrue(cs.hasObservers());
+ assertEquals(1, cs.observerCount());
+
+ cs.onError(new IOException());
+
+ assertFalse(cs.hasComplete());
+ assertTrue(cs.hasThrowable());
+ assertTrue(cs.getThrowable().toString(), cs.getThrowable() instanceof IOException);
+ assertFalse(cs.hasObservers());
+ assertEquals(0, cs.observerCount());
+
+ to.assertFailure(IOException.class);
+
+ cs.test().assertFailure(IOException.class);
+
+ assertFalse(cs.hasComplete());
+ assertTrue(cs.hasThrowable());
+ assertTrue(cs.getThrowable().toString(), cs.getThrowable() instanceof IOException);
+ assertFalse(cs.hasObservers());
+ assertEquals(0, cs.observerCount());
+ }
+
+ @Test
+ public void complete() {
+ CompletableSubject cs = CompletableSubject.create();
+
+ assertFalse(cs.hasComplete());
+ assertFalse(cs.hasThrowable());
+ assertNull(cs.getThrowable());
+ assertFalse(cs.hasObservers());
+ assertEquals(0, cs.observerCount());
+
+ TestObserver to = cs.test();
+
+ to.assertEmpty();
+
+ assertTrue(cs.hasObservers());
+ assertEquals(1, cs.observerCount());
+
+ cs.onComplete();
+
+ assertTrue(cs.hasComplete());
+ assertFalse(cs.hasThrowable());
+ assertNull(cs.getThrowable());
+ assertFalse(cs.hasObservers());
+ assertEquals(0, cs.observerCount());
+
+ to.assertResult();
+
+ cs.test().assertResult();
+
+ assertTrue(cs.hasComplete());
+ assertFalse(cs.hasThrowable());
+ assertNull(cs.getThrowable());
+ assertFalse(cs.hasObservers());
+ assertEquals(0, cs.observerCount());
+ }
+
+ @Test
+ public void nullThrowable() {
+ CompletableSubject cs = CompletableSubject.create();
+
+ TestObserver to = cs.test();
+
+ cs.onError(null);
+
+ to.assertFailure(NullPointerException.class);
+ }
+
+ @Test
+ public void cancelOnArrival() {
+ CompletableSubject.create()
+ .test(true)
+ .assertEmpty();
+ }
+
+ @Test
+ public void cancelOnArrival2() {
+ CompletableSubject cs = CompletableSubject.create();
+
+ cs.test();
+
+ cs
+ .test(true)
+ .assertEmpty();
+ }
+
+ @Test
+ public void dispose() {
+ TestHelper.checkDisposed(CompletableSubject.create());
+ }
+
+ @Test
+ public void disposeTwice() {
+ CompletableSubject.create()
+ .subscribe(new CompletableObserver() {
+ @Override
+ public void onSubscribe(Disposable d) {
+ assertFalse(d.isDisposed());
+
+ d.dispose();
+ d.dispose();
+
+ assertTrue(d.isDisposed());
+ }
+
+ @Override
+ public void onError(Throwable e) {
+
+ }
+
+ @Override
+ public void onComplete() {
+
+ }
+ });
+ }
+
+ @Test
+ public void onSubscribeDispose() {
+ CompletableSubject cs = CompletableSubject.create();
+
+ Disposable d = Disposables.empty();
+
+ cs.onSubscribe(d);
+
+ assertFalse(d.isDisposed());
+
+ cs.onComplete();
+
+ d = Disposables.empty();
+
+ cs.onSubscribe(d);
+
+ assertTrue(d.isDisposed());
+ }
+
+ @Test
+ public void addRemoveRace() {
+ for (int i = 0; i < 500; i++) {
+ final CompletableSubject cs = CompletableSubject.create();
+
+ final TestObserver to = cs.test();
+
+ Runnable r1 = new Runnable() {
+ @Override
+ public void run() {
+ cs.test();
+ }
+ };
+
+ Runnable r2 = new Runnable() {
+ @Override
+ public void run() {
+ to.cancel();
+ }
+ };
+ TestHelper.race(r1, r2, Schedulers.single());
+ }
+ }
+}
diff --git a/src/test/java/io/reactivex/subjects/MaybeSubjectTest.java b/src/test/java/io/reactivex/subjects/MaybeSubjectTest.java
new file mode 100644
index 0000000000..4e04212757
--- /dev/null
+++ b/src/test/java/io/reactivex/subjects/MaybeSubjectTest.java
@@ -0,0 +1,297 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.subjects;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.Test;
+
+import io.reactivex.*;
+import io.reactivex.disposables.*;
+import io.reactivex.observers.TestObserver;
+import io.reactivex.plugins.RxJavaPlugins;
+import io.reactivex.schedulers.Schedulers;
+
+public class MaybeSubjectTest {
+
+ @Test
+ public void success() {
+ MaybeSubject ms = MaybeSubject.create();
+
+ assertFalse(ms.hasValue());
+ assertNull(ms.getValue());
+ assertFalse(ms.hasComplete());
+ assertFalse(ms.hasThrowable());
+ assertNull(ms.getThrowable());
+ assertFalse(ms.hasObservers());
+ assertEquals(0, ms.observerCount());
+
+ TestObserver to = ms.test();
+
+ to.assertEmpty();
+
+ assertTrue(ms.hasObservers());
+ assertEquals(1, ms.observerCount());
+
+ ms.onSuccess(1);
+
+ assertTrue(ms.hasValue());
+ assertEquals(1, ms.getValue().intValue());
+ assertFalse(ms.hasComplete());
+ assertFalse(ms.hasThrowable());
+ assertNull(ms.getThrowable());
+ assertFalse(ms.hasObservers());
+ assertEquals(0, ms.observerCount());
+
+ to.assertResult(1);
+
+ ms.test().assertResult(1);
+
+ assertTrue(ms.hasValue());
+ assertEquals(1, ms.getValue().intValue());
+ assertFalse(ms.hasComplete());
+ assertFalse(ms.hasThrowable());
+ assertNull(ms.getThrowable());
+ assertFalse(ms.hasObservers());
+ assertEquals(0, ms.observerCount());
+ }
+
+ @Test
+ public void once() {
+ MaybeSubject ms = MaybeSubject.create();
+
+ TestObserver to = ms.test();
+
+ ms.onSuccess(1);
+ ms.onSuccess(2);
+
+ List errors = TestHelper.trackPluginErrors();
+ try {
+ ms.onError(new IOException());
+
+ TestHelper.assertError(errors, 0, IOException.class);
+ } finally {
+ RxJavaPlugins.reset();
+ }
+ ms.onComplete();
+
+ to.assertResult(1);
+ }
+
+ @Test
+ public void error() {
+ MaybeSubject ms = MaybeSubject.create();
+
+ assertFalse(ms.hasValue());
+ assertNull(ms.getValue());
+ assertFalse(ms.hasComplete());
+ assertFalse(ms.hasThrowable());
+ assertNull(ms.getThrowable());
+ assertFalse(ms.hasObservers());
+ assertEquals(0, ms.observerCount());
+
+ TestObserver to = ms.test();
+
+ to.assertEmpty();
+
+ assertTrue(ms.hasObservers());
+ assertEquals(1, ms.observerCount());
+
+ ms.onError(new IOException());
+
+ assertFalse(ms.hasValue());
+ assertNull(ms.getValue());
+ assertFalse(ms.hasComplete());
+ assertTrue(ms.hasThrowable());
+ assertTrue(ms.getThrowable().toString(), ms.getThrowable() instanceof IOException);
+ assertFalse(ms.hasObservers());
+ assertEquals(0, ms.observerCount());
+
+ to.assertFailure(IOException.class);
+
+ ms.test().assertFailure(IOException.class);
+
+ assertFalse(ms.hasValue());
+ assertNull(ms.getValue());
+ assertFalse(ms.hasComplete());
+ assertTrue(ms.hasThrowable());
+ assertTrue(ms.getThrowable().toString(), ms.getThrowable() instanceof IOException);
+ assertFalse(ms.hasObservers());
+ assertEquals(0, ms.observerCount());
+ }
+
+ @Test
+ public void complete() {
+ MaybeSubject ms = MaybeSubject.create();
+
+ assertFalse(ms.hasValue());
+ assertNull(ms.getValue());
+ assertFalse(ms.hasComplete());
+ assertFalse(ms.hasThrowable());
+ assertNull(ms.getThrowable());
+ assertFalse(ms.hasObservers());
+ assertEquals(0, ms.observerCount());
+
+ TestObserver to = ms.test();
+
+ to.assertEmpty();
+
+ assertTrue(ms.hasObservers());
+ assertEquals(1, ms.observerCount());
+
+ ms.onComplete();
+
+ assertFalse(ms.hasValue());
+ assertNull(ms.getValue());
+ assertTrue(ms.hasComplete());
+ assertFalse(ms.hasThrowable());
+ assertNull(ms.getThrowable());
+ assertFalse(ms.hasObservers());
+ assertEquals(0, ms.observerCount());
+
+ to.assertResult();
+
+ ms.test().assertResult();
+
+ assertFalse(ms.hasValue());
+ assertNull(ms.getValue());
+ assertTrue(ms.hasComplete());
+ assertFalse(ms.hasThrowable());
+ assertNull(ms.getThrowable());
+ assertFalse(ms.hasObservers());
+ assertEquals(0, ms.observerCount());
+ }
+
+ @Test
+ public void nullValue() {
+ MaybeSubject ms = MaybeSubject.create();
+
+ TestObserver to = ms.test();
+
+ ms.onSuccess(null);
+
+ to.assertFailure(NullPointerException.class);
+ }
+
+ @Test
+ public void nullThrowable() {
+ MaybeSubject ms = MaybeSubject.create();
+
+ TestObserver to = ms.test();
+
+ ms.onError(null);
+
+ to.assertFailure(NullPointerException.class);
+ }
+
+ @Test
+ public void cancelOnArrival() {
+ MaybeSubject.create()
+ .test(true)
+ .assertEmpty();
+ }
+
+ @Test
+ public void cancelOnArrival2() {
+ MaybeSubject ms = MaybeSubject.create();
+
+ ms.test();
+
+ ms
+ .test(true)
+ .assertEmpty();
+ }
+
+ @Test
+ public void dispose() {
+ TestHelper.checkDisposed(MaybeSubject.create());
+ }
+
+ @Test
+ public void disposeTwice() {
+ MaybeSubject.create()
+ .subscribe(new MaybeObserver