refactor stream code

This commit is contained in:
Yihao Deng 2024-01-08 08:38:16 +00:00
parent 1c28f53525
commit 4c5441da7e
4 changed files with 77 additions and 75 deletions

View File

@ -201,7 +201,7 @@ int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) {
void* key = taosHashGetKey(pIter, NULL);
code = streamStateCvtDataFormat(pMeta->path, key, *(void**)pIter);
if (code != 0) {
qError("failed to cvt data");
stError("failed to cvt data");
goto _EXIT;
}
@ -225,11 +225,11 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
if (compatible == STREAM_STATA_COMPATIBLE) {
return 0;
} else if (compatible == STREAM_STATA_NEED_CONVERT) {
qInfo("stream state need covert backend format");
stInfo("stream state need covert backend format");
return streamMetaCvtDbFormat(pMeta);
} else if (compatible == STREAM_STATA_NO_COMPATIBLE) {
qError(
stError(
"stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
"manually",
tsDataDir);

View File

@ -17,17 +17,14 @@
#include "query.h"
#include "streamBackendRocksdb.h"
#include "taos.h"
#include "tcommon.h"
#include "thash.h"
#include "tsimplehash.h"
typedef int (*__session_compare_fn_t) (const SSessionKey* pWin, const void* pDatas, int pos);
typedef int (*__session_compare_fn_t)(const SSessionKey* pWin, const void* pDatas, int pos);
int sessionStateKeyCompare(const SSessionKey* pWin1, const void* pDatas, int pos) {
SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos);
SSessionKey* pWin2 = (SSessionKey*) pPos2->pKey;
SSessionKey* pWin2 = (SSessionKey*)pPos2->pKey;
return sessionWinKeyCmpr(pWin1, pWin2);
}
@ -80,7 +77,8 @@ static SRowBuffPos* addNewSessionWindow(SStreamFileState* pFileState, SArray* pW
return pNewPos;
}
static SRowBuffPos* insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey, int32_t index) {
static SRowBuffPos* insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey,
int32_t index) {
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
ASSERT(pNewPos->pRowBuff);
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
@ -97,7 +95,8 @@ SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKe
return pNewPos;
}
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen) {
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal,
int32_t* pVLen) {
int32_t code = TSDB_CODE_SUCCESS;
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
SArray* pWinStates = NULL;
@ -146,7 +145,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey,
if (index + 1 < size) {
pPos = taosArrayGetP(pWinStates, index + 1);
if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) ) {
if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap))) {
(*pVal) = pPos;
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
pPos->beUsed = true;
@ -229,9 +228,9 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v
return TSDB_CODE_SUCCESS;
}
int32_t deleteSessionWinStateBuffFn(void* pBuff, const void *key, size_t keyLen) {
SSHashObj* pSessionBuff = (SSHashObj*) pBuff;
SSessionKey* pWinKey = (SSessionKey*) key;
int32_t deleteSessionWinStateBuffFn(void* pBuff, const void* key, size_t keyLen) {
SSHashObj* pSessionBuff = (SSHashObj*)pBuff;
SSessionKey* pWinKey = (SSessionKey*)key;
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
if (!ppBuff) {
return TSDB_CODE_SUCCESS;
@ -252,7 +251,7 @@ int32_t deleteSessionWinStateBuffFn(void* pBuff, const void *key, size_t keyLen)
int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
SSessionKey* pWinKey = (SSessionKey*) pPos->pKey;
SSessionKey* pWinKey = (SSessionKey*)pPos->pKey;
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
if (!ppBuff) {
return TSDB_CODE_SUCCESS;
@ -270,7 +269,8 @@ int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffP
return TSDB_CODE_SUCCESS;
}
int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur, const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen) {
int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur,
const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen) {
SRowBuffPos* pNewPos = NULL;
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
@ -297,7 +297,7 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream
} else {
if (size > 0) {
SRowBuffPos* pPos = taosArrayGetP(pWinStates, 0);
if (sessionWinKeyCmpr(pWinKey, pPos->pKey) >=0) {
if (sessionWinKeyCmpr(pWinKey, pPos->pKey) >= 0) {
// pCur is invalid
SSessionKey pTmpKey = *pWinKey;
int32_t code = getSessionWinResultBuff(pFileState, &pTmpKey, 0, (void**)&pNewPos, pVLen);
@ -336,7 +336,7 @@ void sessionWinStateCleanup(void* pBuff) {
size_t keyLen = 0;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) {
SArray* pWinStates = (SArray*) (*(void**)pIte);
SArray* pWinStates = (SArray*)(*(void**)pIte);
taosArrayDestroy(pWinStates);
}
tSimpleHashCleanup(pBuff);
@ -395,11 +395,13 @@ static void transformCursor(SStreamFileState* pFileState, SStreamStateCur* pCur)
pCur->pStreamFileState = pFileState;
}
static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates, SStreamStateCur** ppCur) {
static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates,
SStreamStateCur** ppCur) {
SSessionKey key = {.groupId = groupId};
int32_t code = streamStateSessionGetKVByCur_rocksdb(*ppCur, &key, NULL, NULL);
if (taosArrayGetSize(pWinStates) > 0 && (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) {
if ( !(*ppCur) ) {
if (taosArrayGetSize(pWinStates) > 0 &&
(code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) {
if (!(*ppCur)) {
(*ppCur) = createStreamStateCursor();
}
transformCursor(pFileState, *ppCur);
@ -467,7 +469,8 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void
} else {
void* pData = NULL;
code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, &pData, pVLen);
if (taosArrayGetSize(pWinStates) > 0 && (code == TSDB_CODE_FAILED || sessionStateKeyCompare(pKey, pWinStates, 0) >= 0)) {
if (taosArrayGetSize(pWinStates) > 0 &&
(code == TSDB_CODE_FAILED || sessionStateKeyCompare(pKey, pWinStates, 0) >= 0)) {
transformCursor(pCur->pStreamFileState, pCur);
SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex);
if (pVal) {
@ -560,7 +563,8 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
code = code_file;
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file);
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
pWinKey->win.ekey, code_file);
} else {
(*pVal) = addNewSessionWindow(pFileState, pWinStates, key);
code = TSDB_CODE_FAILED;
@ -589,7 +593,8 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
if (index + 1 < size) {
pPos = taosArrayGetP(pWinStates, index + 1);
void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen);
if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) || fn(pKeyData, stateKey) == true) {
if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) ||
fn(pKeyData, stateKey) == true) {
(*pVal) = pPos;
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
pPos->beUsed = true;
@ -607,7 +612,8 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
code = code_file;
qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file);
qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
pWinKey->win.ekey, code_file);
goto _end;
} else {
taosMemoryFree(p);

View File

@ -15,10 +15,8 @@
#include "streamSnapshot.h"
#include "query.h"
#include "rocksdb/c.h"
#include "streamBackendRocksdb.h"
#include "streamInt.h"
#include "tcommon.h"
enum SBackendFileType {
ROCKSDB_OPTIONS_TYPE = 1,
@ -158,7 +156,7 @@ void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
}
sprintf(buf + strlen(buf) - 1, "]");
qInfo("%s %" PRId64 "-%" PRId64 " get file list: %s", STREAM_STATE_TRANSFER, pSnapFile->snapInfo.streamId,
stInfo("%s %" PRId64 "-%" PRId64 " get file list: %s", STREAM_STATE_TRANSFER, pSnapFile->snapInfo.streamId,
pSnapFile->snapInfo.taskId, buf);
taosMemoryFree(buf);
}
@ -203,7 +201,7 @@ int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) {
int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
TdDirPtr pDir = taosOpenDir(pSnapFile->path);
if (NULL == pDir) {
qError("%s failed to open %s", STREAM_STATE_TRANSFER, pSnapFile->path);
stError("%s failed to open %s", STREAM_STATE_TRANSFER, pSnapFile->path);
return -1;
}
@ -397,7 +395,7 @@ _NEXT:
}
item = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
qDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64
stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64
", file no.%d, total set:%d, current set idx: %d",
STREAM_STATE_TRANSFER, item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx,
(int)taosArrayGetSize(pHandle->pDbSnapSet), pHandle->currIdx);
@ -515,7 +513,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t
stError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code));
return code;
} else {
qInfo("succ to write data %s", pItem->name);
stInfo("succ to write data %s", pItem->name);
}
pSnapFile->offset += bytes;
} else {
@ -538,7 +536,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t
}
taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset);
qInfo("succ to write data %s", pItem->name);
stInfo("succ to write data %s", pItem->name);
pSnapFile->offset += pHdr->size;
}
code = 0;
@ -563,7 +561,7 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
"checkpoint", snapInfo.chkpId);
if (!taosIsDir(path)) {
code = taosMulMkDir(path);
qInfo("%s mkdir %s", STREAM_STATE_TRANSFER, path);
stInfo("%s mkdir %s", STREAM_STATE_TRANSFER, path);
ASSERT(code == 0);
}

View File

@ -13,8 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "query.h"
#include "tdatablock.h"
#include "tencode.h"
#include "tstreamUpdate.h"
#include "ttime.h"
@ -388,7 +386,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
bool res = true;
if ( pMapMaxTs && ts < *pMapMaxTs ) {
if (pMapMaxTs && ts < *pMapMaxTs) {
res = false;
} else {
taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY));