protobuf怎样传输复杂数据结构空间复杂度

C++ 技术(61)
#include &addressbook.pb.h&
#include &google/protobuf/descriptor.h&
#include &google/protobuf/message.h&
#include &google/protobuf/io/zero_copy_stream.h&
#include &google/protobuf/descriptor_database.h&
#include &google/protobuf/dynamic_message.h&
#include &google/protobuf/util/json_util.h&
#include &google/protobuf/util/type_resolver.h&
#include &google/protobuf/util/type_resolver_util.h&
#include &zlib.h& &// adler32,crc32
#include &bitset&
#include &string&
#include &algorithm&
#include &iostream&
#include &arpa/inet.h& &// htonl, ntohl
#include &stdint.h&
using namespace std::
//g++ -g proto.cpp addressbook.pb.cc -lprotobuf -lz -std=gnu++11 -o proto -Wl,-rpath,/usr/local/lib -I/usr/local/include/
yum install zlib-devel
protolbuf,测试代码使用 v3.2版本
/google/protobuf/
/chenshuo/recipes/blob/master/protobuf/codec.h
struct ProtobufTransportFormat __attribute__ ((__packed__))
int16_t &&
& & int16_t &nameL
& & char & & typeName[nameLen];
& & char & & protobufData[len - nameLen - 8];
& & int32_t &checkS
从最低位开始,
第0位用作校验类型, 0: adler32(默认), 1:表示使用的是 CRC32。
第1位用来表示BufData的protobuf编码类型,0: 二进制编码(默认),1:json编码
第2位用来表示BufData的压缩类型,0:不压缩(默认);1:zip压缩
2. checkSum
只支持adler32和crc32校验和,校验内容按顺序包括 flag+namelen+typeName+protobufData
不包括自身四字节长度
4. protobufData长度
(len - nameLen - 8)等于下面表达式
(len - NameLen - sizeof(Flag) - sizeof(NameLen) - sizeof(CheckSum))
class CProtobufPacket final
inline std::string encode(const protobuf::Message& message);
inline protobuf::Message* decode(const std::string& buf);
inline void set_proto_checksum_algorithm(bool adler32);
inline void set_proto_format(bool json);
inline void set_proto_zip(bool zip);
inline bool get_proto_checksum_algorithm();
inline bool get_proto_format();
inline bool get_proto_zip();
const int32_t HEAD_LEN = sizeof(int32_t);
const int16_t FLAG_LEN = sizeof(int16_t);
const int16_t NAME_LEN = sizeof(int16_t);
const int16_t CHECKSUM_LEN = sizeof(int32_t);
const int16_t CHECKSUM_ALGORITHM_INDXE = 0;
const int16_t PROTO_FORMAT_INDXE = 1;
const int16_t PROTO_ZIP_INDXE = 2;
const int32_t MIN_LENGTH = HEAD_LEN + FLAG_LEN + NAME_LEN + CHECKSUM_LEN;
std::bitset&16& flag_ = 0;
std::function&size_t (size_t,const Bytef*,size_t)& checksum_f = std::bind(::adler32,_1,_2,_3);
protobuf::Message* createMessage(const std::string& type_name);
inline int32_t asInt32(const char* buf);
inline int16_t asInt16(const char* buf);
bool unzip(std::string &json);
inline std::string CProtobufPacket::encode(
const protobuf::Message& message)
std::string result = &&;
result.resize(HEAD_LEN);
//设置 flag
int16_t be16 = ::htons(flag_.to_ulong());
result.append(reinterpret_cast&char*&(&be16),sizeof(int16_t));
//设置 nameLen和typeName(\0结尾)
const std::string& typeName = message.GetTypeName();
int16_t nameLen = static_cast&int16_t&(typeName.size() + 1);
be16 = ::htons(nameLen);
result.append(reinterpret_cast&char*&(&be16),sizeof(int16_t));
result.append(typeName.c_str(), nameLen);
bool success =
//设置 protobufData
if(flag_.test(PROTO_FORMAT_INDXE))
protobuf::util::JsonO
//options.add_whitespace =
if(flag_.test(PROTO_ZIP_INDXE))
std::string json = &&;
success = MessageToJsonString(message, &json, options).ok();
uint64_t ziplen = compressBound(json.size());
std::shared_ptr&Bytef& zip(new Bytef[ziplen + 1]);
int err = compress(zip.get(),&ziplen,(const Bytef*)json.c_str(),json.size());
if(err != Z_OK)
result.append((const char *)zip.get(),ziplen);
success = MessageToJsonString(message, &result, options).ok();
success = message.AppendToString(&result);
if(!success)
result.clear();
if(flag_.test(CHECKSUM_ALGORITHM_INDXE))
checksum_f = std::bind(crc32,_1,_2,_3);
int32_t checkSum = checksum_f(1,
& & & &reinterpret_cast&const Bytef*&(result.c_str() + HEAD_LEN),&
& result.size() - HEAD_LEN);
checkSum = ::htonl(checkSum);
result.append(reinterpret_cast&char*&(&checkSum), sizeof(int32_t));
//网络序数据包长度
int32_t len = ::htonl(result.size() - HEAD_LEN);
std::copy(reinterpret_cast&char*&(&len),
reinterpret_cast&char*&(&len) + sizeof len,
result.begin());
inline protobuf::Message*&
CProtobufPacket::decode(const std::string& buf)
& int32_t total = static_cast&int32_t&(buf.size());
& int32_t len = asInt32(buf.c_str());
& if (total & MIN_LENGTH || len &= total)
& //int32_t checkSum = asInt32(buf.c_str() + buf.size() - HEAD_LEN);
& int32_t checkSum = asInt32(buf.c_str() + len);
& flag_ = asInt16(buf.c_str() + HEAD_LEN);
& if(flag_.test(CHECKSUM_ALGORITHM_INDXE))
&checksum_f = std::bind(crc32,_1,_2,_3);
& int32_t compute_checkSum = checksum_f(1,&
& reinterpret_cast&const Bytef*&(buf.c_str() + HEAD_LEN),&
& len - CHECKSUM_LEN);
& if (checkSum != compute_checkSum)
& int16_t nameLen = asInt16(buf.c_str() + HEAD_LEN + FLAG_LEN);
& if (nameLen & 2 || nameLen & len - 2*HEAD_LEN)
& std::string typeName = buf.substr(HEAD_LEN + FLAG_LEN + NAME_LEN,nameLen);
& google::protobuf::Message* message = createMessage(typeName);
& if(!message)
& const char* data = buf.c_str() + HEAD_LEN + FLAG_LEN + NAME_LEN + nameL
& int32_t dataLen = len - FLAG_LEN - NAME_LEN - CHECKSUM_LEN - nameL
& bool success =
& if(flag_.test(PROTO_FORMAT_INDXE))
&protobuf::util::JsonParseO
&std::string json = buf.substr
&(HEAD_LEN + FLAG_LEN + NAME_LEN + nameLen,dataLen);
&if(flag_.test(PROTO_ZIP_INDXE))
&success = unzip(json);
&success = JsonStringToMessage(json,message, options).ok();
&success = message-&ParseFromArray(data, dataLen);
& if(!success)
& return success ? message :
inline google::protobuf::Message* CProtobufPacket::createMessage(const std::string& type_name)
& google::protobuf::Message* message =
& const google::protobuf::Descriptor* descriptor =
& & google::protobuf::DescriptorPool::generated_pool()-&FindMessageTypeByName(type_name);
& if (descriptor)
& & const google::protobuf::Message* prototype =
& & & google::protobuf::MessageFactory::generated_factory()-&GetPrototype(descriptor);
& & if (prototype)
& & & message = prototype-&New();
bool CProtobufPacket::unzip(std::string &json)
//最大支持4倍压缩
uint64_t unziplen = json.size() * 4;
shared_ptr&Bytef& unzip(new Bytef[unziplen + 1]);
int err = uncompress(unzip.get(),&unziplen,(const Bytef *)json.c_str(),json.size());
if(err == Z_OK)
json.clear();
json.resize(unziplen);
std::copy((const char *)unzip.get(),
(const char *)unzip.get() + unziplen,json.begin());
return err == Z_OK ? true :
inline int32_t CProtobufPacket::asInt32(const char* buf)
//int32_t be32 = 0;
& //::memcpy(&be32, buf, sizeof(be32));
//return ::ntohl(be32);
uint32_t ch1 = 0, ch2 = 0, ch3 = 0,ch4 = 0;
ch1 = (uint32_t)(buf[0] & 0xff);
ch2 = (uint32_t)(buf[1] & 0xff);
ch3 = (uint32_t)(buf[2] & 0xff);
ch4 = (uint32_t)(buf[3] & 0xff);
return (ch1 && 24) + (ch2 && 16) + (ch3 && 8) + (ch4 && 0);
inline int16_t CProtobufPacket::asInt16(const char* buf)
//int16_t be16 = 0;
& //::memcpy(&be16, buf, sizeof(int16_t));
& //return ::ntohs(be16);
uint16_t ch1 = 0, ch2 = 0;
ch1 = (unsigned short)(buf[0] & 0xff);
ch2 = (unsigned short)(buf[1] & 0xff);
return (ch1 && 8) + (ch2 && 0);&
inline void CProtobufPacket::set_proto_checksum_algorithm(bool adler32)
flag_[CHECKSUM_ALGORITHM_INDXE] = adler32 ? 0 : 1;
inline bool CProtobufPacket::get_proto_checksum_algorithm()
return flag_.test(CHECKSUM_ALGORITHM_INDXE);
inline void CProtobufPacket::set_proto_format(bool json)
flag_[PROTO_FORMAT_INDXE] = json ? 1 : 0;
inline bool CProtobufPacket::get_proto_format()
return flag_.test(PROTO_FORMAT_INDXE);
inline void CProtobufPacket::set_proto_zip(bool zip)
flag_[PROTO_ZIP_INDXE] = zip ? 1 : 0;
inline bool CProtobufPacket::get_proto_zip()
return flag_.test(PROTO_ZIP_INDXE);
void addPerson(tutorial::AddressBook &address_book)
tutorial::Person* person = address_book.add_people();
assert(person);
person-&set_id(100);
*person-&mutable_name() = &huang&;
person-&set_email(&&);
tutorial::Person::PhoneNumber* phone_number = person-&add_phones();
& & phone_number-&set_number(&&);
phone_number-&set_type(tutorial::Person::MOBILE);
phone_number = person-&add_phones();
& & phone_number-&set_number(&&);
phone_number-&set_type(tutorial::Person::WORK);
int main(int argc,char * argv[])
GOOGLE_PROTOBUF_VERIFY_VERSION;
tutorial::AddressBook address_
addPerson(address_book);
addPerson(address_book);
//std::string protobuf = &&;
//assert(address_book.SerializeToString(&protobuf));
CProtobufP
packet.set_proto_checksum_algorithm(true);
packet.set_proto_format(true);
packet.set_proto_zip(true);
std::string buf = packet.encode(address_book);
buf.append(&dafdfdsa&);
tutorial::AddressBook *book = dynamic_cast&tutorial::AddressBook*&(packet.decode(buf)); &
for(int i = 0;i & book-&people_size();++i)
const tutorial::Person &person = book-&people(i);
std::cout&&&id:&&&person.id()&&
std::cout&&&e-mail:&&&person.email()&&
for(int k = 0;k& person.phones_size();++k)
const tutorial::Person_PhoneNumber &number = person.phones(k);
std::cout&&&number:&&&number.number()&&
std::cout&&&number type:&&&number.type()&&
// Optional: &Delete all global objects allocated by libprotobuf.
& google::protobuf::ShutdownProtobufLibrary();
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:256510次
积分:3463
积分:3463
排名:第7966名
原创:74篇
转载:21篇
评论:36条
(1)(1)(1)(1)(2)(1)(5)(2)(1)(1)(2)(1)(1)(1)(1)(2)(2)(8)(1)(2)(1)(1)(1)(4)(1)(1)(1)(2)(2)(1)(1)(1)(2)(1)(2)(1)(2)(1)(1)(2)(1)(2)(1)(1)(2)(1)(1)(1)(4)(1)(2)(1)(1)(2)(1)(4)(5)(2)Protobuf与JAVA - 深入一点,你会更加快乐 - ITeye技术网站
博客分类:
我们在开发一些RPC调用的程序时,通常会涉及到对象的序列化/反序列化的问题,比如一个“Person”对象从Client端通过TCP方式发送到Server端;因为TCP协议(UDP等这种低级协议)只能发送字节流,所以需要应用层将Java对象序列化成字节流,数据接收端再反序列化成Java对象即可。“序列化”一定会涉及到编码(encoding,format),目前我们可选择的编码方式:
1)使用JSON,将java对象转换成JSON结构化字符串。在web应用、移动开发方面等,基于Http协议下,这是常用的,因为JSON的可读性较强。性能稍差。
2)基于XML,和JSON一样,数据在序列化成字节流之前,都转换成字符串。可读性强,性能差,异构系统、open api类型的应用中常用。
3)使用JAVA内置的编码和序列化机制,可移植性强,性能稍差。无法跨平台(语言)。
4)其他开源的序列化/反序列化框架,比如Apache Avro,Apache Thrift,这两个框架和Protobuf相比,性能非常接近,而且设计原理如出一辙;其中Avro在大数据存储(RPC数据交换,本地存储)时比较常用;Thrift的亮点在于内置了RPC机制,所以在开发一些RPC交互式应用时,Client和Server端的开发与部署都非常简单。
评价一个序列化框架的优缺点,大概有2个方面:1)结果数据大小,原则上说,序列化后的数据尺寸越小,传输效率越高。 2)结构复杂度,这会影响序列化/反序列化的效率,结构越复杂,越耗时。
Protobuf是一个高性能、易扩展的序列化框架,它的性能测试有关数据可以参看官方文档。通常在TCP Socket通讯(RPC调用)相关的应用中使用;它本身非常简单,易于开发,而且结合Netty框架可以非常便捷的实现一个RPC应用程序,同时Netty也为Protobuf解决了有关Socket通讯中“半包、粘包”等问题(反序列化时,字节成帧)。
1、安装Protobuf
从“/protocol-buffers/docs/downloads”下载安装包,windows下的使用不再赘言;在linux或者mac下,下载tar.gz的压缩包,解压后执行:
$ ./configure
$ make check
$ make install
此后,可以通过“protoc --version”查看是否安装成功了,安装过程不需要配置环境变量。安装主要是为了能够使用命令编译proto文件,实际部署环境并不需要。
Protobuf需要一个schema声明文件,后缀为“.proto”的文本文件,内容样例如下:
option java_package = "com.test.protobuf";
option java_outer_classname="PersonProtos";
message Person {
required string name = 1;
required int32 id = 2;
optional string email = 3;
enum PhoneType {
MOBILE = 0;
message PhoneNumber {
required string number = 1;
optional PhoneType type = 2 [default = HOME];
repeated PhoneNumber phone = 4;
如果你曾经使用过thrift、avro,你会发现它们都需要一个类似的schema文件,只是结构规则不同罢了。特别备注:protbuf和thrift的声明文件相似度极高。
“message”表示,声明一个“类”,即java中的class。message中可以内嵌message,就像java的内部类一样。一个message有多个filed,“required string name = 1”则表示:name字段在序列化、反序列化时为第一个字段,string类型,“required”表示这个字段的值是必选;可以看出每个filed都至少有着三个部分组成,其中filed的“位置index”全局唯一。“optional”表示这个filed是可选的(允许为null)。“repeated”表示这个filed是一个集合(list)。也可以通过[default = ]为一个“optional”的filed指定默认值。
我们可以在一个.proto文件中声明多个“message”,不过大部分情况下我们把互相继承或者依赖的类写入一个.proto文件,将那些没有关联关系的类分别写入不同的文件,这样便于管理。
我们可以在.proto文件的头部声明一些额外的信息,比如“java_package”表示当“generate code”时将生成的java代码放入指定的package中。“java_outer_classname”表示生成的java类的名称。
然后执行如下命令,生成JAVA代码:
protoc --java_out=./ Persion.proto
通过“--java_out”指定生成JAVA代码保存的目录,后面紧跟“.proto”文件的路径。此后我们看到生成 了Package和一个PersonProto.java文件,我们只需要把此java文件复制到项目中即可。
3、JAVA实例
1)pom.xml
&dependency&
&groupId&com.google.protobuf&/groupId&
&artifactId&protobuf-java&/artifactId&
&version&2.6.1&/version&
&/dependency&
PersonProtos.Person.Builder personBuilder = PersonProtos.Person.newBuilder();
personBuilder.setEmail("");
personBuilder.setId(1000);
PersonProtos.Person.PhoneNumber.Builder phone = PersonProtos.Person.PhoneNumber.newBuilder();
phone.setNumber("");
personBuilder.setName("张三");
personBuilder.addPhones(phone);
PersonProtos.Person person = personBuilder.build();
获得到person实例后,我们可以通过如下方式,将person对象序列化、反序列化。
//第一种方式
byte[] data = person.toByteArray();//获取字节数组,适用于SOCKET或者保存在磁盘。
//反序列化
PersonProtos.Person result = PersonProtos.Person.parseFrom(data);
System.out.println(result.getEmail());
这种方式,适用于很多场景,Protobuf会根据自己的encoding方式,将JAVA对象序列化成字节数组。同时Protobuf也可以从字节数组中重新decoding,得到Java新的实例。
//第二种序列化:粘包,将一个或者多个protobuf对象字节写入stream。
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
//生成一个由:[字节长度][字节数据]组成的package。特别适合RPC场景
person.writeDelimitedTo(byteArrayOutputStream);
//反序列化,从steam中读取一个或者多个protobuf字节对象
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(byteArrayOutputStream.toByteArray());
result = PersonProtos.Person.parseDelimitedFrom(byteArrayInputStream);
System.out.println(result.getEmail());
第二种方式,是RPC调用中、Socket传输时适用,在序列化的字节数组之前,添加一个varint32的数字表示字节数组的长度;那么在反序列化时,可以通过先读取varint,然后再依次读取此长度的字节;这种方式有效的解决了socket传输时如何“拆包”“封包”的问题。在Netty中,适用了同样的技巧。
//第三种序列化,写入文件或者Socket
FileOutputStream fileOutputStream = new FileOutputStream(new File("/test.dt"));
person.writeTo(fileOutputStream);
fileOutputStream.close();
FileInputStream fileInputStream = new FileInputStream(new File("/test.dt"));
result = PersonProtos.Person.parseFrom(fileInputStream);
System.out.println(result);
第三种方式,比较少用。但是比较通用,意思为将序列化的字节数组写入到OutputStream中,具体的拆包工作,交给了高层框架。
4、protobuf入门介绍
以上述Person.proto文件为例:
message Person {
required string name = 1;
required int32 id = 2;
optional string email = 3;
声明了三个filed,每个filed都“规则”、“类型”、“字段名称”和一个“唯一的数字tag”。
1)其中“规则”可以为如下几个值:
“required”:表示此字段值必填,一个结构良好的message至少有一个flied为“required”。
“optional”:表示此字段值为可选的。对于此类型的字段,可以通过default来指定默认值,这是一个良好的设计习惯。
optional int32 page = 3 [default = 10];
如果没有指定默认值,在encoding时protobuf将会用一个特殊的默认值来替代。对于string,默认值为空,bool类型默认为false,数字类型默认位0,对于enum则默认值为枚举列表的第一个值。
“repeated”:表示这个字段的值可以允许被重复多次,如果转换成JAVA代码,此filed数据结构为list,有序的。可以在“repeated”类型的filed后使用“packed”--压缩,提高数据传输的效率。
repeated int32 numbers = 4 [packed=true];
特别需要注意:当你指定一个filed位required时,需要慎重考虑这个filed是否永远都是“必须的”。将一个required调整为optional,需要同时重新部署数据通讯的Client和Server端,否则将会对解析带来问题。
2)可以在一个.proto文件中,同时声明多个message,这样是允许的。
3)为message或者filed添加注释,风格和JAVA一样:
optional int32 page = 3;// Which page number do we want?
4)数据类型与JAVA对应关系:
ByteString
其中“ByteString”是Protobuf自定义的JAVA API。
5)枚举:和JAVA中Enum API一致,如果开发者希望某个filed的值只能在一些限定的列表中,可以将次filed声明为enum类型。Protobuf中,enum类型的每个值是一个int32的数字,不像JAVA中那样enum可以定义的非常复杂。如果enum中有些值是相同的,可以将“allow_alias”设定为true。
message Person {
required Type type = 1;
enum Type {
option allow_alias =
TEACHER = 0;
STUDENT = 1;
OTHER = 1;//the same as STUDENT
6)import:如果当前.proto文件中引用了其他proto文件的message类型,那么可以在此文件的开头声明import。
import "other_protos.proto";
不过这会引入一个小小的麻烦,如果你的“other_protos.proto”文件变更了目录,需要连带修改其他文件。
7)嵌入message:类似于java的内部类,即在message中,嵌入其他message。如Person.proto例子中的PhoneNumber。
8)更新message类型:如果一个现有的message类型无法满足当前的需要,比如你需要新增一个filed,但是仍然希望使用生成的旧代码来解析。
(1)不要修改现有fileds的数字tag,即字段的index数字。
(2)新增字段必须为optional或者repeated类型,同时还要为它们设置“default”值,这意味着“old”代码序列化的messages能够被“new”代码解析。“new”代码生成的数据也能被“old”代码解析,对于“old”代码而言,那些没有被声明的filed将会在解析式忽略。
(3)非“required”filed可以被删除,但是它的“数字tag”不能被其他字段重用。
(4)int32、uint32、int64、uint64、bool,是互相兼容的,它们可以从一个类型修改成另外一个,而不会对程序带来错误。参见源码WireFormat.FiledType
(5)sint32和sint64是兼容的,但和其他数字类型是不兼容的。
(6)string和bytes是兼容的,只要为UTF-8编码的。注意protobuf中string默认是UTF-8编码的。
(7)optional与repeated是兼容的。如果输入的数据格式是repeated,但是client希望接受的数据是optional,对于原生类型,那么client将会使用repeated的最后一个值,对于message类型,client将会merge这些输入的数据。
(8)修改“default”值通常不会有任何问题,只要保证这个默认值不会被真正的使用。
9)Map结构:
map&key_type, value_type& map = 3;
其中key_type可以为任何“整形”或者string类型,value_type可以为任意类型,只要JAVA API能够支持。map类型不能被“repeated”、“optional”或者“required”修饰,传输过程中无法确保map中数据的顺序,
对于文本格式,map是按照key排序。
10)如下为一些有用的选项:
(1)java_package:在.proto文件的顶部设定,指定生成JAVA文件时类所在的package。
option java_package = "com.example.foo";
(2)java_outer_classname:在.proto文件的顶部设定,指定生成JAVA文件时类的名字。一个.proto文件只会生成一个JAVA类。
option java_outer_classname = "FooProtos";
(3)packed:对于repeated类型有效,指定输入的数据是否“压缩”。
5、protobuf序列化原理:
其实protobuf的序列化原理并不是什么高超的“绝技”:如果你曾经了解过thrift、avro,或者从事过socket通信,那么你对protobuf的序列化方式并不感到惊奇;如下为protobuf的序列化format:
[serializedSize]{[int32(tag,type)][value]...}
对于一个message,序列化时首先就算这个message所有filed序列化需要占用的字节长度,计算这个长度是非常简单的,因为protobuf中每种类型的filed所占用的字节数是已知的(bytes、string除外),只需要累加即可。这个长度就是serializedSize,32为integer,在protobuf的某些序列化方式中可能使用varint32(一个压缩的、根据数字区间,使用不同字节长度的int);此后是filed列表输出,每个filed输出包含int32(tag,type)和value的字节数组,从上文我们知道每个filed都有一个唯一的数字tag表示它的index位置,type为字段的类型,tag和type分别占用一个int的高位、低位字节;如果filed为string、bytes类型,还会在value之前额外的补充添加一个varint32类型的数字,表示string、bytes的字节长度。
那么在反序列化的时候,首先读取一个32为的int表示serializedSize,然后读取serializedSize个字节保存在一个bytebuffer中,即读取一个完整的package。然后读取一个int32数字,从这个数字中解析出tag和type,如果type为string、bytes,然后补充读取一个varint32就知道了string的字节长度了,此后根据type或者字节长度,读取后续的字节数组并转换成java type。重复上述操作,直到整个package解析完毕。
protobuf的这种序列化format,极大的介绍了输入、输出的数据大小,而且复杂度非常低,从而性能较高。
6、protobuf与Netty编程:
1)Netty Server端样例
public class ProtobufNettyServerTestMain {
public static void main(String[] args) {
//bossGroup : NIO selector threadPool
EventLoopGroup bossGroup = new NioEventLoopGroup();
//workerGroup : socket data read-write worker threadPool
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer&SocketChannel&() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufDecoder(PersonProtos.Person.getDefaultInstance()))
.addLast(new ProtobufVarint32LengthFieldPrepender())
.addLast(new ProtobufEncoder())
.addLast(new ProtobufServerHandler());//自定义handler
}).childOption(ChannelOption.TCP_NODELAY,true);
System.out.println("begin");
//bind到本地的18080端口
ChannelFuture future = bootstrap.bind(18080).sync();
//阻塞,直到channel.close
future.channel().closeFuture().sync();
System.out.println("end");
} catch (Exception e) {
e.printStackTrace();
} finally {
//辅助线程优雅退出
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
备注:channel内部维护一个pipeline,类似一个filter链表一样,所有的socket读写都会经过,对于write操作(outbound)会从pipeline列表的last--&first方向依次调用Encoder处理器;对于read操作(inbound)会从first--&last依次调用Decoder处理器。此外Encoder处理对于read操作不起效,Decoder处理器对write操作不起效,原理 稍后在Netty相关章节介绍。
ProtobufEncoder:非常简单,内部直接使用了message.toByteArray()将字节数据放入bytebuf中输出(out中,交由下一个encoder处理)。
ProtobufVarint32LengthFieldPrepender:因为ProtobufEncoder只是将message的各个filed按照规则输出了,并没有serializedSize,所以socket无法判定package(封包)。这个Encoder的作用就是在ProtobufEncoder生成的字节数组前,prepender一个varint32数字,表示serializedSize。
ProtobufVarint32FrameDecoder:这个decoder和Prepender做的工作正好对应,作用就是“成帧”,根据seriaziedSize读取足额的字节数组--一个完整的package。
ProtobufDecoder:和ProtobufEncoder对应,这个Decoder需要指定一个默认的instance,decoder将会解析byteArray,并根据format规则为此instance中的各个filed赋值。
2)ProtobufServerHandler.java
发送Protobuf数据和接收client发送的数据。一个自定义的处理器,通常我们的业务会在这里处理。
public class ProtobufServerHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
PersonProtos.Person person = (PersonProtos.Person)
//经过pipeline的各个decoder,到此Person类型已经可以断定
System.out.println(person.getEmail());
ChannelFuture future = ctx.writeAndFlush(build());
//发送数据之后,我们手动关闭channel,这个关闭是异步的,当数据发送完毕后执行。
future.addListener(ChannelFutureListener.CLOSE);
* 构建一个Protobuf实例,测试
public MessageLite build() {
PersonProtos.Person.Builder personBuilder = PersonProtos.Person.newBuilder();
personBuilder.setEmail("");
personBuilder.setId(1000);
PersonProtos.Person.PhoneNumber.Builder phone = PersonProtos.Person.PhoneNumber.newBuilder();
phone.setNumber("");
personBuilder.setName("张三");
personBuilder.addPhones(phone);
return personBuilder.build();
3)Netty Client样例
public class ProtobufNettyClientTestMain {
public static void main(String[] args) throws Exception{
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,10000)
.handler(new ChannelInitializer&SocketChannel&() {
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("initChannel");
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufDecoder(PersonProtos.Person.getDefaultInstance()))
.addLast(new ProtobufVarint32LengthFieldPrepender())
.addLast(new ProtobufEncoder())
.addLast(new ProtobufClientHandler());
ChannelFuture future = bootstrap.connect(new InetSocketAddress("127.0.0.1", 18080));
System.out.println("begin");
future.channel().closeFuture().sync();
System.out.println("Closed");
} catch (Exception e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
4)ProtobufClientHandler.java
public class ProtobufClientHandler extends ChannelInboundHandlerAdapter {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//当channel就绪后,我们首先通过client发送一个数据。
ctx.writeAndFlush(build());
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
PersonProtos.Person person = (PersonProtos.Person)
System.out.println(person.getEmail());
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();;
ctx.close();
public MessageLite build() {
PersonProtos.Person.Builder personBuilder = PersonProtos.Person.newBuilder();
personBuilder.setEmail("");
personBuilder.setId(1000);
PersonProtos.Person.PhoneNumber.Builder phone = PersonProtos.Person.PhoneNumber.newBuilder();
phone.setNumber("");
personBuilder.setName("李四");
personBuilder.addPhones(phone);
return personBuilder.build();
关于Netty的相关技术,请参考其他文档。
到此为止,我们基本上对protobuf使用方式,有了初步的了解。祝大家好运!
浏览: 543824 次
来自: 北京
楼主大牛,请问下 是如何有效的阅读源码呢,个人在阅读时,容易迷 ...
这个代码,应该是容器自己去做吧,自己应用程序写感觉没有意义啊。 ...
xyexueke ~~~~~~
hao_foo 写道日志收集、分析、监控? 好像跟ELK 功能 ...
“对于client端请求是队列话的,即一个操作阻塞直到serv ...

我要回帖

更多关于 redis 复杂数据结构 的文章

 

随机推荐