vkit.engine.interface

  1# Copyright 2022 vkit-x Administrator. All Rights Reserved.
  2#
  3# This project (vkit-x/vkit) is dual-licensed under commercial and SSPL licenses.
  4#
  5# The commercial license gives you the full rights to create and distribute software
  6# on your own terms without any SSPL license obligations. For more information,
  7# please see the "LICENSE_COMMERCIAL.txt" file.
  8#
  9# This project is also available under Server Side Public License (SSPL).
 10# The SSPL licensing is ideal for use cases such as open source projects with
 11# SSPL distribution, student/academic purposes, hobby projects, internal research
 12# projects without external distribution, or other projects where all SSPL
 13# obligations can be met. For more information, please see the "LICENSE_SSPL.txt" file.
 14from typing import (
 15    cast,
 16    Generic,
 17    Type,
 18    TypeVar,
 19    Mapping,
 20    Sequence,
 21    Any,
 22    Tuple,
 23    List,
 24    Optional,
 25    Union,
 26    Callable,
 27)
 28import itertools
 29
 30import attrs
 31from numpy.random import Generator as RandomGenerator
 32
 33from vkit.utility import (
 34    normalize_to_keys_and_probs,
 35    rng_choice,
 36    is_path_type,
 37    read_json_file,
 38    dyn_structure,
 39    get_generic_classes,
 40    PathType,
 41)
 42
 43_T_INIT_CONFIG = TypeVar('_T_INIT_CONFIG')
 44_T_INIT_RESOURCE = TypeVar('_T_INIT_RESOURCE')
 45_T_RUN_CONFIG = TypeVar('_T_RUN_CONFIG')
 46_T_RUN_OUTPUT = TypeVar('_T_RUN_OUTPUT')
 47
 48
 49@attrs.define
 50class NoneTypeEngineInitConfig:
 51    pass
 52
 53
 54@attrs.define
 55class NoneTypeEngineInitResource:
 56    pass
 57
 58
 59class Engine(
 60    Generic[
 61        _T_INIT_CONFIG,
 62        _T_INIT_RESOURCE,
 63        _T_RUN_CONFIG,
 64        _T_RUN_OUTPUT,
 65    ]
 66):  # yapf: disable
 67
 68    @classmethod
 69    def get_type_name(cls) -> str:
 70        raise NotImplementedError()
 71
 72    def __init__(
 73        self,
 74        init_config: _T_INIT_CONFIG,
 75        init_resource: Optional[_T_INIT_RESOURCE] = None,
 76    ):
 77        self.init_config = init_config
 78        self.init_resource = init_resource
 79
 80    def run(
 81        self,
 82        run_config: _T_RUN_CONFIG,
 83        rng: Optional[RandomGenerator] = None,
 84    ) -> _T_RUN_OUTPUT:
 85        raise NotImplementedError()
 86
 87
 88class EngineExecutor(
 89    Generic[
 90        _T_INIT_CONFIG,
 91        _T_INIT_RESOURCE,
 92        _T_RUN_CONFIG,
 93        _T_RUN_OUTPUT,
 94    ]
 95):  # yapf: disable
 96
 97    def __init__(
 98        self,
 99        engine: Engine[
100            _T_INIT_CONFIG,
101            _T_INIT_RESOURCE,
102            _T_RUN_CONFIG,
103            _T_RUN_OUTPUT,
104        ],
105    ):  # yapf: disable
106        self.engine = engine
107
108    def get_run_config_cls(self) -> Type[_T_RUN_CONFIG]:
109        return get_generic_classes(type(self.engine))[2]  # type: ignore
110
111    def run(
112        self,
113        run_config: Union[
114            Mapping[str, Any],
115            _T_RUN_CONFIG,
116        ],
117        rng: Optional[RandomGenerator] = None,
118    ) -> _T_RUN_OUTPUT:  # yapf: disable
119        run_config = dyn_structure(run_config, self.get_run_config_cls())
120        return self.engine.run(run_config, rng)
121
122
123class EngineExecutorFactory(
124    Generic[
125        _T_INIT_CONFIG,
126        _T_INIT_RESOURCE,
127        _T_RUN_CONFIG,
128        _T_RUN_OUTPUT,
129    ]
130):  # yapf: disable
131
132    def __init__(
133        self,
134        engine_cls: Type[
135            Engine[
136                _T_INIT_CONFIG,
137                _T_INIT_RESOURCE,
138                _T_RUN_CONFIG,
139                _T_RUN_OUTPUT,
140            ]
141        ],
142    ):  # yapf: disable
143        self.engine_cls = engine_cls
144
145    def get_type_name(self):
146        return self.engine_cls.get_type_name()
147
148    def get_init_config_cls(self) -> Type[_T_INIT_CONFIG]:
149        return get_generic_classes(self.engine_cls)[0]  # type: ignore
150
151    def get_init_resource_cls(self) -> Type[_T_INIT_RESOURCE]:
152        return get_generic_classes(self.engine_cls)[1]  # type: ignore
153
154    def create(
155        self,
156        init_config: Optional[
157            Union[
158                Mapping[str, Any],
159                PathType,
160                _T_INIT_CONFIG,
161            ]
162        ] = None,
163        init_resource: Optional[
164            Union[
165                Mapping[str, Any],
166                _T_INIT_RESOURCE,
167            ]
168        ] = None,
169    ):  # yapf: disable
170        init_config = dyn_structure(
171            init_config,
172            self.get_init_config_cls(),
173            support_path_type=True,
174            support_none_type=True,
175        )
176
177        init_resource_cls = self.get_init_resource_cls()
178        if init_resource_cls is NoneTypeEngineInitResource:
179            assert init_resource is None
180        else:
181            assert init_resource
182        if init_resource is not None:
183            init_resource = dyn_structure(init_resource, init_resource_cls)
184
185        return EngineExecutor(self.engine_cls(init_config, init_resource))
186
187
188class EngineExecutorAggregatorSelector(
189    Generic[
190        _T_RUN_CONFIG,
191        _T_RUN_OUTPUT,
192    ]
193):  # yapf: disable
194
195    def __init__(
196        self,
197        pairs: Sequence[
198            Tuple[
199                EngineExecutor[
200                    Any,
201                    Any,
202                    _T_RUN_CONFIG,
203                    _T_RUN_OUTPUT,
204                ],
205                float,
206            ]
207        ],
208    ):  # yapf: disable
209        self.engine_executors, self.probs = normalize_to_keys_and_probs(pairs)
210
211    def get_run_config_cls(self):
212        return self.engine_executors[0].get_run_config_cls()
213
214    def select_engine_executor(self, rng: RandomGenerator):
215        return rng_choice(rng, self.engine_executors, probs=self.probs)
216
217
218def engine_executor_aggregator_default_func_collate(
219    selector: EngineExecutorAggregatorSelector[
220        _T_RUN_CONFIG,
221        _T_RUN_OUTPUT,
222    ],
223    run_config: _T_RUN_CONFIG,
224    rng: RandomGenerator,
225) -> _T_RUN_OUTPUT:  # yapf: disable
226    engine_executor = selector.select_engine_executor(rng)
227    return engine_executor.run(run_config, rng)
228
229
230class EngineExecutorAggregator(
231    Generic[
232        _T_RUN_CONFIG,
233        _T_RUN_OUTPUT,
234    ]
235):  # yapf: disable
236
237    def get_run_config_cls(self):
238        return self.selector.get_run_config_cls()
239
240    def __init__(
241        self,
242        selector: EngineExecutorAggregatorSelector[
243            _T_RUN_CONFIG,
244            _T_RUN_OUTPUT,
245        ],
246        func_collate: Callable[
247            [
248                EngineExecutorAggregatorSelector[
249                    _T_RUN_CONFIG,
250                    _T_RUN_OUTPUT,
251                ],
252                _T_RUN_CONFIG,
253                RandomGenerator,
254            ],
255            _T_RUN_OUTPUT,
256        ] = engine_executor_aggregator_default_func_collate,
257    ):  # yapf: disable
258        self.selector = selector
259        self.func_collate = func_collate
260
261    def run(
262        self,
263        run_config: Union[
264            Mapping[str, Any],
265            _T_RUN_CONFIG,
266        ],
267        rng: RandomGenerator,
268    ) -> _T_RUN_OUTPUT:  # yapf: disable
269        run_config = dyn_structure(run_config, self.get_run_config_cls())
270        return self.func_collate(self.selector, run_config, rng)
271
272
273class EngineExecutorAggregatorFactoryConfigKey:
274    TYPE = 'type'
275    WEIGHT = 'weight'
276    CONFIG = 'config'
277
278
279class EngineExecutorAggregatorFactory(
280    Generic[
281        _T_RUN_CONFIG,
282        _T_RUN_OUTPUT,
283    ]
284):  # yapf: disable
285
286    def __init__(
287        self,
288        engine_executor_factories: Sequence[
289            EngineExecutorFactory[
290                Any,
291                Any,
292                _T_RUN_CONFIG,
293                _T_RUN_OUTPUT,
294            ]
295        ],
296        func_collate: Callable[
297            [
298                EngineExecutorAggregatorSelector[
299                    _T_RUN_CONFIG,
300                    _T_RUN_OUTPUT,
301                ],
302                _T_RUN_CONFIG,
303                RandomGenerator,
304            ],
305            _T_RUN_OUTPUT,
306        ] = engine_executor_aggregator_default_func_collate,
307    ):  # yapf: disable
308        self.type_name_to_engine_executor_factory = {
309            engine_executor_factory.get_type_name(): engine_executor_factory
310            for engine_executor_factory in engine_executor_factories
311        }
312        self.func_collate = func_collate
313
314    def create(
315        self,
316        factory_init_configs: Union[
317            Sequence[Mapping[str, Any]],
318            PathType,
319        ],
320        init_resources: Optional[
321            Sequence[
322                Union[
323                    Mapping[str, Any],
324                    # TODO: find a better way to constrain resource type.
325                    Any,
326                ]
327            ]
328        ] = None,
329    ):  # yapf: disable
330        if is_path_type(factory_init_configs):
331            factory_init_configs = read_json_file(factory_init_configs)  # type: ignore
332        factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs)
333
334        pairs: List[
335            Tuple[
336                EngineExecutor[Any, Any, _T_RUN_CONFIG, _T_RUN_OUTPUT],
337                float,
338            ],
339        ] = []  # yapf: disable
340
341        for factory_init_config, init_resource in zip(
342            factory_init_configs, init_resources or itertools.repeat(None)
343        ):
344            # Reflects engine_executor_factory.
345            type_name = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.TYPE]
346            if type_name not in self.type_name_to_engine_executor_factory:
347                raise KeyError(f'type_name={type_name} not found')
348            engine_executor_factory = self.type_name_to_engine_executor_factory[type_name]
349
350            # Build init_resource.
351            init_resource_cls = engine_executor_factory.get_init_resource_cls()
352            if init_resource_cls is NoneTypeEngineInitResource:
353                assert init_resource is None
354            else:
355                assert init_resource
356                init_resource = dyn_structure(init_resource, init_resource_cls)
357
358            # Build engine_executor.
359            engine_executor = engine_executor_factory.create(
360                factory_init_config.get(EngineExecutorAggregatorFactoryConfigKey.CONFIG, {}),
361                init_resource,
362            )
363
364            # Get the weight.
365            if len(factory_init_configs) == 1:
366                weight = 1
367            else:
368                weight = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.WEIGHT]
369
370            pairs.append((engine_executor, weight))
371
372        return EngineExecutorAggregator(
373            EngineExecutorAggregatorSelector(pairs),
374            func_collate=self.func_collate,
375        )
376
377    def create_with_repeated_init_resource(
378        self,
379        factory_init_configs: Union[
380            Sequence[Mapping[str, Any]],
381            PathType,
382        ],
383        init_resource: Union[
384            Mapping[str, Any],
385            Any,
386        ]
387    ):  # yapf: disable
388        if is_path_type(factory_init_configs):
389            factory_init_configs = read_json_file(factory_init_configs)  # type: ignore
390        factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs)
391
392        return self.create(
393            factory_init_configs,
394            [init_resource] * len(factory_init_configs),
395        )
396
397    def create_engine_executor(
398        self,
399        factory_init_config: Mapping[str, Any],
400        init_resource: Optional[
401            Union[
402                Mapping[str, Any],
403                Any,
404            ]
405        ] = None,
406    ):  # yapf: disable
407        executor_aggregator = self.create(
408            factory_init_configs=[factory_init_config],
409            init_resources=[init_resource] if init_resource else None,
410        )
411        return executor_aggregator.selector.engine_executors[0]
412
413
414# TODO: move to doc.
415# A minimum template.
416'''
417from typing import Optional
418
419import attrs
420from numpy.random import Generator as RandomGenerator
421
422from ..interface import (
423    Engine,
424    EngineExecutorFactory,
425    NoneTypeEngineInitResource,
426)
427
428
429@attrs.define
430class FooEngineInitConfig:
431    pass
432
433
434@attrs.define
435class FooEngineRunConfig:
436    pass
437
438
439class FooEngine(
440    Engine[
441        FooEngineInitConfig,
442        NoneTypeEngineInitResource,
443        FooEngineRunConfig,
444        int,
445    ]
446):  # yapf: disable
447
448    @classmethod
449    def get_type_name(cls) -> str:
450        return 'foo'
451
452    def __init__(
453        self,
454        init_config: FooEngineInitConfig,
455        init_resource: Optional[NoneTypeEngineInitResource] = None,
456    ):
457        super().__init__(init_config, init_resource)
458
459    def run(self, run_config: FooEngineRunConfig, rng: RandomGenerator) -> int:
460        return 42
461
462
463foo_engine_executor_factory = EngineExecutorFactory(FooEngine)
464'''
class NoneTypeEngineInitConfig:
51class NoneTypeEngineInitConfig:
52    pass
NoneTypeEngineInitConfig()
2def __init__(self, ):
3    pass

