diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 93e0064192..cb616f7afc 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -41,5 +41,6 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg); int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode); int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode); +void tqSetRestoreVersionInfo(SStreamTask* pTask); #endif // TDENGINE_TQ_COMMON_H diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index f17716eda0..b717504e1e 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -58,17 +58,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer streamSetupScheduleTrigger(pTask); SCheckpointInfo *pChkInfo = &pTask->chkInfo; - - // checkpoint ver is the kept version, handled data should be the next version. - if (pChkInfo->checkpointId != 0) { - pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1; - pChkInfo->processedVer = pChkInfo->checkpointVer; - pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer; - pTask->execInfo.startCheckpointId = pChkInfo->checkpointId; - - sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, - pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); - } + tqSetRestoreVersionInfo(pTask); char* p = streamTaskGetStatus(pTask)->name; if (pTask->info.fillHistory) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1abbfabc60..567d61e27a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -760,16 +760,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { streamSetupScheduleTrigger(pTask); SCheckpointInfo* pChkInfo = &pTask->chkInfo; - - // checkpoint ver is the kept version, handled data should be the next version. - if (pChkInfo->checkpointId != 0) { - pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1; - pChkInfo->processedVer = pChkInfo->checkpointVer; - pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer; - pTask->execInfo.startCheckpointId = pChkInfo->checkpointId; - tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, - pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); - } + tqSetRestoreVersionInfo(pTask); char* p = streamTaskGetStatus(pTask)->name; const char* pNext = streamTaskGetStatusStr(pTask->status.taskStatus); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index d2c7924cf5..4ce8579ea0 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -86,6 +86,22 @@ int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode) return TSDB_CODE_SUCCESS; } +void tqSetRestoreVersionInfo(SStreamTask* pTask) { + SCheckpointInfo* pChkInfo = &pTask->chkInfo; + + // checkpoint ver is the kept version, handled data should be the next version. + if (pChkInfo->checkpointId != 0) { + pChkInfo->nextProcessVer = pChkInfo->checkpointVer + 1; + pChkInfo->processedVer = pChkInfo->checkpointVer; + pTask->execInfo.startCheckpointId = pChkInfo->checkpointId; + + tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, + pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); + } + + pTask->execInfo.startCheckpointVer = pChkInfo->nextProcessVer; +} + int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) { int32_t vgId = pMeta->vgId; int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 23e873d335..115a61f647 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -108,6 +108,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe goto _error; } + // todd: the pk information should comes from the physical plan for(int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) { SColMatchItem* pItem = taosArrayGet(pInfo->matchInfo.pList, i); if (pItem->isPk) { @@ -223,8 +224,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { blockDataCleanup(pInfo->pBufferedRes); taosArrayClear(pInfo->pUidList); - int32_t code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pBufferedRes, pInfo->pSlotIds, - pInfo->pDstSlotIds, pInfo->pUidList); + int32_t code = pInfo->readHandle.api.cacheFn.retrieveRows(pInfo->pLastrowReader, pInfo->pBufferedRes, + pInfo->pSlotIds, pInfo->pDstSlotIds, pInfo->pUidList); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } @@ -293,9 +294,11 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } if (NULL == pInfo->pLastrowReader) { - code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, - taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList, pInfo->pSlotIds, suid, &pInfo->pLastrowReader, - pTaskInfo->id.str, pInfo->pFuncTypeList, &pInfo->pkCol, pInfo->numOfPks); + code = pInfo->readHandle.api.cacheFn.openReader( + pInfo->readHandle.vnode, pInfo->retrieveType, pList, num, taosArrayGetSize(pInfo->matchInfo.pList), + pInfo->pCidList, pInfo->pSlotIds, suid, &pInfo->pLastrowReader, pTaskInfo->id.str, pInfo->pFuncTypeList, + &pInfo->pkCol, pInfo->numOfPks); + if (code != TSDB_CODE_SUCCESS) { pInfo->currentGroupIndex += 1; taosArrayClear(pInfo->pUidList); diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index e7e0be6d57..a824979801 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -570,13 +570,14 @@ static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSu if (pScanLogicNode->pVgroupList) { vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); } - int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode); + int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode); if (TSDB_CODE_SUCCESS == code && pScanLogicNode->pFuncTypes != NULL) { pScan->pFuncTypes = taosArrayInit(taosArrayGetSize(pScanLogicNode->pFuncTypes), sizeof(int32_t)); if (NULL == pScan->pFuncTypes) { return TSDB_CODE_OUT_OF_MEMORY; } + SNode* pTargetNode = NULL; int funcTypeIndex = 0; FOREACH(pTargetNode, ((SScanPhysiNode*)pScan)->pScanCols) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index b4c06e135a..f7f790fbe7 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1288,7 +1288,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { } SStreamTask* pTask = *(SStreamTask**)pIter; - stDebug("vgId:%d s-task:%s set closing flag", vgId, pTask->id.idStr); + stDebug("vgId:%d s-task:%s set task closing flag", vgId, pTask->id.idStr); streamTaskStop(pTask); } @@ -1649,6 +1649,13 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 return 0; } + void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + if (p == NULL) { // task does not exists in current vnode, not record the complete info + qError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId); + streamMetaWUnLock(pMeta); + return 0; + } + SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet; STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 70c2619f6f..7dc93ceccf 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -993,16 +993,17 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i); if (p->taskId == taskId) { ASSERT(reqId == p->reqId); - p->status = status; - p->rspTs = rspTs; // count down one, since it is ready now - if (p->status == TASK_DOWNSTREAM_READY) { + if ((p->status != TASK_DOWNSTREAM_READY) && (status == TASK_DOWNSTREAM_READY)) { *pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1); } else { *pNotReady = pInfo->notReadyTasks; } + p->status = status; + p->rspTs = rspTs; + taosThreadMutexUnlock(&pInfo->checkInfoLock); return TSDB_CODE_SUCCESS; } @@ -1026,7 +1027,7 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { } taosThreadMutexUnlock(&pInfo->checkInfoLock); - stDebug("s-task:%s set the in check procedure flag", id); + stDebug("s-task:%s set the in-check-procedure flag", id); return 0; } @@ -1038,7 +1039,7 @@ int32_t streamTaskCompleteCheck(STaskCheckInfo* pInfo, const char* id) { } int64_t el = taosGetTimestampMs() - pInfo->startTs; - stDebug("s-task:%s check downstream completed, elapsed time:%" PRId64 " ms", id, el); + stDebug("s-task:%s clear the in-check-procedure flag, elapsed time:%" PRId64 " ms", id, el); pInfo->startTs = 0; pInfo->inCheckProcess = 0; @@ -1098,27 +1099,36 @@ static void rspMonitorFn(void* param, void* tmrId) { int64_t now = taosGetTimestampMs(); int64_t el = now - pInfo->startTs; ETaskStatus state = pStat->state; - int32_t numOfReady = 0; int32_t numOfFault = 0; + const char* id = pTask->id.idStr; - stDebug("s-task:%s start to do check downstream rsp check", pTask->id.idStr); + stDebug("s-task:%s start to do check downstream rsp check", id); - if (state == TASK_STATUS__STOP || state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) { + if (state == TASK_STATUS__STOP) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s status:%s vgId:%d quit from monitor rsp tmr, ref:%d", pTask->id.idStr, pStat->name, vgId, ref); - streamTaskCompleteCheck(pInfo, pTask->id.idStr); + stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); + streamTaskCompleteCheck(pInfo, id); + + streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); + return; + } + + if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); + streamTaskCompleteCheck(pInfo, id); return; } taosThreadMutexLock(&pInfo->checkInfoLock); if (pInfo->notReadyTasks == 0) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", pTask->id.idStr, - pStat->name, vgId, ref); + stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, pStat->name, + vgId, ref); taosThreadMutexUnlock(&pInfo->checkInfoLock); - streamTaskCompleteCheck(pInfo, pTask->id.idStr); + streamTaskCompleteCheck(pInfo, id); return; } @@ -1131,13 +1141,12 @@ static void rspMonitorFn(void* param, void* tmrId) { if (p->status == TASK_DOWNSTREAM_READY) { numOfReady += 1; } else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) { - stDebug("s-task:%s recv status from downstream, task:0x%x, quit from check downstream tasks", pTask->id.idStr, - p->taskId); + stDebug("s-task:%s recv status from downstream, task:0x%x, quit from check downstream tasks", id, p->taskId); numOfFault += 1; - } else { // TASK_DOWNSTREAM_NOT_READY - if (p->rspTs == 0) { // not response yet + } else { // TASK_DOWNSTREAM_NOT_READY + if (p->rspTs == 0) { // not response yet ASSERT(p->status == -1); - if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec. + if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec. taosArrayPush(pTimeoutList, &p->taskId); } else { // el < CHECK_NOT_RSP_DURATION // do nothing and continue waiting for their rsps @@ -1148,25 +1157,26 @@ static void rspMonitorFn(void* param, void* tmrId) { } } } else { // unexpected status - stError("s-task:%s unexpected task status:%s during waiting for check rsp", pTask->id.idStr, pStat->name); + stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name); } int32_t numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList); int32_t numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); // fault tasks detected, not try anymore - if (((numOfReady + numOfFault + numOfNotReady + numOfTimeout) == taosArrayGetSize(pInfo->pList)) && (numOfFault > 0)) { + if (((numOfReady + numOfFault + numOfNotReady + numOfTimeout) == taosArrayGetSize(pInfo->pList)) && + (numOfFault > 0)) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart " - "detected, ref:%d", - pTask->id.idStr, pStat->name, vgId, ref); + "detected, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", + id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); taosThreadMutexUnlock(&pInfo->checkInfoLock); taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList); - streamTaskCompleteCheck(pInfo, pTask->id.idStr); + streamTaskCompleteCheck(pInfo, id); return; } @@ -1176,14 +1186,14 @@ static void rspMonitorFn(void* param, void* tmrId) { stDebug( "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notReady:%d, fault:%d, " "timeout:%d, ready:%d ref:%d", - pTask->id.idStr, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); taosThreadMutexUnlock(&pInfo->checkInfoLock); // add the not-ready tasks into the final task status result buf, along with related fill-history task if exists. streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; - streamMetaAddTaskLaunchResult(pTask->pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId, pInfo->startTs, now, false); + STaskId* pHId = &pTask->hTaskInfo.id; + streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); } return; } @@ -1205,7 +1215,7 @@ static void rspMonitorFn(void* param, void* tmrId) { } } - stDebug("s-task:%s %d downstream task(s) not ready, send check msg again", pTask->id.idStr, numOfNotReady); + stDebug("s-task:%s %d downstream task(s) not ready, send check msg again", id, numOfNotReady); } if (numOfTimeout > 0) { @@ -1225,15 +1235,14 @@ static void rspMonitorFn(void* param, void* tmrId) { } } - stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, pTask->id.idStr, - numOfTimeout, now); + stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, id, numOfTimeout, now); } taosThreadMutexUnlock(&pInfo->checkInfoLock); taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); - stDebug("s-task:%s continue checking rsp in 200ms, notReady:%d, fault:%d, timeout:%d, ready:%d", pTask->id.idStr, - numOfNotReady, numOfFault, numOfTimeout, numOfReady); + stDebug("s-task:%s continue checking rsp in 200ms, notReady:%d, fault:%d, timeout:%d, ready:%d", id, numOfNotReady, + numOfFault, numOfTimeout, numOfReady); taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList);