refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-05-10 22:22:01 +08:00
parent c6e5879f44
commit dbd79cc80c
5 changed files with 16 additions and 13 deletions

View File

@ -207,8 +207,6 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
atomic_store_8(&queue->status, STREAM_QUEUE__FAILED); atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
} }
static FORCE_INLINE void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; }
void* streamQueueNextItem(SStreamQueue* queue); void* streamQueueNextItem(SStreamQueue* queue);
SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type); SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type);

View File

@ -112,7 +112,7 @@ void resetTaskInfo(qTaskInfo_t tinfo) {
clearStreamBlock(pTaskInfo->pRoot); clearStreamBlock(pTaskInfo->pRoot);
} }
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) { static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, const char* id) {
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if (pOperator->numOfDownstream == 0) { if (pOperator->numOfDownstream == 0) {
qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id); qError("failed to find stream scan operator to set the input data block, %s" PRIx64, id);
@ -129,7 +129,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
qDebug("s-task set source blocks:%d %s", (int32_t)numOfBlocks, id); qDebug("s-task:%s set source blocks:%d", id, (int32_t)numOfBlocks);
ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0); ASSERT(pInfo->validBlockIndex == 0 && taosArrayGetSize(pInfo->pBlockLists) == 0);
if (type == STREAM_INPUT__MERGED_SUBMIT) { if (type == STREAM_INPUT__MERGED_SUBMIT) {
@ -144,9 +144,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} else if (type == STREAM_INPUT__DATA_BLOCK) { } else if (type == STREAM_INPUT__DATA_BLOCK) {
for (int32_t i = 0; i < numOfBlocks; ++i) { for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i];
SPackedData tmp = { SPackedData tmp = { .pDataBlock = pDataBlock };
.pDataBlock = pDataBlock,
};
taosArrayPush(pInfo->pBlockLists, &tmp); taosArrayPush(pInfo->pBlockLists, &tmp);
} }
pInfo->blockType = STREAM_INPUT__DATA_BLOCK; pInfo->blockType = STREAM_INPUT__DATA_BLOCK;

View File

@ -345,6 +345,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
return 0; return 0;
} }
static void* streamQueueCurItem(SStreamQueue* queue) { return queue->qItem; }
void* streamQueueNextItem(SStreamQueue* queue) { void* streamQueueNextItem(SStreamQueue* queue) {
int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING); int8_t dequeueFlag = atomic_exchange_8(&queue->status, STREAM_QUEUE__PROCESSING);
if (dequeueFlag == STREAM_QUEUE__FAILED) { if (dequeueFlag == STREAM_QUEUE__FAILED) {

View File

@ -159,7 +159,8 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
return dst; return dst;
} else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit2* pMerged = streamMergedSubmitNew(); SStreamMergedSubmit2* pMerged = streamMergedSubmitNew();
ASSERT(pMerged); // todo handle error
streamMergeSubmit(pMerged, (SStreamDataSubmit2*)dst); streamMergeSubmit(pMerged, (SStreamDataSubmit2*)dst);
streamMergeSubmit(pMerged, (SStreamDataSubmit2*)pElem); streamMergeSubmit(pMerged, (SStreamDataSubmit2*)pElem);
taosFreeQitem(dst); taosFreeQitem(dst);

View File

@ -66,7 +66,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray*
SArray* pBlockList = pMerged->submits; SArray* pBlockList = pMerged->submits;
int32_t numOfBlocks = taosArrayGetSize(pBlockList); int32_t numOfBlocks = taosArrayGetSize(pBlockList);
qDebug("st-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks); qDebug("s-task:%s %p set submit input (merged), numOfblocks:%d", pTask->id.idStr, pTask, numOfBlocks);
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)data; const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)data;
@ -259,9 +259,10 @@ int32_t streamExecForAll(SStreamTask* pTask) {
int32_t code = 0; int32_t code = 0;
while (1) { while (1) {
int32_t batchSize = 1; int32_t batchSize = 1;
void* pInput = NULL;
int16_t times = 0; int16_t times = 0;
SStreamQueueItem* pInput = NULL;
// merge multiple input data if possible in the input queue. // merge multiple input data if possible in the input queue.
qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr); qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr);
@ -274,6 +275,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
qDebug("===stream===try agian batchSize:%d", batchSize); qDebug("===stream===try agian batchSize:%d", batchSize);
continue; continue;
} }
qDebug("===stream===break batchSize:%d", batchSize); qDebug("===stream===break batchSize:%d", batchSize);
break; break;
} }
@ -285,6 +287,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
break; break;
} }
} else { } else {
// todo we need to sort the data block, instead of just appending into the array list.
void* newRet = NULL; void* newRet = NULL;
if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) { if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) {
streamQueueProcessFail(pTask->inputQueue); streamQueueProcessFail(pTask->inputQueue);
@ -304,6 +307,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
if (pInput) { if (pInput) {
streamFreeQitem(pInput); streamFreeQitem(pInput);
} }
return 0; return 0;
} }
@ -312,14 +316,14 @@ int32_t streamExecForAll(SStreamTask* pTask) {
} }
if (pTask->taskLevel == TASK_LEVEL__SINK) { if (pTask->taskLevel == TASK_LEVEL__SINK) {
ASSERT(((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_BLOCK); ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize); qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize);
streamTaskOutput(pTask, pInput); streamTaskOutput(pTask, (SStreamDataBlock*)pInput);
continue; continue;
} }
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
qDebug("s-task:%s start to execute, numOfBlocks:%d", pTask->id.idStr, batchSize); qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize);
streamTaskExecImpl(pTask, pInput, pRes); streamTaskExecImpl(pTask, pInput, pRes);