Merge pull request #24379 from taosdata/fix/addTestCaseToStreamBackend
Fix/add test case to stream backend
This commit is contained in:
commit
3508c9b2f4
|
@ -19,10 +19,7 @@
|
|||
#include "rocksdb/c.h"
|
||||
//#include "streamInt.h"
|
||||
#include "streamState.h"
|
||||
#include "tcoding.h"
|
||||
#include "tcommon.h"
|
||||
#include "tcompare.h"
|
||||
#include "ttimer.h"
|
||||
|
||||
typedef struct SCfComparator {
|
||||
rocksdb_comparator_t** comp;
|
||||
|
|
|
@ -14,8 +14,6 @@
|
|||
*/
|
||||
|
||||
#include "streamBackendRocksdb.h"
|
||||
#include "executor.h"
|
||||
#include "query.h"
|
||||
#include "streamInt.h"
|
||||
#include "tcommon.h"
|
||||
#include "tref.h"
|
||||
|
@ -1059,14 +1057,14 @@ int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) {
|
|||
rocksdb_column_family_handle_t** ppCf = NULL;
|
||||
|
||||
int32_t nCf = chkpGetAllDbCfHandle2(pTaskDb, &ppCf);
|
||||
qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pTaskDb, pChkpIdDir, nCf);
|
||||
stDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pTaskDb, pChkpIdDir, nCf);
|
||||
|
||||
if ((code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf)) == 0) {
|
||||
if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) {
|
||||
stError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir);
|
||||
} else {
|
||||
qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir,
|
||||
taosGetTimestampMs() - st);
|
||||
stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir,
|
||||
taosGetTimestampMs() - st);
|
||||
}
|
||||
} else {
|
||||
stError("stream backend:%p failed to flush db at:%s", pTaskDb, pChkpIdDir);
|
||||
|
@ -1797,7 +1795,7 @@ STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath) {
|
|||
cfNames = NULL;
|
||||
}
|
||||
|
||||
qDebug("succ to init stream backend at %s, backend:%p", dbPath, pTaskDb);
|
||||
stDebug("succ to init stream backend at %s, backend:%p", dbPath, pTaskDb);
|
||||
return pTaskDb;
|
||||
_EXIT:
|
||||
|
||||
|
@ -1826,7 +1824,7 @@ void taskDbDestroy(void* pDb, bool flush) {
|
|||
|
||||
streamMetaRemoveDB(wrapper->pMeta, wrapper->idstr);
|
||||
|
||||
qDebug("succ to destroy stream backend:%p", wrapper);
|
||||
stDebug("succ to destroy stream backend:%p", wrapper);
|
||||
|
||||
int8_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);
|
||||
|
||||
|
@ -2323,7 +2321,7 @@ int streamStateGetCfIdx(SStreamState* pState, const char* funcName) {
|
|||
stError("failed to open cf, %p %s_%s, reason:%s", pState, wrapper->idstr, funcName, err);
|
||||
taosMemoryFree(err);
|
||||
} else {
|
||||
qDebug("succ to open cf, %p %s_%s", pState, wrapper->idstr, funcName);
|
||||
stDebug("succ to open cf, %p %s_%s", pState, wrapper->idstr, funcName);
|
||||
wrapper->pCf[idx] = cf;
|
||||
}
|
||||
}
|
||||
|
@ -2365,14 +2363,14 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
|
|||
char* err = NULL; \
|
||||
int i = streamStateGetCfIdx(pState, funcname); \
|
||||
if (i < 0) { \
|
||||
qWarn("streamState failed to get cf name: %s", funcname); \
|
||||
stWarn("streamState failed to get cf name: %s", funcname); \
|
||||
code = -1; \
|
||||
break; \
|
||||
} \
|
||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
||||
wrapper->dataWritten += 1; \
|
||||
char toString[128] = {0}; \
|
||||
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
||||
if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
||||
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
||||
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
|
||||
rocksdb_writeoptions_t* opts = wrapper->writeOpt; \
|
||||
|
@ -2385,54 +2383,54 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
|
|||
taosMemoryFree(err); \
|
||||
code = -1; \
|
||||
} else { \
|
||||
qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, funcname, vLen, \
|
||||
ttlVLen, wrapper); \
|
||||
stTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, funcname, vLen, \
|
||||
ttlVLen, wrapper); \
|
||||
} \
|
||||
taosMemoryFree(ttlV); \
|
||||
} while (0);
|
||||
|
||||
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
|
||||
do { \
|
||||
code = 0; \
|
||||
char buf[128] = {0}; \
|
||||
char* err = NULL; \
|
||||
int i = streamStateGetCfIdx(pState, funcname); \
|
||||
if (i < 0) { \
|
||||
qWarn("streamState failed to get cf name: %s", funcname); \
|
||||
code = -1; \
|
||||
break; \
|
||||
} \
|
||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
||||
char toString[128] = {0}; \
|
||||
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
||||
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
||||
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
|
||||
rocksdb_t* db = wrapper->db; \
|
||||
rocksdb_readoptions_t* opts = wrapper->readOpt; \
|
||||
size_t len = 0; \
|
||||
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
|
||||
if (val == NULL || len == 0) { \
|
||||
if (err == NULL) { \
|
||||
qTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \
|
||||
} else { \
|
||||
stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
|
||||
taosMemoryFreeClear(err); \
|
||||
} \
|
||||
code = -1; \
|
||||
} else { \
|
||||
char* p = NULL; \
|
||||
int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
|
||||
if (tlen <= 0) { \
|
||||
stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \
|
||||
funcname); \
|
||||
code = -1; \
|
||||
} else { \
|
||||
qTrace("streamState str: %s succ to read from %s_%s, valLen:%d, %p", toString, wrapper->idstr, funcname, tlen, \
|
||||
wrapper); \
|
||||
} \
|
||||
taosMemoryFree(val); \
|
||||
if (vLen != NULL) *vLen = tlen; \
|
||||
} \
|
||||
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
|
||||
do { \
|
||||
code = 0; \
|
||||
char buf[128] = {0}; \
|
||||
char* err = NULL; \
|
||||
int i = streamStateGetCfIdx(pState, funcname); \
|
||||
if (i < 0) { \
|
||||
stWarn("streamState failed to get cf name: %s", funcname); \
|
||||
code = -1; \
|
||||
break; \
|
||||
} \
|
||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
||||
char toString[128] = {0}; \
|
||||
if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
||||
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
||||
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
|
||||
rocksdb_t* db = wrapper->db; \
|
||||
rocksdb_readoptions_t* opts = wrapper->readOpt; \
|
||||
size_t len = 0; \
|
||||
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
|
||||
if (val == NULL || len == 0) { \
|
||||
if (err == NULL) { \
|
||||
stTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \
|
||||
} else { \
|
||||
stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
|
||||
taosMemoryFreeClear(err); \
|
||||
} \
|
||||
code = -1; \
|
||||
} else { \
|
||||
char* p = NULL; \
|
||||
int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
|
||||
if (tlen <= 0) { \
|
||||
stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \
|
||||
funcname); \
|
||||
code = -1; \
|
||||
} else { \
|
||||
stTrace("streamState str: %s succ to read from %s_%s, valLen:%d, %p", toString, wrapper->idstr, funcname, \
|
||||
tlen, wrapper); \
|
||||
} \
|
||||
taosMemoryFree(val); \
|
||||
if (vLen != NULL) *vLen = tlen; \
|
||||
} \
|
||||
} while (0);
|
||||
|
||||
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
|
||||
|
@ -2442,14 +2440,14 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
|
|||
char* err = NULL; \
|
||||
int i = streamStateGetCfIdx(pState, funcname); \
|
||||
if (i < 0) { \
|
||||
qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \
|
||||
stWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \
|
||||
code = -1; \
|
||||
break; \
|
||||
} \
|
||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
||||
wrapper->dataWritten += 1; \
|
||||
char toString[128] = {0}; \
|
||||
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
||||
if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
||||
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
||||
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
|
||||
rocksdb_t* db = wrapper->db; \
|
||||
|
@ -2460,7 +2458,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
|
|||
taosMemoryFree(err); \
|
||||
code = -1; \
|
||||
} else { \
|
||||
qTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \
|
||||
stTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \
|
||||
} \
|
||||
} while (0);
|
||||
|
||||
|
@ -2681,7 +2679,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) {
|
|||
}
|
||||
#ifdef BUILD_NO_CALL
|
||||
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
|
||||
qDebug("streamStateGetCur_rocksdb");
|
||||
stDebug("streamStateGetCur_rocksdb");
|
||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||
|
||||
SStreamStateCur* pCur = createStreamStateCursor();
|
||||
|
@ -2775,7 +2773,7 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k
|
|||
}
|
||||
|
||||
SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState) {
|
||||
qDebug("streamStateSessionSeekToLast_rocksdb");
|
||||
stDebug("streamStateSessionSeekToLast_rocksdb");
|
||||
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -2812,7 +2810,7 @@ SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState) {
|
|||
}
|
||||
|
||||
int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur) {
|
||||
qDebug("streamStateCurPrev_rocksdb");
|
||||
stDebug("streamStateCurPrev_rocksdb");
|
||||
if (!pCur) return -1;
|
||||
|
||||
rocksdb_iter_prev(pCur->iter);
|
||||
|
@ -2862,7 +2860,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
|
|||
return pCur;
|
||||
}
|
||||
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) {
|
||||
qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb");
|
||||
stDebug("streamStateSessionSeekKeyCurrentNext_rocksdb");
|
||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||
SStreamStateCur* pCur = createStreamStateCursor();
|
||||
if (pCur == NULL) {
|
||||
|
@ -2900,7 +2898,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
|
|||
}
|
||||
|
||||
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
|
||||
qDebug("streamStateSessionSeekKeyNext_rocksdb");
|
||||
stDebug("streamStateSessionSeekKeyNext_rocksdb");
|
||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||
SStreamStateCur* pCur = createStreamStateCursor();
|
||||
if (pCur == NULL) {
|
||||
|
@ -3004,7 +3002,7 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
|
|||
}
|
||||
|
||||
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
|
||||
qDebug("streamStateFillGetCur_rocksdb");
|
||||
stDebug("streamStateFillGetCur_rocksdb");
|
||||
SStreamStateCur* pCur = createStreamStateCursor();
|
||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||
|
||||
|
@ -3064,7 +3062,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
|
|||
}
|
||||
|
||||
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
|
||||
qDebug("streamStateFillSeekKeyNext_rocksdb");
|
||||
stDebug("streamStateFillSeekKeyNext_rocksdb");
|
||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||
SStreamStateCur* pCur = createStreamStateCursor();
|
||||
if (!pCur) {
|
||||
|
@ -3102,7 +3100,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
|
|||
return NULL;
|
||||
}
|
||||
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
|
||||
qDebug("streamStateFillSeekKeyPrev_rocksdb");
|
||||
stDebug("streamStateFillSeekKeyPrev_rocksdb");
|
||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||
SStreamStateCur* pCur = createStreamStateCursor();
|
||||
if (pCur == NULL) {
|
||||
|
|
|
@ -201,7 +201,7 @@ int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) {
|
|||
void* key = taosHashGetKey(pIter, NULL);
|
||||
code = streamStateCvtDataFormat(pMeta->path, key, *(void**)pIter);
|
||||
if (code != 0) {
|
||||
qError("failed to cvt data");
|
||||
stError("failed to cvt data");
|
||||
goto _EXIT;
|
||||
}
|
||||
|
||||
|
@ -225,11 +225,11 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
|
|||
if (compatible == STREAM_STATA_COMPATIBLE) {
|
||||
return 0;
|
||||
} else if (compatible == STREAM_STATA_NEED_CONVERT) {
|
||||
qInfo("stream state need covert backend format");
|
||||
stInfo("stream state need covert backend format");
|
||||
|
||||
return streamMetaCvtDbFormat(pMeta);
|
||||
} else if (compatible == STREAM_STATA_NO_COMPATIBLE) {
|
||||
qError(
|
||||
stError(
|
||||
"stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream "
|
||||
"manually",
|
||||
tsDataDir);
|
||||
|
|
|
@ -17,17 +17,14 @@
|
|||
|
||||
#include "query.h"
|
||||
#include "streamBackendRocksdb.h"
|
||||
#include "taos.h"
|
||||
#include "tcommon.h"
|
||||
#include "thash.h"
|
||||
#include "tsimplehash.h"
|
||||
|
||||
|
||||
typedef int (*__session_compare_fn_t) (const SSessionKey* pWin, const void* pDatas, int pos);
|
||||
typedef int (*__session_compare_fn_t)(const SSessionKey* pWin, const void* pDatas, int pos);
|
||||
|
||||
int sessionStateKeyCompare(const SSessionKey* pWin1, const void* pDatas, int pos) {
|
||||
SRowBuffPos* pPos2 = taosArrayGetP(pDatas, pos);
|
||||
SSessionKey* pWin2 = (SSessionKey*) pPos2->pKey;
|
||||
SSessionKey* pWin2 = (SSessionKey*)pPos2->pKey;
|
||||
return sessionWinKeyCmpr(pWin1, pWin2);
|
||||
}
|
||||
|
||||
|
@ -61,12 +58,12 @@ int32_t binarySearch(void* keyList, int num, const void* key, __session_compare_
|
|||
int64_t getSessionWindowEndkey(void* data, int32_t index) {
|
||||
SArray* pWinInfos = (SArray*)data;
|
||||
SRowBuffPos** ppos = taosArrayGet(pWinInfos, index);
|
||||
SSessionKey* pWin = (SSessionKey*)((*ppos)->pKey);
|
||||
SSessionKey* pWin = (SSessionKey*)((*ppos)->pKey);
|
||||
return pWin->win.ekey;
|
||||
}
|
||||
|
||||
bool inSessionWindow(SSessionKey* pKey, TSKEY ts, int64_t gap) {
|
||||
if (ts + gap >= pKey->win.skey && ts - gap <= pKey->win.ekey) {
|
||||
if (ts + gap >= pKey->win.skey && ts - gap <= pKey->win.ekey) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
@ -80,7 +77,8 @@ static SRowBuffPos* addNewSessionWindow(SStreamFileState* pFileState, SArray* pW
|
|||
return pNewPos;
|
||||
}
|
||||
|
||||
static SRowBuffPos* insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey, int32_t index) {
|
||||
static SRowBuffPos* insertNewSessionWindow(SStreamFileState* pFileState, SArray* pWinInfos, const SSessionKey* pKey,
|
||||
int32_t index) {
|
||||
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
|
||||
ASSERT(pNewPos->pRowBuff);
|
||||
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
|
||||
|
@ -97,11 +95,12 @@ SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKe
|
|||
return pNewPos;
|
||||
}
|
||||
|
||||
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal, int32_t* pVLen) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, TSKEY gap, void** pVal,
|
||||
int32_t* pVLen) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
|
||||
SArray* pWinStates = NULL;
|
||||
void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
|
||||
SArray* pWinStates = NULL;
|
||||
void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
|
||||
if (ppBuff) {
|
||||
pWinStates = (SArray*)(*ppBuff);
|
||||
} else {
|
||||
|
@ -146,7 +145,7 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey,
|
|||
|
||||
if (index + 1 < size) {
|
||||
pPos = taosArrayGetP(pWinStates, index + 1);
|
||||
if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) ) {
|
||||
if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap))) {
|
||||
(*pVal) = pPos;
|
||||
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
|
||||
pPos->beUsed = true;
|
||||
|
@ -157,9 +156,9 @@ int32_t getSessionWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey,
|
|||
|
||||
if (index + 1 == 0) {
|
||||
if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, gap)) {
|
||||
void* p = NULL;
|
||||
void* pFileStore = getStateFileStore(pFileState);
|
||||
int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
|
||||
void* p = NULL;
|
||||
void* pFileStore = getStateFileStore(pFileState);
|
||||
int32_t code_file = streamStateSessionAddIfNotExist_rocksdb(pFileStore, pKey, gap, &p, pVLen);
|
||||
if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
|
||||
(*pVal) = createSessionWinBuff(pFileState, pKey, p, pVLen);
|
||||
code = code_file;
|
||||
|
@ -184,10 +183,10 @@ _end:
|
|||
}
|
||||
|
||||
int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos) {
|
||||
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
|
||||
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
|
||||
SSessionKey* pKey = pPos->pKey;
|
||||
SArray* pWinStates = NULL;
|
||||
void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
|
||||
SArray* pWinStates = NULL;
|
||||
void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
|
||||
if (ppBuff) {
|
||||
pWinStates = (SArray*)(*ppBuff);
|
||||
} else {
|
||||
|
@ -202,7 +201,7 @@ int32_t putSessionWinResultBuff(SStreamFileState* pFileState, SRowBuffPos* pPos)
|
|||
}
|
||||
|
||||
// find the first position which is smaller than the pKey
|
||||
int32_t index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);
|
||||
int32_t index = binarySearch(pWinStates, size, pKey, sessionStateKeyCompare);
|
||||
if (index >= 0) {
|
||||
taosArrayInsert(pWinStates, index, &pPos);
|
||||
} else {
|
||||
|
@ -218,7 +217,7 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v
|
|||
SRowBuffPos* pNewPos = getNewRowPosForWrite(pFileState);
|
||||
memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey));
|
||||
pNewPos->needFree = true;
|
||||
void* pBuff = NULL;
|
||||
void* pBuff = NULL;
|
||||
int32_t code = streamStateSessionGet_rocksdb(getStateFileStore(pFileState), pKey, &pBuff, pVLen);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
|
@ -229,16 +228,16 @@ int32_t getSessionFlushedBuff(SStreamFileState* pFileState, SSessionKey* pKey, v
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t deleteSessionWinStateBuffFn(void* pBuff, const void *key, size_t keyLen) {
|
||||
SSHashObj* pSessionBuff = (SSHashObj*) pBuff;
|
||||
SSessionKey* pWinKey = (SSessionKey*) key;
|
||||
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
|
||||
int32_t deleteSessionWinStateBuffFn(void* pBuff, const void* key, size_t keyLen) {
|
||||
SSHashObj* pSessionBuff = (SSHashObj*)pBuff;
|
||||
SSessionKey* pWinKey = (SSessionKey*)key;
|
||||
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
|
||||
if (!ppBuff) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
SArray* pWinStates = (SArray*)(*ppBuff);
|
||||
int32_t size = taosArrayGetSize(pWinStates);
|
||||
TSKEY gap = 0;
|
||||
TSKEY gap = 0;
|
||||
int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
|
||||
if (index >= 0) {
|
||||
SRowBuffPos* pPos = taosArrayGetP(pWinStates, index);
|
||||
|
@ -251,15 +250,15 @@ int32_t deleteSessionWinStateBuffFn(void* pBuff, const void *key, size_t keyLen)
|
|||
}
|
||||
|
||||
int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
|
||||
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
|
||||
SSessionKey* pWinKey = (SSessionKey*) pPos->pKey;
|
||||
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
|
||||
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
|
||||
SSessionKey* pWinKey = (SSessionKey*)pPos->pKey;
|
||||
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
|
||||
if (!ppBuff) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
SArray* pWinStates = (SArray*)(*ppBuff);
|
||||
int32_t size = taosArrayGetSize(pWinStates);
|
||||
TSKEY gap = 0;
|
||||
TSKEY gap = 0;
|
||||
int32_t index = binarySearch(pWinStates, size, pWinKey, sessionStateKeyCompare);
|
||||
if (index >= 0) {
|
||||
SRowBuffPos* pItemPos = taosArrayGetP(pWinStates, index);
|
||||
|
@ -270,11 +269,12 @@ int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffP
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur, const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen) {
|
||||
int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStreamStateCur* pCur,
|
||||
const SSessionKey* pWinKey, void** ppVal, int32_t* pVLen) {
|
||||
SRowBuffPos* pNewPos = NULL;
|
||||
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
|
||||
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
|
||||
SArray* pWinStates = NULL;
|
||||
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
|
||||
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
|
||||
SArray* pWinStates = NULL;
|
||||
if (!ppBuff) {
|
||||
pWinStates = taosArrayInit(16, POINTER_BYTES);
|
||||
tSimpleHashPut(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t), &pWinStates, POINTER_BYTES);
|
||||
|
@ -297,10 +297,10 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream
|
|||
} else {
|
||||
if (size > 0) {
|
||||
SRowBuffPos* pPos = taosArrayGetP(pWinStates, 0);
|
||||
if (sessionWinKeyCmpr(pWinKey, pPos->pKey) >=0) {
|
||||
if (sessionWinKeyCmpr(pWinKey, pPos->pKey) >= 0) {
|
||||
// pCur is invalid
|
||||
SSessionKey pTmpKey = *pWinKey;
|
||||
int32_t code = getSessionWinResultBuff(pFileState, &pTmpKey, 0, (void**)&pNewPos, pVLen);
|
||||
int32_t code = getSessionWinResultBuff(pFileState, &pTmpKey, 0, (void**)&pNewPos, pVLen);
|
||||
ASSERT(code == TSDB_CODE_FAILED);
|
||||
goto _end;
|
||||
}
|
||||
|
@ -320,7 +320,7 @@ void sessionWinStateClear(SStreamFileState* pFileState) {
|
|||
void* pIte = NULL;
|
||||
size_t keyLen = 0;
|
||||
int32_t iter = 0;
|
||||
void* pBuff = getRowStateBuff(pFileState);
|
||||
void* pBuff = getRowStateBuff(pFileState);
|
||||
while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) {
|
||||
SArray* pWinStates = *((void**)pIte);
|
||||
int32_t size = taosArrayGetSize(pWinStates);
|
||||
|
@ -336,7 +336,7 @@ void sessionWinStateCleanup(void* pBuff) {
|
|||
size_t keyLen = 0;
|
||||
int32_t iter = 0;
|
||||
while ((pIte = tSimpleHashIterate(pBuff, pIte, &iter)) != NULL) {
|
||||
SArray* pWinStates = (SArray*) (*(void**)pIte);
|
||||
SArray* pWinStates = (SArray*)(*(void**)pIte);
|
||||
taosArrayDestroy(pWinStates);
|
||||
}
|
||||
tSimpleHashCleanup(pBuff);
|
||||
|
@ -395,11 +395,13 @@ static void transformCursor(SStreamFileState* pFileState, SStreamStateCur* pCur)
|
|||
pCur->pStreamFileState = pFileState;
|
||||
}
|
||||
|
||||
static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates, SStreamStateCur** ppCur) {
|
||||
static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates,
|
||||
SStreamStateCur** ppCur) {
|
||||
SSessionKey key = {.groupId = groupId};
|
||||
int32_t code = streamStateSessionGetKVByCur_rocksdb(*ppCur, &key, NULL, NULL);
|
||||
if (taosArrayGetSize(pWinStates) > 0 && (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) {
|
||||
if ( !(*ppCur) ) {
|
||||
int32_t code = streamStateSessionGetKVByCur_rocksdb(*ppCur, &key, NULL, NULL);
|
||||
if (taosArrayGetSize(pWinStates) > 0 &&
|
||||
(code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) {
|
||||
if (!(*ppCur)) {
|
||||
(*ppCur) = createStreamStateCursor();
|
||||
}
|
||||
transformCursor(pFileState, *ppCur);
|
||||
|
@ -410,8 +412,8 @@ static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t
|
|||
}
|
||||
|
||||
SStreamStateCur* sessionWinStateSeekKeyCurrentNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
|
||||
SArray* pWinStates = NULL;
|
||||
int32_t index = -1;
|
||||
SArray* pWinStates = NULL;
|
||||
int32_t index = -1;
|
||||
SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
|
||||
if (pCur) {
|
||||
if (sessionStateKeyCompare(pWinKey, pWinStates, index) > 0) {
|
||||
|
@ -427,8 +429,8 @@ SStreamStateCur* sessionWinStateSeekKeyCurrentNext(SStreamFileState* pFileState,
|
|||
}
|
||||
|
||||
SStreamStateCur* sessionWinStateSeekKeyNext(SStreamFileState* pFileState, const SSessionKey* pWinKey) {
|
||||
SArray* pWinStates = NULL;
|
||||
int32_t index = -1;
|
||||
SArray* pWinStates = NULL;
|
||||
int32_t index = -1;
|
||||
SStreamStateCur* pCur = seekKeyCurrentPrev_buff(pFileState, pWinKey, &pWinStates, &index);
|
||||
if (pCur) {
|
||||
sessionWinStateMoveToNext(pCur);
|
||||
|
@ -449,7 +451,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void
|
|||
|
||||
SSHashObj* pSessionBuff = getRowStateBuff(pCur->pStreamFileState);
|
||||
void** ppBuff = tSimpleHashGet(pSessionBuff, &pKey->groupId, sizeof(uint64_t));
|
||||
SArray* pWinStates = NULL;
|
||||
SArray* pWinStates = NULL;
|
||||
if (ppBuff) {
|
||||
pWinStates = (SArray*)(*ppBuff);
|
||||
}
|
||||
|
@ -467,7 +469,8 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void
|
|||
} else {
|
||||
void* pData = NULL;
|
||||
code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, &pData, pVLen);
|
||||
if (taosArrayGetSize(pWinStates) > 0 && (code == TSDB_CODE_FAILED || sessionStateKeyCompare(pKey, pWinStates, 0) >= 0)) {
|
||||
if (taosArrayGetSize(pWinStates) > 0 &&
|
||||
(code == TSDB_CODE_FAILED || sessionStateKeyCompare(pKey, pWinStates, 0) >= 0)) {
|
||||
transformCursor(pCur->pStreamFileState, pCur);
|
||||
SRowBuffPos* pPos = taosArrayGetP(pWinStates, pCur->buffIndex);
|
||||
if (pVal) {
|
||||
|
@ -498,9 +501,9 @@ int32_t sessionWinStateMoveToNext(SStreamStateCur* pCur) {
|
|||
|
||||
int32_t sessionWinStateGetKeyByRange(SStreamFileState* pFileState, const SSessionKey* key, SSessionKey* curKey) {
|
||||
SStreamStateCur* pCur = sessionWinStateSeekKeyCurrentPrev(pFileState, key);
|
||||
SSessionKey tmpKey = *key;
|
||||
int32_t code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
|
||||
bool hasCurrentPrev = true;
|
||||
SSessionKey tmpKey = *key;
|
||||
int32_t code = sessionWinStateGetKVByCur(pCur, &tmpKey, NULL, NULL);
|
||||
bool hasCurrentPrev = true;
|
||||
if (code == TSDB_CODE_FAILED) {
|
||||
streamStateFreeCur(pCur);
|
||||
pCur = sessionWinStateSeekKeyNext(pFileState, key);
|
||||
|
@ -535,13 +538,13 @@ _end:
|
|||
}
|
||||
|
||||
int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
|
||||
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
|
||||
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
|
||||
SSessionKey* pWinKey = key;
|
||||
TSKEY gap = 0;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
|
||||
SArray* pWinStates = NULL;
|
||||
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
|
||||
TSKEY gap = 0;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SSHashObj* pSessionBuff = getRowStateBuff(pFileState);
|
||||
SArray* pWinStates = NULL;
|
||||
void** ppBuff = tSimpleHashGet(pSessionBuff, &pWinKey->groupId, sizeof(uint64_t));
|
||||
if (ppBuff) {
|
||||
pWinStates = (SArray*)(*ppBuff);
|
||||
} else {
|
||||
|
@ -560,7 +563,8 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
|
|||
if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
|
||||
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
|
||||
code = code_file;
|
||||
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file);
|
||||
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
|
||||
pWinKey->win.ekey, code_file);
|
||||
} else {
|
||||
(*pVal) = addNewSessionWindow(pFileState, pWinStates, key);
|
||||
code = TSDB_CODE_FAILED;
|
||||
|
@ -589,7 +593,8 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
|
|||
if (index + 1 < size) {
|
||||
pPos = taosArrayGetP(pWinStates, index + 1);
|
||||
void* stateKey = (char*)(pPos->pRowBuff) + (valSize - keyDataLen);
|
||||
if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) || fn(pKeyData, stateKey) == true) {
|
||||
if (inSessionWindow(pPos->pKey, startTs, gap) || (endTs != INT64_MIN && inSessionWindow(pPos->pKey, endTs, gap)) ||
|
||||
fn(pKeyData, stateKey) == true) {
|
||||
(*pVal) = pPos;
|
||||
SSessionKey* pDestWinKey = (SSessionKey*)pPos->pKey;
|
||||
pPos->beUsed = true;
|
||||
|
@ -607,7 +612,8 @@ int32_t getStateWinResultBuff(SStreamFileState* pFileState, SSessionKey* key, ch
|
|||
if (code_file == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
|
||||
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen);
|
||||
code = code_file;
|
||||
qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file);
|
||||
qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
|
||||
pWinKey->win.ekey, code_file);
|
||||
goto _end;
|
||||
} else {
|
||||
taosMemoryFree(p);
|
||||
|
|
|
@ -15,10 +15,8 @@
|
|||
|
||||
#include "streamSnapshot.h"
|
||||
#include "query.h"
|
||||
#include "rocksdb/c.h"
|
||||
#include "streamBackendRocksdb.h"
|
||||
#include "streamInt.h"
|
||||
#include "tcommon.h"
|
||||
|
||||
enum SBackendFileType {
|
||||
ROCKSDB_OPTIONS_TYPE = 1,
|
||||
|
@ -158,7 +156,7 @@ void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) {
|
|||
}
|
||||
sprintf(buf + strlen(buf) - 1, "]");
|
||||
|
||||
qInfo("%s %" PRId64 "-%" PRId64 " get file list: %s", STREAM_STATE_TRANSFER, pSnapFile->snapInfo.streamId,
|
||||
stInfo("%s %" PRId64 "-%" PRId64 " get file list: %s", STREAM_STATE_TRANSFER, pSnapFile->snapInfo.streamId,
|
||||
pSnapFile->snapInfo.taskId, buf);
|
||||
taosMemoryFree(buf);
|
||||
}
|
||||
|
@ -203,7 +201,7 @@ int32_t snapFileGenMeta(SBackendSnapFile2* pSnapFile) {
|
|||
int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) {
|
||||
TdDirPtr pDir = taosOpenDir(pSnapFile->path);
|
||||
if (NULL == pDir) {
|
||||
qError("%s failed to open %s", STREAM_STATE_TRANSFER, pSnapFile->path);
|
||||
stError("%s failed to open %s", STREAM_STATE_TRANSFER, pSnapFile->path);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -397,7 +395,7 @@ _NEXT:
|
|||
}
|
||||
item = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx);
|
||||
|
||||
qDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64
|
||||
stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64
|
||||
", file no.%d, total set:%d, current set idx: %d",
|
||||
STREAM_STATE_TRANSFER, item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx,
|
||||
(int)taosArrayGetSize(pHandle->pDbSnapSet), pHandle->currIdx);
|
||||
|
@ -515,7 +513,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t
|
|||
stError("%s failed to write snap, file name:%s, reason:%s", STREAM_STATE_TRANSFER, pHdr->name, tstrerror(code));
|
||||
return code;
|
||||
} else {
|
||||
qInfo("succ to write data %s", pItem->name);
|
||||
stInfo("succ to write data %s", pItem->name);
|
||||
}
|
||||
pSnapFile->offset += bytes;
|
||||
} else {
|
||||
|
@ -538,7 +536,7 @@ int32_t streamSnapWriteImpl(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t
|
|||
}
|
||||
|
||||
taosPWriteFile(pSnapFile->fd, pHdr->data, pHdr->size, pSnapFile->offset);
|
||||
qInfo("succ to write data %s", pItem->name);
|
||||
stInfo("succ to write data %s", pItem->name);
|
||||
pSnapFile->offset += pHdr->size;
|
||||
}
|
||||
code = 0;
|
||||
|
@ -563,7 +561,7 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
|
|||
"checkpoint", snapInfo.chkpId);
|
||||
if (!taosIsDir(path)) {
|
||||
code = taosMulMkDir(path);
|
||||
qInfo("%s mkdir %s", STREAM_STATE_TRANSFER, path);
|
||||
stInfo("%s mkdir %s", STREAM_STATE_TRANSFER, path);
|
||||
ASSERT(code == 0);
|
||||
}
|
||||
|
||||
|
|
|
@ -13,8 +13,6 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "query.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tencode.h"
|
||||
#include "tstreamUpdate.h"
|
||||
#include "ttime.h"
|
||||
|
@ -387,8 +385,8 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) {
|
|||
|
||||
bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
|
||||
TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t));
|
||||
bool res = true;
|
||||
if ( pMapMaxTs && ts < *pMapMaxTs ) {
|
||||
bool res = true;
|
||||
if (pMapMaxTs && ts < *pMapMaxTs) {
|
||||
res = false;
|
||||
} else {
|
||||
taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), &ts, sizeof(TSKEY));
|
||||
|
|
Loading…
Reference in New Issue