fix(stream): add timer to check stream task status.

Haojun Liao 2023-08-22 10:46:49 +08:00
parent 873c22abb1
commit def0877980
11 changed files with 219 additions and 161 deletions
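In outline, this change enables the previously disabled stream heartbeat path: streamMetaOpen now arms a 5-second timer (metaHbToMnode), the heartbeat carries one STaskStatusEntry per task inside an SStreamHbMsg, the mnode handler mndProcessStreamHb records the reported status in execNodeList, and mndProcessStreamDoCheckpoint only issues a checkpoint once every buffered task reports TASK_STATUS__NORMAL. A minimal sketch of the timer callback shape, assuming a hypothetical collectTaskStatus() helper (not part of this commit) that fills the message from the local task table:

void metaHbToMnode(void* param, void* tmrId) {
  SStreamMeta* pMeta = param;

  SStreamHbMsg hbMsg = {0};
  // hypothetical helper: append one STaskStatusEntry per local task to hbMsg.pTaskStatus
  collectTaskStatus(pMeta, &hbMsg);

  // serialize with tEncodeStreamHbMsg() and send it to the mnode as TDMT_MND_STREAM_HEARTBEAT
  // (the actual encoding and the tmsgSendReq() call appear further down in this diff)

  // re-arm the timer so the next heartbeat is issued in 5 seconds
  taosTmrReset(metaHbToMnode, 5000, pMeta, streamEnv.timer, pMeta->hbTmr);
}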

View File

@@ -256,16 +256,13 @@ enum {
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY, "stream-scan-history", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY_FINISH, "stream-scan-history-finish", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "stream-task-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT_READY, "stream-checkpoint-ready", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESTORE_CHECKPOINT, "stream-restore-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_PAUSE, "stream-task-pause", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_MON_MSG)
TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL)
@@ -304,8 +301,10 @@ enum {
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
// TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY_FINISH, "vnode-stream-scan-history-finish", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_UPDATE, "vnode-stream-update", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG)

View File

@@ -543,9 +543,16 @@ typedef struct {
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp);
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp);
typedef struct {
typedef struct STaskStatusEntry {
int64_t streamId;
int32_t taskId;
int32_t status;
} STaskStatusEntry;
typedef struct SStreamHbMsg {
int32_t vgId;
int32_t numOfTasks;
SArray* pTaskStatus; // SArray<STaskStatusEntry>
} SStreamHbMsg;
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
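For illustration, a hedged sketch of how a sender might assemble this message before handing it to tEncodeStreamHbMsg(); the vgId and task ids below are made-up values, not taken from this commit:

SStreamHbMsg hb = {.vgId = 2};
hb.pTaskStatus = taosArrayInit(4, sizeof(STaskStatusEntry));

STaskStatusEntry entry = {.streamId = 0x999, .taskId = 0x1001, .status = TASK_STATUS__NORMAL};
taosArrayPush(hb.pTaskStatus, &entry);  // one entry per task managed by this node

hb.numOfTasks = (int32_t)taosArrayGetSize(hb.pTaskStatus);
// hb is then serialized by tEncodeStreamHbMsg() and sent as TDMT_MND_STREAM_HEARTBEAT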

View File

@@ -77,10 +77,10 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
code = 0;

View File

@@ -737,10 +737,10 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
// if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@@ -43,6 +43,9 @@ typedef struct SNodeEntry {
typedef struct SStreamVnodeRevertIndex {
SArray *pNodeEntryList;
int64_t ts; // snapshot ts
SHashObj *pTaskMap;
SArray *pTaskList;
TdThreadMutex lock;
} SStreamVnodeRevertIndex;
typedef struct SVgroupChangeInfo {
@@ -74,6 +77,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
int64_t streamId, int32_t taskId);
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode);
static SArray *doExtractNodeListFromStream(SMnode *pMnode);
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode);
@@ -106,7 +110,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
// mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
@@ -118,6 +122,10 @@ int32_t mndInitStream(SMnode *pMnode) {
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
taosThreadMutexInit(&execNodeList.lock, NULL);
execNodeList.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
execNodeList.pTaskList = taosArrayInit(4, sizeof(STaskStatusEntry));
return sdbSetTable(pMnode->pSdb, table);
}
@@ -853,6 +861,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mndTransDrop(pTrans);
keepStreamTasksInBuf(pStream, &execNodeList);
code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER:
@@ -1123,13 +1132,14 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SStreamObj *pStream = NULL;
int32_t code = 0;
{
{ // check if the node update happens or not
int64_t ts = taosGetTimestampSec();
if (execNodeList.pNodeEntryList == NULL || (taosArrayGetSize(execNodeList.pNodeEntryList) == 0)) {
if (execNodeList.pNodeEntryList != NULL) {
execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList);
}
execNodeList.pNodeEntryList = doExtractNodeListFromStream(pMnode);
}
@@ -1154,6 +1164,26 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
}
}
{ // check if all tasks are in TASK_STATUS__NORMAL status
bool ready = true;
taosThreadMutexLock(&execNodeList.lock);
for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) {
STaskStatusEntry* p = taosArrayGet(execNodeList.pTaskList, i);
if (p->status != TASK_STATUS__NORMAL) {
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued",
p->streamId, p->taskId, 0, streamGetTaskStatusStr(p->status));
ready = false;
break;
}
}
taosThreadMutexUnlock(&execNodeList.lock);
if (!ready) {
return 0;
}
}
SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont;
int64_t checkpointId = pMsg->checkpointId;
@@ -2109,6 +2139,7 @@ static SArray *doExtractNodeListFromStream(SMnode *pMnode) {
return plist;
}
// this function is executed by only one thread, so it does not need to be thread-safe
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
if (old != 0) {
@@ -2124,6 +2155,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
if (execNodeList.pNodeEntryList != NULL) {
execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList);
}
execNodeList.pNodeEntryList = doExtractNodeListFromStream(pMnode);
}
@@ -2172,85 +2204,118 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
return 0;
}
static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode) {
int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfTasks; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
void* p = taosHashGet(pExecNode->pTaskMap, keys, sizeof(keys));
if (p == NULL) {
STaskStatusEntry entry = {
.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .status = TASK_STATUS__STOP};
taosArrayPush(pExecNode->pTaskList, &entry);
int32_t ordinal = taosArrayGetSize(pExecNode->pTaskList) - 1;
taosHashPut(pExecNode->pTaskMap, keys, sizeof(keys), &ordinal, sizeof(ordinal));
}
}
}
}
// todo: this process should be executed by the write queue worker of the mnode
// int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// SMnode *pMnode = pReq->info.node;
// SSdb *pSdb = pMnode->pSdb;
// SStreamHbMsg req = {0};
// int32_t code = TSDB_CODE_SUCCESS;
//
// SDecoder decoder = {0};
// tDecoderInit(&decoder, (uint8_t *)pReq->pCont, pReq->contLen);
//
// if (tStartDecode(&decoder) < 0) return -1;
//
// if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
// terrno = TSDB_CODE_INVALID_MSG;
// return -1;
// }
//
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SStreamHbMsg req = {0};
int32_t code = TSDB_CODE_SUCCESS;
SDecoder decoder = {0};
tDecoderInit(&decoder, (uint8_t *)pReq->pCont, pReq->contLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
// int64_t now = taosGetTimestampSec();
// mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
//
// // timeout list
// bool nodeChanged = false;
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
taosThreadMutexLock(&execNodeList.lock);
for(int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry* p = taosArrayGet(req.pTaskStatus, i);
int64_t k[2] = {p->streamId, p->taskId};
int32_t index = *(int32_t*) taosHashGet(execNodeList.pTaskMap, &k, sizeof(k));
STaskStatusEntry* pStatusEntry = taosArrayGet(execNodeList.pTaskList, index);
pStatusEntry->status = p->status;
}
taosThreadMutexUnlock(&execNodeList.lock);
// bool nodeChanged = false;
// SArray* pList = taosArrayInit(4, sizeof(int32_t));
//
// // record the timeout node
// for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) {
// SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i);
// int64_t duration = now - pEntry->hbTimestamp;
// if (duration > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next
// taosArrayPush(pList, &pEntry);
// mWarn("nodeId:%d stream node timeout, since last hb:%"PRId64"s", pEntry->nodeId, duration);
// continue;
// }
//
// if (pEntry->nodeId != req.vgId) {
// continue;
// }
//
// pEntry->hbTimestamp = now;
//
// // check epset to identify whether the node has been transferred to other dnodes.
// // node the epset is changed, which means the node transfer has occurred for this node.
//// if (!isEpsetEqual(&pEntry->epset, &req.epset)) {
//// nodeChanged = true;
//// break;
//// }
// }
//
// // todo handle the node timeout case. Once the vnode is off-line, we should check the dnode status from mnode,
// // to identify whether the dnode is truely offline or not.
//
// // handle the node changed case
// if (!nodeChanged) {
// return TSDB_CODE_SUCCESS;
// }
//
// int32_t nodeId = req.vgId;
//
// {// check all streams that involved this vnode should update the epset info
// SStreamObj *pStream = NULL;
// void *pIter = NULL;
// while (1) {
// pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
// if (pIter == NULL) {
/*
// record the timeout node
for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) {
SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i);
int64_t duration = now - pEntry->hbTimestamp;
if (duration > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next
taosArrayPush(pList, &pEntry);
mWarn("nodeId:%d stream node timeout, since last hb:%"PRId64"s", pEntry->nodeId, duration);
continue;
}
if (pEntry->nodeId != req.vgId) {
continue;
}
pEntry->hbTimestamp = now;
// check epset to identify whether the node has been transferred to other dnodes.
// note: if the epset has changed, a node transfer has occurred for this node.
// if (!isEpsetEqual(&pEntry->epset, &req.epset)) {
// nodeChanged = true;
// break;
// }
//
// // update the related upstream and downstream tasks, todo remove this, no need this function
// taosWLockLatch(&pStream->lock);
//// streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset);
//// streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset);
// taosWUnLockLatch(&pStream->lock);
//
//// code = createStreamUpdateTrans(pMnode, pStream, nodeId, );
//// if (code != TSDB_CODE_SUCCESS) {
//// todo
}
// todo handle the node timeout case. Once the vnode is off-line, we should check the dnode status from mnode,
// to identify whether the dnode is truly offline or not.
// handle the node changed case
if (!nodeChanged) {
return TSDB_CODE_SUCCESS;
}
int32_t nodeId = req.vgId;
{ // all streams that involve this vnode should update their epset info
SStreamObj *pStream = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
break;
}
// update the related upstream and downstream tasks; todo: remove this, this function is not needed
taosWLockLatch(&pStream->lock);
// streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset);
// streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset);
taosWUnLockLatch(&pStream->lock);
// code = createStreamUpdateTrans(pMnode, pStream, nodeId, );
// if (code != TSDB_CODE_SUCCESS) {
// todo
//// }
// }
// }
//
// return TSDB_CODE_SUCCESS;
//}
}
*/
return TSDB_CODE_SUCCESS;
}
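To make the lookup in the handler above easier to follow: keepStreamTasksInBuf keys each task by the {streamId, taskId} pair and stores the ordinal of its slot in execNodeList.pTaskList. A minimal sketch of that lookup; the NULL check and the placeholder input values are added purely for illustration and are not part of this commit:

// hypothetical inputs taken from one STaskStatusEntry of a received heartbeat
int64_t streamId = 0x999;
int32_t taskId = 0x1001;
int32_t newStatus = TASK_STATUS__NORMAL;

int64_t keys[2] = {streamId, taskId};
int32_t* pOrdinal = taosHashGet(execNodeList.pTaskMap, keys, sizeof(keys));
if (pOrdinal != NULL) {  // defensive check, omitted by the handler above
  STaskStatusEntry* pStatusEntry = taosArrayGet(execNodeList.pTaskList, *pOrdinal);
  pStatusEntry->status = newStatus;
}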

View File

@@ -411,13 +411,13 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
return sndProcessTaskRetrieveReq(pSnode, pMsg);
case TDMT_STREAM_RETRIEVE_RSP:
return sndProcessTaskRetrieveRsp(pSnode, pMsg);
case TDMT_STREAM_SCAN_HISTORY_FINISH:
case TDMT_VND_STREAM_SCAN_HISTORY_FINISH:
return sndProcessStreamTaskScanHistoryFinishReq(pSnode, pMsg);
case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP:
return sndProcessTaskRecoverFinishRsp(pSnode, pMsg);
case TDMT_STREAM_TASK_CHECK:
case TDMT_VND_STREAM_TASK_CHECK:
return sndProcessStreamTaskCheckReq(pSnode, pMsg);
case TDMT_STREAM_TASK_CHECK_RSP:
case TDMT_VND_STREAM_TASK_CHECK_RSP:
return sndProcessStreamTaskCheckRsp(pSnode, pMsg);
default:
ASSERT(0);

View File

@@ -1045,9 +1045,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
}
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64
" child id:%d, level:%d, fill-history:%d, trigger:%" PRId64 " ms",
" child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer,
pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam);
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, pTask->triggerParam);
return 0;
}
@@ -1335,9 +1336,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamMetaReleaseTask(pMeta, pTask);
streamMetaReleaseTask(pMeta, pStreamTask);
} else {
// todo update the chkInfo version for current task.
// this task has an associated history stream task, so we need to scan wal from the end version of
// history scan. The current version of chkInfo.current is not updated during the history scan
STimeWindow* pWindow = &pTask->dataRange.window;
if (pTask->historyTaskId.taskId == 0) {
@@ -1356,7 +1354,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
id, pTask->chkInfo.currentVer, pWindow->skey, pWindow->ekey);
}
// notify the downstream agg tasks that upstream tasks are ready to process the WAL data, update the
code = streamTaskScanHistoryDataComplete(pTask);
streamMetaReleaseTask(pMeta, pTask);
@@ -1408,6 +1405,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
// only the agg tasks and the sink tasks will receive this message from upstream tasks
int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
@@ -1434,6 +1432,7 @@ int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
return code;
}
// NOTE: the rsp msg should be kept in WAL file.
int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
@@ -1462,6 +1461,20 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
"s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history "
"completed msg",
pTask->id.idStr, req.downstreamId);
// the scan-history finish status should be recorded in the WAL files. So the transfer of the task status from
// scan-history to normal should be executed by write thread of each vnode.
// void* buf = NULL;
// int32_t tlen = 0;
// // encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen);
//
// SRpcMsg msg = {.msgType = TDMT_VND_CREATE_TABLE, .pCont = buf, .contLen = tlen};
// if (tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
// tqError("failed to put into write-queue since %s", terrstr());
// }
streamProcessScanHistoryFinishRsp(pTask);
}

View File

@@ -652,9 +652,9 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
case TDMT_STREAM_TASK_DISPATCH_RSP:
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECK:
case TDMT_VND_STREAM_TASK_CHECK:
return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECK_RSP:
case TDMT_VND_STREAM_TASK_CHECK_RSP:
return tqProcessStreamTaskCheckRsp(pVnode->pTq, 0, pMsg);
case TDMT_STREAM_RETRIEVE:
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
@@ -662,9 +662,9 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_SCAN_HISTORY:
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
case TDMT_STREAM_SCAN_HISTORY_FINISH:
case TDMT_VND_STREAM_SCAN_HISTORY_FINISH:
return tqProcessTaskScanHistoryFinishReq(pVnode->pTq, pMsg);
case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP:
return tqProcessTaskScanHistoryFinishRsp(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_CHECK_POINT_SOURCE:
return tqProcessStreamCheckPointSourceReq(pVnode->pTq, pMsg);

View File

@@ -269,7 +269,7 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
}
tEncoderClear(&encoder);
initRpcMsg(&msg, TDMT_STREAM_TASK_CHECK, buf, tlen + sizeof(SMsgHead));
initRpcMsg(&msg, TDMT_VND_STREAM_TASK_CHECK, buf, tlen + sizeof(SMsgHead));
qDebug("s-task:%s (level:%d) dispatch check msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId);
@@ -277,50 +277,6 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
return 0;
}
int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet) {
void* buf = NULL;
int32_t code = -1;
SRpcMsg msg = {0};
int32_t tlen;
tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
if (code < 0) {
return -1;
}
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
((SMsgHead*)buf)->vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
if (buf) {
rpcFreeCont(buf);
}
return code;
}
tEncoderClear(&encoder);
msg.contLen = tlen + sizeof(SMsgHead);
msg.pCont = buf;
msg.msgType = TDMT_STREAM_SCAN_HISTORY_FINISH;
tmsgSendReq(pEpSet, &msg);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
qDebug("s-task:%s status:%s dispatch scan-history finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pStatus,
pReq->downstreamTaskId, vgId);
return 0;
}
static int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
int32_t code = 0;
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
@@ -718,7 +674,7 @@ int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHist
tEncoderClear(&encoder);
initRpcMsg(&msg, TDMT_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead));
initRpcMsg(&msg, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, buf, tlen + sizeof(SMsgHead));
tmsgSendReq(pEpSet, &msg);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);

View File

@@ -13,12 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <libs/sync/sync.h>
#include "executor.h"
#include "streamBackendRocksdb.h"
#include "streamInt.h"
#include "tref.h"
#include "ttimer.h"
#include "tstream.h"
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
int32_t streamBackendId = 0;
@@ -86,8 +86,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->expandFunc = expandFunc;
pMeta->stage = stage;
// send heartbeat every 20sec.
pMeta->hbTmr = taosTmrStart(metaHbToMnode, 20000, pMeta, streamEnv.timer);
// send heartbeat every 5sec.
pMeta->hbTmr = taosTmrStart(metaHbToMnode, 5000, pMeta, streamEnv.timer);
pMeta->pTaskBackendUnique =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
@@ -572,6 +572,13 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->numOfTasks) < 0) return -1;
for(int32_t i = 0; i < pReq->numOfTasks; ++i) {
STaskStatusEntry* ps = taosArrayGet(pReq->pTaskStatus, i);
if (tEncodeI64(pEncoder, ps->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, ps->status) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
}
@@ -580,12 +587,22 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->numOfTasks) < 0) return -1;
pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry));
for(int32_t i = 0; i < pReq->numOfTasks; ++i) {
STaskStatusEntry hb = {0};
if (tDecodeI64(pDecoder, &hb.streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &hb.taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &hb.status) < 0) return -1;
taosArrayPush(pReq->pTaskStatus, &hb);
}
tEndDecode(pDecoder);
return 0;
}
void metaHbToMnode(void* param, void* tmrId) {
#if 0
SStreamMeta* pMeta = param;
SStreamHbMsg hbMsg = {0};
@@ -630,6 +647,5 @@ void metaHbToMnode(void* param, void* tmrId) {
tmsgSendReq(&pMeta->mgmtInfo.epset, &msg);
// next hb will be issued in 20sec.
taosTmrReset(metaHbToMnode, 20000, pMeta, streamEnv.timer, pMeta->hbTmr);
#endif
taosTmrReset(metaHbToMnode, 5000, pMeta, streamEnv.timer, pMeta->hbTmr);
}

View File

@@ -423,7 +423,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
int32_t taskLevel = pTask->info.taskLevel;
ASSERT(taskLevel == TASK_LEVEL__AGG || taskLevel == TASK_LEVEL__SINK);
// sink node do not send end of scan history msg to its upstream, which is agg task.
// sink tasks do not send the end of scan history msg to their upstream, which is the agg task.
streamAddEndScanHistoryMsg(pTask, pRpcInfo, pReq);
int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
@@ -440,9 +440,11 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
streamAggUpstreamScanHistoryFinish(pTask);
}
// all upstream tasks have completed the scan-history task in the stream time window, let's start to extract data
// from the WAL files, which contain the real-time stream data.
streamNotifyUpstreamContinue(pTask);
// sink node does not receive the pause msg from mnode, so does not need enable it
// mnode will not send the pause/resume message to the sink task, so no need to enable the pause for sink tasks.
if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
streamTaskEnablePause(pTask);
}