vkit.pipeline.pool
# Copyright 2022 vkit-x Administrator. All Rights Reserved.
#
# This project (vkit-x/vkit) is dual-licensed under commercial and SSPL licenses.
#
# The commercial license gives you the full rights to create and distribute software
# on your own terms without any SSPL license obligations. For more information,
# please see the "LICENSE_COMMERCIAL.txt" file.
#
# This project is also available under Server Side Public License (SSPL).
# The SSPL licensing is ideal for use cases such as open source projects with
# SSPL distribution, student/academic purposes, hobby projects, internal research
# projects without external distribution, or other projects where all SSPL
# obligations can be met. For more information, please see the "LICENSE_SSPL.txt" file.
from typing import TypeVar, Generic, Optional
import logging

import attrs
from numpy.random import SeedSequence, default_rng

from vkit.utility import Pool, PoolConfig
from .interface import Pipeline

_T_OUTPUT = TypeVar('_T_OUTPUT')


@attrs.define
class PipelinePoolWorkerConfig(Generic[_T_OUTPUT]):
    """Settings shared by the workers of a pipeline pool."""
    # Pipeline the worker executes; the worker calls ``pipeline.run(rng)``.
    pipeline: Pipeline[_T_OUTPUT]
    # Number of successful runs after which the worker rebuilds its rng from
    # its seed sequence; ``None`` (or 0) disables the periodic reset.
    num_runs_reset_rng: Optional[int]


class PipelinePoolWorker(Generic[_T_OUTPUT]):
    """Worker that owns a private rng and runs one pipeline with it.

    NOTE(review): instances are presumably created by ``vkit.utility.Pool``
    (one per process) -- confirm against the ``Pool`` implementation.
    """

    def __init__(
        self,
        process_idx: int,
        seed_sequence: SeedSequence,
        logger: logging.Logger,
        config: PipelinePoolWorkerConfig[_T_OUTPUT],
    ):
        """Build the worker's rng and remember the pipeline to run.

        Args:
            process_idx: Index of this worker; used here only in log messages.
            seed_sequence: Seed material the rng is built from. It is kept so
                that ``reset_rng`` can rebuild the generator later.
            logger: Logger receiving this worker's messages.
            config: Supplies ``pipeline`` and ``num_runs_reset_rng``.
        """
        self.process_idx = process_idx
        self.logger = logger

        self.seed_sequence = seed_sequence
        self.rng = default_rng(self.seed_sequence)
        self.logger.info(
            f'Set pipeline process_idx={self.process_idx} '
            f'rng_state to {self.rng.bit_generator.state} '
        )
        # Number of successful runs since the rng was last (re)built.
        self.rng_run_idx = 0

        self.pipeline = config.pipeline
        self.num_runs_reset_rng = config.num_runs_reset_rng

    def reset_rng(self):
        """Rebuild the rng from the stored seed sequence and zero the run counter.

        NOTE(review): rebuilding from the same ``SeedSequence`` presumably
        restores the state the rng had at construction -- confirm against the
        numpy documentation.
        """
        self.rng = default_rng(self.seed_sequence)
        self.rng_run_idx = 0
        self.logger.info(
            f'Reset pipeline process_idx={self.process_idx} '
            f'rng_state to {self.rng.bit_generator.state} '
            'and run_idx to 0'
        )

    def run(self):
        """Run the pipeline until one attempt succeeds and return its output.

        Every failed attempt is logged together with the rng state it started
        from. After a success the run counter is incremented and, when
        ``num_runs_reset_rng`` is truthy and the counter is a multiple of it,
        ``reset_rng()`` is called.

        Returns:
            The value returned by ``self.pipeline.run(self.rng)``.

        Raises:
            AssertionError: If the pipeline returned ``None`` (only while
                assertions are enabled).
        """
        output: Optional[_T_OUTPUT] = None

        while True:
            # Captured before the attempt so a failure can be reported, and an
            # untouched rng detected, against the starting state.
            cur_rng_state = self.rng.bit_generator.state
            try:
                output = self.pipeline.run(self.rng)
            except Exception:
                self.logger.exception(
                    f'pipeline.run process_idx={self.process_idx} failed with '
                    f'rng_state={cur_rng_state}, retrying...'
                )
                if self.rng.bit_generator.state == cur_rng_state:
                    # Force to change rng state, otherwise the retry would
                    # start from exactly the state that just failed.
                    self.rng.random()
            else:
                # Lazy %-style arguments: the output and the rng state are only
                # turned into text when a handler actually emits DEBUG records.
                self.logger.debug(
                    'pipeline.run process_idx=%s with rng_state=%s generates output=%s',
                    self.process_idx,
                    cur_rng_state,
                    output,
                )
                break

        assert output is not None

        self.rng_run_idx += 1
        if self.num_runs_reset_rng and self.rng_run_idx % self.num_runs_reset_rng == 0:
            self.logger.debug('pipeline.run rng_run_idx=%s, hit reset_rng', self.rng_run_idx)
            self.reset_rng()

        return output


class PipelinePool(Generic[_T_OUTPUT]):
    """Thin wrapper that runs one pipeline through a ``vkit.utility.Pool``."""

    def __init__(
        self,
        pipeline: Pipeline[_T_OUTPUT],
        inventory: int,
        num_processes: int,
        rng_seed: int,
        num_runs_reset_rng: Optional[int] = None,
        timeout: int = 60,
    ):
        """Create the underlying pool with ``PipelinePoolWorker`` as worker class.

        Args:
            pipeline: Pipeline handed to every worker.
            inventory: Forwarded unchanged to ``PoolConfig``.
            num_processes: Forwarded unchanged to ``PoolConfig``.
            rng_seed: Forwarded unchanged to ``PoolConfig``.
            num_runs_reset_rng: Successful runs between worker rng resets;
                ``None`` disables the reset.
            timeout: Forwarded unchanged to ``PoolConfig`` (presumably
                seconds -- confirm against ``PoolConfig``).
        """
        self.pool = Pool(
            config=PoolConfig(
                inventory=inventory,
                num_processes=num_processes,
                pool_worker_class=PipelinePoolWorker[_T_OUTPUT],
                pool_worker_config=PipelinePoolWorkerConfig(
                    pipeline=pipeline,
                    num_runs_reset_rng=num_runs_reset_rng,
                ),
                rng_seed=rng_seed,
                timeout=timeout,
            )
        )

    def cleanup(self):
        """Delegate to ``Pool.cleanup()``."""
        self.pool.cleanup()

    def run(self):
        """Return whatever ``Pool.run()`` returns."""
        return self.pool.run()
class PipelinePoolWorkerConfig(Generic[_T_OUTPUT]):
    """Settings shared by the workers of a pipeline pool."""
    # Pipeline the worker executes; the worker calls ``pipeline.run(rng)``.
    pipeline: Pipeline[_T_OUTPUT]
    # Number of successful runs after which the worker rebuilds its rng from
    # its seed sequence; ``None`` (or 0) disables the periodic reset.
    num_runs_reset_rng: Optional[int]
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.
This class can then be used as follows::
    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default
def __init__(self, pipeline, num_runs_reset_rng):
    # attrs-generated initializer: stores both fields exactly as passed.
    self.pipeline = pipeline
    self.num_runs_reset_rng = num_runs_reset_rng
