Merge pull request #27554 from taosdata/fix/syntax

refactor: remove assert
This commit is contained in:
Haojun Liao 2024-08-31 15:57:27 +08:00 committed by GitHub
commit 4c91928db3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 207 additions and 109 deletions

View File

@ -342,7 +342,10 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
// todo remove this
void* pGpIdData = colDataGetData(pGpIdColInfo, rowId);
ASSERT(gid == *(int64_t*)pGpIdData);
if (gid != *(int64_t*)pGpIdData) {
tqError("s-task:%s vgId:%d invalid groupId:%" PRId64 " actual:%" PRId64 " in sink task", id, vgId, gid,
*(int64_t*)pGpIdData);
}
}
code = setCreateTableMsgTableName(pCreateTbReq, pDataBlock, stbFullName, gid,
@ -747,7 +750,6 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
return code;
}
ASSERT(pRow);
void* p = taosArrayPush(pTableData->aRowP, &pRow);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -779,8 +781,6 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI
bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid);
if (isValid) { // not valid table, ignore it
tqDebug("s-task:%s set uid:%" PRIu64 " for dstTable:%s from meta", id, mr.me.uid, pTableSinkInfo->name.data);
ASSERT(terrno == 0);
// set the destination table uid
(*uid) = mr.me.uid;
pTableSinkInfo->uid = mr.me.uid;

View File

@ -46,7 +46,10 @@ int32_t tqScanWal(STQ* pTq) {
streamMetaWLock(pMeta);
int32_t times = (--pMeta->scanInfo.scanCounter);
ASSERT(pMeta->scanInfo.scanCounter >= 0);
if (times < 0) {
tqError("vgId:%d invalid scan counter:%d, reset to 0", vgId, times);
times = 0;
}
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
streamMetaWUnLock(pMeta);
@ -269,7 +272,6 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) {
// fill-history task has entered into the last phase, no need to anything
if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
ASSERT(pState.state == TASK_STATUS__READY);
// the maximum version of data in the WAL has reached already, the step2 is done
tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
pTask->dataRange.range.maxVer);

View File

@ -339,13 +339,15 @@ int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
if (pRspHead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("s-task:0x%x send dispatch error rsp, out of memory", req.taskId);
return -1;
return terrno;
}
pRspHead->vgId = htonl(req.upstreamNodeId);
ASSERT(pRspHead->vgId != 0);
if(pRspHead->vgId == 0) {
tqError("vgId:%d invalid dispatch msg from upstream to task:0x%x", pMeta->vgId, req.taskId);
return TSDB_CODE_INVALID_MSG;
}
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
pRsp->streamId = htobe64(req.streamId);
@ -926,7 +928,6 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
streamTaskSetStatusReady(pTask);
} else if (pState.state == TASK_STATUS__UNINIT) {
// tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr);
// ASSERT(pTask->status.downstreamReady == 0);
// tqStreamTaskRestoreCheckpoint(pMeta, pTask->id.streamId, pTask->id.taskId);
tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState.name);
} else {
@ -1000,7 +1001,10 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
TSDB_CODE_ACTION_IN_PROGRESS);
}
} else { // upstream not recv the checkpoint-source/trigger till now
ASSERT(pState.state == TASK_STATUS__READY || pState.state == TASK_STATUS__HALT);
if (!(pState.state == TASK_STATUS__READY || pState.state == TASK_STATUS__HALT)) {
tqFatal("s-task:%s invalid task status:%s", pTask->id.idStr, pState.name);
}
tqWarn(
"s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all "
"upstream sending checkpoint-source/trigger",
@ -1109,9 +1113,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
} else {
code = streamTrySchedExec(pTask);
}
} /*else {
ASSERT(status != TASK_STATUS__UNINIT);
}*/
}
streamMetaReleaseTask(pMeta, pTask);
return code;
@ -1234,7 +1236,14 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId);
streamMutexLock(&pTask->lock);
ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId);
if (pTask->chkInfo.checkpointId < req.checkpointId) {
tqFatal("s-task:%s vgId:%d invalid consensus-checkpointId:%" PRId64 ", greater than existed checkpointId:%"PRId64,
pTask->id.idStr, vgId, req.checkpointId, pTask->chkInfo.checkpointId);
streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo;
if (pConsenInfo->consenChkptTransId >= req.transId) {

View File

@ -4817,7 +4817,7 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
return code;
_err:
tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
tsdbError("failed to create data reader, error at:%d code:%s %s", lino, tstrerror(code), idstr);
tsdbReaderClose2(*ppReader);
*ppReader = NULL; // reset the pointer value.
return code;

View File

@ -749,7 +749,7 @@ _end:
freeTableCachedValObj(&val);
}
}
if (freeReader) {
pHandle->api.metaReaderFn.clearReader(&mr);
}
@ -5548,7 +5548,7 @@ static int32_t getBlockForTableMergeScan(void* param, SSDataBlock** ppBlock) {
pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->base.readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
*ppBlock = pBlock;
return code;
}
@ -5703,7 +5703,7 @@ void startGroupTableMergeScan(SOperatorInfo* pOperator) {
int32_t numOfTables = 0;
code = tableListGetSize(pInfo->base.pTableListInfo, &numOfTables);
QUERY_CHECK_CODE(code, lino, _end);
int32_t i = pInfo->tableStartIndex + 1;
for (; i < numOfTables; ++i) {
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);

View File

@ -306,7 +306,12 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
int32_t code = 0;
int64_t now = taosGetTimestampMs();
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
ASSERT(numOfBlocks != 0 && pTask->msgInfo.pData == NULL);
if (!(numOfBlocks != 0 && pTask->msgInfo.pData == NULL)) {
stError("s-task:%s dispatch block number:%d, exist not rsp dispatch msg:%p, abort build new dispatch msg",
pTask->id.idStr, numOfBlocks, pTask->msgInfo.pData);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
pTask->msgInfo.dispatchMsgType = pData->type;
@ -711,14 +716,15 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
}
streamMutexUnlock(&pTask->msgInfo.lock);
ASSERT(found);
return 0;
if (!found) {
stError("s-task:%s not found req hash value:%u", pTask->id.idStr, hashValue);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
} else {
return 0;
}
}
int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH ||
pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH));
const char* id = pTask->id.idStr;
int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputq.queue);
if (numOfElems > 0) {
@ -742,8 +748,11 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
return 0;
}
ASSERT(pTask->msgInfo.pData == NULL);
stDebug("s-task:%s start to dispatch msg, set output status:%d", id, pTask->outputq.status);
if (pTask->msgInfo.pData != NULL) {
stFatal("s-task:%s not rsp data:%p exist, should not dispatch msg now", id, pTask->msgInfo.pData);
} else {
stDebug("s-task:%s start to dispatch msg, set output status:%d", id, pTask->outputq.status);
}
SStreamDataBlock* pBlock = NULL;
streamQueueNextItem(pTask->outputq.queue, (SStreamQueueItem**)&pBlock);
@ -754,8 +763,11 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
}
int32_t type = pBlock->type;
ASSERT(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
type == STREAM_INPUT__TRANS_STATE);
if (!(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
type == STREAM_INPUT__TRANS_STATE)) {
stError("s-task:%s invalid dispatch block type:%d", id, type);
return TSDB_CODE_INTERNAL_ERROR;
}
pTask->execInfo.dispatch += 1;
@ -881,7 +893,6 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
SArray* pList = pActiveInfo->pReadyMsgList;
int32_t num = taosArrayGetSize(pList);
if (pTmrInfo->launchChkptId != pActiveInfo->activeId) {
streamMutexUnlock(&pActiveInfo->lock);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
@ -905,7 +916,15 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
SArray* pNotRspList = taosArrayInit(4, sizeof(int32_t));
ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) == num);
if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) {
streamMutexUnlock(&pActiveInfo->lock);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr, ref:%d", id,
vgId, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
for (int32_t i = 0; i < num; ++i) {
STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i);
@ -991,7 +1010,11 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
streamMutexLock(&pActiveInfo->lock);
int32_t num = taosArrayGetSize(pList);
ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) == num);
if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) {
stError("s-task:%s invalid number of sent readyMsg:%d to upstream:%d", id, num,
(int32_t)taosArrayGetSize(pTask->upstreamInfo.pList));
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
for (int32_t i = 0; i < num; ++i) {
STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i);
@ -1071,9 +1094,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock) + PAYLOAD_PREFIX_LEN;
ASSERT(dataStrLen > 0);
void* buf = taosMemoryCalloc(1, dataStrLen);
void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) {
return terrno;
}
@ -1221,8 +1242,6 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
int32_t size = taosArrayGetSize(pActiveInfo->pReadyMsgList);
if (size > 0) {
ASSERT(size == 1);
STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, 0);
if (pReady == NULL) {
return terrno;
@ -1292,7 +1311,6 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
stDebug("s-task:%s recv checkpoint-trigger from all upstream, continue", pTask->id.idStr);
pActiveInfo->allUpstreamTriggerRecv = 1;
} else {
ASSERT(numOfRecv <= total);
stDebug("s-task:%s %d/%d checkpoint-trigger recv", pTask->id.idStr, numOfRecv, total);
}
@ -1541,7 +1559,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
if (pMsgInfo->dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
stDebug("s-task:%s dispatch trans-state msgId:%d to downstream successfully, start to prepare transfer state", id,
msgId);
ASSERT(pTask->info.fillHistory == 1);
if (pTask->info.fillHistory != 1) {
stFatal("s-task:%s unexpected dispatch rsp, not scan-history task, not recv this dispatch rsp", id);
}
code = streamTransferStatePrepare(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens
@ -1567,7 +1587,10 @@ static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchR
}
((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId);
ASSERT(((SMsgHead*)(*pBuf))->vgId != 0);
if (((SMsgHead*)(*pBuf))->vgId == 0) {
return TSDB_CODE_INVALID_MSG;
}
SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead));

View File

@ -45,7 +45,11 @@ static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBl
pTask->outputInfo.smaSink.smaSink(pTask->outputInfo.smaSink.vnode, pTask->outputInfo.smaSink.smaId, pBlock->blocks);
destroyStreamDataBlock(pBlock);
} else {
ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
if (type != TASK_OUTPUT__FIXED_DISPATCH && type != TASK_OUTPUT__SHUFFLE_DISPATCH) {
stError("s-task:%s invalid stream output type:%d, internal error", pTask->id.idStr, type);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
code = streamTaskPutDataIntoOutputQ(pTask, pBlock);
if (code != TSDB_CODE_SUCCESS) {
destroyStreamDataBlock(pBlock);
@ -127,7 +131,11 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to
if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
SSDataBlock block = {0};
const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)pItem;
ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1);
int32_t num = taosArrayGetSize(pRetrieveBlock->blocks);
if (num != 1) {
stError("s-task:%s invalid retrieve block number:%d, ignore", pTask->id.idStr, num);
continue;
}
(void)assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0));
block.info.type = STREAM_PULL_OVER;
@ -178,7 +186,6 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to
// current output should be dispatched to down stream nodes
if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
ASSERT(numOfBlocks == taosArrayGetSize(pRes));
code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
// todo: here we need continue retry to put it into output buffer
if (code != TSDB_CODE_SUCCESS) {
@ -192,7 +199,6 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to
}
if (numOfBlocks > 0) {
ASSERT(numOfBlocks == taosArrayGetSize(pRes));
code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
} else {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
@ -277,7 +283,10 @@ static SScanhistoryDataInfo buildScanhistoryExecRet(EScanHistoryCode code, int32
}
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
if(pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
stError("s-task:%s not source scan-history task, not exec, quit", pTask->id.idStr);
return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
}
void* exec = pTask->exec.pExecutor;
bool finished = false;
@ -374,10 +383,16 @@ int32_t streamTransferStateDoPrepare(SStreamTask* pTask) {
// It must be halted for a source stream task, since when the related scan-history-data task start scan the history
// for the step 2.
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP);
if (!(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING || status == TASK_STATUS__STOP)) {
stError("s-task:%s invalid task status:%d", id, status);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
} else {
ASSERT(status == TASK_STATUS__READY || status == TASK_STATUS__PAUSE || status == TASK_STATUS__DROPPING ||
status == TASK_STATUS__STOP);
if (!(status == TASK_STATUS__READY || status == TASK_STATUS__PAUSE || status == TASK_STATUS__DROPPING ||
status == TASK_STATUS__STOP)) {
stError("s-task:%s invalid task status:%d", id, status);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", id,
@ -438,7 +453,10 @@ int32_t streamTransferStatePrepare(SStreamTask* pTask) {
int32_t code = TSDB_CODE_SUCCESS;
SStreamMeta* pMeta = pTask->pMeta;
ASSERT(pTask->status.appendTranstateBlock == 1);
if (pTask->status.appendTranstateBlock != 1) {
stError("s-task:%s not set appendTransBlock flag, internal error", pTask->id.idStr);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states.
@ -476,14 +494,16 @@ static int32_t doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int
code = qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput;
code = qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT);
stDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit,
pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver);
ASSERT((*pVer) <= pSubmit->submit.ver);
(*pVer) = pSubmit->submit.ver;
if ((*pVer) > pSubmit->submit.ver) {
stError("s-task:%s invalid recorded ver:%" PRId64 " greater than new block ver:%" PRId64 ", not update", id,
*pVer, pSubmit->submit.ver);
} else {
(*pVer) = pSubmit->submit.ver;
}
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput;
@ -500,8 +520,13 @@ static int32_t doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int
stDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask, numOfBlocks,
pMerged->ver);
code = qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
ASSERT((*pVer) <= pMerged->ver);
(*pVer) = pMerged->ver;
if ((*pVer) > pMerged->ver) {
stError("s-task:%s invalid recorded ver:%" PRId64 " greater than new block ver:%" PRId64 ", not update", id,
*pVer, pMerged->ver);
} else {
(*pVer) = pMerged->ver;
}
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
@ -512,7 +537,8 @@ static int32_t doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int
code = qSetMultiStreamInput(pExecutor, pCheckpoint->blocks, 1, pItem->type);
} else {
ASSERT(0);
stError("s-task:%s invalid input block type:%d, discard", id, pItem->type);
code = TSDB_CODE_STREAM_INTERNAL_ERROR;
}
return code;
@ -542,7 +568,6 @@ void streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock)
stDebug("s-task:%s add transfer-state block into outputQ", id);
} else {
stDebug("s-task:%s all upstream tasks send transfer-state block, add transfer-state block into outputQ", id);
ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
}
// agg task should dispatch trans-state msg to sink task, to flush all data to sink task.
@ -560,7 +585,6 @@ void streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock)
} else { // non-dispatch task, do task state transfer directly
streamFreeQitem((SStreamQueueItem*)pBlock);
stDebug("s-task:%s non-dispatch task, level:%d start to transfer state directly", id, level);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStatePrepare(pTask);
if (code != TSDB_CODE_SUCCESS) {
@ -606,7 +630,11 @@ static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, i
}
// update the currentVer if processing the submit blocks.
ASSERT(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer);
if (!(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer)) {
stError("s-task:%s invalid info, checkpointVer:%" PRId64 ", nextProcessVer:%" PRId64 " currentVer:%" PRId64, id,
pInfo->checkpointVer, pInfo->nextProcessVer, ver);
return;
}
if (ver != pInfo->processedVer) {
stDebug("s-task:%s update processedVer(unsaved) from %" PRId64 " to %" PRId64 " nextProcessVer:%" PRId64
@ -622,8 +650,6 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB
// 1. transfer the ownership of executor state
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
if (dropRelHTask) {
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
STaskId* pHTaskId = &pTask->hTaskInfo.id;
SStreamTask* pHTask = NULL;
int32_t code = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask);
@ -692,12 +718,10 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize);
if (ret == EXEC_AFTER_IDLE) {
ASSERT(pInput == NULL && numOfBlocks == 0);
streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL);
return 0;
} else {
if (pInput == NULL) {
ASSERT(numOfBlocks == 0);
return 0;
}
}
@ -718,7 +742,10 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
}
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
ASSERT(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT);
if (type != STREAM_INPUT__DATA_BLOCK && type != STREAM_INPUT__CHECKPOINT) {
stError("s-task:%s invalid block type:%d for sink task, discard", id, type);
continue;
}
int64_t st = taosGetTimestampMs();
@ -801,7 +828,11 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
}
void streamResumeTask(SStreamTask* pTask) {
ASSERT(pTask->status.schedStatus == TASK_SCHED_STATUS__ACTIVE);
if (pTask->status.schedStatus != TASK_SCHED_STATUS__ACTIVE) {
stError("s-task:%s invalid sched status:%d, not resume task", pTask->id.idStr, pTask->status.schedStatus);
return;
}
const char* id = pTask->id.idStr;
while (1) {

View File

@ -719,9 +719,13 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
}
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
size_t size = taosHashGetSize(pMeta->pTasksMap);
ASSERT(taosArrayGetSize(pMeta->pTaskList) == taosHashGetSize(pMeta->pTasksMap));
return (int32_t)size;
int32_t size = (int32_t)taosHashGetSize(pMeta->pTasksMap);
int32_t sizeInList = taosArrayGetSize(pMeta->pTaskList);
if (sizeInList != size) {
stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", pMeta->vgId, sizeInList, size);
}
return size;
}
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) {
@ -775,7 +779,10 @@ static void doRemoveIdFromList(SArray* pTaskList, int32_t num, SStreamTaskId* id
break;
}
}
ASSERT(remove);
if (!remove) {
stError("s-task:0x%x not in meta task list, internal error", id->taskId);
}
}
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
@ -787,6 +794,7 @@ static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
SStreamTask* pTask = NULL;
int32_t vgId = pMeta->vgId;
// pre-delete operation
streamMetaWLock(pMeta);
@ -799,19 +807,19 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
// desc the paused task counter
if (streamTaskShouldPause(pTask)) {
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
stInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
stInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", vgId, pTask->id.idStr, num);
}
// handle the dropping event
(void)streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
} else {
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", vgId, taskId);
streamMetaWUnLock(pMeta);
return 0;
}
streamMetaWUnLock(pMeta);
stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, pMeta->vgId);
stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, vgId);
while (1) {
int32_t timerActive = 0;
@ -849,10 +857,18 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
doRemoveIdFromList(pMeta->pTaskList, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);
(void)streamMetaRemoveTask(pMeta, &id);
ASSERT(taosHashGetSize(pMeta->pTasksMap) == taosArrayGetSize(pMeta->pTaskList));
int32_t size = (int32_t) taosHashGetSize(pMeta->pTasksMap);
int32_t sizeInList = taosArrayGetSize(pMeta->pTaskList);
if (sizeInList != size) {
stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", vgId, sizeInList, size);
}
streamMetaWUnLock(pMeta);
ASSERT(pTask->status.timerActive == 0);
int32_t numOfTmr = pTask->status.timerActive;
if (numOfTmr != 0) {
stError("s-task:%s vgId:%d invalid timer Active record:%d, internal error", pTask->id.idStr, vgId, numOfTmr);
}
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt);
(void)taosTmrStop(pTask->schedInfo.pDelayTimer);
@ -862,7 +878,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
streamMetaReleaseTask(pMeta, pTask);
} else {
stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId);
stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", vgId, taskId);
streamMetaWUnLock(pMeta);
}
@ -1062,8 +1078,6 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
if (streamTaskShouldPause(pTask)) {
(void)atomic_add_fetch_32(&pMeta->numOfPausedTasks, 1);
}
ASSERT(pTask->status.downstreamReady == 0);
}
tdbFree(pKey);
@ -1081,7 +1095,6 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
}
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
ASSERT(pMeta->numOfStreamTasks <= numOfTasks && pMeta->numOfPausedTasks <= numOfTasks);
stDebug("vgId:%d load %d tasks into meta from disk completed, streamTask:%d, paused:%d", pMeta->vgId, numOfTasks,
pMeta->numOfStreamTasks, pMeta->numOfPausedTasks);

View File

@ -90,7 +90,6 @@ void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) {
int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING);
if (flag == STREAM_QUEUE__FAILED) {
ASSERT(pQueue->qItem != NULL);
*pItem = streamQueueCurItem(pQueue);
} else {
pQueue->qItem = NULL;
@ -105,13 +104,20 @@ void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) {
}
void streamQueueProcessSuccess(SStreamQueue* queue) {
ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) {
stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
return;
}
queue->qItem = NULL;
atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS);
}
void streamQueueProcessFail(SStreamQueue* queue) {
ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) {
stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING);
return;
}
atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
}
@ -229,7 +235,6 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
}
} else {
if (*pInput == NULL) {
ASSERT((*numOfBlocks) == 0);
*pInput = qItem;
} else { // merge current block failed, let's handle the already merged blocks.
void* newRet = NULL;
@ -340,7 +345,8 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
stDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
} else {
ASSERT(0);
stError("s-task:%s invalid type:%d to put in inputQ", pTask->id.idStr, type);
return TSDB_CODE_INVALID_PARA;
}
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && type != STREAM_INPUT__CHECKPOINT_TRIGGER &&

View File

@ -111,11 +111,15 @@ void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {
}
int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
int32_t level = pTask->info.taskLevel;
ETaskStatus status = streamTaskGetStatus(pTask).state;
int32_t level = pTask->info.taskLevel;
SStreamTaskState state = streamTaskGetStatus(pTask);
ASSERT((pTask->status.downstreamReady == 1) && (status == TASK_STATUS__SCAN_HISTORY) &&
(pTask->info.fillHistory == 1));
if (((pTask->status.downstreamReady != 1) || (state.state != TASK_STATUS__SCAN_HISTORY) ||
(pTask->info.fillHistory != 1))) {
stFatal("s-task:%s invalid status:%s to start fill-history task, downReady:%d, is-fill-history task:%d",
pTask->id.idStr, state.name, pTask->status.downstreamReady, pTask->info.fillHistory);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
if (level == TASK_LEVEL__SOURCE) {
return doStartScanHistoryTask(pTask);
@ -144,7 +148,6 @@ int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) {
}
SStreamTaskState p = streamTaskGetStatus(pTask);
ASSERT(p.state == TASK_STATUS__READY);
int8_t schedStatus = pTask->status.schedStatus;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
@ -171,8 +174,6 @@ int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask) {
if (code == 0) {
SStreamTaskState p = streamTaskGetStatus(pTask);
ASSERT((p.state == TASK_STATUS__SCAN_HISTORY) && (pTask->info.fillHistory == 1));
stDebug("s-task:%s fill-history task enters into scan-history data stage, status:%s", pTask->id.idStr, p.name);
code = streamTaskStartScanHistory(pTask);
}
@ -348,8 +349,6 @@ void tryLaunchHistoryTask(void* param, void* tmrId) {
}
if (streamTaskShouldStop(*ppTask)) {
ASSERT((*ppTask)->status.timerActive >= 1);
char* p = streamTaskGetStatus(*ppTask).name;
int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1);
stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d",
@ -385,7 +384,10 @@ void tryLaunchHistoryTask(void* param, void* tmrId) {
notRetryLaunchFillHistoryTask(pTask, pInfo, now);
} else { // not reach the limitation yet, let's continue retrying launch related fill-history task.
streamTaskSetRetryInfoForLaunch(pHTaskInfo);
ASSERT(pTask->status.timerActive >= 1);
if (pTask->status.timerActive < 1) {
stError("s-task:%s invalid timerActive recorder:%d, abort timer", pTask->id.idStr, pTask->status.timerActive);
return;
}
// abort the timer if intend to stop task
SStreamTask* pHTask = NULL;
@ -451,8 +453,6 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
int32_t hTaskId = pTask->hTaskInfo.id.taskId;
SLaunchHTaskInfo* pInfo = NULL;
ASSERT(hTaskId != 0);
stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since not built yet", idStr, pMeta->vgId, hTaskId);
STaskId id = streamTaskGetTaskId(pTask);
@ -480,11 +480,18 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
return terrno;
}
ASSERT(ref >= 1);
if (ref < 1) {
stError("s-task:%s invalid timerActive recorder:%d, abort timer", pTask->id.idStr, pTask->status.timerActive);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
stDebug("s-task:%s set timer active flag, ref:%d", idStr, ref);
} else { // timer exists
ASSERT(pTask->status.timerActive >= 1);
if (pTask->status.timerActive < 1) {
stError("s-task:%s invalid timerActive recorder:%d, abort timer", pTask->id.idStr, pTask->status.timerActive);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
stDebug("s-task:%s set timer active flag, task timer not null", idStr);
streamTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer,
pTask->pMeta->vgId, " start-history-task-tmr");
@ -500,7 +507,11 @@ int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask) {
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t nextProcessVer) {
SVersionRange* pRange = &pTask->dataRange.range;
ASSERT(nextProcessVer >= pRange->maxVer);
if (nextProcessVer < pRange->maxVer) {
stError("s-task:%s next processdVer:%"PRId64" is less than range max ver:%"PRId64, pTask->id.idStr, nextProcessVer,
pRange->maxVer);
return true;
}
// maxVer for fill-history task is the version, where the last timestamp is acquired.
// it's also the maximum version to scan data in tsdb.
@ -538,7 +549,11 @@ int32_t streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
} else {
ASSERT(pTask->info.fillHistory == 0);
if (pTask->info.fillHistory != 0) {
stError("s-task:%s task should not be fill-history task, internal error", pTask->id.idStr);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
if (pTask->info.taskLevel >= TASK_LEVEL__AGG) {
return TSDB_CODE_SUCCESS;
}

View File

@ -134,8 +134,9 @@ int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool
return code;
}
if (fillHistory) {
ASSERT(hasFillhistory);
if (fillHistory && !hasFillhistory) {
stError("s-task:0x%x create task failed, due to inconsistent fill-history flag", pTask->id.taskId);
return TSDB_CODE_INVALID_PARA;
}
epsetAssign(&(pTask->info.mnodeEpset), pEpset);
@ -728,8 +729,11 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
if ((pInfo != NULL) && pInfo->dataAllowed) {
pInfo->dataAllowed = false;
int32_t t = atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
ASSERT(t <= streamTaskGetNumOfUpstream(pTask));
if (pTask->upstreamInfo.numOfClosed < streamTaskGetNumOfUpstream(pTask)) {
int32_t t = atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
} else {
stError("s-task:%s not inc closed input, since they have been all closed already", pTask->id.idStr);
}
}
}
@ -739,7 +743,7 @@ void streamTaskOpenUpstreamInput(SStreamTask* pTask, int32_t taskId) {
if (pInfo != NULL && (!pInfo->dataAllowed)) {
int32_t t = atomic_sub_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
ASSERT(t >= 0);
stDebug("s-task:%s open inputQ for upstream:0x%x, remain closed:%d", pTask->id.idStr, taskId, t);
pInfo->dataAllowed = true;
}
}
@ -775,8 +779,6 @@ int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) {
int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
streamMutexLock(&pTask->lock);
int8_t status = pTask->status.schedStatus;
ASSERT(status == TASK_SCHED_STATUS__WAITING || status == TASK_SCHED_STATUS__ACTIVE ||
status == TASK_SCHED_STATUS__INACTIVE);
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
streamMutexUnlock(&pTask->lock);
@ -892,8 +894,6 @@ void streamTaskInitForLaunchHTask(SHistoryTaskInfo* pInfo) {
}
void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo) {
ASSERT(pInfo->tickCount == 0);
pInfo->waitInterval *= RETRY_LAUNCH_INTERVAL_INC_RATE;
pInfo->tickCount = ceil(pInfo->waitInterval / WAIT_FOR_MINIMAL_INTERVAL);
pInfo->retryTimes += 1;

View File

@ -194,7 +194,6 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName,
void* px = taosArrayPop(pSM->pWaitingEventList);
STaskStateTrans* pNextTrans = streamTaskFindTransform(pSM->current.state, pEvtInfo->event);
ASSERT(pSM->pActiveTrans == NULL && pNextTrans != NULL);
pSM->pActiveTrans = pNextTrans;
pSM->startTs = taosGetTimestampMs();