Merge pull request #29544 from taosdata/fix/tag

refactor(stream): do some internal refactoring.

commit 3089e3d904
@@ -41,6 +41,7 @@ typedef struct SStreamUpstreamEpInfo {
   SEpSet  epSet;
   bool    dataAllowed;  // denote if the data from this upstream task is allowed to put into inputQ, not serialize it
   int64_t stage;        // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer
+  int64_t lastMsgId;
 } SStreamUpstreamEpInfo;

 int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo);
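The new lastMsgId field gives each upstream ep-info a watermark for the last dispatch message accepted
from that upstream, so retried duplicates can be detected and dropped. A minimal sketch of the rule this
commit applies on the receive path (names taken from the streamProcessDispatchMsg hunk further down;
locking and error handling elided):

    // accept only messages newer than the per-upstream watermark;
    // ack duplicates so the sender stops retrying
    if (pReq->msgId > pInfo->lastMsgId) {
      status = streamTaskAppendInputBlocks(pTask, pReq);
      if (status == TASK_INPUT_STATUS__NORMAL) {
        pInfo->lastMsgId = pReq->msgId;  // advance the watermark only on success
      }
    } else {
      status = TASK_INPUT_STATUS__NORMAL;  // duplicate: discard, report success
    }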
@@ -37,6 +37,7 @@ typedef enum {
   SYNC_RD_QUEUE,
   STREAM_QUEUE,
   ARB_QUEUE,
+  STREAM_CTRL_QUEUE,
   QUEUE_MAX,
 } EQueueType;

@@ -32,6 +32,7 @@ typedef struct SVnodeMgmt {
   const char           *name;
   SQueryAutoQWorkerPool queryPool;
   SAutoQWorkerPool      streamPool;
+  SWWorkerPool          streamCtrlPool;
   SWWorkerPool          fetchPool;
   SSingleWorker         mgmtWorker;
   SSingleWorker         mgmtMultiWorker;

@@ -73,6 +74,7 @@ typedef struct {
   SMultiWorker pApplyW;
   STaosQueue  *pQueryQ;
   STaosQueue  *pStreamQ;
+  STaosQueue  *pStreamCtrlQ;
   STaosQueue  *pFetchQ;
   STaosQueue  *pMultiMgmQ;
 } SVnodeObj;

@@ -134,6 +136,7 @@ int32_t vmPutMsgToSyncRdQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
 int32_t vmPutMsgToQueryQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
 int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
 int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
+int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
 int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
 int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
 int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
@@ -1022,7 +1022,7 @@ SArray *vmGetMsgHandles() {
   if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_TRIGGER_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
-  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
+  if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
   if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
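Of the stream-related messages, only TDMT_MND_STREAM_HEARTBEAT_RSP is rerouted to the new control
queue here; checkpoint and progress messages stay on the regular stream queue. The apparent intent is
that heartbeat handling must stay responsive even when the stream data queue is backed up with heavy
task messages.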
@@ -395,9 +395,13 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal,
   while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10);

   tqNotifyClose(pVnode->pImpl->pTq);

   dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ);
   while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);

+  dInfo("vgId:%d, wait for vnode stream ctrl queue:%p is empty", pVnode->vgId, pVnode->pStreamCtrlQ);
+  while (!taosQueueEmpty(pVnode->pStreamCtrlQ)) taosMsleep(10);
+
   dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);

   dInfo("vgId:%d, post close", pVnode->vgId);
@@ -137,6 +137,34 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
   taosFreeQitem(pMsg);
 }

+static void vmProcessStreamCtrlQueue(SQueueInfo *pInfo, STaosQall *pQall, int32_t numOfItems) {
+  SVnodeObj *pVnode = pInfo->ahandle;
+  void      *pItem = NULL;
+  int32_t    code = 0;
+
+  while (1) {
+    if (taosGetQitem(pQall, &pItem) == 0) {
+      break;
+    }
+
+    SRpcMsg        *pMsg = pItem;
+    const STraceId *trace = &pMsg->info.traceId;
+
+    dGTrace("vgId:%d, msg:%p get from vnode-ctrl-stream queue", pVnode->vgId, pMsg);
+    code = vnodeProcessStreamCtrlMsg(pVnode->pImpl, pMsg, pInfo);
+    if (code != 0) {
+      terrno = code;
+      dGError("vgId:%d, msg:%p failed to process stream ctrl msg %s since %s", pVnode->vgId, pMsg,
+              TMSG_INFO(pMsg->msgType), tstrerror(code));
+      vmSendRsp(pMsg, code);
+    }
+
+    dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
+    rpcFreeCont(pMsg->pCont);
+    taosFreeQitem(pMsg);
+  }
+}
+
 static void vmProcessFetchQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
   SVnodeObj *pVnode = pInfo->ahandle;
   SRpcMsg   *pMsg = NULL;
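Unlike vmProcessStreamQueue, which is registered as an FItem callback and receives one message per
invocation, the control handler uses the FItems batch form: the worker hands over the whole drained
batch as an STaosQall, and the callback loops with taosGetQitem() and frees every item itself. This
matches how vmProcessFetchQueue is written, and pairs with the write-worker (WWorker) pool the control
queue is allocated from further down.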
@@ -245,6 +273,10 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
       dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg);
       code = taosWriteQitem(pVnode->pStreamQ, pMsg);
       break;
+    case STREAM_CTRL_QUEUE:
+      dGTrace("vgId:%d, msg:%p put into vnode-ctrl-stream queue", pVnode->vgId, pMsg);
+      code = taosWriteQitem(pVnode->pStreamCtrlQ, pMsg);
+      break;
     case FETCH_QUEUE:
       dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg);
       code = taosWriteQitem(pVnode->pFetchQ, pMsg);

@@ -301,6 +333,8 @@ int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsg

 int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_QUEUE); }

+int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_CTRL_QUEUE); }
+
 int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
   const STraceId *trace = &pMsg->info.traceId;
   dGTrace("msg:%p, put into vnode-multi-mgmt queue", pMsg);

@@ -373,6 +407,8 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
     case STREAM_QUEUE:
       size = taosQueueItemSize(pVnode->pStreamQ);
       break;
+    case STREAM_CTRL_QUEUE:
+      size = taosQueueItemSize(pVnode->pStreamCtrlQ);
     default:
       break;
   }

@@ -417,9 +453,11 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
   pVnode->pQueryQ = tQueryAutoQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
   pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue);
   pVnode->pFetchQ = tWWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItems)vmProcessFetchQueue);
+  pVnode->pStreamCtrlQ = tWWorkerAllocQueue(&pMgmt->streamCtrlPool, pVnode, (FItems)vmProcessStreamCtrlQueue);

   if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncRdW.queue == NULL ||
-      pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL) {
+      pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL
+      || pVnode->pStreamCtrlQ == NULL) {
     return TSDB_CODE_OUT_OF_MEMORY;
   }

@@ -435,15 +473,19 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
   dInfo("vgId:%d, fetch-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pFetchQ,
         taosQueueGetThreadId(pVnode->pFetchQ));
   dInfo("vgId:%d, stream-queue:%p is alloced", pVnode->vgId, pVnode->pStreamQ);
+  dInfo("vgId:%d, stream-ctrl-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pStreamCtrlQ,
+        taosQueueGetThreadId(pVnode->pStreamCtrlQ));
   return 0;
 }

 void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
   tQueryAutoQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ);
   tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ);
+  tWWorkerFreeQueue(&pMgmt->streamCtrlPool, pVnode->pStreamCtrlQ);
   tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
   pVnode->pQueryQ = NULL;
   pVnode->pStreamQ = NULL;
+  pVnode->pStreamCtrlQ = NULL;
   pVnode->pFetchQ = NULL;
   dDebug("vgId:%d, queue is freed", pVnode->vgId);
 }

@@ -463,6 +505,11 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
   pStreamPool->ratio = tsRatioOfVnodeStreamThreads;
   if ((code = tAutoQWorkerInit(pStreamPool)) != 0) return code;

+  SWWorkerPool *pStreamCtrlPool = &pMgmt->streamCtrlPool;
+  pStreamCtrlPool->name = "vnode-ctrl-stream";
+  pStreamCtrlPool->max = 1;
+  if ((code = tWWorkerInit(pStreamCtrlPool)) != 0) return code;
+
   SWWorkerPool *pFPool = &pMgmt->fetchPool;
   pFPool->name = "vnode-fetch";
   pFPool->max = tsNumOfVnodeFetchThreads;

@@ -495,5 +542,6 @@ void vmStopWorker(SVnodeMgmt *pMgmt) {
   tQueryAutoQWorkerCleanup(&pMgmt->queryPool);
   tAutoQWorkerCleanup(&pMgmt->streamPool);
   tWWorkerCleanup(&pMgmt->fetchPool);
+  tWWorkerCleanup(&pMgmt->streamCtrlPool);
   dDebug("vnode workers are closed");
 }
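Two details are worth noting in the worker wiring above. First, the control pool is a write-worker
(WWorker) pool capped at max = 1, which appears to dedicate a single worker thread to control
messages, so they are never starved by the auto-scaling stream data pool. Second, the new
STREAM_CTRL_QUEUE case in vmGetQueueSize falls through into default without its own break; the
default case only breaks, so the computed size is still returned correctly, but an explicit break
there would make the intent clearer.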
@@ -112,6 +112,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
 int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
 int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
 int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
+int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
 void    vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
 void    vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
 void    vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit);
@@ -1179,6 +1179,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
   return code;
 }

+// todo: add test case for invalid rsp for resume: injection error for always return error
 int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* msg, bool fromVnode) {
   SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
@@ -930,7 +930,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
 }

 int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
-  vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
+  vTrace("vgId:%d, msg:%p in stream queue is processing", pVnode->config.vgId, pMsg);
   if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
        pMsg->msgType == TDMT_VND_BATCH_META) &&
       !syncIsReadyForRead(pVnode->sync)) {

@@ -963,8 +963,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
       return tqProcessTaskRetrieveTriggerReq(pVnode->pTq, pMsg);
     case TDMT_STREAM_RETRIEVE_TRIGGER_RSP:
       return tqProcessTaskRetrieveTriggerRsp(pVnode->pTq, pMsg);
-    case TDMT_MND_STREAM_HEARTBEAT_RSP:
-      return tqProcessStreamHbRsp(pVnode->pTq, pMsg);
     case TDMT_MND_STREAM_REQ_CHKPT_RSP:
       return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg);
     case TDMT_VND_GET_STREAM_PROGRESS:

@@ -977,6 +975,24 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
   }
 }

+int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
+  vTrace("vgId:%d, msg:%p in stream ctrl queue is processing", pVnode->config.vgId, pMsg);
+  if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
+       pMsg->msgType == TDMT_VND_BATCH_META) &&
+      !syncIsReadyForRead(pVnode->sync)) {
+    vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
+    return 0;
+  }
+
+  switch (pMsg->msgType) {
+    case TDMT_MND_STREAM_HEARTBEAT_RSP:
+      return tqProcessStreamHbRsp(pVnode->pTq, pMsg);
+    default:
+      vError("unknown msg type:%d in stream ctrl queue", pMsg->msgType);
+      return TSDB_CODE_APP_ERROR;
+  }
+}
+
 void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
   int32_t code = tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
   if (code) {
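Together, these hunks move TDMT_MND_STREAM_HEARTBEAT_RSP out of vnodeProcessStreamMsg and into the
new vnodeProcessStreamCtrlMsg, which mirrors the data-path handler: the same not-ready-for-read
redirect guard up front, then a switch that currently accepts only the heartbeat response. The vTrace
fix in the first hunk corrects a copy-paste leftover that logged the stream queue as the fetch queue.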
@@ -38,7 +38,7 @@ extern "C" {
 #define META_HB_SEND_IDLE_COUNTER          25  // send hb every 5 sec
 #define STREAM_TASK_KEY_LEN                ((sizeof(int64_t)) << 1)
 #define STREAM_TASK_QUEUE_CAPACITY         5120
-#define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30)
+#define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (10)

 // clang-format off
 #define stFatal(...) do { if (stDebugFlag & DEBUG_FATAL) { taosPrintLog("STM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
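Shrinking STREAM_TASK_QUEUE_CAPACITY_IN_SIZE from 30 to 10 tightens the size-based bound on a task's
input queue to a third of its previous value, which should make back-pressure toward upstream tasks
kick in earlier. The item-count bound (STREAM_TASK_QUEUE_CAPACITY, 5120) is unchanged.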
@@ -527,6 +527,76 @@ static void cleanupInMonitor(int32_t taskId, int64_t taskRefId, void* param) {
   streamTaskFreeRefId(param);
 }

+static int32_t sendFailedDispatchData(SStreamTask* pTask, int64_t now) {
+  int32_t           code = 0;
+  const char*       id = pTask->id.idStr;
+  SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
+
+  streamMutexLock(&pMsgInfo->lock);
+
+  int32_t             msgId = pMsgInfo->msgId;
+  SStreamDispatchReq* pReq = pTask->msgInfo.pData;
+
+  if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
+    stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch to down streams, msgId:%d", id, pTask->info.selfChildId,
+            msgId);
+
+    int32_t numOfRetry = 0;
+    for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) {
+      SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, i);
+      if (pEntry == NULL) {
+        continue;
+      }
+
+      if (pEntry->status == TSDB_CODE_SUCCESS && pEntry->rspTs > 0) {
+        continue;
+      }
+
+      // downstream not rsp yet beyond threshold that is 10s
+      if (isDispatchRspTimeout(pEntry, now)) {  // not respond yet beyonds 30s, re-send data
+        doSendFailedDispatch(pTask, pEntry, now, "timeout");
+        numOfRetry += 1;
+        continue;
+      }
+
+      // downstream inputQ is closed
+      if (pEntry->status == TASK_INPUT_STATUS__BLOCKED) {
+        doSendFailedDispatch(pTask, pEntry, now, "downstream inputQ blocked");
+        numOfRetry += 1;
+        continue;
+      }
+
+      // handle other errors
+      if (pEntry->status != TSDB_CODE_SUCCESS) {
+        doSendFailedDispatch(pTask, pEntry, now, "downstream error");
+        numOfRetry += 1;
+      }
+    }
+
+    stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfRetry,
+            msgId);
+  } else {
+    int32_t dstVgId = pTask->outputInfo.fixedDispatcher.nodeId;
+    SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet;
+    int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
+
+    int32_t         s = taosArrayGetSize(pTask->msgInfo.pSendInfo);
+    SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, 0);
+    if (pEntry != NULL) {
+      setResendInfo(pEntry, now);
+      code = doSendDispatchMsg(pTask, pReq, dstVgId, pEpSet);
+
+      stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d, code:%s", id,
+              pTask->info.selfChildId, 1, downstreamTaskId, dstVgId, msgId, tstrerror(code));
+    } else {
+      stError("s-task:%s invalid index 0, size:%d", id, s);
+    }
+  }
+
+  streamMutexUnlock(&pMsgInfo->lock);
+  return code;
+}
+
 static void doMonitorDispatchData(void* param, void* tmrId) {
   int32_t code = 0;
   int64_t now = taosGetTimestampMs();
@@ -590,65 +660,7 @@ static void doMonitorDispatchData(void* param, void* tmrId) {
     return;
   }

-  {
-    SStreamDispatchReq* pReq = pTask->msgInfo.pData;
-
-    if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
-      stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch to down streams, msgId:%d", id,
-              pTask->info.selfChildId, msgId);
-
-      int32_t numOfRetry = 0;
-      for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) {
-        SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, i);
-        if (pEntry == NULL) {
-          continue;
-        }
-
-        if (pEntry->status == TSDB_CODE_SUCCESS && pEntry->rspTs > 0) {
-          continue;
-        }
-
-        // downstream not rsp yet beyond threshold that is 10s
-        if (isDispatchRspTimeout(pEntry, now)) {  // not respond yet beyonds 30s, re-send data
-          doSendFailedDispatch(pTask, pEntry, now, "timeout");
-          numOfRetry += 1;
-          continue;
-        }
-
-        // downstream inputQ is closed
-        if (pEntry->status == TASK_INPUT_STATUS__BLOCKED) {
-          doSendFailedDispatch(pTask, pEntry, now, "downstream inputQ blocked");
-          numOfRetry += 1;
-          continue;
-        }
-
-        // handle other errors
-        if (pEntry->status != TSDB_CODE_SUCCESS) {
-          doSendFailedDispatch(pTask, pEntry, now, "downstream error");
-          numOfRetry += 1;
-        }
-      }
-
-      stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr,
-              numOfRetry, msgId);
-    } else {
-      int32_t dstVgId = pTask->outputInfo.fixedDispatcher.nodeId;
-      SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet;
-      int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
-
-      int32_t s = taosArrayGetSize(pTask->msgInfo.pSendInfo);
-      SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, 0);
-      if (pEntry != NULL) {
-        setResendInfo(pEntry, now);
-        code = doSendDispatchMsg(pTask, pReq, dstVgId, pEpSet);
-
-        stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d, code:%s", id,
-                pTask->info.selfChildId, 1, downstreamTaskId, dstVgId, msgId, tstrerror(code));
-      } else {
-        stError("s-task:%s invalid index 0, size:%d", id, s);
-      }
-    }
-  }
+  code = sendFailedDispatchData(pTask, now);

   if (streamTaskShouldStop(pTask)) {
     stDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
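This is an extract-function refactor of the retry pass in the dispatch monitor timer. The body moves
essentially verbatim into sendFailedDispatchData(); the one difference visible in this diff is that
the helper explicitly takes pMsgInfo->lock around the whole pass, so the send-info entries cannot
change underneath the retry loop while dispatch responses arrive concurrently.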
@@ -880,7 +892,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {

   code = sendDispatchMsg(pTask, pTask->msgInfo.pData);

-  // todo: secure the timerActive and start timer in after lock pTask->lock
+  // todo: start timer in after lock pTask->lock
   streamMutexLock(&pTask->lock);
   bool shouldStop = streamTaskShouldStop(pTask);
   streamMutexUnlock(&pTask->lock);

@@ -890,7 +902,6 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
   } else {
     streamMutexLock(&pTask->msgInfo.lock);
     if (pTask->msgInfo.inMonitor == 0) {
-      // int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
       stDebug("s-task:%s start dispatch monitor tmr in %dms, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS,
               tstrerror(code));
       streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
@@ -1842,6 +1853,9 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
     return TSDB_CODE_STREAM_TASK_NOT_EXIST;
   }

+  stDebug("s-task:%s lastMsgId:%" PRId64 " for upstream taskId:0x%x(vgId:%d)", id, pInfo->lastMsgId, pReq->upstreamTaskId,
+          pReq->upstreamNodeId);
+
   if (pMeta->role == NODE_ROLE_FOLLOWER) {
     stError("s-task:%s task on follower received dispatch msgs, dispatch msg rejected", id);
     status = TASK_INPUT_STATUS__REFUSED;

@@ -1866,7 +1880,21 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
         stDebug("s-task:%s recv trans-state msgId:%d from upstream:0x%x", id, pReq->msgId, pReq->upstreamTaskId);
       }

+      if (pReq->msgId > pInfo->lastMsgId) {
         status = streamTaskAppendInputBlocks(pTask, pReq);
+        if (status == TASK_INPUT_STATUS__NORMAL) {
+          stDebug("s-task:%s update the lastMsgId from %" PRId64 " to %d", id, pInfo->lastMsgId, pReq->msgId);
+          pInfo->lastMsgId = pReq->msgId;
+        } else {
+          stDebug("s-task:%s not update the lastMsgId, remain:%" PRId64, id, pInfo->lastMsgId);
+        }
+      } else {
+        stWarn(
+            "s-task:%s duplicate msgId:%d from upstream:0x%x discard and return succ, from vgId:%d already recv "
+            "msgId:%" PRId64,
+            id, pReq->msgId, pReq->upstreamTaskId, pReq->upstreamNodeId, pInfo->lastMsgId);
+        status = TASK_INPUT_STATUS__NORMAL;  // still return success
+      }
     }
   }
 }
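This is the receive-side half of the lastMsgId mechanism: a duplicate dispatch (msgId at or below the
watermark) is logged, discarded, and acknowledged as success, since a retried message usually means
the earlier acknowledgement was lost, not the data. The watermark only advances when the blocks are
actually appended, so a message rejected with a blocked or failed inputQ can still be accepted on a
later retry.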
@@ -281,7 +281,6 @@ void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo,
   SStreamMeta*      pMeta = pTask->pMeta;
   SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;

-  // int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
   int32_t code = streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);

   if (code) {

@@ -300,7 +299,6 @@ void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, i
   SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;

   if (streamTaskShouldStop(pTask)) {  // record the failure
-    // int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
     stDebug("s-task:0x%" PRIx64 " stopped, not launch rel history task:0x%" PRIx64, pInfo->id.taskId,
             pInfo->hTaskId.taskId);
@@ -100,6 +100,7 @@ static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
   pEpInfo->nodeId = pTask->info.nodeId;
   pEpInfo->taskId = pTask->id.taskId;
   pEpInfo->stage = -1;
+  pEpInfo->lastMsgId = -1;

   return pEpInfo;
 }
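Initializing lastMsgId to -1 makes the very first dispatch from an upstream always pass the
"pReq->msgId > pInfo->lastMsgId" check, whatever value the sender's msgId counter starts from, as
long as real message ids are non-negative.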