refact worker
This commit is contained in:
parent
5f9f8ac7e2
commit
fe8714fe56
|
@ -70,10 +70,7 @@ int32_t taosGetQueueNumber(STaosQset *qset);
|
|||
|
||||
int32_t taosReadQitemFromQset(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp);
|
||||
int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahandle, FItems *itemsFp);
|
||||
|
||||
int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp, int32_t threadId);
|
||||
void taosResetQsetThread(STaosQset *qset, void *pItem);
|
||||
|
||||
int32_t taosGetQueueItemsNumber(STaosQueue *queue);
|
||||
int32_t taosGetQsetItemsNumber(STaosQset *qset);
|
||||
|
||||
|
|
|
@ -27,33 +27,33 @@ typedef struct SWWorkerPool SWWorkerPool;
|
|||
|
||||
typedef struct SQWorker {
|
||||
int32_t id; // worker ID
|
||||
TdThread thread; // thread
|
||||
TdThread thread; // thread
|
||||
SQWorkerPool *pool;
|
||||
} SQWorker, SFWorker;
|
||||
} SQWorker;
|
||||
|
||||
typedef struct SQWorkerPool {
|
||||
int32_t max; // max number of workers
|
||||
int32_t min; // min number of workers
|
||||
int32_t num; // current number of workers
|
||||
STaosQset *qset;
|
||||
const char *name;
|
||||
SQWorker *workers;
|
||||
int32_t max; // max number of workers
|
||||
int32_t min; // min number of workers
|
||||
int32_t num; // current number of workers
|
||||
STaosQset *qset;
|
||||
const char *name;
|
||||
SQWorker *workers;
|
||||
TdThreadMutex mutex;
|
||||
} SQWorkerPool, SFWorkerPool;
|
||||
} SQWorkerPool;
|
||||
|
||||
typedef struct SWWorker {
|
||||
int32_t id; // worker id
|
||||
TdThread thread; // thread
|
||||
TdThread thread; // thread
|
||||
STaosQall *qall;
|
||||
STaosQset *qset; // queue set
|
||||
SWWorkerPool *pool;
|
||||
} SWWorker;
|
||||
|
||||
typedef struct SWWorkerPool {
|
||||
int32_t max; // max number of workers
|
||||
int32_t nextId; // from 0 to max-1, cyclic
|
||||
const char *name;
|
||||
SWWorker *workers;
|
||||
int32_t max; // max number of workers
|
||||
int32_t nextId; // from 0 to max-1, cyclic
|
||||
const char *name;
|
||||
SWWorker *workers;
|
||||
TdThreadMutex mutex;
|
||||
} SWWorkerPool;
|
||||
|
||||
|
@ -62,11 +62,6 @@ void tQWorkerCleanup(SQWorkerPool *pool);
|
|||
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp);
|
||||
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue);
|
||||
|
||||
int32_t tFWorkerInit(SFWorkerPool *pool);
|
||||
void tFWorkerCleanup(SFWorkerPool *pool);
|
||||
STaosQueue *tFWorkerAllocQueue(SFWorkerPool *pool, void *ahandle, FItem fp);
|
||||
void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue);
|
||||
|
||||
int32_t tWWorkerInit(SWWorkerPool *pool);
|
||||
void tWWorkerCleanup(SWWorkerPool *pool);
|
||||
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp);
|
||||
|
|
|
@ -30,7 +30,7 @@ typedef struct SVnodesMgmt {
|
|||
SVnodesStat state;
|
||||
STfs *pTfs;
|
||||
SQWorkerPool queryPool;
|
||||
SFWorkerPool fetchPool;
|
||||
SQWorkerPool fetchPool;
|
||||
SWWorkerPool syncPool;
|
||||
SWWorkerPool writePool;
|
||||
const char *path;
|
||||
|
|
|
@ -262,7 +262,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue);
|
||||
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue);
|
||||
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
|
||||
pVnode->pFetchQ = tFWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
|
||||
pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
|
||||
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
||||
|
||||
if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL ||
|
||||
|
@ -277,7 +277,7 @@ int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
|
||||
void vmFreeQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
|
||||
tFWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
||||
tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
||||
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
|
||||
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
|
||||
tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
|
||||
|
@ -303,11 +303,11 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
|
|||
pQPool->max = maxQueryThreads;
|
||||
if (tQWorkerInit(pQPool) != 0) return -1;
|
||||
|
||||
SFWorkerPool *pFPool = &pMgmt->fetchPool;
|
||||
SQWorkerPool *pFPool = &pMgmt->fetchPool;
|
||||
pFPool->name = "vnode-fetch";
|
||||
pFPool->min = minFetchThreads;
|
||||
pFPool->max = maxFetchThreads;
|
||||
if (tFWorkerInit(pFPool) != 0) return -1;
|
||||
if (tQWorkerInit(pFPool) != 0) return -1;
|
||||
|
||||
SWWorkerPool *pWPool = &pMgmt->writePool;
|
||||
pWPool->name = "vnode-write";
|
||||
|
@ -330,7 +330,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) {
|
|||
|
||||
void vmStopWorker(SVnodesMgmt *pMgmt) {
|
||||
dndCleanupWorker(&pMgmt->mgmtWorker);
|
||||
tFWorkerCleanup(&pMgmt->fetchPool);
|
||||
tQWorkerCleanup(&pMgmt->fetchPool);
|
||||
tQWorkerCleanup(&pMgmt->queryPool);
|
||||
tWWorkerCleanup(&pMgmt->writePool);
|
||||
tWWorkerCleanup(&pMgmt->syncPool);
|
||||
|
|
|
@ -68,7 +68,6 @@ STaosQueue *taosOpenQueue() {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
queue->threadId = -1;
|
||||
uDebug("queue:%p is opened", queue);
|
||||
return queue;
|
||||
}
|
||||
|
@ -437,53 +436,6 @@ int32_t taosReadAllQitemsFromQset(STaosQset *qset, STaosQall *qall, void **ahand
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t taosReadQitemFromQsetByThread(STaosQset *qset, void **ppItem, void **ahandle, FItem *itemFp, int32_t threadId) {
|
||||
STaosQnode *pNode = NULL;
|
||||
int32_t code = -1;
|
||||
|
||||
tsem_wait(&qset->sem);
|
||||
|
||||
taosThreadMutexLock(&qset->mutex);
|
||||
|
||||
for (int32_t i = 0; i < qset->numOfQueues; ++i) {
|
||||
if (qset->current == NULL) qset->current = qset->head;
|
||||
STaosQueue *queue = qset->current;
|
||||
if (queue) qset->current = queue->next;
|
||||
if (queue == NULL) break;
|
||||
if (queue->head == NULL) continue;
|
||||
if (queue->threadId != -1 && queue->threadId != threadId) {
|
||||
code = 0;
|
||||
continue;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&queue->mutex);
|
||||
|
||||
if (queue->head) {
|
||||
pNode = queue->head;
|
||||
pNode->queue = queue;
|
||||
queue->threadId = threadId;
|
||||
*ppItem = pNode->item;
|
||||
|
||||
if (ahandle) *ahandle = queue->ahandle;
|
||||
if (itemFp) *itemFp = queue->itemFp;
|
||||
|
||||
queue->head = pNode->next;
|
||||
if (queue->head == NULL) queue->tail = NULL;
|
||||
queue->numOfItems--;
|
||||
atomic_sub_fetch_32(&qset->numOfItems, 1);
|
||||
code = 1;
|
||||
uTrace("item:%p is read out from queue:%p, items:%d", *ppItem, queue, queue->numOfItems);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&queue->mutex);
|
||||
if (pNode) break;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&qset->mutex);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void taosResetQsetThread(STaosQset *qset, void *pItem) {
|
||||
if (pItem == NULL) return;
|
||||
STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
|
||||
|
|
|
@ -93,7 +93,7 @@ static void *tQWorkerThreadFp(SQWorker *worker) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
STaosQueue *tWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp, ThreadFp threadFp) {
|
||||
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
|
||||
taosThreadMutexLock(&pool->mutex);
|
||||
STaosQueue *queue = taosOpenQueue();
|
||||
if (queue == NULL) {
|
||||
|
@ -114,7 +114,7 @@ STaosQueue *tWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp, Threa
|
|||
taosThreadAttrInit(&thAttr);
|
||||
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
if (taosThreadCreate(&worker->thread, &thAttr, threadFp, worker) != 0) {
|
||||
if (taosThreadCreate(&worker->thread, &thAttr, (ThreadFp)tQWorkerThreadFp, worker) != 0) {
|
||||
uError("worker:%s:%d failed to create thread to process since %s", pool->name, worker->id, strerror(errno));
|
||||
taosCloseQueue(queue);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -134,58 +134,11 @@ STaosQueue *tWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp, Threa
|
|||
return queue;
|
||||
}
|
||||
|
||||
STaosQueue *tQWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
|
||||
return tWorkerAllocQueue(pool, ahandle, fp, (ThreadFp)tQWorkerThreadFp);
|
||||
}
|
||||
|
||||
void tQWorkerFreeQueue(SQWorkerPool *pool, STaosQueue *queue) {
|
||||
taosCloseQueue(queue);
|
||||
uDebug("worker:%s, queue:%p is freed", pool->name, queue);
|
||||
}
|
||||
|
||||
int32_t tFWorkerInit(SFWorkerPool *pool) { return tQWorkerInit((SQWorkerPool *)pool); }
|
||||
|
||||
void tFWorkerCleanup(SFWorkerPool *pool) { tQWorkerCleanup(pool); }
|
||||
|
||||
static void *tFWorkerThreadFp(SQWorker *worker) {
|
||||
SQWorkerPool *pool = worker->pool;
|
||||
|
||||
FItem fp = NULL;
|
||||
void *msg = NULL;
|
||||
void *ahandle = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
taosBlockSIGPIPE();
|
||||
setThreadName(pool->name);
|
||||
uDebug("worker:%s:%d is running", pool->name, worker->id);
|
||||
|
||||
while (1) {
|
||||
code = taosReadQitemFromQsetByThread(pool->qset, (void **)&msg, &ahandle, &fp, worker->id);
|
||||
|
||||
if (code < 0) {
|
||||
uDebug("worker:%s:%d qset:%p, got no message and exiting", pool->name, worker->id, pool->qset);
|
||||
break;
|
||||
} else if (code == 0) {
|
||||
// uTrace("worker:%s:%d qset:%p, got no message and continue", pool->name, worker->id, pool->qset);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (fp != NULL) {
|
||||
(*fp)(ahandle, msg);
|
||||
}
|
||||
|
||||
taosResetQsetThread(pool->qset, msg);
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
STaosQueue *tFWorkerAllocQueue(SQWorkerPool *pool, void *ahandle, FItem fp) {
|
||||
return tWorkerAllocQueue(pool, ahandle, fp, (ThreadFp)tQWorkerThreadFp);
|
||||
}
|
||||
|
||||
void tFWorkerFreeQueue(SFWorkerPool *pool, STaosQueue *queue) { tQWorkerFreeQueue(pool, queue); }
|
||||
|
||||
int32_t tWWorkerInit(SWWorkerPool *pool) {
|
||||
pool->nextId = 0;
|
||||
pool->workers = calloc(pool->max, sizeof(SWWorker));
|
||||
|
|
Loading…
Reference in New Issue