2012-09-11 22 views
9

Necesito encadenar dos trabajos de MapReduce. Usé JobControl para configurar job2 como dependiente de job1. ¡Funciona, se crean archivos de salida! ¡Pero no se detiene! En la cáscara permanece en este estado:(Hadoop) MapReduce - Trabajos en cadena - JobControl no se detiene

12/09/11 19:06:24 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 
12/09/11 19:06:25 INFO input.FileInputFormat: Total input paths to process : 1 
12/09/11 19:06:25 INFO util.NativeCodeLoader: Loaded the native-hadoop library 
12/09/11 19:06:25 WARN snappy.LoadSnappy: Snappy native library not loaded 
12/09/11 19:07:00 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 
12/09/11 19:07:00 INFO input.FileInputFormat: Total input paths to process : 1 

¿Cómo puedo detenerlo? Este es mi principal.

public static void main(String[] args) throws Exception { 
    Configuration conf = new Configuration(); 
    Configuration conf2 = new Configuration(); 

    Job job1 = new Job(conf, "canzoni"); 
    job1.setJarByClass(CanzoniOrdinate.class); 
    job1.setMapperClass(CanzoniMapper.class); 
    job1.setReducerClass(CanzoniReducer.class); 
    job1.setOutputKeyClass(Text.class); 
    job1.setOutputValueClass(IntWritable.class); 

    ControlledJob cJob1 = new ControlledJob(conf); 
    cJob1.setJob(job1); 
    FileInputFormat.addInputPath(job1, new Path(args[0])); 
    FileOutputFormat.setOutputPath(job1, new Path("/user/hduser/tmp")); 


    Job job2 = new Job(conf2, "songsort"); 
    job2.setJarByClass(CanzoniOrdinate.class); 
    job2.setMapperClass(CanzoniSorterMapper.class); 
    job2.setSortComparatorClass(ReverseOrder.class); 
    job2.setInputFormatClass(KeyValueTextInputFormat.class); 
    job2.setReducerClass(CanzoniSorterReducer.class); 
    job2.setMapOutputKeyClass(IntWritable.class); 
    job2.setMapOutputValueClass(Text.class); 
    job2.setOutputKeyClass(Text.class); 
    job2.setOutputValueClass(IntWritable.class); 

    ControlledJob cJob2 = new ControlledJob(conf2); 
    cJob2.setJob(job2); 
    FileInputFormat.addInputPath(job2, new Path("/user/hduser/tmp/part*")); 
    FileOutputFormat.setOutputPath(job2, new Path(args[1])); 

    JobControl jobctrl = new JobControl("jobctrl"); 
    jobctrl.addJob(cJob1); 
    jobctrl.addJob(cJob2); 
    cJob2.addDependingJob(cJob1); 
    jobctrl.run(); 


    //////////////// 
    // NEW CODE /// 
    ////////////// 


    // delete jobctrl.run(); 
    Thread t = new Thread(jobctrl); 
    t.start(); 
    String oldStatusJ1 = null; 
    String oldStatusJ2 = null; 
    while (!jobctrl.allFinished()) { 
     String status =cJob1.toString(); 
     String status2 =cJob2.toString(); 
     if (!status.equals(oldStatusJ1)) { 
     System.out.println(status); 
     oldStatusJ1 = status; 
     } 
     if (!status2.equals(oldStatusJ2)) { 
     System.out.println(status2); 
     oldStatusJ2 = status2; 
     }  
    } 
    System.exit(0); 

}}

+1

Lo resuelto utilizando un hilo para empezar JobControl. Comprobé que los trabajos se hayan realizado utilizando un ciclo while: while (! Jobctrl.allFinished()) y System.exit() fuera del ciclo. Ahora me gustaría que los trabajos devuelvan mensajes de información, todo lo que obtuve es saber qué trabajo está en ejecución, con ControlledJob.toString(). No sé cómo obtener mensajes de información como: número de tareas del asignador, número de tareas de reducción, registros de entrada o salida, etc. ¿alguna idea para obtener estos mensajes? –

+0

¿Es suficiente "job.getCounters(). ToString()"? – zsxwing

+0

¿Es esto un error en la clase JobControl? – Rags

Respuesta

5

esencialmente hice lo Pietro aludido anteriormente.

public class JobRunner implements Runnable { 
    private JobControl control; 

    public JobRunner(JobControl _control) { 
    this.control = _control; 
    } 

    public void run() { 
    this.control.run(); 
    } 
} 

y en mi map/reduce la clase que tengo:

public void handleRun(JobControl control) throws InterruptedException { 
    JobRunner runner = new JobRunner(control); 
    Thread t = new Thread(runner); 
    t.start(); 

    while (!control.allFinished()) { 
     System.out.println("Still running..."); 
     Thread.sleep(5000); 
    } 
} 

en la que acabo de pasar el objeto JobControl.

+2

+1 para proporcionar un ejemplo de trabajo – beterthanlife

3

El propio objeto JobControl es ejecutable, por lo que sólo se puede utilizar de esta manera:

new Thread(myJobControlInstance).start() 
0

Sólo un pellizco para el fragmento de código sinemetu1 lo había compartido ..

Usted puede dejar la llamada a la JobRunner como JobControl por sí mismo implementa Ejecutable

 Thread thread = new Thread(jobControl); 
     thread.start(); 

     while (!jobControl.allFinished()) { 
      System.out.println("Still running..."); 
      Thread.sleep(5000); 
     } 

también me encontré con este enlace donde el usuario confirma que JobControl sólo se puede ejecutar con el nuevo hilo. https://www.mail-archive.com/[email protected]/msg00556.html

0

probar esto:

Thread jcThread = new Thread(jobControl); 
    jcThread.start(); 
    System.out.println("循环判断jobControl运行状态 >>>>>>>>>>>>>>>>"); 
    while (true) { 
     if (jobControl.allFinished()) { 
     System.out.println("====>> jobControl.allFinished=" + jobControl.getSuccessfulJobList()); 
     jobControl.stop(); 
     // 如果不加 break 或者 return,程序会一直循环 
     break; 
    } 

    if (jobControl.getFailedJobList().size() > 0) { 
     succ = 0; 
     System.out.println("====>> jobControl.getFailedJobList=" + jobControl.getFailedJobList()); 
     jobControl.stop(); 

     // 如果不加 break 或者 return,程序会一直循环 
     break; 
    } 
} 
Cuestiones relacionadas