From 232404fa86c157ec7e13808b20691ee170ed745b Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 29 Mar 2022 15:21:48 +0800 Subject: [PATCH] shm queue --- source/util/src/tprocess.c | 49 ++++++++++++++++++-------------------- 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 2675c6cdf2..b2d4d1eddc 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -208,16 +208,11 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) { } static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody, - int32_t rawBodyLen, ProcFuncType funcType) { + int32_t rawBodyLen, ProcFuncType ftype) { const int32_t headLen = CEIL8(rawHeadLen); const int32_t bodyLen = CEIL8(rawBodyLen); const int32_t fullLen = headLen + bodyLen + 8; - if (headLen <= 0 || bodyLen <= 0) { - terrno = TSDB_CODE_INVALID_PARA; - return -1; - } - taosThreadMutexLock(pQueue->mutex); if (fullLen > pQueue->avail) { taosThreadMutexUnlock(pQueue->mutex); @@ -225,13 +220,14 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t return -1; } + const int32_t pos = pQueue->tail; if (pQueue->tail < pQueue->total) { - *(int16_t *)(pQueue->pBuffer + pQueue->head) = headLen; - *(int8_t *)(pQueue->pBuffer + pQueue->head + 2) = (int8_t)funcType; - *(int32_t *)(pQueue->pBuffer + pQueue->head + 4) = bodyLen; + *(int16_t *)(pQueue->pBuffer + pQueue->tail) = headLen; + *(int8_t *)(pQueue->pBuffer + pQueue->tail + 2) = (int8_t)ftype; + *(int32_t *)(pQueue->pBuffer + pQueue->tail + 4) = bodyLen; } else { *(int16_t *)(pQueue->pBuffer) = headLen; - *(int8_t *)(pQueue->pBuffer + pQueue->head + 2) = (int8_t)funcType; + *(int8_t *)(pQueue->pBuffer + 2) = (int8_t)ftype; *(int32_t *)(pQueue->pBuffer + 4) = bodyLen; } @@ -250,19 +246,19 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t memcpy(pQueue->pBuffer + headLen, pBody, rawBodyLen); pQueue->tail = headLen + bodyLen; } else if (remain < 8 + headLen) { - memcpy(pQueue->pBuffer + pQueue->head + 8, pHead, remain - 8); + memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, remain - 8); memcpy(pQueue->pBuffer, pHead + remain - 8, rawHeadLen - (remain - 8)); memcpy(pQueue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen); pQueue->tail = headLen - (remain - 8) + bodyLen; - } else if (remain < 8 + bodyLen) { - memcpy(pQueue->pBuffer + pQueue->head + 8, pHead, rawHeadLen); - memcpy(pQueue->pBuffer + pQueue->head + 8 + headLen, pBody, remain - 8 - headLen); + } else if (remain < 8 + headLen + bodyLen) { + memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, rawHeadLen); + memcpy(pQueue->pBuffer + pQueue->tail + 8 + headLen, pBody, remain - 8 - headLen); memcpy(pQueue->pBuffer, pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen)); pQueue->tail = bodyLen - (remain - 8 - headLen); } else { - memcpy(pQueue->pBuffer + pQueue->head + 8, pHead, rawHeadLen); - memcpy(pQueue->pBuffer + pQueue->head + headLen + 8, pBody, rawBodyLen); - pQueue->tail = pQueue->head + headLen + bodyLen + 8; + memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, rawHeadLen); + memcpy(pQueue->pBuffer + pQueue->tail + headLen + 8, pBody, rawBodyLen); + pQueue->tail = pQueue->tail + headLen + bodyLen + 8; } } @@ -271,8 +267,8 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t taosThreadMutexUnlock(pQueue->mutex); tsem_post(&pQueue->sem); - uTrace("proc:%s, push msg to queue:%p remains:%d, head:%d:%p body:%d:%p ftype:%d", pQueue->name, pQueue, pQueue->items, - headLen, pHead, bodyLen, pBody, funcType); + uTrace("proc:%s, push msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype, + pQueue->items, headLen, pHead, bodyLen, pBody); return 0; } @@ -312,6 +308,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea return -1; } + const int32_t pos = pQueue->head; if (pQueue->head < pQueue->tail) { memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen); memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, bodyLen); @@ -331,7 +328,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea memcpy(pHead + remain - 8, pQueue->pBuffer, headLen - (remain - 8)); memcpy(pBody, pQueue->pBuffer + headLen - (remain - 8), bodyLen); pQueue->head = headLen - (remain - 8) + bodyLen; - } else if (remain < 8 + bodyLen) { + } else if (remain < 8 + headLen + bodyLen) { memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen); memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, remain - 8 - headLen); memcpy(pBody + remain - 8 - headLen, pQueue->pBuffer, bodyLen - (remain - 8 - headLen)); @@ -353,8 +350,8 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea *pBodyLen = bodyLen; *pFuncType = (ProcFuncType)ftype; - uTrace("proc:%s, pop msg from queue:%p remains:%d, head:%d:%p body:%d:%p ftype:%d", pQueue->name, pQueue, pQueue->items, - headLen, pHead, bodyLen, pBody, ftype); + uTrace("proc:%s, pop msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype, + pQueue->items, headLen, pHead, bodyLen, pBody); return 1; } @@ -472,11 +469,11 @@ void taosProcCleanup(SProcObj *pProc) { } int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, - ProcFuncType funcType) { - return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, funcType); + ProcFuncType ftype) { + return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, ftype); } int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, - ProcFuncType funcType) { - return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, funcType); + ProcFuncType ftype) { + return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, ftype); }