refact worker
This commit is contained in:
parent
4fc74ced94
commit
bb5eef2dcf
|
@ -110,7 +110,7 @@ typedef struct {
|
||||||
int32_t totalVnodes;
|
int32_t totalVnodes;
|
||||||
SRWLatch latch;
|
SRWLatch latch;
|
||||||
SQWorkerPool queryPool;
|
SQWorkerPool queryPool;
|
||||||
SQWorkerPool fetchPool;
|
SFWorkerPool fetchPool;
|
||||||
SWWorkerPool syncPool;
|
SWWorkerPool syncPool;
|
||||||
SWWorkerPool writePool;
|
SWWorkerPool writePool;
|
||||||
} SVnodesMgmt;
|
} SVnodesMgmt;
|
||||||
|
|
|
@ -920,7 +920,7 @@ static int32_t dndInitVnodeWorkers(SDnode *pDnode) {
|
||||||
pPool->name = "vnode-fetch";
|
pPool->name = "vnode-fetch";
|
||||||
pPool->min = minFetchThreads;
|
pPool->min = minFetchThreads;
|
||||||
pPool->max = maxFetchThreads;
|
pPool->max = maxFetchThreads;
|
||||||
if (tQWorkerInit(pPool) != 0) return -1;
|
if (tFWorkerInit(pPool) != 0) return -1;
|
||||||
|
|
||||||
SWWorkerPool *pMPool = &pMgmt->writePool;
|
SWWorkerPool *pMPool = &pMgmt->writePool;
|
||||||
pMPool->name = "vnode-write";
|
pMPool->name = "vnode-write";
|
||||||
|
@ -938,7 +938,7 @@ static int32_t dndInitVnodeWorkers(SDnode *pDnode) {
|
||||||
|
|
||||||
static void dndCleanupVnodeWorkers(SDnode *pDnode) {
|
static void dndCleanupVnodeWorkers(SDnode *pDnode) {
|
||||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
||||||
tQWorkerCleanup(&pMgmt->fetchPool);
|
tFWorkerCleanup(&pMgmt->fetchPool);
|
||||||
tQWorkerCleanup(&pMgmt->queryPool);
|
tQWorkerCleanup(&pMgmt->queryPool);
|
||||||
tWWorkerCleanup(&pMgmt->writePool);
|
tWWorkerCleanup(&pMgmt->writePool);
|
||||||
tWWorkerCleanup(&pMgmt->syncPool);
|
tWWorkerCleanup(&pMgmt->syncPool);
|
||||||
|
@ -951,7 +951,7 @@ static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
||||||
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)dndProcessVnodeWriteQueue);
|
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)dndProcessVnodeWriteQueue);
|
||||||
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)dndProcessVnodeApplyQueue);
|
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)dndProcessVnodeApplyQueue);
|
||||||
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)dndProcessVnodeSyncQueue);
|
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)dndProcessVnodeSyncQueue);
|
||||||
pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)dndProcessVnodeFetchQueue);
|
pVnode->pFetchQ = tFWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)dndProcessVnodeFetchQueue);
|
||||||
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)dndProcessVnodeQueryQueue);
|
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)dndProcessVnodeQueryQueue);
|
||||||
|
|
||||||
if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL ||
|
if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL ||
|
||||||
|
@ -966,7 +966,7 @@ static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
||||||
static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) {
|
||||||
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
|
||||||
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
|
tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
|
||||||
tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
tFWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
||||||
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
|
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ);
|
||||||
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
|
tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ);
|
||||||
tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
|
tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ);
|
||||||
|
|
Loading…
Reference in New Issue