vkit.engine.char_sampler.corpus
# Copyright 2022 vkit-x Administrator. All Rights Reserved.
#
# This project (vkit-x/vkit) is dual-licensed under commercial and SSPL licenses.
#
# The commercial license gives you the full rights to create and distribute software
# on your own terms without any SSPL license obligations. For more information,
# please see the "LICENSE_COMMERCIAL.txt" file.
#
# This project is also available under Server Side Public License (SSPL).
# The SSPL licensing is ideal for use cases such as open source projects with
# SSPL distribution, student/academic purposes, hobby projects, internal research
# projects without external distribution, or other projects where all SSPL
# obligations can be met. For more information, please see the "LICENSE_SSPL.txt" file.
"""Char sampler engine that draws its text from plain-text corpus files."""
from typing import Sequence, List, Optional, Tuple
from os.path import getsize
import logging
from pathlib import Path

import attrs
from numpy.random import Generator as RandomGenerator
import iolite as io

from vkit.utility import normalize_to_probs, rng_choice
from vkit.engine.interface import Engine, EngineExecutorFactory
from .type import CharSamplerEngineInitResource, CharSamplerEngineRunConfig

logger = logging.getLogger(__name__)


@attrs.define
class CharSamplerCorpusEngineInitConfig:
    """Init config of the corpus engine."""

    # Corpus text files; each entry is resolved by the engine through
    # `io.file(..., expandvars=True, exists=True)`.
    txt_files: Sequence[str]


# The corpus engine uses the generic char sampler init resource unchanged.
CharSamplerCorpusEngineInitResource = CharSamplerEngineInitResource


class CharSamplerCorpusEngine(
    Engine[
        CharSamplerCorpusEngineInitConfig,
        CharSamplerCorpusEngineInitResource,
        CharSamplerEngineRunConfig,
        Sequence[str],
    ]
):  # yapf: disable
    """Sample characters from random lines of corpus text files.

    Lines are read from the configured files and reduced to the characters
    accepted by `lexicon_collection.has_char`.
    """

    @classmethod
    def get_type_name(cls) -> str:
        """Return the type name of this engine, `'corpus'`."""
        return 'corpus'

    def __init__(
        self,
        init_config: CharSamplerCorpusEngineInitConfig,
        init_resource: Optional[CharSamplerCorpusEngineInitResource] = None
    ):
        """Resolve the corpus files and precompute per-file sampling probabilities.

        Args:
            init_config: Provides `txt_files`, the corpus files to sample from.
            init_resource: Must be provided (asserted); supplies
                `lexicon_collection`.
        """
        super().__init__(init_config, init_resource)

        assert init_resource
        self.lexicon_collection = init_resource.lexicon_collection

        # Pair every resolved file with its size in bytes.
        self.txt_file_size_pairs: List[Tuple[Path, int]] = []
        for txt_file in init_config.txt_files:
            # NOTE(review): presumably expands environment variables and
            # requires the file to exist -- confirm against iolite.
            txt_file = io.file(txt_file, expandvars=True, exists=True)
            self.txt_file_size_pairs.append((
                txt_file,
                getsize(txt_file),
            ))
        # NOTE(review): presumably probabilities proportional to file size --
        # confirm against `normalize_to_probs`.
        self.txt_file_probs = normalize_to_probs([size for _, size in self.txt_file_size_pairs])

    @classmethod
    def sample_text_line_from_file(
        cls,
        txt_file: Path,
        size: int,
        rng: RandomGenerator,
    ):
        """Return the text line around a random byte offset of `txt_file`.

        A byte offset is drawn from `[0, size)`; the line is delimited by the
        nearest `b'\\n'` on each side (or the file boundaries). When the offset
        itself is a newline byte, the line following it is returned.

        Args:
            txt_file: File to read, opened in binary mode.
            size: Upper bound (exclusive) for the offset and the forward scan.
            rng: Random generator used to draw the offset.

        Returns:
            The decoded line without its newline, or `''` when decoding fails.
        """
        pos = int(rng.integers(0, size))
        with txt_file.open('rb') as fin:
            # Find the next newline.
            end = pos + 1
            while end < size:
                fin.seek(end)
                if fin.read(1) == b'\n':
                    break
                end += 1
            # Find the prev newline.
            begin = pos
            while begin >= 0:
                fin.seek(begin)
                if fin.read(1) == b'\n':
                    break
                begin -= 1
            # Read line.
            # `begin` is on the previous newline (or -1), so step past it.
            begin += 1
            fin.seek(begin)
            binary = fin.read(end - begin)
            # Decode.
            # `bytes.decode()` defaults to UTF-8.
            try:
                return binary.decode()
            except UnicodeError:
                logger.exception(f'Failed to decode {binary}')
                return ''

    def sample_text_line(self, rng: RandomGenerator):
        """Pick a file via `rng_choice` with `txt_file_probs` and sample one line from it."""
        txt_file, size = rng_choice(rng, self.txt_file_size_pairs, probs=self.txt_file_probs)
        return self.sample_text_line_from_file(txt_file, size, rng)

    def sample_and_prep_text(self, rng: RandomGenerator):
        """Sample lines until one yields text, and return that text.

        Each line is split on whitespace; every segment keeps only the chars
        accepted by `lexicon_collection.has_char`; empty segments are dropped
        and the rest is joined with single spaces. Loops until the result is
        non-empty.
        """
        while True:
            text = self.sample_text_line(rng)
            segments: List[str] = []
            for segment in text.split():
                segment = ''.join(
                    char for char in segment if self.lexicon_collection.has_char(char)
                )
                if segment:
                    segments.append(segment)
            if segments:
                return ' '.join(segments)

    def run(self, run_config: CharSamplerEngineRunConfig, rng: RandomGenerator) -> Sequence[str]:
        """Sample characters according to `run_config`.

        Without aggregator mode, returns a list of exactly
        `run_config.num_chars` single-char strings (empty when
        `num_chars <= 0`) whose last char is not whitespace. With aggregator
        mode, returns the prepared text of one sampled line as a `str`.
        """
        if not run_config.enable_aggregator_mode:
            num_chars = run_config.num_chars
            if num_chars <= 0:
                return []

            # Uniform selection.
            texts: List[str] = []
            num_chars_in_texts = 0
            # The left-hand side is the length of ' '.join(texts):
            # all text chars plus one separator between neighbours.
            while num_chars_in_texts + len(texts) - 1 < num_chars:
                text = self.sample_and_prep_text(rng)
                texts.append(text)
                num_chars_in_texts += len(text)

            chars = list(' '.join(texts))

            # Trim and make sure the last char is not space.
            if len(chars) > num_chars:
                rest = chars[num_chars:]
                chars = chars[:num_chars]
                if chars[-1].isspace():
                    # Swap the trailing separator for the first char that was cut off.
                    chars.pop()
                    assert not rest[0].isspace()
                    chars.append(rest[0])

            return chars

        else:
            return self.sample_and_prep_text(rng)


# Executor factory built around the corpus engine class.
char_sampler_corpus_engine_executor_factory = EngineExecutorFactory(CharSamplerCorpusEngine)
class
CharSamplerCorpusEngineInitConfig:
class
CharSamplerCorpusEngine(vkit.engine.interface.Engine[vkit.engine.char_sampler.corpus.CharSamplerCorpusEngineInitConfig, vkit.engine.char_sampler.type.CharSamplerEngineInitResource, vkit.engine.char_sampler.type.CharSamplerEngineRunConfig, typing.Sequence[str]]):
class CharSamplerCorpusEngine(
    Engine[
        CharSamplerCorpusEngineInitConfig,
        CharSamplerCorpusEngineInitResource,
        CharSamplerEngineRunConfig,
        Sequence[str],
    ]
):  # yapf: disable
    """Sample characters from random lines of corpus text files.

    Lines are read from the configured files and reduced to the characters
    accepted by `lexicon_collection.has_char`.
    """

    @classmethod
    def get_type_name(cls) -> str:
        """Return the type name of this engine, `'corpus'`."""
        return 'corpus'

    def __init__(
        self,
        init_config: CharSamplerCorpusEngineInitConfig,
        init_resource: Optional[CharSamplerCorpusEngineInitResource] = None
    ):
        """Resolve the corpus files and precompute per-file sampling probabilities.

        Args:
            init_config: Provides `txt_files`, the corpus files to sample from.
            init_resource: Must be provided (asserted); supplies
                `lexicon_collection`.
        """
        super().__init__(init_config, init_resource)

        assert init_resource
        self.lexicon_collection = init_resource.lexicon_collection

        # Pair every resolved file with its size in bytes.
        self.txt_file_size_pairs: List[Tuple[Path, int]] = []
        for txt_file in init_config.txt_files:
            # NOTE(review): presumably expands environment variables and
            # requires the file to exist -- confirm against iolite.
            txt_file = io.file(txt_file, expandvars=True, exists=True)
            self.txt_file_size_pairs.append((
                txt_file,
                getsize(txt_file),
            ))
        # NOTE(review): presumably probabilities proportional to file size --
        # confirm against `normalize_to_probs`.
        self.txt_file_probs = normalize_to_probs([size for _, size in self.txt_file_size_pairs])

    @classmethod
    def sample_text_line_from_file(
        cls,
        txt_file: Path,
        size: int,
        rng: RandomGenerator,
    ):
        """Return the text line around a random byte offset of `txt_file`.

        A byte offset is drawn from `[0, size)`; the line is delimited by the
        nearest `b'\\n'` on each side (or the file boundaries). When the offset
        itself is a newline byte, the line following it is returned.

        Args:
            txt_file: File to read, opened in binary mode.
            size: Upper bound (exclusive) for the offset and the forward scan.
            rng: Random generator used to draw the offset.

        Returns:
            The decoded line without its newline, or `''` when decoding fails.
        """
        pos = int(rng.integers(0, size))
        with txt_file.open('rb') as fin:
            # Find the next newline.
            end = pos + 1
            while end < size:
                fin.seek(end)
                if fin.read(1) == b'\n':
                    break
                end += 1
            # Find the prev newline.
            begin = pos
            while begin >= 0:
                fin.seek(begin)
                if fin.read(1) == b'\n':
                    break
                begin -= 1
            # Read line.
            # `begin` is on the previous newline (or -1), so step past it.
            begin += 1
            fin.seek(begin)
            binary = fin.read(end - begin)
            # Decode.
            # `bytes.decode()` defaults to UTF-8.
            try:
                return binary.decode()
            except UnicodeError:
                logger.exception(f'Failed to decode {binary}')
                return ''

    def sample_text_line(self, rng: RandomGenerator):
        """Pick a file via `rng_choice` with `txt_file_probs` and sample one line from it."""
        txt_file, size = rng_choice(rng, self.txt_file_size_pairs, probs=self.txt_file_probs)
        return self.sample_text_line_from_file(txt_file, size, rng)

    def sample_and_prep_text(self, rng: RandomGenerator):
        """Sample lines until one yields text, and return that text.

        Each line is split on whitespace; every segment keeps only the chars
        accepted by `lexicon_collection.has_char`; empty segments are dropped
        and the rest is joined with single spaces. Loops until the result is
        non-empty.
        """
        while True:
            text = self.sample_text_line(rng)
            segments: List[str] = []
            for segment in text.split():
                segment = ''.join(
                    char for char in segment if self.lexicon_collection.has_char(char)
                )
                if segment:
                    segments.append(segment)
            if segments:
                return ' '.join(segments)

    def run(self, run_config: CharSamplerEngineRunConfig, rng: RandomGenerator) -> Sequence[str]:
        """Sample characters according to `run_config`.

        Without aggregator mode, returns a list of exactly
        `run_config.num_chars` single-char strings (empty when
        `num_chars <= 0`) whose last char is not whitespace. With aggregator
        mode, returns the prepared text of one sampled line as a `str`.
        """
        if not run_config.enable_aggregator_mode:
            num_chars = run_config.num_chars
            if num_chars <= 0:
                return []

            # Uniform selection.
            texts: List[str] = []
            num_chars_in_texts = 0
            # The left-hand side is the length of ' '.join(texts):
            # all text chars plus one separator between neighbours.
            while num_chars_in_texts + len(texts) - 1 < num_chars:
                text = self.sample_and_prep_text(rng)
                texts.append(text)
                num_chars_in_texts += len(text)

            chars = list(' '.join(texts))

            # Trim and make sure the last char is not space.
            if len(chars) > num_chars:
                rest = chars[num_chars:]
                chars = chars[:num_chars]
                if chars[-1].isspace():
                    # Swap the trailing separator for the first char that was cut off.
                    chars.pop()
                    assert not rest[0].isspace()
                    chars.append(rest[0])

            return chars

        else:
            return self.sample_and_prep_text(rng)
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.
This class can then be used as follows::
    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default
CharSamplerCorpusEngine( init_config: vkit.engine.char_sampler.corpus.CharSamplerCorpusEngineInitConfig, init_resource: Union[vkit.engine.char_sampler.type.CharSamplerEngineInitResource, NoneType] = None)
def __init__(
    self,
    init_config: CharSamplerCorpusEngineInitConfig,
    init_resource: Optional[CharSamplerCorpusEngineInitResource] = None
):
    """Set up the lexicon and the size-annotated list of corpus files.

    Args:
        init_config: Provides `txt_files`, the corpus files to sample from.
        init_resource: Must be provided (asserted); supplies
            `lexicon_collection`.
    """
    super().__init__(init_config, init_resource)

    assert init_resource
    self.lexicon_collection = init_resource.lexicon_collection

    def _resolve_with_size(raw_path: str) -> Tuple[Path, int]:
        # NOTE(review): presumably expands environment variables and requires
        # the file to exist -- confirm against iolite.
        resolved = io.file(raw_path, expandvars=True, exists=True)
        return resolved, getsize(resolved)

    # One (path, size in bytes) pair per configured file, in config order.
    self.txt_file_size_pairs: List[Tuple[Path, int]] = [
        _resolve_with_size(raw_path) for raw_path in init_config.txt_files
    ]

    # NOTE(review): presumably probabilities proportional to file size --
    # confirm against `normalize_to_probs`.
    file_sizes = [num_bytes for _, num_bytes in self.txt_file_size_pairs]
    self.txt_file_probs = normalize_to_probs(file_sizes)
@classmethod
def
sample_text_line_from_file( cls, txt_file: pathlib.Path, size: int, rng: numpy.random._generator.Generator):
@classmethod
def sample_text_line_from_file(
    cls,
    txt_file: Path,
    size: int,
    rng: RandomGenerator,
) -> str:
    """Return the text line around a random byte offset of `txt_file`.

    A byte offset is drawn from `[0, size)`; the line is delimited by the
    nearest `b'\\n'` on each side (or the file boundaries). When the offset
    itself is a newline byte, the line following it is returned.

    Args:
        txt_file: File to read.
        size: Upper bound (exclusive) for the offset and the forward search;
            callers pass the file size in bytes.
        rng: Random generator used to draw the offset.

    Returns:
        The decoded line without its newline, or `''` when the bytes are not
        valid in the default codec of `bytes.decode()` (UTF-8).

    Raises:
        ValueError: If `size` is not positive (nothing to sample from).
    """
    # Local import: only this method needs it.
    import mmap

    if size <= 0:
        # Same exception type numpy raises for an empty range, clearer message.
        raise ValueError(f'Cannot sample a line from {txt_file}: size={size}')

    pos = int(rng.integers(0, size))

    # Search the mapped file instead of issuing one seek() + read(1) per byte.
    with txt_file.open('rb') as fin, \
            mmap.mmap(fin.fileno(), 0, access=mmap.ACCESS_READ) as mapped:
        # Next newline strictly after `pos`, bounded by `size`.
        end = mapped.find(b'\n', pos + 1, size)
        if end < 0:
            end = size
        # Previous newline at or before `pos`; rfind() yields -1 when there
        # is none, so adding 1 gives the start of the file.
        begin = mapped.rfind(b'\n', 0, pos + 1) + 1
        binary = mapped[begin:end]

    try:
        return binary.decode()
    except UnicodeError:
        logger.exception(f'Failed to decode {binary}')
        return ''
def
sample_and_prep_text(self, rng: numpy.random._generator.Generator):
def sample_and_prep_text(self, rng: RandomGenerator):
    """Keep sampling lines until one survives lexicon filtering.

    A line is split on whitespace, every word is reduced to the chars
    accepted by `lexicon_collection.has_char`, and words that end up empty
    are discarded. The first line with at least one remaining word is
    returned with its words joined by single spaces.
    """
    while True:
        line = self.sample_text_line(rng)
        filtered_words = (
            ''.join(ch for ch in word if self.lexicon_collection.has_char(ch))
            for word in line.split()
        )
        kept_words = [word for word in filtered_words if word]
        if not kept_words:
            # Nothing usable in this line, draw another one.
            continue
        return ' '.join(kept_words)
def
run( self, run_config: vkit.engine.char_sampler.type.CharSamplerEngineRunConfig, rng: numpy.random._generator.Generator) -> Sequence[str]:
def run(self, run_config: CharSamplerEngineRunConfig, rng: RandomGenerator) -> Sequence[str]:
    """Sample characters according to `run_config`.

    With aggregator mode enabled, the prepared text of a single sampled line
    is returned as a `str`. Otherwise prepared texts are joined with single
    spaces until at least `run_config.num_chars` chars are available, and a
    list of exactly `num_chars` single-char strings is returned (an empty
    list when `num_chars <= 0`). The last char of that list is never
    whitespace.
    """
    if run_config.enable_aggregator_mode:
        return self.sample_and_prep_text(rng)

    target = run_config.num_chars
    if target <= 0:
        return []

    # Track the length of ' '.join(pieces): every piece adds its chars plus
    # one separator, and starting at -1 cancels the separator of the first.
    pieces: List[str] = []
    joined_len = -1
    while joined_len < target:
        piece = self.sample_and_prep_text(rng)
        pieces.append(piece)
        joined_len += len(piece) + 1

    joined = ' '.join(pieces)
    if len(joined) <= target:
        return list(joined)

    # Too long: cut to size, and if the cut ends on a separator replace it
    # with the first char that was cut off.
    head = list(joined[:target])
    if head[-1].isspace():
        follower = joined[target]
        assert not follower.isspace()
        head[-1] = follower
    return head