Harvester Developer Guide Part 2 - Worker implementation with Python
This chapter covers implementing the tasks for the workflow we created in the first part of this tutorial. The workflow definition file and code samples can be found in the Registration Harvester repository.
Understanding the TaskHandler
The TaskHandler class (located in the worker.common.task_handler module) is the base class for all custom task implementations. When creating a new task, you'll primarily focus on overriding its execute method.
The execute Method
The execute(self, task: ExternalTask, config: dict) -> TaskResult method is where the actual work of your task is performed. This is the only method you typically need to override in your custom handler.
- Purpose: To receive task details, perform the task’s business logic, and return a result indicating success or failure, along with any output variables.
- Parameters:
  - task: ExternalTask: An object providing access to the current task details. Key uses include:
    - task.get_task_id(): The unique ID of the current task instance.
    - task.get_topic_name(): The topic name (the "Job topic") configured for this external task in the BPMN workflow.
    - task.get_variable("my_input_variable"): Retrieves an input variable passed from the workflow.
    - task.get_worker_id(): The ID of the worker that picked up the task (used for logging in the examples below).
  - config: dict: A general configuration dictionary. While it is available, handler-specific configuration is preferably read with the self.get_config() method (see below).
- Returns: A TaskResult object that informs the workflow engine about the task's outcome, e.g.:
  - Success: task.complete(global_variables={"variable_name": value})
  - Failure: task.failure(error_message="...", error_details="...", max_retries=3, retry_timeout=60000), where retry_timeout is given in milliseconds
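The contract can be illustrated with a small, self-contained sketch. The real ExternalTask and TaskResult types come from the external task client the worker uses, so the sketch substitutes a hypothetical FakeTask class that only mimics the methods listed above and records what complete() or failure() received. GreetTaskHandler, its variable names, and the dictionary return values are likewise illustrative assumptions; in the repository, a handler subclasses TaskHandler and returns whatever the real task object returns.

```python
from typing import Any, Optional


class FakeTask:
    """Hypothetical stand-in for ExternalTask: holds input variables and records the outcome."""

    def __init__(self, variables: dict[str, Any]) -> None:
        self._variables = variables
        self.outcome: Optional[dict[str, Any]] = None

    def get_task_id(self) -> str:
        return "task-1"

    def get_topic_name(self) -> str:
        return "greet_topic"

    def get_variable(self, name: str) -> Any:
        # Returns None for variables the workflow did not pass
        return self._variables.get(name)

    def complete(self, global_variables: Optional[dict[str, Any]] = None) -> dict[str, Any]:
        self.outcome = {"status": "complete", "variables": global_variables or {}}
        return self.outcome

    def failure(self, error_message: str, error_details: str,
                max_retries: int, retry_timeout: int) -> dict[str, Any]:
        self.outcome = {"status": "failure", "message": error_message,
                        "details": error_details, "retries": max_retries,
                        "retry_timeout": retry_timeout}
        return self.outcome


class GreetTaskHandler:
    """Hypothetical handler (would subclass TaskHandler in the repository)."""

    def execute(self, task: FakeTask, config: dict) -> dict[str, Any]:
        name = task.get_variable("name")  # input variable from the workflow
        if not name:
            # Missing input: report a failure without retries
            return task.failure(error_message="Missing input variable",
                                error_details="The variable 'name' is missing",
                                max_retries=0, retry_timeout=0)
        # Success: hand the output variable back to the workflow
        return task.complete(global_variables={"greeting": f"Hello, {name}!"})
```

Calling GreetTaskHandler().execute(FakeTask({"name": "Sentinel"}), {}) records a successful outcome carrying {"greeting": "Hello, Sentinel!"}, while FakeTask({}) produces a failure with max_retries=0, mirroring the missing-input check used in the tutorial handlers.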
Supporting Features from the Base TaskHandler
The TaskHandler base class provides some helpful features:
- Configuration Handling (__init__ and get_config):
  - The base TaskHandler's constructor (__init__) automatically loads the configuration specific to your handler class from the main config.yaml file (under handlers.<YourHandlerClassName>). It also handles subscription-related settings.
  - You can access these configuration values within your execute method using self.get_config(key: str, default=None). For example, api_key = self.get_config("api_key").
So, when implementing a new task, your focus will be on the execute method, using task.get_variable() for inputs, self.get_config() for settings, and task.complete() or task.failure() for outputs.
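The configuration lookup described above can be pictured with a minimal stand-in. ConfigurableHandler and its constructor signature are hypothetical, written only to mirror the documented behaviour: the constructor receives the handlers dictionary and keeps the entry named after the concrete class, and get_config falls back to a default. The real TaskHandler in worker.common.task_handler also handles subscription settings and may differ in detail.

```python
from typing import Any, Optional


class ConfigurableHandler:
    """Hypothetical stand-in for the configuration part of TaskHandler."""

    def __init__(self, handlers_config: Optional[dict[str, Any]] = None) -> None:
        # Keep only the section stored under handlers.<ConcreteClassName>;
        # an absent or empty section becomes an empty dict
        self._config: dict[str, Any] = (handlers_config or {}).get(type(self).__name__) or {}

    def get_config(self, key: str, default: Any = None) -> Any:
        # Return the configured value, or the default if the key is absent
        return self._config.get(key, default)


class ExampleHandler(ConfigurableHandler):
    """Hypothetical handler; its settings live under handlers.ExampleHandler."""


handlers = {
    "ExampleHandler": {"service_url": "https://example.org/stac/v1/"},
    "OtherHandler": {"service_url": "https://example.org/other/"},
}
handler = ExampleHandler(handlers)
print(handler.get_config("service_url"))          # https://example.org/stac/v1/
print(handler.get_config("timeout_seconds", 60))  # 60
```

Keying the lookup on the class name means one shared handlers dictionary can be passed to every handler, and each one only ever sees its own settings.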
Implementing the tasks for our example workflow
In general, for each workflow step modelled as an external worker task, you have to do the following:
- Create a Handler Class
  - Create a new Python file (e.g., my_custom_tasks.py) or add to an existing one within the src/worker/ directory structure (e.g., src/worker/custom/tasks.py).
  - Define a new class that inherits from worker.common.task_handler.TaskHandler.
- Implement the execute Method
Task implementation
For our tutorial example, we have two tasks to implement: the "Discover STAC Items" task and the "Process STAC Item" task. For simplicity, we implement both handler classes in the same Python module, as shown below. The full Python file is available in the Registration Harvester repository.
```python
# Imports marked "assumed" are not confirmed by this guide; check the repository for the exact module paths.
from camunda.external_task.external_task import ExternalTask, TaskResult  # assumed: Camunda 7-compatible external task client
from pystac_client import Client

from worker.common.task_handler import TaskHandler
from worker.common.logging import log_with_context  # assumed module path of the project's logging helper


class TutorialDiscoverItemsTaskHandler(TaskHandler):
    def execute(self, task: ExternalTask, config: dict) -> TaskResult:
        log_context = {
            "WORKER_ID": task.get_worker_id(),
            "TASK_ID": task.get_task_id(),
            "TOPIC_NAME": task.get_topic_name(),
        }
        log_with_context("Starting DiscoverItems task ...", log_context)
        try:
            # 1. No input variables are needed for this task;
            #    the STAC API URL is read from the handler configuration
            api_url = self.get_config("service_url", "https://stac.dataspace.copernicus.eu/v1/")
            # 2. Perform task logic: search the STAC API
            log_with_context(f"Searching STAC items using API: {api_url}", log_context)
            catalog = Client.open(api_url)
            search = catalog.search(max_items=100, collections="sentinel-2-l2a", datetime="2025-07-02")
            items = list(search.items_as_dicts())
            # 3. Return success with output variables
            log_with_context("DiscoverItems task completed successfully.", log_context)
            return task.complete(global_variables={"items": items})
        except Exception as e:
            # Log the error, then report the failure to the engine (no retries)
            log_with_context(f"DiscoverItems task failed: {e}", log_context)
            return task.failure(
                error_message="Error in TutorialDiscoverItemsTaskHandler",
                error_details=str(e),
                max_retries=0,
                retry_timeout=0,
            )


class TutorialProcessItemTaskHandler(TaskHandler):
    def execute(self, task: ExternalTask, config: dict) -> TaskResult:
        log_context = {
            "WORKER_ID": task.get_worker_id(),
            "TASK_ID": task.get_task_id(),
            "TOPIC_NAME": task.get_topic_name(),
        }
        log_with_context("Starting ProcessItem task ...", log_context)
        try:
            # 1. Get input variables
            item = task.get_variable("item")
            if not item:
                return task.failure(
                    error_message="Missing input variable",
                    error_details="The variable 'item' is missing",
                    max_retries=0,
                    retry_timeout=0,
                )
            # 2. Perform task logic: just logging the item
            log_with_context(f"Processing item {item}", log_context)
            # 3. Return success; this task produces no output variables
            log_with_context("ProcessItem task completed successfully.", log_context)
            return task.complete()
        except Exception as e:
            # Log the error, then report the failure to the engine (no retries)
            log_with_context(f"ProcessItem task failed: {e}", log_context)
            return task.failure(
                error_message="Error in TutorialProcessItemTaskHandler",
                error_details=str(e),
                max_retries=0,
                retry_timeout=0,
            )
```
Best practices
- Logging: Use log_with_context for all logging. Include task.get_task_id() and task.get_topic_name() in your log_context for better traceability.
- Input Variables: Retrieve any necessary input variables passed from the workflow using task.get_variable("variable_name").
- Configuration: Access handler-specific configuration (defined in config.yaml) using self.get_config("config_key", "default_value").
- Business Logic: Implement the core functionality of your task.
- Error Handling: Wrap your logic in a try...except block. If an error occurs, log it and return a failure result, preferably using task.failure().
- Return Result: Use task.complete(global_variables={...}) and pass the output data back to the workflow in the global_variables dictionary.
Configuring the Worker
After implementing your TaskHandler, you need to configure the worker to use it. This is done in the main configuration file (typically etc/config.yaml, or the path specified by the CONFIG_FILE_PATH environment variable).
The SubscriptionManager component automatically discovers and subscribes your handlers to specific topics based on this configuration.
General structure of the configuration file
- topics: This section maps topic names (the Job topics configured for the external tasks in your BPMN model) to your handler classes.
  - Each key is a topic name (e.g., my_custom_task_topic).
  - For each topic, you must specify:
    - module: The fully qualified Python module path to your handler class (e.g., worker.custom.tasks).
    - handler: The class name of your handler (e.g., MyCustomTaskHandler).
- handlers: This section provides specific configurations for individual TaskHandler implementations.
  - Each key is the class name of a handler (e.g., MyCustomTaskHandler).
  - Inside each handler's configuration, you can define key-value pairs. These values are accessible within that handler using self.get_config("your_key").
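To see how the two sections relate, here is a hypothetical helper, resolve_topic, that follows a topic entry to its handler class name and then to that handler's settings. It is not part of the repository; it assumes the YAML file has already been parsed into a plain dictionary (for example with PyYAML's yaml.safe_load) and only makes the lookup explicit.

```python
from typing import Any, NamedTuple


class TopicBinding(NamedTuple):
    module: str
    handler: str
    handler_config: dict[str, Any]


def resolve_topic(config: dict[str, Any], topic: str) -> TopicBinding:
    """Hypothetical helper: follow topics.<topic> to its handler and that handler's settings."""
    entry = (config.get("topics") or {}).get(topic)
    if entry is None:
        raise KeyError(f"No handler configured for topic {topic!r}")
    # module and handler are the two required keys of a topic entry
    missing = [key for key in ("module", "handler") if not entry.get(key)]
    if missing:
        raise ValueError(f"Topic {topic!r} is missing required key(s): {', '.join(missing)}")
    # The handlers section is keyed by class name and may be absent or empty
    handler_config = (config.get("handlers") or {}).get(entry["handler"]) or {}
    return TopicBinding(entry["module"], entry["handler"], handler_config)


config = {
    "topics": {"my_custom_task_topic": {"module": "worker.custom.tasks", "handler": "MyCustomTaskHandler"}},
    "handlers": {"MyCustomTaskHandler": {"your_key": "some value"}},
}
print(resolve_topic(config, "my_custom_task_topic"))
# TopicBinding(module='worker.custom.tasks', handler='MyCustomTaskHandler', handler_config={'your_key': 'some value'})
```

A handler without an entry under handlers simply gets an empty configuration, so every self.get_config() call in it falls back to its default.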
Configuration for example workflow
This section provides the part of the configuration that binds our worker implementation to the tasks we defined in the BPMN model. The full configuration file can be found in the Registration Harvester repository.
```yaml
# Bind the workflow steps defined in BPMN to a worker implementation by Job topic name
topics:
  tutorial_discover_items:                       # Job topic name defined in the BPMN for this task
    module: "worker.tutorial.tasks"              # Path to the .py file containing the handler implementation, dot-separated
    handler: "TutorialDiscoverItemsTaskHandler"  # Name of the handler class
    lock_duration: 300000                        # Example: 5 minute lock duration (in milliseconds) for this task
    retries: 3                                   # If the task fails, the BPMN engine will retry 3 times
  tutorial_process_item:
    module: "worker.tutorial.tasks"
    handler: "TutorialProcessItemTaskHandler"
    lock_duration: 300000
    retries: 3

# Provide configuration specific to each handler, if needed
handlers:
  TutorialDiscoverItemsTaskHandler:
    service_url: "https://my.api.service.com/v1/process"  # Placeholder: set this to the STAC API you want to search
    default_timeout_seconds: 60
```
How it Works
- The SubscriptionManager starts up.
- It reads the topics section from config.yaml.
- For each topic entry, it dynamically imports the specified module and retrieves the handler class.
- It instantiates your handler class. The handlers dictionary from the config is passed to the TaskHandler's constructor, and TaskHandler.__init__ then picks out the configuration belonging to its specific class name.
- The manager subscribes this handler instance to the specified topic, ready to process incoming jobs.
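The start-up steps above can be sketched with Python's importlib. load_handlers is a hypothetical function, not the SubscriptionManager's actual code: it assumes the parsed configuration is a plain dictionary and that each handler's constructor accepts the handlers dictionary as its only argument, and it stops short of subscribing, which depends on the external task client in use.

```python
import importlib
from typing import Any


def load_handlers(config: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical sketch: map each configured topic name to a handler instance."""
    handlers_config = config.get("handlers") or {}
    instances: dict[str, Any] = {}
    for topic, entry in (config.get("topics") or {}).items():
        # Dynamically import the module, e.g. "worker.tutorial.tasks"
        module = importlib.import_module(entry["module"])
        # Retrieve the handler class, e.g. "TutorialDiscoverItemsTaskHandler"
        handler_cls = getattr(module, entry["handler"])
        # Pass the whole handlers dict; the handler picks out its own section
        instances[topic] = handler_cls(handlers_config)
    return instances
```

With this approach, a misspelled module path raises ModuleNotFoundError and a misspelled class name raises AttributeError, so configuration mistakes surface at start-up rather than when the first job arrives.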
Workflow Integration
Tasks implemented as TaskHandlers are executed by the worker, a process that runs outside the BPMN engine Operaton. In your BPMN model, you define an "External Worker task". The "Job topic" you configure for this external task in the BPMN model must match one of the topic names defined in the topics section of your configuration (e.g., my_custom_task_topic).
When the workflow reaches such an external task, the engine creates a job for the topic specified in the BPMN model. The worker, which polls the engine for jobs on that topic, fetches and locks the job and delegates it to the configured TaskHandler for execution.
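Operaton inherits the Camunda 7 external task pattern, in which workers obtain jobs by polling the engine's REST API (POST /external-task/fetchAndLock) with the topics they serve and a lock duration. The sketch below only builds such a request body from the topics section of the configuration. build_fetch_and_lock_body, its defaults, and the worker ID are hypothetical; in the actual worker this exchange is presumably handled by the external task client library, and the HTTP call is omitted here.

```python
from typing import Any


def build_fetch_and_lock_body(worker_id: str,
                              topics: dict[str, dict[str, Any]],
                              max_tasks: int = 1,
                              default_lock_ms: int = 300_000) -> dict[str, Any]:
    """Hypothetical helper: build a fetchAndLock request body from the `topics` config section."""
    return {
        "workerId": worker_id,  # identifies this worker; locked jobs are tied to it
        "maxTasks": max_tasks,  # upper bound on the number of jobs returned per request
        "topics": [
            {
                "topicName": name,
                # lock_duration in config.yaml is given in milliseconds
                "lockDuration": entry.get("lock_duration", default_lock_ms),
            }
            for name, entry in topics.items()
        ],
    }


topics = {
    "tutorial_discover_items": {"lock_duration": 300000},
    "tutorial_process_item": {"lock_duration": 300000},
}
body = build_fetch_and_lock_body("harvester-worker-1", topics)
```

The lock duration is the window in which the worker must complete or fail the job before the engine releases it to other workers, which is why long-running tasks such as the STAC search get a generous value.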