io.github.makingthematrix.signals3

Members list

Type members

Classlikes

Attributes

Companion
class
Supertypes
class Object
trait Matchable
class Any
Self type
final class AggregatingSignal[E, V](loader: () => Future[V], sourceStream: Stream[E], updater: (V, E) => V)(using ec: ExecutionContext) extends Signal[V]

A signal which initializes its value by executing the loader future and then updates the value by composition of the previous value and an event published in the associated source stream. You may think of it as a more performance-efficient version of RefreshingSignal, useful when the loader requires heavy computations but an update between one value and another is simple in comparison. For example:

A signal which initializes its value by executing the loader future and then updates the value by composition of the previous value and an event published in the associated source stream. You may think of it as a more performance-efficient version of RefreshingSignal, useful when the loader requires heavy computations but an update between one value and another is simple in comparison. For example:

val loader: Future[ Vector[DBEntry] ] = fetchDBTableData()
val sourceStream: Stream[DBEntry] = newDBTableEntryStream()
val updater: (Vector[DBEntry], DBEntry) => Vector[DBEntry] = { (table, newEntry) => table :+ newEntry }
val signal = new AggregatingSignal(loader, sourceStream, updater)

Here, the loader fetches the whole DB table, but if we know that the only change to that table is that new entries can be added to it, we can avoid calling the loader every time the event comes. Instead, we can create the updater function which will combine the current value of the signal (i.e. the in-memory cache of the DB table, created by calling the loader only once, when the signal was initialized), with the new entry.

Type parameters

E

The type of the update events.

V

The type of the value held in the signal and the result of the loader execution.

Value parameters

ec

The execution context in which the loader is executed.

loader

A future used for computing the initial value of the signal. It's passed by name, so if it is created in the place of argument, it will be executed for the first time only when the first subscriber function is registered in the signal, or immediately if disableAutowiring is used. If a new event comes while the loader not yet finished, the event will be memorized and used to produce the first updated value right afterwards.

sourceStream

a stream publishing events which will be used to update the value of the signal.

updater

A function combining the current value of the signal with a new event to produce the updated value.

Attributes

See also
Companion
object
Supertypes
class Signal[V]
class EventSource[V, SignalSubscriber]
class Object
trait Matchable
class Any
Self type

Attributes

Supertypes
trait EventContext
class Object
trait Matchable
class Any
abstract class BaseSubscription(context: WeakReference[EventContext]) extends Subscription

Provides the default implementation of the Subscription trait. Exposes two new abstract methods: onSubscribe and onUnsubscribe. A typical way to implement them is to have a reference to the source of events which implements the EventSource trait and call subscribe(this) on that source (where this is the subscription).

Provides the default implementation of the Subscription trait. Exposes two new abstract methods: onSubscribe and onUnsubscribe. A typical way to implement them is to have a reference to the source of events which implements the EventSource trait and call subscribe(this) on that source (where this is the subscription).

For examples:

Value parameters

context

A weak reference to the event context within which the subscription lives.

Attributes

See also

Stream

Supertypes
trait Subscription
class Object
trait Matchable
class Any
trait Closeable extends AutoCloseable

A stream or a signal can be closeable, meaning that it can be closed and after that it will not publish new events anymore. Every GeneratorStream and GeneratorSignal is Closeable which allows for stopping them when they're no longer needed, but you can make any new stream or signal inherit Closeable and implement the required logic. Closeable extends java.lang.AutoCloseable so in theory it can be used in Java try-with-resources.

A stream or a signal can be closeable, meaning that it can be closed and after that it will not publish new events anymore. Every GeneratorStream and GeneratorSignal is Closeable which allows for stopping them when they're no longer needed, but you can make any new stream or signal inherit Closeable and implement the required logic. Closeable extends java.lang.AutoCloseable so in theory it can be used in Java try-with-resources.

Attributes

See also

ProxyStream and ProxySignal for examples.

Companion
object
Supertypes
trait AutoCloseable
class Object
trait Matchable
class Any
Known subtypes
class GeneratorSignal[V]
class GeneratorStream[E]
trait Closeability
class CloseableFuture[T]
object Closeable

Attributes

Companion
trait
Supertypes
class Object
trait Matchable
class Any
Self type
Closeable.type
abstract class CloseableFuture[+T](using ec: ExecutionContext) extends Awaitable[T], Closeable

CloseableFuture is an object that for all practical uses works like a future but enables the user to close the operation. A closed future fails with CloseableFuture.Closed so the subscriber can differentiate between this and other failure reasons. It is impossible to close a future if it is already completed or if it is uncloseable.

CloseableFuture is an object that for all practical uses works like a future but enables the user to close the operation. A closed future fails with CloseableFuture.Closed so the subscriber can differentiate between this and other failure reasons. It is impossible to close a future if it is already completed or if it is uncloseable.

Attributes

See also

Uncloseable for details on uncloseable futures

Companion
object
Supertypes
trait Closeable
trait AutoCloseable
trait Awaitable[T]
class Object
trait Matchable
class Any
Show all
Self type

CloseableFuture is an object that for all practical uses works like a future but enables the user to close the operation. A closed future fails with CloseableFuture.Closed so the subscriber can differentiate between this and other failure reasons.

CloseableFuture is an object that for all practical uses works like a future but enables the user to close the operation. A closed future fails with CloseableFuture.Closed so the subscriber can differentiate between this and other failure reasons.

Attributes

See also
Companion
class
Supertypes
class Object
trait Matchable
class Any
Self type
trait DispatchQueue extends ExecutionContext

A thin wrapper over Scala's ExecutionContext allowing us to differentiate between the default execution context which tries to run asynchronously as many tasks as possible, and limited execution contexts, allowed to run only up to a given number of tasks at once.

A thin wrapper over Scala's ExecutionContext allowing us to differentiate between the default execution context which tries to run asynchronously as many tasks as possible, and limited execution contexts, allowed to run only up to a given number of tasks at once.

Attributes

See also

ExecutionContext

Companion
object
Supertypes
trait ExecutionContext
class Object
trait Matchable
class Any
Known subtypes
object DispatchQueue

Attributes

Companion
trait
Supertypes
class Object
trait Matchable
class Any
Self type
object EventContext

Attributes

Companion
trait
Supertypes
class Object
trait Matchable
class Any
Self type
trait EventContext

When you subscribe to an EventSource in return you receive a Subscription. You can use that subscription to unsubscribe from the event source or to temporarily pause receiving events. But managing a big number of subscriptions to different event sources can be tricky. EventContext comes to the rescue.

When you subscribe to an EventSource in return you receive a Subscription. You can use that subscription to unsubscribe from the event source or to temporarily pause receiving events. But managing a big number of subscriptions to different event sources can be tricky. EventContext comes to the rescue.

By default, every subscription is registered in a "dummy" EventContext.Global which lives for the lifetime of the whole program and does nothing. But if instead you will create a new EventContext and use it explicitly when subscribing or you will set it as an implicit parameter, taking over EventContext.Global, the subscription will be registered within this new one. It will allow you to manage all registered subscriptions at once and all registered subscriptions will be destroyed when the event context lifetime ends.

Usage of methods in the trait are explained as they are implemented in the default implementation. All operations on an EventContext are synchronized.

Attributes

See also
Companion
object
Supertypes
class Object
trait Matchable
class Any
Known subtypes
object Global.type
abstract class EventSource[E, S]

Attributes

Companion
object
Supertypes
class Object
trait Matchable
class Any
Known subtypes
class Signal[V]
class GeneratorSignal[V]
class AggregatingSignal[E, V]
class RefreshingSignal[V]
class SourceSignal[V]
class Stream[E]
class GeneratorStream[E]
class SourceStream[E]
class StreamWithAuxSignal[A, B]
Show all
object EventSource

Attributes

Companion
class
Supertypes
class Object
trait Matchable
class Any
Self type

A dispatch queue limiting number of concurrently executing tasks. All tasks are executed on parent execution context, but only up to the concurrencyLimit. New tasks, scheduled when the limit is reached, will wait in the queue until one of the current one finishes. Create with one of DispatchQueue.apply methods.

A dispatch queue limiting number of concurrently executing tasks. All tasks are executed on parent execution context, but only up to the concurrencyLimit. New tasks, scheduled when the limit is reached, will wait in the queue until one of the current one finishes. Create with one of DispatchQueue.apply methods.

Attributes

Companion
object
Supertypes
trait ExecutionContext
class Object
trait Matchable
class Any
Known subtypes

Attributes

Companion
class
Supertypes
class Object
trait Matchable
class Any
Self type
final class RefreshingSignal[V](loader: () => CloseableFuture[V], refreshStream: Stream[_])(using ec: ExecutionContext) extends Signal[V]

A signal which initializes its value by executing the loader closeable future and then updates the value the same way every time a new refresh event is published in the associated stream. The type of the event is not important.

A signal which initializes its value by executing the loader closeable future and then updates the value the same way every time a new refresh event is published in the associated stream. The type of the event is not important.

A typical use case for a refreshing signal might be, for example, to inform another component that something changed in the storage while already retrieving the updated data. In this case, the refresh stream can be anything that indicates the data has changed, and the loader is the query. The refresh event might even be a false positive: then the loader function will be called but the subscriber function of the refreshing signal will not be notified as the result of the loader is the same and so the value of the signal doesn't change.

Type parameters

V

The value type of the signal and the result of the loader closeable future.

Value parameters

ec

The execution context in which the loader is executed.

loader

A closeable future computing the value of the signal. It's passed by name, so if it is created in the place of argument, it will be executed for the first time only when the first subscriber function is registered in the signal, or immediately if disableAutowiring is used. If the execution fails or is cancelled, the value of the signal won't be updated.

refreshStream

a stream publishing events which will trigger new executions of the loader. If a new event comes before the previous call to loader finishes, the previous call will be cancelled.

Attributes

See also
Companion
object
Supertypes
class Signal[V]
class EventSource[V, SignalSubscriber]
class Object
trait Matchable
class Any

Attributes

Companion
class
Supertypes
class Object
trait Matchable
class Any
Self type

A special case of a limited dispatch queue which allows for only one task to be executed at once. Use when you want to enforce the tasks to be executed in the order they were scheduled.

A special case of a limited dispatch queue which allows for only one task to be executed at once. Use when you want to enforce the tasks to be executed in the order they were scheduled.

Attributes

Companion
object
Supertypes
trait ExecutionContext
class Object
trait Matchable
class Any
Show all

Attributes

Companion
class
Supertypes
class Object
trait Matchable
class Any
Self type
object Serialized

A utility object for serializing futures.

A utility object for serializing futures.

The need for this functionality comes from the fact that we can't assume an event will be processed before the next one comes, but sometimes it is also crucial to process the next event only after the first one is done. In such case, the user can use one of the methods of Serialized to schedule processing the first event, tag it with a key, and then simply use the same key to schedule processing of the second event. The user doesn't have to know if the first event was already processed or not - if yes, processing of the second will start immediately, if not, the processing (in the form of a future or a CloseableFuture) will be attached to the end of the ongoing processing and triggered only after it's done.

Attributes

Supertypes
class Object
trait Matchable
class Any
Self type
Serialized.type
class Signal[V](var value: Option[V]) extends EventSource[V, SignalSubscriber]

A signal is a stream with a cache.

A signal is a stream with a cache.

