Java中协议缓冲区分隔的I / O函数有C ++等价物吗?

我试图从文件中读取/写入多个Protocol Buffers消息,使用C ++和Java。 谷歌build议在消息前面写长度前缀,但是默认情况下是没有办法的(我可以看到)。

然而,版本2.1.0中的Java API收到了一组“分界”I / O函数,这些函数显然是做这个工作的:

parseDelimitedFrom mergeDelimitedFrom writeDelimitedTo 

有没有C ++的等价物? 如果不是,Java API的大小前缀的连线格式是什么,所以我可以用C ++parsing这些消息?

我在这里晚了一点,但是下面的实现包含了一些其他答案中缺less的优化,并且在64MBinput之后不会失败(尽pipe它仍然强制每个消息的64MB限制 ,而不是整个stream)。

(我是C ++和Java protobuf库的作者,但是我不再为Google工作了,对不起,这个代码从来没有进入到官方的lib中,这就是它的样子。

 bool writeDelimitedTo( const google::protobuf::MessageLite& message, google::protobuf::io::ZeroCopyOutputStream* rawOutput) { // We create a new coded stream for each message. Don't worry, this is fast. google::protobuf::io::CodedOutputStream output(rawOutput); // Write the size. const int size = message.ByteSize(); output.WriteVarint32(size); uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size); if (buffer != NULL) { // Optimization: The message fits in one buffer, so use the faster // direct-to-array serialization path. message.SerializeWithCachedSizesToArray(buffer); } else { // Slightly-slower path when the message is multiple buffers. message.SerializeWithCachedSizes(&output); if (output.HadError()) return false; } return true; } bool readDelimitedFrom( google::protobuf::io::ZeroCopyInputStream* rawInput, google::protobuf::MessageLite* message) { // We create a new coded stream for each message. Don't worry, this is fast, // and it makes sure the 64MB total size limit is imposed per-message rather // than on the whole stream. (See the CodedInputStream interface for more // info on this limit.) google::protobuf::io::CodedInputStream input(rawInput); // Read the size. uint32_t size; if (!input.ReadVarint32(&size)) return false; // Tell the stream not to read beyond that size. google::protobuf::io::CodedInputStream::Limit limit = input.PushLimit(size); // Parse the message. if (!message->MergeFromCodedStream(&input)) return false; if (!input.ConsumedEntireMessage()) return false; // Release the limit. input.PopLimit(limit); return true; } 

好的,所以我一直无法find实现我需要的顶级C ++函数,但是在MessageLite接口中,通过Java API参考进行的一些探讨发现了以下内容:

 void writeDelimitedTo(OutputStream output) /* Like writeTo(OutputStream), but writes the size of the message as a varint before writing the data. */ 

所以Java大小前缀是一个(Protocol Buffers)varint!

有了这些信息,我开始通过C ++ API进行挖掘,发现CodedStream标题,其中包含以下内容:

 bool CodedInputStream::ReadVarint32(uint32 * value) void CodedOutputStream::WriteVarint32(uint32 value) 

使用这些,我应该能够推出自己的C ++函数来完成这项工作。

他们应该真的把这个添加到主要的消息API,但是; 它考虑到了Java的缺失function,Marc Gravell出色的protobuf-net C#端口(通过SerializeWithLengthPrefix和DeserializeWithLengthPrefix)也是如此。

我使用CodedOutputStream / ArrayOutputStream解决了同样的问题,以写入消息(大小)和CodedInputStream / ArrayInputStream读取消息(大小)。

例如,下面的伪代码写下消息后面的消息大小:

 const unsigned bufLength = 256; unsigned char buffer[bufLength]; Message protoMessage; google::protobuf::io::ArrayOutputStream arrayOutput(buffer, bufLength); google::protobuf::io::CodedOutputStream codedOutput(&arrayOutput); codedOutput.WriteLittleEndian32(protoMessage.ByteSize()); protoMessage.SerializeToCodedStream(&codedOutput); 

在编写时,你还应该检查你的缓冲区是否足够大以适应消息(包括大小)。 而在阅读时,你应该检查你的缓冲区是否包含整个消息(包括大小)。

如果他们为C ++ API添加了类似于Java API提供的便捷方法,那肯定会很方便。

干得好:

 #include <google/protobuf/io/zero_copy_stream_impl.h> #include <google/protobuf/io/coded_stream.h> using namespace google::protobuf::io; class FASWriter { std::ofstream mFs; OstreamOutputStream *_OstreamOutputStream; CodedOutputStream *_CodedOutputStream; public: FASWriter(const std::string &file) : mFs(file,std::ios::out | std::ios::binary) { assert(mFs.good()); _OstreamOutputStream = new OstreamOutputStream(&mFs); _CodedOutputStream = new CodedOutputStream(_OstreamOutputStream); } inline void operator()(const ::google::protobuf::Message &msg) { _CodedOutputStream->WriteVarint32(msg.ByteSize()); if ( !msg.SerializeToCodedStream(_CodedOutputStream) ) std::cout << "SerializeToCodedStream error " << std::endl; } ~FASWriter() { delete _CodedOutputStream; delete _OstreamOutputStream; mFs.close(); } }; class FASReader { std::ifstream mFs; IstreamInputStream *_IstreamInputStream; CodedInputStream *_CodedInputStream; public: FASReader(const std::string &file), mFs(file,std::ios::in | std::ios::binary) { assert(mFs.good()); _IstreamInputStream = new IstreamInputStream(&mFs); _CodedInputStream = new CodedInputStream(_IstreamInputStream); } template<class T> bool ReadNext() { T msg; unsigned __int32 size; bool ret; if ( ret = _CodedInputStream->ReadVarint32(&size) ) { CodedInputStream::Limit msgLimit = _CodedInputStream->PushLimit(size); if ( ret = msg.ParseFromCodedStream(_CodedInputStream) ) { _CodedInputStream->PopLimit(msgLimit); std::cout << mFeed << " FASReader ReadNext: " << msg.DebugString() << std::endl; } } return ret; } ~FASReader() { delete _CodedInputStream; delete _IstreamInputStream; mFs.close(); } }; 

IsteamInputStream对eofs和其他容易发生的错误是非常脆弱的,当与std :: istream一起使用时。 在此之后protobufstream被永久性地损坏,任何已经使用的缓冲区数据被破坏。 在protobuf的传统stream中有适当的支持。

实施google::protobuf::io::CopyingInputStream并与CopyingInputStreamAdapter一起使用。 对输出variables做同样的事情。

实际上,parsing调用在google::protobuf::io::CopyingInputStream::Read(void* buffer, int size)中给出缓冲区。 剩下的唯一事情就是以某种方式读入它。

以下是与Asio同步stream( SyncReadStream / SyncWriteStream )一起使用的示例:

 #include <google/protobuf/io/zero_copy_stream_impl_lite.h> using namespace google::protobuf::io; template <typename SyncReadStream> class AsioInputStream : public CopyingInputStream { public: AsioInputStream(SyncReadStream& sock); int Read(void* buffer, int size); private: SyncReadStream& m_Socket; }; template <typename SyncReadStream> AsioInputStream<SyncReadStream>::AsioInputStream(SyncReadStream& sock) : m_Socket(sock) {} template <typename SyncReadStream> int AsioInputStream<SyncReadStream>::Read(void* buffer, int size) { std::size_t bytes_read; boost::system::error_code ec; bytes_read = m_Socket.read_some(boost::asio::buffer(buffer, size), ec); if(!ec) { return bytes_read; } else if (ec == boost::asio::error::eof) { return 0; } else { return -1; } } template <typename SyncWriteStream> class AsioOutputStream : public CopyingOutputStream { public: AsioOutputStream(SyncWriteStream& sock); bool Write(const void* buffer, int size); private: SyncWriteStream& m_Socket; }; template <typename SyncWriteStream> AsioOutputStream<SyncWriteStream>::AsioOutputStream(SyncWriteStream& sock) : m_Socket(sock) {} template <typename SyncWriteStream> bool AsioOutputStream<SyncWriteStream>::Write(const void* buffer, int size) { boost::system::error_code ec; m_Socket.write_some(boost::asio::buffer(buffer, size), ec); return !ec; } 

用法:

 AsioInputStream<boost::asio::ip::tcp::socket> ais(m_Socket); // Where m_Socket is a instance of boost::asio::ip::tcp::socket CopyingInputStreamAdaptor cis_adp(&ais); CodedInputStream cis(&cis_adp); Message protoMessage; uint32_t msg_size; /* Read message size */ if(!cis.ReadVarint32(&msg_size)) { // Handle error } /* Make sure not to read beyond limit of message */ CodedInputStream::Limit msg_limit = cis.PushLimit(msg_size); if(!msg.ParseFromCodedStream(&cis)) { // Handle error } /* Remove limit */ cis.PopLimit(msg_limit); 

我在C ++和Python中遇到了同样的问题。

对于C ++版本,我使用了发布在这个线程上的Kenton Varda代码和发送给protobuf团队的pull请求中的代码(因为这里发布的版本不会处理EOF,而他发送给github的版本会)。

 #include <google/protobuf/message_lite.h> #include <google/protobuf/io/zero_copy_stream.h> #include <google/protobuf/io/coded_stream.h> bool writeDelimitedTo(const google::protobuf::MessageLite& message, google::protobuf::io::ZeroCopyOutputStream* rawOutput) { // We create a new coded stream for each message. Don't worry, this is fast. google::protobuf::io::CodedOutputStream output(rawOutput); // Write the size. const int size = message.ByteSize(); output.WriteVarint32(size); uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size); if (buffer != NULL) { // Optimization: The message fits in one buffer, so use the faster // direct-to-array serialization path. message.SerializeWithCachedSizesToArray(buffer); } else { // Slightly-slower path when the message is multiple buffers. message.SerializeWithCachedSizes(&output); if (output.HadError()) return false; } return true; } bool readDelimitedFrom(google::protobuf::io::ZeroCopyInputStream* rawInput, google::protobuf::MessageLite* message, bool* clean_eof) { // We create a new coded stream for each message. Don't worry, this is fast, // and it makes sure the 64MB total size limit is imposed per-message rather // than on the whole stream. (See the CodedInputStream interface for more // info on this limit.) google::protobuf::io::CodedInputStream input(rawInput); const int start = input.CurrentPosition(); if (clean_eof) *clean_eof = false; // Read the size. uint32_t size; if (!input.ReadVarint32(&size)) { if (clean_eof) *clean_eof = input.CurrentPosition() == start; return false; } // Tell the stream not to read beyond that size. google::protobuf::io::CodedInputStream::Limit limit = input.PushLimit(size); // Parse the message. if (!message->MergeFromCodedStream(&input)) return false; if (!input.ConsumedEntireMessage()) return false; // Release the limit. input.PopLimit(limit); return true; } 

这是我的python2实现:

 from google.protobuf.internal import encoder from google.protobuf.internal import decoder #I had to implement this because the tools in google.protobuf.internal.decoder #read from a buffer, not from a file-like objcet def readRawVarint32(stream): mask = 0x80 # (1 << 7) raw_varint32 = [] while 1: b = stream.read(1) #eof if b == "": break raw_varint32.append(b) if not (ord(b) & mask): #we found a byte starting with a 0, which means it's the last byte of this varint break return raw_varint32 def writeDelimitedTo(message, stream): message_str = message.SerializeToString() delimiter = encoder._VarintBytes(len(message_str)) stream.write(delimiter + message_str) def readDelimitedFrom(MessageType, stream): raw_varint32 = readRawVarint32(stream) message = None if raw_varint32: size, _ = decoder._DecodeVarint32(raw_varint32, 0) data = stream.read(size) if len(data) < size: raise Exception("Unexpected end of file") message = MessageType() message.ParseFromString(data) return message #In place version that takes an already built protobuf object #In my tests, this is around 20% faster than the other version #of readDelimitedFrom() def readDelimitedFrom_inplace(message, stream): raw_varint32 = readRawVarint32(stream) if raw_varint32: size, _ = decoder._DecodeVarint32(raw_varint32, 0) data = stream.read(size) if len(data) < size: raise Exception("Unexpected end of file") message.ParseFromString(data) return message else: return None 

它可能不是最好看的代码,我相信它可以被重构一点点,但至less应该告诉你一个方法来做到这一点。

现在的大问题是:

即使使用python-protobuf的C ++实现,它的速度也比纯C ++慢一个数量级。 我有一个基准,我从一个文件中读取每个约30个字节的10M protobuf消息。 C ++需要0.9秒,python需要35s。

一种更快的方法是重新实现varint解码器,使其从文件中读取并一次性解码,而不是从文件中读取,然后按照当前的代码进行解码。 (分析显示在varint编码器/解码器中花费了大量的时间)。 但是不用说,仅靠Python是不足以缩小Python版本和C ++版本之间的差距的。

任何想法使其更快,是非常欢迎:)

也正在寻找这个解决scheme。 这里是我们解决scheme的核心,假设一些java代码用writeDelimitedTo将许多MyRecord消息写入文件中。 打开文件并循环,执行:

 if(someCodedInputStream-> ReadVarint32(&bytes)){
   CodedInputStream :: Limit msgLimit = someCodedInputStream-> PushLimit(bytes);
   if(myRecord-> ParseFromCodedStream(someCodedInputStream)){
     //用parsing的MyRecord实例来做你的东西
   } else {
     //处理parsing错误
   }
   someCodedInputStream-> PopLimit(msgLimit);
 } else {
   //也许文件结束
 }

希望它有帮助。

使用协议缓冲区的objective-c版本,我遇到了这个确切的问题。 在从iOS客户端发送到使用parseDelimitedFrom(基于Java的服务器,需要长度作为第一个字节)的服务器之前,我需要首先调用writeRawByte来访问CodedOutputStream。 在这里发帖希望能够帮助那些遇到这个问题的人。 在解决这个问题的时候,人们会认为Google的原始buf会附带一个简单的标志来帮助你…

  Request* request = [rBuild build]; [self sendMessage:request]; } - (void) sendMessage:(Request *) request { //** get length NSData* n = [request data]; uint8_t len = [n length]; PBCodedOutputStream* os = [PBCodedOutputStream streamWithOutputStream:outputStream]; //** prepend it to message, such that Request.parseDelimitedFrom(in) can parse it properly [os writeRawByte:len]; [request writeToCodedOutputStream:os]; [os flush]; } 

因为我不能把这个作为对Kenton Varda上面答案的评论。 我相信他发布的代码中有一个错误(以及其他已经提供的答案)。 以下代码:

 ... google::protobuf::io::CodedInputStream input(rawInput); // Read the size. uint32_t size; if (!input.ReadVarint32(&size)) return false; // Tell the stream not to read beyond that size. google::protobuf::io::CodedInputStream::Limit limit = input.PushLimit(size); ... 

设置了一个不正确的限制,因为它没有考虑到已经从input读取的varint32的大小。 这可能会导致数据丢失/损坏,因为从可能是下一个消息的一部分的stream读取附加字节。 正确处理的通常方法是删除用于读取大小的CodedInputStream,并创build一个用于读取有效负载的新的大小:

 ... uint32_t size; { google::protobuf::io::CodedInputStream input(rawInput); // Read the size. if (!input.ReadVarint32(&size)) return false; } google::protobuf::io::CodedInputStream input(rawInput); // Tell the stream not to read beyond that size. google::protobuf::io::CodedInputStream::Limit limit = input.PushLimit(size); ... 

您可以使用getline从指定的分隔符中读取一个string:

 istream& getline ( istream& is, string& str, char delim ); 

(在标题中定义)