# Here is the script, cleaned up a bit:
import math
import itertools
import collections
from collections import defaultdict
from pyrevit import DB, UI, forms, revit,script,EXEC_PARAMS,UI
# __revit__ is supplied by the host application; it is not defined in this file.
uiapp = __revit__
uidoc = uiapp.ActiveUIDocument
doc = uiapp.ActiveUIDocument.Document
# Debug trace filled by Find_Path_Between_Two_Points (integer ids of every graph edge).
TestJ = []
__doc__ = """ This tool will place flow arrows in pipes.
Tip : Select the pipe in network and then run the script.
Shift + Click : This mode will place arrows in the reverse direction."""
__title__ = 'Pipe \n Flow Arrow'
__author__ = 'Jobin Sunny'
my_Check = EXEC_PARAMS.config_mode  # for checking click modes (Shift+Click)
my_list = [[1], [2, 3], [4, 5, 6, 7]]
flat_list = list(itertools.chain(*my_list))
# The tooltip above promises that Shift+Click reverses the arrows, so the
# config-mode flag has to select the reverse direction, not the normal one.
if my_Check:
    Flow_Arrow_Direction = " Reverse Flow"
else:
    Flow_Arrow_Direction = " Normal Flow"
TestList = ["job", "mott", "PHE"]

# Let the user pick which generic-annotation type is used as the arrow.
Generic_Anno_Famlies = DB.FilteredElementCollector(doc).OfCategory(DB.BuiltInCategory.OST_GenericAnnotation).WhereElementIsElementType().ToElements()
Selected_Arrow_Family = forms.SelectFromList.show(Generic_Anno_Famlies, multiselect=False, name_attr='FamilyName', button_name='Select Flow Arrow Family')
if not Selected_Arrow_Family:  # stopping the script if user does not select any flow arrow family
    forms.alert("No flow arrow family selected", "Script Cancelled")
    script.exit()
if not forms.check_selection():
    script.exit()

ElementIds = uidoc.Selection.GetElementIds()
# Only one seed element is used: with several elements selected the last one wins.
for el in ElementIds:
    Selected_Pipe = doc.GetElement(el)
# Debug trace filled by Find_Closest_Elements (the Location of every inspected element).
Test = []
Points_Test = []

# Making family active.
# Activating a symbol modifies the document, so it must run inside a transaction.
if not Selected_Arrow_Family.IsActive:
    with revit.Transaction("Activate flow arrow family"):
        Selected_Arrow_Family.Activate()
Category_Names = ["Pipes", "Pipe Fittings", "Pipe Accessories", "Mechanical Equipment"]
def flatten(lst, depth=None):
    """Return a new list with nested lists in ``lst`` unpacked.

    Args:
        lst: iterable whose items may themselves be lists.
        depth: how many levels of nesting to unpack; ``None`` (default)
            unpacks every level, ``0`` returns a shallow copy.

    Returns:
        A new list. Only ``list`` instances are unpacked; tuples and other
        containers are kept as single items.
    """
    if depth is None:
        depth = float('inf')
    result = []
    for item in lst:
        if isinstance(item, list) and depth > 0:
            result.extend(flatten(item, depth - 1))
        else:
            result.append(item)
    return result
#Function to collect all elements in the connected network
def ConnectedNetworks(ElementList, bool_):
    """Collect the elements reachable through connectors from one or more seeds.

    Args:
        ElementList: a single element or connector, or a Python ``list`` of them.
        bool_: when truthy, nothing is excluded from the walk, so a seed element
            is picked up again as soon as it is reached from a neighbour.
            When falsy, the seed's own element id (for a connector seed: its
            owner's id) is excluded while walking from elements.

    Returns:
        For a single (non-list) seed: a list of the elements found, in the
        order they were first reached.
        For a list of seeds: a list holding one such list per seed.
    """
    if isinstance(ElementList, list):
        seeds = ElementList
        single_seed = False
    else:
        seeds = [ElementList]
        single_seed = True

    def _connectors_of(elem):
        # Elements without a direct ConnectorManager are expected to expose
        # one through their MEPModel instead.
        try:
            return elem.ConnectorManager.Connectors
        except AttributeError:
            return elem.MEPModel.ConnectorManager.Connectors

    def _neighbours(elem, excluded_id):
        """Return the owners of every connector reference leaving ``elem``."""
        if isinstance(elem, DB.Connector):
            own_id = elem.Owner.Id
            connectors = [elem]
            # The exclusion is only applied when walking from elements,
            # never when starting from a bare connector.
            excluded_id = None
        else:
            own_id = elem.Id
            connectors = _connectors_of(elem)
        found = []
        for conn in connectors:
            for ref in conn.AllRefs:
                owner = ref.Owner
                if owner.Id.Equals(own_id):
                    continue
                if isinstance(owner, DB.MEPSystem):
                    continue
                if excluded_id is not None and owner.Id.Equals(excluded_id):
                    continue
                found.append(owner)
        return found

    def _collect(seed, excluded_id):
        """Depth-first walk from ``seed``; returns elements in first-visit order."""
        found = collections.OrderedDict()
        exhausted = object()
        # Explicit stack of neighbour iterators instead of recursion, so a
        # long run of elements cannot exhaust the interpreter stack.
        pending = [iter(_neighbours(seed, excluded_id))]
        while pending:
            neighbour = next(pending[-1], exhausted)
            if neighbour is exhausted:
                pending.pop()
                continue
            if neighbour.Id in found:
                continue
            found[neighbour.Id] = doc.GetElement(neighbour.Id)
            pending.append(iter(_neighbours(neighbour, excluded_id)))
        # OrderedDict has no ``Values`` attribute; values() keeps insertion order.
        return list(found.values())

    results = []
    for seed in seeds:
        if bool_:
            excluded_id = None
        elif isinstance(seed, DB.Connector):
            excluded_id = seed.Owner.Id
        else:
            excluded_id = seed.Id
        results.append(_collect(seed, excluded_id))

    if single_seed:
        return results[0]
    return results
def elementcategory(element):
    """Return the ``Category`` of ``element``.

    The value was previously computed and then discarded, so every call
    returned ``None``.
    """
    return element.Category
# View that is active when the script runs; placeArrows passes it to
# NewFamilyInstance as the view in which each arrow instance is created.
Curr_View = doc.ActiveView
def placeArrows(pipe):
    """Create one instance of the selected arrow family on ``pipe``.

    The instance is placed at normalized parameter 0.5 of the pipe's location
    curve, in the view stored in ``Curr_View``.

    Returns:
        The family instance returned by ``doc.Create.NewFamilyInstance``.
    """
    # NOTE(review): this creates an element; presumably the caller is expected
    # to have a transaction open - confirm at the call site.
    midpoint = pipe.Location.Curve.Evaluate(0.5, True)
    return doc.Create.NewFamilyInstance(midpoint, Selected_Arrow_Family, Curr_View)
# True keeps the selected pipe in the collected network: ConnectedNetworks
# only drops the seed element when it is called with False.
All_Elements_In_Network = ConnectedNetworks(Selected_Pipe, True)
All_Pipes_In_Network = []
for el in All_Elements_In_Network:
    cat = str(el.Category.Name)
    if cat == "Pipes":
        All_Pipes_In_Network.append(el)
cat = Selected_Pipe.Category.Name

#Getting the first point
# The start of the flow is the unconnected end of the selected pipe
# (if both ends are open, the last connector visited wins).
conn = Selected_Pipe.ConnectorManager.Connectors
first_Point = None
for c in conn:
    if not c.IsConnected:
        first_Point = c.Origin
if first_Point is None:
    # Without an open end there is no start point; every later distance
    # computation would fail on None, so stop here with a clear message.
    forms.alert("The selected pipe has no open end to start from", "Script Cancelled")
    script.exit()
startLoc = first_Point

#Getting other open points
# Every unconnected pipe end elsewhere in the network is a candidate end point.
Other_Open_Points = []
for el in All_Pipes_In_Network:
    if el.Id.IntegerValue != Selected_Pipe.Id.IntegerValue:
        conn_ = el.ConnectorManager.Connectors
        for c_ in conn_:
            if not c_.IsConnected:
                next_point = c_.Origin
                Other_Open_Points.append(next_point)
# Here comes Dijkstra
def Find_Path_Between_Two_Points(Point1, Poiint2, All_Elements_Of_Category_):
    """Find the shortest chain of connected elements between two points.

    Args:
        Point1: start point; the element closest to it is the path start.
        Poiint2: end point; the element closest to it is the path end.
        All_Elements_Of_Category_: the elements that make up the network.

    Returns:
        A list of element ids from start to end, or the string
        "Route Not Possible" when no route exists.

    Side effects:
        Appends the integer ids of every graph edge to the module-level
        ``TestJ`` debug list.
    """
    start_elem, end_elem = Find_Closest_Elements(Point1, Poiint2, All_Elements_Of_Category_)
    connections = Find_Connection_Between_Elements(All_Elements_Of_Category_)
    Build_Graph_ = Build_Relations_Graph(All_Elements_Of_Category_, connections)

    graph_ = Graph()
    for edge in Build_Graph_:
        ends = list(edge)
        # An edge is a set of two ids; a set that collapsed to a single id
        # (self-reference) is not a usable edge.
        if len(ends) != 2:
            continue
        graph_.add_edge(ends[0], ends[1], 1)

    if hasattr(start_elem, "Id") and hasattr(end_elem, "Id"):
        # The graph is keyed by element id, so the search must be started
        # with ids too; passing the elements themselves never matches a node.
        path = dijsktra(graph_, start_elem.Id, end_elem.Id)
    else:
        # Find_Closest_Elements found nothing (e.g. an empty network).
        path = "Route Not Possible"

    for edge in Build_Graph_:
        TestJ.append([node_id.IntegerValue for node_id in edge])
    return path
class Graph(object):
    """Undirected weighted graph kept as adjacency lists plus a weight table."""

    def __init__(self):
        # node -> list of neighbouring nodes, in insertion order (duplicates kept)
        self.edges = defaultdict(list)
        # (from_node, to_node) -> weight, stored once per direction
        self.weights = {}

    def add_edge(self, from_node, to_node, weight):
        """Connect ``from_node`` and ``to_node`` in both directions.

        Adding the same pair again appends duplicate adjacency entries and
        overwrites the stored weight.
        """
        # Note: assumes edges are bi-directional
        self.edges[from_node].append(to_node)
        self.edges[to_node].append(from_node)
        self.weights[(from_node, to_node)] = weight
        self.weights[(to_node, from_node)] = weight
def dijsktra(graph, initial, end):
    """Return the lowest-weight path from ``initial`` to ``end``.

    Args:
        graph: object with ``edges`` (node -> iterable of neighbours) and
            ``weights`` ((from, to) -> weight) mappings, e.g. a ``Graph``.
        initial: start node.
        end: target node.

    Returns:
        The list of nodes from ``initial`` to ``end`` inclusive, or the string
        "Route Not Possible" when ``end`` cannot be reached.
    """
    # shortest_paths maps node -> (previous node on the best route, total weight)
    shortest_paths = {initial: (None, 0)}
    current_node = initial
    visited = set()

    while current_node != end:
        visited.add(current_node)
        weight_to_current_node = shortest_paths[current_node][1]

        # .get() instead of indexing: Graph.edges is a defaultdict, and
        # indexing it with an unknown node would silently add that node.
        for next_node in graph.edges.get(current_node, ()):
            weight = graph.weights[(current_node, next_node)] + weight_to_current_node
            best_known = shortest_paths.get(next_node)
            if best_known is None or best_known[1] > weight:
                shortest_paths[next_node] = (current_node, weight)

        next_destinations = {
            node: shortest_paths[node]
            for node in shortest_paths
            if node not in visited
        }
        if not next_destinations:
            return "Route Not Possible"
        # next node is the destination with the lowest weight
        current_node = min(next_destinations, key=lambda k: next_destinations[k][1])

    # Work back through the predecessors, then reverse into start-to-end order.
    path = []
    while current_node is not None:
        path.append(current_node)
        current_node = shortest_paths[current_node][0]
    return path[::-1]
def pow2(n):
    """Return ``n`` squared as a float (via ``math.pow``)."""
    return math.pow(n, 2)
def distance(p1, p2):
    """Return the straight-line distance between two points.

    Both arguments only need numeric ``X``, ``Y`` and ``Z`` attributes.
    """
    dx = p2.X - p1.X
    dy = p2.Y - p1.Y
    dz = p2.Z - p1.Z
    # sqrt never returns a negative value, so no absolute value is needed.
    return math.sqrt(dx * dx + dy * dy + dz * dz)
def Find_Closest_Elements(Point1, Point2, All_Elements_Of_Category):
    """Return the elements lying closest to ``Point1`` and to ``Point2``.

    Curve-based elements are measured at both ends of their location curve,
    point-based elements at their location point.

    Args:
        Point1: point whose nearest element becomes the first result.
        Point2: point whose nearest element becomes the second result.
        All_Elements_Of_Category: elements to search.

    Returns:
        A ``(closest_to_Point1, closest_to_Point2)`` tuple. An entry is an
        empty list when there were no elements to search.

    Side effects:
        Appends every inspected ``Location`` to the module-level ``Test``
        debug list.
    """
    candidates = []  # (point, owning element) pairs
    for elem in All_Elements_Of_Category:
        location = elem.Location
        Test.append(location)
        try:  # For curve based elements
            curve = location.Curve
        except AttributeError:  # For point based elements
            candidates.append((location.Point, elem))
        else:
            candidates.append((curve.Evaluate(0, True), elem))
            candidates.append((curve.Evaluate(1, True), elem))

    closest_to_start, closest_to_end = [], []
    best_start = best_end = float('inf')
    for point, elem in candidates:
        # Measure against the arguments, not against module-level globals,
        # so the result depends only on what the caller passed in.
        to_start = distance(point, Point1)
        to_end = distance(point, Point2)
        if to_start < best_start:
            closest_to_start = elem
            best_start = to_start
        if to_end < best_end:
            closest_to_end = elem
            best_end = to_end
    return closest_to_start, closest_to_end
#Find Connection between Elements
def Find_Connection_Between_Elements(All_Elements_Of_Category):
    """List, for each element, the owners of its 'End' connector references.

    Args:
        All_Elements_Of_Category: elements to inspect.

    Returns:
        A list parallel to the input: entry ``i`` holds the owner of every
        reference of type ``End`` found on the connectors of element ``i``.
    """
    ref = []
    for e in All_Elements_Of_Category:
        # Elements without an MEPModel are expected to expose their
        # ConnectorManager directly.
        try:
            connectors = e.MEPModel.ConnectorManager.Connectors
        except AttributeError:
            connectors = e.ConnectorManager.Connectors
        refs = []
        for conn in connectors:
            for c in conn.AllRefs:
                # Compare the enum value itself rather than its string form.
                if c.ConnectorType == DB.ConnectorType.End:
                    refs.append(c.Owner)
        # One entry per element, so the result stays index-aligned with the input.
        ref.append(refs)
    return ref
def Build_Relations_Graph(elements, connected):
    """Turn per-element connection lists into a flat list of id pairs.

    Args:
        elements: the elements of the network.
        connected: list parallel to ``elements``; entry ``i`` holds the
            elements connected to ``elements[i]``.

    Returns:
        A list of two-member sets ``{element.Id, neighbour.Id}``, in element
        order. Each physical connection normally appears twice, once from
        each side.
    """
    edges = []
    for elem, neighbours in zip(elements, connected):
        for other in neighbours:
            # Compare ids, not objects: the same element can arrive as a
            # different wrapper object, and a self-pair would collapse the
            # set to a single id.
            if elem.Id != other.Id:
                edges.append({elem.Id, other.Id})
    return edges
# Run the shortest-path search from the selected pipe's open end to every
# other open end of the network and print all resulting paths.
All_Possible_Path = []
for lp in Other_Open_Points:
    # Published at module level before each search: Find_Closest_Elements
    # reads a global named endLoc.
    endLoc = lp
    path_ = Find_Path_Between_Two_Points(first_Point, lp, All_Elements_In_Network)
    All_Possible_Path.append(path_)
print(All_Possible_Path)