diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 34496432ae..63da78a174 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -535,7 +535,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, bool fillHistory SArray* pTaskList, bool hasFillhistory); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); -void tFreeStreamTask(SStreamTask* pTask); +void tFreeStreamTask(SStreamTask* pTask, bool metaLock); int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver); int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo); @@ -818,7 +818,7 @@ bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask); bool streamTaskSetSchedStatusWait(SStreamTask* pTask); int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask); int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask); -int32_t streamTaskClearHTaskAttr(SStreamTask* pTask); +int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock); int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event); int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, void* pFn); diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 172c3952ad..ae72172bbb 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -182,7 +182,7 @@ void *freeStreamTasks(SArray *pTaskLevel) { int32_t taskSz = taosArrayGetSize(pLevel); for (int32_t j = 0; j < taskSz; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); - tFreeStreamTask(pTask); + tFreeStreamTask(pTask, true); } taosArrayDestroy(pLevel); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 1d40dd33b2..696daca918 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1214,7 +1214,7 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) { if (pEntry->status != TASK_STATUS__READY) { mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued", - pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamTaskGetStatusStr(pEntry->status)); + pEntry->id.streamId, (int32_t)pEntry->id.taskId, pEntry->nodeId, streamTaskGetStatusStr(pEntry->status)); ready = false; break; } @@ -2893,14 +2893,33 @@ static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) { } } +typedef struct SFailedCheckpointInfo { + int64_t streamUid; + int64_t checkpointId; + int32_t transId; +} SFailedCheckpointInfo; + +static void addIntoCheckpointList(SArray* pList, const SFailedCheckpointInfo* pInfo) { + int32_t num = taosArrayGetSize(pList); + for(int32_t i = 0; i < num; ++i) { + SFailedCheckpointInfo* p = taosArrayGet(pList, i); + if (p->transId == pInfo->transId) { + return; + } + } + + taosArrayPush(pList, pInfo); +} + int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamHbMsg req = {0}; - bool checkpointFailed = false; - int64_t checkpointId = 0; - int64_t streamId = 0; - int32_t transId = 0; +// bool checkpointFailed = false; +// int64_t checkpointId = 0; +// int64_t streamId = 0; +// int32_t transId = 0; + SArray* pList = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); SDecoder decoder = {0}; tDecoderInit(&decoder, pReq->pCont, pReq->contLen); @@ -2961,19 +2980,13 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { streamTaskStatusCopy(pTaskEntry, p); if (p->checkpointId != 0) { - if (checkpointId != 0) { - ASSERT(checkpointId == p->checkpointId); - } else { - checkpointId = p->checkpointId; - } - if (p->checkpointFailed) { mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId, p->checkpointId, p->chkpointTransId); - checkpointFailed = p->checkpointFailed; - streamId = p->id.streamId; - transId = p->chkpointTransId; + SFailedCheckpointInfo info = { + .transId = p->chkpointTransId, .checkpointId = p->checkpointId, .streamUid = p->id.streamId}; + addIntoCheckpointList(pList, &info); } } } @@ -2992,15 +3005,20 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // current checkpoint is failed, rollback from the checkpoint trans // kill the checkpoint trans and then set all tasks status to be normal - if (checkpointFailed && checkpointId != 0) { + if (taosArrayGetSize(pList) > 0) { bool allReady = true; SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady); taosArrayDestroy(p); if (allReady || snodeChanged) { // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal - mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", checkpointId); - mndResetStatusFromCheckpoint(pMnode, streamId, transId); + for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) { + SFailedCheckpointInfo *pInfo = taosArrayGet(pList, i); + mInfo("checkpointId:%" PRId64 " transId:%d failed, issue task-reset trans to reset all tasks status", + pInfo->checkpointId, pInfo->transId); + + mndResetStatusFromCheckpoint(pMnode, pInfo->streamUid, pInfo->transId); + } } else { mInfo("not all vgroups are ready, wait for next HB from stream tasks to reset the task status"); } @@ -3009,6 +3027,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { taosThreadMutexUnlock(&execInfo.lock); streamMetaClearHbMsg(&req); + taosArrayDestroy(pList); return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 138bcbb133..dd20f38093 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -97,7 +97,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { } if (pItem->pStreamTask) { - tFreeStreamTask(pItem->pStreamTask); + tFreeStreamTask(pItem->pStreamTask, true); } taosArrayDestroy(pItem->pResList); tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f35a3233d7..2e947e4a4c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1202,16 +1202,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamProcessCheckpointSourceReq(pTask, &req); taosThreadMutexUnlock(&pTask->lock); - int32_t total = 0; - streamMetaWLock(pMeta); - - // set the initial value for generating check point - // set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed - total = pMeta->numOfStreamTasks; - streamMetaWUnLock(pMeta); - - qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", total checkpoint reqs:%d", - pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, total); + qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d", + pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId); code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 00b3860565..b457b1da87 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -617,7 +617,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve if (code < 0) { tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code)); - tFreeStreamTask(pTask); + tFreeStreamTask(pTask, true); return code; } @@ -645,7 +645,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve } } else { tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store", vgId, taskId); - tFreeStreamTask(pTask); + tFreeStreamTask(pTask, true); } return code; @@ -663,7 +663,8 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pHTaskId = &pTask->hTaskInfo.id; streamMetaUnregisterTask(pMeta, pHTaskId->streamId, pHTaskId->taskId); - tqDebug("vgId:%d drop fill-history task:0x%x dropped firstly", vgId, (int32_t)pHTaskId->taskId); + tqDebug("s-task:0x%x vgId:%d drop fill-history task:0x%x firstly", pReq->taskId, vgId, + (int32_t)pHTaskId->taskId); } streamMetaReleaseTask(pMeta, pTask); } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 8c43a0d423..98963967fb 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -503,11 +503,16 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { if ((code == TSDB_CODE_SUCCESS) && dropRelHTask) { // transferred from the halt status, it is done the fill-history procedure and finish with the checkpoint // free it and remove fill-history task from disk meta-store - ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); - SStreamTaskId hTaskId = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId}; + taosThreadMutexLock(&pTask->lock); + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { + SStreamTaskId hTaskId = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId}; - stDebug("s-task:%s fill-history finish checkpoint done, drop related fill-history task:0x%x", id, hTaskId.taskId); - streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pTask->pMeta->vgId, &hTaskId); + stDebug("s-task:%s fill-history finish checkpoint done, drop related fill-history task:0x%x", id, hTaskId.taskId); + streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pTask->pMeta->vgId, &hTaskId); + } else { + stWarn("s-task:%s related fill-history task:0x%x is erased", id, (int32_t)pTask->hTaskInfo.id.taskId); + } + taosThreadMutexUnlock(&pTask->lock); } // clear the checkpoint info if failed diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 4a1fa40091..112777da9e 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -597,19 +597,19 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { - tFreeStreamTask(pTask); + tFreeStreamTask(pTask, false); return -1; } taosArrayPush(pMeta->pTaskList, &pTask->id); if (streamMetaSaveTask(pMeta, pTask) < 0) { - tFreeStreamTask(pTask); + tFreeStreamTask(pTask, false); return -1; } if (streamMetaCommit(pMeta) < 0) { - tFreeStreamTask(pTask); + tFreeStreamTask(pTask, false); return -1; } @@ -653,7 +653,7 @@ void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) stTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref); } else if (ref == 0) { stTrace("s-task:%s all refs are gone, free it", pTask->id.idStr); - tFreeStreamTask(pTask); + tFreeStreamTask(pTask, true); } else if (ref < 0) { stError("task ref is invalid, ref:%d, %s", ref, pTask->id.idStr); } @@ -724,14 +724,13 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t pTask = *ppTask; // it is an fill-history task, remove the related stream task's id that points to it - if (pTask->info.fillHistory == 1) { - streamTaskClearHTaskAttr(pTask); - } else { - atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); - } + atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); + streamMetaRemoveTask(pMeta, &id); + + streamMetaWUnLock(pMeta); ASSERT(pTask->status.timerActive == 0); @@ -742,13 +741,12 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t streamMetaReleaseTask(pMeta, pTask); } - streamMetaRemoveTask(pMeta, &id); streamMetaReleaseTask(pMeta, pTask); } else { stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId); + streamMetaWUnLock(pMeta); } - streamMetaWUnLock(pMeta); return 0; } @@ -862,7 +860,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (tDecodeStreamTask(&decoder, pTask) < 0) { tDecoderClear(&decoder); doClear(pKey, pVal, pCur, pRecycleList); - tFreeStreamTask(pTask); + tFreeStreamTask(pTask, false); stError( "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild " "stream manually", @@ -873,7 +871,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (pTask->status.taskStatus == TASK_STATUS__DROPPING) { int32_t taskId = pTask->id.taskId; - tFreeStreamTask(pTask); + tFreeStreamTask(pTask, false); STaskId id = streamTaskGetTaskId(pTask); taosArrayPush(pRecycleList, &id); @@ -889,7 +887,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1) < 0) { doClear(pKey, pVal, pCur, pRecycleList); - tFreeStreamTask(pTask); + tFreeStreamTask(pTask, false); return -1; } @@ -903,7 +901,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) < 0) { doClear(pKey, pVal, pCur, pRecycleList); - tFreeStreamTask(pTask); + tFreeStreamTask(pTask, false); return -1; } @@ -1306,28 +1304,28 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) { } void streamMetaRLock(SStreamMeta* pMeta) { -// stTrace("vgId:%d meta-rlock", pMeta->vgId); + stTrace("vgId:%d meta-rlock", pMeta->vgId); taosThreadRwlockRdlock(&pMeta->lock); } void streamMetaRUnLock(SStreamMeta* pMeta) { -// stTrace("vgId:%d meta-runlock", pMeta->vgId); + stTrace("vgId:%d meta-runlock", pMeta->vgId); int32_t code = taosThreadRwlockUnlock(&pMeta->lock); if (code != TSDB_CODE_SUCCESS) { stError("vgId:%d meta-runlock failed, code:%d", pMeta->vgId, code); } else { -// stDebug("vgId:%d meta-runlock completed", pMeta->vgId); + stDebug("vgId:%d meta-runlock completed", pMeta->vgId); } } void streamMetaWLock(SStreamMeta* pMeta) { -// stTrace("vgId:%d meta-wlock", pMeta->vgId); + stTrace("vgId:%d meta-wlock", pMeta->vgId); taosThreadRwlockWrlock(&pMeta->lock); -// stTrace("vgId:%d meta-wlock completed", pMeta->vgId); + stTrace("vgId:%d meta-wlock completed", pMeta->vgId); } void streamMetaWUnLock(SStreamMeta* pMeta) { -// stTrace("vgId:%d meta-wunlock", pMeta->vgId); + stTrace("vgId:%d meta-wunlock", pMeta->vgId); taosThreadRwlockUnlock(&pMeta->lock); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 2f821832ca..83055c0f70 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -340,11 +340,16 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) { return 0; } -void tFreeStreamTask(SStreamTask* pTask) { +void tFreeStreamTask(SStreamTask* pTask, bool metaLock) { char* p = NULL; int32_t taskId = pTask->id.taskId; STaskExecStatisInfo* pStatis = &pTask->execInfo; + // check for mnode + if (pTask->pMeta != NULL && ) { + streamTaskClearHTaskAttr(pTask, metaLock); + } + ETaskStatus status1 = TASK_STATUS__UNINIT; taosThreadMutexLock(&pTask->lock); if (pTask->status.pSM != NULL) { @@ -733,22 +738,32 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) { return status; } -int32_t streamTaskClearHTaskAttr(SStreamTask* pTask) { - SStreamMeta* pMeta = pTask->pMeta; +int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock) { + SStreamMeta* pMeta = pTask->pMeta; + STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId}; if (pTask->info.fillHistory == 0) { - return TSDB_CODE_SUCCESS; + return 0; } - STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId}; - SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId)); + if (metaLock) { + streamMetaWLock(pTask->pMeta); + } + SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId)); if (ppStreamTask != NULL) { + taosThreadMutexLock(&(*ppStreamTask)->lock); CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask)); streamMetaSaveTask(pMeta, *ppStreamTask); + taosThreadMutexUnlock(&(*ppStreamTask)->lock); + stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr, (int32_t)sTaskId.taskId); } + if (metaLock) { + streamMetaWUnLock(pTask->pMeta); + } + return TSDB_CODE_SUCCESS; } diff --git a/tests/system-test/8-stream/scalar_function.py b/tests/system-test/8-stream/scalar_function.py index eda643f661..90257df252 100644 --- a/tests/system-test/8-stream/scalar_function.py +++ b/tests/system-test/8-stream/scalar_function.py @@ -6,8 +6,8 @@ from util.cases import * from util.common import * class TDTestCase: - updatecfgDict = {'vdebugFlag': 143, 'qdebugflag':135, 'tqdebugflag':135, 'udebugflag':135, 'rpcdebugflag':135, - 'asynclog': 0, 'stdebugflag':135} + updatecfgDict = {'debugFlag':0, 'vdebugFlag': 143, 'qdebugflag':135, 'tqdebugflag':135, 'udebugflag':135, 'rpcdebugflag':135, + 'asynclog': 0, 'stdebugflag':143} def init(self, conn, logSql, replicaVar=1): self.replicaVar = int(replicaVar) tdLog.debug("start to execute %s" % __file__)