Sqoop using the Java API to import into HBase tables

I tried to import data from a MySQL database into HBase using Sqoop, but I ran into an error along the way. Could you please help me with this? (I am using Sqoop 1.)

My code looks like this:

    import com.cloudera.sqoop.SqoopOptions;
    import com.cloudera.sqoop.SqoopOptions.IncrementalMode;
    import com.cloudera.sqoop.tool.ImportTool;
    import com.cloudera.sqoop.tool.SqoopTool;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.log4j.Logger;
    import org.apache.sqoop.Sqoop;

    public class SqoopScheduler {
        Logger log = Logger.getLogger(SqoopScheduler.class);
        private static Configuration configuration = null;
        private static SqoopOptions SqoopOptions = new SqoopOptions();
        private static final String driver = "com.mysql.jdbc.Driver";
        private static final String connectionString = "jdbc:mysql://jira.com:3306/jirarepository";
        private static final String username = "jiraadmin";
        private static final String password = "jiraadmin";
        private static final String splitBy = "issue_id";
        private static final int Counter = 21000;
        private static final String queryString = "select * from issues where issue_id < ";

        private static void setUp() {
            SqoopOptions.setJobName("HBase_SequentialImport");
            SqoopOptions.setMapreduceJobName("HBase_SequentialImport");
            SqoopOptions.setDriverClassName(driver);
            SqoopOptions.setConnectString(connectionString);
            SqoopOptions.setUsername(username);
            SqoopOptions.setPassword(password);
            SqoopOptions.setSplitByCol(splitBy);
            SqoopOptions.setSqlQuery(queryString + Counter + " and $CONDITIONS");
            SqoopOptions.setHBaseBulkLoadEnabled(true);
            SqoopOptions.setHBaseTable("jira_issues");
            SqoopOptions.setHBaseColFamily("issue_detail");
            SqoopOptions.setHBaseRowKeyColumn(splitBy);
        }

        private static int runIt() {
            int res;
            res = new ImportTool().run(SqoopOptions);
            if (res != 0) {
                throw new RuntimeException("Sqoop API Failed - return code : " + Integer.toString(res));
            }
            return res;
        }

        @SuppressWarnings("deprecation")
        public static void main(String[] args) throws Exception {
            setUp();
            int result = runIt();
            System.out.println(result);
        }
    }

The error I am getting is the following:

    Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.JavaMain], main() threw exception, java.lang.NullPointerException
    org.apache.oozie.action.hadoop.JavaMainException: java.lang.NullPointerException
        at org.apache.oozie.action.hadoop.JavaMain.run(JavaMain.java:60)
        at org.apache.oozie.action.hadoop.LauncherMain.run(LauncherMain.java:46)
        at org.apache.oozie.action.hadoop.JavaMain.main(JavaMain.java:38)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.oozie.action.hadoop.LauncherMapper.map(LauncherMapper.java:228)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:453)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
        at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.runSubtask(LocalContainerLauncher.java:370)
        at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.runTask(LocalContainerLauncher.java:295)
        at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler.access$200(LocalContainerLauncher.java:181)
        at org.apache.hadoop.mapred.LocalContainerLauncher$EventHandler$1.run(LocalContainerLauncher.java:224)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.NullPointerException
        at org.apache.hadoop.fs.FileSystem.fixRelativePart(FileSystem.java:2147)
        at org.apache.hadoop.hdfs.DistributedFileSystem.delete(DistributedFileSystem.java:633)
        at org.apache.sqoop.mapreduce.HBaseBulkImportJob.jobTeardown(HBaseBulkImportJob.java:124)
        at org.apache.sqoop.mapreduce.ImportJobBase.runImport(ImportJobBase.java:282)
        at org.apache.sqoop.manager.SqlManager.importQuery(SqlManager.java:724)
        at org.apache.sqoop.tool.ImportTool.importTable(ImportTool.java:499)
        at org.apache.sqoop.tool.ImportTool.run(ImportTool.java:605)
        at SqoopScheduler.runIt(SqoopScheduler.java:61)
        at SqoopScheduler.main(SqoopScheduler.java:75)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.oozie.action.hadoop.JavaMain.run(JavaMain.java:57)
        ... 19 more

Change the queryString so that $CONDITIONS comes right after the WHERE clause:

    private static final String queryString = "select * from issues WHERE $CONDITIONS AND issue_id < ";
    SqoopOptions.setSqlQuery(queryString + Counter);
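For context, a free-form query import in Sqoop replaces the `$CONDITIONS` token with each map task's split-range predicate on the split-by column, which is why the token must appear inside the WHERE clause of the query you pass in. Below is a minimal sketch of that substitution; it is a simplified stand-in to illustrate the mechanism, not Sqoop's actual code, and the split bounds are made up:

```java
public class ConditionsSketch {
    // Template in the corrected form: $CONDITIONS sits inside the WHERE clause.
    static final String TEMPLATE =
        "select * from issues WHERE $CONDITIONS AND issue_id < 21000";

    // Simplified version of what Sqoop does per map task: swap the token
    // for that task's split-range predicate on the split-by column.
    static String forSplit(long lo, long hi) {
        return TEMPLATE.replace("$CONDITIONS",
            "issue_id >= " + lo + " AND issue_id < " + hi);
    }

    public static void main(String[] args) {
        // One map task's query, assuming a hypothetical split [0, 5250).
        System.out.println(forSplit(0, 5250));
    }
}
```

With `$CONDITIONS` first, each mapper's predicate lands cleanly inside the WHERE clause, and the original `issue_id < 21000` filter still applies to every split.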

After a few more attempts, it turned out that the error came from the way the fetch size is handled for MySQL map-reduce jobs: Sqoop tries to set the fetch size for the map-reduce job internally, and that is where it fails.

Answering my own question so that anyone who ends up here can move forward easily.

All you have to do is specify an explicit fetch size in the SqoopOptions, like so:

    private static SqoopOptions SqoopOptions = new SqoopOptions();
    SqoopOptions.setFetchSize(2000);

After that it should work fine.
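To summarize the two changes together, here is a sketch of the corrected configuration. Since the real `SqoopOptions` class needs the Sqoop jars on the classpath, this uses a plain map as a hypothetical stand-in just to show the final shape of the settings that made the job pass; the option names here are illustrative, not Sqoop's actual keys:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ImportConfigSketch {
    static Map<String, String> build() {
        Map<String, String> opts = new LinkedHashMap<>();
        int counter = 21000;
        // Fix 1: $CONDITIONS first in the WHERE clause.
        opts.put("sql.query",
            "select * from issues WHERE $CONDITIONS AND issue_id < " + counter);
        // Fix 2: explicit fetch size, so Sqoop does not have to pick one internally.
        opts.put("fetch.size", "2000");
        opts.put("hbase.table", "jira_issues");
        opts.put("hbase.col.family", "issue_detail");
        return opts;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```

In the real code, these correspond to `setSqlQuery(...)` and `setFetchSize(2000)` on the `SqoopOptions` instance shown above.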