<fix>[query]: refactor.
This commit is contained in:
parent
94e19cc32a
commit
2523445452
|
@ -197,7 +197,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
|
|||
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
|
||||
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
|
||||
|
||||
int32_t colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows);
|
||||
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows);
|
||||
void blockDataCleanup(SSDataBlock* pDataBlock);
|
||||
|
||||
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
|
||||
|
|
|
@ -1214,7 +1214,7 @@ int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows)
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
|
||||
void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows) {
|
||||
if (IS_VAR_DATA_TYPE(pColumn->info.type)) {
|
||||
pColumn->varmeta.length = 0;
|
||||
} else {
|
||||
|
|
|
@ -406,85 +406,6 @@ static bool chkResultRowFromKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pR
|
|||
return p1 != NULL;
|
||||
}
|
||||
|
||||
#if 0
|
||||
static SResultRow* doSetResultOutBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, int64_t tid,
|
||||
char* pData, int16_t bytes, bool masterscan, uint64_t tableGroupId) {
|
||||
bool existed = false;
|
||||
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tableGroupId);
|
||||
|
||||
SResultRow** p1 =
|
||||
(SResultRow**)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
|
||||
// in case of repeat scan/reverse scan, no new time window added.
|
||||
if (QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQueryAttr)) {
|
||||
if (!masterscan) { // the *p1 may be NULL in case of sliding+offset exists.
|
||||
return (p1 != NULL) ? *p1 : NULL;
|
||||
}
|
||||
|
||||
if (p1 != NULL) {
|
||||
if (pResultRowInfo->size == 0) {
|
||||
existed = false;
|
||||
// assert(pResultRowInfo->curPos == -1);
|
||||
} else if (pResultRowInfo->size == 1) {
|
||||
// existed = (pResultRowInfo->pResult[0] == (*p1));
|
||||
// pResultRowInfo->curPos = 0;
|
||||
} else { // check if current pResultRowInfo contains the existed pResultRow
|
||||
SET_RES_EXT_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tid, pResultRowInfo);
|
||||
int64_t* index =
|
||||
taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));
|
||||
if (index != NULL) {
|
||||
// pResultRowInfo->curPos = (int32_t)*index;
|
||||
existed = true;
|
||||
} else {
|
||||
existed = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// In case of group by column query, the required SResultRow object must be existed in the pResultRowInfo object.
|
||||
if (p1 != NULL) {
|
||||
return *p1;
|
||||
}
|
||||
}
|
||||
|
||||
if (!existed) {
|
||||
// prepareResultListBuffer(pResultRowInfo, pRuntimeEnv);
|
||||
|
||||
SResultRow* pResult = NULL;
|
||||
if (p1 == NULL) {
|
||||
pResult = getNewResultRow(pRuntimeEnv->pool);
|
||||
int32_t ret = initResultRow(pResult);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
// add a new result set for a new group
|
||||
taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pResult,
|
||||
POINTER_BYTES);
|
||||
SResultRowCell cell = {.groupId = tableGroupId, .pRow = pResult};
|
||||
taosArrayPush(pRuntimeEnv->pResultRowArrayList, &cell);
|
||||
} else {
|
||||
pResult = *p1;
|
||||
}
|
||||
|
||||
pResultRowInfo->curPos = pResultRowInfo->size;
|
||||
pResultRowInfo->pResult[pResultRowInfo->size++] = pResult;
|
||||
|
||||
int64_t index = pResultRowInfo->curPos;
|
||||
SET_RES_EXT_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tid, pResultRowInfo);
|
||||
taosHashPut(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes), &index,
|
||||
POINTER_BYTES);
|
||||
}
|
||||
|
||||
// too many time window in query
|
||||
if (pResultRowInfo->size > MAX_INTERVAL_TIME_WINDOW) {
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
|
||||
}
|
||||
|
||||
return pResultRowInfo->pResult[pResultRowInfo->curPos];
|
||||
}
|
||||
#endif
|
||||
|
||||
SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) {
|
||||
SFilePage* pData = NULL;
|
||||
|
||||
|
@ -4009,64 +3930,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
|
|||
|
||||
static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo);
|
||||
|
||||
static int32_t setupQueryHandle(void* tsdb, STaskRuntimeEnv* pRuntimeEnv, int64_t qId, bool isSTableQuery) {
|
||||
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
#if 0
|
||||
// TODO set the tags scan handle
|
||||
if (onlyQueryTags(pQueryAttr)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STsdbQueryCond cond = createTsdbQueryCond(pQueryAttr, &pQueryAttr->window);
|
||||
if (pQueryAttr->tsCompQuery || pQueryAttr->pointInterpQuery) {
|
||||
cond.type = BLOCK_LOAD_TABLE_SEQ_ORDER;
|
||||
}
|
||||
|
||||
if (!isSTableQuery
|
||||
&& (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1)
|
||||
&& (cond.order == TSDB_ORDER_ASC)
|
||||
&& (!QUERY_IS_INTERVAL_QUERY(pQueryAttr))
|
||||
&& (!pQueryAttr->groupbyColumn)
|
||||
&& (!pQueryAttr->simpleAgg)
|
||||
) {
|
||||
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
|
||||
STableQueryInfo* pCheckInfo = taosArrayGetP(pa, 0);
|
||||
cond.twindow = pCheckInfo->win;
|
||||
}
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
if (isFirstLastRowQuery(pQueryAttr)) {
|
||||
pRuntimeEnv->pTsdbReadHandle = tsdbQueryLastRow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef);
|
||||
|
||||
// update the query time window
|
||||
pQueryAttr->window = cond.twindow;
|
||||
if (pQueryAttr->tableGroupInfo.numOfTables == 0) {
|
||||
pRuntimeEnv->tableqinfoGroupInfo.numOfTables = 0;
|
||||
} else {
|
||||
size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
|
||||
for(int32_t i = 0; i < numOfGroups; ++i) {
|
||||
SArray *group = GET_TABLEGROUP(pRuntimeEnv, i);
|
||||
|
||||
size_t t = taosArrayGetSize(group);
|
||||
for (int32_t j = 0; j < t; ++j) {
|
||||
STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
|
||||
|
||||
pCheckInfo->win = pQueryAttr->window;
|
||||
pCheckInfo->lastKey = pCheckInfo->win.skey;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (isCachedLastQuery(pQueryAttr)) {
|
||||
pRuntimeEnv->pTsdbReadHandle = tsdbQueryCacheLast(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef);
|
||||
} else if (pQueryAttr->pointInterpQuery) {
|
||||
pRuntimeEnv->pTsdbReadHandle = tsdbQueryRowsInExternalWindow(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef);
|
||||
} else {
|
||||
pRuntimeEnv->pTsdbReadHandle = tsdbQueryTables(tsdb, &cond, &pQueryAttr->tableGroupInfo, qId, &pQueryAttr->memRef);
|
||||
}
|
||||
#endif
|
||||
return terrno;
|
||||
}
|
||||
|
||||
static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) {
|
||||
#if 0
|
||||
if (order == TSDB_ORDER_ASC) {
|
||||
|
|
Loading…
Reference in New Issue