fix issue & add log
This commit is contained in:
parent
db50c9230a
commit
2d56c8f058
|
@ -827,6 +827,8 @@ static int32_t getPointInfoFromStateRight(SStreamAggSupporter* pAggSup, SStreamF
|
||||||
&curVLen, pWinCode);
|
&curVLen, pWinCode);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
qDebug("===stream=== set stream interp next point buf.ts:%" PRId64 ", groupId:%" PRId64 ", res:%d", pNextPoint->key.ts, pNextPoint->key.groupId, pWinCode);
|
||||||
|
|
||||||
setPointBuff(pNextPoint, pFillSup);
|
setPointBuff(pNextPoint, pFillSup);
|
||||||
|
|
||||||
if (*pWinCode != TSDB_CODE_SUCCESS) {
|
if (*pWinCode != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -844,6 +846,9 @@ static int32_t getPointInfoFromStateRight(SStreamAggSupporter* pAggSup, SStreamF
|
||||||
code = pAggSup->stateStore.streamStateFillGetPrev(pState, &pNextPoint->key, &pCurPoint->key,
|
code = pAggSup->stateStore.streamStateFillGetPrev(pState, &pNextPoint->key, &pCurPoint->key,
|
||||||
(void**)&pCurPoint->pResPos, &nextVLen, &tmpRes);
|
(void**)&pCurPoint->pResPos, &nextVLen, &tmpRes);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
qDebug("===stream=== set stream interp cur point buf.ts:%" PRId64 ", groupId:%" PRId64 ", res:%d", pCurPoint->key.ts, pCurPoint->key.groupId, tmpRes);
|
||||||
|
|
||||||
if (tmpRes == TSDB_CODE_SUCCESS) {
|
if (tmpRes == TSDB_CODE_SUCCESS) {
|
||||||
setPointBuff(pCurPoint, pFillSup);
|
setPointBuff(pCurPoint, pFillSup);
|
||||||
}
|
}
|
||||||
|
@ -872,6 +877,8 @@ static int32_t getPointInfoFromState(SStreamAggSupporter* pAggSup, SStreamFillSu
|
||||||
&curVLen, pWinCode);
|
&curVLen, pWinCode);
|
||||||
QUERY_CHECK_CODE(code, lino, _end);
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
|
|
||||||
|
qDebug("===stream=== set stream interp buf.ts:%" PRId64 ", groupId:%" PRId64, pCurPoint->key.ts, pCurPoint->key.groupId);
|
||||||
|
|
||||||
setPointBuff(pCurPoint, pFillSup);
|
setPointBuff(pCurPoint, pFillSup);
|
||||||
|
|
||||||
if (*pWinCode != TSDB_CODE_SUCCESS) {
|
if (*pWinCode != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -942,9 +949,8 @@ static void copyCalcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFi
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setForceWindowCloseFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) {
|
static void setForceWindowCloseFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) {
|
||||||
TSKEY endTs = adustEndTsKey(ts, pFillSup->cur.key, &pFillSup->interval);
|
qDebug("===stream=== set force window close rule.ts:%" PRId64 ",cur key:%" PRId64 ", has prev%d, has next:%d", ts,
|
||||||
TSKEY startTs = adustPrevTsKey(ts, pFillSup->cur.key, &pFillSup->interval);
|
pFillSup->cur.key, hasPrevWindow(pFillSup), hasNextWindow(pFillSup));
|
||||||
|
|
||||||
pFillInfo->needFill = true;
|
pFillInfo->needFill = true;
|
||||||
pFillInfo->pos = FILL_POS_INVALID;
|
pFillInfo->pos = FILL_POS_INVALID;
|
||||||
switch (pFillInfo->type) {
|
switch (pFillInfo->type) {
|
||||||
|
@ -962,9 +968,9 @@ static void setForceWindowCloseFillRule(SStreamFillSupporter* pFillSup, SStreamF
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case TSDB_FILL_PREV: {
|
case TSDB_FILL_PREV: {
|
||||||
if (ts == pFillSup->cur.key) {
|
if (ts >= pFillSup->cur.key) {
|
||||||
pFillInfo->pos = FILL_POS_START;
|
setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo);
|
||||||
pFillInfo->needFill = false;
|
pFillInfo->pResRow = &pFillSup->cur;
|
||||||
} else if (hasPrevWindow(pFillSup)) {
|
} else if (hasPrevWindow(pFillSup)) {
|
||||||
pFillInfo->pos = FILL_POS_INVALID;
|
pFillInfo->pos = FILL_POS_INVALID;
|
||||||
setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo);
|
setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo);
|
||||||
|
@ -1430,6 +1436,7 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, STimeWindowAggSup
|
||||||
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
||||||
for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) {
|
for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) {
|
||||||
SWinKey* pKey = (SWinKey*)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index);
|
SWinKey* pKey = (SWinKey*)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index);
|
||||||
|
qDebug("===stream=== build interp res. key:%" PRId64 ",groupId:%" PRId64, pKey->ts, pKey->groupId);
|
||||||
if (pBlock->info.id.groupId == 0) {
|
if (pBlock->info.id.groupId == 0) {
|
||||||
pBlock->info.id.groupId = pKey->groupId;
|
pBlock->info.id.groupId = pKey->groupId;
|
||||||
} else if (pBlock->info.id.groupId != pKey->groupId) {
|
} else if (pBlock->info.id.groupId != pKey->groupId) {
|
||||||
|
|
|
@ -276,14 +276,16 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW
|
||||||
streamStateFreeCur(pCur);
|
streamStateFreeCur(pCur);
|
||||||
return code;
|
return code;
|
||||||
} else {
|
} else {
|
||||||
SWinKey* pNext = taosArrayGet(pWinStates, index - 1);
|
SWinKey* pPrevKey = NULL;
|
||||||
if (qDebugFlag & DEBUG_DEBUG) {
|
SWinKey* pCurKey = taosArrayGet(pWinStates, index);
|
||||||
SWinKey* pTmp = taosArrayGet(pWinStates, index);
|
if (winKeyCmprImpl(pCurKey, pKey) == 0) {
|
||||||
if (winKeyCmprImpl(pTmp, pKey) != 0) {
|
pPrevKey = taosArrayGet(pWinStates, index - 1);
|
||||||
qError("%s failed at line %d since do not find cur SWinKey", __func__, lino);
|
} else {
|
||||||
}
|
pPrevKey = taosArrayGet(pWinStates, index);
|
||||||
|
qDebug("%s failed at line %d since do not find cur SWinKey. trigger may be force window close", __func__, __LINE__);
|
||||||
}
|
}
|
||||||
*pResKey = *pNext;
|
|
||||||
|
*pResKey = *pPrevKey;
|
||||||
return getHashSortRowBuff(pFileState, pResKey, ppVal, pVLen, pWinCode);
|
return getHashSortRowBuff(pFileState, pResKey, ppVal, pVLen, pWinCode);
|
||||||
}
|
}
|
||||||
(*pWinCode) = TSDB_CODE_FAILED;
|
(*pWinCode) = TSDB_CODE_FAILED;
|
||||||
|
|
Loading…
Reference in New Issue