Method generated by attrs for class PipelinePoolWorkerConfig.
class PipelinePoolWorker(Generic[_T_OUTPUT]):
    """Worker that owns a private rng and runs one pipeline with it.

    NOTE(review): instances are presumably created by ``vkit.utility.Pool``
    (one per process) -- confirm against the ``Pool`` implementation.
    """

    def __init__(
        self,
        process_idx: int,
        seed_sequence: SeedSequence,
        logger: logging.Logger,
        config: PipelinePoolWorkerConfig[_T_OUTPUT],
    ):
        """Build the worker's rng and remember the pipeline to run.

        Args:
            process_idx: Index of this worker; used here only in log messages.
            seed_sequence: Seed material the rng is built from. It is kept so
                that ``reset_rng`` can rebuild the generator later.
            logger: Logger receiving this worker's messages.
            config: Supplies ``pipeline`` and ``num_runs_reset_rng``.
        """
        self.process_idx = process_idx
        self.logger = logger

        self.seed_sequence = seed_sequence
        self.rng = default_rng(self.seed_sequence)
        self.logger.info(
            f'Set pipeline process_idx={self.process_idx} '
            f'rng_state to {self.rng.bit_generator.state} '
        )
        # Number of successful runs since the rng was last (re)built.
        self.rng_run_idx = 0

        self.pipeline = config.pipeline
        self.num_runs_reset_rng = config.num_runs_reset_rng

    def reset_rng(self):
        """Rebuild the rng from the stored seed sequence and zero the run counter.

        NOTE(review): rebuilding from the same ``SeedSequence`` presumably
        restores the state the rng had at construction -- confirm against the
        numpy documentation.
        """
        self.rng = default_rng(self.seed_sequence)
        self.rng_run_idx = 0
        self.logger.info(
            f'Reset pipeline process_idx={self.process_idx} '
            f'rng_state to {self.rng.bit_generator.state} '
            'and run_idx to 0'
        )

    def run(self):
        """Run the pipeline until one attempt succeeds and return its output.

        Every failed attempt is logged together with the rng state it started
        from. After a success the run counter is incremented and, when
        ``num_runs_reset_rng`` is truthy and the counter is a multiple of it,
        ``reset_rng()`` is called.

        Returns:
            The value returned by ``self.pipeline.run(self.rng)``.

        Raises:
            AssertionError: If the pipeline returned ``None`` (only while
                assertions are enabled).
        """
        output: Optional[_T_OUTPUT] = None

        while True:
            # Captured before the attempt so a failure can be reported, and an
            # untouched rng detected, against the starting state.
            cur_rng_state = self.rng.bit_generator.state
            try:
                output = self.pipeline.run(self.rng)
            except Exception:
                self.logger.exception(
                    f'pipeline.run process_idx={self.process_idx} failed with '
                    f'rng_state={cur_rng_state}, retrying...'
                )
                if self.rng.bit_generator.state == cur_rng_state:
                    # Force to change rng state, otherwise the retry would
                    # start from exactly the state that just failed.
                    self.rng.random()
            else:
                # Lazy %-style arguments: the output and the rng state are only
                # turned into text when a handler actually emits DEBUG records.
                self.logger.debug(
                    'pipeline.run process_idx=%s with rng_state=%s generates output=%s',
                    self.process_idx,
                    cur_rng_state,
                    output,
                )
                break

        assert output is not None

        self.rng_run_idx += 1
        if self.num_runs_reset_rng and self.rng_run_idx % self.num_runs_reset_rng == 0:
            self.logger.debug('pipeline.run rng_run_idx=%s, hit reset_rng', self.rng_run_idx)
            self.reset_rng()

        return output
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.
This class can then be used as follows::
    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default
def __init__(
    self,
    process_idx: int,
    seed_sequence: SeedSequence,
    logger: logging.Logger,
    config: PipelinePoolWorkerConfig[_T_OUTPUT],
):
    """Set up the worker: identity, pipeline settings, and a fresh rng.

    Args:
        process_idx: Index of this worker; used here only in log messages.
        seed_sequence: Seed material the rng is built from; stored on the
            instance as ``seed_sequence``.
        logger: Logger receiving this worker's messages.
        config: Supplies ``pipeline`` and ``num_runs_reset_rng``.
    """
    # Identity and pipeline settings.
    self.process_idx = process_idx
    self.logger = logger
    self.pipeline = config.pipeline
    self.num_runs_reset_rng = config.num_runs_reset_rng

    # Random state: generator, the seed it came from, and the run counter.
    self.seed_sequence = seed_sequence
    self.rng = default_rng(seed_sequence)
    self.rng_run_idx = 0

    initial_state = self.rng.bit_generator.state
    logger.info(
        f'Set pipeline process_idx={process_idx} '
        f'rng_state to {initial_state} '
    )
def run(self):
    """Run the pipeline until one attempt succeeds and return its output.

    Every failed attempt is logged together with the rng state it started
    from. After a success the run counter is incremented and, when
    ``num_runs_reset_rng`` is truthy and the counter is a multiple of it,
    ``reset_rng()`` is called.

    Returns:
        The value returned by ``self.pipeline.run(self.rng)``.

    Raises:
        AssertionError: If the pipeline returned ``None`` (only while
            assertions are enabled).
    """
    output: Optional[_T_OUTPUT] = None

    while True:
        # Captured before the attempt so a failure can be reported, and an
        # untouched rng detected, against the starting state.
        cur_rng_state = self.rng.bit_generator.state
        try:
            output = self.pipeline.run(self.rng)
        except Exception:
            self.logger.exception(
                f'pipeline.run process_idx={self.process_idx} failed with '
                f'rng_state={cur_rng_state}, retrying...'
            )
            if self.rng.bit_generator.state == cur_rng_state:
                # Force to change rng state, otherwise the retry would
                # start from exactly the state that just failed.
                self.rng.random()
        else:
            # Lazy %-style arguments: the output and the rng state are only
            # turned into text when a handler actually emits DEBUG records.
            self.logger.debug(
                'pipeline.run process_idx=%s with rng_state=%s generates output=%s',
                self.process_idx,
                cur_rng_state,
                output,
            )
            break

    assert output is not None

    self.rng_run_idx += 1
    if self.num_runs_reset_rng and self.rng_run_idx % self.num_runs_reset_rng == 0:
        self.logger.debug('pipeline.run rng_run_idx=%s, hit reset_rng', self.rng_run_idx)
        self.reset_rng()

    return output
class PipelinePool(Generic[_T_OUTPUT]):
    """Thin wrapper that runs one pipeline through a ``vkit.utility.Pool``."""

    def __init__(
        self,
        pipeline: Pipeline[_T_OUTPUT],
        inventory: int,
        num_processes: int,
        rng_seed: int,
        num_runs_reset_rng: Optional[int] = None,
        timeout: int = 60,
    ):
        """Create the underlying pool with ``PipelinePoolWorker`` as worker class.

        Args:
            pipeline: Pipeline handed to every worker.
            inventory: Forwarded unchanged to ``PoolConfig``.
            num_processes: Forwarded unchanged to ``PoolConfig``.
            rng_seed: Forwarded unchanged to ``PoolConfig``.
            num_runs_reset_rng: Forwarded to the worker config.
            timeout: Forwarded unchanged to ``PoolConfig`` (presumably
                seconds -- confirm against ``PoolConfig``).
        """
        # What each worker needs to know.
        worker_config = PipelinePoolWorkerConfig(
            pipeline=pipeline,
            num_runs_reset_rng=num_runs_reset_rng,
        )
        # What the pool itself needs to know.
        pool_config = PoolConfig(
            inventory=inventory,
            num_processes=num_processes,
            pool_worker_class=PipelinePoolWorker[_T_OUTPUT],
            pool_worker_config=worker_config,
            rng_seed=rng_seed,
            timeout=timeout,
        )
        self.pool = Pool(config=pool_config)

    def cleanup(self):
        """Delegate to ``Pool.cleanup()``."""
        pool = self.pool
        pool.cleanup()

    def run(self):
        """Return whatever ``Pool.run()`` returns."""
        result = self.pool.run()
        return result
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.
This class can then be used as follows::
    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default
def __init__(
    self,
    pipeline: Pipeline[_T_OUTPUT],
    inventory: int,
    num_processes: int,
    rng_seed: int,
    num_runs_reset_rng: Optional[int] = None,
    timeout: int = 60,
):
    """Create the underlying pool with ``PipelinePoolWorker`` as worker class.

    Args:
        pipeline: Pipeline handed to every worker.
        inventory: Forwarded unchanged to ``PoolConfig``.
        num_processes: Forwarded unchanged to ``PoolConfig``.
        rng_seed: Forwarded unchanged to ``PoolConfig``.
        num_runs_reset_rng: Forwarded to the worker config.
        timeout: Forwarded unchanged to ``PoolConfig`` (presumably
            seconds -- confirm against ``PoolConfig``).
    """
    # What each worker needs to know.
    worker_config = PipelinePoolWorkerConfig(
        pipeline=pipeline,
        num_runs_reset_rng=num_runs_reset_rng,
    )
    # What the pool itself needs to know.
    pool_config = PoolConfig(
        inventory=inventory,
        num_processes=num_processes,
        pool_worker_class=PipelinePoolWorker[_T_OUTPUT],
        pool_worker_config=worker_config,
        rng_seed=rng_seed,
        timeout=timeout,
    )
    self.pool = Pool(config=pool_config)