support fill history

This commit is contained in:
yihaoDeng 2023-06-15 21:29:52 +08:00
parent 693942b821
commit 9f9171719a
9 changed files with 506 additions and 383 deletions

View File

@ -1,11 +1,14 @@
# rocksdb # rocksdb
ExternalProject_Add(rocksdb ExternalProject_Add(rocksdb
GIT_REPOSITORY https://github.com/facebook/rocksdb.git URL https://github.com/facebook/rocksdb/archive/refs/tags/v8.1.1.tar.gz
GIT_TAG v8.1.1 URL_HASH MD5=3b4c97ee45df9c8a5517308d31ab008b
DOWNLOAD_NO_PROGRESS 1
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb" SOURCE_DIR "${TD_CONTRIB_DIR}/rocksdb"
CONFIGURE_COMMAND "" CONFIGURE_COMMAND ""
BUILD_COMMAND "" BUILD_COMMAND ""
INSTALL_COMMAND "" INSTALL_COMMAND ""
TEST_COMMAND "" TEST_COMMAND ""
) )

View File

@ -129,22 +129,30 @@ typedef struct SSerializeDataHandle {
} SSerializeDataHandle; } SSerializeDataHandle;
// incremental state storage // incremental state storage
typedef struct STdbState {
typedef struct SBackendWrapper {
void *rocksdb; void *rocksdb;
void **pHandle; void **pHandle;
void *writeOpts; void *writeOpts;
void *readOpts; void *readOpts;
void **cfOpts; void **cfOpts;
void *dbOpt; void *dbOpt;
struct SStreamTask *pOwner;
void *param; void *param;
void *env; void *env;
SListNode *pComparNode; SListNode *pComparNode;
void *pBackend; void *pBackend;
char idstr[64];
void *compactFactory; void *compactFactory;
TdThreadRwlock rwLock; TdThreadRwlock rwLock;
bool remove;
int64_t backendId;
char idstr[64];
} SBackendWrapper;
typedef struct STdbState {
SBackendWrapper *pBackendWrapper;
int64_t backendWrapperId;
char idstr[64];
struct SStreamTask *pOwner;
void *db; void *db;
void *pStateDb; void *pStateDb;
void *pFuncStateDb; void *pFuncStateDb;

View File

@ -366,6 +366,7 @@ typedef struct SStreamMeta {
void* streamBackend; void* streamBackend;
int64_t streamBackendRid; int64_t streamBackendRid;
SHashObj* pTaskBackendUnique; SHashObj* pTaskBackendUnique;
TdThreadMutex backendMutex;
} SStreamMeta; } SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
@ -584,10 +585,10 @@ int32_t streamSetStatusNormal(SStreamTask* pTask);
const char* streamGetTaskStatusStr(int32_t status); const char* streamGetTaskStatusStr(int32_t status);
// source level // source level
int32_t streamSetParamForStreamScanner(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScanner(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
int32_t streamSourceScanHistoryData(SStreamTask* pTask); int32_t streamSourceScanHistoryData(SStreamTask* pTask);
//int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver); // int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask); int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);

View File

@ -25,7 +25,8 @@
#define SINK_NODE_LEVEL (0) #define SINK_NODE_LEVEL (0)
extern bool tsDeployOnSnode; extern bool tsDeployOnSnode;
static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, int32_t fillHistory); static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId,
SVgObj* pVgroup, int32_t fillHistory);
static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask); static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask);
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType, int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
@ -101,13 +102,13 @@ int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
return 0; return 0;
} }
int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SArray* pSinkNodeList, SStreamTask* pTask) { int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SArray* pSinkNodeList,
SStreamTask* pTask) {
bool isShuffle = false; bool isShuffle = false;
if (pStream->fixedSinkVgId == 0) { if (pStream->fixedSinkVgId == 0) {
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb); SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
if (pDb != NULL && pDb->cfg.numOfVgroups > 1) { if (pDb != NULL && pDb->cfg.numOfVgroups > 1) {
isShuffle = true; isShuffle = true;
pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH; pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH;
pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;
@ -225,7 +226,8 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SArray* pTaskList, SStrea
return 0; return 0;
} }
int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, int32_t fillHistory) { int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup,
int32_t fillHistory) {
SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, fillHistory, 0, pTaskList); SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, fillHistory, 0, pTaskList);
if (pTask == NULL) { if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -248,7 +250,7 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas
// todo set the correct ts, which should be last key of queried table. // todo set the correct ts, which should be last key of queried table.
pTask->dataRange.window.skey = INT64_MIN; pTask->dataRange.window.skey = INT64_MIN;
pTask->dataRange.window.ekey = 1685959190000;//taosGetTimestampMs(); pTask->dataRange.window.ekey = 1685959190000; // taosGetTimestampMs();
mDebug("add source task 0x%x window:%" PRId64 " - %" PRId64, pTask->id.taskId, pTask->dataRange.window.skey, mDebug("add source task 0x%x window:%" PRId64 " - %" PRId64, pTask->id.taskId, pTask->dataRange.window.skey,
pTask->dataRange.window.ekey); pTask->dataRange.window.ekey);
@ -298,7 +300,7 @@ int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
if(pDownstream->pUpstreamEpInfoList == NULL) { if (pDownstream->pUpstreamEpInfoList == NULL) {
pDownstream->pUpstreamEpInfoList = taosArrayInit(4, POINTER_BYTES); pDownstream->pUpstreamEpInfoList = taosArrayInit(4, POINTER_BYTES);
} }
@ -314,7 +316,7 @@ static SArray* addNewTaskList(SArray* pTasksList) {
// set the history task id // set the history task id
static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) { static void setHTasksId(SArray* pTaskList, const SArray* pHTaskList) {
for(int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pTaskList); ++i) {
SStreamTask** pStreamTask = taosArrayGet(pTaskList, i); SStreamTask** pStreamTask = taosArrayGet(pTaskList, i);
SStreamTask** pHTask = taosArrayGet(pHTaskList, i); SStreamTask** pHTask = taosArrayGet(pHTaskList, i);
@ -367,8 +369,8 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
// new stream task // new stream task
SArray** pSinkTaskList = taosArrayGet(pStream->tasks, SINK_NODE_LEVEL); SArray** pSinkTaskList = taosArrayGet(pStream->tasks, SINK_NODE_LEVEL);
int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, int32_t code =
0, hasExtraSink); addSourceStreamTask(pMnode, pVgroup, pTaskList, *pSinkTaskList, pStream, plan, pStream->uid, 0, hasExtraSink);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
return -1; return -1;
@ -390,8 +392,8 @@ static int32_t addSourceTasksForOneLevelStream(SMnode* pMnode, const SQueryPlan*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t uid, SStreamTask* pDownstreamTask, SMnode* pMnode, static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t uid, SStreamTask* pDownstreamTask,
SSubplan* pPlan, SVgObj* pVgroup) { SMnode* pMnode, SSubplan* pPlan, SVgObj* pVgroup) {
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, 0, pTaskList); SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SOURCE, fillHistory, 0, pTaskList);
if (pTask == NULL) { if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -400,9 +402,10 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui
// todo set the correct ts, which should be last key of queried table. // todo set the correct ts, which should be last key of queried table.
pTask->dataRange.window.skey = INT64_MIN; pTask->dataRange.window.skey = INT64_MIN;
pTask->dataRange.window.ekey = 1685959190000;//taosGetTimestampMs(); pTask->dataRange.window.ekey = 1685959190000; // taosGetTimestampMs();
mDebug("s-task:0x%x set time window:%"PRId64" - %"PRId64, pTask->id.taskId, pTask->dataRange.window.skey, pTask->dataRange.window.ekey); mDebug("s-task:0x%x set time window:%" PRId64 " - %" PRId64, pTask->id.taskId, pTask->dataRange.window.skey,
pTask->dataRange.window.ekey);
// all the source tasks dispatch result to a single agg node. // all the source tasks dispatch result to a single agg node.
setFixedDownstreamEpInfo(pTask, pDownstreamTask); setFixedDownstreamEpInfo(pTask, pDownstreamTask);
@ -413,8 +416,8 @@ static int32_t doAddSourceTask(SArray* pTaskList, int8_t fillHistory, int64_t ui
return setEpToDownstreamTask(pTask, pDownstreamTask); return setEpToDownstreamTask(pTask, pDownstreamTask);
} }
static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream, int32_t fillHistory, static int32_t doAddAggTask(uint64_t uid, SArray* pTaskList, SArray* pSinkNodeList, SMnode* pMnode, SStreamObj* pStream,
SStreamTask** pAggTask) { int32_t fillHistory, SStreamTask** pAggTask) {
*pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, fillHistory, pStream->conf.triggerParam, pTaskList); *pAggTask = tNewStreamTask(uid, TASK_LEVEL__AGG, fillHistory, pStream->conf.triggerParam, pTaskList);
if (*pAggTask == NULL) { if (*pAggTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -472,7 +475,8 @@ static int32_t addAggTask(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* pPlan
SArray* pHSinkNodeList = taosArrayGetP(pStream->pHTasksList, SINK_NODE_LEVEL); SArray* pHSinkNodeList = taosArrayGetP(pStream->pHTasksList, SINK_NODE_LEVEL);
*pHAggTask = NULL; *pHAggTask = NULL;
code = doAddAggTask(pStream->hTaskUid, pHAggTaskList, pHSinkNodeList, pMnode, pStream, pStream->conf.fillHistory, pHAggTask); code = doAddAggTask(pStream->hTaskUid, pHAggTaskList, pHSinkNodeList, pMnode, pStream, pStream->conf.fillHistory,
pHAggTask);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
if (pSnode != NULL) { if (pSnode != NULL) {
sdbRelease(pSdb, pSnode); sdbRelease(pSdb, pSnode);
@ -538,7 +542,8 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
} }
if (pStream->conf.fillHistory) { if (pStream->conf.fillHistory) {
code = doAddSourceTask(pHSourceTaskList, pStream->conf.fillHistory, pStream->hTaskUid, pHDownstreamTask, pMnode, plan, pVgroup); code = doAddSourceTask(pHSourceTaskList, pStream->conf.fillHistory, pStream->hTaskUid, pHDownstreamTask, pMnode,
plan, pVgroup);
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -552,7 +557,8 @@ static int32_t addSourceTasksForMultiLevelStream(SMnode* pMnode, SQueryPlan* pPl
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStream, SArray** pCreatedTaskList, int32_t fillHistory) { static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStream, SArray** pCreatedTaskList,
int32_t fillHistory) {
SArray* pSinkTaskList = addNewTaskList(pTasksList); SArray* pSinkTaskList = addNewTaskList(pTasksList);
if (pStream->fixedSinkVgId == 0) { if (pStream->fixedSinkVgId == 0) {
if (mndAddShuffleSinkTasksToStream(pMnode, pSinkTaskList, pStream, fillHistory) < 0) { if (mndAddShuffleSinkTasksToStream(pMnode, pSinkTaskList, pStream, fillHistory) < 0) {
@ -560,7 +566,8 @@ static int32_t addSinkTasks(SArray* pTasksList, SMnode* pMnode, SStreamObj* pStr
return -1; return -1;
} }
} else { } else {
if (mndAddSinkTaskToStream(pStream, pSinkTaskList, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg, fillHistory) < 0) { if (mndAddSinkTaskToStream(pStream, pSinkTaskList, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg,
fillHistory) < 0) {
// TODO free // TODO free
return -1; return -1;
} }
@ -655,8 +662,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1; return -1;
} }
}else if(pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL){ } else if (pTopic->subType == TOPIC_SUB_TYPE__TABLE && pTopic->ast != NULL) {
SNode *pAst = NULL; SNode* pAst = NULL;
if (nodesStringToNode(pTopic->ast, &pAst) != 0) { if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
mError("topic:%s, failed to create since %s", pTopic->name, terrstr()); mError("topic:%s, failed to create since %s", pTopic->name, terrstr());
return -1; return -1;
@ -671,7 +678,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
nodesDestroyNode(pAst); nodesDestroyNode(pAst);
} }
if(pPlan){ if (pPlan) {
int32_t levelNum = LIST_LENGTH(pPlan->pSubplans); int32_t levelNum = LIST_LENGTH(pPlan->pSubplans);
if (levelNum != 1) { if (levelNum != 1) {
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);

View File

@ -46,6 +46,7 @@ typedef struct {
void* streamBackendInit(const char* path); void* streamBackendInit(const char* path);
void streamBackendCleanup(void* arg); void streamBackendCleanup(void* arg);
void streamBackendHandleCleanup(void* arg);
SListNode* streamBackendAddCompare(void* backend, void* arg); SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg); void streamBackendDelCompare(void* backend, void* arg);

View File

@ -55,6 +55,7 @@ int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamRe
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
extern int32_t streamBackendId; extern int32_t streamBackendId;
extern int32_t streamBackendWrapperId;
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -40,15 +40,7 @@ typedef struct {
rocksdb_comparator_t** pCompares; rocksdb_comparator_t** pCompares;
} RocksdbCfInst; } RocksdbCfInst;
uint32_t nextPow2(uint32_t x) { uint32_t nextPow2(uint32_t x);
x = x - 1;
x = x | (x >> 1);
x = x | (x >> 2);
x = x | (x >> 4);
x = x | (x >> 8);
x = x | (x >> 16);
return x + 1;
}
int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf); int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf);
void destroyRocksdbCfInst(RocksdbCfInst* inst); void destroyRocksdbCfInst(RocksdbCfInst* inst);
@ -71,7 +63,22 @@ typedef int (*BackendCmpFunc)(void* state, const char* aBuf, size_t aLen, const
typedef void (*DestroyFunc)(void* state); typedef void (*DestroyFunc)(void* state);
typedef int32_t (*EncodeValueFunc)(void* value, int32_t vlen, int64_t ttl, char** dest); typedef int32_t (*EncodeValueFunc)(void* value, int32_t vlen, int64_t ttl, char** dest);
typedef int32_t (*DecodeValueFunc)(void* value, int32_t vlen, int64_t* ttl, char** dest); typedef int32_t (*DecodeValueFunc)(void* value, int32_t vlen, int64_t* ttl, char** dest);
typedef struct {
const char* key;
int32_t len;
int idx;
BackendCmpFunc cmpFunc;
EncodeFunc enFunc;
DecodeFunc deFunc;
ToStringFunc toStrFunc;
CompareName cmpName;
DestroyFunc detroyFunc;
EncodeValueFunc enValueFunc;
DecodeValueFunc deValueFunc;
} SCfInit;
#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX));
const char* compareDefaultName(void* name); const char* compareDefaultName(void* name);
const char* compareStateName(void* name); const char* compareStateName(void* name);
const char* compareWinKeyName(void* name); const char* compareWinKeyName(void* name);
@ -80,6 +87,62 @@ const char* compareFuncKeyName(void* name);
const char* compareParKeyName(void* name); const char* compareParKeyName(void* name);
const char* comparePartagKeyName(void* name); const char* comparePartagKeyName(void* name);
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
int defaultKeyEncode(void* k, char* buf);
int defaultKeyDecode(void* k, char* buf);
int defaultKeyToString(void* k, char* buf);
int stateKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
int stateKeyEncode(void* k, char* buf);
int stateKeyDecode(void* k, char* buf);
int stateKeyToString(void* k, char* buf);
int stateSessionKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
int stateSessionKeyEncode(void* ses, char* buf);
int stateSessionKeyDecode(void* ses, char* buf);
int stateSessionKeyToString(void* k, char* buf);
int winKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
int winKeyEncode(void* k, char* buf);
int winKeyDecode(void* k, char* buf);
int winKeyToString(void* k, char* buf);
int tupleKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
int tupleKeyEncode(void* k, char* buf);
int tupleKeyDecode(void* k, char* buf);
int tupleKeyToString(void* k, char* buf);
int parKeyDBComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen);
int parKeyEncode(void* k, char* buf);
int parKeyDecode(void* k, char* buf);
int parKeyToString(void* k, char* buf);
int stremaValueEncode(void* k, char* buf);
int streamValueDecode(void* k, char* buf);
int32_t streamValueToString(void* k, char* buf);
int32_t streaValueIsStale(void* k, int64_t ts);
void destroyFunc(void* arg);
int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest);
int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest);
SCfInit ginitDict[] = {
{"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName,
destroyFunc, encodeValueFunc, decodeValueFunc},
{"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc,
encodeValueFunc, decodeValueFunc},
{"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc,
encodeValueFunc, decodeValueFunc},
{"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString,
compareSessionKeyName, destroyFunc, encodeValueFunc, decodeValueFunc},
{"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc,
encodeValueFunc, decodeValueFunc},
{"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc,
encodeValueFunc, decodeValueFunc},
{"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc,
encodeValueFunc, decodeValueFunc},
};
void* streamBackendInit(const char* path) { void* streamBackendInit(const char* path) {
qDebug("start to init stream backend at %s", path); qDebug("start to init stream backend at %s", path);
SBackendHandle* pHandle = taosMemoryCalloc(1, sizeof(SBackendHandle)); SBackendHandle* pHandle = taosMemoryCalloc(1, sizeof(SBackendHandle));
@ -189,6 +252,69 @@ void streamBackendCleanup(void* arg) {
qDebug("destroy stream backend backend:%p", pHandle); qDebug("destroy stream backend backend:%p", pHandle);
return; return;
} }
void streamBackendHandleCleanup(void* arg) {
SBackendWrapper* wrapper = arg;
bool remove = false;
qDebug("start to do-close backendwrapper %p, %s", wrapper, wrapper->idstr);
if (wrapper->rocksdb == NULL) {
return;
}
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
char* err = NULL;
if (wrapper->remove) {
for (int i = 0; i < cfLen; i++) {
if (wrapper->pHandle[i] != NULL)
rocksdb_drop_column_family(wrapper->rocksdb, ((rocksdb_column_family_handle_t**)wrapper->pHandle)[i], &err);
if (err != NULL) {
// qError("failed to create cf:%s_%s, reason:%s", wrapper->idstr, ginitDict[i].key, err);
taosMemoryFreeClear(err);
}
}
} else {
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
for (int i = 0; i < cfLen; i++) {
if (wrapper->pHandle[i] != NULL) rocksdb_flush_cf(wrapper->rocksdb, flushOpt, wrapper->pHandle[i], &err);
if (err != NULL) {
qError("failed to create cf:%s_%s, reason:%s", wrapper->idstr, ginitDict[i].key, err);
taosMemoryFreeClear(err);
}
}
rocksdb_flushoptions_destroy(flushOpt);
}
for (int i = 0; i < cfLen; i++) {
if (wrapper->pHandle[i] != NULL) {
rocksdb_column_family_handle_destroy(wrapper->pHandle[i]);
}
}
taosMemoryFreeClear(wrapper->pHandle);
for (int i = 0; i < cfLen; i++) {
rocksdb_options_destroy(wrapper->cfOpts[i]);
rocksdb_block_based_options_destroy(((RocksdbCfParam*)wrapper->param)[i].tableOpt);
}
if (remove) {
streamBackendDelCompare(wrapper->pBackend, wrapper->pComparNode);
}
rocksdb_writeoptions_destroy(wrapper->writeOpts);
wrapper->writeOpts = NULL;
rocksdb_readoptions_destroy(wrapper->readOpts);
wrapper->readOpts = NULL;
taosMemoryFreeClear(wrapper->cfOpts);
taosMemoryFreeClear(wrapper->param);
taosThreadRwlockDestroy(&wrapper->rwLock);
wrapper->rocksdb = NULL;
taosReleaseRef(streamBackendId, wrapper->backendId);
qDebug("end to do-close backendwrapper %p, %s", wrapper, wrapper->idstr);
taosMemoryFree(wrapper);
return;
}
SListNode* streamBackendAddCompare(void* backend, void* arg) { SListNode* streamBackendAddCompare(void* backend, void* arg) {
SBackendHandle* pHandle = (SBackendHandle*)backend; SBackendHandle* pHandle = (SBackendHandle*)backend;
SListNode* node = NULL; SListNode* node = NULL;
@ -537,23 +663,6 @@ void destroyFunc(void* arg) {
return; return;
} }
typedef struct {
const char* key;
int32_t len;
int idx;
BackendCmpFunc cmpFunc;
EncodeFunc enFunc;
DecodeFunc deFunc;
ToStringFunc toStrFunc;
CompareName cmpName;
DestroyFunc detroyFunc;
EncodeValueFunc enValueFunc;
DecodeValueFunc deValueFunc;
} SCfInit;
#define GEN_COLUMN_FAMILY_NAME(name, idstr, SUFFIX) sprintf(name, "%s_%s", idstr, (SUFFIX));
int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest) { int32_t encodeValueFunc(void* value, int32_t vlen, int64_t ttl, char** dest) {
SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .data = (char*)(value)}; SStreamValue key = {.unixTimestamp = ttl, .len = vlen, .data = (char*)(value)};
int32_t len = 0; int32_t len = 0;
@ -608,22 +717,6 @@ int32_t decodeValueFunc(void* value, int32_t vlen, int64_t* ttl, char** dest) {
} }
return key.len; return key.len;
} }
SCfInit ginitDict[] = {
{"default", 7, 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, compareDefaultName,
destroyFunc, encodeValueFunc, decodeValueFunc},
{"state", 5, 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, destroyFunc,
encodeValueFunc, decodeValueFunc},
{"fill", 4, 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, destroyFunc,
encodeValueFunc, decodeValueFunc},
{"sess", 4, 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, stateSessionKeyToString,
compareSessionKeyName, destroyFunc, encodeValueFunc, decodeValueFunc},
{"func", 4, 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, destroyFunc,
encodeValueFunc, decodeValueFunc},
{"parname", 7, 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, destroyFunc,
encodeValueFunc, decodeValueFunc},
{"partag", 6, 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, destroyFunc,
encodeValueFunc, decodeValueFunc},
};
const char* compareDefaultName(void* arg) { const char* compareDefaultName(void* arg) {
(void)arg; (void)arg;
@ -817,22 +910,29 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId); qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId);
taosAcquireRef(streamBackendId, pState->streamBackendRid); taosAcquireRef(streamBackendId, pState->streamBackendRid);
SBackendHandle* handle = backend; SBackendHandle* handle = backend;
SBackendWrapper* pBackendWrapper = taosMemoryCalloc(1, sizeof(SBackendWrapper));
sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId);
taosThreadMutexLock(&handle->cfMutex); taosThreadMutexLock(&handle->cfMutex);
RocksdbCfInst** ppInst = taosHashGet(handle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); RocksdbCfInst** ppInst = taosHashGet(handle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
if (ppInst != NULL && *ppInst != NULL) { if (ppInst != NULL && *ppInst != NULL) {
RocksdbCfInst* inst = *ppInst; RocksdbCfInst* inst = *ppInst;
pState->pTdbState->rocksdb = inst->db; pBackendWrapper->rocksdb = inst->db;
pState->pTdbState->pHandle = (void**)inst->pHandle; pBackendWrapper->pHandle = (void**)inst->pHandle;
pState->pTdbState->writeOpts = inst->wOpt; pBackendWrapper->writeOpts = inst->wOpt;
pState->pTdbState->readOpts = inst->rOpt; pBackendWrapper->readOpts = inst->rOpt;
pState->pTdbState->cfOpts = (void**)(inst->cfOpt); pBackendWrapper->cfOpts = (void**)(inst->cfOpt);
pState->pTdbState->dbOpt = handle->dbOpt; pBackendWrapper->dbOpt = handle->dbOpt;
pState->pTdbState->param = inst->param; pBackendWrapper->param = inst->param;
pState->pTdbState->pBackend = handle; pBackendWrapper->pBackend = handle;
pState->pTdbState->pComparNode = inst->pCompareNode; pBackendWrapper->pComparNode = inst->pCompareNode;
taosThreadMutexUnlock(&handle->cfMutex); taosThreadMutexUnlock(&handle->cfMutex);
pBackendWrapper->backendId = pState->streamBackendRid;
memcpy(pBackendWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr));
int64_t id = taosAddRef(streamBackendWrapperId, pBackendWrapper);
pState->pTdbState->backendWrapperId = id;
pState->pTdbState->pBackendWrapper = pBackendWrapper;
qInfo("succ to open state %p on backendWrapper, %p, %s", pState, pBackendWrapper, pBackendWrapper->idstr);
return 0; return 0;
} }
taosThreadMutexUnlock(&handle->cfMutex); taosThreadMutexUnlock(&handle->cfMutex);
@ -865,92 +965,43 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
pCompare[i] = compare; pCompare[i] = compare;
} }
rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*)); rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*));
pState->pTdbState->rocksdb = handle->db; pBackendWrapper->rocksdb = handle->db;
pState->pTdbState->pHandle = (void**)cfHandle; pBackendWrapper->pHandle = (void**)cfHandle;
pState->pTdbState->writeOpts = rocksdb_writeoptions_create(); pBackendWrapper->writeOpts = rocksdb_writeoptions_create();
pState->pTdbState->readOpts = rocksdb_readoptions_create(); pBackendWrapper->readOpts = rocksdb_readoptions_create();
pState->pTdbState->cfOpts = (void**)cfOpt; pBackendWrapper->cfOpts = (void**)cfOpt;
pState->pTdbState->dbOpt = handle->dbOpt; pBackendWrapper->dbOpt = handle->dbOpt;
pState->pTdbState->param = param; pBackendWrapper->param = param;
pState->pTdbState->pBackend = handle; pBackendWrapper->pBackend = handle;
pBackendWrapper->backendId = pState->streamBackendRid;
taosThreadRwlockInit(&pState->pTdbState->rwLock, NULL); taosThreadRwlockInit(&pBackendWrapper->rwLock, NULL);
SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen}; SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen};
pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare); pBackendWrapper->pComparNode = streamBackendAddCompare(handle, &compare);
rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1); rocksdb_writeoptions_disable_WAL(pBackendWrapper->writeOpts, 1);
qInfo("succ to open state %p on backend, %p, 0x%" PRIx64 "-%d", pState, handle, pState->streamId, pState->taskId); memcpy(pBackendWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr));
int64_t id = taosAddRef(streamBackendWrapperId, pBackendWrapper);
pState->pTdbState->backendWrapperId = id;
pState->pTdbState->pBackendWrapper = pBackendWrapper;
qInfo("succ to open state %p on backendWrapper, %p, %s", pState, pBackendWrapper, pBackendWrapper->idstr);
return 0; return 0;
} }
void streamStateCloseBackend(SStreamState* pState, bool remove) { void streamStateCloseBackend(SStreamState* pState, bool remove) {
SBackendHandle* pHandle = pState->pTdbState->pBackend; SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper;
SBackendHandle* pHandle = wrapper->pBackend;
taosThreadMutexLock(&pHandle->cfMutex); taosThreadMutexLock(&pHandle->cfMutex);
RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, wrapper->idstr, strlen(pState->pTdbState->idstr) + 1);
if (ppInst != NULL && *ppInst != NULL) { if (ppInst != NULL && *ppInst != NULL) {
RocksdbCfInst* inst = *ppInst; RocksdbCfInst* inst = *ppInst;
taosMemoryFree(inst); taosMemoryFree(inst);
taosHashRemove(pHandle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1); taosHashRemove(pHandle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
} }
taosThreadMutexUnlock(&pHandle->cfMutex); taosThreadMutexUnlock(&pHandle->cfMutex);
char* status[] = {"close", "drop"}; char* status[] = {"close", "drop"};
qInfo("start to close %s state %p on backend %p 0x%" PRIx64 "-%d", status[remove == false ? 0 : 1], pState, pHandle, qInfo("start to close %s state %p on backendWrapper %p %s", status[remove == false ? 0 : 1], pState, wrapper,
pState->streamId, pState->taskId); wrapper->idstr);
if (pState->pTdbState->rocksdb == NULL) { taosReleaseRef(streamBackendWrapperId, pState->pTdbState->backendWrapperId);
return;
}
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
char* err = NULL;
if (remove) {
for (int i = 0; i < cfLen; i++) {
if (pState->pTdbState->pHandle[i] != NULL)
rocksdb_drop_column_family(pState->pTdbState->rocksdb,
((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[i], &err);
if (err != NULL) {
qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
taosMemoryFreeClear(err);
}
}
} else {
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
for (int i = 0; i < cfLen; i++) {
if (pState->pTdbState->pHandle[i] != NULL)
rocksdb_flush_cf(pState->pTdbState->rocksdb, flushOpt, pState->pTdbState->pHandle[i], &err);
if (err != NULL) {
qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
taosMemoryFreeClear(err);
}
}
rocksdb_flushoptions_destroy(flushOpt);
}
for (int i = 0; i < cfLen; i++) {
if (pState->pTdbState->pHandle[i] != NULL) {
rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]);
}
}
taosMemoryFreeClear(pState->pTdbState->pHandle);
for (int i = 0; i < cfLen; i++) {
rocksdb_options_destroy(pState->pTdbState->cfOpts[i]);
rocksdb_block_based_options_destroy(((RocksdbCfParam*)pState->pTdbState->param)[i].tableOpt);
}
if (remove) {
streamBackendDelCompare(pState->pTdbState->pBackend, pState->pTdbState->pComparNode);
}
rocksdb_writeoptions_destroy(pState->pTdbState->writeOpts);
pState->pTdbState->writeOpts = NULL;
rocksdb_readoptions_destroy(pState->pTdbState->readOpts);
pState->pTdbState->readOpts = NULL;
taosMemoryFreeClear(pState->pTdbState->cfOpts);
taosMemoryFreeClear(pState->pTdbState->param);
taosThreadRwlockDestroy(&pState->pTdbState->rwLock);
pState->pTdbState->rocksdb = NULL;
taosReleaseRef(streamBackendId, pState->streamBackendRid);
} }
void streamStateDestroyCompar(void* arg) { void streamStateDestroyCompar(void* arg) {
SCfComparator* comp = (SCfComparator*)arg; SCfComparator* comp = (SCfComparator*)arg;
@ -969,27 +1020,27 @@ int streamStateGetCfIdx(SStreamState* pState, const char* funcName) {
break; break;
} }
} }
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper;
if (pState != NULL && idx != -1) { if (pState != NULL && idx != -1) {
rocksdb_column_family_handle_t* cf = NULL; rocksdb_column_family_handle_t* cf = NULL;
taosThreadRwlockRdlock(&pState->pTdbState->rwLock); taosThreadRwlockRdlock(&wrapper->rwLock);
cf = pState->pTdbState->pHandle[idx]; cf = wrapper->pHandle[idx];
taosThreadRwlockUnlock(&pState->pTdbState->rwLock); taosThreadRwlockUnlock(&wrapper->rwLock);
if (cf == NULL) { if (cf == NULL) {
char buf[128] = {0}; char buf[128] = {0};
GEN_COLUMN_FAMILY_NAME(buf, pState->pTdbState->idstr, ginitDict[idx].key); GEN_COLUMN_FAMILY_NAME(buf, wrapper->idstr, ginitDict[idx].key);
char* err = NULL; char* err = NULL;
taosThreadRwlockWrlock(&pState->pTdbState->rwLock); taosThreadRwlockWrlock(&wrapper->rwLock);
cf = rocksdb_create_column_family(pState->pTdbState->rocksdb, pState->pTdbState->cfOpts[idx], buf, &err); cf = rocksdb_create_column_family(wrapper->rocksdb, wrapper->cfOpts[idx], buf, &err);
if (err != NULL) { if (err != NULL) {
idx = -1; idx = -1;
qError("failed to to open cf, %p 0x%" PRIx64 "-%d_%s, reason:%s", pState, pState->streamId, pState->taskId, qError("failed to to open cf, %p %s_%s, reason:%s", pState, wrapper->idstr, funcName, err);
funcName, err);
taosMemoryFree(err); taosMemoryFree(err);
} else { } else {
pState->pTdbState->pHandle[idx] = cf; wrapper->pHandle[idx] = cf;
} }
taosThreadRwlockUnlock(&pState->pTdbState->rwLock); taosThreadRwlockUnlock(&wrapper->rwLock);
} }
} }
@ -1009,8 +1060,9 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
rocksdb_readoptions_t** readOpt) { rocksdb_readoptions_t** readOpt) {
int idx = streamStateGetCfIdx(pState, cfName); int idx = streamStateGetCfIdx(pState, cfName);
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper;
if (snapshot != NULL) { if (snapshot != NULL) {
*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb); *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(wrapper->rocksdb);
} }
rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create(); rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create();
*readOpt = rOpt; *readOpt = rOpt;
@ -1018,8 +1070,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
rocksdb_readoptions_set_snapshot(rOpt, *snapshot); rocksdb_readoptions_set_snapshot(rOpt, *snapshot);
rocksdb_readoptions_set_fill_cache(rOpt, 0); rocksdb_readoptions_set_fill_cache(rOpt, 0);
return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, return rocksdb_create_iterator_cf(wrapper->rocksdb, rOpt, ((rocksdb_column_family_handle_t**)wrapper->pHandle)[idx]);
((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[idx]);
} }
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ #define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
@ -1033,13 +1084,13 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
code = -1; \ code = -1; \
break; \ break; \
} \ } \
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; \
char toString[128] = {0}; \ char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \
((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \ rocksdb_t* db = wrapper->rocksdb; \
rocksdb_t* db = pState->pTdbState->rocksdb; \ rocksdb_writeoptions_t* opts = wrapper->writeOpts; \
rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
char* ttlV = NULL; \ char* ttlV = NULL; \
int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
@ -1064,22 +1115,20 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
code = -1; \ code = -1; \
break; \ break; \
} \ } \
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; \
char toString[128] = {0}; \ char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \
((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \ rocksdb_t* db = wrapper->rocksdb; \
rocksdb_t* db = pState->pTdbState->rocksdb; \ rocksdb_readoptions_t* opts = wrapper->readOpts; \
rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
size_t len = 0; \ size_t len = 0; \
char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (val == NULL || len == 0) { \ if (val == NULL || len == 0) { \
if (err == NULL) { \ if (err == NULL) { \
qTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \ qTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, wrapper->idstr, funcname); \
funcname); \
} else { \ } else { \
qError("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \ qError("streamState str: %s failed to read from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
err); \
taosMemoryFreeClear(err); \ taosMemoryFreeClear(err); \
} \ } \
code = -1; \ code = -1; \
@ -1087,18 +1136,16 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
char* p = NULL; \ char* p = NULL; \
int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \ int32_t tlen = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \
if (tlen <= 0) { \ if (tlen <= 0) { \
qError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, pState->pTdbState->idstr, \ qError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, wrapper->idstr, \
funcname); \ funcname); \
code = -1; \ code = -1; \
} else { \ } else { \
qTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \ qTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, wrapper->idstr, funcname, tlen); \
tlen); \
} \ } \
taosMemoryFree(val); \ taosMemoryFree(val); \
if (vLen != NULL) *vLen = tlen; \ if (vLen != NULL) *vLen = tlen; \
} \ } \
if (code == 0) \ if (code == 0) qDebug("streamState str: %s succ to read from %s_%s", toString, wrapper->idstr, funcname); \
qDebug("streamState str: %s succ to read from %s_%s", toString, pState->pTdbState->idstr, funcname); \
} while (0); } while (0);
#define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \ #define STREAM_STATE_DEL_ROCKSDB(pState, funcname, key) \
@ -1112,21 +1159,20 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
code = -1; \ code = -1; \
break; \ break; \
} \ } \
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper; \
char toString[128] = {0}; \ char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \ if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
rocksdb_column_family_handle_t* pHandle = \ rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pHandle)[ginitDict[i].idx]; \
((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \ rocksdb_t* db = wrapper->rocksdb; \
rocksdb_t* db = pState->pTdbState->rocksdb; \ rocksdb_writeoptions_t* opts = wrapper->writeOpts; \
rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \ rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \
if (err != NULL) { \ if (err != NULL) { \
qError("streamState str: %s failed to del from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \ qError("streamState str: %s failed to del from %s_%s, err: %s", toString, wrapper->idstr, funcname, err); \
err); \
taosMemoryFree(err); \ taosMemoryFree(err); \
code = -1; \ code = -1; \
} else { \ } else { \
qTrace("streamState str: %s succ to del from %s_%s", toString, pState->pTdbState->idstr, funcname); \ qTrace("streamState str: %s succ to del from %s_%s", toString, wrapper->idstr, funcname); \
} \ } \
} while (0); } while (0);
@ -1153,6 +1199,7 @@ int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) {
int32_t streamStateClear_rocksdb(SStreamState* pState) { int32_t streamStateClear_rocksdb(SStreamState* pState) {
qDebug("streamStateClear_rocksdb"); qDebug("streamStateClear_rocksdb");
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper;
char sKeyStr[128] = {0}; char sKeyStr[128] = {0};
char eKeyStr[128] = {0}; char eKeyStr[128] = {0};
SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number};
@ -1161,10 +1208,10 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
int sLen = stateKeyEncode(&sKey, sKeyStr); int sLen = stateKeyEncode(&sKey, sKeyStr);
int eLen = stateKeyEncode(&eKey, eKeyStr); int eLen = stateKeyEncode(&eKey, eKeyStr);
if (pState->pTdbState->pHandle[1] != NULL) { if (wrapper->pHandle[1] != NULL) {
char* err = NULL; char* err = NULL;
rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[1], rocksdb_delete_range_cf(wrapper->rocksdb, wrapper->writeOpts, wrapper->pHandle[1], sKeyStr, sLen, eKeyStr, eLen,
sKeyStr, sLen, eKeyStr, eLen, &err); &err);
if (err != NULL) { if (err != NULL) {
char toStringStart[128] = {0}; char toStringStart[128] = {0};
char toStringEnd[128] = {0}; char toStringEnd[128] = {0};
@ -1174,7 +1221,7 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
qWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err); qWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err);
taosMemoryFree(err); taosMemoryFree(err);
} else { } else {
rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[1], sKeyStr, sLen, eKeyStr, eLen); rocksdb_compact_range_cf(wrapper->rocksdb, wrapper->pHandle[1], sKeyStr, sLen, eKeyStr, eLen);
} }
} }
@ -1268,8 +1315,9 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
} }
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper;
pCur->number = pState->number; pCur->number = pState->number;
pCur->db = pState->pTdbState->rocksdb; pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt); (rocksdb_readoptions_t**)&pCur->readOpt);
@ -1303,14 +1351,15 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateGetCur_rocksdb"); qDebug("streamStateGetCur_rocksdb");
int32_t code = 0; int32_t code = 0;
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper;
const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX}; const SStateKey maxStateKey = {.key = {.groupId = UINT64_MAX, .ts = INT64_MAX}, .opNum = INT64_MAX};
STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0); STREAM_STATE_PUT_ROCKSDB(pState, "state", &maxStateKey, "", 0);
char buf[128] = {0}; char buf[128] = {0};
int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); int32_t klen = stateKeyEncode((void*)&maxStateKey, buf);
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL; if (pCur == NULL) return NULL;
pCur->db = pState->pTdbState->rocksdb; pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt); (rocksdb_readoptions_t**)&pCur->readOpt);
rocksdb_iter_seek(pCur->iter, buf, (size_t)klen); rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
@ -1330,10 +1379,11 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateGetCur_rocksdb"); qDebug("streamStateGetCur_rocksdb");
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL; if (pCur == NULL) return NULL;
pCur->db = pState->pTdbState->rocksdb; pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot, pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt); (rocksdb_readoptions_t**)&pCur->readOpt);
@ -1421,12 +1471,14 @@ int32_t streamStateSessionDel_rocksdb(SStreamState* pState, const SSessionKey* k
} }
SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) { SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb"); qDebug("streamStateSessionSeekKeyCurrentPrev_rocksdb");
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
} }
pCur->number = pState->number; pCur->number = pState->number;
pCur->db = pState->pTdbState->rocksdb; pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt); (rocksdb_readoptions_t**)&pCur->readOpt);
@ -1462,11 +1514,12 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
} }
SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) { SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pState, SSessionKey* key) {
qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb"); qDebug("streamStateSessionSeekKeyCurrentNext_rocksdb");
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
} }
pCur->db = pState->pTdbState->rocksdb; pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt); (rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number; pCur->number = pState->number;
@ -1499,11 +1552,12 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) { SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, const SSessionKey* key) {
qDebug("streamStateSessionSeekKeyNext_rocksdb"); qDebug("streamStateSessionSeekKeyNext_rocksdb");
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
} }
pCur->db = pState->pTdbState->rocksdb; pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt); (rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number; pCur->number = pState->number;
@ -1593,10 +1647,11 @@ int32_t streamStateFillDel_rocksdb(SStreamState* pState, const SWinKey* key) {
SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillGetCur_rocksdb"); qDebug("streamStateFillGetCur_rocksdb");
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper;
if (pCur == NULL) return NULL; if (pCur == NULL) return NULL;
pCur->db = pState->pTdbState->rocksdb; pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot, pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt); (rocksdb_readoptions_t**)&pCur->readOpt);
@ -1651,12 +1706,13 @@ int32_t streamStateFillGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey,
SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillSeekKeyNext_rocksdb"); qDebug("streamStateFillSeekKeyNext_rocksdb");
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (!pCur) { if (!pCur) {
return NULL; return NULL;
} }
pCur->db = pState->pTdbState->rocksdb; pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot, pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt); (rocksdb_readoptions_t**)&pCur->readOpt);
@ -1687,12 +1743,13 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
} }
SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) { SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const SWinKey* key) {
qDebug("streamStateFillSeekKeyPrev_rocksdb"); qDebug("streamStateFillSeekKeyPrev_rocksdb");
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
} }
pCur->db = pState->pTdbState->rocksdb; pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot, pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt); (rocksdb_readoptions_t**)&pCur->readOpt);
@ -1723,12 +1780,13 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
} }
int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) { int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
qDebug("streamStateSessionGetKeyByRange_rocksdb"); qDebug("streamStateSessionGetKeyByRange_rocksdb");
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper;
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return -1; return -1;
} }
pCur->number = pState->number; pCur->number = pState->number;
pCur->db = pState->pTdbState->rocksdb; pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt); (rocksdb_readoptions_t**)&pCur->readOpt);
@ -1959,6 +2017,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
int code = 0; int code = 0;
char* err = NULL; char* err = NULL;
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper;
rocksdb_snapshot_t* snapshot = NULL; rocksdb_snapshot_t* snapshot = NULL;
rocksdb_readoptions_t* readopts = NULL; rocksdb_readoptions_t* readopts = NULL;
rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts); rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
@ -1991,15 +2050,16 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
} }
rocksdb_iter_next(pIter); rocksdb_iter_next(pIter);
} }
rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot); rocksdb_release_snapshot(wrapper->rocksdb, snapshot);
rocksdb_readoptions_destroy(readopts); rocksdb_readoptions_destroy(readopts);
rocksdb_iter_destroy(pIter); rocksdb_iter_destroy(pIter);
return code; return code;
} }
void* streamDefaultIterCreate_rocksdb(SStreamState* pState) { void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper;
pCur->db = pState->pTdbState->rocksdb; pCur->db = wrapper->rocksdb;
pCur->iter = streamStateIterCreate(pState, "default", (rocksdb_snapshot_t**)&pCur->snapshot, pCur->iter = streamStateIterCreate(pState, "default", (rocksdb_snapshot_t**)&pCur->snapshot,
(rocksdb_readoptions_t**)&pCur->readOpt); (rocksdb_readoptions_t**)&pCur->readOpt);
return pCur; return pCur;
@ -2046,6 +2106,7 @@ void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_
void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen, int64_t ttl) { void* val, int32_t vlen, int64_t ttl) {
SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper;
int i = streamStateGetCfIdx(pState, cfName); int i = streamStateGetCfIdx(pState, cfName);
if (i < 0) { if (i < 0) {
@ -2057,7 +2118,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr
char* ttlV = NULL; char* ttlV = NULL;
int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, ttl, &ttlV); int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, ttl, &ttlV);
rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx]; rocksdb_column_family_handle_t* pCf = wrapper->pHandle[ginitDict[i].idx];
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
taosMemoryFree(ttlV); taosMemoryFree(ttlV);
return 0; return 0;
@ -2069,7 +2130,9 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
char* ttlV = tmpBuf; char* ttlV = tmpBuf;
int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV); int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV);
rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[cfIdx].idx]; SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper;
rocksdb_column_family_handle_t* pCf = wrapper->pHandle[ginitDict[cfIdx].idx];
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
if (tmpBuf == NULL) { if (tmpBuf == NULL) {
@ -2079,7 +2142,8 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
} }
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
char* err = NULL; char* err = NULL;
rocksdb_write(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, (rocksdb_writebatch_t*)pBatch, &err); SBackendWrapper* wrapper = pState->pTdbState->pBackendWrapper;
rocksdb_write(wrapper->rocksdb, wrapper->writeOpts, (rocksdb_writebatch_t*)pBatch, &err);
if (err != NULL) { if (err != NULL) {
qError("streamState failed to write batch, err:%s", err); qError("streamState failed to write batch, err:%s", err);
taosMemoryFree(err); taosMemoryFree(err);
@ -2087,3 +2151,13 @@ int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {
} }
return 0; return 0;
} }
uint32_t nextPow2(uint32_t x) {
x = x - 1;
x = x | (x >> 1);
x = x | (x >> 2);
x = x | (x >> 4);
x = x | (x >> 8);
x = x | (x >> 16);
return x + 1;
}

View File

@ -21,10 +21,17 @@
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
int32_t streamBackendId = 0; int32_t streamBackendId = 0;
static void streamMetaEnvInit() { streamBackendId = taosOpenRef(20, streamBackendCleanup); } int32_t streamBackendWrapperId = 0;
static void streamMetaEnvInit() {
streamBackendId = taosOpenRef(64, streamBackendCleanup);
streamBackendWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
}
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); }
void streamMetaCleanup() { taosCloseRef(streamBackendId); } void streamMetaCleanup() {
taosCloseRef(streamBackendId);
taosCloseRef(streamBackendWrapperId);
}
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) { SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {
int32_t code = -1; int32_t code = -1;
@ -90,10 +97,14 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
pMeta->streamBackend = streamBackendInit(streamPath); pMeta->streamBackend = streamBackendInit(streamPath);
pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend); pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
pMeta->pTaskBackendUnique =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
taosMemoryFree(streamPath); taosMemoryFree(streamPath);
taosInitRWLatch(&pMeta->lock); taosInitRWLatch(&pMeta->lock);
taosThreadMutexInit(&pMeta->backendMutex, NULL);
return pMeta; return pMeta;
_err: _err:
@ -136,6 +147,8 @@ void streamMetaClose(SStreamMeta* pMeta) {
taosRemoveRef(streamBackendId, pMeta->streamBackendRid); taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList); pMeta->pTaskList = taosArrayDestroy(pMeta->pTaskList);
taosMemoryFree(pMeta->path); taosMemoryFree(pMeta->path);
taosThreadMutexDestroy(&pMeta->backendMutex);
taosHashCleanup(pMeta->pTaskBackendUnique);
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
} }

View File

@ -116,16 +116,33 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
pState->taskId = pStreamTask->id.taskId; pState->taskId = pStreamTask->id.taskId;
pState->streamId = pStreamTask->id.streamId; pState->streamId = pStreamTask->id.streamId;
sprintf(pState->pTdbState->idstr, "0x%" PRIx64 "-%d", pState->streamId, pState->taskId);
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
SStreamMeta* pMeta = pStreamTask->pMeta; SStreamMeta* pMeta = pStreamTask->pMeta;
pState->streamBackendRid = pMeta->streamBackendRid; pState->streamBackendRid = pMeta->streamBackendRid;
// taosWLockLatch(&pMeta->lock);
taosThreadMutexLock(&pMeta->backendMutex);
void* uniqueId =
taosHashGet(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
if (uniqueId == NULL) {
int code = streamStateOpenBackend(pMeta->streamBackend, pState); int code = streamStateOpenBackend(pMeta->streamBackend, pState);
if (code == -1) { if (code == -1) {
taosReleaseRef(streamBackendId, pMeta->streamBackendRid); taosReleaseRef(streamBackendId, pState->streamBackendRid);
taosThreadMutexUnlock(&pMeta->backendMutex);
taosMemoryFree(pState); taosMemoryFree(pState);
pState = NULL; return NULL;
} }
taosHashPut(pMeta->pTaskBackendUnique, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1,
&pState->pTdbState->backendWrapperId, sizeof(pState->pTdbState->backendWrapperId));
} else {
int64_t id = *(int64_t*)uniqueId;
pState->pTdbState->backendWrapperId = id;
pState->pTdbState->pBackendWrapper = taosAcquireRef(streamBackendWrapperId, id);
taosAcquireRef(streamBackendId, pState->streamBackendRid);
}
taosThreadMutexUnlock(&pMeta->backendMutex);
pState->pTdbState->pOwner = pTask; pState->pTdbState->pOwner = pTask;
pState->pFileState = NULL; pState->pFileState = NULL;
@ -1113,9 +1130,7 @@ int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) {
#endif #endif
} }
void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { void streamStateReloadInfo(SStreamState* pState, TSKEY ts) { streamFileStateReloadInfo(pState->pFileState, ts); }
streamFileStateReloadInfo(pState->pFileState, ts);
}
#if 0 #if 0
char* streamStateSessionDump(SStreamState* pState) { char* streamStateSessionDump(SStreamState* pState) {