enh(stream): create update stream trans.

This commit is contained in:
Haojun Liao 2023-08-09 19:38:13 +08:00
parent f3c5f20ee2
commit d34b9af054
3 changed files with 167 additions and 86 deletions

View File

@ -32,8 +32,6 @@
#define MND_STREAM_VER_NUMBER 3
#define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_MAX_NUM 60
#define MND_STREAM_HB_INTERVAL 100 // 100 sec
typedef struct SNodeEntry {
int32_t nodeId;
@ -44,6 +42,7 @@ typedef struct SNodeEntry {
typedef struct SStreamVnodeRevertIndex {
SArray* pDBList;
SArray* pNodeEntryList;
int64_t ts; // snapshot ts
} SStreamVnodeRevertIndex;
static int32_t mndNodeCheckSentinel = 0;
@ -98,7 +97,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
// mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
@ -863,7 +862,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = sizeof(SMStreamDoCheckpointMsg)};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
// tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
return 0;
}
@ -1740,6 +1739,7 @@ typedef struct SVgroupChangeInfo {
} SVgroupChangeInfo;
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg, const SVgroupChangeInfo* pInfo) {
pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList);
}
@ -1821,7 +1821,6 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr
}
mDebug("start to build stream:0x%" PRIx64 " task DAG update", pStream->uid);
ASSERT(0);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
@ -1856,7 +1855,25 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr
}
taosWUnLockLatch(&pStream->lock);
return mndPersistTransLog(pStream, pTrans);
int32_t code = mndPersistTransLog(pStream, pTrans);
if (code != TSDB_CODE_SUCCESS) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
// todo. 1. multiple change, 2. replica change problem
@ -1943,6 +1960,7 @@ int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) {
continue;
}
mDebug("stream:0x%"PRIx64" involved node changed, create update trans", pStream->uid);
int32_t code = createStreamUpdateTrans(pMnode, pStream, pChangeInfo);
if (code != TSDB_CODE_SUCCESS) {
// todo
@ -1952,7 +1970,54 @@ int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo* pChangeInfo) {
return 0;
}
static SArray* doExtractNodeList(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
SHashObj* pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
break;
}
taosWLockLatch(&pStream->lock);
int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
for (int32_t j = 0; j < numOfLevels; ++j) {
SArray *pLevel = taosArrayGetP(pStream->tasks, j);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
SNodeEntry entry = {0};
epsetAssign(&entry.epset, &pTask->info.epSet);
entry.nodeId = pTask->info.nodeId;
entry.hbTimestamp = -1;
taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
}
}
taosWUnLockLatch(&pStream->lock);
}
SArray* plist = taosArrayInit(taosHashGetSize(pHash), sizeof(SNodeEntry));
// convert to list
pIter = NULL;
while((pIter = taosHashIterate(pHash, pIter)) != NULL) {
SNodeEntry* pEntry = (SNodeEntry*) pIter;
taosArrayPush(plist, pEntry);
}
taosHashCleanup(pHash);
return plist;
}
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
return 0;
int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
if (old != 0) {
mDebug("still in checking node change");
@ -1962,98 +2027,111 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
mDebug("start to do node change checking");
SMnode *pMnode = pMsg->info.node;
if (execNodeList.pNodeEntryList == NULL) {
execNodeList.pNodeEntryList = doExtractNodeList(pMnode);
}
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
int64_t ts = taosGetTimestampSec();
SVgroupChangeInfo changeInfo = mndFindChangedVgroupInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
mndProcessVgroupChange(pMnode, &changeInfo);
}
taosArrayDestroy(changeInfo.pUpdateNodeList);
taosHashCleanup(changeInfo.pDBMap);
// keep the new vnode snapshot
taosArrayDestroy(execNodeList.pNodeEntryList);
execNodeList.pNodeEntryList = pNodeSnapshot;
execNodeList.ts = ts;
mDebug("end to do node change checking");
atomic_store_32(&mndNodeCheckSentinel, 0);
return 0;
}
// todo: this process should be executed by the write queue worker of the mnode
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SStreamHbMsg req = {0};
int32_t code = TSDB_CODE_SUCCESS;
SDecoder decoder = {0};
tDecoderInit(&decoder, (uint8_t *)pReq->pCont, pReq->contLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
int64_t now = taosGetTimestampSec();
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
// timeout list
bool nodeChanged = false;
SArray* pList = taosArrayInit(4, sizeof(int32_t));
// record the timeout node
for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) {
SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i);
int64_t duration = now - pEntry->hbTimestamp;
if (duration > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next
taosArrayPush(pList, &pEntry);
mWarn("nodeId:%d stream node timeout, since last hb:%"PRId64"s", pEntry->nodeId, duration);
continue;
}
if (pEntry->nodeId != req.vgId) {
continue;
}
pEntry->hbTimestamp = now;
// check epset to identify whether the node has been transferred to other dnodes.
// node the epset is changed, which means the node transfer has occurred for this node.
// if (!isEpsetEqual(&pEntry->epset, &req.epset)) {
// nodeChanged = true;
//int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// SMnode *pMnode = pReq->info.node;
// SSdb *pSdb = pMnode->pSdb;
// SStreamHbMsg req = {0};
// int32_t code = TSDB_CODE_SUCCESS;
//
// SDecoder decoder = {0};
// tDecoderInit(&decoder, (uint8_t *)pReq->pCont, pReq->contLen);
//
// if (tStartDecode(&decoder) < 0) return -1;
//
// if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
// terrno = TSDB_CODE_INVALID_MSG;
// return -1;
// }
//
// int64_t now = taosGetTimestampSec();
// mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
//
// // timeout list
// bool nodeChanged = false;
// SArray* pList = taosArrayInit(4, sizeof(int32_t));
//
// // record the timeout node
// for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) {
// SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i);
// int64_t duration = now - pEntry->hbTimestamp;
// if (duration > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next
// taosArrayPush(pList, &pEntry);
// mWarn("nodeId:%d stream node timeout, since last hb:%"PRId64"s", pEntry->nodeId, duration);
// continue;
// }
//
// if (pEntry->nodeId != req.vgId) {
// continue;
// }
//
// pEntry->hbTimestamp = now;
//
// // check epset to identify whether the node has been transferred to other dnodes.
// // node the epset is changed, which means the node transfer has occurred for this node.
//// if (!isEpsetEqual(&pEntry->epset, &req.epset)) {
//// nodeChanged = true;
//// break;
//// }
// }
//
// // todo handle the node timeout case. Once the vnode is off-line, we should check the dnode status from mnode,
// // to identify whether the dnode is truely offline or not.
//
// // handle the node changed case
// if (!nodeChanged) {
// return TSDB_CODE_SUCCESS;
// }
//
// int32_t nodeId = req.vgId;
//
// {// check all streams that involved this vnode should update the epset info
// SStreamObj *pStream = NULL;
// void *pIter = NULL;
// while (1) {
// pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
// if (pIter == NULL) {
// break;
// }
}
// todo handle the node timeout case. Once the vnode is off-line, we should check the dnode status from mnode,
// to identify whether the dnode is truely offline or not.
// handle the node changed case
if (!nodeChanged) {
return TSDB_CODE_SUCCESS;
}
int32_t nodeId = req.vgId;
{// check all streams that involved this vnode should update the epset info
SStreamObj *pStream = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
break;
}
// update the related upstream and downstream tasks, todo remove this, no need this function
taosWLockLatch(&pStream->lock);
// streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset);
// streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset);
taosWUnLockLatch(&pStream->lock);
// code = createStreamUpdateTrans(pMnode, pStream, nodeId, );
// if (code != TSDB_CODE_SUCCESS) {
// todo
//
// // update the related upstream and downstream tasks, todo remove this, no need this function
// taosWLockLatch(&pStream->lock);
//// streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset);
//// streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset);
// taosWUnLockLatch(&pStream->lock);
//
//// code = createStreamUpdateTrans(pMnode, pStream, nodeId, );
//// if (code != TSDB_CODE_SUCCESS) {
//// todo
//// }
// }
}
}
return TSDB_CODE_SUCCESS;
}
// }
//
// return TSDB_CODE_SUCCESS;
//}

View File

@ -658,6 +658,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->status.schedStatus);
// the inputQ is empty due to the checkpoint process, so we need to scan data from WAL here.
if ((!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) {
streamSchedExec(pTask);
}

View File

@ -547,6 +547,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
}
void metaHbToMnode(void* param, void* tmrId) {
#if 0
SStreamMeta* pMeta = param;
SStreamHbMsg hbMsg = {0};
@ -592,4 +593,5 @@ void metaHbToMnode(void* param, void* tmrId) {
// next hb will be issued in 20sec.
taosTmrReset(metaHbToMnode, 20000, pMeta, streamEnv.timer, pMeta->hbTmr);
#endif
}