Observable Package
pocket/pkg/observable Package
pocket/pkg/observable PackageThe pocket/pkg/observable package provides a lightweight and straightforward mechanism to handle asynchronous notifications using the Observer pattern. This is achieved through two primary interfaces: Observable and Observer.
Overview
The Observable interface is responsible for notifying multiple subscribers about new values asynchronously, while the Observer interface allows access to the notified channel and facilitates unsubscribing from an Observable.
Interfaces and Structures
Observable Interface
Observable InterfaceRepresents a publisher in a "Fan-Out" system design, allowing multiple subscribers to be notified of new values asynchronously.
Methods:
Subscribe: Used to subscribe an observer to the observable. Returns an instance of the
Observerinterface.{% code title="Subscribe signature" %}
func (o *MyObservableType) Subscribe(ctx context.Context) Observer[MyValueType]{% endcode %}
UnsubscribeAll: Unsubscribes all observers from the observable.
{% code title="UnsubscribeAll signature" %}
func (o *MyObservableType) UnsubscribeAll(){% endcode %}
Observer Interface
Observer InterfaceRepresents a subscriber in a "Fan-Out" system design, providing access to the notified channel and capabilities to unsubscribe.
Methods:
Unsubscribe: Used to unsubscribe the observer from its associated observable.
{% code title="Unsubscribe signature" %}
func (obs *MyObserverType) Unsubscribe(){% endcode %}
Ch: Returns the channel through which the observer receives notifications.
{% code title="Ch signature" %}
func (obs *MyObserverType) Ch() <-chan MyValueType{% endcode %}
Architecture Diagrams
Visual representations to explain high-level structure and interactions.
Observable Synchronization
observer
observable
read-lock
write-lock
write-lock
write-lock
read-lock
write-lock
read/write mutex
#subscribe
#close
read/write mutex
#notify
#unsubscribeFigure 1: This diagram depicts the synchronization mechanisms between the observable and its observers. It specifically showcases the use of read and write locks for different operations in both observable and observer contexts.
Observable Buffering
observer1
observer2
observable
subscriber channel
subscriber channel
publisher channel
source
sink
sink
sink
source
sink
source
publish context
subscribe context 1
subscribe context 2
publish buffer
subscribe buffer
#notify
subscribe buffer
#notifyFigure 2: The diagram illustrates the buffering mechanisms within the observable and its observers. It highlights how published messages are buffered and how they propagate to the individual observers' buffers.
Usage
Basic Example
package main
import (
"context"
"fmt"
"time"
"pocket/pkg/observable/channel"
)
func main() {
// Create a new context that can be cancelled
ctx, cancel := context.WithCancel(context.Background())
// Ensure to cancel the context to release resources
defer cancel()
// Create a new Observable and its corresponding publisher
obsvbl, publisher := channel.NewObservable[int]()
// Subscribe the first Observer to the Observable
observer1 := obsvbl.Subscribe(ctx)
// Start observing with observer1 in a goroutine
go func() {
for v := range observer1.Ch() {
fmt.Println("Observer1 received:", v)
}
}()
// Publish the first value to the Observable
publisher <- 10
time.Sleep(time.Millisecond)
// Now, subscribe the second Observer to the Observable
observer2 := obsvbl.Subscribe(ctx)
// Start observing with observer2 in a goroutine
go func() {
for v := range observer2.Ch() {
fmt.Println("Observer2 received:", v)
}
}()
// Publish the second value
publisher <- 20
time.Sleep(time.Millisecond)
// Unsubscribe observer1 before the last value is sent
observer1.Unsubscribe()
fmt.Println("Observer1 unsubscribed!")
// Publish the third value
publisher <- 30
time.Sleep(time.Millisecond)
}
// Expected Output:
// Observer1 received: 10
// Observer2 received: 20
// Observer1 received: 20
// Observer1 unsubscribed!
// Observer2 received: 30Considerations
Conclusion
The pkg/observable package is an intuitive solution for handling asynchronous notifications in Go projects, ensuring efficient communication between observables and observers.
Was this helpful?
