profile image

L o a d i n g . . .

728x90

Cloudflare은 클라우드 벤더 중 하나로, 최근에 내부적으로 사용하던 Pingora를 오픈소스로 공개했습니다. 클라우드 서비스를 제공함에 있어 nginx의 한계를 극복하고자 자체적으로 Pingora를 개발해서 사용했다고 합니다.  Pingora는 Cloudflare의 운영 환경에서 battle testing을 거쳤기 때문에 충분히 검증되었습니다. Pingora는 네트워크 애플리케이션 개발에 사용할 수 있는 라이브러리로, nginx와 같이 executable binary를 제공하지 않습니다. 

 

기능

  • 프로토콜: HTTP/1, HTTP/2, end-to-end grpc, TCP/UDP 
  • 커스터마이징: callback, filter 

특징

  • Rust로 작성되었기 때문에 메모리 안전성이 뛰어남 
  • 멀티스레드 아키텍처를 활용하여 뛰어난 성능 
  • 기능 커스터마이징이 쉬움 

 

이번 포스팅에서 살펴보고자 하는 것은 Pingora가 기본으로 제공하는 load balancing 알고리즘 중 consistent hashing이 어떻게 구현되어 있는지 살펴보기 위함입니다. Consistent hashing의 원리가 궁금하시다면 아래 포스팅을 참고해 주세요. 

 

[Database] DBA급 개발자로 - #21 Distributed Database 1/3

이전 포스팅에서 로깅을 활용해서 시스템 장애 시 데이터베이스를 어떻게 복구하는지 살펴봤습니다. 이번 포스팅에서는 분산 데이터베이스에 대해 살펴보겠습니다. Distributed Database 데이터베이

code-run.tistory.com

 

Consistent Hashing 구현 

Pingora는 consistent hashing을 pingora-ketama 모듈에서 구현합니다. 자료구조와 그 알고리즘에 대해 살펴보겠습니다. 

자료구조 

Bucket 

Bucket은 서버를 나타냅니다. 

#[derive(Clone, Debug, Eq, PartialEq, PartialOrd)]
pub struct Bucket {
    // The node name.
    // TODO: UDS
    node: SocketAddr,

    // The weight associated with a node. A higher weight indicates that this node should
    // receive more requests.
    weight: u32,
}

Continuum 

Consistent hash ring과 관련된 서버 주소를 저장한 자료구조입니다. 

pub struct Continuum {
    ring: Box<[Point]>,
    addrs: Box<[SocketAddr]>,
}

Point 

Point는 continuum 상의 서버의 위치를 나타냅니다. 

// A point on the continuum.
#[derive(Clone, Debug, Eq, PartialEq)]
struct Point {
    // the index to the actual address
    node: u32,
    hash: u32,
}

 

 

Consistent Hashing 알고리즘 

Consistent Hash Ring 초기화

Consistent hash ring과 관련된 정보를 저장하는 Continuum을 초기화하는 알고리즘이 매우 흥미롭습니다. 전체 코드는 아래 "더 보기"를 통해 확인할 수 있습니다. 

더보기
pub fn new(buckets: &[Bucket]) -> Self {
    // This constant is copied from nginx. It will create 160 points per weight unit. For
    // example, a weight of 2 will create 320 points on the ring.
    const POINT_MULTIPLE: u32 = 160;

    if buckets.is_empty() { 
        return Continuum {
            ring: Box::new([]),
            addrs: Box::new([]),
        };
    }

    // The total weight is multiplied by the factor of points to create many points per node.
    let total_weight: u32 = buckets.iter().fold(0, |sum, b| sum + b.weight);
    let mut ring = Vec::with_capacity((total_weight * POINT_MULTIPLE) as usize);
    let mut addrs = Vec::with_capacity(buckets.len());

    for bucket in buckets {
        let mut hasher = Hasher::new();

        // We only do the following for backwards compatibility with nginx/memcache:
        // - Convert SocketAddr to string
        // - The hash input is as follows "HOST EMPTY PORT PREVIOUS_HASH". Spaces are only added
        //   for readability.
        // TODO: remove this logic and hash the literal SocketAddr once we no longer
        // need backwards compatibility

        // with_capacity = max_len(ipv6)(39) + len(null)(1) + max_len(port)(5)
        let mut hash_bytes = Vec::with_capacity(39 + 1 + 5);
        write!(&mut hash_bytes, "{}", bucket.node.ip()).unwrap();
        write!(&mut hash_bytes, "\0"). unwrap();
        write!(&mut hash_bytes, "{}", bucket.node.port()).unwrap();
        hasher.update(hash_bytes.as_ref());

        // A higher weight will add more points for this node.
        let num_points = bucket.weight * POINT_MULTIPLE;

        // This is appended to the crc32 hash for each point.
        let mut prev_hash: u32 = 0;
        addrs.push(bucket.node);
        let node = addrs.len() - 1;
        for _ in 0..num_points {
            let mut hasher = hasher.clone();
            hasher.update(&prev_hash.to_le_bytes());

            let hash = hasher.finalize();
            ring.push(Point::new(node as u32, hash));
            prev_hash = hash;
        }
    }

    // Sort and remove any duplicates.
    ring.sort();
    ring.dedup_by(|a, b| a.hash == b.hash);

    Continuum {
        ring: ring.into_boxed_slice(),
        addrs: addrs.into_boxed_slice(),
    }
}

 

그럼 각각의 코드에 대해 살펴보겠습니다. 

// This constant is copied from nginx. It will create 160 points per weight unit. For
// example, a weight of 2 will create 320 points on the ring.
const POINT_MULTIPLE: u32 = 160;

if buckets.is_empty() {
    return Continuum {
        ring: Box::new([]),
        addrs: Box::new([]),
    };
}

// The total weight is multiplied by the factor of points to create many points per node.
let total_weight: u32 = buckets.iter().fold(0, |sum, b| sum + b.weight);
let mut ring = Vec::with_capacity((total_weight * POINT_MULTIPLE) as usize);
let mut addrs = Vec::with_capacity(buckets.len());

 

Consistent hash는 고른 load balancing을 위해 노드(서버) 당 1개 이상의 point를 ring에 위치시킵니다. Pingora의 경우 각 노드당 160(POINT_MULTIPLE) 개의 point를 ring에 위치시킵니다. 만약 해당 노드의 weight가 1보다 큰 경우, POINT_MULTIPLE에 weight를 곱한 값만큼의 수의 point를 hash ring에 위치시킵니다. 

"let mut ring = Vec::with_capacity((total_weight * POINT_MULTIPLE) as usize);"을 보면 ring의 크기를 "전체 weight" * "POINT_MULTIPLE"로 초기화하는 걸 확인할 수 있습니다. 

 

for bucket in buckets {
    let mut hasher = Hasher::new();

    // We only do the following for backwards compatibility with nginx/memcache:
    // - Convert SocketAddr to string
    // - The hash input is as follows "HOST EMPTY PORT PREVIOUS_HASH". Spaces are only added
    //   for readability.
    // TODO: remove this logic and hash the literal SocketAddr once we no longer
    // need backwards compatibility

    // with_capacity = max_len(ipv6)(39) + len(null)(1) + max_len(port)(5)
    let mut hash_bytes = Vec::with_capacity(39 + 1 + 5);
    write!(&mut hash_bytes, "{}", bucket.node.ip()).unwrap();
    write!(&mut hash_bytes, "\0").unwrap();
    write!(&mut hash_bytes, "{}", bucket.node.port()).unwrap();
    hasher.update(hash_bytes.as_ref()); // 1차 해시 계산 

    // A higher weight will add more points for this node.
    let num_points = bucket.weight * POINT_MULTIPLE; // 해당 버킷의 point 갯수 

    // This is appended to the crc32 hash for each point.
    let mut prev_hash: u32 = 0;
    addrs.push(bucket.node);
    let node = addrs.len() - 1;
    for _ in 0..num_points {
        let mut hasher = hasher.clone();
        hasher.update(&prev_hash.to_le_bytes()); // 이전 해시값(prev_hash) 포함해서 계산 

        let hash = hasher.finalize();
        ring.push(Point::new(node as u32, hash)); // ring에 point를 추가 
        prev_hash = hash;
    }
}

 

위 코드는 ring에 각 bucket에 해당하는 points를 배치하는 동작을 수행합니다. 해시값을 계산하는 조합은 ip + port + prev_hash(계산된 이전 해시값)이고 총 num_points(해당 bucket의 weight * POINT_MULTIPLE)의 수만큼 point를 배치합니다.  

 

// Sort and remove any duplicates.
ring.sort();
ring.dedup_by(|a, b| a.hash == b.hash);

Continuum {
    ring: ring.into_boxed_slice(),
    addrs: addrs.into_boxed_slice(),
}

 

마지막으로 ring을 정렬 후 중복된 point를 제거합니다. 

 

Consistent Ring에서 노드 검색 

// Hash the given `hash_key` to the server address.
pub fn node(&self, hash_key: &[u8]) -> Option<SocketAddr> {
    self.ring
        .get(self.node_idx(hash_key)) // should we unwrap here?
        .map(|p| self.addrs[p.node as usize])
}

// Find the associated index for the given input.
pub fn node_idx(&self, input: &[u8]) -> usize {
    let hash = crc32fast::hash(input);

    // The `Result` returned here is either a match or the error variant returns where the
    // value would be inserted.
    match self.ring.binary_search_by(|p| p.hash.cmp(&hash)) {
        Ok(i) => i,
        Err(i) => {
            // We wrap around to the front if this value would be inserted at the end.
            if i == self.ring.len() {
                0
            } else {
                i
            }
        }
    }
}

 

Consistent hash ring에서의 검색은 이진탐색(binary search)을 활용합니다. Point에 저장된 해시값을 활용해 이진탐색을 수행함으로써 검색 성능을 높이는 것을 알 수 있습니다. 

 

마무리 

이전에 경험이 없던 "고성능 네트워크 라이브러리 + Rust"의 조합이라 코드가 복잡할 것으로 예상했지만, 실제로는 생각보다 코드가 단순하여 놀랐습니다. Java의 네트워크 애플리케이션과 관련된 코드를 보면 동시성 제어를 위해 많은 복잡한 코드가 필요하지만, Rust가 제공하는 동시성 기능 덕분에 코드가 더 간단해 보입니다. 

728x90
복사했습니다!