Clients may also delegate the task of garbage collection directly to a stream replica.

Reconfiguration is used extensively in Corfu to replace failed drives, to add capacity to the system, and to add, remove, or relocate replicas. This makes reconfiguration latency a crucial metric for our system. Recall that reconfiguration latency has two components: sealing the current configuration, which contacts a subset of the cluster, and writing the new configuration to the auxiliary. In our experiments, we conservatively seal all drives to provide an upper bound on reconfiguration time; in practice, only a subset needs to be sealed. Our auxiliary is implemented as a networked file share.

We now demonstrate the performance of Corfu-Store for atomic multi-key operations. Figure 3.19 shows the performance of multi-put operations in Corfu-Store. On the x-axis, we vary the number of keys updated atomically in each multi-put operation. The bars in the graph plot the number of multi-puts executed per second. The line plots the total number of Corfu log appends resulting from the multi-put operations; a multi-put involving k keys generates k + 1 log appends, one for each updated key and a final append for the commit record.

A client appending to a materialized stream first obtains the current layout and makes a request to the sequencer with a stream id. The sequencer returns both a log token, which is a pointer to the next address in the global log, and a stream token, which is a pointer to the next address in the stream. Using these tokens and the layout, the client determines the set of replicas to write to. In contrast to traditional designs, replica sets in vCorfu are dynamically arranged during appends. For fault tolerance, each entry is replicated on two replica types: the first indexed by the address in the global log, and the second by the combination of the stream id and the stream address. To perform a write, the client writes to the log replica first, then to the stream replica.
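To make the token-and-layout flow concrete, the sketch below walks through a single-stream append up to the point of writing the two replica types. The Sequencer, Layout, and Replica interfaces and their method names are assumptions made for this illustration, not the actual vCorfu client API.

import java.util.UUID;

// Illustrative sketch only: interface and method names are assumed, not vCorfu's real API.
public class MaterializedStreamAppend {

    interface Sequencer {
        Tokens next(UUID streamId);                 // hands out the next log token and stream token
    }

    interface Layout {
        Replica logReplica(long logAddress);                       // replica indexed by global log address
        Replica streamReplica(UUID streamId, long streamAddress);  // replica indexed by (stream id, stream address)
    }

    interface Replica {
        void write(long address, byte[] payload);
    }

    static final class Tokens {
        final long logToken;      // pointer to the next address in the global log
        final long streamToken;   // pointer to the next address in the stream
        Tokens(long logToken, long streamToken) { this.logToken = logToken; this.streamToken = streamToken; }
    }

    // One append: obtain tokens, resolve the two replica types from the layout,
    // then write to the log replica first and the stream replica second.
    static void append(Sequencer sequencer, Layout layout, UUID streamId, byte[] payload) {
        Tokens t = sequencer.next(streamId);
        Replica logReplica = layout.logReplica(t.logToken);
        Replica streamReplica = layout.streamReplica(streamId, t.streamToken);

        logReplica.write(t.logToken, payload);        // copy indexed by global log address
        streamReplica.write(t.streamToken, payload);  // copy indexed by (stream id, stream address)
    }
}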

If a replica previously accepted a write to a given address, the write is rejected and the client must retry with a new log token. Once the client has written to both replicas, it commits the write by broadcasting a commit message to each replica it accessed. Replicas will only serve reads for committed data. The write path of a client, which takes four round trips in normal operation, is shown in Figure 4.5. A server-driven variant, in which the log replica writes to the stream replica, takes six messages; we leave the implementation of this variant for future work.

The primary benefit of materialized streams is that they provide an abstraction of independent logs while maintaining a total global order over all appends. This enables vCorfu to support atomic writes across streams, which form the basic building block for supporting transactions. To append to multiple streams atomically, the client obtains a log token and stream tokens for each materialized stream it wishes to append to. The client first writes to the log replica using the log token. Then, the client writes to the stream replica of each stream. Finally, the client sends a commit message to each participating replica. The resulting write is ordered in the log by a single log token, but by multiple stream tokens.

Materialized streams are a first-class abstraction in vCorfu, unlike streams in Tango, which are merely tags within a shared log. Materialized streams strike a balance that combines the global consistency advantages of shared logs with the locality advantages of distributed data platforms. Specifically, the following properties enable vCorfu materialized streams to effectively support state machine replication at scale:

The global log is a single source of scalability, consistency, durability, and history. One may wonder why we have log replicas at all if all we care to read from are materialized streams. First, the global log provides a convenient, scalable mechanism to obtain a consistent snapshot of the entire system. This can be used to execute long-running read-only transactions, a key part of many analytics workloads, or a backup utility could constantly scan the log and move it to cold storage. Second, the log provides us with a unique level of fault tolerance: even if all the stream replicas were to fail, vCorfu can fall back to using only the log replicas, continuing to service requests.
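Returning to the atomic multi-stream append described above, the following sketch uses hypothetical interfaces in the same spirit as the previous one, now with the Replica interface reporting rejection and accepting a commit: the client obtains one log token plus a stream token per stream, writes to the log replica and then to each stream replica, and finally broadcasts a commit; a rejected write triggers a retry with fresh tokens. All type and method names are assumptions for illustration.

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Illustrative sketch only: types and method names are assumed for this example.
public class MultiStreamAppend {

    interface Sequencer {
        MultiTokens next(List<UUID> streamIds);       // one log token plus one stream token per stream
    }

    interface Layout {
        Replica logReplica(long logAddress);
        Replica streamReplica(UUID streamId, long streamAddress);
    }

    interface Replica {
        boolean write(long address, byte[] payload);  // false if this address already accepted a write
        void commit(long address);                    // replicas only serve reads for committed data
    }

    static final class MultiTokens {
        final long logToken;
        final List<Long> streamTokens;                // parallel to the list of stream ids
        MultiTokens(long logToken, List<Long> streamTokens) {
            this.logToken = logToken; this.streamTokens = streamTokens;
        }
    }

    static void appendAtomically(Sequencer sequencer, Layout layout,
                                 List<UUID> streamIds, byte[] payload) {
        retry:
        while (true) {
            MultiTokens t = sequencer.next(streamIds);
            List<Replica> participants = new ArrayList<>();
            List<Long> addresses = new ArrayList<>();

            // 1. Write to the log replica using the single log token.
            Replica logReplica = layout.logReplica(t.logToken);
            if (!logReplica.write(t.logToken, payload)) continue;     // rejected: retry with new tokens
            participants.add(logReplica);
            addresses.add(t.logToken);

            // 2. Write to the stream replica of each participating stream.
            for (int i = 0; i < streamIds.size(); i++) {
                Replica r = layout.streamReplica(streamIds.get(i), t.streamTokens.get(i));
                if (!r.write(t.streamTokens.get(i), payload)) continue retry;
                participants.add(r);
                addresses.add(t.streamTokens.get(i));
            }

            // 3. Commit: broadcast a commit message to every replica we touched.
            for (int i = 0; i < participants.size(); i++) {
                participants.get(i).commit(addresses.get(i));
            }
            return;
        }
    }
}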

Materialized streams are true virtual logs, unlike Tango streams, which merely enable clients to selectively consume a set of updates in a shared log. Clients read sequentially from Tango streams using a readNext call, which returns the next entry in the stream. Tango clients cannot read from an arbitrary position in a stream because streams are implemented using a technique called backpointers: each entry in a stream points to the previous entry, forcing sequential traversal. Materializing the stream removes this restriction: since clients have access to a replica which contains all the updates for a given stream, clients can perform all the functions they would call on a log, including a random read given a stream address or a bulk read of an entire stream. This support is essential if clients read randomly from different streams, as backpointers would require reading each stream from the tail in order.

vCorfu avoids backpointers, which pose performance, concurrency, and recovery issues. Backpointers can result in performance degradation when concurrent clients are writing to the log and a timeout occurs, causing a hole-filling protocol to be invoked. Since holes have no backpointers, timeouts force a linear scan of the log, with a cost proportional to the number of streams in the log. Tango mitigates this problem by keeping the number of streams low and storing multiple backpointers, which has significant overhead because both the log and the sequencer must store these backpointers. Furthermore, backpointers significantly complicate recovery: if the sequencer fails, the entire log must be read to determine the most recent writes to each stream. vCorfu instead relies on stream replicas, which contain a complete copy of the updates for each stream, resorting to a single backpointer only when stream replicas fail. Sequencer recovery is fast, since stream replicas can be queried for the most recent update.
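To make the contrast in read models concrete, the sketch below places a Tango-style sequential readNext next to the random and bulk reads that a materialized stream replica can serve directly. Both interfaces and their method names are hypothetical, used only for illustration.

import java.util.List;

// Hypothetical read interfaces, for illustration only.
public class StreamReads {

    // Tango-style stream: backpointers force sequential consumption.
    interface BackpointerStream {
        byte[] readNext();                            // next entry, resolved by following backpointers
    }

    // vCorfu materialized stream: the stream replica holds every update for the stream,
    // so it can serve random and bulk reads like a stand-alone log.
    interface MaterializedStreamView {
        byte[] read(long streamAddress);              // random read at a stream address
        List<byte[]> readRange(long from, long to);   // bulk read of a contiguous range
    }

    static void example(BackpointerStream tangoStream, MaterializedStreamView vcorfuStream) {
        // Tango: must walk entries in order.
        byte[] next = tangoStream.readNext();

        // vCorfu: jump straight to stream address 42, or fetch the whole prefix at once.
        byte[] entry = vcorfuStream.read(42);
        List<byte[]> prefix = vcorfuStream.readRange(0, 42);
    }
}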

Stream replicas may handle playback and directly serve requests. In most shared log designs, clients must consume updates, which are distributed and sharded for performance. The log itself cannot directly serve requests because no single storage unit for the log contains all the updates necessary to service a request. Stream replicas in vCorfu, however, contain all the updates for a particular stream, so a stream replica can play back updates locally and directly service requests from clients, a departure from the traditional client-driven shared log paradigm. This removes the burden of playback from clients and avoids the playback bottleneck of previous shared log designs.

Garbage collection is greatly simplified. In Tango, clients cannot trim streams directly. Instead, they must read the stream to determine which log addresses should be released and issue trim calls for each log address, which can be a costly operation if many entries are to be released. In vCorfu, clients issue trim commands to stream replicas, which release storage locally and issue trim commands to the global log.

While the Corfu log provides a scalable fabric for consistency, programmers need more than just a shared log abstraction to write reliable distributed programs. This chapter introduces the rich, object-oriented data services provided by Corfu. Instead of forcing programmers to produce and consume entries on the log, the distributed objects provided by Corfu enable applications to use the log by interacting with in-memory objects similar to common in-memory data structures used today. In addition to supporting in-memory objects, Corfu also supports transactions, which enable applications to access and modify multiple objects consistently. Finally, materialized streams enable local playback of the log, allowing thousands of clients to consume the log simultaneously.

A Corfu application is typically a service running in a cloud environment as part of a larger distributed system, managing metadata such as indices, name spaces, membership, locks, or resource lists. Application code executes on clients and manipulates data stored in Corfu objects, typically in response to networked requests from machines belonging to other services and subsystems. The local view of the object on each client interacts with a Corfu runtime, which in turn provides persistence and consistency by issuing appends and reads to an underlying shared log. Importantly, Corfu runtimes on different machines do not communicate with each other directly through message passing; all interaction occurs via the shared log. Applications can use a standard set of objects provided by Corfu, providing interfaces similar to the Java Collections library or the C++ STL; alternatively, application developers can roll their own Corfu objects.

The code for the Corfu object itself has three main components. First, it contains the view, which is an in-memory representation of the object in some form, such as a list or a map; in the example of a Corfu Register shown in Figure 5.1, this state is a single integer. Second, it implements the mandatory apply upcall, which changes the view when the Corfu runtime calls it with new entries from the log.
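As a concrete skeleton of these first two components, the fragment below shows the view and the apply upcall for a register in the spirit of Figure 5.1. This is only a sketch; the apply signature and the record encoding are assumptions, not the exact interface used by the Corfu runtime.

// Sketch of the first two components of a Corfu object (cf. Figure 5.1).
// The apply signature is assumed for illustration; the real upcall interface may differ.
public class CorfuRegisterSkeleton {

    private int value;   // the view: an in-memory representation of the object's state

    // Mandatory apply upcall: the Corfu runtime invokes this with each new log entry
    // so that the view reflects every update appended to the shared log.
    public void apply(byte[] updateRecord) {
        this.value = java.nio.ByteBuffer.wrap(updateRecord).getInt();
    }
}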

The view must be modified only by the Corfu runtime via this apply upcall, and not by application threads executing arbitrary methods of the object. Finally, each object exposes an external interface consisting of object-specific mutator and accessor methods; for example, a CorfuMap might expose get/put methods, while the Corfu Register in Figure 5.1 exposes read/write methods. The object's mutators do not directly change the in-memory state of the object, nor do the accessors immediately read its state. Instead, each mutator coalesces its parameters into an opaque buffer (an update record) and calls the update_helper function of the Corfu runtime, which appends it to the shared log. Each accessor first calls query_helper before returning an arbitrary function over the state of the object; within the Corfu runtime, query_helper plays new update records in the shared log up to its current tail and applies them to the object via the apply upcall before returning.

Based on our description thus far, a Corfu object is indistinguishable from a conventional SMR object. As in SMR, different views of the object derive consistency by funneling all updates through a total ordering engine. As in SMR, strongly consistent accessors are implemented by first placing a marker at the current position in the total order and then ensuring that the view has seen all updates up to that marker. In conventional SMR this is usually done by injecting a read operation into the total order, or by directing the read request through the leader; in our case we leverage the check function of the log. Accordingly, a Corfu object with multiple views on different machines provides linearizable semantics for invocations of its mutators and accessors. A Corfu object is trivially persistent; the state of the object can be reconstructed by simply creating a new instance and calling query_helper on Corfu.

A more subtle point is that the in-memory data structure of the object can contain pointers to values stored in the shared log, effectively turning the data structure into an index over log-structured storage. To facilitate this, each Corfu object is given direct, read-only access to its underlying shared log, and the apply upcall optionally provides the offset in the log of the new update. For example, a CorfuMap can update its internal hash-map with the offset rather than the value on each apply upcall; on a subsequent get, it can consult the hash-map to locate the offset and then directly issue a random read to the shared log. Since all updates are stored in the shared log, the state of the object can be rolled back to any point in its history simply by creating a new instance and syncing with the appropriate prefix of the log. To enable this, the Corfu query_helper interface takes an optional parameter that specifies the offset at which to stop syncing.
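To illustrate both the mutator/accessor pattern and the offset-based indexing described above, here is a minimal sketch of a CorfuMap whose view maps keys to log offsets. The CorfuRuntime interface, the apply signature, and the record format shown here are assumptions made for this example, not the actual Corfu API.

import java.util.HashMap;
import java.util.Map;

// Hedged sketch: the runtime interface, apply signature, and record format are assumptions.
public class CorfuMap {

    interface CorfuRuntime {
        void update_helper(CorfuMap obj, Record update);  // appends the update record to the shared log
        void query_helper(CorfuMap obj);                  // plays new log entries via apply() up to the tail
        Record readEntry(long offset);                    // direct, read-only random read of the shared log
    }

    static final class Record {                           // an opaque update record: (key, value)
        final String key;
        final byte[] value;
        Record(String key, byte[] value) { this.key = key; this.value = value; }
    }

    private final CorfuRuntime runtime;
    private final Map<String, Long> index = new HashMap<>();  // view: key -> log offset, not key -> value

    public CorfuMap(CorfuRuntime runtime) { this.runtime = runtime; }

    // Apply upcall: record only the offset of the update, turning the view into an index
    // over log-structured storage rather than a copy of the data.
    public void apply(long offset, Record update) {
        index.put(update.key, offset);
    }

    // Mutator: coalesce the parameters into an update record and hand it to the runtime.
    public void put(String key, byte[] value) {
        runtime.update_helper(this, new Record(key, value));
    }

    // Accessor: sync the view to the log tail, then read the value directly from the log.
    public byte[] get(String key) {
        runtime.query_helper(this);
        Long offset = index.get(key);
        return offset == null ? null : runtime.readEntry(offset).value;
    }
}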