Merge pull request #16495 from taosdata/feature/3_liaohj

refactor(query): do some internal refactor.
This commit is contained in:
Haojun Liao 2022-08-30 17:58:40 +08:00 committed by GitHub
commit 707c20c46c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 213 additions and 218 deletions

View File

@ -92,6 +92,8 @@ struct SResultRowEntryInfo;
//for selectivity query, the corresponding tag value is assigned if the data is qualified //for selectivity query, the corresponding tag value is assigned if the data is qualified
typedef struct SSubsidiaryResInfo { typedef struct SSubsidiaryResInfo {
int16_t num; int16_t num;
int32_t rowLen;
char* buf; // serialize data buffer
struct SqlFunctionCtx **pCtx; struct SqlFunctionCtx **pCtx;
} SSubsidiaryResInfo; } SSubsidiaryResInfo;
@ -118,6 +120,11 @@ typedef struct SInputColumnInfoData {
uint64_t uid; // table uid, used to set the tag value when building the final query result for selectivity functions. uint64_t uid; // table uid, used to set the tag value when building the final query result for selectivity functions.
} SInputColumnInfoData; } SInputColumnInfoData;
typedef struct SSerializeDataHandle {
struct SDiskbasedBuf* pBuf;
int32_t currentPage;
} SSerializeDataHandle;
// sql function runtime context // sql function runtime context
typedef struct SqlFunctionCtx { typedef struct SqlFunctionCtx {
SInputColumnInfoData input; SInputColumnInfoData input;
@ -137,10 +144,9 @@ typedef struct SqlFunctionCtx {
SFuncExecFuncs fpSet; SFuncExecFuncs fpSet;
SScalarFuncExecFuncs sfp; SScalarFuncExecFuncs sfp;
struct SExprInfo *pExpr; struct SExprInfo *pExpr;
struct SDiskbasedBuf *pBuf;
struct SSDataBlock *pSrcBlock; struct SSDataBlock *pSrcBlock;
struct SSDataBlock *pDstBlock; // used by indefinite rows function to set selectivity struct SSDataBlock *pDstBlock; // used by indefinite rows function to set selectivity
int32_t curBufPage; SSerializeDataHandle saveHandle;
bool isStream; bool isStream;
char udfName[TSDB_FUNC_NAME_LEN]; char udfName[TSDB_FUNC_NAME_LEN];

View File

@ -58,11 +58,10 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
/** /**
* *
* @param pBuf * @param pBuf
* @param groupId
* @param pageId * @param pageId
* @return * @return
*/ */
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId); void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId);
/** /**
* *

View File

@ -303,6 +303,7 @@ typedef struct SAggSupporter {
char* keyBuf; // window key buffer char* keyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
int32_t currentPageId; // current write page id
} SAggSupporter; } SAggSupporter;
typedef struct { typedef struct {
@ -327,7 +328,6 @@ typedef struct STableScanInfo {
SQueryTableDataCond cond; SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag; int32_t dataBlockLoadFlag;
// SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
SSampleExecInfo sample; // sample execution info SSampleExecInfo sample; // sample execution info
int32_t currentGroupId; int32_t currentGroupId;
int32_t currentTable; int32_t currentTable;
@ -431,6 +431,7 @@ typedef struct SStreamAggSupporter {
char* pKeyBuf; // window key buffer char* pKeyBuf; // window key buffer
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
int32_t currentPageId; // buffer page that is active
SSDataBlock* pScanBlock; SSDataBlock* pScanBlock;
} SStreamAggSupporter; } SStreamAggSupporter;
@ -1009,7 +1010,7 @@ int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimary
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
int32_t size); int32_t size);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize); SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, SResultWindowInfo* getSessionTimeWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,
TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex); TSKEY endTs, uint64_t groupId, int64_t gap, int32_t* pIndex);
SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, SResultWindowInfo* getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs,

View File

@ -46,8 +46,8 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
rowSize += pCtx[i].resDataInfo.interBufSize; rowSize += pCtx[i].resDataInfo.interBufSize;
} }
rowSize += rowSize += (numOfOutput * sizeof(bool));
(numOfOutput * sizeof(bool)); // expand rowSize to mark if col is null for top/bottom result(doSaveTupleData) // expand rowSize to mark if col is null for top/bottom result(saveTupleData)
return rowSize; return rowSize;
} }
@ -1178,7 +1178,6 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
SqlFunctionCtx* pCtx = &pFuncCtx[i]; SqlFunctionCtx* pCtx = &pFuncCtx[i];
pCtx->functionId = -1; pCtx->functionId = -1;
pCtx->curBufPage = -1;
pCtx->pExpr = pExpr; pCtx->pExpr = pExpr;
if (pExpr->pExpr->nodeType == QUERY_NODE_FUNCTION) { if (pExpr->pExpr->nodeType == QUERY_NODE_FUNCTION) {
@ -1222,6 +1221,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
pCtx->isStream = false; pCtx->isStream = false;
pCtx->param = pFunct->pParam; pCtx->param = pFunct->pParam;
pCtx->saveHandle.currentPage = -1;
} }
for (int32_t i = 1; i < numOfOutput; ++i) { for (int32_t i = 1; i < numOfOutput; ++i) {

View File

@ -179,26 +179,23 @@ static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pR
} }
#endif #endif
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) { SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
SFilePage* pData = NULL; SFilePage* pData = NULL;
// in the first scan, new space needed for results // in the first scan, new space needed for results
int32_t pageId = -1; int32_t pageId = -1;
SIDList list = getDataBufPagesIdList(pResultBuf); if (*currentPageId == -1) {
pData = getNewBufPage(pResultBuf, &pageId);
if (taosArrayGetSize(list) == 0) {
pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
pData->num = sizeof(SFilePage); pData->num = sizeof(SFilePage);
} else { } else {
SPageInfo* pi = getLastPageInfo(list); pData = getBufPage(pResultBuf, *currentPageId);
pData = getBufPage(pResultBuf, getPageId(pi)); pageId = *currentPageId;
pageId = getPageId(pi);
if (pData->num + interBufSize > getBufPageSize(pResultBuf)) { if (pData->num + interBufSize > getBufPageSize(pResultBuf)) {
// release current page first, and prepare the next one // release current page first, and prepare the next one
releaseBufPageInfo(pResultBuf, pi); releaseBufPage(pResultBuf, pData);
pData = getNewBufPage(pResultBuf, tableGroupId, &pageId); pData = getNewBufPage(pResultBuf, &pageId);
if (pData != NULL) { if (pData != NULL) {
pData->num = sizeof(SFilePage); pData->num = sizeof(SFilePage);
} }
@ -215,9 +212,9 @@ SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int
SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num); SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
pResultRow->pageId = pageId; pResultRow->pageId = pageId;
pResultRow->offset = (int32_t)pData->num; pResultRow->offset = (int32_t)pData->num;
*currentPageId = pageId;
pData->num += interBufSize; pData->num += interBufSize;
return pResultRow; return pResultRow;
} }
@ -263,11 +260,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// allocate a new buffer page // allocate a new buffer page
if (pResult == NULL) { if (pResult == NULL) {
#ifdef BUF_PAGE_DEBUG
qDebug("page_2");
#endif
ASSERT(pSup->resultRowSize > 0); ASSERT(pSup->resultRowSize > 0);
pResult = getNewResultRow(pResultBuf, groupId, pSup->resultRowSize); pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
initResultRow(pResult); initResultRow(pResult);
@ -302,7 +296,7 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes
SIDList list = getDataBufPagesIdList(pResultBuf); SIDList list = getDataBufPagesIdList(pResultBuf);
if (taosArrayGetSize(list) == 0) { if (taosArrayGetSize(list) == 0) {
pData = getNewBufPage(pResultBuf, tid, &pageId); pData = getNewBufPage(pResultBuf, &pageId);
pData->num = sizeof(SFilePage); pData->num = sizeof(SFilePage);
} else { } else {
SPageInfo* pi = getLastPageInfo(list); SPageInfo* pi = getLastPageInfo(list);
@ -313,7 +307,7 @@ static int32_t addNewWindowResultBuf(SResultRow* pWindowRes, SDiskbasedBuf* pRes
// release current page first, and prepare the next one // release current page first, and prepare the next one
releaseBufPageInfo(pResultBuf, pi); releaseBufPageInfo(pResultBuf, pi);
pData = getNewBufPage(pResultBuf, tid, &pageId); pData = getNewBufPage(pResultBuf, &pageId);
if (pData != NULL) { if (pData != NULL) {
pData->num = sizeof(SFilePage); pData->num = sizeof(SFilePage);
} }
@ -3092,7 +3086,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) {
offset += sizeof(int32_t); offset += sizeof(int32_t);
uint64_t tableGroupId = *(uint64_t*)(result + offset); uint64_t tableGroupId = *(uint64_t*)(result + offset);
SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, tableGroupId, pSup->resultRowSize); SResultRow* resultRow = getNewResultRow(pSup->pResultBuf, &pSup->currentPageId, pSup->resultRowSize);
if (!resultRow) { if (!resultRow) {
return TSDB_CODE_TSC_INVALID_INPUT; return TSDB_CODE_TSC_INVALID_INPUT;
} }
@ -3440,8 +3434,10 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize, int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
const char* pKey) { const char* pKey) {
int32_t code = 0;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
pAggSup->currentPageId = -1;
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t)); pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
pAggSup->pResultRowHashTable = tSimpleHashInit(10, hashFn); pAggSup->pResultRowHashTable = tSimpleHashInit(10, hashFn);
@ -3455,18 +3451,18 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz); getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);
if (!osTempSpaceAvailable()) { if (!osTempSpaceAvailable()) {
terrno = TSDB_CODE_NO_AVAIL_DISK; code = TSDB_CODE_NO_AVAIL_DISK;
qError("Init stream agg supporter failed since %s", terrstr(terrno)); qError("Init stream agg supporter failed since %s, %s", terrstr(code), pKey);
return terrno;
}
int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
if (code != TSDB_CODE_SUCCESS) {
qError("Create agg result buf failed since %s", tstrerror(code));
return code; return code;
} }
return TSDB_CODE_SUCCESS; code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, tsTempDir);
if (code != TSDB_CODE_SUCCESS) {
qError("Create agg result buf failed since %s, %s", tstrerror(code), pKey);
return code;
}
return code;
} }
void cleanupAggSup(SAggSupporter* pAggSup) { void cleanupAggSup(SAggSupporter* pAggSup) {
@ -3488,7 +3484,7 @@ int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInf
} }
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
pSup->pCtx[i].pBuf = pAggSup->pResultBuf; pSup->pCtx[i].saveHandle.pBuf = pAggSup->pResultBuf;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -3520,6 +3516,7 @@ void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
} }
taosMemoryFreeClear(pCtx[i].subsidiaries.pCtx); taosMemoryFreeClear(pCtx[i].subsidiaries.pCtx);
taosMemoryFreeClear(pCtx[i].subsidiaries.buf);
taosMemoryFree(pCtx[i].input.pData); taosMemoryFree(pCtx[i].input.pData);
taosMemoryFree(pCtx[i].input.pColumnDataAgg); taosMemoryFree(pCtx[i].input.pColumnDataAgg);
} }
@ -4678,6 +4675,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlFunctionCtx* pCtx, int32_t numOfOutput,
int32_t size) { int32_t size) {
pSup->currentPageId = -1;
pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); pSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
pSup->keySize = sizeof(int64_t) + sizeof(TSKEY); pSup->keySize = sizeof(int64_t) + sizeof(TSKEY);
pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize); pSup->pKeyBuf = taosMemoryCalloc(1, pSup->keySize);
@ -4705,7 +4703,8 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey, SqlF
} }
int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, tsTempDir); int32_t code = createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, tsTempDir);
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
pCtx[i].pBuf = pSup->pResultBuf; pCtx[i].saveHandle.pBuf = pSup->pResultBuf;
} }
return code; return code;
} }

View File

@ -547,7 +547,7 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len); p = taosHashGet(pInfo->pGroupSet, pInfo->keyBuf, len);
int32_t pageId = 0; int32_t pageId = 0;
pPage = getNewBufPage(pInfo->pBuf, 0, &pageId); pPage = getNewBufPage(pInfo->pBuf, &pageId);
taosArrayPush(p->pPageList, &pageId); taosArrayPush(p->pPageList, &pageId);
*(int32_t *) pPage = 0; *(int32_t *) pPage = 0;
@ -562,7 +562,7 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
// add a new page for current group // add a new page for current group
int32_t pageId = 0; int32_t pageId = 0;
pPage = getNewBufPage(pInfo->pBuf, 0, &pageId); pPage = getNewBufPage(pInfo->pBuf, &pageId);
taosArrayPush(p->pPageList, &pageId); taosArrayPush(p->pPageList, &pageId);
memset(pPage, 0, getBufPageSize(pInfo->pBuf)); memset(pPage, 0, getBufPageSize(pInfo->pBuf));
} }

View File

@ -195,16 +195,6 @@ static int32_t doIngroupLimitOffset(SLimitInfo* pLimitInfo, uint64_t groupId, SS
return PROJECT_RETRIEVE_DONE; return PROJECT_RETRIEVE_DONE;
} }
void printDataBlock1(SSDataBlock* pBlock, const char* flag) {
if (!pBlock || pBlock->info.rows == 0) {
qDebug("===stream===printDataBlock: Block is Null or Empty");
return;
}
char* pBuf = NULL;
qDebug("%s", dumpBlockData(pBlock, flag, &pBuf));
taosMemoryFreeClear(pBuf);
}
SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
SProjectOperatorInfo* pProjectInfo = pOperator->info; SProjectOperatorInfo* pProjectInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pProjectInfo->binfo; SOptrBasicInfo* pInfo = &pProjectInfo->binfo;

View File

@ -1828,12 +1828,6 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt
return needed; return needed;
} }
void increaseTs(SqlFunctionCtx* pCtx) {
if (pCtx[0].pExpr->pExpr->_function.pFunctNode->funcType == FUNCTION_TYPE_WSTART) {
// pCtx[0].increase = true;
}
}
void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup) { void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup) {
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
// Todo(liuyao) support partition by column // Todo(liuyao) support partition by column
@ -1895,7 +1889,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
if (isStream) { if (isStream) {
ASSERT(numOfCols > 0); ASSERT(numOfCols > 0);
increaseTs(pSup->pCtx);
initStreamFunciton(pSup->pCtx, pSup->numOfExprs); initStreamFunciton(pSup->pCtx, pSup->numOfExprs);
} }
@ -3050,6 +3043,7 @@ static void clearStreamIntervalOperator(SStreamFinalIntervalOperatorInfo* pInfo)
tSimpleHashClear(pInfo->aggSup.pResultRowHashTable); tSimpleHashClear(pInfo->aggSup.pResultRowHashTable);
clearDiskbasedBuf(pInfo->aggSup.pResultBuf); clearDiskbasedBuf(pInfo->aggSup.pResultBuf);
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
pInfo->aggSup.currentPageId = -1;
} }
static void clearSpecialDataBlock(SSDataBlock* pBlock) { static void clearSpecialDataBlock(SSDataBlock* pBlock) {
@ -3420,7 +3414,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
initBasicInfo(&pInfo->binfo, pResBlock); initBasicInfo(&pInfo->binfo, pResBlock);
ASSERT(numOfCols > 0); ASSERT(numOfCols > 0);
increaseTs(pOperator->exprSupp.pCtx);
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
@ -3451,6 +3444,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
// semi interval operator does not catch result // semi interval operator does not catch result
pInfo->isFinal = false; pInfo->isFinal = false;
pOperator->name = "StreamSemiIntervalOperator"; pOperator->name = "StreamSemiIntervalOperator";
ASSERT(pInfo->aggSup.currentPageId == -1);
} }
if (!IS_FINAL_OP(pInfo) || numOfChild == 0) { if (!IS_FINAL_OP(pInfo) || numOfChild == 0) {
@ -3559,11 +3553,10 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo*
initBasicInfo(pBasicInfo, pResultBlock); initBasicInfo(pBasicInfo, pResultBlock);
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
pSup->pCtx[i].pBuf = NULL; pSup->pCtx[i].saveHandle.pBuf = NULL;
} }
ASSERT(numOfCols > 0); ASSERT(numOfCols > 0);
increaseTs(pSup->pCtx);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -3820,7 +3813,7 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes
} }
if (pWinInfo->pos.pageId == -1) { if (pWinInfo->pos.pageId == -1) {
*pResult = getNewResultRow(pAggSup->pResultBuf, groupId, pAggSup->resultRowSize); *pResult = getNewResultRow(pAggSup->pResultBuf, &pAggSup->currentPageId, pAggSup->resultRowSize);
if (*pResult == NULL) { if (*pResult == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
@ -4337,6 +4330,7 @@ static void clearStreamSessionOperator(SStreamSessionAggOperatorInfo* pInfo) {
} }
} }
clearDiskbasedBuf(pInfo->streamAggSup.pResultBuf); clearDiskbasedBuf(pInfo->streamAggSup.pResultBuf);
pInfo->streamAggSup.currentPageId = -1;
} }
static void removeSessionResults(SHashObj* pHashMap, SArray* pWins) { static void removeSessionResults(SHashObj* pHashMap, SArray* pWins) {

View File

@ -97,7 +97,7 @@ static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t
// allocate the overflow buffer page to hold this k/v. // allocate the overflow buffer page to hold this k/v.
int32_t newPageId = -1; int32_t newPageId = -1;
SFilePage* pNewPage = getNewBufPage(pHashObj->pBuf, 0, &newPageId); SFilePage* pNewPage = getNewBufPage(pHashObj->pBuf, &newPageId);
if (pNewPage == NULL) { if (pNewPage == NULL) {
return terrno; return terrno;
} }
@ -227,7 +227,7 @@ static int32_t doAddNewBucket(SLHashObj* pHashObj) {
} }
int32_t pageId = -1; int32_t pageId = -1;
SFilePage* p = getNewBufPage(pHashObj->pBuf, 0, &pageId); SFilePage* p = getNewBufPage(pHashObj->pBuf, &pageId);
if (p == NULL) { if (p == NULL) {
return terrno; return terrno;
} }

View File

@ -180,7 +180,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
} }
int32_t pageId = -1; int32_t pageId = -1;
void* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId); void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
if (pPage == NULL) { if (pPage == NULL) {
blockDataDestroy(p); blockDataDestroy(p);
return terrno; return terrno;
@ -512,7 +512,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
} }
int32_t pageId = -1; int32_t pageId = -1;
void* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId); void* pPage = getNewBufPage(pHandle->pBuf, &pageId);
if (pPage == NULL) { if (pPage == NULL) {
return terrno; return terrno;
} }

View File

@ -1146,8 +1146,9 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
return true; return true;
} }
static void doSaveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); static STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock);
static void doCopyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); static int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos);
static const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos);
static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, const char* tval) { static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, const char* tval) {
// the data is loaded, not only the block SMA value // the data is loaded, not only the block SMA value
@ -1199,7 +1200,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
pBuf->v = *(int64_t*)tval; pBuf->v = *(int64_t*)tval;
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
} }
} else { } else {
if (IS_SIGNED_NUMERIC_TYPE(type)) { if (IS_SIGNED_NUMERIC_TYPE(type)) {
@ -1211,7 +1212,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
*(int64_t*)&pBuf->v = val; *(int64_t*)&pBuf->v = val;
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
} }
} }
@ -1224,7 +1225,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
*(uint64_t*)&pBuf->v = val; *(uint64_t*)&pBuf->v = val;
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
} }
} }
} else if (type == TSDB_DATA_TYPE_DOUBLE) { } else if (type == TSDB_DATA_TYPE_DOUBLE) {
@ -1236,7 +1237,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
*(double*)&pBuf->v = val; *(double*)&pBuf->v = val;
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
} }
} }
} else if (type == TSDB_DATA_TYPE_FLOAT) { } else if (type == TSDB_DATA_TYPE_FLOAT) {
@ -1250,7 +1251,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval);
doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos); pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock);
} }
} }
} }
@ -1275,7 +1276,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
@ -1287,7 +1288,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((*val < pData[i]) ^ isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} }
@ -1306,7 +1307,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
@ -1318,7 +1319,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((*val < pData[i]) ^ isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} }
@ -1337,7 +1338,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
@ -1349,7 +1350,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((*val < pData[i]) ^ isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} }
@ -1368,7 +1369,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
@ -1380,7 +1381,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((*val < pData[i]) ^ isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} }
@ -1401,7 +1402,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
@ -1413,7 +1414,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((*val < pData[i]) ^ isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} }
@ -1432,7 +1433,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
@ -1444,7 +1445,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((*val < pData[i]) ^ isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} }
@ -1463,7 +1464,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
@ -1475,7 +1476,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((*val < pData[i]) ^ isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} }
@ -1494,7 +1495,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
@ -1506,7 +1507,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((*val < pData[i]) ^ isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} }
@ -1526,7 +1527,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
@ -1538,7 +1539,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((*val < pData[i]) ^ isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} }
@ -1557,7 +1558,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if (!pBuf->assign) { if (!pBuf->assign) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doSaveTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock);
} }
pBuf->assign = true; pBuf->assign = true;
} else { } else {
@ -1569,7 +1570,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
if ((*val < pData[i]) ^ isMinFunc) { if ((*val < pData[i]) ^ isMinFunc) {
*val = pData[i]; *val = pData[i];
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doCopyTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos);
} }
} }
} }
@ -1580,7 +1581,7 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
_min_max_over: _min_max_over:
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved ) { if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved ) {
doSaveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pBuf->nullTuplePos); pBuf->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
pBuf->nullTupleSaved = true; pBuf->nullTupleSaved = true;
} }
return numOfElems; return numOfElems;
@ -1599,8 +1600,7 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) {
} }
static void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex); static void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex);
static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rowIndex);
static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rIndex);
int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
@ -1648,21 +1648,17 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
return; return;
} }
int32_t pageId = pTuplePos->pageId; if (pCtx->saveHandle.pBuf != NULL) {
int32_t offset = pTuplePos->offset;
if (pTuplePos->pageId != -1) { if (pTuplePos->pageId != -1) {
int32_t numOfCols = pCtx->subsidiaries.num; int32_t numOfCols = pCtx->subsidiaries.num;
SFilePage* pPage = getBufPage(pCtx->pBuf, pageId); const char* p = loadTupleData(pCtx, pTuplePos);
bool* nullList = (bool*)((char*)pPage + offset); bool* nullList = (bool*)p;
char* pStart = (char*)(nullList + numOfCols * sizeof(bool)); char* pStart = (char*)(nullList + numOfCols * sizeof(bool));
// todo set the offset value to optimize the performance. // todo set the offset value to optimize the performance.
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { for (int32_t j = 0; j < numOfCols; ++j) {
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId; int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
@ -1674,8 +1670,7 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple
} }
pStart += pDstCol->info.bytes; pStart += pDstCol->info.bytes;
} }
}
releaseBufPage(pCtx->pBuf, pPage);
} }
} }
@ -2756,15 +2751,15 @@ static FORCE_INLINE TSKEY getRowPTs(SColumnInfoData* pTsColInfo, int32_t rowInde
return *(TSKEY*)colDataGetData(pTsColInfo, rowIndex); return *(TSKEY*)colDataGetData(pTsColInfo, rowIndex);
} }
static void saveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx, SFirstLastRes* pInfo) { static void firstlastSaveTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SqlFunctionCtx* pCtx, SFirstLastRes* pInfo) {
if (pCtx->subsidiaries.num <= 0) { if (pCtx->subsidiaries.num <= 0) {
return; return;
} }
if (!pInfo->hasResult) { if (!pInfo->hasResult) {
doSaveTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos); pInfo->pos = saveTupleData(pCtx, rowIndex, pSrcBlock);
} else { } else {
doCopyTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos); updateTupleData(pCtx, rowIndex, pSrcBlock, &pInfo->pos);
} }
} }
@ -2778,7 +2773,7 @@ static void doSaveCurrentVal(SqlFunctionCtx* pCtx, int32_t rowIndex, int64_t cur
memcpy(pInfo->buf, pData, pInfo->bytes); memcpy(pInfo->buf, pData, pInfo->bytes);
pInfo->ts = currentTs; pInfo->ts = currentTs;
saveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo); firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo);
pInfo->hasResult = true; pInfo->hasResult = true;
} }
@ -2982,7 +2977,7 @@ static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, S
pOutput->bytes = pInput->bytes; pOutput->bytes = pInput->bytes;
memcpy(pOutput->buf, pInput->buf, pOutput->bytes); memcpy(pOutput->buf, pInput->buf, pOutput->bytes);
saveTupleData(pCtx->pSrcBlock, start, pCtx, pOutput); firstlastSaveTupleData(pCtx->pSrcBlock, start, pCtx, pOutput);
pOutput->hasResult = true; pOutput->hasResult = true;
} }
@ -3087,7 +3082,7 @@ static void doSaveLastrow(SqlFunctionCtx* pCtx, char* pData, int32_t rowIndex, i
} }
pInfo->ts = cts; pInfo->ts = cts;
saveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo); firstlastSaveTupleData(pCtx->pSrcBlock, rowIndex, pCtx, pInfo);
pInfo->hasResult = true; pInfo->hasResult = true;
} }
@ -3420,7 +3415,7 @@ int32_t topFunction(SqlFunctionCtx* pCtx) {
} }
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) { if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) {
doSaveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pRes->nullTuplePos); pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
pRes->nullTupleSaved = true; pRes->nullTupleSaved = true;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -3448,7 +3443,7 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) {
} }
if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) { if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pRes->nullTupleSaved) {
doSaveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pRes->nullTuplePos); pRes->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
pRes->nullTupleSaved = true; pRes->nullTupleSaved = true;
} }
@ -3500,7 +3495,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
// save the data of this tuple // save the data of this tuple
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doSaveTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); pItem->tuplePos = saveTupleData(pCtx, rowIndex, pSrcBlock);
} }
#ifdef BUF_PAGE_DEBUG #ifdef BUF_PAGE_DEBUG
qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId, qDebug("page_saveTuple i:%d, item:%p,pageId:%d, offset:%d\n", pEntryInfo->numOfRes, pItem, pItem->tuplePos.pageId,
@ -3524,7 +3519,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
// save the data of this tuple by over writing the old data // save the data of this tuple by over writing the old data
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doCopyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); updateTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos);
} }
#ifdef BUF_PAGE_DEBUG #ifdef BUF_PAGE_DEBUG
qDebug("page_copyTuple pageId:%d, offset:%d", pItem->tuplePos.pageId, pItem->tuplePos.offset); qDebug("page_copyTuple pageId:%d, offset:%d", pItem->tuplePos.pageId, pItem->tuplePos.offset);
@ -3541,38 +3536,13 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData
* |(n columns, one bit for each column)| src column #1| src column #2| * |(n columns, one bit for each column)| src column #1| src column #2|
* +------------------------------------+--------------+--------------+ * +------------------------------------+--------------+--------------+
*/ */
void doSaveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsidiaryResInfo* pSubsidiaryies, char* buf) {
SFilePage* pPage = NULL; char* nullList = buf;
char* pStart = (char*)(nullList + sizeof(bool) * pSubsidiaryies->num);
// todo refactor: move away
int32_t completeRowSize = pCtx->subsidiaries.num * sizeof(bool);
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
completeRowSize += pc->pExpr->base.resSchema.bytes;
}
if (pCtx->curBufPage == -1) {
pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage);
pPage->num = sizeof(SFilePage);
} else {
pPage = getBufPage(pCtx->pBuf, pCtx->curBufPage);
if (pPage->num + completeRowSize > getBufPageSize(pCtx->pBuf)) {
// current page is all used, let's prepare a new buffer page
releaseBufPage(pCtx->pBuf, pPage);
pPage = getNewBufPage(pCtx->pBuf, 0, &pCtx->curBufPage);
pPage->num = sizeof(SFilePage);
}
}
pPos->pageId = pCtx->curBufPage;
pPos->offset = pPage->num;
// keep the current row data, extract method
int32_t offset = 0; int32_t offset = 0;
bool* nullList = (bool*)((char*)pPage + pPage->num); for (int32_t i = 0; i < pSubsidiaryies->num; ++i) {
char* pStart = (char*)(nullList + sizeof(bool) * pCtx->subsidiaries.num); SqlFunctionCtx* pc = pSubsidiaryies->pCtx[i];
for (int32_t i = 0; i < pCtx->subsidiaries.num; ++i) {
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[i];
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0]; SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
int32_t srcSlotId = pFuncParam->pCol->slotId; int32_t srcSlotId = pFuncParam->pCol->slotId;
@ -3593,50 +3563,88 @@ void doSaveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock*
offset += pCol->info.bytes; offset += pCol->info.bytes;
} }
pPage->num += completeRowSize; return buf;
setBufPageDirty(pPage, true);
releaseBufPage(pCtx->pBuf, pPage);
#ifdef BUF_PAGE_DEBUG
qDebug("page_saveTuple pos:%p,pageId:%d, offset:%d\n", pPos, pPos->pageId, pPos->offset);
#endif
} }
void doCopyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { static STuplePos doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length) {
SFilePage* pPage = getBufPage(pCtx->pBuf, pPos->pageId); STuplePos p = {0};
if (pHandle->pBuf != NULL) {
SFilePage* pPage = NULL;
int32_t numOfCols = pCtx->subsidiaries.num; if (pHandle->currentPage == -1) {
pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage);
bool* nullList = (bool*)((char*)pPage + pPos->offset); pPage->num = sizeof(SFilePage);
char* pStart = (char*)(nullList + numOfCols * sizeof(bool));
int32_t offset = 0;
for (int32_t i = 0; i < numOfCols; ++i) {
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[i];
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
int32_t srcSlotId = pFuncParam->pCol->slotId;
SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
if ((nullList[i] = colDataIsNull_s(pCol, rowIndex)) == true) {
offset += pCol->info.bytes;
continue;
}
char* p = colDataGetData(pCol, rowIndex);
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
memcpy(pStart + offset, p, (pCol->info.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(p) : varDataTLen(p));
} else { } else {
memcpy(pStart + offset, p, pCol->info.bytes); pPage = getBufPage(pHandle->pBuf, pHandle->currentPage);
if (pPage->num + length > getBufPageSize(pHandle->pBuf)) {
// current page is all used, let's prepare a new buffer page
releaseBufPage(pHandle->pBuf, pPage);
pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage);
pPage->num = sizeof(SFilePage);
}
} }
offset += pCol->info.bytes; p = (STuplePos) {.pageId = pHandle->currentPage, .offset = pPage->num};
} memcpy(pPage->data + pPage->num, pBuf, length);
pPage->num += length;
setBufPageDirty(pPage, true); setBufPageDirty(pPage, true);
releaseBufPage(pCtx->pBuf, pPage); releaseBufPage(pHandle->pBuf, pPage);
#ifdef BUF_PAGE_DEBUG } else {
qDebug("page_copyTuple pos:%p, pageId:%d, offset:%d", pPos, pPos->pageId, pPos->offset); // other tuple save policy
#endif }
return p;
}
STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock) {
if (pCtx->subsidiaries.rowLen == 0) {
int32_t rowLen = 0;
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
rowLen += pc->pExpr->base.resSchema.bytes;
}
pCtx->subsidiaries.rowLen = rowLen + pCtx->subsidiaries.num * sizeof(bool);
pCtx->subsidiaries.buf = taosMemoryMalloc(pCtx->subsidiaries.rowLen);
}
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);
return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen);
}
static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STuplePos* pPos) {
if (pHandle->pBuf != NULL) {
SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId);
memcpy(pPage->data + pPos->offset, pBuf, length);
setBufPageDirty(pPage, true);
releaseBufPage(pHandle->pBuf, pPage);
} else {
}
return TSDB_CODE_SUCCESS;
}
static int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) {
char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf);
doUpdateTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, pPos);
return TSDB_CODE_SUCCESS;
}
static char* doLoadTupleData(SSerializeDataHandle* pHandle, const STuplePos* pPos) {
if (pHandle->pBuf != NULL) {
SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId);
char* p = pPage->data + pPos->offset;
releaseBufPage(pHandle->pBuf, pPage);
return p;
} else {
return NULL;
}
}
static const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos) {
return doLoadTupleData(&pCtx->saveHandle, pPos);
} }
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
@ -3788,8 +3796,6 @@ int32_t spreadFunction(SqlFunctionCtx* pCtx) {
SColumnInfoData* pCol = pInput->pData[0]; SColumnInfoData* pCol = pInput->pData[0];
int32_t start = pInput->startRowIndex; int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows;
// check the valid data one by one // check the valid data one by one
for (int32_t i = start; i < pInput->numOfRows + start; ++i) { for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
if (colDataIsNull_f(pCol->nullbitmap, i)) { if (colDataIsNull_f(pCol->nullbitmap, i)) {
@ -4964,7 +4970,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da
if (pInfo->numSampled < pInfo->samples) { if (pInfo->numSampled < pInfo->samples) {
sampleAssignResult(pInfo, data, pInfo->numSampled); sampleAssignResult(pInfo, data, pInfo->numSampled);
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doSaveTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[pInfo->numSampled]); pInfo->tuplePos[pInfo->numSampled] = saveTupleData(pCtx, index, pCtx->pSrcBlock);
} }
pInfo->numSampled++; pInfo->numSampled++;
} else { } else {
@ -4972,7 +4978,7 @@ static void doReservoirSample(SqlFunctionCtx* pCtx, SSampleInfo* pInfo, char* da
if (j < pInfo->samples) { if (j < pInfo->samples) {
sampleAssignResult(pInfo, data, j); sampleAssignResult(pInfo, data, j);
if (pCtx->subsidiaries.num > 0) { if (pCtx->subsidiaries.num > 0) {
doCopyTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[j]); updateTupleData(pCtx, index, pCtx->pSrcBlock, &pInfo->tuplePos[j]);
} }
} }
} }
@ -4995,7 +5001,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
} }
if (pInfo->numSampled == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) { if (pInfo->numSampled == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) {
doSaveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pInfo->nullTuplePos); pInfo->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock);
pInfo->nullTupleSaved = true; pInfo->nullTupleSaved = true;
} }

View File

@ -372,7 +372,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
pPageIdList = pList; pPageIdList = pList;
} }
pSlot->info.data = getNewBufPage(pBucket->pBuffer, groupId, &pageId); pSlot->info.data = getNewBufPage(pBucket->pBuffer, &pageId);
pSlot->info.pageId = pageId; pSlot->info.pageId = pageId;
taosArrayPush(pPageIdList, &pageId); taosArrayPush(pPageIdList, &pageId);
} }

View File

@ -371,7 +371,7 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) { void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
pBuf->statis.getPages += 1; pBuf->statis.getPages += 1;
char* availablePage = NULL; char* availablePage = NULL;

View File

@ -18,7 +18,7 @@ void simpleTest() {
int32_t pageId = 0; int32_t pageId = 0;
int32_t groupId = 0; int32_t groupId = 0;
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId)); SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
ASSERT_TRUE(pBufPage != NULL); ASSERT_TRUE(pBufPage != NULL);
ASSERT_EQ(getTotalBufSize(pBuf), 1024); ASSERT_EQ(getTotalBufSize(pBuf), 1024);
@ -29,26 +29,26 @@ void simpleTest() {
releaseBufPage(pBuf, pBufPage); releaseBufPage(pBuf, pBufPage);
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId)); SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t = static_cast<SFilePage*>(getBufPage(pBuf, pageId)); SFilePage* t = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t == pBufPage1); ASSERT_TRUE(t == pBufPage1);
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId)); SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pBuf, pageId)); SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t1 == pBufPage2); ASSERT_TRUE(t1 == pBufPage2);
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId)); SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pBuf, pageId)); SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t2 == pBufPage3); ASSERT_TRUE(t2 == pBufPage3);
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId)); SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pBuf, pageId)); SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t3 == pBufPage4); ASSERT_TRUE(t3 == pBufPage4);
releaseBufPage(pBuf, pBufPage2); releaseBufPage(pBuf, pBufPage2);
SFilePage* pBufPage5 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId)); SFilePage* pBufPage5 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pBuf, pageId)); SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t4 == pBufPage5); ASSERT_TRUE(t4 == pBufPage5);
@ -64,7 +64,7 @@ void writeDownTest() {
int32_t groupId = 0; int32_t groupId = 0;
int32_t nx = 12345; int32_t nx = 12345;
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId)); SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
ASSERT_TRUE(pBufPage != NULL); ASSERT_TRUE(pBufPage != NULL);
*(int32_t*)(pBufPage->data) = nx; *(int32_t*)(pBufPage->data) = nx;
@ -73,22 +73,22 @@ void writeDownTest() {
setBufPageDirty(pBufPage, true); setBufPageDirty(pBufPage, true);
releaseBufPage(pBuf, pBufPage); releaseBufPage(pBuf, pBufPage);
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId)); SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pBuf, pageId)); SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t1 == pBufPage1); ASSERT_TRUE(t1 == pBufPage1);
ASSERT_TRUE(pageId == 1); ASSERT_TRUE(pageId == 1);
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId)); SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pBuf, pageId)); SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t2 == pBufPage2); ASSERT_TRUE(t2 == pBufPage2);
ASSERT_TRUE(pageId == 2); ASSERT_TRUE(pageId == 2);
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId)); SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pBuf, pageId)); SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t3 == pBufPage3); ASSERT_TRUE(t3 == pBufPage3);
ASSERT_TRUE(pageId == 3); ASSERT_TRUE(pageId == 3);
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId)); SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pBuf, pageId)); SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t4 == pBufPage4); ASSERT_TRUE(t4 == pBufPage4);
ASSERT_TRUE(pageId == 4); ASSERT_TRUE(pageId == 4);
@ -113,32 +113,32 @@ void recyclePageTest() {
int32_t groupId = 0; int32_t groupId = 0;
int32_t nx = 12345; int32_t nx = 12345;
SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId)); SFilePage* pBufPage = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
ASSERT_TRUE(pBufPage != NULL); ASSERT_TRUE(pBufPage != NULL);
releaseBufPage(pBuf, pBufPage); releaseBufPage(pBuf, pBufPage);
SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId)); SFilePage* pBufPage1 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pBuf, pageId)); SFilePage* t1 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t1 == pBufPage1); ASSERT_TRUE(t1 == pBufPage1);
ASSERT_TRUE(pageId == 1); ASSERT_TRUE(pageId == 1);
SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId)); SFilePage* pBufPage2 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pBuf, pageId)); SFilePage* t2 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t2 == pBufPage2); ASSERT_TRUE(t2 == pBufPage2);
ASSERT_TRUE(pageId == 2); ASSERT_TRUE(pageId == 2);
SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId)); SFilePage* pBufPage3 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pBuf, pageId)); SFilePage* t3 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t3 == pBufPage3); ASSERT_TRUE(t3 == pBufPage3);
ASSERT_TRUE(pageId == 3); ASSERT_TRUE(pageId == 3);
SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId)); SFilePage* pBufPage4 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pBuf, pageId)); SFilePage* t4 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t4 == pBufPage4); ASSERT_TRUE(t4 == pBufPage4);
ASSERT_TRUE(pageId == 4); ASSERT_TRUE(pageId == 4);
releaseBufPage(pBuf, t4); releaseBufPage(pBuf, t4);
SFilePage* pBufPage5 = static_cast<SFilePage*>(getNewBufPage(pBuf, groupId, &pageId)); SFilePage* pBufPage5 = static_cast<SFilePage*>(getNewBufPage(pBuf, &pageId));
SFilePage* t5 = static_cast<SFilePage*>(getBufPage(pBuf, pageId)); SFilePage* t5 = static_cast<SFilePage*>(getBufPage(pBuf, pageId));
ASSERT_TRUE(t5 == pBufPage5); ASSERT_TRUE(t5 == pBufPage5);
ASSERT_TRUE(pageId == 5); ASSERT_TRUE(pageId == 5);