import argparse
import torch
import os
from torch.nn import functional as F
from torch.utils.data import DataLoader
from torchvision import transforms as T
from torchvision.transforms.functional import to_pil_image
from threading import Thread
from tqdm import tqdm
from torch.utils.data import Dataset
from PIL import Image
from typing import Callable, Optional, List, Tuple
import glob
from torch import nn
from torchvision.models.resnet import ResNet, Bottleneck
from torch import Tensor
import torchvision
import numpy as np
import cv2
import uuid
# --------------- hy ---------------
class HomographicAlignment:
Apply homographic alignment on background to match with the source image.
def __init__(self):
self.detector = cv2.ORB_create()
self.matcher = cv2.DescriptorMatcher_create(cv2.DESCRIPTOR_MATCHER_BRUTEFORCE)
def __call__(self, src, bgr):
src = np.asarray(src)
bgr = np.asarray(bgr)
keypoints_src, descriptors_src = self.detector.detectAndCompute(src, None)
keypoints_bgr, descriptors_bgr = self.detector.detectAndCompute(bgr, None)
matches = self.matcher.match(descriptors_bgr, descriptors_src, None)
matches.sort(key=lambda x: x.distance, reverse=False)
num_good_matches = int(len(matches) * 0.15)
matches = matches[:num_good_matches]
points_src = np.zeros((len(matches), 2), dtype=np.Float32)
points_bgr = np.zeros((len(matches), 2), dtype=np.float32)
for i, match in enumerate(matches):
points_src[i, :] = keypoints_src[match.trainIdx].pt
points_bgr[i, :] = keypoints_bgr[match.queryIdx].pt
H, _ = cv2.findHomography(points_bgr, points_src, cv2.RANSAC)
h, w = src.shape[:2]
bgr = cv2.warpPerspective(bgr, H, (w, h))
msk = cv2.warpPerspective(np.ones((h, w)), H, (w, h))
# For areas that is outside of the background,
# We just copy pixels from the source.
bgr[msk != 1] = src[msk != 1]
src = Image.fromarray(src)
bgr = Image.fromarray(bgr)
return src, bgr
class Refiner(nn.Module):
# For TorchScript export optimization.
__constants__ = ['kernel_size', 'patch_crop_method', 'patch_replace_method']
def __init__(self,
mode: str,
sample_pixels: int,
threshold: float,
kernel_size: int = 3,
prevent_oversampling: bool = True,
patch_crop_method: str = 'unfold',
patch_replace_method: str = 'scatter_nd'):
assert mode in ['full', 'sampling', 'thresholding']
assert kernel_size in [1, 3]
assert patch_crop_method in ['unfold', 'roi_align', 'gather']
assert patch_replace_method in ['scatter_nd', 'scatter_element']
self.mode = mode
self.sample_pixels = sample_pixels
self.threshold = threshold
self.kernel_size = kernel_size
self.prevent_oversampling = prevent_oversampling
self.patch_crop_method = patch_crop_method
self.patch_replace_method = patch_replace_method
channels = [32, 24, 16, 12, 4]
self.conv1 = nn.Conv2d(channels[0] 6 4, channels[1], kernel_size, bias=False)
self.bn1 = nn.BatchNorm2d(channels[1])
self.conv2 = nn.Conv2d(channels[1], channels[2], kernel_size, bias=False)
self.bn2 = nn.BatchNorm2d(channels[2])
self.conv3 = nn.Conv2d(channels[2] 6, channels[3], kernel_size, bias=False)
self.bn3 = nn.BatchNorm2d(channels[3])
self.conv4 = nn.Conv2d(channels[3], channels[4], kernel_size, bias=True)
self.relu = nn.ReLU(True)
def forward(self,
src: torch.Tensor,
bgr: torch.Tensor,
pha: torch.Tensor,
fgr: torch.Tensor,
err: torch.Tensor,
hid: torch.Tensor):
H_full, W_full = src.shape[2:]
H_half, W_half = H_full // 2, W_full // 2
H_quat, W_quat = H_full // 4, W_full // 4
src_bgr = torch.cat([src, bgr], dim=1)
if self.mode != 'full':
err = F.interpolate(err, (H_quat, W_quat), mode='bilinear', align_corners=False)
ref = self.select_refinement_regions(err)
idx = torch.nonzero(ref.squeeze(1))
idx = idx[:, 0], idx[:, 1], idx[:, 2]
if idx[0].size(0) > 0:
x = torch.cat([hid, pha, fgr], dim=1)
x = F.interpolate(x, (H_half, W_half), mode='bilinear', align_corners=False)
x = self.crop_patch(x, idx, 2, 3 if self.kernel_size == 3 else 0)
y = F.interpolate(src_bgr, (H_half, W_half), mode='bilinear', align_corners=False)
y = self.crop_patch(y, idx, 2, 3 if self.kernel_size == 3 else 0)
x = self.conv1(torch.cat([x, y], dim=1))
x = self.bn1(x)
x = self.relu(x)
x = self.conv2(x)
x = self.bn2(x)
x = self.relu(x)
x = F.interpolate(x, 8 if self.kernel_size == 3 else 4, mode='nearest')
y = self.crop_patch(src_bgr, idx, 4, 2 if self.kernel_size == 3 else 0)
x = self.conv3(torch.cat([x, y], dim=1))
x = self.bn3(x)
x = self.relu(x)
x = self.conv4(x)
out = torch.cat([pha, fgr], dim=1)
out = F.interpolate(out, (H_full, W_full), mode='bilinear', align_corners=False)
out = self.replace_patch(out, x, idx)
pha = out[:, :1]
fgr = out[:, 1:]
pha = F.interpolate(pha, (H_full, W_full), mode='bilinear', align_corners=False)
fgr = F.interpolate(fgr, (H_full, W_full), mode='bilinear', align_corners=False)
x = torch.cat([hid, pha, fgr], dim=1)
x = F.interpolate(x, (H_half, W_half), mode='bilinear', align_corners=False)
y = F.interpolate(src_bgr, (H_half, W_half), mode='bilinear', align_corners=False)
if self.kernel_size == 3:
x = F.pad(x, (3, 3, 3, 3))
y = F.pad(y, (3, 3, 3, 3))
x = self.conv1(torch.cat([x, y], dim=1))
x = self.bn1(x)
x = self.relu(x)
x = self.conv2(x)
x = self.bn2(x)
x = self.relu(x)
if self.kernel_size == 3:
x = F.interpolate(x, (H_full 4, W_full 4))
y = F.pad(src_bgr, (2, 2, 2, 2))
x = F.interpolate(x, (H_full, W_full), mode='nearest')
y = src_bgr
x = self.conv3(torch.cat([x, y], dim=1))
x = self.bn3(x)
x = self.relu(x)
x = self.conv4(x)
pha = x[:, :1]
fgr = x[:, 1:]
ref = torch.ones((src.size(0), 1, H_quat, W_quat), device=src.device, dtype=src.dtype)
return pha, fgr, ref
def select_refinement_regions(self, err: torch.Tensor):
Select refinement regions.
err: error map (B, 1, H, W)
ref: refinement regions (B, 1, H, W). FloatTensor. 1 is selected, 0 is not.
if self.mode == 'sampling':
# Sampling mode.
b, _, h, w = err.shape
err = err.view(b, -1)
idx = err.topk(self.sample_pixels // 16, dim=1, sorted=False).indices
ref = torch.zeros_like(err)
ref.scatter_(1, idx, 1.)
if self.prevent_oversampling:
ref = ref.view(b, 1, h, w)
# Thresholding mode.
ref = err.gt(self.threshold).float()
return ref
def crop_patch(self,
x: torch.Tensor,
idx: Tuple[torch.Tensor, torch.Tensor, torch.Tensor],
size: int,
padding: int):
Crops selected patches from image given indices.
x: image (B, C, H, W).
idx: selection indices Tuple[(P,), (P,), (P,),], where the 3 values are (B, H, W) index.
size: center size of the patch, also stride of the crop.
padding: expansion size of the patch.
patch: (P, C, h, w), where h = w = size 2 * padding.
if padding != 0:
x = F.pad(x, (padding,) * 4)
if self.patch_crop_method == 'unfold':
# Use unfold. Best performance for PyTorch and TorchScript.
return x.permute(0, 2, 3, 1) \
.unfold(1, size 2 * padding, size) \
.unfold(2, size 2 * padding, size)[idx[0], idx[1], idx[2]]
elif self.patch_crop_method == 'roi_align':
# Use roi_align. Best compatibility for ONNX.
idx = idx[0].type_as(x), idx[1].type_as(x), idx[2].type_as(x)
b = idx[0]
x1 = idx[2] * size - 0.5
y1 = idx[1] * size - 0.5
x2 = idx[2] * size size 2 * padding - 0.5
y2 = idx[1] * size size 2 * padding - 0.5
boxes = torch.stack([b, x1, y1, x2, y2], dim=1)
return torchvision.ops.roi_align(x, boxes, size 2 * padding, sampling_ratio=1)
# Use gather. Crops out patches pixel by pixel.
idx_pix = self.compute_pixel_indices(x, idx, size, padding)
pat = torch.gather(x.view(-1), 0, idx_pix.view(-1))
pat = pat.view(-1, x.size(1), size 2 * padding, size 2 * padding)
return pat
def replace_patch(self,
x: torch.Tensor,
y: torch.Tensor,
idx: Tuple[torch.Tensor, torch.Tensor, torch.Tensor]):
Replaces patches back into image given index.
x: image (B, C, H, W)
y: patches (P, C, h, w)
idx: selection indices Tuple[(P,), (P,), (P,)] where the 3 values are (B, H, W) index.
image: (B, C, H, W), where patches at idx locations are replaced with y.
xB, xC, xH, xW = x.shape
yB, yC, yH, yW = y.shape
if self.patch_replace_method == 'scatter_nd':
# Use scatter_nd. Best performance for PyTorch and TorchScript. Replacing patch by patch.
x = x.view(xB, xC, xH // yH, yH, xW // yW, yW).permute(0, 2, 4, 1, 3, 5)
x[idx[0], idx[1], idx[2]] = y
x = x.permute(0, 3, 1, 4, 2, 5).view(xB, xC, xH, xW)
return x
# Use scatter_element. Best compatibility for ONNX. Replacing pixel by pixel.
idx_pix = self.compute_pixel_indices(x, idx, size=4, padding=0)
return x.view(-1).scatter_(0, idx_pix.view(-1), y.view(-1)).view(x.shape)
def compute_pixel_indices(self,
x: torch.Tensor,
idx: Tuple[torch.Tensor, torch.Tensor, torch.Tensor],
size: int,
padding: int):
Compute selected pixel indices in the tensor.
Used for crop_method == 'gather' and replace_method == 'scatter_element', which crop and replace pixel by pixel.
x: image: (B, C, H, W)
idx: selection indices Tuple[(P,), (P,), (P,),], where the 3 values are (B, H, W) index.
size: center size of the patch, also stride of the crop.
padding: expansion size of the patch.
idx: (P, C, O, O) long tensor where O is the output size: size 2 * padding, P is number of patches.
the element are indices pointing to the input x.view(-1).
B, C, H, W = x.shape
S, P = size, padding
O = S 2 * P
b, y, x = idx
n = b.size(0)
c = torch.arange(C)
o = torch.arange(O)
idx_pat = (c * H * W).view(C, 1, 1).expand([C, O, O]) (o * W).view(1, O, 1).expand([C, O, O]) o.view(1, 1,
[C, O, O])
idx_loc = b * W * H y * W * S x * S
idx_pix = idx_loc.view(-1, 1, 1, 1).expand([n, C, O, O]) idx_pat.view(1, C, O, O).expand([n, C, O, O])
return idx_pix
def load_matched_state_dict(model, state_dict, print_stats=True):
Only loads weights that matched in key and shape. Ignore other weights.
num_matched, num_total = 0, 0
curr_state_dict = model.state_dict()
for key in curr_state_dict.keys():
num_total = 1
if key in state_dict and curr_state_dict[key].shape == state_dict[key].shape:
curr_state_dict[key] = state_dict[key]
num_matched = 1
if print_stats:
print(f'Loaded state_dict: {num_matched}/{num_total} matched')
def _make_divisible(v: float, divisor: int, min_value: Optional[int] = None) -> int:
This function is taken from the original tf repo.
It ensures that all layers have a channel number that is divisible by 8
It can be seen here:
if min_value is None:
min_value = divisor
new_v = max(min_value, int(v divisor / 2) // divisor * divisor)
# Make sure that round down does not go down by more than 10%.
if new_v < 0.9 * v:
new_v = divisor
return new_v
class ConvNormActivation(torch.nn.Sequential):
def __init__(
in_channels: int,
out_channels: int,
kernel_size: int = 3,
stride: int = 1,
padding: Optional[int] = None,
groups: int = 1,
norm_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.BatchNorm2d,
activation_layer: Optional[Callable[..., torch.nn.Module]] = torch.nn.ReLU,
dilation: int = 1,
inplace: bool = True,
) -> None:
if padding is None:
padding = (kernel_size - 1) // 2 * dilation
layers = [torch.nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding,
dilation=dilation, groups=groups, bias=norm_layer is None)]
if norm_layer is not None:
if activation_layer is not None:
self.out_channels = out_channels
class InvertedResidual(nn.Module):
def __init__(
inp: int,
oup: int,
stride: int,
expand_ratio: int,
norm_layer: Optional[Callable[..., nn.Module]] = None
) -> None:
super(InvertedResidual, self).__init__()
self.stride = stride
assert stride in [1, 2]
if norm_layer is None:
norm_layer = nn.BatchNorm2d
hidden_dim = int(round(inp * expand_ratio))
self.use_res_connect = self.stride == 1 and inp == oup
layers: List[nn.Module] = []
if expand_ratio != 1:
# pw
layers.append(ConvNormActivation(inp, hidden_dim, kernel_size=1, norm_layer=norm_layer,
# dw
ConvNormActivation(hidden_dim, hidden_dim, stride=stride, groups=hidden_dim, norm_layer=norm_layer,
# pw-linear
nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
self.conv = nn.Sequential(*layers)
self.out_channels = oup
self._is_cn = stride > 1
def forward(self, x: Tensor) -> Tensor:
if self.use_res_connect:
return x self.conv(x)
return self.conv(x)
class MobileNetV2(nn.Module):
def __init__(
num_classes: int = 1000,
width_mult: float = 1.0,
inverted_residual_setting: Optional[List[List[int]]] = None,
round_nearest: int = 8,
block: Optional[Callable[..., nn.Module]] = None,
norm_layer: Optional[Callable[..., nn.Module]] = None
) -> None:
MobileNet V2 main class
num_classes (int): Number of classes
width_mult (float): Width multiplier - adjusts number of channels in each layer by this amount
inverted_residual_setting: Network structure
round_nearest (int): Round the number of channels in each layer to be a multiple of this number
Set to 1 to turn off rounding
block: Module specifying inverted residual building block for mobilenet
norm_layer: Module specifying the normalization layer to use
super(MobileNetV2, self).__init__()
if block is None:
block = InvertedResidual
if norm_layer is None:
norm_layer = nn.BatchNorm2d
input_channel = 32
last_channel = 1280
if inverted_residual_setting is None:
inverted_residual_setting = [
# t, c, n, s
[1, 16, 1, 1],
[6, 24, 2, 2],
[6, 32, 3, 2],
[6, 64, 4, 2],
[6, 96, 3, 1],
[6, 160, 3, 2],
[6, 320, 1, 1],
# only check the first element, assuming user knows t,c,n,s are required
if len(inverted_residual_setting) == 0 or len(inverted_residual_setting[0]) != 4:
raise ValueError("inverted_residual_setting should be non-empty "
"or a 4-element list, got {}".format(inverted_residual_setting))
# building first layer
input_channel = _make_divisible(input_channel * width_mult, round_nearest)
self.last_channel = _make_divisible(last_channel * max(1.0, width_mult), round_nearest)
features: List[nn.Module] = [ConvNormActivation(3, input_channel, stride=2, norm_layer=norm_layer,
# building inverted residual blocks
for t, c, n, s in inverted_residual_setting:
output_channel = _make_divisible(c * width_mult, round_nearest)
for i in range(n):
stride = s if i == 0 else 1
features.append(block(input_channel, output_channel, stride, expand_ratio=t, norm_layer=norm_layer))
input_channel = output_channel
# building last several layers
features.append(ConvNormActivation(input_channel, self.last_channel, kernel_size=1, norm_layer=norm_layer,
# make it nn.Sequential
self.features = nn.Sequential(*features)
# building classifier
self.classifier = nn.Sequential(
nn.Linear(self.last_channel, num_classes),
# weight initialization
for m in self.modules():
if isinstance(m, nn.Conv2d):
nn.init.kaiming_normal_(m.weight, mode='fan_out')
if m.bias is not None:
elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
elif isinstance(m, nn.Linear):
nn.init.normal_(m.weight, 0, 0.01)
def _forward_impl(self, x: Tensor) -> Tensor:
# This exists since TorchScript doesn't support inheritance, so the superclass method
# (this one) needs to have a name other than `forward` that can be accessed in a subclass
x = self.features(x)
# Cannot use "squeeze" as batch-size can be 1
x = nn.functional.adaptive_avg_pool2d(x, (1, 1))
x = torch.flatten(x, 1)
x = self.classifier(x)
return x
def forward(self, x: Tensor) -> Tensor:
return self._forward_impl(x)
class MobileNetV2Encoder(MobileNetV2):
MobileNetV2Encoder inherits from torchvision's official MobileNetV2. It is modified to
use dilation on the last block to maintain output stride 16, and deleted the
classifier block that was originally used for classification. The forward method
additionally returns the feature maps at all resolutions for decoder's use.