找回密码
 立即注册
查看: 147|回复: 0

记一次优化:将预序列化好的 protobuf 消息嵌入另一个消息中

[复制链接]
发表于 2023-3-6 18:46 | 显示全部楼层 |阅读模式
关键词:go,protobuf,性能优化,复用
背景:

在一些涉及到算法模型相关的开发项目中,往往会用 protobuf 来编码算法模型用到的特征集合。而目前遇到一个问题:线上用到的特征集的规模特别夸张,导致序列化和反序列化速度过慢。300 条 doc 的特征序列化之后的 protobuf 字节流大小有 2.3M。经测试,序列化的耗时在 100 毫秒以上,反序列化的开销也有数十毫秒。基本不能用来在线 serving。
根据目前的架构设计,提取特征还有模型预测其实是分开实现在不同的微服务中。典型的调用链路是这样的:
1. 从特征服务获取特征集合
2. 在特征集合上添加一些控制参数
3. 将 2 环节的产出发送到模型预测服务在这条链路中,特征集合经历了一次额外的序列化和反序列化。而由于模型需要的请求结构和特征服务的结构定义不完全相同。因此这一次额外的序列化和反序列化不能直接省去。
目的:

通过将预序列化好的消息嵌入另一个消息中,达到减少一次序列化 & 反序列化过程的效果。
详细描述:

假定一个场景:从 A 处(网络、文件等)获取到一条 HugeMessge 的实例,这个实例特别的大。而需要做的是在 HugeMessage 的外层包装上一些额外的参数组成 Request 然后转发到 B 处(网络、文件等)。 消息定义如下。
syntax = "proto3";
package protodef;
option go_package = "pkg/protodef";

message HugeMessage {
    // omitted
    bytes data = 1;
}

message Request {
    string name = 1;
    HugeMessage payload = 2;
}典型的代码实现(go)是这样的,会有一次序列化和反序列化的过程。
func main() {
    // receive it from file or network, not important.
    bins, _ := os.ReadFile("hugeMessage.dump")
    var message protodef.HugeMessage
    _ = proto.Unmarshal(bins, &message) // slow
    request := protodef.Request{
        Name: "xxxx",
        Payload: &message,
    }
    requestBinary, _ := proto.Marshal(&request) // slow
    // send it.
    os.WriteFile("request.dump", requestBinary, 0644)
}
其实以上的过程是可以避免的。从信息量的角度来说,Request 的信息量并不比 HugeMessage 多多少。而为了编码他付出了不对等的努力。这里是有优化的空间的。根据 protobuf 官网给出的 wire 格式简介,它指出:
A protocol buffer message is a series of key-value pairs. The binary version of a message just uses the field's number as the key -- the name and declared type for each field can only be determined on the decoding end by referencing the message type's definition (that is, the.protofile). Protoscope does not have access to this information, so it can only provide the field numbers.
简而言之,protobuf 的字节流其实是由一系列键值对组成的。每一个域 Field 与其对应的值 Value 编码成一对键值对。而一条消息其实就是一组键值对。在后文中指出,protobuf 并不关心键值对的顺序。因此,嵌入一个域可以这样完成:借助 protowire 手动生成这个域对应的键值对,将这个键值对序列化,然后加到希望嵌入的消息字节流末尾。



Protobuf Message 结构

在每个键值对中,键部分存储着相应域的类型、域编号以及值的长度。借助这些信息,protobuf 可以进行解码。以 Request 中的HugeMessage 为例。假设 HugeMessage 序列化后的字节为 0x0A01AA。其对应的 field_number 为 2,wire_type 查表知为 LEN(id = 2)。而wire_type = LEN 时,会附带一个长度编码信息,这个长度采用 VARINT 编码。这里可以根据以下公式算出,key 为 0x1203 。
key = (field_number << 3) | wire_type
相应的, value 为 0x0A01AA。将二者拼接得到键值对的字节流为 0x12030A01AA。
具体实现以及测试

