聊聊Postgres中的IPC之SI Message Queue (3)

如下图所示,minMsgmun和MaxMsgmim就像两个指针,它们区分出了哪些无效消息已经被所有的进程读取以及哪些消息还在等待某些进程读取。在minMsgnum之前的消息已经被所有进程读完;maxMsgnum之后的区域尚未使用;两者之间的消息是还没有被所有进程读完的。当有进程调用函数SendSharedlnvalidMessage将其产生的无效消息添加到shmInvalBuffer中时,maxMsgnum就开始向后移动。SendSharedlnvalidMessage中将调用SIInsertDataEntries来完成无效消息的插人。

聊聊Postgres中的IPC之SI Message Queue

在向SI Message队列中插入无效消息时,可能出现可用空间不够的情况(此时队列中全是没有完全被读取完毕的无效消息),需要清空一部分未处理无效消息,这个操作称为清理无效消息队列,只有当当前消息数与将要插人消息数之和超过shmInvalBuffer中nextThreshold时才会进行清理操作。这时,那些还没有处理完SI Message队列中无效消息的进程将收到清理通知,然后这些进程将抛弃其Cache中的所有元组(相当于重新载人Cache的内容)。

显然,让所有进程重载Cache会导致较高的I/O次数。为了减少重载Cache的次数,PostgreSQL会在无效消息队列中设置两个界限值lowbound和minsig,其计算方式如下:

• lowbound=maxMsgNum-MAXNUMMESSAGES+minFree,其中 minFree 为需要释放的队列空间的最小值(minFree指出了需要在无效消息队列中清理出多少个空位用于容纳新的无效消息)。

• minsig = maxMsgNum-MAXNUMMESSAGES/2,这里给出的是minsig的初始值,在进程重载过程中minsig会进行调整。
SICleanupQueue

/* * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the * furthest-back backend that needs signaling (if any), and reset any * backends that are too far back. Note that because we ignore sendOnly * backends here it is possible for them to keep sending messages without * a problem even when they are the only active backend. */ min = segP->maxMsgNum; minsig = min - SIG_THRESHOLD; lowbound = min - MAXNUMMESSAGES + minFree;

可以看到,lowbound实际上给出了此次清理过程中必须要释放的空间的位置,这是一个强制性的限制,nextMsgNum值低于lowbound的进程都将其resetState字段置为真,这些进程将会自动进行重载Cache的工作。对于那些nextMsgNum值介于lowbound和minaig之间的进程,虽然它们并不影响本次淸理,但是为了尽量避免经常进行清理操作,会要求这些进程加快处理无效消息的进度(CatchUp)。淸理操作会找出这些进程中进度最慢的一个,向它发送SIGUSR1信号。该进程接收到SIGUSR1后会一次性处理完所有的无效消息,然后继续向下一个进度最慢的进程发送SIGUSR1让它也加快处理进度。

清理无效消息队列的工作由函数SICleanupQueue实现,该函数的minFree参数给出了这一次淸理操作至少需要释放出的空间大小。该函数的流程如下:

SICleanupQueue ->SendProcSignal

1)计算 lowbound 和 minsig 的值。

2) 对每一个进程的ProcState结构进行检査,将nextMsgNum低于lowbound的进程resetState字段设置为true,并在nextMsgNum介于lowboumi和minsig之间的进程中找出进度最慢的一个。

3) 重新计算nextThreshoW参数。

4) 向步骤2中找到的进度最慢的进程发送SIGUSR1信号。

Postgres进程通过函数ProcessCatchupInterrupt来处理SIGUSR1信号,该函数最终将调用ReceiveSharedlnvalidMessages来处理所有未处理的无效消息,最后调用SICleanupQueue (minFree参数为0)向下一个进度最慢的进程发送SIGUSR1信号(调用栈如下)。

ProcessCatchupInterrupt ->AcceptInvalidationMessages ->ReceiveSharedInvalidMessages ->SICleanupQueue

每个进程在需要刷新其Cache时也会调用ReceiveSharedInvalidMessages函数用于读取并处理无效消息,函数参数为两个函数指针:

1) invalFunction:用于处理一条无效消息。

2) resetFunction:将该后台进程的Cache元组全部抛弃。

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/wpjgxx.html