vkit.engine.interface
1# Copyright 2022 vkit-x Administrator. All Rights Reserved. 2# 3# This project (vkit-x/vkit) is dual-licensed under commercial and SSPL licenses. 4# 5# The commercial license gives you the full rights to create and distribute software 6# on your own terms without any SSPL license obligations. For more information, 7# please see the "LICENSE_COMMERCIAL.txt" file. 8# 9# This project is also available under Server Side Public License (SSPL). 10# The SSPL licensing is ideal for use cases such as open source projects with 11# SSPL distribution, student/academic purposes, hobby projects, internal research 12# projects without external distribution, or other projects where all SSPL 13# obligations can be met. For more information, please see the "LICENSE_SSPL.txt" file. 14from typing import ( 15 cast, 16 Generic, 17 Type, 18 TypeVar, 19 Mapping, 20 Sequence, 21 Any, 22 Tuple, 23 List, 24 Optional, 25 Union, 26 Callable, 27) 28import itertools 29 30import attrs 31from numpy.random import Generator as RandomGenerator 32 33from vkit.utility import ( 34 normalize_to_keys_and_probs, 35 rng_choice, 36 is_path_type, 37 read_json_file, 38 dyn_structure, 39 get_generic_classes, 40 PathType, 41) 42 43_T_INIT_CONFIG = TypeVar('_T_INIT_CONFIG') 44_T_INIT_RESOURCE = TypeVar('_T_INIT_RESOURCE') 45_T_RUN_CONFIG = TypeVar('_T_RUN_CONFIG') 46_T_RUN_OUTPUT = TypeVar('_T_RUN_OUTPUT') 47 48 49@attrs.define 50class NoneTypeEngineInitConfig: 51 pass 52 53 54@attrs.define 55class NoneTypeEngineInitResource: 56 pass 57 58 59class Engine( 60 Generic[ 61 _T_INIT_CONFIG, 62 _T_INIT_RESOURCE, 63 _T_RUN_CONFIG, 64 _T_RUN_OUTPUT, 65 ] 66): # yapf: disable 67 68 @classmethod 69 def get_type_name(cls) -> str: 70 raise NotImplementedError() 71 72 def __init__( 73 self, 74 init_config: _T_INIT_CONFIG, 75 init_resource: Optional[_T_INIT_RESOURCE] = None, 76 ): 77 self.init_config = init_config 78 self.init_resource = init_resource 79 80 def run( 81 self, 82 run_config: _T_RUN_CONFIG, 83 rng: Optional[RandomGenerator] = None, 84 ) -> _T_RUN_OUTPUT: 85 raise NotImplementedError() 86 87 88class EngineExecutor( 89 Generic[ 90 _T_INIT_CONFIG, 91 _T_INIT_RESOURCE, 92 _T_RUN_CONFIG, 93 _T_RUN_OUTPUT, 94 ] 95): # yapf: disable 96 97 def __init__( 98 self, 99 engine: Engine[ 100 _T_INIT_CONFIG, 101 _T_INIT_RESOURCE, 102 _T_RUN_CONFIG, 103 _T_RUN_OUTPUT, 104 ], 105 ): # yapf: disable 106 self.engine = engine 107 108 def get_run_config_cls(self) -> Type[_T_RUN_CONFIG]: 109 return get_generic_classes(type(self.engine))[2] # type: ignore 110 111 def run( 112 self, 113 run_config: Union[ 114 Mapping[str, Any], 115 _T_RUN_CONFIG, 116 ], 117 rng: Optional[RandomGenerator] = None, 118 ) -> _T_RUN_OUTPUT: # yapf: disable 119 run_config = dyn_structure(run_config, self.get_run_config_cls()) 120 return self.engine.run(run_config, rng) 121 122 123class EngineExecutorFactory( 124 Generic[ 125 _T_INIT_CONFIG, 126 _T_INIT_RESOURCE, 127 _T_RUN_CONFIG, 128 _T_RUN_OUTPUT, 129 ] 130): # yapf: disable 131 132 def __init__( 133 self, 134 engine_cls: Type[ 135 Engine[ 136 _T_INIT_CONFIG, 137 _T_INIT_RESOURCE, 138 _T_RUN_CONFIG, 139 _T_RUN_OUTPUT, 140 ] 141 ], 142 ): # yapf: disable 143 self.engine_cls = engine_cls 144 145 def get_type_name(self): 146 return self.engine_cls.get_type_name() 147 148 def get_init_config_cls(self) -> Type[_T_INIT_CONFIG]: 149 return get_generic_classes(self.engine_cls)[0] # type: ignore 150 151 def get_init_resource_cls(self) -> Type[_T_INIT_RESOURCE]: 152 return get_generic_classes(self.engine_cls)[1] # type: ignore 153 154 def create( 155 self, 156 init_config: Optional[ 157 Union[ 158 Mapping[str, Any], 159 PathType, 160 _T_INIT_CONFIG, 161 ] 162 ] = None, 163 init_resource: Optional[ 164 Union[ 165 Mapping[str, Any], 166 _T_INIT_RESOURCE, 167 ] 168 ] = None, 169 ): # yapf: disable 170 init_config = dyn_structure( 171 init_config, 172 self.get_init_config_cls(), 173 support_path_type=True, 174 support_none_type=True, 175 ) 176 177 init_resource_cls = self.get_init_resource_cls() 178 if init_resource_cls is NoneTypeEngineInitResource: 179 assert init_resource is None 180 else: 181 assert init_resource 182 if init_resource is not None: 183 init_resource = dyn_structure(init_resource, init_resource_cls) 184 185 return EngineExecutor(self.engine_cls(init_config, init_resource)) 186 187 188class EngineExecutorAggregatorSelector( 189 Generic[ 190 _T_RUN_CONFIG, 191 _T_RUN_OUTPUT, 192 ] 193): # yapf: disable 194 195 def __init__( 196 self, 197 pairs: Sequence[ 198 Tuple[ 199 EngineExecutor[ 200 Any, 201 Any, 202 _T_RUN_CONFIG, 203 _T_RUN_OUTPUT, 204 ], 205 float, 206 ] 207 ], 208 ): # yapf: disable 209 self.engine_executors, self.probs = normalize_to_keys_and_probs(pairs) 210 211 def get_run_config_cls(self): 212 return self.engine_executors[0].get_run_config_cls() 213 214 def select_engine_executor(self, rng: RandomGenerator): 215 return rng_choice(rng, self.engine_executors, probs=self.probs) 216 217 218def engine_executor_aggregator_default_func_collate( 219 selector: EngineExecutorAggregatorSelector[ 220 _T_RUN_CONFIG, 221 _T_RUN_OUTPUT, 222 ], 223 run_config: _T_RUN_CONFIG, 224 rng: RandomGenerator, 225) -> _T_RUN_OUTPUT: # yapf: disable 226 engine_executor = selector.select_engine_executor(rng) 227 return engine_executor.run(run_config, rng) 228 229 230class EngineExecutorAggregator( 231 Generic[ 232 _T_RUN_CONFIG, 233 _T_RUN_OUTPUT, 234 ] 235): # yapf: disable 236 237 def get_run_config_cls(self): 238 return self.selector.get_run_config_cls() 239 240 def __init__( 241 self, 242 selector: EngineExecutorAggregatorSelector[ 243 _T_RUN_CONFIG, 244 _T_RUN_OUTPUT, 245 ], 246 func_collate: Callable[ 247 [ 248 EngineExecutorAggregatorSelector[ 249 _T_RUN_CONFIG, 250 _T_RUN_OUTPUT, 251 ], 252 _T_RUN_CONFIG, 253 RandomGenerator, 254 ], 255 _T_RUN_OUTPUT, 256 ] = engine_executor_aggregator_default_func_collate, 257 ): # yapf: disable 258 self.selector = selector 259 self.func_collate = func_collate 260 261 def run( 262 self, 263 run_config: Union[ 264 Mapping[str, Any], 265 _T_RUN_CONFIG, 266 ], 267 rng: RandomGenerator, 268 ) -> _T_RUN_OUTPUT: # yapf: disable 269 run_config = dyn_structure(run_config, self.get_run_config_cls()) 270 return self.func_collate(self.selector, run_config, rng) 271 272 273class EngineExecutorAggregatorFactoryConfigKey: 274 TYPE = 'type' 275 WEIGHT = 'weight' 276 CONFIG = 'config' 277 278 279class EngineExecutorAggregatorFactory( 280 Generic[ 281 _T_RUN_CONFIG, 282 _T_RUN_OUTPUT, 283 ] 284): # yapf: disable 285 286 def __init__( 287 self, 288 engine_executor_factories: Sequence[ 289 EngineExecutorFactory[ 290 Any, 291 Any, 292 _T_RUN_CONFIG, 293 _T_RUN_OUTPUT, 294 ] 295 ], 296 func_collate: Callable[ 297 [ 298 EngineExecutorAggregatorSelector[ 299 _T_RUN_CONFIG, 300 _T_RUN_OUTPUT, 301 ], 302 _T_RUN_CONFIG, 303 RandomGenerator, 304 ], 305 _T_RUN_OUTPUT, 306 ] = engine_executor_aggregator_default_func_collate, 307 ): # yapf: disable 308 self.type_name_to_engine_executor_factory = { 309 engine_executor_factory.get_type_name(): engine_executor_factory 310 for engine_executor_factory in engine_executor_factories 311 } 312 self.func_collate = func_collate 313 314 def create( 315 self, 316 factory_init_configs: Union[ 317 Sequence[Mapping[str, Any]], 318 PathType, 319 ], 320 init_resources: Optional[ 321 Sequence[ 322 Union[ 323 Mapping[str, Any], 324 # TODO: find a better way to constrain resource type. 325 Any, 326 ] 327 ] 328 ] = None, 329 ): # yapf: disable 330 if is_path_type(factory_init_configs): 331 factory_init_configs = read_json_file(factory_init_configs) # type: ignore 332 factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs) 333 334 pairs: List[ 335 Tuple[ 336 EngineExecutor[Any, Any, _T_RUN_CONFIG, _T_RUN_OUTPUT], 337 float, 338 ], 339 ] = [] # yapf: disable 340 341 for factory_init_config, init_resource in zip( 342 factory_init_configs, init_resources or itertools.repeat(None) 343 ): 344 # Reflects engine_executor_factory. 345 type_name = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.TYPE] 346 if type_name not in self.type_name_to_engine_executor_factory: 347 raise KeyError(f'type_name={type_name} not found') 348 engine_executor_factory = self.type_name_to_engine_executor_factory[type_name] 349 350 # Build init_resource. 351 init_resource_cls = engine_executor_factory.get_init_resource_cls() 352 if init_resource_cls is NoneTypeEngineInitResource: 353 assert init_resource is None 354 else: 355 assert init_resource 356 init_resource = dyn_structure(init_resource, init_resource_cls) 357 358 # Build engine_executor. 359 engine_executor = engine_executor_factory.create( 360 factory_init_config.get(EngineExecutorAggregatorFactoryConfigKey.CONFIG, {}), 361 init_resource, 362 ) 363 364 # Get the weight. 365 if len(factory_init_configs) == 1: 366 weight = 1 367 else: 368 weight = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.WEIGHT] 369 370 pairs.append((engine_executor, weight)) 371 372 return EngineExecutorAggregator( 373 EngineExecutorAggregatorSelector(pairs), 374 func_collate=self.func_collate, 375 ) 376 377 def create_with_repeated_init_resource( 378 self, 379 factory_init_configs: Union[ 380 Sequence[Mapping[str, Any]], 381 PathType, 382 ], 383 init_resource: Union[ 384 Mapping[str, Any], 385 Any, 386 ] 387 ): # yapf: disable 388 if is_path_type(factory_init_configs): 389 factory_init_configs = read_json_file(factory_init_configs) # type: ignore 390 factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs) 391 392 return self.create( 393 factory_init_configs, 394 [init_resource] * len(factory_init_configs), 395 ) 396 397 def create_engine_executor( 398 self, 399 factory_init_config: Mapping[str, Any], 400 init_resource: Optional[ 401 Union[ 402 Mapping[str, Any], 403 Any, 404 ] 405 ] = None, 406 ): # yapf: disable 407 executor_aggregator = self.create( 408 factory_init_configs=[factory_init_config], 409 init_resources=[init_resource] if init_resource else None, 410 ) 411 return executor_aggregator.selector.engine_executors[0] 412 413 414# TODO: move to doc. 415# A minimum template. 416''' 417from typing import Optional 418 419import attrs 420from numpy.random import Generator as RandomGenerator 421 422from ..interface import ( 423 Engine, 424 EngineExecutorFactory, 425 NoneTypeEngineInitResource, 426) 427 428 429@attrs.define 430class FooEngineInitConfig: 431 pass 432 433 434@attrs.define 435class FooEngineRunConfig: 436 pass 437 438 439class FooEngine( 440 Engine[ 441 FooEngineInitConfig, 442 NoneTypeEngineInitResource, 443 FooEngineRunConfig, 444 int, 445 ] 446): # yapf: disable 447 448 @classmethod 449 def get_type_name(cls) -> str: 450 return 'foo' 451 452 def __init__( 453 self, 454 init_config: FooEngineInitConfig, 455 init_resource: Optional[NoneTypeEngineInitResource] = None, 456 ): 457 super().__init__(init_config, init_resource) 458 459 def run(self, run_config: FooEngineRunConfig, rng: RandomGenerator) -> int: 460 return 42 461 462 463foo_engine_executor_factory = EngineExecutorFactory(FooEngine) 464'''
60class Engine( 61 Generic[ 62 _T_INIT_CONFIG, 63 _T_INIT_RESOURCE, 64 _T_RUN_CONFIG, 65 _T_RUN_OUTPUT, 66 ] 67): # yapf: disable 68 69 @classmethod 70 def get_type_name(cls) -> str: 71 raise NotImplementedError() 72 73 def __init__( 74 self, 75 init_config: _T_INIT_CONFIG, 76 init_resource: Optional[_T_INIT_RESOURCE] = None, 77 ): 78 self.init_config = init_config 79 self.init_resource = init_resource 80 81 def run( 82 self, 83 run_config: _T_RUN_CONFIG, 84 rng: Optional[RandomGenerator] = None, 85 ) -> _T_RUN_OUTPUT: 86 raise NotImplementedError()
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
89class EngineExecutor( 90 Generic[ 91 _T_INIT_CONFIG, 92 _T_INIT_RESOURCE, 93 _T_RUN_CONFIG, 94 _T_RUN_OUTPUT, 95 ] 96): # yapf: disable 97 98 def __init__( 99 self, 100 engine: Engine[ 101 _T_INIT_CONFIG, 102 _T_INIT_RESOURCE, 103 _T_RUN_CONFIG, 104 _T_RUN_OUTPUT, 105 ], 106 ): # yapf: disable 107 self.engine = engine 108 109 def get_run_config_cls(self) -> Type[_T_RUN_CONFIG]: 110 return get_generic_classes(type(self.engine))[2] # type: ignore 111 112 def run( 113 self, 114 run_config: Union[ 115 Mapping[str, Any], 116 _T_RUN_CONFIG, 117 ], 118 rng: Optional[RandomGenerator] = None, 119 ) -> _T_RUN_OUTPUT: # yapf: disable 120 run_config = dyn_structure(run_config, self.get_run_config_cls()) 121 return self.engine.run(run_config, rng)
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
124class EngineExecutorFactory( 125 Generic[ 126 _T_INIT_CONFIG, 127 _T_INIT_RESOURCE, 128 _T_RUN_CONFIG, 129 _T_RUN_OUTPUT, 130 ] 131): # yapf: disable 132 133 def __init__( 134 self, 135 engine_cls: Type[ 136 Engine[ 137 _T_INIT_CONFIG, 138 _T_INIT_RESOURCE, 139 _T_RUN_CONFIG, 140 _T_RUN_OUTPUT, 141 ] 142 ], 143 ): # yapf: disable 144 self.engine_cls = engine_cls 145 146 def get_type_name(self): 147 return self.engine_cls.get_type_name() 148 149 def get_init_config_cls(self) -> Type[_T_INIT_CONFIG]: 150 return get_generic_classes(self.engine_cls)[0] # type: ignore 151 152 def get_init_resource_cls(self) -> Type[_T_INIT_RESOURCE]: 153 return get_generic_classes(self.engine_cls)[1] # type: ignore 154 155 def create( 156 self, 157 init_config: Optional[ 158 Union[ 159 Mapping[str, Any], 160 PathType, 161 _T_INIT_CONFIG, 162 ] 163 ] = None, 164 init_resource: Optional[ 165 Union[ 166 Mapping[str, Any], 167 _T_INIT_RESOURCE, 168 ] 169 ] = None, 170 ): # yapf: disable 171 init_config = dyn_structure( 172 init_config, 173 self.get_init_config_cls(), 174 support_path_type=True, 175 support_none_type=True, 176 ) 177 178 init_resource_cls = self.get_init_resource_cls() 179 if init_resource_cls is NoneTypeEngineInitResource: 180 assert init_resource is None 181 else: 182 assert init_resource 183 if init_resource is not None: 184 init_resource = dyn_structure(init_resource, init_resource_cls) 185 186 return EngineExecutor(self.engine_cls(init_config, init_resource))
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
155 def create( 156 self, 157 init_config: Optional[ 158 Union[ 159 Mapping[str, Any], 160 PathType, 161 _T_INIT_CONFIG, 162 ] 163 ] = None, 164 init_resource: Optional[ 165 Union[ 166 Mapping[str, Any], 167 _T_INIT_RESOURCE, 168 ] 169 ] = None, 170 ): # yapf: disable 171 init_config = dyn_structure( 172 init_config, 173 self.get_init_config_cls(), 174 support_path_type=True, 175 support_none_type=True, 176 ) 177 178 init_resource_cls = self.get_init_resource_cls() 179 if init_resource_cls is NoneTypeEngineInitResource: 180 assert init_resource is None 181 else: 182 assert init_resource 183 if init_resource is not None: 184 init_resource = dyn_structure(init_resource, init_resource_cls) 185 186 return EngineExecutor(self.engine_cls(init_config, init_resource))
189class EngineExecutorAggregatorSelector( 190 Generic[ 191 _T_RUN_CONFIG, 192 _T_RUN_OUTPUT, 193 ] 194): # yapf: disable 195 196 def __init__( 197 self, 198 pairs: Sequence[ 199 Tuple[ 200 EngineExecutor[ 201 Any, 202 Any, 203 _T_RUN_CONFIG, 204 _T_RUN_OUTPUT, 205 ], 206 float, 207 ] 208 ], 209 ): # yapf: disable 210 self.engine_executors, self.probs = normalize_to_keys_and_probs(pairs) 211 212 def get_run_config_cls(self): 213 return self.engine_executors[0].get_run_config_cls() 214 215 def select_engine_executor(self, rng: RandomGenerator): 216 return rng_choice(rng, self.engine_executors, probs=self.probs)
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
219def engine_executor_aggregator_default_func_collate( 220 selector: EngineExecutorAggregatorSelector[ 221 _T_RUN_CONFIG, 222 _T_RUN_OUTPUT, 223 ], 224 run_config: _T_RUN_CONFIG, 225 rng: RandomGenerator, 226) -> _T_RUN_OUTPUT: # yapf: disable 227 engine_executor = selector.select_engine_executor(rng) 228 return engine_executor.run(run_config, rng)
231class EngineExecutorAggregator( 232 Generic[ 233 _T_RUN_CONFIG, 234 _T_RUN_OUTPUT, 235 ] 236): # yapf: disable 237 238 def get_run_config_cls(self): 239 return self.selector.get_run_config_cls() 240 241 def __init__( 242 self, 243 selector: EngineExecutorAggregatorSelector[ 244 _T_RUN_CONFIG, 245 _T_RUN_OUTPUT, 246 ], 247 func_collate: Callable[ 248 [ 249 EngineExecutorAggregatorSelector[ 250 _T_RUN_CONFIG, 251 _T_RUN_OUTPUT, 252 ], 253 _T_RUN_CONFIG, 254 RandomGenerator, 255 ], 256 _T_RUN_OUTPUT, 257 ] = engine_executor_aggregator_default_func_collate, 258 ): # yapf: disable 259 self.selector = selector 260 self.func_collate = func_collate 261 262 def run( 263 self, 264 run_config: Union[ 265 Mapping[str, Any], 266 _T_RUN_CONFIG, 267 ], 268 rng: RandomGenerator, 269 ) -> _T_RUN_OUTPUT: # yapf: disable 270 run_config = dyn_structure(run_config, self.get_run_config_cls()) 271 return self.func_collate(self.selector, run_config, rng)
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
241 def __init__( 242 self, 243 selector: EngineExecutorAggregatorSelector[ 244 _T_RUN_CONFIG, 245 _T_RUN_OUTPUT, 246 ], 247 func_collate: Callable[ 248 [ 249 EngineExecutorAggregatorSelector[ 250 _T_RUN_CONFIG, 251 _T_RUN_OUTPUT, 252 ], 253 _T_RUN_CONFIG, 254 RandomGenerator, 255 ], 256 _T_RUN_OUTPUT, 257 ] = engine_executor_aggregator_default_func_collate, 258 ): # yapf: disable 259 self.selector = selector 260 self.func_collate = func_collate
280class EngineExecutorAggregatorFactory( 281 Generic[ 282 _T_RUN_CONFIG, 283 _T_RUN_OUTPUT, 284 ] 285): # yapf: disable 286 287 def __init__( 288 self, 289 engine_executor_factories: Sequence[ 290 EngineExecutorFactory[ 291 Any, 292 Any, 293 _T_RUN_CONFIG, 294 _T_RUN_OUTPUT, 295 ] 296 ], 297 func_collate: Callable[ 298 [ 299 EngineExecutorAggregatorSelector[ 300 _T_RUN_CONFIG, 301 _T_RUN_OUTPUT, 302 ], 303 _T_RUN_CONFIG, 304 RandomGenerator, 305 ], 306 _T_RUN_OUTPUT, 307 ] = engine_executor_aggregator_default_func_collate, 308 ): # yapf: disable 309 self.type_name_to_engine_executor_factory = { 310 engine_executor_factory.get_type_name(): engine_executor_factory 311 for engine_executor_factory in engine_executor_factories 312 } 313 self.func_collate = func_collate 314 315 def create( 316 self, 317 factory_init_configs: Union[ 318 Sequence[Mapping[str, Any]], 319 PathType, 320 ], 321 init_resources: Optional[ 322 Sequence[ 323 Union[ 324 Mapping[str, Any], 325 # TODO: find a better way to constrain resource type. 326 Any, 327 ] 328 ] 329 ] = None, 330 ): # yapf: disable 331 if is_path_type(factory_init_configs): 332 factory_init_configs = read_json_file(factory_init_configs) # type: ignore 333 factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs) 334 335 pairs: List[ 336 Tuple[ 337 EngineExecutor[Any, Any, _T_RUN_CONFIG, _T_RUN_OUTPUT], 338 float, 339 ], 340 ] = [] # yapf: disable 341 342 for factory_init_config, init_resource in zip( 343 factory_init_configs, init_resources or itertools.repeat(None) 344 ): 345 # Reflects engine_executor_factory. 346 type_name = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.TYPE] 347 if type_name not in self.type_name_to_engine_executor_factory: 348 raise KeyError(f'type_name={type_name} not found') 349 engine_executor_factory = self.type_name_to_engine_executor_factory[type_name] 350 351 # Build init_resource. 352 init_resource_cls = engine_executor_factory.get_init_resource_cls() 353 if init_resource_cls is NoneTypeEngineInitResource: 354 assert init_resource is None 355 else: 356 assert init_resource 357 init_resource = dyn_structure(init_resource, init_resource_cls) 358 359 # Build engine_executor. 360 engine_executor = engine_executor_factory.create( 361 factory_init_config.get(EngineExecutorAggregatorFactoryConfigKey.CONFIG, {}), 362 init_resource, 363 ) 364 365 # Get the weight. 366 if len(factory_init_configs) == 1: 367 weight = 1 368 else: 369 weight = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.WEIGHT] 370 371 pairs.append((engine_executor, weight)) 372 373 return EngineExecutorAggregator( 374 EngineExecutorAggregatorSelector(pairs), 375 func_collate=self.func_collate, 376 ) 377 378 def create_with_repeated_init_resource( 379 self, 380 factory_init_configs: Union[ 381 Sequence[Mapping[str, Any]], 382 PathType, 383 ], 384 init_resource: Union[ 385 Mapping[str, Any], 386 Any, 387 ] 388 ): # yapf: disable 389 if is_path_type(factory_init_configs): 390 factory_init_configs = read_json_file(factory_init_configs) # type: ignore 391 factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs) 392 393 return self.create( 394 factory_init_configs, 395 [init_resource] * len(factory_init_configs), 396 ) 397 398 def create_engine_executor( 399 self, 400 factory_init_config: Mapping[str, Any], 401 init_resource: Optional[ 402 Union[ 403 Mapping[str, Any], 404 Any, 405 ] 406 ] = None, 407 ): # yapf: disable 408 executor_aggregator = self.create( 409 factory_init_configs=[factory_init_config], 410 init_resources=[init_resource] if init_resource else None, 411 ) 412 return executor_aggregator.selector.engine_executors[0]
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
287 def __init__( 288 self, 289 engine_executor_factories: Sequence[ 290 EngineExecutorFactory[ 291 Any, 292 Any, 293 _T_RUN_CONFIG, 294 _T_RUN_OUTPUT, 295 ] 296 ], 297 func_collate: Callable[ 298 [ 299 EngineExecutorAggregatorSelector[ 300 _T_RUN_CONFIG, 301 _T_RUN_OUTPUT, 302 ], 303 _T_RUN_CONFIG, 304 RandomGenerator, 305 ], 306 _T_RUN_OUTPUT, 307 ] = engine_executor_aggregator_default_func_collate, 308 ): # yapf: disable 309 self.type_name_to_engine_executor_factory = { 310 engine_executor_factory.get_type_name(): engine_executor_factory 311 for engine_executor_factory in engine_executor_factories 312 } 313 self.func_collate = func_collate
315 def create( 316 self, 317 factory_init_configs: Union[ 318 Sequence[Mapping[str, Any]], 319 PathType, 320 ], 321 init_resources: Optional[ 322 Sequence[ 323 Union[ 324 Mapping[str, Any], 325 # TODO: find a better way to constrain resource type. 326 Any, 327 ] 328 ] 329 ] = None, 330 ): # yapf: disable 331 if is_path_type(factory_init_configs): 332 factory_init_configs = read_json_file(factory_init_configs) # type: ignore 333 factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs) 334 335 pairs: List[ 336 Tuple[ 337 EngineExecutor[Any, Any, _T_RUN_CONFIG, _T_RUN_OUTPUT], 338 float, 339 ], 340 ] = [] # yapf: disable 341 342 for factory_init_config, init_resource in zip( 343 factory_init_configs, init_resources or itertools.repeat(None) 344 ): 345 # Reflects engine_executor_factory. 346 type_name = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.TYPE] 347 if type_name not in self.type_name_to_engine_executor_factory: 348 raise KeyError(f'type_name={type_name} not found') 349 engine_executor_factory = self.type_name_to_engine_executor_factory[type_name] 350 351 # Build init_resource. 352 init_resource_cls = engine_executor_factory.get_init_resource_cls() 353 if init_resource_cls is NoneTypeEngineInitResource: 354 assert init_resource is None 355 else: 356 assert init_resource 357 init_resource = dyn_structure(init_resource, init_resource_cls) 358 359 # Build engine_executor. 360 engine_executor = engine_executor_factory.create( 361 factory_init_config.get(EngineExecutorAggregatorFactoryConfigKey.CONFIG, {}), 362 init_resource, 363 ) 364 365 # Get the weight. 366 if len(factory_init_configs) == 1: 367 weight = 1 368 else: 369 weight = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.WEIGHT] 370 371 pairs.append((engine_executor, weight)) 372 373 return EngineExecutorAggregator( 374 EngineExecutorAggregatorSelector(pairs), 375 func_collate=self.func_collate, 376 )
378 def create_with_repeated_init_resource( 379 self, 380 factory_init_configs: Union[ 381 Sequence[Mapping[str, Any]], 382 PathType, 383 ], 384 init_resource: Union[ 385 Mapping[str, Any], 386 Any, 387 ] 388 ): # yapf: disable 389 if is_path_type(factory_init_configs): 390 factory_init_configs = read_json_file(factory_init_configs) # type: ignore 391 factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs) 392 393 return self.create( 394 factory_init_configs, 395 [init_resource] * len(factory_init_configs), 396 )
398 def create_engine_executor( 399 self, 400 factory_init_config: Mapping[str, Any], 401 init_resource: Optional[ 402 Union[ 403 Mapping[str, Any], 404 Any, 405 ] 406 ] = None, 407 ): # yapf: disable 408 executor_aggregator = self.create( 409 factory_init_configs=[factory_init_config], 410 init_resources=[init_resource] if init_resource else None, 411 ) 412 return executor_aggregator.selector.engine_executors[0]