File

使用 本地文件 作为 Kitex 的服务治理配置中心

支持文件类型

json yaml

安装

go get github.com/kitex-contrib/config-file

Suite

本地文件 的配置中心适配器,kitex 通过 WithSuite 将 本地文件 中的配置转换为 kitex 的治理特性配置。

使用方法可以分为两个步骤

  1. 创建文件监听器(FileWatcher)
  2. 使用 WithSuite 引入配置监听

Server

type FileConfigServerSuite struct {
	watcher monitor.ConfigMonitor
}

函数签名:

func NewSuite(key string, watcher filewatcher.FileWatcher, opts ...utils.Option) *FileConfigServerSuite

示例代码(或访问此处):

package main

import (
	"context"
	"encoding/json"
	"log"

	"github.com/cloudwego/kitex-examples/kitex_gen/api"
	"github.com/cloudwego/kitex-examples/kitex_gen/api/echo"
	"github.com/cloudwego/kitex/pkg/klog"
	"github.com/cloudwego/kitex/pkg/rpcinfo"
	kitexserver "github.com/cloudwego/kitex/server"
	"github.com/kitex-contrib/config-file/filewatcher"
	"github.com/kitex-contrib/config-file/parser"
	fileserver "github.com/kitex-contrib/config-file/server"
)

var _ api.Echo = &EchoImpl{}

const (
	filepath    = "kitex_server.json"
	key         = "ServiceName"
	serviceName = "ServiceName"
)

type EchoImpl struct{}

func (s *EchoImpl) Echo(ctx context.Context, req *api.Request) (resp *api.Response, err error) {
	klog.Info("echo called")
	return &api.Response{Message: req.Message}, nil
}

// 由用户自定义
type MyParser struct{}

// 自定义解析器的一个示例
// 如果服务器配置的类型是 json 或 yaml,则仅需要使用默认解析器
func (p *MyParser) Decode(kind parser.ConfigType, data []byte, config interface{}) error {
	return json.Unmarshal(data, config)
}

func main() {
	klog.SetLevel(klog.LevelDebug)

	// 创建一个文件监听器对象
	fw, err := filewatcher.NewFileWatcher(filepath)
	if err != nil {
		panic(err)
	}
	// 开始监听文件变化
	if err = fw.StartWatching(); err != nil {
		panic(err)
	}
	defer fw.StopWatching()

	svr := echo.NewServer(
		new(EchoImpl),
		kitexserver.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}),
		kitexserver.WithSuite(fileserver.NewSuite(key, fw)), // 添加监听
	)
	if err := svr.Run(); err != nil {
		log.Println("server stopped with error:", err)
	} else {
		log.Println("server stopped")
	}
}

Client

type FileConfigClientSuite struct {
	watcher monitor.ConfigMonitor
	service string
}

函数签名:

func NewSuite(service, key string, watcher filewatcher.FileWatcher,opts ...utils.Option)*FileConfigClientSuite

示例代码(或访问此处):

package main

import (
	"context"
	"encoding/json"
	"log"
	"os"
	"os/signal"
	"time"

	"github.com/cloudwego/kitex-examples/kitex_gen/api"
	"github.com/cloudwego/kitex-examples/kitex_gen/api/echo"
	kitexclient "github.com/cloudwego/kitex/client"
	"github.com/cloudwego/kitex/pkg/klog"
	fileclient "github.com/kitex-contrib/config-file/client"
	"github.com/kitex-contrib/config-file/filewatcher"
	"github.com/kitex-contrib/config-file/parser"
)

const (
	filepath    = "kitex_client.json"
	key         = "ClientName/ServiceName"
	serviceName = "ServiceName"
	clientName  = "ClientName"
)

// 由用户自定义
type MyParser struct{}

// 自定义解析器的一个示例
// 如果服务器配置的类型是 json 或 yaml,则仅需要使用默认解析器
func (p *MyParser) Decode(kind parser.ConfigType, data []byte, config interface{}) error {
	return json.Unmarshal(data, config)
}

func main() {
	klog.SetLevel(klog.LevelDebug)

	// 创建一个文件监听器对象
	fw, err := filewatcher.NewFileWatcher(filepath)
	if err != nil {
		panic(err)
	}
	// 开始监听文件变化
	if err = fw.StartWatching(); err != nil {
		panic(err)
	}

	go func() {
		sig := make(chan os.Signal, 1)
		signal.Notify(sig, os.Interrupt, os.Kill)
		<-sig
		fw.StopWatching()
		os.Exit(1)
	}()

	client, err := echo.NewClient(
		serviceName,
		kitexclient.WithHostPorts("0.0.0.0:8888"),
		kitexclient.WithSuite(fileclient.NewSuite(serviceName, key, fw)),
	)
	if err != nil {
		log.Fatal(err)
	}

	for {
		req := &api.Request{Message: "my request"}
		resp, err := client.Echo(context.Background(), req)
		if err != nil {
			klog.Errorf("take request error: %v", err)
		} else {
			klog.Infof("receive response %v", resp)
		}
		time.Sleep(time.Second * 10)
	}
}

NewFileWatcher

创建本地文件监听器

函数签名:

func NewFileWatcher(filePath string) (FileWatcher, error)

示例代码:

package main

import "github.com/kitex-contrib/config-file/filewatcher"

func main() {
	// 创建文件监听器对象
	fw, err := filewatcher.NewFileWatcher(filepath)
	if err != nil {
		panic(err)
	}
	// 启动文件监听(应当在引入 Suite 之前启动)
	if err = fw.StartWatching(); err != nil {
		panic(err)
	}

    // 程序退出时监听退出行为
	go func() {
		sig := make(chan os.Signal, 1)
		signal.Notify(sig, os.Interrupt, os.Kill)
		<-sig
		fw.StopWatching()
		os.Exit(1)
	}()
}

在服务端(Server)中,由于 KitexServer 的特性,我们只需要定义defer fw.StopWatching()即可

配置

自定义解析器

定义自定义格式解析器并通过NewSuiteoption传入,格式默认支持jsonyaml

接口定义:

// ConfigParser 配置文件的解析器。
type ConfigParser interface {
	Decode(kind ConfigType, data []byte, config interface{}) error
}

示例代码:

扩展解析 YAML 类型。

// 由用户自定义
type MyParser struct{}

// 自定义解析器的一个示例
// 如果服务器配置的类型是 json 或 yaml,则仅需要使用默认解析器
func (p *MyParser) Decode(kind parser.ConfigType, data []byte, config interface{}) error {
	return yaml.Unmarshal(data, config)
}

const YAML parser.ConfigType = "yaml"

func withParser(o *utils.Options) {
	o.Parser = &MyParser{}
	o.Params = &parser.ConfigParam{
		Type: YAML,
	}
}

// 依靠`NewSuite`传入

// 服务端
svr := echo.NewServer(
		new(EchoImpl),
		kitexserver.WithServerBasicInfo(&rpcinfo.EndpointBasicInfo{ServiceName: serviceName}),
		kitexserver.WithSuite(fileserver.NewSuite(key, fw, withParser)), // 添加监听
	)

// 客户端
client, err := echo.NewClient(
		serviceName,
		kitexclient.WithHostPorts("0.0.0.0:8888"),
		kitexclient.WithSuite(fileclient.NewSuite(serviceName, key, fw, withParser)),
	)

治理策略

在后续样例中,我们设定服务名称为 ServiceName,客户端名称为 ClientName

限流

Category=limit

限流目前只支持服务端,所以只需要设置服务端的 ServiceName。

JSON Schema

字段 说明
connection_limit 最大并发数量
qps_limit 每 100ms 内的最大请求数量

样例:

{
  "ServiceName": {
    "limit": {
      "connection_limit": 300,
      "qps_limit": 200
    }
  }
}

注:

  • 限流配置的粒度是 Server 全局,不分 client、method
  • 「未配置」或「取值为 0」表示不开启
  • connection_limit 和 qps_limit 可以独立配置,例如 connection_limit = 100, qps_limit = 0
  • 可以在一个 json 内编写多个服务的不同限流策略,只需要 filewatch 监控同一个文件,然后传入不同的 key 即可,如样例所示,key 即为ServiceName

重试

Category=retry

JSON Schema

参数 说明
type 0: failure_policy 1: backup_policy
failure_policy.backoff_policy 可以设置的策略: fixed none random

样例:

key 为 ClientName/ServiceName

{
  "ClientName/ServiceName": {
    "retry": {
      "*": {
        "enable": true,
        "type": 0,
        "failure_policy": {
          "stop_policy": {
            "max_retry_times": 3,
            "max_duration_ms": 2000,
            "cb_policy": {
              "error_rate": 0.2
            }
          }
        }
      },
      "Echo": {
        "enable": true,
        "type": 1,
        "backup_policy": {
          "retry_delay_ms": 200,
          "stop_policy": {
            "max_retry_times": 2,
            "max_duration_ms": 1000,
            "cb_policy": {
              "error_rate": 0.3
            }
          }
        }
      }
    }
  }
}

注:retry.Container 内置支持用 * 通配符指定默认配置(详见 getRetryer 方法)

超时

Category=rpc_timeout

JSON Schema

样例:

key 为 ClientName/ServiceName

{
  "ClientName/ServiceName": {
    "timeout": {
      "*": {
        "conn_timeout_ms": 100,
        "rpc_timeout_ms": 2000
      },
      "Pay": {
        "conn_timeout_ms": 50,
        "rpc_timeout_ms": 1000
      }
    }
  }
}

熔断

Category=circuit_break

JSON Schema

参数 说明
min_sample 最小的统计样本数

样例: echo 方法使用下面的配置(0.3、100),其他方法使用全局默认配置(0.5、200)

key 为 ClientName/ServiceName

{
  "ClientName/ServiceName": {
    "circuitbreaker": {
      "Echo": {
        "enable": true,
        "err_rate": 0.3,
        "min_sample": 100
      }
    }
  }
}

注:kitex 的熔断实现目前不支持修改全局默认配置(详见 initServiceCB

更多信息

更多示例请参考 example

注意事项

客户端键名

对于单一客户端配置,您应该将它们的所有配置写入同一对$UserServiceName/$ServerServiceName中,例如

{
  "ClientName/ServiceName": {
    "timeout": {
      "*": {
        "conn_timeout_ms": 100,
        "rpc_timeout_ms": 2000
      },
      "Pay": {
        "conn_timeout_ms": 50,
        "rpc_timeout_ms": 1000
      }
    },
    "circuitbreaker": {
      "Echo": {
        "enable": true,
        "err_rate": 0.3,
        "min_sample": 100
      }
    },
    "retry": {
      "*": {
        "enable": true,
        "type": 0,
        "failure_policy": {
          "stop_policy": {
            "max_retry_times": 3,
            "max_duration_ms": 2000,
            "cb_policy": {
              "error_rate": 0.2
            }
          }
        }
      },
      "Echo": {
        "enable": true,
        "type": 1,
        "backup_policy": {
          "retry_delay_ms": 200,
          "stop_policy": {
            "max_retry_times": 2,
            "max_duration_ms": 1000,
            "cb_policy": {
              "error_rate": 0.3
            }
          }
        }
      }
    }
  }
}

兼容性

项目中使用了sync/atomic在 1.19 版本加入的新特性,因此Go 的版本必须 >= 1.19


最后修改 January 13, 2025 : docs: add description for streamx (#1202) (0337c81)