From db57fc4ba85b4f6396f15e04a25c54cf90731bf6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 19 Mar 2025 23:15:15 +0800 Subject: [PATCH] fix(stream): fix race condition in send msg. (#30277) Co-authored-by: 54liuyao <54liuyao@163.com> Co-authored-by: Jinqing Kuang Co-authored-by: wangmm0220 Co-authored-by: yihaoDeng --- include/common/streamMsg.h | 1 + source/common/src/tdatablock.c | 1 + source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 2 +- source/dnode/vnode/src/tq/tqSink.c | 4 +- source/libs/executor/src/executil.c | 8 +- source/libs/stream/src/streamBackendRocksdb.c | 84 +++++----- source/libs/stream/src/streamDispatch.c | 158 +++++++++++------- source/libs/stream/src/streamExec.c | 2 +- source/libs/stream/src/streamMeta.c | 14 +- source/libs/stream/src/streamSessionState.c | 10 +- source/libs/stream/src/streamStartHistory.c | 2 - source/libs/stream/src/streamStartTask.c | 8 +- source/libs/stream/src/streamTask.c | 1 + source/libs/stream/src/tstreamFileState.c | 1 + 14 files changed, 172 insertions(+), 124 deletions(-) diff --git a/include/common/streamMsg.h b/include/common/streamMsg.h index 5696b592da..54f86a8d4e 100644 --- a/include/common/streamMsg.h +++ b/include/common/streamMsg.h @@ -41,6 +41,7 @@ typedef struct SStreamUpstreamEpInfo { SEpSet epSet; bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer + int64_t lastMsgId; } SStreamUpstreamEpInfo; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 0e7d56c001..348079702f 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2547,6 +2547,7 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf taskIdStr, flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId, pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version, pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName); + goto _exit; if (len >= size - 1) { goto _exit; } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 5acd06bbda..ff8553531a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -551,7 +551,7 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { SWWorkerPool *pStreamCtrlPool = &pMgmt->streamCtrlPool; pStreamCtrlPool->name = "vnode-stream-ctrl"; - pStreamCtrlPool->max = 1; + pStreamCtrlPool->max = 4; if ((code = tWWorkerInit(pStreamCtrlPool)) != 0) return code; SWWorkerPool *pFPool = &pMgmt->fetchPool; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index d0131ab76a..2ac09b5639 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -864,7 +864,7 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI int32_t vgId = TD_VID(pVnode); int64_t suid = pTask->outputInfo.tbSink.stbUid; const char* id = pTask->id.idStr; - int32_t timeout = 300; // 5min + int32_t timeout = 60; // 1min int64_t start = taosGetTimestampSec(); while (pTableSinkInfo->uid == 0) { @@ -987,6 +987,8 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat if (code) { tqDebug("s-task:%s failed to build auto create table-name:%s, groupId:0x%" PRId64, id, dstTableName, groupId); return code; + } else { + tqDebug("s-task:%s no table name given, generated sub-table-name:%s, groupId:0x%" PRId64, id, dstTableName, groupId); } } else { if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) && diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 147d62d245..92c42bd777 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2942,11 +2942,11 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr qDebug("%s===stream===%s: Block is Empty. block type %d", taskIdStr, flag, pBlock->info.type); return; } - if (qDebugFlag & DEBUG_DEBUG) { + if (qDebugFlag & DEBUG_INFO) { char* pBuf = NULL; int32_t code = dumpBlockData(pBlock, flag, &pBuf, taskIdStr); if (code == 0) { - qDebug("%s", pBuf); + qInfo("%s", pBuf); taosMemoryFree(pBuf); } } @@ -2962,13 +2962,13 @@ void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr pBlock->info.version); return; } - if (qDebugFlag & DEBUG_DEBUG) { + if (qDebugFlag & DEBUG_INFO) { char* pBuf = NULL; char flagBuf[64]; snprintf(flagBuf, sizeof(flagBuf), "%s %s", flag, opStr); int32_t code = dumpBlockData(pBlock, flagBuf, &pBuf, taskIdStr); if (code == 0) { - qDebug("%s", pBuf); + qInfo("%s", pBuf); taosMemoryFree(pBuf); } } diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index eb262793ae..a8ca5f15de 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -949,7 +949,7 @@ void streamBackendCleanup(void* arg) { streamMutexDestroy(&pHandle->mutex); streamMutexDestroy(&pHandle->cfMutex); - stDebug("vgId:%d destroy stream backend:%p", (int32_t) pHandle->vgId, pHandle); + stDebug("vgId:%d destroy stream backend:%p", (int32_t)pHandle->vgId, pHandle); taosMemoryFree(pHandle); } @@ -3101,37 +3101,37 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfKe return rocksdb_create_iterator_cf(wrapper->db, *readOpt, ((rocksdb_column_family_handle_t**)wrapper->pCf)[idx]); } -#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ - do { \ - code = 0; \ - char buf[128] = {0}; \ - char* err = NULL; \ - int i = streamStateGetCfIdx(pState, funcname); \ - if (i < 0) { \ - stWarn("streamState failed to get cf name: %s", funcname); \ - code = TSDB_CODE_THIRDPARTY_ERROR; \ - break; \ - } \ - STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ - TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1)); \ - char toString[128] = {0}; \ - if (stDebugFlag & DEBUG_TRACE) TAOS_UNUSED((ginitDict[i].toStrFunc((void*)key, toString))); \ - int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ - rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ - rocksdb_writeoptions_t* opts = wrapper->writeOpt; \ - rocksdb_t* db = wrapper->db; \ - char* ttlV = NULL; \ - int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ - rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ - if (err != NULL) { \ - stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ - taosMemoryFree(err); \ - code = TSDB_CODE_THIRDPARTY_ERROR; \ - } else { \ - stTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, funcname, vLen, \ - ttlVLen, wrapper); \ - } \ - taosMemoryFree(ttlV); \ +#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \ + do { \ + code = 0; \ + char buf[128] = {0}; \ + char* err = NULL; \ + int i = streamStateGetCfIdx(pState, funcname); \ + if (i < 0) { \ + stWarn("streamState failed to get cf name: %s", funcname); \ + code = TSDB_CODE_THIRDPARTY_ERROR; \ + break; \ + } \ + STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; \ + TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1)); \ + char toString[128] = {0}; \ + TAOS_UNUSED((ginitDict[i].toStrFunc((void*)key, toString))); \ + int32_t klen = ginitDict[i].enFunc((void*)key, buf); \ + rocksdb_column_family_handle_t* pHandle = ((rocksdb_column_family_handle_t**)wrapper->pCf)[ginitDict[i].idx]; \ + rocksdb_writeoptions_t* opts = wrapper->writeOpt; \ + rocksdb_t* db = wrapper->db; \ + char* ttlV = NULL; \ + int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \ + rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \ + if (err != NULL) { \ + stError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \ + taosMemoryFree(err); \ + code = TSDB_CODE_THIRDPARTY_ERROR; \ + } else { \ + stInfo("[InternalERR] write streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d, %p", toString, \ + funcname, vLen, ttlVLen, wrapper); \ + } \ + taosMemoryFree(ttlV); \ } while (0); #define STREAM_STATE_GET_ROCKSDB(pState, funcname, key, pVal, vLen) \ @@ -4200,22 +4200,16 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* int32_t res = 0; SSessionKey tmpKey = *key; int32_t valSize = *pVLen; - void* tmp = taosMemoryMalloc(valSize); - if (!tmp) { - return -1; - } SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev_rocksdb(pState, key); int32_t code = streamStateSessionGetKVByCur_rocksdb(pState, pCur, key, pVal, pVLen); if (code == 0) { if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) { - memcpy(tmp, *pVal, valSize); goto _end; } void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); if (fn(pKeyData, stateKey) == true) { - memcpy(tmp, *pVal, valSize); goto _end; } @@ -4230,7 +4224,6 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* if (code == 0) { void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); if (fn(pKeyData, stateKey) == true) { - memcpy(tmp, *pVal, valSize); goto _end; } } @@ -4238,11 +4231,11 @@ int32_t streamStateStateAddIfNotExist_rocksdb(SStreamState* pState, SSessionKey* *key = tmpKey; res = 1; - memset(tmp, 0, valSize); _end: - taosMemoryFreeClear(*pVal); - *pVal = tmp; + if (res == 0 && valSize > *pVLen){ + stError("[InternalERR] [skey:%"PRId64 ",ekey:%"PRId64 ",groupId:%"PRIu64 "],valSize:%d bigger than get rocksdb len:%d", key->win.skey, key->win.ekey, key->groupId, valSize, *pVLen); + } streamStateFreeCur(pCur); return res; } @@ -4358,7 +4351,7 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi } int32_t streamStateDeleteParName_rocksdb(SStreamState* pState, int64_t groupId) { - int code = 0; + int code = 0; STREAM_STATE_DEL_ROCKSDB(pState, "parname", &groupId); return code; } @@ -4515,6 +4508,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb void* val, int32_t vlen, int64_t ttl, void* tmpBuf) { int32_t code = 0; char buf[128] = {0}; + char toString[128] = {0}; char* dst = NULL; size_t size = 0; @@ -4528,6 +4522,10 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb } } int32_t klen = ginitDict[cfIdx].enFunc((void*)key, buf); + + ginitDict[cfIdx].toStrFunc((void*)key, toString); + qInfo("[InternalERR] write cfIdx:%d key:%s vlen:%d", cfIdx, toString, vlen); + char* ttlV = tmpBuf; int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(dst, size, ttl, &ttlV); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 0e8cbb32bf..18895f465d 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -131,6 +131,7 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r code = tmsgSendReq(&pEpInfo->epSet, &rpcMsg); if (code != 0) { + rpcFreeCont(buf); stError("s-task:%s (child %d) failed to send retrieve req to task:0x%x (vgId:%d) QID:0x%" PRIx64 " code:%s", pTask->id.idStr, pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId, tstrerror(code)); } else { @@ -243,12 +244,13 @@ void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups) { void clearBufferedDispatchMsg(SStreamTask* pTask) { SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; + + streamMutexLock(&pMsgInfo->lock); + if (pMsgInfo->pData != NULL) { destroyDispatchMsg(pMsgInfo->pData, streamTaskGetNumOfDownstream(pTask)); } - streamMutexLock(&pMsgInfo->lock); - pMsgInfo->checkpointId = -1; pMsgInfo->transId = -1; pMsgInfo->pData = NULL; @@ -527,6 +529,76 @@ static void cleanupInMonitor(int32_t taskId, int64_t taskRefId, void* param) { streamTaskFreeRefId(param); } +static int32_t sendFailedDispatchData(SStreamTask* pTask, int64_t now) { + int32_t code = 0; + const char* id = pTask->id.idStr; + SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; + + streamMutexLock(&pMsgInfo->lock); + + int32_t msgId = pMsgInfo->msgId; + SStreamDispatchReq* pReq = pTask->msgInfo.pData; + + if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch to down streams, msgId:%d", id, pTask->info.selfChildId, + msgId); + + int32_t numOfRetry = 0; + for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) { + SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, i); + if (pEntry == NULL) { + continue; + } + + if (pEntry->status == TSDB_CODE_SUCCESS && pEntry->rspTs > 0) { + continue; + } + + // downstream not rsp yet beyond threshold that is 10s + if (isDispatchRspTimeout(pEntry, now)) { // not respond yet beyonds 30s, re-send data + doSendFailedDispatch(pTask, pEntry, now, "timeout"); + numOfRetry += 1; + continue; + } + + // downstream inputQ is closed + if (pEntry->status == TASK_INPUT_STATUS__BLOCKED) { + doSendFailedDispatch(pTask, pEntry, now, "downstream inputQ blocked"); + numOfRetry += 1; + continue; + } + + // handle other errors + if (pEntry->status != TSDB_CODE_SUCCESS) { + doSendFailedDispatch(pTask, pEntry, now, "downstream error"); + numOfRetry += 1; + } + } + + stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfRetry, + msgId); + } else { + int32_t dstVgId = pTask->outputInfo.fixedDispatcher.nodeId; + SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet; + int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; + + int32_t s = taosArrayGetSize(pTask->msgInfo.pSendInfo); + SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, 0); + if (pEntry != NULL) { + setResendInfo(pEntry, now); + code = doSendDispatchMsg(pTask, pReq, dstVgId, pEpSet); + + stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d, code:%s", id, + pTask->info.selfChildId, 1, downstreamTaskId, dstVgId, msgId, tstrerror(code)); + } else { + stError("s-task:%s invalid index 0, size:%d", id, s); + } + } + + streamMutexUnlock(&pMsgInfo->lock); + return code; +} + static void doMonitorDispatchData(void* param, void* tmrId) { int32_t code = 0; int64_t now = taosGetTimestampMs(); @@ -590,65 +662,7 @@ static void doMonitorDispatchData(void* param, void* tmrId) { return; } - { - SStreamDispatchReq* pReq = pTask->msgInfo.pData; - - if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch to down streams, msgId:%d", id, - pTask->info.selfChildId, msgId); - - int32_t numOfRetry = 0; - for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) { - SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, i); - if (pEntry == NULL) { - continue; - } - - if (pEntry->status == TSDB_CODE_SUCCESS && pEntry->rspTs > 0) { - continue; - } - - // downstream not rsp yet beyond threshold that is 10s - if (isDispatchRspTimeout(pEntry, now)) { // not respond yet beyonds 30s, re-send data - doSendFailedDispatch(pTask, pEntry, now, "timeout"); - numOfRetry += 1; - continue; - } - - // downstream inputQ is closed - if (pEntry->status == TASK_INPUT_STATUS__BLOCKED) { - doSendFailedDispatch(pTask, pEntry, now, "downstream inputQ blocked"); - numOfRetry += 1; - continue; - } - - // handle other errors - if (pEntry->status != TSDB_CODE_SUCCESS) { - doSendFailedDispatch(pTask, pEntry, now, "downstream error"); - numOfRetry += 1; - } - } - - stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, - numOfRetry, msgId); - } else { - int32_t dstVgId = pTask->outputInfo.fixedDispatcher.nodeId; - SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet; - int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; - - int32_t s = taosArrayGetSize(pTask->msgInfo.pSendInfo); - SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, 0); - if (pEntry != NULL) { - setResendInfo(pEntry, now); - code = doSendDispatchMsg(pTask, pReq, dstVgId, pEpSet); - - stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d, code:%s", id, - pTask->info.selfChildId, 1, downstreamTaskId, dstVgId, msgId, tstrerror(code)); - } else { - stError("s-task:%s invalid index 0, size:%d", id, s); - } - } - } + code = sendFailedDispatchData(pTask, now); if (streamTaskShouldStop(pTask)) { stDebug("s-task:%s should stop, abort from timer", pTask->id.idStr); @@ -887,7 +901,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { code = sendDispatchMsg(pTask, pTask->msgInfo.pData); - // todo: secure the timerActive and start timer in after lock pTask->lock + // todo: start timer in after lock pTask->lock streamMutexLock(&pTask->lock); bool shouldStop = streamTaskShouldStop(pTask); streamMutexUnlock(&pTask->lock); @@ -897,7 +911,6 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } else { streamMutexLock(&pTask->msgInfo.lock); if (pTask->msgInfo.inMonitor == 0) { - // int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start dispatch monitor tmr in %dms, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS, tstrerror(code)); streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); @@ -1850,6 +1863,9 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S return TSDB_CODE_STREAM_TASK_NOT_EXIST; } + stDebug("s-task:%s lastMsgId:%"PRId64 " for upstream taskId:0x%x(vgId:%d)", id, pInfo->lastMsgId, pReq->upstreamTaskId, + pReq->upstreamNodeId); + if (pMeta->role == NODE_ROLE_FOLLOWER) { stError("s-task:%s task on follower received dispatch msgs, dispatch msg rejected", id); status = TASK_INPUT_STATUS__REFUSED; @@ -1874,7 +1890,21 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S stDebug("s-task:%s recv trans-state msgId:%d from upstream:0x%x", id, pReq->msgId, pReq->upstreamTaskId); } - status = streamTaskAppendInputBlocks(pTask, pReq); + if (pReq->msgId > pInfo->lastMsgId) { + status = streamTaskAppendInputBlocks(pTask, pReq); + if (status == TASK_INPUT_STATUS__NORMAL) { + stDebug("s-task:%s update the lastMsgId from %" PRId64 " to %d", id, pInfo->lastMsgId, pReq->msgId); + pInfo->lastMsgId = pReq->msgId; + } else { + stDebug("s-task:%s not update the lastMsgId, remain:%" PRId64, id, pInfo->lastMsgId); + } + } else { + stWarn( + "s-task:%s duplicate msgId:%d from upstream:0x%x discard and return succ, from vgId:%d already recv " + "msgId:%" PRId64, + id, pReq->msgId, pReq->upstreamTaskId, pReq->upstreamNodeId, pInfo->lastMsgId); + status = TASK_INPUT_STATUS__NORMAL; // still return success + } } } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 1015917f61..67100ee51d 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -881,7 +881,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { double el = (taosGetTimestampMs() - st) / 1000.0; if (el > 2.0) { // elapsed more than 5 sec, not occupy the CPU anymore - stDebug("s-task:%s occupy more than 5.0s, release the exec threads and idle for 500ms", id); + stDebug("s-task:%s occupy more than 2.0s, release the exec threads and idle for 500ms", id); streamTaskSetIdleInfo(pTask, 500); return code; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 36ecf99b35..bc08f4f3d4 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1375,13 +1375,23 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader) pMeta->role = (isLeader) ? NODE_ROLE_LEADER : NODE_ROLE_FOLLOWER; if (!isLeader) { streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId); + } else { // wait for nodeep update if become leader from follower + if (prevStage == NODE_ROLE_FOLLOWER) { + pMeta->startInfo.tasksWillRestart = 1; + } } streamMetaWUnLock(pMeta); if (isLeader) { - stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb, rid:%" PRId64, - pMeta->vgId, stage, prevStage, isLeader, pMeta->rid); + if (prevStage == NODE_ROLE_FOLLOWER) { + stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb, rid:%" PRId64 + " restart after nodeEp being updated", + pMeta->vgId, stage, prevStage, isLeader, pMeta->rid); + } else { + stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb, rid:%" PRId64, + pMeta->vgId, stage, prevStage, isLeader, pMeta->rid); + } streamMetaStartHb(pMeta); } else { stInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId, diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index d2d7c7b11b..ef4519bf60 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -149,10 +149,16 @@ SRowBuffPos* createSessionWinBuff(SStreamFileState* pFileState, SSessionKey* pKe memcpy(pNewPos->pKey, pKey, sizeof(SSessionKey)); pNewPos->needFree = true; pNewPos->beFlushed = true; + int32_t len = getRowStateRowSize(pFileState); if (p) { - memcpy(pNewPos->pRowBuff, p, *pVLen); + if (*pVLen > len){ + qError("[InternalERR] read key:[skey:%"PRId64 ",ekey:%"PRId64 ",groupId:%"PRIu64 "],session window buffer is too small, *pVLen:%d, len:%d", pKey->win.skey, pKey->win.ekey, pKey->groupId, *pVLen, len); + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + QUERY_CHECK_CODE(code, lino, _end); + }else{ + memcpy(pNewPos->pRowBuff, p, *pVLen); + } } else { - int32_t len = getRowStateRowSize(pFileState); memset(pNewPos->pRowBuff, 0, len); } diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index 5c15616ca0..bd9859ebb5 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -296,7 +296,6 @@ void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, SStreamMeta* pMeta = pTask->pMeta; SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; - // int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t code = streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false); if (code) { @@ -315,7 +314,6 @@ void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, i SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; if (streamTaskShouldStop(pTask)) { // record the failure - // int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:0x%" PRIx64 " stopped, not launch rel history task:0x%" PRIx64, pInfo->id.taskId, pInfo->hTaskId.taskId); diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c index 28df04adc8..13cf4a41cc 100644 --- a/source/libs/stream/src/streamStartTask.c +++ b/source/libs/stream/src/streamStartTask.c @@ -465,7 +465,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas } int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { - streamMetaRLock(pMeta); + streamMetaWLock(pMeta); SArray* pTaskList = NULL; int32_t num = taosArrayGetSize(pMeta->pTaskList); @@ -473,7 +473,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { if (num == 0) { stDebug("vgId:%d stop all %d task(s) completed, elapsed time:0 Sec.", pMeta->vgId, num); - streamMetaRUnLock(pMeta); + streamMetaWUnLock(pMeta); return TSDB_CODE_SUCCESS; } @@ -482,7 +482,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { // send hb msg to mnode before closing all tasks. int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList); if (code != TSDB_CODE_SUCCESS) { - streamMetaRUnLock(pMeta); + streamMetaWUnLock(pMeta); return code; } @@ -509,7 +509,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { double el = (taosGetTimestampMs() - st) / 1000.0; stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, num, el); - streamMetaRUnLock(pMeta); + streamMetaWUnLock(pMeta); return code; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index e4e8a37b37..8e6ff0ba65 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -100,6 +100,7 @@ static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) { pEpInfo->nodeId = pTask->info.nodeId; pEpInfo->taskId = pTask->id.taskId; pEpInfo->stage = -1; + pEpInfo->lastMsgId = -1; return pEpInfo; } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 0255ac01b5..239711fde7 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -960,6 +960,7 @@ int32_t recoverSession(SStreamFileState* pFileState, int64_t ckId) { if (vlen != pFileState->rowSize) { code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + qError("[InternalERR] read key:[skey:%"PRId64 ",ekey:%"PRId64 ",groupId:%"PRIu64 "],vlen:%d, rowSize:%d", key.win.skey, key.win.ekey, key.groupId, vlen, pFileState->rowSize); QUERY_CHECK_CODE(code, lino, _end); }