2012-06-08 13 views
6

Todo el mundo sabe que el cerdo han apoyado DBStorage, pero sólo se admiten resultados de la carga de cerdo a mysql como laUna manera de leer datos de la tabla de MySQL a Pig

STORE data INTO DBStorage('com.mysql.jdbc.Driver', 'dbc:mysql://host/db', 'INSERT ...'); 

pero por favor, muéstrame el camino para leer tabla de MySQL como que

data = LOAD 'my_table' AS DBStorage('com.mysql.jdbc.Driver', 'dbc:mysql://host/db', 'SELECT * FROM my_table'); 

Aquí está mi código

public class DBLoader extends LoadFunc { 
    private final Log log = LogFactory.getLog(getClass()); 
    private ArrayList mProtoTuple = null; 
    private Connection con; 
    private String jdbcURL; 
    private String user; 
    private String pass; 
    private int batchSize; 
    private int count = 0; 
    private String query; 
    ResultSet result; 
    protected TupleFactory mTupleFactory = TupleFactory.getInstance(); 

    public DBLoader() { 
    } 

    public DBLoader(String driver, String jdbcURL, String user, String pass, 
      String query) { 

     try { 
      Class.forName(driver); 
     } catch (ClassNotFoundException e) { 
      log.error("can't load DB driver:" + driver, e); 
      throw new RuntimeException("Can't load DB Driver", e); 
     } 
     this.jdbcURL = jdbcURL; 
     this.user = user; 
     this.pass = pass; 
     this.query = query; 

    } 

    @Override 
    public InputFormat getInputFormat() throws IOException { 
     // TODO Auto-generated method stub 
     return new TextInputFormat(); 
    } 

    @Override 
    public Tuple getNext() throws IOException { 
     // TODO Auto-generated method stub 
     boolean next = false; 

     try { 
      next = result.next(); 
     } catch (SQLException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

     if (!next) 
      return null; 
     int numColumns = 0; 
     // Get result set meta data 
     ResultSetMetaData rsmd; 
     try { 
      rsmd = result.getMetaData(); 
      numColumns = rsmd.getColumnCount(); 
     } catch (SQLException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 

     for (int i = 0; i < numColumns; i++) { 

      try { 
       Object field = result.getObject(i); 

       switch (DataType.findType(field)) { 
       case DataType.NULL: 

        mProtoTuple.add(null); 

        break; 

       case DataType.BOOLEAN: 
        mProtoTuple.add((Boolean) field); 

        break; 

       case DataType.INTEGER: 
        mProtoTuple.add((Integer) field); 

        break; 

       case DataType.LONG: 
        mProtoTuple.add((Long) field); 

        break; 

       case DataType.FLOAT: 
        mProtoTuple.add((Float) field); 

        break; 

       case DataType.DOUBLE: 
        mProtoTuple.add((Double) field); 

        break; 

       case DataType.BYTEARRAY: 
        byte[] b = ((DataByteArray) field).get(); 
        mProtoTuple.add(b); 

        break; 
       case DataType.CHARARRAY: 
        mProtoTuple.add((String) field); 

        break; 
       case DataType.BYTE: 
        mProtoTuple.add((Byte) field); 

        break; 

       case DataType.MAP: 
       case DataType.TUPLE: 
       case DataType.BAG: 
        throw new RuntimeException("Cannot store a non-flat tuple " 
          + "using DbStorage"); 

       default: 
        throw new RuntimeException("Unknown datatype " 
          + DataType.findType(field)); 

       } 

      } catch (Exception ee) { 
       throw new RuntimeException(ee); 
      } 
     } 

     Tuple t = mTupleFactory.newTuple(mProtoTuple); 
     mProtoTuple.clear(); 
     return t; 

    } 

    @Override 
    public void prepareToRead(RecordReader arg0, PigSplit arg1) 
      throws IOException { 

     con = null; 
     if (query == null) { 
      throw new IOException("SQL Insert command not specified"); 
     } 
     try { 
      if (user == null || pass == null) { 
       con = DriverManager.getConnection(jdbcURL); 
      } else { 
       con = DriverManager.getConnection(jdbcURL, user, pass); 
      } 
      con.setAutoCommit(false); 
      result = con.createStatement().executeQuery(query); 
     } catch (SQLException e) { 
      log.error("Unable to connect to JDBC @" + jdbcURL); 
      throw new IOException("JDBC Error", e); 
     } 
     count = 0; 
    } 

    @Override 
    public void setLocation(String location, Job job) throws IOException { 
     // TODO Auto-generated method stub 

     //TextInputFormat.setInputPaths(job, location); 

    } 

    class MyDBInputFormat extends InputFormat<NullWritable, NullWritable>{ 

     @Override 
     public RecordReader<NullWritable, NullWritable> createRecordReader(
       InputSplit arg0, TaskAttemptContext arg1) throws IOException, 
       InterruptedException { 
      // TODO Auto-generated method stub 
      return null; 
     } 

     @Override 
     public List<InputSplit> getSplits(JobContext arg0) throws IOException, 
       InterruptedException { 
      // TODO Auto-generated method stub 
      return null; 
     } 

    } 

} 

Intento muchas veces escribir UDF pero no éxito .....

Respuesta

2

Como usted dice, DBStorage solo admite guardar resultados en una base de datos.

Para cargar datos de MySQL, puede buscar en un proyecto llamado sqoop (que copia datos de una base de datos a HDFS), o puede realizar un volcado de MySQL y luego copiar el archivo en HDFS. Ambas formas requieren alguna interacción y no se pueden usar directamente desde dentro de Pig.

Una tercera opción sería estudiar la escritura de un Pig LoadFunc (dices que has intentado escribir una UDF). No debería ser demasiado difícil, necesitará pasar las mismas opciones que DBStorage (controlador, credenciales de conexión y una consulta SQL para ejecutar), y probablemente también pueda usar alguna inspección de metadatos del conjunto de resultados para generar automáticamente un esquema.

+0

hola, gracias. Pero como mencioné antes, solo quiero cargar directamente usado desde dentro de Pig. Publico mi código extiendo de LoadFunc como dijiste. Cuando ejecuto mi código desde grande, siempre arroja una excepción. – phuongdo

+0

A continuación, agregue la excepción que está viendo –

+0

@phuongdo ¿Tiene éxito al escribir Pig LoadFunc para cargar datos de mysql? – Shri

Cuestiones relacionadas