激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - Java/Web調用Hadoop進行MapReduce示例代碼

Java/Web調用Hadoop進行MapReduce示例代碼

2021-02-22 11:14土豆拍死馬鈴薯 Java教程

本篇文章主要介紹了Java/Web調用Hadoop進行MapReduce示例代碼,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧

hadoop環(huán)境搭建詳見此文章 http://www.zmynmublwnt.cn/article/140756.html 。

我們已經(jīng)知道hadoop能夠通過hadoop jar ***.jar input output的形式通過命令行來調用,那么如何將其封裝成一個服務,讓java/web來調用它?使得用戶可以用方便的方式上傳文件到hadoop并進行處理,獲得結果。首先,***.jar是一個hadoop任務類的封裝,我們可以在沒有jar的情況下運行該類的main方法,將必要的參數(shù)傳遞給它。input 和output則將用戶上傳的文件使用hadoop的javaapi put到hadoop的文件系統(tǒng)中。然后再通過hadoop的javaapi 從文件系統(tǒng)中取得結果文件。

搭建javaweb工程。本文使用spring、springmvc、mybatis框架, 當然,這不是重點,就算沒有使用任何框架也能實現(xiàn)。

項目框架如下:

Java/Web調用Hadoop進行MapReduce示例代碼

項目中使用到的jar包如下:

Java/Web調用Hadoop進行MapReduce示例代碼Java/Web調用Hadoop進行MapReduce示例代碼

在spring的配置文件中,加入

?
1
2
3
4
5
<bean id="multipartresolver" class="org.springframework.web.multipart.commons.commonsmultipartresolver">
   <property name="defaultencoding" value="utf-8" />
   <property name="maxuploadsize" value="10485760000" />
   <property name="maxinmemorysize" value="40960" />
</bean>

使得項目支持文件上傳。

新建一個login.jsp 點擊登錄后進入user/login

Java/Web調用Hadoop進行MapReduce示例代碼

user/login中處理登錄,登錄成功后,【在hadoop文件系統(tǒng)中創(chuàng)建用戶文件夾】,然后跳轉到console.jsp

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package com.chenjie.controller;
 
import java.io.ioexception;
  
import javax.annotation.resource;
 
import javax.servlet.http.httpservletrequest;
 
import javax.servlet.http.httpservletresponse;
 
import org.apache.hadoop.conf.configuration;
 
import org.apache.hadoop.fs.filesystem;
 
import org.apache.hadoop.fs.path;
 
import org.springframework.stereotype.controller;
 
import org.springframework.web.bind.annotation.requestmapping;
 
import com.chenjie.pojo.jsonresult;
 
import com.chenjie.pojo.user;
 
import com.chenjie.service.userservice;
 
import com.chenjie.util.appconfig;
 
import com.google.gson.gson;
/**
 
 * 用戶請求控制器
 
 *
 
 * @author chen
 
 *
 
 */
 
@controller
 
// 聲明當前類為控制器
 
@requestmapping("/user")
 
// 聲明當前類的路徑
 
public class usercontroller {
 
  @resource(name = "userservice")
 
  private userservice userservice;// 由spring容器注入一個userservice實例
  /**
 
   * 登錄
 
   *
 
   * @param user
 
   *      用戶
 
   * @param request
 
   * @param response
 
   * @throws ioexception
 
   */
 
  @requestmapping("/login")
 
  // 聲明當前方法的路徑
 
  public string login(user user, httpservletrequest request,
 
      httpservletresponse response) throws ioexception {
 
    response.setcontenttype("application/json");// 設置響應內(nèi)容格式為json
 
    user result = userservice.login(user);// 調用userservice的登錄方法
 
    request.getsession().setattribute("user", result);
 
    if (result != null) {
 
      createhadoopfsfolder(result);
 
      return "console";
 
    }
 
    return "login";
 
  }
 
  public void createhadoopfsfolder(user user) throws ioexception {
 
    configuration conf = new configuration();
 
    conf.addresource(new path("/opt/hadoop-1.2.1/conf/core-site.xml"));
 
    conf.addresource(new path("/opt/hadoop-1.2.1/conf/hdfs-site.xml"));
 
 
 
    filesystem filesystem = filesystem.get(conf);
 
    system.out.println(filesystem.geturi());
 
 
 
    path file = new path("/user/" + user.getu_username());
 
    if (filesystem.exists(file)) {
 
      system.out.println("haddop hdfs user foler exists.");
 
      filesystem.delete(file, true);
 
      system.out.println("haddop hdfs user foler delete success.");
 
    }
 
    filesystem.mkdirs(file);
 
    system.out.println("haddop hdfs user foler creat success.");
 
  }
}

console.jsp中進行文件上傳和任務提交、

Java/Web調用Hadoop進行MapReduce示例代碼

文件上傳和任務提交:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
package com.chenjie.controller;
 
import java.io.file;
import java.io.ioexception;
import java.net.inetsocketaddress;
import java.net.uri;
import java.util.arraylist;
import java.util.iterator;
import java.util.list;
 
import javax.servlet.http.httpservletrequest;
import javax.servlet.http.httpservletresponse;
 
import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.fsdatainputstream;
import org.apache.hadoop.fs.filesystem;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.mapred.jobclient;
import org.apache.hadoop.mapred.jobconf;
import org.apache.hadoop.mapred.jobid;
import org.apache.hadoop.mapred.jobstatus;
import org.apache.hadoop.mapred.runningjob;
import org.springframework.stereotype.controller;
import org.springframework.web.bind.annotation.requestmapping;
import org.springframework.web.multipart.multipartfile;
import org.springframework.web.multipart.multiparthttpservletrequest;
import org.springframework.web.multipart.commons.commonsmultipartresolver;
 
import com.chenjie.pojo.user;
import com.chenjie.util.utils;
 
@controller
// 聲明當前類為控制器
@requestmapping("/hadoop")
// 聲明當前類的路徑
public class hadoopcontroller {
 
  @requestmapping("/upload")
  // 聲明當前方法的路徑
  //文件上傳
  public string upload(httpservletrequest request,
      httpservletresponse response) throws ioexception {
    list<string> filelist = (list<string>) request.getsession()
        .getattribute("filelist");//得到用戶已上傳文件列表
    if (filelist == null)
      filelist = new arraylist<string>();//如果文件列表為空,則新建
    user user = (user) request.getsession().getattribute("user");
    if (user == null)
      return "login";//如果用戶未登錄,則跳轉登錄頁面
    commonsmultipartresolver multipartresolver = new commonsmultipartresolver(
        request.getsession().getservletcontext());//得到在spring配置文件中注入的文件上傳組件
    if (multipartresolver.ismultipart(request)) {//如果請求是文件請求
      multiparthttpservletrequest multirequest = (multiparthttpservletrequest) request;
 
      iterator<string> iter = multirequest.getfilenames();//得到文件名迭代器
      while (iter.hasnext()) {
        multipartfile file = multirequest.getfile((string) iter.next());
        if (file != null) {
          string filename = file.getoriginalfilename();
          file folder = new file("/home/chenjie/cjhadooponline/"
              + user.getu_username());
          if (!folder.exists()) {
            folder.mkdir();//如果文件不目錄存在,則在服務器本地創(chuàng)建
          }
          string path = "/home/chenjie/cjhadooponline/"
              + user.getu_username() + "/" + filename;
 
          file localfile = new file(path);
 
          file.transferto(localfile);//將上傳文件拷貝到服務器本地目錄
          // filelist.add(path);
        }
        handleuploadfiles(user, filelist);//處理上傳文件
      }
 
    }
    request.getsession().setattribute("filelist", filelist);//將上傳文件列表保存在session中
    return "console";//返回console.jsp繼續(xù)上傳文件
  }
 
