博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Curator学习笔记(一)- 读写锁
阅读量:4209 次
发布时间:2019-05-26

本文共 8667 字,大约阅读时间需要 28 分钟。

Curator Recipes是netfix开源的zookeeper客户端框架,因为zookeeper客户端在使用上很不方便,因此curator recipes对其进行了封装,并提供了十分丰富的功能。如下图所示。

基本涵盖了常用的分布式调度功能。那么这块他们是怎么做的。考虑到好的代码肯定做了很多优化,里边会有很多设计模式。但是作者目前还达不到那种一眼就看出其代码的精髓锁着,因此这块作者还是按照老样子。小了解其大概得轮廓。以后再复习设计模式的时候。再去思考这些能够真正提升自身功力的东西。

首先要使用curator提供的功能,需要导入相关的包

org.apache.curator
curator-framework
5.1.0
org.apache.curator
curator-recipes
5.1.0

这里我们将zookeeper客户端交给spring进行管理。

@Configurationpublic class TestCurd {    @Bean    public CuratorFramework main() {        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);        //创建连接对象        CuratorFramework client = CuratorFrameworkFactory.builder()                //IP地址端口号                .connectString("127.0.0.1:2181")                //客户端与服务器之间的会话超时时间                .sessionTimeoutMs(1000000)                //当客户端与服务器之间会话超时3s后,进行一次重连                .retryPolicy(retryPolicy)                //命名空间,当我们创建节点的时候,以/create为父节点                .namespace("create")                //构建连接对象                .build();        //打开连接        client.start();        //是否成功建立连接,true :建立, false:没有建立        System.out.println(client.isStarted());        return client;    }}

编写相关测试方法

@GetMapping(value = "/lock2")public void lock2() throws Exception {    // 读写锁    InterProcessReadWriteLock interProcessReadWriteLock=new InterProcessReadWriteLock(client, "/lock1");    // 获取读锁对象    InterProcessLock interProcessLock=interProcessReadWriteLock.readLock();    System.out.println("等待获取锁对象!");    // 获取锁    interProcessLock.acquire();    for (int i = 1; i <= 10; i++) {        Thread.sleep(3000);        System.out.println(i);    }    // 释放锁    interProcessLock.release();    System.out.println("等待释放锁!");}@GetMapping(value = "/lock3")public void lock3() throws Exception {    // 读写锁    InterProcessReadWriteLock interProcessReadWriteLock=new InterProcessReadWriteLock(client, "/lock1");    // 获取写锁对象    InterProcessLock interProcessLock=interProcessReadWriteLock.writeLock();    System.out.println("等待获取锁对象!");    // 获取锁    interProcessLock.acquire();    for (int i = 1; i <= 10; i++) {        Thread.sleep(3000);        System.out.println(i);    }    // 释放锁    interProcessLock.release();    System.out.println("等待释放锁!");}

 这块我们我们看到curator提供了读写锁。我们发现在初始化的时候。curator就已经将读锁和写锁进行了初始化。而我们真正在使用的时候也就是直接使用。

public InterProcessReadWriteLock(CuratorFramework client, String basePath) {        this(client, basePath, (byte[])null);    }    public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData) {        lockData = lockData == null ? null : Arrays.copyOf(lockData, lockData.length);        //写锁        this.writeMutex = new InterProcessReadWriteLock.InternalInterProcessMutex(client, basePath, "__WRIT__", lockData, 1, new InterProcessReadWriteLock.SortingLockInternalsDriver() {            public PredicateResults getsTheLock(CuratorFramework client, Listchildren, String sequenceNodeName, int maxLeases) throws Exception {                return super.getsTheLock(client, children, sequenceNodeName, maxLeases);            }        });        //读锁        this.readMutex = new InterProcessReadWriteLock.InternalInterProcessMutex(client, basePath, "__READ__", lockData, 2147483647, new InterProcessReadWriteLock.SortingLockInternalsDriver() {            public PredicateResults getsTheLock(CuratorFramework client, Listchildren, String sequenceNodeName, int maxLeases) throws Exception {                return InterProcessReadWriteLock.this.readLockPredicate(children, sequenceNodeName);            }        });    }

获取读锁

//获取锁    public void acquire() throws Exception {        if (!this.internalLock(-1L, (TimeUnit)null)) {            throw new IOException("Lost connection while trying to acquire lock: " + this.basePath);        }    }//获取锁    private boolean internalLock(long time, TimeUnit unit) throws Exception {        Thread currentThread = Thread.currentThread();//通过绑定thread的方式对该线程重入的次数进行记录。        InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);        if (lockData != null) {//如果发生了重入,那么这里就将重入的次数进行加一操作            lockData.lockCount.incrementAndGet();//表示获取到锁            return true;        } else {//如果第一次加锁,或者中途获取锁失败。那么进行尝试            String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes());            if (lockPath != null) {                InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath);                this.threadData.put(currentThread, newLockData);                return true;            } else {                return false;            }        }    }

String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {        long startMillis = System.currentTimeMillis();        Long millisToWait = unit != null ? unit.toMillis(time) : null;        byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes;        int retryCount = 0;        String ourPath = null;        boolean hasTheLock = false;        boolean isDone = false;        while(!isDone) {            isDone = true;            try {//通过初始化的driver获取锁                ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);//判断是否拿到锁,这里对读锁和写锁进行兼容。                hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath);            } catch (NoNodeException var14) {                if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {                    throw var14;                }                isDone = false;            }        }        return hasTheLock ? ourPath : null;    }

获取锁

public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {        String ourPath;//通过我们注入到spring ioc中的client操作zk,通过判断是否存在该路径进行加锁        if (lockNodeBytes != null) {            ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path, lockNodeBytes);        } else {            ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path);        }//拿到路径之后,就返回        return ourPath;

判断是否拿到锁的根据是这里的maxLeasse,写锁这里为1,读锁为2147483647

public PredicateResults getsTheLock(CuratorFramework client, Listchildren, String sequenceNodeName, int maxLeases) throws Exception {        int ourIndex = children.indexOf(sequenceNodeName);        validateOurIndex(sequenceNodeName, ourIndex);        boolean getsTheLock = ourIndex < maxLeases;        String pathToWatch = getsTheLock ? null : (String)children.get(ourIndex - maxLeases);        return new PredicateResults(pathToWatch, getsTheLock);    }

通过上述分析,我们大概了解了curator做分布式锁的基本过程,通过对path路径的是否存在进行加锁。锁的重入是针对于线程本身来说的。在单个jvm中线程的中断对其他线程的轮询没有任何影响。只有当当前线程运行完毕并删除zk中的节点,其他线程才可以进行加锁。相反在读锁中,通过与数字2147483647进行对比来判断是否可以加锁。这里的2147483647就是读锁的上线。

在锁释放的这个问题上。我们看到也是通过从lackdata中获取重入的次数,然后进行递减的。因为这个lockdata和线程进行绑定。所以在线程轮转中是没有数据消失的问题的。

public void release() throws Exception {        Thread currentThread = Thread.currentThread();//拿到当前线程的重入数据        InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread);        if (lockData == null) {            throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath);        } else {//进行锁的重入次数的释放            int newLockCount = lockData.lockCount.decrementAndGet();            if (newLockCount <= 0) {                if (newLockCount < 0) {                    throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath);                } else {                    try {//如果锁被释放完毕。那么就开始真正的释放                        this.internals.releaseLock(lockData.lockPath);                    } finally {                        this.threadData.remove(currentThread);                    }                }            }        }    }

锁的释放也很简单,直接删除

final void releaseLock(String lockPath) throws Exception {        this.client.removeWatchers();        this.revocable.set((Object)null);        this.deleteOurPath(lockPath);    }

总结:通过分析,curator读写锁是通过对zk节点的存在与否进行判断的从而进行加锁的,对于读锁来说只有在不存在的时候线程才能加锁成功。通过将线程和重入次数的绑定,来实现的锁重入机制。当锁被释放之后,通过删除节点来通知其他线程进行加锁。对于读锁来说,单个线程最大或者的读锁数量也是有限制的。通过序列号的方式与写锁进行区别。读锁这块的详细实现作者还没有想明白,以后想明白了再补上。

转载地址:http://mrkmi.baihongyu.com/

你可能感兴趣的文章
资源监控工具 - Hyperic HQ
查看>>
LoadRunner中Concurrent与Simultaneous的区别
查看>>
SiteScope - Agentless监控
查看>>
使用QTP的.NET插件扩展技术测试ComponentOne的ToolBar控件
查看>>
用上帝之眼进行自动化测试
查看>>
为LoadRunner写一个lr_save_float函数
查看>>
PrefTest工作室全新力作-《性能测试与调优实战》课程视频即将上线
查看>>
质量度量分析与测试技术 培训大纲
查看>>
欢迎加入【亿能测试快讯】邮件列表!
查看>>
为什么我们的自动化测试“要”这么难
查看>>
LoadRunner性能脚本开发实战训练
查看>>
测试之途,前途?钱途?图何?
查看>>
adb常用命令
查看>>
通过LR监控Linux服务器性能
查看>>
通过FTP服务的winsockes录制脚本
查看>>
LRwinsocket协议测试AAA服务器
查看>>
Net远程管理实验
查看>>
反病毒专家谈虚拟机技术 面临两大技术难题
查看>>
几种典型的反病毒技术:特征码技术、覆盖法技术等
查看>>
Software Security Testing软件安全测试
查看>>