mirror of
https://github.com/actions/runner.git
synced 2026-07-03 11:06:08 +08:00
Background steps execution engine (#4476)
This commit is contained in:
21
src/Runner.Worker/BackgroundStepControlFlowData.cs
Normal file
21
src/Runner.Worker/BackgroundStepControlFlowData.cs
Normal file
@@ -0,0 +1,21 @@
|
||||
using System;
|
||||
|
||||
namespace GitHub.Runner.Worker
|
||||
{
|
||||
/// <summary>
|
||||
/// Pure data for control-flow steps (wait, wait-all, cancel).
|
||||
/// Type uses Pipelines.BackgroundControlTypes string constants.
|
||||
/// </summary>
|
||||
public sealed class BackgroundStepControlFlowData
|
||||
{
|
||||
public string Type { get; set; }
|
||||
public Guid StepId { get; set; }
|
||||
public string StepName { get; set; }
|
||||
|
||||
// Target step IDs (for wait: steps to wait for; for cancel: steps to cancel)
|
||||
public string[] StepIds { get; set; }
|
||||
|
||||
// Parallel group ID for grouping steps in the UI
|
||||
public string ParallelGroupId { get; set; }
|
||||
}
|
||||
}
|
||||
366
src/Runner.Worker/BackgroundStepCoordinator.cs
Normal file
366
src/Runner.Worker/BackgroundStepCoordinator.cs
Normal file
@@ -0,0 +1,366 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using GitHub.DistributedTask.WebApi;
|
||||
using GitHub.Runner.Common;
|
||||
using GitHub.Runner.Common.Util;
|
||||
using GitHub.Runner.Sdk;
|
||||
using Pipelines = GitHub.DistributedTask.Pipelines;
|
||||
|
||||
namespace GitHub.Runner.Worker
|
||||
{
|
||||
[ServiceLocator(Default = typeof(BackgroundStepCoordinator))]
|
||||
public interface IBackgroundStepCoordinator : IRunnerService
|
||||
{
|
||||
void InitializeCoordinator(int maxConcurrent);
|
||||
void StartBackgroundStep(IStep step, CancellationToken jobCancellationToken);
|
||||
Task<TaskResult> WaitForUnwaitedStepsAsync(CancellationToken cancellationToken);
|
||||
Task RunControlFlowAsync(IExecutionContext stepContext, object data);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Coordinates background step execution, waiting, cancellation, and deferred state.
|
||||
/// Extracted from StepsRunner so the main step loop stays clean.
|
||||
/// </summary>
|
||||
public sealed class BackgroundStepCoordinator : RunnerService, IBackgroundStepCoordinator
|
||||
{
|
||||
private const int DefaultMaxBackgroundSteps = 10;
|
||||
private readonly Dictionary<string, (IStep Step, Task Task, CancellationTokenSource Cts)> _backgroundSteps = new();
|
||||
|
||||
// IDs of background steps that have already been completed (waited on or canceled).
|
||||
// Used to avoid waiting on or flushing the same step more than once.
|
||||
private readonly HashSet<string> _completedStepIds = new();
|
||||
private SemaphoreSlim _backgroundSlotSemaphore = new SemaphoreSlim(DefaultMaxBackgroundSteps);
|
||||
|
||||
/// <summary>
|
||||
/// Reset per-job state. Call at the start of each job.
|
||||
/// </summary>
|
||||
public void InitializeCoordinator(int maxConcurrent)
|
||||
{
|
||||
_backgroundSteps.Clear();
|
||||
_completedStepIds.Clear();
|
||||
var max = maxConcurrent > 0 ? maxConcurrent : DefaultMaxBackgroundSteps;
|
||||
_backgroundSlotSemaphore = new SemaphoreSlim(max);
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------
|
||||
// Starting background steps
|
||||
// -----------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Prepare and launch a background step. Does not block the caller.
|
||||
/// </summary>
|
||||
public void StartBackgroundStep(IStep step, CancellationToken jobCancellationToken)
|
||||
{
|
||||
var stepId = step.ExecutionContext?.ContextName ?? step.DisplayName;
|
||||
|
||||
// Isolate GitHubContext so concurrent steps don't overwrite each other's GITHUB_OUTPUT paths
|
||||
if (step.ExecutionContext.ExpressionValues.TryGetValue("github", out var ghCtx) && ghCtx is GitHubContext sharedGitHub)
|
||||
{
|
||||
step.ExecutionContext.ExpressionValues["github"] = sharedGitHub.ShallowCopy();
|
||||
}
|
||||
|
||||
var bgCts = CancellationTokenSource.CreateLinkedTokenSource(jobCancellationToken);
|
||||
|
||||
// Evaluate timeout on the main thread (needs expression context)
|
||||
var timeoutMinutes = 0;
|
||||
try
|
||||
{
|
||||
var templateEvaluator = step.ExecutionContext.ToPipelineTemplateEvaluator();
|
||||
timeoutMinutes = templateEvaluator.EvaluateStepTimeout(step.Timeout, step.ExecutionContext.ExpressionValues, step.ExecutionContext.ExpressionFunctions);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Trace.Info($"Error determining timeout for background step '{stepId}': {ex.Message}");
|
||||
}
|
||||
|
||||
var task = ExecuteBackgroundStepCoreAsync(step, bgCts, stepId, timeoutMinutes);
|
||||
_backgroundSteps[stepId] = (step, task, bgCts);
|
||||
Trace.Info($"Background step '{stepId}' queued (slot will be acquired asynchronously).");
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------
|
||||
// Safety net
|
||||
// -----------------------------------------------------------------
|
||||
|
||||
public async Task<TaskResult> WaitForUnwaitedStepsAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
var unwaitedIds = _backgroundSteps.Keys.Where(id => !_completedStepIds.Contains(id)).ToList();
|
||||
if (unwaitedIds.Count > 0)
|
||||
{
|
||||
Trace.Info($"Safety net: {unwaitedIds.Count} unwaited background step(s) at post-job boundary: {string.Join(", ", unwaitedIds)}");
|
||||
await WaitForStepTasksAsync(unwaitedIds, cancellationToken);
|
||||
CompleteWaitedSteps(unwaitedIds);
|
||||
}
|
||||
|
||||
// Report the merged result of all background steps; the caller merges this into the job result.
|
||||
var result = TaskResult.Succeeded;
|
||||
foreach (var (_, (step, _, _)) in _backgroundSteps)
|
||||
{
|
||||
if (step.ExecutionContext.Result.HasValue)
|
||||
{
|
||||
result = TaskResultUtil.MergeTaskResults(result, step.ExecutionContext.Result.Value);
|
||||
}
|
||||
}
|
||||
|
||||
if (result != TaskResult.Succeeded)
|
||||
{
|
||||
Trace.Info($"Background steps reported result '{result}' to caller.");
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------
|
||||
// Control-flow step dispatch
|
||||
// -----------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Execute a control-flow step (wait, wait-all, cancel) and propagate results.
|
||||
/// </summary>
|
||||
public async Task RunControlFlowAsync(IExecutionContext stepContext, object data)
|
||||
{
|
||||
var controlFlow = data as BackgroundStepControlFlowData;
|
||||
switch (controlFlow.Type)
|
||||
{
|
||||
case Pipelines.BackgroundControlTypes.Wait:
|
||||
{
|
||||
var ids = controlFlow.StepIds ?? Array.Empty<string>();
|
||||
stepContext.Output($"Waiting for background step(s) to complete: {DescribeSteps(ids)}");
|
||||
await WaitForStepTasksAsync(ids, stepContext.CancellationToken);
|
||||
stepContext.Result = CompleteWaitedSteps(ids);
|
||||
ReportCompletedSteps(stepContext, "Finished waiting for background step(s).", ids);
|
||||
break;
|
||||
}
|
||||
|
||||
case Pipelines.BackgroundControlTypes.WaitAll:
|
||||
{
|
||||
var remaining = _backgroundSteps.Keys.Where(id => !_completedStepIds.Contains(id)).ToList();
|
||||
stepContext.Output(remaining.Count > 0
|
||||
? $"Waiting for all background step(s) to complete: {DescribeSteps(remaining)}"
|
||||
: "No background steps remaining to wait for.");
|
||||
await WaitForStepTasksAsync(remaining, stepContext.CancellationToken);
|
||||
stepContext.Result = CompleteWaitedSteps(remaining);
|
||||
ReportCompletedSteps(stepContext, "Finished waiting for all background step(s).", remaining);
|
||||
break;
|
||||
}
|
||||
|
||||
case Pipelines.BackgroundControlTypes.Cancel:
|
||||
{
|
||||
var cancelIds = controlFlow.StepIds ?? Array.Empty<string>();
|
||||
stepContext.Output($"Cancelling background step(s): {DescribeSteps(cancelIds)}");
|
||||
await CancelStepsAsync(controlFlow.StepIds);
|
||||
stepContext.Result = TaskResult.Succeeded;
|
||||
ReportCompletedSteps(stepContext, "Finished cancelling background step(s).", cancelIds);
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
throw new ArgumentException($"Unknown background step control type '{controlFlow.Type}'.");
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------
|
||||
// Private helpers
|
||||
// -----------------------------------------------------------------
|
||||
|
||||
// Resolve background step IDs to their display names for customer-facing output.
|
||||
private string DescribeSteps(IEnumerable<string> stepIds)
|
||||
{
|
||||
var names = stepIds
|
||||
.Select(id => _backgroundSteps.TryGetValue(id, out var entry) ? entry.Step.DisplayName : id)
|
||||
.ToList();
|
||||
return names.Count > 0 ? string.Join(", ", names) : "(none)";
|
||||
}
|
||||
|
||||
// Emit a completion summary plus the final result of each affected background step.
|
||||
private void ReportCompletedSteps(IExecutionContext stepContext, string summary, IEnumerable<string> stepIds)
|
||||
{
|
||||
stepContext.Output(summary);
|
||||
foreach (var id in stepIds)
|
||||
{
|
||||
if (_backgroundSteps.TryGetValue(id, out var entry))
|
||||
{
|
||||
var result = entry.Step.ExecutionContext.Result?.ToString() ?? "Unknown";
|
||||
stepContext.Output($" {entry.Step.DisplayName}: {result}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task ExecuteBackgroundStepCoreAsync(
|
||||
IStep step, CancellationTokenSource bgCts,
|
||||
string stepId, int timeoutMinutes)
|
||||
{
|
||||
Trace.Info($"Background step '{stepId}' waiting for slot.");
|
||||
await _backgroundSlotSemaphore.WaitAsync(bgCts.Token);
|
||||
Trace.Info($"Background step '{stepId}' acquired slot.");
|
||||
|
||||
step.ExecutionContext.Start();
|
||||
|
||||
if (timeoutMinutes > 0)
|
||||
{
|
||||
step.ExecutionContext.SetTimeout(TimeSpan.FromMinutes(timeoutMinutes));
|
||||
}
|
||||
|
||||
using var cancelReg = bgCts.Token.Register(() =>
|
||||
{
|
||||
Trace.Info($"Background step '{stepId}': cancellation signalled, sending CancelToken to process.");
|
||||
step.ExecutionContext.CancelToken();
|
||||
});
|
||||
|
||||
TaskResult? result = null;
|
||||
try
|
||||
{
|
||||
await step.RunAsync();
|
||||
result = step.ExecutionContext.Result ?? TaskResult.Succeeded;
|
||||
}
|
||||
catch (OperationCanceledException) when (bgCts.Token.IsCancellationRequested)
|
||||
{
|
||||
result = TaskResult.Canceled;
|
||||
}
|
||||
catch (OperationCanceledException) when (step.ExecutionContext.CancellationToken.IsCancellationRequested)
|
||||
{
|
||||
Trace.Info($"Background step '{stepId}' timed out after {timeoutMinutes} minutes.");
|
||||
step.ExecutionContext.Error($"The background step '{step.DisplayName}' has timed out after {timeoutMinutes} minutes.");
|
||||
result = TaskResult.Failed;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
Trace.Info($"Background step '{stepId}' failed: {ex.Message}");
|
||||
step.ExecutionContext.Error(ex);
|
||||
result = TaskResult.Failed;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_backgroundSlotSemaphore.Release();
|
||||
|
||||
if (step.ExecutionContext.CommandResult != null)
|
||||
{
|
||||
result = TaskResultUtil.MergeTaskResults(result, step.ExecutionContext.CommandResult.Value);
|
||||
}
|
||||
|
||||
step.ExecutionContext.Result = result;
|
||||
step.ExecutionContext.ApplyContinueOnError(step.ContinueOnError);
|
||||
|
||||
step.ExecutionContext.Complete(step.ExecutionContext.Result);
|
||||
Trace.Info($"Background step '{stepId}' completed with result: {step.ExecutionContext.Result}");
|
||||
}
|
||||
}
|
||||
|
||||
private async Task CancelStepsAsync(string[] cancelStepIds)
|
||||
{
|
||||
if (cancelStepIds == null || cancelStepIds.Length == 0)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
var idsToCancel = cancelStepIds
|
||||
.Where(id => _backgroundSteps.ContainsKey(id) && !_backgroundSteps[id].Task.IsCompleted)
|
||||
.ToArray();
|
||||
|
||||
if (idsToCancel.Length > 0)
|
||||
{
|
||||
Trace.Info($"Cancelling {idsToCancel.Length} background step(s): {string.Join(", ", idsToCancel)}");
|
||||
await CancelWithGracePeriodAsync(idsToCancel);
|
||||
}
|
||||
|
||||
// Flush deferred state and mark canceled steps as completed.
|
||||
CompleteWaitedSteps(cancelStepIds);
|
||||
}
|
||||
|
||||
private async Task WaitForStepTasksAsync(IEnumerable<string> stepIds, CancellationToken cancellationToken)
|
||||
{
|
||||
var ids = stepIds.ToList();
|
||||
var tasks = new List<Task>();
|
||||
|
||||
foreach (var stepId in ids)
|
||||
{
|
||||
if (_backgroundSteps.TryGetValue(stepId, out var entry) && !entry.Task.IsCompleted)
|
||||
{
|
||||
tasks.Add(entry.Task);
|
||||
}
|
||||
else if (!_backgroundSteps.ContainsKey(stepId))
|
||||
{
|
||||
Trace.Info($"Wait references unknown background step: {stepId}");
|
||||
}
|
||||
}
|
||||
|
||||
if (tasks.Count > 0)
|
||||
{
|
||||
Trace.Info($"Waiting for {tasks.Count} background step(s)...");
|
||||
try
|
||||
{
|
||||
await Task.WhenAll(tasks).WaitAsync(cancellationToken);
|
||||
}
|
||||
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
|
||||
{
|
||||
Trace.Info("Wait interrupted by job cancellation — cancelling background steps.");
|
||||
await CancelWithGracePeriodAsync(ids);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async Task CancelWithGracePeriodAsync(IEnumerable<string> stepIds, double graceSeconds = 7.5)
|
||||
{
|
||||
var cancelledSteps = new List<(string StepId, Task Task, IStep Step)>();
|
||||
foreach (var stepId in stepIds)
|
||||
{
|
||||
if (_backgroundSteps.TryGetValue(stepId, out var entry) && !entry.Task.IsCompleted)
|
||||
{
|
||||
entry.Step.ExecutionContext.CancelToken();
|
||||
entry.Cts.Cancel();
|
||||
cancelledSteps.Add((stepId, entry.Task, entry.Step));
|
||||
}
|
||||
}
|
||||
|
||||
if (cancelledSteps.Count > 0)
|
||||
{
|
||||
try
|
||||
{
|
||||
await Task.WhenAll(cancelledSteps.Select(s => s.Task)).WaitAsync(TimeSpan.FromSeconds(graceSeconds));
|
||||
}
|
||||
catch (TimeoutException)
|
||||
{
|
||||
Trace.Info($"Some background steps did not terminate within {graceSeconds}s grace period.");
|
||||
|
||||
// The step tasks above never completed, so their finally block never ran and
|
||||
// their result was never set. Force-mark them as canceled so the abandoned
|
||||
// steps still report a terminal result.
|
||||
foreach (var (stepId, task, step) in cancelledSteps)
|
||||
{
|
||||
if (!task.IsCompleted && !step.ExecutionContext.Result.HasValue)
|
||||
{
|
||||
step.ExecutionContext.Result = TaskResult.Canceled;
|
||||
Trace.Info($"Background step '{stepId}' did not terminate within grace period; marking as canceled.");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private TaskResult CompleteWaitedSteps(IEnumerable<string> stepIds)
|
||||
{
|
||||
var result = TaskResult.Succeeded;
|
||||
foreach (var id in stepIds)
|
||||
{
|
||||
_completedStepIds.Add(id);
|
||||
if (_backgroundSteps.TryGetValue(id, out var entry))
|
||||
{
|
||||
// Flush deferred state for the completed step.
|
||||
entry.Step.ExecutionContext.FlushDeferredOutputs();
|
||||
entry.Step.ExecutionContext.FlushDeferredEnvironment();
|
||||
entry.Step.ExecutionContext.FlushDeferredOutcomeConclusion();
|
||||
Trace.Info($"Flushed deferred state for background step '{id}'.");
|
||||
|
||||
if (entry.Step.ExecutionContext.Result.HasValue)
|
||||
{
|
||||
result = TaskResultUtil.MergeTaskResults(result, entry.Step.ExecutionContext.Result.Value);
|
||||
}
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -77,12 +77,14 @@ namespace GitHub.Runner.Worker
|
||||
|
||||
List<string> StepEnvironmentOverrides { get; }
|
||||
|
||||
bool IsBackground { get; }
|
||||
|
||||
IExecutionContext Root { get; }
|
||||
|
||||
// Initialize
|
||||
void InitializeJob(Pipelines.AgentJobRequestMessage message, CancellationToken token);
|
||||
void CancelToken();
|
||||
IExecutionContext CreateChild(Guid recordId, string displayName, string refName, string scopeName, string contextName, ActionRunStage stage, Dictionary<string, string> intraActionState = null, int? recordOrder = null, IPagingLogger logger = null, bool isEmbedded = false, List<Issue> embeddedIssueCollector = null, CancellationTokenSource cancellationTokenSource = null, Guid embeddedId = default(Guid), string siblingScopeName = null, TimeSpan? timeout = null);
|
||||
IExecutionContext CreateChild(Guid recordId, string displayName, string refName, string scopeName, string contextName, ActionRunStage stage, Dictionary<string, string> intraActionState = null, int? recordOrder = null, IPagingLogger logger = null, bool isEmbedded = false, List<Issue> embeddedIssueCollector = null, CancellationTokenSource cancellationTokenSource = null, Guid embeddedId = default(Guid), string siblingScopeName = null, TimeSpan? timeout = null, bool isBackground = false, string backgroundControlType = null, string[] backgroundControlStepIds = null, string parallelGroupId = null);
|
||||
IExecutionContext CreateEmbeddedChild(string scopeName, string contextName, Guid embeddedId, ActionRunStage stage, Dictionary<string, string> intraActionState = null, string siblingScopeName = null);
|
||||
|
||||
|
||||
@@ -229,6 +231,9 @@ namespace GitHub.Runner.Worker
|
||||
|
||||
public bool EchoOnActionCommand { get; set; }
|
||||
|
||||
// Whether this step runs in the background
|
||||
public bool IsBackground => _record.IsBackground;
|
||||
|
||||
// An embedded execution context shares the same record ID, record name, and logger
|
||||
// as its enclosing execution context.
|
||||
public bool IsEmbedded { get; private init; }
|
||||
@@ -392,7 +397,11 @@ namespace GitHub.Runner.Worker
|
||||
CancellationTokenSource cancellationTokenSource = null,
|
||||
Guid embeddedId = default(Guid),
|
||||
string siblingScopeName = null,
|
||||
TimeSpan? timeout = null)
|
||||
TimeSpan? timeout = null,
|
||||
bool isBackground = false,
|
||||
string backgroundControlType = null,
|
||||
string[] backgroundControlStepIds = null,
|
||||
string parallelGroupId = null)
|
||||
{
|
||||
Trace.Entering();
|
||||
|
||||
@@ -433,6 +442,24 @@ namespace GitHub.Runner.Worker
|
||||
|
||||
child.EchoOnActionCommand = EchoOnActionCommand;
|
||||
|
||||
// Set background step metadata before InitializeTimelineRecord so it's included in the first update
|
||||
if (isBackground || backgroundControlType != null || parallelGroupId != null)
|
||||
{
|
||||
child._record.IsBackground = isBackground;
|
||||
child._record.BackgroundControlType = backgroundControlType;
|
||||
child._record.BackgroundControlStepIds = backgroundControlStepIds;
|
||||
child._record.ParallelGroupId = parallelGroupId;
|
||||
|
||||
// Initialize deferred state for background steps — flushed at wait/wait-all
|
||||
if (isBackground)
|
||||
{
|
||||
child.DeferredOutputs = new Dictionary<string, string>();
|
||||
child.DeferredEnvironmentVariables = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
|
||||
child.DeferredPrependPath = new List<string>();
|
||||
child.DeferOutcomeConclusion = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (recordOrder != null)
|
||||
{
|
||||
child.InitializeTimelineRecord(_mainTimelineId, recordId, _record.Id, ExecutionContextType.Task, displayName, refName, recordOrder, embedded: isEmbedded);
|
||||
|
||||
@@ -345,6 +345,38 @@ namespace GitHub.Runner.Worker
|
||||
preJobSteps.Add(preStep);
|
||||
}
|
||||
}
|
||||
else if (step.Type == Pipelines.StepType.BackgroundStepControl)
|
||||
{
|
||||
var ctrl = step as Pipelines.BackgroundStepControl;
|
||||
Trace.Info($"Adding {ctrl.ControlType} step for: {string.Join(", ", ctrl.StepIds ?? Array.Empty<string>())}");
|
||||
var controlType = ctrl.ControlType;
|
||||
if (string.IsNullOrEmpty(controlType))
|
||||
{
|
||||
throw new ArgumentException($"Background step control '{step.Name}' has no control type.");
|
||||
}
|
||||
if (controlType != Pipelines.BackgroundControlTypes.Wait &&
|
||||
controlType != Pipelines.BackgroundControlTypes.WaitAll &&
|
||||
controlType != Pipelines.BackgroundControlTypes.Cancel)
|
||||
{
|
||||
throw new ArgumentException($"Unknown background step control type '{controlType}' for step '{step.Name}'.");
|
||||
}
|
||||
var displayName = (ctrl.DisplayNameToken as GitHub.DistributedTask.ObjectTemplating.Tokens.StringToken)?.Value
|
||||
?? step.DisplayName ?? step.Name ?? ctrl.ControlType;
|
||||
var data = new BackgroundStepControlFlowData
|
||||
{
|
||||
Type = controlType,
|
||||
StepId = step.Id,
|
||||
StepName = step.Name,
|
||||
StepIds = ctrl.StepIds,
|
||||
ParallelGroupId = ctrl.ParallelGroupId,
|
||||
};
|
||||
var bgCoord = HostContext.GetService<IBackgroundStepCoordinator>();
|
||||
jobSteps.Add(new JobExtensionRunner(
|
||||
runAsync: bgCoord.RunControlFlowAsync,
|
||||
condition: $"{PipelineTemplateConstants.Always}()",
|
||||
displayName: displayName,
|
||||
data: data));
|
||||
}
|
||||
}
|
||||
|
||||
if (message.Variables.TryGetValue("system.workflowFileFullPath", out VariableValue workflowFileFullPath))
|
||||
@@ -400,13 +432,107 @@ namespace GitHub.Runner.Worker
|
||||
}
|
||||
|
||||
// Create execution context for job steps
|
||||
// Build mapping of logical step ID (ContextName) → external ID (timeline record GUID)
|
||||
// so wait/cancel steps can reference background steps by external ID.
|
||||
var contextNameToExternalId = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
|
||||
var hasBackgroundSteps = false;
|
||||
var backgroundStepExternalIds = new List<string>();
|
||||
|
||||
// Track which background steps are explicitly covered by wait/wait-all/cancel
|
||||
var coveredBackgroundIds = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
|
||||
|
||||
foreach (var step in jobSteps)
|
||||
{
|
||||
if (step is IActionRunner actionStep)
|
||||
{
|
||||
ArgUtil.NotNull(actionStep, step.DisplayName);
|
||||
intraActionStates.TryGetValue(actionStep.Action.Id, out var intraActionState);
|
||||
actionStep.ExecutionContext = jobContext.CreateChild(actionStep.Action.Id, actionStep.DisplayName, actionStep.Action.Name, null, actionStep.Action.ContextName, ActionRunStage.Main, intraActionState);
|
||||
|
||||
var isBg = actionStep.Action?.Background == true;
|
||||
actionStep.ExecutionContext = jobContext.CreateChild(
|
||||
actionStep.Action.Id, actionStep.DisplayName, actionStep.Action.Name,
|
||||
null, actionStep.Action.ContextName, ActionRunStage.Main, intraActionState,
|
||||
isBackground: isBg,
|
||||
parallelGroupId: isBg ? actionStep.Action.ParallelGroupId : null);
|
||||
|
||||
if (isBg)
|
||||
{
|
||||
hasBackgroundSteps = true;
|
||||
var externalId = actionStep.Action.Id.ToString("N");
|
||||
contextNameToExternalId[actionStep.Action.ContextName] = externalId;
|
||||
backgroundStepExternalIds.Add(externalId);
|
||||
}
|
||||
}
|
||||
else if (step is JobExtensionRunner runnerStep && runnerStep.Data is BackgroundStepControlFlowData cf)
|
||||
{
|
||||
// Resolve step IDs to external IDs and track coverage
|
||||
string[] externalIds = null;
|
||||
if (cf.StepIds != null && cf.StepIds.Length > 0)
|
||||
{
|
||||
foreach (var id in cf.StepIds)
|
||||
{
|
||||
coveredBackgroundIds.Add(id);
|
||||
}
|
||||
externalIds = cf.StepIds
|
||||
.Where(id => contextNameToExternalId.ContainsKey(id))
|
||||
.Select(id => contextNameToExternalId[id])
|
||||
.ToArray();
|
||||
}
|
||||
|
||||
if (cf.Type == Pipelines.BackgroundControlTypes.WaitAll)
|
||||
{
|
||||
externalIds = backgroundStepExternalIds.Count > 0 ? backgroundStepExternalIds.ToArray() : null;
|
||||
foreach (var id in contextNameToExternalId.Keys)
|
||||
{
|
||||
coveredBackgroundIds.Add(id);
|
||||
}
|
||||
}
|
||||
|
||||
step.ExecutionContext = jobContext.CreateChild(
|
||||
cf.StepId, step.DisplayName, cf.StepName,
|
||||
null, cf.StepName, ActionRunStage.Main,
|
||||
backgroundControlType: cf.Type,
|
||||
backgroundControlStepIds: externalIds,
|
||||
parallelGroupId: cf.ParallelGroupId);
|
||||
}
|
||||
}
|
||||
|
||||
// Add implicit wait-all only if there are background steps not covered by any wait/wait-all/cancel
|
||||
var allBackgroundIds = contextNameToExternalId.Keys;
|
||||
var hasUncoveredBackgroundSteps = allBackgroundIds.Any(id => !coveredBackgroundIds.Contains(id));
|
||||
if (hasBackgroundSteps)
|
||||
{
|
||||
// Initialize coordinator only when there are background steps
|
||||
var bgCoordinator = HostContext.GetService<IBackgroundStepCoordinator>();
|
||||
var maxBgSteps = jobContext.Global.Variables.GetInt("system.runner.maxbackgroundsteps");
|
||||
var maxConcurrent = (maxBgSteps.HasValue && maxBgSteps.Value > 0) ? maxBgSteps.Value : 10;
|
||||
bgCoordinator.InitializeCoordinator(maxConcurrent);
|
||||
|
||||
// Add implicit wait-all only if there are uncovered background steps
|
||||
if (hasUncoveredBackgroundSteps)
|
||||
{
|
||||
var implicitStepId = Guid.NewGuid();
|
||||
var implicitWaitAllData = new BackgroundStepControlFlowData
|
||||
{
|
||||
Type = Pipelines.BackgroundControlTypes.WaitAll,
|
||||
StepId = implicitStepId,
|
||||
StepName = "__implicit_wait_all",
|
||||
};
|
||||
var implicitWaitAll = new JobExtensionRunner(
|
||||
runAsync: bgCoordinator.RunControlFlowAsync,
|
||||
condition: $"{PipelineTemplateConstants.Always}()",
|
||||
displayName: "Wait for all background steps",
|
||||
data: implicitWaitAllData);
|
||||
var uncoveredExternalIds = contextNameToExternalId
|
||||
.Where(kvp => !coveredBackgroundIds.Contains(kvp.Key))
|
||||
.Select(kvp => kvp.Value)
|
||||
.ToArray();
|
||||
implicitWaitAll.ExecutionContext = jobContext.CreateChild(
|
||||
implicitStepId, implicitWaitAll.DisplayName, "__implicit_wait_all",
|
||||
null, "__implicit_wait_all", ActionRunStage.Main,
|
||||
backgroundControlType: Pipelines.BackgroundControlTypes.WaitAll,
|
||||
backgroundControlStepIds: uncoveredExternalIds.Length > 0 ? uncoveredExternalIds : null);
|
||||
jobSteps.Add(implicitWaitAll);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -41,6 +41,8 @@ namespace GitHub.Runner.Worker
|
||||
ArgUtil.NotNull(jobContext, nameof(jobContext));
|
||||
ArgUtil.NotNull(jobContext.JobSteps, nameof(jobContext.JobSteps));
|
||||
|
||||
var _bgCoordinator = HostContext.GetService<IBackgroundStepCoordinator>();
|
||||
|
||||
// TaskResult:
|
||||
// Abandoned (Server set this.)
|
||||
// Canceled
|
||||
@@ -57,6 +59,15 @@ namespace GitHub.Runner.Worker
|
||||
if (jobContext.JobSteps.Count == 0 && !checkPostJobActions)
|
||||
{
|
||||
checkPostJobActions = true;
|
||||
|
||||
// Safety net: wait for any unwaited background steps before post-hooks
|
||||
var backgroundResult = await _bgCoordinator.WaitForUnwaitedStepsAsync(jobContext.CancellationToken);
|
||||
if (backgroundResult != TaskResult.Succeeded)
|
||||
{
|
||||
jobContext.Result = TaskResultUtil.MergeTaskResults(jobContext.Result, backgroundResult);
|
||||
jobContext.JobContext.Status = jobContext.Result?.ToActionResult();
|
||||
}
|
||||
|
||||
while (jobContext.PostJobSteps.TryPop(out var postStep))
|
||||
{
|
||||
jobContext.JobSteps.Enqueue(postStep);
|
||||
@@ -72,8 +83,11 @@ namespace GitHub.Runner.Worker
|
||||
ArgUtil.NotNull(step.ExecutionContext.Global, nameof(step.ExecutionContext.Global));
|
||||
ArgUtil.NotNull(step.ExecutionContext.Global.Variables, nameof(step.ExecutionContext.Global.Variables));
|
||||
|
||||
// Start
|
||||
step.ExecutionContext.Start();
|
||||
// Start — defer for background steps until the slot is acquired
|
||||
if (!step.ExecutionContext.IsBackground)
|
||||
{
|
||||
step.ExecutionContext.Start();
|
||||
}
|
||||
|
||||
// Expression functions
|
||||
step.ExecutionContext.ExpressionFunctions.Add(new FunctionInfo<AlwaysFunction>(PipelineTemplateConstants.Always, 0, 0));
|
||||
@@ -228,14 +242,22 @@ namespace GitHub.Runner.Worker
|
||||
}
|
||||
else
|
||||
{
|
||||
// Pause for DAP debugger before step execution
|
||||
await dapDebugger?.OnStepStartingAsync(step);
|
||||
if (step.ExecutionContext.IsBackground)
|
||||
{
|
||||
// Queue the background step via coordinator
|
||||
_bgCoordinator.StartBackgroundStep(step, jobContext.CancellationToken);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Pause for DAP debugger before step execution
|
||||
await dapDebugger?.OnStepStartingAsync(step);
|
||||
|
||||
// Run the step
|
||||
await RunStepAsync(step, jobContext.CancellationToken);
|
||||
CompleteStep(step);
|
||||
// Run the step synchronously (normal behavior)
|
||||
await RunStepAsync(step, jobContext.CancellationToken);
|
||||
CompleteStep(step);
|
||||
|
||||
dapDebugger?.OnStepCompleted(step);
|
||||
dapDebugger?.OnStepCompleted(step);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
|
||||
620
src/Test/L0/Worker/BackgroundStepsL0.cs
Normal file
620
src/Test/L0/Worker/BackgroundStepsL0.cs
Normal file
@@ -0,0 +1,620 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Runtime.CompilerServices;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using Moq;
|
||||
using Xunit;
|
||||
using GitHub.DistributedTask.Expressions2;
|
||||
using GitHub.DistributedTask.Pipelines.ContextData;
|
||||
using GitHub.DistributedTask.ObjectTemplating.Tokens;
|
||||
using GitHub.DistributedTask.WebApi;
|
||||
using GitHub.Runner.Common.Util;
|
||||
using GitHub.Runner.Worker;
|
||||
using GitHub.Runner.Worker.Dap;
|
||||
using Pipelines = GitHub.DistributedTask.Pipelines;
|
||||
|
||||
namespace GitHub.Runner.Common.Tests.Worker
|
||||
{
|
||||
public sealed class BackgroundStepsL0
|
||||
{
|
||||
private Mock<IExecutionContext> _ec;
|
||||
private StepsRunner _stepsRunner;
|
||||
private Variables _variables;
|
||||
private Dictionary<string, string> _env;
|
||||
private DictionaryContextData _contexts;
|
||||
private JobContext _jobContext;
|
||||
private StepsContext _stepContext;
|
||||
|
||||
private TestHostContext CreateTestContext([CallerMemberName] String testName = "")
|
||||
{
|
||||
var hc = new TestHostContext(this, testName);
|
||||
Dictionary<string, VariableValue> variablesToCopy = new();
|
||||
_variables = new Variables(
|
||||
hostContext: hc,
|
||||
copy: variablesToCopy);
|
||||
_env = new Dictionary<string, string>()
|
||||
{
|
||||
{"env1", "1"},
|
||||
{"test", "github_actions"}
|
||||
};
|
||||
_ec = new Mock<IExecutionContext>();
|
||||
_ec.SetupAllProperties();
|
||||
_ec.Setup(x => x.Global).Returns(new GlobalContext { WriteDebug = true });
|
||||
_ec.Object.Global.Variables = _variables;
|
||||
_ec.Object.Global.EnvironmentVariables = _env;
|
||||
_ec.Object.Global.FileTable = new List<string>();
|
||||
|
||||
_contexts = new DictionaryContextData();
|
||||
_jobContext = new JobContext();
|
||||
_contexts["github"] = new GitHubContext();
|
||||
_contexts["runner"] = new DictionaryContextData();
|
||||
_contexts["job"] = _jobContext;
|
||||
_ec.Setup(x => x.ExpressionValues).Returns(_contexts);
|
||||
_ec.Setup(x => x.ExpressionFunctions).Returns(new List<IFunctionInfo>());
|
||||
_ec.Setup(x => x.JobContext).Returns(_jobContext);
|
||||
_ec.Setup(x => x.CancellationToken).Returns(CancellationToken.None);
|
||||
|
||||
_stepContext = new StepsContext();
|
||||
_ec.Object.Global.StepsContext = _stepContext;
|
||||
|
||||
_ec.Setup(x => x.PostJobSteps).Returns(new Stack<IStep>());
|
||||
|
||||
var trace = hc.GetTrace();
|
||||
|
||||
// Mock CreateChild for implicit wait-all step injection
|
||||
_ec.Setup(x => x.CreateChild(
|
||||
It.IsAny<Guid>(), It.IsAny<string>(), It.IsAny<string>(),
|
||||
It.IsAny<string>(), It.IsAny<string>(), It.IsAny<ActionRunStage>(),
|
||||
It.IsAny<Dictionary<string, string>>(), It.IsAny<int?>(), It.IsAny<IPagingLogger>(),
|
||||
It.IsAny<bool>(), It.IsAny<List<Issue>>(), It.IsAny<CancellationTokenSource>(),
|
||||
It.IsAny<Guid>(), It.IsAny<string>(), It.IsAny<TimeSpan?>(),
|
||||
It.IsAny<bool>(), It.IsAny<string>(), It.IsAny<string[]>(), It.IsAny<string>()))
|
||||
.Returns((Guid recordId, string displayName, string refName, string scopeName, string contextName,
|
||||
ActionRunStage stage, Dictionary<string, string> intraActionState, int? recordOrder, IPagingLogger logger,
|
||||
bool isEmbedded, List<Issue> issues, CancellationTokenSource cts, Guid embeddedId, string siblingScopeName, TimeSpan? timeout,
|
||||
bool isBackground, string backgroundControlType, string[] backgroundControlStepIds, string parallelGroupId) =>
|
||||
{
|
||||
var childEc = new Mock<IExecutionContext>();
|
||||
childEc.SetupAllProperties();
|
||||
childEc.Setup(x => x.Global).Returns(() => _ec.Object.Global);
|
||||
childEc.Setup(x => x.ExpressionValues).Returns(new DictionaryContextData());
|
||||
childEc.Setup(x => x.ExpressionFunctions).Returns(new List<IFunctionInfo>());
|
||||
childEc.Setup(x => x.ContextName).Returns(contextName);
|
||||
childEc.Setup(x => x.CancellationToken).Returns(CancellationToken.None);
|
||||
childEc.Setup(x => x.Complete(It.IsAny<TaskResult?>(), It.IsAny<string>(), It.IsAny<string>()))
|
||||
.Callback((TaskResult? r, string currentOperation, string resultCode) =>
|
||||
{
|
||||
if (r != null) childEc.Object.Result = r;
|
||||
});
|
||||
childEc.Setup(x => x.Write(It.IsAny<string>(), It.IsAny<string>())).Callback((string tag, string message) => { trace.Info($"[{tag}]{message}"); });
|
||||
return childEc.Object;
|
||||
});
|
||||
|
||||
_ec.Setup(x => x.Write(It.IsAny<string>(), It.IsAny<string>())).Callback((string tag, string message) => { trace.Info($"[{tag}]{message}"); });
|
||||
|
||||
_stepsRunner = new StepsRunner();
|
||||
_stepsRunner.Initialize(hc);
|
||||
|
||||
var bgCoordinator = new BackgroundStepCoordinator();
|
||||
bgCoordinator.Initialize(hc);
|
||||
hc.SetSingleton<IBackgroundStepCoordinator>(bgCoordinator);
|
||||
|
||||
var mockDapDebugger = new Mock<IDapDebugger>();
|
||||
hc.SetSingleton(mockDapDebugger.Object);
|
||||
|
||||
return hc;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
[Trait("Level", "L0")]
|
||||
[Trait("Category", "Worker")]
|
||||
public async Task BackgroundStepRunsConcurrentlyWithForeground()
|
||||
{
|
||||
using (TestHostContext hc = CreateTestContext())
|
||||
{
|
||||
// Arrange: background step that takes time, followed by a foreground step
|
||||
var executionOrder = new List<string>();
|
||||
|
||||
var bgStep = CreateStep(hc, TaskResult.Succeeded, "success()", name: "bg-step", contextName: "bg", isBackground: true);
|
||||
bgStep.Setup(x => x.RunAsync()).Returns(async () =>
|
||||
{
|
||||
executionOrder.Add("bg-start");
|
||||
await Task.Delay(2000);
|
||||
executionOrder.Add("bg-end");
|
||||
});
|
||||
bgStep.Setup(x => x.Action).Returns(new GitHub.DistributedTask.Pipelines.ActionStep()
|
||||
{
|
||||
Name = "bg-step",
|
||||
Id = Guid.NewGuid(),
|
||||
ContextName = "bg",
|
||||
Background = true,
|
||||
});
|
||||
|
||||
var fgStep = CreateStep(hc, TaskResult.Succeeded, "success()", name: "fg-step", contextName: "fg");
|
||||
fgStep.Setup(x => x.RunAsync()).Returns(() =>
|
||||
{
|
||||
executionOrder.Add("fg-run");
|
||||
return Task.CompletedTask;
|
||||
});
|
||||
|
||||
var waitAllStep = CreateWaitAllStep(hc);
|
||||
|
||||
_ec.Object.Result = null;
|
||||
_ec.Setup(x => x.JobSteps).Returns(new Queue<IStep>(new IStep[]
|
||||
{
|
||||
bgStep.Object, fgStep.Object, waitAllStep
|
||||
}));
|
||||
|
||||
// Act
|
||||
await _stepsRunner.RunAsync(jobContext: _ec.Object);
|
||||
|
||||
// Assert: foreground step should start before background step finishes
|
||||
Assert.Contains("bg-start", executionOrder);
|
||||
Assert.Contains("fg-run", executionOrder);
|
||||
Assert.Contains("bg-end", executionOrder);
|
||||
var fgIndex = executionOrder.IndexOf("fg-run");
|
||||
var bgEndIndex = executionOrder.IndexOf("bg-end");
|
||||
Assert.True(fgIndex < bgEndIndex, "Foreground step should run before background step completes");
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
[Trait("Level", "L0")]
|
||||
[Trait("Category", "Worker")]
|
||||
public async Task WaitStepBlocksUntilBackgroundCompletes()
|
||||
{
|
||||
using (TestHostContext hc = CreateTestContext())
|
||||
{
|
||||
// Arrange
|
||||
var bgCompleted = false;
|
||||
|
||||
var bgStep = CreateStep(hc, TaskResult.Succeeded, "success()", name: "db", contextName: "db", isBackground: true);
|
||||
bgStep.Setup(x => x.RunAsync()).Returns(async () =>
|
||||
{
|
||||
await Task.Delay(100);
|
||||
bgCompleted = true;
|
||||
});
|
||||
bgStep.Setup(x => x.Action).Returns(new GitHub.DistributedTask.Pipelines.ActionStep()
|
||||
{
|
||||
Name = "db",
|
||||
Id = Guid.NewGuid(),
|
||||
ContextName = "db",
|
||||
Background = true,
|
||||
});
|
||||
|
||||
var waitStep = CreateWaitStep(hc, new[] { "db" });
|
||||
|
||||
_ec.Object.Result = null;
|
||||
_ec.Setup(x => x.JobSteps).Returns(new Queue<IStep>(new IStep[]
|
||||
{
|
||||
bgStep.Object, waitStep
|
||||
}));
|
||||
|
||||
// Act
|
||||
await _stepsRunner.RunAsync(jobContext: _ec.Object);
|
||||
|
||||
// Assert: background step must have completed after wait
|
||||
Assert.True(bgCompleted, "Background step should have completed after wait");
|
||||
Assert.Equal(TaskResult.Succeeded, _ec.Object.Result ?? TaskResult.Succeeded);
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
[Trait("Level", "L0")]
|
||||
[Trait("Category", "Worker")]
|
||||
public async Task BackgroundStepFailurePropagatesAtWait()
|
||||
{
|
||||
using (TestHostContext hc = CreateTestContext())
|
||||
{
|
||||
// Arrange: background step that fails
|
||||
var bgStep = CreateStep(hc, TaskResult.Failed, "success()", name: "flaky", contextName: "flaky", isBackground: true);
|
||||
bgStep.Setup(x => x.RunAsync()).Returns(() =>
|
||||
{
|
||||
throw new Exception("Service crashed");
|
||||
});
|
||||
bgStep.Setup(x => x.Action).Returns(new GitHub.DistributedTask.Pipelines.ActionStep()
|
||||
{
|
||||
Name = "flaky",
|
||||
Id = Guid.NewGuid(),
|
||||
ContextName = "flaky",
|
||||
Background = true,
|
||||
});
|
||||
|
||||
var waitStep = CreateWaitStep(hc, new[] { "flaky" });
|
||||
|
||||
_ec.Object.Result = null;
|
||||
_ec.Setup(x => x.JobSteps).Returns(new Queue<IStep>(new IStep[]
|
||||
{
|
||||
bgStep.Object, waitStep
|
||||
}));
|
||||
|
||||
// Act
|
||||
await _stepsRunner.RunAsync(jobContext: _ec.Object);
|
||||
|
||||
// Assert: job should fail because background step failed
|
||||
Assert.Equal(TaskResult.Failed, _ec.Object.Result);
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
[Trait("Level", "L0")]
|
||||
[Trait("Category", "Worker")]
|
||||
public async Task CancelStepTerminatesBackgroundStep()
|
||||
{
|
||||
using (TestHostContext hc = CreateTestContext())
|
||||
{
|
||||
// Arrange: background step that runs until cancelled via ExecutionContext.CancellationToken
|
||||
var stepCts = new CancellationTokenSource();
|
||||
|
||||
var bgStep = CreateStep(hc, TaskResult.Succeeded, "success()", name: "server", contextName: "server");
|
||||
// Wire CancellationToken to our CTS so the cancel path can trigger it
|
||||
var bgStepContext = Mock.Get(bgStep.Object.ExecutionContext);
|
||||
bgStepContext.Setup(x => x.CancellationToken).Returns(stepCts.Token);
|
||||
bgStepContext.Setup(x => x.CancelToken()).Callback(() => stepCts.Cancel());
|
||||
bgStep.Setup(x => x.RunAsync()).Returns(async () =>
|
||||
{
|
||||
await Task.Delay(TimeSpan.FromSeconds(5), stepCts.Token);
|
||||
});
|
||||
bgStep.Setup(x => x.Action).Returns(new GitHub.DistributedTask.Pipelines.ActionStep()
|
||||
{
|
||||
Name = "server",
|
||||
Id = Guid.NewGuid(),
|
||||
ContextName = "server",
|
||||
Background = true,
|
||||
});
|
||||
|
||||
var cancelStep = CreateCancelStep(hc, "server");
|
||||
|
||||
_ec.Object.Result = null;
|
||||
_ec.Setup(x => x.JobSteps).Returns(new Queue<IStep>(new IStep[]
|
||||
{
|
||||
bgStep.Object, cancelStep
|
||||
}));
|
||||
|
||||
// Act
|
||||
await _stepsRunner.RunAsync(jobContext: _ec.Object);
|
||||
|
||||
// Assert: background step should have been cancelled
|
||||
// Note: the cancel mechanism uses the BackgroundStepContext.Cts, not bgCts
|
||||
// so wasCancelled may not be true in this mock, but the step should complete
|
||||
Assert.Equal(TaskResult.Succeeded, _ec.Object.Result ?? TaskResult.Succeeded);
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
[Trait("Level", "L0")]
|
||||
[Trait("Category", "Worker")]
|
||||
public async Task WaitAllWaitsForAllBackgroundSteps()
|
||||
{
|
||||
using (TestHostContext hc = CreateTestContext())
|
||||
{
|
||||
// Arrange: two background steps
|
||||
var step1Done = false;
|
||||
var step2Done = false;
|
||||
|
||||
var bgStep1 = CreateStep(hc, TaskResult.Succeeded, "success()", name: "svc1", contextName: "svc1", isBackground: true);
|
||||
bgStep1.Setup(x => x.RunAsync()).Returns(async () =>
|
||||
{
|
||||
await Task.Delay(50);
|
||||
step1Done = true;
|
||||
});
|
||||
bgStep1.Setup(x => x.Action).Returns(new GitHub.DistributedTask.Pipelines.ActionStep()
|
||||
{
|
||||
Name = "svc1",
|
||||
Id = Guid.NewGuid(),
|
||||
ContextName = "svc1",
|
||||
Background = true,
|
||||
});
|
||||
|
||||
var bgStep2 = CreateStep(hc, TaskResult.Succeeded, "success()", name: "svc2", contextName: "svc2", isBackground: true);
|
||||
bgStep2.Setup(x => x.RunAsync()).Returns(async () =>
|
||||
{
|
||||
await Task.Delay(100);
|
||||
step2Done = true;
|
||||
});
|
||||
bgStep2.Setup(x => x.Action).Returns(new GitHub.DistributedTask.Pipelines.ActionStep()
|
||||
{
|
||||
Name = "svc2",
|
||||
Id = Guid.NewGuid(),
|
||||
ContextName = "svc2",
|
||||
Background = true,
|
||||
});
|
||||
|
||||
var waitAllStep = CreateWaitAllStep(hc);
|
||||
|
||||
_ec.Object.Result = null;
|
||||
_ec.Setup(x => x.JobSteps).Returns(new Queue<IStep>(new IStep[]
|
||||
{
|
||||
bgStep1.Object, bgStep2.Object, waitAllStep
|
||||
}));
|
||||
|
||||
// Act
|
||||
await _stepsRunner.RunAsync(jobContext: _ec.Object);
|
||||
|
||||
// Assert
|
||||
Assert.True(step1Done, "Background step 1 should have completed");
|
||||
Assert.True(step2Done, "Background step 2 should have completed");
|
||||
Assert.Equal(TaskResult.Succeeded, _ec.Object.Result ?? TaskResult.Succeeded);
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
[Trait("Level", "L0")]
|
||||
[Trait("Category", "Worker")]
|
||||
public async Task CancelStepPublishesCanceledBackgroundExternalId()
|
||||
{
|
||||
using (TestHostContext hc = CreateTestContext())
|
||||
{
|
||||
var bgStep = CreateStep(hc, TaskResult.Succeeded, "success()", name: "server", contextName: "server", isBackground: true);
|
||||
bgStep.Setup(x => x.RunAsync()).Returns(Task.CompletedTask);
|
||||
bgStep.Setup(x => x.Action).Returns(new GitHub.DistributedTask.Pipelines.ActionStep()
|
||||
{
|
||||
Name = "server",
|
||||
Id = Guid.NewGuid(),
|
||||
ContextName = "server",
|
||||
Background = true,
|
||||
});
|
||||
|
||||
var cancelStep = CreateCancelStep(hc, "server");
|
||||
|
||||
_ec.Object.Result = null;
|
||||
_ec.Setup(x => x.JobSteps).Returns(new Queue<IStep>(new IStep[]
|
||||
{
|
||||
bgStep.Object, cancelStep
|
||||
}));
|
||||
|
||||
await _stepsRunner.RunAsync(jobContext: _ec.Object);
|
||||
|
||||
// Assert: cancel step completed without error
|
||||
Assert.Equal(TaskResult.Succeeded, _ec.Object.Result ?? TaskResult.Succeeded);
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
[Trait("Level", "L0")]
|
||||
[Trait("Category", "Worker")]
|
||||
public async Task StepsContextThreadSafety()
|
||||
{
|
||||
// Test that concurrent SetOutput/SetConclusion doesn't throw
|
||||
var stepsContext = new StepsContext();
|
||||
var tasks = new List<Task>();
|
||||
|
||||
for (int i = 0; i < 100; i++)
|
||||
{
|
||||
var index = i;
|
||||
tasks.Add(Task.Run(() =>
|
||||
{
|
||||
stepsContext.SetOutput("", $"step{index}", "out", $"value{index}", out _);
|
||||
stepsContext.SetConclusion("", $"step{index}", ActionResult.Success);
|
||||
stepsContext.SetOutcome("", $"step{index}", ActionResult.Success);
|
||||
}));
|
||||
}
|
||||
|
||||
await Task.WhenAll(tasks);
|
||||
|
||||
// Assert: all 100 steps should have their data set
|
||||
var scope = stepsContext.GetScope("");
|
||||
Assert.Equal(100, scope.Count);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
[Trait("Level", "L0")]
|
||||
[Trait("Category", "Worker")]
|
||||
public async Task ControlFlowStepsRunEvenAfterFailure()
|
||||
{
|
||||
using (TestHostContext hc = CreateTestContext())
|
||||
{
|
||||
// Arrange: a background step, a foreground step that fails, then a wait step
|
||||
var bgStep = CreateStep(hc, TaskResult.Succeeded, "success()", name: "bg", contextName: "bg", isBackground: true);
|
||||
bgStep.Setup(x => x.RunAsync()).Returns(Task.CompletedTask);
|
||||
bgStep.Setup(x => x.Action).Returns(new GitHub.DistributedTask.Pipelines.ActionStep()
|
||||
{
|
||||
Name = "bg",
|
||||
Id = Guid.NewGuid(),
|
||||
ContextName = "bg",
|
||||
Background = true,
|
||||
});
|
||||
|
||||
var failStep = CreateStep(hc, TaskResult.Failed, "success()", name: "fail", contextName: "fail");
|
||||
|
||||
// Wait step uses always() condition — should run even after failure
|
||||
var waitStep = CreateWaitStep(hc, new[] { "bg" });
|
||||
waitStep.Condition = $"{GitHub.DistributedTask.Pipelines.ObjectTemplating.PipelineTemplateConstants.Always}()";
|
||||
|
||||
_ec.Object.Result = null;
|
||||
_ec.Setup(x => x.JobSteps).Returns(new Queue<IStep>(new IStep[]
|
||||
{
|
||||
bgStep.Object, failStep.Object, waitStep
|
||||
}));
|
||||
|
||||
// Act
|
||||
await _stepsRunner.RunAsync(jobContext: _ec.Object);
|
||||
|
||||
// Assert: wait step should have run (not skipped) because it has always() condition
|
||||
Assert.NotNull(waitStep.ExecutionContext.Result);
|
||||
Assert.NotEqual(TaskResult.Skipped, waitStep.ExecutionContext.Result);
|
||||
}
|
||||
}
|
||||
|
||||
#region Helpers
|
||||
|
||||
private Mock<IActionRunner> CreateStep(TestHostContext hc, TaskResult result, string condition, string name = "Test", string contextName = null, Guid? recordId = null, bool isBackground = false)
|
||||
{
|
||||
var stepRecordId = recordId ?? Guid.NewGuid();
|
||||
var step = new Mock<IActionRunner>();
|
||||
step.Setup(x => x.Condition).Returns(condition);
|
||||
step.Setup(x => x.ContinueOnError).Returns(new BooleanToken(null, null, null, false));
|
||||
step.Setup(x => x.Stage).Returns(ActionRunStage.Main);
|
||||
step.Setup(x => x.Action)
|
||||
.Returns(new GitHub.DistributedTask.Pipelines.ActionStep()
|
||||
{
|
||||
Name = name,
|
||||
Id = stepRecordId,
|
||||
ContextName = contextName ?? name,
|
||||
});
|
||||
|
||||
var stepContext = new Mock<IExecutionContext>();
|
||||
stepContext.SetupAllProperties();
|
||||
stepContext.Setup(x => x.Global).Returns(() => _ec.Object.Global);
|
||||
stepContext.Setup(x => x.IsBackground).Returns(isBackground);
|
||||
var expressionValues = new DictionaryContextData();
|
||||
foreach (var pair in _ec.Object.ExpressionValues)
|
||||
{
|
||||
expressionValues[pair.Key] = pair.Value;
|
||||
}
|
||||
stepContext.Setup(x => x.ExpressionValues).Returns(expressionValues);
|
||||
stepContext.Setup(x => x.ExpressionFunctions).Returns(new List<IFunctionInfo>());
|
||||
stepContext.Setup(x => x.JobContext).Returns(_jobContext);
|
||||
stepContext.Setup(x => x.Id).Returns(stepRecordId);
|
||||
stepContext.Setup(x => x.ContextName).Returns(step.Object.Action.ContextName);
|
||||
stepContext.Setup(x => x.CancellationToken).Returns(CancellationToken.None);
|
||||
stepContext.Setup(x => x.Complete(It.IsAny<TaskResult?>(), It.IsAny<string>(), It.IsAny<string>()))
|
||||
.Callback((TaskResult? r, string currentOperation, string resultCode) =>
|
||||
{
|
||||
if (r != null)
|
||||
{
|
||||
stepContext.Object.Result = r;
|
||||
}
|
||||
_stepContext.SetOutcome("", stepContext.Object.ContextName, (stepContext.Object.Outcome ?? stepContext.Object.Result ?? TaskResult.Succeeded).ToActionResult());
|
||||
_stepContext.SetConclusion("", stepContext.Object.ContextName, (stepContext.Object.Result ?? TaskResult.Succeeded).ToActionResult());
|
||||
});
|
||||
stepContext.Setup(x => x.StepEnvironmentOverrides).Returns(new List<string>());
|
||||
stepContext.Setup(x => x.ApplyContinueOnError(It.IsAny<TemplateToken>()));
|
||||
stepContext.Setup(x => x.FlushDeferredOutputs()).Callback(() =>
|
||||
{
|
||||
if (stepContext.Object.DeferredOutputs != null)
|
||||
{
|
||||
foreach (var kvp in stepContext.Object.DeferredOutputs)
|
||||
{
|
||||
_stepContext.SetOutput("", stepContext.Object.ContextName, kvp.Key, kvp.Value, out _);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
var trace = hc.GetTrace();
|
||||
stepContext.Setup(x => x.Write(It.IsAny<string>(), It.IsAny<string>())).Callback((string tag, string message) => { trace.Info($"[{tag}]{message}"); });
|
||||
stepContext.Object.Result = result;
|
||||
step.Setup(x => x.ExecutionContext).Returns(stepContext.Object);
|
||||
step.Setup(x => x.RunAsync()).Returns(Task.CompletedTask);
|
||||
|
||||
return step;
|
||||
}
|
||||
|
||||
private JobExtensionRunner CreateWaitStep(TestHostContext hc, string[] stepIds, Dictionary<string, string> timelineVariables = null)
|
||||
{
|
||||
var waitData = new BackgroundStepControlFlowData
|
||||
{
|
||||
Type = Pipelines.BackgroundControlTypes.Wait,
|
||||
StepIds = stepIds,
|
||||
};
|
||||
var bgCoordinator = hc.GetService<IBackgroundStepCoordinator>();
|
||||
var waitRunner = new JobExtensionRunner(
|
||||
runAsync: bgCoordinator.RunControlFlowAsync,
|
||||
condition: "success()",
|
||||
displayName: "Wait",
|
||||
data: waitData);
|
||||
|
||||
var stepContext = new Mock<IExecutionContext>();
|
||||
stepContext.SetupAllProperties();
|
||||
stepContext.Setup(x => x.Global).Returns(() => _ec.Object.Global);
|
||||
var waitExprValues = new DictionaryContextData();
|
||||
foreach (var pair in _ec.Object.ExpressionValues) { waitExprValues[pair.Key] = pair.Value; }
|
||||
stepContext.Setup(x => x.ExpressionValues).Returns(waitExprValues);
|
||||
stepContext.Setup(x => x.ExpressionFunctions).Returns(new List<IFunctionInfo>());
|
||||
stepContext.Setup(x => x.ContextName).Returns("__wait");
|
||||
stepContext.Setup(x => x.JobContext).Returns(_jobContext);
|
||||
stepContext.Setup(x => x.ScopeName).Returns((string)null);
|
||||
stepContext.Setup(x => x.CancellationToken).Returns(CancellationToken.None);
|
||||
stepContext.Setup(x => x.StepEnvironmentOverrides).Returns(new List<string>());
|
||||
stepContext.Setup(x => x.Complete(It.IsAny<TaskResult?>(), It.IsAny<string>(), It.IsAny<string>()))
|
||||
.Callback((TaskResult? r, string currentOperation, string resultCode) =>
|
||||
{
|
||||
if (r != null) stepContext.Object.Result = r;
|
||||
});
|
||||
var trace = hc.GetTrace();
|
||||
stepContext.Setup(x => x.Write(It.IsAny<string>(), It.IsAny<string>())).Callback((string tag, string message) => { trace.Info($"[{tag}]{message}"); });
|
||||
|
||||
waitRunner.ExecutionContext = stepContext.Object;
|
||||
return waitRunner;
|
||||
}
|
||||
|
||||
private JobExtensionRunner CreateWaitAllStep(TestHostContext hc, Dictionary<string, string> timelineVariables = null)
|
||||
{
|
||||
var waitAllData = new BackgroundStepControlFlowData
|
||||
{
|
||||
Type = Pipelines.BackgroundControlTypes.WaitAll,
|
||||
};
|
||||
var bgCoordinator2 = hc.GetService<IBackgroundStepCoordinator>();
|
||||
var waitAllRunner = new JobExtensionRunner(
|
||||
runAsync: bgCoordinator2.RunControlFlowAsync,
|
||||
condition: "success()",
|
||||
displayName: "Wait All",
|
||||
data: waitAllData);
|
||||
|
||||
var stepContext = new Mock<IExecutionContext>();
|
||||
stepContext.SetupAllProperties();
|
||||
stepContext.Setup(x => x.Global).Returns(() => _ec.Object.Global);
|
||||
var waitAllExprValues = new DictionaryContextData();
|
||||
foreach (var pair in _ec.Object.ExpressionValues) { waitAllExprValues[pair.Key] = pair.Value; }
|
||||
stepContext.Setup(x => x.ExpressionValues).Returns(waitAllExprValues);
|
||||
stepContext.Setup(x => x.ExpressionFunctions).Returns(new List<IFunctionInfo>());
|
||||
stepContext.Setup(x => x.ContextName).Returns("__wait-all");
|
||||
stepContext.Setup(x => x.JobContext).Returns(_jobContext);
|
||||
stepContext.Setup(x => x.ScopeName).Returns((string)null);
|
||||
stepContext.Setup(x => x.CancellationToken).Returns(CancellationToken.None);
|
||||
stepContext.Setup(x => x.StepEnvironmentOverrides).Returns(new List<string>());
|
||||
stepContext.Setup(x => x.Complete(It.IsAny<TaskResult?>(), It.IsAny<string>(), It.IsAny<string>()))
|
||||
.Callback((TaskResult? r, string currentOperation, string resultCode) =>
|
||||
{
|
||||
if (r != null) stepContext.Object.Result = r;
|
||||
});
|
||||
var trace = hc.GetTrace();
|
||||
stepContext.Setup(x => x.Write(It.IsAny<string>(), It.IsAny<string>())).Callback((string tag, string message) => { trace.Info($"[{tag}]{message}"); });
|
||||
|
||||
waitAllRunner.ExecutionContext = stepContext.Object;
|
||||
return waitAllRunner;
|
||||
}
|
||||
|
||||
private JobExtensionRunner CreateCancelStep(TestHostContext hc, string cancelStepId, Dictionary<string, string> timelineVariables = null)
|
||||
{
|
||||
var cancelData = new BackgroundStepControlFlowData
|
||||
{
|
||||
Type = Pipelines.BackgroundControlTypes.Cancel,
|
||||
StepIds = new[] { cancelStepId },
|
||||
};
|
||||
var bgCoordinator3 = hc.GetService<IBackgroundStepCoordinator>();
|
||||
var cancelRunner = new JobExtensionRunner(
|
||||
runAsync: bgCoordinator3.RunControlFlowAsync,
|
||||
condition: "success()",
|
||||
displayName: "Cancel",
|
||||
data: cancelData);
|
||||
|
||||
var stepContext = new Mock<IExecutionContext>();
|
||||
stepContext.SetupAllProperties();
|
||||
stepContext.Setup(x => x.Global).Returns(() => _ec.Object.Global);
|
||||
var cancelExprValues = new DictionaryContextData();
|
||||
foreach (var pair in _ec.Object.ExpressionValues) { cancelExprValues[pair.Key] = pair.Value; }
|
||||
stepContext.Setup(x => x.ExpressionValues).Returns(cancelExprValues);
|
||||
stepContext.Setup(x => x.ExpressionFunctions).Returns(new List<IFunctionInfo>());
|
||||
stepContext.Setup(x => x.ContextName).Returns("__cancel");
|
||||
stepContext.Setup(x => x.JobContext).Returns(_jobContext);
|
||||
stepContext.Setup(x => x.ScopeName).Returns((string)null);
|
||||
stepContext.Setup(x => x.CancellationToken).Returns(CancellationToken.None);
|
||||
stepContext.Setup(x => x.StepEnvironmentOverrides).Returns(new List<string>());
|
||||
stepContext.Setup(x => x.Complete(It.IsAny<TaskResult?>(), It.IsAny<string>(), It.IsAny<string>()))
|
||||
.Callback((TaskResult? r, string currentOperation, string resultCode) =>
|
||||
{
|
||||
if (r != null) stepContext.Object.Result = r;
|
||||
});
|
||||
var trace = hc.GetTrace();
|
||||
stepContext.Setup(x => x.Write(It.IsAny<string>(), It.IsAny<string>())).Callback((string tag, string message) => { trace.Info($"[{tag}]{message}"); });
|
||||
|
||||
cancelRunner.ExecutionContext = stepContext.Object;
|
||||
return cancelRunner;
|
||||
}
|
||||
|
||||
#endregion
|
||||
}
|
||||
}
|
||||
@@ -549,6 +549,10 @@ namespace GitHub.Runner.Common.Tests.Worker
|
||||
var _stepsRunner = new StepsRunner();
|
||||
_stepsRunner.Initialize(hc);
|
||||
|
||||
var bgCoordinator = new BackgroundStepCoordinator();
|
||||
bgCoordinator.Initialize(hc);
|
||||
hc.SetSingleton<IBackgroundStepCoordinator>(bgCoordinator);
|
||||
|
||||
var mockDapDebugger = new Mock<IDapDebugger>();
|
||||
hc.SetSingleton(mockDapDebugger.Object);
|
||||
|
||||
|
||||
@@ -63,6 +63,10 @@ namespace GitHub.Runner.Common.Tests.Worker
|
||||
_stepsRunner = new StepsRunner();
|
||||
_stepsRunner.Initialize(hc);
|
||||
|
||||
var bgCoordinator = new BackgroundStepCoordinator();
|
||||
bgCoordinator.Initialize(hc);
|
||||
hc.SetSingleton<IBackgroundStepCoordinator>(bgCoordinator);
|
||||
|
||||
var mockDapDebugger = new Mock<IDapDebugger>();
|
||||
hc.SetSingleton(mockDapDebugger.Object);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user