|
|
|
@ -411,8 +411,8 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, SQueryRuntim
|
|
|
|
|
pResultRowInfo->capacity = (int32_t)newCapacity;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, char *pData,
|
|
|
|
|
int16_t bytes, bool masterscan, uint64_t tableGroupId) {
|
|
|
|
|
static SResultRow* doSetResultOutBufByKey(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, int64_t tid,
|
|
|
|
|
char* pData, int16_t bytes, bool masterscan, uint64_t tableGroupId) {
|
|
|
|
|
bool existed = false;
|
|
|
|
|
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tableGroupId);
|
|
|
|
|
|
|
|
|
@ -426,16 +426,21 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (p1 != NULL) {
|
|
|
|
|
pResultRowInfo->current = (*p1);
|
|
|
|
|
|
|
|
|
|
if (pResultRowInfo->size == 0) {
|
|
|
|
|
existed = false;
|
|
|
|
|
assert(pResultRowInfo->curPos == -1);
|
|
|
|
|
} else if (pResultRowInfo->size == 1) {
|
|
|
|
|
existed = (pResultRowInfo->pResult[0] == (*p1));
|
|
|
|
|
pResultRowInfo->curPos = 0;
|
|
|
|
|
} else { // check if current pResultRowInfo contains the existed pResultRow
|
|
|
|
|
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tid);
|
|
|
|
|
void* ptr = taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
|
|
|
|
existed = (ptr != NULL);
|
|
|
|
|
int64_t* index = taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
|
|
|
|
if (index != NULL) {
|
|
|
|
|
pResultRowInfo->curPos = (int32_t) *index;
|
|
|
|
|
existed = true;
|
|
|
|
|
} else {
|
|
|
|
|
existed = false;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
@ -462,12 +467,12 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
|
|
|
|
|
pResult = *p1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pResultRowInfo->curPos = pResultRowInfo->size;
|
|
|
|
|
pResultRowInfo->pResult[pResultRowInfo->size++] = pResult;
|
|
|
|
|
pResultRowInfo->current = pResult;
|
|
|
|
|
|
|
|
|
|
int64_t dummyVal = 0;
|
|
|
|
|
int64_t index = pResultRowInfo->curPos;
|
|
|
|
|
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tid);
|
|
|
|
|
taosHashPut(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &dummyVal, POINTER_BYTES);
|
|
|
|
|
taosHashPut(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &index, POINTER_BYTES);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// too many time window in query
|
|
|
|
@ -475,7 +480,7 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
|
|
|
|
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return pResultRowInfo->current;
|
|
|
|
|
return pResultRowInfo->pResult[pResultRowInfo->curPos];
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static void getInitialStartTimeWindow(SQueryAttr* pQueryAttr, TSKEY ts, STimeWindow* w) {
|
|
|
|
@ -506,13 +511,8 @@ static void getInitialStartTimeWindow(SQueryAttr* pQueryAttr, TSKEY ts, STimeWin
|
|
|
|
|
static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SQueryAttr *pQueryAttr) {
|
|
|
|
|
STimeWindow w = {0};
|
|
|
|
|
|
|
|
|
|
if (pResultRowInfo->current == NULL) { // the first window, from the previous stored value
|
|
|
|
|
// if (pResultRowInfo->prevSKey == TSKEY_INITIAL_VAL) {
|
|
|
|
|
getInitialStartTimeWindow(pQueryAttr, ts, &w);
|
|
|
|
|
// pResultRowInfo->prevSKey = w.skey;
|
|
|
|
|
// } else {
|
|
|
|
|
// w.skey = pResultRowInfo->prevSKey;
|
|
|
|
|
// }
|
|
|
|
|
if (pResultRowInfo->curPos == -1) { // the first window, from the previous stored value
|
|
|
|
|
getInitialStartTimeWindow(pQueryAttr, ts, &w);
|
|
|
|
|
|
|
|
|
|
if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') {
|
|
|
|
|
w.ekey = taosTimeAdd(w.skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) - 1;
|
|
|
|
@ -520,7 +520,7 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t
|
|
|
|
|
w.ekey = w.skey + pQueryAttr->interval.interval - 1;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
w = pResultRowInfo->current->win;
|
|
|
|
|
w = getResultRow(pResultRowInfo, pResultRowInfo->curPos)->win;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (w.skey > ts || w.ekey < ts) {
|
|
|
|
@ -600,13 +600,13 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf
|
|
|
|
|
return 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, STimeWindow *win,
|
|
|
|
|
static int32_t setResultOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, STimeWindow *win,
|
|
|
|
|
bool masterscan, SResultRow **pResult, int64_t tableGroupId, SQLFunctionCtx* pCtx,
|
|
|
|
|
int32_t numOfOutput, int32_t* rowCellInfoOffset) {
|
|
|
|
|
assert(win->skey <= win->ekey);
|
|
|
|
|
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
|
|
|
|
|
|
|
|
|
|
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, tid, (char *)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId);
|
|
|
|
|
SResultRow *pResultRow = doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, (char *)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId);
|
|
|
|
|
if (pResultRow == NULL) {
|
|
|
|
|
*pResult = NULL;
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
@ -707,9 +707,10 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
|
|
|
|
|
if (skey == TSKEY_INITIAL_VAL) {
|
|
|
|
|
if (pResultRowInfo->size == 0) {
|
|
|
|
|
// assert(pResultRowInfo->current == NULL);
|
|
|
|
|
pResultRowInfo->current = NULL;
|
|
|
|
|
assert(pResultRowInfo->curPos == -1);
|
|
|
|
|
pResultRowInfo->curPos = -1;
|
|
|
|
|
} else {
|
|
|
|
|
pResultRowInfo->current = pResultRowInfo->pResult[pResultRowInfo->size - 1];
|
|
|
|
|
pResultRowInfo->curPos = pResultRowInfo->size - 1;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
|
|
|
|
@ -721,9 +722,9 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (i == pResultRowInfo->size - 1) {
|
|
|
|
|
pResultRowInfo->current = pResultRowInfo->pResult[i];
|
|
|
|
|
pResultRowInfo->curPos = i;
|
|
|
|
|
} else {
|
|
|
|
|
pResultRowInfo->current = pResultRowInfo->pResult[i + 1]; // current not closed result object
|
|
|
|
|
pResultRowInfo->curPos = i + 1; // current not closed result object
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -732,7 +733,7 @@ static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, SQuer
|
|
|
|
|
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
|
|
|
|
|
if ((lastKey > pQueryAttr->window.ekey && ascQuery) || (lastKey < pQueryAttr->window.ekey && (!ascQuery))) {
|
|
|
|
|
closeAllResultRows(pResultRowInfo);
|
|
|
|
|
pResultRowInfo->current = pResultRowInfo->pResult[pResultRowInfo->size - 1];
|
|
|
|
|
pResultRowInfo->curPos = pResultRowInfo->size - 1;
|
|
|
|
|
} else {
|
|
|
|
|
int32_t step = ascQuery ? 1 : -1;
|
|
|
|
|
doUpdateResultRowIndex(pResultRowInfo, lastKey - step, ascQuery, pQueryAttr->timeWindowInterpo);
|
|
|
|
@ -1241,7 +1242,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|
|
|
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
|
|
|
|
|
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
|
|
|
|
|
|
|
|
|
|
SResultRow* prevRow = pResultRowInfo->current;
|
|
|
|
|
int32_t prevIndex = pResultRowInfo->curPos;
|
|
|
|
|
|
|
|
|
|
TSKEY* tsCols = NULL;
|
|
|
|
|
if (pSDataBlock->pDataBlock != NULL) {
|
|
|
|
@ -1258,7 +1259,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|
|
|
|
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
|
|
|
|
|
|
|
|
|
|
SResultRow* pResult = NULL;
|
|
|
|
|
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx,
|
|
|
|
|
int32_t ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx,
|
|
|
|
|
numOfOutput, pInfo->rowCellInfoOffset);
|
|
|
|
|
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
|
|
|
|
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
|
|
@ -1270,25 +1271,17 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|
|
|
|
getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
|
|
|
|
|
|
|
|
|
|
// prev time window not interpolation yet.
|
|
|
|
|
// int32_t curIndex = curTimeWindowIndex(pResultRowInfo);
|
|
|
|
|
// if (prevIndex != -1 && prevIndex < curIndex && pQueryAttr->timeWindowInterpo) {
|
|
|
|
|
// for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already.
|
|
|
|
|
if (prevRow != NULL && prevRow != pResultRowInfo->current && pQueryAttr->timeWindowInterpo) {
|
|
|
|
|
int32_t j = 0;
|
|
|
|
|
while(pResultRowInfo->pResult[j] != prevRow) {
|
|
|
|
|
j++;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SResultRow* current = pResultRowInfo->current;
|
|
|
|
|
for(; pResultRowInfo->pResult[j] != current && j < pResultRowInfo->size; ++j) {
|
|
|
|
|
SResultRow* pRes = pResultRowInfo->pResult[j];
|
|
|
|
|
int32_t curIndex = pResultRowInfo->curPos;
|
|
|
|
|
if (prevIndex != -1 && prevIndex < curIndex && pQueryAttr->timeWindowInterpo) {
|
|
|
|
|
for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already.
|
|
|
|
|
SResultRow* pRes = getResultRow(pResultRowInfo, j);
|
|
|
|
|
if (pRes->closed) {
|
|
|
|
|
assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP));
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
STimeWindow w = pRes->win;
|
|
|
|
|
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &w, masterScan, &pResult,
|
|
|
|
|
ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &w, masterScan, &pResult,
|
|
|
|
|
tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
|
|
|
|
|
if (ret != TSDB_CODE_SUCCESS) {
|
|
|
|
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
|
|
@ -1306,7 +1299,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// restore current time window
|
|
|
|
|
ret = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx,
|
|
|
|
|
ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx,
|
|
|
|
|
numOfOutput, pInfo->rowCellInfoOffset);
|
|
|
|
|
if (ret != TSDB_CODE_SUCCESS) {
|
|
|
|
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
|
|
@ -1326,7 +1319,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// null data, failed to allocate more memory buffer
|
|
|
|
|
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &nextWin, masterScan, &pResult, tableGroupId,
|
|
|
|
|
int32_t code = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &nextWin, masterScan, &pResult, tableGroupId,
|
|
|
|
|
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
|
|
|
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
|
|
@ -1467,7 +1460,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
|
|
|
|
|
SResultRow* pResult = NULL;
|
|
|
|
|
|
|
|
|
|
pInfo->curWindow.ekey = pInfo->curWindow.skey;
|
|
|
|
|
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
|
|
|
|
|
int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
|
|
|
|
|
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
|
|
|
|
|
pBInfo->rowCellInfoOffset);
|
|
|
|
|
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
|
|
|
@ -1488,7 +1481,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
|
|
|
|
|
SResultRow* pResult = NULL;
|
|
|
|
|
|
|
|
|
|
pInfo->curWindow.ekey = pInfo->curWindow.skey;
|
|
|
|
|
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
|
|
|
|
|
int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
|
|
|
|
|
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
|
|
|
|
|
pBInfo->rowCellInfoOffset);
|
|
|
|
|
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
|
|
|
@ -1532,7 +1525,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasic
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int64_t tid = 0;
|
|
|
|
|
SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, tid, d, len, true, groupIndex);
|
|
|
|
|
SResultRow *pResultRow = doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, d, len, true, groupIndex);
|
|
|
|
|
assert (pResultRow != NULL);
|
|
|
|
|
|
|
|
|
|
setResultRowKey(pResultRow, pData, type);
|
|
|
|
@ -2779,7 +2772,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
|
|
|
|
|
TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;
|
|
|
|
|
|
|
|
|
|
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
|
|
|
|
|
if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.tid, &win, masterScan, &pResult, groupId,
|
|
|
|
|
if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.tid, &win, masterScan, &pResult, groupId,
|
|
|
|
|
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
|
|
|
|
|
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
|
|
@ -2825,7 +2818,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
|
|
|
|
|
TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;
|
|
|
|
|
|
|
|
|
|
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
|
|
|
|
|
if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.tid, &win, masterScan, &pResult, groupId,
|
|
|
|
|
if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.tid, &win, masterScan, &pResult, groupId,
|
|
|
|
|
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
|
|
|
|
|
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
|
|
|
|
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
|
|
@ -3181,11 +3174,11 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo *pTableQueryInfo)
|
|
|
|
|
pTableQueryInfo->cur.vgroupIndex = -1;
|
|
|
|
|
|
|
|
|
|
// set the index to be the end slot of result rows array
|
|
|
|
|
SResultRowInfo* pResRowInfo = &pTableQueryInfo->resInfo;
|
|
|
|
|
if (pResRowInfo->size > 0) {
|
|
|
|
|
pResRowInfo->current = pResRowInfo->pResult[pResRowInfo->size - 1];
|
|
|
|
|
SResultRowInfo* pResultRowInfo = &pTableQueryInfo->resInfo;
|
|
|
|
|
if (pResultRowInfo->size > 0) {
|
|
|
|
|
pResultRowInfo->curPos = pResultRowInfo->size - 1;
|
|
|
|
|
} else {
|
|
|
|
|
pResRowInfo->current = NULL;
|
|
|
|
|
pResultRowInfo->curPos = -1;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -3240,7 +3233,7 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, i
|
|
|
|
|
SResultRowInfo* pResultRowInfo = &pInfo->resultRowInfo;
|
|
|
|
|
|
|
|
|
|
int64_t tid = 0;
|
|
|
|
|
SResultRow* pRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, uid);
|
|
|
|
|
SResultRow* pRow = doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, uid);
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
|
|
|
|
|
SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i);
|
|
|
|
@ -3477,7 +3470,7 @@ void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pRe
|
|
|
|
|
int64_t tid = 0;
|
|
|
|
|
|
|
|
|
|
SResultRow* pResultRow =
|
|
|
|
|
doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, tid, (char*)&tableGroupId, sizeof(tableGroupId), true, uid);
|
|
|
|
|
doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, (char*)&tableGroupId, sizeof(tableGroupId), true, uid);
|
|
|
|
|
assert (pResultRow != NULL);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
@ -3680,14 +3673,10 @@ void setIntervalQueryRange(SQueryRuntimeEnv *pRuntimeEnv, TSKEY key) {
|
|
|
|
|
STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current;
|
|
|
|
|
SResultRowInfo *pResultRowInfo = &pTableQueryInfo->resInfo;
|
|
|
|
|
|
|
|
|
|
if (pResultRowInfo->current != NULL) {
|
|
|
|
|
if (pResultRowInfo->curPos != -1) {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// if (pWindowResInfo->prevSKey != TSKEY_INITIAL_VAL) {
|
|
|
|
|
// return;
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
|
|
pTableQueryInfo->win.skey = key;
|
|
|
|
|
STimeWindow win = {.skey = key, .ekey = pQueryAttr->window.ekey};
|
|
|
|
|
|
|
|
|
@ -4609,8 +4598,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pResultRowInfo->size > 0) {
|
|
|
|
|
pResultRowInfo->current = pResultRowInfo->pResult[0];
|
|
|
|
|
// pResultRowInfo->prevSKey = pResultRowInfo->pResult[0]->win.skey;
|
|
|
|
|
pResultRowInfo->curPos = 0;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
qDebug("QInfo:0x%"PRIx64" start to repeat scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
|
|
|
|
@ -4635,8 +4623,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
|
|
|
|
|
pTableScanInfo->order = cond.order;
|
|
|
|
|
|
|
|
|
|
if (pResultRowInfo->size > 0) {
|
|
|
|
|
pResultRowInfo->current = pResultRowInfo->pResult[pResultRowInfo->size - 1];
|
|
|
|
|
// pResultRowInfo->prevSKey = pResultRowInfo->current->win.skey;
|
|
|
|
|
pResultRowInfo->curPos = pResultRowInfo->size - 1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
p = doTableScanImpl(pOperator, newgroup);
|
|
|
|
@ -5496,7 +5483,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
|
|
|
|
|
} else {
|
|
|
|
|
SResultRow* pResult = NULL;
|
|
|
|
|
pInfo->curWindow.ekey = pInfo->curWindow.skey;
|
|
|
|
|
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
|
|
|
|
|
int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
|
|
|
|
|
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
|
|
|
|
|
pBInfo->rowCellInfoOffset);
|
|
|
|
|
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
|
|
|
@ -5513,10 +5500,11 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SResultRow* pResult = NULL;
|
|
|
|
|
|
|
|
|
|
pInfo->curWindow.ekey = pInfo->curWindow.skey;
|
|
|
|
|
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
|
|
|
|
|
int32_t ret = setResultOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, pSDataBlock->info.tid, &pInfo->curWindow, masterScan,
|
|
|
|
|
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
|
|
|
|
|
pBInfo->rowCellInfoOffset);
|
|
|
|
|
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
|
|
|
@ -6294,8 +6282,8 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
|
|
|
|
|
|
|
|
|
|
SColumnInfoData* pColInfo = taosArrayGet(pRes->pDataBlock, 0);
|
|
|
|
|
|
|
|
|
|
while(pInfo->currentIndex < pInfo->totalTables && count < maxNumOfTables) {
|
|
|
|
|
int32_t i = pInfo->currentIndex++;
|
|
|
|
|
while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) {
|
|
|
|
|
int32_t i = pInfo->curPos++;
|
|
|
|
|
STableQueryInfo *item = taosArrayGetP(pa, i);
|
|
|
|
|
|
|
|
|
|
char *output = pColInfo->pData + count * rsize;
|
|
|
|
@ -6339,8 +6327,8 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
|
|
|
|
|
SExprInfo* pExprInfo = pOperator->pExpr; // todo use the column list instead of exprinfo
|
|
|
|
|
|
|
|
|
|
count = 0;
|
|
|
|
|
while(pInfo->currentIndex < pInfo->totalTables && count < maxNumOfTables) {
|
|
|
|
|
int32_t i = pInfo->currentIndex++;
|
|
|
|
|
while(pInfo->curPos < pInfo->totalTables && count < maxNumOfTables) {
|
|
|
|
|
int32_t i = pInfo->curPos++;
|
|
|
|
|
|
|
|
|
|
STableQueryInfo* item = taosArrayGetP(pa, i);
|
|
|
|
|
|
|
|
|
@ -6369,7 +6357,7 @@ static SSDataBlock* doTagScan(void* param, bool* newgroup) {
|
|
|
|
|
count += 1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pInfo->currentIndex >= pInfo->totalTables) {
|
|
|
|
|
if (pInfo->curPos >= pInfo->totalTables) {
|
|
|
|
|
pOperator->status = OP_EXEC_DONE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -6388,7 +6376,7 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
|
|
|
|
|
assert(numOfGroup == 0 || numOfGroup == 1);
|
|
|
|
|
|
|
|
|
|
pInfo->totalTables = pRuntimeEnv->tableqinfoGroupInfo.numOfTables;
|
|
|
|
|
pInfo->currentIndex = 0;
|
|
|
|
|
pInfo->curPos = 0;
|
|
|
|
|
|
|
|
|
|
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
|
|
|
|
pOperator->name = "SeqTableTagScan";
|
|
|
|
|