vkit.engine.char_sampler.corpus
# Copyright 2022 vkit-x Administrator. All Rights Reserved.
#
# This project (vkit-x/vkit) is dual-licensed under commercial and SSPL licenses.
#
# The commercial license gives you the full rights to create and distribute software
# on your own terms without any SSPL license obligations. For more information,
# please see the "LICENSE_COMMERCIAL.txt" file.
#
# This project is also available under Server Side Public License (SSPL).
# The SSPL licensing is ideal for use cases such as open source projects with
# SSPL distribution, student/academic purposes, hobby projects, internal research
# projects without external distribution, or other projects where all SSPL
# obligations can be met. For more information, please see the "LICENSE_SSPL.txt" file.
from typing import Sequence, List, Optional, Tuple
from os.path import getsize
import logging
from pathlib import Path

import attrs
from numpy.random import Generator as RandomGenerator
import iolite as io

from vkit.utility import normalize_to_probs, rng_choice
from ..interface import Engine, EngineExecutorFactory
from .type import CharSamplerEngineInitResource, CharSamplerEngineRunConfig

logger = logging.getLogger(__name__)


@attrs.define
class CharSamplerCorpusEngineInitConfig:
    # Paths of the corpus text files. The engine resolves each one with
    # io.file(..., expandvars=True, exists=True) when it is constructed.
    txt_files: Sequence[str]


CharSamplerCorpusEngineInitResource = CharSamplerEngineInitResource


class CharSamplerCorpusEngine(
    Engine[
        CharSamplerCorpusEngineInitConfig,
        CharSamplerCorpusEngineInitResource,
        CharSamplerEngineRunConfig,
        Sequence[str],
    ]
):  # yapf: disable
    """Char sampler engine that draws its characters from lines of corpus text files."""

    @classmethod
    def get_type_name(cls) -> str:
        """Return the type name of this engine, ``'corpus'``."""
        return 'corpus'

    def __init__(
        self,
        init_config: CharSamplerCorpusEngineInitConfig,
        init_resource: Optional[CharSamplerCorpusEngineInitResource] = None
    ):
        """Resolve the configured corpus files and record their sizes.

        Args:
            init_config: Provides ``txt_files``, the corpus files to sample from.
            init_resource: Must be provided (asserted); supplies ``lexicon_collection``.
        """
        super().__init__(init_config, init_resource)

        assert init_resource
        self.lexicon_collection = init_resource.lexicon_collection

        # Each file is resolved (and required to exist) right before its size is read.
        resolved_files = (
            io.file(raw_path, expandvars=True, exists=True) for raw_path in init_config.txt_files
        )
        self.txt_file_size_pairs: List[Tuple[Path, int]] = [
            (resolved, getsize(resolved)) for resolved in resolved_files
        ]
        # File-selection weights are derived from the file sizes.
        file_sizes = [file_size for _, file_size in self.txt_file_size_pairs]
        self.txt_file_probs = normalize_to_probs(file_sizes)

    @classmethod
    def sample_text_line_from_file(
        cls,
        txt_file: Path,
        size: int,
        rng: RandomGenerator,
    ) -> str:
        """Sample one text line from ``txt_file``.

        A byte offset is drawn uniformly from ``[0, size)`` and the line containing that
        offset is returned without its trailing newline, so longer lines are more likely
        to be picked. If the offset lands on a newline byte, the line that follows the
        newline is returned (it may be empty).

        Args:
            txt_file: File to read, opened in binary mode.
            size: Number of leading bytes of the file to consider.
            rng: Random generator used to draw the byte offset.

        Returns:
            The decoded line, or ``''`` if the bytes cannot be decoded.

        Raises:
            ValueError: If ``size`` is not positive.
        """
        if size <= 0:
            # rng.integers(0, size) would reject the empty range with a less helpful message.
            raise ValueError(f'Cannot sample a line from an empty file: {txt_file}')

        pos = int(rng.integers(0, size))
        chunk_size = 4096
        with txt_file.open('rb') as fin:
            # Scan backwards, one chunk at a time, for the last newline at or before pos.
            begin = 0
            scan_end = pos + 1
            while scan_end > 0:
                scan_begin = max(0, scan_end - chunk_size)
                fin.seek(scan_begin)
                newline_idx = fin.read(scan_end - scan_begin).rfind(b'\n')
                if newline_idx >= 0:
                    begin = scan_begin + newline_idx + 1
                    break
                scan_end = scan_begin

            # Bytes [begin, pos] hold no newline, so the first newline found from begin is
            # the first one after pos. The read is capped so it never goes past size.
            fin.seek(begin)
            binary = fin.readline(size - begin) if begin < size else b''
            if binary.endswith(b'\n'):
                binary = binary[:-1]

        # Decode.
        try:
            return binary.decode()
        except UnicodeError:
            logger.exception(f'Failed to decode {binary}')
            return ''

    def sample_text_line(self, rng: RandomGenerator):
        """Pick a corpus file using ``self.txt_file_probs`` and sample one line from it."""
        txt_file, size = rng_choice(rng, self.txt_file_size_pairs, probs=self.txt_file_probs)
        return self.sample_text_line_from_file(txt_file, size, rng)

    def sample_and_prep_text(self, rng: RandomGenerator):
        """Sample lines until one still has content after lexicon filtering.

        Each sampled line is split on whitespace, characters rejected by
        ``self.lexicon_collection.has_char`` are removed from every word, and words left
        empty are dropped. The surviving words are returned joined by single spaces.
        Note that this keeps sampling for as long as every sampled line filters to nothing.
        """
        has_char = self.lexicon_collection.has_char
        while True:
            line = self.sample_text_line(rng)
            filtered_words = (''.join(filter(has_char, word)) for word in line.split())
            kept_words = [word for word in filtered_words if word]
            if kept_words:
                return ' '.join(kept_words)

    def run(
        self,
        run_config: CharSamplerEngineRunConfig,
        rng: Optional[RandomGenerator] = None,
    ) -> Sequence[str]:
        """Sample characters from the corpus.

        In aggregator mode the result of a single ``sample_and_prep_text`` call is
        returned as is. Otherwise a list of exactly ``run_config.num_chars`` characters is
        returned (an empty list if ``num_chars`` is not positive), never ending in a space.

        Args:
            run_config: Provides ``enable_aggregator_mode`` and ``num_chars``.
            rng: Random generator; must be provided (asserted).
        """
        assert rng is not None

        if run_config.enable_aggregator_mode:
            return self.sample_and_prep_text(rng)

        num_chars = run_config.num_chars
        if num_chars <= 0:
            return []

        # Uniform selection: sample until the space-joined texts reach num_chars chars.
        texts: List[str] = []
        # Length of ' '.join(texts); starts at -1 because the first text adds no separator.
        joined_length = -1
        while joined_length < num_chars:
            text = self.sample_and_prep_text(rng)
            texts.append(text)
            joined_length += len(text) + 1

        chars = list(' '.join(texts))
        if len(chars) <= num_chars:
            return chars

        # Trim and make sure the last char is not space.
        overflow = chars[num_chars:]
        chars = chars[:num_chars]
        if chars[-1].isspace():
            # Swap the trailing space for the char right after it, which is never a space
            # because texts are joined by single spaces.
            chars.pop()
            assert not overflow[0].isspace()
            chars.append(overflow[0])

        return chars


char_sampler_corpus_engine_executor_factory = EngineExecutorFactory(CharSamplerCorpusEngine)
class
CharSamplerCorpusEngineInitConfig:
class
CharSamplerCorpusEngine(vkit.engine.interface.Engine[vkit.engine.char_sampler.corpus.CharSamplerCorpusEngineInitConfig, vkit.engine.char_sampler.type.CharSamplerEngineInitResource, vkit.engine.char_sampler.type.CharSamplerEngineRunConfig, typing.Sequence[str]]):
class CharSamplerCorpusEngine(
    Engine[
        CharSamplerCorpusEngineInitConfig,
        CharSamplerCorpusEngineInitResource,
        CharSamplerEngineRunConfig,
        Sequence[str],
    ]
):  # yapf: disable
    """Char sampler engine that draws its characters from lines of corpus text files."""

    @classmethod
    def get_type_name(cls) -> str:
        """Return the type name of this engine, ``'corpus'``."""
        return 'corpus'

    def __init__(
        self,
        init_config: CharSamplerCorpusEngineInitConfig,
        init_resource: Optional[CharSamplerCorpusEngineInitResource] = None
    ):
        """Resolve the configured corpus files and record their sizes.

        Args:
            init_config: Provides ``txt_files``, the corpus files to sample from.
            init_resource: Must be provided (asserted); supplies ``lexicon_collection``.
        """
        super().__init__(init_config, init_resource)

        assert init_resource
        self.lexicon_collection = init_resource.lexicon_collection

        # Each file is resolved (and required to exist) right before its size is read.
        resolved_files = (
            io.file(raw_path, expandvars=True, exists=True) for raw_path in init_config.txt_files
        )
        self.txt_file_size_pairs: List[Tuple[Path, int]] = [
            (resolved, getsize(resolved)) for resolved in resolved_files
        ]
        # File-selection weights are derived from the file sizes.
        file_sizes = [file_size for _, file_size in self.txt_file_size_pairs]
        self.txt_file_probs = normalize_to_probs(file_sizes)

    @classmethod
    def sample_text_line_from_file(
        cls,
        txt_file: Path,
        size: int,
        rng: RandomGenerator,
    ) -> str:
        """Sample one text line from ``txt_file``.

        A byte offset is drawn uniformly from ``[0, size)`` and the line containing that
        offset is returned without its trailing newline, so longer lines are more likely
        to be picked. If the offset lands on a newline byte, the line that follows the
        newline is returned (it may be empty).

        Args:
            txt_file: File to read, opened in binary mode.
            size: Number of leading bytes of the file to consider.
            rng: Random generator used to draw the byte offset.

        Returns:
            The decoded line, or ``''`` if the bytes cannot be decoded.

        Raises:
            ValueError: If ``size`` is not positive.
        """
        if size <= 0:
            # rng.integers(0, size) would reject the empty range with a less helpful message.
            raise ValueError(f'Cannot sample a line from an empty file: {txt_file}')

        pos = int(rng.integers(0, size))
        chunk_size = 4096
        with txt_file.open('rb') as fin:
            # Scan backwards, one chunk at a time, for the last newline at or before pos.
            begin = 0
            scan_end = pos + 1
            while scan_end > 0:
                scan_begin = max(0, scan_end - chunk_size)
                fin.seek(scan_begin)
                newline_idx = fin.read(scan_end - scan_begin).rfind(b'\n')
                if newline_idx >= 0:
                    begin = scan_begin + newline_idx + 1
                    break
                scan_end = scan_begin

            # Bytes [begin, pos] hold no newline, so the first newline found from begin is
            # the first one after pos. The read is capped so it never goes past size.
            fin.seek(begin)
            binary = fin.readline(size - begin) if begin < size else b''
            if binary.endswith(b'\n'):
                binary = binary[:-1]

        # Decode.
        try:
            return binary.decode()
        except UnicodeError:
            logger.exception(f'Failed to decode {binary}')
            return ''

    def sample_text_line(self, rng: RandomGenerator):
        """Pick a corpus file using ``self.txt_file_probs`` and sample one line from it."""
        txt_file, size = rng_choice(rng, self.txt_file_size_pairs, probs=self.txt_file_probs)
        return self.sample_text_line_from_file(txt_file, size, rng)

    def sample_and_prep_text(self, rng: RandomGenerator):
        """Sample lines until one still has content after lexicon filtering.

        Each sampled line is split on whitespace, characters rejected by
        ``self.lexicon_collection.has_char`` are removed from every word, and words left
        empty are dropped. The surviving words are returned joined by single spaces.
        Note that this keeps sampling for as long as every sampled line filters to nothing.
        """
        has_char = self.lexicon_collection.has_char
        while True:
            line = self.sample_text_line(rng)
            filtered_words = (''.join(filter(has_char, word)) for word in line.split())
            kept_words = [word for word in filtered_words if word]
            if kept_words:
                return ' '.join(kept_words)

    def run(
        self,
        run_config: CharSamplerEngineRunConfig,
        rng: Optional[RandomGenerator] = None,
    ) -> Sequence[str]:
        """Sample characters from the corpus.

        In aggregator mode the result of a single ``sample_and_prep_text`` call is
        returned as is. Otherwise a list of exactly ``run_config.num_chars`` characters is
        returned (an empty list if ``num_chars`` is not positive), never ending in a space.

        Args:
            run_config: Provides ``enable_aggregator_mode`` and ``num_chars``.
            rng: Random generator; must be provided (asserted).
        """
        assert rng is not None

        if run_config.enable_aggregator_mode:
            return self.sample_and_prep_text(rng)

        num_chars = run_config.num_chars
        if num_chars <= 0:
            return []

        # Uniform selection: sample until the space-joined texts reach num_chars chars.
        texts: List[str] = []
        # Length of ' '.join(texts); starts at -1 because the first text adds no separator.
        joined_length = -1
        while joined_length < num_chars:
            text = self.sample_and_prep_text(rng)
            texts.append(text)
            joined_length += len(text) + 1

        chars = list(' '.join(texts))
        if len(chars) <= num_chars:
            return chars

        # Trim and make sure the last char is not space.
        overflow = chars[num_chars:]
        chars = chars[:num_chars]
        if chars[-1].isspace():
            # Swap the trailing space for the char right after it, which is never a space
            # because texts are joined by single spaces.
            chars.pop()
            assert not overflow[0].isspace()
            chars.append(overflow[0])

        return chars
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.
This class can then be used as follows::
    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default
