Merge pull request #14957 from taosdata/feature/3_liaohj

fix(query): fix partition by bug for last_row query.
This commit is contained in:
Haojun Liao 2022-07-15 20:13:26 +08:00 committed by GitHub
commit 09cf3cc8ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 150 additions and 244 deletions

View File

@ -139,18 +139,29 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
colDataAppend(pDst, 0, p, isNull); colDataAppend(pDst, 0, p, isNull);
} }
pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes);
pInfo->pRes->info.rows = 1;
if (pInfo->pseudoExprSup.numOfExprs > 0) { if (pInfo->pseudoExprSup.numOfExprs > 0) {
SExprSupp* pSup = &pInfo->pseudoExprSup; SExprSupp* pSup = &pInfo->pseudoExprSup;
addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
GET_TASKID(pTaskInfo)); GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
}
} }
pInfo->pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes); if (pTableList->map != NULL) {
int64_t* groupId = taosHashGet(pTableList->map, &pInfo->pRes->info.uid, sizeof(int64_t)); int64_t* groupId = taosHashGet(pTableList->map, &pInfo->pRes->info.uid, sizeof(int64_t));
pInfo->pRes->info.groupId = *groupId; pInfo->pRes->info.groupId = *groupId;
} else {
ASSERT(taosArrayGetSize(pTableList->pTableList) == 1);
STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, 0);
pInfo->pRes->info.groupId = pKeyInfo->groupId;
}
pInfo->indexOfBufferedRes += 1; pInfo->indexOfBufferedRes += 1;
pInfo->pRes->info.rows = 1;
return pInfo->pRes; return pInfo->pRes;
} else { } else {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
@ -182,8 +193,12 @@ SSDataBlock* doScanLastrow(SOperatorInfo* pOperator) {
STableKeyInfo* pKeyInfo = taosArrayGet(pGroupTableList, 0); STableKeyInfo* pKeyInfo = taosArrayGet(pGroupTableList, 0);
pInfo->pRes->info.groupId = pKeyInfo->groupId; pInfo->pRes->info.groupId = pKeyInfo->groupId;
addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes, code = addTagPseudoColumnData(&pInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pInfo->pRes,
GET_TASKID(pTaskInfo)); GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
return NULL;
}
} }
tsdbLastrowReaderClose(pInfo->pLastrowReader); tsdbLastrowReaderClose(pInfo->pLastrowReader);

View File

@ -65,7 +65,7 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
} }
rowSize += rowSize +=
(numOfOutput * sizeof(bool)); // expand rowSize to mark if col is null for top/bottom result(saveTupleData) (numOfOutput * sizeof(bool)); // expand rowSize to mark if col is null for top/bottom result(doSaveTupleData)
return rowSize; return rowSize;
} }

View File

@ -106,7 +106,7 @@ bool irateFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
int32_t irateFunction(SqlFunctionCtx *pCtx); int32_t irateFunction(SqlFunctionCtx *pCtx);
int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t cacheLastRowFunction(SqlFunctionCtx* pCtx); int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx);
bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t firstFunction(SqlFunctionCtx *pCtx); int32_t firstFunction(SqlFunctionCtx *pCtx);

View File

@ -2233,7 +2233,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.translateFunc = translateFirstLast, .translateFunc = translateFirstLast,
.getEnvFunc = getFirstLastFuncEnv, .getEnvFunc = getFirstLastFuncEnv,
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = cacheLastRowFunction, .processFunc = cachedLastRowFunction,
.finalizeFunc = firstLastFinalize .finalizeFunc = firstLastFinalize
}, },
{ {

View File

@ -80,13 +80,14 @@ typedef struct STopBotRes {
} STopBotRes; } STopBotRes;
typedef struct SFirstLastRes { typedef struct SFirstLastRes {
bool hasResult; bool hasResult;
// used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So, // used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So,
// this attribute is required // this attribute is required
bool isNull; bool isNull;
int32_t bytes; int32_t bytes;
int64_t ts; int64_t ts;
char buf[]; STuplePos pos;
char buf[];
} SFirstLastRes; } SFirstLastRes;
typedef struct SStddevRes { typedef struct SStddevRes {
@ -1141,8 +1142,8 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
return true; return true;
} }
static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); static void doSaveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); static void doCopyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, const char* tval) { static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, const char* tval) {
// the data is loaded, not only the block SMA value // the data is loaded, not only the block SMA value
@ -1195,7 +1196,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
pBuf->v = *(int64_t*)tval; pBuf->v = *(int64_t*)tval;
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} else { } else {
if (IS_SIGNED_NUMERIC_TYPE(type)) { if (IS_SIGNED_NUMERIC_TYPE(type)) {
@ -1207,7 +1208,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
pBuf->v = val; pBuf->v = val;
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
@ -1220,7 +1221,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
pBuf->v = val; pBuf->v = val;
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} else if (type == TSDB_DATA_TYPE_DOUBLE) { } else if (type == TSDB_DATA_TYPE_DOUBLE) {
@ -1232,7 +1233,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
pBuf->v = val; pBuf->v = val;
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} else if (type == TSDB_DATA_TYPE_FLOAT) { } else if (type == TSDB_DATA_TYPE_FLOAT) {
@ -1246,7 +1247,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} }
@ -1271,7 +1272,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
@ -1283,7 +1284,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((*val < pData[i]) ^ isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} }
@ -1302,7 +1303,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
@ -1314,7 +1315,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((*val < pData[i]) ^ isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} }
@ -1333,7 +1334,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
@ -1345,7 +1346,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((*val < pData[i]) ^ isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} }
@ -1364,7 +1365,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
@ -1376,7 +1377,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((*val < pData[i]) ^ isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} }
@ -1397,7 +1398,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
@ -1409,7 +1410,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((*val < pData[i]) ^ isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} }
@ -1428,7 +1429,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
@ -1440,7 +1441,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((*val < pData[i]) ^ isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} }
@ -1459,7 +1460,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
@ -1471,7 +1472,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((*val < pData[i]) ^ isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} }
@ -1490,7 +1491,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
@ -1502,7 +1503,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((*val < pData[i]) ^ isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} }
@ -1522,7 +1523,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
@ -1534,7 +1535,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((*val < pData[i]) ^ isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} }
@ -1553,7 +1554,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
@ -1565,7 +1566,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((*val < pData[i]) ^ isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} }
@ -2640,7 +2641,7 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx)
} }
int32_t getFirstLastInfoSize(int32_t resBytes) { int32_t getFirstLastInfoSize(int32_t resBytes) {
return sizeof(SFirstLastRes) + resBytes + sizeof(int64_t) + sizeof(STuplePos); return sizeof(SFirstLastRes) + resBytes;
} }
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
@ -2669,6 +2670,33 @@ static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowInde
return *(TSKEY*)colDataGetData(pTsColInfo, rowIndex); return *(TSKEY*)colDataGetData(pTsColInfo, rowIndex);
} }
static void saveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx, SFirstLastRes* pInfo) {
if (pCtx->subsidiaries.num <= 0) {
return;
}
if (!pInfo->hasResult) {
doSaveTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
} else {
doCopyTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
}
}
static void doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t currentTs, int32_t type, char* pData) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
if (IS_VAR_DATA_TYPE(type)) {
pInfo->bytes = varDataTLen(pData);
}
memcpy(pInfo->buf, pData, pInfo->bytes);
pInfo->ts = currentTs;
saveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo);
pInfo->hasResult = true;
}
// This ordinary first function does not care if current scan is ascending order or descending order scan // This ordinary first function does not care if current scan is ascending order or descending order scan
// the OPTIMIZED version of first function will only handle the ascending order scan // the OPTIMIZED version of first function will only handle the ascending order scan
int32_t firstFunction(SqlFunctionCtx* pCtx) { int32_t firstFunction(SqlFunctionCtx* pCtx) {
@ -2680,9 +2708,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
int32_t type = pInputCol->info.type; pInfo->bytes = pInputCol->info.bytes;
int32_t bytes = pInputCol->info.bytes;
pInfo->bytes = bytes;
// All null data column, return directly. // All null data column, return directly.
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) { if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
@ -2700,8 +2726,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
if (blockDataOrder == TSDB_ORDER_ASC) { if (blockDataOrder == TSDB_ORDER_ASC) {
// filter according to current result firstly // filter according to current result firstly
if (pResInfo->numOfRes > 0) { if (pResInfo->numOfRes > 0) {
TSKEY ts = *(TSKEY*)(pInfo->buf + bytes); if (pInfo->ts < startKey) {
if (ts < startKey) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
@ -2715,26 +2740,8 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i); TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || pInfo->ts > cts) {
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) > cts) { doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data);
if (IS_VAR_DATA_TYPE(type)) {
bytes = varDataTLen(data);
pInfo->bytes = bytes;
}
memcpy(pInfo->buf, data, bytes);
*(TSKEY*)(pInfo->buf + bytes) = cts;
// handle selectivity
if (pCtx->subsidiaries.num > 0) {
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
if (!pInfo->hasResult) {
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
} else {
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
}
}
pInfo->hasResult = true;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo->numOfRes = 1;
break; break;
} }
} }
@ -2742,8 +2749,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
// in case of descending order time stamp serial, which usually happens as the results of the nest query, // in case of descending order time stamp serial, which usually happens as the results of the nest query,
// all data needs to be check. // all data needs to be check.
if (pResInfo->numOfRes > 0) { if (pResInfo->numOfRes > 0) {
TSKEY ts = *(TSKEY*)(pInfo->buf + bytes); if (pInfo->ts < endKey) {
if (ts < endKey) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
} }
@ -2758,24 +2764,8 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i); TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) > cts) { if (pResInfo->numOfRes == 0 || pInfo->ts > cts) {
if (IS_VAR_DATA_TYPE(type)) { doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data);
bytes = varDataTLen(data);
pInfo->bytes = bytes;
}
memcpy(pInfo->buf, data, bytes);
*(TSKEY*)(pInfo->buf + bytes) = cts;
// handle selectivity
if (pCtx->subsidiaries.num > 0) {
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
if (!pInfo->hasResult) {
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
} else {
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
}
}
pInfo->hasResult = true;
pResInfo->numOfRes = 1;
break; break;
} }
} }
@ -2821,26 +2811,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i); TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) { if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
if (IS_VAR_DATA_TYPE(type)) { doSaveCurrentVal(pCtx, i, cts, type, data);
bytes = varDataTLen(data);
pInfo->bytes = bytes;
}
memcpy(pInfo->buf, data, bytes);
*(TSKEY*)(pInfo->buf + bytes) = cts;
// handle selectivity
if (pCtx->subsidiaries.num > 0) {
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
if (!pInfo->hasResult) {
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
} else {
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
}
}
pInfo->hasResult = true;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo->numOfRes = 1;
} }
break; break;
} }
} else { // descending order } else { // descending order
@ -2853,24 +2827,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i); TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) { if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
if (IS_VAR_DATA_TYPE(type)) { doSaveCurrentVal(pCtx, i, cts, type, data);
bytes = varDataTLen(data);
pInfo->bytes = bytes;
}
memcpy(pInfo->buf, data, bytes);
*(TSKEY*)(pInfo->buf + bytes) = cts;
// handle selectivity
if (pCtx->subsidiaries.num > 0) {
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
if (!pInfo->hasResult) {
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
} else {
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
}
}
pInfo->hasResult = true;
pResInfo->numOfRes = 1;
} }
break; break;
} }
@ -2885,8 +2843,9 @@ static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, S
int32_t start = pColInfo->startRowIndex; int32_t start = pColInfo->startRowIndex;
pOutput->bytes = pInput->bytes; pOutput->bytes = pInput->bytes;
TSKEY* tsIn = (TSKEY*)(pInput->buf + pInput->bytes); TSKEY* tsIn = &pInput->ts;
TSKEY* tsOut = (TSKEY*)(pOutput->buf + pInput->bytes); TSKEY* tsOut = &pOutput->ts;
if (pOutput->hasResult) { if (pOutput->hasResult) {
if (isFirst) { if (isFirst) {
if (*tsIn > *tsOut) { if (*tsIn > *tsOut) {
@ -2898,20 +2857,12 @@ static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, S
} }
} }
} }
*tsOut = *tsIn; *tsOut = *tsIn;
memcpy(pOutput->buf, pInput->buf, pOutput->bytes); memcpy(pOutput->buf, pInput->buf, pOutput->bytes);
// handle selectivity saveTupleData(pCtx->pSrcBlock, start, pCtx, pOutput);
STuplePos* pTuplePos = (STuplePos*)(pOutput->buf + pOutput->bytes + sizeof(TSKEY));
if (pCtx->subsidiaries.num > 0) {
if (!pOutput->hasResult) {
saveTupleData(pCtx, start, pCtx->pSrcBlock, pTuplePos);
} else {
copyTupleData(pCtx, start, pCtx->pSrcBlock, pTuplePos);
}
}
pOutput->hasResult = true;
return; pOutput->hasResult = true;
} }
static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuery) { static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuery) {
@ -2953,34 +2904,34 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
colDataAppend(pCol, pBlock->info.rows, pRes->buf, pRes->isNull || pResInfo->isNullRes); colDataAppend(pCol, pBlock->info.rows, pRes->buf, pRes->isNull || pResInfo->isNullRes);
// handle selectivity // handle selectivity
STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY)); setSelectivityValue(pCtx, pBlock, &pRes->pos, pBlock->info.rows);
setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows);
return pResInfo->numOfRes; return pResInfo->numOfRes;
} }
int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
int32_t resultBytes = getFirstLastInfoSize(pRes->bytes); int32_t resultBytes = getFirstLastInfoSize(pRes->bytes);
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
// todo check for failure
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
memcpy(varDataVal(res), pRes, resultBytes); memcpy(varDataVal(res), pRes, resultBytes);
varDataSetLen(res, resultBytes); varDataSetLen(res, resultBytes);
int32_t slotId = pCtx->pExpr->base.resSchema.slotId; int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
colDataAppend(pCol, pBlock->info.rows, res, false); colDataAppend(pCol, pBlock->info.rows, res, false);
// handle selectivity setSelectivityValue(pCtx, pBlock, &pRes->pos, pBlock->info.rows);
STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY));
setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows);
taosMemoryFree(res); taosMemoryFree(res);
return 1; return 1;
} }
//todo rewrite:
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
@ -2998,6 +2949,28 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void doSaveLastrow(SqlFunctionCtx *pCtx, char* pData, int32_t rowIndex, int64_t cts, SFirstLastRes* pInfo) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
if (colDataIsNull_s(pInputCol, rowIndex)) {
pInfo->isNull = true;
} else {
pInfo->isNull = false;
if (IS_VAR_DATA_TYPE(pInputCol->info.type)) {
pInfo->bytes = varDataTLen(pData);
}
memcpy(pInfo->buf, pData, pInfo->bytes);
}
pInfo->ts = cts;
saveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo);
pInfo->hasResult = true;
}
int32_t lastRowFunction(SqlFunctionCtx* pCtx) { int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0; int32_t numOfElems = 0;
@ -3007,12 +2980,9 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
int32_t type = pInputCol->info.type;
int32_t bytes = pInputCol->info.bytes; int32_t bytes = pInputCol->info.bytes;
pInfo->bytes = bytes; pInfo->bytes = bytes;
SColumnDataAgg* pColAgg = (pInput->colDataAggIsSet) ? pInput->pColumnDataAgg[0] : NULL;
TSKEY startKey = getRowPTs(pInput->pPTS, 0); TSKEY startKey = getRowPTs(pInput->pPTS, 0);
TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1); TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
@ -3022,31 +2992,10 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i); TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf) < cts) { numOfElems++;
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
pInfo->isNull = true; if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
} else { doSaveLastrow(pCtx, data, i, cts, pInfo);
pInfo->isNull = false;
if (IS_VAR_DATA_TYPE(type)) {
bytes = varDataTLen(data);
pInfo->bytes = bytes;
}
memcpy(pInfo->buf + sizeof(TSKEY), data, bytes);
}
*(TSKEY*)(pInfo->buf) = cts;
numOfElems++;
// handle selectivity
if (pCtx->subsidiaries.num > 0) {
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
if (!pInfo->hasResult) {
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
} else {
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
}
}
pInfo->hasResult = true;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo->numOfRes = 1;
} }
break; break;
} }
@ -3054,31 +3003,10 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) { for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i); TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf) < cts) { numOfElems++;
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
pInfo->isNull = true; if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
} else { doSaveLastrow(pCtx, data, i, cts, pInfo);
pInfo->isNull = false;
if (IS_VAR_DATA_TYPE(type)) {
bytes = varDataTLen(data);
pInfo->bytes = bytes;
}
memcpy(pInfo->buf + sizeof(TSKEY), data, bytes);
}
*(TSKEY*)(pInfo->buf) = cts;
numOfElems++;
// handle selectivity
if (pCtx->subsidiaries.num > 0) {
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
if (!pInfo->hasResult) {
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
} else {
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
}
}
pInfo->hasResult = true;
pResInfo->numOfRes = 1;
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
} }
break; break;
} }
@ -3088,21 +3016,6 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t lastRowFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
colDataAppend(pCol, pBlock->info.rows, pRes->buf + sizeof(TSKEY), pRes->isNull);
// handle selectivity
STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY));
setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows);
return pResInfo->numOfRes;
}
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
pEnv->calcMemSize = sizeof(SDiffInfo); pEnv->calcMemSize = sizeof(SDiffInfo);
return true; return true;
@ -3425,7 +3338,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
// save the data of this tuple // save the data of this tuple
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); doSaveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
} }
#ifdef BUF_PAGE_DEBUG #ifdef BUF_PAGE_DEBUG
qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId, qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId,
@ -3449,7 +3362,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
// save the data of this tuple by over writing the old data // save the data of this tuple by over writing the old data
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
copyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); doCopyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
} }
#ifdef BUF_PAGE_DEBUG #ifdef BUF_PAGE_DEBUG
qDebug("page_copyTuple pageId:%d, offset:%d", pItem->tuplePos.pageId, pItem->tuplePos.offset); qDebug("page_copyTuple pageId:%d, offset:%d", pItem->tuplePos.pageId, pItem->tuplePos.offset);
@ -3466,7 +3379,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
* |(n columns, one bit for each column)| src column #1| src column #2| * |(n columns, one bit for each column)| src column #1| src column #2|
* +------------------------------------+--------------+--------------+ * +------------------------------------+--------------+--------------+
*/ */
void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { void doSaveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
SFilePage* pPage = NULL; SFilePage* pPage = NULL;
// todo refactor: move away // todo refactor: move away
@ -3527,7 +3440,7 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
#endif #endif
} }
void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { void doCopyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
SFilePage* pPage = getBufPage(pCtx->pBuf, pPos->pageId); SFilePage* pPage = getBufPage(pCtx->pBuf, pPos->pageId);
int32_t numOfCols = pCtx->subsidiaries.num; int32_t numOfCols = pCtx->subsidiaries.num;
@ -4843,7 +4756,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da
if (pInfo->numSampled < pInfo->samples) { if (pInfo->numSampled < pInfo->samples) {
sampleAssignResult(pInfo, data, pInfo->numSampled); sampleAssignResult(pInfo, data, pInfo->numSampled);
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[pInfo->numSampled]); doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[pInfo->numSampled]);
} }
pInfo->numSampled++; pInfo->numSampled++;
} else { } else {
@ -4851,7 +4764,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da
if (j < pInfo->samples) { if (j < pInfo->samples) {
sampleAssignResult(pInfo, data, j); sampleAssignResult(pInfo, data, j);
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
copyTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[j]); doCopyTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[j]);
} }
} }
} }
@ -5993,7 +5906,7 @@ int32_t interpFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t cacheLastRowFunction(SqlFunctionCtx* pCtx) { int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0; int32_t numOfElems = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
@ -6002,9 +5915,7 @@ int32_t cacheLastRowFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
int32_t type = pInputCol->info.type;
int32_t bytes = pInputCol->info.bytes; int32_t bytes = pInputCol->info.bytes;
pInfo->bytes = bytes; pInfo->bytes = bytes;
// last_row function does not ignore the null value // last_row function does not ignore the null value
@ -6014,28 +5925,7 @@ int32_t cacheLastRowFunction(SqlFunctionCtx* pCtx) {
char* data = colDataGetData(pInputCol, i); char* data = colDataGetData(pInputCol, i);
TSKEY cts = getRowPTs(pInput->pPTS, i); TSKEY cts = getRowPTs(pInput->pPTS, i);
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
if (colDataIsNull_s(pInputCol, i)) { doSaveLastrow(pCtx, data, i, cts, pInfo);
pInfo->isNull = true;
} else {
if (IS_VAR_DATA_TYPE(type)) {
bytes = varDataTLen(data);
pInfo->bytes = bytes;
}
memcpy(pInfo->buf, data, bytes);
}
pInfo->ts = cts;
if (pCtx->subsidiaries.num > 0) {
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
if (!pInfo->hasResult) {
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
} else {
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
}
}
pInfo->hasResult = true;
} }
} }

View File

@ -1993,7 +1993,8 @@ static bool lastRowScanOptMayBeOptimized(SLogicNode* pNode) {
SNode* pFunc = NULL; SNode* pFunc = NULL;
FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) { FOREACH(pFunc, ((SAggLogicNode*)pNode)->pAggFuncs) {
if (FUNCTION_TYPE_LAST_ROW != ((SFunctionNode*)pFunc)->funcType && if (FUNCTION_TYPE_LAST_ROW != ((SFunctionNode*)pFunc)->funcType &&
FUNCTION_TYPE_SELECT_VALUE != ((SFunctionNode*)pFunc)->funcType) { FUNCTION_TYPE_SELECT_VALUE != ((SFunctionNode*)pFunc)->funcType &&
FUNCTION_TYPE_GROUP_KEY != ((SFunctionNode*)pFunc)->funcType) {
return false; return false;
} }
} }