Merge pull request #11839 from taosdata/feature/3.0_liaohj
refactor(query): do some internal refactor.
This commit is contained in:
commit
0468b12d56
|
@ -399,8 +399,8 @@ typedef struct SOptrBasicInfo {
|
||||||
// TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset
|
// TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset
|
||||||
typedef struct SAggSupporter {
|
typedef struct SAggSupporter {
|
||||||
SHashObj* pResultRowHashTable; // quick locate the window object for each result
|
SHashObj* pResultRowHashTable; // quick locate the window object for each result
|
||||||
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
|
// SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
|
||||||
SArray* pResultRowArrayList; // The array list that contains the Result rows
|
// SArray* pResultRowArrayList; // The array list that contains the Result rows
|
||||||
char* keyBuf; // window key buffer
|
char* keyBuf; // window key buffer
|
||||||
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
||||||
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
|
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
|
||||||
|
|
|
@ -324,11 +324,16 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, jmp_buf env)
|
||||||
newCapacity = (int64_t)(pResultRowInfo->capacity * 1.5);
|
newCapacity = (int64_t)(pResultRowInfo->capacity * 1.5);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (newCapacity == pResultRowInfo->capacity) {
|
if (newCapacity <= pResultRowInfo->capacity) {
|
||||||
newCapacity += 4;
|
newCapacity += 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
pResultRowInfo->pPosition = taosMemoryRealloc(pResultRowInfo->pPosition, newCapacity * sizeof(SResultRowPosition));
|
char* p = taosMemoryRealloc(pResultRowInfo->pPosition, newCapacity * sizeof(SResultRowPosition));
|
||||||
|
if (p == NULL) {
|
||||||
|
longjmp(env, TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
pResultRowInfo->pPosition = (SResultRowPosition*)p;
|
||||||
|
|
||||||
int32_t inc = (int32_t)newCapacity - pResultRowInfo->capacity;
|
int32_t inc = (int32_t)newCapacity - pResultRowInfo->capacity;
|
||||||
memset(&pResultRowInfo->pPosition[pResultRowInfo->capacity], 0, sizeof(SResultRowPosition) * inc);
|
memset(&pResultRowInfo->pPosition[pResultRowInfo->capacity], 0, sizeof(SResultRowPosition) * inc);
|
||||||
|
@ -419,88 +424,56 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId,
|
||||||
* | 8 bytes | actual length |
|
* | 8 bytes | actual length |
|
||||||
* +----------+---------------+
|
* +----------+---------------+
|
||||||
*/
|
*/
|
||||||
static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, int64_t uid,
|
static SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, int64_t uid,
|
||||||
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
|
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
|
||||||
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup) {
|
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup) {
|
||||||
bool existInCurrentResusltRowInfo = false;
|
|
||||||
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
|
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId);
|
||||||
|
|
||||||
SResultRowPosition* p1 =
|
SResultRowPosition* p1 =
|
||||||
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
(SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||||
|
|
||||||
|
SResultRow* pResult = NULL;
|
||||||
|
|
||||||
// in case of repeat scan/reverse scan, no new time window added.
|
// in case of repeat scan/reverse scan, no new time window added.
|
||||||
if (isIntervalQuery) {
|
if (isIntervalQuery) {
|
||||||
if (!masterscan) { // the *p1 may be NULL in case of sliding+offset exists.
|
if (masterscan && p1 != NULL) { // the *p1 may be NULL in case of sliding+offset exists.
|
||||||
if (p1 != NULL) {
|
pResult = getResultRowByPos(pResultBuf, p1);
|
||||||
return getResultRowByPos(pResultBuf, p1);
|
|
||||||
} else {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (p1 != NULL) {
|
|
||||||
if (pResultRowInfo->size == 0) {
|
|
||||||
existInCurrentResusltRowInfo =
|
|
||||||
false; // this time window created by other timestamp that does not belongs to current table.
|
|
||||||
} else if (pResultRowInfo->size == 1) {
|
|
||||||
SResultRowPosition* p = &pResultRowInfo->pPosition[0];
|
|
||||||
existInCurrentResusltRowInfo = (p->pageId == p1->pageId && p->offset == p1->offset);
|
|
||||||
} else { // check if current pResultRowInfo contains the existInCurrentResusltRowInfo pResultRow
|
|
||||||
SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, uid, pResultRowInfo);
|
|
||||||
int64_t* index = taosHashGet(pSup->pResultRowListSet, pSup->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));
|
|
||||||
if (index != NULL) {
|
|
||||||
// TODO check the scan order for current opened time window
|
|
||||||
existInCurrentResusltRowInfo = true;
|
|
||||||
} else {
|
|
||||||
existInCurrentResusltRowInfo = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
|
// In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
|
||||||
// pResultRowInfo object.
|
// pResultRowInfo object.
|
||||||
if (p1 != NULL) {
|
if (p1 != NULL) {
|
||||||
return getResultRowByPos(pResultBuf, p1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SResultRow* pResult = NULL;
|
|
||||||
if (!existInCurrentResusltRowInfo) {
|
|
||||||
// 1. close current opened time window
|
|
||||||
if (pResultRowInfo->cur.pageId != -1) { // todo extract function
|
|
||||||
SResultRowPosition pos = pResultRowInfo->cur;
|
|
||||||
SFilePage* pPage = getBufPage(pResultBuf, pos.pageId);
|
|
||||||
SResultRow* pRow = (SResultRow*)((char*)pPage + pos.offset);
|
|
||||||
closeResultRow(pRow);
|
|
||||||
releaseBufPage(pResultBuf, pPage);
|
|
||||||
}
|
|
||||||
|
|
||||||
prepareResultListBuffer(pResultRowInfo, pTaskInfo->env);
|
|
||||||
if (p1 == NULL) {
|
|
||||||
pResult = getNewResultRow_rv(pResultBuf, groupId, pSup->resultRowSize);
|
|
||||||
initResultRow(pResult);
|
|
||||||
|
|
||||||
// add a new result set for a new group
|
|
||||||
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
|
|
||||||
taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
|
|
||||||
sizeof(SResultRowPosition));
|
|
||||||
SResultRowCell cell = {.groupId = groupId, .pos = pos};
|
|
||||||
taosArrayPush(pSup->pResultRowArrayList, &cell);
|
|
||||||
} else {
|
|
||||||
pResult = getResultRowByPos(pResultBuf, p1);
|
pResult = getResultRowByPos(pResultBuf, p1);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. set the new time window to be the new active time window
|
|
||||||
pResultRowInfo->pPosition[pResultRowInfo->size++] =
|
|
||||||
(SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
|
|
||||||
pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
|
|
||||||
SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, uid, pResultRowInfo);
|
|
||||||
taosHashPut(pSup->pResultRowListSet, pSup->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes), &pResultRowInfo->cur,
|
|
||||||
POINTER_BYTES);
|
|
||||||
} else {
|
|
||||||
pResult = getResultRowByPos(pResultBuf, p1);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 1. close current opened time window
|
||||||
|
if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId &&
|
||||||
|
pResult->offset != pResultRowInfo->cur.offset))) {
|
||||||
|
// todo extract function
|
||||||
|
SResultRowPosition pos = pResultRowInfo->cur;
|
||||||
|
SFilePage* pPage = getBufPage(pResultBuf, pos.pageId);
|
||||||
|
SResultRow* pRow = (SResultRow*)((char*)pPage + pos.offset);
|
||||||
|
closeResultRow(pRow);
|
||||||
|
releaseBufPage(pResultBuf, pPage);
|
||||||
|
}
|
||||||
|
|
||||||
|
// allocate a new buffer page
|
||||||
|
prepareResultListBuffer(pResultRowInfo, pTaskInfo->env);
|
||||||
|
if (pResult == NULL) {
|
||||||
|
pResult = getNewResultRow_rv(pResultBuf, groupId, pSup->resultRowSize);
|
||||||
|
initResultRow(pResult);
|
||||||
|
|
||||||
|
// add a new result set for a new group
|
||||||
|
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
|
||||||
|
taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, sizeof(SResultRowPosition));
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. set the new time window to be the new active time window
|
||||||
|
pResultRowInfo->pPosition[pResultRowInfo->size++] =
|
||||||
|
(SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
|
||||||
|
pResultRowInfo->cur = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
|
||||||
|
|
||||||
// too many time window in query
|
// too many time window in query
|
||||||
if (pResultRowInfo->size > MAX_INTERVAL_TIME_WINDOW) {
|
if (pResultRowInfo->size > MAX_INTERVAL_TIME_WINDOW) {
|
||||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
|
||||||
|
@ -656,7 +629,7 @@ static int32_t setResultOutputBufByKey_rv(SResultRowInfo* pResultRowInfo, int64_
|
||||||
int32_t numOfOutput, int32_t* rowCellInfoOffset, SAggSupporter* pAggSup,
|
int32_t numOfOutput, int32_t* rowCellInfoOffset, SAggSupporter* pAggSup,
|
||||||
SExecTaskInfo* pTaskInfo) {
|
SExecTaskInfo* pTaskInfo) {
|
||||||
assert(win->skey <= win->ekey);
|
assert(win->skey <= win->ekey);
|
||||||
SResultRow* pResultRow = doSetResultOutBufByKey_rv(pAggSup->pResultBuf, pResultRowInfo, id, (char*)&win->skey,
|
SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, id, (char*)&win->skey,
|
||||||
TSDB_KEYSIZE, masterscan, tableGroupId, pTaskInfo, true, pAggSup);
|
TSDB_KEYSIZE, masterscan, tableGroupId, pTaskInfo, true, pAggSup);
|
||||||
|
|
||||||
if (pResultRow == NULL) {
|
if (pResultRow == NULL) {
|
||||||
|
@ -1708,7 +1681,7 @@ int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char*
|
||||||
SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo;
|
SResultRowInfo* pResultRowInfo = &binfo->resultRowInfo;
|
||||||
SqlFunctionCtx* pCtx = binfo->pCtx;
|
SqlFunctionCtx* pCtx = binfo->pCtx;
|
||||||
|
|
||||||
SResultRow* pResultRow = doSetResultOutBufByKey_rv(pBuf, pResultRowInfo, groupId, (char*)pData, bytes, true, groupId,
|
SResultRow* pResultRow = doSetResultOutBufByKey(pBuf, pResultRowInfo, groupId, (char*)pData, bytes, true, groupId,
|
||||||
pTaskInfo, false, pAggSup);
|
pTaskInfo, false, pAggSup);
|
||||||
assert(pResultRow != NULL);
|
assert(pResultRow != NULL);
|
||||||
|
|
||||||
|
@ -2714,7 +2687,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
|
||||||
|
|
||||||
int64_t tid = 0;
|
int64_t tid = 0;
|
||||||
int64_t groupId = 0;
|
int64_t groupId = 0;
|
||||||
SResultRow* pRow = doSetResultOutBufByKey_rv(pSup->pResultBuf, pResultRowInfo, tid, (char*)&tid, sizeof(tid), true,
|
SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, tid, (char*)&tid, sizeof(tid), true,
|
||||||
groupId, pTaskInfo, false, pSup);
|
groupId, pTaskInfo, false, pSup);
|
||||||
|
|
||||||
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
|
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
|
||||||
|
@ -3002,7 +2975,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, u
|
||||||
int32_t* rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset;
|
int32_t* rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset;
|
||||||
|
|
||||||
SResultRow* pResultRow =
|
SResultRow* pResultRow =
|
||||||
doSetResultOutBufByKey_rv(pAggInfo->aggSup.pResultBuf, pResultRowInfo, uid, (char*)&groupId, sizeof(groupId),
|
doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, uid, (char*)&groupId, sizeof(groupId),
|
||||||
true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
|
true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
|
||||||
assert(pResultRow != NULL);
|
assert(pResultRow != NULL);
|
||||||
|
|
||||||
|
@ -4899,8 +4872,8 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
|
||||||
initResultRow(resultRow);
|
initResultRow(resultRow);
|
||||||
prepareResultListBuffer(&pInfo->resultRowInfo, pOperator->pTaskInfo->env);
|
prepareResultListBuffer(&pInfo->resultRowInfo, pOperator->pTaskInfo->env);
|
||||||
// pInfo->resultRowInfo.cur = pInfo->resultRowInfo.size;
|
// pInfo->resultRowInfo.cur = pInfo->resultRowInfo.size;
|
||||||
pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] =
|
// pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] =
|
||||||
(SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
|
// (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
|
||||||
pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
|
pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5613,10 +5586,10 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
|
||||||
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
|
pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput);
|
||||||
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
|
pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t));
|
||||||
pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
|
pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
|
||||||
pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
|
// pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
|
||||||
pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell));
|
// pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell));
|
||||||
|
|
||||||
if (pAggSup->keyBuf == NULL || pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL ||
|
if (pAggSup->keyBuf == NULL /*|| pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL*/ ||
|
||||||
pAggSup->pResultRowHashTable == NULL) {
|
pAggSup->pResultRowHashTable == NULL) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
@ -5632,8 +5605,8 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n
|
||||||
static void cleanupAggSup(SAggSupporter* pAggSup) {
|
static void cleanupAggSup(SAggSupporter* pAggSup) {
|
||||||
taosMemoryFreeClear(pAggSup->keyBuf);
|
taosMemoryFreeClear(pAggSup->keyBuf);
|
||||||
taosHashCleanup(pAggSup->pResultRowHashTable);
|
taosHashCleanup(pAggSup->pResultRowHashTable);
|
||||||
taosHashCleanup(pAggSup->pResultRowListSet);
|
// taosHashCleanup(pAggSup->pResultRowListSet);
|
||||||
taosArrayDestroy(pAggSup->pResultRowArrayList);
|
// taosArrayDestroy(pAggSup->pResultRowArrayList);
|
||||||
destroyDiskbasedBuf(pAggSup->pResultBuf);
|
destroyDiskbasedBuf(pAggSup->pResultBuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6356,7 +6329,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
||||||
if (pDataReader == NULL) {
|
if (pDataReader == NULL && terrno != 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue