2010-04-07 5 views
7

I recently asked about trying to optimise a Python loop for a scientific application, y recibí an excellent, smart way of recoding it within NumPy which reduced execution time by a factor of around 100 por mí!Reescribiendo un bucle for en NumPy puro para disminuir el tiempo de ejecución

Sin embargo, el cálculo del valor B está en realidad anidado dentro de algunos otros bucles, porque se evalúa en una cuadrícula de posiciones normal. ¿Existe una reescritura NumPy igualmente inteligente para afeitar el tiempo de este procedimiento?

Sospecho que la ganancia de rendimiento para esta parte sería menos marcada, y las desventajas serían presumiblemente que no sería posible informar al usuario sobre el progreso del cálculo, que los resultados no podrían escribirse en el archivo de salida hasta el final del cálculo, y posiblemente eso haciendo un paso enorme tendría implicaciones de memoria? ¿Es posible eludir alguno de estos?

import numpy as np 
import time 

def reshape_vector(v): 
    b = np.empty((3,1)) 
    for i in range(3): 
     b[i][0] = v[i] 
    return b 

def unit_vectors(r): 
    return r/np.sqrt((r*r).sum(0)) 

def calculate_dipole(mu, r_i, mom_i): 
    relative = mu - r_i 
    r_unit = unit_vectors(relative) 
    A = 1e-7 

    num = A*(3*np.sum(mom_i*r_unit, 0)*r_unit - mom_i) 
    den = np.sqrt(np.sum(relative*relative, 0))**3 
    B = np.sum(num/den, 1) 
    return B 

N = 20000 # number of dipoles 
r_i = np.random.random((3,N)) # positions of dipoles 
mom_i = np.random.random((3,N)) # moments of dipoles 
a = np.random.random((3,3)) # three basis vectors for this crystal 
n = [10,10,10] # points at which to evaluate sum 
gamma_mu = 135.5 # a constant 

t_start = time.clock() 
for i in range(n[0]): 
    r_frac_x = np.float(i)/np.float(n[0]) 
    r_test_x = r_frac_x * a[0] 
    for j in range(n[1]): 
     r_frac_y = np.float(j)/np.float(n[1]) 
     r_test_y = r_frac_y * a[1] 
     for k in range(n[2]): 
      r_frac_z = np.float(k)/np.float(n[2]) 
      r_test = r_test_x +r_test_y + r_frac_z * a[2] 
      r_test_fast = reshape_vector(r_test) 
      B = calculate_dipole(r_test_fast, r_i, mom_i) 
      omega = gamma_mu*np.sqrt(np.dot(B,B)) 
      # write r_test, B and omega to a file 
    frac_done = np.float(i+1)/(n[0]+1) 
    t_elapsed = (time.clock()-t_start) 
    t_remain = (1-frac_done)*t_elapsed/frac_done 
    print frac_done*100,'% done in',t_elapsed/60.,'minutes...approximately',t_remain/60.,'minutes remaining' 

Respuesta

2

Una cosa obvia que puede hacer es reemplazar la línea

r_test_fast = reshape_vector(r_test) 

con

r_test_fast = r_test.reshape((3,1)) 

Probablemente no hará ninguna diferencia en el rendimiento, pero en cualquier caso, tiene sentido para usar las nuily builtins en lugar de reinventar la rueda.

En general, como probablemente ya hayas notado, el truco de optimizar numpy es expresar el algoritmo con la ayuda de numpy operaciones de matriz completa o al menos con slices en lugar de iterar sobre cada elemento en python code. Lo que tiende a evitar este tipo de "vectorización" son las denominadas dependencias portadas por bucle, es decir, bucles en los que cada iteración depende del resultado de una iteración previa. Mirando brevemente su código, no tiene tal cosa, y debería ser posible vectorizar su código muy bien.

EDIT: Una solución

no he verificado esto es correcto, pero debe darle una idea de cómo acercarse a ella.

Primero, tome cartesian() function, which we'll use. Entonces

 

def calculate_dipole_vect(mus, r_i, mom_i): 
    # Treat each mu sequentially 
    Bs = [] 
    omega = [] 
    for mu in mus: 
     rel = mu - r_i 
     r_norm = np.sqrt((rel * rel).sum(1)) 
     r_unit = rel/r_norm[:, np.newaxis] 
     A = 1e-7 

     num = A*(3*np.sum(mom_i * r_unit, 0)*r_unit - mom_i) 
     den = r_norm ** 3 
     B = np.sum(num/den[:, np.newaxis], 0) 
     Bs.append(B) 
     omega.append(gamma_mu * np.sqrt(np.dot(B, B))) 
    return Bs, omega 


# Transpose to get more "natural" ordering with row-major numpy 
r_i = r_i.T 
mom_i = mom_i.T 

t_start = time.clock() 
r_frac = cartesian((np.arange(n[0])/float(n[0]), 
        np.arange(n[1])/float(n[1]), 
        np.arange(n[2])/float(n[2]))) 
r_test = np.dot(r_frac, a) 
B, omega = calculate_dipole_vect(r_test, r_i, mom_i) 

print 'Total time for vectorized: %f s' % (time.clock() - t_start) 
 

Bueno, en mi prueba, esto es, en realidad, un poco más lento que el enfoque basado en bucles desde el que comencé. El asunto es que, en la versión original de la pregunta, ya estaba vectorizado con operaciones de matriz completa en arreglos de formas (20000, 3), por lo que cualquier vectorización adicional en realidad no aporta mucho más beneficio. De hecho, puede empeorar el rendimiento, como se indicó anteriormente, tal vez debido a grandes arreglos temporales.

+0

Creo que la sugerencia de Justin al perfil fue probablemente acertada, pero muchas gracias por eso ... aunque no estoy seguro de que la use, creo que tratar de entender ese ejemplo es probablemente una muy buena forma de aprender. :) – Statto

2

Si codifica profile, verá que el 99% del tiempo de ejecución está en calculate_dipole por lo que reducir el tiempo para este bucle realmente no dará una reducción notable en el tiempo de ejecución. Aún necesita enfocarse en calculate_dipole si quiere hacer esto más rápido. Probé mi código de Cython para calculate_dipole en esto y obtuve una reducción de alrededor de un factor de 2 en el tiempo total. También podría haber otras formas de mejorar el código de Cython.