fix issue

This commit is contained in:
54liuyao 2024-08-14 15:22:42 +08:00
parent 5a2b89c357
commit 216c53f0ae
7 changed files with 237 additions and 67 deletions

View File

@ -134,7 +134,7 @@ int32_t hashSortFileRemoveFn(SStreamFileState* pFileState, const void* pKey);
void clearSearchBuff(SStreamFileState* pFileState); void clearSearchBuff(SStreamFileState* pFileState);
int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal,
int32_t* pVLen, int32_t* pWinCode); int32_t* pVLen, int32_t* pWinCode);
int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal,
int32_t* pVLen, int32_t* pWinCode); int32_t* pVLen, int32_t* pWinCode);
int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId); int32_t recoverFillSnapshot(SStreamFileState* pFileState, int64_t ckId);
void deleteHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey); void deleteHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey);

View File

@ -107,6 +107,7 @@ typedef struct SStreamFillInfo {
TSKEY current; // current Key for fill TSKEY current; // current Key for fill
TSKEY preRowKey; TSKEY preRowKey;
TSKEY nextRowKey; TSKEY nextRowKey;
TSKEY nextPointKey;
SResultRowData* pResRow; SResultRowData* pResRow;
SStreamFillLinearInfo* pLinearInfo; SStreamFillLinearInfo* pLinearInfo;
bool needFill; bool needFill;

View File

@ -54,15 +54,18 @@ void streamTimeSliceReleaseState(SOperatorInfo* pOperator) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0; int32_t lino = 0;
SStreamTimeSliceOperatorInfo* pInfo = pOperator->info; SStreamTimeSliceOperatorInfo* pInfo = pOperator->info;
int32_t winSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SWinKey); int32_t winNum = taosArrayGetSize(pInfo->historyWins);
int32_t winSize = winNum * sizeof(SWinKey);
int32_t resSize = winSize + sizeof(TSKEY); int32_t resSize = winSize + sizeof(TSKEY);
char* pBuff = taosMemoryCalloc(1, resSize); char* pBuff = taosMemoryCalloc(1, resSize);
QUERY_CHECK_NULL(pBuff, code, lino, _end, terrno); QUERY_CHECK_NULL(pBuff, code, lino, _end, terrno);
if (winNum > 0) {
memcpy(pBuff, pInfo->historyWins->pData, winSize); memcpy(pBuff, pInfo->historyWins->pData, winSize);
}
memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY)); memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY));
qDebug("===stream=== time slice operator relase state. save result count:%d", qDebug("===stream=== time slice operator relase state. save result count:%d", winNum);
(int32_t)taosArrayGetSize(pInfo->historyWins));
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_TIME_SLICE_OP_STATE_NAME, pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_TIME_SLICE_OP_STATE_NAME,
strlen(STREAM_TIME_SLICE_OP_STATE_NAME), pBuff, resSize); strlen(STREAM_TIME_SLICE_OP_STATE_NAME), pBuff, resSize);
pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState); pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState);
@ -169,6 +172,8 @@ void destroyStreamTimeSliceOperatorInfo(void* param) {
tSimpleHashCleanup(pInfo->pDeletedMap); tSimpleHashCleanup(pInfo->pDeletedMap);
clearGroupResInfo(&pInfo->groupResInfo); clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroy(pInfo->historyWins);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
@ -404,11 +409,11 @@ static void fillNormalRange(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFi
int32_t lino = 0; int32_t lino = 0;
while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) { while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) {
STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current}; STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current};
if (inWinRange(&pFillSup->winRange, &st)) { // if (inWinRange(&pFillSup->winRange, &st)) {
bool res = true; bool res = true;
code = fillPointResult(pFillSup, pFillInfo->pResRow, pFillInfo->current, pBlock, &res, true); code = fillPointResult(pFillSup, pFillInfo->pResRow, pFillInfo->current, pBlock, &res, true);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
} // }
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
pFillSup->interval.precision); pFillSup->interval.precision);
} }
@ -905,6 +910,8 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
if (hasNextWindow(pFillSup)) { if (hasNextWindow(pFillSup)) {
nextWKey = pFillSup->next.key; nextWKey = pFillSup->next.key;
} }
TSKEY endTs = adustEndTsKey(ts, pFillSup->cur.key, &pFillSup->interval);
TSKEY startTs = adustPrevTsKey(ts, pFillSup->cur.key, &pFillSup->interval);
pFillInfo->needFill = true; pFillInfo->needFill = true;
pFillInfo->pos = FILL_POS_INVALID; pFillInfo->pos = FILL_POS_INVALID;
@ -914,11 +921,9 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
case TSDB_FILL_SET_VALUE: case TSDB_FILL_SET_VALUE:
case TSDB_FILL_SET_VALUE_F: { case TSDB_FILL_SET_VALUE_F: {
if (hasPrevWindow(pFillSup)) { if (hasPrevWindow(pFillSup)) {
TSKEY endTs = adustEndTsKey(ts, pFillSup->cur.key, &pFillSup->interval);
setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo); setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_END; pFillInfo->pos = FILL_POS_END;
} else { } else {
TSKEY startTs = adustPrevTsKey(ts, pFillSup->cur.key, &pFillSup->interval);
setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo); setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_START; pFillInfo->pos = FILL_POS_START;
} }
@ -927,7 +932,6 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
} break; } break;
case TSDB_FILL_PREV: { case TSDB_FILL_PREV: {
if (hasNextWindow(pFillSup)) { if (hasNextWindow(pFillSup)) {
TSKEY startTs = adustPrevTsKey(ts, pFillSup->cur.key, &pFillSup->interval);
setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo); setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_START; pFillInfo->pos = FILL_POS_START;
resetFillWindow(&pFillSup->prev); resetFillWindow(&pFillSup->prev);
@ -935,7 +939,6 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
pFillSup->prev.pRowVal = pFillSup->cur.pRowVal; pFillSup->prev.pRowVal = pFillSup->cur.pRowVal;
} else { } else {
ASSERT(hasPrevWindow(pFillSup)); ASSERT(hasPrevWindow(pFillSup));
TSKEY endTs = adustEndTsKey(ts, pFillSup->cur.key, &pFillSup->interval);
setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo); setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_END; pFillInfo->pos = FILL_POS_END;
} }
@ -943,7 +946,6 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
} break; } break;
case TSDB_FILL_NEXT: { case TSDB_FILL_NEXT: {
if (hasPrevWindow(pFillSup)) { if (hasPrevWindow(pFillSup)) {
TSKEY endTs = adustEndTsKey(ts, pFillSup->cur.key, &pFillSup->interval);
setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo); setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_END; pFillInfo->pos = FILL_POS_END;
resetFillWindow(&pFillSup->next); resetFillWindow(&pFillSup->next);
@ -951,7 +953,6 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
pFillSup->next.pRowVal = pFillSup->cur.pRowVal; pFillSup->next.pRowVal = pFillSup->cur.pRowVal;
} else { } else {
ASSERT(hasNextWindow(pFillSup)); ASSERT(hasNextWindow(pFillSup));
TSKEY startTs = adustPrevTsKey(ts, pFillSup->cur.key, &pFillSup->interval);
setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo); setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_START; pFillInfo->pos = FILL_POS_START;
resetFillWindow(&pFillSup->prev); resetFillWindow(&pFillSup->prev);
@ -970,7 +971,6 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pResRow = &pFillSup->prev;
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
} else if (hasPrevWindow(pFillSup)) { } else if (hasPrevWindow(pFillSup)) {
TSKEY endTs = adustEndTsKey(ts, pFillSup->cur.key, &pFillSup->interval);
setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo); setFillKeyInfo(prevWKey, endTs, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_END; pFillInfo->pos = FILL_POS_END;
SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd);
@ -981,7 +981,6 @@ static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
} else { } else {
ASSERT(hasNextWindow(pFillSup)); ASSERT(hasNextWindow(pFillSup));
TSKEY startTs = adustPrevTsKey(ts, pFillSup->cur.key, &pFillSup->interval);
setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo); setFillKeyInfo(startTs, nextWKey, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_START; pFillInfo->pos = FILL_POS_START;
SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd); SET_WIN_KEY_INVALID(pFillInfo->pLinearInfo->nextEnd);
@ -1205,6 +1204,18 @@ _end:
} }
} }
void getNextResKey(int64_t curGroupId, SArray* pKeyArray, int32_t curIndex, TSKEY* pNextKey) {
int32_t nextIndex = curIndex + 1;
if (nextIndex < taosArrayGetSize(pKeyArray)) {
SWinKey* pKey = (SWinKey*)taosArrayGet(pKeyArray, nextIndex);
if (pKey->groupId == curGroupId) {
*pNextKey = pKey->ts;
return;
}
}
*pNextKey = INT64_MIN;
}
void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup, void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSupporter* pFillSup,
SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) { SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1235,6 +1246,10 @@ void doBuildTimeSlicePointResult(SStreamAggSupporter* pAggSup, SStreamFillSuppor
} }
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
getNextResKey(pKey->groupId, pGroupResInfo->pRows, pGroupResInfo->index, &pFillInfo->nextRowKey);
if (hasNextWindow(pFillSup)) {
pFillInfo->nextPointKey = nextPoint.key.ts;
}
setTimeSliceFillRule(pFillSup, pFillInfo, pKey->ts); setTimeSliceFillRule(pFillSup, pFillInfo, pKey->ts);
doStreamFillRange(pFillSup, pFillInfo, pBlock); doStreamFillRange(pFillSup, pFillInfo, pBlock);
releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore); releaseOutputBuf(pAggSup->pState, curPoint.pResPos, &pAggSup->stateStore);
@ -1326,13 +1341,13 @@ static int32_t doDeleteTimeSliceResult(SStreamAggSupporter* pAggSup, SSDataBlock
for (int32_t i = 0; i < pBlock->info.rows; i++) { for (int32_t i = 0; i < pBlock->info.rows; i++) {
while (1) { while (1) {
TSKEY ts = tsCalStarts[i]; TSKEY ts = tsCalStarts[i];
TSKEY endTs = tsCalEnds[i]; TSKEY endCalTs = tsCalEnds[i];
uint64_t groupId = groupIds[i]; uint64_t groupId = groupIds[i];
SWinKey key = {.ts = ts, .groupId = groupId}; SWinKey key = {.ts = ts, .groupId = groupId};
SWinKey nextKey = {.groupId = groupId}; SWinKey nextKey = {.groupId = groupId};
code = pAggSup->stateStore.streamStateFillGetNext(pAggSup->pState, &key, &nextKey, NULL, NULL, &winCode); code = pAggSup->stateStore.streamStateFillGetNext(pAggSup->pState, &key, &nextKey, NULL, NULL, &winCode);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
if (key.ts > endTs) { if (key.ts > endCalTs) {
break; break;
} }
(void)tSimpleHashRemove(pUpdatedMap, &key, sizeof(SWinKey)); (void)tSimpleHashRemove(pUpdatedMap, &key, sizeof(SWinKey));
@ -1675,6 +1690,9 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
copyFillValueInfo(pInfo->pFillSup, pInfo->pFillInfo); copyFillValueInfo(pInfo->pFillSup, pInfo->pFillInfo);
pInfo->ignoreNull = getIgoreNullRes(pExpSup); pInfo->ignoreNull = getIgoreNullRes(pExpSup);
pInfo->historyWins = taosArrayInit(4, sizeof(SWinKey));
QUERY_CHECK_NULL(pInfo->historyWins, code, lino, _error, terrno);
if (pHandle) { if (pHandle) {
pInfo->isHistoryOp = pHandle->fillHistory; pInfo->isHistoryOp = pHandle->fillHistory;
} }

View File

@ -10384,9 +10384,23 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
} }
} }
if (pStmt->pOptions->fillHistory && pSelect->hasInterpFunc) { if (pSelect->hasInterpFunc) {
if (pStmt->pOptions->fillHistory && pStmt->pOptions->triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Stream interp unsupported Fill history"); "When trigger was force window close, Stream interp unsupported Fill history");
}
if (pStmt->pOptions->triggerType == STREAM_TRIGGER_WINDOW_CLOSE) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Stream interp unsupported window close");
}
if ((SRealTableNode*)pSelect->pFromTable && ((SRealTableNode*)pSelect->pFromTable)->pMeta &&
TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
!hasTbnameFunction(pSelect->pPartitionByList)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Interp for stream on super table must patitioned by table name");
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -123,9 +123,34 @@ void clearSearchBuff(SStreamFileState* pFileState) {
} }
} }
int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t getStateFromRocksdbByCur(SStreamFileState* pFileState, SStreamStateCur* pCur, SWinKey* pResKey, SRowBuffPos** ppPos, int32_t* pVLen, int32_t* pWinCode) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
void* tmpVal = NULL;
int32_t len = 0;
(*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len);
if ((*pWinCode) == TSDB_CODE_SUCCESS) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
QUERY_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pRowBuff, tmpVal, len);
taosMemoryFreeClear(tmpVal);
*pVLen = getRowStateRowSize(pFileState);
(*ppPos) = pNewPos;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal,
int32_t* pVLen, int32_t* pWinCode) { int32_t* pVLen, int32_t* pWinCode) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SArray* pWinStates = NULL; SArray* pWinStates = NULL;
SSHashObj* pSearchBuff = getSearchBuff(pFileState); SSHashObj* pSearchBuff = getSearchBuff(pFileState);
void* pState = getStateFileStore(pFileState); void* pState = getStateFileStore(pFileState);
@ -134,7 +159,20 @@ int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SW
pWinStates = (SArray*)(*ppBuff); pWinStates = (SArray*)(*ppBuff);
} else { } else {
SStreamStateCur* pCur = streamStateFillSeekKeyNext_rocksdb(pState, pKey); SStreamStateCur* pCur = streamStateFillSeekKeyNext_rocksdb(pState, pKey);
(*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)pVal, pVLen); void* tmpVal = NULL;
int32_t len = 0;
(*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len);
if ((*pWinCode) == TSDB_CODE_SUCCESS) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
QUERY_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pRowBuff, tmpVal, len);
taosMemoryFreeClear(tmpVal);
*pVLen = getRowStateRowSize(pFileState);
(*ppVal) = pNewPos;
}
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return code; return code;
} }
@ -142,7 +180,20 @@ int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SW
int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare); int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
if (index == -1) { if (index == -1) {
SStreamStateCur* pCur = streamStateFillSeekKeyNext_rocksdb(pState, pKey); SStreamStateCur* pCur = streamStateFillSeekKeyNext_rocksdb(pState, pKey);
(*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)pVal, pVLen); void* tmpVal = NULL;
int32_t len = 0;
(*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len);
if ((*pWinCode) == TSDB_CODE_SUCCESS) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
QUERY_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pRowBuff, tmpVal, len);
taosMemoryFreeClear(tmpVal);
*pVLen = getRowStateRowSize(pFileState);
(*ppVal) = pNewPos;
}
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return code; return code;
} else { } else {
@ -152,15 +203,21 @@ int32_t getHashSortNextRow(SStreamFileState* pFileState, const SWinKey* pKey, SW
} }
SWinKey* pNext = taosArrayGet(pWinStates, index + 1); SWinKey* pNext = taosArrayGet(pWinStates, index + 1);
*pResKey = *pNext; *pResKey = *pNext;
return getHashSortRowBuff(pFileState, pResKey, pVal, pVLen, pWinCode); return getHashSortRowBuff(pFileState, pResKey, ppVal, pVLen, pWinCode);
} }
(*pWinCode) = TSDB_CODE_FAILED; (*pWinCode) = TSDB_CODE_FAILED;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code; return code;
} }
int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** pVal, int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SWinKey* pResKey, void** ppVal,
int32_t* pVLen, int32_t* pWinCode) { int32_t* pVLen, int32_t* pWinCode) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SArray* pWinStates = NULL; SArray* pWinStates = NULL;
SSHashObj* pSearchBuff = getSearchBuff(pFileState); SSHashObj* pSearchBuff = getSearchBuff(pFileState);
void* pState = getStateFileStore(pFileState); void* pState = getStateFileStore(pFileState);
@ -169,7 +226,20 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW
pWinStates = (SArray*)(*ppBuff); pWinStates = (SArray*)(*ppBuff);
} else { } else {
SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey); SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey);
(*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)pVal, pVLen); void* tmpVal = NULL;
int32_t len = 0;
(*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len);
if ((*pWinCode) == TSDB_CODE_SUCCESS) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
QUERY_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pRowBuff, tmpVal, len);
taosMemoryFreeClear(tmpVal);
*pVLen = getRowStateRowSize(pFileState);
(*ppVal) = pNewPos;
}
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return code; return code;
} }
@ -177,15 +247,33 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW
int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare); int32_t index = binarySearch(pWinStates, size, pKey, fillStateKeyCompare);
if (index == -1 || index == 0) { if (index == -1 || index == 0) {
SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey); SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey);
(*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)pVal, pVLen); void* tmpVal = NULL;
int32_t len = 0;
(*pWinCode) = streamStateGetGroupKVByCur_rocksdb(pCur, pResKey, (const void**)&tmpVal, &len);
if ((*pWinCode) == TSDB_CODE_SUCCESS) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
QUERY_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pRowBuff, tmpVal, len);
taosMemoryFreeClear(tmpVal);
*pVLen = getRowStateRowSize(pFileState);
(*ppVal) = pNewPos;
}
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return code; return code;
} else { } else {
SWinKey* pNext = taosArrayGet(pWinStates, index - 1); SWinKey* pNext = taosArrayGet(pWinStates, index - 1);
*pResKey = *pNext; *pResKey = *pNext;
return getHashSortRowBuff(pFileState, pResKey, pVal, pVLen, pWinCode); return getHashSortRowBuff(pFileState, pResKey, ppVal, pVLen, pWinCode);
} }
(*pWinCode) = TSDB_CODE_FAILED; (*pWinCode) = TSDB_CODE_FAILED;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code; return code;
} }

View File

@ -0,0 +1,82 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step2
sql create database test2 vgroups 1;
sql use test2;
sql create stable st(ts timestamp,a int,b int,c int, d double) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
print step2_0
sql create stream streams2_0_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_0_1 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(prev);
sql create stream streams2_0_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_0_2 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(next);
sql create stream streams2_0_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_0_3 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(linear);
sql create stream streams2_0_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_0_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(NULL);
sql create stream streams2_0_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_0_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(value,11,22,33,44);
print step2_1
sql_error create stream streams2_1_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_1_1 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(prev);
sql_error create stream streams2_1_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_1_2 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(next);
sql_error create stream streams2_1_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_1_3 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(linear);
sql_error create stream streams2_1_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_1_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(NULL);
sql_error create stream streams2_1_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_1_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(value,11,22,33,44);
print step2_2
sql_error create stream streams2_2_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_2_1 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(prev);
sql_error create stream streams2_2_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_2_2 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(next);
sql_error create stream streams2_2_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_2_3 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(linear);
sql_error create stream streams2_2_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_2_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(NULL);
sql_error create stream streams2_2_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_2_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(value,11,22,33,44);
print step2_3
sql_error create stream streams2_3_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_3_1 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st partition by a every(1s) fill(prev);
sql_error create stream streams2_3_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_3_2 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st partition by a every(1s) fill(next);
sql_error create stream streams2_3_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_3_3 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st partition by a every(1s) fill(linear);
sql_error create stream streams2_3_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_3_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st partition by a every(1s) fill(NULL);
sql_error create stream streams2_3_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_3_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st partition by a every(1s) fill(value,11,22,33,44);
print step2_4
sql_error create stream streams2_4_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_4_1 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(prev);
sql_error create stream streams2_4_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_4_2 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(next);
sql_error create stream streams2_4_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_4_3 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(linear);
sql_error create stream streams2_4_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_4_4 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(NULL);
sql_error create stream streams2_4_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_4_5 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(value,11,22,33,44);
print step2_5
sql_error create stream streams2_5_1 trigger window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_5_1 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(prev);
sql_error create stream streams2_5_2 trigger window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_5_2 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(next);
sql_error create stream streams2_5_3 trigger window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_5_3 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(linear);
sql_error create stream streams2_5_4 trigger window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_5_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(NULL);
sql_error create stream streams2_5_5 trigger window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_5_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(value,11,22,33,44);
print step2_6
sql create stream streams2_6_1 trigger at_once FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_1 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(prev);
sql create stream streams2_6_2 trigger at_once FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_2 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(next);
sql create stream streams2_6_3 trigger at_once FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_3 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(linear);
sql create stream streams2_6_4 trigger at_once FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(NULL);
sql create stream streams2_6_5 trigger at_once FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(value,11,22,33,44);
sql_error create stream streams2_6_6 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_6 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(prev);
sql_error create stream streams2_6_7 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_7 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(next);
sql_error create stream streams2_6_8 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_8 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(linear);
sql_error create stream streams2_6_9 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_9 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(NULL);
sql_error create stream streams2_6_10 trigger force_window_close FILL_HISTORY 1 IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_6_10 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 every(1s) fill(value,11,22,33,44);
run tsim/stream/checkTaskStatus.sim
run tsim/stream/checkTaskStatus.sim
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -211,39 +211,6 @@ if $data17 != 44.000000000 then
goto loop0_2 goto loop0_2
endi endi
print step2
sql create database test2 vgroups 1;
sql use test2;
sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql_error create stream streams2_1_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_1 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(prev);
sql_error create stream streams2_1_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_2 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(next);
sql_error create stream streams2_1_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_3 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(linear);
sql_error create stream streams2_1_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(NULL);
sql_error create stream streams2_1_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from t1 range(1648791212000, 1648791215000) every(1s) fill(value,11,22,33,44);
sql_error create stream streams2_2_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1_1 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(prev);
sql_error create stream streams2_2_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1_2 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(next);
sql_error create stream streams2_2_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1_3 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(linear);
sql_error create stream streams2_2_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(NULL);
sql_error create stream streams2_2_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st every(1s) fill(value,11,22,33,44);
sql_error create stream streams2_3_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_1 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st prartition by a every(1s) fill(prev);
sql_error create stream streams2_3_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_2 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st prartition by a every(1s) fill(next);
sql_error create stream streams2_3_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_3 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st prartition by a every(1s) fill(linear);
sql_error create stream streams2_3_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_4 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st prartition by a every(1s) fill(NULL);
sql_error create stream streams2_3_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2_5 as select interp(a), _isfilled as a1, interp(b), _isfilled as a2, interp(c), _isfilled as a3, interp(d) from st prartition by a every(1s) fill(value,11,22,33,44);
sql_error create stream streams2_4_1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_1 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(prev);
sql_error create stream streams2_4_2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_2 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(next);
sql_error create stream streams2_4_3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_3 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(linear);
sql_error create stream streams2_4_4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_4 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(NULL);
sql_error create stream streams2_4_5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt3_5 as select INTERP(a) FROM t1 RANGE('2023-01-01 00:00:00') fill(value,11,22,33,44);
print step3 print step3
sql create database test3 vgroups 4; sql create database test3 vgroups 4;