Analyzers API Reference
video_processor.analyzers.diagram_analyzer
Diagram analysis using vision model classification and single-pass extraction.
DiagramAnalyzer
Vision model-based diagram detection and analysis.
Source code in video_processor/analyzers/diagram_analyzer.py
class DiagramAnalyzer:
    """Vision model-based diagram detection and analysis.

    Classifies extracted video frames with a vision model, runs a full
    single-pass extraction on high-confidence diagram frames, and saves
    lower-confidence frames as captioned screengrabs.
    """

    def __init__(
        self,
        provider_manager: Optional[ProviderManager] = None,
        confidence_threshold: float = 0.3,
    ) -> None:
        # A default ProviderManager is constructed when none is injected.
        self.pm = provider_manager or ProviderManager()
        # Frames classified below this confidence are skipped entirely.
        self.confidence_threshold = confidence_threshold

    def classify_frame(self, image_path: Union[str, Path]) -> dict:
        """
        Classify a single frame using vision model.
        Returns dict with is_diagram, diagram_type, confidence, brief_description.
        """
        image_bytes = _read_image_bytes(image_path)
        raw = self.pm.analyze_image(image_bytes, _CLASSIFY_PROMPT, max_tokens=512)
        result = _parse_json_response(raw)
        if result is None:
            # Unparseable model output: report a zero-confidence non-diagram
            # so callers skip the frame instead of crashing.
            return {
                "is_diagram": False,
                "diagram_type": "unknown",
                "confidence": 0.0,
                "brief_description": "",
            }
        return result

    def analyze_diagram_single_pass(self, image_path: Union[str, Path]) -> dict:
        """
        Full single-pass diagram analysis — description, text, mermaid, chart data.
        Returns parsed dict or empty dict on failure.
        """
        image_bytes = _read_image_bytes(image_path)
        # Larger token budget than classification: the analysis prompt asks
        # for description, text, elements, relationships, and Mermaid source.
        raw = self.pm.analyze_image(image_bytes, _ANALYSIS_PROMPT, max_tokens=4096)
        result = _parse_json_response(raw)
        return result or {}

    def caption_frame(self, image_path: Union[str, Path]) -> str:
        """Get a brief caption for a screengrab fallback."""
        image_bytes = _read_image_bytes(image_path)
        return self.pm.analyze_image(image_bytes, _CAPTION_PROMPT, max_tokens=256)

    def process_frames(
        self,
        frame_paths: List[Union[str, Path]],
        diagrams_dir: Optional[Path] = None,
        captures_dir: Optional[Path] = None,
    ) -> Tuple[List[DiagramResult], List[ScreenCapture]]:
        """
        Process a list of extracted frames: classify, analyze diagrams, screengrab fallback.
        Thresholds:
          - confidence >= 0.7 → full diagram analysis (story 3.2)
          - 0.3 <= confidence < 0.7 → screengrab fallback (story 3.3)
          - confidence < 0.3 → skip
        Returns (diagrams, screen_captures).
        """
        diagrams: List[DiagramResult] = []
        captures: List[ScreenCapture] = []
        # Separate output counters: diagram_N / capture_N file names stay
        # dense even when frames are skipped or fall back.
        diagram_idx = 0
        capture_idx = 0
        for i, fp in enumerate(tqdm(frame_paths, desc="Analyzing frames", unit="frame")):
            fp = Path(fp)
            logger.info(f"Classifying frame {i}/{len(frame_paths)}: {fp.name}")
            try:
                classification = self.classify_frame(fp)
            except Exception as e:
                # Best-effort pipeline: a single bad frame must not abort the batch.
                logger.warning(f"Classification failed for frame {i}: {e}")
                continue
            confidence = float(classification.get("confidence", 0.0))
            if confidence < self.confidence_threshold:
                logger.debug(f"Frame {i}: confidence {confidence:.2f} below threshold, skipping")
                continue
            if confidence >= 0.7:
                # Full diagram analysis
                logger.info(
                    f"Frame {i}: diagram detected (confidence {confidence:.2f}), analyzing..."
                )
                try:
                    analysis = self.analyze_diagram_single_pass(fp)
                except Exception as e:
                    logger.warning(
                        f"Diagram analysis failed for frame {i}: {e}, falling back to screengrab"
                    )
                    analysis = {}
                if not analysis:
                    # Analysis failed — fall back to screengrab
                    capture = self._save_screengrab(fp, i, capture_idx, captures_dir, confidence)
                    captures.append(capture)
                    capture_idx += 1
                    continue
                # Build DiagramResult
                # Prefer the analysis-pass type; fall back to the classifier's.
                dtype = analysis.get("diagram_type", classification.get("diagram_type", "unknown"))
                try:
                    diagram_type = DiagramType(dtype)
                except ValueError:
                    # Model returned a type outside the enum — record as unknown.
                    diagram_type = DiagramType.unknown
                # Normalize relationships: llava sometimes returns dicts instead of strings
                raw_rels = analysis.get("relationships") or []
                relationships = []
                for rel in raw_rels:
                    if isinstance(rel, str):
                        relationships.append(rel)
                    elif isinstance(rel, dict):
                        # Accept either source/destination or from/to key spellings.
                        src = rel.get("source", rel.get("from", "?"))
                        dst = rel.get("destination", rel.get("to", "?"))
                        label = rel.get("label", rel.get("relationship", ""))
                        relationships.append(
                            f"{src} -> {dst}: {label}" if label else f"{src} -> {dst}"
                        )
                    else:
                        relationships.append(str(rel))
                # Normalize elements: llava may return dicts or nested lists
                raw_elements = analysis.get("elements") or []
                elements = []
                for elem in raw_elements:
                    if isinstance(elem, str):
                        elements.append(elem)
                    elif isinstance(elem, dict):
                        name = elem.get("name", elem.get("element", ""))
                        etype = elem.get("type", elem.get("element_type", ""))
                        if name and etype:
                            elements.append(f"{etype}: {name}")
                        elif name:
                            elements.append(name)
                        else:
                            # No recognizable keys — keep the raw dict as JSON text.
                            elements.append(json.dumps(elem))
                    elif isinstance(elem, list):
                        elements.extend(str(e) for e in elem)
                    else:
                        elements.append(str(elem))
                # Normalize text_content: llava may return dict instead of string
                raw_text = analysis.get("text_content")
                if isinstance(raw_text, dict):
                    # Flatten {"section": [...]} style dicts into "section: a, b" lines.
                    parts = []
                    for k, v in raw_text.items():
                        if isinstance(v, list):
                            parts.append(f"{k}: {', '.join(str(x) for x in v)}")
                        else:
                            parts.append(f"{k}: {v}")
                    text_content = "\n".join(parts)
                elif isinstance(raw_text, list):
                    text_content = "\n".join(str(x) for x in raw_text)
                else:
                    text_content = raw_text
                try:
                    dr = DiagramResult(
                        frame_index=i,
                        diagram_type=diagram_type,
                        confidence=confidence,
                        description=analysis.get("description"),
                        text_content=text_content,
                        elements=elements,
                        relationships=relationships,
                        mermaid=analysis.get("mermaid"),
                        chart_data=analysis.get("chart_data"),
                    )
                except Exception as e:
                    # Pydantic validation failed on the normalized fields — the
                    # frame is still worth keeping as a captioned screengrab.
                    logger.warning(
                        f"DiagramResult validation failed for frame {i}: {e}, "
                        "falling back to screengrab"
                    )
                    capture = self._save_screengrab(fp, i, capture_idx, captures_dir, confidence)
                    captures.append(capture)
                    capture_idx += 1
                    continue
                # Save outputs (story 3.4)
                if diagrams_dir:
                    diagrams_dir.mkdir(parents=True, exist_ok=True)
                    prefix = f"diagram_{diagram_idx}"
                    # Original frame
                    img_dest = diagrams_dir / f"{prefix}.jpg"
                    shutil.copy2(fp, img_dest)
                    # NOTE(review): stored paths are relative ("diagrams/..."),
                    # presumably resolved against the report output root —
                    # confirm against the report writer.
                    dr.image_path = f"diagrams/{prefix}.jpg"
                    # Mermaid source
                    if dr.mermaid:
                        mermaid_dest = diagrams_dir / f"{prefix}.mermaid"
                        mermaid_dest.write_text(dr.mermaid)
                        dr.mermaid_path = f"diagrams/{prefix}.mermaid"
                    # Analysis JSON
                    json_dest = diagrams_dir / f"{prefix}.json"
                    json_dest.write_text(dr.model_dump_json(indent=2))
                diagrams.append(dr)
                diagram_idx += 1
            else:
                # Screengrab fallback (0.3 <= confidence < 0.7)
                logger.info(
                    f"Frame {i}: uncertain (confidence {confidence:.2f}), saving as screengrab"
                )
                capture = self._save_screengrab(fp, i, capture_idx, captures_dir, confidence)
                captures.append(capture)
                capture_idx += 1
        logger.info(
            f"Diagram processing complete: {len(diagrams)} diagrams, {len(captures)} screengrabs"
        )
        return diagrams, captures

    def _save_screengrab(
        self,
        frame_path: Path,
        frame_index: int,
        capture_index: int,
        captures_dir: Optional[Path],
        confidence: float,
    ) -> ScreenCapture:
        """Save a frame as a captioned screengrab."""
        caption = ""
        try:
            caption = self.caption_frame(frame_path)
        except Exception as e:
            # Caption is best-effort; an empty caption is acceptable.
            logger.warning(f"Caption failed for frame {frame_index}: {e}")
        sc = ScreenCapture(
            frame_index=frame_index,
            caption=caption,
            confidence=confidence,
        )
        if captures_dir:
            captures_dir.mkdir(parents=True, exist_ok=True)
            prefix = f"capture_{capture_index}"
            img_dest = captures_dir / f"{prefix}.jpg"
            shutil.copy2(frame_path, img_dest)
            # Relative path, same convention as diagram outputs.
            sc.image_path = f"captures/{prefix}.jpg"
            json_dest = captures_dir / f"{prefix}.json"
            json_dest.write_text(sc.model_dump_json(indent=2))
        return sc
|
analyze_diagram_single_pass(image_path)
Full single-pass diagram analysis — description, text, mermaid, chart data.
Returns parsed dict or empty dict on failure.
Source code in video_processor/analyzers/diagram_analyzer.py
def analyze_diagram_single_pass(self, image_path: Union[str, Path]) -> dict:
    """
    Run the complete one-shot diagram analysis — description, text, mermaid, chart data.

    Returns the parsed analysis dict, or an empty dict when the model
    response cannot be parsed.
    """
    payload = _read_image_bytes(image_path)
    response = self.pm.analyze_image(payload, _ANALYSIS_PROMPT, max_tokens=4096)
    parsed = _parse_json_response(response)
    if parsed:
        return parsed
    return {}
|
caption_frame(image_path)
Get a brief caption for a screengrab fallback.
Source code in video_processor/analyzers/diagram_analyzer.py
def caption_frame(self, image_path: Union[str, Path]) -> str:
    """Produce a short caption for a frame, used by the screengrab fallback."""
    payload = _read_image_bytes(image_path)
    caption = self.pm.analyze_image(payload, _CAPTION_PROMPT, max_tokens=256)
    return caption
|
classify_frame(image_path)
Classify a single frame using vision model.
Returns dict with is_diagram, diagram_type, confidence, brief_description.
Source code in video_processor/analyzers/diagram_analyzer.py
def classify_frame(self, image_path: Union[str, Path]) -> dict:
    """
    Classify one frame with the vision model.

    Returns dict with is_diagram, diagram_type, confidence, brief_description.
    On an unparseable model response, a zero-confidence fallback is returned.
    """
    payload = _read_image_bytes(image_path)
    response = self.pm.analyze_image(payload, _CLASSIFY_PROMPT, max_tokens=512)
    parsed = _parse_json_response(response)
    if parsed is not None:
        return parsed
    # Parse failure: report "not a diagram" so the caller skips this frame.
    return {
        "is_diagram": False,
        "diagram_type": "unknown",
        "confidence": 0.0,
        "brief_description": "",
    }
|
process_frames(frame_paths, diagrams_dir=None, captures_dir=None)
Process a list of extracted frames: classify, analyze diagrams, screengrab fallback.
Thresholds
- confidence >= 0.7 → full diagram analysis (story 3.2)
- 0.3 <= confidence < 0.7 → screengrab fallback (story 3.3)
- confidence < 0.3 → skip
Returns (diagrams, screen_captures).
Source code in video_processor/analyzers/diagram_analyzer.py
def process_frames(
    self,
    frame_paths: List[Union[str, Path]],
    diagrams_dir: Optional[Path] = None,
    captures_dir: Optional[Path] = None,
) -> Tuple[List[DiagramResult], List[ScreenCapture]]:
    """
    Process a list of extracted frames: classify, analyze diagrams, screengrab fallback.
    Thresholds:
      - confidence >= 0.7 → full diagram analysis (story 3.2)
      - 0.3 <= confidence < 0.7 → screengrab fallback (story 3.3)
      - confidence < 0.3 → skip
    Returns (diagrams, screen_captures).
    """
    diagrams: List[DiagramResult] = []
    captures: List[ScreenCapture] = []
    # Separate counters keep diagram_N / capture_N output names dense even
    # when frames are skipped or demoted to the fallback path.
    diagram_idx = 0
    capture_idx = 0
    for i, fp in enumerate(tqdm(frame_paths, desc="Analyzing frames", unit="frame")):
        fp = Path(fp)
        logger.info(f"Classifying frame {i}/{len(frame_paths)}: {fp.name}")
        try:
            classification = self.classify_frame(fp)
        except Exception as e:
            # Best-effort batch: one bad frame must not abort the run.
            logger.warning(f"Classification failed for frame {i}: {e}")
            continue
        confidence = float(classification.get("confidence", 0.0))
        if confidence < self.confidence_threshold:
            logger.debug(f"Frame {i}: confidence {confidence:.2f} below threshold, skipping")
            continue
        if confidence >= 0.7:
            # Full diagram analysis
            logger.info(
                f"Frame {i}: diagram detected (confidence {confidence:.2f}), analyzing..."
            )
            try:
                analysis = self.analyze_diagram_single_pass(fp)
            except Exception as e:
                logger.warning(
                    f"Diagram analysis failed for frame {i}: {e}, falling back to screengrab"
                )
                analysis = {}
            if not analysis:
                # Analysis failed — fall back to screengrab
                capture = self._save_screengrab(fp, i, capture_idx, captures_dir, confidence)
                captures.append(capture)
                capture_idx += 1
                continue
            # Build DiagramResult
            # Prefer the analysis-pass type; fall back to the classifier's.
            dtype = analysis.get("diagram_type", classification.get("diagram_type", "unknown"))
            try:
                diagram_type = DiagramType(dtype)
            except ValueError:
                # Out-of-enum value from the model — record as unknown.
                diagram_type = DiagramType.unknown
            # Normalize relationships: llava sometimes returns dicts instead of strings
            raw_rels = analysis.get("relationships") or []
            relationships = []
            for rel in raw_rels:
                if isinstance(rel, str):
                    relationships.append(rel)
                elif isinstance(rel, dict):
                    # Accept either source/destination or from/to key spellings.
                    src = rel.get("source", rel.get("from", "?"))
                    dst = rel.get("destination", rel.get("to", "?"))
                    label = rel.get("label", rel.get("relationship", ""))
                    relationships.append(
                        f"{src} -> {dst}: {label}" if label else f"{src} -> {dst}"
                    )
                else:
                    relationships.append(str(rel))
            # Normalize elements: llava may return dicts or nested lists
            raw_elements = analysis.get("elements") or []
            elements = []
            for elem in raw_elements:
                if isinstance(elem, str):
                    elements.append(elem)
                elif isinstance(elem, dict):
                    name = elem.get("name", elem.get("element", ""))
                    etype = elem.get("type", elem.get("element_type", ""))
                    if name and etype:
                        elements.append(f"{etype}: {name}")
                    elif name:
                        elements.append(name)
                    else:
                        # No recognizable keys — keep the raw dict as JSON text.
                        elements.append(json.dumps(elem))
                elif isinstance(elem, list):
                    elements.extend(str(e) for e in elem)
                else:
                    elements.append(str(elem))
            # Normalize text_content: llava may return dict instead of string
            raw_text = analysis.get("text_content")
            if isinstance(raw_text, dict):
                # Flatten {"section": [...]} style dicts to "section: a, b" lines.
                parts = []
                for k, v in raw_text.items():
                    if isinstance(v, list):
                        parts.append(f"{k}: {', '.join(str(x) for x in v)}")
                    else:
                        parts.append(f"{k}: {v}")
                text_content = "\n".join(parts)
            elif isinstance(raw_text, list):
                text_content = "\n".join(str(x) for x in raw_text)
            else:
                text_content = raw_text
            try:
                dr = DiagramResult(
                    frame_index=i,
                    diagram_type=diagram_type,
                    confidence=confidence,
                    description=analysis.get("description"),
                    text_content=text_content,
                    elements=elements,
                    relationships=relationships,
                    mermaid=analysis.get("mermaid"),
                    chart_data=analysis.get("chart_data"),
                )
            except Exception as e:
                # Validation failed on the normalized fields — the frame is
                # still worth keeping as a captioned screengrab.
                logger.warning(
                    f"DiagramResult validation failed for frame {i}: {e}, "
                    "falling back to screengrab"
                )
                capture = self._save_screengrab(fp, i, capture_idx, captures_dir, confidence)
                captures.append(capture)
                capture_idx += 1
                continue
            # Save outputs (story 3.4)
            if diagrams_dir:
                diagrams_dir.mkdir(parents=True, exist_ok=True)
                prefix = f"diagram_{diagram_idx}"
                # Original frame
                img_dest = diagrams_dir / f"{prefix}.jpg"
                shutil.copy2(fp, img_dest)
                # NOTE(review): relative path, presumably resolved against the
                # report output root — confirm against the report writer.
                dr.image_path = f"diagrams/{prefix}.jpg"
                # Mermaid source
                if dr.mermaid:
                    mermaid_dest = diagrams_dir / f"{prefix}.mermaid"
                    mermaid_dest.write_text(dr.mermaid)
                    dr.mermaid_path = f"diagrams/{prefix}.mermaid"
                # Analysis JSON
                json_dest = diagrams_dir / f"{prefix}.json"
                json_dest.write_text(dr.model_dump_json(indent=2))
            diagrams.append(dr)
            diagram_idx += 1
        else:
            # Screengrab fallback (0.3 <= confidence < 0.7)
            logger.info(
                f"Frame {i}: uncertain (confidence {confidence:.2f}), saving as screengrab"
            )
            capture = self._save_screengrab(fp, i, capture_idx, captures_dir, confidence)
            captures.append(capture)
            capture_idx += 1
    logger.info(
        f"Diagram processing complete: {len(diagrams)} diagrams, {len(captures)} screengrabs"
    )
    return diagrams, captures
|
video_processor.analyzers.content_analyzer
Content cross-referencing between transcript and diagram entities.
ContentAnalyzer
Cross-references transcript and diagram entities for richer knowledge.
Source code in video_processor/analyzers/content_analyzer.py
class ContentAnalyzer:
    """Cross-references transcript and diagram entities for richer knowledge."""

    def __init__(self, provider_manager: Optional[ProviderManager] = None):
        # Optional: when no provider is configured, LLM fuzzy matching is skipped.
        self.pm = provider_manager

    def cross_reference(
        self,
        transcript_entities: List[Entity],
        diagram_entities: List[Entity],
    ) -> List[Entity]:
        """
        Merge entities from transcripts and diagrams.

        Merges by exact (case-insensitive) name overlap first, then uses LLM
        for fuzzy matching of remaining entities. Adds source attribution
        ("transcript", "diagram", or "both").
        """
        merged: dict[str, Entity] = {}
        # Index transcript entities by lowercase name.
        for e in transcript_entities:
            key = e.name.lower()
            merged[key] = Entity(
                name=e.name,
                type=e.type,
                descriptions=list(e.descriptions),
                source="transcript",
                occurrences=list(e.occurrences),
            )
        # Merge diagram entities; exact-name collisions become "both".
        for e in diagram_entities:
            key = e.name.lower()
            if key in merged:
                existing = merged[key]
                existing.source = "both"
                existing.descriptions = list(set(existing.descriptions + e.descriptions))
                existing.occurrences.extend(e.occurrences)
            else:
                merged[key] = Entity(
                    name=e.name,
                    type=e.type,
                    descriptions=list(e.descriptions),
                    source="diagram",
                    occurrences=list(e.occurrences),
                )
        # LLM fuzzy matching for entities with no exact-name counterpart.
        if self.pm:
            # Fix: build each lookup set once. The original rebuilt the set
            # comprehension for every entity, costing O(n*m) set constructions.
            diagram_names = {d.name.lower() for d in diagram_entities}
            transcript_names = {t.name.lower() for t in transcript_entities}
            unmatched_t = [e for e in transcript_entities if e.name.lower() not in diagram_names]
            unmatched_d = [e for e in diagram_entities if e.name.lower() not in transcript_names]
            if unmatched_t and unmatched_d:
                matches = self._fuzzy_match(unmatched_t, unmatched_d)
                for t_name, d_name in matches:
                    t_key = t_name.lower()
                    d_key = d_name.lower()
                    if t_key in merged and d_key in merged:
                        # Fold the diagram-side entity into the transcript-side one.
                        t_entity = merged[t_key]
                        d_entity = merged.pop(d_key)
                        t_entity.source = "both"
                        t_entity.descriptions = list(
                            set(t_entity.descriptions + d_entity.descriptions)
                        )
                        t_entity.occurrences.extend(d_entity.occurrences)
        return list(merged.values())

    def _fuzzy_match(
        self,
        transcript_entities: List[Entity],
        diagram_entities: List[Entity],
    ) -> List[tuple[str, str]]:
        """Use LLM to fuzzy-match entity names across sources.

        Returns (transcript_name, diagram_name) pairs; empty list on failure
        or when no provider is configured.
        """
        if not self.pm:
            return []
        t_names = [e.name for e in transcript_entities]
        d_names = [e.name for e in diagram_entities]
        prompt = (
            "Match entities that refer to the same thing across these two lists.\n\n"
            f"Transcript entities: {t_names}\n"
            f"Diagram entities: {d_names}\n\n"
            "Return a JSON array of matched pairs:\n"
            '[{"transcript": "name from list 1", "diagram": "name from list 2"}]\n\n'
            "Only include confident matches. Return empty array if no matches.\n"
            "Return ONLY the JSON array."
        )
        try:
            raw = self.pm.chat([{"role": "user", "content": prompt}], temperature=0.2)
            parsed = parse_json_from_response(raw)
            if isinstance(parsed, list):
                # Keep only well-formed {"transcript": ..., "diagram": ...} pairs.
                return [
                    (item["transcript"], item["diagram"])
                    for item in parsed
                    if isinstance(item, dict) and "transcript" in item and "diagram" in item
                ]
        except Exception as e:
            logger.warning(f"Fuzzy matching failed: {e}")
        return []

    def enrich_key_points(
        self,
        key_points: List[KeyPoint],
        diagrams: list,
        transcript_text: str,
    ) -> List[KeyPoint]:
        """
        Link key points to relevant diagrams by entity/word overlap.

        NOTE(review): earlier wording also mentioned temporal proximity, but
        no temporal check is implemented — matching is purely lexical.
        """
        if not diagrams:
            return key_points
        # Build a per-diagram vocabulary from element labels plus text content.
        diagram_entities: dict[int, set[str]] = {}
        for i, d in enumerate(diagrams):
            elements = d.get("elements", []) if isinstance(d, dict) else getattr(d, "elements", [])
            text = (
                d.get("text_content", "") if isinstance(d, dict) else getattr(d, "text_content", "")
            )
            entities = set(str(e).lower() for e in elements)
            if text:
                # Words of <= 3 chars are mostly stop-word noise; skip them.
                entities.update(word.lower() for word in text.split() if len(word) > 3)
            diagram_entities[i] = entities
        # Link a key point to every diagram sharing at least two words.
        for kp in key_points:
            kp_words = set(kp.point.lower().split())
            if kp.details:
                kp_words.update(kp.details.lower().split())
            related = []
            for idx, d_entities in diagram_entities.items():
                overlap = kp_words & d_entities
                if len(overlap) >= 2:
                    related.append(idx)
            if related:
                kp.related_diagrams = related
        return key_points
|
cross_reference(transcript_entities, diagram_entities)
Merge entities from transcripts and diagrams.
Merges by exact name overlap first, then uses LLM for fuzzy matching
of remaining entities. Adds source attribution.
Source code in video_processor/analyzers/content_analyzer.py
def cross_reference(
    self,
    transcript_entities: List[Entity],
    diagram_entities: List[Entity],
) -> List[Entity]:
    """
    Merge entities from transcripts and diagrams.

    Merges by exact (case-insensitive) name overlap first, then uses LLM
    for fuzzy matching of remaining entities. Adds source attribution
    ("transcript", "diagram", or "both").
    """
    merged: dict[str, Entity] = {}
    # Index transcript entities by lowercase name.
    for e in transcript_entities:
        key = e.name.lower()
        merged[key] = Entity(
            name=e.name,
            type=e.type,
            descriptions=list(e.descriptions),
            source="transcript",
            occurrences=list(e.occurrences),
        )
    # Merge diagram entities; exact-name collisions become "both".
    for e in diagram_entities:
        key = e.name.lower()
        if key in merged:
            existing = merged[key]
            existing.source = "both"
            existing.descriptions = list(set(existing.descriptions + e.descriptions))
            existing.occurrences.extend(e.occurrences)
        else:
            merged[key] = Entity(
                name=e.name,
                type=e.type,
                descriptions=list(e.descriptions),
                source="diagram",
                occurrences=list(e.occurrences),
            )
    # LLM fuzzy matching for entities with no exact-name counterpart.
    if self.pm:
        # Fix: build each lookup set once. The original rebuilt the set
        # comprehension for every entity, costing O(n*m) set constructions.
        diagram_names = {d.name.lower() for d in diagram_entities}
        transcript_names = {t.name.lower() for t in transcript_entities}
        unmatched_t = [e for e in transcript_entities if e.name.lower() not in diagram_names]
        unmatched_d = [e for e in diagram_entities if e.name.lower() not in transcript_names]
        if unmatched_t and unmatched_d:
            matches = self._fuzzy_match(unmatched_t, unmatched_d)
            for t_name, d_name in matches:
                t_key = t_name.lower()
                d_key = d_name.lower()
                if t_key in merged and d_key in merged:
                    # Fold the diagram-side entity into the transcript-side one.
                    t_entity = merged[t_key]
                    d_entity = merged.pop(d_key)
                    t_entity.source = "both"
                    t_entity.descriptions = list(
                        set(t_entity.descriptions + d_entity.descriptions)
                    )
                    t_entity.occurrences.extend(d_entity.occurrences)
    return list(merged.values())
|
enrich_key_points(key_points, diagrams, transcript_text)
Link key points to relevant diagrams by entity/word overlap (note: the shown implementation uses lexical overlap only; no temporal-proximity check is performed).
Source code in video_processor/analyzers/content_analyzer.py
def enrich_key_points(
    self,
    key_points: List[KeyPoint],
    diagrams: list,
    transcript_text: str,
) -> List[KeyPoint]:
    """
    Link each key point to related diagrams by word/entity overlap.
    """
    if not diagrams:
        return key_points
    # Build a lexical vocabulary per diagram from element labels and text.
    vocab_by_diagram: dict[int, set[str]] = {}
    for pos, diagram in enumerate(diagrams):
        if isinstance(diagram, dict):
            labels = diagram.get("elements", [])
            body = diagram.get("text_content", "")
        else:
            labels = getattr(diagram, "elements", [])
            body = getattr(diagram, "text_content", "")
        vocab = {str(label).lower() for label in labels}
        if body:
            # Very short tokens (<= 3 chars) are skipped as noise.
            vocab.update(token.lower() for token in body.split() if len(token) > 3)
        vocab_by_diagram[pos] = vocab
    # A key point links to every diagram sharing at least two words with it.
    for kp in key_points:
        words = set(kp.point.lower().split())
        if kp.details:
            words.update(kp.details.lower().split())
        hits = [pos for pos, vocab in vocab_by_diagram.items() if len(words & vocab) >= 2]
        if hits:
            kp.related_diagrams = hits
    return key_points
