Zookeeper Overview

Eddie Dugan edited this page Oct 26, 2017 · 20 revisions

Design Principles

  • All metadata is stored in ZooKeeper, and all communication between NameNodes and DataNodes passes through it. NameNodes and DataNodes therefore never talk to each other directly.
  • The NameNode doesn’t need to store any metadata to be functional.
  • Tasks that require timeouts (heartbeats, waiting for BLOCK_RECEIVED acknowledgements) assume that the clocks on all nodes are in sync.
  • Some tasks require a NameNode to iterate over an entire directory of ZNodes. This is slow, but these operations occur only on small directories or infrequently.

NameNode

  • NameNodes only need to communicate with DataNodes to tell them to delete blocks or replicate blocks.
    • They send these commands by putting items onto work queues in ZooKeeper's filesystem.
    • Each DataNode has its own delete queue and replicate queue in ZooKeeper.
    • NameNodes write items to the queues, and DataNodes watch their respective queues for new commands.
  • NameNode operations that require multiple ZooKeeper calls are stored in a WorkQueue.
    • If a NameNode dies while performing a multi-step task, another can take its place using this queue.
    • Implications:
      • The subtasks must not change during their execution.
      • Executing a subtask more than once does not change the correctness of the full task.
        • e.g. There may exist more replicas of a block in the system than the replication factor at a given time. These COULD be cleaned up periodically by daemon NameNode processes (currently not implemented).
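
The idempotence requirement above can be illustrated with a minimal sketch. It assumes a plain dict standing in for ZooKeeper and a hypothetical run_task helper; neither is part of the project's API. Because every subtask is a "set this path to this value" step, a second NameNode can replay the whole queue after a crash without changing the outcome.

```python
# Hypothetical stand-in: a dict plays the role of ZooKeeper's namespace.
def run_task(subtasks, state, crash_after=None):
    """Apply subtasks in order; optionally stop early to simulate a NameNode crash."""
    for i, (path, value) in enumerate(subtasks):
        if crash_after is not None and i >= crash_after:
            return i  # index of the first subtask that was not executed
        state[path] = value  # "set" semantics, so repeating the step is harmless
    return len(subtasks)


# The subtask list itself never changes while the task is in flight.
subtasks = [
    ("/block_locations/uuid-1", {"replication_factor": 3}),
    ("/work_queues/wait_for_acks/uuid-1", {"replication_factor": 3}),
]

state = {}
run_task(subtasks, state, crash_after=1)  # first NameNode dies after one subtask
run_task(subtasks, state)                 # a second NameNode replays the full queue
```

Replaying from the start is the simplest recovery strategy and only works because each subtask is safe to execute more than once.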

DataNode

  • DataNodes communicate with the NameNodes by putting items onto a work queue and by modifying ZNodes in ZooKeeper’s “/health” directory.

ZNode Tree Structure

Each bullet point represents a root (/) level ZNode in ZooKeeper. The nested bullet points represent ZNode children.

* Represents a mutable data element.

<words-in-angled-brackets> denote a placeholder.

  • /file_system
    • <directory_name>: {* space_quota: long, * node_quota:int}
      • <file_name>: {replication_factor: int, block_size: long, * under_construction: int, file_type: int, access_time: uint64, owner: string, group: string, length: uint64}
        • <block_id_seq>: {id: UUID}
  • /health
    • <datanode_id>
      • heartbeat
      • stats
      • blocks
        • <block_uuid>
  • /block_locations
    • <block_uuid>: {file: string, replication_factor: int}
      • <datanode-id>
  • /work_queues
    • wait_for_acks
      • <block_uuid>: {timeout : long, replication_factor : int}
        • <datanode-id>
    • replicate
      • <datanode_id>
        • <block_uuid>
    • delete
      • <datanode_id>
        • <block_uuid>
    • block_reports
      • <datanode_id>
        • <block_uuid> with payload size_of_block

Figure 0: ZNode Tree Structure
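
To make the layout concrete, here is a small sketch of part of the tree built in memory. ZNodeTree is a hypothetical stand-in written for this page, not a ZooKeeper client class. It mimics two ZooKeeper behaviours: a node cannot be created without its parent, and creating an existing node fails.

```python
class ZNodeTree:
    """Hypothetical in-memory stand-in for a ZooKeeper namespace."""

    def __init__(self):
        self.nodes = {"/": None}  # path -> payload

    def create(self, path, payload=None):
        parent = path.rsplit("/", 1)[0] or "/"
        if parent not in self.nodes:
            raise KeyError(f"parent does not exist: {parent}")
        if path in self.nodes:
            raise FileExistsError(path)
        self.nodes[path] = payload

    def children(self, path):
        """Return the names of the direct children of path, sorted."""
        prefix = path.rstrip("/") + "/"
        names = (p[len(prefix):] for p in self.nodes
                 if p != path and p.startswith(prefix))
        return sorted(n for n in names if "/" not in n)


tree = ZNodeTree()
for p in ("/health", "/health/dn-1", "/health/dn-1/heartbeat",
          "/health/dn-1/stats", "/health/dn-1/blocks",
          "/block_locations", "/work_queues", "/work_queues/replicate"):
    tree.create(p)
# Example payload for a block location entry (values are made up).
tree.create("/block_locations/uuid-1", {"file": "/a.txt", "replication_factor": 3})
```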

File System Use Cases for Zookeeper

mkdir

  • No locking required.
  • NameNode calls ZKNNClient::mkdir(path, create_parent)
    • Creates a ZNode at /file_system/<path>
  • If the directory already exists, ZK will throw an exception.
  • No race conditions between mkdir and delete
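
A rough sketch of the mkdir behaviour, assuming a dict in place of ZooKeeper and Python's built-in exceptions in place of ZooKeeper's own error codes. The mkdir function below is hypothetical and only mirrors the (path, create_parent) parameters named above. It is not the actual ZKNNClient implementation.

```python
ROOT = "/file_system"


def mkdir(znodes, path, create_parent=False):
    """Create a directory ZNode under ROOT; fail if it already exists."""
    full = ROOT + "/" + path.strip("/")
    if full in znodes:
        raise FileExistsError(full)  # stand-in for ZooKeeper's "node exists" error
    parent = full.rsplit("/", 1)[0]
    if parent not in znodes:
        if not create_parent:
            raise FileNotFoundError(parent)
        # Create missing ancestors first, top-down.
        mkdir(znodes, parent[len(ROOT):], create_parent=True)
    znodes[full] = {}
    return full


znodes = {ROOT: {}}
mkdir(znodes, "logs")
mkdir(znodes, "data/2017/10", create_parent=True)
```

In a real deployment the existence check and the create are a single atomic ZooKeeper operation, which is presumably why no explicit lock is needed.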

create

  • No locking required.
  • NameNode calls ZKNNClient::create_file(path, replication_factor, block_size, create_parent)
    • ZKNNClient will create a ZNode at /file_system/<path>
  • Returns success boolean.
    • This should return the file info instead, since the client cannot proceed unless the file was created at the given location. Currently, the client must make a separate file info request afterwards.
  • If the file already exists, ZK will throw an exception.

get_block_locations

Let block_locations be a list of (block UUID, DataNode ID) tuples

for (i = offset/blocksize ... (offset + length)/blocksize) {
  UUID = the UUID stored in the payload of ZNode “/file_system/<path>/block-<i>”
  DataNode ID = child ZNode of “/block_locations/<UUID>” that is closest to the client
  Add (UUID, DataNode ID) to block_locations
}
return block_locations
  • Proximity is determined by the DataNode's "xmit" count. This is the number of in-progress transmits for a DataNode.
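
The pseudocode above could look roughly like this in runnable form. The data structures are assumptions made for the sketch: file_blocks is the ordered list of block UUIDs for one file, block_locations maps a UUID to the DataNodes holding it, and xmits maps a DataNode ID to its in-progress transmit count. The sketch also stops at the block containing the last requested byte, which is slightly stricter than the loop bound written above.

```python
def get_block_locations(file_blocks, block_locations, xmits,
                        offset, length, block_size):
    """Return (block UUID, DataNode ID) tuples covering [offset, offset + length)."""
    first = offset // block_size
    last = (offset + length - 1) // block_size  # block holding the final byte
    last = min(last, len(file_blocks) - 1)      # never read past the end of the file
    result = []
    for i in range(first, last + 1):
        block_uuid = file_blocks[i]
        replicas = block_locations[block_uuid]
        # "Closest" here means the replica with the fewest in-progress transmits.
        best = min(replicas, key=lambda dn: xmits.get(dn, 0))
        result.append((block_uuid, best))
    return result


file_blocks = ["a", "b", "c"]
block_locations = {"a": ["dn1", "dn2"], "b": ["dn2"], "c": ["dn1", "dn3"]}
xmits = {"dn1": 5, "dn2": 1, "dn3": 0}
locations = get_block_locations(file_blocks, block_locations, xmits,
                                offset=50, length=100, block_size=100)
```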

add_block

  • No locking required.
  • NameNode calls ZKNNClient::add_block(path, previousBlock)
    • Checks that the file is under construction
    • Checks last block was successfully ack’d (check /work_queues/wait_for_acks/<block_uuid>)
    • Adds a sequential child ZNode to the path’s ZNode at /file_system/<path>/block-<#>
      • Generates a UUID and sets it as the new ZNode’s payload
    • Creates a new ZNode at /block_locations/<UUID>
    • Chooses REPL_FACTOR target DataNodes to replicate the new block onto
    • Creates a ZNode at /work_queues/wait_for_acks/<block_uuid> with payload REPL_FACTOR
    • Returns UUID of new block, DataNodes on which to replicate the new block
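
A simplified sketch of the add_block steps, again with a dict standing in for ZooKeeper. The function signature, the target selection (first REPL_FACTOR DataNodes in the list) and the omission of the "previous block was ack'd" check are all assumptions made to keep the example short. The ten-digit zero-padded suffix imitates how ZooKeeper names sequential nodes.

```python
import uuid


def add_block(zk, path, datanodes, repl_factor):
    """Register a new block for a file and return (block UUID, target DataNodes)."""
    file_path = "/file_system/" + path.strip("/")
    if not zk[file_path]["under_construction"]:
        raise ValueError("file is not under construction")
    # Sequential child ZNode: block-<#>
    seq = sum(1 for p in zk if p.startswith(file_path + "/block-"))
    block_uuid = str(uuid.uuid4())
    zk[f"{file_path}/block-{seq:010d}"] = {"id": block_uuid}
    zk[f"/block_locations/{block_uuid}"] = {
        "file": file_path, "replication_factor": repl_factor}
    targets = datanodes[:repl_factor]  # placeholder for real target selection
    zk[f"/work_queues/wait_for_acks/{block_uuid}"] = {
        "replication_factor": repl_factor}
    return block_uuid, targets


zk = {"/file_system/a.txt": {"under_construction": 1}}
new_uuid, targets = add_block(zk, "a.txt", ["dn1", "dn2", "dn3", "dn4"], 3)
```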

delete

  • No locking required.
  • NameNode calls ZKNNClient::delete(path, recursive)
    • Check that the file is not under construction
    • Assumes path is to a file
    • Get all the Block UUIDs for the file
    • Multi-transaction:
      • For each UUID:
        • For each child ZNode <datanode_id> in /block_locations/<UUID>
          • Create a new sequential ZNode at /work_queues/delete/<datanode_id>/<block_id_seq> with payload UUID
      • Delete the ZNode corresponding to the path.
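
The delete flow can be sketched as "collect all operations, then apply them as one batch", which is the role of the multi-transaction. The sketch assumes a dict in place of ZooKeeper and a hypothetical delete_file function. For brevity it names each delete-queue entry after the block UUID, as in the ZNode tree above, rather than using a sequential name with the UUID as payload.

```python
def delete_file(zk, path):
    """Queue block deletions on every DataNode holding a replica, then remove the file."""
    file_path = "/file_system/" + path.strip("/")
    if zk[file_path]["under_construction"]:
        raise ValueError("file is under construction")
    ops = []  # (operation, path) pairs, applied together at the end
    for block_path in sorted(p for p in zk if p.startswith(file_path + "/block-")):
        block_uuid = zk[block_path]["id"]
        prefix = f"/block_locations/{block_uuid}/"
        for loc in sorted(p for p in zk if p.startswith(prefix)):
            datanode_id = loc[len(prefix):]
            ops.append(("create", f"/work_queues/delete/{datanode_id}/{block_uuid}"))
        ops.append(("delete", block_path))  # children must go before the parent
    ops.append(("delete", file_path))
    # Stand-in for a ZooKeeper multi-transaction: nothing is applied until here.
    for op, p in ops:
        if op == "create":
            zk[p] = None
        else:
            del zk[p]
    return ops


zk = {
    "/file_system/a.txt": {"under_construction": 0},
    "/file_system/a.txt/block-0000000000": {"id": "u1"},
    "/block_locations/u1": {"file": "/a.txt", "replication_factor": 2},
    "/block_locations/u1/dn1": None,
    "/block_locations/u1/dn2": None,
}
applied = delete_file(zk, "a.txt")
```

The /block_locations entries are left in place here because, per the DataNode use cases, each DataNode removes its own entry after deleting the block.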

close

  • No locking required.
  • NameNode calls ZKNNClient::close(path)
    • Check that the file IS under construction
    • For each block UUID in the file:
      • If UUID is in /work_queues/wait_for_acks and /work_queues/wait_for_acks/<block_uuid> has fewer than MIN_REPL_FACTOR children, return false
    • Return true
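
As a sketch, the close check reduces to counting acknowledgement children for every block that is still waiting for acks. The dict-based store and the close_file signature below are assumptions for illustration only.

```python
def close_file(zk, path, min_repl_factor):
    """Return True if every block of the file has enough acknowledgements."""
    file_path = "/file_system/" + path.strip("/")
    if not zk[file_path]["under_construction"]:
        raise ValueError("file is not under construction")
    for block_path in sorted(p for p in zk if p.startswith(file_path + "/block-")):
        ack_path = "/work_queues/wait_for_acks/" + zk[block_path]["id"]
        if ack_path in zk:
            # Each child of the ack ZNode is one DataNode that confirmed the block.
            acks = sum(1 for p in zk if p.startswith(ack_path + "/"))
            if acks < min_repl_factor:
                return False
    return True


zk = {
    "/file_system/a.txt": {"under_construction": 1},
    "/file_system/a.txt/block-0000000000": {"id": "u1"},
    "/file_system/a.txt/block-0000000001": {"id": "u2"},
    "/work_queues/wait_for_acks/u1": {"replication_factor": 3},
    "/work_queues/wait_for_acks/u1/dn1": None,
    "/work_queues/wait_for_acks/u1/dn2": None,
}
can_close = close_file(zk, "a.txt", min_repl_factor=2)
```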

Block Management Use Cases for Zookeeper

ReportCorruptBlock

  • No locking required.
  • Looks up current DataNodes for a block
  • Multi-transaction:
    • Place BlockUUID in the delete queue of each DataNode that holds a corrupt replica
    • Place BlockUUID in replicate queue for a DataNode that does not currently have a replica of the block.

WaitForAcks Clean Up

  • A NameNode needs to ensure that a block has been properly replicated after a client calls AddBlock. If not, it needs to create replication work items.
  • Iterates over the child ZNodes in work_queues/wait_for_acks
    • If child has REPL_FACTOR children
      • Lock that child
      • Delete the child
      • Delete lock
    • If current time > child's replication timeout
      • If that child has 0 children, do nothing. Otherwise:
      • Lock that child
      • Find (REPL_FACTOR - child's number of children) target DataNodes to replicate onto
        • Create a ZNode at /work_queues/replicate/<target-ID>/block-<#> with payload UUID.
      • Delete the child
      • Delete the lock
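
The clean-up pass can be sketched as a single loop over the pending acknowledgements. Everything here is an in-memory assumption: acks maps a block UUID to its timeout, replication factor and the set of DataNodes that have acknowledged, and replicate_queue maps a target DataNode to the block UUIDs it should fetch. Locking is left out, and targets are simply the first DataNodes that do not yet hold the block.

```python
def clean_up_wait_for_acks(acks, replicate_queue, all_datanodes, now):
    """Drop fully replicated entries; queue replication for timed-out ones."""
    for block_uuid in list(acks):
        entry = acks[block_uuid]
        have = entry["datanodes"]
        if len(have) >= entry["replication_factor"]:
            del acks[block_uuid]  # fully acknowledged, nothing left to track
        elif now > entry["timeout"] and have:
            missing = entry["replication_factor"] - len(have)
            candidates = [dn for dn in all_datanodes if dn not in have]
            for target in candidates[:missing]:
                replicate_queue.setdefault(target, []).append(block_uuid)
            del acks[block_uuid]
        # Timed out with zero acks, or not yet timed out: leave the entry alone.


acks = {
    "u1": {"timeout": 10, "replication_factor": 3, "datanodes": {"dn1", "dn2", "dn3"}},
    "u2": {"timeout": 10, "replication_factor": 3, "datanodes": {"dn1"}},
    "u3": {"timeout": 10, "replication_factor": 3, "datanodes": set()},
    "u4": {"timeout": 100, "replication_factor": 3, "datanodes": {"dn1"}},
}
replicate_queue = {}
clean_up_wait_for_acks(acks, replicate_queue, ["dn1", "dn2", "dn3", "dn4"], now=50)
```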

Processing Block Reports

  • All NameNodes will watch the children of /work_queues/block_reports/ using get_children.
  • A watch is triggered by a new block report from DataNode with <datanode_id>:
    • NameNode locks /work_queues/block_reports/<datanode_id>, and for each child ZNode UUID with payload size_of_block:
      • If /health/<datanode_id>/blocks/<UUID> exists:
        • Check that payload sizes are equal.
        • If not, report <datanode_id>'s block as corrupt.
      • If /health/<datanode_id>/blocks/<UUID> does not exist:
        • Create /health/<datanode_id>/blocks/<UUID> with payload size_of_block.
        • Create a new ZNode at /block_locations/<UUID>/<datanode_id>
    • NameNode checks /health/<datanode_id>/blocks, and for each child ZNode UUID with payload size_of_block:
      • If /work_queues/block_reports/<datanode_id>/<UUID> does not exist:
        • Delete /health/<datanode_id>/blocks/<UUID>
        • Delete /block_locations/<UUID>/<datanode_id>
  • Note: the lock on /work_queues/block_reports/<datanode_id> is not deleted until after processing /health/<datanode_id>/blocks.
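
Block report processing is essentially a two-way reconciliation between what the DataNode reports and what ZooKeeper already records. A minimal sketch, assuming plain dicts instead of ZNodes: report and known_blocks map block UUIDs to sizes, and block_locations maps a UUID to the set of DataNodes holding it. The function name and return value are hypothetical.

```python
def process_block_report(report, known_blocks, block_locations, datanode_id):
    """Reconcile a DataNode's report with recorded state; return corrupt UUIDs."""
    corrupt = []
    # Pass 1: every reported block is either verified or newly recorded.
    for block_uuid, size in report.items():
        if block_uuid in known_blocks:
            if known_blocks[block_uuid] != size:
                corrupt.append(block_uuid)
        else:
            known_blocks[block_uuid] = size
            block_locations.setdefault(block_uuid, set()).add(datanode_id)
    # Pass 2: recorded blocks missing from the report are forgotten.
    for block_uuid in list(known_blocks):
        if block_uuid not in report:
            del known_blocks[block_uuid]
            block_locations.get(block_uuid, set()).discard(datanode_id)
    return corrupt


report = {"a": 10, "b": 20, "c": 5}
known_blocks = {"a": 10, "b": 99, "d": 7}
block_locations = {"a": {"dn1"}, "b": {"dn1"}, "d": {"dn1", "dn2"}}
corrupt = process_block_report(report, known_blocks, block_locations, "dn1")
```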

Cluster Health Use Cases for Zookeeper

Watching for DataNode death

  • Each NameNode watches all ZNodes at /health/<datanode_id>.
  • If the watch is triggered and there is no ephemeral ZNode child for a DataNode
    • Lock /health/<datanode_id>
    • For each block (ZNode UUID) in /health/<datanode_id>/blocks:
      • Delete ZNode /block_locations/<UUID>/<datanode_id>.
      • Find a new target DataNode for the block.
      • If /work_queues/delete/<datanode_id>/<UUID> does not exist, create /work_queues/replicate/<targetID>/<UUID>
    • For each ZNode in /work_queues/replicate/<datanode_id>/, re-replicate the block and delete the ZNode.
    • Delete ZNode /health/<datanode-id> recursively.
    • Delete the lock.
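
The recovery steps above can be sketched with in-memory dicts of sets. All names are assumptions for illustration: health_blocks maps a DataNode to the blocks it holds, delete_queue and replicate_queue map a DataNode to pending block UUIDs, and pick_target is a hypothetical policy that picks the first live DataNode without a replica. Locking is omitted.

```python
def pick_target(block_uuid, live, block_locations):
    """Hypothetical policy: first live DataNode that lacks a replica."""
    holders = block_locations.get(block_uuid, set())
    for dn in sorted(live):
        if dn not in holders:
            return dn
    return None


def handle_datanode_death(dead, live, health_blocks, block_locations,
                          delete_queue, replicate_queue):
    def requeue(block_uuid):
        target = pick_target(block_uuid, live, block_locations)
        if target is not None:
            replicate_queue.setdefault(target, set()).add(block_uuid)

    # Removing the health entry here stands in for the final recursive delete.
    for block_uuid in health_blocks.pop(dead, set()):
        block_locations.get(block_uuid, set()).discard(dead)
        # Blocks already scheduled for deletion need no new replica.
        if block_uuid not in delete_queue.get(dead, set()):
            requeue(block_uuid)
    # Replications the dead node never performed are handed to another node.
    for block_uuid in replicate_queue.pop(dead, set()):
        requeue(block_uuid)


live = {"dn2", "dn3"}
health_blocks = {"dn1": {"a", "b"}}
block_locations = {"a": {"dn1", "dn2"}, "b": {"dn1"}}
delete_queue = {"dn1": {"b"}}
replicate_queue = {"dn1": {"c"}}
handle_datanode_death("dn1", live, health_blocks, block_locations,
                      delete_queue, replicate_queue)
```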

DataNode Use Cases for Zookeeper

BlockReceived

  • Acknowledges that this DataNode has received the block with <block-uuid>.
  • Locks /work_queues/wait_for_acks/<block-uuid>
  • Creates /work_queues/wait_for_acks/<block-uuid>/<datanode-id>
  • Deletes lock.
  • Creates /block_locations/<block-uuid>/<datanode-id>
  • Creates /health/<datanode-id>/blocks/<block-uuid> with payload size_of_block.

Replicating Blocks

  • DataNodes watch /work_queues/replicate/<datanode_id>
  • When the watch is triggered, the DataNode reads the first child's payload block_uuid.
  • DataNode replicates block with the given UUID.
  • Creates /health/<datanode_id>/blocks/<block_uuid> with payload size_of_block
  • Creates /block_locations/<block_uuid>/<datanode_id>
  • DataNode deletes /work_queues/replicate/<datanode_id>/<block_uuid>

Deleting Blocks

  • DataNodes watch /work_queues/delete/<datanode_id>/
  • When the watch is triggered, the DataNode reads the first child's payload block_uuid.
  • DataNode deletes the block with the given UUID.
  • DataNode deletes /work_queues/delete/<datanode_id>/<block_uuid>
  • DataNode deletes /health/<datanode_id>/blocks/<block_uuid>
  • DataNode deletes /block_locations/<block_uuid>/<datanode_id>

Writing a heartbeat

  • Create /health/<datanode_id>/heartbeat if it doesn't exist.
    • This ZNode has a DATANODE_IS_DEAD timeout.
  • Write each storage failure sequentially to /health/<datanode_id>/failures

Writing a Block Report

  • Create /work_queues/block_reports/<datanode_id>
  • For each block with <block_uuid> on the DataNode:
    • Create /work_queues/block_reports/<datanode_id>/<block_uuid> with payload size_of_block

Link to the doc this was mostly copied from.
