diff --git a/include/common/streamMsg.h b/include/common/streamMsg.h
index 55f3a53e15..3203538ef2 100644
--- a/include/common/streamMsg.h
+++ b/include/common/streamMsg.h
@@ -79,10 +79,12 @@ typedef struct SStreamTaskNodeUpdateMsg {
   int64_t streamId;
   int32_t taskId;
   SArray* pNodeList;  // SArray
+  SArray* pTaskList;  // SArray<int32_t>, taskId list
 } SStreamTaskNodeUpdateMsg;
 
 int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg);
 int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg);
+void    tDestroyNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg);
 
 typedef struct {
   int64_t reqId;
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 237a046aa8..025f45086b 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -522,6 +522,7 @@ typedef struct STaskStartInfo {
 
 typedef struct STaskUpdateInfo {
   SHashObj* pTasks;
+  SArray*   pTaskList;
   int32_t   activeTransId;
   int32_t   completeTransId;
   int64_t   completeTs;
diff --git a/source/common/src/msg/streamMsg.c b/source/common/src/msg/streamMsg.c
index 3b42c578f8..09088d0bca 100644
--- a/source/common/src/msg/streamMsg.c
+++ b/source/common/src/msg/streamMsg.c
@@ -143,6 +143,18 @@ int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpda
 
   // todo this new attribute will be result in being incompatible with previous version
   TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pMsg->transId));
+
+  int32_t numOfTasks = taosArrayGetSize(pMsg->pTaskList);
+  TAOS_CHECK_EXIT(tEncodeI32(pEncoder, numOfTasks));
+
+  for (int32_t i = 0; i < numOfTasks; ++i) {
+    int32_t* pId = taosArrayGet(pMsg->pTaskList, i);
+    if (pId == NULL) {
+      TAOS_CHECK_EXIT(terrno);
+    }
+    TAOS_CHECK_EXIT(tEncodeI32(pEncoder, *pId));
+  }
+
   tEndEncode(pEncoder);
 _exit:
   if (code) {
@@ -162,10 +174,10 @@ int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg*
 
   int32_t size = 0;
   TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
+
   pMsg->pNodeList = taosArrayInit(size, sizeof(SNodeUpdateInfo));
-  if (pMsg->pNodeList == NULL) {
-    TAOS_CHECK_EXIT(terrno);
-  }
+  TSDB_CHECK_NULL(pMsg->pNodeList, code, lino, _exit, terrno);
+
   for (int32_t i = 0; i < size; ++i) {
     SNodeUpdateInfo info = {0};
     TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info.nodeId));
@@ -179,11 +191,32 @@ int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg*
 
   TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pMsg->transId));
 
+  // number of tasks
+  TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &size));
+  pMsg->pTaskList = taosArrayInit(size, sizeof(int32_t));
+  TSDB_CHECK_NULL(pMsg->pTaskList, code, lino, _exit, terrno);
+
+  for (int32_t i = 0; i < size; ++i) {
+    int32_t id = 0;
+    TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &id));
+    if (taosArrayPush(pMsg->pTaskList, &id) == NULL) {
+      TAOS_CHECK_EXIT(terrno);
+    }
+  }
+
   tEndDecode(pDecoder);
 _exit:
   return code;
 }
 
+// destroy the node list and the task id list held by pMsg and reset both pointers; pMsg itself is not freed
+void tDestroyNodeUpdateMsg(SStreamTaskNodeUpdateMsg* pMsg) {
+  taosArrayDestroy(pMsg->pNodeList);
+  taosArrayDestroy(pMsg->pTaskList);
+  pMsg->pNodeList = NULL;
+  pMsg->pTaskList = NULL;
+}
+
 int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
   int32_t code = 0;
   int32_t lino;
diff --git a/source/dnode/mnode/impl/src/mndStreamTransAct.c b/source/dnode/mnode/impl/src/mndStreamTransAct.c
index 45718f2d26..c210b12a67 100644
--- a/source/dnode/mnode/impl/src/mndStreamTransAct.c
+++ b/source/dnode/mnode/impl/src/mndStreamTransAct.c
@@ -145,7 +145,7 @@ static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask
   return 0;
 }
 
-static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SStreamTaskId *pId,
+static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChangeInfo *pInfo, SArray* pTaskList, SStreamTaskId *pId,
                               int32_t transId) {
   int32_t code = 0;
 
@@ -158,6 +158,8 @@ static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChang
     code = terrno;
   }
 
+  pMsg->pTaskList = pTaskList;
+
   if (code == 0) {
     void *p = taosArrayAddAll(pMsg->pNodeList, pInfo->pUpdateNodeList);
     if (p == NULL) {
@@ -166,10 +168,10 @@ static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChang
   }
 }
 
-static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
+static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, SArray* pList, int32_t nodeId,
                                           SStreamTaskId *pId, int32_t transId) {
   SStreamTaskNodeUpdateMsg req = {0};
-  initNodeUpdateMsg(&req, pInfo, pId, transId);
+  initNodeUpdateMsg(&req, pInfo, pList, pId, transId);
 
   int32_t code = 0;
   int32_t blen;
@@ -177,7 +179,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
   tEncodeSize(tEncodeStreamTaskUpdateMsg, &req, blen, code);
   if (code < 0) {
     terrno = TSDB_CODE_OUT_OF_MEMORY;
-    taosArrayDestroy(req.pNodeList);
+    tDestroyNodeUpdateMsg(&req);
     return terrno;
   }
 
@@ -185,7 +187,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
 
   void *buf = taosMemoryMalloc(tlen);
   if (buf == NULL) {
-    taosArrayDestroy(req.pNodeList);
+    tDestroyNodeUpdateMsg(&req);
     return terrno;
   }
 
@@ -196,7 +198,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
   if (code == -1) {
     tEncoderClear(&encoder);
     taosMemoryFree(buf);
-    taosArrayDestroy(req.pNodeList);
+    tDestroyNodeUpdateMsg(&req);
     return code;
   }
 
@@ -209,7 +211,31 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
   *pBuf = buf;
   *pLen = tlen;
 
-  taosArrayDestroy(req.pNodeList);
+  tDestroyNodeUpdateMsg(&req);
+  return TSDB_CODE_SUCCESS;
+}
+
+// append to pList the task id of every entry in execInfo whose nodeId equals vgId
+static int32_t createUpdateTaskList(int32_t vgId, SArray* pList) {
+  for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
+    STaskId *p = taosArrayGet(execInfo.pTaskList, i);
+    if (p == NULL) {
+      continue;
+    }
+
+    STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
+    if (pe == NULL) {  // id is in the list but has no entry in the map, nothing to collect
+      continue;
+    }
+
+    if (pe->nodeId == vgId) {
+      void *pRet = taosArrayPush(pList, &pe->id.taskId);
+      if (pRet == NULL) {
+        return terrno;
+      }
+    }
+  }
+
   return TSDB_CODE_SUCCESS;
 }
 
@@ -218,11 +244,23 @@ static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask
   int32_t len = 0;
   SEpSet  epset = {0};
   bool    hasEpset = false;
+  SArray* pTaskList = taosArrayInit(4, sizeof(int32_t));
+  if (pTaskList == NULL) {
+    return terrno;
+  }
 
-  bool    unusedRet = streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
-  int32_t code = doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
+  bool unusedRet = streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
+  int32_t code = createUpdateTaskList(pTask->info.nodeId, pTaskList);
+  if (code != 0) {
+    taosArrayDestroy(pTaskList);
+    return code;
+  }
+
+  // pTaskList is attached to the update msg and destroyed inside doBuildStreamTaskUpdateMsg; do not use it afterwards
+  code = doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTaskList, pTask->info.nodeId, &pTask->id, pTrans->id);
   if (code) {
     mError("failed to build stream task epset update msg, code:%s", tstrerror(code));
+    taosMemoryFree(pBuf);
     return code;
   }
 
diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c
index 92b6300beb..de3722267f 100644
--- a/source/dnode/vnode/src/tqCommon/tqCommon.c
+++ b/source/dnode/vnode/src/tqCommon/tqCommon.c
@@ -170,29 +170,29 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream
 
 // this is to process request from transaction, always return true.
 int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored, bool isLeader) {
-  int32_t      vgId = pMeta->vgId;
-  char*        msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
-  int32_t      len = pMsg->contLen - sizeof(SMsgHead);
-  SRpcMsg      rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
-  int64_t      st = taosGetTimestampMs();
-  bool         updated = false;
-  int32_t      code = 0;
-  SStreamTask* pTask = NULL;
-  SStreamTask* pHTask = NULL;
-
+  int32_t                  vgId = pMeta->vgId;
+  char*                    msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
+  int32_t                  len = pMsg->contLen - sizeof(SMsgHead);
+  SRpcMsg                  rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
+  int64_t                  st = taosGetTimestampMs();
+  bool                     updated = false;
+  int32_t                  code = 0;
+  SStreamTask*             pTask = NULL;
+  SStreamTask*             pHTask = NULL;
   SStreamTaskNodeUpdateMsg req = {0};
+  SDecoder                 decoder;
 
-  SDecoder decoder;
   tDecoderInit(&decoder, (uint8_t*)msg, len);
-  if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
+  code = tDecodeStreamTaskUpdateMsg(&decoder, &req);
+  tDecoderClear(&decoder);
+
+  if (code < 0) {
     rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
     tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code));
-    tDecoderClear(&decoder);
+    tDestroyNodeUpdateMsg(&req);
     return rsp.code;
   }
 
-  tDecoderClear(&decoder);
-
   int32_t gError = streamGetFatalError(pMeta);
   if (gError != 0) {
     tqError("vgId:%d global fatal occurs, code:%s, ts:%" PRId64 " func:%s", pMeta->vgId, tstrerror(gError),
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index b4fa3a29ed..4ed0dd1749 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -382,6 +382,34 @@ void streamMetaRemoveDB(void* arg, char* key) {
   streamMutexUnlock(&pMeta->backendMutex);
 }
 
+// create the task hash and the task id list of pInfo; on failure nothing stays allocated and the error code is returned
+int32_t streamMetaUpdateInfoInit(STaskUpdateInfo* pInfo) {
+  _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
+
+  pInfo->pTasks = taosHashInit(64, fp, false, HASH_NO_LOCK);
+  if (pInfo->pTasks == NULL) {
+    return terrno;
+  }
+
+  pInfo->pTaskList = taosArrayInit(4, sizeof(int32_t));
+  if (pInfo->pTaskList == NULL) {
+    int32_t code = terrno;  // keep the allocation error, the cleanup below may touch terrno
+    taosHashCleanup(pInfo->pTasks);
+    pInfo->pTasks = NULL;
+    return code;
+  }
+
+  return TSDB_CODE_SUCCESS;
+}
+
+// destroy the task hash and the task id list of pInfo and reset both pointers
+void streamMetaUpdateInfoCleanup(STaskUpdateInfo* pInfo) {
+  taosHashCleanup(pInfo->pTasks);
+  taosArrayDestroy(pInfo->pTaskList);
+  pInfo->pTasks = NULL;
+  pInfo->pTaskList = NULL;
+}
+
 int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn, int32_t vgId,
                        int64_t stage, startComplete_fn_t fn, SStreamMeta** p) {
   QRY_PARAM_CHECK(p);
@@ -435,8 +463,8 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
   pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK);
   TSDB_CHECK_NULL(pMeta->pTasksMap, code, lino, _err, terrno);
 
-  pMeta->updateInfo.pTasks = taosHashInit(64, fp, false, HASH_NO_LOCK);
-  TSDB_CHECK_NULL(pMeta->updateInfo.pTasks, code, lino, _err, terrno);
+  code = streamMetaUpdateInfoInit(&pMeta->updateInfo);
+  TSDB_CHECK_CODE(code, lino, _err);
 
   code = streamMetaInitStartInfo(&pMeta->startInfo);
   TSDB_CHECK_CODE(code, lino, _err);
@@ -641,8 +669,8 @@ void streamMetaCloseImpl(void* arg) {
 
   taosHashCleanup(pMeta->pTasksMap);
   taosHashCleanup(pMeta->pTaskDbUnique);
-  taosHashCleanup(pMeta->updateInfo.pTasks);
 
+  streamMetaUpdateInfoCleanup(&pMeta->updateInfo);
   streamMetaClearStartInfo(&pMeta->startInfo);
 
   destroyMetaHbInfo(pMeta->pHbInfo);
@@ -1492,16 +1520,19 @@ void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SSt
 
 void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta) {
   STaskUpdateInfo* pInfo = &pMeta->updateInfo;
+  int32_t          num = taosArrayGetSize(pInfo->pTaskList);
 
   taosHashClear(pInfo->pTasks);
+  taosArrayClear(pInfo->pTaskList);
 
   int32_t prev = pInfo->completeTransId;
   pInfo->completeTransId = pInfo->activeTransId;
   pInfo->activeTransId = -1;
   pInfo->completeTs = taosGetTimestampMs();
 
-  stDebug("vgId:%d set the nodeEp update complete, ts:%" PRId64 ", complete transId:%d->%d, reset active transId",
-          pMeta->vgId, pInfo->completeTs, prev, pInfo->completeTransId);
+  stDebug("vgId:%d set the nodeEp update complete, ts:%" PRId64
+          ", complete transId:%d->%d, update Tasks:%d reset active transId",
+          pMeta->vgId, pInfo->completeTs, prev, pInfo->completeTransId, num);
 }
 
 bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId) {