Skip to content

Modules

Step

Bases: BaseStep

Represents a step in the pipeline.

Source code in `grpipe/step.py` (lines 13–181):
class Step(BaseStep):
    """Represents a step in the pipeline.

    A step wraps a plain function. Parameters of that function whose default
    is a ``Step`` or an ``Argument`` are the step's dependencies (``args``);
    the remaining parameters are tunable ``params``. Results are memoised in
    an ``LRUCache`` keyed on the params plus the keyword arguments supplied at
    call time.
    """

    # Maximum number of timing samples kept per step. Only the most recent
    # sample is ever read (for debug logging), so an unbounded list would grow
    # by one entry per call for the whole lifetime of the step.
    _TIMING_HISTORY_SIZE: int = 1024

    def __init__(
        self,
        function: Callable[..., Any],
        *,
        params: Any = None,
        args: Any = None,
        max_cache_size: int,
        **kwargs: Any,
    ):
        """Wrap ``function`` as a pipeline step.

        Args:
            function: The callable to run. Its ``__name__`` becomes the step name.
            params: Mapping of parameter name to value. When falsy, it is
                inferred from the defaults of the function's non-dependency
                parameters.
            args: Mapping of dependency name to ``Step``/``Argument``. When
                falsy, it is inferred from parameters whose default is a
                ``Step`` or an ``Argument``.
            max_cache_size: Capacity passed to the step's ``LRUCache``.
            **kwargs: Forwarded to ``BaseStep.__init__``.

        Raises:
            ParameterError: If params are inferred and a non-dependency
                parameter of ``function`` has no default value.
        """
        from collections import deque

        name = function.__name__
        super().__init__(name=name, **kwargs)
        self.__run: Callable[..., Any] = function
        self.__args: dict[str, Step | Argument] = {}
        self.__params: dict[str, Any] = {}
        # Bounded timing histories (see ``_TIMING_HISTORY_SIZE``).
        self.__cache_times: deque[float] = deque(maxlen=self._TIMING_HISTORY_SIZE)
        self.__run_times: deque[float] = deque(maxlen=self._TIMING_HISTORY_SIZE)

        run_signature = signature(self.__run)
        # ``__infer_params`` reads ``self.args``, so the args must be set first.
        self.__args = args or self.__infer_args(run_signature)
        self.__params = params or self.__infer_params(run_signature)

        self.cache: LRUCache = LRUCache(max_cache_size)
        # Advertise only the dependencies that are still unbound Arguments, as
        # keyword-only parameters. This is computed once, here; binding or
        # unbinding an Argument afterwards does not update it.
        self.__signature__ = run_signature.replace(
            parameters=[
                Parameter(key, Parameter.KEYWORD_ONLY, default=val)
                for key, val in self.__args.items()
                if isinstance(val, Argument) and not val.bound
            ]
        )

    def __infer_args(self, signature: Signature) -> dict[str, Any]:
        """Collect the parameters whose default is a ``Step`` or an ``Argument``.

        These defaults declare the step's upstream dependencies.
        """
        return {
            param.name: param.default
            for param in signature.parameters.values()
            if isinstance(param.default, (Step, Argument))
        }

    def __infer_params(self, signature: Signature) -> dict[str, Any]:
        """Collect the default values of every non-dependency parameter.

        Raises:
            ParameterError: If such a parameter has no default value.
        """
        params = {}
        for key, param in signature.parameters.items():
            if key in self.args:  # Dependencies (Steps/Arguments) are not params
                continue
            if param.default is Parameter.empty:
                raise ParameterError(param.name, self.name)
            params[param.name] = param.default

        return params

    def is_cachable(self, bound_args: dict[str, Any], kwargs: dict[str, Any]) -> bool:
        """
        Determine if the step's result can be cached based on the given arguments.

        The result is cachable unless some dependency whose ``cachable`` flag
        is false is not among ``bound_args`` (i.e. its value arrives through
        the call's keyword arguments, which feed the cache key).

        Args:
            bound_args (dict[str, Any]): The bound arguments.
            kwargs (dict[str, Any]): The keyword arguments (currently unused).

        Returns:
            bool: True if the result can be cached, False otherwise.
        """
        return all(arg.cachable or arg_name in bound_args for arg_name, arg in self.__args.items())

    def get_cache_key(self, kwargs: dict[str, Any]) -> frozendict:
        """
        Generate a cache key based on the given arguments.

        The step's params are overlaid with ``kwargs`` (``kwargs`` wins on a
        name clash) and every value is passed through ``custom_hash``.

        Args:
            kwargs (dict[str, Any]): The keyword arguments.

        Returns:
            frozendict: A hashable cache key.
        """
        merged = {**self.params, **kwargs}
        return frozendict({key: custom_hash(value) for key, value in merged.items()})

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """
        Execute the step with the given arguments.

        Dependencies that are bound ``Argument``s are filled in automatically;
        every other dependency must be passed as a keyword argument. When the
        step is cachable, a cached result for the current params and keyword
        arguments is returned if present.

        Args:
            *args: Not supported; passing any positional argument raises.
            **kwargs: Values for every dependency that is not a bound Argument.

        Returns:
            Any: The result of executing the step (possibly from the cache).

        Raises:
            ArgumentError: If positional arguments are given, or if the keyword
                names do not exactly match the unbound dependencies.
        """
        if args:
            raise ArgumentError(0, f"{len(args)} positional arguments")

        dependencies = self.args
        bound_args = {
            name: dep.value
            for name, dep in dependencies.items()
            if isinstance(dep, Argument) and dep.bound
        }
        expected = set(dependencies) - set(bound_args)
        received = set(kwargs)
        if received != expected:
            raise ArgumentError(expected, received)

        cachable = self.is_cachable(bound_args, kwargs)
        call_repr = self.format_args(**kwargs, **self.params)

        # NOTE(review): only params + ``kwargs`` feed the cache key; values of
        # bound Arguments do not. Rebinding an Argument therefore does not by
        # itself invalidate this step's cache -- confirm callers reset caches.
        cache_key = None
        if cachable:
            started = perf_counter()
            cache_key = self.get_cache_key(kwargs)
            self.__cache_times.append(perf_counter() - started)

            hit = self.cache.get(cache_key)
            # NOTE(review): a cached ``None`` looks like a miss here (assuming
            # LRUCache.get returns None for missing keys -- confirm), so steps
            # that return None are re-run on every call.
            if hit is not None:
                self.logger.debug(
                    f"[{self.__cache_times[-1]:.4f}] Returning cached value {self.name}({call_repr}) = {hit}"
                )
                return hit

        started = perf_counter()
        result = self.__run(**bound_args, **self.params, **kwargs)
        self.__run_times.append(perf_counter() - started)
        elapsed = self.__run_times[-1]

        if cachable:
            self.cache.put(cache_key, result)
            self.logger.debug(f"[{elapsed:.4f}] Added {self.name}({call_repr}) = {result} to cache")
        else:
            self.logger.debug(f"[{elapsed:.4f}] Executed {self.name}({call_repr}) = {result} (not cached)")

        return result

    def reset_cache(self) -> None:
        """Clear the step's cache."""
        self.logger.debug("Resetting cache")
        self.cache.clear()

    @property
    def params(self) -> dict[str, Any]:
        """The step's tunable parameters (the live dict, not a copy)."""
        return self.__params

    def set_params(self, **kwargs: Any) -> "Step":
        """
        Set the parameters for the step.

        Only existing parameter names are accepted. A debug message is logged
        when at least one value actually changed.

        Args:
            **kwargs: The parameters to set.

        Returns:
            Step: The updated step instance.

        Raises:
            ParameterError: If a name is not one of the step's parameters.
        """
        dirty = False
        for name, new_value in kwargs.items():
            if name not in self.params:
                raise ParameterError(name, self.name)
            if self.params[name] != new_value:
                self.__params[name] = new_value
                dirty = True
        if dirty:
            self.logger.debug(f"Updated parameters {kwargs}")
        return self

    @property
    def args(self) -> dict[str, Any]:
        """The step's dependencies, mapping name to ``Step``/``Argument`` (the live dict)."""
        return self.__args

    def format_args(self, **kwargs: Any) -> str:
        """Render keyword arguments as ``key=value`` pairs joined by ``", "``."""
        return ", ".join(f"{key}={format_value(value)}" for key, value in kwargs.items())

    def __repr__(self) -> str:
        return f"{self.name}({self.format_args(**self.args)})"

__call__(*args, **kwargs)

Execute the step with the given arguments.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `*args` | `Any` | Positional arguments (not supported; passing any raises `ArgumentError`). | `()` |
| `**kwargs` | `Any` | Keyword arguments for the step. | `{}` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Any` | `Any` | The result of executing the step. |

Source code in `grpipe/step.py` (lines 91–141):
def __call__(self, *args: Any, **kwargs: Any) -> Any:
    """
    Run the step, serving a cached result when one is available.

    Bound ``Argument`` dependencies are filled in automatically; every other
    dependency must be supplied as a keyword argument.

    Args:
        *args: Not supported; passing any positional argument raises.
        **kwargs: Values for every dependency that is not a bound Argument.

    Returns:
        Any: The step's result (freshly computed or taken from the cache).

    Raises:
        ArgumentError: On positional arguments, or when the keyword names do
            not exactly match the unbound dependencies.
    """
    if args:
        raise ArgumentError(0, f"{len(args)} positional arguments")

    dependencies = self.args
    bound_args = {
        name: dep.value
        for name, dep in dependencies.items()
        if isinstance(dep, Argument) and dep.bound
    }
    expected = set(dependencies) - set(bound_args)
    received = set(kwargs)
    if received != expected:
        raise ArgumentError(expected, received)

    cachable = self.is_cachable(bound_args, kwargs)
    call_repr = self.format_args(**kwargs, **self.params)

    cache_key = None
    if cachable:
        started = perf_counter()
        cache_key = self.get_cache_key(kwargs)
        self.__cache_times.append(perf_counter() - started)

        hit = self.cache.get(cache_key)
        if hit is not None:
            self.logger.debug(
                f"[{self.__cache_times[-1]:.4f}] Returning cached value {self.name}({call_repr}) = {hit}"
            )
            return hit

    started = perf_counter()
    result = self.__run(**bound_args, **self.params, **kwargs)
    self.__run_times.append(perf_counter() - started)
    elapsed = self.__run_times[-1]

    if cachable:
        self.cache.put(cache_key, result)
        self.logger.debug(f"[{elapsed:.4f}] Added {self.name}({call_repr}) = {result} to cache")
    else:
        self.logger.debug(f"[{elapsed:.4f}] Executed {self.name}({call_repr}) = {result} (not cached)")

    return result

get_cache_key(kwargs)

Generate a cache key based on the given arguments.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `kwargs` | `dict[str, Any]` | The keyword arguments. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `frozendict` | `frozendict` | A hashable cache key. |

Source code in `grpipe/step.py` (lines 77–89):
def get_cache_key(self, kwargs: dict[str, Any]) -> frozendict:
    """
    Build a hashable cache key for one invocation.

    The step's params are overlaid with ``kwargs`` (``kwargs`` wins on a name
    clash) and every value is passed through ``custom_hash``.

    Args:
        kwargs (dict[str, Any]): The keyword arguments supplied to the call.

    Returns:
        frozendict: A hashable cache key.
    """
    merged = {**self.params, **kwargs}
    return frozendict({key: custom_hash(value) for key, value in merged.items()})

is_cachable(bound_args, kwargs)

Determine if the step's result can be cached based on the given arguments.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `bound_args` | `dict[str, Any]` | The bound arguments. | *required* |
| `kwargs` | `dict[str, Any]` | The keyword arguments. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `bool` | `bool` | True if the result can be cached, False otherwise. |

Source code in `grpipe/step.py` (lines 64–75):
def is_cachable(self, bound_args: dict[str, Any], kwargs: dict[str, Any]) -> bool:
    """
    Decide whether this invocation's result may be stored in the cache.

    Caching is refused as soon as a dependency flagged as non-cachable is not
    among ``bound_args``.

    Args:
        bound_args (dict[str, Any]): Values of the bound Argument dependencies.
        kwargs (dict[str, Any]): The keyword arguments (not consulted).

    Returns:
        bool: True if the result can be cached, False otherwise.
    """
    for name, dep in self.__args.items():
        if not dep.cachable and name not in bound_args:
            return False
    return True

reset_cache()

Clear the step's cache.

Source code in `grpipe/step.py` (lines 143–146):
def reset_cache(self) -> None:
    """Empty this step's result cache, logging the reset at debug level."""
    logger = self.logger
    logger.debug("Resetting cache")
    self.cache.clear()

set_params(**kwargs)

Set the parameters for the step.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `**kwargs` | `Any` | The parameters to set. | `{}` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Step` | `Step` | The updated step instance. |

Source code in `grpipe/step.py` (lines 152–171):
def set_params(self, **kwargs: Any) -> "Step":
    """
    Update existing step parameters in place.

    Unknown names are rejected. A debug message is logged only when at least
    one value actually differs from the current one.

    Args:
        **kwargs: Parameter names mapped to their new values.

    Returns:
        Step: This same step, to allow chaining.

    Raises:
        ParameterError: If a name is not one of the step's parameters.
    """
    dirty = False
    for name, new_value in kwargs.items():
        if name not in self.params:
            raise ParameterError(name, self.name)
        if self.params[name] != new_value:
            self.__params[name] = new_value
            dirty = True
    if dirty:
        self.logger.debug(f"Updated parameters {kwargs}")
    return self

Bases: `BaseStep`, `Generic[T]`

Represents an argument that can be bound to a value and passed to a step.

Methods:

| Name | Description |
| --- | --- |
| `bind` | Bind the argument to a value |
| `unbind` | Unbind the argument |
| `bound` | Check if the argument is bound |
| `value` | Get the value of the argument |

Source code in `grpipe/argument.py` (lines 9–76):
class Argument(BaseStep, Generic[T]):
    """
    A named pipeline input that can be bound to a concrete value.

    An ``Argument`` starts out unbound. ``bind`` attaches a value and
    ``unbind`` detaches it again; both return the instance so calls can be
    chained.

    Methods:
        bind: Attach a value to the argument.
        unbind: Detach the current value.
        bound: Property; whether a value is currently attached.
        value: Property; the attached value (raises if unavailable).
    """

    def __init__(self, name: str, *args: Any, **kwargs: Any):
        """Create an unbound argument.

        Args:
            name (str): The name of the argument; forwarded to ``BaseStep``.
            *args: Extra positional arguments forwarded to ``BaseStep``.
            **kwargs: Extra keyword arguments forwarded to ``BaseStep``.
        """
        super().__init__(name, *args, **kwargs)
        # A fresh argument holds no value and is not bound.
        self.__value: Optional[T] = None
        self.__bound = False

    def bind(self, value: T) -> "Argument[T]":
        """
        Attach ``value`` to this argument and mark it as bound.

        Args:
            value (T): The value to attach.

        Returns:
            Argument[T]: This same argument, to allow chaining.
        """
        rendered = format_value(value)
        self.logger.debug(f"Bound {self.name} to {rendered}")
        self.__value, self.__bound = value, True
        return self

    def unbind(self) -> "Argument[T]":
        """
        Detach the current value and mark the argument as unbound.

        Returns:
            Argument[T]: This same argument, to allow chaining.
        """
        self.logger.debug(f"Unbound {self.name}")
        self.__bound = False
        self.__value = None
        return self

    @property
    def bound(self) -> bool:
        """Whether a value is currently attached."""
        return self.__bound

    @property
    def value(self) -> T:
        """The attached value.

        Raises:
            ArgumentError: If the argument is unbound or its value is ``None``.
        """
        # NOTE(review): ``bind(None)`` leaves ``bound`` True yet makes this
        # property raise -- confirm that None is meant to count as "missing".
        if self.__bound and self.__value is not None:
            return self.__value
        raise ArgumentError(self.name, None)

    def __repr__(self) -> str:
        return f"Argument({self.name})"

__init__(name, *args, **kwargs)

Abstract argument object.

This object is used to pass arguments to steps in a pipeline. It can be bound to a value and passed to a step as a parameter.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `name` | `str` | The name of the argument | *required* |

Functions:

| Name | Description |
| --- | --- |
| `bind` | Bind the argument to a value |
| `unbind` | Unbind the argument |
| `bound` | Check if the argument is bound |
| `value` | Get the value of the argument |

Source code in `grpipe/argument.py` (lines 20–36):
def __init__(self, name: str, *args: Any, **kwargs: Any):
    """Create an unbound argument.

    An argument passes a value into the steps of a pipeline once it has been
    bound with ``bind``.

    Args:
        name (str): The name of the argument; forwarded to ``BaseStep``.
        *args: Extra positional arguments forwarded to ``BaseStep``.
        **kwargs: Extra keyword arguments forwarded to ``BaseStep``.
    """
    super().__init__(name, *args, **kwargs)
    # A fresh argument holds no value and is not bound.
    self.__value: Optional[T] = None
    self.__bound = False

bind(value)

Bind the argument to a value.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `value` | `T` | The value to bind to the argument. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Argument[T]` | The updated argument instance. |

Source code in `grpipe/argument.py` (lines 38–51):
def bind(self, value: T) -> "Argument[T]":
    """
    Attach ``value`` to this argument and mark it as bound.

    Args:
        value (T): The value to attach.

    Returns:
        Argument[T]: This same argument, to allow chaining.
    """
    rendered = format_value(value)
    self.logger.debug(f"Bound {self.name} to {rendered}")
    self.__value, self.__bound = value, True
    return self

unbind()

Unbind the argument, clearing its value.

Returns:

| Type | Description |
| --- | --- |
| `Argument[T]` | The updated argument instance. |

Source code in `grpipe/argument.py` (lines 53–63):
def unbind(self) -> "Argument[T]":
    """
    Detach the current value and mark the argument as unbound.

    Returns:
        Argument[T]: This same argument, to allow chaining.
    """
    self.logger.debug(f"Unbound {self.name}")
    self.__bound = False
    self.__value = None
    return self

Bases: `BaseStep`

Represents a step in the pipeline.

Source code in `grpipe/step.py` (lines 13–181):
class Step(BaseStep):
    """Represents a step in the pipeline.

    A step wraps a plain function. Parameters of that function whose default
    is a ``Step`` or an ``Argument`` are the step's dependencies (``args``);
    the remaining parameters are tunable ``params``. Results are memoised in
    an ``LRUCache`` keyed on the params plus the keyword arguments supplied at
    call time.
    """

    # Maximum number of timing samples kept per step. Only the most recent
    # sample is ever read (for debug logging), so an unbounded list would grow
    # by one entry per call for the whole lifetime of the step.
    _TIMING_HISTORY_SIZE: int = 1024

    def __init__(
        self,
        function: Callable[..., Any],
        *,
        params: Any = None,
        args: Any = None,
        max_cache_size: int,
        **kwargs: Any,
    ):
        """Wrap ``function`` as a pipeline step.

        Args:
            function: The callable to run. Its ``__name__`` becomes the step name.
            params: Mapping of parameter name to value. When falsy, it is
                inferred from the defaults of the function's non-dependency
                parameters.
            args: Mapping of dependency name to ``Step``/``Argument``. When
                falsy, it is inferred from parameters whose default is a
                ``Step`` or an ``Argument``.
            max_cache_size: Capacity passed to the step's ``LRUCache``.
            **kwargs: Forwarded to ``BaseStep.__init__``.

        Raises:
            ParameterError: If params are inferred and a non-dependency
                parameter of ``function`` has no default value.
        """
        from collections import deque

        name = function.__name__
        super().__init__(name=name, **kwargs)
        self.__run: Callable[..., Any] = function
        self.__args: dict[str, Step | Argument] = {}
        self.__params: dict[str, Any] = {}
        # Bounded timing histories (see ``_TIMING_HISTORY_SIZE``).
        self.__cache_times: deque[float] = deque(maxlen=self._TIMING_HISTORY_SIZE)
        self.__run_times: deque[float] = deque(maxlen=self._TIMING_HISTORY_SIZE)

        run_signature = signature(self.__run)
        # ``__infer_params`` reads ``self.args``, so the args must be set first.
        self.__args = args or self.__infer_args(run_signature)
        self.__params = params or self.__infer_params(run_signature)

        self.cache: LRUCache = LRUCache(max_cache_size)
        # Advertise only the dependencies that are still unbound Arguments, as
        # keyword-only parameters. This is computed once, here; binding or
        # unbinding an Argument afterwards does not update it.
        self.__signature__ = run_signature.replace(
            parameters=[
                Parameter(key, Parameter.KEYWORD_ONLY, default=val)
                for key, val in self.__args.items()
                if isinstance(val, Argument) and not val.bound
            ]
        )

    def __infer_args(self, signature: Signature) -> dict[str, Any]:
        """Collect the parameters whose default is a ``Step`` or an ``Argument``.

        These defaults declare the step's upstream dependencies.
        """
        return {
            param.name: param.default
            for param in signature.parameters.values()
            if isinstance(param.default, (Step, Argument))
        }

    def __infer_params(self, signature: Signature) -> dict[str, Any]:
        """Collect the default values of every non-dependency parameter.

        Raises:
            ParameterError: If such a parameter has no default value.
        """
        params = {}
        for key, param in signature.parameters.items():
            if key in self.args:  # Dependencies (Steps/Arguments) are not params
                continue
            if param.default is Parameter.empty:
                raise ParameterError(param.name, self.name)
            params[param.name] = param.default

        return params

    def is_cachable(self, bound_args: dict[str, Any], kwargs: dict[str, Any]) -> bool:
        """
        Determine if the step's result can be cached based on the given arguments.

        The result is cachable unless some dependency whose ``cachable`` flag
        is false is not among ``bound_args`` (i.e. its value arrives through
        the call's keyword arguments, which feed the cache key).

        Args:
            bound_args (dict[str, Any]): The bound arguments.
            kwargs (dict[str, Any]): The keyword arguments (currently unused).

        Returns:
            bool: True if the result can be cached, False otherwise.
        """
        return all(arg.cachable or arg_name in bound_args for arg_name, arg in self.__args.items())

    def get_cache_key(self, kwargs: dict[str, Any]) -> frozendict:
        """
        Generate a cache key based on the given arguments.

        The step's params are overlaid with ``kwargs`` (``kwargs`` wins on a
        name clash) and every value is passed through ``custom_hash``.

        Args:
            kwargs (dict[str, Any]): The keyword arguments.

        Returns:
            frozendict: A hashable cache key.
        """
        merged = {**self.params, **kwargs}
        return frozendict({key: custom_hash(value) for key, value in merged.items()})

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """
        Execute the step with the given arguments.

        Dependencies that are bound ``Argument``s are filled in automatically;
        every other dependency must be passed as a keyword argument. When the
        step is cachable, a cached result for the current params and keyword
        arguments is returned if present.

        Args:
            *args: Not supported; passing any positional argument raises.
            **kwargs: Values for every dependency that is not a bound Argument.

        Returns:
            Any: The result of executing the step (possibly from the cache).

        Raises:
            ArgumentError: If positional arguments are given, or if the keyword
                names do not exactly match the unbound dependencies.
        """
        if args:
            raise ArgumentError(0, f"{len(args)} positional arguments")

        dependencies = self.args
        bound_args = {
            name: dep.value
            for name, dep in dependencies.items()
            if isinstance(dep, Argument) and dep.bound
        }
        expected = set(dependencies) - set(bound_args)
        received = set(kwargs)
        if received != expected:
            raise ArgumentError(expected, received)

        cachable = self.is_cachable(bound_args, kwargs)
        call_repr = self.format_args(**kwargs, **self.params)

        # NOTE(review): only params + ``kwargs`` feed the cache key; values of
        # bound Arguments do not. Rebinding an Argument therefore does not by
        # itself invalidate this step's cache -- confirm callers reset caches.
        cache_key = None
        if cachable:
            started = perf_counter()
            cache_key = self.get_cache_key(kwargs)
            self.__cache_times.append(perf_counter() - started)

            hit = self.cache.get(cache_key)
            # NOTE(review): a cached ``None`` looks like a miss here (assuming
            # LRUCache.get returns None for missing keys -- confirm), so steps
            # that return None are re-run on every call.
            if hit is not None:
                self.logger.debug(
                    f"[{self.__cache_times[-1]:.4f}] Returning cached value {self.name}({call_repr}) = {hit}"
                )
                return hit

        started = perf_counter()
        result = self.__run(**bound_args, **self.params, **kwargs)
        self.__run_times.append(perf_counter() - started)
        elapsed = self.__run_times[-1]

        if cachable:
            self.cache.put(cache_key, result)
            self.logger.debug(f"[{elapsed:.4f}] Added {self.name}({call_repr}) = {result} to cache")
        else:
            self.logger.debug(f"[{elapsed:.4f}] Executed {self.name}({call_repr}) = {result} (not cached)")

        return result

    def reset_cache(self) -> None:
        """Clear the step's cache."""
        self.logger.debug("Resetting cache")
        self.cache.clear()

    @property
    def params(self) -> dict[str, Any]:
        """The step's tunable parameters (the live dict, not a copy)."""
        return self.__params

    def set_params(self, **kwargs: Any) -> "Step":
        """
        Set the parameters for the step.

        Only existing parameter names are accepted. A debug message is logged
        when at least one value actually changed.

        Args:
            **kwargs: The parameters to set.

        Returns:
            Step: The updated step instance.

        Raises:
            ParameterError: If a name is not one of the step's parameters.
        """
        dirty = False
        for name, new_value in kwargs.items():
            if name not in self.params:
                raise ParameterError(name, self.name)
            if self.params[name] != new_value:
                self.__params[name] = new_value
                dirty = True
        if dirty:
            self.logger.debug(f"Updated parameters {kwargs}")
        return self

    @property
    def args(self) -> dict[str, Any]:
        """The step's dependencies, mapping name to ``Step``/``Argument`` (the live dict)."""
        return self.__args

    def format_args(self, **kwargs: Any) -> str:
        """Render keyword arguments as ``key=value`` pairs joined by ``", "``."""
        return ", ".join(f"{key}={format_value(value)}" for key, value in kwargs.items())

    def __repr__(self) -> str:
        return f"{self.name}({self.format_args(**self.args)})"

__call__(*args, **kwargs)

Execute the step with the given arguments.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `*args` | `Any` | Positional arguments (not supported; passing any raises `ArgumentError`). | `()` |
| `**kwargs` | `Any` | Keyword arguments for the step. | `{}` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Any` | `Any` | The result of executing the step. |

Source code in `grpipe/step.py` (lines 91–141):
def __call__(self, *args: Any, **kwargs: Any) -> Any:
    """
    Run the step, serving a cached result when one is available.

    Bound ``Argument`` dependencies are filled in automatically; every other
    dependency must be supplied as a keyword argument.

    Args:
        *args: Not supported; passing any positional argument raises.
        **kwargs: Values for every dependency that is not a bound Argument.

    Returns:
        Any: The step's result (freshly computed or taken from the cache).

    Raises:
        ArgumentError: On positional arguments, or when the keyword names do
            not exactly match the unbound dependencies.
    """
    if args:
        raise ArgumentError(0, f"{len(args)} positional arguments")

    dependencies = self.args
    bound_args = {
        name: dep.value
        for name, dep in dependencies.items()
        if isinstance(dep, Argument) and dep.bound
    }
    expected = set(dependencies) - set(bound_args)
    received = set(kwargs)
    if received != expected:
        raise ArgumentError(expected, received)

    cachable = self.is_cachable(bound_args, kwargs)
    call_repr = self.format_args(**kwargs, **self.params)

    cache_key = None
    if cachable:
        started = perf_counter()
        cache_key = self.get_cache_key(kwargs)
        self.__cache_times.append(perf_counter() - started)

        hit = self.cache.get(cache_key)
        if hit is not None:
            self.logger.debug(
                f"[{self.__cache_times[-1]:.4f}] Returning cached value {self.name}({call_repr}) = {hit}"
            )
            return hit

    started = perf_counter()
    result = self.__run(**bound_args, **self.params, **kwargs)
    self.__run_times.append(perf_counter() - started)
    elapsed = self.__run_times[-1]

    if cachable:
        self.cache.put(cache_key, result)
        self.logger.debug(f"[{elapsed:.4f}] Added {self.name}({call_repr}) = {result} to cache")
    else:
        self.logger.debug(f"[{elapsed:.4f}] Executed {self.name}({call_repr}) = {result} (not cached)")

    return result

get_cache_key(kwargs)

Generate a cache key based on the given arguments.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `kwargs` | `dict[str, Any]` | The keyword arguments. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `frozendict` | `frozendict` | A hashable cache key. |

Source code in `grpipe/step.py` (lines 77–89):
def get_cache_key(self, kwargs: dict[str, Any]) -> frozendict:
    """
    Build a hashable cache key for one invocation.

    The step's params are overlaid with ``kwargs`` (``kwargs`` wins on a name
    clash) and every value is passed through ``custom_hash``.

    Args:
        kwargs (dict[str, Any]): The keyword arguments supplied to the call.

    Returns:
        frozendict: A hashable cache key.
    """
    merged = {**self.params, **kwargs}
    return frozendict({key: custom_hash(value) for key, value in merged.items()})

is_cachable(bound_args, kwargs)

Determine if the step's result can be cached based on the given arguments.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `bound_args` | `dict[str, Any]` | The bound arguments. | *required* |
| `kwargs` | `dict[str, Any]` | The keyword arguments. | *required* |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `bool` | `bool` | True if the result can be cached, False otherwise. |

Source code in `grpipe/step.py` (lines 64–75):
def is_cachable(self, bound_args: dict[str, Any], kwargs: dict[str, Any]) -> bool:
    """
    Decide whether this invocation's result may be stored in the cache.

    Caching is refused as soon as a dependency flagged as non-cachable is not
    among ``bound_args``.

    Args:
        bound_args (dict[str, Any]): Values of the bound Argument dependencies.
        kwargs (dict[str, Any]): The keyword arguments (not consulted).

    Returns:
        bool: True if the result can be cached, False otherwise.
    """
    for name, dep in self.__args.items():
        if not dep.cachable and name not in bound_args:
            return False
    return True

reset_cache()

Clear the step's cache.

Source code in `grpipe/step.py` (lines 143–146):
def reset_cache(self) -> None:
    """Empty this step's result cache, logging the reset at debug level."""
    logger = self.logger
    logger.debug("Resetting cache")
    self.cache.clear()

set_params(**kwargs)

Set the parameters for the step.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `**kwargs` | `Any` | The parameters to set. | `{}` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `Step` | `Step` | The updated step instance. |

Source code in `grpipe/step.py` (lines 152–171):
def set_params(self, **kwargs: Any) -> "Step":
    """
    Update existing step parameters in place.

    Unknown names are rejected. A debug message is logged only when at least
    one value actually differs from the current one.

    Args:
        **kwargs: Parameter names mapped to their new values.

    Returns:
        Step: This same step, to allow chaining.

    Raises:
        ParameterError: If a name is not one of the step's parameters.
    """
    dirty = False
    for name, new_value in kwargs.items():
        if name not in self.params:
            raise ParameterError(name, self.name)
        if self.params[name] != new_value:
            self.__params[name] = new_value
            dirty = True
    if dirty:
        self.logger.debug(f"Updated parameters {kwargs}")
    return self

Bases: `BaseStep`

Represents a pipeline of steps.

Source code in `grpipe/pipeline.py` (lines 13–265):
class Pipeline(BaseStep):
    """
    Represents a pipeline of steps.

    Internally the pipeline is a directed graph whose nodes are the names of
    its steps and arguments; an edge ``a -> b`` means step ``b`` takes ``a``
    as one of its arguments.
    """

    def __init__(self, *steps: Step | Argument, intermedite: bool = False, **kwargs: Any):
        """
        Create a pipeline from steps and arguments.

        Args:
            *steps: The steps and arguments that make up the pipeline. They are
                stored keyed by ``name``.
            intermedite: If True, calling the pipeline returns every step
                result instead of only the output step(s). (The parameter name
                is misspelled; it is kept as-is for backward compatibility.)
            **kwargs: Forwarded to ``BaseStep.__init__``.

        Raises:
            PipelineError: If the dependency graph contains a cycle.
        """
        super().__init__(name="pipeline", cachable=False, **kwargs)
        self.__intermediate = intermedite
        self.__steps: dict[str, Step | Argument] = {step.name: step for step in steps}

        graph, output_nodes, args = self.__build_graph(*steps)

        self.__graph = graph
        self.__args = {arg: self.__steps[arg] for arg in args}

        if len(output_nodes) != 1 or not isinstance(self.steps[output_nodes[0]], Step):
            self.logger.info(f"Creating pipeline with multiple outputs {output_nodes}")
        self.__outputs = output_nodes

    def __build_graph(self, *steps: Step | Argument) -> tuple[nx.DiGraph, list[str], list[str]]:
        """
        Build a graph representation of the pipeline.

        Dependencies whose name is not part of this pipeline are not added as
        edges.

        Args:
            *steps: The steps in the pipeline.

        Returns:
            tuple[nx.DiGraph, list[str], list[str]]: The graph, output nodes
            (nodes with no outgoing edge), and argument nodes.

        Raises:
            PipelineError: If the graph contains at least one cycle.
        """
        graph: nx.DiGraph = nx.DiGraph()

        for step in steps:
            if step.name not in graph:
                graph.add_node(step.name, kind="step" if isinstance(step, Step) else "arg")
        for step in steps:
            if isinstance(step, Argument):
                continue
            for dep in step.args.values():
                if dep.name in self.__steps:
                    graph.add_edge(dep.name, step.name)

        if cycles := list(nx.simple_cycles(graph)):
            raise PipelineError(0, f"{len(cycles)} cycles")

        output_nodes = [node for node in graph.nodes if graph.out_degree[node] == 0]

        args = [node for node, data in graph.nodes(data=True) if data["kind"] == "arg"]

        return graph, output_nodes, args

    def draw(self, params: bool = False) -> str:
        """
        Generate a flowchart of the pipeline using mermaid markdown.

        Args:
            params (bool): Whether to include parameters in the flowchart.

        Returns:
            str: The mermaid markdown representation of the flowchart.
        """
        mermaid = ["flowchart TD"]
        for step in self.steps.values():
            if isinstance(step, Step):
                step_label = f"{step.name}"
                if params:
                    param_str = "\n".join(f"⚙ {k}={v!s}" for k, v in step.params.items())
                    step_label += f"\n{param_str}"
                mermaid.append(f'{step.name}["{step_label}"]')
                for dep in step.args.values():
                    mermaid.append(f"{dep.name} --> {step.name}")
            elif isinstance(step, Argument):
                mermaid.append(f"{step.name}[({step.name})]")
        return "\n".join(mermaid)

    def bind(self, **kwargs: Any) -> "Pipeline":
        """
        Bind values to arguments in the pipeline.

        Every Step downstream of a bound argument has its cache reset.

        Args:
            **kwargs: The values to bind to arguments.

        Returns:
            Pipeline: The updated pipeline instance.

        Raises:
            PipelineError: If a name is not a pipeline argument or is not an
                ``Argument``.
        """
        for name, value in kwargs.items():
            if name not in self.args:
                raise PipelineError("Argument", name)
            step = self.args[name]
            if not isinstance(step, Argument):
                raise PipelineError("Argument", type(step))
            step.bind(value)
            for child in nx.descendants(self.graph, step.name):
                child_step = self.steps[child]
                if isinstance(child_step, Step):
                    child_step.reset_cache()
        return self

    def unbind(self, *args: str) -> "Pipeline":
        """
        Unbind values from arguments in the pipeline.

        Every Step downstream of an unbound argument has its cache reset.

        Args:
            *args: The names of arguments to unbind.

        Returns:
            Pipeline: The updated pipeline instance.

        Raises:
            PipelineError: If a name is not a pipeline argument or is not an
                ``Argument``.
        """
        for name in args:
            if name not in self.args:
                raise PipelineError("Argument", name)
            step = self.args[name]
            if not isinstance(step, Argument):
                raise PipelineError("Argument", type(step))
            step.unbind()
            for child in nx.descendants(self.graph, step.name):
                child_step = self.steps[child]
                if isinstance(child_step, Step):
                    child_step.reset_cache()
        return self

    @property
    def graph(self) -> nx.DiGraph:
        """The dependency graph (node names are step/argument names)."""
        return self.__graph

    @property
    def steps(self) -> dict[str, Step | Argument]:
        """All steps and arguments of the pipeline, keyed by name."""
        return self.__steps

    @property
    def args(self) -> dict[str, Argument | Step]:
        """The argument nodes of the pipeline, keyed by name."""
        return self.__args

    @property
    def output(self) -> list[Step]:
        """
        The output steps (graph nodes with no outgoing edge).

        Raises:
            PipelineError: If an output node is not a ``Step``.
        """
        steps: list[Step] = []
        for step_name in self.__outputs:
            step = self.steps[step_name]
            if isinstance(step, Step):
                steps.append(step)
            else:
                raise PipelineError("Step", type(step))
        return steps

    @property
    def params(self) -> dict[str, Any]:
        """All step parameters, flattened with keys of the form ``step__param``."""
        return {
            f"{step.name}__{key}": value
            for step in self.steps.values()
            if isinstance(step, Step)
            for key, value in step.params.items()
        }

    def set_params(self, **kwargs: Any) -> "Pipeline":
        """
        Set parameters for steps in the pipeline.

        All step names are validated before any step is modified, so a bad
        step name does not leave the pipeline partially updated.

        Args:
            **kwargs: The parameters to set, in the format "step__param".

        Returns:
            Pipeline: The updated pipeline instance.

        Raises:
            ValueError: If a key does not contain ``"__"``.
            PipelineError: If a step name is unknown or does not refer to a ``Step``.
        """
        step_params: dict[str, dict[str, Any]] = defaultdict(dict)
        for k, v in kwargs.items():
            step_name, param = k.split("__", 1)
            step_params[step_name][param] = v

        targets: list[tuple[Step, dict[str, Any]]] = []
        for step_name, params in step_params.items():
            if step_name not in self.steps:
                raise PipelineError(self.steps, step_name)
            child = self.steps[step_name]
            if not isinstance(child, Step):
                raise PipelineError("Step", type(child))
            targets.append((child, params))

        for child, params in targets:
            child.set_params(**params)
        return self

    def set(
        self,
        verbose: Optional[bool] = None,
        cachable: Optional[bool] = False,
        intermediate: Optional[bool] = None,
    ) -> "Pipeline":
        """
        Set the verbose, cachable and intermediate properties of the pipeline.

        Args:
            verbose (Optional[bool]): Forwarded to ``BaseStep.set`` for the
                pipeline and to ``set`` of every contained step and argument.
            cachable (Optional[bool]): Forwarded to ``BaseStep.set`` for the
                pipeline itself only.
            intermediate (Optional[bool]): If not None, sets whether calling
                the pipeline returns every step result instead of only the
                outputs. Defaults to None so that an unrelated call such as
                ``set(verbose=True)`` leaves it unchanged.

        Returns:
            Pipeline: The updated pipeline instance.
        """
        if intermediate is not None:
            self.__intermediate = intermediate
        super().set(verbose=verbose, cachable=cachable)
        for step in self.steps.values():
            step.set(verbose=verbose)
        return self

    def __repr__(self) -> str:
        # Only arguments that still need a value at call time are listed.
        args = ", ".join([k for k, v in self.args.items() if not (isinstance(v, Argument) and v.bound)])
        return f"Pipeline({args})"

    def _repr_markdown_(self) -> str:
        return f"```mermaid\n{self.draw(params=False)}\n```"

    def _repr_html_(self) -> str | None:
        # Only render HTML when running under marimo; otherwise defer to other reprs.
        if "marimo" in sys.modules:
            html: str = sys.modules["marimo"].mermaid(self.draw(params=False)).text
            return html
        else:
            return None

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        """
        Execute the pipeline with the given arguments.

        Args:
            *args: Positional arguments (not accepted; any raise ArgumentError).
            **kwargs: Values for every argument that is not bound.

        Returns:
            Any: The single output value, a dict of outputs by name when there
            are several, or a dict of all step results when ``intermediate``
            is enabled.

        Raises:
            ArgumentError: If positional arguments are given or the keyword
                arguments do not match the unbound pipeline arguments.
            PipelineError: If some step's dependencies can never be satisfied
                (e.g. a dependency that is not part of this pipeline).
        """
        bound_args = {k: v.value for k, v in self.args.items() if isinstance(v, Argument) and v.bound}
        if args:
            raise ArgumentError(0, f"{len(args)} positional arguments")
        if set(kwargs.keys()) != set(self.args.keys()) - set(bound_args.keys()):
            raise ArgumentError(
                set(self.args.keys()) - set(bound_args.keys()),
                set(kwargs.keys()),
            )
        results: dict[str, Any] = kwargs | bound_args
        remaining_steps: set[Step] = {x for x in self.steps.values() if isinstance(x, Step)}

        while remaining_steps:
            progressed = False
            for step in list(remaining_steps):
                if all(dep.name in results for dep in step.args.values()):
                    step_args = {k: results[v.name] for k, v in step.args.items() if v.name not in bound_args}
                    results[step.name] = step(**step_args)
                    remaining_steps.remove(step)
                    progressed = True
            if not progressed:
                # No remaining step can run: some dependency is neither an input
                # nor produced by a step of this pipeline. Without this check the
                # loop would spin forever.
                missing = sorted(
                    {dep.name for step in remaining_steps for dep in step.args.values() if dep.name not in results}
                )
                raise PipelineError(0, f"{len(remaining_steps)} steps with unresolved dependencies {missing}")
        outputs = [step.name for step in self.output]
        if self.__intermediate:
            return {key: val for key, val in results.items() if not isinstance(self.steps[key], Argument)}
        if len(outputs) == 1:
            return results[outputs[0]]
        else:
            return {name: results[name] for name in outputs}

    def __getitem__(self, key: str) -> "Pipeline":
        """
        Return a new sub-pipeline made of ``key`` and all of its ancestors.

        The new pipeline is constructed with default settings.

        Raises:
            PipelineError: If ``key`` is not a node of this pipeline.
        """
        if key not in self.steps:
            raise PipelineError(key, self.steps)
        ancestor_nodes = nx.ancestors(self.graph, key)
        ancestor_nodes.add(key)
        steps = [self.steps[node] for node in ancestor_nodes]
        return Pipeline(*steps)

__build_graph(*steps)

Build a graph representation of the pipeline.

Parameters:

Name Type Description Default
*steps Step | Argument

The steps in the pipeline.

()

Returns:

Type Description
tuple[DiGraph, list[str], list[str]]

tuple[nx.DiGraph, list[str], list[str]]: The graph, output nodes, and argument nodes.

Source code in grpipe/pipeline.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def __build_graph(self, *steps: Step | Argument) -> tuple[nx.DiGraph, list[str], list[str]]:
    """
    Assemble the dependency graph for the given steps and arguments.

    Each node is tagged with ``kind`` ("step" or "arg"). An edge ``a -> b`` is
    added when step ``b`` lists ``a`` among its arguments and ``a`` belongs to
    this pipeline; other dependencies are ignored.

    Args:
        *steps: The steps in the pipeline.

    Returns:
        tuple[nx.DiGraph, list[str], list[str]]: The graph, the names of the
        nodes without outgoing edges, and the names of the argument nodes.

    Raises:
        PipelineError: If the resulting graph contains any cycle.
    """
    dag: nx.DiGraph = nx.DiGraph()

    # Register every node first; the first occurrence of a name decides its kind.
    for node in steps:
        if node.name in dag:
            continue
        dag.add_node(node.name, kind="step" if isinstance(node, Step) else "arg")

    # Only Steps carry dependencies.
    known = self.__steps
    for node in (candidate for candidate in steps if not isinstance(candidate, Argument)):
        for upstream in node.args.values():
            if upstream.name in known:
                dag.add_edge(upstream.name, node.name)

    found_cycles = list(nx.simple_cycles(dag))
    if found_cycles:
        raise PipelineError(0, f"{len(found_cycles)} cycles")

    sinks = [name for name in dag.nodes if dag.out_degree[name] == 0]
    arg_names = [name for name, attrs in dag.nodes(data=True) if attrs["kind"] == "arg"]
    return dag, sinks, arg_names

__call__(*args, **kwargs)

Execute the pipeline with the given arguments.

Parameters:

Name Type Description Default
*args Any

Positional arguments (not used).

()
**kwargs Any

Keyword arguments for the pipeline.

{}

Returns:

Name Type Description
Any Any

The result of executing the pipeline.

Source code in grpipe/pipeline.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
def __call__(self, *args: Any, **kwargs: Any) -> Any:
    """
    Execute the pipeline with the given arguments.

    Args:
        *args: Positional arguments (not accepted; any raise ArgumentError).
        **kwargs: Values for every argument that is not bound.

    Returns:
        Any: The single output value, a dict of outputs by name when there are
        several, or a dict of all step results when ``intermediate`` is enabled.

    Raises:
        ArgumentError: If positional arguments are given or the keyword
            arguments do not match the unbound pipeline arguments.
        PipelineError: If some step's dependencies can never be satisfied
            (e.g. a dependency that is not part of this pipeline).
    """
    bound_args = {k: v.value for k, v in self.args.items() if isinstance(v, Argument) and v.bound}
    if args:
        raise ArgumentError(0, f"{len(args)} positional arguments")
    if set(kwargs.keys()) != set(self.args.keys()) - set(bound_args.keys()):
        raise ArgumentError(
            set(self.args.keys()) - set(bound_args.keys()),
            set(kwargs.keys()),
        )
    results: dict[str, Any] = kwargs | bound_args
    remaining_steps: set[Step] = {x for x in self.steps.values() if isinstance(x, Step)}

    while remaining_steps:
        progressed = False
        for step in list(remaining_steps):
            if all(dep.name in results for dep in step.args.values()):
                step_args = {k: results[v.name] for k, v in step.args.items() if v.name not in bound_args}
                results[step.name] = step(**step_args)
                remaining_steps.remove(step)
                progressed = True
        if not progressed:
            # No remaining step can run: some dependency is neither an input nor
            # produced by a step of this pipeline. Without this check the loop
            # would spin forever.
            missing = sorted(
                {dep.name for step in remaining_steps for dep in step.args.values() if dep.name not in results}
            )
            raise PipelineError(0, f"{len(remaining_steps)} steps with unresolved dependencies {missing}")
    outputs = [step.name for step in self.output]
    if self.__intermediate:
        return {key: val for key, val in results.items() if not isinstance(self.steps[key], Argument)}
    if len(outputs) == 1:
        return results[outputs[0]]
    else:
        return {name: results[name] for name in outputs}

bind(**kwargs)

Bind values to arguments in the pipeline.

Parameters:

Name Type Description Default
**kwargs Any

The values to bind to arguments.

{}

Returns:

Name Type Description
Pipeline Pipeline

The updated pipeline instance.

Source code in grpipe/pipeline.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def bind(self, **kwargs: Any) -> "Pipeline":
    """
    Attach fixed values to pipeline arguments.

    After binding, every Step downstream of the argument has its cache reset.

    Args:
        **kwargs: Mapping of argument name to the value to bind.

    Returns:
        Pipeline: This pipeline, to allow call chaining.

    Raises:
        PipelineError: If a name is not a pipeline argument, or refers to a
            node that is not an ``Argument``.
    """
    for arg_name, new_value in kwargs.items():
        if arg_name not in self.args:
            raise PipelineError("Argument", arg_name)
        target = self.args[arg_name]
        if not isinstance(target, Argument):
            raise PipelineError("Argument", type(target))
        target.bind(new_value)
        downstream = (self.steps[node] for node in nx.descendants(self.graph, target.name))
        for node in downstream:
            if isinstance(node, Step):
                node.reset_cache()
    return self

draw(params=False)

Generate a flowchart of the pipeline using mermaid markdown.

Parameters:

Name Type Description Default
params bool

Whether to include parameters in the flowchart.

False

Returns:

Name Type Description
str str

The mermaid markdown representation of the flowchart.

Source code in grpipe/pipeline.py
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def draw(self, params: bool = False) -> str:
    """
    Render the pipeline as a mermaid ``flowchart TD`` diagram.

    Steps become rectangular nodes with one edge per dependency. Arguments
    become round nodes.

    Args:
        params (bool): When True, append each step parameter as a ``⚙ key=value``
            line to the step's label.

    Returns:
        str: The mermaid source, one statement per line.
    """
    lines = ["flowchart TD"]
    for node in self.steps.values():
        if isinstance(node, Step):
            label = f"{node.name}"
            if params:
                settings = [f"⚙ {key}={val!s}" for key, val in node.params.items()]
                label = label + "\n" + "\n".join(settings)
            lines.append(f'{node.name}["{label}"]')
            lines.extend(f"{dep.name} --> {node.name}" for dep in node.args.values())
        elif isinstance(node, Argument):
            lines.append(f"{node.name}[({node.name})]")
    return "\n".join(lines)

set(verbose=None, cachable=False, intermediate=False)

Set the verbose and cachable properties for all steps in the pipeline.

Parameters:

Name Type Description Default
verbose Optional[bool]

If provided, sets the verbose property.

None
cachable Optional[bool]

If provided, sets the cachable property.

False

Returns:

Name Type Description
Pipeline Pipeline

The updated pipeline instance.

Source code in grpipe/pipeline.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def set(
    self,
    verbose: Optional[bool] = None,
    cachable: Optional[bool] = False,
    intermediate: Optional[bool] = None,
) -> "Pipeline":
    """
    Set the verbose, cachable and intermediate properties of the pipeline.

    Args:
        verbose (Optional[bool]): Forwarded to ``BaseStep.set`` for the
            pipeline and to ``set`` of every contained step and argument.
        cachable (Optional[bool]): Forwarded to ``BaseStep.set`` for the
            pipeline itself only.
        intermediate (Optional[bool]): If not None, sets whether calling the
            pipeline returns every step result instead of only the outputs.
            Defaults to None so that an unrelated call such as
            ``set(verbose=True)`` leaves it unchanged.

    Returns:
        Pipeline: The updated pipeline instance.
    """
    if intermediate is not None:
        self.__intermediate = intermediate
    super().set(verbose=verbose, cachable=cachable)
    for step in self.steps.values():
        step.set(verbose=verbose)
    return self

set_params(**kwargs)

Set parameters for steps in the pipeline.

Parameters:

Name Type Description Default
**kwargs Any

The parameters to set, in the format "step__param".

{}

Returns:

Name Type Description
Pipeline Pipeline

The updated pipeline instance.

Source code in grpipe/pipeline.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
def set_params(self, **kwargs: Any) -> "Pipeline":
    """
    Set parameters for steps in the pipeline.

    All step names are validated before any step is modified, so a bad step
    name does not leave the pipeline partially updated.

    Args:
        **kwargs: The parameters to set, in the format "step__param".

    Returns:
        Pipeline: The updated pipeline instance.

    Raises:
        ValueError: If a key does not contain ``"__"``.
        PipelineError: If a step name is unknown or does not refer to a ``Step``.
    """
    step_params: dict[str, dict[str, Any]] = defaultdict(dict)
    for k, v in kwargs.items():
        step_name, param = k.split("__", 1)
        step_params[step_name][param] = v

    targets: list[tuple[Step, dict[str, Any]]] = []
    for step_name, params in step_params.items():
        if step_name not in self.steps:
            raise PipelineError(self.steps, step_name)
        child = self.steps[step_name]
        if not isinstance(child, Step):
            raise PipelineError("Step", type(child))
        targets.append((child, params))

    for child, params in targets:
        child.set_params(**params)
    return self

unbind(*args)

Unbind values from arguments in the pipeline.

Parameters:

Name Type Description Default
*args str

The names of arguments to unbind.

()

Returns:

Name Type Description
Pipeline Pipeline

The updated pipeline instance.

Source code in grpipe/pipeline.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
def unbind(self, *args: str) -> "Pipeline":
    """
    Remove previously bound values from pipeline arguments.

    After unbinding, every Step downstream of the argument has its cache reset.

    Args:
        *args: Names of the arguments to unbind.

    Returns:
        Pipeline: This pipeline, to allow call chaining.

    Raises:
        PipelineError: If a name is not a pipeline argument, or refers to a
            node that is not an ``Argument``.
    """
    for arg_name in args:
        if arg_name not in self.args:
            raise PipelineError("Argument", arg_name)
        target = self.args[arg_name]
        if not isinstance(target, Argument):
            raise PipelineError("Argument", type(target))
        target.unbind()
        affected = [self.steps[node] for node in nx.descendants(self.graph, target.name)]
        for dependent in filter(lambda candidate: isinstance(candidate, Step), affected):
            dependent.reset_cache()
    return self