# ant.py
#
# ***********************************************************
#
# Python module for the optimization of combinatorial arrangement
# problems by means of Ant Colony Optimization (ant algorithm)
#
# Date: 03.06.2014
#
# Author : Lars Ulke-Winter
#          BTU Cottbus-Senftenberg
#          Chair of Polymer-based Lightweight Design
#
# ***********************************************************

import numpy
from random import SystemRandom
from copy import deepcopy  # for logging the probabilityMatrix

# One shared OS-entropy generator instead of a fresh SystemRandom per draw.
_RANDOM = SystemRandom()


def _functionName(func):
    """Return a printable name for `func`, even if it has no `__name__`."""
    return getattr(func, '__name__', repr(func))


class Ant(object):
    """
    A single ant: one candidate assignment of a value to every variable.
    """

    def __init__(self, values, fitnessFunction=None):
        """
        Arguments:
        - `values`: list with the chosen value of each variable
          --> [0.15, 0.78, 15.0, 1.0, 20.]
        - `fitnessFunction`: callable that receives `values` and returns a
          scalar; default --> None
        """
        self.fitnessFunction = fitnessFunction
        self.values = values
        # Position of each value inside the colony's listOfPossibilities;
        # filled in by the code that creates the ant.
        self.indices = []
        self.numberOfVariables = len(values)
        self.fitness = None

    def getFitness(self):
        """
        Evaluate self.fitnessFunction on self.values and store the result
        in self.fitness (nothing is returned).
        """
        self.fitness = self.fitnessFunction(self.values)


class AntColony(object):
    """
    Collection of Ant-objects sharing one fitnessFunction and one
    listOfPossibilities.
    """

    def __init__(self, ants, listOfPossibilities=None, fitnessFunction=None,
                 randomCreate=True, membersize=100):
        """
        Arguments:
        - `ants`: list of Ant-objects or an empty one []
        - `listOfPossibilities`: list of lists with the possible symbols of
          each variable --> [[0.1, 0.2],[1,2,3,4],['a','b','c','d']];
          default --> None (treated as an empty list)
        - `fitnessFunction`: fitness function associated with the Ant-objects
          (returns a scalar)
        - `randomCreate`: if True (default) `ants` is replaced by `membersize`
          randomly created Ant-objects
        - `membersize`: number of ants created when randomCreate=True;
          otherwise the colony size is len(ants)
        """
        self.ants = ants
        self.membersize = len(ants)
        # A fresh list per instance: a `[]` default would be shared by every
        # colony created without this argument.
        self.listOfPossibilities = [] if listOfPossibilities is None else listOfPossibilities
        self.numberOfVariables = len(self.listOfPossibilities)
        self.fitnessFunction = fitnessFunction
        self.averageFitness = None
        self.minFitness = None
        if randomCreate:
            self.membersize = membersize
            self._randomInitalize(self.membersize)

    def _randomInitalize(self, membersize=10):
        """
        Helper: replace self.ants by `membersize` Ant-objects whose variables
        are drawn uniformly from self.listOfPossibilities.
        """
        antList = []
        for _ in range(membersize):
            ind = []
            vector = []
            for variable in self.listOfPossibilities:
                var = _RANDOM.choice(variable)
                # index() yields the first occurrence when a value is duplicated
                ind.append(variable.index(var))
                vector.append(var)
            a = Ant(vector, fitnessFunction=self.fitnessFunction)
            # ACO.getBest reads `indices` for the pheromone update; the
            # original code only set `index`, so the initial colony never
            # contributed pheromone.
            a.indices = ind
            a.index = ind  # legacy attribute name, kept for compatibility
            antList.append(a)
        self.ants = antList

    def _order2Fitness(self):
        """
        Helper: sort self.ants by fitness (minimum first) and set
        self.minFitness and self.averageFitness.
        """
        self.ants.sort(key=lambda ant: ant.fitness)
        self.minFitness = self.ants[0].fitness
        fitnessValues = [ant.fitness for ant in self.ants]
        # float() avoids integer division for integer fitness values on Python 2
        self.averageFitness = numpy.sum(fitnessValues) / float(self.membersize)

    def getAllFitnesses(self):
        """
        Compute the fitness of every Ant-object in self.ants.
        """
        for ant in self.ants:
            ant.getFitness()


class ACO(object):
    """
    Implements the AC / ACO algorithms (minimization of the fitness).
    """

    def __init__(self, startAntColony, priorityFunction=None):
        """
        Arguments:
        - `startAntColony`: AntColony-object to start with
        - `priorityFunction`: stored on the instance; no heuristic matrix is
          built from it in this module
        """
        self.colony = startAntColony
        self.priorityFunction = priorityFunction
        self.tauij = []              # pheromone matrix, one row per variable
        self.probabilityMatrix = []  # selection probabilities, same layout
        # {iterationNumber: probabilityMatrix, ...}; filled only when logging
        self.probabilityMatrixDict = {}
        self.tau0 = 0.1  # start value of every pheromone entry
        for possibilities in self.colony.listOfPossibilities:
            self.tauij.append([self.tau0] * len(possibilities))
            self.probabilityMatrix.append([0] * len(possibilities))
        self.reducedListOfPossibilities = []

    def _setProbabilityMatrix(self, alpha=1., beta=1.):
        """
        Recompute self.probabilityMatrix from the pheromones:
        p_ij = tau_ij^alpha / sum_j(tau_ij^alpha).

        `beta` is accepted for interface compatibility but not used, because
        no heuristic term eta_ij enters the computation.
        """
        for rowIndex in range(self.colony.numberOfVariables):
            weights = [tau ** alpha for tau in self.tauij[rowIndex]]
            sumOfRow = 0.
            for weight in weights:
                sumOfRow += weight
            # slice assignment keeps the existing row object
            self.probabilityMatrix[rowIndex][:] = [weight / sumOfRow for weight in weights]

    def _rouletteIndex(self, probabilities):
        """
        Roulette wheel: return the first index whose cumulative probability
        reaches a random number in [0, 1).

        Falls back to the last index when the cumulative sum stays below the
        random number (rounding, or a row that does not sum to one), so that
        exactly one index is returned for every non-empty row.
        """
        if not probabilities:
            raise ValueError('cannot select from an empty list of probabilities')
        randNumber = _RANDOM.random()
        cumSum = 0.
        for index, value in enumerate(probabilities):
            cumSum += value
            if cumSum >= randNumber:
                return index
        return len(probabilities) - 1

    def _selection(self, method='ACO', q=0.5):
        """
        Return a list with one selected index per variable, chosen according
        to self.probabilityMatrix.

        - method 'AC': every variable is chosen by roulette wheel
        - any other method ('ACO'): per variable, roulette wheel with
          probability q, otherwise the entry with the highest probability
        """
        indexList = []
        for i in range(self.colony.numberOfVariables):
            probabilities = self.probabilityMatrix[i]
            if method == 'AC' or _RANDOM.random() <= q:
                indexList.append(self._rouletteIndex(probabilities))
            else:
                indexList.append(probabilities.index(max(probabilities)))
        return indexList

    def _convertIndices2Values(self, listOfIndices):
        """
        Return the values of self.colony.listOfPossibilities addressed by
        `listOfIndices` (one index per variable).
        """
        return [self.colony.listOfPossibilities[i][listOfIndices[i]]
                for i in range(self.colony.numberOfVariables)]

    def _createNewAntColony(self, selectionMethod='ACO', ACOq=0.5):
        """
        Return a new AntColony-object of the current colony size, sampled
        according to self.probabilityMatrix.
        """
        antList = []
        for _ in range(self.colony.membersize):
            indices = self._selection(method=selectionMethod, q=ACOq)
            values = self._convertIndices2Values(indices)
            ant = Ant(values, fitnessFunction=self.colony.fitnessFunction)
            ant.indices = indices
            antList.append(ant)
        return AntColony(antList,
                         listOfPossibilities=self.colony.listOfPossibilities,
                         fitnessFunction=self.colony.fitnessFunction,
                         randomCreate=False,
                         membersize=self.colony.membersize)

    def _updatePheromones(self):
        """
        Evaporate self.tauij by the factor (1 - roh) and deposit
        1/(w*fitness) on every entry used by an ant of the current colony.
        """
        keep = 1 - self.roh
        for row in self.tauij:
            for valueIndex in range(len(row)):
                row[valueIndex] *= keep
        for a in self.colony.ants:
            if not a.indices:
                continue  # nothing to deposit, fitness is not touched
            deltaTauij = 1. / (self.w * a.fitness)
            for varIndex, valueIndex in enumerate(a.indices):
                self.tauij[varIndex][valueIndex] += deltaTauij

    def _printSummary(self, best1, best2, iteration, iteration2, withSecondRun, secondFitness):
        """
        Print the result summary of getBest (one or two runs).
        """
        print('')
        print('###########')
        print('# Results #')
        print('###########')
        print('')
        if withSecondRun != 'NO':
            print('First run:')
            print('-----------')
            print('Fitness function: '+_functionName(self.colony.fitnessFunction))
            print('Iterationen : '+str(iteration-iteration2))
            print('Solution : '+str(best1.values)+' Fitness --> '+str(best1.fitness))
            print('')
            print('Second run:')
            print('-----------')
            print('Fitness function: '+_functionName(secondFitness))
            print('Iterationen : '+str(iteration2)+' --> Total: '+str(iteration))
            print('Solution : '+str(best2.values)+' Fitness --> '+str(best2.fitness))
            print('Reduced selection matrix with p > '+str(self.probThreshold)+' :')
            for var in self.reducedListOfPossibilities:
                print(var)
        else:
            print('Fitness function: '+_functionName(self.colony.fitnessFunction))
            print('Iterationen : '+str(iteration-iteration2))
            print('Solution : '+str(best1.values)+' Fitness --> '+str(best1.fitness))

    def getBest(self, roh=0.1, w=1., selection='ACO', q=0.5, switch=3, eps=0.95, maxIterations=100,
                showIteration='YES', showResults='YES', logProbabilityMatrix=False,
                withSecondRun='NO', fitnessFunction2=None, probThreshold=1.0,
                secondRunAfterIteration=5, eps2=0.99):
        """
        Run the optimization and return the tuple
        (best Ant-object, number of iterations, number of fitness calls);
        with a second run the two counters are the sums over both runs.

        - roh: weathering (evaporation) factor of tauij --> default=0.1
        - w: scale factor for updating tauij --> deltaTauij = 1/(w*fitness);
          the fitness of an ant must therefore not be zero
        - selection: selection method 'AC' or 'ACO' --> default
        - q: probability for 'ACO' selection of using the roulette wheel
          instead of the most probable value
        - switch: iterations 0..switch always use 'AC' selection
        - eps: the run continues while bestFitness/averageFitness < eps
        - maxIterations: hard upper bound for the iteration counter
        - showIteration: if 'YES' (default) every iteration is printed
        - showResults: unless 'NO' a summary is printed at the end
        - logProbabilityMatrix: if True the probabilityMatrix of each
          iteration is stored in self.probabilityMatrixDict
        - withSecondRun: unless 'NO' a second run on the reduced
          listOfPossibilities follows
        - fitnessFunction2: fitness function of the second run; None falls
          back to the fitness function of the first run
        - probThreshold: with a second run, the first run continues until
          every variable has a value with probability > probThreshold (or
          maxIterations is reached); values with probability >= probThreshold
          form the reduced listOfPossibilities
        - secondRunAfterIteration: with a second run, the eps criterion is
          only followed below this iteration number
        - eps2: eps of the second run
        """
        # save optimization parameters
        self.roh = roh
        self.w = w
        self.selection = selection
        self.q = q
        self.switch = switch
        self.eps = eps
        self.maxIterations = maxIterations
        self.showIteration = showIteration
        self.showResults = showResults
        self.logProbabilityMatrix = logProbabilityMatrix
        self.withSecondRun = withSecondRun
        self.fitnessFunction2 = fitnessFunction2
        self.eps2 = eps2
        if withSecondRun != 'NO':
            self.probThreshold = probThreshold
            self.secondRunAfterIteration = secondRunAfterIteration
        else:
            self.probThreshold = 0.0
            self.secondRunAfterIteration = self.maxIterations

        # compute all fitnesses and order
        self.colony.getAllFitnesses()
        self.colony._order2Fitness()
        best = self.colony.ants[0]

        iteration = 0
        # maxIterations bounds the whole condition. Previously the
        # `or not ...` branch bypassed it, so an unreachable probThreshold
        # (e.g. the default 1.0) never terminated.
        while iteration <= self.maxIterations and (
                ((best.fitness / self.colony.averageFitness) < self.eps
                 and iteration < self.secondRunAfterIteration)
                or not self._checkIfAllVariablesHasProbsBiggerThan(threshold=self.probThreshold)):
            if self.showIteration == 'YES':
                print('Best :'+str(best.fitness)+'; Aver.: '+str(self.colony.averageFitness)+'; Best/Aver.= '+str(best.fitness/self.colony.averageFitness)+'; Iter : '+str(iteration))

            # weathering and update of self.tauij according to the fitnesses
            self._updatePheromones()
            # probabilities for the selection of the next generation
            self._setProbabilityMatrix()
            if self.logProbabilityMatrix:
                self.probabilityMatrixDict[iteration] = deepcopy(self.probabilityMatrix)

            method = 'AC' if iteration <= self.switch else self.selection
            self.colony = self._createNewAntColony(selectionMethod=method, ACOq=self.q)
            self.colony.getAllFitnesses()
            self.colony._order2Fitness()
            minInColony = self.colony.ants[0]
            if minInColony.fitness < best.fitness:
                best = minInColony
            iteration += 1
        iteration -= 1
        funCalls = self.colony.membersize * (iteration + 1)

        # Second run with reduced listOfPossibilities and maybe a different fitness function
        iteration2 = 0
        best1 = deepcopy(best)
        best2 = None
        secondFitness = fitnessFunction2 if fitnessFunction2 is not None else self.colony.fitnessFunction
        if withSecondRun != 'NO':
            if self.showIteration != 'NO':
                print('')
                print('Second run begins!')
                print('')
            best2, iteration2, funCalls2 = self._getBest2Run(best1, secondFitness)
            iteration += iteration2
            funCalls += funCalls2
            # select the best individual of both runs, judged by the fitness
            # function of the first run
            if best.fitness > self.colony.fitnessFunction(best2.values):
                best = best2

        # print output as summary
        if showResults != 'NO':
            self._printSummary(best1, best2, iteration, iteration2, withSecondRun, secondFitness)

        return best, iteration, funCalls

    # --------------------------------------------------------------------------
    # Helper functions that restrict the solution space for a second run.
    # They identify the value assignments with similar fitness values, so that
    # a further run can fine-tune among them.
    #
    # + _checkIfAllVariablesHasProbsBiggerThan(threshold=0.1)
    #   checks whether every variable has at least one value whose selection
    #   probability is greater than threshold, i.e. whether paths with at least
    #   that selection probability have become established
    #
    # + _reduceListOfPossibilities(threshold=0.1)
    #   builds the restricted matrix of possible values: those whose entry in
    #   the probability matrix is at least threshold
    #
    # + _addToListOfPossibilities(listToAdd)
    #   adds further values to the restricted list if they are not contained
    #   yet; this keeps the values of the best individual available in the
    #   following run with reduced possibilities
    # --------------------------------------------------------------------------

    def _checkIfAllVariablesHasProbsBiggerThan(self, threshold=0.1):
        """
        Return True if every variable has at least one value with a selection
        probability bigger than threshold, otherwise False.
        """
        # any()/all() instead of filter(): on Python 3 filter() returns an
        # iterator that is always truthy.
        return all(any(prob > threshold for prob in probs)
                   for probs in self.probabilityMatrix)

    def _reduceListOfPossibilities(self, threshold=0.1):
        """
        Rebuild self.reducedListOfPossibilities: per variable the values of
        self.colony.listOfPossibilities whose probability is >= threshold.
        Nothing is returned.
        """
        reduced = []
        for varIndex, probs in enumerate(self.probabilityMatrix):
            reduced.append([self.colony.listOfPossibilities[varIndex][valueIndex]
                            for valueIndex, prob in enumerate(probs)
                            if prob >= threshold])
        # replace instead of append, so repeated calls do not accumulate rows
        self.reducedListOfPossibilities = reduced

    def _addToListOfPossibilities(self, listToAdd):
        """
        Add listToAdd[i] to self.reducedListOfPossibilities[i] unless it is
        already contained.
        """
        for index, elem in enumerate(listToAdd):
            if elem not in self.reducedListOfPossibilities[index]:
                self.reducedListOfPossibilities[index].append(elem)

    def _getBest2Run(self, best1Run, newFitnessFunction):
        """
        Run a second optimization on the reduced listOfPossibilities with
        `newFitnessFunction` and return (best ant, iterations, fitness calls).
        """
        # reduced list (probability of selection at least self.probThreshold)
        self._reduceListOfPossibilities(threshold=self.probThreshold)
        # add the values of the best individual
        self._addToListOfPossibilities(best1Run.values)
        # new random colony with the same size as before, but with another
        # fitness function and the reduced list of possible values
        ac = AntColony([], self.reducedListOfPossibilities,
                       fitnessFunction=newFitnessFunction,
                       randomCreate=True,
                       membersize=self.colony.membersize)
        aco = ACO(ac)
        return aco.getBest(roh=self.roh, w=self.w, selection=self.selection,
                           switch=self.switch, q=self.q, eps=self.eps2,
                           maxIterations=self.maxIterations,
                           showIteration=self.showIteration, showResults='NO',
                           logProbabilityMatrix=self.logProbabilityMatrix,
                           withSecondRun='NO')


# Example
if __name__ == '__main__':

    # Fitness functions (in this example same functions f(x)=f2(x) )

    # Fitness function for first run
    def f(x):
        return numpy.sum(x)

    # Fitness function for second run with reduced possible variables
    # (reduced listOfPossibilities)
    def f2(x):
        return numpy.sum(x)

    ac = AntColony([],
                   listOfPossibilities=[[10., 3., 2., 5., 6., 7., 8., 9., 1., 4.],
                                        [10., 1., 3., 5., 6., 7., 8., 9., 4., 2.],
                                        [10., 4., 3., 1., 6., 7., 8., 9., 4., 2.],
                                        [10., 4., 2., 1., 6., 7., 8., 9., 3., 5.],
                                        [10., 3., 5., 1., 6., 7., 8., 9., 4., 2.],
                                        [10., 4., 5., 1., 2., 7., 8., 9., 6., 3.],
                                        [10., 3., 5., 1., 6., 4., 8., 9., 7., 2.],
                                        [10., 3., 5., 1., 6., 7., 4., 9., 8., 2.],
                                        [10., 3., 4., 2., 6., 7., 8., 9., 1., 5.],
                                        [10., 3., 5., 2., 6., 4., 8., 9., 1., 7.]],
                   fitnessFunction=f,
                   randomCreate=True,
                   membersize=1000)

    print('---------- ACO to reduce the sum ----------')
    aco1 = ACO(ac)
    best, iteration, funCalls = aco1.getBest(
        # optimization parameters
        roh=0.1, w=10., selection='ACO', switch=10, q=1.0, eps=0.75, maxIterations=100,
        # print and log results
        showIteration='YES', showResults='YES', logProbabilityMatrix=True,
        # fitness function which is consulted in the second run
        withSecondRun='NO', fitnessFunction2=f2,
        # conditions which have to be fulfilled before switching to the second
        # run, i.e. you decide when to switch: probThreshold or
        # secondRunAfterIteration; the filter probThreshold is applied to
        # reduce the listOfPossibilities anyway
        probThreshold=0.06, secondRunAfterIteration=15,
        # termination condition of the second run
        eps2=0.80)

    # Alternative call without iteration output and logging:
    # best,iteration,funCalls = aco1.getBest(roh=0.1,w=10.,selection='ACO', switch=10, q=0.3, eps=0.75,
    #                                        maxIterations=100, showIteration='NO', showResults='YES',
    #                                        logProbabilityMatrix=False)