2012-07-30 27 views
87

A menudo necesito aplicar una función a los grupos de un gran DataFrame (de tipos de datos mixtos) y me gustaría aprovechar los múltiples núcleos.Aplicación eficiente de una función a un panda DataFrame agrupado en paralelo

Puedo crear un iterador a partir de los grupos y usar el módulo de multiprocesamiento, pero no es eficiente porque todos los grupos y los resultados de la función deben ser escaneados para mensajes entre procesos.

¿Hay alguna manera de evitar el decapado o incluso evitar la copia del DataFrame por completo? Parece que las funciones de memoria compartida de los módulos de multiprocesamiento están limitadas a las matrices numpy. ¿Hay más opciones?

+0

Por lo que yo sé, no hay manera de compartir objetos arbitrarios. Me pregunto si el decapado lleva mucho más tiempo que la ganancia a través del multiprocesamiento. Tal vez deberías buscar la posibilidad de crear paquetes de trabajo más grandes para cada proceso para reducir el tiempo de decapado relativo. Otra posibilidad sería usar multiprocesamiento cuando crea los grupos. –

+3

Hago algo así pero usando UWSGI, Flask y preforking: cargo el marco de datos de pandas en un proceso, lo doblo x veces (convirtiéndolo en un objeto de memoria compartido) y luego llamo a esos procesos desde otro proceso de python donde concaturo los resultados. atm. Utilizo JSON como un proceso de comunicación, pero esto está por venir (aunque aún es muy experimental): http://pandas.pydata.org/pandas-docs/dev/io.html#msgpack-experimental – Carst

+0

Por cierto, ¿lo hizo? ¿Alguna vez has mirado HDF5 con fragmentación? (HDF5 no se guarda para la escritura simultánea, pero también puede guardar en archivos separados y, al final, concatenar cosas) – Carst

Respuesta

12

De los comentarios anteriores, parece que esto está planeado para pandas en algún momento (también hay un interesante rosetta project que acabo de notar).

Sin embargo, hasta que cada funcionalidad paralelo se incorpora en pandas, me di cuenta de que es muy fácil de escribir eficiente & sin memoria de copia de aumentos paralelos a pandas directamente utilizando cython + OpenMP y C++.

Aquí está un ejemplo corto de escribir un GroupBy de suma paralelo, cuyo uso es algo como esto:

import pandas as pd 
import para_group_demo 

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)}) 
print para_group_demo.sum(df.a, df.b) 

y la salida es:

 sum 
key  
0  6 
1  11 
2  4 

Nota Sin duda, este la funcionalidad del ejemplo simple eventualmente formará parte de pandas. Algunas cosas, sin embargo, serán más naturales para paralelizar en C++ durante algún tiempo, y es importante tener en cuenta lo fácil que es combinar esto en pandas.


Para hacer esto, escribí una simple extensión de archivo de fuente única cuyo código sigue.

Se inicia con algunas importaciones y definiciones de tipo

from libc.stdint cimport int64_t, uint64_t 
from libcpp.vector cimport vector 
from libcpp.unordered_map cimport unordered_map 

cimport cython 
from cython.operator cimport dereference as deref, preincrement as inc 
from cython.parallel import prange 

import pandas as pd 

ctypedef unordered_map[int64_t, uint64_t] counts_t 
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t 
ctypedef vector[counts_t] counts_vec_t 

El tipo C++ unordered_map es para sumar por un solo hilo, y la vector es para sumar por todos los hilos.

Ahora a la función sum.Comienza con typed memory views para un acceso rápido:

def sum(crit, vals): 
    cdef int64_t[:] crit_view = crit.values 
    cdef int64_t[:] vals_view = vals.values 

La función continúa dividiendo el semi-igualmente a las roscas (aquí Hardcoded a 4), y teniendo cada suma hilo las entradas en su gama:

cdef uint64_t num_threads = 4 
    cdef uint64_t l = len(crit) 
    cdef uint64_t s = l/num_threads + 1 
    cdef uint64_t i, j, e 
    cdef counts_vec_t counts 
    counts = counts_vec_t(num_threads) 
    counts.resize(num_threads) 
    with cython.boundscheck(False): 
     for i in prange(num_threads, nogil=True): 
      j = i * s 
      e = j + s 
      if e > l: 
       e = l 
      while j < e: 
       counts[i][crit_view[j]] += vals_view[j] 
       inc(j) 

Cuando los hilos han completado, la función combina todos los resultados (de las diferentes rangos) en un solo unordered_map:

cdef counts_t total 
    cdef counts_it_t it, e_it 
    for i in range(num_threads): 
     it = counts[i].begin() 
     e_it = counts[i].end() 
     while it != e_it: 
      total[deref(it).first] += deref(it).second 
      inc(it)   

todos th izquierda de a es crear un DataFrame y devolver los resultados:

key, sum_ = [], [] 
    it = total.begin() 
    e_it = total.end() 
    while it != e_it: 
     key.append(deref(it).first) 
     sum_.append(deref(it).second) 
     inc(it) 

    df = pd.DataFrame({'key': key, 'sum': sum_}) 
    df.set_index('key', inplace=True) 
    return df 
Cuestiones relacionadas