fix:stream scan handles empty datablcok
This commit is contained in:
parent
6bfcc2e4c3
commit
b30c56bf7c
|
@ -1052,6 +1052,9 @@ static uint64_t getGroupIdByData(SStreamScanInfo* pInfo, uint64_t uid, TSKEY ts,
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
|
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
|
||||||
|
if (pBlock->info.rows == 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
if ((*pRowIndex) == pBlock->info.rows) {
|
if ((*pRowIndex) == pBlock->info.rows) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -1184,10 +1187,10 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
|
static int32_t generateSessionScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, SSDataBlock* pDestBlock) {
|
||||||
|
blockDataCleanup(pDestBlock);
|
||||||
if (pSrcBlock->info.rows == 0) {
|
if (pSrcBlock->info.rows == 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
blockDataCleanup(pDestBlock);
|
|
||||||
int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
|
int32_t code = blockDataEnsureCapacity(pDestBlock, pSrcBlock->info.rows);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -1837,6 +1840,12 @@ FETCH_NEXT_BLOCK:
|
||||||
}
|
}
|
||||||
setBlockGroupIdByUid(pInfo, pDelBlock);
|
setBlockGroupIdByUid(pInfo, pDelBlock);
|
||||||
printDataBlock(pDelBlock, "stream scan delete recv filtered");
|
printDataBlock(pDelBlock, "stream scan delete recv filtered");
|
||||||
|
if (pDelBlock->info.rows == 0) {
|
||||||
|
if (pInfo->tqReader) {
|
||||||
|
blockDataDestroy(pDelBlock);
|
||||||
|
}
|
||||||
|
goto FETCH_NEXT_BLOCK;
|
||||||
|
}
|
||||||
if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
|
if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) {
|
||||||
generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
|
generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes);
|
||||||
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
|
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT;
|
||||||
|
|
Loading…
Reference in New Issue