From 2004c1a34681a67d433ccc9624da031fbf52540f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 17 Oct 2023 18:19:52 +0800 Subject: [PATCH] fix transfer crash --- include/libs/stream/tstream.h | 2 +- source/dnode/vnode/src/tq/tq.c | 8 ++---- source/dnode/vnode/src/tq/tqStreamTaskSnap.c | 3 -- source/libs/stream/inc/streamBackendRocksdb.h | 7 +++-- source/libs/stream/inc/streamInt.h | 1 + source/libs/stream/src/streamBackendRocksdb.c | 9 +++++- source/libs/stream/src/streamMeta.c | 28 +++++++++++++------ source/libs/stream/src/streamSnapshot.c | 27 ++++++++++-------- source/libs/stream/src/streamTask.c | 5 ++++ 9 files changed, 58 insertions(+), 32 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1d4e87e2bd..04866cb3c4 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -727,7 +727,7 @@ int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta); -void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t chkpId, int64_t* ref); +int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 778a01719f..6f1675f528 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -738,11 +738,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, ver); if (code != TSDB_CODE_SUCCESS) return code; - pTask->pBackend = streamMetaGetBackendByTaskKey(pTq->pStreamMeta, (char*)pTask->id.idStr, pTask->checkpointingId, - &pTask->backendRefId); - - // taskDbUpdateChkpId(pTask->pBackend, pTask->checkpointingId); - if (pTask->pBackend == NULL) return -1; + // code = streamTaskSetDb(pTq->pStreamMeta, pTask); + // taskDbUpdateChkpId(pTask->pBackend, pTask->checkpointingId); + // if (pTask->pBackend == NULL) return -1; streamTaskOpenAllUpstreamInput(pTask); diff --git a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c index 7b3f1aac6d..566affa50d 100644 --- a/source/dnode/vnode/src/tq/tqStreamTaskSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamTaskSnap.c @@ -242,11 +242,8 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t goto _err; } tDecoderClear(&decoder); - // tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) - taosWLockLatch(&pTq->pStreamMeta->lock); int64_t key[2] = {task.streamId, task.taskId}; - taosWLockLatch(&pTq->pStreamMeta->lock); if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn) < 0) { diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 637fbf1017..c108acce3e 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -66,8 +66,11 @@ typedef struct { TdThreadMutex mutex; char* idstr; char* path; - int64_t refId; + int64_t refId; + void* pTask; + int64_t streamId; + int64_t taskId; int64_t chkpId; SArray* chkpSaved; SArray* chkpInUse; @@ -184,7 +187,7 @@ int32_t streamBackendTriggerChkp(void* pMeta, char* dst); int32_t streamBackendAddInUseChkp(void* arg, int64_t chkpId); int32_t streamBackendDelInUseChkp(void* arg, int64_t chkpId); -int32_t taskDbBuildSnap(void* arg); +int32_t taskDbBuildSnap(void* arg, SArray* pSnap); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); #endif \ No newline at end of file diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index a5e7531f7e..801b1d6211 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -44,6 +44,7 @@ typedef struct { int64_t streamId; int64_t taskId; int64_t chkpId; + char* dbPrefixPath; } SStreamTaskSnap; extern SStreamGlobalEnv streamEnv; extern int32_t streamBackendId; diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 48e9d506d6..8aa6878b10 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1004,7 +1004,7 @@ int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI return 0; } -int32_t taskDbBuildSnap(void* arg) { +int32_t taskDbBuildSnap(void* arg, SArray* pSnap) { SStreamMeta* pMeta = arg; void* pIter = taosHashIterate(pMeta->pTaskDbUnique, NULL); int32_t code = 0; @@ -1017,6 +1017,13 @@ int32_t taskDbBuildSnap(void* arg) { taskDbRemoveRef(pTaskDb); pIter = taosHashIterate(pMeta->pTaskDbUnique, pIter); + + SStreamTask* pTask = pTaskDb->pTask; + SStreamTaskSnap snap = {.streamId = pTask->id.streamId, + .taskId = pTask->id.taskId, + .chkpId = pTaskDb->chkpId, + .dbPrefixPath = taosStrdup(pTaskDb->path)}; + taosArrayPush(pSnap, &snap); } return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 656b115bcf..3ba360a4ae 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -222,28 +222,40 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) { return 0; } -void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t chkpId, int64_t* ref) { +int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg) { + SStreamTask* pTask = arg; + + char* key = (char*)pTask->id.idStr; + int64_t chkpId = pTask->checkpointingId; + taosThreadMutexLock(&pMeta->backendMutex); void** ppBackend = taosHashGet(pMeta->pTaskDbUnique, key, strlen(key)); if (ppBackend != NULL && *ppBackend != NULL) { taskDbAddRef(*ppBackend); - *ref = ((STaskDbWrapper*)*ppBackend)->refId; + + STaskDbWrapper* pBackend = *ppBackend; + + pTask->backendRefId = pBackend->refId; + pTask->pBackend = pBackend; taosThreadMutexUnlock(&pMeta->backendMutex); - return *ppBackend; + return 0; } - void* pBackend = taskDbOpen(pMeta->path, key, chkpId); + STaskDbWrapper* pBackend = taskDbOpen(pMeta->path, key, chkpId); if (pBackend == NULL) { taosThreadMutexUnlock(&pMeta->backendMutex); - return NULL; + return -1; } + int64_t tref = taosAddRef(taskDbWrapperId, pBackend); - *ref = tref; - ((STaskDbWrapper*)pBackend)->refId = tref; + pTask->backendRefId = tref; + pTask->pBackend = pBackend; + pBackend->refId = tref; + pBackend->pTask = pTask; taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*)); taosThreadMutexUnlock(&pMeta->backendMutex); - return pBackend; + return 0; } SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId, int64_t stage) { int32_t code = -1; diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 676effe778..88e47e127b 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -139,7 +139,7 @@ TdFilePtr streamOpenFile(char* path, char* name, int32_t opt) { return taosOpenFile(fullname, opt); } -int32_t streamTaskDbGetSnapInfo(void* arg, char* path) { return taskDbBuildSnap(arg); } +int32_t streamTaskDbGetSnapInfo(void* arg, char* path, SArray* pSnap) { return taskDbBuildSnap(arg, pSnap); } void snapFileDebugInfo(SBackendSnapFile2* pSnapFile) { if (qDebugFlag & DEBUG_DEBUG) { @@ -236,16 +236,13 @@ int32_t snapFileReadMeta(SBackendSnapFile2* pSnapFile) { return 0; } int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBackendSnapFile2* pSnapFile) { - // SBanckendFile* pFile = taosMemoryCalloc(1, sizeof(SBanckendFile)); int32_t code = -1; - char* path = taosMemoryCalloc(1, strlen(metaPath) + 256); - char idstr[64] = {0}; - sprintf(idstr, "0x%" PRIx64 "-0x%x", pSnap->streamId, (int32_t)(pSnap->taskId)); - - sprintf(path, "%s%s%s%s%s%s%s%" PRId64 "", metaPath, TD_DIRSEP, idstr, TD_DIRSEP, "checkpoints", TD_DIRSEP, - "checkpoint", pSnap->chkpId); - if (taosIsDir(path)) { + char* path = taosMemoryCalloc(1, strlen(pSnap->dbPrefixPath) + 256); + // char idstr[64] = {0}; + sprintf(path, "%s%s%s%s%s%" PRId64 "", pSnap->dbPrefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", + pSnap->chkpId); + if (!taosIsDir(path)) { goto _ERROR; } @@ -261,7 +258,7 @@ int32_t streamBackendSnapInitFile(char* metaPath, SStreamTaskSnap* pSnap, SBacke } snapFileDebugInfo(pSnapFile); - + path = NULL; code = 0; _ERROR: @@ -294,8 +291,8 @@ void snapFileDestroy(SBackendSnapFile2* pSnap) { int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta) { // impl later - SArray* pSnapSet = NULL; - int32_t code = streamTaskDbGetSnapInfo(pMeta, path); + SArray* pSnapSet = taosArrayInit(4, sizeof(SStreamTaskSnap)); + int32_t code = streamTaskDbGetSnapInfo(pMeta, path, pSnapSet); if (code != 0) { return -1; } @@ -310,6 +307,11 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, void* pMeta ASSERT(code == 0); taosArrayPush(pDbSnapSet, &snapFile); } + for (int i = 0; i < taosArrayGetSize(pSnapSet); i++) { + SStreamTaskSnap* pSnap = taosArrayGet(pSnapSet, i); + taosMemoryFree(pSnap->dbPrefixPath); + } + taosArrayDestroy(pSnapSet); pHandle->pDbSnapSet = pDbSnapSet; pHandle->currIdx = 0; @@ -389,6 +391,7 @@ _NEXT: item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx); } } + item = taosArrayGet(pSnapFile->pFileList, pSnapFile->currFileIdx); qDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER, item->name, (int64_t)pSnapFile->offset, item->size, pSnapFile->currFileIdx); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 8da515a650..7295323ec1 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -356,6 +356,7 @@ void tFreeStreamTask(SStreamTask* pTask) { } if (pTask->pBackend) { taskDbRemoveRef(pTask->pBackend); + pTask->pBackend = NULL; } @@ -390,6 +391,10 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i streamTaskInitTokenBucket(&pTask->tokenBucket, 150, 100); taosThreadMutexInit(&pTask->lock, NULL); + + if (streamTaskSetDb(pMeta, pTask) != 0) { + return -1; + } streamTaskOpenAllUpstreamInput(pTask); return TSDB_CODE_SUCCESS;