限制C#中并行线程的数量
我正在编写一个C#程序,通过FTP生成并上传50万个文件。 我想要并行处理4个文件,因为机器有4个内核,生成文件需要更长的时间。 是否有可能将下面的Powershell示例转换为C#? 还是有没有更好的框架,如C#中的Actor框架(如F#MailboxProcessor)?
Powershell示例
$maxConcurrentJobs = 3; # Read the input and queue it up $jobInput = get-content .\input.txt $queue = [System.Collections.Queue]::Synchronized( (New-Object System.Collections.Queue) ) foreach($item in $jobInput) { $queue.Enqueue($item) } # Function that pops input off the queue and starts a job with it function RunJobFromQueue { if( $queue.Count -gt 0) { $j = Start-Job -ScriptBlock {param($x); Get-WinEvent -LogName $x} -ArgumentList $queue.Dequeue() Register-ObjectEvent -InputObject $j -EventName StateChanged -Action { RunJobFromQueue; Unregister-Event $eventsubscriber.SourceIdentifier; Remove-Job $eventsubscriber.SourceIdentifier } | Out-Null } } # Start up to the max number of concurrent jobs # Each job will take care of running the rest for( $i = 0; $i -lt $maxConcurrentJobs; $i++ ) { RunJobFromQueue }
更新:
与远程FTP服务器的连接可能会很慢,所以我想限制FTP上传处理。
假设你正在用TPL构build它,你可以将ParallelOptions.MaxDegreesOfParallelism设置为你想要的。
Parallel.For代码示例。
任务并行库是你的朋友在这里。 请参阅此链接,其中介绍了可用的内容。 基本上框架4随它一起优化了这些基本上后台线程池线程与运行机器上处理器的数量。
也许是沿着以下方向的东西:
ParallelOptions options = new ParallelOptions(); options.MaxDegreeOfParallelism = 4;
然后在你的循环中像这样:
Parallel.Invoke(options, () => new WebClient().Upload("http://www.linqpad.net", "lp.html"), () => new WebClient().Upload("http://www.jaoo.dk", "jaoo.html"));
如果您使用.Net 4.0,则可以使用并行库
假设你正在迭代通过50万个文件,你可以使用Parallel Foreach来“平行”迭代,或者你可以看看PLinq这里比较两者
本质上,您将要为每个要上传的文件创build一个Action或Task,将它们放在List中,然后处理该列表,限制可以并行处理的数量。
我的博客文章展示了如何使用“任务”和“操作”来完成此任务,并提供了一个示例项目,您可以下载并运行这些项目以查看这两个实例。
与行动
如果使用Actions,则可以使用内置的.Net Parallel.Invoke函数。 这里我们限制它并行运行最多4个线程。
var listOfActions = new List<Action>(); foreach (var file in files) { var localFile = file; // Note that we create the Task here, but do not start it. listOfTasks.Add(new Task(() => UploadFile(localFile))); } var options = new ParallelOptions {MaxDegreeOfParallelism = 4}; Parallel.Invoke(options, listOfActions.ToArray());
虽然这个选项不支持asynchronous,我假设你是FileUpload函数,所以你可能想使用下面的Task示例。
与任务
随着任务没有内置的function。 不过,您可以使用我在我的博客上提供的那个。
/// <summary> /// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel. /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para> /// </summary> /// <param name="tasksToRun">The tasks to run.</param> /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param> /// <param name="cancellationToken">The cancellation token.</param> public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken()) { await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken); } /// <summary> /// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel. /// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para> /// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para> /// </summary> /// <param name="tasksToRun">The tasks to run.</param> /// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param> /// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param> /// <param name="cancellationToken">The cancellation token.</param> public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken()) { // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly. var tasks = tasksToRun.ToList(); using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel)) { var postTaskTasks = new List<Task>(); // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running. tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release()))); // Start running each task. foreach (var task in tasks) { // Increment the number of tasks currently running and wait if too many are running. await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken); cancellationToken.ThrowIfCancellationRequested(); task.Start(); } // Wait for all of the provided tasks to complete. // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object. await Task.WhenAll(postTaskTasks.ToArray()); } }
然后创build你的任务列表,并调用函数让它们运行,每次最多同时运行4次,你可以这样做:
var listOfTasks = new List<Task>(); foreach (var file in files) { var localFile = file; // Note that we create the Task here, but do not start it. listOfTasks.Add(new Task(async () => await UploadFile(localFile))); } await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 4);
另外,因为这个方法支持asynchronous,所以不会像使用Parallel.Invoke或Parallel.ForEach那样阻塞UI线程。
我已经使用BlockingCollection作为线程pipe理器的技巧。 实施和处理这项工作相当简单。 它只是接受任务对象,并将一个整数值添加到阻塞列表中,将运行线程数增加1.当线程完成时,它将使对象出队并在添加操作时释放该块以用于即将到来的任务。
public class BlockingTaskQueue { private BlockingCollection<int> threadManager { get; set; } = null; public bool IsWorking { get { return threadManager.Count > 0 ? true : false; } } public BlockingTaskQueue(int maxThread) { threadManager = new BlockingCollection<int>(maxThread); } public async Task AddTask(Task task) { Task.Run(() => { Run(task); }); } private bool Run(Task task) { try { threadManager.Add(1); task.Start(); task.Wait(); return true; } catch (Exception ex) { return false; } finally { threadManager.Take(); } } }