diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index cfc2b08ab6..bd0c1b0725 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -26,8 +26,8 @@ #include "mndUser.h" #include "mndVgroup.h" #include "parser.h" -#include "tname.h" #include "tmisce.h" +#include "tname.h" #define MND_STREAM_VER_NUMBER 3 #define MND_STREAM_RESERVE_SIZE 64 @@ -36,18 +36,18 @@ typedef struct SNodeEntry { int32_t nodeId; - SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes. - int64_t hbTimestamp; // second + SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes. + int64_t hbTimestamp; // second } SNodeEntry; typedef struct SStreamVnodeRevertIndex { - SArray* pNodeEntryList; - int64_t ts; // snapshot ts + SArray *pNodeEntryList; + int64_t ts; // snapshot ts } SStreamVnodeRevertIndex; typedef struct SVgroupChangeInfo { - SHashObj* pDBMap; - SArray* pUpdateNodeList; //SArray + SHashObj *pDBMap; + SArray *pUpdateNodeList; // SArray } SVgroupChangeInfo; static int32_t mndNodeCheckSentinel = 0; @@ -75,8 +75,8 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); -static int32_t mndPersistTransLog(SStreamObj* pStream, STrans* pTrans); -static void initTransAction(STransAction* pAction, void* pCont, int32_t contLen, int32_t msgType, const SEpSet* pEpset); +static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); +static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset); int32_t mndInitStream(SMnode *pMnode) { SSdbTable table = { @@ -103,7 +103,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); -// mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); + // mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq); @@ -179,6 +179,7 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { if (sver != MND_STREAM_VER_NUMBER) { terrno = 0; + mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER); goto STREAM_DECODE_OVER; } @@ -784,16 +785,16 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } } -// pDb = mndAcquireDb(pMnode, streamObj.sourceDb); -// if (pDb->cfg.replications != 1) { -// mError("stream source db must have only 1 replica, but %s has %d", pDb->name, pDb->cfg.replications); -// terrno = TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB; -// mndReleaseDb(pMnode, pDb); -// pDb = NULL; -// goto _OVER; -// } + // pDb = mndAcquireDb(pMnode, streamObj.sourceDb); + // if (pDb->cfg.replications != 1) { + // mError("stream source db must have only 1 replica, but %s has %d", pDb->name, pDb->cfg.replications); + // terrno = TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB; + // mndReleaseDb(pMnode, pDb); + // pDb = NULL; + // goto _OVER; + // } -// mndReleaseDb(pMnode, pDb); + // mndReleaseDb(pMnode, pDb); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream"); if (pTrans == NULL) { @@ -875,7 +876,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { pMsg->checkpointId = checkpointId; int32_t size = sizeof(SMStreamDoCheckpointMsg); - SRpcMsg rpcMsg = { .msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size}; + SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); return 0; } @@ -1057,7 +1058,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream } STransAction action = {0}; - SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); + SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj); initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset); mndReleaseVgroup(pMnode, pVgObj); @@ -1174,6 +1175,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { sdbRelease(pMnode->pSdb, pStream); return -1; } + mInfo("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name); mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb); @@ -1182,6 +1184,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { mndTransDrop(pTrans); return -1; } + // mndTransSetSerial(pTrans); // drop all tasks if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { @@ -1747,7 +1750,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } -static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg, const SVgroupChangeInfo* pInfo, int64_t streamId, int32_t taskId) { +static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, int64_t streamId, + int32_t taskId) { pMsg->streamId = streamId; pMsg->taskId = taskId; pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo)); @@ -1793,7 +1797,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha return TSDB_CODE_SUCCESS; } -int32_t mndPersistTransLog(SStreamObj* pStream, STrans* pTrans) { +int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans) { SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream); if (pCommitRaw == NULL) { mError("failed to encode stream since %s", terrstr()); @@ -1817,7 +1821,7 @@ int32_t mndPersistTransLog(SStreamObj* pStream, STrans* pTrans) { return 0; } -void initTransAction(STransAction* pAction, void* pCont, int32_t contLen, int32_t msgType, const SEpSet* pEpset) { +void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset) { pAction->epSet = *pEpset; pAction->contLen = contLen; pAction->pCont = pCont; @@ -1890,11 +1894,11 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr return TSDB_CODE_ACTION_IN_PROGRESS; } -static bool isNodeEpsetChanged(const SEpSet* pPrevEpset, const SEpSet* pCurrent) { - const SEp* pEp = GET_ACTIVE_EP(pPrevEpset); +static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) { + const SEp *pEp = GET_ACTIVE_EP(pPrevEpset); - for(int32_t i = 0; i < pCurrent->numOfEps; ++i) { - const SEp* p = &(pCurrent->eps[i]); + for (int32_t i = 0; i < pCurrent->numOfEps; ++i) { + const SEp *p = &(pCurrent->eps[i]); if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) { return false; } @@ -1949,12 +1953,12 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP return info; } -static SArray* mndTakeVgroupSnapshot(SMnode* pMnode) { +static SArray *mndTakeVgroupSnapshot(SMnode *pMnode) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; SVgObj *pVgroup = NULL; - SArray* pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry)); + SArray *pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry)); while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); @@ -1974,7 +1978,7 @@ static SArray* mndTakeVgroupSnapshot(SMnode* pMnode) { return pVgroupListSnapshot; } -static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) { +static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { SSdb *pSdb = pMnode->pSdb; // check all streams that involved this vnode should update the epset info @@ -1986,14 +1990,14 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChange break; } - void* p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb)); - void* p1 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb)); + void *p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb)); + void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb)); if (p == NULL && p1 == NULL) { mndReleaseStream(pMnode, pStream); continue; } - mDebug("stream:0x%"PRIx64" involved node changed, create update trans", pStream->uid); + mDebug("stream:0x%" PRIx64 " involved node changed, create update trans", pStream->uid); int32_t code = createStreamUpdateTrans(pMnode, pStream, pChangeInfo); if (code != TSDB_CODE_SUCCESS) { // todo @@ -2003,12 +2007,12 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChange return 0; } -static SArray* doExtractNodeListFromStream(SMnode *pMnode) { +static SArray *doExtractNodeListFromStream(SMnode *pMnode) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; void *pIter = NULL; - SHashObj* pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) { @@ -2024,7 +2028,7 @@ static SArray* doExtractNodeListFromStream(SMnode *pMnode) { int32_t numOfTasks = taosArrayGetSize(pLevel); for (int32_t k = 0; k < numOfTasks; ++k) { SStreamTask *pTask = taosArrayGetP(pLevel, k); - SNodeEntry entry = {0}; + SNodeEntry entry = {0}; epsetAssign(&entry.epset, &pTask->info.epSet); entry.nodeId = pTask->info.nodeId; entry.hbTimestamp = -1; @@ -2034,14 +2038,15 @@ static SArray* doExtractNodeListFromStream(SMnode *pMnode) { } taosWUnLockLatch(&pStream->lock); + sdbRelease(pSdb, pStream); } - SArray* plist = taosArrayInit(taosHashGetSize(pHash), sizeof(SNodeEntry)); + SArray *plist = taosArrayInit(taosHashGetSize(pHash), sizeof(SNodeEntry)); // convert to list pIter = NULL; - while((pIter = taosHashIterate(pHash, pIter)) != NULL) { - SNodeEntry* pEntry = (SNodeEntry*) pIter; + while ((pIter = taosHashIterate(pHash, pIter)) != NULL) { + SNodeEntry *pEntry = (SNodeEntry *)pIter; taosArrayPush(plist, pEntry); } taosHashCleanup(pHash); @@ -2094,7 +2099,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { return 0; } -typedef struct SMStreamNodeCheckMsg{} SMStreamNodeCheckMsg; +typedef struct SMStreamNodeCheckMsg { +} SMStreamNodeCheckMsg; static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; @@ -2104,13 +2110,14 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { } SMStreamNodeCheckMsg *pMsg = rpcMallocCont(sizeof(SMStreamNodeCheckMsg)); - SRpcMsg rpcMsg = { .msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pMsg, .contLen = sizeof(SMStreamNodeCheckMsg)}; + SRpcMsg rpcMsg = { + .msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pMsg, .contLen = sizeof(SMStreamNodeCheckMsg)}; tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); return 0; } // todo: this process should be executed by the write queue worker of the mnode -//int32_t mndProcessStreamHb(SRpcMsg *pReq) { +// int32_t mndProcessStreamHb(SRpcMsg *pReq) { // SMnode *pMnode = pReq->info.node; // SSdb *pSdb = pMnode->pSdb; // SStreamHbMsg req = {0}; diff --git a/source/dnode/mnode/sdb/src/sdbRaw.c b/source/dnode/mnode/sdb/src/sdbRaw.c index 3a16ee3f13..244e50b52e 100644 --- a/source/dnode/mnode/sdb/src/sdbRaw.c +++ b/source/dnode/mnode/sdb/src/sdbRaw.c @@ -46,7 +46,7 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) { void sdbFreeRaw(SSdbRaw *pRaw) { if (pRaw != NULL) { #if 1 - mTrace("raw:%p, is freed", pRaw); + mTrace("raw:%p, is freed, len:%d, table:%s", pRaw, pRaw->dataLen, sdbTableName(pRaw->type)); #endif taosMemoryFree(pRaw); } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 920ec8a254..6ab39ce81b 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1411,8 +1411,8 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t return 0; } int streamStateOpenBackend(void* backend, SStreamState* pState) { - qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId); - // taosAcquireRef(streamBackendId, pState->streamBackendRid); + // qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId); + taosAcquireRef(streamBackendId, pState->streamBackendRid); SBackendWrapper* handle = backend; SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper)); @@ -1495,6 +1495,9 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) { void streamStateCloseBackend(SStreamState* pState, bool remove) { SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; SBackendWrapper* pHandle = wrapper->pBackend; + + qInfo("start to close state on backend: %p", pHandle); + taosThreadMutexLock(&pHandle->cfMutex); RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, wrapper->idstr, strlen(pState->pTdbState->idstr) + 1); if (ppInst != NULL && *ppInst != NULL) { @@ -1505,7 +1508,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) { taosThreadMutexUnlock(&pHandle->cfMutex); char* status[] = {"close", "drop"}; - qInfo("start to close %s state %p on backendWrapper %p %s", status[remove == false ? 0 : 1], pState, wrapper, + qInfo("start to %s state %p on backendWrapper %p %s", status[remove == false ? 0 : 1], pState, wrapper, wrapper->idstr); wrapper->remove |= remove; // update by other pState taosReleaseRef(streamBackendCfWrapperId, pState->pTdbState->backendCfWrapperId); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index b1ed36de18..26904ebbfb 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -138,7 +138,8 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz int64_t id = *(int64_t*)uniqueId; pState->pTdbState->backendCfWrapperId = id; pState->pTdbState->pBackendCfWrapper = taosAcquireRef(streamBackendCfWrapperId, id); - + // already exist stream task for + qInfo("already exist stream state for %s", pState->pTdbState->idstr); taosAcquireRef(streamBackendId, pState->streamBackendRid); } taosThreadMutexUnlock(&pMeta->backendMutex); @@ -148,6 +149,8 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT); pState->parNameMap = tSimpleHashInit(1024, hashFn); + qInfo("succ to open state %p on backend %p 0x%" PRIx64 "-%d", pState, pMeta->streamBackend, pState->streamId, + pState->taskId); return pState; #else