From 32983a11e1c4c85a1d3b1256975fb9200e9264d3 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 10 Aug 2023 20:34:00 +0800 Subject: [PATCH] add log and fix crash --- source/dnode/vnode/src/vnd/vnodeSnapshot.c | 11 ++++++++++- source/libs/stream/src/streamMeta.c | 15 +++++++++------ 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 70d74268c5..a10b38eb64 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -220,20 +220,29 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) } // STREAM ============ + vInfo("stream task start"); if (!pReader->streamTaskDone) { if (pReader->pStreamTaskReader == NULL) { + vInfo("stream task start 1"); code = streamTaskSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamTaskReader); - if (code) goto _err; + if (code) { + vInfo("stream task start err"); + goto _err; + } } code = streamTaskSnapRead(pReader->pStreamTaskReader, ppData); + vInfo("stream task start 2"); if (code) { + vInfo("stream task start 3"); goto _err; } else { if (*ppData) { goto _exit; + vInfo("stream task start 4"); } else { pReader->streamTaskDone = 1; code = streamTaskSnapReaderClose(pReader->pStreamTaskReader); + vInfo("stream task start 5"); if (code) goto _err; pReader->pStreamTaskReader = NULL; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 4b094e4e98..2ac0f6a50c 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -43,7 +43,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); if (pMeta == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - qError("vgId:%d failed to prepare stream meta, alloc size:%"PRIzu", out of memory", vgId, sizeof(SStreamMeta)); + qError("vgId:%d failed to prepare stream meta, alloc size:%" PRIzu ", out of memory", vgId, sizeof(SStreamMeta)); return NULL; } @@ -122,7 +122,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF taosInitRWLatch(&pMeta->lock); taosThreadMutexInit(&pMeta->backendMutex, NULL); - qInfo("vgId:%d open stream meta successfully, latest checkpoint:%"PRId64, vgId, chkpId); + qInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64, vgId, chkpId); return pMeta; _err: @@ -211,7 +211,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa *pAdded = false; int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; - void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { tFreeStreamTask(pTask); @@ -247,7 +247,7 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) { SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { taosRLockLatch(&pMeta->lock); - int64_t keys[2] = {streamId, taskId}; + int64_t keys[2] = {streamId, taskId}; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (ppTask != NULL) { if (!streamTaskShouldStop(&(*ppTask)->status)) { @@ -291,7 +291,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t // pre-delete operation taosWLockLatch(&pMeta->lock); - int64_t keys[2] = {streamId, taskId}; + int64_t keys[2] = {streamId, taskId}; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (ppTask) { pTask = *ppTask; @@ -390,6 +390,9 @@ int64_t streamGetLatestCheckpointId(SStreamMeta* pMeta) { tdbTbcMoveToFirst(pCur); while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { + if (pVal != NULL && vLen != 0) { + break; + } SCheckpointInfo info; tDecoderInit(&decoder, (uint8_t*)pVal, vLen); if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) { @@ -461,7 +464,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) { // do duplicate task check. int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; - void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); + void* p = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); if (p == NULL) { if (pMeta->expandFunc(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer) < 0) { tdbFree(pKey);