StreamX 流中间件最佳实践

中间件类型

Stream Recv/Send Middleware

触发时机:流收发消息时调用

类型定义

type StreamRecvEndpoint func(ctx context.Context, stream Stream, res any) (err error)
type StreamSendEndpoint func(ctx context.Context, stream Stream, req any) (err error)

type StreamRecvMiddleware func(next StreamRecvEndpoint) StreamRecvEndpoint
type StreamSendMiddleware func(next StreamSendEndpoint) StreamSendEndpoint

参数说明

  • stream 直接获取当前的流对象
  • res/req 均代表真实请求和响应。
  • Next 函数调用前后的行为:
中间件类型 Next 调用前 Next 调用后
StreamRecvMiddleware - 数据未真正收,刚调用 stream.Recv() 函数

- res 参数为空
- 数据已收到或遇到错误

- res 参数有真实值
StreamSendMiddleware - 数据未真正发送,刚调用 stream.Send() 函数

- req 参数为真实请求
- 数据发送完成或遇到错误

- req 参数为真实请求

使用范例

使用场景:流收/发消息时,注入相关业务逻辑。

svr, err := xxx.NewServer(
    //...
    streamxserver.WithStreamRecvMiddleware(func(next streamx.StreamRecvEndpoint) streamx.StreamRecvEndpoint {
        return func(ctx context.Context, stream streamx.Stream, res any) (err error) {
            // ctx 依然含有用户透传的 token
            token, ok := metainfo.GetPersistentValue(ctx, "user_token")
            // 检查 token 是否有账户余额继续维持会话
            if !hasBalance(token) {
                return fmt.Errorf("user dont have enough balance: token=%s", token)
            }
            return next(ctx, stream, res)
        }
    }),
)

注入中间件

注入 Client Middleware

cli, err := xxx.NewClient(
    "a.b.c",
    streamxclient.WithStreamRecvMiddleware(func(next streamx.StreamRecvEndpoint) streamx.StreamRecvEndpoint {
        return func(ctx context.Context, stream streamx.Stream, res any) (err error) {
           return next(ctx, stream, res)
        }
    }),
    streamxclient.WithStreamSendMiddleware(func(next streamx.StreamSendEndpoint) streamx.StreamSendEndpoint {
        return func(ctx context.Context, stream streamx.Stream, req any) (err error) {
           return next(ctx, stream, req)
        }
    }),
)

注入 Server Middleware

server, err := xxx.NewServer(
    // ....
    streamxserver.WithStreamRecvMiddleware(func(next streamx.StreamRecvEndpoint) streamx.StreamRecvEndpoint {
        return func(ctx context.Context, stream streamx.Stream, res any) (err error) {
           return next(ctx, stream, res)
        }
    }),
    streamxserver.WithStreamSendMiddleware(func(next streamx.StreamSendEndpoint) streamx.StreamSendEndpoint {
        return func(ctx context.Context, stream streamx.Stream, req any) (err error) {
           return next(ctx, stream, req)
        }
    }),
)

最后修改 January 13, 2025 : docs: add description for streamx (#1202) (0337c81)