Cosmo Router provides hooks for customizing the behavior of Custom Streams (a.k.a. Event-Driven Federated Subscriptions, or Cosmo Streams). These hooks let you implement custom logic for subscription lifecycle management, event processing, and data transformation.

Available Hooks

The Cosmo Streams system provides three main hook interfaces that you can implement in your custom modules:
  • SubscriptionOnStartHandler: Called once at subscription start
  • StreamReceiveEventHandler: Triggered for each client/subscription when a batch of events is received from the provider, prior to delivery
  • StreamPublishEventHandler: Called each time a batch of events is going to be sent to the provider

Hook Interfaces

SubscriptionOnStartHandler

This hook is called once when a subscription starts, allowing you to implement custom logic such as authorization checks or initial message sending.
type SubscriptionOnStartHandler interface {
    // SubscriptionOnStart is called once at subscription start
    // Returning an error will result in a GraphQL error being returned to the client
    SubscriptionOnStart(ctx SubscriptionOnStartHandlerContext) error
}
Use cases:
  • Authorization checks at subscription start
  • Sending initial messages to clients
  • Validating subscription parameters
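As a minimal sketch of the first two use cases, the handler below rejects unauthenticated clients and otherwise writes one initial event to the subscription. It does not import the router: `startContext`, `fakeContext`, and `jsonEvent` are hypothetical stand-ins that mirror only the methods used here, and the `Authenticated()` method is an assumption that replaces the real `Authentication()` check.

```go
package main

import (
	"errors"
	"fmt"
)

// StreamEvent mirrors the provider-agnostic event interface described on this page.
type StreamEvent interface {
	GetData() []byte
}

// jsonEvent is a hypothetical event type used only for this sketch.
type jsonEvent struct{ data []byte }

func (e jsonEvent) GetData() []byte { return e.data }

// startContext is a local stand-in for the parts of
// SubscriptionOnStartHandlerContext that this sketch needs.
type startContext interface {
	Authenticated() bool // assumption: stands in for Authentication() != nil
	WriteEvent(event StreamEvent) bool
}

// fakeContext records written events so the sketch runs without a router.
type fakeContext struct {
	authenticated bool
	written       []StreamEvent
}

func (c *fakeContext) Authenticated() bool { return c.authenticated }

func (c *fakeContext) WriteEvent(event StreamEvent) bool {
	c.written = append(c.written, event)
	return true
}

var errUnauthenticated = errors.New("client is not authenticated")

// onStart rejects unauthenticated clients, then sends one initial message.
func onStart(ctx startContext) error {
	if !ctx.Authenticated() {
		return errUnauthenticated
	}
	if !ctx.WriteEvent(jsonEvent{data: []byte(`{"status":"subscribed"}`)}) {
		return errors.New("initial event was dropped")
	}
	return nil
}

func main() {
	ctx := &fakeContext{authenticated: true}
	fmt.Println(onStart(ctx), len(ctx.written))
	fmt.Println(onStart(&fakeContext{}))
}
```

Treating a dropped initial event as an error is a choice made for this sketch; a real module might only log it and let the subscription continue.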

StreamReceiveEventHandler

This hook is triggered for each client/subscription when a batch of events is received from the provider, before delivering them to the client.
type StreamReceiveEventHandler interface {
    // OnReceiveEvents is called each time a batch of events is received from the provider before delivering them to the client
    // So for a single batch of events received from the provider, this hook will be called one time for each active subscription.
    // It is important to optimize the logic inside this hook to avoid performance issues.
    // Returning an error will result in a GraphQL error being returned to the client
    OnReceiveEvents(ctx StreamReceiveEventHandlerContext, events []StreamEvent) ([]StreamEvent, error)
}
Use cases:
  • Event filtering based on client permissions
  • Data transformation and mapping
  • Event validation and sanitization
The StreamReceiveEventHandler is called for each active subscription when events are received, so optimize your logic to avoid performance issues. Even small inefficiencies can lead to significant delays when many subscriptions are active.
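One way to keep the per-subscription cost low is to do any set-up once per batch rather than once per event. The sketch below builds a set of allowed IDs a single time, so each event costs one map lookup instead of a slice scan. The `event` type and `filterAllowed` function are hypothetical; the ID is carried as a plain field so the sketch stays free of router types and JSON decoding.

```go
package main

import "fmt"

// event is a hypothetical, already-decoded event used only in this sketch.
type event struct {
	EmployeeID string
}

// filterAllowed keeps the events whose EmployeeID appears in allowedIDs.
// The lookup set is built once per batch, not once per event.
func filterAllowed(events []event, allowedIDs []string) []event {
	allowed := make(map[string]struct{}, len(allowedIDs))
	for _, id := range allowedIDs {
		allowed[id] = struct{}{}
	}

	kept := make([]event, 0, len(events))
	for _, evt := range events {
		if _, ok := allowed[evt.EmployeeID]; ok {
			kept = append(kept, evt)
		}
	}
	return kept
}

func main() {
	batch := []event{{EmployeeID: "1"}, {EmployeeID: "2"}, {EmployeeID: "3"}}
	fmt.Println(len(filterAllowed(batch, []string{"1", "3"})))
}
```

For very small permission lists a linear scan may be just as fast; the set pays off as the list or the batch grows.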

StreamPublishEventHandler

This hook is called each time a batch of events is going to be sent to the provider.
type StreamPublishEventHandler interface {
    // OnPublishEvents is called each time a batch of events is going to be sent to the provider
    // Returning an error will result in an error being returned and the client will see the mutation failing
    OnPublishEvents(ctx StreamPublishEventHandlerContext, events []StreamEvent) ([]StreamEvent, error)
}
Use cases:
  • Data transformation before publishing
  • Event validation
  • Adding metadata to events
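The event validation use case can be sketched as a check that every payload in the batch is well-formed JSON before anything is published. This is an illustration under assumptions, not the router's API: `rawEvent` and `validateEvents` are hypothetical names, and `StreamEvent` is redeclared locally so the block compiles on its own.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// StreamEvent mirrors the provider-agnostic event interface described on this page.
type StreamEvent interface {
	GetData() []byte
}

// rawEvent is a hypothetical event type used only for this sketch.
type rawEvent struct{ data []byte }

func (e rawEvent) GetData() []byte { return e.data }

// validateEvents returns an error for the first event whose payload
// is not valid JSON; otherwise it returns the batch unchanged.
func validateEvents(events []StreamEvent) ([]StreamEvent, error) {
	for i, evt := range events {
		if !json.Valid(evt.GetData()) {
			return nil, fmt.Errorf("event %d: payload is not valid JSON", i)
		}
	}
	return events, nil
}

func main() {
	good := []StreamEvent{rawEvent{data: []byte(`{"id":"1"}`)}}
	bad := []StreamEvent{rawEvent{data: []byte(`{"id":`)}}

	_, err := validateEvents(good)
	fmt.Println(err)
	_, err = validateEvents(bad)
	fmt.Println(err)
}
```

Returning an error from the publish hook makes the mutation fail for the client, so a check like this rejects the whole batch rather than silently dropping individual events.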

Context Interfaces

Each hook provides a rich context interface that gives you access to request information, authentication, and configuration:

SubscriptionOnStartHandlerContext

type SubscriptionOnStartHandlerContext interface {
    // Request is the original request received by the router.
    Request() *http.Request
    // Logger is the logger for the request
    Logger() *zap.Logger
    // Operation is the GraphQL operation
    Operation() OperationContext
    // Authentication is the authentication for the request
    Authentication() authentication.Authentication
    // SubscriptionEventConfiguration is the subscription event configuration
    SubscriptionEventConfiguration() datasource.SubscriptionEventConfiguration
    // WriteEvent writes an event to the stream of the current subscription
    // It returns true if the event was written to the stream, false if the event was dropped
    WriteEvent(event datasource.StreamEvent) bool
}

StreamReceiveEventHandlerContext

type StreamReceiveEventHandlerContext interface {
    // Request is the initial client request that started the subscription
    Request() *http.Request
    // Logger is the logger for the request
    Logger() *zap.Logger
    // Operation is the GraphQL operation
    Operation() OperationContext
    // Authentication is the authentication for the request
    Authentication() authentication.Authentication
    // SubscriptionEventConfiguration is the subscription event configuration
    SubscriptionEventConfiguration() SubscriptionEventConfiguration
}

StreamPublishEventHandlerContext

type StreamPublishEventHandlerContext interface {
    // Request is the original request received by the router.
    Request() *http.Request
    // Logger is the logger for the request
    Logger() *zap.Logger
    // Operation is the GraphQL operation
    Operation() OperationContext
    // Authentication is the authentication for the request
    Authentication() authentication.Authentication
    // PublishEventConfiguration is the publish event configuration
    PublishEventConfiguration() PublishEventConfiguration
}

Core Types

StreamEvent Interface

The StreamEvent interface allows the hooks system to be provider-agnostic:
type StreamEvent interface {
    GetData() []byte
}
Each provider (NATS, Kafka, Redis) will have its own event type with custom fields, but they all implement this common interface.
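To illustrate how a hook can stay provider-agnostic while still reaching provider-specific fields, the sketch below uses a type switch and falls back to `GetData()` for unknown types. The event types `subjectEvent`, `keyedEvent`, and `plainEvent` are hypothetical and do not correspond to the router's real provider types.

```go
package main

import "fmt"

// StreamEvent mirrors the provider-agnostic event interface described on this page.
type StreamEvent interface {
	GetData() []byte
}

// subjectEvent is a hypothetical event carrying a subject name.
type subjectEvent struct {
	Subject string
	Data    []byte
}

func (e *subjectEvent) GetData() []byte { return e.Data }

// keyedEvent is a hypothetical event carrying a partition key.
type keyedEvent struct {
	Key  string
	Data []byte
}

func (e *keyedEvent) GetData() []byte { return e.Data }

// plainEvent is a hypothetical event with no extra fields.
type plainEvent struct{ Data []byte }

func (e *plainEvent) GetData() []byte { return e.Data }

// describe reads provider-specific fields when the concrete type is known
// and falls back to the common interface otherwise.
func describe(evt StreamEvent) string {
	switch e := evt.(type) {
	case *subjectEvent:
		return fmt.Sprintf("subject=%s data=%s", e.Subject, e.Data)
	case *keyedEvent:
		return fmt.Sprintf("key=%s data=%s", e.Key, e.Data)
	default:
		return fmt.Sprintf("data=%s", evt.GetData())
	}
}

func main() {
	fmt.Println(describe(&subjectEvent{Subject: "employees", Data: []byte("{}")}))
	fmt.Println(describe(&keyedEvent{Key: "42", Data: []byte("{}")}))
	fmt.Println(describe(&plainEvent{Data: []byte("{}")}))
}
```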

OperationContext

The OperationContext provides access to GraphQL operation information:
type OperationContext interface {
    Name() string
    // Variables returns the variables of the GraphQL operation
    Variables() *astjson.Value
}

Configuration Interfaces

SubscriptionEventConfiguration

type SubscriptionEventConfiguration interface {
    ProviderID() string
    ProviderType() string
    // the root field name of the subscription in the schema
    RootFieldName() string
}

PublishEventConfiguration

type PublishEventConfiguration interface {
    ProviderID() string
    ProviderType() string
    // the root field name of the mutation in the schema
    RootFieldName() string
}
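Because both configuration interfaces expose the same three methods, a hook can scope itself to one provider and root field with a single helper. The following is a sketch under assumptions: `eventConfiguration`, `target`, and `staticConfig` are hypothetical names, and the provider type is compared as the plain string "nats", whereas real code would compare against the router's own constants.

```go
package main

import "fmt"

// eventConfiguration is the method set shared by
// SubscriptionEventConfiguration and PublishEventConfiguration.
type eventConfiguration interface {
	ProviderID() string
	ProviderType() string
	RootFieldName() string
}

// target names the provider and root field a hook should act on.
type target struct {
	ProviderType  string
	ProviderID    string
	RootFieldName string
}

// matches reports whether cfg refers to exactly this target.
func (t target) matches(cfg eventConfiguration) bool {
	return cfg.ProviderType() == t.ProviderType &&
		cfg.ProviderID() == t.ProviderID &&
		cfg.RootFieldName() == t.RootFieldName
}

// staticConfig is a hypothetical configuration used to run the sketch.
type staticConfig struct {
	providerID, providerType, rootFieldName string
}

func (c staticConfig) ProviderID() string    { return c.providerID }
func (c staticConfig) ProviderType() string  { return c.providerType }
func (c staticConfig) RootFieldName() string { return c.rootFieldName }

func main() {
	want := target{ProviderType: "nats", ProviderID: "my-nats", RootFieldName: "employeeUpdates"}
	cfg := staticConfig{providerID: "my-nats", providerType: "nats", rootFieldName: "employeeUpdates"}
	fmt.Println(want.matches(cfg))
}
```

A hook would typically return early, passing events through unchanged, whenever `matches` reports false.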

Example: Authorization and Event Filtering

Here’s a complete example that demonstrates how to implement authorization checks and event filtering:
package mymodule

import (
    "encoding/json"
    "fmt"
    "net/http"
    "slices"

    "github.com/wundergraph/cosmo/router/core"
    "github.com/wundergraph/cosmo/router/pkg/pubsub/datasource"
    "github.com/wundergraph/cosmo/router/pkg/pubsub/nats"
)

func init() {
    // Register your module here and it will be loaded at router start
    core.RegisterModule(&MyModule{})
}

type MyModule struct {}

// Implement SubscriptionOnStartHandler for authorization
func (m *MyModule) SubscriptionOnStart(ctx core.SubscriptionOnStartHandlerContext) error {
    // Check if the provider is NATS
    if ctx.SubscriptionEventConfiguration().ProviderType() != datasource.ProviderTypeNats {
        return nil
    }

    // Check if the provider ID matches
    if ctx.SubscriptionEventConfiguration().ProviderID() != "my-nats" {
        return nil
    }

    // Check if the subscription is the expected one
    if ctx.SubscriptionEventConfiguration().RootFieldName() != "employeeUpdates" {
        return nil
    }

    // Check if the client is authenticated
    if ctx.Authentication() == nil {
        return core.NewHttpGraphqlError("client is not authenticated", http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
    }

    // Check if the client has the required permissions
    if _, found := ctx.Authentication().Claims()["readEmployee"]; !found {
        return core.NewHttpGraphqlError(
            "client is not allowed to read employees",
            http.StatusText(http.StatusForbidden),
            http.StatusForbidden,
        )
    }

    return nil
}

// Implement StreamReceiveEventHandler for event filtering and transformation
func (m *MyModule) OnReceiveEvents(ctx core.StreamReceiveEventHandlerContext, events []core.StreamEvent) ([]core.StreamEvent, error) {
    // Check if the provider is NATS
    if ctx.SubscriptionEventConfiguration().ProviderType() != datasource.ProviderTypeNats {
        return events, nil
    }

    // Check if the provider ID matches
    if ctx.SubscriptionEventConfiguration().ProviderID() != "my-nats" {
        return events, nil
    }

    // Check if the subscription is the expected one
    if ctx.SubscriptionEventConfiguration().RootFieldName() != "employeeUpdates" {
        return events, nil
    }

    newEvents := make([]core.StreamEvent, 0, len(events))

    // Check if the client is authenticated
    if ctx.Authentication() == nil {
        // If the client is not authenticated, return no events
        return newEvents, nil
    }

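    // Get client's allowed entity IDs
    rawAllowedIds, found := ctx.Authentication().Claims()["allowedEntitiesIds"]
    if !found {
        return newEvents, fmt.Errorf("client is not allowed to subscribe to the stream")
    }

    // Claim values are untyped, so convert to []string before calling slices.Contains.
    // This assumes the claim is a JSON array of strings; other values are ignored.
    rawList, _ := rawAllowedIds.([]any)
    clientAllowedEntitiesIds := make([]string, 0, len(rawList))
    for _, raw := range rawList {
        if id, ok := raw.(string); ok {
            clientAllowedEntitiesIds = append(clientAllowedEntitiesIds, id)
        }
    }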

    for _, evt := range events {
        natsEvent, ok := evt.(*nats.NatsEvent)
        if !ok {
            newEvents = append(newEvents, evt)
            continue
        }

        // Decode the event data coming from the provider
        var dataReceived struct {
            EmployeeId string `json:"EmployeeId"`
            OtherField string `json:"OtherField"`
        }
        err := json.Unmarshal(natsEvent.Data, &dataReceived)
        if err != nil {
            return events, fmt.Errorf("error unmarshalling data: %w", err)
        }

        // Filter events based on client's permissions
        if !slices.Contains(clientAllowedEntitiesIds, dataReceived.EmployeeId) {
            continue
        }

        // Transform the data to match the expected GraphQL schema
        var dataToSend struct {
            Id string `json:"id"`
            TypeName string `json:"__typename"`
        }
        dataToSend.Id = dataReceived.EmployeeId
        dataToSend.TypeName = "Employee"

        // Marshal the transformed data
        dataToSendMarshalled, err := json.Marshal(dataToSend)
        if err != nil {
            return events, fmt.Errorf("error marshalling data: %w", err)
        }

        // Create the new event
        newEvent := &nats.NatsEvent{
            Data: dataToSendMarshalled,
            Metadata: natsEvent.Metadata,
        }
        newEvents = append(newEvents, newEvent)
    }
    return newEvents, nil
}

func (m *MyModule) Module() core.ModuleInfo {
    return core.ModuleInfo{
        ID: "myModule",
        Priority: 1,
        New: func() core.Module {
            return &MyModule{}
        },
    }
}

// Interface guards
var (
    _ core.SubscriptionOnStartHandler = (*MyModule)(nil)
    _ core.StreamReceiveEventHandler = (*MyModule)(nil)
)

Example: Event Publishing with Transformation

Here’s an example of how to transform events before publishing:
// Implement StreamPublishEventHandler for event transformation.
// This snippet uses the encoding/json, fmt, and time packages alongside the router imports.
func (m *MyModule) OnPublishEvents(ctx core.StreamPublishEventHandlerContext, events []core.StreamEvent) ([]core.StreamEvent, error) {
    // Check if the provider is NATS
    if ctx.PublishEventConfiguration().ProviderType() != datasource.ProviderTypeNats {
        return events, nil
    }

    // Check if the provider ID matches
    if ctx.PublishEventConfiguration().ProviderID() != "my-nats" {
        return events, nil
    }

    // Check if the mutation is the expected one
    if ctx.PublishEventConfiguration().RootFieldName() != "updateEmployee" {
        return events, nil
    }

    transformedEvents := make([]core.StreamEvent, 0, len(events))

    for _, evt := range events {
        natsEvent, ok := evt.(*nats.NatsEvent)
        if !ok {
            transformedEvents = append(transformedEvents, evt)
            continue
        }

        // Decode the original event data
        var originalData map[string]interface{}
        err := json.Unmarshal(natsEvent.Data, &originalData)
        if err != nil {
            return events, fmt.Errorf("error unmarshalling data: %w", err)
        }

        // Add metadata or transform the data
        originalData["timestamp"] = time.Now().Unix()
        originalData["source"] = "cosmo-router"

        // Marshal the transformed data
        transformedData, err := json.Marshal(originalData)
        if err != nil {
            return events, fmt.Errorf("error marshalling transformed data: %w", err)
        }

        // Create the transformed event
        transformedEvent := &nats.NatsEvent{
            Data: transformedData,
            Metadata: natsEvent.Metadata,
        }
        transformedEvents = append(transformedEvents, transformedEvent)
    }

    return transformedEvents, nil
}