From 5ff63974166c4f199d203034efcae6927f7e842b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 12 Aug 2024 10:25:47 +0800 Subject: [PATCH] fix(stream): update return value check. --- source/dnode/mnode/impl/src/mndStream.c | 41 +++++++++++++++------- source/dnode/vnode/src/tqCommon/tqCommon.c | 4 +-- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 20f0e7b105..35da6c379f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -419,13 +419,8 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, int32_t nullIndex = 0; int32_t dataIndex = 0; - for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) { - SColLocation *pos = taosArrayGet(pCreate->fillNullCols, nullIndex); - if (pos == NULL) { - continue; - } - - if (nullIndex >= numOfNULL || i < pos->slotId) { + for (int32_t i = 0; i < pObj->outputSchema.nCols; i++) { + if (nullIndex >= numOfNULL) { pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes; pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId; pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags; @@ -433,14 +428,34 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type; dataIndex++; } else { - pFullSchema[i].bytes = 0; - pFullSchema[i].colId = pos->colId; - pFullSchema[i].flags = COL_SET_NULL; - memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN); - pFullSchema[i].type = pos->type; - nullIndex++; + SColLocation *pos = NULL; + if (nullIndex < taosArrayGetSize(pCreate->fillNullCols)) { + pos = taosArrayGet(pCreate->fillNullCols, nullIndex); + } + + if (pos == NULL) { + mError("invalid null column index, %d", nullIndex); + continue; + } + + if (i < pos->slotId) { + pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes; + pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId; + pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags; + strcpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name); + pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type; + dataIndex++; + } else { + pFullSchema[i].bytes = 0; + pFullSchema[i].colId = pos->colId; + pFullSchema[i].flags = COL_SET_NULL; + memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN); + pFullSchema[i].type = pos->type; + nullIndex++; + } } } + taosMemoryFree(pObj->outputSchema.pSchema); pObj->outputSchema.pSchema = pFullSchema; } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 7164c7f543..09acfcbafc 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1244,8 +1244,8 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { // discard the rsp, since it is expired. if (req.startTs < pTask->execInfo.created) { tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64 - " from task createTs:%" PRId64 ", discard", - pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs); + " from task createTs:%" PRId64 " < task createTs:%" PRId64 ", discard", + pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs, pTask->execInfo.created); streamMetaAddFailedTaskSelf(pTask, now); streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS;