动手实现一致性哈希算法,并搭建环境测试其负载均衡特性.

Blandy 2019-05-06 01:45127 阅读

一. 简述一致性哈希算法

  • 这里不详细介绍一致性哈希算法的起源了, 网上能方便地搜到许多介绍一致性哈希算法的好文章. 本文主要想动手实现一致性哈希算法, 并搭建一个环境进行实战测试.
  • 在开始之前先整理一下算法的思路:
  • 一致性哈希算法通过把每台服务器的哈希值打在哈希环上, 把哈希环分成不同的段, 然后对到来的请求计算哈希值从而得知该请求所归属的服务器. 这个办法解决了传统服务器增减机器时需要重新计算哈希的麻烦.
  • 但如果服务器的数量较少, 可能导致计算出的哈希值相差较小, 在哈希环上分布不均匀, 导致某台服务器过载. 为了解决负载均衡问题, 我们引入虚拟节点技术, 为每台服务器分配一定数量的节点, 通过节点的哈希值在哈希环上进行划分. 这样一来, 我们就可以根据机器的性能为其分配节点, 性能好就多分配一点, 差就少一点, 从而达到负载均衡.

二. 实现一致性哈希算法.

  • 奠定了整体思路后我们开始考虑实现的细节

哈希算法的选择

  • 选择能散列出32位整数的FNV算法, 由于该哈希函数可能产生负数, 需要作取绝对值处理.

请求节点在哈希环上寻找对应服务器的策略

  • 策略为: 新节点寻找最近比且它大的节点, 比如说现在已经有环[0, 5, 7, 10], 来了个哈希值为6的节点, 那么它应该由哈希值为7对应的服务器处理. 如果请求节点所计算的哈希值大于环上的所有节点, 那么就取第一个节点. 比如来了个11, 将分配到0所对应的节点.

哈希环的组织结构

  • 开始的时候想过用顺序存储的结构存放, 但是在一致性哈希中, 最频繁的操作是在集合中查找最近且比目标大的数. 如果用顺序存储结构的话, 时间复杂度是收敛于O(N)的, 而树形结构则为更优的O(logN).
  • 但凡事有两面, 采用树形结构存储的代价是数据初始化的效率较低, 而且运行期间如果有节点插入删除的话效率也比较低. 但是在现实中, 服务器在一开始注册后基本上就不怎么变了, 期间增减机器, 宕机, 机器修复等事件的频率相比起节点的查询简直是微不足道. 所以本案例决定使用使用树形结构存储.
  • 贴合上述要求, 并且提供有序存储的首先想到的是红黑树, 而且Java中提供了红黑树的实现TreeMap.

虚拟节点与真实节点的映射关系

  • 如何确定一个虚拟节点对应的真实节点也是个问题. 理论上应该维护一张表记录真实节点与虚拟节点的映射关系. 本引入案例为了演示采用简单的字符串处理. 比方说服务器192.168.0.1:8888分配了1000个虚拟节点, 那么它的虚拟节点名称从192.168.0.1:8888@1一直到192.168.0.1:8888@1000. 通过这样的处理, 我们在通过虚拟节点找真实节点时只需要裁剪字符串即可.

  • 计划定制好后, 下面开始怼代码
public class ConsistentHashTest {
    /**
     * 服务器列表,一共有3台服务器提供服务, 将根据性能分配虚拟节点
     */
    public static String[] servers = {
            "192.168.0.1#100", //服务器1: 性能指数100, 将获得1000个虚拟节点
            "192.168.0.2#100", //服务器2: 性能指数100, 将获得1000个虚拟节点
            "192.168.0.3#30"   //服务器3: 性能指数30,  将获得300个虚拟节点
    };
    /**
     * 真实服务器列表, 由于增加与删除的频率比遍历高, 用链表存储比较划算
     */
    private static List<String> realNodes = new LinkedList<>();
    /**
     * 虚拟节点列表
     */
    private static TreeMap<Integer, String> virtualNodes = new TreeMap<>();

    static{
        for(String s : servers){
            //把服务器加入真实服务器列表中
            realNodes.add(s);
            String[] strs = s.split("#");
            //服务器名称, 省略端口号
            String name = strs[0];
            //根据服务器性能给每台真实服务器分配虚拟节点, 并把虚拟节点放到虚拟节点列表中.
            int virtualNodeNum = Integer.parseInt(strs[1]) * 10;
            for(int i = 1; i <= virtualNodeNum; i++){
                virtualNodes.put(FVNHash(name + "@" + i), name + "@" + i);
            }
        }
    }

    public static void main(String[] args) {
        new Thread(new RequestProcess()).start();
    }

    static class RequestProcess implements Runnable{
        @Override
        public void run() {
            String client = null;
            while(true){
                //模拟产生一个请求
                client = getN() + "." + getN() + "." + getN() + "." + getN() + ":" + (1000 + (int)(Math.random() * 9000));
                //计算请求的哈希值
                int hash = FVNHash(client);
                //判断请求将由哪台服务器处理
                System.out.println(client + " 的请求将由 " + getServer(client) + " 处理");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }
    }

    private static String getServer(String client) {
        //计算客户端请求的哈希值
        int hash = FVNHash(client);
        //得到大于该哈希值的所有map集合
        SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);
        //找到比该值大的第一个虚拟节点, 如果没有比它大的虚拟节点, 根据哈希环, 则返回第一个节点.
        Integer targetKey = subMap.size() == 0 ? virtualNodes.firstKey() : subMap.firstKey();
        //通过该虚拟节点获得真实节点的名称
        String virtualNodeName = virtualNodes.get(targetKey);
        String realNodeName = virtualNodeName.split("@")[0];
        return realNodeName;
    }

    public static int getN(){
        return (int)(Math.random() * 128);
    }

    public static int FVNHash(String data){
        final int p = 16777619;
        int hash = (int)2166136261L;
        for(int i = 0; i < data.length(); i++)
            hash = (hash ^ data.charAt(i)) * p;
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;
        return hash < 0 ? Math.abs(hash) : hash;
    }
}

/* 运行结果片段
55.1.13.47:6240     的请求将由 192.168.0.1 处理
5.49.56.126:1105    的请求将由 192.168.0.1 处理
90.41.8.88:6884     的请求将由 192.168.0.2 处理
26.107.104.81:2989  的请求将由 192.168.0.2 处理
114.66.6.56:8233    的请求将由 192.168.0.1 处理
123.74.52.94:5523   的请求将由 192.168.0.1 处理
104.59.60.2:7502    的请求将由 192.168.0.2 处理
4.94.30.79:1299     的请求将由 192.168.0.1 处理
10.44.37.73:9332    的请求将由 192.168.0.2 处理
115.93.93.82:6333   的请求将由 192.168.0.2 处理
15.24.97.66:9177    的请求将由 192.168.0.2 处理
100.39.98.10:1023   的请求将由 192.168.0.2 处理
61.118.87.26:5108   的请求将由 192.168.0.2 处理
17.79.104.35:3901   的请求将由 192.168.0.1 处理
95.36.5.25:8020     的请求将由 192.168.0.2 处理
126.74.56.71:7792   的请求将由 192.168.0.2 处理
14.63.56.45:8275    的请求将由 192.168.0.1 处理
58.53.44.71:2089    的请求将由 192.168.0.3 处理
80.64.57.43:6144    的请求将由 192.168.0.2 处理
46.65.4.18:7649     的请求将由 192.168.0.2 处理
57.35.27.62:9607    的请求将由 192.168.0.2 处理
81.114.72.3:3444    的请求将由 192.168.0.1 处理
38.18.61.26:6295    的请求将由 192.168.0.2 处理
71.75.18.82:9686    的请求将由 192.168.0.2 处理
26.11.98.111:3781   的请求将由 192.168.0.1 处理
62.86.23.37:8570    的请求将由 192.168.0.3 处理
*/

 

  • 经过上面的测试我们可以看到性能较好的服务器1和服务器2分担了大部分的请求, 只有少部分请求落到了性能较差的服务器3上, 已经初步实现了负载均衡.
  • 下面我们将结合zookeeper, 搭建一个更加逼真的服务器集群, 看看在部分服务器上线下线的过程中, 一致性哈希算法是否仍能够实现负载均衡.

 

三. 结合zookeeper搭建环境

环境介绍

  • 首先会通过启动多台虚拟机模拟服务器集群, 各台服务器都提供一个相同的接口供消费者消费.
  • 同时会有一个消费者线程不断地向服务器集群发起请求, 这些请求会经过一致性哈希算法均衡负载到各个服务器.
  • 为了能够模拟上述场景, 我们必须在客户端维护一个服务器列表, 使得客户端能够通过一致性哈希算法选择服务器发送. (现实中可能会把一致性哈希算法实现在前端服务器, 客户先访问前端服务器, 再路由到后端服务器集群).
  • 但是我们的重点是模拟服务器的宕机和上线, 看看一致性哈希算法是否仍能实现负载均衡. 所以客户端必须能够感知服务器端的变化并动态地调整它的服务器列表.
  • 为了完成这项工作, 我们引入zookeeper, zookeeper的数据一致性算法保证数据实时, 准确, 客户端能够通过zookeeper得知实时的服务器情况.
  • 具体操作是这样的: 服务器集群先以临时节点的方式连接到zookeeper, 并在zookeeper上注册自己的接口服务(注册节点). 客户端连接上zookeeper后, 把已注册的节点(服务器)添加到自己的服务器列表中.
  • 如果有服务器宕机的话, 由于当初注册的是瞬时节点的原因, 该台服务器节点会从zookeeper中注销. 客户端监听到服务器节点有变时, 也会动态调整自己的服务器列表, 把当宕机的服务器从服务器列表中删除, 因此不会再向该服务器发送请求, 负载均衡的任务将交到剩余的机器身上.
  • 当有服务器从新连接上集群后, 客户端的服务器列表也会更新, 哈希环也将做出相应的变化以提供负载均衡.

具体操作:

I. 搭建zookeeper集群环境:

  1. 创建3个zookeeper服务, 构成集群. 在各自的data文件夹中添加一个myid文件, 各个id分别为1, 2, 3.
    在这里插入图片描述
  2. 重新复制一份配置文件, 在配置文件中配置各个zookeeper的端口号. 本案例中三台zookeeper分别在2181, 2182, 2183端口
    在这里插入图片描述
  3. 启动zookeeper集群

    由于zookeeper不是本案例的重点, 细节暂不展开讲了.

 

II. 创建服务器集群, 提供RPC远程调用服务

  1. 首先创建一个服务器项目(使用Maven), 添加zookeeper依赖
  2. 创建常量接口, 用于存储连接zookeeper 的信息
public interface Constant {
    //zookeeper集群的地址
    String ZK_HOST = "192.168.117.129:2181,192.168.117.129:2182,192.168.117.129:2183";
    //连接zookeeper的超时时间
    int ZK_TIME_OUT = 5000;
    //服务器所发布的远程服务在zookeeper中的注册地址, 也就是说这个节点中保存了各个服务器提供的接口
    String ZK_REGISTRY = "/provider";
    //zookeeper集群中注册服务的url地址的瞬时节点
    String ZK_RMI = ZK_REGISTRY + "/rmi";
}

 
3.封装操作zookeeper和发布远程服务的接口供自己调用, 本案例中发布远程服务使用Java自身提供的rmi包完成, 如果没有了解过可以参考这篇

public class ServiceProvider {

    private CountDownLatch latch = new CountDownLatch(1);

    /**
     * 连接zookeeper集群
     */
    public ZooKeeper connectToZK(){
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(Constant.ZK_HOST, Constant.ZK_TIME_OUT, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    //如果连接上了就唤醒当前线程.
                    latch.countDown();
                }
            });
            latch.await();//还没连接上时当前线程等待
        } catch (Exception e) {
            e.printStackTrace();
        }
        return zk;
    }

    /**
     * 创建znode节点
     * @param zk
     * @param url 节点中写入的数据
     */
    public void createNode(ZooKeeper zk, String url){
        try{
            //要把写入的数据转化为字节数组
            byte[] data = url.getBytes();
            zk.create(Constant.ZK_RMI, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 发布rmi服务
     */
    public String publishService(Remote remote, String host, int port){
        String url = null;
        try{
            LocateRegistry.createRegistry(port);
            url = "rmi://" + host + ":" + port + "/rmiService";
            Naming.bind(url, remote);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return url;
    }

    /**
     * 发布rmi服务, 并且将服务的url注册到zookeeper集群中
     */
    public void publish(Remote remote, String host, int port){
        //调用publishService, 得到服务的url地址
        String url = publishService(remote, host, port);
        if(null != url){
            ZooKeeper zk = connectToZK();//连接到zookeeper
            if(null != zk){
                createNode(zk, url);
            }
        }
    }
}

 

  1. 自定义远程服务. 服务提供一个简单的方法: 客户端发来一个字符串, 服务器在字符串前面添加上Hello, 并返回字符串.
//UserService
public interface UserService extends Remote {
    public String helloRmi(String name) throws RemoteException;
}
//UserServiceImpl
public class UserServiceImpl implements UserService {

    public UserServiceImpl() throws RemoteException{
        super();
    }

    @Override
    public String helloRmi(String name) throws RemoteException {
        return "Hello " + name + "!";
    }
}

 

  1. 修改端口号, 启动多个java虚拟机, 模拟服务器集群. 为了方便演示, 自定义7777, 8888, 9999端口开启3个服务器进程, 到时会模拟7777端口的服务器宕机和修复重连.
public static void main(String[] args) throws RemoteException {
    //创建工具类对象
    ServiceProvider sp = new ServiceProvider();
    //创建远程服务对象
    UserService userService = new UserServiceImpl();
    //完成发布
    sp.publish(userService, "localhost", 9999);
}

在这里插入图片描述

 

III. 编写客户端程序(运用一致性哈希算法实现负载均衡

  1. 封装客户端接口.
public class ServiceConsumer {
    /**
     * 提供远程服务的服务器列表, 只记录远程服务的url
     */
    private volatile List<String> urls = new LinkedList<>();
    /**
     * 远程服务对应的虚拟节点集合
     */
    private static TreeMap<Integer, String> virtualNodes = new TreeMap<>();

    public ServiceConsumer(){
        ZooKeeper zk = connectToZK();//客户端连接到zookeeper
        if(null != zk){
            //连接上后关注zookeeper中的节点变化(服务器变化)
            watchNode(zk);
        }
    }

    private void watchNode(final ZooKeeper zk) {
        try{
            //观察/provider节点下的子节点是否有变化(是否有服务器登入或登出)
            List<String> nodeList = zk.getChildren(Constants.ZK_REGISTRY, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    //如果服务器节点有变化就重新获取
                    if(watchedEvent.getType() == Event.EventType.NodeChildrenChanged){
                        System.out.println("服务器端有变化, 可能有旧服务器宕机或者新服务器加入集群...");
                        watchNode(zk);
                    }
                }
            });
            //将获取到的服务器节点数据保存到集合中, 也就是获得了远程服务的访问url地址
            List<String> dataList = new LinkedList<>();
            TreeMap<Integer, String> newVirtualNodesList = new TreeMap<>();
            for(String nodeStr : nodeList){
                byte[] data = zk.getData(Constants.ZK_REGISTRY + "/" + nodeStr, false, null);
                //放入服务器列表的url
                String url = new String(data);
                //为每个服务器分配虚拟节点, 为了方便模拟, 默认开启在9999端口的服务器性能较差, 只分配300个虚拟节点, 其他分配1000个.
                if(url.contains("9999")){
                    for(int i = 1; i <= 300; i++){
                        newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);
                    }
                }else{
                    for(int i = 1; i <= 1000; i++){
                        newVirtualNodesList.put(FVNHash(url + "@" + i), url + "@" + i);
                    }
                }
                dataList.add(url);
            }
            urls = dataList;
            virtualNodes = newVirtualNodesList;
            dataList = null;//好让垃圾回收器尽快收集
            newVirtualNodesList = null;
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 根据url获得远程服务对象
     */
    public <T> T lookUpService(String url){
        T remote = null;
        try{
            remote = (T)Naming.lookup(url);
        } catch (Exception e) {
            //如果该url连接不上, 很有可能是该服务器挂了, 这时使用服务器列表中的第一个服务器url重新获取远程对象.
            if(e instanceof ConnectException){
                if (urls.size() != 0){
                    url = urls.get(0);
                    return lookUpService(url);
                }
            }
        }
        return remote;
    }

    /**
     * 通过一致性哈希算法, 选取一个url, 最后返回一个远程服务对象
     */
    public <T extends Remote> T lookUp(){
        T service = null;
        //随机计算一个哈希值
        int hash = FVNHash(Math.random() * 10000 + "");
        //得到大于该哈希值的所有map集合
        SortedMap<Integer, String> subMap = virtualNodes.tailMap(hash);
        //找到比该值大的第一个虚拟节点, 如果没有比它大的虚拟节点, 根据哈希环, 则返回第一个节点.
        Integer targetKey = subMap.size() == 0 ? virtualNodes.firstKey() : subMap.firstKey();
        //通过该虚拟节点获得服务器url
        String virtualNodeName = virtualNodes.get(targetKey);
        String url = virtualNodeName.split("@")[0];
        //根据服务器url获取远程服务对象
        service = lookUpService(url);
        System.out.print("提供本次服务的地址为: " + url + ", 返回结果: ");
        return service;
    }

    private CountDownLatch latch = new CountDownLatch(1);

    public ZooKeeper connectToZK(){
        ZooKeeper zk = null;
        try {
            zk = new ZooKeeper(Constants.ZK_HOST, Constants.ZK_TIME_OUT, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    //判断是否连接zk集群
                    latch.countDown();//唤醒处于等待状态的当前线程
                }
            });
            latch.await();//没有连接上的时候当前线程处于等待状态.
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return zk;
    }


    public static int FVNHash(String data){
        final int p = 16777619;
        int hash = (int)2166136261L;
        for(int i = 0; i < data.length(); i++)
            hash = (hash ^ data.charAt(i)) * p;
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;
        return hash < 0 ? Math.abs(hash) : hash;
    }
}

 

  1. 启动客户端进行测试
public static void main(String[] args){
    ServiceConsumer sc = new ServiceConsumer();//创建工具类对象
    while(true){
        //获得rmi远程服务对象
        UserService userService = sc.lookUp();
        try{
            //调用远程方法
            String result = userService.helloRmi("炭烧生蚝");
            System.out.println(result);
            Thread.sleep(100);
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

 

  1. 客户端跑起来后, 在显示台不断进行打印...下面将对数据进行统计.
    在这里插入图片描述
    在这里插入图片描述
     

    IV. 对服务器调用数据进行统计分析

  • 重温一遍模拟的过程: 首先分别在7777, 8888, 9999端口启动了3台服务器. 然后启动客户端进行访问. 7777, 8888端口的两台服务器设置性能指数为1000, 而9999端口的服务器性能指数设置为300.
  • 在客户端运行期间, 我手动关闭了8888端口的服务器, 客户端正常打印出服务器变化信息. 此时理论上不会有访问被路由到8888端口的服务器. 当我重新启动8888端口服务器时, 客户端打印出服务器变化信息, 访问能正常到达8888端口服务器.
  • 下面对各服务器的访问量进行统计, 看是否实现了负载均衡.
  • 测试程序如下:
public class DataStatistics {
    private static float ReqToPort7777 = 0;
    private static float ReqToPort8888 = 0;
    private static float ReqToPort9999 = 0;

    public static void main(String[] args) {
        BufferedReader br = null;
        try {
            br = new BufferedReader(new FileReader("C://test.txt"));
            String line = null;
            while(null != (line = br.readLine())){
                if(line.contains("7777")){
                    ReqToPort7777++;
                }else if(line.contains("8888")){
                    ReqToPort8888++;
                }else if(line.contains("9999")){
                    ReqToPort9999++;
                }else{
                    print(false);
                }
            }
            print(true);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            if(null != br){
                try {
                    br.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                br = null;
            }
        }
    }

    private static void print(boolean isEnd){
        if(!isEnd){
            System.out.println("------------- 服务器集群发生变化 -------------");
        }else{
            System.out.println("------------- 最后一次统计 -------------");
        }
        System.out.println("截取自上次服务器变化到现在: ");
        float total = ReqToPort7777 + ReqToPort8888 + ReqToPort9999;
        System.out.println("7777端口服务器访问量为: " + ReqToPort7777 + ", 占比" + (ReqToPort7777 / total));
        System.out.println("8888端口服务器访问量为: " + ReqToPort8888 + ", 占比" + (ReqToPort8888 / total));
        System.out.println("9999端口服务器访问量为: " + ReqToPort9999 + ", 占比" + (ReqToPort9999 / total));
        ReqToPort7777 = 0;
        ReqToPort8888 = 0;
        ReqToPort9999 = 0;
    }
}

/* 以下是输出结果
------------- 服务器集群发生变化 -------------
截取自上次服务器变化到现在: 
7777端口服务器访问量为: 198.0, 占比0.4419643
8888端口服务器访问量为: 184.0, 占比0.4107143
9999端口服务器访问量为: 66.0, 占比0.14732143
------------- 服务器集群发生变化 -------------
截取自上次服务器变化到现在: 
7777端口服务器访问量为: 510.0, 占比0.7589286
8888端口服务器访问量为: 1.0, 占比0.0014880953
9999端口服务器访问量为: 161.0, 占比0.23958333
------------- 最后一次统计 -------------
截取自上次服务器变化到现在: 
7777端口服务器访问量为: 410.0, 占比0.43248945
8888端口服务器访问量为: 398.0, 占比0.41983122
9999端口服务器访问量为: 140.0, 占比0.14767933
*/

 

V. 结果

  • 从测试数据可以看出, 不管是8888端口服务器宕机之前, 还是宕机之后, 三台服务器接收的访问量和性能指数成正比. 成功地验证了一致性哈希算法的负载均衡作用.

 

四. 扩展思考

  • 初识一致性哈希算法的时候, 对这种奇特的思路佩服得五体投地. 但是一致性哈希算法除了能够让后端服务器实现负载均衡, 还有一个特点可能是其他负载均衡算法所不具备的.
  • 这个特点是基于哈希函数的, 我们知道通过哈希函数, 固定的输入能够产生固定的输出. 换句话说, 同样的请求会路由到相同的服务器. 这点就很牛逼了, 我们可以结合一致性哈希算法和缓存机制提供后端服务器的性能.
  • 比如说在一个分布式系统中, 有一个服务器集群提供查询用户信息的方法, 每个请求将会带着用户的uid到达, 我们可以通过哈希函数进行处理(从上面的演示代码可以看到, 这点是可以轻松实现的), 使同样的uid路由到某个独定的服务器. 这样我们就可以在服务器上对该的uid背后的用户信息进行缓存, 从而减少对数据库或其他中间件的操作, 从而提高系统效率.
  • 当然如果使用该策略的话, 你可能还要考虑缓存更新等操作, 但作为一种优良的策略, 我们可以考虑在适当的场合灵活运用.
  • 以上思考受启发于Dubbo框架中对其实现的四种负载均衡策略的描述.
回复数量: 0
暂无评论~~
  请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!