vkit.pipeline.pool

  1# Copyright 2022 vkit-x Administrator. All Rights Reserved.
  2#
  3# This project (vkit-x/vkit) is dual-licensed under commercial and SSPL licenses.
  4#
  5# The commercial license gives you the full rights to create and distribute software
  6# on your own terms without any SSPL license obligations. For more information,
  7# please see the "LICENSE_COMMERCIAL.txt" file.
  8#
  9# This project is also available under Server Side Public License (SSPL).
 10# The SSPL licensing is ideal for use cases such as open source projects with
 11# SSPL distribution, student/academic purposes, hobby projects, internal research
 12# projects without external distribution, or other projects where all SSPL
 13# obligations can be met. For more information, please see the "LICENSE_SSPL.txt" file.
 14from typing import TypeVar, Generic, Optional
 15import logging
 16
 17import attrs
 18from numpy.random import SeedSequence, default_rng
 19
 20from vkit.utility import Pool, PoolConfig
 21from .interface import Pipeline
 22
 23_T_OUTPUT = TypeVar('_T_OUTPUT')
 24
 25
 26@attrs.define
 27class PipelinePoolWorkerConfig(Generic[_T_OUTPUT]):
 28    pipeline: Pipeline[_T_OUTPUT]
 29    num_runs_reset_rng: Optional[int]
 30
 31
 32class PipelinePoolWorker(Generic[_T_OUTPUT]):
 33
 34    def __init__(
 35        self,
 36        process_idx: int,
 37        seed_sequence: SeedSequence,
 38        logger: logging.Logger,
 39        config: PipelinePoolWorkerConfig[_T_OUTPUT],
 40    ):
 41        self.process_idx = process_idx
 42        self.logger = logger
 43
 44        self.seed_sequence = seed_sequence
 45        self.rng = default_rng(self.seed_sequence)
 46        self.logger.info(
 47            f'Set pipeline process_idx={self.process_idx} '
 48            f'rng_state to {self.rng.bit_generator.state} '
 49        )
 50        self.rng_run_idx = 0
 51
 52        self.pipeline = config.pipeline
 53        self.num_runs_reset_rng = config.num_runs_reset_rng
 54
 55    def reset_rng(self):
 56        self.rng = default_rng(self.seed_sequence)
 57        self.rng_run_idx = 0
 58        self.logger.info(
 59            f'Reset pipeline process_idx={self.process_idx} '
 60            f'rng_state to {self.rng.bit_generator.state} '
 61            'and run_idx to 0'
 62        )
 63
 64    def run(self):
 65        output: Optional[_T_OUTPUT] = None
 66
 67        while True:
 68            cur_rng_state = self.rng.bit_generator.state
 69            try:
 70                output = self.pipeline.run(self.rng)
 71                self.logger.debug(
 72                    f'pipeline.run process_idx={self.process_idx} with '
 73                    f'rng_state={cur_rng_state} generates output={output}'
 74                )
 75                break
 76            except Exception:
 77                self.logger.exception(
 78                    f'pipeline.run process_idx={self.process_idx} failed with '
 79                    f'rng_state={cur_rng_state}, retrying...'
 80                )
 81                if self.rng.bit_generator.state == cur_rng_state:
 82                    # Force to change rng state.
 83                    self.rng.random()
 84
 85        assert output is not None
 86
 87        self.rng_run_idx += 1
 88        if self.num_runs_reset_rng and self.rng_run_idx % self.num_runs_reset_rng == 0:
 89            self.logger.debug(f'pipeline.run rng_run_idx={self.rng_run_idx}, hit reset_rng')
 90            self.reset_rng()
 91
 92        return output
 93
 94
 95class PipelinePool(Generic[_T_OUTPUT]):
 96
 97    def __init__(
 98        self,
 99        pipeline: Pipeline[_T_OUTPUT],
100        inventory: int,
101        num_processes: int,
102        rng_seed: int,
103        num_runs_reset_rng: Optional[int] = None,
104        timeout: int = 60,
105    ):
106        self.pool = Pool(
107            config=PoolConfig(
108                inventory=inventory,
109                num_processes=num_processes,
110                pool_worker_class=PipelinePoolWorker[_T_OUTPUT],
111                pool_worker_config=PipelinePoolWorkerConfig(
112                    pipeline=pipeline,
113                    num_runs_reset_rng=num_runs_reset_rng,
114                ),
115                rng_seed=rng_seed,
116                timeout=timeout,
117            )
118        )
119
120    def cleanup(self):
121        self.pool.cleanup()
122
123    def run(self):
124        return self.pool.run()
class PipelinePoolWorkerConfig(typing.Generic[~_T_OUTPUT]):
class PipelinePoolWorkerConfig(Generic[_T_OUTPUT]):
    """Settings handed to every ``PipelinePoolWorker``."""

    # Pipeline whose ``run(rng)`` method the worker invokes.
    pipeline: Pipeline[_T_OUTPUT]
    # Count of successful runs after which the worker rebuilds its random
    # generator; a falsy value (``None`` or ``0``) turns the reset off.
    num_runs_reset_rng: Optional[int]

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.

This class can then be used as follows::

    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default

PipelinePoolWorkerConfig( pipeline: vkit.pipeline.interface.Pipeline[~_T_OUTPUT], num_runs_reset_rng: Union[int, NoneType])
def __init__(self, pipeline, num_runs_reset_rng):
    """Store both constructor arguments on the instance under the same names."""
    self.pipeline, self.num_runs_reset_rng = pipeline, num_runs_reset_rng

Method generated by attrs for class PipelinePoolWorkerConfig.

class PipelinePoolWorker(typing.Generic[~_T_OUTPUT]):
class PipelinePoolWorker(Generic[_T_OUTPUT]):
    """Worker that runs one pipeline with its own random generator.

    The generator is created from the seed sequence given to the constructor
    and can be re-created from that same sequence, optionally after every
    ``num_runs_reset_rng`` successful runs.
    """

    def __init__(
        self,
        process_idx: int,
        seed_sequence: SeedSequence,
        logger: logging.Logger,
        config: PipelinePoolWorkerConfig[_T_OUTPUT],
    ):
        """Store the worker settings and create the random generator.

        Args:
            process_idx: Index of this worker; only used in log messages here.
            seed_sequence: Seed sequence the random generator is built from.
            logger: Logger receiving the state and failure messages.
            config: Provides the pipeline and the reset interval.
        """
        self.process_idx = process_idx
        self.logger = logger

        self.seed_sequence = seed_sequence
        self.rng = default_rng(self.seed_sequence)
        self.logger.info(
            f'Set pipeline process_idx={self.process_idx} '
            f'rng_state to {self.rng.bit_generator.state} '
        )
        # Number of successful runs since the generator was last (re)created.
        self.rng_run_idx = 0

        self.pipeline = config.pipeline
        self.num_runs_reset_rng = config.num_runs_reset_rng

    def reset_rng(self):
        """Re-create the generator from the stored seed sequence and zero the run counter."""
        self.rng = default_rng(self.seed_sequence)
        self.rng_run_idx = 0
        self.logger.info(
            f'Reset pipeline process_idx={self.process_idx} '
            f'rng_state to {self.rng.bit_generator.state} '
            'and run_idx to 0'
        )

    def run(self):
        """Run the pipeline until it succeeds and return its output.

        Every ``Exception`` raised by ``pipeline.run`` is logged with its
        traceback and the call is retried; there is no retry limit. When a
        failed attempt left the generator state unchanged, one random number is
        drawn so the next attempt starts from a different state.

        Returns:
            The output of the first successful ``pipeline.run`` call.

        Raises:
            AssertionError: If the pipeline returned ``None`` (only when
                assertions are enabled).
        """
        output: Optional[_T_OUTPUT] = None

        while True:
            # Captured before the attempt so it can be reported and compared afterwards.
            cur_rng_state = self.rng.bit_generator.state
            try:
                output = self.pipeline.run(self.rng)
                # Formatting ``output`` may be expensive, so the message is only
                # built when a DEBUG record would actually be emitted.
                if self.logger.isEnabledFor(logging.DEBUG):
                    self.logger.debug(
                        f'pipeline.run process_idx={self.process_idx} with '
                        f'rng_state={cur_rng_state} generates output={output}'
                    )
                break
            except Exception:
                self.logger.exception(
                    f'pipeline.run process_idx={self.process_idx} failed with '
                    f'rng_state={cur_rng_state}, retrying...'
                )
                if self.rng.bit_generator.state == cur_rng_state:
                    # Force to change rng state.
                    self.rng.random()

        assert output is not None

        self.rng_run_idx += 1
        if self.num_runs_reset_rng and self.rng_run_idx % self.num_runs_reset_rng == 0:
            self.logger.debug(f'pipeline.run rng_run_idx={self.rng_run_idx}, hit reset_rng')
            self.reset_rng()

        return output

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.

This class can then be used as follows::

    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default

PipelinePoolWorker( process_idx: int, seed_sequence: numpy.random.bit_generator.SeedSequence, logger: logging.Logger, config: vkit.pipeline.pool.PipelinePoolWorkerConfig[~_T_OUTPUT])
35    def __init__(
36        self,
37        process_idx: int,
38        seed_sequence: SeedSequence,
39        logger: logging.Logger,
40        config: PipelinePoolWorkerConfig[_T_OUTPUT],
41    ):
42        self.process_idx = process_idx
43        self.logger = logger
44
45        self.seed_sequence = seed_sequence
46        self.rng = default_rng(self.seed_sequence)
47        self.logger.info(
48            f'Set pipeline process_idx={self.process_idx} '
49            f'rng_state to {self.rng.bit_generator.state} '
50        )
51        self.rng_run_idx = 0
52
53        self.pipeline = config.pipeline
54        self.num_runs_reset_rng = config.num_runs_reset_rng
def reset_rng(self):
56    def reset_rng(self):
57        self.rng = default_rng(self.seed_sequence)
58        self.rng_run_idx = 0
59        self.logger.info(
60            f'Reset pipeline process_idx={self.process_idx} '
61            f'rng_state to {self.rng.bit_generator.state} '
62            'and run_idx to 0'
63        )
def run(self):
65    def run(self):
66        output: Optional[_T_OUTPUT] = None
67
68        while True:
69            cur_rng_state = self.rng.bit_generator.state
70            try:
71                output = self.pipeline.run(self.rng)
72                self.logger.debug(
73                    f'pipeline.run process_idx={self.process_idx} with '
74                    f'rng_state={cur_rng_state} generates output={output}'
75                )
76                break
77            except Exception:
78                self.logger.exception(
79                    f'pipeline.run process_idx={self.process_idx} failed with '
80                    f'rng_state={cur_rng_state}, retrying...'
81                )
82                if self.rng.bit_generator.state == cur_rng_state:
83                    # Force to change rng state.
84                    self.rng.random()
85
86        assert output is not None
87
88        self.rng_run_idx += 1
89        if self.num_runs_reset_rng and self.rng_run_idx % self.num_runs_reset_rng == 0:
90            self.logger.debug(f'pipeline.run rng_run_idx={self.rng_run_idx}, hit reset_rng')
91            self.reset_rng()
92
93        return output
class PipelinePool(typing.Generic[~_T_OUTPUT]):
 96class PipelinePool(Generic[_T_OUTPUT]):
 97
 98    def __init__(
 99        self,
100        pipeline: Pipeline[_T_OUTPUT],
101        inventory: int,
102        num_processes: int,
103        rng_seed: int,
104        num_runs_reset_rng: Optional[int] = None,
105        timeout: int = 60,
106    ):
107        self.pool = Pool(
108            config=PoolConfig(
109                inventory=inventory,
110                num_processes=num_processes,
111                pool_worker_class=PipelinePoolWorker[_T_OUTPUT],
112                pool_worker_config=PipelinePoolWorkerConfig(
113                    pipeline=pipeline,
114                    num_runs_reset_rng=num_runs_reset_rng,
115                ),
116                rng_seed=rng_seed,
117                timeout=timeout,
118            )
119        )
120
121    def cleanup(self):
122        self.pool.cleanup()
123
124    def run(self):
125        return self.pool.run()

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.

This class can then be used as follows::

    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default

PipelinePool( pipeline: vkit.pipeline.interface.Pipeline[~_T_OUTPUT], inventory: int, num_processes: int, rng_seed: int, num_runs_reset_rng: Union[int, NoneType] = None, timeout: int = 60)
 98    def __init__(
 99        self,
100        pipeline: Pipeline[_T_OUTPUT],
101        inventory: int,
102        num_processes: int,
103        rng_seed: int,
104        num_runs_reset_rng: Optional[int] = None,
105        timeout: int = 60,
106    ):
107        self.pool = Pool(
108            config=PoolConfig(
109                inventory=inventory,
110                num_processes=num_processes,
111                pool_worker_class=PipelinePoolWorker[_T_OUTPUT],
112                pool_worker_config=PipelinePoolWorkerConfig(
113                    pipeline=pipeline,
114                    num_runs_reset_rng=num_runs_reset_rng,
115                ),
116                rng_seed=rng_seed,
117                timeout=timeout,
118            )
119        )
def cleanup(self):
121    def cleanup(self):
122        self.pool.cleanup()
def run(self):
124    def run(self):
125        return self.pool.run()