Class SimplePushEventSourceImpl<T,U extends java.util.concurrent.BlockingQueue<PushEvent<? extends T>>>
java.lang.Object
    org.osgi.util.pushstream.SimplePushEventSourceImpl<T,U>
All Implemented Interfaces:
java.lang.AutoCloseable, PushEventSource<T>, SimplePushEventSource<T>
class SimplePushEventSourceImpl<T,U extends java.util.concurrent.BlockingQueue<PushEvent<? extends T>>> extends java.lang.Object implements SimplePushEventSource<T>
-
-
Field Summary
Fields (Modifier and Type, Field):
private boolean closed
private java.util.List<PushEventConsumer<? super T>> connected
private Deferred<java.lang.Void> connectPromise
private java.lang.Object lock
private java.lang.Runnable onClose
private int parallelism
private PromiseFactory promiseFactory
private U queue
private QueuePolicy<T,U> queuePolicy
private PromiseFactory sameThread
private java.util.concurrent.Semaphore semaphore
private boolean waitForFinishes
-
Constructor Summary
Constructors:
SimplePushEventSourceImpl(PromiseFactory promiseFactory, QueuePolicy<T,U> queuePolicy, U queue, int parallelism, java.lang.Runnable onClose)
-
Method Summary
All Methods (Modifier and Type, Method, Description):
void close()
    Close this source.
private void close(PushEvent<T> event)
private void closeConsumer(PushEventConsumer<? super T> pec, PushEvent<T> event)
private Promise<java.lang.Void> closedConnectPromise()
Promise<java.lang.Void> connectPromise()
    This method can be used to delay event generation until an event source has connected.
private Promise<java.lang.Long> deliver(java.util.List<PushEventConsumer<? super T>> toCall, PushEvent<T> event)
private Promise<java.lang.Long> doCall(PushEvent<T> event, PushEventConsumer<? super T> pec)
private void doSend(PushEventConsumer<? super T> pec, PushEvent<T> event)
private Promise<java.lang.Long> doSendWithBackPressure(PushEventConsumer<? super T> pec, PushEvent<T> event)
void endOfStream()
    Close this source for now, but potentially reopen it later.
private void enqueueEvent(PushEvent<T> event)
void error(java.lang.Throwable t)
    Close this source for now, but potentially reopen it later.
private void handleReset(boolean resetWait)
boolean isConnected()
    Determine whether there are any PushEventConsumers for this PushEventSource.
java.lang.AutoCloseable open(PushEventConsumer<? super T> pec)
    Open the asynchronous channel between the source and the consumer.
void publish(T t)
    Asynchronously publish an event to this stream and all connected PushEventConsumer instances.
private long safePush(PushEventConsumer<? super T> pec, PushEvent<T> event)
private void startWorker()
-
-
-
Field Detail
-
lock
private final java.lang.Object lock
-
promiseFactory
private final PromiseFactory promiseFactory
-
sameThread
private final PromiseFactory sameThread
-
queuePolicy
private final QueuePolicy<T,U> queuePolicy
-
parallelism
private final int parallelism
-
semaphore
private final java.util.concurrent.Semaphore semaphore
-
connected
private final java.util.List<PushEventConsumer<? super T>> connected
-
onClose
private final java.lang.Runnable onClose
-
closed
private boolean closed
-
connectPromise
private Deferred<java.lang.Void> connectPromise
-
waitForFinishes
private boolean waitForFinishes
-
-
Constructor Detail
-
SimplePushEventSourceImpl
public SimplePushEventSourceImpl(PromiseFactory promiseFactory, QueuePolicy<T,U> queuePolicy, U queue, int parallelism, java.lang.Runnable onClose)
-
-
Method Detail
-
open
public java.lang.AutoCloseable open(PushEventConsumer<? super T> pec) throws java.lang.Exception
Description copied from interface: PushEventSource
Open the asynchronous channel between the source and the consumer. The call returns an AutoCloseable. Closing it should close the channel, including sending a Close event if the channel was not already closed. The returned object must be able to be closed multiple times without sending more than one Close event.
Specified by:
open in interface PushEventSource<T>
Parameters:
pec - the consumer (not null)
Returns:
an AutoCloseable that can be used to close the stream
Throws:
java.lang.Exception
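The "closed multiple times without sending more than one Close event" requirement can be illustrated with a small sketch. It is not the code of this class: EventSink and Handle are hypothetical stand-ins for PushEventConsumer and the returned AutoCloseable, and the string "CLOSE" stands in for a close event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public class IdempotentCloseSketch {
    /** Hypothetical stand-in for a consumer; not the OSGi PushEventConsumer. */
    interface EventSink {
        void accept(String event);
    }

    /** AutoCloseable whose close() declares no checked exception. */
    interface Handle extends AutoCloseable {
        @Override
        void close();
    }

    /** Returns a handle that sends "CLOSE" to the sink at most once. */
    static Handle open(EventSink sink) {
        AtomicBoolean closed = new AtomicBoolean(false);
        return () -> {
            // compareAndSet succeeds only for the first caller, even across threads.
            if (closed.compareAndSet(false, true)) {
                sink.accept("CLOSE");
            }
        };
    }

    /** Closes the same handle three times and returns what the sink received. */
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        Handle handle = open(seen::add);
        handle.close();
        handle.close();
        handle.close();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The atomic flag is one way to make repeated or concurrent close calls safe; a lock around a plain boolean would work as well.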
-
closeConsumer
private void closeConsumer(PushEventConsumer<? super T> pec, PushEvent<T> event)
-
doSend
private void doSend(PushEventConsumer<? super T> pec, PushEvent<T> event)
-
doSendWithBackPressure
private Promise<java.lang.Long> doSendWithBackPressure(PushEventConsumer<? super T> pec, PushEvent<T> event)
-
safePush
private long safePush(PushEventConsumer<? super T> pec, PushEvent<T> event)
-
close
public void close()
Description copied from interface: SimplePushEventSource
Close this source. Calling this method indicates that there will never be any more events published by it. Calling this method sends a close event to all connected consumers. After calling this method, any PushEventConsumer that tries to PushEventSource.open(PushEventConsumer) this source will immediately receive a close event, and will not see any remaining buffered events.
Specified by:
close in interface java.lang.AutoCloseable
Specified by:
close in interface SimplePushEventSource<T>
-
publish
public void publish(T t)
Description copied from interface: SimplePushEventSource
Asynchronously publish an event to this stream and all connected PushEventConsumer instances. When this method returns there is no guarantee that all consumers have been notified. Events published by a single thread will maintain their relative ordering; however, they may be interleaved with events from other threads.
Specified by:
publish in interface SimplePushEventSource<T>
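The ordering guarantee for a single publishing thread can be pictured with a FIFO buffer drained by one worker. The sketch below is an assumption-laden illustration, not this class's implementation: a single-threaded ExecutorService stands in for the internal queue and worker, and a plain list stands in for a consumer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PublishOrderSketch {
    /** Publishes 0..count-1 from the calling thread and returns the delivery order. */
    static List<Integer> demo(int count) {
        List<Integer> delivered = Collections.synchronizedList(new ArrayList<>());
        // One worker draining a FIFO task queue stands in for the source's buffer.
        ExecutorService worker = Executors.newSingleThreadExecutor();
        for (int i = 0; i < count; i++) {
            final int event = i;
            // "publish": returns immediately; delivery happens later on the worker.
            worker.execute(() -> delivered.add(event));
        }
        // Already submitted tasks still run after shutdown().
        worker.shutdown();
        try {
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(delivered);
    }

    public static void main(String[] args) {
        System.out.println(demo(5));
    }
}
```

With several publishing threads, each thread's events would still appear in its own order, but the combined sequence could interleave them in any way.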
-
endOfStream
public void endOfStream()
Description copied from interface: SimplePushEventSource
Close this source for now, but potentially reopen it later. Calling this method asynchronously sends a close event to all connected consumers and then disconnects them. Any events previously queued by the SimplePushEventSource.publish(Object) method will be delivered before this close event.
After calling this method, any PushEventConsumer that wishes may PushEventSource.open(PushEventConsumer) this source, and will receive subsequent events.
Specified by:
endOfStream in interface SimplePushEventSource<T>
-
error
public void error(java.lang.Throwable t)
Description copied from interface: SimplePushEventSource
Close this source for now, but potentially reopen it later. Calling this method asynchronously sends an error event to all connected consumers and then disconnects them. Any events previously queued by the SimplePushEventSource.publish(Object) method will be delivered before this error event.
After calling this method, any PushEventConsumer that wishes may PushEventSource.open(PushEventConsumer) this source, and will receive subsequent events.
Specified by:
error in interface SimplePushEventSource<T>
Parameters:
t - the error
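Both endOfStream() and error(Throwable) promise that previously queued events arrive before the terminal event. A minimal single-threaded sketch of that ordering follows; it assumes the terminal event is simply appended to the same FIFO buffer as the data events, which may differ from how this class handles it internally. The string tags "DATA:", "CLOSE" and "ERROR:" are illustrative only.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class TerminalEventSketch {
    // FIFO buffer shared by data events and terminal events.
    private final Queue<String> queue = new ArrayDeque<>();

    void publish(String data) {
        queue.add("DATA:" + data);
    }

    void endOfStream() {
        queue.add("CLOSE");
    }

    void error(Throwable t) {
        queue.add("ERROR:" + t.getMessage());
    }

    /** Drains the buffer in FIFO order, stopping after the first terminal event. */
    List<String> drain() {
        List<String> out = new ArrayList<>();
        String event;
        while ((event = queue.poll()) != null) {
            out.add(event);
            if (!event.startsWith("DATA:")) {
                break; // a terminal event ends delivery to the current consumers
            }
        }
        return out;
    }

    /** Publishes two events, then an error, and returns the delivery order. */
    static List<String> demo() {
        TerminalEventSketch source = new TerminalEventSketch();
        source.publish("a");
        source.publish("b");
        source.error(new IllegalStateException("boom"));
        return source.drain();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```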
-
startWorker
private void startWorker()
-
handleReset
private void handleReset(boolean resetWait)
-
deliver
private Promise<java.lang.Long> deliver(java.util.List<PushEventConsumer<? super T>> toCall, PushEvent<T> event)
-
doCall
private Promise<java.lang.Long> doCall(PushEvent<T> event, PushEventConsumer<? super T> pec)
-
isConnected
public boolean isConnected()
Description copied from interface: SimplePushEventSource
Determine whether there are any PushEventConsumers for this PushEventSource. This can be used to skip expensive event creation logic when there are no listeners.
Specified by:
isConnected in interface SimplePushEventSource<T>
Returns:
true if any consumers are currently connected
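The "skip expensive event creation" use of isConnected() might look like the sketch below. Source is a hypothetical two-method stand-in for SimplePushEventSource, and the supplier represents whatever costly computation builds the event; neither comes from this page.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class IsConnectedGuardSketch {
    /** Hypothetical stand-in exposing only the two methods used here. */
    interface Source<T> {
        boolean isConnected();

        void publish(T event);
    }

    /** Builds and publishes the event only when someone is listening. */
    static <T> void publishIfConnected(Source<T> source, Supplier<T> expensiveEvent) {
        if (source.isConnected()) {
            source.publish(expensiveEvent.get());
        }
    }

    /** Test double with a fixed connection state that records published events. */
    static Source<String> fixedSource(boolean connected, List<String> sink) {
        return new Source<String>() {
            @Override
            public boolean isConnected() {
                return connected;
            }

            @Override
            public void publish(String event) {
                sink.add(event);
            }
        };
    }

    /** Returns how often the expensive supplier ran: {disconnected, connected}. */
    static int[] demo() {
        AtomicInteger calls = new AtomicInteger();
        Supplier<String> expensive = () -> {
            calls.incrementAndGet();
            return "report";
        };
        List<String> sink = new ArrayList<>();
        publishIfConnected(fixedSource(false, sink), expensive);
        int whenDisconnected = calls.get();
        publishIfConnected(fixedSource(true, sink), expensive);
        int whenConnected = calls.get() - whenDisconnected;
        return new int[] {whenDisconnected, whenConnected};
    }
}
```

Because consumers can disconnect at any time, the check is only an optimisation; an event built after a positive check may still find no listeners.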
-
connectPromise
public Promise<java.lang.Void> connectPromise()
Description copied from interface: SimplePushEventSource
This method can be used to delay event generation until an event source has connected. The returned promise will resolve as soon as one or more PushEventConsumer instances have opened the SimplePushEventSource.
The returned promise may already be resolved if this SimplePushEventSource already has connected consumers. If the SimplePushEventSource is closed before the returned Promise resolves then it will be failed with an IllegalStateException.
Note that the connected consumers are able to asynchronously close their connections to this SimplePushEventSource, and therefore it is possible that once the promise resolves this SimplePushEventSource may no longer be connected to any consumers.
Specified by:
connectPromise in interface SimplePushEventSource<T>
Returns:
A promise representing the connection state of this EventSource
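The two outcomes described for the connect promise (resolved on first connection, failed with an IllegalStateException on close) can be sketched with the JDK alone. Here java.util.concurrent.CompletableFuture is used as a stand-in for the OSGi Promise, which is an assumption made to keep the example self-contained; the callbacks on a real Promise have different names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ConnectPromiseSketch {
    /** Event generation starts only after the stand-in promise resolves. */
    static List<String> demoConnect() {
        List<String> log = new ArrayList<>();
        // Stand-in for the promise returned by connectPromise().
        CompletableFuture<Void> connected = new CompletableFuture<>();
        // Register the work that should begin once a consumer has connected.
        connected.thenRun(() -> log.add("start generating events"));
        log.add("waiting for consumer");
        // Simulates a consumer opening the source.
        connected.complete(null);
        return log;
    }

    /** The source is closed before anyone connects, so the promise fails. */
    static String demoClosedFirst() {
        CompletableFuture<Void> connected = new CompletableFuture<>();
        StringBuilder outcome = new StringBuilder();
        connected.whenComplete((ignored, failure) ->
                outcome.append(failure == null ? "connected" : failure.getClass().getSimpleName()));
        // Simulates close() being called while the promise is still pending.
        connected.completeExceptionally(new IllegalStateException("source closed"));
        return outcome.toString();
    }

    public static void main(String[] args) {
        System.out.println(demoConnect());
        System.out.println(demoClosedFirst());
    }
}
```

As the description notes, resolution only says that a consumer connected at some point, so code that starts generating events may still want to consult isConnected() before each expensive step.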
-
closedConnectPromise
private Promise<java.lang.Void> closedConnectPromise()
-
-