Correction: I've found TestJ and Test.
The elementcategory function is not required, as we can get the category from the element using the line of code below:
cat = str(el.Category.Name)
Flatten from Dynamo cannot be used here in pyRevit, so I defined a function to flatten a list. However, it is not used; the itertools module is used to flatten instead.
All other lists whose names contain "test" were created to debug the code. All_Possible_Path is the list that stores the final output. script.py is written in such a way that the user selects one pipe (with one open end, whose location point is used as the start point); the script then identifies all the other open ends and finally calls Dijkstra multiple times, with the fixed start point and each of the other open ends as the end point. This is the logic used in the script.py file.
@JobinSunny
Your post made me want this script for myself.
But honestly your code is very messy in my opinion, so I completely remade your script.
I'm not sure if it will be helpful for you, because my implementation is completely different.
However, I'll post my implementation of the path-finding portion of your script because you gave me the idea.
Edit: I will try to explain how this works.
Nodes represent either a family connected to a pipe (I believe it will work with any family type that connects to pipes, i.e. pipe accessory, plumbing fixture, mechanical equipment, etc.), or an open end of a pipe.
The location of a family node is the origin point defined in the family; the location of a pipe node is the origin of the pipe's open connector.
And if a node has any open connection, the end_node attribute is set to True.
Edges represent the pipes that connect the nodes together, where the first_node and second_node are the nodes that the pipe connects to.
The edges store the distance between the two nodes it connects which is used to calculate the weight of the nodes.
To generate the graph which represents the piping network I iterate over all the connections of an initial node and recursively generate the edges and nodes that it connects to that node.
There are also some additional conditions to ensure connected nodes reference the same edges, so that there exists only 1 copy of each edge and node (this was the most difficult part for me to implement).
With that structure we can easily start at any node in the graph and traverse the graph to any other node by looking at the edges of a node and recursively traversing it.
Edit end:
This is what I tested it on, and it seemed to work just fine.
import sys
from Autodesk.Revit.DB import Element, ElementId, FamilyInstance, Connector, XYZ
from Autodesk.Revit.DB.Plumbing import Pipe
from Autodesk.Revit.UI import UIApplication
from Autodesk.Revit.UI.Selection import ISelectionFilter, ObjectType
from Autodesk.Revit.Exceptions import OperationCanceledException
from revit_utils.connectors import get_connectors, get_connected_connector, ConnectorType
# Handles to the running Revit session, resolved once at import time.
# NOTE(review): `__revit__` is not defined anywhere in this file; presumably it
# is injected by the host environment (pyRevit) -- confirm.
UIAPP = __revit__ # type: UIApplication
# Active UI document; used by the script section to pick elements.
UIDOC = UIAPP.ActiveUIDocument
# Database document; used by the script section to turn picked references into elements.
DOC = UIDOC.Document
class Graph(dict):
    """Piping network stored as a ``{node: node}`` mapping.

    Every node is stored as both key and value, so ``graph.get(key)`` returns
    the stored node for any key that hashes and compares equal to it (the
    lookups in :meth:`dijsktra` pass an element's ``Id``).
    The graph is populated recursively from ``initial_element`` on creation.
    """

    def __init__(self, initial_element):
        # type: (Graph, Pipe | FamilyInstance) -> None
        super(Graph, self).__init__()
        self._generate_graph(initial_element)

    @property
    def nodes(self):
        # type: (Graph) -> [Node]
        """All nodes currently stored in the graph."""
        return self.values()

    @property
    def end_nodes(self):
        # type: (Graph) -> [Node]
        """Nodes whose ``end_node`` flag is set."""
        return [node for node in self.values() if node.end_node]

    def add_node(self, node):
        # type: (Graph, Node) -> None
        """Store ``node``, build its edges and recursively add its neighbours."""
        self[node] = node
        node.generate_edges(self)
        # Iterate over a snapshot: the recursive calls below may write into
        # this node's edge mapping while we are still walking it.
        for edge in list(node.edges):
            end_node = edge.get_end_node(node)
            if end_node is None or end_node in self:
                continue
            self.add_node(end_node)

    def dijsktra(self, initial, end):
        # type: (Graph, Element, Element) -> Path
        """Return the shortest :class:`Path` between two elements of the graph.

        :param initial: element whose node is the start of the route
        :param end: element whose node is the destination
        :raises ValueError: if either element has no node in the graph, or if
            the destination cannot be reached from the start
        """
        initial = self.get(initial.Id, None)  # type: Node
        end = self.get(end.Id, None)  # type: Node
        if initial is None or end is None:
            raise ValueError("Elements are not inside graph")

        # shortest_paths maps node -> (previous node, weight from start)
        shortest_paths = {initial: (None, 0.0)}
        current_node = initial
        visited = set()
        # Use ``==`` rather than ``!=``: the nodes subclass dict, and on
        # Python 2 ``!=`` is not derived from ``__eq__``, so it would fall
        # back to comparing the dict contents instead of the element ids.
        while not (current_node == end):
            visited.add(current_node)
            weight_to_current_node = shortest_paths[current_node][1]
            for edge in current_node.edges:
                next_node = edge.get_end_node(current_node)
                # Visited nodes already hold their final weight (edge
                # distances are lengths, hence non-negative).
                if next_node is None or next_node in visited:
                    continue
                weight = edge.distance + weight_to_current_node
                if (next_node not in shortest_paths
                        or shortest_paths[next_node][1] > weight):
                    shortest_paths[next_node] = (current_node, weight)

            candidates = [node for node in shortest_paths if node not in visited]
            if not candidates:
                raise ValueError("Route not possible")
            # The next node is the unvisited destination with the lowest weight.
            current_node = min(candidates, key=lambda node: shortest_paths[node][1])

        # Work back through the predecessors, then reverse into start -> end.
        path_length = shortest_paths[current_node][1]
        path = []  # type: list
        while current_node is not None:
            path.append(current_node)
            current_node = shortest_paths[current_node][0]
        path.reverse()
        return Path(initial, end, path, path_length)

    # Correctly spelled alias; the original name is kept for existing callers.
    dijkstra = dijsktra

    def _generate_graph(self, initial_element):
        # type: (Graph, Element) -> None
        """Create the first node from ``initial_element`` and expand from it.

        Elements that are neither a ``Pipe`` nor a ``FamilyInstance`` are
        ignored, which leaves the graph empty.
        """
        if isinstance(initial_element, Pipe):
            # Assumes the pipe has at least one open connector (the script's
            # OpenConnectorFilter only allows such elements); with none, the
            # [0] lookup raises IndexError.
            open_connector = get_connectors(initial_element, ConnectorType.Open)[0]
            initial_node = Node(initial_element, open_connector.Origin, end_node=True)
        elif isinstance(initial_element, FamilyInstance):
            initial_node = Node(initial_element, initial_element.Location.Point)
        else:
            return
        self.add_node(initial_node)

    def __repr__(self):
        # type: (Graph) -> str
        return "<{}.Graph object at {:#018x}>".format(__name__, id(self))
def get_next_node(path, next_element, graph):
    # type: (Connector, Element, Graph) -> (Element, Node | None)
    """Follow one connector and report the edge element and neighbouring node.

    :param path: connector to follow; its ``Owner`` is the starting element
    :param next_element: element used for the neighbour node when ``path`` is
        not connected to anything
    :param graph: graph consulted for an already existing node of the
        neighbouring element, so that nodes are reused rather than duplicated
    :return: ``(edge_element, node)``. ``node`` is ``None`` when the connector
        leads nowhere new; both are ``None`` when the connected element is
        unsupported or no usable connection could be resolved.
    """
    starting_element = path.Owner

    if not path.IsConnected:
        # Open connector: it only yields a node when it belongs to a different
        # element than the one we are expanding.
        if starting_element.Id == next_element.Id:
            return starting_element, None
        return starting_element, graph.get(
            next_element.Id, Node(next_element, path.Origin, end_node=True))

    connected_path = get_connected_connector(path)
    if connected_path is None:
        # get_connected_connector found no usable reference (it skips pipe
        # insulation and the connector's own owner); nothing to follow.
        return None, None

    connected_element = connected_path.Owner
    if starting_element.Id == connected_element.Id:
        return starting_element, None

    if isinstance(connected_element, Pipe):
        # Walk through the pipe: look at the connector(s) on its far side.
        for pipe_connector in get_connectors(connected_element):
            if pipe_connector.IsConnectedTo(path):
                continue  # this is the side we arrived from
            if pipe_connector.IsConnected:
                next_element = get_connected_connector(pipe_connector, owner=True)
                return get_next_node(pipe_connector, next_element, graph)
            # Far side is open: the pipe itself becomes the neighbouring node.
            return connected_element, graph.get(
                connected_element.Id, Node(connected_element, pipe_connector.Origin))
        # No far-side connector was found; report "no neighbour" instead of
        # implicitly returning None, which callers cannot unpack.
        return None, None

    if isinstance(connected_element, FamilyInstance):
        return starting_element, graph.get(
            connected_element.Id,
            Node(connected_element, connected_element.Location.Point))

    return None, None
class Node(dict):
    """Graph vertex wrapping a Revit element, stored as an ``{edge: edge}`` dict.

    Equality and hashing are based on ``element.Id``, so a node compares equal
    to another node, an ``Element`` or an ``ElementId`` with the same id.
    """

    def __init__(self, element, location, end_node=False):
        # type: (Node, Element, XYZ, bool) -> None
        """
        :param element: element this node represents
        :param location: point used to measure edge distances
        :param end_node: True when the node is known to have an open connection
        """
        super(Node, self).__init__()
        self.element = element
        self.location = location
        self.end_node = end_node

    @property
    def edges(self):
        # type: (Node) -> [Edge]
        """Edges attached to this node."""
        return self.values()

    def generate_edges(self, graph):
        # type: (Node, Graph) -> None
        """Create an edge for every connector of the element that leads to a node.

        A connector that yields no neighbouring node marks this node as an end
        node. New edges are registered on both this node and the neighbour.
        """
        for path in get_connectors(self.element):
            edge_element, next_node = get_next_node(path, self.element, graph)
            if not next_node:
                self.end_node = True
                continue
            if edge_element in self.edges:
                # An edge for this element already exists (created from the
                # neighbour's side); keep the single shared copy.
                continue
            edge = Edge(edge_element, self, next_node)
            self[edge] = edge
            next_node[edge] = edge

    def __eq__(self, other):
        # type: (Node, object) -> bool
        if isinstance(other, Node):
            return self.element.Id == other.element.Id
        if isinstance(other, Element):
            return self.element.Id == other.Id
        if isinstance(other, ElementId):
            return self.element.Id == other
        # Unknown type: defer to Python's default handling instead of calling
        # ``self == other`` again, which recursed without end.
        return NotImplemented

    def __ne__(self, other):
        # type: (Node, object) -> bool
        # Python 2 does not derive ``!=`` from ``__eq__``; without this the
        # inherited dict comparison would compare edge contents instead.
        result = self.__eq__(other)
        return result if result is NotImplemented else not result

    def __hash__(self):
        # type: (Node) -> int
        return hash(self.element.Id)

    def __repr__(self):
        # type: (Node) -> str
        return "<{}.Node object at {:#018x}>".format(__name__, id(self))

    def __nonzero__(self):
        # type: (Node) -> bool
        return True if self.element else False

    # Python 3 spelling of __nonzero__; without it a node with no edges is an
    # empty dict and would evaluate as False.
    __bool__ = __nonzero__
class Edge(object):
    """Connection between two nodes, identified by the element that forms it.

    Equality and hashing are based on ``element.Id``, so an edge compares
    equal to another edge, an ``Element`` or an ``ElementId`` with the same id.
    """

    def __init__(self, element, first_node, second_node):
        # type: (Edge, Element, Node, Node | None) -> None
        """
        :param element: element that forms the connection
        :param first_node: node at one end
        :param second_node: node at the other end
        """
        self.element = element
        self.first_node = first_node
        self.second_node = second_node
        # Straight-line distance between the two node locations; used as the
        # edge weight by the shortest-path search.
        offset = self.second_node.location.Subtract(self.first_node.location)
        self.distance = offset.GetLength()

    def get_end_node(self, other_node):
        # type: (Edge, Node) -> Node | None
        """Return the node opposite ``other_node``, or None if it is not on this edge."""
        if other_node == self.first_node:
            return self.second_node
        if other_node == self.second_node:
            return self.first_node
        return None

    def __eq__(self, other):
        # type: (Edge, object) -> bool
        if isinstance(other, Edge):
            return self.element.Id == other.element.Id
        if isinstance(other, Element):
            return self.element.Id == other.Id
        if isinstance(other, ElementId):
            return self.element.Id == other
        # Unknown type: defer to Python's default handling instead of calling
        # ``self == other`` again, which recursed without end.
        return NotImplemented

    def __ne__(self, other):
        # type: (Edge, object) -> bool
        # Python 2 does not derive ``!=`` from ``__eq__``.
        result = self.__eq__(other)
        return result if result is NotImplemented else not result

    def __hash__(self):
        # type: (Edge) -> int
        return hash(self.element.Id)

    def __nonzero__(self):
        # type: (Edge) -> bool
        return True if self.element else False

    # Python 3 spelling of __nonzero__.
    __bool__ = __nonzero__
class Path:
    """Plain record describing a route found through the graph."""

    def __init__(self, start, end, paths, path_length):
        # type: (Node, Node, [Node], float) -> None
        """
        :param start: node the route begins at
        :param end: node the route finishes at
        :param paths: ordered nodes making up the route
        :param path_length: total weight of the route
        """
        # Endpoints of the route.
        self.start, self.end = start, end
        # The route itself and its accumulated length.
        self.paths, self.path_length = paths, path_length
class OpenConnectorFilter(ISelectionFilter):
    """Selection filter that only accepts elements with an open connector."""

    def __init__(self):
        pass

    def AllowElement(self, element):
        """Accept ``element`` when it has one or more open connectors."""
        open_connectors = get_connectors(element, connector_type=ConnectorType.Open)
        return len(open_connectors) > 0

    def AllowReference(self, reference, point):
        """Accept every reference; filtering is done per element only."""
        return True
if __name__ == "__main__":
    # Ask the user for the start and end elements; only elements accepted by
    # OpenConnectorFilter can be picked.
    open_end_filter = OpenConnectorFilter()
    try:
        start_reference = UIDOC.Selection.PickObject(ObjectType.Element, open_end_filter)
        start_element = DOC.GetElement(start_reference)
        end_reference = UIDOC.Selection.PickObject(ObjectType.Element, open_end_filter)
        end_element = DOC.GetElement(end_reference)
    except OperationCanceledException:
        # The user cancelled a pick: stop the script.
        sys.exit()

    # Build the network reachable from the start element, then route through it.
    graph = Graph(start_element)
    path = graph.dijsktra(start_element, end_element)
    node_ids = [node.element.Id.IntegerValue for node in path.paths]
    print(node_ids, path.path_length)
These are the functions I imported from the "revit_utils.connectors" module.
from Autodesk.Revit.DB import (Element, Connector, Domain)
from enum import Enum
class ConnectorType(Enum):
    """Which connectors of an element to select, by connection state."""

    All = 0     # every connector
    Open = 1    # connectors that are not connected
    Closed = 2  # connectors that are connected
def _get_connectors(element, domain):
    # type: (Element, Domain) -> [Connector]
    """Return a tuple of the element's connectors that belong to ``domain``.

    The connector manager is taken from ``element.MEPModel`` when the element
    has that attribute, otherwise from ``element.ConnectorManager``. Elements
    with neither attribute, or whose MEP model / connector manager is None,
    yield an empty tuple.
    """
    manager = None
    if hasattr(element, 'MEPModel'):
        mep_model = element.MEPModel
        # Guard against elements that expose MEPModel but carry no model or
        # no connector manager; dereferencing None would raise AttributeError.
        if mep_model is not None:
            manager = mep_model.ConnectorManager
    elif hasattr(element, "ConnectorManager"):
        manager = element.ConnectorManager

    if manager is None:
        return ()
    return tuple(connector for connector in manager.Connectors
                 if connector.Domain == domain)
def get_connectors(element, connector_type=ConnectorType.All, domain=Domain.DomainPiping):
    # type: (Element, ConnectorType | int, Domain) -> [Connector]
    """
    Return the connectors of an element, optionally filtered by connection state.

    :param element: Element
    :param connector_type: ConnectorType to return 0 = All, 1 = Open, 2 = Closed
    :param domain: Domain of connectors
    :rtype: tuple
    :return: tuple of the matching connectors of the element
    :raises ValueError: if ``connector_type`` is not a valid ConnectorType
    """
    # Accept both enum members and their documented integer values. A plain
    # int never compares equal to an Enum member, so without this the
    # function fell through every branch and returned None.
    connector_type = ConnectorType(connector_type)

    connectors = _get_connectors(element, domain)
    if connector_type is ConnectorType.Open:
        return tuple(conn for conn in connectors if not conn.IsConnected)
    if connector_type is ConnectorType.Closed:
        return tuple(conn for conn in connectors if conn.IsConnected)
    return connectors
def get_connected_connector(connector, owner=False):
    # type: (Connector, bool) -> Element | Connector | None
    """
    Find what a connector is connected to.

    :param connector: Connector to get connector from
    :param owner: bool for getting connector or element connector is attached to
    :return: Element or Connector of connected element/connector, or None when
        there is no connector or no usable connection
    """
    if not connector:
        return None

    # Category id whose owners are skipped; per the original comment this
    # ignores pipe insulation.
    pipe_insulation_category_id = -2008122

    for ref in connector.AllRefs:
        ref_owner = ref.Owner
        category = ref_owner.Category
        # Some owners may have no category; treat them as "not insulation"
        # rather than failing on None.
        if category is not None and category.Id.IntegerValue == pipe_insulation_category_id:
            continue
        if ref_owner.Id == connector.Owner.Id:
            continue  # reference back to the connector's own element
        # Check the type first so IsConnectedTo is only called with a Connector.
        if isinstance(ref, Connector) and connector.IsConnectedTo(ref):
            return ref_owner if owner else ref
    return None