#134 fixed. #136 implemented, but not yet switched to batching mode.
nicolay-r committed Jan 4, 2024
1 parent 41a7ec6 commit 4f3726f
Showing 25 changed files with 231 additions and 186 deletions.
Empty file added arelight/download.py
4 changes: 2 additions & 2 deletions arelight/pipelines/data/annot_pairs_nolabel.py
@@ -9,7 +9,7 @@


def create_neutral_annotation_pipeline(synonyms, dist_in_terms_bound, terms_per_context,
doc_provider, text_parser, dist_in_sentences=0):
doc_provider, text_pipeline, dist_in_sentences=0):

nolabel_annotator = AlgorithmBasedTextOpinionAnnotator(
value_to_group_id_func=lambda value:
@@ -28,7 +28,7 @@ def create_neutral_annotation_pipeline(synonyms, dist_in_terms_bound, terms_per_

annotation_pipeline = text_opinion_extraction_pipeline(
entity_index_func=lambda indexed_entity: indexed_entity.ID,
text_parser=text_parser,
pipeline_items=text_pipeline,
get_doc_by_id_func=doc_provider.by_id,
annotators=[
nolabel_annotator
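The first change renames the `text_parser` argument of `create_neutral_annotation_pipeline` to `text_pipeline` and forwards it to `text_opinion_extraction_pipeline` as `pipeline_items`, i.e. the factory now expects a list of text-processing pipeline items rather than a parser object. A minimal call sketch, assuming `synonyms` and `doc_provider` are prepared as before; the bound values and the single-item text pipeline below are illustrative assumptions, not part of this commit:

```python
from arelight.pipelines.data.annot_pairs_nolabel import create_neutral_annotation_pipeline
from arelight.pipelines.items.entities_default import TextEntitiesParser
from arelight.utils import IdAssigner


def build_annotation_pipeline(synonyms, doc_provider):
    """Hypothetical wrapper illustrating the renamed keyword argument."""
    return create_neutral_annotation_pipeline(
        synonyms=synonyms,
        dist_in_terms_bound=50,          # assumed value
        terms_per_context=50,            # assumed value
        doc_provider=doc_provider,
        # A list of pipeline items is now passed instead of a text_parser object.
        text_pipeline=[TextEntitiesParser(id_assigner=IdAssigner())])
```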
19 changes: 9 additions & 10 deletions arelight/pipelines/items/backend_d3js_graphs.py
@@ -7,7 +7,6 @@
from arekit.common.experiment.data_type import DataType
from arekit.common.labels.scaler.base import BaseLabelScaler
from arekit.common.labels.str_fmt import StringLabelsFormatter
from arekit.common.pipeline.context import PipelineContext
from arekit.common.pipeline.items.base import BasePipelineItem

from arelight.arekit.parse_predict import iter_predicted_labels
@@ -20,7 +19,8 @@

class D3jsGraphsBackendPipelineItem(BasePipelineItem):

def __init__(self, graph_min_links=0.01, graph_a_labels=None, weights=True):
def __init__(self, graph_min_links=0.01, graph_a_labels=None, weights=True, **kwargs):
super(D3jsGraphsBackendPipelineItem, self).__init__(**kwargs)
self.__graph_min_links = graph_min_links

# Setup filters for the A and B graphs for further operations application.
@@ -58,19 +58,18 @@ def iter_column_value(self, samples, column_value):
yield parsed_row[column_value]

def apply_core(self, input_data, pipeline_ctx):
assert(isinstance(input_data, PipelineContext))

predict_filepath = input_data.provide("predict_filepath")
result_reader = input_data.provide("predict_reader")
labels_fmt = input_data.provide("labels_formatter")
predict_filepath = pipeline_ctx.provide("predict_filepath")
result_reader = pipeline_ctx.provide("predict_reader")
labels_fmt = pipeline_ctx.provide("labels_formatter")
assert(isinstance(labels_fmt, StringLabelsFormatter))
labels_scaler = input_data.provide("labels_scaler")
labels_scaler = pipeline_ctx.provide("labels_scaler")
assert(isinstance(labels_scaler, BaseLabelScaler))
predict_storage = result_reader.read(predict_filepath)
assert(isinstance(predict_storage, BaseRowsStorage))

# Reading samples.
samples_io = input_data.provide("samples_io")
samples_io = pipeline_ctx.provide("samples_io")
samples_filepath = samples_io.create_target(data_type=DataType.Test)
samples = samples_io.Reader.read(samples_filepath)

@@ -90,5 +89,5 @@ def apply_core(self, input_data, pipeline_ctx):
weights=self.__graph_weights)

# Saving graph as the collection name for it.
input_data.update("d3js_graph_a", value=graph)
input_data.update("d3js_collection_name", value=samples_io.Prefix)
pipeline_ctx.update("d3js_graph_a", value=graph)
pipeline_ctx.update("d3js_collection_name", value=samples_io.Prefix)
25 changes: 12 additions & 13 deletions arelight/pipelines/items/backend_d3js_operations.py
@@ -4,7 +4,6 @@

from arekit.common.data.rows_fmt import create_base_column_fmt
from arekit.common.labels.str_fmt import StringLabelsFormatter
from arekit.common.pipeline.context import PipelineContext
from arekit.common.pipeline.items.base import BasePipelineItem

from arelight.backend.d3js.relations_graph_operations import graphs_operations
@@ -17,21 +16,21 @@

class D3jsGraphOperationsBackendPipelineItem(BasePipelineItem):

def __init__(self):
def __init__(self, **kwargs):
# Parameters for sampler.
super(D3jsGraphOperationsBackendPipelineItem, self).__init__(**kwargs)
self.__column_fmts = [create_base_column_fmt(fmt_type="parser")]

def apply_core(self, input_data, pipeline_ctx):
assert(isinstance(input_data, PipelineContext))

graph_a = input_data.provide_or_none("d3js_graph_a")
graph_b = input_data.provide_or_none("d3js_graph_b")
op = input_data.provide_or_none("d3js_graph_operations")
weights = input_data.provide_or_none("d3js_graph_weights")
target_dir = input_data.provide("d3js_graph_output_dir")
collection_name = input_data.provide("d3js_collection_name")
labels_fmt = input_data.provide("labels_formatter")
host_port = input_data.provide_or_none("d3js_host")

graph_a = pipeline_ctx.provide_or_none("d3js_graph_a")
graph_b = pipeline_ctx.provide_or_none("d3js_graph_b")
op = pipeline_ctx.provide_or_none("d3js_graph_operations")
weights = pipeline_ctx.provide_or_none("d3js_graph_weights")
target_dir = pipeline_ctx.provide("d3js_graph_output_dir")
collection_name = pipeline_ctx.provide("d3js_collection_name")
labels_fmt = pipeline_ctx.provide("labels_formatter")
host_port = pipeline_ctx.provide_or_none("d3js_host")
assert(isinstance(labels_fmt, StringLabelsFormatter))

graph = graphs_operations(graph_A=graph_a, graph_B=graph_b, operation=op, weights=weights) \
@@ -49,7 +48,7 @@ def apply_core(self, input_data, pipeline_ctx):
save_demo_page(target_dir=target_dir,
collection_name=collection_name,
host_root_path=f"http://localhost:{host_port}/" if host_port is not None else "./",
desc_name=input_data.provide_or_none("d3js_collection_description"),
desc_name=pipeline_ctx.provide_or_none("d3js_collection_description"),
desc_labels={label_type.__name__: labels_fmt.label_to_str(label_type())
for label_type in labels_fmt._stol.values()})

4 changes: 2 additions & 2 deletions arelight/pipelines/items/entities_default.py
@@ -5,10 +5,10 @@

class TextEntitiesParser(BasePipelineItem):

def __init__(self, id_assigner, display_value_func=None):
def __init__(self, id_assigner, display_value_func=None, **kwargs):
assert(isinstance(id_assigner, IdAssigner))
assert(callable(display_value_func) or display_value_func is None)
super(TextEntitiesParser, self).__init__()
super(TextEntitiesParser, self).__init__(**kwargs)
self.__id_assigner = id_assigner
self.__disp_value_func = display_value_func

10 changes: 4 additions & 6 deletions arelight/pipelines/items/entities_ner_dp.py
@@ -10,24 +10,25 @@

class DeepPavlovNERPipelineItem(SentenceObjectsParserPipelineItem):

def __init__(self, id_assigner, ner_model_name, obj_filter=None, chunk_limit=128, display_value_func=None):
def __init__(self, id_assigner, ner_model_name, obj_filter=None,
chunk_limit=128, display_value_func=None, **kwargs):
""" chunk_limit: int
length of text part in words that is going to be provided in input.
"""
assert(callable(obj_filter) or obj_filter is None)
assert(isinstance(chunk_limit, int) and chunk_limit > 0)
assert(isinstance(id_assigner, IdAssigner))
assert(callable(display_value_func) or display_value_func is None)
super(DeepPavlovNERPipelineItem, self).__init__(partitioning=TermsPartitioning(), **kwargs)

# Initialize bert-based model instance.
self.__dp_ner = DeepPavlovNER(ner_model_name)
self.__obj_filter = obj_filter
self.__chunk_limit = chunk_limit
self.__id_assigner = id_assigner
self.__disp_value_func = display_value_func
super(DeepPavlovNERPipelineItem, self).__init__(TermsPartitioning())

def _get_parts_provider_func(self, input_data, pipeline_ctx):
def _get_parts_provider_func(self, input_data):
return self.__iter_subs_values_with_bounds(input_data)

def __iter_subs_values_with_bounds(self, terms_list):
@@ -65,6 +66,3 @@ def __iter_parsed_entities(self, processed_sequences, chunk_terms_list, chunk_of
value=value, e_type=s_obj.ObjectType, entity_id=self.__id_assigner.get_id(),
display_value=self.__disp_value_func(value) if self.__disp_value_func is not None else None)
yield entity, Bound(pos=chunk_offset + s_obj.Position, length=s_obj.Length)

def apply_core(self, input_data, pipeline_ctx):
return super(DeepPavlovNERPipelineItem, self).apply_core(input_data=input_data, pipeline_ctx=pipeline_ctx)
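Both NER items now hand their partitioning strategy to the base constructor together with `**kwargs`, keep only `_get_parts_provider_func(self, input_data)` (the unused `pipeline_ctx` parameter is dropped), and remove the pass-through `apply_core` overrides. A hypothetical construction sketch; the DeepPavlov config name and the argument-free `IdAssigner()` call are assumptions, not taken from this commit:

```python
from arelight.pipelines.items.entities_ner_dp import DeepPavlovNERPipelineItem
from arelight.utils import IdAssigner

ner_item = DeepPavlovNERPipelineItem(
    id_assigner=IdAssigner(),
    ner_model_name="ner_ontonotes_bert_mult",  # assumed model name
    chunk_limit=128)  # texts longer than 128 words are processed in chunks
```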
12 changes: 3 additions & 9 deletions arelight/pipelines/items/entities_ner_transformers.py
@@ -1,27 +1,26 @@
from arekit.common.bound import Bound
from arekit.common.docs.objects_parser import SentenceObjectsParserPipelineItem
from arekit.common.text.partitioning.str import StringPartitioning

from arelight.pipelines.items.entity import IndexedEntity
from arelight.utils import IdAssigner, auto_import


class TransformersNERPipelineItem(SentenceObjectsParserPipelineItem):

def __init__(self, id_assigner, ner_model_name, device, obj_filter=None, display_value_func=None):
def __init__(self, id_assigner, ner_model_name, device, obj_filter=None, display_value_func=None, **kwargs):
""" chunk_limit: int
length of text part in words that is going to be provided in input.
"""
assert(callable(obj_filter) or obj_filter is None)
assert(isinstance(id_assigner, IdAssigner))
assert(callable(display_value_func) or display_value_func is None)
super(TransformersNERPipelineItem, self).__init__(**kwargs)

# Setup third-party modules.
model_init = auto_import("arelight.third_party.transformers.init_token_classification_model")
self.annotate_ner = auto_import("arelight.third_party.transformers.annotate_ner")

# Transformers-related parameters.

self.__device = device
self.__model, self.__tokenizer = model_init(model_path=ner_model_name, device=self.__device)

@@ -30,9 +29,7 @@ def __init__(self, id_assigner, ner_model_name, device, obj_filter=None, display
self.__id_assigner = id_assigner
self.__disp_value_func = display_value_func

super(TransformersNERPipelineItem, self).__init__(StringPartitioning())

def _get_parts_provider_func(self, input_data, pipeline_ctx):
def _get_parts_provider_func(self, input_data):
assert(isinstance(input_data, str))
parts = self.annotate_ner(model=self.__model, tokenizer=self.__tokenizer, text=input_data,
device=self.__device)
@@ -55,6 +52,3 @@ def __iter_parsed_entities(self, parts):
display_value=self.__disp_value_func(value) if self.__disp_value_func is not None else None)

yield entity, Bound(pos=p["start"], length=p["end"] - p["start"])

def apply_core(self, input_data, pipeline_ctx):
return super(TransformersNERPipelineItem, self).apply_core(input_data=input_data, pipeline_ctx=pipeline_ctx)
24 changes: 10 additions & 14 deletions arelight/pipelines/items/inference_bert_opennre.py
@@ -1,21 +1,17 @@
import json
import logging
import os
from os.path import exists, join

import logging
import torch

from arekit.common.experiment.data_type import DataType
from arekit.common.pipeline.context import PipelineContext
from arekit.common.pipeline.items.base import BasePipelineItem
from arekit.common.utils import download

from opennre.encoder import BERTEntityEncoder, BERTEncoder
from opennre.model import SoftmaxNN

from arelight.third_party.torch import sentence_re_loader
from arelight.utils import get_default_download_dir

from arelight.utils import get_default_download_dir, download

logger = logging.getLogger(__name__)

@@ -24,8 +20,9 @@ class BertOpenNREInferencePipelineItem(BasePipelineItem):

def __init__(self, pretrained_bert=None, checkpoint_path=None, device_type='cpu',
max_seq_length=128, pooler='cls', batch_size=10, tokenizers_parallelism=True,
table_name="contents", task_kwargs=None, predefined_ckpts=None):
table_name="contents", task_kwargs=None, predefined_ckpts=None, **kwargs):
assert(isinstance(tokenizers_parallelism, bool))
super(BertOpenNREInferencePipelineItem, self).__init__(**kwargs)

self.__model = None
self.__pretrained_bert = pretrained_bert
@@ -161,21 +158,20 @@ def __iter_predict_result(self, samples_filepath, batch_size):
return results_it, total

def apply_core(self, input_data, pipeline_ctx):
assert(isinstance(input_data, PipelineContext))

# Fetching the input data.
labels_scaler = input_data.provide("labels_scaler")
labels_scaler = pipeline_ctx.provide("labels_scaler")

# Try to obrain from the specific input variable.
samples_filepath = input_data.provide_or_none("opennre_samples_filepath")
samples_filepath = pipeline_ctx.provide_or_none("opennre_samples_filepath")
if samples_filepath is None:
samples_io = input_data.provide("samples_io")
samples_io = pipeline_ctx.provide("samples_io")
samples_filepath = samples_io.create_target(data_type=DataType.Test)

# Initialize model if the latter has not been yet.
if self.__model is None:

ckpt_dir = input_data.provide_or_none("opennre_ckpt_cache_dir")
ckpt_dir = pipeline_ctx.provide_or_none("opennre_ckpt_cache_dir")

self.__model = self.init_bert_model(
pretrain_path=self.__pretrained_bert,
@@ -189,5 +185,5 @@ def apply_core(self, input_data, pipeline_ctx):
dir_to_donwload=get_default_download_dir() if ckpt_dir is None else ckpt_dir)

iter_infer, total = self.__iter_predict_result(samples_filepath=samples_filepath, batch_size=self.__batch_size)
input_data.update("iter_infer", iter_infer)
input_data.update("iter_total", total)
pipeline_ctx.update("iter_infer", iter_infer)
pipeline_ctx.update("iter_total", total)
14 changes: 6 additions & 8 deletions arelight/pipelines/items/inference_writer.py
@@ -1,4 +1,3 @@
from arekit.common.pipeline.context import PipelineContext
from arekit.common.pipeline.items.base import BasePipelineItem

from arelight.predict_provider import BasePredictProvider
@@ -7,24 +6,23 @@

class InferenceWriterPipelineItem(BasePipelineItem):

def __init__(self, writer):
def __init__(self, writer, **kwargs):
assert(isinstance(writer, BasePredictWriter))
super(InferenceWriterPipelineItem, self).__init__(**kwargs)
self.__writer = writer

def apply_core(self, input_data, pipeline_ctx):
assert(isinstance(input_data, PipelineContext))

# Setup predicted result writer.
target = input_data.provide("predict_filepath")
print(target)
target = pipeline_ctx.provide("predict_filepath")

self.__writer.set_target(target)

# Gathering the content
title, contents_it = BasePredictProvider().provide(
sample_id_with_uint_labels_iter=input_data.provide("iter_infer"),
labels_count=input_data.provide("labels_scaler").LabelsCount)
sample_id_with_uint_labels_iter=pipeline_ctx.provide("iter_infer"),
labels_count=pipeline_ctx.provide("labels_scaler").LabelsCount)

with self.__writer:
self.__writer.write(title=title, contents_it=contents_it,
total=input_data.provide_or_none("iter_total"))
total=pipeline_ctx.provide_or_none("iter_total"))
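Together with the previous diff, this shows the hand-off between items: the inference item publishes `iter_infer` and `iter_total` into the context, and the writer item reads them back. A toy sketch, with a plain dict standing in for the pipeline context:

```python
# A plain dict stands in for the pipeline context in this illustrative sketch.
context = {}

# Producer side (as in BertOpenNREInferencePipelineItem.apply_core).
context["iter_infer"] = iter([(0, [0.1, 0.9]), (1, [0.8, 0.2])])
context["iter_total"] = 2

# Consumer side (as in InferenceWriterPipelineItem.apply_core).
rows = list(context["iter_infer"])
assert len(rows) == context["iter_total"]
```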
4 changes: 1 addition & 3 deletions arelight/pipelines/items/serializer_arekit.py
@@ -1,4 +1,3 @@
from arekit.common.pipeline.context import PipelineContext
from arekit.contrib.utils.pipelines.items.sampling.base import BaseSerializerPipelineItem


@@ -8,9 +7,8 @@ class AREkitSerializerPipelineItem(BaseSerializerPipelineItem):
"""

def apply_core(self, input_data, pipeline_ctx):
assert(isinstance(input_data, PipelineContext))
super(AREkitSerializerPipelineItem, self).apply_core(input_data=input_data,
pipeline_ctx=pipeline_ctx)

# Host samples into the result for further pipeline items.
input_data.update("samples_io", self._samples_io)
pipeline_ctx.update("samples_io", self._samples_io)