Skip to content

Processes

behavysis.processes.update_configs.UpdateConfigs

Utility process for initialising and updating an experiment's JSON config file from a default config file.

Source code in behavysis/processes/update_configs.py
class UpdateConfigs:
    """Process for initialising and updating an experiment's JSON config file."""

    @staticmethod
    def update_configs(
        configs_fp: str,
        default_configs_fp: str,
        overwrite: Literal["user", "all"],
    ) -> str:
        """
        Initialises the config files with the given `default_configs`.
        The different types of overwriting are:
        - "user": Only the user parameters are updated.
        - "all": All parameters are updated.

        Parameters
        ----------
        configs_fp : str
            The filepath of the existing config file.
        default_configs_fp : str
            The filepath of the default config file to use.
        overwrite : Literal["user", "all"]
            Specifies how to update the config files.

        Returns
        -------
        str
            The captured log output describing the function's outcome.

        Raises
        ------
        ValueError
            If `overwrite` is not "user" or "all".
        """
        logger, io_obj = init_logger_io_obj()
        # Load the experiment's existing configs; fall back to a fresh
        # default-constructed model when the file is absent or invalid.
        try:
            configs = ExperimentConfigs.read_json(configs_fp)
        except (FileNotFoundError, ValidationError):
            configs = ExperimentConfigs()
        # Load the default configs whose values will be copied across
        default_configs = ExperimentConfigs.read_json(default_configs_fp)
        # Apply the requested overwrite strategy
        if overwrite == "all":
            configs = default_configs
            logger.info("Updating all configs.")
        elif overwrite == "user":
            configs.user = default_configs.user
            configs.ref = default_configs.ref
            logger.info("Updating user and ref configs.")
        else:
            raise ValueError(
                f'Invalid value "{overwrite}" passed to function. The value must be either "user", or "all".'
            )
        # Persist the merged configs back to the experiment's JSON file
        configs.write_json(configs_fp)
        return get_io_obj_content(io_obj)

update_configs(configs_fp, default_configs_fp, overwrite) staticmethod

Initialises the config files with the given default_configs. The different types of overwriting are: - "user": Only the user parameters are updated. - "all": All parameters are updated.

Parameters:

Name Type Description Default
configs_fp str

The filepath of the existing config file.

required
default_configs_fp str

The filepath of the default config file to use.

required
overwrite Literal['user', 'all']

Specifies how to update the config files.

required

Returns:

Type Description
str

Description of the function's outcome.

Source code in behavysis/processes/update_configs.py
@staticmethod
def update_configs(
    configs_fp: str,
    default_configs_fp: str,
    overwrite: Literal["user", "all"],
) -> str:
    """
    Initialises the config files with the given `default_configs`.
    The different types of overwriting are:
    - "user": Only the user parameters are updated.
    - "all": All parameters are updated.

    Parameters
    ----------
    configs_fp : str
        The filepath of the existing config file.
    default_configs_fp : str
        The filepath of the default config file to use.
    overwrite : Literal["user", "all"]
        Specifies how to update the config files.

    Returns
    -------
    str
        Description of the function's outcome.

    Raises
    ------
    ValueError
        If `overwrite` is not "user" or "all".
    """
    logger, io_obj = init_logger_io_obj()
    # Parsing in the experiment's existing JSON configs
    # (falling back to a default-constructed model when the file is missing
    # or fails validation)
    try:
        configs = ExperimentConfigs.read_json(configs_fp)
    except (FileNotFoundError, ValidationError):
        configs = ExperimentConfigs()
    # Reading in the new configs from the given default_configs_fp
    default_configs = ExperimentConfigs.read_json(default_configs_fp)
    # Overwriting the configs file (with given method)
    if overwrite == "user":
        configs.user = default_configs.user
        configs.ref = default_configs.ref
        logger.info("Updating user and ref configs.")
    elif overwrite == "all":
        configs = default_configs
        logger.info("Updating all configs.")
    else:
        raise ValueError(
            f'Invalid value "{overwrite}" passed to function. ' 'The value must be either "user", or "all".'
        )
    # Writing new configs to JSON file
    configs.write_json(configs_fp)
    return get_io_obj_content(io_obj)

behavysis.processes.format_vid.FormatVid

Class for formatting videos based on given parameters.

Source code in behavysis/processes/format_vid.py
class FormatVid:
    """
    Class for formatting videos based on given parameters.
    """

    @classmethod
    def format_vid(cls, raw_vid_fp: str, formatted_vid_fp: str, configs_fp: str, overwrite: bool) -> str:
        """
        Formats the input video with the given parameters.

        Parameters
        ----------
        raw_vid_fp : str
            The input video filepath.
        formatted_vid_fp : str
            The output video filepath.
        configs_fp : str
            The JSON configs filepath.
        overwrite : bool
            Whether to overwrite the output file (if it exists).

        Returns
        -------
        str
            The captured log output describing the function's outcome.
        """
        logger, io_obj = init_logger_io_obj()
        # Guard clause: skip processing when the output exists and overwrite is off
        if not overwrite and os.path.exists(formatted_vid_fp):
            logger.warning(file_exists_msg(formatted_vid_fp))
            return get_io_obj_content(io_obj)
        # Reading the video-formatting parameters from the experiment's configs
        configs = ExperimentConfigs.read_json(configs_fp)
        format_vid_configs = configs.user.format_vid
        # Resolving each (possibly referenced) parameter value
        width_px = configs.get_ref(format_vid_configs.width_px)
        height_px = configs.get_ref(format_vid_configs.height_px)
        fps = configs.get_ref(format_vid_configs.fps)
        start_sec = configs.get_ref(format_vid_configs.start_sec)
        stop_sec = configs.get_ref(format_vid_configs.stop_sec)
        # Processing the video with ffmpeg
        ffmpeg_process_vid(
            in_fp=raw_vid_fp,
            dst_fp=formatted_vid_fp,
            logger=logger,
            width_px=width_px,
            height_px=height_px,
            fps=fps,
            start_sec=start_sec,
            stop_sec=stop_sec,
        )
        # Storing both videos' metadata in the configs file
        cls.get_vids_metadata(raw_vid_fp, formatted_vid_fp, configs_fp)
        return get_io_obj_content(io_obj)

    @classmethod
    def get_vids_metadata(cls, raw_vid_fp: str, formatted_vid_fp: str, configs_fp: str) -> str:
        """
        Finds the video metadata/parameters for both the raw and formatted videos,
        and stores this data in the experiment's config file.

        Parameters
        ----------
        raw_vid_fp : str
            The input video filepath.
        formatted_vid_fp : str
            The output video filepath.
        configs_fp : str
            The JSON configs filepath.

        Returns
        -------
        str
            The captured log output describing the function's outcome.
        """
        logger, io_obj = init_logger_io_obj()
        configs = ExperimentConfigs.read_json(configs_fp)
        # Probing each video and saving its metadata under the auto configs
        configs.auto.raw_vid = get_vid_metadata(raw_vid_fp, logger)
        configs.auto.formatted_vid = get_vid_metadata(formatted_vid_fp, logger)
        logger.info("Video metadata stored in config file.")
        configs.write_json(configs_fp)
        return get_io_obj_content(io_obj)

format_vid(raw_vid_fp, formatted_vid_fp, configs_fp, overwrite) classmethod

Formats the input video with the given parameters.

Parameters:

Name Type Description Default
raw_fp str

The input video filepath.

required
formatted_fp str

The output video filepath.

required
configs_fp str

The JSON configs filepath.

required
overwrite bool

Whether to overwrite the output file (if it exists).

required

Returns:

Type Description
str

Description of the function's outcome.

Source code in behavysis/processes/format_vid.py
@classmethod
def format_vid(cls, raw_vid_fp: str, formatted_vid_fp: str, configs_fp: str, overwrite: bool) -> str:
    """
    Formats the input video with the given parameters.

    Parameters
    ----------
    raw_vid_fp : str
        The input video filepath.
    formatted_vid_fp : str
        The output video filepath.
    configs_fp : str
        The JSON configs filepath.
    overwrite : bool
        Whether to overwrite the output file (if it exists).

    Returns
    -------
    str
        Description of the function's outcome.
    """
    logger, io_obj = init_logger_io_obj()
    # Skipping processing when the output already exists and overwrite is off
    if not overwrite and os.path.exists(formatted_vid_fp):
        logger.warning(file_exists_msg(formatted_vid_fp))
        return get_io_obj_content(io_obj)
    # Finding all necessary config parameters for video formatting
    configs = ExperimentConfigs.read_json(configs_fp)
    configs_filt = configs.user.format_vid
    # Processing the video
    ffmpeg_process_vid(
        in_fp=raw_vid_fp,
        dst_fp=formatted_vid_fp,
        logger=logger,
        width_px=configs.get_ref(configs_filt.width_px),
        height_px=configs.get_ref(configs_filt.height_px),
        fps=configs.get_ref(configs_filt.fps),
        start_sec=configs.get_ref(configs_filt.start_sec),
        stop_sec=configs.get_ref(configs_filt.stop_sec),
    )
    # Storing both videos' metadata in the configs file
    cls.get_vids_metadata(raw_vid_fp, formatted_vid_fp, configs_fp)
    return get_io_obj_content(io_obj)

get_vids_metadata(raw_vid_fp, formatted_vid_fp, configs_fp) classmethod

Finds the video metadata/parameters for either the raw or formatted video, and stores this data in the experiment's config file.

Parameters:

Name Type Description Default
raw_fp str

The input video filepath.

required
formatted_fp str

The output video filepath.

required
configs_fp str

The JSON configs filepath.

required

Returns:

Type Description
str

Description of the function's outcome.

Source code in behavysis/processes/format_vid.py
@classmethod
def get_vids_metadata(cls, raw_vid_fp: str, formatted_vid_fp: str, configs_fp: str) -> str:
    """
    Finds the video metadata/parameters for both the raw and formatted videos,
    and stores this data in the experiment's config file.

    Parameters
    ----------
    raw_vid_fp : str
        The input video filepath.
    formatted_vid_fp : str
        The output video filepath.
    configs_fp : str
        The JSON configs filepath.

    Returns
    -------
    str
        Description of the function's outcome.
    """
    logger, io_obj = init_logger_io_obj()
    # Saving video metadata to configs dict
    configs = ExperimentConfigs.read_json(configs_fp)
    configs.auto.raw_vid = get_vid_metadata(raw_vid_fp, logger)
    configs.auto.formatted_vid = get_vid_metadata(formatted_vid_fp, logger)
    logger.info("Video metadata stored in config file.")
    configs.write_json(configs_fp)
    return get_io_obj_content(io_obj)

behavysis.processes.run_dlc.RunDLC

Process for running DeepLabCut (DLC) pose estimation on formatted videos, for single videos or batches.

Source code in behavysis/processes/run_dlc.py
class RunDLC:
    """Process for running DeepLabCut (DLC) pose estimation on formatted videos."""

    @classmethod
    def ma_dlc_run_single(
        cls,
        formatted_vid_fp: str,
        keypoints_fp: str,
        configs_fp: str,
        gputouse: int | None,
        overwrite: bool,
    ) -> str:
        """
        Running custom DLC script to generate a DLC keypoints dataframe from a single video.

        Parameters
        ----------
        formatted_vid_fp : str
            The formatted input video filepath.
        keypoints_fp : str
            The output keypoints dataframe filepath.
        configs_fp : str
            The JSON configs filepath.
        gputouse : int | None
            The GPU ID to use (behaviour for None is decided by `run_dlc_subproc`).
        overwrite : bool
            Whether to overwrite the output file (if it exists).

        Returns
        -------
        str
            The captured log output describing the function's outcome.

        Raises
        ------
        ValueError
            If the DLC model config file does not exist.
        """
        logger, io_obj = init_logger_io_obj()
        # Skipping processing when the output already exists and overwrite is off
        if not overwrite and os.path.exists(keypoints_fp):
            logger.warning(file_exists_msg(keypoints_fp))
            return get_io_obj_content(io_obj)
        # Getting model_fp from the experiment's configs
        configs = ExperimentConfigs.read_json(configs_fp)
        model_fp = configs.get_ref(configs.user.run_dlc.model_fp)
        # Derive more parameters
        # (temp dir is suffixed with the GPU ID so concurrent runs don't clash)
        temp_dlc_dir = os.path.join(CACHE_DIR, f"dlc_{gputouse}")
        keypoints_dir = os.path.dirname(keypoints_fp)
        # Making output directories
        os.makedirs(temp_dlc_dir, exist_ok=True)

        # Validation: the DLC config.yaml file must exist.
        if not os.path.isfile(model_fp):
            raise ValueError(
                f'The given model_fp file does not exist: "{model_fp}".\n'
                + 'Check this file and specify a DLC ".yaml" config file.'
            )

        # Running the DLC subprocess (in a separate conda env)
        run_dlc_subproc(model_fp, [formatted_vid_fp], temp_dlc_dir, CACHE_DIR, gputouse, logger)

        # Exporting the h5 to chosen file format
        export2df(formatted_vid_fp, temp_dlc_dir, keypoints_dir, logger)
        silent_remove(temp_dlc_dir)

        return get_io_obj_content(io_obj)

    @staticmethod
    def ma_dlc_run_batch(
        vid_fp_ls: list[str],
        keypoints_dir: str,
        configs_dir: str,
        gputouse: int | None,
        overwrite: bool,
    ) -> str:
        """
        Running custom DLC script to generate DLC keypoints dataframes for a batch of videos.

        All videos in the batch must reference the same DLC model config file
        in their experiment configs.

        Parameters
        ----------
        vid_fp_ls : list[str]
            The formatted input video filepaths.
        keypoints_dir : str
            The output directory for keypoints dataframes.
        configs_dir : str
            The directory containing each experiment's JSON configs file.
        gputouse : int | None
            The GPU ID to use (behaviour for None is decided by `run_dlc_subproc`).
        overwrite : bool
            Whether to (re)process videos whose output already exists.

        Returns
        -------
        str
            The captured log output describing the function's outcome.

        Raises
        ------
        ValueError
            If the videos reference different model_fp values, or the model
            config file does not exist.
        """
        logger, io_obj = init_logger_io_obj()

        # Making the per-GPU temp output directory
        # (suffixed with the GPU ID so concurrent runs don't clash)
        temp_dlc_dir = os.path.join(CACHE_DIR, f"dlc_{gputouse}")
        os.makedirs(temp_dlc_dir, exist_ok=True)

        # If overwrite is False, filtering for only experiments that need processing
        if not overwrite:
            # Keeping only the vid_fp_ls elements with no existing output in keypoints_dir
            vid_fp_ls = [
                vid_fp
                for vid_fp in vid_fp_ls
                if not os.path.exists(os.path.join(keypoints_dir, f"{get_name(vid_fp)}.{KeypointsDf.IO}"))
            ]

        # If there are no videos to process, return early
        if len(vid_fp_ls) == 0:
            return get_io_obj_content(io_obj)

        # Getting each video's DLC model_fp from its corresponding configs file
        names_ls = [get_name(i) for i in vid_fp_ls]
        configs_fp_ls = [os.path.join(configs_dir, f"{i}.json") for i in names_ls]
        configs_ls = [ExperimentConfigs.read_json(i) for i in configs_fp_ls]
        model_fp_ls = [i.user.run_dlc.model_fp for i in configs_ls]
        # Validation: all videos must use the same model_fp
        # (raising, not asserting, so the check survives `python -O`)
        model_fp_set = set(model_fp_ls)
        if len(model_fp_set) != 1:
            raise ValueError(
                "All videos in a batch must use the same DLC model_fp, "
                f"but found {len(model_fp_set)} different values: {model_fp_set}."
            )
        # Getting the single shared model_fp
        model_fp = model_fp_set.pop()
        # Validation: the DLC config.yaml file must exist.
        if not os.path.isfile(model_fp):
            raise ValueError(
                f'The given model_fp file does not exist: "{model_fp}".\n'
                + 'Check this file and specify a DLC ".yaml" config file.'
            )

        # Running the DLC subprocess (in a separate conda env)
        run_dlc_subproc(model_fp, vid_fp_ls, temp_dlc_dir, CACHE_DIR, gputouse, logger)

        # Exporting the h5 to chosen file format
        for vid_fp in vid_fp_ls:
            export2df(vid_fp, temp_dlc_dir, keypoints_dir, logger)
        silent_remove(temp_dlc_dir)
        return get_io_obj_content(io_obj)

ma_dlc_run_batch(vid_fp_ls, keypoints_dir, configs_dir, gputouse, overwrite) staticmethod

Running custom DLC script to generate DLC keypoints dataframes for a batch of videos. All videos in the batch must use the same DLC model config file.

Source code in behavysis/processes/run_dlc.py
@staticmethod
def ma_dlc_run_batch(
    vid_fp_ls: list[str],
    keypoints_dir: str,
    configs_dir: str,
    gputouse: int | None,
    overwrite: bool,
) -> str:
    """
    Running custom DLC script to generate DLC keypoints dataframes for a batch of videos.

    All videos in the batch must reference the same DLC model config file
    in their experiment configs.
    """
    logger, io_obj = init_logger_io_obj()

    # Specifying the GPU to use and making the output directory
    # Making output directories
    # (temp dir is suffixed with the GPU ID so concurrent runs don't clash)
    temp_dlc_dir = os.path.join(CACHE_DIR, f"dlc_{gputouse}")
    os.makedirs(temp_dlc_dir, exist_ok=True)

    # If overwrite is False, filtering for only experiments that need processing
    if not overwrite:
        # Getting only the vid_fp_ls elements that do not exist in keypoints_dir
        vid_fp_ls = [
            vid_fp
            for vid_fp in vid_fp_ls
            if not os.path.exists(os.path.join(keypoints_dir, f"{get_name(vid_fp)}.{KeypointsDf.IO}"))
        ]

    # If there are no videos to process, return
    if len(vid_fp_ls) == 0:
        return get_io_obj_content(io_obj)

    # Getting the DLC model config path
    # Getting the names of the files that need processing
    dlc_fp_ls = [get_name(i) for i in vid_fp_ls]
    # Getting their corresponding configs_fp
    dlc_fp_ls = [os.path.join(configs_dir, f"{i}.json") for i in dlc_fp_ls]
    # Reading their configs
    dlc_fp_ls = [ExperimentConfigs.read_json(i) for i in dlc_fp_ls]
    # Getting their model_fp
    dlc_fp_ls = [i.user.run_dlc.model_fp for i in dlc_fp_ls]
    # Converting to a set
    dlc_fp_set = set(dlc_fp_ls)
    # Assertion: all model_fp must be the same
    # NOTE(review): assert is stripped under `python -O` — consider raising ValueError
    assert len(dlc_fp_set) == 1
    # Getting the model_fp
    model_fp = dlc_fp_set.pop()
    # Assertion: the config.yaml file must exist.
    # NOTE(review): assert is stripped under `python -O` — consider raising ValueError
    assert os.path.isfile(model_fp), (
        f'The given model_fp file does not exist: "{model_fp}".\n'
        + 'Check this file and specify a DLC ".yaml" config file.'
    )

    # Running the DLC subprocess (in a separate conda env)
    run_dlc_subproc(model_fp, vid_fp_ls, temp_dlc_dir, CACHE_DIR, gputouse, logger)

    # Exporting the h5 to chosen file format
    for vid_fp in vid_fp_ls:
        export2df(vid_fp, temp_dlc_dir, keypoints_dir, logger)
    silent_remove(temp_dlc_dir)
    return get_io_obj_content(io_obj)

ma_dlc_run_single(formatted_vid_fp, keypoints_fp, configs_fp, gputouse, overwrite) classmethod

Running custom DLC script to generate a DLC keypoints dataframe from a single video.

Source code in behavysis/processes/run_dlc.py
@classmethod
def ma_dlc_run_single(
    cls,
    formatted_vid_fp: str,
    keypoints_fp: str,
    configs_fp: str,
    gputouse: int | None,
    overwrite: bool,
) -> str:
    """
    Running custom DLC script to generate a DLC keypoints dataframe from a single video.

    Parameters
    ----------
    formatted_vid_fp : str
        The formatted input video filepath.
    keypoints_fp : str
        The output keypoints dataframe filepath.
    configs_fp : str
        The JSON configs filepath.
    gputouse : int | None
        The GPU ID to use (behaviour for None is decided by `run_dlc_subproc`).
    overwrite : bool
        Whether to overwrite the output file (if it exists).

    Returns
    -------
    str
        Description of the function's outcome.

    Raises
    ------
    ValueError
        If the DLC model config file does not exist.
    """
    logger, io_obj = init_logger_io_obj()
    # Skipping processing when the output already exists and overwrite is off
    if not overwrite and os.path.exists(keypoints_fp):
        logger.warning(file_exists_msg(keypoints_fp))
        return get_io_obj_content(io_obj)
    # Getting model_fp
    configs = ExperimentConfigs.read_json(configs_fp)
    model_fp = configs.get_ref(configs.user.run_dlc.model_fp)
    # Derive more parameters
    # (temp dir is suffixed with the GPU ID so concurrent runs don't clash)
    temp_dlc_dir = os.path.join(CACHE_DIR, f"dlc_{gputouse}")
    keypoints_dir = os.path.dirname(keypoints_fp)
    # Making output directories
    os.makedirs(temp_dlc_dir, exist_ok=True)

    # Validation: the config.yaml file must exist.
    if not os.path.isfile(model_fp):
        raise ValueError(
            f'The given model_fp file does not exist: "{model_fp}".\n'
            + 'Check this file and specify a DLC ".yaml" config file.'
        )

    # Running the DLC subprocess (in a separate conda env)
    run_dlc_subproc(model_fp, [formatted_vid_fp], temp_dlc_dir, CACHE_DIR, gputouse, logger)

    # Exporting the h5 to chosen file format
    export2df(formatted_vid_fp, temp_dlc_dir, keypoints_dir, logger)
    silent_remove(temp_dlc_dir)

    return get_io_obj_content(io_obj)

behavysis.processes.calculate_params.CalculateParams

Source code in behavysis/processes/calculate_params.py
class CalculateParams:
    """
    Processes that calculate experiment parameters (start/stop frames,
    duration, pixels-per-mm) from keypoints data and/or user configs,
    and store the results in the experiment config file's `auto` section.
    """

    @staticmethod
    def start_frame_from_likelihood(
        keypoints_fp: str,
        configs_fp: str,
    ) -> str:
        """
        Determines the starting frame of the experiment based on
        when the subject "likely" entered the frame of view.

        This is done by looking at a sliding window of time. If the median likelihood of the subject
        existing in each frame across the sliding window is greater than the defined pcutoff,
        then determine this as the start time.

        Notes
        -----
        The config file must contain the following parameters:
        ```
        - user
            - calculate_params
                - start_frame
                    - bodyparts: list[str]
                    - window_sec: float
                    - pcutoff: float
        ```
        """
        logger, io_obj = init_logger_io_obj()
        # Only the start frame is used here; the stop frame is discarded
        start_frame, stop_frame = calc_exists_from_likelihood(keypoints_fp, configs_fp, logger)
        # Writing to configs
        configs = ExperimentConfigs.read_json(configs_fp)
        configs.auto.start_frame = start_frame
        configs.write_json(configs_fp)
        return get_io_obj_content(io_obj)

    @staticmethod
    def start_frame_from_csv(keypoints_fp: str, configs_fp: str) -> str:
        """
        Reads the start time of the experiment from a given CSV file
        (filepath specified in config file).

        Expects the value to be in seconds (so will convert to frames).
        Also expects the csv_fp to be a csv file,
        where the first column is the name of the video and the second column
        is the start time.

        Notes
        -----
        The config file must contain the following parameters:
        ```
        - user
            - calculate_params
                - start_frame_from_csv
                    - csv_fp: str
                    - name: None | str
        ```
        """
        logger, io_obj = init_logger_io_obj()
        # Getting necessary config parameters
        configs = ExperimentConfigs.read_json(configs_fp)
        configs_filt = configs.user.calculate_params.start_frame_from_csv
        fps = configs.auto.formatted_vid.fps
        csv_fp = configs.get_ref(configs_filt.csv_fp)
        name = configs.get_ref(configs_filt.name)
        # NOTE: -1 is the sentinel value for "fps not yet calculated"
        assert fps != -1, "fps not yet set. Please calculate fps first with `proj.get_vid_metadata`."
        # Using the name of the video as the name of the experiment if not specified
        if name is None:
            name = get_name(keypoints_fp)
        # Reading csv_fp (first column becomes the index, i.e. the video name)
        start_times_df = pd.read_csv(csv_fp, index_col=0)
        start_times_df.index = start_times_df.index.astype(str)
        assert name in start_times_df.index.values, (
            f"{name} not in {csv_fp}.\n"
            "Update the `name` parameter in the configs file or check the start_frames csv file."
        )
        # Getting start time in seconds
        # NOTE(review): positional `[0]` on the row Series is deprecated in newer
        # pandas — confirm and consider `.iloc[0]`
        start_sec = start_times_df.loc[name][0]
        # Converting to start frame
        start_frame = int(np.round(start_sec * fps, 0))
        # Writing to configs
        configs = ExperimentConfigs.read_json(configs_fp)
        configs.auto.start_frame = start_frame
        configs.write_json(configs_fp)
        return get_io_obj_content(io_obj)

    @staticmethod
    def stop_frame_from_likelihood(keypoints_fp: str, configs_fp: str) -> str:
        """
        Determines the stopping frame of the experiment based on
        when the subject "likely" left the frame of view.

        This is done by looking at a sliding window of time. If the median likelihood of the subject
        existing in each frame across the sliding window is greater than the defined pcutoff,
        then determine this as the stop time.

        """
        logger, io_obj = init_logger_io_obj()
        # Only the stop frame is used here; the start frame is discarded
        start_frame, stop_frame = calc_exists_from_likelihood(keypoints_fp, configs_fp, logger)
        # Writing to configs
        configs = ExperimentConfigs.read_json(configs_fp)
        configs.auto.stop_frame = stop_frame
        configs.write_json(configs_fp)
        return get_io_obj_content(io_obj)

    @staticmethod
    def stop_frame_from_dur(keypoints_fp: str, configs_fp: str) -> str:
        """
        Calculates the end time according to the following equation:

        ```
        stop_frame = start_frame + experiment_duration
        ```

        Notes
        -----
        The config file must contain the following parameters:
        ```
        - user
            - calculate_params
                - stop_frame_from_dur
                    - dur_sec: float
        ```
        """
        logger, io_obj = init_logger_io_obj()
        # Getting necessary config parameters
        configs = ExperimentConfigs.read_json(configs_fp)
        configs_filt = configs.user.calculate_params.stop_frame_from_dur
        dur_sec = configs.get_ref(configs_filt.dur_sec)
        start_frame = configs.auto.start_frame
        fps = configs.auto.formatted_vid.fps
        total_frames = configs.auto.formatted_vid.total_frames
        # NOTE: -1 is the sentinel value for "not yet calculated"
        assert start_frame != -1, "start_frame is None. Please calculate start_frame first."
        assert fps != -1, "fps not yet set. Please calculate fps first with `proj.get_vid_metadata`."
        # Calculating stop_frame
        dur_frames = int(dur_sec * fps)
        stop_frame = start_frame + dur_frames
        # Make a warning if the user-specified dur_sec is larger than the duration of the video.
        if total_frames is None:
            logger.warning("The length of the video itself has not been calculated yet.")
        elif stop_frame > total_frames:
            logger.warning(
                "The user specified dur_sec in the configs file is greater "
                "than the actual length of the video. Please check to see if this video is "
                "too short or if the dur_sec value is incorrect."
            )
        # Writing to config
        configs = ExperimentConfigs.read_json(configs_fp)
        configs.auto.stop_frame = stop_frame
        configs.write_json(configs_fp)
        return get_io_obj_content(io_obj)

    @staticmethod
    def dur_frames_from_likelihood(keypoints_fp: str, configs_fp: str) -> str:
        """
        Calculates the experiment duration in frames, from the frame the specified
        bodyparts appeared to the frame they disappeared.
        Appear/disappear is calculated from likelihood.
        """
        logger, io_obj = init_logger_io_obj()
        start_frame, stop_frame = calc_exists_from_likelihood(keypoints_fp, configs_fp, logger)
        # Writing to configs
        configs = ExperimentConfigs.read_json(configs_fp)
        configs.auto.dur_frames = stop_frame - start_frame
        configs.write_json(configs_fp)
        return get_io_obj_content(io_obj)

    @staticmethod
    def px_per_mm(keypoints_fp: str, configs_fp: str) -> str:
        """
        Calculates the pixels per mm conversion for the video.

        This is done by taking two reference keypoints (`pt_a` and `pt_b`),
        interpolating frames where their likelihood is below `pcutoff`,
        measuring the mean pixel distance between the two points across frames,
        and dividing this by the known real-world distance, `dist_mm`.

        Notes
        -----
        The config file must contain the following parameters:
        ```
        - user
            - calculate_params
                - px_per_mm
                    - pt_a: str
                    - pt_b: str
                    - pcutoff: float
                    - dist_mm: float
        ```
        """
        logger, io_obj = init_logger_io_obj()
        # Getting necessary config parameters
        configs = ExperimentConfigs.read_json(configs_fp)
        configs_filt = configs.user.calculate_params.px_per_mm
        pt_a = configs.get_ref(configs_filt.pt_a)
        pt_b = configs.get_ref(configs_filt.pt_b)
        pcutoff = configs.get_ref(configs_filt.pcutoff)
        dist_mm = configs.get_ref(configs_filt.dist_mm)
        # Loading dataframe
        keypoints_df = KeypointsDf.clean_headings(KeypointsDf.read(keypoints_fp))
        # Imputing missing values with 0 (only really relevant for `likelihood` columns)
        keypoints_df = keypoints_df.fillna(0)
        # Checking that the two reference points are valid
        KeypointsDf.check_bpts_exist(keypoints_df, [pt_a, pt_b])
        # Getting calibration points (x, y, likelihood) values
        pt_a_df = keypoints_df[IndivCols.SINGLE.value, pt_a]
        pt_b_df = keypoints_df[IndivCols.SINGLE.value, pt_b]
        # NOTE(review): assert is stripped under `python -O` — consider raising instead
        for pt_df, pt in ([pt_a_df, pt_a], [pt_b_df, pt_b]):
            assert np.any(pt_df[CoordsCols.LIKELIHOOD.value] > pcutoff), (
                f'No points for "{pt}" are above the pcutoff of {pcutoff}.\n'
                "Consider lowering the pcutoff in the configs file.\n"
                f'The highest likelihood value in "{pt}" is {np.nanmax(pt_df[CoordsCols.LIKELIHOOD.value])}.'
            )
        # Interpolating points which are below a likelihood threshold (linear)
        # NOTE(review): these `.loc` writes target frames sliced from keypoints_df —
        # may trigger pandas SettingWithCopyWarning; confirm intent
        pt_a_df.loc[pt_a_df[CoordsCols.LIKELIHOOD.value] < pcutoff] = np.nan
        pt_a_df = pt_a_df.interpolate(method="linear", axis=0).bfill().ffill()
        pt_b_df.loc[pt_b_df[CoordsCols.LIKELIHOOD.value] < pcutoff] = np.nan
        pt_b_df = pt_b_df.interpolate(method="linear", axis=0).bfill().ffill()
        # Getting mean distance between calibration points across frames
        # TODO: use variable names for x and y
        dist_px = np.nanmean(np.sqrt(np.square(pt_a_df["x"] - pt_b_df["x"]) + np.square(pt_a_df["y"] - pt_b_df["y"])))
        # Finding pixels per mm conversion, using the known real-world distance as calibration
        px_per_mm = dist_px / dist_mm
        # Saving to configs file
        configs = ExperimentConfigs.read_json(configs_fp)
        configs.auto.px_per_mm = px_per_mm
        configs.write_json(configs_fp)
        return get_io_obj_content(io_obj)

dur_frames_from_likelihood(keypoints_fp, configs_fp) staticmethod

Calculates the experiment duration in frames, from the frame the specified bodyparts appeared to the frame they disappeared. Appear/disappear is calculated from likelihood.

Source code in behavysis/processes/calculate_params.py
@staticmethod
def dur_frames_from_likelihood(keypoints_fp: str, configs_fp: str) -> str:
    """
    Calculates the experiment duration in frames, from the frame the specified
    bodyparts appeared to the frame they disappeared.
    Appear/disappear is calculated from likelihood.
    """
    logger, io_obj = init_logger_io_obj()
    start_frame, stop_frame = calc_exists_from_likelihood(keypoints_fp, configs_fp, logger)
    # Writing the frame count to the auto configs
    configs = ExperimentConfigs.read_json(configs_fp)
    configs.auto.dur_frames = stop_frame - start_frame
    configs.write_json(configs_fp)
    return get_io_obj_content(io_obj)

px_per_mm(keypoints_fp, configs_fp) staticmethod

Calculates the pixels per mm conversion for the video.

This is done by taking two reference keypoints (pt_a and pt_b), interpolating frames where their likelihood is below pcutoff, measuring the mean pixel distance between the two points across frames, and dividing this pixel distance by the known real-world distance in mm (dist_mm, from the configs file).

Notes

The config file must contain the following parameters:

- user
    - calculate_params
        - px_per_mm
            - pt_a: str
            - pt_b: str
            - pcutoff: float
            - dist_mm: float

Source code in behavysis/processes/calculate_params.py
@staticmethod
def px_per_mm(keypoints_fp: str, configs_fp: str) -> str:
    """
    Calculates the pixels per mm conversion for the video.

    This is done by taking two reference keypoints (`pt_a` and `pt_b`) on the
    "single" (non-animal) individual, linearly interpolating the frames where
    their likelihood is below `pcutoff`, computing the mean pixel distance
    between the two points across all frames, and dividing this pixel distance
    by `dist_mm`, the known real-world distance (in mm) between the points.

    The result is stored in the configs file as `auto.px_per_mm`.

    Notes
    -----
    The config file must contain the following parameters:
    ```
    - user
        - calculate_params
            - px_per_mm
                - pt_a: str
                - pt_b: str
                - pcutoff: float
                - dist_mm: float
    ```
    """
    logger, io_obj = init_logger_io_obj()
    # Getting necessary config parameters
    configs = ExperimentConfigs.read_json(configs_fp)
    configs_filt = configs.user.calculate_params.px_per_mm
    pt_a = configs.get_ref(configs_filt.pt_a)
    pt_b = configs.get_ref(configs_filt.pt_b)
    pcutoff = configs.get_ref(configs_filt.pcutoff)
    dist_mm = configs.get_ref(configs_filt.dist_mm)
    # Loading dataframe
    keypoints_df = KeypointsDf.clean_headings(KeypointsDf.read(keypoints_fp))
    # Imputing missing values with 0 (only really relevant for `likelihood` columns)
    keypoints_df = keypoints_df.fillna(0)
    # Checking that the two reference points are valid
    KeypointsDf.check_bpts_exist(keypoints_df, [pt_a, pt_b])
    # Getting calibration points (x, y, likelihood) values.
    # `.copy()` detaches the slices from `keypoints_df` so the masking below is
    # applied to an owned frame (avoids chained-assignment / copy-on-write loss).
    pt_a_df = keypoints_df[IndivCols.SINGLE.value, pt_a].copy()
    pt_b_df = keypoints_df[IndivCols.SINGLE.value, pt_b].copy()
    for pt_df, pt in ([pt_a_df, pt_a], [pt_b_df, pt_b]):
        assert np.any(pt_df[CoordsCols.LIKELIHOOD.value] > pcutoff), (
            f'No points for "{pt}" are above the pcutoff of {pcutoff}.\n'
            "Consider lowering the pcutoff in the configs file.\n"
            f'The highest likelihood value in "{pt}" is {np.nanmax(pt_df[CoordsCols.LIKELIHOOD.value])}.'
        )
    # Interpolating points which are below a likelihood threshold (linear)
    pt_a_df.loc[pt_a_df[CoordsCols.LIKELIHOOD.value] < pcutoff] = np.nan
    pt_a_df = pt_a_df.interpolate(method="linear", axis=0).bfill().ffill()
    pt_b_df.loc[pt_b_df[CoordsCols.LIKELIHOOD.value] < pcutoff] = np.nan
    pt_b_df = pt_b_df.interpolate(method="linear", axis=0).bfill().ffill()
    # Getting mean distance (in px) between the two calibration points across frames
    x = CoordsCols.X.value
    y = CoordsCols.Y.value
    dist_px = np.nanmean(np.sqrt(np.square(pt_a_df[x] - pt_b_df[x]) + np.square(pt_a_df[y] - pt_b_df[y])))
    # Finding pixels per mm conversion, using the known real-world distance as calibration
    px_per_mm = dist_px / dist_mm
    # Saving to configs file
    configs = ExperimentConfigs.read_json(configs_fp)
    configs.auto.px_per_mm = px_per_mm
    configs.write_json(configs_fp)
    return get_io_obj_content(io_obj)

start_frame_from_csv(keypoints_fp, configs_fp) staticmethod

Reads the start time of the experiment from a given CSV file (filepath specified in config file).

Expects value to be in seconds (so will convert to frames). Also expects the csv_fp to be a csv file, where the first column is the name of the video and the second column is the start time.

Notes

The config file must contain the following parameters:

- user
    - calculate_params
        - start_frame_from_csv
            - csv_fp: str
            - name: None | str

Source code in behavysis/processes/calculate_params.py
@staticmethod
def start_frame_from_csv(keypoints_fp: str, configs_fp: str) -> str:
    """
    Reads the start time of the experiment from a given CSV file
    (filepath specified in config file).

    Expects value to be in seconds (so will convert to frames).
    Also expects the csv_fp to be a csv file,
    where the first column is the name of the video and the second column
    is the start time.

    Notes
    -----
    The config file must contain the following parameters:
    ```
    - user
        - calculate_params
            - start_frame_from_csv
                - csv_fp: str
                - name: None | str
    ```
    """
    logger, io_obj = init_logger_io_obj()
    # Getting necessary config parameters
    configs = ExperimentConfigs.read_json(configs_fp)
    configs_filt = configs.user.calculate_params.start_frame_from_csv
    fps = configs.auto.formatted_vid.fps
    csv_fp = configs.get_ref(configs_filt.csv_fp)
    name = configs.get_ref(configs_filt.name)
    assert fps != -1, "fps not yet set. Please calculate fps first with `proj.get_vid_metadata`."
    # Using the name of the video as the name of the experiment if not specified
    if name is None:
        name = get_name(keypoints_fp)
    # Reading csv_fp (first column is the video name, used as the index)
    start_times_df = pd.read_csv(csv_fp, index_col=0)
    start_times_df.index = start_times_df.index.astype(str)
    assert name in start_times_df.index.values, (
        f"{name} not in {csv_fp}.\n"
        "Update the `name` parameter in the configs file or check the start_frames csv file."
    )
    # Getting start time in seconds.
    # `.iloc[0]` takes the first value positionally (plain `[0]` on a Series is
    # deprecated positional indexing in pandas 2.x and ambiguous with label lookup).
    start_sec = start_times_df.loc[name].iloc[0]
    # Converting to start frame
    start_frame = int(np.round(start_sec * fps, 0))
    # Writing to configs
    configs = ExperimentConfigs.read_json(configs_fp)
    configs.auto.start_frame = start_frame
    configs.write_json(configs_fp)
    return get_io_obj_content(io_obj)

start_frame_from_likelihood(keypoints_fp, configs_fp) staticmethod

Determines the starting frame of the experiment based on when the subject "likely" entered the frame of view.

This is done by looking at a sliding window of time. If the median likelihood of the subject existing in each frame across the sliding window is greater than the defined pcutoff, then this is determined as the start time.

Notes

The config file must contain the following parameters:

- user
    - calculate_params
        - start_frame
            - bodyparts: list[str]
            - window_sec: float
            - pcutoff: float

Source code in behavysis/processes/calculate_params.py
@staticmethod
def start_frame_from_likelihood(
    keypoints_fp: str,
    configs_fp: str,
) -> str:
    """
    Determines the starting frame of the experiment based on
    when the subject "likely" entered the frame of view.

    A sliding window of time is passed over the keypoint likelihoods; the first
    frame at which the windowed median likelihood of the subject exceeds the
    configured pcutoff is taken as the start frame.

    Notes
    -----
    The config file must contain the following parameters:
    ```
    - user
        - calculate_params
            - start_frame
                - bodyparts: list[str]
                - window_sec: float
                - pcutoff: float
    ```
    """
    logger, io_obj = init_logger_io_obj()
    first_frame, _last_frame = calc_exists_from_likelihood(keypoints_fp, configs_fp, logger)
    # Persisting the detected start frame to the experiment's configs file
    configs = ExperimentConfigs.read_json(configs_fp)
    configs.auto.start_frame = first_frame
    configs.write_json(configs_fp)
    return get_io_obj_content(io_obj)

stop_frame_from_dur(keypoints_fp, configs_fp) staticmethod

Calculates the end time according to the following equation:

stop_frame = start_frame + experiment_duration
Notes

The config file must contain the following parameters:

- user
    - calculate_params
        - stop_frame_from_dur
            - dur_sec: float

Source code in behavysis/processes/calculate_params.py
@staticmethod
def stop_frame_from_dur(keypoints_fp: str, configs_fp: str) -> str:
    """
    Calculates the end time according to the following equation:

    ```
    stop_frame = start_frame + experiment_duration
    ```

    Notes
    -----
    The config file must contain the following parameters:
    ```
    - user
        - calculate_params
            - stop_frame_from_dur
                - dur_sec: float
    ```
    """
    logger, io_obj = init_logger_io_obj()
    # Reading experiment configs and pulling out the required parameters
    configs = ExperimentConfigs.read_json(configs_fp)
    dur_sec = configs.get_ref(configs.user.calculate_params.stop_frame_from_dur.dur_sec)
    start_frame = configs.auto.start_frame
    fps = configs.auto.formatted_vid.fps
    total_frames = configs.auto.formatted_vid.total_frames
    assert start_frame != -1, "start_frame is None. Please calculate start_frame first."
    assert fps != -1, "fps not yet set. Please calculate fps first with `proj.get_vid_metadata`."
    # stop frame = start frame plus the user-specified duration converted to frames
    stop_frame = start_frame + int(dur_sec * fps)
    # Warn if the user-specified dur_sec would run past the end of the video
    if total_frames is None:
        logger.warning("The length of the video itself has not been calculated yet.")
    elif stop_frame > total_frames:
        logger.warning(
            "The user specified dur_sec in the configs file is greater "
            "than the actual length of the video. Please check to see if this video is "
            "too short or if the dur_sec value is incorrect."
        )
    # Writing the computed stop frame back to the configs file
    configs = ExperimentConfigs.read_json(configs_fp)
    configs.auto.stop_frame = stop_frame
    configs.write_json(configs_fp)
    return get_io_obj_content(io_obj)

stop_frame_from_likelihood(keypoints_fp, configs_fp) staticmethod

Determines the stopping frame of the experiment based on when the subject "likely" left the frame of view.

This is done by looking at a sliding window of time. The last frame at which the median likelihood of the subject existing across the sliding window is greater than the defined pcutoff is determined as the stop time.

Source code in behavysis/processes/calculate_params.py
@staticmethod
def stop_frame_from_likelihood(keypoints_fp: str, configs_fp: str) -> str:
    """
    Determines the stopping frame of the experiment based on
    when the subject "likely" left the frame of view.

    Appear/disappear is calculated from keypoint likelihoods
    (see `calc_exists_from_likelihood`); only the disappearance frame is
    used here and written to the configs file as `auto.stop_frame`.
    """
    logger, io_obj = init_logger_io_obj()
    # calc_exists_from_likelihood returns (start, stop); only stop is used here
    start_frame, stop_frame = calc_exists_from_likelihood(keypoints_fp, configs_fp, logger)
    # Writing to configs
    configs = ExperimentConfigs.read_json(configs_fp)
    configs.auto.stop_frame = stop_frame
    configs.write_json(configs_fp)
    return get_io_obj_content(io_obj)

behavysis.processes.preprocess.Preprocess

summary

Source code in behavysis/processes/preprocess.py
class Preprocess:
    """
    Preprocessing functions for DLC keypoints dataframes:
    trimming to experiment start/stop frames, interpolating low-likelihood
    points (optionally pinning stationary points), and refining individual
    identities for maDLC tracking.
    """

    @classmethod
    def start_stop_trim(cls, src_fp: str, dst_fp: str, configs_fp: str, overwrite: bool) -> str:
        """
        Filters the rows of a DLC formatted dataframe to include only rows within the start
        and end time of the experiment, given a corresponding configs dict.

        Parameters
        ----------
        src_fp : str
            The file path of the input DLC formatted dataframe.
        dst_fp : str
            The file path of the output trimmed dataframe.
        configs_fp : str
            The file path of the configs dict.
        overwrite : bool
            If True, overwrite the output file if it already exists. If False, skip processing
            if the output file already exists.

        Returns
        -------
        str
            An outcome message indicating the result of the trimming process.

        Notes
        -----
        The config file must contain the following parameters:
        ```
        - user
            - preprocess
                - start_stop_trim
                    - start_frame: int
                    - stop_frame: int
        ```
        """
        logger, io_obj = init_logger_io_obj()
        if not overwrite and os.path.exists(dst_fp):
            logger.warning(file_exists_msg(dst_fp))
            return get_io_obj_content(io_obj)
        # Getting necessary config parameters
        configs = ExperimentConfigs.read_json(configs_fp)
        start_frame = configs.auto.start_frame
        stop_frame = configs.auto.stop_frame
        # Reading file
        keypoints_df = KeypointsDf.read(src_fp)
        # Trimming dataframe between start and stop frames (label-based, inclusive)
        keypoints_df = keypoints_df.loc[start_frame:stop_frame, :]
        KeypointsDf.write(keypoints_df, dst_fp)
        return get_io_obj_content(io_obj)

    @classmethod
    def interpolate_stationary(cls, src_fp: str, dst_fp: str, configs_fp: str, overwrite: bool) -> str:
        """
        If the point detection (above a certain threshold) is below a certain proportion, then the x and y coordinates are set to the given values (usually corners).
        Otherwise, does nothing (encouraged to run Preprocess.interpolate afterwards).

        Notes
        -----
        The config file must contain the following parameters:
        ```
        - user
            - preprocess
                - interpolate_stationary
                    [
                        - bodypart: str (assumed to be the "single" individual)
                        - pcutoff: float (between 0 and 1)
                        - pcutoff_all: float (between 0 and 1)
                        - x: float (between 0 and 1 - proportion of the video width)
                        - y: float (between 0 and 1 - proportion of the video height)
                    ]
        ```
        """
        logger, io_obj = init_logger_io_obj()
        if not overwrite and os.path.exists(dst_fp):
            logger.warning(file_exists_msg(dst_fp))
            return get_io_obj_content(io_obj)
        # Getting necessary config parameters list
        configs = ExperimentConfigs.read_json(configs_fp)
        configs_filt_ls = configs.user.preprocess.interpolate_stationary
        # Video dimensions are needed to convert the configured proportions to pixels
        width_px = configs.auto.formatted_vid.width_px
        height_px = configs.auto.formatted_vid.height_px
        if width_px is None or height_px is None:
            raise ValueError(
                "Width and height must be provided in the formatted video. Try running FormatVid.format_vid."
            )
        # Reading file
        keypoints_df = KeypointsDf.read(src_fp)
        # Getting the scorer name
        scorer = keypoints_df.columns.unique(KeypointsDf.CN.SCORER.value)[0]
        # For each bodypart, filling in the given point
        for configs_filt in configs_filt_ls:
            # Getting config parameters
            bodypart = configs_filt.bodypart
            pcutoff = configs_filt.pcutoff
            pcutoff_all = configs_filt.pcutoff_all
            x = configs_filt.x
            y = configs_filt.y
            # Converting x and y from video proportions to pixel coordinates
            x = x * width_px
            y = y * height_px
            # Getting "is_detected" for each frame for the bodypart
            is_detected = keypoints_df[(scorer, "single", bodypart, CoordsCols.LIKELIHOOD.value)] >= pcutoff
            # If the bodypart is detected in less than the given proportion of the video, then set the x and y coordinates to the given values
            if is_detected.mean() < pcutoff_all:
                keypoints_df[(scorer, "single", bodypart, CoordsCols.X.value)] = x
                keypoints_df[(scorer, "single", bodypart, CoordsCols.Y.value)] = y
                keypoints_df[(scorer, "single", bodypart, CoordsCols.LIKELIHOOD.value)] = pcutoff
                logger.info(
                    f"{bodypart} is detected in less than {pcutoff_all} of the video."
                    f" Setting x and y coordinates to ({x}, {y})."
                )
            else:
                logger.info(
                    f"{bodypart} is detected in more than {pcutoff_all} of the video."
                    " No need for stationary interpolation."
                )
        # Saving
        KeypointsDf.write(keypoints_df, dst_fp)
        return get_io_obj_content(io_obj)

    @classmethod
    def interpolate(cls, src_fp: str, dst_fp: str, configs_fp: str, overwrite: bool) -> str:
        """
        "Smooths" out noticeable jitter of points, where the likelihood (and accuracy) of
        a point's coordinates are low (e.g., when the subject's head goes out of view). It
        does this by linearly interpolating the frames of a body part that are below a given
        likelihood pcutoff.

        Notes
        -----
        The config file must contain the following parameters:
        ```
        - user
            - preprocess
                - interpolate
                    - pcutoff: float
        ```
        """
        logger, io_obj = init_logger_io_obj()
        if not overwrite and os.path.exists(dst_fp):
            logger.warning(file_exists_msg(dst_fp))
            return get_io_obj_content(io_obj)
        # Getting necessary config parameters
        configs = ExperimentConfigs.read_json(configs_fp)
        configs_filt = configs.user.preprocess.interpolate
        # Reading file
        keypoints_df = KeypointsDf.read(src_fp)
        # Getting the unique groups of (individual, bodypart) groups.
        unique_cols = keypoints_df.columns.droplevel(["coords"]).unique()
        # Setting low-likelihood points to Nan to later interpolate
        for scorer, indiv, bp in unique_cols:
            lhood_col = (scorer, indiv, bp, CoordsCols.LIKELIHOOD.value)
            # Imputing Nan likelihood points with 0.
            # BUGFIX: assign the result back instead of `fillna(inplace=True)` on a
            # column selection — that chained assignment is silently lost under
            # pandas copy-on-write.
            keypoints_df[lhood_col] = keypoints_df[lhood_col].fillna(value=0)
            # Setting x and y coordinates of points that have low likelihood to Nan
            to_remove = keypoints_df[lhood_col] < configs_filt.pcutoff
            keypoints_df.loc[to_remove, (scorer, indiv, bp, CoordsCols.X.value)] = np.nan
            keypoints_df.loc[to_remove, (scorer, indiv, bp, CoordsCols.Y.value)] = np.nan
        # linearly interpolating Nan x and y points.
        # Also backfilling points at the start.
        # Also forward filling points at the end.
        # Also imputing nan points with 0 (if the ENTIRE column is nan, then it's imputed)
        keypoints_df = keypoints_df.interpolate(method="linear").bfill().ffill()
        # if df.isnull().values.any() then the entire column is nan (log warning)
        # keypoints_df = keypoints_df.fillna(0)
        KeypointsDf.write(keypoints_df, dst_fp)
        return get_io_obj_content(io_obj)

    @classmethod
    def refine_ids(cls, src_fp: str, dst_fp: str, configs_fp: str, overwrite: bool) -> str:
        """
        Ensures that the identity is correctly tracked for maDLC.
        Assumes interpolate_points has already been run.

        Notes
        -----
        The config file must contain the following parameters:
        ```
        - user
            - preprocess
                - refine_ids
                    - marked: str
                    - unmarked: str
                    - marking: str
                    - window_sec: float
                    - metric: ["current", "rolling", "binned"]
        ```
        """
        logger, io_obj = init_logger_io_obj()
        if not overwrite and os.path.exists(dst_fp):
            logger.warning(file_exists_msg(dst_fp))
            return get_io_obj_content(io_obj)
        # Reading file
        keypoints_df = KeypointsDf.read(src_fp)
        # Getting necessary config parameters
        configs = ExperimentConfigs.read_json(configs_fp)
        configs_filt = configs.user.preprocess.refine_ids
        marked = configs.get_ref(configs_filt.marked)
        unmarked = configs.get_ref(configs_filt.unmarked)
        marking = configs.get_ref(configs_filt.marking)
        window_sec = configs.get_ref(configs_filt.window_sec)
        bpts = configs.get_ref(configs_filt.bodyparts)
        metric = configs.get_ref(configs_filt.metric)
        fps = configs.auto.formatted_vid.fps
        # Calculating more parameters
        window_frames = int(np.round(fps * window_sec, 0))
        # Error checking for invalid/non-existent column names marked, unmarked, and marking.
        # BUGFIX: the message now names the offending config setting (previously it
        # always said "marking", even when marked/unmarked was invalid).
        for setting, column, level in [
            ("marked", marked, "individuals"),
            ("unmarked", unmarked, "individuals"),
            ("marking", marking, "bodyparts"),
        ]:
            if column not in keypoints_df.columns.unique(level):
                raise ValueError(
                    f'The {setting} value in the config file, "{column}", is not a column name in the DLC file.'
                )
        # Checking that bodyparts are all valid
        KeypointsDf.check_bpts_exist(keypoints_df, bpts)
        # Calculating the distances between the averaged bodycentres and the marking
        mark_dists_df = get_mark_dists_df(keypoints_df, marked, unmarked, [marking], bpts, logger)
        # Getting "to_switch" decision series for each frame
        switch_df = get_id_switch_df(mark_dists_df, window_frames, marked, unmarked, logger)
        # Updating df with the switched values
        switched_keypoints_df = switch_identities(keypoints_df, switch_df[metric], marked, unmarked, logger)
        KeypointsDf.write(switched_keypoints_df, dst_fp)
        return get_io_obj_content(io_obj)

interpolate(src_fp, dst_fp, configs_fp, overwrite) classmethod

"Smooths" out noticeable jitter of points, where the likelihood (and accuracy) of a point's coordinates are low (e.g., when the subject's head goes out of view). It does this by linearly interpolating the frames of a body part that are below a given likelihood pcutoff.

Notes

The config file must contain the following parameters:

- user
    - preprocess
        - interpolate
            - pcutoff: float

Source code in behavysis/processes/preprocess.py
@classmethod
def interpolate(cls, src_fp: str, dst_fp: str, configs_fp: str, overwrite: bool) -> str:
    """
    "Smooths" out noticeable jitter of points, where the likelihood (and accuracy) of
    a point's coordinates are low (e.g., when the subject's head goes out of view). It
    does this by linearly interpolating the frames of a body part that are below a given
    likelihood pcutoff.

    Notes
    -----
    The config file must contain the following parameters:
    ```
    - user
        - preprocess
            - interpolate
                - pcutoff: float
    ```
    """
    logger, io_obj = init_logger_io_obj()
    if not overwrite and os.path.exists(dst_fp):
        logger.warning(file_exists_msg(dst_fp))
        return get_io_obj_content(io_obj)
    # Getting necessary config parameters
    configs = ExperimentConfigs.read_json(configs_fp)
    configs_filt = configs.user.preprocess.interpolate
    # Reading file
    keypoints_df = KeypointsDf.read(src_fp)
    # Getting the unique groups of (individual, bodypart) groups.
    unique_cols = keypoints_df.columns.droplevel(["coords"]).unique()
    # Setting low-likelihood points to Nan to later interpolate
    for scorer, indiv, bp in unique_cols:
        lhood_col = (scorer, indiv, bp, CoordsCols.LIKELIHOOD.value)
        # Imputing Nan likelihood points with 0.
        # BUGFIX: assign the result back instead of `fillna(inplace=True)` on a
        # column selection — that chained assignment is silently lost under
        # pandas copy-on-write.
        keypoints_df[lhood_col] = keypoints_df[lhood_col].fillna(value=0)
        # Setting x and y coordinates of points that have low likelihood to Nan
        to_remove = keypoints_df[lhood_col] < configs_filt.pcutoff
        keypoints_df.loc[to_remove, (scorer, indiv, bp, CoordsCols.X.value)] = np.nan
        keypoints_df.loc[to_remove, (scorer, indiv, bp, CoordsCols.Y.value)] = np.nan
    # linearly interpolating Nan x and y points.
    # Also backfilling points at the start.
    # Also forward filling points at the end.
    # Also imputing nan points with 0 (if the ENTIRE column is nan, then it's imputed)
    keypoints_df = keypoints_df.interpolate(method="linear").bfill().ffill()
    # if df.isnull().values.any() then the entire column is nan (log warning)
    # keypoints_df = keypoints_df.fillna(0)
    KeypointsDf.write(keypoints_df, dst_fp)
    return get_io_obj_content(io_obj)

interpolate_stationary(src_fp, dst_fp, configs_fp, overwrite) classmethod

If the point detection (above a certain threshold) is below a certain proportion, then the x and y coordinates are set to the given values (usually corners). Otherwise, does nothing (encouraged to run Preprocess.interpolate afterwards).

Notes

The config file must contain the following parameters:

- user
    - preprocess
        - interpolate_stationary
            [
                - bodypart: str (assumed to be the "single" individual)
                - pcutoff: float (between 0 and 1)
                - pcutoff_all: float (between 0 and 1)
                - x: float (between 0 and 1 - proportion of the video width)
                - y: float (between 0 and 1 - proportion of the video height)
            ]

Source code in behavysis/processes/preprocess.py
@classmethod
def interpolate_stationary(cls, src_fp: str, dst_fp: str, configs_fp: str, overwrite: bool) -> str:
    """
    If the point detection (above a certain threshold) is below a certain proportion,
    then the x and y coordinates are set to the given values (usually corners).
    Otherwise, does nothing (encouraged to run Preprocess.interpolate afterwards).

    Notes
    -----
    The config file must contain the following parameters:
    ```
    - user
        - preprocess
            - interpolate_stationary
                [
                    - bodypart: str (assumed to be the "single" individual)
                    - pcutoff: float (between 0 and 1)
                    - pcutoff_all: float (between 0 and 1)
                    - x: float (between 0 and 1 - proportion of the video width)
                    - y: float (between 0 and 1 - proportion of the video height)
                ]
    ```
    """
    logger, io_obj = init_logger_io_obj()
    if not overwrite and os.path.exists(dst_fp):
        logger.warning(file_exists_msg(dst_fp))
        return get_io_obj_content(io_obj)
    # Loading the list of per-bodypart stationary-interpolation settings
    configs = ExperimentConfigs.read_json(configs_fp)
    configs_filt_ls = configs.user.preprocess.interpolate_stationary
    # Video dimensions are required to convert proportions to pixel coordinates
    width_px = configs.auto.formatted_vid.width_px
    height_px = configs.auto.formatted_vid.height_px
    if width_px is None or height_px is None:
        raise ValueError(
            "Width and height must be provided in the formatted video. Try running FormatVid.format_vid."
        )
    # Reading the keypoints dataframe and taking the scorer name from its header
    keypoints_df = KeypointsDf.read(src_fp)
    scorer = keypoints_df.columns.unique(KeypointsDf.CN.SCORER.value)[0]
    # Handling each configured bodypart in turn
    for configs_filt in configs_filt_ls:
        bodypart = configs_filt.bodypart
        pcutoff = configs_filt.pcutoff
        pcutoff_all = configs_filt.pcutoff_all
        # Scaling the configured (proportional) coordinates up to pixels
        x = configs_filt.x * width_px
        y = configs_filt.y * height_px
        # Column keys for this bodypart (always under the "single" individual)
        x_col = (scorer, "single", bodypart, CoordsCols.X.value)
        y_col = (scorer, "single", bodypart, CoordsCols.Y.value)
        lhood_col = (scorer, "single", bodypart, CoordsCols.LIKELIHOOD.value)
        # Per-frame flag of whether the bodypart is confidently detected
        is_detected = keypoints_df[lhood_col] >= pcutoff
        if is_detected.mean() >= pcutoff_all:
            # Detected often enough: leave the data untouched
            logger.info(
                f"{bodypart} is detected in more than {pcutoff_all} of the video."
                " No need for stationary interpolation."
            )
            continue
        # Detection proportion too low: pin the bodypart to the configured point
        keypoints_df[x_col] = x
        keypoints_df[y_col] = y
        keypoints_df[lhood_col] = pcutoff
        logger.info(
            f"{bodypart} is detected in less than {pcutoff_all} of the video."
            f" Setting x and y coordinates to ({x}, {y})."
        )
    # Saving
    KeypointsDf.write(keypoints_df, dst_fp)
    return get_io_obj_content(io_obj)

refine_ids(src_fp, dst_fp, configs_fp, overwrite) classmethod

Ensures that the identity is correctly tracked for maDLC. Assumes interpolate_points has already been run.

Notes

The config file must contain the following parameters:

- user
    - preprocess
        - refine_ids
            - marked: str
            - unmarked: str
            - marking: str
            - window_sec: float
            - metric: ["current", "rolling", "binned"]

Source code in behavysis/processes/preprocess.py
@classmethod
def refine_ids(cls, src_fp: str, dst_fp: str, configs_fp: str, overwrite: bool) -> str:
    """
    Ensures that the identity is correctly tracked for maDLC.
    Assumes interpolate_points has already been run.

    Notes
    -----
    The config file must contain the following parameters:
    ```
    - user
        - preprocess
            - refine_ids
                - marked: str
                - unmarked: str
                - marking: str
                - window_sec: float
                - metric: ["current", "rolling", "binned"]
    ```
    """
    logger, io_obj = init_logger_io_obj()
    if not overwrite and os.path.exists(dst_fp):
        logger.warning(file_exists_msg(dst_fp))
        return get_io_obj_content(io_obj)
    # Reading file
    keypoints_df = KeypointsDf.read(src_fp)
    # Getting necessary config parameters
    configs = ExperimentConfigs.read_json(configs_fp)
    configs_filt = configs.user.preprocess.refine_ids
    marked = configs.get_ref(configs_filt.marked)
    unmarked = configs.get_ref(configs_filt.unmarked)
    marking = configs.get_ref(configs_filt.marking)
    window_sec = configs.get_ref(configs_filt.window_sec)
    bpts = configs.get_ref(configs_filt.bodyparts)
    metric = configs.get_ref(configs_filt.metric)
    fps = configs.auto.formatted_vid.fps
    # Calculating more parameters
    window_frames = int(np.round(fps * window_sec, 0))
    # Error checking for invalid/non-existent column names marked, unmarked, and marking.
    # BUGFIX: the message now names the offending config setting (previously it
    # always said "marking", even when marked/unmarked was invalid).
    for setting, column, level in [
        ("marked", marked, "individuals"),
        ("unmarked", unmarked, "individuals"),
        ("marking", marking, "bodyparts"),
    ]:
        if column not in keypoints_df.columns.unique(level):
            raise ValueError(
                f'The {setting} value in the config file, "{column}", is not a column name in the DLC file.'
            )
    # Checking that bodyparts are all valid
    KeypointsDf.check_bpts_exist(keypoints_df, bpts)
    # Calculating the distances between the averaged bodycentres and the marking
    mark_dists_df = get_mark_dists_df(keypoints_df, marked, unmarked, [marking], bpts, logger)
    # Getting "to_switch" decision series for each frame
    switch_df = get_id_switch_df(mark_dists_df, window_frames, marked, unmarked, logger)
    # Updating df with the switched values
    switched_keypoints_df = switch_identities(keypoints_df, switch_df[metric], marked, unmarked, logger)
    KeypointsDf.write(switched_keypoints_df, dst_fp)
    return get_io_obj_content(io_obj)

start_stop_trim(src_fp, dst_fp, configs_fp, overwrite) classmethod

Filters the rows of a DLC formatted dataframe to include only rows within the start and end time of the experiment, given a corresponding configs dict.

Parameters:

Name Type Description Default
src_fp str

The file path of the input DLC formatted dataframe.

required
dst_fp str

The file path of the output trimmed dataframe.

required
configs_fp str

The file path of the configs dict.

required
overwrite bool

If True, overwrite the output file if it already exists. If False, skip processing if the output file already exists.

required

Returns:

Type Description
str

An outcome message indicating the result of the trimming process.

Notes

The config file must contain the following parameters:

- user
    - preprocess
        - start_stop_trim
            - start_frame: int
            - stop_frame: int

Source code in behavysis/processes/preprocess.py
@classmethod
def start_stop_trim(cls, src_fp: str, dst_fp: str, configs_fp: str, overwrite: bool) -> str:
    """
    Filters the rows of a DLC formatted dataframe to include only rows within the start
    and end time of the experiment, given a corresponding configs dict.

    Parameters
    ----------
    src_fp : str
        The file path of the input DLC formatted dataframe.
    dst_fp : str
        The file path of the output trimmed dataframe.
    configs_fp : str
        The file path of the configs dict.
    overwrite : bool
        If True, overwrite the output file if it already exists. If False, skip processing
        if the output file already exists.

    Returns
    -------
    str
        An outcome message indicating the result of the trimming process.

    Notes
    -----
    The start and stop frames are read from the `auto` section of the config file:
    ```
    - auto
        - start_frame: int
        - stop_frame: int
    ```
    """
    logger, io_obj = init_logger_io_obj()
    # Skipping (with a warning) if the output already exists and overwrite is disabled
    if not overwrite and os.path.exists(dst_fp):
        logger.warning(file_exists_msg(dst_fp))
        return get_io_obj_content(io_obj)
    # Getting necessary config parameters
    configs = ExperimentConfigs.read_json(configs_fp)
    start_frame = configs.auto.start_frame
    stop_frame = configs.auto.stop_frame
    # Reading file
    keypoints_df = KeypointsDf.read(src_fp)
    # Trimming dataframe between start and stop frames
    # NOTE: `.loc` label-slicing is inclusive of both endpoints
    keypoints_df = keypoints_df.loc[start_frame:stop_frame, :]
    KeypointsDf.write(keypoints_df, dst_fp)
    return get_io_obj_content(io_obj)

behavysis.processes.extract_features.ExtractFeatures

Source code in behavysis/processes/extract_features.py
class ExtractFeatures:
    """Feature extraction from preprocessed keypoints dataframes via a SimBA subprocess."""

    @staticmethod
    def extract_features(
        keypoints_fp: str,
        features_fp: str,
        configs_fp: str,
        overwrite: bool,
    ) -> str:
        """
        Extracting features from preprocessed keypoints dataframe using SimBA
        processes.

        Parameters
        ----------
        keypoints_fp : str
            Preprocessed keypoints filepath.
        features_fp : str
            Filepath to save extracted_features dataframe.
        configs_fp : str
            Configs JSON filepath.
        overwrite : bool
            Whether to overwrite the features_fp file (if it exists).

        Returns
        -------
        str
            The outcome of the process.
        """
        logger, io_obj = init_logger_io_obj()
        # Skipping (with a warning) if the output already exists and overwrite is disabled
        if not overwrite and os.path.exists(features_fp):
            logger.warning(file_exists_msg(features_fp))
            return get_io_obj_content(io_obj)
        # Getting directory and file paths
        # cpid makes the temp SimBA dirs unique per concurrent process
        name = get_name(keypoints_fp)
        cpid = get_cpid()
        configs_dir = os.path.dirname(configs_fp)
        simba_in_dir = os.path.join(CACHE_DIR, f"input_{cpid}")
        simba_dir = os.path.join(CACHE_DIR, f"simba_proj_{cpid}")
        simba_features_dir = os.path.join(simba_dir, "project_folder", "csv", "features_extracted")
        simba_features_fp = os.path.join(simba_features_dir, f"{name}.csv")
        # Removing temp folders (preemptively)
        silent_remove(simba_in_dir)
        silent_remove(simba_dir)
        # Preparing keypoints dataframes for input to SimBA project
        os.makedirs(simba_in_dir, exist_ok=True)
        simba_in_fp = os.path.join(simba_in_dir, f"{name}.csv")
        # Selecting bodyparts for SimBA (8 bpts, 2 indivs)
        keypoints_df = KeypointsDf.read(keypoints_fp)
        keypoints_df = select_cols(keypoints_df, configs_fp, logger)
        # Saving keypoints index to use in the SimBA features extraction df
        index = keypoints_df.index
        # Need to remove index name for SimBA to import correctly
        keypoints_df.index.name = None
        # Saving as csv
        keypoints_df.to_csv(simba_in_fp)
        # Running SimBA env and script to run SimBA feature extraction
        run_simba_subproc(simba_dir, simba_in_dir, configs_dir, CACHE_DIR, cpid, logger)
        # Exporting SimBA feature extraction csv to disk (restoring the saved index)
        export2df(simba_features_fp, features_fp, index, logger)
        # Removing temp folders
        silent_remove(simba_in_dir)
        silent_remove(simba_dir)
        return get_io_obj_content(io_obj)

extract_features(keypoints_fp, features_fp, configs_fp, overwrite) staticmethod

Extracting features from preprocessed keypoints dataframe using SimBA processes.

Parameters:

Name Type Description Default
keypoints_fp str

Preprocessed keypoints filepath.

required
features_fp str

Filepath to save extracted_features dataframe.

required
configs_fp str

Configs JSON filepath.

required
overwrite bool

Whether to overwrite the dst_fp file (if it exists).

required

Returns:

Type Description
str

The outcome of the process.

Source code in behavysis/processes/extract_features.py
@staticmethod
def extract_features(
    keypoints_fp: str,
    features_fp: str,
    configs_fp: str,
    overwrite: bool,
) -> str:
    """
    Extracting features from preprocessed keypoints dataframe using SimBA
    processes.

    Parameters
    ----------
    keypoints_fp : str
        Preprocessed keypoints filepath.
    features_fp : str
        Filepath to save extracted_features dataframe.
    configs_fp : str
        Configs JSON filepath.
    overwrite : bool
        Whether to overwrite the features_fp file (if it exists).

    Returns
    -------
    str
        The outcome of the process.
    """
    logger, io_obj = init_logger_io_obj()
    # Skipping (with a warning) if the output already exists and overwrite is disabled
    if not overwrite and os.path.exists(features_fp):
        logger.warning(file_exists_msg(features_fp))
        return get_io_obj_content(io_obj)
    # Getting directory and file paths
    # cpid makes the temp SimBA dirs unique per concurrent process
    name = get_name(keypoints_fp)
    cpid = get_cpid()
    configs_dir = os.path.dirname(configs_fp)
    simba_in_dir = os.path.join(CACHE_DIR, f"input_{cpid}")
    simba_dir = os.path.join(CACHE_DIR, f"simba_proj_{cpid}")
    simba_features_dir = os.path.join(simba_dir, "project_folder", "csv", "features_extracted")
    simba_features_fp = os.path.join(simba_features_dir, f"{name}.csv")
    # Removing temp folders (preemptively)
    silent_remove(simba_in_dir)
    silent_remove(simba_dir)
    # Preparing keypoints dataframes for input to SimBA project
    os.makedirs(simba_in_dir, exist_ok=True)
    simba_in_fp = os.path.join(simba_in_dir, f"{name}.csv")
    # Selecting bodyparts for SimBA (8 bpts, 2 indivs)
    keypoints_df = KeypointsDf.read(keypoints_fp)
    keypoints_df = select_cols(keypoints_df, configs_fp, logger)
    # Saving keypoints index to use in the SimBA features extraction df
    index = keypoints_df.index
    # Need to remove index name for SimBA to import correctly
    keypoints_df.index.name = None
    # Saving as csv
    keypoints_df.to_csv(simba_in_fp)
    # Running SimBA env and script to run SimBA feature extraction
    run_simba_subproc(simba_dir, simba_in_dir, configs_dir, CACHE_DIR, cpid, logger)
    # Exporting SimBA feature extraction csv to disk (restoring the saved index)
    export2df(simba_features_fp, features_fp, index, logger)
    # Removing temp folders
    silent_remove(simba_in_dir)
    silent_remove(simba_dir)
    return get_io_obj_content(io_obj)

behavysis.processes.classify_behavs.ClassifyBehavs

Source code in behavysis/processes/classify_behavs.py
class ClassifyBehavs:
    """Behaviour classification on extracted-features dataframes using BehavClassifier models."""

    @classmethod
    def classify_behavs(
        cls,
        features_fp: str,
        behavs_fp: str,
        configs_fp: str,
        overwrite: bool,
    ) -> str:
        """
        Given model config files in the BehavClassifier format, generates behaviour predictions
        on the given extracted features dataframe.

        Parameters
        ----------
        features_fp : str
            Filepath of the extracted features dataframe.
        behavs_fp : str
            Filepath to save the predicted behaviours dataframe.
        configs_fp : str
            Configs JSON filepath.
        overwrite : bool
            Whether to overwrite the output file (if it exists).

        Returns
        -------
        str
            Description of the function's outcome.

        Notes
        -----
        The config file must contain the following parameters:
        ```
        - user
            - classify_behavs: list of model configs
        ```
        Where each model config provides `proj_dir`, `behav_name`, `pcutoff`, and
        `min_empty_window_secs` (as read below). If the list is empty, no output
        file is written.
        """
        logger, io_obj = init_logger_io_obj()
        # Skipping (with a warning) if the output already exists and overwrite is disabled
        if not overwrite and os.path.exists(behavs_fp):
            logger.warning(file_exists_msg(behavs_fp))
            return get_io_obj_content(io_obj)
        # Getting necessary config parameters
        configs = ExperimentConfigs.read_json(configs_fp)
        fps = configs.auto.formatted_vid.fps
        model_configs_ls = configs.user.classify_behavs
        # Getting features data
        features_df = FeaturesDf.read(features_fp)
        # Getting predictions for each classifier model and saving
        # in a list of pd.DataFrames
        behavs_df_ls = []
        for model_config in model_configs_ls:
            proj_dir = configs.get_ref(model_config.proj_dir)
            behav_name = configs.get_ref(model_config.behav_name)
            behav_model = BehavClassifier.load(proj_dir, behav_name)
            # Falling back to the model's own pcutoff if none is configured
            pcutoff = get_pcutoff(configs.get_ref(model_config.pcutoff), behav_model.configs.pcutoff, logger)
            min_window_secs = configs.get_ref(model_config.min_empty_window_secs)
            min_window_frames = int(np.round(min_window_secs * fps))
            # Running the clf pipeline
            behav_df_i = behav_model.pipeline_inference(features_df)
            # Getting prob and pred column names
            prob_col = (behav_name, OutcomesPredictedCols.PROB.value)
            pred_col = (behav_name, OutcomesPredictedCols.PRED.value)
            # Using pcutoff to get binary predictions
            behav_df_i[pred_col] = (behav_df_i[prob_col] > pcutoff).astype(int)
            # Filling in small non-behav bouts
            behav_df_i[pred_col] = merge_bouts(behav_df_i[pred_col], min_window_frames, logger)
            # Adding model predictions df to list
            behavs_df_ls.append(behav_df_i)
            # Logging outcome
            logger.info(f"Completed {behav_name} classification.")
        # If no models were run, then return outcome (without writing a file)
        if len(behavs_df_ls) == 0:
            return get_io_obj_content(io_obj)
        # Concatenating predictions to a single dataframe
        behavs_df = pd.concat(behavs_df_ls, axis=1)
        # Saving behav_preds df
        BehavPredictedDf.write(behavs_df, behavs_fp)
        return get_io_obj_content(io_obj)

classify_behavs(features_fp, behavs_fp, configs_fp, overwrite) classmethod

Given model config files in the BehavClassifier format, generates behaviour predictions on the given extracted features dataframe.

Parameters:

Name Type Description Default
features_fp str

The filepath of the extracted features dataframe.

required
behavs_fp str

The filepath to save the predicted behaviours dataframe.

required
configs_fp str

The configs JSON filepath.

required
overwrite bool

Whether to overwrite the output file (if it exists).

required

Returns:

Type Description
str

Description of the function's outcome.

Notes

The config file must contain the following parameters:

- user
    - classify_behavs: list of model configs
Where each model config provides proj_dir, behav_name, pcutoff, and min_empty_window_secs.

Source code in behavysis/processes/classify_behavs.py
@classmethod
def classify_behavs(
    cls,
    features_fp: str,
    behavs_fp: str,
    configs_fp: str,
    overwrite: bool,
) -> str:
    """
    Given model config files in the BehavClassifier format, generates behaviour predictions
    on the given extracted features dataframe.

    Parameters
    ----------
    features_fp : str
        Filepath of the extracted features dataframe.
    behavs_fp : str
        Filepath to save the predicted behaviours dataframe.
    configs_fp : str
        Configs JSON filepath.
    overwrite : bool
        Whether to overwrite the output file (if it exists).

    Returns
    -------
    str
        Description of the function's outcome.

    Notes
    -----
    The config file must contain the following parameters:
    ```
    - user
        - classify_behavs: list of model configs
    ```
    Where each model config provides `proj_dir`, `behav_name`, `pcutoff`, and
    `min_empty_window_secs` (as read below). If the list is empty, no output
    file is written.
    """
    logger, io_obj = init_logger_io_obj()
    # Skipping (with a warning) if the output already exists and overwrite is disabled
    if not overwrite and os.path.exists(behavs_fp):
        logger.warning(file_exists_msg(behavs_fp))
        return get_io_obj_content(io_obj)
    # Getting necessary config parameters
    configs = ExperimentConfigs.read_json(configs_fp)
    fps = configs.auto.formatted_vid.fps
    model_configs_ls = configs.user.classify_behavs
    # Getting features data
    features_df = FeaturesDf.read(features_fp)
    # Getting predictions for each classifier model and saving
    # in a list of pd.DataFrames
    behavs_df_ls = []
    for model_config in model_configs_ls:
        proj_dir = configs.get_ref(model_config.proj_dir)
        behav_name = configs.get_ref(model_config.behav_name)
        behav_model = BehavClassifier.load(proj_dir, behav_name)
        # Falling back to the model's own pcutoff if none is configured
        pcutoff = get_pcutoff(configs.get_ref(model_config.pcutoff), behav_model.configs.pcutoff, logger)
        min_window_secs = configs.get_ref(model_config.min_empty_window_secs)
        min_window_frames = int(np.round(min_window_secs * fps))
        # Running the clf pipeline
        behav_df_i = behav_model.pipeline_inference(features_df)
        # Getting prob and pred column names
        prob_col = (behav_name, OutcomesPredictedCols.PROB.value)
        pred_col = (behav_name, OutcomesPredictedCols.PRED.value)
        # Using pcutoff to get binary predictions
        behav_df_i[pred_col] = (behav_df_i[prob_col] > pcutoff).astype(int)
        # Filling in small non-behav bouts
        behav_df_i[pred_col] = merge_bouts(behav_df_i[pred_col], min_window_frames, logger)
        # Adding model predictions df to list
        behavs_df_ls.append(behav_df_i)
        # Logging outcome
        logger.info(f"Completed {behav_name} classification.")
    # If no models were run, then return outcome (without writing a file)
    if len(behavs_df_ls) == 0:
        return get_io_obj_content(io_obj)
    # Concatenating predictions to a single dataframe
    behavs_df = pd.concat(behavs_df_ls, axis=1)
    # Saving behav_preds df
    BehavPredictedDf.write(behavs_df, behavs_fp)
    return get_io_obj_content(io_obj)

behavysis.processes.analyse_behavs.AnalyseBehavs

Source code in behavysis/processes/analyse_behavs.py
class AnalyseBehavs:
    """Analysis of scored behaviour dataframes into frame-by-frame, summary, and binned outputs."""

    @staticmethod
    def analyse_behavs(
        behavs_fp: str,
        dst_dir: str,
        configs_fp: str,
    ) -> str:
        """
        Takes a behavs dataframe and generates a summary and binned version of the data.

        Parameters
        ----------
        behavs_fp : str
            Filepath of the scored behaviours dataframe.
        dst_dir : str
            Directory to save the analysis outputs in.
        configs_fp : str
            Configs JSON filepath.

        Returns
        -------
        str
            The logged outcome of the process.
        """
        logger, io_obj = init_logger_io_obj()
        f_name = get_func_name_in_stack()
        name = get_name(behavs_fp)
        dst_subdir = os.path.join(dst_dir, f_name)
        # Reading the analysis parameters (fps and time-bin settings) from the configs file
        configs = ExperimentConfigs.read_json(configs_fp)
        fps, _, _, _, bins_ls, cbins_ls = configs.get_analysis_configs()
        # Loading in dataframe
        behavs_df = BehavScoredDf.read(behavs_fp)
        # Setting all na and undetermined behav to non-behav
        behavs_df = behavs_df.fillna(0).replace(BehavValues.UNDETERMINED.value, BehavValues.NON_BEHAV.value)
        # Getting the behaviour names and each user_defined for the behaviour
        # Not incl. the `pred` or `prob` (`prob` shouldn't be here anyway) columns
        columns = np.isin(
            behavs_df.columns.get_level_values(BehavScoredDf.CN.OUTCOMES.value),
            [BehavScoredDf.OutcomesCols.PRED.value],
            invert=True,
        )
        behavs_df = behavs_df.loc[:, columns]
        behavs_df = AnalysisDf.basic_clean(behavs_df)
        # Writing the behavs_df to the fbf file
        fbf_fp = os.path.join(dst_subdir, FBF, f"{name}.{AnalysisDf.IO}")
        AnalysisDf.write(behavs_df, fbf_fp)
        # Making the summary and binned dataframes
        AnalysisBinnedDf.summary_binned_behavs(
            behavs_df,
            dst_subdir,
            name,
            fps,
            bins_ls,
            cbins_ls,
        )
        return get_io_obj_content(io_obj)

analyse_behavs(behavs_fp, dst_dir, configs_fp) staticmethod

Takes a behavs dataframe and generates a summary and binned version of the data.

Source code in behavysis/processes/analyse_behavs.py
@staticmethod
def analyse_behavs(
    behavs_fp: str,
    dst_dir: str,
    configs_fp: str,
) -> str:
    """
    Takes a behavs dataframe and generates a summary and binned version of the data.

    Parameters
    ----------
    behavs_fp : str
        Filepath of the scored behaviours dataframe.
    dst_dir : str
        Directory to save the analysis outputs in.
    configs_fp : str
        Configs JSON filepath.

    Returns
    -------
    str
        The logged outcome of the process.
    """
    logger, io_obj = init_logger_io_obj()
    f_name = get_func_name_in_stack()
    name = get_name(behavs_fp)
    dst_subdir = os.path.join(dst_dir, f_name)
    # Reading the analysis parameters (fps and time-bin settings) from the configs file
    configs = ExperimentConfigs.read_json(configs_fp)
    fps, _, _, _, bins_ls, cbins_ls = configs.get_analysis_configs()
    # Loading in dataframe
    behavs_df = BehavScoredDf.read(behavs_fp)
    # Setting all na and undetermined behav to non-behav
    behavs_df = behavs_df.fillna(0).replace(BehavValues.UNDETERMINED.value, BehavValues.NON_BEHAV.value)
    # Getting the behaviour names and each user_defined for the behaviour
    # Not incl. the `pred` or `prob` (`prob` shouldn't be here anyway) columns
    columns = np.isin(
        behavs_df.columns.get_level_values(BehavScoredDf.CN.OUTCOMES.value),
        [BehavScoredDf.OutcomesCols.PRED.value],
        invert=True,
    )
    behavs_df = behavs_df.loc[:, columns]
    behavs_df = AnalysisDf.basic_clean(behavs_df)
    # Writing the behavs_df to the fbf file
    fbf_fp = os.path.join(dst_subdir, FBF, f"{name}.{AnalysisDf.IO}")
    AnalysisDf.write(behavs_df, fbf_fp)
    # Making the summary and binned dataframes
    AnalysisBinnedDf.summary_binned_behavs(
        behavs_df,
        dst_subdir,
        name,
        fps,
        bins_ls,
        cbins_ls,
    )
    return get_io_obj_content(io_obj)

behavysis.processes.analyse.Analyse

Source code in behavysis/processes/analyse.py
class Analyse:
    """Keypoints-based analyses: ROI occupancy, speed, social distance, and freezing."""

    @staticmethod
    def in_roi(
        keypoints_fp: str,
        dst_dir: str,
        configs_fp: str,
    ) -> str:
        """
        Determines the frames in which the subject is inside the cage (from average
        of given bodypoints).

        Points are `padding_px` padded (away) from center.

        Parameters
        ----------
        keypoints_fp : str
            Filepath of the keypoints dataframe.
        dst_dir : str
            Directory to save the analysis outputs in.
        configs_fp : str
            Configs JSON filepath.

        Returns
        -------
        str
            The logged outcome of the process.
        """
        logger, io_obj = init_logger_io_obj()
        f_name = get_func_name_in_stack()
        name = get_name(keypoints_fp)
        dst_subdir = os.path.join(dst_dir, f_name)
        # Getting necessary config parameters
        configs = ExperimentConfigs.read_json(configs_fp)
        fps, _, _, px_per_mm, bins_ls, cbins_ls = configs.get_analysis_configs()
        configs_filt_ls = configs.user.analyse.in_roi
        # Loading in dataframe
        keypoints_df = KeypointsDf.clean_headings(KeypointsDf.read(keypoints_fp))
        assert keypoints_df.shape[0] > 0, "No frames in keypoints_df. Please check keypoints file."
        # Getting indivs list
        indivs, _ = KeypointsDf.get_indivs_bpts(keypoints_df)
        # Making analysis_df
        analysis_df_ls = []
        scatter_df_ls = []
        corners_df_ls = []
        roi_names_ls = []
        # For each roi, calculate the in-roi status of the subject
        x = CoordsCols.X.value
        y = CoordsCols.Y.value
        idx = pd.IndexSlice
        for configs_filt in configs_filt_ls:
            # Getting necessary config parameters
            roi_name = configs.get_ref(configs_filt.roi_name)
            is_in = configs.get_ref(configs_filt.is_in)
            bpts = configs.get_ref(configs_filt.bodyparts)
            padding_mm = configs.get_ref(configs_filt.padding_mm)
            roi_corners = configs.get_ref(configs_filt.roi_corners)
            # Calculating more parameters
            padding_px = padding_mm / px_per_mm
            # Checking bodyparts and roi_corners exist
            KeypointsDf.check_bpts_exist(keypoints_df, bpts)
            KeypointsDf.check_bpts_exist(keypoints_df, roi_corners)
            # Getting average corner coordinates. Assumes arena does not move.
            corners_i_df = pd.DataFrame([keypoints_df[(IndivCols.SINGLE.value, pt)].mean() for pt in roi_corners]).drop(
                columns=["likelihood"]
            )
            # Adjusting x-y to have `padding_px` dilation/erosion from the points themselves
            roi_center = corners_i_df.mean()
            for i in corners_i_df.index:
                # Calculating angle from centre to point (going out from centre)
                theta = np.arctan2(
                    corners_i_df.loc[i, y] - roi_center[y],
                    corners_i_df.loc[i, x] - roi_center[x],
                )
                # Getting x, y distances so point is `padding_px` padded (away) from center
                corners_i_df.loc[i, x] = corners_i_df.loc[i, x] + (padding_px * np.cos(theta))
                corners_i_df.loc[i, y] = corners_i_df.loc[i, y] + (padding_px * np.sin(theta))
            # Making the res_df
            analysis_i_df = AnalysisDf.init_df(keypoints_df.index)
            # For each individual, getting the in-roi status
            for indiv in indivs:
                # Getting average body center (x, y) for each individual
                analysis_i_df[(indiv, x)] = keypoints_df.loc[:, idx[indiv, bpts, x]].mean(axis=1).values  # type: ignore
                analysis_i_df[(indiv, y)] = keypoints_df.loc[:, idx[indiv, bpts, y]].mean(axis=1).values  # type: ignore
                # Determining if the indiv body center is in the ROI
                analysis_i_df[(indiv, roi_name)] = analysis_i_df[indiv].apply(
                    lambda pt: pt_in_roi(pt, corners_i_df, logger), axis=1
                )
            # Inverting in_roi status if is_in is False
            if not is_in:
                analysis_i_df.loc[:, idx[:, roi_name]] = ~analysis_i_df.loc[:, idx[:, roi_name]]  # type: ignore
            analysis_df_ls.append(analysis_i_df.loc[:, idx[:, roi_name]].astype(np.int8))  # type: ignore
            scatter_df_ls.append(analysis_i_df)
            corners_df_ls.append(corners_i_df)
            roi_names_ls.append(roi_name)
        # Concatenating all analysis_df_ls and roi_corners_df_ls
        analysis_df = pd.concat(analysis_df_ls, axis=1)
        scatter_df = pd.concat(scatter_df_ls, axis=1)
        corners_df = pd.concat(corners_df_ls, keys=roi_names_ls, names=["roi"]).reset_index(level="roi")
        # Saving analysis_df
        fbf_fp = os.path.join(dst_subdir, FBF, f"{name}.{AnalysisDf.IO}")
        AnalysisDf.write(analysis_df, fbf_fp)
        plot_fp = os.path.join(dst_subdir, "scatter_plot", f"{name}.png")
        AnalysisDf.make_location_scatterplot(scatter_df, corners_df, plot_fp)
        # Summarising and binning analysis_df
        AnalysisBinnedDf.summary_binned_behavs(
            analysis_df,
            dst_subdir,
            name,
            fps,
            bins_ls,
            cbins_ls,
        )
        return get_io_obj_content(io_obj)

    @staticmethod
    def speed(
        keypoints_fp: str,
        dst_dir: str,
        configs_fp: str,
    ) -> str:
        """
        Determines the speed of the subject in each frame.

        Parameters
        ----------
        keypoints_fp : str
            Filepath of the keypoints dataframe.
        dst_dir : str
            Directory to save the analysis outputs in.
        configs_fp : str
            Configs JSON filepath.

        Returns
        -------
        str
            The logged outcome of the process.
        """
        logger, io_obj = init_logger_io_obj()
        f_name = get_func_name_in_stack()
        name = get_name(keypoints_fp)
        dst_subdir = os.path.join(dst_dir, f_name)
        # Getting necessary config parameters
        configs = ExperimentConfigs.read_json(configs_fp)
        fps, _, _, px_per_mm, bins_ls, cbins_ls = configs.get_analysis_configs()
        configs_filt = configs.user.analyse.speed
        bpts = configs.get_ref(configs_filt.bodyparts)
        smoothing_sec = configs.get_ref(configs_filt.smoothing_sec)
        # Calculating more parameters
        smoothing_frames = int(smoothing_sec * fps)

        # Loading in dataframe
        keypoints_df = KeypointsDf.clean_headings(KeypointsDf.read(keypoints_fp))
        assert keypoints_df.shape[0] > 0, "No frames in keypoints_df. Please check keypoints file."
        # Checking body-centre bodypart exists
        KeypointsDf.check_bpts_exist(keypoints_df, bpts)
        # Getting indivs and bpts list
        indivs, _ = KeypointsDf.get_indivs_bpts(keypoints_df)

        # Calculating speed of subject for each frame
        analysis_df = AnalysisDf.init_df(keypoints_df.index)
        idx = pd.IndexSlice
        for indiv in indivs:
            # Making a rolling window of 3 frames for average body-centre
            # Otherwise jitter contributes to movement
            jitter_frames = 3
            smoothed_xy_df = keypoints_df.rolling(window=jitter_frames, min_periods=1, center=True).agg(np.nanmean)
            # Getting changes in x-y values between frames (deltas)
            delta_x = smoothed_xy_df.loc[:, idx[indiv, bpts, "x"]].mean(axis=1).diff()  # type: ignore
            delta_y = smoothed_xy_df.loc[:, idx[indiv, bpts, "y"]].mean(axis=1).diff()  # type: ignore
            delta = np.array(np.sqrt(np.power(delta_x, 2) + np.power(delta_y, 2)))
            # Storing speed (raw and smoothed)
            analysis_df[(indiv, "SpeedMMperSec")] = (delta / px_per_mm) * fps
            analysis_df[(indiv, "SpeedMMperSecSmoothed")] = (
                analysis_df[(indiv, "SpeedMMperSec")]
                .rolling(window=smoothing_frames, min_periods=1, center=True)
                .agg(np.nanmean)
            )
        # Backfilling the analysis_df so no nan's (first frame has no delta)
        analysis_df = analysis_df.bfill()
        # Saving analysis_df
        fbf_fp = os.path.join(dst_subdir, FBF, f"{name}.{AnalysisDf.IO}")
        AnalysisDf.write(analysis_df, fbf_fp)

        # Summarising and binning analysis_df
        AnalysisBinnedDf.summary_binned_quantitative(
            analysis_df,
            dst_subdir,
            name,
            fps,
            bins_ls,
            cbins_ls,
        )
        return get_io_obj_content(io_obj)

    @staticmethod
    def social_distance(
        keypoints_fp: str,
        dst_dir: str,
        configs_fp: str,
    ) -> str:
        """
        Determines the distance between the two individuals in each frame.

        NOTE: assumes there are exactly two individuals in the keypoints file.

        Parameters
        ----------
        keypoints_fp : str
            Filepath of the keypoints dataframe.
        dst_dir : str
            Directory to save the analysis outputs in.
        configs_fp : str
            Configs JSON filepath.

        Returns
        -------
        str
            The logged outcome of the process.
        """
        logger, io_obj = init_logger_io_obj()
        f_name = get_func_name_in_stack()
        name = get_name(keypoints_fp)
        dst_subdir = os.path.join(dst_dir, f_name)
        # Getting necessary config parameters
        configs = ExperimentConfigs.read_json(configs_fp)
        fps, _, _, px_per_mm, bins_ls, cbins_ls = configs.get_analysis_configs()
        configs_filt = configs.user.analyse.social_distance
        bpts = configs.get_ref(configs_filt.bodyparts)
        smoothing_sec = configs.get_ref(configs_filt.smoothing_sec)
        # Calculating more parameters
        smoothing_frames = int(smoothing_sec * fps)

        # Loading in dataframe
        keypoints_df = KeypointsDf.clean_headings(KeypointsDf.read(keypoints_fp))
        assert keypoints_df.shape[0] > 0, "No frames in keypoints_df. Please check keypoints file."
        # Checking body-centre bodypart exists
        KeypointsDf.check_bpts_exist(keypoints_df, bpts)
        # Getting indivs and bpts list
        indivs, _ = KeypointsDf.get_indivs_bpts(keypoints_df)

        # Calculating distance between the two individuals for each frame
        analysis_df = AnalysisDf.init_df(keypoints_df.index)
        idx = pd.IndexSlice
        # Assumes there are only two individuals
        indiv_a = indivs[0]
        indiv_b = indivs[1]
        # Getting each individual's mean bodycentre coordinates
        # BUGFIX: previously subtracted a selection from itself (same idx on both
        # sides), which always produced a distance of 0
        x_a = keypoints_df.loc[:, idx[indiv_a, bpts, "x"]].mean(axis=1)  # type: ignore
        x_b = keypoints_df.loc[:, idx[indiv_b, bpts, "x"]].mean(axis=1)  # type: ignore
        y_a = keypoints_df.loc[:, idx[indiv_a, bpts, "y"]].mean(axis=1)  # type: ignore
        y_b = keypoints_df.loc[:, idx[indiv_b, bpts, "y"]].mean(axis=1)  # type: ignore
        # Euclidean distance between the two bodycentres (in px)
        dist = np.array(np.sqrt(np.power(x_a - x_b, 2) + np.power(y_a - y_b, 2)))
        # Adding mm distance to saved analysis_df table
        analysis_df[(f"{indiv_a}_{indiv_b}", "DistMM")] = dist / px_per_mm
        analysis_df[(f"{indiv_a}_{indiv_b}", "DistMMSmoothed")] = (
            analysis_df[(f"{indiv_a}_{indiv_b}", "DistMM")]
            .rolling(window=smoothing_frames, min_periods=1, center=True)
            .agg(np.nanmean)
        )
        # Saving analysis_df
        fbf_fp = os.path.join(dst_subdir, FBF, f"{name}.{AnalysisDf.IO}")
        AnalysisDf.write(analysis_df, fbf_fp)

        # Summarising and binning analysis_df
        AnalysisBinnedDf.summary_binned_quantitative(
            analysis_df,
            dst_subdir,
            name,
            fps,
            bins_ls,
            cbins_ls,
        )
        return get_io_obj_content(io_obj)

    @staticmethod
    def freezing(
        keypoints_fp: str,
        dst_dir: str,
        configs_fp: str,
    ) -> str:
        """
        Determines the frames in which the subject is frozen.

        "Frozen" is defined as not moving outside of a radius of `thresh_mm`, and only
        includes bouts that last longer than `window_sec` seconds.

        NOTE: method is "greedy" because it looks at a freezing bout from earliest possible frame.

        Parameters
        ----------
        keypoints_fp : str
            Filepath of the keypoints dataframe.
        dst_dir : str
            Directory to save the analysis outputs in.
        configs_fp : str
            Configs JSON filepath.

        Returns
        -------
        str
            The logged outcome of the process.
        """
        logger, io_obj = init_logger_io_obj()
        f_name = get_func_name_in_stack()
        name = get_name(keypoints_fp)
        dst_subdir = os.path.join(dst_dir, f_name)
        # Getting necessary config parameters
        configs = ExperimentConfigs.read_json(configs_fp)
        fps, _, _, px_per_mm, bins_ls, cbins_ls = configs.get_analysis_configs()
        configs_filt = configs.user.analyse.freezing
        bpts = configs.get_ref(configs_filt.bodyparts)
        thresh_mm = configs.get_ref(configs_filt.thresh_mm)
        smoothing_sec = configs.get_ref(configs_filt.smoothing_sec)
        window_sec = configs.get_ref(configs_filt.window_sec)
        # Calculating more parameters
        thresh_px = thresh_mm / px_per_mm
        smoothing_frames = int(smoothing_sec * fps)
        window_frames = int(np.round(fps * window_sec, 0))

        # Loading in dataframe
        keypoints_df = KeypointsDf.clean_headings(KeypointsDf.read(keypoints_fp))
        assert keypoints_df.shape[0] > 0, "No frames in keypoints_df. Please check keypoints file."
        # Checking body-centre bodypart exists
        KeypointsDf.check_bpts_exist(keypoints_df, bpts)
        # Getting indivs and bpts list
        indivs, _ = KeypointsDf.get_indivs_bpts(keypoints_df)

        # Determining freezing status of subject for each frame
        analysis_df = AnalysisDf.init_df(keypoints_df.index)
        keypoints_df.index = analysis_df.index
        for indiv in indivs:
            temp_df = pd.DataFrame(index=analysis_df.index)
            # Calculating frame-by-frame delta distances for current bpt
            for bpt in bpts:
                # Getting x and y changes
                delta_x = keypoints_df[(indiv, bpt, "x")].diff()
                delta_y = keypoints_df[(indiv, bpt, "y")].diff()
                # Getting Euclidean distance between frames for bpt
                delta = np.sqrt(np.power(delta_x, 2) + np.power(delta_y, 2))
                # Storing distance (in px)
                temp_df[f"{bpt}_dist"] = delta
                # Smoothing
                temp_df[f"{bpt}_dist"] = (
                    temp_df[f"{bpt}_dist"].rolling(window=smoothing_frames, min_periods=1, center=True).agg(np.nanmean)
                )
            # If ALL bodypoints do not leave `thresh_px`
            analysis_df[(indiv, f_name)] = temp_df.apply(lambda x: pd.Series(np.all(x < thresh_px)), axis=1).astype(
                np.int8
            )

            # Getting start, stop, and duration of each freezing behav bout
            freezingbouts_df = BehavScoredDf.vect2bouts_df(analysis_df[(indiv, f_name)] == 1)
            # For each freezing bout, if there is less than window_frames, then
            # it is not actually freezing
            for _, row in freezingbouts_df.iterrows():
                if row["dur"] < window_frames:
                    analysis_df.loc[row["start"] : row["stop"], (indiv, f_name)] = 0
        # Saving analysis_df
        fbf_fp = os.path.join(dst_subdir, FBF, f"{name}.{AnalysisDf.IO}")
        AnalysisDf.write(analysis_df, fbf_fp)

        # Summarising and binning analysis_df
        AnalysisBinnedDf.summary_binned_behavs(
            analysis_df,
            dst_subdir,
            name,
            fps,
            bins_ls,
            cbins_ls,
        )
        return get_io_obj_content(io_obj)

