vkit.engine.image.combiner
# Copyright 2022 vkit-x Administrator. All Rights Reserved.
#
# This project (vkit-x/vkit) is dual-licensed under commercial and SSPL licenses.
#
# The commercial license gives you the full rights to create and distribute software
# on your own terms without any SSPL license obligations. For more information,
# please see the "LICENSE_COMMERCIAL.txt" file.
#
# This project is also available under Server Side Public License (SSPL).
# The SSPL licensing is ideal for use cases such as open source projects with
# SSPL distribution, student/academic purposes, hobby projects, internal research
# projects without external distribution, or other projects where all SSPL
# obligations can be met. For more information, please see the "LICENSE_SSPL.txt" file.
from typing import Sequence, List, Dict, Optional
import bisect
import heapq

import attrs
from numpy.random import Generator as RandomGenerator
import numpy as np
import cv2 as cv
import iolite as io

from vkit.utility import rng_choice, read_json_file
from vkit.element import Image, ImageMode, Mask
from vkit.engine.interface import (
    Engine,
    EngineExecutorFactory,
    NoneTypeEngineInitResource,
)
from .type import ImageEngineRunConfig


@attrs.define(frozen=True)
class ImageMeta:
    """Immutable record pairing an image file with its grayscale statistics."""
    # Path of the image file, stored as a string.
    image_file: str
    # Mean and standard deviation of the image's grayscale values, as read from the
    # metas JSON file.
    grayscale_mean: float
    grayscale_std: float


class FolderTree:
    """Names of the entries expected inside an image-meta folder."""
    IMAGE = 'image'
    METAS_JSON = 'metas.json'


def load_image_metas_from_folder(folder: str):
    """
    Build the list of ``ImageMeta`` described by ``<folder>/metas.json``.

    Every entry of the JSON file must provide the keys ``image_file`` (resolved
    relative to ``<folder>/image``), ``grayscale_mean`` and ``grayscale_std``.
    """
    # NOTE(review): ``exists=True`` presumably makes iolite fail when the path is
    # missing -- confirm against the iolite documentation.
    in_fd = io.folder(folder, expandvars=True, exists=True)
    image_fd = io.folder(
        in_fd / FolderTree.IMAGE,
        exists=True,
    )
    metas_json = io.file(
        in_fd / FolderTree.METAS_JSON,
        exists=True,
    )

    image_metas: List[ImageMeta] = []
    for meta in read_json_file(metas_json):
        image_file = io.file(image_fd / meta['image_file'], exists=True)
        image_metas.append(
            ImageMeta(
                image_file=str(image_file),
                grayscale_mean=meta['grayscale_mean'],
                grayscale_std=meta['grayscale_std'],
            )
        )

    return image_metas


@attrs.define
class ImageCombinerEngineInitConfig:
    """Initialization options of ``ImageCombinerEngine``."""
    # Folder holding ``metas.json`` and the ``image`` sub-folder.
    image_meta_folder: str
    # Mode every loaded image is converted to before being pasted.
    target_image_mode: ImageMode = ImageMode.RGB
    # Keep converted images in memory, keyed by file path.
    enable_cache: bool = False
    # Half-width of the candidate window, in units of the anchor's grayscale std.
    sigma: float = 3.0
    # Minimum width of an initial segment, as a fraction of the output width.
    init_segment_width_min_ratio: float = 0.25
    # Side length of the square kernel passed to cv.GaussianBlur.
    # The annotation is required: attrs only turns annotated attributes into fields,
    # so without it this value could not be set through the constructor.
    gaussian_blur_kernel_size: int = 5


@attrs.define(order=True)
class PrioritizedSegment:
    """Horizontal span ``[left, right]`` at row ``y``; only ``y`` takes part in ordering."""
    y: int = attrs.field(order=True)
    left: int = attrs.field(order=False)
    right: int = attrs.field(order=False)


class ImageCombinerEngine(
    Engine[
        ImageCombinerEngineInitConfig,
        NoneTypeEngineInitResource,
        ImageEngineRunConfig,
        Image,
    ]
):  # yapf: disable
    """Engine that tiles a canvas with images of similar grayscale mean and blurs the seams."""

    @classmethod
    def get_type_name(cls) -> str:
        """Return the type name of this engine, ``'combiner'``."""
        return 'combiner'

    def __init__(
        self,
        init_config: ImageCombinerEngineInitConfig,
        init_resource: Optional[NoneTypeEngineInitResource] = None,
    ):
        """Load the image metas and index them by ascending grayscale mean."""
        super().__init__(init_config, init_resource)

        self.image_metas = load_image_metas_from_folder(init_config.image_meta_folder)
        # Sorted so that a grayscale window can be located with bisect.
        self.image_metas = sorted(
            self.image_metas,
            key=lambda meta: meta.grayscale_mean,
        )
        self.image_metas_grayscale_means = [
            image_meta.grayscale_mean for image_meta in self.image_metas
        ]
        self.enable_cache = init_config.enable_cache
        self.image_file_to_cache_image: Dict[str, Image] = {}

    def sample_image_metas_based_on_random_anchor(
        self,
        run_config: ImageEngineRunConfig,
        rng: RandomGenerator,
    ):
        """
        Pick a random anchor image and return every image whose grayscale mean lies
        within ``sigma`` standard deviations (of the anchor) around the anchor's mean.

        ``run_config`` is accepted but not used. The returned slice is never empty as
        long as ``sigma * grayscale_std`` is non-negative, since it contains the anchor.
        """
        # Get candidates based on anchor.
        anchor_image_meta = rng_choice(rng, self.image_metas)
        grayscale_mean = anchor_image_meta.grayscale_mean
        grayscale_delta = self.init_config.sigma * anchor_image_meta.grayscale_std

        # Snap the window outwards (floor / ceil) instead of rounding to nearest.
        # Rounding could move both bounds to the same side of the anchor's mean when
        # the window is narrower than one gray level, which excluded the anchor and
        # left the candidate list empty.
        grayscale_begin = int(np.floor(grayscale_mean - grayscale_delta))
        grayscale_end = int(np.ceil(grayscale_mean + grayscale_delta))

        index_begin = bisect.bisect_left(self.image_metas_grayscale_means, grayscale_begin)
        index_end = bisect.bisect_right(self.image_metas_grayscale_means, grayscale_end)
        image_metas = self.image_metas[index_begin:index_end]

        assert image_metas
        return image_metas

    @classmethod
    def fill_np_edge_mask(
        cls,
        np_edge_mask: np.ndarray,
        height: int,
        width: int,
        gaussian_blur_half_kernel_size: int,
        up: int,
        down: int,
        left: int,
        right: int,
    ):
        """
        Set to 1, in place, a band of ``gaussian_blur_half_kernel_size`` pixels on each
        side of the four edges of the box ``[up, down] x [left, right]`` (inclusive).

        Bands are clamped to ``[0, height - 1]`` vertically and ``[0, width - 1]``
        horizontally.
        """
        # Fill up.
        up_min = max(0, up - gaussian_blur_half_kernel_size)
        up_max = min(height - 1, up + gaussian_blur_half_kernel_size)
        np_edge_mask[up_min:up_max + 1, left:right + 1] = 1

        # Fill down.
        down_min = max(0, down - gaussian_blur_half_kernel_size)
        down_max = min(height - 1, down + gaussian_blur_half_kernel_size)
        np_edge_mask[down_min:down_max + 1, left:right + 1] = 1

        # Fill left.
        left_min = max(0, left - gaussian_blur_half_kernel_size)
        left_max = min(width - 1, left + gaussian_blur_half_kernel_size)
        np_edge_mask[up:down + 1, left_min:left_max + 1] = 1

        # Fill right.
        right_min = max(0, right - gaussian_blur_half_kernel_size)
        right_max = min(width - 1, right + gaussian_blur_half_kernel_size)
        np_edge_mask[up:down + 1, right_min:right_max + 1] = 1

    def synthesize_image(
        self,
        run_config: ImageEngineRunConfig,
        image_metas: Sequence[ImageMeta],
        rng: RandomGenerator,
    ):
        """
        Tile a ``run_config.height`` x ``run_config.width`` canvas with images drawn at
        random from ``image_metas`` and blur the seams between the tiles.

        The canvas is filled from the top down. A min-heap ordered by ``y`` holds the
        horizontal segments below which the canvas is still empty. The segment with the
        smallest ``y`` is popped, merged with touching segments at the same ``y``, and
        covered with the top-left crop of a randomly chosen image.
        """
        height = run_config.height
        width = run_config.width

        # NOTE(review): the canvas is hard-coded to 3 channels; presumably
        # target_image_mode must produce 3-channel mats -- confirm.
        mat = np.zeros((height, width, 3), dtype=np.uint8)
        edge_mask = Mask.from_shape((height, width))
        gaussian_blur_half_kernel_size = self.init_config.gaussian_blur_kernel_size // 2 + 1

        # Initialize segments.
        priority_queue: List[PrioritizedSegment] = []
        # Clamp to [1, width - 1], letting the lower bound win. np.clip(x, 1, width - 1)
        # returns 0 for width == 1 (a_min > a_max), and a zero minimum width makes the
        # loop below append segments forever without advancing ``left``.
        segment_width_min = max(
            1,
            min(
                round(self.init_config.init_segment_width_min_ratio * width),
                width - 1,
            ),
        )
        left = 0
        while left + segment_width_min - 1 < width:
            right = int(rng.integers(
                left + segment_width_min - 1,
                width,
            ))
            # Stop when the remainder to the right would be narrower than the minimum;
            # the remainder is then absorbed by the final segment added below.
            if right + 1 - left < segment_width_min or width - right - 1 < segment_width_min:
                break
            priority_queue.append(PrioritizedSegment(
                y=0,
                left=left,
                right=right,
            ))
            left = right + 1
        if left < width:
            priority_queue.append(PrioritizedSegment(
                y=0,
                left=left,
                right=width - 1,
            ))

        while priority_queue:
            # Pop a segment
            cur_segment = heapq.heappop(priority_queue)

            # Deal with connection: collect every other segment sitting at the same y.
            segments: List[PrioritizedSegment] = []
            while priority_queue and priority_queue[0].y == cur_segment.y:
                segments.append(heapq.heappop(priority_queue))

            if segments:
                segments.append(cur_segment)
                segments = sorted(segments, key=lambda segment: segment.left)
                cur_segment_idx = -1
                for segment_idx, segment in enumerate(segments):
                    if segment.left == cur_segment.left and segment.right == cur_segment.right:
                        cur_segment_idx = segment_idx
                        break
                assert cur_segment_idx >= 0

                # Grow [begin, end] over the run of horizontally touching segments
                # around the current one.
                begin = cur_segment_idx
                while begin > 0 and segments[begin - 1].right + 1 == segments[begin].left:
                    begin -= 1
                end = cur_segment_idx
                while end + 1 < len(segments) and segments[end].right + 1 == segments[end + 1].left:
                    end += 1

                if begin < end:
                    # Update the current segment.
                    cur_segment.left = segments[begin].left
                    cur_segment.right = segments[end].right

                # Push back.
                for segment in segments[:begin]:
                    heapq.heappush(priority_queue, segment)
                for segment in segments[end + 1:]:
                    heapq.heappush(priority_queue, segment)

            # Load image.
            image_meta = rng_choice(rng, image_metas)
            if self.enable_cache and image_meta.image_file in self.image_file_to_cache_image:
                segment_image = self.image_file_to_cache_image[image_meta.image_file]
            else:
                segment_image = Image.from_file(image_meta.image_file).to_target_mode_image(
                    self.init_config.target_image_mode
                )
                if self.enable_cache:
                    self.image_file_to_cache_image[image_meta.image_file] = segment_image

            # Fill image and edge mask.
            up = cur_segment.y
            down = min(height - 1, up + segment_image.height - 1)
            left = cur_segment.left
            right = min(cur_segment.right, left + segment_image.width - 1)
            mat[up:down + 1, left:right + 1] = \
                segment_image.mat[:down + 1 - up, :right + 1 - left]

            with edge_mask.writable_context:
                self.fill_np_edge_mask(
                    np_edge_mask=edge_mask.mat,
                    height=height,
                    width=width,
                    gaussian_blur_half_kernel_size=gaussian_blur_half_kernel_size,
                    up=up,
                    down=down,
                    left=left,
                    right=right,
                )

            # Update segments.
            if right == cur_segment.right:
                # Reach the current right end.
                cur_segment.y = down + 1
                if cur_segment.y < height:
                    heapq.heappush(priority_queue, cur_segment)
            else:
                # Not reaching the right end.
                assert right < cur_segment.right
                new_segment = PrioritizedSegment(
                    y=down + 1,
                    left=left,
                    right=right,
                )
                if new_segment.y < height:
                    heapq.heappush(priority_queue, new_segment)

                # The uncovered right part stays at the same y and is handled later.
                cur_segment.left = right + 1
                heapq.heappush(priority_queue, cur_segment)

        # Apply gaussian blur.
        gaussian_blur_sigma = gaussian_blur_half_kernel_size / 3
        gaussian_blur_ksize = (self.init_config.gaussian_blur_kernel_size,) * 2
        # NOTE(review): presumably writes the blurred pixels into ``mat`` where the
        # edge mask is set -- confirm against Mask.fill_np_array.
        edge_mask.fill_np_array(
            mat,
            cv.GaussianBlur(mat, gaussian_blur_ksize, gaussian_blur_sigma),
        )

        return Image(mat=mat)

    def run(self, run_config: ImageEngineRunConfig, rng: RandomGenerator) -> Image:
        """Sample candidate images around a random anchor and synthesize the output image."""
        assert not run_config.disable_resizing
        image_metas = self.sample_image_metas_based_on_random_anchor(run_config, rng)
        return self.synthesize_image(run_config, image_metas, rng)


image_combiner_engine_executor_factory = EngineExecutorFactory(ImageCombinerEngine)
class
ImageMeta:
ImageMeta(image_file: str, grayscale_mean: float, grayscale_std: float)
# Initializer generated by attrs for the frozen class ImageMeta; it stores the three
# constructor arguments on the instance.
# NOTE(review): ``_setattr`` is not defined in this listing; presumably attrs binds it
# to a setter that bypasses the frozen ``__setattr__`` -- confirm.
def __init__(self, image_file, grayscale_mean, grayscale_std):
    _setattr(self, 'image_file', image_file)
    _setattr(self, 'grayscale_mean', grayscale_mean)
    _setattr(self, 'grayscale_std', grayscale_std)
Method generated by attrs for class ImageMeta.
class
FolderTree:
def
load_image_metas_from_folder(folder: str):
def load_image_metas_from_folder(folder: str):
    """
    Read ``<folder>/metas.json`` and return one ``ImageMeta`` per JSON entry.

    Each entry supplies ``image_file`` (looked up inside ``<folder>/image``),
    ``grayscale_mean`` and ``grayscale_std``. The result keeps the order of the file.
    """
    # NOTE(review): ``exists=True`` presumably makes iolite fail on a missing path --
    # confirm against the iolite documentation.
    root_fd = io.folder(folder, expandvars=True, exists=True)
    image_fd = io.folder(root_fd / FolderTree.IMAGE, exists=True)
    metas_json = io.file(root_fd / FolderTree.METAS_JSON, exists=True)

    return [
        ImageMeta(
            image_file=str(io.file(image_fd / entry['image_file'], exists=True)),
            grayscale_mean=entry['grayscale_mean'],
            grayscale_std=entry['grayscale_std'],
        )
        for entry in read_json_file(metas_json)
    ]
class
ImageCombinerEngineInitConfig:
class ImageCombinerEngineInitConfig:
    """Initialization options of ``ImageCombinerEngine`` (an attrs-generated class)."""
    # Folder holding ``metas.json`` and the ``image`` sub-folder.
    image_meta_folder: str
    # Mode every loaded image is converted to before being pasted.
    target_image_mode: ImageMode = ImageMode.RGB
    # Keep converted images in memory, keyed by file path.
    enable_cache: bool = False
    # Half-width of the candidate window, in units of the anchor's grayscale std.
    sigma: float = 3.0
    # Minimum width of an initial segment, as a fraction of the output width.
    init_segment_width_min_ratio: float = 0.25
    # Side length of the square kernel passed to cv.GaussianBlur.
    # The annotation is required: attrs only turns annotated attributes into fields,
    # so without it this value was absent from the generated __init__ and could not
    # be configured.
    gaussian_blur_kernel_size: int = 5
ImageCombinerEngineInitConfig( image_meta_folder: str, target_image_mode: vkit.element.image.ImageMode = <ImageMode.RGB: 'rgb'>, enable_cache: bool = False, sigma: float = 3.0, init_segment_width_min_ratio: float = 0.25)
# Initializer generated by attrs for ImageCombinerEngineInitConfig. Defaults are read
# from ``attr_dict``, a name that is not defined in this listing.
# NOTE(review): ``gaussian_blur_kernel_size`` is not a parameter here; the class
# declares it without a type annotation, so it is not collected as a field.
def __init__(self, image_meta_folder, target_image_mode=attr_dict['target_image_mode'].default, enable_cache=attr_dict['enable_cache'].default, sigma=attr_dict['sigma'].default, init_segment_width_min_ratio=attr_dict['init_segment_width_min_ratio'].default):
    self.image_meta_folder = image_meta_folder
    self.target_image_mode = target_image_mode
    self.enable_cache = enable_cache
    self.sigma = sigma
    self.init_segment_width_min_ratio = init_segment_width_min_ratio
Method generated by attrs for class ImageCombinerEngineInitConfig.
class
PrioritizedSegment:
class
ImageCombinerEngine(vkit.engine.interface.Engine[vkit.engine.image.combiner.ImageCombinerEngineInitConfig, vkit.engine.interface.NoneTypeEngineInitResource, vkit.engine.image.type.ImageEngineRunConfig, vkit.element.image.Image]):
class ImageCombinerEngine(
    Engine[
        ImageCombinerEngineInitConfig,
        NoneTypeEngineInitResource,
        ImageEngineRunConfig,
        Image,
    ]
):  # yapf: disable
    """Engine that tiles a canvas with images of similar grayscale mean and blurs the seams."""

    @classmethod
    def get_type_name(cls) -> str:
        """Return the type name of this engine, ``'combiner'``."""
        return 'combiner'

    def __init__(
        self,
        init_config: ImageCombinerEngineInitConfig,
        init_resource: Optional[NoneTypeEngineInitResource] = None,
    ):
        """Load the image metas and index them by ascending grayscale mean."""
        super().__init__(init_config, init_resource)

        self.image_metas = load_image_metas_from_folder(init_config.image_meta_folder)
        # Sorted so that a grayscale window can be located with bisect.
        self.image_metas = sorted(
            self.image_metas,
            key=lambda meta: meta.grayscale_mean,
        )
        self.image_metas_grayscale_means = [
            image_meta.grayscale_mean for image_meta in self.image_metas
        ]
        self.enable_cache = init_config.enable_cache
        self.image_file_to_cache_image: Dict[str, Image] = {}

    def sample_image_metas_based_on_random_anchor(
        self,
        run_config: ImageEngineRunConfig,
        rng: RandomGenerator,
    ):
        """
        Pick a random anchor image and return every image whose grayscale mean lies
        within ``sigma`` standard deviations (of the anchor) around the anchor's mean.

        ``run_config`` is accepted but not used. The returned slice is never empty as
        long as ``sigma * grayscale_std`` is non-negative, since it contains the anchor.
        """
        # Get candidates based on anchor.
        anchor_image_meta = rng_choice(rng, self.image_metas)
        grayscale_mean = anchor_image_meta.grayscale_mean
        grayscale_delta = self.init_config.sigma * anchor_image_meta.grayscale_std

        # Snap the window outwards (floor / ceil) instead of rounding to nearest.
        # Rounding could move both bounds to the same side of the anchor's mean when
        # the window is narrower than one gray level, which excluded the anchor and
        # left the candidate list empty.
        grayscale_begin = int(np.floor(grayscale_mean - grayscale_delta))
        grayscale_end = int(np.ceil(grayscale_mean + grayscale_delta))

        index_begin = bisect.bisect_left(self.image_metas_grayscale_means, grayscale_begin)
        index_end = bisect.bisect_right(self.image_metas_grayscale_means, grayscale_end)
        image_metas = self.image_metas[index_begin:index_end]

        assert image_metas
        return image_metas

    @classmethod
    def fill_np_edge_mask(
        cls,
        np_edge_mask: np.ndarray,
        height: int,
        width: int,
        gaussian_blur_half_kernel_size: int,
        up: int,
        down: int,
        left: int,
        right: int,
    ):
        """
        Set to 1, in place, a band of ``gaussian_blur_half_kernel_size`` pixels on each
        side of the four edges of the box ``[up, down] x [left, right]`` (inclusive).

        Bands are clamped to ``[0, height - 1]`` vertically and ``[0, width - 1]``
        horizontally.
        """
        # Fill up.
        up_min = max(0, up - gaussian_blur_half_kernel_size)
        up_max = min(height - 1, up + gaussian_blur_half_kernel_size)
        np_edge_mask[up_min:up_max + 1, left:right + 1] = 1

        # Fill down.
        down_min = max(0, down - gaussian_blur_half_kernel_size)
        down_max = min(height - 1, down + gaussian_blur_half_kernel_size)
        np_edge_mask[down_min:down_max + 1, left:right + 1] = 1

        # Fill left.
        left_min = max(0, left - gaussian_blur_half_kernel_size)
        left_max = min(width - 1, left + gaussian_blur_half_kernel_size)
        np_edge_mask[up:down + 1, left_min:left_max + 1] = 1

        # Fill right.
        right_min = max(0, right - gaussian_blur_half_kernel_size)
        right_max = min(width - 1, right + gaussian_blur_half_kernel_size)
        np_edge_mask[up:down + 1, right_min:right_max + 1] = 1

    def synthesize_image(
        self,
        run_config: ImageEngineRunConfig,
        image_metas: Sequence[ImageMeta],
        rng: RandomGenerator,
    ):
        """
        Tile a ``run_config.height`` x ``run_config.width`` canvas with images drawn at
        random from ``image_metas`` and blur the seams between the tiles.

        The canvas is filled from the top down. A min-heap ordered by ``y`` holds the
        horizontal segments below which the canvas is still empty. The segment with the
        smallest ``y`` is popped, merged with touching segments at the same ``y``, and
        covered with the top-left crop of a randomly chosen image.
        """
        height = run_config.height
        width = run_config.width

        # NOTE(review): the canvas is hard-coded to 3 channels; presumably
        # target_image_mode must produce 3-channel mats -- confirm.
        mat = np.zeros((height, width, 3), dtype=np.uint8)
        edge_mask = Mask.from_shape((height, width))
        gaussian_blur_half_kernel_size = self.init_config.gaussian_blur_kernel_size // 2 + 1

        # Initialize segments.
        priority_queue: List[PrioritizedSegment] = []
        # Clamp to [1, width - 1], letting the lower bound win. np.clip(x, 1, width - 1)
        # returns 0 for width == 1 (a_min > a_max), and a zero minimum width makes the
        # loop below append segments forever without advancing ``left``.
        segment_width_min = max(
            1,
            min(
                round(self.init_config.init_segment_width_min_ratio * width),
                width - 1,
            ),
        )
        left = 0
        while left + segment_width_min - 1 < width:
            right = int(rng.integers(
                left + segment_width_min - 1,
                width,
            ))
            # Stop when the remainder to the right would be narrower than the minimum;
            # the remainder is then absorbed by the final segment added below.
            if right + 1 - left < segment_width_min or width - right - 1 < segment_width_min:
                break
            priority_queue.append(PrioritizedSegment(
                y=0,
                left=left,
                right=right,
            ))
            left = right + 1
        if left < width:
            priority_queue.append(PrioritizedSegment(
                y=0,
                left=left,
                right=width - 1,
            ))

        while priority_queue:
            # Pop a segment
            cur_segment = heapq.heappop(priority_queue)

            # Deal with connection: collect every other segment sitting at the same y.
            segments: List[PrioritizedSegment] = []
            while priority_queue and priority_queue[0].y == cur_segment.y:
                segments.append(heapq.heappop(priority_queue))

            if segments:
                segments.append(cur_segment)
                segments = sorted(segments, key=lambda segment: segment.left)
                cur_segment_idx = -1
                for segment_idx, segment in enumerate(segments):
                    if segment.left == cur_segment.left and segment.right == cur_segment.right:
                        cur_segment_idx = segment_idx
                        break
                assert cur_segment_idx >= 0

                # Grow [begin, end] over the run of horizontally touching segments
                # around the current one.
                begin = cur_segment_idx
                while begin > 0 and segments[begin - 1].right + 1 == segments[begin].left:
                    begin -= 1
                end = cur_segment_idx
                while end + 1 < len(segments) and segments[end].right + 1 == segments[end + 1].left:
                    end += 1

                if begin < end:
                    # Update the current segment.
                    cur_segment.left = segments[begin].left
                    cur_segment.right = segments[end].right

                # Push back.
                for segment in segments[:begin]:
                    heapq.heappush(priority_queue, segment)
                for segment in segments[end + 1:]:
                    heapq.heappush(priority_queue, segment)

            # Load image.
            image_meta = rng_choice(rng, image_metas)
            if self.enable_cache and image_meta.image_file in self.image_file_to_cache_image:
                segment_image = self.image_file_to_cache_image[image_meta.image_file]
            else:
                segment_image = Image.from_file(image_meta.image_file).to_target_mode_image(
                    self.init_config.target_image_mode
                )
                if self.enable_cache:
                    self.image_file_to_cache_image[image_meta.image_file] = segment_image

            # Fill image and edge mask.
            up = cur_segment.y
            down = min(height - 1, up + segment_image.height - 1)
            left = cur_segment.left
            right = min(cur_segment.right, left + segment_image.width - 1)
            mat[up:down + 1, left:right + 1] = \
                segment_image.mat[:down + 1 - up, :right + 1 - left]

            with edge_mask.writable_context:
                self.fill_np_edge_mask(
                    np_edge_mask=edge_mask.mat,
                    height=height,
                    width=width,
                    gaussian_blur_half_kernel_size=gaussian_blur_half_kernel_size,
                    up=up,
                    down=down,
                    left=left,
                    right=right,
                )

            # Update segments.
            if right == cur_segment.right:
                # Reach the current right end.
                cur_segment.y = down + 1
                if cur_segment.y < height:
                    heapq.heappush(priority_queue, cur_segment)
            else:
                # Not reaching the right end.
                assert right < cur_segment.right
                new_segment = PrioritizedSegment(
                    y=down + 1,
                    left=left,
                    right=right,
                )
                if new_segment.y < height:
                    heapq.heappush(priority_queue, new_segment)

                # The uncovered right part stays at the same y and is handled later.
                cur_segment.left = right + 1
                heapq.heappush(priority_queue, cur_segment)

        # Apply gaussian blur.
        gaussian_blur_sigma = gaussian_blur_half_kernel_size / 3
        gaussian_blur_ksize = (self.init_config.gaussian_blur_kernel_size,) * 2
        # NOTE(review): presumably writes the blurred pixels into ``mat`` where the
        # edge mask is set -- confirm against Mask.fill_np_array.
        edge_mask.fill_np_array(
            mat,
            cv.GaussianBlur(mat, gaussian_blur_ksize, gaussian_blur_sigma),
        )

        return Image(mat=mat)

    def run(self, run_config: ImageEngineRunConfig, rng: RandomGenerator) -> Image:
        """Sample candidate images around a random anchor and synthesize the output image."""
        assert not run_config.disable_resizing
        image_metas = self.sample_image_metas_based_on_random_anchor(run_config, rng)
        return self.synthesize_image(run_config, image_metas, rng)
Abstract base class for generic types.
A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::
    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.
This class can then be used as follows::
    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default
ImageCombinerEngine( init_config: vkit.engine.image.combiner.ImageCombinerEngineInitConfig, init_resource: Union[vkit.engine.interface.NoneTypeEngineInitResource, NoneType] = None)
def __init__(
    self,
    init_config: ImageCombinerEngineInitConfig,
    init_resource: Optional[NoneTypeEngineInitResource] = None,
):
    """
    Load the image metas from ``init_config.image_meta_folder`` and keep them sorted
    by ascending grayscale mean, next to a parallel list of those means.
    """
    super().__init__(init_config, init_resource)

    loaded_metas = load_image_metas_from_folder(init_config.image_meta_folder)
    # Sorted order lets a grayscale window be located with bisect.
    loaded_metas.sort(key=lambda meta: meta.grayscale_mean)

    self.image_metas = loaded_metas
    self.image_metas_grayscale_means = [meta.grayscale_mean for meta in loaded_metas]
    self.enable_cache = init_config.enable_cache
    # Converted images keyed by file path; filled only when the cache is enabled.
    self.image_file_to_cache_image: Dict[str, Image] = {}
def
sample_image_metas_based_on_random_anchor( self, run_config: vkit.engine.image.type.ImageEngineRunConfig, rng: numpy.random._generator.Generator):
def sample_image_metas_based_on_random_anchor(
    self,
    run_config: ImageEngineRunConfig,
    rng: RandomGenerator,
):
    """
    Pick a random anchor image and return every image whose grayscale mean lies
    within ``sigma`` standard deviations (of the anchor) around the anchor's mean.

    ``self.image_metas`` must be sorted by grayscale mean, with
    ``self.image_metas_grayscale_means`` holding the means in the same order.
    ``run_config`` is accepted but not used. The returned slice is never empty as
    long as ``sigma * grayscale_std`` is non-negative, since it contains the anchor.
    """
    # Get candidates based on anchor.
    anchor_image_meta = rng_choice(rng, self.image_metas)
    grayscale_mean = anchor_image_meta.grayscale_mean
    grayscale_delta = self.init_config.sigma * anchor_image_meta.grayscale_std

    # Snap the window outwards (floor / ceil) instead of rounding to nearest.
    # Rounding could move both bounds to the same side of the anchor's mean when the
    # window is narrower than one gray level (e.g. mean 100.4 with std 0 gave the
    # window [100, 100]), which excluded the anchor and left the candidate list empty.
    grayscale_begin = int(np.floor(grayscale_mean - grayscale_delta))
    grayscale_end = int(np.ceil(grayscale_mean + grayscale_delta))

    index_begin = bisect.bisect_left(self.image_metas_grayscale_means, grayscale_begin)
    index_end = bisect.bisect_right(self.image_metas_grayscale_means, grayscale_end)
    image_metas = self.image_metas[index_begin:index_end]

    assert image_metas
    return image_metas
@classmethod
def
fill_np_edge_mask( cls, np_edge_mask: numpy.ndarray, height: int, width: int, gaussian_blur_half_kernel_size: int, up: int, down: int, left: int, right: int):
@classmethod
def fill_np_edge_mask(
    cls,
    np_edge_mask: np.ndarray,
    height: int,
    width: int,
    gaussian_blur_half_kernel_size: int,
    up: int,
    down: int,
    left: int,
    right: int,
):
    """
    Mark, in place, the pixels around the border of the box
    ``[up, down] x [left, right]`` (all bounds inclusive) with 1.

    Each of the four edges gets a band reaching ``gaussian_blur_half_kernel_size``
    pixels to both sides of the edge, clamped to the ``height`` x ``width`` canvas.
    """
    margin = gaussian_blur_half_kernel_size

    def band(center: int, limit: int) -> slice:
        # Inclusive range [center - margin, center + margin] clamped to [0, limit - 1].
        return slice(max(0, center - margin), min(limit - 1, center + margin) + 1)

    box_rows = slice(up, down + 1)
    box_cols = slice(left, right + 1)

    # Horizontal bands around the top and bottom edges.
    np_edge_mask[band(up, height), box_cols] = 1
    np_edge_mask[band(down, height), box_cols] = 1

    # Vertical bands around the left and right edges.
    np_edge_mask[box_rows, band(left, width)] = 1
    np_edge_mask[box_rows, band(right, width)] = 1
def
synthesize_image( self, run_config: vkit.engine.image.type.ImageEngineRunConfig, image_metas: Sequence[vkit.engine.image.combiner.ImageMeta], rng: numpy.random._generator.Generator):
def synthesize_image(
    self,
    run_config: ImageEngineRunConfig,
    image_metas: Sequence[ImageMeta],
    rng: RandomGenerator,
):
    """
    Tile a ``run_config.height`` x ``run_config.width`` canvas with images drawn at
    random from ``image_metas`` and blur the seams between the tiles.

    The canvas is filled from the top down. A min-heap ordered by ``y`` holds the
    horizontal segments below which the canvas is still empty. The segment with the
    smallest ``y`` is popped, merged with touching segments at the same ``y``, and
    covered with the top-left crop of a randomly chosen image. The tile's edges are
    recorded in an edge mask that selects where the gaussian blur is applied.
    """
    height = run_config.height
    width = run_config.width

    # NOTE(review): the canvas is hard-coded to 3 channels; presumably
    # target_image_mode must produce 3-channel mats -- confirm.
    mat = np.zeros((height, width, 3), dtype=np.uint8)
    edge_mask = Mask.from_shape((height, width))
    gaussian_blur_half_kernel_size = self.init_config.gaussian_blur_kernel_size // 2 + 1

    # Initialize segments.
    priority_queue: List[PrioritizedSegment] = []
    # Clamp to [1, width - 1], letting the lower bound win. np.clip(x, 1, width - 1)
    # returns 0 for width == 1 (a_min > a_max), and a zero minimum width makes the
    # loop below append segments forever without advancing ``left``.
    segment_width_min = max(
        1,
        min(
            round(self.init_config.init_segment_width_min_ratio * width),
            width - 1,
        ),
    )
    left = 0
    while left + segment_width_min - 1 < width:
        right = int(rng.integers(
            left + segment_width_min - 1,
            width,
        ))
        # Stop when the remainder to the right would be narrower than the minimum;
        # the remainder is then absorbed by the final segment added below.
        if right + 1 - left < segment_width_min or width - right - 1 < segment_width_min:
            break
        priority_queue.append(PrioritizedSegment(
            y=0,
            left=left,
            right=right,
        ))
        left = right + 1
    if left < width:
        priority_queue.append(PrioritizedSegment(
            y=0,
            left=left,
            right=width - 1,
        ))

    while priority_queue:
        # Pop a segment
        cur_segment = heapq.heappop(priority_queue)

        # Deal with connection: collect every other segment sitting at the same y.
        segments: List[PrioritizedSegment] = []
        while priority_queue and priority_queue[0].y == cur_segment.y:
            segments.append(heapq.heappop(priority_queue))

        if segments:
            segments.append(cur_segment)
            segments = sorted(segments, key=lambda segment: segment.left)
            cur_segment_idx = -1
            for segment_idx, segment in enumerate(segments):
                if segment.left == cur_segment.left and segment.right == cur_segment.right:
                    cur_segment_idx = segment_idx
                    break
            assert cur_segment_idx >= 0

            # Grow [begin, end] over the run of horizontally touching segments
            # around the current one.
            begin = cur_segment_idx
            while begin > 0 and segments[begin - 1].right + 1 == segments[begin].left:
                begin -= 1
            end = cur_segment_idx
            while end + 1 < len(segments) and segments[end].right + 1 == segments[end + 1].left:
                end += 1

            if begin < end:
                # Update the current segment.
                cur_segment.left = segments[begin].left
                cur_segment.right = segments[end].right

            # Push back.
            for segment in segments[:begin]:
                heapq.heappush(priority_queue, segment)
            for segment in segments[end + 1:]:
                heapq.heappush(priority_queue, segment)

        # Load image.
        image_meta = rng_choice(rng, image_metas)
        if self.enable_cache and image_meta.image_file in self.image_file_to_cache_image:
            segment_image = self.image_file_to_cache_image[image_meta.image_file]
        else:
            segment_image = Image.from_file(image_meta.image_file).to_target_mode_image(
                self.init_config.target_image_mode
            )
            if self.enable_cache:
                self.image_file_to_cache_image[image_meta.image_file] = segment_image

        # Fill image and edge mask.
        up = cur_segment.y
        down = min(height - 1, up + segment_image.height - 1)
        left = cur_segment.left
        right = min(cur_segment.right, left + segment_image.width - 1)
        mat[up:down + 1, left:right + 1] = \
            segment_image.mat[:down + 1 - up, :right + 1 - left]

        with edge_mask.writable_context:
            self.fill_np_edge_mask(
                np_edge_mask=edge_mask.mat,
                height=height,
                width=width,
                gaussian_blur_half_kernel_size=gaussian_blur_half_kernel_size,
                up=up,
                down=down,
                left=left,
                right=right,
            )

        # Update segments.
        if right == cur_segment.right:
            # Reach the current right end.
            cur_segment.y = down + 1
            if cur_segment.y < height:
                heapq.heappush(priority_queue, cur_segment)
        else:
            # Not reaching the right end.
            assert right < cur_segment.right
            new_segment = PrioritizedSegment(
                y=down + 1,
                left=left,
                right=right,
            )
            if new_segment.y < height:
                heapq.heappush(priority_queue, new_segment)

            # The uncovered right part stays at the same y and is handled later.
            cur_segment.left = right + 1
            heapq.heappush(priority_queue, cur_segment)

    # Apply gaussian blur.
    gaussian_blur_sigma = gaussian_blur_half_kernel_size / 3
    gaussian_blur_ksize = (self.init_config.gaussian_blur_kernel_size,) * 2
    # NOTE(review): presumably writes the blurred pixels into ``mat`` where the edge
    # mask is set -- confirm against Mask.fill_np_array.
    edge_mask.fill_np_array(
        mat,
        cv.GaussianBlur(mat, gaussian_blur_ksize, gaussian_blur_sigma),
    )

    return Image(mat=mat)
def
run( self, run_config: vkit.engine.image.type.ImageEngineRunConfig, rng: numpy.random._generator.Generator) -> vkit.element.image.Image: