# -*- coding: utf-8 -*-
#
# Copyright Université Rennes 1 / INSERM
# Contributor: Raphael Weber
#
# Under CeCILL license
# http://www.cecill.info
"""
Module defining :class:`.ViSiAnnoT`
"""
from PyQt5 import QtCore
import pyqtgraph as pg
import numpy as np
from threading import Thread
import os
from time import sleep
from shutil import rmtree
from ..tools import pyqt_overlayer
from ..tools import pyqtgraph_overlayer
from ..tools import datetime_converter
from ..tools import data_loader
from ..tools.video_loader import get_data_video
from ..tools.audio_loader import get_audio_wave_info, convert_key_to_channel_id
from .components.Signal import Signal
from .components.SignalWidget import SignalWidget
from .components.MenuBar import MenuBar
from .components.ProgressWidget import ProgressWidget
from .components.VideoWidget import VideoWidget
from .components.CustomTemporalRangeWidget import CustomTemporalRangeWidget
from .components.TruncTemporalRangeWidget import TruncTemporalRangeWidget
from .components.FromCursorTemporalRangeWidget import \
FromCursorTemporalRangeWidget
from .components.LogoWidgets import ZoomInWidget, ZoomOutWidget, FullVisiWidget
from .components.AnnotEventWidget import AnnotEventWidget
from .components.AnnotImageWidget import AnnotImageWidget
from ..configuration import check_configuration
[docs]class ViSiAnnoT():
def __init__(
self,
video_dict,
signal_dict,
annotevent_dict={},
annotimage_list=[],
threshold_dict={},
interval_dict={},
y_range_dict={},
poswid_dict={},
layout_mode=1,
trunc_duration=(0, 0),
flag_long_rec=False,
from_cursor_list=[],
zoom_factor=2,
nb_ticks=10,
flag_annot_overlap=False,
annot_dir="Annotations",
flag_pause_status=False,
max_points=5000,
time_zone="Europe/Paris",
flag_infinite_loop=True,
bg_color=(244, 244, 244),
bg_color_plot=(255, 255, 255),
font_name="Times",
font_size=12,
font_size_title=16,
font_color=(0, 0, 0),
current_fmt="%Y-%m-%dT%H:%M:%S.%s",
range_fmt="%H:%M:%S.%s",
ticks_fmt="%H:%M:%S.%s",
ticks_color=(93, 91, 89),
ticks_size=12,
ticks_offset=5,
y_ticks_width=30,
nb_table_annot=5,
height_widget_signal=150
):
"""
Class defining the visualization and annotation GUI for a set of
synchronized video(s) and signal(s).
The constructor takes as arguments dictionaries with the path to the
video files and signal files. It calls the method
:meth:`.set_all_data` in order to load data and store them in
attributes.
For a given video file, data are loaded in an instance of
**cv2.VideoCapture**. The set of video data is stored in
:attr:`.video_data_dict`. The widgets for plotting video are stored in
:attr:`.wid_vid_dict`.
For a given signal file, data are loaded in an instance of
:class:`.Signal`. The supported formats are txt, mat, h5 and wav. The
set of :class:`.Signal` instances is stored in
:attr:`.sig_dict`. The widgets for plotting signals are stored in
:attr:`.wid_sig_list`.
The reference frequency :attr:`.ViSiAnnoT.fps` is defined as the video
frequency. If there is no video to display, :attr:`.ViSiAnnoT.fps` is
defined as the frequency of the first signal to plot. The playback
speed (both video and signal temporal cursor) is at the reference
frequency.
The temporal range is defined by :attr:`.first_frame` and
:attr:`.last_frame` (sampled at :attr:`.ViSiAnnoT.fps`).
The signal widgets display the signal between those bounds. So when
zooming in/out, the temporal range is modified and then the display is
updated with the method :meth:`.update_signal_plot`.
The playback is managed with two separate threads:
- Reading next video frame - an instance of threading.Thread with the
method :meth:`.update_video_frame` as target,
- Updating plot - an instance of QtCore.QTimer connected to the method
:meth:`.update_plot`.
The current position in the video file (i.e. the current position of
the temporal cursor) is :attr:`.frame_id` (sampled at
:attr:`.ViSiAnnoT.fps`).
:param video_dict: video configuration, each item corresponds to one
camera. Key is the camera ID (string). Value is a configuration
list with 4 elements:
- (*str*) Path to the video file,
- (*str*) Delimiter to get beginning datetime in the video file
name,
- (*int*) Position of the beginning datetime in the video file
name, according to the delimiter,
- (*str*) Format of the beginning datetime in the video file name
(either ``"posix"`` or a format compliant with
``datetime.strptime()``).
:type video_dict: dict
:param signal_dict: signal configuration, each item corresponds to one
signal widget. Key is the widget ID (Y axis label, string). Value
is a nested list of signal configurations. Each element of the
nested list corresponds to one signal plot and is a configuration
list of 7 elements:
- (*str*) Path to the signal file, data must be stored in a 1D
array if regularly sampled, otherwise in a 2D array (where first
column is the timestamp in milliseconds and the second column the
signal value)
- (*str*) Delimiter to get beginning datetime in the signal file
name,
- (*int*) Position of the beginning datetime in the signal file
name, according to the delimiter,
- (*str*) Format of the beginning datetime in the signal file name
(either ``"posix"`` or a format compliant with
``datetime.strptime()``),
- (*str*) Key to access the data (in case of .mat or .h5 file),
- (*int* or *float* or *str*) Signal frequency, set it to ``0`` if
signal non regularly sampled, set it to ``-1`` if same frequency
as :attr:`.ViSiAnnoT.fps`, it may be a string with the path to
the frequency attribute in a .h5 file - in case of 2D data with
several value columns, then the column index must be specified,
e.g. ``"key - 1"`` or ``"key - colName"`` if there is an
attribute at ``key`` named ``columns`` with columns name being
comma-separated (first column is always the timestamps),
- (*dict*) Plot style, see
https://pyqtgraph.readthedocs.io/en/latest/graphicsItems/plotdataitem.html
for details, set it to ``None`` for default.
See :ref:`signal` for details and examples.
:type signal_dict: dict
:param annotevent_dict: events annotation configuration,
key is the label (string), value is the associated color (RGBA)
:type annotevent_dict: dict
:param annotimage_list: labels for image extraction
:type annotimage_list: list
:param threshold_dict: threshold configuration. Each item corresponds
to a signal widget on which to plot threshold(s). The key must be
the same as in ``signal_dict``. Value is a list of configuration
lists. This is a nested list because there can be several
thresholds plotted in the same widget. A configuration list has 2
elements:
- (*int* or *float*) Threshold value on Y axis,
- (*tuple* or *list*) Plot color (RGB).
:type threshold_dict: dict
:param interval_dict: interval configuration. Each item corresponds to
a signal widget on which to plot intervals. The key must be the
same as in ``signal_dict``. Value is a nested list of interval
configurations. Each element of the nested list corresponds to one
type of interval to be plotted on the same signal widget and is a
configuration list of 7 elements:
- (*str*) Path to the interval file, data can be stored as a 2D
array (where each line has 2 elements: start and stop frames) or
a 1D array (time series of 0 and 1),
- (*str*) Delimiter to get beginning datetime in the interval file
name,
- (*int*) Position of the beginning datetime in the interval file
name, according to the delimiter,
- (*str*) Format of the beginning datetime in the interval file
name (either ``"posix"`` or a format compliant with
``datetime.strptime()``),
- (*str*) Key to access the data (in case of .mat or .h5 file),
- (*int*) Signal frequency, set it to ``-1`` if same frequency as
:attr:`.ViSiAnnoT.fps`, it may be a string with the path to the
frequency attribute in a .h5 file,
- (*tuple* or *list*) Plot color (RGBA).
:type interval_dict: dict
:param y_range_dict: visible Y range for signal widgets, each item
corresponds to a signal widget. The key must be the same as in
``signal_dict``. Value is a list/tuple of length 2 with the min and
max values to display on the Y axis. The signal widgets that are
not specified in this dictionary have auto-range enabled for Y
axis.
:type y_range_dict: dict
:param poswid_dict: custom position of the widgets in the window to use
the positions defined by the layout mode (see input
``layout_mode``). Value is a tuple of length 2 ``(row, col)``
or 4 ``(row, col, rowspan, colspan)``. Key identifies the widget:
- ``"logo"``
- ``"select_trunc"``
- ``"select_manual"``
- ``"select_from_cursor"``
- ``"annot_event"``
- ``"annot_image"``
- ``"visi"``
- ``"zoomin"``
- ``"zoomout"``
- ``"progress"``
:type poswid_dict: dict
:param layout_mode: layout mode of the window for positioning the
widgets, one of the following:
- ``1`` (focus on video, works better with a big screen),
- ``2`` (focus on signal, suitable for a laptop screen),
- ``3`` (compact display with some features disabled).
:type layout_mode: int
:param trunc_duration: (tool for fast navigation) duration
``(min, sec)`` to be used for splitting video/file in the combo box
of temporal range selection. For example, for a video of 30
minutes, ``trunc_duration=(10, 0)`` will provide 3 temporal ranges
in the combo box: from 0 to 10 minutes, from 10 to 20 minutes and
from 20 to 30 minutes.
:type trunc_duration: list
:param flag_long_rec: specify if :class:`.ViSiAnnoT` is launched in the
context of :class:`.ViSiAnnoTLongRec` (long recording)
:type flag_long_rec: bool
:param from_cursor_list: (tool for fast navigation) list of durations
that are available in the combo box to select a temporal range
duration in order to display a new temporal range that will begin
at the current position of the temporal cursor. Each element is a
tuple of length 2 ``(min, sec)``. An example:
``[[0,30],[1,0],[2,0],[3,0],[4,0],[5,0]]``.
:type from_cursor_list: list
:param zoom_factor: zoom factor
:type zoom_factor: int
:param nb_ticks: number of temporal ticks on the X axis of the signals
widgets
:type nb_ticks: int
:param flag_annot_overlap: specify if overlap of events annotations is
enabled
:type flag_annot_overlap: bool
:param annot_dir: directory where to save annotations, automatically
created if it does not exist
:type annot_dir: str
:param flag_pause_status: specify if the video is paused when launching
:class:`.ViSiAnnoT`
:type flag_pause_status: bool
:param max_points: maximum number of points to plot for the signals
:type max_points: int
:param time_zone: time zone (compliant package pytz)
:type time_zone: str
:param flag_infinite_loop: specify if an infinite loop is set after
creating the window. Set it to ``False`` if several
:class:`.ViSiAnnoT` windows must be displayed simultaneousely, do
not forget to store each instance of :class:`.ViSiAnnoT` in a
variable and to set manually the infinite loop with
:func:`.infinite_loop_gui`
:type flag_infinite_loop: bool
:param bg_color: backgroud color of the GUI, RGB or HEX string
:type bg_color: tuple or str
:param bg_color_plot: background color of the signal plots, RGB or HEX
string
:type bg_color_plot: tuple or str
:param font_name: font used for the text in the GUI (must be available
in PyQt5)
:type font_name: str
:param font_size: font size of the text in the GUI
:type font_size: int
:param font_size_title: font size of the titles in the GUI (progress
bar and video widgets)
:type font_size_title: int
:param font_color: font color of the text in the GUI, RGB
:type font_color: tuple
:param current_fmt: datetime string format of the current temporal
position in progress bar, see keyword argument ``fmt`` of
:func:`.convert_datetime_to_string`
:type current_fmt: str
:param range_fmt: datetime string format of the temporal range
duration in progress bar, see keyword argument ``fmt`` of
:func:`.convert_datetime_to_string`
:type range_fmt: str
:param ticks_fmt: datetime string format of X axis ticks text, see
keyword argument ``fmt`` of :func:`.convert_datetime_to_string`
:type ticks_fmt: str
:param ticks_color: color of the ticks in the signal plots, RGB or HEX
string
:type ticks_color: tuple or str
:param ticks_size: size of the ticks values in the signal plots
:type ticks_size: int
:param ticks_offset: offset between the ticks and associated values in
the signal plots
:type ticks_offset: int
:param y_ticks_width: horizontal space in pixels for the text of Y axis
ticks in signal widgets
:type y_ticks_width: int
:param nb_table_annot: maximum number of labels in a row in the
widgets for events annotation and image annotation
:type nb_table_annot: int
:param height_widget_signal: minimum height in pixel of the signal
widgets
:type height_widget_signal: int
"""
# check input dictionaries are empty
if not any(video_dict) and not any(signal_dict):
raise Exception("Input dictionaries are empty")
# ******************************************************************* #
# *********************** miscellaneous ***************************** #
# ******************************************************************* #
#: (*str*) Datetime string format of the text of X axis ticks
self.ticks_fmt = ticks_fmt
#: (*int*) Number of temporal ticks on the X axis of the
#: signals plots
self.nb_ticks = nb_ticks
#: (*str*) Time zone (as in package pytz)
self.time_zone = time_zone
#: (*int*) Maximum number of points to plot for the signals
self.max_points = max_points
#: (*list*) Default plot styles for signals on a single widget
#: (length 10)
self.plot_style_list = [
{'pen': {'color': 'b', 'width': 1}},
{'pen': {'color': 'r', 'width': 1}},
{'pen': {'color': 'g', 'width': 1}},
{'pen': {'color': 'm', 'width': 1}},
{'pen': {'color': 'c', 'width': 1}},
{'pen': {'color': 'y', 'width': 1}},
{'pen': {'color': 'k', 'width': 1}},
{'pen': {'color': '#FFCCFF', 'width': 1}},
{'pen': {'color': '#00CCCC', 'width': 1}},
{'pen': {'color': '#4C9900', 'width': 1}}
]
#: (*str*) Directory where the events annotations and extracted images
#: are saved
self.annot_dir = annot_dir
# ******************************************************************* #
# ************************ long recordings ************************** #
# ******************************************************************* #
#: (*bool*) Specify if :class:`.ViSiAnnoT` is launched in the context
#: of :class:`.ViSiAnnoTLongRec`
self.flag_long_rec = flag_long_rec
#: (*int*) ID of the current video/signal file in case of long
#: recordings
#:
#: If :attr:`.flag_long_rec` is ``False``, then :attr:`.ite_file` is
#: always equal to 0.
self.ite_file = 0
#: (*int*) Number of files for reference modality in case of long
#: recording
#:
#: If :attr:`.flag_long_rec` is ``False``, then :attr:`.nb_files` is
#: set to 1.
self.nb_files = 1
# ******************************************************************* #
# *************************** data ********************************** #
# ******************************************************************* #
# initialize attributes that are set in the method set_all_data
#: (*dict*) Video data, each item corresponds to one camera
#:
#: Key is the camera ID (same keys as ``video_dict``, positional
#: argument of the constructor of :class:`.ViSiAnnoT`).
#:
#: Value is an instance of **cv2.VideoCapture** containing the video
#: data
self.video_data_dict = {}
#: (*dict*) Signal data, each item corresponds to a signal widget
#:
#: Key is the data type (same keys as ``signal_dict``, positional
#: argument of the constructor), used as label of the Y axis of the
#: corresponding widget.
#:
#: Value is a list of instances of :class:`.Signal` to plot on the
#: corresponding widget
self.sig_dict = None
#: (*dict*) Intervals to plot on signals, each item corresponds to one
#: signal widget
#:
#: Key is the data type of the signal widget on which to plot (same as
#: in positional argument ``signal_dict`` of the constructor of
#: :class:`.ViSiAnnoT`)
#:
#: Value is a list of lists, so that several intervals files can be
#: plotted on the same signal widget. Each sub-list has 3 elements:
#:
#: - (*numpy array*) Intervals data, shape :math:`(n_{intervals}, 2)`
#: - (*float*) Frequency (``0`` if timestamps, ``-1`` if same as
#: signal)
#: - (*tuple*) Plot color (RGBA)
self.interval_dict = {}
# check if not long recording => create attribute of reference
# frequency (otherwise already set in ViSiAnnoTLongRec)
if not self.flag_long_rec:
#: (*int*) Frequency of the video (or the first signal if there is
#: no video), it is the reference frequency
self.fps = None
#: (*int*) Number of frames in the video (or the first signal if there
#: is no video)
self.nframes = None
#: (*datetime.datetime*) Beginning datetime of the video (or the first
#: signal if there is no video)
self.beginning_datetime = None
# set data
self.set_all_data(video_dict, signal_dict, interval_dict)
#: (*dict*) Thresholds to plot on signals widgets, each item
#: corresponds to one signal widget
#:
#: Key is the data type of the signal widget on which to plot (same as
#: in positional argument ``signal_dict`` of the constructor of
#: :class:`.ViSiAnnoT`)
#:
#: Value is a list of length 2:
#:
#: - (*float*) Value of the threshold on Y axis
#: - (*tuple*) Color to plot (RGB), it can also be a string with HEX
#: color
self.threshold_dict = threshold_dict
# ******************************************************************* #
# ***************************** zoom ******************************** #
# ******************************************************************* #
#: (*int*) Start position (frame number) for custom manual zoom (set to
#: -1 if not defined)
self.zoom_pos_1 = -1
#: (*int*) End position (frame number) for custom manual zoom (set to
#: -1 if not defined)
self.zoom_pos_2 = -1
#: (*list*) Instances of pyqtgraph.LinearRegionItem with all the grey
#: regions for custom manual zoom
self.region_zoom_list = []
#: (*list*) Instances of pyqtgraph.TextItem with the duration of the
#: custom manual zoom
#:
#: Same length and order as :attr:`.ViSiAnnoT.wid_sig_list`, so that
#: one element corresponds to one signal widget
self.region_zoom_text_item_list = []
# ******************************************************************* #
# ****************************** time ******************************* #
# ******************************************************************* #
#: (*bool*) Specify if the video is paused
self.flag_pause_status = flag_pause_status
#: (*int*) Index of the current frame
self.frame_id = 0
#: (*int*) First frame that is displayed in the signal plots
self.first_frame = 0
#: (*int*) Last frame that is displayed in the signal plots
#:
#: Actually, the last frame that is displayed is
#: ``last_frame` - 1``, because of zero-indexation.
self.last_frame = self.nframes
#: (*bool*) Specify if the window is running
self.flag_processing = True
# ******************************************************************* #
# *********************** layout definition ************************* #
# ******************************************************************* #
# define window organization if none provided
if not any(poswid_dict):
nb_video = len(video_dict)
# check layout mode
if layout_mode == 1:
for ite_video, video_id in enumerate(video_dict.keys()):
poswid_dict[video_id] = (0, ite_video, 5, 1)
poswid_dict['select_trunc'] = (0, nb_video, 1, 2)
poswid_dict['select_manual'] = (1, nb_video, 1, 3)
poswid_dict['select_from_cursor'] = (0, nb_video + 2)
poswid_dict['annot_event'] = (2, nb_video, 1, 3)
poswid_dict['annot_image'] = (3, nb_video, 1, 3)
poswid_dict['visi'] = (4, nb_video)
poswid_dict['zoomin'] = (4, nb_video + 1)
poswid_dict['zoomout'] = (4, nb_video + 2)
poswid_dict['progress'] = (5, 0, 1, nb_video + 3)
elif layout_mode == 2:
for ite_video, video_id in enumerate(video_dict.keys()):
poswid_dict[video_id] = (0, ite_video, 4, 1)
poswid_dict['select_trunc'] = (1, nb_video, 1, 2)
poswid_dict['select_manual'] = (2, nb_video, 1, 3)
poswid_dict['select_from_cursor'] = (1, nb_video + 2)
poswid_dict['annot_event'] = (0, nb_video + 3, 4, 1)
poswid_dict['annot_image'] = (0, nb_video, 1, 3)
poswid_dict['visi'] = (3, nb_video)
poswid_dict['zoomin'] = (3, nb_video + 1)
poswid_dict['zoomout'] = (3, nb_video + 2)
poswid_dict['progress'] = (4, 0, 1, nb_video + 4)
elif layout_mode == 3:
for ite_video, video_id in enumerate(video_dict.keys()):
poswid_dict[video_id] = (0, ite_video, 2, 1)
poswid_dict['annot_event'] = (0, nb_video, 2, 1)
poswid_dict['annot_image'] = (0, nb_video + 1, 1, 3)
poswid_dict['visi'] = (1, nb_video + 1)
poswid_dict['zoomin'] = (1, nb_video + 2)
poswid_dict['zoomout'] = (1, nb_video + 3)
poswid_dict['progress'] = (2, 0, 1, nb_video + 4)
else:
raise Exception(
"No layout configuration given - got mode %d, "
"must be 1, 2 or 3" % layout_mode
)
# ******************************************************************* #
# ********************* display initialization ********************** #
# ******************************************************************* #
# ************** create GUI application and set font **************** #
#: (*QtWidgets.QApplication*) GUI initializer
self.app = pyqtgraph_overlayer.initialize_gui_and_bg_color(
color=bg_color_plot
)
# set style sheet
pyqt_overlayer.set_style_sheet(
self.app, font_name, font_size, font_color, [
"QGroupBox", "QComboBox", "QPushButton", "QRadioButton",
"QLabel", "QCheckBox", "QDateTimeEdit", "QTimeEdit"
]
)
# get default font for titles in pyqtgraph
font_default_title = {
"color": font_color, "size": "%dpt" % font_size_title
}
font_default_axis_label = {
"color": font_color, "font-size": "%dpt" % font_size_title
}
# ****************** create window and layout *********************** #
#: (*QtWidgets.QWidget*) Window container
self.win = None
#: (*QtWidgets.QGridLayout*) layout filling the window
self.lay = None
# create window
self.win, self.lay = pyqt_overlayer.create_window(
title="ViSiAnnoT", bg_color=bg_color
)
# listen to the callback method (keyboard interaction)
self.win.keyPressEvent = self.key_press
self.win.keyReleaseEvent = self.key_release
# ************************** menu *********************************** #
#: (:class:`.MenuBar`) Menu bar item, instance of a sub-class of
#: **QtWidgets.QMenuBar**, by default it is hidden, see
#: :meth:`.key_release` for the keyword shortcut for displaying it
self.menu_bar = MenuBar(self.win, self.lay)
# *************** widget for truncated temporal range *************** #
if len(self.sig_dict) > 0 and "select_trunc" in poswid_dict.keys():
# check trunc duration
if trunc_duration[0] == 0 and trunc_duration[1] == 0:
print(
"Duration of truncated temporal range is 0 => widget not "
"created"
)
self.wid_trunc = None
else:
#: (:class:`.TruncTemporalRangeWidget`) Widget for selecting a
#: truncated temporal range
self.wid_trunc = TruncTemporalRangeWidget(
self, poswid_dict['select_trunc'], trunc_duration
)
else:
self.wid_trunc = None
# **************** widget for custom temporal range ***************** #
if len(self.sig_dict) > 0 and "select_manual" in poswid_dict.keys():
#: (:class:`.CustomTemporalRangeWidget`) Widget for defining a
#: custom temporal range
self.wid_time_edit = CustomTemporalRangeWidget(
self, poswid_dict["select_manual"]
)
else:
self.wid_time_edit = None
# *************** widget for temporal re-scaling ******************** #
if len(self.sig_dict) > 0 and \
"select_from_cursor" in poswid_dict.keys() and \
len(from_cursor_list) > 0:
#: (:class:`.FromCursorTemporalRangeWidget`) Widget for selecting
#: a duration of temporal range to be started at the current frame
self.wid_from_cursor = FromCursorTemporalRangeWidget(
self, poswid_dict["select_from_cursor"], from_cursor_list
)
else:
self.wid_from_cursor = None
# ********************** progress bar ******************************* #
if "progress" in poswid_dict.keys():
#: (:class:`.ProgressWidget`) Widget containing the progress bar
self.wid_progress = ProgressWidget(
self, poswid_dict['progress'], title_style=font_default_title,
ticks_color=ticks_color, ticks_size=ticks_size,
ticks_offset=ticks_offset, nb_ticks=self.nb_ticks,
current_fmt=current_fmt, range_fmt=range_fmt,
ticks_fmt=self.ticks_fmt
)
else:
raise Exception(
"No widget position given for the progress bar => "
"add key 'progress' to positional argument poswid_dict"
)
# ************************ video widgets **************************** #
#: (*dict*) Video widgets, each item corresponds to one camera
#:
#: Key is the camera ID (same keys as the positional argument
#: ``video_dict`` of the constructor of :class:`.ViSiAnnoT`).
#:
#: Value is an instance of :class:`.VideoWidget` where the
#: corresponding video is displayed.
self.wid_vid_dict = {}
# loop on cameras
for video_id, (video_path, _, _, _) in video_dict.items():
# check if widget position exists
if video_id in poswid_dict.keys():
# create widget
self.wid_vid_dict[video_id] = VideoWidget(
self.lay, poswid_dict[video_id], video_path,
**font_default_title
)
# initialize image
self.wid_vid_dict[video_id].setAndDisplayImage(
self.video_data_dict[video_id], self.frame_id
)
# *********************** signal widgets **************************** #
#: (*list*) Signal widgets, each element is an instance of
#: :class:`.SignalWidget` (same order as :attr:`.sig_dict`)
self.wid_sig_list = []
if len(self.sig_dict) > 0:
# create signal widgets and initialize signal plots
self.init_signal_plot(
poswid_dict['progress'], y_range_dict=y_range_dict,
left_label_style=font_default_axis_label,
ticks_color=ticks_color, ticks_size=ticks_size, ticks_offset=2,
y_ticks_width=y_ticks_width, wid_height=height_widget_signal
)
# *********************** zoom widgets ****************************** #
if len(self.sig_dict) > 0 and "visi" in poswid_dict.keys():
#: (:class:`.FullVisiWidget`) Widget for zooming out to the full
#: temporal range
self.wid_visi = FullVisiWidget(
self, poswid_dict["visi"], "visibility"
)
else:
self.wid_visi = None
if len(self.sig_dict) > 0 and "zoomin" in poswid_dict.keys():
#: (:class:`.ZoomInWidget`) Widget for zooming in
self.wid_zoomin = ZoomInWidget(
self, poswid_dict["zoomin"], "zoomin", zoom_factor=zoom_factor
)
else:
self.wid_zoomin = None
if len(self.sig_dict) > 0 and "zoomout" in poswid_dict.keys():
#: (:class:`.ZoomOutWidget`) Widget for zooming out
self.wid_zoomout = ZoomOutWidget(
self, poswid_dict["zoomout"], "zoomout",
zoom_factor=zoom_factor
)
else:
self.wid_zoomout = None
# ******************* events annotation widget ********************** #
if len(annotevent_dict) > 0:
if "annot_event" in poswid_dict.keys():
#: (:class:`.AnnotEventWidget`) Widget for events annotation
self.wid_annotevent = AnnotEventWidget(
self, poswid_dict["annot_event"], annotevent_dict,
annot_dir, flag_annot_overlap=flag_annot_overlap,
nb_table=nb_table_annot
)
else:
raise Exception(
"No widget position given for the events annotation => "
"add key 'annot_event' to positinal argument poswid_dict"
)
else:
self.wid_annotevent = None
# ******************* image annotation widget *********************** #
if len(annotimage_list) > 0:
if "annot_image" in poswid_dict.keys():
# check layout mode
if layout_mode == 3:
flag_horizontal = False
else:
flag_horizontal = True
#: (:class:`.AnnotImageWidget`) Widget for image extraction
self.wid_annotimage = AnnotImageWidget(
self, poswid_dict["annot_image"], annotimage_list,
annot_dir, nb_table=nb_table_annot,
flag_horizontal=flag_horizontal
)
else:
raise Exception(
"No widget position given for the image annotation => "
"add key 'annot_image' to positinal argument poswid_dict"
)
else:
self.wid_annotimage = None
# ******************************************************************* #
# ************** thread for getting videos images ******************* #
# ******************************************************************* #
if any(self.video_data_dict):
#: (*threading.Thread*) Thread for getting video frames, connected
#: to the method :meth:`.ViSiAnnoT.update_video_frame`
self.update_frame_thread = Thread(target=self.update_video_frame)
self.update_frame_thread.start()
# ******************************************************************* #
# ************************ plot timer ******************************* #
# ******************************************************************* #
#: (*QtCore.QTimer*) Thread for updating the current frame position,
#: connected to the method :meth:`.ViSiAnnoT.update_plot`
self.timer = QtCore.QTimer()
self.timer.timeout.connect(self.update_plot)
self.timer.start(int(1000 / self.fps))
# ******************************************************************* #
# *********************** infinite loop ***************************** #
# ******************************************************************* #
# this does not apply in case of long recording, because there are
# other stuff to do after calling this constructor
if not self.flag_long_rec and flag_infinite_loop:
pyqt_overlayer.infinite_loop_gui(self.app)
# close streams, delete temporary folders
self.stop_processing()
# *********************************************************************** #
# Group: Methods for conversion between milliseconds and frame number
# *********************************************************************** #
[docs] def get_frame_id_in_ms(self, frame_id):
"""
Converts a frame number to milliseconds
:param frame_id: frame number sampled at the reference frequency
:attr:`.ViSiAnnoT.fps`
:type frame_id: int
:returns: frame number in milliseconds
:rtype: float
"""
return 1000 * frame_id / self.fps
[docs] def convert_ms_to_frame_ref(self, frame_ms):
"""
Converts milliseconds to frame number sampled at the reference
frequency :attr:`.ViSiAnnoT.fps`
:param frame_ms: frame number in milliseconds
:type frame_ms: float
:returns: frame number sampled at the reference frequency
:attr:`.ViSiAnnoT.fps`
:rtype: int
"""
return int(frame_ms * self.fps / 1000)
[docs] def get_current_range_in_ms(self):
"""
Converts the current temporal range defined by
:attr:`.ViSiAnnoT.first_frame` and :attr:`.ViSiAnnoT.last_frame` to
milliseconds
:returns:
- **first_frame_ms** (*int*) -- first frame of the current temporal
range in milliseconds
- **last_frame_ms** (*int*) -- last frame of the current temporal
range in milliseconds
"""
return self.get_frame_id_in_ms(self.first_frame), \
self.get_frame_id_in_ms(self.last_frame)
# *********************************************************************** #
# End group
# *********************************************************************** #
# *********************************************************************** #
# Group: Methods for displaying video, signals and progress bar
# *********************************************************************** #
[docs] def init_signal_plot(
self, progbar_wid_pos, y_range_dict={}, **kwargs
):
"""
Creates the signal widgets and initializes the signal plots
The widgets are automatically positioned below the progress bar.
It sets the attribute :attr:`.ViSiAnnoT.wid_sig_list`.
Make sure the attributes :attr:`.ViSiAnnoT.lay`,
:attr:`.ViSiAnnoT.sig_dict`, :attr:`.ViSiAnnoT.threshold_dict` and
:attr:`.ViSiAnnoT.interval_dict` are defined before calling this
method.
:param progbar_wid_pos: position of the progress bar widget, length 2
``(row, col)`` or 4 ``(row, col, rowspan, colspan)``
:type progbar_wid_pos: tuple of list
:param y_range_dict: visible Y range for signal widgets, see positional
argument ``y_range_dict`` of :class:`.ViSiAnnoT` constructor
:type y_range_dict: dict
:param kwargs: keyword arguments of the constructor of
:class:`.SignalWidget`, except ``y_range`` and ``left_label``
"""
# get current range in milliseconds
first_frame_ms, last_frame_ms = self.get_current_range_in_ms()
# convert progress bar widget position to a list
pos_sig = list(progbar_wid_pos)
# signal widget position is defined relatively to progress bar
# widget position
pos_sig[0] += 1
# create scroll area
scroll_lay, _ = pyqt_overlayer.add_scroll_area(
self.lay, pos_sig, flag_ignore_wheel_event=True
)
# loop on signals
for ite_sig, (signal_id, sig_list) in \
enumerate(self.sig_dict.items()):
# get Y range
if signal_id in y_range_dict.keys():
y_range = y_range_dict[signal_id]
# check number of elements in yrange configuration
check_configuration(signal_id, y_range, "YRange")
else:
y_range = []
# get list of intervals to plot in the signal widget
if signal_id in self.interval_dict.keys():
interval_list = self.interval_dict[signal_id]
else:
interval_list = []
# get list of thresholds to plot in the signal widget
if signal_id in self.threshold_dict.keys():
threshold_list = self.threshold_dict[signal_id]
# check number of elements in threshold configuration
check_configuration(signal_id, threshold_list, "Threshold")
else:
threshold_list = []
# create widget
wid = SignalWidget(
self, pos_sig, y_range=y_range, left_label=signal_id, **kwargs
)
# create plot items in the signal widget
wid.createPlotItems(
first_frame_ms, last_frame_ms, sig_list, interval_list,
threshold_list
)
# set temporal ticks and X axis range
pyqtgraph_overlayer.set_temporal_ticks(
wid, self.nb_ticks, (first_frame_ms, last_frame_ms),
self.beginning_datetime, fmt=self.ticks_fmt
)
# add widget to scroll area
scroll_lay.addWidget(wid)
# append widget to list of widgets
self.wid_sig_list.append(wid)
# reconnect to key_press event callback, so that keyPress events of
# scroll are ignored
wid.keyPressEvent = self.key_press
# get position of next signal widget
pos_sig[0] += 1
[docs] def update_video_frame(self):
"""
Reads the video stream (launched in a thread)
Called by the thread :attr:`.update_frame_thread`.
It updates the attribute :attr:`.wid_vid_dict` with the image at
the current frame for each camera.
"""
# check if the process still goes on
while self.flag_processing:
if not self.flag_pause_status:
# get image at the current frame for each camera
for video_id, data_video in self.video_data_dict.items():
self.wid_vid_dict[video_id].setImage(
data_video, self.frame_id
)
else:
sleep(0.001)
[docs] def stop_processing(self):
"""
Closes streams (elements of :attr:`.video_data_dict`) and
deletes temporary signal folder
It sets the value of :attr:`.flag_processing` to ``False`` so
that the thread :attr:`.update_frame_thread` is stopped.
"""
self.timer.stop()
self.app.quit()
self.flag_processing = False
# check if annotation directory exists
if os.path.isdir(self.annot_dir):
print("delete empty annotation folders/files if necessary")
# get list of files/folders in the annotation directory
annot_path_list = os.listdir(self.annot_dir)
# loop on annotation files/folders
for annot_file_name in annot_path_list:
# get path of file/folder
annot_path = "%s/%s" % (self.annot_dir, annot_file_name)
# split extension
name, ext = os.path.splitext(annot_file_name)
# check if directory of image annotation
if os.path.isdir(annot_path) and \
self.wid_annotimage is not None:
if annot_file_name in self.wid_annotimage.label_list:
if os.listdir(annot_path) == []:
rmtree(annot_path)
# check if file of events annotation
elif ext == ".txt":
# check if empty file
if os.path.getsize(annot_path) == 0:
# remove empty file
os.remove(annot_path)
# check if events annotation
if self.wid_annotevent is not None:
# update the list of files/folders in the annotation directory
annot_path_list = os.listdir(self.annot_dir)
# get file name of events annotation of protected label
protected_name = "%s_%s-datetime.txt" % (
self.wid_annotevent.file_name_base,
self.wid_annotevent.protected_label
)
# check if empty annotation directory (or only filled with
# events annotation of protected label)
if len(annot_path_list) == 0 or len(annot_path_list) == 1 and \
annot_path_list[0] == protected_name:
rmtree(self.annot_dir)
# close videos
print("close videos (if any)")
for data_video in self.video_data_dict.values():
if data_video is not None:
data_video.release()
# delete temporary files
if self.flag_long_rec:
print("delete temporary signal files")
rmtree(self.tmp_name, ignore_errors=True)
[docs] def update_plot(self):
"""
Updates (during playback) the displayed video frame and the plots of
the temporal cursor at the current frame :attr:`.frame_id`
It is called by the thread :attr:`.timer`.
The displayed video frame and the plots are updated by calling the
method :meth:`.plot_frame_id_position`.
It is only effective if :attr:`.flag_pause_status` is
``False``.
It increments the value of :attr:`.frame_id`.
"""
# update plot only if pause status is False
if not self.flag_pause_status:
# plot temporal cursor at the value of self.frame_id
self.plot_frame_id_position()
# increment current frame
self.frame_id += 1
# plot in a loop
# self.app.processEvents() # not sure about the usefullness of this
[docs] def update_frame_id(self, frame_id):
"""
Sets the value of current frame :attr:`.frame_id` and updates
the displayed video frame and the plots of the temporal cursor at new
current frame
The displayed video frame and the plots are updated by calling the
method :meth:`.plot_frame_id_position`.
:param frame_id: new current frame index
:type frame_id: int
"""
# update frame ID
self.frame_id = frame_id
# get image for each camera if pause status is true
if self.flag_pause_status:
for video_id, data_video in self.video_data_dict.items():
self.wid_vid_dict[video_id].setImage(
data_video, self.frame_id
)
# plot frame id position
self.plot_frame_id_position()
[docs] def plot_frame_id_position(self):
"""
Updates the displayed video frame and the plots of the temporal cursor
at the current frame position :attr:`.frame_id`
If :attr:`.frame_id` is out of the temporal range (defined by
:attr:`.first_frame` and :attr:`.last_frame`), then
the temporal range is updated. For example, in the context of long
recording, this might happen when navigating from one file to another.
If the temporal range is updated, then the method
:meth:`.update_signal_plot` is called in order to update the
signal plots with the new temporal range.
The attribute :attr:`.img_vid_dict` is set with the values in
:attr:`.im_dict` in order to update the displayed video
frame.
If the navigation point of the progress bar is not dragged, then the
attribute :attr:`.wid_progress` is modified in order to
update the position of the navigation point.
The attribute :attr:`.current_cursor_list` is set in order to
update the position of the temporal cursor in the signal widgets.
"""
# check if frame id overtakes the current range
if self.frame_id >= self.last_frame:
# check if frame id overtakes the reference file
if self.frame_id >= self.nframes:
if not self.flag_long_rec:
# single recording
self.frame_id = self.nframes - 1
else:
# long recordings => change file
ok = self.next_file()
if not ok:
self.frame_id = self.nframes - 1
else:
# get width of the current temporal range
temporal_width = self.last_frame - self.first_frame
# frame id is in the last temporal range window
if self.frame_id + temporal_width > self.nframes:
self.first_frame = self.nframes - temporal_width
self.last_frame = self.nframes
# frame id is in the next temporal range window
elif self.frame_id < self.last_frame + temporal_width:
self.first_frame = self.last_frame
self.last_frame = self.first_frame + temporal_width
else:
self.first_frame = self.frame_id
self.last_frame = self.first_frame + temporal_width
# update signals plot
self.update_signal_plot()
# check if frame_id undertakes the current range
elif self.frame_id < self.first_frame:
# check if frame undertakes the video
if self.frame_id < 0 and self.flag_long_rec:
# long recordings => change file
self.previous_file()
else:
# get width of the current temporal range
temporal_width = self.last_frame - self.first_frame
# frame id is the first temporal range window
if self.frame_id - temporal_width < 0:
self.first_frame = 0
self.last_frame = temporal_width
# frame id is in the previous temporal range window
elif self.frame_id > self.first_frame - temporal_width:
self.last_frame = self.first_frame
self.first_frame = self.last_frame - temporal_width
else:
self.last_frame = self.frame_id
self.first_frame = self.last_frame - temporal_width
# update signals plots
self.update_signal_plot()
# update temporal cursor
for wid in self.wid_sig_list:
wid.cursor.setPos(self.get_frame_id_in_ms(self.frame_id))
# update progress bar (if the progress bar is dragged, then there is no
# need to update it)
if not self.wid_progress.flag_dragged:
self.wid_progress.setProgressPlot(self.frame_id)
# set title of progress bar
self.wid_progress.updateTitle(self.fps, self.beginning_datetime)
# update video image
for video_id, wid_vid in self.wid_vid_dict.items():
wid_vid.displayImage()
[docs] def update_signal_plot(
self, flag_reset_combo_trunc=True, flag_reset_combo_from_cursor=True
):
"""
Updates the signal plots and the progress bar so that they span the
current temporal range defined by :attr:`.first_frame` and
:attr:`.last_frame`
:param flag_reset_combo_trunc: specify if the combo box of
:attr:`.wid_trunc` must be reset
:type flag_reset_combo_trunc: bool
:param flag_reset_combo_from_cursor: specify if the combo box of
:attr:`.wid_from_cursor` must be reset
:type flag_reset_combo_from_cursor: bool
"""
# reset combo boxes
if flag_reset_combo_trunc and self.wid_trunc is not None:
if self.wid_trunc.combo_box is not None:
self.wid_trunc.combo_box.setCurrentIndex(0)
if flag_reset_combo_from_cursor and self.wid_from_cursor is not None:
self.wid_from_cursor.combo_box.setCurrentIndex(0)
# set boundaries of progress bar with current temporal range
self.wid_progress.setBoundaries(self.first_frame, self.last_frame)
# update title of progress bar
self.wid_progress.updateTitle(self.fps, self.beginning_datetime)
# get current range in milliseconds
first_frame_ms, last_frame_ms = self.get_current_range_in_ms()
# update plots
for wid, (signal_id, sig_list) in zip(
self.wid_sig_list, self.sig_dict.items()
):
# check if there are intervals to plot
if signal_id in self.interval_dict.keys():
interval_list = self.interval_dict[signal_id]
else:
interval_list = []
# update plot items
wid.updatePlotItems(
first_frame_ms, last_frame_ms, sig_list, interval_list
)
# X axis ticks
pyqtgraph_overlayer.set_temporal_ticks(
wid, self.nb_ticks, (first_frame_ms, last_frame_ms),
self.beginning_datetime, fmt=self.ticks_fmt
)
# *********************************************************************** #
# End group
# *********************************************************************** #
# *********************************************************************** #
# Group: Methods for plotting region items (pyqtgraph.LinearRegionItem)
# *********************************************************************** #
[docs] def add_item_to_signals(self, item_list):
"""
Displays items in the signal widgets
:param item_list: items to display in the signal widgets, same length
as :attr:`.wid_sig_list`, each element corresponds to
one signal widget
:type item_list: list
"""
for wid, item in zip(self.wid_sig_list, item_list):
wid.addItem(item)
[docs] def create_text_item(
self, text, pos_ms, pos_y_list, text_color=(0, 0, 0),
border_color=(255, 255, 255), border_width=3
):
"""
Adds a text item to the signal widgets
(:attr:`.wid_sig_list`)
See
https://pyqtgraph.readthedocs.io/en/latest/functions.html#pyqtgraph.mkColor
for supported color formats.
:param text: text to display in the signal widgets (it is the same in
each widget)
:type text: str
:param pos_ms: temporal position (X axis) of the text item in
milliseconds
:type pos_ms: float
:param pos_y_list: position on the Y axis of the text item in each
signal widget, same length as :attr:`.wid_sig_list`
:type pos_y_list: float
:param text_color: color of the text
:type text_color: tuple or list or str
:param border_color: color of the text item border
:type border_color: tuple or list or str
:param border_width: width of the text item border in pixels
:type border_width: int
:returns: instances of pyqtgraph.TextItem, each element corresponds to
a signal widget, same length and order as :attr:`wid_sig_list`
:rtype: list
"""
# initialize list of text items
text_item_list = []
# loop on signal widgets
for wid, pos_y in zip(self.wid_sig_list, pos_y_list):
# create text item
text_item = pg.TextItem(
text, fill='w', color=text_color,
border={"color": border_color, "width": border_width}
)
# set text item position
text_item.setPos(pos_ms, pos_y)
# add text item to signal widget
wid.addItem(text_item)
# append list of text items
text_item_list.append(text_item)
return text_item_list
# *********************************************************************** #
# End group
# *********************************************************************** #
# *********************************************************************** #
# Group: Methods for mouse interaction with plots
# *********************************************************************** #
[docs] def get_mouse_y_position(self, ev):
"""
Gets position of the mouse on the Y axis of all the signal widgets
:param ev: emitted when the mouse is clicked/moved
:type ev: QtGui.QMouseEvent
:returns: same length as :attr:`.wid_sig_list`, each element
is the position of the mouse on the Y axis in the corresponding
signal widget, it returns ``[]`` if the mouse clicked on a label
item (most likely the widget title)
:rtype: list
"""
# check what is being clicked
for item in self.wid_sig_list[0].scene().items(ev.scenePos()):
# if widget title is checked, nothing is returned
if type(item) is pg.graphicsItems.LabelItem.LabelItem:
return []
# map the mouse position to the plot coordinates
position_y_list = []
for wid in self.wid_sig_list:
position_y = wid.getViewBox().mapToView(ev.pos()).y()
position_y_list.append(position_y)
return position_y_list
[docs] def zoom_or_annot_clicked(self, ev, pos_frame, pos_ms):
"""
Manages mouse click for zoom or annotation
:param ev: emitted when the mouse is clicked/moved
:type ev: QtGui.QMouseEvent
:param pos_frame: mouse position on the X axis in frame number (sampled
at the reference frequency :attr:`ViSiAnnoT.fps`)
:type pos_frame: int
:param pos_ms: mouse position on the X axis in milliseconds
:type pos_ms: int
"""
keyboard_modifiers = ev.modifiers()
# define position 1
if self.zoom_pos_1 == -1:
# zoom
self.zoom_pos_1 = max(0, min(pos_frame, self.nframes - 1))
# ctrl key => add annotation
if keyboard_modifiers == QtCore.Qt.ControlModifier and \
self.wid_annotevent is not None:
self.wid_annotevent.set_timestamp(self, self.zoom_pos_1, 0)
# define position 2
elif self.zoom_pos_2 == -1:
# zoom
self.zoom_pos_2 = max(0, min(pos_frame, self.nframes - 1))
# ctrl key => add annotation
if keyboard_modifiers == QtCore.Qt.ControlModifier and \
self.wid_annotevent is not None:
self.wid_annotevent.set_timestamp(self, self.zoom_pos_2, 1)
# swap pos_1 and pos_2 if necessary
if self.zoom_pos_1 > self.zoom_pos_2:
self.zoom_pos_1, self.zoom_pos_2 = \
self.zoom_pos_2, self.zoom_pos_1
# plot zoom region
self.region_zoom_list = self.add_region_to_widgets(
self.zoom_pos_1, self.zoom_pos_2
)
# compute zoom region duration
duration = (self.zoom_pos_2 - self.zoom_pos_1) / self.fps
# get list of Y position of the mouse in each signal widget
pos_y_list = self.get_mouse_y_position(ev)
# display zoom region duration
self.region_zoom_text_item_list = self.create_text_item(
"%.3f s" % duration, pos_ms, pos_y_list,
border_color=(150, 150, 150)
)
# both positions defined
elif self.zoom_pos_1 != -1 and self.zoom_pos_2 != -1:
# check if click is inside the zoom in area
if pos_frame >= self.zoom_pos_1 and pos_frame <= self.zoom_pos_2:
# ctrl key => add annotation
if keyboard_modifiers == QtCore.Qt.ControlModifier and \
self.wid_annotevent is not None:
self.wid_annotevent.add(self)
# no ctrl key => zoom
else:
# define new range
self.first_frame = self.zoom_pos_1
self.last_frame = self.zoom_pos_2
# update current frame if necessary
if self.frame_id < self.first_frame \
or self.frame_id >= self.last_frame:
self.update_frame_id(self.first_frame)
# update signals plots
self.update_signal_plot()
# in case the click is outside the zoom in area
else:
if self.wid_annotevent is not None:
if self.wid_annotevent.annot_array.size > 0:
# reset annotation times
self.wid_annotevent.reset_timestamp()
# remove zoom regions
self.remove_region_in_widgets(self.region_zoom_list)
self.region_zoom_list = []
# remove zoom regions description
pyqtgraph_overlayer.remove_item_in_widgets(
self.wid_sig_list, self.region_zoom_text_item_list
)
self.region_zoom_text_item_list = []
# reset zoom positions
self.zoom_pos_1 = -1
self.zoom_pos_2 = -1
# *********************************************************************** #
# End group
# *********************************************************************** #
# *********************************************************************** #
# Group: Callback method for key press interaction
# *********************************************************************** #
[docs] def key_press(self, ev):
"""
Callback method for key press interaction, see :ref:`keyboard`
:param ev: emmited when a key is pressed
:type ev: QtGui.QKeyEvent
"""
keyboard_modifiers = ev.modifiers()
# get pressed key
key = ev.key()
if key == QtCore.Qt.Key_Space:
self.flag_pause_status = not self.flag_pause_status
elif key == QtCore.Qt.Key_Left:
if keyboard_modifiers == QtCore.Qt.ControlModifier:
self.update_frame_id(self.frame_id - 60 * self.fps)
else:
self.update_frame_id(self.frame_id - self.fps)
if self.ite_file == 0:
self.update_frame_id(max(0, self.frame_id))
elif key == QtCore.Qt.Key_Right:
if keyboard_modifiers == QtCore.Qt.ControlModifier:
self.update_frame_id(self.frame_id + 60 * self.fps)
else:
self.update_frame_id(self.frame_id + self.fps)
if self.ite_file == self.nb_files - 1:
self.update_frame_id(min(self.nframes, self.frame_id))
elif key == QtCore.Qt.Key_Down:
if keyboard_modifiers == QtCore.Qt.ControlModifier:
self.update_frame_id(self.frame_id - 600 * self.fps)
else:
self.update_frame_id(self.frame_id - 10 * self.fps)
if self.ite_file == 0:
self.update_frame_id(max(0, self.frame_id))
elif key == QtCore.Qt.Key_Up:
if keyboard_modifiers == QtCore.Qt.ControlModifier:
self.update_frame_id(self.frame_id + 600 * self.fps)
else:
self.update_frame_id(self.frame_id + 10 * self.fps)
if self.ite_file == self.nb_files - 1:
self.update_frame_id(min(self.nframes, self.frame_id))
elif key == QtCore.Qt.Key_L:
self.update_frame_id(self.frame_id - 1)
if self.ite_file == 0:
self.update_frame_id(max(0, self.frame_id))
elif key == QtCore.Qt.Key_M:
self.update_frame_id(self.frame_id + 1)
if self.ite_file == self.nb_files - 1:
self.update_frame_id(min(self.nframes, self.frame_id))
elif key == QtCore.Qt.Key_I and len(self.wid_sig_list) > 0 and \
self.wid_zoomin is not None:
self.wid_zoomin.callback(self)
elif key == QtCore.Qt.Key_O and len(self.wid_sig_list) > 0 and \
self.wid_zoomout is not None:
self.wid_zoomout.callback(self)
elif key == QtCore.Qt.Key_N and len(self.wid_sig_list) > 0 and \
self.wid_visi is not None:
self.wid_visi.callback(self)
elif key == QtCore.Qt.Key_A and self.wid_annotevent is not None:
if len(self.wid_annotevent.label_list) > 0:
self.wid_annotevent.set_timestamp(self, self.frame_id, 0)
elif key == QtCore.Qt.Key_Z and self.wid_annotevent is not None:
if len(self.wid_annotevent.label_list) > 0:
self.wid_annotevent.set_timestamp(self, self.frame_id, 1)
elif key == QtCore.Qt.Key_E and self.wid_annotevent is not None:
if len(self.wid_annotevent.label_list) > 0:
self.wid_annotevent.add(self)
elif key == QtCore.Qt.Key_S and self.wid_annotevent is not None:
if len(self.wid_annotevent.label_list) > 0:
self.wid_annotevent.display(self)
elif key == QtCore.Qt.Key_PageDown and self.flag_long_rec:
self.change_file_in_long_rec(self.ite_file - 1, 0)
elif key == QtCore.Qt.Key_PageUp and self.flag_long_rec:
self.change_file_in_long_rec(self.ite_file + 1, 0)
elif key == QtCore.Qt.Key_Home:
self.update_frame_id(0)
elif key == QtCore.Qt.Key_End:
self.update_frame_id(self.nframes - 1)
elif key == QtCore.Qt.Key_D and keyboard_modifiers == \
(QtCore.Qt.ControlModifier | QtCore.Qt.ShiftModifier):
if self.wid_annotevent is not None:
self.wid_annotevent.clear_descriptions(self)
[docs] def key_release(self, ev):
"""
Callback method for key release interaction, see :ref:`keyboard`
:param ev: emmited when a key is released
:type ev: QtGui.QKeyEvent
"""
# get released key
key = ev.key()
if key == QtCore.Qt.Key_Alt:
if self.menu_bar.isVisible():
self.menu_bar.hide()
else:
self.menu_bar.show()
# *********************************************************************** #
# End group
# *********************************************************************** #
# *********************************************************************** #
# Group: Methods for setting video and signal data
# *********************************************************************** #
[docs] def set_all_data(self, video_dict, signal_dict, interval_dict):
"""
Sets video and signal data (to be called before plotting)
Make sure the following attributes are defined before calling this
method:
- :attr:`.plot_style_list`
- :attr:`.flag_long_rec`
Make sure the follwing attributes are initialized before calling this
method (it can be empty):
- :attr:`.video_data_dict`
- :attr:`.sig_dict`
- :attr:`.interval_dict`
Otherwise the video thread throws a RunTime error. These attributes are
then set thanks to the positional arguments ``video_dict`` and
``signal_dict``.
This method sets the following attributes:
- :attr:`.nframes`
- :attr:`.ViSiAnnoT.fps`
- :attr:`.beginning_datetime`
- :attr:`.sig_dict`
- :attr:`.interval_dict`
- :attr:`.data_wave`
If there is no video, the attributes :attr:`.nframes`,
:attr:`.ViSiAnnoT.fps` and :attr:`.beginning_datetime` are
set with the first signal in ``signal_dict``.
It raises an exception if 2 videos do not have the same FPS or have a
temporal shift of more than 1 second.
:param video_dict: same as first positional argument of
:class:`.ViSiAnnoT` constructor
:type video_dict: dict
:param signal_dict: same as second positional argument of
:class:`.ViSiAnnoT` constructor
:type signal_dict: dict
:param interval_dict: same as keyword argument of :class:`.ViSiAnnoT`
constructor
:type interval_dict: dict
"""
# ******************************************************************* #
# **************************** Video ******************************** #
# ******************************************************************* #
# initialize temporary lists (used in case of several cameras to check
# synchronization)
nframes_list = []
fps_list = []
beginning_datetime_list = []
# reset attributes
self.sig_dict = {}
self.interval_dict = {}
# loop on video
for ite, (video_id, video_config) in enumerate(video_dict.items()):
# check number of elements in configuration
check_configuration(
video_id, video_config, "Video", flag_long_rec=False
)
# get video configuration
path_video, delimiter, pos, fmt = video_config
# get video data
self.video_data_dict[video_id], nframes, fps = get_data_video(
path_video
)
# check if no video data
if self.video_data_dict is None:
beginning_datetime = None
else:
# get beginning datetime of video file
beginning_datetime = datetime_converter.get_datetime_from_path(
path_video, delimiter, pos, fmt, time_zone=self.time_zone
)
# update lists
nframes_list.append(nframes)
fps_list.append(fps)
beginning_datetime_list.append(beginning_datetime)
# check FPS
if fps <= 0 and path_video != '':
raise Warning("Video with null FPS at %s" % path_video)
# check if there is any video
if any(self.video_data_dict):
# make sure that FPS is not null
flag_ok = False
for ite_vid, fps in enumerate(fps_list):
if fps > 0:
flag_ok = True
break
# check if fps attribute to be set
if self.fps is None:
if flag_ok:
self.fps = fps_list[ite_vid]
else:
self.fps = 1
# get number of frames of the video
self.nframes = nframes_list[ite_vid]
# get beginning datetime of the video
self.beginning_datetime = beginning_datetime_list[ite_vid]
# check if more than 1 video
if len(nframes_list) > 1:
# update number of frames
self.nframes = max(nframes_list)
# check coherence
for fps in fps_list[1:]:
if self.fps != fps and fps >= 0:
if '' not in video_dict.values():
raise Exception(
"The 2 videos do not have the same FPS: "
"%s - %s" % (
list(video_dict.values())[0][0], path_video
)
)
# ******************************************************************* #
# ************************** No video ******************************* #
# ******************************************************************* #
# check if there is no video
# in this case the attributes fps, nframes and beginning_datetime are
# not defined yet => these attributes are defined with the first signal
if not any(video_dict):
# get first signal configuration
signal_id = list(signal_dict.keys())[0]
signal_config = list(signal_dict.values())[0][0]
# check number of elements in first signal configuration
check_configuration(
signal_id, signal_config, "Signal", flag_long_rec=False
)
# get first signal configuration
path, delimiter, pos, fmt, key_data, freq, _ = signal_config
# check if attribute fps to be set
if self.fps is None:
# get frequency and store it as reference frequency
self.fps = self.get_data_frequency(path, freq)
# get beginning date-time
self.beginning_datetime = \
datetime_converter.get_datetime_from_path(
path, delimiter, pos, fmt, time_zone=self.time_zone
)
# get data path (in case not synchronized)
if self.flag_long_rec and not self.flag_synchro:
# get first synchronization file content
lines = data_loader.get_txt_lines(path)
# get first signal file
path = lines[1].replace("\n", "")
# get number of frames
self.nframes = data_loader.get_nb_samples_generic(path, key_data)
# check if there is data indeed
if self.nframes == 0:
raise Exception(
"There is no data in the first signal file %s" % path
)
# ******************************************************************* #
# *************************** Signal ******************************** #
# ******************************************************************* #
# loop on signals
for signal_id, signal_config_list in signal_dict.items():
# initialize temporary list
sig_list_tmp = []
# loop on sub-signals
for ite_data, signal_config in enumerate(signal_config_list):
# check number of elements in signal configuration
check_configuration(
signal_id, signal_config, "Signal", flag_long_rec=False
)
# get configuration
path_data, _, _, _, key_data, freq_data, plot_style = \
signal_config
# ******************** load intervals *********************** #
if signal_id in interval_dict.keys():
# initialize dictionary value
self.interval_dict[signal_id] = []
# loop on intervals paths
for interval_config in interval_dict[signal_id]:
# check number of elements in interval configuration
check_configuration(
signal_id, interval_config, "Interval",
flag_long_rec=False
)
# get configuration
path_interval, _, _, _, key_interval, freq_interval, \
color_interval = interval_config
# get frequency
freq_interval = self.get_data_frequency(
path_interval, freq_interval
)
# asynchronous signal
if self.flag_long_rec and not self.flag_synchro:
# load intervals data
interval, _ = self.get_data_sig_tmp(
path_interval, signal_id, key_interval,
freq_interval, self.tmp_delimiter,
flag_interval=True
)
# check if empty
if interval is None:
interval = np.empty((0,))
# synchro OK
else:
# check if fake hole file
if path_interval == '':
interval = np.empty((0,))
else:
# load intervals data
interval = data_loader.get_data_interval(
path_interval, key_interval
)
# update dictionary value
self.interval_dict[signal_id].append(
[interval, freq_interval, color_interval]
)
# ********************** load data ************************** #
# asynchronous signal
if self.flag_long_rec and not self.flag_synchro:
# get data and frequency
data, freq_data = self.get_data_sig_tmp(
path_data, signal_id, key_data, freq_data,
self.tmp_delimiter
)
# synchronous signals
else:
# check if fake hole file
if path_data == '':
freq_data = 1
data = None
else:
# get frequency
freq_data = self.get_data_frequency(
path_data, freq_data
)
# keyword arguments for data_loader.get_data_generic
kwargs = {}
if os.path.splitext(path_data)[1] == ".wav":
kwargs["channel_id"] = convert_key_to_channel_id(
key_data
)
# load data
data = data_loader.get_data_generic(
path_data, key_data, **kwargs
)
# ********* convert data into an instance of Signal ********* #
# signal plot style
if plot_style is None or plot_style == "":
if ite_data < len(self.plot_style_list):
plot_style = self.plot_style_list[ite_data]
else:
raise Exception(
"No plot style provided for signal %s - %s "
"(sub-id %d) and cannot use the default style" % (
signal_id, key_data, ite_data
)
)
# create an instance of Signal
signal = Signal(
data, freq_data, max_points=self.max_points,
plot_style=plot_style, legend_text=key_data
)
# append temporary signal list
sig_list_tmp.append(signal)
# append list of signals
self.sig_dict[signal_id] = sig_list_tmp
[docs] def get_data_frequency(self, path, freq):
# get frequency if necessary
if os.path.splitext(path)[1] == ".wav":
_, freq, _ = get_audio_wave_info(path)
elif isinstance(freq, str):
freq = data_loader.get_attribute_generic(path, freq)
elif freq == -1:
freq = self.fps
return freq
[docs] @staticmethod
def get_file_sig_tmp(line, delimiter):
"""
Gets the file name and the start second in a line of a temporary
synchronization file (in case signal is not synchronized with video)
:param line: line containing the signal file name and start second
:type line: str
:param delimiter: delimiter used to split the line between file name
and start second
:type delimiter: str
:returns:
- **path** (*str*) -- path to the signal file
- **start_sec** (*int*) -- start second
"""
# get data file name and starting second
if delimiter in line:
line_split = line.split(delimiter)
path = line_split[0]
start_sec = int(line_split[1].replace("\n", ""))
else:
path = line.replace("\n", "")
start_sec = 0
return path, start_sec
[docs] def get_data_sig_tmp(
self, path, signal_id, key_data, freq_data, delimiter,
flag_interval=False
):
"""
Gets signal data after synchronization with video
:param path: path to the temporary synchronization file
:type path: str
:param signal_id: signal type (key in the dictionary ``signal_dict``,
second positional argument of :class:`.ViSiAnnoT` constructor)
:type signal_id: str
:param key_data: key to access the data (in case of .h5 or .mat file)
:type key_data: str
:param freq_data: signal frequency as found in the configuration file,
in case this is a string, then the frequency is retrieved in the
data file
:type freq_data: float or str
:param delimiter: delimiter used to split the lines of the temporary
signal files
:type delimiter: str
:param flag_interval: specify if data to load is intervals
:type flag_interval: bool
:returns: signal data synchronized with video
:rtype: numpy array
"""
# read temporary file
lines = data_loader.get_txt_lines(path)
# define empty data
if len(lines) == 0:
data = None
freq_data = None
else:
# initialize data list
data_list = []
duration_progress = 0
# look for data file path in order to get frequency if stored in
# file attribute
if isinstance(freq_data, str):
freq_data_tmp = None
for line in lines:
data_path, _ = ViSiAnnoT.get_file_sig_tmp(line, delimiter)
if data_path != "None":
freq_data_tmp = self.get_data_frequency(
data_path, freq_data
)
break
if freq_data_tmp is not None:
freq_data = freq_data_tmp
# data frequency is the same as reference frequency
elif freq_data == -1:
freq_data = self.fps
# loop on temporary file lines
for ite_line, line in enumerate(lines):
# get data path and starting second
data_path, start_sec = ViSiAnnoT.get_file_sig_tmp(
line, delimiter
)
# no data at the beginning
if data_path == "None":
# check if 2D data (signal not regularly sampled)
if freq_data == 0:
next_data = np.empty((0, 2))
else:
next_data = np.nan * np.ones(
(int(start_sec * freq_data),)
)
duration_progress += start_sec
else:
# check if 2D data (signal not regularly sampled)
if freq_data == 0:
# get first column (samples timestamps)
next_data_ts = data_loader.get_data_generic(
data_path, key=key_data, slicing=("col", 0)
)
# initialize slicing indexes
start_ind = 0
end_ind = None
# truncate data at the beginning if necessary
if ite_line == 0:
# 1D data (regularly sampled)
if freq_data > 0:
# get slicing index
start_ind = int(start_sec * freq_data)
# 2D data (not regularly sampled)
else:
# get indexes of samples after starting second
inds = np.where(
next_data_ts >= start_sec * 1000
)[0]
# get slicing index
start_ind = inds[0]
# update temporal offset
duration_progress = - start_sec
# truncate data at the end if necessary
if ite_line == len(lines) - 1:
# get duration of reference data file in seconds
ref_duration = self.nframes / self.fps
# 1D data
if freq_data > 0:
# get length of data so far
data_length = 0
for data_tmp in data_list:
data_length += data_tmp.shape[0]
# get remaining data length required to fill
# the reference data file
remaining_length = int(round(
freq_data * ref_duration - data_length
))
# get slicing index
end_ind = start_ind + remaining_length
# 2D data (not regularly sampled)
else:
temporal_limit = (
ref_duration - duration_progress
) * 1000
# get indexes of samples before temporal limit
inds = np.where(
next_data_ts <= temporal_limit
)[0]
# get slicing indexes
end_ind = inds[-1] + 1
# keyword arguments for loading data
kwargs = {"key": key_data}
# channel specification when loading audio
if data_path.split('.')[-1] == "wav":
kwargs["channel_id"] = convert_key_to_channel_id(
key_data
)
# slicing keyword argument for data loading
if start_ind == 0 and end_ind is None:
kwargs["slicing"] = ()
elif end_ind is None:
kwargs["slicing"] = (start_ind,)
else:
kwargs["slicing"] = (start_ind, end_ind)
# check if interval data
if flag_interval:
# load data with slicing
next_data = \
data_loader.get_data_interval_as_time_series(
data_path, **kwargs
)
else:
# load data with slicing
next_data = data_loader.get_data_generic(
data_path, **kwargs
)
# get duration of truncated data
if freq_data > 0:
duration = next_data.shape[0] / freq_data
else:
duration = (next_data[-1, 0] - next_data[0, 0]) / 1000
# temporal offset
next_data[:, 0] += duration_progress * 1000
duration_progress += duration
# concatenate data
data_list.append(next_data)
# get data as a numpy array
data = np.concatenate(tuple(data_list))
# convert intervals data from time series to intervals
if flag_interval:
data = data_loader.convert_time_series_to_intervals(data, 1)
return data, freq_data
# *********************************************************************** #
# End group
# *********************************************************************** #