diff --git a/mmif/utils/video_document_helper.py b/mmif/utils/video_document_helper.py index 4c153243..87c1823b 100644 --- a/mmif/utils/video_document_helper.py +++ b/mmif/utils/video_document_helper.py @@ -7,7 +7,7 @@ import warnings from io import StringIO from typing import Iterable # todo: replace with collections.abc.Iterable in Python 3.9 -from typing import List, Union, Tuple +from typing import List, Optional, Union, Tuple import mmif from mmif import Annotation, Document, Mmif @@ -57,13 +57,13 @@ class SamplingMode(Enum): ), SamplingMode.SINGLE: ( "uses the middle representative if present, otherwise " - "extracts a frame from the midpoint of the start/end " + "extracts an image from the midpoint of the start/end " "interval (midpoint is calculated by floor division " "of the sum of start and end)." ), SamplingMode.ALL: ( "uses all target timepoints if present, otherwise " - "extracts all frames from the time interval." + "extracts all images from the time interval." ), } SAMPLING_MODE_DEFAULT = SamplingMode.REPRESENTATIVES @@ -120,35 +120,6 @@ def open_container(video_document: Document): return container -def capture(video_document: Document): - """ - .. deprecated:: - Use :py:func:`open_container` instead. See issue #379. - - Captures a video file using OpenCV and adds fps, frame count, and duration as properties to the document. - - :param video_document: :py:class:`~mmif.serialize.annotation.Document` instance that holds a video document (``"@type": ".../VideoDocument/..."``) - :return: `OpenCV VideoCapture `_ object - """ - warnings.warn( - f'capture() is deprecated; use open_container() instead. ' - f'{_PTS_BUG_NOTICE}', - DeprecationWarning, stacklevel=2, - ) - cv2 = _check_cv_dep('cv2') - if video_document is None or video_document.at_type != DocumentTypes.VideoDocument: - raise ValueError(f'The document does not exist.') - - v = cv2.VideoCapture(video_document.location_path(nonexist_ok=False)) - fps = round(v.get(cv2.CAP_PROP_FPS), 2) - fc = v.get(cv2.CAP_PROP_FRAME_COUNT) - dur = round(fc / fps, 3) * 1000 - video_document.add_property(FPS_DOCPROP_KEY, fps) - video_document.add_property(FRAMECOUNT_DOCPROP_KEY, fc) - video_document.add_property(DURATION_DOCPROP_KEY, dur) - return v - - def get_framerate(video_document: Document) -> float: """ Gets the frame rate of a video document. First by checking the fps @@ -174,24 +145,24 @@ def get_framerate(video_document: Document) -> float: container.close() -def extract_timepoints_as_images( +def extract_images_from_timepoints( video_document: Document, timepoints_ms: Iterable[int], as_PIL: bool = False, ): """ - Extracts frames at the given media-timeline timepoints (in milliseconds). + Extracts images at the given media-timeline timepoints (in milliseconds). - For each requested timepoint, returns the frame whose actual + For each requested timepoint, returns the image whose actual presentation timestamp (PTS) is closest to it. Duplicate timepoints - produce duplicate frames at the same list positions as the input. + produce duplicate images at the same list positions as the input. :param video_document: :py:class:`~mmif.serialize.annotation.Document` holding a video document (``"@type": ".../VideoDocument/..."``) :param timepoints_ms: iterable of timepoint values in milliseconds :param as_PIL: return :py:class:`PIL.Image.Image` (RGB) instead of :py:class:`~numpy.ndarray` (BGR) - :returns: frames in the same order (and with the same multiplicity) as + :returns: images in the same order (and with the same multiplicity) as ``timepoints_ms`` :rtype: list """ @@ -260,174 +231,6 @@ def _emit(frame, t_ms): return [result_map[t] for t in original_timepoints if t in result_map] -def extract_frames_as_images(video_document: Document, framenums: Iterable[int], as_PIL: bool = False, record_ffmpeg_errors: bool = False): - """ - .. deprecated:: - Use :py:func:`extract_timepoints_as_images` instead. See issue #379. - - Extracts frames from a video document as a list of :py:class:`numpy.ndarray`. - Use with :py:func:`sample_frames` function to get the list of frame numbers first. - - :param video_document: :py:class:`~mmif.serialize.annotation.Document` instance that holds a video document (``"@type": ".../VideoDocument/..."``) - :param framenums: iterable integers representing the frame numbers to extract - :param as_PIL: return :py:class:`PIL.Image.Image` instead of :py:class:`~numpy.ndarray` - :param record_ffmpeg_errors: if True, records and warns about FFmpeg stderr output during extraction - :return: frames as a list of :py:class:`~numpy.ndarray` or :py:class:`~PIL.Image.Image` - """ - warnings.warn( - f'extract_frames_as_images() is deprecated; use ' - f'extract_timepoints_as_images() instead. {_PTS_BUG_NOTICE}', - DeprecationWarning, stacklevel=2, - ) - cv2 = _check_cv_dep('cv2') - # deduplicate and sort frame numbers for extraction, then map back to original order - original_framenums = list(framenums) - unique_framenums = sorted(set(original_framenums)) - if as_PIL: - Image = _check_cv_dep('PIL.Image') - unique_frames = {} - video = capture(video_document) - cur_f = 0 - tot_fcount = video_document.get_property(FRAMECOUNT_DOCPROP_KEY) - # when the target frame is more than this frames away, fast-forward instead of reading frame by frame - # this is sanity-checked with a small number of video samples - # (frame-by-frame ndarrays are compared with fast-forwarded ndarrays) - skip_threadhold = 1000 - framenumi = iter(unique_framenums) - next_target_f = next(framenumi, None) - cpipes = _check_cv_dep('wurlitzer').pipes - ffmpeg_errs = StringIO() - with cpipes(stderr=ffmpeg_errs, stdout=sys.stdout): - while True: - if next_target_f is None or cur_f > tot_fcount or next_target_f > tot_fcount: - break - if next_target_f - cur_f > skip_threadhold: - while next_target_f - cur_f > skip_threadhold: - cur_f += skip_threadhold - else: - video.set(cv2.CAP_PROP_POS_FRAMES, cur_f) - ret, frame = video.read() - if cur_f == next_target_f: - if not ret: - sec = convert(cur_f, 'f', 's', video_document.get_property(FPS_DOCPROP_KEY)) - warnings.warn(f'Frame #{cur_f} ({sec}s) could not be read from the video {video_document.id} @ {video_document.location} .') - else: - unique_frames[cur_f] = Image.fromarray(frame[:, :, ::-1]) if as_PIL else frame - next_target_f = next(framenumi, None) - cur_f += 1 - ffmpeg_err_str = ffmpeg_errs.getvalue() - if ffmpeg_err_str and record_ffmpeg_errors: - warnings.warn(f'FFmpeg output during extracting frames: {ffmpeg_err_str}') - video.release() - # return frames in original input order, duplicating where needed - return [unique_frames[f] for f in original_framenums if f in unique_frames] - - -def get_mid_framenum(mmif: Mmif, time_frame: Annotation) -> int: - """ - .. deprecated:: - Use :py:func:`extract_frames_by_mode` instead. - """ - warnings.warn('This function is deprecated. Use ``extract_frames_by_mode()`` instead.', DeprecationWarning, stacklevel=2) - return _get_mid_framenum(mmif, time_frame) - - -def _get_mid_framenum(mmif: Mmif, time_frame: Annotation) -> int: - """ - Calculates the middle frame number of a time interval annotation. - - :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance - :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance that holds a time interval annotation (``"@type": ".../TimeFrame/..."``) - :return: middle frame number as an integer - """ - timeunit = time_frame.get_property('timeUnit') - video_document = mmif[time_frame.get_property('document')] - fps = get_framerate(video_document) - return int(convert(time_frame.get_property('start') + time_frame.get_property('end'), timeunit, 'frame', fps) // 2) - - -def extract_mid_frame(mmif: Mmif, time_frame: Annotation, as_PIL: bool = False): - """ - .. deprecated:: - Use :py:func:`extract_frames_by_mode` instead. - - Extracts the middle frame of a time interval annotation as a numpy ndarray. - - :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance - :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance that holds a time interval annotation (``"@type": ".../TimeFrame/..."``) - :param as_PIL: return :py:class:`~PIL.Image.Image` instead of :py:class:`~numpy.ndarray` - :return: frame as a :py:class:`numpy.ndarray` or :py:class:`PIL.Image.Image` - """ - warnings.warn('This function is deprecated. Use ``extract_frames_by_mode()`` instead.', DeprecationWarning, stacklevel=2) - vd = mmif[time_frame.get_property('document')] - fn = get_mid_framenum(mmif, time_frame) - return extract_frames_as_images(vd, [fn], as_PIL=as_PIL)[0] - - -def get_representative_framenums(mmif: Mmif, time_frame: Annotation) -> List[int]: - """ - .. deprecated:: - Use :py:func:`extract_frames_by_mode` instead. - - Calculates the representative frame numbers from an annotation. To pick the representative frames, it first looks - up the ``representatives`` property of the ``TimeFrame`` annotation. If it is not found, it will calculate the - number of the middle frame. - - :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance - :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance that holds a time interval annotation containing a `representatives` property (``"@type": ".../TimeFrame/..."``) - :return: representative frame number as an integer - """ - warnings.warn('This function is deprecated. Use ``extract_frames_by_mode()`` instead.', DeprecationWarning, stacklevel=2) - if 'representatives' not in time_frame.properties: - return [_get_mid_framenum(mmif, time_frame)] - timeunit = time_frame.get_property('timeUnit') - video_document = mmif[time_frame.get_property('document')] - fps = get_framerate(video_document) - representatives = time_frame.get_property('representatives') - ref_frams = [] - for rep_id in representatives: - try: - rep_anno = mmif[rep_id] - except KeyError as ke: - raise ValueError(f'Representative timepoint {rep_id} not found in any view. ({ke})') - ref_frams.append(int(convert(rep_anno.get_property('timePoint'), timeunit, 'frame', fps))) - return ref_frams - - -def get_representative_framenum(mmif: Mmif, time_frame: Annotation) -> int: - """ - .. deprecated:: - Use :py:func:`extract_frames_by_mode` instead. - - A thin wrapper around :py:func:`get_representative_framenums` to return a single representative frame number. Always - return the first frame number found. - """ - warnings.warn('This function is deprecated. Use ``extract_frames_by_mode()`` instead.', DeprecationWarning, stacklevel=2) - try: - return get_representative_framenums(mmif, time_frame)[0] - except IndexError: - raise ValueError(f'No representative frame found in the TimeFrame annotation {time_frame.id}.') - - -def extract_representative_frame(mmif: Mmif, time_frame: Annotation, as_PIL: bool = False, first_only: bool = True): - """ - .. deprecated:: - Use :py:func:`extract_frames_by_mode` instead. - - Extracts the representative frame of an annotation as a numpy ndarray or PIL Image. - - :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance - :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance that holds a time interval annotation (``"@type": ".../TimeFrame/..."``) - :param as_PIL: return :py:class:`~PIL.Image.Image` instead of :py:class:`~numpy.ndarray` - :param first_only: return the first representative frame only - :return: frame as a :py:class:`numpy.ndarray` or :py:class:`PIL.Image.Image` - """ - warnings.warn('This function is deprecated. Use ``extract_frames_by_mode()`` instead.', DeprecationWarning, stacklevel=2) - video_document = mmif[time_frame.get_property('document')] - rep_frame_num = [get_representative_framenum(mmif, time_frame)] if first_only else get_representative_framenums(mmif, time_frame) - return extract_frames_as_images(video_document, rep_frame_num, as_PIL=as_PIL)[0] - - def _tp_ids_to_timepoints_ms(mmif: Mmif, tp_ids: List[str]) -> List[int]: """ Converts a list of timepoint annotation IDs to media-timeline timepoints in milliseconds. @@ -486,86 +289,114 @@ def _timeframe_to_timepoint_range_ms( return int(round(start)), int(round(end)) -def _sample_all_timepoints_ms(mmif: Mmif, time_frame: Annotation) -> List[int]: +def _sample_all_timepoint_pairs_ms( + mmif: Mmif, time_frame: Annotation +) -> List[Tuple[int, Optional[str]]]: """ - Samples all timepoints (ms) from a TimeFrame. Uses all ``targets`` if - present, otherwise samples the start/end interval at the stream's - average frame rate. + Samples all (timepoint_ms, source_id) pairs from a TimeFrame. Uses all + ``targets`` if present (source is the TP annotation id), otherwise + samples the start/end interval at the stream's average frame rate + (source is None for each sampled point). :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance of a TimeFrame - :return: list of timepoint values in ms + :return: list of (ms, source_id) pairs; source_id is ``None`` when + the point came from interval sampling rather than a TP :rtype: list """ if 'targets' in time_frame.properties: - return _tp_ids_to_timepoints_ms( - mmif, time_frame.get_property('targets')) + target_ids = time_frame.get_property('targets') + ms_list = _tp_ids_to_timepoints_ms(mmif, target_ids) + return list(zip(ms_list, target_ids)) start_ms, end_ms = _timeframe_to_timepoint_range_ms(mmif, time_frame) video_document = _resolve_video_document(mmif, time_frame) fps = get_framerate(video_document) step_ms = 1000.0 / fps - return sample_timepoints(start_ms, end_ms, step_ms) + return [(t, None) for t in sample_timepoints(start_ms, end_ms, step_ms)] -def _sample_representatives_timepoints_ms( +def _sample_representatives_timepoint_pairs_ms( mmif: Mmif, time_frame: Annotation -) -> List[int]: +) -> List[Tuple[int, str]]: """ - Samples timepoints (ms) from a TimeFrame's representatives. Returns an - empty list if ``representatives`` is not present (skips the TimeFrame). + Samples (timepoint_ms, source_id) pairs from a TimeFrame's + representatives. Returns an empty list if ``representatives`` is not + present (skips the TimeFrame). Source is always the rep TP id. :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance of a TimeFrame - :return: list of timepoint values in ms (empty if no representatives) + :return: list of (ms, rep_id) pairs (empty if no representatives) :rtype: list """ if 'representatives' in time_frame.properties: reps = time_frame.get_property('representatives') if reps: - return _tp_ids_to_timepoints_ms(mmif, reps) + ms_list = _tp_ids_to_timepoints_ms(mmif, reps) + return list(zip(ms_list, reps)) return [] -def _sample_single_timepoint_ms( +def _sample_single_timepoint_pair_ms( mmif: Mmif, time_frame: Annotation -) -> List[int]: +) -> List[Tuple[int, Optional[str]]]: """ - Samples a single timepoint (ms) from a TimeFrame. Uses the middle - representative if ``representatives`` is present, otherwise the - midpoint of the start/end interval. + Samples a single (timepoint_ms, source_id) pair from a TimeFrame. + Uses the middle representative if ``representatives`` is present + (source is the rep TP id), otherwise falls back to the midpoint of + the start/end interval (source is None). :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance of a TimeFrame - :return: list containing a single timepoint value in ms + :return: single-element list ``[(ms, source_id)]``; source_id is + ``None`` when the point came from the interval midpoint fallback :rtype: list """ if 'representatives' in time_frame.properties: reps = time_frame.get_property('representatives') if reps: mid = reps[len(reps) // 2] - return _tp_ids_to_timepoints_ms(mmif, [mid]) + ms_list = _tp_ids_to_timepoints_ms(mmif, [mid]) + return [(ms_list[0], mid)] start_ms, end_ms = _timeframe_to_timepoint_range_ms(mmif, time_frame) - return [(start_ms + end_ms) // 2] + return [((start_ms + end_ms) // 2, None)] -def extract_target_frames(mmif: Mmif, annotation: Annotation, min_timepoints: int = 0, max_timepoints: int = sys.maxsize, fraction: float = 1.0, as_PIL: bool = False): +def extract_images_by_count_with_sources( + mmif: Mmif, + annotation: Annotation, + min_timepoints: int = 0, + max_timepoints: int = sys.maxsize, + fraction: float = 1.0, + as_PIL: bool = False + ) -> Tuple[List, List[str]]: """ - Extracts frames corresponding to the timepoints listed in the ``targets`` property of an annotation. - Selection of timepoints is based on minimum, maximum, and fraction of targets to include. + Extracts images at a count-controlled subset of the timepoints listed + in the ``targets`` property of an annotation, alongside the IDs of the + selected target TPs. + + The number of timepoints chosen is ``max(min_timepoints, + int(num_targets * fraction))``, clamped to ``max_timepoints`` and to + the number of available targets. The chosen indices are spread evenly + across the target list. :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance - :param annotation: :py:class:`~mmif.serialize.annotation.Annotation` instance containing a ``targets`` property + :param annotation: :py:class:`~mmif.serialize.annotation.Annotation` + instance containing a ``targets`` property :param min_timepoints: minimum number of timepoints to include :param max_timepoints: maximum number of timepoints to include :param fraction: fraction of targets to include (ideally) - :param as_PIL: return :py:class:`~PIL.Image.Image` instead of :py:class:`~numpy.ndarray` - :return: a tuple containing (list of frames, list of selected target IDs) + :param as_PIL: return :py:class:`~PIL.Image.Image` instead of + :py:class:`~numpy.ndarray` + :return: tuple of (list of images, list of selected target TP IDs); + the two lists are parallel + :rtype: tuple """ if 'targets' not in annotation.properties: - raise ValueError(f'Annotation {annotation.id} does not have a "targets" property.') + raise ValueError( + f'Annotation {annotation.id} does not have a "targets" property.') targets = annotation.get_property('targets') num_targets = len(targets) @@ -580,47 +411,126 @@ def extract_target_frames(mmif: Mmif, annotation: Annotation, min_timepoints: in if count == 1: indices = [num_targets // 2] else: - indices = [int(i * (num_targets - 1) / (count - 1)) for i in range(count)] + indices = [int(i * (num_targets - 1) / (count - 1)) + for i in range(count)] selected_target_ids = [targets[i] for i in indices] timepoints_ms = _tp_ids_to_timepoints_ms(mmif, selected_target_ids) video_doc = _resolve_video_document(mmif, annotation) - images = extract_timepoints_as_images(video_doc, timepoints_ms, as_PIL=as_PIL) + images = extract_images_from_timepoints( + video_doc, timepoints_ms, as_PIL=as_PIL) return images, selected_target_ids -def extract_frames_by_mode( +def extract_images_by_count( + mmif: Mmif, + annotation: Annotation, + min_timepoints: int = 0, + max_timepoints: int = sys.maxsize, + fraction: float = 1.0, + as_PIL: bool = False + ) -> List: + """ + Extracts images at a count-controlled subset of the timepoints listed + in the ``targets`` property of an annotation. See + :py:func:`extract_images_by_count_with_sources` for selection details + and for a variant that also returns the IDs of the selected target TPs. + + :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance + :param annotation: :py:class:`~mmif.serialize.annotation.Annotation` + instance containing a ``targets`` property + :param min_timepoints: minimum number of timepoints to include + :param max_timepoints: maximum number of timepoints to include + :param fraction: fraction of targets to include (ideally) + :param as_PIL: return :py:class:`~PIL.Image.Image` instead of + :py:class:`~numpy.ndarray` + :return: list of images + :rtype: list + """ + images, _ = extract_images_by_count_with_sources( + mmif, annotation, + min_timepoints=min_timepoints, + max_timepoints=max_timepoints, + fraction=fraction, + as_PIL=as_PIL, + ) + return images + + +def extract_images_by_mode_with_sources( mmif: Mmif, time_frame: Annotation, mode: Union[SamplingMode, None] = None, - as_PIL: bool = False -) -> List: + as_PIL: bool = False, +) -> Tuple[List, List[Union[str, int]]]: """ - Extracts frames from a TimeFrame annotation based on a - sampling mode. If ``mode`` is not specified, uses the - context-level default (set via + Extracts images from a TimeFrame using a :py:class:`SamplingMode`, + alongside the per-image source: a TP annotation id (``str``) when the + image was selected from a TP (a representative or a target), or the + sampled timepoint in milliseconds (``int``) when a fallback path was + used (SINGLE with no representatives, or ALL with no targets). + + If ``mode`` is not specified, uses the context-level default (set via :py:data:`_sampling_mode` context variable). :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance :param time_frame: TimeFrame annotation to sample from - :param mode: :py:class:`SamplingMode`, or None to use - the context default - :param as_PIL: return PIL Images instead of ndarrays - :return: list of frames (may be empty for - ``REPRESENTATIVES`` mode when no representatives exist) + :param mode: :py:class:`SamplingMode`, or None to use the context + default + :param as_PIL: return :py:class:`PIL.Image.Image` instead of + :py:class:`~numpy.ndarray` + :return: tuple of (list of images, list of sources); the two lists + are parallel. May be ``([], [])`` for ``REPRESENTATIVES`` mode + when no representatives exist. + :rtype: tuple """ if mode is None: mode = _sampling_mode.get() if mode == SamplingMode.ALL: - timepoints_ms = _sample_all_timepoints_ms(mmif, time_frame) + pairs = _sample_all_timepoint_pairs_ms(mmif, time_frame) elif mode == SamplingMode.REPRESENTATIVES: - timepoints_ms = _sample_representatives_timepoints_ms(mmif, time_frame) + pairs = _sample_representatives_timepoint_pairs_ms(mmif, time_frame) else: - timepoints_ms = _sample_single_timepoint_ms(mmif, time_frame) - if not timepoints_ms: - return [] + pairs = _sample_single_timepoint_pair_ms(mmif, time_frame) + if not pairs: + return [], [] + timepoints_ms = [ms for ms, _ in pairs] + sources: List[Union[str, int]] = [ + tp_id if tp_id is not None else ms for ms, tp_id in pairs + ] video_doc = _resolve_video_document(mmif, time_frame) - return extract_timepoints_as_images(video_doc, timepoints_ms, as_PIL=as_PIL) + images = extract_images_from_timepoints( + video_doc, timepoints_ms, as_PIL=as_PIL) + return images, sources + + +def extract_images_by_mode( + mmif: Mmif, + time_frame: Annotation, + mode: Union[SamplingMode, None] = None, + as_PIL: bool = False, +) -> List: + """ + Extracts images from a TimeFrame using a :py:class:`SamplingMode`. + See :py:func:`extract_images_by_mode_with_sources` for the variant + that also returns the per-image source IDs / timepoints. + + If ``mode`` is not specified, uses the context-level default (set via + :py:data:`_sampling_mode` context variable). + + :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance + :param time_frame: TimeFrame annotation to sample from + :param mode: :py:class:`SamplingMode`, or None to use the context + default + :param as_PIL: return :py:class:`PIL.Image.Image` instead of + :py:class:`~numpy.ndarray` + :return: list of images (may be empty for ``REPRESENTATIVES`` mode + when no representatives exist) + :rtype: list + """ + images, _ = extract_images_by_mode_with_sources( + mmif, time_frame, mode=mode, as_PIL=as_PIL) + return images def sample_timepoints( @@ -655,23 +565,251 @@ def sample_timepoints( return timepoints -def sample_frames(start_frame: int, end_frame: int, sample_rate: float = 1) -> List[int]: +def convert_timepoint(mmif: Mmif, timepoint: Annotation, out_unit: str) -> Union[int, float, str]: """ - .. deprecated:: - Use :py:func:`sample_timepoints` instead. See issue #379. - - Helper function to sample frames from a time interval. - Can also be used as a "cutoff" function when used with ``start_frame==0`` and ``sample_rate==1``. + Converts a time point included in an annotation to a different time unit. + The input annotation must have ``timePoint`` property. - :param start_frame: start frame of the interval - :param end_frame: end frame of the interval - :param sample_rate: sampling rate (or step) to configure how often to take a frame, default is 1, meaning all consecutive frames are sampled - :return: list of frame numbers to extract + :param mmif: input MMIF to obtain fps and input timeunit + :param timepoint: :py:class:`~mmif.serialize.annotation.Annotation` instance with ``timePoint`` property + :param out_unit: time unit to which the point is converted (``frames``, ``seconds``, ``milliseconds``) + :return: frame number (integer) or second/millisecond (float) of input timepoint """ - warnings.warn( - f'sample_frames() is deprecated; use sample_timepoints() instead. ' - f'{_PTS_BUG_NOTICE}', - DeprecationWarning, stacklevel=2, + in_unit = timepoint.get_property('timeUnit') + vd = mmif[timepoint.get_property('document')] + return convert(timepoint.get_property('timePoint'), in_unit, out_unit, get_framerate(vd)) + + +def convert_timeframe(mmif: Mmif, time_frame: Annotation, out_unit: str) -> Tuple[Union[int, float, str], Union[int, float, str]]: + """ + Converts start and end points in a ``TimeFrame`` annotation a different time unit. + + :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance + :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance that holds a time interval annotation (``"@type": ".../TimeFrame/..."``) + :param out_unit: time unit to which the point is converted + :return: tuple of frame numbers, seconds/milliseconds, or ISO notation of TimeFrame's start and end + """ + in_unit = time_frame.get_property('timeUnit') + vd = mmif[time_frame.get_property('document')] + fps = get_framerate(vd) + return convert(time_frame.get_property('start'), in_unit, out_unit, fps), convert(time_frame.get_property('end'), in_unit, out_unit, fps) + + +def capture(video_document: Document): + """ + .. deprecated:: + Use :py:func:`open_container` instead. See issue #379. + + Captures a video file using OpenCV and adds fps, frame count, and duration as properties to the document. + + :param video_document: :py:class:`~mmif.serialize.annotation.Document` instance that holds a video document (``"@type": ".../VideoDocument/..."``) + :return: `OpenCV VideoCapture `_ object + """ + warnings.warn( + f'capture() is deprecated; use open_container() instead. ' + f'{_PTS_BUG_NOTICE}', + DeprecationWarning, stacklevel=2, + ) + cv2 = _check_cv_dep('cv2') + if video_document is None or video_document.at_type != DocumentTypes.VideoDocument: + raise ValueError(f'The document does not exist.') + + v = cv2.VideoCapture(video_document.location_path(nonexist_ok=False)) + fps = round(v.get(cv2.CAP_PROP_FPS), 2) + fc = v.get(cv2.CAP_PROP_FRAME_COUNT) + dur = round(fc / fps, 3) * 1000 + video_document.add_property(FPS_DOCPROP_KEY, fps) + video_document.add_property(FRAMECOUNT_DOCPROP_KEY, fc) + video_document.add_property(DURATION_DOCPROP_KEY, dur) + return v + + +def extract_frames_as_images(video_document: Document, framenums: Iterable[int], as_PIL: bool = False, record_ffmpeg_errors: bool = False): + """ + .. deprecated:: + Use :py:func:`extract_images_from_timepoints` instead. See issue #379. + + Extracts frames from a video document as a list of :py:class:`numpy.ndarray`. + Use with :py:func:`sample_frames` function to get the list of frame numbers first. + + :param video_document: :py:class:`~mmif.serialize.annotation.Document` instance that holds a video document (``"@type": ".../VideoDocument/..."``) + :param framenums: iterable integers representing the frame numbers to extract + :param as_PIL: return :py:class:`PIL.Image.Image` instead of :py:class:`~numpy.ndarray` + :param record_ffmpeg_errors: if True, records and warns about FFmpeg stderr output during extraction + :return: frames as a list of :py:class:`~numpy.ndarray` or :py:class:`~PIL.Image.Image` + """ + warnings.warn( + f'extract_frames_as_images() is deprecated; use ' + f'extract_images_from_timepoints() instead. {_PTS_BUG_NOTICE}', + DeprecationWarning, stacklevel=2, + ) + cv2 = _check_cv_dep('cv2') + # deduplicate and sort frame numbers for extraction, then map back to original order + original_framenums = list(framenums) + unique_framenums = sorted(set(original_framenums)) + if as_PIL: + Image = _check_cv_dep('PIL.Image') + unique_frames = {} + video = capture(video_document) + cur_f = 0 + tot_fcount = video_document.get_property(FRAMECOUNT_DOCPROP_KEY) + # when the target frame is more than this frames away, fast-forward instead of reading frame by frame + # this is sanity-checked with a small number of video samples + # (frame-by-frame ndarrays are compared with fast-forwarded ndarrays) + skip_threadhold = 1000 + framenumi = iter(unique_framenums) + next_target_f = next(framenumi, None) + cpipes = _check_cv_dep('wurlitzer').pipes + ffmpeg_errs = StringIO() + with cpipes(stderr=ffmpeg_errs, stdout=sys.stdout): + while True: + if next_target_f is None or cur_f > tot_fcount or next_target_f > tot_fcount: + break + if next_target_f - cur_f > skip_threadhold: + while next_target_f - cur_f > skip_threadhold: + cur_f += skip_threadhold + else: + video.set(cv2.CAP_PROP_POS_FRAMES, cur_f) + ret, frame = video.read() + if cur_f == next_target_f: + if not ret: + sec = convert(cur_f, 'f', 's', video_document.get_property(FPS_DOCPROP_KEY)) + warnings.warn(f'Frame #{cur_f} ({sec}s) could not be read from the video {video_document.id} @ {video_document.location} .') + else: + unique_frames[cur_f] = Image.fromarray(frame[:, :, ::-1]) if as_PIL else frame + next_target_f = next(framenumi, None) + cur_f += 1 + ffmpeg_err_str = ffmpeg_errs.getvalue() + if ffmpeg_err_str and record_ffmpeg_errors: + warnings.warn(f'FFmpeg output during extracting frames: {ffmpeg_err_str}') + video.release() + # return frames in original input order, duplicating where needed + return [unique_frames[f] for f in original_framenums if f in unique_frames] + + +def _get_mid_framenum(mmif: Mmif, time_frame: Annotation) -> int: + """ + Calculates the middle frame number of a time interval annotation. + Used internally by deprecated helpers below. + + :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance + :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance that holds a time interval annotation (``"@type": ".../TimeFrame/..."``) + :return: middle frame number as an integer + """ + timeunit = time_frame.get_property('timeUnit') + video_document = mmif[time_frame.get_property('document')] + fps = get_framerate(video_document) + return int(convert(time_frame.get_property('start') + time_frame.get_property('end'), timeunit, 'frame', fps) // 2) + + +def get_mid_framenum(mmif: Mmif, time_frame: Annotation) -> int: + """ + .. deprecated:: + Use :py:func:`extract_frames_by_mode` instead. + """ + warnings.warn('This function is deprecated. Use ``extract_frames_by_mode()`` instead.', DeprecationWarning, stacklevel=2) + return _get_mid_framenum(mmif, time_frame) + + +def extract_mid_frame(mmif: Mmif, time_frame: Annotation, as_PIL: bool = False): + """ + .. deprecated:: + Use :py:func:`extract_frames_by_mode` instead. + + Extracts the middle frame of a time interval annotation as a numpy ndarray. + + :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance + :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance that holds a time interval annotation (``"@type": ".../TimeFrame/..."``) + :param as_PIL: return :py:class:`~PIL.Image.Image` instead of :py:class:`~numpy.ndarray` + :return: frame as a :py:class:`numpy.ndarray` or :py:class:`PIL.Image.Image` + """ + warnings.warn('This function is deprecated. Use ``extract_frames_by_mode()`` instead.', DeprecationWarning, stacklevel=2) + vd = mmif[time_frame.get_property('document')] + fn = get_mid_framenum(mmif, time_frame) + return extract_frames_as_images(vd, [fn], as_PIL=as_PIL)[0] + + +def get_representative_framenums(mmif: Mmif, time_frame: Annotation) -> List[int]: + """ + .. deprecated:: + Use :py:func:`extract_frames_by_mode` instead. + + Calculates the representative frame numbers from an annotation. To pick the representative frames, it first looks + up the ``representatives`` property of the ``TimeFrame`` annotation. If it is not found, it will calculate the + number of the middle frame. + + :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance + :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance that holds a time interval annotation containing a `representatives` property (``"@type": ".../TimeFrame/..."``) + :return: representative frame number as an integer + """ + warnings.warn('This function is deprecated. Use ``extract_frames_by_mode()`` instead.', DeprecationWarning, stacklevel=2) + if 'representatives' not in time_frame.properties: + return [_get_mid_framenum(mmif, time_frame)] + timeunit = time_frame.get_property('timeUnit') + video_document = mmif[time_frame.get_property('document')] + fps = get_framerate(video_document) + representatives = time_frame.get_property('representatives') + ref_frams = [] + for rep_id in representatives: + try: + rep_anno = mmif[rep_id] + except KeyError as ke: + raise ValueError(f'Representative timepoint {rep_id} not found in any view. ({ke})') + ref_frams.append(int(convert(rep_anno.get_property('timePoint'), timeunit, 'frame', fps))) + return ref_frams + + +def get_representative_framenum(mmif: Mmif, time_frame: Annotation) -> int: + """ + .. deprecated:: + Use :py:func:`extract_frames_by_mode` instead. + + A thin wrapper around :py:func:`get_representative_framenums` to return a single representative frame number. Always + return the first frame number found. + """ + warnings.warn('This function is deprecated. Use ``extract_frames_by_mode()`` instead.', DeprecationWarning, stacklevel=2) + try: + return get_representative_framenums(mmif, time_frame)[0] + except IndexError: + raise ValueError(f'No representative frame found in the TimeFrame annotation {time_frame.id}.') + + +def extract_representative_frame(mmif: Mmif, time_frame: Annotation, as_PIL: bool = False, first_only: bool = True): + """ + .. deprecated:: + Use :py:func:`extract_frames_by_mode` instead. + + Extracts the representative frame of an annotation as a numpy ndarray or PIL Image. + + :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance + :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance that holds a time interval annotation (``"@type": ".../TimeFrame/..."``) + :param as_PIL: return :py:class:`~PIL.Image.Image` instead of :py:class:`~numpy.ndarray` + :param first_only: return the first representative frame only + :return: frame as a :py:class:`numpy.ndarray` or :py:class:`PIL.Image.Image` + """ + warnings.warn('This function is deprecated. Use ``extract_frames_by_mode()`` instead.', DeprecationWarning, stacklevel=2) + video_document = mmif[time_frame.get_property('document')] + rep_frame_num = [get_representative_framenum(mmif, time_frame)] if first_only else get_representative_framenums(mmif, time_frame) + return extract_frames_as_images(video_document, rep_frame_num, as_PIL=as_PIL)[0] + + +def sample_frames(start_frame: int, end_frame: int, sample_rate: float = 1) -> List[int]: + """ + .. deprecated:: + Use :py:func:`sample_timepoints` instead. See issue #379. + + Helper function to sample frames from a time interval. + Can also be used as a "cutoff" function when used with ``start_frame==0`` and ``sample_rate==1``. + + :param start_frame: start frame of the interval + :param end_frame: end frame of the interval + :param sample_rate: sampling rate (or step) to configure how often to take a frame, default is 1, meaning all consecutive frames are sampled + :return: list of frame numbers to extract + """ + warnings.warn( + f'sample_frames() is deprecated; use sample_timepoints() instead. ' + f'{_PTS_BUG_NOTICE}', + DeprecationWarning, stacklevel=2, ) if sample_rate < 1: raise ValueError(f"Sample rate must be greater than 1, but got {sample_rate}") @@ -704,36 +842,6 @@ def get_annotation_property(mmif, annotation, prop_name): return annotation.get_property(prop_name) -def convert_timepoint(mmif: Mmif, timepoint: Annotation, out_unit: str) -> Union[int, float, str]: - """ - Converts a time point included in an annotation to a different time unit. - The input annotation must have ``timePoint`` property. - - :param mmif: input MMIF to obtain fps and input timeunit - :param timepoint: :py:class:`~mmif.serialize.annotation.Annotation` instance with ``timePoint`` property - :param out_unit: time unit to which the point is converted (``frames``, ``seconds``, ``milliseconds``) - :return: frame number (integer) or second/millisecond (float) of input timepoint - """ - in_unit = timepoint.get_property('timeUnit') - vd = mmif[timepoint.get_property('document')] - return convert(timepoint.get_property('timePoint'), in_unit, out_unit, get_framerate(vd)) - - -def convert_timeframe(mmif: Mmif, time_frame: Annotation, out_unit: str) -> Tuple[Union[int, float, str], Union[int, float, str]]: - """ - Converts start and end points in a ``TimeFrame`` annotation a different time unit. - - :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance - :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance that holds a time interval annotation (``"@type": ".../TimeFrame/..."``) - :param out_unit: time unit to which the point is converted - :return: tuple of frame numbers, seconds/milliseconds, or ISO notation of TimeFrame's start and end - """ - in_unit = time_frame.get_property('timeUnit') - vd = mmif[time_frame.get_property('document')] - fps = get_framerate(vd) - return convert(time_frame.get_property('start'), in_unit, out_unit, fps), convert(time_frame.get_property('end'), in_unit, out_unit, fps) - - def framenum_to_second(video_doc: Document, frame: int): """ .. deprecated:: @@ -765,7 +873,7 @@ def framenum_to_millisecond(video_doc: Document, frame: int): def second_to_framenum(video_doc: Document, second) -> int: """ .. deprecated:: - Use :py:func:`extract_timepoints_as_images` or stay in the time + Use :py:func:`extract_images_from_timepoints` or stay in the time domain. See issue #379. """ warnings.warn( @@ -779,7 +887,7 @@ def second_to_framenum(video_doc: Document, second) -> int: def millisecond_to_framenum(video_doc: Document, millisecond: float) -> int: """ .. deprecated:: - Use :py:func:`extract_timepoints_as_images` or stay in the time + Use :py:func:`extract_images_from_timepoints` or stay in the time domain. See issue #379. """ warnings.warn( @@ -788,3 +896,44 @@ def millisecond_to_framenum(video_doc: Document, millisecond: float) -> int: ) fps = get_framerate(video_doc) return int(convert(millisecond, 'ms', 'f', fps)) + + +def extract_timepoints_as_images(*args, **kwargs): + """ + .. deprecated:: + Renamed to :py:func:`extract_images_from_timepoints`. + """ + warnings.warn( + 'extract_timepoints_as_images() is deprecated; ' + 'use extract_images_from_timepoints() instead.', + DeprecationWarning, stacklevel=2, + ) + return extract_images_from_timepoints(*args, **kwargs) + + +def extract_target_frames(*args, **kwargs): + """ + .. deprecated:: + Renamed to :py:func:`extract_images_by_count_with_sources`. + For a bare-images variant, use :py:func:`extract_images_by_count`. + """ + warnings.warn( + 'extract_target_frames() is deprecated; ' + 'use extract_images_by_count_with_sources() instead.', + DeprecationWarning, stacklevel=2, + ) + return extract_images_by_count_with_sources(*args, **kwargs) + + +def extract_frames_by_mode(*args, **kwargs): + """ + .. deprecated:: + Renamed to :py:func:`extract_images_by_mode`. For per-image source + IDs, use :py:func:`extract_images_by_mode_with_sources`. + """ + warnings.warn( + 'extract_frames_by_mode() is deprecated; ' + 'use extract_images_by_mode() instead.', + DeprecationWarning, stacklevel=2, + ) + return extract_images_by_mode(*args, **kwargs) diff --git a/tests/test_utils.py b/tests/test_utils.py index fff35331..0f17be05 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -195,18 +195,18 @@ def test_sample_timepoints(self): with pytest.raises(ValueError): vdh.sample_timepoints(0, 100, -10) - def test_extract_timepoints_as_images(self): + def test_extract_images_from_timepoints(self): # basic: three distinct timepoints ms_list = [1000, 2000, 3000] - imgs = vdh.extract_timepoints_as_images( + imgs = vdh.extract_images_from_timepoints( self.video_doc, ms_list, as_PIL=False) self.assertEqual(3, len(imgs)) # empty input self.assertEqual( - [], vdh.extract_timepoints_as_images(self.video_doc, [])) + [], vdh.extract_images_from_timepoints(self.video_doc, [])) # duplicates preserved in input order dup_ms = [500, 250, 500, 750, 250] - dup_imgs = vdh.extract_timepoints_as_images(self.video_doc, dup_ms) + dup_imgs = vdh.extract_images_from_timepoints(self.video_doc, dup_ms) self.assertEqual(5, len(dup_imgs)) def _make_timepoints(self, count): @@ -222,27 +222,31 @@ def _make_timepoints(self, count): tps.append(tp) return tps - def test_sample_all_timepoints_ms(self): + def test_sample_all_timepoint_pairs_ms(self): tps = self._make_timepoints(10) parent = self.a_view.new_annotation( AnnotationTypes.TimeFrame, aid='tf_0', targets=[tp.id for tp in tps]) - ms_list = vdh._sample_all_timepoints_ms(self.mmif_obj, parent) - self.assertEqual([i * 100 for i in range(10)], ms_list) + pairs = vdh._sample_all_timepoint_pairs_ms(self.mmif_obj, parent) + # source IDs are the target TP ids; ms values match timePoints + self.assertEqual( + [(i * 100, tp.id) for i, tp in enumerate(tps)], pairs) - # start/end fallback (no targets): sampled at the stream's frame rate + # start/end fallback (no targets): sampled at the stream's frame + # rate; source is None for each sampled point parent2 = self.a_view.new_annotation( AnnotationTypes.TimeFrame, aid='tf_1', start=0, end=1000, timeUnit='milliseconds', document=self.video_doc.id) - ms_list2 = vdh._sample_all_timepoints_ms(self.mmif_obj, parent2) + pairs2 = vdh._sample_all_timepoint_pairs_ms(self.mmif_obj, parent2) # 30 frames in 1000ms at 29.97fps (step ≈ 33.37ms) - self.assertEqual(30, len(ms_list2)) - self.assertEqual(0, ms_list2[0]) - self.assertLess(ms_list2[-1], 1000) + self.assertEqual(30, len(pairs2)) + self.assertEqual((0, None), pairs2[0]) + self.assertTrue(all(src is None for _, src in pairs2)) + self.assertLess(pairs2[-1][0], 1000) - def test_sample_representatives_timepoints_ms(self): + def test_sample_representatives_timepoint_pairs_ms(self): tps = self._make_timepoints(10) reps = [tps[2].id, tps[5].id, tps[8].id] parent = self.a_view.new_annotation( @@ -250,19 +254,20 @@ def test_sample_representatives_timepoints_ms(self): targets=[tp.id for tp in tps], representatives=reps) - ms_list = vdh._sample_representatives_timepoints_ms( + pairs = vdh._sample_representatives_timepoint_pairs_ms( self.mmif_obj, parent) - self.assertEqual([200, 500, 800], ms_list) + self.assertEqual( + [(200, reps[0]), (500, reps[1]), (800, reps[2])], pairs) # no representatives → empty (skip) parent2 = self.a_view.new_annotation( AnnotationTypes.TimeFrame, aid='tf_1', targets=[tp.id for tp in tps]) self.assertEqual( - [], vdh._sample_representatives_timepoints_ms( + [], vdh._sample_representatives_timepoint_pairs_ms( self.mmif_obj, parent2)) - def test_sample_single_timepoint_ms(self): + def test_sample_single_timepoint_pair_ms(self): tps = self._make_timepoints(10) reps = [tps[2].id, tps[5].id, tps[8].id] parent = self.a_view.new_annotation( @@ -270,19 +275,120 @@ def test_sample_single_timepoint_ms(self): targets=[tp.id for tp in tps], representatives=reps) - # middle representative (index 1 of 3 → tps[5] → 500ms) + # middle representative (index 1 of 3 → tps[5] → 500ms, source=reps[1]) self.assertEqual( - [500], - vdh._sample_single_timepoint_ms(self.mmif_obj, parent)) + [(500, reps[1])], + vdh._sample_single_timepoint_pair_ms(self.mmif_obj, parent)) - # start/end fallback midpoint + # start/end fallback midpoint: source is None parent2 = self.a_view.new_annotation( AnnotationTypes.TimeFrame, aid='tf_1', start=100, end=500, timeUnit='milliseconds', document=self.video_doc.id) self.assertEqual( - [300], - vdh._sample_single_timepoint_ms(self.mmif_obj, parent2)) + [(300, None)], + vdh._sample_single_timepoint_pair_ms(self.mmif_obj, parent2)) + + def test_extract_images_by_count(self): + tps = self._make_timepoints(10) + parent = self.a_view.new_annotation( + AnnotationTypes.TimeFrame, aid='tf_0', + targets=[tp.id for tp in tps]) + + # bare form: returns just images + imgs = vdh.extract_images_by_count(self.mmif_obj, parent, fraction=0.3) + self.assertEqual(3, len(imgs)) + + # missing 'targets' raises ValueError (bare delegates through + # _with_sources) + no_targets = self.a_view.new_annotation( + AnnotationTypes.TimeFrame, aid='tf_no', + start=0, end=100, timeUnit='milliseconds', + document=self.video_doc.id) + with pytest.raises(ValueError): + vdh.extract_images_by_count(self.mmif_obj, no_targets) + + def test_extract_images_by_count_with_sources(self): + tps = self._make_timepoints(10) + parent = self.a_view.new_annotation( + AnnotationTypes.TimeFrame, aid='tf_0', + targets=[tp.id for tp in tps]) + + imgs, ids = vdh.extract_images_by_count_with_sources( + self.mmif_obj, parent, fraction=0.5) + self.assertEqual(len(imgs), len(ids)) + self.assertEqual(5, len(imgs)) + # source IDs are a subset of the input targets, in order + self.assertEqual(ids, sorted(ids, key=[t.id for t in tps].index)) + for tid in ids: + self.assertIn(tid, [t.id for t in tps]) + + # empty targets → empty parallel lists + empty = self.a_view.new_annotation( + AnnotationTypes.TimeFrame, aid='tf_empty', targets=[]) + self.assertEqual( + ([], []), + vdh.extract_images_by_count_with_sources(self.mmif_obj, empty)) + + def test_extract_images_by_mode_with_sources_reps(self): + tps = self._make_timepoints(10) + reps = [tps[2].id, tps[5].id, tps[8].id] + parent = self.a_view.new_annotation( + AnnotationTypes.TimeFrame, aid='tf_0', + targets=[tp.id for tp in tps], + representatives=reps) + + imgs, sources = vdh.extract_images_by_mode_with_sources( + self.mmif_obj, parent, mode=vdh.SamplingMode.REPRESENTATIVES) + self.assertEqual(3, len(imgs)) + # all sources are TP IDs (str), not ms ints + self.assertEqual(reps, sources) + self.assertTrue(all(isinstance(s, str) for s in sources)) + + def test_extract_images_by_mode_with_sources_single_fallback(self): + # SINGLE mode without representatives → midpoint fallback, + # source is the midpoint ms (int) + parent = self.a_view.new_annotation( + AnnotationTypes.TimeFrame, aid='tf_0', + start=100, end=500, timeUnit='milliseconds', + document=self.video_doc.id) + imgs, sources = vdh.extract_images_by_mode_with_sources( + self.mmif_obj, parent, mode=vdh.SamplingMode.SINGLE) + self.assertEqual(1, len(imgs)) + self.assertEqual([300], sources) + self.assertTrue(all(isinstance(s, int) for s in sources)) + + def test_extract_images_by_mode_with_sources_all_fallback(self): + # ALL mode without targets → stream-rate sampling, sources are + # the sampled ms ints + parent = self.a_view.new_annotation( + AnnotationTypes.TimeFrame, aid='tf_0', + start=0, end=200, timeUnit='milliseconds', + document=self.video_doc.id) + imgs, sources = vdh.extract_images_by_mode_with_sources( + self.mmif_obj, parent, mode=vdh.SamplingMode.ALL) + self.assertEqual(len(imgs), len(sources)) + self.assertGreater(len(imgs), 0) + self.assertTrue(all(isinstance(s, int) for s in sources)) + # the sources match the sampled ms values + self.assertEqual(sorted(sources), sources) + + def test_renamed_helpers_emit_deprecation_warnings(self): + # The three pre-rename names live as deprecation aliases that + # delegate to the new names. + with pytest.warns(DeprecationWarning, match='extract_images_from_timepoints'): + vdh.extract_timepoints_as_images(self.video_doc, [500]) + + tps = self._make_timepoints(4) + parent = self.a_view.new_annotation( + AnnotationTypes.TimeFrame, aid='tf_dep', + targets=[tp.id for tp in tps]) + with pytest.warns(DeprecationWarning, match='extract_images_by_count_with_sources'): + vdh.extract_target_frames(self.mmif_obj, parent) + + with pytest.warns(DeprecationWarning, match='extract_images_by_mode'): + vdh.extract_frames_by_mode( + self.mmif_obj, parent, mode=vdh.SamplingMode.ALL) def test_pts_offset_regression(self): # regression for https://github.com/clamsproject/mmif-python/issues/379 @@ -314,7 +420,7 @@ def test_pts_offset_regression(self): # requested 33ms should resolve to the actual PTS-equivalent frame # (start_time is ~33ms; the first frame's PTS is nearest 33ms) - imgs = vdh.extract_timepoints_as_images(vd, [33], as_PIL=False) + imgs = vdh.extract_images_from_timepoints(vd, [33], as_PIL=False) self.assertEqual(1, len(imgs)) got_pts = pts_by_hash.get(hash(imgs[0].tobytes())) self.assertIsNotNone(got_pts)