假設今有2 Jobs, 皆讀取相同格式之資料,但是每個Job 處理的方式與輸出資料皆不相同。
Job 1: Source Data A→ Map → Reduce → Output B
Job 2: Source Data A→ Map → Reduce → Output C
若今天以Job1 → Job 2的順序循序執行,兩者分別需要 20 seconds 與 30 seconds,則總時間需要 50 seconds。是否能夠利用平行處理的技巧,達到Performance的提升?
利用Multithreads 讓Job1與Job2 同時進行。期望透過平行處理的方式,讓工作能夠在30~50秒內做完,提升處理速度。
伺服器2台:cloud1, cloud2。
CPU: 雙路AMD 雙核心
硬碟:1TB 7200 rpm.
Hadoop 0.20.x,master node為cloud1,slave node為cloud1與cloud2。
MySQL Database運行在cloud2。
資料來源1 → WordCount ( MapReduce) →以資料格式1 輸出到MySQL Server
資料來源1 → WordCount ( MapReduce) →以資料格式2輸出到MySQL Server
資料集:自己產生之資料集,大小為556 KB。
執行方式 | Job1執行時間(秒) | Job2執行時間(秒) | 總執行時間(秒) |
Job1與Job2循序執行 | 21 | 21 | 42 |
Job1與Job2採多執行緒執行 | 21 | 21 | 21 |
以上述小型測試發現,Multi-Thread Job in Hadoop或許是一個不錯的並行加速方案。
import java.io.IOException; import java.util.*; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*; import java.io.DataInput; import java.io.DataOutput; import java.lang.Thread; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import org.apache.hadoop.mapred.lib.db.DBWritable; import org.apache.hadoop.mapred.lib.IdentityReducer; import org.apache.hadoop.mapred.lib.db.DBConfiguration; import org.apache.hadoop.mapred.lib.db.DBOutputFormat; import org.apache.hadoop.mapred.lib.db.DBInputFormat; /* MySQL DB Schema: DROP TABLE IF EXISTS `WordCount`.`Counting`; CREATE TABLE `WordCount`.`Counting` ( `name` char(48) default NULL, `count` int(11) default NULL ) ENGINE=InnoDB DEFAULT CHARSET=latin1; */ public class DBWordCount extends Thread { public void run() //throws Exception { /* Start ! */ try { JobClient.runJob(conf); } catch(Exception e) { // do nothing } } public void fnSetJob1(String[] args) throws Exception { conf.setJobName("MySQL DB Wordcount Job1"); Class.forName("com.mysql.jdbc.Driver"); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(DBOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); // Set up your host name and account String[] MyDBPath={"jdbc:mysql://MySQL主機位置:3306/WordCount","帳號", "密碼"}; DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",MyDBPath[0], MyDBPath[1], MyDBPath[2]); // Setup Output MySQL Format DBOutputFormat.setOutput(conf, "Counting","name", "count"); // Set Mapper and Reducer Class conf.setMapperClass(Map.class); //conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class); conf.setMapOutputKeyClass(Text.class); conf.setMapOutputValueClass(IntWritable.class); conf.setOutputKeyClass(WordCountInfoRecord.class); conf.setOutputValueClass(NullWritable.class); } public void fnSetJob2(String[] 
args) throws Exception { //JobConf conf = new JobConf(DBWordCount.class); conf.setJobName("MySQL DB Wordcount Job2"); Class.forName("com.mysql.jdbc.Driver"); // Set up your host name and account String[] MyDBPath={"jdbc:mysql://MySQL主機位置:3306/WordCount","帳號", "密碼"}; conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(DBOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); // Setup MySQL Connection , default account:root , no password DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",MyDBPath[0], MyDBPath[1], MyDBPath[2]); // Setup Output MySQL Format DBOutputFormat.setOutput(conf, "Counting","name", "count"); // Set Mapper and Reducer Class conf.setMapperClass(Map.class); //conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce2.class); // I've tried all combinations , but the bug still happen. conf.setMapOutputKeyClass(Text.class); conf.setMapOutputValueClass(IntWritable.class); conf.setOutputKeyClass(WordCountInfoRecord.class); conf.setOutputValueClass(NullWritable.class); } JobConf conf = new JobConf(DBWordCount.class); //JobConf conf2 = new JobConf(DBWordCount.class); // Output Record Object static class WordCountInfoRecord implements Writable, DBWritable { public String name; public int count; public WordCountInfoRecord() { } public WordCountInfoRecord(String str, int c) { this.name = str; this.count = c; } public void readFields(DataInput in) throws IOException { this.name = Text.readString(in); this.count = in.readInt(); } public void write(DataOutput out) throws IOException { Text.writeString(out, this.name); out.writeInt(this.count); } public void readFields(ResultSet result) throws SQLException { this.name = result.getString(1); this.count = result.getInt(2); } public void write(PreparedStatement stmt) throws SQLException { stmt.setString(1, this.name); stmt.setInt(2, this.count); } public String toString() { return new String(this.name + " " + this.count); } } public static class Map extends 
MapReduceBase implements Mapper{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key, Text value, OutputCollector output, Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); output.collect(word, one); } } } public static class Reduce extends MapReduceBase implements Reducer { public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } // Output Data into MySQL output.collect(new WordCountInfoRecord(key.toString(),sum), NullWritable.get()); } } public static class Reduce2 extends MapReduceBase implements Reducer { public void reduce(Text key, Iterator values, OutputCollector output, Reporter reporter) throws IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } // Output Data into MySQL output.collect(new WordCountInfoRecord("Job2_"+key.toString(),sum), NullWritable.get()); } } public static void main(String[] args) throws Exception { DBWordCount thread1=new DBWordCount(); // Set Thread1 thread1.fnSetJob1(args); DBWordCount thread2=new DBWordCount(); // Set Thread2 thread2.fnSetJob2(args); // Thread 1 Start thread1.start(); // Thread 2 Start thread2.start(); } }