聊聊Disruptor 和 Aeron 这两个开源库 (2)

Aeron's main goal is high performance, which is why it works over UDP and does not support TCP. A natural question follows: what features does Aeron provide on top of UDP?

Aeron provides OSI Layer 4 (Transport) services. It supports the following features:

  1. Connection Oriented Communication

  2. Reliability

  3. Flow Control

  4. Congestion Avoidance/Control

  5. Multiplexing

Architecture

Aeron uses unidirectional connections. If you need to send requests and receive responses, you should use two connections. A Publisher and a Media Driver (see later) are used to send a message; a Subscriber and a Media Driver are used to receive one. The client talks to the Media Driver via shared memory.

 

聊聊Disruptor 和 Aeron 这两个开源库

 

 

 

 

为什么将这两个库放在一起聊呢?通常Disruptor框架的核心数据结构circular buffer是Java实现的,主要原因是Java对并发的支持比较友好,而且比较早地支持了内存模型。但是 C++ 11以后,C++ 同样在并发方面有了长足的进步。

Aeron就是在这个背景下产生的。虽然核心代码依旧是Java,但是在对C++客户端支持的设计中也实现了比较多有价值的数据结构,比如OneToOneRingBuffer、ManyToOneRingBuffer。

 

下面就以OneToOneRingBuffer为例进行说明:

 

/* * Copyright 2014-2020 Real Logic Limited. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * https://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef AERON_RING_BUFFER_ONE_TO_ONE_H #define AERON_RING_BUFFER_ONE_TO_ONE_H #include <climits> #include <functional> #include <algorithm> #include "util/Index.h" #include "util/LangUtil.h" #include "AtomicBuffer.h" #include "Atomic64.h" #include "RingBufferDescriptor.h" #include "RecordDescriptor.h" namespace aeron { namespace concurrent { namespace ringbuffer { class OneToOneRingBuffer { public: explicit OneToOneRingBuffer(concurrent::AtomicBuffer &buffer) : m_buffer(buffer) { m_capacity = buffer.capacity() - RingBufferDescriptor::TRAILER_LENGTH; RingBufferDescriptor::checkCapacity(m_capacity); m_maxMsgLength = m_capacity / 8; m_tailPositionIndex = m_capacity + RingBufferDescriptor::TAIL_POSITION_OFFSET; m_headCachePositionIndex = m_capacity + RingBufferDescriptor::HEAD_CACHE_POSITION_OFFSET; m_headPositionIndex = m_capacity + RingBufferDescriptor::HEAD_POSITION_OFFSET; m_correlationIdCounterIndex = m_capacity + RingBufferDescriptor::CORRELATION_COUNTER_OFFSET; m_consumerHeartbeatIndex = m_capacity + RingBufferDescriptor::CONSUMER_HEARTBEAT_OFFSET; } OneToOneRingBuffer(const OneToOneRingBuffer &) = delete; OneToOneRingBuffer &operator=(const OneToOneRingBuffer &) = delete; inline util::index_t capacity() const { return m_capacity; } bool write(std::int32_t msgTypeId, concurrent::AtomicBuffer &srcBuffer, util::index_t srcIndex, util::index_t length) { 
RecordDescriptor::checkMsgTypeId(msgTypeId); checkMsgLength(length); const util::index_t recordLength = length + RecordDescriptor::HEADER_LENGTH; const util::index_t requiredCapacity = util::BitUtil::align(recordLength, RecordDescriptor::ALIGNMENT); const util::index_t mask = m_capacity - 1; std::int64_t head = m_buffer.getInt64(m_headCachePositionIndex); std::int64_t tail = m_buffer.getInt64(m_tailPositionIndex); const util::index_t availableCapacity = m_capacity - (util::index_t)(tail - head); if (requiredCapacity > availableCapacity) { head = m_buffer.getInt64Volatile(m_headPositionIndex); if (requiredCapacity > (m_capacity - (util::index_t)(tail - head))) { return false; } m_buffer.putInt64(m_headCachePositionIndex, head); } util::index_t padding = 0; auto recordIndex = static_cast<util::index_t>(tail & mask); const util::index_t toBufferEndLength = m_capacity - recordIndex; if (requiredCapacity > toBufferEndLength) { auto headIndex = static_cast<std::int32_t>(head & mask); if (requiredCapacity > headIndex) { head = m_buffer.getInt64Volatile(m_headPositionIndex); headIndex = static_cast<std::int32_t>(head & mask); if (requiredCapacity > headIndex) { return false; } m_buffer.putInt64Ordered(m_headCachePositionIndex, head); } padding = toBufferEndLength; } if (0 != padding) { m_buffer.putInt64Ordered( recordIndex, RecordDescriptor::makeHeader(padding, RecordDescriptor::PADDING_MSG_TYPE_ID)); recordIndex = 0; } m_buffer.putBytes(RecordDescriptor::encodedMsgOffset(recordIndex), srcBuffer, srcIndex, length); m_buffer.putInt64Ordered(recordIndex, RecordDescriptor::makeHeader(recordLength, msgTypeId)); m_buffer.putInt64Ordered(m_tailPositionIndex, tail + requiredCapacity + padding); return true; } int read(const handler_t &handler, int messageCountLimit) { const std::int64_t head = m_buffer.getInt64(m_headPositionIndex); const auto headIndex = static_cast<std::int32_t>(head & (m_capacity - 1)); const std::int32_t contiguousBlockLength = m_capacity - headIndex; int 
messagesRead = 0; int bytesRead = 0; auto cleanup = util::InvokeOnScopeExit { [&]() { if (bytesRead != 0) { m_buffer.setMemory(headIndex, bytesRead, 0); m_buffer.putInt64Ordered(m_headPositionIndex, head + bytesRead); } }}; while ((bytesRead < contiguousBlockLength) && (messagesRead < messageCountLimit)) { const std::int32_t recordIndex = headIndex + bytesRead; const std::int64_t header = m_buffer.getInt64Volatile(recordIndex); const std::int32_t recordLength = RecordDescriptor::recordLength(header); if (recordLength <= 0) { break; } bytesRead += util::BitUtil::align(recordLength, RecordDescriptor::ALIGNMENT); const std::int32_t msgTypeId = RecordDescriptor::messageTypeId(header); if (RecordDescriptor::PADDING_MSG_TYPE_ID == msgTypeId) { continue; } ++messagesRead; handler( msgTypeId, m_buffer, RecordDescriptor::encodedMsgOffset(recordIndex), recordLength - RecordDescriptor::HEADER_LENGTH); } return messagesRead; } inline int read(const handler_t &handler) { return read(handler, INT_MAX); } inline util::index_t maxMsgLength() const { return m_maxMsgLength; } inline std::int64_t nextCorrelationId() { return m_buffer.getAndAddInt64(m_correlationIdCounterIndex, 1); } inline void consumerHeartbeatTime(std::int64_t time) { m_buffer.putInt64Ordered(m_consumerHeartbeatIndex, time); } inline std::int64_t consumerHeartbeatTime() const { return m_buffer.getInt64Volatile(m_consumerHeartbeatIndex); } inline std::int64_t producerPosition() const { return m_buffer.getInt64Volatile(m_tailPositionIndex); } inline std::int64_t consumerPosition() const { return m_buffer.getInt64Volatile(m_headPositionIndex); } inline std::int32_t size() const { std::int64_t headBefore; std::int64_t tail; std::int64_t headAfter = m_buffer.getInt64Volatile(m_headPositionIndex); do { headBefore = headAfter; tail = m_buffer.getInt64Volatile(m_tailPositionIndex); headAfter = m_buffer.getInt64Volatile(m_headPositionIndex); } while (headAfter != headBefore); return static_cast<std::int32_t>(tail - 
headAfter); } bool unblock() { return false; } private: concurrent::AtomicBuffer &m_buffer; util::index_t m_capacity; util::index_t m_maxMsgLength; util::index_t m_headPositionIndex; util::index_t m_headCachePositionIndex; util::index_t m_tailPositionIndex; util::index_t m_correlationIdCounterIndex; util::index_t m_consumerHeartbeatIndex; inline void checkMsgLength(util::index_t length) const { if (length > m_maxMsgLength) { throw util::IllegalArgumentException( "encoded message exceeds maxMsgLength of " + std::to_string(m_maxMsgLength) + " length=" + std::to_string(length), SOURCEINFO); } } }; }}} #endif

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wppfyz.html