Method generated by attrs for class NoneTypeEngineInitConfig.

class NoneTypeEngineInitResource:
56class NoneTypeEngineInitResource:
57    pass
NoneTypeEngineInitResource()
2def __init__(self, ):
3    pass

Method generated by attrs for class NoneTypeEngineInitResource.

class Engine(typing.Generic[~_T_INIT_CONFIG, ~_T_INIT_RESOURCE, ~_T_RUN_CONFIG, ~_T_RUN_OUTPUT]):
60class Engine(
61    Generic[
62        _T_INIT_CONFIG,
63        _T_INIT_RESOURCE,
64        _T_RUN_CONFIG,
65        _T_RUN_OUTPUT,
66    ]
67):  # yapf: disable
68
69    @classmethod
70    def get_type_name(cls) -> str:
71        raise NotImplementedError()
72
73    def __init__(
74        self,
75        init_config: _T_INIT_CONFIG,
76        init_resource: Optional[_T_INIT_RESOURCE] = None,
77    ):
78        self.init_config = init_config
79        self.init_resource = init_resource
80
81    def run(
82        self,
83        run_config: _T_RUN_CONFIG,
84        rng: Optional[RandomGenerator] = None,
85    ) -> _T_RUN_OUTPUT:
86        raise NotImplementedError()

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

Engine( init_config: ~_T_INIT_CONFIG, init_resource: Union[~_T_INIT_RESOURCE, NoneType] = None)
73    def __init__(
74        self,
75        init_config: _T_INIT_CONFIG,
76        init_resource: Optional[_T_INIT_RESOURCE] = None,
77    ):
78        self.init_config = init_config
79        self.init_resource = init_resource
@classmethod
def get_type_name(cls) -> str:
69    @classmethod
70    def get_type_name(cls) -> str:
71        raise NotImplementedError()
def run( self, run_config: ~_T_RUN_CONFIG, rng: Union[numpy.random._generator.Generator, NoneType] = None) -> ~_T_RUN_OUTPUT:
81    def run(
82        self,
83        run_config: _T_RUN_CONFIG,
84        rng: Optional[RandomGenerator] = None,
85    ) -> _T_RUN_OUTPUT:
86        raise NotImplementedError()
class EngineExecutor(typing.Generic[~_T_INIT_CONFIG, ~_T_INIT_RESOURCE, ~_T_RUN_CONFIG, ~_T_RUN_OUTPUT]):
 89class EngineExecutor(
 90    Generic[
 91        _T_INIT_CONFIG,
 92        _T_INIT_RESOURCE,
 93        _T_RUN_CONFIG,
 94        _T_RUN_OUTPUT,
 95    ]
 96):  # yapf: disable
 97
 98    def __init__(
 99        self,
100        engine: Engine[
101            _T_INIT_CONFIG,
102            _T_INIT_RESOURCE,
103            _T_RUN_CONFIG,
104            _T_RUN_OUTPUT,
105        ],
106    ):  # yapf: disable
107        self.engine = engine
108
109    def get_run_config_cls(self) -> Type[_T_RUN_CONFIG]:
110        return get_generic_classes(type(self.engine))[2]  # type: ignore
111
112    def run(
113        self,
114        run_config: Union[
115            Mapping[str, Any],
116            _T_RUN_CONFIG,
117        ],
118        rng: Optional[RandomGenerator] = None,
119    ) -> _T_RUN_OUTPUT:  # yapf: disable
120        run_config = dyn_structure(run_config, self.get_run_config_cls())
121        return self.engine.run(run_config, rng)

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

EngineExecutor( engine: vkit.engine.interface.Engine[~_T_INIT_CONFIG, ~_T_INIT_RESOURCE, ~_T_RUN_CONFIG, ~_T_RUN_OUTPUT])
 98    def __init__(
 99        self,
100        engine: Engine[
101            _T_INIT_CONFIG,
102            _T_INIT_RESOURCE,
103            _T_RUN_CONFIG,
104            _T_RUN_OUTPUT,
105        ],
106    ):  # yapf: disable
107        self.engine = engine
def get_run_config_cls(self) -> Type[~_T_RUN_CONFIG]:
109    def get_run_config_cls(self) -> Type[_T_RUN_CONFIG]:
110        return get_generic_classes(type(self.engine))[2]  # type: ignore
def run( self, run_config: Union[Mapping[str, Any], ~_T_RUN_CONFIG], rng: Union[numpy.random._generator.Generator, NoneType] = None) -> ~_T_RUN_OUTPUT:
112    def run(
113        self,
114        run_config: Union[
115            Mapping[str, Any],
116            _T_RUN_CONFIG,
117        ],
118        rng: Optional[RandomGenerator] = None,
119    ) -> _T_RUN_OUTPUT:  # yapf: disable
120        run_config = dyn_structure(run_config, self.get_run_config_cls())
121        return self.engine.run(run_config, rng)
class EngineExecutorFactory(typing.Generic[~_T_INIT_CONFIG, ~_T_INIT_RESOURCE, ~_T_RUN_CONFIG, ~_T_RUN_OUTPUT]):
124class EngineExecutorFactory(
125    Generic[
126        _T_INIT_CONFIG,
127        _T_INIT_RESOURCE,
128        _T_RUN_CONFIG,
129        _T_RUN_OUTPUT,
130    ]
131):  # yapf: disable
132
133    def __init__(
134        self,
135        engine_cls: Type[
136            Engine[
137                _T_INIT_CONFIG,
138                _T_INIT_RESOURCE,
139                _T_RUN_CONFIG,
140                _T_RUN_OUTPUT,
141            ]
142        ],
143    ):  # yapf: disable
144        self.engine_cls = engine_cls
145
146    def get_type_name(self):
147        return self.engine_cls.get_type_name()
148
149    def get_init_config_cls(self) -> Type[_T_INIT_CONFIG]:
150        return get_generic_classes(self.engine_cls)[0]  # type: ignore
151
152    def get_init_resource_cls(self) -> Type[_T_INIT_RESOURCE]:
153        return get_generic_classes(self.engine_cls)[1]  # type: ignore
154
155    def create(
156        self,
157        init_config: Optional[
158            Union[
159                Mapping[str, Any],
160                PathType,
161                _T_INIT_CONFIG,
162            ]
163        ] = None,
164        init_resource: Optional[
165            Union[
166                Mapping[str, Any],
167                _T_INIT_RESOURCE,
168            ]
169        ] = None,
170    ):  # yapf: disable
171        init_config = dyn_structure(
172            init_config,
173            self.get_init_config_cls(),
174            support_path_type=True,
175            support_none_type=True,
176        )
177
178        init_resource_cls = self.get_init_resource_cls()
179        if init_resource_cls is NoneTypeEngineInitResource:
180            assert init_resource is None
181        else:
182            assert init_resource
183        if init_resource is not None:
184            init_resource = dyn_structure(init_resource, init_resource_cls)
185
186        return EngineExecutor(self.engine_cls(init_config, init_resource))

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

EngineExecutorFactory( engine_cls: Type[vkit.engine.interface.Engine[~_T_INIT_CONFIG, ~_T_INIT_RESOURCE, ~_T_RUN_CONFIG, ~_T_RUN_OUTPUT]])
133    def __init__(
134        self,
135        engine_cls: Type[
136            Engine[
137                _T_INIT_CONFIG,
138                _T_INIT_RESOURCE,
139                _T_RUN_CONFIG,
140                _T_RUN_OUTPUT,
141            ]
142        ],
143    ):  # yapf: disable
144        self.engine_cls = engine_cls
def get_type_name(self):
146    def get_type_name(self):
147        return self.engine_cls.get_type_name()
def get_init_config_cls(self) -> Type[~_T_INIT_CONFIG]:
149    def get_init_config_cls(self) -> Type[_T_INIT_CONFIG]:
150        return get_generic_classes(self.engine_cls)[0]  # type: ignore
def get_init_resource_cls(self) -> Type[~_T_INIT_RESOURCE]:
152    def get_init_resource_cls(self) -> Type[_T_INIT_RESOURCE]:
153        return get_generic_classes(self.engine_cls)[1]  # type: ignore
def create( self, init_config: Union[Mapping[str, Any], str, os.PathLike, ~_T_INIT_CONFIG, NoneType] = None, init_resource: Union[Mapping[str, Any], ~_T_INIT_RESOURCE, NoneType] = None):
155    def create(
156        self,
157        init_config: Optional[
158            Union[
159                Mapping[str, Any],
160                PathType,
161                _T_INIT_CONFIG,
162            ]
163        ] = None,
164        init_resource: Optional[
165            Union[
166                Mapping[str, Any],
167                _T_INIT_RESOURCE,
168            ]
169        ] = None,
170    ):  # yapf: disable
171        init_config = dyn_structure(
172            init_config,
173            self.get_init_config_cls(),
174            support_path_type=True,
175            support_none_type=True,
176        )
177
178        init_resource_cls = self.get_init_resource_cls()
179        if init_resource_cls is NoneTypeEngineInitResource:
180            assert init_resource is None
181        else:
182            assert init_resource
183        if init_resource is not None:
184            init_resource = dyn_structure(init_resource, init_resource_cls)
185
186        return EngineExecutor(self.engine_cls(init_config, init_resource))
class EngineExecutorAggregatorSelector(typing.Generic[~_T_RUN_CONFIG, ~_T_RUN_OUTPUT]):
189class EngineExecutorAggregatorSelector(
190    Generic[
191        _T_RUN_CONFIG,
192        _T_RUN_OUTPUT,
193    ]
194):  # yapf: disable
195
196    def __init__(
197        self,
198        pairs: Sequence[
199            Tuple[
200                EngineExecutor[
201                    Any,
202                    Any,
203                    _T_RUN_CONFIG,
204                    _T_RUN_OUTPUT,
205                ],
206                float,
207            ]
208        ],
209    ):  # yapf: disable
210        self.engine_executors, self.probs = normalize_to_keys_and_probs(pairs)
211
212    def get_run_config_cls(self):
213        return self.engine_executors[0].get_run_config_cls()
214
215    def select_engine_executor(self, rng: RandomGenerator):
216        return rng_choice(rng, self.engine_executors, probs=self.probs)

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

