激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導(dǎo)航

PHP教程|ASP.NET教程|JAVA教程|ASP教程|編程技術(shù)|正則表達式|C/C++|IOS|C#|Swift|Android|JavaScript|易語言|

服務(wù)器之家 - 編程語言 - JAVA教程 - Spring5源碼解析之Spring中的異步和計劃任務(wù)

Spring5源碼解析之Spring中的異步和計劃任務(wù)

2021-01-20 14:11一葉知秋 JAVA教程

本篇文章主要介紹了Spring5源碼解析之Spring中的異步和計劃任務(wù),具有一定的參考價值,感興趣的小伙伴們可以參考一下。

Java提供了許多創(chuàng)建線程池的方式,并得到一個Future實例來作為任務(wù)結(jié)果。對于Spring同樣小菜一碟,通過其scheduling包就可以做到將任務(wù)線程中后臺執(zhí)行。

在本文的第一部分中,我們將討論下Spring中執(zhí)行計劃任務(wù)的一些基礎(chǔ)知識。之后,我們將解釋這些類是如何一起協(xié)作來啟動并執(zhí)行計劃任務(wù)的。下一部分將介紹計劃和異步任務(wù)的配置。最后,我們來寫個Demo,看看如何通過單元測試來編排計劃任務(wù)。

什么是Spring中的異步任務(wù)?

在我們正式的進入話題之前,對于Spring,我們需要理解下它實現(xiàn)的兩個不同的概念:異步任務(wù)和調(diào)度任務(wù)。顯然,兩者有一個很大的共同點:都在后臺工作。但是,它們之間存在了很大差異。調(diào)度任務(wù)與異步不同,其作用與Linux中的CRON job完全相同(windows里面也有計劃任務(wù))。舉個栗子,有一個任務(wù)必須每40分鐘執(zhí)行一次,那么,可以通過XML文件或者注解來進行此配置。簡單的異步任務(wù)在后臺執(zhí)行就好,無需配置執(zhí)行頻率。

因為它們是兩種不同的任務(wù)類型,它們兩個的執(zhí)行者自然也就不同。第一個看起來有點像Java的并發(fā)執(zhí)行器(concurrency executor),這里會有專門去寫一篇關(guān)于Java中的執(zhí)行器來具體了解。根據(jù)Spring文檔TaskExecutor所述,它提供了基于Spring的抽象來處理線程池,這點,也可以通過其類的注釋去了解。另一個抽象接口是TaskScheduler,它用于在將來給定的時間點來安排任務(wù),并執(zhí)行一次或定期執(zhí)行。

在分析源碼的過程中,發(fā)現(xiàn)另一個比較有趣的點是觸發(fā)器。它存在兩種類型:CronTrigger或PeriodTrigger。第一個模擬CRON任務(wù)的行為。所以我們可以在將來確切時間點提交一個任務(wù)的執(zhí)行。另一個觸發(fā)器可用于定期執(zhí)行任務(wù)。

Spring的異步任務(wù)類

讓我們從org.springframework.core.task.TaskExecutor類的分析開始。你會發(fā)現(xiàn),其簡單的不行,它是一個擴展Java的Executor接口的接口。它的唯一方法也就是是執(zhí)行,在參數(shù)中使用Runnable類型的任務(wù)。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package org.springframework.core.task;
import java.util.concurrent.Executor;
/**
 * Simple task executor interface that abstracts the execution
 * of a {@link Runnable}.
 *
 * <p>Implementations can use all sorts of different execution strategies,
 * such as: synchronous, asynchronous, using a thread pool, and more.
 *
 * <p>Equivalent to JDK 1.5's {@link java.util.concurrent.Executor}
 * interface; extending it now in Spring 3.0, so that clients may declare
 * a dependency on an Executor and receive any TaskExecutor implementation.
 * This interface remains separate from the standard Executor interface
 * mainly for backwards compatibility with JDK 1.4 in Spring 2.x.
 *
 * @author Juergen Hoeller
 * @since 2.0
 * @see java.util.concurrent.Executor
 */
@FunctionalInterface
public interface TaskExecutor extends Executor {
 /**
 * Execute the given {@code task}.
 * <p>The call might return immediately if the implementation uses
 * an asynchronous execution strategy, or might block in the case
 * of synchronous execution.
 * @param task the {@code Runnable} to execute (never {@code null})
 * @throws TaskRejectedException if the given task was not accepted
 */
 @Override
 void execute(Runnable task);
}

相對來說,org.springframework.scheduling.TaskScheduler接口就有點復(fù)雜了。它定義了一組以schedule開頭的名稱的方法允許我們定義將來要執(zhí)行的任務(wù)。所有 schedule* 方法返回java.util.concurrent.ScheduledFuture實例。Spring5中對scheduleAtFixedRate方法做了進一步的充實,其實最終調(diào)用的還是ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period);

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
public interface TaskScheduler {
 /**
 * Schedule the given {@link Runnable}, invoking it whenever the trigger
 * indicates a next execution time.
 * <p>Execution will end once the scheduler shuts down or the returned
 * {@link ScheduledFuture} gets cancelled.
 * @param task the Runnable to execute whenever the trigger fires
 * @param trigger an implementation of the {@link Trigger} interface,
 * e.g. a {@link org.springframework.scheduling.support.CronTrigger} object
 * wrapping a cron expression
 * @return a {@link ScheduledFuture} representing pending completion of the task,
 * or {@code null} if the given Trigger object never fires (i.e. returns
 * {@code null} from {@link Trigger#nextExecutionTime})
 * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
 * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
 * @see org.springframework.scheduling.support.CronTrigger
 */
 @Nullable
 ScheduledFuture<?> schedule(Runnable task, Trigger trigger);
 
 /**
 * Schedule the given {@link Runnable}, invoking it at the specified execution time.
 * <p>Execution will end once the scheduler shuts down or the returned
 * {@link ScheduledFuture} gets cancelled.
 * @param task the Runnable to execute whenever the trigger fires
 * @param startTime the desired execution time for the task
 * (if this is in the past, the task will be executed immediately, i.e. as soon as possible)
 * @return a {@link ScheduledFuture} representing pending completion of the task
 * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
 * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
 * 使用了默認實現(xiàn),值得我們學(xué)習(xí)使用的,Java9中同樣可以有私有實現(xiàn)的,從這里我們可以做到我只通過  * 一個接口你來實現(xiàn),我把其他相應(yīng)的功能默認實現(xiàn)下,最后調(diào)用你自定義實現(xiàn)的接口即可,使接口功能更  * 加一目了然
 * @since 5.0
 * @see #schedule(Runnable, Date)
 */
 default ScheduledFuture<?> schedule(Runnable task, Instant startTime) {
 return schedule(task, Date.from(startTime));
 }
 /**
 * Schedule the given {@link Runnable}, invoking it at the specified execution time.
 * <p>Execution will end once the scheduler shuts down or the returned
 * {@link ScheduledFuture} gets cancelled.
 * @param task the Runnable to execute whenever the trigger fires
 * @param startTime the desired execution time for the task
 * (if this is in the past, the task will be executed immediately, i.e. as soon as possible)
 * @return a {@link ScheduledFuture} representing pending completion of the task
 * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
 * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
 */
 ScheduledFuture<?> schedule(Runnable task, Date startTime);
...
/**
 * Schedule the given {@link Runnable}, invoking it at the specified execution time
 * and subsequently with the given period.
 * <p>Execution will end once the scheduler shuts down or the returned
 * {@link ScheduledFuture} gets cancelled.
 * @param task the Runnable to execute whenever the trigger fires
 * @param startTime the desired first execution time for the task
 * (if this is in the past, the task will be executed immediately, i.e. as soon as possible)
 * @param period the interval between successive executions of the task
 * @return a {@link ScheduledFuture} representing pending completion of the task
 * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
 * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
 * @since 5.0
 * @see #scheduleAtFixedRate(Runnable, Date, long)
 */
 default ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Instant startTime, Duration period) {
 return scheduleAtFixedRate(task, Date.from(startTime), period.toMillis());
 }
 /**
 * Schedule the given {@link Runnable}, invoking it at the specified execution time
 * and subsequently with the given period.
 * <p>Execution will end once the scheduler shuts down or the returned
 * {@link ScheduledFuture} gets cancelled.
 * @param task the Runnable to execute whenever the trigger fires
 * @param startTime the desired first execution time for the task
 * (if this is in the past, the task will be executed immediately, i.e. as soon as possible)
 * @param period the interval between successive executions of the task (in milliseconds)
 * @return a {@link ScheduledFuture} representing pending completion of the task
 * @throws org.springframework.core.task.TaskRejectedException if the given task was not accepted
 * for internal reasons (e.g. a pool overload handling policy or a pool shutdown in progress)
 */
 ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period);
...
}

之前提到兩個觸發(fā)器組件,都實現(xiàn)了org.springframework.scheduling.Trigger接口。這里,我們只需關(guān)注一個的方法nextExecutionTime ,其定義下一個觸發(fā)任務(wù)的執(zhí)行時間。它的兩個實現(xiàn),CronTrigger和PeriodicTrigger,由org.springframework.scheduling.TriggerContext來實現(xiàn)信息的存儲,由此,我們可以很輕松獲得一個任務(wù)的最后一個執(zhí)行時間(lastScheduledExecutionTime),給定任務(wù)的最后完成時間(lastCompletionTime)或最后一個實際執(zhí)行時間(lastActualExecutionTime)。接下來,我們通過閱讀源代碼來簡單的了解下這些東西。org.springframework.scheduling.concurrent.ConcurrentTaskScheduler包含一個私有類EnterpriseConcurrentTriggerScheduler。在這個class里面,我們可以找到schedule方法:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public ScheduledFuture<?> schedule(Runnable task, final Trigger trigger) {
 ManagedScheduledExecutorService executor = (ManagedScheduledExecutorService) scheduledExecutor;
 return executor.schedule(task, new javax.enterprise.concurrent.Trigger() {
  @Override
  public Date getNextRunTime(LastExecution le, Date taskScheduledTime) {
   return trigger.nextExecutionTime(le != null ?
    new SimpleTriggerContext(le.getScheduledStart(), le.getRunStart(), le.getRunEnd()) :
    new SimpleTriggerContext());
  }
  @Override
  public boolean skipRun(LastExecution lastExecution, Date scheduledRunTime) {
   return false;
  }
 });
}

SimpleTriggerContext從其名字就可以看到很多東西了,因為它實現(xiàn)了TriggerContext接口。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
 * Simple data holder implementation of the {@link TriggerContext} interface.
 *
 * @author Juergen Hoeller
 * @since 3.0
 */
public class SimpleTriggerContext implements TriggerContext {
 @Nullable
 private volatile Date lastScheduledExecutionTime;
 @Nullable
 private volatile Date lastActualExecutionTime;
 @Nullable
 private volatile Date lastCompletionTime;
...
 /**
 * Create a SimpleTriggerContext with the given time values.
 * @param lastScheduledExecutionTime last <i>scheduled</i> execution time
 * @param lastActualExecutionTime last <i>actual</i> execution time
 * @param lastCompletionTime last completion time
 */
 public SimpleTriggerContext(Date lastScheduledExecutionTime, Date lastActualExecutionTime, Date lastCompletionTime) {
 this.lastScheduledExecutionTime = lastScheduledExecutionTime;
 this.lastActualExecutionTime = lastActualExecutionTime;
 this.lastCompletionTime = lastCompletionTime;
 }
 ...
}

也正如你看到的,在構(gòu)造函數(shù)中設(shè)置的時間值來自javax.enterprise.concurrent.LastExecution的實現(xiàn),其中:

  • getScheduledStart:返回上次開始執(zhí)行任務(wù)的時間。它對應(yīng)于TriggerContext的lastScheduledExecutionTime屬性。
  • getRunStart:返回給定任務(wù)開始運行的時間。在TriggerContext中,它對應(yīng)于lastActualExecutionTime。
  • getRunEnd:任務(wù)終止時返回。它用于在TriggerContext中設(shè)置lastCompletionTime。

Spring調(diào)度和異步執(zhí)行中的另一個重要類是org.springframework.core.task.support.TaskExecutorAdapter。它是一個將java.util.concurrent.Executor作為Spring基本的執(zhí)行器的適配器(描述的有點拗口,看下面代碼就明了了),之前已經(jīng)描述了TaskExecutor。實際上,它引用了Java的ExecutorService,它也是繼承了Executor接口。此引用用于完成所有提交的任務(wù)。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
 * Adapter that takes a JDK {@code java.util.concurrent.Executor} and
 * exposes a Spring {@link org.springframework.core.task.TaskExecutor} for it.
 * Also detects an extended {@code java.util.concurrent.ExecutorService 從此解釋上面的說明}, adapting
 * the {@link org.springframework.core.task.AsyncTaskExecutor} interface accordingly.
 *
 * @author Juergen Hoeller
 * @since 3.0
 * @see java.util.concurrent.Executor
 * @see java.util.concurrent.ExecutorService
 * @see java.util.concurrent.Executors
 */
