vkit.engine.interface

  1# Copyright 2022 vkit-x Administrator. All Rights Reserved.
  2#
  3# This project (vkit-x/vkit) is dual-licensed under commercial and SSPL licenses.
  4#
  5# The commercial license gives you the full rights to create and distribute software
  6# on your own terms without any SSPL license obligations. For more information,
  7# please see the "LICENSE_COMMERCIAL.txt" file.
  8#
  9# This project is also available under Server Side Public License (SSPL).
 10# The SSPL licensing is ideal for use cases such as open source projects with
 11# SSPL distribution, student/academic purposes, hobby projects, internal research
 12# projects without external distribution, or other projects where all SSPL
 13# obligations can be met. For more information, please see the "LICENSE_SSPL.txt" file.
 14from typing import (
 15    cast,
 16    Generic,
 17    Type,
 18    TypeVar,
 19    Mapping,
 20    Sequence,
 21    Any,
 22    Tuple,
 23    List,
 24    Optional,
 25    Union,
 26    Callable,
 27)
 28import itertools
 29
 30import attrs
 31from numpy.random import Generator as RandomGenerator
 32
 33from vkit.utility import (
 34    normalize_to_keys_and_probs,
 35    rng_choice,
 36    is_path_type,
 37    read_json_file,
 38    dyn_structure,
 39    get_generic_classes,
 40    PathType,
 41)
 42
 43_T_INIT_CONFIG = TypeVar('_T_INIT_CONFIG')
 44_T_INIT_RESOURCE = TypeVar('_T_INIT_RESOURCE')
 45_T_RUN_CONFIG = TypeVar('_T_RUN_CONFIG')
 46_T_RUN_OUTPUT = TypeVar('_T_RUN_OUTPUT')
 47
 48
 49@attrs.define
 50class NoneTypeEngineInitConfig:
 51    pass
 52
 53
 54@attrs.define
 55class NoneTypeEngineInitResource:
 56    pass
 57
 58
 59class Engine(
 60    Generic[
 61        _T_INIT_CONFIG,
 62        _T_INIT_RESOURCE,
 63        _T_RUN_CONFIG,
 64        _T_RUN_OUTPUT,
 65    ]
 66):  # yapf: disable
 67
 68    @classmethod
 69    def get_type_name(cls) -> str:
 70        raise NotImplementedError()
 71
 72    def __init__(
 73        self,
 74        init_config: _T_INIT_CONFIG,
 75        init_resource: Optional[_T_INIT_RESOURCE] = None,
 76    ):
 77        self.init_config = init_config
 78        self.init_resource = init_resource
 79
 80    def run(self, run_config: _T_RUN_CONFIG, rng: RandomGenerator) -> _T_RUN_OUTPUT:
 81        raise NotImplementedError()
 82
 83
 84class EngineExecutor(
 85    Generic[
 86        _T_INIT_CONFIG,
 87        _T_INIT_RESOURCE,
 88        _T_RUN_CONFIG,
 89        _T_RUN_OUTPUT,
 90    ]
 91):  # yapf: disable
 92
 93    def __init__(
 94        self,
 95        engine: Engine[
 96            _T_INIT_CONFIG,
 97            _T_INIT_RESOURCE,
 98            _T_RUN_CONFIG,
 99            _T_RUN_OUTPUT,
100        ],
101    ):  # yapf: disable
102        self.engine = engine
103
104    def get_run_config_cls(self) -> Type[_T_RUN_CONFIG]:
105        return get_generic_classes(type(self.engine))[2]  # type: ignore
106
107    def run(
108        self,
109        run_config: Union[
110            Mapping[str, Any],
111            _T_RUN_CONFIG,
112        ],
113        rng: RandomGenerator,
114    ) -> _T_RUN_OUTPUT:  # yapf: disable
115        run_config = dyn_structure(run_config, self.get_run_config_cls())
116        return self.engine.run(run_config, rng)
117
118
119class EngineExecutorFactory(
120    Generic[
121        _T_INIT_CONFIG,
122        _T_INIT_RESOURCE,
123        _T_RUN_CONFIG,
124        _T_RUN_OUTPUT,
125    ]
126):  # yapf: disable
127
128    def __init__(
129        self,
130        engine_cls: Type[
131            Engine[
132                _T_INIT_CONFIG,
133                _T_INIT_RESOURCE,
134                _T_RUN_CONFIG,
135                _T_RUN_OUTPUT,
136            ]
137        ],
138    ):  # yapf: disable
139        self.engine_cls = engine_cls
140
141    def get_type_name(self):
142        return self.engine_cls.get_type_name()
143
144    def get_init_config_cls(self) -> Type[_T_INIT_CONFIG]:
145        return get_generic_classes(self.engine_cls)[0]  # type: ignore
146
147    def get_init_resource_cls(self) -> Type[_T_INIT_RESOURCE]:
148        return get_generic_classes(self.engine_cls)[1]  # type: ignore
149
150    def create(
151        self,
152        init_config: Optional[
153            Union[
154                Mapping[str, Any],
155                PathType,
156                _T_INIT_CONFIG,
157            ]
158        ] = None,
159        init_resource: Optional[
160            Union[
161                Mapping[str, Any],
162                _T_INIT_RESOURCE,
163            ]
164        ] = None,
165    ):  # yapf: disable
166        init_config = dyn_structure(
167            init_config,
168            self.get_init_config_cls(),
169            support_path_type=True,
170            support_none_type=True,
171        )
172
173        init_resource_cls = self.get_init_resource_cls()
174        if init_resource_cls is NoneTypeEngineInitResource:
175            assert init_resource is None
176        else:
177            assert init_resource
178        if init_resource is not None:
179            init_resource = dyn_structure(init_resource, init_resource_cls)
180
181        return EngineExecutor(self.engine_cls(init_config, init_resource))
182
183
184class EngineExecutorAggregatorSelector(
185    Generic[
186        _T_RUN_CONFIG,
187        _T_RUN_OUTPUT,
188    ]
189):  # yapf: disable
190
191    def __init__(
192        self,
193        pairs: Sequence[
194            Tuple[
195                EngineExecutor[
196                    Any,
197                    Any,
198                    _T_RUN_CONFIG,
199                    _T_RUN_OUTPUT,
200                ],
201                float,
202            ]
203        ],
204    ):  # yapf: disable
205        self.engine_executors, self.probs = normalize_to_keys_and_probs(pairs)
206
207    def get_run_config_cls(self):
208        return self.engine_executors[0].get_run_config_cls()
209
210    def select_engine_executor(self, rng: RandomGenerator):
211        return rng_choice(rng, self.engine_executors, probs=self.probs)
212
213
214def engine_executor_aggregator_default_func_collate(
215    selector: EngineExecutorAggregatorSelector[
216        _T_RUN_CONFIG,
217        _T_RUN_OUTPUT,
218    ],
219    run_config: _T_RUN_CONFIG,
220    rng: RandomGenerator,
221) -> _T_RUN_OUTPUT:  # yapf: disable
222    engine_executor = selector.select_engine_executor(rng)
223    return engine_executor.run(run_config, rng)
224
225
226class EngineExecutorAggregator(
227    Generic[
228        _T_RUN_CONFIG,
229        _T_RUN_OUTPUT,
230    ]
231):  # yapf: disable
232
233    def get_run_config_cls(self):
234        return self.selector.get_run_config_cls()
235
236    def __init__(
237        self,
238        selector: EngineExecutorAggregatorSelector[
239            _T_RUN_CONFIG,
240            _T_RUN_OUTPUT,
241        ],
242        func_collate: Callable[
243            [
244                EngineExecutorAggregatorSelector[
245                    _T_RUN_CONFIG,
246                    _T_RUN_OUTPUT,
247                ],
248                _T_RUN_CONFIG,
249                RandomGenerator,
250            ],
251            _T_RUN_OUTPUT,
252        ] = engine_executor_aggregator_default_func_collate,
253    ):  # yapf: disable
254        self.selector = selector
255        self.func_collate = func_collate
256
257    def run(
258        self,
259        run_config: Union[
260            Mapping[str, Any],
261            _T_RUN_CONFIG,
262        ],
263        rng: RandomGenerator,
264    ) -> _T_RUN_OUTPUT:  # yapf: disable
265        run_config = dyn_structure(run_config, self.get_run_config_cls())
266        return self.func_collate(self.selector, run_config, rng)
267
268
269class EngineExecutorAggregatorFactoryConfigKey:
270    TYPE = 'type'
271    WEIGHT = 'weight'
272    CONFIG = 'config'
273
274
275class EngineExecutorAggregatorFactory(
276    Generic[
277        _T_RUN_CONFIG,
278        _T_RUN_OUTPUT,
279    ]
280):  # yapf: disable
281
282    def __init__(
283        self,
284        engine_executor_factories: Sequence[
285            EngineExecutorFactory[
286                Any,
287                Any,
288                _T_RUN_CONFIG,
289                _T_RUN_OUTPUT,
290            ]
291        ],
292        func_collate: Callable[
293            [
294                EngineExecutorAggregatorSelector[
295                    _T_RUN_CONFIG,
296                    _T_RUN_OUTPUT,
297                ],
298                _T_RUN_CONFIG,
299                RandomGenerator,
300            ],
301            _T_RUN_OUTPUT,
302        ] = engine_executor_aggregator_default_func_collate,
303    ):  # yapf: disable
304        self.type_name_to_engine_executor_factory = {
305            engine_executor_factory.get_type_name(): engine_executor_factory
306            for engine_executor_factory in engine_executor_factories
307        }
308        self.func_collate = func_collate
309
310    def create(
311        self,
312        factory_init_configs: Union[
313            Sequence[Mapping[str, Any]],
314            PathType,
315        ],
316        init_resources: Optional[
317            Sequence[
318                Union[
319                    Mapping[str, Any],
320                    # TODO: find a better way to constrain resource type.
321                    Any,
322                ]
323            ]
324        ] = None,
325    ):  # yapf: disable
326        if is_path_type(factory_init_configs):
327            factory_init_configs = read_json_file(factory_init_configs)  # type: ignore
328        factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs)
329
330        pairs: List[
331            Tuple[
332                EngineExecutor[Any, Any, _T_RUN_CONFIG, _T_RUN_OUTPUT],
333                float,
334            ],
335        ] = []  # yapf: disable
336
337        for factory_init_config, init_resource in zip(
338            factory_init_configs, init_resources or itertools.repeat(None)
339        ):
340            # Reflects engine_executor_factory.
341            type_name = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.TYPE]
342            if type_name not in self.type_name_to_engine_executor_factory:
343                raise KeyError(f'type_name={type_name} not found')
344            engine_executor_factory = self.type_name_to_engine_executor_factory[type_name]
345
346            # Build init_resource.
347            init_resource_cls = engine_executor_factory.get_init_resource_cls()
348            if init_resource_cls is NoneTypeEngineInitResource:
349                assert init_resource is None
350            else:
351                assert init_resource
352                init_resource = dyn_structure(init_resource, init_resource_cls)
353
354            # Build engine_executor.
355            engine_executor = engine_executor_factory.create(
356                factory_init_config.get(EngineExecutorAggregatorFactoryConfigKey.CONFIG, {}),
357                init_resource,
358            )
359
360            # Get the weight.
361            if len(factory_init_configs) == 1:
362                weight = 1
363            else:
364                weight = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.WEIGHT]
365
366            pairs.append((engine_executor, weight))
367
368        return EngineExecutorAggregator(
369            EngineExecutorAggregatorSelector(pairs),
370            func_collate=self.func_collate,
371        )
372
373    def create_with_repeated_init_resource(
374        self,
375        factory_init_configs: Union[
376            Sequence[Mapping[str, Any]],
377            PathType,
378        ],
379        init_resource: Union[
380            Mapping[str, Any],
381            Any,
382        ]
383    ):  # yapf: disable
384        if is_path_type(factory_init_configs):
385            factory_init_configs = read_json_file(factory_init_configs)  # type: ignore
386        factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs)
387
388        return self.create(
389            factory_init_configs,
390            [init_resource] * len(factory_init_configs),
391        )
392
393
394# TODO: move to doc.
395# A minimum template.
396'''
397from typing import Optional
398
399import attrs
400from numpy.random import Generator as RandomGenerator
401
402from vkit.engine.interface import (
403    Engine,
404    EngineExecutorFactory,
405    NoneTypeEngineInitResource,
406)
407
408
409@attrs.define
410class FooEngineInitConfig:
411    pass
412
413
414@attrs.define
415class FooEngineRunConfig:
416    pass
417
418
419class FooEngine(
420    Engine[
421        FooEngineInitConfig,
422        NoneTypeEngineInitResource,
423        FooEngineRunConfig,
424        int,
425    ]
426):  # yapf: disable
427
428    @classmethod
429    def get_type_name(cls) -> str:
430        return 'foo'
431
432    def __init__(
433        self,
434        init_config: FooEngineInitConfig,
435        init_resource: Optional[NoneTypeEngineInitResource] = None,
436    ):
437        super().__init__(init_config, init_resource)
438
439    def run(self, run_config: FooEngineRunConfig, rng: RandomGenerator) -> int:
440        return 42
441
442
443foo_engine_executor_factory = EngineExecutorFactory(FooEngine)
444'''
class NoneTypeEngineInitConfig:
51class NoneTypeEngineInitConfig:
52    pass
NoneTypeEngineInitConfig()
2def __init__(self, ):
3    pass

Method generated by attrs for class NoneTypeEngineInitConfig.

class NoneTypeEngineInitResource:
56class NoneTypeEngineInitResource:
57    pass
NoneTypeEngineInitResource()
2def __init__(self, ):
3    pass

Method generated by attrs for class NoneTypeEngineInitResource.

class Engine(typing.Generic[~_T_INIT_CONFIG, ~_T_INIT_RESOURCE, ~_T_RUN_CONFIG, ~_T_RUN_OUTPUT]):
60class Engine(
61    Generic[
62        _T_INIT_CONFIG,
63        _T_INIT_RESOURCE,
64        _T_RUN_CONFIG,
65        _T_RUN_OUTPUT,
66    ]
67):  # yapf: disable
68
69    @classmethod
70    def get_type_name(cls) -> str:
71        raise NotImplementedError()
72
73    def __init__(
74        self,
75        init_config: _T_INIT_CONFIG,
76        init_resource: Optional[_T_INIT_RESOURCE] = None,
77    ):
78        self.init_config = init_config
79        self.init_resource = init_resource
80
81    def run(self, run_config: _T_RUN_CONFIG, rng: RandomGenerator) -> _T_RUN_OUTPUT:
82        raise NotImplementedError()

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

Engine( init_config: ~_T_INIT_CONFIG, init_resource: Union[~_T_INIT_RESOURCE, NoneType] = None)
73    def __init__(
74        self,
75        init_config: _T_INIT_CONFIG,
76        init_resource: Optional[_T_INIT_RESOURCE] = None,
77    ):
78        self.init_config = init_config
79        self.init_resource = init_resource
@classmethod
def get_type_name(cls) -> str:
69    @classmethod
70    def get_type_name(cls) -> str:
71        raise NotImplementedError()
def run( self, run_config: ~_T_RUN_CONFIG, rng: numpy.random._generator.Generator) -> ~_T_RUN_OUTPUT:
81    def run(self, run_config: _T_RUN_CONFIG, rng: RandomGenerator) -> _T_RUN_OUTPUT:
82        raise NotImplementedError()
class EngineExecutor(typing.Generic[~_T_INIT_CONFIG, ~_T_INIT_RESOURCE, ~_T_RUN_CONFIG, ~_T_RUN_OUTPUT]):
 85class EngineExecutor(
 86    Generic[
 87        _T_INIT_CONFIG,
 88        _T_INIT_RESOURCE,
 89        _T_RUN_CONFIG,
 90        _T_RUN_OUTPUT,
 91    ]
 92):  # yapf: disable
 93
 94    def __init__(
 95        self,
 96        engine: Engine[
 97            _T_INIT_CONFIG,
 98            _T_INIT_RESOURCE,
 99            _T_RUN_CONFIG,
100            _T_RUN_OUTPUT,
101        ],
102    ):  # yapf: disable
103        self.engine = engine
104
105    def get_run_config_cls(self) -> Type[_T_RUN_CONFIG]:
106        return get_generic_classes(type(self.engine))[2]  # type: ignore
107
108    def run(
109        self,
110        run_config: Union[
111            Mapping[str, Any],
112            _T_RUN_CONFIG,
113        ],
114        rng: RandomGenerator,
115    ) -> _T_RUN_OUTPUT:  # yapf: disable
116        run_config = dyn_structure(run_config, self.get_run_config_cls())
117        return self.engine.run(run_config, rng)

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

EngineExecutor( engine: vkit.engine.interface.Engine[~_T_INIT_CONFIG, ~_T_INIT_RESOURCE, ~_T_RUN_CONFIG, ~_T_RUN_OUTPUT])
 94    def __init__(
 95        self,
 96        engine: Engine[
 97            _T_INIT_CONFIG,
 98            _T_INIT_RESOURCE,
 99            _T_RUN_CONFIG,
100            _T_RUN_OUTPUT,
101        ],
102    ):  # yapf: disable
103        self.engine = engine
def get_run_config_cls(self) -> Type[~_T_RUN_CONFIG]:
105    def get_run_config_cls(self) -> Type[_T_RUN_CONFIG]:
106        return get_generic_classes(type(self.engine))[2]  # type: ignore
def run( self, run_config: Union[Mapping[str, Any], ~_T_RUN_CONFIG], rng: numpy.random._generator.Generator) -> ~_T_RUN_OUTPUT:
108    def run(
109        self,
110        run_config: Union[
111            Mapping[str, Any],
112            _T_RUN_CONFIG,
113        ],
114        rng: RandomGenerator,
115    ) -> _T_RUN_OUTPUT:  # yapf: disable
116        run_config = dyn_structure(run_config, self.get_run_config_cls())
117        return self.engine.run(run_config, rng)
class EngineExecutorFactory(typing.Generic[~_T_INIT_CONFIG, ~_T_INIT_RESOURCE, ~_T_RUN_CONFIG, ~_T_RUN_OUTPUT]):
120class EngineExecutorFactory(
121    Generic[
122        _T_INIT_CONFIG,
123        _T_INIT_RESOURCE,
124        _T_RUN_CONFIG,
125        _T_RUN_OUTPUT,
126    ]
127):  # yapf: disable
128
129    def __init__(
130        self,
131        engine_cls: Type[
132            Engine[
133                _T_INIT_CONFIG,
134                _T_INIT_RESOURCE,
135                _T_RUN_CONFIG,
136                _T_RUN_OUTPUT,
137            ]
138        ],
139    ):  # yapf: disable
140        self.engine_cls = engine_cls
141
142    def get_type_name(self):
143        return self.engine_cls.get_type_name()
144
145    def get_init_config_cls(self) -> Type[_T_INIT_CONFIG]:
146        return get_generic_classes(self.engine_cls)[0]  # type: ignore
147
148    def get_init_resource_cls(self) -> Type[_T_INIT_RESOURCE]:
149        return get_generic_classes(self.engine_cls)[1]  # type: ignore
150
151    def create(
152        self,
153        init_config: Optional[
154            Union[
155                Mapping[str, Any],
156                PathType,
157                _T_INIT_CONFIG,
158            ]
159        ] = None,
160        init_resource: Optional[
161            Union[
162                Mapping[str, Any],
163                _T_INIT_RESOURCE,
164            ]
165        ] = None,
166    ):  # yapf: disable
167        init_config = dyn_structure(
168            init_config,
169            self.get_init_config_cls(),
170            support_path_type=True,
171            support_none_type=True,
172        )
173
174        init_resource_cls = self.get_init_resource_cls()
175        if init_resource_cls is NoneTypeEngineInitResource:
176            assert init_resource is None
177        else:
178            assert init_resource
179        if init_resource is not None:
180            init_resource = dyn_structure(init_resource, init_resource_cls)
181
182        return EngineExecutor(self.engine_cls(init_config, init_resource))

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

EngineExecutorFactory( engine_cls: Type[vkit.engine.interface.Engine[~_T_INIT_CONFIG, ~_T_INIT_RESOURCE, ~_T_RUN_CONFIG, ~_T_RUN_OUTPUT]])
129    def __init__(
130        self,
131        engine_cls: Type[
132            Engine[
133                _T_INIT_CONFIG,
134                _T_INIT_RESOURCE,
135                _T_RUN_CONFIG,
136                _T_RUN_OUTPUT,
137            ]
138        ],
139    ):  # yapf: disable
140        self.engine_cls = engine_cls
def get_type_name(self):
142    def get_type_name(self):
143        return self.engine_cls.get_type_name()
def get_init_config_cls(self) -> Type[~_T_INIT_CONFIG]:
145    def get_init_config_cls(self) -> Type[_T_INIT_CONFIG]:
146        return get_generic_classes(self.engine_cls)[0]  # type: ignore
def get_init_resource_cls(self) -> Type[~_T_INIT_RESOURCE]:
148    def get_init_resource_cls(self) -> Type[_T_INIT_RESOURCE]:
149        return get_generic_classes(self.engine_cls)[1]  # type: ignore
def create( self, init_config: Union[Mapping[str, Any], str, os.PathLike, ~_T_INIT_CONFIG, NoneType] = None, init_resource: Union[Mapping[str, Any], ~_T_INIT_RESOURCE, NoneType] = None):
151    def create(
152        self,
153        init_config: Optional[
154            Union[
155                Mapping[str, Any],
156                PathType,
157                _T_INIT_CONFIG,
158            ]
159        ] = None,
160        init_resource: Optional[
161            Union[
162                Mapping[str, Any],
163                _T_INIT_RESOURCE,
164            ]
165        ] = None,
166    ):  # yapf: disable
167        init_config = dyn_structure(
168            init_config,
169            self.get_init_config_cls(),
170            support_path_type=True,
171            support_none_type=True,
172        )
173
174        init_resource_cls = self.get_init_resource_cls()
175        if init_resource_cls is NoneTypeEngineInitResource:
176            assert init_resource is None
177        else:
178            assert init_resource
179        if init_resource is not None:
180            init_resource = dyn_structure(init_resource, init_resource_cls)
181
182        return EngineExecutor(self.engine_cls(init_config, init_resource))
class EngineExecutorAggregatorSelector(typing.Generic[~_T_RUN_CONFIG, ~_T_RUN_OUTPUT]):
185class EngineExecutorAggregatorSelector(
186    Generic[
187        _T_RUN_CONFIG,
188        _T_RUN_OUTPUT,
189    ]
190):  # yapf: disable
191
192    def __init__(
193        self,
194        pairs: Sequence[
195            Tuple[
196                EngineExecutor[
197                    Any,
198                    Any,
199                    _T_RUN_CONFIG,
200                    _T_RUN_OUTPUT,
201                ],
202                float,
203            ]
204        ],
205    ):  # yapf: disable
206        self.engine_executors, self.probs = normalize_to_keys_and_probs(pairs)
207
208    def get_run_config_cls(self):
209        return self.engine_executors[0].get_run_config_cls()
210
211    def select_engine_executor(self, rng: RandomGenerator):
212        return rng_choice(rng, self.engine_executors, probs=self.probs)

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

EngineExecutorAggregatorSelector( pairs: Sequence[Tuple[vkit.engine.interface.EngineExecutor[Any, Any, ~_T_RUN_CONFIG, ~_T_RUN_OUTPUT], float]])
192    def __init__(
193        self,
194        pairs: Sequence[
195            Tuple[
196                EngineExecutor[
197                    Any,
198                    Any,
199                    _T_RUN_CONFIG,
200                    _T_RUN_OUTPUT,
201                ],
202                float,
203            ]
204        ],
205    ):  # yapf: disable
206        self.engine_executors, self.probs = normalize_to_keys_and_probs(pairs)
def get_run_config_cls(self):
208    def get_run_config_cls(self):
209        return self.engine_executors[0].get_run_config_cls()
def select_engine_executor(self, rng: numpy.random._generator.Generator):
211    def select_engine_executor(self, rng: RandomGenerator):
212        return rng_choice(rng, self.engine_executors, probs=self.probs)
def engine_executor_aggregator_default_func_collate( selector: vkit.engine.interface.EngineExecutorAggregatorSelector[~_T_RUN_CONFIG, ~_T_RUN_OUTPUT], run_config: ~_T_RUN_CONFIG, rng: numpy.random._generator.Generator) -> ~_T_RUN_OUTPUT:
215def engine_executor_aggregator_default_func_collate(
216    selector: EngineExecutorAggregatorSelector[
217        _T_RUN_CONFIG,
218        _T_RUN_OUTPUT,
219    ],
220    run_config: _T_RUN_CONFIG,
221    rng: RandomGenerator,
222) -> _T_RUN_OUTPUT:  # yapf: disable
223    engine_executor = selector.select_engine_executor(rng)
224    return engine_executor.run(run_config, rng)
class EngineExecutorAggregator(typing.Generic[~_T_RUN_CONFIG, ~_T_RUN_OUTPUT]):
227class EngineExecutorAggregator(
228    Generic[
229        _T_RUN_CONFIG,
230        _T_RUN_OUTPUT,
231    ]
232):  # yapf: disable
233
234    def get_run_config_cls(self):
235        return self.selector.get_run_config_cls()
236
237    def __init__(
238        self,
239        selector: EngineExecutorAggregatorSelector[
240            _T_RUN_CONFIG,
241            _T_RUN_OUTPUT,
242        ],
243        func_collate: Callable[
244            [
245                EngineExecutorAggregatorSelector[
246                    _T_RUN_CONFIG,
247                    _T_RUN_OUTPUT,
248                ],
249                _T_RUN_CONFIG,
250                RandomGenerator,
251            ],
252            _T_RUN_OUTPUT,
253        ] = engine_executor_aggregator_default_func_collate,
254    ):  # yapf: disable
255        self.selector = selector
256        self.func_collate = func_collate
257
258    def run(
259        self,
260        run_config: Union[
261            Mapping[str, Any],
262            _T_RUN_CONFIG,
263        ],
264        rng: RandomGenerator,
265    ) -> _T_RUN_OUTPUT:  # yapf: disable
266        run_config = dyn_structure(run_config, self.get_run_config_cls())
267        return self.func_collate(self.selector, run_config, rng)

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

EngineExecutorAggregator( selector: vkit.engine.interface.EngineExecutorAggregatorSelector[~_T_RUN_CONFIG, ~_T_RUN_OUTPUT], func_collate: Callable[[vkit.engine.interface.EngineExecutorAggregatorSelector[~_T_RUN_CONFIG, ~_T_RUN_OUTPUT], ~_T_RUN_CONFIG, numpy.random._generator.Generator], ~_T_RUN_OUTPUT] = <function engine_executor_aggregator_default_func_collate>)
237    def __init__(
238        self,
239        selector: EngineExecutorAggregatorSelector[
240            _T_RUN_CONFIG,
241            _T_RUN_OUTPUT,
242        ],
243        func_collate: Callable[
244            [
245                EngineExecutorAggregatorSelector[
246                    _T_RUN_CONFIG,
247                    _T_RUN_OUTPUT,
248                ],
249                _T_RUN_CONFIG,
250                RandomGenerator,
251            ],
252            _T_RUN_OUTPUT,
253        ] = engine_executor_aggregator_default_func_collate,
254    ):  # yapf: disable
255        self.selector = selector
256        self.func_collate = func_collate
def get_run_config_cls(self):
234    def get_run_config_cls(self):
235        return self.selector.get_run_config_cls()
def run( self, run_config: Union[Mapping[str, Any], ~_T_RUN_CONFIG], rng: numpy.random._generator.Generator) -> ~_T_RUN_OUTPUT:
258    def run(
259        self,
260        run_config: Union[
261            Mapping[str, Any],
262            _T_RUN_CONFIG,
263        ],
264        rng: RandomGenerator,
265    ) -> _T_RUN_OUTPUT:  # yapf: disable
266        run_config = dyn_structure(run_config, self.get_run_config_cls())
267        return self.func_collate(self.selector, run_config, rng)
class EngineExecutorAggregatorFactoryConfigKey:
270class EngineExecutorAggregatorFactoryConfigKey:
271    TYPE = 'type'
272    WEIGHT = 'weight'
273    CONFIG = 'config'
EngineExecutorAggregatorFactoryConfigKey()
class EngineExecutorAggregatorFactory(typing.Generic[~_T_RUN_CONFIG, ~_T_RUN_OUTPUT]):
276class EngineExecutorAggregatorFactory(
277    Generic[
278        _T_RUN_CONFIG,
279        _T_RUN_OUTPUT,
280    ]
281):  # yapf: disable
282
283    def __init__(
284        self,
285        engine_executor_factories: Sequence[
286            EngineExecutorFactory[
287                Any,
288                Any,
289                _T_RUN_CONFIG,
290                _T_RUN_OUTPUT,
291            ]
292        ],
293        func_collate: Callable[
294            [
295                EngineExecutorAggregatorSelector[
296                    _T_RUN_CONFIG,
297                    _T_RUN_OUTPUT,
298                ],
299                _T_RUN_CONFIG,
300                RandomGenerator,
301            ],
302            _T_RUN_OUTPUT,
303        ] = engine_executor_aggregator_default_func_collate,
304    ):  # yapf: disable
305        self.type_name_to_engine_executor_factory = {
306            engine_executor_factory.get_type_name(): engine_executor_factory
307            for engine_executor_factory in engine_executor_factories
308        }
309        self.func_collate = func_collate
310
311    def create(
312        self,
313        factory_init_configs: Union[
314            Sequence[Mapping[str, Any]],
315            PathType,
316        ],
317        init_resources: Optional[
318            Sequence[
319                Union[
320                    Mapping[str, Any],
321                    # TODO: find a better way to constrain resource type.
322                    Any,
323                ]
324            ]
325        ] = None,
326    ):  # yapf: disable
327        if is_path_type(factory_init_configs):
328            factory_init_configs = read_json_file(factory_init_configs)  # type: ignore
329        factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs)
330
331        pairs: List[
332            Tuple[
333                EngineExecutor[Any, Any, _T_RUN_CONFIG, _T_RUN_OUTPUT],
334                float,
335            ],
336        ] = []  # yapf: disable
337
338        for factory_init_config, init_resource in zip(
339            factory_init_configs, init_resources or itertools.repeat(None)
340        ):
341            # Reflects engine_executor_factory.
342            type_name = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.TYPE]
343            if type_name not in self.type_name_to_engine_executor_factory:
344                raise KeyError(f'type_name={type_name} not found')
345            engine_executor_factory = self.type_name_to_engine_executor_factory[type_name]
346
347            # Build init_resource.
348            init_resource_cls = engine_executor_factory.get_init_resource_cls()
349            if init_resource_cls is NoneTypeEngineInitResource:
350                assert init_resource is None
351            else:
352                assert init_resource
353                init_resource = dyn_structure(init_resource, init_resource_cls)
354
355            # Build engine_executor.
356            engine_executor = engine_executor_factory.create(
357                factory_init_config.get(EngineExecutorAggregatorFactoryConfigKey.CONFIG, {}),
358                init_resource,
359            )
360
361            # Get the weight.
362            if len(factory_init_configs) == 1:
363                weight = 1
364            else:
365                weight = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.WEIGHT]
366
367            pairs.append((engine_executor, weight))
368
369        return EngineExecutorAggregator(
370            EngineExecutorAggregatorSelector(pairs),
371            func_collate=self.func_collate,
372        )
373
374    def create_with_repeated_init_resource(
375        self,
376        factory_init_configs: Union[
377            Sequence[Mapping[str, Any]],
378            PathType,
379        ],
380        init_resource: Union[
381            Mapping[str, Any],
382            Any,
383        ]
384    ):  # yapf: disable
385        if is_path_type(factory_init_configs):
386            factory_init_configs = read_json_file(factory_init_configs)  # type: ignore
387        factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs)
388
389        return self.create(
390            factory_init_configs,
391            [init_resource] * len(factory_init_configs),
392        )

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.

This class can then be used as follows::

def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default

EngineExecutorAggregatorFactory( engine_executor_factories: Sequence[vkit.engine.interface.EngineExecutorFactory[Any, Any, ~_T_RUN_CONFIG, ~_T_RUN_OUTPUT]], func_collate: Callable[[vkit.engine.interface.EngineExecutorAggregatorSelector[~_T_RUN_CONFIG, ~_T_RUN_OUTPUT], ~_T_RUN_CONFIG, numpy.random._generator.Generator], ~_T_RUN_OUTPUT] = <function engine_executor_aggregator_default_func_collate>)
283    def __init__(
284        self,
285        engine_executor_factories: Sequence[
286            EngineExecutorFactory[
287                Any,
288                Any,
289                _T_RUN_CONFIG,
290                _T_RUN_OUTPUT,
291            ]
292        ],
293        func_collate: Callable[
294            [
295                EngineExecutorAggregatorSelector[
296                    _T_RUN_CONFIG,
297                    _T_RUN_OUTPUT,
298                ],
299                _T_RUN_CONFIG,
300                RandomGenerator,
301            ],
302            _T_RUN_OUTPUT,
303        ] = engine_executor_aggregator_default_func_collate,
304    ):  # yapf: disable
305        self.type_name_to_engine_executor_factory = {
306            engine_executor_factory.get_type_name(): engine_executor_factory
307            for engine_executor_factory in engine_executor_factories
308        }
309        self.func_collate = func_collate
def create( self, factory_init_configs: Union[Sequence[Mapping[str, Any]], str, os.PathLike], init_resources: Union[Sequence[Union[Mapping[str, Any], Any]], NoneType] = None):
311    def create(
312        self,
313        factory_init_configs: Union[
314            Sequence[Mapping[str, Any]],
315            PathType,
316        ],
317        init_resources: Optional[
318            Sequence[
319                Union[
320                    Mapping[str, Any],
321                    # TODO: find a better way to constrain resource type.
322                    Any,
323                ]
324            ]
325        ] = None,
326    ):  # yapf: disable
327        if is_path_type(factory_init_configs):
328            factory_init_configs = read_json_file(factory_init_configs)  # type: ignore
329        factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs)
330
331        pairs: List[
332            Tuple[
333                EngineExecutor[Any, Any, _T_RUN_CONFIG, _T_RUN_OUTPUT],
334                float,
335            ],
336        ] = []  # yapf: disable
337
338        for factory_init_config, init_resource in zip(
339            factory_init_configs, init_resources or itertools.repeat(None)
340        ):
341            # Reflects engine_executor_factory.
342            type_name = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.TYPE]
343            if type_name not in self.type_name_to_engine_executor_factory:
344                raise KeyError(f'type_name={type_name} not found')
345            engine_executor_factory = self.type_name_to_engine_executor_factory[type_name]
346
347            # Build init_resource.
348            init_resource_cls = engine_executor_factory.get_init_resource_cls()
349            if init_resource_cls is NoneTypeEngineInitResource:
350                assert init_resource is None
351            else:
352                assert init_resource
353                init_resource = dyn_structure(init_resource, init_resource_cls)
354
355            # Build engine_executor.
356            engine_executor = engine_executor_factory.create(
357                factory_init_config.get(EngineExecutorAggregatorFactoryConfigKey.CONFIG, {}),
358                init_resource,
359            )
360
361            # Get the weight.
362            if len(factory_init_configs) == 1:
363                weight = 1
364            else:
365                weight = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.WEIGHT]
366
367            pairs.append((engine_executor, weight))
368
369        return EngineExecutorAggregator(
370            EngineExecutorAggregatorSelector(pairs),
371            func_collate=self.func_collate,
372        )
def create_with_repeated_init_resource( self, factory_init_configs: Union[Sequence[Mapping[str, Any]], str, os.PathLike], init_resource: Union[Mapping[str, Any], Any]):
374    def create_with_repeated_init_resource(
375        self,
376        factory_init_configs: Union[
377            Sequence[Mapping[str, Any]],
378            PathType,
379        ],
380        init_resource: Union[
381            Mapping[str, Any],
382            Any,
383        ]
384    ):  # yapf: disable
385        if is_path_type(factory_init_configs):
386            factory_init_configs = read_json_file(factory_init_configs)  # type: ignore
387        factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs)
388
389        return self.create(
390            factory_init_configs,
391            [init_resource] * len(factory_init_configs),
392        )