大佬教程收集整理的这篇文章主要介绍了Redisson-关于使用订阅数问题,大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
最近在使用分布式锁redisson时遇到一个线上问题:发现是subscriptionsPerConnection or subscriptionConnectionPoolSize 的大小不够,需要提高配置才能解决。
下面对其源码进行分析,才能找到到底是什么逻辑导致问题所在:
private void lock(long leaseTime, TimeUnit unit, Boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// 尝试获取,如果ttl == null,则表示获取锁成功
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
// 订阅锁释放事件,并通过await方法阻塞等待锁释放,有效的解决了无效的锁申请浪费资源的问题
RFuture<redissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(futurE);
} else {
commandExecutor.syncSubscription(futurE);
}
// 后面代码忽略
try {
// 无限循环获取锁,直到获取锁成功
// ...
} finally {
// 取消订阅锁释放事件
unsubscribe(future, threadId);
}
}
总结下主要逻辑:
subscribe()
方法protected RFuture<redissonLockEntry> subscribe(long threadId) {
// entryName 格式:“id:name”;
// chAnnelName 格式:“redisson_lock__chAnnel:name”;
return pubSub.subscribe(getEntryName(), getChAnnelName());
}
redissonLock#pubSub
是在redissonLock
构造函数中初始化的:
public redissonLock(CommandAsyncExecutor commandExecutor, String Name) {
// ....
this.pubSub = commandExecutor.getConnectionManager().getSubscribeservice().getLockPubSub();
}
而subscribeservice
在@H_767_11@masterSlaveConnectionManager的实现中又是通过如下方式构造的
public MasterSlaveConnectionManager(MasterSlaveServersConfig cfg, Config config, UUID id) {
this(config, id);
this.config = cfg;
// 初始化
initTimer(cfg);
initSingleEntry();
}
protected void initTimer(MasterSlaveServersConfig config) {
int[] timeouts = new int[]{Config.getRetryInterval(), config.getTimeout()};
Arrays.sort(timeouts);
int minTimeout = timeouts[0];
if (minTimeout % 100 != 0) {
minTimeout = (minTimeout % 100) / 2;
} else if (minTimeout == 100) {
minTimeout = 50;
} else {
minTimeout = 100;
}
timer = new HashedWheelTimer(new DefaultThreadFactory("redisson-timer"), minTimeout, TimeUnit.MILLISECONDS, 1024, falsE);
connectionWatcher = new IdleConnectionWatcher(this, config);
// 初始化:其中this就是MasterSlaveConnectionManager实例,config则为MasterSlaveServersConfig实例:
subscribeservice = new PublishSubscribeservice(this, config);
}
PublishSubscribeservice
构造函数
private final SemaphorePubSub semaphorePubSub = new SemaphorePubSub(this);
public PublishSubscribeservice(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
super();
this.connectionManager = connectionManager;
this.config = config;
for (int i = 0; i < locks.length; i++) {
// 这里初始化了一组信号量,每个信号量的初始值为1
locks[i] = new AsyncSemaphore(1);
}
}
subscribe()
方法主要逻辑还是交给了 LockPubSub#subscribe()
里面private final ConcurrentMap<String, E> entries = new ConcurrentHashMap<>();
public RFuture<E> subscribe(String entryName, String chAnnelName) {
// 从PublishSubscribeservice获取对应的信号量。 相同的chAnnelName获取的是同一个信号量
// public AsyncSemaphore getSemaphore(ChAnnelName chAnnelName) {
// return locks[Math.abs(chAnnelName.hashCode() % locks.length)];
// }
AsyncSemaphore semaphore = service.getSemaphore(new ChAnnelName(chAnnelName));
AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
RPromise<E> newPromise = new redissonPromise<E>() {
@Override
public Boolean cancel(Boolean mayInterruptIfRunning) {
return semaphore.remove(listenerHolder.get());
}
};
Runnable listener = new Runnable() {
@Override
public void run() {
// 如果存在redissonLockEntry, 则直接利用已有的监听
E entry = entries.get(entryName);
if (entry != null) {
entry.acquire();
semaphore.release();
entry.getPromise().onComplete(new TransferListener<E>(newPromisE));
return;
}
E value = createEntry(newPromisE);
value.acquire();
E oldValue = entries.putIfAbsent(entryName, value);
if (oldValue != null) {
oldValue.acquire();
semaphore.release();
oldValue.getPromise().onComplete(new TransferListener<E>(newPromisE));
return;
}
// 创建监听,
redisPubSubListener<Object> listener = createListener(chAnnelName, value);
// 订阅监听
service.subscribe(LongCodec.INSTANCE, chAnnelName, semaphore, listener);
}
};
// 最终会执行listener.run方法
semaphore.acquire(listener);
listenerHolder.set(listener);
return newPromise;
}
AsyncSemaphore#acquire()
方法
public void acquire(Runnable listener) {
acquire(listener, 1);
}
public void acquire(Runnable listener, int permits) {
Boolean run = false;
synchronized (this) {
// counter初始化值为1
if (counter < permits) {
// 如果不是第一次执行,则将listener加入到listeners集合中
listeners.add(new Entry(listener, permits));
return;
} else {
counter -= permits;
run = true;
}
}
// 第一次执行acquire, 才会执行listener.run()方法
if (run) {
listener.run();
}
}
梳理上述逻辑:
PublishSubscribeservice#subscribe
方法PublishSubscribeservice#subscribe
逻辑如下:private final ConcurrentMap<ChAnnelName, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<>();
private final Queue<PubSubConnectionEntry> freePubSubConnections = new ConcurrentLinkedQueue<>();
public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String chAnnelName, AsyncSemaphore semaphore, redisPubSubListener<?>... listeners) {
RPromise<PubSubConnectionEntry> promise = new redissonPromise<PubSubConnectionEntry>();
// 主要逻辑入口, 这里要主要chAnnelName每次都是新对象, 但内部覆写hashCode+equals。
subscribe(codec, new ChAnnelName(chAnnelName), promise, PubSubType.SUBSCRIBE, semaphore, listeners);
return promise;
}
private void subscribe(Codec codec, ChAnnelName chAnnelName, RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, redisPubSubListener<?>... listeners) {
PubSubConnectionEntry connEntry = name2PubSubConnection.get(chAnnelName);
if (connEntry != null) {
// 从已有Connection中取,如果存在直接把listeners加入到PubSubConnectionEntry中
addListeners(chAnnelName, promise, type, lock, connEntry, listeners);
return;
}
// 没有时,才是最重要的逻辑
freePubSubLock.acquire(new Runnable() {
@Override
public void run() {
if (promise.isDone()) {
lock.release();
freePubSubLock.release();
return;
}
// 从队列中取头部元素
PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
if (freeEntry == null) {
// 第一次肯定是没有的需要建立
connect(codec, chAnnelName, promise, type, lock, listeners);
return;
}
// 如果存在则尝试获取,如果remainFreeamount小于0则抛出异常终止了。
int remainFreeamount = freeEntry.tryAcquire();
if (remainFreeamount == -1) {
throw new IllegalStateException();
}
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(chAnnelName, freeEntry);
if (oldEntry != null) {
freeEntry.release();
freePubSubLock.release();
addListeners(chAnnelName, promise, type, lock, oldEntry, listeners);
return;
}
// 如果remainFreeamount=0, 则从队列中移除
if (remainFreeamount == 0) {
freePubSubConnections.poll();
}
freePubSubLock.release();
// 增加监听
RFuture<Void> subscribeFuture = addListeners(chAnnelName, promise, type, lock, freeEntry, listeners);
ChAnnelFuture future;
if (PubSubType.PSUBSCRIBE == typE) {
future = freeEntry.psubscribe(codec, chAnnelName);
} else {
future = freeEntry.subscribe(codec, chAnnelName);
}
future.addListener(new ChAnnelFutureListener() {
@Override
public void operationComplete(ChAnnelFuture futurE) throws Exception {
if (!future.issuccess()) {
if (!promise.isDone()) {
subscribeFuture.cancel(false);
}
return;
}
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
subscribeFuture.cancel(false);
}
}, config.getTimeout(), TimeUnit.MILLISECONDS);
}
});
}
});
}
private void connect(Codec codec, ChAnnelName chAnnelName, RPromise<PubSubConnectionEntry> promise, PubSubType type, AsyncSemaphore lock, redisPubSubListener<?>... listeners) {
// 根据chAnnelName计算出slot获取PubSubConnection
int slot = connectionManager.calcSlot(chAnnelName.getName());
RFuture<redisPubSubConnection> connFuture = nextPubSubConnection(slot);
promise.onComplete((res, E) -> {
if (e != null) {
((RPromise<redisPubSubConnection>) connFuturE).tryFailure(E);
}
});
connFuture.onComplete((conn, E) -> {
if (e != null) {
freePubSubLock.release();
lock.release();
promise.tryFailure(E);
return;
}
// 这里会从配置中读取subscriptionsPerConnection
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
// 每获取一次,subscriptionsPerConnection就会减直到为0
int remainFreeamount = entry.tryAcquire();
// 如果旧的存在,则将现有的entry释放,然后将listeners加入到oldEntry中
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(chAnnelName, entry);
if (oldEntry != null) {
releaseSubscribeConnection(slot, entry);
freePubSubLock.release();
addListeners(chAnnelName, promise, type, lock, oldEntry, listeners);
return;
}
if (remainFreeamount > 0) {
// 加入到队列中
freePubSubConnections.add(entry);
}
freePubSubLock.release();
RFuture<Void> subscribeFuture = addListeners(chAnnelName, promise, type, lock, entry, listeners);
// 这里真正的进行订阅(底层与redis交互)
ChAnnelFuture future;
if (PubSubType.PSUBSCRIBE == typE) {
future = entry.psubscribe(codec, chAnnelName);
} else {
future = entry.subscribe(codec, chAnnelName);
}
future.addListener(new ChAnnelFutureListener() {
@Override
public void operationComplete(ChAnnelFuture futurE) throws Exception {
if (!future.issuccess()) {
if (!promise.isDone()) {
subscribeFuture.cancel(false);
}
return;
}
connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
subscribeFuture.cancel(false);
}
}, config.getTimeout(), TimeUnit.MILLISECONDS);
}
});
});
}
PubSubConnectionEntry#tryAcquire
方法, subscriptionsPerConnection代表了每个连接的最大订阅数。当tryAcqcurie的时候会减少这个数量:
public int tryAcquire() {
while (true) {
int value = subscribedChAnnelsamount.get();
if (value == 0) {
return -1;
}
if (subscribedChAnnelsamount.compareAndSet(value, value - 1)) {
return value - 1;
}
}
}
梳理上述逻辑:
freePubSubConnections
中取公用的PubSubConnectionEntry, 如果没有就进入connect()
方法
IllegalStateException
异常;如果remainFreeamount=0,则会将其从队列中移除, 那么后续请求会重新获取一个可用的连接subscribe
和addListener
;根因: 从上面代码分析, 导致问题的根因是因为PublishSubscribeservice 会使用公共队列中的freePubSubConnections, 如果同一个key一次性请求超过subscriptionsPerConnection
它的默认值5时,remainFreeamount就可能出现-1的情况, 那么就会导致commandExecutor.syncSubscription(futurE)
中等待超时,也就抛出如上异常Subscribe timeout: (7500ms). Increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters.
解决方法: 在初始化redisson可以可指定这个配置项的值。
相关参数的解释以及默认值请参考官网:https://github.com/redisson/redisson/wiki/2.-Configuration#23-common-setTings
以上是大佬教程为你收集整理的Redisson-关于使用订阅数问题全部内容,希望文章能够帮你解决Redisson-关于使用订阅数问题所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。