Java中的队列(Queue)是提供该功能的一种简单的数据结构,同时为简化队列操作的并发访问处理,我们选择了它的一个子类LinkedBlockingDeque。该类提供了对数据的插入、获取、查询等操作,其底层将数据以链表的形式保存。如果用 offer方法插入数据时队列没满,则数据插入成功,并立 即返回:如果队列满了,则直接返回 false。 如果用 poll方法删除数据时队列不为空, 则返回队 列头部的数据;如果队列为空,则立刻返回 null。
消息格式定义队列消息接口定义(QueueMsg)
/** * @author james mu * @date 2020/7/27 22:00 */ public interface QueueMsg { //消息键 String getKey(); //消息头 QueueMsgHeaders getHeaders(); //消息负载byte数组 byte[] getData(); }队列消息头接口定义(QueueMsgHeaders)
import java.util.Map; /** * @author james mu * @date 2020/7/27 21:55 */ public interface QueueMsgHeaders { //消息头放入 byte[] put(String key, byte[] value); //消息头通过key获取byte数组 byte[] get(String key); //消息头数据全部读取方法 Map<String, byte[]> getData(); }队列消息格式(ProtoQueueMsg)
/** * @author jamesmsw * @date 2021/2/19 2:23 下午 */ public class ProtoQueueMsg implements QueueMsg { private final String key; private final String value; private final QueueMsgHeaders headers; public ProtoQueueMsg(String key, String value) { this(key, value, new DefaultQueueMsgHeaders()); } public ProtoQueueMsg(String key, String value, QueueMsgHeaders headers) { this.key = key; this.value = value; this.headers = headers; } @Override public String getKey() { return key; } @Override public QueueMsgHeaders getHeaders() { return headers; } @Override public byte[] getData() { return value.getBytes(); } }默认队列消息头(DefaultQueueMsgHeaders)
import java.util.HashMap; import java.util.Map; /** * @author james mu * @date 2020/7/27 21:57 */ public class DefaultQueueMsgHeaders implements QueueMsgHeaders { protected final Map<String, byte[]> data = new HashMap<>(); @Override public byte[] put(String key, byte[] value) { return data.put(key, value); } @Override public byte[] get(String key) { return data.get(key); } @Override public Map<String, byte[]> getData() { return data; } } 消息生产者 import iot.technology.mqtt.storage.msg.QueueMsg; import iot.technology.mqtt.storage.queue.QueueCallback; /** * @author james mu * @date 2020/8/31 11:05 */ public class Producer<T extends QueueMsg> { private final InMemoryStorage storage = InMemoryStorage.getInstance(); private final String defaultTopic; public Producer(String defaultTopic) { this.defaultTopic = defaultTopic; } public void send(String topicName, T msg) { boolean result = storage.put(topicName, msg); } } 消息消费者 import lombok.extern.slf4j.Slf4j; import java.util.Collections; import java.util.List; import java.util.Set; import java.util.stream.Collectors; /** * @author james mu * @date 2020/8/31 11:23 */ @Slf4j public class Consumer<T extends QueueMsg> { private final InMemoryStorage storage = InMemoryStorage.getInstance(); private volatile Set<String> topics; private volatile boolean stopped; private volatile boolean subscribed; private final String topic; //虚构函数 public Consumer(String topic) { this.topic = topic; stopped = false; } public String getTopic() { return topic; } public void subscribe() { topics = Collections.singleton(topic); subscribed = true; } //批量订阅主题 public void subscribe(Set<String> topics) { this.topics = topics; subscribed = true; } public void unsubscribe() { stopped = true; } //不断读取topic集合下阻塞队列中的数据集合 public List<T> poll(long durationInMillis) { if (subscribed) { List<T> messages = topics .stream() .map(storage::get) .flatMap(List::stream) .map(msg -> (T) msg).collect(Collectors.toList()); if (messages.size() > 0) { return messages; } try { Thread.sleep(durationInMillis); } catch (InterruptedException e) { if (!stopped) { log.error("Failed to sleep.", e); } } } return Collections.emptyList(); } }至此,一个简单的消息队列中就实现完毕了。