Source code for cbp.builder.wifi_simulator

import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from .migr_simulator import MigrSimulator

from .hmm_simulator import PotentialType


class WifiSimulator(MigrSimulator):
    """Simulate ensemble movement observed through sparse sensors.

    - place sensors (hotspots) at fixed or random positions of the grid
    - draw the sensor layout and the raw sensor data
    """

    def __init__(self, time_step, d_col, d_row, random_seed):
        """Initialise the parent simulator and start with no sensors.

        All four arguments are forwarded unchanged to ``MigrSimulator``.
        """
        # use random_seed 3, random_sensor 16, reproduce the experiment result
        super().__init__(time_step, d_col, d_row, random_seed)
        self.record.obser_num = 0
        self.hotspot = []

    def compile(self):
        """Run the parent ``compile`` and register the initial potential.

        The potential has one entry per state: 0.7 at index 0, 0.3 at index
        ``int(d_col / 2)`` and 0 everywhere else.
        """
        super().compile()
        init_potential = np.zeros(self.record.state_num)
        init_potential[0] = 0.7
        init_potential[int(self.d_col / 2)] = 0.3
        self.register_potential(PotentialType.INIT, init_potential)

    def register_hotspot(self, row, col):
        """Add one sensor at ``(row, col)`` and bump the observation count."""
        self.record.obser_num += 1
        self.hotspot.append((row, col))

    def _produce_sensor_potential(self):
        """Build and register the emission potential of every grid cell.

        For each cell (row-major order) the weight of sensor ``i`` is
        ``min(0.99, exp(-distance))`` where ``distance`` is the Euclidean
        distance between the cell and the sensor; the weights of a cell are
        then normalised to sum to 1.  The result is registered as a
        ``(state_num, obser_num)`` array under ``PotentialType.EMISSION``.

        Returns:
            ``int(d_col / 2)``.
        """
        rows = []
        for cur_row in range(self.d_row):
            for cur_col in range(self.d_col):
                weights = np.zeros(self.record.obser_num)
                for idx, sensor in enumerate(self.hotspot):
                    distance = np.linalg.norm(
                        [cur_row - sensor[0], cur_col - sensor[1]])
                    # Cap at 0.99 so a sensor sitting exactly on the cell
                    # never gets the full weight of 1.0.
                    weights[idx] = np.min([0.99, 1.0 * np.exp(-distance)])
                rows.append(weights / np.sum(weights))

        emission = np.array(rows).reshape(
            self.record.state_num, self.record.obser_num)
        self.register_potential(PotentialType.EMISSION, emission)
        return int(self.d_col / 2)

    def fixed_sensors(self, space_d=1):
        """Register sensors on a regular lattice with spacing ``space_d``.

        A sensor is placed at every ``space_d``-th row and column, starting
        from ``(0, 0)``.
        """
        for cur_row in range(0, self.d_row, space_d):
            for cur_col in range(0, self.d_col, space_d):
                self.register_hotspot(cur_row, cur_col)

    def random_sensor(self, num):
        """Register ``num`` sensors at random (non-integer) grid positions.

        Rows are drawn from ``self.rng.uniform(0, d_row - 1)`` and columns
        from ``self.rng.uniform(0, d_col - 1)``, row first.
        """
        for _ in range(num):
            row = self.rng.uniform(0, self.d_row - 1)
            col = self.rng.uniform(0, self.d_col - 1)
            self.register_hotspot(row, col)

    def draw_sensor(self):
        """Draw the wifi sensor positions on the grid.

        Every grid cell is drawn as a small square and every sensor as a
        larger cross, shifted by 0.2 so it does not hide the cell marker.
        Rows are plotted on the x-axis and columns on the y-axis.  The figure
        is saved to ``{self.path}/sensor.png`` and then closed.
        """
        sensor_row = [sensor[0] + 0.2 for sensor in self.hotspot]
        sensor_col = [sensor[1] + 0.2 for sensor in self.hotspot]

        marker_col, marker_row = np.meshgrid(
            np.arange(self.d_col), np.arange(self.d_row))
        pos_x = np.concatenate((marker_col.flatten(), sensor_col), axis=0)
        pos_y = np.concatenate((marker_row.flatten(), sensor_row), axis=0)
        x_y_size = np.concatenate(
            (np.ones(int(marker_col.size)) * 1.0,
             np.ones(len(sensor_col)) * 5.0))
        style = ['cell'] * marker_col.size + ['sensor'] * len(sensor_row)

        ax = sns.scatterplot(x=pos_y, y=pos_x, size=x_y_size,
                             hue=style, style=style,
                             markers={'cell': 's', 'sensor': 'X'})

        # The x-axis carries row coordinates and the y-axis column
        # coordinates, so the limits and ticks must be derived from d_row and
        # d_col respectively; otherwise non-square grids are clipped.
        row_ticks = np.arange(0, self.d_row, 2)
        col_ticks = np.arange(0, self.d_col, 2)
        ax.set_xlim([-1, self.d_row])
        ax.set_xticks(row_ticks)
        ax.set_xticklabels(row_ticks.tolist())
        ax.set_ylim([-1, self.d_col])
        ax.set_yticks(col_ticks)
        ax.set_yticklabels(col_ticks.tolist())

        handles, _ = ax.get_legend_handles_labels()
        ax.legend(handles, ["cell", "sensor"])
        plt.savefig(f"{self.path}/sensor.png", bbox_inches='tight')
        plt.close()

    def viz_sensor(self):
        """Draw the raw sensor data, one figure per time step.

        For step ``i`` the column ``self.record["sensor"][:, i]`` is counted
        per sensor index and drawn to ``{self.path}/observer_{i}_step.png``.
        """
        # One unit-wide bin per sensor index; identical for every step.
        bin_edges = np.arange(self.record.obser_num + 1)
        for i in range(self.record.time_step):
            locations = self.record["sensor"][:, i]
            bins, _ = np.histogram(locations, bin_edges)
            self.__draw_sensordata(
                bins, f"{self.path}/observer_{i}_step.png")

    def __draw_sensordata(self, bins, fig_name):
        """Plot per-sensor counts at the sensor positions.

        Args:
            bins: sequence of counts, indexed like ``self.hotspot``.
            fig_name: value passed through to the visualizer as ``fig_name``.

        Sensors with a zero count are skipped.
        """
        xx = []
        yy = []
        xy_size = []
        for sensor_idx, cnt in enumerate(bins):
            if cnt > 0:
                row, col = self.hotspot[sensor_idx][0], self.hotspot[sensor_idx][1]
                xx.append(row)
                yy.append(col)
                xy_size.append(int(cnt))
        self.visualizer.visualize_location(
            xx, yy, xy_size, fig_name=fig_name, xlabel='Observed')