Merge branch '3.0' into enh/refactorBackend

This commit is contained in:
yihaoDeng 2023-11-24 11:10:39 +08:00
parent 50f996bbf7
commit 23ae62d268
2 changed files with 37 additions and 38 deletions

View File

@ -27,9 +27,9 @@
#include "tmisce.h"
#include "tname.h"
#define MND_STREAM_VER_NUMBER 4
#define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_MAX_NUM 60
#define MND_STREAM_VER_NUMBER 4
#define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_MAX_NUM 60
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
#define MND_STREAM_PAUSE_NAME "stream-pause"
@ -77,7 +77,7 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char* pMsg);
static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg);
static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans);
static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset,
int32_t retryCode);
@ -85,9 +85,9 @@ static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *p
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static void saveStreamTasksInfo(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
static int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDbName, size_t len);
static int32_t killActiveCheckpointTrans(SMnode *pMnode, const char *pDbName, size_t len);
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
static void freeCheckpointCandEntry(void*);
static void freeCheckpointCandEntry(void *);
static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
@ -168,9 +168,8 @@ void mndCleanupStream(SMnode *pMnode) {
taosArrayDestroy(execInfo.pTaskList);
taosHashCleanup(execInfo.pTaskMap);
taosHashCleanup(execInfo.transMgmt.pDBTrans);
taosThreadMutexDestroy(&execInfo.lock);
taosHashCleanup(execInfo.transMgmt.pDBTrans);
taosHashCleanup(execInfo.transMgmt.pWaitingList);
taosThreadMutexDestroy(&execInfo.lock);
mDebug("mnd stream exec info cleanup");
}
@ -333,11 +332,11 @@ static void mndShowStreamStatus(char *dst, SStreamObj *pStream) {
}
}
SSdbRaw * mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
SSdbRow * mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
SSdbRaw *mndStreamSeqActionEncode(SStreamObj *pStream) { return NULL; }
SSdbRow *mndStreamSeqActionDecode(SSdbRaw *pRaw) { return NULL; }
int32_t mndStreamSeqActionInsert(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
int32_t mndStreamSeqActionDelete(SSdb *pSdb, SStreamSeq *pStream) { return 0; }
int32_t mndStreamSeqActionUpdate(SSdb *pSdb, SStreamSeq *pOldStream, SStreamSeq *pNewStream) { return 0; }
static void mndShowStreamTrigger(char *dst, SStreamObj *pStream) {
int8_t trigger = pStream->conf.trigger;
@ -761,7 +760,7 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream)
static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks
int32_t numOfStream = 0;
SStreamObj *pStream = NULL;
void *pIter = NULL;
void * pIter = NULL;
while ((pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
if (pStream->sourceDbUid == pStreamObj->sourceDbUid) {
@ -1087,7 +1086,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
}
STransAction act = {0};
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
initTransAction(&act, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY);
@ -1265,7 +1264,7 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) {
}
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
STaskId * p = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
if (pEntry == NULL) {
continue;
@ -1280,13 +1279,13 @@ static int32_t mndCheckNodeStatus(SMnode *pMnode) {
}
taosThreadMutexUnlock(&execInfo.lock);
return ready? 0:-1;
return ready ? 0 : -1;
}
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
void * pIter = NULL;
SStreamObj *pStream = NULL;
int32_t code = 0;
@ -1308,7 +1307,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
void *pIter = NULL;
void * pIter = NULL;
int32_t code = 0;
taosThreadMutexLock(&execInfo.lock);
@ -1327,7 +1326,7 @@ static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq) {
SCheckpointCandEntry *pEntry = pIter;
SStreamObj *ps = mndAcquireStream(pMnode, pEntry->pName);
mDebug("start to launch checkpoint for stream:%s %"PRIx64" in candidate list", pEntry->pName, pEntry->streamId);
mDebug("start to launch checkpoint for stream:%s %" PRIx64 " in candidate list", pEntry->pName, pEntry->streamId);
code = mndProcessStreamCheckpointTrans(pMnode, ps, pEntry->checkpointId);
mndReleaseStream(pMnode, ps);
@ -2497,13 +2496,13 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
}
// kill all trans in the dst DB
static void killAllCheckpointTrans(SMnode* pMnode, SVgroupChangeInfo* pChangeInfo) {
void* pIter = NULL;
while((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) {
char* pDb = (char*) pIter;
static void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
void *pIter = NULL;
while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) {
char *pDb = (char *)pIter;
size_t len = 0;
void* pKey = taosHashGetKey(pDb, &len);
void * pKey = taosHashGetKey(pDb, &len);
killActiveCheckpointTrans(pMnode, pKey, len);
}
}
@ -2652,7 +2651,7 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
}
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char* pMsg) {
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, name);
if (pTrans == NULL) {
mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
@ -2660,7 +2659,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const
return NULL;
}
mDebug("s-task:0x%"PRIx64" start to build trans %s", pStream->uid, pMsg);
mDebug("s-task:0x%" PRIx64 " start to build trans %s", pStream->uid, pMsg);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
@ -2740,9 +2739,9 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName, size_t len) {
int32_t killActiveCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
// data in the hash table will be removed automatically, no need to remove it here.
SStreamTransInfo* pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len);
SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, pDBName, len);
if (pTransInfo == NULL) {
return TSDB_CODE_SUCCESS;
}
@ -2753,7 +2752,7 @@ int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName, size_t le
return TSDB_CODE_SUCCESS;
}
STrans* pTrans = mndAcquireTrans(pMnode, pTransInfo->transId);
STrans *pTrans = mndAcquireTrans(pMnode, pTransInfo->transId);
if (pTrans != NULL) {
mInfo("kill checkpoint transId:%d in Db:%s", pTransInfo->transId, pDBName);
mndKillTrans(pMnode, pTrans);
@ -2764,7 +2763,7 @@ int32_t killActiveCheckpointTrans(SMnode *pMnode, const char* pDBName, size_t le
}
int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) {
STrans* pTrans = mndAcquireTrans(pMnode, transId);
STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) {
mInfo("kill checkpoint transId:%d to reset task status", transId);
mndKillTrans(pMnode, pTrans);
@ -2783,8 +2782,8 @@ int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) {
bool conflict = mndStreamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
if (conflict) {
mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans",
pStream->name, pStream->sourceDb, pStream->targetDb);
mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans", pStream->name,
pStream->sourceDb, pStream->targetDb);
continue;
}
@ -2928,7 +2927,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
return TSDB_CODE_SUCCESS;
}
void freeCheckpointCandEntry(void* param) {
SCheckpointCandEntry* pEntry = param;
void freeCheckpointCandEntry(void *param) {
SCheckpointCandEntry *pEntry = param;
taosMemoryFreeClear(pEntry->pName);
}

View File

@ -190,7 +190,7 @@ int32_t getCfIdx(const char* cfName) {
}
bool isValidCheckpoint(const char* dir) {
return true;
// return true;
STaskDbWrapper* pDb = taskDbOpenImpl(NULL, NULL, (char*)dir);
if (pDb == NULL) {
return true;