#! /usr/bin/python3
# This script builds a decision tree to select a suitable kernel based on the developer's preferences.
# Source repository: forked from xuos/xiuos.
from __future__ import annotations
 | 
						|
from typing import TypeVar, Generic, Tuple, Optional
 | 
						|
import os
 | 
						|
import yaml
 | 
						|
try:
 | 
						|
    from yaml import CLoader as Loader
 | 
						|
except ImportError:
 | 
						|
    from yaml import Loader
 | 
						|
 | 
						|
# Root directories come from the environment. os.environ.get() returns None
# when a variable is unset, in which case the f-strings below yield paths
# beginning with the literal text "None".
ubiquitous_dir = os.environ.get('UBIQUITOUS_ROOT')
kernel_dir = os.environ.get('KERNEL_ROOT')

# Directory whose entries are listed and parsed by load_kernel_features().
YAMLS_PATH = f"{ubiquitous_dir}/feature_yaml/"
# YAML file parsed by load_required_features().
REQUIREMENT_PATH = f"{kernel_dir}/requirement.yaml"

# Names of the top-level YAML sections this script looks up.
COMPONENT_KEY = "Feature"      # root section of the feature tree
PARAMETER_KEY = "Parameter"    # section holding the parameter values
MANUAL_KEY = "Kernel"          # when present in the requirement, a kernel is picked manually

# tree node for kernel features
V = TypeVar("V")


class TreeNode(Generic[V]):
    """A named, valued node that owns a dict of child nodes.

    Attributes:
        depth: level number supplied by whoever creates the node.
        name: key under which the node is known.
        value: payload of type ``V``.
        children: dict mapping a child's name to the child ``TreeNode``.
    """

    def __init__(self, depth: int, name: str, value: V):
        self.depth = depth
        self.name = name
        self.value = value
        # child name -> TreeNode; a dict keeps insertion order
        self.children = dict()

    def __repr__(self):
        """Return one text line for this node followed by the repr of each subtree."""
        parts = [
            f"L{self.depth}: name {self.name} - value {self.value} - {len(self.children)} children\n"
        ]
        # children are rendered recursively, in insertion order
        parts.extend(repr(child) for child in self.children.values())
        return "".join(parts)

    def node_number(self):
        """Return the number of nodes in the subtree rooted here, this node included."""
        # a leaf contributes only itself because the sum over no children is 0
        return 1 + sum(child.node_number() for child in self.children.values())


# this class should be constructed from the yaml file
class Kernel:
    """Feature tree and parameter tree of one kernel, built from a parsed YAML mapping.

    The mapping is expected to hold a ``COMPONENT_KEY`` section (feature name
    -> value) and a ``PARAMETER_KEY`` section (parameter name -> value). Any
    feature whose name is also a top-level key of the mapping gets that
    section attached as its subtree. A missing or empty section produces a
    root node without children instead of raising.
    """

    def __init__(self, name, feature_yaml: dict):
        self.name = name
        self.features_root: Optional[TreeNode[bool]] = None
        self.parameters_root: Optional[TreeNode[int]] = None
        self.extract_components(feature_yaml)
        self.extract_parameters(feature_yaml)

    def __repr__(self):
        return f"{self.name}\nFeature Tree:\n{self.features_root}Parameter Tree:\n{self.parameters_root}\n"

    def extract_components(self, feature_yaml: dict):
        """Build ``self.features_root`` from the ``COMPONENT_KEY`` section."""
        # an empty document parses to None; treat it like an empty mapping
        feature_yaml = feature_yaml or {}
        self.features_root = TreeNode(0, COMPONENT_KEY, True)
        # a section that is absent, or present without content (None), has no entries
        components = feature_yaml.get(COMPONENT_KEY) or {}
        for k, v in components.items():
            node = TreeNode(1, k, v)
            if k in feature_yaml:
                # the feature has its own top-level section: attach it as a subtree
                self.build_sub_tree(2, node, feature_yaml)
            self.features_root.children[k] = node

    def build_sub_tree(self, depth, node: "TreeNode[bool]", feature_yaml: dict):
        """Attach the entries of section ``feature_yaml[node.name]`` as children of ``node``.

        Recurses for every child whose name is itself a top-level key, using
        ``depth + 1`` for the next level. Raises ``KeyError`` when
        ``node.name`` is not a key of ``feature_yaml``.
        """
        section = feature_yaml[node.name]
        if section is None:
            # section header present but without entries
            return
        for k, v in section.items():
            child = TreeNode(depth, k, v)
            if child.name in feature_yaml:
                self.build_sub_tree(depth + 1, child, feature_yaml)
            node.children[k] = child

    def extract_parameters(self, feature_yaml: dict):
        """Build ``self.parameters_root`` (a flat, one-level tree) from the ``PARAMETER_KEY`` section."""
        feature_yaml = feature_yaml or {}
        self.parameters_root = TreeNode(0, PARAMETER_KEY, 0)
        parameters = feature_yaml.get(PARAMETER_KEY) or {}
        for k, v in parameters.items():
            self.parameters_root.children[k] = TreeNode(1, k, v)

    def check(self, requirements_root, root) -> Tuple[bool, int]:
        """Test whether the requirement tree is contained in the tree under ``root``.

        Every node below ``requirements_root`` must have a same-named node at
        the same position below ``root`` whose value satisfies ``compare``.

        Returns:
            ``(ok, distance)``. ``distance`` is the sum of squared differences
            between required and provided values over the nodes matched so
            far; only numeric pairs (bools included) contribute. On the first
            mismatch the walk stops and ``(False, distance)`` is returned.
        """
        numeric = (int, float)  # bool is a subclass of int, so booleans count too
        distance = 0
        # explicit stack of (requirement node, candidate node) pairs still to descend into
        pending = [(requirements_root, root)]
        while pending:
            r_node, s_node = pending.pop()
            for c, required in r_node.children.items():
                if c not in s_node.children:
                    return False, distance
                provided = s_node.children[c]
                if not self.compare(c, provided.value, required.value):
                    return False, distance
                pending.append((required, provided))
                # values such as strings are matched by compare() but have no distance
                if isinstance(required.value, numeric) and isinstance(provided.value, numeric):
                    distance += (required.value - provided.value) ** 2
        return True, distance

    def compare(self, name, v1, v2):
        """Return whether ``v1`` satisfies ``v2`` under the rule encoded in ``name``.

        ``check`` passes the candidate tree's value as ``v1`` and the
        requirement's value as ``v2``. A name ending in ``GREATER_THAN``
        requires ``v1 >= v2``, one ending in ``LESS_THAN`` requires
        ``v1 <= v2``, and any other name requires equality.
        """
        if name.endswith("GREATER_THAN"):
            return v1 >= v2
        if name.endswith("LESS_THAN"):
            return v1 <= v2
        return v1 == v2


# load the requirements from the yaml file
def load_required_features() -> dict:
    """Parse ``REQUIREMENT_PATH`` and return its content as a dict.

    The parsed content is printed before it is returned. An empty document,
    which PyYAML parses to ``None``, is returned as an empty dict so the
    declared return type holds.

    Raises:
        OSError: if the requirement file cannot be opened.
        yaml.YAMLError: if the file is not valid YAML.
    """
    # YAML text is read as UTF-8 explicitly rather than in the locale's encoding
    with open(REQUIREMENT_PATH, 'r', encoding="utf-8") as f:
        content = f.read()
    # NOTE(review): Loader/CLoader is PyYAML's full loader, which can build
    # arbitrary Python objects from tagged input. If this file can come from
    # an untrusted source, switch to SafeLoader - confirm no custom tags are used.
    feature_yaml = yaml.load(content, Loader=Loader)
    if feature_yaml is None:
        feature_yaml = {}
    print(feature_yaml)
    return feature_yaml


# load kernel yamls from ubiquitous directory and build Kernel class
def load_kernel_features() -> "dict[str, Kernel]":
    """Parse every entry of ``YAMLS_PATH`` into a ``Kernel``.

    Returns:
        A dict mapping kernel name to ``Kernel``. The kernel name is the file
        name with a trailing ``.yaml`` removed; other file names are used
        unchanged.

    Raises:
        OSError: if the directory or one of its entries cannot be read.
        yaml.YAMLError: if an entry is not valid YAML.
    """
    suffix = ".yaml"
    kernels = {}
    # sorted() makes the iteration order, and therefore tie-breaking during
    # selection, independent of the order the filesystem lists entries in
    for file_name in sorted(os.listdir(YAMLS_PATH)):
        # str.rstrip(".yaml") would strip any trailing run of the characters
        # '.', 'y', 'a', 'm', 'l' (e.g. "xiuos_lm.yaml" -> "xiuos_"), so the
        # suffix is removed explicitly instead
        if file_name.endswith(suffix):
            kernel_name = file_name[:-len(suffix)]
        else:
            kernel_name = file_name
        with open(os.path.join(YAMLS_PATH, file_name), 'r', encoding="utf-8") as f:
            content = f.read()
        # NOTE(review): full PyYAML loader - see SafeLoader if these files are untrusted
        feature_yaml = yaml.load(content, Loader=Loader)
        kernels[kernel_name] = Kernel(kernel_name, feature_yaml)
    return kernels


# recommend a suitable kernel according to the requirement
def select_kernel(requirement: dict, kernels: "dict[str, Kernel]") -> str:
    """Return the name of the kernel that best satisfies ``requirement``.

    A kernel qualifies when both requirement trees (features and parameters)
    are contained in the kernel's corresponding trees, as decided by
    ``Kernel.check``. Among qualifying kernels the one with the fewest
    feature-tree nodes wins; ties are broken by the smaller parameter
    distance, then by iteration order of ``kernels``. One line per kernel with
    the check results is printed along the way.

    Args:
        requirement: parsed requirement YAML mapping.
        kernels: mapping of kernel name to ``Kernel``.

    Returns:
        The selected kernel's name, or the text "Cannot find suitable kernel"
        when no kernel qualifies.
    """
    selected = None
    selected_node_number = 0
    selected_distance = 0
    # the requirement is turned into the same tree form as the kernels
    required = Kernel("requirement", requirement)
    for name, kernel in kernels.items():
        # here should be a tree matching algorithm
        # requirement trees (both features and parameters)
        # should be subtrees of the recommended kernel
        pass_features, _ = kernel.check(required.features_root, kernel.features_root)
        pass_parameters, distance = kernel.check(required.parameters_root, kernel.parameters_root)
        print(name, pass_features, pass_parameters, distance)
        if not (pass_features and pass_parameters):
            continue
        node_number = kernel.features_root.node_number()
        # Lexicographic order: node count first, distance second. The distance
        # is tracked in its own variable; storing it in selected_node_number
        # would clobber the node count and corrupt every later comparison.
        if selected is None or (node_number, distance) < (selected_node_number, selected_distance):
            selected = kernel
            selected_node_number = node_number
            selected_distance = distance
    if selected is None:
        return "Cannot find suitable kernel"
    return selected.name


if __name__ == "__main__":
    # an empty requirement file may parse to None; treat it as an empty mapping
    requirement = load_required_features() or {}
    # a manual choice is honoured only when the section actually has an entry;
    # an absent or empty section falls through to automatic selection
    manual = requirement.get(MANUAL_KEY)
    if manual:
        # first key of the section with its first 7 characters dropped
        # NOTE(review): presumably the keys carry a fixed 7-character prefix - confirm against the requirement file format
        selected = list(manual.keys())[0][7:]
    else:
        kernels = load_kernel_features()
        selected = select_kernel(requirement, kernels)
    print(f"Selected kernel is: {selected}")