激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務(wù)器之家:專注于服務(wù)器技術(shù)及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術(shù)|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務(wù)器之家 - 編程語言 - Java教程 - 使用spring boot 整合kafka,延遲啟動消費者

使用spring boot 整合kafka,延遲啟動消費者

2021-11-10 12:58懶貓mao Java教程

這篇文章主要介紹了使用spring boot 整合kafka,延遲啟動消費者的操作,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教

spring boot 整合kafka,延遲啟動消費者

spring boot整合kafka的時候一般使用@KafkaListener來設(shè)置消費者,但是這種方式在spring啟動的時候就會立即開啟消費者。如果有需要根據(jù)配置信息延遲開啟指定的消費者就不能使用這種方式。

參考了類:KafkaListenerAnnotationBeanPostProcessor,我提取了一部分代碼。可以根據(jù)需要隨時動態(tài)的開啟消費者。還可以很方便的啟動多個消費者。

為了方便使用,我自定義了一個注解:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.springframework.kafka.annotation.TopicPartition;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target({ ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface DelayKafkaConsumer {
    String id() default "";
    String[] topics() default {};
    String errorHandler() default "";
    String groupId() default "";
    TopicPartition[] topicPartitions() default {};
    String beanRef() default "__listener";
}

配合注解使用的factory:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.ListableBeanFactory;
import org.springframework.beans.factory.ObjectFactory;
import org.springframework.beans.factory.config.*;
import org.springframework.context.expression.StandardBeanExpressionResolver;
import org.springframework.core.MethodIntrospector;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.convert.converter.Converter;
import org.springframework.core.convert.converter.GenericConverter;
import org.springframework.format.Formatter;
import org.springframework.format.FormatterRegistry;
import org.springframework.format.support.DefaultFormattingConversionService;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.messaging.converter.GenericMessageConverter;
import org.springframework.messaging.handler.annotation.support.*;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.InvocableHandlerMethod;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
 
@Service
public class MyKafkaConsumerFactory implements KafkaListenerConfigurer,BeanFactoryAware {
    private static final Logger logger = LoggerFactory.getLogger(MyKafkaConsumerFactory.class);
    private KafkaListenerEndpointRegistrar kafkaListenerEndpointRegistrar;
    private final AtomicInteger counter = new AtomicInteger();
    private BeanFactory beanFactory;
    private BeanExpressionResolver resolver = new StandardBeanExpressionResolver();
    private BeanExpressionContext expressionContext;
    private final ListenerScope listenerScope = new ListenerScope();
    private final KafkaHandlerMethodFactoryAdapter messageHandlerMethodFactory =
            new KafkaHandlerMethodFactoryAdapter();
 
    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        this.kafkaListenerEndpointRegistrar = registrar;
        addFormatters(messageHandlerMethodFactory.defaultFormattingConversionService);
    }
 
    public void startConsumer(KafkaListenerEndpoint endpoint){
        kafkaListenerEndpointRegistrar.registerEndpoint(endpoint);
    }
 
    public void startConsumer(Object target){
        logger.info("start consumer {} ...",target.getClass());
        Class<?> targetClass = AopUtils.getTargetClass(target);
        Map<Method, Set<DelayKafkaConsumer>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                new MethodIntrospector.MetadataLookup<Set<DelayKafkaConsumer>>() {
 
                    @Override
                    public Set<DelayKafkaConsumer> inspect(Method method) {
                        Set<DelayKafkaConsumer> listenerMethods = findListenerAnnotations(method);
                        return (!listenerMethods.isEmpty() ? listenerMethods : null);
                    }
 
                });
        if (annotatedMethods.size()==0)
            throw new IllegalArgumentException(target.getClass()+" need have method with @DelayKafkaConsumer");
        for (Map.Entry<Method, Set<DelayKafkaConsumer>> entry : annotatedMethods.entrySet()) {
            Method method = entry.getKey();
            logger.info("find message listen handler method : {} , object : {}",method.getName(),target.getClass());
            for (DelayKafkaConsumer listener : entry.getValue()) {
                if(listener.topics().length==0) {
                    logger.info("topics value is empty , will skip it , method : {} , target object : {}",method.getName(),target.getClass());
                    continue;
                }
                processKafkaListener(listener,method,target);
                logger.info("register method {} success , target object : {}",method.getName(),target.getClass());
            }
        }
        logger.info("{} consumer start complete .",target.getClass());
    }
 
    protected void processKafkaListener(DelayKafkaConsumer kafkaListener, Method method, Object bean) {
        Method methodToUse = checkProxy(method, bean);
        MethodKafkaListenerEndpoint endpoint = new MethodKafkaListenerEndpoint();
        endpoint.setMethod(methodToUse);
        endpoint.setBeanFactory(this.beanFactory);
        String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
        if (StringUtils.hasText(errorHandlerBeanName)) {
            endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
        }
        processListener(endpoint, kafkaListener, bean, methodToUse);
    }
 
    protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, DelayKafkaConsumer kafkaListener, Object bean,
                                   Object adminTarget) {
        String beanRef = kafkaListener.beanRef();
        if (StringUtils.hasText(beanRef)) {
            this.listenerScope.addListener(beanRef, bean);
        }
        endpoint.setBean(bean);
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        endpoint.setId(getEndpointId(kafkaListener));
        endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
        endpoint.setTopics(resolveTopics(kafkaListener));
        endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
        kafkaListenerEndpointRegistrar.registerEndpoint(endpoint);
        if (StringUtils.hasText(beanRef)) {
            this.listenerScope.removeListener(beanRef);
        }
    }
 
    private String getEndpointId(DelayKafkaConsumer kafkaListener) {
        if (StringUtils.hasText(kafkaListener.id())) {
            return resolve(kafkaListener.id());
        }
        else {
            return "Custom-Consumer" + this.counter.getAndIncrement();
        }
    }
 
    private String getEndpointGroupId(DelayKafkaConsumer kafkaListener, String id) {
        String groupId = null;
        if (StringUtils.hasText(kafkaListener.groupId())) {
            groupId = resolveExpressionAsString(kafkaListener.groupId(), "groupId");
        }
        if (groupId == null && StringUtils.hasText(kafkaListener.id())) {
            groupId = id;
        }
        return groupId;
    }
 
    private String[] resolveTopics(DelayKafkaConsumer kafkaListener) {
        String[] topics = kafkaListener.topics();
        List<String> result = new ArrayList<>();
        if (topics.length > 0) {
            for (int i = 0; i < topics.length; i++) {
                Object topic = resolveExpression(topics[i]);
                resolveAsString(topic, result);
            }
        }
        return result.toArray(new String[result.size()]);
    }
 
    private void resolveAsString(Object resolvedValue, List<String> result) {
        if (resolvedValue instanceof String[]) {
            for (Object object : (String[]) resolvedValue) {
                resolveAsString(object, result);
            }
        }
        else if (resolvedValue instanceof String) {
            result.add((String) resolvedValue);
        }
        else if (resolvedValue instanceof Iterable) {
            for (Object object : (Iterable<Object>) resolvedValue) {
                resolveAsString(object, result);
            }
        }
        else {
            throw new IllegalArgumentException(String.format(
                    "@DelayKafkaConsumer can't resolve '%s' as a String", resolvedValue));
        }
    }
 
    private TopicPartitionInitialOffset[] resolveTopicPartitions(DelayKafkaConsumer kafkaListener) {
        TopicPartition[] topicPartitions = kafkaListener.topicPartitions();
        List<TopicPartitionInitialOffset> result = new ArrayList<>();
        if (topicPartitions.length > 0) {
            for (TopicPartition topicPartition : topicPartitions) {
                result.addAll(resolveTopicPartitionsList(topicPartition));
            }
        }
        return result.toArray(new TopicPartitionInitialOffset[result.size()]);
    }
 
    private List<TopicPartitionInitialOffset> resolveTopicPartitionsList(TopicPartition topicPartition) {
        Object topic = resolveExpression(topicPartition.topic());
        Assert.state(topic instanceof String,
                "topic in @TopicPartition must resolve to a String, not " + topic.getClass());
        Assert.state(StringUtils.hasText((String) topic), "topic in @TopicPartition must not be empty");
        String[] partitions = topicPartition.partitions();
        PartitionOffset[] partitionOffsets = topicPartition.partitionOffsets();
        Assert.state(partitions.length > 0 || partitionOffsets.length > 0,
                "At least one 'partition' or 'partitionOffset' required in @TopicPartition for topic '" + topic + "'");
        List<TopicPartitionInitialOffset> result = new ArrayList<>();
        for (int i = 0; i < partitions.length; i++) {
            resolvePartitionAsInteger((String) topic, resolveExpression(partitions[i]), result);
        }
 
        for (PartitionOffset partitionOffset : partitionOffsets) {
            Object partitionValue = resolveExpression(partitionOffset.partition());
            Integer partition;
            if (partitionValue instanceof String) {
                Assert.state(StringUtils.hasText((String) partitionValue),
                        "partition in @PartitionOffset for topic '" + topic + "' cannot be empty");
                partition = Integer.valueOf((String) partitionValue);
            }
            else if (partitionValue instanceof Integer) {
                partition = (Integer) partitionValue;
            }
            else {
                throw new IllegalArgumentException(String.format(
                        "@PartitionOffset for topic '%s' can't resolve '%s' as an Integer or String, resolved to '%s'",
                        topic, partitionOffset.partition(), partitionValue.getClass()));
            }
 
            Object initialOffsetValue = resolveExpression(partitionOffset.initialOffset());
            Long initialOffset;
            if (initialOffsetValue instanceof String) {
                Assert.state(StringUtils.hasText((String) initialOffsetValue),
                        "'initialOffset' in @PartitionOffset for topic '" + topic + "' cannot be empty");
                initialOffset = Long.valueOf((String) initialOffsetValue);
            }
            else if (initialOffsetValue instanceof Long) {
                initialOffset = (Long) initialOffsetValue;
            }
            else {
                throw new IllegalArgumentException(String.format(
                        "@PartitionOffset for topic '%s' can't resolve '%s' as a Long or String, resolved to '%s'",
                        topic, partitionOffset.initialOffset(), initialOffsetValue.getClass()));
            }
 
            Object relativeToCurrentValue = resolveExpression(partitionOffset.relativeToCurrent());
            Boolean relativeToCurrent;
            if (relativeToCurrentValue instanceof String) {
                relativeToCurrent = Boolean.valueOf((String) relativeToCurrentValue);
            }
            else if (relativeToCurrentValue instanceof Boolean) {
                relativeToCurrent = (Boolean) relativeToCurrentValue;
            }
            else {
                throw new IllegalArgumentException(String.format(
                        "@PartitionOffset for topic '%s' can't resolve '%s' as a Boolean or String, resolved to '%s'",
                        topic, partitionOffset.relativeToCurrent(), relativeToCurrentValue.getClass()));
            }
 
            TopicPartitionInitialOffset topicPartitionOffset =
                    new TopicPartitionInitialOffset((String) topic, partition, initialOffset, relativeToCurrent);
            if (!result.contains(topicPartitionOffset)) {
                result.add(topicPartitionOffset);
            }
            else {
                throw new IllegalArgumentException(
                        String.format("@TopicPartition can't have the same partition configuration twice: [%s]",
                                topicPartitionOffset));
            }
        }
        return result;
    }
 
    private void resolvePartitionAsInteger(String topic, Object resolvedValue,
                                           List<TopicPartitionInitialOffset> result) {
        if (resolvedValue instanceof String[]) {
            for (Object object : (String[]) resolvedValue) {
                resolvePartitionAsInteger(topic, object, result);
            }
        }
        else if (resolvedValue instanceof String) {
            Assert.state(StringUtils.hasText((String) resolvedValue),
                    "partition in @TopicPartition for topic '" + topic + "' cannot be empty");
            result.add(new TopicPartitionInitialOffset(topic, Integer.valueOf((String) resolvedValue)));
        }
        else if (resolvedValue instanceof Integer[]) {
            for (Integer partition : (Integer[]) resolvedValue) {
                result.add(new TopicPartitionInitialOffset(topic, partition));
            }
        }
        else if (resolvedValue instanceof Integer) {
            result.add(new TopicPartitionInitialOffset(topic, (Integer) resolvedValue));
        }
        else if (resolvedValue instanceof Iterable) {
            for (Object object : (Iterable<Object>) resolvedValue) {
                resolvePartitionAsInteger(topic, object, result);
            }
        }
        else {
            throw new IllegalArgumentException(String.format(
                    "@DelayKafkaConsumer for topic '%s' can't resolve '%s' as an Integer or String", topic, resolvedValue));
        }
    }
 
    private Set<DelayKafkaConsumer> findListenerAnnotations(Method method) {
        Set<DelayKafkaConsumer> listeners = new HashSet<>();
        DelayKafkaConsumer ann = AnnotationUtils.findAnnotation(method, DelayKafkaConsumer.class);
        if (ann != null) {
            listeners.add(ann);
        }
        return listeners;
    }
 
    private Method checkProxy(Method methodArg, Object bean) {
        Method method = methodArg;
        if (AopUtils.isJdkDynamicProxy(bean)) {
            try {
                method = bean.getClass().getMethod(method.getName(), method.getParameterTypes());
                Class<?>[] proxiedInterfaces = ((Advised) bean).getProxiedInterfaces();
                for (Class<?> iface : proxiedInterfaces) {
                    try {
                        method = iface.getMethod(method.getName(), method.getParameterTypes());
                        break;
                    }
                    catch (NoSuchMethodException noMethod) {
                    }
                }
            }
            catch (SecurityException ex) {
                ReflectionUtils.handleReflectionException(ex);
            }
            catch (NoSuchMethodException ex) {
                throw new IllegalStateException(String.format(
                        "target method '%s' found on bean target class '%s', " +
                                "but not found in any interface(s) for bean JDK proxy. Either " +
                                "pull the method up to an interface or switch to subclass (CGLIB) " +
                                "proxies by setting proxy-target-class/proxyTargetClass " +
                                "attribute to 'true'", method.getName(), method.getDeclaringClass().getSimpleName()), ex);
            }
        }
        return method;
    }
 
    @Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        this.beanFactory = beanFactory;
        if (beanFactory instanceof ConfigurableListableBeanFactory) {
            this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();
            this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,
                    this.listenerScope);
        }
    }
 
    private String resolveExpressionAsString(String value, String attribute) {
        Object resolved = resolveExpression(value);
        if (resolved instanceof String) {
            return (String) resolved;
        }
        else {
            throw new IllegalStateException("The [" + attribute + "] must resolve to a String. "
                    + "Resolved to [" + resolved.getClass() + "] for [" + value + "]");
        }
    }
 
    private Object resolveExpression(String value) {
        String resolvedValue = resolve(value);
        return this.resolver.evaluate(resolvedValue, this.expressionContext);
    }
 
    /**
     * Resolve the specified value if possible.
     * @param value the value to resolve
     * @return the resolved value
     * @see ConfigurableBeanFactory#resolveEmbeddedValue
     */
    private String resolve(String value) {
        if (this.beanFactory instanceof ConfigurableBeanFactory) {
            return ((ConfigurableBeanFactory) this.beanFactory).resolveEmbeddedValue(value);
        }
        return value;
    }
 
    private void addFormatters(FormatterRegistry registry) {
        for (Converter<?, ?> converter : getBeansOfType(Converter.class)) {
            registry.addConverter(converter);
        }
        for (GenericConverter converter : getBeansOfType(GenericConverter.class)) {
            registry.addConverter(converter);
        }
        for (org.springframework.format.Formatter<?> formatter : getBeansOfType(Formatter.class)) {
            registry.addFormatter(formatter);
        }
    }
 
    private <T> Collection<T> getBeansOfType(Class<T> type) {
        if (this.beanFactory instanceof ListableBeanFactory) {
            return ((ListableBeanFactory) this.beanFactory).getBeansOfType(type).values();
        }else {
            return Collections.emptySet();
        }
    }
 
    private static class ListenerScope implements Scope {
        private final Map<String, Object> listeners = new HashMap<>();
        ListenerScope() {
            super();
        }
 
        public void addListener(String key, Object bean) {
            this.listeners.put(key, bean);
        }
 
        public void removeListener(String key) {
            this.listeners.remove(key);
        }
 
        @Override
        public Object get(String name, ObjectFactory<?> objectFactory) {
            return this.listeners.get(name);
        }
 
        @Override
        public Object remove(String name) {
            return null;
        }
 
        @Override
        public void registerDestructionCallback(String name, Runnable callback) {
        }
 
        @Override
        public Object resolveContextualObject(String key) {
            return this.listeners.get(key);
        }
 
        @Override
        public String getConversationId() {
            return null;
        }
 
    }
 
    private class KafkaHandlerMethodFactoryAdapter implements MessageHandlerMethodFactory {
 
        private final DefaultFormattingConversionService defaultFormattingConversionService = new DefaultFormattingConversionService();
 
        private MessageHandlerMethodFactory messageHandlerMethodFactory;
 
        public void setMessageHandlerMethodFactory(MessageHandlerMethodFactory kafkaHandlerMethodFactory1) {
            this.messageHandlerMethodFactory = kafkaHandlerMethodFactory1;
        }
 
        @Override
        public InvocableHandlerMethod createInvocableHandlerMethod(Object bean, Method method) {
            return getMessageHandlerMethodFactory().createInvocableHandlerMethod(bean, method);
        }
 
        private MessageHandlerMethodFactory getMessageHandlerMethodFactory() {
            if (this.messageHandlerMethodFactory == null) {
                this.messageHandlerMethodFactory = createDefaultMessageHandlerMethodFactory();
            }
            return this.messageHandlerMethodFactory;
        }
 
        private MessageHandlerMethodFactory createDefaultMessageHandlerMethodFactory() {
            DefaultMessageHandlerMethodFactory defaultFactory = new DefaultMessageHandlerMethodFactory();
            defaultFactory.setBeanFactory(MyKafkaConsumerFactory.this.beanFactory);
 
            ConfigurableBeanFactory cbf =
                    (MyKafkaConsumerFactory.this.beanFactory instanceof ConfigurableBeanFactory ?
                            (ConfigurableBeanFactory) MyKafkaConsumerFactory.this.beanFactory : null);
 
            defaultFactory.setConversionService(this.defaultFormattingConversionService);
            List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>();
 
            // Annotation-based argument resolution
            argumentResolvers.add(new HeaderMethodArgumentResolver(this.defaultFormattingConversionService, cbf));
            argumentResolvers.add(new HeadersMethodArgumentResolver());
 
            // Type-based argument resolution
            final GenericMessageConverter messageConverter = new GenericMessageConverter(this.defaultFormattingConversionService);
            argumentResolvers.add(new MessageMethodArgumentResolver(messageConverter));
            argumentResolvers.add(new PayloadArgumentResolver(messageConverter) {
 
                @Override
                protected boolean isEmptyPayload(Object payload) {
                    return payload == null || payload instanceof KafkaNull;
                }
 
            });
            defaultFactory.setArgumentResolvers(argumentResolvers);
            defaultFactory.afterPropertiesSet();
            return defaultFactory;
        }
    }
}

