Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2
# Conflicts: # source/dnode/mnode/impl/src/mndStream.c
This commit is contained in:
commit
2a9668da67
|
@ -26,8 +26,8 @@
|
|||
#include "mndUser.h"
|
||||
#include "mndVgroup.h"
|
||||
#include "parser.h"
|
||||
#include "tname.h"
|
||||
#include "tmisce.h"
|
||||
#include "tname.h"
|
||||
|
||||
#define MND_STREAM_VER_NUMBER 3
|
||||
#define MND_STREAM_RESERVE_SIZE 64
|
||||
|
@ -36,18 +36,18 @@
|
|||
|
||||
typedef struct SNodeEntry {
|
||||
int32_t nodeId;
|
||||
SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes.
|
||||
int64_t hbTimestamp; // second
|
||||
SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes.
|
||||
int64_t hbTimestamp; // second
|
||||
} SNodeEntry;
|
||||
|
||||
typedef struct SStreamVnodeRevertIndex {
|
||||
SArray* pNodeEntryList;
|
||||
int64_t ts; // snapshot ts
|
||||
SArray *pNodeEntryList;
|
||||
int64_t ts; // snapshot ts
|
||||
} SStreamVnodeRevertIndex;
|
||||
|
||||
typedef struct SVgroupChangeInfo {
|
||||
SHashObj* pDBMap;
|
||||
SArray* pUpdateNodeList; //SArray<SNodeUpdateInfo>
|
||||
SHashObj *pDBMap;
|
||||
SArray *pUpdateNodeList; // SArray<SNodeUpdateInfo>
|
||||
} SVgroupChangeInfo;
|
||||
|
||||
static int32_t mndNodeCheckSentinel = 0;
|
||||
|
@ -75,8 +75,8 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
|
|||
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
|
||||
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
|
||||
|
||||
static int32_t mndPersistTransLog(SStreamObj* pStream, STrans* pTrans);
|
||||
static void initTransAction(STransAction* pAction, void* pCont, int32_t contLen, int32_t msgType, const SEpSet* pEpset);
|
||||
static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans);
|
||||
static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset);
|
||||
|
||||
int32_t mndInitStream(SMnode *pMnode) {
|
||||
SSdbTable table = {
|
||||
|
@ -103,7 +103,7 @@ int32_t mndInitStream(SMnode *pMnode) {
|
|||
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
|
||||
// mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
|
||||
// mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
|
||||
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
|
||||
|
||||
|
@ -179,6 +179,7 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
|
|||
|
||||
if (sver != MND_STREAM_VER_NUMBER) {
|
||||
terrno = 0;
|
||||
mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
|
||||
goto STREAM_DECODE_OVER;
|
||||
}
|
||||
|
||||
|
@ -784,16 +785,16 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
// pDb = mndAcquireDb(pMnode, streamObj.sourceDb);
|
||||
// if (pDb->cfg.replications != 1) {
|
||||
// mError("stream source db must have only 1 replica, but %s has %d", pDb->name, pDb->cfg.replications);
|
||||
// terrno = TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB;
|
||||
// mndReleaseDb(pMnode, pDb);
|
||||
// pDb = NULL;
|
||||
// goto _OVER;
|
||||
// }
|
||||
// pDb = mndAcquireDb(pMnode, streamObj.sourceDb);
|
||||
// if (pDb->cfg.replications != 1) {
|
||||
// mError("stream source db must have only 1 replica, but %s has %d", pDb->name, pDb->cfg.replications);
|
||||
// terrno = TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB;
|
||||
// mndReleaseDb(pMnode, pDb);
|
||||
// pDb = NULL;
|
||||
// goto _OVER;
|
||||
// }
|
||||
|
||||
// mndReleaseDb(pMnode, pDb);
|
||||
// mndReleaseDb(pMnode, pDb);
|
||||
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream");
|
||||
if (pTrans == NULL) {
|
||||
|
@ -875,7 +876,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
|
|||
pMsg->checkpointId = checkpointId;
|
||||
|
||||
int32_t size = sizeof(SMStreamDoCheckpointMsg);
|
||||
SRpcMsg rpcMsg = { .msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size};
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size};
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
return 0;
|
||||
}
|
||||
|
@ -1057,7 +1058,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
|
|||
}
|
||||
|
||||
STransAction action = {0};
|
||||
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset);
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
|
||||
|
@ -1174,6 +1175,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
sdbRelease(pMnode->pSdb, pStream);
|
||||
return -1;
|
||||
}
|
||||
|
||||
mInfo("trans:%d, used to drop stream:%s", pTrans->id, dropReq.name);
|
||||
|
||||
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
|
||||
|
@ -1182,6 +1184,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
|||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
// mndTransSetSerial(pTrans);
|
||||
|
||||
// drop all tasks
|
||||
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
|
||||
|
@ -1747,7 +1750,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
|||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg, const SVgroupChangeInfo* pInfo, int64_t streamId, int32_t taskId) {
|
||||
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, int64_t streamId,
|
||||
int32_t taskId) {
|
||||
pMsg->streamId = streamId;
|
||||
pMsg->taskId = taskId;
|
||||
pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
|
||||
|
@ -1793,7 +1797,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t mndPersistTransLog(SStreamObj* pStream, STrans* pTrans) {
|
||||
int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans) {
|
||||
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
|
||||
if (pCommitRaw == NULL) {
|
||||
mError("failed to encode stream since %s", terrstr());
|
||||
|
@ -1817,7 +1821,7 @@ int32_t mndPersistTransLog(SStreamObj* pStream, STrans* pTrans) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void initTransAction(STransAction* pAction, void* pCont, int32_t contLen, int32_t msgType, const SEpSet* pEpset) {
|
||||
void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset) {
|
||||
pAction->epSet = *pEpset;
|
||||
pAction->contLen = contLen;
|
||||
pAction->pCont = pCont;
|
||||
|
@ -1890,11 +1894,11 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr
|
|||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
static bool isNodeEpsetChanged(const SEpSet* pPrevEpset, const SEpSet* pCurrent) {
|
||||
const SEp* pEp = GET_ACTIVE_EP(pPrevEpset);
|
||||
static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) {
|
||||
const SEp *pEp = GET_ACTIVE_EP(pPrevEpset);
|
||||
|
||||
for(int32_t i = 0; i < pCurrent->numOfEps; ++i) {
|
||||
const SEp* p = &(pCurrent->eps[i]);
|
||||
for (int32_t i = 0; i < pCurrent->numOfEps; ++i) {
|
||||
const SEp *p = &(pCurrent->eps[i]);
|
||||
if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
|
||||
return false;
|
||||
}
|
||||
|
@ -1949,12 +1953,12 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
|
|||
return info;
|
||||
}
|
||||
|
||||
static SArray* mndTakeVgroupSnapshot(SMnode* pMnode) {
|
||||
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
SVgObj *pVgroup = NULL;
|
||||
|
||||
SArray* pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry));
|
||||
SArray *pVgroupListSnapshot = taosArrayInit(4, sizeof(SNodeEntry));
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
|
@ -1974,7 +1978,7 @@ static SArray* mndTakeVgroupSnapshot(SMnode* pMnode) {
|
|||
return pVgroupListSnapshot;
|
||||
}
|
||||
|
||||
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) {
|
||||
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
// check all streams that involved this vnode should update the epset info
|
||||
|
@ -1986,14 +1990,14 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChange
|
|||
break;
|
||||
}
|
||||
|
||||
void* p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
|
||||
void* p1 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
|
||||
void *p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
|
||||
void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
|
||||
if (p == NULL && p1 == NULL) {
|
||||
mndReleaseStream(pMnode, pStream);
|
||||
continue;
|
||||
}
|
||||
|
||||
mDebug("stream:0x%"PRIx64" involved node changed, create update trans", pStream->uid);
|
||||
mDebug("stream:0x%" PRIx64 " involved node changed, create update trans", pStream->uid);
|
||||
int32_t code = createStreamUpdateTrans(pMnode, pStream, pChangeInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// todo
|
||||
|
@ -2003,12 +2007,12 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChange
|
|||
return 0;
|
||||
}
|
||||
|
||||
static SArray* doExtractNodeListFromStream(SMnode *pMnode) {
|
||||
static SArray *doExtractNodeListFromStream(SMnode *pMnode) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SStreamObj *pStream = NULL;
|
||||
void *pIter = NULL;
|
||||
|
||||
SHashObj* pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||
if (pIter == NULL) {
|
||||
|
@ -2024,7 +2028,7 @@ static SArray* doExtractNodeListFromStream(SMnode *pMnode) {
|
|||
int32_t numOfTasks = taosArrayGetSize(pLevel);
|
||||
for (int32_t k = 0; k < numOfTasks; ++k) {
|
||||
SStreamTask *pTask = taosArrayGetP(pLevel, k);
|
||||
SNodeEntry entry = {0};
|
||||
SNodeEntry entry = {0};
|
||||
epsetAssign(&entry.epset, &pTask->info.epSet);
|
||||
entry.nodeId = pTask->info.nodeId;
|
||||
entry.hbTimestamp = -1;
|
||||
|
@ -2034,14 +2038,15 @@ static SArray* doExtractNodeListFromStream(SMnode *pMnode) {
|
|||
}
|
||||
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
sdbRelease(pSdb, pStream);
|
||||
}
|
||||
|
||||
SArray* plist = taosArrayInit(taosHashGetSize(pHash), sizeof(SNodeEntry));
|
||||
SArray *plist = taosArrayInit(taosHashGetSize(pHash), sizeof(SNodeEntry));
|
||||
|
||||
// convert to list
|
||||
pIter = NULL;
|
||||
while((pIter = taosHashIterate(pHash, pIter)) != NULL) {
|
||||
SNodeEntry* pEntry = (SNodeEntry*) pIter;
|
||||
while ((pIter = taosHashIterate(pHash, pIter)) != NULL) {
|
||||
SNodeEntry *pEntry = (SNodeEntry *)pIter;
|
||||
taosArrayPush(plist, pEntry);
|
||||
}
|
||||
taosHashCleanup(pHash);
|
||||
|
@ -2094,7 +2099,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
typedef struct SMStreamNodeCheckMsg{} SMStreamNodeCheckMsg;
|
||||
typedef struct SMStreamNodeCheckMsg {
|
||||
} SMStreamNodeCheckMsg;
|
||||
|
||||
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
|
@ -2104,13 +2110,14 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
SMStreamNodeCheckMsg *pMsg = rpcMallocCont(sizeof(SMStreamNodeCheckMsg));
|
||||
SRpcMsg rpcMsg = { .msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pMsg, .contLen = sizeof(SMStreamNodeCheckMsg)};
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pMsg, .contLen = sizeof(SMStreamNodeCheckMsg)};
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// todo: this process should be executed by the write queue worker of the mnode
|
||||
//int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||
// int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||
// SMnode *pMnode = pReq->info.node;
|
||||
// SSdb *pSdb = pMnode->pSdb;
|
||||
// SStreamHbMsg req = {0};
|
||||
|
|
|
@ -46,7 +46,7 @@ SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) {
|
|||
void sdbFreeRaw(SSdbRaw *pRaw) {
|
||||
if (pRaw != NULL) {
|
||||
#if 1
|
||||
mTrace("raw:%p, is freed", pRaw);
|
||||
mTrace("raw:%p, is freed, len:%d, table:%s", pRaw, pRaw->dataLen, sdbTableName(pRaw->type));
|
||||
#endif
|
||||
taosMemoryFree(pRaw);
|
||||
}
|
||||
|
|
|
@ -1411,8 +1411,8 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
|
|||
return 0;
|
||||
}
|
||||
int streamStateOpenBackend(void* backend, SStreamState* pState) {
|
||||
qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId);
|
||||
// taosAcquireRef(streamBackendId, pState->streamBackendRid);
|
||||
// qInfo("start to open state %p on backend %p 0x%" PRIx64 "-%d", pState, backend, pState->streamId, pState->taskId);
|
||||
taosAcquireRef(streamBackendId, pState->streamBackendRid);
|
||||
SBackendWrapper* handle = backend;
|
||||
SBackendCfWrapper* pBackendCfWrapper = taosMemoryCalloc(1, sizeof(SBackendCfWrapper));
|
||||
|
||||
|
@ -1495,6 +1495,9 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
|
|||
void streamStateCloseBackend(SStreamState* pState, bool remove) {
|
||||
SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper;
|
||||
SBackendWrapper* pHandle = wrapper->pBackend;
|
||||
|
||||
qInfo("start to close state on backend: %p", pHandle);
|
||||
|
||||
taosThreadMutexLock(&pHandle->cfMutex);
|
||||
RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, wrapper->idstr, strlen(pState->pTdbState->idstr) + 1);
|
||||
if (ppInst != NULL && *ppInst != NULL) {
|
||||
|
@ -1505,7 +1508,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
|
|||
taosThreadMutexUnlock(&pHandle->cfMutex);
|
||||
|
||||
char* status[] = {"close", "drop"};
|
||||
qInfo("start to close %s state %p on backendWrapper %p %s", status[remove == false ? 0 : 1], pState, wrapper,
|
||||
qInfo("start to %s state %p on backendWrapper %p %s", status[remove == false ? 0 : 1], pState, wrapper,
|
||||
wrapper->idstr);
|
||||
wrapper->remove |= remove; // update by other pState
|
||||
taosReleaseRef(streamBackendCfWrapperId, pState->pTdbState->backendCfWrapperId);
|
||||
|
|
|
@ -138,7 +138,8 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
|
|||
int64_t id = *(int64_t*)uniqueId;
|
||||
pState->pTdbState->backendCfWrapperId = id;
|
||||
pState->pTdbState->pBackendCfWrapper = taosAcquireRef(streamBackendCfWrapperId, id);
|
||||
|
||||
// already exist stream task for
|
||||
qInfo("already exist stream state for %s", pState->pTdbState->idstr);
|
||||
taosAcquireRef(streamBackendId, pState->streamBackendRid);
|
||||
}
|
||||
taosThreadMutexUnlock(&pMeta->backendMutex);
|
||||
|
@ -148,6 +149,8 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
|
|||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT);
|
||||
|
||||
pState->parNameMap = tSimpleHashInit(1024, hashFn);
|
||||
qInfo("succ to open state %p on backend %p 0x%" PRIx64 "-%d", pState, pMeta->streamBackend, pState->streamId,
|
||||
pState->taskId);
|
||||
return pState;
|
||||
|
||||
#else
|
||||
|
|
Loading…
Reference in New Issue