`
ferry2174
  • 浏览: 10357 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

mongo-java-driver连接池

 
阅读更多

MongoDB的连接对象是DBPort,所以可以从DBPortPool对象看起,构造函数如下:

DBPortPool( ServerAddress addr , MongoOptions options ){

super"DBPortPool-" + addr.toString() + ", options = " +  options.toString() ,options.connectionsPerHost );

_options =options;

_addr = addr;

_waitingSem = new Semaphore( _options.connectionsPerHost * _options.threadsAllowedToBlockForConnectionMultiplier );

}

可以看出java-driver会为每个ServerAddress创建一个连接池。也是因为MongoDB是在驱动中设置高可用的,可以配置多个MongoS或MongoD的连接。_waitingSem是个很有意思的信号量,他是由connectionsPerHostthreadsAllowedToBlockForConnectionMultiplier相乘得出的,在后面来分析他的用法。

DBPortPool的父类是SimplePool,再来看从父类的构造函数可以看出:

publicSimplePool(String name, int size){

_name = name;

_size = size;

_sem = new Semaphore(size);

}

其中,size就是connectionsPerHost,SimplePool使用connectionsPerHost创建了一个信号量_sem,可以通过_sem在SimplePool中的使用方式来分析一下连接的创建策略。

SimplePool中,通过下面方法申请获取连接的许可证,

privatebooleanpermitAcquired(finallong waitTime) throws InterruptedException {

if(waitTime > 0) {

return_sem.tryAcquire(waitTime, TimeUnit.MILLISECONDS);

       } elseif (waitTime < 0) {

_sem.acquire();

returntrue;

       } else {

return_sem.tryAcquire();

       }

}

获取这个许可有一个等待时间的策略,如果waitTime为0,那么如果可以获取连接就立即返回true,如果不能获取连接就立即返回false;如果设置了waitTime为正数,会等待waitTime毫秒,如果这段时间仍然没有连接可以使用就返回false;如果waitTime就负数,那么会一直等待直到获得许可为止。

这在被调用方法不能设置超时时间的时候,在调用方法中设置超时时间的方法。

这也是一个平衡线程利用率的策略,在一个有限且使用率高的线程池中,让一个线程等待时间过长无疑是一种浪费,MongoDB使用阻塞信号量配置等待时间的方法,配置线程的释放策略。

这个方法是在获取DBPort对象时被调用的,用来申请获取连接的许可:

public T get(long waitTime) throws InterruptedException {

if(!permitAcquired(waitTime)) {

returnnull;

       }

       //获取连接的方法略……

       // t =createNewAndReleasePermitIfFailure();

}

SimplePool是一个获取池对象的基类方法,createNewAndReleasePermitIfFailure方法会调用子类的createNew方法来创建具体对象。然后再看看这个信号量什么时候被释放:

private TcreateNewAndReleasePermitIfFailure() {

try {

           T newMember = createNew();

if(newMember == null) {

thrownew IllegalStateException("null pool members are not allowed");

           }

returnnewMember;

       } catch(RuntimeException e) {

_sem.release();

throw e;

       } catch (Errore) {

_sem.release();

throw e;

       }

  }

可见,在连接创建失败的时候会释放这个信号量。

publicvoid done( T t ){

synchronizedthis ) {

if (_closed) {

               cleanup(t);

return;

           }

           // 释放连接方法略……

       }

_sem.release();

}

还有就是连接使用完毕,要调用done释放连接,信号量就会被恢复。

从上可知。connectionPerHost是单个Mongo服务连接地址的连接池数量,连接都被占用的获取连接等待时间是可设置的,由MongoClientOptions.maxWaitTime参数决定,默认时间是两分钟,可以通过减少这个值来提升线程利用率。

 

再来看看_waitingSem这个信号量的用法,他只在一个方法中被使用了:

public DBPort get() {

       DBPort port = null;

if ( ! _waitingSem.tryAcquire() )

thrownew SemaphoresOut(_options.connectionsPerHost * _options.threadsAllowedToBlockForConnectionMultiplier);

 

try {

           port = get( _options.maxWaitTime );

       } catch(InterruptedException e) {

thrownew MongoInterruptedException(e);

       } finally {

_waitingSem.release();

       }

if ( port== null )

thrownew ConnectionWaitTimeOut( _options.maxWaitTime );

       port._lastThread =System.identityHashCode(Thread.currentThread());

return port;

}

_waitingSem值的大小是连接池大小乘以一个常数(>=1),这个信号量在获取连接动作之前取得,在获取连接后马上释放,而不用等待连接被放回线程池,说明在线程池为空的情况下,最多可以有(connectionsPerHost* threadsAllowedToBlockForConnectionMultiplier)多个线程被阻塞在

port = get(_options.maxWaitTime)这段代码处。超过这个数就会抛异常,告诉后面的线程不用等了。所以这是一个等待获取连接的队列,队列的长度就是连接池大小乘以一个常数。如果线程池数量有限,而又允许有一定数据损失的场景下,可以将threadsAllowedToBlockForConnectionMultiplier值调小,比如写日志。像MongoDB这种周期性从内存向磁盘写入数据的情况,默认是每60秒写如一次数据,如果内存不够大,可能会在客户端积压很长的等待队列,如果操作比较频繁,而又不想处理因为等待造成的异常,就将这个值调大。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics