From a8e86c9a675845cb48a953448d8b3eccbd5a0283 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 31 Oct 2020 23:14:43 +0800 Subject: [PATCH] [TD-1870] --- src/query/inc/qExecutor.h | 5 +++- src/query/inc/qUtil.h | 9 ++++++ src/query/src/qExecutor.c | 57 +++++++++++++++++++++----------------- src/query/src/qResultbuf.c | 8 +++--- src/query/src/qUtil.c | 34 +++++++++++------------ src/util/src/hash.c | 2 +- 6 files changed, 66 insertions(+), 49 deletions(-) diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index bdaec7e2fa..ecedf4c13f 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -70,7 +70,8 @@ typedef struct SResultRec { typedef struct SWindowResInfo { SWindowResult* pResult; // result list - SHashObj* hashList; // hash list for quick access +// uint64_t uid; // table uid, in order to identify the result from global hash table +// SHashObj* hashList; // hash list for quick access int16_t type; // data type for hash key int32_t capacity; // max capacity int32_t curIndex; // current start active index @@ -177,6 +178,8 @@ typedef struct SQueryRuntimeEnv { int32_t interBufSize; // intermediate buffer sizse int32_t prevGroupId; // previous executed group id SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file + SHashObj* pWindowHashTable; // quick locate the window object for each result + char* keyBuf; // window key buffer } SQueryRuntimeEnv; enum { diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index 32f26f66f5..fc997a44c5 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -15,6 +15,15 @@ #ifndef TDENGINE_QUERYUTIL_H #define TDENGINE_QUERYUTIL_H +#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \ + do { \ + assert(sizeof(_uid) == sizeof(uint64_t)); \ + *(uint64_t *)(_k) = (_uid); \ + memcpy((_k) + sizeof(uint64_t), (_ori), (_len)); \ + } while (0) + +#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t)) + int32_t getOutputInterResultBufSize(SQuery* pQuery); void clearTimeWindowResBuf(SQueryRuntimeEnv* pRuntimeEnv, SWindowResult* pOneOutputRes); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 2eb0abd070..32ed7190c6 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -448,10 +448,11 @@ static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis } static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, char *pData, - int16_t bytes, bool masterscan) { + int16_t bytes, bool masterscan, uint64_t uid) { SQuery *pQuery = pRuntimeEnv->pQuery; - int32_t *p1 = (int32_t *) taosHashGet(pWindowResInfo->hashList, pData, bytes); + SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid); + int32_t *p1 = (int32_t *) taosHashGet(pRuntimeEnv->pWindowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); if (p1 != NULL) { pWindowResInfo->curIndex = *p1; } else { @@ -495,7 +496,7 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin // add a new result set for a new group pWindowResInfo->curIndex = pWindowResInfo->size++; - taosHashPut(pWindowResInfo->hashList, pData, bytes, (char *)&pWindowResInfo->curIndex, sizeof(int32_t)); + taosHashPut(pRuntimeEnv->pWindowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), (char *)&pWindowResInfo->curIndex, sizeof(int32_t)); } // too many time window in query @@ -555,7 +556,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t return w; } -static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResultBuf *pResultBuf, int32_t sid, +static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResultBuf *pResultBuf, int32_t tid, int32_t numOfRowsPerPage) { if (pWindowRes->pageId != -1) { return 0; @@ -565,10 +566,10 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult // in the first scan, new space needed for results int32_t pageId = -1; - SIDList list = getDataBufPagesIdList(pResultBuf, sid); + SIDList list = getDataBufPagesIdList(pResultBuf, tid); if (taosArrayGetSize(list) == 0) { - pData = getNewDataBuf(pResultBuf, sid, &pageId); + pData = getNewDataBuf(pResultBuf, tid, &pageId); } else { SPageInfo* pi = getLastPageInfo(list); pData = getResBufPage(pResultBuf, pi->pageId); @@ -578,7 +579,7 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult // release current page first, and prepare the next one releaseResBufPageInfo(pResultBuf, pi); - pData = getNewDataBuf(pResultBuf, sid, &pageId); + pData = getNewDataBuf(pResultBuf, tid, &pageId); if (pData != NULL) { assert(pData->num == 0); // number of elements must be 0 for new allocated buffer } @@ -600,13 +601,12 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SDiskbasedResult return 0; } -static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, int32_t sid, +static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, SDataBlockInfo* pBockInfo, STimeWindow *win, bool masterscan, bool* newWind) { assert(win->skey <= win->ekey); SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; - SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey, - TSDB_KEYSIZE, masterscan); + SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, pBockInfo->uid); if (pWindowRes == NULL) { *newWind = false; @@ -617,7 +617,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes // not assign result buffer yet, add new result buffer if (pWindowRes->pageId == -1) { - int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, sid, pRuntimeEnv->numOfRowsPerPage); + int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, pBockInfo->tid, pRuntimeEnv->numOfRowsPerPage); if (ret != TSDB_CODE_SUCCESS) { return -1; } @@ -1024,8 +1024,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * bool hasTimeWindow = false; STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win, masterScan, &hasTimeWindow) != - TSDB_CODE_SUCCESS) { + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) { taosTFree(sasArray); return; } @@ -1053,8 +1052,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * // null data, failed to allocate more memory buffer hasTimeWindow = false; - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin, masterScan, - &hasTimeWindow) != TSDB_CODE_SUCCESS) { + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &nextWin, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) { break; } @@ -1112,12 +1110,13 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat len = varDataLen(pData); } else if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) { SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); - qError("QInfo:%p group by not supported on double/float/binary/nchar columns, abort", pQInfo); + qError("QInfo:%p group by not supported on double/float columns, abort", pQInfo); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR); } - SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, d, len, true); + uint64_t uid = 0; // uid is always set to be 0. + SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, d, len, true, uid); if (pWindowRes == NULL) { return -1; } @@ -1335,7 +1334,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); bool hasTimeWindow = false; - int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &win, masterScan, &hasTimeWindow); + int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code continue; } @@ -1363,7 +1362,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS // null data, failed to allocate more memory buffer hasTimeWindow = false; - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo->tid, &nextWin, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) { + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &nextWin, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) { break; } @@ -1764,6 +1763,10 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle); pRuntimeEnv->pTSBuf = tsBufDestroy(pRuntimeEnv->pTSBuf); + taosTFree(pRuntimeEnv->keyBuf); + + taosHashCleanup(pRuntimeEnv->pWindowHashTable); + pRuntimeEnv->pWindowHashTable = NULL; } #define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED) @@ -2262,7 +2265,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo * pW TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlockInfo->window.skey:pBlockInfo->window.ekey; STimeWindow win = getActiveTimeWindow(pWindowResInfo, k, pQuery); - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pBlockInfo->tid, &win, masterScan, &hasTimeWindow) != + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pBlockInfo, &win, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) { // todo handle error in set result for timewindow } @@ -3787,8 +3790,9 @@ void setExecutionContext(SQInfo *pQInfo, int32_t groupIndex, TSKEY nextKey) { return; } + uint64_t uid = 0; // uid is always set to be 0 SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&groupIndex, - sizeof(groupIndex), true); + sizeof(groupIndex), true, uid); if (pWindowRes == NULL) { return; } @@ -4246,7 +4250,7 @@ static void queryCostStatis(SQInfo *pQInfo) { SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv; SQueryCostInfo *pSummary = &pRuntimeEnv->summary; - uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.windowResInfo.hashList); + uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pWindowHashTable); hashSize += taosHashGetMemSize(pQInfo->tableqinfoGroupInfo.map); int32_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo); @@ -4255,9 +4259,9 @@ static void queryCostStatis(SQInfo *pQInfo) { int32_t numOfTables = taosArrayGetSize(pa); for(int32_t j = 0; j < numOfTables; ++j) { - STableQueryInfo* pTableQueryInfo = taosArrayGetP(pa, j); +// STableQueryInfo* pTableQueryInfo = taosArrayGetP(pa, j); - hashSize += taosHashGetMemSize(pTableQueryInfo->windowResInfo.hashList); +// hashSize += taosHashGetMemSize(pTableQueryInfo->windowResInfo.hashList); } } @@ -6323,7 +6327,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou pQInfo->tableqinfoGroupInfo.pGroupList = taosArrayInit(numOfGroups, POINTER_BYTES); pQInfo->tableqinfoGroupInfo.numOfTables = pTableGroupInfo->numOfTables; pQInfo->tableqinfoGroupInfo.map = taosHashInit(pTableGroupInfo->numOfTables, - taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); + taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); } int tableIndex = 0; @@ -6331,6 +6335,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou pQInfo->runtimeEnv.interBufSize = getOutputInterResultBufSize(pQuery); pQInfo->runtimeEnv.summary.tableInfoSize += (pTableGroupInfo->numOfTables * sizeof(STableQueryInfo)); + pQInfo->runtimeEnv.pWindowHashTable = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + pQInfo->runtimeEnv.keyBuf = malloc(TSDB_MAX_BYTES_PER_ROW); + pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); if (pQInfo->pBuf == NULL) { goto _cleanup; diff --git a/src/query/src/qResultbuf.c b/src/query/src/qResultbuf.c index 3892c84c67..bd7c4694d0 100644 --- a/src/query/src/qResultbuf.c +++ b/src/query/src/qResultbuf.c @@ -407,14 +407,14 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) { } if (pResultBuf->file != NULL) { - qDebug("QInfo:%p res output buffer closed, total:%" PRId64 " B, inmem size:%dbytes, file size:%"PRId64" bytes", - pResultBuf->handle, pResultBuf->totalBufSize, listNEles(pResultBuf->lruList) * pResultBuf->pageSize, + qDebug("QInfo:%p res output buffer closed, total:%.2f Kb, inmem size:%dbytes, file size:%"PRId64" bytes", + pResultBuf->handle, pResultBuf->totalBufSize/1024.0, listNEles(pResultBuf->lruList) * pResultBuf->pageSize, pResultBuf->fileSize); fclose(pResultBuf->file); } else { - qDebug("QInfo:%p res output buffer closed, total:%" PRId64 " bytes, no file created", pResultBuf->handle, - pResultBuf->totalBufSize); + qDebug("QInfo:%p res output buffer closed, total:%.2f Kb, no file created", pResultBuf->handle, + pResultBuf->totalBufSize/1024.0); } unlink(pResultBuf->path); diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 3482c52052..8273a21395 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -36,12 +36,6 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun pWindowResInfo->threshold = threshold; pWindowResInfo->type = type; - _hash_fn_t fn = taosGetDefaultHashFunction(type); - pWindowResInfo->hashList = taosHashInit(threshold, fn, true, false); - if (pWindowResInfo->hashList == NULL) { - return TSDB_CODE_QRY_OUT_OF_MEMORY; - } - pWindowResInfo->curIndex = -1; pWindowResInfo->size = 0; pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL; @@ -83,7 +77,7 @@ void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo) { return; } if (pWindowResInfo->capacity == 0) { - assert(pWindowResInfo->hashList == NULL && pWindowResInfo->pResult == NULL); + assert(/*pWindowResInfo->hashList == NULL && */pWindowResInfo->pResult == NULL); return; } @@ -93,7 +87,7 @@ void cleanupTimeWindowInfo(SWindowResInfo *pWindowResInfo) { } } - taosHashCleanup(pWindowResInfo->hashList); +// taosHashCleanup(pWindowResInfo->hashList); taosTFree(pWindowResInfo->pResult); } @@ -108,11 +102,11 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR } pWindowResInfo->curIndex = -1; - taosHashCleanup(pWindowResInfo->hashList); +// taosHashCleanup(pWindowResInfo->hashList); pWindowResInfo->size = 0; - _hash_fn_t fn = taosGetDefaultHashFunction(pWindowResInfo->type); - pWindowResInfo->hashList = taosHashInit(pWindowResInfo->capacity, fn, true, false); +// _hash_fn_t fn = taosGetDefaultHashFunction(pWindowResInfo->type); +// pWindowResInfo->hashList = taosHashInit(pWindowResInfo->capacity, fn, true, false); pWindowResInfo->startTime = TSKEY_INITIAL_VAL; pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL; @@ -127,10 +121,10 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { int32_t numOfClosed = numOfClosedTimeWindow(pWindowResInfo); assert(num >= 0 && num <= numOfClosed); - int16_t type = pWindowResInfo->type; - - char *key = NULL; - int16_t bytes = -1; + int16_t type = pWindowResInfo->type; + uint64_t uid = 0; // uid is always set to be 0. + char *key = NULL; + int16_t bytes = -1; for (int32_t i = 0; i < num; ++i) { SWindowResult *pResult = &pWindowResInfo->pResult[i]; @@ -145,7 +139,8 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { bytes = tDataTypeDesc[pWindowResInfo->type].nSize; } - taosHashRemove(pWindowResInfo->hashList, (const char *)key, bytes); + SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); + taosHashRemove(pRuntimeEnv->pWindowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); } else { break; } @@ -177,12 +172,15 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { bytes = tDataTypeDesc[pWindowResInfo->type].nSize; } - int32_t *p = (int32_t *)taosHashGet(pWindowResInfo->hashList, (const char *)key, bytes); + SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); + int32_t *p = (int32_t *)taosHashGet(pRuntimeEnv->pWindowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); assert(p != NULL); int32_t v = (*p - num); assert(v >= 0 && v <= pWindowResInfo->size); - taosHashPut(pWindowResInfo->hashList, (char *)key, bytes, (char *)&v, sizeof(int32_t)); + + SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid); + taosHashPut(pRuntimeEnv->pWindowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), (char *)&v, sizeof(int32_t)); } pWindowResInfo->curIndex = -1; diff --git a/src/util/src/hash.c b/src/util/src/hash.c index 9aadee0924..8ffbb1f0f6 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -804,5 +804,5 @@ size_t taosHashGetMemSize(const SHashObj *pHashObj) { return 0; } - return (pHashObj->capacity * sizeof(SHashEntry) + POINTER_BYTES) + sizeof(SHashNode) * taosHashGetSize(pHashObj); + return (pHashObj->capacity * (sizeof(SHashEntry) + POINTER_BYTES)) + sizeof(SHashNode) * taosHashGetSize(pHashObj) + sizeof(SHashObj); }