Viewing File: /home/ubuntu/codegamaai-test/rag_drive/regdrive/lib/python3.10/site-packages/unstructured/utils.py

import functools
import html
import importlib
import json
import os
import platform
import subprocess
from datetime import datetime
from functools import wraps
from itertools import combinations
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Generic,
    Iterable,
    Iterator,
    List,
    Optional,
    Sequence,
    Tuple,
    TypeVar,
    Union,
    cast,
)

import requests
from typing_extensions import ParamSpec

from unstructured.__version__ import __version__

if TYPE_CHECKING:
    from unstructured.documents.elements import Text

DATE_FORMATS = ("%Y-%m-%d", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d+%H:%M:%S", "%Y-%m-%dT%H:%M:%S%z")

_T = TypeVar("_T")
_P = ParamSpec("_P")


def htmlify_matrix_of_cell_texts(matrix: Sequence[Sequence[str]]) -> str:
    """Form an HTML table from "rows" and "columns" of `matrix`.

    Character overhead is minimized:
    - No whitespace padding is added for human readability
    - No newlines ("\n") are added
    - No `<thead>`, `<tbody>`, or `<tfoot>` elements are used; we can't tell where those might be
      semantically appropriate anyway so at best they would consume unnecessary space and at worst
      would be misleading.
    """

    def iter_trs(rows_of_cell_strs: Sequence[Sequence[str]]) -> Iterator[str]:
        for row_cell_strs in rows_of_cell_strs:
            # -- suppress emission of rows with no cells --
            if not row_cell_strs:
                continue
            yield f"<tr>{''.join(iter_tds(row_cell_strs))}</tr>"

    def iter_tds(row_cell_strs: Sequence[str]) -> Iterator[str]:
        for s in row_cell_strs:
            # -- take care of things like '<' and '>' in the text --
            s = html.escape(s)
            # -- substitute <br/> elements for line-feeds in the text --
            s = "<br/>".join(s.split("\n"))
            # -- strip leading and trailing whitespace, wrap it up and go --
            yield f"<td>{s.strip()}</td>"

    return f"<table>{''.join(iter_trs(matrix))}</table>" if matrix else ""


class lazyproperty(Generic[_T]):
    """Decorator like @property, but evaluated only on first access.

    Like @property, this can only be used to decorate methods having only a `self` parameter, and
    is accessed like an attribute on an instance, i.e. trailing parentheses are not used. Unlike
    @property, the decorated method is only evaluated on first access; the resulting value is
    cached and that same value returned on second and later access without re-evaluation of the
    method.

    Like @property, this class produces a *data descriptor* object, which is stored in the __dict__
    of the *class* under the name of the decorated method ('fget' nominally). The cached value is
    stored in the __dict__ of the *instance* under that same name.

    Because it is a data descriptor (as opposed to a *non-data descriptor*), its `__get__()` method
    is executed on each access of the decorated attribute; the __dict__ item of the same name is
    "shadowed" by the descriptor.

    While this may represent a performance improvement over a property, its greater benefit may be
    its other characteristics. One common use is to construct collaborator objects, removing that
    "real work" from the constructor, while still only executing once. It also de-couples client
    code from any sequencing considerations; if it's accessed from more than one location, it's
    assured it will be ready whenever needed.

    Loosely based on: https://stackoverflow.com/a/6849299/1902513.

    A lazyproperty is read-only. There is no counterpart to the optional "setter" (or deleter)
    behavior of an @property. This is critically important to maintaining its immutability and
    idempotence guarantees. Attempting to assign to a lazyproperty raises AttributeError
    unconditionally.

    The parameter names in the methods below correspond to this usage example::

        class Obj(object)

            @lazyproperty
            def fget(self):
                return 'some result'

        obj = Obj()

    Not suitable for wrapping a function (as opposed to a method) because it is not callable.
    """

    def __init__(self, fget: Callable[..., _T]) -> None:
        """*fget* is the decorated method (a "getter" function).

        A lazyproperty is read-only, so there is only an *fget* function (a regular
        @property can also have an fset and fdel function). This name was chosen for
        consistency with Python's `property` class which uses this name for the
        corresponding parameter.
        """
        # --- maintain a reference to the wrapped getter method
        self._fget = fget
        # --- and store the name of that decorated method
        self._name = fget.__name__
        # --- adopt fget's __name__, __doc__, and other attributes
        functools.update_wrapper(self, fget)  # pyright: ignore

    def __get__(self, obj: Any, type: Any = None) -> _T:
        """Called on each access of 'fget' attribute on class or instance.

        *self* is this instance of a lazyproperty descriptor "wrapping" the property
        method it decorates (`fget`, nominally).

        *obj* is the "host" object instance when the attribute is accessed from an
        object instance, e.g. `obj = Obj(); obj.fget`. *obj* is None when accessed on
        the class, e.g. `Obj.fget`.

        *type* is the class hosting the decorated getter method (`fget`) on both class
        and instance attribute access.
        """
        # --- when accessed on class, e.g. Obj.fget, just return this descriptor
        # --- instance (patched above to look like fget).
        if obj is None:
            return self  # type: ignore

        # --- when accessed on instance, start by checking instance __dict__ for
        # --- item with key matching the wrapped function's name
        value = obj.__dict__.get(self._name)
        if value is None:
            # --- on first access, the __dict__ item will be absent. Evaluate fget()
            # --- and store that value in the (otherwise unused) host-object
            # --- __dict__ value of same name ('fget' nominally)
            value = self._fget(obj)
            obj.__dict__[self._name] = value
        return cast(_T, value)

    def __set__(self, obj: Any, value: Any) -> None:
        """Raises unconditionally, to preserve read-only behavior.

        This decorator is intended to implement immutable (and idempotent) object
        attributes. For that reason, assignment to this property must be explicitly
        prevented.

        If this __set__ method was not present, this descriptor would become a
        *non-data descriptor*. That would be nice because the cached value would be
        accessed directly once set (__dict__ attrs have precedence over non-data
        descriptors on instance attribute lookup). The problem is, there would be
        nothing to stop assignment to the cached value, which would overwrite the result
        of `fget()` and break both the immutability and idempotence guarantees of this
        decorator.

        The performance with this __set__() method in place was roughly 0.4 usec per
        access when measured on a 2.8GHz development machine; so quite snappy and
        probably not a rich target for optimization efforts.
        """
        raise AttributeError("can't set attribute")


def save_as_jsonl(data: List[Dict], filename: str) -> None:
    with open(filename, "w+") as output_file:
        output_file.writelines(json.dumps(datum) + "\n" for datum in data)


def read_from_jsonl(filename: str) -> List[Dict]:
    with open(filename) as input_file:
        return [json.loads(line) for line in input_file]


def requires_dependencies(
    dependencies: Union[str, List[str]],
    extras: Optional[str] = None,
) -> Callable[[Callable[_P, _T]], Callable[_P, _T]]:
    if isinstance(dependencies, str):
        dependencies = [dependencies]

    def decorator(func: Callable[_P, _T]) -> Callable[_P, _T]:
        @wraps(func)
        def wrapper(*args: _P.args, **kwargs: _P.kwargs):
            missing_deps: List[str] = []
            for dep in dependencies:
                if not dependency_exists(dep):
                    missing_deps.append(dep)
            if len(missing_deps) > 0:
                raise ImportError(
                    f"Following dependencies are missing: {', '.join(missing_deps)}. "
                    + (
                        f"""Please install them using `pip install "unstructured[{extras}]"`."""
                        if extras
                        else f"Please install them using `pip install {' '.join(missing_deps)}`."
                    ),
                )
            return func(*args, **kwargs)

        return wrapper

    return decorator


def dependency_exists(dependency: str):
    try:
        importlib.import_module(dependency)
    except ImportError as e:
        # Check to make sure this isn't some unrelated import error.
        if dependency in repr(e):
            return False
    return True


# Copied from unstructured/ingest/connector/biomed.py
def validate_date_args(date: Optional[str] = None):
    if not date:
        raise ValueError("The argument date is None.")

    for format in DATE_FORMATS:
        try:
            datetime.strptime(date, format)
            return True
        except ValueError:
            pass

    raise ValueError(
        f"The argument {date} does not satisfy the format: "
        "YYYY-MM-DD or YYYY-MM-DDTHH:MM:SS or YYYY-MM-DD+HH:MM:SS or YYYY-MM-DDTHH:MM:SStz",
    )


def _first_and_remaining_iterator(it: Iterable) -> Tuple[Any, Iterator]:
    iterator = iter(it)
    try:
        out = next(iterator)
    except StopIteration:
        raise ValueError(
            "Expected at least 1 element in iterable from which to retrieve first, got empty "
            "iterable.",
        )
    return out, iterator


def first(it: Iterable) -> Any:
    """Returns the first item from an iterable. Raises an error if the iterable is empty."""
    out, _ = _first_and_remaining_iterator(it)
    return out


def only(it: Iterable) -> Any:
    """Returns the only element from a singleton iterable. Raises an error if the iterable is not a
    singleton."""
    out, iterator = _first_and_remaining_iterator(it)
    if any(True for _ in iterator):
        raise ValueError(
            "Expected only 1 element in passed argument, instead there are at least 2 elements.",
        )
    return out


def scarf_analytics():
    try:
        subprocess.check_output("nvidia-smi")
        gpu_present = True
    except Exception:
        gpu_present = False

    python_version = ".".join(platform.python_version().split(".")[:2])

    try:
        if os.getenv("SCARF_NO_ANALYTICS") != "true" and os.getenv("DO_NOT_TRACK") != "true":
            if "dev" in __version__:
                requests.get(
                    "https://packages.unstructured.io/python-telemetry?version="
                    + __version__
                    + "&platform="
                    + platform.system()
                    + "&python"
                    + python_version
                    + "&arch="
                    + platform.machine()
                    + "&gpu="
                    + str(gpu_present)
                    + "&dev=true",
                )
            else:
                requests.get(
                    "https://packages.unstructured.io/python-telemetry?version="
                    + __version__
                    + "&platform="
                    + platform.system()
                    + "&python"
                    + python_version
                    + "&arch="
                    + platform.machine()
                    + "&gpu="
                    + str(gpu_present)
                    + "&dev=false",
                )
    except Exception:
        pass


def ngrams(s: str, n: int) -> List:
    """Generate n-grams from a string s"""

    ngrams_list = []
    for i in range(len(s) - n + 1):
        ngram = []
        for j in range(n):
            ngram.append(s[i + j])
        ngrams_list.append(tuple(ngram))
    return ngrams_list


def calculate_shared_ngram_percentage(
    first_string: str,
    second_string: str,
    n: int,
) -> (float, List):
    """Calculate the percentage of common_ngrams between string_A and string_B
    with reference to the total number of ngrams in string_A"""

    if not n:
        return 0, {}
    first_string_ngrams = ngrams(first_string.split(), n)
    second_string_ngrams = ngrams(second_string.split(), n)

    if not first_string_ngrams:
        return 0

    common_ngrams = set(first_string_ngrams) & set(second_string_ngrams)
    percentage = (len(common_ngrams) / len(first_string_ngrams)) * 100
    return percentage, common_ngrams


def calculate_largest_ngram_percentage(first_string: str, second_string: str) -> (float, List, str):
    """Iteratively calculate_shared_ngram_percentage starting from the biggest
    ngram possible until is >0.0%"""

    shared_ngrams = []
    if len(first_string.split()) < len(second_string.split()):
        n = len(first_string.split()) - 1
    else:
        n = len(second_string.split()) - 1
        first_string, second_string = second_string, first_string
    ngram_percentage = 0
    while not ngram_percentage:
        ngram_percentage, shared_ngrams = calculate_shared_ngram_percentage(
            first_string,
            second_string,
            n,
        )
        if n == 0:
            break
        else:
            n -= 1
    return round(ngram_percentage, 2), shared_ngrams, str(n + 1)


def is_parent_box(
    parent_target: Union[List, Tuple],
    child_target: Union[List, Tuple],
    add: float = 0.0,
) -> bool:
    """True if the child_target bounding box is nested in the parent_target.
    Box format: [x_bottom_left, y_bottom_left, x_top_right, y_top_right].
    The parameter 'add' is the pixel error tolerance for extra pixels outside the parent region"""

    if len(parent_target) != 4:
        return False

    if add and len(parent_target) == 4:
        parent_target = list(parent_target)
        parent_target[0] -= add
        parent_target[1] -= add
        parent_target[2] += add
        parent_target[3] += add

    if (
        len(child_target) == 4
        and (child_target[0] >= parent_target[0] and child_target[1] >= parent_target[1])
        and (child_target[2] <= parent_target[2] and child_target[3] <= parent_target[3])
    ):
        return True
    if len(child_target) == 2 and (
        parent_target[0] <= child_target[0] <= parent_target[2]
        and parent_target[1] <= child_target[1] <= parent_target[3]
    ):
        return True
    return False


def calculate_overlap_percentage(
    box1: Union[List, Tuple],
    box2: Union[List, Tuple],
    intersection_ratio_method: str = "total",
):
    """Box format: [x_bottom_left, y_bottom_left, x_top_right, y_top_right].
    Calculates the percentage of overlapped region with reference to
    the biggest element-region (intersection_ratio_method="parent"),
    the smallest element-region (intersection_ratio_method="partial"), or to
    the disjunctive union region (intersection_ratio_method="total")
    """
    x1, y1 = box1[0]
    x2, y2 = box1[2]
    x3, y3 = box2[0]
    x4, y4 = box2[2]
    area_box1 = (x2 - x1) * (y2 - y1)
    area_box2 = (x4 - x3) * (y4 - y3)
    x_intersection1 = max(x1, x3)
    y_intersection1 = max(y1, y3)
    x_intersection2 = min(x2, x4)
    y_intersection2 = min(y2, y4)
    intersection_area = max(0, x_intersection2 - x_intersection1) * max(
        0,
        y_intersection2 - y_intersection1,
    )
    max_area = max(area_box1, area_box2)
    min_area = min(area_box1, area_box2)
    total_area = area_box1 + area_box2

    if intersection_ratio_method == "parent":
        if max_area == 0:
            return 0
        overlap_percentage = (intersection_area / max_area) * 100

    elif intersection_ratio_method == "partial":
        if min_area == 0:
            return 0
        overlap_percentage = (intersection_area / min_area) * 100

    else:
        if (area_box1 + area_box2) == 0:
            return 0

        overlap_percentage = (intersection_area / (area_box1 + area_box2 - intersection_area)) * 100

    return round(overlap_percentage, 2), max_area, min_area, total_area


def identify_overlapping_case(
    box_pair: Union[List[Union[List, Tuple]], Tuple[Union[List, Tuple]]],
    label_pair: Union[List[str], Tuple[str]],
    text_pair: Union[List[str], Tuple[str]],
    ix_pair: Union[List[str], Tuple[str]],
    sm_overlap_threshold: float = 10.0,
):
    """Classifies the overlapping case for an element_pair input.
    There are 5 categories of overlapping:
    'Small partial overlap', 'Partial overlap with empty content',
    'Partial overlap with duplicate text (sharing 100% of the text)',
    'Partial overlap without sharing text', and
    'Partial overlap sharing {calculate_largest_ngram_percentage(...)}% of the text'
    """
    overlapping_elements, overlapping_case, overlap_percentage, largest_ngram_percentage = (
        None,
        None,
        None,
        None,
    )
    box1, box2 = box_pair
    type1, type2 = label_pair
    text1, text2 = text_pair
    ix_element1, ix_element2 = ix_pair
    (overlap_percentage, max_area, min_area, total_area) = calculate_overlap_percentage(
        box1,
        box2,
        intersection_ratio_method="partial",
    )
    if overlap_percentage < sm_overlap_threshold:
        overlapping_elements = [
            f"{type1}(ix={ix_element1})",
            f"{type2}(ix={ix_element2})",
        ]
        overlapping_case = "Small partial overlap"

    else:
        if not text1:
            overlapping_elements = [
                f"{type1}(ix={ix_element1})",
                f"{type2}(ix={ix_element2})",
            ]
            overlapping_case = f"partial overlap with empty content in {type1}"

        elif not text2:
            overlapping_elements = [
                f"{type2}(ix={ix_element2})",
                f"{type1}(ix={ix_element1})",
            ]
            overlapping_case = f"partial overlap with empty content in {type2}"

        elif text1 in text2 or text2 in text1:
            overlapping_elements = [
                f"{type1}(ix={ix_element1})",
                f"{type2}(ix={ix_element2})",
            ]
            overlapping_case = "partial overlap with duplicate text"

        else:
            (
                largest_ngram_percentage,
                largest_shared_ngrams_max,
                largest_n,
            ) = calculate_largest_ngram_percentage(text1, text2)
            largest_ngram_percentage = round(largest_ngram_percentage, 2)
            if not largest_ngram_percentage:
                overlapping_elements = [
                    f"{type1}(ix={ix_element1})",
                    f"{type2}(ix={ix_element2})",
                ]
                overlapping_case = "partial overlap without sharing text"

            else:
                overlapping_elements = [
                    f"{type1}(ix={ix_element1})",
                    f"{type2}(ix={ix_element2})",
                ]
                ref_type = type1 if len(text1.split()) < len(text2.split()) else type2
                ref_type = "of the text from" + ref_type + f"({largest_n}-gram)"
                overlapping_case = f"partial overlap sharing {largest_ngram_percentage}% {ref_type}"
    return (
        overlapping_elements,
        overlapping_case,
        overlap_percentage,
        largest_ngram_percentage,
        max_area,
        min_area,
        total_area,
    )


# x1, y1 = box1[0]
def identify_overlapping_or_nesting_case(
    box_pair: Union[List[Union[List, Tuple]], Tuple[Union[List, Tuple]]],
    label_pair: Union[List[str], Tuple[str]],
    text_pair: Union[List[str], Tuple[str]],
    nested_error_tolerance_px: int = 5,
    sm_overlap_threshold: float = 10.0,
):
    """Identify if there are nested or overlapping elements. If overlapping is present,
    it identifies the case calling the method identify_overlapping_case"""
    box1, box2 = box_pair
    type1, type2 = label_pair
    ix_element1 = "".join([ch for ch in type1 if ch.isnumeric()])
    ix_element2 = "".join([ch for ch in type2 if ch.isnumeric()])
    type1 = type1[3:].strip()
    type2 = type2[3:].strip()
    x_bottom_left_1, y_bottom_left_1 = box1[0]
    x_top_right_1, y_top_right_1 = box1[2]
    x_bottom_left_2, y_bottom_left_2 = box2[0]
    x_top_right_2, y_top_right_2 = box2[2]
    box1_corners = [x_bottom_left_1, y_bottom_left_1, x_top_right_1, y_top_right_1]
    box2_corners = [x_bottom_left_2, y_bottom_left_2, x_top_right_2, y_top_right_2]

    horizontal_overlap = x_bottom_left_1 < x_top_right_2 and x_top_right_1 > x_bottom_left_2
    vertical_overlap = y_bottom_left_1 < y_top_right_2 and y_top_right_1 > y_bottom_left_2
    (
        overlapping_elements,
        overlapping_case,
        overlap_percentage,
        overlap_percentage_total,
        largest_ngram_percentage,
    ) = (
        None,
        None,
        None,
        None,
        None,
    )
    max_area, min_area, total_area = None, None, None

    if horizontal_overlap and vertical_overlap:
        overlap_percentage_total, _, _, _ = calculate_overlap_percentage(
            box1,
            box2,
            intersection_ratio_method="total",
        )
        overlap_percentage, max_area, min_area, total_area = calculate_overlap_percentage(
            box1,
            box2,
            intersection_ratio_method="parent",
        )

        if is_parent_box(box1_corners, box2_corners, add=nested_error_tolerance_px):
            overlapping_elements = [
                f"{type1}(ix={ix_element1})",
                f"{type2}(ix={ix_element2})",
            ]
            overlapping_case = f"nested {type2} in {type1}"
            overlap_percentage = 100

        elif is_parent_box(box2_corners, box1_corners, add=nested_error_tolerance_px):
            overlapping_elements = [
                f"{type2}(ix={ix_element2})",
                f"{type1}(ix={ix_element1})",
            ]
            overlapping_case = f"nested {type1} in {type2}"
            overlap_percentage = 100

        else:
            (
                overlapping_elements,
                overlapping_case,
                overlap_percentage,
                largest_ngram_percentage,
                max_area,
                min_area,
                total_area,
            ) = identify_overlapping_case(
                box_pair,
                label_pair,
                text_pair,
                (ix_element1, ix_element2),
                sm_overlap_threshold=sm_overlap_threshold,
            )
    return (
        overlapping_elements,
        overlapping_case,
        overlap_percentage,
        overlap_percentage_total,
        largest_ngram_percentage,
        max_area,
        min_area,
        total_area,
    )


def catch_overlapping_and_nested_bboxes(
    elements: List["Text"],
    nested_error_tolerance_px: int = 5,
    sm_overlap_threshold: float = 10.0,
) -> (bool, List[Dict]):
    """Catch overlapping and nested bounding boxes cases across a list of elements."""

    num_pages = elements[-1].metadata.page_number
    bounding_boxes = [[] for _ in range(num_pages)]

    text_labels = [[] for _ in range(num_pages)]
    text_content = [[] for _ in range(num_pages)]

    for ix, element in enumerate(elements):
        n_page_to_ix = element.metadata.page_number - 1
        bounding_boxes[n_page_to_ix].append(element.metadata.coordinates.to_dict()["points"])
        text_labels[n_page_to_ix].append(f"{ix}. {element.category}")
        text_content[n_page_to_ix].append(element.text)

    document_with_overlapping_flag = False
    overlapping_cases = []
    for page_number, (page_bboxes, page_labels, page_text) in enumerate(
        zip(bounding_boxes, text_labels, text_content),
        start=1,
    ):
        page_bboxes_combinations = list(combinations(page_bboxes, 2))
        page_labels_combinations = list(combinations(page_labels, 2))
        text_content_combinations = list(combinations(page_text, 2))

        for box_pair, label_pair, text_pair in zip(
            page_bboxes_combinations,
            page_labels_combinations,
            text_content_combinations,
        ):
            (
                overlapping_elements,
                overlapping_case,
                overlap_percentage,
                overlap_percentage_total,
                largest_ngram_percentage,
                max_area,
                min_area,
                total_area,
            ) = identify_overlapping_or_nesting_case(
                box_pair,
                label_pair,
                text_pair,
                nested_error_tolerance_px,
                sm_overlap_threshold,
            )

            if overlapping_case:
                overlapping_cases.append(
                    {
                        "overlapping_elements": overlapping_elements,
                        "overlapping_case": overlapping_case,
                        "overlap_percentage": f"{overlap_percentage}%",
                        "metadata": {
                            "largest_ngram_percentage": largest_ngram_percentage,
                            "overlap_percentage_total": f"{overlap_percentage_total}%",
                            "max_area": f"{round(max_area, 2)}pxˆ2",
                            "min_area": f"{round(min_area, 2)}pxˆ2",
                            "total_area": f"{round(total_area, 2)}pxˆ2",
                        },
                    },
                )
                document_with_overlapping_flag = True

    return document_with_overlapping_flag, overlapping_cases
Back to Directory File Manager