public class TaskExecutorAdapter implements AsyncListenableTaskExecutor {
 private final Executor concurrentExecutor;
 @Nullable
 private TaskDecorator taskDecorator;
 ...
  /**
 * Create a new TaskExecutorAdapter,
 * using the given JDK concurrent executor.
 * @param concurrentExecutor the JDK concurrent executor to delegate to
 */
 public TaskExecutorAdapter(Executor concurrentExecutor) {
 Assert.notNull(concurrentExecutor, "Executor must not be null");
 this.concurrentExecutor = concurrentExecutor;
 }
  /**
 * Delegates to the specified JDK concurrent executor.
 * @see java.util.concurrent.Executor#execute(Runnable)
 */
 @Override
 public void execute(Runnable task) {
 try {
  doExecute(this.concurrentExecutor, this.taskDecorator, task);
 }
 catch (RejectedExecutionException ex) {
  throw new TaskRejectedException(
   "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex);
 }
 }
 @Override
 public void execute(Runnable task, long startTimeout) {
 execute(task);
 }
 @Override
 public Future<?> submit(Runnable task) {
 try {
  if (this.taskDecorator == null && this.concurrentExecutor instanceof ExecutorService) {
  return ((ExecutorService) this.concurrentExecutor).submit(task);
  }
  else {
  FutureTask<Object> future = new FutureTask<>(task, null);
  doExecute(this.concurrentExecutor, this.taskDecorator, future);
  return future;
  }
 }
 catch (RejectedExecutionException ex) {
  throw new TaskRejectedException(
   "Executor [" + this.concurrentExecutor + "] did not accept task: " + task, ex);
 }
 }
 ...
}

在Spring中配置異步和計劃任務(wù)

下面我們通過代碼的方式來實現(xiàn)異步任務(wù)。首先,我們需要通過注解來啟用配置。它的XML配置如下:

?
1
2
3
4
<task:scheduler id="taskScheduler"/>
<task:executor id="taskExecutor" pool-size="2" />
<task:annotation-driven executor="taskExecutor" scheduler="taskScheduler"/>
<context:component-scan base-package="com.migo.async"/>

可以通過將@EnableScheduling和@EnableAsync注解添加到配置類(用@Configuration注解)來激活兩者。完事,我們就可以開始著手實現(xiàn)調(diào)度和異步任務(wù)。為了實現(xiàn)調(diào)度任務(wù),我們可以使用@Scheduled注解。我們可以從org.springframework.scheduling.annotation包中找到這個注解。它包含了以下幾個屬性:

  • cron:使用CRON風(fēng)格(Linux配置定時任務(wù)的風(fēng)格)的配置來配置需要啟動的帶注解的任務(wù)。
  • zone:要解析CRON表達式的時區(qū)。
  • fixedDelay或fixedDelayString:在固定延遲時間后執(zhí)行任務(wù)。即任務(wù)將在最后一次調(diào)用結(jié)束和下一次調(diào)用的開始之間的這個固定時間段后執(zhí)行。
  • fixedRate或fixedRateString:使用fixedRate注解的方法的調(diào)用將以固定的時間段(例如:每10秒鐘)進行,與執(zhí)行生命周期(開始,結(jié)束)無關(guān)。
  • initialDelay或initialDelayString:延遲首次執(zhí)行調(diào)度方法的時間。請注意,所有值(fixedDelay ,fixedRate ,initialDelay )必須以毫秒表示。 需要特別記住的是 ,用@Scheduled注解的方法不能接受任何參數(shù),并且不返回任何內(nèi)容(void),如果有返回值,返回值也會被忽略掉的,沒什么卵用。定時任務(wù)方法由容器管理,而不是由調(diào)用者在運行時調(diào)用。它們由 org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor來解析,其中包含以下方法來拒絕執(zhí)行所有不正確定義的函數(shù):
?
1
2
3
4
5
6
7
8
9
10
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
 try {
  Assert.isTrue(method.getParameterCount() == 0,
   "Only no-arg methods may be annotated with @Scheduled");
 /**
 *  之前的版本中直接把返回值非空的給拒掉了,在Spring 4.3 Spring5 的版本中就沒那么嚴格了
   * Assert.isTrue(void.class.equals(method.getReturnType()),
   *        "Only void-returning methods may be annotated with @Scheduled");
   **/       
// ...
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * 注釋很重要
 * An annotation that marks a method to be scheduled. Exactly one of
 * the {@link #cron()}, {@link #fixedDelay()}, or {@link #fixedRate()}
 * attributes must be specified.
 *
 * <p>The annotated method must expect no arguments. It will typically have
 * a {@code void} return type; if not, the returned value will be ignored
 * when called through the scheduler.
 *
 * <p>Processing of {@code @Scheduled} annotations is performed by
 * registering a {@link ScheduledAnnotationBeanPostProcessor}. This can be
 * done manually or, more conveniently, through the {@code <task:annotation-driven/>}
 * element or @{@link EnableScheduling} annotation.
 *
 * <p>This annotation may be used as a <em>meta-annotation</em> to create custom
 * <em>composed annotations</em> with attribute overrides.
 *
 * @author Mark Fisher
 * @author Dave Syer
 * @author Chris Beams
 * @since 3.0
 * @see EnableScheduling
 * @see ScheduledAnnotationBeanPostProcessor
 * @see Schedules
 */
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(Schedules.class)
public @interface Scheduled {
...
}

使用@Async注解標(biāo)記一個方法或一個類(通過標(biāo)記一個類,我們自動將其所有方法標(biāo)記為異步)。與@Scheduled不同,異步任務(wù)可以接受參數(shù),并可能返回某些東西。

寫一個在Spring中執(zhí)行異步任務(wù)的Demo

有了上面這些知識,我們可以來編寫異步和計劃任務(wù)。我們將通過測試用例來展示。我們從不同的任務(wù)執(zhí)行器(task executors)的測試開始:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:applicationContext-test.xml"})
@WebAppConfiguration
public class TaskExecutorsTest {
 
 @Test
 public void simpeAsync() throws InterruptedException {
  /**
   * SimpleAsyncTaskExecutor creates new Thread for every task and executes it asynchronously. The threads aren't reused as in
   * native Java's thread pools.
   *
   * The number of concurrently executed threads can be specified through concurrencyLimit bean property
   * (concurrencyLimit XML attribute). Here it's more simple to invoke setConcurrencyLimit method.
   * Here the tasks will be executed by 2 simultaneous threads. Without specifying this value,
   * the number of executed threads will be indefinite.
   *
   * You can observe that only 2 tasks are executed at a given time - even if 3 are submitted to execution (lines 40-42).
   **/
  SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("thread_name_prefix_____");
  executor.setConcurrencyLimit(2);
  executor.execute(new SimpleTask("SimpleAsyncTask-1", Counters.simpleAsyncTask, 1000));
  executor.execute(new SimpleTask("SimpleAsyncTask-2", Counters.simpleAsyncTask, 1000));
 
  Thread.sleep(1050);
  assertTrue("2 threads should be terminated, but "+Counters.simpleAsyncTask.getNb()+" were instead", Counters.simpleAsyncTask.getNb() == 2);
 
  executor.execute(new SimpleTask("SimpleAsyncTask-3", Counters.simpleAsyncTask, 1000));
  executor.execute(new SimpleTask("SimpleAsyncTask-4", Counters.simpleAsyncTask, 1000));
  executor.execute(new SimpleTask("SimpleAsyncTask-5", Counters.simpleAsyncTask, 2000));
   
  Thread.sleep(1050);
  assertTrue("4 threads should be terminated, but "+Counters.simpleAsyncTask.getNb()+" were instead", Counters.simpleAsyncTask.getNb() == 4);
  executor.execute(new SimpleTask("SimpleAsyncTask-6", Counters.simpleAsyncTask, 1000));
 
  Thread.sleep(1050);
  assertTrue("6 threads should be terminated, but "+Counters.simpleAsyncTask.getNb()+" were instead",
   Counters.simpleAsyncTask.getNb() == 6);
 }
  
 @Test
 public void syncTaskTest() {
  /**
   * SyncTask works almost as Java's CountDownLatch. In fact, this executor is synchronous with the calling thread. In our case,
   * SyncTaskExecutor tasks will be synchronous with JUnit thread. It means that the testing thread will sleep 5
   * seconds after executing the third task ('SyncTask-3'). To prove that, we check if the total execution time is ~5 seconds.
   **/
  long start = System.currentTimeMillis();
  SyncTaskExecutor executor = new SyncTaskExecutor();
  executor.execute(new SimpleTask("SyncTask-1", Counters.syncTask, 0));
  executor.execute(new SimpleTask("SyncTask-2", Counters.syncTask, 0));
  executor.execute(new SimpleTask("SyncTask-3", Counters.syncTask, 0));
  executor.execute(new SimpleTask("SyncTask-4", Counters.syncTask, 5000));
  executor.execute(new SimpleTask("SyncTask-5", Counters.syncTask, 0));
  long end = System.currentTimeMillis();
  int execTime = Math.round((end-start)/1000);
  assertTrue("Execution time should be 5 seconds but was "+execTime+" seconds", execTime == 5);
 }
  
 @Test
 public void threadPoolTest() throws InterruptedException {
  /**
   * This executor can be used to expose Java's native ThreadPoolExecutor as Spring bean, with the
   * possibility to set core pool size, max pool size and queue capacity through bean properties.
   *
   * It works exactly as ThreadPoolExecutor from java.util.concurrent package. It means that our pool starts
   * with 2 threads (core pool size) and can be growth until 3 (max pool size).
   * In additionally, 1 task can be stored in the queue. This task will be treated
   * as soon as one from 3 threads ends to execute provided task. In our case, we try to execute 5 tasks
   * in 3 places pool and 1 place queue. So the 5th task should be rejected and TaskRejectedException should be thrown.
   **/
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  executor.setCorePoolSize(2);
  executor.setMaxPoolSize(3);
  executor.setQueueCapacity(1);
  executor.initialize();
 
  executor.execute(new SimpleTask("ThreadPoolTask-1", Counters.threadPool, 1000));
  executor.execute(new SimpleTask("ThreadPoolTask-2", Counters.threadPool, 1000));
  executor.execute(new SimpleTask("ThreadPoolTask-3", Counters.threadPool, 1000));
  executor.execute(new SimpleTask("ThreadPoolTask-4", Counters.threadPool, 1000));
  boolean wasTre = false;
  try {
   executor.execute(new SimpleTask("ThreadPoolTask-5", Counters.threadPool, 1000));
  } catch (TaskRejectedException tre) {
   wasTre = true;
  }
  assertTrue("The last task should throw a TaskRejectedException but it wasn't", wasTre);
 
  Thread.sleep(3000);
 
  assertTrue("4 tasks should be terminated, but "+Counters.threadPool.getNb()+" were instead",
   Counters.threadPool.getNb()==4);
 }
 
}
 
class SimpleTask implements Runnable {
 private String name;
 private Counters counter;
 private int sleepTime;
  
 public SimpleTask(String name, Counters counter, int sleepTime) {
  this.name = name;
  this.counter = counter;
  this.sleepTime = sleepTime;
 }
  
 @Override
 public void run() {
  try {
   Thread.sleep(this.sleepTime);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
  this.counter.increment();
  System.out.println("Running task '"+this.name+"' in Thread "+Thread.currentThread().getName());
 }
  
 @Override
 public String toString() {
     return "Task {"+this.name+"}";
 }
}
 
enum Counters {
     
 simpleAsyncTask(0),
 syncTask(0),
 threadPool(0);
  
 private int nb;
  
 public int getNb() {
  return this.nb;
 }
  
 public synchronized void increment() {
  this.nb++;
 }
 
 private Counters(int n) {
  this.nb = n;
 }
}

在過去,我們可以有更多的執(zhí)行器可以使用(SimpleThreadPoolTaskExecutor,TimerTaskExecutor 這些都2.x 3.x的老古董了)。但都被棄用并由本地Java的執(zhí)行器取代成為Spring的首選。看看輸出的結(jié)果:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Running task 'SimpleAsyncTask-1' in Thread thread_name_prefix_____1
Running task 'SimpleAsyncTask-2' in Thread thread_name_prefix_____2
Running task 'SimpleAsyncTask-3' in Thread thread_name_prefix_____3
Running task 'SimpleAsyncTask-4' in Thread thread_name_prefix_____4
Running task 'SimpleAsyncTask-5' in Thread thread_name_prefix_____5
Running task 'SimpleAsyncTask-6' in Thread thread_name_prefix_____6
Running task 'SyncTask-1' in Thread main
Running task 'SyncTask-2' in Thread main
Running task 'SyncTask-3' in Thread main
Running task 'SyncTask-4' in Thread main
Running task 'SyncTask-5' in Thread main
Running task 'ThreadPoolTask-2' in Thread ThreadPoolTaskExecutor-2
Running task 'ThreadPoolTask-1' in Thread ThreadPoolTaskExecutor-1
Running task 'ThreadPoolTask-4' in Thread ThreadPoolTaskExecutor-3
Running task 'ThreadPoolTask-3' in Thread ThreadPoolTaskExecutor-2

以此我們可以推斷出,第一個測試為每個任務(wù)創(chuàng)建新的線程。通過使用不同的線程名稱,我們可以看到相應(yīng)區(qū)別。第二個,同步執(zhí)行器,應(yīng)該執(zhí)行所調(diào)用線程中的任務(wù)。這里可以看到'main'是主線程的名稱,它的主線程調(diào)用執(zhí)行同步所有任務(wù)。最后一種例子涉及最大可創(chuàng)建3個線程的線程池。從結(jié)果可以看到,他們也確實只有3個創(chuàng)建線程。

現(xiàn)在,我們將編寫一些單元測試來看看@Async和@Scheduled實現(xiàn)。

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations={"classpath:applicationContext-test.xml"})
@WebAppConfiguration
public class AnnotationTest {
 
 @Autowired
 private GenericApplicationContext context;
     
 @Test
 public void testScheduled() throws InterruptedException {
 
   System.out.println("Start sleeping");
   Thread.sleep(6000);
   System.out.println("Wake up !");
 
   TestScheduledTask scheduledTask = (TestScheduledTask) context.getBean("testScheduledTask");
    /**
    * Test fixed delay. It's executed every 6 seconds. The first execution is registered after application's context start.
    **/
   assertTrue("Scheduled task should be executed 2 times (1 before sleep in this method, 1 after the sleep), but was "+scheduledTask.getFixedDelayCounter(),
    scheduledTask.getFixedDelayCounter() == 2);
    
    /**
    * Test fixed rate. It's executed every 6 seconds. The first execution is registered after application's context start.
    * Unlike fixed delay, a fixed rate configuration executes one task with specified time. For example, it will execute on
    * 6 seconds delayed task at 10:30:30, 10:30:36, 10:30:42 and so on - even if the task 10:30:30 taken 30 seconds to
    * be terminated. In teh case of fixed delay, if the first task takes 30 seconds, the next will be executed 6 seconds
    * after the first one, so the execution flow will be: 10:30:30, 10:31:06, 10:31:12.
    **/
   assertTrue("Scheduled task should be executed 2 times (1 before sleep in this method, 1 after the sleep), but was "+scheduledTask.getFixedRateCounter(),
    scheduledTask.getFixedRateCounter() == 2);
    /**
    * Test fixed rate with initial delay attribute. The initialDelay attribute is set to 6 seconds. It causes that
    * scheduled method is executed 6 seconds after application's context start. In our case, it should be executed
    * only once because of previous Thread.sleep(6000) invocation.
    **/
   assertTrue("Scheduled task should be executed 1 time (0 before sleep in this method, 1 after the sleep), but was "+scheduledTask.getInitialDelayCounter(), scheduledTask.getInitialDelayCounter() == 1);
    /**
    * Test cron scheduled task. Cron is scheduled to be executed every 6 seconds. It's executed only once,
    * so we can deduce that it's not invoked directly before applications
    * context start, but only after configured time (6 seconds in our case).
    **/
   assertTrue("Scheduled task should be executed 1 time (0 before sleep in this method, 1 after the sleep), but was "+scheduledTask.getCronCounter(), scheduledTask.getCronCounter() == 1);
 }
     
 @Test
 public void testAsyc() throws InterruptedException {
    /**
    * To test @Async annotation, we can create a bean in-the-fly. AsyncCounter bean is a
    * simple counter which value should be equals to 2 at the end of the test. A supplementary test
    * concerns threads which execute both of AsyncCounter methods: one which
    * isn't annotated with @Async and another one which is annotated with it. For the first one, invoking
    * thread should have the same name as the main thread. For annotated method, it can't be executed in
    * the main thread. It must be executed asynchrounously in a new thread.
    **/
   context.registerBeanDefinition("asyncCounter", new RootBeanDefinition(AsyncCounter.class));
    
   String currentName = Thread.currentThread().getName();
   AsyncCounter asyncCounter = context.getBean("asyncCounter", AsyncCounter.class);
   asyncCounter.incrementNormal();
   assertTrue("Thread executing normal increment should be the same as JUnit thread but it wasn't (expected '"+currentName+"', got '"+asyncCounter.getNormalThreadName()+"')",
           asyncCounter.getNormalThreadName().equals(currentName));
   asyncCounter.incrementAsync();
   // sleep 50ms and give some time to AsyncCounter to update asyncThreadName value
   Thread.sleep(50);
 
   assertFalse("Thread executing @Async increment shouldn't be the same as JUnit thread but it wasn (JUnit thread '"+currentName+"', @Async thread '"+asyncCounter.getAsyncThreadName()+"')",
           asyncCounter.getAsyncThreadName().equals(currentName));
   System.out.println("Main thread execution's name: "+currentName);
   System.out.println("AsyncCounter normal increment thread execution's name: "+asyncCounter.getNormalThreadName());
   System.out.println("AsyncCounter @Async increment thread execution's name: "+asyncCounter.getAsyncThreadName());
   assertTrue("Counter should be 2, but was "+asyncCounter.getCounter(), asyncCounter.getCounter()==2);
 }
 
}
 
class AsyncCounter {
     
 private int counter = 0;
 private String normalThreadName;
 private String asyncThreadName;
  
 public void incrementNormal() {
  normalThreadName = Thread.currentThread().getName();
  this.counter++;
 }
  
 @Async
 public void incrementAsync() {
  asyncThreadName = Thread.currentThread().getName();
  this.counter++;
 }
  
 public String getAsyncThreadName() {
  return asyncThreadName;
 }
  
 public String getNormalThreadName() {
  return normalThreadName;
 }
  
 public int getCounter() {
  return this.counter;
 }
     
}

另外,我們需要創(chuàng)建新的配置文件和一個包含定時任務(wù)方法的類:

?
1
2
3
4
5
6
7
8
9
10
11
<!-- imported configuration file first -->
<!-- Activates various annotations to be detected in bean classes -->
<context:annotation-config />
 
<!-- Scans the classpath for annotated components that will be auto-registered as Spring beans.
 For example @Controller and @Service. Make sure to set the correct base-package-->
<context:component-scan base-package="com.migo.test.schedulers" />
 
<task:scheduler id="taskScheduler"/>
<task:executor id="taskExecutor" pool-size="40" />
<task:annotation-driven executor="taskExecutor" scheduler="taskScheduler"/>
?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// scheduled methods after, all are executed every 6 seconds (scheduledAtFixedRate and scheduledAtFixedDelay start to execute at
// application context start, two other methods begin 6 seconds after application's context start)
@Component
public class TestScheduledTask {
 
 private int fixedRateCounter = 0;
 private int fixedDelayCounter = 0;
 private int initialDelayCounter = 0;
 private int cronCounter = 0;
 
 @Scheduled(fixedRate = 6000)
 public void scheduledAtFixedRate() {
  System.out.println("<R> Increment at fixed rate");
  fixedRateCounter++;
 }
  
 @Scheduled(fixedDelay = 6000)
 public void scheduledAtFixedDelay() {
  System.out.println("<D> Incrementing at fixed delay");
  fixedDelayCounter++;
 }
  
 @Scheduled(fixedDelay = 6000, initialDelay = 6000)
 public void scheduledWithInitialDelay() {
  System.out.println("<DI> Incrementing with initial delay");
  initialDelayCounter++;
 }
  
 @Scheduled(cron = "**/6 ** ** ** ** **")
 public void scheduledWithCron() {
  System.out.println("<C> Incrementing with cron");
  cronCounter++;
      
 }
  
 public int getFixedRateCounter() {
  return this.fixedRateCounter;
 }
  
 public int getFixedDelayCounter() {
  return this.fixedDelayCounter;
 }
  
 public int getInitialDelayCounter() {
  return this.initialDelayCounter;
 }
  
 public int getCronCounter() {
  return this.cronCounter;
 }
     
}

該測試的輸出:

?
1
2
3
4
5
6
7
8
9
10
11
<R> Increment at fixed rate
<D> Incrementing at fixed delay
Start sleeping
<C> Incrementing with cron
<DI> Incrementing with initial delay
<R> Increment at fixed rate
<D> Incrementing at fixed delay
Wake up !
Main thread execution's name: main
AsyncCounter normal increment thread execution's name: main
AsyncCounter @Async increment thread execution's name: taskExecutor-1

本文向我們介紹了關(guān)于Spring框架另一個大家比較感興趣的功能–定時任務(wù)。我們可以看到,與Linux CRON風(fēng)格配置類似,這些任務(wù)同樣可以按照固定的頻率進行定時任務(wù)的設(shè)置。我們還通過例子證明了使用@Async注解的方法會在不同線程中執(zhí)行。

以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持服務(wù)器之家。

原文鏈接:https://muyinchen.github.io/2017/10/17/Spring5%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90-Spring%E4%B8%AD%E7%9A%84%E5%BC%82%E6%AD%A5%E5%92%8C%E8%AE%A1%E5%88%92%E4%BB%BB%E5%8A%A1/?utm_source=tuicool&utm_medium=referral

延伸 · 閱讀

精彩推薦
主站蜘蛛池模板: 91色一区二区三区 | 久久久久九九九女人毛片 | www.成人在线视频 | 欧美精品久久久久久久久老牛影院 | 欧美18一19sex性护士农村 | 欧美人与性禽动交精品 | 天天艹综合 | 国产精品久久久久久久久久iiiii | 免费在线观看国产精品 | 一级片观看| 在线看免费观看av | 91看片网页版 | 欧美成人国产va精品日本一级 | 国产羞羞视频在线观看免费应用 | 特一级黄色毛片 | 在线播放免费人成毛片乱码 | h视频在线播放 | 懂色av懂色aⅴ精彩av | 免费在线观看毛片 | 国产成人高潮免费观看精品 | 日韩视频1| 毛片免费在线 | 国内精品伊人久久久久网站 | 99一区二区 | 日本一区二区三区精品 | 亚洲日本韩国在线观看 | 国产免费传媒av片在线 | 斗破苍穹在线观看免费完整观看 | 国产亚洲精久久久久久蜜臀 | 欧美激情性色生活片在线观看 | 在线免费亚洲 | 国产成年人网站 | 毛片视频播放 | 免费人成年短视频在线观看网站 | 日日艹夜夜艹 | 激情久久免费视频 | 国产一级午夜 | 久久艹国产精品 | 久久亚洲春色中文字幕久久 | 免费看性xxx高清视频自由 | 日韩视频www|