以上的 HEX 计算都是根据文档手动演算来的。实际上 protowire 已经将这些工作封装好了。因此,根据以上思路,可以编写出这样的代码。(name 参数传进来是为了方便编写测试代码)。
func binaryEmbeddingImplementation(messageBytes []byte, name string) (requestBytes []byte, err error) {
    // 1. create a request with all ready except the payload. and marshal it.
    request := protodef.Request{
        Name: name,
    }
    requestBytes, err = proto.Marshal(&request)
    if err != nil {
        return nil, err
    }
    // 2. manually append the payload to the request, by protowire.
    requestBytes = protowire.AppendTag(requestBytes, 2, protowire.BytesType) //  embedded message is same as a bytes field, in wire view.
    requestBytes = protowire.AppendBytes(requestBytes, messageBytes)
    return requestBytes, nil
}
相应的,大多数情况下,这个实现为。
func commonImplementation(messageBytes []byte, name string) (requestBytes []byte, err error) {
    // receive it from file or network, not important.
    var message protodef.HugeMessage
    _ = proto.Unmarshal(messageBytes, &message) // slow
    request := protodef.Request{
        Name:    name,
        Payload: &message,
    }
    return proto.Marshal(&request) // slow
}
编写一些测试以及性能测试代码。
func TestEquivalent(t *testing.T) {
    requestBytes1, _ := commonImplementation(hugeMessageSample, "xxxx")
    requestBytes2, _ := binaryEmbeddingImplementation(hugeMessageSample, "xxxx")
    // NOTICE:They are not always equal int bytes. you should compare them in message view instead of binary from
    // due to: https://developers.google.com/protocol-buffers/docs/encoding#implications
    // I'm Lazy.
    assert.NotEmpty(t, requestBytes1)
    assert.Equal(t, requestBytes1, requestBytes2)
    var request protodef.Request
    err := proto.Unmarshal(requestBytes1, &request)
    assert.NoError(t, err)
    assert.Equal(t, "xxxx", request.Name)
}
// actually mock one.
func receiveHugeMessageFromSomewhere() []byte {
    buffer := make([]byte, 1024*1024*1024)
    _, _ = rand.Read(buffer)
    message := protodef.HugeMessage{
        Data: buffer,
    }
    res, _ := proto.Marshal(&message)
    return res
}

func BenchmarkCommon(b *testing.B) {
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        _, err := commonImplementation(hugeMessageSample, "xxxx")
        if err != nil {
            panic(err)
        }
    }
}

func BenchmarkEmbedding(b *testing.B) {
    b.ResetTimer()
    for i := 0; i < b.N; i++ {
        _, err := binaryEmbeddingImplementation(hugeMessageSample, "xxxx")
        if err != nil {
            panic(err)
        }
    }
}

$ go test -bench=.  ./pkg/
goos: darwin
goarch: arm64
pkg: pbembedding/pkg
BenchmarkCommon-8              9         629721014 ns/op
BenchmarkEmbedding-8          28          59078219 ns/op
PASS
ok      pbembedding/pkg 14.988s
为了对效果进行测试,我构建了一个 1GiB 的样本作为嵌入用的消息,然后直接对比两种实现产生的字节流(这种对比手段不科学,因为 protobuf 字节流顺序并不稳定)。对比结果显示二者完全相同。并且,嵌入方式的运行速度有着显著的优势。
然而有一点必须要指出,手动进行嵌入是有很大风险的行为。因为这打破了 protobuf 的封装而直接修改底层的二进制结构。如果没有进行足够的测试,可能遇到意想不到的问题。而且,大多数情况下,commonImplementation 才应该是推荐的做法。只有当官方实现无法解决问题时,才应该去打破这个边界。此外,这种做法也会为后续的代码维护带来困难。
参考等其他信息:


  • Faster Protocol Buffers
  • go - Could I reuse an existing protobuf binary, when marshaling a message including it?(protobuf3) - Stack Overflow
  • protobuf official document
  • protowire package - google.golang.org/protobuf/encoding/protowire - Go Packages
  • 文中的代码可以在 proto_embedding/pkg at master · LebranceBW/proto_embedding 下载到

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

×
懒得打字嘛,点击右侧快捷回复 【右侧内容,后台自定义】
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Unity开发者联盟 ( 粤ICP备20003399号 )

GMT+8, 2024-5-3 11:47 , Processed in 0.091585 second(s), 26 queries .

Powered by Discuz! X3.5 Licensed

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表