Cosmo Router provides powerful hooks for customizing Custom Streams (a.k.a. Event-Driven Federated Subscriptions, or Cosmo Streams) behavior. These hooks allow you to implement custom logic for subscription lifecycle management, event processing, and data transformation.
Available Hooks
The Cosmo Streams system provides three main hook interfaces that you can implement in your custom modules:
SubscriptionOnStartHandler: Called once at subscription start
StreamReceiveEventHandler: Triggered for each client/subscription when a batch of events is received from the provider, prior to delivery
StreamPublishEventHandler: Called each time a batch of events is going to be sent to the provider
Hook Interfaces
SubscriptionOnStartHandler
This hook is called once when a subscription starts, allowing you to implement custom logic such as authorization checks or initial message sending.
type SubscriptionOnStartHandler interface {
// OnSubscriptionOnStart is called once at subscription start
// Returning an error will result in a GraphQL error being returned to the client
SubscriptionOnStart(ctx SubscriptionOnStartHandlerContext) error
}
Use cases:
- Authorization checks at subscription start
- Sending initial messages to clients
- Validating subscription parameters
StreamReceiveEventHandler
This hook is triggered for each client/subscription when a batch of events is received from the provider, before delivering them to the client.
type StreamReceiveEventHandler interface {
// OnReceiveEvents is called each time a batch of events is received from the provider before delivering them to the client
// So for a single batch of events received from the provider, this hook will be called one time for each active subscription.
// It is important to optimize the logic inside this hook to avoid performance issues.
// Returning an error will result in a GraphQL error being returned to the client
OnReceiveEvents(ctx StreamReceiveEventHandlerContext, events []StreamEvent) ([]StreamEvent, error)
}
Use cases:
- Event filtering based on client permissions
- Data transformation and mapping
- Event validation and sanitization
The StreamReceiveEventHandler is called for each active subscription when events are received, so optimize your logic to avoid performance issues. Even small inefficiencies can lead to significant delays when many subscriptions are active.
StreamPublishEventHandler
This hook is called each time a batch of events is going to be sent to the provider.
type StreamPublishEventHandler interface {
// OnPublishEvents is called each time a batch of events is going to be sent to the provider
// Returning an error will result in an error being returned and the client will see the mutation failing
OnPublishEvents(ctx StreamPublishEventHandlerContext, events []StreamEvent) ([]StreamEvent, error)
}
Use cases:
- Data transformation before publishing
- Event validation
- Adding metadata to events
Context Interfaces
Each hook provides a rich context interface that gives you access to request information, authentication, and configuration:
SubscriptionOnStartHandlerContext
type SubscriptionOnStartHandlerContext interface {
// Request is the original request received by the router.
Request() *http.Request
// Logger is the logger for the request
Logger() *zap.Logger
// Operation is the GraphQL operation
Operation() OperationContext
// Authentication is the authentication for the request
Authentication() authentication.Authentication
// SubscriptionEventConfiguration is the subscription event configuration
SubscriptionEventConfiguration() datasource.SubscriptionEventConfiguration
// WriteEvent writes an event to the stream of the current subscription
// It returns true if the event was written to the stream, false if the event was dropped
WriteEvent(event datasource.StreamEvent) bool
}
StreamReceiveEventHandlerContext
type StreamReceiveEventHandlerContext interface {
// Request is the initial client request that started the subscription
Request() *http.Request
// Logger is the logger for the request
Logger() *zap.Logger
// Operation is the GraphQL operation
Operation() OperationContext
// Authentication is the authentication for the request
Authentication() authentication.Authentication
// SubscriptionEventConfiguration is the subscription event configuration
SubscriptionEventConfiguration() SubscriptionEventConfiguration
}
StreamPublishEventHandlerContext
type StreamPublishEventHandlerContext interface {
// Request is the original request received by the router.
Request() *http.Request
// Logger is the logger for the request
Logger() *zap.Logger
// Operation is the GraphQL operation
Operation() OperationContext
// Authentication is the authentication for the request
Authentication() authentication.Authentication
// PublishEventConfiguration is the publish event configuration
PublishEventConfiguration() PublishEventConfiguration
}
Core Types
StreamEvent Interface
The StreamEvent interface allows the hooks system to be provider-agnostic:
type StreamEvent interface {
GetData() []byte
}
Each provider (NATS, Kafka, Redis) will have its own event type with custom fields, but they all implement this common interface.
OperationContext
The OperationContext provides access to GraphQL operation information:
type OperationContext interface {
Name() string
// the variables are currently not available, so we need to expose them here
Variables() *astjson.Value
}
Configuration Interfaces
SubscriptionEventConfiguration
type SubscriptionEventConfiguration interface {
ProviderID() string
ProviderType() string
// the root field name of the subscription in the schema
RootFieldName() string
}
PublishEventConfiguration
type PublishEventConfiguration interface {
ProviderID() string
ProviderType() string
// the root field name of the mutation in the schema
RootFieldName() string
}
Example: Authorization and Event Filtering
Here’s a complete example that demonstrates how to implement authorization checks and event filtering:
package mymodule
import (
"encoding/json"
"slices"
"github.com/wundergraph/cosmo/router/core"
"github.com/wundergraph/cosmo/router/pkg/pubsub/datasource"
"github.com/wundergraph/cosmo/router/pkg/pubsub/nats"
)
func init() {
// Register your module here and it will be loaded at router start
core.RegisterModule(&MyModule{})
}
type MyModule struct {}
// Implement SubscriptionOnStartHandler for authorization
func (m *MyModule) SubscriptionOnStart(ctx SubscriptionOnStartHandlerContext) error {
// Check if the provider is NATS
if ctx.SubscriptionEventConfiguration().ProviderType() != datasource.ProviderTypeNats {
return nil
}
// Check if the provider ID matches
if ctx.SubscriptionEventConfiguration().ProviderID() != "my-nats" {
return nil
}
// Check if the subscription is the expected one
if ctx.SubscriptionEventConfiguration().RootFieldName() != "employeeUpdates" {
return nil
}
// Check if the client is authenticated
if ctx.Authentication() == nil {
return core.NewHttpGraphqlError("client is not authenticated", http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
}
// Check if the client has the required permissions
clientAllowedEntitiesIds, found := ctx.Authentication().Claims()["readEmployee"]
if !found {
return core.NewHttpGraphqlError(
"client is not allowed to read employees",
http.StatusText(http.StatusForbidden),
http.StatusForbidden
)
}
return nil
}
// Implement StreamReceiveEventHandler for event filtering and transformation
func (m *MyModule) OnReceiveEvents(ctx StreamReceiveEventHandlerContext, events []core.StreamEvent) ([]core.StreamEvent, error) {
// Check if the provider is NATS
if ctx.SubscriptionEventConfiguration().ProviderType() != datasource.ProviderTypeNats {
return events, nil
}
// Check if the provider ID matches
if ctx.SubscriptionEventConfiguration().ProviderID() != "my-nats" {
return events, nil
}
// Check if the subscription is the expected one
if ctx.SubscriptionEventConfiguration().RootFieldName() != "employeeUpdates" {
return events, nil
}
newEvents := make([]core.StreamEvent, 0, len(events))
// Check if the client is authenticated
if ctx.Authentication() == nil {
// If the client is not authenticated, return no events
return newEvents, nil
}
// Get client's allowed entity IDs
clientAllowedEntitiesIds, found := ctx.Authentication().Claims()["allowedEntitiesIds"]
if !found {
return newEvents, fmt.Errorf("client is not allowed to subscribe to the stream")
}
for _, evt := range events {
natsEvent, ok := evt.(*nats.NatsEvent)
if !ok {
newEvents = append(newEvents, evt)
continue
}
// Decode the event data coming from the provider
var dataReceived struct {
EmployeeId string `json:"EmployeeId"`
OtherField string `json:"OtherField"`
}
err := json.Unmarshal(natsEvent.Data, &dataReceived)
if err != nil {
return events, fmt.Errorf("error unmarshalling data: %w", err)
}
// Filter events based on client's permissions
if !slices.Contains(clientAllowedEntitiesIds, dataReceived.EmployeeId) {
continue
}
// Transform the data to match the expected GraphQL schema
var dataToSend struct {
Id string `json:"id"`
TypeName string `json:"__typename"`
}
dataToSend.Id = dataReceived.EmployeeId
dataToSend.TypeName = "Employee"
// Marshal the transformed data
dataToSendMarshalled, err := json.Marshal(dataToSend)
if err != nil {
return events, fmt.Errorf("error marshalling data: %w", err)
}
// Create the new event
newEvent := &nats.NatsEvent{
Data: dataToSendMarshalled,
Metadata: natsEvent.Metadata,
}
newEvents = append(newEvents, newEvent)
}
return newEvents, nil
}
func (m *MyModule) Module() core.ModuleInfo {
return core.ModuleInfo{
ID: "myModule",
Priority: 1,
New: func() core.Module {
return &MyModule{}
},
}
}
// Interface guards
var (
_ core.SubscriptionOnStartHandler = (*MyModule)(nil)
_ core.StreamReceiveEventHandler = (*MyModule)(nil)
)
Here’s an example of how to transform events before publishing:
// Implement StreamPublishEventHandler for event transformation
func (m *MyModule) OnPublishEvents(ctx StreamPublishEventHandlerContext, events []StreamEvent) ([]StreamEvent, error) {
// Check if the provider is NATS
if ctx.PublishEventConfiguration().ProviderType() != datasource.ProviderTypeNats {
return events, nil
}
// Check if the provider ID matches
if ctx.PublishEventConfiguration().ProviderID() != "my-nats" {
return events, nil
}
// Check if the mutation is the expected one
if ctx.PublishEventConfiguration().RootFieldName() != "updateEmployee" {
return events, nil
}
transformedEvents := make([]StreamEvent, 0, len(events))
for _, evt := range events {
natsEvent, ok := evt.(*nats.NatsEvent)
if !ok {
transformedEvents = append(transformedEvents, evt)
continue
}
// Decode the original event data
var originalData map[string]interface{}
err := json.Unmarshal(natsEvent.Data, &originalData)
if err != nil {
return events, fmt.Errorf("error unmarshalling data: %w", err)
}
// Add metadata or transform the data
originalData["timestamp"] = time.Now().Unix()
originalData["source"] = "cosmo-router"
// Marshal the transformed data
transformedData, err := json.Marshal(originalData)
if err != nil {
return events, fmt.Errorf("error marshalling transformed data: %w", err)
}
// Create the transformed event
transformedEvent := &nats.NatsEvent{
Data: transformedData,
Metadata: natsEvent.Metadata,
}
transformedEvents = append(transformedEvents, transformedEvent)
}
return transformedEvents, nil
}