orquesta
orquesta copied to clipboard
( #275 ) Allow `context` Use in `WorkflowRehearsal`
This addresses the issue outlined in #275. The shape of the context I created for the unit tests is st2 similar to the runtime context provided by the st2 orquesta runner. This makes it easier to test StackStorm workflows that rely on context to function. For things like the st2kv_ function and the task function, those can be easily enough added by a user by extending orquesta.expressions.functions. Having said that, I thought it might be a bit too tightly coupled to add fixture functions in for that functionality.
Beyond the tests I added to this repo, here is a token example of how this can be used by folks wanting to write unit tests for their workflows:
Minimal Workflow - includes ctx reference
---
version: 1.0
description: A fake workflow
output:
- from_context: <% ctx().st2.action_execution_id %>
- published_from_context: <% ctx().published_from_context %>
tasks:
task1:
action: core.noop
next:
- publish:
- published_from_context: <% ctx().st2.action_execution_id %>
Testing the workflow with context
from __future__ import annotations
import pathlib
from orquesta import rehearsing, statuses
from orquesta.tests.unit.conducting.native import base
class TestWorkflowMockedFunction(base.OrchestraWorkflowConductorTest):
def setUp(self) -> None:
super().setUp()
self.workflow_path = str(pathlib.Path(__file__).parent / "workflow_has_context.yaml")
def test_workflow_mocked_function(self) -> None:
test_spec = {
"workflow": self.workflow_path,
"context": {"st2": {"action_execution_id": "123"}},
"expected_task_sequence": ["task1", "continue"],
"expected_workflow_status": statuses.SUCCEEDED,
"expected_output": {"from_context": "123", "published_from_context": "123"},
}
rehearsal = rehearsing.load_test_spec(test_spec)
rehearsal.assert_conducting_sequence()
Outcome
pytest test_workflow_has_context.py -vvv
=============================================================================================================== test session starts ===============================================================================================================
cachedir: .pytest_cache
configfile: pyproject.toml
plugins: cov-4.1.0
collected 1 item
test_workflow_has_context.py::TestWorkflowMockedFunction::test_workflow_mocked_function PASSED [100%]
================================================================================================================ 1 passed in 0.23s ================================================================================================================