"""In-memory model of a DCR graph and a loader for its XML representation."""
from __future__ import annotations

from enum import Enum

import xmltodict


def listify(element):
    """Return *element* unchanged when it is a list, otherwise wrap it in a list.

    Used to normalise parsed XML children, which may arrive either as a
    single item or as a list of items.
    """
    if isinstance(element, list):
        return element
    return [element]


def _section_items(container, section, child):
    """Return the *child* entries stored under ``container[section]`` as a list.

    A missing container, a missing section and a section whose value is
    ``None`` (an empty XML element) are all treated as "no entries".
    """
    if not container:
        return []
    body = container.get(section)
    if body is None:
        return []
    return listify(body[child])


class Event:
    """A single event of the graph together with its marking and relations.

    The marking consists of the ``executed``, ``included`` and ``pending``
    flags, all of which start out ``False``.
    """

    def __init__(self, _id: str, name: str, id_dict: dict,
                 parent: Process | None = None) -> None:
        """Create the event and register it in *id_dict* under *_id*.

        Args:
            _id: Identifier of the event.
            name: Label of the event; stored lower-cased.
            id_dict: Shared id -> event registry; mutated in place.
            parent: Enclosing process, or ``None`` for a top-level event.
        """
        self._id = _id
        # Registered immediately so markings and relations can be resolved by id.
        id_dict[_id] = self
        self.name = name.lower()
        self.pending = False
        self.executed = False
        self.included = False
        # Relations in which this event is the target / the source.
        self.relations_to: list[Relationship] = []
        self.relations_from: list[Relationship] = []
        self.parent = parent

    def execute(self):
        """Mark the event executed, clear its pending flag and fire its outgoing relations."""
        self.executed = True
        self.pending = False
        for relationship in self.relations_from:
            relationship.execute()

    @property
    def enabled(self):
        """Whether the event may currently be executed.

        The event must be included (and its parent, if any, enabled), every
        incoming condition must come from a source that is executed or not
        included, and every incoming milestone must come from a source that
        is not pending or not included.
        """
        included = self.included
        if self.parent is not None:
            included = included and self.parent.enabled

        conditions_met = all(
            relation.source.executed or not relation.source.included
            for relation in self.relations_to
            if relation.type == RelationsshipType.condition
        )
        milestones_clear = all(
            not relation.source.pending or not relation.source.included
            for relation in self.relations_to
            if relation.type == RelationsshipType.milestone
        )
        return included and conditions_met and milestones_clear

    def enabled_list(self):
        """Return ``[self]`` when the event is enabled, otherwise an empty list."""
        # Uses ``enabled`` (not just ``included``) so that unmet conditions and
        # milestones keep the event out of the list, as for Process.
        return [self] if self.enabled else []

    def __repr__(self) -> str:
        return self.name

    def pending_list(self):
        """Return ``[self]`` when the event is both pending and included, else ``[]``."""
        return [self] if self.pending and self.included else []


class Process(Event):
    """An event that contains other events (and possibly nested processes)."""

    def __init__(self, _id: str, name: str, label_mappings: dict, events: list,
                 id_dict: dict, parent: Process | None = None) -> None:
        """Create the process and build its children from *events*.

        Args:
            _id: Identifier of the process.
            name: Label of the process; stored lower-cased.
            label_mappings: Mapping from event id to label.
            events: Parsed event entries that are direct children of this process.
            id_dict: Shared id -> event registry; mutated in place.
            parent: Enclosing process, or ``None`` for the root.
        """
        super().__init__(_id, name, id_dict, parent)
        self.process_process(label_mappings, events, id_dict)

    def process_process(self, label_mappings: dict, events: list, id_dict):
        """Populate ``self.events`` from the parsed *events* entries.

        An entry carrying an ``"@type"`` key becomes a nested Process built
        from its own ``"event"`` children; any other entry becomes an Event.

        Raises:
            KeyError: If an entry has no ``"@id"`` or its id has no label mapping.
        """
        self.events = []
        for entry in events:
            _id = entry["@id"]
            label = label_mappings[_id]
            if "@type" in entry:
                # A nested process without children has no "event" key at all.
                nested = entry.get("event")
                children = [] if nested is None else listify(nested)
                child = Process(_id, label, label_mappings, children, id_dict, self)
            else:
                child = Event(_id, label, id_dict, self)
            self.events.append(child)

    def enabled_list(self):
        """Return the enabled events of this subtree.

        Nothing is returned when the process itself is not enabled. The
        process is listed first unless its id is the empty string (the id
        given to the root by ``xml_to_dcr``).
        """
        if not self.enabled:
            return []
        enabled_events = [self] if self._id != "" else []
        for child in self.events:
            enabled_events += child.enabled_list()
        return enabled_events

    def pending_list(self):
        """Return the pending and included events of this subtree, self first."""
        pending_events = []
        if self.pending and self.included:
            pending_events.append(self)
        for child in self.events:
            pending_events += child.pending_list()
        return pending_events


class RelationsshipType(Enum):
    """Kinds of relation between two events.

    ``xml_to_dcr`` derives the XML section name from each member's name
    (``<name>s``) and processes the members in definition order.
    """

    condition = 0
    response = 1
    coresponse = 2
    exclude = 3
    include = 4
    milestone = 5
    update = 6
    spawn = 7
    templateSpawn = 8


class Relationship:
    """A typed, directed relation from a source event to a target event."""

    def __init__(self, source: Event, target: Event, type) -> None:
        self.source = source
        self.target = target
        self.type = type

    def execute(self):
        """Apply the effect that executing the source has on the target.

        Only three kinds change the target: ``response`` makes it pending,
        ``exclude`` clears its included flag and ``include`` sets it.
        """
        if self.type == RelationsshipType.response:
            self.target.pending = True
        elif self.type == RelationsshipType.exclude:
            self.target.included = False
        elif self.type == RelationsshipType.include:
            self.target.included = True
        # condition / milestone: no effect here; they are evaluated in Event.enabled.
        # coresponse / update / spawn / templateSpawn: currently have no effect.


class Graph:
    """A root process together with all relations and the id -> event registry."""

    def __init__(self, process: Process, relationships: list[Relationship],
                 id_dict: dict) -> None:
        self.process = process
        self.relationships = relationships
        self.id_dict = id_dict

    @property
    def enabled(self):
        """Enabled events of the whole graph (see ``Process.enabled_list``)."""
        return self.process.enabled_list()

    @property
    def pending(self):
        """Pending and included events of the whole graph."""
        return self.process.pending_list()


def xml_to_dcr(xml_file) -> Graph:
    """Load the graph stored in *xml_file* and return it as a Graph.

    The file is parsed with xmltodict and must have a ``dcrgraph`` root.
    Events and label mappings are read from ``specification/resources``,
    the marking from ``runtime/marking`` and the relations from
    ``specification/constraints``. Marking and constraint sections that are
    missing or empty are treated as containing no entries.

    Args:
        xml_file: Path of the XML file to read.

    Returns:
        A Graph whose root process has the empty string as id and name and
        is marked as included.

    Raises:
        KeyError: If a required element is missing, or a marking or relation
            refers to an event id that was not declared.
    """
    with open(xml_file) as file_pointer:
        dcr_dict = xmltodict.parse(file_pointer.read())["dcrgraph"]

    specification = dcr_dict["specification"]
    resources = specification["resources"]

    label_mappings = {
        mapping["@eventId"]: mapping["@labelId"]
        for mapping in listify(resources["labelMappings"]["labelMapping"])
    }

    id_dict: dict[str, Event] = {}
    root = Process("", "", label_mappings, listify(resources["events"]["event"]), id_dict)
    root.included = True

    # Apply the stored marking to the freshly built events.
    marking = dcr_dict["runtime"]["marking"]
    for section, flag in (
        ("executed", "executed"),
        ("included", "included"),
        ("pendingResponses", "pending"),
    ):
        for entry in _section_items(marking, section, "event"):
            setattr(id_dict[entry["@id"]], flag, True)

    # Each relation kind lives in a section named "<kind>s" whose children are
    # tagged "<kind>"; enum iteration follows definition order.
    constraints = specification["constraints"]
    relationships: list[Relationship] = []
    for relation_type in RelationsshipType:
        kind = relation_type.name
        for entry in _section_items(constraints, f"{kind}s", kind):
            source = id_dict[entry["@sourceId"]]
            target = id_dict[entry["@targetId"]]
            relationships.append(Relationship(source, target, relation_type))

    for relationship in relationships:
        relationship.source.relations_from.append(relationship)
        relationship.target.relations_to.append(relationship)

    # Pass the full list, not the loop variable left over from the loop above.
    return Graph(root, relationships, id_dict)