diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 7c82aa659d..9348606d0c 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -105,7 +105,7 @@ typedef struct SResultRowInfo { int16_t type:8; // data type for hash key int32_t size:24; // number of result set int32_t capacity; // max capacity - SResultRow* current; // current active result row + int32_t curPos; // current active result row index of pResult list } SResultRowInfo; typedef struct SColumnFilterElem { @@ -423,7 +423,7 @@ typedef struct STagScanInfo { SColumnInfo* pCols; SSDataBlock* pRes; int32_t totalTables; - int32_t currentIndex; + int32_t curPos; } STagScanInfo; typedef struct SOptrBasicInfo { diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index ea0806e541..3d585afb87 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -411,8 +411,8 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, SQueryRuntim pResultRowInfo->capacity = (int32_t)newCapacity; } -static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, char *pData, - int16_t bytes, bool masterscan, uint64_t tableGroupId) { +static SResultRow* doSetResultOutBufByKey(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, int64_t tid, + char* pData, int16_t bytes, bool masterscan, uint64_t tableGroupId) { bool existed = false; SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tableGroupId); @@ -426,16 +426,21 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes } if (p1 != NULL) { - pResultRowInfo->current = (*p1); - if (pResultRowInfo->size == 0) { existed = false; + assert(pResultRowInfo->curPos == -1); } else if (pResultRowInfo->size == 1) { existed = (pResultRowInfo->pResult[0] == (*p1)); + pResultRowInfo->curPos = 0; } else { // check if current pResultRowInfo contains the existed pResultRow SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tid); - void* ptr = taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); - existed = (ptr != NULL); + int64_t* index = taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); + if (index != NULL) { + pResultRowInfo->curPos = (int32_t) *index; + existed = true; + } else { + existed = false; + } } } } else { @@ -462,12 +467,12 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes pResult = *p1; } + pResultRowInfo->curPos = pResultRowInfo->size; pResultRowInfo->pResult[pResultRowInfo->size++] = pResult; - pResultRowInfo->current = pResult; - int64_t dummyVal = 0; + int64_t index = pResultRowInfo->curPos; SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tid); - taosHashPut(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &dummyVal, POINTER_BYTES); + taosHashPut(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &index, POINTER_BYTES); } // too many time window in query @@ -475,7 +480,7 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); } - return pResultRowInfo->current; + return pResultRowInfo->pResult[pResultRowInfo->curPos]; } static void getInitialStartTimeWindow(SQueryAttr* pQueryAttr, TSKEY ts, STimeWindow* w) { @@ -506,13 +511,8 @@ static void getInitialStartTimeWindow(SQueryAttr* pQueryAttr, TSKEY ts, STimeWin static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SQueryAttr *pQueryAttr) { STimeWindow w = {0}; - if (pResultRowInfo->current == NULL) { // the first window, from the previous stored value -// if (pResultRowInfo->prevSKey == TSKEY_INITIAL_VAL) { - getInitialStartTimeWindow(pQueryAttr, ts, &w); -// pResultRowInfo->prevSKey = w.skey; -// } else { -// w.skey = pResultRowInfo->prevSKey; -// } + if (pResultRowInfo->curPos == -1) { // the first window, from the previous stored value + getInitialStartTimeWindow(pQueryAttr, ts, &w); if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') { w.ekey = taosTimeAdd(w.skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) - 1; @@ -520,7 +520,7 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t w.ekey = w.skey + pQueryAttr->interval.interval - 1; } } else { - w = pResultRowInfo->current->win; + w = getResultRow(pResultRowInfo, pResultRowInfo->curPos)->win; } if (w.skey > ts || w.ekey < ts) { @@ -600,13 +600,13 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf return 0; } -static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, STimeWindow *win, +static int32_t setResultOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, STimeWindow *win, bool masterscan, SResultRow **pResult, int64_t tableGroupId, SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) { assert(win->skey <= win->ekey); SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; - SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, tid, (char *)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId); + SResultRow *pResultRow = doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, (char *)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId); if (pResultRow == NULL) { *pResult = NULL; return TSDB_CODE_SUCCESS; @@ -707,9 +707,10 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey, if (skey == TSKEY_INITIAL_VAL) { if (pResultRowInfo->size == 0) { // assert(pResultRowInfo->current == NULL); - pResultRowInfo->current = NULL; + assert(pResultRowInfo->curPos == -1); + pResultRowInfo->curPos = -1; } else { - pResultRowInfo->current = pResultRowInfo->pResult[pResultRowInfo->size - 1]; + pResultRowInfo->curPos = pResultRowInfo->size - 1; } } else { @@ -721,9 +722,9 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey, } if (i == pResultRowInfo->size - 1) { - pResultRowInfo->current = pResultRowInfo->pResult[i]; + pResultRowInfo->curPos = i; } else { - pResultRowInfo->current = pResultRowInfo->pResult[i + 1]; // current not closed result object + pResultRowInfo->curPos = i + 1; // current not closed result object } } } @@ -732,7 +733,7 @@ static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, SQuer bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); if ((lastKey > pQueryAttr->window.ekey && ascQuery) || (lastKey < pQueryAttr->window.ekey && (!ascQuery))) { closeAllResultRows(pResultRowInfo); - pResultRowInfo->current = pResultRowInfo->pResult[pResultRowInfo->size - 1]; + pResultRowInfo->curPos = pResultRowInfo->size - 1; } else { int32_t step = ascQuery ? 1 : -1; doUpdateResultRowIndex(pResultRowInfo, lastKey - step, ascQuery, pQueryAttr->timeWindowInterpo); @@ -1241,7 +1242,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); - SResultRow* prevRow = pResultRowInfo->current; + int32_t prevIndex = pResultRowInfo->curPos; TSKEY* tsCols = NULL; if (pSDataBlock->pDataBlock != NULL) { @@ -1258,7 +1259,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); SResultRow* pResult = NULL; - int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx, + int32_t ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1270,25 +1271,17 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); // prev time window not interpolation yet. -// int32_t curIndex = curTimeWindowIndex(pResultRowInfo); -// if (prevIndex != -1 && prevIndex < curIndex && pQueryAttr->timeWindowInterpo) { -// for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already. - if (prevRow != NULL && prevRow != pResultRowInfo->current && pQueryAttr->timeWindowInterpo) { - int32_t j = 0; - while(pResultRowInfo->pResult[j] != prevRow) { - j++; - } - - SResultRow* current = pResultRowInfo->current; - for(; pResultRowInfo->pResult[j] != current && j < pResultRowInfo->size; ++j) { - SResultRow* pRes = pResultRowInfo->pResult[j]; + int32_t curIndex = pResultRowInfo->curPos; + if (prevIndex != -1 && prevIndex < curIndex && pQueryAttr->timeWindowInterpo) { + for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already. + SResultRow* pRes = getResultRow(pResultRowInfo, j); if (pRes->closed) { assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP)); continue; } STimeWindow w = pRes->win; - ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &w, masterScan, &pResult, + ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &w, masterScan, &pResult, tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); if (ret != TSDB_CODE_SUCCESS) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1306,7 +1299,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } // restore current time window - ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx, + ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); if (ret != TSDB_CODE_SUCCESS) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1326,7 +1319,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } // null data, failed to allocate more memory buffer - int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &nextWin, masterScan, &pResult, tableGroupId, + int32_t code = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &nextWin, masterScan, &pResult, tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -1467,7 +1460,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf SResultRow* pResult = NULL; pInfo->curWindow.ekey = pInfo->curWindow.skey; - int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan, + int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan, &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, pBInfo->rowCellInfoOffset); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code @@ -1488,7 +1481,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf SResultRow* pResult = NULL; pInfo->curWindow.ekey = pInfo->curWindow.skey; - int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan, + int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan, &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, pBInfo->rowCellInfoOffset); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code @@ -1532,7 +1525,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasic } int64_t tid = 0; - SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, tid, d, len, true, groupIndex); + SResultRow *pResultRow = doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, d, len, true, groupIndex); assert (pResultRow != NULL); setResultRowKey(pResultRow, pData, type); @@ -2779,7 +2772,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey; STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr); - if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.tid, &win, masterScan, &pResult, groupId, + if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.tid, &win, masterScan, &pResult, groupId, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -2825,7 +2818,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey; STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr); - if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.tid, &win, masterScan, &pResult, groupId, + if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.tid, &win, masterScan, &pResult, groupId, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput, pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -3181,11 +3174,11 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo *pTableQueryInfo) pTableQueryInfo->cur.vgroupIndex = -1; // set the index to be the end slot of result rows array - SResultRowInfo* pResRowInfo = &pTableQueryInfo->resInfo; - if (pResRowInfo->size > 0) { - pResRowInfo->current = pResRowInfo->pResult[pResRowInfo->size - 1]; + SResultRowInfo* pResultRowInfo = &pTableQueryInfo->resInfo; + if (pResultRowInfo->size > 0) { + pResultRowInfo->curPos = pResultRowInfo->size - 1; } else { - pResRowInfo->current = NULL; + pResultRowInfo->curPos = -1; } } @@ -3240,7 +3233,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo; int64_t tid = 0; - SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, uid); + SResultRow* pRow = doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, uid); for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i); @@ -3477,7 +3470,7 @@ void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pRe int64_t tid = 0; SResultRow* pResultRow = - doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, tid, (char*)&tableGroupId, sizeof(tableGroupId), true, uid); + doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, (char*)&tableGroupId, sizeof(tableGroupId), true, uid); assert (pResultRow != NULL); /* @@ -3680,14 +3673,10 @@ void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key) { STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current; SResultRowInfo *pResultRowInfo = &pTableQueryInfo->resInfo; - if (pResultRowInfo->current != NULL) { + if (pResultRowInfo->curPos != -1) { return; } -// if (pWindowResInfo->prevSKey != TSKEY_INITIAL_VAL) { -// return; -// } - pTableQueryInfo->win.skey = key; STimeWindow win = {.skey = key, .ekey = pQueryAttr->window.ekey}; @@ -4609,8 +4598,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { } if (pResultRowInfo->size > 0) { - pResultRowInfo->current = pResultRowInfo->pResult[0]; -// pResultRowInfo->prevSKey = pResultRowInfo->pResult[0]->win.skey; + pResultRowInfo->curPos = 0; } qDebug("QInfo:0x%"PRIx64" start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, @@ -4635,8 +4623,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) { pTableScanInfo->order = cond.order; if (pResultRowInfo->size > 0) { - pResultRowInfo->current = pResultRowInfo->pResult[pResultRowInfo->size - 1]; -// pResultRowInfo->prevSKey = pResultRowInfo->current->win.skey; + pResultRowInfo->curPos = pResultRowInfo->size - 1; } p = doTableScanImpl(pOperator, newgroup); @@ -5496,7 +5483,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI } else { SResultRow* pResult = NULL; pInfo->curWindow.ekey = pInfo->curWindow.skey; - int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan, + int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan, &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, pBInfo->rowCellInfoOffset); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code @@ -5513,10 +5500,11 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI } } + SResultRow* pResult = NULL; pInfo->curWindow.ekey = pInfo->curWindow.skey; - int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan, + int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan, &pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput, pBInfo->rowCellInfoOffset); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code @@ -6294,8 +6282,8 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0); - while(pInfo->currentIndex < pInfo->totalTables && count < maxNumOfTables) { - int32_t i = pInfo->currentIndex++; + while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) { + int32_t i = pInfo->curPos++; STableQueryInfo *item = taosArrayGetP(pa, i); char *output = pColInfo->pData + count * rsize; @@ -6339,8 +6327,8 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { SExprInfo* pExprInfo = pOperator->pExpr; // todo use the column list instead of exprinfo count = 0; - while(pInfo->currentIndex < pInfo->totalTables && count < maxNumOfTables) { - int32_t i = pInfo->currentIndex++; + while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) { + int32_t i = pInfo->curPos++; STableQueryInfo* item = taosArrayGetP(pa, i); @@ -6369,7 +6357,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) { count += 1; } - if (pInfo->currentIndex >= pInfo->totalTables) { + if (pInfo->curPos >= pInfo->totalTables) { pOperator->status = OP_EXEC_DONE; } @@ -6388,7 +6376,7 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf assert(numOfGroup == 0 || numOfGroup == 1); pInfo->totalTables = pRuntimeEnv->tableqinfoGroupInfo.numOfTables; - pInfo->currentIndex = 0; + pInfo->curPos = 0; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "SeqTableTagScan"; diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index c14f46523d..04a7079128 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -44,8 +44,7 @@ int32_t getOutputInterResultBufSize(SQueryAttr* pQueryAttr) { int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t type) { pResultRowInfo->type = type; pResultRowInfo->size = 0; -// pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL; - pResultRowInfo->current = NULL; + pResultRowInfo->curPos = -1; pResultRowInfo->capacity = size; pResultRowInfo->pResult = calloc(pResultRowInfo->capacity, POINTER_BYTES); @@ -92,8 +91,7 @@ void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRo } pResultRowInfo->size = 0; - pResultRowInfo->current = NULL; -// pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL; + pResultRowInfo->curPos = -1; } int32_t numOfClosedResultRows(SResultRowInfo *pResultRowInfo) {