C#对象池模式的实现
有没有人有一个很好的资源实施共享对象池战略的有限资源的Sql连接池? (即完全实现它是线程安全的)。
为了跟进@Aaronaught的澄清要求,池的使用将是负载平衡请求到外部服务。 把它放在一个可能更容易立即理解的场景中,而不是我的直接情景。 我有一个会话对象,其function类似于NHibernate的ISession
对象。 每个独特的会话pipe理它连接到数据库。 目前我有1长时间运行的会话对象,并遇到问题,我的服务提供商是速率限制我个人会话的使用。
由于他们没有期望将一个会话视为一个长期运行的服务帐户,他们显然把它视为一个锤击他们的服务的客户端。 这使我想到我的问题,而不是有一个单独的会话,我会创build一个不同的会话池,并跨越这些多个会话将请求分离到服务,而不是像我以前那样创build一个焦点。
希望背景提供一些价值,但直接回答你的一些问题:
问:创build对象是否昂贵?
答:没有对象是一个有限的资源池
问:他们会被频繁获得/释放吗?
答:是的,他们可以再次想到NHibernate的ISessions,其中1通常是在每个页面请求期间获取和发布的。
问:简单的先到先得的服务是否足够,还是需要更聪明的东西,即能够防止饥饿?
答:一个简单的循环型分布就足够了,通过饥饿我假设你是指如果没有可用的会话,呼叫者被阻止等待发布。 这是不适用的,因为会话可以由不同的呼叫者共享。 我的目标是分配使用跨多个会话,而不是一个单一的会议。
我认为这可能与对象池的正常使用有所分歧,这就是为什么我最初离开这个部分并计划只是为了适应模式,允许共享对象,而不是让饥饿的情况发生。
问:关于优先级,懒惰与急切加载等事情呢?
答:不涉及优先级,为了简单起见,假设我将在创build池本身时创build可用对象池。
.NET核心中的对象池
dotnet核心具有添加到基类库(BCL)的对象池的实现。 您可以在这里阅读原始的GitHub问题并查看System.Buffers的代码。 目前ArrayPool
是唯一可用的types,用于存储数组。 这里有一个很好的博客文章。
namespace System.Buffers { public abstract class ArrayPool<T> { public static ArrayPool<T> Shared { get; internal set; } public static ArrayPool<T> Create(int maxBufferSize = <number>, int numberOfBuffers = <number>); public T[] Rent(int size); public T[] Enlarge(T[] buffer, int newSize, bool clearBuffer = false); public void Return(T[] buffer, bool clearBuffer = false); } }
在ASP.NET Core中可以看到它的用法的一个例子。 由于它位于BCL的dotnet核心,因此ASP.NET Core可以与其他对象(如Newtonsoft.Json的JSON序列化程序)共享其对象池。 你可以阅读这篇博客文章,了解更多关于Newtonsoft.Json如何做的信息。
Microsoft Roslyn C#编译器中的对象池
新的Microsoft Roslyn C#编译器包含ObjectPooltypes,该types用于汇集经常使用的对象,这些对象通常会变得更新,并且经常收集垃圾。 这减less了必须发生的垃圾收集操作的数量和大小。 有几个不同的子实现都使用ObjectPool(请参阅: 为什么在Roslyn中有太多的Object Pooling实现? )。
1 – SharedPools – 如果使用BigDefault,则存储20个对象的池或100个池。
// Example 1 - In a using statement, so the object gets freed at the end. using (PooledObject<Foo> pooledObject = SharedPools.Default<List<Foo>>().GetPooledObject()) { // Do something with pooledObject.Object } // Example 2 - No using statement so you need to be sure no exceptions are not thrown. List<Foo> list = SharedPools.Default<List<Foo>>().AllocateAndClear(); // Do something with list SharedPools.Default<List<Foo>>().Free(list); // Example 3 - I have also seen this variation of the above pattern, which ends up the same as Example 1, except Example 1 seems to create a new instance of the IDisposable [PooledObject<T>][4] object. This is probably the preferred option if you want fewer GC's. List<Foo> list = SharedPools.Default<List<Foo>>().AllocateAndClear(); try { // Do something with list } finally { SharedPools.Default<List<Foo>>().Free(list); }
2 – ListPool和StringBuilderPool – 不是严格独立的实现,而是专门用于List和StringBuilder的SharedPools实现的封装。 因此,这将重新使用存储在SharedPools中的对象池。
// Example 1 - No using statement so you need to be sure no exceptions are thrown. StringBuilder stringBuilder= StringBuilderPool.Allocate(); // Do something with stringBuilder StringBuilderPool.Free(stringBuilder); // Example 2 - Safer version of Example 1. StringBuilder stringBuilder= StringBuilderPool.Allocate(); try { // Do something with stringBuilder } finally { StringBuilderPool.Free(stringBuilder); }
3 – PooledDictionary和PooledHashSet – 这些直接使用ObjectPool,并有一个完全独立的对象池。 存储一个由128个对象组成的池。
// Example 1 PooledHashSet<Foo> hashSet = PooledHashSet<Foo>.GetInstance() // Do something with hashSet. hashSet.Free(); // Example 2 - Safer version of Example 1. PooledHashSet<Foo> hashSet = PooledHashSet<Foo>.GetInstance() try { // Do something with hashSet. } finally { hashSet.Free(); }
Microsoft.IO.RecyclableMemoryStream
这个库为MemoryStream
对象提供池。 这是System.IO.MemoryStream
替代品。 它具有完全相同的语义。 它是由Bing工程师devise的。 阅读这里的博客文章或在GitHub上看到的代码。
var sourceBuffer = new byte[]{0,1,2,3,4,5,6,7}; var manager = new RecyclableMemoryStreamManager(); using (var stream = manager.GetStream()) { stream.Write(sourceBuffer, 0, sourceBuffer.Length); }
请注意, RecyclableMemoryStreamManager
应该声明一次,并且它将在整个进程中生存 – 这是池。 如果您愿意,使用多个池是完全正确的。
由于一些未知因素,这个问题比人们可能期望的要复杂一些:资源池的行为,对象的预期/需要的生命周期,需要池的真正原因等等。典型的池是特殊用途的线程池,连接池等 – 因为当你确切地知道资源的作用,更重要的是可以控制资源的实现时,更容易优化它。
既然不是那么简单,我试图做的是提供一个相当灵活的方法,你可以尝试一下,看看什么效果最好。 对于这个长期职位,我们提前表示歉意,但是在实施一个体面的通用资源库方面有很多的理由。 而我真的只是在表面上刮擦。
通用的游泳池必须有几个主要的“设置”,包括:
- 资源加载策略 – 渴望或懒惰;
- 资源加载机制 – 如何实际构build一个;
- 访问策略 – 你提到“循环”,这不像听起来那么直截了当; 这个实现可以使用一个类似但不完美的循环缓冲区,因为池实际上回收资源的时候是无法控制的。 其他选项有FIFO和LIFO; FIFO将会有更多的随机访问模式,但是LIFO使得实现最近最less使用的释放策略(你所说的超出范围,但仍然值得一提)更容易。
对于资源加载机制,.NET已经给了我们一个干净的抽象 – 代表。
private Func<Pool<T>, T> factory;
把它传递给pool的构造函数,我们就完成了。 使用带new()
约束的genericstypes也可以,但是这更灵活。
其他两个参数中,访问策略是更复杂的野兽,所以我的方法是使用基于inheritance(接口)的方法:
public class Pool<T> : IDisposable { // Other code - we'll come back to this interface IItemStore { T Fetch(); void Store(T item); int Count { get; } } }
这里的概念很简单 – 我们让公共Pool
类处理像线程安全这样的常见问题,但是为每个访问模式使用不同的“item store”。 LIFO很容易代表堆栈,FIFO是一个队列,而且我使用了一个不是非常优化,但是可能足够的循环缓冲区实现,它使用List<T>
和索引指针来近似循环访问模式。
下面所有的类都是Pool<T>
内部类 – 这是一个样式select,但是因为这些并不意味着在Pool
之外使用,所以最有意义。
class QueueStore : Queue<T>, IItemStore { public QueueStore(int capacity) : base(capacity) { } public T Fetch() { return Dequeue(); } public void Store(T item) { Enqueue(item); } } class StackStore : Stack<T>, IItemStore { public StackStore(int capacity) : base(capacity) { } public T Fetch() { return Pop(); } public void Store(T item) { Push(item); } }
这些是显而易见的 – 堆栈和队列。 我不认为他们真的需要太多的解释。 循环缓冲区稍微复杂一些:
class CircularStore : IItemStore { private List<Slot> slots; private int freeSlotCount; private int position = -1; public CircularStore(int capacity) { slots = new List<Slot>(capacity); } public T Fetch() { if (Count == 0) throw new InvalidOperationException("The buffer is empty."); int startPosition = position; do { Advance(); Slot slot = slots[position]; if (!slot.IsInUse) { slot.IsInUse = true; --freeSlotCount; return slot.Item; } } while (startPosition != position); throw new InvalidOperationException("No free slots."); } public void Store(T item) { Slot slot = slots.Find(s => object.Equals(s.Item, item)); if (slot == null) { slot = new Slot(item); slots.Add(slot); } slot.IsInUse = false; ++freeSlotCount; } public int Count { get { return freeSlotCount; } } private void Advance() { position = (position + 1) % slots.Count; } class Slot { public Slot(T item) { this.Item = item; } public T Item { get; private set; } public bool IsInUse { get; set; } } }
我可以select一些不同的方法,但底线是资源应该按照它们创build的顺序访问,这意味着我们必须保持对它们的引用,但将它们标记为“正在使用”(或不)。 在最糟糕的情况下,只有一个插槽可用,并且每次获取都需要缓冲区的完整迭代。 如果您有数百个资源汇集在一起,并且每秒获取并释放数次, 对于5-10个物品池来说并不是一个真正的问题,而在典型的情况下,如果资源被轻易使用,它只需要提前一两个时隙。
请记住,这些类是私有的内部类 – 这就是为什么他们不需要大量的错误检查,池本身限制对它们的访问。
抛出枚举和工厂方法,我们完成了这部分:
// Outside the pool public enum AccessMode { FIFO, LIFO, Circular }; private IItemStore itemStore; // Inside the Pool private IItemStore CreateItemStore(AccessMode mode, int capacity) { switch (mode) { case AccessMode.FIFO: return new QueueStore(capacity); case AccessMode.LIFO: return new StackStore(capacity); default: Debug.Assert(mode == AccessMode.Circular, "Invalid AccessMode in CreateItemStore"); return new CircularStore(capacity); } }
下一个要解决的问题是加载策略。 我定义了三种types:
public enum LoadingMode { Eager, Lazy, LazyExpanding };
前两个应该是不言自明的; 第三种是混合,它懒惰地加载资源,但实际上并没有开始重新使用任何资源,直到池已满。 如果你想让游戏池满(这听起来像你一样),但是要延迟实际创build游戏直到第一次访问(即改善启动时间)的开销,这将是一个很好的折衷。
加载方法真的不是太复杂,现在我们有物品存储抽象:
private int size; private int count; private T AcquireEager() { lock (itemStore) { return itemStore.Fetch(); } } private T AcquireLazy() { lock (itemStore) { if (itemStore.Count > 0) { return itemStore.Fetch(); } } Interlocked.Increment(ref count); return factory(this); } private T AcquireLazyExpanding() { bool shouldExpand = false; if (count < size) { int newCount = Interlocked.Increment(ref count); if (newCount <= size) { shouldExpand = true; } else { // Another thread took the last spot - use the store instead Interlocked.Decrement(ref count); } } if (shouldExpand) { return factory(this); } else { lock (itemStore) { return itemStore.Fetch(); } } } private void PreloadItems() { for (int i = 0; i < size; i++) { T item = factory(this); itemStore.Store(item); } count = size; }
上面的size
和count
字段分别指的是池的最大大小和池所拥有的资源总数(但不一定是可用的 )。 AcquireEager
是最简单的,它假定一个项目已经在商店中 – 这些项目将在施工时预加载,即在最后显示的PreloadItems
方法中。
AcquireLazy
检查池中是否有空闲物品,如果没有,则创build一个新的物品。 AcquireLazyExpanding
将创build一个新的资源,只要该池尚未达到其目标大小。 我试图优化这个以最大限度地减lesslocking,我希望我没有犯任何错误(我已经在multithreading条件下testing了这个,但显然不是详尽的)。
您可能想知道为什么这些方法都不能检查商店是否达到最大尺寸。 我马上就会明白这一点。
现在为游泳池本身。 这里是一整套私人数据,其中一些已经被显示:
private bool isDisposed; private Func<Pool<T>, T> factory; private LoadingMode loadingMode; private IItemStore itemStore; private int size; private int count; private Semaphore sync;
回答我在最后一段中所讨论的问题 – 如何确保我们限制创build的资源总数 – 事实certificate,.NET已经有了一个非常好的工具,它被称为信号量(Semaphore) ,它专门devise用于允许固定线程数量访问资源(在这种情况下,“资源”是内部项目存储)。 由于我们没有实施一个完整的生产者/消费者队列,所以这对我们的需求是完全适合的。
构造函数如下所示:
public Pool(int size, Func<Pool<T>, T> factory, LoadingMode loadingMode, AccessMode accessMode) { if (size <= 0) throw new ArgumentOutOfRangeException("size", size, "Argument 'size' must be greater than zero."); if (factory == null) throw new ArgumentNullException("factory"); this.size = size; this.factory = factory; sync = new Semaphore(size, size); this.loadingMode = loadingMode; this.itemStore = CreateItemStore(accessMode, size); if (loadingMode == LoadingMode.Eager) { PreloadItems(); } }
这里应该不会有什么惊喜。 唯一需要注意的是使用前面已经介绍过的PreloadItems
方法的加载的特殊PreloadItems
。
由于现在几乎所有的东西都被完全抽象出来了,所以实际的Acquire
和Release
方法真的非常简单:
public T Acquire() { sync.WaitOne(); switch (loadingMode) { case LoadingMode.Eager: return AcquireEager(); case LoadingMode.Lazy: return AcquireLazy(); default: Debug.Assert(loadingMode == LoadingMode.LazyExpanding, "Unknown LoadingMode encountered in Acquire method."); return AcquireLazyExpanding(); } } public void Release(T item) { lock (itemStore) { itemStore.Store(item); } sync.Release(); }
如前所述,我们使用Semaphore
来控制并发性,而不是虔诚地检查项目存储的状态。 只要获得的物品被正确释放,就没有什么可担心的。
最后但并非最不重要的,有清理:
public void Dispose() { if (isDisposed) { return; } isDisposed = true; if (typeof(IDisposable).IsAssignableFrom(typeof(T))) { lock (itemStore) { while (itemStore.Count > 0) { IDisposable disposable = (IDisposable)itemStore.Fetch(); disposable.Dispose(); } } } sync.Close(); } public bool IsDisposed { get { return isDisposed; } }
该IsDisposed
属性的目的将在一瞬间变得清晰。 所有主要的Dispose
方法实际上都是在实现IDisposable
情况下处理实际的合并项目。
现在,你可以基本上用try-finally
块,但是我不喜欢这种语法,因为如果你开始在类和方法之间传递资源,那么它会变得非常混乱。 使用资源的主类可能甚至没有对池的引用。 它确实变得相当混乱,所以更好的方法是创build一个“聪明”的汇集对象。
假设我们从以下简单的接口/类开始:
public interface IFoo : IDisposable { void Test(); } public class Foo : IFoo { private static int count = 0; private int num; public Foo() { num = Interlocked.Increment(ref count); } public void Dispose() { Console.WriteLine("Goodbye from Foo #{0}", num); } public void Test() { Console.WriteLine("Hello from Foo #{0}", num); } }
这是我们伪装的一次性Foo
资源,它实现了IFoo
并有一些样板代码用于生成唯一的身份。 我们所做的是创build另一个特殊的汇集对象:
public class PooledFoo : IFoo { private Foo internalFoo; private Pool<IFoo> pool; public PooledFoo(Pool<IFoo> pool) { if (pool == null) throw new ArgumentNullException("pool"); this.pool = pool; this.internalFoo = new Foo(); } public void Dispose() { if (pool.IsDisposed) { internalFoo.Dispose(); } else { pool.Release(this); } } public void Test() { internalFoo.Test(); } }
这只是代表所有的“真正的”方法到其内部的IFoo
(我们可以做到这一点与dynamic代理库,如城堡,但我不会进入)。 它还维护对创build它的Pool
的引用,以便当我们Dispose
此对象时,它会自动将其自身释放回池中。 除了池已经被处置 – 这意味着我们处于“清理”模式,在这种情况下,它实际上清理内部资源 。
使用上面的方法,我们可以像这样编写代码:
// Create the pool early Pool<IFoo> pool = new Pool<IFoo>(PoolSize, p => new PooledFoo(p), LoadingMode.Lazy, AccessMode.Circular); // Sometime later on... using (IFoo foo = pool.Acquire()) { foo.Test(); }
这是一件很好的事情,能够做到。 这意味着使用 IFoo
的代码(而不是创build它的代码)实际上并不需要知道该池。 您甚至可以使用您最喜爱的DI库和Pool<T>
作为提供者/工厂来注入 IFoo
对象。
我已经把完整的代码放在PasteBin上,以供您复制和粘贴的乐趣。 还有一个简短的testing程序,您可以使用不同的加载/访问模式和multithreading条件,以满足自己的线程安全,而不是越野车。
如果您对此有任何疑问或疑问,请告知我们。
像这样的东西可能适合您的需求。
/// <summary> /// Represents a pool of objects with a size limit. /// </summary> /// <typeparam name="T">The type of object in the pool.</typeparam> public sealed class ObjectPool<T> : IDisposable where T : new() { private readonly int size; private readonly object locker; private readonly Queue<T> queue; private int count; /// <summary> /// Initializes a new instance of the ObjectPool class. /// </summary> /// <param name="size">The size of the object pool.</param> public ObjectPool(int size) { if (size <= 0) { const string message = "The size of the pool must be greater than zero."; throw new ArgumentOutOfRangeException("size", size, message); } this.size = size; locker = new object(); queue = new Queue<T>(); } /// <summary> /// Retrieves an item from the pool. /// </summary> /// <returns>The item retrieved from the pool.</returns> public T Get() { lock (locker) { if (queue.Count > 0) { return queue.Dequeue(); } count++; return new T(); } } /// <summary> /// Places an item in the pool. /// </summary> /// <param name="item">The item to place to the pool.</param> public void Put(T item) { lock (locker) { if (count < size) { queue.Enqueue(item); } else { using (item as IDisposable) { count--; } } } } /// <summary> /// Disposes of items in the pool that implement IDisposable. /// </summary> public void Dispose() { lock (locker) { count = 0; while (queue.Count > 0) { using (queue.Dequeue() as IDisposable) { } } } } }
用法示例
public class ThisObject { private readonly ObjectPool<That> pool = new ObjectPool<That>(100); public void ThisMethod() { var that = pool.Get(); try { // Use that .... } finally { pool.Put(that); } } }
当天,Microsoft通过Microsoft Transaction Server(MTS)提供了一个框架,随后COM +为COM对象进行对象池化。 该function被转移到.NET Framework中的System.EnterpriseServices,现在在Windows Communication Foundation中。
WCF中的对象池
本文来自.NET 1.1,但仍然适用于当前版本的框架(即使WCF是首选方法)。
对象池.NET
我非常喜欢Aronaught的实现 – 尤其是在他处理等待资源以通过使用信号量变得可用时。 我想补充几个补充:
- 将
sync.WaitOne()
更改为sync.WaitOne(timeout)
,并将超时作为Acquire(int timeout)
方法的参数公开。 当线程超时等待对象变为可用时,这也将需要处理条件。 - 例如,添加
Recycle(T item)
方法来处理发生故障时需要回收对象的情况。
面向Java的,本文展示了connectionImpl池模式和抽象对象池模式,可能是一个很好的第一种方法: http : //www.developer.com/design/article.php/626171/Pattern-Summaries-Object-Pool。 HTM
对象池模式:
来自MSDN的示例: 如何:使用ConcurrentBag创build一个对象池
这是另一个实现,池中的对象数量有限。
public class ObjectPool<T> where T : class { private readonly int maxSize; private Func<T> constructor; private int currentSize; private Queue<T> pool; private AutoResetEvent poolReleasedEvent; public ObjectPool(int maxSize, Func<T> constructor) { this.maxSize = maxSize; this.constructor = constructor; this.currentSize = 0; this.pool = new Queue<T>(); this.poolReleasedEvent = new AutoResetEvent(false); } public T GetFromPool() { T item = null; do { lock (this) { if (this.pool.Count == 0) { if (this.currentSize < this.maxSize) { item = this.constructor(); this.currentSize++; } } else { item = this.pool.Dequeue(); } } if (null == item) { this.poolReleasedEvent.WaitOne(); } } while (null == item); return item; } public void ReturnToPool(T item) { lock (this) { this.pool.Enqueue(item); this.poolReleasedEvent.Set(); } } }
msdn的扩展如何使用ConcurrentBag创build对象池。