Jun 21, 2010

Google Protocol Buffers 实用技术:

Google Protocol Buffers 是一种非常方便高效的数据编码方式(data serialization),几乎在Google的每个产品中都用到了。本文介绍 protocol buffers 的一种高级使用方法(在Google Protocol Buffer的主页上没有的)。

Protocol Buffers 通常的使用方式如下:我们的程序只依赖于有限的几个 protocol messages。我们把这几个 message 定义在一个或者多个 .proto 文件里,然后用编译器 protoc 把 .proto 文件翻译成 C++ 语言(.h和.cc文件)。这个翻译过程把每个 message 翻译成了一个 C++ class。这样我们的程序就可以使用这些 protocol classes 了。

但是还有一种不那么常见的使用方式:我们有一批数据文件(或者网络数据流),其中包含了一些 protocol messages 内容。我们也有定义这些 protocol messages 的 .proto 文件。我们希望解析数据文件中的内容,但是不能使用 protoc 编译器。

一个例子是 codex。codex是Google内最常用的一个工具程序。它可以解析任意文件中的 protocol message 内容,并且把这些内容打印成人能方便的阅读的格式。为了能正确解析和打印数据文件内容,codex 需要定义 protocol message 的 .proto 文件。

为了实现 codex 的功能,一种ad hoc的方法是:
1 把 codex 的基本功能(比如读取数据文件,打印文件内容等)实现在一个 .cc 文件里(比如叫做 codex-base.cc)
1 对给定的 .proto 文件,调用 protoc,得到对应的 .pb.h 和 .pb.cc 文件。
1 把 codex-base.cc 和 protoc 的结果一起编译,生成一个专门解析某一个 protocol message 内容的 codex 程序。
这个办法太ad hoc了。它为每个 .protoc 文件编写一个 codex 程序。

另一个办法是,如果我们有世界上所有的 .proto 文件,那么我们把它们都预先用 protoc 编译了,链接进 codex。显然这个搞法也是不现实的。

那么codex到底是怎么实现的呢?其实它利用了 protocol buffers 没有写入文档的一些 API。闲话少说,我们来看一段用这些“神秘的API“写的代码。这段代码用于解析任意给定的 .proto 文件:

#include <google/protobuf/descriptor.h>
#include <google/protobuf/dynamic_message.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/tokenizer.h>
#include <google/protobuf/compiler/parser.h>

// Parsing given .proto file for Descriptor of the given message (by
// name). The returned message descriptor can be used with a
// DynamicMessageFactory in order to create prototype message and
// mutable messages. For example:
DynamicMessageFactory factory;
const Message* prototype_msg = factory.GetPrototype(message_descriptor);
const Message* mutable_msg = prototype_msg->New();

void GetMessageTypeFromProtoFile(const string& proto_filename,
FileDescriptorProto* file_desc_proto) {
using namespace google::protobuf;
using namespace google::protobuf::io;
using namespace google::protobuf::compiler;

FILE* proto_file = fopen(proto_filename.c_str(), "r");
if (proto_file == NULL) {
LOG(FATAL) << "Cannot open .proto file: " << proto_filename;

FileInputStream proto_input_stream(fileno(proto_file));
Tokenizer tokenizer(&proto_input_stream, NULL);
Parser parser;
if (!parser.Parse(&tokenizer, file_desc_proto)) {
LOG(FATAL) << "Cannot parse .proto file:" << proto_filename;

// Here we walk around a bug in protocol buffers that
// |Parser::Parse| does not set name (.proto filename) in
// file_desc_proto.
if (!file_desc_proto->has_name()) {
这个函数的输入是一个 .proto 文件的文件名。输出是一个 FileDescriptorProto 对象。这个对象里存储着对 .proto 文件解析之后的结果。我们接下来用这些结果动态生成某个 protocol message 的 instance(或者用C++术语叫做object)。然后可以调用这个 instance 自己的 ParseFromArray/String 成员函数,来解析数据文件中的每一条记录的内容。请看如下代码:

// Print contents of a record file with following format:
// { <int record_size> <KeyValuePair> }
// where KeyValuePair is a proto message defined in mpimr.proto, and
// consists of two string fields: key and value, where key will be
// printed as a text string, and value will be parsed into a proto
// message given as |message_descriptor|.
void PrintDataFile(const string& data_filename,
const FileDescriptorProto& file_desc_proto,
const string& message_name) {
const int kMaxRecieveBufferSize = 32 * 1024 * 1024; // 32MB
static char buffer[kMaxRecieveBufferSize];

ifstream input_stream(data_filename.c_str());
if (!input_stream.is_open()) {
LOG(FATAL) << "Cannot open data file: " << data_filename;

google::protobuf::DescriptorPool pool;
const google::protobuf::FileDescriptor* file_desc =
if (file_desc == NULL) {
LOG(FATAL) << "Cannot get file descriptor from file descriptor"
<< file_desc_proto.DebugString();

const google::protobuf::Descriptor* message_desc =
if (message_desc == NULL) {
LOG(FATAL) << "Cannot get message descriptor of message: " << message_name;

google::protobuf::DynamicMessageFactory factory;
const google::protobuf::Message* prototype_msg =
if (prototype_msg == NULL) {
LOG(FATAL) << "Cannot create prototype message from message descriptor";
google::protobuf::Message* mutable_msg = prototype_msg->New();
if (mutable_msg == NULL) {
LOG(FATAL) << "Failed in prototype_msg->New(); to create mutable message";

uint32 proto_msg_size; // uint32 is the type used in reocrd files.
for (;;) {
input_stream.read((char*)&proto_msg_size, sizeof(proto_msg_size));

if (proto_msg_size > kMaxRecieveBufferSize) {
LOG(FATAL) << "Failed to read a proto message with size = "
<< proto_msg_size
<< ", which is larger than kMaxRecieveBufferSize ("
<< kMaxRecieveBufferSize << ")."
<< "You can modify kMaxRecieveBufferSize defined in "
<< __FILE__;

input_stream.read(buffer, proto_msg_size);
if (!input_stream)

if (!mutable_msg->ParseFromArray(buffer, proto_msg_size)) {
LOG(FATAL) << "Failed to parse value in KeyValuePair:" << pair.value();

cout << mutable_msg->DebugString();

delete mutable_msg;
这个函数需要三个输入:1)数据文件的文件名,2)之前GetMessageTypeFromProtoFile函数返回的FileDescriptorProto对象,3)数据文件中每条记录的对应的protocol message 的名字(注意,一个 .proto 文件里可以定义多个 protocol messages,所以我们需要知道数据记录对应的具体是哪一个 message)。

以上代码中利用了 DescriptorPool 从 FileDescriptorProto 解析出 FileDescriptor(描述 .proto 文件中所有的 messages)。然后用 DynamicMessageFactory 从 FileDescriptor 里找到我们关注的那个 message 的 MessageDescriptor。接下来,我们利用 DynamicMessageFactory 根据 MessageDescriptor 得到一个 prototype message instance。注意,这个 instance 是不能往里面写内容的(immutable)。我们需要调用其 New 成员函数,来生成一个 mutable 的 instance。

有了一个对应数据记录的 message instance,接下来就好办了。我们读取数据文件中的每条记录。注意:此处我们假设数据文件中以此存放了一条记录的长度,然后是记录内容,接下来是第二条记录的长度和内容,以此类推。所以在上述函数中,我们循环的读取记录长度,然后解析记录内容。值得注意的是,解析内容利用的是 mutable message instance 的 ParseFromArrary 函数;它需要知道记录的长度。因此我们必须在数据文件中存储每条记录的长度。

接下来这段程序演示如何调用 GetMessageTypeFromProtoFile 和 PrintDataFile:

int main(int argc, char** argv) {
string proto_filename, message_name;
vector<string> data_filenames;
FileDescriptorProto file_desc_proto;

ParseCmdLine(argc, argv, &proto_filename, &message_name, &data_filenames);
GetMessageTypeFromProtoFile(proto_filename, &file_desc_proto);

for (int i = 0; i < data_filenames.size(); ++i) {
PrintDataFile(data_filenames[i], file_desc_proto, message_name);

return 0;

Jun 10, 2010

Intel’s Single-chip Cluster Computer

I just read a post on Intel's single-chip cluster computer. This is really a unique thing: multiple cores in a chip, each core has its own private-and-fast memory: cache. Main memory becomes more like the NFS. Programmers write programs for this chip using MPI.

So, this chip is a cluster with super-fast network connection (64GB/s) and very limited memory (because they are implemented as cache).

Jun 5, 2010

How to Install Dropbox in Ubuntu Using SOCKS Proxy

As a Chinese, I live behind the G.F.W. and have to get access Internet through an SSH tunnel. However, the Dropbox installation procedure do not support either SOCKS5 or http proxy. (Yes, the document says it works with http_proxy environment variable, but seems it does not.) Thanks to two great tools: proxychains and tsocks, which can launch any application and handle their network communication request through a pre-configured proxy. With the help of these tools, I can install Dropbox on a brand new Ubuntu machine behind the G.F.W. Here follows the steps:
  1. Install tsocks (or proxychains) under Ubuntu using synaptic. Thanks to Ubuntu, who made these tools standard packages.
  2. Get a SSH tunnel (in any way you like). I paid for an account. So I can setup a proxy tunnel by the following command line on my computer:
    ssh -D 7070 my-username@my-service-provider-url
  3. Configure your Web browser to use the tunnel. In Firefox, select to use SOCKS5 proxy: localhost:7070. This enables you access to Dropbox's homepage and download the Ubuntu package.
  4. Install the package by clicking it in Nautilus. To check the installation, in a shell, type the command
    dropbox start -i
    If you can see some error messages complaining network access restriction, you made it.
  5. Add the following lines to /etc/tsocks.conf:
    server =
    server_type = 5
    server_port = 7070
    If you are using proxychains, you need to modify /etc/proxychains.conf or make your own ~/.proxychains.conf.
  6. This time, using tsocks to launch the Dropbox online install procedure:
    tsocks dropbox start -i
    Cross your fingers and wait for it to download and install Dropbox, until you see Dropbox icon appears on the top-right corner of your screen.
  7. Right-click Dropbox icon, select "Preferences", and set SOCKS5 proxy like you did for Firefox. Hopefully, Dropbox starts to sync files you need now.
Good luck!

Jun 4, 2010

Building Distributed Programs Using GCC

In the case of distributed computing, a program is build (into a binary) and distributed on multiple computers for running. It is often that we do not want to install libraries depended by our program on every working computers. Instead, we can instruct GCC to link static libraries by setting the environment variable:
LDFLAGS=-static -static-libgcc

I have tried this method under Cygwin and Ubuntu Linux. However, if I do this under Darwin (Mac OS X 10.6 Snow Leopard), the linker complains that
ld: library not found for -lcrt0.o
In this Technical Q&A, Apple explains that they want to make Mac OS X upgrading easier, so they do not provide crt0.o to encourage dynamic linking.

Jun 3, 2010

Uint64 Constants

If we write uint64 a = 0xffffffffffffffff;, the compiler often complains:
error: integer constant is too large for ‘long’ type
What we need is to add the LLU suffix: uint64 a = 0xffffffffffffffffLLU;

Jun 2, 2010

Launch a Series of Hadoop Pipes Task

It is OK to call runTask multiple times from within main() in a Hadoop Pipes program to launch a series of MapReduce tasks. However, one thing to remember: MapReduce tasks must not have conflicts output directory, because Hadoop runtime does not start a MapReduce task whose output directory exists.

Incremental Parsing Using Boost Program Options Library

I have to say I fell in love with boost::program_options since the first time I use it in developing my own C++ MapReduce implementation. It can parse command line parameters as well as configuration files. This makes it convenient for programs which supports and expects a bunch of options, where a MapReduce program is a typical example.

A special use-case of a command line parser is that a function need to parse some options out from the command line parameters, and then the rest parameters are passed to another function, which parse other options. For example, the MapReduce runtime requires to get options like "num_map_workers", "num_reduce_workers", etc, and the rest of the program (user customized map and reduce functions) need to parse application-specific options like "topic_dirichlet_prior", "num_lda_topics", etc. boost::program_options supports such kind of multi-round parsing, where the key is boost::program_options::allow_unregistered(). Here attaches a sample program: (For more explanation on this program, please refer to the official document of boost::program_options.)

#include <iostream>
#include <string>
#include <vector>

#include <boost/program_options/option.hpp>
#include <boost/program_options/options_description.hpp>
#include <boost/program_options/variables_map.hpp>
#include <boost/program_options/parsers.hpp>

using namespace std;
namespace po = boost::program_options;

int g_num_map_workers;
int g_num_reduce_workers;

vector<string> foo(int argc, char** argv) {
po::options_description desc("Supported options");
("num_map_workers", po::value<int>(&g_num_map_workers), "# map workers")
("num_reduce_workers", po::value<int>(&g_num_reduce_workers), "# reduce workers")
po::variables_map vm;
po::parsed_options parsed =
po::command_line_parser(argc, argv).options(desc).allow_unregistered().run();
po::store(parsed, vm);

cout << "The following options were parsed by foo:\n";
if (vm.count("num_map_workers")) {
cout << "num_map_workers = " << g_num_map_workers << "\n";
if (vm.count("num_reduce_workers")) {
cout << "num_reduce_workers = " << g_num_reduce_workers << "\n";

return po::collect_unrecognized(parsed.options, po::include_positional);

void bar(vector<string>& rest_args) {
po::options_description desc("Supported options");
("apple", po::value<int>(), "# apples")
po::variables_map vm;
po::parsed_options parsed =
po::store(parsed, vm);

cout << "The following options were parsed by bar:\n";
if (vm.count("apple")) {
cout << "apple = " << vm["apple"].as<int>() << "\n";

int main(int argc, char** argv) {
vector<string> rest_options = foo(argc, argv);

cout << "The following cmd args cannot not be recognized by foo:\n";
for (int i = 0; i < rest_options.size(); ++i) {
cout << rest_options[i] << "\n";


Finally I have to tell that early boost version (e.g., 1.33.1 packed in Cygwin) has bugs in program_options, which leads to core dump in case of unknown options. The solution to download and build your own boost libraries. I just built 1.43.0 on Cygwin on my Windows computer.