激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務器之家:專注于服務器技術及軟件下載分享
分類導航

Mysql|Sql Server|Oracle|Redis|MongoDB|PostgreSQL|Sqlite|DB2|mariadb|Access|數據庫技術|

香港云服务器
服務器之家 - 數據庫 - MongoDB - 通用MapReduce程序復制HBase表數據

通用MapReduce程序復制HBase表數據

2020-05-20 16:29Angelababy_huan MongoDB

這篇文章主要為大家詳細介紹了通用MapReduce程序復制HBase表數據,具有一定的參考價值,感興趣的小伙伴們可以參考一下

編寫MR程序,讓其可以適合大部分的HBase表數據導入到HBase表數據。其中包括可以設置版本數、可以設置輸入表的列導入設置(選取其中某幾列)、可以設置輸出表的列導出設置(選取其中某幾列)。

原始表test1數據如下:

通用MapReduce程序復制HBase表數據

每個row key都有兩個版本的數據,這里只顯示了row key為1的數據

 在hbase shell 中創建數據表:

?
1
2
3
4
5
6
7
create 'test2',{NAME => 'cf1',VERSIONS => 10}  // 保存無版本、無列導入設置、無列導出設置的數據
create 'test3',{NAME => 'cf1',VERSIONS => 10}  // 保存無版本、無列導入設置、有列導出設置的數據
create 'test4',{NAME => 'cf1',VERSIONS => 10}  // 保存無版本、有列導入設置、無列導出設置的數據
create 'test5',{NAME => 'cf1',VERSIONS => 10}  // 保存有版本、無列導入設置、無列導出設置的數據
create 'test6',{NAME => 'cf1',VERSIONS => 10}  // 保存有版本、無列導入設置、有列導出設置的數據
create 'test7',{NAME => 'cf1',VERSIONS => 10}  // 保存有版本、有列導入設置、無列導出設置的數據
create 'test8',{NAME => 'cf1',VERSIONS => 10}  // 保存有版本、有列導入設置、有列導出設置的數據

main函數入口:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package GeneralHBaseToHBase;
import org.apache.hadoop.util.ToolRunner;
public class DriverTest {
 public static void main(String[] args) throws Exception {
 // 無版本設置、無列導入設置,無列導出設置
 String[] myArgs1= new String[]{
 "test1", // 輸入表
 "test2", // 輸出表
 "0"// 版本大小數,如果值為0,則為默認從輸入表導出最新的數據到輸出表
 "-1", // 列導入設置,如果為-1 ,則沒有設置列導入
 "-1" // 列導出設置,如果為-1,則沒有設置列導出
 };
 ToolRunner.run(HBaseDriver.getConfiguration(),
 new HBaseDriver(),
 myArgs1);
 // 無版本設置、有列導入設置,無列導出設置
 String[] myArgs2= new String[]{
 "test1",
 "test3",
 "0",
 "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",
 "-1"
 };
 ToolRunner.run(HBaseDriver.getConfiguration(),
 new HBaseDriver(),
 myArgs2);
 // 無版本設置,無列導入設置,有列導出設置
 String[] myArgs3= new String[]{
 "test1",
 "test4",
 "0",
 "-1",
 "cf1:c1,cf1:c10,cf1:c14"
 };
 ToolRunner.run(HBaseDriver.getConfiguration(),
 new HBaseDriver(),
 myArgs3);
 // 有版本設置,無列導入設置,無列導出設置
 String[] myArgs4= new String[]{
 "test1",
 "test5",
 "2",
 "-1",
 "-1"
 };
 ToolRunner.run(HBaseDriver.getConfiguration(),
 new HBaseDriver(),
 myArgs4);
 // 有版本設置、有列導入設置,無列導出設置
 String[] myArgs5= new String[]{
 "test1",
 "test6",
 "2",
 "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",
 "-1"
 };
 ToolRunner.run(HBaseDriver.getConfiguration(),
 new HBaseDriver(),
 myArgs5);
 
 // 有版本設置、無列導入設置,有列導出設置
 String[] myArgs6= new String[]{
 "test1",
 "test7",
 "2",
 "-1",
 "cf1:c1,cf1:c10,cf1:c14"
 };
 ToolRunner.run(HBaseDriver.getConfiguration(),
 new HBaseDriver(),
 myArgs6);
 // 有版本設置、有列導入設置,有列導出設置
 String[] myArgs7= new String[]{
 "test1",
 "test8",
 "2",
 "cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14",
 "cf1:c1,cf1:c10,cf1:c14"
 };
 ToolRunner.run(HBaseDriver.getConfiguration(),
 new HBaseDriver(),
 myArgs7);
 }
 
}

driver:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package GeneralHBaseToHBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import util.JarUtil;
 
 
public class HBaseDriver extends Configured implements Tool{
 public static String FROMTABLE=""; //導入表
 public static String TOTABLE=""; //導出表
 public static String SETVERSION=""; //是否設置版本
 // args => {FromTable,ToTable,SetVersion,ColumnFromTable,ColumnToTable}
 @Override
 public int run(String[] args) throws Exception {
 if(args.length!=5){
 System.err.println("Usage:\n demo.job.HBaseDriver <input> <inputTable> "
  + "<output> <outputTable>"
  +"< versions >"
  + " <set columns from inputTable> like <cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14> or <-1> "
  + "<set columns from outputTable> like <cf1:c1,cf1:c10,cf1:c14> or <-1>");
 return -1;
 }
 Configuration conf = getConf();
 FROMTABLE = args[0];
 TOTABLE = args[1];
 SETVERSION = args[2];
 conf.set("SETVERSION", SETVERSION);
 if(!args[3].equals("-1")){
 conf.set("COLUMNFROMTABLE", args[3]);
 }
 if(!args[4].equals("-1")){
 conf.set("COLUMNTOTABLE", args[4]);
 }
 String jobName ="From table "+FROMTABLE+ " ,Import to "+ TOTABLE;
 Job job = Job.getInstance(conf, jobName);
 job.setJarByClass(HBaseDriver.class);
 Scan scan = new Scan();
 // 判斷是否需要設置版本
 if(SETVERSION != "0" || SETVERSION != "1"){
 scan.setMaxVersions(Integer.parseInt(SETVERSION));
 }
 // 設置HBase表輸入:表名、scan、Mapper類、mapper輸出鍵類型、mapper輸出值類型
 TableMapReduceUtil.initTableMapperJob(
 FROMTABLE,
 scan,
 HBaseToHBaseMapper.class,
 ImmutableBytesWritable.class,
 Put.class,
 job);
 // 設置HBase表輸出:表名,reducer類
 TableMapReduceUtil.initTableReducerJob(TOTABLE, null, job);
 // 沒有 reducers, 直接寫入到 輸出文件
  job.setNumReduceTasks(0);
 
  return job.waitForCompletion(true) ? 0 : 1;
  
 }
 private static Configuration configuration;
 public static Configuration getConfiguration(){
 if(configuration==null){
 /**
 * TODO 了解如何直接從Windows提交代碼到Hadoop集群
 *  并修改其中的配置為實際配置
 */
 configuration = new Configuration();
 configuration.setBoolean("mapreduce.app-submission.cross-platform", true);// 配置使用跨平臺提交任務
 configuration.set("fs.defaultFS", "hdfs://master:8020");// 指定namenode
 configuration.set("mapreduce.framework.name", "yarn"); // 指定使用yarn框架
 configuration.set("yarn.resourcemanager.address", "master:8032"); // 指定resourcemanager
 configuration.set("yarn.resourcemanager.scheduler.address", "master:8030");// 指定資源分配器
 configuration.set("mapreduce.jobhistory.address", "master:10020");// 指定historyserver
 configuration.set("hbase.master", "master:16000");
 configuration.set("hbase.rootdir", "hdfs://master:8020/hbase");
 configuration.set("hbase.zookeeper.quorum", "slave1,slave2,slave3");
 configuration.set("hbase.zookeeper.property.clientPort", "2181");
 //TODO 需export->jar file ; 設置正確的jar包所在位置
 configuration.set("mapreduce.job.jar",JarUtil.jar(HBaseDriver.class));// 設置jar包路徑
 }
 
 return configuration;
 }
 
 
}

mapper:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
package GeneralHBaseToHBase;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map.Entry;
import java.util.NavigableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HBaseToHBaseMapper extends TableMapper<ImmutableBytesWritable, Put> {
 Logger log = LoggerFactory.getLogger(HBaseToHBaseMapper.class);
 private static int versionNum = 0;
 private static String[] columnFromTable = null;
 private static String[] columnToTable = null;
 private static String column1 = null;
 private static String column2 = null;
 @Override
 protected void setup(Context context)
 throws IOException, InterruptedException {
 Configuration conf = context.getConfiguration();
 versionNum = Integer.parseInt(conf.get("SETVERSION", "0"));
 column1 = conf.get("COLUMNFROMTABLE",null);
 if(!(column1 == null)){
 columnFromTable = column1.split(",");
 }
 column2 = conf.get("COLUMNTOTABLE",null);
 if(!(column2 == null)){
 columnToTable = column2.split(",");
 }
 }
 @Override
 protected void map(ImmutableBytesWritable key, Result value,
 Context context)
 throws IOException, InterruptedException {
 context.write(key, resultToPut(key,value));
 }
 /***
 * 把key,value轉換為Put
 * @param key
 * @param value
 * @return
 * @throws IOException
 */
 private Put resultToPut(ImmutableBytesWritable key, Result value) throws IOException {
 HashMap<String, String> fTableMap = new HashMap<>();
 HashMap<String, String> tTableMap = new HashMap<>();
 Put put = new Put(key.get());
 if(! (columnFromTable == null || columnFromTable.length == 0)){
 fTableMap = getFamilyAndColumn(columnFromTable);
 }
 if(! (columnToTable == null || columnToTable.length == 0)){
 tTableMap = getFamilyAndColumn(columnToTable);
 }
 if(versionNum==0){     
 if(fTableMap.size() == 0){  
 if(tTableMap.size() == 0){
  for (Cell kv : value.rawCells()) {
  put.add(kv); // 沒有設置版本,沒有設置列導入,沒有設置列導出
  }
  return put;
 } else{
  return getPut(put, value, tTableMap); // 無版本、無列導入、有列導出
 }
 } else {
 if(tTableMap.size() == 0){
  return getPut(put, value, fTableMap);// 無版本、有列導入、無列導出
 } else {
  return getPut(put, value, tTableMap);// 無版本、有列導入、有列導出
 }
 }
 } else{
 if(fTableMap.size() == 0){
 if(tTableMap.size() == 0){
  return getPut1(put, value); // 有版本,無列導入,無列導出
 }else{
  return getPut2(put, value, tTableMap); //有版本,無列導入,有列導出
 }
 }else{
 if(tTableMap.size() == 0){
  return getPut2(put,value,fTableMap);// 有版本,有列導入,無列導出
 }else{
  return getPut2(put,value,tTableMap); // 有版本,有列導入,有列導出
 }
 }
 }
 }
 /***
 * 無版本設置的情況下,對于有列導入或者列導出
 * @param put
 * @param value
 * @param tableMap
 * @return
 * @throws IOException
 */
 
 private Put getPut(Put put,Result value,HashMap<String, String> tableMap) throws IOException{
 for(Cell kv : value.rawCells()){
 byte[] family = kv.getFamily();
 if(tableMap.containsKey(new String(family))){
 String columnStr = tableMap.get(new String(family));
 ArrayList<String> columnBy = toByte(columnStr);
 if(columnBy.contains(new String(kv.getQualifier()))){
  put.add(kv); //沒有設置版本,沒有設置列導入,有設置列導出
 }
 }
 }
 return put;
 }
 /***
 * (有版本,無列導入,有列導出)或者(有版本,有列導入,無列導出)
 * @param put
 * @param value
 * @param tTableMap
 * @return
 */
 private Put getPut2(Put put,Result value,HashMap<String, String> tableMap){
 NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map=value.getMap();
  for(byte[] family:map.keySet()){
   if(tableMap.containsKey(new String(family))){
   String columnStr = tableMap.get(new String(family));
   log.info("@@@@@@@@@@@"+new String(family)+" "+columnStr);
 ArrayList<String> columnBy = toByte(columnStr);
   NavigableMap<byte[], NavigableMap<Long, byte[]>> familyMap = map.get(family);//列簇作為key獲取其中的列相關數據
    for(byte[] column:familyMap.keySet()){        //根據列名循壞
     log.info("!!!!!!!!!!!"+new String(column));
     if(columnBy.contains(new String(column))){
     NavigableMap<Long, byte[]> valuesMap = familyMap.get(column);
      for(Entry<Long, byte[]> s:valuesMap.entrySet()){//獲取列對應的不同版本數據,默認最新的一個
      System.out.println("***:"+new String(family)+" "+new String(column)+" "+s.getKey()+" "+new String(s.getValue()));
      put.addColumn(family, column, s.getKey(),s.getValue());
      }
     }
    }
   }
   
  }
 return put;
 }
 /***
 * 有版本、無列導入、無列導出
 * @param put
 * @param value
 * @return
 */
 private Put getPut1(Put put,Result value){
 NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> map=value.getMap();
  for(byte[] family:map.keySet()){
   NavigableMap<byte[], NavigableMap<Long, byte[]>> familyMap = map.get(family);//列簇作為key獲取其中的列相關數據
   for(byte[] column:familyMap.keySet()){        //根據列名循壞
    NavigableMap<Long, byte[]> valuesMap = familyMap.get(column);
    for(Entry<Long, byte[]> s:valuesMap.entrySet()){    //獲取列對應的不同版本數據,默認最新的一個
     put.addColumn(family, column, s.getKey(),s.getValue());
    }
   }
  }
  return put;
 }
 // str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"}
 /***
 * 得到列簇名與列名的k,v形式的map
 * @param str => {"cf1:c1","cf1:c2","cf1:c10","cf1:c11","cf1:c14"}
 * @return map => {"cf1" => "c1,c2,c10,c11,c14"}
 */
 private static HashMap<String, String> getFamilyAndColumn(String[] str){
 HashMap<String, String> map = new HashMap<>();
 HashSet<String> set = new HashSet<>();
 for(String s : str){
 set.add(s.split(":")[0]);
 }
 Object[] ob = set.toArray();
 for(int i=0; i<ob.length;i++){
 String family = String.valueOf(ob[i]);
 String columns = "";
 for(int j=0;j < str.length;j++){
 if(family.equals(str[j].split(":")[0])){
  columns += str[j].split(":")[1]+",";
 }
 }
 map.put(family, columns.substring(0, columns.length()-1));
 }
 return map;
 }
 
 private static ArrayList<String> toByte(String s){
 ArrayList<String> b = new ArrayList<>();
 String[] sarr = s.split(",");
 for(int i=0;i<sarr.length;i++){
 b.add(sarr[i]);
 }
 return b;
 }
}

程序運行完之后,在hbase shell中查看每個表,看是否數據導入正確:

test2:(無版本、無列導入設置、無列導出設置)

通用MapReduce程序復制HBase表數據

test3 (無版本、有列導入設置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、無列導出設置)

通用MapReduce程序復制HBase表數據

test4(無版本、無列導入設置、有列導出設置("cf1:c1,cf1:c10,cf1:c14"))

通用MapReduce程序復制HBase表數據

test5(有版本、無列導入設置、無列導出設置)

通用MapReduce程序復制HBase表數據

test6(有版本、有列導入設置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、無列導出設置)

通用MapReduce程序復制HBase表數據

test7(有版本、無列導入設置、有列導出設置("cf1:c1,cf1:c10,cf1:c14"))

通用MapReduce程序復制HBase表數據

test8(有版本、有列導入設置("cf1:c1,cf1:c2,cf1:c10,cf1:c11,cf1:c14")、有列導出設置("cf1:c1,cf1:c10,cf1:c14"))

通用MapReduce程序復制HBase表數據

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持服務器之家。

原文鏈接:https://blog.csdn.net/Angelababy_huan/article/details/53236693

延伸 · 閱讀

精彩推薦
  • MongoDB分布式文檔存儲數據庫之MongoDB分片集群的問題

    分布式文檔存儲數據庫之MongoDB分片集群的問題

    這篇文章主要介紹了分布式文檔存儲數據庫之MongoDB分片集群的問題,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋...

    Linux-18743072020-12-20
  • MongoDBMongoDB安裝圖文教程

    MongoDB安裝圖文教程

    這篇文章主要為大家詳細介紹了MongoDB安裝圖文教程,分為兩大部分為大家介紹下載MongoDB和安裝MongoDB的方法,感興趣的小伙伴們可以參考一下 ...

    Yangyi.He6132020-05-07
  • MongoDBMongoDB憑什么躋身數據庫排行前五

    MongoDB憑什么躋身數據庫排行前五

    MongoDB以比去年同期超出65.96分的成績繼續雄踞榜單前五,這個增幅在全榜僅次于PostgreSQL的77.99,而其相對于4月份的6.10分的增長也是僅次于微軟SQL Server排名...

    孫浩峰3892020-05-22
  • MongoDBMongoDB 內存使用情況分析

    MongoDB 內存使用情況分析

    都說 MongoDB 是個內存大戶,但是怎么知道它到底用了多少內存呢...

    MongoDB教程網10002020-09-29
  • MongoDBMongoDB中javascript腳本編程簡介和入門實例

    MongoDB中javascript腳本編程簡介和入門實例

    作為一個數據庫,MongoDB有一個很大的優勢——它使用js管理數據庫,所以也能夠使用js腳本進行復雜的管理——這種方法非常靈活 ...

    MongoDB教程網6982020-04-24
  • MongoDBMongodb實現定時備份與恢復的方法教程

    Mongodb實現定時備份與恢復的方法教程

    這篇文章主要給大家介紹了Mongodb實現定時備份與恢復的方法教程,文中通過示例代碼介紹的非常詳細,對大家具有一定的參考學習價值,需要的朋友們下面...

    chenjsh364522020-05-13
  • MongoDBmongodb基本命令實例小結

    mongodb基本命令實例小結

    這篇文章主要介紹了mongodb基本命令,結合實例形式總結分析了MongoDB數據庫切換、查看、刪除、查詢等基本命令用法與操作注意事項,需要的朋友可以參考下...

    dawn-liu3652020-05-26
  • MongoDB遷移sqlserver數據到MongoDb的方法

    遷移sqlserver數據到MongoDb的方法

    這篇文章主要介紹了遷移sqlserver數據到MongoDb的方法,本文給大家介紹的非常詳細,對大家的學習或工作具有一定的參考借鑒價值,需要的朋友可以參考下...

    聽楓xl9682021-01-03
547
主站蜘蛛池模板: 久久精品一区二区三区不卡牛牛 | 91成人午夜性a一级毛片 | 中文字幕在线永久视频 | 亚洲视频成人 | 99爱在线免费观看 | 欧美日韩在线中文字幕 | 激情综合婷婷久久 | 九九视频精品在线观看 | av中文一区 | 欧美日韩大片在线观看 | 国产91免费看 | 亚洲综合无码一区二区 | 国产中出在线观看 | 午夜视频在线观看免费视频 | 久久99精品久久久久久园产越南 | 亚洲午夜天堂吃瓜在线 | sesee99| 日本黄视频在线观看 | 黄色免费不卡视频 | 天天看天天摸天天操 | 蜜桃视频在线观看视频 | 精品一区二区在线播放 | 国产亚洲精久久久久久蜜臀 | 九色成人在线 | 在线播放免费人成毛片乱码 | 欧美成人高清视频 | 黄网站在线播放视频免费观看 | 欧美视频国产精品 | 久久91亚洲人成电影网站 | 精品国产高清一区二区三区 | 久色乳综合思思在线视频 | 亚洲人成在线播放网站 | 免费国产a | 99久久精品日本一区二区免费 | 精品国产一级毛片 | 亚洲5区 | 国产一级毛片国产 | 成人国产精品一区二区毛片在线 | 国产高清自拍一区 | 国产一级毛片高清视频 | 日韩精品羞羞答答 |