2011-01-19 9 views
9

Actualmente estoy tratando de mejorar el rendimiento de un programa F # para que sea tan rápido como su equivalente en C#. El programa aplica una matriz de filtros a un búfer de píxeles. El acceso a la memoria siempre se hace usando punteros.Problema de rendimiento de manipulación de imágenes F #

Este es el código C# que se aplica a cada píxel de una imagen:

unsafe private static byte getPixelValue(byte* buffer, double* filter, int filterLength, double filterSum) 
{ 
    double sum = 0.0; 
    for (int i = 0; i < filterLength; ++i) 
    { 
     sum += (*buffer) * (*filter); 
     ++buffer; 
     ++filter; 
    } 

    sum = sum/filterSum; 

    if (sum > 255) return 255; 
    if (sum < 0) return 0; 
    return (byte) sum; 
} 

El # código F se parece a esto y toma tres veces más largo que el programa de C#:

let getPixelValue (buffer:nativeptr<byte>) (filterData:nativeptr<float>) filterLength filterSum : byte = 

    let rec accumulatePixel (acc:float) (buffer:nativeptr<byte>) (filter:nativeptr<float>) i = 
     if i > 0 then 
      let newAcc = acc + (float (NativePtr.read buffer) * (NativePtr.read filter)) 
      accumulatePixel newAcc (NativePtr.add buffer 1) (NativePtr.add filter 1) (i-1) 
     else 
      acc 

    let acc = (accumulatePixel 0.0 buffer filterData filterLength)/filterSum     

    match acc with 
     | _ when acc > 255.0 -> 255uy 
     | _ when acc < 0.0 -> 0uy 
     | _ -> byte acc 

El uso de variables mutables y un bucle for en F # da como resultado la misma velocidad que el uso de la recursión. Todos los proyectos están configurados para ejecutarse en modo de lanzamiento con la optimización de código activada.

¿Cómo se podría mejorar el rendimiento de la versión F #?

EDIT:

El cuello de botella parece estar en (NativePtr.get buffer offset). Si reemplazo este código con un valor fijo y también reemplazo el código correspondiente en la versión C# con un valor fijo, obtengo la misma velocidad para ambos programas. De hecho, en C# la velocidad no cambia en absoluto, pero en F # hace una gran diferencia.

¿Se puede cambiar este comportamiento o se arraiga profundamente en la arquitectura de F #?

EDIT 2:

I rediseñado el código de nuevo utilizar para de bucles. La velocidad de ejecución sigue siendo el mismo:

let mutable acc <- 0.0 
let mutable f <- filterData 
let mutable b <- tBuffer 
for i in 1 .. filter.FilterLength do 
    acc <- acc + (float (NativePtr.read b)) * (NativePtr.read f) 
    f <- NativePtr.add f 1 
    b <- NativePtr.add b 1 

Si se compara el código IL de una versión que utiliza (NativePtr.read b) y otra versión que es la misma excepto que utiliza un valor fijo 111uy en lugar de leerlo desde el puntero, Sólo las siguientes líneas en el cambio de código IL:

111uy tiene IL-Code ldc.i4.s 0x6f (0,3 segundos)

(NativePtr.read b) tiene líneas de IL-código ldloc.s b y ldobj uint8 (1,4 segundos)

Para comparar: C# hace el filtrado en 0.4 segundos.

El hecho de que leer el filtro no afecte el rendimiento mientras se lee desde el búfer de imagen es de alguna manera confuso. Antes de filtrar una línea de la imagen, copio la línea en un buffer que tiene la longitud de una línea. Es por eso que las operaciones de lectura no se extienden por toda la imagen, sino que se encuentran dentro de este búfer, que tiene un tamaño de aproximadamente 800 bytes.

+0

Basado en su comentario más reciente, me pregunto si el hecho de que el compilador de C# utiliza 'ldind.u1' en lugar de 'ldobj uint8' (que es el IL semánticamente equivalente que usa F #) hace la diferencia. Podría intentar ejecutar ildasm en el ejecutable F #, reemplazando 'ldobj uint8' con' ldind.u1', y ejecutando ilasm en él para ver si el rendimiento es diferente. – kvb

+0

Lo reemplacé pero no hubo diferencia. Gracias de cualquier manera. –

Respuesta

12

Si nos fijamos en el código IL real del bucle interior que atraviesa ambas memorias intermedias en paralelo generados por compilador de C# (parte pertinente):

L_0017: ldarg.0 
L_0018: ldc.i4.1 
L_0019: conv.i 
L_001a: add 
L_001b: starg.s buffer 
L_001d: ldarg.1 
L_001e: ldc.i4.8 
L_001f: conv.i 
L_0020: add 

y F # compilador:

L_0017: ldc.i4.1 
L_0018: conv.i 
L_0019: sizeof uint8 
L_001f: mul 
L_0020: add 
L_0021: ldarg.2 
L_0022: ldc.i4.1 
L_0023: conv.i 
L_0024: sizeof float64 
L_002a: mul 
L_002b: add 

notaremos que mientras que el código C# usa solo el operador add mientras que F # necesita ambos mul y add. Pero obviamente en cada paso solo necesitamos incrementar los punteros (por los valores 'sizeof byte' y 'sizeof float' respectivamente), no para calcular la dirección (addrBase + (sizeof byte)) F # mul es innecesario (siempre se multiplica por 1).

La causa de ello es que C# define ++ operador para los punteros, mientras que F # proporciona sólo add : nativeptr<'T> -> int -> nativeptr<'T> operador:

[<NoDynamicInvocation>] 
let inline add (x : nativeptr<'a>) (n:int) : nativeptr<'a> = to_nativeint x + nativeint n * (# "sizeof !0" type('a) : nativeint #) |> of_nativeint 

así que no es "profundamente arraigada" en Fa #, es sólo que module NativePtr carece inc y dec funciones.

Por cierto, sospecho que la muestra anterior podría escribirse de una manera más concisa si los argumentos se pasaron como matrices en lugar de punteros sin formato.

ACTUALIZACIÓN:

Lo mismo sucede con el siguiente código tiene la velocidad de sólo el 1% hasta (que parece generar muy similar a C# IL):

let getPixelValue (buffer:nativeptr<byte>) (filterData:nativeptr<float>) filterLength filterSum : byte = 

    let rec accumulatePixel (acc:float) (buffer:nativeptr<byte>) (filter:nativeptr<float>) i = 
     if i > 0 then 
      let newAcc = acc + (float (NativePtr.read buffer) * (NativePtr.read filter)) 
      accumulatePixel newAcc (NativePtr.ofNativeInt <| (NativePtr.toNativeInt buffer) + (nativeint 1)) (NativePtr.ofNativeInt <| (NativePtr.toNativeInt filter) + (nativeint 8)) (i-1) 
     else 
      acc 

    let acc = (accumulatePixel 0.0 buffer filterData filterLength)/filterSum     

    match acc with 
     | _ when acc > 255.0 -> 255uy 
     | _ when acc < 0.0 -> 0uy 
     | _ -> byte acc 

Otro pensamiento: también podría depender de el número de llamadas a getPixelValue que realiza su prueba (F # divide esta función en dos métodos, mientras que C# lo hace en uno).

¿Es posible que publique su código de prueba aquí?

En cuanto a la matriz - Me gustaría que el código sea al menos más conciso (y no unsafe).

ACTUALIZACIÓN # 2:

Parece que el cuello de botella real aquí es byte->float conversión.

C#:

L_0003: ldarg.1 
L_0004: ldind.u1 
L_0005: conv.r8 

F #:

L_000c: ldarg.1 
L_000d: ldobj uint8 
L_0012: conv.r.un 
L_0013: conv.r8 

Por alguna razón F # utiliza la siguiente ruta: byte->float32->float64 mientras que C# sólo hace byte->float64. No estoy seguro de por qué es eso, pero con el siguiente truco mi versión de F # se ejecuta con la misma velocidad que C# en muestra de prueba de gradbot (BTW, gracias gradbot para la prueba!):

let inline preadConvert (p : nativeptr<byte>) = (# "conv.r8" (# "ldobj !0" type (byte) p : byte #) : float #) 
let inline pinc (x : nativeptr<'a>) : nativeptr<'a> = NativePtr.toNativeInt x + (# "sizeof !0" type('a) : nativeint #) |> NativePtr.ofNativeInt 

let rec accumulatePixel_ed (acc, buffer, filter, i) = 
     if i > 0 then 
      accumulatePixel_ed 
       (acc + (preadConvert buffer) * (NativePtr.read filter), 
       (pinc buffer), 
       (pinc filter), 
       (i-1)) 
     else 
      acc 

Resultados:

adrian 6374985677.162810 1408.870900 ms 
    gradbot 6374985677.162810 1218.908200 ms 
     C# 6374985677.162810 227.832800 ms 
C# Offset 6374985677.162810 224.921000 ms 
    mutable 6374985677.162810 1254.337300 ms 
    ed'ka 6374985677.162810 227.543100 ms 

ÚLTIMA ACTUALIZACIÓN Resultó que podemos alcanzar la misma velocidad, incluso sin ningún tipo de hacks:

let rec accumulatePixel_ed_last (acc, buffer, filter, i) = 
     if i > 0 then 
      accumulatePixel_ed_last 
       (acc + (float << int16 <| NativePtr.read buffer) * (NativePtr.read filter), 
       (NativePtr.add buffer 1), 
       (NativePtr.add filter 1), 
       (i-1)) 
     else 
      acc 

Todo lo que tenemos que hacer es convertir byte en, digamos int16 y luego en float. De esta manera, se evitarán las instrucciones "costosas" conv.r.un.

PS código de conversión pertinente de "Prim-types.fs":

let inline float (x: ^a) = 
    (^a : (static member ToDouble : ^a -> float) (x)) 
    when ^a : float  = (# "" x : float #) 
    when ^a : float32 = (# "conv.r8" x : float #) 
    // [skipped] 
    when ^a : int16  = (# "conv.r8" x : float #) 
    // [skipped] 
    when ^a : byte  = (# "conv.r.un conv.r8" x : float #) 
    when ^a : decimal = (System.Convert.ToDouble((# "" x : decimal #))) 
+0

¡Gracias por esta interesante idea! Creé mi función de adición personalizada para punteros que no usa la multiplicación. La aceleración es solo alrededor del 1% del tiempo total de ejecución. ¿Crees que la versión de matriz todavía tendría un alto rendimiento? –

+4

¡Buen trabajo de detective! – kvb

+0

¡Guau! Estoy realmente impresionado :-) –

1

¿Cómo se puede comparar esto? Tiene menos llamadas al NativePtr.

let getPixelValue (buffer:nativeptr<byte>) (filterData:nativeptr<float>) filterLength filterSum : byte = 
    let accumulatePixel (acc:float) (buffer:nativeptr<byte>) (filter:nativeptr<float>) length = 
     let rec accumulate acc offset = 
      if offset < length then 
       let newAcc = acc + (float (NativePtr.get buffer offset) * (NativePtr.get filter offset)) 
       accumulate newAcc (offset + 1) 
      else 
       acc 
     accumulate acc 0 

    let acc = (accumulatePixel 0.0 buffer filterData filterLength)/filterSum     

    match acc with 
     | _ when acc > 255.0 -> 255uy 
     | _ when acc < 0.0 -> 0uy 
     | _ -> byte acc 

F código fuente de NativePtr.

[<NoDynamicInvocation>] 
[<CompiledName("AddPointerInlined")>] 
let inline add (x : nativeptr<'T>) (n:int) : nativeptr<'T> = toNativeInt x + nativeint n * (# "sizeof !0" type('T) : nativeint #) |> ofNativeInt 

[<NoDynamicInvocation>] 
[<CompiledName("GetPointerInlined")>] 
let inline get (p : nativeptr<'T>) n = (# "ldobj !0" type ('T) (add p n) : 'T #) 
+0

¡Gracias! Esta es una buena idea, pero la mejora en el rendimiento es marginal. –

+0

Creo que (NativePtr.get buffer offset) es tan rápido como (NativePtr.read (NativePtr.add buffer offset)), probablemente haga lo mismo detrás de las escenas ... –

+0

Tienes razón. Agregué el código fuente F # a mi respuesta. – gradbot

1

Mis resultados en una prueba más grande.

adrian 6374730426.098020 1561.102500 ms 
    gradbot 6374730426.098020 1842.768000 ms 
     C# 6374730426.098020 150.793500 ms 
C# Offset 6374730426.098020 150.318900 ms 
    mutable 6374730426.098020 1446.616700 ms 

F # código de prueba

open Microsoft.FSharp.NativeInterop 
open System.Runtime.InteropServices 
open System.Diagnostics 

open AccumulatePixel 
#nowarn "9" 

let test size fn = 
    let bufferByte = Marshal.AllocHGlobal(size * 4) 
    let bufferFloat = Marshal.AllocHGlobal(size * 8) 

    let bi = NativePtr.ofNativeInt bufferByte 
    let bf = NativePtr.ofNativeInt bufferFloat 

    let random = System.Random() 

    for i in 1 .. size do 
     NativePtr.set bi i (byte <| random.Next() % 256) 
     NativePtr.set bf i (random.NextDouble()) 

    let duration (f, name) = 
     let stopWatch = Stopwatch.StartNew() 
     let time = f(0.0, bi, bf, size) 
     stopWatch.Stop() 
     printfn "%10s %f %f ms" name time stopWatch.Elapsed.TotalMilliseconds 

    List.iter duration fn 

    Marshal.FreeHGlobal bufferFloat 
    Marshal.FreeHGlobal bufferByte 

let rec accumulatePixel_adrian (acc, buffer, filter, i) = 
    if i > 0 then 
     let newAcc = acc + (float (NativePtr.read buffer) * (NativePtr.read filter)) 
     accumulatePixel_adrian (newAcc, (NativePtr.add buffer 1), (NativePtr.add filter 1), (i - 1)) 
    else 
     acc 

let accumulatePixel_gradbot (acc, buffer, filter, length) = 
    let rec accumulate acc offset = 
     if offset < length then 
      let newAcc = acc + (float (NativePtr.get buffer offset) * (NativePtr.get filter offset)) 
      accumulate newAcc (offset + 1) 
     else 
      acc 
    accumulate acc 0 

let accumulatePixel_mutable (acc, buffer, filter, length) = 
    let mutable acc = 0.0 
    let mutable f = filter 
    let mutable b = buffer 
    for i in 1 .. length do 
     acc <- acc + (float (NativePtr.read b)) * (NativePtr.read f) 
     f <- NativePtr.add f 1 
     b <- NativePtr.add b 1 
    acc 

[ 
    accumulatePixel_adrian, "adrian"; 
    accumulatePixel_gradbot, "gradbot"; 
    AccumulatePixel.getPixelValue, "C#"; 
    AccumulatePixel.getPixelValueOffset, "C# Offset"; 
    accumulatePixel_mutable, "mutable"; 
] 
|> test 100000000 

System.Console.ReadLine() |> ignore 

C# código de prueba

namespace AccumulatePixel 
{ 
    public class AccumulatePixel 
    { 
     unsafe public static double getPixelValue(double sum, byte* buffer, double* filter, int filterLength) 
     { 
      for (int i = 0; i < filterLength; ++i) 
      { 
       sum += (*buffer) * (*filter); 
       ++buffer; 
       ++filter; 
      } 

      return sum; 
     } 

     unsafe public static double getPixelValueOffset(double sum, byte* buffer, double* filter, int filterLength) 
     { 
      for (int i = 0; i < filterLength; ++i) 
      { 
       sum += buffer[i] * filter[i]; 
      } 

      return sum; 
     } 
    } 
} 
Cuestiones relacionadas