Merge pull request #30103 from taosdata/fix/checkpoint
enh(stream): add checkpoint queue to execute the checkpoint procedure.
This commit is contained in:
commit
e5d678ca0b
|
@ -40,6 +40,7 @@ typedef enum {
|
|||
ARB_QUEUE,
|
||||
STREAM_CTRL_QUEUE,
|
||||
STREAM_LONG_EXEC_QUEUE,
|
||||
STREAM_CHKPT_QUEUE,
|
||||
QUEUE_MAX,
|
||||
} EQueueType;
|
||||
|
||||
|
|
|
@ -355,6 +355,7 @@
|
|||
TD_DEF_MSG_TYPE(TDMT_STREAM_DROP, "stream-drop", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE_TRIGGER, "stream-retri-trigger", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_CONSEN_CHKPT, "stream-consen-chkpt", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_CHKPT_EXEC, "stream-exec-chkpt", NULL, NULL)
|
||||
TD_CLOSE_MSG_SEG(TDMT_STREAM_MSG)
|
||||
|
||||
TD_NEW_MSG_SEG(TDMT_MON_MSG) //5 << 8
|
||||
|
|
|
@ -702,8 +702,8 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt);
|
|||
|
||||
int32_t streamExecTask(SStreamTask* pTask);
|
||||
int32_t streamResumeTask(SStreamTask* pTask);
|
||||
int32_t streamTrySchedExec(SStreamTask* pTask);
|
||||
int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType);
|
||||
int32_t streamTrySchedExec(SStreamTask* pTask, bool chkptExec);
|
||||
int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType, bool chkptExec);
|
||||
void streamTaskResumeInFuture(SStreamTask* pTask);
|
||||
void streamTaskClearSchedIdleInfo(SStreamTask* pTask);
|
||||
void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime);
|
||||
|
|
|
@ -47,7 +47,7 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pMsg);
|
|||
int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t smPutNodeMsgToWriteQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t smPutNodeMsgToStreamQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg);
|
||||
int32_t smPutNodeMsgToChkptQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -102,6 +102,8 @@ SArray *smGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_CHKPT_EXEC, smPutNodeMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
_OVER:
|
||||
if (code != 0) {
|
||||
|
|
|
@ -162,6 +162,9 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
|
|||
case WRITE_QUEUE:
|
||||
code = smPutNodeMsgToWriteQueue(pMgmt, pMsg);
|
||||
break;
|
||||
case STREAM_CHKPT_QUEUE:
|
||||
code = smPutNodeMsgToStreamQueue(pMgmt, pMsg);
|
||||
break;
|
||||
default:
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
|
@ -172,7 +175,6 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
|
|||
}
|
||||
|
||||
int32_t smPutNodeMsgToMgmtQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
int32_t code = 0;
|
||||
SMultiWorker *pWorker = taosArrayGetP(pMgmt->writeWroker, 0);
|
||||
if (pWorker == NULL) {
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
|
@ -198,3 +200,10 @@ int32_t smPutNodeMsgToStreamQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
|
||||
return taosWriteQitem(pWorker->queue, pMsg);
|
||||
}
|
||||
|
||||
//int32_t smPutNodeMsgToChkptQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
// SSingleWorker *pWorker = &pMgmt->chkptWorker;
|
||||
//
|
||||
// dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
|
||||
// return taosWriteQitem(pWorker->queue, pMsg);
|
||||
//}
|
||||
|
|
|
@ -34,6 +34,7 @@ typedef struct SVnodeMgmt {
|
|||
SAutoQWorkerPool streamPool;
|
||||
SAutoQWorkerPool streamLongExecPool;
|
||||
SWWorkerPool streamCtrlPool;
|
||||
SWWorkerPool streamChkPool;
|
||||
SWWorkerPool fetchPool;
|
||||
SSingleWorker mgmtWorker;
|
||||
SSingleWorker mgmtMultiWorker;
|
||||
|
@ -77,6 +78,7 @@ typedef struct {
|
|||
STaosQueue *pStreamQ;
|
||||
STaosQueue *pStreamCtrlQ;
|
||||
STaosQueue *pStreamLongExecQ;
|
||||
STaosQueue *pStreamChkQ;
|
||||
STaosQueue *pFetchQ;
|
||||
STaosQueue *pMultiMgmQ;
|
||||
} SVnodeObj;
|
||||
|
@ -141,6 +143,7 @@ int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
|||
int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToStreamLongExecQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
|
||||
int32_t vmPutMsgToStreamChkQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
|
|
|
@ -1022,6 +1022,7 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamCtrlQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY, vmPutMsgToStreamLongExecQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_CHKPT_EXEC, vmPutMsgToStreamChkQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -407,6 +407,9 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal,
|
|||
pVnode->pStreamLongExecQ, taosQueueItemSize(pVnode->pStreamLongExecQ));
|
||||
while (!taosQueueEmpty(pVnode->pStreamLongExecQ)) taosMsleep(50);
|
||||
|
||||
dInfo("vgId:%d, wait for vnode stream chkpt queue:%p is empty", pVnode->vgId, pVnode->pStreamChkQ);
|
||||
while (!taosQueueEmpty(pVnode->pStreamChkQ)) taosMsleep(10);
|
||||
|
||||
dInfo("vgId:%d, all vnode queues is empty", pVnode->vgId);
|
||||
|
||||
dInfo("vgId:%d, post close", pVnode->vgId);
|
||||
|
|
|
@ -165,6 +165,34 @@ static void vmProcessStreamCtrlQueue(SQueueInfo *pInfo, STaosQall* pQall, int32_
|
|||
}
|
||||
}
|
||||
|
||||
static void vmProcessStreamChkptQueue(SQueueInfo *pInfo, STaosQall* pQall, int32_t numOfItems) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
void *pItem = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
while (1) {
|
||||
if (taosGetQitem(pQall, &pItem) == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
SRpcMsg *pMsg = pItem;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
|
||||
dGTrace("vgId:%d, msg:%p get from vnode-stream-chkpt queue", pVnode->vgId, pMsg);
|
||||
code = vnodeProcessStreamChkptMsg(pVnode->pImpl, pMsg, pInfo);
|
||||
if (code != 0) {
|
||||
terrno = code;
|
||||
dGError("vgId:%d, msg:%p failed to process stream chkpt msg %s since %s", pVnode->vgId, pMsg,
|
||||
TMSG_INFO(pMsg->msgType), tstrerror(code));
|
||||
vmSendRsp(pMsg, code);
|
||||
}
|
||||
|
||||
dGTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
static void vmProcessStreamLongExecQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||
SVnodeObj *pVnode = pInfo->ahandle;
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
|
@ -301,6 +329,10 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
|||
dGTrace("vgId:%d, msg:%p put into vnode-stream-long-exec queue", pVnode->vgId, pMsg);
|
||||
code = taosWriteQitem(pVnode->pStreamLongExecQ, pMsg);
|
||||
break;
|
||||
case STREAM_CHKPT_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-stream-chkpt queue", pVnode->vgId, pMsg);
|
||||
code = taosWriteQitem(pVnode->pStreamChkQ, pMsg);
|
||||
break;
|
||||
case FETCH_QUEUE:
|
||||
dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg);
|
||||
code = taosWriteQitem(pVnode->pFetchQ, pMsg);
|
||||
|
@ -361,6 +393,8 @@ int32_t vmPutMsgToStreamCtrlQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmP
|
|||
|
||||
int32_t vmPutMsgToStreamLongExecQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_LONG_EXEC_QUEUE); }
|
||||
|
||||
int32_t vmPutMsgToStreamChkQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_CHKPT_QUEUE); }
|
||||
|
||||
int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
dGTrace("msg:%p, put into vnode-multi-mgmt queue", pMsg);
|
||||
|
@ -439,6 +473,8 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
|||
case STREAM_LONG_EXEC_QUEUE:
|
||||
size = taosQueueItemSize(pVnode->pStreamLongExecQ);
|
||||
break;
|
||||
case STREAM_CHKPT_QUEUE:
|
||||
size = taosQueueItemSize(pVnode->pStreamChkQ);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -487,10 +523,11 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
pVnode->pStreamQ = tAutoQWorkerAllocQueue(&pMgmt->streamPool, pVnode, (FItem)vmProcessStreamQueue, 2);
|
||||
pVnode->pStreamCtrlQ = tWWorkerAllocQueue(&pMgmt->streamCtrlPool, pVnode, (FItems)vmProcessStreamCtrlQueue);
|
||||
pVnode->pStreamLongExecQ = tAutoQWorkerAllocQueue(&pMgmt->streamLongExecPool, pVnode, (FItem)vmProcessStreamLongExecQueue, 1);
|
||||
pVnode->pStreamChkQ = tWWorkerAllocQueue(&pMgmt->streamChkPool, pVnode, (FItems)vmProcessStreamChkptQueue);
|
||||
|
||||
if (pVnode->pWriteW.queue == NULL || pVnode->pSyncW.queue == NULL || pVnode->pSyncRdW.queue == NULL ||
|
||||
pVnode->pApplyW.queue == NULL || pVnode->pQueryQ == NULL || pVnode->pStreamQ == NULL || pVnode->pFetchQ == NULL
|
||||
|| pVnode->pStreamCtrlQ == NULL || pVnode->pStreamLongExecQ == NULL) {
|
||||
|| pVnode->pStreamCtrlQ == NULL || pVnode->pStreamLongExecQ == NULL || pVnode->pStreamChkQ == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
@ -509,6 +546,8 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
dInfo("vgId:%d, stream-long-exec-queue:%p is alloced", pVnode->vgId, pVnode->pStreamLongExecQ);
|
||||
dInfo("vgId:%d, stream-ctrl-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pStreamCtrlQ,
|
||||
taosQueueGetThreadId(pVnode->pStreamCtrlQ));
|
||||
dInfo("vgId:%d, stream-chk-queue:%p is alloced, thread:%08" PRId64, pVnode->vgId, pVnode->pStreamChkQ,
|
||||
taosQueueGetThreadId(pVnode->pStreamChkQ));
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -517,6 +556,7 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
tAutoQWorkerFreeQueue(&pMgmt->streamPool, pVnode->pStreamQ);
|
||||
tAutoQWorkerFreeQueue(&pMgmt->streamLongExecPool, pVnode->pStreamLongExecQ);
|
||||
tWWorkerFreeQueue(&pMgmt->streamCtrlPool, pVnode->pStreamCtrlQ);
|
||||
tWWorkerFreeQueue(&pMgmt->streamChkPool, pVnode->pStreamChkQ);
|
||||
tWWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ);
|
||||
pVnode->pQueryQ = NULL;
|
||||
pVnode->pFetchQ = NULL;
|
||||
|
@ -525,6 +565,8 @@ void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
|||
pVnode->pStreamCtrlQ = NULL;
|
||||
pVnode->pStreamLongExecQ = NULL;
|
||||
|
||||
pVnode->pStreamChkQ = NULL;
|
||||
pVnode->pFetchQ = NULL;
|
||||
dDebug("vgId:%d, queue is freed", pVnode->vgId);
|
||||
}
|
||||
|
||||
|
@ -554,6 +596,11 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) {
|
|||
pStreamCtrlPool->max = 1;
|
||||
if ((code = tWWorkerInit(pStreamCtrlPool)) != 0) return code;
|
||||
|
||||
SWWorkerPool *pStreamChkPool = &pMgmt->streamChkPool;
|
||||
pStreamChkPool->name = "vnode-stream-chkpt";
|
||||
pStreamChkPool->max = 1;
|
||||
if ((code = tWWorkerInit(pStreamChkPool)) != 0) return code;
|
||||
|
||||
SWWorkerPool *pFPool = &pMgmt->fetchPool;
|
||||
pFPool->name = "vnode-fetch";
|
||||
pFPool->max = tsNumOfVnodeFetchThreads;
|
||||
|
@ -587,6 +634,7 @@ void vmStopWorker(SVnodeMgmt *pMgmt) {
|
|||
tAutoQWorkerCleanup(&pMgmt->streamPool);
|
||||
tAutoQWorkerCleanup(&pMgmt->streamLongExecPool);
|
||||
tWWorkerCleanup(&pMgmt->streamCtrlPool);
|
||||
tWWorkerCleanup(&pMgmt->streamChkPool);
|
||||
tWWorkerCleanup(&pMgmt->fetchPool);
|
||||
dDebug("vnode workers are closed");
|
||||
}
|
||||
|
|
|
@ -92,7 +92,7 @@ FAIL:
|
|||
}
|
||||
|
||||
int32_t sndInit(SSnode *pSnode) {
|
||||
if (streamTaskSchedTask(&pSnode->msgCb, pSnode->pMeta->vgId, 0, 0, STREAM_EXEC_T_START_ALL_TASKS) != 0) {
|
||||
if (streamTaskSchedTask(&pSnode->msgCb, pSnode->pMeta->vgId, 0, 0, STREAM_EXEC_T_START_ALL_TASKS, false) != 0) {
|
||||
sndError("failed to start all tasks");
|
||||
}
|
||||
return 0;
|
||||
|
@ -138,6 +138,8 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
|
|||
return tqStreamTaskProcessRetrieveTriggerReq(pSnode->pMeta, pMsg);
|
||||
case TDMT_STREAM_RETRIEVE_TRIGGER_RSP:
|
||||
return tqStreamTaskProcessRetrieveTriggerRsp(pSnode->pMeta, pMsg);
|
||||
case TDMT_STREAM_CHKPT_EXEC:
|
||||
return tqStreamTaskProcessRunReq(pSnode->pMeta, pMsg, true);
|
||||
default:
|
||||
sndError("invalid snode msg:%d", pMsg->msgType);
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
|
|
|
@ -114,6 +114,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
|||
int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||
int32_t vnodeProcessStreamCtrlMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||
int32_t vnodeProcessStreamLongExecMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||
int32_t vnodeProcessStreamChkptMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
|
||||
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
|
||||
void vnodeProposeCommitOnNeed(SVnode *pVnode, bool atExit);
|
||||
|
|
|
@ -165,7 +165,7 @@ static void doStartScanWal(void* param, void* tmrId) {
|
|||
taosMsleep(10000);
|
||||
#endif
|
||||
|
||||
code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
|
||||
code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA, false);
|
||||
if (code) {
|
||||
tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
|
||||
}
|
||||
|
@ -216,7 +216,7 @@ void tqScanWalAsync(STQ* pTq) {
|
|||
}
|
||||
|
||||
int32_t tqStopStreamAllTasksAsync(SStreamMeta* pMeta, SMsgCb* pMsgCb) {
|
||||
return streamTaskSchedTask(pMsgCb, pMeta->vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS);
|
||||
return streamTaskSchedTask(pMsgCb, pMeta->vgId, 0, 0, STREAM_EXEC_T_STOP_ALL_TASKS, false);
|
||||
}
|
||||
|
||||
int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
|
||||
|
@ -322,7 +322,7 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) {
|
|||
// check whether input queue is full or not
|
||||
if (streamQueueIsFull(pTask->inputq.queue)) {
|
||||
tqTrace("s-task:%s input queue is full, launch task without scanning wal", pTask->id.idStr);
|
||||
int32_t code = streamTrySchedExec(pTask);
|
||||
int32_t code = streamTrySchedExec(pTask, false);
|
||||
if (code) {
|
||||
tqError("s-task:%s failed to start task while inputQ is full", pTask->id.idStr);
|
||||
}
|
||||
|
@ -461,7 +461,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks) {
|
|||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
if ((numOfItems > 0) || hasNewData) {
|
||||
code = streamTrySchedExec(pTask);
|
||||
code = streamTrySchedExec(pTask, false);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||
taosArrayDestroy(pTaskList);
|
||||
|
|
|
@ -131,7 +131,7 @@ int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
|
|||
tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks);
|
||||
|
||||
int32_t type = restart ? STREAM_EXEC_T_RESTART_ALL_TASKS : STREAM_EXEC_T_START_ALL_TASKS;
|
||||
return streamTaskSchedTask(cb, vgId, 0, 0, type);
|
||||
return streamTaskSchedTask(cb, vgId, 0, 0, type, false);
|
||||
}
|
||||
|
||||
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) {
|
||||
|
@ -143,7 +143,7 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream
|
|||
}
|
||||
|
||||
tqDebug("vgId:%d start task:0x%x async", vgId, taskId);
|
||||
return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK);
|
||||
return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK, false);
|
||||
}
|
||||
|
||||
// this is to process request from transaction, always return true.
|
||||
|
@ -1222,7 +1222,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
|
|||
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
|
||||
// code = tqScanWalAsync((STQ*)handle, false);
|
||||
} else {
|
||||
code = streamTrySchedExec(pTask);
|
||||
code = streamTrySchedExec(pTask, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -940,6 +940,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
|
||||
int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||
vTrace("vgId:%d, msg:%p in stream queue is processing", pVnode->config.vgId, pMsg);
|
||||
|
||||
// todo: NOTE: some command needs to run on follower, such as, stop_all_tasks
|
||||
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
|
||||
pMsg->msgType == TDMT_VND_BATCH_META) &&
|
||||
!syncIsReadyForRead(pVnode->sync)) {
|
||||
|
@ -1016,6 +1018,24 @@ int32_t vnodeProcessStreamLongExecMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo
|
|||
}
|
||||
}
|
||||
|
||||
int32_t vnodeProcessStreamChkptMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||
vTrace("vgId:%d, msg:%p in stream chkpt queue is processing", pVnode->config.vgId, pMsg);
|
||||
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
|
||||
pMsg->msgType == TDMT_VND_BATCH_META) &&
|
||||
!syncIsReadyForRead(pVnode->sync)) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
|
||||
return 0;
|
||||
}
|
||||
|
||||
switch (pMsg->msgType) {
|
||||
case TDMT_STREAM_CHKPT_EXEC:
|
||||
return tqProcessTaskRunReq(pVnode->pTq, pMsg);
|
||||
default:
|
||||
vError("unknown msg type:%d in stream chkpt queue", pMsg->msgType);
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
}
|
||||
}
|
||||
|
||||
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
||||
int32_t code = tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data);
|
||||
if (code) {
|
||||
|
|
|
@ -597,7 +597,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
|
|||
streamMetaWUnLock(pMeta);
|
||||
|
||||
tqInfo("vgId:%d stream task already loaded, start them", vgId);
|
||||
int32_t code = streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS);
|
||||
int32_t code = streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS, false);
|
||||
if (code != 0) {
|
||||
tqError("vgId:%d failed to sched stream task, code:%s", vgId, tstrerror(code));
|
||||
}
|
||||
|
|
|
@ -717,7 +717,7 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
|
|||
// The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution
|
||||
// of restart in timer thread will result in a deadlock.
|
||||
int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) {
|
||||
return streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK);
|
||||
return streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK, false);
|
||||
}
|
||||
|
||||
static void doCleanup(SStreamTask* pTask, SArray* pNotReadyList, SArray* pTimeoutList, void* param) {
|
||||
|
|
|
@ -93,7 +93,7 @@ int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, i
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
return streamTrySchedExec(pTask);
|
||||
return streamTrySchedExec(pTask, true);
|
||||
}
|
||||
|
||||
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) {
|
||||
|
|
|
@ -1834,6 +1834,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
|||
int32_t status = 0;
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
const char* id = pTask->id.idStr;
|
||||
bool chkptMsg = false;
|
||||
|
||||
stDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64 ", msgId:%d", id,
|
||||
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen, pReq->msgId);
|
||||
|
@ -1863,6 +1864,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
|||
// blocked. Note that there is no race condition here.
|
||||
if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
||||
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
|
||||
chkptMsg = true;
|
||||
stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId);
|
||||
} else if (pReq->type == STREAM_INPUT__TRANS_STATE) {
|
||||
stDebug("s-task:%s recv trans-state msgId:%d from upstream:0x%x", id, pReq->msgId, pReq->upstreamTaskId);
|
||||
|
@ -1890,5 +1892,5 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
|||
tmsgSendRsp(pRsp);
|
||||
}
|
||||
|
||||
return streamTrySchedExec(pTask);
|
||||
return streamTrySchedExec(pTask, chkptMsg);
|
||||
}
|
||||
|
|
|
@ -144,11 +144,20 @@ void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem
|
|||
|
||||
// let's try the ordinary input q
|
||||
pQueue->qItem = NULL;
|
||||
int32_t num = taosGetQitem(pQueue->qall, &pQueue->qItem);
|
||||
int32_t code = taosGetQitem(pQueue->qall, &pQueue->qItem);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to extract data from inputQ, code:%s", id, tstrerror(code));
|
||||
}
|
||||
|
||||
if (pQueue->qItem == NULL) {
|
||||
num = taosReadAllQitems(pQueue->pQueue, pQueue->qall);
|
||||
num = taosGetQitem(pQueue->qall, &pQueue->qItem);
|
||||
code = taosReadAllQitems(pQueue->pQueue, pQueue->qall);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to read qitem into qall, code:%s", id, tstrerror(code));
|
||||
}
|
||||
code = taosGetQitem(pQueue->qall, &pQueue->qItem);
|
||||
if (code) {
|
||||
stError("s-task:%s failed to extract data from inputQ(qall), code:%s", id, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
*pItem = streamQueueCurItem(pQueue);
|
||||
|
|
|
@ -93,17 +93,21 @@ void streamSetupScheduleTrigger(SStreamTask* pTask) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t streamTrySchedExec(SStreamTask* pTask) {
|
||||
int32_t streamTrySchedExec(SStreamTask* pTask, bool chkptQueue) {
|
||||
if (streamTaskSetSchedStatusWait(pTask)) {
|
||||
return streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pTask->id.streamId, pTask->id.taskId, 0);
|
||||
return streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pTask->id.streamId, pTask->id.taskId, 0, chkptQueue);
|
||||
} else {
|
||||
stTrace("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus);
|
||||
if (chkptQueue) {
|
||||
stWarn("s-task:%s not launch task in chkpt queue, may delay checkpoint procedure", pTask->id.idStr);
|
||||
} else {
|
||||
stTrace("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType) {
|
||||
int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType, bool chkptExec) {
|
||||
int32_t code = 0;
|
||||
int32_t tlen = 0;
|
||||
|
||||
|
@ -142,10 +146,18 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3
|
|||
stDebug("vgId:%d create msg to exec, type:%d, %s", vgId, execType, streamTaskGetExecType(execType));
|
||||
}
|
||||
|
||||
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = buf, .contLen = tlen + sizeof(SMsgHead)};
|
||||
code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId);
|
||||
if (chkptExec) {
|
||||
SRpcMsg msg = {.msgType = TDMT_STREAM_CHKPT_EXEC, .pCont = buf, .contLen = tlen + sizeof(SMsgHead)};
|
||||
code = tmsgPutToQueue(pMsgCb, STREAM_CHKPT_QUEUE, &msg);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to put msg into stream chkpt queue, code:%s, %x", vgId, tstrerror(code), taskId);
|
||||
}
|
||||
} else {
|
||||
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = buf, .contLen = tlen + sizeof(SMsgHead)};
|
||||
code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId);
|
||||
}
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -191,12 +203,17 @@ void streamTaskResumeHelper(void* param, void* tmrId) {
|
|||
return;
|
||||
}
|
||||
|
||||
code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK);
|
||||
code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK,
|
||||
(p.state == TASK_STATUS__CK));
|
||||
if (code) {
|
||||
stError("s-task:%s sched task failed, code:%s", pId->idStr, tstrerror(code));
|
||||
} else {
|
||||
stDebug("trigger to resume s-task:%s after idled for %dms", pId->idStr, pTask->status.schedIdleTime);
|
||||
|
||||
if (p.state == TASK_STATUS__CK) {
|
||||
stDebug("trigger to resume s-task:%s in stream chkpt queue after idled for %dms", pId->idStr,
|
||||
pTask->status.schedIdleTime);
|
||||
} else {
|
||||
stDebug("trigger to resume s-task:%s after idled for %dms", pId->idStr, pTask->status.schedIdleTime);
|
||||
}
|
||||
// release the task ref count
|
||||
streamTaskClearSchedIdleInfo(pTask);
|
||||
}
|
||||
|
@ -339,7 +356,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
|
|||
}
|
||||
}
|
||||
|
||||
code = streamTrySchedExec(pTask);
|
||||
code = streamTrySchedExec(pTask, false);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("s-task:%s failed to sched to run, wait for next time", pTask->id.idStr);
|
||||
}
|
||||
|
|
|
@ -1194,7 +1194,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
|
|||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
return streamTrySchedExec(pTask);
|
||||
return streamTrySchedExec(pTask, false);
|
||||
}
|
||||
|
||||
void streamTaskSetRemoveBackendFiles(SStreamTask* pTask) { pTask->status.removeBackendFiles = true; }
|
||||
|
|
|
@ -22,6 +22,8 @@ from util.cluster import *
|
|||
# should be used by -N option
|
||||
class TDTestCase:
|
||||
updatecfgDict = {'checkpointInterval': 60 ,
|
||||
'vdebugflag':143,
|
||||
'ddebugflag':143
|
||||
}
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
self.replicaVar = int(replicaVar)
|
||||
|
|
Loading…
Reference in New Issue