diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index 7d7001fdf9..699e761f5a 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -32,7 +32,7 @@ typedef SList SStreamSnapshot; typedef void* (*_state_buff_get_fn)(void* pRowBuff, const void* pKey, size_t keyLen); typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen); -typedef int32_t (*_state_buff_remove_by_pos_fn)(SStreamFileState* pState, SRowBuffPos* pPos); +typedef void (*_state_buff_remove_by_pos_fn)(SStreamFileState* pState, SRowBuffPos* pPos); typedef void (*_state_buff_cleanup_fn)(void* pRowBuff); typedef void* (*_state_buff_create_statekey_fn)(SRowBuffPos* pPos, int64_t num); @@ -83,7 +83,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos); int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen); int32_t deleteSessionWinStateBuffFn(void* pBuff, const void* key, size_t keyLen); -int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos); +void deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos); int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur, const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen); diff --git a/include/util/tlist.h b/include/util/tlist.h index 866a37fee4..73b2ec79e3 100644 --- a/include/util/tlist.h +++ b/include/util/tlist.h @@ -224,7 +224,7 @@ void tdListInit(SList *list, int32_t eleSize); void tdListEmpty(SList *list); SList *tdListNew(int32_t eleSize); void *tdListFree(SList *list); -void *tdListFreeP(SList *list, FDelete fp); +void tdListFreeP(SList *list, FDelete fp); void tdListPrependNode(SList *list, SListNode *node); void tdListAppendNode(SList *list, SListNode *node); int32_t tdListPrepend(SList *list, void *data); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 60beea0876..d495e53d2e 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -183,7 +183,7 @@ static void removeResults(SArray* pWins, SSHashObj* pUpdatedMap) { void* value = *(void**)tmp; taosMemoryFree(value); int32_t tmpRes = tSimpleHashRemove(pUpdatedMap, pW, sizeof(SWinKey)); - qTrace("%s at line %d res:%s", __func__, __LINE__, tmpRes); + qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes); } } } @@ -225,7 +225,7 @@ static void doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId) SStreamIntervalOperatorInfo* pInfo = pOperator->info; SWinKey key = {.ts = ts, .groupId = groupId}; int32_t tmpRes = tSimpleHashRemove(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey)); - qTrace("%s at line %d res:%s", __func__, __LINE__, tmpRes); + qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes); pAPI->stateStore.streamStateDel(pInfo->pState, &key); } @@ -294,7 +294,7 @@ static int32_t doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, S } if (pUpdatedMap) { int32_t tmpRes = tSimpleHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey)); - qTrace("%s at line %d res:%s", __func__, __LINE__, tmpRes); + qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes); } getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC); } while (win.ekey <= endTsCols[i]); @@ -370,7 +370,7 @@ static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp TSDB_CHECK_CODE(code, lino, _end); } int32_t tmpRes = tSimpleHashIterateRemove(pHashMap, pWinKey, sizeof(SWinKey), &pIte, &iter); - qTrace("%s at line %d res:%s", __func__, __LINE__, tmpRes); + qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes); } } @@ -744,14 +744,14 @@ static int32_t processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pF // pull data is over taosArrayDestroy(chArray); int32_t tmpRes = taosHashRemove(pMap, &winRes, sizeof(SWinKey)); - qTrace("%s at line %d res:%s", __func__, __LINE__, tmpRes); + qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes); res = true; qDebug("===stream===retrive pull data over.window %" PRId64, winRes.ts); void* pFinalCh = taosHashGet(pFinalMap, &winRes, sizeof(SWinKey)); if (pFinalCh) { int32_t tmpRes = taosHashRemove(pFinalMap, &winRes, sizeof(SWinKey)); - qTrace("%s at line %d res:%s", __func__, __LINE__, tmpRes); + qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes); doDeleteWindow(pOperator, winRes.ts, winRes.groupId); STimeWindow nextWin = getFinalTimeWindow(winRes.ts, pInterval); SPullWindowInfo pull = {.window = nextWin, @@ -2215,7 +2215,7 @@ void removeSessionDeleteResults(SSHashObj* pHashMap, SArray* pWins) { SSessionKey key = {0}; getSessionHashKey(&pWin->sessionWin, &key); int32_t tmpRes = tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey)); - qTrace("%s at line %d res:%s", __func__, __LINE__, tmpRes); + qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes); } } @@ -2233,7 +2233,7 @@ void removeSessionResults(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SAr if (pVal) { releaseOutputBuf(pAggSup->pState, *(void**)pVal, &pAggSup->pSessionAPI->stateStore); int32_t tmpRes = tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey)); - qTrace("%s at line %d res:%s", __func__, __LINE__, tmpRes); + qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes); } } } @@ -2883,14 +2883,8 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa } } - int32_t code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow); - if (code == TSDB_CODE_FAILED) { - // for history - qWarn("===stream===not found session result key:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 "", - pKey->win.skey, pKey->win.ekey, pKey->groupId); - pGroupResInfo->index += 1; - continue; - } + code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow); + TSDB_CHECK_CODE(code, lino, _end); doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset); // no results, continue to check the next one diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index bca9f5cdfb..c3bd2d2422 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -86,7 +86,11 @@ static int32_t addNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInf int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); - ASSERT(pNewPos->pRowBuff); + if (!pNewPos || !pNewPos->pRowBuff) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); + } + memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); void* tmp = taosArrayPush(pWinInfos, &pNewPos); if (!tmp) { @@ -107,7 +111,11 @@ static int32_t insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWin int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); - ASSERT(pNewPos->pRowBuff); + if (!pNewPos || !pNewPos->pRowBuff) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); + } + memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); void* tmp = taosArrayInsert(pWinInfos, index, &pNewPos); if (!tmp) { @@ -125,7 +133,14 @@ _end: } SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKey, void* p, int32_t* pVLen) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); + if (!pNewPos || !pNewPos->pRowBuff) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); + } + memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); pNewPos->needFree = true; pNewPos->beFlushed = true; @@ -135,7 +150,13 @@ SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKe int32_t len = getRowStateRowSize(pFileState); memset(pNewPos->pRowBuff, 0, len); } + +_end: taosMemoryFree(p); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + return NULL; + } return pNewPos; } @@ -169,6 +190,11 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen); if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { (*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen); + if (!(*pVal)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); + } + (*pWinCode) = code_file; qDebug("===stream===0 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file); } else { @@ -215,6 +241,11 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen); if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { (*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen); + if (!(*pVal)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); + } + (*pWinCode) = code_file; qDebug("===stream===1 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file); goto _end; @@ -307,19 +338,30 @@ _end: } int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); + if (!pNewPos || !pNewPos->pRowBuff) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); + } pNewPos->needFree = true; pNewPos->beFlushed = true; void* pBuff = NULL; - int32_t code = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen); - if (code != TSDB_CODE_SUCCESS) { - return code; + int32_t winCode = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen); + if (winCode != TSDB_CODE_SUCCESS) { + return winCode; } memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); memcpy(pNewPos->pRowBuff, pBuff, *pVLen); taosMemoryFreeClear(pBuff); (*pVal) = pNewPos; - return TSDB_CODE_SUCCESS; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } int32_t deleteSessionWinStateBuffFn(void* pBuff, const void* key, size_t keyLen) { @@ -343,12 +385,12 @@ int32_t deleteSessionWinStateBuffFn(void* pBuff, const void* key, size_t keyLen) return TSDB_CODE_SUCCESS; } -int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) { +void deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) { SSHashObj* pSessionBuff = getRowStateBuff(pFileState); SSessionKey* pWinKey = (SSessionKey*)pPos->pKey; void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t)); if (!ppBuff) { - return TSDB_CODE_SUCCESS; + return; } SArray* pWinStates = (SArray*)(*ppBuff); int32_t size = taosArrayGetSize(pWinStates); @@ -360,7 +402,6 @@ int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffP taosArrayRemove(pWinStates, index); } } - return TSDB_CODE_SUCCESS; } int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur, @@ -416,6 +457,11 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream } } pNewPos = getNewRowPosForWrite(pFileState); + if (!pNewPos || !pNewPos->pRowBuff) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); + } + memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey)); pNewPos->needFree = true; pNewPos->beFlushed = true; @@ -664,6 +710,11 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void code = TSDB_CODE_SUCCESS; } else if (code == TSDB_CODE_SUCCESS && pVal) { SRowBuffPos* pNewPos = getNewRowPosForWrite(pCur->pStreamFileState); + if (!pNewPos || !pNewPos->pRowBuff) { + code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFreeClear(pData); + return code; + } memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); pNewPos->needFree = true; pNewPos->beFlushed = true; @@ -758,6 +809,11 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch int32_t code_file = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen); if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen); + if (!(*pVal)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); + } + (*pWinCode) = code_file; qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); @@ -810,6 +866,11 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen); if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen); + if (!(*pVal)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); + } + (*pWinCode) = code_file; qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); @@ -896,6 +957,11 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE))); if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) { (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen); + if (!(*pVal)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); + } + goto _end; } } @@ -903,6 +969,10 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C pWinKey->win.ekey = endTs; (*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL); taosMemoryFree(pRockVal); + if (!(*pVal)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); + } } else { code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal); TSDB_CHECK_CODE(code, lino, _end); @@ -942,6 +1012,11 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C if (tmpKey.win.ekey < pFirstWinKey->win.skey) { *pWinKey = tmpKey; (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen); + if (!(*pVal)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); + } + (*pWinCount) = code_file; qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); @@ -1001,6 +1076,11 @@ int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey int32_t code_file = getCountWinStateFromDisc(pFileStore, pWinKey, &p, pVLen); if (code_file == TSDB_CODE_SUCCESS && isFlushedState(pFileState, endTs, 0)) { (*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen); + if (!(*pVal)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); + } + qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); } else { diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index f682673dc0..970f6e9bfc 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -71,15 +71,15 @@ int32_t stateHashBuffRemoveFn(void* pBuff, const void* pKey, size_t keyLen) { return tSimpleHashRemove(pBuff, pKey, keyLen); } -int32_t stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) { +void stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) { size_t keyLen = pFileState->keyLen; SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pPos->pKey, keyLen); if (ppPos) { if ((*ppPos) == pPos) { - return tSimpleHashRemove(pFileState->rowStateBuff, pPos->pKey, keyLen); + int32_t tmpRes = tSimpleHashRemove(pFileState->rowStateBuff, pPos->pKey, keyLen); + qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes); } } - return TSDB_CODE_SUCCESS; } void stateHashBuffClearFn(void* pBuff) { tSimpleHashClear(pBuff); } @@ -276,13 +276,15 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) { pFileState->stateBuffRemoveByPosFn(pFileState, pPos); } destroyRowBuffPos(pPos); - tdListPopNode(pFileState->usedBuffs, pNode); - taosMemoryFreeClear(pNode); + SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode); + taosMemoryFreeClear(tmp); } } } -void clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max) { +int32_t clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; uint64_t i = 0; SListIter iter = {0}; tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD); @@ -291,16 +293,24 @@ void clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushLi while ((pNode = tdListNext(&iter)) != NULL && i < max) { SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; if (isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0) && !pPos->beUsed) { - tdListAppend(pFlushList, &pPos); + code = tdListAppend(pFlushList, &pPos); + TSDB_CHECK_CODE(code, lino, _end); + pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey)); pFileState->stateBuffRemoveByPosFn(pFileState, pPos); - tdListPopNode(pFileState->usedBuffs, pNode); - taosMemoryFreeClear(pNode); + SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode); + taosMemoryFreeClear(tmp); if (pPos->pRowBuff) { i++; } } } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } void streamFileStateClear(SStreamFileState* pFileState) { @@ -314,7 +324,9 @@ bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushM void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) { pPos->beUsed = used; } -void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) { +int32_t popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; uint64_t i = 0; SListIter iter = {0}; tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD); @@ -327,11 +339,13 @@ void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uin ASSERT(pPos->needFree == true); continue; } - tdListAppend(pFlushList, &pPos); + code = tdListAppend(pFlushList, &pPos); + TSDB_CHECK_CODE(code, lino, _end); + pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey)); pFileState->stateBuffRemoveByPosFn(pFileState, pPos); - tdListPopNode(pFileState->usedBuffs, pNode); - taosMemoryFreeClear(pNode); + SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode); + taosMemoryFreeClear(tmp); if (pPos->pRowBuff) { i++; } @@ -339,22 +353,35 @@ void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uin } qInfo("stream state flush %d rows to disk. is used:%d", listNEles(pFlushList), used); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } int32_t flushRowBuff(SStreamFileState* pFileState) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SStreamSnapshot* pFlushList = tdListNew(POINTER_BYTES); if (!pFlushList) { - return TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); } uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO); num = TMAX(num, FLUSH_NUM); - clearFlushedRowBuff(pFileState, pFlushList, num); + code = clearFlushedRowBuff(pFileState, pFlushList, num); + TSDB_CHECK_CODE(code, lino, _end); + if (isListEmpty(pFlushList)) { - popUsedBuffs(pFileState, pFlushList, num, false); + code = popUsedBuffs(pFileState, pFlushList, num, false); + TSDB_CHECK_CODE(code, lino, _end); if (isListEmpty(pFlushList)) { - popUsedBuffs(pFileState, pFlushList, num, true); + code = popUsedBuffs(pFileState, pFlushList, num, true); + TSDB_CHECK_CODE(code, lino, _end); } } @@ -369,7 +396,12 @@ int32_t flushRowBuff(SStreamFileState* pFileState) { } tdListFreeP(pFlushList, destroyRowBuffPosPtr); - return TSDB_CODE_SUCCESS; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } int32_t clearRowBuff(SStreamFileState* pFileState) { @@ -400,8 +432,20 @@ void streamFileStateClearBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { } SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos)); + if (!pPos) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); + } + pPos->pKey = taosMemoryCalloc(1, pFileState->keyLen); + if (!pPos->pKey) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); + } + void* pBuff = getFreeBuff(pFileState); if (pBuff) { pPos->pRowBuff = pBuff; @@ -417,18 +461,28 @@ SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) { } } - int32_t code = clearRowBuff(pFileState); - ASSERT(code == 0); + code = clearRowBuff(pFileState); + TSDB_CHECK_CODE(code, lino, _end); + pPos->pRowBuff = getFreeBuff(pFileState); + code = tdListAppend(pFileState->usedBuffs, &pPos); + TSDB_CHECK_CODE(code, lino, _end); _end: - tdListAppend(pFileState->usedBuffs, &pPos); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + return NULL; + } + ASSERT(pPos->pRowBuff != NULL); return pPos; } SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) { SRowBuffPos* newPos = getNewRowPos(pFileState); + if (!newPos) { + return NULL; + } newPos->beUsed = true; newPos->beFlushed = false; newPos->needFree = false; @@ -451,7 +505,11 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi goto _end; } SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); - ASSERT(pNewPos->pRowBuff); + if (!pNewPos || !pNewPos->pRowBuff) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); + } + memcpy(pNewPos->pKey, pKey, keyLen); (*pWinCode) = TSDB_CODE_FAILED; @@ -498,42 +556,65 @@ int32_t resetRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t key return TSDB_CODE_FAILED; } -static void recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { +static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; int32_t len = 0; void* pBuff = NULL; - pFileState->stateFileGetFn(pFileState, pPos->pKey, &pBuff, &len); + code = pFileState->stateFileGetFn(pFileState, pPos->pKey, &pBuff, &len); + TSDB_CHECK_CODE(code, lino, _end); memcpy(pPos->pRowBuff, pBuff, len); taosMemoryFree(pBuff); + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (pPos->pRowBuff) { if (pPos->needFree) { - recoverSessionRowBuff(pFileState, pPos); + code = recoverSessionRowBuff(pFileState, pPos); + TSDB_CHECK_CODE(code, lino, _end); } (*pVal) = pPos->pRowBuff; - return TSDB_CODE_SUCCESS; + goto _end; } pPos->pRowBuff = getFreeBuff(pFileState); if (!pPos->pRowBuff) { if (pFileState->curRowCount < pFileState->maxRowCount) { pPos->pRowBuff = taosMemoryCalloc(1, pFileState->rowSize); + if (!pPos->pRowBuff) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); + } pFileState->curRowCount++; } else { - int32_t code = clearRowBuff(pFileState); - ASSERT(code == 0); + code = clearRowBuff(pFileState); + TSDB_CHECK_CODE(code, lino, _end); pPos->pRowBuff = getFreeBuff(pFileState); } ASSERT(pPos->pRowBuff); } - recoverSessionRowBuff(pFileState, pPos); + code = recoverSessionRowBuff(pFileState, pPos); + TSDB_CHECK_CODE(code, lino, _end); + (*pVal) = pPos->pRowBuff; if (!pPos->needFree) { - tdListPrepend(pFileState->usedBuffs, &pPos); + code = tdListPrepend(pFileState->usedBuffs, &pPos); } - return TSDB_CODE_SUCCESS; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) { @@ -726,6 +807,7 @@ void recoverSesssion(SStreamFileState* pFileState, int64_t ckId) { void recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (pFileState->maxTs != INT64_MIN) { int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs) ? INT64_MIN @@ -744,6 +826,11 @@ void recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { void* pVal = NULL; int32_t vlen = 0; SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState); + if (!pNewPos || !pNewPos->pRowBuff) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); + } + code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen); if (code != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) { destroyRowBuffPos(pNewPos); @@ -763,6 +850,11 @@ void recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { } streamStateCurPrev_rocksdb(pCur); } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } streamStateFreeCur(pCur); } diff --git a/source/util/src/tlist.c b/source/util/src/tlist.c index 90a2b1baab..d4b7335832 100644 --- a/source/util/src/tlist.c +++ b/source/util/src/tlist.c @@ -55,13 +55,11 @@ void tdListEmptyP(SList *list, FDelete fp) { } } -void *tdListFreeP(SList *list, FDelete fp) { +void tdListFreeP(SList *list, FDelete fp) { if (list) { tdListEmptyP(list, fp); taosMemoryFree(list); } - - return NULL; } void tdListPrependNode(SList *list, SListNode *node) { TD_DLIST_PREPEND(list, node); }