Distributed System

Posted on 2018-08-19

Introduction

  • Implementation
    RPC, threads, concurrency control.

  • Performance
    希望有N太机器,可以提高N倍的Throughput.

    • 问题:
      load imbalance, stragglers
      Non-parallelizable code: initialization, interaction.
      Bottlenecks from shared resources, e.g. network.
  • Fault tolerance

  • Consistency

    • Achieving good behavior is hard!
      “Replica” servers are hard to keep identical.
      Clients may crash midway through multi-step update.
      Servers crash at awkward moments, e.g. after executing but before replying.
      Network may make live servers look dead; risk of “split brain”.
    • Consistency and performance are enemies.
      Consistency requires communication, e.g. to get latest Put().
      “Strong consistency” often leads to slow systems.
      High performance often imposes “weak consistency” on applications.
  • Example: Map/Reduce

    • How to reduce effect of slow network.
      Map input is read from GFS replica on local disk, not over network.
      Intermediate data goes over network just once.
      Map worker writes to local disk, not GFS.
      Intermediate data partitioned into files holding many keys.
      Q: Why not stream the records to the reducer (via TCP) as they are being
      produced by the mappers?
      
    • Load Balance
      Duplicate to deal with swagger

L2: Infrastructure: RPC and threads

Why Go?

6.824 used C++ for many years

  1. C++ worked out well
    but students spent time tracking down pointer and alloc/free bugs and there’s no very satisfactory C++ RPC package
  2. Go is a bit better than C++ for us
    good support for concurrency (goroutines, channels, &c)
    good support for RPC
    garbage-collected (no use after freeing problems)
    type safe: 例如C/C++中的越界访问
    threads + GC is particularly attractive!

Threads

threads are a useful structuring tool
Go calls them goroutines; everyone else calls them threads
they can be tricky

Why threads?

They express concurrency, which shows up naturally in distributed systems

  1. I/O concurrency:
    While waiting for a response from another server, process next request
  2. Multicore:
    Threads run in parallel on several cores

Thread = “thread of execution”

threads allow one program to (logically) execute many things at once
the threads share memory
each thread includes some per-thread state:

program counter, registers, stack

How many threads in a program?

  1. Sometimes driven by structure
    e.g. one thread per client, one for background tasks
    
  2. Sometimes driven by desire for multi-core parallelism
    so one active thread per core
    

the Go runtime automatically schedules runnable goroutines on available cores

Sometimes driven by desire for I/O concurrency
the number is determined by latency and capacity
keep increasing until throughput stops growing
Go threads are pretty cheap
100s or 1000s are fine, but maybe not millions
Creating a thread is more expensive than a method call

Threading challenges:

  1. sharing data
    one thread reads data that another thread is changing?
    e.g. two threads do count = count + 1
    this is a “race” – and is usually a bug
    -> use Mutexes (or other synchronization)
    -> or avoid sharing
  2. coordination between threads
    how to wait for all Map threads to finish?
    -> use Go channels or WaitGroup
  3. granularity of concurrency
    coarse-grained -> simple, but little concurrency/parallelism
    fine-grained -> more concurrency, more races and deadlocks

When to use sharing and locks, versus channels?

  1. Most problems can be solved in either style, What makes the most sense depends on how the programmer thinks

    state – sharing and locks
    communication – channels
    waiting for events – channels

Use Go’s race detectorhttps://golang.org/doc/articles/race_detector.html go test -race

Remote Procedure Call (RPC)

a key piece of distributed system machinery; all the labs use RPC
goal: easy-to-program client/server communication

RPC message diagram:

Client Server
request—>
<—response

RPC tries to mimic local fn call:

Client: //调用函数
z = fn(x, y)
Server: //定义函数
fn(x, y) {
compute
return z
}
Rarely this simple in practice…

Software structure
client app handlers
stubs dispatcher
RPC lib RPC lib
net ———— net

Go example: kv.go link on schedule page
A toy key/value storage server – Put(key,value), Get(key)->value
Uses Go’s RPC library
Common:
> You have to declare Args and Reply struct for each RPC type
Client:
> connect()’s Dial() creates a TCP connection to the server
> Call() asks the RPC library to perform the call you specify server function name, arguments, place to put reply library marshalls args, sends request, waits, unmarshally reply
return value from Call() indicates whether it got a reply
usually you’ll also have a reply.Err indicating service-level failure