EngineExecutorAggregatorSelector( pairs: Sequence[Tuple[vkit.engine.interface.EngineExecutor[Any, Any, ~_T_RUN_CONFIG, ~_T_RUN_OUTPUT], float]])
196    def __init__(
197        self,
198        pairs: Sequence[
199            Tuple[
200                EngineExecutor[
201                    Any,
202                    Any,
203                    _T_RUN_CONFIG,
204                    _T_RUN_OUTPUT,
205                ],
206                float,
207            ]
208        ],
209    ):  # yapf: disable
210        self.engine_executors, self.probs = normalize_to_keys_and_probs(pairs)
def get_run_config_cls(self):
212    def get_run_config_cls(self):
213        return self.engine_executors[0].get_run_config_cls()
def select_engine_executor(self, rng: numpy.random._generator.Generator):
215    def select_engine_executor(self, rng: RandomGenerator):
216        return rng_choice(rng, self.engine_executors, probs=self.probs)
def engine_executor_aggregator_default_func_collate( selector: vkit.engine.interface.EngineExecutorAggregatorSelector[~_T_RUN_CONFIG, ~_T_RUN_OUTPUT], run_config: ~_T_RUN_CONFIG, rng: numpy.random._generator.Generator) -> ~_T_RUN_OUTPUT:
219def engine_executor_aggregator_default_func_collate(
220    selector: EngineExecutorAggregatorSelector[
221        _T_RUN_CONFIG,
222        _T_RUN_OUTPUT,
223    ],
224    run_config: _T_RUN_CONFIG,
225    rng: RandomGenerator,
226) -> _T_RUN_OUTPUT:  # yapf: disable
227    engine_executor = selector.select_engine_executor(rng)
228    return engine_executor.run(run_config, rng)
class EngineExecutorAggregator(typing.Generic[~_T_RUN_CONFIG, ~_T_RUN_OUTPUT]):
231class EngineExecutorAggregator(
232    Generic[
233        _T_RUN_CONFIG,
234        _T_RUN_OUTPUT,
235    ]
236):  # yapf: disable
237
238    def get_run_config_cls(self):
239        return self.selector.get_run_config_cls()
240
241    def __init__(
242        self,
243        selector: EngineExecutorAggregatorSelector[
244            _T_RUN_CONFIG,
245            _T_RUN_OUTPUT,
246        ],
247        func_collate: Callable[
248            [
249                EngineExecutorAggregatorSelector[
250                    _T_RUN_CONFIG,
251                    _T_RUN_OUTPUT,
252                ],
253                _T_RUN_CONFIG,
254                RandomGenerator,
255            ],
256            _T_RUN_OUTPUT,
257        ] = engine_executor_aggregator_default_func_collate,
258    ):  # yapf: disable
259        self.selector = selector
260        self.func_collate = func_collate
261
262    def run(
263        self,
264        run_config: Union[
265            Mapping[str, Any],
266            _T_RUN_CONFIG,
267        ],
268        rng: RandomGenerator,
269    ) -> _T_RUN_OUTPUT:  # yapf: disable
270        run_config = dyn_structure(run_config, self.get_run_config_cls())
271        return self.func_collate(self.selector, run_config, rng)

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

EngineExecutorAggregator( selector: vkit.engine.interface.EngineExecutorAggregatorSelector[~_T_RUN_CONFIG, ~_T_RUN_OUTPUT], func_collate: Callable[[vkit.engine.interface.EngineExecutorAggregatorSelector[~_T_RUN_CONFIG, ~_T_RUN_OUTPUT], ~_T_RUN_CONFIG, numpy.random._generator.Generator], ~_T_RUN_OUTPUT] = <function engine_executor_aggregator_default_func_collate>)
241    def __init__(
242        self,
243        selector: EngineExecutorAggregatorSelector[
244            _T_RUN_CONFIG,
245            _T_RUN_OUTPUT,
246        ],
247        func_collate: Callable[
248            [
249                EngineExecutorAggregatorSelector[
250                    _T_RUN_CONFIG,
251                    _T_RUN_OUTPUT,
252                ],
253                _T_RUN_CONFIG,
254                RandomGenerator,
255            ],
256            _T_RUN_OUTPUT,
257        ] = engine_executor_aggregator_default_func_collate,
258    ):  # yapf: disable
259        self.selector = selector
260        self.func_collate = func_collate
def get_run_config_cls(self):
238    def get_run_config_cls(self):
239        return self.selector.get_run_config_cls()
def run( self, run_config: Union[Mapping[str, Any], ~_T_RUN_CONFIG], rng: numpy.random._generator.Generator) -> ~_T_RUN_OUTPUT:
262    def run(
263        self,
264        run_config: Union[
265            Mapping[str, Any],
266            _T_RUN_CONFIG,
267        ],
268        rng: RandomGenerator,
269    ) -> _T_RUN_OUTPUT:  # yapf: disable
270        run_config = dyn_structure(run_config, self.get_run_config_cls())
271        return self.func_collate(self.selector, run_config, rng)
class EngineExecutorAggregatorFactoryConfigKey:
274class EngineExecutorAggregatorFactoryConfigKey:
275    TYPE = 'type'
276    WEIGHT = 'weight'
277    CONFIG = 'config'
class EngineExecutorAggregatorFactory(typing.Generic[~_T_RUN_CONFIG, ~_T_RUN_OUTPUT]):
280class EngineExecutorAggregatorFactory(
281    Generic[
282        _T_RUN_CONFIG,
283        _T_RUN_OUTPUT,
284    ]
285):  # yapf: disable
286
287    def __init__(
288        self,
289        engine_executor_factories: Sequence[
290            EngineExecutorFactory[
291                Any,
292                Any,
293                _T_RUN_CONFIG,
294                _T_RUN_OUTPUT,
295            ]
296        ],
297        func_collate: Callable[
298            [
299                EngineExecutorAggregatorSelector[
300                    _T_RUN_CONFIG,
301                    _T_RUN_OUTPUT,
302                ],
303                _T_RUN_CONFIG,
304                RandomGenerator,
305            ],
306            _T_RUN_OUTPUT,
307        ] = engine_executor_aggregator_default_func_collate,
308    ):  # yapf: disable
309        self.type_name_to_engine_executor_factory = {
310            engine_executor_factory.get_type_name(): engine_executor_factory
311            for engine_executor_factory in engine_executor_factories
312        }
313        self.func_collate = func_collate
314
315    def create(
316        self,
317        factory_init_configs: Union[
318            Sequence[Mapping[str, Any]],
319            PathType,
320        ],
321        init_resources: Optional[
322            Sequence[
323                Union[
324                    Mapping[str, Any],
325                    # TODO: find a better way to constrain resource type.
326                    Any,
327                ]
328            ]
329        ] = None,
330    ):  # yapf: disable
331        if is_path_type(factory_init_configs):
332            factory_init_configs = read_json_file(factory_init_configs)  # type: ignore
333        factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs)
334
335        pairs: List[
336            Tuple[
337                EngineExecutor[Any, Any, _T_RUN_CONFIG, _T_RUN_OUTPUT],
338                float,
339            ],
340        ] = []  # yapf: disable
341
342        for factory_init_config, init_resource in zip(
343            factory_init_configs, init_resources or itertools.repeat(None)
344        ):
345            # Reflects engine_executor_factory.
346            type_name = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.TYPE]
347            if type_name not in self.type_name_to_engine_executor_factory:
348                raise KeyError(f'type_name={type_name} not found')
349            engine_executor_factory = self.type_name_to_engine_executor_factory[type_name]
350
351            # Build init_resource.
352            init_resource_cls = engine_executor_factory.get_init_resource_cls()
353            if init_resource_cls is NoneTypeEngineInitResource:
354                assert init_resource is None
355            else:
356                assert init_resource
357                init_resource = dyn_structure(init_resource, init_resource_cls)
358
359            # Build engine_executor.
360            engine_executor = engine_executor_factory.create(
361                factory_init_config.get(EngineExecutorAggregatorFactoryConfigKey.CONFIG, {}),
362                init_resource,
363            )
364
365            # Get the weight.
366            if len(factory_init_configs) == 1:
367                weight = 1
368            else:
369                weight = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.WEIGHT]
370
371            pairs.append((engine_executor, weight))
372
373        return EngineExecutorAggregator(
374            EngineExecutorAggregatorSelector(pairs),
375            func_collate=self.func_collate,
376        )
377
378    def create_with_repeated_init_resource(
379        self,
380        factory_init_configs: Union[
381            Sequence[Mapping[str, Any]],
382            PathType,
383        ],
384        init_resource: Union[
385            Mapping[str, Any],
386            Any,
387        ]
388    ):  # yapf: disable
389        if is_path_type(factory_init_configs):
390            factory_init_configs = read_json_file(factory_init_configs)  # type: ignore
391        factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs)
392
393        return self.create(
394            factory_init_configs,
395            [init_resource] * len(factory_init_configs),
396        )
397
398    def create_engine_executor(
399        self,
400        factory_init_config: Mapping[str, Any],
401        init_resource: Optional[
402            Union[
403                Mapping[str, Any],
404                Any,
405            ]
406        ] = None,
407    ):  # yapf: disable
408        executor_aggregator = self.create(
409            factory_init_configs=[factory_init_config],
410            init_resources=[init_resource] if init_resource else None,
411        )
412        return executor_aggregator.selector.engine_executors[0]

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

