Concurrent Collection Class without Blocking
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
[DebuggerDisplay("Count = {" + nameof(Count) + "}")]
public class CcCollection<T> : IEnumerable<T>
{
private readonly IntA<T>[] _array;
private readonly int _size;
private volatile int[] _activeThreads;
private volatile int _bP;
public volatile int NumberOfActiveThreads;
public CcCollection() : this(1024)
{
}
public CcCollection(int size)
{
ThreadPool.GetMaxThreads(out var nW, out var nI);
_array = new IntA<T>[nW];
_size = size;
NumberOfActiveThreads = 0;
_bP = 0;
_activeThreads = new int[Environment.ProcessorCount];
_activeThreads.Fill(-1);
}
public int Count
{
get
{
var totalCount = 0;
for (var i = 0; i < _activeThreads.Length; ++i)
if (_activeThreads[i] != -1)
if (_array[_activeThreads[i]].Allocated)
totalCount += _array[_activeThreads[i]].Count;
return totalCount;
}
}
public IEnumerator<T> GetEnumerator()
{
return GetEnum();
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnum();
}
public void Add(T item)
{
var id = Thread.CurrentThread.ManagedThreadId;
if (!_array[id].Allocated)
{
_array[id] = new IntA<T>(_size);
Interlocked.Increment(ref NumberOfActiveThreads);
if (_bP >= _activeThreads.Length)
{
var nAtA = new int[_activeThreads.Length << 1];
nAtA.Fill(-1);
for (var i = 0; i < _activeThreads.Length; ++i)
if (_activeThreads[i] != -1)
nAtA[i] = _activeThreads[i];
_activeThreads = nAtA;
}
_activeThreads[_bP] = id;
Interlocked.Increment(ref _bP);
}
_array[id].Add(item);
}
public IEnumerator<T> GetEnum()
{
var arr = ToArray();
foreach (var i in arr)
yield return i;
}
public T[] ToArray()
{
var totalCount = 0;
for (var i = 0; i < _activeThreads.Length; ++i)
if (_activeThreads[i] != -1)
if (_array[_activeThreads[i]].Allocated)
totalCount += _array[_activeThreads[i]].Count;
var ta = new T[totalCount];
var ptr = 0;
for (var i = 0; i < _activeThreads.Length; ++i)
if (_activeThreads[i] != -1)
if (_array[_activeThreads[i]].Allocated)
{
var it = _array[_activeThreads[i]].ToArray();
for (var j = 0; j < it.Length; ++j)
ta[ptr++] = it[j];
}
return ta;
}
internal struct IntA<T> : IEnumerable<T>
{
private T[] _array;
public bool Allocated;
public IntA(int cap)
{
Count = 0;
_array = new T[cap];
Allocated = true;
}
public int Count
{
get;
private set;
}
public int Length => _array.Length;
public T this[int index]
{
get
{
if (index > _array.Length)
throw new Exception("Error: Index out of range.");
return _array[index];
}
set
{
if (index > _array.Length)
throw new Exception("Error: Index out of range.");
_array[index] = value;
}
}
IEnumerator<T> IEnumerable<T>.GetEnumerator()
{
return GetEnumerator();
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
public void Add(T item)
{
if (Count >= _array.Length)
Array.Resize(ref _array, _array.Length << 1);
_array[Count] = item;
Count++;
}
public T[] ToArray()
{
var newtArray = new T[Count];
Array.Copy(_array, 0, newtArray, 0, Count);
return newtArray;
}
public void Trim()
{
var newtArray = new T[Count];
Array.Copy(_array, 0, newtArray, 0, Count);
_array = newtArray;
}
public void Clear()
{
Array.Clear(_array, 0, Count);
Count = 0;
}
public void Zero()
{
_array = Array.Empty<T>();
Count = 0;
}
public IEnumerator<T> GetEnumerator()
{
return GetEnum();
}
public IEnumerator<T> GetEnum()
{
for (var i = 0; i < Count; i++)
yield return _array[i];
}
}
}