From 3314a5eaa7a70e7253769c6e20dd770a170f71d6 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 9 Aug 2024 16:56:13 +0800 Subject: [PATCH 1/4] fix(lrucache): return fail if array init failed --- source/util/src/tlrucache.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/source/util/src/tlrucache.c b/source/util/src/tlrucache.c index 5d28eeafb1..83e0e2d811 100644 --- a/source/util/src/tlrucache.c +++ b/source/util/src/tlrucache.c @@ -384,6 +384,9 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * bool freeOnFail) { LRUStatus status = TAOS_LRU_STATUS_OK; SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES); + if (!lastReferenceList) { + return TAOS_LRU_STATUS_FAIL; + } (void)taosThreadMutexLock(&shard->mutex); From 5ff63974166c4f199d203034efcae6927f7e842b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 12 Aug 2024 10:25:47 +0800 Subject: [PATCH 2/4] fix(stream): update return value check. --- source/dnode/mnode/impl/src/mndStream.c | 41 +++++++++++++++------- source/dnode/vnode/src/tqCommon/tqCommon.c | 4 +-- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 20f0e7b105..35da6c379f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -419,13 +419,8 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, int32_t nullIndex = 0; int32_t dataIndex = 0; - for (int16_t i = 0; i < pObj->outputSchema.nCols; i++) { - SColLocation *pos = taosArrayGet(pCreate->fillNullCols, nullIndex); - if (pos == NULL) { - continue; - } - - if (nullIndex >= numOfNULL || i < pos->slotId) { + for (int32_t i = 0; i < pObj->outputSchema.nCols; i++) { + if (nullIndex >= numOfNULL) { pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes; pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId; pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags; @@ -433,14 +428,34 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type; dataIndex++; } else { - pFullSchema[i].bytes = 0; - pFullSchema[i].colId = pos->colId; - pFullSchema[i].flags = COL_SET_NULL; - memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN); - pFullSchema[i].type = pos->type; - nullIndex++; + SColLocation *pos = NULL; + if (nullIndex < taosArrayGetSize(pCreate->fillNullCols)) { + pos = taosArrayGet(pCreate->fillNullCols, nullIndex); + } + + if (pos == NULL) { + mError("invalid null column index, %d", nullIndex); + continue; + } + + if (i < pos->slotId) { + pFullSchema[i].bytes = pObj->outputSchema.pSchema[dataIndex].bytes; + pFullSchema[i].colId = i + 1; // pObj->outputSchema.pSchema[dataIndex].colId; + pFullSchema[i].flags = pObj->outputSchema.pSchema[dataIndex].flags; + strcpy(pFullSchema[i].name, pObj->outputSchema.pSchema[dataIndex].name); + pFullSchema[i].type = pObj->outputSchema.pSchema[dataIndex].type; + dataIndex++; + } else { + pFullSchema[i].bytes = 0; + pFullSchema[i].colId = pos->colId; + pFullSchema[i].flags = COL_SET_NULL; + memset(pFullSchema[i].name, 0, TSDB_COL_NAME_LEN); + pFullSchema[i].type = pos->type; + nullIndex++; + } } } + taosMemoryFree(pObj->outputSchema.pSchema); pObj->outputSchema.pSchema = pFullSchema; } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 7164c7f543..09acfcbafc 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -1244,8 +1244,8 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { // discard the rsp, since it is expired. if (req.startTs < pTask->execInfo.created) { tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64 - " from task createTs:%" PRId64 ", discard", - pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs); + " from task createTs:%" PRId64 " < task createTs:%" PRId64 ", discard", + pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs, pTask->execInfo.created); streamMetaAddFailedTaskSelf(pTask, now); streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; From 3b718371f72b21a705834d7adeaf9bd3565ca945 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 12 Aug 2024 10:47:45 +0800 Subject: [PATCH 3/4] change chan opt --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 8186fcd4e9..8b289a5a57 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -387,7 +387,7 @@ int32_t dmInitClient(SDnode *pDnode) { rpcInit.supportBatch = 1; rpcInit.batchSize = 8 * 1024; rpcInit.timeToGetConn = tsTimeToGetAvailableConn; - rpcInit.notWaitAvaliableConn = 1; + rpcInit.notWaitAvaliableConn = 0; (void)taosVersionStrToInt(version, &(rpcInit.compatibilityVer)); From 0d8cda26cdaa289e32f7a03252e1e41562ce9b35 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 12 Aug 2024 10:53:25 +0800 Subject: [PATCH 4/4] fix(stream): reset the error code. --- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 2ad5464c31..9829d5ab3a 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -126,7 +126,7 @@ static void vmProcessStreamQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { dGTrace("vgId:%d, msg:%p get from vnode-stream queue", pVnode->vgId, pMsg); int32_t code = vnodeProcessStreamMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { - if (terrno != 0) code = terrno; + terrno = code; dGError("vgId:%d, msg:%p failed to process stream msg %s since %s", pVnode->vgId, pMsg, TMSG_INFO(pMsg->msgType), tstrerror(code)); vmSendRsp(pMsg, code);