vkit.engine.image.combiner

  1# Copyright 2022 vkit-x Administrator. All Rights Reserved.
  2#
  3# This project (vkit-x/vkit) is dual-licensed under commercial and SSPL licenses.
  4#
  5# The commercial license gives you the full rights to create and distribute software
  6# on your own terms without any SSPL license obligations. For more information,
  7# please see the "LICENSE_COMMERCIAL.txt" file.
  8#
  9# This project is also available under Server Side Public License (SSPL).
 10# The SSPL licensing is ideal for use cases such as open source projects with
 11# SSPL distribution, student/academic purposes, hobby projects, internal research
 12# projects without external distribution, or other projects where all SSPL
 13# obligations can be met. For more information, please see the "LICENSE_SSPL.txt" file.
 14from typing import Sequence, List, Dict, Optional
 15import bisect
 16import heapq
 17
 18import attrs
 19from numpy.random import Generator as RandomGenerator
 20import numpy as np
 21import cv2 as cv
 22import iolite as io
 23
 24from vkit.utility import rng_choice, read_json_file
 25from vkit.element import Image, ImageMode, Mask
 26from vkit.engine.interface import (
 27    Engine,
 28    EngineExecutorFactory,
 29    NoneTypeEngineInitResource,
 30)
 31from .type import ImageEngineRunConfig
 32
 33
@attrs.define(frozen=True)
class ImageMeta:
    """Metadata of one image file: its path and its grayscale statistics."""
    # Path of the image file.
    image_file: str
    # Values of the 'grayscale_mean' / 'grayscale_std' keys of the image's
    # entry in metas.json (see load_image_metas_from_folder).
    grayscale_mean: float
    grayscale_std: float
 39
 40
class FolderTree:
    """Names of the entries that load_image_metas_from_folder looks up in a meta folder."""
    # Sub-folder against which the 'image_file' entries are resolved.
    IMAGE = 'image'
    # JSON file holding one metadata entry per image.
    METAS_JSON = 'metas.json'
 44
 45
 46def load_image_metas_from_folder(folder: str):
 47    in_fd = io.folder(folder, expandvars=True, exists=True)
 48    image_fd = io.folder(
 49        in_fd / FolderTree.IMAGE,
 50        exists=True,
 51    )
 52    metas_json = io.file(
 53        in_fd / FolderTree.METAS_JSON,
 54        exists=True,
 55    )
 56
 57    image_metas: List[ImageMeta] = []
 58    for meta in read_json_file(metas_json):
 59        image_file = io.file(image_fd / meta['image_file'], exists=True)
 60        image_metas.append(
 61            ImageMeta(
 62                image_file=str(image_file),
 63                grayscale_mean=meta['grayscale_mean'],
 64                grayscale_std=meta['grayscale_std'],
 65            )
 66        )
 67
 68    return image_metas
 69
 70
 71@attrs.define
 72class ImageCombinerEngineInitConfig:
 73    image_meta_folder: str
 74    target_image_mode: ImageMode = ImageMode.RGB
 75    enable_cache: bool = False
 76    sigma: float = 3.0
 77    init_segment_width_min_ratio: float = 0.25
 78    gaussian_blur_kernel_size = 5
 79
 80
@attrs.define(order=True)
class PrioritizedSegment:
    """Column span [left, right] of the canvas that is filled from row ``y`` downwards.

    Only ``y`` takes part in ordering, so a heap of segments yields one with
    the smallest ``y`` first.
    """
    # Row at which the next image is pasted for this span.
    y: int = attrs.field(order=True)
    # Inclusive column bounds; excluded from ordering.
    left: int = attrs.field(order=False)
    right: int = attrs.field(order=False)
 86
 87
 88class ImageCombinerEngine(
 89    Engine[
 90        ImageCombinerEngineInitConfig,
 91        NoneTypeEngineInitResource,
 92        ImageEngineRunConfig,
 93        Image,
 94    ]
 95):  # yapf: disable
 96
 97    @classmethod
 98    def get_type_name(cls) -> str:
 99        return 'combiner'
100
101    def __init__(
102        self,
103        init_config: ImageCombinerEngineInitConfig,
104        init_resource: Optional[NoneTypeEngineInitResource] = None,
105    ):
106        super().__init__(init_config, init_resource)
107
108        self.image_metas = load_image_metas_from_folder(init_config.image_meta_folder)
109        self.image_metas = sorted(
110            self.image_metas,
111            key=lambda meta: meta.grayscale_mean,
112        )
113        self.image_metas_grayscale_means = [
114            image_meta.grayscale_mean for image_meta in self.image_metas
115        ]
116        self.enable_cache = init_config.enable_cache
117        self.image_file_to_cache_image: Dict[str, Image] = {}
118
119    def sample_image_metas_based_on_random_anchor(
120        self,
121        run_config: ImageEngineRunConfig,
122        rng: RandomGenerator,
123    ):
124        # Get candidates based on anchor.
125        anchor_image_meta = rng_choice(rng, self.image_metas)
126        grayscale_std = anchor_image_meta.grayscale_std
127        grayscale_mean = anchor_image_meta.grayscale_mean
128
129        grayscale_begin = round(grayscale_mean - self.init_config.sigma * grayscale_std)
130        grayscale_end = round(grayscale_mean + self.init_config.sigma * grayscale_std)
131
132        index_begin = bisect.bisect_left(self.image_metas_grayscale_means, x=grayscale_begin)
133        index_end = bisect.bisect_right(self.image_metas_grayscale_means, x=grayscale_end)
134        image_metas = self.image_metas[index_begin:index_end]
135
136        assert image_metas
137        return image_metas
138
139    @classmethod
140    def fill_np_edge_mask(
141        cls,
142        np_edge_mask: np.ndarray,
143        height: int,
144        width: int,
145        gaussian_blur_half_kernel_size: int,
146        up: int,
147        down: int,
148        left: int,
149        right: int,
150    ):
151        # Fill up.
152        up_min = max(0, up - gaussian_blur_half_kernel_size)
153        up_max = min(height - 1, up + gaussian_blur_half_kernel_size)
154        np_edge_mask[up_min:up_max + 1, left:right + 1] = 1
155
156        # Fill down.
157        down_min = max(0, down - gaussian_blur_half_kernel_size)
158        down_max = min(height - 1, down + gaussian_blur_half_kernel_size)
159        np_edge_mask[down_min:down_max + 1, left:right + 1] = 1
160
161        # Fill left.
162        left_min = max(0, left - gaussian_blur_half_kernel_size)
163        left_max = min(width - 1, left + gaussian_blur_half_kernel_size)
164        np_edge_mask[up:down + 1, left_min:left_max + 1] = 1
165
166        # Fill right.
167        right_min = max(0, right - gaussian_blur_half_kernel_size)
168        right_max = min(width - 1, right + gaussian_blur_half_kernel_size)
169        np_edge_mask[up:down + 1, right_min:right_max + 1] = 1
170
171    def synthesize_image(
172        self,
173        run_config: ImageEngineRunConfig,
174        image_metas: Sequence[ImageMeta],
175        rng: RandomGenerator,
176    ):
177        height = run_config.height
178        width = run_config.width
179
180        mat = np.zeros((height, width, 3), dtype=np.uint8)
181        edge_mask = Mask.from_shape((height, width))
182        gaussian_blur_half_kernel_size = self.init_config.gaussian_blur_kernel_size // 2 + 1
183
184        # Initialize segments.
185        priority_queue: List[PrioritizedSegment] = []
186        segment_width_min = int(
187            np.clip(
188                round(self.init_config.init_segment_width_min_ratio * width),
189                1,
190                width - 1,
191            )
192        )
193        left = 0
194        while left + segment_width_min - 1 < width:
195            right = rng.integers(
196                left + segment_width_min - 1,
197                width,
198            )
199            if right + 1 - left < segment_width_min or width - right - 1 < segment_width_min:
200                break
201            priority_queue.append(PrioritizedSegment(
202                y=0,
203                left=left,
204                right=right,
205            ))
206            left = right + 1
207        if left < width:
208            priority_queue.append(PrioritizedSegment(
209                y=0,
210                left=left,
211                right=width - 1,
212            ))
213
214        while priority_queue:
215            # Pop a segment
216            cur_segment = heapq.heappop(priority_queue)
217
218            # Deal with connection.
219            segments: List[PrioritizedSegment] = []
220            while priority_queue and priority_queue[0].y == cur_segment.y:
221                segments.append(heapq.heappop(priority_queue))
222
223            if segments:
224                segments.append(cur_segment)
225                segments = sorted(segments, key=lambda segment: segment.left)
226                cur_segment_idx = -1
227                for segment_idx, segment in enumerate(segments):
228                    if segment.left == cur_segment.left and segment.right == cur_segment.right:
229                        cur_segment_idx = segment_idx
230                        break
231                assert cur_segment_idx >= 0
232
233                begin = cur_segment_idx
234                while begin > 0 and segments[begin - 1].right + 1 == segments[begin].left:
235                    begin -= 1
236                end = cur_segment_idx
237                while end + 1 < len(segments) and segments[end].right + 1 == segments[end + 1].left:
238                    end += 1
239
240                if begin < end:
241                    # Update the current segment.
242                    cur_segment.left = segments[begin].left
243                    cur_segment.right = segments[end].right
244
245                # Push back.
246                for segment in segments[:begin]:
247                    heapq.heappush(priority_queue, segment)
248                for segment in segments[end + 1:]:
249                    heapq.heappush(priority_queue, segment)
250
251            # Load image.
252            image_meta = rng_choice(rng, image_metas)
253            if self.enable_cache and image_meta.image_file in self.image_file_to_cache_image:
254                segment_image = self.image_file_to_cache_image[image_meta.image_file]
255            else:
256                segment_image = Image.from_file(image_meta.image_file).to_target_mode_image(
257                    self.init_config.target_image_mode
258                )
259                if self.enable_cache:
260                    self.image_file_to_cache_image[image_meta.image_file] = segment_image
261
262            # Fill image and edge mask.
263            up = cur_segment.y
264            down = min(height - 1, up + segment_image.height - 1)
265            left = cur_segment.left
266            right = min(cur_segment.right, left + segment_image.width - 1)
267            mat[up:down + 1, left:right + 1] = \
268                segment_image.mat[:down + 1 - up, :right + 1 - left]
269
270            with edge_mask.writable_context:
271                self.fill_np_edge_mask(
272                    np_edge_mask=edge_mask.mat,
273                    height=height,
274                    width=width,
275                    gaussian_blur_half_kernel_size=gaussian_blur_half_kernel_size,
276                    up=up,
277                    down=down,
278                    left=left,
279                    right=right,
280                )
281
282            # Update segments.
283            if right == cur_segment.right:
284                # Reach the current right end.
285                cur_segment.y = down + 1
286                if cur_segment.y < height:
287                    heapq.heappush(priority_queue, cur_segment)
288            else:
289                # Not reaching the right end.
290                assert right < cur_segment.right
291                new_segment = PrioritizedSegment(
292                    y=down + 1,
293                    left=left,
294                    right=right,
295                )
296                if new_segment.y < height:
297                    heapq.heappush(priority_queue, new_segment)
298
299                cur_segment.left = right + 1
300                heapq.heappush(priority_queue, cur_segment)
301
302        # Apply gaussian blur.
303        gaussian_blur_sigma = gaussian_blur_half_kernel_size / 3
304        gaussian_blur_ksize = (self.init_config.gaussian_blur_kernel_size,) * 2
305        edge_mask.fill_np_array(
306            mat,
307            cv.GaussianBlur(mat, gaussian_blur_ksize, gaussian_blur_sigma),
308        )
309
310        return Image(mat=mat)
311
312    def run(self, run_config: ImageEngineRunConfig, rng: RandomGenerator) -> Image:
313        assert not run_config.disable_resizing
314        image_metas = self.sample_image_metas_based_on_random_anchor(run_config, rng)
315        return self.synthesize_image(run_config, image_metas, rng)
316
317
# Module-level executor factory built from ImageCombinerEngine.
image_combiner_engine_executor_factory = EngineExecutorFactory(ImageCombinerEngine)
class ImageMeta:
class ImageMeta:
    """Metadata of one image file: its path and its grayscale statistics."""
    # Path of the image file.
    image_file: str
    # Values of the 'grayscale_mean' / 'grayscale_std' keys of the image's
    # entry in metas.json (see load_image_metas_from_folder).
    grayscale_mean: float
    grayscale_std: float
ImageMeta(image_file: str, grayscale_mean: float, grayscale_std: float)
2def __init__(self, image_file, grayscale_mean, grayscale_std):
3    _setattr(self, 'image_file', image_file)
4    _setattr(self, 'grayscale_mean', grayscale_mean)
5    _setattr(self, 'grayscale_std', grayscale_std)

Method generated by attrs for class ImageMeta.

class FolderTree:
class FolderTree:
    """Names of the entries that load_image_metas_from_folder looks up in a meta folder."""
    # Sub-folder against which the 'image_file' entries are resolved.
    IMAGE = 'image'
    # JSON file holding one metadata entry per image.
    METAS_JSON = 'metas.json'
FolderTree()
def load_image_metas_from_folder(folder: str):
def load_image_metas_from_folder(folder: str):
    """Build the list of ImageMeta described by the ``metas.json`` of ``folder``.

    The folder must contain an ``image`` sub-folder and a ``metas.json`` file.
    Every entry of the JSON provides ``image_file`` (resolved relative to the
    ``image`` sub-folder), ``grayscale_mean`` and ``grayscale_std``.

    Args:
        folder: Path of the meta folder; handed to iolite with ``expandvars=True``.

    Returns:
        One ImageMeta per entry, in the iteration order of the parsed JSON.
    """
    # ``exists=True`` is forwarded to iolite for every path below; presumably
    # iolite rejects paths that do not exist -- TODO confirm.
    root_fd = io.folder(folder, expandvars=True, exists=True)
    image_fd = io.folder(root_fd / FolderTree.IMAGE, exists=True)
    metas_json = io.file(root_fd / FolderTree.METAS_JSON, exists=True)

    def build_image_meta(meta) -> ImageMeta:
        # Resolve the image path first so the path check runs before the
        # remaining keys of the entry are read.
        image_file = io.file(image_fd / meta['image_file'], exists=True)
        return ImageMeta(
            image_file=str(image_file),
            grayscale_mean=meta['grayscale_mean'],
            grayscale_std=meta['grayscale_std'],
        )

    return [build_image_meta(meta) for meta in read_json_file(metas_json)]
class ImageCombinerEngineInitConfig:
class ImageCombinerEngineInitConfig:
    """Configuration of ImageCombinerEngine."""
    # Folder holding the 'image' sub-folder and 'metas.json'.
    image_meta_folder: str
    # Mode every loaded image is converted to before it is pasted.
    target_image_mode: ImageMode = ImageMode.RGB
    # Keep loaded images in memory, keyed by file path.
    enable_cache: bool = False
    # Candidate images must have a grayscale mean within `sigma` standard
    # deviations (of the anchor image) around the anchor's grayscale mean.
    sigma: float = 3.0
    # Minimum width of an initial segment, as a fraction of the output width.
    init_segment_width_min_ratio: float = 0.25
    # Side length of the Gaussian kernel used to blur tile borders.
    # The annotation is required: without it attrs treats the name as a plain
    # class attribute, so it was missing from __init__ and not configurable.
    # NOTE(review): cv.GaussianBlur expects an odd, positive kernel size.
    gaussian_blur_kernel_size: int = 5
ImageCombinerEngineInitConfig( image_meta_folder: str, target_image_mode: vkit.element.image.ImageMode = <ImageMode.RGB: 'rgb'>, enable_cache: bool = False, sigma: float = 3.0, init_segment_width_min_ratio: float = 0.25)
2def __init__(self, image_meta_folder, target_image_mode=attr_dict['target_image_mode'].default, enable_cache=attr_dict['enable_cache'].default, sigma=attr_dict['sigma'].default, init_segment_width_min_ratio=attr_dict['init_segment_width_min_ratio'].default):
3    self.image_meta_folder = image_meta_folder
4    self.target_image_mode = target_image_mode
5    self.enable_cache = enable_cache
6    self.sigma = sigma
7    self.init_segment_width_min_ratio = init_segment_width_min_ratio

Method generated by attrs for class ImageCombinerEngineInitConfig.

class PrioritizedSegment:
class PrioritizedSegment:
    """Column span [left, right] of the canvas that is filled from row ``y`` downwards."""
    # Row at which the next image is pasted for this span; the only field
    # that takes part in ordering.
    y: int = attrs.field(order=True)
    # Inclusive column bounds; excluded from ordering.
    left: int = attrs.field(order=False)
    right: int = attrs.field(order=False)
PrioritizedSegment(y: int, left: int, right: int)
2def __init__(self, y, left, right):
3    self.y = y
4    self.left = left
5    self.right = right

Method generated by attrs for class PrioritizedSegment.

 89class ImageCombinerEngine(
 90    Engine[
 91        ImageCombinerEngineInitConfig,
 92        NoneTypeEngineInitResource,
 93        ImageEngineRunConfig,
 94        Image,
 95    ]
 96):  # yapf: disable
 97
 98    @classmethod
 99    def get_type_name(cls) -> str:
100        return 'combiner'
101
102    def __init__(
103        self,
104        init_config: ImageCombinerEngineInitConfig,
105        init_resource: Optional[NoneTypeEngineInitResource] = None,
106    ):
107        super().__init__(init_config, init_resource)
108
109        self.image_metas = load_image_metas_from_folder(init_config.image_meta_folder)
110        self.image_metas = sorted(
111            self.image_metas,
112            key=lambda meta: meta.grayscale_mean,
113        )
114        self.image_metas_grayscale_means = [
115            image_meta.grayscale_mean for image_meta in self.image_metas
116        ]
117        self.enable_cache = init_config.enable_cache
118        self.image_file_to_cache_image: Dict[str, Image] = {}
119
120    def sample_image_metas_based_on_random_anchor(
121        self,
122        run_config: ImageEngineRunConfig,
123        rng: RandomGenerator,
124    ):
125        # Get candidates based on anchor.
126        anchor_image_meta = rng_choice(rng, self.image_metas)
127        grayscale_std = anchor_image_meta.grayscale_std
128        grayscale_mean = anchor_image_meta.grayscale_mean
129
130        grayscale_begin = round(grayscale_mean - self.init_config.sigma * grayscale_std)
131        grayscale_end = round(grayscale_mean + self.init_config.sigma * grayscale_std)
132
133        index_begin = bisect.bisect_left(self.image_metas_grayscale_means, x=grayscale_begin)
134        index_end = bisect.bisect_right(self.image_metas_grayscale_means, x=grayscale_end)
135        image_metas = self.image_metas[index_begin:index_end]
136
137        assert image_metas
138        return image_metas
139
140    @classmethod
141    def fill_np_edge_mask(
142        cls,
143        np_edge_mask: np.ndarray,
144        height: int,
145        width: int,
146        gaussian_blur_half_kernel_size: int,
147        up: int,
148        down: int,
149        left: int,
150        right: int,
151    ):
152        # Fill up.
153        up_min = max(0, up - gaussian_blur_half_kernel_size)
154        up_max = min(height - 1, up + gaussian_blur_half_kernel_size)
155        np_edge_mask[up_min:up_max + 1, left:right + 1] = 1
156
157        # Fill down.
158        down_min = max(0, down - gaussian_blur_half_kernel_size)
159        down_max = min(height - 1, down + gaussian_blur_half_kernel_size)
160        np_edge_mask[down_min:down_max + 1, left:right + 1] = 1
161
162        # Fill left.
163        left_min = max(0, left - gaussian_blur_half_kernel_size)
164        left_max = min(width - 1, left + gaussian_blur_half_kernel_size)
165        np_edge_mask[up:down + 1, left_min:left_max + 1] = 1
166
167        # Fill right.
168        right_min = max(0, right - gaussian_blur_half_kernel_size)
169        right_max = min(width - 1, right + gaussian_blur_half_kernel_size)
170        np_edge_mask[up:down + 1, right_min:right_max + 1] = 1
171
172    def synthesize_image(
173        self,
174        run_config: ImageEngineRunConfig,
175        image_metas: Sequence[ImageMeta],
176        rng: RandomGenerator,
177    ):
178        height = run_config.height
179        width = run_config.width
180
181        mat = np.zeros((height, width, 3), dtype=np.uint8)
182        edge_mask = Mask.from_shape((height, width))
183        gaussian_blur_half_kernel_size = self.init_config.gaussian_blur_kernel_size // 2 + 1
184
185        # Initialize segments.
186        priority_queue: List[PrioritizedSegment] = []
187        segment_width_min = int(
188            np.clip(
189                round(self.init_config.init_segment_width_min_ratio * width),
190                1,
191                width - 1,
192            )
193        )
194        left = 0
195        while left + segment_width_min - 1 < width:
196            right = rng.integers(
197                left + segment_width_min - 1,
198                width,
199            )
200            if right + 1 - left < segment_width_min or width - right - 1 < segment_width_min:
201                break
202            priority_queue.append(PrioritizedSegment(
203                y=0,
204                left=left,
205                right=right,
206            ))
207            left = right + 1
208        if left < width:
209            priority_queue.append(PrioritizedSegment(
210                y=0,
211                left=left,
212                right=width - 1,
213            ))
214
215        while priority_queue:
216            # Pop a segment
217            cur_segment = heapq.heappop(priority_queue)
218
219            # Deal with connection.
220            segments: List[PrioritizedSegment] = []
221            while priority_queue and priority_queue[0].y == cur_segment.y:
222                segments.append(heapq.heappop(priority_queue))
223
224            if segments:
225                segments.append(cur_segment)
226                segments = sorted(segments, key=lambda segment: segment.left)
227                cur_segment_idx = -1
228                for segment_idx, segment in enumerate(segments):
229                    if segment.left == cur_segment.left and segment.right == cur_segment.right:
230                        cur_segment_idx = segment_idx
231                        break
232                assert cur_segment_idx >= 0
233
234                begin = cur_segment_idx
235                while begin > 0 and segments[begin - 1].right + 1 == segments[begin].left:
236                    begin -= 1
237                end = cur_segment_idx
238                while end + 1 < len(segments) and segments[end].right + 1 == segments[end + 1].left:
239                    end += 1
240
241                if begin < end:
242                    # Update the current segment.
243                    cur_segment.left = segments[begin].left
244                    cur_segment.right = segments[end].right
245
246                # Push back.
247                for segment in segments[:begin]:
248                    heapq.heappush(priority_queue, segment)
249                for segment in segments[end + 1:]:
250                    heapq.heappush(priority_queue, segment)
251
252            # Load image.
253            image_meta = rng_choice(rng, image_metas)
254            if self.enable_cache and image_meta.image_file in self.image_file_to_cache_image:
255                segment_image = self.image_file_to_cache_image[image_meta.image_file]
256            else:
257                segment_image = Image.from_file(image_meta.image_file).to_target_mode_image(
258                    self.init_config.target_image_mode
259                )
260                if self.enable_cache:
261                    self.image_file_to_cache_image[image_meta.image_file] = segment_image
262
263            # Fill image and edge mask.
264            up = cur_segment.y
265            down = min(height - 1, up + segment_image.height - 1)
266            left = cur_segment.left
267            right = min(cur_segment.right, left + segment_image.width - 1)
268            mat[up:down + 1, left:right + 1] = \
269                segment_image.mat[:down + 1 - up, :right + 1 - left]
270
271            with edge_mask.writable_context:
272                self.fill_np_edge_mask(
273                    np_edge_mask=edge_mask.mat,
274                    height=height,
275                    width=width,
276                    gaussian_blur_half_kernel_size=gaussian_blur_half_kernel_size,
277                    up=up,
278                    down=down,
279                    left=left,
280                    right=right,
281                )
282
283            # Update segments.
284            if right == cur_segment.right:
285                # Reach the current right end.
286                cur_segment.y = down + 1
287                if cur_segment.y < height:
288                    heapq.heappush(priority_queue, cur_segment)
289            else:
290                # Not reaching the right end.
291                assert right < cur_segment.right
292                new_segment = PrioritizedSegment(
293                    y=down + 1,
294                    left=left,
295                    right=right,
296                )
297                if new_segment.y < height:
298                    heapq.heappush(priority_queue, new_segment)
299
300                cur_segment.left = right + 1
301                heapq.heappush(priority_queue, cur_segment)
302
303        # Apply gaussian blur.
304        gaussian_blur_sigma = gaussian_blur_half_kernel_size / 3
305        gaussian_blur_ksize = (self.init_config.gaussian_blur_kernel_size,) * 2
306        edge_mask.fill_np_array(
307            mat,
308            cv.GaussianBlur(mat, gaussian_blur_ksize, gaussian_blur_sigma),
309        )
310
311        return Image(mat=mat)
312
313    def run(self, run_config: ImageEngineRunConfig, rng: RandomGenerator) -> Image:
314        assert not run_config.disable_resizing
315        image_metas = self.sample_image_metas_based_on_random_anchor(run_config, rng)
316        return self.synthesize_image(run_config, image_metas, rng)

Abstract base class for generic types.

A generic type is typically declared by inheriting from this class parameterized with one or more type variables. For example, a generic mapping type might be defined as::

    class Mapping(Generic[KT, VT]):
        def __getitem__(self, key: KT) -> VT:
            ...
        # Etc.

This class can then be used as follows::

    def lookup_name(mapping: Mapping[KT, VT], key: KT, default: VT) -> VT:
        try:
            return mapping[key]
        except KeyError:
            return default

ImageCombinerEngine( init_config: vkit.engine.image.combiner.ImageCombinerEngineInitConfig, init_resource: Union[vkit.engine.interface.NoneTypeEngineInitResource, NoneType] = None)
102    def __init__(
103        self,
104        init_config: ImageCombinerEngineInitConfig,
105        init_resource: Optional[NoneTypeEngineInitResource] = None,
106    ):
107        super().__init__(init_config, init_resource)
108
109        self.image_metas = load_image_metas_from_folder(init_config.image_meta_folder)
110        self.image_metas = sorted(
111            self.image_metas,
112            key=lambda meta: meta.grayscale_mean,
113        )
114        self.image_metas_grayscale_means = [
115            image_meta.grayscale_mean for image_meta in self.image_metas
116        ]
117        self.enable_cache = init_config.enable_cache
118        self.image_file_to_cache_image: Dict[str, Image] = {}
@classmethod
def get_type_name(cls) -> str:
    @classmethod
    def get_type_name(cls) -> str:
        """Return the type name of this engine, ``'combiner'``."""
        return 'combiner'
def sample_image_metas_based_on_random_anchor( self, run_config: vkit.engine.image.type.ImageEngineRunConfig, rng: numpy.random._generator.Generator):
120    def sample_image_metas_based_on_random_anchor(
121        self,
122        run_config: ImageEngineRunConfig,
123        rng: RandomGenerator,
124    ):
125        # Get candidates based on anchor.
126        anchor_image_meta = rng_choice(rng, self.image_metas)
127        grayscale_std = anchor_image_meta.grayscale_std
128        grayscale_mean = anchor_image_meta.grayscale_mean
129
130        grayscale_begin = round(grayscale_mean - self.init_config.sigma * grayscale_std)
131        grayscale_end = round(grayscale_mean + self.init_config.sigma * grayscale_std)
132
133        index_begin = bisect.bisect_left(self.image_metas_grayscale_means, x=grayscale_begin)
134        index_end = bisect.bisect_right(self.image_metas_grayscale_means, x=grayscale_end)
135        image_metas = self.image_metas[index_begin:index_end]
136
137        assert image_metas
138        return image_metas
@classmethod
def fill_np_edge_mask( cls, np_edge_mask: numpy.ndarray, height: int, width: int, gaussian_blur_half_kernel_size: int, up: int, down: int, left: int, right: int):
140    @classmethod
141    def fill_np_edge_mask(
142        cls,
143        np_edge_mask: np.ndarray,
144        height: int,
145        width: int,
146        gaussian_blur_half_kernel_size: int,
147        up: int,
148        down: int,
149        left: int,
150        right: int,
151    ):
152        # Fill up.
153        up_min = max(0, up - gaussian_blur_half_kernel_size)
154        up_max = min(height - 1, up + gaussian_blur_half_kernel_size)
155        np_edge_mask[up_min:up_max + 1, left:right + 1] = 1
156
157        # Fill down.
158        down_min = max(0, down - gaussian_blur_half_kernel_size)
159        down_max = min(height - 1, down + gaussian_blur_half_kernel_size)
160        np_edge_mask[down_min:down_max + 1, left:right + 1] = 1
161
162        # Fill left.
163        left_min = max(0, left - gaussian_blur_half_kernel_size)
164        left_max = min(width - 1, left + gaussian_blur_half_kernel_size)
165        np_edge_mask[up:down + 1, left_min:left_max + 1] = 1
166
167        # Fill right.
168        right_min = max(0, right - gaussian_blur_half_kernel_size)
169        right_max = min(width - 1, right + gaussian_blur_half_kernel_size)
170        np_edge_mask[up:down + 1, right_min:right_max + 1] = 1
def synthesize_image( self, run_config: vkit.engine.image.type.ImageEngineRunConfig, image_metas: Sequence[vkit.engine.image.combiner.ImageMeta], rng: numpy.random._generator.Generator):
172    def synthesize_image(
173        self,
174        run_config: ImageEngineRunConfig,
175        image_metas: Sequence[ImageMeta],
176        rng: RandomGenerator,
177    ):
178        height = run_config.height
179        width = run_config.width
180
181        mat = np.zeros((height, width, 3), dtype=np.uint8)
182        edge_mask = Mask.from_shape((height, width))
183        gaussian_blur_half_kernel_size = self.init_config.gaussian_blur_kernel_size // 2 + 1
184
185        # Initialize segments.
186        priority_queue: List[PrioritizedSegment] = []
187        segment_width_min = int(
188            np.clip(
189                round(self.init_config.init_segment_width_min_ratio * width),
190                1,
191                width - 1,
192            )
193        )
194        left = 0
195        while left + segment_width_min - 1 < width:
196            right = rng.integers(
197                left + segment_width_min - 1,
198                width,
199            )
200            if right + 1 - left < segment_width_min or width - right - 1 < segment_width_min:
201                break
202            priority_queue.append(PrioritizedSegment(
203                y=0,
204                left=left,
205                right=right,
206            ))
207            left = right + 1
208        if left < width:
209            priority_queue.append(PrioritizedSegment(
210                y=0,
211                left=left,
212                right=width - 1,
213            ))
214
215        while priority_queue:
216            # Pop a segment
217            cur_segment = heapq.heappop(priority_queue)
218
219            # Deal with connection.
220            segments: List[PrioritizedSegment] = []
221            while priority_queue and priority_queue[0].y == cur_segment.y:
222                segments.append(heapq.heappop(priority_queue))
223
224            if segments:
225                segments.append(cur_segment)
226                segments = sorted(segments, key=lambda segment: segment.left)
227                cur_segment_idx = -1
228                for segment_idx, segment in enumerate(segments):
229                    if segment.left == cur_segment.left and segment.right == cur_segment.right:
230                        cur_segment_idx = segment_idx
231                        break
232                assert cur_segment_idx >= 0
233
234                begin = cur_segment_idx
235                while begin > 0 and segments[begin - 1].right + 1 == segments[begin].left:
236                    begin -= 1
237                end = cur_segment_idx
238                while end + 1 < len(segments) and segments[end].right + 1 == segments[end + 1].left:
239                    end += 1
240
241                if begin < end:
242                    # Update the current segment.
243                    cur_segment.left = segments[begin].left
244                    cur_segment.right = segments[end].right
245
246                # Push back.
247                for segment in segments[:begin]:
248                    heapq.heappush(priority_queue, segment)
249                for segment in segments[end + 1:]:
250                    heapq.heappush(priority_queue, segment)
251
252            # Load image.
253            image_meta = rng_choice(rng, image_metas)
254            if self.enable_cache and image_meta.image_file in self.image_file_to_cache_image:
255                segment_image = self.image_file_to_cache_image[image_meta.image_file]
256            else:
257                segment_image = Image.from_file(image_meta.image_file).to_target_mode_image(
258                    self.init_config.target_image_mode
259                )
260                if self.enable_cache:
261                    self.image_file_to_cache_image[image_meta.image_file] = segment_image
262
263            # Fill image and edge mask.
264            up = cur_segment.y
265            down = min(height - 1, up + segment_image.height - 1)
266            left = cur_segment.left
267            right = min(cur_segment.right, left + segment_image.width - 1)
268            mat[up:down + 1, left:right + 1] = \
269                segment_image.mat[:down + 1 - up, :right + 1 - left]
270
271            with edge_mask.writable_context:
272                self.fill_np_edge_mask(
273                    np_edge_mask=edge_mask.mat,
274                    height=height,
275                    width=width,
276                    gaussian_blur_half_kernel_size=gaussian_blur_half_kernel_size,
277                    up=up,
278                    down=down,
279                    left=left,
280                    right=right,
281                )
282
283            # Update segments.
284            if right == cur_segment.right:
285                # Reach the current right end.
286                cur_segment.y = down + 1
287                if cur_segment.y < height:
288                    heapq.heappush(priority_queue, cur_segment)
289            else:
290                # Not reaching the right end.
291                assert right < cur_segment.right
292                new_segment = PrioritizedSegment(
293                    y=down + 1,
294                    left=left,
295                    right=right,
296                )
297                if new_segment.y < height:
298                    heapq.heappush(priority_queue, new_segment)
299
300                cur_segment.left = right + 1
301                heapq.heappush(priority_queue, cur_segment)
302
303        # Apply gaussian blur.
304        gaussian_blur_sigma = gaussian_blur_half_kernel_size / 3
305        gaussian_blur_ksize = (self.init_config.gaussian_blur_kernel_size,) * 2
306        edge_mask.fill_np_array(
307            mat,
308            cv.GaussianBlur(mat, gaussian_blur_ksize, gaussian_blur_sigma),
309        )
310
311        return Image(mat=mat)
def run( self, run_config: vkit.engine.image.type.ImageEngineRunConfig, rng: numpy.random._generator.Generator) -> vkit.element.image.Image:
313    def run(self, run_config: ImageEngineRunConfig, rng: RandomGenerator) -> Image:
314        assert not run_config.disable_resizing
315        image_metas = self.sample_image_metas_based_on_random_anchor(run_config, rng)
316        return self.synthesize_image(run_config, image_metas, rng)