Concurrent Array (Ordered) Class with Indexing, and without Blocking
Updated: Aug-2,2021
RandomBigInt rng = new RandomBigInt(64);
ulong[] sla1 = Enumerable.Range(0, 1000000).AsParallel().WithDegreeOfParallelism(10).Select(i => (ulong)rng.Next()).ToArray();
CcArray<ulong> ccl = new CcArray<ulong>(1000000);
Example 1:
Parallel.ForEach(sla1, new ParallelOptions { MaxDegreeOfParallelism = 10 }, (v, p, i) =>
{
ccl.Add(v, (int)i);
});
Example 2:
Parallel.For(0,sla1.Length,i=>
{
ccl.Add(sla1[i], (int)i);
});
Example 3:
sla1.AsEnumerable().Select( (v,i)=> new {value=v,index=i}).AsParallel().WithDegreeOfParallelism(10).ForAll(x=>
{
ccl[x.index] = x.value;
});
Var asa = new ulong[1000000];
for (var i = 0; i < 1000000; ++i)
asa[i] = ccl[i];
var exc = sla1.Except(asa).ToArray();
using System;
using System.Collections;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
[DebuggerDisplay("Count = {" + nameof(Count) + "}")]
public class CcArray<T> : IEnumerable<T>
{
private readonly int _size;
private volatile int _activeThreadPosition;
private volatile int[] _activeThreads;
private volatile IntSet18<T>[] _array;
public volatile int NumberOfActiveThreads;
public CcArray(int size)
{
ThreadPool.GetMaxThreads(out var nW, out var nI);
_array = new IntSet18<T>[nW];
_size = size;
NumberOfActiveThreads = 0;
_activeThreadPosition = 0;
_activeThreads = new int[Environment.ProcessorCount];
_activeThreads.Fill(-1);
}
public T this[int index]
{
get
{
var (idx, trd) = FindIndex(index);
return idx != -1 ? _array[trd].Slots[idx].Value : default;
}
set
{
var (idx, trd) = FindIndex(index);
if (idx != -1)
_array[trd].Slots[idx].Value = value;
else Add(value, index);
}
}
public int Count
{
get
{
if (_activeThreads == null)
throw new Exception("Initialization has not completed. Note: The constructor cannot be void.");
var totalCount = 0;
for (var i = 0; i < _activeThreads.Length; ++i)
if (_activeThreads[i] != -1)
if (_array[_activeThreads[i]].Allocated)
totalCount += _array[_activeThreads[i]].Count;
return totalCount;
}
}
public IEnumerator<T> GetEnumerator()
{
return GetEnum();
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnum();
}
public void Add(T item, int index, bool updateValue = false)
{
if (_activeThreads == null)
throw new Exception("Initialization has not completed. Note: The constructor cannot be void.");
var id = ProcessThread();
var (idx, trd) = FindIndex(index);
if (idx == -1)
_array[id].Add(index, item);
else if (updateValue)
_array[trd].Slots[idx].Value = item;
}
public (int idx, int trd) FindIndex(int index)
{
if (_activeThreads == null)
throw new Exception("Initialization has not completed. Note: The constructor cannot be void.");
for (var i = 0; i < _activeThreads.Length; ++i)
if (_activeThreads[i] != -1)
if (_array[_activeThreads[i]].Allocated)
{
var idx = _array[_activeThreads[i]].FindKey(index);
if (idx != -1)
return (idx, _activeThreads[i]);
}
return (-1, -1);
}
private int ProcessThread()
{
var id = Thread.CurrentThread.ManagedThreadId;
if (!_array[id].Allocated)
{
Interlocked.Increment(ref NumberOfActiveThreads);
_array[id] = new IntSet18<T>(_size / NumberOfActiveThreads);
if (_activeThreadPosition >= _activeThreads.Length)
{
var newActiveThreads = new int[_activeThreads.Length << 1];
newActiveThreads.Fill(-1);
for (var i = 0; i < _activeThreads.Length; ++i)
if (_activeThreads[i] != -1)
newActiveThreads[i] = _activeThreads[i];
_activeThreads = newActiveThreads;
}
_activeThreads[_activeThreadPosition] = id;
Interlocked.Increment(ref _activeThreadPosition);
}
return id;
}
public bool Contains(T item)
{
if (_activeThreads == null)
throw new Exception("Initialization has not completed. Note: The constructor cannot be void.");
var eComparer = EqualityComparer<T>.Default;
for (var i = 0; i < _activeThreads.Length; ++i)
if (_activeThreads[i] != -1)
if (_array[_activeThreads[i]].Allocated)
for (var j = 0; j < _array[_activeThreads[i]].Count; ++j)
if (eComparer.Equals(_array[_activeThreads[i]].Slots[j].Value, item))
return true;
return false;
}
public T[] ToArray()
{
if (_activeThreads == null)
throw new Exception("Initialization has not completed. Note: The constructor cannot be void.");
var outputArray = new T[Count];
var ptr = 0;
for (var i = 0; i < _activeThreads.Length; ++i)
if (_activeThreads[i] != -1)
if (_array[_activeThreads[i]].Allocated)
foreach (var v in _array[_activeThreads[i]])
outputArray[ptr++] = v.Value;
return outputArray;
}
private IEnumerator<T> GetEnum()
{
var array = ToArray();
foreach (var i in array)
yield return i;
}
[DebuggerDisplay("Count = {" + nameof(Count) + "}")]
internal struct IntSet18<T1>
{
private int[] _buckets;
internal Slot[] Slots;
public bool Allocated;
public IntSet18(int size)
{
_buckets = new int[size];
Slots = new Slot[size];
Count = 0;
Allocated = true;
}
public int Count
{
get;
private set;
}
public IEnumerator<KeyValuePair<int, T1>> GetEnumerator()
{
for (var i = 0; i < Count; i++)
if (Slots[i].Key >= 0)
yield return new KeyValuePair<int, T1>(Slots[i].Key, Slots[i].Value);
}
public bool Add(int key, T1 value)
{
if (FindKey(key) != -1)
return true;
if (Count >= Slots.Length)
Resize();
var pos = key % _buckets.Length;
Slots[Count].Next = _buckets[pos] - 1;
Slots[Count].Key = key;
Slots[Count].Value = value;
_buckets[pos] = Count + 1;
++Count;
return false;
}
private void Resize()
{
var newSize = _buckets.Length + _buckets.Length / 4 * 3;
var newSlots = new Slot[newSize];
var newBuckets = new int[newSize];
var newCount = 0;
var en = GetEnumerator();
while (en.MoveNext())
{
var key = en.Current.Key;
var value = en.Current.Value;
var pos = key % newBuckets.Length;
newSlots[newCount].Next = newBuckets[pos] - 1;
newSlots[newCount].Key = key;
newSlots[newCount].Value = value;
newBuckets[pos] = newCount + 1;
++newCount;
}
Slots = newSlots;
_buckets = newBuckets;
Count = newCount;
}
public int FindKey(int key)
{
for (var position = _buckets[key % _buckets.Length] - 1; position >= 0; position = Slots[position].Next)
if (Equals(Slots[position].Key, key))
return position;
return -1;
}
internal struct Slot
{
public int Next;
public int Key;
public T1 Value;
}
}
}