import collections
import html
import itertools
import os
from logging import getLogger
from typing import List

import graphviz
from pyknp_eventgraph import EventGraph
from pyknp_eventgraph.eventgraph import Event, Relation
from pyknp_eventgraph.helper import PAS_ORDER
# Module-level logger, named after this module via __name__.
logger = getLogger(__name__)
def make_image(evg: EventGraph, output: str, with_detail: bool = True, with_original_text: bool = True):
    """Visualize an EventGraph as an SVG image rendered by Graphviz.

    Sentences and events are grouped by the part of their ``sid`` that precedes
    the last ``'-'``. For each group, an optional header cluster holding the
    original sentences is emitted, followed by one cluster per row of events,
    and finally one edge per outgoing relation of every event in the group.
    Missing parent directories of ``output`` are created before rendering.

    Args:
        evg (EventGraph): An EventGraph.
        output (str): Path to an output file. The file extension must be '.svg'.
        with_detail (bool): If true, detail information will be included.
        with_original_text (bool): If true, original sentences will be included.

    Raises:
        AssertionError: If the extension of ``output`` is not '.svg'.
    """
    output, ext = os.path.splitext(output)
    assert ext == '.svg', 'the extension of the output file must be ".svg"'

    # Bucket sentences and events by the prefix of their sentence IDs. Plain
    # bucketing is used instead of itertools.groupby so that items sharing a
    # key are merged even when they are not adjacent in the input.
    sentences = _group_by_sid_prefix(evg.sentences)
    events = _group_by_sid_prefix(evg.events)

    # Create a base image
    g = graphviz.Digraph('G', format='svg')
    g.attr('graph', ranksep='0', margin='0', pad='0')

    num_cluster = 0
    for did, doc_sentences in sentences.items():
        doc_events = events.get(did, [])

        if with_original_text:
            with g.subgraph(name=f'cluster_{num_cluster}') as h:
                h.attr('graph', style='invis')
                # '\\l' is passed through to Graphviz, one per sentence line.
                h.node(
                    name=f'head_{num_cluster}',
                    label='\\l'.join(sentence.surf for sentence in doc_sentences) + '\\l',
                    shape='plaintext'
                )
                # Zero-width anchor node; used below to chain clusters together.
                h.node(
                    name=f'cluster_{num_cluster}_top',
                    label='',
                    shape='none',
                    width='0'
                )
            num_cluster += 1

        # too long sentences are split into several rows
        for sent_events in _split_events_by_sid(doc_events):
            with g.subgraph(name=f'cluster_{num_cluster}') as c:
                c.attr('graph', style='invis')
                for event in reversed(sent_events):
                    node = Node(event)
                    c.node(
                        name=node.name,
                        label=node.to_string(with_detail),
                        shape='box',
                        labelloc='b',
                        height='0'
                    )
                # Zero-width anchor node; used below to chain clusters together.
                c.node(
                    name=f'cluster_{num_cluster}_top',
                    label='',
                    shape='none',
                    width='0'
                )
            num_cluster += 1

        for event in doc_events:
            for relation in event.outgoing_relations:
                edge = Edge(relation)
                g.edge(
                    tail_name=edge.modifier_node_name,
                    head_name=edge.head_node_name,
                    label=edge.to_string(),
                    weight='1',
                    constraint='false'
                )

    # align clusters vertically by chaining their anchor nodes with invisible edges
    for i in range(num_cluster - 1):
        g.edge(tail_name=f'cluster_{i}_top', head_name=f'cluster_{i + 1}_top', style='invis')

    # exist_ok avoids the check-then-create race of a separate exists() test.
    output_dir = os.path.abspath(os.path.dirname(output))
    os.makedirs(output_dir, exist_ok=True)

    logger.debug('Render an image')
    # `output` has had its extension removed above; it is handed to Graphviz as-is.
    g.render(output, cleanup=True)
    logger.debug('Successfully constructed visualization')


def _group_by_sid_prefix(items):
    """Bucket items by their ``sid`` with the part after the last '-' removed.

    Args:
        items: An iterable of objects that have a string ``sid`` attribute.

    Returns:
        A dict mapping each prefix to the list of its items. Keys are in order
        of first appearance and items keep their input order.
    """
    groups = collections.defaultdict(list)
    for item in items:
        groups[item.sid.rsplit('-', 1)[0]].append(item)
    return groups
def _split_events_by_sid(events: List[Event], max_length: int = 4) -> List[List[Event]]:
    """Arrange events into rows, one sentence at a time.

    Events are bucketed by their ``ssid`` attribute, the buckets are visited in
    ascending ``ssid`` order, and each bucket is cut into consecutive slices of
    at most ``max_length`` events.

    Args:
        events: A list of events.
        max_length: A maximum number of events which are written in the same row.

    Returns:
        A list of lists of events.
    """
    buckets = {}
    for ev in events:
        buckets.setdefault(ev.ssid, []).append(ev)

    rows = []
    for key in sorted(buckets):
        bucket = buckets[key]
        rows.extend(bucket[start:start + max_length] for start in range(0, len(bucket), max_length))
    return rows
class Node:
    """Presentation wrapper that turns an event into a Graphviz node.

    Exposes the node name and the text fragments (surface, PAS, features) that
    ``to_string`` assembles into a Graphviz HTML-like label.
    """

    def __init__(self, event: Event):
        self.event = event

    @property
    def name(self) -> str:
        """The name of this node, built from the event's ``evid``."""
        return f'event_{self.event.evid}'

    @property
    def surf(self) -> str:
        """The surface string of this node (the event's ``surf_with_mark``)."""
        return self.event.surf_with_mark

    @property
    def pas(self) -> str:
        """The PAS of this node as ``pred[:type], arg:case, ...``.

        Cases are ordered by ``PAS_ORDER`` (unknown cases sort last with rank
        99), cases containing '外の関係' are left out, and only the first
        argument of each case is shown.
        """
        predicate = self.event.pas.predicate
        pred = predicate.standard_reps
        if predicate.type_:
            pred += f':{predicate.type_}'

        arguments = self.event.pas.arguments
        args = []
        for case in sorted(arguments, key=lambda x: PAS_ORDER.get(x, 99)):
            if '外の関係' in case:
                continue
            arg = arguments[case][0]
            args.append(f'{arg.head_reps}:{case}')
        return ', '.join([pred] + args)

    @property
    def features(self) -> str:
        """The features of this node (negation, tense, modalities), comma-separated."""
        features = []
        if self.event.features.negation:
            features.append('否定')
        if self.event.features.tense:
            features.append(f'時制:{self.event.features.tense}')
        for modality in self.event.features.modality:
            features.append(f'モダリティ:{modality}')
        return ', '.join(features)

    @staticmethod
    def _escape(text: str) -> str:
        """Escape '&', '<' and '>' so text is safe inside an HTML-like label."""
        return html.escape(text, quote=False)

    def _surf_markup(self) -> str:
        """Return the surface string as label markup.

        When the surface ends with ')' and contains a '(', the text inside the
        last parenthesis pair is rendered in gray and the parentheses
        themselves are dropped. Otherwise the surface is returned as is. All
        text is escaped.
        """
        surf = self.surf
        # Guard on '(' as well: without it the rsplit below yields one piece
        # and the two-name unpacking would raise ValueError.
        if surf.endswith(')') and '(' in surf:
            main, adjunct = surf[:-1].rsplit('(', 1)
            return f'{self._escape(main.strip())}<font color="gray">{self._escape(adjunct.strip())}</font>'
        return self._escape(surf)

    def to_string(self, with_detail: bool) -> str:
        """Return the Graphviz HTML-like label of this node.

        Args:
            with_detail: Whether to include the detail information. If true,
                the label has '[surf]' and '[pas]' rows plus a '[features]'
                row when there are features; otherwise only the surface row.

        Returns:
            The string of a given event: a ``<table>`` wrapped in the extra
            pair of angle brackets that marks an HTML-like label.
        """
        def row(text: str) -> str:
            return f'<tr><td align="left">{text}</td></tr>'

        surf = self._surf_markup()
        if with_detail:
            rows = [row(f'[surf] {surf}'), row(f'[pas] {self._escape(self.pas)}')]
            features = self.features
            if features:
                rows.append(row(f'[features] {self._escape(features)}'))
        else:
            rows = [row(surf)]
        content = ''.join(rows)
        return f'<<table border="0" cellborder="0" cellspacing="1">{content}</table>>'
class Edge:
    """Presentation wrapper that turns a relation into a Graphviz edge."""

    def __init__(self, relation: Relation):
        self.relation = relation

    @property
    def modifier_node_name(self) -> str:
        """Node name of the relation's modifier event (the edge tail)."""
        modifier_node = Node(self.relation.modifier)
        return modifier_node.name

    @property
    def head_node_name(self) -> str:
        """Node name of the relation's head event (the edge head)."""
        head_node = Node(self.relation.head)
        return head_node.name

    def to_string(self) -> str:
        """Return the edge label, padded with one space on each side.

        Known relation names in the label are replaced by short marks. If the
        result is non-empty and the relation has a surface string, that
        surface is appended after a ':'.
        """
        # Applied in this order, one after another.
        abbreviations = (
            ('談話関係', '談'),
            ('連体修飾', '▼'),
            ('補文', '■'),
            ('係り受け', ''),
        )
        text = self.relation.label
        for long_form, mark in abbreviations:
            text = text.replace(long_form, mark)

        if not text:
            # Nothing left to show; the surface is deliberately not consulted.
            return f' {text} '
        if self.relation.surf:
            text = f'{text}:{self.relation.surf}'
        return f' {text} '