diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 90ff1a36c0..b71886de2d 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -256,16 +256,13 @@ enum { TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY, "stream-scan-history", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY_FINISH, "stream-scan-history-finish", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "stream-task-check", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT_READY, "stream-checkpoint-ready", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESTORE_CHECKPOINT, "stream-restore-checkpoint", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_PAUSE, "stream-task-pause", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_MON_MSG) TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL) @@ -304,8 +301,10 @@ enum { TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) // TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY_FINISH, "vnode-stream-scan-history-finish", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_UPDATE, "vnode-stream-update", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 3ef3347547..8d5d5d224c 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -543,9 +543,16 @@ typedef struct { int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp); int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp); -typedef struct { +typedef struct STaskStatusEntry { + int64_t streamId; + int32_t taskId; + int32_t status; +} STaskStatusEntry; + +typedef struct SStreamHbMsg { int32_t vgId; int32_t numOfTasks; + SArray* pTaskStatus; // SArray } SStreamHbMsg; int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp); diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index e48ac2ca20..13b81231d4 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -77,10 +77,10 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; code = 0; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 14704eca2d..c72ddbf8b9 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -737,10 +737,10 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; // if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 9bcded0c04..1ec89bdd2b 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -41,8 +41,11 @@ typedef struct SNodeEntry { } SNodeEntry; typedef struct SStreamVnodeRevertIndex { - SArray *pNodeEntryList; - int64_t ts; // snapshot ts + SArray *pNodeEntryList; + int64_t ts; // snapshot ts + SHashObj *pTaskMap; + SArray *pTaskList; + TdThreadMutex lock; } SStreamVnodeRevertIndex; typedef struct SVgroupChangeInfo { @@ -74,6 +77,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in int64_t streamId, int32_t taskId); static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); +static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode); static SArray *doExtractNodeListFromStream(SMnode *pMnode); static SArray *mndTakeVgroupSnapshot(SMnode *pMnode); @@ -106,7 +110,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); - // mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); + mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq); @@ -118,6 +122,10 @@ int32_t mndInitStream(SMnode *pMnode) { mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask); + taosThreadMutexInit(&execNodeList.lock, NULL); + execNodeList.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); + execNodeList.pTaskList = taosArrayInit(4, sizeof(STaskStatusEntry)); + return sdbSetTable(pMnode->pSdb, table); } @@ -853,6 +861,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { mndTransDrop(pTrans); + keepStreamTasksInBuf(pStream, &execNodeList); code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: @@ -1123,13 +1132,14 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SStreamObj *pStream = NULL; int32_t code = 0; - { + { // check if the node update happens or not int64_t ts = taosGetTimestampSec(); if (execNodeList.pNodeEntryList == NULL || (taosArrayGetSize(execNodeList.pNodeEntryList) == 0)) { if (execNodeList.pNodeEntryList != NULL) { execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList); } + execNodeList.pNodeEntryList = doExtractNodeListFromStream(pMnode); } @@ -1154,6 +1164,26 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { } } + {// check if all tasks are in TASK_STATUS__NORMAL status + bool ready = true; + + taosThreadMutexLock(&execNodeList.lock); + for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) { + STaskStatusEntry* p = taosArrayGet(execNodeList.pTaskList, i); + if (p->status != TASK_STATUS__NORMAL) { + mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued", + p->streamId, p->taskId, 0, streamGetTaskStatusStr(p->status)); + ready = false; + break; + } + } + taosThreadMutexUnlock(&execNodeList.lock); + + if (!ready) { + return 0; + } + } + SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont; int64_t checkpointId = pMsg->checkpointId; @@ -2109,6 +2139,7 @@ static SArray *doExtractNodeListFromStream(SMnode *pMnode) { return plist; } +// this function runs by only one thread, so it is not multi-thread safe static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1); if (old != 0) { @@ -2124,6 +2155,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { if (execNodeList.pNodeEntryList != NULL) { execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList); } + execNodeList.pNodeEntryList = doExtractNodeListFromStream(pMnode); } @@ -2172,85 +2204,118 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { return 0; } +static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode) { + int32_t level = taosArrayGetSize(pStream->tasks); + for (int32_t i = 0; i < level; i++) { + SArray *pLevel = taosArrayGetP(pStream->tasks, i); + + int32_t numOfTasks = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < numOfTasks; j++) { + SStreamTask *pTask = taosArrayGetP(pLevel, j); + int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; + + void* p = taosHashGet(pExecNode->pTaskMap, keys, sizeof(keys)); + if (p == NULL) { + STaskStatusEntry entry = { + .streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .status = TASK_STATUS__STOP}; + taosArrayPush(pExecNode->pTaskList, &entry); + + int32_t ordinal = taosArrayGetSize(pExecNode->pTaskList) - 1; + taosHashPut(pExecNode->pTaskMap, keys, sizeof(keys), &ordinal, sizeof(ordinal)); + } + } + } +} + // todo: this process should be executed by the write queue worker of the mnode -// int32_t mndProcessStreamHb(SRpcMsg *pReq) { -// SMnode *pMnode = pReq->info.node; -// SSdb *pSdb = pMnode->pSdb; -// SStreamHbMsg req = {0}; -// int32_t code = TSDB_CODE_SUCCESS; -// -// SDecoder decoder = {0}; -// tDecoderInit(&decoder, (uint8_t *)pReq->pCont, pReq->contLen); -// -// if (tStartDecode(&decoder) < 0) return -1; -// -// if (tDecodeStreamHbMsg(&decoder, &req) < 0) { -// terrno = TSDB_CODE_INVALID_MSG; -// return -1; -// } -// + int32_t mndProcessStreamHb(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + SStreamHbMsg req = {0}; + int32_t code = TSDB_CODE_SUCCESS; + + SDecoder decoder = {0}; + tDecoderInit(&decoder, (uint8_t *)pReq->pCont, pReq->contLen); + + if (tStartDecode(&decoder) < 0) return -1; + + if (tDecodeStreamHbMsg(&decoder, &req) < 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + // int64_t now = taosGetTimestampSec(); -// mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); -// -// // timeout list -// bool nodeChanged = false; + mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); + + taosThreadMutexLock(&execNodeList.lock); + for(int32_t i = 0; i < req.numOfTasks; ++i) { + STaskStatusEntry* p = taosArrayGet(req.pTaskStatus, i); + int64_t k[2] = {p->streamId, p->taskId}; + int32_t index = *(int32_t*) taosHashGet(execNodeList.pTaskMap, &k, sizeof(k)); + + STaskStatusEntry* pStatusEntry = taosArrayGet(execNodeList.pTaskList, index); + pStatusEntry->status = p->status; + } + taosThreadMutexUnlock(&execNodeList.lock); + + // bool nodeChanged = false; // SArray* pList = taosArrayInit(4, sizeof(int32_t)); -// -// // record the timeout node -// for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) { -// SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i); -// int64_t duration = now - pEntry->hbTimestamp; -// if (duration > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next -// taosArrayPush(pList, &pEntry); -// mWarn("nodeId:%d stream node timeout, since last hb:%"PRId64"s", pEntry->nodeId, duration); -// continue; +/* + // record the timeout node + for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) { + SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i); + int64_t duration = now - pEntry->hbTimestamp; + if (duration > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next + taosArrayPush(pList, &pEntry); + mWarn("nodeId:%d stream node timeout, since last hb:%"PRId64"s", pEntry->nodeId, duration); + continue; + } + + if (pEntry->nodeId != req.vgId) { + continue; + } + + pEntry->hbTimestamp = now; + + // check epset to identify whether the node has been transferred to other dnodes. + // node the epset is changed, which means the node transfer has occurred for this node. +// if (!isEpsetEqual(&pEntry->epset, &req.epset)) { +// nodeChanged = true; +// break; // } -// -// if (pEntry->nodeId != req.vgId) { -// continue; -// } -// -// pEntry->hbTimestamp = now; -// -// // check epset to identify whether the node has been transferred to other dnodes. -// // node the epset is changed, which means the node transfer has occurred for this node. -//// if (!isEpsetEqual(&pEntry->epset, &req.epset)) { -//// nodeChanged = true; -//// break; -//// } -// } -// -// // todo handle the node timeout case. Once the vnode is off-line, we should check the dnode status from mnode, -// // to identify whether the dnode is truely offline or not. -// -// // handle the node changed case -// if (!nodeChanged) { -// return TSDB_CODE_SUCCESS; -// } -// -// int32_t nodeId = req.vgId; -// -// {// check all streams that involved this vnode should update the epset info -// SStreamObj *pStream = NULL; -// void *pIter = NULL; -// while (1) { -// pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); -// if (pIter == NULL) { -// break; -// } -// -// // update the related upstream and downstream tasks, todo remove this, no need this function -// taosWLockLatch(&pStream->lock); -//// streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset); -//// streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset); -// taosWUnLockLatch(&pStream->lock); -// -//// code = createStreamUpdateTrans(pMnode, pStream, nodeId, ); -//// if (code != TSDB_CODE_SUCCESS) { -//// todo + } + + // todo handle the node timeout case. Once the vnode is off-line, we should check the dnode status from mnode, + // to identify whether the dnode is truely offline or not. + + // handle the node changed case + if (!nodeChanged) { + return TSDB_CODE_SUCCESS; + } + + int32_t nodeId = req.vgId; + + {// check all streams that involved this vnode should update the epset info + SStreamObj *pStream = NULL; + void *pIter = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); + if (pIter == NULL) { + break; + } + + // update the related upstream and downstream tasks, todo remove this, no need this function + taosWLockLatch(&pStream->lock); +// streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset); +// streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset); + taosWUnLockLatch(&pStream->lock); + +// code = createStreamUpdateTrans(pMnode, pStream, nodeId, ); +// if (code != TSDB_CODE_SUCCESS) { +// todo //// } // } -// } -// -// return TSDB_CODE_SUCCESS; -//} + } +*/ + return TSDB_CODE_SUCCESS; +} diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index fd7f3d6be3..46ccec9f9e 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -411,13 +411,13 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { return sndProcessTaskRetrieveReq(pSnode, pMsg); case TDMT_STREAM_RETRIEVE_RSP: return sndProcessTaskRetrieveRsp(pSnode, pMsg); - case TDMT_STREAM_SCAN_HISTORY_FINISH: + case TDMT_VND_STREAM_SCAN_HISTORY_FINISH: return sndProcessStreamTaskScanHistoryFinishReq(pSnode, pMsg); - case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP: + case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP: return sndProcessTaskRecoverFinishRsp(pSnode, pMsg); - case TDMT_STREAM_TASK_CHECK: + case TDMT_VND_STREAM_TASK_CHECK: return sndProcessStreamTaskCheckReq(pSnode, pMsg); - case TDMT_STREAM_TASK_CHECK_RSP: + case TDMT_VND_STREAM_TASK_CHECK_RSP: return sndProcessStreamTaskCheckRsp(pSnode, pMsg); default: ASSERT(0); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f0974fb487..4e26dcd0d0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1045,9 +1045,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64 - " child id:%d, level:%d, fill-history:%d, trigger:%" PRId64 " ms", + " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms", vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer, - pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam); + pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), + pTask->info.fillHistory, pTask->triggerParam); return 0; } @@ -1335,9 +1336,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pStreamTask); } else { - // todo update the chkInfo version for current task. - // this task has an associated history stream task, so we need to scan wal from the end version of - // history scan. The current version of chkInfo.current is not updated during the history scan STimeWindow* pWindow = &pTask->dataRange.window; if (pTask->historyTaskId.taskId == 0) { @@ -1356,7 +1354,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey); } - // notify the downstream agg tasks that upstream tasks are ready to processing the WAL data, update the code = streamTaskScanHistoryDataComplete(pTask); streamMetaReleaseTask(pMeta, pTask); @@ -1408,6 +1405,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { return 0; } +// only the agg tasks and the sink tasks will receive this message from upstream tasks int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -1434,6 +1432,7 @@ int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { return code; } +// NOTE: the rsp msg should be kept in WAL file. int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -1462,6 +1461,20 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { "s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history " "completed msg", pTask->id.idStr, req.downstreamId); + + // the scan-history finish status should be recorded in the WAL files. So the transfer of the task status from + // scan-history + // to normal should be executed by write thread of each vnode. + +// void* buf = NULL; +// int32_t tlen = 0; +// // encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen); +// +// SRpcMsg msg = {.msgType = TDMT_VND_CREATE_TABLE, .pCont = buf, .contLen = tlen}; +// if (tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &msg) != 0) { +// tqError("failed to put into write-queue since %s", terrstr()); +// } + streamProcessScanHistoryFinishRsp(pTask); } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 7029cc1af4..bb9146468f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -664,9 +664,9 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true); case TDMT_STREAM_TASK_DISPATCH_RSP: return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); - case TDMT_STREAM_TASK_CHECK: + case TDMT_VND_STREAM_TASK_CHECK: return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg); - case TDMT_STREAM_TASK_CHECK_RSP: + case TDMT_VND_STREAM_TASK_CHECK_RSP: return tqProcessStreamTaskCheckRsp(pVnode->pTq, 0, pMsg); case TDMT_STREAM_RETRIEVE: return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg); @@ -674,9 +674,9 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg); case TDMT_VND_STREAM_SCAN_HISTORY: return tqProcessTaskScanHistory(pVnode->pTq, pMsg); - case TDMT_STREAM_SCAN_HISTORY_FINISH: + case TDMT_VND_STREAM_SCAN_HISTORY_FINISH: return tqProcessTaskScanHistoryFinishReq(pVnode->pTq, pMsg); - case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP: + case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP: return tqProcessTaskScanHistoryFinishRsp(pVnode->pTq, pMsg); case TDMT_VND_STREAM_CHECK_POINT_SOURCE: return tqProcessStreamCheckPointSourceReq(pVnode->pTq, pMsg); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 13f4e2ff06..98fba39fab 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -269,7 +269,7 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR } tEncoderClear(&encoder); - initRpcMsg(&msg, TDMT_STREAM_TASK_CHECK, buf, tlen + sizeof(SMsgHead)); + initRpcMsg(&msg, TDMT_VND_STREAM_TASK_CHECK, buf, tlen + sizeof(SMsgHead)); qDebug("s-task:%s (level:%d) dispatch check msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr, pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId); @@ -277,50 +277,6 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR return 0; } -int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId, - SEpSet* pEpSet) { - void* buf = NULL; - int32_t code = -1; - SRpcMsg msg = {0}; - - int32_t tlen; - tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code); - if (code < 0) { - return -1; - } - - buf = rpcMallocCont(sizeof(SMsgHead) + tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - ((SMsgHead*)buf)->vgId = htonl(vgId); - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - - SEncoder encoder; - tEncoderInit(&encoder, abuf, tlen); - if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) { - if (buf) { - rpcFreeCont(buf); - } - return code; - } - - tEncoderClear(&encoder); - - msg.contLen = tlen + sizeof(SMsgHead); - msg.pCont = buf; - msg.msgType = TDMT_STREAM_SCAN_HISTORY_FINISH; - - tmsgSendReq(pEpSet, &msg); - - const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); - qDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus, - pReq->downstreamTaskId, vgId); - return 0; -} - static int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { int32_t code = 0; int32_t numOfBlocks = taosArrayGetSize(pData->blocks); @@ -718,7 +674,7 @@ int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHist tEncoderClear(&encoder); - initRpcMsg(&msg, TDMT_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead)); + initRpcMsg(&msg, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead)); tmsgSendReq(pEpSet, &msg); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index f4d990de4c..0258090fb0 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -13,12 +13,12 @@ * along with this program. If not, see . */ -#include #include "executor.h" #include "streamBackendRocksdb.h" #include "streamInt.h" #include "tref.h" #include "ttimer.h" +#include "tstream.h" static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; int32_t streamBackendId = 0; @@ -86,8 +86,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->expandFunc = expandFunc; pMeta->stage = stage; - // send heartbeat every 20sec. - pMeta->hbTmr = taosTmrStart(metaHbToMnode, 20000, pMeta, streamEnv.timer); + // send heartbeat every 5sec. + pMeta->hbTmr = taosTmrStart(metaHbToMnode, 5000, pMeta, streamEnv.timer); pMeta->pTaskBackendUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); @@ -572,6 +572,13 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->numOfTasks) < 0) return -1; + + for(int32_t i = 0; i < pReq->numOfTasks; ++i) { + STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i); + if (tEncodeI64(pEncoder, ps->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, ps->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, ps->status) < 0) return -1; + } tEndEncode(pEncoder); return pEncoder->pos; } @@ -580,12 +587,22 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->numOfTasks) < 0) return -1; + + pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry)); + for(int32_t i = 0; i < pReq->numOfTasks; ++i) { + STaskStatusEntry hb = {0}; + if (tDecodeI64(pDecoder, &hb.streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &hb.taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &hb.status) < 0) return -1; + + taosArrayPush(pReq->pTaskStatus, &hb); + } + tEndDecode(pDecoder); return 0; } void metaHbToMnode(void* param, void* tmrId) { -#if 0 SStreamMeta* pMeta = param; SStreamHbMsg hbMsg = {0}; @@ -630,6 +647,5 @@ void metaHbToMnode(void* param, void* tmrId) { tmsgSendReq(&pMeta->mgmtInfo.epset, &msg); // next hb will be issued in 20sec. - taosTmrReset(metaHbToMnode, 20000, pMeta, streamEnv.timer, pMeta->hbTmr); -#endif + taosTmrReset(metaHbToMnode, 5000, pMeta, streamEnv.timer, pMeta->hbTmr); } \ No newline at end of file diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 9320fa29d5..c4a8400b43 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -423,7 +423,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory int32_t taskLevel = pTask->info.taskLevel; ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK); - // sink node do not send end of scan history msg to its upstream, which is agg task. + // sink tasks do not send end of scan history msg to its upstream, which is agg task. streamAddEndScanHistoryMsg(pTask, pRpcInfo, pReq); int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1); @@ -440,9 +440,11 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory streamAggUpstreamScanHistoryFinish(pTask); } + // all upstream tasks have completed the scan-history task in the stream time window, let's start to extract data + // from the WAL files, which contains the real time stream data. streamNotifyUpstreamContinue(pTask); - // sink node does not receive the pause msg from mnode, so does not need enable it + // mnode will not send the pause/resume message to the sink task, so no need to enable the pause for sink tasks. if (pTask->info.taskLevel == TASK_LEVEL__AGG) { streamTaskEnablePause(pTask); }