Uploading and downloading files on HDFS are basic operations against a cluster. "Hadoop: The Definitive Guide" gives code examples for uploading and downloading files, but it does not explain very clearly how to configure a Hadoop client. After a long stretch of searching and debugging, I have summarized how to configure a client to work with a cluster, together with a program I have tested myself for operating on files in the cluster. First, set up the corresponding environment variables:
hadoop_HOME="/home/work/tools/java/hadoop-client/hadoop"
for f in $hadoop_HOME/hadoop-*.jar; do
hadoop_CLASSPATH=${hadoop_CLASSPATH}:$f
done
for f in $hadoop_HOME/lib/*.jar; do
hadoop_CLASSPATH=${hadoop_CLASSPATH}:$f
done
hadoopvfs_HOME="/home/work/tools/java/hadoop-client/hadoop-vfs"
for f in $hadoopvfs_HOME/lib/*.jar; do
hadoop_CLASSPATH=${hadoop_CLASSPATH}:$f
done
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/home/work/tools/java/hadoop-client/hadoop/lib/native/Linux-amd64-64/
Here LD_LIBRARY_PATH is the path to the native libraries needed at runtime, and hadoop_CLASSPATH collects the various jar files shipped with our Hadoop client.
One thing to note: it is best not to use the name HADOOP_HOME, since that is an environment variable used by the system itself, and you want to avoid clashing with it.
To compile the class:
javac -classpath $CLASSPATH:$hadoop_CLASSPATH HDFSUtil.java
To run it:
java -classpath $CLASSPATH:$hadoop_CLASSPATH HDFSUtil
In actual use, however, you may hit errors such as "No Permission", or, even when you are sure the code is correct, you may still see all sorts of strange errors at runtime.
So what is going on here?
Answer: the configuration files of the target cluster have not been loaded.
Because "Hadoop: The Definitive Guide" plays down the configuration side, problems surface as soon as you work with a real cluster. The fix looks like this:
this.conf = new Configuration(false);
conf.addResource("./hadoop-site.xml");
conf.addResource("./hadoop-default.xml");
conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
Why is this needed, when the book simply writes:
this.conf = new Configuration();
那是因?yàn)槟J(rèn)你的集群在本地,所以不需要做配置,但是在實(shí)際使用的過程中,各個(gè)集群的配置是不同的,所以我們要引入集群的配置
這是非常重要的一點(diǎn),因?yàn)閷?shí)際使用的過程中我們都是使用的HADOOP的客戶端,而且是已經(jīng)搭好環(huán)境的集群,所以我們需要做好本地的配置
hadoop-site.xml和hadoop-default.xml這兩個(gè)文件在所使用的客戶端的conf目錄下,在addResource的時(shí)候指定好目錄就行了
將以上所提到的配置,全部配完之后,這個(gè)程序才能真正運(yùn)行起來,所以配置是非常重要的一環(huán)。
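To make this concrete, here is a minimal connectivity check along the same lines, assuming the two XML files have been copied next to the program; the hdfs://namenode:9000 address and the /tmp path are placeholders rather than values from this article:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSConfigCheck {
    public static void main(String[] args) throws Exception {
        // Start from an empty Configuration and load the cluster's own files
        // (Path(...) loads them from the local working directory)
        Configuration conf = new Configuration(false);
        conf.addResource(new Path("./hadoop-default.xml"));
        conf.addResource(new Path("./hadoop-site.xml"));
        conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
        conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());

        // Placeholder namenode address; replace with your cluster's entry point
        FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:9000"), conf);
        for (FileStatus status : fs.listStatus(new Path("/tmp"))) {
            System.out.println(status.getPath());
        }
    }
}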
Below is the code of the corresponding utility, for anyone interested. It works on file streams, which also makes it possible to pipe files between FTP and HDFS:
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URL;
import java.io.*;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

public class HDFSUtil {
    private String hdfs_node = "";
    private String hdfs_path = "";
    private String file_path = "";
    private String hadoop_site = "";
    private String hadoop_default = "";
    private Configuration conf = null;

    public HDFSUtil(String hdfs_node) {
        this.hdfs_node = hdfs_node;
    }

    public String getHdfsNode() {
        return this.hdfs_node;
    }

    public void setHdfsPath(String hdfs_path) {
        this.hdfs_path = hdfs_path;
    }

    public String getHdfsPath() {
        return this.hdfs_path;
    }

    public void setFilePath(String file_path) {
        this.file_path = file_path;
    }

    public String getFilePath() {
        return this.file_path;
    }

    public void setHadoopSite(String hadoop_site) {
        this.hadoop_site = hadoop_site;
    }

    public String getHadoopSite() {
        return this.hadoop_site;
    }

    public void setHadoopDefault(String hadoop_default) {
        this.hadoop_default = hadoop_default;
    }

    public String getHadoopDefault() {
        return this.hadoop_default;
    }

    // Build the Configuration; flag == false means "load the cluster's own config files"
    public int setConfigure(boolean flag) {
        if (flag == false) {
            if (this.getHadoopSite().isEmpty() || this.getHadoopDefault().isEmpty()) {
                return -1;
            } else {
                this.conf = new Configuration(false);
                conf.addResource(this.getHadoopDefault());
                conf.addResource(this.getHadoopSite());
                conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
                conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
                return 0;
            }
        }
        this.conf = new Configuration();
        return 0;
    }

    public Configuration getConfigure() {
        return this.conf;
    }

    // Upload a local file to HDFS
    public int upLoad(String localName, String remoteName) throws FileNotFoundException, IOException {
        InputStream inStream = null;
        FileSystem fs = null;
        try {
            inStream = new BufferedInputStream(new FileInputStream(localName));
            fs = FileSystem.get(URI.create(this.hdfs_node), this.conf);
            OutputStream outStream = fs.create(new Path(remoteName), new Progressable() {
                public void progress() {
                    System.out.print('.');
                }
            });
            IOUtils.copyBytes(inStream, outStream, 4096, true);
            inStream.close();
            return 0;
        } catch (IOException e) {
            inStream.close();
            e.printStackTrace();
            return -1;
        }
    }

    // Upload from an arbitrary input stream (for example an FTP stream) to HDFS
    public int upLoad(InputStream inStream, String remoteName) throws FileNotFoundException, IOException {
        FileSystem fs = null;
        try {
            fs = FileSystem.get(URI.create(this.hdfs_node), this.conf);
            OutputStream outStream = fs.create(new Path(remoteName), new Progressable() {
                public void progress() {
                    System.out.print('.');
                }
            });
            IOUtils.copyBytes(inStream, outStream, 4096, true);
            inStream.close();
            return 0;
        } catch (IOException e) {
            inStream.close();
            e.printStackTrace();
            return -1;
        }
    }

    // Download the first `lines` lines of an HDFS file into a local file
    public int donwLoad(String remoteName, String localName, int lines) throws FileNotFoundException, IOException {
        FileOutputStream fos = null;
        InputStreamReader isr = null;
        BufferedReader br = null;
        String str = null;
        OutputStreamWriter osw = null;
        BufferedWriter buffw = null;
        PrintWriter pw = null;
        FileSystem fs = null;
        InputStream inStream = null;
        try {
            fs = FileSystem.get(URI.create(this.hdfs_node + remoteName), this.conf);
            inStream = fs.open(new Path(this.hdfs_node + remoteName));
            fos = new FileOutputStream(localName);
            osw = new OutputStreamWriter(fos, "UTF-8");
            buffw = new BufferedWriter(osw);
            pw = new PrintWriter(buffw);
            isr = new InputStreamReader(inStream, "UTF-8");
            br = new BufferedReader(isr);
            while ((str = br.readLine()) != null && lines > 0) {
                lines--;
                pw.println(str);
            }
        } catch (IOException e) {
            throw new IOException("Couldn't write.", e);
        } finally {
            if (pw != null) pw.close();
            if (buffw != null) buffw.close();
            if (osw != null) osw.close();
            if (fos != null) fos.close();
            if (inStream != null) inStream.close();
        }
        return 0;
    }

    // main to test
    public static void main(String[] args) {
        String hdfspath = null;
        String localname = null;
        String hdfsnode = null;
        int lines = 0;

        if (args.length == 4) {
            hdfsnode = args[0];
            hdfspath = args[1];
            localname = args[2];
            lines = Integer.parseInt(args[3]);
        } else {
            // Note: hdfsnode (the namenode address) must also be set for these defaults to work
            hdfspath = "/app/ps/spider/wdmqa/wangweilong/test/HDFSUtil.java";
            localname = "/home/work/workspace/project/dhc2-0/dhc/base/ftp/papapa";
            lines = 5;
        }

        HDFSUtil hdfsutil = new HDFSUtil(hdfsnode);
        hdfsutil.setFilePath(hdfsutil.getHdfsNode() + hdfspath);
        hdfsutil.setHadoopSite("./hadoop-site.xml");
        hdfsutil.setHadoopDefault("./hadoop-default.xml");
        hdfsutil.setConfigure(false);

        try {
            hdfsutil.donwLoad(hdfspath, localname, lines);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
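The main method above only exercises the download path. For uploads, a usage sketch along the following lines should work; the namenode address and the two file paths are placeholders, not values from this article:

public class UploadDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder namenode address and paths; HDFSUtil is the class above
        HDFSUtil hdfsutil = new HDFSUtil("hdfs://namenode:9000");
        hdfsutil.setHadoopSite("./hadoop-site.xml");
        hdfsutil.setHadoopDefault("./hadoop-default.xml");
        hdfsutil.setConfigure(false);
        // Push a local file to the cluster
        hdfsutil.upLoad("/home/work/data/local.txt", "/tmp/remote.txt");
    }
}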
If you want to learn about downloading files over FTP, please refer to this article:
If you want to pipe files between FTP and HDFS, just create a class that calls the interfaces of the tools from these two articles. I wrote the code myself and have verified that it works.
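The FTP tool from the referenced article is not shown here, so the rough sketch below substitutes Apache Commons Net's FTPClient purely to illustrate the idea; the host, credentials, and paths are all placeholders:

import java.io.InputStream;
import org.apache.commons.net.ftp.FTPClient;

public class FtpToHdfs {
    public static void main(String[] args) throws Exception {
        // Placeholder FTP host, credentials, and file paths
        FTPClient ftp = new FTPClient();
        ftp.connect("ftp.example.com");
        ftp.login("user", "password");
        ftp.enterLocalPassiveMode();

        HDFSUtil hdfsutil = new HDFSUtil("hdfs://namenode:9000");
        hdfsutil.setHadoopSite("./hadoop-site.xml");
        hdfsutil.setHadoopDefault("./hadoop-default.xml");
        hdfsutil.setConfigure(false);

        // Stream the FTP file straight into HDFS via the InputStream overload of upLoad
        InputStream in = ftp.retrieveFileStream("/remote/data.txt");
        hdfsutil.upLoad(in, "/tmp/data.txt");
        ftp.completePendingCommand();
        ftp.logout();
        ftp.disconnect();
    }
}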
That is all for this article; I hope it helps you become more proficient with Java.