  @requestmapping("/wordcount")
  //調用hadoop進行mapreduce
  public void wordcount(httpservletrequest request,
      httpservletresponse response) {
    system.out.println("進入controller wordcount ");
    user user = (user) request.getsession().getattribute("user");
    system.out.println(user);
    // if(user == null)
    // return "login";
    wordcount c = new wordcount();//新建單詞統(tǒng)計任務
    string username = user.getu_username();
    string input = "hdfs://chenjie-virtual-machine:9000/user/" + username
        + "/wordcountinput";//指定hadoop文件系統(tǒng)的輸入文件夾
    string output = "hdfs://chenjie-virtual-machine:9000/user/" + username
        + "/wordcountoutput";//指定hadoop文件系統(tǒng)的輸出文件夾
    string reslt = output + "/part-r-00000";//默認輸出文件
    try {
      thread.sleep(3*1000);
      c.main(new string[] { input, output });//調用單詞統(tǒng)計任務
      configuration conf = new configuration();//新建hadoop配置
      conf.addresource(new path("/opt/hadoop-1.2.1/conf/core-site.xml"));//添加hadoop配置,找到hadoop部署信息
      conf.addresource(new path("/opt/hadoop-1.2.1/conf/hdfs-site.xml"));//hadoop配置,找到文件系統(tǒng)
 
      filesystem filesystem = filesystem.get(conf);//得打文件系統(tǒng)
      path file = new path(reslt);//找到輸出結果文件
      fsdatainputstream instream = filesystem.open(file);//打開
      uri uri = file.touri();//得到輸出文件路徑
      system.out.println(uri);
      string data = null;
      while ((data = instream.readline()) != null) {
        //system.out.println(data);
        response.getoutputstream().println(data);//講結果文件寫回用戶網(wǎng)頁
      }
//     inputstream in = filesystem.open(file);
//     outputstream out = new fileoutputstream("result.txt");
//     ioutils.copybytes(in, out, 4096, true);
      instream.close();
    } catch (exception e) {
      system.err.println(e.getmessage());
    }
  }
 
  @requestmapping("/mapreducestates")
  //得到mapreduce的狀態(tài)
  public void mapreduce(httpservletrequest request,
      httpservletresponse response) {
    float[] progress=new float[2];
    try {
      configuration conf1=new configuration();
      conf1.set("mapred.job.tracker", utils.jobtracker);
       
      jobstatus jobstatus = utils.getjobstatus(conf1);
//     while(!jobstatus.isjobcomplete()){
//       progress = utils.getmapreduceprogess(jobstatus);
//       response.getoutputstream().println("map:" + progress[0] + "reduce:" + progress[1]);
//       thread.sleep(1000);
//     }
      jobconf jc = new jobconf(conf1);
       
      jobclient jobclient = new jobclient(jc);
      jobstatus[] jobsstatus = jobclient.getalljobs(); 
      //這樣就得到了一個jobstatus數(shù)組,隨便取出一個元素取名叫jobstatus 
      jobstatus = jobsstatus[0]; 
      jobid jobid = jobstatus.getjobid(); //通過jobstatus獲取jobid 
      runningjob runningjob = jobclient.getjob(jobid); //通過jobid得到runningjob對象 
      runningjob.getjobstate();//可以獲取作業(yè)狀態(tài),狀態(tài)有五種,為jobstatus.failed 、jobstatus.killed、jobstatus.prep、jobstatus.running、jobstatus.succeeded 
      jobstatus.getusername();//可以獲取運行作業(yè)的用戶名。 
      runningjob.getjobname();//可以獲取作業(yè)名。 
      jobstatus.getstarttime();//可以獲取作業(yè)的開始時間,為utc毫秒數(shù)。 
      float map = runningjob.mapprogress();//可以獲取map階段完成的比例,0~1, 
      system.out.println("map=" + map);
      float reduce = runningjob.reduceprogress();//可以獲取reduce階段完成的比例。
      system.out.println("reduce="+reduce);
      runningjob.getfailureinfo();//可以獲取失敗信息。 
      runningjob.getcounters();//可以獲取作業(yè)相關的計數(shù)器,計數(shù)器的內(nèi)容和作業(yè)監(jiān)控頁面上看到的計數(shù)器的值一樣。 
       
       
    } catch (ioexception e) {
      progress[0] = 0;
      progress[1] = 0;
    }
   
    request.getsession().setattribute("map", progress[0]);
    request.getsession().setattribute("reduce", progress[1]);
  }
   
  //處理文件上傳
  public void handleuploadfiles(user user, list<string> filelist) {
    file folder = new file("/home/chenjie/cjhadooponline/"
        + user.getu_username());
    if (!folder.exists())
      return;
    if (folder.isdirectory()) {
      file[] files = folder.listfiles();
      for (file file : files) {
        system.out.println(file.getname());
        try {
          putfiletohadoopfsfolder(user, file, filelist);//將單個文件上傳到hadoop文件系統(tǒng)
        } catch (ioexception e) {
          system.err.println(e.getmessage());
        }
      }
    }
  }
 
  //將單個文件上傳到hadoop文件系統(tǒng)
  private void putfiletohadoopfsfolder(user user, file file,
      list<string> filelist) throws ioexception {
    configuration conf = new configuration();
    conf.addresource(new path("/opt/hadoop-1.2.1/conf/core-site.xml"));
    conf.addresource(new path("/opt/hadoop-1.2.1/conf/hdfs-site.xml"));
 
    filesystem filesystem = filesystem.get(conf);
    system.out.println(filesystem.geturi());
 
    path localfile = new path(file.getabsolutepath());
    path foler = new path("/user/" + user.getu_username()
        + "/wordcountinput");
    if (!filesystem.exists(foler)) {
      filesystem.mkdirs(foler);
    }
     
    path hadoopfile = new path("/user/" + user.getu_username()
        + "/wordcountinput/" + file.getname());
//   if (filesystem.exists(hadoopfile)) {
//     system.out.println("file exists.");
//   } else {
//     filesystem.mkdirs(hadoopfile);
//   }
    filesystem.copyfromlocalfile(true, true, localfile, hadoopfile);
    filelist.add(hadoopfile.touri().tostring());
 
  }
 
}

啟動hadoop:

Java/Web調用Hadoop進行MapReduce示例代碼

運行結果:

可以在任意平臺下,登錄該項目地址,上傳文件,得到結果。

Java/Web調用Hadoop進行MapReduce示例代碼

Java/Web調用Hadoop進行MapReduce示例代碼


Java/Web調用Hadoop進行MapReduce示例代碼

Java/Web調用Hadoop進行MapReduce示例代碼

Java/Web調用Hadoop進行MapReduce示例代碼

Java/Web調用Hadoop進行MapReduce示例代碼

運行成功。

源代碼:https://github.com/tudoupaisimalingshu/cjhadooponline

以上就是本文的全部內(nèi)容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

原文鏈接:http://blog.csdn.net/csj941227/article/details/71786040

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 中国免费一级毛片 | 99亚洲伊人久久精品影院红桃 | 小视频免费在线观看 | 欧美一区二区三区中文字幕 | 亚洲伊人色欲综合网 | 欧美成人精品一区 | 久久午夜国产 | 欧美一级视频网站 | 欧美性色大片 | 成年免费视频黄网站在线观看 | 91看片在线观看视频 | 黄色片网页 | 亚洲第一视频 | 末成年女av片一区二区 | 在线观看欧美成人 | 国产男女 爽爽爽爽视频 | 看免费5xxaaa毛片 | 中文字幕免费在线看 | 精选久久| 最近日本电影hd免费观看 | 久久毛片 | 亚洲午夜影院在线观看 | 久久精品欧美一区二区 | 亚洲天堂在线电影 | 国产18成人免费视频 | avav在线播放 | 国产永久免费观看 | 欧美激情第一区 | 亚洲精品午夜国产va久久成人 | 色诱亚洲精品久久久久久 | 全免费午夜一级毛片真人 | 欧美a视频在线观看 | 久久久三级免费电影 | 国产亚洲精品久久久久婷婷瑜伽 | 久久综合婷婷 | 羞羞羞羞视频 | 在线播放一区二区三区 | 性欧美xxxx免费岛国不卡电影 | 男女隐私免费视频 | 成人免费毛片一 | 超碰97国产在线 |