vkit.engine.char_sampler.corpus

  1# Copyright 2022 vkit-x Administrator. All Rights Reserved.
  2#
  3# This project (vkit-x/vkit) is dual-licensed under commercial and SSPL licenses.
  4#
  5# The commercial license gives you the full rights to create and distribute software
  6# on your own terms without any SSPL license obligations. For more information,
  7# please see the "LICENSE_COMMERCIAL.txt" file.
  8#
  9# This project is also available under Server Side Public License (SSPL).
 10# The SSPL licensing is ideal for use cases such as open source projects with
 11# SSPL distribution, student/academic purposes, hobby projects, internal research
 12# projects without external distribution, or other projects where all SSPL
 13# obligations can be met. For more information, please see the "LICENSE_SSPL.txt" file.
 14from typing import Sequence, List, Optional, Tuple
 15from os.path import getsize
 16import logging
 17from pathlib import Path
 18
 19import attrs
 20from numpy.random import Generator as RandomGenerator
 21import iolite as io
 22
 23from vkit.utility import normalize_to_probs, rng_choice
 24from vkit.engine.interface import Engine, EngineExecutorFactory
 25from .type import CharSamplerEngineInitResource, CharSamplerEngineRunConfig
 26
 27logger = logging.getLogger(__name__)
 28
 29
 30@attrs.define
 31class CharSamplerCorpusEngineInitConfig:
 32    txt_files: Sequence[str]
 33
 34
# The corpus engine uses the shared char sampler init resource type as-is.
CharSamplerCorpusEngineInitResource = CharSamplerEngineInitResource
 36
 37
 38class CharSamplerCorpusEngine(
 39    Engine[
 40        CharSamplerCorpusEngineInitConfig,
 41        CharSamplerCorpusEngineInitResource,
 42        CharSamplerEngineRunConfig,
 43        Sequence[str],
 44    ]
 45):  # yapf: disable
 46
 47    @classmethod
 48    def get_type_name(cls) -> str:
 49        return 'corpus'
 50
 51    def __init__(
 52        self,
 53        init_config: CharSamplerCorpusEngineInitConfig,
 54        init_resource: Optional[CharSamplerCorpusEngineInitResource] = None
 55    ):
 56        super().__init__(init_config, init_resource)
 57
 58        assert init_resource
 59        self.lexicon_collection = init_resource.lexicon_collection
 60
 61        self.txt_file_size_pairs: List[Tuple[Path, int]] = []
 62        for txt_file in init_config.txt_files:
 63            txt_file = io.file(txt_file, expandvars=True, exists=True)
 64            self.txt_file_size_pairs.append((
 65                txt_file,
 66                getsize(txt_file),
 67            ))
 68        self.txt_file_probs = normalize_to_probs([size for _, size in self.txt_file_size_pairs])
 69
 70    @classmethod
 71    def sample_text_line_from_file(
 72        cls,
 73        txt_file: Path,
 74        size: int,
 75        rng: RandomGenerator,
 76    ):
 77        pos = int(rng.integers(0, size))
 78        with txt_file.open('rb') as fin:
 79            # Find the next newline.
 80            end = pos + 1
 81            while end < size:
 82                fin.seek(end)
 83                if fin.read(1) == b'\n':
 84                    break
 85                end += 1
 86            # Find the prev newline.
 87            begin = pos
 88            while begin >= 0:
 89                fin.seek(begin)
 90                if fin.read(1) == b'\n':
 91                    break
 92                begin -= 1
 93            # Read line.
 94            begin += 1
 95            fin.seek(begin)
 96            binary = fin.read(end - begin)
 97            # Decode.
 98            try:
 99                return binary.decode()
100            except UnicodeError:
101                logger.exception(f'Failed to decode {binary}')
102                return ''
103
104    def sample_text_line(self, rng: RandomGenerator):
105        txt_file, size = rng_choice(rng, self.txt_file_size_pairs, probs=self.txt_file_probs)
106        return self.sample_text_line_from_file(txt_file, size, rng)
107
108    def sample_and_prep_text(self, rng: RandomGenerator):
109        while True:
110            text = self.sample_text_line(rng)
111            segments: List[str] = []
112            for segment in text.split():
113                segment = ''.join(
114                    char for char in segment if self.lexicon_collection.has_char(char)
115                )
116                if segment:
117                    segments.append(segment)
118            if segments:
119                return ' '.join(segments)
120
121    def run(self, run_config: CharSamplerEngineRunConfig, rng: RandomGenerator) -> Sequence[str]:
122        if not run_config.enable_aggregator_mode:
123            num_chars = run_config.num_chars
124            if num_chars <= 0:
125                return []
126
127            # Uniform selection.
128            texts: List[str] = []
129            num_chars_in_texts = 0
130            while num_chars_in_texts + len(texts) - 1 < num_chars:
131                text = self.sample_and_prep_text(rng)
132                texts.append(text)
133                num_chars_in_texts += len(text)
134
135            chars = list(' '.join(texts))
136
137            # Trim and make sure the last char is not space.
138            if len(chars) > num_chars:
139                rest = chars[num_chars:]
140                chars = chars[:num_chars]
141                if chars[-1].isspace():
142                    chars.pop()
143                    assert not rest[0].isspace()
144                    chars.append(rest[0])
145
146            return chars
147
148        else:
149            return self.sample_and_prep_text(rng)
150
151
# Module-level executor factory constructed from CharSamplerCorpusEngine.
char_sampler_corpus_engine_executor_factory = EngineExecutorFactory(CharSamplerCorpusEngine)
class CharSamplerCorpusEngineInitConfig:
class CharSamplerCorpusEngineInitConfig:
    # Paths of the corpus text files read by the corpus char sampler engine.
    txt_files: Sequence[str]
CharSamplerCorpusEngineInitConfig(txt_files: Sequence[str])
def __init__(self, txt_files):
    # Initializer generated by attrs: stores the given paths unchanged.
    self.txt_files = txt_files

Method generated by attrs for class CharSamplerCorpusEngineInitConfig.

 39class CharSamplerCorpusEngine(
 40    Engine[
 41        CharSamplerCorpusEngineInitConfig,
 42        CharSamplerCorpusEngineInitResource,
 43        CharSamplerEngineRunConfig,
 44        Sequence[str],
 45    ]
 46):  # yapf: disable
 47
 48    @classmethod
 49    def get_type_name(cls) -> str:
 50        return 'corpus'
 51
 52    def __init__(
 53        self,
 54        init_config: CharSamplerCorpusEngineInitConfig,
 55        init_resource: Optional[CharSamplerCorpusEngineInitResource] = None
 56    ):
 57        super().__init__(init_config, init_resource)
 58
 59        assert init_resource
 60        self.lexicon_collection = init_resource.lexicon_collection
 61
 62        self.txt_file_size_pairs: List[Tuple[Path, int]] = []
 63        for txt_file in init_config.txt_files:
 64            txt_file = io.file(txt_file, expandvars=True, exists=True)
 65            self.txt_file_size_pairs.append((
 66                txt_file,
 67                getsize(txt_file),
 68            ))
 69        self.txt_file_probs = normalize_to_probs([size for _, size in self.txt_file_size_pairs])
 70
 71    @classmethod
 72    def sample_text_line_from_file(
 73        cls,
 74        txt_file: Path,
 75        size: int,
 76        rng: RandomGenerator,
 77    ):
 78        pos = int(rng.integers(0, size))
 79        with txt_file.open('rb') as fin:
 80            # Find the next newline.
 81            end = pos + 1
 82            while end < size:
 83                fin.seek(end)
 84                if fin.read(1) == b'\n':
 85                    break
 86                end += 1
 87            # Find the prev newline.
 88            begin = pos
 89            while begin >= 0:
 90                fin.seek(begin)
 91                if fin.read(1) == b'\n':
 92                    break
 93                begin -= 1
 94            # Read line.
 95            begin += 1
 96            fin.seek(begin)
 97            binary = fin.read(end - begin)
 98            # Decode.
 99            try:
100                return binary.decode()
101            except UnicodeError:
102                logger.exception(f'Failed to decode {binary}')
103                return ''
104
105    def sample_text_line(self, rng: RandomGenerator):
106        txt_file, size = rng_choice(rng, self.txt_file_size_pairs, probs=self.txt_file_probs)
107        return self.sample_text_line_from_file(txt_file, size, rng)
108
109    def sample_and_prep_text(self, rng: RandomGenerator):
110        while True:
111            text = self.sample_text_line(rng)
112            segments: List[str] = []
113            for segment in text.split():
114                segment = ''.join(
115                    char for char in segment if self.lexicon_collection.has_char(char)
116                )
117                if segment:
118                    segments.append(segment)
119            if segments:
120                return ' '.join(segments)
121
122    def run(self, run_config: CharSamplerEngineRunConfig, rng: RandomGenerator) -> Sequence[str]:
123        if not run_config.enable_aggregator_mode:
124            num_chars = run_config.num_chars
125            if num_chars <= 0:
126                return []
127
128            # Uniform selection.
129            texts: List[str] = []
130            num_chars_in_texts = 0
131            while num_chars_in_texts + len(texts) - 1 < num_chars:
132                text = self.sample_and_prep_text(rng)
133                texts.append(text)
134                num_chars_in_texts += len(text)
135
136            chars = list(' '.join(texts))
137
138            # Trim and make sure the last char is not space.
139            if len(chars) > num_chars:
140                rest = chars[num_chars:]
141                chars = chars[:num_chars]
142                if chars[-1].isspace():
143                    chars.pop()
144                    assert not rest[0].isspace()
145                    chars.append(rest[0])
146
147            return chars
148
149        else:
150            return self.sample_and_prep_text(rng)

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.

This class can then be used as follows::

    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default

CharSamplerCorpusEngine( init_config: vkit.engine.char_sampler.corpus.CharSamplerCorpusEngineInitConfig, init_resource: Union[vkit.engine.char_sampler.type.CharSamplerEngineInitResource, NoneType] = None)
52    def __init__(
53        self,
54        init_config: CharSamplerCorpusEngineInitConfig,
55        init_resource: Optional[CharSamplerCorpusEngineInitResource] = None
56    ):
57        super().__init__(init_config, init_resource)
58
59        assert init_resource
60        self.lexicon_collection = init_resource.lexicon_collection
61
62        self.txt_file_size_pairs: List[Tuple[Path, int]] = []
63        for txt_file in init_config.txt_files:
64            txt_file = io.file(txt_file, expandvars=True, exists=True)
65            self.txt_file_size_pairs.append((
66                txt_file,
67                getsize(txt_file),
68            ))
69        self.txt_file_probs = normalize_to_probs([size for _, size in self.txt_file_size_pairs])
@classmethod
def get_type_name(cls) -> str:
48    @classmethod
49    def get_type_name(cls) -> str:
50        return 'corpus'
@classmethod
def sample_text_line_from_file( cls, txt_file: pathlib.Path, size: int, rng: numpy.random._generator.Generator):
 71    @classmethod
 72    def sample_text_line_from_file(
 73        cls,
 74        txt_file: Path,
 75        size: int,
 76        rng: RandomGenerator,
 77    ):
 78        pos = int(rng.integers(0, size))
 79        with txt_file.open('rb') as fin:
 80            # Find the next newline.
 81            end = pos + 1
 82            while end < size:
 83                fin.seek(end)
 84                if fin.read(1) == b'\n':
 85                    break
 86                end += 1
 87            # Find the prev newline.
 88            begin = pos
 89            while begin >= 0:
 90                fin.seek(begin)
 91                if fin.read(1) == b'\n':
 92                    break
 93                begin -= 1
 94            # Read line.
 95            begin += 1
 96            fin.seek(begin)
 97            binary = fin.read(end - begin)
 98            # Decode.
 99            try:
100                return binary.decode()
101            except UnicodeError:
102                logger.exception(f'Failed to decode {binary}')
103                return ''
def sample_text_line(self, rng: numpy.random._generator.Generator):
105    def sample_text_line(self, rng: RandomGenerator):
106        txt_file, size = rng_choice(rng, self.txt_file_size_pairs, probs=self.txt_file_probs)
107        return self.sample_text_line_from_file(txt_file, size, rng)
def sample_and_prep_text(self, rng: numpy.random._generator.Generator):
109    def sample_and_prep_text(self, rng: RandomGenerator):
110        while True:
111            text = self.sample_text_line(rng)
112            segments: List[str] = []
113            for segment in text.split():
114                segment = ''.join(
115                    char for char in segment if self.lexicon_collection.has_char(char)
116                )
117                if segment:
118                    segments.append(segment)
119            if segments:
120                return ' '.join(segments)
def run( self, run_config: vkit.engine.char_sampler.type.CharSamplerEngineRunConfig, rng: numpy.random._generator.Generator) -> Sequence[str]:
122    def run(self, run_config: CharSamplerEngineRunConfig, rng: RandomGenerator) -> Sequence[str]:
123        if not run_config.enable_aggregator_mode:
124            num_chars = run_config.num_chars
125            if num_chars <= 0:
126                return []
127
128            # Uniform selection.
129            texts: List[str] = []
130            num_chars_in_texts = 0
131            while num_chars_in_texts + len(texts) - 1 < num_chars:
132                text = self.sample_and_prep_text(rng)
133                texts.append(text)
134                num_chars_in_texts += len(text)
135
136            chars = list(' '.join(texts))
137
138            # Trim and make sure the last char is not space.
139            if len(chars) > num_chars:
140                rest = chars[num_chars:]
141                chars = chars[:num_chars]
142                if chars[-1].isspace():
143                    chars.pop()
144                    assert not rest[0].isspace()
145                    chars.append(rest[0])
146
147            return chars
148
149        else:
150            return self.sample_and_prep_text(rng)