Monday, September 2, 2013

Creating a Cython extension type for use with multiProcessing for function fitting

If you have to fit a complex function to a very big data set it would be nice to be able to use all the cores your cpu has. Because the data set is very big it should be efficient to simply split the data set over a number of cores and calculate the total error sum (sum squared error) in parts in parallel. This sounds simple but it took me some effort to do this in python/Cython on both linux and windows. After googling for a while, I decided that using the multiProcessing module should work best for my specific situation (which contains a lot of python code which makes it difficult to turn the GIL temporarily off). On linux I had things running relatively fast, but on windows I could not get it to function. The difference is caused by the lack of real processes on windows (or at least they work differently). On a fork() in linux everything is copied but this does not happen on windows and you have to take care that all data is correctly passed to the child process (read "Explicitly pass resources to child processes" in the multiprocessing documentation).

To try things out I started with a simple example:

#!/usr/bin/env python
from multiprocessing import Process,Queue
import sys,numpy,pylab

class TFitFunc:
    def __init__(self,X0,x,y,pid=1):
        self.a=X0[0]
        self.b=X0[1]
        self.c=X0[2]
        self.x=x[:]
        self.y=y[:]
        self.pid=pid
        
    def __call__(self,X):
        self.a=X[0]
        self.b=X[1]
        self.c=X[2]
        errsum=0
        for i in range(100):
            ymod=self.a*self.x**2+self.b*self.x+self.c
            errsum+=numpy.sum((ymod[:]-self.y[:])**2)
        
        return(errsum)
        
        
class TFitFuncComplex:
    def __init__(self,X0,x,y,pid=1):
        self.a=X0[0]
        self.b=X0[1]
        self.c=X0[2]
        self.x=x[:]
        self.y=y[:]
        self.pid=pid
        
    def __call__(self,X):
        self.a=X[0]
        self.b=X[1]
        self.c=X[2]
        errsum=0
        for i in range(100):
            ymod=self.a*self.x**2+self.b*self.x+self.c+\
                numpy.sin(numpy.sqrt(self.x))*numpy.cos(self.x+0.5)\
                /self.a*numpy.sqrt(self.b)
            errsum+=numpy.sum((ymod[:]-self.y[:])**2)
        
        return(errsum)        
        
def f(fitfunc,X,Q=None):
    errsum=fitfunc(X)
    print "%d: errsum:%e"%(fitfunc.pid,errsum)
    if(Q!=None):
        Q.put(errsum)
    return(errsum)
    
def main():
    a=1.0
    b=2.0
    c=3.0
    x1=numpy.r_[0:10:-10000000j]
    x2=numpy.r_[0:10:-10000000j]
    
    y1=a*x1**2+b*x1+c
    y1+=y1*0.35*(numpy.random.random(len(x1))-0.5)
    y2=a*x2**2+b*x2+c
    y2+=y2*0.35*(numpy.random.random(len(x2))-0.5)
    X0=[0.5,2.5,1.5]
    f1=TFitFunc(X0,x1,y1,1)
    f2=TFitFunc(X0,x2,y2,2)
    
    ps=[]
    for i in range(2):
        if(i==0):
            p=Process(target=f,args=(f1,X0))
        else:
            p=Process(target=f,args=(f2,X0))
        p.start()
        ps.append(p)
    for p in ps:
        p.join()
    
    
def main_complex():
    a=1.0
    b=2.0
    c=3.0
    x1=numpy.r_[0:10:-1000000j]
    x2=numpy.r_[0:10:-1000000j]
    
    y1=a*x1**2+b*x1+c
    y1+=y1*0.35*(numpy.random.random(len(x1))-0.5)
    y2=a*x2**2+b*x2+c
    y2+=y2*0.35*(numpy.random.random(len(x2))-0.5)
    X0=[0.5,2.5,1.5]
    f1=TFitFuncComplex(X0,x1,y1,1)
    f2=TFitFuncComplex(X0,x2,y2,2)
    
    ps=[]
    Qs=[]
    errSum=0.0
    for i in range(2):
        Qs.append(Queue())
        if(i==0):
            p=Process(target=f,args=(f1,X0,Qs[i]))
        else:
            p=Process(target=f,args=(f2,X0,Qs[i]))
        p.start()
        ps.append(p)
    for i in range(2):
        errSum+=Qs[i].get()
        ps[i].join()
    print "Total errsum: %e"%(errSum)
    
    
def main_single_complex():
    a=1.0
    b=2.0
    c=3.0
    x1=numpy.r_[0:10:-1000000j]
    x2=numpy.r_[0:10:-1000000j]
    
    y1=a*x1**2+b*x1+c
    y1+=y1*0.35*(numpy.random.random(len(x1))-0.5)
    y2=a*x2**2+b*x2+c
    y2+=y2*0.35*(numpy.random.random(len(x2))-0.5)
    X0=[0.5,2.5,1.5]
    f1=TFitFuncComplex(X0,x1,y1,1)
    f2=TFitFuncComplex(X0,x2,y2,2)
    errSum=f(f1,X0)
    errSum+=f(f2,X0)
    print "Total errsum: %e"%(errSum)

def main_single():
    a=1.0
    b=2.0
    c=3.0
    x1=numpy.r_[0:10:-10000000j]
    x2=numpy.r_[0:10:-10000000j]
    
    y1=a*x1**2+b*x1+c
    y1+=y1*0.35*(numpy.random.random(len(x1))-0.5)
    y2=a*x2**2+b*x2+c
    y2+=y2*0.35*(numpy.random.random(len(x2))-0.5)
    X0=[0.5,2.5,1.5]
    f1=TFitFunc(X0,x1,y1,1)
    f2=TFitFunc(X0,x2,y2,2)
    f(f1,X0)
    f(f2,X0)
    
if __name__=='__main__':
    main_single()
    #~ main()
    #~ main_complex()
    #~ main_single_complex()

This works fine on linux and windows. However, this is pure python. Normally, I use a lot of Cython code in extension types (aka cython classes). And this I could not get to work without some more research. Now first my solution. The first part shows the .pyx file with two classes, one normal python class with some cython code in it and one real extension type. The second part shows the script using these classes.

TFitFunctions.pyx
#!/usr/bin/env python
import numpy as np
cimport numpy as np

class TFitFunc:
    """
    Simple demonstration class to be used as fit function.
    By definition of the __call__ member function an object of this
    class is callable (functor). All additional data required 
    to calculate the error sum should be passed to the constructor.
    The function here is simply
    y=a*x**2+b*x+c
    """
    def __init__(self,X0,x,y,pid=1):
        """
        Constructor. 
        Arguments:
        X0: list of initial values for the three model parameters [a, b, c]
        x: array of x values
        y: array of y values (typically experimentally determined data points)
        pid: optional "process id"
        """
        self.a=X0[0]
        self.b=X0[1]
        self.c=X0[2]
        self.x=x[:]
        self.y=y[:]
        self._pid=pid
    
    def pid(self):
        return(self._pid)
        
    def __call__(self,X):
        """
        Make objects of this class callable. The argument is a list/array
        of model parameter values [a,b,c]
        The function returns the sum squared error
        """
        cdef double errsum
        cdef int i
        self.a=X[0] #could also have used X directly in the calculation below
        self.b=X[1]
        self.c=X[2]
        errsum=0
        for i in range(100): #do this a hundred times to waste some CPU time
            ymod=self.a*self.x**2+self.b*self.x+self.c #calculate model values
            errsum+=np.sum((ymod[:]-self.y[:])**2) # calculate summed square error
        errsum/=100.0 
        return(errsum)
        
        
cdef class TFitFuncComplex:
    """
    Simple demonstration class to be used as fit function.
    Very similar to TFitFunc but with a more complex (and time consuming)
    function. Another big difference is that now the class is defined
    as an extension type. 
    
    By definition of the __call__ member function an object of this
    class is callable (functor). All additional data required 
    to calculate the error sum should be passed to the constructor.
    The function here is simply
    y=a*x**2+b*x+c+sin(sqrt(x))*cos(x+0.5)/(a*b*b)
    """
    cdef double a,b,c   #in an extension type class member variables must be defined here
    cdef int _pid
    cdef np.ndarray x,y
    
    def __init__(self,X0,np.ndarray[double, ndim=1]x,np.ndarray[double, ndim=1]y,int pid=1):
        """
        Constructor. 
        Arguments:
        X0: list/array of initial values for the three model parameters [a, b, c]
        x: array of x values
        y: array of y values (typically experimentally determined data points)
        pid: optional "process id"
        """
        self.a=X0[0]
        self.b=X0[1]
        self.c=X0[2]
        self.x=x[:]
        self.y=y[:]
        self._pid=pid
        
    def pid(self):
        return(self._pid)
        
    def __call__(self,X):
        """
        Make objects of this class callable. The argument is a list/array
        of model parameter values [a,b,c]
        The function returns the sum squared error
        """
        cdef double errsum
        cdef int i
        self.a=X[0] #could also have used X directly in the calculation below
        self.b=X[1]
        self.c=X[2]
        errsum=0
        for i in xrange(100):#do this a hundred times to waste some CPU time
            ymod=self.a*self.x**2+self.b*self.x+self.c+\
                np.sin(np.sqrt(self.x))*np.cos(self.x+0.5)\
                /self.a*np.sqrt(self.b) #calculate model values
            errsum+=np.sum((ymod[:]-self.y[:])**2) # calculate summed square error
        errsum/=100.0 
        return(errsum)
        
   
    def __reduce__(self):
        """
        Without this function the code will not run with multiProcessing
        on Windows.
        It has something to do with making an extension type
        pickable. For a normal python class this is not required
        (see TFitFunc)
        """
        return TFitFuncComplex, ([self.a,self.b,self.c],self.x,self.y,self._pid)

calling script:

#!/usr/bin/env python
from multiprocessing import Process,Queue
import sys,numpy
from TFitFunctions import TFitFunc,TFitFuncComplex

"""
Demonstration of use Process with a callable Python/Cython classes
Can be easily extended into a real multiProcessing fitting
setup for use with (e.g.) fmin

The basic idea is that you have a huge amount of data points
that must be evaluated for the calculation of the summed squared error.
These data points are independent by nature and thus ideal for
parallel processing.
"""
      
        
def f(fitfunc,X,Q=None):
    """
    Function to be passed to Process as target. The function
    will call fitfunc with X as argument and put the result in the
    Queue Q is one is passed as argument.
    """
    errsum=fitfunc(X)
    print "%d: errsum:%e"%(fitfunc.pid(),errsum)
    if(Q!=None):
        Q.put(errsum)
    return(errsum)
    
def main():
    """
    Demonstration of use of TFitFunc
    """
    a=1.0 #model parameters
    b=2.0
    c=3.0
    x1=numpy.r_[0:10:-10000000j] #data points for first process (x) 
    x2=numpy.r_[0:10:-10000000j] #measurement points for second process (x)
    
    y1=a*x1**2+b*x1+c  #"measured" data for process 1
    y1+=y1*0.35*(numpy.random.random(len(x1))-0.5) #add some noise
    y2=a*x2**2+b*x2+c #"measured" data for process 2
    y2+=y2*0.35*(numpy.random.random(len(x2))-0.5) #add some noise
    X0=[0.5,2.5,1.5] #initial guess for model parameters
    f1=TFitFunc(X0,x1,y1,1) #Create fit function for process 1
    f2=TFitFunc(X0,x2,y2,2) #Create fit function for process 2
    
    ps=[] #to contain process
    Qs=[]
    errSum=0.0
    for i in range(2):
        Qs.append(Queue())
        if(i==0):
            p=Process(target=f,args=(f1,X0,Qs[i])) #create process 1
        else:
            p=Process(target=f,args=(f2,X0,Qs[i])) #create process 2
        p.start() #start process
        ps.append(p) #add process "handle"
    
    for i in range(2):
        errSum+=Qs[i].get() #collect error sums from processes
        ps[i].join() #wait for process to finish
    print "Total errsum: %e"%(errSum)
    

    
def main_complex():
    """
    Same as main but now with TFitFuncComplex
    """
    a=1.0
    b=2.0
    c=3.0
    x1=numpy.r_[0:10:-1000000j]
    x2=numpy.r_[0:10:-1000000j]
    
    y1=a*x1**2+b*x1+c
    y1+=y1*0.35*(numpy.random.random(len(x1))-0.5)
    y2=a*x2**2+b*x2+c
    y2+=y2*0.35*(numpy.random.random(len(x2))-0.5)
    X0=[0.5,2.5,1.5]
    f1=TFitFuncComplex(X0,x1,y1,1)
    f2=TFitFuncComplex(X0,x2,y2,2)
    
    
    ps=[]
    Qs=[]
    errSum=0.0
    for i in range(2):
        Qs.append(Queue())
        if(i==0):
            p=Process(target=f,args=(f1,X0,Qs[i]))
        else:
            p=Process(target=f,args=(f2,X0,Qs[i]))
        p.start()
        ps.append(p)
    for i in range(2):
        errSum+=Qs[i].get()
        ps[i].join()
    print "Total errsum: %e"%(errSum)
    

    
def calcErrorSum(X,fitfuncs):
    """
    Calculate error sum for given model parameters (X)
    by using the functions in fitfuncs in (parallel) processes
    """
    ps=[]
    Qs=[]
    errSum=0.0
    for i in range(len(fitfuncs)):
        Qs.append(Queue())
        p=Process(target=f,args=(fitfuncs[i],X,Qs[i]))
        p.start()
        ps.append(p)
    for i in range(2):
        errSum+=Qs[i].get()
        ps[i].join()
    print "Total errsum: %e"%(errSum)
    return(errSum)
    
def main_complex2():
    """
    Same as main_complex but now with data creation part seperated
    from function evaluation part. PLease note that
    the function calcErrorSum can be used as a (multiProcessing)
    argument to (e.g.) fmin
    """
    a=1.0
    b=2.0
    c=3.0
    x1=numpy.r_[0:10:-1000000j]
    x2=numpy.r_[0:10:-1000000j]
    
    y1=a*x1**2+b*x1+c
    y1+=y1*0.35*(numpy.random.random(len(x1))-0.5)
    y2=a*x2**2+b*x2+c
    y2+=y2*0.35*(numpy.random.random(len(x2))-0.5)
    X0=[0.5,2.5,1.5]
    f1=TFitFuncComplex(X0,x1,y1,1)
    f2=TFitFuncComplex(X0,x2,y2,2)
    fitfuncs=[f1,f2]
    
    calcErrorSum(X0,fitfuncs)
    
        
    
def main_single_complex():
    """
    Same a main_complex but now serial evaluation (for time comparison purposes)
    """
    a=1.0
    b=2.0
    c=3.0
    x1=numpy.r_[0:10:-1000000j]
    x2=numpy.r_[0:10:-1000000j]
    
    y1=a*x1**2+b*x1+c
    y1+=y1*0.35*(numpy.random.random(len(x1))-0.5)
    y2=a*x2**2+b*x2+c
    y2+=y2*0.35*(numpy.random.random(len(x2))-0.5)
    X0=[0.5,2.5,1.5]
    f1=TFitFuncComplex(X0,x1,y1,1)
    f2=TFitFuncComplex(X0,x2,y2,2)
    errSum=f(f1,X0)
    errSum+=f(f2,X0)
    print "Total errsum: %e"%(errSum)

def main_single():
    """
    Same a main but now serial evaluation (for time comparison purposes)
    """
    a=1.0
    b=2.0
    c=3.0
    x1=numpy.r_[0:10:-10000000j]
    x2=numpy.r_[0:10:-10000000j]
    
    y1=a*x1**2+b*x1+c
    y1+=y1*0.35*(numpy.random.random(len(x1))-0.5)
    y2=a*x2**2+b*x2+c
    y2+=y2*0.35*(numpy.random.random(len(x2))-0.5)
    X0=[0.5,2.5,1.5]
    f1=TFitFunc(X0,x1,y1,1)
    f2=TFitFunc(X0,x2,y2,2)
    f(f1,X0)
    f(f2,X0)
    
if __name__=='__main__':
    #~ main_single()
    #~ main()
    #~ main_complex2()
    main_complex()
    #~ main_single_complex()

The code in TFitFunctions.pyx shows that the trick for an extension type is to add the __reduce__ method to make it pickable. This is only needed on windows.

4 comments:

  1. Hi, I understand this is an old post. But I would really appreciate if you could help me with a question related to this blog entry. Like your example code, I have a Cython extension type and a situation that needs some parallel computation distributed over multi-CPUs. For this reason, I wish to make use of multi-processing capabilities of the multiprocessing module. In your code main_complex(), when you pass in the extension type as an argument, you are effectively making a copy of the object, right? In my situation, such a copy is expensive because of the object's memory requirements. Is there any way the object can be shared? My processes don't need to modify the state of the object; hence I don't foresee any synchronization issues arising due to that. Appreciate your inputs!

    ReplyDelete
  2. Sorry didn't see your question earlier. As this has been a while I will need a little bit of time to look into this. Will get back to you on this.

    ReplyDelete
  3. As described in the multiprocessing documentation (https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Array) there are ways to use shared memory. In the example below I use the Array type to use shared memory. This is a pure python example but I see no reason why it would not also work with Cython. For efficiency it might be necessary to use the multiprocessing.sharedctypes module.

    The example python code:
    #!/usr/bin/env python
    from multiprocessing import Process,Queue,Array
    import sys,numpy,pylab



    class TFitFuncA:
    def __init__(self,X0,x,y,pid=1):
    self.a=X0[0]
    self.b=X0[1]
    self.c=X0[2]
    self.x=x
    self.y=y
    self.pid=pid
    self.N=len(x)

    def __call__(self,X):
    self.a=X[0]
    self.b=X[1]
    self.c=X[2]
    errsum=0
    for i in range(self.N):
    ymod=self.a*self.x[i]**2+self.b*self.x[i]+self.c
    errsum+=(ymod-self.y[i])**2

    return(errsum)


    def f(fitfunc,X,Q=None):
    errsum=fitfunc(X)
    print "%d: errsum:%e"%(fitfunc.pid,errsum)
    if(Q!=None):
    Q.put(errsum)
    return(errsum)



    def main_array():

    a=1.0
    b=2.0
    c=3.0
    #~ x1=numpy.r_[0:10:-10000000j]
    #~ x2=numpy.r_[0:10:-10000000j]
    N=1000000
    ax1 = Array('d', range(N),lock=False) #lock false because no writing to these arrays
    dx=10./(N+1.0)
    for i in range(N):
    ax1[i]=i*dx

    ay1 = Array('d', range(N))
    ay2 = Array('d', range(N))
    ran_array=numpy.random.random(len(ax1))
    ran_array2=numpy.random.random(len(ax1))

    for i in range(N):
    ay1[i]=a*ax1[i]**2+b*ax1[i]+c
    ay1[i]+=ay1[i]*0.35*(ran_array[i]-0.5)
    ay2[i]=a*ax1[i]**2+b*ax1[i]+c
    ay2[i]+=ay2[i]*0.35*(ran_array2[i]-0.5)
    X0=[0.5,2.5,1.5]
    f1=TFitFuncA(X0,ax1,ay1,1)
    f2=TFitFuncA(X0,ax1,ay2,2)
    print "Init done"
    ps=[]
    Qs=[]
    errSum=0.0
    for i in range(2):
    Qs.append(Queue())
    if(i==0):
    p=Process(target=f,args=(f1,X0,Qs[i]))
    else:
    p=Process(target=f,args=(f2,X0,Qs[i]))
    p.start()
    ps.append(p)
    for i in range(2):
    errSum+=Qs[i].get()
    ps[i].join()
    print "Total errsum: %e"%(errSum)


    if __name__=='__main__':
    main_array()

    ReplyDelete
  4. Thank you so much! I stumbled upon your blog again and am pleasantly surprised to see your reply! I too had figured out that using Array type for shared memory access would work, and it did. Thanks again!

    ReplyDelete