Je veux limiter à 30 le nombre total de requêtes que je soumets à mon serveur de base de données dans tous les blocs DataFlow. Dans le scénario suivant, l'étranglement de 30 tâches simultanées se fait par bloc, de sorte que le nombre de tâches simultanées atteint toujours 60 pendant l'exécution. Je pourrais évidemment limiter mon parallélisme à 15 par bloc pour obtenir un total de 30 à l'échelle du système, mais ce ne serait pas optimal.
Comment faire pour que ça marche ? Dois-je limiter (et bloquer) mes attentes en utilisant SemaphoreSlim, etc., ou existe-t-il une approche DataFlow intrinsèque qui fonctionne mieux ?
public class TPLTest
{
private long AsyncCount = 0;
private long MaxAsyncCount = 0;
private long TaskId = 0;
private object MetricsLock = new object();
public async Task Start()
{
ExecutionDataflowBlockOptions execOption = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 30 };
DataflowLinkOptions linkOption = new DataflowLinkOptions() { PropagateCompletion = true };
var doFirstIOWorkAsync = new TransformBlock<Data, Data>(async data => await DoIOBoundWorkAsync(data), execOption);
var doCPUWork = new TransformBlock<Data, Data>(data => DoCPUBoundWork(data));
var doSecondIOWorkAsync = new TransformBlock<Data, Data>(async data => await DoIOBoundWorkAsync(data), execOption);
var doProcess = new TransformBlock<Data, string>(i => $"Task finished, ID = : {i.TaskId}");
var doPrint = new ActionBlock<string>(s => Debug.WriteLine(s));
doFirstIOWorkAsync.LinkTo(doCPUWork, linkOption);
doCPUWork.LinkTo(doSecondIOWorkAsync, linkOption);
doSecondIOWorkAsync.LinkTo(doProcess, linkOption);
doProcess.LinkTo(doPrint, linkOption);
int taskCount = 150;
for (int i = 0; i < taskCount; i++)
{
await doFirstIOWorkAsync.SendAsync(new Data() { Delay = 2500 });
}
doFirstIOWorkAsync.Complete();
await doPrint.Completion;
Debug.WriteLine("Max concurrent tasks: " + MaxAsyncCount.ToString());
}
private async Task<Data> DoIOBoundWorkAsync(Data data)
{
lock(MetricsLock)
{
AsyncCount++;
if (AsyncCount > MaxAsyncCount)
MaxAsyncCount = AsyncCount;
}
if (data.TaskId <= 0)
data.TaskId = Interlocked.Increment(ref TaskId);
await Task.Delay(data.Delay);
lock (MetricsLock)
AsyncCount--;
return data;
}
private Data DoCPUBoundWork(Data data)
{
data.Step = 1;
return data;
}
}
Classe de données :
public class Data
{
public int Delay { get; set; }
public long TaskId { get; set; }
public int Step { get; set; }
}
Point de départ :
TPLTest tpl = new TPLTest();
await tpl.Start();