Concurrent Array (Ordered) Class with Indexing, and without Blocking
Updated: Aug-2,2021
RandomBigInt rng = new RandomBigInt(64); ulong[] sla1 = Enumerable.Range(0, 1000000).AsParallel().WithDegreeOfParallelism(10).Select(i => (ulong)rng.Next()).ToArray(); CcArray<ulong> ccl = new CcArray<ulong>(1000000); Example 1: Parallel.ForEach(sla1, new ParallelOptions { MaxDegreeOfParallelism = 10 }, (v, p, i) => { ccl.Add(v, (int)i); }); Example 2: Parallel.For(0,sla1.Length,i=> { ccl.Add(sla1[i], (int)i); }); Example 3: sla1.AsEnumerable().Select( (v,i)=> new {value=v,index=i}).AsParallel().WithDegreeOfParallelism(10).ForAll(x=> { ccl[x.index] = x.value; }); Var asa = new ulong[1000000]; for (var i = 0; i < 1000000; ++i) asa[i] = ccl[i]; var exc = sla1.Except(asa).ToArray();
using System; using System.Collections; using System.Collections.Generic; using System.Diagnostics; using System.Threading; [DebuggerDisplay("Count = {" + nameof(Count) + "}")] public class CcArray<T> : IEnumerable<T> { private readonly int _size; private volatile int _activeThreadPosition; private volatile int[] _activeThreads; private volatile IntSet18<T>[] _array; public volatile int NumberOfActiveThreads; public CcArray(int size) { ThreadPool.GetMaxThreads(out var nW, out var nI); _array = new IntSet18<T>[nW]; _size = size; NumberOfActiveThreads = 0; _activeThreadPosition = 0; _activeThreads = new int[Environment.ProcessorCount]; _activeThreads.Fill(-1); } public T this[int index] { get { var (idx, trd) = FindIndex(index); return idx != -1 ? _array[trd].Slots[idx].Value : default; } set { var (idx, trd) = FindIndex(index); if (idx != -1) _array[trd].Slots[idx].Value = value; else Add(value, index); } } public int Count { get { if (_activeThreads == null) throw new Exception("Initialization has not completed. Note: The constructor cannot be void."); var totalCount = 0; for (var i = 0; i < _activeThreads.Length; ++i) if (_activeThreads[i] != -1) if (_array[_activeThreads[i]].Allocated) totalCount += _array[_activeThreads[i]].Count; return totalCount; } } public IEnumerator<T> GetEnumerator() { return GetEnum(); } IEnumerator IEnumerable.GetEnumerator() { return GetEnum(); } public void Add(T item, int index, bool updateValue = false) { if (_activeThreads == null) throw new Exception("Initialization has not completed. Note: The constructor cannot be void."); var id = ProcessThread(); var (idx, trd) = FindIndex(index); if (idx == -1) _array[id].Add(index, item); else if (updateValue) _array[trd].Slots[idx].Value = item; } public (int idx, int trd) FindIndex(int index) { if (_activeThreads == null) throw new Exception("Initialization has not completed. Note: The constructor cannot be void."); for (var i = 0; i < _activeThreads.Length; ++i) if (_activeThreads[i] != -1) if (_array[_activeThreads[i]].Allocated) { var idx = _array[_activeThreads[i]].FindKey(index); if (idx != -1) return (idx, _activeThreads[i]); } return (-1, -1); } private int ProcessThread() { var id = Thread.CurrentThread.ManagedThreadId; if (!_array[id].Allocated) { Interlocked.Increment(ref NumberOfActiveThreads); _array[id] = new IntSet18<T>(_size / NumberOfActiveThreads); if (_activeThreadPosition >= _activeThreads.Length) { var newActiveThreads = new int[_activeThreads.Length << 1]; newActiveThreads.Fill(-1); for (var i = 0; i < _activeThreads.Length; ++i) if (_activeThreads[i] != -1) newActiveThreads[i] = _activeThreads[i]; _activeThreads = newActiveThreads; } _activeThreads[_activeThreadPosition] = id; Interlocked.Increment(ref _activeThreadPosition); } return id; } public bool Contains(T item) { if (_activeThreads == null) throw new Exception("Initialization has not completed. Note: The constructor cannot be void."); var eComparer = EqualityComparer<T>.Default; for (var i = 0; i < _activeThreads.Length; ++i) if (_activeThreads[i] != -1) if (_array[_activeThreads[i]].Allocated) for (var j = 0; j < _array[_activeThreads[i]].Count; ++j) if (eComparer.Equals(_array[_activeThreads[i]].Slots[j].Value, item)) return true; return false; } public T[] ToArray() { if (_activeThreads == null) throw new Exception("Initialization has not completed. Note: The constructor cannot be void."); var outputArray = new T[Count]; var ptr = 0; for (var i = 0; i < _activeThreads.Length; ++i) if (_activeThreads[i] != -1) if (_array[_activeThreads[i]].Allocated) foreach (var v in _array[_activeThreads[i]]) outputArray[ptr++] = v.Value; return outputArray; } private IEnumerator<T> GetEnum() { var array = ToArray(); foreach (var i in array) yield return i; } [DebuggerDisplay("Count = {" + nameof(Count) + "}")] internal struct IntSet18<T1> { private int[] _buckets; internal Slot[] Slots; public bool Allocated; public IntSet18(int size) { _buckets = new int[size]; Slots = new Slot[size]; Count = 0; Allocated = true; } public int Count { get; private set; } public IEnumerator<KeyValuePair<int, T1>> GetEnumerator() { for (var i = 0; i < Count; i++) if (Slots[i].Key >= 0) yield return new KeyValuePair<int, T1>(Slots[i].Key, Slots[i].Value); } public bool Add(int key, T1 value) { if (FindKey(key) != -1) return true; if (Count >= Slots.Length) Resize(); var pos = key % _buckets.Length; Slots[Count].Next = _buckets[pos] - 1; Slots[Count].Key = key; Slots[Count].Value = value; _buckets[pos] = Count + 1; ++Count; return false; } private void Resize() { var newSize = _buckets.Length + _buckets.Length / 4 * 3; var newSlots = new Slot[newSize]; var newBuckets = new int[newSize]; var newCount = 0; var en = GetEnumerator(); while (en.MoveNext()) { var key = en.Current.Key; var value = en.Current.Value; var pos = key % newBuckets.Length; newSlots[newCount].Next = newBuckets[pos] - 1; newSlots[newCount].Key = key; newSlots[newCount].Value = value; newBuckets[pos] = newCount + 1; ++newCount; } Slots = newSlots; _buckets = newBuckets; Count = newCount; } public int FindKey(int key) { for (var position = _buckets[key % _buckets.Length] - 1; position >= 0; position = Slots[position].Next) if (Equals(Slots[position].Key, key)) return position; return -1; } internal struct Slot { public int Next; public int Key; public T1 Value; } } }