Java:基于websocket实现Kafka消息推送

发布于:2021-10-16 09:28:18

1 pom.xml




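<dependencies>
    <dependency>
        <groupId>javax</groupId>
        <artifactId>javaee-api</artifactId>
        <version>7.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.9.2</artifactId>
        <version>0.8.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>RELEASE</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-nop</artifactId>
        <version>1.7.2</version>
    </dependency>
</dependencies>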



2 web套接字
websocket

package com.kafkaweb;

import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;

import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint("/websocket")
public class WebSocketTest {
private static int onlineCount = 0;
public static CopyOnWriteArraySet webSocketSet = new CopyOnWriteArraySet();
private Session session;
@OnOpen
public void onOpen(Session session){
this.session = session;
webSocketSet.add(this); //加入set中
addOnlineCount(); //在线数加1
System.out.println("有新连接加入!当前在线人数为" + getOnlineCount());
}

@OnClose
public void onClose(){
webSocketSet.remove(this); //从set中删除
subOnlineCount(); //在线数减1
System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount());
}

@OnMessage
public void onMessage(String message, Session session) {
System.out.println("来自客户端的消息:" + message);
//群发消息
for(WebSocketTest item: webSocketSet){
try {
item.sendMessage(message);
} catch (IOException e) {
e.printStackTrace();
continue;
}
}
}

@OnError
public void onError(Session session, Throwable error){
System.out.println("发生错误");
error.printStackTrace();
}


public void sendMessage(String message) throws IOException{
this.session.getBasicRemote().sendText(message);
}

public static synchronized int getOnlineCount() {
return onlineCount;
}

public static synchronized void addOnlineCount() {
WebSocketTest.onlineCount++;
}
public static synchronized void subOnlineCount() {
WebSocketTest.onlineCount--;
}
}

3 生产者

package com.kafkaweb;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

/**
 * Command-line test producer that publishes numbered string messages to the
 * {@link #TOPIC} topic using the Kafka 0.8 producer API.
 */
public class ProducerKafka {
    /** Producer with String keys and String values (matching the configured StringEncoder). */
    private final Producer<String, String> producer;

    /** Name of the topic shared by the producer and the consumer. */
    public final static String TOPIC = "kafkaTopicWebSocket";

    /** Builds the producer from a fixed local configuration. */
    private ProducerKafka() {
        Properties props = new Properties();
        // Address (host:port) of the Kafka broker.
        props.put("metadata.broker.list", "127.0.0.1:9092");

        // Serializer class for message values.
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        // Serializer class for message keys.
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");

        props.put("request.required.acks", "-1");
        producer = new Producer<String, String>(new ProducerConfig(props));
    }

    /**
     * Sends one message per number from 100 (inclusive) to 10000000 (exclusive).
     * The number is used as the key and is appended to the message text; each
     * message text is also printed to standard output.
     */
    void produce() {
        int messageNo = 100;
        final int COUNT = 10000000;

        while (messageNo < COUNT) {
            String key = String.valueOf(messageNo);
            String data = "hello kafka message " + key;
            producer.send(new KeyedMessage<String, String>(TOPIC, key, data));
            System.out.println(data);
            messageNo++;
        }
    }

    //http://www.open-open.com/lib/view/open1412991579999.html
    /**
     * Entry point: creates the producer, sends all messages, then closes the producer.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        ProducerKafka sendMsg = new ProducerKafka();
        try {
            sendMsg.produce();
        } finally {
            // Release the producer's broker connections even if sending fails.
            sendMsg.producer.close();
        }
    }
}

4 消费者

package com.kafkaweb;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import javax.websocket.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Arrays;
import java.io.IOException;

import static com.kafkaweb.WebSocketTest.webSocketSet;

public class ConsumerKafka extends Thread {
// 消费者
private final ConsumerConnector consumer;

public ConsumerKafka(){
Properties props = new Properties();
props.put("zookeeper.connect", "127.0.0.1:2181");
props.put("group.id", "jd-group");
props.put("zookeeper.session.timeout.ms", "4000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
props.put("serializer.class", "kafka.serializer.StringEncoder");
// 配置生效
ConsumerConfig config = new ConsumerConfig(props);
// 消费者配置
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);

}

@Override
public void run(){
Map topicCountMap = new HashMap<>();
topicCountMap.put(ProducerKafka.TOPIC, new Integer(1));
StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

Map>> consumerMap =
consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
KafkaStream stream = consumerMap.get(ProducerKafka.TOPIC).get(0);

while(true){
try{
ConsumerIterator it = stream.iterator();
System.out.println(it.next().message());
for(WebSocketTest webSocket: webSocketSet){
webSocket.sendMessage(it.next().message());
}
}catch (IOException e){
System.out.println(e.getMessage());
continue;
}
}
//供测试用,若通过tomcat启动需通过其他方法启动线程
public static void main(String[] args){
ConsumerKafka consumerKafka = new ConsumerKafka();
consumerKafka.start();
}
}

5 启动流程



启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

启动kafka

bin/kafka-server-start.sh config/server.properties

启动消费者
运行项目中的RunThread

启动Tomcat
配置Tomcat,启动
Tomcat配置参考:Maven部署JavaWeb

启动生产者
项目中的ProducerKafka启动Run即可
运行生产者参考:Maven部署Kafka

传送门:https://github.com/xindaqi/java.git



【参考文献】
[1]https://juejin.im/post/5b20e2e16fb9a01e2c698c51
[2]https://www.cnblogs.com/xdp-gacl/p/5193279.html
[3]https://blog.csdn.net/weixin_38175358/article/details/85105730
[4]https://blog.csdn.net/Xin_101/article/details/97813718
[5]https://blog.csdn.net/Xin_101/article/details/97938371

相关推荐

最新更新

猜你喜欢