refact queue

This commit is contained in:
Shengliang Guan 2022-03-22 16:31:21 +08:00
parent a340baa879
commit f48fddc7c4
4 changed files with 4 additions and 5 deletions

View File

@ -44,7 +44,6 @@ typedef struct STaosQset STaosQset;
typedef struct STaosQall STaosQall; typedef struct STaosQall STaosQall;
typedef struct { typedef struct {
void *ahandle; void *ahandle;
int32_t qsize;
int32_t workerId; int32_t workerId;
int32_t threadNum; int32_t threadNum;
} SQueueInfo; } SQueueInfo;

View File

@ -101,14 +101,14 @@ static void dmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
int32_t dmStartWorker(SDnodeMgmt *pMgmt) { int32_t dmStartWorker(SDnodeMgmt *pMgmt) {
SQWorkerAllCfg mgmtCfg = { SQWorkerAllCfg mgmtCfg = {
.minNum = 0, .maxNum = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt}; .minNum = 1, .maxNum = 1, .name = "dnode-mgmt", .fp = (FItem)dmProcessQueue, .param = pMgmt};
if (tQWorkerAllInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) { if (tQWorkerAllInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) {
dError("failed to start dnode mgmt worker since %s", terrstr()); dError("failed to start dnode mgmt worker since %s", terrstr());
return -1; return -1;
} }
SQWorkerAllCfg statusCfg = { SQWorkerAllCfg statusCfg = {
.minNum = 0, .maxNum = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt}; .minNum = 1, .maxNum = 1, .name = "dnode-status", .fp = (FItem)dmProcessQueue, .param = pMgmt};
if (tQWorkerAllInit(&pMgmt->statusWorker, &statusCfg) != 0) { if (tQWorkerAllInit(&pMgmt->statusWorker, &statusCfg) != 0) {
dError("failed to start dnode status worker since %s", terrstr()); dError("failed to start dnode status worker since %s", terrstr());
return -1; return -1;

View File

@ -127,7 +127,7 @@ int32_t qmStartWorker(SQnodeMgmt *pMgmt) {
.fp = (FItem)qmProcessFetchQueue, .fp = (FItem)qmProcessFetchQueue,
.param = pMgmt}; .param = pMgmt};
if (tQWorkerAllInit(&pMgmt->queryWorker, &fetchCfg) != 0) { if (tQWorkerAllInit(&pMgmt->fetchWorker, &fetchCfg) != 0) {
dError("failed to start qnode-fetch worker since %s", terrstr()); dError("failed to start qnode-fetch worker since %s", terrstr());
return -1; return -1;
} }

View File

@ -212,7 +212,7 @@ static void *tWWorkerThreadFp(SWWorker *worker) {
if (fp != NULL) { if (fp != NULL) {
SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num}; SQueueInfo info = {.ahandle = ahandle, .workerId = worker->id, .threadNum = pool->num};
(*fp)(ahandle, worker->qall, numOfMsgs); (*fp)(&info, worker->qall, numOfMsgs);
} }
} }