mosse correlation

This commit is contained in:
Alexandre Foucher 2024-07-22 16:04:55 +02:00
parent 7ebd03ec98
commit 96b89635c5
14 changed files with 476 additions and 173 deletions

1
.gitignore vendored
View file

@ -1,2 +1,3 @@
**/*.egg-info **/*.egg-info
/dist /dist
**/__pycache__/

134
demo/mosse_viz.py Normal file
View file

@ -0,0 +1,134 @@
import numpy as np
import scipy.signal as signal
from scipy.fftpack import fft, fftshift, ifft
import matplotlib.pyplot as plt
from matplotlib.widgets import Slider
N = 256
def gaussian_filter1d(size,sigma):
filter_range = np.linspace(-int(size/2),int(size/2),size)
gaussian_filter = [1 / (sigma * np.sqrt(2*np.pi)) * np.exp(-x**2/(2*sigma**2)) for x in filter_range]
return gaussian_filter
def generate_signal(N:int) -> np.ndarray:
x = np.arange(1, N)
y = np.zeros((N))
for i in range(len(x)):
y[i] = np.random.normal(scale=1) + (y[i-1] if i > 1 else 0)
return np.convolve(y,gaussian_filter1d(N,1),'same')
if __name__ == '__main__':
np.random.seed(42)
f_full = generate_signal(2048)
x = np.arange(-180,180,360/N)
f = f_full[1024:1024+N]
h = f_full[1024:1024+N]
g = signal.gaussian(N, std=10,sym=True)
F = fft(f)
F_ = np.conjugate(F)
G = fft(g)
K_ = (G*F_)/(F*F_)
H = fft(h)
r = ifft(H*K_)
# ==========================================
fig, (ax1, ax2, ax3, ax4, ax5, ax6, ax7) = plt.subplots(7, 1, gridspec_kw={'height_ratios':[4,4,4,1,1,1,1]})
ax1.set_title('Terrain')
plt_f, = ax1.plot(x,f)
plt_h, = ax1.plot(x,h)
ax2.set_title('MOSSE response signal')
ax2.set_ylim([0, 1.2])
line_r = ax2.axvline(x=-N//2+np.argmax(abs(r)), color='r')
plt_r, = ax2.plot(x,abs(r))
plt_r2, = ax2.plot(x,abs(r))
ax3.set_title('Gaussian')
plt_g, = ax3.plot(x,g)
ax1.set_xlim([-180,180])
ax2.set_xlim([-180,180])
ax3.set_xlim([-180,180])
slider1 = Slider(ax4, 'sigma', 0.3, 10, valinit=0.1)
slider2 = Slider(ax5, 'shift', -N//2 , N//2, valinit=0, valstep=1)
slider3 = Slider(ax6, 'seed', 0 , 50, valinit=0, valstep=1)
slider4 = Slider(ax7, 'N', 128 , 1024, valinit=256, valstep=8)
sigma = 0.3
shift = 0
def update():
# K_ = (G*F_)/(F*F_)
window = np.ones((N)) #signal.windows.hamming(N)
H = fft(h*window)
F = fft(f*window)
R = H*G/F
r = ifft(R)
s = np.argmax(abs(r))
r2 = np.copy(r)
r2[s-5:s+5] = 0
plt_g.set_data(x,g)
plt_r.set_data(x,abs(r))
plt_r2.set_data(x,abs(r2))
plt_h.set_data(x,h)
plt_f.set_data(x,f)
ax1.set_ylim([min(np.min(h),np.min(f))-1, max(np.max(h),np.max(f))+1])
ax2.set_ylim([0, np.max(r)+0.2])
line_r.set_xdata(round((np.argmax(abs(r))/N-0.5)*360))
fig.canvas.draw_idle()
def update_sigma(val):
global g, G, sigma, N
sigma = val
g = signal.gaussian(N, std=sigma,sym=True)
G = fft(g)
update()
def update_shift(val):
global shift, H, h
shift = -val
h = f_full[1024+round(shift*(N/360)):1024+N+round(shift*(N/360))]
noise = np.random.normal(0,0.5, N)
h = h+noise
update()
def update_seed(val):
global f_full, f, h, F, H, F_, shift
np.random.seed(val)
f_full = generate_signal(2048)
f = f_full[1024:1024+N]
h = f_full[1024+round(shift*(N/360)):1024+N+round(shift*(N/360))]
noise = np.random.normal(0,0.5, N)
h = h+noise
update()
def update_n(val):
global g, G, N, f_full, f, h, F, H, F_, shift, x, sigma, slider2
N = val
x = np.arange(-180,180,360/N)
g = signal.gaussian(N, std=sigma,sym=True)
G = fft(g)
f = f_full[1024:1024+N]
h = f_full[1024+shift:1024+N+shift]
noise = np.random.normal(0,0.5, N)
h = h+noise
update()
slider1.on_changed(update_sigma)
slider2.on_changed(update_shift)
slider3.on_changed(update_seed)
slider4.on_changed(update_n)
plt.subplots_adjust(hspace=0.5)
plt.show()

54
demo/test_terrain.py Normal file
View file

@ -0,0 +1,54 @@
import matplotlib.pyplot as plt
import numpy as np
import cv2 as cv
import os
import pyskyline
import pyskyline.localization
FILE = 'terrain'
SIZE = 256
X = SIZE//2
Y = SIZE//2
if __name__ == '__main__':
if os.path.isfile(f'{FILE}_{SIZE}.npz'):
print("Save detected, loading ...")
terrain = pyskyline.Terrain.load(f'{FILE}_{SIZE}')
else:
print("No save detected, generating new terrain ...")
terrain = pyskyline.Terrain.generate(size=SIZE)
terrain.compute_all_skylines()
terrain.save(f'{FILE}_{SIZE}')
print(f"Done, terrain saved as {FILE}_{SIZE}.npz")
# Output elevation map
cv.imwrite('elevation_map.jpg', terrain.dem.elevation)
# Output color map
cv.imwrite('color_map.jpg', terrain.dem.color)
# Ouput field of view visualization
cv.imwrite('fov.jpg', terrain.compute_fov(X, Y))
# Ouput skyline
skyline = terrain.compute_skyline(X, Y)
plt.figure()
plt.grid(False)
plt.plot(np.arange(-180,180,360/256),skyline)
plt.title('Skyline')
plt.savefig('skyline.svg')
# Generate heatmap
skyline = np.roll(skyline, 8) # Add heading error
skyline += np.random.normal(0,0.5, 256) # Add random noise
mosse_correlation = pyskyline.MosseCorrelation(terrain, skyline)
scoremap = mosse_correlation.generate_scoremap()
plt.figure()
plt.grid(False)
fig = plt.imshow(scoremap)
plt.title('Score map')
plt.colorbar(fig)
plt.savefig('scoremap.svg')

View file

@ -1,18 +1,4 @@
""" from .digital_map import DigitalMap
Copyright (C) 2024 University of South Brittany, Lab-STICC UMR 6285 All Rights Reserved. from .raycast import bresenham_line, get_highest_angle
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from .raycast import bresenham_line
from .terrain import Terrain from .terrain import Terrain
from .localization.mosse_correlation import MosseCorrelation

View file

@ -0,0 +1,77 @@
import os
import numpy as np
import cv2 as cv
import perlin_numpy as perlin
class DigitalMap:
DEFAULT_MASK = os.path.dirname(__file__) + '/../../resources/mask.jpg'
DEFAULT_LAYERS = {
-1 : (169, 166, 97), # Water
18 : (175, 214, 238), # Sand
36 : ( 34, 139, 34), # Grass
82 : ( 20, 100, 20) # Tree
}
def __init__(self, seed:int, size:int, height:float, scale:float) -> None:
self.height = height
self.scale = scale
self.seed = seed
self.size = size
self.elevation : 'np.ndarray|None' = None
self.color : 'np.ndarray|None' = None
def generate_elevation_map(self, mask_path:str) -> None:
""" Generate grayscale elevation map from 2D perlin noise
Args:
mask_path (str): Terrain grayscale mask path.
"""
np.random.seed(self.seed)
perlin_noise = perlin.generate_fractal_noise_2d(
shape = (512,)*2,
res = (2**3,)*2,
octaves = 5
)
# Set noise in [0;1]
p_min = np.min(perlin_noise)
p_max = np.max(perlin_noise)
perlin_noise = (perlin_noise - p_min) / (p_max - p_min)
if self.size != 512:
perlin_noise = cv.resize(perlin_noise, (self.size,)*2, interpolation=cv.INTER_CUBIC)
# Open and rescale mask
mask = cv.imread(mask_path, cv.IMREAD_GRAYSCALE)
mask = cv.resize(mask, (self.size,)*2, interpolation=cv.INTER_CUBIC)
OCEAN_HEIGHT = 30
elevation_map = perlin_noise * mask
elevation_map = np.clip(elevation_map, OCEAN_HEIGHT, 255)
elevation_map = (elevation_map-OCEAN_HEIGHT) / (255-OCEAN_HEIGHT) * 255
self.elevation = elevation_map.astype(dtype=np.uint8)
def generate_color_map(self, layers:'dict|None'=None) -> None:
""" Generate RGB colored map based on elevation map.
Args:
layers (dict): Layer dictionary where the key is the height and value the color in BGR.
"""
if layers is None:
layers = self.DEFAULT_LAYERS
color_map = np.full((self.size, self.size, 3), layers[-1], dtype=np.uint8)
for height, color in list(layers.items())[1:]:
mask = self.elevation > height
color_map[mask] = color
self.color = color_map
@staticmethod
def generate(seed:int, size:int, height:float, scale:float, mask_path:str=DEFAULT_MASK) -> 'DigitalMap':
dem = DigitalMap(seed, size, height, scale)
dem.generate_elevation_map(mask_path)
dem.generate_color_map()
return dem

View file

@ -0,0 +1,36 @@
import numpy as np
import cv2 as cv
from multiprocessing import Pool, cpu_count
from ..terrain import Terrain
class Localizator:
def __init__(self, terrain:Terrain, skyline:np.ndarray) -> None:
self.terrain = terrain
self.skyline = skyline
def _compute_score_mp(self, func:callable, args:any, print_progress:bool=True):
if print_progress:
import rich.progress as rp
progress_bar = rp.Progress(
*rp.Progress.get_default_columns(),
rp.TimeElapsedColumn(),
rp.MofNCompleteColumn()
)
task = progress_bar.add_task('Compute heatmap', total=len(args))
progress_bar.start()
score_map = (self.terrain.dem.elevation != 0).astype(np.float16) * -1
with Pool(processes=cpu_count()) as pool:
results = pool.imap(func, args)
for x, y, score in results:
score_map[y, x] = np.clip(score, -65000, 65000)
progress_bar.advance(task, 1)
if print_progress:
progress_bar.stop()
return score_map
def generate_scoremap(self):
raise NotImplementedError()

View file

@ -3,24 +3,33 @@ import math
from scipy.fft import fft, ifft from scipy.fft import fft, ifft
from scipy.signal import gaussian from scipy.signal import gaussian
g = gaussian(256, std=2.25,sym=True) from ..terrain import Terrain
G = fft(g) from .localizator import Localizator
def compute_score(F : np.ndarray, h : np.ndarray) -> float: def compute_score(args) -> float:
""" Compute the correlation score of the two skyline based on a MOSSE correlation filter. x, y, F, H, G = args
Args:
F (np.ndarray): Real skyline in fourier domain.
h (np.ndarray): Simulated skyline in time domain.
Returns:
float: Correlation score, higher is the score higher the correlation is.
"""
H = fft(h)
r = abs(ifft(H*G/F)) r = abs(ifft(H*G/F))
shift = np.argmax(r) shift = np.argmax(r)
peak = r[shift] peak = r[shift]
r[shift-5:shift+5] = 0 r[shift-5:shift+5] = 0
score = peak/math.sqrt(np.sum(np.power(r,2))) score = peak/math.sqrt(np.sum(np.power(r,2)))
return math.log(score) return x, y, score
class MosseCorrelation(Localizator):
def __init__(self, terrain: Terrain, skyline: np.ndarray) -> None:
super().__init__(terrain, skyline)
self.F = fft(skyline)
self.G = fft(gaussian(skyline.shape[0], std=2.25,sym=True))
def generate_scoremap(self, ray_dist:float=1000.0):
ray_count = self.F.shape[0]
args = [(
x,
y,
self.F,
fft(self.terrain.compute_skyline(x,y,ray_count,ray_dist)),
self.G
) for x in range(self.terrain.dem.size) for y in range(self.terrain.dem.size) if self.terrain.dem.elevation[y,x] == 0]
scoremap = self._compute_score_mp(compute_score, args)
scoremap = (scoremap-np.min(scoremap)) / (np.max(scoremap)-np.min(scoremap))
return scoremap

View file

@ -1,22 +1,8 @@
#!/usr/bin/env python
"""
Copyright (C) 2024 University of South Brittany, Lab-STICC UMR 6285 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import numpy as np import numpy as np
import cv2 as cv
import math
from .digital_map import DigitalMap
def bresenham_line(x:int, y:int, dist:float, angle:float, limit:int) -> list: def bresenham_line(x:int, y:int, dist:float, angle:float, limit:int) -> list:
"""Generate list of points of all grid cell where raycast hit. """Generate list of points of all grid cell where raycast hit.
@ -42,7 +28,7 @@ def bresenham_line(x:int, y:int, dist:float, angle:float, limit:int) -> list:
sy = 1 if y < y1 else -1 sy = 1 if y < y1 else -1
error = dx + dy error = dx + dy
while (x != x1 and y != y1) and (0 <= x < limit and 0 <= y < limit): while (x != x1 or y != y1) and (0 <= x < limit and 0 <= y < limit):
points.append((x,y)) points.append((x,y))
e2 = 2 * error e2 = 2 * error
@ -53,5 +39,75 @@ def bresenham_line(x:int, y:int, dist:float, angle:float, limit:int) -> list:
error += dx error += dx
y += sy y += sy
return points return points
def get_highest_angle(points:list, dem:DigitalMap) -> tuple:
m_height = 0
m_angle = 0
m_pos = points[-1]
for x, y in points[1:]:
height = dem.elevation[y,x]
if height > m_height:
m_height = height
dist = math.sqrt((points[0][0]-x)**2+(points[0][1]-y)**2) * dem.scale
angle = np.arctan((height * dem.height/255)/dist)
if angle >= m_angle:
m_angle = angle
m_pos = (x,y)
return np.rad2deg(m_angle), m_pos
def mp_compute_skyline(args:tuple) -> tuple:
dem, x, y, ray_count, ray_dist = args
return (x,y,compute_skyline(dem,x,y,ray_count,ray_dist))
def compute_skyline(dem:DigitalMap, x:int, y:int, ray_count:int, ray_dist:float) -> np.ndarray:
"""_summary_
Args:
x (int): X coordinate
y (int): Y coordinate
ray_count (int, optional): Raycast count. Defaults to 256.
ray_dist (float, optional): Raycast distance in meter. Defaults to 1000.0.
Returns:
np.ndarray: skyline array.
"""
ray_step = 2 * np.pi / (ray_count-1)
line = np.zeros((ray_count), dtype=np.float16)
for i in range(ray_count):
points = bresenham_line(x, y, ray_dist/dem.scale, ray_step*i, dem.size)
angle, _ = get_highest_angle(points, dem)
line[i] = angle
return line
def compute_fov(dem:DigitalMap, x:int, y:int, ray_count:int=256, ray_dist:float=1000.0) -> np.ndarray:
"""_summary_
Args:
x (int): X coordinate
y (int): Y coordinate
ray_count (int, optional): Raycast count. Defaults to 256.
ray_dist (float, optional): Raycast distance in meter. Defaults to 1000.0.
Returns:
np.ndarray: skyline array.
"""
ray_step = 2 * np.pi / (ray_count-1)
output = np.copy(dem.color)
output = cv.circle(output, (x,y), 3, (255,125,0),-1)
last_pos = None
for i in range(ray_count):
points = bresenham_line(x, y, ray_dist/dem.scale, ray_step*i, dem.size)
_, pos = get_highest_angle(points, dem)
if last_pos is not None:
output = cv.line(output,last_pos,pos,(0,0,255),2)
last_pos = pos
return output

View file

@ -1,98 +1,37 @@
#!/usr/bin/env python from multiprocessing import Pool, cpu_count
"""
Copyright (C) 2024 University of South Brittany, Lab-STICC UMR 6285 All Rights Reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
import math
import numpy as np import numpy as np
import cv2 as cv import cv2 as cv
import perlin_numpy as perlin from .digital_map import DigitalMap
from .raycast import bresenham_line from . import raycast
print(__file__)
class Terrain: class Terrain:
DEFAULT_MASK = os.path.dirname(__file__) + '/../../resources/mask.jpg' def __init__(self, dem:DigitalMap) -> None:
DEFAULT_RAYCOUNT = 256 self.dem = dem
DEFAULT_RAYDIST = 1000.0 self.skylines = {}
DEFAULT_LAYERS = {
-1 : (169, 166, 97), # Water
18 : (175, 214, 238), # Sand
36 : ( 34, 139, 34), # Grass
82 : ( 20, 100, 20) # Tree
}
def __init__(self, seed:int, size:int, height:float, scale:float) -> None: def compute_all_skylines(self, ray_count:int=256, ray_dist:float=1000.0, print_progress:bool=True):
self.seed = seed args = [(self.dem, x, y, ray_count, ray_dist) for x in range(self.dem.size) for y in range(self.dem.size) if self.dem.elevation[y,x] == 0]
self.size = size
self.height = height
self.scale = scale
self.elevation_map = None
self.color_map = None
self.skylines = None
def generate_elevation_map(self, mask_path:str) -> None: if print_progress:
""" Generate grayscale elevation map from 2D perlin noise import rich.progress as rp
progress_bar = rp.Progress(
Args: *rp.Progress.get_default_columns(),
mask_path (str): Terrain grayscale mask path. rp.TimeElapsedColumn(),
""" rp.MofNCompleteColumn()
np.random.seed(self.seed)
perlin_noise = perlin.generate_fractal_noise_2d(
shape = (512,)*2,
res = (2**3,)*2,
octaves = 5
) )
task = progress_bar.add_task('Compute skylines', total=len(args))
progress_bar.start()
# Set noise in [0;1] with Pool(processes=cpu_count()) as pool:
p_min = np.min(perlin_noise) results = pool.imap(raycast.mp_compute_skyline, args)
p_max = np.max(perlin_noise) for x, y, skyline in results:
perlin_noise = (perlin_noise - p_min) / (p_max - p_min) self.skylines[(x,y)] = skyline
progress_bar.advance(task, 1)
if self.size != 512: if print_progress:
perlin_noise = cv.resize(perlin_noise, (self.size,)*2, interpolation=cv.INTER_CUBIC) progress_bar.stop()
# Open and rescale mask def compute_skyline(self, x:int, y:int, ray_count:int=256, ray_dist:float=1000.0):
mask = cv.imread(mask_path, cv.IMREAD_GRAYSCALE)
mask = cv.resize(mask, (self.size,)*2, interpolation=cv.INTER_CUBIC)
elevation_map = perlin_noise * mask
self.elevation_map = elevation_map.astype(dtype=np.uint8)
def generate_colored_map(self, layers:'dict|None'=None) -> None:
""" Generate RGB colored map based on elevation map.
Args:
layers (dict): Layer dictionary where the key is the height and value the color in BGR.
"""
if layers is None:
layers = self.DEFAULT_LAYERS
h,w = self.elevation_map.shape
color_map = np.full((h,w,3), layers[-1], dtype=np.uint8)
for height, color in list(layers.items())[1:]:
mask = self.elevation_map > height
color_map[mask] = color
self.color_map = cv.cvtColor(color_map, cv.COLOR_BGR2RGB)
def compute_skyline(self, x:int, y:int, ray_count:int=256, ray_dist:float=1000.0) -> np.ndarray:
"""_summary_ """_summary_
Args: Args:
@ -104,30 +43,55 @@ class Terrain:
Returns: Returns:
np.ndarray: skyline array. np.ndarray: skyline array.
""" """
ray_step = 2 * np.pi / (ray_count-1) if (x,y) not in self.skylines:
line = np.zeros((ray_count), dtype=np.float16) self.skylines[(x,y)] = raycast.compute_skyline(self.dem, x, y, ray_count, ray_dist)
return self.skylines[(x,y)]
for i in range(ray_count): def compute_fov(self, x:int, y:int, ray_count:int=256, ray_dist:float=1000.0):
max_height = -10 """_summary_
max_v_angle = 0
points = bresenham_line(x, y, ray_dist/self.scale, ray_step*i, self.size)
for x0, y0 in points:
height = self.elevation_map[y0,x0]
if height > max_height:
max_height = height
dist = math.sqrt((x0-x)**2+(y0-y)**2) * self.scale
if dist != 0.0:
v_angle = np.arctan((height * self.height/255)/dist)
if v_angle > max_v_angle:
max_v_angle = v_angle
line[i] = np.rad2deg(max_v_angle) Args:
x (int): X coordinate
y (int): Y coordinate
ray_count (int, optional): Raycast count. Defaults to 256.
ray_dist (float, optional): Raycast distance in meter. Defaults to 1000.0.
return line Returns:
np.ndarray: skyline array.
"""
return raycast.compute_fov(self.dem, x, y, ray_count, ray_dist)
def save(self, file:str):
np.savez_compressed(f'{file}.npz',
size = self.dem.size,
seed = self.dem.seed,
height = self.dem.height,
scale = self.dem.scale,
elevation_map = self.dem.elevation,
skylines = self.skylines
)
@staticmethod
def load(file:str) -> 'Terrain':
if not file.endswith('.npz'):
file += ".npz"
save = np.load(file,allow_pickle=True)
dem = DigitalMap(
seed = save['seed'],
size = save['size'],
height = save['height'],
scale = save['scale']
)
dem.elevation = save.get('elevation_map')
dem.generate_color_map()
terrain = Terrain(dem)
terrain.skylines = save['skylines'].item()
return terrain
@staticmethod @staticmethod
def generate(seed:int=0x7B16, size:int=512, height:float=25.0, scale:float=1.0, def generate(seed:int=0x7B16, size:int=512, height:float=25.0, scale:float=1.0,
mask_path:str=DEFAULT_MASK) -> 'Terrain': mask_path:str=DigitalMap.DEFAULT_MASK) -> 'Terrain':
""" Generate a procedural terrain using seed and mask, and precompute all horizon lines """ Generate a procedural terrain using seed and mask, and precompute all horizon lines
Args: Args:
@ -140,8 +104,7 @@ class Terrain:
Returns: Returns:
Terrain: _description_ Terrain: _description_
""" """
terrain = Terrain(seed,size,height,scale) dem = DigitalMap.generate(seed, size, height, scale, mask_path)
terrain.generate_elevation_map(mask_path) terrain = Terrain(dem)
terrain.generate_colored_map()
return terrain return terrain

View file

@ -1,13 +0,0 @@
import matplotlib.pyplot as plt
import numpy as np
import pyskyline
terrain = pyskyline.Terrain.generate()
h = terrain.compute_skyline(330,330,360)
plt.figure()
plt.imshow(terrain.elevation_map)
plt.figure()
plt.plot(np.arange(0,360,360/h.shape[0]), h)
plt.show()