fixtures: minor cleanups to reorder_items

This makes some minor clarity and performance improvements to the code.
This commit is contained in:
Ran Benita 2024-05-27 00:44:05 +03:00
parent 98021838fd
commit 1eee63a891
1 changed files with 65 additions and 60 deletions

View File

@ -21,6 +21,7 @@ from typing import Generic
from typing import Iterable
from typing import Iterator
from typing import List
from typing import Mapping
from typing import MutableMapping
from typing import NoReturn
from typing import Optional
@ -161,6 +162,12 @@ def getfixturemarker(obj: object) -> Optional["FixtureFunctionMarker"]:
)
# Algorithm for sorting on a per-parametrized resource setup basis.
# It is called for Session scope first and performs sorting
# down to the lower scopes such as to minimize number of "high scope"
# setups and teardowns.
@dataclasses.dataclass(frozen=True)
class FixtureArgKey:
argname: str
@ -169,97 +176,91 @@ class FixtureArgKey:
item_cls: Optional[type]
def get_parametrized_fixture_keys(
_V = TypeVar("_V")
OrderedSet = Dict[_V, None]
def get_parametrized_fixture_argkeys(
item: nodes.Item, scope: Scope
) -> Iterator[FixtureArgKey]:
"""Return list of keys for all parametrized arguments which match
the specified scope."""
assert scope is not Scope.Function
try:
callspec: CallSpec2 = item.callspec # type: ignore[attr-defined]
except AttributeError:
return
item_cls = None
if scope is Scope.Session:
scoped_item_path = None
elif scope is Scope.Package:
# Package key = module's directory.
scoped_item_path = item.path.parent
elif scope is Scope.Module:
scoped_item_path = item.path
elif scope is Scope.Class:
scoped_item_path = item.path
item_cls = item.cls # type: ignore[attr-defined]
else:
assert_never(scope)
for argname in callspec.indices:
if callspec._arg2scope[argname] != scope:
continue
item_cls = None
if scope is Scope.Session:
scoped_item_path = None
elif scope is Scope.Package:
# Package key = module's directory.
scoped_item_path = item.path.parent
elif scope is Scope.Module:
scoped_item_path = item.path
elif scope is Scope.Class:
scoped_item_path = item.path
item_cls = item.cls # type: ignore[attr-defined]
else:
assert_never(scope)
param_index = callspec.indices[argname]
yield FixtureArgKey(argname, param_index, scoped_item_path, item_cls)
# Algorithm for sorting on a per-parametrized resource setup basis.
# It is called for Session scope first and performs sorting
# down to the lower scopes such as to minimize number of "high scope"
# setups and teardowns.
def reorder_items(items: Sequence[nodes.Item]) -> List[nodes.Item]:
argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[FixtureArgKey, None]]] = {}
argkeys_by_item: Dict[Scope, Dict[nodes.Item, OrderedSet[FixtureArgKey]]] = {}
items_by_argkey: Dict[Scope, Dict[FixtureArgKey, Deque[nodes.Item]]] = {}
for scope in HIGH_SCOPES:
scoped_argkeys_cache = argkeys_cache[scope] = {}
scoped_argkeys_by_item = argkeys_by_item[scope] = {}
scoped_items_by_argkey = items_by_argkey[scope] = defaultdict(deque)
for item in items:
keys = dict.fromkeys(get_parametrized_fixture_keys(item, scope), None)
if keys:
scoped_argkeys_cache[item] = keys
for key in keys:
scoped_items_by_argkey[key].append(item)
items_dict = dict.fromkeys(items, None)
argkeys = dict.fromkeys(get_parametrized_fixture_argkeys(item, scope))
if argkeys:
scoped_argkeys_by_item[item] = argkeys
for argkey in argkeys:
scoped_items_by_argkey[argkey].append(item)
items_set = dict.fromkeys(items)
return list(
reorder_items_atscope(items_dict, argkeys_cache, items_by_argkey, Scope.Session)
reorder_items_atscope(
items_set, argkeys_by_item, items_by_argkey, Scope.Session
)
)
def fix_cache_order(
item: nodes.Item,
argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[FixtureArgKey, None]]],
items_by_argkey: Dict[Scope, Dict[FixtureArgKey, "Deque[nodes.Item]"]],
) -> None:
for scope in HIGH_SCOPES:
for key in argkeys_cache[scope].get(item, []):
items_by_argkey[scope][key].appendleft(item)
def reorder_items_atscope(
items: Dict[nodes.Item, None],
argkeys_cache: Dict[Scope, Dict[nodes.Item, Dict[FixtureArgKey, None]]],
items_by_argkey: Dict[Scope, Dict[FixtureArgKey, "Deque[nodes.Item]"]],
items: OrderedSet[nodes.Item],
argkeys_by_item: Mapping[Scope, Mapping[nodes.Item, OrderedSet[FixtureArgKey]]],
items_by_argkey: Mapping[Scope, Mapping[FixtureArgKey, "Deque[nodes.Item]"]],
scope: Scope,
) -> Dict[nodes.Item, None]:
) -> OrderedSet[nodes.Item]:
if scope is Scope.Function or len(items) < 3:
return items
ignore: Set[Optional[FixtureArgKey]] = set()
items_deque = deque(items)
items_done: Dict[nodes.Item, None] = {}
scoped_items_by_argkey = items_by_argkey[scope]
scoped_argkeys_cache = argkeys_cache[scope]
scoped_argkeys_by_item = argkeys_by_item[scope]
ignore: Set[FixtureArgKey] = set()
items_deque = deque(items)
items_done: OrderedSet[nodes.Item] = {}
while items_deque:
no_argkey_group: Dict[nodes.Item, None] = {}
no_argkey_items: OrderedSet[nodes.Item] = {}
slicing_argkey = None
while items_deque:
item = items_deque.popleft()
if item in items_done or item in no_argkey_group:
if item in items_done or item in no_argkey_items:
continue
argkeys = dict.fromkeys(
(k for k in scoped_argkeys_cache.get(item, []) if k not in ignore), None
k for k in scoped_argkeys_by_item.get(item, ()) if k not in ignore
)
if not argkeys:
no_argkey_group[item] = None
no_argkey_items[item] = None
else:
slicing_argkey, _ = argkeys.popitem()
# We don't have to remove relevant items from later in the
@ -268,16 +269,20 @@ def reorder_items_atscope(
i for i in scoped_items_by_argkey[slicing_argkey] if i in items
]
for i in reversed(matching_items):
fix_cache_order(i, argkeys_cache, items_by_argkey)
items_deque.appendleft(i)
# Fix items_by_argkey order.
for other_scope in HIGH_SCOPES:
other_scoped_items_by_argkey = items_by_argkey[other_scope]
for argkey in argkeys_by_item[other_scope].get(i, ()):
other_scoped_items_by_argkey[argkey].appendleft(i)
break
if no_argkey_group:
no_argkey_group = reorder_items_atscope(
no_argkey_group, argkeys_cache, items_by_argkey, scope.next_lower()
if no_argkey_items:
reordered_no_argkey_items = reorder_items_atscope(
no_argkey_items, argkeys_by_item, items_by_argkey, scope.next_lower()
)
for item in no_argkey_group:
items_done[item] = None
ignore.add(slicing_argkey)
items_done.update(reordered_no_argkey_items)
if slicing_argkey is not None:
ignore.add(slicing_argkey)
return items_done