Merge pull request #14459 from taosdata/feature/TD-16993
fix(stream): project ignore retrive datablock
This commit is contained in:
commit
8a76abc3bf
|
@ -56,7 +56,8 @@ typedef enum EStreamType {
|
||||||
STREAM_CLEAR,
|
STREAM_CLEAR,
|
||||||
STREAM_INVALID,
|
STREAM_INVALID,
|
||||||
STREAM_GET_ALL,
|
STREAM_GET_ALL,
|
||||||
STREAM_DELETE,
|
STREAM_DELETE_RESULT,
|
||||||
|
STREAM_DELETE_DATA,
|
||||||
STREAM_RETRIEVE,
|
STREAM_RETRIEVE,
|
||||||
STREAM_PULL_DATA,
|
STREAM_PULL_DATA,
|
||||||
STREAM_PULL_OVER,
|
STREAM_PULL_OVER,
|
||||||
|
|
|
@ -1259,6 +1259,7 @@ int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
|
||||||
|
|
||||||
dst->info.rows = src->info.rows;
|
dst->info.rows = src->info.rows;
|
||||||
dst->info.window = src->info.window;
|
dst->info.window = src->info.window;
|
||||||
|
dst->info.type = src->info.type;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3253,6 +3253,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
if (pBlock->info.type == STREAM_RETRIEVE) {
|
||||||
|
// for stream interval
|
||||||
|
return pBlock;
|
||||||
|
}
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
|
int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
|
||||||
|
|
|
@ -807,6 +807,23 @@ static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
|
||||||
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
|
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void setGroupId(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
|
||||||
|
ASSERT(rowIndex < pBlock->info.rows);
|
||||||
|
switch (pBlock->info.type)
|
||||||
|
{
|
||||||
|
case STREAM_RETRIEVE: {
|
||||||
|
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex);
|
||||||
|
uint64_t* groupCol = (uint64_t*)pColInfo->pData;
|
||||||
|
pInfo->groupId = groupCol[rowIndex];
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case STREAM_DELETE_DATA:
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
|
static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
|
||||||
STimeWindow win = {
|
STimeWindow win = {
|
||||||
.skey = INT64_MIN,
|
.skey = INT64_MIN,
|
||||||
|
@ -829,6 +846,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3
|
||||||
} else {
|
} else {
|
||||||
win =
|
win =
|
||||||
getActiveTimeWindow(NULL, &dumyInfo, tsCols[(*pRowIndex)], &pInfo->interval, pInfo->interval.precision, NULL);
|
getActiveTimeWindow(NULL, &dumyInfo, tsCols[(*pRowIndex)], &pInfo->interval, pInfo->interval.precision, NULL);
|
||||||
|
setGroupId(pInfo, pSDB, 2, *pRowIndex);
|
||||||
(*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, (*pRowIndex), win.ekey, binarySearchForKey, NULL,
|
(*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, (*pRowIndex), win.ekey, binarySearchForKey, NULL,
|
||||||
TSDB_ORDER_ASC);
|
TSDB_ORDER_ASC);
|
||||||
}
|
}
|
||||||
|
|
|
@ -2852,7 +2852,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
||||||
pInfo->pStDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
|
pInfo->pStDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
|
||||||
pInfo->pDelIterator = NULL;
|
pInfo->pDelIterator = NULL;
|
||||||
pInfo->pDelRes = createOneDataBlock(pResBlock, false);
|
pInfo->pDelRes = createOneDataBlock(pResBlock, false);
|
||||||
pInfo->pDelRes->info.type = STREAM_DELETE;
|
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
|
||||||
blockDataEnsureCapacity(pInfo->pDelRes, 64);
|
blockDataEnsureCapacity(pInfo->pDelRes, 64);
|
||||||
pInfo->pChildren = NULL;
|
pInfo->pChildren = NULL;
|
||||||
pInfo->isFinal = false;
|
pInfo->isFinal = false;
|
||||||
|
@ -3980,7 +3980,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
pInfo->pSeDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
|
pInfo->pSeDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
|
||||||
pInfo->pDelIterator = NULL;
|
pInfo->pDelIterator = NULL;
|
||||||
pInfo->pDelRes = createOneDataBlock(pResBlock, false);
|
pInfo->pDelRes = createOneDataBlock(pResBlock, false);
|
||||||
pInfo->pDelRes->info.type = STREAM_DELETE;
|
pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;
|
||||||
blockDataEnsureCapacity(pInfo->pDelRes, 64);
|
blockDataEnsureCapacity(pInfo->pDelRes, 64);
|
||||||
pInfo->pChildren = NULL;
|
pInfo->pChildren = NULL;
|
||||||
pInfo->ignoreExpiredData = pStateNode->window.igExpired;
|
pInfo->ignoreExpiredData = pStateNode->window.igExpired;
|
||||||
|
|
|
@ -106,6 +106,7 @@
|
||||||
./test.sh -f tsim/stream/partitionby1.sim
|
./test.sh -f tsim/stream/partitionby1.sim
|
||||||
./test.sh -f tsim/stream/schedSnode.sim
|
./test.sh -f tsim/stream/schedSnode.sim
|
||||||
./test.sh -f tsim/stream/windowClose.sim
|
./test.sh -f tsim/stream/windowClose.sim
|
||||||
|
./test.sh -f tsim/stream/ignoreExpiredData.sim
|
||||||
|
|
||||||
# ---- transaction
|
# ---- transaction
|
||||||
./test.sh -f tsim/trans/lossdata1.sim
|
./test.sh -f tsim/trans/lossdata1.sim
|
||||||
|
|
Loading…
Reference in New Issue