Function written in dynamo to PyRevit

Hi All,
Can anyone help me to convert this function into pyrevit compatible function. This is written in dynamo environment . I think we need to modify ToDSType (Geometry conversion in dynamo) and Flatten from DsCore.List

both elements list and connected list contains revit api elements

def Build_Relations_Graph(elements,connected):
	edges = []
	k = 0
	for e in elements:
		edge = []
		conn = connected[k]
		for c in conn:
			if (e.ToDSType(True)!=c.ToDSType(True)):
				edge.append({e.ToDSType(False),c.ToDSType(False)})	
			
		k+=1
		edges.append(edge)
		
	return Flatten(edges,1) 

Hi Jobin,

Can you please provide the full script. Also what are you trying to accomplish with the edges? Are you using this in a Dynamo script or pyRevit directly? You can look at this article which can help you out:

I am trying to implement Dijikstra in PyRevit. I have given few more details in the post in dynamo forum…Here is the link ToDSType(True) - Developers - Dynamo

if (e.ToDSType(True)!=c.ToDSType(True)):
edge.append({e.ToDSType(False),c.ToDSType(False)})

This is working in dynamo …I am trying to figure out the equivalent statement for pyrevit

Yes this works in dynamo as its using dstype for dynamo. Again what are you trying to do, are you trying to append the edge? If so you dont need the ToDSType(True) part since your getting the edge itself. However, you are not providing the entire script so its difficult to understand your intention. There are multiple ways you can retrieve an edge and add it to a list.

Ok first thing first you have 146 lines of just imports. You might need about 10 of them. If you are using from pyrevit import DB, you dont need to call it again using from Autodesk.Revit.DB import *. You defined the document manager, but then u create a doc variable that u call using pyrevit. I think the best thing to do is first address these issues. I can help you out with this.

1 Like

Okay. When the code was not working I tried with different option during which I added many imports but not deleted them.

Sorry just to clarify. Is this created by someone using Dynamo and you are trying to recreate it?

I created it using dynamo after watching AU presentaion. Now I am implementing in pyrevit.

WeTransfer - Send Large Files & Share Photos Online - Up to 2GB Free here is the dynamo script for the same

Here is the script cleaned a bit:

import math
import itertools
import collections

from collections import defaultdict
from pyrevit import DB, UI, forms, revit,script,EXEC_PARAMS,UI

uiapp = __revit__
uidoc = uiapp.ActiveUIDocument
doc = uiapp.ActiveUIDocument.Document
TestJ = []

__doc__ = """  This tool will place flow aroows in pipes.

Tip : Select the pipe in network and then run the script.

Shift + Click : This mode will place arrows in the reverse direction."""
__title__ = 'Pipe  \n Flow Arrow'
__author__ = 'Jobin Sunny'

my_Check = EXEC_PARAMS.config_mode # for checking click modes



my_list = [[1], [2, 3], [4, 5, 6, 7]]
flat_list = list(itertools.chain(*my_list))


if my_Check:
    Flow_Arrow_Direction = " Normal Flow"
else:
    Flow_Arrow_Direction = " Reverse Flow"

TestList = ["job","mott","PHE"]
Generic_Anno_Famlies = DB.FilteredElementCollector(doc).OfCategory(DB.BuiltInCategory.OST_GenericAnnotation).WhereElementIsElementType().ToElements()


Selected_Arrow_Family = forms.SelectFromList.show(Generic_Anno_Famlies,multiselect=False,name_attr='FamilyName',button_name='Select Flow Arrow Family') 

if not Selected_Arrow_Family: # stopping the script if user does not select any flow arrow family
    forms.alert("No flow arrow family selected","Script Cancelled")
    script.exit()
else:

    if not forms.check_selection():
        script.exit()

ElementIds = uidoc.Selection.GetElementIds()


for el in ElementIds:
    Selected_Pipe = doc.GetElement(el)

Test = []

Points_Test = []
#Making family active			
if not Selected_Arrow_Family.IsActive:
    Selected_Arrow_Family.Activate()

Category_Names = ["Pipes","Pipe Fittings","Pipe Accessories","Mechanical Equipment"]


def flatten(lst, depth=None):
    if depth is None:
        depth = float('inf')
    result = []
    for i in lst:
        if isinstance(i, list) and depth > 0:
            result.extend(flatten(i, depth-1))
        else:
            result.append(i)
    return result

#Function to collect all elements in the connected network

def ConnectedNetworks(ElementList,bool_):
    if isinstance(ElementList,list):
        connector = ElementList
        toggle = 0
    else:
        connector = [ElementList]
        toggle = 1
        
    bool__ = bool_
            
    def nextElements(elem):
        listout = []
        if elem.GetType() == DB.Connector:
            conn = elem
            for c in conn.AllRefs:
                if c.Owner.Id.Equals(elem.Owner.Id):
                    continue
                elif isinstance(c.Owner,DB.MEPSystem):
                    continue
                else:
                    newelem = c.Owner
                listout.append(newelem)
            return listout
        try:
            connectors = elem.ConnectorManager.Connectors
        except:
            connectors = elem.MEPModel.ConnectorManager.Connectors
        for conn in connectors:
            for c in conn.AllRefs:			
                if c.Owner.Id.Equals(elem.Id):
                    continue
                elif isinstance(c.Owner,DB.MEPSystem):
                    continue
                elif c.Owner.Id.Equals(ownerId):
                    continue
                else:
                    newelem = c.Owner
                listout.append(newelem)
        return listout
    
    def nextElementsWithOwner(elem):
        listout = []
        if elem.GetType() == DB.Connector:
            conn = elem
            for c in conn.AllRefs:
                if c.Owner.Id.Equals(elem.Owner.Id):
                    continue
                elif isinstance(c.Owner,DB.MEPSystem):
                    continue
                else:
                    newelem = c.Owner
                listout.append(newelem)
            return listout
        try:
            connectors = elem.ConnectorManager.Connectors
        except:
            connectors = elem.MEPModel.ConnectorManager.Connectors
        for conn in connectors:
            for c in conn.AllRefs:			
                if c.Owner.Id.Equals(elem.Id):
                    continue
                elif isinstance(c.Owner,DB.MEPSystem):
                    continue
                else:
                    newelem = c.Owner
                listout.append(newelem)
        return listout
        
    def collector(elem):
        cont = 0
        elements = nextElements(elem)
        for x in elements:
            if x.Id in lookup:
                cont += 1
            else:
                item = doc.GetElement(x.Id)
    
                lookup[x.Id] = item
                collector(x)
        if cont == len(elements):
            return elem
            
    def collectorWithOwner(elem):
        cont = 0
        elements = nextElementsWithOwner(elem)
        for x in elements:
            if x.Id in lookup:
                cont += 1
            else:
                item = doc.GetElement(x.Id)
    
                lookup[x.Id] = item
                collectorWithOwner(x)
        if cont == len(elements):
            return elem
            
    
    listout = []
    if bool__:
        for x in connector:
            lookup = collections.OrderedDict()
            collectorWithOwner(x)
            listout.append(lookup.Values)
    else:
        for x in connector:
            lookup = collections.OrderedDict()
            if x.GetType() == DB.Connector:
                ownerId = x.Owner.Id
            else:
                ownerId = x.Id
            collector(x)
            listout.append(lookup.Values)
    
    #Assign your output to the OUT variable.
    if toggle:
        return lookup.Values
    else:
        return listout


def elementcategory(element):
    ele = element
    ele_cat = ele.Category

Curr_View = doc.ActiveView

def placeArrows(pipe):
    Center_Of_Pipe = pipe.Location.Curve.Evaluate(0.5, True)
    NewFlowArrowFamily_Created = doc.Create.NewFamilyInstance(Center_Of_Pipe,Selected_Arrow_Family ,Curr_View);
    return NewFlowArrowFamily_Created


All_Elements_In_Network = ConnectedNetworks(Selected_Pipe,True) #False since I don't want selected element
All_Pipes_In_Network = []
for el in All_Elements_In_Network:
    cat = str(el.Category.Name)
    if cat=="Pipes":
      All_Pipes_In_Network.append(el)  
      
      
cat = Selected_Pipe.Category.Name


#Getting the first point
conn = Selected_Pipe.ConnectorManager.Connectors
first_Point = None
for c in conn:
    if not c.IsConnected: #first_Point = DB.XYZ(c.Origin.X,c.Origin.Y,c.Origin.Z)
        first_Point = c.Origin
 
startLoc = first_Point

 
#Getting other open points 
Other_Open_Points = []

for el in All_Pipes_In_Network:
    if el.Id.IntegerValue != Selected_Pipe.Id.IntegerValue:
        conn_ = []
        conn_= el.ConnectorManager.Connectors
        for c_ in conn_:
            if not c_.IsConnected: #DB.XYZ(c_.Origin.X,c_.Origin.Y,c_.Origin.Z)
                next_point = c_.Origin
                Other_Open_Points.append(next_point)   


# Here Comes Dijikstra

def Find_Path_Between_Two_Points(Point1,Poiint2,All_Elements_Of_Category_):
    Closest_Elements_ = Find_Closest_Elements(Point1,Poiint2,All_Elements_Of_Category_) #okay
    
    Connection_ = Find_Connection_Between_Elements(All_Elements_Of_Category_) #okay
    
    Build_Graph_ = Build_Relations_Graph(All_Elements_Of_Category_,Connection_) 
    
    graph_ = Graph()

    for edge in Build_Graph_:
        kn = 0
        for i in edge:
            if kn == 0:
                a = i
            else:
                b = i
            kn = kn+1
        graph_.add_edge(a,b,1)
    
    path = dijsktra(graph_,Closest_Elements_[0], Closest_Elements_[1])
    
    for i in Build_Graph_:
        inter = []
        for j in i:
            inter.append(j.IntegerValue)
        TestJ.append(inter)
    
    
    return path
    
class Graph():
    def __init__(self):
        self.edges = defaultdict(list)
        self.weights = {}
    
    def add_edge(self, from_node, to_node, weight):
        # Note: assumes edges are bi-directional
        self.edges[from_node].append(to_node)
        self.edges[to_node].append(from_node)
        self.weights[(from_node, to_node)] = weight
        self.weights[(to_node, from_node)] = weight


def dijsktra(graph, initial, end):
    # shortest paths is a dict of nodes
    # whose value is a tuple of (previous node, weight)
    shortest_paths = {initial: (None, 0)}
    current_node = initial
    visited = set()
    
    while current_node != end:
        visited.add(current_node)
        destinations = graph.edges[current_node]
        weight_to_current_node = shortest_paths[current_node][1]

        for next_node in destinations:
            weight = graph.weights[(current_node, next_node)] + weight_to_current_node
            if next_node not in shortest_paths:
                shortest_paths[next_node] = (current_node, weight)
            else:
                current_shortest_weight = shortest_paths[next_node][1]
                if current_shortest_weight > weight:
                    shortest_paths[next_node] = (current_node, weight)
        
        next_destinations = {node: shortest_paths[node] for node in shortest_paths if node not in visited}
        if not next_destinations:
            return "Route Not Possible"
        # next node is the destination with the lowest weight
        current_node = min(next_destinations, key=lambda k: next_destinations[k][1])
    
    # Work back through destinations in shortest path
    path = []
    while current_node is not None:
        path.append(current_node)
        next_node = shortest_paths[current_node][0]
        current_node = next_node
    # Reverse path
    path = path[::-1]
    return path


def pow2(n):
    return	math.pow(n,2)

def distance(p1,p2):
    return math.fabs(math.sqrt(pow2(p2.X-p1.X)+pow2(p2.Y-p1.Y)+pow2(p2.Z-p1.Z)))
  
def Find_Closest_Elements(Point1,Point2,All_Elements_Of_Category):
    points = []
    pointElements = []
    for e in All_Elements_Of_Category: 
        eLoc = e.Location
        Test.append(eLoc)
        #test.append(eLoc_dynamo.Get_Geometry())
        
    
        #startEndPoint.append(eLoc)
        try: # For curve based elements
            
            
            s1 = e.Location.Curve.Evaluate(0, True)
            points.append(s1)
            pointElements.append(e)
            s2 = e.Location.Curve.Evaluate(1, True)
            points.append(s2)
            pointElements.append(e)
        except: # For point based elements
            points.append(eLoc.Point)
            pointElements.append(e)
            
    d0S = 1000000000
    d0E = 1000000000
    minS, minE = [],[]
    sEl, eEl = [],[]
    
    for p,e in zip(points,pointElements):
        dS = distance(p,startLoc)
        dE = distance(p,endLoc)
        if (dS<d0S):
            minS = p
            sEl = e
            d0S = dS
        if (dE<d0E):
            minE = p
            eEl = e
            d0E = dE
            
    
    
    return 	sEl,eEl

#Find Connection between Elements

def Find_Connection_Between_Elements(All_Elements_Of_Category): 
    p = []
    dir = []
    rep = []
    ref = []
    
    #elements = List.Flatten(elements,2)
    
    for e in All_Elements_Of_Category:
        refs = []
    
        try:
            connectors = e.MEPModel.ConnectorManager.Connectors
        except:
            connectors = e.ConnectorManager.Connectors
        for conn in connectors:
            for c in conn.AllRefs:
                connectortype = str(c.ConnectorType)
                if connectortype == "End":
                    refs.append(c.Owner)
        ref.append(refs)
    
    #Assign your output to the OUT variable.
    
    return ref
    

def Build_Relations_Graph(elements,connected):
    edges = []
    k = 0
    for e in elements:
        edge = []
        conn = connected[k]
        for c in conn:
            if (e!=c):
                edge.append({e.Id,c.Id})	
            
        k+=1
        edges.append(edge)
        
    return [elem for sublist in edges for elem in sublist]

# Calling Dijikstra

All_Possible_Path = []



for lp in Other_Open_Points:
    endLoc = lp
    path_ = Find_Path_Between_Two_Points(first_Point,lp,All_Elements_In_Network)
    All_Possible_Path.append(path_)
    

print(All_Possible_Path)

1 Like

ouput of Build_Relations_Graph function is not same when compared with dynamo

Yes because Dynamo conversion Geometry is different as explained here:

Couple of questions that might resolve this, do you know Python? Have you check the Revit API docs?

yes I know python. As per my understanding geometry conversion in dynamo is not to be considered when we write our script in script.py file right?..that’s why I removed TODSType in script.py file

Yes but you are trying to get the edge itself in that function, are you trying to get them as curves? I havent watched the presentation to understand the full intent. If it is as curve you can use e.AsCurve() method. However, from your explanation I cannot understand if you are getting the connected edges why you would want the Ids not the elements?

def Build_Relations_Graph(elements,connected):
    edges = []
    k = 0
    for e in elements:
        edge = []
        conn = connected[k]
        for c in conn:
            if e != c:
                edge.append((e, c))
        k += 1
        edges.append(edge)
    return [elem for sublist in edges for elem in sublist]

edge should contain set composed of hashable elements to make comparisons (example like Id).

Ok can you please upload the file here, (revit file as I barely use MEP), and what the output from dynamo looks like.

1 Like

WeTransfer - Send Large Files & Share Photos Online - Up to 2GB Free revit file


If YOU select 2 end pipes this is the dynamo output

Ok a couple of things to note, these are defined but not used anywhere that I can see:
def elementcategory(element), flatten function, TestJ, Test, All_Possible_Path and Points_Test

I would suggest creating a class for the connected network and one for Dijikstra to make it easier to read.

Im still going through the logic, hopefully will be able to help.