在.NET 4.0中没有ConcurrentList <T>?
我很高兴在.NET 4.0中看到新的System.Collections.Concurrent
命名空间,相当不错! 我见过ConcurrentDictionary
, ConcurrentQueue
, ConcurrentStack
, ConcurrentBag
和BlockingCollection
。
似乎神秘缺失的一件事是ConcurrentList<T>
。 我必须自己写(或从网上下载:))?
我在这里错过了很明显的东西吗
我试了一会儿 (也在GitHub上 )。 我的实现有一些问题,我不会在这里进入。 让我告诉你,更重要的是,我学到了什么。
首先,你不可能完全实现无锁和线程安全的IList<T>
。 特别是,随机插入和删除不会起作用,除非你也忘记了O(1)随机存取(即除非你“作弊”,只是使用某种链接列表,让索引吸)。
我认为可能值得的是IList<T>
一个线程安全的有限子集:特别是一个允许Add
并提供索引的随机只读访问(但不包括Insert
, RemoveAt
等等)没有随机写访问)。
这是我的ConcurrentList<T>
实现的目标。 但是当我在multithreading场景下testing它的性能时,我发现简单的同步添加到List<T>
更快 。 基本上,添加List<T>
已经很快了, 所涉及的计算步骤的复杂性是微乎其微的(增加一个索引并将其分配给一个数组中的元素; 实际上就是这样 )。 你需要大量的并发写入才能看到任何types的锁争用; 即使如此,每个写入的平均性能仍然会击败ConcurrentList<T>
更昂贵但无锁的实现。
在列表的内部数组需要调整自己的相对罕见的事件中,您确实付出了很小的代价。 所以最终我得出的结论是,这是一个只有ConcurrentList<T>
集合types才有意义的小生境:当您希望保证在每次调用时添加一个元素的开销较低(因此,与摊销的性能目标相反)。
这根本不像你想象的那样有用。
你会用什么ConcurrentList?
线程世界中的随机存取容器的概念并不像可能出现的那样有用。 该声明
if (i < MyConcurrentList.Count) x = MyConcurrentList[i];
作为一个整体仍然不是线程安全的。
而不是创build一个ConcurrentList,试着build立解决scheme。 最常见的类是ConcurrentBag,特别是BlockingCollection。
对于已经提供的很好的答案,所有的应有的尊重,有时候我只是想要一个线程安全的IList。 没有什么先进的或幻想的。 在很多情况下,性能是很重要的,但有时这并不是一个问题。 是的,没有诸如“TryGetValue”之类的方法总会遇到挑战,但是大多数情况下,我只想要一些我可以枚举的东西,而不必担心把所有东西都locking。 是的,有人可能会发现在我的执行中可能会导致死锁或一些(我想)的一些“错误”,但让老实说:当涉及到multithreading,如果你不写你的代码正确,它无论如何,它正在陷入僵局。 考虑到这一点,我决定做一个简单的ConcurrentList实现,提供这些基本的需求。
而对于它的价值:我做了一个基本的testing,将1000万个项目添加到常规List和ConcurrentList中,结果是:
列表完成时间:7793毫秒。 并发完成:8064毫秒。
public class ConcurrentList<T> : IList<T>, IDisposable { #region Fields private readonly List<T> _list; private readonly ReaderWriterLockSlim _lock; #endregion #region Constructors public ConcurrentList() { this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion); this._list = new List<T>(); } public ConcurrentList(int capacity) { this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion); this._list = new List<T>(capacity); } public ConcurrentList(IEnumerable<T> items) { this._lock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion); this._list = new List<T>(items); } #endregion #region Methods public void Add(T item) { try { this._lock.EnterWriteLock(); this._list.Add(item); } finally { this._lock.ExitWriteLock(); } } public void Insert(int index, T item) { try { this._lock.EnterWriteLock(); this._list.Insert(index, item); } finally { this._lock.ExitWriteLock(); } } public bool Remove(T item) { try { this._lock.EnterWriteLock(); return this._list.Remove(item); } finally { this._lock.ExitWriteLock(); } } public void RemoveAt(int index) { try { this._lock.EnterWriteLock(); this._list.RemoveAt(index); } finally { this._lock.ExitWriteLock(); } } public int IndexOf(T item) { try { this._lock.EnterReadLock(); return this._list.IndexOf(item); } finally { this._lock.ExitReadLock(); } } public void Clear() { try { this._lock.EnterWriteLock(); this._list.Clear(); } finally { this._lock.ExitWriteLock(); } } public bool Contains(T item) { try { this._lock.EnterReadLock(); return this._list.Contains(item); } finally { this._lock.ExitReadLock(); } } public void CopyTo(T[] array, int arrayIndex) { try { this._lock.EnterReadLock(); this._list.CopyTo(array, arrayIndex); } finally { this._lock.ExitReadLock(); } } public IEnumerator<T> GetEnumerator() { return new ConcurrentEnumerator<T>(this._list, this._lock); } IEnumerator IEnumerable.GetEnumerator() { return new ConcurrentEnumerator<T>(this._list, this._lock); } ~ConcurrentList() { this.Dispose(false); } public void Dispose() { this.Dispose(true); } private void Dispose(bool disposing) { if (disposing) GC.SuppressFinalize(this); this._lock.Dispose(); } #endregion #region Properties public T this[int index] { get { try { this._lock.EnterReadLock(); return this._list[index]; } finally { this._lock.ExitReadLock(); } } set { try { this._lock.EnterWriteLock(); this._list[index] = value; } finally { this._lock.ExitWriteLock(); } } } public int Count { get { try { this._lock.EnterReadLock(); return this._list.Count; } finally { this._lock.ExitReadLock(); } } } public bool IsReadOnly { get { return false; } } #endregion } public class ConcurrentEnumerator<T> : IEnumerator<T> { #region Fields private readonly IEnumerator<T> _inner; private readonly ReaderWriterLockSlim _lock; #endregion #region Constructor public ConcurrentEnumerator(IEnumerable<T> inner, ReaderWriterLockSlim @lock) { this._lock = @lock; this._lock.EnterReadLock(); this._inner = inner.GetEnumerator(); } #endregion #region Methods public bool MoveNext() { return _inner.MoveNext(); } public void Reset() { _inner.Reset(); } public void Dispose() { this._lock.ExitReadLock(); } #endregion #region Properties public T Current { get { return _inner.Current; } } object IEnumerator.Current { get { return _inner.Current; } } #endregion }
ConcurrentList
(作为可resize的数组,不是链接列表)不容易用非阻塞操作写入。 其API不能很好地转换为“并发”版本。
System.Collections.Generic.List<t>
已经是multithreading安全的读者。 试图使它对多个作家线程安全是没有道理的。 (由于Henk和Stephen已经提到的原因)
如果读取次数多于写入次数,或者(不pipe频繁)写入次数是否不同步 ,则写入时复制的方法可能是合适的。
下面的实现是
- 无锁
- 并发读取速度非常快 ,即使正在进行并发修改 – 无论多长时间
- 因为“快照”是不可变的, 无锁primefaces性是可能的,即
var snap = _list; snap[snap.Count - 1];
var snap = _list; snap[snap.Count - 1];
将永远不会(当然,除了一个空的列表)抛出,而且你也得到线程安全枚举与快照语义免费..我是如何爱不变的! - 一般实现 ,适用于任何数据结构和任何types的修改
- 死简单 ,即易于testing,debugging,通过阅读代码进行validation
- 可用于.net 3.5
为了使写入时复制起作用,你必须保持你的数据结构有效的不可变 ,也就是说,在你把它们提供给其他线程之后,任何人都不能改变它们。 当你想修改,你
- 克隆结构
- 对克隆进行修改
- primefaces交换参考修改克隆
码
static class CopyOnWriteSwapper { public static void Swap<T>(ref T obj, Func<T, T> cloner, Action<T> op) where T : class { while (true) { var objBefore = Volatile.Read(ref obj); var newObj = cloner(objBefore); op(newObj); if (Interlocked.CompareExchange(ref obj, newObj, objBefore) == objBefore) return; } } }
用法
CopyOnWriteSwapper.Swap(ref _myList, orig => new List<string>(orig), clone => clone.Add("asdf"));
如果你需要更多的性能,这将有助于ungenerify方法,例如创build一个方法为每种types的修改(添加,删除,…),并且硬编码函数指针cloner
和op
。
NB#1这是你的责任,以确保没有人修改(所谓)不可变的数据结构。 在通用实现中我们没有办法阻止这种情况发生,但是当专注于List<T>
,可以使用List.AsReadOnly()
注意2请小心列表中的值。 上面的写入复制方法只会保护其列表成员资格,但是如果您不使用string,而是使用其他可变对象,则必须注意线程安全性(例如locking)。 但是这与这个解决scheme是正交的,例如可以容易地使用可变值的locking而没有问题。 你只需要知道它。
注意#3如果你的数据结构很庞大,而且你经常修改它,那么在内存消耗和涉及拷贝的CPU成本方面,全副本拷贝的方法可能会被禁止。 在这种情况下,你可能想使用MS的不可变集合 。
没有ConcurrentList的原因是因为它从根本上不能写。 之所以这样,是因为IList中的几个重要的操作依赖于索引,而这种简单的操作是行不通的。 例如:
int catIndex = list.IndexOf("cat"); list.Insert(catIndex, "dog");
作者所追求的效果是在“猫”之前插入“狗”,但在multithreading环境中,这两行代码之间的列表中可能会发生任何事情。 例如,另一个线程可能会做list.RemoveAt(0)
,将整个列表向左移动,但最关键的是, catIndex不会改变。 这里的影响是Insert
操作实际上将“狗”放在猫之后,而不是放在它之前。
你看到的这个问题的答案是好的,但是正如上面所显示的那样,它们不能提供可靠的结果。 如果你真的想在multithreading环境中使用类似列表的语义,你不能通过在列表实现方法中join锁来达到目的。 您必须确保您使用的任何索引完全位于锁的上下文中。 结果是你可以在一个multithreading环境中使用一个具有正确locking的列表,但是列表本身不能在这个世界中存在。
如果你认为你需要一个并发列表,实际上只有两种可能性:
- 你真正需要的是一个ConcurrentBag
- 您需要创build自己的集合,可能是使用List和您自己的并发控制来实现的。
如果你有一个ConcurrentBag并且处于需要将它作为IList传递的位置,那么你有一个问题,因为你调用的方法已经指定他们可能会尝试像上面那样用cat&狗。 在大多数的世界里,这意味着你所调用的方法根本不能在multithreading环境中工作。 这意味着你要么重构它,要么就是,如果你不能,你将必须非常小心地处理它。 你几乎肯定会被要求用自己的锁创build你自己的集合,并在锁内调用违规的方法。
有些人喜欢一些商品点(和我的一些想法):
- 它可以看起来疯狂无法随机accesser(索引),但对我来说似乎很好。 您只需要认为在multithreading集合上有很多方法可能会像Indexer和Delete一样失败。 您也可以定义写入访问器的失败(回退)动作,如“失败”或简单地“添加到最后”。
- 这不是因为它是一个multithreading的集合,它将始终用于multithreading环境中。 或者它也可以被一个作者和一个读者使用。
- 另一种能够安全使用索引器的方法是使用其根(如果公开的话)将操作封装到集合的锁中。
- 对于很多人来说,制作一个rootLock是一个很好的习惯。 我并不十分确定这一点,因为如果它被隐藏了,你可以为用户删除很多的灵活性。 我们总是要记住,multithreading编程不适合任何人。 我们无法防止各种错误的使用。
- 微软将不得不做一些工作,并定义一些新的标准来介绍正确使用Multithreaded集合。 首先IEnumerator不应该有一个moveNext,但应该有一个GetNext返回true或false,并得到一个typesT的out参数(这样的迭代不会被阻止了)。 此外,微软已经在内部使用“使用”的foreach,但有时直接使用IEnumerator没有用“使用”(集合视图中的错误,可能在更多的地方)包装它 – 包装IEnumerator的使用是微软推荐的实践。 这个bug删除安全迭代器的好处…迭代器locking构造函数中的集合,并在其Dispose方法上解锁 – 用于阻塞foreach方法。
这不是一个答案。 这只是评论,并不适合某个特定的地方。
…我的结论是,微软必须对“foreach”进行一些深层次的改变,以使MultiThreaded集合更易于使用。 它也必须遵循自己的IEnumerator使用规则。 在这之前,我们可以轻松地编写一个MultiThreadList,它将使用阻塞迭代器,但不会遵循“IList”。 相反,您将不得不定义自己的“IListPersonnal”接口,可能无法在“插入”,“删除”和随机访问器(索引器)上失败。 但是,如果不是标准的话,谁会想要使用它?
我实施了一个类似于布赖恩的 。 我的是不同的:
- 我直接pipe理arrays。
- 我不inputtry块内的锁。
- 我使用
yield return
来产生一个枚举器。 - 我支持lockingrecursion。 这允许在迭代期间从列表读取。
- 我尽可能使用可升级的读锁。
-
DoSync
和GetSync
方法允许需要独占访问列表的顺序交互。
代码 :
public class ConcurrentList<T> : IList<T>, IDisposable { private ReaderWriterLockSlim _lock = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion); private int _count = 0; public int Count { get { _lock.EnterReadLock(); try { return _count; } finally { _lock.ExitReadLock(); } } } public int InternalArrayLength { get { _lock.EnterReadLock(); try { return _arr.Length; } finally { _lock.ExitReadLock(); } } } private T[] _arr; public ConcurrentList(int initialCapacity) { _arr = new T[initialCapacity]; } public ConcurrentList():this(4) { } public ConcurrentList(IEnumerable<T> items) { _arr = items.ToArray(); _count = _arr.Length; } public void Add(T item) { _lock.EnterWriteLock(); try { var newCount = _count + 1; EnsureCapacity(newCount); _arr[_count] = item; _count = newCount; } finally { _lock.ExitWriteLock(); } } public void AddRange(IEnumerable<T> items) { if (items == null) throw new ArgumentNullException("items"); _lock.EnterWriteLock(); try { var arr = items as T[] ?? items.ToArray(); var newCount = _count + arr.Length; EnsureCapacity(newCount); Array.Copy(arr, 0, _arr, _count, arr.Length); _count = newCount; } finally { _lock.ExitWriteLock(); } } private void EnsureCapacity(int capacity) { if (_arr.Length >= capacity) return; int doubled; checked { try { doubled = _arr.Length * 2; } catch (OverflowException) { doubled = int.MaxValue; } } var newLength = Math.Max(doubled, capacity); Array.Resize(ref _arr, newLength); } public bool Remove(T item) { _lock.EnterUpgradeableReadLock(); try { var i = IndexOfInternal(item); if (i == -1) return false; _lock.EnterWriteLock(); try { RemoveAtInternal(i); return true; } finally { _lock.ExitWriteLock(); } } finally { _lock.ExitUpgradeableReadLock(); } } public IEnumerator<T> GetEnumerator() { _lock.EnterReadLock(); try { for (int i = 0; i < _count; i++) // deadlocking potential mitigated by lock recursion enforcement yield return _arr[i]; } finally { _lock.ExitReadLock(); } } IEnumerator IEnumerable.GetEnumerator() { return this.GetEnumerator(); } public int IndexOf(T item) { _lock.EnterReadLock(); try { return IndexOfInternal(item); } finally { _lock.ExitReadLock(); } } private int IndexOfInternal(T item) { return Array.FindIndex(_arr, 0, _count, x => x.Equals(item)); } public void Insert(int index, T item) { _lock.EnterUpgradeableReadLock(); try { if (index > _count) throw new ArgumentOutOfRangeException("index"); _lock.EnterWriteLock(); try { var newCount = _count + 1; EnsureCapacity(newCount); // shift everything right by one, starting at index Array.Copy(_arr, index, _arr, index + 1, _count - index); // insert _arr[index] = item; _count = newCount; } finally { _lock.ExitWriteLock(); } } finally { _lock.ExitUpgradeableReadLock(); } } public void RemoveAt(int index) { _lock.EnterUpgradeableReadLock(); try { if (index >= _count) throw new ArgumentOutOfRangeException("index"); _lock.EnterWriteLock(); try { RemoveAtInternal(index); } finally { _lock.ExitWriteLock(); } } finally { _lock.ExitUpgradeableReadLock(); } } private void RemoveAtInternal(int index) { Array.Copy(_arr, index + 1, _arr, index, _count - index-1); _count--; // release last element Array.Clear(_arr, _count, 1); } public void Clear() { _lock.EnterWriteLock(); try { Array.Clear(_arr, 0, _count); _count = 0; } finally { _lock.ExitWriteLock(); } } public bool Contains(T item) { _lock.EnterReadLock(); try { return IndexOfInternal(item) != -1; } finally { _lock.ExitReadLock(); } } public void CopyTo(T[] array, int arrayIndex) { _lock.EnterReadLock(); try { if(_count > array.Length - arrayIndex) throw new ArgumentException("Destination array was not long enough."); Array.Copy(_arr, 0, array, arrayIndex, _count); } finally { _lock.ExitReadLock(); } } public bool IsReadOnly { get { return false; } } public T this[int index] { get { _lock.EnterReadLock(); try { if (index >= _count) throw new ArgumentOutOfRangeException("index"); return _arr[index]; } finally { _lock.ExitReadLock(); } } set { _lock.EnterUpgradeableReadLock(); try { if (index >= _count) throw new ArgumentOutOfRangeException("index"); _lock.EnterWriteLock(); try { _arr[index] = value; } finally { _lock.ExitWriteLock(); } } finally { _lock.ExitUpgradeableReadLock(); } } } public void DoSync(Action<ConcurrentList<T>> action) { GetSync(l => { action(l); return 0; }); } public TResult GetSync<TResult>(Func<ConcurrentList<T>,TResult> func) { _lock.EnterWriteLock(); try { return func(this); } finally { _lock.ExitWriteLock(); } } public void Dispose() { _lock.Dispose(); } }
在顺序执行代码中,所使用的数据结构不同于(良好编写的)并行执行的代码。 原因是顺序代码隐含着顺序。 然而并发代码并不意味着任何顺序。 更好,但它意味着缺乏任何明确的顺序!
由于这个原因,隐含顺序的数据结构(如List)对于解决并发问题并不是很有用。 列表意味着顺序,但是没有明确定义那个顺序是什么。 正因为如此,操纵列表的代码的执行顺序将(在一定程度上)确定列表的隐式顺序,这与有效的并行解决scheme直接冲突。
记住并发是数据问题,而不是代码问题! 你不能先实现代码(或重写现有的顺序代码),并得到一个devise良好的并发解决scheme。 您需要首先devise数据结构,同时牢记并发系统中不存在隐式sorting。