本文共 8667 字,大约阅读时间需要 28 分钟。
Curator Recipes是netfix开源的zookeeper客户端框架,因为zookeeper客户端在使用上很不方便,因此curator recipes对其进行了封装,并提供了十分丰富的功能。如下图所示。
基本涵盖了常用的分布式调度功能。那么这块他们是怎么做的。考虑到好的代码肯定做了很多优化,里边会有很多设计模式。但是作者目前还达不到那种一眼就看出其代码的精髓锁着,因此这块作者还是按照老样子。小了解其大概得轮廓。以后再复习设计模式的时候。再去思考这些能够真正提升自身功力的东西。
首先要使用curator提供的功能,需要导入相关的包
org.apache.curator curator-framework 5.1.0 org.apache.curator curator-recipes 5.1.0
这里我们将zookeeper客户端交给spring进行管理。
@Configurationpublic class TestCurd { @Bean public CuratorFramework main() { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); //创建连接对象 CuratorFramework client = CuratorFrameworkFactory.builder() //IP地址端口号 .connectString("127.0.0.1:2181") //客户端与服务器之间的会话超时时间 .sessionTimeoutMs(1000000) //当客户端与服务器之间会话超时3s后,进行一次重连 .retryPolicy(retryPolicy) //命名空间,当我们创建节点的时候,以/create为父节点 .namespace("create") //构建连接对象 .build(); //打开连接 client.start(); //是否成功建立连接,true :建立, false:没有建立 System.out.println(client.isStarted()); return client; }}
编写相关测试方法
@GetMapping(value = "/lock2")public void lock2() throws Exception { // 读写锁 InterProcessReadWriteLock interProcessReadWriteLock=new InterProcessReadWriteLock(client, "/lock1"); // 获取读锁对象 InterProcessLock interProcessLock=interProcessReadWriteLock.readLock(); System.out.println("等待获取锁对象!"); // 获取锁 interProcessLock.acquire(); for (int i = 1; i <= 10; i++) { Thread.sleep(3000); System.out.println(i); } // 释放锁 interProcessLock.release(); System.out.println("等待释放锁!");}@GetMapping(value = "/lock3")public void lock3() throws Exception { // 读写锁 InterProcessReadWriteLock interProcessReadWriteLock=new InterProcessReadWriteLock(client, "/lock1"); // 获取写锁对象 InterProcessLock interProcessLock=interProcessReadWriteLock.writeLock(); System.out.println("等待获取锁对象!"); // 获取锁 interProcessLock.acquire(); for (int i = 1; i <= 10; i++) { Thread.sleep(3000); System.out.println(i); } // 释放锁 interProcessLock.release(); System.out.println("等待释放锁!");}
这块我们我们看到curator提供了读写锁。我们发现在初始化的时候。curator就已经将读锁和写锁进行了初始化。而我们真正在使用的时候也就是直接使用。
public InterProcessReadWriteLock(CuratorFramework client, String basePath) { this(client, basePath, (byte[])null); } public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData) { lockData = lockData == null ? null : Arrays.copyOf(lockData, lockData.length); //写锁 this.writeMutex = new InterProcessReadWriteLock.InternalInterProcessMutex(client, basePath, "__WRIT__", lockData, 1, new InterProcessReadWriteLock.SortingLockInternalsDriver() { public PredicateResults getsTheLock(CuratorFramework client, Listchildren, String sequenceNodeName, int maxLeases) throws Exception { return super.getsTheLock(client, children, sequenceNodeName, maxLeases); } }); //读锁 this.readMutex = new InterProcessReadWriteLock.InternalInterProcessMutex(client, basePath, "__READ__", lockData, 2147483647, new InterProcessReadWriteLock.SortingLockInternalsDriver() { public PredicateResults getsTheLock(CuratorFramework client, Listchildren, String sequenceNodeName, int maxLeases) throws Exception { return InterProcessReadWriteLock.this.readLockPredicate(children, sequenceNodeName); } }); }
获取读锁
//获取锁 public void acquire() throws Exception { if (!this.internalLock(-1L, (TimeUnit)null)) { throw new IOException("Lost connection while trying to acquire lock: " + this.basePath); } }//获取锁 private boolean internalLock(long time, TimeUnit unit) throws Exception { Thread currentThread = Thread.currentThread();//通过绑定thread的方式对该线程重入的次数进行记录。 InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread); if (lockData != null) {//如果发生了重入,那么这里就将重入的次数进行加一操作 lockData.lockCount.incrementAndGet();//表示获取到锁 return true; } else {//如果第一次加锁,或者中途获取锁失败。那么进行尝试 String lockPath = this.internals.attemptLock(time, unit, this.getLockNodeBytes()); if (lockPath != null) { InterProcessMutex.LockData newLockData = new InterProcessMutex.LockData(currentThread, lockPath); this.threadData.put(currentThread, newLockData); return true; } else { return false; } } }
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception { long startMillis = System.currentTimeMillis(); Long millisToWait = unit != null ? unit.toMillis(time) : null; byte[] localLockNodeBytes = this.revocable.get() != null ? new byte[0] : lockNodeBytes; int retryCount = 0; String ourPath = null; boolean hasTheLock = false; boolean isDone = false; while(!isDone) { isDone = true; try {//通过初始化的driver获取锁 ourPath = this.driver.createsTheLock(this.client, this.path, localLockNodeBytes);//判断是否拿到锁,这里对读锁和写锁进行兼容。 hasTheLock = this.internalLockLoop(startMillis, millisToWait, ourPath); } catch (NoNodeException var14) { if (!this.client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) { throw var14; } isDone = false; } } return hasTheLock ? ourPath : null; }
获取锁
public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception { String ourPath;//通过我们注入到spring ioc中的client操作zk,通过判断是否存在该路径进行加锁 if (lockNodeBytes != null) { ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path, lockNodeBytes); } else { ourPath = (String)((ACLBackgroundPathAndBytesable)client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)).forPath(path); }//拿到路径之后,就返回 return ourPath;
判断是否拿到锁的根据是这里的maxLeasse,写锁这里为1,读锁为2147483647
public PredicateResults getsTheLock(CuratorFramework client, Listchildren, String sequenceNodeName, int maxLeases) throws Exception { int ourIndex = children.indexOf(sequenceNodeName); validateOurIndex(sequenceNodeName, ourIndex); boolean getsTheLock = ourIndex < maxLeases; String pathToWatch = getsTheLock ? null : (String)children.get(ourIndex - maxLeases); return new PredicateResults(pathToWatch, getsTheLock); }
通过上述分析,我们大概了解了curator做分布式锁的基本过程,通过对path路径的是否存在进行加锁。锁的重入是针对于线程本身来说的。在单个jvm中线程的中断对其他线程的轮询没有任何影响。只有当当前线程运行完毕并删除zk中的节点,其他线程才可以进行加锁。相反在读锁中,通过与数字2147483647进行对比来判断是否可以加锁。这里的2147483647就是读锁的上线。
在锁释放的这个问题上。我们看到也是通过从lackdata中获取重入的次数,然后进行递减的。因为这个lockdata和线程进行绑定。所以在线程轮转中是没有数据消失的问题的。
public void release() throws Exception { Thread currentThread = Thread.currentThread();//拿到当前线程的重入数据 InterProcessMutex.LockData lockData = (InterProcessMutex.LockData)this.threadData.get(currentThread); if (lockData == null) { throw new IllegalMonitorStateException("You do not own the lock: " + this.basePath); } else {//进行锁的重入次数的释放 int newLockCount = lockData.lockCount.decrementAndGet(); if (newLockCount <= 0) { if (newLockCount < 0) { throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + this.basePath); } else { try {//如果锁被释放完毕。那么就开始真正的释放 this.internals.releaseLock(lockData.lockPath); } finally { this.threadData.remove(currentThread); } } } } }
锁的释放也很简单,直接删除
final void releaseLock(String lockPath) throws Exception { this.client.removeWatchers(); this.revocable.set((Object)null); this.deleteOurPath(lockPath); }
总结:通过分析,curator读写锁是通过对zk节点的存在与否进行判断的从而进行加锁的,对于读锁来说只有在不存在的时候线程才能加锁成功。通过将线程和重入次数的绑定,来实现的锁重入机制。当锁被释放之后,通过删除节点来通知其他线程进行加锁。对于读锁来说,单个线程最大或者的读锁数量也是有限制的。通过序列号的方式与写锁进行区别。读锁这块的详细实现作者还没有想明白,以后想明白了再补上。
转载地址:http://mrkmi.baihongyu.com/