|
video_processor.analyzers.action_detector
Enhanced action item detection from transcripts and diagrams.
ActionDetector
Detects action items from transcripts using heuristics and LLM.
Source code in video_processor/analyzers/action_detector.py
class ActionDetector:
    """Detects action items from transcripts using heuristics and LLM."""

    def __init__(self, provider_manager: Optional[ProviderManager] = None):
        # Optional: with no provider, pattern matching is used instead of the LLM.
        self.pm = provider_manager

    def detect_from_transcript(
        self,
        text: str,
        segments: Optional[List[TranscriptSegment]] = None,
    ) -> List[ActionItem]:
        """
        Detect action items from transcript text.
        Uses LLM extraction when available, falls back to pattern matching.
        Segments are used to attach timestamps.
        """
        if self.pm:
            items = self._llm_extract(text)
        else:
            items = self._pattern_extract(text)
        # Attach timestamps from segments if available
        if segments and items:
            self._attach_timestamps(items, segments)
        return items

    def detect_from_diagrams(
        self,
        diagrams: list,
    ) -> List[ActionItem]:
        """
        Extract action items mentioned in diagram text content.
        Looks for action-oriented language in diagram text/elements.
        """
        items: List[ActionItem] = []
        for diagram in diagrams:
            # Accept both dicts and objects with attribute access.
            text = ""
            if isinstance(diagram, dict):
                text = diagram.get("text_content", "") or ""
                elements = diagram.get("elements", [])
            else:
                text = getattr(diagram, "text_content", "") or ""
                elements = getattr(diagram, "elements", [])
            combined = text + " " + " ".join(str(e) for e in elements)
            if not combined.strip():
                continue
            if self.pm:
                diagram_items = self._llm_extract(combined)
            else:
                diagram_items = self._pattern_extract(combined)
            # Re-label: these items originate from a diagram, not the transcript.
            for item in diagram_items:
                item.source = "diagram"
            items.extend(diagram_items)
        return items

    def merge_action_items(
        self,
        transcript_items: List[ActionItem],
        diagram_items: List[ActionItem],
    ) -> List[ActionItem]:
        """
        Merge action items from transcript and diagram sources.
        Deduplicates by checking for similar action text.
        """
        merged: List[ActionItem] = list(transcript_items)
        existing_actions = {a.action.lower().strip() for a in merged}
        for item in diagram_items:
            normalized = item.action.lower().strip()
            if normalized not in existing_actions:
                merged.append(item)
                existing_actions.add(normalized)
        return merged

    def _llm_extract(self, text: str) -> List[ActionItem]:
        """Extract action items using LLM.

        Returns an empty list when no provider is set, parsing fails, or the
        provider raises.
        """
        if not self.pm:
            return []
        prompt = (
            "Extract all action items, tasks, and commitments "
            "from the following text.\n\n"
            f"TEXT:\n{text[:8000]}\n\n"
            "Return a JSON array:\n"
            '[{"action": "...", "assignee": "...", "deadline": "...", '
            '"priority": "...", "context": "..."}]\n\n'
            "Only include clear, actionable items. "
            "Set fields to null if not mentioned.\n"
            "Return ONLY the JSON array."
        )
        try:
            raw = self.pm.chat(
                [{"role": "user", "content": prompt}],
                temperature=0.3,
            )
            parsed = parse_json_from_response(raw)
            if isinstance(parsed, list):
                # Keep only dict entries with a non-empty "action" field.
                return [
                    ActionItem(
                        action=item.get("action", ""),
                        assignee=item.get("assignee"),
                        deadline=item.get("deadline"),
                        priority=item.get("priority"),
                        context=item.get("context"),
                        source="transcript",
                    )
                    for item in parsed
                    if isinstance(item, dict) and item.get("action")
                ]
        except Exception as e:
            logger.warning(f"LLM action extraction failed: {e}")
        return []

    def _pattern_extract(self, text: str) -> List[ActionItem]:
        """Extract action items using regex pattern matching."""
        items: List[ActionItem] = []
        sentences = re.split(r"[.!?]\s+", text)
        for sentence in sentences:
            sentence = sentence.strip()
            # Very short fragments are unlikely to be actionable statements.
            if not sentence or len(sentence) < 10:
                continue
            for pattern in _ACTION_PATTERNS:
                if pattern.search(sentence):
                    items.append(
                        ActionItem(
                            action=sentence,
                            source="transcript",
                        )
                    )
                    break  # One match per sentence is enough
        return items

    def _attach_timestamps(
        self,
        items: List[ActionItem],
        segments: List[TranscriptSegment],
    ) -> None:
        """Attach timestamps to action items by finding matching segments."""
        for item in items:
            action_lower = item.action.lower()
            # Fix: the action's word set is loop-invariant — the original
            # rebuilt it for every segment.
            action_words = set(action_lower.split())
            best_overlap = 0
            best_segment = None
            for seg in segments:
                seg_words = set(seg.text.lower().split())
                overlap = len(action_words & seg_words)
                if overlap > best_overlap:
                    best_overlap = overlap
                    best_segment = seg
            # Require a meaningful (>= 3 word) overlap before trusting the match.
            if best_segment and best_overlap >= 3:
                if not item.context:
                    item.context = f"at {best_segment.start:.0f}s"
|
detect_from_diagrams(diagrams)
Extract action items mentioned in diagram text content.
Looks for action-oriented language in diagram text/elements.
Source code in video_processor/analyzers/action_detector.py
def detect_from_diagrams(
    self,
    diagrams: list,
) -> List[ActionItem]:
    """
    Extract action items mentioned in diagram text content.
    Scans diagram text and element labels for action-oriented language.
    """
    collected: List[ActionItem] = []
    for diagram in diagrams:
        # Accept both dicts and objects with attribute access.
        if isinstance(diagram, dict):
            body = diagram.get("text_content", "") or ""
            labels = diagram.get("elements", [])
        else:
            body = getattr(diagram, "text_content", "") or ""
            labels = getattr(diagram, "elements", [])
        combined = body + " " + " ".join(str(e) for e in labels)
        if not combined.strip():
            # Nothing textual to scan in this diagram.
            continue
        extractor = self._llm_extract if self.pm else self._pattern_extract
        found = extractor(combined)
        # Re-label: these items come from a diagram, not the transcript.
        for entry in found:
            entry.source = "diagram"
        collected.extend(found)
    return collected
|
detect_from_transcript(text, segments=None)
Detect action items from transcript text.
Uses LLM extraction when available, falls back to pattern matching.
Segments are used to attach timestamps.
Source code in video_processor/analyzers/action_detector.py
def detect_from_transcript(
    self,
    text: str,
    segments: Optional[List[TranscriptSegment]] = None,
) -> List[ActionItem]:
    """
    Detect action items from transcript text.
    Prefers LLM extraction when a provider is configured, otherwise falls
    back to regex pattern matching. Segment data, when supplied, is used to
    attach timestamps to the detected items.
    """
    extractor = self._llm_extract if self.pm else self._pattern_extract
    items = extractor(text)
    # Timestamps only make sense when both items and segments exist.
    if items and segments:
        self._attach_timestamps(items, segments)
    return items
|
merge_action_items(transcript_items, diagram_items)
Merge action items from transcript and diagram sources.
Deduplicates by checking for similar action text.
Source code in video_processor/analyzers/action_detector.py
def merge_action_items(
    self,
    transcript_items: List[ActionItem],
    diagram_items: List[ActionItem],
) -> List[ActionItem]:
    """
    Merge action items from transcript and diagram sources.
    Diagram items whose normalized (lowercased, stripped) action text already
    appears are dropped as duplicates; transcript items always win.
    """
    combined: List[ActionItem] = list(transcript_items)
    seen = {entry.action.lower().strip() for entry in combined}
    for candidate in diagram_items:
        key = candidate.action.lower().strip()
        if key in seen:
            continue
        seen.add(key)
        combined.append(candidate)
    return combined
|
Overview
The analyzers module contains the core content extraction logic for PlanOpticon. These analyzers process video frames and transcripts to extract structured knowledge: diagrams, key points, action items, and cross-referenced entities.
All analyzers accept an optional ProviderManager instance. When provided, they use LLM capabilities for richer extraction. Without one, they fall back to heuristic/pattern-based methods where possible.
DiagramAnalyzer
from video_processor.analyzers.diagram_analyzer import DiagramAnalyzer
Vision model-based diagram detection and analysis. Classifies video frames as diagrams, slides, screenshots, or other content, then performs full extraction on high-confidence frames.
Constructor
def __init__(
self,
provider_manager: Optional[ProviderManager] = None,
confidence_threshold: float = 0.3,
)
| Parameter |
Type |
Default |
Description |
provider_manager |
Optional[ProviderManager] |
None |
LLM provider (creates a default if not provided) |
confidence_threshold |
float |
0.3 |
Minimum confidence to process a frame at all |
classify_frame()
def classify_frame(self, image_path: Union[str, Path]) -> dict
Classify a single frame using a vision model. Determines whether the frame contains a diagram, slide, or other visual content worth extracting.
Parameters:
| Parameter |
Type |
Description |
image_path |
Union[str, Path] |
Path to the frame image file |
Returns: dict with the following keys (when the model response cannot be parsed, a fallback dict is returned with confidence 0.0 and without the content_type key):
| Key |
Type |
Description |
is_diagram |
bool |
Whether the frame contains extractable content |
diagram_type |
str |
One of: flowchart, sequence, architecture, whiteboard, chart, table, slide, screenshot, unknown |
confidence |
float |
Detection confidence from 0.0 to 1.0 |
content_type |
str |
Content category: slide, diagram, document, screen_share, whiteboard, chart, person, other |
brief_description |
str |
One-sentence description of the frame content |
Important: Frames showing people, webcam feeds, or video conference participant views return confidence: 0.0. The classifier is tuned to detect only shared/presented content.
analyzer = DiagramAnalyzer()
result = analyzer.classify_frame("/path/to/frame_042.jpg")
if result["confidence"] >= 0.7:
print(f"Diagram detected: {result['diagram_type']}")
analyze_diagram_single_pass()
def analyze_diagram_single_pass(self, image_path: Union[str, Path]) -> dict
Full single-pass diagram analysis. Extracts description, text content, elements, relationships, Mermaid syntax, and chart data in a single LLM call.
Returns: dict with the following keys:
| Key |
Type |
Description |
diagram_type |
str |
Diagram classification |
description |
str |
Detailed description of the visual content |
text_content |
str |
All visible text, preserving structure |
elements |
list[str] |
Identified elements/components |
relationships |
list[str] |
Relationships in "A -> B: label" format |
mermaid |
str |
Valid Mermaid diagram syntax |
chart_data |
dict \| None |
Chart data with labels, values, chart_type (only for data charts) |
Returns an empty dict on failure.
caption_frame()
def caption_frame(self, image_path: Union[str, Path]) -> str
Get a brief 1-2 sentence caption for a frame. Used as a fallback when full diagram analysis is not warranted.
Returns: str -- a brief description of the frame content.
process_frames()
def process_frames(
self,
frame_paths: List[Union[str, Path]],
diagrams_dir: Optional[Path] = None,
captures_dir: Optional[Path] = None,
) -> Tuple[List[DiagramResult], List[ScreenCapture]]
Process a batch of extracted video frames through the full classification and analysis pipeline.
Parameters:
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| frame_paths | List[Union[str, Path]] | required | Paths to frame images |
| diagrams_dir | Optional[Path] | None | Output directory for diagram files (images, mermaid, JSON) |
| captures_dir | Optional[Path] | None | Output directory for screengrab fallback files |
Returns: Tuple[List[DiagramResult], List[ScreenCapture]]
Confidence thresholds:
| Confidence Range | Action |
|------------------|--------|
| >= 0.7 | Full diagram analysis -- extracts elements, relationships, Mermaid syntax |
| 0.3 to 0.7 | Screengrab fallback -- saves frame with a brief caption |
| < 0.3 | Skipped entirely |
Output files (when directories are provided):
For diagrams (diagrams_dir):
diagram_N.jpg -- original frame image
diagram_N.mermaid -- Mermaid source (if generated)
diagram_N.json -- full DiagramResult as JSON
For screen captures (captures_dir):
capture_N.jpg -- original frame image
capture_N.json -- ScreenCapture metadata as JSON
from pathlib import Path
from video_processor.analyzers.diagram_analyzer import DiagramAnalyzer
from video_processor.providers.manager import ProviderManager
analyzer = DiagramAnalyzer(
provider_manager=ProviderManager(),
confidence_threshold=0.3,
)
frame_paths = list(Path("output/frames").glob("*.jpg"))
diagrams, captures = analyzer.process_frames(
frame_paths,
diagrams_dir=Path("output/diagrams"),
captures_dir=Path("output/captures"),
)
print(f"Found {len(diagrams)} diagrams, {len(captures)} screengrabs")
for d in diagrams:
print(f" [{d.diagram_type.value}] {d.description}")
ContentAnalyzer
from video_processor.analyzers.content_analyzer import ContentAnalyzer
Cross-references transcript and diagram entities for richer knowledge extraction. Merges entities found in different sources and enriches key points with diagram links.
Constructor
def __init__(self, provider_manager: Optional[ProviderManager] = None)
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| provider_manager | Optional[ProviderManager] | None | Required for LLM-based fuzzy matching |
cross_reference()
def cross_reference(
self,
transcript_entities: List[Entity],
diagram_entities: List[Entity],
) -> List[Entity]
Merge entities from transcripts and diagrams into a unified list with source attribution.
Merge strategy:
- Index all transcript entities by lowercase name, marked with source="transcript"
- Merge diagram entities: if a name matches, set source="both" and combine descriptions/occurrences; otherwise add as source="diagram"
- If a ProviderManager is available, use LLM fuzzy matching to find additional matches among unmatched entities (e.g., "PostgreSQL" from transcript matching "Postgres" from diagram)
Parameters:
| Parameter | Type | Description |
|-----------|------|-------------|
| transcript_entities | List[Entity] | Entities extracted from transcript |
| diagram_entities | List[Entity] | Entities extracted from diagrams |
Returns: List[Entity] -- merged entity list with source attribution.
from video_processor.analyzers.content_analyzer import ContentAnalyzer
from video_processor.models import Entity
analyzer = ContentAnalyzer(provider_manager=pm)
transcript_entities = [
Entity(name="PostgreSQL", type="technology"),
Entity(name="Alice", type="person"),
]
diagram_entities = [
Entity(name="Postgres", type="technology"),
Entity(name="Redis", type="technology"),
]
merged = analyzer.cross_reference(transcript_entities, diagram_entities)
# "PostgreSQL" and "Postgres" may be fuzzy-matched and merged
enrich_key_points()
def enrich_key_points(
self,
key_points: List[KeyPoint],
diagrams: list,
transcript_text: str,
) -> List[KeyPoint]
Link key points to relevant diagrams by entity overlap. Examines word overlap between key point text and diagram elements/text content.
Parameters:
| Parameter | Type | Description |
|-----------|------|-------------|
| key_points | List[KeyPoint] | Key points to enrich |
| diagrams | list | List of DiagramResult objects or dicts |
| transcript_text | str | Full transcript text (reserved for future use) |
Returns: List[KeyPoint] -- key points with related_diagrams indices populated.
A key point is linked to a diagram when they share 2 or more words (excluding short words) between the key point text/details and the diagram's elements/text content.
ActionDetector
from video_processor.analyzers.action_detector import ActionDetector
Detects action items from transcripts and diagram content using LLM extraction with a regex pattern fallback.
Constructor
def __init__(self, provider_manager: Optional[ProviderManager] = None)
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| provider_manager | Optional[ProviderManager] | None | Required for LLM-based extraction |
detect_from_transcript()
def detect_from_transcript(
self,
text: str,
segments: Optional[List[TranscriptSegment]] = None,
) -> List[ActionItem]
Detect action items from transcript text.
Parameters:
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| text | str | required | Transcript text to analyze |
| segments | Optional[List[TranscriptSegment]] | None | Transcript segments for timestamp attachment |
Returns: List[ActionItem] -- detected action items with source="transcript".
Extraction modes:
- LLM mode (when provider_manager is set): Sends the transcript to the LLM with a structured extraction prompt. Extracts action, assignee, deadline, priority, and context.
- Pattern mode (fallback): Matches sentences against regex patterns for action-oriented language.
Pattern matching detects sentences containing:
- "need/needs to", "should/must/shall"
- "will/going to", "action item/todo/follow-up"
- "assigned to/responsible for", "deadline/due by"
- "let's/let us", "make sure/ensure"
- "can you/could you/please"
Timestamp attachment: When segments are provided, each action item is matched to the most relevant transcript segment (by word overlap, minimum 3 matching words), and a timestamp is added to context.
detect_from_diagrams()
def detect_from_diagrams(self, diagrams: list) -> List[ActionItem]
Extract action items from diagram text content and elements. Processes each diagram's combined text using either LLM or pattern extraction.
Parameters:
| Parameter | Type | Description |
|-----------|------|-------------|
| diagrams | list | List of DiagramResult objects or dicts |
Returns: List[ActionItem] -- action items with source="diagram".
merge_action_items()
def merge_action_items(
self,
transcript_items: List[ActionItem],
diagram_items: List[ActionItem],
) -> List[ActionItem]
Merge action items from multiple sources, deduplicating by action text (case-insensitive, whitespace-normalized).
Returns: List[ActionItem] -- deduplicated merged list.
Usage example
from video_processor.analyzers.action_detector import ActionDetector
from video_processor.providers.manager import ProviderManager
detector = ActionDetector(provider_manager=ProviderManager())
# From transcript
transcript_items = detector.detect_from_transcript(
text="Alice needs to update the API docs by Friday. "
"Bob should review the PR before merging.",
segments=transcript_segments,
)
# From diagrams
diagram_items = detector.detect_from_diagrams(diagram_results)
# Merge and deduplicate
all_items = detector.merge_action_items(transcript_items, diagram_items)
for item in all_items:
print(f"[{item.priority or 'unset'}] {item.action}")
if item.assignee:
print(f" Assignee: {item.assignee}")
if item.deadline:
print(f" Deadline: {item.deadline}")
Pattern fallback (no LLM)
# Works without any API keys
detector = ActionDetector() # No provider_manager
items = detector.detect_from_transcript(
"We need to finalize the database schema. "
"Please update the deployment scripts."
)
# Returns ActionItems matched by regex patterns