// LiveStream.java
package com.surrealdb;
import java.util.Optional;
/**
 * Blocking iterator over live query notifications returned by
 * {@link Surreal#selectLive(String)}.
 *
 * <p>Typical usage:
 * <pre>{@code
 * try (LiveStream stream = surreal.selectLive("person")) {
 *     while (true) {
 *         Optional<LiveNotification> n = stream.next();
 *         if (!n.isPresent()) break; // stream closed
 *         process(n.get());
 *     }
 * }
 * }</pre>
 *
 * <p><b>Thread safety:</b> {@link #next()} may be called from one thread while
 * {@link #close()} is called from another. The {@code close()} call will
 * unblock any thread currently waiting inside {@code next()}. The native
 * handle is declared {@code volatile} so that the zeroing performed by
 * {@code close()} is immediately visible to concurrent {@code next()} callers,
 * and {@code close()} claims the handle under a private lock so that the
 * native resources are released at most once even when several threads call
 * {@code close()} at the same time. Concurrent calls to {@code next()} from
 * multiple threads are serialized by a mutex in the native layer.
 */
public class LiveStream implements AutoCloseable {

    static {
        Loader.loadNative();
    }

    /**
     * Guards the claim-and-release sequence in {@link #close()}. Deliberately
     * never taken by {@link #next()}: a thread blocked inside the native call
     * must not prevent {@code close()} from running.
     */
    private final Object closeLock = new Object();

    /**
     * Pointer to the native {@code LiveStreamChannel}, or {@code 0} once the
     * stream has been closed. Zeroed by {@link #close()} <em>before</em> the
     * native resources are released, so no new {@code next()} call can pick up
     * a handle that is in the process of being freed. Declared
     * {@code volatile} so that a {@code close()} on one thread is visible to a
     * concurrent {@code next()} on another thread.
     */
    private volatile long handle;

    /**
     * Wraps an already-created native stream.
     *
     * @param handle pointer to the native {@code LiveStreamChannel};
     *               {@code 0} yields a stream that is already closed
     */
    LiveStream(long handle) {
        this.handle = handle;
    }

    /**
     * Blocks until the next notification is available, or the stream ends.
     *
     * <p>Returns {@link Optional#empty()} when the stream has been closed
     * (either explicitly via {@link #close()} or because the server ended the
     * live query). If the underlying live query encounters an error, a
     * {@link SurrealException} is thrown.
     *
     * @return the next notification, or empty if the stream has ended
     * @throws SurrealException
     *             if the live query encounters an error
     */
    public Optional<LiveNotification> next() {
        // Read the volatile field exactly once: checking and then re-reading
        // it would allow a concurrent close() to slip in between and make us
        // pass a zero handle to the native layer.
        final long current = handle;
        if (current == 0) {
            return Optional.empty();
        }
        // NOTE(review): a close() that runs after the snapshot above but before
        // the native call is entered hands a released handle to nextNative;
        // this relies on the native layer tolerating that -- confirm.
        return Optional.ofNullable(nextNative(current));
    }

    /**
     * Releases the live query and stops receiving notifications.
     *
     * <p>If another thread is blocked inside {@link #next()}, it will be
     * unblocked and will return {@link Optional#empty()}. This method is
     * idempotent: calling it more than once, including concurrently from
     * several threads, releases the native resources only once.
     */
    @Override
    public void close() {
        synchronized (closeLock) {
            final long claimed = handle;
            if (claimed == 0) {
                return; // already closed
            }
            // Zero first so that the handle can never be released twice, even
            // if releaseNative throws, and so that next() stops handing the
            // pointer to the native layer as early as possible.
            handle = 0;
            releaseNative(claimed);
        }
    }

    /** Native call behind {@link #next()}; a {@code null} result is treated as end of stream. */
    private static native LiveNotification nextNative(long handle);

    /** Native call behind {@link #close()}; invoked at most once per handle. */
    private static native void releaseNative(long handle);
}