fix(query): set the proper buffer page size.
This commit is contained in:
parent
a2a7674021
commit
2ced335b04
|
@ -1246,7 +1246,7 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t newRows = (payloadSize - additional) / rowSize;
|
int32_t newRows = (payloadSize - additional) / rowSize;
|
||||||
ASSERT(newRows <= nRows && newRows > 1);
|
ASSERT(newRows <= nRows && newRows >= 1);
|
||||||
|
|
||||||
return newRows;
|
return newRows;
|
||||||
}
|
}
|
||||||
|
|
|
@ -672,6 +672,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
|
||||||
SArray* pColList);
|
SArray* pColList);
|
||||||
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win);
|
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win);
|
||||||
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
|
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
|
||||||
|
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
|
||||||
|
|
||||||
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
||||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SArray* pColMatchInfo);
|
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SArray* pColMatchInfo);
|
||||||
|
|
|
@ -3938,6 +3938,21 @@ static void destroyOperatorInfo(SOperatorInfo* pOperator) {
|
||||||
taosMemoryFreeClear(pOperator);
|
taosMemoryFreeClear(pOperator);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz) {
|
||||||
|
*defaultPgsz = 4096;
|
||||||
|
while (*defaultPgsz < rowSize * 4) {
|
||||||
|
*defaultPgsz <<= 1u;
|
||||||
|
}
|
||||||
|
|
||||||
|
// at least four pages need to be in buffer
|
||||||
|
*defaultBufsz = 4096 * 256;
|
||||||
|
if ((*defaultBufsz) <= (*defaultPgsz)) {
|
||||||
|
(*defaultBufsz) = (*defaultPgsz) * 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
|
int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t numOfOutput, size_t keyBufSize,
|
||||||
const char* pKey) {
|
const char* pKey) {
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
|
@ -3950,18 +3965,11 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t defaultPgsz = 4096;
|
uint32_t defaultPgsz = 0;
|
||||||
while (defaultPgsz < pAggSup->resultRowSize * 4) {
|
uint32_t defaultBufsz = 0;
|
||||||
defaultPgsz <<= 1u;
|
getBufferPgSize(pAggSup->resultRowSize, &defaultPgsz, &defaultBufsz);
|
||||||
}
|
|
||||||
|
|
||||||
// at least four pages need to be in buffer
|
int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, TD_TMP_DIR_PATH);
|
||||||
int32_t defaultBufsz = 4096 * 256;
|
|
||||||
if (defaultBufsz <= defaultPgsz) {
|
|
||||||
defaultBufsz = defaultPgsz * 4;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code = createDiskbasedBuf(&pAggSup->pResultBuf, defaultPgsz, defaultBufsz, pKey, TD_TMP_DIR_PATH);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -439,7 +439,6 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
memcpy(data + (*columnLen), src, varDataTLen(src));
|
memcpy(data + (*columnLen), src, varDataTLen(src));
|
||||||
int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage);
|
int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage);
|
||||||
ASSERT(v > 0);
|
ASSERT(v > 0);
|
||||||
printf("len:%d\n", v);
|
|
||||||
|
|
||||||
contentLen = varDataTLen(src);
|
contentLen = varDataTLen(src);
|
||||||
}
|
}
|
||||||
|
@ -490,16 +489,13 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
|
||||||
|
|
||||||
int32_t *rows = (int32_t*) pPage;
|
int32_t *rows = (int32_t*) pPage;
|
||||||
if (*rows >= pInfo->rowCapacity) {
|
if (*rows >= pInfo->rowCapacity) {
|
||||||
|
// release buffer
|
||||||
|
releaseBufPage(pInfo->pBuf, pPage);
|
||||||
|
|
||||||
// add a new page for current group
|
// add a new page for current group
|
||||||
int32_t pageId = 0;
|
int32_t pageId = 0;
|
||||||
pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
|
pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
|
||||||
taosArrayPush(p->pPageList, &pageId);
|
taosArrayPush(p->pPageList, &pageId);
|
||||||
|
|
||||||
// // number of rows
|
|
||||||
// *(int32_t*) pPage = 0;
|
|
||||||
//
|
|
||||||
// uint64_t* groupId = (pPage + sizeof(int32_t));
|
|
||||||
// *groupId = 0;
|
|
||||||
memset(pPage, 0, getBufPageSize(pInfo->pBuf));
|
memset(pPage, 0, getBufPageSize(pInfo->pBuf));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -566,6 +562,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
|
||||||
blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity);
|
blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity);
|
||||||
|
|
||||||
pInfo->pageIndex += 1;
|
pInfo->pageIndex += 1;
|
||||||
|
releaseBufPage(pInfo->pBuf, page);
|
||||||
|
|
||||||
blockDataUpdateTsWindow(pInfo->binfo.pRes, 0);
|
blockDataUpdateTsWindow(pInfo->binfo.pRes, 0);
|
||||||
pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId;
|
pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId;
|
||||||
|
@ -631,7 +628,11 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = createDiskbasedBuf(&pInfo->pBuf, 4096, 4096 * 256, pTaskInfo->id.str, TD_TMP_DIR_PATH);
|
uint32_t defaultPgsz = 0;
|
||||||
|
uint32_t defaultBufsz = 0;
|
||||||
|
getBufferPgSize(pResultBlock->info.rowSize, &defaultPgsz, &defaultBufsz);
|
||||||
|
|
||||||
|
int32_t code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, TD_TMP_DIR_PATH);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue