adj stream operator result

This commit is contained in:
54liuyao 2024-07-18 10:09:15 +08:00
parent d5873aa1b2
commit ab642b245c
6 changed files with 226 additions and 62 deletions

View File

@ -32,7 +32,7 @@ typedef SList SStreamSnapshot;
typedef void* (*_state_buff_get_fn)(void* pRowBuff, const void* pKey, size_t keyLen);
typedef int32_t (*_state_buff_remove_fn)(void* pRowBuff, const void* pKey, size_t keyLen);
typedef int32_t (*_state_buff_remove_by_pos_fn)(SStreamFileState* pState, SRowBuffPos* pPos);
typedef void (*_state_buff_remove_by_pos_fn)(SStreamFileState* pState, SRowBuffPos* pPos);
typedef void (*_state_buff_cleanup_fn)(void* pRowBuff);
typedef void* (*_state_buff_create_statekey_fn)(SRowBuffPos* pPos, int64_t num);
@ -83,7 +83,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey,
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos);
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen);
int32_t deleteSessionWinStateBuffFn(void* pBuff, const void* key, size_t keyLen);
int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos);
void deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos);
int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur,
const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen);

View File

@ -224,7 +224,7 @@ void tdListInit(SList *list, int32_t eleSize);
void tdListEmpty(SList *list);
SList *tdListNew(int32_t eleSize);
void *tdListFree(SList *list);
void *tdListFreeP(SList *list, FDelete fp);
void tdListFreeP(SList *list, FDelete fp);
void tdListPrependNode(SList *list, SListNode *node);
void tdListAppendNode(SList *list, SListNode *node);
int32_t tdListPrepend(SList *list, void *data);

View File

@ -183,7 +183,7 @@ static void removeResults(SArray* pWins, SSHashObj* pUpdatedMap) {
void* value = *(void**)tmp;
taosMemoryFree(value);
int32_t tmpRes = tSimpleHashRemove(pUpdatedMap, pW, sizeof(SWinKey));
qTrace("%s at line %d res:%s", __func__, __LINE__, tmpRes);
qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
}
}
}
@ -225,7 +225,7 @@ static void doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId)
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SWinKey key = {.ts = ts, .groupId = groupId};
int32_t tmpRes = tSimpleHashRemove(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey));
qTrace("%s at line %d res:%s", __func__, __LINE__, tmpRes);
qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
pAPI->stateStore.streamStateDel(pInfo->pState, &key);
}
@ -294,7 +294,7 @@ static int32_t doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, S
}
if (pUpdatedMap) {
int32_t tmpRes = tSimpleHashRemove(pUpdatedMap, &winRes, sizeof(SWinKey));
qTrace("%s at line %d res:%s", __func__, __LINE__, tmpRes);
qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
}
getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC);
} while (win.ekey <= endTsCols[i]);
@ -370,7 +370,7 @@ static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp
TSDB_CHECK_CODE(code, lino, _end);
}
int32_t tmpRes = tSimpleHashIterateRemove(pHashMap, pWinKey, sizeof(SWinKey), &pIte, &iter);
qTrace("%s at line %d res:%s", __func__, __LINE__, tmpRes);
qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
}
}
@ -744,14 +744,14 @@ static int32_t processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pF
// pull data is over
taosArrayDestroy(chArray);
int32_t tmpRes = taosHashRemove(pMap, &winRes, sizeof(SWinKey));
qTrace("%s at line %d res:%s", __func__, __LINE__, tmpRes);
qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
res = true;
qDebug("===stream===retrive pull data over.window %" PRId64, winRes.ts);
void* pFinalCh = taosHashGet(pFinalMap, &winRes, sizeof(SWinKey));
if (pFinalCh) {
int32_t tmpRes = taosHashRemove(pFinalMap, &winRes, sizeof(SWinKey));
qTrace("%s at line %d res:%s", __func__, __LINE__, tmpRes);
qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
doDeleteWindow(pOperator, winRes.ts, winRes.groupId);
STimeWindow nextWin = getFinalTimeWindow(winRes.ts, pInterval);
SPullWindowInfo pull = {.window = nextWin,
@ -2215,7 +2215,7 @@ void removeSessionDeleteResults(SSHashObj* pHashMap, SArray* pWins) {
SSessionKey key = {0};
getSessionHashKey(&pWin->sessionWin, &key);
int32_t tmpRes = tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
qTrace("%s at line %d res:%s", __func__, __LINE__, tmpRes);
qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
}
}
@ -2233,7 +2233,7 @@ void removeSessionResults(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SAr
if (pVal) {
releaseOutputBuf(pAggSup->pState, *(void**)pVal, &pAggSup->pSessionAPI->stateStore);
int32_t tmpRes = tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
qTrace("%s at line %d res:%s", __func__, __LINE__, tmpRes);
qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
}
}
}
@ -2883,14 +2883,8 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa
}
}
int32_t code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow);
if (code == TSDB_CODE_FAILED) {
// for history
qWarn("===stream===not found session result key:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 "",
pKey->win.skey, pKey->win.ekey, pKey->groupId);
pGroupResInfo->index += 1;
continue;
}
code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow);
TSDB_CHECK_CODE(code, lino, _end);
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
// no results, continue to check the next one

View File

@ -86,7 +86,11 @@ static int32_t addNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInf
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
ASSERT(pNewPos->pRowBuff);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
void* tmp = taosArrayPush(pWinInfos, &pNewPos);
if (!tmp) {
@ -107,7 +111,11 @@ static int32_t insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWin
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
ASSERT(pNewPos->pRowBuff);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
void* tmp = taosArrayInsert(pWinInfos, index, &pNewPos);
if (!tmp) {
@ -125,7 +133,14 @@ _end:
}
SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKey, void* p, int32_t* pVLen) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
pNewPos->needFree = true;
pNewPos->beFlushed = true;
@ -135,7 +150,13 @@ SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKe
int32_t len = getRowStateRowSize(pFileState);
memset(pNewPos->pRowBuff, 0, len);
}
_end:
taosMemoryFree(p);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return NULL;
}
return pNewPos;
}
@ -169,6 +190,11 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey,
int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
(*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
if (!(*pVal)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
(*pWinCode) = code_file;
qDebug("===stream===0 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
} else {
@ -215,6 +241,11 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey,
int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
(*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
if (!(*pVal)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
(*pWinCode) = code_file;
qDebug("===stream===1 get session win:%" PRId64 ",%" PRId64 " from disc, res %d", startTs, endTs, code_file);
goto _end;
@ -307,19 +338,30 @@ _end:
}
int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
pNewPos->needFree = true;
pNewPos->beFlushed = true;
void* pBuff = NULL;
int32_t code = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen);
if (code != TSDB_CODE_SUCCESS) {
return code;
int32_t winCode = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen);
if (winCode != TSDB_CODE_SUCCESS) {
return winCode;
}
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
memcpy(pNewPos->pRowBuff, pBuff, *pVLen);
taosMemoryFreeClear(pBuff);
(*pVal) = pNewPos;
return TSDB_CODE_SUCCESS;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t deleteSessionWinStateBuffFn(void* pBuff, const void* key, size_t keyLen) {
@ -343,12 +385,12 @@ int32_t deleteSessionWinStateBuffFn(void* pBuff, const void* key, size_t keyLen)
return TSDB_CODE_SUCCESS;
}
int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
void deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
SSessionKey* pWinKey = (SSessionKey*)pPos->pKey;
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
if (!ppBuff) {
return TSDB_CODE_SUCCESS;
return;
}
SArray* pWinStates = (SArray*)(*ppBuff);
int32_t size = taosArrayGetSize(pWinStates);
@ -360,7 +402,6 @@ int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffP
taosArrayRemove(pWinStates, index);
}
}
return TSDB_CODE_SUCCESS;
}
int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur,
@ -416,6 +457,11 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream
}
}
pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey));
pNewPos->needFree = true;
pNewPos->beFlushed = true;
@ -664,6 +710,11 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void
code = TSDB_CODE_SUCCESS;
} else if (code == TSDB_CODE_SUCCESS && pVal) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pCur->pStreamFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFreeClear(pData);
return code;
}
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
pNewPos->needFree = true;
pNewPos->beFlushed = true;
@ -758,6 +809,11 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
int32_t code_file = streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
if (!(*pVal)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
(*pWinCode) = code_file;
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
pWinKey->win.ekey, code_file);
@ -810,6 +866,11 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
streamStateStateAddIfNotExist_rocksdb(pFileStore, pWinKey, pKeyData, keyDataLen, fn, &p, pVLen);
if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
if (!(*pVal)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
(*pWinCode) = code_file;
qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
pWinKey->win.ekey, code_file);
@ -896,6 +957,11 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)));
if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) {
(*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
if (!(*pVal)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
goto _end;
}
}
@ -903,6 +969,10 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
pWinKey->win.ekey = endTs;
(*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL);
taosMemoryFree(pRockVal);
if (!(*pVal)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
} else {
code = addNewSessionWindow(pFileState, pWinStates, pWinKey, (SRowBuffPos**)pVal);
TSDB_CHECK_CODE(code, lino, _end);
@ -942,6 +1012,11 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
if (tmpKey.win.ekey < pFirstWinKey->win.skey) {
*pWinKey = tmpKey;
(*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
if (!(*pVal)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
(*pWinCount) = code_file;
qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
pWinKey->win.ekey, code_file);
@ -1001,6 +1076,11 @@ int32_t createCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey
int32_t code_file = getCountWinStateFromDisc(pFileStore, pWinKey, &p, pVLen);
if (code_file == TSDB_CODE_SUCCESS && isFlushedState(pFileState, endTs, 0)) {
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
if (!(*pVal)) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
pWinKey->win.ekey, code_file);
} else {

View File

@ -71,15 +71,15 @@ int32_t stateHashBuffRemoveFn(void* pBuff, const void* pKey, size_t keyLen) {
return tSimpleHashRemove(pBuff, pKey, keyLen);
}
int32_t stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
void stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
size_t keyLen = pFileState->keyLen;
SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pPos->pKey, keyLen);
if (ppPos) {
if ((*ppPos) == pPos) {
return tSimpleHashRemove(pFileState->rowStateBuff, pPos->pKey, keyLen);
int32_t tmpRes = tSimpleHashRemove(pFileState->rowStateBuff, pPos->pKey, keyLen);
qTrace("%s at line %d res:%d", __func__, __LINE__, tmpRes);
}
}
return TSDB_CODE_SUCCESS;
}
void stateHashBuffClearFn(void* pBuff) { tSimpleHashClear(pBuff); }
@ -276,13 +276,15 @@ void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts, bool all) {
pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
}
destroyRowBuffPos(pPos);
tdListPopNode(pFileState->usedBuffs, pNode);
taosMemoryFreeClear(pNode);
SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode);
taosMemoryFreeClear(tmp);
}
}
}
void clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max) {
int32_t clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
uint64_t i = 0;
SListIter iter = {0};
tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
@ -291,16 +293,24 @@ void clearFlushedRowBuff(SStreamFileState* pFileState, SStreamSnapshot* pFlushLi
while ((pNode = tdListNext(&iter)) != NULL && i < max) {
SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data;
if (isFlushedState(pFileState, pFileState->getTs(pPos->pKey), 0) && !pPos->beUsed) {
tdListAppend(pFlushList, &pPos);
code = tdListAppend(pFlushList, &pPos);
TSDB_CHECK_CODE(code, lino, _end);
pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
tdListPopNode(pFileState->usedBuffs, pNode);
taosMemoryFreeClear(pNode);
SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode);
taosMemoryFreeClear(tmp);
if (pPos->pRowBuff) {
i++;
}
}
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
void streamFileStateClear(SStreamFileState* pFileState) {
@ -314,7 +324,9 @@ bool needClearDiskBuff(SStreamFileState* pFileState) { return pFileState->flushM
void streamFileStateReleaseBuff(SStreamFileState* pFileState, SRowBuffPos* pPos, bool used) { pPos->beUsed = used; }
void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
int32_t popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uint64_t max, bool used) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
uint64_t i = 0;
SListIter iter = {0};
tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD);
@ -327,11 +339,13 @@ void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uin
ASSERT(pPos->needFree == true);
continue;
}
tdListAppend(pFlushList, &pPos);
code = tdListAppend(pFlushList, &pPos);
TSDB_CHECK_CODE(code, lino, _end);
pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
pFileState->stateBuffRemoveByPosFn(pFileState, pPos);
tdListPopNode(pFileState->usedBuffs, pNode);
taosMemoryFreeClear(pNode);
SListNode* tmp = tdListPopNode(pFileState->usedBuffs, pNode);
taosMemoryFreeClear(tmp);
if (pPos->pRowBuff) {
i++;
}
@ -339,22 +353,35 @@ void popUsedBuffs(SStreamFileState* pFileState, SStreamSnapshot* pFlushList, uin
}
qInfo("stream state flush %d rows to disk. is used:%d", listNEles(pFlushList), used);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t flushRowBuff(SStreamFileState* pFileState) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamSnapshot* pFlushList = tdListNew(POINTER_BYTES);
if (!pFlushList) {
return TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO);
num = TMAX(num, FLUSH_NUM);
clearFlushedRowBuff(pFileState, pFlushList, num);
code = clearFlushedRowBuff(pFileState, pFlushList, num);
TSDB_CHECK_CODE(code, lino, _end);
if (isListEmpty(pFlushList)) {
popUsedBuffs(pFileState, pFlushList, num, false);
code = popUsedBuffs(pFileState, pFlushList, num, false);
TSDB_CHECK_CODE(code, lino, _end);
if (isListEmpty(pFlushList)) {
popUsedBuffs(pFileState, pFlushList, num, true);
code = popUsedBuffs(pFileState, pFlushList, num, true);
TSDB_CHECK_CODE(code, lino, _end);
}
}
@ -369,7 +396,12 @@ int32_t flushRowBuff(SStreamFileState* pFileState) {
}
tdListFreeP(pFlushList, destroyRowBuffPosPtr);
return TSDB_CODE_SUCCESS;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t clearRowBuff(SStreamFileState* pFileState) {
@ -400,8 +432,20 @@ void streamFileStateClearBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
}
SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos));
if (!pPos) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
pPos->pKey = taosMemoryCalloc(1, pFileState->keyLen);
if (!pPos->pKey) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
void* pBuff = getFreeBuff(pFileState);
if (pBuff) {
pPos->pRowBuff = pBuff;
@ -417,18 +461,28 @@ SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) {
}
}
int32_t code = clearRowBuff(pFileState);
ASSERT(code == 0);
code = clearRowBuff(pFileState);
TSDB_CHECK_CODE(code, lino, _end);
pPos->pRowBuff = getFreeBuff(pFileState);
code = tdListAppend(pFileState->usedBuffs, &pPos);
TSDB_CHECK_CODE(code, lino, _end);
_end:
tdListAppend(pFileState->usedBuffs, &pPos);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return NULL;
}
ASSERT(pPos->pRowBuff != NULL);
return pPos;
}
SRowBuffPos* getNewRowPosForWrite(SStreamFileState* pFileState) {
SRowBuffPos* newPos = getNewRowPos(pFileState);
if (!newPos) {
return NULL;
}
newPos->beUsed = true;
newPos->beFlushed = false;
newPos->needFree = false;
@ -451,7 +505,11 @@ int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, voi
goto _end;
}
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
ASSERT(pNewPos->pRowBuff);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
memcpy(pNewPos->pKey, pKey, keyLen);
(*pWinCode) = TSDB_CODE_FAILED;
@ -498,42 +556,65 @@ int32_t resetRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t key
return TSDB_CODE_FAILED;
}
static void recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
static int32_t recoverSessionRowBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t len = 0;
void* pBuff = NULL;
pFileState->stateFileGetFn(pFileState, pPos->pKey, &pBuff, &len);
code = pFileState->stateFileGetFn(pFileState, pPos->pKey, &pBuff, &len);
TSDB_CHECK_CODE(code, lino, _end);
memcpy(pPos->pRowBuff, pBuff, len);
taosMemoryFree(pBuff);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pPos->pRowBuff) {
if (pPos->needFree) {
recoverSessionRowBuff(pFileState, pPos);
code = recoverSessionRowBuff(pFileState, pPos);
TSDB_CHECK_CODE(code, lino, _end);
}
(*pVal) = pPos->pRowBuff;
return TSDB_CODE_SUCCESS;
goto _end;
}
pPos->pRowBuff = getFreeBuff(pFileState);
if (!pPos->pRowBuff) {
if (pFileState->curRowCount < pFileState->maxRowCount) {
pPos->pRowBuff = taosMemoryCalloc(1, pFileState->rowSize);
if (!pPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
pFileState->curRowCount++;
} else {
int32_t code = clearRowBuff(pFileState);
ASSERT(code == 0);
code = clearRowBuff(pFileState);
TSDB_CHECK_CODE(code, lino, _end);
pPos->pRowBuff = getFreeBuff(pFileState);
}
ASSERT(pPos->pRowBuff);
}
recoverSessionRowBuff(pFileState, pPos);
code = recoverSessionRowBuff(pFileState, pPos);
TSDB_CHECK_CODE(code, lino, _end);
(*pVal) = pPos->pRowBuff;
if (!pPos->needFree) {
tdListPrepend(pFileState->usedBuffs, &pPos);
code = tdListPrepend(pFileState->usedBuffs, &pPos);
}
return TSDB_CODE_SUCCESS;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen) {
@ -726,6 +807,7 @@ void recoverSesssion(SStreamFileState* pFileState, int64_t ckId) {
void recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pFileState->maxTs != INT64_MIN) {
int64_t mark = (INT64_MIN + pFileState->deleteMark >= pFileState->maxTs)
? INT64_MIN
@ -744,6 +826,11 @@ void recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
void* pVal = NULL;
int32_t vlen = 0;
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
if (!pNewPos || !pNewPos->pRowBuff) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _end);
}
code = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
if (code != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
destroyRowBuffPos(pNewPos);
@ -763,6 +850,11 @@ void recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
}
streamStateCurPrev_rocksdb(pCur);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
streamStateFreeCur(pCur);
}

View File

@ -55,13 +55,11 @@ void tdListEmptyP(SList *list, FDelete fp) {
}
}
void *tdListFreeP(SList *list, FDelete fp) {
void tdListFreeP(SList *list, FDelete fp) {
if (list) {
tdListEmptyP(list, fp);
taosMemoryFree(list);
}
return NULL;
}
void tdListPrependNode(SList *list, SListNode *node) { TD_DLIST_PREPEND(list, node); }