fix: reset queueSize after the queueItem is consumed and executed by the worker

This commit is contained in:
Shengliang Guan 2022-07-12 13:56:54 +08:00
parent 52b63db905
commit b9165a9503
5 changed files with 24 additions and 6 deletions

View File

@ -67,6 +67,7 @@ void taosFreeQitem(void *pItem);
void taosWriteQitem(STaosQueue *queue, void *pItem); void taosWriteQitem(STaosQueue *queue, void *pItem);
int32_t taosReadQitem(STaosQueue *queue, void **ppItem); int32_t taosReadQitem(STaosQueue *queue, void **ppItem);
bool taosQueueEmpty(STaosQueue *queue); bool taosQueueEmpty(STaosQueue *queue);
void taosUpdateItemSize(STaosQueue *queue, int32_t items);
int32_t taosQueueItemSize(STaosQueue *queue); int32_t taosQueueItemSize(STaosQueue *queue);
int64_t taosQueueMemorySize(STaosQueue *queue); int64_t taosQueueMemorySize(STaosQueue *queue);

View File

@ -369,6 +369,8 @@ static void *syncIOConsumerFunc(void *param) {
taosFreeQitem(pRpcMsg); taosFreeQitem(pRpcMsg);
} }
taosUpdateItemSize(qinfo.queue, numOfMsgs);
} }
taosFreeQall(qall); taosFreeQall(qall);

View File

@ -32,11 +32,12 @@ void processShellMsg() {
SRpcMsg * pRpcMsg, rpcMsg; SRpcMsg * pRpcMsg, rpcMsg;
int type; int type;
void * pvnode; void * pvnode;
SQueueInfo qinfo = {0};
qall = taosAllocateQall(); qall = taosAllocateQall();
while (1) { while (1) {
int numOfMsgs = taosReadAllQitemsFromQset(qset, qall, &pvnode, NULL); int numOfMsgs = taosReadAllQitemsFromQset(qset, qall, &pvnode, &qinfo);
tDebug("%d shell msgs are received", numOfMsgs); tDebug("%d shell msgs are received", numOfMsgs);
if (numOfMsgs <= 0) break; if (numOfMsgs <= 0) break;
@ -86,6 +87,8 @@ void processShellMsg() {
rpcSendResponse(&nRpcMsg); rpcSendResponse(&nRpcMsg);
} }
} }
taosUpdateItemSize(qinfo.queue, numOfMsgs);
} }
taosFreeQall(qall); taosFreeQall(qall);

View File

@ -123,6 +123,14 @@ bool taosQueueEmpty(STaosQueue *queue) {
return empty; return empty;
} }
void taosUpdateItemSize(STaosQueue *queue, int32_t items) {
if (queue == NULL) return;
taosThreadMutexLock(&queue->mutex);
queue->numOfItems -= items;
taosThreadMutexUnlock(&queue->mutex);
}
int32_t taosQueueItemSize(STaosQueue *queue) { int32_t taosQueueItemSize(STaosQueue *queue) {
if (queue == NULL) return 0; if (queue == NULL) return 0;
@ -257,6 +265,7 @@ int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall) {
queue->tail = NULL; queue->tail = NULL;
queue->numOfItems = 0; queue->numOfItems = 0;
queue->memOfItems = 0; queue->memOfItems = 0;
uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, code, queue, queue->numOfItems, queue->memOfItems);
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems); if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
} }
@ -424,11 +433,11 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo)
queue->head = pNode->next; queue->head = pNode->next;
if (queue->head == NULL) queue->tail = NULL; if (queue->head == NULL) queue->tail = NULL;
queue->numOfItems--; // queue->numOfItems--;
queue->memOfItems -= pNode->size; queue->memOfItems -= pNode->size;
atomic_sub_fetch_32(&qset->numOfItems, 1); atomic_sub_fetch_32(&qset->numOfItems, 1);
code = 1; code = 1;
uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems, uTrace("item:%p is read out from queue:%p, items:%d mem:%" PRId64, *ppItem, queue, queue->numOfItems - 1,
queue->memOfItems); queue->memOfItems);
} }
@ -468,9 +477,9 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *
queue->head = NULL; queue->head = NULL;
queue->tail = NULL; queue->tail = NULL;
queue->numOfItems = 0; // queue->numOfItems = 0;
queue->memOfItems = 0; queue->memOfItems = 0;
uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, code, queue, queue->numOfItems, queue->memOfItems); uTrace("read %d items from queue:%p, items:0 mem:%" PRId64, code, queue, queue->memOfItems);
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
for (int32_t j = 1; j < qall->numOfItems; ++j) { for (int32_t j = 1; j < qall->numOfItems; ++j) {

View File

@ -79,7 +79,7 @@ static void *tQWorkerThreadFp(SQWorker *worker) {
uDebug("worker:%s:%d is running", pool->name, worker->id); uDebug("worker:%s:%d is running", pool->name, worker->id);
while (1) { while (1) {
if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) { if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) {
uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
break; break;
} }
@ -89,6 +89,8 @@ static void *tQWorkerThreadFp(SQWorker *worker) {
qinfo.threadNum = pool->num; qinfo.threadNum = pool->num;
(*((FItem)qinfo.fp))(&qinfo, msg); (*((FItem)qinfo.fp))(&qinfo, msg);
} }
taosUpdateItemSize(qinfo.queue, 1);
} }
return NULL; return NULL;
@ -214,6 +216,7 @@ static void *tWWorkerThreadFp(SWWorker *worker) {
qinfo.threadNum = pool->num; qinfo.threadNum = pool->num;
(*((FItems)qinfo.fp))(&qinfo, worker->qall, numOfMsgs); (*((FItems)qinfo.fp))(&qinfo, worker->qall, numOfMsgs);
} }
taosUpdateItemSize(qinfo.queue, numOfMsgs);
} }
return NULL; return NULL;