import dataclasses
import logging
import math
import os
from typing import Any, Callable, Dict, List, Optional, Tuple
import torch
from datasets import Dataset
from torch.utils.data import DataLoader
from tqdm.autonotebook import tqdm
from transformers import (
EvalPrediction,
TrainerCallback,
TrainingArguments,
)
from transformers import Trainer as TransformersTrainer
from transformers.trainer_utils import PredictionOutput
from span_marker.evaluation import compute_f1_via_seqeval
from span_marker.label_normalizer import AutoLabelNormalizer, LabelNormalizer
from span_marker.model_card import ModelCardCallback
from span_marker.modeling import SpanMarkerModel
from span_marker.tokenizer import SpanMarkerTokenizer
logger = logging.getLogger(__name__)
[docs]class Trainer(TransformersTrainer):
"""
Trainer is a simple but feature-complete training and eval loop for SpanMarker,
built tightly on top of the 🤗 Transformers :external:doc:`Trainer <main_classes/trainer>`.
Args:
model (Optional[SpanMarkerModel]):
The model to train, evaluate or use for predictions. If not provided, a ``model_init`` must be passed.
args (Optional[~transformers.TrainingArguments]):
The arguments to tweak for training. Will default to a basic instance of :class:`~transformers.TrainingArguments` with the
``output_dir`` set to a directory named *models/my_span_marker_model* in the current directory if not provided.
train_dataset (Optional[~datasets.Dataset]):
The dataset to use for training. Must contain ``tokens`` and ``ner_tags`` columns, and may contain
``document_id`` and ``sentence_id`` columns for document-level context during training.
eval_dataset (Optional[~datasets.Dataset]):
The dataset to use for evaluation. Must contain ``tokens`` and ``ner_tags`` columns, and may contain
``document_id`` and ``sentence_id`` columns for document-level context during evaluation.
model_init (Optional[Callable[[], SpanMarkerModel]]):
A function that instantiates the model to be used. If provided, each call to :meth:`Trainer.train` will start
from a new instance of the model as given by this function.
The function may have zero argument, or a single one containing the optuna/Ray Tune/SigOpt trial object, to
be able to choose different architectures according to hyper parameters (such as layer count, sizes of
inner layers, dropout probabilities etc).
compute_metrics (Optional[Callable[[~transformers.EvalPrediction], Dict]]):
The function that will be used to compute metrics at evaluation. Must take a :class:`~transformers.EvalPrediction` and return
a dictionary string to metric values.
callbacks (Optional[List[~transformers.TrainerCallback]]):
A list of callbacks to customize the training loop. Will add those to the list of default callbacks
detailed in the Hugging Face :external:doc:`Callback documentation <main_classes/callback>`.
If you want to remove one of the default callbacks used, use the :meth:`~Trainer.remove_callback` method.
optimizers (Tuple[Optional[~torch.optim.Optimizer], Optional[~torch.optim.lr_scheduler.LambdaLR]]): A tuple
containing the optimizer and the scheduler to use. Will default to an instance of ``AdamW`` on your model
and a scheduler given by ``get_linear_schedule_with_warmup`` controlled by ``args``.
preprocess_logits_for_metrics (Optional[Callable[[~torch.Tensor, ~torch.Tensor], ~torch.Tensor]]):
A function that preprocess the logits right before caching them at each evaluation step. Must take two
tensors, the logits and the labels, and return the logits once processed as desired. The modifications made
by this function will be reflected in the predictions received by ``compute_metrics``.
Note that the labels (second parameter) will be ``None`` if the dataset does not have them.
Important attributes:
- **model** -- Always points to the core model.
- **model_wrapped** -- Always points to the most external model in case one or more other modules wrap the
original model. This is the model that should be used for the forward pass. For example, under ``DeepSpeed``,
the inner model is wrapped in ``DeepSpeed`` and then again in :class:`torch.nn.DistributedDataParallel`. If the
inner model hasn't been wrapped, then ``self.model_wrapped`` is the same as ``self.model``.
- **is_model_parallel** -- Whether or not a model has been switched to a model parallel mode (different from
data parallelism, this means some of the model layers are split on different GPUs).
- **place_model_on_device** -- Whether or not to automatically place the model on the device - it will be set
to ``False`` if model parallel or deepspeed is used, or if the default
`TrainingArguments.place_model_on_device` is overridden to return ``False``.
- **is_in_train** -- Whether or not a model is currently running :meth:`~Trainer.train` (e.g. when ``evaluate`` is called while
in ``train``)
"""
REQUIRED_COLUMNS: Tuple[str] = ("tokens", "ner_tags")
OPTIONAL_COLUMNS: Tuple[str] = ("document_id", "sentence_id")
def __init__(
self,
model: Optional[SpanMarkerModel] = None,
args: Optional[TrainingArguments] = None,
train_dataset: Optional[Dataset] = None,
eval_dataset: Optional[Dataset] = None,
model_init: Callable[[], SpanMarkerModel] = None,
compute_metrics: Optional[Callable[[EvalPrediction], Dict]] = None,
callbacks: Optional[List[TrainerCallback]] = None,
optimizers: Tuple[Optional[torch.optim.Optimizer], Optional[torch.optim.lr_scheduler.LambdaLR]] = (None, None),
preprocess_logits_for_metrics: Optional[Callable[[torch.Tensor, torch.Tensor], torch.Tensor]] = None,
) -> None:
# Extract the model from an initializer function
if model_init:
self.model_init = model_init
model = self.call_model_init()
# To convert dataset labels to a common format (list of label-start-end tuples)
self.label_normalizer = AutoLabelNormalizer.from_config(model.config)
# Set some Training arguments that must be set for SpanMarker
if args is None:
args = TrainingArguments(
output_dir="models/my_span_marker_model", include_inputs_for_metrics=True, remove_unused_columns=False
)
else:
args = dataclasses.replace(args, include_inputs_for_metrics=True, remove_unused_columns=False)
# Always compute `compute_f1_via_seqeval` - optionally compute user-provided metrics
if compute_metrics is not None:
compute_metrics_func = lambda eval_prediction: {
**compute_f1_via_seqeval(model.tokenizer, eval_prediction, self.is_in_train),
**compute_metrics(eval_prediction),
}
else:
compute_metrics_func = lambda eval_prediction: compute_f1_via_seqeval(
model.tokenizer, eval_prediction, self.is_in_train
)
# If the model ID is set via the TrainingArguments, but not via the SpanMarkerModelCardData,
# then we can set it here for the model card regardless
if args.hub_model_id and not model.model_card_data.model_id:
model.model_card_data.model_id = args.hub_model_id
if not model.model_card_data.dataset_id:
# Inferring is hacky - it may break in the future, so let's be safe
try:
model.model_card_data.infer_dataset_id(train_dataset)
except Exception:
pass
super().__init__(
model=model,
args=args,
data_collator=model.data_collator,
train_dataset=train_dataset,
eval_dataset=eval_dataset,
tokenizer=model.tokenizer,
model_init=None,
compute_metrics=compute_metrics_func,
callbacks=callbacks,
optimizers=optimizers,
preprocess_logits_for_metrics=preprocess_logits_for_metrics,
)
# We have to provide the __init__ with None for model_init and then override it here again
# We do this because we need `model` to already be defined in this SpanMarker Trainer class
# and the Transformers Trainer would complain if we provide both a model and a model_init
# in its __init__.
self.model_init = model_init
# Override the type hint
self.model: SpanMarkerModel
# Add the callback for filling the model card data with hyperparameters
# and evaluation results
self.add_callback(ModelCardCallback(self))
[docs] def preprocess_dataset(
self,
dataset: Dataset,
label_normalizer: LabelNormalizer,
tokenizer: SpanMarkerTokenizer,
dataset_name: str = "train",
is_evaluate: bool = False,
) -> Dataset:
"""Normalize the ``ner_tags`` labels and call tokenizer on ``tokens``.
Args:
dataset (~datasets.Dataset): A Hugging Face dataset with ``tokens`` and ``ner_tags`` columns.
label_normalizer (LabelNormalizer): A callable that normalizes ``ner_tags`` into start-end-label tuples.
tokenizer (SpanMarkerTokenizer): The tokenizer responsible for tokenizing ``tokens`` into input IDs,
and adding start and end markers.
dataset_name (str, optional): The name of the dataset. Defaults to "train".
is_evaluate (bool, optional): Whether to return the number of words for each sample.
Required for evaluation. Defaults to False.
Raises:
ValueError: If the ``dataset`` does not contain ``tokens`` and ``ner_tags`` columns.
Returns:
Dataset: The normalized and tokenized version of the input dataset.
"""
for column in self.REQUIRED_COLUMNS:
if column not in dataset.column_names:
raise ValueError(f"The {dataset_name} dataset must contain a {column!r} column.")
# Drop all unused columns, only keep "tokens", "ner_tags", "document_id", "sentence_id"
dataset = dataset.remove_columns(
set(dataset.column_names) - set(self.OPTIONAL_COLUMNS) - set(self.REQUIRED_COLUMNS)
)
# Normalize the labels to a common format (list of label-start-end tuples)
# Also add "entity_count" and "word_count" labels
dataset = dataset.map(
label_normalizer,
input_columns=("tokens", "ner_tags"),
desc=f"Label normalizing the {dataset_name} dataset",
batched=True,
)
# Setting model card data based on training data
if not is_evaluate:
# Pick some example entities from each entity class for the model card.
if not self.model.model_card_data.label_example_list:
self.model.model_card_data.set_label_examples(
dataset, self.model.config.id2label, self.model.config.outside_id
)
if not self.model.model_card_data.train_set_metrics_list:
self.model.model_card_data.set_train_set_metrics(dataset)
# Set some example sentences for the model card widget
if is_evaluate and not self.model.model_card_data.widget:
self.model.model_card_data.set_widget_examples(dataset)
# Remove dataset columns that are only used for model card
dataset = dataset.remove_columns(("entity_count", "word_count"))
# Tokenize and add start/end markers
with tokenizer.entity_tracker(split=dataset_name):
dataset = dataset.map(
tokenizer,
batched=True,
remove_columns=set(dataset.column_names) - set(self.OPTIONAL_COLUMNS),
desc=f"Tokenizing the {dataset_name} dataset",
fn_kwargs={"return_num_words": is_evaluate},
)
# If "document_id" AND "sentence_id" exist in the training dataset
if {"document_id", "sentence_id"} <= set(dataset.column_names):
# If training, set the config flag that this model is trained with document context
if not is_evaluate:
self.model.config.trained_with_document_context = True
# If evaluating and the model was not trained with document context, warn
elif not self.model.config.trained_with_document_context:
logger.warning(
"This model was trained without document-level context: "
"evaluation with document-level context may cause decreased performance."
)
dataset = dataset.sort(column_names=["document_id", "sentence_id"])
dataset = self.add_context(
dataset,
tokenizer.model_max_length,
max_prev_context=self.model.config.max_prev_context,
max_next_context=self.model.config.max_next_context,
)
elif is_evaluate and self.model.config.trained_with_document_context:
logger.warning(
"This model was trained with document-level context: "
"evaluation without document-level context may cause decreased performance."
)
# Spread between multiple samples where needed
original_length = len(dataset)
dataset = dataset.map(
Trainer.spread_sample,
batched=True,
desc="Spreading data between multiple samples",
fn_kwargs={
"model_max_length": tokenizer.model_max_length,
"marker_max_length": self.model.config.marker_max_length,
},
)
new_length = len(dataset)
logger.info(
f"Spread {original_length} sentences across {new_length} samples, "
f"a {(new_length / original_length) - 1:%} increase. You can increase "
"`model_max_length` or `marker_max_length` to decrease the number of samples, "
"but recognize that longer samples are slower."
)
return dataset
[docs] @staticmethod
def add_context(
dataset: Dataset,
model_max_length: int,
max_prev_context: Optional[int] = None,
max_next_context: Optional[int] = None,
show_progress_bar: bool = True,
) -> Dataset:
"""Add document-level context from previous and next sentences in the same document.
Args:
dataset (`Dataset`): The partially processed dataset, containing `"input_ids"`, `"start_position_ids"`,
`"end_position_ids"`, `"document_id"` and `"sentence_id"` columns.
model_max_length (`int`): The total number of tokens that can be processed before
truncation.
max_prev_context (`Optional[int]`): The maximum number of previous sentences to include. Defaults to None,
representing as many previous sentences as fits.
max_next_context (`Optional[int]`): The maximum number of next sentences to include. Defaults to None,
representing as many previous sentences as fits.
show_progress_bar (`bool`): Whether to show a progress bar. Defaults to `True`.
Returns:
Dataset: A copy of the Dataset with additional previous and next sentences added to input_ids.
"""
all_input_ids = []
all_start_position_ids = []
all_end_position_ids = []
for sample_idx, sample in tqdm(
enumerate(dataset),
desc="Adding document-level context",
total=len(dataset),
leave=False,
disable=not show_progress_bar,
):
# Sequentially add next context, previous context, next context, previous context, etc. until
# max token length or max_prev/next_context
tokens = sample["input_ids"][1:-1]
start_position_ids = sample["start_position_ids"]
end_position_ids = sample["end_position_ids"]
next_context_added = 0
prev_context_added = 0
remaining_space = model_max_length - len(tokens) - 2
while remaining_space > 0:
next_context_index = sample_idx + next_context_added + 1
should_add_next = (
(max_next_context is None or next_context_added < max_next_context)
and next_context_index < len(dataset)
and dataset[next_context_index]["document_id"] == sample["document_id"]
)
if should_add_next:
# TODO: [1:-1][:remaining_space] is not efficient
tokens += dataset[next_context_index]["input_ids"][1:-1][:remaining_space]
next_context_added += 1
remaining_space = model_max_length - len(tokens) - 2
if remaining_space <= 0:
break
prev_context_index = sample_idx - prev_context_added - 1
should_add_prev = (
(max_prev_context is None or prev_context_added < max_prev_context)
and prev_context_index >= 0
and dataset[prev_context_index]["document_id"] == sample["document_id"]
)
if should_add_prev:
# TODO: [1:-1][remaining_space:] is not efficient
prepended_tokens = dataset[prev_context_index]["input_ids"][1:-1][-remaining_space:]
tokens = prepended_tokens + tokens
# TODO: Use numpy? np.array(sample["start_position_ids"]) + len(prepended_tokens)
start_position_ids = [index + len(prepended_tokens) for index in start_position_ids]
end_position_ids = [index + len(prepended_tokens) for index in end_position_ids]
prev_context_added += 1
if not should_add_next and not should_add_prev:
break
remaining_space = model_max_length - len(tokens) - 2
all_input_ids.append([sample["input_ids"][0]] + tokens + [sample["input_ids"][-1]])
all_start_position_ids.append(start_position_ids)
all_end_position_ids.append(end_position_ids)
dataset = dataset.remove_columns(("input_ids", "start_position_ids", "end_position_ids"))
dataset = dataset.add_column("input_ids", all_input_ids)
dataset = dataset.add_column("start_position_ids", all_start_position_ids)
dataset = dataset.add_column("end_position_ids", all_end_position_ids)
return dataset
[docs] @staticmethod
def spread_sample(
batch: Dict[str, List[Any]], model_max_length: int, marker_max_length: int
) -> Dict[str, List[Any]]:
"""Spread sentences between multiple samples if lack of space per sample requires it.
Args:
batch (`Dict[str, List[Any]]`): A dictionary of dataset keys to lists of values.
model_max_length (`int`): The total number of tokens that can be processed before
truncation.
marker_max_length (`int`): The maximum length for each of the span markers. A value of 128
means that each training and inferencing sample contains a maximum of 128 start markers
and 128 end markers, for a total of 256 markers per sample.
Returns:
Dict[str, List[Any]]: A dictionary of dataset keys to lists of values.
"""
keys = batch.keys()
values = batch.values()
total_sample_length = model_max_length + 2 * marker_max_length
batch_samples = {key: [] for key in keys}
for sample in zip(*values):
sample = dict(zip(keys, sample))
sample_marker_space = (total_sample_length - len(sample["input_ids"])) // 2
spread_between_n = math.ceil(len(sample["start_position_ids"]) / sample_marker_space)
for i in range(spread_between_n):
sample_copy = sample.copy()
start = i * sample_marker_space
end = (i + 1) * sample_marker_space
sample_copy["start_position_ids"] = sample["start_position_ids"][start:end]
sample_copy["end_position_ids"] = sample["end_position_ids"][start:end]
if "labels" in sample:
sample_copy["labels"] = sample["labels"][start:end]
sample_copy["num_spans"] = len(sample_copy["start_position_ids"])
for key, value in sample_copy.items():
batch_samples[key].append(value)
return batch_samples
def get_train_dataloader(self) -> DataLoader:
"""Return the preprocessed training DataLoader."""
self.train_dataset = self.preprocess_dataset(self.train_dataset, self.label_normalizer, self.tokenizer)
return super().get_train_dataloader()
def get_eval_dataloader(self, eval_dataset: Optional[Dataset] = None) -> DataLoader:
"""Return the preprocessed evaluation DataLoader."""
eval_dataset = eval_dataset or self.eval_dataset
if eval_dataset is not None:
eval_dataset = self.preprocess_dataset(
eval_dataset, self.label_normalizer, self.tokenizer, dataset_name="evaluation", is_evaluate=True
)
return super().get_eval_dataloader(eval_dataset)
def get_test_dataloader(self, test_dataset: Dataset) -> DataLoader:
"""Return the preprocessed evaluation DataLoader."""
test_dataset = self.preprocess_dataset(
test_dataset, self.label_normalizer, self.tokenizer, dataset_name="test", is_evaluate=True
)
return super().get_test_dataloader(test_dataset)
def predict(
self, test_dataset: Dataset, ignore_keys: Optional[List[str]] = None, metric_key_prefix: str = "test"
) -> PredictionOutput:
logger.warning(
f"`Trainer.predict` is not recommended for a {self.model.__class__.__name__}. "
f"Consider using `{self.model.__class__.__name__}.predict` instead."
)
return super().predict(test_dataset, ignore_keys, metric_key_prefix)
[docs] def create_model_card(self, *_args, **_kwargs) -> None:
"""
Creates a draft of a model card using the information available to the `Trainer`,
the `SpanMarkerModel` and the `SpanMarkerModelCardData`.
"""
with open(os.path.join(self.args.output_dir, "README.md"), "w", encoding="utf8") as f:
f.write(self.model.generate_model_card())