from logging import getLogger
from typing import Tuple, List, Dict, Union, Optional, TYPE_CHECKING
from pyknp import Tag, Morpheme
from pyknp_eventgraph.builder import Builder
from pyknp_eventgraph.component import Component
from pyknp_eventgraph.features import Features, FeaturesBuilder, JsonFeaturesBuilder
from pyknp_eventgraph.helper import PAS_ORDER, convert_katakana_to_hiragana
from pyknp_eventgraph.pas import PAS, PASBuilder, JsonPASBuilder
from pyknp_eventgraph.relation import Relation
from pyknp_eventgraph.base_phrase import BasePhrase, group_base_phrases
from pyknp_eventgraph.helper import convert_mrphs_to_surf
if TYPE_CHECKING:
from pyknp_eventgraph.sentence import Sentence
# A token handled by Event's text rendering: either a pyknp Morpheme or a plain
# string (Event._format_grouped_mrphs branches on isinstance(mrph, str)).
Morpheme_ = Union[str, Morpheme]
# Module-level logger named after this module.
logger = getLogger(__name__)
class Event(Component):
    """Event is the basic information unit of EventGraph. Event is closely related to PAS but more
    application-oriented with respect to the following points:

    * Semantic heaviness: Some predicates are too semantically light for applications to treat as
      information units. EventGraph constrains an event to have a semantically heavy predicate.
    * Rich linguistic features: Linguistic features such as tense and modality are assigned to events.

    The text representations (``surf``, ``mrphs``, ``reps``, ...) are exposed as properties that are
    computed on first access and then cached in the matching underscore-prefixed attribute.
    ``JsonEventBuilder`` fills those caches directly from a dump.

    Attributes:
        sentence (:class:`.Sentence`): A sentence to which this event belongs.
        evid (int): A serial event ID.
        sid (str): An original sentence ID.
        ssid (int): A serial sentence ID.
        start (:class:`pyknp.knp.tag.Tag`, optional): A start tag.
        head (:class:`pyknp.knp.tag.Tag`, optional): A head tag.
        end (:class:`pyknp.knp.tag.Tag`, optional): An end tag.
        pas (PAS, optional): A predicate argument structure.
        outgoing_relations (List[Relation]): A list of relations where this event is the modifier.
        incoming_relations (List[Relation]): A list of relations where this event is the head.
        features (Features, optional): Linguistic features.
        parent (Event, optional): A parent event.
        children (List[Event]): A list of child events.
        head_base_phrase (BasePhrase, optional): A head basic phrase.
    """

    def __init__(
            self,
            sentence: 'Sentence',
            evid: int,
            sid: str,
            ssid: int,
            start: Optional[Tag] = None,
            head: Optional[Tag] = None,
            end: Optional[Tag] = None
    ):
        """Initialize an event with empty PAS, features, relations and text caches.

        Args:
            sentence: A sentence to which this event belongs.
            evid: A serial event ID.
            sid: An original sentence ID.
            ssid: A serial sentence ID.
            start: A start tag, if available.
            head: A head tag, if available.
            end: An end tag, if available.
        """
        self.sentence: 'Sentence' = sentence
        self.evid: int = evid
        self.sid: str = sid
        self.ssid: int = ssid
        # The tags default to None (e.g. JsonEventBuilder creates events without them).
        self.start: Optional[Tag] = start
        self.head: Optional[Tag] = head
        self.end: Optional[Tag] = end
        self.pas: Optional[PAS] = None
        self.outgoing_relations: List[Relation] = []
        self.incoming_relations: List[Relation] = []
        self.features: Optional[Features] = None
        self.parent: Optional['Event'] = None
        self.children: List['Event'] = []
        self.head_base_phrase: Optional[BasePhrase] = None

        # Caches backing the properties below; None means "not computed yet".
        self._surf = None
        self._surf_with_mark = None
        self._mrphs = None
        self._mrphs_with_mark = None
        self._normalized_mrphs = None
        self._normalized_mrphs_with_mark = None
        self._normalized_mrphs_without_exophora = None
        self._normalized_mrphs_with_mark_without_exophora = None
        self._reps = None
        self._reps_with_mark = None
        self._normalized_reps = None
        self._normalized_reps_with_mark = None
        self._content_rep_list = None

    @property
    def event_id(self) -> int:
        """An alias to evid."""
        return self.evid

    @property
    def surf(self) -> str:
        """A surface string."""
        if self._surf is None:
            self._surf = self.surf_()
        return self._surf

    @property
    def surf_with_mark(self) -> str:
        """A surface string with marks."""
        if self._surf_with_mark is None:
            self._surf_with_mark = self.surf_with_mark_()
        return self._surf_with_mark

    @property
    def mrphs(self) -> str:
        """A tokenized surface string."""
        if self._mrphs is None:
            self._mrphs = self.mrphs_()
        return self._mrphs

    @property
    def mrphs_with_mark(self) -> str:
        """A tokenized surface string with marks."""
        if self._mrphs_with_mark is None:
            self._mrphs_with_mark = self.mrphs_with_mark_()
        return self._mrphs_with_mark

    @property
    def normalized_mrphs(self) -> str:
        """A tokenized/normalized surface string."""
        if self._normalized_mrphs is None:
            self._normalized_mrphs = self.normalized_mrphs_()
        return self._normalized_mrphs

    @property
    def normalized_mrphs_with_mark(self) -> str:
        """A tokenized/normalized surface string with marks."""
        if self._normalized_mrphs_with_mark is None:
            self._normalized_mrphs_with_mark = self.normalized_mrphs_with_mark_()
        return self._normalized_mrphs_with_mark

    @property
    def normalized_mrphs_without_exophora(self) -> str:
        """A tokenized/normalized surface string without exophora."""
        if self._normalized_mrphs_without_exophora is None:
            self._normalized_mrphs_without_exophora = self.normalized_mrphs_without_exophora_()
        return self._normalized_mrphs_without_exophora

    @property
    def normalized_mrphs_with_mark_without_exophora(self) -> str:
        """A tokenized/normalized surface string with marks but without exophora."""
        if self._normalized_mrphs_with_mark_without_exophora is None:
            self._normalized_mrphs_with_mark_without_exophora = self.normalized_mrphs_with_mark_without_exophora_()
        return self._normalized_mrphs_with_mark_without_exophora

    @property
    def reps(self) -> str:
        """A representative string."""
        if self._reps is None:
            self._reps = self.reps_()
        return self._reps

    @property
    def reps_with_mark(self) -> str:
        """A representative string with marks."""
        if self._reps_with_mark is None:
            self._reps_with_mark = self.reps_with_mark_()
        return self._reps_with_mark

    @property
    def normalized_reps(self) -> str:
        """A normalized representative string."""
        if self._normalized_reps is None:
            self._normalized_reps = self.normalized_reps_()
        return self._normalized_reps

    @property
    def normalized_reps_with_mark(self) -> str:
        """A normalized representative string with marks."""
        if self._normalized_reps_with_mark is None:
            self._normalized_reps_with_mark = self.normalized_reps_with_mark_()
        return self._normalized_reps_with_mark

    @property
    def content_rep_list(self) -> List[str]:
        """A list of content words."""
        if self._content_rep_list is None:
            self._content_rep_list = self.content_rep_list_()
        return self._content_rep_list

    def surf_(self, include_modifiers: bool = False) -> str:
        """A surface string.

        Args:
            include_modifiers: If true, tokens of events that modify this event will be included.
        """
        return convert_mrphs_to_surf(self.mrphs_(include_modifiers))

    def surf_with_mark_(self, include_modifiers: bool = False) -> str:
        """A surface string with marks.

        Args:
            include_modifiers: If true, tokens of events that modify this event will be included.
        """
        return convert_mrphs_to_surf(self.mrphs_with_mark_(include_modifiers))

    def mrphs_(self, include_modifiers: bool = False) -> str:
        """A tokenized surface string.

        Args:
            include_modifiers: If true, tokens of events that modify this event will be included.
        """
        return self._to_text(truncate=False, add_mark=False, include_modifiers=include_modifiers)

    def mrphs_with_mark_(self, include_modifiers: bool = False) -> str:
        """A tokenized surface string with marks.

        Args:
            include_modifiers: If true, tokens of events that modify this event will be included.
        """
        return self._to_text(truncate=False, add_mark=True, include_modifiers=include_modifiers)

    def normalized_mrphs_(self, include_modifiers: bool = False) -> str:
        """A tokenized/normalized surface string.

        Args:
            include_modifiers: If true, tokens of events that modify this event will be included.
        """
        return self._to_text(truncate=True, add_mark=False, include_modifiers=include_modifiers)

    def normalized_mrphs_with_mark_(self, include_modifiers: bool = False) -> str:
        """A tokenized/normalized surface string with marks.

        Args:
            include_modifiers: If true, tokens of events that modify this event will be included.
        """
        return self._to_text(truncate=True, add_mark=True, include_modifiers=include_modifiers)

    def normalized_mrphs_without_exophora_(self, include_modifiers: bool = False) -> str:
        """A tokenized/normalized surface string without exophora.

        Args:
            include_modifiers: If true, tokens of events that modify this event will be included.
        """
        return self._to_text(truncate=True, add_mark=False, exclude_exophora=True, include_modifiers=include_modifiers)

    def normalized_mrphs_with_mark_without_exophora_(self, include_modifiers: bool = False) -> str:
        """A tokenized/normalized surface string with marks but without exophora.

        Args:
            include_modifiers: If true, tokens of events that modify this event will be included.
        """
        return self._to_text(truncate=True, add_mark=True, exclude_exophora=True, include_modifiers=include_modifiers)

    def reps_(self, include_modifiers: bool = False) -> str:
        """A representative string.

        Args:
            include_modifiers: If true, tokens of events that modify this event will be included.
        """
        return self._to_text('reps', truncate=False, add_mark=False, include_modifiers=include_modifiers)

    def reps_with_mark_(self, include_modifiers: bool = False) -> str:
        """A representative string with marks.

        Args:
            include_modifiers: If true, tokens of events that modify this event will be included.
        """
        return self._to_text('reps', truncate=False, add_mark=True, include_modifiers=include_modifiers)

    def normalized_reps_(self, include_modifiers: bool = False) -> str:
        """A normalized representative string.

        Args:
            include_modifiers: If true, tokens of events that modify this event will be included.
        """
        return self._to_text('reps', truncate=True, add_mark=False, include_modifiers=include_modifiers)

    def normalized_reps_with_mark_(self, include_modifiers: bool = False) -> str:
        """A normalized representative string with marks.

        Args:
            include_modifiers: If true, tokens of events that modify this event will be included.
        """
        return self._to_text('reps', truncate=True, add_mark=True, include_modifiers=include_modifiers)

    def content_rep_list_(self) -> List[str]:
        """A list of content words.

        Returns:
            For every morpheme whose feature string contains ``<内容語>`` or ``<準内容語>``, its
            ``repname`` (or ``midasi/midasi`` when ``repname`` is empty), in base-phrase order.
        """
        content_rep_list = []
        for bp in self._collect_base_phrases():
            # Base phrases without a tag carry no morphemes to inspect.
            if bp.tag is None:
                continue
            for mrph in bp.tag.mrph_list():
                if '<内容語>' in mrph.fstring or '<準内容語>' in mrph.fstring:
                    content_rep_list.append(mrph.repname or f'{mrph.midasi}/{mrph.midasi}')
        return content_rep_list

    def _to_text(
            self,
            mode: str = 'mrphs',
            truncate: bool = False,
            add_mark: bool = False,
            exclude_exophora: bool = False,
            include_modifiers: bool = False,
            exclude_adnominal: bool = False,
    ) -> str:
        """Convert this event to a text.

        Args:
            mode: A type of token representation, which can take either "mrphs" or "reps".
            truncate: If true, adjunct words are truncated (and the formatter normalizes the last token).
            add_mark: If true, special marks are added.
            exclude_exophora: If true, exophora will not be used.
            include_modifiers: If true, tokens of events that modify this event will be included.
            exclude_adnominal: If true, base phrases modified by this event will be excluded.

        Returns:
            A space-separated text of this event.
        """
        assert mode in {'mrphs', 'reps'}

        # Create a list of base phrases to show.
        grouped_bps = group_base_phrases(self._collect_base_phrases(
            exclude_exophora=exclude_exophora,
            exclude_adnominal=exclude_adnominal
        ))

        # Create a list of morphemes.
        grouped_mrphs = [[morpheme for bp in bps for morpheme in bp.morphemes] for bps in grouped_bps]

        # Truncate the morphemes. The position is computed even when not truncating
        # because it is also used below to place the "(" mark.
        truncated_pos = self._find_truncated_position(grouped_bps)
        if truncate:
            grouped_mrphs = grouped_mrphs[:truncated_pos[0] + 1]
            grouped_mrphs[-1] = grouped_mrphs[-1][:truncated_pos[1] + 1]

        # Create a map from a position to a string to be inserted.
        additional_texts = self._get_additional_texts(
            grouped_bps=grouped_bps,
            grouped_mrphs=grouped_mrphs,
            mode=mode,
            add_mark=add_mark,
            normalize=truncate,
            truncated_pos=truncated_pos,
            include_modifiers=include_modifiers,
            exclude_exophora=exclude_exophora
        )

        return self._format_grouped_mrphs(
            grouped_mrphs=grouped_mrphs,
            mode=mode,
            normalize=truncate,
            additional_texts=additional_texts
        )

    def _collect_base_phrases(
            self,
            exclude_exophora: bool = False,
            exclude_adnominal: bool = False,
    ) -> List[BasePhrase]:
        """Collect base phrases belonging to this event.

        Args:
            exclude_exophora: If true, exophora will be excluded.
            exclude_adnominal: If true, base phrases modified by this event will be excluded.

        Returns:
            A sorted, de-duplicated list of base phrases that belong to this event.
        """
        # Collect head base phrases.
        predicate_bp = self.pas.predicate.head_base_phrase
        head_bps = [predicate_bp]
        for args in self.pas.arguments.values():
            for arg in args:
                arg_bp = arg.head_base_phrase
                if arg_bp.omitted_case:
                    if exclude_exophora and arg_bp.exophora:
                        # e.g., [著者 が]
                        continue
                    if exclude_adnominal and arg_bp.tag == predicate_bp.tag.parent:
                        # e.g., [車が] of "[車が] 高速道路を低速で走る" -> "車は危ない"
                        continue
                    head_bps.append(arg_bp)
                    continue
                if arg_bp.is_event_head or arg_bp.is_event_end:
                    continue
                # Skip arguments located after the end tag of this event.
                if arg_bp.tag.tag_id > self.end.tag_id:
                    continue
                head_bps.append(arg_bp)
        return sorted(set(bp for head_bp in head_bps for bp in head_bp.to_list()))

    def _find_truncated_position(self, grouped_bps: List[List[BasePhrase]]) -> Tuple[int, int]:
        """Find a position just before adjunct words start.

        Args:
            grouped_bps: A list of base phrases grouped by bunsetsu IDs.

        Returns:
            A ``(group_index, mrph_index)`` pair just before adjunct words start, where
            ``mrph_index`` indexes the morphemes of the group (concatenated ``bp.morphemes``).
            When no such position is found, the position of the last morpheme of the last group.
        """
        seen_head = False
        for group_index, bps in enumerate(grouped_bps):
            # Ignore base phrases of a omitted case because they never become a predicate.
            if any(bp.omitted_case for bp in bps):
                continue

            mrph_index_offset = 0
            for bp in bps:
                # Skip base phrases until the current base phrase reaches to the predicate's head base phrase.
                seen_head = seen_head or bp == self.pas.predicate.head_base_phrase
                if not seen_head:
                    mrph_index_offset += len(bp.morphemes)
                    continue

                # Find a position to be truncated, scanning from the last morpheme backwards.
                for mrph_index, mrph in reversed(list(enumerate(bp.morphemes))):
                    if mrph.hinsi == '助動詞' \
                            and mrph.genkei == 'です' \
                            and 0 < mrph_index \
                            and bp.morphemes[mrph_index - 1].hinsi == '形容詞':
                        # adjective + 'です' -> ignore 'です' (e.g., 美しいです -> 美しい)
                        return group_index, mrph_index_offset + mrph_index - 1

                    if mrph.hinsi == '判定詞' \
                            and mrph.midasi == 'じゃ' \
                            and 0 < mrph_index \
                            and '<活用語>' in bp.morphemes[mrph_index - 1].fstring:
                        # adjective or verb +'じゃん' -> ignore 'じゃん' (e.g., 使えないじゃん -> 使えない)
                        return group_index, mrph_index_offset + mrph_index - 1

                    if ('<活用語>' in mrph.fstring or '<用言意味表記末尾>' in mrph.fstring) \
                            and mrph.genkei not in {'のだ', 'んだ'}:
                        # Check the last word with conjugation except some meaningless words.
                        return group_index, mrph_index_offset + mrph_index

                mrph_index_offset += len(bp.morphemes)

        # Fallback: the last morpheme of the last group. Count with bp.morphemes (not
        # bp.tag.mrph_list()) so the index matches how the groups' morpheme lists are built
        # and so a base phrase without a tag does not raise AttributeError.
        return len(grouped_bps) - 1, sum(len(bp.morphemes) for bp in grouped_bps[-1]) - 1

    @staticmethod
    def _get_additional_texts(
            grouped_bps: List[List[BasePhrase]],
            grouped_mrphs: List[List[Morpheme_]],
            mode: str,
            add_mark: bool,
            normalize: bool,
            truncated_pos: Tuple[int, int],
            include_modifiers: bool,
            exclude_exophora: bool
    ) -> Dict[Tuple[int, int, str], str]:
        """Get a mapping from a position to a mark.

        The marks produced are: ``[``/``]`` around groups of omitted cases, ``▼`` for groups
        modified by adnominal events, ``■`` for groups with sentential complement events, ``|``
        where consecutive base phrases are not adjacent, and ``(``/``)`` around the part after the
        truncated position. With ``include_modifiers`` the ``▼``/``■`` marks are replaced by the
        texts of the modifying events.

        Args:
            grouped_bps: A list of base phrases grouped by bunsetsu IDs.
            grouped_mrphs: A list of morphemes grouped by bunsetsu IDs.
            mode: A type of token representation, which can take either "mrphs" or "reps".
            add_mark: If true, add special marks.
            normalize: If true, the last content word will be normalized.
            truncated_pos: A position just before adjunct words start.
            include_modifiers: If true, tokens of events that modify this event will be included.
            exclude_exophora: If true, exophora will not be used.

        Returns:
            A mapping from positions to marks.
        """
        additional_texts: Dict[Tuple[int, int, str], str] = {}  # (group_index, mrph_index, "start" or "end") -> text

        def event_str(event: 'Event') -> str:
            # Render a modifying event with marks, then drop the parentheses marks.
            return event._to_text(
                mode,
                truncate=False,
                add_mark=True,
                exclude_exophora=exclude_exophora,
                include_modifiers=include_modifiers,
                exclude_adnominal=True
            ).replace(' (', '').replace(')', '')

        last_tid = -1
        for group_index, (bps, mrphs) in enumerate(zip(grouped_bps, grouped_mrphs)):
            start_pos = (group_index, 0, 'start')
            end_pos = (group_index, len(mrphs) - 1, 'end')

            # Omitted cases are always bracketed, regardless of add_mark.
            is_omitted = any(bp.omitted_case for bp in bps)
            if is_omitted:
                additional_texts[start_pos] = '['
                additional_texts[end_pos] = ']'
                continue

            if not add_mark and not include_modifiers:
                continue

            adnominal_events = sorted(
                [event for bp in bps for event in bp.adnominal_events],
                key=lambda e: e.evid
            )
            if adnominal_events:
                if include_modifiers:
                    additional_texts[start_pos] = ' '.join(event_str(event) for event in adnominal_events)
                else:
                    additional_texts[start_pos] = '▼'

            sentential_complement_events = sorted(
                [event for bp in bps for event in bp.sentential_complement_events],
                key=lambda e: e.evid
            )
            if sentential_complement_events:
                # NOTE(review): this writes to the same key as the adnominal branch above, so a
                # group having both kinds of events keeps only the sentential complement text.
                if include_modifiers:
                    additional_texts[start_pos] = ' '.join(event_str(event) for event in sentential_complement_events)
                else:
                    additional_texts[start_pos] = '■'

            mrph_index = 0
            for bp in bps:
                pos = (group_index, mrph_index, 'start')
                # Mark a gap when this base phrase does not directly follow the previous one.
                if last_tid != -1 and last_tid + 1 != bp.tid and pos not in additional_texts:
                    additional_texts[pos] = '|'
                last_tid = bp.tid
                # Advance by bp.morphemes so positions index grouped_mrphs consistently.
                mrph_index += len(bp.morphemes)

        last_pos = (len(grouped_mrphs) - 1, len(grouped_mrphs[-1]) - 1)
        if add_mark and not normalize and truncated_pos != last_pos:
            # Parenthesize everything after the truncated position.
            additional_texts[(truncated_pos[0], truncated_pos[1], 'end')] = '('
            additional_texts[(last_pos[0], last_pos[1], 'end')] = ')'
        return additional_texts

    @staticmethod
    def _format_grouped_mrphs(
            grouped_mrphs: List[List[Morpheme_]],
            mode: str, normalize: bool,
            additional_texts: Dict[Tuple[int, int, str], str]
    ) -> str:
        """Format a list of morphemes grouped by bunsetsu IDs to create a text.

        Args:
            grouped_mrphs: A list of morphemes grouped by bunsetsu IDs.
            mode: A type of token representation, which can take either "mrphs" or "reps".
            normalize: If true, the last content word will be normalized.
            additional_texts: A mapping from positions to marks.

        Returns:
            The tokens and marks joined by spaces, with the spaces just inside ``[ ]`` and ``( )``
            removed.
        """
        assert mode in {'mrphs', 'reps'}
        ret = []
        for group_index, mrphs in enumerate(grouped_mrphs):
            for mrph_index, mrph in enumerate(mrphs):
                if (group_index, mrph_index, 'start') in additional_texts:
                    ret.append(additional_texts[(group_index, mrph_index, 'start')])

                if isinstance(mrph, str):
                    # Plain strings: case names listed in PAS_ORDER are shown in hiragana.
                    if mrph in PAS_ORDER:
                        case = convert_katakana_to_hiragana(mrph)
                        ret.append(case if mode == 'mrphs' else f'{case}/{case}')
                    else:
                        ret.append(mrph)
                else:
                    if mode == 'reps':
                        ret.append(mrph.repname or f'{mrph.midasi}/{mrph.midasi}')
                    else:
                        # Only the very last morpheme is normalized to its base form.
                        if normalize and (group_index, mrph_index) == (len(grouped_mrphs) - 1, len(mrphs) - 1):
                            if mrph.hinsi == '助動詞' and mrph.genkei == 'ぬ':
                                # Exception: prevent transforming "できません" into "できませぬ".
                                ret.append(mrph.midasi)
                            else:
                                ret.append(mrph.genkei)
                        else:
                            ret.append(mrph.midasi)

                if (group_index, mrph_index, 'end') in additional_texts:
                    ret.append(additional_texts[(group_index, mrph_index, 'end')])
        return ' '.join(ret).replace('[ ', '[').replace(' ]', ']').replace('( ', '(').replace(' )', ')')

    def to_dict(self) -> dict:
        """Convert this object into a dictionary."""
        return dict(
            event_id=self.evid,
            sid=self.sid,
            ssid=self.ssid,
            rel=[r.to_dict() for r in self.outgoing_relations],
            surf=self.surf,
            surf_with_mark=self.surf_with_mark,
            mrphs=self.mrphs,
            mrphs_with_mark=self.mrphs_with_mark,
            normalized_mrphs=self.normalized_mrphs,
            normalized_mrphs_with_mark=self.normalized_mrphs_with_mark,
            normalized_mrphs_without_exophora=self.normalized_mrphs_without_exophora,
            normalized_mrphs_with_mark_without_exophora=self.normalized_mrphs_with_mark_without_exophora,
            reps=self.reps,
            reps_with_mark=self.reps_with_mark,
            normalized_reps=self.normalized_reps,
            normalized_reps_with_mark=self.normalized_reps_with_mark,
            content_rep_list=self.content_rep_list,
            pas=self.pas.to_dict(),
            features=self.features.to_dict()
        )

    def to_string(self) -> str:
        """Convert this object into a string."""
        return f'<Event, evid: {self.evid}, surf: {self.surf}>'
class EventBuilder(Builder):
    """Builder that creates an :class:`Event` from tags of a sentence."""

    def __call__(self, sentence: 'Sentence', start: Tag, head: Tag, end: Tag):
        """Create an event, build its PAS and features, and register it.

        Args:
            sentence: A sentence to which the new event belongs; the event is appended to
                ``sentence.events``.
            start: A start tag.
            head: A head tag.
            end: An end tag.

        Returns:
            The created event.
        """
        logger.debug('Create an event')
        created = Event(
            sentence=sentence,
            evid=Builder.evid,
            sid=sentence.sid,
            ssid=sentence.ssid,
            start=start,
            head=head,
            end=end,
        )
        PASBuilder()(created)
        FeaturesBuilder()(created)
        sentence.events.append(created)
        Builder.evid += 1

        # Make this sentence and its components accessible from builders:
        # every tag ID from start to end (inclusive) maps to the new event.
        covered_tids = range(start.tag_id, end.tag_id + 1)
        for tag_id in covered_tids:
            Builder.stid_event_map[(sentence.ssid, tag_id)] = created

        logger.debug('Successfully created a event.')
        return created
class JsonEventBuilder(Builder):
    """Builder that restores an :class:`Event` from its dictionary dump."""

    def __call__(self, sentence: 'Sentence', dump: dict) -> Event:
        """Create an event from a dump, restoring its cached texts, PAS and features.

        Args:
            sentence: A sentence to which the new event belongs; the event is appended to
                ``sentence.events``.
            dump: A dictionary holding the text fields plus ``pas`` and ``features`` entries.

        Returns:
            The created event.
        """
        logger.debug('Create an event')
        restored = Event(sentence, Builder.evid, sentence.sid, sentence.ssid)

        # Each dumped text goes straight into the matching underscore-prefixed cache
        # attribute, so the event's properties return it without recomputation.
        cached_keys = (
            'surf',
            'surf_with_mark',
            'mrphs',
            'mrphs_with_mark',
            'normalized_mrphs',
            'normalized_mrphs_with_mark',
            'normalized_mrphs_without_exophora',
            'normalized_mrphs_with_mark_without_exophora',
            'reps',
            'reps_with_mark',
            'normalized_reps',
            'normalized_reps_with_mark',
            'content_rep_list',
        )
        for key in cached_keys:
            setattr(restored, '_' + key, dump[key])

        JsonPASBuilder()(restored, dump['pas'])
        JsonFeaturesBuilder()(restored, dump['features'])
        sentence.events.append(restored)
        Builder.evid += 1

        # Make this sentence and its components accessible from builders.
        Builder.evid_event_map[restored.evid] = restored

        logger.debug('Successfully created a event.')
        return restored