refactor: remove assert in stream module.

This commit is contained in:
Haojun Liao 2024-08-21 10:00:35 +08:00
parent bbdd1f655b
commit b6084e64ce
14 changed files with 63 additions and 35 deletions

View File

@ -958,6 +958,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103) #define TSDB_CODE_STREAM_INVALID_STATETRANS TAOS_DEF_ERROR_CODE(0, 0x4103)
#define TSDB_CODE_STREAM_TASK_IVLD_STATUS TAOS_DEF_ERROR_CODE(0, 0x4104) #define TSDB_CODE_STREAM_TASK_IVLD_STATUS TAOS_DEF_ERROR_CODE(0, 0x4104)
#define TSDB_CODE_STREAM_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x4105) #define TSDB_CODE_STREAM_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x4105)
#define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4106)
// TDLite // TDLite
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100) #define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)

View File

@ -37,10 +37,14 @@ int32_t createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int6
return code; return code;
} }
if (srcTaskId <= 0) {
stDebug("s-task:0x%x invalid src task id:%d for creating checkpoint trigger block", pTask->id.idStr, srcTaskId);
return TSDB_CODE_INVALID_PARA;
}
pChkpoint->type = checkpointType; pChkpoint->type = checkpointType;
if (checkpointType == STREAM_INPUT__CHECKPOINT_TRIGGER && (pTask->info.taskLevel != TASK_LEVEL__SOURCE)) { if (checkpointType == STREAM_INPUT__CHECKPOINT_TRIGGER && (pTask->info.taskLevel != TASK_LEVEL__SOURCE)) {
pChkpoint->srcTaskId = srcTaskId; pChkpoint->srcTaskId = srcTaskId;
ASSERT(srcTaskId != 0);
} }
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
@ -189,10 +193,13 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId
int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) { int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
pBlock->srcTaskId = pTask->id.taskId; pBlock->srcTaskId = pTask->id.taskId;
pBlock->srcVgId = pTask->pMeta->vgId; pBlock->srcVgId = pTask->pMeta->vgId;
if (pTask->chkInfo.pActiveInfo->dispatchTrigger == true) {
stError("s-task:%s already dispatch checkpoint-trigger, not dispatch again", pTask->id.idStr);
return 0;
}
int32_t code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock); int32_t code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
if (code == 0) { if (code == 0) {
ASSERT(pTask->chkInfo.pActiveInfo->dispatchTrigger == false);
code = streamDispatchStreamBlock(pTask); code = streamDispatchStreamBlock(pTask);
} else { } else {
stError("s-task:%s failed to put checkpoint into outputQ, code:%s", pTask->id.idStr, tstrerror(code)); stError("s-task:%s failed to put checkpoint into outputQ, code:%s", pTask->id.idStr, tstrerror(code));
@ -593,8 +600,11 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
} }
} }
ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer && bool valid = (pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
pInfo->processedVer <= pReq->checkpointVer); pInfo->processedVer <= pReq->checkpointVer);
if (!valid) {
stFatal("");
}
// update only it is in checkpoint status, or during restore procedure. // update only it is in checkpoint status, or during restore procedure.
if (pStatus.state == TASK_STATUS__CK || (!restored)) { if (pStatus.state == TASK_STATUS__CK || (!restored)) {
@ -1102,7 +1112,10 @@ void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
streamMutexLock(&pInfo->lock); streamMutexLock(&pInfo->lock);
// outputQ should be empty here // outputQ should be empty here
ASSERT(streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue) == 0); if (streamQueueGetNumOfUnAccessedItems(pTask->outputq.queue) > 0) {
stFatal("s-task:%s items are still in outputQ, failed to init trigger dispatch info", pTask->id.idStr);
return;
}
pInfo->dispatchTrigger = true; pInfo->dispatchTrigger = true;
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
@ -1375,9 +1388,7 @@ int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
pTask->pBackend = NULL; pTask->pBackend = NULL;
} }
ASSERT(pTask->pBackend == NULL);
pTask->status.requireConsensusChkptId = true; pTask->status.requireConsensusChkptId = true;
stDebug("s-task:%s set the require consensus-checkpointId flag", id); stDebug("s-task:%s set the require consensus-checkpointId flag", id);
return 0; return 0;
} }

View File

@ -33,7 +33,6 @@ int32_t createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t
return code; return code;
} }
ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen)));
for (int32_t i = 0; i < blockNum; i++) { for (int32_t i = 0; i < blockNum; i++) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pReq->data, i); SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(pReq->data, i);
SSDataBlock* pDataBlock = taosArrayGet(pArray, i); SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
@ -52,7 +51,6 @@ int32_t createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t
} }
int32_t len = tsDecompressString(pInput, compLen, 1, p, fullLen, ONE_STAGE_COMP, NULL, 0); int32_t len = tsDecompressString(pInput, compLen, 1, p, fullLen, ONE_STAGE_COMP, NULL, 0);
ASSERT(len == fullLen);
pInput = p; pInput = p;
} }

View File

@ -229,8 +229,11 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
if (tEncodeI32(pEncoder, pReq->upstreamRelTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamRelTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1; if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1;
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum); if (taosArrayGetSize(pReq->data) != pReq->blockNum || taosArrayGetSize(pReq->dataLen) == pReq->blockNum) {
return TSDB_CODE_INVALID_MSG;
}
for (int32_t i = 0; i < pReq->blockNum; i++) { for (int32_t i = 0; i < pReq->blockNum; i++) {
int32_t* pLen = taosArrayGet(pReq->dataLen, i); int32_t* pLen = taosArrayGet(pReq->dataLen, i);
void* data = taosArrayGetP(pReq->data, i); void* data = taosArrayGetP(pReq->data, i);
@ -261,7 +264,6 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1;
ASSERT(pReq->blockNum > 0);
pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*)); pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*));
pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t)); pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t));
for (int32_t i = 0; i < pReq->blockNum; i++) { for (int32_t i = 0; i < pReq->blockNum; i++) {
@ -270,7 +272,10 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
void* data; void* data;
if (tDecodeI32(pDecoder, &len1) < 0) return -1; if (tDecodeI32(pDecoder, &len1) < 0) return -1;
if (tDecodeBinaryAlloc(pDecoder, &data, &len2) < 0) return -1; if (tDecodeBinaryAlloc(pDecoder, &data, &len2) < 0) return -1;
ASSERT(len1 == len2);
if (len1 != len2) {
return TSDB_CODE_INVALID_MSG;
}
void* p = taosArrayPush(pReq->dataLen, &len1); void* p = taosArrayPush(pReq->dataLen, &len1);
if (p == NULL) { if (p == NULL) {

View File

@ -452,7 +452,9 @@ static void fillTokenBucket(STokenBucket* pBucket, const char* id) {
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
int64_t deltaToken = now - pBucket->tokenFillTimestamp; int64_t deltaToken = now - pBucket->tokenFillTimestamp;
ASSERT(pBucket->numOfToken >= 0); if (pBucket->numOfToken < 0) {
return;
}
int32_t incNum = (deltaToken / 1000.0) * pBucket->numRate; int32_t incNum = (deltaToken / 1000.0) * pBucket->numRate;
if (incNum > 0) { if (incNum > 0) {

View File

@ -22,9 +22,8 @@ static void streamTaskSchedHelper(void* param, void* tmrId);
int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { int32_t streamSetupScheduleTrigger(SStreamTask* pTask) {
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) { if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
ASSERT(ref == 2 && pTask->schedInfo.pDelayTimer == NULL); stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref,
pTask->info.delaySchedParam);
stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.delaySchedParam);
pTask->schedInfo.pDelayTimer = pTask->schedInfo.pDelayTimer =
taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.delaySchedParam, pTask, streamTimer); taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.delaySchedParam, pTask, streamTimer);

View File

@ -441,7 +441,9 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta
SBackendSnapFile2 snapFile = {0}; SBackendSnapFile2 snapFile = {0};
code = streamBackendSnapInitFile(path, pSnap, &snapFile); code = streamBackendSnapInitFile(path, pSnap, &snapFile);
ASSERT(code == 0); if (code) {
goto _err;
}
void* p = taosArrayPush(pDbSnapSet, &snapFile); void* p = taosArrayPush(pDbSnapSet, &snapFile);
if (p == NULL) { if (p == NULL) {
@ -767,7 +769,10 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
if (!taosIsDir(path)) { if (!taosIsDir(path)) {
code = taosMulMkDir(path); code = taosMulMkDir(path);
stInfo("%s mkdir %s", STREAM_STATE_TRANSFER, path); stInfo("%s mkdir %s", STREAM_STATE_TRANSFER, path);
ASSERT(code == 0); if (code) {
stError("s-task:0x%x failed to mkdir:%s", (int32_t) snapInfo.taskId, path);
return code;
}
} }
pDbSnapFile->path = path; pDbSnapFile->path = path;

View File

@ -53,10 +53,9 @@ static int32_t streamTaskSetReady(SStreamTask* pTask) {
pTask->id.idStr, pTask->info.taskLevel, numOfUps, p.name); pTask->id.idStr, pTask->info.taskLevel, numOfUps, p.name);
} }
ASSERT(pTask->status.downstreamReady == 0);
pTask->status.downstreamReady = 1; pTask->status.downstreamReady = 1;
pTask->execInfo.readyTs = taosGetTimestampMs(); pTask->execInfo.readyTs = taosGetTimestampMs();
int64_t el = (pTask->execInfo.readyTs - pTask->execInfo.checkTs); int64_t el = (pTask->execInfo.readyTs - pTask->execInfo.checkTs);
stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s", stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
pTask->id.idStr, numOfDowns, el, p.name); pTask->id.idStr, numOfDowns, el, p.name);
@ -212,8 +211,6 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
int32_t code = 0; int32_t code = 0;
ASSERT(hTaskId != 0);
// check stream task status in the first place. // check stream task status in the first place.
SStreamTaskState pStatus = streamTaskGetStatus(pTask); SStreamTaskState pStatus = streamTaskGetStatus(pTask);
if (pStatus.state != TASK_STATUS__READY && pStatus.state != TASK_STATUS__HALT && if (pStatus.state != TASK_STATUS__READY && pStatus.state != TASK_STATUS__HALT &&

View File

@ -50,8 +50,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
SArray* pTaskList = NULL; SArray* pTaskList = NULL;
code = prepareBeforeStartTasks(pMeta, &pTaskList, now); code = prepareBeforeStartTasks(pMeta, &pTaskList, now);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
ASSERT(pTaskList == NULL); return TSDB_CODE_SUCCESS; // ignore the error and return directly
return TSDB_CODE_SUCCESS;
} }
// broadcast the check downstream tasks msg only for tasks with related fill-history tasks. // broadcast the check downstream tasks msg only for tasks with related fill-history tasks.
@ -364,7 +363,10 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
return TSDB_CODE_STREAM_TASK_IVLD_STATUS; return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
} }
ASSERT(pTask->status.downstreamReady == 0); if(pTask->status.downstreamReady != 0) {
stFatal("s-task:0x%x downstream should be not ready, but it ready here, internal error happens", taskId);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
// avoid initialization and destroy running concurrently. // avoid initialization and destroy running concurrently.
streamMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);

View File

@ -168,9 +168,7 @@ static STaskStateTrans* streamTaskFindTransform(ETaskStatus state, const EStream
} }
if (isInvalidStateTransfer(state, event)) { if (isInvalidStateTransfer(state, event)) {
return NULL; stError("invalid state transfer %d, handle event:%d", state, GET_EVT_NAME(event));
} else {
ASSERT(0);
} }
return NULL; return NULL;
@ -182,8 +180,6 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName,
stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr, stDebug("s-task:%s handle event:%s completed, elapsed time:%" PRId64 "ms state:%s -> %s", pTask->id.idStr,
pEventName, el, pSM->prev.state.name, pSM->current.name); pEventName, el, pSM->prev.state.name, pSM->current.name);
ASSERT(taosArrayGetSize(pSM->pWaitingEventList) == 1);
SFutureHandleEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0); SFutureHandleEventInfo* pEvtInfo = taosArrayGet(pSM->pWaitingEventList, 0);
if (pEvtInfo == NULL) { if (pEvtInfo == NULL) {
return terrno; return terrno;

View File

@ -57,6 +57,9 @@ int32_t streamCleanBeforeQuitTmr(SStreamTmrInfo* pInfo, SStreamTask* pTask) {
atomic_store_8(&pInfo->isActive, 0); atomic_store_8(&pInfo->isActive, 0);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
ASSERT(ref >= 0); if (ref < 0) {
stFatal("invalid task timer ref value:%d, %s", ref, pTask->id.idStr);
}
return ref; return ref;
} }

View File

@ -564,7 +564,10 @@ _end:
int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) { int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
ASSERT(pInfo); if (pInfo == NULL) {
return TSDB_CODE_INVALID_PARA;
}
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen); tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
@ -623,7 +626,11 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) {
code = taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), pVal, valSize); code = taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), pVal, valSize);
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
} }
ASSERT(mapSize == taosHashGetSize(pInfo->pMap));
if (mapSize != taosHashGetSize(pInfo->pMap)) {
return TSDB_CODE_INVALID_MSG;
}
if (tDecodeU64(&decoder, &pInfo->maxDataVersion) < 0) return -1; if (tDecodeU64(&decoder, &pInfo->maxDataVersion) < 0) return -1;
if (tDecodeI32(&decoder, &pInfo->pkColLen) < 0) return -1; if (tDecodeI32(&decoder, &pInfo->pkColLen) < 0) return -1;

View File

@ -252,7 +252,7 @@ int32_t l2ComressInitImpl_xz(char *lossyColumns, float fPrecision, double dPreci
} }
int32_t l2CompressImpl_xz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize, int32_t l2CompressImpl_xz(const char *const input, const int32_t inputSize, char *const output, int32_t outputSize,
const char type, int8_t lvl) { const char type, int8_t lvl) {
size_t len = FL2_compress(output + 1, outputSize - 1, input, inputSize, lvl); size_t len = 0;//FL2_compress(output + 1, outputSize - 1, input, inputSize, lvl);
if (len > inputSize) { if (len > inputSize) {
output[0] = 0; output[0] = 0;
memcpy(output + 1, input, inputSize); memcpy(output + 1, input, inputSize);
@ -264,7 +264,7 @@ int32_t l2CompressImpl_xz(const char *const input, const int32_t inputSize, char
int32_t l2DecompressImpl_xz(const char *const input, const int32_t compressedSize, char *const output, int32_t l2DecompressImpl_xz(const char *const input, const int32_t compressedSize, char *const output,
int32_t outputSize, const char type) { int32_t outputSize, const char type) {
if (input[0] == 1) { if (input[0] == 1) {
return FL2_decompress(output, outputSize, input + 1, compressedSize - 1); return 0;//FL2_decompress(output, outputSize, input + 1, compressedSize - 1);
} else if (input[0] == 0) { } else if (input[0] == 0) {
memcpy(output, input + 1, compressedSize - 1); memcpy(output, input + 1, compressedSize - 1);
return compressedSize - 1; return compressedSize - 1;

View File

@ -802,6 +802,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exi
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_EXEC_CANCELLED, "Stream task exec cancelled")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state to handle event") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVALID_STATETRANS, "Invalid task state to handle event")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS, "Invalid task status to proceed") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS, "Invalid task status to proceed")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_NOT_LEADER, "Stream task not on leader vnode")
// TDLite // TDLite
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags") TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")