From 090515c684a81b690fbe4dacda81ad1e912badb7 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 24 Aug 2023 17:20:04 +0800 Subject: [PATCH] refactor code --- source/dnode/vnode/src/tq/tq.c | 14 +++++----- source/dnode/vnode/src/tq/tqRestore.c | 23 ++++++++-------- source/libs/stream/src/streamBackendRocksdb.c | 26 +++++++++---------- source/libs/stream/src/streamMeta.c | 3 +++ 4 files changed, 35 insertions(+), 31 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f4e6c8c919..440813e262 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -219,7 +219,7 @@ void tqNotifyClose(STQ* pTq) { taosWUnLockLatch(&pMeta->lock); pMeta->killed = STREAM_META_WILL_STOP; - while(pMeta->killed != STREAM_META_OK_TO_STOP) { + while (pMeta->killed != STREAM_META_OK_TO_STOP) { taosMsleep(100); tqDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); } @@ -1799,8 +1799,8 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { // set the initial value for generating check point // set the mgmt epset info according to the checkout source msg from mnode, todo opt perf -// pMeta->mgmtInfo.epset = req.mgmtEps; -// pMeta->mgmtInfo.mnodeId = req.mnodeId; + // pMeta->mgmtInfo.epset = req.mgmtEps; + // pMeta->mgmtInfo.mnodeId = req.mnodeId; if (pMeta->chkptNotReadyTasks == 0) { pMeta->chkptNotReadyTasks = taosArrayGetSize(pMeta->pTaskList); @@ -1908,9 +1908,9 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { // all tasks are closed, now let's restart the stream meta if (pMeta->closedTask == numOfCount) { tqDebug("vgId:%d all tasks are updated, commit the update nodeInfo", vgId); -// if (streamMetaCommit(pMeta) < 0) { - // persist to disk -// } + // if (streamMetaCommit(pMeta) < 0) { + // persist to disk + // } restartTasks = true; pMeta->closedTask = 0; // reset value } else { @@ -1921,7 +1921,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { _end: tDecoderClear(&decoder); -// tmsgSendRsp(&rsp); + // tmsgSendRsp(&rsp); if (restartTasks) { tqDebug("vgId:%d all tasks are stopped, restart them", vgId); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index cfef019e4e..63dceb7dc6 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -13,11 +13,11 @@ * along with this program. If not, see . */ -#include "vnd.h" #include "tq.h" +#include "vnd.h" static int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle); -static int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId); +static int32_t doSetOffsetForWalReader(SStreamTask* pTask, int32_t vgId); // this function should be executed by stream threads. // extract submit block from WAL, and add them into the input queue for the sources tasks. @@ -167,12 +167,12 @@ int32_t tqStartStreamTasks(STQ* pTq) { return 0; } -int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) { +int32_t doSetOffsetForWalReader(SStreamTask* pTask, int32_t vgId) { // seek the stored version and extract data from WAL int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader); if (pTask->chkInfo.currentVer < firstVer) { - tqWarn("vgId:%d s-task:%s ver:%"PRId64" earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, vgId, - pTask->id.idStr, pTask->chkInfo.currentVer, firstVer, firstVer); + tqWarn("vgId:%d s-task:%s ver:%" PRId64 " earlier than the first ver of wal range %" PRId64 ", forward to %" PRId64, + vgId, pTask->id.idStr, pTask->chkInfo.currentVer, firstVer, firstVer); pTask->chkInfo.currentVer = firstVer; @@ -193,7 +193,8 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) { } // append the data for the stream - tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); + tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr, + pTask->chkInfo.currentVer); } } @@ -223,7 +224,7 @@ static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); appendTranstateIntoInputQ(pTask); - /*int32_t code = */streamSchedExec(pTask); + /*int32_t code = */ streamSchedExec(pTask); } else { qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal", id, ver, maxVer); @@ -278,7 +279,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { ASSERT(status == TASK_STATUS__NORMAL); // the maximum version of data in the WAL has reached already, the step2 is done tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr, - pTask->dataRange.range.maxVer); + pTask->dataRange.range.maxVer); streamMetaReleaseTask(pStreamMeta, pTask); continue; } @@ -306,10 +307,10 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { } int32_t numOfItems = streamTaskGetInputQItems(pTask); - int64_t maxVer = (pTask->info.fillHistory == 1)? pTask->dataRange.range.maxVer:INT64_MAX; + int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX; SStreamQueueItem* pItem = NULL; - code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, maxVer, pTask->id.idStr); + code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, pTask->id.idStr); if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItems == 0)) { // failed, continue checkForFillHistoryVerRange(pTask, walReaderGetCurrentVer(pTask->exec.pWalReader)); @@ -321,6 +322,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { if (pTask->status.taskStatus != TASK_STATUS__NORMAL) { tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus); + streamMetaReleaseTask(pStreamMeta, pTask); taosThreadMutexUnlock(&pTask->lock); continue; } @@ -360,4 +362,3 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { taosArrayDestroy(pTaskList); return 0; } - diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 34d22f9ac2..5ada8e05a6 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -439,7 +439,7 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) { if (code != 0) { qError("failed to restart stream backend from %s, reason: %s", chkp, tstrerror(TAOS_SYSTEM_ERROR(errno))); } else { - qInfo("succ to restart stream backend at checkpoint path: %s", chkp); + qInfo("start to restart stream backend at checkpoint path: %s", chkp); } } else { @@ -510,7 +510,11 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) { /* list all cf and get prefix */ - streamStateOpenBackendCf(pHandle, (char*)backendPath, cfs, nCf); + code = streamStateOpenBackendCf(pHandle, (char*)backendPath, cfs, nCf); + if (code != 0) { + rocksdb_list_column_families_destroy(cfs, nCf); + goto _EXIT; + } } if (cfs != NULL) { rocksdb_list_column_families_destroy(cfs, nCf); @@ -545,16 +549,6 @@ void streamBackendCleanup(void* arg) { taosHashCleanup(pHandle->cfInst); if (pHandle->db) { - // char* err = NULL; - // rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create(); - // rocksdb_flushoptions_set_wait(flushOpt, 1); - // rocksdb_flush(pHandle->db, flushOpt, &err); - - // if (err != NULL) { - // qError("failed to flush db before streamBackend clean up, reason:%s", err); - // taosMemoryFree(err); - // } - // rocksdb_flushoptions_destroy(flushOpt); rocksdb_close(pHandle->db); } rocksdb_options_destroy(pHandle->dbOpt); @@ -1480,6 +1474,12 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t if (err != NULL) { qError("failed to open rocksdb cf, reason:%s", err); taosMemoryFree(err); + taosMemoryFree(cfHandle); + taosMemoryFree(pCompare); + taosMemoryFree(params); + taosMemoryFree(cfOpts); + // fix other leak + return -1; } else { qDebug("succ to open rocksdb cf"); } @@ -2851,7 +2851,7 @@ int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) { taosMemoryFree(err); return -1; } else { - qDebug("write batch to backend opt: %p", wrapper->pBackend); + qDebug("write batch to backend:%p", wrapper->pBackend); } return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 9a0f6a03d3..69c6766663 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -107,8 +107,10 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); if (pMeta->streamBackend == NULL) { qError("vgId:%d failed to init stream backend", pMeta->vgId); + qInfo("vgId:%d retry to init stream backend", pMeta->vgId); } } + // if (pMeta->streamBackend == NULL) { // goto _err; // } @@ -172,6 +174,7 @@ int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId); if (pMeta->streamBackend == NULL) { qError("vgId:%d failed to init stream backend", pMeta->vgId); + qInfo("vgId:%d retry to init stream backend", pMeta->vgId); // return -1; } }