# install the wget utility into the virtual image
$ sudo yum install wget

# use wget to download the Twain and Cooper works
$ wget -U firefox http://www.gutenberg.org/cache/epub/76/pg76.txt
$ wget -U firefox http://www.gutenberg.org/cache/epub/3285/pg3285.txt

# load both into the HDFS file system
# first give the files better names
# DS for Deerslayer
# HF for Huckleberry Finn
$ mv pg3285.txt DS.txt
$ mv pg76.txt HF.txt

# this next command will fail if the directory already exists
$ hadoop fs -mkdir /user/cloudera

# now put the text into the directory
$ hadoop fs -put HF.txt /user/cloudera

# way too much typing, create aliases for hadoop commands
$ alias hput="hadoop fs -put"
$ alias hcat="hadoop fs -cat"
$ alias hls="hadoop fs -ls"
# for CDH4
$ alias hrmr="hadoop fs -rm -r"
# for CDH3
$ alias hrmr="hadoop fs -rmr"

# load the other book
# but add some compression because we can
$ gzip DS.txt

# the . in the next command references the cloudera home directory
# in hdfs, /user/cloudera
$ hput DS.txt.gz .

# now take a look at the files we have in place
$ hls
Found 2 items
-rw-r--r-- 1 cloudera supergroup  459386 2012-08-08 19:34 /user/cloudera/DS.txt.gz
-rw-r--r-- 1 cloudera supergroup  597587 2012-08-08 19:35 /user/cloudera/HF.txt
Many Hadoop tutorials use the word count example that ships in the examples jar file. It turns out that a great deal of analytics work really is counting and aggregating. The example in Listing 4 shows how to invoke the built-in word counter.
Listing 4. Counting the words of Twain and Cooper
# hadoop comes with some examples
# the next lines use the provided java implementation of a
# word count program

# for CDH4:
hadoop jar /usr/lib/hadoop-0.20-mapreduce/hadoop-examples.jar wordcount HF.txt HF.out
# for CDH3:
hadoop jar /usr/lib/hadoop/hadoop-examples.jar wordcount HF.txt HF.out

# for CDH4:
hadoop jar /usr/lib/hadoop-0.20-mapreduce/hadoop-examples.jar wordcount DS.txt.gz DS.out
# for CDH3:
hadoop jar /usr/lib/hadoop/hadoop-examples.jar wordcount DS.txt.gz DS.out
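Before looking at the output, it can help to see exactly what the example job computes. The following is a minimal, local-only Python sketch of the same word count, run against the HF.txt file downloaded earlier; it never touches Hadoop and is only meant to illustrate the map (tokenize) and reduce (count) steps.

#!/usr/bin/env python
# local illustration of what the hadoop-examples wordcount job computes:
# split each line on whitespace and count how often each token appears
from collections import Counter

counts = Counter()
with open('HF.txt') as book:              # HF.txt as downloaded above
    for line in book:
        counts.update(line.split())       # same naive whitespace tokenization

# print a few entries in the word<TAB>count shape the Hadoop job emits
for word, n in sorted(counts.items())[:10]:
    print('%s\t%s' % (word, n))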
# way too much typing, create aliases for hadoop commands
$ alias hput="hadoop fs -put"
$ alias hcat="hadoop fs -cat"
$ alias hls="hadoop fs -ls"
$ alias hrmr="hadoop fs -rmr"

# first list the output directory
$ hls /user/cloudera/HF.out
Found 3 items
-rw-r--r-- 1 cloudera supergroup      0 2012-08-08 19:38 /user/cloudera/HF.out/_SUCCESS
drwxr-xr-x - cloudera supergroup      0 2012-08-08 19:38 /user/cloudera/HF.out/_logs
-rw-r--r-- 1 cl...    sup...     138218 2012-08-08 19:38 /user/cloudera/HF.out/part-r-00000

# now cat the file and pipe it to the less command
$ hcat /user/cloudera/HF.out/part-r-00000 | less

# here are a few lines from the file, the word elephants only got used twice
elder,          1
eldest          1
elect           1
elected         1
electronic      27
electronically  1
electronically, 1
elegant         1
elegant!--'deed 1
elegant,        1
elephants       2
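The part-r-00000 file is plain text with one word and its count per line, so once it is copied out of HDFS (for example with hadoop fs -get) it is easy to post-process. Here is a small sketch, assuming a local copy of part-r-00000 in the current directory, that ranks the most frequent tokens:

#!/usr/bin/env python
# rank the output of the wordcount job by frequency
# assumes part-r-00000 has been copied out of HDFS into the current directory
results = []
with open('part-r-00000') as f:
    for line in f:
        if not line.strip():
            continue
        word, count = line.rsplit(None, 1)    # last column is the count
        results.append((int(count), word))

# show the ten most frequent tokens in Huckleberry Finn
for count, word in sorted(results, reverse=True)[:10]:
    print('%6d  %s' % (count, word))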
# way too much typing, create aliases for hadoop commands
$ alias hput="hadoop fs -put"
$ alias hcat="hadoop fs -cat"
$ alias hls="hadoop fs -ls"
$ alias hrmr="hadoop fs -rmr"

$ hadoop jar /usr/lib/hadoop/hadoop-examples.jar wordcount HF.txt HF.out
12/08/08 19:26:23 INFO mapred.JobClient: Cleaning up the staging area hdfs://0.0.0.0/var/l...
12/08/08 19:26:23 ERROR security.UserGroupInformation: PriviledgedActionException
as:cloudera (auth:SIMPLE)
cause:org.apache.hadoop.mapred.FileAlreadyExistsException:
Output directory HF.out already exists
org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory HF.out already exists
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:132)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:872)
    at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:833)
.... lines deleted

# the simple fix is to remove the existing output directory
$ hrmr HF.out

# now you can re-run the job successfully

# if you run short of space and the namenode enters safemode
# clean up some file space and then
$ hadoop dfsadmin -safemode leave
Hadoop includes a browser interface for inspecting the state of HDFS. Figure 7 shows the output of the word count job.
Figure 7. Browsing HDFS
A more sophisticated console is available free of charge from the Cloudera website. It provides a number of capabilities beyond the standard Hadoop web interfaces. Notice that the HDFS health status shown in Figure 8 is Bad.
# here is the mapper we'll connect to the streaming hadoop interface
# the mapper is reading the text in the file - not really appreciating Twain's humor
#
# modified from
# http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/

$ cat mapper.py
#!/usr/bin/env python
import sys

# read stdin
for linein in sys.stdin:
    # strip blanks
    linein = linein.strip()
    # split into words
    mywords = linein.split()
    # loop on mywords, output the length of each word
    for word in mywords:
        # the reducer just cares about the first column,
        # normally there is a key - value pair
        print '%s %s' % (len(word), 0)
# the awk code is modified from http://www.commandlinefu.com
# awk is calculating
#   NR             - the number of words in total
#   sum/NR         - the average word length
#   sqrt(mean2/NR) - the standard deviation

$ cat statsreducer.awk
awk '{delta = $1 - avg; avg += delta / NR; \
      mean2 += delta * ($1 - avg); sum=$1+sum } \
END { print NR, sum/NR, sqrt(mean2 / NR); }'
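If awk is not your thing, the same reducer logic is easy to express in Python using the identical one-pass running mean and variance update. The script name statsreducer.py is only a suggestion; it is a drop-in sketch, not one of the article's original files.

#!/usr/bin/env python
# statsreducer.py - a Python twin of statsreducer.awk (hypothetical file name)
# reads "wordlength 0" pairs from stdin and prints: count, mean, stddev
import sys
from math import sqrt

n, avg, mean2 = 0, 0.0, 0.0
for line in sys.stdin:
    parts = line.split()
    if not parts:
        continue
    x = float(parts[0])          # first column is the word length
    n += 1
    delta = x - avg              # same running update the awk script performs
    avg += delta / n
    mean2 += delta * (x - avg)

if n:
    print('%d %g %g' % (n, avg, sqrt(mean2 / n)))

It can be swapped into the local test pipeline shown below (zcat ../DS.txt.gz | ./mapper.py | sort | ./statsreducer.py) and should print the same three numbers.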
# test locally
# because we're using Hadoop Streaming, we can test the
# mapper and reducer with simple pipes
#
# the "sort" phase is a reminder the keys are sorted
# before presentation to the reducer
# in this example it doesn't matter what order the
# word length values are presented for calculating the std deviation

$ zcat ../DS.txt.gz | ./mapper.py | sort | ./statsreducer.awk
215107 4.56068 2.50734

# now run in hadoop with streaming
# CDH4
hadoop jar /usr/lib/hadoop-mapreduce/hadoop-streaming.jar \
  -input HF.txt -output HFstats -file ./mapper.py -file ./statsreducer.awk \
  -mapper ./mapper.py -reducer ./statsreducer.awk

# CDH3
$ hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u4.jar \
  -input HF.txt -output HFstats -file ./mapper.py -file ./statsreducer.awk \
  -mapper ./mapper.py -reducer ./statsreducer.awk

$ hls HFstats
Found 3 items
-rw-r--r-- 1 cloudera supergroup  0 2012-08-12 15:38 /user/cloudera/HFstats/_SUCCESS
drwxr-xr-x - cloudera supergroup  0 2012-08-12 15:37 /user/cloudera/HFstats/_logs
-rw-r--r-- 1 cloudera ...        24 2012-08-12 15:37 /user/cloudera/HFstats/part-00000

$ hcat /user/cloudera/HFstats/part-00000
113365 4.11227 2.17086

# now for Cooper
$ hadoop jar /usr/lib/hadoop-0.20/contrib/streaming/hadoop-streaming-0.20.2-cdh3u4.jar \
  -input DS.txt.gz -output DSstats -file ./mapper.py -file ./statsreducer.awk \
  -mapper ./mapper.py -reducer ./statsreducer.awk

$ hcat /user/cloudera/DSstats/part-00000
215107 4.56068 2.50734
Fans of Mark Twain can happily relax knowing that Hadoop finds Cooper using longer words, and with a shocking standard deviation (humor intended). That does, of course, assume that shorter words are better. Let's move on; the next step is writing the data in HDFS out to Informix and DB2.
Using Sqoop to write data from HDFS into Informix, DB2, or MySQL via JDBC
# Sqoop needs access to the JDBC driver for every
# database that it will access
# please copy the driver for each database you plan to use for these exercises
# the MySQL database and driver are already installed in the virtual image
# but you still need to copy the driver to the sqoop/lib directory

# one time copy of each jdbc driver to the sqoop lib directory
$ sudo cp Informix_JDBC_Driver/lib/ifxjdbc*.jar /usr/lib/sqoop/lib/
$ sudo cp db2jdbc/db2jcc*.jar /usr/lib/sqoop/lib/
$ sudo cp /usr/lib/hive/lib/mysql-connector-java-5.1.15-bin.jar /usr/lib/sqoop/lib/
# create a target table to put the data
# fire up dbaccess and use this sql
# create table wordcount ( word char(36) primary key, n int);

# now run the sqoop command
# this is best put in a shell script to help avoid typos...
$ sqoop export -D sqoop.export.records.per.statement=1 \
  --fields-terminated-by '\t' --driver com.informix.jdbc.IfxDriver \
  --connect \
  "jdbc:informix-sqli://myhost:54321/stores_demo:informixserver=i7;user=me;password=mypw" \
  --table wordcount --export-dir /user/cloudera/HF.out
12/08/08 21:39:42 INFO manager.SqlManager: Using default fetchSize of 1000
12/08/08 21:39:42 INFO tool.CodeGenTool: Beginning code generation
12/08/08 21:39:43 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM wordcount AS t WHERE 1=0
12/08/08 21:39:43 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM wordcount AS t WHERE 1=0
12/08/08 21:39:43 INFO orm.CompilationManager: HADOOP_HOME is /usr/lib/hadoop
12/08/08 21:39:43 INFO orm.CompilationManager: Found hadoop core jar at: /usr/lib/hadoop/hadoop-0.20.2-cdh3u4-core.jar
12/08/08 21:39:45 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/248b77c05740f863a15e0136accf32cf/wordcount.jar
12/08/08 21:39:45 INFO mapreduce.ExportJobBase: Beginning export of wordcount
12/08/08 21:39:45 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM wordcount AS t WHERE 1=0
12/08/08 21:39:46 INFO input.FileInputFormat: Total input paths to process : 1
12/08/08 21:39:46 INFO input.FileInputFormat: Total input paths to process : 1
12/08/08 21:39:46 INFO mapred.JobClient: Running job: job_201208081900_0012
12/08/08 21:39:47 INFO mapred.JobClient:  map 0% reduce 0%
12/08/08 21:39:58 INFO mapred.JobClient:  map 38% reduce 0%
12/08/08 21:40:00 INFO mapred.JobClient:  map 64% reduce 0%
12/08/08 21:40:04 INFO mapred.JobClient:  map 82% reduce 0%
12/08/08 21:40:07 INFO mapred.JobClient:  map 98% reduce 0%
12/08/08 21:40:09 INFO mapred.JobClient: Task Id : attempt_201208081900_0012_m_000000_0, Status : FAILED
java.io.IOException: java.sql.SQLException: Encoding or code set not supported.
    at ...SqlRecordWriter.close(AsyncSqlRecordWriter.java:187)
    at ...$NewDirectOutputCollector.close(MapTask.java:540)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:649)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at ....doAs(UserGroupInformation.java:1177)
    at org.apache.hadoop.mapred.Child.main(Child.java:264)
Caused by: java.sql.SQLException: Encoding or code set not supported.
    at com.informix.util.IfxErrMsg.getSQLException(IfxErrMsg.java:413)
    at com.informix.jdbc.IfxChar.toIfx(IfxChar.java:135)
    at com.informix.jdbc.IfxSqli.a(IfxSqli.java:1304)
    at com.informix.jdbc.IfxSqli.d(IfxSqli.java:1605)
    at com.informix.jdbc.IfxS
12/08/08 21:40:11 INFO mapred.JobClient:  map 0% reduce 0%
12/08/08 21:40:15 INFO mapred.JobClient: Task Id : attempt_201208081900_0012_m_000000_1, Status : FAILED
java.io.IOException: java.sql.SQLException: Unique constraint (informix.u169_821) violated.
    at .mapreduce.AsyncSqlRecordWriter.write(AsyncSqlRecordWriter.java:223)
    at .mapreduce.AsyncSqlRecordWriter.write(AsyncSqlRecordWriter.java:49)
    at .mapred.MapTask$NewDirectOutputCollector.write(MapTask.java:531)
    at .mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at com.cloudera.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:82)
    at com.cloudera.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:40)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at .mapreduce.AutoProgressMapper.run(AutoProgressMapper.java:189)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:647)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.a
12/08/08 21:40:20 INFO mapred.JobClient: Task Id : attempt_201208081900_0012_m_000000_2, Status : FAILED
java.sql.SQLException: Unique constraint (informix.u169_821) violated.
    at .mapreduce.AsyncSqlRecordWriter.write(AsyncSqlRecordWriter.java:223)
    at .mapreduce.AsyncSqlRecordWriter.write(AsyncSqlRecordWriter.java:49)
    at .mapred.MapTask$NewDirectOutputCollector.write(MapTask.java:531)
    at .mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
    at com.cloudera.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:82)
    at com.cloudera.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:40)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:144)
    at .mapreduce.AutoProgressMapper.run(AutoProgressMapper.java:189)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:647)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:323)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.a
12/08/08 21:40:27 INFO mapred.JobClient: Job complete: job_201208081900_0012
12/08/08 21:40:27 INFO mapred.JobClient: Counters: 7
12/08/08 21:40:27 INFO mapred.JobClient:   Job Counters
12/08/08 21:40:27 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=38479
12/08/08 21:40:27 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
12/08/08 21:40:27 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
12/08/08 21:40:27 INFO mapred.JobClient:     Launched map tasks=4
12/08/08 21:40:27 INFO mapred.JobClient:     Data-local map tasks=4
12/08/08 21:40:27 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
12/08/08 21:40:27 INFO mapred.JobClient:     Failed map tasks=1
12/08/08 21:40:27 INFO mapreduce.ExportJobBase: Transferred 0 bytes in 41.5758 seconds (0 bytes/sec)
12/08/08 21:40:27 INFO mapreduce.ExportJobBase: Exported 0 records.
12/08/08 21:40:27 ERROR tool.ExportTool: Error during export: Export job failed!

# despite the errors above, rows are inserted into the wordcount table
# one row is missing
# the retry and duplicate key exception are most likely related, but
# troubleshooting will be saved for a later article

# check how we did
# nothing like a "here document" shell script
$ dbaccess stores_demo - <<eoj
> select count(*) from wordcount;
> eoj

Database selected.
      (count(*))
           13837
1 row(s) retrieved.
Database closed.
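One row went missing and the job also complained about an unsupported code set, so before moving on it is worth poking at the export data itself. The following is a local debugging sketch, assuming part-r-00000 has been copied out of HDFS: it flags tokens that are not plain ASCII (candidates for the encoding error) and tokens that collide once truncated to the char(36) primary key width (candidates for the unique constraint violation). It is an aid for investigation, not a fix.

#!/usr/bin/env python
# look for rows that could explain the two exceptions above
# assumes part-r-00000 has been copied out of HDFS into the current directory
seen = {}
with open('part-r-00000') as f:
    for line in f:
        if not line.strip():
            continue
        word = line.rsplit(None, 1)[0]
        if any(ord(ch) > 127 for ch in word):
            print('non-ascii token: %r' % word)          # possible code set problem
        key = word[:36]                                  # the wordcount column is char(36)
        if key in seen and seen[key] != word:
            print('collision at 36 chars: %r vs %r' % (seen[key], word))
        seen[key] = word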
# here is the db2 syntax
# create a destination table for db2
#
# db2 => connect to sample
#
#    Database Connection Information
#
#  Database server        = DB2/LINUXX8664 10.1.0
#  SQL authorization ID   = DB2INST1
#  Local database alias   = SAMPLE
#
# db2 => create table wordcount ( word char(36) not null primary key , n int)
# DB20000I  The SQL command completed successfully.
#

sqoop export -D sqoop.export.records.per.statement=1 \
  --fields-terminated-by '\t' \
  --driver com.ibm.db2.jcc.DB2Driver \
  --connect "jdbc:db2://192.168.1.131:50001/sample" \
  --username db2inst1 --password db2inst1 \
  --table wordcount --export-dir /user/cloudera/HF.out

12/08/09 12:32:59 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
12/08/09 12:32:59 INFO manager.SqlManager: Using default fetchSize of 1000
12/08/09 12:32:59 INFO tool.CodeGenTool: Beginning code generation
12/08/09 12:32:59 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM wordcount AS t WHERE 1=0
12/08/09 12:32:59 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM wordcount AS t WHERE 1=0
12/08/09 12:32:59 INFO orm.CompilationManager: HADOOP_HOME is /usr/lib/hadoop
12/08/09 12:32:59 INFO orm.CompilationManager: Found hadoop core jar at: /usr/lib/hadoop/hadoop-0.20.2-cdh3u4-core.jar
12/08/09 12:33:00 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/5532984df6e28e5a45884a21bab245ba/wordcount.jar
12/08/09 12:33:00 INFO mapreduce.ExportJobBase: Beginning export of wordcount
12/08/09 12:33:01 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM wordcount AS t WHERE 1=0
12/08/09 12:33:02 INFO input.FileInputFormat: Total input paths to process : 1
12/08/09 12:33:02 INFO input.FileInputFormat: Total input paths to process : 1
12/08/09 12:33:02 INFO mapred.JobClient: Running job: job_201208091208_0002
12/08/09 12:33:03 INFO mapred.JobClient:  map 0% reduce 0%
12/08/09 12:33:14 INFO mapred.JobClient:  map 24% reduce 0%
12/08/09 12:33:17 INFO mapred.JobClient:  map 44% reduce 0%
12/08/09 12:33:20 INFO mapred.JobClient:  map 67% reduce 0%
12/08/09 12:33:23 INFO mapred.JobClient:  map 86% reduce 0%
12/08/09 12:33:24 INFO mapred.JobClient:  map 100% reduce 0%
12/08/09 12:33:25 INFO mapred.JobClient: Job complete: job_201208091208_0002
12/08/09 12:33:25 INFO mapred.JobClient: Counters: 16
12/08/09 12:33:25 INFO mapred.JobClient:   Job Counters
12/08/09 12:33:25 INFO mapred.JobClient:     SLOTS_MILLIS_MAPS=21648
12/08/09 12:33:25 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0
12/08/09 12:33:25 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
12/08/09 12:33:25 INFO mapred.JobClient:     Launched map tasks=1
12/08/09 12:33:25 INFO mapred.JobClient:     Data-local map tasks=1
12/08/09 12:33:25 INFO mapred.JobClient:     SLOTS_MILLIS_REDUCES=0
12/08/09 12:33:25 INFO mapred.JobClient:   FileSystemCounters
12/08/09 12:33:25 INFO mapred.JobClient:     HDFS_BYTES_READ=138350
12/08/09 12:33:25 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=69425
12/08/09 12:33:25 INFO mapred.JobClient:   Map-Reduce Framework
12/08/09 12:33:25 INFO mapred.JobClient:     Map input records=13838
12/08/09 12:33:25 INFO mapred.JobClient:     Physical memory (bytes) snapshot=105148416
12/08/09 12:33:25 INFO mapred.JobClient:     Spilled Records=0
12/08/09 12:33:25 INFO mapred.JobClient:     CPU time spent (ms)=9250
12/08/09 12:33:25 INFO mapred.JobClient:     Total committed heap usage (bytes)=42008576
12/08/09 12:33:25 INFO mapred.JobClient:     Virtual memory (bytes) snapshot=596447232
12/08/09 12:33:25 INFO mapred.JobClient:     Map output records=13838
12/08/09 12:33:25 INFO mapred.JobClient:     SPLIT_RAW_BYTES=126
12/08/09 12:33:25 INFO mapreduce.ExportJobBase: Transferred 135.1074 KB in 24.4977 seconds (5.5151 KB/sec)
12/08/09 12:33:25 INFO mapreduce.ExportJobBase: Exported 13838 records.

# check on the results...
#
# db2 => select count(*) from wordcount
#
# 1
# -----------
#       13838
#
#   1 record(s) selected.
#
# if you don't have Informix or DB2 you can still do this example
# mysql is already installed in the VM, here is how to access it

# one time copy of the JDBC driver
sudo cp /usr/lib/hive/lib/mysql-connector-java-5.1.15-bin.jar /usr/lib/sqoop/lib/

# now create the database and table
$ mysql -u root
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 45
Server version: 5.0.95 Source distribution

Copyright (c) 2000, 2011, Oracle and/or its affiliates. All rights reserved.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> create database mydemo;
Query OK, 1 row affected (0.00 sec)

mysql> use mydemo
Database changed
mysql> create table wordcount ( word char(36) not null primary key, n int);
Query OK, 0 rows affected (0.00 sec)

mysql> exit
Bye

# now export
$ sqoop export --connect jdbc:mysql://localhost/mydemo \
  --table wordcount --export-dir /user/cloudera/HF.out \
  --fields-terminated-by '\t' --username root
# very much the same as above, just a different jdbc connection
# and a different table name
sqoop import --driver com.ibm.db2.jcc.DB2Driver \
  --connect "jdbc:db2://192.168.1.131:50001/sample" \
  --table staff --username db2inst1 \
  --password db2inst1 -m 1

# here is another example
# in this case set the sqoop default schema to be different from
# the user login schema
sqoop import --driver com.ibm.db2.jcc.DB2Driver \
  --connect "jdbc:db2://192.168.1.3:50001/sample:currentSchema=DB2INST1;" \
  --table helloworld \
  --target-dir "/user/cloudera/sqoopin2" \
  --username marty \
  -P -m 1

# note that the schema name is CASE SENSITIVE
# the -P option prompts for a password that will not be visible in
# a "ps" listing
# import the customer table into Hive
$ sqoop import --driver com.informix.jdbc.IfxDriver \
  --connect \
  "jdbc:informix-sqli://myhost:54321/stores_demo:informixserver=ifx;user=me;password=you" \
  --table customer

# now tell hive where to find the informix data
# to get to the hive command prompt just type in hive
$ hive
Hive history file=/tmp/cloudera/yada_yada_log123.txt
hive>

# here is the hiveql you need to create the tables
# using a file is easier than typing

create external table customer (
    cn int,
    fname string,
    lname string,
    company string,
    addr1 string,
    addr2 string,
    city string,
    state string,
    zip string,
    phone string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/cloudera/customer';

# we already imported the db2 staff table above
# now tell hive where to find the db2 data

create external table staff (
    id int,
    name string,
    dept string,
    job string,
    years string,
    salary float,
    comm float)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/cloudera/staff';

# you can put the commands in a file
# and execute them as follows:
$ hive -f hivestaff
Hive history file=/tmp/cloudera/hive_job_log_cloudera_201208101502_2140728119.txt
OK
Time taken: 3.247 seconds
OK
10   Sanders    20   Mgr     7    98357.5    NULL
20   Pernal     20   Sales   8    78171.25   612.45
30   Marenghi   38   Mgr     5    77506.75   NULL
40   O'Brien    38   Sales   6    78006.0    846.55
50   Hanes      15   Mgr     10   80
... lines deleted

# now for the join we've all been waiting for :-)
# this is a simple case, Hadoop can scale well into the petabyte range!
$ hive
Hive history file=/tmp/cloudera/hive_job_log_cloudera_201208101548_497937669.txt
hive> select customer.cn, staff.name,
    > customer.addr1, customer.city, customer.phone
    > from staff join customer
    > on ( staff.id = customer.cn );
Total MapReduce jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=number
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=number
In order to set a constant number of reducers:
  set mapred.reduce.tasks=number
Starting Job = job_201208101425_0005, Tracking URL =
http://0.0.0.0:50030/jobdetails.jsp?jobid=job_201208101425_0005
Kill Command = /usr/lib/hadoop/bin/hadoop job
-Dmapred.job.tracker=0.0.0.0:8021 -kill job_201208101425_0005
2012-08-10 15:49:07,538 Stage-1 map = 0%,  reduce = 0%
2012-08-10 15:49:11,569 Stage-1 map = 50%,  reduce = 0%
2012-08-10 15:49:12,574 Stage-1 map = 100%,  reduce = 0%
2012-08-10 15:49:19,686 Stage-1 map = 100%,  reduce = 33%
2012-08-10 15:49:20,692 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_201208101425_0005
OK
110   Ngan       520 Topaz Way        Redwood City   415-743-3611
120   Naughton   6627 N. 17th Way     Phoenix        602-265-8754
Time taken: 22.764 seconds
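On a data set this small the join can also be double-checked locally. The following sketch re-runs the same inner join with Python's csv module, assuming one comma-delimited part file from each Sqoop import has been copied out of HDFS as customer.csv and staff.csv (those file names exist only for this sketch):

#!/usr/bin/env python
# reproduce the Hive join locally on small copies of the Sqoop output
# assumes local files customer.csv and staff.csv (names chosen for this sketch)
import csv

staff = {}
with open('staff.csv') as f:
    for row in csv.reader(f):
        staff[row[0].strip()] = row           # key the staff rows by id

with open('customer.csv') as f:
    for row in csv.reader(f):
        cn = row[0].strip()
        if cn in staff:                        # inner join on staff.id = customer.cn
            # cn, staff name, addr1, city, phone - the same columns the Hive query selects
            print('%s %s %s %s %s' % (cn, staff[cn][1].strip(),
                                      row[4].strip(), row[6].strip(), row[9].strip()))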
$ pig
grunt> staffdb2 = load 'staff' using PigStorage(',')
>>   as ( id, name, dept, job, years, salary, comm );
grunt> custifx2 = load 'customer' using PigStorage(',') as
>>   (cn, fname, lname, company, addr1, addr2, city, state, zip, phone)
>>   ;
grunt> joined = join custifx2 by cn, staffdb2 by id;

# to make pig generate a result set use the dump command
# no work has happened up till now
grunt> dump joined;
2012-08-11 21:24:51,848 [main] INFO org.apache.pig.tools.pigstats.ScriptState
- Pig features used in the script: HASH_JOIN
2012-08-11 21:24:51,848 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine
- pig.usenewlogicalplan is set to true. New logical plan will be used.

HadoopVersion    PigVersion     UserId    StartedAt            FinishedAt           Features
0.20.2-cdh3u4    0.8.1-cdh3u4   cloudera  2012-08-11 21:24:51  2012-08-11 21:25:19  HASH_JOIN

Success!

Job Stats (time in seconds):
JobId    Maps    Reduces    MaxMapTime    MinMapTIme    AvgMapTime    MaxReduceTime
MinReduceTime    AvgReduceTime    Alias    Feature    Outputs
job_201208111415_0006    2    1    8    8    8    10    10    10
custifx,joined,staffdb2    HASH_JOIN    hdfs://0.0.0.0/tmp/temp1785920264/tmp-388629360,

Input(s):
Successfully read 35 records from: "hdfs://0.0.0.0/user/cloudera/staff"
Successfully read 28 records from: "hdfs://0.0.0.0/user/cloudera/customer"

Output(s):
Successfully stored 2 records (377 bytes) in: "hdfs://0.0.0.0/tmp/temp1785920264/tmp-388629360"

Counters:
Total records written : 2
Total bytes written : 377
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_201208111415_0006

2012-08-11 21:25:19,145 [main] INFO
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2012-08-11 21:25:19,149 [main] INFO
org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2012-08-11 21:25:19,149 [main] INFO
org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(110,Roy ,Jaeger ,AA Athletics ,520 Topaz Way ,null,Redwood City ,CA,94062,415-743-3611 ,110,Ngan,15,Clerk,5,42508.20,206.60)
(120,Fred ,Jewell ,Century Pro Shop ,6627 N. 17th Way ,null,Phoenix ,AZ,85016,602-265-8754 ,120,Naughton,38,Clerk,null,42954.75,180.00)
# this is for CDH4, the CDH3 image doesn't have fuse installed...
$ mkdir fusemnt
$ sudo hadoop-fuse-dfs dfs://localhost:8020 fusemnt/
INFO fuse_options.c:162 Adding FUSE arg fusemnt/

$ ls fusemnt
tmp  user  var
$ ls fusemnt/user
cloudera  hive
$ ls fusemnt/user/cloudera
customer  DS.txt.gz  HF.out  HF.txt  orders  staff

$ cat fusemnt/user/cloudera/orders/part-m-00001
1007,2008-05-31,117,null,n,278693    ,2008-06-05,125.90,25.20,null
1008,2008-06-07,110,closed Monday    ,y,LZ230     ,2008-07-06,45.60,13.80,2008-07-21
1009,2008-06-14,111,next door to grocery    ,n,4745      ,2008-06-21,20.40,10.00,2008-08-21
1010,2008-06-17,115,deliver 776 King St. if no answer    ,n,429Q      ,2008-06-29,40.60,12.30,2008-08-22
1011,2008-06-18,104,express    ,n,B77897    ,2008-07-03,10.40,5.00,2008-08-29
1012,2008-06-18,117,null,n,278701    ,2008-06-29,70.80,14.20,null
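The whole point of the FUSE mount is that ordinary tools, scripts included, can read HDFS files with a plain open() and no Hadoop client library at all. Here is a small sketch against the orders file listed above; the column position is taken from the sample output (125.90 is the eighth field of the first row), not from any schema definition.

#!/usr/bin/env python
# read an HDFS file through the FUSE mount like any local file
import csv

total = 0.0
with open('fusemnt/user/cloudera/orders/part-m-00001') as f:
    for row in csv.reader(f):
        val = row[7].strip()                  # eighth field, e.g. 125.90 in the first row
        if val and val != 'null':
            total += float(val)
print('sum of the eighth column across the sampled orders: %.2f' % total)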
Flume - creating a load-ready file
Flume next generation, also known as flume-ng, is a high-speed parallel loader. Databases have high-speed loaders of their own, so how do the two play well together? The relational use case for flume-ng is to create a load-ready file, locally or remotely, that a relational server can then ingest with its own high-speed loader. Yes, this functionality overlaps with Sqoop, but the script shown in Listing 21 was created at the request of a client specifically for this style of database load.
$ sudo yum install flume-ng

$ cat flumeconf/hdfs2dbloadfile.conf
#
# started with example from flume-ng documentation
# modified to do hdfs source to file sink
#

# Define a memory channel called ch1 on agent1
agent1.channels.ch1.type = memory

# Define an exec source called exec-source1 on agent1 and tell it
# to bind to 0.0.0.0:31313. Connect it to channel ch1.
agent1.sources.exec-source1.channels = ch1
agent1.sources.exec-source1.type = exec
agent1.sources.exec-source1.command =hadoop fs -cat /user/cloudera/orders/part-m-00001
# this also works for all the files in the hdfs directory
# agent1.sources.exec-source1.command =hadoop fs -cat /user/cloudera/tsortin/*
agent1.sources.exec-source1.bind = 0.0.0.0
agent1.sources.exec-source1.port = 31313

# Define a logger sink that simply file rolls
# and connect it to the other end of the same channel.
agent1.sinks.fileroll-sink1.channel = ch1
agent1.sinks.fileroll-sink1.type = FILE_ROLL
agent1.sinks.fileroll-sink1.sink.directory =/tmp

# Finally, now that we've defined all of our components, tell
# agent1 which ones we want to activate.
agent1.channels = ch1
agent1.sources = exec-source1
agent1.sinks = fileroll-sink1

# now time to run the script
$ flume-ng agent --conf ./flumeconf/ -f ./flumeconf/hdfs2dbloadfile.conf -n agent1

# here is the output file
# don't forget to stop flume - it will keep polling by default and generate
# more files
$ cat /tmp/1344780561160-1
1007,2008-05-31,117,null,n,278693    ,2008-06-05,125.90,25.20,null
1008,2008-06-07,110,closed Monday    ,y,LZ230     ,2008-07-06,45.60,13.80,2008-07-21
1009,2008-06-14,111,next door to     ,n,4745      ,2008-06-21,20.40,10.00,2008-08-21
1010,2008-06-17,115,deliver 776 King St. if no answer    ,n,429Q      ,2008-06-29,40.60,12.30,2008-08-22
1011,2008-06-18,104,express    ,n,B77897    ,2008-07-03,10.40,5.00,2008-08-29
1012,2008-06-18,117,null,n,278701    ,2008-06-29,70.80,14.20,null

# jump over to dbaccess and use the greatest
# data loader in informix: the external table
# external tables were actually developed for
# informix XPS back in the 1996 timeframe
# and are now available in many servers
# drop table eorders;
create external table eorders
  (on char(10),
   mydate char(18),
   foo char(18),
   bar char(18),
   f4 char(18),
   f5 char(18),
   f6 char(18),
   f7 char(18),
   f8 char(18),
   f9 char(18)
  )
using (datafiles ("disk:/tmp/myfoo" ), delimiter ",");
select * from eorders;
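Before pointing the external table at a rolled file it is worth a quick structural check, since the database loader will be far less forgiving than Flume. A minimal sketch, using the rolled file name from the run above (in practice you would pick up the newest Flume output file in /tmp):

#!/usr/bin/env python
# sanity-check a Flume FILE_ROLL output before bulk loading it:
# every row should have the same number of comma-delimited fields
import csv

EXPECTED_FIELDS = 10                          # the orders extract above has 10 columns
bad = 0
with open('/tmp/1344780561160-1') as f:       # file name taken from the run above
    for lineno, row in enumerate(csv.reader(f), 1):
        if len(row) != EXPECTED_FIELDS:
            bad += 1
            print('line %d has %d fields' % (lineno, len(row)))
print('rows with unexpected field counts: %d' % bad)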
# This sample is for CDH3

# untar the examples
# CDH4
$ tar -zxvf /usr/share/doc/oozie-3.1.3+154/oozie-examples.tar.gz
# CDH3
$ tar -zxvf /usr/share/doc/oozie-2.3.2+27.19/oozie-examples.tar.gz

# cd to the directory where the examples live
# you MUST put these jobs into the hdfs store to run them
$ hadoop fs -put examples examples

# start up the oozie server - you need to be the oozie user
# since the oozie user is a non-login id use the following su trick
# CDH4
$ sudo su - oozie -s /usr/lib/oozie/bin/oozie-sys.sh start
# CDH3
$ sudo su - oozie -s /usr/lib/oozie/bin/oozie-start.sh

# check the status
$ oozie admin -oozie http://localhost:11000/oozie -status
System mode: NORMAL

# some jar housekeeping so oozie can find what it needs
$ cp /usr/lib/sqoop/sqoop-1.3.0-cdh3u4.jar examples/apps/sqoop/lib/
$ cp /home/cloudera/Informix_JDBC_Driver/lib/ifxjdbc.jar examples/apps/sqoop/lib/
$ cp /home/cloudera/Informix_JDBC_Driver/lib/ifxjdbcx.jar examples/apps/sqoop/lib/

# edit the workflow.xml file to use your relational database:
#################################
<command> import
--driver com.informix.jdbc.IfxDriver
--connect jdbc:informix-sqli://192.168.1.143:54321/stores_demo:informixserver=ifx117
--table orders --username informix --password useyours
--target-dir /user/${wf:user()}/${examplesRoot}/output-data/sqoop --verbose</command>
#################################

# from the directory where you un-tarred the examples file do the following:
$ hrmr examples;hput examples examples

# now you can run your sqoop job by submitting it to oozie
$ oozie job -oozie http://localhost:11000/oozie -config \
  examples/apps/sqoop/job.properties -run

job: 0000000-120812115858174-oozie-oozi-W

# get the job status from the oozie server
$ oozie job -oozie http://localhost:11000/oozie -info 0000000-120812115858174-oozie-oozi-W
Job ID : 0000000-120812115858174-oozie-oozi-W
-----------------------------------------------------------------------
Workflow Name : sqoop-wf
App Path      : hdfs://localhost:8020/user/cloudera/examples/apps/sqoop/workflow.xml
Status        : SUCCEEDED
Run           : 0
User          : cloudera
Group         : users
Created       : 2012-08-12 16:05
Started       : 2012-08-12 16:05
Last Modified : 2012-08-12 16:05
Ended         : 2012-08-12 16:05

Actions
----------------------------------------------------------------------
ID                                                Status   Ext ID                  Ext Status   Err Code
----------------------------------------------------------------------
0000000-120812115858174-oozie-oozi-W@sqoop-node   OK       job_201208120930_0005   SUCCEEDED    -
----------------------------------------------------------------------

# how to kill a job may come in useful at some point
oozie job -oozie http://localhost:11000/oozie -kill 0000013-120812115858174-oozie-oozi-W

# job output will be in the file tree
$ hcat /user/cloudera/examples/output-data/sqoop/part-m-00003
1018,2008-07-10,121,SW corner of Biltmore Mall    ,n,S22942    ,2008-07-13,70.50,20.00,2008-08-06
1019,2008-07-11,122,closed till noon Mondays    ,n,Z55709    ,2008-07-16,90.00,23.00,2008-08-06
1020,2008-07-11,123,express    ,n,W2286     ,2008-07-16,14.00,8.50,2008-09-20
1021,2008-07-23,124,ask for Elaine    ,n,C3288     ,2008-07-25,40.00,12.00,2008-08-22
1022,2008-07-24,126,express    ,n,W9925     ,2008-07-30,15.00,13.00,2008-09-02
1023,2008-07-24,127,no deliveries after 3 p.m.    ,n,KF2961    ,2008-07-30,60.00,18.00,2008-08-22

# if you run into this error there is a good chance that your
# database lock file is owned by root
$ oozie job -oozie http://localhost:11000/oozie -config \
  examples/apps/sqoop/job.properties -run

Error: E0607 : E0607: Other error in operation [<openjpa-1.2.1-r752877:753278
fatal store error> org.apache.openjpa.persistence.RollbackException:
The transaction has been rolled back. See the nested exceptions for
details on the errors that occurred.], {1}

# fix this as follows
$ sudo chown oozie:oozie /var/lib/oozie/oozie-db/db.lck

# and restart the oozie server
$ sudo su - oozie -s /usr/lib/oozie/bin/oozie-stop.sh
$ sudo su - oozie -s /usr/lib/oozie/bin/oozie-start.sh
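The status check done above with the oozie command line can also be scripted against Oozie's web services interface. This sketch assumes the default endpoint at http://localhost:11000/oozie and the v1 admin/status resource, which returns a small JSON document; if your Oozie version exposes a different REST path, adjust the URL accordingly.

#!/usr/bin/env python
# poll the Oozie server status over its web services API (assumed v1 REST path)
import json
try:
    from urllib.request import urlopen       # Python 3
except ImportError:
    from urllib2 import urlopen               # Python 2 on the CDH image

url = 'http://localhost:11000/oozie/v1/admin/status'
status = json.loads(urlopen(url).read().decode('utf-8'))
print('System mode: %s' % status.get('systemMode'))     # expect NORMAL, as in the CLI output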
# enter the command line shell for hbase
$ hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 0.90.6-cdh3u4, r, Mon May  7 13:14:00 PDT 2012

# create a table with a single column family
hbase(main):001:0> create 'mytable', 'mycolfamily'

# if you get errors from hbase you need to fix the
# network config
# here is a sample of the error:
ERROR: org.apache.hadoop.hbase.ZooKeeperConnectionException: HBase
is able to connect to ZooKeeper but the connection closes immediately.
This could be a sign that the server has too many connections
(30 is the default). Consider inspecting your ZK server logs for
that error and then make sure you are reusing HBaseConfiguration
as often as you can. See HTable's javadoc for more information.

# fix networking:
# add the eth0 interface to /etc/hosts with a hostname
$ sudo su -
# ifconfig | grep addr
eth0  Link encap:Ethernet  HWaddr 00:0C:29:8C:C7:70
      inet addr:192.168.1.134  Bcast:192.168.1.255  Mask:255.255.255.0
      Interrupt:177 Base address:0x1400
      inet addr:127.0.0.1  Mask:255.0.0.0

[root@myhost ~]# hostname myhost
[root@myhost ~]# echo "192.168.1.134 myhost" >> /etc/hosts
[root@myhost ~]# cd /etc/init.d

# now that the host and address are defined restart Hadoop
[root@myhost init.d]# for i in hadoop*
> do
>   ./$i restart
> done

# now try the table create again:
$ hbase shell
HBase Shell; enter 'help<RETURN>' for list of supported commands.
Type "exit<RETURN>" to leave the HBase Shell
Version 0.90.6-cdh3u4, r, Mon May  7 13:14:00 PDT 2012

hbase(main):001:0> create 'mytable' , 'mycolfamily'
0 row(s) in 1.0920 seconds
hbase(main):002:0>

# insert a row into the table you created
# use some simple telephone call log data
# Notice that mycolfamily can have multiple cells
# this is very troubling for DBAs at first, but
# you do get used to it

hbase(main):001:0> put 'mytable', 'key123', 'mycolfamily:number', '6175551212'
0 row(s) in 0.5180 seconds
hbase(main):002:0> put 'mytable', 'key123', 'mycolfamily:duration', '25'

# now describe and then scan the table
hbase(main):005:0> describe 'mytable'
DESCRIPTION                                          ENABLED
{NAME => 'mytable', FAMILIES => [{NAME => 'mycolfam  true
ily', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '
0', COMPRESSION => 'NONE', VERSIONS => '3', TTL =>
'2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'f
alse', BLOCKCACHE => 'true'}]}
1 row(s) in 0.2250 seconds

# notice that timestamps are included
hbase(main):007:0> scan 'mytable'
ROW        COLUMN+CELL
key123     column=mycolfamily:duration, timestamp=1346868499125, value=25
key123     column=mycolfamily:number, timestamp=1346868540850, value=6175551212
1 row(s) in 0.0250 seconds
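The same put and scan can be driven from Python. One common route is the happybase package, which talks to HBase through the Thrift gateway; neither the Thrift server (started with hbase thrift start) nor happybase is part of the walkthrough above, so treat this as a sketch under those assumptions.

#!/usr/bin/env python
# the call-log put/scan again, this time via the HBase Thrift gateway
# assumes `hbase thrift start` is running and the happybase package is installed
import happybase

connection = happybase.Connection('localhost')        # Thrift server host
table = connection.table('mytable')                   # table created in the shell above

# one more call record: a row key and two cells in mycolfamily
table.put(b'key456', {b'mycolfamily:number': b'6175551213',
                      b'mycolfamily:duration': b'31'})

# roughly the same as `scan 'mytable'` in the shell
for key, data in table.scan():
    print('%s %s' % (key, data))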
# HBase includes a REST server
$ hbase rest start -p 9393 &
# you get a bunch of messages....

# get the status of the HBase server
$ curl http://localhost:9393/status/cluster
# lots of output...
# many lines deleted...
mytable,,1346866763530.a00f443084f21c0eea4a075bbfdfc292.
    stores=1
    storefiless=0
    storefileSizeMB=0
    memstoreSizeMB=0
    storefileIndexSizeMB=0

# now scan the contents of mytable
$ curl http://localhost:9393/mytable/*
# lines deleted
12/09/05 15:08:49 DEBUG client.HTable$ClientScanner: Finished with scanning at REGION =>
# lines deleted
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<CellSet><Row key="a2V5MTIz">
<Cell timestamp="1346868499125" column="bXljb2xmYW1pbHk6ZHVyYXRpb24=">MjU=</Cell>
<Cell timestamp="1346868540850" column="bXljb2xmYW1pbHk6bnVtYmVy">NjE3NTU1MTIxMg==</Cell>
<Cell timestamp="1346868425844" column="bXljb2xmYW1pbHk6bnVtYmVy">NjE3NTU1MTIxMg==</Cell>
</Row></CellSet>

# the values from the REST interface are base64 encoded
$ echo a2V5MTIz | base64 -d
key123
$ echo bXljb2xmYW1pbHk6bnVtYmVy | base64 -d
mycolfamily:number

# the table scan above gives the schema needed to insert into the HBase table
$ echo RESTinsertedKey | base64
UkVTVGluc2VydGVkS2V5Cg==
$ echo 7815551212 | base64
NzgxNTU1MTIxMgo=

# add a table entry with a key value of "RESTinsertedKey" and
# a phone number of "7815551212"
# note - curl is all on one line
$ curl -H "Content-Type: text/xml" -d '<CellSet><Row key="UkVTVGluc2VydGVkS2V5Cg==">
<Cell column="bXljb2xmYW1pbHk6bnVtYmVy">NzgxNTU1MTIxMgo=</Cell>
</Row></CellSet>' http://192.168.1.134:9393/mytable/dummykey

12/09/05 15:52:34 DEBUG rest.RowResource: POST http://192.168.1.134:9393/mytable/dummykey
12/09/05 15:52:34 DEBUG rest.RowResource: PUT row=RESTinsertedKey\x0A,
families={(family=mycolfamily,
keyvalues=(RESTinsertedKey\x0A/mycolfamily:number/9223372036854775807/Put/vlen=11)}

# trust, but verify
hbase(main):002:0> scan 'mytable'
ROW                   COLUMN+CELL
RESTinsertedKey\x0A   column=mycolfamily:number, timestamp=1346874754883, value=7815551212\x0A
key123                column=mycolfamily:duration, timestamp=1346868499125, value=25
key123                column=mycolfamily:number, timestamp=1346868540850, value=6175551212
2 row(s) in 0.5610 seconds

# notice the \x0A at the end of the key and value
# this is the newline generated by the "echo" command
# let's fix that
$ printf 8885551212 | base64
ODg4NTU1MTIxMg==
$ printf mykey | base64
bXlrZXk=

# note - curl statement is all on one line!
$ curl -H "Content-Type: text/xml" -d '<CellSet><Row key="bXlrZXk=">
<Cell column="bXljb2xmYW1pbHk6bnVtYmVy">ODg4NTU1MTIxMg==</Cell>
</Row></CellSet>' http://192.168.1.134:9393/mytable/dummykey

# trust but verify
hbase(main):001:0> scan 'mytable'
ROW                   COLUMN+CELL
RESTinsertedKey\x0A   column=mycolfamily:number, timestamp=1346875811168, value=7815551212\x0A
key123                column=mycolfamily:duration, timestamp=1346868499125, value=25
key123                column=mycolfamily:number, timestamp=1346868540850, value=6175551212
mykey                 column=mycolfamily:number, timestamp=1346877875638, value=8885551212
3 row(s) in 0.6100 seconds
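The base64 and XML gymnastics are also easy to script, which sidesteps the echo-versus-printf newline trap entirely. Here is a sketch assuming the REST server started above is listening on port 9393; the row key pykey and the phone number below are new illustrative values, not rows from the walkthrough.

#!/usr/bin/env python
# insert a cell through the HBase REST interface without trailing newlines
# assumes the REST server started above is listening on localhost:9393
import base64
try:
    from urllib.request import Request, urlopen      # Python 3
except ImportError:
    from urllib2 import Request, urlopen              # Python 2 on the CDH image

def b64(text):
    # base64 without any stray newline inside the encoded value
    return base64.b64encode(text.encode('utf-8')).decode('ascii')

xml = ('<CellSet><Row key="%s">'
       '<Cell column="%s">%s</Cell>'
       '</Row></CellSet>') % (b64('pykey'), b64('mycolfamily:number'), b64('6035551212'))

req = Request('http://localhost:9393/mytable/dummykey',
              data=xml.encode('utf-8'),
              headers={'Content-Type': 'text/xml'})
print(urlopen(req).getcode())      # 200 means the cell was stored; verify with scan 'mytable'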