shm queue
This commit is contained in:
parent
ac6b121348
commit
232404fa86
|
@ -208,16 +208,11 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody,
|
static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, const char *pBody,
|
||||||
int32_t rawBodyLen, ProcFuncType funcType) {
|
int32_t rawBodyLen, ProcFuncType ftype) {
|
||||||
const int32_t headLen = CEIL8(rawHeadLen);
|
const int32_t headLen = CEIL8(rawHeadLen);
|
||||||
const int32_t bodyLen = CEIL8(rawBodyLen);
|
const int32_t bodyLen = CEIL8(rawBodyLen);
|
||||||
const int32_t fullLen = headLen + bodyLen + 8;
|
const int32_t fullLen = headLen + bodyLen + 8;
|
||||||
|
|
||||||
if (headLen <= 0 || bodyLen <= 0) {
|
|
||||||
terrno = TSDB_CODE_INVALID_PARA;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosThreadMutexLock(pQueue->mutex);
|
taosThreadMutexLock(pQueue->mutex);
|
||||||
if (fullLen > pQueue->avail) {
|
if (fullLen > pQueue->avail) {
|
||||||
taosThreadMutexUnlock(pQueue->mutex);
|
taosThreadMutexUnlock(pQueue->mutex);
|
||||||
|
@ -225,13 +220,14 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const int32_t pos = pQueue->tail;
|
||||||
if (pQueue->tail < pQueue->total) {
|
if (pQueue->tail < pQueue->total) {
|
||||||
*(int16_t *)(pQueue->pBuffer + pQueue->head) = headLen;
|
*(int16_t *)(pQueue->pBuffer + pQueue->tail) = headLen;
|
||||||
*(int8_t *)(pQueue->pBuffer + pQueue->head + 2) = (int8_t)funcType;
|
*(int8_t *)(pQueue->pBuffer + pQueue->tail + 2) = (int8_t)ftype;
|
||||||
*(int32_t *)(pQueue->pBuffer + pQueue->head + 4) = bodyLen;
|
*(int32_t *)(pQueue->pBuffer + pQueue->tail + 4) = bodyLen;
|
||||||
} else {
|
} else {
|
||||||
*(int16_t *)(pQueue->pBuffer) = headLen;
|
*(int16_t *)(pQueue->pBuffer) = headLen;
|
||||||
*(int8_t *)(pQueue->pBuffer + pQueue->head + 2) = (int8_t)funcType;
|
*(int8_t *)(pQueue->pBuffer + 2) = (int8_t)ftype;
|
||||||
*(int32_t *)(pQueue->pBuffer + 4) = bodyLen;
|
*(int32_t *)(pQueue->pBuffer + 4) = bodyLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -250,19 +246,19 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t
|
||||||
memcpy(pQueue->pBuffer + headLen, pBody, rawBodyLen);
|
memcpy(pQueue->pBuffer + headLen, pBody, rawBodyLen);
|
||||||
pQueue->tail = headLen + bodyLen;
|
pQueue->tail = headLen + bodyLen;
|
||||||
} else if (remain < 8 + headLen) {
|
} else if (remain < 8 + headLen) {
|
||||||
memcpy(pQueue->pBuffer + pQueue->head + 8, pHead, remain - 8);
|
memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, remain - 8);
|
||||||
memcpy(pQueue->pBuffer, pHead + remain - 8, rawHeadLen - (remain - 8));
|
memcpy(pQueue->pBuffer, pHead + remain - 8, rawHeadLen - (remain - 8));
|
||||||
memcpy(pQueue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen);
|
memcpy(pQueue->pBuffer + headLen - (remain - 8), pBody, rawBodyLen);
|
||||||
pQueue->tail = headLen - (remain - 8) + bodyLen;
|
pQueue->tail = headLen - (remain - 8) + bodyLen;
|
||||||
} else if (remain < 8 + bodyLen) {
|
} else if (remain < 8 + headLen + bodyLen) {
|
||||||
memcpy(pQueue->pBuffer + pQueue->head + 8, pHead, rawHeadLen);
|
memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, rawHeadLen);
|
||||||
memcpy(pQueue->pBuffer + pQueue->head + 8 + headLen, pBody, remain - 8 - headLen);
|
memcpy(pQueue->pBuffer + pQueue->tail + 8 + headLen, pBody, remain - 8 - headLen);
|
||||||
memcpy(pQueue->pBuffer, pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
|
memcpy(pQueue->pBuffer, pBody + remain - 8 - headLen, rawBodyLen - (remain - 8 - headLen));
|
||||||
pQueue->tail = bodyLen - (remain - 8 - headLen);
|
pQueue->tail = bodyLen - (remain - 8 - headLen);
|
||||||
} else {
|
} else {
|
||||||
memcpy(pQueue->pBuffer + pQueue->head + 8, pHead, rawHeadLen);
|
memcpy(pQueue->pBuffer + pQueue->tail + 8, pHead, rawHeadLen);
|
||||||
memcpy(pQueue->pBuffer + pQueue->head + headLen + 8, pBody, rawBodyLen);
|
memcpy(pQueue->pBuffer + pQueue->tail + headLen + 8, pBody, rawBodyLen);
|
||||||
pQueue->tail = pQueue->head + headLen + bodyLen + 8;
|
pQueue->tail = pQueue->tail + headLen + bodyLen + 8;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -271,8 +267,8 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, const char *pHead, int16_t
|
||||||
taosThreadMutexUnlock(pQueue->mutex);
|
taosThreadMutexUnlock(pQueue->mutex);
|
||||||
tsem_post(&pQueue->sem);
|
tsem_post(&pQueue->sem);
|
||||||
|
|
||||||
uTrace("proc:%s, push msg to queue:%p remains:%d, head:%d:%p body:%d:%p ftype:%d", pQueue->name, pQueue, pQueue->items,
|
uTrace("proc:%s, push msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype,
|
||||||
headLen, pHead, bodyLen, pBody, funcType);
|
pQueue->items, headLen, pHead, bodyLen, pBody);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -312,6 +308,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const int32_t pos = pQueue->head;
|
||||||
if (pQueue->head < pQueue->tail) {
|
if (pQueue->head < pQueue->tail) {
|
||||||
memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen);
|
memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen);
|
||||||
memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, bodyLen);
|
memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, bodyLen);
|
||||||
|
@ -331,7 +328,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
|
||||||
memcpy(pHead + remain - 8, pQueue->pBuffer, headLen - (remain - 8));
|
memcpy(pHead + remain - 8, pQueue->pBuffer, headLen - (remain - 8));
|
||||||
memcpy(pBody, pQueue->pBuffer + headLen - (remain - 8), bodyLen);
|
memcpy(pBody, pQueue->pBuffer + headLen - (remain - 8), bodyLen);
|
||||||
pQueue->head = headLen - (remain - 8) + bodyLen;
|
pQueue->head = headLen - (remain - 8) + bodyLen;
|
||||||
} else if (remain < 8 + bodyLen) {
|
} else if (remain < 8 + headLen + bodyLen) {
|
||||||
memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen);
|
memcpy(pHead, pQueue->pBuffer + pQueue->head + 8, headLen);
|
||||||
memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, remain - 8 - headLen);
|
memcpy(pBody, pQueue->pBuffer + pQueue->head + 8 + headLen, remain - 8 - headLen);
|
||||||
memcpy(pBody + remain - 8 - headLen, pQueue->pBuffer, bodyLen - (remain - 8 - headLen));
|
memcpy(pBody + remain - 8 - headLen, pQueue->pBuffer, bodyLen - (remain - 8 - headLen));
|
||||||
|
@ -353,8 +350,8 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea
|
||||||
*pBodyLen = bodyLen;
|
*pBodyLen = bodyLen;
|
||||||
*pFuncType = (ProcFuncType)ftype;
|
*pFuncType = (ProcFuncType)ftype;
|
||||||
|
|
||||||
uTrace("proc:%s, pop msg from queue:%p remains:%d, head:%d:%p body:%d:%p ftype:%d", pQueue->name, pQueue, pQueue->items,
|
uTrace("proc:%s, pop msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype,
|
||||||
headLen, pHead, bodyLen, pBody, ftype);
|
pQueue->items, headLen, pHead, bodyLen, pBody);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -472,11 +469,11 @@ void taosProcCleanup(SProcObj *pProc) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
|
int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
|
||||||
ProcFuncType funcType) {
|
ProcFuncType ftype) {
|
||||||
return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, funcType);
|
return taosProcQueuePush(pProc->pChildQueue, pHead, headLen, pBody, bodyLen, ftype);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
|
int32_t taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen,
|
||||||
ProcFuncType funcType) {
|
ProcFuncType ftype) {
|
||||||
return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, funcType);
|
return taosProcQueuePush(pProc->pParentQueue, pHead, headLen, pBody, bodyLen, ftype);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue