Process

The Process class manages multi-step workflows and data transformations. It chains a series of steps together, each performing a specific operation on the input data.

Here’s a breakdown of the key concepts and features:

Pipeline Configuration

You can define the steps of a pipeline with either a list or a dictionary:

  • List-based Configuration: This approach is best for simple pipelines with a linear flow. Each element in the list represents a step in the process.

    from process import Process

    process = Process([
        'step1',
        'step2',
        'step3'
    ])

    Source: process/init.py

  • Dictionary-based Configuration: This approach allows for more complex workflows, enabling parallel processing and conditional execution.

    from process import Process

    process = Process({
        'step1': {
            'function': 'step1_function',
            'depends_on': ['step3']
        },
        'step2': {
            'function': 'step2_function'
        },
        'step3': {
            'function': 'step3_function'
        }
    })

    Source: process/init.py
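
One way to relate the two forms is to read a list as a dictionary in which every step depends on the step before it. The sketch below makes that reading explicit. The helper list_to_dict_config is a hypothetical name, not part of process, and whether Process normalizes its input this way is an assumption.

```python
def list_to_dict_config(step_names):
    """Hypothetical helper: express a linear list of steps as a dependency dict."""
    config = {}
    previous = None
    for name in step_names:
        # The first step has no dependencies; every later step depends on
        # the step immediately before it.
        config[name] = {'depends_on': [] if previous is None else [previous]}
        previous = name
    return config


linear_config = list_to_dict_config(['step1', 'step2', 'step3'])
print(linear_config)
```

Read this way, the dictionary form is the more general of the two: it can express the same linear chain, plus steps that have no ordering constraint between them.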

Step Functions

Each step in the process is defined by a function. This function takes the input data and performs a specific operation, returning the transformed data.

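    def step1_function(data):
        # Perform some operation on the data
        transformed_data = data  # placeholder for the real transformation
        return transformed_data

    def step2_function(data):
        # Perform another operation on the data
        transformed_data = data  # placeholder for the real transformation
        return transformed_data

    def step3_function(data):
        # Perform a third operation on the data
        transformed_data = data  # placeholder for the real transformation
        return transformed_data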
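
To make the contract concrete, here is a minimal sketch that chains step functions by hand, with each function's return value becoming the next function's input. The functions strip_whitespace, to_upper, add_exclamation and the helper run_steps are illustrative names and not part of process.

```python
from functools import reduce


def strip_whitespace(data):
    return data.strip()


def to_upper(data):
    return data.upper()


def add_exclamation(data):
    return data + '!'


def run_steps(steps, data):
    """Feed data through each step in order; each output is the next input."""
    return reduce(lambda value, step: step(value), steps, data)


chained_result = run_steps([strip_whitespace, to_upper, add_exclamation], '  hello ')
print(chained_result)
```

Because every step takes one value and returns one value, steps can be reordered or swapped without changing the code that drives them.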
          

Dependencies

In complex pipelines, some steps must run in a specific order. The Process class lets you declare dependencies between steps with depends_on, so a step runs only after every step it depends on has completed. In the example below, step1 depends on step3, so step3 runs before step1; step2 has no dependencies.

    from process import Process

    # Define a process with dependencies
    process = Process({
        'step1': {
            'function': 'step1_function',
            'depends_on': ['step3']  # step3 must finish before step1 starts
        },
        'step2': {
            'function': 'step2_function'
        },
        'step3': {
            'function': 'step3_function'
        }
    })

    # Execute the process
    result = process.run(initial_data)

Source: process/init.py
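
Dependency declarations like these are commonly resolved with a topological sort. The sketch below applies the standard library's graphlib.TopologicalSorter (Python 3.9+) to the same configuration. It illustrates the general technique and is not a description of how Process orders steps internally.

```python
from graphlib import TopologicalSorter

config = {
    'step1': {'function': 'step1_function', 'depends_on': ['step3']},
    'step2': {'function': 'step2_function'},
    'step3': {'function': 'step3_function'},
}

# TopologicalSorter expects a mapping of node -> nodes that must come first.
graph = {name: spec.get('depends_on', []) for name, spec in config.items()}

# static_order() yields each step only after all of its dependencies.
# It raises graphlib.CycleError if the dependencies form a cycle.
execution_order = list(TopologicalSorter(graph).static_order())
print(execution_order)
```

The only guarantee is that step3 appears before step1. The position of step2 is unconstrained, so code that relies on a particular position for it should declare a dependency.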

Error Handling

The Process class has built-in error handling. If a step raises an error during execution, the process stops and raises an exception. To change this behavior for a specific step, give that step an exception_handler, a callable that receives the raised exception.

    from process import Process

    def step1_function(data):
        # Reject invalid input before doing any work
        if data == 'error':
            raise ValueError('Error in step 1')
        transformed_data = data  # placeholder for the real transformation
        return transformed_data

    # Define an exception handler for step 1
    process = Process({
        'step1': {
            'function': 'step1_function',
            'exception_handler': lambda e: 'Error caught in step 1'
        }
    })

    # Execute the process
    result = process.run(initial_data)

Source: process/init.py
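
The text does not say what happens to a handler's return value. The sketch below shows one plausible reading, in which the handler's result replaces the failed step's output and the run continues. The helper run_with_handlers is hypothetical, and that substitution behavior is an assumption rather than documented Process behavior.

```python
def run_with_handlers(steps, data):
    """Run (function, handler) pairs in order; handler may be None."""
    for function, handler in steps:
        try:
            data = function(data)
        except Exception as exc:
            if handler is None:
                raise  # no handler: stop the run and propagate the error
            # Assumption: the handler's return value stands in for the
            # output of the step that failed.
            data = handler(exc)
    return data


def step1_function(data):
    if data == 'error':
        raise ValueError('Error in step 1')
    return data.upper()


handled = run_with_handlers(
    [(step1_function, lambda e: 'Error caught in step 1')], 'error'
)
passed = run_with_handlers([(step1_function, None)], 'ok')
print(handled)
print(passed)
```

Without a handler, the same failing input raises ValueError out of run_with_handlers, matching the default stop-and-raise behavior described above.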

Parallel Execution

The Process class can run steps in parallel, which can shorten the total run time when the work does not have to happen sequentially. The example below attaches a Parallel setting with max_workers=4 to step1.

    from process import Process, Parallel

    process = Process({
        'step1': {
            'function': 'step1_function',
            'parallel': Parallel(max_workers=4)
        },
        'step2': {
            'function': 'step2_function'
        }
    })

    # Execute the process
    result = process.run(initial_data)

Source: process/init.py
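
What Parallel(max_workers=4) does internally is not stated here. As a rough illustration of a worker limit, the sketch below fans one step function out over a list of items with the standard library's ThreadPoolExecutor. The helper run_step_in_parallel is hypothetical, and mapping a step over a collection is an assumption about the intended use.

```python
from concurrent.futures import ThreadPoolExecutor


def step1_function(item):
    return item * 2


def run_step_in_parallel(function, items, max_workers=4):
    """Apply function to every item using at most max_workers threads."""
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # executor.map returns results in input order, regardless of
        # which worker finishes first.
        return list(executor.map(function, items))


doubled = run_step_in_parallel(step1_function, [1, 2, 3, 4])
print(doubled)
```

Threads mainly help with I/O-bound steps. For CPU-bound work in CPython, concurrent.futures.ProcessPoolExecutor is the usual alternative.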

Example Usage

    from process import Process

    # Define the process steps
    def step1_function(data):
        return data * 2

    def step2_function(data):
        return data + 10

    def step3_function(data):
        return data / 2

    # Create a process with dependencies
    process = Process({
        'step1': {
            'function': 'step1_function',
            'depends_on': ['step3']
        },
        'step2': {
            'function': 'step2_function'
        },
        'step3': {
            'function': 'step3_function'
        }
    })

    # Execute the process
    initial_data = 5
    result = process.run(initial_data)

    # Print the result
    print(result)

Source: process/init.py
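
The example does not state what it prints. The only ordering it fixes is step3 before step1, so the result depends on where step2 runs. The sketch below enumerates every order that respects that dependency and threads the data through the steps one after another. The sequential data flow and the helper names are assumptions, not documented Process behavior.

```python
from itertools import permutations


def step1_function(data):
    return data * 2


def step2_function(data):
    return data + 10


def step3_function(data):
    return data / 2


steps = {
    'step1': step1_function,
    'step2': step2_function,
    'step3': step3_function,
}
depends_on = {'step1': ['step3'], 'step2': [], 'step3': []}


def respects_dependencies(order):
    """True if every step appears after all of the steps it depends on."""
    position = {name: index for index, name in enumerate(order)}
    return all(
        position[dep] < position[name]
        for name, deps in depends_on.items()
        for dep in deps
    )


def run_in_order(order, data):
    """Assumption: each step's output is passed to the next step in order."""
    for name in order:
        data = steps[name](data)
    return data


results = {
    order: run_in_order(order, 5)
    for order in permutations(steps)
    if respects_dependencies(order)
}
for order, value in results.items():
    print(order, value)
```

Under these assumptions two of the three valid orders give 15.0 and one gives 25.0. If step2 has to run at a fixed point, giving it a depends_on entry removes the ambiguity.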