From fb8867f403cddc972887173252d86a19b6011b1d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 29 Aug 2024 19:55:09 +0800 Subject: [PATCH 1/9] refactor: remove assert. --- source/dnode/vnode/src/tq/tqSink.c | 8 +- source/dnode/vnode/src/tq/tqStreamTask.c | 6 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 27 ++++--- source/dnode/vnode/src/tsdb/tsdbRead2.c | 2 +- source/libs/executor/src/scanoperator.c | 6 +- source/libs/stream/src/streamDispatch.c | 65 +++++++++++------ source/libs/stream/src/streamExec.c | 81 ++++++++++++++------- source/libs/stream/src/streamMeta.c | 30 +++++--- source/libs/stream/src/streamQueue.c | 16 ++-- source/libs/stream/src/streamStartHistory.c | 47 ++++++++---- source/libs/stream/src/streamTask.c | 18 ++--- source/libs/stream/src/streamTaskSm.c | 1 - 12 files changed, 202 insertions(+), 105 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 2e22b7524b..9602e2bb9c 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -342,7 +342,10 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S // todo remove this void* pGpIdData = colDataGetData(pGpIdColInfo, rowId); - ASSERT(gid == *(int64_t*)pGpIdData); + if (gid != *(int64_t*)pGpIdData) { + tqError("s-task:%s vgId:%d invalid groupId:%d" PRId64 ":%" PRId64 " in sink task", id, vgId, gid, + *(int64_t*)pGpIdData); + } } code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid, @@ -747,7 +750,6 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat return code; } - ASSERT(pRow); void* p = taosArrayPush(pTableData->aRowP, &pRow); if (p == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -779,8 +781,6 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid); if (isValid) { // not valid table, ignore it tqDebug("s-task:%s set uid:%" PRIu64 " for dstTable:%s from meta", id, mr.me.uid, pTableSinkInfo->name.data); - ASSERT(terrno == 0); - // set the destination table uid (*uid) = mr.me.uid; pTableSinkInfo->uid = mr.me.uid; diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 4b206fc04f..b3535787cb 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -46,7 +46,10 @@ int32_t tqScanWal(STQ* pTq) { streamMetaWLock(pMeta); int32_t times = (--pMeta->scanInfo.scanCounter); - ASSERT(pMeta->scanInfo.scanCounter >= 0); + if (times < 0) { + tqError("vgId:%d invalid scan counter:%d, reset to 0", vgId); + times = 0; + } numOfTasks = taosArrayGetSize(pMeta->pTaskList); streamMetaWUnLock(pMeta); @@ -263,7 +266,6 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) { // fill-history task has entered into the last phase, no need to anything if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) { - ASSERT(pState.state == TASK_STATUS__READY); // the maximum version of data in the WAL has reached already, the step2 is done tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr, pTask->dataRange.range.maxVer); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index a69ad80ee8..975fab3c98 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -339,13 +339,15 @@ int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); if (pRspHead == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("s-task:0x%x send dispatch error rsp, out of memory", req.taskId); - return -1; + return terrno; } pRspHead->vgId = htonl(req.upstreamNodeId); - ASSERT(pRspHead->vgId != 0); + if(pRspHead->vgId == 0) { + tqError("vgId:%d invalid dispatch msg from upstream to task:0x%x", pMeta->vgId, req.taskId); + return TSDB_CODE_INVALID_MSG; + } SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead)); pRsp->streamId = htobe64(req.streamId); @@ -926,7 +928,6 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) { streamTaskSetStatusReady(pTask); } else if (pState.state == TASK_STATUS__UNINIT) { // tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr); - // ASSERT(pTask->status.downstreamReady == 0); // tqStreamTaskRestoreCheckpoint(pMeta, pTask->id.streamId, pTask->id.taskId); tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState.name); } else { @@ -1000,7 +1001,10 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) TSDB_CODE_ACTION_IN_PROGRESS); } } else { // upstream not recv the checkpoint-source/trigger till now - ASSERT(pState.state == TASK_STATUS__READY || pState.state == TASK_STATUS__HALT); + if (!(pState.state == TASK_STATUS__READY || pState.state == TASK_STATUS__HALT)) { + tqFatal("s-task:%s invalid task status:%s", pTask->id.idStr, pState.name); + } + tqWarn( "s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all " "upstream sending checkpoint-source/trigger", @@ -1109,9 +1113,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t } else { code = streamTrySchedExec(pTask); } - } /*else { - ASSERT(status != TASK_STATUS__UNINIT); - }*/ + } streamMetaReleaseTask(pMeta, pTask); return code; @@ -1236,7 +1238,14 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId); streamMutexLock(&pTask->lock); - ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId); + if (pTask->chkInfo.checkpointId < req.checkpointId) { + tqFatal("s-task:%s vgId:%d invalid consensus-checkpointId:%" PRId64 ", greater than existed checkpointId:%"PRId64, + pTask->id.idStr, vgId, req.checkpointId, pTask->chkInfo.checkpointId); + + streamMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_STREAM_INTERNAL_ERROR; + } if (pTask->status.consenChkptInfo.consenChkptTransId >= req.transId) { tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId, diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 0e7607425d..6c504e4ff5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4809,7 +4809,7 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi return code; _err: - tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr); + tsdbError("failed to create data reader, error at:%d code:%s %s", tstrerror(code), lino, idstr); tsdbReaderClose2(*ppReader); *ppReader = NULL; // reset the pointer value. return code; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 557794a062..50b27ee884 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -749,7 +749,7 @@ _end: freeTableCachedValObj(&val); } } - + if (freeReader) { pHandle->api.metaReaderFn.clearReader(&mr); } @@ -5548,7 +5548,7 @@ static int32_t getBlockForTableMergeScan(void* param, SSDataBlock** ppBlock) { pOperator->resultInfo.totalRows += pBlock->info.rows; pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; *ppBlock = pBlock; - + return code; } @@ -5703,7 +5703,7 @@ void startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t numOfTables = 0; code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables); QUERY_CHECK_CODE(code, lino, _end); - + int32_t i = pInfo->tableStartIndex + 1; for (; i < numOfTables; ++i) { STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 59210fe99f..c8e9064204 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -306,7 +306,12 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD int32_t code = 0; int64_t now = taosGetTimestampMs(); int32_t numOfBlocks = taosArrayGetSize(pData->blocks); - ASSERT(numOfBlocks != 0 && pTask->msgInfo.pData == NULL); + + if (!(numOfBlocks != 0 && pTask->msgInfo.pData == NULL)) { + stError("s-task:0x%x dispatch block number:%d, exist not rsp dispatch msg:%p, abort build new dispatch msg", + pTask->id.idStr, numOfBlocks, pTask->msgInfo.pData); + return TSDB_CODE_STREAM_INTERNAL_ERROR; + } pTask->msgInfo.dispatchMsgType = pData->type; @@ -708,14 +713,15 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S } streamMutexUnlock(&pTask->msgInfo.lock); - ASSERT(found); - return 0; + if (!found) { + stError("s-task:%s not found req hash value:%u", pTask->id.idStr, hashValue); + return TSDB_CODE_STREAM_INTERNAL_ERROR; + } else { + return 0; + } } int32_t streamDispatchStreamBlock(SStreamTask* pTask) { - ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || - pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH)); - const char* id = pTask->id.idStr; int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputq.queue); if (numOfElems > 0) { @@ -739,8 +745,11 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { return 0; } - ASSERT(pTask->msgInfo.pData == NULL); - stDebug("s-task:%s start to dispatch msg, set output status:%d", id, pTask->outputq.status); + if (pTask->msgInfo.pData != NULL) { + stFatal("s-task:%s not rsp data:%p exist, should not dispatch msg now", id, pTask->msgInfo.pData); + } else { + stDebug("s-task:%s start to dispatch msg, set output status:%d", id, pTask->outputq.status); + } SStreamDataBlock* pBlock = NULL; streamQueueNextItem(pTask->outputq.queue, (SStreamQueueItem**)&pBlock); @@ -751,8 +760,11 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } int32_t type = pBlock->type; - ASSERT(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT_TRIGGER || - type == STREAM_INPUT__TRANS_STATE); + if (!(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT_TRIGGER || + type == STREAM_INPUT__TRANS_STATE)) { + stError("s-task:%s invalid dispatch block type:%d", id, type); + return TSDB_CODE_INTERNAL_ERROR; + } pTask->execInfo.dispatch += 1; @@ -878,7 +890,6 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { SArray* pList = pActiveInfo->pReadyMsgList; int32_t num = taosArrayGetSize(pList); - if (pTmrInfo->launchChkptId != pActiveInfo->activeId) { streamMutexUnlock(&pActiveInfo->lock); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); @@ -902,7 +913,15 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { SArray* pNotRspList = taosArrayInit(4, sizeof(int32_t)); - ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) == num); + if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) { + streamMutexUnlock(&pActiveInfo->lock); + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr, ref:%d", id, + vgId, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num, ref); + + streamMetaReleaseTask(pTask->pMeta, pTask); + return; + } for (int32_t i = 0; i < num; ++i) { STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i); @@ -984,7 +1003,11 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { streamMutexLock(&pActiveInfo->lock); int32_t num = taosArrayGetSize(pList); - ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) == num); + if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) { + stError("s-task:%s invalid number of sent readyMsg:%d to upstream:%d", id, num, + (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList)); + return TSDB_CODE_STREAM_INTERNAL_ERROR; + } for (int32_t i = 0; i < num; ++i) { STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i); @@ -1064,9 +1087,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN; - ASSERT(dataStrLen > 0); - - void* buf = taosMemoryCalloc(1, dataStrLen); + void* buf = taosMemoryCalloc(1, dataStrLen); if (buf == NULL) { return terrno; } @@ -1205,8 +1226,6 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa int32_t size = taosArrayGetSize(pActiveInfo->pReadyMsgList); if (size > 0) { - ASSERT(size == 1); - STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, 0); if (pReady == NULL) { return terrno; @@ -1267,7 +1286,6 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, stDebug("s-task:%s recv checkpoint-trigger from all upstream, continue", pTask->id.idStr); pActiveInfo->allUpstreamTriggerRecv = 1; } else { - ASSERT(numOfRecv <= total); stDebug("s-task:%s %d/%d checkpoint-trigger recv", pTask->id.idStr, numOfRecv, total); } @@ -1516,7 +1534,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (pMsgInfo->dispatchMsgType == STREAM_INPUT__TRANS_STATE) { stDebug("s-task:%s dispatch trans-state msgId:%d to downstream successfully, start to prepare transfer state", id, msgId); - ASSERT(pTask->info.fillHistory == 1); + if (pTask->info.fillHistory != 1) { + stFatal("s-task:%s unexpected dispatch rsp, not scan-history task, not recv this dispatch rsp", id); + } code = streamTransferStatePrepare(pTask); if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens @@ -1542,7 +1562,10 @@ static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchR } ((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId); - ASSERT(((SMsgHead*)(*pBuf))->vgId != 0); + + if (((SMsgHead*)(*pBuf))->vgId == 0) { + return TSDB_CODE_INVALID_MSG; + } SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead)); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 7d9a86538e..e18cbb97a2 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -45,7 +45,11 @@ static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBl pTask->outputInfo.smaSink.smaSink(pTask->outputInfo.smaSink.vnode, pTask->outputInfo.smaSink.smaId, pBlock->blocks); destroyStreamDataBlock(pBlock); } else { - ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH); + if (type != TASK_OUTPUT__FIXED_DISPATCH && type != TASK_OUTPUT__SHUFFLE_DISPATCH) { + stError("s-task:%s invalid stream output type:%d, internal error", pTask->id.idStr, type); + return TSDB_CODE_STREAM_INTERNAL_ERROR; + } + code = streamTaskPutDataIntoOutputQ(pTask, pBlock); if (code != TSDB_CODE_SUCCESS) { destroyStreamDataBlock(pBlock); @@ -127,7 +131,11 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { SSDataBlock block = {0}; const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem; - ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1); + int32_t num = taosArrayGetSize(pRetrieveBlock->blocks); + if (num != 1) { + stError("s-task:%s invalid retrieve block, ignore", pTask->id.idStr, num); + continue; + } (void)assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0)); block.info.type = STREAM_PULL_OVER; @@ -178,7 +186,6 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to // current output should be dispatched to down stream nodes if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { - ASSERT(numOfBlocks == taosArrayGetSize(pRes)); code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); // todo: here we need continue retry to put it into output buffer if (code != TSDB_CODE_SUCCESS) { @@ -192,7 +199,6 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to } if (numOfBlocks > 0) { - ASSERT(numOfBlocks == taosArrayGetSize(pRes)); code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); } else { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); @@ -277,7 +283,10 @@ static SScanhistoryDataInfo buildScanhistoryExecRet(EScanHistoryCode code, int32 } SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { - ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); + if(pTask->info.taskLevel != TASK_LEVEL__SOURCE) { + stError("s-task:%s not source scan-history task, not exec, quit", pTask->id.idStr); + return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0); + } void* exec = pTask->exec.pExecutor; bool finished = false; @@ -374,10 +383,16 @@ int32_t streamTransferStateDoPrepare(SStreamTask* pTask) { // It must be halted for a source stream task, since when the related scan-history-data task start scan the history // for the step 2. if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { - ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP); + if (!(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP)) { + stError("s-task:%s invalid task status:%d", id, status); + return TSDB_CODE_STREAM_INTERNAL_ERROR; + } } else { - ASSERT(status == TASK_STATUS__READY || status == TASK_STATUS__PAUSE || status == TASK_STATUS__DROPPING || - status == TASK_STATUS__STOP); + if (!(status == TASK_STATUS__READY || status == TASK_STATUS__PAUSE || status == TASK_STATUS__DROPPING || + status == TASK_STATUS__STOP)) { + stError("s-task:%s invalid task status:%d", id, status); + return TSDB_CODE_STREAM_INTERNAL_ERROR; + } code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", id, @@ -438,7 +453,10 @@ int32_t streamTransferStatePrepare(SStreamTask* pTask) { int32_t code = TSDB_CODE_SUCCESS; SStreamMeta* pMeta = pTask->pMeta; - ASSERT(pTask->status.appendTranstateBlock == 1); + if (pTask->status.appendTranstateBlock != 1) { + stError("s-task:%s not set appendTransBlock flag, internal error", pTask->id.idStr); + return TSDB_CODE_STREAM_INTERNAL_ERROR; + } int32_t level = pTask->info.taskLevel; if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states. @@ -476,14 +494,16 @@ static int32_t doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int code = qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { - ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput; code = qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); stDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit, pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); - ASSERT((*pVer) <= pSubmit->submit.ver); - (*pVer) = pSubmit->submit.ver; - + if ((*pVer) > pSubmit->submit.ver) { + stError("s-task:%s invalid recorded ver:%" PRId64 " greater than new block ver:%" PRId64 ", not update", id, + *pVer, pSubmit->submit.ver); + } else { + (*pVer) = pSubmit->submit.ver; + } } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput; @@ -500,8 +520,13 @@ static int32_t doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int stDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask, numOfBlocks, pMerged->ver); code = qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); - ASSERT((*pVer) <= pMerged->ver); - (*pVer) = pMerged->ver; + + if ((*pVer) > pMerged->ver) { + stError("s-task:%s invalid recorded ver:%" PRId64 " greater than new block ver:%" PRId64 ", not update", id, + *pVer, pMerged->ver); + } else { + (*pVer) = pMerged->ver; + } } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput; @@ -512,7 +537,8 @@ static int32_t doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int code = qSetMultiStreamInput(pExecutor, pCheckpoint->blocks, 1, pItem->type); } else { - ASSERT(0); + stError("s-task:%s invalid input block type:%d, discard", id, pItem->type); + code = TSDB_CODE_STREAM_INTERNAL_ERROR; } return code; @@ -542,7 +568,6 @@ void streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) stDebug("s-task:%s add transfer-state block into outputQ", id); } else { stDebug("s-task:%s all upstream tasks send transfer-state block, add transfer-state block into outputQ", id); - ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1); } // agg task should dispatch trans-state msg to sink task, to flush all data to sink task. @@ -560,7 +585,6 @@ void streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) } else { // non-dispatch task, do task state transfer directly streamFreeQitem((SStreamQueueItem*)pBlock); stDebug("s-task:%s non-dispatch task, level:%d start to transfer state directly", id, level); - ASSERT(pTask->info.fillHistory == 1); code = streamTransferStatePrepare(pTask); if (code != TSDB_CODE_SUCCESS) { @@ -606,7 +630,11 @@ static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, i } // update the currentVer if processing the submit blocks. - ASSERT(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer); + if (!(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer)) { + stError("s-task:%s invalid info, checkpointVer:%" PRId64 ", nextProcessVer:%" PRId64 " currentVer:%" PRId64, id, + pInfo->checkpointVer, pInfo->nextProcessVer, ver); + return; + } if (ver != pInfo->processedVer) { stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64 @@ -622,8 +650,6 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB // 1. transfer the ownership of executor state bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); if (dropRelHTask) { - ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); - STaskId* pHTaskId = &pTask->hTaskInfo.id; SStreamTask* pHTask = NULL; int32_t code = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask); @@ -692,12 +718,10 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize); if (ret == EXEC_AFTER_IDLE) { - ASSERT(pInput == NULL && numOfBlocks == 0); streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL); return 0; } else { if (pInput == NULL) { - ASSERT(numOfBlocks == 0); return 0; } } @@ -718,7 +742,10 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { } if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - ASSERT(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT); + if (type != STREAM_INPUT__DATA_BLOCK && type != STREAM_INPUT__CHECKPOINT) { + stError("s-task:%s invalid block type:%d for sink task, discard", id, type); + continue; + } int64_t st = taosGetTimestampMs(); @@ -801,7 +828,11 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) { } void streamResumeTask(SStreamTask* pTask) { - ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__ACTIVE); + if (pTask->status.schedStatus != TASK_SCHED_STATUS__ACTIVE) { + stError("s-task:%s invalid sched status:%d, not resume task", pTask->id.idStr, pTask->status.schedStatus); + return; + } + const char* id = pTask->id.idStr; while (1) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d6f1559578..16f5b98ef7 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -719,9 +719,13 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) { - size_t size = taosHashGetSize(pMeta->pTasksMap); - ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasksMap)); - return (int32_t)size; + int32_t size = (int32_t)taosHashGetSize(pMeta->pTasksMap); + int32_t sizeInList = taosArrayGetSize(pMeta->pTaskList); + if (sizeInList != size) { + stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", pMeta->vgId, sizeInList, size); + } + + return size; } int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) { @@ -775,7 +779,10 @@ static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id break; } } - ASSERT(remove); + + if (!remove) { + stError("s-task:0x%x not in meta task list, internal error", id->taskId); + } } static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) { @@ -849,10 +856,18 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id); (void)streamMetaRemoveTask(pMeta, &id); - ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList)); + int32_t size = (int32_t) taosHashGetSize(pMeta->pTasksMap); + int32_t sizeInList = taosArrayGetSize(pMeta->pTaskList); + if (sizeInList != size) { + stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", pMeta->vgId, sizeInList, size); + } streamMetaWUnLock(pMeta); - ASSERT(pTask->status.timerActive == 0); + int32_t numOfTmr = pTask->status.timerActive; + if (numOfTmr != 0) { + stError("s-task:0x%x vgId:%d invalid timer Active record:%d, internal error", pTask->id.idStr, numOfTmr); + } + if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) { stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt); (void)taosTmrStop(pTask->schedInfo.pDelayTimer); @@ -1056,8 +1071,6 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (streamTaskShouldPause(pTask)) { (void)atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1); } - - ASSERT(pTask->status.downstreamReady == 0); } tdbFree(pKey); @@ -1075,7 +1088,6 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) { } int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); - ASSERT(pMeta->numOfStreamTasks <= numOfTasks && pMeta->numOfPausedTasks <= numOfTasks); stDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks, pMeta->numOfStreamTasks, pMeta->numOfPausedTasks); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 3703ed07aa..eca728b2d5 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -90,7 +90,6 @@ void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) { int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING); if (flag == STREAM_QUEUE__FAILED) { - ASSERT(pQueue->qItem != NULL); *pItem = streamQueueCurItem(pQueue); } else { pQueue->qItem = NULL; @@ -105,13 +104,20 @@ void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) { } void streamQueueProcessSuccess(SStreamQueue* queue) { - ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING); + if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) { + stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING); + return; + } + queue->qItem = NULL; atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS); } void streamQueueProcessFail(SStreamQueue* queue) { - ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING); + if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) { + stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING); + return; + } atomic_store_8(&queue->status, STREAM_QUEUE__FAILED); } @@ -229,7 +235,6 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte } } else { if (*pInput == NULL) { - ASSERT((*numOfBlocks) == 0); *pInput = qItem; } else { // merge current block failed, let's handle the already merged blocks. void* newRet = NULL; @@ -340,7 +345,8 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); stDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); } else { - ASSERT(0); + stError("s-task:%s invalid type:%d to put in inputQ", pTask->id.idStr, type); + return TSDB_CODE_INVALID_PARA; } if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && type != STREAM_INPUT__CHECKPOINT_TRIGGER && diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index e68c088b9a..e4d6e934d1 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -111,11 +111,15 @@ void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) { } int32_t streamTaskStartScanHistory(SStreamTask* pTask) { - int32_t level = pTask->info.taskLevel; - ETaskStatus status = streamTaskGetStatus(pTask).state; + int32_t level = pTask->info.taskLevel; + SStreamTaskState state = streamTaskGetStatus(pTask); - ASSERT((pTask->status.downstreamReady == 1) && (status == TASK_STATUS__SCAN_HISTORY) && - (pTask->info.fillHistory == 1)); + if (((pTask->status.downstreamReady != 1) || (state.state != TASK_STATUS__SCAN_HISTORY) || + (pTask->info.fillHistory != 1))) { + stFatal("s-task:%s invalid status to start fill-history task, downReady:%d, status:%s, is-fill-history task:%d", + pTask->id.idStr, state.name, pTask->info.fillHistory); + return TSDB_CODE_STREAM_INTERNAL_ERROR; + } if (level == TASK_LEVEL__SOURCE) { return doStartScanHistoryTask(pTask); @@ -144,7 +148,6 @@ int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) { } SStreamTaskState p = streamTaskGetStatus(pTask); - ASSERT(p.state == TASK_STATUS__READY); int8_t schedStatus = pTask->status.schedStatus; if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { @@ -171,8 +174,6 @@ int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask) { if (code == 0) { SStreamTaskState p = streamTaskGetStatus(pTask); - ASSERT((p.state == TASK_STATUS__SCAN_HISTORY) && (pTask->info.fillHistory == 1)); - stDebug("s-task:%s fill-history task enters into scan-history data stage, status:%s", pTask->id.idStr, p.name); code = streamTaskStartScanHistory(pTask); } @@ -348,8 +349,6 @@ void tryLaunchHistoryTask(void* param, void* tmrId) { } if (streamTaskShouldStop(*ppTask)) { - ASSERT((*ppTask)->status.timerActive >= 1); - char* p = streamTaskGetStatus(*ppTask).name; int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1); stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d", @@ -385,7 +384,10 @@ void tryLaunchHistoryTask(void* param, void* tmrId) { notRetryLaunchFillHistoryTask(pTask, pInfo, now); } else { // not reach the limitation yet, let's continue retrying launch related fill-history task. streamTaskSetRetryInfoForLaunch(pHTaskInfo); - ASSERT(pTask->status.timerActive >= 1); + if (pTask->status.timerActive < 1) { + stError("s-task:%s invalid timerActive recorder:%d, abort timer", pTask->id.idStr, pTask->status.timerActive); + return; + } // abort the timer if intend to stop task SStreamTask* pHTask = NULL; @@ -451,8 +453,6 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { int32_t hTaskId = pTask->hTaskInfo.id.taskId; SLaunchHTaskInfo* pInfo = NULL; - ASSERT(hTaskId != 0); - stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since not built yet", idStr, pMeta->vgId, hTaskId); STaskId id = streamTaskGetTaskId(pTask); @@ -480,11 +480,18 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) { return terrno; } - ASSERT(ref >= 1); + if (ref < 1) { + stError("s-task:%s invalid timerActive recorder:%d, abort timer", pTask->id.idStr, pTask->status.timerActive); + return TSDB_CODE_STREAM_INTERNAL_ERROR; + } stDebug("s-task:%s set timer active flag, ref:%d", idStr, ref); } else { // timer exists - ASSERT(pTask->status.timerActive >= 1); + if (pTask->status.timerActive < 1) { + stError("s-task:%s invalid timerActive recorder:%d, abort timer", pTask->id.idStr, pTask->status.timerActive); + return TSDB_CODE_STREAM_INTERNAL_ERROR; + } + stDebug("s-task:%s set timer active flag, task timer not null", idStr); streamTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr"); @@ -500,7 +507,11 @@ int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask) { bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t nextProcessVer) { SVersionRange* pRange = &pTask->dataRange.range; - ASSERT(nextProcessVer >= pRange->maxVer); + if (nextProcessVer < pRange->maxVer) { + stError("s-task:%s next processdVer:%"PRId64" is less than range max ver:%"PRId64, pTask->id.idStr, nextProcessVer, + pRange->maxVer); + return true; + } // maxVer for fill-history task is the version, where the last timestamp is acquired. // it's also the maximum version to scan data in tsdb. @@ -538,7 +549,11 @@ int32_t streamTaskSetRangeStreamCalc(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } else { - ASSERT(pTask->info.fillHistory == 0); + if (pTask->info.fillHistory != 0) { + stError("s-task:%s task should not be fill-history task, internal error", pTask->id.idStr); + return TSDB_CODE_STREAM_INTERNAL_ERROR; + } + if (pTask->info.taskLevel >= TASK_LEVEL__AGG) { return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index afe315bf58..bdcb00d240 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -134,8 +134,9 @@ int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool return code; } - if (fillHistory) { - ASSERT(hasFillhistory); + if (fillHistory && !hasFillhistory) { + stError("s-task:0x%x create task failed, due to inconsistent fill-history flag", pTask->id.taskId); + return TSDB_CODE_INVALID_PARA; } epsetAssign(&(pTask->info.mnodeEpset), pEpset); @@ -723,8 +724,11 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { if ((pInfo != NULL) && pInfo->dataAllowed) { pInfo->dataAllowed = false; - int32_t t = atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); - ASSERT(t <= streamTaskGetNumOfUpstream(pTask)); + if (pTask->upstreamInfo.numOfClosed < streamTaskGetNumOfUpstream(pTask)) { + int32_t t = atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); + } else { + stError("s-task:%s not inc closed input, since they have been all closed already", pTask->id.idStr); + } } } @@ -734,7 +738,7 @@ void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) { if (pInfo != NULL && (!pInfo->dataAllowed)) { int32_t t = atomic_sub_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); - ASSERT(t >= 0); + stDebug("s-task:%s open inputQ for upstream:0x%x, remain closed:%d", pTask->id.idStr, taskId, t); pInfo->dataAllowed = true; } } @@ -770,8 +774,6 @@ int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) { int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) { streamMutexLock(&pTask->lock); int8_t status = pTask->status.schedStatus; - ASSERT(status == TASK_SCHED_STATUS__WAITING || status == TASK_SCHED_STATUS__ACTIVE || - status == TASK_SCHED_STATUS__INACTIVE); pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; streamMutexUnlock(&pTask->lock); @@ -887,8 +889,6 @@ void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo) { } void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo) { - ASSERT(pInfo->tickCount == 0); - pInfo->waitInterval *= RETRY_LAUNCH_INTERVAL_INC_RATE; pInfo->tickCount = ceil(pInfo->waitInterval / WAIT_FOR_MINIMAL_INTERVAL); pInfo->retryTimes += 1; diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index cbaccff01d..ae5d8debb1 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -194,7 +194,6 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName, (void) taosArrayPop(pSM->pWaitingEventList); STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM->current.state, pEvtInfo->event); - ASSERT(pSM->pActiveTrans == NULL && pNextTrans != NULL); pSM->pActiveTrans = pNextTrans; pSM->startTs = taosGetTimestampMs(); From da1bdd689f90d889998ad7663ecafb1841a6a97f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 30 Aug 2024 09:06:45 +0800 Subject: [PATCH 2/9] fix(stream): fix syntax error. --- source/libs/stream/src/streamExec.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e18cbb97a2..fca0bf403f 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -133,7 +133,7 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem; int32_t num = taosArrayGetSize(pRetrieveBlock->blocks); if (num != 1) { - stError("s-task:%s invalid retrieve block, ignore", pTask->id.idStr, num); + stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num); continue; } From 52bb64961593741fe5472bf4b0229cbf6960c504 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 30 Aug 2024 09:08:15 +0800 Subject: [PATCH 3/9] fix(stream): fix syntax error. --- source/libs/stream/src/streamStartHistory.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index e4d6e934d1..a8c76d3f52 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -116,8 +116,8 @@ int32_t streamTaskStartScanHistory(SStreamTask* pTask) { if (((pTask->status.downstreamReady != 1) || (state.state != TASK_STATUS__SCAN_HISTORY) || (pTask->info.fillHistory != 1))) { - stFatal("s-task:%s invalid status to start fill-history task, downReady:%d, status:%s, is-fill-history task:%d", - pTask->id.idStr, state.name, pTask->info.fillHistory); + stFatal("s-task:%s invalid status:%s to start fill-history task, downReady:%d, is-fill-history task:%d", + pTask->id.idStr, state.name, pTask->status.downstreamReady, pTask->info.fillHistory); return TSDB_CODE_STREAM_INTERNAL_ERROR; } From 5a3427e91206119086e692d948c9381e76106672 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 30 Aug 2024 09:09:15 +0800 Subject: [PATCH 4/9] fix(stream): fix syntax error. --- source/libs/stream/src/streamDispatch.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index c8e9064204..d2cf17bff3 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -308,7 +308,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD int32_t numOfBlocks = taosArrayGetSize(pData->blocks); if (!(numOfBlocks != 0 && pTask->msgInfo.pData == NULL)) { - stError("s-task:0x%x dispatch block number:%d, exist not rsp dispatch msg:%p, abort build new dispatch msg", + stError("s-task:%s dispatch block number:%d, exist not rsp dispatch msg:%p, abort build new dispatch msg", pTask->id.idStr, numOfBlocks, pTask->msgInfo.pData); return TSDB_CODE_STREAM_INTERNAL_ERROR; } From eb42d47d96011e15e35824c34434bd79130a3311 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 30 Aug 2024 09:09:55 +0800 Subject: [PATCH 5/9] fix(stream): fix syntax error. --- source/libs/stream/src/streamMeta.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 16f5b98ef7..d8fe5fe6b8 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -865,7 +865,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t int32_t numOfTmr = pTask->status.timerActive; if (numOfTmr != 0) { - stError("s-task:0x%x vgId:%d invalid timer Active record:%d, internal error", pTask->id.idStr, numOfTmr); + stError("s-task:%s vgId:%d invalid timer Active record:%d, internal error", pTask->id.idStr, numOfTmr); } if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) { From af259945b6ae673de1f7a72da21c6248ac0b3323 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 30 Aug 2024 09:20:51 +0800 Subject: [PATCH 6/9] fix(stream): fix syntax error. --- source/libs/stream/src/streamMeta.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d8fe5fe6b8..db07e214ad 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -794,6 +794,7 @@ static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) { int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { SStreamTask* pTask = NULL; + int32_t vgId = pMeta->vgId; // pre-delete operation streamMetaWLock(pMeta); @@ -806,19 +807,19 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t // desc the paused task counter if (streamTaskShouldPause(pTask)) { int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); - stInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); + stInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", vgId, pTask->id.idStr, num); } // handle the dropping event (void)streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL); } else { - stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId); + stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", vgId, taskId); streamMetaWUnLock(pMeta); return 0; } streamMetaWUnLock(pMeta); - stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, pMeta->vgId); + stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, vgId); while (1) { int32_t timerActive = 0; @@ -859,13 +860,13 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t int32_t size = (int32_t) taosHashGetSize(pMeta->pTasksMap); int32_t sizeInList = taosArrayGetSize(pMeta->pTaskList); if (sizeInList != size) { - stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", pMeta->vgId, sizeInList, size); + stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", vgId, sizeInList, size); } streamMetaWUnLock(pMeta); int32_t numOfTmr = pTask->status.timerActive; if (numOfTmr != 0) { - stError("s-task:%s vgId:%d invalid timer Active record:%d, internal error", pTask->id.idStr, numOfTmr); + stError("s-task:%s vgId:%d invalid timer Active record:%d, internal error", pTask->id.idStr, vgId, numOfTmr); } if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) { @@ -877,7 +878,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t streamMetaReleaseTask(pMeta, pTask); } else { - stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId); + stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", vgId, taskId); streamMetaWUnLock(pMeta); } From 9fc493f221e876c92273bdaf69ff9ef41455bdf3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 30 Aug 2024 09:35:08 +0800 Subject: [PATCH 7/9] fix(stream): fix syntax error. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 6c504e4ff5..febbf96768 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -4809,7 +4809,7 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi return code; _err: - tsdbError("failed to create data reader, error at:%d code:%s %s", tstrerror(code), lino, idstr); + tsdbError("failed to create data reader, error at:%d code:%s %s", lino, tstrerror(code), idstr); tsdbReaderClose2(*ppReader); *ppReader = NULL; // reset the pointer value. return code; From bb90755642af02922c192c00975a07db256e460e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 30 Aug 2024 09:36:16 +0800 Subject: [PATCH 8/9] fix(stream): fix syntax error. --- source/dnode/vnode/src/tq/tqSink.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 9602e2bb9c..c42d5e7de5 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -343,7 +343,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S // todo remove this void* pGpIdData = colDataGetData(pGpIdColInfo, rowId); if (gid != *(int64_t*)pGpIdData) { - tqError("s-task:%s vgId:%d invalid groupId:%d" PRId64 ":%" PRId64 " in sink task", id, vgId, gid, + tqError("s-task:%s vgId:%d invalid groupId:%" PRId64 " actual:%" PRId64 " in sink task", id, vgId, gid, *(int64_t*)pGpIdData); } } From 9aaed8631f567e62202de137f73ae534ddfbbbda Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 30 Aug 2024 09:57:04 +0800 Subject: [PATCH 9/9] fix(stream): fix syntax error. --- source/dnode/vnode/src/tq/tqStreamTask.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index b3535787cb..4c8d8079bf 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -47,7 +47,7 @@ int32_t tqScanWal(STQ* pTq) { streamMetaWLock(pMeta); int32_t times = (--pMeta->scanInfo.scanCounter); if (times < 0) { - tqError("vgId:%d invalid scan counter:%d, reset to 0", vgId); + tqError("vgId:%d invalid scan counter:%d, reset to 0", vgId, times); times = 0; }