refactor stream code
This commit is contained in:
parent
003e4a886d
commit
1c28f53525
|
@ -19,10 +19,7 @@
|
||||||
#include "rocksdb/c.h"
|
#include "rocksdb/c.h"
|
||||||
//#include "streamInt.h"
|
//#include "streamInt.h"
|
||||||
#include "streamState.h"
|
#include "streamState.h"
|
||||||
#include "tcoding.h"
|
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
#include "tcompare.h"
|
|
||||||
#include "ttimer.h"
|
|
||||||
|
|
||||||
typedef struct SCfComparator {
|
typedef struct SCfComparator {
|
||||||
rocksdb_comparator_t** comp;
|
rocksdb_comparator_t** comp;
|
||||||
|
|
|
@ -14,8 +14,6 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "streamBackendRocksdb.h"
|
#include "streamBackendRocksdb.h"
|
||||||
#include "executor.h"
|
|
||||||
#include "query.h"
|
|
||||||
#include "streamInt.h"
|
#include "streamInt.h"
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
#include "tref.h"
|
#include "tref.h"
|
||||||
|
@ -1059,14 +1057,14 @@ int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId) {
|
||||||
rocksdb_column_family_handle_t** ppCf = NULL;
|
rocksdb_column_family_handle_t** ppCf = NULL;
|
||||||
|
|
||||||
int32_t nCf = chkpGetAllDbCfHandle2(pTaskDb, &ppCf);
|
int32_t nCf = chkpGetAllDbCfHandle2(pTaskDb, &ppCf);
|
||||||
qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pTaskDb, pChkpIdDir, nCf);
|
stDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pTaskDb, pChkpIdDir, nCf);
|
||||||
|
|
||||||
if ((code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf)) == 0) {
|
if ((code = chkpPreFlushDb(pTaskDb->db, ppCf, nCf)) == 0) {
|
||||||
if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) {
|
if ((code = chkpDoDbCheckpoint(pTaskDb->db, pChkpIdDir)) != 0) {
|
||||||
stError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir);
|
stError("stream backend:%p failed to do checkpoint at:%s", pTaskDb, pChkpIdDir);
|
||||||
} else {
|
} else {
|
||||||
qDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir,
|
stDebug("stream backend:%p end to do checkpoint at:%s, time cost:%" PRId64 "ms", pTaskDb, pChkpIdDir,
|
||||||
taosGetTimestampMs() - st);
|
taosGetTimestampMs() - st);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
stError("stream backend:%p failed to flush db at:%s", pTaskDb, pChkpIdDir);
|
stError("stream backend:%p failed to flush db at:%s", pTaskDb, pChkpIdDir);
|
||||||
|
@ -1797,7 +1795,7 @@ STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath) {
|
||||||
cfNames = NULL;
|
cfNames = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("succ to init stream backend at %s, backend:%p", dbPath, pTaskDb);
|
stDebug("succ to init stream backend at %s, backend:%p", dbPath, pTaskDb);
|
||||||
return pTaskDb;
|
return pTaskDb;
|
||||||
_EXIT:
|
_EXIT:
|
||||||
|
|
||||||
|
@ -1826,7 +1824,7 @@ void taskDbDestroy(void* pDb, bool flush) {
|
||||||
|
|
||||||
streamMetaRemoveDB(wrapper->pMeta, wrapper->idstr);
|
streamMetaRemoveDB(wrapper->pMeta, wrapper->idstr);
|
||||||
|
|
||||||
qDebug("succ to destroy stream backend:%p", wrapper);
|
stDebug("succ to destroy stream backend:%p", wrapper);
|
||||||
|
|
||||||
int8_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);
|
int8_t nCf = sizeof(ginitDict) / sizeof(ginitDict[0]);
|
||||||
|
|
||||||
|
@ -2323,7 +2321,7 @@ int streamStateGetCfIdx(SStreamState* pState, const char* funcName) {
|
||||||
stError("failed to open cf, %p %s_%s, reason:%s", pState, wrapper->idstr, funcName, err);
|
stError("failed to open cf, %p %s_%s, reason:%s", pState, wrapper->idstr, funcName, err);
|
||||||
taosMemoryFree(err);
|
taosMemoryFree(err);
|
||||||
} else {
|
} else {
|
||||||
qDebug("succ to open cf, %p %s_%s", pState, wrapper->idstr, funcName);
|
stDebug("succ to open cf, %p %s_%s", pState, wrapper->idstr, funcName);
|
||||||
wrapper->pCf[idx] = cf;
|
wrapper->pCf[idx] = cf;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2365,14 +2363,14 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
|
||||||
char* err = NULL; \
|
char* err = NULL; \
|
||||||
int i = streamStateGetCfIdx(pState, funcname); \
|
int i = streamStateGetCfIdx(pState, funcname); \
|
||||||
if (i < 0) { \
|
if (i < 0) { \
|
||||||
qWarn("streamState failed to get cf name: %s", funcname); \
|
stWarn("streamState failed to get cf name: %s", funcname); \
|
||||||
code = -1; \
|
code = -1; \
|
||||||
break; \
|
break; \
|
||||||
} \
|
} \
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
||||||
wrapper->dataWritten += 1; \
|
wrapper->dataWritten += 1; \
|
||||||
char toString[128] = {0}; \
|
char toString[128] = {0}; \
|
||||||
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
||||||
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
||||||
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
|
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
|
||||||
rocksdb_writeoptions_t* opts = wrapper->writeOpt; \
|
rocksdb_writeoptions_t* opts = wrapper->writeOpt; \
|
||||||
|
@ -2385,54 +2383,54 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
|
||||||
taosMemoryFree(err); \
|
taosMemoryFree(err); \
|
||||||
code = -1; \
|
code = -1; \
|
||||||
} else { \
|
} else { \
|
||||||
qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, funcname, vLen, \
|
stTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, funcname, vLen, \
|
||||||
ttlVLen, wrapper); \
|
ttlVLen, wrapper); \
|
||||||
} \
|
} \
|
||||||
taosMemoryFree(ttlV); \
|
taosMemoryFree(ttlV); \
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
|
#define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \
|
||||||
do { \
|
do { \
|
||||||
code = 0; \
|
code = 0; \
|
||||||
char buf[128] = {0}; \
|
char buf[128] = {0}; \
|
||||||
char* err = NULL; \
|
char* err = NULL; \
|
||||||
int i = streamStateGetCfIdx(pState, funcname); \
|
int i = streamStateGetCfIdx(pState, funcname); \
|
||||||
if (i < 0) { \
|
if (i < 0) { \
|
||||||
qWarn("streamState failed to get cf name: %s", funcname); \
|
stWarn("streamState failed to get cf name: %s", funcname); \
|
||||||
code = -1; \
|
code = -1; \
|
||||||
break; \
|
break; \
|
||||||
} \
|
} \
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
||||||
char toString[128] = {0}; \
|
char toString[128] = {0}; \
|
||||||
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
||||||
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
||||||
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
|
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
|
||||||
rocksdb_t* db = wrapper->db; \
|
rocksdb_t* db = wrapper->db; \
|
||||||
rocksdb_readoptions_t* opts = wrapper->readOpt; \
|
rocksdb_readoptions_t* opts = wrapper->readOpt; \
|
||||||
size_t len = 0; \
|
size_t len = 0; \
|
||||||
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
|
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
|
||||||
if (val == NULL || len == 0) { \
|
if (val == NULL || len == 0) { \
|
||||||
if (err == NULL) { \
|
if (err == NULL) { \
|
||||||
qTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \
|
stTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \
|
||||||
} else { \
|
} else { \
|
||||||
stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
|
stError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
|
||||||
taosMemoryFreeClear(err); \
|
taosMemoryFreeClear(err); \
|
||||||
} \
|
} \
|
||||||
code = -1; \
|
code = -1; \
|
||||||
} else { \
|
} else { \
|
||||||
char* p = NULL; \
|
char* p = NULL; \
|
||||||
int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
|
int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
|
||||||
if (tlen <= 0) { \
|
if (tlen <= 0) { \
|
||||||
stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \
|
stError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \
|
||||||
funcname); \
|
funcname); \
|
||||||
code = -1; \
|
code = -1; \
|
||||||
} else { \
|
} else { \
|
||||||
qTrace("streamState str: %s succ to read from %s_%s, valLen:%d, %p", toString, wrapper->idstr, funcname, tlen, \
|
stTrace("streamState str: %s succ to read from %s_%s, valLen:%d, %p", toString, wrapper->idstr, funcname, \
|
||||||
wrapper); \
|
tlen, wrapper); \
|
||||||
} \
|
} \
|
||||||
taosMemoryFree(val); \
|
taosMemoryFree(val); \
|
||||||
if (vLen != NULL) *vLen = tlen; \
|
if (vLen != NULL) *vLen = tlen; \
|
||||||
} \
|
} \
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
|
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
|
||||||
|
@ -2442,14 +2440,14 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
|
||||||
char* err = NULL; \
|
char* err = NULL; \
|
||||||
int i = streamStateGetCfIdx(pState, funcname); \
|
int i = streamStateGetCfIdx(pState, funcname); \
|
||||||
if (i < 0) { \
|
if (i < 0) { \
|
||||||
qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \
|
stWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \
|
||||||
code = -1; \
|
code = -1; \
|
||||||
break; \
|
break; \
|
||||||
} \
|
} \
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \
|
||||||
wrapper->dataWritten += 1; \
|
wrapper->dataWritten += 1; \
|
||||||
char toString[128] = {0}; \
|
char toString[128] = {0}; \
|
||||||
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
if (stDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
|
||||||
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
|
||||||
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
|
rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \
|
||||||
rocksdb_t* db = wrapper->db; \
|
rocksdb_t* db = wrapper->db; \
|
||||||
|
@ -2460,7 +2458,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe
|
||||||
taosMemoryFree(err); \
|
taosMemoryFree(err); \
|
||||||
code = -1; \
|
code = -1; \
|
||||||
} else { \
|
} else { \
|
||||||
qTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \
|
stTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \
|
||||||
} \
|
} \
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
|
@ -2681,7 +2679,7 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState) {
|
||||||
}
|
}
|
||||||
#ifdef BUILD_NO_CALL
|
#ifdef BUILD_NO_CALL
|
||||||
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
|
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
|
||||||
qDebug("streamStateGetCur_rocksdb");
|
stDebug("streamStateGetCur_rocksdb");
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||||
|
|
||||||
SStreamStateCur* pCur = createStreamStateCursor();
|
SStreamStateCur* pCur = createStreamStateCursor();
|
||||||
|
@ -2775,7 +2773,7 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState) {
|
SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState) {
|
||||||
qDebug("streamStateSessionSeekToLast_rocksdb");
|
stDebug("streamStateSessionSeekToLast_rocksdb");
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -2812,7 +2810,7 @@ SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur) {
|
int32_t streamStateSessionCurPrev_rocksdb(SStreamStateCur* pCur) {
|
||||||
qDebug("streamStateCurPrev_rocksdb");
|
stDebug("streamStateCurPrev_rocksdb");
|
||||||
if (!pCur) return -1;
|
if (!pCur) return -1;
|
||||||
|
|
||||||
rocksdb_iter_prev(pCur->iter);
|
rocksdb_iter_prev(pCur->iter);
|
||||||
|
@ -2862,7 +2860,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
|
||||||
return pCur;
|
return pCur;
|
||||||
}
|
}
|
||||||
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) {
|
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) {
|
||||||
qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb");
|
stDebug("streamStateSessionSeekKeyCurrentNext_rocksdb");
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||||
SStreamStateCur* pCur = createStreamStateCursor();
|
SStreamStateCur* pCur = createStreamStateCursor();
|
||||||
if (pCur == NULL) {
|
if (pCur == NULL) {
|
||||||
|
@ -2900,7 +2898,7 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
|
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
|
||||||
qDebug("streamStateSessionSeekKeyNext_rocksdb");
|
stDebug("streamStateSessionSeekKeyNext_rocksdb");
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||||
SStreamStateCur* pCur = createStreamStateCursor();
|
SStreamStateCur* pCur = createStreamStateCursor();
|
||||||
if (pCur == NULL) {
|
if (pCur == NULL) {
|
||||||
|
@ -3004,7 +3002,7 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
|
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
|
||||||
qDebug("streamStateFillGetCur_rocksdb");
|
stDebug("streamStateFillGetCur_rocksdb");
|
||||||
SStreamStateCur* pCur = createStreamStateCursor();
|
SStreamStateCur* pCur = createStreamStateCursor();
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||||
|
|
||||||
|
@ -3064,7 +3062,7 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
|
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
|
||||||
qDebug("streamStateFillSeekKeyNext_rocksdb");
|
stDebug("streamStateFillSeekKeyNext_rocksdb");
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||||
SStreamStateCur* pCur = createStreamStateCursor();
|
SStreamStateCur* pCur = createStreamStateCursor();
|
||||||
if (!pCur) {
|
if (!pCur) {
|
||||||
|
@ -3102,7 +3100,7 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
|
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
|
||||||
qDebug("streamStateFillSeekKeyPrev_rocksdb");
|
stDebug("streamStateFillSeekKeyPrev_rocksdb");
|
||||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||||
SStreamStateCur* pCur = createStreamStateCursor();
|
SStreamStateCur* pCur = createStreamStateCursor();
|
||||||
if (pCur == NULL) {
|
if (pCur == NULL) {
|
||||||
|
|
Loading…
Reference in New Issue