diff --git a/data_diff/diff_tables.py b/data_diff/diff_tables.py index 518cc122..6de54898 100644 --- a/data_diff/diff_tables.py +++ b/data_diff/diff_tables.py @@ -6,7 +6,7 @@ from enum import Enum from contextlib import contextmanager from operator import methodcaller -from typing import Dict, Set, Tuple, Iterator, Optional +from typing import Dict, Set, List, Tuple, Iterator, Optional from concurrent.futures import ThreadPoolExecutor, as_completed import attrs @@ -28,6 +28,7 @@ class Algorithm(Enum): DiffResult = Iterator[Tuple[str, tuple]] # Iterator[Tuple[Literal["+", "-"], tuple]] +DiffResultList = Iterator[List[Tuple[str, tuple]]] @attrs.define(frozen=False) @@ -187,6 +188,7 @@ class TableDiffer(ThreadBase, ABC): ignored_columns1: Set[str] = attrs.field(factory=set) ignored_columns2: Set[str] = attrs.field(factory=set) _ignored_columns_lock: threading.Lock = attrs.field(factory=threading.Lock, init=False) + yield_list: bool = False def diff_tables(self, table1: TableSegment, table2: TableSegment, info_tree: InfoTree = None) -> DiffResultWrapper: """Diff the given tables. @@ -255,7 +257,9 @@ def _diff_tables_wrapper(self, table1: TableSegment, table2: TableSegment, info_ def _validate_and_adjust_columns(self, table1: TableSegment, table2: TableSegment) -> None: pass - def _diff_tables_root(self, table1: TableSegment, table2: TableSegment, info_tree: InfoTree) -> DiffResult: + def _diff_tables_root( + self, table1: TableSegment, table2: TableSegment, info_tree: InfoTree + ) -> DiffResult | DiffResultList: return self._bisect_and_diff_tables(table1, table2, info_tree) @abstractmethod @@ -300,9 +304,9 @@ def _bisect_and_diff_tables(self, table1: TableSegment, table2: TableSegment, in f"size: table1 <= {btable1.approximate_size()}, table2 <= {btable2.approximate_size()}" ) - ti = ThreadedYielder(self.max_threadpool_size) + ti = ThreadedYielder(self.max_threadpool_size, self.yield_list) # Bisect (split) the table into segments, and diff them recursively. - ti.submit(self._bisect_and_diff_segments, ti, btable1, btable2, info_tree) + ti.submit(self._bisect_and_diff_segments, ti, btable1, btable2, info_tree, priority=999) # Now we check for the second min-max, to diff the portions we "missed". # This is achieved by subtracting the table ranges, and dividing the resulting space into aligned boxes. @@ -326,7 +330,7 @@ def _bisect_and_diff_tables(self, table1: TableSegment, table2: TableSegment, in for p1, p2 in new_regions: extra_tables = [t.new_key_bounds(min_key=p1, max_key=p2) for t in (table1, table2)] - ti.submit(self._bisect_and_diff_segments, ti, *extra_tables, info_tree) + ti.submit(self._bisect_and_diff_segments, ti, *extra_tables, info_tree, priority=999) return ti diff --git a/data_diff/thread_utils.py b/data_diff/thread_utils.py index 55b2d9d5..ba292ef5 100644 --- a/data_diff/thread_utils.py +++ b/data_diff/thread_utils.py @@ -56,19 +56,24 @@ class ThreadedYielder(Iterable): _futures: deque _yield: deque = attrs.field(alias="_yield") # Python keyword! _exception: Optional[None] + yield_list: bool - def __init__(self, max_workers: Optional[int] = None): + def __init__(self, max_workers: Optional[int] = None, yield_list: bool = False): super().__init__() self._pool = PriorityThreadPoolExecutor(max_workers) self._futures = deque() self._yield = deque() self._exception = None + self.yield_list = yield_list def _worker(self, fn, *args, **kwargs): try: res = fn(*args, **kwargs) if res is not None: - self._yield += res + if self.yield_list: + self._yield.append(res) + else: + self._yield += res except Exception as e: self._exception = e