现在的位置: 首页 > 自动控制 > 工业·编程 > 正文

redis分布式锁和zookeeper分布式锁

2019-10-01 06:55 工业·编程 ⁄ 共 7511字 ⁄ 字号 暂无评论

在分布式系统的时代中,微服务、服务治理、中间件等名次已经贯穿了我们开发的各个过程,给开发者们提供了很多便捷的方式去更好的开发大型的web系统,这篇文章,主要给大家简单说一下分布式锁。

在我们常见的xxx管理系统中,没有那么大的数据量,也没必要去引入中间件去搞。但,时代不同了,无论从tob端或者toc端,目前大型的系统网站,在开发初期,在技术选型上,都在追求或者考虑未来的高并发、高可用的场景。往往单一的jvm实例子便可通过传统的加锁方式去实现,但若是一个服务部署在多台机器上呢,或者是多个服务都在修改应用层某个共享实例呢。此时,我们的分布式锁便派上用场了

一、什么是分布式锁
分布式锁顾名思义,在分布式系统下,在高并发的场景下,我们为了协调资源不被随意修改而做的对系统共享资源的保护。

举个例子:用户下单后减库存,用户A请求应用服务器,对productA商品下单并去创建了新的订单,库存系统同步去减库存,若此时用户B也去请求该商品,并去下单,但用户B可能请求的不是一个服务器,因为在分布式环境下,一个服务可能部署在多台主机上,那么B此时拿到的库存数量可能还是之前的那个数量,这就可能导致超卖情况发生。

以上只是简单的一个说明,用户下单和减库存,还要考虑到的东西很多。

二、解决了哪些问题
协调各个服务节点去修改共享资源,保证数据正确性。

三、分布式锁实现原理
(1)redis分布式锁

原理:利用redis的setnx、expire、getset命令对应的提供的API函数接口实现;

   获取redis锁步骤如下:

a、线程A获取redis锁,通过setnx(key, 当前时间戳+key的失效时间)命令,若返回1,则redis缓存中不存在该key,则获得锁;
b、若a中存在该key,则继续循环判断,重复的去获取该key对应value,通过get(key)获取;
c、若b中获取到的value值 < 当前时间,则表明该key已经在缓存中失效了,线程A已经有了获取redis锁的权利了;
d、此时线程A通过getset命令,将当前时间戳设置到该key对应的缓存上,并返回旧的值;
e、若返回的旧值 == 在b中获取的值,则此时线程A成功获取到锁;
f、若不相等,则表明有其他线程在线程A之前已经获取到该锁,则线程A又要循环往复的在去判断;
总而言之,自我感觉有点像自旋锁的意思,通过CAS不断去比较当前修改的旧值和期待的值是否一致;

以下是redis锁的实现原理,包含了释放锁的过程;

public class RedisKeyLock {
    private static Logger logger = Logger.getLogger(RedisKeyLock.class);
    private final static long ACCQUIRE_LOCK_TIMEOUT_IN_MS = 10 * 1000;
    private final static int EXPIRE_IN_SECOND = 5;//锁失效时间
    private final static long WAIT_INTERVAL_IN_MS = 100;
    private static RedisKeyLock lock;
    private JedisPool jedisPool;
    private RedisKeyLock(JedisPool pool){
        this.jedisPool = pool;
    }
    public static RedisKeyLock getInstance(JedisPool pool){
        if(lock == null){
            lock = new RedisKeyLock(pool);
        }
        return lock;
    }
 
    public void lock(final String redisKey) {
        Jedis resource = null;
        try {
            long now = System.currentTimeMillis();
            resource = jedisPool.getResource();
            long timeoutAt = now + ACCQUIRE_LOCK_TIMEOUT_IN_MS;
            boolean flag = false;
            while (true) {
                String expireAt = String.valueOf(now + EXPIRE_IN_SECOND * 1000);
                long ret = resource.setnx(redisKey, expireAt);
                if (ret == 1) {//已获取锁
                    flag = true;
                    break;
                } else {//未获取锁,重试获取锁
                    String oldExpireAt = resource.get(redisKey);
                    if (oldExpireAt != null && Long.parseLong(oldExpireAt) < now) {
                        oldExpireAt = resource.getSet(redisKey, expireAt);
                        if (Long.parseLong(oldExpireAt) < now) {
                            flag = true;
                            break;
                        }
                    }
                }
                if (timeoutAt < now) {
                    break;
                }
              TimeUnit.NANOSECONDS.sleep(WAIT_INTERVAL_IN_MS);
            }
            if (!flag) {
                throw new RuntimeException("canot acquire lock now ...");
            }
        } catch (JedisException je) {
            logger.error("lock", je);
            je.printStackTrace();
            if (resource != null) {
                jedisPool.returnBrokenResource(resource);
            }
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("lock", e);
        } finally {
            if (resource != null) {
                jedisPool.returnResource(resource);
            }
        }
    }
    public boolean unlock(final String redisKey) {
        Jedis resource = null;
        try {
            resource = jedisPool.getResource();
            resource.del(redisKey);
            return true;
        } catch (JedisException je) {
            je.printStackTrace();
            if (resource != null) {
                jedisPool.returnBrokenResource(resource);
            }
            return false;
        } catch (Exception e) {
            logger.error("lock", e);
            return false;
        } finally {
            if (resource != null) {
                jedisPool.returnResource(resource);
            }
        }
    }
}
 

(2)zookeeper分布式锁

原理:利用zookeeper系统结构,即类似linux的树形文件目录结构,根节点下有很多子节点,服务在访问zk时,通过创建临时顺序节点方式,并去比较当前节点的序号是否为其中最小,若为最小,则获得锁,并通知后续节点,删除自身节点;否则,进入等待状态;

以下是获取和释放的代码

public class ZooKeeperDistributedLock implements Watcher{
 
    private ZooKeeper zk;
    private String locksRoot= "/locks";
    private String productId;
    private String waitNode;
    private String lockNode;
    private CountDownLatch latch;
    private CountDownLatch connectedLatch = new CountDownLatch(1);
private int sessionTimeout = 30000;
 
    public ZooKeeperDistributedLock(String productId){
        this.productId = productId;
         try {
       String address = "192.168.31.187:2181,192.168.31.19:2181,192.168.31.227:2181";
            zk = new ZooKeeper(address, sessionTimeout, this);
            connectedLatch.await();
        } catch (IOException e) {
            throw new LockException(e);
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
    }
 
    public void process(WatchedEvent event) {
        if(event.getState()==KeeperState.SyncConnected){
            connectedLatch.countDown();
            return;
        }
 
        if(this.latch != null) {
            this.latch.countDown();
        }
    }
 
    public void acquireDistributedLock() {
        try {
            if(this.tryLock()){
                return;
            }
            else{
                waitForLock(waitNode, sessionTimeout);
            }
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
}
 
    public boolean tryLock() {
        try {
         // 传入进去的locksRoot + “/” + productId
        // 假设productId代表了一个商品id,比如说1
        // locksRoot = locks
        // /locks/10000000000,/locks/10000000001,/locks/10000000002
            lockNode = zk.create(locksRoot + "/" + productId, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
 
            // 看看刚创建的节点是不是最小的节点
         // locks:10000000000,10000000001,10000000002
            List<String> locks = zk.getChildren(locksRoot, false);
            Collections.sort(locks);
 
            if(lockNode.equals(locksRoot+"/"+ locks.get(0))){
                //如果是最小的节点,则表示取得锁
                return true;
            }
 
            //如果不是最小的节点,找到比自己小1的节点
      int previousLockIndex = -1;
            for(int i = 0; i < locks.size(); i++) {
        if(lockNode.equals(locksRoot + “/” + locks.get(i))) {
                     previousLockIndex = i - 1;
            break;
        }
       }
 
       this.waitNode = locks.get(previousLockIndex);
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
        return false;
    }
 
    private boolean waitForLock(String waitNode, long waitTime) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(locksRoot + "/" + waitNode, true);
        if(stat != null){
            this.latch = new CountDownLatch(1);
            this.latch.await(waitTime, TimeUnit.MILLISECONDS);                   this.latch = null;
        }
        return true;
}
 
    public void unlock() {
        try {
        // 删除/locks/10000000000节点
        // 删除/locks/10000000001节点
            System.out.println("unlock " + lockNode);
            zk.delete(lockNode,-1);
            lockNode = null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
}
 
    public class LockException extends RuntimeException {
        private static final long serialVersionUID = 1L;
        public LockException(String e){
            super(e);
        }
        public LockException(Exception e){
            super(e);
        }
}
 
// 如果有一把锁,被多个人给竞争,此时多个人会排队,第一个拿到锁的人会执行,然后释放锁,后面的每个人都会去监听排在自己前面的那个人创建的node上,一旦某个人释放了锁,排在自己后面的人就会被zookeeper给通知,一旦被通知了之后,就ok了,自己就获取到了锁,就可以执行代码了
 
}
(3)不同之处

redis分布式锁需要不断重试去获取,比较消耗性能,zk锁不需要,只需要注册一个监听器,等待前节点的通知;

同时,redis服务若是挂掉了,锁只能等待超时时间后才可再被其他线程获得;而zk服务若是挂掉了,则会自动删除临时节点并释放锁,总体开销较小;

给我留言

留言无头像?