当前位置:首页 > 开发 > 互联网 > 正文

基于Zookeeper的分布式共享锁

发表于: 2015-07-09   作者:roadrunners   来源:转载   浏览:
摘要: 首先,说说我们的场景,订单服务是做成集群的,当两个以上结点同时收到一个相同订单的创建指令,这时并发就产生了,系统就会重复创建订单。等等......场景。这时,分布式共享锁就闪亮登场了。   共享锁在同一个进程中是很容易实现的,但在跨进程或者在不同Server之间就不好实现了。Zookeeper就很容易实现。具体的实现原理官网和其它网站也有翻译,这里就不在赘述了。   官

首先,说说我们的场景,订单服务是做成集群的,当两个以上结点同时收到一个相同订单的创建指令,这时并发就产生了,系统就会重复创建订单。等等......场景。这时,分布式共享锁就闪亮登场了。

 

共享锁在同一个进程中是很容易实现的,但在跨进程或者在不同Server之间就不好实现了。Zookeeper就很容易实现。具体的实现原理官网和其它网站也有翻译,这里就不在赘述了。

 

官网资料:http://zookeeper.apache.org/doc/r3.4.5/recipes.html

中文资料:https://www.ibm.com/developerworks/cn/opensource/os-cn-zookeeper

详见Locks章节。

 

原理都知道了,网上一搜索Apache上面已经有提供了,既然已经有轮子了,哪我们也没必要重复造轮子了吧!直接使用Curator。但是,我们在测试中发现,用于共享锁的结点无法自动回收,除了最末一级的临时结点会在锁释放和session超时的时候能自动回收外,其它结点均无法自动回收。我们的订单一天有好几万,遇到618和双十一的时候每天的订单量超50W,如果结点长期不回收的话,肯定会影响Zookeeper的性能。这时,我们就想到了一句话“自己动手,丰衣足食”。下面直接上代码:

 

首先,创建一个Maven工程,在pom文件里导入下面的包:

	<dependencies>
		<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>3.4.6</version>
		</dependency>
		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-client</artifactId>
			<version>2.8.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-recipes</artifactId>
			<version>2.8.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-framework</artifactId>
			<version>2.8.0</version>
		</dependency>
		<dependency>
			<groupId>commons-beanutils</groupId>
			<artifactId>commons-beanutils</artifactId>
			<version>1.9.2</version>
		</dependency>
		<dependency>
			<groupId>commons-logging</groupId>
			<artifactId>commons-logging</artifactId>
			<version>1.2</version>
		</dependency>
		<dependency>
			<groupId>commons-lang</groupId>
			<artifactId>commons-lang</artifactId>
			<version>2.6</version>
		</dependency>
	</dependencies>

 

LockZookeeperClient接口:

package com.XXX.framework.lock;

import org.apache.curator.framework.CuratorFramework;

/**
 * 
 * description
 * 
 * @author Roadrunners
 * @version 1.0, 2015年7月9日
 */
public interface LockZookeeperClient {
	/**
	 * 
	 * @return
	 */
	CuratorFramework getCuratorFramework();

	/**
	 * 
	 * @return
	 */
	String getBasePath();

	/**
	 * garbage collector
	 * 
	 * @param gcPath
	 */
	void gc(String gcPath);
}

 

LockZookeeperClient接口的实现LockZookeeperClientFactory:

package com.XXX.framework.lock;

import java.util.Date;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentSkipListSet;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

/**
 * 
 * description
 *  
 * @author Roadrunners
 * @version 1.0, 2015年7月9日
 */
public class LockZookeeperClientFactory implements LockZookeeperClient {
	private static final Log LOG = LogFactory.getLog(LockZookeeperClientFactory.class);

	private boolean hasGc = true;
	private Timer gcTimer;
	private TimerTask gcTimerTask;
	private ConcurrentSkipListSet<String> gcPaths = new ConcurrentSkipListSet<String>();
	private int gcIntervalSecond = 60;

	private CuratorFramework curatorFramework;
	private String zookeeperIpPort = "localhost:2181";
	private int sessionTimeoutMs = 10000;
	private int connectionTimeoutMs = 10000;
	private String basePath = "/locks";

	public void setHasGc(boolean hasGc) {
		this.hasGc = hasGc;
	}

	public void setGcIntervalSecond(int gcIntervalSecond) {
		this.gcIntervalSecond = gcIntervalSecond;
	}

	public void setZookeeperIpPort(String zookeeperIpPort) {
		this.zookeeperIpPort = zookeeperIpPort;
	}

	public void setSessionTimeoutMs(int sessionTimeoutMs) {
		this.sessionTimeoutMs = sessionTimeoutMs;
	}

	public void setConnectionTimeoutMs(int connectionTimeoutMs) {
		this.connectionTimeoutMs = connectionTimeoutMs;
	}

	public void setBasePath(String basePath) {
		basePath = basePath.trim();
		if (basePath.endsWith("/")) {
			basePath = basePath.substring(0, basePath.length() - 1);
		}

		this.basePath = basePath;
	}

	public void init() {
		if(StringUtils.isBlank(zookeeperIpPort)){
			throw new NullPointerException("zookeeperIpPort");
		}

		ExponentialBackoffRetry retryPolicy = new ExponentialBackoffRetry(1000, 3);
		curatorFramework = CuratorFrameworkFactory.newClient(zookeeperIpPort.trim(), sessionTimeoutMs, connectionTimeoutMs, retryPolicy);
		curatorFramework.start();
		LOG.info("CuratorFramework initialise succeed.");

		if (hasGc) {
			gc();
		}
	}

	public void destroy() {
		gcPaths.clear();
		gcPaths = null;
		gcStop();
		curatorFramework.close();
		curatorFramework = null;
	}

	@Override
	public void gc(String gcPath) {
		if (hasGc && StringUtils.isNotBlank(gcPath)) {
			gcPaths.add(gcPath.trim());
		}
	}

	@Override
	public CuratorFramework getCuratorFramework() {
		return this.curatorFramework;
	}

	@Override
	public String getBasePath() {
		return this.basePath;
	}

	private synchronized void gc() {
		gcStop();

		try {
			scanningGCNodes();
		} catch (Throwable e) {
			LOG.warn(e);
		}

		gcTimerTask = new TimerTask() {
			@Override
			public void run() {
				doingGc();
			}
		};

		Date begin = new Date();
		begin.setTime(begin.getTime() + (10 * 1000L));
		gcTimer = new Timer("lock-gc", true);
		gcTimer.schedule(gcTimerTask, begin, gcIntervalSecond * 1000L);
	}

	private synchronized void gcStop() {
		if (null != gcTimer) {
			gcTimer.cancel();
			gcTimer = null;
		}

		if (null != gcTimerTask) {
			gcTimerTask.cancel();
			gcTimerTask = null;
		}
	}

	private synchronized void scanningGCNodes() throws Exception {
		if (null == curatorFramework.checkExists().forPath(basePath)) {
			return;
		}

		List<String> paths = curatorFramework.getChildren().forPath(basePath);
		if (CollectionUtils.isEmpty(paths)) {
			gcPaths.add(basePath);
			return;
		}

		for (String path : paths) {
			try{
				String tmpPath = basePath + "/" + path;
				if (null == curatorFramework.checkExists().forPath(tmpPath)) {
					continue;
				}
				
				gcPaths.add(tmpPath);
			} catch(Throwable e){
				LOG.warn("scanning gc nodes error.", e);
			}
		}
	}
	
	private synchronized void doingGc() {
		LOG.debug("GC beginning.");

		if (CollectionUtils.isNotEmpty(gcPaths)) {
			for (String path : gcPaths) {
				try {
					if (null != curatorFramework.checkExists().forPath(path)) {
						if (CollectionUtils.isEmpty(curatorFramework.getChildren().forPath(path))) {
							curatorFramework.delete().forPath(path);
							gcPaths.remove(path);
							LOG.debug("GC " + path);
						}
					} else {
						gcPaths.remove(path);
					}
				} catch (Throwable e) {
					gcPaths.remove(path);
					LOG.warn(e);
				}
			}
		}

		LOG.debug("GC ended.");
	}

}

 

SharedLock共享锁:

package com.XXX.framework.lock.shared;

import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.StringUtils;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;

import com.XXX.framework.lock.LockZookeeperClient;

/**
 * 
 * description
 * 
 * @author Roadrunners
 * @version 1.0, 2015年7月9日
 */
public class SharedLock {
	private InterProcessLock interProcessLock;

	public SharedLock(LockZookeeperClient lockZookeeperClient, String resourceId) {
		super();
		
		if (StringUtils.isBlank(resourceId)) {
			throw new NullPointerException("resourceId");
		}
		String path = lockZookeeperClient.getBasePath();
		path += ("/" + resourceId.trim());

		interProcessLock = new InterProcessMutex(lockZookeeperClient.getCuratorFramework(), path);
		lockZookeeperClient.gc(path);
	}
	
    /**
     * Acquire the mutex - blocking until it's available. Each call to acquire must be balanced by a call
     * to {@link #release()}
     *
     * @throws Exception ZK errors, connection interruptions
     */
	public void acquire() throws Exception {
		interProcessLock.acquire();
	}

    /**
     * Acquire the mutex - blocks until it's available or the given time expires. Each call to acquire that returns true must be balanced by a call
     * to {@link #release()}
     *
     * @param time time to wait
     * @param unit time unit
     * @return true if the mutex was acquired, false if not
     * @throws Exception ZK errors, connection interruptions
     */
	public boolean acquire(long time, TimeUnit unit) throws Exception {
		return interProcessLock.acquire(time, unit);
	}

    /**
     * Perform one release of the mutex.
     *
     * @throws Exception ZK errors, interruptions, current thread does not own the lock
     */
	public void release() throws Exception {
		interProcessLock.release();
	}
	
    /**
     * Returns true if the mutex is acquired by a thread in this JVM
     *
     * @return true/false
     */
	public boolean isAcquiredInThisProcess() {
		return interProcessLock.isAcquiredInThisProcess();
	}
}

 

到此代码已经完成。下面写一个简单的Demo:

		//LockZookeeperClientFactory通常是通过Spring配置注入的,此处是为了Demo的简单明了才这样写的,不建议这样写
		LockZookeeperClientFactory lzc = new LockZookeeperClientFactory();
		lzc.setZookeeperIpPort("10.100.15.1:8900");
		lzc.setBasePath("/locks/sharedLock/");
		lzc.init();

		SharedLock sharedLock = new SharedLock(lzc, "sharedLock1");
		try {
			if (sharedLock.acquire(100, TimeUnit.MILLISECONDS)) {
				System.out.println("sharedLock1 get");
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				sharedLock.release();
			} catch (Exception e) {
				e.printStackTrace();
			}
		}

		lzc.destroy();
 

就这样,系统就会每隔一分钟去回收一次没有使用的结点。

基于Zookeeper的分布式共享锁

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
工作中需要写一个定时任务,由于是集群环境,自然而然想到需要通过分布式锁来保证单台执行..相信大家
agapple 基于zookeeper的分布式lock实现 博客分类: opensource java distributed 背景 继续上一篇
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; impo
大家也许都很熟悉了多个线程或者多个进程间的共享锁的实现方式了,但是在分布式场景中我们会面临多
众所周知,在Java开发中,要创建一个单进程锁非常简单,使用JDK中自带的java.util.concurrent.locks
背景 继续上一篇文章:http://agapple.iteye.com/blog/1183972 ,项目中需要对分布式任务进行调度,
背景 继续上一篇文章:http://agapple.iteye.com/blog/1183972 ,项目中需要对分布式任务进行调度,
背景 继续上一篇文章:http://agapple.iteye.com/blog/1183972 ,项目中需要对分布式任务进行调度,
背景 继续上一篇文章:http://agapple.iteye.com/blog/1183972 ,项目中需要对分布式任务进行调度,
背景 继续上一篇文章:http://agapple.iteye.com/blog/1183972 ,项目中需要对分布式任务进行调度,
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号