diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 4c68d1f667..8956801bd4 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -207,8 +207,6 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) { atomic_store_8(&queue->status, STREAM_QUEUE__FAILED); } -static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; } - void* streamQueueNextItem(SStreamQueue* queue); SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 8bbbd3524d..434f2924d1 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -112,7 +112,7 @@ void resetTaskInfo(qTaskInfo_t tinfo) { clearStreamBlock(pTaskInfo->pRoot); } -static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { +static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->numOfDownstream == 0) { qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id); @@ -129,7 +129,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu pOperator->status = OP_NOT_OPENED; SStreamScanInfo* pInfo = pOperator->info; - qDebug("s-task set source blocks:%d %s", (int32_t)numOfBlocks, id); + qDebug("s-task:%s set source blocks:%d", id, (int32_t)numOfBlocks); ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0); if (type == STREAM_INPUT__MERGED_SUBMIT) { @@ -144,9 +144,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } else if (type == STREAM_INPUT__DATA_BLOCK) { for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; - SPackedData tmp = { - .pDataBlock = pDataBlock, - }; + SPackedData tmp = { .pDataBlock = pDataBlock }; taosArrayPush(pInfo->pBlockLists, &tmp); } pInfo->blockType = STREAM_INPUT__DATA_BLOCK; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 9a39bb09dc..0286df2a9a 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -345,6 +345,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { return 0; } +static void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; } + void* streamQueueNextItem(SStreamQueue* queue) { int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING); if (dequeueFlag == STREAM_QUEUE__FAILED) { diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 7fb35ad2ad..e574cdbe8a 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -159,7 +159,8 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* return dst; } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamMergedSubmit2* pMerged = streamMergedSubmitNew(); - ASSERT(pMerged); + // todo handle error + streamMergeSubmit(pMerged, (SStreamDataSubmit2*)dst); streamMergeSubmit(pMerged, (SStreamDataSubmit2*)pElem); taosFreeQitem(dst); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index bd06f96798..ff51a5a6ae 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -66,7 +66,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* SArray* pBlockList = pMerged->submits; int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("st-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks); + qDebug("s-task:%s %p set submit input (merged), numOfblocks:%d", pTask->id.idStr, pTask, numOfBlocks); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)data; @@ -259,9 +259,10 @@ int32_t streamExecForAll(SStreamTask* pTask) { int32_t code = 0; while (1) { int32_t batchSize = 1; - void* pInput = NULL; int16_t times = 0; + SStreamQueueItem* pInput = NULL; + // merge multiple input data if possible in the input queue. qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr); @@ -274,6 +275,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { qDebug("===stream===try agian batchSize:%d", batchSize); continue; } + qDebug("===stream===break batchSize:%d", batchSize); break; } @@ -285,6 +287,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { break; } } else { + // todo we need to sort the data block, instead of just appending into the array list. void* newRet = NULL; if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) { streamQueueProcessFail(pTask->inputQueue); @@ -304,6 +307,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (pInput) { streamFreeQitem(pInput); } + return 0; } @@ -312,14 +316,14 @@ int32_t streamExecForAll(SStreamTask* pTask) { } if (pTask->taskLevel == TASK_LEVEL__SINK) { - ASSERT(((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_BLOCK); + ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK); qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize); - streamTaskOutput(pTask, pInput); + streamTaskOutput(pTask, (SStreamDataBlock*)pInput); continue; } SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); - qDebug("s-task:%s start to execute, numOfBlocks:%d", pTask->id.idStr, batchSize); + qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize); streamTaskExecImpl(pTask, pInput, pRes);