Function written in dynamo to PyRevit

Correction ive found TestJ and Test.

The elementcategory function is not required, as we can get the category from the element using the line of code below:
cat = str(el.Category.Name)
Flatten from Dynamo cannot be used here in pyRevit, so I defined a function to flatten a list. However, it is not used; the itertools module is used to flatten instead.
All other lists whose names contain "test" were created to debug the code. All_Possible_Path is the list that stores the final output. script.py is written in such a way that the user selects one pipe (with one open end, whose location point is used as the start point); the script then identifies all other open ends and finally calls Dijkstra multiple times with the fixed start point and each of the other open ends as the end point. This is the logic used in the script.py file.

@JobinSunny
Your post made me want this script for myself.
But honestly, your code is very messy in my opinion, so I completely remade your script.
I'm not sure if it will be helpful for you, because my implementation is completely different.
However, I'll post my implementation of the path-finding portion of your script because you gave me the idea.

Edit: I will try to explain how this works.
Nodes represent either a family connected to a pipe (I believe it will work with any family type that connects to pipes, i.e. pipe accessory, plumbing fixture, mechanical equipment, etc.), or the open end of a pipe.
The location of a family node is the origin point defined in the family; the location of a pipe node is the origin of the pipe's open connector.
And if a node has any open connection, the end_node attribute is set to True.

Edges represent the pipes that connect the nodes together, where the first_node and second_node are the nodes that the pipe connects to.
The edges store the distance between the two nodes it connects which is used to calculate the weight of the nodes.

To generate the graph which represents the piping network I iterate over all the connections of an initial node and recursively generate the edges and nodes that it connects to that node.
There are also some additional conditions to ensure connected nodes reference the same edges, so that there exists only one copy of each edge and node (this was the most difficult part for me to implement).

With that structure we can easily start at any node in the graph and traverse the graph to any other node by looking at the edges of a node and recursively traversing it.
Edit end:

This is what I tested it on, and it seemed to work just fine.

import sys

from Autodesk.Revit.DB import Element, ElementId, FamilyInstance, Connector, XYZ
from Autodesk.Revit.DB.Plumbing import Pipe
from Autodesk.Revit.UI import UIApplication
from Autodesk.Revit.UI.Selection import ISelectionFilter, ObjectType
from Autodesk.Revit.Exceptions import OperationCanceledException

from revit_utils.connectors import get_connectors, get_connected_connector, ConnectorType

# Handles to the running Revit session. `__revit__` is not defined in this file;
# presumably it is injected by the host environment (e.g. pyRevit) - TODO confirm.
UIAPP = __revit__  # type: UIApplication
UIDOC = UIAPP.ActiveUIDocument  # UI-level document; used below for interactive picking
DOC = UIDOC.Document  # database-level document; used below for element lookups


class Graph(dict):
    """Piping network stored as a mapping of ``Node`` -> the same ``Node``.

    The graph is built on construction by starting at ``initial_element`` and
    recursively following every edge of every node that gets added.
    """

    def __init__(self, initial_element):
        # type: (Graph, Pipe | FamilyInstance) -> None
        """Build the graph reachable from ``initial_element``."""
        super(Graph, self).__init__()
        self._generate_graph(initial_element)

    @property
    def nodes(self):
        # type: (Graph) -> [Node]
        """All nodes currently stored in the graph."""
        return self.values()

    @property
    def end_nodes(self):
        # type: (Graph) -> [Node]
        """Nodes whose ``end_node`` flag is set."""
        return [node for node in self.values() if node.end_node]

    def add_node(self, node):
        # type: (Graph, Node) -> None
        """Store ``node``, generate its edges and recurse into unseen neighbours."""
        self[node] = node
        node.generate_edges(self)
        # Iterate over a snapshot: the recursive calls below may attach new
        # edges to ``node`` while we are still looping over its edges.
        for edge in list(node.edges):
            end_node = edge.get_end_node(node)
            if end_node is None or end_node in self:
                continue
            self.add_node(end_node)

    def dijsktra(self, initial, end):
        # type: (Graph, Element, Element) -> Path
        """Return the shortest ``Path`` between two elements of the graph.

        :param initial: element whose node is the start of the route
        :param end: element whose node is the destination of the route
        :raises ValueError: if either element has no node in the graph, or if
            no route connects the two nodes
        """
        initial = self.get(initial.Id, None)  # type: Node
        end = self.get(end.Id, None)  # type: Node
        if initial is None or end is None:
            raise ValueError("Elements are not inside graph")
        # shortest paths is a dict of nodes
        # whose value is a tuple of (previous node, weight)
        shortest_paths = {initial: (None, 0.0)}
        current_node = initial
        visited = set()

        # ``not (a == b)`` is deliberate: nodes are dict subclasses, so ``!=``
        # could resolve to dict.__ne__ and compare edge contents instead of
        # node identity.
        while not (current_node == end):
            visited.add(current_node)
            weight_to_current_node = shortest_paths[current_node][1]

            for edge in current_node.edges:
                next_node = edge.get_end_node(current_node)
                if next_node is None:
                    # edge does not touch the current node; nothing to relax
                    continue
                weight = edge.distance + weight_to_current_node
                known = shortest_paths.get(next_node)
                if known is None or known[1] > weight:
                    shortest_paths[next_node] = (current_node, weight)

            next_destinations = {node: path_value for node, path_value in shortest_paths.items()
                                 if node not in visited}
            if not next_destinations:
                raise ValueError("Route not possible")
            # next node is the destination with the lowest weight
            current_node = min(next_destinations, key=lambda k: next_destinations[k][1])

        # Work back through destinations in shortest path
        path_length = shortest_paths[current_node][1]
        route = []  # type: list
        while current_node is not None:
            route.append(current_node)
            current_node = shortest_paths[current_node][0]
        route.reverse()
        return Path(initial, end, route, path_length)

    # Correctly spelled alias; the original (misspelled) name is kept for callers.
    dijkstra = dijsktra

    def _generate_graph(self, initial_element):
        # type: (Graph, Element) -> None
        """Create the first node from ``initial_element`` and expand from it.

        A pipe starts at the origin of its first open connector and is flagged
        as an end node; a family instance starts at its location point. Any
        other element type leaves the graph empty.

        :raises ValueError: if ``initial_element`` is a pipe without an open connector
        """
        if isinstance(initial_element, Pipe):
            open_connectors = get_connectors(initial_element, ConnectorType.Open)
            if not open_connectors:
                raise ValueError("Initial pipe has no open connector")
            initial_node = Node(initial_element, open_connectors[0].Origin, end_node=True)
        elif isinstance(initial_element, FamilyInstance):
            initial_node = Node(initial_element, initial_element.Location.Point)
        else:
            return
        self.add_node(initial_node)

    def __repr__(self):
        # type: (Graph) -> str
        return "<{}.Graph object at {:#018x}>".format(__name__, id(self))


def get_next_node(path, next_element, graph):
    # type: (Connector, Element, Graph) -> (Element, Node | None)
    """Follow connector ``path`` and return ``(edge_element, node)``.

    Connected pipes are walked through recursively until a family instance or
    an open pipe end is reached. Nodes already stored in ``graph`` are reused;
    otherwise a new ``Node`` is created.

    :param path: connector to follow
    :param next_element: element used for the node when ``path`` is not connected
    :param graph: graph that is searched for existing nodes (by element id)
    :return: ``(edge_element, node)``; ``node`` is ``None`` when the connector
        leads back to its own owner, and ``(None, None)`` is returned when the
        connector leads to nothing that can be turned into a node
    """
    starting_element = path.Owner

    if not path.IsConnected:
        # Open connector: it is either the owner's own open end or a new end node.
        if starting_element.Id == next_element.Id:
            return starting_element, None
        return starting_element, graph.get(next_element.Id, Node(next_element, path.Origin, end_node=True))

    connected_path = get_connected_connector(path)
    if connected_path is None:
        # Connected, but to nothing usable; without this guard the attribute
        # access below would fail on None.
        return None, None

    connected_element = connected_path.Owner
    if starting_element.Id == connected_element.Id:
        return starting_element, None

    if isinstance(connected_element, Pipe):
        for pipe_connector in get_connectors(connected_element):
            if pipe_connector.IsConnectedTo(path):
                # this is the end of the pipe we arrived through
                continue
            if pipe_connector.IsConnected:
                next_element = get_connected_connector(pipe_connector, owner=True)
                return get_next_node(pipe_connector, next_element, graph)
            return connected_element, graph.get(connected_element.Id, Node(connected_element, pipe_connector.Origin))
        # No connector led anywhere: return the same "nothing found" pair as the
        # other dead ends instead of an implicit None the caller cannot unpack.
        return None, None

    if isinstance(connected_element, FamilyInstance):
        return starting_element, graph.get(connected_element.Id, Node(connected_element, connected_element.Location.Point))

    return None, None


class Node(dict):
    """Graph node wrapping one element; maps each attached ``Edge`` to itself.

    Two nodes are equal when their elements have the same ``Id``; a node also
    compares equal to an ``Element`` or ``ElementId`` carrying that id, and it
    hashes like the element's ``Id``.
    """

    def __init__(self, element, location, end_node=False):
        # type: (Node, Element, XYZ, bool) -> None
        """
        :param element: element represented by this node
        :param location: point used to measure edge distances
        :param end_node: True when the node is an open end of the network
        """
        self.element = element
        self.location = location
        self.end_node = end_node
        super(Node, self).__init__()

    @property
    def edges(self):
        # type: (Node) -> [Edge]
        """Edges attached to this node."""
        return self.values()

    def generate_edges(self, graph):
        # type: (Node, Graph) -> None
        """Create an edge for every connector of the element that leads to a node.

        The new edge is stored on this node and on the node at the other end,
        so both share one ``Edge`` object. A connector that leads to no node
        marks this node as an end node.
        """
        paths = get_connectors(self.element)
        for path in paths:
            edge_element, next_node = get_next_node(path, self.element, graph)
            # Explicit None test: a freshly created node has no edges yet and
            # must not be mistaken for "no node" through dict truthiness.
            if next_node is not None:
                if edge_element in self.edges:
                    continue
                edge = Edge(edge_element, self, next_node)
                self[edge] = edge
                next_node[edge] = edge
            else:
                self.end_node = True

    def __eq__(self, other):
        # type: (Node, object) -> bool
        if isinstance(other, Node):
            return self.element.Id == other.element.Id
        if isinstance(other, Element):
            return self.element.Id == other.Id
        if isinstance(other, ElementId):
            return self.element.Id == other
        # Anything else is simply not equal (the old fallback compared the node
        # with ``other`` again and recursed without end).
        return False

    def __ne__(self, other):
        # type: (Node, object) -> bool
        # Must be defined explicitly: otherwise ``!=`` resolves to dict.__ne__,
        # which compares the stored edges rather than the element ids.
        return not self.__eq__(other)

    def __hash__(self):
        # type: (Node) -> int
        return hash(self.element.Id)

    def __repr__(self):
        # type: (Node) -> str
        return "<{}.Node object at {:#018x}>".format(__name__, id(self))

    def __nonzero__(self):
        # type: (Node) -> bool
        return True if self.element else False

    # Python 3 spelling of __nonzero__, so that an edge-less node stays truthy.
    __bool__ = __nonzero__


class Edge(object):
    """Connection between two nodes, identified by the element it represents.

    ``distance`` is the straight-line length between the two node locations,
    computed once at construction.
    """

    def __init__(self, element, first_node, second_node):
        # type: (Edge, Element, Node, Node | None) -> None
        """
        :param element: element this edge stands for (used for equality and hashing)
        :param first_node: node at one end
        :param second_node: node at the other end
        """
        self.element = element
        self.first_node = first_node
        self.second_node = second_node
        self.distance = self.second_node.location.Subtract(self.first_node.location).GetLength()

    def get_end_node(self, other_node):
        # type: (Edge, Node) -> Node | None
        """Return the node opposite ``other_node``, or None if it is not on this edge."""
        if other_node == self.first_node:
            return self.second_node
        if other_node == self.second_node:
            return self.first_node
        return None

    def __eq__(self, other):
        # type: (Edge, object) -> bool
        if isinstance(other, Edge):
            return self.element.Id == other.element.Id
        if isinstance(other, Element):
            return self.element.Id == other.Id
        if isinstance(other, ElementId):
            return self.element.Id == other
        # Anything else is simply not equal (the old fallback compared the edge
        # with ``other`` again and recursed without end).
        return False

    def __ne__(self, other):
        # type: (Edge, object) -> bool
        # Python 2 does not derive != from ==, so keep the two consistent here.
        return not self.__eq__(other)

    def __hash__(self):
        # type: (Edge) -> int
        return hash(self.element.Id)

    def __nonzero__(self):
        # type: (Edge) -> bool
        return True if self.element else False

    # Python 3 spelling of __nonzero__.
    __bool__ = __nonzero__


class Path:
    """Plain record describing a route through the graph."""

    def __init__(self, start, end, paths, path_length):
        # type: (Node, Node, [Node], float) -> None
        """
        :param start: node the route begins at
        :param end: node the route finishes at
        :param paths: the nodes of the route
        :param path_length: length value associated with the route
        """
        self.start, self.end = start, end
        self.paths, self.path_length = paths, path_length


class OpenConnectorFilter(ISelectionFilter):
    """Selection filter that accepts only elements with at least one open connector."""

    def __init__(self):
        pass

    def AllowElement(self, element):
        """Return True when ``element`` has one or more open connectors."""
        open_connectors = get_connectors(element, connector_type=ConnectorType.Open)
        return len(open_connectors) > 0

    def AllowReference(self, reference, point):
        """Accept every reference; filtering is done on the element only."""
        return True


if __name__ == "__main__":
    # Ask the user for two elements (start first, then end); only elements
    # with an open connector can be picked.
    open_end_filter = OpenConnectorFilter()
    picked_elements = []
    try:
        for _ in range(2):
            reference = UIDOC.Selection.PickObject(ObjectType.Element, open_end_filter)
            picked_elements.append(DOC.GetElement(reference))
    except OperationCanceledException:
        # the pick was cancelled: stop the script
        sys.exit()
    start_element, end_element = picked_elements

    # Build the network from the start element and report the shortest route.
    graph = Graph(start_element)
    path = graph.dijsktra(start_element, end_element)
    element_ids = [node.element.Id.IntegerValue for node in path.paths]
    print(element_ids, path.path_length)

These are the functions I imported from the "revit_utils.connectors" module.

from Autodesk.Revit.DB import (Element, Connector, Domain)

from enum import Enum


class ConnectorType(Enum):
    """Selects which connectors ``get_connectors`` returns, by connection state."""
    All = 0  # every connector, connected or not
    Open = 1  # only connectors that are not connected
    Closed = 2  # only connectors that are connected


def _get_connectors(element, domain):
    # type: (Element, Domain) -> [Connector]
    """Return a tuple of the element's connectors that belong to ``domain``.

    The connector manager is taken from ``element.MEPModel`` when the element
    has that attribute, otherwise from ``element.ConnectorManager``. An empty
    tuple is returned when the element exposes neither, or when the MEP model
    or connector manager is missing (None).
    """
    if hasattr(element, 'MEPModel'):
        # The attribute can exist and still be None; treat that as "no connectors"
        # rather than failing on the attribute access.
        manager = getattr(element.MEPModel, "ConnectorManager", None)
    elif hasattr(element, "ConnectorManager"):
        manager = element.ConnectorManager
    else:
        manager = None

    if manager is None:
        return ()
    return tuple(connector for connector in manager.Connectors
                 if connector.Domain == domain)


def get_connectors(element, connector_type=ConnectorType.All, domain=Domain.DomainPiping):
    # type: (Element, ConnectorType | int, Domain) -> [Connector]
    """
    Return the connectors of an element, optionally filtered by connection state.

    :param element: Element
    :param connector_type: ConnectorType to return 0 = All, 1 = Open, 2 = Closed
        (either the enum member or its integer value)
    :param domain: Domain of connectors
    :rtype: tuple
    :return: tuple of the matching connectors of an element
    :raises ValueError: if connector_type is not a valid ConnectorType value
    """
    # Normalise first: a plain int never compares equal to an Enum member, so
    # without this the documented integer form would match no branch at all.
    connector_type = ConnectorType(connector_type)
    connectors = _get_connectors(element, domain)
    if connector_type == ConnectorType.Open:
        return tuple(conn for conn in connectors if not conn.IsConnected)
    if connector_type == ConnectorType.Closed:
        return tuple(conn for conn in connectors if conn.IsConnected)
    return connectors



def get_connected_connector(connector, owner=False):
    # type: (Connector, bool) -> Element | Connector | None
    """
    Find what a connector is connected to.

    References owned by pipe insulation or by the connector's own element are
    skipped; the first remaining reference the connector is connected to wins.

    :param connector: Connector to get connector from
    :param owner:  bool for getting connector or element connector is attached to
    :return: Element or Connector of connected element/connector, or None when
        ``connector`` is falsy or no matching reference exists
    """
    if not connector:
        return None

    # Category id that is skipped so pipe insulation is ignored.
    pipe_insulation_category_id = -2008122

    for ref in connector.AllRefs:
        ref_owner = ref.Owner
        category = ref_owner.Category
        # An owner may have no category at all; that is not insulation.
        if category is not None and category.Id.IntegerValue == pipe_insulation_category_id:
            continue
        if ref_owner.Id == connector.Owner.Id:
            continue
        if connector.IsConnectedTo(ref) and isinstance(ref, Connector):
            return ref_owner if owner else ref
    return None