feat(stream): adjust stream scan
This commit is contained in:
parent
1b69d30a16
commit
b28553395f
|
@ -402,8 +402,6 @@ typedef struct SStreamScanInfo {
|
|||
uint64_t numOfExec; // execution times
|
||||
STqReader* tqReader;
|
||||
|
||||
int32_t tsArrayIndex;
|
||||
SArray* tsArray;
|
||||
uint64_t groupId;
|
||||
SUpdateInfo* pUpdateInfo;
|
||||
|
||||
|
|
|
@ -1247,30 +1247,38 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
return pBlock;
|
||||
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
|
||||
qDebug("scan mode %d", pInfo->scanMode);
|
||||
if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
|
||||
blockDataDestroy(pInfo->pUpdateRes);
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
return pInfo->pRes;
|
||||
} else if (pInfo->scanMode == STREAM_SCAN_FROM_UPDATERES) {
|
||||
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||
return pInfo->pUpdateRes;
|
||||
} else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE || pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RETRIEVE) {
|
||||
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
||||
if (pSDB) {
|
||||
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
||||
checkUpdateData(pInfo, true, pSDB, false);
|
||||
return pSDB;
|
||||
}
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
switch (pInfo->scanMode) {
|
||||
case STREAM_SCAN_FROM_RES: {
|
||||
blockDataDestroy(pInfo->pUpdateRes);
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
return pInfo->pRes;
|
||||
} break;
|
||||
case STREAM_SCAN_FROM_UPDATERES: {
|
||||
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
|
||||
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||
return pInfo->pUpdateRes;
|
||||
} break;
|
||||
case STREAM_SCAN_FROM_DATAREADER_RANGE:
|
||||
case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
|
||||
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
||||
if (pSDB) {
|
||||
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
||||
checkUpdateData(pInfo, true, pSDB, false);
|
||||
return pSDB;
|
||||
}
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
} break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
if (isStateWindow(pInfo) && pInfo->sessionSup.pStreamAggSup->pScanBlock->info.rows > 0) {
|
||||
SStreamAggSupporter* pSup = pInfo->sessionSup.pStreamAggSup;
|
||||
if (isStateWindow(pInfo) && pSup->pScanBlock->info.rows > 0) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||
pInfo->updateResIndex = 0;
|
||||
copyDataBlock(pInfo->pUpdateRes, pInfo->sessionSup.pStreamAggSup->pScanBlock);
|
||||
blockDataCleanup(pInfo->sessionSup.pStreamAggSup->pScanBlock);
|
||||
copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
|
||||
blockDataCleanup(pSup->pScanBlock);
|
||||
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||
return pInfo->pUpdateRes;
|
||||
}
|
||||
|
@ -1329,7 +1337,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
|
||||
/*pOperator->status = OP_EXEC_DONE;*/
|
||||
} else if (pInfo->pUpdateInfo) {
|
||||
pInfo->tsArrayIndex = 0;
|
||||
checkUpdateData(pInfo, true, pInfo->pRes, true);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlockInfo->window.ekey);
|
||||
if (pInfo->pUpdateDataRes->info.rows > 0) {
|
||||
|
@ -1387,7 +1394,7 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
#if 1
|
||||
if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
|
||||
STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info;
|
||||
destroyTableScanOperatorInfo(pTableScanInfo, 1);
|
||||
destroyTableScanOperatorInfo(pTableScanInfo, numOfOutput);
|
||||
}
|
||||
#endif
|
||||
if (pStreamScan->tqReader) {
|
||||
|
@ -1401,8 +1408,8 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
blockDataDestroy(pStreamScan->pUpdateRes);
|
||||
blockDataDestroy(pStreamScan->pPullDataRes);
|
||||
blockDataDestroy(pStreamScan->pDeleteDataRes);
|
||||
blockDataDestroy(pStreamScan->pUpdateDataRes);
|
||||
taosArrayDestroy(pStreamScan->pBlockLists);
|
||||
taosArrayDestroy(pStreamScan->tsArray);
|
||||
taosMemoryFree(pStreamScan);
|
||||
}
|
||||
|
||||
|
@ -1444,11 +1451,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->tsArray = taosArrayInit(4, sizeof(int32_t));
|
||||
if (pInfo->tsArray == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
if (pHandle->vnode) {
|
||||
SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
|
||||
STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
|
||||
|
|
|
@ -1648,6 +1648,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
}
|
||||
}
|
||||
nodesDestroyNode((SNode*)pInfo->pPhyNode);
|
||||
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
@ -2934,9 +2935,10 @@ SSDataBlock* createSpecialDataBlock(EStreamType type) {
|
|||
pBlock->info.groupId = 0;
|
||||
pBlock->info.rows = 0;
|
||||
pBlock->info.type = type;
|
||||
pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t);
|
||||
pBlock->info.rowSize = sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) +
|
||||
sizeof(uint64_t) + sizeof(TSKEY) + sizeof(TSKEY);
|
||||
|
||||
pBlock->pDataBlock = taosArrayInit(3, sizeof(SColumnInfoData));
|
||||
pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData));
|
||||
SColumnInfoData infoData = {0};
|
||||
infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
infoData.info.bytes = sizeof(TSKEY);
|
||||
|
|
|
@ -207,6 +207,7 @@ void updateInfoDestroy(SUpdateInfo *pInfo) {
|
|||
}
|
||||
|
||||
taosArrayDestroy(pInfo->pTsSBFs);
|
||||
taosHashCleanup(pInfo->pMap);
|
||||
taosMemoryFree(pInfo);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue