from pathlib import Path
import attrs
import cv2
import logging
import numpy as np
import polars as pl
import re
import sys
from polars.testing import assert_frame_equal
from tifffile import tifffile
from tqdm import tqdm
from typing import ClassVar, Callable, Any, Literal, TypedDict
from typing import Self
from neuralib.io import csv_header
from neuralib.typing import PathLike
from neuralib.util.utils import keys_with_value
from neuralib.util.verbose import fprint
__all__ = ['SequenceLabeller']
Logger = logging.getLogger(__name__)
# ================ #
# KeyBoard Mapping #
# ================ #
class KeyMapping(TypedDict, total=False):
    """Table of ``key name -> key code`` used to interpret keyboard input.

    The code of some keys (e.g. arrows, backspace) differs between operating
    systems, so one table is declared per platform. Every entry is optional
    (``total=False``).
    """
    # --- control keys ---
    escape: int
    backspace: int
    # NOTE: `space` is deliberately not mapped, it stays a printable character for typing notes
    enter: int

    # --- navigation keys ---
    left_square_bracket: int  # [
    right_square_bracket: int  # ]
    left: int
    right: int
    up: int
    down: int

    # --- symbol keys ---
    plus: int  # +
    minus: int  # -
    equal: int  # =
#
# Key codes that are identical on every supported platform.
COMMON_KEYMAPPING: KeyMapping = {
    'escape': 27,
    # 'space': 32,
    'enter': 13,
    'left_square_bracket': ord('['),
    'right_square_bracket': ord(']'),
    'plus': ord('+'),
    'minus': ord('-'),
    'equal': ord('='),
}


def _platform_keymapping(*, backspace: int, left: int, right: int, up: int, down: int) -> KeyMapping:
    """Extend :data:`COMMON_KEYMAPPING` with the platform dependent key codes."""
    return {
        **COMMON_KEYMAPPING,
        'backspace': backspace,
        'left': left,
        'right': right,
        'up': up,
        'down': down,
    }


# Windows
WIN_KEYMAPPING: KeyMapping = _platform_keymapping(
    backspace=8,
    left=2424832,
    right=2555904,
    up=2490368,
    down=2621440,
)

# macOS
MAC_KEYMAPPING: KeyMapping = _platform_keymapping(
    backspace=127,
    left=2,
    right=3,
    up=0,
    down=1,
)

# Linux
LINUX_KEYMAPPING: KeyMapping = _platform_keymapping(
    backspace=8,
    left=81,
    right=83,
    up=82,
    down=84,
)
def get_keymapping(platform: str | None = None) -> KeyMapping:
    """
    Pick the key code table of an operating system.

    :param platform: platform identifier in the form of ``sys.platform``.
        If None, use the platform the interpreter is running on
    :return: the :class:`KeyMapping` of the platform
    :raises RuntimeError: platform without a key code table
    """
    p = sys.platform if platform is None else platform

    if p.startswith('linux'):  # covers 'linux' and the legacy 'linux2'
        return LINUX_KEYMAPPING
    if p == 'darwin':
        return MAC_KEYMAPPING
    if p == 'win32':
        return WIN_KEYMAPPING

    # fail loudly instead of implicitly returning None, which only breaks later during the key lookup
    raise RuntimeError(f'unsupported platform for key mapping: {p!r}')
@attrs.define
class FrameInfo:
    """A single image/frame of a sequence together with its name and an optional note."""

    filename: str
    """name of the image/frame"""

    image: np.ndarray
    """`Array[uint, [H, W]|[H, W, 3])`"""

    notes: str | None = attrs.field(default=None)
    """notes for the image"""

    @property
    def itype(self) -> Literal['gray', 'rgb']:
        """
        image color type, decided by the number of array dimensions

        :raises TypeError: image array that is neither 2D nor 3D
        """
        ndim = self.image.ndim
        if ndim == 2:
            return 'gray'
        if ndim == 3:
            return 'rgb'
        raise TypeError(f'unsupported image with {ndim} dimension(s), expect 2 (gray) or 3 (rgb)')

    @property
    def text_color(self) -> int | tuple[int, int, int]:
        """
        color used for drawing text on top of the image:
        a scalar for a gray image, a 3-channel tuple for a rgb image

        :raises TypeError: propagated from :attr:`itype` for an unsupported image array
        """
        # `itype` only ever yields 'gray' or 'rgb' (it raises otherwise), so two cases are exhaustive
        if self.itype == 'gray':
            return 2 ** 16 - 1
        return 0, 0, 255

    @property
    def height(self) -> int:
        """size of the first image axis"""
        return self.image.shape[0]

    @property
    def width(self) -> int:
        """size of the second image axis"""
        return self.image.shape[1]
class CloseSaveInterrupt(KeyboardInterrupt):
    """Raised when a quit command (``:wq``, ``:q`` or ``:q!``) is entered.

    The entered command is kept in :attr:`mode`.
    """

    def __init__(self, mode: Literal[':wq', ':q', ':q!']):
        super().__init__(mode)
        self.mode = mode
# ================= #
# Sequence Labeller #
# ================= #
class SequenceLabeller:
    """Keyboard-driven OpenCV viewer for browsing an image sequence and attaching a text note to each frame.

    Typed characters are collected in an input buffer and executed as a command with *enter*:

    - ``:h``: show the command help
    - ``:q`` / ``:q!`` / ``:wq``: quit / quit without saving / save the notes and quit
    - ``:c``: clear the note of the current frame
    - ``:i``: show the current frame index
    - ``:N``: go to the frame with index ``N``
    - ``+message``: append ``message`` to the note of the current frame
    - ``message``: replace the note of the current frame with ``message``

    *left*/*right* arrows step by one frame, ``[``/``]`` step by ten frames,
    *backspace* removes the last typed character and *escape* empties the buffer.
    """

    window_title: ClassVar[str] = 'SeqLabeller'

    # notes as loaded from the output file, for checking changes
    _prev_note: pl.DataFrame | None = None

    def __init__(self, seqs_info: list[FrameInfo],
                 output: PathLike | None = None):
        """
        :param seqs_info: frames to be shown, in display order
        :param output: csv file for loading/saving the notes. If None, notes cannot be saved
        """
        self.seqs_info = seqs_info
        # always keep a ``Path``: ``main()`` calls ``.exists()`` on it
        self.output = Path(output) if output is not None else None  # for notes

        self.message_queue: list[str] = []
        self.buffer = ''  # input buffer
        self._frame_index = 0

    def __len__(self) -> int:
        return len(self.seqs_info)

    @classmethod
    def load_sequences(cls, seqs: np.ndarray | list[np.ndarray],
                       filenames: list[str] | None = None,
                       output: PathLike | None = None) -> Self:
        """
        Create the labeller from in-memory images.

        :param seqs: list of images, or an array that is split into images along its first axis
        :param filenames: name of each image. If None, use the image index as name
        :param output: csv file for loading/saving the notes
        :return: the labeller
        """
        if isinstance(seqs, np.ndarray):
            seqs = list(seqs)

        if filenames is None:
            filenames = [str(i) for i in range(len(seqs))]

        seqs_info = [FrameInfo(filenames[i], image, None) for i, image in enumerate(seqs)]
        return cls(seqs_info, output)

    @classmethod
    def load_from_dir(cls, directory: PathLike,
                      file_suffix: str = '.tif',
                      sort_func: Callable[[Path], Any] | None = None,
                      single_frame_per_file: bool = True,
                      output: PathLike | None = None) -> Self:
        """
        Create the labeller from the image files of a directory.

        :param directory: directory contain image sequences
        :param file_suffix: sequence file suffix
        :param sort_func: sorted function with signature `(filename:Path) -> Comparable`.
            If None, sort by the file path
        :param single_frame_per_file: read every file as one image using ``cv2.imread``.
            If False, only ``.tif``/``.tiff`` files are supported (read by ``tifffile``)
        :param output: csv file for loading/saving the notes
        :return: the labeller
        :raises NotADirectoryError: ``directory`` is not a directory
        :raises FileNotFoundError: no file with ``file_suffix`` found in ``directory``
        :raises NotImplementedError: multiple frames per file with a non-tif suffix
        :raises OSError: an image file that cannot be decoded
        """
        directory = Path(directory)  # accept str as well, ``glob`` needs a Path
        if not directory.is_dir():
            raise NotADirectoryError(f'{directory}')

        files = sorted(directory.glob(f'*{file_suffix}'), key=sort_func)
        if len(files) == 0:
            raise FileNotFoundError(f'no *{file_suffix} file found in {directory}')

        fprint(f'LOAD image sequence: {len(files)} files', vtype='io')

        # pick the reader once, instead of branching for every file
        if file_suffix == '.pdf':
            from neuralib.imglib.io import read_pdf

            def read(file: Path) -> np.ndarray:
                return read_pdf(file, dpi=200)

        elif single_frame_per_file:
            read = cls._read_single_frame

        elif file_suffix in ('.tif', '.tiff'):
            # NOTE(review): the whole array returned by tifffile is stored as ONE frame,
            # presumably a multi-frame file should be split into frames -- confirm
            def read(file: Path) -> np.ndarray:
                return tifffile.imread(str(file))

        else:
            raise NotImplementedError(f'multiple frames per file is not supported for {file_suffix} file')

        seqs_info = [
            FrameInfo(file.stem, read(file), None)
            for file in tqdm(files, unit='file', ncols=80)
        ]
        return cls(seqs_info, output)

    @staticmethod
    def _read_single_frame(file: Path) -> np.ndarray:
        """read an image file using opencv and reorder its channels from BGR to RGB"""
        img = cv2.imread(str(file))
        if img is None:  # cv2.imread does not raise on failure
            raise OSError(f'cannot read image file: {file}')
        return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    @property
    def n_frames(self) -> int:
        """aka. number of images"""
        return len(self.seqs_info)

    @property
    def current_frame_index(self) -> int:
        """index of the frame being shown"""
        return self._frame_index

    @current_frame_index.setter
    def current_frame_index(self, value: int):
        """set the shown frame (wrap around at both ends) and reset the messages to its filename and note"""
        n = len(self.seqs_info)
        self.message_queue = []

        if n == 0:  # nothing to show, and avoid modulo by zero
            self._frame_index = 0
            return

        self._frame_index = value % n  # python modulo is never negative for n > 0

        info = self.seqs_info[self._frame_index]
        if info.filename is not None:
            self.enqueue_message(f'{info.filename}')

        if (note := self.read_note()) is not None:
            self.enqueue_message(note)

    @property
    def text_color(self) -> int | tuple[int, int, int]:
        """text color suitable for the current frame"""
        return self.seqs_info[self.current_frame_index].text_color

    # ===== #
    # Notes #
    # ===== #

    def save_note(self):
        """
        save image-related notes to file

        :raises RuntimeError: no output file specified
        """
        from datetime import datetime

        if self.output is None:
            raise RuntimeError('specify output first for writing notes!')

        fields = ['filename', 'notes', 'datetime']
        t = datetime.now().strftime("%Y-%m-%d %H:%M")  # TODO to frame dependent

        with csv_header(self.output, fields, quotes_header='notes') as csv:
            for seq in self.seqs_info:
                csv(str(seq.filename), seq.notes, t)

    def load_note(self):
        """
        read image-related notes from file

        Frames without a row in the file get no note.
        If a filename occurs more than once, the last row is used.
        """
        from neuralib.util.verbose import printdf

        # read both columns as string, so that a filename/note such as `001` is not parsed as a number
        df = self._prev_note = pl.read_csv(self.output,
                                           schema_overrides={'filename': pl.Utf8, 'notes': pl.Utf8})
        printdf(df)

        saved = dict(zip(df['filename'].to_list(), df['notes'].to_list()))
        for info in self.seqs_info:
            info.notes = saved.get(str(info.filename))

    def write_note(self, note: str, *, append_mode: bool = False):
        """
        set the note of the current frame

        :param note: note content
        :param append_mode: append to the existing note (separated by ``;``) instead of replacing it.
            Same as replace if the frame does not have a note yet
        """
        info = self.seqs_info[self.current_frame_index]

        if append_mode and info.notes:  # nothing to append to if None or empty
            info.notes = f'{info.notes};{note}'
        else:
            info.notes = note

        self.current_frame_index = self.current_frame_index  # trigger enqueue_message

        if self.output is None:
            self.enqueue_message('specify output first for writing notes!')

    def read_note(self) -> str | None:
        """note of the current frame, None if it does not have one"""
        return self.seqs_info[self._frame_index].notes

    def clear_note(self):
        """remove the note of the current frame"""
        self.seqs_info[self.current_frame_index].notes = None

    def check_note_changes(self) -> bool:
        """True if any changes in notes"""
        if self._prev_note is None:
            return any(seq.notes is not None for seq in self.seqs_info)

        # compare both frames as string columns, otherwise an all-None notes column (Null dtype)
        # never equals the column read from the file, and a change is always reported
        schema = {'filename': pl.Utf8, 'notes': pl.Utf8}
        prev_note = self._prev_note.select([pl.col(name).cast(dtype) for name, dtype in schema.items()])
        cur_note = pl.DataFrame(
            [[str(seq.filename), seq.notes] for seq in self.seqs_info],
            schema=schema,
            orient='row'
        )

        try:
            assert_frame_equal(prev_note, cur_note)
        except AssertionError:
            return True

        return False

    # ============= #
    # Key & Command #
    # ============= #

    def goto_begin(self):
        """go to the first frame"""
        self.current_frame_index = 0

    def goto_end(self):
        """go to the last frame"""
        self.current_frame_index = self.n_frames - 1

    def go_to(self, i: int):
        """
        go to the i-th frame. Show a message instead if the index is out of range

        :param i: frame index, from 0 to ``len(self) - 1``
        """
        if not 0 <= i < len(self):
            self.enqueue_message(f'invalid sequence index: {i}')
            return

        self.current_frame_index = i

    def handle_keycode(self, k: int):
        """
        handle a pressed key: run its bound action, or append it to the input buffer if printable

        :param k: key code
        """
        mapping = get_keymapping()
        ret = self._handle_keymapping(mapping, k)
        if ret is None:  # consumed by a key action
            return

        # unmapped special keys may have a code outside the unicode range (`chr` raises),
        # and control characters should not end up in a note
        if 0 <= ret <= sys.maxunicode and (char := chr(ret)).isprintable():
            self.buffer += char

    def _handle_keymapping(self, mapping: KeyMapping, value: int) -> int | None:
        """
        Handling the keyboard mapping

        :param mapping: key code table of the running platform
        :param value: key code
        :return: int value if cannot find key in keymapping, or the key does not have an action
            but is a printable symbol (``+``, ``-``, ``=``). Otherwise return None
        """
        try:
            ret = keys_with_value(mapping, value, to_item=True)
        except (KeyError, ValueError):
            return value

        if ret == 'left':
            self.current_frame_index -= 1
        elif ret == 'right':
            self.current_frame_index += 1
        # NOTE(review): `[` moves forward and `]` moves backward, opposite to the left/right arrows.
        # kept as is -- confirm whether it is intended
        elif ret == 'left_square_bracket':
            self.current_frame_index += 10
        elif ret == 'right_square_bracket':
            self.current_frame_index -= 10
        elif ret == 'backspace':
            self.buffer = self.buffer[:-1]
        elif ret == 'enter':  # handle command in buffer
            command = self._proc_image_command = self.buffer
            self.buffer = ''
            try:
                self.handle_command(command)
            except Exception as e:
                # KeyboardInterrupt (i.e. CloseSaveInterrupt) and SystemExit are not an Exception,
                # so they propagate to the main loop
                self.enqueue_message(f'command "{command}" {type(e).__name__}: {e}')
        elif ret == 'escape':
            self.buffer = ''
        elif ret in ('plus', 'minus', 'equal'):
            # no action bound here: hand them back as normal characters,
            # otherwise the `+message` command could never be typed
            return value

        return None

    def handle_command(self, command: str):
        """
        execute a command, see ``:h`` for the available commands

        :param command: content of the input buffer. An empty command is ignored
        :raises CloseSaveInterrupt: quit commands (``:wq``, ``:q``, ``:q!``)
        :raises RuntimeError: unknown command
        """
        Logger.debug('command: %s', command)

        if not command:
            # *enter* on an empty buffer, do not wipe the existing note
            return

        if command == ':h':
            for line in (
                    ':h : print this document',
                    ':q : quit (unable if not save the changes)',
                    ':q! : quit (without save)',
                    ':wq : save notes and quit',
                    ':c : clear current note',
                    ':i : print current file index',
                    ':N : goto N-th image',
                    '+message : append note',
                    'message : (replace) note',
            ):
                self.enqueue_message(line)

        elif command == ':c':
            self.enqueue_message(f'clear notes: {self.seqs_info[self.current_frame_index].notes}')
            self.clear_note()

        elif command == ':i':
            self.enqueue_message(f'current file index: {self.current_frame_index}')

        elif (match := re.fullmatch(r':(\d+)', command)) is not None:
            # all digits need to be captured, i.e. `:12` is the frame 12, not the frame 1
            self.go_to(int(match.group(1)))

        elif command.startswith('+'):
            self.write_note(command[1:], append_mode=True)

        elif not command.startswith(':'):
            self.write_note(command)

        elif command in (':wq', ':q', ':q!'):
            raise CloseSaveInterrupt(command)

        else:
            raise RuntimeError(f'unknown command : "{command}"')

    # ============ #
    # Msg / Buffer #
    # ============ #

    def enqueue_message(self, text: str):
        """add a line of text to be drawn on the frame"""
        self.message_queue.append(text)

    def _show_queued_message(self, image: np.ndarray):
        """drawing enqueued message"""
        y0 = 70  # baseline of the first line
        step = 30  # line spacing
        for i, message in enumerate(self.message_queue):
            cv2.putText(image, message, (10, y0 + i * step),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, self.text_color, 2, cv2.LINE_AA)

    def _show_buffer(self, image: np.ndarray):
        """drawing input buffer content"""
        cv2.putText(image, self.buffer, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, self.text_color, 2, cv2.LINE_AA)

    # ========= #
    # Main Loop #
    # ========= #

    def main(self):
        """main loop for the GUI"""
        cv2.namedWindow(self.window_title, cv2.WINDOW_GUI_NORMAL)

        try:
            if self.output is not None and self.output.exists():
                self.load_note()

            while True:
                try:
                    self._loop()
                except CloseSaveInterrupt as e:
                    if e.mode == ':q' and self.check_note_changes():
                        # refuse to quit with unsaved notes
                        self.enqueue_message('please save the note using :wq, or force quit using :q!')
                        continue

                    if e.mode == ':wq' and self.output is not None:
                        self.save_note()
                        fprint(f'SAVE csv -> {str(self.output)}!', vtype='io')

                    break
        finally:
            cv2.destroyWindow(self.window_title)

    def _loop(self):
        """draw the current frame with the buffer and messages, then poll the keyboard once"""
        try:
            info = self.seqs_info[self.current_frame_index]
        except IndexError:
            pass  # nothing to draw, but keep polling the keyboard
        else:
            image = info.image.copy()  # do not draw on the original image

            if len(self.buffer):
                self._show_buffer(image)

            self._show_queued_message(image)
            cv2.imshow(self.window_title, image)

        #
        if sys.platform == 'win32':
            k = cv2.waitKeyEx(1)
        elif sys.platform == 'darwin' or sys.platform.startswith('linux'):
            k = cv2.waitKey(1)
        else:
            raise RuntimeError(f'unsupported platform: {sys.platform}')

        if k >= 0:
            self.handle_keycode(k)
# ============= #
# Main Argparse #
# ============= #
def main():
    """Command-line entry point: load the images of a directory and start the labeller GUI."""
    from argparse import ArgumentParser

    parser = ArgumentParser(description='run the sequences labeller')
    parser.add_argument('-D', '--dir', dest='directory', type=Path, required=True,
                        help='path with image sequences')
    parser.add_argument('--suffix', default='.pdf', choices=('.pdf', '.tif', '.tiff'),
                        help='image sequence suffix')
    parser.add_argument('-O', '--output', type=Path, default=None,
                        help='csv output for note')
    args = parser.parse_args()

    SequenceLabeller.load_from_dir(
        args.directory,
        file_suffix=args.suffix,
        output=args.output,
    ).main()
# Run the command-line interface only when this file is executed as a script.
if __name__ == '__main__':
    main()