fix: fix bug for TD-17801
This commit is contained in:
parent
352965f76b
commit
8833a8bdfb
|
@ -68,9 +68,9 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
|
||||||
pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock;
|
pInfo->mergeDataBlocks = pProjPhyNode->mergeDataBlock;
|
||||||
|
|
||||||
// todo remove it soon
|
// todo remove it soon
|
||||||
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
// if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
||||||
pInfo->mergeDataBlocks = true;
|
// pInfo->mergeDataBlocks = true;
|
||||||
}
|
// }
|
||||||
|
|
||||||
int32_t numOfRows = 4096;
|
int32_t numOfRows = 4096;
|
||||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||||
|
@ -181,6 +181,16 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
|
||||||
return PROJECT_RETRIEVE_DONE;
|
return PROJECT_RETRIEVE_DONE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void printDataBlock1(SSDataBlock* pBlock, const char* flag) {
|
||||||
|
if (!pBlock || pBlock->info.rows == 0) {
|
||||||
|
qDebug("===stream===printDataBlock: Block is Null or Empty");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
char* pBuf = NULL;
|
||||||
|
qDebug("%s", dumpBlockData(pBlock, flag, &pBuf));
|
||||||
|
taosMemoryFree(pBuf);
|
||||||
|
}
|
||||||
|
|
||||||
SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
SProjectOperatorInfo* pProjectInfo = pOperator->info;
|
SProjectOperatorInfo* pProjectInfo = pOperator->info;
|
||||||
SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
|
SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
|
||||||
|
@ -229,6 +239,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// for stream interval
|
// for stream interval
|
||||||
if (pBlock->info.type == STREAM_RETRIEVE) {
|
if (pBlock->info.type == STREAM_RETRIEVE) {
|
||||||
|
printDataBlock1(pBlock, "project1");
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -302,7 +313,8 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
if (pOperator->cost.openCost == 0) {
|
if (pOperator->cost.openCost == 0) {
|
||||||
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
printDataBlock1(p, "project");
|
||||||
return (p->info.rows > 0) ? p : NULL;
|
return (p->info.rows > 0) ? p : NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
|
||||||
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
|
} else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
ASSERT(pTask->isDataScan);
|
ASSERT(pTask->isDataScan);
|
||||||
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
|
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
|
||||||
qDebug("task %d %p set submit input %p %p %d", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef);
|
qDebug("task %d %p set submit input %p %p %d 1", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef);
|
||||||
qSetStreamInput(exec, pSubmit->data, STREAM_INPUT__DATA_SUBMIT, false);
|
qSetStreamInput(exec, pSubmit->data, STREAM_INPUT__DATA_SUBMIT, false);
|
||||||
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
|
} else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) {
|
||||||
SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
|
SStreamDataBlock* pBlock = (SStreamDataBlock*)data;
|
||||||
|
@ -72,6 +72,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qDebug("task %d(child %d) executed and get block");
|
||||||
|
|
||||||
SSDataBlock block = {0};
|
SSDataBlock block = {0};
|
||||||
assignOneDataBlock(&block, output);
|
assignOneDataBlock(&block, output);
|
||||||
block.info.childId = pTask->selfChildId;
|
block.info.childId = pTask->selfChildId;
|
||||||
|
@ -188,7 +190,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
|
||||||
if (pTask->execType == TASK_EXEC__NONE) {
|
if (pTask->execType == TASK_EXEC__NONE) {
|
||||||
ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK);
|
ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK);
|
||||||
streamTaskOutput(pTask, data);
|
streamTaskOutput(pTask, data);
|
||||||
return pRes;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt);
|
qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt);
|
||||||
|
|
Loading…
Reference in New Issue