refact queue

This commit is contained in:
Shengliang Guan 2022-03-22 14:38:26 +08:00
parent fe8714fe56
commit 4091771274
5 changed files with 108 additions and 18 deletions

View File

@ -67,6 +67,38 @@ void tWWorkerCleanup(SWWorkerPool *pool);
STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp); STaosQueue *tWWorkerAllocQueue(SWWorkerPool *pool, void *ahandle, FItems fp);
void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue); void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue);
typedef struct {
const char *name;
int32_t minNum;
int32_t maxNum;
FItem fp;
void *param;
} SQWorkerAllCfg;
typedef struct {
const char *name;
STaosQueue *queue;
SQWorkerPool pool;
} SQWorkerAll;
typedef struct {
const char *name;
int32_t maxNum;
FItems fp;
void *param;
} SWWorkerAllCfg;
typedef struct {
const char *name;
STaosQueue *queue;
SWWorkerPool pool;
} SWWorkerAll;
int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg);
void tQWorkerAllCleanup(SQWorkerAll *pWorker);
int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg);
void tWWorkerAllCleanup(SWWorkerAll *pWorker);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -28,9 +28,9 @@ typedef struct SMnodeMgmt {
SDnode *pDnode; SDnode *pDnode;
SMgmtWrapper *pWrapper; SMgmtWrapper *pWrapper;
const char *path; const char *path;
SDnodeWorker readWorker; SQWorkerAll readWorker;
SDnodeWorker writeWorker; SQWorkerAll writeWorker;
SDnodeWorker syncWorker; SQWorkerAll syncWorker;
SReplica replicas[TSDB_MAX_REPLICA]; SReplica replicas[TSDB_MAX_REPLICA];
int8_t replica; int8_t replica;
int8_t selfIndex; int8_t selfIndex;

View File

@ -42,9 +42,9 @@ static void mmProcessQueue(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SNodeMsg *pMsg) { static int32_t mmPutMsgToWorker(SMnodeMgmt *pMgmt, SQWorkerAll *pWorker, SNodeMsg *pMsg) {
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
return dndWriteMsgToWorker(pWorker, pMsg); return taosWriteQitem(pWorker->queue, pMsg);
} }
int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) { int32_t mmProcessWriteMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
@ -59,7 +59,7 @@ int32_t mmProcessReadMsg(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg); return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
} }
static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRpcMsg *pRpc) { static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SQWorkerAll *pWorker, SRpcMsg *pRpc) {
SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg)); SNodeMsg *pMsg = taosAllocateQitem(sizeof(SNodeMsg));
if (pMsg == NULL) { if (pMsg == NULL) {
return -1; return -1;
@ -68,7 +68,7 @@ static int32_t mmPutRpcMsgToWorker(SMnodeMgmt *pMgmt, SDnodeWorker *pWorker, SRp
dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); dTrace("msg:%p, is created and put into worker:%s, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType));
pMsg->rpcMsg = *pRpc; pMsg->rpcMsg = *pRpc;
int32_t code = dndWriteMsgToWorker(pWorker, pMsg); int32_t code = taosWriteQitem(pWorker->queue, pMsg);
if (code != 0) { if (code != 0) {
dTrace("msg:%p, is freed", pMsg); dTrace("msg:%p, is freed", pMsg);
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
@ -89,18 +89,20 @@ int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
} }
int32_t mmStartWorker(SMnodeMgmt *pMgmt) { int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
if (dndInitWorker(pMgmt, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmProcessQueue) != 0) { SQWorkerAllCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt};
dError("failed to start mnode read worker since %s", terrstr());
if (tQWorkerAllInit(&pMgmt->readWorker, &cfg) != 0) {
dError("failed to start mnode-read worker since %s", terrstr());
return -1; return -1;
} }
if (dndInitWorker(pMgmt, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, mmProcessQueue) != 0) { if (tQWorkerAllInit(&pMgmt->writeWorker, &cfg) != 0) {
dError("failed to start mnode write worker since %s", terrstr()); dError("failed to start mnode-write worker since %s", terrstr());
return -1; return -1;
} }
if (dndInitWorker(pMgmt, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, mmProcessQueue) != 0) { if (tQWorkerAllInit(&pMgmt->syncWorker, &cfg) != 0) {
dError("failed to start mnode sync worker since %s", terrstr()); dError("failed to start mnode sync-worker since %s", terrstr());
return -1; return -1;
} }
@ -108,7 +110,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
} }
void mmStopWorker(SMnodeMgmt *pMgmt) { void mmStopWorker(SMnodeMgmt *pMgmt) {
dndCleanupWorker(&pMgmt->readWorker); tQWorkerAllCleanup(&pMgmt->readWorker);
dndCleanupWorker(&pMgmt->writeWorker); tQWorkerAllCleanup(&pMgmt->writeWorker);
dndCleanupWorker(&pMgmt->syncWorker); tQWorkerAllCleanup(&pMgmt->syncWorker);
} }

View File

@ -441,7 +441,6 @@ void taosResetQsetThread(STaosQset *qset, void *pItem) {
STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode)); STaosQnode *pNode = (STaosQnode *)((char *)pItem - sizeof(STaosQnode));
taosThreadMutexLock(&qset->mutex); taosThreadMutexLock(&qset->mutex);
pNode->queue->threadId = -1;
for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) { for (int32_t i = 0; i < pNode->queue->numOfItems; ++i) {
tsem_post(&qset->sem); tsem_post(&qset->sem);
} }

View File

@ -22,7 +22,7 @@ typedef void *(*ThreadFp)(void *param);
int32_t tQWorkerInit(SQWorkerPool *pool) { int32_t tQWorkerInit(SQWorkerPool *pool) {
pool->qset = taosOpenQset(); pool->qset = taosOpenQset();
pool->workers = calloc(sizeof(SQWorker), pool->max); pool->workers = calloc(pool->max, sizeof(SQWorker));
if (pool->workers == NULL) { if (pool->workers == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
@ -279,3 +279,60 @@ void tWWorkerFreeQueue(SWWorkerPool *pool, STaosQueue *queue) {
taosCloseQueue(queue); taosCloseQueue(queue);
uDebug("worker:%s, queue:%p is freed", pool->name, queue); uDebug("worker:%s, queue:%p is freed", pool->name, queue);
} }
int32_t tQWorkerAllInit(SQWorkerAll *pWorker, const SQWorkerAllCfg *pCfg) {
SQWorkerPool *pPool = &pWorker->pool;
pPool->name = pCfg->name;
pPool->min = pCfg->minNum;
pPool->max = pCfg->maxNum;
if (tQWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pWorker->queue = tQWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
if (pWorker->queue == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pWorker->name = pCfg->name;
return 0;
}
void tQWorkerAllCleanup(SQWorkerAll *pWorker) {
if (pWorker->queue == NULL) return;
while (!taosQueueEmpty(pWorker->queue)) {
taosMsleep(10);
}
tQWorkerCleanup(&pWorker->pool);
tQWorkerFreeQueue(&pWorker->pool, pWorker->queue);
}
int32_t tWWorkerAllInit(SWWorkerAll *pWorker, const SWWorkerAllCfg *pCfg) {
SWWorkerPool *pPool = &pWorker->pool;
pPool->name = pCfg->name;
pPool->max = pCfg->maxNum;
if (tWWorkerInit(pPool) != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pWorker->queue = tWWorkerAllocQueue(pPool, pCfg->param, pCfg->fp);
if (pWorker->queue == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pWorker->name = pCfg->name;
return 0;
}
void tWWorkerAllCleanup(SWWorkerAll *pWorker) {
if (pWorker->queue == NULL) return;
while (!taosQueueEmpty(pWorker->queue)) {
taosMsleep(10);
}
tWWorkerCleanup(&pWorker->pool);
tWWorkerFreeQueue(&pWorker->pool, pWorker->queue);
}