CharSamplerCorpusEngine( init_config: vkit.engine.char_sampler.corpus.CharSamplerCorpusEngineInitConfig, init_resource: Union[vkit.engine.char_sampler.type.CharSamplerEngineInitResource, NoneType] = None)
def __init__(
    self,
    init_config: CharSamplerCorpusEngineInitConfig,
    init_resource: Optional[CharSamplerCorpusEngineInitResource] = None
):
    """Resolve the configured corpus files and record their sizes.

    Args:
        init_config: Provides ``txt_files``, the corpus files to sample from.
        init_resource: Must be provided (asserted); supplies ``lexicon_collection``.
    """
    super().__init__(init_config, init_resource)

    assert init_resource
    self.lexicon_collection = init_resource.lexicon_collection

    # Each file is resolved (and required to exist) right before its size is read.
    resolved_files = (
        io.file(raw_path, expandvars=True, exists=True) for raw_path in init_config.txt_files
    )
    self.txt_file_size_pairs: List[Tuple[Path, int]] = [
        (resolved, getsize(resolved)) for resolved in resolved_files
    ]
    # File-selection weights are derived from the file sizes.
    file_sizes = [file_size for _, file_size in self.txt_file_size_pairs]
    self.txt_file_probs = normalize_to_probs(file_sizes)
@classmethod
def
sample_text_line_from_file( cls, txt_file: pathlib.Path, size: int, rng: numpy.random._generator.Generator):
@classmethod
def sample_text_line_from_file(
    cls,
    txt_file: Path,
    size: int,
    rng: RandomGenerator,
) -> str:
    """Sample one text line from ``txt_file``.

    A byte offset is drawn uniformly from ``[0, size)`` and the line containing that
    offset is returned without its trailing newline, so longer lines are more likely
    to be picked. If the offset lands on a newline byte, the line that follows the
    newline is returned (it may be empty).

    Args:
        txt_file: File to read, opened in binary mode.
        size: Number of leading bytes of the file to consider.
        rng: Random generator used to draw the byte offset.

    Returns:
        The decoded line, or ``''`` if the bytes cannot be decoded.

    Raises:
        ValueError: If ``size`` is not positive.
    """
    if size <= 0:
        # rng.integers(0, size) would reject the empty range with a less helpful message.
        raise ValueError(f'Cannot sample a line from an empty file: {txt_file}')

    pos = int(rng.integers(0, size))
    chunk_size = 4096
    with txt_file.open('rb') as fin:
        # Scan backwards, one chunk at a time, for the last newline at or before pos.
        begin = 0
        scan_end = pos + 1
        while scan_end > 0:
            scan_begin = max(0, scan_end - chunk_size)
            fin.seek(scan_begin)
            newline_idx = fin.read(scan_end - scan_begin).rfind(b'\n')
            if newline_idx >= 0:
                begin = scan_begin + newline_idx + 1
                break
            scan_end = scan_begin

        # Bytes [begin, pos] hold no newline, so the first newline found from begin is
        # the first one after pos. The read is capped so it never goes past size.
        fin.seek(begin)
        binary = fin.readline(size - begin) if begin < size else b''
        if binary.endswith(b'\n'):
            binary = binary[:-1]

    # Decode.
    try:
        return binary.decode()
    except UnicodeError:
        logger.exception(f'Failed to decode {binary}')
        return ''
def
sample_and_prep_text(self, rng: numpy.random._generator.Generator):
def sample_and_prep_text(self, rng: RandomGenerator):
    """Sample lines until one still has content after lexicon filtering.

    Each sampled line is split on whitespace, characters rejected by
    ``self.lexicon_collection.has_char`` are removed from every word, and words left
    empty are dropped. The surviving words are returned joined by single spaces.
    Note that this keeps sampling for as long as every sampled line filters to nothing.
    """
    has_char = self.lexicon_collection.has_char
    while True:
        line = self.sample_text_line(rng)
        filtered_words = (''.join(filter(has_char, word)) for word in line.split())
        kept_words = [word for word in filtered_words if word]
        if kept_words:
            return ' '.join(kept_words)
def
run( self, run_config: vkit.engine.char_sampler.type.CharSamplerEngineRunConfig, rng: Union[numpy.random._generator.Generator, NoneType] = None) -> Sequence[str]:
def run(
    self,
    run_config: CharSamplerEngineRunConfig,
    rng: Optional[RandomGenerator] = None,
) -> Sequence[str]:
    """Sample characters from the corpus.

    In aggregator mode the result of a single ``sample_and_prep_text`` call is
    returned as is. Otherwise a list of exactly ``run_config.num_chars`` characters is
    returned (an empty list if ``num_chars`` is not positive), never ending in a space.

    Args:
        run_config: Provides ``enable_aggregator_mode`` and ``num_chars``.
        rng: Random generator; must be provided (asserted).
    """
    assert rng is not None

    if run_config.enable_aggregator_mode:
        return self.sample_and_prep_text(rng)

    num_chars = run_config.num_chars
    if num_chars <= 0:
        return []

    # Uniform selection: sample until the space-joined texts reach num_chars chars.
    texts: List[str] = []
    # Length of ' '.join(texts); starts at -1 because the first text adds no separator.
    joined_length = -1
    while joined_length < num_chars:
        text = self.sample_and_prep_text(rng)
        texts.append(text)
        joined_length += len(text) + 1

    chars = list(' '.join(texts))
    if len(chars) <= num_chars:
        return chars

    # Trim and make sure the last char is not space.
    overflow = chars[num_chars:]
    chars = chars[:num_chars]
    if chars[-1].isspace():
        # Swap the trailing space for the char right after it, which is never a space
        # because texts are joined by single spaces.
        chars.pop()
        assert not overflow[0].isspace()
        chars.append(overflow[0])

    return chars