項目需求
近日需要實現用戶推薦相關的功能,也就是說向用戶推薦他可能喜歡的東西。
我們的數據分析工程師會將用戶以及用戶可能喜歡的東西整理成文檔給我,我只需要將數據從文檔中讀取出來,然后對數據進行進一步的清洗(例如去掉特殊符號,長度如果太長則截取)。然后將處理后的數據存入數據庫(Mysql)。
所以分為三步:
- 讀取文檔獲得數據
- 對獲得的數據進行處理
- 更新數據庫(新增或更新)
考慮到這個數據量以后會越來越大,這里沒有使用 poi 來讀取數據,而直接使用了 SpringBatch。
實現步驟
本文假設讀者已經能夠使用 SpringBoot 連接處理 Mysql,所以這部分文中會省略。
1、創建 Maven 項目,并在 pom.xml 中添加依賴
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version> 1.5 . 2 .RELEASE</version> </parent> <properties> <java.version> 1.8 </java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version> 1.2 . 0 </version> </dependency> <!-- 工具類依賴--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version> 1.12 . 6 </version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version> 3.4 </version> </dependency> <!-- 數據庫相關依賴 --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version> 1.0 . 26 </version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies> |
這里是這個小項目中用到的所有依賴,包括連接數據庫的依賴以及工具類等。
2、編寫 Model 類
我們要從文檔中讀取的有效列就是 uid,tag,type,就是用戶 ID,用戶可能包含的標簽(用于推送),用戶類別(用戶用戶之間互相推薦)。
UserMap.java 中的 @Entity,@Column 注解,是為了利用 JPA 生成數據表而寫的,可要可不要。
UserMap.java
1
2
3
4
5
6
7
8
9
10
11
12
13
|
@Data @EqualsAndHashCode @NoArgsConstructor @AllArgsConstructor //@Entity(name = "user_map") public class UserMap extends BaseModel { @Column (name = "uid" , unique = true , nullable = false ) private Long uid; @Column (name = "tag" ) private String tag; @Column (name = "type" ) private Integer type; } |
3、實現批處理配置類
BatchConfiguration.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
|
@Configuration @EnableBatchProcessing public class BatchConfiguration { @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; @Autowired @Qualifier ( "prodDataSource" ) DataSource prodDataSource; @Bean public FlatFileItemReader<UserMap> reader() { FlatFileItemReader<UserMap> reader = new FlatFileItemReader<>(); reader.setLineMapper( new DefaultLineMapper<UserMap>() {{ setLineTokenizer( new DelimitedLineTokenizer( "|" ) {{ setNames( new String[]{ "uid" , "tag" , "type" }); }}); setFieldSetMapper( new BeanWrapperFieldSetMapper<UserMap>() {{ setTargetType(UserMap. class ); }}); }}); return reader; } @Bean public JdbcBatchItemWriter<UserMap> importWriter() { JdbcBatchItemWriter<UserMap> writer = new JdbcBatchItemWriter<>(); writer.setItemSqlParameterSourceProvider( new BeanPropertyItemSqlParameterSourceProvider<>()); writer.setSql( "INSERT INTO user_map (uid,tag,type) VALUES (:uid, :tag,:type)" ); writer.setDataSource(prodDataSource); return writer; } @Bean public JdbcBatchItemWriter<UserMap> updateWriter() { JdbcBatchItemWriter<UserMap> writer = new JdbcBatchItemWriter<>(); writer.setItemSqlParameterSourceProvider( new BeanPropertyItemSqlParameterSourceProvider<>()); writer.setSql( "UPDATE user_map SET type = (:type),tag = (:tag) WHERE uid = (:uid)" ); writer.setDataSource(prodDataSource); return writer; } @Bean public UserMapItemProcessor processor(UserMapItemProcessor.ProcessStatus processStatus) { return new UserMapItemProcessor(processStatus); } @Bean public Job importUserJob(JobCompletionNotificationListener listener) { return jobBuilderFactory.get( "importUserJob" ) .incrementer( new RunIdIncrementer()) .listener(listener) .flow(importStep()) .end() .build(); } @Bean public Step importStep() { return stepBuilderFactory.get( "importStep" ) .<UserMap, UserMap>chunk( 100 ) .reader(reader()) .processor(processor(IMPORT)) .writer(importWriter()) .build(); } @Bean public Job updateUserJob(JobCompletionNotificationListener listener) { return jobBuilderFactory.get( "updateUserJob" ) .incrementer( new RunIdIncrementer()) .listener(listener) .flow(updateStep()) .end() .build(); } @Bean public Step updateStep() { return stepBuilderFactory.get( "updateStep" ) .<UserMap, UserMap>chunk( 100 ) .reader(reader()) .processor(processor(UPDATE)) .writer(updateWriter()) .build(); } } |
prodDataSource 是假設用戶已經設置好的,如果不知道怎么配置,也可以參考之前的文章進行配置:Springboot 集成 Mybatis。
reader(),這方法從文件中讀取數據,并且設置了一些必要的參數。緊接著是寫操作 importWriter()
和 updateWriter()
,讀者看其中一個就好,因為我這里是需要更新或者修改的,所以分為兩個。
processor(ProcessStatus status)
,該方法是對我們處理數據的類進行實例化,這里我根據 status 是 IMPORT 還是 UPDATE 來獲取不同的處理結果。
其他的看代碼就可以看懂了,哈哈,不詳細說了。
4、將獲得的數據進行清洗
UserMapItemProcessor.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
public class UserMapItemProcessor implements ItemProcessor<UserMap, UserMap> { private static final int MAX_TAG_LENGTH = 200 ; private ProcessStatus processStatus; public UserMapItemProcessor(ProcessStatus processStatus) { this .processStatus = processStatus; } @Autowired IUserMapService userMapService; private static final String TAG_PATTERN_STR = "^[a-zA-Z0-9\\u4E00-\\u9FA5_-]+$" ; public static final Pattern TAG_PATTERN = Pattern.compile(TAG_PATTERN_STR); private static final Logger LOG = LoggerFactory.getLogger(UserMapItemProcessor. class ); @Override public UserMap process(UserMap userMap) throws Exception { Long uid = userMap.getUid(); String tag = cleanTag(userMap.getTag()); Integer label = userMap.getType() == null ? Integer.valueOf( 0 ) : userMap.getType(); if (StringUtils.isNotBlank(tag)) { Map<String, Object> params = new HashMap<>(); params.put( "uid" , uid); UserMap userMapFromDB = userMapService.selectOne(params); if (userMapFromDB == null ) { if ( this .processStatus == ProcessStatus.IMPORT) { return new UserMap(uid, tag, label); } } else { if ( this .processStatus == ProcessStatus.UPDATE) { if (!tag.equals(userMapFromDB.getTag()) && !label.equals(userMapFromDB.getType())) { userMapFromDB.setType(label); userMapFromDB.setTag(tag); return userMapFromDB; } } } } return null ; } /** * 清洗標簽 * * @param tag * @return */ private static String cleanTag(String tag) { if (StringUtils.isNotBlank(tag)) { try { tag = tag.substring(tag.indexOf( "{" ) + 1 , tag.lastIndexOf( "}" )); String[] tagArray = tag.split( "," ); Optional<String> reduce = Arrays.stream(tagArray).parallel() .map(str -> str.split( ":" )[ 0 ]) .map(str -> str.replaceAll( "\'" , "" )) .map(str -> str.replaceAll( " " , "" )) .filter(str -> TAG_PATTERN.matcher(str).matches()) .reduce((x, y) -> x + "," + y); Function<String, String> str = (s -> s.length() > MAX_TAG_LENGTH ? s.substring( 0 , MAX_TAG_LENGTH) : s); return str.apply(reduce.get()); } catch (Exception e) { LOG.error(e.getMessage(), e); } } return null ; } protected enum ProcessStatus { IMPORT, UPDATE; } public static void main(String[] args) { String distinctTag = cleanTag( "Counter({'《重新定義》': 3, '輕想上的輕小說': 3, '小說': 2, 'Fate': 2, '同人小說': 2, '雪狼八組': 1, " + "'社會': 1, '人文': 1, '短篇': 1, '重新定義': 1, 'AMV': 1, '《FBD》': 1, '《雪狼六組》': 1, '戰爭': 1, '《灰羽聯盟》': 1, " + "'誰說輕想沒人寫小說': 1})" ); System.out.println(distinctTag); } } |
讀取到的數據格式如 main()
方法所示,清理之后的結果如:
輕想上的輕小說,小說,Fate,同人小說,雪狼八組,社會,人文,短篇,重新定義,AMV,戰爭,誰說輕想沒人寫小說 。
去掉了特殊符號以及數字等。使用了 Java8 的 Lambda 表達式。
并且這里在處理的時候,判斷如果該數據用戶已經存在,則進行更新,如果不存在,則新增。
5、Job 執行結束回調類
JobCompletionNotificationListener.java
1
2
3
4
5
6
7
8
9
10
11
12
13
|
@Component public class JobCompletionNotificationListener extends JobExecutionListenerSupport { private static final Logger log = LoggerFactory.getLogger(JobCompletionNotificationListener. class ); private final JdbcTemplate jdbcTemplate; @Autowired public JobCompletionNotificationListener(JdbcTemplate jdbcTemplate) { this .jdbcTemplate = jdbcTemplate; } @Override public void afterJob(JobExecution jobExecution) { System.out.println( "end ....." ); } } |
具體的邏輯可自行實現。
完成以上幾個步驟,運行項目,就可以讀取并寫入數據到數據庫了。
總結
以上就是這篇文章的全部內容了,希望本文的內容對大家學習或者工作能帶來一定的幫助,如果有疑問大家可以留言交流,謝謝大家對服務器之家的支持。
原文鏈接:https://huanxi.pub/2017/04/27/SpringBatch讀取txt文件并寫入數據庫/