[TD-225] refactor codes.
This commit is contained in:
parent
64ab48d7ea
commit
2cd975745c
|
@ -40,7 +40,7 @@ typedef struct SGroupResInfo {
|
||||||
int32_t rowId;
|
int32_t rowId;
|
||||||
} SGroupResInfo;
|
} SGroupResInfo;
|
||||||
|
|
||||||
typedef struct SWindowResultPool {
|
typedef struct SResultRowPool {
|
||||||
int32_t elemSize;
|
int32_t elemSize;
|
||||||
int32_t blockSize;
|
int32_t blockSize;
|
||||||
int32_t numOfElemPerBlock;
|
int32_t numOfElemPerBlock;
|
||||||
|
@ -51,7 +51,7 @@ typedef struct SWindowResultPool {
|
||||||
} position;
|
} position;
|
||||||
|
|
||||||
SArray* pData; // SArray<void*>
|
SArray* pData; // SArray<void*>
|
||||||
} SWindowResultPool;
|
} SResultRowPool;
|
||||||
|
|
||||||
typedef struct SSqlGroupbyExpr {
|
typedef struct SSqlGroupbyExpr {
|
||||||
int16_t tableIndex;
|
int16_t tableIndex;
|
||||||
|
@ -188,9 +188,9 @@ typedef struct SQueryRuntimeEnv {
|
||||||
int32_t interBufSize; // intermediate buffer sizse
|
int32_t interBufSize; // intermediate buffer sizse
|
||||||
int32_t prevGroupId; // previous executed group id
|
int32_t prevGroupId; // previous executed group id
|
||||||
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
||||||
SHashObj* pWindowHashTable; // quick locate the window object for each result
|
SHashObj* pResultRowHashTable; // quick locate the window object for each result
|
||||||
char* keyBuf; // window key buffer
|
char* keyBuf; // window key buffer
|
||||||
SWindowResultPool* pool; // window result object pool
|
SResultRowPool* pool; // window result object pool
|
||||||
|
|
||||||
int32_t* rowCellInfoOffset;// offset value for each row result cell info
|
int32_t* rowCellInfoOffset;// offset value for each row result cell info
|
||||||
} SQueryRuntimeEnv;
|
} SQueryRuntimeEnv;
|
||||||
|
|
|
@ -73,12 +73,12 @@ __filter_func_t *getValueFilterFuncArray(int32_t type);
|
||||||
|
|
||||||
size_t getWindowResultSize(SQueryRuntimeEnv* pRuntimeEnv);
|
size_t getWindowResultSize(SQueryRuntimeEnv* pRuntimeEnv);
|
||||||
|
|
||||||
SWindowResultPool* initWindowResultPool(size_t size);
|
SResultRowPool* initResultRowPool(size_t size);
|
||||||
SResultRow* getNewWindowResult(SWindowResultPool* p);
|
SResultRow* getNewResultRow(SResultRowPool* p);
|
||||||
int64_t getWindowResultPoolMemSize(SWindowResultPool* p);
|
int64_t getResultRowPoolMemSize(SResultRowPool* p);
|
||||||
void* destroyWindowResultPool(SWindowResultPool* p);
|
void* destroyResultRowPool(SResultRowPool* p);
|
||||||
int32_t getNumOfAllocatedWindowResult(SWindowResultPool* p);
|
int32_t getNumOfAllocatedResultRows(SResultRowPool* p);
|
||||||
int32_t getNumOfUsedWindowResult(SWindowResultPool* p);
|
int32_t getNumOfUsedResultRows(SResultRowPool* p);
|
||||||
|
|
||||||
|
|
||||||
#endif // TDENGINE_QUERYUTIL_H
|
#endif // TDENGINE_QUERYUTIL_H
|
||||||
|
|
|
@ -452,7 +452,7 @@ static SResultRow *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindow
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);
|
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid);
|
||||||
int32_t *p1 = (int32_t *) taosHashGet(pRuntimeEnv->pWindowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
int32_t *p1 = (int32_t *) taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||||
if (p1 != NULL) {
|
if (p1 != NULL) {
|
||||||
pWindowResInfo->curIndex = *p1;
|
pWindowResInfo->curIndex = *p1;
|
||||||
} else {
|
} else {
|
||||||
|
@ -485,7 +485,7 @@ static SResultRow *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindow
|
||||||
pWindowResInfo->capacity = (int32_t)newCapacity;
|
pWindowResInfo->capacity = (int32_t)newCapacity;
|
||||||
}
|
}
|
||||||
// pRuntimeEnv->summary.winInfoSize += (pQuery->numOfOutput * sizeof(SResultRowCellInfo) + pRuntimeEnv->interBufSize) * inc;
|
// pRuntimeEnv->summary.winInfoSize += (pQuery->numOfOutput * sizeof(SResultRowCellInfo) + pRuntimeEnv->interBufSize) * inc;
|
||||||
SResultRow* pResult = getNewWindowResult(pRuntimeEnv->pool);
|
SResultRow* pResult = getNewResultRow(pRuntimeEnv->pool);
|
||||||
pWindowResInfo->pResult[pWindowResInfo->size] = pResult;
|
pWindowResInfo->pResult[pWindowResInfo->size] = pResult;
|
||||||
int32_t ret = createQueryResultInfo(pQuery, pResult);
|
int32_t ret = createQueryResultInfo(pQuery, pResult);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -494,7 +494,7 @@ static SResultRow *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWindow
|
||||||
|
|
||||||
// add a new result set for a new group
|
// add a new result set for a new group
|
||||||
pWindowResInfo->curIndex = pWindowResInfo->size++;
|
pWindowResInfo->curIndex = pWindowResInfo->size++;
|
||||||
taosHashPut(pRuntimeEnv->pWindowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), (char *)&pWindowResInfo->curIndex, sizeof(int32_t));
|
taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), (char *)&pWindowResInfo->curIndex, sizeof(int32_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
// too many time window in query
|
// too many time window in query
|
||||||
|
@ -1771,10 +1771,10 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
pRuntimeEnv->pTSBuf = tsBufDestroy(pRuntimeEnv->pTSBuf);
|
pRuntimeEnv->pTSBuf = tsBufDestroy(pRuntimeEnv->pTSBuf);
|
||||||
taosTFree(pRuntimeEnv->keyBuf);
|
taosTFree(pRuntimeEnv->keyBuf);
|
||||||
|
|
||||||
taosHashCleanup(pRuntimeEnv->pWindowHashTable);
|
taosHashCleanup(pRuntimeEnv->pResultRowHashTable);
|
||||||
pRuntimeEnv->pWindowHashTable = NULL;
|
pRuntimeEnv->pResultRowHashTable = NULL;
|
||||||
|
|
||||||
pRuntimeEnv->pool = destroyWindowResultPool(pRuntimeEnv->pool);
|
pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool);
|
||||||
}
|
}
|
||||||
|
|
||||||
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
|
#define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED)
|
||||||
|
@ -4246,16 +4246,16 @@ static void queryCostStatis(SQInfo *pQInfo) {
|
||||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||||
SQueryCostInfo *pSummary = &pRuntimeEnv->summary;
|
SQueryCostInfo *pSummary = &pRuntimeEnv->summary;
|
||||||
|
|
||||||
uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pWindowHashTable);
|
uint64_t hashSize = taosHashGetMemSize(pQInfo->runtimeEnv.pResultRowHashTable);
|
||||||
hashSize += taosHashGetMemSize(pQInfo->tableqinfoGroupInfo.map);
|
hashSize += taosHashGetMemSize(pQInfo->tableqinfoGroupInfo.map);
|
||||||
pSummary->hashSize = hashSize;
|
pSummary->hashSize = hashSize;
|
||||||
|
|
||||||
// add the merge time
|
// add the merge time
|
||||||
pSummary->elapsedTime += pSummary->firstStageMergeTime;
|
pSummary->elapsedTime += pSummary->firstStageMergeTime;
|
||||||
|
|
||||||
SWindowResultPool* p = pQInfo->runtimeEnv.pool;
|
SResultRowPool* p = pQInfo->runtimeEnv.pool;
|
||||||
pSummary->winInfoSize = getWindowResultPoolMemSize(p);
|
pSummary->winInfoSize = getResultRowPoolMemSize(p);
|
||||||
pSummary->numOfTimeWindows = getNumOfAllocatedWindowResult(p);
|
pSummary->numOfTimeWindows = getNumOfAllocatedResultRows(p);
|
||||||
|
|
||||||
qDebug("QInfo:%p :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, "
|
qDebug("QInfo:%p :cost summary: elapsed time:%"PRId64" us, first merge:%"PRId64" us, total blocks:%d, "
|
||||||
"load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64,
|
"load block statis:%d, load data block:%d, total rows:%"PRId64 ", check rows:%"PRId64,
|
||||||
|
@ -6322,9 +6322,9 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
||||||
pQInfo->runtimeEnv.interBufSize = getOutputInterResultBufSize(pQuery);
|
pQInfo->runtimeEnv.interBufSize = getOutputInterResultBufSize(pQuery);
|
||||||
pQInfo->runtimeEnv.summary.tableInfoSize += (pTableGroupInfo->numOfTables * sizeof(STableQueryInfo));
|
pQInfo->runtimeEnv.summary.tableInfoSize += (pTableGroupInfo->numOfTables * sizeof(STableQueryInfo));
|
||||||
|
|
||||||
pQInfo->runtimeEnv.pWindowHashTable = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
pQInfo->runtimeEnv.pResultRowHashTable = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
pQInfo->runtimeEnv.keyBuf = malloc(TSDB_MAX_BYTES_PER_ROW);
|
pQInfo->runtimeEnv.keyBuf = malloc(TSDB_MAX_BYTES_PER_ROW);
|
||||||
pQInfo->runtimeEnv.pool = initWindowResultPool(getWindowResultSize(&pQInfo->runtimeEnv));
|
pQInfo->runtimeEnv.pool = initResultRowPool(getWindowResultSize(&pQInfo->runtimeEnv));
|
||||||
|
|
||||||
pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
|
pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
|
||||||
if (pQInfo->pBuf == NULL) {
|
if (pQInfo->pBuf == NULL) {
|
||||||
|
|
|
@ -105,7 +105,7 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid);
|
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid);
|
||||||
taosHashRemove(pRuntimeEnv->pWindowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -138,14 +138,14 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid);
|
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid);
|
||||||
int32_t *p = (int32_t *)taosHashGet(pRuntimeEnv->pWindowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
int32_t *p = (int32_t *)taosHashGet(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||||
assert(p != NULL);
|
assert(p != NULL);
|
||||||
|
|
||||||
int32_t v = (*p - num);
|
int32_t v = (*p - num);
|
||||||
assert(v >= 0 && v <= pWindowResInfo->size);
|
assert(v >= 0 && v <= pWindowResInfo->size);
|
||||||
|
|
||||||
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid);
|
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, key, bytes, uid);
|
||||||
taosHashPut(pRuntimeEnv->pWindowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), (char *)&v, sizeof(int32_t));
|
taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), (char *)&v, sizeof(int32_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
pWindowResInfo->curIndex = -1;
|
pWindowResInfo->curIndex = -1;
|
||||||
|
@ -292,8 +292,8 @@ size_t getWindowResultSize(SQueryRuntimeEnv* pRuntimeEnv) {
|
||||||
return (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultRowCellInfo)) + pRuntimeEnv->interBufSize + sizeof(SResultRow);
|
return (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultRowCellInfo)) + pRuntimeEnv->interBufSize + sizeof(SResultRow);
|
||||||
}
|
}
|
||||||
|
|
||||||
SWindowResultPool* initWindowResultPool(size_t size) {
|
SResultRowPool* initResultRowPool(size_t size) {
|
||||||
SWindowResultPool* p = calloc(1, sizeof(SWindowResultPool));
|
SResultRowPool* p = calloc(1, sizeof(SResultRowPool));
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -309,7 +309,7 @@ SWindowResultPool* initWindowResultPool(size_t size) {
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
SResultRow* getNewWindowResult(SWindowResultPool* p) {
|
SResultRow* getNewResultRow(SResultRowPool* p) {
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -330,7 +330,7 @@ SResultRow* getNewWindowResult(SWindowResultPool* p) {
|
||||||
return ptr;
|
return ptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t getWindowResultPoolMemSize(SWindowResultPool* p) {
|
int64_t getResultRowPoolMemSize(SResultRowPool* p) {
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -338,15 +338,15 @@ int64_t getWindowResultPoolMemSize(SWindowResultPool* p) {
|
||||||
return taosArrayGetSize(p->pData) * p->blockSize;
|
return taosArrayGetSize(p->pData) * p->blockSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getNumOfAllocatedWindowResult(SWindowResultPool* p) {
|
int32_t getNumOfAllocatedResultRows(SResultRowPool* p) {
|
||||||
return taosArrayGetSize(p->pData) * p->numOfElemPerBlock;
|
return taosArrayGetSize(p->pData) * p->numOfElemPerBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t getNumOfUsedWindowResult(SWindowResultPool* p) {
|
int32_t getNumOfUsedResultRows(SResultRowPool* p) {
|
||||||
return getNumOfAllocatedWindowResult(p) - p->numOfElemPerBlock + p->position.pos;
|
return getNumOfAllocatedResultRows(p) - p->numOfElemPerBlock + p->position.pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* destroyWindowResultPool(SWindowResultPool* p) {
|
void* destroyResultRowPool(SResultRowPool* p) {
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue