Commit ca6c4281 authored by rchia16

docs and clean up

parent cc41ef43
@@ -73,16 +73,62 @@ from config import WINDOW_SIZE, WINDOW_SHIFT, IMU_FS, DATA_DIR, BR_FS
IMU_COLS = ['acc_x', 'acc_y', 'acc_z', 'gyro_x', 'gyro_y', 'gyro_z']
def utc_to_local(utc_dt, tz=None):
"""Converts UTC datetime to specified timezone
Arguments
---------
utc_dt : datetime
input datetime to convert
tz : pytz.timezone
timezone
Returns
-------
datetime
"""
return utc_dt.replace(tzinfo=timezone.utc).astimezone(tz=tz)
def datetime_from_utc_to_local(utc_datetime):
"""Converts UTC datetime to local time
Arguments
---------
utc_datetime : datetime
input datetime to convert
Returns
-------
datetime
"""
now_timestamp = time.time()
offset = datetime.fromtimestamp(now_timestamp) - datetime.utcfromtimestamp(now_timestamp)
return utc_datetime + offset
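# Example (sketch): converting a UTC timestamp with the helpers above,
# assuming pytz is available. The timezone name is illustrative only.
#   import pytz
#   local_dt = utc_to_local(datetime.utcnow(),
#                           tz=pytz.timezone('Australia/Sydney'))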
# Load data
def load_bioharness_file(f:str, skiprows=0, skipfooter=0, **kwargs):
"""
Load and parse a BioHarness CSV file. Interpolates any empty time rows
Arguments
---------
f : str
filename
skiprows : int
num. of rows to skip from top
skipfooter : int
num. of rows to skip from bottom
**kwargs
Returns
-------
pandas.DataFrame
"""
fmt = "%d/%m/%Y %H:%M:%S.%f"
# Set keyword arguments for read_csv
method = partial(pd.read_csv, skipinitialspace=True,
skiprows=list(range(1, skiprows+1)),
skipfooter=skipfooter,
@@ -91,22 +137,38 @@ def load_bioharness_file(f:str, skiprows=0, skipfooter=0, **kwargs):
)
df = method(f)
if 'Time' not in df.columns.values:
# Set to datetime format
df['Time'] = pd.to_datetime(
df.rename(columns={'Date':'Day'})[
['Day','Month','Year']]) \
+ pd.to_timedelta(df['ms'], unit='ms')
# Interpolate empty time rows
if pd.isna(df['Time']).any():
df['Time'].interpolate(inplace=True)
df['Time'] = pd.to_datetime(df['Time'], format=fmt)
df['Time'] = df['Time'].dt.strftime(fmt)
return df
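# Example (sketch): loading a single BioHarness CSV export. The filename is
# hypothetical; skiprows/skipfooter trim any device preamble or footer rows.
#   bh_df = load_bioharness_file('data/subject01_Summary.csv', skiprows=2)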
def load_bioharness_files(f_list:list, skiprows=0, skipfooter=0, **kwargs):
"""
Loads and concatenates the output of load_bioharness_file for each file
Arguments
---------
f_list : list
list of bioharness files to read
skiprows : int
num. of rows to skip from top
skipfooter : int
num. of rows to skip from bottom
**kwargs
Returns
-------
pandas.DataFrame
"""
    df_list = []
    for f in f_list:
        # Forward the row-trimming arguments to each per-file load
        df_list.append(load_bioharness_file(f, skiprows=skiprows,
                                            skipfooter=skipfooter,
                                            **kwargs))
@@ -114,12 +176,38 @@ def load_bioharness_files(f_list:list, skiprows=0, skipfooter=0, **kwargs):
return df
def bioharness_datetime_to_seconds(val):
"""
Converts the bioharness datetime to seconds
Arguments
---------
val : str
bioharness time string
Returns
-------
float
"""
    fmt = "%d/%m/%Y %H:%M:%S.%f"
    dt = datetime.strptime(val, fmt)
    seconds = dt.timestamp()
    return seconds
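# Example (sketch): a BioHarness time string to unix seconds.
#   secs = bioharness_datetime_to_seconds('01/02/2023 12:30:45.500')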
def load_imu_file(imu_file:str):
"""
Load and retrieve the specified Tobii IMU compressed file
Arguments
---------
imu_file : str
Tobii Glasses IMU file to read in gzip compressed format
Returns
-------
pd.DataFrame, dict
"""
hdr_file = imu_file.replace('imudata.gz', 'recording.g3')
df = pd.read_json(imu_file, lines=True, compression='gzip')
@@ -128,6 +216,7 @@ def load_imu_file(imu_file:str):
if df.empty: return df, hdr
# Create DataFrame from data column
data_df = pd.DataFrame(df['data'].tolist())
df = pd.concat([df.drop('data', axis=1), data_df], axis=1)
@@ -137,18 +226,22 @@ def load_imu_file(imu_file:str):
start_time = datetime.fromisoformat(iso_tz[:-1])
start_time = utc_to_local(start_time, tz=tzinfo).astimezone(tzinfo)
# Drop NA rows
na_inds = df.loc[pd.isna(df['accelerometer']), :].index.values
df.drop(index=na_inds, inplace=True)
# Interpolate times to account for any empty rows
imu_times = df['timestamp'].values
df['timestamp_interp'] = imu_times
df['timestamp_interp'] = df['timestamp_interp'].interpolate()
imu_times = df['timestamp_interp'].values
# Convert to local time
    imu_datetimes = [start_time + timedelta(seconds=val)
                     for val in imu_times]
    # Use 'dt' rather than 'time' to avoid shadowing the time module
    imu_s = np.array([dt.timestamp() for dt in imu_datetimes])
df['sec'] = imu_s
    # Remove any rows beyond 3 hours of the start to account for erroneous data
time_check_thold = df['sec'].min() + 3*3600
mask = df['sec'] > time_check_thold
if np.any(mask):
@@ -157,6 +250,18 @@ def load_imu_file(imu_file:str):
return df, hdr
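# Example (sketch): reading a Tobii Glasses recording. The path is
# hypothetical; the imudata.gz file sits alongside its recording.g3 header.
#   imu_df, imu_hdr = load_imu_file('recordings/rec01/imudata.gz')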
def load_imu_files(f_list:list):
"""
Loads and concatenates the output of load_imu_file for each file
Arguments
---------
f_list : list
list of Tobii IMU files to read
Returns
-------
pandas.DataFrame, list
"""
data, hdr = [], []
tmp = []
for f in f_list:
@@ -168,14 +273,26 @@ def load_imu_files(f_list:list):
return data_df, hdr
def load_e4_file(e4_file:str):
"""Loads BVP data from the specified zip compressed e4 file and the start
time and sampling frequency as a dict.
Attributes
----------
e4_file : str
.zip e4 filename to load
Returns
-------
pandas.DataFrame, dict
"""
zip_file = ZipFile(e4_file)
    dfs = {csv_file.filename: pd.read_csv(zip_file.open(csv_file.filename),
                                          header=None)
           for csv_file in zip_file.infolist()
           if csv_file.filename.endswith('.csv')}
bvp = dfs["BVP.csv"]
# First row is the initial time of the session as unix time.
# Second row is the sample rate in Hz
t0 = bvp.iloc[0].values[0]
fs = bvp.iloc[1].values[0]
nsamples = len(bvp) - 2
@@ -198,6 +315,18 @@ def load_e4_file(e4_file:str):
return bvp, hdr
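# Example (sketch): loading an Empatica E4 session archive. The zip name is
# hypothetical; hdr carries the session start time and sample rate.
#   bvp_df, e4_hdr = load_e4_file('data/e4_session.zip')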
def load_e4_files(f_list:list):
"""
Loads and concatenates the output of load_e4_file for each file
Arguments
---------
f_list : list
list of e4 files to read
Returns
-------
pandas.DataFrame, list
"""
tmp = []
data = []
hdr = []
@@ -211,6 +340,20 @@ def load_e4_files(f_list:list):
# Synchronising data
def sync_to_ref(df0, df1):
"""
Synchronises both DataFrames to a common set of time bounds
Arguments
---------
df0 : pandas.DataFrame
data to sync
df1 : pandas.DataFrame
data to sync
Returns
-------
pandas.DataFrame, pandas.DataFrame
"""
dsync0 = DataSynchronizer()
dsync1 = DataSynchronizer()
@@ -224,80 +367,6 @@ def sync_to_ref(df0, df1):
return dsync0.sync_df(df0), dsync1.sync_df(df1)
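# Example (sketch): trimming two sensor streams to a common time range
# before comparison. The DataFrame names are illustrative only.
#   bvp_sync, br_sync = sync_to_ref(bvp_df, br_df)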
def pss_br_calculations(win, pss_df=None, br_df=None):
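    """
    Estimates respiration rate from the pressure signal over a single
    window and extracts the matching BioHarness summary values
    Arguments
    ---------
    win : numpy.ndarray
        indices of the window to process
    pss_df : pandas.DataFrame
        pressure signal data
    br_df : pandas.DataFrame
        BioHarness summary data
    Returns
    -------
    tuple
        median window time, pressure-based BR estimate, median BioHarness
        BR, subject, and condition
    """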
n_out = 5
if win[-1] == 0: return [None]*n_out
dsync = DataSynchronizer()
pss_fs = BR_FS
pss_col = [col for col in pss_df.columns.values if\
'breathing' in col.lower()][0]
pss_ms = pss_df['ms'].values
br_ms = br_df['ms'].values
t0, t1 = pss_ms[win][0], pss_ms[win][-1]
diff = pss_ms[win][1:] - pss_ms[win][:-1]
mask = np.abs(diff/1e3) > 60
diff_chk = np.any(mask)
if diff_chk: return [None]*n_out
# Get pressure estimate for window
pss_win = pss_df.iloc[win]
pss_data = pss_win[pss_col]
pss_filt = pressure_signal_processing(pss_data, fs=pss_fs)
xf, yf = do_pad_fft(pss_filt, fs=pss_fs)
pss_est = xf[yf.argmax()]*60
# Sync and get summary br output
dsync.set_bounds(br_ms, t0, t1)
br_win = dsync.sync_df(br_df)
br_out = np.median(br_win['BR'].values)
    # Get subject, condition, and window time
    sbj_out = pss_win['subject'].values[0]
    cond_out = pss_win['condition'].values[0]
    time_out = np.median(pss_win['sec'].values)
    return time_out, pss_est, br_out, sbj_out, cond_out
def get_pss_br_estimates(pss_df, br_df, window_size=12, window_shift=1):
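    """
    Runs pss_br_calculations over sliding windows of the pressure data
    using a multiprocessing pool
    Arguments
    ---------
    pss_df : pandas.DataFrame
        pressure signal data
    br_df : pandas.DataFrame
        BioHarness summary data
    window_size : int
        window length in seconds
    window_shift : int
        window shift in seconds
    Returns
    -------
    pandas.DataFrame
    """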
pss_fs = BR_FS
pss_ms = pss_df['sec'].values
br_ms = br_df['sec'].values
inds = np.arange(0, len(pss_ms))
vsw_out = vsw(inds, len(inds), sub_window_size=int(window_size*pss_fs),
stride_size=int(window_shift*pss_fs))
func = partial(pss_br_calculations, pss_df=pss_df, br_df=br_df)
with Pool(cpu_count()) as p:
tmp = p.map(func, vsw_out)
time_out, pss_est, br_out, sbj_out, cond_out = zip(*tmp)
time_array = np.array(time_out)
pss_est_array = np.array(pss_est)
br_out_array = np.array(br_out)
sbj_out_array = np.array(sbj_out)
cond_out_array = np.array(cond_out)
df = pd.DataFrame(
np.array(
[time_array, sbj_out_array, cond_out_array,
pss_est_array, br_out_array]
).T,
columns=['ms', 'subject', 'condition', 'pss', 'br'])
df.dropna(inplace=True)
return df
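# Example (sketch): windowed pressure-vs-BioHarness respiration estimates
# using the module defaults imported from config.
#   est_df = get_pss_br_estimates(pss_df, br_df,
#                                 window_size=WINDOW_SIZE,
#                                 window_shift=WINDOW_SHIFT)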
# Multiprocessing task for windowing dataframe
def imu_df_win_task(w_inds, df, i, cols):
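    """
    Windowing task for the IMU DataFrame. Rejects windows with large time
    gaps and processes the selected IMU columns
    Arguments
    ---------
    w_inds : numpy.ndarray
        indices of the window to process
    df : pandas.DataFrame
        IMU data to window
    i : int
        window id
    cols : list
        data columns to process; defaults to IMU_COLS
    Returns
    -------
    pandas.DataFrame, pandas.DataFrame
    """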
time = df['sec'].values
@@ -310,11 +379,6 @@ def imu_df_win_task(w_inds, df, i, cols):
if diff_chk:
return
if cols is None:
cols = IMU_COLS
@@ -343,51 +407,6 @@ def imu_df_win_task(w_inds, df, i, cols):
return x_out, y_out
def bvp_df_win_task(w_inds, df, i, cols):
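    """
    Windowing task for the BVP DataFrame. Standardises and filters the
    window's BVP data and pairs it with the median BR and PSS frequency
    Arguments
    ---------
    w_inds : numpy.ndarray
        indices of the window to process
    df : pandas.DataFrame
        BVP data to window
    i : int
        window id
    cols : list
        data columns to process; defaults to ['bvp']
    Returns
    -------
    pandas.DataFrame, pandas.DataFrame
    """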
time = df['sec'].values
fs = PPG_FS
if w_inds[-1] == 0: return
w_df = df.iloc[w_inds]
t0, t1 = time[w_inds][0], time[w_inds][-1]
diff = time[w_inds[1:]] - time[w_inds[0:-1]]
mask = np.abs(diff)>20
diff_chk = np.any(mask)
if diff_chk:
return
if cols is None:
cols = ['bvp']
data = w_df[cols].values
# DSP
sd_data = (data - np.mean(data, axis=0))/np.std(data, axis=0)
filt_data = bvp_signal_processing(sd_data.copy(), fs)
x_out = pd.DataFrame(filt_data,
columns=cols)
sm_out = w_df['BR'].values
ps_out = w_df['PSS'].values
x_vec_time = np.median(time[w_inds])
ps_freq = int(get_max_frequency(ps_out, fs=fs))
y_tmp = np.array([x_vec_time, np.nanmedian(sm_out), ps_freq])
x_out['sec'] = x_vec_time
x_out['id'] = i
y_out = pd.DataFrame([y_tmp], columns=['sec', 'br', 'pss'])
return x_out, y_out
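# Example (sketch): windowing a BVP DataFrame, assuming get_df_windows
# drives the *_win_task callables as seen elsewhere in this module.
#   x_df, y_df = get_df_windows(bvp_df, bvp_df_win_task,
#                               window_size=WINDOW_SIZE,
#                               window_shift=WINDOW_SHIFT,
#                               fs=PPG_FS)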
def df_win_task(w_inds, df, i, cols):
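    """
    Generic windowing task applied to each window of the DataFrame
    Arguments
    ---------
    w_inds : numpy.ndarray
        indices of the window to process
    df : pandas.DataFrame
        data to window
    i : int
        window id
    cols : list
        data columns to process
    """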
time = df['sec'].values
if w_inds[-1] == 0: return
@@ -588,7 +607,7 @@ def load_tsfresh(xsens_df, project_dir,
pd.DataFrame
"""
# raise NotImplementedError("To be implemented")
assert data_cols is not None, "invalid selection for data columns"
pkl_file = join(project_dir, 'tsfresh.pkl')
@@ -975,12 +994,6 @@ def imu_rr_dsp(subject,
# include standing or not
test_df_tmp = get_test_data(cal_df, activity_df, xsens_df, test_standing)
test_df = pd.concat([df for df in test_df_tmp['data']], axis=0)
acc_dsp_df, acc_y_dsp_df = get_df_windows(test_df, dsp_win_func,
window_size=window_size,
@@ -1003,7 +1016,8 @@ def imu_rr_dsp(subject,
plt.plot(gyr_y_dsp_df[lbl_str]); plt.plot(gyr_dsp_df['pred'])
plt.show()
# TODO
eval_handle = DSPEvalHandler(y_test.flatten(), preds.flatten(), subject,
pfh, None, overwrite=overwrite)
eval_handle.update_eval_history()
eval_handle.save_eval_history()
......