diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 466c577c00..2886190997 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -44,6 +44,8 @@ typedef struct STaosQset STaosQset; typedef struct STaosQall STaosQall; typedef struct { void *ahandle; + void *fp; + void *queue; int32_t workerId; int32_t threadNum; int64_t timestamp; @@ -81,8 +83,8 @@ int32_t taosAddIntoQset(STaosQset *qset, STaosQueue *queue, void *ahandle); void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue); int32_t taosGetQueueNumber(STaosQset *qset); -int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void **ahandle, FItem *itemFp); -int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp); +int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo); +int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo); void taosResetQsetThread(STaosQset *qset, void *pItem); extern int64_t tsRpcQueueMemoryAllowed; diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 663745a7d7..72d74d7ae5 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -242,13 +242,13 @@ static int32_t syncIOStopInternal(SSyncIO *io) { } static void *syncIOConsumerFunc(void *param) { - SSyncIO * io = param; - STaosQall *qall; - SRpcMsg * pRpcMsg, rpcMsg; - qall = taosAllocateQall(); + SSyncIO *io = param; + STaosQall *qall = taosAllocateQall(); + SRpcMsg *pRpcMsg, rpcMsg; + SQueueInfo qinfo = {0}; while (1) { - int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL); + int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, &qinfo); sTrace("syncIOConsumerFunc %d msgs are received", numOfMsgs); if (numOfMsgs <= 0) { break; diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 94311bc435..1895472472 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -115,7 +115,7 @@ bool taosQueueEmpty(STaosQueue *queue) { bool empty = false; taosThreadMutexLock(&queue->mutex); - if (queue->head == NULL && queue->tail == NULL) { + if (queue->head == NULL && queue->tail == NULL && queue->numOfItems == 0 && queue->memOfItems == 0) { empty = true; } taosThreadMutexUnlock(&queue->mutex); @@ -397,7 +397,7 @@ void taosRemoveFromQset(STaosQset *qset, STaosQueue *queue) { int32_t taosGetQueueNumber(STaosQset *qset) { return qset->numOfQueues; } -int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void **ahandle, FItem *itemFp) { +int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, SQueueInfo *qinfo) { STaosQnode *pNode = NULL; int32_t code = 0; @@ -417,9 +417,10 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void if (queue->head) { pNode = queue->head; *ppItem = pNode->item; - if (ahandle) *ahandle = queue->ahandle; - if (itemFp) *itemFp = queue->itemFp; - if (ts) *ts = pNode->timestamp; + qinfo->ahandle = queue->ahandle; + qinfo->fp = queue->itemFp; + qinfo->queue = queue; + qinfo->timestamp = pNode->timestamp; queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL; @@ -440,7 +441,7 @@ int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, int64_t *ts, void return code; } -int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp) { +int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, SQueueInfo *qinfo) { STaosQueue *queue; int32_t code = 0; @@ -461,13 +462,16 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand qall->start = queue->head; qall->numOfItems = queue->numOfItems; code = qall->numOfItems; - if (ahandle) *ahandle = queue->ahandle; - if (itemsFp) *itemsFp = queue->itemsFp; + qinfo->ahandle = queue->ahandle; + qinfo->fp = queue->itemsFp; + qinfo->queue = queue; queue->head = NULL; queue->tail = NULL; queue->numOfItems = 0; queue->memOfItems = 0; + uTrace("read %d items from queue:%p, items:%d mem:%" PRId64, code, queue, queue->numOfItems, queue->memOfItems); + atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); for (int32_t j = 1; j < qall->numOfItems; ++j) { tsem_wait(&qset->sem); diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 88bd36f0cb..5e3a0dc109 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -70,26 +70,24 @@ void tQWorkerCleanup(SQWorkerPool *pool) { static void *tQWorkerThreadFp(SQWorker *worker) { SQWorkerPool *pool = worker->pool; - FItem fp = NULL; - - void *msg = NULL; - void *ahandle = NULL; - int32_t code = 0; - int64_t ts = 0; + SQueueInfo qinfo = {0}; + void *msg = NULL; + int32_t code = 0; taosBlockSIGPIPE(); setThreadName(pool->name); uDebug("worker:%s:%d is running", pool->name, worker->id); while (1) { - if (taosReadQitemFromQset(pool->qset, (void **)&msg, &ts, &ahandle, &fp) == 0) { + if (taosReadQitemFromQset(pool->qset, (void **)&msg, &qinfo) == 0) { uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); break; } - if (fp != NULL) { - SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num, .timestamp = ts}; - (*fp)(&info, msg); + if (qinfo.fp != NULL) { + qinfo.workerId = worker->id; + qinfo.threadNum = pool->num; + (*((FItem)qinfo.fp))(&qinfo, msg); } } @@ -195,27 +193,26 @@ void tWWorkerCleanup(SWWorkerPool *pool) { static void *tWWorkerThreadFp(SWWorker *worker) { SWWorkerPool *pool = worker->pool; - FItems fp = NULL; - - void *msg = NULL; - void *ahandle = NULL; - int32_t numOfMsgs = 0; - int32_t qtype = 0; + SQueueInfo qinfo = {0}; + void *msg = NULL; + int32_t code = 0; + int32_t numOfMsgs = 0; taosBlockSIGPIPE(); setThreadName(pool->name); uDebug("worker:%s:%d is running", pool->name, worker->id); while (1) { - numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &ahandle, &fp); + numOfMsgs = taosReadAllQitemsFromQset(worker->qset, worker->qall, &qinfo); if (numOfMsgs == 0) { uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, worker->qset); break; } - if (fp != NULL) { - SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num}; - (*fp)(&info, worker->qall, numOfMsgs); + if (qinfo.fp != NULL) { + qinfo.workerId = worker->id; + qinfo.threadNum = pool->num; + (*((FItems)qinfo.fp))(&qinfo, worker->qall, numOfMsgs); } }