vkit.engine.interface
# Copyright 2022 vkit-x Administrator. All Rights Reserved.
#
# This project (vkit-x/vkit) is dual-licensed under commercial and SSPL licenses.
#
# The commercial license gives you the full rights to create and distribute software
# on your own terms without any SSPL license obligations. For more information,
# please see the "LICENSE_COMMERCIAL.txt" file.
#
# This project is also available under Server Side Public License (SSPL).
# The SSPL licensing is ideal for use cases such as open source projects with
# SSPL distribution, student/academic purposes, hobby projects, internal research
# projects without external distribution, or other projects where all SSPL
# obligations can be met. For more information, please see the "LICENSE_SSPL.txt" file.
from typing import (
    cast,
    Generic,
    Type,
    TypeVar,
    Mapping,
    Sequence,
    Any,
    Tuple,
    List,
    Optional,
    Union,
    Callable,
)
import itertools

import attrs
from numpy.random import Generator as RandomGenerator

from vkit.utility import (
    normalize_to_keys_and_probs,
    rng_choice,
    is_path_type,
    read_json_file,
    dyn_structure,
    get_generic_classes,
    PathType,
)

# Type parameters shared by the generic classes below, in the order used by `Engine`.
_T_INIT_CONFIG = TypeVar('_T_INIT_CONFIG')
_T_INIT_RESOURCE = TypeVar('_T_INIT_RESOURCE')
_T_RUN_CONFIG = TypeVar('_T_RUN_CONFIG')
_T_RUN_OUTPUT = TypeVar('_T_RUN_OUTPUT')


@attrs.define
class NoneTypeEngineInitConfig:
    '''Placeholder init config type without any field.'''
    pass


@attrs.define
class NoneTypeEngineInitResource:
    '''Placeholder init resource type.

    The factories in this module treat an engine parameterized with this type as
    an engine that must not be given any init resource.
    '''
    pass


class Engine(
    Generic[
        _T_INIT_CONFIG,
        _T_INIT_RESOURCE,
        _T_RUN_CONFIG,
        _T_RUN_OUTPUT,
    ]
):  # yapf: disable
    '''Base class of an engine.

    The type parameters are, in order: the init config type, the init resource
    type, the run config type and the run output type. `get_type_name` and `run`
    raise `NotImplementedError` here and are meant to be overridden.
    '''

    @classmethod
    def get_type_name(cls) -> str:
        '''Return the name identifying this engine type. Must be overridden.'''
        raise NotImplementedError()

    def __init__(
        self,
        init_config: _T_INIT_CONFIG,
        init_resource: Optional[_T_INIT_RESOURCE] = None,
    ):
        '''Store the init config and the optional init resource on the instance.'''
        self.init_config = init_config
        self.init_resource = init_resource

    def run(self, run_config: _T_RUN_CONFIG, rng: RandomGenerator) -> _T_RUN_OUTPUT:
        '''Run the engine with `run_config` and `rng`. Must be overridden.'''
        raise NotImplementedError()


class EngineExecutor(
    Generic[
        _T_INIT_CONFIG,
        _T_INIT_RESOURCE,
        _T_RUN_CONFIG,
        _T_RUN_OUTPUT,
    ]
):  # yapf: disable
    '''Wrap an engine instance and structure the run config before each run.'''

    def __init__(
        self,
        engine: Engine[
            _T_INIT_CONFIG,
            _T_INIT_RESOURCE,
            _T_RUN_CONFIG,
            _T_RUN_OUTPUT,
        ],
    ):  # yapf: disable
        '''Keep a reference to the engine that `run` delegates to.'''
        self.engine = engine

    def get_run_config_cls(self) -> Type[_T_RUN_CONFIG]:
        '''Return the run config class of the wrapped engine.

        This is the item at index 2 of what `get_generic_classes` reports for the
        engine's class, matching the position of the run config type parameter
        declared on `Engine`.
        '''
        return get_generic_classes(type(self.engine))[2]  # type: ignore

    def run(
        self,
        run_config: Union[
            Mapping[str, Any],
            _T_RUN_CONFIG,
        ],
        rng: RandomGenerator,
    ) -> _T_RUN_OUTPUT:  # yapf: disable
        '''Pass `run_config` through `dyn_structure`, then run the engine with it.'''
        run_config = dyn_structure(run_config, self.get_run_config_cls())
        return self.engine.run(run_config, rng)


class EngineExecutorFactory(
    Generic[
        _T_INIT_CONFIG,
        _T_INIT_RESOURCE,
        _T_RUN_CONFIG,
        _T_RUN_OUTPUT,
    ]
):  # yapf: disable
    '''Create `EngineExecutor` instances for a single engine class.'''

    def __init__(
        self,
        engine_cls: Type[
            Engine[
                _T_INIT_CONFIG,
                _T_INIT_RESOURCE,
                _T_RUN_CONFIG,
                _T_RUN_OUTPUT,
            ]
        ],
    ):  # yapf: disable
        '''Remember the engine class to instantiate in `create`.'''
        self.engine_cls = engine_cls

    def get_type_name(self):
        '''Return the type name reported by the engine class.'''
        return self.engine_cls.get_type_name()

    def get_init_config_cls(self) -> Type[_T_INIT_CONFIG]:
        '''Return the init config class (index 0 of the engine's generic classes).'''
        return get_generic_classes(self.engine_cls)[0]  # type: ignore

    def get_init_resource_cls(self) -> Type[_T_INIT_RESOURCE]:
        '''Return the init resource class (index 1 of the engine's generic classes).'''
        return get_generic_classes(self.engine_cls)[1]  # type: ignore

    def create(
        self,
        init_config: Optional[
            Union[
                Mapping[str, Any],
                PathType,
                _T_INIT_CONFIG,
            ]
        ] = None,
        init_resource: Optional[
            Union[
                Mapping[str, Any],
                _T_INIT_RESOURCE,
            ]
        ] = None,
    ):  # yapf: disable
        '''Build an `EngineExecutor` around a new engine instance.

        Args:
            init_config: Passed to `dyn_structure` together with the init config
                class, with path and `None` inputs enabled.
            init_resource: Must be `None` if the engine's init resource class is
                `NoneTypeEngineInitResource`; required otherwise, in which case it
                is passed through `dyn_structure` as well.

        Returns:
            An `EngineExecutor` wrapping the new engine.

        Raises:
            ValueError: If `init_resource` is given to an engine that takes none,
                or is missing for an engine that requires one.
        '''
        init_config = dyn_structure(
            init_config,
            self.get_init_config_cls(),
            support_path_type=True,
            support_none_type=True,
        )

        # Explicit checks instead of `assert`, which is stripped under `python -O`.
        init_resource_cls = self.get_init_resource_cls()
        if init_resource_cls is NoneTypeEngineInitResource:
            if init_resource is not None:
                raise ValueError(
                    f'{self.engine_cls.__name__} does not take init_resource, '
                    f'got {type(init_resource).__name__}'
                )
        else:
            # Compare against None so that a falsy but valid resource is accepted.
            if init_resource is None:
                raise ValueError(f'{self.engine_cls.__name__} requires init_resource')
            init_resource = dyn_structure(init_resource, init_resource_cls)

        return EngineExecutor(self.engine_cls(init_config, init_resource))


class EngineExecutorAggregatorSelector(
    Generic[
        _T_RUN_CONFIG,
        _T_RUN_OUTPUT,
    ]
):  # yapf: disable
    '''Hold engine executors with their probabilities and pick one at random.'''

    def __init__(
        self,
        pairs: Sequence[
            Tuple[
                EngineExecutor[
                    Any,
                    Any,
                    _T_RUN_CONFIG,
                    _T_RUN_OUTPUT,
                ],
                float,
            ]
        ],
    ):  # yapf: disable
        '''Split the (engine executor, weight) pairs via `normalize_to_keys_and_probs`.'''
        self.engine_executors, self.probs = normalize_to_keys_and_probs(pairs)

    def get_run_config_cls(self):
        '''Return the run config class reported by the first engine executor.'''
        return self.engine_executors[0].get_run_config_cls()

    def select_engine_executor(self, rng: RandomGenerator):
        '''Pick one engine executor with `rng_choice`, using the stored probabilities.'''
        return rng_choice(rng, self.engine_executors, probs=self.probs)


def engine_executor_aggregator_default_func_collate(
    selector: EngineExecutorAggregatorSelector[
        _T_RUN_CONFIG,
        _T_RUN_OUTPUT,
    ],
    run_config: _T_RUN_CONFIG,
    rng: RandomGenerator,
) -> _T_RUN_OUTPUT:  # yapf: disable
    '''Default collate function: select one engine executor and return its output.'''
    engine_executor = selector.select_engine_executor(rng)
    return engine_executor.run(run_config, rng)


class EngineExecutorAggregator(
    Generic[
        _T_RUN_CONFIG,
        _T_RUN_OUTPUT,
    ]
):  # yapf: disable
    '''Run a collate function over a selector of engine executors.'''

    def get_run_config_cls(self):
        '''Return the run config class reported by the selector.'''
        return self.selector.get_run_config_cls()

    def __init__(
        self,
        selector: EngineExecutorAggregatorSelector[
            _T_RUN_CONFIG,
            _T_RUN_OUTPUT,
        ],
        func_collate: Callable[
            [
                EngineExecutorAggregatorSelector[
                    _T_RUN_CONFIG,
                    _T_RUN_OUTPUT,
                ],
                _T_RUN_CONFIG,
                RandomGenerator,
            ],
            _T_RUN_OUTPUT,
        ] = engine_executor_aggregator_default_func_collate,
    ):  # yapf: disable
        '''Store the selector and the collate function invoked by `run`.'''
        self.selector = selector
        self.func_collate = func_collate

    def run(
        self,
        run_config: Union[
            Mapping[str, Any],
            _T_RUN_CONFIG,
        ],
        rng: RandomGenerator,
    ) -> _T_RUN_OUTPUT:  # yapf: disable
        '''Structure `run_config`, then call the collate function with the selector.'''
        run_config = dyn_structure(run_config, self.get_run_config_cls())
        return self.func_collate(self.selector, run_config, rng)


class EngineExecutorAggregatorFactoryConfigKey:
    '''Keys read from each item of `factory_init_configs`.'''
    TYPE = 'type'
    WEIGHT = 'weight'
    CONFIG = 'config'


class EngineExecutorAggregatorFactory(
    Generic[
        _T_RUN_CONFIG,
        _T_RUN_OUTPUT,
    ]
):  # yapf: disable
    '''Create `EngineExecutorAggregator` instances from per-engine configs.'''

    def __init__(
        self,
        engine_executor_factories: Sequence[
            EngineExecutorFactory[
                Any,
                Any,
                _T_RUN_CONFIG,
                _T_RUN_OUTPUT,
            ]
        ],
        func_collate: Callable[
            [
                EngineExecutorAggregatorSelector[
                    _T_RUN_CONFIG,
                    _T_RUN_OUTPUT,
                ],
                _T_RUN_CONFIG,
                RandomGenerator,
            ],
            _T_RUN_OUTPUT,
        ] = engine_executor_aggregator_default_func_collate,
    ):  # yapf: disable
        '''Index the factories by type name and store the collate function.

        If several factories report the same type name, the last one wins.
        '''
        self.type_name_to_engine_executor_factory = {
            engine_executor_factory.get_type_name(): engine_executor_factory
            for engine_executor_factory in engine_executor_factories
        }
        self.func_collate = func_collate

    def create(
        self,
        factory_init_configs: Union[
            Sequence[Mapping[str, Any]],
            PathType,
        ],
        init_resources: Optional[
            Sequence[
                Union[
                    Mapping[str, Any],
                    # TODO: find a better way to constrain resource type.
                    Any,
                ]
            ]
        ] = None,
    ):  # yapf: disable
        '''Build an aggregator with one engine executor per config item.

        Args:
            factory_init_configs: A sequence of mappings, or a path that is loaded
                with `read_json_file`. Each mapping must have a 'type' key, may
                have a 'config' key (defaults to `{}`), and must have a 'weight'
                key unless it is the only item.
            init_resources: One init resource per config item, in the same order.
                `None` or an empty sequence means no engine is given a resource.

        Returns:
            An `EngineExecutorAggregator` using this factory's collate function.

        Raises:
            KeyError: If a type name is not registered, or a required key is missing.
            ValueError: If `init_resources` and `factory_init_configs` differ in
                length, or a resource is given to / missing for an engine.
        '''
        if is_path_type(factory_init_configs):
            factory_init_configs = read_json_file(factory_init_configs)  # type: ignore
        factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs)

        if init_resources:
            # zip() stops at the shorter input, which would silently drop engines.
            if len(init_resources) != len(factory_init_configs):
                raise ValueError(
                    f'len(init_resources)={len(init_resources)} does not match '
                    f'len(factory_init_configs)={len(factory_init_configs)}'
                )
            init_resource_iter = iter(init_resources)
        else:
            init_resource_iter = itertools.repeat(None)

        # The weight is optional only if there is a single engine.
        is_single_engine = (len(factory_init_configs) == 1)

        pairs: List[
            Tuple[
                EngineExecutor[Any, Any, _T_RUN_CONFIG, _T_RUN_OUTPUT],
                float,
            ],
        ] = []  # yapf: disable

        for factory_init_config, init_resource in zip(factory_init_configs, init_resource_iter):
            # Reflects engine_executor_factory.
            type_name = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.TYPE]
            if type_name not in self.type_name_to_engine_executor_factory:
                raise KeyError(f'type_name={type_name} not found')
            engine_executor_factory = self.type_name_to_engine_executor_factory[type_name]

            # Build init_resource.
            # Explicit checks instead of `assert`, which is stripped under `python -O`.
            init_resource_cls = engine_executor_factory.get_init_resource_cls()
            if init_resource_cls is NoneTypeEngineInitResource:
                if init_resource is not None:
                    raise ValueError(f'type_name={type_name} does not take init_resource')
            else:
                if init_resource is None:
                    raise ValueError(f'type_name={type_name} requires init_resource')
                init_resource = dyn_structure(init_resource, init_resource_cls)

            # Build engine_executor.
            engine_executor = engine_executor_factory.create(
                factory_init_config.get(EngineExecutorAggregatorFactoryConfigKey.CONFIG, {}),
                init_resource,
            )

            # Get the weight.
            if is_single_engine:
                weight = 1
            else:
                weight = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.WEIGHT]

            pairs.append((engine_executor, weight))

        return EngineExecutorAggregator(
            EngineExecutorAggregatorSelector(pairs),
            func_collate=self.func_collate,
        )

    def create_with_repeated_init_resource(
        self,
        factory_init_configs: Union[
            Sequence[Mapping[str, Any]],
            PathType,
        ],
        init_resource: Union[
            Mapping[str, Any],
            Any,
        ]
    ):  # yapf: disable
        '''Call `create`, giving the same `init_resource` to every config item.'''
        # The configs are loaded here since their number is needed.
        if is_path_type(factory_init_configs):
            factory_init_configs = read_json_file(factory_init_configs)  # type: ignore
        factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs)

        return self.create(
            factory_init_configs,
            [init_resource] * len(factory_init_configs),
        )


# TODO: move to doc.
# A minimum template.
'''
from typing import Optional

import attrs
from numpy.random import Generator as RandomGenerator

from vkit.engine.interface import (
    Engine,
    EngineExecutorFactory,
    NoneTypeEngineInitResource,
)


@attrs.define
class FooEngineInitConfig:
    pass


@attrs.define
class FooEngineRunConfig:
    pass


class FooEngine(
    Engine[
        FooEngineInitConfig,
        NoneTypeEngineInitResource,
        FooEngineRunConfig,
        int,
    ]
):  # yapf: disable

    @classmethod
    def get_type_name(cls) -> str:
        return 'foo'

    def __init__(
        self,
        init_config: FooEngineInitConfig,
        init_resource: Optional[NoneTypeEngineInitResource] = None,
    ):
        super().__init__(init_config, init_resource)

    def run(self, run_config: FooEngineRunConfig, rng: RandomGenerator) -> int:
        return 42


foo_engine_executor_factory = EngineExecutorFactory(FooEngine)
'''
class Engine(
    Generic[
        _T_INIT_CONFIG,
        _T_INIT_RESOURCE,
        _T_RUN_CONFIG,
        _T_RUN_OUTPUT,
    ]
):  # yapf: disable
    '''Base class of an engine.

    The type parameters are, in order: the init config type, the init resource
    type, the run config type and the run output type. `get_type_name` and `run`
    raise `NotImplementedError` here and are meant to be overridden.
    '''

    def __init__(
        self,
        init_config: _T_INIT_CONFIG,
        init_resource: Optional[_T_INIT_RESOURCE] = None,
    ):
        '''Store the init config and the optional init resource on the instance.'''
        self.init_config, self.init_resource = init_config, init_resource

    @classmethod
    def get_type_name(cls) -> str:
        '''Return the name identifying this engine type. Must be overridden.'''
        raise NotImplementedError()

    def run(self, run_config: _T_RUN_CONFIG, rng: RandomGenerator) -> _T_RUN_OUTPUT:
        '''Run the engine with `run_config` and `rng`. Must be overridden.'''
        raise NotImplementedError()
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.
This class can then be used as follows::
    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default
class EngineExecutor(
    Generic[
        _T_INIT_CONFIG,
        _T_INIT_RESOURCE,
        _T_RUN_CONFIG,
        _T_RUN_OUTPUT,
    ]
):  # yapf: disable
    '''Wrap an engine instance and structure the run config before each run.'''

    def __init__(
        self,
        engine: Engine[
            _T_INIT_CONFIG,
            _T_INIT_RESOURCE,
            _T_RUN_CONFIG,
            _T_RUN_OUTPUT,
        ],
    ):  # yapf: disable
        '''Keep a reference to the engine that `run` delegates to.'''
        self.engine = engine

    def get_run_config_cls(self) -> Type[_T_RUN_CONFIG]:
        '''Return the run config class of the wrapped engine.

        This is the item at index 2 of what `get_generic_classes` reports for the
        engine's class, matching the position of the run config type parameter
        declared on `Engine`.
        '''
        engine_cls = type(self.engine)
        generic_classes = get_generic_classes(engine_cls)
        return generic_classes[2]  # type: ignore

    def run(
        self,
        run_config: Union[
            Mapping[str, Any],
            _T_RUN_CONFIG,
        ],
        rng: RandomGenerator,
    ) -> _T_RUN_OUTPUT:  # yapf: disable
        '''Pass `run_config` through `dyn_structure`, then run the engine with it.'''
        run_config_cls = self.get_run_config_cls()
        structured_run_config = dyn_structure(run_config, run_config_cls)
        return self.engine.run(structured_run_config, rng)
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.
This class can then be used as follows::
    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default
class EngineExecutorFactory(
    Generic[
        _T_INIT_CONFIG,
        _T_INIT_RESOURCE,
        _T_RUN_CONFIG,
        _T_RUN_OUTPUT,
    ]
):  # yapf: disable
    '''Create `EngineExecutor` instances for a single engine class.'''

    def __init__(
        self,
        engine_cls: Type[
            Engine[
                _T_INIT_CONFIG,
                _T_INIT_RESOURCE,
                _T_RUN_CONFIG,
                _T_RUN_OUTPUT,
            ]
        ],
    ):  # yapf: disable
        '''Remember the engine class to instantiate in `create`.'''
        self.engine_cls = engine_cls

    def get_type_name(self):
        '''Return the type name reported by the engine class.'''
        return self.engine_cls.get_type_name()

    def get_init_config_cls(self) -> Type[_T_INIT_CONFIG]:
        '''Return the init config class (index 0 of the engine's generic classes).'''
        return get_generic_classes(self.engine_cls)[0]  # type: ignore

    def get_init_resource_cls(self) -> Type[_T_INIT_RESOURCE]:
        '''Return the init resource class (index 1 of the engine's generic classes).'''
        return get_generic_classes(self.engine_cls)[1]  # type: ignore

    def create(
        self,
        init_config: Optional[
            Union[
                Mapping[str, Any],
                PathType,
                _T_INIT_CONFIG,
            ]
        ] = None,
        init_resource: Optional[
            Union[
                Mapping[str, Any],
                _T_INIT_RESOURCE,
            ]
        ] = None,
    ):  # yapf: disable
        '''Build an `EngineExecutor` around a new engine instance.

        Args:
            init_config: Passed to `dyn_structure` together with the init config
                class, with path and `None` inputs enabled.
            init_resource: Must be `None` if the engine's init resource class is
                `NoneTypeEngineInitResource`; required otherwise, in which case it
                is passed through `dyn_structure` as well.

        Returns:
            An `EngineExecutor` wrapping the new engine.

        Raises:
            ValueError: If `init_resource` is given to an engine that takes none,
                or is missing for an engine that requires one.
        '''
        init_config = dyn_structure(
            init_config,
            self.get_init_config_cls(),
            support_path_type=True,
            support_none_type=True,
        )

        # Explicit checks instead of `assert`, which is stripped under `python -O`.
        init_resource_cls = self.get_init_resource_cls()
        if init_resource_cls is NoneTypeEngineInitResource:
            if init_resource is not None:
                raise ValueError(
                    f'{self.engine_cls.__name__} does not take init_resource, '
                    f'got {type(init_resource).__name__}'
                )
        else:
            # Compare against None so that a falsy but valid resource is accepted.
            if init_resource is None:
                raise ValueError(f'{self.engine_cls.__name__} requires init_resource')
            init_resource = dyn_structure(init_resource, init_resource_cls)

        return EngineExecutor(self.engine_cls(init_config, init_resource))
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.
This class can then be used as follows::
    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default
def create(
    self,
    init_config: Optional[
        Union[
            Mapping[str, Any],
            PathType,
            _T_INIT_CONFIG,
        ]
    ] = None,
    init_resource: Optional[
        Union[
            Mapping[str, Any],
            _T_INIT_RESOURCE,
        ]
    ] = None,
):  # yapf: disable
    '''Build an `EngineExecutor` around a new engine instance.

    Args:
        init_config: Passed to `dyn_structure` together with the init config
            class, with path and `None` inputs enabled.
        init_resource: Must be `None` if the engine's init resource class is
            `NoneTypeEngineInitResource`; required otherwise, in which case it
            is passed through `dyn_structure` as well.

    Returns:
        An `EngineExecutor` wrapping the new engine.

    Raises:
        ValueError: If `init_resource` is given to an engine that takes none,
            or is missing for an engine that requires one.
    '''
    init_config = dyn_structure(
        init_config,
        self.get_init_config_cls(),
        support_path_type=True,
        support_none_type=True,
    )

    # Explicit checks instead of `assert`, which is stripped under `python -O`.
    init_resource_cls = self.get_init_resource_cls()
    if init_resource_cls is NoneTypeEngineInitResource:
        if init_resource is not None:
            raise ValueError(
                f'{self.engine_cls.__name__} does not take init_resource, '
                f'got {type(init_resource).__name__}'
            )
    else:
        # Compare against None so that a falsy but valid resource is accepted.
        if init_resource is None:
            raise ValueError(f'{self.engine_cls.__name__} requires init_resource')
        init_resource = dyn_structure(init_resource, init_resource_cls)

    return EngineExecutor(self.engine_cls(init_config, init_resource))
class EngineExecutorAggregatorSelector(
    Generic[
        _T_RUN_CONFIG,
        _T_RUN_OUTPUT,
    ]
):  # yapf: disable
    '''Hold engine executors with their probabilities and pick one at random.'''

    def __init__(
        self,
        pairs: Sequence[
            Tuple[
                EngineExecutor[
                    Any,
                    Any,
                    _T_RUN_CONFIG,
                    _T_RUN_OUTPUT,
                ],
                float,
            ]
        ],
    ):  # yapf: disable
        '''Split the (engine executor, weight) pairs via `normalize_to_keys_and_probs`.'''
        engine_executors, probs = normalize_to_keys_and_probs(pairs)
        self.engine_executors = engine_executors
        self.probs = probs

    def get_run_config_cls(self):
        '''Return the run config class reported by the first engine executor.'''
        first_engine_executor = self.engine_executors[0]
        return first_engine_executor.get_run_config_cls()

    def select_engine_executor(self, rng: RandomGenerator):
        '''Pick one engine executor with `rng_choice`, using the stored probabilities.'''
        selected = rng_choice(rng, self.engine_executors, probs=self.probs)
        return selected
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.
This class can then be used as follows::
    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default
def engine_executor_aggregator_default_func_collate(
    selector: EngineExecutorAggregatorSelector[
        _T_RUN_CONFIG,
        _T_RUN_OUTPUT,
    ],
    run_config: _T_RUN_CONFIG,
    rng: RandomGenerator,
) -> _T_RUN_OUTPUT:  # yapf: disable
    '''Default collate function: select one engine executor and return its output.

    The same `rng` is used first for the selection and then for the run.
    '''
    return selector.select_engine_executor(rng).run(run_config, rng)
class EngineExecutorAggregator(
    Generic[
        _T_RUN_CONFIG,
        _T_RUN_OUTPUT,
    ]
):  # yapf: disable
    '''Run a collate function over a selector of engine executors.'''

    def __init__(
        self,
        selector: EngineExecutorAggregatorSelector[
            _T_RUN_CONFIG,
            _T_RUN_OUTPUT,
        ],
        func_collate: Callable[
            [
                EngineExecutorAggregatorSelector[
                    _T_RUN_CONFIG,
                    _T_RUN_OUTPUT,
                ],
                _T_RUN_CONFIG,
                RandomGenerator,
            ],
            _T_RUN_OUTPUT,
        ] = engine_executor_aggregator_default_func_collate,
    ):  # yapf: disable
        '''Store the selector and the collate function invoked by `run`.'''
        self.selector, self.func_collate = selector, func_collate

    def get_run_config_cls(self):
        '''Return the run config class reported by the selector.'''
        return self.selector.get_run_config_cls()

    def run(
        self,
        run_config: Union[
            Mapping[str, Any],
            _T_RUN_CONFIG,
        ],
        rng: RandomGenerator,
    ) -> _T_RUN_OUTPUT:  # yapf: disable
        '''Structure `run_config`, then call the collate function with the selector.'''
        run_config_cls = self.get_run_config_cls()
        structured_run_config = dyn_structure(run_config, run_config_cls)
        return self.func_collate(self.selector, structured_run_config, rng)
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.
This class can then be used as follows::
    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default
def __init__(
    self,
    selector: EngineExecutorAggregatorSelector[
        _T_RUN_CONFIG,
        _T_RUN_OUTPUT,
    ],
    func_collate: Callable[
        [
            EngineExecutorAggregatorSelector[
                _T_RUN_CONFIG,
                _T_RUN_OUTPUT,
            ],
            _T_RUN_CONFIG,
            RandomGenerator,
        ],
        _T_RUN_OUTPUT,
    ] = engine_executor_aggregator_default_func_collate,
):  # yapf: disable
    '''Store the selector and the collate function on the instance.

    Args:
        selector: Selector holding the engine executors.
        func_collate: Callable taking (selector, run_config, rng) and returning
            the run output.
    '''
    self.selector, self.func_collate = selector, func_collate
class EngineExecutorAggregatorFactory(
    Generic[
        _T_RUN_CONFIG,
        _T_RUN_OUTPUT,
    ]
):  # yapf: disable
    '''Create `EngineExecutorAggregator` instances from per-engine configs.'''

    def __init__(
        self,
        engine_executor_factories: Sequence[
            EngineExecutorFactory[
                Any,
                Any,
                _T_RUN_CONFIG,
                _T_RUN_OUTPUT,
            ]
        ],
        func_collate: Callable[
            [
                EngineExecutorAggregatorSelector[
                    _T_RUN_CONFIG,
                    _T_RUN_OUTPUT,
                ],
                _T_RUN_CONFIG,
                RandomGenerator,
            ],
            _T_RUN_OUTPUT,
        ] = engine_executor_aggregator_default_func_collate,
    ):  # yapf: disable
        '''Index the factories by type name and store the collate function.

        If several factories report the same type name, the last one wins.
        '''
        self.type_name_to_engine_executor_factory = {
            engine_executor_factory.get_type_name(): engine_executor_factory
            for engine_executor_factory in engine_executor_factories
        }
        self.func_collate = func_collate

    def create(
        self,
        factory_init_configs: Union[
            Sequence[Mapping[str, Any]],
            PathType,
        ],
        init_resources: Optional[
            Sequence[
                Union[
                    Mapping[str, Any],
                    # TODO: find a better way to constrain resource type.
                    Any,
                ]
            ]
        ] = None,
    ):  # yapf: disable
        '''Build an aggregator with one engine executor per config item.

        Args:
            factory_init_configs: A sequence of mappings, or a path that is loaded
                with `read_json_file`. Each mapping must have a 'type' key, may
                have a 'config' key (defaults to `{}`), and must have a 'weight'
                key unless it is the only item.
            init_resources: One init resource per config item, in the same order.
                `None` or an empty sequence means no engine is given a resource.

        Returns:
            An `EngineExecutorAggregator` using this factory's collate function.

        Raises:
            KeyError: If a type name is not registered, or a required key is missing.
            ValueError: If `init_resources` and `factory_init_configs` differ in
                length, or a resource is given to / missing for an engine.
        '''
        if is_path_type(factory_init_configs):
            factory_init_configs = read_json_file(factory_init_configs)  # type: ignore
        factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs)

        if init_resources:
            # zip() stops at the shorter input, which would silently drop engines.
            if len(init_resources) != len(factory_init_configs):
                raise ValueError(
                    f'len(init_resources)={len(init_resources)} does not match '
                    f'len(factory_init_configs)={len(factory_init_configs)}'
                )
            init_resource_iter = iter(init_resources)
        else:
            init_resource_iter = itertools.repeat(None)

        # The weight is optional only if there is a single engine.
        is_single_engine = (len(factory_init_configs) == 1)

        pairs: List[
            Tuple[
                EngineExecutor[Any, Any, _T_RUN_CONFIG, _T_RUN_OUTPUT],
                float,
            ],
        ] = []  # yapf: disable

        for factory_init_config, init_resource in zip(factory_init_configs, init_resource_iter):
            # Reflects engine_executor_factory.
            type_name = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.TYPE]
            if type_name not in self.type_name_to_engine_executor_factory:
                raise KeyError(f'type_name={type_name} not found')
            engine_executor_factory = self.type_name_to_engine_executor_factory[type_name]

            # Build init_resource.
            # Explicit checks instead of `assert`, which is stripped under `python -O`.
            init_resource_cls = engine_executor_factory.get_init_resource_cls()
            if init_resource_cls is NoneTypeEngineInitResource:
                if init_resource is not None:
                    raise ValueError(f'type_name={type_name} does not take init_resource')
            else:
                if init_resource is None:
                    raise ValueError(f'type_name={type_name} requires init_resource')
                init_resource = dyn_structure(init_resource, init_resource_cls)

            # Build engine_executor.
            engine_executor = engine_executor_factory.create(
                factory_init_config.get(EngineExecutorAggregatorFactoryConfigKey.CONFIG, {}),
                init_resource,
            )

            # Get the weight.
            if is_single_engine:
                weight = 1
            else:
                weight = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.WEIGHT]

            pairs.append((engine_executor, weight))

        return EngineExecutorAggregator(
            EngineExecutorAggregatorSelector(pairs),
            func_collate=self.func_collate,
        )

    def create_with_repeated_init_resource(
        self,
        factory_init_configs: Union[
            Sequence[Mapping[str, Any]],
            PathType,
        ],
        init_resource: Union[
            Mapping[str, Any],
            Any,
        ]
    ):  # yapf: disable
        '''Call `create`, giving the same `init_resource` to every config item.'''
        # The configs are loaded here since their number is needed.
        if is_path_type(factory_init_configs):
            factory_init_configs = read_json_file(factory_init_configs)  # type: ignore
        factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs)

        return self.create(
            factory_init_configs,
            [init_resource] * len(factory_init_configs),
        )
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.
This class can then be used as follows::
    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default
