跳到主要内容

对象池模式(Object Pool Pattern)

引言

对象池模式(Object Pool Pattern)是一种创建一组可重用对象的设计模式。它通过维护一个预分配的对象集合,避免了频繁地创建和销毁对象所带来的性能开销。在需要使用对象时,可以直接从池中获取,而不需要每次都创建新的对象;当对象不再使用时,可以将其归还到池中,而不是直接销毁。

对象池模式的主要优点是减少了对象的创建和销毁的开销,提高了程序的性能。此外,它还有助于控制资源的使用,避免资源的浪费。然而,对象池模式也有一些缺点,如增加了代码的复杂性,以及可能导致内存占用过高。

对象池模式并不是GoF中的23种设计模式

定义及实现

定义

When objects are expensive to create and they are needed only for short periods of time it is advantageous to utilize the Object Pool pattern. The Object Pool provides a cache for instantiated objects tracking which ones are in use and which are available.

当对象的创建成本很高并且只在很短的周期内使用,那么对象池模式就很有优势。对象池提供一个对象示例的缓存来跟踪那个对象正在使用,哪个对象是可用的。

结构

代码实现

@Slf4j
public abstract class ObjectPool<T> {

private final Deque<T> available = new ArrayDeque<>();
private final Deque<T> using = new ArrayDeque<>();

/**
* 创建一个对象
*/
protected abstract T create();

/**
* 从池中获取一个对象
*/
public synchronized T checkOut() {
if (available.isEmpty()) {
T obj = this.create();
using.addLast(obj);
return obj;
}
T obj = available.poll();
using.addLast(obj);
return obj;
}

/**
* 将对象放回池中
*/
public synchronized void checkIn(T t) {
using.remove(t);
available.addLast(t);
}

public void printPoolInfo() {
log.info("available: {}, using: {}", available.size(), using.size());
}
}
java
@Slf4j
public class Oliphaunt {

// 这里类中定义一个序号,用来区分不同的实例
private final Integer sno;

public Oliphaunt(Integer sno) {
this.sno = sno;
}

public void doSomething() {
log.info("sno: {}, do something", this.sno);
}
}
java
public class OliphauntObjectPool extends ObjectPool<Oliphaunt> {

private final AtomicInteger count = new AtomicInteger(1);

@Override
public Oliphaunt create() {
return new Oliphaunt(count.getAndIncrement());
}

}
java

测试对象池的使用

public class Main {

public static void main(String[] args) {

ObjectPool<Oliphaunt> oliphauntObjectPool = new OliphauntObjectPool();
Oliphaunt oliphaunt = oliphauntObjectPool.checkOut();
Oliphaunt oliphaunt2 = oliphauntObjectPool.checkOut();
oliphaunt.doSomething();
oliphaunt2.doSomething();
oliphauntObjectPool.checkIn(oliphaunt);
oliphauntObjectPool.checkIn(oliphaunt2);

Oliphaunt oliphaunt3 = oliphauntObjectPool.checkOut();
oliphaunt3.doSomething();

oliphauntObjectPool.printPoolInfo();
}
}
java

输出结果

org.depsea.design.pattern.creation.objectpool.Oliphaunt -- sno: 1, do something
org.depsea.design.pattern.creation.objectpool.Oliphaunt -- sno: 2, do something
org.depsea.design.pattern.creation.objectpool.Oliphaunt -- sno: 1, do something
org.depsea.design.pattern.creation.objectpool.ObjectPool -- available: 1, using: 1

存在的问题

以上实现有一个使用起来不太方便的地方,每次使用完后都需要通过对象池的 checkIn 方法归还对象。但是我们在使用连接池获取连接,使用完毕后好像并没有这个操作,而是直接调用连接的 close 方法即可。这是如何实现的呢?这里提供一个思路。

使Oliphaut实现Closeable并提供一个关闭函数close,并在 Oliphaunt 中提供一个钩子函数,用于在Oliphaunt创建时,创建者可以注入一个钩子,这个钩子函数的目的就是将对象返还到连接池中。然后Oliphaut在关闭函数中调用这个钩子,就可以达到回收对象的目的。

我们需要对 OliphautOliphauntObjectPool 稍加改造。

@Slf4j
public class Oliphaunt implements Closeable {

// 这里类中定义一个序号,用来区分不同的实例
private final Integer sno;

@Setter
private Consumer<Oliphaunt> closeHook;

public Oliphaunt(Integer sno) {
this.sno = sno;
}

public void doSomething() {
log.info("sno: {}, do something", this.sno);
}

@Override
public void close() {
if (closeHook != null) {
closeHook.accept(this);
}
}
}
java
public class OliphauntObjectPool extends ObjectPool<Oliphaunt> {

private final AtomicInteger count = new AtomicInteger(1);

@Override
public Oliphaunt create() {
Oliphaunt oliphaunt = new Oliphaunt(count.getAndIncrement());
oliphaunt.setCloseHook(this::checkIn);
return oliphaunt;
}

}
java

测试

public class Main {

public static void main(String[] args) {

ObjectPool<Oliphaunt> oliphauntObjectPool = new OliphauntObjectPool();
Oliphaunt oliphaunt = oliphauntObjectPool.checkOut();
oliphaunt.doSomething();
oliphaunt.close();

Oliphaunt oliphaunt2 = oliphauntObjectPool.checkOut();
oliphaunt2.doSomething();

oliphauntObjectPool.printPoolInfo();
}
}
java

输出结果

org.depsea.design.pattern.creation.objectpool.Oliphaunt -- sno: 1, do something
org.depsea.design.pattern.creation.objectpool.Oliphaunt -- sno: 1, do something
org.depsea.design.pattern.creation.objectpool.ObjectPool -- available: 0, using: 1

这样依然会有问题,因为我们占用了原对象的close方法,如果原对象需要通过 close 方法来释放资源,会出现 “鱼与熊掌不可兼得”的情况。要么通过close回收对象,要么通过close释放资源。有没有其他方法解决呢?

那这里就需要用到代理模式(Proxy Pattern),我们创建一个 Oliphaunt 的代理类。通过代理类来回收对象。在需要关闭资源时,通过代理对象获取到真实的对象,然后调用真实对象的close方法来释放资源。

其类结构也会后些许变化,Oliphaunt不能再是一个具体类,而应是一个接口。具体如下:

ObjectPool的代码不动,其他类都有修改,具体如下:

public interface Oliphaunt extends Closeable {
void doSomething();
}
java
@Slf4j
public class OliphauntImpl implements Oliphaunt, Closeable {

// 这里类中定义一个序号,用来区分不同的实例
private final Integer sno;

public OliphauntImpl(Integer sno) {
this.sno = sno;
}

public void doSomething() {
log.info("sno: {}, do something", this.sno);
}

@Override
public void close() {
log.info("sno: {}, 释放资源", this.sno);
}
}
java
public class OliphauntProxy implements Oliphaunt {

@Getter
private final Oliphaunt realOliphaunt;

private final ObjectPool<Oliphaunt> objectPool;

public OliphauntProxy(Oliphaunt realOliphaunt, ObjectPool<Oliphaunt> objectPool) {
this.realOliphaunt = realOliphaunt;
this.objectPool = objectPool;
}


@Override
public void doSomething() {
this.realOliphaunt.doSomething();
}

@Override
public void close() throws IOException {
log.info("回收资源");
this.objectPool.checkIn(this);
}
}
java
public class OliphauntObjectPool extends ObjectPool<Oliphaunt> {

private final AtomicInteger count = new AtomicInteger(1);

@Override
public Oliphaunt create() {
// 先创建具体实例,再通过代理类代理具体实例
Oliphaunt oliphaunt = new OliphauntImpl(count.getAndIncrement());
return new OliphauntProxy(oliphaunt, this);
}

public void shutdown() throws IOException {
// 释放资源,无论是在用的还是可用的全部释放。在实际应用中需要区分这两种资源的是否逻辑,但这不是本例的重点,估简单处理。
while (!this.available.isEmpty()) {
OliphauntProxy oliphauntProxy = (OliphauntProxy) this.available.poll();
oliphauntProxy.getRealOliphaunt().close();
}
while (!this.using.isEmpty()) {
OliphauntProxy oliphauntProxy = (OliphauntProxy) this.using.poll();
oliphauntProxy.getRealOliphaunt().close();
}
this.available.clear();
this.using.clear();
}

}
java

测试代码

public class Main {

public static void main(String[] args) throws IOException {

OliphauntObjectPool oliphauntObjectPool = new OliphauntObjectPool();
Oliphaunt oliphaunt = oliphauntObjectPool.checkOut();
oliphaunt.doSomething();
oliphaunt.close();

Oliphaunt oliphaunt2 = oliphauntObjectPool.checkOut();
oliphaunt2.doSomething();

oliphauntObjectPool.printPoolInfo();
oliphaunt2.close();

oliphauntObjectPool.shutdown();
}
}
java

输出

org.depsea.design.pattern.creation.objectpool.OliphauntImpl -- sno: 1, do something
org.depsea.design.pattern.creation.objectpool.OliphauntProxy -- 回收资源
org.depsea.design.pattern.creation.objectpool.OliphauntImpl -- sno: 1, do something
org.depsea.design.pattern.creation.objectpool.ObjectPool -- available: 0, using: 1
org.depsea.design.pattern.creation.objectpool.OliphauntProxy -- 回收资源
org.depsea.design.pattern.creation.objectpool.OliphauntImpl -- sno: 1, 释放资源

在本文的例子中ObjectPool是一个抽象类,但是最好定义成接口,并提供一个AbstractObjectPool类抽象公共业务。并且 shutdown方法应该上移至顶层接口,及ObjectPool中。类图如下:

这种方法也是连接池常用的一种方法,无论是Druid 还是 HikariCP 处理连接回收的逻辑基本与上文中的一致。

实际应用

对象池最常见的应用就是连接池。以HikariCP为例,简单介绍下对象池模式在连接池中的应用。

核心类方法:

  • com.zaxxer.hikari.HikariDataSource
  • com.zaxxer.hikari.pool.HikariPool
  • com.zaxxer.hikari.util.ConcurrentBag
  • com.zaxxer.hikari.pool.PoolEntry
  • com.zaxxer.hikari.HikariDataSource#getConnection()
  • com.zaxxer.hikari.pool.HikariPool#getConnection()
  • com.zaxxer.hikari.util.ConcurrentBag#borrow
  • com.zaxxer.hikari.pool.PoolEntry#createProxyConnection
  • com.zaxxer.hikari.pool.ProxyConnection#close
  • com.zaxxer.hikari.pool.PoolEntry#recycle
  • com.zaxxer.hikari.pool.HikariPool#recycle

连接获取

com.zaxxer.hikari.HikariDataSource实际上是com.zaxxer.hikari.pool.HikariPool的一个代理类,核心逻辑都在com.zaxxer.hikari.pool.HikariPool中。当我们调用com.zaxxer.hikari.HikariDataSource#getConnection()方法时,实际上调用的是com.zaxxer.hikari.pool.HikariPool#getConnection()

HikariCP使用com.zaxxer.hikari.util.ConcurrentBag来管理对象。ConcurrentBag并没有使用两个集合来维护可用和在用的对象列表,而是使用com.zaxxer.hikari.pool.PoolEntry中的状态(state)字段来维护。

com.zaxxer.hikari.pool.HikariPool初始化时会创建一个 com.zaxxer.hikari.pool.PoolEntry对象并添加到 com.zaxxer.hikari.util.ConcurrentBag中。

   public HikariPool(final HikariConfig config)
{
super(config);

this.connectionBag = new ConcurrentBag<>(this);
this.suspendResumeLock = config.isAllowPoolSuspension() ? new SuspendResumeLock() : SuspendResumeLock.FAUX_LOCK;

this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();

checkFailFast();

if (config.getMetricsTrackerFactory() != null) {
setMetricsTrackerFactory(config.getMetricsTrackerFactory());
}
else {
setMetricRegistry(config.getMetricRegistry());
}

...
}
java

checkFailFast 方法会创建一个 PoolEntry并添加到connectionBag中。

   private void checkFailFast()
{
final var initializationTimeout = config.getInitializationFailTimeout();
if (initializationTimeout < 0) {
return;
}

final var startTime = currentTime();
do {
// 在这里创建了一个PoolEntry
final var poolEntry = createPoolEntry();
if (poolEntry != null) {
if (config.getMinimumIdle() > 0) {
connectionBag.add(poolEntry);
logger.info("{} - Added connection {}", poolName, poolEntry.connection);
}
else {
quietlyCloseConnection(poolEntry.close(), "(initialization check complete and minimumIdle is zero)");
}

return;
}

if (getLastConnectionFailure() instanceof ConnectionSetupException) {
throwPoolInitializationException(getLastConnectionFailure().getCause());
}

quietlySleep(SECONDS.toMillis(1));
} while (elapsedMillis(startTime) < initializationTimeout);

if (initializationTimeout > 0) {
throwPoolInitializationException(getLastConnectionFailure());
}
}
java

当调用com.zaxxer.hikari.pool.HikariPool#getConnection(long)时会调用
com.zaxxer.hikari.util.ConcurrentBag#borrow获取一个PoolEntry,最终通过poolEntry.createProxyConnection创建一个代理连接。代码如下:

   public Connection getConnection(final long hardTimeout) throws SQLException
{
suspendResumeLock.acquire();
final var startTime = currentTime();

try {
var timeout = hardTimeout;
do {
// 这里从池中获取一个`poolEntry`
var poolEntry = connectionBag.borrow(timeout, MILLISECONDS);
if (poolEntry == null) {
break; // We timed out... break and throw exception
}

final var now = currentTime();
if (poolEntry.isMarkedEvicted() || (elapsedMillis(poolEntry.lastAccessed, now) > aliveBypassWindowMs && isConnectionDead(poolEntry.connection))) {
closeConnection(poolEntry, poolEntry.isMarkedEvicted() ? EVICTED_CONNECTION_MESSAGE : DEAD_CONNECTION_MESSAGE);
timeout = hardTimeout - elapsedMillis(startTime);
}
else {
metricsTracker.recordBorrowStats(poolEntry, startTime);
// 创建代理连接
return poolEntry.createProxyConnection(leakTaskFactory.schedule(poolEntry));
}
} while (timeout > 0L);

metricsTracker.recordBorrowTimeoutStats(startTime);
throw createTimeoutException(startTime);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SQLException(poolName + " - Interrupted during connection acquisition", e);
}
finally {
suspendResumeLock.release();
}
}
java
   public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
{
// Try the thread-local list first
final var list = threadList.get();
for (int i = list.size() - 1; i >= 0; i--) {
final var entry = list.remove(i);
@SuppressWarnings("unchecked")
final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
// 这里判断PoolEntry是否为未使用状态
if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}
}

// Otherwise, scan the shared list ... then poll the handoff queue
final int waiting = waiters.incrementAndGet();
try {
for (T bagEntry : sharedList) {
// 这里判断PoolEntry是否为未使用状态
if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
// If we may have stolen another waiter's connection, request another bag add.
if (waiting > 1) {
// 如果等待者超过1,则添加一个PoolEntry
listener.addBagItem(waiting - 1);
}
return bagEntry;
}
}

listener.addBagItem(waiting);

timeout = timeUnit.toNanos(timeout);
do {
final var start = currentTime();
final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
return bagEntry;
}

timeout -= elapsedNanos(start);
} while (timeout > 10_000);

return null;
}
finally {
waiters.decrementAndGet();
}
}
java

上面代码中的listener就是com.zaxxer.hikari.pool.HikariPool,相当于是调用的com.zaxxer.hikari.pool.HikariPool#addBagItem方法创建PoolEntry

@Override
public void addBagItem(final int waiting)
{
if (waiting > addConnectionExecutor.getQueue().size())
addConnectionExecutor.submit(poolEntryCreator);
}
java

以上就是对象创建的全部核心逻辑。部分代码未展示,有兴趣的可以去阅读HikariCP源码。

连接回收

HikariPool.getConnection()返回的连接对象实际为com.zaxxer.hikari.pool.HikariProxyConnection,而com.zaxxer.hikari.pool.HikariProxyConnection又继承自com.zaxxer.hikari.pool.ProxyConnection

当调用com.zaxxer.hikari.pool.ProxyConnection#close方法时,最终调用了com.zaxxer.hikari.pool.PoolEntry#recycle方法。

@Override
public final void close() throws SQLException
{
// Closing statements can cause connection eviction, so this must run before the conditional below
closeStatements();

if (delegate != ClosedConnection.CLOSED_CONNECTION) {
leakTask.cancel();

try {
if (isCommitStateDirty && !isAutoCommit) {
delegate.rollback();
LOGGER.debug("{} - Executed rollback on connection {} due to dirty commit state on close().", poolEntry.getPoolName(), delegate);
}

if (dirtyBits != 0) {
poolEntry.resetConnectionState(this, dirtyBits);
}

delegate.clearWarnings();
}
catch (SQLException e) {
// when connections are aborted, exceptions are often thrown that should not reach the application
if (!poolEntry.isMarkedEvicted()) {
throw checkException(e);
}
}
finally {
delegate = ClosedConnection.CLOSED_CONNECTION;
poolEntry.recycle();
}
}
}
java

其中的com.zaxxer.hikari.pool.ProxyConnection#delegate 就是真实的连接对象。

void recycle()
{
if (connection != null) {
this.lastAccessed = currentTime();
hikariPool.recycle(this);
}
}
java

java com.zaxxer.hikari.pool.PoolEntry#recycle 又调用 com.zaxxer.hikari.pool.HikariPool#recycle方法,最终将对象回收到connectionBag中。

总结

对象池其实有点类似于工厂,工厂生产的对象交个客户端后就由客户端来维护其生命周期了。而对象池生产的对象再客户端用完之后还需要还给对象池,对象池中对象的生命周期是由对象池来维护的。

对象池的核心操作

  • 获取对象:从池中获取一个可用对象,可能会触发对象池的创建动作,创建出一个新的对象。
  • 对象归还:在使用完成后归还对象