前言
我想,作為開發人員,即便你不是大資料開發工程師,也應該聽說過Hadoop。而且我認為,不論你是從事哪個開發崗位,都應該對它有所瞭解,因為Hadoop的思想已經滲透在眾多技術中。
如果你之前對Hadoop還不太熟悉,希望透過本文對Hadoop的介紹、基本原理以及簡單的實踐案例讓你瞭解它,並且在你之後的工作中有所幫助。
Hadoop :我可以處理海量資料
什麼是 Hadoop?怎麼處理大資料?在這裏給沒有概念的小夥伴介紹一下。
如上圖,Hadoop 其實就是一個用 Java 編寫的專案,包含了 HDFS、MapReduce 以及 Yarn 這幾個核心元件。本文重點介紹 HDFS 和 MapReduce,Yarn 作為資源排程元件,本文就不重點說明了。
HDFS : 儲存歸我
HDFS 全稱為分散式檔案系統 (Hadoop Distributed File System),在大資料領域中用來儲存海量的資料。
沒錯,只要是涉及到分散式,任何一個大問題都能分成多個小問題一一解決,即分而治之。
HDFS 的分而治之就是將一個體量巨大的檔案切分成多個檔案(資料塊)分佈在不同伺服器上儲存。這樣即便是TB到PB級別的大檔案,也不會對儲存空間以及IO效能有太大影響。下面是 HDFS 的架構。
簡單來說,一個大檔案分佈在不同節點一定需要統一管理,這裏的節點在 HDFS 中就是 DataNode ,而統一管理的就是 NameNode。爲了不偏離文章主題,這裏不對此深入說明。
MapReduce :計算歸我
在 MapReduce 沒有出現之前,計算都是單機。如果是對大量資料進行計算,那對記憶體和CPU都需要特別高的要求。在2000年代主流記憶體容量最多為幾十兆到幾百兆,顯然在單機處理不了大量資料的計算。
那 MapReduce 是怎麼處理的?
既然大檔案可以基於 HDFS 分佈在不同節點,那計算程式也分佈到這些節點計算,最後把結果彙總不就好了?
MapReduce 的核心理念就是這樣的:“相比移動資料,移動計算更為經濟高效。”,這也影響了後續眾多的技術。
MapReduce 的計算流程大概如下圖,有分散式系統開發經驗的小夥伴應該不難理解。
對 Hadoop 有了基本概念後,下面用一個非常典型的面試題進行實操,看看如何基於Hadoop處理“統計10G大小的檔案中字元出現的次數”。
Hadoop 環境搭建
Hadoop本身就是一個Java專案,所以,對於Javaer來說搭建起來也比較容易。下面是搭建步驟:
前往Hadoop官方下載二進制包並在本地解壓。
配置環境變數,其實就是指向到Hadoop的幾個目錄,大概是這樣:
export JAVA_HOME=/xxx export HADOOP_HOME=/xxx export HADOOP_HOME=/xxx export PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar:$HADOOP_HOME/bin:$HADOOP_HOME/sbin export HADOOP_MAPRED_HOME=$HADOOP_HOME export HADOOP_COMMON_HOME=$HADOOP_HOME export HADOOP_HDFS_HOME=$HADOOP_HOME export YARN_HOME=$HADOOP_HOME
Hadoop是透過SSH進行節點間的通訊和管理,所以各節點需要開啟遠程登錄。以Mac系統為例,需要在下圖中開啟遠程登錄並且開啟免密碼登入。
免密碼登入需要執行以下命令建立秘鑰,並放在ssh授權目錄。
ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
配置 Hadoop 包下
./etc/hadoop
的幾個檔案export JAVA_HOME=/xxx
<configuration> <property> <name>fs.defaultFS</name> <value>hdfs://localhost:9000</value> </property> <property> <name>hadoop.tmp.dir</name> <value>file:/xxx/tmp</value> </property> </configuration>
<configuration> <property> <name>dfs.replication</name> <value>1</value> </property> <property> <name>dfs.permissions</name> <value>false</value> </property> <property> <name>dfs.namenode.name.dir</name> <value>file:/xxx/namenode</value> </property> <property> <name>dfs.datanode.data.dir</name> <value>file:/xxx/datanode</value> </property> </configuration>
<configuration> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> <property> <name>mapreduce.application.classpath</name> <value>/xxx</value> </property> <property> <name>mapreduce.shuffle.enable.ssl</name> <value>false</value> </property> </configuration>
<configuration> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> </configuration>
yarn-site.xml 配置
mapred-site.xml 配置MapReduce任務排程平臺
hdfs-site.xml 配置資料儲存相關內容
core-site.xml 配置 HDFS 服務釋出地址和臨時檔案目錄
hadoop-env.sh 配置 JDK 環境
在 Hadoop 包下的./bin目錄下執行下面的命令初始化 hdfs
./hdfs namenode -format
在 Hadoop 包下的
./sbin
目錄下執行下面的命令啟動hdfs和yarn。./start-dfs.sh ./start-yarn.sh
啟動成功後分別透過127.0.0.1:9870 、127.0.0.1:8088 訪問,Hadoop就算搭建完成了。
統計10G大小的檔案中字元出現的次數
準備10G的檔案
我這裏使用python指令碼生成了以空格分割單詞的檔案,內容大概如下:
然後多次執行命令 cat random_words_in_chunks.txt >> 10G_words.txt
快速將檔案擴大至10G。
上傳至HDFS
透過執行下面的命令將這10G的檔案上傳到 HDFS 的test目錄。
#建立檔案目錄 hadoop fs -mkdir -p hdfs://127.0.0.1:9000/test #上傳檔案 hadoop fs -put 10G_words.txt /test
執行成功後就可以透過 Web UI介面看到資料。
編寫 MapReduce 程式
之所以稱之為 MapReduce,是因為 MapReduce 將計算分為兩個階段,一個Map階段,一個Reduce階段。
我們要統計字元出現的次數,只需要關注Map和Reduce分別做什麼操作。
透過Map階段可以將字元轉換成我們熟悉的鍵值對,key為字元,value為數字,例如(“字元1”,1)。
透過Reduce階段可以將同樣key的value進行疊加,從而達到目的。
接下來看程式碼的具體編寫。
建立一個Maven專案,引入相關依賴。
<dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-jobclient</artifactId> <version>3.3.6</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.3.6</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs-client</artifactId> <version>3.3.6</version> </dependency> </dependencies>
MapReduce為我們提供了Mapper類來完成Map階段的計算任務,所以,我們只需要繼承Mapper類編寫邏輯即可。程式碼如下
public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context ) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 的泛型分別是計算任務的輸入的鍵值對型別以及輸出的鍵值對型別。
Map階段的輸入還只是讀取的一行字元,所以需要透過StringTokenizer將這些字元以空格進行分割得到每個字元,然後遍歷轉化為("字元",1)的鍵值對作為輸出。
本次上傳到 HDFS 的10G檔案被分割為77個數據塊,那麼最終會有77個Map計算任務以及結果輸出。
同樣,MapReduce為我們提供了Reducer類來完成Reduce階段的計算任務,所以,我們只需要繼承Reducer類編寫邏輯即可。程式碼如下
public class IntSumReducer extends Reducer<Text, IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context ) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 的泛型同Mapper。
不同Map階段輸出的相同key最終會被同一個Reduce任務作為輸入,在編碼時需要將相同輸出的值進行疊加,最終完成每個字元出現的次數。
除了要編寫Mapper和Reducer類,還需要寫一個程式入口,即main方法。程式碼最終會編譯為jar包交給yarn,最終會有多少任務,資源的申請以及任務的分發都在這個類完成。程式碼如下:
public class WordCount { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); for (int i = 0; i < otherArgs.length - 1; ++i) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
啟動分散式計算
以上的程式碼編寫完後,將程式打成jar包,然後執行下面的命令就可以啟動分散式計算。
hadoop jar mr-wordcount-1.0-SNAPSHOT.jar WordCount /test /testresult
命令中的 WordCount
是 main 方法類。後面的兩個路徑是HDFS的目錄,一個代表要計算的資料在哪裏,一個程式碼計算後的結果存在哪裏。
命令執行後,可以透過yarn的 web ui 或者控制檯看到任務分發和執行。
計算過程就如下圖一樣,程式被分發到資料所在的節點,然後計算結果最後進行彙總。
MapReduce 任務執行完成後,在 hdfs 的 web ui 介面就可以看到計算結果。
至此,10G檔案字元統計就透過Hadoop的分而治之快速完成。而作為開發人員來講,只需要編寫兩段極簡單的程式碼,不要考慮資源如何分配、程式如何移動、計算結果如何聚合就完成了一個分散式系統。
總結
Hadoop是基於Java開發的專案,所以對於Javaer來說容易上手。
Hadoop 的出現大大降低了分散式程式設計的難度,它不單單是一個專案,更重要的是它的思想和理念:“分而治之”和“移動計算”。
Hadoop 的影響不僅僅是大資料領域,在它之後的分散式資料庫、微服務等等都受其影響。
作者:王二蛋呀
連結:https://juejin.cn/post/7437158688688947254