diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 5b03db6f0c..ae53e0cb65 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -378,6 +378,56 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } +// set input +static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_t* pCurrentVer, const char* id) { + void* pExecutor = pTask->exec.pExecutor; + + const SStreamQueueItem* pItem = pInput; + if (pItem->type == STREAM_INPUT__GET_RES) { + const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput; + qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); + + } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { + ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); + const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput; + qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); + qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit, + pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); + ASSERT((*pCurrentVer) < pSubmit->submit.ver); + (*pCurrentVer) = pSubmit->submit.ver; + + } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { + const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput; + + SArray* pBlockList = pBlock->blocks; + int32_t numOfBlocks = taosArrayGetSize(pBlockList); + qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer); + qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); + + } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { + const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput; + + SArray* pBlockList = pMerged->submits; + int32_t numOfBlocks = taosArrayGetSize(pBlockList); + qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask, + numOfBlocks, pMerged->ver); + qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); + ASSERT((*pCurrentVer) < pMerged->ver); + (*pCurrentVer) = pMerged->ver; + + } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { + const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput; + qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK); + + } else if (pItem->type == STREAM_INPUT__CHECKPOINT) { + const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*)pInput; + qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, STREAM_INPUT__CHECKPOINT); + + } else { + ASSERT(0); + } +} + /** * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * appropriate batch of blocks should be handled in 5 to 10 sec. @@ -418,52 +468,12 @@ int32_t streamExecForAll(SStreamTask* pTask) { } int64_t st = taosGetTimestampMs(); - qDebug("s-task:%s start to process batch of blocks, num:%d", id, batchSize); + + const SStreamQueueItem* pItem = pInput; + qDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, batchSize, pItem->type); + int64_t currentVer = pTask->chkInfo.currentVer; - - { - // set input - void* pExecutor = pTask->exec.pExecutor; - - const SStreamQueueItem* pItem = pInput; - if (pItem->type == STREAM_INPUT__GET_RES) { - const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput; - qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); - } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { - ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); - const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput; - qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); - qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit, - pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); - ASSERT(currentVer < pSubmit->submit.ver); - currentVer = pSubmit->submit.ver; - } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { - const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput; - - SArray* pBlockList = pBlock->blocks; - int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, id, numOfBlocks, pBlock->sourceVer); - qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); - } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { - const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput; - - SArray* pBlockList = pMerged->submits; - int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask, - numOfBlocks, pMerged->ver); - qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); - ASSERT(currentVer < pMerged->ver); - currentVer = pMerged->ver; - } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { - const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput; - qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK); - } else if (pItem->type == STREAM_INPUT__CHECKPOINT) { - const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*)pInput; - qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, STREAM_INPUT__CHECKPOINT); - } else { - ASSERT(0); - } - } + doSetStreamInputBlock(pTask, pInput, ¤tVer, id); int64_t resSize = 0; int32_t totalBlocks = 0; @@ -523,7 +533,6 @@ int32_t streamTryExec(SStreamTask* pTask) { return -1; } - // todo the task should be commit here atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); qDebug("s-task:%s exec completed, status:%s, sched-status:%d", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); @@ -551,17 +560,22 @@ int32_t streamTryExec(SStreamTask* pTask) { if (code == TSDB_CODE_SUCCESS) { taosWLockLatch(&pTask->pMeta->lock); + ASSERT(pTask->chkInfo.keptCheckpointId < pTask->checkpointingId); + pTask->chkInfo.keptCheckpointId = pTask->checkpointingId; + streamMetaSaveTask(pTask->pMeta, pTask); if (streamMetaCommit(pTask->pMeta) < 0) { taosWUnLockLatch(&pTask->pMeta->lock); - qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr()); + qError("s-task:%s failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", ver:%" PRId64 + ", since %s", + pTask->id.idStr, pTask->chkInfo.keptCheckpointId, pTask->chkInfo.version, terrstr()); return -1; } else { taosWUnLockLatch(&pTask->pMeta->lock); - qDebug("s-task:%s commit after checkpoint generating", pTask->id.idStr); } - qInfo("vgId:%d s-task:%s commit task status after checkpoint completed", pMeta->vgId, pTask->id.idStr); + qInfo("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 ", ver:%" PRId64, + pMeta->vgId, pTask->id.idStr, pTask->chkInfo.keptCheckpointId, pTask->chkInfo.version); } else { // todo: let's retry send rsp to upstream/mnode }