fix(query): specify the primary timestamp column in ssdatablock when update the time range of ssdatablock.
This commit is contained in:
parent
03c916b436
commit
00821527be
|
@ -190,7 +190,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
|
||||||
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity,
|
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity,
|
||||||
const SColumnInfoData* pSource, uint32_t numOfRow2);
|
const SColumnInfoData* pSource, uint32_t numOfRow2);
|
||||||
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows);
|
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows);
|
||||||
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock);
|
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex);
|
||||||
|
|
||||||
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
|
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
|
||||||
void colDataTrim(SColumnInfoData* pColumnInfoData);
|
void colDataTrim(SColumnInfoData* pColumnInfoData);
|
||||||
|
|
|
@ -341,7 +341,7 @@ size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) {
|
||||||
|
|
||||||
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; }
|
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; }
|
||||||
|
|
||||||
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) {
|
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) {
|
||||||
if (pDataBlock == NULL || pDataBlock->info.rows <= 0) {
|
if (pDataBlock == NULL || pDataBlock->info.rows <= 0) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -350,7 +350,8 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, 0);
|
int32_t index = (tsColumnIndex == -1)? 0:tsColumnIndex;
|
||||||
|
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, index);
|
||||||
if (pColInfoData->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
|
if (pColInfoData->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1903,7 +1903,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SArray* pColMatchIn
|
||||||
filterFreeInfo(filter);
|
filterFreeInfo(filter);
|
||||||
|
|
||||||
extractQualifiedTupleByFilterResult(pBlock, rowRes, keep);
|
extractQualifiedTupleByFilterResult(pBlock, rowRes, keep);
|
||||||
blockDataUpdateTsWindow(pBlock);
|
blockDataUpdateTsWindow(pBlock, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) {
|
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) {
|
||||||
|
@ -2072,7 +2072,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("%s result generated, rows:%d, groupId:%"PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, pBlock->info.groupId);
|
qDebug("%s result generated, rows:%d, groupId:%"PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, pBlock->info.groupId);
|
||||||
blockDataUpdateTsWindow(pBlock);
|
blockDataUpdateTsWindow(pBlock, 0);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2797,7 +2797,9 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
|
||||||
}
|
}
|
||||||
|
|
||||||
pRes->info.rows = numOfRows;
|
pRes->info.rows = numOfRows;
|
||||||
blockDataUpdateTsWindow(pRes);
|
|
||||||
|
// todo move this to time window aggregator, since the primary timestamp may not be known by exchange operator.
|
||||||
|
blockDataUpdateTsWindow(pRes, 0);
|
||||||
|
|
||||||
int64_t el = taosGetTimestampUs() - startTs;
|
int64_t el = taosGetTimestampUs() - startTs;
|
||||||
|
|
||||||
|
|
|
@ -553,7 +553,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
pInfo->pageIndex += 1;
|
pInfo->pageIndex += 1;
|
||||||
|
|
||||||
blockDataUpdateTsWindow(pInfo->binfo.pRes);
|
blockDataUpdateTsWindow(pInfo->binfo.pRes, 0);
|
||||||
pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId;
|
pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId;
|
||||||
return pInfo->binfo.pRes;
|
return pInfo->binfo.pRes;
|
||||||
}
|
}
|
||||||
|
|
|
@ -689,7 +689,7 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool inverti
|
||||||
}
|
}
|
||||||
pDataBlock->info.rows = size;
|
pDataBlock->info.rows = size;
|
||||||
pDataBlock->info.type = STREAM_REPROCESS;
|
pDataBlock->info.type = STREAM_REPROCESS;
|
||||||
blockDataUpdateTsWindow(pDataBlock);
|
blockDataUpdateTsWindow(pDataBlock, 0);
|
||||||
taosArrayClear(pInfo->tsArray);
|
taosArrayClear(pInfo->tsArray);
|
||||||
return pDataBlock;
|
return pDataBlock;
|
||||||
}
|
}
|
||||||
|
@ -899,7 +899,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
rows = pBlockInfo->rows;
|
rows = pBlockInfo->rows;
|
||||||
doFilter(pInfo->pCondition, pInfo->pRes, NULL);
|
doFilter(pInfo->pCondition, pInfo->pRes, NULL);
|
||||||
blockDataUpdateTsWindow(pInfo->pRes);
|
blockDataUpdateTsWindow(pInfo->pRes, 0);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -790,6 +790,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
getTableScanInfo(pOperator, &pInfo->order, &scanFlag);
|
getTableScanInfo(pOperator, &pInfo->order, &scanFlag);
|
||||||
|
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, scanFlag, true);
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, scanFlag, true);
|
||||||
|
@ -938,8 +939,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
||||||
return pBInfo->pRes;
|
return pBInfo->pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t order = TSDB_ORDER_ASC;
|
int32_t order = TSDB_ORDER_ASC;
|
||||||
STimeWindow win = pTaskInfo->window;
|
|
||||||
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
while (1) {
|
while (1) {
|
||||||
|
@ -952,6 +952,8 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, MAIN_SCAN, true);
|
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, MAIN_SCAN, true);
|
||||||
|
blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
|
||||||
|
|
||||||
doStateWindowAggImpl(pOperator, pInfo, pBlock);
|
doStateWindowAggImpl(pOperator, pInfo, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1429,6 +1431,8 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, MAIN_SCAN, true);
|
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, MAIN_SCAN, true);
|
||||||
|
blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
|
||||||
|
|
||||||
doSessionWindowAggImpl(pOperator, pInfo, pBlock);
|
doSessionWindowAggImpl(pOperator, pInfo, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -189,7 +189,7 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
pInfo->current += 1;
|
pInfo->current += 1;
|
||||||
|
|
||||||
blockDataUpdateTsWindow(pBlock);
|
blockDataUpdateTsWindow(pBlock, 0);
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue