消息类型

Kitex 支持 PingPong、Oneway、Streaming 消息类型。

协议支持

目前 Kitex 支持的消息类型、序列化协议和传输协议:

消息类型 序列化 传输协议
PingPong Thrift / Protobuf TTHeader / HTTP2(gRPC)
Oneway Thrift TTHeader
Streaming Thrift / Protobuf HTTP2(gRPC)
  • PingPong:客户端发起一个请求后会等待一个响应才可以进行下一次请求
  • Oneway:客户端发起一个请求后不等待一个响应
  • Streaming:客户端发起一个或多个请求 , 等待一个或多个响应

Thrift

Kitex 支持基于 Thrift 协议的 PingPongOneway 消息类型;同时支持 Thrift Streaming over HTTP2。

PingPong、Oneway 的接口定义见下面的 Example,Thrift Streaming 使用见 Thrift Streaming

Example

IDL 定义 :

namespace go echo

struct Request {
    1: string Msg
}

struct Response {
    1: string Msg
}

service EchoService {
    Response Echo(1: Request req); // pingpong method
    oneway void VisitOneway(1: Request req); // oneway method
}

生成的代码组织结构 :

.
└── kitex_gen
    └── echo
        ├── echo.go
        ├── echoservice
        │   ├── client.go
        │   ├── echoservice.go
        │   ├── invoker.go
        │   └── server.go
        ├── k-consts.go
        └── k-echo.go

Server 的处理代码形如 :

package main

import (
    "context"

    "xx/echo"
    "xx/echo/echoservice"
)

type handler struct {}

func (handler) Echo(ctx context.Context, req *echo.Request) (r *echo.Response, err error) {
    //...
    return &echo.Response{ Msg: "world" }, err
}

func (handler) VisitOneway(ctx context.Context, req *echo.Request) (err error) {
    //...
    return nil
}

func main() {
    svr := echo.NewServer(handler{})
	err := svr.Run()
    if err != nil {
        panic(err)
    }
}

PingPong

Client 侧代码 :

package main

import (
    "context"
    "fmt"

    "xx/echo"
    "xx/echo/echoservice"

    "github.com/cloudwego/kitex/client"
)

func main() {
    cli, err := echoservice.NewClient("destServiceName", client.WithHostPorts("0.0.0.0:8888"))
    if err != nil {
        panic(err)
    }
    req := echo.NewRequest()
    req.Msg = "hello"
    resp, err := cli.Echo(context.Background(), req)
    if err != nil {
        panic(err)
    }

    fmt.Println(resp.Msg)
    // resp.Msg == "world"
}

Oneway

Client 侧代码 :

package main

import (
    "context"

    "xx/echo"
    "xx/echo/echoservice"

    "github.com/cloudwego/kitex/client"
)

func main() {
    cli, err := echoservice.NewClient("destServiceName", client.WithHostPorts("0.0.0.0:8888"))
    if err != nil {
        panic(err)
    }
    req := echo.NewRequest()
    req.Msg = "hello"
    err = cli.VisitOneway(context.Background(), req)
    if err != nil {
        panic(err)
    }
    // no response return
}

Protobuf

Kitex 支持两种承载 Protobuf 负载的协议:

  • Kitex Protobuf
    • 只支持 PingPong,若 IDL 定义了 stream 方法,将默认使用 gRPC 协议
  • gRPC 协议
    • 可以与 gRPC 互通,与 gRPC service 定义相同,支持 Unary(PingPong)、 Streaming 调用

Example

以下给出 Streaming 的使用示例。

IDL 定义 :

syntax = "proto3";

option go_package = "echo";

package echo;

message Request {
  string msg = 1;
}

message Response {
  string msg = 1;
}

service EchoService {
  rpc ClientSideStreaming(stream Request) returns (Response) {} // 客户端侧 streaming
  rpc ServerSideStreaming(Request) returns (stream Response) {} // 服务端侧 streaming
  rpc BidiSideStreaming(stream Request) returns (stream Response) {} // 双向流
}

生成的代码组织结构 :

.
└── kitex_gen
    └── echo
        ├── echo.pb.go
        ├── echo.pb.fast.go
        └── echoservice
            ├── client.go
            ├── echoservice.go
            ├── invoker.go
            └── server.go

Server 侧代码 :

package main

import (
    "log"
    "time"
    "context"

    "xx/echo"
    "xx/echo/echoservice"
}

type handler struct{}

func (handler) ClientSideStreaming(stream echo.EchoService_ClientSideStreamingServer) (err error) {
    for {
        req, err := stream.Recv()
        if err != nil {
            return err
        }
        log.Println("received:" , req.GetMsg())
    }
}

func (handler) ServerSideStreaming(req *echo.Request, stream echo.EchoService_ServerSideStreamingServer) (err error) {
      _ = req
      for {
          resp := &echo.Response{Msg: "world"}
          if err := stream.Send(resp); err != nil {
              return err
          }
      }
}

func (handler) BidiSideStreaming(stream echo.EchoService_BidiSideStreamingServer) (err error) {
	ctx, cancel := context.WithCancel(context.Background())
	errChan := make(chan error, 1)

	go func() {
		for {
			select {
			case <- ctx.Done():
				return
			default:
				req,err := stream.Recv()
				if err != nil {
					errChan <- err
					cancel()
					return
				}
				log.Println("received:", req.GetMsg())
			}
		}
	}()
	go func() {
		for {
			select {
			case <- ctx.Done():
				return
			default:
				resp := &echo.Response{Msg: "world"}
				if err := stream.Send(resp); err != nil {
					errChan <- err
					cancel()
					return
				}
			}
			time.Sleep(time.Second)
		}
	}()

	err = <-errChan
	cancel()
	return err
}

func main() {
    svr := echoservice.NewServer(new(handler))

    err := svr.Run()

    if err != nil {
        log.Println(err.Error())
    }
}

Streaming

ClientSideStreaming:

package main

import (
    "context"
    "time"

    "xx/echo"
    "xx/echo/echoservice"

    "github.com/cloudwego/kitex/client"
}

func main() {
    cli, err := echoservice.NewClient("destServiceName", client.WithHostPorts("0.0.0.0:8888"))
    if err != nil {
        panic(err)
    }
    cliStream, err := cli.ClientSideStreaming(context.Background())
    if err != nil {
        panic(err)
    }
    for {
        req := &echo.Request{Msg: "hello"}
        if err := cliStream.Send(req); err != nil {
            panic(err)
        }
        time.Sleep(time.Second)
    }

}

ServerSideStreaming:

package main

import (
    "context"
    "log"
    "time"

    "xx/echo"
    "xx/echo/echoservice"

    "github.com/cloudwego/kitex/client"
}

func main() {
    cli, err := echoseervice.NewClient("destServiceName", client.WithHostPorts("0.0.0.0:8888"))
    if err != nil {
        panic(err)
    }
    req := &echo.Request{Msg: "hello"}
    svrStream, err := cli.ServerSideStreaming(context.Background(), req)
    if err != nil {
        panic(err)
    }
    for {
        resp, err := svrStream.Recv()
        log.Println("response:",resp.GetMsg())
        if err != nil {
            panic(err)
        }
        time.Sleep(time.Second)
        // resp.Msg == "world"
    }

}

BidiSideStreaming:

package main

import (
    "context"
    "log"
    "time"

    "xx/echo"
    "xx/echo/echoservice"

    "github.com/cloudwego/kitex/client"
}

func main() {
    cli, err := echoservice.NewClient("destServiceName", client.WithHostPorts("0.0.0.0:8888"))
    if err != nil {
        panic(err)
    }
    bidiStream, err := cli.BidiSideStreaming(context.Background())
    if err != nil {
        panic(err)
    }
    go func() {
        for {
            req := &echo.Request{Msg: "hello"}
            err := bidiStream.Send(req)
            if err != nil {
                panic(err)
            }
            time.Sleep(time.Second)
        }
    }()
    for {
        resp, err := bidiStream.Recv()
        if err != nil {
            panic(err)
        }
        log.Println(resp.GetMsg())
        // resp.Msg == "world"
    }
}

最后修改 January 13, 2025 : docs: add description for streamx (#1202) (0337c81)