Merge pull request #12788 from taosdata/feature/3_liaohj

fix(query): specify the primary timestamp column in ssdatablock
This commit is contained in:
Haojun Liao 2022-05-21 14:46:08 +08:00 committed by GitHub
commit 6466187a63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 39 additions and 29 deletions

View File

@ -190,7 +190,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity, int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity,
const SColumnInfoData* pSource, uint32_t numOfRow2); const SColumnInfoData* pSource, uint32_t numOfRow2);
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows); int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows);
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock); int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex);
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows); int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
void colDataTrim(SColumnInfoData* pColumnInfoData); void colDataTrim(SColumnInfoData* pColumnInfoData);

View File

@ -341,7 +341,7 @@ size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) {
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; } size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; }
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) { int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) {
if (pDataBlock == NULL || pDataBlock->info.rows <= 0) { if (pDataBlock == NULL || pDataBlock->info.rows <= 0) {
return 0; return 0;
} }
@ -350,7 +350,8 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) {
return -1; return -1;
} }
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, 0); int32_t index = (tsColumnIndex == -1)? 0:tsColumnIndex;
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, index);
if (pColInfoData->info.type != TSDB_DATA_TYPE_TIMESTAMP) { if (pColInfoData->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
return 0; return 0;
} }

View File

@ -1903,7 +1903,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SArray* pColMatchIn
filterFreeInfo(filter); filterFreeInfo(filter);
extractQualifiedTupleByFilterResult(pBlock, rowRes, keep); extractQualifiedTupleByFilterResult(pBlock, rowRes, keep);
blockDataUpdateTsWindow(pBlock); blockDataUpdateTsWindow(pBlock, 0);
} }
void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) { void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) {
@ -2072,7 +2072,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
} }
qDebug("%s result generated, rows:%d, groupId:%"PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, pBlock->info.groupId); qDebug("%s result generated, rows:%d, groupId:%"PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, pBlock->info.groupId);
blockDataUpdateTsWindow(pBlock); blockDataUpdateTsWindow(pBlock, 0);
return 0; return 0;
} }
@ -2797,7 +2797,9 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
} }
pRes->info.rows = numOfRows; pRes->info.rows = numOfRows;
blockDataUpdateTsWindow(pRes);
// todo move this to time window aggregator, since the primary timestamp may not be known by exchange operator.
blockDataUpdateTsWindow(pRes, 0);
int64_t el = taosGetTimestampUs() - startTs; int64_t el = taosGetTimestampUs() - startTs;

View File

@ -553,7 +553,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
pInfo->pageIndex += 1; pInfo->pageIndex += 1;
blockDataUpdateTsWindow(pInfo->binfo.pRes); blockDataUpdateTsWindow(pInfo->binfo.pRes, 0);
pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId; pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId;
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }

View File

@ -689,7 +689,7 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool inverti
} }
pDataBlock->info.rows = size; pDataBlock->info.rows = size;
pDataBlock->info.type = STREAM_REPROCESS; pDataBlock->info.type = STREAM_REPROCESS;
blockDataUpdateTsWindow(pDataBlock); blockDataUpdateTsWindow(pDataBlock, 0);
taosArrayClear(pInfo->tsArray); taosArrayClear(pInfo->tsArray);
return pDataBlock; return pDataBlock;
} }
@ -899,7 +899,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
} }
rows = pBlockInfo->rows; rows = pBlockInfo->rows;
doFilter(pInfo->pCondition, pInfo->pRes, NULL); doFilter(pInfo->pCondition, pInfo->pRes, NULL);
blockDataUpdateTsWindow(pInfo->pRes); blockDataUpdateTsWindow(pInfo->pRes, 0);
break; break;
} }

View File

@ -621,7 +621,7 @@ static void saveDataBlockLastRow(char** pRow, SArray* pDataBlock, int32_t rowInd
} }
} }
static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
uint64_t tableGroupId) { uint64_t tableGroupId) {
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info; SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
@ -639,13 +639,17 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
// int32_t prevIndex = pResultRowInfo->curPos; // int32_t prevIndex = pResultRowInfo->curPos;
TSKEY* tsCols = NULL; TSKEY* tsCols = NULL;
if (pSDataBlock->pDataBlock != NULL) { if (pBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData; tsCols = (int64_t*)pColDataInfo->pData;
if (tsCols != NULL) {
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
}
} }
int32_t startPos = 0; int32_t startPos = 0;
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascScan); TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols, pBlock->info.rows, ascScan);
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
pInfo->interval.precision, &pInfo->win); pInfo->interval.precision, &pInfo->win);
@ -670,7 +674,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
int32_t forwardStep = 0; int32_t forwardStep = 0;
TSKEY ekey = ascScan? win.ekey:win.skey; TSKEY ekey = ascScan? win.ekey:win.skey;
forwardStep = forwardStep =
getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order);
ASSERT(forwardStep > 0); ASSERT(forwardStep > 0);
// prev time window not interpolation yet. // prev time window not interpolation yet.
@ -686,7 +690,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
} }
STimeWindow w = pRes->win; STimeWindow w = pRes->win;
ret = setTimeWindowOutputBuf(pResultRowInfo, pSDataBlock->info.uid, &w, masterScan, &pResult, tableGroupId, ret = setTimeWindowOutputBuf(pResultRowInfo, pBlock->info.uid, &w, masterScan, &pResult, tableGroupId,
pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup,
pTaskInfo); pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
@ -694,17 +698,17 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
} }
assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
doTimeWindowInterpolation(pOperatorInfo, &pInfo->binfo, pSDataBlock->pDataBlock, *(TSKEY*)pInfo->pRow[0], -1, doTimeWindowInterpolation(pOperatorInfo, &pInfo->binfo, pBlock->pDataBlock, *(TSKEY*)pInfo->pRow[0], -1,
tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP); tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP); setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP);
doApplyFunctions(pInfo->binfo.pCtx, &w, &pInfo->timeWindowData, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); doApplyFunctions(pInfo->binfo.pCtx, &w, &pInfo->timeWindowData, startPos, 0, tsCols, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
} }
// restore current time window // restore current time window
ret = setTimeWindowOutputBuf(pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, tableGroupId, ret = setTimeWindowOutputBuf(pResultRowInfo, pBlock->info.uid, &win, masterScan, &pResult, tableGroupId,
pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup,
pTaskInfo); pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
@ -714,17 +718,17 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
#endif #endif
// window start key interpolation // window start key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep, doWindowBorderInterpolation(pOperatorInfo, pBlock, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep,
pInfo->order, false); pInfo->order, false);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols,
pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
STimeWindow nextWin = win; STimeWindow nextWin = win;
while (1) { while (1) {
int32_t prevEndPos = (forwardStep - 1) * step + startPos; int32_t prevEndPos = (forwardStep - 1) * step + startPos;
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order); startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->order);
if (startPos < 0) { if (startPos < 0) {
break; break;
} }
@ -748,20 +752,20 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
ekey = ascScan? nextWin.ekey:nextWin.skey; ekey = ascScan? nextWin.ekey:nextWin.skey;
forwardStep = forwardStep =
getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order);
// window start(end) key interpolation // window start(end) key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, doWindowBorderInterpolation(pOperatorInfo, pBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep,
pInfo->order, false); pInfo->order, false);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols,
pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
} }
if (pInfo->timeWindowInterpo) { if (pInfo->timeWindowInterpo) {
int32_t rowIndex = ascScan ? (pSDataBlock->info.rows - 1) : 0; int32_t rowIndex = ascScan ? (pBlock->info.rows - 1) : 0;
saveDataBlockLastRow(pInfo->pRow, pSDataBlock->pDataBlock, rowIndex, pSDataBlock->info.numOfCols); saveDataBlockLastRow(pInfo->pRow, pBlock->pDataBlock, rowIndex, pBlock->info.numOfCols);
} }
return pUpdated; return pUpdated;
@ -938,8 +942,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
return pBInfo->pRes; return pBInfo->pRes;
} }
int32_t order = TSDB_ORDER_ASC; int32_t order = TSDB_ORDER_ASC;
STimeWindow win = pTaskInfo->window;
SOperatorInfo* downstream = pOperator->pDownstream[0]; SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) { while (1) {
@ -952,6 +955,8 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
} }
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, MAIN_SCAN, true); setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, MAIN_SCAN, true);
blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
doStateWindowAggImpl(pOperator, pInfo, pBlock); doStateWindowAggImpl(pOperator, pInfo, pBlock);
} }
@ -1429,6 +1434,8 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, MAIN_SCAN, true); setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, MAIN_SCAN, true);
blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId);
doSessionWindowAggImpl(pOperator, pInfo, pBlock); doSessionWindowAggImpl(pOperator, pInfo, pBlock);
} }

View File

@ -189,7 +189,7 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
pInfo->current += 1; pInfo->current += 1;
blockDataUpdateTsWindow(pBlock); blockDataUpdateTsWindow(pBlock, 0);
return pBlock; return pBlock;
} }