Process
The Process class manages multi-step workflows and data transformations. It chains together a series of steps, each performing a specific operation on the data it receives.
Here’s a breakdown of the key concepts and features:
Pipeline Configuration
The Process class is designed to be flexible and configurable. You can define the steps in your pipeline using either a list or a dictionary:
List-based Configuration: This approach is best for simple pipelines with a linear flow. Each element in the list represents a step in the process.
from process import Process

process = Process([
    'step1',
    'step2',
    'step3'
])
Source: process/init.py
Dictionary-based Configuration: This approach allows for more complex workflows, enabling parallel processing and conditional execution.
from process import Process

process = Process({
    'step1': {
        'function': 'step1_function',
        'depends_on': ['step3']
    },
    'step2': {
        'function': 'step2_function'
    },
    'step3': {
        'function': 'step3_function'
    }
})
Source: process/init.py
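To make the linear flow of a list-based pipeline concrete, here is a minimal sketch of chaining steps so that each one receives the previous step's output. It is not the implementation in the process package: run_linear, double, and add_ten are hypothetical names, and the sketch takes callables directly rather than the step-name strings shown above.

```python
from functools import reduce


def run_linear(steps, data):
    """Feed data through each callable in order, passing each result to the next."""
    return reduce(lambda value, step: step(value), steps, data)


def double(x):
    return x * 2


def add_ten(x):
    return x + 10


# 5 -> double -> 10 -> add_ten -> 20
result = run_linear([double, add_ten], 5)
print(result)  # 20
```

An empty list of steps simply returns the input unchanged, which is a convenient property when pipelines are assembled dynamically.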
Step Functions
Each step in the process is defined by a function. This function takes the input data and performs a specific operation, returning the transformed data.
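def step1_function(data):
    # Perform some operation on the data
    transformed_data = data  # placeholder: replace with the real transformation
    return transformed_data

def step2_function(data):
    # Perform another operation on the data
    transformed_data = data  # placeholder: replace with the real transformation
    return transformed_data

def step3_function(data):
    # Perform a third operation on the data
    transformed_data = data  # placeholder: replace with the real transformation
    return transformed_data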
Dependencies
In complex pipelines, you might need to execute steps in a specific order. The Process class allows you to specify dependencies between steps with the depends_on key. This ensures that a step is only executed after its dependencies have been completed. In the configuration below, step1 runs only after step3 has finished.
# Define a process with dependencies
process = Process({
    'step1': {
        'function': 'step1_function',
        'depends_on': ['step3']
    },
    'step2': {
        'function': 'step2_function'
    },
    'step3': {
        'function': 'step3_function'
    }
})

# Execute the process
result = process.run(initial_data)
Source: process/init.py
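A dependency declaration like the one above implies a topological ordering of the steps. The following sketch shows one way such an order could be derived with the standard library's graphlib module (Python 3.9+). The execution_order helper is hypothetical, and whether Process resolves dependencies this way internally is an assumption.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

config = {
    "step1": {"function": "step1_function", "depends_on": ["step3"]},
    "step2": {"function": "step2_function"},
    "step3": {"function": "step3_function"},
}


def execution_order(config):
    """Return one valid order in which every step follows its dependencies."""
    # Map each step name to the steps that must run before it.
    graph = {name: spec.get("depends_on", []) for name, spec in config.items()}
    # static_order() raises graphlib.CycleError if the dependencies are circular.
    return list(TopologicalSorter(graph).static_order())


order = execution_order(config)
print(order)
```

The only guarantee is that step3 appears before step1. The position of step2 is unconstrained because it has no dependencies and nothing depends on it.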
Error Handling
The Process class provides built-in error handling. By default, if an error occurs during the execution of a step, the process stops and raises an exception. You can customize this behavior for a specific step by defining an exception handler under its exception_handler key.
def step1_function(data):
    # Perform some operation on the data
    if data == 'error':
        raise ValueError('Error in step 1')
    transformed_data = data  # placeholder: replace with the real transformation
    return transformed_data

# Define an exception handler for step 1
process = Process({
    'step1': {
        'function': 'step1_function',
        'exception_handler': lambda e: 'Error caught in step 1'
    }
})

# Execute the process
result = process.run(initial_data)
Source: process/init.py
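The per-step handler pattern can be illustrated with a small standalone sketch. The run_step helper below is hypothetical, and it assumes that the handler's return value stands in for the step's result. The text above does not say what Process does with that value, so treat this as one plausible reading rather than the library's behavior.

```python
def run_step(function, data, exception_handler=None):
    """Run one step; on failure, defer to the handler if one was supplied."""
    try:
        return function(data)
    except Exception as exc:
        if exception_handler is None:
            raise  # no handler: the error propagates and the run stops
        # Assumption: the handler's return value replaces the step's result.
        return exception_handler(exc)


def step1_function(data):
    if data == "error":
        raise ValueError("Error in step 1")
    return data.upper()


def handler(exc):
    return "Error caught in step 1"


print(run_step(step1_function, "ok", handler))     # OK
print(run_step(step1_function, "error", handler))  # Error caught in step 1
```

Without a handler, the ValueError reaches the caller unchanged, which matches the default stop-and-raise behavior described above.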
Parallel Execution
The Process class allows for parallel execution of steps. When the work within a step can be split across workers, this can significantly reduce the overall execution time of your workflow.
from process import Process, Parallel

process = Process({
    'step1': {
        'function': 'step1_function',
        'parallel': Parallel(max_workers=4)
    },
    'step2': {
        'function': 'step2_function'
    }
})

# Execute the process
result = process.run(initial_data)
Source: process/init.py
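How Parallel distributes work is not specified here. As a rough illustration of what a max_workers setting typically controls, the sketch below applies a step function to several items using a thread pool from the standard library. The run_parallel helper is hypothetical, and the idea that a parallel step maps its function over a collection of items is an assumption.

```python
from concurrent.futures import ThreadPoolExecutor


def step1_function(item):
    return item * 2


def run_parallel(function, items, max_workers=4):
    """Apply function to every item using a pool of worker threads."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() returns results in input order, even if calls finish out of order.
        return list(pool.map(function, items))


results = run_parallel(step1_function, [1, 2, 3, 4])
print(results)  # [2, 4, 6, 8]
```

Threads mainly help when steps wait on I/O. For CPU-bound steps in CPython, concurrent.futures.ProcessPoolExecutor is the usual alternative.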
Example Usage
from process import Process

# Define the process steps
def step1_function(data):
    return data * 2

def step2_function(data):
    return data + 10

def step3_function(data):
    return data / 2

# Create a process with dependencies
process = Process({
    'step1': {
        'function': 'step1_function',
        'depends_on': ['step3']
    },
    'step2': {
        'function': 'step2_function'
    },
    'step3': {
        'function': 'step3_function'
    }
})

# Execute the process
initial_data = 5
result = process.run(initial_data)

# Print the result
print(result)
Source: process/init.py
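Because step2 has no dependency relationship with the other steps, the example above admits more than one valid execution order. The standalone sketch below works through the arithmetic for each order, assuming that data is threaded through the steps one after another. That assumption, and the is_valid and run_in_order helpers, are illustrative and not taken from the process package.

```python
from itertools import permutations

steps = {
    "step1": lambda data: data * 2,
    "step2": lambda data: data + 10,
    "step3": lambda data: data / 2,
}
depends_on = {"step1": ["step3"]}


def is_valid(order):
    """True if every step appears after all of its dependencies."""
    return all(
        order.index(dep) < order.index(name)
        for name, deps in depends_on.items()
        for dep in deps
    )


def run_in_order(order, data):
    """Pass data through the named steps sequentially."""
    for name in order:
        data = steps[name](data)
    return data


for order in permutations(steps):
    if is_valid(order):
        print(order, run_in_order(order, 5))
```

With an input of 5, the orders (step2, step3, step1) and (step3, step1, step2) both yield 15.0, while (step3, step2, step1) yields 25.0. If the result must be deterministic, giving step2 an explicit depends_on entry removes the ambiguity.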