Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,4 @@ cython_debug/
marimo/_static/
marimo/_lsp/
__marimo__/
examples/automator-license.lic
29 changes: 29 additions & 0 deletions examples/Drone.sysml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package MechanicalObjectExample {
abstract item def MechanicalObject {
attribute mass :> ISQ::mass;
}

abstract item mechanicalObjects[*] : MechanicalObject;

metadata def <mec> MechanicalObjectMetadata :> Metaobjects::SemanticMetadata {
:>> baseType = mechanicalObjects meta SysML::Usage;
}

part def DroneSystem {
part def Drone {
#mec part battery {
attribute :>> mass = 2.5 [SI::kg];
item shape[1] : ShapeItems::Box :>> shape {
attribute :>> length = 10 [SI::cm];
attribute :>> width = 4 [SI::cm];
attribute :>> height = 3 [SI::cm];
}
}
#mec part propulsionUnit {
attribute :>> mass = 0.5 [SI::kg];
item shape[1] : ShapeItems::Cylinder :>> shape;
}
}
}

}
15 changes: 15 additions & 0 deletions examples/Drone2.sysml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package MechanicalObjectExample {
private import ScalarValues::*;
part def DroneSystem {
part def Drone {
part battery {
/*attribute mass:ISQ::MassValue = 2.5 [SI::kg];*/
attribute m:Real=2.5;
}
part propulsionUnit {
attribute mass:ISQ::MassValue = 0.5 [SI::kg];
}
}
}

}
1,416 changes: 1,416 additions & 0 deletions examples/ESA.sysml

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions examples/Test3.sysml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package FlashlightStarterModel {
part flashlight {
exhibit state flashlightStates {
state off;
transition start then off;
}
}
}

3 changes: 0 additions & 3 deletions examples/test2.sysml

This file was deleted.

2 changes: 1 addition & 1 deletion src/flexo_syside_lib/committer.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def commit_sysml_to_flexo(
if verbose:
print(f"[Flexo] Found existing project '{project_name}' -> id: {proj_id}")
else:
_, created_id, _ = create_sysml_project(client, project_name)
created_project_obj, created_id, initial_commit_id = create_sysml_project(client, project_name)
proj_id = created_id
created_project = True
if verbose:
Expand Down
223 changes: 223 additions & 0 deletions src/flexo_syside_lib/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,226 @@ def convert_json_to_sysml_textual(json_flexo:str, debug:bool=False):

return (sysml_text, deserialized_model), captured_warnings

def _children_iter(elem):
children = getattr(elem, "owned_elements", None)
if not children:
return []
try:
return list(children)
except TypeError:
out = []
children.for_each(lambda e: out.append(e))
return out

def find_partusage_by_definition(elem, defining_part_name: str, usage_name: str | None = None):
"""
Return the FIRST/HIGHEST PartUsage whose PartDefinition name matches `defining_part_name`
and optionally whose own PartUsage.name matches `usage_name`.

Parameters:
elem : SysML AST root node for traversal.
defining_part_name : The PartDefinition.name to match (e.g., "Component").
usage_name : Optional PartUsage.name to filter on (e.g., "rootmodule").
"""
def has_matching_def(node):
"""Return True if this PartUsage has a PartDefinition with the given name."""
if not node.try_cast(syside.PartUsage):
return False
try:
for pd in node.part_definitions:
if getattr(pd, "name", None) == defining_part_name:
return True
except Exception:
pass
return False

def matches_usage_name(node):
"""Optional check that PartUsage.name matches the filter (if provided)."""
if usage_name is None:
return True
return getattr(node, "name", None) == usage_name

def dfs(node):
is_part = bool(node.try_cast(syside.PartUsage))
here_matches = is_part and has_matching_def(node) and matches_usage_name(node)

subtree_has_match = here_matches
child_found = None

for ch in _children_iter(node):
found, child_has = dfs(ch)
subtree_has_match = subtree_has_match or child_has or (found is not None)
if found is not None and child_found is None:
child_found = found

if here_matches:
return node, True # this node satisfies the filters, return it

if child_found is not None:
return child_found, True

return None, subtree_has_match

found, _ = dfs(elem)
return found

def find_component_partusage(elem):
"""
Find the FIRST/HIGHEST PartUsage whose PartDefinition name is "Component"
AND that PartUsage has at least one DIRECT child that is a PartUsage.
Return that DIRECT child (the first direct PartUsage child encountered).
Works with SysIDE's Python SysMLv2 API.
"""

def is_component_partusage(node) -> bool:
pu = node.try_cast(syside.PartUsage)
if not pu:
return False
try:
for pd in pu.part_definitions:
if getattr(pd, "name", None) == "Component":
return True
except Exception:
# If we cannot inspect part_definitions (API inconsistency or other error),
# conservatively treat this node as not being a Component.
pass
return False

def first_direct_partusage_child(node):
for ch in _children_iter(node):
pu_child = ch.try_cast(syside.PartUsage)
if pu_child:
return ch # return the direct child node itself
return None

def dfs(node):
# Pre-order: highest match wins
if is_component_partusage(node):
direct_child = first_direct_partusage_child(node)
if direct_child is not None:
return direct_child

for ch in _children_iter(node):
found = dfs(ch)
if found is not None:
return found

return None

return dfs(elem)

def walk_ownership_tree(element, level: int = 0) -> None:
"""
Prints out all elements in a model in a tree-like format, where child
elements appear indented under their parent elements. For example:

Parent
Child1
Child2
Grandchild

Args:
element: The model element to start printing from (syside.Element)
level: How many levels to indent (increases for nested elements)
"""

if element.try_cast(syside.AttributeUsage):
attr = element.cast(syside.AttributeUsage)
expression_a1 = next(iter(attr.owned_elements), None)
if expression_a1 is not None and isinstance(expression_a1, syside.LiteralRational):
print(" " * level, f"{attr.name} = {expression_a1.value}")
elif expression_a1 is not None and isinstance(expression_a1, syside.LiteralInteger):
print(" " * level, f"{attr.name} = {expression_a1.value}")
else:
print(" " * level, f"{attr.name}", type(expression_a1))
elif element.name is not None:
print(" " * level, element.name)
# Recursively call walk_ownership_tree() for each owned element
# (child element).
element.owned_elements.for_each(
lambda owned_element: walk_ownership_tree(owned_element, level + 1)
)

def find_part_by_name(element, name: str, part_level: int = 0):
"""
Depth-first search for a PartUsage by name.
Prints the part hierarchy as it goes and returns the first match.

Args:
element: The model element to search from (syside.Element)
name: The part name to find
part_level: Current indentation level for printing
"""

part = element.try_cast(syside.PartUsage)
if part:
print(" " * part_level + part.name)
if part.name == name:
return part
part_level += 1 # indent children of parts

# Iterate children in a way that allows early return
children = getattr(element, "owned_elements", None)
if not children:
return None

# Try to iterate directly; if not iterable, materialize via for_each
try:
iterator = iter(children)
except TypeError:
lst = []
children.for_each(lambda e: lst.append(e))
iterator = iter(lst)

for child in iterator:
found = find_part_by_name(child, name, part_level)
if found is not None:
return found

return None

def find_expression_attribute_values(element, level=0):
"""
Find and evaluate expression attribute values in a SysML model element.
Follows the CTO-provided logic. Expects 'syside' to be importable.
"""
try:
import syside # type: ignore
except Exception:
# Defer to evaluate_sysml_expressions' import path resolution
try:
from flexo_syside_lib import syside # type: ignore
except Exception:
from flexo_syside_lib.core import syside # type: ignore

if hasattr(element, "try_cast") and element.try_cast(syside.AttributeUsage):
attr = element.cast(syside.AttributeUsage)
expression_a1 = None
try:
expression_a1 = next(iter(attr.owned_elements), None)
except Exception:
expression_a1 = None
if expression_a1 is not None and isinstance(expression_a1, syside.Expression):
compiler = syside.Compiler()
result, report = compiler.evaluate(expression_a1)
assert not report.fatal, report.diagnostics
name = (
getattr(attr, "qualified_name", None)
or getattr(attr, "declared_name", None)
or "<unnamed>"
)
print(f"{name}: {result}")

try:
element.owned_elements.for_each(
lambda owned_element: find_expression_attribute_values(
owned_element, level + 1
)
)
except Exception:
# If owned_elements is not an iterable with for_each, try a generic iteration
try:
for owned_element in element.owned_elements:
find_expression_attribute_values(owned_element, level + 1)
except Exception:
pass
Loading