220 lines
7.1 KiB
Python
220 lines
7.1 KiB
Python
from __future__ import annotations
|
|
from enum import Enum
|
|
|
|
import xmltodict
|
|
|
|
def listify(element):
|
|
if isinstance(element, list):
|
|
return element
|
|
return [element]
|
|
|
|
class Event():
|
|
def __init__(self, _id:str, name:str, id_dict:dict, parent:Process=None) -> None:
|
|
self._id = _id
|
|
self.name = name.lower()
|
|
self.pending = False
|
|
self.executed = False
|
|
self.included = False
|
|
self.relations_to : list[Relationship] = []
|
|
self.relations_from : list[Relationship] = []
|
|
self.parent = parent
|
|
|
|
id_dict[_id] = self
|
|
|
|
def execute(self):
|
|
self.executed = True
|
|
self.pending = False
|
|
for relationship in self.relations_from:
|
|
relationship.execute()
|
|
|
|
@property
|
|
def enabled(self):
|
|
if self.parent is not None:
|
|
included = self.included and self.parent.enabled
|
|
else:
|
|
included = self.included
|
|
|
|
no_conditions = all(
|
|
condition.source.executed or not condition.source.included
|
|
for condition in [
|
|
relation
|
|
for relation in self.relations_to
|
|
if relation.type == RelationsshipType.condition
|
|
]
|
|
)
|
|
no_milestones = all(
|
|
not milestone.source.pending or not milestone.source.included
|
|
for milestone in [
|
|
relation
|
|
for relation in self.relations_to
|
|
if relation.type == RelationsshipType.milestone
|
|
]
|
|
)
|
|
|
|
return included and no_conditions and no_milestones
|
|
|
|
def enabled_list(self):
|
|
if self.included:
|
|
return [self]
|
|
else:
|
|
return []
|
|
|
|
def __repr__(self) -> str:
|
|
return self.name
|
|
|
|
def pending_list(self):
|
|
if self.pending and self.included:
|
|
return [self]
|
|
else:
|
|
return []
|
|
|
|
class Process(Event):
|
|
def __init__(self, _id:str, name:str, label_mappings: dict, events: list, id_dict: dict,parent:Process=None) -> None:
|
|
super().__init__(_id,name,id_dict,parent)
|
|
self.process_process(label_mappings, events, id_dict)
|
|
|
|
def process_process(self, label_mappings: dict, events: list, id_dict):
|
|
self.events = []
|
|
for event in events:
|
|
_id = event["@id"]
|
|
label = label_mappings[_id]
|
|
if "@type" in event:
|
|
new_event = Process(_id, label, label_mappings, listify(event["event"]), id_dict, self)
|
|
else:
|
|
new_event = Event(_id, label, id_dict, self)
|
|
|
|
self.events.append(new_event)
|
|
|
|
def enabled_list(self):
|
|
if self.enabled:
|
|
enabled_events = [self] if self._id != "" else []
|
|
for event in self.events:
|
|
enabled_events += event.enabled_list()
|
|
else:
|
|
enabled_events = []
|
|
|
|
return enabled_events
|
|
|
|
def pending_list(self):
|
|
pending_events = []
|
|
if self.pending and self.included:
|
|
pending_events.append(self)
|
|
|
|
for event in self.events:
|
|
pending_events += event.pending_list()
|
|
|
|
return pending_events
|
|
|
|
class RelationsshipType(Enum):
|
|
condition = 0
|
|
response = 1
|
|
coresponse = 2
|
|
exclude = 3
|
|
include = 4
|
|
milestone = 5
|
|
update = 6
|
|
spawn = 7
|
|
templateSpawn = 8
|
|
|
|
class Relationship():
|
|
def __init__(self, source:Event, target:Event, type) -> None:
|
|
self.source = source
|
|
self.target = target
|
|
self.type = type
|
|
|
|
def execute(self):
|
|
if self.type == RelationsshipType.condition:
|
|
pass # does nothing (Since the source is executed as well)
|
|
elif self.type == RelationsshipType.response:
|
|
self.target.pending = True
|
|
elif self.type == RelationsshipType.coresponse:
|
|
pass # Don't know what this one does
|
|
elif self.type == RelationsshipType.exclude:
|
|
self.target.included = False
|
|
elif self.type == RelationsshipType.include:
|
|
self.target.included = True
|
|
elif self.type == RelationsshipType.milestone:
|
|
pass # does nothing (Since the source is executed as well)
|
|
elif self.type == RelationsshipType.update:
|
|
pass # Don't know what this one does
|
|
elif self.type == RelationsshipType.spawn:
|
|
pass # We figured it was outside the assignments scope to implement this
|
|
elif self.type == RelationsshipType.templateSpawn:
|
|
pass # Don't know what this one does
|
|
|
|
class Graph():
|
|
def __init__(self, process:Process, relationships:list[Relationship], id_dict: dict) -> None:
|
|
self.process = process
|
|
self.relationships = relationships
|
|
self.id_dict = id_dict
|
|
|
|
@property
|
|
def enabled(self):
|
|
return self.process.enabled_list()
|
|
|
|
@property
|
|
def pending(self):
|
|
return self.process.pending_list()
|
|
|
|
def xml_to_dcr(xml_file):
|
|
with open(xml_file) as file_pointer:
|
|
dcr_dict = xmltodict.parse(file_pointer.read())["dcrgraph"]
|
|
|
|
label_mappings = {
|
|
lm["@eventId"]:lm["@labelId"]
|
|
for lm in listify(dcr_dict["specification"]["resources"]["labelMappings"]["labelMapping"])
|
|
}
|
|
|
|
id_dict: dict[str,Event] = {}
|
|
graph = Process("", "", label_mappings, listify(dcr_dict["specification"]["resources"]["events"]["event"]), id_dict)
|
|
graph.included = True
|
|
|
|
def extract_markings(key):
|
|
return [
|
|
_id["@id"]
|
|
for _id in listify(dcr_dict["runtime"]["marking"][key]["event"])
|
|
] if dcr_dict["runtime"]["marking"][key] is not None else []
|
|
|
|
executed = extract_markings("executed")
|
|
for _id in executed:
|
|
id_dict[_id].executed = True
|
|
|
|
included = extract_markings("included")
|
|
for _id in included:
|
|
id_dict[_id].included = True
|
|
|
|
pending = extract_markings("pendingResponses")
|
|
for _id in pending:
|
|
id_dict[_id].pending = True
|
|
|
|
|
|
def extract_relationships(key):
|
|
return [
|
|
(r["@sourceId"], r["@targetId"])
|
|
for r in listify(dcr_dict["specification"]["constraints"][key][key[:-1]])
|
|
] if dcr_dict["specification"]["constraints"][key] is not None else []
|
|
|
|
conditions = extract_relationships("conditions")
|
|
responses = extract_relationships("responses")
|
|
coresponses = extract_relationships("coresponses")
|
|
excludes = extract_relationships("excludes")
|
|
includes = extract_relationships("includes")
|
|
milestones = extract_relationships("milestones")
|
|
updates = extract_relationships("updates")
|
|
spawns = extract_relationships("spawns")
|
|
templateSpawns = extract_relationships("templateSpawns")
|
|
|
|
relationships: list[Relationship] = []
|
|
|
|
for i, relationship_list in enumerate([conditions,responses,coresponses,excludes,includes,milestones,updates,spawns,templateSpawns]):
|
|
for relationship in relationship_list:
|
|
source = id_dict[relationship[0]]
|
|
target = id_dict[relationship[1]]
|
|
relationships.append(Relationship(source,target, RelationsshipType(i)))
|
|
|
|
for relationship in relationships:
|
|
relationship.source.relations_from.append(relationship)
|
|
relationship.target.relations_to.append(relationship)
|
|
|
|
return Graph(graph, relationship, id_dict)
|