trigger checkpoint
This commit is contained in:
parent
7ded6fc7a2
commit
551cd7cdc8
|
@ -844,68 +844,20 @@ static int32_t mndCreateCheckpoint(SMnode *pMnode, int32_t vgId, SList *pStreamL
|
||||||
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
|
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
void *pIter = NULL;
|
if (sdbGetSize(pSdb, SDB_STREAM) <= 0) {
|
||||||
SStreamObj *pStream = NULL;
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// listEleSize();
|
int64_t checkpointId = tGenIdPI64();
|
||||||
|
|
||||||
// iterate all stream obj
|
|
||||||
// SHashObj *vgIds = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
|
|
||||||
while (1) {
|
|
||||||
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
|
||||||
if (pIter == NULL) break;
|
|
||||||
|
|
||||||
// taosRLockLatch(&pStream->lock);
|
|
||||||
// for (int32_t i = 0; i < taosArrayGetSize(pStream->tasks); i++) {
|
|
||||||
// SArray *pLevel = taosArrayGetP(pStream->tasks, i);
|
|
||||||
// SStreamTask *pTask = taosArrayGetP(pLevel, 0);
|
|
||||||
// if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
|
||||||
// int32_t sz = taosArrayGetSize(pLevel);
|
|
||||||
// SList *list = taosHashGet(vgIds, &pTask->nodeId, sizeof(pTask->nodeId));
|
|
||||||
// if (list == NULL) {
|
|
||||||
// SList tlist;
|
|
||||||
// tdListInit(&tlist, TSDB_STREAM_FNAME_LEN);
|
|
||||||
// taosHashPut(vgIds, &pTask->nodeId, sizeof(pTask->nodeId), &tlist, sizeof(tlist));
|
|
||||||
// list = taosHashGet(vgIds, &pTask->nodeId, sizeof(pTask->nodeId));
|
|
||||||
// }
|
|
||||||
// tdListAppend(list, (void *)pStream->name);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// taosRUnLockLatch(&pStream->lock);
|
|
||||||
|
|
||||||
if (pIter == NULL) break;
|
|
||||||
// incr tick
|
|
||||||
int64_t currentTick = atomic_add_fetch_64(&pStream->currentTick, 1);
|
|
||||||
// if >= checkpointFreq, build msg TDMT_MND_STREAM_BEGIN_CHECKPOINT, put into write q
|
|
||||||
// if (currentTick >= pStream->checkpointFreq) {
|
|
||||||
atomic_store_64(&pStream->currentTick, 0);
|
|
||||||
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
|
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
|
||||||
|
pMsg->checkpointId = checkpointId;
|
||||||
pMsg->streamId = pStream->uid;
|
|
||||||
pMsg->checkpointId = tGenIdPI64();
|
|
||||||
memcpy(pMsg->streamName, pStream->name, TSDB_STREAM_FNAME_LEN);
|
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {
|
SRpcMsg rpcMsg = {
|
||||||
.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT,
|
.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT,
|
||||||
.pCont = pMsg,
|
.pCont = pMsg,
|
||||||
.contLen = sizeof(SMStreamDoCheckpointMsg),
|
.contLen = sizeof(SMStreamDoCheckpointMsg),
|
||||||
};
|
};
|
||||||
|
|
||||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
}
|
|
||||||
|
|
||||||
// void *vgIter = taosHashIterate(vgIds, NULL);
|
|
||||||
// size_t klen = 0;
|
|
||||||
// int64_t checkpointId = tGenIdPI64();
|
|
||||||
// while (vgIter) {
|
|
||||||
// int32_t *key = (int32_t *)taosHashGetKey(vgIter, &klen);
|
|
||||||
// SList *val = (SList *)vgIter;
|
|
||||||
|
|
||||||
// mndCreateCheckpoint(pMnode, *key, val);
|
|
||||||
// vgIter = taosHashIterate(vgIds, vgIter);
|
|
||||||
// }
|
|
||||||
// taosHashCleanup(vgIds);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -992,39 +944,24 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, SHashObj *vgIds,
|
||||||
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
int64_t checkpointId) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint");
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
|
||||||
|
|
||||||
SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont;
|
|
||||||
|
|
||||||
SStreamObj *pStream = mndAcquireStream(pMnode, pMsg->streamName);
|
|
||||||
|
|
||||||
if (pStream == NULL || pStream->uid != pMsg->streamId) {
|
|
||||||
mError("failed to checkpoint since stream %s not found", pMsg->streamName);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// build new transaction:
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "stream-checkpoint");
|
|
||||||
if (pTrans == NULL) return -1;
|
if (pTrans == NULL) return -1;
|
||||||
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
|
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
|
||||||
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
|
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
|
||||||
mError("failed to checkpoint since stream %s", tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
|
mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
|
||||||
|
tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
|
||||||
mndReleaseStream(pMnode, pStream);
|
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosRLockLatch(&pStream->lock);
|
taosRLockLatch(&pStream->lock);
|
||||||
// 1. redo action: broadcast checkpoint source msg for all source vg
|
// 1. redo action: broadcast checkpoint source msg for all source vg
|
||||||
int32_t totLevel = taosArrayGetSize(pStream->tasks);
|
int32_t totLevel = taosArrayGetSize(pStream->tasks);
|
||||||
for (int32_t i = 0; i < totLevel; i++) {
|
for (int32_t i = 0; i < totLevel; i++) {
|
||||||
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
|
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
|
||||||
SStreamTask *pTask = taosArrayGetP(pLevel, 0);
|
SStreamTask *pTask = taosArrayGetP(pLevel, 0);
|
||||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
if (pTask->taskLevel == TASK_LEVEL__SOURCE && NULL == taosHashGet(vgIds, &pTask->nodeId, sizeof(pTask->nodeId))) {
|
||||||
int32_t sz = taosArrayGetSize(pLevel);
|
int32_t sz = taosArrayGetSize(pLevel);
|
||||||
for (int32_t j = 0; j < sz; j++) {
|
for (int32_t j = 0; j < sz; j++) {
|
||||||
SStreamTask *pTask = taosArrayGetP(pLevel, j);
|
SStreamTask *pTask = taosArrayGetP(pLevel, j);
|
||||||
|
@ -1032,17 +969,15 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
||||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->nodeId);
|
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->nodeId);
|
||||||
if (pVgObj == NULL) {
|
if (pVgObj == NULL) {
|
||||||
taosRUnLockLatch(&pStream->lock);
|
taosRUnLockLatch(&pStream->lock);
|
||||||
mndReleaseStream(pMnode, pStream);
|
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *buf;
|
void *buf;
|
||||||
int32_t tlen;
|
int32_t tlen;
|
||||||
if (mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask, pMsg) < 0) {
|
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->nodeId, checkpointId) < 0) {
|
||||||
mndReleaseVgroup(pMnode, pVgObj);
|
mndReleaseVgroup(pMnode, pVgObj);
|
||||||
taosRUnLockLatch(&pStream->lock);
|
taosRUnLockLatch(&pStream->lock);
|
||||||
mndReleaseStream(pMnode, pStream);
|
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1062,17 +997,18 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
taosHashPut(vgIds, &pTask->nodeId, sizeof(pTask->nodeId), &pTask->nodeId, sizeof(pTask->nodeId));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// 2. reset tick
|
// 2. reset tick
|
||||||
|
pStream->checkpointFreq = checkpointId;
|
||||||
atomic_store_64(&pStream->currentTick, 0);
|
atomic_store_64(&pStream->currentTick, 0);
|
||||||
// 3. commit log: stream checkpoint info
|
// 3. commit log: stream checkpoint info
|
||||||
pStream->checkpointFreq = taosGetTimestampMs();
|
|
||||||
|
|
||||||
taosRUnLockLatch(&pStream->lock);
|
taosRUnLockLatch(&pStream->lock);
|
||||||
|
|
||||||
// code condtion
|
// // code condtion
|
||||||
|
|
||||||
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
|
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
|
||||||
if (pCommitRaw == NULL) {
|
if (pCommitRaw == NULL) {
|
||||||
|
@ -1094,15 +1030,31 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
||||||
mError("failed to prepare trans rebalance since %s", terrstr());
|
mError("failed to prepare trans rebalance since %s", terrstr());
|
||||||
goto _ERR;
|
goto _ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
mndReleaseStream(pMnode, pStream);
|
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return 0;
|
return 0;
|
||||||
_ERR:
|
_ERR:
|
||||||
mndReleaseStream(pMnode, pStream);
|
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
||||||
|
SMnode *pMnode = pReq->info.node;
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
void *pIter = NULL;
|
||||||
|
SStreamObj *pStream = NULL;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont;
|
||||||
|
int64_t checkpointId = pMsg->checkpointId;
|
||||||
|
|
||||||
|
SHashObj *vgIds = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||||
|
while (1) {
|
||||||
|
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
code = mndProcessStreamCheckpointTrans(pMnode, pStream, vgIds, checkpointId);
|
||||||
|
sdbRelease(pSdb, pStream);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
|
|
Loading…
Reference in New Issue