diff --git a/core/src/main/java/org/keycloak/representations/workflows/AbstractWorkflowComponentRepresentation.java b/core/src/main/java/org/keycloak/representations/workflows/AbstractWorkflowComponentRepresentation.java index 33a189152cd..c4da2940c0b 100644 --- a/core/src/main/java/org/keycloak/representations/workflows/AbstractWorkflowComponentRepresentation.java +++ b/core/src/main/java/org/keycloak/representations/workflows/AbstractWorkflowComponentRepresentation.java @@ -1,7 +1,7 @@ package org.keycloak.representations.workflows; import static org.keycloak.common.util.reflections.Reflections.isArrayType; -import static org.keycloak.representations.workflows.WorkflowConstants.CONFIG_IF; +import static org.keycloak.representations.workflows.WorkflowConstants.CONFIG_WITH; import java.util.Arrays; import java.util.Collections; @@ -20,7 +20,7 @@ public abstract class AbstractWorkflowComponentRepresentation { private String id; private String uses; - @JsonProperty(CONFIG_IF) + @JsonProperty(CONFIG_WITH) private MultivaluedHashMap config; public AbstractWorkflowComponentRepresentation(String id, String uses, MultivaluedHashMap config) { diff --git a/model/jpa/src/main/java/org/keycloak/models/workflow/JpaWorkflowStateProvider.java b/model/jpa/src/main/java/org/keycloak/models/workflow/JpaWorkflowStateProvider.java index d47fedc13f7..39df840a0e0 100644 --- a/model/jpa/src/main/java/org/keycloak/models/workflow/JpaWorkflowStateProvider.java +++ b/model/jpa/src/main/java/org/keycloak/models/workflow/JpaWorkflowStateProvider.java @@ -45,22 +45,23 @@ public class JpaWorkflowStateProvider implements WorkflowStateProvider { @Override public ScheduledStep getScheduledStep(String workflowId, String resourceId) { - WorkflowStateEntity.PrimaryKey pk = new WorkflowStateEntity.PrimaryKey(resourceId, workflowId); - WorkflowStateEntity entity = em.find(WorkflowStateEntity.class, pk); - if (entity != null) { - return new ScheduledStep(entity.getWorkflowId(), entity.getScheduledStepId(), entity.getResourceId()); - } - return null; + CriteriaBuilder cb = em.getCriteriaBuilder(); + CriteriaQuery query = cb.createQuery(WorkflowStateEntity.class); + Root stateRoot = query.from(WorkflowStateEntity.class); + + query.where(cb.and(cb.equal(stateRoot.get("workflowId"), workflowId), cb.equal(stateRoot.get("resourceId"), resourceId))); + WorkflowStateEntity entity = em.createQuery(query).getSingleResultOrNull(); + return entity != null ? toScheduledStep(entity) : null; } @Override - public void scheduleStep(Workflow workflow, WorkflowStep step, String resourceId) { - WorkflowStateEntity.PrimaryKey pk = new WorkflowStateEntity.PrimaryKey(resourceId, workflow.getId()); - WorkflowStateEntity entity = em.find(WorkflowStateEntity.class, pk); + public void scheduleStep(Workflow workflow, WorkflowStep step, String resourceId, String executionId) { + WorkflowStateEntity entity = em.find(WorkflowStateEntity.class, executionId); if (entity == null) { entity = new WorkflowStateEntity(); entity.setResourceId(resourceId); entity.setWorkflowId(workflow.getId()); + entity.setExecutionId(executionId); entity.setWorkflowProviderId(workflow.getProviderId()); entity.setScheduledStepId(step.getId()); entity.setScheduledStepTimestamp(Time.currentTimeMillis() + step.getAfter()); @@ -83,7 +84,7 @@ public class JpaWorkflowStateProvider implements WorkflowStateProvider { query.where(cb.and(byWorkflow, isExpired)); return em.createQuery(query).getResultStream() - .map(s -> new ScheduledStep(s.getWorkflowId(), s.getScheduledStepId(), s.getResourceId())) + .map(this::toScheduledStep) .toList(); } @@ -101,7 +102,7 @@ public class JpaWorkflowStateProvider implements WorkflowStateProvider { query.where(byWorkflow); return em.createQuery(query).getResultStream() - .map(s -> new ScheduledStep(s.getWorkflowId(), s.getScheduledStepId(), s.getResourceId())) + .map(this::toScheduledStep) .toList(); } @@ -115,7 +116,20 @@ public class JpaWorkflowStateProvider implements WorkflowStateProvider { query.where(byResource); return em.createQuery(query).getResultStream() - .map(s -> new ScheduledStep(s.getWorkflowId(), s.getScheduledStepId(), s.getResourceId())) + .map(this::toScheduledStep) + .toList(); + } + + public List getScheduledStepsByStep(String stepId) { + CriteriaBuilder cb = em.getCriteriaBuilder(); + CriteriaQuery query = cb.createQuery(WorkflowStateEntity.class); + Root stateRoot = query.from(WorkflowStateEntity.class); + + Predicate byStep = cb.equal(stateRoot.get("scheduledStepId"), stepId); + query.where(byStep); + + return em.createQuery(query).getResultStream() + .map(this::toScheduledStep) .toList(); } @@ -135,16 +149,7 @@ public class JpaWorkflowStateProvider implements WorkflowStateProvider { } @Override - public void remove(String workflowId, String resourceId) { - WorkflowStateEntity.PrimaryKey pk = new WorkflowStateEntity.PrimaryKey(resourceId, workflowId); - WorkflowStateEntity entity = em.find(WorkflowStateEntity.class, pk); - if (entity != null) { - em.remove(entity); - } - } - - @Override - public void remove(String workflowId) { + public void removeByWorkflow(String workflowId) { CriteriaBuilder cb = em.getCriteriaBuilder(); CriteriaDelete delete = cb.createCriteriaDelete(WorkflowStateEntity.class); Root root = delete.from(WorkflowStateEntity.class); @@ -159,6 +164,14 @@ public class JpaWorkflowStateProvider implements WorkflowStateProvider { } } + @Override + public void remove(String executionId) { + WorkflowStateEntity entity = em.find(WorkflowStateEntity.class, executionId); + if (entity != null) { + em.remove(entity); + } + } + @Override public void removeAll() { CriteriaBuilder cb = em.getCriteriaBuilder(); @@ -177,4 +190,7 @@ public class JpaWorkflowStateProvider implements WorkflowStateProvider { public void close() { } + private ScheduledStep toScheduledStep(WorkflowStateEntity entity) { + return new ScheduledStep(entity.getWorkflowId(), entity.getScheduledStepId(), entity.getResourceId(), entity.getExecutionId()); + } } diff --git a/model/jpa/src/main/java/org/keycloak/models/workflow/WorkflowStateEntity.java b/model/jpa/src/main/java/org/keycloak/models/workflow/WorkflowStateEntity.java index 8e68e005d30..87b1774f9e8 100644 --- a/model/jpa/src/main/java/org/keycloak/models/workflow/WorkflowStateEntity.java +++ b/model/jpa/src/main/java/org/keycloak/models/workflow/WorkflowStateEntity.java @@ -30,14 +30,15 @@ import java.util.Objects; */ @Entity @Table(name = "WORKFLOW_STATE") -@IdClass(WorkflowStateEntity.PrimaryKey.class) public class WorkflowStateEntity { @Id + @Column(name = "EXECUTION_ID") + private String executionId; + @Column(name = "RESOURCE_ID") private String resourceId; - @Id @Column(name = "WORKFLOW_ID") private String workflowId; @@ -53,6 +54,14 @@ public class WorkflowStateEntity { @Column(name = "SCHEDULED_STEP_TIMESTAMP") private long scheduledStepTimestamp; + public String getExecutionId() { + return executionId; + } + + public void setExecutionId(String executionId) { + this.executionId = executionId; + } + public String getResourceId() { return resourceId; } @@ -101,49 +110,6 @@ public class WorkflowStateEntity { this.scheduledStepTimestamp = scheduledStepTimestamp; } - public static class PrimaryKey implements Serializable { - - private String resourceId; - private String workflowId; - - public PrimaryKey() { - } - - public PrimaryKey(String resourceId, String workflowId) { - this.resourceId = resourceId; - this.workflowId = workflowId; - } - - public String getResourceId() { - return resourceId; - } - - public void setResourceId(String resourceId) { - this.resourceId = resourceId; - } - - public String getWorkflowId() { - return workflowId; - } - - public void setWorkflowId(String workflowId) { - this.workflowId = workflowId; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - PrimaryKey that = (PrimaryKey) o; - return Objects.equals(resourceId, that.resourceId) && Objects.equals(workflowId, that.workflowId); - } - - @Override - public int hashCode() { - return Objects.hash(resourceId, workflowId); - } - } - @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/model/jpa/src/main/resources/META-INF/jpa-changelog-26.4.0.xml b/model/jpa/src/main/resources/META-INF/jpa-changelog-26.4.0.xml index 932ff41616e..90accba1db2 100644 --- a/model/jpa/src/main/resources/META-INF/jpa-changelog-26.4.0.xml +++ b/model/jpa/src/main/resources/META-INF/jpa-changelog-26.4.0.xml @@ -31,6 +31,9 @@ + + + @@ -44,9 +47,13 @@ + columnNames="EXECUTION_ID" /> + + diff --git a/model/storage-private/src/main/java/org/keycloak/models/workflow/WorkflowStateProvider.java b/model/storage-private/src/main/java/org/keycloak/models/workflow/WorkflowStateProvider.java index a284c3bf25f..4f085ad46c1 100644 --- a/model/storage-private/src/main/java/org/keycloak/models/workflow/WorkflowStateProvider.java +++ b/model/storage-private/src/main/java/org/keycloak/models/workflow/WorkflowStateProvider.java @@ -33,25 +33,23 @@ public interface WorkflowStateProvider extends Provider { */ void removeByResource(String resourceId); - /** - * Removes the record identified by the specified {@code workflowId} and {@code resourceId}. - * @param workflowId the id of the workflow. - * @param resourceId the id of the resource. - */ - void remove(String workflowId, String resourceId); - /** * Removes any record identified by the specified {@code workflowId}. * @param workflowId the id of the workflow. */ - void remove(String workflowId); + void removeByWorkflow(String workflowId); + + /** + * Removes the record identified by the specified {@code executionId}. + */ + void remove(String executionId); /** * Deletes all state records associated with the current realm bound to the session. */ void removeAll(); - void scheduleStep(Workflow workflow, WorkflowStep step, String resourceId); + void scheduleStep(Workflow workflow, WorkflowStep step, String resourceId, String executionId); ScheduledStep getScheduledStep(String workflowId, String resourceId); @@ -59,6 +57,8 @@ public interface WorkflowStateProvider extends Provider { List getScheduledStepsByWorkflow(String workflowId); + List getScheduledStepsByStep(String stepId); + default List getScheduledStepsByWorkflow(Workflow workflow) { if (workflow == null) { return List.of(); @@ -69,5 +69,5 @@ public interface WorkflowStateProvider extends Provider { List getDueScheduledSteps(Workflow workflow); - record ScheduledStep(String workflowId, String stepId, String resourceId) {} + record ScheduledStep(String workflowId, String stepId, String resourceId, String executionId) {} } diff --git a/server-spi-private/src/main/java/org/keycloak/models/workflow/Workflow.java b/server-spi-private/src/main/java/org/keycloak/models/workflow/Workflow.java index 52928d4296b..b59d4a29132 100644 --- a/server-spi-private/src/main/java/org/keycloak/models/workflow/Workflow.java +++ b/server-spi-private/src/main/java/org/keycloak/models/workflow/Workflow.java @@ -19,6 +19,7 @@ package org.keycloak.models.workflow; import static org.keycloak.representations.workflows.WorkflowConstants.CONFIG_ENABLED; import static org.keycloak.representations.workflows.WorkflowConstants.CONFIG_ERROR; +import static org.keycloak.representations.workflows.WorkflowConstants.CONFIG_NAME; import static org.keycloak.representations.workflows.WorkflowConstants.CONFIG_RECURRING; import java.util.List; @@ -59,6 +60,10 @@ public class Workflow { return config; } + public String getName() { + return config != null ? config.getFirst(CONFIG_NAME) : null; + } + public boolean isEnabled() { return config != null && Boolean.parseBoolean(config.getFirstOrDefault(CONFIG_ENABLED, "true")); } diff --git a/server-spi-private/src/main/java/org/keycloak/models/workflow/WorkflowExecutionContext.java b/server-spi-private/src/main/java/org/keycloak/models/workflow/WorkflowExecutionContext.java new file mode 100644 index 00000000000..6bf9d2baf3b --- /dev/null +++ b/server-spi-private/src/main/java/org/keycloak/models/workflow/WorkflowExecutionContext.java @@ -0,0 +1,91 @@ +package org.keycloak.models.workflow; + +import org.jboss.logging.Logger; + +import java.util.List; +import java.util.UUID; + +import static java.util.Optional.ofNullable; + +public class WorkflowExecutionContext { + + private static final Logger logger = Logger.getLogger(WorkflowExecutionContext.class); + + private String executionId; + private String resourceId; + private Workflow workflow; + private List steps; + + // variable that keep track of execution steps + private int lastExecutedStepIndex = -1; + + public WorkflowExecutionContext(Workflow workflow, List steps, String resourceId) { + this.workflow = workflow; + this.steps = ofNullable(steps).orElse(List.of()); + this.resourceId = resourceId; + } + + public WorkflowExecutionContext(Workflow workflow, List steps, String resourceId, String stepId, String executionId) { + this(workflow, steps, resourceId); + this.executionId = executionId; + if (stepId != null) { + for (int i = 0; i < steps.size(); i++) { + if (steps.get(i).getId().equals(stepId)) { + this.lastExecutedStepIndex = i - 1; + break; + } + } + } + } + + public void init() { + if (this.executionId == null) { + this.executionId = UUID.randomUUID().toString(); + logger.debugf("Started workflow '%s' for resource %s (execution id: %s)", this.workflow.getName(), this.resourceId, this.executionId); + } + } + + public void success(WorkflowStep step) { + logger.debugf("Step %s completed successfully (execution id: %s)", step.getProviderId(), executionId); + } + + public void fail(WorkflowStep step, String errorMessage) { + StringBuilder sb = new StringBuilder(); + sb.append("Step %s failed (execution id: %s)"); + if (errorMessage != null) { + sb.append(" - error message: %s"); + logger.debugf(sb.toString(), step.getProviderId(), executionId, errorMessage); + } + else { + logger.debugf(sb.toString(), step.getProviderId(), executionId); + } + } + + public void complete() { + logger.debugf("Workflow '%s' completed for resource %s (execution id: %s)", workflow.getName(), resourceId, executionId); + } + + public void cancel() { + logger.debugf("Workflow '%s' cancelled for resource %s (execution id: %s)", workflow.getName(), resourceId, executionId); + } + + public boolean hasNextStep() { + return lastExecutedStepIndex + 1 < steps.size(); + } + + public WorkflowStep getNextStep() { + if (lastExecutedStepIndex + 1 < steps.size()) { + return steps.get(++lastExecutedStepIndex); + } + return null; + } + + public void restart() { + logger.debugf("Restarted workflow '%s' for resource %s (execution id: %s)",workflow.getName(), resourceId, executionId); + this.lastExecutedStepIndex = -1; + } + + public String getExecutionId() { + return this.executionId; + } +} diff --git a/server-spi-private/src/main/java/org/keycloak/models/workflow/WorkflowExecutionException.java b/server-spi-private/src/main/java/org/keycloak/models/workflow/WorkflowExecutionException.java new file mode 100644 index 00000000000..e41f8bb6ddf --- /dev/null +++ b/server-spi-private/src/main/java/org/keycloak/models/workflow/WorkflowExecutionException.java @@ -0,0 +1,10 @@ +package org.keycloak.models.workflow; + +import org.keycloak.models.ModelException; + +public class WorkflowExecutionException extends ModelException { + + public WorkflowExecutionException(String message) { + super(message); + } +} diff --git a/services/src/main/java/org/keycloak/models/workflow/WorkflowsManager.java b/services/src/main/java/org/keycloak/models/workflow/WorkflowsManager.java index f9cb4fab671..d4d45ff87e7 100644 --- a/services/src/main/java/org/keycloak/models/workflow/WorkflowsManager.java +++ b/services/src/main/java/org/keycloak/models/workflow/WorkflowsManager.java @@ -47,6 +47,7 @@ import static java.util.Optional.ofNullable; import static org.keycloak.representations.workflows.WorkflowConstants.CONFIG_CONDITIONS; import static org.keycloak.representations.workflows.WorkflowConstants.CONFIG_ENABLED; import static org.keycloak.representations.workflows.WorkflowConstants.CONFIG_NAME; +import static org.keycloak.representations.workflows.WorkflowConstants.CONFIG_RECURRING; public class WorkflowsManager { @@ -148,10 +149,6 @@ public class WorkflowsManager { return toStep(component); } - private WorkflowStep getFirstStep(Workflow workflow) { - return getSteps(workflow.getId()).get(0); - } - private WorkflowProvider getWorkflowProvider(Workflow workflow) { ComponentFactory factory = (ComponentFactory) session.getKeycloakSessionFactory() .getProviderFactory(WorkflowProvider.class, workflow.getProviderId()); @@ -198,8 +195,8 @@ public class WorkflowsManager { } public void processEvent(List workflows, WorkflowEvent event) { - List currentlyAssignedWorkflows = workflowStateProvider.getScheduledStepsByResource(event.getResourceId()) - .stream().map(ScheduledStep::workflowId).toList(); + Map scheduledSteps = workflowStateProvider.getScheduledStepsByResource(event.getResourceId()) + .stream().collect(HashMap::new, (m, v) -> m.put(v.workflowId(), v), HashMap::putAll); // iterate through the workflows, and for those not yet assigned to the user check if they can be assigned workflows.stream() @@ -207,46 +204,38 @@ public class WorkflowsManager { .forEach(workflow -> { WorkflowProvider provider = getWorkflowProvider(workflow); try { - if (!currentlyAssignedWorkflows.contains(workflow.getId())) { - // if workflow is not active for the resource, check if the provider allows activating based on the event + // if workflow is not active for the resource, check if the provider allows activating based on the event + if (!scheduledSteps.containsKey(workflow.getId())) { if (provider.activateOnEvent(event)) { - WorkflowStep firstStep = getFirstStep(workflow); - for (WorkflowStep step : getSteps(workflow.getId())) { - // If the workflow has a notBefore set, schedule the first step with it - if (step.getId().equals(firstStep.getId()) && workflow.getNotBefore() != null && workflow.getNotBefore() > 0) { - log.debugf("Scheduling first step %s of workflow %s for resource %s based on on event %s with notBefore %d", - step.getId(), workflow.getId(), event.getResourceId(), event.getOperation(), workflow.getNotBefore()); - Long originalAfter = step.getAfter(); - try { - step.setAfter(workflow.getNotBefore()); - workflowStateProvider.scheduleStep(workflow, step, event.getResourceId()); - continue; - } finally { - // restore the original after value - step.setAfter(originalAfter); - } - } - if (step.getAfter() > 0) { - // If a step has a time defined, schedule it and stop processing the other steps of workflow - log.debugf("Scheduling step %s of workflow %s for resource %s based on event %s", - step.getId(), workflow.getId(), event.getResourceId(), event.getOperation()); - workflowStateProvider.scheduleStep(workflow, step, event.getResourceId()); - break; - } else { - // Otherwise run the step right away - log.debugf("Running step %s of workflow %s for resource %s based on event %s", - step.getId(), workflow.getId(), event.getResourceId(), event.getOperation()); - KeycloakModelUtils.runJobInTransaction(session.getKeycloakSessionFactory(), session.getContext(), s -> - getStepProvider(step).run(List.of(event.getResourceId())) - ); + WorkflowExecutionContext context = buildAndInitContext(workflow, event.getResourceId()); + // If the workflow has a notBefore set, schedule the first step with it + if (context.hasNextStep() && workflow.getNotBefore() != null && workflow.getNotBefore() > 0) { + WorkflowStep step = context.getNextStep(); + log.debugf("Scheduling first step '%s' of workflow '%s' for resource %s based on on event %s with notBefore %d", + step.getProviderId(), workflow.getName(), event.getResourceId(), event.getOperation(), workflow.getNotBefore()); + Long originalAfter = step.getAfter(); + try { + step.setAfter(workflow.getNotBefore()); + workflowStateProvider.scheduleStep(workflow, step, event.getResourceId(), context.getExecutionId()); + } finally { + // restore the original after value + step.setAfter(originalAfter); } } + else { + // process the workflow steps, scheduling or running them as needed + processWorkflow(workflow, context, event.getResourceId()); + } } } else { + // workflow is active for the resource, check if the provider wants to reset or deactivate it based on the event + WorkflowExecutionContext context = buildFromScheduledStep(scheduledSteps.get(workflow.getId())); if (provider.resetOnEvent(event)) { - workflowStateProvider.scheduleStep(workflow, getFirstStep(workflow), event.getResourceId()); + context.restart(); + processWorkflow(workflow, context, event.getResourceId()); } else if (provider.deactivateOnEvent(event)) { - workflowStateProvider.remove(workflow.getId(), event.getResourceId()); + context.cancel(); + workflowStateProvider.remove(context.getExecutionId()); } } } catch (WorkflowInvalidStateException e) { @@ -259,44 +248,23 @@ public class WorkflowsManager { } public void runScheduledSteps() { - this.getWorkflows().stream().filter(Workflow::isEnabled).forEach(workflow -> { + this.getWorkflows().stream().filter(Workflow::isEnabled).forEach(workflow -> { - for (ScheduledStep scheduled : workflowStateProvider.getDueScheduledSteps(workflow)) { - List steps = getSteps(workflow.getId()); - - for (int i = 0; i < steps.size(); i++) { - WorkflowStep currentStep = steps.get(i); - - if (currentStep.getId().equals(scheduled.stepId())) { - getStepProvider(currentStep).run(List.of(scheduled.resourceId())); - - int nextIndex = i + 1; - // Process subsequent steps: run immediately if no time condition, schedule if time condition - while (nextIndex < steps.size()) { - WorkflowStep nextStep = steps.get(nextIndex); - if (nextStep.getAfter() > 0) { - workflowStateProvider.scheduleStep(workflow, nextStep, scheduled.resourceId()); - break; - } else { - getStepProvider(nextStep).run(List.of(scheduled.resourceId())); - nextIndex++; - } - } - - if (nextIndex == steps.size()) { - // this was the last step, check if the workflow is recurring - i.e. if we need to schedule the first step again - if (workflow.isRecurring()) { - WorkflowStep firstStep = getFirstStep(workflow); - workflowStateProvider.scheduleStep(workflow, firstStep, scheduled.resourceId()); - } else { - // not recurring, remove the state record - workflowStateProvider.remove(workflow.getId(), scheduled.resourceId()); - } - } - } - } + for (ScheduledStep scheduled : workflowStateProvider.getDueScheduledSteps(workflow)) { + WorkflowExecutionContext context = buildFromScheduledStep(scheduled); + if (!context.hasNextStep()) { + log.warnf("Could not find step %s in workflow %s for resource %s. Removing the workflow state.", + scheduled.stepId(), scheduled.workflowId(), scheduled.resourceId()); + workflowStateProvider.remove(scheduled.executionId()); + continue; } - }); + // run the scheduled step that is due + this.runWorkflowStep(context, context.getNextStep(), scheduled.resourceId()); + + // now process the subsequent steps, scheduling or running them as needed + processWorkflow(workflow, context, scheduled.resourceId()); + } + }); } public void removeWorkflow(String id) { @@ -307,7 +275,7 @@ public class WorkflowsManager { realm.getComponentsStream(workflow.getId(), WorkflowStepProvider.class.getName()).forEach(realm::removeComponent); realm.removeComponent(workflow); }); - workflowStateProvider.remove(id); + workflowStateProvider.removeByWorkflow(id); } public Workflow getWorkflow(String id) { @@ -431,6 +399,14 @@ public class WorkflowsManager { private void validateWorkflow(WorkflowRepresentation rep) { validateEvents(rep.getOnValues()); validateEvents(rep.getOnEventsReset()); + // a recurring workflow must have at least one scheduled step to prevent an infinite loop of immediate executions + if (rep.getConfig() != null && Boolean.parseBoolean(rep.getConfig().getFirstOrDefault(CONFIG_RECURRING, "false"))) { + boolean hasScheduledStep = ofNullable(rep.getSteps()).orElse(List.of()).stream() + .anyMatch(step -> Integer.parseInt(ofNullable(step.getAfter()).orElse("0")) > 0); + if (!hasScheduledStep) { + throw new WorkflowInvalidStateException("A recurring workflow must have at least one step with a time delay."); + } + } } private static void validateEvents(List events) { @@ -478,8 +454,6 @@ public class WorkflowsManager { step.setPriority(targetPosition + 1); WorkflowStep addedStep = addStep(workflow, step); - updateScheduledStepsAfterStepChange(workflow.getId()); - log.debugf("Added step %s to workflow %s at position %d", addedStep.getId(), workflow.getId(), targetPosition); return addedStep; } @@ -499,7 +473,7 @@ public class WorkflowsManager { // Reorder remaining steps and update state reorderAllSteps(workflow.getId()); - updateScheduledStepsAfterStepChange(workflow.getId()); + updateScheduledStepsAfterStepChange(workflow, stepId); log.debugf("Removed step %s from workflow %s", stepId, workflow.getId()); } @@ -532,22 +506,12 @@ public class WorkflowsManager { realm.updateComponent(component); } - private void updateScheduledStepsAfterStepChange(String workflowId) { - List steps = getSteps(workflowId); + private void updateScheduledStepsAfterStepChange(Workflow workflow, String stepId) { - if (steps.isEmpty()) { - workflowStateProvider.remove(workflowId); - return; - } - - for (ScheduledStep scheduled : workflowStateProvider.getScheduledStepsByWorkflow(workflowId)) { - boolean stepStillExists = steps.stream() - .anyMatch(step -> step.getId().equals(scheduled.stepId())); - - if (!stepStillExists) { - Workflow workflow = getWorkflow(workflowId); - workflowStateProvider.scheduleStep(workflow, steps.get(0), scheduled.resourceId()); - } + for (ScheduledStep scheduled : workflowStateProvider.getScheduledStepsByStep(stepId)) { + WorkflowExecutionContext context = buildFromScheduledStep(scheduled); + context.restart(); + workflowStateProvider.scheduleStep(workflow, context.getNextStep(), scheduled.resourceId(), context.getExecutionId()); } } @@ -586,4 +550,61 @@ public class WorkflowsManager { return providerFactory; } + + private WorkflowExecutionContext buildAndInitContext(Workflow workflow, String resourceId) { + WorkflowExecutionContext context = new WorkflowExecutionContext(workflow, getSteps(workflow.getId()), resourceId); + context.init(); + return context; + } + + private WorkflowExecutionContext buildFromScheduledStep(ScheduledStep scheduledStep) { + return new WorkflowExecutionContext( + getWorkflow(scheduledStep.workflowId()), + getSteps(scheduledStep.workflowId()), + scheduledStep.resourceId(), + scheduledStep.stepId(), + scheduledStep.executionId() + ); + } + + private void processWorkflow(Workflow workflow, WorkflowExecutionContext context, String resourceId) { + while (context.hasNextStep()) { + WorkflowStep step = context.getNextStep(); + if (step.getAfter() > 0) { + // If a step has a time defined, schedule it and stop processing the other steps of workflow + log.debugf("Scheduling step %s to run in %d ms for resource %s (execution id: %s)", + step.getProviderId(), step.getAfter(), resourceId, context.getExecutionId()); + workflowStateProvider.scheduleStep(workflow, step, resourceId, context.getExecutionId()); + return; + } else { + // Otherwise run the step right away + + runWorkflowStep(context, step, resourceId); + } + } + + // if we've reached the end of the workflow, check if it is recurring or if we can mark it as completed + if (workflow.isRecurring()) { + // if the workflow is recurring, restart it + context.restart(); + processWorkflow(workflow, context, resourceId); + } else { + // not recurring, remove the state record + context.complete(); + workflowStateProvider.remove(context.getExecutionId()); + } + } + + private void runWorkflowStep(WorkflowExecutionContext context, WorkflowStep step, String resourceId) { + log.debugf("Running step %s on resource %s (execution id: %s)", step.getProviderId(), resourceId, context.getExecutionId()); + KeycloakModelUtils.runJobInTransaction(session.getKeycloakSessionFactory(), session.getContext(), s -> { + try { + getStepProvider(step).run(List.of(resourceId)); + context.success(step); + } catch(WorkflowExecutionException e) { + context.fail(step, e.getMessage()); + throw e; + } + }); + } }