From 33954da1e87b98cbfa437bc79ed5b9f174bca12c Mon Sep 17 00:00:00 2001 From: Keigh Rim Date: Sun, 26 Apr 2026 14:48:35 -0400 Subject: [PATCH 1/3] moving deprecated functions --- mmif/utils/video_document_helper.py | 302 +--------------------------- 1 file changed, 11 insertions(+), 291 deletions(-) diff --git a/mmif/utils/video_document_helper.py b/mmif/utils/video_document_helper.py index 4c153243..5e716c47 100644 --- a/mmif/utils/video_document_helper.py +++ b/mmif/utils/video_document_helper.py @@ -120,35 +120,6 @@ def open_container(video_document: Document): return container -def capture(video_document: Document): - """ - .. deprecated:: - Use :py:func:`open_container` instead. See issue #379. - - Captures a video file using OpenCV and adds fps, frame count, and duration as properties to the document. - - :param video_document: :py:class:`~mmif.serialize.annotation.Document` instance that holds a video document (``"@type": ".../VideoDocument/..."``) - :return: `OpenCV VideoCapture `_ object - """ - warnings.warn( - f'capture() is deprecated; use open_container() instead. ' - f'{_PTS_BUG_NOTICE}', - DeprecationWarning, stacklevel=2, - ) - cv2 = _check_cv_dep('cv2') - if video_document is None or video_document.at_type != DocumentTypes.VideoDocument: - raise ValueError(f'The document does not exist.') - - v = cv2.VideoCapture(video_document.location_path(nonexist_ok=False)) - fps = round(v.get(cv2.CAP_PROP_FPS), 2) - fc = v.get(cv2.CAP_PROP_FRAME_COUNT) - dur = round(fc / fps, 3) * 1000 - video_document.add_property(FPS_DOCPROP_KEY, fps) - video_document.add_property(FRAMECOUNT_DOCPROP_KEY, fc) - video_document.add_property(DURATION_DOCPROP_KEY, dur) - return v - - def get_framerate(video_document: Document) -> float: """ Gets the frame rate of a video document. First by checking the fps @@ -260,78 +231,6 @@ def _emit(frame, t_ms): return [result_map[t] for t in original_timepoints if t in result_map] -def extract_frames_as_images(video_document: Document, framenums: Iterable[int], as_PIL: bool = False, record_ffmpeg_errors: bool = False): - """ - .. deprecated:: - Use :py:func:`extract_timepoints_as_images` instead. See issue #379. - - Extracts frames from a video document as a list of :py:class:`numpy.ndarray`. - Use with :py:func:`sample_frames` function to get the list of frame numbers first. - - :param video_document: :py:class:`~mmif.serialize.annotation.Document` instance that holds a video document (``"@type": ".../VideoDocument/..."``) - :param framenums: iterable integers representing the frame numbers to extract - :param as_PIL: return :py:class:`PIL.Image.Image` instead of :py:class:`~numpy.ndarray` - :param record_ffmpeg_errors: if True, records and warns about FFmpeg stderr output during extraction - :return: frames as a list of :py:class:`~numpy.ndarray` or :py:class:`~PIL.Image.Image` - """ - warnings.warn( - f'extract_frames_as_images() is deprecated; use ' - f'extract_timepoints_as_images() instead. {_PTS_BUG_NOTICE}', - DeprecationWarning, stacklevel=2, - ) - cv2 = _check_cv_dep('cv2') - # deduplicate and sort frame numbers for extraction, then map back to original order - original_framenums = list(framenums) - unique_framenums = sorted(set(original_framenums)) - if as_PIL: - Image = _check_cv_dep('PIL.Image') - unique_frames = {} - video = capture(video_document) - cur_f = 0 - tot_fcount = video_document.get_property(FRAMECOUNT_DOCPROP_KEY) - # when the target frame is more than this frames away, fast-forward instead of reading frame by frame - # this is sanity-checked with a small number of video samples - # (frame-by-frame ndarrays are compared with fast-forwarded ndarrays) - skip_threadhold = 1000 - framenumi = iter(unique_framenums) - next_target_f = next(framenumi, None) - cpipes = _check_cv_dep('wurlitzer').pipes - ffmpeg_errs = StringIO() - with cpipes(stderr=ffmpeg_errs, stdout=sys.stdout): - while True: - if next_target_f is None or cur_f > tot_fcount or next_target_f > tot_fcount: - break - if next_target_f - cur_f > skip_threadhold: - while next_target_f - cur_f > skip_threadhold: - cur_f += skip_threadhold - else: - video.set(cv2.CAP_PROP_POS_FRAMES, cur_f) - ret, frame = video.read() - if cur_f == next_target_f: - if not ret: - sec = convert(cur_f, 'f', 's', video_document.get_property(FPS_DOCPROP_KEY)) - warnings.warn(f'Frame #{cur_f} ({sec}s) could not be read from the video {video_document.id} @ {video_document.location} .') - else: - unique_frames[cur_f] = Image.fromarray(frame[:, :, ::-1]) if as_PIL else frame - next_target_f = next(framenumi, None) - cur_f += 1 - ffmpeg_err_str = ffmpeg_errs.getvalue() - if ffmpeg_err_str and record_ffmpeg_errors: - warnings.warn(f'FFmpeg output during extracting frames: {ffmpeg_err_str}') - video.release() - # return frames in original input order, duplicating where needed - return [unique_frames[f] for f in original_framenums if f in unique_frames] - - -def get_mid_framenum(mmif: Mmif, time_frame: Annotation) -> int: - """ - .. deprecated:: - Use :py:func:`extract_frames_by_mode` instead. - """ - warnings.warn('This function is deprecated. Use ``extract_frames_by_mode()`` instead.', DeprecationWarning, stacklevel=2) - return _get_mid_framenum(mmif, time_frame) - - def _get_mid_framenum(mmif: Mmif, time_frame: Annotation) -> int: """ Calculates the middle frame number of a time interval annotation. @@ -346,88 +245,6 @@ def _get_mid_framenum(mmif: Mmif, time_frame: Annotation) -> int: return int(convert(time_frame.get_property('start') + time_frame.get_property('end'), timeunit, 'frame', fps) // 2) -def extract_mid_frame(mmif: Mmif, time_frame: Annotation, as_PIL: bool = False): - """ - .. deprecated:: - Use :py:func:`extract_frames_by_mode` instead. - - Extracts the middle frame of a time interval annotation as a numpy ndarray. - - :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance - :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance that holds a time interval annotation (``"@type": ".../TimeFrame/..."``) - :param as_PIL: return :py:class:`~PIL.Image.Image` instead of :py:class:`~numpy.ndarray` - :return: frame as a :py:class:`numpy.ndarray` or :py:class:`PIL.Image.Image` - """ - warnings.warn('This function is deprecated. Use ``extract_frames_by_mode()`` instead.', DeprecationWarning, stacklevel=2) - vd = mmif[time_frame.get_property('document')] - fn = get_mid_framenum(mmif, time_frame) - return extract_frames_as_images(vd, [fn], as_PIL=as_PIL)[0] - - -def get_representative_framenums(mmif: Mmif, time_frame: Annotation) -> List[int]: - """ - .. deprecated:: - Use :py:func:`extract_frames_by_mode` instead. - - Calculates the representative frame numbers from an annotation. To pick the representative frames, it first looks - up the ``representatives`` property of the ``TimeFrame`` annotation. If it is not found, it will calculate the - number of the middle frame. - - :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance - :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance that holds a time interval annotation containing a `representatives` property (``"@type": ".../TimeFrame/..."``) - :return: representative frame number as an integer - """ - warnings.warn('This function is deprecated. Use ``extract_frames_by_mode()`` instead.', DeprecationWarning, stacklevel=2) - if 'representatives' not in time_frame.properties: - return [_get_mid_framenum(mmif, time_frame)] - timeunit = time_frame.get_property('timeUnit') - video_document = mmif[time_frame.get_property('document')] - fps = get_framerate(video_document) - representatives = time_frame.get_property('representatives') - ref_frams = [] - for rep_id in representatives: - try: - rep_anno = mmif[rep_id] - except KeyError as ke: - raise ValueError(f'Representative timepoint {rep_id} not found in any view. ({ke})') - ref_frams.append(int(convert(rep_anno.get_property('timePoint'), timeunit, 'frame', fps))) - return ref_frams - - -def get_representative_framenum(mmif: Mmif, time_frame: Annotation) -> int: - """ - .. deprecated:: - Use :py:func:`extract_frames_by_mode` instead. - - A thin wrapper around :py:func:`get_representative_framenums` to return a single representative frame number. Always - return the first frame number found. - """ - warnings.warn('This function is deprecated. Use ``extract_frames_by_mode()`` instead.', DeprecationWarning, stacklevel=2) - try: - return get_representative_framenums(mmif, time_frame)[0] - except IndexError: - raise ValueError(f'No representative frame found in the TimeFrame annotation {time_frame.id}.') - - -def extract_representative_frame(mmif: Mmif, time_frame: Annotation, as_PIL: bool = False, first_only: bool = True): - """ - .. deprecated:: - Use :py:func:`extract_frames_by_mode` instead. - - Extracts the representative frame of an annotation as a numpy ndarray or PIL Image. - - :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance - :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance that holds a time interval annotation (``"@type": ".../TimeFrame/..."``) - :param as_PIL: return :py:class:`~PIL.Image.Image` instead of :py:class:`~numpy.ndarray` - :param first_only: return the first representative frame only - :return: frame as a :py:class:`numpy.ndarray` or :py:class:`PIL.Image.Image` - """ - warnings.warn('This function is deprecated. Use ``extract_frames_by_mode()`` instead.', DeprecationWarning, stacklevel=2) - video_document = mmif[time_frame.get_property('document')] - rep_frame_num = [get_representative_framenum(mmif, time_frame)] if first_only else get_representative_framenums(mmif, time_frame) - return extract_frames_as_images(video_document, rep_frame_num, as_PIL=as_PIL)[0] - - def _tp_ids_to_timepoints_ms(mmif: Mmif, tp_ids: List[str]) -> List[int]: """ Converts a list of timepoint annotation IDs to media-timeline timepoints in milliseconds. @@ -489,8 +306,8 @@ def _timeframe_to_timepoint_range_ms( def _sample_all_timepoints_ms(mmif: Mmif, time_frame: Annotation) -> List[int]: """ Samples all timepoints (ms) from a TimeFrame. Uses all ``targets`` if - present, otherwise samples the start/end interval at the stream's - average frame rate. + present, otherwise samples the start/end interval of the time_frameat + the stream's average frame rate, resulting one sample per second. :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` @@ -551,7 +368,14 @@ def _sample_single_timepoint_ms( return [(start_ms + end_ms) // 2] -def extract_target_frames(mmif: Mmif, annotation: Annotation, min_timepoints: int = 0, max_timepoints: int = sys.maxsize, fraction: float = 1.0, as_PIL: bool = False): +def extract_target_frames( + mmif: Mmif, + annotation: Annotation, + min_timepoints: int = 0, + max_timepoints: int = sys.maxsize, + fraction: float = 1.0, + as_PIL: bool = False + ) -> Tuple[List, List[str]]: """ Extracts frames corresponding to the timepoints listed in the ``targets`` property of an annotation. Selection of timepoints is based on minimum, maximum, and fraction of targets to include. @@ -655,55 +479,6 @@ def sample_timepoints( return timepoints -def sample_frames(start_frame: int, end_frame: int, sample_rate: float = 1) -> List[int]: - """ - .. deprecated:: - Use :py:func:`sample_timepoints` instead. See issue #379. - - Helper function to sample frames from a time interval. - Can also be used as a "cutoff" function when used with ``start_frame==0`` and ``sample_rate==1``. - - :param start_frame: start frame of the interval - :param end_frame: end frame of the interval - :param sample_rate: sampling rate (or step) to configure how often to take a frame, default is 1, meaning all consecutive frames are sampled - :return: list of frame numbers to extract - """ - warnings.warn( - f'sample_frames() is deprecated; use sample_timepoints() instead. ' - f'{_PTS_BUG_NOTICE}', - DeprecationWarning, stacklevel=2, - ) - if sample_rate < 1: - raise ValueError(f"Sample rate must be greater than 1, but got {sample_rate}") - frame_nums: List[int] = [] - cur_f = start_frame - while cur_f < end_frame: - ceiling = math.ceil(cur_f) - if ceiling < end_frame: - frame_nums.append(math.ceil(cur_f)) - cur_f += sample_rate - return frame_nums - - -def get_annotation_property(mmif, annotation, prop_name): - """ - .. deprecated:: 1.0.8 - Will be removed in 2.0.0. - Use :py:meth:`mmif.serialize.annotation.Annotation.get_property` method instead. - - Get a property value from an annotation. If the property is not found in the annotation, it will look up the metadata of the annotation's parent view and return the value from there. - - :param mmif: MMIF object containing the annotation - :param annotation: Annotation object to get property from - :param prop_name: name of the property to retrieve - :return: the property value - """ - warnings.warn(f'{__name__}() is deprecated. ' - f'Directly ask the annotation for a property by calling annotation.get_property() instead.', - DeprecationWarning) - return annotation.get_property(prop_name) - - def convert_timepoint(mmif: Mmif, timepoint: Annotation, out_unit: str) -> Union[int, float, str]: """ Converts a time point included in an annotation to a different time unit. @@ -732,59 +507,4 @@ def convert_timeframe(mmif: Mmif, time_frame: Annotation, out_unit: str) -> Tupl vd = mmif[time_frame.get_property('document')] fps = get_framerate(vd) return convert(time_frame.get_property('start'), in_unit, out_unit, fps), convert(time_frame.get_property('end'), in_unit, out_unit, fps) - - -def framenum_to_second(video_doc: Document, frame: int): - """ - .. deprecated:: - Use :py:func:`~mmif.utils.timeunit_helper.convert` with ``ms``/``s`` - directly. See issue #379. - """ - warnings.warn( - f'framenum_to_second() is deprecated. {_PTS_BUG_NOTICE}', - DeprecationWarning, stacklevel=2, - ) - fps = get_framerate(video_doc) - return convert(frame, 'f', 's', fps) - - -def framenum_to_millisecond(video_doc: Document, frame: int): - """ - .. deprecated:: - Use :py:func:`~mmif.utils.timeunit_helper.convert` with ``ms``/``s`` - directly. See issue #379. - """ - warnings.warn( - f'framenum_to_millisecond() is deprecated. {_PTS_BUG_NOTICE}', - DeprecationWarning, stacklevel=2, - ) - fps = get_framerate(video_doc) - return convert(frame, 'f', 'ms', fps) - - -def second_to_framenum(video_doc: Document, second) -> int: - """ - .. deprecated:: - Use :py:func:`extract_timepoints_as_images` or stay in the time - domain. See issue #379. - """ - warnings.warn( - f'second_to_framenum() is deprecated. {_PTS_BUG_NOTICE}', - DeprecationWarning, stacklevel=2, - ) - fps = get_framerate(video_doc) - return int(convert(second, 's', 'f', fps)) - - -def millisecond_to_framenum(video_doc: Document, millisecond: float) -> int: - """ - .. deprecated:: - Use :py:func:`extract_timepoints_as_images` or stay in the time - domain. See issue #379. - """ - warnings.warn( - f'millisecond_to_framenum() is deprecated. {_PTS_BUG_NOTICE}', - DeprecationWarning, stacklevel=2, - ) - fps = get_framerate(video_doc) - return int(convert(millisecond, 'ms', 'f', fps)) +# \ No newline at end of file From 747bcee2c53c99c0cdad42bf060c21012ad58cbf Mon Sep 17 00:00:00 2001 From: Keigh Rim Date: Sun, 26 Apr 2026 14:52:10 -0400 Subject: [PATCH 2/3] added deprecated functions back in VDH at the end of the file for readability --- mmif/utils/video_document_helper.py | 289 +++++++++++++++++++++++++++- 1 file changed, 288 insertions(+), 1 deletion(-) diff --git a/mmif/utils/video_document_helper.py b/mmif/utils/video_document_helper.py index 5e716c47..59e50368 100644 --- a/mmif/utils/video_document_helper.py +++ b/mmif/utils/video_document_helper.py @@ -507,4 +507,291 @@ def convert_timeframe(mmif: Mmif, time_frame: Annotation, out_unit: str) -> Tupl vd = mmif[time_frame.get_property('document')] fps = get_framerate(vd) return convert(time_frame.get_property('start'), in_unit, out_unit, fps), convert(time_frame.get_property('end'), in_unit, out_unit, fps) -# \ No newline at end of file + + +def capture(video_document: Document): + """ + .. deprecated:: + Use :py:func:`open_container` instead. See issue #379. + + Captures a video file using OpenCV and adds fps, frame count, and duration as properties to the document. + + :param video_document: :py:class:`~mmif.serialize.annotation.Document` instance that holds a video document (``"@type": ".../VideoDocument/..."``) + :return: `OpenCV VideoCapture `_ object + """ + warnings.warn( + f'capture() is deprecated; use open_container() instead. ' + f'{_PTS_BUG_NOTICE}', + DeprecationWarning, stacklevel=2, + ) + cv2 = _check_cv_dep('cv2') + if video_document is None or video_document.at_type != DocumentTypes.VideoDocument: + raise ValueError(f'The document does not exist.') + + v = cv2.VideoCapture(video_document.location_path(nonexist_ok=False)) + fps = round(v.get(cv2.CAP_PROP_FPS), 2) + fc = v.get(cv2.CAP_PROP_FRAME_COUNT) + dur = round(fc / fps, 3) * 1000 + video_document.add_property(FPS_DOCPROP_KEY, fps) + video_document.add_property(FRAMECOUNT_DOCPROP_KEY, fc) + video_document.add_property(DURATION_DOCPROP_KEY, dur) + return v + + +def extract_frames_as_images(video_document: Document, framenums: Iterable[int], as_PIL: bool = False, record_ffmpeg_errors: bool = False): + """ + .. deprecated:: + Use :py:func:`extract_timepoints_as_images` instead. See issue #379. + + Extracts frames from a video document as a list of :py:class:`numpy.ndarray`. + Use with :py:func:`sample_frames` function to get the list of frame numbers first. + + :param video_document: :py:class:`~mmif.serialize.annotation.Document` instance that holds a video document (``"@type": ".../VideoDocument/..."``) + :param framenums: iterable integers representing the frame numbers to extract + :param as_PIL: return :py:class:`PIL.Image.Image` instead of :py:class:`~numpy.ndarray` + :param record_ffmpeg_errors: if True, records and warns about FFmpeg stderr output during extraction + :return: frames as a list of :py:class:`~numpy.ndarray` or :py:class:`~PIL.Image.Image` + """ + warnings.warn( + f'extract_frames_as_images() is deprecated; use ' + f'extract_timepoints_as_images() instead. {_PTS_BUG_NOTICE}', + DeprecationWarning, stacklevel=2, + ) + cv2 = _check_cv_dep('cv2') + # deduplicate and sort frame numbers for extraction, then map back to original order + original_framenums = list(framenums) + unique_framenums = sorted(set(original_framenums)) + if as_PIL: + Image = _check_cv_dep('PIL.Image') + unique_frames = {} + video = capture(video_document) + cur_f = 0 + tot_fcount = video_document.get_property(FRAMECOUNT_DOCPROP_KEY) + # when the target frame is more than this frames away, fast-forward instead of reading frame by frame + # this is sanity-checked with a small number of video samples + # (frame-by-frame ndarrays are compared with fast-forwarded ndarrays) + skip_threadhold = 1000 + framenumi = iter(unique_framenums) + next_target_f = next(framenumi, None) + cpipes = _check_cv_dep('wurlitzer').pipes + ffmpeg_errs = StringIO() + with cpipes(stderr=ffmpeg_errs, stdout=sys.stdout): + while True: + if next_target_f is None or cur_f > tot_fcount or next_target_f > tot_fcount: + break + if next_target_f - cur_f > skip_threadhold: + while next_target_f - cur_f > skip_threadhold: + cur_f += skip_threadhold + else: + video.set(cv2.CAP_PROP_POS_FRAMES, cur_f) + ret, frame = video.read() + if cur_f == next_target_f: + if not ret: + sec = convert(cur_f, 'f', 's', video_document.get_property(FPS_DOCPROP_KEY)) + warnings.warn(f'Frame #{cur_f} ({sec}s) could not be read from the video {video_document.id} @ {video_document.location} .') + else: + unique_frames[cur_f] = Image.fromarray(frame[:, :, ::-1]) if as_PIL else frame + next_target_f = next(framenumi, None) + cur_f += 1 + ffmpeg_err_str = ffmpeg_errs.getvalue() + if ffmpeg_err_str and record_ffmpeg_errors: + warnings.warn(f'FFmpeg output during extracting frames: {ffmpeg_err_str}') + video.release() + # return frames in original input order, duplicating where needed + return [unique_frames[f] for f in original_framenums if f in unique_frames] + + +def get_mid_framenum(mmif: Mmif, time_frame: Annotation) -> int: + """ + .. deprecated:: + Use :py:func:`extract_frames_by_mode` instead. + """ + warnings.warn('This function is deprecated. Use ``extract_frames_by_mode()`` instead.', DeprecationWarning, stacklevel=2) + return _get_mid_framenum(mmif, time_frame) + + +def extract_mid_frame(mmif: Mmif, time_frame: Annotation, as_PIL: bool = False): + """ + .. deprecated:: + Use :py:func:`extract_frames_by_mode` instead. + + Extracts the middle frame of a time interval annotation as a numpy ndarray. + + :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance + :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance that holds a time interval annotation (``"@type": ".../TimeFrame/..."``) + :param as_PIL: return :py:class:`~PIL.Image.Image` instead of :py:class:`~numpy.ndarray` + :return: frame as a :py:class:`numpy.ndarray` or :py:class:`PIL.Image.Image` + """ + warnings.warn('This function is deprecated. Use ``extract_frames_by_mode()`` instead.', DeprecationWarning, stacklevel=2) + vd = mmif[time_frame.get_property('document')] + fn = get_mid_framenum(mmif, time_frame) + return extract_frames_as_images(vd, [fn], as_PIL=as_PIL)[0] + + +def get_representative_framenums(mmif: Mmif, time_frame: Annotation) -> List[int]: + """ + .. deprecated:: + Use :py:func:`extract_frames_by_mode` instead. + + Calculates the representative frame numbers from an annotation. To pick the representative frames, it first looks + up the ``representatives`` property of the ``TimeFrame`` annotation. If it is not found, it will calculate the + number of the middle frame. + + :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance + :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance that holds a time interval annotation containing a `representatives` property (``"@type": ".../TimeFrame/..."``) + :return: representative frame number as an integer + """ + warnings.warn('This function is deprecated. Use ``extract_frames_by_mode()`` instead.', DeprecationWarning, stacklevel=2) + if 'representatives' not in time_frame.properties: + return [_get_mid_framenum(mmif, time_frame)] + timeunit = time_frame.get_property('timeUnit') + video_document = mmif[time_frame.get_property('document')] + fps = get_framerate(video_document) + representatives = time_frame.get_property('representatives') + ref_frams = [] + for rep_id in representatives: + try: + rep_anno = mmif[rep_id] + except KeyError as ke: + raise ValueError(f'Representative timepoint {rep_id} not found in any view. ({ke})') + ref_frams.append(int(convert(rep_anno.get_property('timePoint'), timeunit, 'frame', fps))) + return ref_frams + + +def get_representative_framenum(mmif: Mmif, time_frame: Annotation) -> int: + """ + .. deprecated:: + Use :py:func:`extract_frames_by_mode` instead. + + A thin wrapper around :py:func:`get_representative_framenums` to return a single representative frame number. Always + return the first frame number found. + """ + warnings.warn('This function is deprecated. Use ``extract_frames_by_mode()`` instead.', DeprecationWarning, stacklevel=2) + try: + return get_representative_framenums(mmif, time_frame)[0] + except IndexError: + raise ValueError(f'No representative frame found in the TimeFrame annotation {time_frame.id}.') + + +def extract_representative_frame(mmif: Mmif, time_frame: Annotation, as_PIL: bool = False, first_only: bool = True): + """ + .. deprecated:: + Use :py:func:`extract_frames_by_mode` instead. + + Extracts the representative frame of an annotation as a numpy ndarray or PIL Image. + + :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance + :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance that holds a time interval annotation (``"@type": ".../TimeFrame/..."``) + :param as_PIL: return :py:class:`~PIL.Image.Image` instead of :py:class:`~numpy.ndarray` + :param first_only: return the first representative frame only + :return: frame as a :py:class:`numpy.ndarray` or :py:class:`PIL.Image.Image` + """ + warnings.warn('This function is deprecated. Use ``extract_frames_by_mode()`` instead.', DeprecationWarning, stacklevel=2) + video_document = mmif[time_frame.get_property('document')] + rep_frame_num = [get_representative_framenum(mmif, time_frame)] if first_only else get_representative_framenums(mmif, time_frame) + return extract_frames_as_images(video_document, rep_frame_num, as_PIL=as_PIL)[0] + + +def sample_frames(start_frame: int, end_frame: int, sample_rate: float = 1) -> List[int]: + """ + .. deprecated:: + Use :py:func:`sample_timepoints` instead. See issue #379. + + Helper function to sample frames from a time interval. + Can also be used as a "cutoff" function when used with ``start_frame==0`` and ``sample_rate==1``. + + :param start_frame: start frame of the interval + :param end_frame: end frame of the interval + :param sample_rate: sampling rate (or step) to configure how often to take a frame, default is 1, meaning all consecutive frames are sampled + :return: list of frame numbers to extract + """ + warnings.warn( + f'sample_frames() is deprecated; use sample_timepoints() instead. ' + f'{_PTS_BUG_NOTICE}', + DeprecationWarning, stacklevel=2, + ) + if sample_rate < 1: + raise ValueError(f"Sample rate must be greater than 1, but got {sample_rate}") + frame_nums: List[int] = [] + cur_f = start_frame + while cur_f < end_frame: + ceiling = math.ceil(cur_f) + if ceiling < end_frame: + frame_nums.append(math.ceil(cur_f)) + cur_f += sample_rate + return frame_nums + + +def get_annotation_property(mmif, annotation, prop_name): + """ + .. deprecated:: 1.0.8 + Will be removed in 2.0.0. + Use :py:meth:`mmif.serialize.annotation.Annotation.get_property` method instead. + + Get a property value from an annotation. If the property is not found in the annotation, it will look up the metadata of the annotation's parent view and return the value from there. + + :param mmif: MMIF object containing the annotation + :param annotation: Annotation object to get property from + :param prop_name: name of the property to retrieve + :return: the property value + """ + warnings.warn(f'{__name__}() is deprecated. ' + f'Directly ask the annotation for a property by calling annotation.get_property() instead.', + DeprecationWarning) + return annotation.get_property(prop_name) + + +def framenum_to_second(video_doc: Document, frame: int): + """ + .. deprecated:: + Use :py:func:`~mmif.utils.timeunit_helper.convert` with ``ms``/``s`` + directly. See issue #379. + """ + warnings.warn( + f'framenum_to_second() is deprecated. {_PTS_BUG_NOTICE}', + DeprecationWarning, stacklevel=2, + ) + fps = get_framerate(video_doc) + return convert(frame, 'f', 's', fps) + + +def framenum_to_millisecond(video_doc: Document, frame: int): + """ + .. deprecated:: + Use :py:func:`~mmif.utils.timeunit_helper.convert` with ``ms``/``s`` + directly. See issue #379. + """ + warnings.warn( + f'framenum_to_millisecond() is deprecated. {_PTS_BUG_NOTICE}', + DeprecationWarning, stacklevel=2, + ) + fps = get_framerate(video_doc) + return convert(frame, 'f', 'ms', fps) + + +def second_to_framenum(video_doc: Document, second) -> int: + """ + .. deprecated:: + Use :py:func:`extract_timepoints_as_images` or stay in the time + domain. See issue #379. + """ + warnings.warn( + f'second_to_framenum() is deprecated. {_PTS_BUG_NOTICE}', + DeprecationWarning, stacklevel=2, + ) + fps = get_framerate(video_doc) + return int(convert(second, 's', 'f', fps)) + + +def millisecond_to_framenum(video_doc: Document, millisecond: float) -> int: + """ + .. deprecated:: + Use :py:func:`extract_timepoints_as_images` or stay in the time + domain. See issue #379. + """ + warnings.warn( + f'millisecond_to_framenum() is deprecated. {_PTS_BUG_NOTICE}', + DeprecationWarning, stacklevel=2, + ) + fps = get_framerate(video_doc) + return int(convert(millisecond, 'ms', 'f', fps)) From 23f8d5949bb4ba1f6cfa36d2eb9767c717ba52be Mon Sep 17 00:00:00 2001 From: Keigh Rim Date: Sun, 26 Apr 2026 18:41:00 -0400 Subject: [PATCH 3/3] VDH: added image extraction with source TP information, renaming for API consistency --- mmif/utils/video_document_helper.py | 300 ++++++++++++++++++++-------- tests/test_utils.py | 154 +++++++++++--- 2 files changed, 351 insertions(+), 103 deletions(-) diff --git a/mmif/utils/video_document_helper.py b/mmif/utils/video_document_helper.py index 59e50368..87c1823b 100644 --- a/mmif/utils/video_document_helper.py +++ b/mmif/utils/video_document_helper.py @@ -7,7 +7,7 @@ import warnings from io import StringIO from typing import Iterable # todo: replace with collections.abc.Iterable in Python 3.9 -from typing import List, Union, Tuple +from typing import List, Optional, Union, Tuple import mmif from mmif import Annotation, Document, Mmif @@ -57,13 +57,13 @@ class SamplingMode(Enum): ), SamplingMode.SINGLE: ( "uses the middle representative if present, otherwise " - "extracts a frame from the midpoint of the start/end " + "extracts an image from the midpoint of the start/end " "interval (midpoint is calculated by floor division " "of the sum of start and end)." ), SamplingMode.ALL: ( "uses all target timepoints if present, otherwise " - "extracts all frames from the time interval." + "extracts all images from the time interval." ), } SAMPLING_MODE_DEFAULT = SamplingMode.REPRESENTATIVES @@ -145,24 +145,24 @@ def get_framerate(video_document: Document) -> float: container.close() -def extract_timepoints_as_images( +def extract_images_from_timepoints( video_document: Document, timepoints_ms: Iterable[int], as_PIL: bool = False, ): """ - Extracts frames at the given media-timeline timepoints (in milliseconds). + Extracts images at the given media-timeline timepoints (in milliseconds). - For each requested timepoint, returns the frame whose actual + For each requested timepoint, returns the image whose actual presentation timestamp (PTS) is closest to it. Duplicate timepoints - produce duplicate frames at the same list positions as the input. + produce duplicate images at the same list positions as the input. :param video_document: :py:class:`~mmif.serialize.annotation.Document` holding a video document (``"@type": ".../VideoDocument/..."``) :param timepoints_ms: iterable of timepoint values in milliseconds :param as_PIL: return :py:class:`PIL.Image.Image` (RGB) instead of :py:class:`~numpy.ndarray` (BGR) - :returns: frames in the same order (and with the same multiplicity) as + :returns: images in the same order (and with the same multiplicity) as ``timepoints_ms`` :rtype: list """ @@ -231,20 +231,6 @@ def _emit(frame, t_ms): return [result_map[t] for t in original_timepoints if t in result_map] -def _get_mid_framenum(mmif: Mmif, time_frame: Annotation) -> int: - """ - Calculates the middle frame number of a time interval annotation. - - :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance - :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance that holds a time interval annotation (``"@type": ".../TimeFrame/..."``) - :return: middle frame number as an integer - """ - timeunit = time_frame.get_property('timeUnit') - video_document = mmif[time_frame.get_property('document')] - fps = get_framerate(video_document) - return int(convert(time_frame.get_property('start') + time_frame.get_property('end'), timeunit, 'frame', fps) // 2) - - def _tp_ids_to_timepoints_ms(mmif: Mmif, tp_ids: List[str]) -> List[int]: """ Converts a list of timepoint annotation IDs to media-timeline timepoints in milliseconds. @@ -303,93 +289,114 @@ def _timeframe_to_timepoint_range_ms( return int(round(start)), int(round(end)) -def _sample_all_timepoints_ms(mmif: Mmif, time_frame: Annotation) -> List[int]: +def _sample_all_timepoint_pairs_ms( + mmif: Mmif, time_frame: Annotation +) -> List[Tuple[int, Optional[str]]]: """ - Samples all timepoints (ms) from a TimeFrame. Uses all ``targets`` if - present, otherwise samples the start/end interval of the time_frameat - the stream's average frame rate, resulting one sample per second. + Samples all (timepoint_ms, source_id) pairs from a TimeFrame. Uses all + ``targets`` if present (source is the TP annotation id), otherwise + samples the start/end interval at the stream's average frame rate + (source is None for each sampled point). :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance of a TimeFrame - :return: list of timepoint values in ms + :return: list of (ms, source_id) pairs; source_id is ``None`` when + the point came from interval sampling rather than a TP :rtype: list """ if 'targets' in time_frame.properties: - return _tp_ids_to_timepoints_ms( - mmif, time_frame.get_property('targets')) + target_ids = time_frame.get_property('targets') + ms_list = _tp_ids_to_timepoints_ms(mmif, target_ids) + return list(zip(ms_list, target_ids)) start_ms, end_ms = _timeframe_to_timepoint_range_ms(mmif, time_frame) video_document = _resolve_video_document(mmif, time_frame) fps = get_framerate(video_document) step_ms = 1000.0 / fps - return sample_timepoints(start_ms, end_ms, step_ms) + return [(t, None) for t in sample_timepoints(start_ms, end_ms, step_ms)] -def _sample_representatives_timepoints_ms( +def _sample_representatives_timepoint_pairs_ms( mmif: Mmif, time_frame: Annotation -) -> List[int]: +) -> List[Tuple[int, str]]: """ - Samples timepoints (ms) from a TimeFrame's representatives. Returns an - empty list if ``representatives`` is not present (skips the TimeFrame). + Samples (timepoint_ms, source_id) pairs from a TimeFrame's + representatives. Returns an empty list if ``representatives`` is not + present (skips the TimeFrame). Source is always the rep TP id. :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance of a TimeFrame - :return: list of timepoint values in ms (empty if no representatives) + :return: list of (ms, rep_id) pairs (empty if no representatives) :rtype: list """ if 'representatives' in time_frame.properties: reps = time_frame.get_property('representatives') if reps: - return _tp_ids_to_timepoints_ms(mmif, reps) + ms_list = _tp_ids_to_timepoints_ms(mmif, reps) + return list(zip(ms_list, reps)) return [] -def _sample_single_timepoint_ms( +def _sample_single_timepoint_pair_ms( mmif: Mmif, time_frame: Annotation -) -> List[int]: +) -> List[Tuple[int, Optional[str]]]: """ - Samples a single timepoint (ms) from a TimeFrame. Uses the middle - representative if ``representatives`` is present, otherwise the - midpoint of the start/end interval. + Samples a single (timepoint_ms, source_id) pair from a TimeFrame. + Uses the middle representative if ``representatives`` is present + (source is the rep TP id), otherwise falls back to the midpoint of + the start/end interval (source is None). :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance of a TimeFrame - :return: list containing a single timepoint value in ms + :return: single-element list ``[(ms, source_id)]``; source_id is + ``None`` when the point came from the interval midpoint fallback :rtype: list """ if 'representatives' in time_frame.properties: reps = time_frame.get_property('representatives') if reps: mid = reps[len(reps) // 2] - return _tp_ids_to_timepoints_ms(mmif, [mid]) + ms_list = _tp_ids_to_timepoints_ms(mmif, [mid]) + return [(ms_list[0], mid)] start_ms, end_ms = _timeframe_to_timepoint_range_ms(mmif, time_frame) - return [(start_ms + end_ms) // 2] + return [((start_ms + end_ms) // 2, None)] -def extract_target_frames( - mmif: Mmif, - annotation: Annotation, - min_timepoints: int = 0, - max_timepoints: int = sys.maxsize, - fraction: float = 1.0, +def extract_images_by_count_with_sources( + mmif: Mmif, + annotation: Annotation, + min_timepoints: int = 0, + max_timepoints: int = sys.maxsize, + fraction: float = 1.0, as_PIL: bool = False ) -> Tuple[List, List[str]]: """ - Extracts frames corresponding to the timepoints listed in the ``targets`` property of an annotation. - Selection of timepoints is based on minimum, maximum, and fraction of targets to include. + Extracts images at a count-controlled subset of the timepoints listed + in the ``targets`` property of an annotation, alongside the IDs of the + selected target TPs. + + The number of timepoints chosen is ``max(min_timepoints, + int(num_targets * fraction))``, clamped to ``max_timepoints`` and to + the number of available targets. The chosen indices are spread evenly + across the target list. :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance - :param annotation: :py:class:`~mmif.serialize.annotation.Annotation` instance containing a ``targets`` property + :param annotation: :py:class:`~mmif.serialize.annotation.Annotation` + instance containing a ``targets`` property :param min_timepoints: minimum number of timepoints to include :param max_timepoints: maximum number of timepoints to include :param fraction: fraction of targets to include (ideally) - :param as_PIL: return :py:class:`~PIL.Image.Image` instead of :py:class:`~numpy.ndarray` - :return: a tuple containing (list of frames, list of selected target IDs) + :param as_PIL: return :py:class:`~PIL.Image.Image` instead of + :py:class:`~numpy.ndarray` + :return: tuple of (list of images, list of selected target TP IDs); + the two lists are parallel + :rtype: tuple """ if 'targets' not in annotation.properties: - raise ValueError(f'Annotation {annotation.id} does not have a "targets" property.') + raise ValueError( + f'Annotation {annotation.id} does not have a "targets" property.') targets = annotation.get_property('targets') num_targets = len(targets) @@ -404,47 +411,126 @@ def extract_target_frames( if count == 1: indices = [num_targets // 2] else: - indices = [int(i * (num_targets - 1) / (count - 1)) for i in range(count)] + indices = [int(i * (num_targets - 1) / (count - 1)) + for i in range(count)] selected_target_ids = [targets[i] for i in indices] timepoints_ms = _tp_ids_to_timepoints_ms(mmif, selected_target_ids) video_doc = _resolve_video_document(mmif, annotation) - images = extract_timepoints_as_images(video_doc, timepoints_ms, as_PIL=as_PIL) + images = extract_images_from_timepoints( + video_doc, timepoints_ms, as_PIL=as_PIL) return images, selected_target_ids -def extract_frames_by_mode( +def extract_images_by_count( + mmif: Mmif, + annotation: Annotation, + min_timepoints: int = 0, + max_timepoints: int = sys.maxsize, + fraction: float = 1.0, + as_PIL: bool = False + ) -> List: + """ + Extracts images at a count-controlled subset of the timepoints listed + in the ``targets`` property of an annotation. See + :py:func:`extract_images_by_count_with_sources` for selection details + and for a variant that also returns the IDs of the selected target TPs. + + :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance + :param annotation: :py:class:`~mmif.serialize.annotation.Annotation` + instance containing a ``targets`` property + :param min_timepoints: minimum number of timepoints to include + :param max_timepoints: maximum number of timepoints to include + :param fraction: fraction of targets to include (ideally) + :param as_PIL: return :py:class:`~PIL.Image.Image` instead of + :py:class:`~numpy.ndarray` + :return: list of images + :rtype: list + """ + images, _ = extract_images_by_count_with_sources( + mmif, annotation, + min_timepoints=min_timepoints, + max_timepoints=max_timepoints, + fraction=fraction, + as_PIL=as_PIL, + ) + return images + + +def extract_images_by_mode_with_sources( mmif: Mmif, time_frame: Annotation, mode: Union[SamplingMode, None] = None, - as_PIL: bool = False -) -> List: + as_PIL: bool = False, +) -> Tuple[List, List[Union[str, int]]]: """ - Extracts frames from a TimeFrame annotation based on a - sampling mode. If ``mode`` is not specified, uses the - context-level default (set via + Extracts images from a TimeFrame using a :py:class:`SamplingMode`, + alongside the per-image source: a TP annotation id (``str``) when the + image was selected from a TP (a representative or a target), or the + sampled timepoint in milliseconds (``int``) when a fallback path was + used (SINGLE with no representatives, or ALL with no targets). + + If ``mode`` is not specified, uses the context-level default (set via :py:data:`_sampling_mode` context variable). :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance :param time_frame: TimeFrame annotation to sample from - :param mode: :py:class:`SamplingMode`, or None to use - the context default - :param as_PIL: return PIL Images instead of ndarrays - :return: list of frames (may be empty for - ``REPRESENTATIVES`` mode when no representatives exist) + :param mode: :py:class:`SamplingMode`, or None to use the context + default + :param as_PIL: return :py:class:`PIL.Image.Image` instead of + :py:class:`~numpy.ndarray` + :return: tuple of (list of images, list of sources); the two lists + are parallel. May be ``([], [])`` for ``REPRESENTATIVES`` mode + when no representatives exist. + :rtype: tuple """ if mode is None: mode = _sampling_mode.get() if mode == SamplingMode.ALL: - timepoints_ms = _sample_all_timepoints_ms(mmif, time_frame) + pairs = _sample_all_timepoint_pairs_ms(mmif, time_frame) elif mode == SamplingMode.REPRESENTATIVES: - timepoints_ms = _sample_representatives_timepoints_ms(mmif, time_frame) + pairs = _sample_representatives_timepoint_pairs_ms(mmif, time_frame) else: - timepoints_ms = _sample_single_timepoint_ms(mmif, time_frame) - if not timepoints_ms: - return [] + pairs = _sample_single_timepoint_pair_ms(mmif, time_frame) + if not pairs: + return [], [] + timepoints_ms = [ms for ms, _ in pairs] + sources: List[Union[str, int]] = [ + tp_id if tp_id is not None else ms for ms, tp_id in pairs + ] video_doc = _resolve_video_document(mmif, time_frame) - return extract_timepoints_as_images(video_doc, timepoints_ms, as_PIL=as_PIL) + images = extract_images_from_timepoints( + video_doc, timepoints_ms, as_PIL=as_PIL) + return images, sources + + +def extract_images_by_mode( + mmif: Mmif, + time_frame: Annotation, + mode: Union[SamplingMode, None] = None, + as_PIL: bool = False, +) -> List: + """ + Extracts images from a TimeFrame using a :py:class:`SamplingMode`. + See :py:func:`extract_images_by_mode_with_sources` for the variant + that also returns the per-image source IDs / timepoints. + + If ``mode`` is not specified, uses the context-level default (set via + :py:data:`_sampling_mode` context variable). + + :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance + :param time_frame: TimeFrame annotation to sample from + :param mode: :py:class:`SamplingMode`, or None to use the context + default + :param as_PIL: return :py:class:`PIL.Image.Image` instead of + :py:class:`~numpy.ndarray` + :return: list of images (may be empty for ``REPRESENTATIVES`` mode + when no representatives exist) + :rtype: list + """ + images, _ = extract_images_by_mode_with_sources( + mmif, time_frame, mode=mode, as_PIL=as_PIL) + return images def sample_timepoints( @@ -541,7 +627,7 @@ def capture(video_document: Document): def extract_frames_as_images(video_document: Document, framenums: Iterable[int], as_PIL: bool = False, record_ffmpeg_errors: bool = False): """ .. deprecated:: - Use :py:func:`extract_timepoints_as_images` instead. See issue #379. + Use :py:func:`extract_images_from_timepoints` instead. See issue #379. Extracts frames from a video document as a list of :py:class:`numpy.ndarray`. Use with :py:func:`sample_frames` function to get the list of frame numbers first. @@ -554,7 +640,7 @@ def extract_frames_as_images(video_document: Document, framenums: Iterable[int], """ warnings.warn( f'extract_frames_as_images() is deprecated; use ' - f'extract_timepoints_as_images() instead. {_PTS_BUG_NOTICE}', + f'extract_images_from_timepoints() instead. {_PTS_BUG_NOTICE}', DeprecationWarning, stacklevel=2, ) cv2 = _check_cv_dep('cv2') @@ -601,6 +687,21 @@ def extract_frames_as_images(video_document: Document, framenums: Iterable[int], return [unique_frames[f] for f in original_framenums if f in unique_frames] +def _get_mid_framenum(mmif: Mmif, time_frame: Annotation) -> int: + """ + Calculates the middle frame number of a time interval annotation. + Used internally by deprecated helpers below. + + :param mmif: :py:class:`~mmif.serialize.mmif.Mmif` instance + :param time_frame: :py:class:`~mmif.serialize.annotation.Annotation` instance that holds a time interval annotation (``"@type": ".../TimeFrame/..."``) + :return: middle frame number as an integer + """ + timeunit = time_frame.get_property('timeUnit') + video_document = mmif[time_frame.get_property('document')] + fps = get_framerate(video_document) + return int(convert(time_frame.get_property('start') + time_frame.get_property('end'), timeunit, 'frame', fps) // 2) + + def get_mid_framenum(mmif: Mmif, time_frame: Annotation) -> int: """ .. deprecated:: @@ -772,7 +873,7 @@ def framenum_to_millisecond(video_doc: Document, frame: int): def second_to_framenum(video_doc: Document, second) -> int: """ .. deprecated:: - Use :py:func:`extract_timepoints_as_images` or stay in the time + Use :py:func:`extract_images_from_timepoints` or stay in the time domain. See issue #379. """ warnings.warn( @@ -786,7 +887,7 @@ def second_to_framenum(video_doc: Document, second) -> int: def millisecond_to_framenum(video_doc: Document, millisecond: float) -> int: """ .. deprecated:: - Use :py:func:`extract_timepoints_as_images` or stay in the time + Use :py:func:`extract_images_from_timepoints` or stay in the time domain. See issue #379. """ warnings.warn( @@ -795,3 +896,44 @@ def millisecond_to_framenum(video_doc: Document, millisecond: float) -> int: ) fps = get_framerate(video_doc) return int(convert(millisecond, 'ms', 'f', fps)) + + +def extract_timepoints_as_images(*args, **kwargs): + """ + .. deprecated:: + Renamed to :py:func:`extract_images_from_timepoints`. + """ + warnings.warn( + 'extract_timepoints_as_images() is deprecated; ' + 'use extract_images_from_timepoints() instead.', + DeprecationWarning, stacklevel=2, + ) + return extract_images_from_timepoints(*args, **kwargs) + + +def extract_target_frames(*args, **kwargs): + """ + .. deprecated:: + Renamed to :py:func:`extract_images_by_count_with_sources`. + For a bare-images variant, use :py:func:`extract_images_by_count`. + """ + warnings.warn( + 'extract_target_frames() is deprecated; ' + 'use extract_images_by_count_with_sources() instead.', + DeprecationWarning, stacklevel=2, + ) + return extract_images_by_count_with_sources(*args, **kwargs) + + +def extract_frames_by_mode(*args, **kwargs): + """ + .. deprecated:: + Renamed to :py:func:`extract_images_by_mode`. For per-image source + IDs, use :py:func:`extract_images_by_mode_with_sources`. + """ + warnings.warn( + 'extract_frames_by_mode() is deprecated; ' + 'use extract_images_by_mode() instead.', + DeprecationWarning, stacklevel=2, + ) + return extract_images_by_mode(*args, **kwargs) diff --git a/tests/test_utils.py b/tests/test_utils.py index fff35331..0f17be05 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -195,18 +195,18 @@ def test_sample_timepoints(self): with pytest.raises(ValueError): vdh.sample_timepoints(0, 100, -10) - def test_extract_timepoints_as_images(self): + def test_extract_images_from_timepoints(self): # basic: three distinct timepoints ms_list = [1000, 2000, 3000] - imgs = vdh.extract_timepoints_as_images( + imgs = vdh.extract_images_from_timepoints( self.video_doc, ms_list, as_PIL=False) self.assertEqual(3, len(imgs)) # empty input self.assertEqual( - [], vdh.extract_timepoints_as_images(self.video_doc, [])) + [], vdh.extract_images_from_timepoints(self.video_doc, [])) # duplicates preserved in input order dup_ms = [500, 250, 500, 750, 250] - dup_imgs = vdh.extract_timepoints_as_images(self.video_doc, dup_ms) + dup_imgs = vdh.extract_images_from_timepoints(self.video_doc, dup_ms) self.assertEqual(5, len(dup_imgs)) def _make_timepoints(self, count): @@ -222,27 +222,31 @@ def _make_timepoints(self, count): tps.append(tp) return tps - def test_sample_all_timepoints_ms(self): + def test_sample_all_timepoint_pairs_ms(self): tps = self._make_timepoints(10) parent = self.a_view.new_annotation( AnnotationTypes.TimeFrame, aid='tf_0', targets=[tp.id for tp in tps]) - ms_list = vdh._sample_all_timepoints_ms(self.mmif_obj, parent) - self.assertEqual([i * 100 for i in range(10)], ms_list) + pairs = vdh._sample_all_timepoint_pairs_ms(self.mmif_obj, parent) + # source IDs are the target TP ids; ms values match timePoints + self.assertEqual( + [(i * 100, tp.id) for i, tp in enumerate(tps)], pairs) - # start/end fallback (no targets): sampled at the stream's frame rate + # start/end fallback (no targets): sampled at the stream's frame + # rate; source is None for each sampled point parent2 = self.a_view.new_annotation( AnnotationTypes.TimeFrame, aid='tf_1', start=0, end=1000, timeUnit='milliseconds', document=self.video_doc.id) - ms_list2 = vdh._sample_all_timepoints_ms(self.mmif_obj, parent2) + pairs2 = vdh._sample_all_timepoint_pairs_ms(self.mmif_obj, parent2) # 30 frames in 1000ms at 29.97fps (step ≈ 33.37ms) - self.assertEqual(30, len(ms_list2)) - self.assertEqual(0, ms_list2[0]) - self.assertLess(ms_list2[-1], 1000) + self.assertEqual(30, len(pairs2)) + self.assertEqual((0, None), pairs2[0]) + self.assertTrue(all(src is None for _, src in pairs2)) + self.assertLess(pairs2[-1][0], 1000) - def test_sample_representatives_timepoints_ms(self): + def test_sample_representatives_timepoint_pairs_ms(self): tps = self._make_timepoints(10) reps = [tps[2].id, tps[5].id, tps[8].id] parent = self.a_view.new_annotation( @@ -250,19 +254,20 @@ def test_sample_representatives_timepoints_ms(self): targets=[tp.id for tp in tps], representatives=reps) - ms_list = vdh._sample_representatives_timepoints_ms( + pairs = vdh._sample_representatives_timepoint_pairs_ms( self.mmif_obj, parent) - self.assertEqual([200, 500, 800], ms_list) + self.assertEqual( + [(200, reps[0]), (500, reps[1]), (800, reps[2])], pairs) # no representatives → empty (skip) parent2 = self.a_view.new_annotation( AnnotationTypes.TimeFrame, aid='tf_1', targets=[tp.id for tp in tps]) self.assertEqual( - [], vdh._sample_representatives_timepoints_ms( + [], vdh._sample_representatives_timepoint_pairs_ms( self.mmif_obj, parent2)) - def test_sample_single_timepoint_ms(self): + def test_sample_single_timepoint_pair_ms(self): tps = self._make_timepoints(10) reps = [tps[2].id, tps[5].id, tps[8].id] parent = self.a_view.new_annotation( @@ -270,19 +275,120 @@ def test_sample_single_timepoint_ms(self): targets=[tp.id for tp in tps], representatives=reps) - # middle representative (index 1 of 3 → tps[5] → 500ms) + # middle representative (index 1 of 3 → tps[5] → 500ms, source=reps[1]) self.assertEqual( - [500], - vdh._sample_single_timepoint_ms(self.mmif_obj, parent)) + [(500, reps[1])], + vdh._sample_single_timepoint_pair_ms(self.mmif_obj, parent)) - # start/end fallback midpoint + # start/end fallback midpoint: source is None parent2 = self.a_view.new_annotation( AnnotationTypes.TimeFrame, aid='tf_1', start=100, end=500, timeUnit='milliseconds', document=self.video_doc.id) self.assertEqual( - [300], - vdh._sample_single_timepoint_ms(self.mmif_obj, parent2)) + [(300, None)], + vdh._sample_single_timepoint_pair_ms(self.mmif_obj, parent2)) + + def test_extract_images_by_count(self): + tps = self._make_timepoints(10) + parent = self.a_view.new_annotation( + AnnotationTypes.TimeFrame, aid='tf_0', + targets=[tp.id for tp in tps]) + + # bare form: returns just images + imgs = vdh.extract_images_by_count(self.mmif_obj, parent, fraction=0.3) + self.assertEqual(3, len(imgs)) + + # missing 'targets' raises ValueError (bare delegates through + # _with_sources) + no_targets = self.a_view.new_annotation( + AnnotationTypes.TimeFrame, aid='tf_no', + start=0, end=100, timeUnit='milliseconds', + document=self.video_doc.id) + with pytest.raises(ValueError): + vdh.extract_images_by_count(self.mmif_obj, no_targets) + + def test_extract_images_by_count_with_sources(self): + tps = self._make_timepoints(10) + parent = self.a_view.new_annotation( + AnnotationTypes.TimeFrame, aid='tf_0', + targets=[tp.id for tp in tps]) + + imgs, ids = vdh.extract_images_by_count_with_sources( + self.mmif_obj, parent, fraction=0.5) + self.assertEqual(len(imgs), len(ids)) + self.assertEqual(5, len(imgs)) + # source IDs are a subset of the input targets, in order + self.assertEqual(ids, sorted(ids, key=[t.id for t in tps].index)) + for tid in ids: + self.assertIn(tid, [t.id for t in tps]) + + # empty targets → empty parallel lists + empty = self.a_view.new_annotation( + AnnotationTypes.TimeFrame, aid='tf_empty', targets=[]) + self.assertEqual( + ([], []), + vdh.extract_images_by_count_with_sources(self.mmif_obj, empty)) + + def test_extract_images_by_mode_with_sources_reps(self): + tps = self._make_timepoints(10) + reps = [tps[2].id, tps[5].id, tps[8].id] + parent = self.a_view.new_annotation( + AnnotationTypes.TimeFrame, aid='tf_0', + targets=[tp.id for tp in tps], + representatives=reps) + + imgs, sources = vdh.extract_images_by_mode_with_sources( + self.mmif_obj, parent, mode=vdh.SamplingMode.REPRESENTATIVES) + self.assertEqual(3, len(imgs)) + # all sources are TP IDs (str), not ms ints + self.assertEqual(reps, sources) + self.assertTrue(all(isinstance(s, str) for s in sources)) + + def test_extract_images_by_mode_with_sources_single_fallback(self): + # SINGLE mode without representatives → midpoint fallback, + # source is the midpoint ms (int) + parent = self.a_view.new_annotation( + AnnotationTypes.TimeFrame, aid='tf_0', + start=100, end=500, timeUnit='milliseconds', + document=self.video_doc.id) + imgs, sources = vdh.extract_images_by_mode_with_sources( + self.mmif_obj, parent, mode=vdh.SamplingMode.SINGLE) + self.assertEqual(1, len(imgs)) + self.assertEqual([300], sources) + self.assertTrue(all(isinstance(s, int) for s in sources)) + + def test_extract_images_by_mode_with_sources_all_fallback(self): + # ALL mode without targets → stream-rate sampling, sources are + # the sampled ms ints + parent = self.a_view.new_annotation( + AnnotationTypes.TimeFrame, aid='tf_0', + start=0, end=200, timeUnit='milliseconds', + document=self.video_doc.id) + imgs, sources = vdh.extract_images_by_mode_with_sources( + self.mmif_obj, parent, mode=vdh.SamplingMode.ALL) + self.assertEqual(len(imgs), len(sources)) + self.assertGreater(len(imgs), 0) + self.assertTrue(all(isinstance(s, int) for s in sources)) + # the sources match the sampled ms values + self.assertEqual(sorted(sources), sources) + + def test_renamed_helpers_emit_deprecation_warnings(self): + # The three pre-rename names live as deprecation aliases that + # delegate to the new names. + with pytest.warns(DeprecationWarning, match='extract_images_from_timepoints'): + vdh.extract_timepoints_as_images(self.video_doc, [500]) + + tps = self._make_timepoints(4) + parent = self.a_view.new_annotation( + AnnotationTypes.TimeFrame, aid='tf_dep', + targets=[tp.id for tp in tps]) + with pytest.warns(DeprecationWarning, match='extract_images_by_count_with_sources'): + vdh.extract_target_frames(self.mmif_obj, parent) + + with pytest.warns(DeprecationWarning, match='extract_images_by_mode'): + vdh.extract_frames_by_mode( + self.mmif_obj, parent, mode=vdh.SamplingMode.ALL) def test_pts_offset_regression(self): # regression for https://github.com/clamsproject/mmif-python/issues/379 @@ -314,7 +420,7 @@ def test_pts_offset_regression(self): # requested 33ms should resolve to the actual PTS-equivalent frame # (start_time is ~33ms; the first frame's PTS is nearest 33ms) - imgs = vdh.extract_timepoints_as_images(vd, [33], as_PIL=False) + imgs = vdh.extract_images_from_timepoints(vd, [33], as_PIL=False) self.assertEqual(1, len(imgs)) got_pts = pts_by_hash.get(hash(imgs[0].tobytes())) self.assertIsNotNone(got_pts)