2011-08-25 16 views
27

Estoy intentando escribir un script corto para agrupar mis datos a través de clojure (llamando a clases de Mahout). Tengo mis datos de entrada en este formato (que es el resultado de un php script)Agrupación (fkmeans) con Mahout usando Clojure

format: (tag) (image) (frequency) 
tag_sit image_a 0 
tag_sit image_b 1 
tag_lorem image_a 1 
tag_lorem image_b 0 
tag_dolor image_a 0 
tag_dolor image_b 1 
tag_ipsum image_a 1 
tag_ipsum image_b 1 
tag_amit image_a 1 
tag_amit image_b 0 
... (more) 

Entonces ellos escriben en un SequenceFile uso de este script (clojure)

#!./bin/clj 
(ns sensei.sequence.core) 

(require 'clojure.string) 
(require 'clojure.java.io) 

(import org.apache.hadoop.conf.Configuration) 
(import org.apache.hadoop.fs.FileSystem) 
(import org.apache.hadoop.fs.Path) 
(import org.apache.hadoop.io.SequenceFile) 
(import org.apache.hadoop.io.Text) 

(import org.apache.mahout.math.VectorWritable) 
(import org.apache.mahout.math.SequentialAccessSparseVector) 

(with-open [reader (clojure.java.io/reader *in*)] 
    (let [hadoop_configuration ((fn [] 
           (let [conf (new Configuration)] 
            (. conf set "fs.default.name" "hdfs://localhost:9000/") 
            conf))) 
     hadoop_fs (FileSystem/get hadoop_configuration)] 
    (reduce 
     (fn [writer [index value]] 
     (. writer append index value) 
     writer) 
     (SequenceFile/createWriter 
     hadoop_fs 
     hadoop_configuration 
     (new Path "test/sensei") 
     Text 
     VectorWritable) 
     (map 
     (fn [[tag row_vector]] 
      (let [input_index (new Text tag) 
       input_vector (new VectorWritable)] 
      (. input_vector set row_vector) 
      [input_index input_vector])) 
     (map 
      (fn [[tag photo_list]] 
      (let [photo_map (apply hash-map photo_list) 
        input_vector (new SequentialAccessSparseVector (count (vals photo_map)))] 
       (loop [frequency_list (vals photo_map)] 
       (if (zero? (count frequency_list)) 
        [tag input_vector] 
        (when-not (zero? (count frequency_list)) 
        (. input_vector set 
         (mod (count frequency_list) (count (vals photo_map))) 
         (Integer/parseInt (first frequency_list))) 
        (recur (rest frequency_list))))))) 
      (reduce 
      (fn [result next_line] 
       (let [[tag photo frequency] (clojure.string/split next_line #" ")] 
       (update-in result [tag] 
        #(if (nil? %) 
        [photo frequency] 
        (conj % photo frequency))))) 
      {} 
      (line-seq reader))))))) 

Básicamente se convierte la entrada en archivo de secuencia, en este formato

clave (texto): $ tag_uri valor (VectorWritable): un vector (cardinalidad = número de documentos) con un índice numérico y la frecuencia respectiva <0:1 1:0 2:0 3:1 4:0 ...>

Luego procedo a hacer la masa real con este script (refiriéndose a este blog post)

#!./bin/clj 

(ns sensei.clustering.fkmeans) 

(import org.apache.hadoop.conf.Configuration) 
(import org.apache.hadoop.fs.Path) 

(import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver) 
(import org.apache.mahout.common.distance.EuclideanDistanceMeasure) 
(import org.apache.mahout.clustering.kmeans.RandomSeedGenerator) 

(let [hadoop_configuration ((fn [] 
           (let [conf (new Configuration)] 
            (. conf set "fs.default.name" "hdfs://127.0.0.1:9000/") 
            conf))) 
     input_path (new Path "test/sensei") 
     output_path (new Path "test/clusters") 
     clusters_in_path (new Path "test/clusters/cluster-0")] 
    (FuzzyKMeansDriver/run 
    hadoop_configuration 
    input_path 
    (RandomSeedGenerator/buildRandom 
     hadoop_configuration 
     input_path 
     clusters_in_path 
     (int 2) 
     (new EuclideanDistanceMeasure)) 
    output_path 
    (new EuclideanDistanceMeasure) 
    (double 0.5) 
    (int 10) 
    (float 5.0) 
    true 
    false 
    (double 0.0) 
    false)) '' runSequential 

Sin embargo estoy recibiendo una salida como ésta

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". 
SLF4J: Defaulting to no-operation (NOP) logger implementation 
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 
11/08/25 15:20:16 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
11/08/25 15:20:16 INFO compress.CodecPool: Got brand-new compressor 
11/08/25 15:20:16 INFO compress.CodecPool: Got brand-new decompressor 
11/08/25 15:20:17 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 
11/08/25 15:20:17 INFO input.FileInputFormat: Total input paths to process : 1 
11/08/25 15:20:17 INFO mapred.JobClient: Running job: job_local_0001 
11/08/25 15:20:17 INFO mapred.MapTask: io.sort.mb = 100 
11/08/25 15:20:17 INFO mapred.MapTask: data buffer = 79691776/99614720 
11/08/25 15:20:17 INFO mapred.MapTask: record buffer = 262144/327680 
11/08/25 15:20:17 WARN mapred.LocalJobRunner: job_local_0001 
java.lang.IllegalStateException: No clusters found. Check your -c path. 
     at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansMapper.setup(FuzzyKMeansMapper.java:62) 
     at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:142) 
     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) 
     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369) 
     at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:210) 
11/08/25 15:20:18 INFO mapred.JobClient: map 0% reduce 0% 
11/08/25 15:20:18 INFO mapred.JobClient: Job complete: job_local_0001 
11/08/25 15:20:18 INFO mapred.JobClient: Counters: 0 
Exception in thread "main" java.lang.RuntimeException: java.lang.InterruptedException: Fuzzy K-Means Iteration failed processing test/clusters/cluster-0/part-randomSeed 
     at clojure.lang.Util.runtimeException(Util.java:153) 
     at clojure.lang.Compiler.eval(Compiler.java:6417) 
     at clojure.lang.Compiler.load(Compiler.java:6843) 
     at clojure.lang.Compiler.loadFile(Compiler.java:6804) 
     at clojure.main$load_script.invoke(main.clj:282) 
     at clojure.main$script_opt.invoke(main.clj:342) 
     at clojure.main$main.doInvoke(main.clj:426) 
     at clojure.lang.RestFn.invoke(RestFn.java:436) 
     at clojure.lang.Var.invoke(Var.java:409) 
     at clojure.lang.AFn.applyToHelper(AFn.java:167) 
     at clojure.lang.Var.applyTo(Var.java:518) 
     at clojure.main.main(main.java:37) 
Caused by: java.lang.InterruptedException: Fuzzy K-Means Iteration failed processing test/clusters/cluster-0/part-randomSeed 
     at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.runIteration(FuzzyKMeansDriver.java:252) 
     at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClustersMR(FuzzyKMeansDriver.java:421) 
     at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClusters(FuzzyKMeansDriver.java:345) 
     at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.run(FuzzyKMeansDriver.java:295) 
     at sensei.clustering.fkmeans$eval17.invoke(fkmeans.clj:35) 
     at clojure.lang.Compiler.eval(Compiler.java:6406) 
     ... 10 more 

Cuando runSequential se establece en true

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". 
SLF4J: Defaulting to no-operation (NOP) logger implementation 
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. 
11/09/07 14:32:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
11/09/07 14:32:32 INFO compress.CodecPool: Got brand-new compressor 
11/09/07 14:32:32 INFO compress.CodecPool: Got brand-new decompressor 
Exception in thread "main" java.lang.IllegalStateException: Clusters is empty! 
     at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClustersSeq(FuzzyKMeansDriver.java:361) 
     at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.buildClusters(FuzzyKMeansDriver.java:343) 
     at org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver.run(FuzzyKMeansDriver.java:295) 
     at sensei.clustering.fkmeans$eval17.invoke(fkmeans.clj:35) 
     at clojure.lang.Compiler.eval(Compiler.java:6465) 
     at clojure.lang.Compiler.load(Compiler.java:6902) 
     at clojure.lang.Compiler.loadFile(Compiler.java:6863) 
     at clojure.main$load_script.invoke(main.clj:282) 
     at clojure.main$script_opt.invoke(main.clj:342) 
     at clojure.main$main.doInvoke(main.clj:426) 
     at clojure.lang.RestFn.invoke(RestFn.java:436) 
     at clojure.lang.Var.invoke(Var.java:409) 
     at clojure.lang.AFn.applyToHelper(AFn.java:167) 
     at clojure.lang.Var.applyTo(Var.java:518) 
     at clojure.main.main(main.java:37) 

También reescribí el script fkmeans en este formulario

#!./bin/clj 

(ns sensei.clustering.fkmeans) 

(import org.apache.hadoop.conf.Configuration) 
(import org.apache.hadoop.fs.Path) 

(import org.apache.mahout.clustering.fuzzykmeans.FuzzyKMeansDriver) 
(import org.apache.mahout.common.distance.EuclideanDistanceMeasure) 
(import org.apache.mahout.clustering.kmeans.RandomSeedGenerator) 

(let [hadoop_configuration ((fn [] 
           (let [conf (new Configuration)] 
            (. conf set "fs.default.name" "hdfs://localhost:9000/") 
            conf))) 
     driver (new FuzzyKMeansDriver)] 
    (. driver setConf hadoop_configuration) 
    (. driver 
    run 
    (into-array String ["--input" "test/sensei" 
         "--output" "test/clusters" 
         "--clusters" "test/clusters/clusters-0" 
         "--clustering" 
         "--overwrite" 
         "--emitMostLikely" "false" 
         "--numClusters" "3" 
         "--maxIter" "10" 
         "--m" "5"]))) 

pero todavía está consiguiendo mismo error que la primera versión inicial:/

herramienta de línea de comandos se ejecuta bien

$ bin/mahout fkmeans --input test/sensei --output test/clusters --clusters test/clusters/clusters-0 --clustering --overwrite --emitMostLikely false --numClusters 10 --maxIter 10 --m 5 

Sin embargo, no devolvería los puntos cuando intento clusterdumper a pesar de que - -la opción de agrupamiento existe en el comando anterior y --pointsDir se define aquí

$ ./bin/mahout clusterdump --seqFileDir test/clusters/clusters-1 --pointsDir test/clusters/clusteredPoints --output sensei.txt 

Ma Hout versión utilizada: 0,6-instantánea, clojure 1.3.0 a la instantánea

Por favor, hágamelo saber si me olvido de cualquier cosa

+1

¿Puedes verificar, realmente se generaron los grupos iniciales? –

+1

¿Cuál es la actualización de esto? – AlphaMale

+1

Sí, ¿alguna vez se resolvió? Si es así podrías publicar una respuesta. –

Respuesta

2

Mi conjetura es que la aplicación de Mahout Fuzzy-c-means necesita grupos iniciales a comenzar con, que tal vez no suministró?

También suena un poco como si estuviera ejecutando un solo nodo? Tenga en cuenta que para los sistemas de un solo nodo, debe evitar toda la sobrecarga de Mahout/Hadoop y simplemente usar un algoritmo de agrupación regular. Hadoop/Mahout tiene un costo considerable que solo da sus frutos cuando ya no se pueden procesar los datos en un solo sistema. No es "mapa reducido" a menos que lo haga en un número grande de sistemas.