def __init__(
    self,
    engine_executor_factories: Sequence[
        EngineExecutorFactory[
            Any,
            Any,
            _T_RUN_CONFIG,
            _T_RUN_OUTPUT,
        ]
    ],
    func_collate: Callable[
        [
            EngineExecutorAggregatorSelector[
                _T_RUN_CONFIG,
                _T_RUN_OUTPUT,
            ],
            _T_RUN_CONFIG,
            RandomGenerator,
        ],
        _T_RUN_OUTPUT,
    ] = engine_executor_aggregator_default_func_collate,
):  # yapf: disable
    '''Index the factories by type name and store the collate function.

    If several factories report the same type name, the last one wins.
    '''
    type_name_to_factory = {}
    for factory in engine_executor_factories:
        type_name_to_factory[factory.get_type_name()] = factory

    self.type_name_to_engine_executor_factory = type_name_to_factory
    self.func_collate = func_collate
def create(
    self,
    factory_init_configs: Union[
        Sequence[Mapping[str, Any]],
        PathType,
    ],
    init_resources: Optional[
        Sequence[
            Union[
                Mapping[str, Any],
                # TODO: find a better way to constrain resource type.
                Any,
            ]
        ]
    ] = None,
):  # yapf: disable
    '''Build an aggregator with one engine executor per config item.

    Args:
        factory_init_configs: A sequence of mappings, or a path that is loaded
            with `read_json_file`. Each mapping must have a 'type' key, may
            have a 'config' key (defaults to `{}`), and must have a 'weight'
            key unless it is the only item.
        init_resources: One init resource per config item, in the same order.
            `None` or an empty sequence means no engine is given a resource.

    Returns:
        An `EngineExecutorAggregator` using this factory's collate function.

    Raises:
        KeyError: If a type name is not registered, or a required key is missing.
        ValueError: If `init_resources` and `factory_init_configs` differ in
            length, or a resource is given to / missing for an engine.
    '''
    if is_path_type(factory_init_configs):
        factory_init_configs = read_json_file(factory_init_configs)  # type: ignore
    factory_init_configs = cast(Sequence[Mapping[str, Any]], factory_init_configs)

    if init_resources:
        # zip() stops at the shorter input, which would silently drop engines.
        if len(init_resources) != len(factory_init_configs):
            raise ValueError(
                f'len(init_resources)={len(init_resources)} does not match '
                f'len(factory_init_configs)={len(factory_init_configs)}'
            )
        init_resource_iter = iter(init_resources)
    else:
        init_resource_iter = itertools.repeat(None)

    # The weight is optional only if there is a single engine.
    is_single_engine = (len(factory_init_configs) == 1)

    pairs: List[
        Tuple[
            EngineExecutor[Any, Any, _T_RUN_CONFIG, _T_RUN_OUTPUT],
            float,
        ],
    ] = []  # yapf: disable

    for factory_init_config, init_resource in zip(factory_init_configs, init_resource_iter):
        # Reflects engine_executor_factory.
        type_name = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.TYPE]
        if type_name not in self.type_name_to_engine_executor_factory:
            raise KeyError(f'type_name={type_name} not found')
        engine_executor_factory = self.type_name_to_engine_executor_factory[type_name]

        # Build init_resource.
        # Explicit checks instead of `assert`, which is stripped under `python -O`.
        init_resource_cls = engine_executor_factory.get_init_resource_cls()
        if init_resource_cls is NoneTypeEngineInitResource:
            if init_resource is not None:
                raise ValueError(f'type_name={type_name} does not take init_resource')
        else:
            if init_resource is None:
                raise ValueError(f'type_name={type_name} requires init_resource')
            init_resource = dyn_structure(init_resource, init_resource_cls)

        # Build engine_executor.
        engine_executor = engine_executor_factory.create(
            factory_init_config.get(EngineExecutorAggregatorFactoryConfigKey.CONFIG, {}),
            init_resource,
        )

        # Get the weight.
        if is_single_engine:
            weight = 1
        else:
            weight = factory_init_config[EngineExecutorAggregatorFactoryConfigKey.WEIGHT]

        pairs.append((engine_executor, weight))

    return EngineExecutorAggregator(
        EngineExecutorAggregatorSelector(pairs),
        func_collate=self.func_collate,
    )
def create_with_repeated_init_resource(
    self,
    factory_init_configs: Union[
        Sequence[Mapping[str, Any]],
        PathType,
    ],
    init_resource: Union[
        Mapping[str, Any],
        Any,
    ]
):  # yapf: disable
    '''Call `create`, giving the same `init_resource` to every config item.

    Args:
        factory_init_configs: A sequence of mappings, or a path that is loaded
            with `read_json_file`.
        init_resource: The object repeated once per config item.

    Returns:
        Whatever `create` returns for the configs and the repeated resources.
    '''
    # The configs are loaded here since their number is needed.
    configs = factory_init_configs
    if is_path_type(configs):
        configs = read_json_file(configs)  # type: ignore
    configs = cast(Sequence[Mapping[str, Any]], configs)

    repeated_init_resources = [init_resource for _ in range(len(configs))]
    return self.create(configs, repeated_init_resources)