Whereas a stream holds no internal state and just passes on events it receives, a signal keeps the last value it received. A new subscriber function registered in a stream will be called only when a new event is published. A new subscriber function registered in a signal will be called immediately (or as soon as possible on the given execution context) with the current value of the signal (unless it's not initialized yet) and then again when the value changes. A signal is also able to compare a new value published in it with the old one - the new value will be passed on only if it is different. Thus, a signal can help us with optimizing performance on both ends: as a cache for values which otherwise would require expensive computations to produce them every time we need them, and as a way to ensure that subscriber functions are called only when the value actually changes, but not when the result of the intermediate computation is the same as before.

Note that for clarity we talk about events in the event streams, but about values in signals.

An signal of the type V dispatches values to all functions of the type (V) => Unit which were registered in the signal as its subscribers. It provides a handful of methods which enable the user to create new signals by means of composing the old ones, filtering them, etc., in a way similar to how the user can operate on standard collections, as well as to interact with Scala futures, closeable futures, and event streams. Please note that by default a signal is not able to receive events from the outside - that functionality belongs to SourceSignal.

Type parameters

V

The type of the value held in the signal.

Value parameters

value

The option of the last value published in the signal or None if the signal was not initialized yet.

Attributes

See also

Stream

Companion
object
Supertypes
class EventSource[V, SignalSubscriber]
class Object
trait Matchable
class Any
Known subtypes
class GeneratorSignal[V]
class AggregatingSignal[E, V]
class RefreshingSignal[V]
class SourceSignal[V]
Self type
Signal[V]
object Signal

Attributes

Companion
class
Supertypes
class Object
trait Matchable
class Any
Self type
Signal.type
final class SourceSignal[V](val v: Option[V]) extends Signal[V]

The usual entry point for publishing values in signals.

The usual entry point for publishing values in signals.

Create a new signal either using the default constructor or the Signal.apply[V]() method. The source signal exposes methods you can use for changing its value. Then you can combine it with other signals and finally subscribe a function to it which will be called initially, and then on each change of the signal's value.

Type parameters

V

the type of the value held by the signal.

Attributes

Companion
object
Supertypes
class Signal[V]
class EventSource[V, SignalSubscriber]
class Object
trait Matchable
class Any
object SourceSignal

Attributes

Companion
class
Supertypes
class Object
trait Matchable
class Any
Self type
final class SourceStream[E] extends Stream[E]

The usual entry point for publishing events.

The usual entry point for publishing events.

Create a new source stream either using the default constructor or the Stream.apply[V]() method. The source stream exposes methods you can use for publishing new events. Then you can combine it with other event streams and finally subscribe a function to it which will receive the resulting events.

Type parameters

E

the type of the event

Attributes

Supertypes
class Stream[E]
class EventSource[E, EventSubscriber[E]]
class Object
trait Matchable
class Any
class Stream[E]() extends EventSource[E, EventSubscriber[E]]

A stream of type E dispatches events (of type E) to all functions of type (E) => Unit which were registered in the stream as its subscribers. It doesn't have an internal state. It provides a handful of methods which enable the user to create new event streams by means of composing the old ones, filtering them, etc., in a way similar to how the user can operate on standard collections, as well as to interact with Scala futures, closeable futures, and signals. Please note that by default a stream is not able to receive events from the outside - that functionality belongs to SourceStream.

A stream of type E dispatches events (of type E) to all functions of type (E) => Unit which were registered in the stream as its subscribers. It doesn't have an internal state. It provides a handful of methods which enable the user to create new event streams by means of composing the old ones, filtering them, etc., in a way similar to how the user can operate on standard collections, as well as to interact with Scala futures, closeable futures, and signals. Please note that by default a stream is not able to receive events from the outside - that functionality belongs to SourceStream.

A stream may also help in sending events from one execution context to another. For example, a source stream may receive an event in one execution context, but the function which consumes it is registered with another execution context specified. In that case the function won't be called immediately, but in a future executed in that execution context.

Attributes

See also

ExecutionContext

Companion
object
Supertypes
class EventSource[E, EventSubscriber[E]]
class Object
trait Matchable
class Any
Known subtypes
class GeneratorStream[E]
class SourceStream[E]
class StreamWithAuxSignal[A, B]
object Stream

Attributes

Companion
class
Supertypes
class Object
trait Matchable
class Any
Self type
Stream.type
final class StreamWithAuxSignal[A, B](source: Stream[A], aux: Signal[B]) extends Stream[(A, Option[B])]

a stream coupled with an auxiliary signal. You can use it if you want to repeat some computations based on the current value of the signal every time when an event is published in the source stream.

a stream coupled with an auxiliary signal. You can use it if you want to repeat some computations based on the current value of the signal every time when an event is published in the source stream.

val aux = Signal[Int]()
val source = Stream[Unit]()

val newStream = EventStreamWithAuxSignal(source, aux)
newStream.foreach { case (_, Option(n)) => /* ... */ }

Here, newStream extends Stream[Unit, Option[Int]]. The subscriber function registered in newStream`` will be called every time a new unit event is published insourceand it will receive a tuple of the event and the current value ofaux:Some[Int]if something was already published in the signal, orNone` if it is not initialized yet.

Type parameters

A

The type of events in the source stream.

B

The type of values in the auxiliary signal.

Value parameters

aux

An auxiliary signal of values of the type B. Every time a new event is published in source, this stream will access the signal for its current value. The value (or lack of it) will become the second part of the tuple published in this stream.

source

The source stream used to trigger events in this stream. Every event of type A published in source will become the first part of the tuple published in this stream.

Attributes

Companion
object
Supertypes
class Stream[(A, Option[B])]
class EventSource[(A, Option[B]), EventSubscriber[(A, Option[B])]]
class Object
trait Matchable
class Any

Attributes

Companion
class
Supertypes
class Object
trait Matchable
class Any
Self type
trait Subscription

When you add a new subscriber to your Stream or Signal, in return you get a Subscription. A subscription can then be used to inform the source about changes in the condition of the connection: should it be enabled or disabled, should the subscriber be subscribed or (temporarily) unsubscribed, or should the subscription be permanently destroyed.

When you add a new subscriber to your Stream or Signal, in return you get a Subscription. A subscription can then be used to inform the source about changes in the condition of the connection: should it be enabled or disabled, should the subscriber be subscribed or (temporarily) unsubscribed, or should the subscription be permanently destroyed.

It is important to destroy subscriptions when they are no longer needed, e.g. at the end of the life of an object which subscribes to the source of events. Otherwise you may face a hidden memory leak where no longer used data cannot be GC-ed because it is still referenced by the source of events.

Attributes

See also

EventContext Implement this trait together with writing a new event source if you want to change how your event source reacts to the aforementioned events. For an example of how to do it on a small scale, please

Supertypes
class Object
trait Matchable
class Any
Known subtypes
object Threading

Use Threading to set up the default execution context which will be later used as the parent for other dispatch queues and to run closeable futures, event streams, and signals, if no other execution context is provided.

Use Threading to set up the default execution context which will be later used as the parent for other dispatch queues and to run closeable futures, event streams, and signals, if no other execution context is provided.

Attributes

Supertypes
class Object
trait Matchable
class Any
Self type
Threading.type
final class ThrottledSignal[V](val source: Signal[V], val delay: FiniteDuration)

A signal which publishes changes of its parent signal but no more often than once during a given time interval. The initial value of the parent signal will be published immediately. The first change to it will happen at the earliest after the given delay. If the parent signal changes its value more often, the intermediate values will be ignored.

A signal which publishes changes of its parent signal but no more often than once during a given time interval. The initial value of the parent signal will be published immediately. The first change to it will happen at the earliest after the given delay. If the parent signal changes its value more often, the intermediate values will be ignored.

Use it e.g. for optimization of a signal chain when there is no need to react immediately to all changes to the original signal. For example. changes in the UI could be displayed only with the speed that allows for comfortable usage of the app by the user, but not faster.

Type parameters

V

The value type of the signal.

Value parameters

delay

The time interval used for publishing. No more than one change of the value per delay will be published.

source

The original signal providing the value and changes to it.

Attributes

Todo

Check if when the original value changes once during the delay interval, but not again after it, will that one change be noticed. I think it should be.

Companion
object
Supertypes
class Signal[V]
class EventSource[V, SignalSubscriber]
class Object
trait Matchable
class Any

Attributes

Companion
class
Supertypes
class Object
trait Matchable
class Any
Self type

A dispatch queue that simply passes all its tasks to its execution context.

A dispatch queue that simply passes all its tasks to its execution context.

Attributes

Companion
object
Supertypes
trait ExecutionContext
class Object
trait Matchable
class Any

Attributes

Companion
class
Supertypes
class Object
trait Matchable
class Any
Self type