diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 3b6ed7143a..a0898fa94b 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -965,6 +965,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103) #define TSDB_CODE_STREAM_TASK_IVLD_STATUS TAOS_DEF_ERROR_CODE(0, 0x4104) #define TSDB_CODE_STREAM_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x4105) +#define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4106) // TDLite #define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100) diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index f1a6ef8414..901632894e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -901,7 +901,10 @@ int32_t tLDataIterNextRow(SLDataIter *pIter, const char *idStr, bool* hasNext) { pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow); _exit: - tsdbError("failed to exec stt-file nextIter, lino:%d, code:%s, %s", lino, tstrerror(code), idStr); + if (code) { + tsdbError("failed to exec stt-file nextIter, lino:%d, code:%s, %s", lino, tstrerror(code), idStr); + } + *hasNext = (code == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL); return code; } diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 2de86b8794..b7fb48ddc6 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -97,8 +97,6 @@ void streamTaskSendCheckMsg(SStreamTask* pTask) { .stage = pTask->pMeta->stage, }; - ASSERT(pTask->status.downstreamReady == 0); - // serialize streamProcessScanHistoryFinishRsp if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { streamTaskStartMonitorCheckRsp(pTask); @@ -187,8 +185,6 @@ void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SS } int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { - ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); - int64_t now = taosGetTimestampMs(); const char* id = pTask->id.idStr; STaskCheckInfo* pInfo = &pTask->taskCheckInfo; @@ -200,6 +196,11 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* return TSDB_CODE_SUCCESS; } + if (pTask->id.taskId != pRsp->upstreamTaskId) { + stError("s-task:%s invalid check downstream rsp, upstream task:0x%x discard", id, pRsp->upstreamTaskId); + return TSDB_CODE_INVALID_MSG; + } + if (pRsp->status == TASK_DOWNSTREAM_READY) { int32_t code = streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id); if (code != TSDB_CODE_SUCCESS) { @@ -235,7 +236,6 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* streamMetaAddFailedTaskSelf(pTask, now); } else { // TASK_DOWNSTREAM_NOT_READY, rsp-check monitor will retry in 300 ms - ASSERT(left > 0); stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); } @@ -315,8 +315,6 @@ void streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) { } void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) { - ASSERT(pInfo->inCheckProcess == 0); - taosArrayDestroy(pInfo->pList); pInfo->pList = NULL; @@ -338,7 +336,10 @@ void processDownstreamReadyRsp(SStreamTask* pTask) { (void) streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true); if (pTask->status.taskStatus == TASK_STATUS__HALT) { - ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0)); + if (!HAS_RELATED_FILLHISTORY_TASK(pTask) || (pTask->info.fillHistory != 0)) { + stError("s-task:%s status:halt fillhistory:%d not handle the ready rsp", pTask->id.idStr, + pTask->info.fillHistory); + } // halt it self for count window stream task until the related fill history task completed. stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr, @@ -396,7 +397,6 @@ void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutput pInfo->notReadyTasks = 1; } else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) { pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos); - ASSERT(pInfo->notReadyTasks == pOutputInfo->shuffleDispatcher.dbInfo.vgNum); } pInfo->startTs = startTs; @@ -461,7 +461,6 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { if (pInfo->inCheckProcess == 0) { pInfo->inCheckProcess = 1; } else { - ASSERT(pInfo->startTs > 0); stError("s-task:%s already in check procedure, checkTs:%" PRId64 ", start monitor check rsp failed", id, pInfo->startTs); pInfo->stopCheckProcess = 0; // disable auto stop of check process @@ -564,8 +563,6 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { break; } } - } else { - ASSERT(0); } } @@ -585,7 +582,6 @@ void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, i (*numOfFault) += 1; } else { // TASK_DOWNSTREAM_NOT_READY if (p->rspTs == 0) { // not response yet - ASSERT(p->status == -1); if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec. (void) taosArrayPush(pTimeoutList, &p->taskId); } else { // el < CHECK_NOT_RSP_DURATION @@ -610,9 +606,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { int32_t vgId = pTask->pMeta->vgId; int32_t numOfTimeout = taosArrayGetSize(pTimeoutList); - ASSERT(pTask->status.downstreamReady == 0); pInfo->timeoutStartTs = taosGetTimestampMs(); - for (int32_t i = 0; i < numOfTimeout; ++i) { int32_t* px = taosArrayGet(pTimeoutList, i); if (px == NULL) { @@ -623,7 +617,13 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { SDownstreamStatusInfo* p = NULL; findCheckRspStatus(pInfo, taskId, &p); if (p != NULL) { - ASSERT(p->status == -1 && p->rspTs == 0); + + if (p->status != -1 || p->rspTs != 0) { + stError("s-task:%s invalid rsp record entry, index:%d, status:%d, rspTs:%"PRId64, pTask->id.idStr, i, p->status, + p->rspTs); + continue; + } + doSendCheckMsg(pTask, p); } } @@ -662,8 +662,6 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) { int32_t vgId = pTask->pMeta->vgId; int32_t numOfNotReady = taosArrayGetSize(pNotReadyList); - ASSERT(pTask->status.downstreamReady == 0); - // reset the info, and send the check msg to failure downstream again for (int32_t i = 0; i < numOfNotReady; ++i) { int32_t* pTaskId = taosArrayGet(pNotReadyList, i); @@ -696,12 +694,10 @@ int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64 void rspMonitorFn(void* param, void* tmrId) { SStreamTask* pTask = param; SStreamMeta* pMeta = pTask->pMeta; - SStreamTaskState pStat = streamTaskGetStatus(pTask); STaskCheckInfo* pInfo = &pTask->taskCheckInfo; int32_t vgId = pTask->pMeta->vgId; int64_t now = taosGetTimestampMs(); int64_t timeoutDuration = now - pInfo->timeoutStartTs; - ETaskStatus state = pStat.state; const char* id = pTask->id.idStr; int32_t numOfReady = 0; int32_t numOfFault = 0; @@ -712,9 +708,13 @@ void rspMonitorFn(void* param, void* tmrId) { stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id); - if (state == TASK_STATUS__STOP) { + streamMutexLock(&pTask->lock); + SStreamTaskState state = streamTaskGetStatus(pTask); + streamMutexUnlock(&pTask->lock); + + if (state.state == TASK_STATUS__STOP) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat.name, vgId, ref); + stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, state.name, vgId, ref); streamTaskCompleteCheckRsp(pInfo, true, id); @@ -728,9 +728,9 @@ void rspMonitorFn(void* param, void* tmrId) { return; } - if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) { + if (state.state == TASK_STATUS__DROPPING || state.state == TASK_STATUS__READY) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat.name, vgId, ref); + stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, state.name, vgId, ref); streamTaskCompleteCheckRsp(pInfo, true, id); streamMetaReleaseTask(pMeta, pTask); @@ -740,7 +740,7 @@ void rspMonitorFn(void* param, void* tmrId) { streamMutexLock(&pInfo->checkInfoLock); if (pInfo->notReadyTasks == 0) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, pStat.name, + stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, state.name, vgId, ref); streamTaskCompleteCheckRsp(pInfo, false, id); @@ -752,21 +752,34 @@ void rspMonitorFn(void* param, void* tmrId) { SArray* pNotReadyList = taosArrayInit(4, sizeof(int64_t)); SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t)); - if (pStat.state == TASK_STATUS__UNINIT) { + if (state.state == TASK_STATUS__UNINIT) { getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id); numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList); numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); // fault tasks detected, not try anymore - ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == total); + bool jumpOut = false; + if ((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) != total) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + + stError( + "s-task:%s vgId:%d internal error in handling the check downstream procedure, rsp number is inconsistent, " + "stop rspMonitor tmr, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", + id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + jumpOut = true; + } + if (numOfFault > 0) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart " "detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", - id, pStat.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + jumpOut = true; + } + if (jumpOut) { streamTaskCompleteCheckRsp(pInfo, false, id); streamMutexUnlock(&pInfo->checkInfoLock); streamMetaReleaseTask(pMeta, pTask); @@ -776,7 +789,7 @@ void rspMonitorFn(void* param, void* tmrId) { return; } } else { // unexpected status - stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat.name); + stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, state.name); } // checking of downstream tasks has been stopped by other threads @@ -785,7 +798,7 @@ void rspMonitorFn(void* param, void* tmrId) { stDebug( "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, total:%d, notRsp:%d, " "notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", - id, pStat.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); streamTaskCompleteCheckRsp(pInfo, false, id); streamMutexUnlock(&pInfo->checkInfoLock); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 0ef7c2312a..1a0b1e8665 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -40,7 +40,10 @@ int32_t createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int6 pChkpoint->type = checkpointType; if (checkpointType == STREAM_INPUT__CHECKPOINT_TRIGGER && (pTask->info.taskLevel != TASK_LEVEL__SOURCE)) { pChkpoint->srcTaskId = srcTaskId; - ASSERT(srcTaskId != 0); + if (srcTaskId <= 0) { + stDebug("s-task:%s invalid src task id:%d for creating checkpoint trigger block", pTask->id.idStr, srcTaskId); + return TSDB_CODE_INVALID_PARA; + } } SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); @@ -189,10 +192,13 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) { pBlock->srcTaskId = pTask->id.taskId; pBlock->srcVgId = pTask->pMeta->vgId; + if (pTask->chkInfo.pActiveInfo->dispatchTrigger == true) { + stError("s-task:%s already dispatch checkpoint-trigger, not dispatch again", pTask->id.idStr); + return 0; + } int32_t code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock); if (code == 0) { - ASSERT(pTask->chkInfo.pActiveInfo->dispatchTrigger == false); code = streamDispatchStreamBlock(pTask); } else { stError("s-task:%s failed to put checkpoint into outputQ, code:%s", pTask->id.idStr, tstrerror(code)); @@ -593,9 +599,16 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV } } - ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer && + bool valid = (pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer && pInfo->processedVer <= pReq->checkpointVer); + if (!valid) { + stFatal("invalid checkpoint id check, current checkpointId:%" PRId64 " checkpointVer:%" PRId64 + " processedVer:%" PRId64 " req checkpointId:%" PRId64 " checkpointVer:%" PRId64, + pInfo->checkpointId, pInfo->checkpointVer, pInfo->processedVer, pReq->checkpointId, pReq->checkpointVer); + return TSDB_CODE_STREAM_INTERNAL_ERROR; + } + // update only it is in checkpoint status, or during restore procedure. if (pStatus.state == TASK_STATUS__CK || (!restored)) { pInfo->checkpointId = pReq->checkpointId; @@ -1102,7 +1115,10 @@ void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) { streamMutexLock(&pInfo->lock); // outputQ should be empty here - ASSERT(streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue) == 0); + if (streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue) > 0) { + stFatal("s-task:%s items are still in outputQ, failed to init trigger dispatch info", pTask->id.idStr); + return; + } pInfo->dispatchTrigger = true; if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { @@ -1375,9 +1391,7 @@ int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) { pTask->pBackend = NULL; } - ASSERT(pTask->pBackend == NULL); pTask->status.requireConsensusChkptId = true; - stDebug("s-task:%s set the require consensus-checkpointId flag", id); return 0; } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 0602bf9334..d85435d21c 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -33,7 +33,6 @@ int32_t createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t return code; } - ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen))); for (int32_t i = 0; i < blockNum; i++) { SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pReq->data, i); SSDataBlock* pDataBlock = taosArrayGet(pArray, i); @@ -52,7 +51,6 @@ int32_t createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t } int32_t len = tsDecompressString(pInput, compLen, 1, p, fullLen, ONE_STAGE_COMP, NULL, 0); - ASSERT(len == fullLen); pInput = p; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index bf64af6558..918a6b6cb1 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -254,7 +254,11 @@ static SStreamDispatchReq* createDispatchDataReq(SStreamTask* pTask, const SStre int32_t type = pTask->outputInfo.type; int32_t num = streamTaskGetNumOfDownstream(pTask); - ASSERT(type == TASK_OUTPUT__SHUFFLE_DISPATCH || type == TASK_OUTPUT__FIXED_DISPATCH); + if(type != TASK_OUTPUT__SHUFFLE_DISPATCH && type != TASK_OUTPUT__FIXED_DISPATCH) { + terrno = TSDB_CODE_INVALID_PARA; + stError("s-task:%s invalid dispatch type:%d not dispatch data", pTask->id.idStr, type); + return NULL; + } SStreamDispatchReq* pReqs = taosMemoryCalloc(num, sizeof(SStreamDispatchReq)); if (pReqs == NULL) { @@ -279,7 +283,7 @@ static SStreamDispatchReq* createDispatchDataReq(SStreamTask* pTask, const SStre return NULL; } } - } else { + } else { // shuffle dispatch int32_t numOfBlocks = taosArrayGetSize(pData->blocks); int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index 6fe2d818b3..8105614d76 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -16,6 +16,7 @@ #include "streamMsg.h" #include "os.h" #include "tstream.h" +#include "streamInt.h" int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) { if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1; @@ -229,8 +230,12 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p if (tEncodeI32(pEncoder, pReq->upstreamRelTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1; - ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum); - ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum); + + if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) != pReq->blockNum) { + stError("invalid dispatch req msg"); + return TSDB_CODE_INVALID_MSG; + } + for (int32_t i = 0; i < pReq->blockNum; i++) { int32_t* pLen = taosArrayGet(pReq->dataLen, i); void* data = taosArrayGetP(pReq->data, i); @@ -261,7 +266,6 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1; - ASSERT(pReq->blockNum > 0); pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*)); pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t)); for (int32_t i = 0; i < pReq->blockNum; i++) { @@ -270,7 +274,10 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { void* data; if (tDecodeI32(pDecoder, &len1) < 0) return -1; if (tDecodeBinaryAlloc(pDecoder, &data, &len2) < 0) return -1; - ASSERT(len1 == len2); + + if (len1 != len2) { + return TSDB_CODE_INVALID_MSG; + } void* p = taosArrayPush(pReq->dataLen, &len1); if (p == NULL) { diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 752101afbd..3703ed07aa 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -452,7 +452,9 @@ static void fillTokenBucket(STokenBucket* pBucket, const char* id) { int64_t now = taosGetTimestampMs(); int64_t deltaToken = now - pBucket->tokenFillTimestamp; - ASSERT(pBucket->numOfToken >= 0); + if (pBucket->numOfToken < 0) { + return; + } int32_t incNum = (deltaToken / 1000.0) * pBucket->numRate; if (incNum > 0) { diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index e8c7be5204..585cf63cfc 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -22,9 +22,8 @@ static void streamTaskSchedHelper(void* param, void* tmrId); int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) { int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); - ASSERT(ref == 2 && pTask->schedInfo.pDelayTimer == NULL); - - stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.delaySchedParam); + stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref, + pTask->info.delaySchedParam); pTask->schedInfo.pDelayTimer = taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.delaySchedParam, pTask, streamTimer); diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 9122af0e12..0390422623 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -441,7 +441,9 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta SBackendSnapFile2 snapFile = {0}; code = streamBackendSnapInitFile(path, pSnap, &snapFile); - ASSERT(code == 0); + if (code) { + goto _err; + } void* p = taosArrayPush(pDbSnapSet, &snapFile); if (p == NULL) { @@ -767,7 +769,10 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa if (!taosIsDir(path)) { code = taosMulMkDir(path); stInfo("%s mkdir %s", STREAM_STATE_TRANSFER, path); - ASSERT(code == 0); + if (code) { + stError("s-task:0x%x failed to mkdir:%s", (int32_t) snapInfo.taskId, path); + return code; + } } pDbSnapFile->path = path; diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index 0dbb6ed454..e68c088b9a 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -53,10 +53,9 @@ static int32_t streamTaskSetReady(SStreamTask* pTask) { pTask->id.idStr, pTask->info.taskLevel, numOfUps, p.name); } - ASSERT(pTask->status.downstreamReady == 0); pTask->status.downstreamReady = 1; - pTask->execInfo.readyTs = taosGetTimestampMs(); + int64_t el = (pTask->execInfo.readyTs - pTask->execInfo.checkTs); stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", pTask->id.idStr, numOfDowns, el, p.name); @@ -212,8 +211,6 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { int64_t now = taosGetTimestampMs(); int32_t code = 0; - ASSERT(hTaskId != 0); - // check stream task status in the first place. SStreamTaskState pStatus = streamTaskGetStatus(pTask); if (pStatus.state != TASK_STATUS__READY && pStatus.state != TASK_STATUS__HALT && diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c index 99f4e84951..92aad2ece1 100644 --- a/source/libs/stream/src/streamStartTask.c +++ b/source/libs/stream/src/streamStartTask.c @@ -50,8 +50,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { SArray* pTaskList = NULL; code = prepareBeforeStartTasks(pMeta, &pTaskList, now); if (code != TSDB_CODE_SUCCESS) { - ASSERT(pTaskList == NULL); - return TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; // ignore the error and return directly } // broadcast the check downstream tasks msg only for tasks with related fill-history tasks. @@ -364,7 +363,10 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } - ASSERT(pTask->status.downstreamReady == 0); + if(pTask->status.downstreamReady != 0) { + stFatal("s-task:0x%x downstream should be not ready, but it ready here, internal error happens", taskId); + return TSDB_CODE_STREAM_INTERNAL_ERROR; + } // avoid initialization and destroy running concurrently. streamMutexLock(&pTask->lock); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 8fd26dda27..46fb7a521d 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -168,9 +168,7 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream } if (isInvalidStateTransfer(state, event)) { - return NULL; - } else { - ASSERT(0); + stError("invalid state transfer %d, handle event:%s", state, GET_EVT_NAME(event)); } return NULL; @@ -182,8 +180,6 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName, stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr, pEventName, el, pSM->prev.state.name, pSM->current.name); - ASSERT(taosArrayGetSize(pSM->pWaitingEventList) == 1); - SFutureHandleEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0); if (pEvtInfo == NULL) { return terrno; @@ -501,8 +497,11 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even STaskStateTrans* pTrans = pSM->pActiveTrans; if (pTrans == NULL) { ETaskStatus s = pSM->current.state; - ASSERT(s == TASK_STATUS__DROPPING || s == TASK_STATUS__PAUSE || s == TASK_STATUS__STOP || - s == TASK_STATUS__UNINIT || s == TASK_STATUS__READY); + + if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__PAUSE && s != TASK_STATUS__STOP && + s != TASK_STATUS__UNINIT && s != TASK_STATUS__READY) { + stError("s-task:%s invalid task status:%s on handling event:%s success", id, pSM->current.name, GET_EVT_NAME(pSM->prev.evt)); + } // the pSM->prev.evt may be 0, so print string is not appropriate. stDebug("s-task:%s event:%s handled failed, current status:%s, trigger event:%s", id, GET_EVT_NAME(event), diff --git a/source/libs/stream/src/streamTimer.c b/source/libs/stream/src/streamTimer.c index 8f2fc83b72..b6275c0eb1 100644 --- a/source/libs/stream/src/streamTimer.c +++ b/source/libs/stream/src/streamTimer.c @@ -57,6 +57,9 @@ int32_t streamCleanBeforeQuitTmr(SStreamTmrInfo* pInfo, SStreamTask* pTask) { atomic_store_8(&pInfo->isActive, 0); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - ASSERT(ref >= 0); + if (ref < 0) { + stFatal("invalid task timer ref value:%d, %s", ref, pTask->id.idStr); + } + return ref; } \ No newline at end of file diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index bc2253d4bd..8e32822fb7 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -573,7 +573,7 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) { if (tDecodeI32(&decoder, &size) < 0) return -1; pInfo->pTsBuckets = taosArrayInit(size, sizeof(TSKEY)); QUERY_CHECK_NULL(pInfo->pTsBuckets, code, lino, _error, terrno); - + TSKEY ts = INT64_MIN; for (int32_t i = 0; i < size; i++) { if (tDecodeI64(&decoder, &ts) < 0) return -1; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 1d2e2357d1..a58baf5883 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -809,6 +809,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exi TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state to handle event") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS, "Invalid task status to proceed") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream internal error") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_NOT_LEADER, "Stream task not on leader vnode") // TDLite TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")