freezing(keypoints_fp, dst_dir, configs_fp) staticmethod

Determines the frames in which the subject is frozen.

"Frozen" is defined as not moving outside of a radius of thresh_mm, and only includes bouts that last longer than window_sec seconds.

NOTE: method is "greedy" because it looks at a freezing bout from earliest possible frame.

Source code in behavysis/processes/analyse.py
@staticmethod
def freezing(
    keypoints_fp: str,
    dst_dir: str,
    configs_fp: str,
) -> str:
    """
    Determines the frames in which the subject is frozen.

    "Frozen" is defined as not moving outside of a radius of `thresh_mm`, and only
    includes bouts that last longer than `window_sec` seconds.

    NOTE: method is "greedy" because it looks at a freezing bout from earliest possible frame.

    Parameters
    ----------
    keypoints_fp : str
        Filepath of the keypoints dataframe.
    dst_dir : str
        Directory to save the analysis results in.
    configs_fp : str
        Filepath of the experiment's JSON configs.

    Returns
    -------
    str
        The logger IO content describing the function's outcome.
    """
    logger, io_obj = init_logger_io_obj()
    f_name = get_func_name_in_stack()
    name = get_name(keypoints_fp)
    dst_subdir = os.path.join(dst_dir, f_name)
    # Reading configs and getting the freezing-specific parameters
    configs = ExperimentConfigs.read_json(configs_fp)
    fps, _, _, px_per_mm, bins_ls, cbins_ls = configs.get_analysis_configs()
    configs_filt = configs.user.analyse.freezing
    bpts = configs.get_ref(configs_filt.bodyparts)
    thresh_mm = configs.get_ref(configs_filt.thresh_mm)
    smoothing_sec = configs.get_ref(configs_filt.smoothing_sec)
    window_sec = configs.get_ref(configs_filt.window_sec)
    # Converting mm / seconds parameters to px / frames units
    thresh_px = thresh_mm / px_per_mm
    smoothing_frames = int(smoothing_sec * fps)
    window_frames = int(np.round(fps * window_sec, 0))

    # Loading in dataframe
    keypoints_df = KeypointsDf.clean_headings(KeypointsDf.read(keypoints_fp))
    assert keypoints_df.shape[0] > 0, "No frames in keypoints_df. Please check keypoints file."
    # Checking that the configured bodyparts exist in the dataframe
    KeypointsDf.check_bpts_exist(keypoints_df, bpts)
    # Getting indivs and bpts list
    indivs, _ = KeypointsDf.get_indivs_bpts(keypoints_df)

    # Calculating the smoothed frame-by-frame movement of each bodypart for each individual
    analysis_df = AnalysisDf.init_df(keypoints_df.index)
    keypoints_df.index = analysis_df.index
    for indiv in indivs:
        temp_df = pd.DataFrame(index=analysis_df.index)
        for bpt in bpts:
            # Getting x and y changes between frames
            delta_x = keypoints_df[(indiv, bpt, "x")].diff()
            delta_y = keypoints_df[(indiv, bpt, "y")].diff()
            # Euclidean distance (in px) moved between frames for bpt
            temp_df[f"{bpt}_dist"] = np.sqrt(np.power(delta_x, 2) + np.power(delta_y, 2))
            # Smoothing with a centred rolling mean
            temp_df[f"{bpt}_dist"] = (
                temp_df[f"{bpt}_dist"].rolling(window=smoothing_frames, min_periods=1, center=True).agg(np.nanmean)
            )
        # Frozen when ALL bodypoints move less than `thresh_px`
        # (vectorised equivalent of the previous row-wise np.all apply; NaN compares False either way)
        analysis_df[(indiv, f_name)] = (temp_df < thresh_px).all(axis=1).astype(np.int8)

        # Getting start, stop, and duration of each freezing behav bout
        freezingbouts_df = BehavScoredDf.vect2bouts_df(analysis_df[(indiv, f_name)] == 1)
        # Any bout shorter than window_frames is not actually freezing, so zero it out
        for _, row in freezingbouts_df.iterrows():
            if row["dur"] < window_frames:
                analysis_df.loc[row["start"] : row["stop"], (indiv, f_name)] = 0
    # Saving frame-by-frame analysis_df
    fbf_fp = os.path.join(dst_subdir, FBF, f"{name}.{AnalysisDf.IO}")
    AnalysisDf.write(analysis_df, fbf_fp)

    # Summarising and binning analysis_df
    AnalysisBinnedDf.summary_binned_behavs(
        analysis_df,
        dst_subdir,
        name,
        fps,
        bins_ls,
        cbins_ls,
    )
    return get_io_obj_content(io_obj)

in_roi(keypoints_fp, dst_dir, configs_fp) staticmethod

Determines the frames in which the subject is inside the cage (from average of given bodypoints).

Points are padding_px padded (away) from center.

Source code in behavysis/processes/analyse.py
@staticmethod
def in_roi(
    keypoints_fp: str,
    dst_dir: str,
    configs_fp: str,
) -> str:
    """
    Determines the frames in which the subject is inside the cage (from average
    of given bodypoints).

    Points are `padding_px` padded (away) from center.

    Parameters
    ----------
    keypoints_fp : str
        Filepath of the keypoints dataframe.
    dst_dir : str
        Directory to save the analysis results (frame-by-frame df and scatter plot) in.
    configs_fp : str
        Filepath of the experiment's JSON configs.

    Returns
    -------
    str
        The logger IO content describing the function's outcome.
    """
    logger, io_obj = init_logger_io_obj()
    f_name = get_func_name_in_stack()
    name = get_name(keypoints_fp)
    dst_subdir = os.path.join(dst_dir, f_name)
    # Reading the experiment configs; in_roi holds a LIST of per-ROI filter configs
    configs = ExperimentConfigs.read_json(configs_fp)
    fps, _, _, px_per_mm, bins_ls, cbins_ls = configs.get_analysis_configs()
    configs_filt_ls = configs.user.analyse.in_roi
    # Loading in dataframe
    keypoints_df = KeypointsDf.clean_headings(KeypointsDf.read(keypoints_fp))
    assert keypoints_df.shape[0] > 0, "No frames in keypoints_df. Please check keypoints file."
    # Getting indivs list
    indivs, _ = KeypointsDf.get_indivs_bpts(keypoints_df)
    # Accumulators: one entry per ROI, concatenated at the end
    analysis_df_ls = []
    scatter_df_ls = []
    corners_df_ls = []
    roi_names_ls = []
    # For each roi, calculate the in-roi status of the subject
    x = CoordsCols.X.value
    y = CoordsCols.Y.value
    idx = pd.IndexSlice
    for configs_filt in configs_filt_ls:
        # Getting necessary config parameters
        roi_name = configs.get_ref(configs_filt.roi_name)
        is_in = configs.get_ref(configs_filt.is_in)
        bpts = configs.get_ref(configs_filt.bodyparts)
        padding_mm = configs.get_ref(configs_filt.padding_mm)
        roi_corners = configs.get_ref(configs_filt.roi_corners)
        # Converting padding from mm to px
        padding_px = padding_mm / px_per_mm
        # Checking bodyparts and roi_corners exist
        KeypointsDf.check_bpts_exist(keypoints_df, bpts)
        KeypointsDf.check_bpts_exist(keypoints_df, roi_corners)
        # Getting average corner coordinates. Assumes arena does not move.
        # NOTE(review): corners are read from the "single" (non-animal) individual — TODO confirm
        corners_i_df = pd.DataFrame([keypoints_df[(IndivCols.SINGLE.value, pt)].mean() for pt in roi_corners]).drop(
            columns=["likelihood"]
        )
        # Adjusting x-y to have `padding_px` dilation/erosion from the points themselves
        roi_center = corners_i_df.mean()
        for i in corners_i_df.index:
            # Calculating angle from centre to point (going out from centre)
            theta = np.arctan2(
                corners_i_df.loc[i, y] - roi_center[y],
                corners_i_df.loc[i, x] - roi_center[x],
            )
            # Getting x, y distances so point is `padding_px` padded (away) from center
            # (negative padding_mm would shrink the ROI instead)
            corners_i_df.loc[i, x] = corners_i_df.loc[i, x] + (padding_px * np.cos(theta))
            corners_i_df.loc[i, y] = corners_i_df.loc[i, y] + (padding_px * np.sin(theta))
        # Making the res_df
        analysis_i_df = AnalysisDf.init_df(keypoints_df.index)
        # For each individual, getting the in-roi status
        for indiv in indivs:
            # Getting average body center (x, y) for each individual
            analysis_i_df[(indiv, x)] = keypoints_df.loc[:, idx[indiv, bpts, x]].mean(axis=1).values  # type: ignore
            analysis_i_df[(indiv, y)] = keypoints_df.loc[:, idx[indiv, bpts, y]].mean(axis=1).values  # type: ignore
            # Determining if the indiv body center is in the ROI
            # (point-in-polygon test delegated to pt_in_roi; presumably returns bool per frame)
            analysis_i_df[(indiv, roi_name)] = analysis_i_df[indiv].apply(
                lambda pt: pt_in_roi(pt, corners_i_df, logger), axis=1
            )
        # Inverting in_roi status if is_in is False
        if not is_in:
            analysis_i_df.loc[:, idx[:, roi_name]] = ~analysis_i_df.loc[:, idx[:, roi_name]]  # type: ignore
        # Only the boolean in-roi column (as int8) goes into the saved analysis df;
        # the x-y columns are kept separately for the scatter plot
        analysis_df_ls.append(analysis_i_df.loc[:, idx[:, roi_name]].astype(np.int8))  # type: ignore
        scatter_df_ls.append(analysis_i_df)
        corners_df_ls.append(corners_i_df)
        roi_names_ls.append(roi_name)
    # Concatenating all analysis_df_ls and roi_corners_df_ls
    analysis_df = pd.concat(analysis_df_ls, axis=1)
    scatter_df = pd.concat(scatter_df_ls, axis=1)
    corners_df = pd.concat(corners_df_ls, keys=roi_names_ls, names=["roi"]).reset_index(level="roi")
    # Saving analysis_df
    fbf_fp = os.path.join(dst_subdir, FBF, f"{name}.{AnalysisDf.IO}")
    AnalysisDf.write(analysis_df, fbf_fp)
    plot_fp = os.path.join(dst_subdir, "scatter_plot", f"{name}.png")
    AnalysisDf.make_location_scatterplot(scatter_df, corners_df, plot_fp)
    # Summarising and binning analysis_df
    AnalysisBinnedDf.summary_binned_behavs(
        analysis_df,
        dst_subdir,
        name,
        fps,
        bins_ls,
        cbins_ls,
    )
    return get_io_obj_content(io_obj)

social_distance(keypoints_fp, dst_dir, configs_fp) staticmethod

Determines the distance between the two individuals in each frame.

Source code in behavysis/processes/analyse.py
@staticmethod
def social_distance(
    keypoints_fp: str,
    dst_dir: str,
    configs_fp: str,
) -> str:
    """
    Determines the distance between the two individuals in each frame.

    Assumes the experiment has exactly two (tracked) individuals; the distance is taken
    between the mean position of the given bodyparts of each individual.

    Parameters
    ----------
    keypoints_fp : str
        Filepath of the keypoints dataframe.
    dst_dir : str
        Directory to save the analysis results in.
    configs_fp : str
        Filepath of the experiment's JSON configs.

    Returns
    -------
    str
        The logger IO content describing the function's outcome.
    """
    logger, io_obj = init_logger_io_obj()
    f_name = get_func_name_in_stack()
    name = get_name(keypoints_fp)
    dst_subdir = os.path.join(dst_dir, f_name)
    # Reading configs and getting the social_distance-specific parameters
    configs = ExperimentConfigs.read_json(configs_fp)
    fps, _, _, px_per_mm, bins_ls, cbins_ls = configs.get_analysis_configs()
    configs_filt = configs.user.analyse.social_distance
    bpts = configs.get_ref(configs_filt.bodyparts)
    smoothing_sec = configs.get_ref(configs_filt.smoothing_sec)
    # Converting smoothing window from seconds to frames
    smoothing_frames = int(smoothing_sec * fps)

    # Loading in dataframe
    keypoints_df = KeypointsDf.clean_headings(KeypointsDf.read(keypoints_fp))
    assert keypoints_df.shape[0] > 0, "No frames in keypoints_df. Please check keypoints file."
    # Checking that the configured bodyparts exist in the dataframe
    KeypointsDf.check_bpts_exist(keypoints_df, bpts)
    # Getting indivs and bpts list
    indivs, _ = KeypointsDf.get_indivs_bpts(keypoints_df)

    # Calculating inter-individual distance for each frame
    analysis_df = AnalysisDf.init_df(keypoints_df.index)
    idx = pd.IndexSlice
    # Assumes there are only two individuals
    assert len(indivs) >= 2, "social_distance requires two individuals in the keypoints file."
    indiv_a = indivs[0]
    indiv_b = indivs[1]
    # Getting each individual's mean bodypoint position per frame.
    # FIX: the previous implementation subtracted each selection from itself
    # (idx_a - idx_a, idx_b - idx_b), so the distance was always 0.
    x_a = keypoints_df.loc[:, idx[indiv_a, bpts, "x"]].mean(axis=1)  # type: ignore
    x_b = keypoints_df.loc[:, idx[indiv_b, bpts, "x"]].mean(axis=1)  # type: ignore
    y_a = keypoints_df.loc[:, idx[indiv_a, bpts, "y"]].mean(axis=1)  # type: ignore
    y_b = keypoints_df.loc[:, idx[indiv_b, bpts, "y"]].mean(axis=1)  # type: ignore
    # Euclidean distance (in px) between the two mean positions
    dist = np.array(np.sqrt(np.power(x_a - x_b, 2) + np.power(y_a - y_b, 2)))
    # Adding mm distance to saved analysis_df table
    analysis_df[(f"{indiv_a}_{indiv_b}", "DistMM")] = dist / px_per_mm
    analysis_df[(f"{indiv_a}_{indiv_b}", "DistMMSmoothed")] = (
        analysis_df[(f"{indiv_a}_{indiv_b}", "DistMM")]
        .rolling(window=smoothing_frames, min_periods=1, center=True)
        .agg(np.nanmean)
    )
    # Saving analysis_df
    fbf_fp = os.path.join(dst_subdir, FBF, f"{name}.{AnalysisDf.IO}")
    AnalysisDf.write(analysis_df, fbf_fp)

    # Summarising and binning analysis_df
    AnalysisBinnedDf.summary_binned_quantitative(
        analysis_df,
        dst_subdir,
        name,
        fps,
        bins_ls,
        cbins_ls,
    )
    return get_io_obj_content(io_obj)

speed(keypoints_fp, dst_dir, configs_fp) staticmethod

Determines the speed of the subject in each frame.

Source code in behavysis/processes/analyse.py
@staticmethod
def speed(
    keypoints_fp: str,
    dst_dir: str,
    configs_fp: str,
) -> str:
    """
    Determines the speed of the subject in each frame.

    Speed is the frame-to-frame displacement of the mean bodypoint position,
    converted to mm/sec, with both a raw and a smoothed column stored.

    Parameters
    ----------
    keypoints_fp : str
        Filepath of the keypoints dataframe.
    dst_dir : str
        Directory to save the analysis results in.
    configs_fp : str
        Filepath of the experiment's JSON configs.

    Returns
    -------
    str
        The logger IO content describing the function's outcome.
    """
    logger, io_obj = init_logger_io_obj()
    f_name = get_func_name_in_stack()
    name = get_name(keypoints_fp)
    dst_subdir = os.path.join(dst_dir, f_name)
    # Reading configs and getting the speed-specific parameters
    configs = ExperimentConfigs.read_json(configs_fp)
    fps, _, _, px_per_mm, bins_ls, cbins_ls = configs.get_analysis_configs()
    configs_filt = configs.user.analyse.speed
    bpts = configs.get_ref(configs_filt.bodyparts)
    smoothing_sec = configs.get_ref(configs_filt.smoothing_sec)
    # Converting smoothing window from seconds to frames
    smoothing_frames = int(smoothing_sec * fps)

    # Loading in dataframe
    keypoints_df = KeypointsDf.clean_headings(KeypointsDf.read(keypoints_fp))
    assert keypoints_df.shape[0] > 0, "No frames in keypoints_df. Please check keypoints file."
    # Checking that the configured bodyparts exist in the dataframe
    KeypointsDf.check_bpts_exist(keypoints_df, bpts)
    # Getting indivs and bpts list
    indivs, _ = KeypointsDf.get_indivs_bpts(keypoints_df)

    # Calculating speed of subject for each frame
    analysis_df = AnalysisDf.init_df(keypoints_df.index)
    idx = pd.IndexSlice
    # Making a rolling window of 3 frames for average body-centre,
    # otherwise jitter contributes to movement.
    # Hoisted out of the per-individual loop: the smoothing is loop-invariant
    # and was previously recomputed for every individual.
    jitter_frames = 3
    smoothed_xy_df = keypoints_df.rolling(window=jitter_frames, min_periods=1, center=True).agg(np.nanmean)
    for indiv in indivs:
        # Getting changes in x-y values between frames (deltas) of the mean bodypoint position
        delta_x = smoothed_xy_df.loc[:, idx[indiv, bpts, "x"]].mean(axis=1).diff()  # type: ignore
        delta_y = smoothed_xy_df.loc[:, idx[indiv, bpts, "y"]].mean(axis=1).diff()  # type: ignore
        delta = np.array(np.sqrt(np.power(delta_x, 2) + np.power(delta_y, 2)))
        # Storing speed (raw and smoothed), converting px/frame to mm/sec
        analysis_df[(indiv, "SpeedMMperSec")] = (delta / px_per_mm) * fps
        analysis_df[(indiv, "SpeedMMperSecSmoothed")] = (
            analysis_df[(indiv, "SpeedMMperSec")]
            .rolling(window=smoothing_frames, min_periods=1, center=True)
            .agg(np.nanmean)
        )
    # Backfilling the analysis_df so no nan's (first frame's diff is NaN)
    analysis_df = analysis_df.bfill()
    # Saving analysis_df
    fbf_fp = os.path.join(dst_subdir, FBF, f"{name}.{AnalysisDf.IO}")
    AnalysisDf.write(analysis_df, fbf_fp)

    # Summarising and binning analysis_df
    AnalysisBinnedDf.summary_binned_quantitative(
        analysis_df,
        dst_subdir,
        name,
        fps,
        bins_ls,
        cbins_ls,
    )
    return get_io_obj_content(io_obj)

behavysis.processes.combine_analysis

CombineAnalysis

Source code in behavysis/processes/combine_analysis.py
class CombineAnalysis:
    """Combines the per-analysis frame-by-frame dataframes into one wide dataframe."""

    @classmethod
    def combine_analysis(
        cls,
        analysis_dir: str,
        analysis_combined_fp: str,
        configs_fp: str,
        overwrite: bool,
    ) -> str:
        """
        Concatenates across columns the frame-by-frame dataframes for all analysis subdirectories
        and saves this in a single dataframe.
        """
        logger, io_obj = init_logger_io_obj()
        # Skipping when the combined file already exists and overwriting is disabled
        if not overwrite and os.path.exists(analysis_combined_fp):
            logger.warning(file_exists_msg(analysis_combined_fp))
            return get_io_obj_content(io_obj)
        name = get_name(configs_fp)
        # Each analysis type lives in its own subdirectory of analysis_dir
        analysis_subdir_ls = [
            subdir for subdir in os.listdir(analysis_dir) if os.path.isdir(os.path.join(analysis_dir, subdir))
        ]
        # Nothing to combine: warn and bail out without making a df
        if not analysis_subdir_ls:
            logger.warning("no analysis fbf files made. Run `exp.analyse` first")
            return get_io_obj_content(io_obj)
        # Reading the frame-by-frame df of each analysis subdirectory
        comb_df_ls = []
        for analysis_subdir in analysis_subdir_ls:
            fbf_fp = os.path.join(analysis_dir, analysis_subdir, FBF, f"{name}.{AnalysisDf.IO}")
            comb_df_ls.append(AnalysisDf.read(fbf_fp))
        # Concatenating across columns, keyed by analysis subdirectory name
        comb_df = pd.concat(
            comb_df_ls,
            axis=1,
            keys=analysis_subdir_ls,
            names=[AnalysisCombinedDf.CN.ANALYSIS.value],
        )
        # Writing the combined dataframe to file
        AnalysisCombinedDf.write(comb_df, analysis_combined_fp)
        return get_io_obj_content(io_obj)

combine_analysis(analysis_dir, analysis_combined_fp, configs_fp, overwrite) classmethod

Concatenates across columns the frame-by-frame dataframes for all analysis subdirectories and saves this in a single dataframe.

Source code in behavysis/processes/combine_analysis.py
@classmethod
def combine_analysis(
    cls,
    analysis_dir: str,
    analysis_combined_fp: str,
    configs_fp: str,
    overwrite: bool,
) -> str:
    """
    Concatenates across columns the frame-by-frame dataframes for all analysis subdirectories
    and saves this in a single dataframe.
    """
    logger, io_obj = init_logger_io_obj()
    # Skipping when the combined file already exists and overwriting is disabled
    if not overwrite and os.path.exists(analysis_combined_fp):
        logger.warning(file_exists_msg(analysis_combined_fp))
        return get_io_obj_content(io_obj)
    name = get_name(configs_fp)
    # Each analysis type lives in its own subdirectory of analysis_dir
    analysis_subdir_ls = [
        subdir for subdir in os.listdir(analysis_dir) if os.path.isdir(os.path.join(analysis_dir, subdir))
    ]
    # Nothing to combine: warn and bail out without making a df
    if not analysis_subdir_ls:
        logger.warning("no analysis fbf files made. Run `exp.analyse` first")
        return get_io_obj_content(io_obj)
    # Reading the frame-by-frame df of each analysis subdirectory
    comb_df_ls = []
    for analysis_subdir in analysis_subdir_ls:
        fbf_fp = os.path.join(analysis_dir, analysis_subdir, FBF, f"{name}.{AnalysisDf.IO}")
        comb_df_ls.append(AnalysisDf.read(fbf_fp))
    # Concatenating across columns, keyed by analysis subdirectory name
    comb_df = pd.concat(
        comb_df_ls,
        axis=1,
        keys=analysis_subdir_ls,
        names=[AnalysisCombinedDf.CN.ANALYSIS.value],
    )
    # Writing the combined dataframe to file
    AnalysisCombinedDf.write(comb_df, analysis_combined_fp)
    return get_io_obj_content(io_obj)

behavysis.processes.evaluate.Evaluate

Evaluation plotting utilities (keypoints likelihood and behaviour outcome plots).

Source code in behavysis/processes/evaluate/__init__.py
class Evaluate:
    """Makes evaluation plots for an experiment: keypoint likelihoods and behaviour outcomes over time."""

    ###############################################################################################
    #               MAKE KEYPOINTS PLOTS
    ###############################################################################################

    @staticmethod
    def keypoints_plot(
        vid_fp: str,
        dlc_fp: str,
        behavs_fp: str,
        dst_dir: str,
        configs_fp: str,
        overwrite: bool,
    ) -> str:
        """
        Make keypoints evaluation plot of likelihood of each bodypart through time.

        Saves the plot to `<dst_dir>/keypoints_plot/<name>.png`.
        Returns "" on success, or a file-exists message when skipped.
        """
        name = get_name(dlc_fp)
        dst_dir = os.path.join(dst_dir, Evaluate.keypoints_plot.__name__)
        dst_fp = os.path.join(dst_dir, f"{name}.png")
        os.makedirs(dst_dir, exist_ok=True)
        # If overwrite is False, checking if we should skip processing.
        # FIX: `overwrite` was previously accepted but never checked, unlike `behav_plot`.
        if not overwrite and os.path.exists(dst_fp):
            return file_exists_msg()

        # Getting necessary config parameters
        configs = ExperimentConfigs.read_json(configs_fp)
        configs_filt = configs.user.evaluate.keypoints_plot
        bpts = configs.get_ref(configs_filt.bodyparts)
        fps = configs.auto.formatted_vid.fps

        # Read the file
        df = KeypointsDf.clean_headings(KeypointsDf.read(dlc_fp))
        # Checking the bodyparts specified in the configs exist in the dataframe
        KeypointsDf.check_bpts_exist(df, bpts)
        # Making data-long ways
        idx = pd.IndexSlice
        df = (
            df.loc[:, idx[:, bpts]]
            .stack([KeypointsDf.CN.INDIVIDUALS.value, KeypointsDf.CN.BODYPARTS.value])
            .reset_index()
        )
        # Adding the timestamp column (frame number / fps -> seconds)
        df["timestamp"] = df[KeypointsDf.IN.FRAME.value] / fps
        # Making plot: one facet row per individual, one line per bodypart
        g = sns.FacetGrid(
            df,
            row=KeypointsDf.CN.INDIVIDUALS.value,
            height=5,
            aspect=10,
        )
        g.map_dataframe(
            sns.lineplot,
            x="timestamp",
            y=CoordsCols.LIKELIHOOD.value,
            hue=KeypointsDf.CN.BODYPARTS.value,
            alpha=0.4,
        )
        g.add_legend()
        # Saving plot and clearing the figure
        g.savefig(dst_fp)
        g.figure.clf()
        return ""

    ###############################################################################################
    # MAKE BEHAVIOUR PLOTS
    ###############################################################################################

    @staticmethod
    def behav_plot(
        vid_fp: str,
        dlc_fp: str,
        behavs_fp: str,
        dst_dir: str,
        configs_fp: str,
        overwrite: bool,
    ) -> str:
        """
        Make behaviour evaluation plot of the predicted and actual behaviours through time.

        Saves the plot to `<dst_dir>/behav_plot/<name>.png`.
        Returns "" on success, or a file-exists message when skipped.
        """
        name = get_name(behavs_fp)
        dst_dir = os.path.join(dst_dir, Evaluate.behav_plot.__name__)
        dst_fp = os.path.join(dst_dir, f"{name}.png")
        os.makedirs(dst_dir, exist_ok=True)
        # If overwrite is False, checking if we should skip processing
        if not overwrite and os.path.exists(dst_fp):
            return file_exists_msg()

        # Getting necessary config parameters
        configs = ExperimentConfigs.read_json(configs_fp)
        fps = float(configs.auto.formatted_vid.fps)

        # Read the file
        df = BehavScoredDf.read(behavs_fp)
        # Making data-long ways
        df = (
            df.stack([BehavScoredDf.CN.BEHAVS.value, BehavScoredDf.CN.OUTCOMES.value])
            .reset_index()
            .rename(columns={0: "value"})
        )
        # Adding the timestamp column (frame number / fps -> seconds)
        df["timestamp"] = df[BehavScoredDf.IN.FRAME.value] / fps
        # Making plot: one facet row per behaviour, one line per outcome type
        g = sns.FacetGrid(
            df,
            row=BehavScoredDf.CN.BEHAVS.value,
            height=5,
            aspect=10,
        )
        g.map_dataframe(
            sns.lineplot,
            x="timestamp",
            y="value",
            hue=BehavScoredDf.CN.OUTCOMES.value,
            alpha=0.4,
        )
        g.add_legend()
        # Saving plot and clearing the figure
        g.savefig(dst_fp)
        g.figure.clf()
        return ""

behav_plot(vid_fp, dlc_fp, behavs_fp, dst_dir, configs_fp, overwrite) staticmethod

Make behaviour evaluation plot of the predicted and actual behaviours through time.

Source code in behavysis/processes/evaluate/__init__.py
@staticmethod
def behav_plot(
    vid_fp: str,
    dlc_fp: str,
    behavs_fp: str,
    dst_dir: str,
    configs_fp: str,
    overwrite: bool,
) -> str:
    """
    Make behaviour evaluation plot of the predicted and actual behaviours through time.
    """
    name = get_name(behavs_fp)
    dst_dir = os.path.join(dst_dir, Evaluate.behav_plot.__name__)
    dst_fp = os.path.join(dst_dir, f"{name}.png")
    os.makedirs(dst_dir, exist_ok=True)
    # Skipping when the plot already exists and overwriting is disabled
    if not overwrite and os.path.exists(dst_fp):
        return file_exists_msg()

    # Reading the configs to get the formatted video's fps
    configs = ExperimentConfigs.read_json(configs_fp)
    fps = float(configs.auto.formatted_vid.fps)

    # Reading the scored behaviours dataframe
    df = BehavScoredDf.read(behavs_fp)
    # Reshaping the wide behaviour columns into long format for plotting
    behavs_col = BehavScoredDf.CN.BEHAVS.value
    outcomes_col = BehavScoredDf.CN.OUTCOMES.value
    df = df.stack([behavs_col, outcomes_col]).reset_index().rename(columns={0: "value"})
    # Converting frame numbers to seconds
    df["timestamp"] = df[BehavScoredDf.IN.FRAME.value] / fps
    # One facet row per behaviour; one line per outcome type
    g = sns.FacetGrid(df, row=behavs_col, height=5, aspect=10)
    g.map_dataframe(sns.lineplot, x="timestamp", y="value", hue=outcomes_col, alpha=0.4)
    g.add_legend()
    # Writing the figure to disk and clearing it
    g.savefig(dst_fp)
    g.figure.clf()
    return ""

keypoints_plot(vid_fp, dlc_fp, behavs_fp, dst_dir, configs_fp, overwrite) staticmethod

Make keypoints evaluation plot of likelihood of each bodypart through time.

Source code in behavysis/processes/evaluate/__init__.py
@staticmethod
def keypoints_plot(
    vid_fp: str,
    dlc_fp: str,
    behavs_fp: str,
    dst_dir: str,
    configs_fp: str,
    overwrite: bool,
) -> str:
    """
    Make keypoints evaluation plot of likelihood of each bodypart through time.

    Saves the plot to `<dst_dir>/keypoints_plot/<name>.png`.
    Returns "" on success, or a file-exists message when skipped.
    """
    name = get_name(dlc_fp)
    dst_dir = os.path.join(dst_dir, Evaluate.keypoints_plot.__name__)
    dst_fp = os.path.join(dst_dir, f"{name}.png")
    os.makedirs(dst_dir, exist_ok=True)
    # If overwrite is False, checking if we should skip processing.
    # FIX: `overwrite` was previously accepted but never checked, unlike `behav_plot`.
    if not overwrite and os.path.exists(dst_fp):
        return file_exists_msg()

    # Getting necessary config parameters
    configs = ExperimentConfigs.read_json(configs_fp)
    configs_filt = configs.user.evaluate.keypoints_plot
    bpts = configs.get_ref(configs_filt.bodyparts)
    fps = configs.auto.formatted_vid.fps

    # Read the file
    df = KeypointsDf.clean_headings(KeypointsDf.read(dlc_fp))
    # Checking the bodyparts specified in the configs exist in the dataframe
    KeypointsDf.check_bpts_exist(df, bpts)
    # Making data-long ways
    idx = pd.IndexSlice
    df = (
        df.loc[:, idx[:, bpts]]
        .stack([KeypointsDf.CN.INDIVIDUALS.value, KeypointsDf.CN.BODYPARTS.value])
        .reset_index()
    )
    # Adding the timestamp column (frame number / fps -> seconds)
    df["timestamp"] = df[KeypointsDf.IN.FRAME.value] / fps
    # Making plot: one facet row per individual, one line per bodypart
    g = sns.FacetGrid(
        df,
        row=KeypointsDf.CN.INDIVIDUALS.value,
        height=5,
        aspect=10,
    )
    g.map_dataframe(
        sns.lineplot,
        x="timestamp",
        y=CoordsCols.LIKELIHOOD.value,
        hue=KeypointsDf.CN.BODYPARTS.value,
        alpha=0.4,
    )
    g.add_legend()
    # Saving plot and clearing the figure
    g.savefig(dst_fp)
    g.figure.clf()
    return ""