From d46cf878eb48241eacb77a2c073f26a04007ed2f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 24 Oct 2023 11:53:52 +0800 Subject: [PATCH] refactor stream backend --- source/dnode/mnode/impl/src/mndStream.c | 13 +++++++----- source/libs/stream/src/streamBackendRocksdb.c | 9 ++++++--- source/libs/stream/src/streamState.c | 20 ++++++++++++++++--- 3 files changed, 31 insertions(+), 11 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b1bcc401f8..eb813c3d86 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1120,10 +1120,11 @@ static const char *mndGetStreamDB(SMnode *pMnode) { } static int32_t mndCheckNodeStatus(SMnode *pMnode) { - bool ready = true; + bool ready = false; // check if the node update happens or not int64_t ts = taosGetTimestampSec(); + taosThreadMutexLock(&execNodeList.lock); if (execNodeList.pNodeEntryList == NULL || (taosArrayGetSize(execNodeList.pNodeEntryList) == 0)) { if (execNodeList.pNodeEntryList != NULL) { execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList); @@ -1135,14 +1136,14 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) { if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) { mDebug("stream task node change checking done, no vgroups exist, do nothing"); execNodeList.ts = ts; - return -1; + goto _EXIT; } for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) { SNodeEntry *pNodeEntry = taosArrayGet(execNodeList.pNodeEntryList, i); if (pNodeEntry->stageUpdated) { mDebug("stream task not ready due to node update detected, checkpoint not issued"); - return -1; + goto _EXIT; } SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode); @@ -1155,13 +1156,12 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) { if (nodeUpdated) { mDebug("stream task not ready due to node update, checkpoint not issued"); - return -1; + goto _EXIT; } } // check if all tasks are in TASK_STATUS__NORMAL status - taosThreadMutexLock(&execNodeList.lock); for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) { STaskId *p = taosArrayGet(execNodeList.pTaskList, i); STaskStatusEntry *pEntry = taosHashGet(execNodeList.pTaskMap, p, sizeof(*p)); @@ -1176,6 +1176,9 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) { break; } } + ready = true; +_EXIT: + taosThreadMutexUnlock(&execNodeList.lock); return ready == true ? 0 : -1; } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 104289aed3..4add9d2912 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2637,6 +2637,9 @@ int32_t streamStateFuncDel_rocksdb(SStreamState* pState, const STupleKey* key) { int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* key, const void* value, int32_t vLen) { int code = 0; SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; + if (value == NULL || vLen == 0) { + stError("streamStateSessionPut_rocksdb val: %p, len: %d", value, vLen); + } STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, value, vLen); return code; } @@ -2685,11 +2688,11 @@ SStreamStateCur* streamStateSessionSeekToLast_rocksdb(SStreamState* pState) { if (code != 0) { return NULL; } + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - SStreamStateCur* pCur = createStreamStateCursor(); + SStreamStateCur* pCur = createStreamStateCursor(); pCur->number = pState->number; - pCur->db = wrapper->rocksdb; + pCur->db = wrapper->db; pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot, (rocksdb_readoptions_t**)&pCur->readOpt); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 8c44d3eeb2..218d906504 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -706,9 +706,23 @@ void streamStateFreeVal(void* val) { int32_t streamStateSessionPut(SStreamState* pState, const SSessionKey* key, void* value, int32_t vLen) { #ifdef USE_ROCKSDB - qDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, key->win.skey, key->win.ekey, - key->groupId); - return streamStateSessionPut_rocksdb(pState, key, value, vLen); + int32_t code = TSDB_CODE_SUCCESS; + SRowBuffPos* pos = (SRowBuffPos*)value; + if (pos->needFree) { + if (isFlushedState(pState->pFileState, key->win.ekey, 0)) { + if (!pos->pRowBuff) { + return code; + } + code = streamStateSessionPut_rocksdb(pState, key, pos->pRowBuff, vLen); + streamStateReleaseBuf(pState, pos, true); + putFreeBuff(pState->pFileState, pos); + stDebug("===stream===save skey:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ".code:%d", key->win.skey, + key->win.ekey, key->groupId, code); + } else { + code = putSessionWinResultBuff(pState->pFileState, value); + } + } + return code; #else SStateSessionKey sKey = {.key = *key, .opNum = pState->number}; return tdbTbUpsert(pState->pTdbState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), value, vLen,