EngineExecutorAggregatorFactory( engine_executor_factories: Sequence[vkit.engine.interface.EngineExecutorFactory[Any, Any, ~_T_RUN_CONFIG, ~_T_RUN_OUTPUT]], func_collate: Callable[[vkit.engine.interface.EngineExecutorAggregatorSelector[~_T_RUN_CONFIG, ~_T_RUN_OUTPUT], ~_T_RUN_CONFIG, numpy.random._generator.Generator], ~_T_RUN_OUTPUT] = <function engine_executor_aggregator_default_func_collate>)
287    def __init__(
288        self,
289        engine_executor_factories: Sequence[
290            EngineExecutorFactory[
291                Any,
292                Any,
293                _T_RUN_CONFIG,
294                _T_RUN_OUTPUT,
295            ]
296        ],
297        func_collate: Callable[
298            [
299                EngineExecutorAggregatorSelector[
300                    _T_RUN_CONFIG,
301                    _T_RUN_OUTPUT,
302                ],
303                _T_RUN_CONFIG,
304                RandomGenerator,
305            ],
306            _T_RUN_OUTPUT,
307        ] = engine_executor_aggregator_default_func_collate,
308    ):  # yapf: disable
309        self.type_name_to_engine_executor_factory = {
310            engine_executor_factory.get_type_name(): engine_executor_factory
311            for engine_executor_factory in engine_executor_factories
312        }
313        self.func_collate = func_collate
def create( self, factory_init_configs: Union[Sequence[Mapping[str, Any]], str, os.PathLike], init_resources: Union[Sequence[Union[Mapping[str, Any], Any]], NoneType] = None):
315    def create(
316        self,
317        factory_init_configs: Union[
318            Sequence[Mapping[str, Any]],
319            PathType,
320        ],
321        init_resources: Optional[
322            Sequence[
323                Union[
324                    Mapping[str, Any],
325                    # TODO: find a better way to constrain resource type.
326                    Any,
327                ]
328            ]
329        ] = None,
330    ):  # yapf: disable
331        if is_path_type(factory_init_configs):
332            factory_init_configs = read_json_file(factory_init_configs)  # type: ignore
333        factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs)
334
335        pairs: List[
336            Tuple[
337                EngineExecutor[Any, Any, _T_RUN_CONFIG, _T_RUN_OUTPUT],
338                float,
339            ],
340        ] = []  # yapf: disable
341
342        for factory_init_config, init_resource in zip(
343            factory_init_configs, init_resources or itertools.repeat(None)
344        ):
345            # Reflects engine_executor_factory.
346            type_name = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.TYPE]
347            if type_name not in self.type_name_to_engine_executor_factory:
348                raise KeyError(f'type_name={type_name} not found')
349            engine_executor_factory = self.type_name_to_engine_executor_factory[type_name]
350
351            # Build init_resource.
352            init_resource_cls = engine_executor_factory.get_init_resource_cls()
353            if init_resource_cls is NoneTypeEngineInitResource:
354                assert init_resource is None
355            else:
356                assert init_resource
357                init_resource = dyn_structure(init_resource, init_resource_cls)
358
359            # Build engine_executor.
360            engine_executor = engine_executor_factory.create(
361                factory_init_config.get(EngineExecutorAggregatorFactoryConfigKey.CONFIG, {}),
362                init_resource,
363            )
364
365            # Get the weight.
366            if len(factory_init_configs) == 1:
367                weight = 1
368            else:
369                weight = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.WEIGHT]
370
371            pairs.append((engine_executor, weight))
372
373        return EngineExecutorAggregator(
374            EngineExecutorAggregatorSelector(pairs),
375            func_collate=self.func_collate,
376        )
def create_with_repeated_init_resource( self, factory_init_configs: Union[Sequence[Mapping[str, Any]], str, os.PathLike], init_resource: Union[Mapping[str, Any], Any]):
378    def create_with_repeated_init_resource(
379        self,
380        factory_init_configs: Union[
381            Sequence[Mapping[str, Any]],
382            PathType,
383        ],
384        init_resource: Union[
385            Mapping[str, Any],
386            Any,
387        ]
388    ):  # yapf: disable
389        if is_path_type(factory_init_configs):
390            factory_init_configs = read_json_file(factory_init_configs)  # type: ignore
391        factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs)
392
393        return self.create(
394            factory_init_configs,
395            [init_resource] * len(factory_init_configs),
396        )
def create_engine_executor( self, factory_init_config: Mapping[str, Any], init_resource: Union[Mapping[str, Any], Any, NoneType] = None):
398    def create_engine_executor(
399        self,
400        factory_init_config: Mapping[str, Any],
401        init_resource: Optional[
402            Union[
403                Mapping[str, Any],
404                Any,
405            ]
406        ] = None,
407    ):  # yapf: disable
408        executor_aggregator = self.create(
409            factory_init_configs=[factory_init_config],
410            init_resources=[init_resource] if init_resource else None,
411        )
412        return executor_aggregator.selector.engine_executors[0]