refactor(query): do some internal refactor.
This commit is contained in:
parent
3c6aea81a1
commit
ba8b10e27b
|
@ -92,6 +92,8 @@ struct SResultRowEntryInfo;
|
|||
//for selectivity query, the corresponding tag value is assigned if the data is qualified
|
||||
typedef struct SSubsidiaryResInfo {
|
||||
int16_t num;
|
||||
int32_t rowLen;
|
||||
char* buf; // serialize data buffer
|
||||
struct SqlFunctionCtx **pCtx;
|
||||
} SSubsidiaryResInfo;
|
||||
|
||||
|
@ -118,6 +120,11 @@ typedef struct SInputColumnInfoData {
|
|||
uint64_t uid; // table uid, used to set the tag value when building the final query result for selectivity functions.
|
||||
} SInputColumnInfoData;
|
||||
|
||||
typedef struct SSerializeDataHandle {
|
||||
struct SDiskbasedBuf* pBuf;
|
||||
int32_t currentPage;
|
||||
} SSerializeDataHandle;
|
||||
|
||||
// sql function runtime context
|
||||
typedef struct SqlFunctionCtx {
|
||||
SInputColumnInfoData input;
|
||||
|
@ -137,10 +144,9 @@ typedef struct SqlFunctionCtx {
|
|||
SFuncExecFuncs fpSet;
|
||||
SScalarFuncExecFuncs sfp;
|
||||
struct SExprInfo *pExpr;
|
||||
struct SDiskbasedBuf *pBuf;
|
||||
struct SSDataBlock *pSrcBlock;
|
||||
struct SSDataBlock *pDstBlock; // used by indefinite rows function to set selectivity
|
||||
int32_t curBufPage;
|
||||
SSerializeDataHandle saveHandle;
|
||||
bool isStream;
|
||||
|
||||
char udfName[TSDB_FUNC_NAME_LEN];
|
||||
|
|
|
@ -58,11 +58,10 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
|
|||
/**
|
||||
*
|
||||
* @param pBuf
|
||||
* @param groupId
|
||||
* @param pageId
|
||||
* @return
|
||||
*/
|
||||
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId);
|
||||
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId);
|
||||
|
||||
/**
|
||||
*
|
||||
|
|
|
@ -46,8 +46,8 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
|||
rowSize += pCtx[i].resDataInfo.interBufSize;
|
||||
}
|
||||
|
||||
rowSize +=
|
||||
(numOfOutput * sizeof(bool)); // expand rowSize to mark if col is null for top/bottom result(doSaveTupleData)
|
||||
rowSize += (numOfOutput * sizeof(bool));
|
||||
// expand rowSize to mark if col is null for top/bottom result(saveTupleData)
|
||||
return rowSize;
|
||||
}
|
||||
|
||||
|
@ -1175,7 +1175,6 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
|
|||
SqlFunctionCtx* pCtx = &pFuncCtx[i];
|
||||
|
||||
pCtx->functionId = -1;
|
||||
pCtx->curBufPage = -1;
|
||||
pCtx->pExpr = pExpr;
|
||||
|
||||
if (pExpr->pExpr->nodeType == QUERY_NODE_FUNCTION) {
|
||||
|
@ -1219,6 +1218,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
|
|||
pCtx->isStream = false;
|
||||
|
||||
pCtx->param = pFunct->pParam;
|
||||
pCtx->saveHandle.currentPage = -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 1; i < numOfOutput; ++i) {
|
||||
|
|
|
@ -187,7 +187,7 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int
|
|||
SIDList list = getDataBufPagesIdList(pResultBuf);
|
||||
|
||||
if (taosArrayGetSize(list) == 0) {
|
||||
pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
|
||||
pData = getNewBufPage(pResultBuf, &pageId);
|
||||
pData->num = sizeof(SFilePage);
|
||||
} else {
|
||||
SPageInfo* pi = getLastPageInfo(list);
|
||||
|
@ -198,7 +198,7 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int
|
|||
// release current page first, and prepare the next one
|
||||
releaseBufPageInfo(pResultBuf, pi);
|
||||
|
||||
pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
|
||||
pData = getNewBufPage(pResultBuf, &pageId);
|
||||
if (pData != NULL) {
|
||||
pData->num = sizeof(SFilePage);
|
||||
}
|
||||
|
@ -302,7 +302,7 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes
|
|||
SIDList list = getDataBufPagesIdList(pResultBuf);
|
||||
|
||||
if (taosArrayGetSize(list) == 0) {
|
||||
pData = getNewBufPage(pResultBuf, tid, &pageId);
|
||||
pData = getNewBufPage(pResultBuf, &pageId);
|
||||
pData->num = sizeof(SFilePage);
|
||||
} else {
|
||||
SPageInfo* pi = getLastPageInfo(list);
|
||||
|
@ -313,7 +313,7 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes
|
|||
// release current page first, and prepare the next one
|
||||
releaseBufPageInfo(pResultBuf, pi);
|
||||
|
||||
pData = getNewBufPage(pResultBuf, tid, &pageId);
|
||||
pData = getNewBufPage(pResultBuf, &pageId);
|
||||
if (pData != NULL) {
|
||||
pData->num = sizeof(SFilePage);
|
||||
}
|
||||
|
@ -3488,7 +3488,7 @@ int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInf
|
|||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
pSup->pCtx[i].pBuf = pAggSup->pResultBuf;
|
||||
pSup->pCtx[i].saveHandle.pBuf = pAggSup->pResultBuf;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -3520,6 +3520,7 @@ void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
|||
}
|
||||
|
||||
taosMemoryFreeClear(pCtx[i].subsidiaries.pCtx);
|
||||
taosMemoryFreeClear(pCtx[i].subsidiaries.buf);
|
||||
taosMemoryFree(pCtx[i].input.pData);
|
||||
taosMemoryFree(pCtx[i].input.pColumnDataAgg);
|
||||
}
|
||||
|
@ -4704,7 +4705,8 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
|
|||
}
|
||||
int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, tsTempDir);
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
pCtx[i].pBuf = pSup->pResultBuf;
|
||||
pCtx[i].saveHandle.pBuf = pSup->pResultBuf;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -547,7 +547,7 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
|
|||
p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);
|
||||
|
||||
int32_t pageId = 0;
|
||||
pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
|
||||
pPage = getNewBufPage(pInfo->pBuf, &pageId);
|
||||
taosArrayPush(p->pPageList, &pageId);
|
||||
|
||||
*(int32_t *) pPage = 0;
|
||||
|
@ -562,7 +562,7 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
|
|||
|
||||
// add a new page for current group
|
||||
int32_t pageId = 0;
|
||||
pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
|
||||
pPage = getNewBufPage(pInfo->pBuf, &pageId);
|
||||
taosArrayPush(p->pPageList, &pageId);
|
||||
memset(pPage, 0, getBufPageSize(pInfo->pBuf));
|
||||
}
|
||||
|
|
|
@ -195,16 +195,6 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
|
|||
return PROJECT_RETRIEVE_DONE;
|
||||
}
|
||||
|
||||
void printDataBlock1(SSDataBlock* pBlock, const char* flag) {
|
||||
if (!pBlock || pBlock->info.rows == 0) {
|
||||
qDebug("===stream===printDataBlock: Block is Null or Empty");
|
||||
return;
|
||||
}
|
||||
char* pBuf = NULL;
|
||||
qDebug("%s", dumpBlockData(pBlock, flag, &pBuf));
|
||||
taosMemoryFreeClear(pBuf);
|
||||
}
|
||||
|
||||
SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
|
||||
SProjectOperatorInfo* pProjectInfo = pOperator->info;
|
||||
SOptrBasicInfo* pInfo = &pProjectInfo->binfo;
|
||||
|
|
|
@ -3529,7 +3529,7 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo*
|
|||
initBasicInfo(pBasicInfo, pResultBlock);
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
pSup->pCtx[i].pBuf = NULL;
|
||||
pSup->pCtx[i].saveHandle.pBuf = NULL;
|
||||
}
|
||||
|
||||
ASSERT(numOfCols > 0);
|
||||
|
|
|
@ -97,7 +97,7 @@ static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t
|
|||
|
||||
// allocate the overflow buffer page to hold this k/v.
|
||||
int32_t newPageId = -1;
|
||||
SFilePage* pNewPage = getNewBufPage(pHashObj->pBuf, 0, &newPageId);
|
||||
SFilePage* pNewPage = getNewBufPage(pHashObj->pBuf, &newPageId);
|
||||
if (pNewPage == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
@ -227,7 +227,7 @@ static int32_t doAddNewBucket(SLHashObj* pHashObj) {
|
|||
}
|
||||
|
||||
int32_t pageId = -1;
|
||||
SFilePage* p = getNewBufPage(pHashObj->pBuf, 0, &pageId);
|
||||
SFilePage* p = getNewBufPage(pHashObj->pBuf, &pageId);
|
||||
if (p == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
|
|
@ -180,7 +180,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
|
|||
}
|
||||
|
||||
int32_t pageId = -1;
|
||||
void* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId);
|
||||
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
|
||||
if (pPage == NULL) {
|
||||
blockDataDestroy(p);
|
||||
return terrno;
|
||||
|
@ -512,7 +512,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
|||
}
|
||||
|
||||
int32_t pageId = -1;
|
||||
void* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId);
|
||||
void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
|
||||
if (pPage == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
|
|
@ -1146,8 +1146,9 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static void doSaveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
|
||||
static void doCopyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
|
||||
static STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock);
|
||||
static int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
|
||||
static const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos);
|
||||
|
||||
static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, const char* tval) {
|
||||
// the data is loaded, not only the block SMA value
|
||||
|
@ -1199,7 +1200,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
pBuf->v = *(int64_t*)tval;
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
||||
doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
|
||||
}
|
||||
} else {
|
||||
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||
|
@ -1211,7 +1212,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
*(int64_t*)&pBuf->v = val;
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
||||
doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1224,7 +1225,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
*(uint64_t*)&pBuf->v = val;
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
||||
doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
|
||||
}
|
||||
}
|
||||
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
|
||||
|
@ -1236,7 +1237,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
*(double*)&pBuf->v = val;
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
||||
doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
|
||||
}
|
||||
}
|
||||
} else if (type == TSDB_DATA_TYPE_FLOAT) {
|
||||
|
@ -1250,7 +1251,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
|
||||
doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1275,7 +1276,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
if (!pBuf->assign) {
|
||||
*val = pData[i];
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
|
||||
}
|
||||
pBuf->assign = true;
|
||||
} else {
|
||||
|
@ -1287,7 +1288,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
if ((*val < pData[i]) ^ isMinFunc) {
|
||||
*val = pData[i];
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1306,7 +1307,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
if (!pBuf->assign) {
|
||||
*val = pData[i];
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
|
||||
}
|
||||
pBuf->assign = true;
|
||||
} else {
|
||||
|
@ -1318,7 +1319,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
if ((*val < pData[i]) ^ isMinFunc) {
|
||||
*val = pData[i];
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1337,7 +1338,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
if (!pBuf->assign) {
|
||||
*val = pData[i];
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
|
||||
}
|
||||
pBuf->assign = true;
|
||||
} else {
|
||||
|
@ -1349,7 +1350,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
if ((*val < pData[i]) ^ isMinFunc) {
|
||||
*val = pData[i];
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1368,7 +1369,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
if (!pBuf->assign) {
|
||||
*val = pData[i];
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
|
||||
}
|
||||
pBuf->assign = true;
|
||||
} else {
|
||||
|
@ -1380,7 +1381,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
if ((*val < pData[i]) ^ isMinFunc) {
|
||||
*val = pData[i];
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1401,7 +1402,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
if (!pBuf->assign) {
|
||||
*val = pData[i];
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
|
||||
}
|
||||
pBuf->assign = true;
|
||||
} else {
|
||||
|
@ -1413,7 +1414,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
if ((*val < pData[i]) ^ isMinFunc) {
|
||||
*val = pData[i];
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1432,7 +1433,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
if (!pBuf->assign) {
|
||||
*val = pData[i];
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
|
||||
}
|
||||
pBuf->assign = true;
|
||||
} else {
|
||||
|
@ -1444,7 +1445,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
if ((*val < pData[i]) ^ isMinFunc) {
|
||||
*val = pData[i];
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1463,7 +1464,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
if (!pBuf->assign) {
|
||||
*val = pData[i];
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
|
||||
}
|
||||
pBuf->assign = true;
|
||||
} else {
|
||||
|
@ -1475,7 +1476,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
if ((*val < pData[i]) ^ isMinFunc) {
|
||||
*val = pData[i];
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1494,7 +1495,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
if (!pBuf->assign) {
|
||||
*val = pData[i];
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
|
||||
}
|
||||
pBuf->assign = true;
|
||||
} else {
|
||||
|
@ -1506,7 +1507,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
if ((*val < pData[i]) ^ isMinFunc) {
|
||||
*val = pData[i];
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1526,7 +1527,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
if (!pBuf->assign) {
|
||||
*val = pData[i];
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
|
||||
}
|
||||
pBuf->assign = true;
|
||||
} else {
|
||||
|
@ -1538,7 +1539,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
if ((*val < pData[i]) ^ isMinFunc) {
|
||||
*val = pData[i];
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1557,7 +1558,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
if (!pBuf->assign) {
|
||||
*val = pData[i];
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
|
||||
}
|
||||
pBuf->assign = true;
|
||||
} else {
|
||||
|
@ -1569,7 +1570,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
if ((*val < pData[i]) ^ isMinFunc) {
|
||||
*val = pData[i];
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1580,7 +1581,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
|
|||
|
||||
_min_max_over:
|
||||
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved ) {
|
||||
doSaveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pBuf->nullTuplePos);
|
||||
pBuf->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
|
||||
pBuf->nullTupleSaved = true;
|
||||
}
|
||||
return numOfElems;
|
||||
|
@ -1599,8 +1600,7 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
static void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex);
|
||||
|
||||
static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rIndex);
|
||||
static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rowIndex);
|
||||
|
||||
int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
|
||||
|
@ -1648,34 +1648,29 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
|
|||
return;
|
||||
}
|
||||
|
||||
int32_t pageId = pTuplePos->pageId;
|
||||
int32_t offset = pTuplePos->offset;
|
||||
if (pCtx->saveHandle.pBuf != NULL) {
|
||||
if (pTuplePos->pageId != -1) {
|
||||
int32_t numOfCols = pCtx->subsidiaries.num;
|
||||
const char* p = loadTupleData(pCtx, pTuplePos);
|
||||
|
||||
if (pTuplePos->pageId != -1) {
|
||||
int32_t numOfCols = pCtx->subsidiaries.num;
|
||||
SFilePage* pPage = getBufPage(pCtx->pBuf, pageId);
|
||||
bool* nullList = (bool*)p;
|
||||
char* pStart = (char*)(nullList + numOfCols * sizeof(bool));
|
||||
|
||||
bool* nullList = (bool*)((char*)pPage + offset);
|
||||
char* pStart = (char*)(nullList + numOfCols * sizeof(bool));
|
||||
// todo set the offset value to optimize the performance.
|
||||
for (int32_t j = 0; j < numOfCols; ++j) {
|
||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
|
||||
|
||||
// todo set the offset value to optimize the performance.
|
||||
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||
|
||||
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
|
||||
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
|
||||
|
||||
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||
ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes);
|
||||
if (nullList[j]) {
|
||||
colDataAppendNULL(pDstCol, rowIndex);
|
||||
} else {
|
||||
colDataAppend(pDstCol, rowIndex, pStart, false);
|
||||
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||
ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes);
|
||||
if (nullList[j]) {
|
||||
colDataAppendNULL(pDstCol, rowIndex);
|
||||
} else {
|
||||
colDataAppend(pDstCol, rowIndex, pStart, false);
|
||||
}
|
||||
pStart += pDstCol->info.bytes;
|
||||
}
|
||||
pStart += pDstCol->info.bytes;
|
||||
}
|
||||
|
||||
releaseBufPage(pCtx->pBuf, pPage);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2756,15 +2751,15 @@ static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowInde
|
|||
return *(TSKEY*)colDataGetData(pTsColInfo, rowIndex);
|
||||
}
|
||||
|
||||
static void saveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx, SFirstLastRes* pInfo) {
|
||||
static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx, SFirstLastRes* pInfo) {
|
||||
if (pCtx->subsidiaries.num <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!pInfo->hasResult) {
|
||||
doSaveTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
|
||||
pInfo->pos = saveTupleData(pCtx, rowIndex, pSrcBlock);
|
||||
} else {
|
||||
doCopyTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
|
||||
updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2778,7 +2773,7 @@ static void doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t cur
|
|||
|
||||
memcpy(pInfo->buf, pData, pInfo->bytes);
|
||||
pInfo->ts = currentTs;
|
||||
saveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo);
|
||||
firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo);
|
||||
|
||||
pInfo->hasResult = true;
|
||||
}
|
||||
|
@ -2982,7 +2977,7 @@ static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, S
|
|||
pOutput->bytes = pInput->bytes;
|
||||
|
||||
memcpy(pOutput->buf, pInput->buf, pOutput->bytes);
|
||||
saveTupleData(pCtx->pSrcBlock, start, pCtx, pOutput);
|
||||
firstlastSaveTupleData(pCtx->pSrcBlock, start, pCtx, pOutput);
|
||||
|
||||
pOutput->hasResult = true;
|
||||
}
|
||||
|
@ -3087,7 +3082,7 @@ static void doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex, i
|
|||
}
|
||||
|
||||
pInfo->ts = cts;
|
||||
saveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo);
|
||||
firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo);
|
||||
|
||||
pInfo->hasResult = true;
|
||||
}
|
||||
|
@ -3420,7 +3415,7 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) {
|
||||
doSaveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pRes->nullTuplePos);
|
||||
pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
|
||||
pRes->nullTupleSaved = true;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -3448,7 +3443,7 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) {
|
||||
doSaveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pRes->nullTuplePos);
|
||||
pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
|
||||
pRes->nullTupleSaved = true;
|
||||
}
|
||||
|
||||
|
@ -3500,7 +3495,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
|
|||
|
||||
// save the data of this tuple
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doSaveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
|
||||
pItem->tuplePos = saveTupleData(pCtx, rowIndex, pSrcBlock);
|
||||
}
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId,
|
||||
|
@ -3524,7 +3519,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
|
|||
|
||||
// save the data of this tuple by over writing the old data
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doCopyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
|
||||
updateTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
|
||||
}
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
qDebug("page_copyTuple pageId:%d, offset:%d", pItem->tuplePos.pageId, pItem->tuplePos.offset);
|
||||
|
@ -3541,38 +3536,13 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
|
|||
* |(n columns, one bit for each column)| src column #1| src column #2|
|
||||
* +------------------------------------+--------------+--------------+
|
||||
*/
|
||||
void doSaveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
|
||||
SFilePage* pPage = NULL;
|
||||
void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsidiaryResInfo* pSubsidiaryies, char* buf) {
|
||||
char* nullList = buf;
|
||||
char* pStart = (char*)(nullList + sizeof(bool) * pSubsidiaryies->num);
|
||||
|
||||
// todo refactor: move away
|
||||
int32_t completeRowSize = pCtx->subsidiaries.num * sizeof(bool);
|
||||
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||
completeRowSize += pc->pExpr->base.resSchema.bytes;
|
||||
}
|
||||
|
||||
if (pCtx->curBufPage == -1) {
|
||||
pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage);
|
||||
pPage->num = sizeof(SFilePage);
|
||||
} else {
|
||||
pPage = getBufPage(pCtx->pBuf, pCtx->curBufPage);
|
||||
if (pPage->num + completeRowSize > getBufPageSize(pCtx->pBuf)) {
|
||||
// current page is all used, let's prepare a new buffer page
|
||||
releaseBufPage(pCtx->pBuf, pPage);
|
||||
pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage);
|
||||
pPage->num = sizeof(SFilePage);
|
||||
}
|
||||
}
|
||||
|
||||
pPos->pageId = pCtx->curBufPage;
|
||||
pPos->offset = pPage->num;
|
||||
|
||||
// keep the current row data, extract method
|
||||
int32_t offset = 0;
|
||||
bool* nullList = (bool*)((char*)pPage + pPage->num);
|
||||
char* pStart = (char*)(nullList + sizeof(bool) * pCtx->subsidiaries.num);
|
||||
for (int32_t i = 0; i < pCtx->subsidiaries.num; ++i) {
|
||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[i];
|
||||
for (int32_t i = 0; i < pSubsidiaryies->num; ++i) {
|
||||
SqlFunctionCtx* pc = pSubsidiaryies->pCtx[i];
|
||||
|
||||
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
|
||||
int32_t srcSlotId = pFuncParam->pCol->slotId;
|
||||
|
@ -3593,50 +3563,90 @@ void doSaveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock*
|
|||
offset += pCol->info.bytes;
|
||||
}
|
||||
|
||||
pPage->num += completeRowSize;
|
||||
|
||||
setBufPageDirty(pPage, true);
|
||||
releaseBufPage(pCtx->pBuf, pPage);
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
qDebug("page_saveTuple pos:%p,pageId:%d, offset:%d\n", pPos, pPos->pageId, pPos->offset);
|
||||
#endif
|
||||
return buf;
|
||||
}
|
||||
|
||||
void doCopyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
|
||||
SFilePage* pPage = getBufPage(pCtx->pBuf, pPos->pageId);
|
||||
static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length) {
|
||||
STuplePos p = {0};
|
||||
if (pHandle->pBuf != NULL) {
|
||||
SFilePage* pPage = NULL;
|
||||
|
||||
int32_t numOfCols = pCtx->subsidiaries.num;
|
||||
|
||||
bool* nullList = (bool*)((char*)pPage + pPos->offset);
|
||||
char* pStart = (char*)(nullList + numOfCols * sizeof(bool));
|
||||
|
||||
int32_t offset = 0;
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[i];
|
||||
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
|
||||
int32_t srcSlotId = pFuncParam->pCol->slotId;
|
||||
|
||||
SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
|
||||
if ((nullList[i] = colDataIsNull_s(pCol, rowIndex)) == true) {
|
||||
offset += pCol->info.bytes;
|
||||
continue;
|
||||
}
|
||||
|
||||
char* p = colDataGetData(pCol, rowIndex);
|
||||
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
|
||||
memcpy(pStart + offset, p, (pCol->info.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(p) : varDataTLen(p));
|
||||
if (pHandle->currentPage == -1) {
|
||||
pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage);
|
||||
pPage->num = sizeof(SFilePage);
|
||||
} else {
|
||||
memcpy(pStart + offset, p, pCol->info.bytes);
|
||||
pPage = getBufPage(pHandle->pBuf, pHandle->currentPage);
|
||||
if (pPage->num + length > getBufPageSize(pHandle->pBuf)) {
|
||||
// current page is all used, let's prepare a new buffer page
|
||||
releaseBufPage(pHandle->pBuf, pPage);
|
||||
pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage);
|
||||
pPage->num = sizeof(SFilePage);
|
||||
}
|
||||
}
|
||||
|
||||
offset += pCol->info.bytes;
|
||||
p = (STuplePos) {.pageId = pHandle->currentPage, .offset = pPage->num};
|
||||
memcpy(pPage->data + pPage->num, pBuf, length);
|
||||
|
||||
pPage->num += length;
|
||||
setBufPageDirty(pPage, true);
|
||||
releaseBufPage(pHandle->pBuf, pPage);
|
||||
} else {
|
||||
// other tuple save policy
|
||||
}
|
||||
|
||||
setBufPageDirty(pPage, true);
|
||||
releaseBufPage(pCtx->pBuf, pPage);
|
||||
#ifdef BUF_PAGE_DEBUG
|
||||
qDebug("page_copyTuple pos:%p, pageId:%d, offset:%d", pPos, pPos->pageId, pPos->offset);
|
||||
#endif
|
||||
return p;
|
||||
}
|
||||
|
||||
STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock) {
|
||||
if (pCtx->subsidiaries.rowLen == 0) {
|
||||
int32_t rowLen = 0;
|
||||
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
|
||||
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
|
||||
rowLen += pc->pExpr->base.resSchema.bytes;
|
||||
}
|
||||
|
||||
pCtx->subsidiaries.rowLen = rowLen + pCtx->subsidiaries.num * sizeof(bool);
|
||||
pCtx->subsidiaries.buf = taosMemoryMalloc(pCtx->subsidiaries.rowLen);
|
||||
}
|
||||
|
||||
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);
|
||||
return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen);
|
||||
}
|
||||
|
||||
static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STuplePos* pPos) {
|
||||
if (pHandle->pBuf != NULL) {
|
||||
SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId);
|
||||
memcpy(pPage->data + pPos->offset, pBuf, length);
|
||||
setBufPageDirty(pPage, true);
|
||||
releaseBufPage(pHandle->pBuf, pPage);
|
||||
} else {
|
||||
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
|
||||
int32_t rowLen = 0;
|
||||
int32_t completeRowSize = rowLen + pCtx->subsidiaries.num * sizeof(bool);
|
||||
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);
|
||||
doUpdateTupleData(&pCtx->saveHandle, buf, completeRowSize, pPos);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPos) {
|
||||
if (pHandle->pBuf != NULL) {
|
||||
SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId);
|
||||
char* p = pPage->data + pPos->offset;
|
||||
releaseBufPage(pHandle->pBuf, pPage);
|
||||
return p;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos) {
|
||||
return doLoadTupleData(&pCtx->saveHandle, pPos);
|
||||
}
|
||||
|
||||
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||
|
@ -3788,8 +3798,6 @@ int32_t spreadFunction(SqlFunctionCtx* pCtx) {
|
|||
SColumnInfoData* pCol = pInput->pData[0];
|
||||
|
||||
int32_t start = pInput->startRowIndex;
|
||||
int32_t numOfRows = pInput->numOfRows;
|
||||
|
||||
// check the valid data one by one
|
||||
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
|
||||
if (colDataIsNull_f(pCol->nullbitmap, i)) {
|
||||
|
@ -4964,7 +4972,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da
|
|||
if (pInfo->numSampled < pInfo->samples) {
|
||||
sampleAssignResult(pInfo, data, pInfo->numSampled);
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[pInfo->numSampled]);
|
||||
pInfo->tuplePos[pInfo->numSampled] = saveTupleData(pCtx, index, pCtx->pSrcBlock);
|
||||
}
|
||||
pInfo->numSampled++;
|
||||
} else {
|
||||
|
@ -4972,7 +4980,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da
|
|||
if (j < pInfo->samples) {
|
||||
sampleAssignResult(pInfo, data, j);
|
||||
if (pCtx->subsidiaries.num > 0) {
|
||||
doCopyTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[j]);
|
||||
updateTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[j]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -4995,7 +5003,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
|
||||
if (pInfo->numSampled == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) {
|
||||
doSaveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pInfo->nullTuplePos);
|
||||
pInfo->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
|
||||
pInfo->nullTupleSaved = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -372,7 +372,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
|
|||
pPageIdList = pList;
|
||||
}
|
||||
|
||||
pSlot->info.data = getNewBufPage(pBucket->pBuffer, groupId, &pageId);
|
||||
pSlot->info.data = getNewBufPage(pBucket->pBuffer, &pageId);
|
||||
pSlot->info.pageId = pageId;
|
||||
taosArrayPush(pPageIdList, &pageId);
|
||||
}
|
||||
|
|
|
@ -371,7 +371,7 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) {
|
||||
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
|
||||
pBuf->statis.getPages += 1;
|
||||
|
||||
char* availablePage = NULL;
|
||||
|
|
|
@ -18,7 +18,7 @@ void simpleTest() {
|
|||
int32_t pageId = 0;
|
||||
int32_t groupId = 0;
|
||||
|
||||
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
|
||||
ASSERT_TRUE(pBufPage != NULL);
|
||||
|
||||
ASSERT_EQ(getTotalBufSize(pBuf), 1024);
|
||||
|
@ -29,26 +29,26 @@ void simpleTest() {
|
|||
|
||||
releaseBufPage(pBuf, pBufPage);
|
||||
|
||||
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
|
||||
|
||||
SFilePage* t = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t == pBufPage1);
|
||||
|
||||
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
|
||||
SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t1 == pBufPage2);
|
||||
|
||||
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
|
||||
SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t2 == pBufPage3);
|
||||
|
||||
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
|
||||
SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t3 == pBufPage4);
|
||||
|
||||
releaseBufPage(pBuf, pBufPage2);
|
||||
|
||||
SFilePage* pBufPage5 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* pBufPage5 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
|
||||
SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t4 == pBufPage5);
|
||||
|
||||
|
@ -64,7 +64,7 @@ void writeDownTest() {
|
|||
int32_t groupId = 0;
|
||||
int32_t nx = 12345;
|
||||
|
||||
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
|
||||
ASSERT_TRUE(pBufPage != NULL);
|
||||
|
||||
*(int32_t*)(pBufPage->data) = nx;
|
||||
|
@ -73,22 +73,22 @@ void writeDownTest() {
|
|||
setBufPageDirty(pBufPage, true);
|
||||
releaseBufPage(pBuf, pBufPage);
|
||||
|
||||
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
|
||||
SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t1 == pBufPage1);
|
||||
ASSERT_TRUE(pageId == 1);
|
||||
|
||||
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
|
||||
SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t2 == pBufPage2);
|
||||
ASSERT_TRUE(pageId == 2);
|
||||
|
||||
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
|
||||
SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t3 == pBufPage3);
|
||||
ASSERT_TRUE(pageId == 3);
|
||||
|
||||
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
|
||||
SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t4 == pBufPage4);
|
||||
ASSERT_TRUE(pageId == 4);
|
||||
|
@ -113,32 +113,32 @@ void recyclePageTest() {
|
|||
int32_t groupId = 0;
|
||||
int32_t nx = 12345;
|
||||
|
||||
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
|
||||
ASSERT_TRUE(pBufPage != NULL);
|
||||
releaseBufPage(pBuf, pBufPage);
|
||||
|
||||
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
|
||||
SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t1 == pBufPage1);
|
||||
ASSERT_TRUE(pageId == 1);
|
||||
|
||||
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
|
||||
SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t2 == pBufPage2);
|
||||
ASSERT_TRUE(pageId == 2);
|
||||
|
||||
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
|
||||
SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t3 == pBufPage3);
|
||||
ASSERT_TRUE(pageId == 3);
|
||||
|
||||
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
|
||||
SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t4 == pBufPage4);
|
||||
ASSERT_TRUE(pageId == 4);
|
||||
releaseBufPage(pBuf, t4);
|
||||
|
||||
SFilePage* pBufPage5 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId));
|
||||
SFilePage* pBufPage5 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
|
||||
SFilePage* t5 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
|
||||
ASSERT_TRUE(t5 == pBufPage5);
|
||||
ASSERT_TRUE(pageId == 5);
|
||||
|
|
Loading…
Reference in New Issue