Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2

# Conflicts:
#	source/dnode/mnode/impl/src/mndStream.c
This commit is contained in:
Haojun Liao 2023-08-23 17:53:50 +08:00
commit 809d3087ff
2 changed files with 87 additions and 80 deletions

View File

@ -77,12 +77,12 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
int64_t streamId, int32_t taskId); int64_t streamId, int32_t taskId);
static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode); static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode);
static SArray *doExtractNodeListFromStream(SMnode *pMnode); static SArray *doExtractNodeListFromStream(SMnode *pMnode);
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode); static SArray *mndTakeVgroupSnapshot(SMnode *pMnode);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList); static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans);
static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset); static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset);
int32_t mndInitStream(SMnode *pMnode) { int32_t mndInitStream(SMnode *pMnode) {
@ -129,7 +129,12 @@ int32_t mndInitStream(SMnode *pMnode) {
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
void mndCleanupStream(SMnode *pMnode) {} void mndCleanupStream(SMnode *pMnode) {
taosArrayDestroy(execNodeList.pTaskList);
taosHashCleanup(execNodeList.pTaskMap);
taosThreadMutexDestroy(&execNodeList.lock);
mDebug("mnd stream cleanup");
}
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -1156,7 +1161,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode); SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode);
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot); SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
taosArrayDestroy(changeInfo.pUpdateNodeList); taosArrayDestroy(changeInfo.pUpdateNodeList);
taosHashCleanup(changeInfo.pDBMap); taosHashCleanup(changeInfo.pDBMap);
taosArrayDestroy(pNodeSnapshot); taosArrayDestroy(pNodeSnapshot);
@ -1167,12 +1172,12 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
} }
} }
{// check if all tasks are in TASK_STATUS__NORMAL status { // check if all tasks are in TASK_STATUS__NORMAL status
bool ready = true; bool ready = true;
taosThreadMutexLock(&execNodeList.lock); taosThreadMutexLock(&execNodeList.lock);
for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) {
STaskStatusEntry* p = taosArrayGet(execNodeList.pTaskList, i); STaskStatusEntry *p = taosArrayGet(execNodeList.pTaskList, i);
if (p->status != TASK_STATUS__NORMAL) { if (p->status != TASK_STATUS__NORMAL) {
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued", mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued",
p->streamId, p->taskId, 0, streamGetTaskStatusStr(p->status)); p->streamId, p->taskId, 0, streamGetTaskStatusStr(p->status));
@ -1197,9 +1202,9 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
} }
mDebug("start to trigger checkpoint, checkpointId: %" PRId64 "", checkpointId); mDebug("start to trigger checkpoint, checkpointId: %" PRId64 "", checkpointId);
const char* pDb = mndGetStreamDB(pMnode); const char *pDb = mndGetStreamDB(pMnode);
mndTransSetDbName(pTrans, pDb, "checkpoint"); mndTransSetDbName(pTrans, pDb, "checkpoint");
taosMemoryFree((void*)pDb); taosMemoryFree((void *)pDb);
if (mndTransCheckConflict(pMnode, pTrans) != 0) { if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId, mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId,
@ -2226,7 +2231,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
return 0; return 0;
} }
static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode) { static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode) {
int32_t level = taosArrayGetSize(pStream->tasks); int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) { for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i); SArray *pLevel = taosArrayGetP(pStream->tasks, i);
@ -2234,9 +2239,9 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p
int32_t numOfTasks = taosArrayGetSize(pLevel); int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfTasks; j++) { for (int32_t j = 0; j < numOfTasks; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j); SStreamTask *pTask = taosArrayGetP(pLevel, j);
int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId}; int64_t keys[2] = {pTask->id.streamId, pTask->id.taskId};
void* p = taosHashGet(pExecNode->pTaskMap, keys, sizeof(keys)); void *p = taosHashGet(pExecNode->pTaskMap, keys, sizeof(keys));
if (p == NULL) { if (p == NULL) {
STaskStatusEntry entry = { STaskStatusEntry entry = {
.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .status = TASK_STATUS__STOP}; .streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .status = TASK_STATUS__STOP};
@ -2250,21 +2255,23 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p
} }
// todo: this process should be executed by the write queue worker of the mnode // todo: this process should be executed by the write queue worker of the mnode
int32_t mndProcessStreamHb(SRpcMsg *pReq) { int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0}; SStreamHbMsg req = {0};
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, pReq->pCont, pReq->contLen); tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
if (tDecodeStreamHbMsg(&decoder, &req) < 0) { if (tDecodeStreamHbMsg(&decoder, &req) < 0) {
tDecoderClear(&decoder);
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
tDecoderClear(&decoder);
// int64_t now = taosGetTimestampSec(); // int64_t now = taosGetTimestampSec();
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
taosThreadMutexLock(&execNodeList.lock); taosThreadMutexLock(&execNodeList.lock);
@ -2273,7 +2280,6 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p
doExtractTasksFromStream(pMnode); doExtractTasksFromStream(pMnode);
} }
// todo remove it when drop stream
for(int32_t i = 0; i < req.numOfTasks; ++i) { for(int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry* p = taosArrayGet(req.pTaskStatus, i); STaskStatusEntry* p = taosArrayGet(req.pTaskStatus, i);
int64_t k[2] = {p->streamId, p->taskId}; int64_t k[2] = {p->streamId, p->taskId};
@ -2287,64 +2293,66 @@ static void keepStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* p
} }
taosThreadMutexUnlock(&execNodeList.lock); taosThreadMutexUnlock(&execNodeList.lock);
taosArrayDestroy(req.pTaskStatus);
// bool nodeChanged = false; // bool nodeChanged = false;
// SArray* pList = taosArrayInit(4, sizeof(int32_t)); // SArray* pList = taosArrayInit(4, sizeof(int32_t));
/* /*
// record the timeout node // record the timeout node
for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) { for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) {
SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i); SNodeEntry* pEntry = taosArrayGet(execNodeList.pNodeEntryList, i);
int64_t duration = now - pEntry->hbTimestamp; int64_t duration = now - pEntry->hbTimestamp;
if (duration > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next if (duration > MND_STREAM_HB_INTERVAL) { // execNode timeout, try next
taosArrayPush(pList, &pEntry); taosArrayPush(pList, &pEntry);
mWarn("nodeId:%d stream node timeout, since last hb:%"PRId64"s", pEntry->nodeId, duration); mWarn("nodeId:%d stream node timeout, since last hb:%"PRId64"s", pEntry->nodeId, duration);
continue; continue;
}
if (pEntry->nodeId != req.vgId) {
continue;
}
pEntry->hbTimestamp = now;
// check epset to identify whether the node has been transferred to other dnodes.
// node the epset is changed, which means the node transfer has occurred for this node.
// if (!isEpsetEqual(&pEntry->epset, &req.epset)) {
// nodeChanged = true;
// break;
// }
}
// todo handle the node timeout case. Once the vnode is off-line, we should check the dnode status from mnode,
// to identify whether the dnode is truely offline or not.
// handle the node changed case
if (!nodeChanged) {
return TSDB_CODE_SUCCESS;
}
int32_t nodeId = req.vgId;
{// check all streams that involved this vnode should update the epset info
SStreamObj *pStream = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
break;
} }
// update the related upstream and downstream tasks, todo remove this, no need this function if (pEntry->nodeId != req.vgId) {
taosWLockLatch(&pStream->lock); continue;
// streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset); }
// streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset);
taosWUnLockLatch(&pStream->lock);
// code = createStreamUpdateTrans(pMnode, pStream, nodeId, ); pEntry->hbTimestamp = now;
// if (code != TSDB_CODE_SUCCESS) {
// todo // check epset to identify whether the node has been transferred to other dnodes.
//// } // node the epset is changed, which means the node transfer has occurred for this node.
// } // if (!isEpsetEqual(&pEntry->epset, &req.epset)) {
} // nodeChanged = true;
*/ // break;
// }
}
// todo handle the node timeout case. Once the vnode is off-line, we should check the dnode status from mnode,
// to identify whether the dnode is truely offline or not.
// handle the node changed case
if (!nodeChanged) {
return TSDB_CODE_SUCCESS;
}
int32_t nodeId = req.vgId;
{// check all streams that involved this vnode should update the epset info
SStreamObj *pStream = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
break;
}
// update the related upstream and downstream tasks, todo remove this, no need this function
taosWLockLatch(&pStream->lock);
// streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset);
// streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset);
taosWUnLockLatch(&pStream->lock);
// code = createStreamUpdateTrans(pMnode, pStream, nodeId, );
// if (code != TSDB_CODE_SUCCESS) {
// todo
//// }
// }
}
*/
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -790,14 +790,13 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) { for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
if (wrapper->pHandle[i]) { if (wrapper->pHandle[i]) {
rocksdb_column_family_handle_t* p = wrapper->pHandle[i]; rocksdb_column_family_handle_t* p = wrapper->pHandle[i];
size_t len = 0;
char* name = rocksdb_column_family_handle_get_name(p, &len);
// char buf[64] = {0};
// memcpy(buf, name, len);
// qError("column name: name: %s, len: %d", buf, (int)len);
// taosMemoryFree(name);
taosArrayPush(pHandle, &p); taosArrayPush(pHandle, &p);
// size_t len = 0;
// char* name = rocksdb_column_family_handle_get_name(p, &len);
// char buf[64] = {0};
// memcpy(buf, name, len);
// qError("column name: name: %s, len: %d", buf, (int)len);
// taosMemoryFree(name);
} }
} }
taosThreadRwlockUnlock(&wrapper->rwLock); taosThreadRwlockUnlock(&wrapper->rwLock);