Wednesday, August 20, 2008

The missing pieces in the protobuf binary log

Protobuf comes with a minor problem: it has no support for handling "type tagged structures", that is, something reminiscent of objects in OOP lingo, so if one is going to have a heterogeneous sequence of messages, the tagging has to be rolled by hand. For that reason, I added a transport frame for the messages in the binary log that wraps each message with some extra information. In addition to allowing the binary log to be a sequence of messages, it also adds some integrity-checking data and simplifies some administrative tasks.

Transport frame with message:
  • Length
  • Type Tag
  • Message
  • Checksum
The format of each frame in the sequence is given above, where the length is a specially encoded length that we will go through below, the type tag is a single byte giving the type of the message, the message is one of the messages given in the specification, and the checksum is there to ensure the integrity of the transport.

Checksum. As checksum, the plan is to use a CRC-32. We don't want it to be so large that it affects performance, but we do want it to catch reasonable losses of integrity. I'm considering storing it as a varint after the actual message, but for the time being it is given as 4 raw bytes (it is not implemented at all yet). Please give me feedback on this: if we make it a varint, we can store the checksum more compactly, but that also runs the risk of not being able to read the checksum due to corruption of the checksum itself, so offhand I would say that a fixed number of bytes is preferable.
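
To make the plan concrete, here is a minimal sketch of how the checksum could be computed with zlib's crc32(); the choice of zlib is my assumption, since the post only says CRC-32 and the checksum is not implemented yet:

#include <zlib.h>
#include <stdint.h>
#include <stddef.h>

// Compute a CRC-32 over the serialized message bytes; the result would
// then be written as 4 raw bytes after the message in the transport frame.
uint32_t frame_checksum(const unsigned char *msg, size_t length)
{
  return crc32(0L, msg, length);
}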

Type Tag. The type tag is a single byte giving the type of the event. This means that we are limited to 255 event types, but considering that we don't even have 26 events in 5.1 right now, I don't see us running into that limit any time soon. It would be possible to put the type tag in the message as well, specifying it as an enum inside the protobuf specification, but that would just provide the same information in two places, so it is better to keep it separate.
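
The actual tag values are not given here, but conceptually the mapping is just a one-byte enumeration of the messages in the specification, along the lines of this sketch (the numeric values are made up):

// Hypothetical tag values for the transport frame; the real assignment
// may differ.
enum EventTag {
  TAG_START    = 1,
  TAG_CHAIN    = 2,
  TAG_QUERY    = 3,
  TAG_COMMIT   = 4,
  TAG_ROLLBACK = 5
};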

Length. The length is the length of the message only, that is, it does not include the type tag, the checksum, nor the length field itself. This simplifies the normal processing, and in the event that one needs to skip an event, it is easy to compute the position of the next transport frame by just decoding the length (see the sketch below).
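
As an illustration of how little work skipping takes, a reader could step over a frame with something like the following sketch; it assumes the frame layout described above, with a one-byte type tag and a fixed 4-byte checksum, and uses the length_decode() function given further down:

// Return a pointer to the start of the next transport frame, given a
// pointer to the start of the current one.
const unsigned char *skip_frame(const unsigned char *frame)
{
  size_t length;
  const unsigned char *ptr = length_decode(frame, &length);
  return ptr + 1 + length + 4;      // type tag + message + checksum
}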

Length encoding

The length is encoded using a special scheme that adds very little overhead for small events while still leaving room for giving the length of very large events. This scheme currently allows a compact representation of lengths from 2 bytes up to 4 GiB and, if you don't need a compact representation of the length 2, lengths from 3 bytes up to 16 EiB.

The basic idea is to note that the length of a message can never be zero, and that the minimal length in this case is actually 16 bytes. Since we will never have a length less than 16 for an event, that leaves the values 0-15 available for denoting other information. The obvious solution is to let the first byte denote either the length itself, if it is, say, greater than 8, or the number of bytes that follow holding the length, if it is less than or equal to 8, but we can actually do better.

Storing Length requires ceil(log256(Length)) bytes, so if we let the first byte L be

ceil(log2(ceil(log256(Length)))) - 1
(that is, the base-2 logarithm of the smallest power of two that is at least the number of bytes needed for the length, minus 1), we can get away with reserving significantly fewer values. For example, a length of 66047 needs ceil(log256(66047)) = 3 bytes, so L = ceil(log2(3)) - 1 = 1 and the length itself is stored in 2^(L+1) = 4 bytes. Some example encodings:

Bytes             Value
2A                42
00 FF 02          767
01 FF 01 01 00    66047

Computing the number of bytes can be done by computing 1 << (L + 1), but computing the inverse is a little more involved. The following two functions do the job. Although it looks like a lot of code, length_encode() is actually only 29 instructions on my machine (no function calls), while length_decode() is about 7 instructions. The trick here is to compute the logarithm at the same time as we serialize the bytes into memory, so the overhead compared to just serializing the length is only a few instructions.

#include <assert.h>
#include <stdint.h>
#include <stddef.h>

inline unsigned char *
length_encode(size_t length, unsigned char *buf)
{
  unsigned char *ptr= buf;
  assert(length > 1);
  if (length < 256)
    *ptr++= length & 0xFF;
  else {
    int_fast8_t log2m1= -1;        // ceil(log2(ptr - buf)) - 1
    uint_fast8_t pow2= 1;          // pow2(log2m1 + 1)
    while (length > 0) {
      // Check the invariants
      assert(pow2 == (1 << (log2m1 + 1)));
      assert((ptr - buf) <= (1 << (log2m1 + 1)));

      // Write the least significant byte of the current
      // length. Prefix increment is used to make space for the first
      // byte that will hold log2m1.
      *++ptr= length & 0xFF;
      length >>= 8;

      // Ensure the invariant holds by correcting it if it doesn't,
      // that is, the number of bytes written is greater than the
      // nearest power of two.
      if (ptr - buf > pow2) {
        ++log2m1;
        pow2 <<= 1;
      }
    }
    // Clear the remaining bytes up to the next power of two
    while (++ptr < buf + pow2 + 1)
      *ptr= 0;
    *buf= log2m1;
    assert(ptr == buf + pow2 + 1);
  }
  return ptr;
}

inline const unsigned char *
length_decode(const unsigned char *buf, size_t *plen)
{
  if (*buf > 1) {
    *plen = *buf;
    return buf + 1;
  }

  size_t bytes= 1U << (*buf + 1);
  const unsigned char *ptr= buf + 1;
  size_t length= 0;
  for (unsigned int i = 0 ; i < bytes ; ++i)
    length |= (size_t) *ptr++ << (8 * i);   // cast before shifting to avoid overflowing 'int'
  *plen= length;
  return ptr;
}
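
As a quick sanity check, the two functions can be exercised with a small round-trip test like the one below; the test values are the ones from the table above.

#include <stdio.h>
#include <string.h>

int main(void)
{
  static const size_t lengths[] = { 42, 767, 66047 };
  unsigned char buf[16];

  for (size_t i = 0 ; i < sizeof(lengths) / sizeof(*lengths) ; ++i) {
    memset(buf, 0, sizeof(buf));
    unsigned char *end = length_encode(lengths[i], buf);

    size_t decoded;
    length_decode(buf, &decoded);

    // Prints, for example, "length 767 uses 3 byte(s), decodes back to 767"
    printf("length %zu uses %d byte(s), decodes back to %zu\n",
           lengths[i], (int)(end - buf), decoded);
  }
  return 0;
}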

Tuesday, August 19, 2008

Using protobuf for designing and implementing replication in Drizzle

So, following the lead of Brian, I spent a few hours over the weekend creating a very simple replication scheme for Drizzle, using protobuf to specify the binary log events.

Since we are developing replication for a cloud, there are a few things we have to consider:

  • Servers are unreliable. We shall not trust servers, but we shall expect them to crash at the worst possible time. (Murphy is a very good friend of mine, you know. He must be, since he visits me very often.) This means that we need support for sending statements to the slaves before the transaction is complete, which means that we need to support interleaving and hence both commit and rollback events.

    In order to handle interleaving, we need to have a transaction id, but in order to handle session specific data in the event of a crash (for example, temporary tables), we need to have a session id as well. However, the session id is only needed for statements, not for other events, so we add it there. This will allow the slave to expire any session objects when necessary.

    Since we cannot always know if the transaction is complete when a statement has been executed, we need to have the commit and rollback events as separate events, instead of using the alternative approach of adding flags to each query event.

  • Reconnections are frequent. Since masters go up and down all the time, we have to do what we can to make reconnections to another master easy. Among other things, it means that we cannot interrogate the master of a slave after it has crashed to figure out where we should start replication, so we need some form of Global Transaction ID to be able to decide where to start replication when connecting to another master.

    In our case, we want the transaction id to be transferable to other servers as well, so we combine the server id and the transaction id to form the Global Transaction ID for this replication.

    Since reconnections are frequent, we also need techniques for resolving conflicts between events, and a timestamp is one such technique. To handle that, we add a timestamp to each event, and we make room for a nanosecond-precision timestamp immediately, which means we need at least 64 bits for it.

  • The network is not reliable. We expect the cloud to be spread all over the planet, so we cannot really trust the network to provide a reliable transport. This means that we need some form of checksum on each event to ensure that it was transferred correctly.

Now, for the first simple implementation, we're aiming at statement-based replication, which means that there has to be a way to transfer session context information. Remember that statement-based replication just sends statements that change the existing state of the database, but those statements may also depend on the context of the session. For example, the INSERT statement below cannot be reliably replicated without the value of the user variable @foo.
SET @foo = 'Hello world!';
INSERT INTO whatever VALUES (@foo);
This is already handled in MySQL Replication by preceding each query log event with a sequence of context events, but for this solution there is another approach that is more appropriate: adding a set of name-value pairs to the query event. Another problem is that there are functions that are non-deterministic, or context-dependent in other ways, but these can be handled by rewriting the queries as follows:

Instead of...                              ...use this
INSERT INTO info VALUES (UUID(), 47);      SET @tmp = UUID();
                                           INSERT INTO info VALUES (@tmp, 47);

Now, the protobuf specification for all the above items, including some events used to control the storage of the binary log, is:

package BinaryLog;

message Header {
  required fixed64 timestamp = 1;
  required uint32 server_id = 2;
  required uint32 trans_id = 3;
}

message Start {
  required Header header = 1;
  required uint32 server_version = 2;
  required string server_signature = 3;
}

message Chain {
  required Header header = 1;
  required uint32 next = 2;            // Sequence number of next file
}

message Query {
  message Variable {
    required string name = 1;
    required string value = 2;
  }

  required Header header = 1;
  repeated Variable variable = 2;
  required uint32 session_id = 3;
  required string query = 4;
}

message Commit {
  required Header header = 1;
}

message Rollback {
  required Header header = 1;
}
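For the curious, filling in and serializing a Query event with the generated C++ classes looks roughly like the sketch below. This is not the actual binlog_writer code; the header file name, the helper function, and the example values are assumptions, while the field accessors follow from the specification above.

#include <ostream>
#include "binary_log.pb.h"   // generated from the specification above (file name assumed)

void write_query_event(std::ostream& out)
{
  BinaryLog::Query query;

  // The common header: timestamp plus the global transaction id,
  // which is the pair (server id, transaction id).
  BinaryLog::Header *header = query.mutable_header();
  header->set_timestamp(484270929829911ULL);
  header->set_server_id(1);
  header->set_trans_id(0);

  // Session context: the user variables that the statement depends on.
  BinaryLog::Query::Variable *var = query.add_variable();
  var->set_name("nick");
  var->set_value("mkindahl");

  query.set_session_id(1);
  query.set_query("INSERT INTO whatever VALUES (@nick,@name)");

  // Serialize the message to the stream; framing and checksums are not shown here.
  query.SerializeToOstream(&out);
}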
After tossing together a reader and a writer for the format, the result is:
$ ./binlog_writer --trans 1 \
> --set nick=mkindahl --set name='Mats Kindahl' \
> 'INSERT INTO whatever VALUES (@nick,@name)'
$ ./binlog_writer --trans 1 \
> --set nick=krow --set name='Brian Aker' \
> 'INSERT INTO whatever VALUES (@nick,@name)'
$ ./binlog_writer --trans 1 \
> --set nick=mtaylor --set name='Monty Taylor' \
> 'INSERT INTO whatever VALUES (@nick,@name)'
$ ./binlog_reader
# Global Id: (1,0)
# Timestamp: 484270929829911
set @name = 'Mats Kindahl'
set @nick = 'mkindahl'
INSERT INTO whatever VALUES (@nick,@name)
# Global Id: (1,0)
# Timestamp: 484330886264299
set @name = 'Brian Aker'
set @nick = 'krow'
INSERT INTO whatever VALUES (@nick,@name)
# Global Id: (1,0)
# Timestamp: 484391458447787
set @name = 'Monty Taylor'
set @nick = 'mtaylor'
INSERT INTO whatever VALUES (@nick,@name)
$ ls -l log.bin
-rw-r--r-- 1 mats bazaar 311 2008-08-19 14:22 log.bin
Protobuf Rocks! You can find the branch containing the ongoing development of this at Launchpad. Right now, there are no changes to the server, since we want the format to be stable first, but the branch is merged into the main tree on a regular basis.

Thursday, August 14, 2008

A join I/O manipulator for IOStream

I started playing around with protobuf when doing some stuff in Drizzle (more about that later), and since the examples were using IOStreams, the table reader and writer that Brian wrote use IOStreams as well. Now, IOStreams is pretty powerful, but it can be a pain to use, so of course I started tossing together some utilities to make it easier to work with.

Having been a serious Perl addict for 20 years, I of course started missing a lot of the nice functions for manipulating strings, the most immediate one being join, so I wrote a C++ IOStream manipulator to join the elements of an arbitrary sequence and output them to an std::ostream.

In this case, since the I/O manipulator takes arguments, it has to be written as a class. Recall that std::cout << foo(3) is just shorthand for operator<<(std::cout, foo(3)). Since we want to avoid constructing the full string before writing it to the output stream, we define our own joiner class and create an operator<<(std::ostream&, const joiner&) function that works with the joiner class in the following manner:

template <class FwdIter> class joiner {
  friend std::ostream& operator<<(std::ostream& out, const joiner& j) {
    j.write(out);
    return out;
  }

public:
  explicit joiner(const std::string& separator, FwdIter start, FwdIter finish)
    : m_sep(separator), m_start(start), m_finish(finish)
  { }

private:
  std::string m_sep;
  FwdIter m_start, m_finish;

  void write(std::ostream& out) const {
    FwdIter fi = m_start;
    if (m_start == m_finish)
      return;
    while (true) {
      out << *fi;
      if (++fi == m_finish)
        break;
      else
        out << m_sep;
    }
  }
};
So, now we can write something like:
std::cout << joiner<std::vector<int>::const_iterator>(",", seq.begin(), seq.end())
          << std::endl;
This is an awful lot to type, and especially difficult to maintain since the type of the sequence we are printing might change, so we introduce two helper functions to infer the iterator type for us:
template <class FwdIter>
joiner<FwdIter>
join(const std::string& delim, FwdIter start, FwdIter finish) {
  return joiner<FwdIter>(delim, start, finish);
}

template <class Container>
joiner<typename Container::const_iterator>
join(const std::string& delim, const Container& seq) {
  // Take the container by reference so that the iterators obtained from
  // begin() and end() remain valid after join() returns.
  typedef typename Container::const_iterator FwdIter;
  return joiner<FwdIter>(delim, seq.begin(), seq.end());
}
Now we can use the following code to write out a comma-separated sequence and let the compiler infer the types for us.
std::cout << join(",", seq.begin(), seq.end())
          << std::endl;
or even more compactly
std::cout << join(",", seq) << std::endl;
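Putting it all together, a complete little program using the manipulator could look like this minimal sketch (it assumes that the joiner class and the join() helpers above are available in the same file or through a header):

#include <iostream>
#include <vector>

int main()
{
  std::vector<int> seq;
  for (int i = 1; i <= 5; ++i)
    seq.push_back(i * i);

  // Prints "1,4,9,16,25"
  std::cout << join(",", seq) << std::endl;
  return 0;
}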
Update: There was a bug that caused the write() function above to try to read the first element of an empty sequence. The check needed to handle empty sequences has been added to the code above.