enh(stream): add ctrl queue to handle hb rsp, to avoid hb rsp not being confirmed if all stream threads are occupied.
This commit is contained in:
parent
d344228242
commit
30039bb53b
|
@ -37,6 +37,7 @@ typedef enum {
|
|||
SYNC_RD_QUEUE,
|
||||
STREAM_QUEUE,
|
||||
ARB_QUEUE,
|
||||
STREAM_CTRL_QUEUE,
|
||||
QUEUE_MAX,
|
||||
} EQueueType;
|
||||
|
||||
|
|
|
@ -32,6 +32,7 @@ typedef struct SVnodeMgmt {
|
|||
const char *name;
|
||||
SQueryAutoQWorkerPool queryPool;
|
||||
SAutoQWorkerPool streamPool;
|
||||
SWWorkerPool streamCtrlPool;
|
||||
SWWorkerPool fetchPool;
|
||||
SSingleWorker mgmtWorker;
|
||||
SSingleWorker mgmtMultiWorker;
|
||||
|
@ -73,6 +74,7 @@ typedef struct {
|
|||
SMultiWorker pApplyW;
|
||||
STaosQueue *pQueryQ;
|
||||
STaosQueue *pStreamQ;
|
||||
STaosQueue *pStreamCtrlQ;
|
||||
STaosQueue *pFetchQ;
|
||||
STaosQueue *pMultiMgmQ;
|
||||
} SVnodeObj;
|
||||
|
@ -134,6 +136,7 @@ int32_t vmPutMsgToSyncRdQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
|||
int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
|
|
|
@ -1022,7 +1022,7 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -395,9 +395,13 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal,
|
|||
while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);
|
||||
|
||||
tqNotifyClose(pVnode->pImpl->pTq);
|
||||
|
||||
dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ);
|
||||
while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);
|
||||
|
||||
dInfo("vgId:%d, wait for vnode stream ctrl queue:%p is empty", pVnode->vgId, pVnode->pStreamCtrlQ);
|
||||
while (!taosQueueEmpty(pVnode->pStreamCtrlQ)) taosMsleep(10);
|
||||
|
||||
dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);
|
||||
|
||||
dInfo("vgId:%d, post close", pVnode->vgId);
|
||||
|
|
|
@ -137,6 +137,34 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void vmProcessStreamCtrlQueue(SQueueInfo *pInfo, STaosQall* pQall, int32_t numOfItems) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
void *pItem = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
while (1) {
|
||||
if (taosGetQitem(pQall, &pItem) == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
SRpcMsg *pMsg = pItem;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
|
||||
dGTrace("vgId:%d, msg:%p get from vnode-ctrl-stream queue", pVnode->vgId, pMsg);
|
||||
code = vnodeProcessStreamCtrlMsg(pVnode->pImpl, pMsg, pInfo);
|
||||
if (code != 0) {
|
||||
terrno = code;
|
||||
dGError("vgId:%d, msg:%p failed to process stream ctrl msg %s since %s", pVnode->vgId, pMsg,
|
||||
TMSG_INFO(pMsg->msgType), tstrerror(code));
|
||||
vmSendRsp(pMsg, code);
|
||||
}
|
||||
|
||||
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
|
@ -245,6 +273,10 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
|||
dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg);
|
||||
code = taosWriteQitem(pVnode->pStreamQ, pMsg);
|
||||
break;
|
||||
case STREAM_CTRL_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-ctrl-stream queue", pVnode->vgId, pMsg);
|
||||
code = taosWriteQitem(pVnode->pStreamCtrlQ, pMsg);
|
||||
break;
|
||||
case FETCH_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg);
|
||||
code = taosWriteQitem(pVnode->pFetchQ, pMsg);
|
||||
|
@ -301,6 +333,8 @@ int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsg
|
|||
|
||||
int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_QUEUE); }
|
||||
|
||||
int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_CTRL_QUEUE); }
|
||||
|
||||
int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
dGTrace("msg:%p, put into vnode-multi-mgmt queue", pMsg);
|
||||
|
@ -373,6 +407,8 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
|||
case STREAM_QUEUE:
|
||||
size = taosQueueItemSize(pVnode->pStreamQ);
|
||||
break;
|
||||
case STREAM_CTRL_QUEUE:
|
||||
size = taosQueueItemSize(pVnode->pStreamCtrlQ);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -417,9 +453,11 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
pVnode->pQueryQ = tQueryAutoQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
||||
pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
|
||||
pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue);
|
||||
pVnode->pStreamCtrlQ = tWWorkerAllocQueue(&pMgmt->streamCtrlPool, pVnode, (FItems)vmProcessStreamCtrlQueue);
|
||||
|
||||
if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncRdW.queue == NULL ||
|
||||
pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL) {
|
||||
pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL
|
||||
|| pVnode->pStreamCtrlQ == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
@ -435,15 +473,19 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
dInfo("vgId:%d, fetch-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
|
||||
taosQueueGetThreadId(pVnode->pFetchQ));
|
||||
dInfo("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ);
|
||||
dInfo("vgId:%d, stream-ctrl-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pStreamCtrlQ,
|
||||
taosQueueGetThreadId(pVnode->pStreamCtrlQ));
|
||||
return 0;
|
||||
}
|
||||
|
||||
void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||
tQueryAutoQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
|
||||
tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ);
|
||||
tWWorkerFreeQueue(&pMgmt->streamCtrlPool, pVnode->pStreamCtrlQ);
|
||||
tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
||||
pVnode->pQueryQ = NULL;
|
||||
pVnode->pStreamQ = NULL;
|
||||
pVnode->pStreamCtrlQ = NULL;
|
||||
pVnode->pFetchQ = NULL;
|
||||
dDebug("vgId:%d, queue is freed", pVnode->vgId);
|
||||
}
|
||||
|
@ -463,6 +505,11 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
|||
pStreamPool->ratio = tsRatioOfVnodeStreamThreads;
|
||||
if ((code = tAutoQWorkerInit(pStreamPool)) != 0) return code;
|
||||
|
||||
SWWorkerPool *pStreamCtrlPool = &pMgmt->streamCtrlPool;
|
||||
pStreamCtrlPool->name = "vnode-ctrl-stream";
|
||||
pStreamCtrlPool->max = 1;
|
||||
if ((code = tWWorkerInit(pStreamCtrlPool)) != 0) return code;
|
||||
|
||||
SWWorkerPool *pFPool = &pMgmt->fetchPool;
|
||||
pFPool->name = "vnode-fetch";
|
||||
pFPool->max = tsNumOfVnodeFetchThreads;
|
||||
|
|
|
@ -112,6 +112,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
|||
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||
int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||
int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
|
||||
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
|
||||
void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit);
|
||||
|
|
|
@ -921,7 +921,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
}
|
||||
|
||||
int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||
vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
|
||||
vTrace("vgId:%d, msg:%p in stream queue is processing", pVnode->config.vgId, pMsg);
|
||||
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
|
||||
pMsg->msgType == TDMT_VND_BATCH_META) &&
|
||||
!syncIsReadyForRead(pVnode->sync)) {
|
||||
|
@ -954,8 +954,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
|
|||
return tqProcessTaskRetrieveTriggerReq(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_RETRIEVE_TRIGGER_RSP:
|
||||
return tqProcessTaskRetrieveTriggerRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_MND_STREAM_HEARTBEAT_RSP:
|
||||
return tqProcessStreamHbRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_MND_STREAM_REQ_CHKPT_RSP:
|
||||
return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_GET_STREAM_PROGRESS:
|
||||
|
@ -968,6 +966,24 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
|
|||
}
|
||||
}
|
||||
|
||||
int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||
vTrace("vgId:%d, msg:%p in stream ctrl queue is processing", pVnode->config.vgId, pMsg);
|
||||
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
|
||||
pMsg->msgType == TDMT_VND_BATCH_META) &&
|
||||
!syncIsReadyForRead(pVnode->sync)) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
|
||||
return 0;
|
||||
}
|
||||
|
||||
switch (pMsg->msgType) {
|
||||
case TDMT_MND_STREAM_HEARTBEAT_RSP:
|
||||
return tqProcessStreamHbRsp(pVnode->pTq, pMsg);
|
||||
default:
|
||||
vError("unknown msg type:%d in stream ctrl queue", pMsg->msgType);
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
||||
int32_t code = tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
|
||||
if (code) {
|
||||
|
|
Loading…
Reference in New Issue