Observable Package

pocket/pkg/observable Package

The pocket/pkg/observable package provides a lightweight and straightforward mechanism to handle asynchronous notifications using the Observer pattern. This is achieved through two primary interfaces: Observable and Observer.

Overview

The Observable interface is responsible for notifying multiple subscribers about new values asynchronously, while the Observer interface allows access to the notified channel and facilitates unsubscribing from an Observable.

Interfaces and Structures

Observable Interface

Represents a publisher in a "Fan-Out" system design, allowing multiple subscribers to be notified of new values asynchronously.

  • Methods:

    • Subscribe: Used to subscribe an observer to the observable. Returns an instance of the Observer interface.

      {% code title="Subscribe signature" %}

      func (o *MyObservableType) Subscribe(ctx context.Context) Observer[MyValueType]

      {% endcode %}

    • UnsubscribeAll: Unsubscribes all observers from the observable.

      {% code title="UnsubscribeAll signature" %}

      func (o *MyObservableType) UnsubscribeAll()

      {% endcode %}

Observer Interface

Represents a subscriber in a "Fan-Out" system design, providing access to the notified channel and capabilities to unsubscribe.

  • Methods:

    • Unsubscribe: Used to unsubscribe the observer from its associated observable.

      {% code title="Unsubscribe signature" %}

      func (obs *MyObserverType) Unsubscribe()

      {% endcode %}

    • Ch: Returns the channel through which the observer receives notifications.

      {% code title="Ch signature" %}

      func (obs *MyObserverType) Ch() <-chan MyValueType

      {% endcode %}

Architecture Diagrams

Visual representations to explain high-level structure and interactions.

Observable Synchronization

observer

observable

read-lock

write-lock

write-lock

write-lock

read-lock

write-lock

read/write mutex

#subscribe

#close

read/write mutex

#notify

#unsubscribe

Figure 1: This diagram depicts the synchronization mechanisms between the observable and its observers. It specifically showcases the use of read and write locks for different operations in both observable and observer contexts.

Observable Buffering

observer1

observer2

observable

subscriber channel

subscriber channel

publisher channel

source

sink

sink

sink

source

sink

source

publish context

subscribe context 1

subscribe context 2

publish buffer

subscribe buffer

#notify

subscribe buffer

#notify

Figure 2: The diagram illustrates the buffering mechanisms within the observable and its observers. It highlights how published messages are buffered and how they propagate to the individual observers' buffers.

Usage

Basic Example

main.go
package main

import (
	"context"
	"fmt"
	"time"

	"pocket/pkg/observable/channel"
)

func main() {
	// Create a new context that can be cancelled
	ctx, cancel := context.WithCancel(context.Background())
	// Ensure to cancel the context to release resources
	defer cancel()

	// Create a new Observable and its corresponding publisher
	obsvbl, publisher := channel.NewObservable[int]()
	// Subscribe the first Observer to the Observable
	observer1 := obsvbl.Subscribe(ctx)

	// Start observing with observer1 in a goroutine
	go func() {
		for v := range observer1.Ch() {
			fmt.Println("Observer1 received:", v)
		}
	}()

	// Publish the first value to the Observable
	publisher <- 10
	time.Sleep(time.Millisecond)

	// Now, subscribe the second Observer to the Observable
	observer2 := obsvbl.Subscribe(ctx)

	// Start observing with observer2 in a goroutine
	go func() {
		for v := range observer2.Ch() {
			fmt.Println("Observer2 received:", v)
		}
	}()

	// Publish the second value
	publisher <- 20
	time.Sleep(time.Millisecond)

	// Unsubscribe observer1 before the last value is sent
	observer1.Unsubscribe()
	fmt.Println("Observer1 unsubscribed!")

	// Publish the third value
	publisher <- 30
	time.Sleep(time.Millisecond)
}

// Expected Output:
// Observer1 received: 10
// Observer2 received: 20
// Observer1 received: 20
// Observer1 unsubscribed!
// Observer2 received: 30

Considerations

While the pkg/observable package is designed to be simple and minimal, developers with more complex requirements may need to consider extending its functionality or exploring other libraries like RxGo.

Conclusion

The pkg/observable package is an intuitive solution for handling asynchronous notifications in Go projects, ensuring efficient communication between observables and observers.

Was this helpful?