From 04f3784696cc11f534451196b2aee4a9f2ea9d98 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 31 May 2023 11:05:03 +0000 Subject: [PATCH 1/6] change log level --- source/libs/stream/src/streamBackendRocksdb.c | 14 +++++++------- source/libs/stream/src/streamDispatch.c | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index c8a6597bad..9d94a28c8a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1026,10 +1026,10 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ if (err != NULL) { \ taosMemoryFree(err); \ - qDebug("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ + qError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ code = -1; \ } else { \ - qDebug("streamState str:%s succ to write to %s, valLen:%d", toString, funcname, vLen); \ + qTrace("streamState str:%s succ to write to %s, valLen:%d", toString, funcname, vLen); \ } \ taosMemoryFree(ttlV); \ } while (0); @@ -1056,10 +1056,10 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \ if (val == NULL) { \ if (err == NULL) { \ - qDebug("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \ + qTrace("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \ funcname); \ } else { \ - qDebug("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \ + qError("streamState str: %s failed to read from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \ err); \ taosMemoryFreeClear(err); \ } \ @@ -1068,11 +1068,11 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa char* p = NULL; \ int32_t len = ginitDict[i].deValueFunc(val, len, NULL, (char**)pVal); \ if (len < 0) { \ - qDebug("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, pState->pTdbState->idstr, \ + qError("streamState str: %s failed to read from %s_%s, err: already ttl ", toString, pState->pTdbState->idstr, \ funcname); \ code = -1; \ } else { \ - qDebug("streamState str: %s succ to read from %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \ + qTrace("streamState str: %s succ to read from %s_%s, valLen:%d", toString, pState->pTdbState->idstr, funcname, \ len); \ } \ taosMemoryFree(val); \ @@ -1107,7 +1107,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa taosMemoryFree(err); \ code = -1; \ } else { \ - qDebug("streamState str: %s succ to del from %s_%s", toString, pState->pTdbState->idstr, funcname); \ + qTrace("streamState str: %s succ to del from %s_%s", toString, pState->pTdbState->idstr, funcname); \ } \ } while (0); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 401a8b9e74..042ea373a1 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -157,7 +157,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) tEncodeStreamRetrieveReq(&encoder, &req); tEncoderClear(&encoder); - SRpcMsg rpcMsg = { .code = 0, .msgType = TDMT_STREAM_RETRIEVE, .pCont = buf, .contLen = sizeof(SMsgHead) + len }; + SRpcMsg rpcMsg = {.code = 0, .msgType = TDMT_STREAM_RETRIEVE, .pCont = buf, .contLen = sizeof(SMsgHead) + len}; if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) { ASSERT(0); goto CLEAR; From 64edab63025818aa807737cab687f31d0c9206c9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 1 Jun 2023 07:51:52 +0000 Subject: [PATCH 2/6] refactor code --- source/libs/stream/src/streamBackendRocksdb.c | 30 +++++++++---------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 9d94a28c8a..eabec058d7 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1134,31 +1134,29 @@ int32_t streamStateDel_rocksdb(SStreamState* pState, const SWinKey* key) { int32_t streamStateClear_rocksdb(SStreamState* pState) { qDebug("streamStateClear_rocksdb"); - SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; - SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number}; char sKeyStr[128] = {0}; char eKeyStr[128] = {0}; + SStateKey sKey = {.key = {.ts = 0, .groupId = 0}, .opNum = pState->number}; + SStateKey eKey = {.key = {.ts = INT64_MAX, .groupId = UINT64_MAX}, .opNum = pState->number}; int sLen = stateKeyEncode(&sKey, sKeyStr); int eLen = stateKeyEncode(&eKey, eKeyStr); - char toStringStart[128] = {0}; - char toStringEnd[128] = {0}; - if (qDebugFlag & DEBUG_TRACE) { - stateKeyToString(&sKey, toStringStart); - stateKeyToString(&eKey, toStringEnd); - } - - char* err = NULL; if (pState->pTdbState->pHandle[1] != NULL) { + char* err = NULL; rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[1], sKeyStr, sLen, eKeyStr, eLen, &err); - } - // rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr, - // eLen); - if (err != NULL) { - qWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err); - taosMemoryFree(err); + if (err != NULL) { + char toStringStart[128] = {0}; + char toStringEnd[128] = {0}; + stateKeyToString(&sKey, toStringStart); + stateKeyToString(&eKey, toStringEnd); + + qWarn("failed to delete range cf(state) start: %s, end:%s, reason:%s", toStringStart, toStringEnd, err); + taosMemoryFree(err); + } else { + rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[1], sKeyStr, sLen, eKeyStr, eLen); + } } return 0; From 0315e895918de6cad123a1e136c45dbbbbe38402 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 1 Jun 2023 11:13:01 +0000 Subject: [PATCH 3/6] Avoid creating the same ID task multiple times --- source/libs/stream/src/streamBackendRocksdb.c | 2 +- source/libs/stream/src/streamMeta.c | 48 ++++++++++--------- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index eabec058d7..bd72566e5a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -719,7 +719,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t qDebug("succ to open rocksdb cf"); } // close default cf - rocksdb_column_family_handle_destroy(cfHandle[0]); + if (((rocksdb_column_family_handle_t**)cfHandle)[0] != 0) rocksdb_column_family_handle_destroy(cfHandle[0]); rocksdb_options_destroy(cfOpts[0]); handle->db = db; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 98e63f7f51..e3f8b12367 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -205,24 +205,25 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { // add to the ready tasks hash map, not the restored tasks hash map int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { - if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { - tFreeStreamTask(pTask); - return -1; - } - - if (streamMetaSaveTask(pMeta, pTask) < 0) { - tFreeStreamTask(pTask); - return -1; - } - - if (streamMetaCommit(pMeta) < 0) { - tFreeStreamTask(pTask); - return -1; - } - void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); if (p == NULL) { + if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { + tFreeStreamTask(pTask); + return -1; + } + + if (streamMetaSaveTask(pMeta, pTask) < 0) { + tFreeStreamTask(pTask); + return -1; + } + + if (streamMetaCommit(pMeta) < 0) { + tFreeStreamTask(pTask); + return -1; + } taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); + } else { + return 0; } taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES); @@ -359,18 +360,19 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { tDecodeStreamTask(&decoder, pTask); tDecoderClear(&decoder); - if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) { - tdbFree(pKey); - tdbFree(pVal); - tdbTbcClose(pCur); - return -1; - } - + // remove duplicate void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); if (p == NULL) { + if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.version) < 0) { + tdbFree(pKey); + tdbFree(pVal); + tdbTbcClose(pCur); + return -1; + } taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); + } else { + continue; } - if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, sizeof(void*)) < 0) { tdbFree(pKey); tdbFree(pVal); From 32c3cfd51a767baea59c756e4302c5410f36ad03 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 1 Jun 2023 11:13:27 +0000 Subject: [PATCH 4/6] Avoid creating the same ID task multiple times --- include/libs/stream/tstream.h | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1d4bbf073e..8316e6ef50 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -206,7 +206,7 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) { void* streamQueueNextItem(SStreamQueue* queue); SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type); -void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit); +void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit); SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit); @@ -284,7 +284,7 @@ struct SStreamTask { int16_t dispatchMsgType; SStreamStatus status; int32_t selfChildId; - int32_t nodeId; // vgroup id + int32_t nodeId; // vgroup id SEpSet epSet; SCheckpointInfo chkInfo; STaskExec exec; @@ -346,12 +346,14 @@ typedef struct SStreamMeta { void* streamBackend; int32_t streamBackendId; int64_t streamBackendRid; + SHashObj* pTaskBackendUnique; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); -SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, SArray* pTaskList); +SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, + SArray* pTaskList); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); void tFreeStreamTask(SStreamTask* pTask); From 1571ea844af5bdc52407326b37d081b1c9d55d24 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 1 Jun 2023 11:25:55 +0000 Subject: [PATCH 5/6] Avoid creating the same ID task multiple times --- source/libs/stream/src/streamMeta.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e3f8b12367..8c26052fdb 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -367,16 +367,22 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); + taosMemoryFree(pTask); return -1; } taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); } else { + tdbFree(pKey); + tdbFree(pVal); + tdbTbcClose(pCur); + taosMemoryFree(pTask); continue; } if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, sizeof(void*)) < 0) { tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); + taosMemoryFree(pTask); return -1; } From 420ae73728044b13fa51743adebfba303240e480 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 2 Jun 2023 00:35:45 +0000 Subject: [PATCH 6/6] Avoid creating the same ID task multiple times --- source/libs/stream/src/streamBackendRocksdb.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index bd72566e5a..c743ecf7f7 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -128,7 +128,9 @@ void* streamBackendInit(const char* path) { */ streamStateOpenBackendCf(pHandle, (char*)path, cfs, nCf); } - rocksdb_list_column_families_destroy(cfs, nCf); + if (cfs != NULL) { + rocksdb_list_column_families_destroy(cfs, nCf); + } return (void*)pHandle; _EXIT: