fix(stream): handle the repeatly issued nodeUpdate msg from dnode.

This commit is contained in:
Haojun Liao 2023-10-25 09:55:02 +08:00
parent 26aaa4a83c
commit 03165d5327
6 changed files with 80 additions and 45 deletions

View File

@ -436,33 +436,38 @@ typedef struct STaskStartInfo {
int32_t elapsedTime; int32_t elapsedTime;
} STaskStartInfo; } STaskStartInfo;
typedef struct STaskUpdateInfo {
SHashObj* pTasks;
int32_t transId;
} STaskUpdateInfo;
// meta // meta
typedef struct SStreamMeta { typedef struct SStreamMeta {
char* path; char* path;
TDB* db; TDB* db;
TTB* pTaskDb; TTB* pTaskDb;
TTB* pCheckpointDb; TTB* pCheckpointDb;
SHashObj* pTasksMap; SHashObj* pTasksMap;
SArray* pTaskList; // SArray<STaskId*> SArray* pTaskList; // SArray<STaskId*>
void* ahandle; void* ahandle;
TXN* txn; TXN* txn;
FTaskExpand* expandFunc; FTaskExpand* expandFunc;
int32_t vgId; int32_t vgId;
int64_t stage; int64_t stage;
int32_t role; int32_t role;
STaskStartInfo startInfo; STaskStartInfo startInfo;
SRWLatch lock; SRWLatch lock;
int32_t walScanCounter; int32_t walScanCounter;
void* streamBackend; void* streamBackend;
int64_t streamBackendRid; int64_t streamBackendRid;
SHashObj* pTaskBackendUnique; SHashObj* pTaskBackendUnique;
TdThreadMutex backendMutex; TdThreadMutex backendMutex;
SMetaHbInfo* pHbInfo; SMetaHbInfo* pHbInfo;
SHashObj* pUpdateTaskSet; STaskUpdateInfo updateInfo;
int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta
int32_t numOfPausedTasks; int32_t numOfPausedTasks;
int32_t chkptNotReadyTasks; int32_t chkptNotReadyTasks;
int64_t rid; int64_t rid;
int64_t chkpId; int64_t chkpId;
SArray* chkpSaved; SArray* chkpSaved;
@ -664,6 +669,7 @@ typedef struct SNodeUpdateInfo {
} SNodeUpdateInfo; } SNodeUpdateInfo;
typedef struct SStreamTaskNodeUpdateMsg { typedef struct SStreamTaskNodeUpdateMsg {
int32_t transId; // to identify the msg
int64_t streamId; int64_t streamId;
int32_t taskId; int32_t taskId;
SArray* pNodeList; // SArray<SNodeUpdateInfo> SArray* pNodeList; // SArray<SNodeUpdateInfo>

View File

@ -1883,18 +1883,19 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
} }
static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, int64_t streamId, static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId,
int32_t taskId) { int32_t transId) {
pMsg->streamId = streamId; pMsg->streamId = pId->streamId;
pMsg->taskId = taskId; pMsg->taskId = pId->taskId;
pMsg->transId = transId;
pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo)); pMsg->pNodeList = taosArrayInit(taosArrayGetSize(pInfo->pUpdateNodeList), sizeof(SNodeUpdateInfo));
taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList); taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList);
} }
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId, static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
int64_t streamId, int32_t taskId) { SStreamTaskId* pId, int32_t transId) {
SStreamTaskNodeUpdateMsg req = {0}; SStreamTaskNodeUpdateMsg req = {0};
initNodeUpdateMsg(&req, pInfo, streamId, taskId); initNodeUpdateMsg(&req, pInfo, pId, transId);
int32_t code = 0; int32_t code = 0;
int32_t blen; int32_t blen;
@ -1968,7 +1969,7 @@ void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_
// todo extract method: traverse stream tasks // todo extract method: traverse stream tasks
// build trans to update the epset // build trans to update the epset
static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans* pTrans) { static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans* pTrans) {
mDebug("start to build stream:0x%" PRIx64 " task DAG update", pStream->uid); mDebug("start to build stream:0x%" PRIx64 " tasks epset update", pStream->uid);
taosWLockLatch(&pStream->lock); taosWLockLatch(&pStream->lock);
int32_t numOfLevels = taosArrayGetSize(pStream->tasks); int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
@ -1983,7 +1984,7 @@ static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *p
void *pBuf = NULL; void *pBuf = NULL;
int32_t len = 0; int32_t len = 0;
streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, pTask->id.streamId, pTask->id.taskId); doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
STransAction action = {0}; STransAction action = {0};
initTransAction(&action, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet); initTransAction(&action, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &pTask->info.epSet);

View File

@ -125,7 +125,6 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t type, int32_t vgId); int32_t type, int32_t vgId);
//int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId);
int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId); int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId);
// tqMeta // tqMeta
@ -133,7 +132,6 @@ int32_t tqMetaOpen(STQ* pTq);
int32_t tqMetaClose(STQ* pTq); int32_t tqMetaClose(STQ* pTq);
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle); int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle);
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key); int32_t tqMetaDeleteHandle(STQ* pTq, const char* key);
//int32_t tqMetaRestoreHandle(STQ* pTq);
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen); int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen);
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key); int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key);
int32_t tqMetaRestoreCheckInfo(STQ* pTq); int32_t tqMetaRestoreCheckInfo(STQ* pTq);

View File

@ -20,6 +20,12 @@ typedef struct {
int8_t inited; int8_t inited;
} STqMgmt; } STqMgmt;
typedef struct STaskUpdateEntry {
int64_t streamId;
int32_t taskId;
int32_t transId;
} STaskUpdateEntry;
static STqMgmt tqMgmt = {0}; static STqMgmt tqMgmt = {0};
// 0: not init // 0: not init
@ -1869,7 +1875,26 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
} }
SStreamTask* pTask = *ppTask; SStreamTask* pTask = *ppTask;
tqDebug("s-task:%s receive nodeEp update msg from mnode", pTask->id.idStr);
if (pMeta->updateInfo.transId != req.transId) {
pMeta->updateInfo.transId = req.transId;
tqDebug("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId);
// info needs to be kept till the new trans to update the nodeEp arrived.
taosHashClear(pMeta->updateInfo.pTasks);
} else {
tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", pTask->id.idStr, req.transId);
}
STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
if (exist != NULL) {
tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId,
req.transId);
rsp.code = TSDB_CODE_SUCCESS;
taosWUnLockLatch(&pMeta->lock);
taosArrayDestroy(req.pNodeList);
return rsp.code;
}
streamTaskUpdateEpsetInfo(pTask, req.pNodeList); streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
streamTaskResetStatus(pTask); streamTaskResetStatus(pTask);
@ -1899,12 +1924,14 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
} }
streamTaskStop(pTask); streamTaskStop(pTask);
taosHashPut(pMeta->pUpdateTaskSet, &pTask->id, sizeof(pTask->id), NULL, 0);
// keep the already handled info
taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0);
if (ppHTask != NULL) { if (ppHTask != NULL) {
streamTaskStop(*ppHTask); streamTaskStop(*ppHTask);
tqDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr); tqDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr);
taosHashPut(pMeta->pUpdateTaskSet, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0); taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0);
} else { } else {
tqDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr); tqDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr);
} }
@ -1913,7 +1940,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
// possibly only handle the stream task. // possibly only handle the stream task.
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
int32_t updateTasks = taosHashGetSize(pMeta->pUpdateTaskSet); int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
pMeta->startInfo.startedAfterNodeUpdate = 1; pMeta->startInfo.startedAfterNodeUpdate = 1;
@ -1922,8 +1949,6 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
updateTasks, (numOfTasks - updateTasks)); updateTasks, (numOfTasks - updateTasks));
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
} else { } else {
taosHashClear(pMeta->pUpdateTaskSet);
if (!pTq->pVnode->restored) { if (!pTq->pVnode->restored) {
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
pMeta->startInfo.startedAfterNodeUpdate = 0; pMeta->startInfo.startedAfterNodeUpdate = 0;

View File

@ -1175,6 +1175,9 @@ int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpda
if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pMsg->taskId) < 0) return -1;
// todo this new attribute will be result in being incompatible with previous version
if (tEncodeI32(pEncoder, pMsg->transId) < 0) return -1;
int32_t size = taosArrayGetSize(pMsg->pNodeList); int32_t size = taosArrayGetSize(pMsg->pNodeList);
if (tEncodeI32(pEncoder, size) < 0) return -1; if (tEncodeI32(pEncoder, size) < 0) return -1;
@ -1193,6 +1196,8 @@ int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg*
if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pMsg->transId) < 0) return -1;
int32_t size = 0; int32_t size = 0;
if (tDecodeI32(pDecoder, &size) < 0) return -1; if (tDecodeI32(pDecoder, &size) < 0) return -1;
pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo)); pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo));

View File

@ -143,8 +143,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err; goto _err;
} }
pMeta->pUpdateTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK); pMeta->updateInfo.pTasks = taosHashInit(64, fp, false, HASH_NO_LOCK);
if (pMeta->pUpdateTaskSet == NULL) { if (pMeta->updateInfo.pTasks == NULL) {
goto _err; goto _err;
} }
@ -219,7 +219,7 @@ _err:
if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
if (pMeta->db) tdbClose(pMeta->db); if (pMeta->db) tdbClose(pMeta->db);
if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo); if (pMeta->pHbInfo) taosMemoryFreeClear(pMeta->pHbInfo);
if (pMeta->pUpdateTaskSet) taosHashCleanup(pMeta->pUpdateTaskSet); if (pMeta->updateInfo.pTasks) taosHashCleanup(pMeta->updateInfo.pTasks);
if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet); if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
@ -340,7 +340,7 @@ void streamMetaCloseImpl(void* arg) {
taosHashCleanup(pMeta->pTasksMap); taosHashCleanup(pMeta->pTasksMap);
taosHashCleanup(pMeta->pTaskBackendUnique); taosHashCleanup(pMeta->pTaskBackendUnique);
taosHashCleanup(pMeta->pUpdateTaskSet); taosHashCleanup(pMeta->updateInfo.pTasks);
taosHashCleanup(pMeta->startInfo.pReadyTaskSet); taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
taosMemoryFree(pMeta->pHbInfo); taosMemoryFree(pMeta->pHbInfo);