Server:
Go requires you to declare an object with methods as RPC handlers
You then register that object with the RPC library, accept TCP connections, give them to RPC library

The RPC library

reads each request
creates a new goroutine for this request
unmarshalls request
calls the named method (dispatch)
marshalls reply
writes reply on TCP connection
The server’s Get() and Put() handlers
Must lock, since RPC library creates per-request goroutines, read args; modify reply

A few details:

  1. Binding: how does client know who to talk to?
    For Go’s RPC, server name/port is an argument to Dial
    Big systems have some kind of name or configuration server
  2. Marshalling: format data into packets
    Go’s RPC library can pass strings, arrays, objects, maps, &c
    Go passes pointers by copying (server can’t directly use client pointer)
    Cannot pass channels or functions

RPC problem: what to do about failures? resend
e.g. lost packet, broken network, slow server, crashed server

What does a failure look like to the client RPC library?
Client never sees a response from the server
Client does not know if the server saw the request!
Maybe server never saw the request
Maybe server executed, crashed just before sending reply
Maybe server executed, but network died just before delivering reply
[diagram of lost reply]

Simplest failure-handling scheme: “best effort”
Call() waits for response for a while, If none arrives, re-send the request. Do this a few times. Then give up and return an error

Q: is “best effort” easy for applications to cope with?

A particularly bad situation:
client executes
Put(“k”, 10);
Put(“k”, 20);
both succeed
what will Get(“k”) yield?
[diagram, timeout, re-send, original arrives late]

Q: is best effort ever OK?
read-only operations
operations that do nothing if repeated
e.g. DB checks if record has already been inserted

Better RPC behavior: “at most once”
idea: server RPC code detects duplicate requests
returns previous reply instead of re-running handler
Q: how to detect a duplicate request?
client includes unique ID (XID) with each request (TCP feature)
uses same XID for re-send
server:
if seen[xid]:
r = old[xid]
else
r = handler()
old[xid] = r
seen[xid] = true

some at-most-once complexities
how to ensure XID is unique?

big random number?
combine unique client ID (ip address?) with sequence #?

  • At-most-once (go RPC的选择)
    Server端检测重复请求,如果之前处理过,则返回之前处理的结果,否则,生成结果并返回
    当采用at most once时,并且是把重复请求信息记录在内存中的,如果Server端宕机了,则记录的重复请求信息就不存在了,重启后,就无法正常工作了。
    解决方案可能是把重复记录信息持久化,为了更安全,可能需要把这些信息复制到多个Server。

  • Exactly Once
    at most once + 具有多副本容灾,并且Client一直重试到成功

GFS

paper: The Google File System

需要关注的: network performance, fault-tolerance, consistency

many other systems use GFS (e.g., Bigtable, Spanner @ Google), HDFS (Hadoop Distributed File System) based on GFS

What is consistency

  • Weak Consistency
    read() may return stale data — not the result of the most recent write
  • Strong consistency
    read() always returns the data from the most recent write()
  • General tension between these
    strong consistency is easy for application writers
    strong consistency is bad for performance
    weak consistency has good performance and is easy to scale to many servers
    weak consistency is complex to reason about

GFS goals

  • failures are common, must tolerate
  • High-performance: many concurrent readers and writers, so the Map/Reduce job read and store final result in GFS.(而不是存在temporary or intermediate files)
  • Use network efficiently: save bandwidth.

High-level design/Reads

  • Master + chunkservers
  • Master stores directories(path), files, names, open/read/write
    But not POSIS(避免处理复杂的符合POSIX语义的各种接口)
    • Master knows directory hierarchy
      for directory, what files are in it
      for file, knows chunk servers for each 64 MB
    • Master keeps state in memory
      64 bytes of metadata per each chunk
    • Master has private recoverable database for metadata
      operation log flushed to disk
      occasional asynchronous compression info checkpoint
      N.B.: != the application checkpointing in §2.7.2
      master can recovery quickly from power failure
      shadow masters that lag a little behind master, can be promoted to master(master有备份)
  • Chunk servers
    100s of Linux chunk servers with disks
    • store 64MB chunks (an ordinary Linux file for each chunk)
    • each chunk replicated on three servers

Q: Besides availability of data, what does 3x replication give us?
load balancing for reads to hot files
Affinity(读最近那个,减少网络延迟和traffic)

Q: why not just store one copy of each file on a RAID’d disk?
RAID isn’t commodity
Want fault-tolerance for whole machine/system; not just storage device

Q: why are the chunks so big?
amortizes overheads, reduces state size in the master(减少master开销(directory, path, more chunks))

  • Client

    • Client Read
      • Send file name and chunk index to master
      • master replies with set of servers that have that chunks
      • response includes version # of chunk
      • client cache that information
      • client ask nearest chunk server
      • check version, if version is wrong, re-contact master
    • Random client write to existing file

      • client asks master for chunk locations + primary(vs slave)
      • master responds with chunk servers(may involve serveral chunks), version #, and who is primary(primary是其中一个replica)
      • primary has(or gets) 60s lease
      • client computes chain of replicas based on network topology(一个文件,involve多个chunk)
      • client sends data to first replica, which forwards to others
      • pipelines network use, distributes load
      • replicas ack data receipt
      • client tells primary to write
      • primary assign sequence number and writes, then tells other replicas to write once all done, ack to client.

      What if there’s another concurrent client writing to the same place?

GFS

组件: A single master and serveral Chunkservers.

组件

  • Chunk
    Files divided into fixed-size chunks. Each chunk is identified by an immutable and globally unique 64 bit chunk handle, assigned by the master at the time of chunk creation. Chunkservers store chunks on local disks as Linux files and read/write chunk data specified by a chunk handle and byte range.
  • Master
    The master maintains all file system metadata, includes namespace, access control information, the mapping from files to chunks and current location of chunks.
    The master send heartbeat to each chunkserver to give it instructions and collect its state.
  • Metadata(store in master’s memory)
    filename->chunk(with version#)
    chunk namespace.
    chunk with all replicas location(heartbeat更新)
    前两种,会用 operation log 来persist(可以deal with master failure)

Q: client and chunkserver都不cache file data的原因?
因为, GFS处理的数据都非常大,cache没有太大必要。并且没有cache可以简化client and the overall system by eliminating cache coherence issues.但是 client cache metadata.

Operation

  • Read
    client->master, with filename and chunk index(文件很大,会involve serveral chunk)
    master->client, the corresponding chunk handle(chunk unique index), locations of the replica, then client cache it(using filename and chunk index as key)
    client->closest replica, chunk handle and a byte range within that chunk.
  • Write/Append
    master will grant lease to a replica(called primary), 来确保sequence. Lease和its extension都是根据heartbeat来捎带的。
    1. client->master, which chunkserver holds the current lease for the chunk and location for other replicas. If no one has a lease, the master grants one to a replica it chooses
    2. master->client, identity of the primary and the locations of the other replicas.
    3. Client pushes the data to primary, primary forward to secondaries, chunkserver store it in LRU cache.
    4. All replicas ack to the client, and client send a write request to the primary. The primary assigns consecutive serial numbers to all the mutations it receives(keep serialization).
    5. Primary forwards the write request to all replica. Each replica applies mutations in the same serial number order assigned by the primary. Each secondary replica applies mutations in the same serial number order assigned by the primary.
    6. Secondaries->primary, ack.
    7. Primary->client.

Primary-Backup Replication for fault tolerance

  • Fault tolerance
    available: still useable despite [some class of] failures
    strongly consistent: looks just like a single server to clients
    transparent to clients
    transparent to server software
    efficient

  • Type of Faliures

    • Fail-stop failures
    • independent failures
    • Network drops some/all packets
    • Network partition
  • Core idea: replication
    Two servers (or more), Each replica keeps state needed for the service, If one replica fails, others can continue

  • Big Questions:
    What state to replicate?
    Does primary have to wait for backup?
    When to cut over to backup?
    Are anomalies visible at cut-over?
    How to bring a replacement up to speed?

  • Two main approaches

    • State Transfer
      “Primary” executes the service, sends new state to backups.
      disadv: state maybe large, slow to transfer
    • Replicated state machine
      All replicas execute all operations. If same state state, same operations, same order, deterministic, then same state.
      adv: operation are small compared to data
      disadv: complex to get right
  • At what level to define a replicated state machine(RSM)?

    • K/V put and get
      “application-level” RSM; usually requires server and client modifications can be efficient; primary only sends high-level operations to backup
    • x86 instructions
      might allow us to replicate any existing server w/o modification; but requires much more detailed primary/backup synchronization; and we have to deal with interrupts, DMA, weird x86 instructions
  • RSM Example: vmware vm
    os and app is guest, vm have primary and backup, share disk. primary sends all inputs to backup over logging channel.
    initial state: memory, disk files.
    same instruction, same inputs -> same execution

    • Divergence
      Inputs from external world (the network).
      Data read from storage server.
      Timing of interrupts.
      Instructions that aren’t pure functions of state, such as cycle counter.
      Races.
  • Example of divergence

  1. “if primary fails, clients will see inconsistent story from backup.”
    Lock server grants lock to client C1, rejects later request from C2.
    Primary and backup had better agree on input order!
    Otherwise, primary fails, backup now tells clients that C2 holds the lock.
    Lock server revokes lock after one minute.
    Suppose C1 holds the lock, and the minute is almost exactly up.
    C2 requests the lock.
    Primary might see C2’s request just before timer interrupt, reject.
    Backup might see C2’s request just after timer interrupt, grant.
    So: backup must see same events, in same order, at same point in instruction stream.

Raft - Distributed Consensus

Abstract

  • fault-tolerant services using replicated state machines:
    • 应用:
      • configuration server(MapReduce, GFS master)
      • k/v storage server, put()/get()
  • Architecture
    • client发给K/V层的leader, put/get/append的命令
    • K/V层forward给raft层, via AppendEntries RPCs
    • raft层每个replica appends the entry to its local log(haven’t commit yet).
    • entry becomes “committed” at the leader if a majority put it in their logs.
    • leader respond to K/V layer after it has committed.
    • K/V layer applies Put to DB
    • Leader replies to client execution result.

Leader election

  • 为什么有leader: 保证每个replica执行相同的command.

  • Term: 用来给leader的编号
    new leader->new term, 每个term最多有一个leader或没有。
    term也可以用来追踪最新的leader.

  • 什么时候开始leader election
    如果replica don’t hear from(heartbeat/AppendEntries) current leader for an “election timeout”
    Followers increment local currentTerm, become candidates, start election.
    NOTE: 此时election不一定是必须的,that’s slow but safe
    NOTE: old leader may still be alive and think it is the leader

  • what happens when a server becomes candidate?
    3中可能的情况

    • Get majority, converts to leader(但是在byzantine情况下可能有风险)
    • Fails to get a majority, hears from another leader(接受到了AppendEntries RPC)
      Becomes follower
    • Fails to get a majority, but doesn’t hear from a new Leader. 增加term, 重新elect.
      eg., if in minority network partition.
  • 什么来保证只有一个term最多只有一个leader

    • leader must get “yes” votes from a majority of servers
    • each server can cast only one vote per term
  • 如何通知有新的leader

    • 新的leader immediately sends AppendEntries RPC (heart-beats) to everybody
  • how to set the election timeout?

    • each server picks a random election timeout
      至少大于一个heartbeart的interval.
      好处: helps avoid split votes(避免总是很多竞争者,大家都得不到majority),这样总会有某些server term很高

Raft log

  • 两种类型的log: Replicated vs. Committed entries.
    • Committed entries: 已经可以保证存在了大多数的replica中,保证了一致性。可以执行log并存入DB中。
    • Replicated
  • 尽管可能会丢失一部分log, this is okay, because the leader only responds to clients after entries commit.

persistence

  • When a server crashes, Raft can continue with one missing server, but we must repair soon to avoid dipping below a majority.
    two strategies:

    • replace with a fresh (empty) server
      requires transfer of entire log (or snapshot) to new server (slow), we must support this, in case failure is permanent
    • reboot crashed server, re-join with state intact, catch up requires state that persists across crashes, we must support this, for simultaneous power failure
  • 为了re-boot的server可以尽快加入,需要将”persistent state”(log[], currentTerm, votedFor)存入non-volatile storage(disk, SSD, battery-backed RAM), save after each change, before sending any RPC or RPC reply.

    • why log[]?
      if a server was in leader’s majority for committing an entry,
      must remember entry despite reboot, so any future leader is
      guaranteed to see the committed log entry
    • why votedFor?
      to prevent a client from voting for one candidate, then reboot,
      then vote for a different candidate in the same (or older!) term
      could lead to two leaders for the same term
    • why currentTerm?
      to ensure that term numbers only increase
      to detect RPCs from stale leaders and candidates
  • Volatile raft state:
    commitIndex, lastApplied, next/matchIndex[]
    Raft’s algorithms reconstruct them from initial values

  • persistence is often the bottleneck for performance:

    • a hard disk write takes 10 ms, SSD write takes 0.1 ms, 所以这个操作会limits us to 100 to 10,000 ops/second
    • tricks to cope with it: batch many new log entries per disk write; persist to battery-backed RAM, not disk
  • how does service (e.g. k/v server) recover its state after a crash+reboot?
    start with empty state, re-play Raft’s entire persisted log. But re-play will be too slow for a long-lived system
    faster: use Raft snapshot and replay just the tail of the log

//todo: remain to complete after the assignments

log compaction and Snapshots

if log too long, will be much larger than state-machine state! will take a long time to re-play on reboot or send to a new server

  • service periodically creates persistent “snapshot”
    service writes snapshot to persistent storage (disk)
    service tells Raft it is snapshotted through some log index
    Raft discards log before that index
  • a server’s on-disk state consists of:
    service’s snapshot up to a certain log entry
    Raft’s persisted log w/ following log entries
    the combination is equivalent to the full log

//todo: remain to complete after the assignments

configuration change

configuration means set of servers-> move to a new set of servers, or increase/decrease the number of servers.
suppose each server has the list of servers in the current config, change configuration by telling each server the new list

performance

最大的开销:
Disk writes for persistence.
Message/packet/RPC overhead.
Need to execute logged commands sequentially.
Fast path for read-only operations.

Raft sacrifice performance for simplicity:
Follower rejects out-of-order AppendEntries RPCs. Rather than saving for use after hole is filled. Maybe important if network re-orders packets a lot.
No provision for batching or pipelining AppendEntries.
Snapshotting is wasteful for big states.
A slow leader may hurt Raft, e.g. in geo-replication.

更加注重性能的系统:
Zookeeper/ZAB; Paxos Made Live; Harp

Snapshots

log太长,client需要的只是server state. 我们仅仅需要保留:
un-executed entries – not yet reflected in the state
un-committed entries – might be part of leader’s majority

Solution: service periodically creates persistent “snapshot”
什么是service state: eg, k/v table.

  • service writes snapshot to persistent storage (disk)
  • service tells Raft it is snapshotted through some log index
  • Raft discards log before that index

what happens on crash+restart?
service reads snapshot from disk
Raft reads persisted log from disk
Raft log may start before the snapshot (but definitely not after)
Raft will (re-)send committed entries on the applyCh
since applyIndex starts at zero after reboot
service will see repeats, must detect repeated index, ignore

what if follower’s log ends before leader’s log starts?
nextIndex[i] will back up to start of leader’s log
so leader can’t repair that follower with AppendEntries RPCs
thus the InstallSnapshot RPC

Duplicate Detection

可能因为丢包,导致收到了两个重复的rpc, 于是if already executed, client still needs the result.
server->(rpc) client

client picks an ID for each request, sends in RPC, same ID in re-sends of same RPC.
k/v service maintains table indexed by ID.

问题:

  • 我们什么时候可以delete table entries.
  • if new leader takes over, how does it get the duplicate table?
  • if server crashes, how does it restore its table?

解决办法(sliding window的思路):
one table entry per client, rather than one per RPC
each client has only one RPC outstanding at a time
each client numbers RPCs sequentially
when server receives client RPC #10,
it can forget about client’s lower entries
since this means client won’t ever re-send older RPCs

More detail:
each client needs a unique client ID – perhaps a 64-bit random number
client sends client ID and seq # in every RPC
repeats seq # if it re-sends
duplicate table in k/v service indexed by client ID
contains just seq #, and value if already executed
RPC handler first checks table, only Start()s if seq # > table entry
each log entry must include client ID, seq #
when operation appears on applyCh
update the seq # and value in the client’s table entry
wake up the waiting RPC handler (if any)

Spinnaker

Spinnaker为了加速read, 选择牺牲了linearizability.
Any replica can reply to a read, allowing read load to be parallelized