[td-805] opt query perf
This commit is contained in:
parent
d5e0bfc42e
commit
4cb2dc53f3
|
@ -1481,7 +1481,7 @@ static bool first_last_function_setup(SQLFunctionCtx *pCtx) {
|
|||
|
||||
// todo opt for null block
|
||||
static void first_function(SQLFunctionCtx *pCtx) {
|
||||
if (pCtx->order == TSDB_ORDER_DESC) {
|
||||
if (pCtx->order == TSDB_ORDER_DESC || pCtx->preAggVals.dataBlockLoaded == false) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1550,28 +1550,17 @@ static void first_data_assign_impl(SQLFunctionCtx *pCtx, char *pData, int32_t in
|
|||
* to decide if the value is earlier than current intermediate result
|
||||
*/
|
||||
static void first_dist_function(SQLFunctionCtx *pCtx) {
|
||||
assert(pCtx->size > 0);
|
||||
|
||||
if (pCtx->size == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* do not check data in the following cases:
|
||||
* 1. data blocks that are not loaded
|
||||
* 2. scan data files in desc order
|
||||
*/
|
||||
if (pCtx->order == TSDB_ORDER_DESC) {
|
||||
if (pCtx->order == TSDB_ORDER_DESC || pCtx->preAggVals.dataBlockLoaded == false) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t notNullElems = 0;
|
||||
|
||||
// data block is discarded (not loaded), so there is no need to check it
|
||||
if (!pCtx->preAggVals.dataBlockLoaded) {
|
||||
return;
|
||||
}
|
||||
|
||||
// find the first not null value
|
||||
for (int32_t i = 0; i < pCtx->size; ++i) {
|
||||
char *data = GET_INPUT_CHAR_INDEX(pCtx, i);
|
||||
|
@ -1655,7 +1644,7 @@ static void first_dist_func_second_merge(SQLFunctionCtx *pCtx) {
|
|||
* least one data in this block that is not null.(TODO opt for this case)
|
||||
*/
|
||||
static void last_function(SQLFunctionCtx *pCtx) {
|
||||
if (pCtx->order != pCtx->param[0].i64Key) {
|
||||
if (pCtx->order != pCtx->param[0].i64Key || pCtx->preAggVals.dataBlockLoaded == false) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -22,26 +22,22 @@ extern "C" {
|
|||
|
||||
#include "os.h"
|
||||
#include "qextbuffer.h"
|
||||
#include "hash.h"
|
||||
|
||||
typedef struct SIDList {
|
||||
uint32_t alloc;
|
||||
int32_t size;
|
||||
int32_t* pData;
|
||||
} SIDList;
|
||||
typedef struct SArray* SIDList;
|
||||
|
||||
typedef struct SDiskbasedResultBuf {
|
||||
int32_t numOfRowsPerPage;
|
||||
int32_t numOfPages;
|
||||
int64_t totalBufSize;
|
||||
int32_t fd; // data file fd
|
||||
int32_t allocateId; // allocated page id
|
||||
int32_t incStep; // minimum allocated pages
|
||||
char* pBuf; // mmap buffer pointer
|
||||
char* path; // file path
|
||||
int32_t numOfRowsPerPage;
|
||||
int32_t numOfPages;
|
||||
int64_t totalBufSize;
|
||||
int32_t fd; // data file fd
|
||||
int32_t allocateId; // allocated page id
|
||||
int32_t incStep; // minimum allocated pages
|
||||
char* pBuf; // mmap buffer pointer
|
||||
char* path; // file path
|
||||
|
||||
uint32_t numOfAllocGroupIds; // number of allocated id list
|
||||
void* idsTable; // id hash table
|
||||
SIDList* list; // for each id, there is a page id list
|
||||
SHashObj* idsTable; // id hash table
|
||||
SIDList list; // for each id, there is a page id list
|
||||
} SDiskbasedResultBuf;
|
||||
|
||||
#define DEFAULT_INTERN_BUF_PAGE_SIZE (8192L*5)
|
||||
|
@ -112,7 +108,7 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle);
|
|||
* @param pList
|
||||
* @return
|
||||
*/
|
||||
int32_t getLastPageId(SIDList *pList);
|
||||
int32_t getLastPageId(SIDList pList);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -361,7 +361,7 @@ static bool hasTagValOutput(SQuery* pQuery) {
|
|||
* @return
|
||||
*/
|
||||
static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis **pColStatis) {
|
||||
if (pStatis != NULL) {
|
||||
if (pStatis != NULL && !TSDB_COL_IS_TAG(pColIndex->flag)) {
|
||||
*pColStatis = &pStatis[pColIndex->colIndex];
|
||||
assert((*pColStatis)->colId == pColIndex->colId);
|
||||
} else {
|
||||
|
@ -472,10 +472,10 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult
|
|||
int32_t pageId = -1;
|
||||
SIDList list = getDataBufPagesIdList(pResultBuf, sid);
|
||||
|
||||
if (list.size == 0) {
|
||||
if (taosArrayGetSize(list) == 0) {
|
||||
pData = getNewDataBuf(pResultBuf, sid, &pageId);
|
||||
} else {
|
||||
pageId = getLastPageId(&list);
|
||||
pageId = getLastPageId(list);
|
||||
pData = GET_RES_BUF_PAGE_BY_ID(pResultBuf, pageId);
|
||||
|
||||
if (pData->num >= numOfRowsPerPage) {
|
||||
|
@ -2069,7 +2069,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, void* pQueryHandle,
|
|||
int32_t colId = pSqlFunc->colInfo.colId;
|
||||
|
||||
status |= aAggs[functionId].dataReqFunc(&pRuntimeEnv->pCtx[i], pBlockInfo->window.skey, pBlockInfo->window.ekey, colId);
|
||||
if ((status & BLK_DATA_ALL_NEEDED) != 0) {
|
||||
if ((status & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -2670,16 +2670,19 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
|
|||
SIDList list = getDataBufPagesIdList(pResultBuf, pQInfo->offset + id);
|
||||
|
||||
int32_t total = 0;
|
||||
for (int32_t i = 0; i < list.size; ++i) {
|
||||
tFilePage *pData = GET_RES_BUF_PAGE_BY_ID(pResultBuf, list.pData[i]);
|
||||
int32_t size = taosArrayGetSize(list);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
int32_t* pgId = taosArrayGet(list, i);
|
||||
tFilePage *pData = GET_RES_BUF_PAGE_BY_ID(pResultBuf, *pgId);
|
||||
total += pData->num;
|
||||
}
|
||||
|
||||
int32_t rows = total;
|
||||
|
||||
int32_t offset = 0;
|
||||
for (int32_t num = 0; num < list.size; ++num) {
|
||||
tFilePage *pData = GET_RES_BUF_PAGE_BY_ID(pResultBuf, list.pData[num]);
|
||||
for (int32_t j = 0; j < size; ++j) {
|
||||
int32_t* pgId = taosArrayGet(list, j);
|
||||
tFilePage *pData = GET_RES_BUF_PAGE_BY_ID(pResultBuf, *pgId);
|
||||
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
int32_t bytes = pRuntimeEnv->pCtx[i].outputBytes;
|
||||
|
@ -2745,7 +2748,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
|
|||
STableQueryInfo *item = taosArrayGetP(pGroup, i);
|
||||
|
||||
SIDList list = getDataBufPagesIdList(pRuntimeEnv->pResultBuf, TSDB_TABLEID(item->pTable)->tid);
|
||||
if (list.size > 0 && item->windowResInfo.size > 0) {
|
||||
if (taosArrayGetSize(list) > 0 && item->windowResInfo.size > 0) {
|
||||
pTableList[numOfTables] = item;
|
||||
numOfTables += 1;
|
||||
}
|
||||
|
@ -4208,14 +4211,14 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
|||
|
||||
pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, pRuntimeEnv->topBotQuery, isSTableQuery);
|
||||
|
||||
if (isSTableQuery) {
|
||||
if (isSTableQuery && !onlyQueryTags(pRuntimeEnv->pQuery)) {
|
||||
int32_t rows = getInitialPageNum(pQInfo);
|
||||
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize, pQInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pQuery->intervalTime == 0) {
|
||||
if (!QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
int16_t type = TSDB_DATA_TYPE_NULL;
|
||||
|
||||
if (pRuntimeEnv->groupbyNormalCol) { // group by columns not tags;
|
||||
|
|
|
@ -2,7 +2,6 @@
|
|||
#include "hash.h"
|
||||
#include "qextbuffer.h"
|
||||
#include "taoserror.h"
|
||||
#include "tsqlfunction.h"
|
||||
#include "queryLog.h"
|
||||
|
||||
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t size, int32_t rowSize, void* handle) {
|
||||
|
@ -20,11 +19,10 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t si
|
|||
|
||||
// init id hash table
|
||||
pResBuf->idsTable = taosHashInit(size, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
|
||||
pResBuf->list = calloc(size, sizeof(SIDList));
|
||||
pResBuf->numOfAllocGroupIds = size;
|
||||
pResBuf->list = taosArrayInit(size, POINTER_BYTES);
|
||||
|
||||
char path[4096] = {0};
|
||||
getTmpfilePath("tsdb_q_buf", path);
|
||||
getTmpfilePath("tsdb_qbuf", path);
|
||||
pResBuf->path = strdup(path);
|
||||
|
||||
pResBuf->fd = open(pResBuf->path, O_CREAT | O_RDWR, 0666);
|
||||
|
@ -48,7 +46,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t si
|
|||
return TSDB_CODE_QRY_OUT_OF_MEMORY; // todo change error code
|
||||
}
|
||||
|
||||
qDebug("QInfo:%p create tmp file for output result, %s, %" PRId64 "bytes", handle, pResBuf->path,
|
||||
qDebug("QInfo:%p create tmp file for output result: %s, %" PRId64 "bytes", handle, pResBuf->path,
|
||||
pResBuf->totalBufSize);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -90,7 +88,7 @@ static FORCE_INLINE bool noMoreAvailablePages(SDiskbasedResultBuf* pResultBuf) {
|
|||
return (pResultBuf->allocateId == pResultBuf->numOfPages - 1);
|
||||
}
|
||||
|
||||
static int32_t getGroupIndex(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
|
||||
static FORCE_INLINE int32_t getGroupIndex(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
|
||||
assert(pResultBuf != NULL);
|
||||
|
||||
char* p = taosHashGet(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t));
|
||||
|
@ -99,61 +97,30 @@ static int32_t getGroupIndex(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
|
|||
}
|
||||
|
||||
int32_t slot = GET_INT32_VAL(p);
|
||||
assert(slot >= 0 && slot < pResultBuf->numOfAllocGroupIds);
|
||||
assert(slot >= 0 && slot < taosHashGetSize(pResultBuf->idsTable));
|
||||
|
||||
return slot;
|
||||
}
|
||||
|
||||
static int32_t addNewGroupId(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
|
||||
int32_t num = getNumOfResultBufGroupId(pResultBuf); // the num is the newest allocated group id slot
|
||||
|
||||
if (pResultBuf->numOfAllocGroupIds <= num) {
|
||||
size_t n = pResultBuf->numOfAllocGroupIds << 1u;
|
||||
|
||||
SIDList* p = (SIDList*)realloc(pResultBuf->list, sizeof(SIDList) * n);
|
||||
assert(p != NULL);
|
||||
|
||||
memset(&p[pResultBuf->numOfAllocGroupIds], 0, sizeof(SIDList) * pResultBuf->numOfAllocGroupIds);
|
||||
|
||||
pResultBuf->list = p;
|
||||
pResultBuf->numOfAllocGroupIds = n;
|
||||
}
|
||||
|
||||
taosHashPut(pResultBuf->idsTable, (const char*)&groupId, sizeof(int32_t), &num, sizeof(int32_t));
|
||||
|
||||
SArray* pa = taosArrayInit(1, sizeof(int32_t));
|
||||
taosArrayPush(pResultBuf->list, &pa);
|
||||
|
||||
assert(taosArrayGetSize(pResultBuf->list) == taosHashGetSize(pResultBuf->idsTable));
|
||||
return num;
|
||||
}
|
||||
|
||||
static int32_t doRegisterId(SIDList* pList, int32_t id) {
|
||||
if (pList->size >= pList->alloc) {
|
||||
int32_t s = 0;
|
||||
if (pList->alloc == 0) {
|
||||
s = 4;
|
||||
assert(pList->pData == NULL);
|
||||
} else {
|
||||
s = pList->alloc << 1u;
|
||||
}
|
||||
|
||||
int32_t* c = realloc(pList->pData, s * sizeof(int32_t));
|
||||
assert(c);
|
||||
|
||||
memset(&c[pList->alloc], 0, sizeof(int32_t) * pList->alloc);
|
||||
|
||||
pList->pData = c;
|
||||
pList->alloc = s;
|
||||
}
|
||||
|
||||
pList->pData[pList->size++] = id;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void registerPageId(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t pageId) {
|
||||
int32_t slot = getGroupIndex(pResultBuf, groupId);
|
||||
if (slot < 0) {
|
||||
slot = addNewGroupId(pResultBuf, groupId);
|
||||
}
|
||||
|
||||
SIDList* pList = &pResultBuf->list[slot];
|
||||
doRegisterId(pList, pageId);
|
||||
SIDList pList = taosArrayGetP(pResultBuf->list, slot);
|
||||
taosArrayPush(pList, &pageId);
|
||||
}
|
||||
|
||||
tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) {
|
||||
|
@ -178,12 +145,11 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32
|
|||
int32_t getNumOfRowsPerPage(SDiskbasedResultBuf* pResultBuf) { return pResultBuf->numOfRowsPerPage; }
|
||||
|
||||
SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
|
||||
SIDList list = {0};
|
||||
int32_t slot = getGroupIndex(pResultBuf, groupId);
|
||||
if (slot < 0) {
|
||||
return list;
|
||||
return taosArrayInit(1, sizeof(int32_t));
|
||||
} else {
|
||||
return pResultBuf->list[slot];
|
||||
return taosArrayGetP(pResultBuf->list, slot);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -202,22 +168,20 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) {
|
|||
|
||||
tfree(pResultBuf->path);
|
||||
|
||||
for (int32_t i = 0; i < pResultBuf->numOfAllocGroupIds; ++i) {
|
||||
SIDList* pList = &pResultBuf->list[i];
|
||||
tfree(pList->pData);
|
||||
size_t size = taosArrayGetSize(pResultBuf->list);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SArray* pa = taosArrayGetP(pResultBuf->list, i);
|
||||
taosArrayDestroy(pa);
|
||||
}
|
||||
|
||||
tfree(pResultBuf->list);
|
||||
taosArrayDestroy(pResultBuf->list);
|
||||
taosHashCleanup(pResultBuf->idsTable);
|
||||
|
||||
tfree(pResultBuf);
|
||||
}
|
||||
|
||||
int32_t getLastPageId(SIDList *pList) {
|
||||
if (pList == NULL || pList->size <= 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return pList->pData[pList->size - 1];
|
||||
/**
 * Return the most recently registered page id of a group's page id list.
 *
 * @param pList  page id list (an SArray holding int32_t page ids); may be NULL
 * @return the last page id in the list, or -1 when the list is NULL or empty
 */
int32_t getLastPageId(SIDList pList) {
  if (pList == NULL) {
    return -1;
  }

  size_t size = taosArrayGetSize(pList);

  // size_t is unsigned: for an empty list "size - 1" would wrap around to a huge
  // index instead of going negative, so the empty case must be rejected up front.
  // -1 is the same "no page" value the struct-based implementation returned.
  if (size == 0) {
    return -1;
  }

  return *(int32_t*) taosArrayGet(pList, size - 1);
}
|
||||
|
||||
|
|
|
@ -90,6 +90,12 @@ typedef struct SBlockOrderSupporter {
|
|||
int32_t* numOfBlocksPerTable;
|
||||
} SBlockOrderSupporter;
|
||||
|
||||
// Accumulated I/O timing counters kept per query handle (stored in STsdbQueryHandle.cost).
// Values are sums of taosGetTimestampUs() differences and are logged with the "us" unit.
typedef struct SIOCostSummary {
|
||||
int64_t blockLoadTime;  // total elapsed time spent loading file data blocks (added to in doLoadFileDataBlock)
|
||||
int64_t statisInfoLoadTime;  // total elapsed time spent loading block statistics (added to in tsdbRetrieveDataBlockStatisInfo)
|
||||
int64_t blockMergeTime;  // NOTE(review): never written in the visible code; presumably time spent merging blocks -- confirm
|
||||
} SIOCostSummary;
|
||||
|
||||
typedef struct STsdbQueryHandle {
|
||||
STsdbRepo* pTsdb;
|
||||
SQueryFilePos cur; // current position
|
||||
|
@ -116,6 +122,8 @@ typedef struct STsdbQueryHandle {
|
|||
SArray* defaultLoadColumn;// default load column
|
||||
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
|
||||
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */
|
||||
|
||||
SIOCostSummary cost;
|
||||
} STsdbQueryHandle;
|
||||
|
||||
static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle);
|
||||
|
@ -622,6 +630,8 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
|
|||
tfree(data);
|
||||
|
||||
int64_t et = taosGetTimestampUs() - st;
|
||||
|
||||
pQueryHandle->cost.blockLoadTime += et;
|
||||
tsdbDebug("%p load file block into buffer, elapsed time:%"PRId64 " us", pQueryHandle, et);
|
||||
|
||||
return blockLoaded;
|
||||
|
@ -1784,23 +1794,22 @@ void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT* pQueryHandle, SDataBlockInfo* p
|
|||
int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataStatis** pBlockStatis) {
|
||||
STsdbQueryHandle* pHandle = (STsdbQueryHandle*) pQueryHandle;
|
||||
|
||||
SQueryFilePos* cur = &pHandle->cur;
|
||||
if (cur->mixBlock) {
|
||||
SQueryFilePos* c = &pHandle->cur;
|
||||
if (c->mixBlock) {
|
||||
*pBlockStatis = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
assert((cur->slot >= 0 && cur->slot < pHandle->numOfBlocks) ||
|
||||
((cur->slot == pHandle->numOfBlocks) && (cur->slot == 0)));
|
||||
|
||||
STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[cur->slot];
|
||||
|
||||
// file block with subblocks has no statistics data
|
||||
STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[c->slot];
|
||||
assert((c->slot >= 0 && c->slot < pHandle->numOfBlocks) || ((c->slot == pHandle->numOfBlocks) && (c->slot == 0)));
|
||||
|
||||
// file block with sub-blocks has no statistics data
|
||||
if (pBlockInfo->compBlock->numOfSubBlocks > 1) {
|
||||
*pBlockStatis = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int64_t stime = taosGetTimestampUs();
|
||||
tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL);
|
||||
|
||||
// todo opt perf
|
||||
|
@ -1830,7 +1839,10 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
|
|||
pHandle->statis[i].max = pBlockInfo->compBlock->keyLast;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int64_t elapsed = taosGetTimestampUs() - stime;
|
||||
pHandle->cost.statisInfoLoadTime += elapsed;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -2351,6 +2363,10 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
|
|||
tsdbUnRefMemTable(pQueryHandle->pTsdb, pQueryHandle->imem);
|
||||
|
||||
tsdbDestroyHelper(&pQueryHandle->rhelper);
|
||||
|
||||
tsdbDebug(":io-cost summary: statis-info time:%"PRId64"us, datablock time:%" PRId64"us ,%p", pQueryHandle->cost.statisInfoLoadTime,
|
||||
pQueryHandle->cost.blockLoadTime, pQueryHandle->qinfo);
|
||||
|
||||
tfree(pQueryHandle);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue