Merge branch 'enh/dv3' into feature/TD-19042

This commit is contained in:
yihaoDeng 2023-06-02 06:01:49 +00:00
commit 04996a6463
2 changed files with 7 additions and 6 deletions

View File

@ -1002,9 +1002,10 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = pInfo->pSrcBlock; SSDataBlock* pBlock = pInfo->pSrcBlock;
uint64_t groupId = pBlock->info.id.groupId; uint64_t groupId = pBlock->info.id.groupId;
SSDataBlock* pRes = pInfo->pRes; SSDataBlock* pRes = pInfo->pRes;
SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol);
TSKEY* tsCol = (TSKEY*)pTsCol->pData;
pRes->info.id.groupId = groupId; pRes->info.id.groupId = groupId;
SColumnInfoData* pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol); pInfo->srcRowIndex++;
TSKEY* tsCol = (TSKEY*)pTsCol->pData;
if (pInfo->srcRowIndex == 0) { if (pInfo->srcRowIndex == 0) {
keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize); keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
@ -1242,7 +1243,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
SSDataBlock* fillResult = NULL; SSDataBlock* fillResult = NULL;
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) { while (1) {
if (pInfo->srcRowIndex >= pInfo->pSrcBlock->info.rows) { if (pInfo->srcRowIndex >= pInfo->pSrcBlock->info.rows || pInfo->pSrcBlock->info.rows == 0) {
// If there are delete datablocks, we receive them first. // If there are delete datablocks, we receive them first.
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) { if (pBlock == NULL) {
@ -1281,7 +1282,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) {
case STREAM_PULL_DATA: { case STREAM_PULL_DATA: {
doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock); doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock);
memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN); memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
pInfo->srcRowIndex = 0; pInfo->srcRowIndex = -1;
} break; } break;
case STREAM_CREATE_CHILD_TABLE: { case STREAM_CREATE_CHILD_TABLE: {
return pBlock; return pBlock;
@ -1497,7 +1498,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
goto _error; goto _error;
} }
pInfo->srcRowIndex = 0; pInfo->srcRowIndex = -1;
setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo, setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo,
pTaskInfo); pTaskInfo);
pOperator->fpSet = pOperator->fpSet =

View File

@ -2904,7 +2904,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin
SStreamScanInfo* pScanInfo = downstream->info; SStreamScanInfo* pScanInfo = downstream->info;
pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type}; pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type};
pScanInfo->pState = pAggSup->pState; pScanInfo->pState = pAggSup->pState;
if ((!pScanInfo->igCheckUpdate || type == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) && !pScanInfo->pUpdateInfo) { if (!pScanInfo->pUpdateInfo) {
pScanInfo->pUpdateInfo = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark); pScanInfo->pUpdateInfo = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark);
} }
pScanInfo->twAggSup = *pTwSup; pScanInfo->twAggSup = *pTwSup;