通過startConsumer來啟動一個消費者(多次調(diào)用會啟動多個消費者)。target必須至少包含一個有@DelayKafkaConsumer注解的方法。這里類似@KafkaListener。我去掉了一部分功能,保留了比較常用的部分。

這里提供了一個通過注解的方式在spring boot項目中動態(tài)控制consumer的方法。還有其他的方法來達到這種效果,不過我覺得這種方法比較方便。

java項目集成springboot使用kafka消費者,啟動失敗報錯 Failed to construct kafka consumer

之前博客里面提到本公司為物聯(lián)網(wǎng)項目。項目中使用mqtt+kafka進行與設(shè)備端的通訊,之前的協(xié)議格式為json格式,現(xiàn)在改成字節(jié)數(shù)組byte[]格式進行通信。

集成springboot后,具體的demo網(wǎng)上很多,接下來有時間會出一份kafka的demo。

報錯信息如下:

Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry';
nested exception is org.apache.kafka.common.KafkaException:Failed to construct kafka consumer

原因分析:

之前json格式通信時候,構(gòu)建kafka消費工廠的時候,其中ConcurrentMessageListenerContainer的key為String類型,而value現(xiàn)在為byte[]類型,所以構(gòu)建消費者工廠的時候需要指定正確的value類型。

代碼如下:

?
1
2
3
4
5
6
7
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, byte[]>> kafkaListenerContainerByteFactory() {
    ConcurrentKafkaListenerContainerFactory<String, byte[]> factory = new ConcurrentKafkaListenerContainerFactory<String, byte[]>();
    factory.setConsumerFactory(consumerByteFactory());
    factory.setConcurrency(concurrency);
    factory.getContainerProperties().setPollTimeout(1500);
    return factory;
   }

整體kafka生產(chǎn)者+kafka消費者的demo會在接下來的博客中陸續(xù)整理。

以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持服務(wù)器之家。

原文鏈接:https://blog.csdn.net/weixin_42170534/article/details/80892411

延伸 · 閱讀

精彩推薦
  • Java教程Java使用SAX解析xml的示例

    Java使用SAX解析xml的示例

    這篇文章主要介紹了Java使用SAX解析xml的示例,幫助大家更好的理解和學習使用Java,感興趣的朋友可以了解下...

    大行者10067412021-08-30
  • Java教程小米推送Java代碼

    小米推送Java代碼

    今天小編就為大家分享一篇關(guān)于小米推送Java代碼,小編覺得內(nèi)容挺不錯的,現(xiàn)在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧...

    富貴穩(wěn)中求8032021-07-12
  • Java教程升級IDEA后Lombok不能使用的解決方法

    升級IDEA后Lombok不能使用的解決方法

    最近看到提示IDEA提示升級,尋思已經(jīng)有好久沒有升過級了。升級完畢重啟之后,突然發(fā)現(xiàn)好多錯誤,本文就來介紹一下如何解決,感興趣的可以了解一下...

    程序猿DD9332021-10-08
  • Java教程20個非常實用的Java程序代碼片段

    20個非常實用的Java程序代碼片段

    這篇文章主要為大家分享了20個非常實用的Java程序片段,對java開發(fā)項目有所幫助,感興趣的小伙伴們可以參考一下 ...

    lijiao5352020-04-06
  • Java教程Java實現(xiàn)搶紅包功能

    Java實現(xiàn)搶紅包功能

    這篇文章主要為大家詳細介紹了Java實現(xiàn)搶紅包功能,采用多線程模擬多人同時搶紅包,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙...

    littleschemer13532021-05-16
  • Java教程xml與Java對象的轉(zhuǎn)換詳解

    xml與Java對象的轉(zhuǎn)換詳解

    這篇文章主要介紹了xml與Java對象的轉(zhuǎn)換詳解的相關(guān)資料,需要的朋友可以參考下...

    Java教程網(wǎng)2942020-09-17
  • Java教程Java BufferWriter寫文件寫不進去或缺失數(shù)據(jù)的解決

    Java BufferWriter寫文件寫不進去或缺失數(shù)據(jù)的解決

    這篇文章主要介紹了Java BufferWriter寫文件寫不進去或缺失數(shù)據(jù)的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望...

    spcoder14552021-10-18
  • Java教程Java8中Stream使用的一個注意事項

    Java8中Stream使用的一個注意事項

    最近在工作中發(fā)現(xiàn)了對于集合操作轉(zhuǎn)換的神器,java8新特性 stream,但在使用中遇到了一個非常重要的注意點,所以這篇文章主要給大家介紹了關(guān)于Java8中S...

    阿杜7482021-02-04
主站蜘蛛池模板: 久久久久中精品中文字幕19 | 美女羞羞视频在线观看 | 亚洲一区久久 | 午夜久久视频 | 国产成人综合在线视频 | 日本一区二区视频在线 | 18被视频免费观看视频 | 国产乱淫a∨片免费视频 | 欧美一区二区三区不卡免费观看 | 2021国产精品| 免费黄网站在线播放 | 福利片在线看 | 国产午夜精品久久久久 | 91精品国产综合久久婷婷香蕉 | 最新日本中文字幕在线观看 | 蜜桃91麻豆 | 欧美特黄三级成人 | 小雪奶水翁胀公吸小说最新章节 | 青草伊人网 | 99亚洲伊人久久精品影院红桃 | 久久色网站 | 成人免费淫片视频软件 | 久草最新在线 | 免费观看一级黄色片 | 久久久综合视频 | 免费黄色在线电影 | 欧美乱淫| 久久久免费电影 | 在线播放免费视频 | 高清av免费 | 午夜精品区 | 成人av一二三区 | 成人毛片免费看 | 久久综合入口 | 91情侣偷在线精品国产 | 欧美日韩在线视频一区二区 | 国产成人免费高清激情视频 | 国产精品久久久久久久久久大牛 | 国产一级毛片a | 黄色网络免费看 | av电影在线网 |