refactor: do some internal refactor.
This commit is contained in:
parent
f60f821c13
commit
3f11c849cd
|
@ -65,7 +65,7 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||||
}
|
}
|
||||||
|
|
||||||
rowSize +=
|
rowSize +=
|
||||||
(numOfOutput * sizeof(bool)); // expand rowSize to mark if col is null for top/bottom result(saveTupleData)
|
(numOfOutput * sizeof(bool)); // expand rowSize to mark if col is null for top/bottom result(doSaveTupleData)
|
||||||
return rowSize;
|
return rowSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -106,7 +106,7 @@ bool irateFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
|
||||||
int32_t irateFunction(SqlFunctionCtx *pCtx);
|
int32_t irateFunction(SqlFunctionCtx *pCtx);
|
||||||
int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
int32_t cacheLastRowFunction(SqlFunctionCtx* pCtx);
|
int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx);
|
||||||
|
|
||||||
bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
int32_t firstFunction(SqlFunctionCtx *pCtx);
|
int32_t firstFunction(SqlFunctionCtx *pCtx);
|
||||||
|
|
|
@ -2232,7 +2232,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.translateFunc = translateFirstLast,
|
.translateFunc = translateFirstLast,
|
||||||
.getEnvFunc = getFirstLastFuncEnv,
|
.getEnvFunc = getFirstLastFuncEnv,
|
||||||
.initFunc = functionSetup,
|
.initFunc = functionSetup,
|
||||||
.processFunc = cacheLastRowFunction,
|
.processFunc = cachedLastRowFunction,
|
||||||
.finalizeFunc = firstLastFinalize
|
.finalizeFunc = firstLastFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
|
@ -86,6 +86,7 @@ typedef struct SFirstLastRes {
|
||||||
bool isNull;
|
bool isNull;
|
||||||
int32_t bytes;
|
int32_t bytes;
|
||||||
int64_t ts;
|
int64_t ts;
|
||||||
|
STuplePos pos;
|
||||||
char buf[];
|
char buf[];
|
||||||
} SFirstLastRes;
|
} SFirstLastRes;
|
||||||
|
|
||||||
|
@ -1141,8 +1142,8 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
|
static void doSaveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
|
||||||
static void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
|
static void doCopyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
|
||||||
|
|
||||||
static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, const char* tval) {
|
static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, const char* tval) {
|
||||||
// the data is loaded, not only the block SMA value
|
// the data is loaded, not only the block SMA value
|
||||||
|
@ -1195,7 +1196,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
pBuf->v = *(int64_t*)tval;
|
pBuf->v = *(int64_t*)tval;
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
||||||
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||||
|
@ -1207,7 +1208,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
pBuf->v = val;
|
pBuf->v = val;
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
||||||
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1220,7 +1221,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
pBuf->v = val;
|
pBuf->v = val;
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
||||||
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
|
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
|
||||||
|
@ -1232,7 +1233,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
pBuf->v = val;
|
pBuf->v = val;
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
||||||
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (type == TSDB_DATA_TYPE_FLOAT) {
|
} else if (type == TSDB_DATA_TYPE_FLOAT) {
|
||||||
|
@ -1246,7 +1247,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
|
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
||||||
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1271,7 +1272,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if (!pBuf->assign) {
|
if (!pBuf->assign) {
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1283,7 +1284,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
if ((*val < pData[i]) ^ isMinFunc) {
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1302,7 +1303,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if (!pBuf->assign) {
|
if (!pBuf->assign) {
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1314,7 +1315,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
if ((*val < pData[i]) ^ isMinFunc) {
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1333,7 +1334,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if (!pBuf->assign) {
|
if (!pBuf->assign) {
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1345,7 +1346,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
if ((*val < pData[i]) ^ isMinFunc) {
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1364,7 +1365,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if (!pBuf->assign) {
|
if (!pBuf->assign) {
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1376,7 +1377,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
if ((*val < pData[i]) ^ isMinFunc) {
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1397,7 +1398,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if (!pBuf->assign) {
|
if (!pBuf->assign) {
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1409,7 +1410,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
if ((*val < pData[i]) ^ isMinFunc) {
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1428,7 +1429,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if (!pBuf->assign) {
|
if (!pBuf->assign) {
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1440,7 +1441,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
if ((*val < pData[i]) ^ isMinFunc) {
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1459,7 +1460,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if (!pBuf->assign) {
|
if (!pBuf->assign) {
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1471,7 +1472,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
if ((*val < pData[i]) ^ isMinFunc) {
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1490,7 +1491,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if (!pBuf->assign) {
|
if (!pBuf->assign) {
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1502,7 +1503,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
if ((*val < pData[i]) ^ isMinFunc) {
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1522,7 +1523,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if (!pBuf->assign) {
|
if (!pBuf->assign) {
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1534,7 +1535,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
if ((*val < pData[i]) ^ isMinFunc) {
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1553,7 +1554,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if (!pBuf->assign) {
|
if (!pBuf->assign) {
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
pBuf->assign = true;
|
pBuf->assign = true;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1565,7 +1566,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
||||||
if ((*val < pData[i]) ^ isMinFunc) {
|
if ((*val < pData[i]) ^ isMinFunc) {
|
||||||
*val = pData[i];
|
*val = pData[i];
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2640,7 +2641,7 @@ int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx)
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getFirstLastInfoSize(int32_t resBytes) {
|
int32_t getFirstLastInfoSize(int32_t resBytes) {
|
||||||
return sizeof(SFirstLastRes) + resBytes + sizeof(int64_t) + sizeof(STuplePos);
|
return sizeof(SFirstLastRes) + resBytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
|
@ -2669,6 +2670,33 @@ static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowInde
|
||||||
return *(TSKEY*)colDataGetData(pTsColInfo, rowIndex);
|
return *(TSKEY*)colDataGetData(pTsColInfo, rowIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void saveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx, SFirstLastRes* pInfo) {
|
||||||
|
if (pCtx->subsidiaries.num <= 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!pInfo->hasResult) {
|
||||||
|
doSaveTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
|
||||||
|
} else {
|
||||||
|
doCopyTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int32_t currentTs, int32_t type, char* pData) {
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
SFirstLastRes* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
|
if (IS_VAR_DATA_TYPE(type)) {
|
||||||
|
pInfo->bytes = varDataTLen(pData);
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(pInfo->buf, pData, pInfo->bytes);
|
||||||
|
pInfo->ts = currentTs;
|
||||||
|
saveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo);
|
||||||
|
|
||||||
|
pInfo->hasResult = true;
|
||||||
|
}
|
||||||
|
|
||||||
// This ordinary first function does not care if current scan is ascending order or descending order scan
|
// This ordinary first function does not care if current scan is ascending order or descending order scan
|
||||||
// the OPTIMIZED version of first function will only handle the ascending order scan
|
// the OPTIMIZED version of first function will only handle the ascending order scan
|
||||||
int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
@ -2680,9 +2708,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||||
|
|
||||||
int32_t type = pInputCol->info.type;
|
pInfo->bytes = pInputCol->info.bytes;
|
||||||
int32_t bytes = pInputCol->info.bytes;
|
|
||||||
pInfo->bytes = bytes;
|
|
||||||
|
|
||||||
// All null data column, return directly.
|
// All null data column, return directly.
|
||||||
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
|
if (pInput->colDataAggIsSet && (pInput->pColumnDataAgg[0]->numOfNull == pInput->totalRows)) {
|
||||||
|
@ -2700,8 +2726,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
||||||
if (blockDataOrder == TSDB_ORDER_ASC) {
|
if (blockDataOrder == TSDB_ORDER_ASC) {
|
||||||
// filter according to current result firstly
|
// filter according to current result firstly
|
||||||
if (pResInfo->numOfRes > 0) {
|
if (pResInfo->numOfRes > 0) {
|
||||||
TSKEY ts = *(TSKEY*)(pInfo->buf + bytes);
|
if (pInfo->ts < startKey) {
|
||||||
if (ts < startKey) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2715,26 +2740,8 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
||||||
|
if (pResInfo->numOfRes == 0 || pInfo->ts > cts) {
|
||||||
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) > cts) {
|
doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data);
|
||||||
if (IS_VAR_DATA_TYPE(type)) {
|
|
||||||
bytes = varDataTLen(data);
|
|
||||||
pInfo->bytes = bytes;
|
|
||||||
}
|
|
||||||
memcpy(pInfo->buf, data, bytes);
|
|
||||||
*(TSKEY*)(pInfo->buf + bytes) = cts;
|
|
||||||
// handle selectivity
|
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
|
||||||
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
|
|
||||||
if (!pInfo->hasResult) {
|
|
||||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
|
||||||
} else {
|
|
||||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pInfo->hasResult = true;
|
|
||||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
|
||||||
pResInfo->numOfRes = 1;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2742,8 +2749,7 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
||||||
// in case of descending order time stamp serial, which usually happens as the results of the nest query,
|
// in case of descending order time stamp serial, which usually happens as the results of the nest query,
|
||||||
// all data needs to be check.
|
// all data needs to be check.
|
||||||
if (pResInfo->numOfRes > 0) {
|
if (pResInfo->numOfRes > 0) {
|
||||||
TSKEY ts = *(TSKEY*)(pInfo->buf + bytes);
|
if (pInfo->ts < endKey) {
|
||||||
if (ts < endKey) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2758,24 +2764,8 @@ int32_t firstFunction(SqlFunctionCtx* pCtx) {
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
||||||
|
|
||||||
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) > cts) {
|
if (pResInfo->numOfRes == 0 || pInfo->ts > cts) {
|
||||||
if (IS_VAR_DATA_TYPE(type)) {
|
doSaveCurrentVal(pCtx, i, cts, pInputCol->info.type, data);
|
||||||
bytes = varDataTLen(data);
|
|
||||||
pInfo->bytes = bytes;
|
|
||||||
}
|
|
||||||
memcpy(pInfo->buf, data, bytes);
|
|
||||||
*(TSKEY*)(pInfo->buf + bytes) = cts;
|
|
||||||
// handle selectivity
|
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
|
||||||
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
|
|
||||||
if (!pInfo->hasResult) {
|
|
||||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
|
||||||
} else {
|
|
||||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pInfo->hasResult = true;
|
|
||||||
pResInfo->numOfRes = 1;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2821,26 +2811,10 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
||||||
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) {
|
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
||||||
if (IS_VAR_DATA_TYPE(type)) {
|
doSaveCurrentVal(pCtx, i, cts, type, data);
|
||||||
bytes = varDataTLen(data);
|
|
||||||
pInfo->bytes = bytes;
|
|
||||||
}
|
|
||||||
memcpy(pInfo->buf, data, bytes);
|
|
||||||
*(TSKEY*)(pInfo->buf + bytes) = cts;
|
|
||||||
// handle selectivity
|
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
|
||||||
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
|
|
||||||
if (!pInfo->hasResult) {
|
|
||||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
|
||||||
} else {
|
|
||||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pInfo->hasResult = true;
|
|
||||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
|
||||||
pResInfo->numOfRes = 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} else { // descending order
|
} else { // descending order
|
||||||
|
@ -2853,24 +2827,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) {
|
||||||
|
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
||||||
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) {
|
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
||||||
if (IS_VAR_DATA_TYPE(type)) {
|
doSaveCurrentVal(pCtx, i, cts, type, data);
|
||||||
bytes = varDataTLen(data);
|
|
||||||
pInfo->bytes = bytes;
|
|
||||||
}
|
|
||||||
memcpy(pInfo->buf, data, bytes);
|
|
||||||
*(TSKEY*)(pInfo->buf + bytes) = cts;
|
|
||||||
// handle selectivity
|
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
|
||||||
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
|
|
||||||
if (!pInfo->hasResult) {
|
|
||||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
|
||||||
} else {
|
|
||||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pInfo->hasResult = true;
|
|
||||||
pResInfo->numOfRes = 1;
|
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -2885,8 +2843,9 @@ static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, S
|
||||||
int32_t start = pColInfo->startRowIndex;
|
int32_t start = pColInfo->startRowIndex;
|
||||||
|
|
||||||
pOutput->bytes = pInput->bytes;
|
pOutput->bytes = pInput->bytes;
|
||||||
TSKEY* tsIn = (TSKEY*)(pInput->buf + pInput->bytes);
|
TSKEY* tsIn = &pInput->ts;
|
||||||
TSKEY* tsOut = (TSKEY*)(pOutput->buf + pInput->bytes);
|
TSKEY* tsOut = &pOutput->ts;
|
||||||
|
|
||||||
if (pOutput->hasResult) {
|
if (pOutput->hasResult) {
|
||||||
if (isFirst) {
|
if (isFirst) {
|
||||||
if (*tsIn > *tsOut) {
|
if (*tsIn > *tsOut) {
|
||||||
|
@ -2898,20 +2857,12 @@ static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, S
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
*tsOut = *tsIn;
|
*tsOut = *tsIn;
|
||||||
memcpy(pOutput->buf, pInput->buf, pOutput->bytes);
|
memcpy(pOutput->buf, pInput->buf, pOutput->bytes);
|
||||||
// handle selectivity
|
saveTupleData(pCtx->pSrcBlock, start, pCtx, pOutput);
|
||||||
STuplePos* pTuplePos = (STuplePos*)(pOutput->buf + pOutput->bytes + sizeof(TSKEY));
|
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
|
||||||
if (!pOutput->hasResult) {
|
|
||||||
saveTupleData(pCtx, start, pCtx->pSrcBlock, pTuplePos);
|
|
||||||
} else {
|
|
||||||
copyTupleData(pCtx, start, pCtx->pSrcBlock, pTuplePos);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pOutput->hasResult = true;
|
|
||||||
|
|
||||||
return;
|
pOutput->hasResult = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuery) {
|
static int32_t firstLastFunctionMergeImpl(SqlFunctionCtx* pCtx, bool isFirstQuery) {
|
||||||
|
@ -2953,34 +2904,34 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
colDataAppend(pCol, pBlock->info.rows, pRes->buf, pRes->isNull || pResInfo->isNullRes);
|
colDataAppend(pCol, pBlock->info.rows, pRes->buf, pRes->isNull || pResInfo->isNullRes);
|
||||||
|
|
||||||
// handle selectivity
|
// handle selectivity
|
||||||
STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY));
|
setSelectivityValue(pCtx, pBlock, &pRes->pos, pBlock->info.rows);
|
||||||
setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows);
|
|
||||||
|
|
||||||
return pResInfo->numOfRes;
|
return pResInfo->numOfRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||||
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||||
|
|
||||||
int32_t resultBytes = getFirstLastInfoSize(pRes->bytes);
|
int32_t resultBytes = getFirstLastInfoSize(pRes->bytes);
|
||||||
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
|
||||||
|
|
||||||
|
// todo check for failure
|
||||||
|
char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||||
memcpy(varDataVal(res), pRes, resultBytes);
|
memcpy(varDataVal(res), pRes, resultBytes);
|
||||||
|
|
||||||
varDataSetLen(res, resultBytes);
|
varDataSetLen(res, resultBytes);
|
||||||
|
|
||||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
colDataAppend(pCol, pBlock->info.rows, res, false);
|
colDataAppend(pCol, pBlock->info.rows, res, false);
|
||||||
// handle selectivity
|
setSelectivityValue(pCtx, pBlock, &pRes->pos, pBlock->info.rows);
|
||||||
STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY));
|
|
||||||
setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows);
|
|
||||||
|
|
||||||
taosMemoryFree(res);
|
taosMemoryFree(res);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//todo rewrite:
|
||||||
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
|
||||||
char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
char* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
|
||||||
|
@ -2998,6 +2949,28 @@ int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void doSaveLastrow(SqlFunctionCtx *pCtx, char* pData, int32_t rowIndex, int64_t cts, SFirstLastRes* pInfo) {
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||||
|
|
||||||
|
if (colDataIsNull_s(pInputCol, rowIndex)) {
|
||||||
|
pInfo->isNull = true;
|
||||||
|
} else {
|
||||||
|
pInfo->isNull = false;
|
||||||
|
|
||||||
|
if (IS_VAR_DATA_TYPE(pInputCol->info.type)) {
|
||||||
|
pInfo->bytes = varDataTLen(pData);
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(pInfo->buf, pData, pInfo->bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->ts = cts;
|
||||||
|
saveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo);
|
||||||
|
|
||||||
|
pInfo->hasResult = true;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
|
int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
|
||||||
int32_t numOfElems = 0;
|
int32_t numOfElems = 0;
|
||||||
|
|
||||||
|
@ -3007,12 +2980,9 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||||
|
|
||||||
int32_t type = pInputCol->info.type;
|
|
||||||
int32_t bytes = pInputCol->info.bytes;
|
int32_t bytes = pInputCol->info.bytes;
|
||||||
pInfo->bytes = bytes;
|
pInfo->bytes = bytes;
|
||||||
|
|
||||||
SColumnDataAgg* pColAgg = (pInput->colDataAggIsSet) ? pInput->pColumnDataAgg[0] : NULL;
|
|
||||||
|
|
||||||
TSKEY startKey = getRowPTs(pInput->pPTS, 0);
|
TSKEY startKey = getRowPTs(pInput->pPTS, 0);
|
||||||
TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
|
TSKEY endKey = getRowPTs(pInput->pPTS, pInput->totalRows - 1);
|
||||||
|
|
||||||
|
@ -3022,31 +2992,10 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
|
||||||
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
|
for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) {
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
||||||
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf) < cts) {
|
|
||||||
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
|
|
||||||
pInfo->isNull = true;
|
|
||||||
} else {
|
|
||||||
pInfo->isNull = false;
|
|
||||||
if (IS_VAR_DATA_TYPE(type)) {
|
|
||||||
bytes = varDataTLen(data);
|
|
||||||
pInfo->bytes = bytes;
|
|
||||||
}
|
|
||||||
memcpy(pInfo->buf + sizeof(TSKEY), data, bytes);
|
|
||||||
}
|
|
||||||
*(TSKEY*)(pInfo->buf) = cts;
|
|
||||||
numOfElems++;
|
numOfElems++;
|
||||||
// handle selectivity
|
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
||||||
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
|
doSaveLastrow(pCtx, data, i, cts, pInfo);
|
||||||
if (!pInfo->hasResult) {
|
|
||||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
|
||||||
} else {
|
|
||||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pInfo->hasResult = true;
|
|
||||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
|
||||||
pResInfo->numOfRes = 1;
|
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -3054,31 +3003,10 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
|
||||||
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; ++i) {
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
||||||
if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf) < cts) {
|
|
||||||
if (pInputCol->hasNull && colDataIsNull(pInputCol, pInput->totalRows, i, pColAgg)) {
|
|
||||||
pInfo->isNull = true;
|
|
||||||
} else {
|
|
||||||
pInfo->isNull = false;
|
|
||||||
if (IS_VAR_DATA_TYPE(type)) {
|
|
||||||
bytes = varDataTLen(data);
|
|
||||||
pInfo->bytes = bytes;
|
|
||||||
}
|
|
||||||
memcpy(pInfo->buf + sizeof(TSKEY), data, bytes);
|
|
||||||
}
|
|
||||||
*(TSKEY*)(pInfo->buf) = cts;
|
|
||||||
numOfElems++;
|
numOfElems++;
|
||||||
// handle selectivity
|
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
||||||
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
|
doSaveLastrow(pCtx, data, i, cts, pInfo);
|
||||||
if (!pInfo->hasResult) {
|
|
||||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
|
||||||
} else {
|
|
||||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pInfo->hasResult = true;
|
|
||||||
pResInfo->numOfRes = 1;
|
|
||||||
// DO_UPDATE_TAG_COLUMNS(pCtx, ts);
|
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -3088,21 +3016,6 @@ int32_t lastRowFunction(SqlFunctionCtx* pCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t lastRowFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
|
||||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
|
||||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
|
||||||
|
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
|
||||||
|
|
||||||
SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
|
|
||||||
colDataAppend(pCol, pBlock->info.rows, pRes->buf + sizeof(TSKEY), pRes->isNull);
|
|
||||||
// handle selectivity
|
|
||||||
STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY));
|
|
||||||
setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows);
|
|
||||||
|
|
||||||
return pResInfo->numOfRes;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||||
pEnv->calcMemSize = sizeof(SDiffInfo);
|
pEnv->calcMemSize = sizeof(SDiffInfo);
|
||||||
return true;
|
return true;
|
||||||
|
@ -3425,7 +3338,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
|
||||||
|
|
||||||
// save the data of this tuple
|
// save the data of this tuple
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
saveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
|
doSaveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
|
||||||
}
|
}
|
||||||
#ifdef BUF_PAGE_DEBUG
|
#ifdef BUF_PAGE_DEBUG
|
||||||
qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId,
|
qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId,
|
||||||
|
@ -3449,7 +3362,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
|
||||||
|
|
||||||
// save the data of this tuple by over writing the old data
|
// save the data of this tuple by over writing the old data
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
copyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
|
doCopyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
|
||||||
}
|
}
|
||||||
#ifdef BUF_PAGE_DEBUG
|
#ifdef BUF_PAGE_DEBUG
|
||||||
qDebug("page_copyTuple pageId:%d, offset:%d", pItem->tuplePos.pageId, pItem->tuplePos.offset);
|
qDebug("page_copyTuple pageId:%d, offset:%d", pItem->tuplePos.pageId, pItem->tuplePos.offset);
|
||||||
|
@ -3466,7 +3379,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
|
||||||
* |(n columns, one bit for each column)| src column #1| src column #2|
|
* |(n columns, one bit for each column)| src column #1| src column #2|
|
||||||
* +------------------------------------+--------------+--------------+
|
* +------------------------------------+--------------+--------------+
|
||||||
*/
|
*/
|
||||||
void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
|
void doSaveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
|
||||||
SFilePage* pPage = NULL;
|
SFilePage* pPage = NULL;
|
||||||
|
|
||||||
// todo refactor: move away
|
// todo refactor: move away
|
||||||
|
@ -3527,7 +3440,7 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
|
void doCopyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
|
||||||
SFilePage* pPage = getBufPage(pCtx->pBuf, pPos->pageId);
|
SFilePage* pPage = getBufPage(pCtx->pBuf, pPos->pageId);
|
||||||
|
|
||||||
int32_t numOfCols = pCtx->subsidiaries.num;
|
int32_t numOfCols = pCtx->subsidiaries.num;
|
||||||
|
@ -4843,7 +4756,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da
|
||||||
if (pInfo->numSampled < pInfo->samples) {
|
if (pInfo->numSampled < pInfo->samples) {
|
||||||
sampleAssignResult(pInfo, data, pInfo->numSampled);
|
sampleAssignResult(pInfo, data, pInfo->numSampled);
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[pInfo->numSampled]);
|
doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[pInfo->numSampled]);
|
||||||
}
|
}
|
||||||
pInfo->numSampled++;
|
pInfo->numSampled++;
|
||||||
} else {
|
} else {
|
||||||
|
@ -4851,7 +4764,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da
|
||||||
if (j < pInfo->samples) {
|
if (j < pInfo->samples) {
|
||||||
sampleAssignResult(pInfo, data, j);
|
sampleAssignResult(pInfo, data, j);
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
if (pCtx->subsidiaries.num > 0) {
|
||||||
copyTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[j]);
|
doCopyTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[j]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5993,7 +5906,7 @@ int32_t interpFunction(SqlFunctionCtx* pCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t cacheLastRowFunction(SqlFunctionCtx* pCtx) {
|
int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) {
|
||||||
int32_t numOfElems = 0;
|
int32_t numOfElems = 0;
|
||||||
|
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
@ -6002,9 +5915,7 @@ int32_t cacheLastRowFunction(SqlFunctionCtx* pCtx) {
|
||||||
SInputColumnInfoData* pInput = &pCtx->input;
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
SColumnInfoData* pInputCol = pInput->pData[0];
|
SColumnInfoData* pInputCol = pInput->pData[0];
|
||||||
|
|
||||||
int32_t type = pInputCol->info.type;
|
|
||||||
int32_t bytes = pInputCol->info.bytes;
|
int32_t bytes = pInputCol->info.bytes;
|
||||||
|
|
||||||
pInfo->bytes = bytes;
|
pInfo->bytes = bytes;
|
||||||
|
|
||||||
// last_row function does not ignore the null value
|
// last_row function does not ignore the null value
|
||||||
|
@ -6014,28 +5925,7 @@ int32_t cacheLastRowFunction(SqlFunctionCtx* pCtx) {
|
||||||
char* data = colDataGetData(pInputCol, i);
|
char* data = colDataGetData(pInputCol, i);
|
||||||
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
TSKEY cts = getRowPTs(pInput->pPTS, i);
|
||||||
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
if (pResInfo->numOfRes == 0 || pInfo->ts < cts) {
|
||||||
if (colDataIsNull_s(pInputCol, i)) {
|
doSaveLastrow(pCtx, data, i, cts, pInfo);
|
||||||
pInfo->isNull = true;
|
|
||||||
} else {
|
|
||||||
if (IS_VAR_DATA_TYPE(type)) {
|
|
||||||
bytes = varDataTLen(data);
|
|
||||||
pInfo->bytes = bytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(pInfo->buf, data, bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
pInfo->ts = cts;
|
|
||||||
if (pCtx->subsidiaries.num > 0) {
|
|
||||||
STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY));
|
|
||||||
if (!pInfo->hasResult) {
|
|
||||||
saveTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
|
||||||
} else {
|
|
||||||
copyTupleData(pCtx, i, pCtx->pSrcBlock, pTuplePos);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pInfo->hasResult = true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue