diff --git a/include/util/tqueue.h b/include/util/tqueue.h index d51184edfc..9faf90113e 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -70,10 +70,7 @@ int32_t taosGetQueueNumber(STaosQset *qset); int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp); int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp); - -int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp, int32_t threadId); void taosResetQsetThread(STaosQset *qset, void *pItem); - int32_t taosGetQueueItemsNumber(STaosQueue *queue); int32_t taosGetQsetItemsNumber(STaosQset *qset); diff --git a/include/util/tworker.h b/include/util/tworker.h index dabd1ac9b3..3da8a1db63 100644 --- a/include/util/tworker.h +++ b/include/util/tworker.h @@ -27,33 +27,33 @@ typedef struct SWWorkerPool SWWorkerPool; typedef struct SQWorker { int32_t id; // worker ID - TdThread thread; // thread + TdThread thread; // thread SQWorkerPool *pool; -} SQWorker, SFWorker; +} SQWorker; typedef struct SQWorkerPool { - int32_t max; // max number of workers - int32_t min; // min number of workers - int32_t num; // current number of workers - STaosQset *qset; - const char *name; - SQWorker *workers; + int32_t max; // max number of workers + int32_t min; // min number of workers + int32_t num; // current number of workers + STaosQset *qset; + const char *name; + SQWorker *workers; TdThreadMutex mutex; -} SQWorkerPool, SFWorkerPool; +} SQWorkerPool; typedef struct SWWorker { int32_t id; // worker id - TdThread thread; // thread + TdThread thread; // thread STaosQall *qall; STaosQset *qset; // queue set SWWorkerPool *pool; } SWWorker; typedef struct SWWorkerPool { - int32_t max; // max number of workers - int32_t nextId; // from 0 to max-1, cyclic - const char *name; - SWWorker *workers; + int32_t max; // max number of workers + int32_t nextId; // from 0 to max-1, cyclic + const char *name; + SWWorker *workers; TdThreadMutex mutex; } SWWorkerPool; @@ -62,11 +62,6 @@ void tQWorkerCleanup(SQWorkerPool *pool); STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp); void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue); -int32_t tFWorkerInit(SFWorkerPool *pool); -void tFWorkerCleanup(SFWorkerPool *pool); -STaosQueue *tFWorkerAllocQueue(SFWorkerPool *pool, void *ahandle, FItem fp); -void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue); - int32_t tWWorkerInit(SWWorkerPool *pool); void tWWorkerCleanup(SWWorkerPool *pool); STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp); diff --git a/source/dnode/mgmt/vnode/inc/vmInt.h b/source/dnode/mgmt/vnode/inc/vmInt.h index d670e073da..ba59258d07 100644 --- a/source/dnode/mgmt/vnode/inc/vmInt.h +++ b/source/dnode/mgmt/vnode/inc/vmInt.h @@ -30,7 +30,7 @@ typedef struct SVnodesMgmt { SVnodesStat state; STfs *pTfs; SQWorkerPool queryPool; - SFWorkerPool fetchPool; + SQWorkerPool fetchPool; SWWorkerPool syncPool; SWWorkerPool writePool; const char *path; diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index 035b805d0e..e0632cee68 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -262,7 +262,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue); pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue); pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue); - pVnode->pFetchQ = tFWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue); + pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL || @@ -277,7 +277,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); - tFWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); + tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ); tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); @@ -303,11 +303,11 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { pQPool->max = maxQueryThreads; if (tQWorkerInit(pQPool) != 0) return -1; - SFWorkerPool *pFPool = &pMgmt->fetchPool; + SQWorkerPool *pFPool = &pMgmt->fetchPool; pFPool->name = "vnode-fetch"; pFPool->min = minFetchThreads; pFPool->max = maxFetchThreads; - if (tFWorkerInit(pFPool) != 0) return -1; + if (tQWorkerInit(pFPool) != 0) return -1; SWWorkerPool *pWPool = &pMgmt->writePool; pWPool->name = "vnode-write"; @@ -330,7 +330,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { void vmStopWorker(SVnodesMgmt *pMgmt) { dndCleanupWorker(&pMgmt->mgmtWorker); - tFWorkerCleanup(&pMgmt->fetchPool); + tQWorkerCleanup(&pMgmt->fetchPool); tQWorkerCleanup(&pMgmt->queryPool); tWWorkerCleanup(&pMgmt->writePool); tWWorkerCleanup(&pMgmt->syncPool); diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 99675630f4..258ca1402f 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -68,7 +68,6 @@ STaosQueue *taosOpenQueue() { return NULL; } - queue->threadId = -1; uDebug("queue:%p is opened", queue); return queue; } @@ -437,53 +436,6 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand return code; } -int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp, int32_t threadId) { - STaosQnode *pNode = NULL; - int32_t code = -1; - - tsem_wait(&qset->sem); - - taosThreadMutexLock(&qset->mutex); - - for (int32_t i = 0; i < qset->numOfQueues; ++i) { - if (qset->current == NULL) qset->current = qset->head; - STaosQueue *queue = qset->current; - if (queue) qset->current = queue->next; - if (queue == NULL) break; - if (queue->head == NULL) continue; - if (queue->threadId != -1 && queue->threadId != threadId) { - code = 0; - continue; - } - - taosThreadMutexLock(&queue->mutex); - - if (queue->head) { - pNode = queue->head; - pNode->queue = queue; - queue->threadId = threadId; - *ppItem = pNode->item; - - if (ahandle) *ahandle = queue->ahandle; - if (itemFp) *itemFp = queue->itemFp; - - queue->head = pNode->next; - if (queue->head == NULL) queue->tail = NULL; - queue->numOfItems--; - atomic_sub_fetch_32(&qset->numOfItems, 1); - code = 1; - uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems); - } - - taosThreadMutexUnlock(&queue->mutex); - if (pNode) break; - } - - taosThreadMutexUnlock(&qset->mutex); - - return code; -} - void taosResetQsetThread(STaosQset *qset, void *pItem) { if (pItem == NULL) return; STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode)); diff --git a/source/util/src/tworker.c b/source/util/src/tworker.c index 78098af3bd..7d7f819dec 100644 --- a/source/util/src/tworker.c +++ b/source/util/src/tworker.c @@ -93,7 +93,7 @@ static void *tQWorkerThreadFp(SQWorker *worker) { return NULL; } -STaosQueue *tWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp, ThreadFp threadFp) { +STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) { taosThreadMutexLock(&pool->mutex); STaosQueue *queue = taosOpenQueue(); if (queue == NULL) { @@ -114,7 +114,7 @@ STaosQueue *tWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp, Threa taosThreadAttrInit(&thAttr); taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); - if (taosThreadCreate(&worker->thread, &thAttr, threadFp, worker) != 0) { + if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) { uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno)); taosCloseQueue(queue); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -134,58 +134,11 @@ STaosQueue *tWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp, Threa return queue; } -STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) { - return tWorkerAllocQueue(pool, ahandle, fp, (ThreadFp)tQWorkerThreadFp); -} - void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) { taosCloseQueue(queue); uDebug("worker:%s, queue:%p is freed", pool->name, queue); } -int32_t tFWorkerInit(SFWorkerPool *pool) { return tQWorkerInit((SQWorkerPool *)pool); } - -void tFWorkerCleanup(SFWorkerPool *pool) { tQWorkerCleanup(pool); } - -static void *tFWorkerThreadFp(SQWorker *worker) { - SQWorkerPool *pool = worker->pool; - - FItem fp = NULL; - void *msg = NULL; - void *ahandle = NULL; - int32_t code = 0; - - taosBlockSIGPIPE(); - setThreadName(pool->name); - uDebug("worker:%s:%d is running", pool->name, worker->id); - - while (1) { - code = taosReadQitemFromQsetByThread(pool->qset, (void **)&msg, &ahandle, &fp, worker->id); - - if (code < 0) { - uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset); - break; - } else if (code == 0) { - // uTrace("worker:%s:%d qset:%p, got no message and continue", pool->name, worker->id, pool->qset); - continue; - } - - if (fp != NULL) { - (*fp)(ahandle, msg); - } - - taosResetQsetThread(pool->qset, msg); - } - - return NULL; -} - -STaosQueue *tFWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) { - return tWorkerAllocQueue(pool, ahandle, fp, (ThreadFp)tQWorkerThreadFp); -} - -void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue) { tQWorkerFreeQueue(pool, queue); } - int32_t tWWorkerInit(SWWorkerPool *pool) { pool->nextId = 0; pool->workers = calloc(pool->max, sizeof(SWWorker));