from collections import Counter
from functools import reduce
from typing import Any, Dict, List, NamedTuple, Union
from ska_ser_skallop.mvp_control.describing.mvp_names import DomainList
DevName = str
DevState = str
DevStates = Dict[DevName, DevState]
[docs]class DevicesInState(NamedTuple):
count: int
items: List[DevName]
Grouping = Dict[DevState, DevicesInState]
[docs]class DevList(list):
def __init__(self, *items: Union[str, List[str]]) -> None:
the_list = []
for item in items:
the_list = [*the_list, *item] if isinstance(item, list) else [*the_list, item]
super().__init__(the_list)
def __sub__(self, other: List[str]):
return DevList([item for item in self if item not in other])
def __add__(self, other: List[str]):
other = other if isinstance(other, DevList) else DevList(other)
return DevList([*self, *(other - self)])
[docs] def filter_out(self, domain: DomainList, *tags: str):
selector = domain.filter(*tags).list
return self.__sub__(selector)
[docs] def select(self, domain: DomainList, *tags: str):
selector = domain.filter(*tags).list
return DevList([item for item in self if item in selector])
[docs]class Inspection:
def __init__(self, name: str, val: str) -> None:
self._val = val
self._name = name
[docs] def is_in_state(self, *state: str) -> bool:
return self._val in state
[docs] def in_state(self, *state: str) -> Dict:
return {self._name: self._val} if self._val in state else {}
[docs] def not_in_state(self, *state: str) -> Dict:
return {self._name: self._val} if self._val not in state else {}
[docs] def generate_faulty_message(self, *state: str) -> List:
faulty = self.not_in_state(*state)
return [
f"expected {device} to be in {state} but instead was {actual}"
for device, actual in faulty.items()
]
[docs]class ListInspection:
def __init__(self, the_list: Dict[DevName, DevState]) -> None:
self.list = the_list
[docs] def are_all_the_same(self) -> bool:
for value in self.list.values():
if any(self.are_not_in_state(value)):
return False
return True
@property
def value(self) -> Union[List[DevState], DevState]:
value = list(self.get_grouping().keys())
if len(value) == 1:
return value[0]
return value
@property
def devices(self) -> DevList:
return DevList(list(self.list.keys()))
[docs] def get_grouping(self) -> Grouping:
counts = Counter(self.list.values())
grouping = {}
for state, count in counts.items():
keys = [key for key in self.list.keys() if self.list[key] == state]
grouping[state] = DevicesInState(count, keys)
return grouping
[docs] def are_in_state(self, *state: str) -> List[bool]:
return [val in state for val in self.list.values()]
[docs] def in_state(self, *state: str) -> DevList:
return DevList([key for key, val in self.list.items() if val in state])
[docs] def not_in_state(self, *state: str) -> DevList:
return DevList([key for key, val in self.list.items() if val not in state])
[docs] def are_not_in_state(self, *state: str) -> Dict[DevName, DevState]:
return {key: val for key, val in self.list.items() if val not in state}
def __bool__(self) -> bool:
return self.list != {}
[docs] def generate_faulty_message(self, *state: str) -> List:
faulty = self.are_not_in_state(*state)
return [
f"expected {device} to be in {state} but instead was {actual}"
for device, actual in faulty.items()
]
def __add__(self, other):
inspections = ListInspections()
inspections["1"] = self
inspections["2"] = other
return inspections._flatten()
def __sub__(self, other: "ListInspection"):
first_list = self.list
second_list = other.list
devices_from_second_list = second_list.keys()
return ListInspection(
{
device: device_state
for device, device_state in first_list
if device not in devices_from_second_list
}
)
[docs]class ListInspections(dict):
"""A composite structure of multiple ListInspection objects (or recursively ListInspections) that is
'ducked typed' into a listInspection class. I.e. it allows to be handled in the same way as the more
simpler ListInspection (are_in_state,in_state etc ) except the multiplicity is implicitly taken into account
Args:
dict ([type]): [description]
"""
def _flatten(self) -> ListInspection:
inspections = [
val.list for val in self.values() if isinstance(val, (ListInspection, ListInspections))
]
if inspections:
return ListInspection(reduce(lambda x, y: {**x, **y}, inspections))
return ListInspection({})
@property
def list(self) -> Dict:
return self._flatten().list
@property
def value(self) -> Union[List[DevState], DevState]:
return self._flatten().value
@property
def devices(self) -> DevList:
return self._flatten().devices
[docs] def are_in_state(self, *state: str) -> List:
return self._flatten().are_in_state(*state)
[docs] def in_state(self, *state: str) -> DevList:
return self._flatten().in_state(*state)
[docs] def not_in_state(self, *state: str) -> DevList:
return self._flatten().not_in_state(*state)
[docs] def are_not_in_state(self, *state: str) -> Dict[DevName, DevState]:
return self._flatten().are_not_in_state(*state)
def __setitem__(self, k: str, v: Any) -> None:
if not isinstance(v, ListInspections) and not isinstance(v, ListInspection):
if isinstance(v, dict):
v = ListInspection(v)
super().__setitem__(k, v)
def __add__(self, other):
agg = ListInspections()
agg["1"] = self
agg["2"] = other
return agg
def __sub__(self, other: Union[ListInspection, "ListInspections"]) -> ListInspection:
first_inspection = self._flatten()
second_inspection = other._flatten() if isinstance(other, ListInspections) else other
return first_inspection - second_inspection
[docs] def are_all_the_same(self) -> bool:
return self._flatten().are_all_the_same()
[docs] def get_grouping(self) -> Dict:
return self._flatten().get_grouping()
[docs] def generate_faulty_message(self, *state: str) -> List:
faulty = self._flatten().are_not_in_state(*state)
return [
f"expected {device} to be in {state} but instead was {actual}"
for device, actual in faulty.items()
]