vkit.engine.image.combiner
1# Copyright 2022 vkit-x Administrator. All Rights Reserved. 2# 3# This project (vkit-x/vkit) is dual-licensed under commercial and SSPL licenses. 4# 5# The commercial license gives you the full rights to create and distribute software 6# on your own terms without any SSPL license obligations. For more information, 7# please see the "LICENSE_COMMERCIAL.txt" file. 8# 9# This project is also available under Server Side Public License (SSPL). 10# The SSPL licensing is ideal for use cases such as open source projects with 11# SSPL distribution, student/academic purposes, hobby projects, internal research 12# projects without external distribution, or other projects where all SSPL 13# obligations can be met. For more information, please see the "LICENSE_SSPL.txt" file. 14from typing import Sequence, List, Dict, Optional 15import bisect 16import heapq 17 18import attrs 19from numpy.random import Generator as RandomGenerator 20import numpy as np 21import cv2 as cv 22import iolite as io 23 24from vkit.utility import rng_choice, read_json_file 25from vkit.element import Image, ImageMode, Mask 26from vkit.mechanism.distortion import rotate 27from ..interface import ( 28 Engine, 29 EngineExecutorFactory, 30 NoneTypeEngineInitResource, 31) 32from .type import ImageEngineRunConfig 33 34 35@attrs.define(frozen=True) 36class ImageMeta: 37 image_file: str 38 grayscale_mean: float 39 grayscale_std: float 40 41 42class FolderTree: 43 IMAGE = 'image' 44 METAS_JSON = 'metas.json' 45 46 47def load_image_metas_from_folder(folder: str): 48 in_fd = io.folder(folder, expandvars=True, exists=True) 49 image_fd = io.folder( 50 in_fd / FolderTree.IMAGE, 51 exists=True, 52 ) 53 metas_json = io.file( 54 in_fd / FolderTree.METAS_JSON, 55 exists=True, 56 ) 57 58 image_metas: List[ImageMeta] = [] 59 for meta in read_json_file(metas_json): 60 image_file = io.file(image_fd / meta['image_file'], exists=True) 61 image_metas.append( 62 ImageMeta( 63 image_file=str(image_file), 64 grayscale_mean=meta['grayscale_mean'], 65 grayscale_std=meta['grayscale_std'], 66 ) 67 ) 68 69 return image_metas 70 71 72@attrs.define 73class ImageCombinerEngineInitConfig: 74 image_meta_folder: str 75 target_image_mode: ImageMode = ImageMode.RGB 76 enable_cache: bool = False 77 prob_use_only_the_anchor_image: float = 0.7 78 prob_rotate_image: float = 0.5 79 sigma: float = 3.0 80 init_segment_width_min_ratio: float = 0.25 81 gaussian_blur_kernel_size = 5 82 83 84@attrs.define(order=True) 85class PrioritizedSegment: 86 y: int = attrs.field(order=True) 87 left: int = attrs.field(order=False) 88 right: int = attrs.field(order=False) 89 90 91class ImageCombinerEngine( 92 Engine[ 93 ImageCombinerEngineInitConfig, 94 NoneTypeEngineInitResource, 95 ImageEngineRunConfig, 96 Image, 97 ] 98): # yapf: disable 99 100 @classmethod 101 def get_type_name(cls) -> str: 102 return 'combiner' 103 104 def __init__( 105 self, 106 init_config: ImageCombinerEngineInitConfig, 107 init_resource: Optional[NoneTypeEngineInitResource] = None, 108 ): 109 super().__init__(init_config, init_resource) 110 111 self.image_metas = load_image_metas_from_folder(init_config.image_meta_folder) 112 self.image_metas = sorted( 113 self.image_metas, 114 key=lambda meta: meta.grayscale_mean, 115 ) 116 self.image_metas_grayscale_means = [ 117 image_meta.grayscale_mean for image_meta in self.image_metas 118 ] 119 self.enable_cache = init_config.enable_cache 120 self.image_file_to_cache_image: Dict[str, Image] = {} 121 122 def sample_image_metas_based_on_random_anchor( 123 self, 124 run_config: ImageEngineRunConfig, 125 rng: RandomGenerator, 126 ): 127 # Get candidates based on anchor. 128 anchor_image_meta = rng_choice(rng, self.image_metas) 129 130 if rng.random() < self.init_config.prob_use_only_the_anchor_image: 131 return [anchor_image_meta] 132 133 else: 134 grayscale_std = anchor_image_meta.grayscale_std 135 grayscale_mean = anchor_image_meta.grayscale_mean 136 137 grayscale_begin = round(grayscale_mean - self.init_config.sigma * grayscale_std) 138 grayscale_end = round(grayscale_mean + self.init_config.sigma * grayscale_std) 139 140 index_begin = bisect.bisect_left(self.image_metas_grayscale_means, x=grayscale_begin) 141 index_end = bisect.bisect_right(self.image_metas_grayscale_means, x=grayscale_end) 142 image_metas = self.image_metas[index_begin:index_end] 143 assert image_metas 144 return image_metas 145 146 @classmethod 147 def fill_np_edge_mask( 148 cls, 149 np_edge_mask: np.ndarray, 150 height: int, 151 width: int, 152 gaussian_blur_half_kernel_size: int, 153 up: int, 154 down: int, 155 left: int, 156 right: int, 157 ): 158 # Fill up. 159 up_min = max(0, up - gaussian_blur_half_kernel_size) 160 up_max = min(height - 1, up + gaussian_blur_half_kernel_size) 161 np_edge_mask[up_min:up_max + 1, left:right + 1] = 1 162 163 # Fill down. 164 down_min = max(0, down - gaussian_blur_half_kernel_size) 165 down_max = min(height - 1, down + gaussian_blur_half_kernel_size) 166 np_edge_mask[down_min:down_max + 1, left:right + 1] = 1 167 168 # Fill left. 169 left_min = max(0, left - gaussian_blur_half_kernel_size) 170 left_max = min(width - 1, left + gaussian_blur_half_kernel_size) 171 np_edge_mask[up:down + 1, left_min:left_max + 1] = 1 172 173 # Fill right. 174 right_min = max(0, right - gaussian_blur_half_kernel_size) 175 right_max = min(width - 1, right + gaussian_blur_half_kernel_size) 176 np_edge_mask[up:down + 1, right_min:right_max + 1] = 1 177 178 def synthesize_image( 179 self, 180 run_config: ImageEngineRunConfig, 181 image_metas: Sequence[ImageMeta], 182 rng: RandomGenerator, 183 ): 184 height = run_config.height 185 width = run_config.width 186 187 mat = np.zeros((height, width, 3), dtype=np.uint8) 188 edge_mask = Mask.from_shape((height, width)) 189 gaussian_blur_half_kernel_size = self.init_config.gaussian_blur_kernel_size // 2 + 1 190 191 # Initialize segments. 192 priority_queue: List[PrioritizedSegment] = [] 193 segment_width_min = int( 194 np.clip( 195 round(self.init_config.init_segment_width_min_ratio * width), 196 1, 197 width - 1, 198 ) 199 ) 200 left = 0 201 while left + segment_width_min - 1 < width: 202 right = rng.integers( 203 left + segment_width_min - 1, 204 width, 205 ) 206 if right + 1 - left < segment_width_min or width - right - 1 < segment_width_min: 207 break 208 priority_queue.append(PrioritizedSegment( 209 y=0, 210 left=left, 211 right=right, 212 )) 213 left = right + 1 214 if left < width: 215 priority_queue.append(PrioritizedSegment( 216 y=0, 217 left=left, 218 right=width - 1, 219 )) 220 221 # For random rotation. 222 image_file_to_rotate_flag: Dict[str, bool] = {} 223 224 while priority_queue: 225 # Pop a segment 226 cur_segment = heapq.heappop(priority_queue) 227 228 # Deal with connection. 229 segments: List[PrioritizedSegment] = [] 230 while priority_queue and priority_queue[0].y == cur_segment.y: 231 segments.append(heapq.heappop(priority_queue)) 232 233 if segments: 234 segments.append(cur_segment) 235 segments = sorted(segments, key=lambda segment: segment.left) 236 cur_segment_idx = -1 237 for segment_idx, segment in enumerate(segments): 238 if segment.left == cur_segment.left and segment.right == cur_segment.right: 239 cur_segment_idx = segment_idx 240 break 241 assert cur_segment_idx >= 0 242 243 begin = cur_segment_idx 244 while begin > 0 and segments[begin - 1].right + 1 == segments[begin].left: 245 begin -= 1 246 end = cur_segment_idx 247 while end + 1 < len(segments) and segments[end].right + 1 == segments[end + 1].left: 248 end += 1 249 250 if begin < end: 251 # Update the current segment. 252 cur_segment.left = segments[begin].left 253 cur_segment.right = segments[end].right 254 255 # Push back. 256 for segment in segments[:begin]: 257 heapq.heappush(priority_queue, segment) 258 for segment in segments[end + 1:]: 259 heapq.heappush(priority_queue, segment) 260 261 # Load image. 262 image_meta = rng_choice(rng, image_metas) 263 264 if self.enable_cache and image_meta.image_file in self.image_file_to_cache_image: 265 segment_image = self.image_file_to_cache_image[image_meta.image_file] 266 267 else: 268 segment_image = Image.from_file(image_meta.image_file).to_target_mode_image( 269 self.init_config.target_image_mode 270 ) 271 272 if image_meta.image_file not in image_file_to_rotate_flag: 273 rotate_flag = (rng.random() < self.init_config.prob_rotate_image) 274 image_file_to_rotate_flag[image_meta.image_file] = rotate_flag 275 276 if image_file_to_rotate_flag[image_meta.image_file]: 277 segment_image = rotate.distort_image( 278 {'angle': 90}, 279 image=segment_image, 280 ) 281 282 if self.enable_cache: 283 self.image_file_to_cache_image[image_meta.image_file] = segment_image 284 285 # Fill image and edge mask. 286 up = cur_segment.y 287 down = min(height - 1, up + segment_image.height - 1) 288 left = cur_segment.left 289 right = min(cur_segment.right, left + segment_image.width - 1) 290 mat[up:down + 1, left:right + 1] = \ 291 segment_image.mat[:down + 1 - up, :right + 1 - left] 292 293 with edge_mask.writable_context: 294 self.fill_np_edge_mask( 295 np_edge_mask=edge_mask.mat, 296 height=height, 297 width=width, 298 gaussian_blur_half_kernel_size=gaussian_blur_half_kernel_size, 299 up=up, 300 down=down, 301 left=left, 302 right=right, 303 ) 304 305 # Update segments. 306 if right == cur_segment.right: 307 # Reach the current right end. 308 cur_segment.y = down + 1 309 if cur_segment.y < height: 310 heapq.heappush(priority_queue, cur_segment) 311 else: 312 # Not reaching the right end. 313 assert right < cur_segment.right 314 new_segment = PrioritizedSegment( 315 y=down + 1, 316 left=left, 317 right=right, 318 ) 319 if new_segment.y < height: 320 heapq.heappush(priority_queue, new_segment) 321 322 cur_segment.left = right + 1 323 heapq.heappush(priority_queue, cur_segment) 324 325 # Apply gaussian blur. 326 gaussian_blur_sigma = gaussian_blur_half_kernel_size / 3 327 gaussian_blur_ksize = (self.init_config.gaussian_blur_kernel_size,) * 2 328 edge_mask.fill_np_array( 329 mat, 330 cv.GaussianBlur(mat, gaussian_blur_ksize, gaussian_blur_sigma), 331 ) 332 333 return Image(mat=mat) 334 335 def run( 336 self, 337 run_config: ImageEngineRunConfig, 338 rng: Optional[RandomGenerator] = None, 339 ) -> Image: 340 assert rng is not None 341 342 assert not run_config.disable_resizing 343 image_metas = self.sample_image_metas_based_on_random_anchor(run_config, rng) 344 return self.synthesize_image(run_config, image_metas, rng) 345 346 347image_combiner_engine_executor_factory = EngineExecutorFactory(ImageCombinerEngine)
class
ImageMeta:
ImageMeta(image_file: str, grayscale_mean: float, grayscale_std: float)
2def __init__(self, image_file, grayscale_mean, grayscale_std): 3 _setattr = _cached_setattr_get(self) 4 _setattr('image_file', image_file) 5 _setattr('grayscale_mean', grayscale_mean) 6 _setattr('grayscale_std', grayscale_std)
Method generated by attrs for class ImageMeta.
class
FolderTree:
def
load_image_metas_from_folder(folder: str):
48def load_image_metas_from_folder(folder: str): 49 in_fd = io.folder(folder, expandvars=True, exists=True) 50 image_fd = io.folder( 51 in_fd / FolderTree.IMAGE, 52 exists=True, 53 ) 54 metas_json = io.file( 55 in_fd / FolderTree.METAS_JSON, 56 exists=True, 57 ) 58 59 image_metas: List[ImageMeta] = [] 60 for meta in read_json_file(metas_json): 61 image_file = io.file(image_fd / meta['image_file'], exists=True) 62 image_metas.append( 63 ImageMeta( 64 image_file=str(image_file), 65 grayscale_mean=meta['grayscale_mean'], 66 grayscale_std=meta['grayscale_std'], 67 ) 68 ) 69 70 return image_metas
class
ImageCombinerEngineInitConfig:
74class ImageCombinerEngineInitConfig: 75 image_meta_folder: str 76 target_image_mode: ImageMode = ImageMode.RGB 77 enable_cache: bool = False 78 prob_use_only_the_anchor_image: float = 0.7 79 prob_rotate_image: float = 0.5 80 sigma: float = 3.0 81 init_segment_width_min_ratio: float = 0.25 82 gaussian_blur_kernel_size = 5
ImageCombinerEngineInitConfig( image_meta_folder: str, target_image_mode: vkit.element.image.ImageMode = <ImageMode.RGB: 'rgb'>, enable_cache: bool = False, prob_use_only_the_anchor_image: float = 0.7, prob_rotate_image: float = 0.5, sigma: float = 3.0, init_segment_width_min_ratio: float = 0.25)
2def __init__(self, image_meta_folder, target_image_mode=attr_dict['target_image_mode'].default, enable_cache=attr_dict['enable_cache'].default, prob_use_only_the_anchor_image=attr_dict['prob_use_only_the_anchor_image'].default, prob_rotate_image=attr_dict['prob_rotate_image'].default, sigma=attr_dict['sigma'].default, init_segment_width_min_ratio=attr_dict['init_segment_width_min_ratio'].default): 3 self.image_meta_folder = image_meta_folder 4 self.target_image_mode = target_image_mode 5 self.enable_cache = enable_cache 6 self.prob_use_only_the_anchor_image = prob_use_only_the_anchor_image 7 self.prob_rotate_image = prob_rotate_image 8 self.sigma = sigma 9 self.init_segment_width_min_ratio = init_segment_width_min_ratio
Method generated by attrs for class ImageCombinerEngineInitConfig.
class
PrioritizedSegment:
class
ImageCombinerEngine(vkit.engine.interface.Engine[vkit.engine.image.combiner.ImageCombinerEngineInitConfig, vkit.engine.interface.NoneTypeEngineInitResource, vkit.engine.image.type.ImageEngineRunConfig, vkit.element.image.Image]):
92class ImageCombinerEngine( 93 Engine[ 94 ImageCombinerEngineInitConfig, 95 NoneTypeEngineInitResource, 96 ImageEngineRunConfig, 97 Image, 98 ] 99): # yapf: disable 100 101 @classmethod 102 def get_type_name(cls) -> str: 103 return 'combiner' 104 105 def __init__( 106 self, 107 init_config: ImageCombinerEngineInitConfig, 108 init_resource: Optional[NoneTypeEngineInitResource] = None, 109 ): 110 super().__init__(init_config, init_resource) 111 112 self.image_metas = load_image_metas_from_folder(init_config.image_meta_folder) 113 self.image_metas = sorted( 114 self.image_metas, 115 key=lambda meta: meta.grayscale_mean, 116 ) 117 self.image_metas_grayscale_means = [ 118 image_meta.grayscale_mean for image_meta in self.image_metas 119 ] 120 self.enable_cache = init_config.enable_cache 121 self.image_file_to_cache_image: Dict[str, Image] = {} 122 123 def sample_image_metas_based_on_random_anchor( 124 self, 125 run_config: ImageEngineRunConfig, 126 rng: RandomGenerator, 127 ): 128 # Get candidates based on anchor. 129 anchor_image_meta = rng_choice(rng, self.image_metas) 130 131 if rng.random() < self.init_config.prob_use_only_the_anchor_image: 132 return [anchor_image_meta] 133 134 else: 135 grayscale_std = anchor_image_meta.grayscale_std 136 grayscale_mean = anchor_image_meta.grayscale_mean 137 138 grayscale_begin = round(grayscale_mean - self.init_config.sigma * grayscale_std) 139 grayscale_end = round(grayscale_mean + self.init_config.sigma * grayscale_std) 140 141 index_begin = bisect.bisect_left(self.image_metas_grayscale_means, x=grayscale_begin) 142 index_end = bisect.bisect_right(self.image_metas_grayscale_means, x=grayscale_end) 143 image_metas = self.image_metas[index_begin:index_end] 144 assert image_metas 145 return image_metas 146 147 @classmethod 148 def fill_np_edge_mask( 149 cls, 150 np_edge_mask: np.ndarray, 151 height: int, 152 width: int, 153 gaussian_blur_half_kernel_size: int, 154 up: int, 155 down: int, 156 left: int, 157 right: int, 158 ): 159 # Fill up. 160 up_min = max(0, up - gaussian_blur_half_kernel_size) 161 up_max = min(height - 1, up + gaussian_blur_half_kernel_size) 162 np_edge_mask[up_min:up_max + 1, left:right + 1] = 1 163 164 # Fill down. 165 down_min = max(0, down - gaussian_blur_half_kernel_size) 166 down_max = min(height - 1, down + gaussian_blur_half_kernel_size) 167 np_edge_mask[down_min:down_max + 1, left:right + 1] = 1 168 169 # Fill left. 170 left_min = max(0, left - gaussian_blur_half_kernel_size) 171 left_max = min(width - 1, left + gaussian_blur_half_kernel_size) 172 np_edge_mask[up:down + 1, left_min:left_max + 1] = 1 173 174 # Fill right. 175 right_min = max(0, right - gaussian_blur_half_kernel_size) 176 right_max = min(width - 1, right + gaussian_blur_half_kernel_size) 177 np_edge_mask[up:down + 1, right_min:right_max + 1] = 1 178 179 def synthesize_image( 180 self, 181 run_config: ImageEngineRunConfig, 182 image_metas: Sequence[ImageMeta], 183 rng: RandomGenerator, 184 ): 185 height = run_config.height 186 width = run_config.width 187 188 mat = np.zeros((height, width, 3), dtype=np.uint8) 189 edge_mask = Mask.from_shape((height, width)) 190 gaussian_blur_half_kernel_size = self.init_config.gaussian_blur_kernel_size // 2 + 1 191 192 # Initialize segments. 193 priority_queue: List[PrioritizedSegment] = [] 194 segment_width_min = int( 195 np.clip( 196 round(self.init_config.init_segment_width_min_ratio * width), 197 1, 198 width - 1, 199 ) 200 ) 201 left = 0 202 while left + segment_width_min - 1 < width: 203 right = rng.integers( 204 left + segment_width_min - 1, 205 width, 206 ) 207 if right + 1 - left < segment_width_min or width - right - 1 < segment_width_min: 208 break 209 priority_queue.append(PrioritizedSegment( 210 y=0, 211 left=left, 212 right=right, 213 )) 214 left = right + 1 215 if left < width: 216 priority_queue.append(PrioritizedSegment( 217 y=0, 218 left=left, 219 right=width - 1, 220 )) 221 222 # For random rotation. 223 image_file_to_rotate_flag: Dict[str, bool] = {} 224 225 while priority_queue: 226 # Pop a segment 227 cur_segment = heapq.heappop(priority_queue) 228 229 # Deal with connection. 230 segments: List[PrioritizedSegment] = [] 231 while priority_queue and priority_queue[0].y == cur_segment.y: 232 segments.append(heapq.heappop(priority_queue)) 233 234 if segments: 235 segments.append(cur_segment) 236 segments = sorted(segments, key=lambda segment: segment.left) 237 cur_segment_idx = -1 238 for segment_idx, segment in enumerate(segments): 239 if segment.left == cur_segment.left and segment.right == cur_segment.right: 240 cur_segment_idx = segment_idx 241 break 242 assert cur_segment_idx >= 0 243 244 begin = cur_segment_idx 245 while begin > 0 and segments[begin - 1].right + 1 == segments[begin].left: 246 begin -= 1 247 end = cur_segment_idx 248 while end + 1 < len(segments) and segments[end].right + 1 == segments[end + 1].left: 249 end += 1 250 251 if begin < end: 252 # Update the current segment. 253 cur_segment.left = segments[begin].left 254 cur_segment.right = segments[end].right 255 256 # Push back. 257 for segment in segments[:begin]: 258 heapq.heappush(priority_queue, segment) 259 for segment in segments[end + 1:]: 260 heapq.heappush(priority_queue, segment) 261 262 # Load image. 263 image_meta = rng_choice(rng, image_metas) 264 265 if self.enable_cache and image_meta.image_file in self.image_file_to_cache_image: 266 segment_image = self.image_file_to_cache_image[image_meta.image_file] 267 268 else: 269 segment_image = Image.from_file(image_meta.image_file).to_target_mode_image( 270 self.init_config.target_image_mode 271 ) 272 273 if image_meta.image_file not in image_file_to_rotate_flag: 274 rotate_flag = (rng.random() < self.init_config.prob_rotate_image) 275 image_file_to_rotate_flag[image_meta.image_file] = rotate_flag 276 277 if image_file_to_rotate_flag[image_meta.image_file]: 278 segment_image = rotate.distort_image( 279 {'angle': 90}, 280 image=segment_image, 281 ) 282 283 if self.enable_cache: 284 self.image_file_to_cache_image[image_meta.image_file] = segment_image 285 286 # Fill image and edge mask. 287 up = cur_segment.y 288 down = min(height - 1, up + segment_image.height - 1) 289 left = cur_segment.left 290 right = min(cur_segment.right, left + segment_image.width - 1) 291 mat[up:down + 1, left:right + 1] = \ 292 segment_image.mat[:down + 1 - up, :right + 1 - left] 293 294 with edge_mask.writable_context: 295 self.fill_np_edge_mask( 296 np_edge_mask=edge_mask.mat, 297 height=height, 298 width=width, 299 gaussian_blur_half_kernel_size=gaussian_blur_half_kernel_size, 300 up=up, 301 down=down, 302 left=left, 303 right=right, 304 ) 305 306 # Update segments. 307 if right == cur_segment.right: 308 # Reach the current right end. 309 cur_segment.y = down + 1 310 if cur_segment.y < height: 311 heapq.heappush(priority_queue, cur_segment) 312 else: 313 # Not reaching the right end. 314 assert right < cur_segment.right 315 new_segment = PrioritizedSegment( 316 y=down + 1, 317 left=left, 318 right=right, 319 ) 320 if new_segment.y < height: 321 heapq.heappush(priority_queue, new_segment) 322 323 cur_segment.left = right + 1 324 heapq.heappush(priority_queue, cur_segment) 325 326 # Apply gaussian blur. 327 gaussian_blur_sigma = gaussian_blur_half_kernel_size / 3 328 gaussian_blur_ksize = (self.init_config.gaussian_blur_kernel_size,) * 2 329 edge_mask.fill_np_array( 330 mat, 331 cv.GaussianBlur(mat, gaussian_blur_ksize, gaussian_blur_sigma), 332 ) 333 334 return Image(mat=mat) 335 336 def run( 337 self, 338 run_config: ImageEngineRunConfig, 339 rng: Optional[RandomGenerator] = None, 340 ) -> Image: 341 assert rng is not None 342 343 assert not run_config.disable_resizing 344 image_metas = self.sample_image_metas_based_on_random_anchor(run_config, rng) 345 return self.synthesize_image(run_config, image_metas, rng)
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
class Mapping(Generic[KT, VT]): def __getitem__(self, key: KT) -> VT: ... # Etc.
This class can then be used as follows::
def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT: try: return mapping[key] except KeyError: return default
ImageCombinerEngine( init_config: vkit.engine.image.combiner.ImageCombinerEngineInitConfig, init_resource: Union[vkit.engine.interface.NoneTypeEngineInitResource, NoneType] = None)
105 def __init__( 106 self, 107 init_config: ImageCombinerEngineInitConfig, 108 init_resource: Optional[NoneTypeEngineInitResource] = None, 109 ): 110 super().__init__(init_config, init_resource) 111 112 self.image_metas = load_image_metas_from_folder(init_config.image_meta_folder) 113 self.image_metas = sorted( 114 self.image_metas, 115 key=lambda meta: meta.grayscale_mean, 116 ) 117 self.image_metas_grayscale_means = [ 118 image_meta.grayscale_mean for image_meta in self.image_metas 119 ] 120 self.enable_cache = init_config.enable_cache 121 self.image_file_to_cache_image: Dict[str, Image] = {}
def
sample_image_metas_based_on_random_anchor( self, run_config: vkit.engine.image.type.ImageEngineRunConfig, rng: numpy.random._generator.Generator):
123 def sample_image_metas_based_on_random_anchor( 124 self, 125 run_config: ImageEngineRunConfig, 126 rng: RandomGenerator, 127 ): 128 # Get candidates based on anchor. 129 anchor_image_meta = rng_choice(rng, self.image_metas) 130 131 if rng.random() < self.init_config.prob_use_only_the_anchor_image: 132 return [anchor_image_meta] 133 134 else: 135 grayscale_std = anchor_image_meta.grayscale_std 136 grayscale_mean = anchor_image_meta.grayscale_mean 137 138 grayscale_begin = round(grayscale_mean - self.init_config.sigma * grayscale_std) 139 grayscale_end = round(grayscale_mean + self.init_config.sigma * grayscale_std) 140 141 index_begin = bisect.bisect_left(self.image_metas_grayscale_means, x=grayscale_begin) 142 index_end = bisect.bisect_right(self.image_metas_grayscale_means, x=grayscale_end) 143 image_metas = self.image_metas[index_begin:index_end] 144 assert image_metas 145 return image_metas
@classmethod
def
fill_np_edge_mask( cls, np_edge_mask: numpy.ndarray, height: int, width: int, gaussian_blur_half_kernel_size: int, up: int, down: int, left: int, right: int):
147 @classmethod 148 def fill_np_edge_mask( 149 cls, 150 np_edge_mask: np.ndarray, 151 height: int, 152 width: int, 153 gaussian_blur_half_kernel_size: int, 154 up: int, 155 down: int, 156 left: int, 157 right: int, 158 ): 159 # Fill up. 160 up_min = max(0, up - gaussian_blur_half_kernel_size) 161 up_max = min(height - 1, up + gaussian_blur_half_kernel_size) 162 np_edge_mask[up_min:up_max + 1, left:right + 1] = 1 163 164 # Fill down. 165 down_min = max(0, down - gaussian_blur_half_kernel_size) 166 down_max = min(height - 1, down + gaussian_blur_half_kernel_size) 167 np_edge_mask[down_min:down_max + 1, left:right + 1] = 1 168 169 # Fill left. 170 left_min = max(0, left - gaussian_blur_half_kernel_size) 171 left_max = min(width - 1, left + gaussian_blur_half_kernel_size) 172 np_edge_mask[up:down + 1, left_min:left_max + 1] = 1 173 174 # Fill right. 175 right_min = max(0, right - gaussian_blur_half_kernel_size) 176 right_max = min(width - 1, right + gaussian_blur_half_kernel_size) 177 np_edge_mask[up:down + 1, right_min:right_max + 1] = 1
def
synthesize_image( self, run_config: vkit.engine.image.type.ImageEngineRunConfig, image_metas: Sequence[vkit.engine.image.combiner.ImageMeta], rng: numpy.random._generator.Generator):
179 def synthesize_image( 180 self, 181 run_config: ImageEngineRunConfig, 182 image_metas: Sequence[ImageMeta], 183 rng: RandomGenerator, 184 ): 185 height = run_config.height 186 width = run_config.width 187 188 mat = np.zeros((height, width, 3), dtype=np.uint8) 189 edge_mask = Mask.from_shape((height, width)) 190 gaussian_blur_half_kernel_size = self.init_config.gaussian_blur_kernel_size // 2 + 1 191 192 # Initialize segments. 193 priority_queue: List[PrioritizedSegment] = [] 194 segment_width_min = int( 195 np.clip( 196 round(self.init_config.init_segment_width_min_ratio * width), 197 1, 198 width - 1, 199 ) 200 ) 201 left = 0 202 while left + segment_width_min - 1 < width: 203 right = rng.integers( 204 left + segment_width_min - 1, 205 width, 206 ) 207 if right + 1 - left < segment_width_min or width - right - 1 < segment_width_min: 208 break 209 priority_queue.append(PrioritizedSegment( 210 y=0, 211 left=left, 212 right=right, 213 )) 214 left = right + 1 215 if left < width: 216 priority_queue.append(PrioritizedSegment( 217 y=0, 218 left=left, 219 right=width - 1, 220 )) 221 222 # For random rotation. 223 image_file_to_rotate_flag: Dict[str, bool] = {} 224 225 while priority_queue: 226 # Pop a segment 227 cur_segment = heapq.heappop(priority_queue) 228 229 # Deal with connection. 230 segments: List[PrioritizedSegment] = [] 231 while priority_queue and priority_queue[0].y == cur_segment.y: 232 segments.append(heapq.heappop(priority_queue)) 233 234 if segments: 235 segments.append(cur_segment) 236 segments = sorted(segments, key=lambda segment: segment.left) 237 cur_segment_idx = -1 238 for segment_idx, segment in enumerate(segments): 239 if segment.left == cur_segment.left and segment.right == cur_segment.right: 240 cur_segment_idx = segment_idx 241 break 242 assert cur_segment_idx >= 0 243 244 begin = cur_segment_idx 245 while begin > 0 and segments[begin - 1].right + 1 == segments[begin].left: 246 begin -= 1 247 end = cur_segment_idx 248 while end + 1 < len(segments) and segments[end].right + 1 == segments[end + 1].left: 249 end += 1 250 251 if begin < end: 252 # Update the current segment. 253 cur_segment.left = segments[begin].left 254 cur_segment.right = segments[end].right 255 256 # Push back. 257 for segment in segments[:begin]: 258 heapq.heappush(priority_queue, segment) 259 for segment in segments[end + 1:]: 260 heapq.heappush(priority_queue, segment) 261 262 # Load image. 263 image_meta = rng_choice(rng, image_metas) 264 265 if self.enable_cache and image_meta.image_file in self.image_file_to_cache_image: 266 segment_image = self.image_file_to_cache_image[image_meta.image_file] 267 268 else: 269 segment_image = Image.from_file(image_meta.image_file).to_target_mode_image( 270 self.init_config.target_image_mode 271 ) 272 273 if image_meta.image_file not in image_file_to_rotate_flag: 274 rotate_flag = (rng.random() < self.init_config.prob_rotate_image) 275 image_file_to_rotate_flag[image_meta.image_file] = rotate_flag 276 277 if image_file_to_rotate_flag[image_meta.image_file]: 278 segment_image = rotate.distort_image( 279 {'angle': 90}, 280 image=segment_image, 281 ) 282 283 if self.enable_cache: 284 self.image_file_to_cache_image[image_meta.image_file] = segment_image 285 286 # Fill image and edge mask. 287 up = cur_segment.y 288 down = min(height - 1, up + segment_image.height - 1) 289 left = cur_segment.left 290 right = min(cur_segment.right, left + segment_image.width - 1) 291 mat[up:down + 1, left:right + 1] = \ 292 segment_image.mat[:down + 1 - up, :right + 1 - left] 293 294 with edge_mask.writable_context: 295 self.fill_np_edge_mask( 296 np_edge_mask=edge_mask.mat, 297 height=height, 298 width=width, 299 gaussian_blur_half_kernel_size=gaussian_blur_half_kernel_size, 300 up=up, 301 down=down, 302 left=left, 303 right=right, 304 ) 305 306 # Update segments. 307 if right == cur_segment.right: 308 # Reach the current right end. 309 cur_segment.y = down + 1 310 if cur_segment.y < height: 311 heapq.heappush(priority_queue, cur_segment) 312 else: 313 # Not reaching the right end. 314 assert right < cur_segment.right 315 new_segment = PrioritizedSegment( 316 y=down + 1, 317 left=left, 318 right=right, 319 ) 320 if new_segment.y < height: 321 heapq.heappush(priority_queue, new_segment) 322 323 cur_segment.left = right + 1 324 heapq.heappush(priority_queue, cur_segment) 325 326 # Apply gaussian blur. 327 gaussian_blur_sigma = gaussian_blur_half_kernel_size / 3 328 gaussian_blur_ksize = (self.init_config.gaussian_blur_kernel_size,) * 2 329 edge_mask.fill_np_array( 330 mat, 331 cv.GaussianBlur(mat, gaussian_blur_ksize, gaussian_blur_sigma), 332 ) 333 334 return Image(mat=mat)
def
run( self, run_config: vkit.engine.image.type.ImageEngineRunConfig, rng: Union[numpy.random._generator.Generator, NoneType] = None) -> vkit.element.image.Image:
336 def run( 337 self, 338 run_config: ImageEngineRunConfig, 339 rng: Optional[RandomGenerator] = None, 340 ) -> Image: 341 assert rng is not None 342 343 assert not run_config.disable_resizing 344 image_metas = self.sample_image_metas_based_on_random_anchor(run_config, rng) 345 return self.synthesize_image(run_config, image_metas, rng)