diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9c59e3f3ec..5e7f2bf0a6 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -473,7 +473,9 @@ typedef struct STaskStartInfo { typedef struct STaskUpdateInfo { SHashObj* pTasks; - int32_t transId; + int32_t activeTransId; + int32_t completeTransId; + int64_t completeTs; } STaskUpdateInfo; typedef struct SScanWalInfo { @@ -753,8 +755,8 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs); void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId, int64_t startTs); -void streamMetaClearUpdateTaskList(SStreamMeta* pMeta); -void streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId); +void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta); +bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId); void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta); diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index cc1ed7f3fa..4dea9c17b0 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -297,6 +297,9 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { } if (strlen(usedbRsp.db) == 0) { + taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); + if (usedbRsp.errCode != 0) { return usedbRsp.errCode; } else { @@ -366,9 +369,15 @@ int32_t processUseDbRsp(void* param, SDataBuf* pMsg, int32_t code) { } int32_t processCreateSTableRsp(void* param, SDataBuf* pMsg, int32_t code) { - if (pMsg == NULL || param == NULL) { + if (pMsg == NULL) { return TSDB_CODE_TSC_INVALID_INPUT; } + if (param == NULL) { + taosMemoryFree(pMsg->pEpSet); + taosMemoryFree(pMsg->pData); + return TSDB_CODE_TSC_INVALID_INPUT; + } + SRequestObj* pRequest = param; if (code != TSDB_CODE_SUCCESS) { diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 0224a20109..91883869e9 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -825,15 +825,17 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) { int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { if (code != 0){ - return code; + goto _return; } if (pMsg == NULL || param == NULL) { - return TSDB_CODE_INVALID_PARA; + code = TSDB_CODE_INVALID_PARA; + goto _return; } + SMqHbRsp rsp = {0}; code = tDeserializeSMqHbRsp(pMsg->pData, pMsg->len, &rsp); if (code != 0) { - return code; + goto _return; } int64_t refId = (int64_t)param; @@ -856,10 +858,14 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { taosWUnLockLatch(&tmq->lock); (void)taosReleaseRef(tmqMgmt.rsetId, refId); } + tDestroySMqHbRsp(&rsp); + +_return: + taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); - return 0; + return code; } void tmqSendHbReq(void* param, void* tmrId) { @@ -1441,7 +1447,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { while ((code = syncAskEp(tmq)) != 0) { if (retryCnt++ > MAX_RETRY_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) { tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s", - tmq->consumerId, strerror(code)); + tmq->consumerId, tstrerror(code)); if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) { code = 0; } @@ -1511,19 +1517,29 @@ static void setVgIdle(tmq_t* tmq, char* topicName, int32_t vgId) { int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tmq_t* tmq = NULL; + SMqPollRspWrapper* pRspWrapper = NULL; + int8_t rspType = 0; + int32_t vgId = 0; + uint64_t requestId = 0; SMqPollCbParam* pParam = (SMqPollCbParam*)param; - if (pParam == NULL || pMsg == NULL) { + if (pMsg == NULL) { + return TSDB_CODE_TSC_INTERNAL_ERROR; + } + if (pParam == NULL) { + taosMemoryFreeClear(pMsg->pData); + taosMemoryFreeClear(pMsg->pEpSet); return TSDB_CODE_TSC_INTERNAL_ERROR; } int64_t refId = pParam->refId; - int32_t vgId = pParam->vgId; - uint64_t requestId = pParam->requestId; + vgId = pParam->vgId; + requestId = pParam->requestId; tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { + taosMemoryFreeClear(pMsg->pData); + taosMemoryFreeClear(pMsg->pEpSet); return TSDB_CODE_TMQ_CONSUMER_CLOSED; } - SMqPollRspWrapper* pRspWrapper = NULL; int32_t ret = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0, (void**)&pRspWrapper); if (ret) { code = ret; @@ -1554,7 +1570,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ASSERT(msgEpoch == clientEpoch); // handle meta rsp - int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType; + rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType; pRspWrapper->tmqRspType = rspType; pRspWrapper->reqId = requestId; pRspWrapper->pEpset = pMsg->pEpSet; @@ -1622,7 +1638,7 @@ END: } int32_t total = taosQueueItemSize(tmq->mqueue); tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64, - tmq->consumerId, rspType, vgId, total, requestId); + tmq ? tmq->consumerId : 0, rspType, vgId, total, requestId); if (tmq) (void)tsem2_post(&tmq->rspSem); if (pMsg) taosMemoryFreeClear(pMsg->pData); @@ -2817,7 +2833,10 @@ end: } int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { - if (param == NULL) return code; + if (param == NULL) { + goto FAIL; + } + SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId); if (tmq == NULL) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 4dc59bf6fe..f02144a1d1 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -70,24 +70,24 @@ } while (0) static int32_t tSerializeSMonitorParas(SEncoder *encoder, const SMonitorParas *pMonitorParas) { - if (tEncodeI8(encoder, pMonitorParas->tsEnableMonitor) < 0) return -1; - if (tEncodeI32(encoder, pMonitorParas->tsMonitorInterval) < 0) return -1; - if (tEncodeI32(encoder, pMonitorParas->tsSlowLogScope) < 0) return -1; - if (tEncodeI32(encoder, pMonitorParas->tsSlowLogMaxLen) < 0) return -1; - if (tEncodeI32(encoder, pMonitorParas->tsSlowLogThreshold) < 0) return -1; - if (tEncodeI32(encoder, pMonitorParas->tsSlowLogThresholdTest) < 0) return -1; - if (tEncodeCStr(encoder, pMonitorParas->tsSlowLogExceptDb) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeI8(encoder, pMonitorParas->tsEnableMonitor)); + TAOS_CHECK_RETURN(tEncodeI32(encoder, pMonitorParas->tsMonitorInterval)); + TAOS_CHECK_RETURN(tEncodeI32(encoder, pMonitorParas->tsSlowLogScope)); + TAOS_CHECK_RETURN(tEncodeI32(encoder, pMonitorParas->tsSlowLogMaxLen)); + TAOS_CHECK_RETURN(tEncodeI32(encoder, pMonitorParas->tsSlowLogThreshold)); + TAOS_CHECK_RETURN(tEncodeI32(encoder, pMonitorParas->tsSlowLogThresholdTest)); + TAOS_CHECK_RETURN(tEncodeCStr(encoder, pMonitorParas->tsSlowLogExceptDb)); return 0; } static int32_t tDeserializeSMonitorParas(SDecoder *decoder, SMonitorParas *pMonitorParas) { - if (tDecodeI8(decoder, (int8_t *)&pMonitorParas->tsEnableMonitor) < 0) return -1; - if (tDecodeI32(decoder, &pMonitorParas->tsMonitorInterval) < 0) return -1; - if (tDecodeI32(decoder, &pMonitorParas->tsSlowLogScope) < 0) return -1; - if (tDecodeI32(decoder, &pMonitorParas->tsSlowLogMaxLen) < 0) return -1; - if (tDecodeI32(decoder, &pMonitorParas->tsSlowLogThreshold) < 0) return -1; - if (tDecodeI32(decoder, &pMonitorParas->tsSlowLogThresholdTest) < 0) return -1; - if (tDecodeCStrTo(decoder, pMonitorParas->tsSlowLogExceptDb) < 0) return -1; + TAOS_CHECK_RETURN(tDecodeI8(decoder, (int8_t *)&pMonitorParas->tsEnableMonitor)); + TAOS_CHECK_RETURN(tDecodeI32(decoder, &pMonitorParas->tsMonitorInterval)); + TAOS_CHECK_RETURN(tDecodeI32(decoder, &pMonitorParas->tsSlowLogScope)); + TAOS_CHECK_RETURN(tDecodeI32(decoder, &pMonitorParas->tsSlowLogMaxLen)); + TAOS_CHECK_RETURN(tDecodeI32(decoder, &pMonitorParas->tsSlowLogThreshold)); + TAOS_CHECK_RETURN(tDecodeI32(decoder, &pMonitorParas->tsSlowLogThresholdTest)); + TAOS_CHECK_RETURN(tDecodeCStrTo(decoder, pMonitorParas->tsSlowLogExceptDb)); return 0; } @@ -98,8 +98,7 @@ static int32_t tDecodeTableTSMAInfoRsp(SDecoder *pDecoder, STableTSMAInfoRsp *pR int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { if (pMsg == NULL) { - terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; - return -1; + return terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; } pIter->totalLen = htonl(pMsg->length); @@ -108,8 +107,7 @@ int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { pIter->len = 0; pIter->pMsg = pMsg; if (pIter->totalLen <= sizeof(SSubmitReq)) { - terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; - return -1; + return terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; } return 0; @@ -130,9 +128,8 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { } if (pIter->len > pIter->totalLen) { - terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; *pPBlock = NULL; - return -1; + return terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; } if (pIter->len == pIter->totalLen) { @@ -193,46 +190,46 @@ int32_t tPrintFixedSchemaSubmitReq(SSubmitReq *pReq, STSchema *pTschema) { #endif int32_t tEncodeSEpSet(SEncoder *pEncoder, const SEpSet *pEp) { - if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1; - if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pEp->inUse)); + TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pEp->numOfEps)); for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) { - if (tEncodeU16(pEncoder, pEp->eps[i].port) < 0) return -1; - if (tEncodeCStrWithLen(pEncoder, pEp->eps[i].fqdn, TSDB_FQDN_LEN) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeU16(pEncoder, pEp->eps[i].port)); + TAOS_CHECK_RETURN(tEncodeCStrWithLen(pEncoder, pEp->eps[i].fqdn, TSDB_FQDN_LEN)); } return 0; } int32_t tDecodeSEpSet(SDecoder *pDecoder, SEpSet *pEp) { - if (tDecodeI8(pDecoder, &pEp->inUse) < 0) return -1; - if (tDecodeI8(pDecoder, &pEp->numOfEps) < 0) return -1; + TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pEp->inUse)); + TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pEp->numOfEps)); for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) { - if (tDecodeU16(pDecoder, &pEp->eps[i].port) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pEp->eps[i].fqdn) < 0) return -1; + TAOS_CHECK_RETURN(tDecodeU16(pDecoder, &pEp->eps[i].port)); + TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pEp->eps[i].fqdn)); } return 0; } int32_t tEncodeSQueryNodeAddr(SEncoder *pEncoder, SQueryNodeAddr *pAddr) { - if (tEncodeI32(pEncoder, pAddr->nodeId) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pAddr->epSet) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pAddr->nodeId)); + TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pAddr->epSet)); return 0; } int32_t tEncodeSQueryNodeLoad(SEncoder *pEncoder, SQueryNodeLoad *pLoad) { - if (tEncodeSQueryNodeAddr(pEncoder, &pLoad->addr) < 0) return -1; - if (tEncodeU64(pEncoder, pLoad->load) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeSQueryNodeAddr(pEncoder, &pLoad->addr)); + TAOS_CHECK_RETURN(tEncodeU64(pEncoder, pLoad->load)); return 0; } int32_t tDecodeSQueryNodeAddr(SDecoder *pDecoder, SQueryNodeAddr *pAddr) { - if (tDecodeI32(pDecoder, &pAddr->nodeId) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pAddr->epSet) < 0) return -1; + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pAddr->nodeId)); + TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pAddr->epSet)); return 0; } int32_t tDecodeSQueryNodeLoad(SDecoder *pDecoder, SQueryNodeLoad *pLoad) { - if (tDecodeSQueryNodeAddr(pDecoder, &pLoad->addr) < 0) return -1; - if (tDecodeU64(pDecoder, &pLoad->load) < 0) return -1; + TAOS_CHECK_RETURN(tDecodeSQueryNodeAddr(pDecoder, &pLoad->addr)); + TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pLoad->load)); return 0; } @@ -258,63 +255,63 @@ void *taosDecodeSEpSet(const void *buf, SEpSet *pEp) { } static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pReq) { - if (tEncodeSClientHbKey(pEncoder, &pReq->connKey) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeSClientHbKey(pEncoder, &pReq->connKey)); if (pReq->connKey.connType == CONN_TYPE__QUERY) { - if (tEncodeI64(pEncoder, pReq->app.appId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->app.pid) < 0) return -1; - if (tEncodeCStr(pEncoder, pReq->app.name) < 0) return -1; - if (tEncodeI64(pEncoder, pReq->app.startTime) < 0) return -1; - if (tEncodeU64(pEncoder, pReq->app.summary.numOfInsertsReq) < 0) return -1; - if (tEncodeU64(pEncoder, pReq->app.summary.numOfInsertRows) < 0) return -1; - if (tEncodeU64(pEncoder, pReq->app.summary.insertElapsedTime) < 0) return -1; - if (tEncodeU64(pEncoder, pReq->app.summary.insertBytes) < 0) return -1; - if (tEncodeU64(pEncoder, pReq->app.summary.fetchBytes) < 0) return -1; - if (tEncodeU64(pEncoder, pReq->app.summary.queryElapsedTime) < 0) return -1; - if (tEncodeU64(pEncoder, pReq->app.summary.numOfSlowQueries) < 0) return -1; - if (tEncodeU64(pEncoder, pReq->app.summary.totalRequests) < 0) return -1; - if (tEncodeU64(pEncoder, pReq->app.summary.currentRequests) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->app.appId)); + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->app.pid)); + TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pReq->app.name)); + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->app.startTime)); + TAOS_CHECK_RETURN(tEncodeU64(pEncoder, pReq->app.summary.numOfInsertsReq)); + TAOS_CHECK_RETURN(tEncodeU64(pEncoder, pReq->app.summary.numOfInsertRows)); + TAOS_CHECK_RETURN(tEncodeU64(pEncoder, pReq->app.summary.insertElapsedTime)); + TAOS_CHECK_RETURN(tEncodeU64(pEncoder, pReq->app.summary.insertBytes)); + TAOS_CHECK_RETURN(tEncodeU64(pEncoder, pReq->app.summary.fetchBytes)); + TAOS_CHECK_RETURN(tEncodeU64(pEncoder, pReq->app.summary.queryElapsedTime)); + TAOS_CHECK_RETURN(tEncodeU64(pEncoder, pReq->app.summary.numOfSlowQueries)); + TAOS_CHECK_RETURN(tEncodeU64(pEncoder, pReq->app.summary.totalRequests)); + TAOS_CHECK_RETURN(tEncodeU64(pEncoder, pReq->app.summary.currentRequests)); int32_t queryNum = 0; if (pReq->query) { queryNum = 1; - if (tEncodeI32(pEncoder, queryNum) < 0) return -1; - if (tEncodeU32(pEncoder, pReq->query->connId) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, queryNum)); + TAOS_CHECK_RETURN(tEncodeU32(pEncoder, pReq->query->connId)); int32_t num = taosArrayGetSize(pReq->query->queryDesc); - if (tEncodeI32(pEncoder, num) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, num)); for (int32_t i = 0; i < num; ++i) { SQueryDesc *desc = taosArrayGet(pReq->query->queryDesc, i); - if (tEncodeCStr(pEncoder, desc->sql) < 0) return -1; - if (tEncodeU64(pEncoder, desc->queryId) < 0) return -1; - if (tEncodeI64(pEncoder, desc->useconds) < 0) return -1; - if (tEncodeI64(pEncoder, desc->stime) < 0) return -1; - if (tEncodeI64(pEncoder, desc->reqRid) < 0) return -1; - if (tEncodeI8(pEncoder, desc->stableQuery) < 0) return -1; - if (tEncodeI8(pEncoder, desc->isSubQuery) < 0) return -1; - if (tEncodeCStr(pEncoder, desc->fqdn) < 0) return -1; - if (tEncodeI32(pEncoder, desc->subPlanNum) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, desc->sql)); + TAOS_CHECK_RETURN(tEncodeU64(pEncoder, desc->queryId)); + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, desc->useconds)); + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, desc->stime)); + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, desc->reqRid)); + TAOS_CHECK_RETURN(tEncodeI8(pEncoder, desc->stableQuery)); + TAOS_CHECK_RETURN(tEncodeI8(pEncoder, desc->isSubQuery)); + TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, desc->fqdn)); + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, desc->subPlanNum)); int32_t snum = desc->subDesc ? taosArrayGetSize(desc->subDesc) : 0; - if (tEncodeI32(pEncoder, snum) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, snum)); for (int32_t m = 0; m < snum; ++m) { SQuerySubDesc *sDesc = taosArrayGet(desc->subDesc, m); - if (tEncodeI64(pEncoder, sDesc->tid) < 0) return -1; - if (tEncodeCStr(pEncoder, sDesc->status) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeI64(pEncoder, sDesc->tid)); + TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, sDesc->status)); } } } else { - if (tEncodeI32(pEncoder, queryNum) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, queryNum)); } } int32_t kvNum = taosHashGetSize(pReq->info); - if (tEncodeI32(pEncoder, kvNum) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, kvNum)); void *pIter = taosHashIterate(pReq->info, NULL); while (pIter != NULL) { SKv *kv = pIter; - if (tEncodeSKv(pEncoder, kv) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeSKv(pEncoder, kv)); pIter = taosHashIterate(pReq->info, pIter); } @@ -322,83 +319,88 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR } static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq) { - if (tDecodeSClientHbKey(pDecoder, &pReq->connKey) < 0) return -1; + TAOS_CHECK_RETURN(tDecodeSClientHbKey(pDecoder, &pReq->connKey)); if (pReq->connKey.connType == CONN_TYPE__QUERY) { - if (tDecodeI64(pDecoder, &pReq->app.appId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->app.pid) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pReq->app.name) < 0) return -1; - if (tDecodeI64(pDecoder, &pReq->app.startTime) < 0) return -1; - if (tDecodeU64(pDecoder, &pReq->app.summary.numOfInsertsReq) < 0) return -1; - if (tDecodeU64(pDecoder, &pReq->app.summary.numOfInsertRows) < 0) return -1; - if (tDecodeU64(pDecoder, &pReq->app.summary.insertElapsedTime) < 0) return -1; - if (tDecodeU64(pDecoder, &pReq->app.summary.insertBytes) < 0) return -1; - if (tDecodeU64(pDecoder, &pReq->app.summary.fetchBytes) < 0) return -1; - if (tDecodeU64(pDecoder, &pReq->app.summary.queryElapsedTime) < 0) return -1; - if (tDecodeU64(pDecoder, &pReq->app.summary.numOfSlowQueries) < 0) return -1; - if (tDecodeU64(pDecoder, &pReq->app.summary.totalRequests) < 0) return -1; - if (tDecodeU64(pDecoder, &pReq->app.summary.currentRequests) < 0) return -1; + TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->app.appId)); + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->app.pid)); + TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pReq->app.name)); + TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->app.startTime)); + TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pReq->app.summary.numOfInsertsReq)); + TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pReq->app.summary.numOfInsertRows)); + TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pReq->app.summary.insertElapsedTime)); + TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pReq->app.summary.insertBytes)); + TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pReq->app.summary.fetchBytes)); + TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pReq->app.summary.queryElapsedTime)); + TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pReq->app.summary.numOfSlowQueries)); + TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pReq->app.summary.totalRequests)); + TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pReq->app.summary.currentRequests)); int32_t queryNum = 0; - if (tDecodeI32(pDecoder, &queryNum) < 0) return -1; + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &queryNum)); if (queryNum) { pReq->query = taosMemoryCalloc(1, sizeof(*pReq->query)); if (NULL == pReq->query) return -1; - if (tDecodeU32(pDecoder, &pReq->query->connId) < 0) return -1; + TAOS_CHECK_RETURN(tDecodeU32(pDecoder, &pReq->query->connId)); int32_t num = 0; - if (tDecodeI32(pDecoder, &num) < 0) return -1; + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &num)); if (num > 0) { pReq->query->queryDesc = taosArrayInit(num, sizeof(SQueryDesc)); if (NULL == pReq->query->queryDesc) return -1; for (int32_t i = 0; i < num; ++i) { SQueryDesc desc = {0}; - if (tDecodeCStrTo(pDecoder, desc.sql) < 0) return -1; - if (tDecodeU64(pDecoder, &desc.queryId) < 0) return -1; - if (tDecodeI64(pDecoder, &desc.useconds) < 0) return -1; - if (tDecodeI64(pDecoder, &desc.stime) < 0) return -1; - if (tDecodeI64(pDecoder, &desc.reqRid) < 0) return -1; - if (tDecodeI8(pDecoder, (int8_t *)&desc.stableQuery) < 0) return -1; - if (tDecodeI8(pDecoder, (int8_t *)&desc.isSubQuery) < 0) return -1; - if (tDecodeCStrTo(pDecoder, desc.fqdn) < 0) return -1; - if (tDecodeI32(pDecoder, &desc.subPlanNum) < 0) return -1; + TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, desc.sql)); + TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &desc.queryId)); + TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &desc.useconds)); + TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &desc.stime)); + TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &desc.reqRid)); + TAOS_CHECK_RETURN(tDecodeI8(pDecoder, (int8_t *)&desc.stableQuery)); + TAOS_CHECK_RETURN(tDecodeI8(pDecoder, (int8_t *)&desc.isSubQuery)); + TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, desc.fqdn)); + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &desc.subPlanNum)); int32_t snum = 0; - if (tDecodeI32(pDecoder, &snum) < 0) return -1; + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &snum)); if (snum > 0) { desc.subDesc = taosArrayInit(snum, sizeof(SQuerySubDesc)); if (NULL == desc.subDesc) return -1; for (int32_t m = 0; m < snum; ++m) { SQuerySubDesc sDesc = {0}; - if (tDecodeI64(pDecoder, &sDesc.tid) < 0) return -1; - if (tDecodeCStrTo(pDecoder, sDesc.status) < 0) return -1; - if (!taosArrayPush(desc.subDesc, &sDesc)) return -1; + TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &sDesc.tid)); + TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, sDesc.status)); + if (!taosArrayPush(desc.subDesc, &sDesc)) { + return terrno; + } } } ASSERT(desc.subPlanNum == taosArrayGetSize(desc.subDesc)); - if (!taosArrayPush(pReq->query->queryDesc, &desc)) return -1; + if (!taosArrayPush(pReq->query->queryDesc, &desc)) { + return terrno; + } } } } } int32_t kvNum = 0; - if (tDecodeI32(pDecoder, &kvNum) < 0) return -1; + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &kvNum)); if (pReq->info == NULL) { pReq->info = taosHashInit(kvNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); } - if (pReq->info == NULL) return -1; + if (pReq->info == NULL) { + return terrno; + } for (int32_t i = 0; i < kvNum; i++) { SKv kv = {0}; - if (tDecodeSKv(pDecoder, &kv) < 0) return -1; + TAOS_CHECK_RETURN(tDecodeSKv(pDecoder, &kv)); int32_t code = taosHashPut(pReq->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); if (code) { - terrno = code; - return -1; + return terrno = code; } } @@ -406,75 +408,75 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq) } static int32_t tSerializeSClientHbRsp(SEncoder *pEncoder, const SClientHbRsp *pRsp) { - if (tEncodeSClientHbKey(pEncoder, &pRsp->connKey) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->status) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeSClientHbKey(pEncoder, &pRsp->connKey)); + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->status)); int32_t queryNum = 0; if (pRsp->query) { queryNum = 1; - if (tEncodeI32(pEncoder, queryNum) < 0) return -1; - if (tEncodeU32(pEncoder, pRsp->query->connId) < 0) return -1; - if (tEncodeU64(pEncoder, pRsp->query->killRid) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->query->totalDnodes) < 0) return -1; - if (tEncodeI32(pEncoder, pRsp->query->onlineDnodes) < 0) return -1; - if (tEncodeI8(pEncoder, pRsp->query->killConnection) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pRsp->query->epSet) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, queryNum)); + TAOS_CHECK_RETURN(tEncodeU32(pEncoder, pRsp->query->connId)); + TAOS_CHECK_RETURN(tEncodeU64(pEncoder, pRsp->query->killRid)); + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->query->totalDnodes)); + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->query->onlineDnodes)); + TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pRsp->query->killConnection)); + TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pRsp->query->epSet)); int32_t num = taosArrayGetSize(pRsp->query->pQnodeList); - if (tEncodeI32(pEncoder, num) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, num)); for (int32_t i = 0; i < num; ++i) { SQueryNodeLoad *pLoad = taosArrayGet(pRsp->query->pQnodeList, i); - if (tEncodeSQueryNodeLoad(pEncoder, pLoad) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeSQueryNodeLoad(pEncoder, pLoad)); } } else { - if (tEncodeI32(pEncoder, queryNum) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, queryNum)); } int32_t kvNum = taosArrayGetSize(pRsp->info); - if (tEncodeI32(pEncoder, kvNum) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeI32(pEncoder, kvNum)); for (int32_t i = 0; i < kvNum; i++) { SKv *kv = taosArrayGet(pRsp->info, i); - if (tEncodeSKv(pEncoder, kv) < 0) return -1; + TAOS_CHECK_RETURN(tEncodeSKv(pEncoder, kv)); } return 0; } static int32_t tDeserializeSClientHbRsp(SDecoder *pDecoder, SClientHbRsp *pRsp) { - if (tDecodeSClientHbKey(pDecoder, &pRsp->connKey) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->status) < 0) return -1; + TAOS_CHECK_RETURN(tDecodeSClientHbKey(pDecoder, &pRsp->connKey)); + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pRsp->status)); int32_t queryNum = 0; - if (tDecodeI32(pDecoder, &queryNum) < 0) return -1; + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &queryNum)); if (queryNum) { pRsp->query = taosMemoryCalloc(1, sizeof(*pRsp->query)); if (NULL == pRsp->query) return -1; - if (tDecodeU32(pDecoder, &pRsp->query->connId) < 0) return -1; - if (tDecodeU64(pDecoder, &pRsp->query->killRid) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->query->totalDnodes) < 0) return -1; - if (tDecodeI32(pDecoder, &pRsp->query->onlineDnodes) < 0) return -1; - if (tDecodeI8(pDecoder, &pRsp->query->killConnection) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pRsp->query->epSet) < 0) return -1; + TAOS_CHECK_RETURN(tDecodeU32(pDecoder, &pRsp->query->connId)); + TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pRsp->query->killRid)); + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pRsp->query->totalDnodes)); + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pRsp->query->onlineDnodes)); + TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pRsp->query->killConnection)); + TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pRsp->query->epSet)); int32_t pQnodeNum = 0; - if (tDecodeI32(pDecoder, &pQnodeNum) < 0) return -1; + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pQnodeNum)); if (pQnodeNum > 0) { pRsp->query->pQnodeList = taosArrayInit(pQnodeNum, sizeof(SQueryNodeLoad)); - if (NULL == pRsp->query->pQnodeList) return -1; + if (NULL == pRsp->query->pQnodeList) return terrno; for (int32_t i = 0; i < pQnodeNum; ++i) { SQueryNodeLoad load = {0}; - if (tDecodeSQueryNodeLoad(pDecoder, &load) < 0) return -1; - if (!taosArrayPush(pRsp->query->pQnodeList, &load)) return -1; + TAOS_CHECK_RETURN(tDecodeSQueryNodeLoad(pDecoder, &load)); + if (!taosArrayPush(pRsp->query->pQnodeList, &load)) return terrno; } } } int32_t kvNum = 0; - if (tDecodeI32(pDecoder, &kvNum) < 0) return -1; + TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &kvNum)); pRsp->info = taosArrayInit(kvNum, sizeof(SKv)); if (pRsp->info == NULL) return -1; for (int32_t i = 0; i < kvNum; i++) { SKv kv = {0}; - if (tDecodeSKv(pDecoder, &kv)) return -1; - if (!taosArrayPush(pRsp->info, &kv)) return -1; + TAOS_CHECK_RETURN(tDecodeSKv(pDecoder, &kv)); + if (!taosArrayPush(pRsp->info, &kv)) return terrno; } return 0; @@ -5313,7 +5315,7 @@ int32_t tSerializeSMTimerMsg(void *buf, int32_t bufLen, SMTimerReq *pReq) { // return 0; // } -int32_t tSerializeDropOrphanTaskMsg(void* buf, int32_t bufLen, SMStreamDropOrphanMsg* pMsg) { +int32_t tSerializeDropOrphanTaskMsg(void *buf, int32_t bufLen, SMStreamDropOrphanMsg *pMsg) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -5336,7 +5338,7 @@ int32_t tSerializeDropOrphanTaskMsg(void* buf, int32_t bufLen, SMStreamDropOrpha return tlen; } -int32_t tDeserializeDropOrphanTaskMsg(void* buf, int32_t bufLen, SMStreamDropOrphanMsg* pMsg) { +int32_t tDeserializeDropOrphanTaskMsg(void *buf, int32_t bufLen, SMStreamDropOrphanMsg *pMsg) { SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index 88b8e98afb..75ba51e498 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -64,6 +64,8 @@ typedef struct SChkptReportInfo { } SChkptReportInfo; typedef struct SStreamExecInfo { + int32_t role; + bool switchFromFollower; bool initTaskList; SArray *pNodeList; int64_t ts; // snapshot ts @@ -152,7 +154,8 @@ bool streamTaskIterNextTask(SStreamTaskIter *pIter); int32_t streamTaskIterGetCurrent(SStreamTaskIter *pIter, SStreamTask **pTask); int32_t mndInitExecInfo(); void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo); -void mndInitStreamExecInfoForLeader(SMnode *pMnode); +void mndStreamResetInitTaskListLoadFlag(); +void mndUpdateStreamExecInfoRole(SMnode *pMnode, int32_t role); int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot); void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index d601bbf315..a35815cf4d 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -144,7 +144,7 @@ int32_t mndInitStream(SMnode *pMnode) { code = sdbSetTable(pMnode->pSdb, table); if (code) { - return terrno; + return code; } code = sdbSetTable(pMnode->pSdb, tableSeq); @@ -2024,7 +2024,7 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP return info; } -static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { +static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) { SSdb *pSdb = pMnode->pSdb; SStreamObj *pStream = NULL; void *pIter = NULL; @@ -2069,12 +2069,14 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange } } - void *p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb)); - void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb)); - if (p == NULL && p1 == NULL) { - mDebug("stream:0x%" PRIx64 " %s not involved nodeUpdate, ignore", pStream->uid, pStream->name); - sdbRelease(pSdb, pStream); - continue; + if (!includeAllNodes) { + void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb)); + void *p2 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb)); + if (p1 == NULL && p2 == NULL) { + mDebug("stream:0x%" PRIx64 " %s not involved nodeUpdate, ignore", pStream->uid, pStream->name); + sdbRelease(pSdb, pStream); + continue; + } } mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid, @@ -2192,11 +2194,36 @@ static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeLi return code; } +static int32_t addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) { + void *pIter = NULL; + int32_t code = 0; + while (1) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) { + break; + } + + code = taosHashPut(pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0); + sdbRelease(pSdb, pVgroup); + + if (code == 0) { + int32_t size = taosHashGetSize(pDBMap); + mDebug("add Db:%s into Dbs list (total:%d) for kill checkpoint trans", pVgroup->dbName, size); + } + } + + return code; +} + // this function runs by only one thread, so it is not multi-thread safe static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { int32_t code = 0; bool allReady = true; SArray *pNodeSnapshot = NULL; + SMnode *pMnode = pMsg->info.node; + int64_t ts = taosGetTimestampSec(); + bool updateAllVgroups = false; int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1); if (old != 0) { @@ -2204,10 +2231,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { return 0; } - mDebug("start to do node change checking"); - int64_t ts = taosGetTimestampSec(); - - SMnode *pMnode = pMsg->info.node; + mDebug("start to do node changing check"); streamMutexLock(&execInfo.lock); int32_t numOfNodes = extractStreamNodeList(pMnode); @@ -2240,10 +2264,20 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { } SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); - if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { + + { + if (execInfo.role == NODE_ROLE_LEADER && execInfo.switchFromFollower) { + mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans"); + updateAllVgroups = true; + execInfo.switchFromFollower = false; // reset the flag + (void) addAllDbsIntoHashmap(changeInfo.pDBMap, pMnode->pSdb); + } + } + + if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0 || updateAllVgroups) { // kill current active checkpoint transaction, since the transaction is vnode wide. killAllCheckpointTrans(pMnode, &changeInfo); - code = mndProcessVgroupChange(pMnode, &changeInfo); + code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups); // keep the new vnode snapshot if success if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { @@ -2284,6 +2318,9 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { int32_t size = sizeof(SMStreamNodeCheckMsg); SMStreamNodeCheckMsg *pMsg = rpcMallocCont(size); + if (pMsg == NULL) { + return terrno; + } SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pMsg, .contLen = size}; return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); @@ -2459,6 +2496,10 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { { SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)}; rsp.pCont = rpcMallocCont(rsp.contLen); + if (rsp.pCont == NULL) { + return terrno; + } + SMsgHead *pHead = rsp.pCont; pHead->vgId = htonl(req.nodeId); @@ -2667,11 +2708,13 @@ static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t* pEx static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code) { SRpcMsg rsp = {.code = code, .info = *pInfo, .contLen = msgSize}; rsp.pCont = rpcMallocCont(rsp.contLen); - SMsgHead *pHead = rsp.pCont; - pHead->vgId = htonl(vgId); + if (rsp.pCont != NULL) { + SMsgHead *pHead = rsp.pCont; + pHead->vgId = htonl(vgId); - tmsgSendRsp(&rsp); - pInfo->handle = NULL; // disable auto rsp + tmsgSendRsp(&rsp); + pInfo->handle = NULL; // disable auto rsp + } } int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) { @@ -2808,6 +2851,10 @@ static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) { int32_t code = mndProcessCreateStreamReq(pReq); if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { pReq->info.rsp = rpcMallocCont(1); + if (pReq->info.rsp == NULL) { + return terrno; + } + pReq->info.rspLen = 1; pReq->info.noResp = false; pReq->code = code; @@ -2819,6 +2866,10 @@ static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) { int32_t code = mndProcessDropStreamReq(pReq); if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { pReq->info.rsp = rpcMallocCont(1); + if (pReq->info.rsp == NULL) { + return terrno; + } + pReq->info.rspLen = 1; pReq->info.noResp = false; pReq->code = code; @@ -2835,10 +2886,39 @@ void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo) { pExecInfo->initTaskList = true; } -void mndInitStreamExecInfoForLeader(SMnode* pMnode) { +void mndStreamResetInitTaskListLoadFlag() { + mInfo("reset task list buffer init flag for leader"); execInfo.initTaskList = false; - mInfo("init stream execInfo for leader"); - mndInitStreamExecInfo(pMnode, &execInfo); +} + +void mndUpdateStreamExecInfoRole(SMnode* pMnode, int32_t role) { + execInfo.switchFromFollower = false; + + if (execInfo.role == NODE_ROLE_UNINIT) { + execInfo.role = role; + if (role == NODE_ROLE_LEADER) { + mInfo("init mnode is set to leader"); + } else { + mInfo("init mnode is set to follower"); + } + } else { + if (role == NODE_ROLE_LEADER) { + if (execInfo.role == NODE_ROLE_FOLLOWER) { + execInfo.role = role; + execInfo.switchFromFollower = true; + mInfo("mnode switch to be leader from follower"); + } else { + mInfo("mnode remain to be leader, do nothing"); + } + } else { // follower's + if (execInfo.role == NODE_ROLE_LEADER) { + execInfo.role = role; + mInfo("mnode switch to be follower from leader"); + } else { + mInfo("mnode remain to be follower, do nothing"); + } + } + } } void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) { @@ -2936,7 +3016,8 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) { // check if it is conflict with other trans in both sourceDb and targetDb. bool conflict = mndStreamTransConflictCheck(pMnode, pTask->streamId, MND_STREAM_DROP_NAME, false); if (conflict) { - TAOS_RETURN(TSDB_CODE_MND_TRANS_CONFLICT); + code = TSDB_CODE_MND_TRANS_CONFLICT; + goto _err; } SStreamObj dummyObj = {.uid = pTask->streamId, .sourceDb = "", .targetSTbName = ""}; @@ -2949,8 +3030,7 @@ static int32_t mndProcessDropOrphanTaskReq(SRpcMsg *pReq) { code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pTask->streamId); if (code) { - mndTransDrop(pTrans); - return code; + goto _err; } // drop all tasks @@ -2974,7 +3054,7 @@ _err: tDestroyDropOrphanTaskMsg(&msg); mndTransDrop(pTrans); - if (code == TSDB_CODE_SUCCESS) { + if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { mDebug("create drop %d orphan tasks trans succ", numOfTasks); } return code; diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 414cd402ec..494771e65e 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -334,5 +334,5 @@ void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { taosMemoryFree(p); } - mDebug("complete clear checkpoints in Dbs"); + mDebug("complete clear checkpoints in all Dbs"); } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 383ffe16da..07bba4e1b3 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -785,6 +785,9 @@ int32_t mndInitExecInfo() { return terrno; } + execInfo.role = NODE_ROLE_UNINIT; + execInfo.switchFromFollower = false; + taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList); taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList); taosHashSetFreeFp(execInfo.pStreamConsensus, freeTaskList); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index d4327eb4db..cf7769b932 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -363,6 +363,8 @@ static void mndBecomeFollower(const SSyncFSM *pFsm) { (void)tsem_post(&pMgmt->syncSem); } (void)taosThreadMutexUnlock(&pMgmt->lock); + + mndUpdateStreamExecInfoRole(pMnode, NODE_ROLE_FOLLOWER); } static void mndBecomeLearner(const SSyncFSM *pFsm) { @@ -385,7 +387,9 @@ static void mndBecomeLearner(const SSyncFSM *pFsm) { static void mndBecomeLeader(const SSyncFSM *pFsm) { mInfo("vgId:1, become leader"); SMnode *pMnode = pFsm->data; - mndInitStreamExecInfoForLeader(pMnode); + + mndUpdateStreamExecInfoRole(pMnode, NODE_ROLE_LEADER); + mndStreamResetInitTaskListLoadFlag(); } static bool mndApplyQueueEmpty(const SSyncFSM *pFsm) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index dc58bfd8c4..7037eb5199 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -193,28 +193,23 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM SStreamTask* pTask = *ppTask; const char* idstr = pTask->id.idStr; - if (pMeta->updateInfo.transId == -1) { // info needs to be kept till the new trans to update the nodeEp arrived. - streamMetaInitUpdateTaskList(pMeta, req.transId); + if (req.transId <= 0) { + tqError("vgId:%d invalid update nodeEp task, transId:%d, discard", vgId, req.taskId); + rsp.code = TSDB_CODE_SUCCESS; + streamMetaWUnLock(pMeta); + + taosArrayDestroy(req.pNodeList); + return rsp.code; } - if (pMeta->updateInfo.transId != req.transId) { - if (req.transId < pMeta->updateInfo.transId) { - tqError("s-task:%s vgId:%d disorder update nodeEp msg recv, discarded, newest transId:%d, recv:%d", idstr, vgId, - pMeta->updateInfo.transId, req.transId); - rsp.code = TSDB_CODE_SUCCESS; - streamMetaWUnLock(pMeta); + // info needs to be kept till the new trans to update the nodeEp arrived. + bool update = streamMetaInitUpdateTaskList(pMeta, req.transId); + if (!update) { + rsp.code = TSDB_CODE_SUCCESS; + streamMetaWUnLock(pMeta); - taosArrayDestroy(req.pNodeList); - return rsp.code; - } else { - tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr, - vgId, req.transId, pMeta->updateInfo.transId); - // info needs to be kept till the new trans to update the nodeEp arrived. - streamMetaInitUpdateTaskList(pMeta, req.transId); - } - } else { - tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d, recorded update transId:%d", idstr, - vgId, req.transId, pMeta->updateInfo.transId); + taosArrayDestroy(req.pNodeList); + return rsp.code; } // duplicate update epset msg received, discard this redundant message @@ -311,7 +306,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM // persist to disk } - streamMetaClearUpdateTaskList(pMeta); + streamMetaClearSetUpdateTaskListComplete(pMeta); if (!restored) { tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId); @@ -775,8 +770,8 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { streamMetaWUnLock(pMeta); terrno = 0; - tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId, - pMeta->updateInfo.transId); + tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d, ts:%" PRId64, vgId, + pMeta->updateInfo.completeTransId, pMeta->updateInfo.completeTs); while (streamMetaTaskInTimer(pMeta)) { tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId); @@ -902,7 +897,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) { return restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER)); } else { if (pStartInfo->restartCount == 0) { - tqDebug("vgId:%d start all tasks completed in callbackFn, restartCount is 0", pMeta->vgId); + tqDebug("vgId:%d start all tasks completed in callbackFn, restartCounter is 0", pMeta->vgId); } else if (allReady) { pStartInfo->restartCount = 0; tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId); diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 093555c9c5..d7b60b2bcd 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -149,6 +149,9 @@ _error: } if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index 9019fa0fef..a9858eeb96 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -344,6 +344,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index d4e5dedd20..b80ea74006 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -147,6 +147,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index b732fccd8e..210c073c6d 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1814,6 +1814,10 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) { QUERY_CHECK_CODE(code, lino, _end); res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT}; code = nodesListAppend(pFuncNode->pParameterList, (SNode*)res); + if (code != TSDB_CODE_SUCCESS) { + nodesDestroyNode((SNode*)res); + res = NULL; + } QUERY_CHECK_CODE(code, lino, _end); } #endif @@ -1945,7 +1949,7 @@ int32_t createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, SExprInfo** SExprInfo* pExp = &pExprs[i]; code = createExprFromTargetNode(pExp, pTargetNode); if (code != TSDB_CODE_SUCCESS) { - taosMemoryFreeClear(pExprs); + destroyExprInfo(pExprs, *numOfExprs); qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); return code; } diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 0b66834d45..5ece57cad1 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -579,6 +579,9 @@ _error: pTaskInfo->code = code; if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } return code; diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 3e9ac2b10a..b899524988 100644 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -1510,6 +1510,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && pDownstream != NULL && (*pDownstream) != NULL) { + destroyOperator(*pDownstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 69a9045004..e5289fa216 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -561,6 +561,10 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo } initBasicInfo(&pInfo->binfo, pResBlock); + pInfo->pGroupCols = NULL; + code = extractColumnInfo(pAggNode->pGroupKeys, &pInfo->pGroupCols); + QUERY_CHECK_CODE(code, lino, _error); + int32_t numOfScalarExpr = 0; SExprInfo* pScalarExprInfo = NULL; if (pAggNode->pExprs != NULL) { @@ -568,10 +572,6 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo QUERY_CHECK_CODE(code, lino, _error); } - pInfo->pGroupCols = NULL; - code = extractColumnInfo(pAggNode->pGroupKeys, &pInfo->pGroupCols); - QUERY_CHECK_CODE(code, lino, _error); - code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, numOfScalarExpr, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -618,6 +618,9 @@ _error: if (pOperator) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } @@ -1162,6 +1165,8 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo SExprInfo* pExprInfo = NULL; code = createExprInfo(pPartNode->pTargets, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); + pOperator->exprSupp.numOfExprs = numOfCols; + pOperator->exprSupp.pExprInfo = pExprInfo; pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys); @@ -1227,8 +1232,6 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); @@ -1248,6 +1251,9 @@ _error: pTaskInfo->code = code; if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } TAOS_RETURN(code); @@ -1797,6 +1803,9 @@ _error: if (pInfo != NULL) destroyStreamPartitionOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 66a7408b13..4d2bdc62f8 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -107,10 +107,6 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pOperator->pTaskInfo = pTaskInfo; int32_t lino = 0; - int32_t numOfCols = 0; - SExprInfo* pExprInfo = NULL; - code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols); - TSDB_CHECK_CODE(code, lino, _error); SSDataBlock* pResBlock = createDataBlockFromDescNode(pProjPhyNode->node.pOutputDataBlockDesc); TSDB_CHECK_NULL(pResBlock, code, lino, _error, terrno); @@ -148,6 +144,11 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* } initResultSizeInfo(&pOperator->resultInfo, numOfRows); + + int32_t numOfCols = 0; + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pProjPhyNode->pProjections, NULL, &pExprInfo, &numOfCols); + TSDB_CHECK_CODE(code, lino, _error); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); TSDB_CHECK_CODE(code, lino, _error); @@ -182,6 +183,9 @@ _error: if (pInfo != NULL) destroyProjectOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -470,11 +474,6 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SIndefRowsFuncPhysiNode* pPhyNode = (SIndefRowsFuncPhysiNode*)pNode; - int32_t numOfExpr = 0; - SExprInfo* pExprInfo = NULL; - code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr); - TSDB_CHECK_CODE(code, lino, _error); - if (pPhyNode->pExprs != NULL) { int32_t num = 0; SExprInfo* pSExpr = NULL; @@ -501,6 +500,11 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* code = blockDataEnsureCapacity(pResBlock, numOfRows); TSDB_CHECK_CODE(code, lino, _error); + int32_t numOfExpr = 0; + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pPhyNode->pFuncs, NULL, &pExprInfo, &numOfExpr); + TSDB_CHECK_CODE(code, lino, _error); + code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); TSDB_CHECK_CODE(code, lino, _error); @@ -534,6 +538,9 @@ _error: if (pInfo != NULL) destroyIndefinitOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 59b4e1cbbb..36f9ac0954 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -166,6 +166,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -841,6 +844,9 @@ _error: } if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } return code; diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 62506858fc..44a383772d 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -834,12 +834,14 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* } SExprSupp* pExpSup = &pOperator->exprSupp; + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + pInfo->binfo.pRes = pResBlock; + SExprInfo* pExprInfo = NULL; code = createExprInfo(pCountNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); - QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -863,7 +865,6 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); QUERY_CHECK_CODE(code, lino, _error); - pInfo->binfo.pRes = pResBlock; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); QUERY_CHECK_NULL(pInfo->pStDeleted, code, lino, _error, terrno); @@ -928,6 +929,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 93f30ea899..ff1ff579fc 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -895,14 +895,16 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + pInfo->binfo.pRes = pResBlock; + SExprSupp* pExpSup = &pOperator->exprSupp; int32_t numOfCols = 0; SExprInfo* pExprInfo = NULL; code = createExprInfo(pEventNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); - QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -986,6 +988,9 @@ _error: if (pInfo != NULL) destroyStreamEventOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 39e602ee84..75b15dbea4 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1370,6 +1370,9 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pFillExprInfo, &numOfFillCols); QUERY_CHECK_CODE(code, lino, _error); + code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI); if (!pInfo->pFillSup) { code = TSDB_CODE_FAILED; @@ -1440,9 +1443,6 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); QUERY_CHECK_CODE(code, lino, _error); - code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore); - QUERY_CHECK_CODE(code, lino, _error); - pInfo->srcRowIndex = -1; setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -1463,6 +1463,9 @@ _error: if (pInfo != NULL) destroyStreamFillOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index cf3b53bf02..756a6d71e1 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1896,11 +1896,6 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN QUERY_CHECK_CODE(code, lino, _error); } - int32_t numOfCols = 0; - SExprInfo* pExprInfo = NULL; - code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); - QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); @@ -1914,6 +1909,12 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN qInfo("copy state %p to %p", pTaskInfo->streamInfo.pState, pInfo->pState); pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); + + int32_t numOfCols = 0; + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -2018,6 +2019,9 @@ _error: if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -3739,13 +3743,15 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode } } SExprSupp* pExpSup = &pOperator->exprSupp; + + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + pInfo->binfo.pRes = pResBlock; SExprInfo* pExprInfo = NULL; code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); - QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -3771,7 +3777,7 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode if (pSessionNode->window.pTsEnd) { pInfo->endTsIndex = ((SColumnNode*)pSessionNode->window.pTsEnd)->slotId; } - pInfo->binfo.pRes = pResBlock; + pInfo->order = TSDB_ORDER_ASC; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = tSimpleHashInit(64, hashFn); @@ -3843,6 +3849,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -4102,6 +4111,9 @@ _error: } if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -4915,14 +4927,16 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* code = initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); QUERY_CHECK_CODE(code, lino, _error); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + pInfo->binfo.pRes = pResBlock; + SExprSupp* pExpSup = &pOperator->exprSupp; int32_t numOfCols = 0; SExprInfo* pExprInfo = NULL; code = createExprInfo(pStateNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); - QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -4998,6 +5012,9 @@ _error: if (pInfo != NULL) destroyStreamStateOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -5206,10 +5223,6 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* SStreamIntervalPhysiNode* pIntervalPhyNode = (SStreamIntervalPhysiNode*)pPhyNode; - SExprInfo* pExprInfo = NULL; - code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); - QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); initBasicInfo(&pInfo->binfo, pResBlock); @@ -5253,6 +5266,9 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); + QUERY_CHECK_CODE(code, lino, _error); code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pInfo->pState, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -5337,6 +5353,9 @@ _error: if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index e346946a7a..59c19a706c 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -570,8 +570,8 @@ int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFi _end: if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(pFillInfo->next.pRowVal); - taosArrayDestroy(pFillInfo->prev.pRowVal); + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + pFillInfo = taosDestroyFillInfo(pFillInfo); } (*ppFillInfo) = pFillInfo; return code; @@ -764,6 +764,7 @@ SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprIn SValueNode* pv = (SValueNode*)nodesListGetNode(pValNode->pNodeList, index); QUERY_CHECK_NULL(pv, code, lino, _end, terrno); code = nodesValueNodeToVariant(pv, &pFillCol[i].fillVal); + QUERY_CHECK_CODE(code, lino, _end); } if (TSDB_CODE_SUCCESS != code) { goto _end; diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 8cd547e333..258f886805 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -1212,6 +1212,9 @@ _error: if (pInfo != NULL) destroyTimeSliceOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -1249,10 +1252,11 @@ void destroyTimeSliceOperatorInfo(void* param) { } cleanupExprSupp(&pInfo->scalarSup); - - for (int32_t i = 0; i < pInfo->pFillColInfo->numOfFillExpr; ++i) { - taosVariantDestroy(&pInfo->pFillColInfo[i].fillVal); + if (pInfo->pFillColInfo != NULL) { + for (int32_t i = 0; i < pInfo->pFillColInfo->numOfFillExpr; ++i) { + taosVariantDestroy(&pInfo->pFillColInfo[i].fillVal); + } + taosMemoryFree(pInfo->pFillColInfo); } - taosMemoryFree(pInfo->pFillColInfo); taosMemoryFreeClear(param); } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index fa9dc79cc3..6a74c6a093 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1421,6 +1421,9 @@ _error: } if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -1700,6 +1703,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -1740,15 +1746,15 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc); + QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); + initBasicInfo(&pInfo->binfo, pResBlock); + int32_t numOfCols = 0; SExprInfo* pExprInfo = NULL; code = createExprInfo(pSessionNode->window.pFuncs, NULL, &pExprInfo, &numOfCols); QUERY_CHECK_CODE(code, lino, _error); - SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc); - QUERY_CHECK_NULL(pResBlock, code, lino, _error, terrno); - initBasicInfo(&pInfo->binfo, pResBlock); - code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); QUERY_CHECK_CODE(code, lino, _error); @@ -1796,6 +1802,9 @@ _error: if (pInfo != NULL) destroySWindowOperatorInfo(pInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -2113,6 +2122,9 @@ _error: if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo); if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -2380,11 +2392,6 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva goto _error; } - int32_t num = 0; - SExprInfo* pExprInfo = NULL; - code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num); - QUERY_CHECK_CODE(code, lino, _error); - SInterval interval = {.interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, .intervalUnit = pIntervalPhyNode->intervalUnit, @@ -2408,6 +2415,11 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); + int32_t num = 0; + SExprInfo* pExprInfo = NULL; + code = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &pExprInfo, &num); + QUERY_CHECK_CODE(code, lino, _error); + code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { @@ -2450,6 +2462,9 @@ _error: if (pOperator != NULL) { pOperator->info = NULL; + if (pOperator->pDownstream == NULL && downstream != NULL) { + destroyOperator(downstream); + } destroyOperator(pOperator); } pTaskInfo->code = code; diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 846837fd99..a4f32356c6 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1173,49 +1173,79 @@ static int32_t findPosBytes(char *orgStr, char *delimStr, int32_t orgLen, int32_ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { int32_t code = TSDB_CODE_SUCCESS; - int32_t subPos = 0; - GET_TYPED_DATA(subPos, int32_t, GET_PARAM_TYPE(&pInput[1]), pInput[1].columnData->pData); - - int32_t subLen = INT16_MAX; - if (inputNum == 3) { - GET_TYPED_DATA(subLen, int32_t, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData); - } - - SColumnInfoData *pInputData = pInput->columnData; + SColumnInfoData *pInputData[3]; SColumnInfoData *pOutputData = pOutput->columnData; - int32_t outputLen = pInputData->info.bytes; + for (int32_t i = 0; i < inputNum; ++i) { + pInputData[i] = pInput[i].columnData; + } + + int32_t outputLen = pInputData[0]->info.bytes; char *outputBuf = taosMemoryMalloc(outputLen); if (outputBuf == NULL) { qError("substr function memory allocation failure. size: %d", outputLen); return TSDB_CODE_OUT_OF_MEMORY; } - for (int32_t i = 0; i < pInput->numOfRows; ++i) { - if (colDataIsNull_s(pInputData, i)) { + int32_t numOfRows = 0; + for (int32_t i = 0; i < inputNum; ++i) { + numOfRows = TMAX(pInput[i].numOfRows, numOfRows); + } + + bool hasNullType = (IS_NULL_TYPE(GET_PARAM_TYPE(&pInput[0])) || IS_NULL_TYPE(GET_PARAM_TYPE(&pInput[1])) || + (inputNum == 3 && IS_NULL_TYPE(GET_PARAM_TYPE(&pInput[2])))); + + if (hasNullType || + (pInput[0].numOfRows == 1 && colDataIsNull_s(pInputData[0], 0)) || + (pInput[1].numOfRows == 1 && colDataIsNull_s(pInputData[1], 0)) || + (inputNum == 3 && (pInput[2].numOfRows == 1 && colDataIsNull_s(pInputData[2], 0)))) { + colDataSetNNULL(pOutputData, 0, numOfRows); + pOutput->numOfRows = numOfRows; + goto _return; + } + + int32_t colIdx[3]; + for (int32_t i = 0; i < numOfRows; ++i) { + colIdx[0] = (pInput[0].numOfRows == 1) ? 0 : i; + colIdx[1] = (pInput[1].numOfRows == 1) ? 0 : i; + if (inputNum == 3) { + colIdx[2] = (pInput[2].numOfRows == 1) ? 0 : i; + } + + if (colDataIsNull_s(pInputData[0], colIdx[0]) || colDataIsNull_s(pInputData[1], colIdx[1]) || + (inputNum == 3 && colDataIsNull_s(pInputData[2], colIdx[2]))) { colDataSetNULL(pOutputData, i); continue; } + + int32_t subPos = 0; + int32_t subLen = INT16_MAX; + GET_TYPED_DATA(subPos, int32_t, GET_PARAM_TYPE(&pInput[1]), colDataGetData(pInputData[1], colIdx[1])); + if (inputNum == 3) { + GET_TYPED_DATA(subLen, int32_t, GET_PARAM_TYPE(&pInput[2]), colDataGetData(pInputData[2], colIdx[2])); + } + if (subPos == 0 || subLen < 1) { varDataSetLen(outputBuf, 0); SCL_ERR_JRET(colDataSetVal(pOutputData, i, outputBuf, false)); continue; } - char *input = colDataGetData(pInput[0].columnData, i); + + char *input = colDataGetData(pInputData[0], colIdx[0]); int32_t len = varDataLen(input); int32_t startPosBytes; int32_t endPosBytes = len; if (subPos > 0) { - startPosBytes = (GET_PARAM_TYPE(pInput) == TSDB_DATA_TYPE_VARCHAR) ? findPosBytes(varDataVal(input), NULL, varDataLen(input), -1, subPos) : (subPos - 1) * TSDB_NCHAR_SIZE; + startPosBytes = (GET_PARAM_TYPE(&pInput[0]) == TSDB_DATA_TYPE_VARCHAR) ? findPosBytes(varDataVal(input), NULL, varDataLen(input), -1, subPos) : (subPos - 1) * TSDB_NCHAR_SIZE; startPosBytes = TMIN(startPosBytes, len); } else { startPosBytes = - (GET_PARAM_TYPE(pInput) == TSDB_DATA_TYPE_VARCHAR) ? findPosBytes(varDataVal(input), NULL, varDataLen(input), -1, subPos) : len + subPos * TSDB_NCHAR_SIZE; + (GET_PARAM_TYPE(&pInput[0]) == TSDB_DATA_TYPE_VARCHAR) ? findPosBytes(varDataVal(input), NULL, varDataLen(input), -1, subPos) : len + subPos * TSDB_NCHAR_SIZE; startPosBytes = TMAX(startPosBytes, 0); } if (inputNum == 3) { endPosBytes = - (GET_PARAM_TYPE(pInput) == TSDB_DATA_TYPE_VARCHAR) + (GET_PARAM_TYPE(&pInput[0]) == TSDB_DATA_TYPE_VARCHAR) ? startPosBytes + findPosBytes(varDataVal(input) + startPosBytes, NULL, varDataLen(input) - startPosBytes, -1, subLen + 1) : startPosBytes + subLen * TSDB_NCHAR_SIZE; endPosBytes = TMIN(endPosBytes, len); @@ -1230,10 +1260,10 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu varDataSetLen(output, 0); } - SCL_ERR_JRET(colDataSetVal(pOutputData, i, output, false)); + SCL_ERR_JRET(colDataSetVal(pOutputData, i, outputBuf, false)); } - pOutput->numOfRows = pInput->numOfRows; + pOutput->numOfRows = numOfRows; _return: taosMemoryFree(outputBuf); @@ -1295,30 +1325,40 @@ int32_t charFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutp int32_t outputLen = inputNum * 4 + 2; char *outputBuf = taosMemoryCalloc(outputLen, 1); if (outputBuf == NULL) { - SCL_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + SCL_ERR_RET(terrno); } - for (int32_t i = 0; i < pInput[0].numOfRows; ++i) { + int32_t numOfRows = 0; + for (int32_t i = 0; i < inputNum; ++i) { + numOfRows = TMAX(numOfRows, pInput[i].numOfRows); + } + for (int32_t i = 0; i < numOfRows; ++i) { char *output = varDataVal(outputBuf); for (int32_t j = 0; j < inputNum; ++j) { + int32_t colIdx = (pInput[j].numOfRows == 1) ? 0 : i; int32_t num; if (colDataIsNull_s(pInput[j].columnData, i)) { continue; } else if (IS_NUMERIC_TYPE(GET_PARAM_TYPE(&pInput[j]))) { - GET_TYPED_DATA(num, int32_t, GET_PARAM_TYPE(&pInput[j]), pInput[j].columnData->pData); + GET_TYPED_DATA(num, int32_t, GET_PARAM_TYPE(&pInput[j]), colDataGetData(pInput[j].columnData, colIdx)); getAsciiChar(num, &output); } else if (TSDB_DATA_TYPE_BINARY == GET_PARAM_TYPE(&pInput[j])) { - num = taosStr2Int32(varDataVal(pInput[j].columnData->pData), NULL, 10); + num = taosStr2Int32(varDataVal(colDataGetData(pInput[j].columnData, colIdx)), NULL, 10); getAsciiChar(num, &output); } else if (TSDB_DATA_TYPE_NCHAR == GET_PARAM_TYPE(&pInput[j])) { char *convBuf = taosMemoryMalloc(GET_PARAM_BYTES(&pInput[j])); - int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(pInput[j].columnData->pData), varDataLen(pInput[j].columnData->pData), convBuf); + if (convBuf == NULL) { + SCL_ERR_RET(terrno); + } + int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(colDataGetData(pInput[j].columnData, colIdx)), varDataLen(colDataGetData(pInput[j].columnData, colIdx)), convBuf); if (len < 0) { + taosMemoryFree(convBuf); code = TSDB_CODE_SCALAR_CONVERT_ERROR; goto _return; } convBuf[len] = 0; num = taosStr2Int32(convBuf, NULL, 10); getAsciiChar(num, &output); + taosMemoryFree(convBuf); } else { code = TSDB_CODE_FUNC_FUNTION_PARA_TYPE; goto _return; @@ -1510,13 +1550,13 @@ int32_t replaceFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pO bool needFreeTo = false; if (GET_PARAM_TYPE(&pInput[1]) != GET_PARAM_TYPE(&pInput[0])) { - SCL_ERR_RET(convBetweenNcharAndVarchar(varDataVal(colDataGetData(pInputData[1], colIdx2)), &fromStr, + SCL_ERR_JRET(convBetweenNcharAndVarchar(varDataVal(colDataGetData(pInputData[1], colIdx2)), &fromStr, varDataLen(colDataGetData(pInputData[1], colIdx2)), &fromLen, GET_PARAM_TYPE(&pInput[0]))); needFreeFrom = true; } if (GET_PARAM_TYPE(&pInput[2]) != GET_PARAM_TYPE(&pInput[0])) { - SCL_ERR_RET(convBetweenNcharAndVarchar(varDataVal(colDataGetData(pInputData[2], colIdx3)), &toStr, + SCL_ERR_JRET(convBetweenNcharAndVarchar(varDataVal(colDataGetData(pInputData[2], colIdx3)), &toStr, varDataLen(colDataGetData(pInputData[2], colIdx3)), &toLen, GET_PARAM_TYPE(&pInput[0]))); needFreeTo = true; @@ -1544,9 +1584,11 @@ int32_t replaceFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pO taosMemoryFree(fromStr); } varDataSetLen(outputBuf, totalLen); - SCL_ERR_RET(colDataSetVal(pOutputData, i, outputBuf, false)); + SCL_ERR_JRET(colDataSetVal(pOutputData, i, outputBuf, false)); } pOutput->numOfRows = numOfRows; +_return: + taosMemoryFree(outputBuf); return code; } @@ -1561,10 +1603,16 @@ int32_t substrIdxFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam * pInputData[1] = pInput[1].columnData; pInputData[2] = pInput[2].columnData; + for (int32_t i = 0; i < inputNum; ++i) { + if (pInput[i].numOfRows > numOfRows) { + numOfRows = pInput[i].numOfRows; + } + } + outputLen = pInputData[0]->info.bytes; if (GET_PARAM_TYPE(&pInput[0]) == TSDB_DATA_TYPE_NULL || GET_PARAM_TYPE(&pInput[1]) == TSDB_DATA_TYPE_NULL || GET_PARAM_TYPE(&pInput[2]) == TSDB_DATA_TYPE_NULL) { - colDataSetNNULL(pOutputData, 0, pInput[0].numOfRows); - pOutput->numOfRows = pInput[0].numOfRows; + colDataSetNNULL(pOutputData, 0, numOfRows); + pOutput->numOfRows = numOfRows; return TSDB_CODE_SUCCESS; } char *outputBuf = taosMemoryCalloc(outputLen + VARSTR_HEADER_SIZE, 1); @@ -1572,12 +1620,6 @@ int32_t substrIdxFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam * SCL_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - for (int32_t i = 0; i < inputNum; ++i) { - if (pInput[i].numOfRows > numOfRows) { - numOfRows = pInput[i].numOfRows; - } - } - for (int32_t k = 0; k < numOfRows; ++k) { bool hasNull = false; for (int32_t i = 0; i < inputNum; ++i) { @@ -1600,9 +1642,9 @@ int32_t substrIdxFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam * int32_t startPosBytes; int32_t endPosBytes; if (GET_PARAM_TYPE(&pInput[0]) != GET_PARAM_TYPE(&pInput[1])) { - SCL_ERR_RET(convBetweenNcharAndVarchar(varDataVal(colDataGetData(pInputData[1], colIdx2)), &delimStr, - varDataLen(colDataGetData(pInputData[1], colIdx2)), &delimLen, - GET_PARAM_TYPE(&pInput[0]))); + SCL_ERR_JRET(convBetweenNcharAndVarchar(varDataVal(colDataGetData(pInputData[1], colIdx2)), &delimStr, + varDataLen(colDataGetData(pInputData[1], colIdx2)), &delimLen, + GET_PARAM_TYPE(&pInput[0]))); needFreeDelim = true; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 4bf74d8d4f..9be8f5ffaa 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -561,12 +561,14 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d", id, vgId, pReq->taskId, numOfTasks); } + streamMetaWLock(pMeta); - if (streamMetaCommit(pMeta) < 0) { - // persist to disk + if (pReq->dropRelHTask) { + code = streamMetaCommit(pMeta); } } + // always return true return TSDB_CODE_SUCCESS; } @@ -594,13 +596,15 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer && pInfo->processedVer <= pReq->checkpointVer); - // update only it is in checkpoint status. - if (pStatus.state == TASK_STATUS__CK) { + // update only it is in checkpoint status, or during restore procedure. + if (pStatus.state == TASK_STATUS__CK || (!restored)) { pInfo->checkpointId = pReq->checkpointId; pInfo->checkpointVer = pReq->checkpointVer; pInfo->checkpointTime = pReq->checkpointTs; - code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); + if (restored) { + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); + } } streamTaskClearCheckInfo(pTask, true); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 5a9a60db1d..bf64af6558 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -762,18 +762,27 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { code = sendDispatchMsg(pTask, pTask->msgInfo.pData); - streamMutexLock(&pTask->msgInfo.lock); - if (pTask->msgInfo.inMonitor == 0) { - int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s start dispatch monitor tmr in %dms, ref:%d, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS, - ref, tstrerror(code)); - streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); - pTask->msgInfo.inMonitor = 1; - } else { - stDebug("s-task:%s already in dispatch monitor tmr", id); - } + // todo: secure the timerActive and start timer in after lock pTask->lock + streamMutexLock(&pTask->lock); + bool shouldStop = streamTaskShouldStop(pTask); + streamMutexUnlock(&pTask->lock); - streamMutexUnlock(&pTask->msgInfo.lock); + if (shouldStop) { + stDebug("s-task:%s in stop/dropping status, not start dispatch monitor tmr", id); + } else { + streamMutexLock(&pTask->msgInfo.lock); + if (pTask->msgInfo.inMonitor == 0) { + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s start dispatch monitor tmr in %dms, ref:%d, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS, + ref, tstrerror(code)); + streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS); + pTask->msgInfo.inMonitor = 1; + } else { + stDebug("s-task:%s already in dispatch monitor tmr", id); + } + + streamMutexUnlock(&pTask->msgInfo.lock); + } // this block can not be deleted until it has been sent to downstream task successfully. return TSDB_CODE_SUCCESS; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d222004fb7..cd69c9168c 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -631,7 +631,13 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB (void) streamTaskReloadState(pTask); stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr, streamTaskGetStatus(pHTask).name); - + // todo execute qExecTask to fetch the reload-generated result, if this is stream is for session window query. + /* + * while(1) { + * qExecTask() + * } + * // put into the output queue. + */ streamMetaReleaseTask(pTask->pMeta, pHTask); } else { stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id, diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e8afe96339..c50c3c484e 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -431,7 +431,8 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, pMeta->expandTaskFn = expandTaskFn; pMeta->stage = stage; pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT; - pMeta->updateInfo.transId = -1; + pMeta->updateInfo.activeTransId = -1; + pMeta->updateInfo.completeTransId = -1; pMeta->startInfo.completeFn = fn; pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); @@ -890,24 +891,28 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) { } int32_t streamMetaCommit(SStreamMeta* pMeta) { - if (tdbCommit(pMeta->db, pMeta->txn) < 0) { + int32_t code = 0; + code = tdbCommit(pMeta->db, pMeta->txn); + if (code != 0) { stError("vgId:%d failed to commit stream meta", pMeta->vgId); - return -1; + return code; } - if (tdbPostCommit(pMeta->db, pMeta->txn) < 0) { + code = tdbPostCommit(pMeta->db, pMeta->txn); + if (code != 0) { stError("vgId:%d failed to do post-commit stream meta", pMeta->vgId); - return -1; + return code; } - if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, - TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, + TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + if (code != 0) { stError("vgId:%d failed to begin trans", pMeta->vgId); - return -1; + return code; } stDebug("vgId:%d stream meta file commit completed", pMeta->vgId); - return 0; + return code; } int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) { @@ -1781,12 +1786,56 @@ void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SSt } } -void streamMetaClearUpdateTaskList(SStreamMeta* pMeta) { - taosHashClear(pMeta->updateInfo.pTasks); - pMeta->updateInfo.transId = -1; +void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta) { + STaskUpdateInfo* pInfo = &pMeta->updateInfo; + + taosHashClear(pInfo->pTasks); + + int32_t prev = pInfo->completeTransId; + pInfo->completeTransId = pInfo->activeTransId; + pInfo->activeTransId = -1; + pInfo->completeTs = taosGetTimestampMs(); + + stDebug("vgId:%d set the nodeEp update complete, ts:%" PRId64 ", complete transId:%d->%d, reset active transId", + pMeta->vgId, pInfo->completeTs, prev, pInfo->completeTransId); } -void streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId) { - taosHashClear(pMeta->updateInfo.pTasks); - pMeta->updateInfo.transId = transId; +bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId) { + STaskUpdateInfo* pInfo = &pMeta->updateInfo; + + if (transId > pInfo->completeTransId) { + if (pInfo->activeTransId == -1) { + taosHashClear(pInfo->pTasks); + pInfo->activeTransId = transId; + + stInfo("vgId:%d set the active epset update transId:%d, prev complete transId:%d", pMeta->vgId, transId, + pInfo->completeTransId); + return true; + } else { + if (pInfo->activeTransId == transId) { + // do nothing + return true; + } else if (transId < pInfo->activeTransId) { + stError("vgId:%d invalid(out of order)epset update transId:%d, active transId:%d, complete transId:%d, discard", + pMeta->vgId, transId, pInfo->activeTransId, pInfo->completeTransId); + return false; + } else { // transId > pInfo->activeTransId + taosHashClear(pInfo->pTasks); + int32_t prev = pInfo->activeTransId; + pInfo->activeTransId = transId; + + stInfo("vgId:%d active epset update transId updated from:%d to %d, prev complete transId:%d", pMeta->vgId, + transId, prev, pInfo->completeTransId); + return true; + } + } + } else if (transId == pInfo->completeTransId) { + stError("vgId:%d already handled epset update transId:%d, completeTs:%" PRId64 " ignore", pMeta->vgId, transId, + pInfo->completeTs); + return false; + } else { // pInfo->completeTransId > transId + stError("vgId:%d disorder update nodeEp msg recv, prev completed epset update transId:%d, recv:%d, discard", + pMeta->vgId, pInfo->activeTransId, transId); + return false; + } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c0b2b16d30..f190673430 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -637,7 +637,7 @@ bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) { numOfNodes, p->updateCount, prevTs); bool updated = false; - for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) { + for (int32_t i = 0; i < numOfNodes; ++i) { SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i); if (pInfo == NULL) { continue; diff --git a/source/libs/tdb/src/db/tdbPager.c b/source/libs/tdb/src/db/tdbPager.c index 01dd0ac766..a650847e1e 100644 --- a/source/libs/tdb/src/db/tdbPager.c +++ b/source/libs/tdb/src/db/tdbPager.c @@ -1180,7 +1180,7 @@ int tdbPagerRestoreJournals(SPager *pPager) { if (code) { taosArrayDestroy(pTxnList); (void)tdbCloseDir(&pDir); - tdbError("failed to restore file due to %s. jFileName:%s", strerror(code), jname); + tdbError("failed to restore file due to %s. jFileName:%s", tstrerror(code), jname); return code; } } diff --git a/tests/docs-examples-test/csharp.sh b/tests/docs-examples-test/csharp.sh index 0805b425b4..497cb074d9 100644 --- a/tests/docs-examples-test/csharp.sh +++ b/tests/docs-examples-test/csharp.sh @@ -2,6 +2,32 @@ set -e +check_transactions() { + for i in {1..30} + do + output=$(taos -s "show transactions;") + if [[ $output == *"Query OK, 0 row(s)"* ]]; then + echo "Success: No transactions are in progress." + return 0 + fi + sleep 1 + done + + echo "Error: Transactions are still in progress after 30 attempts." + return 1 +} + +reset_cache() { + response=$(curl --location -uroot:taosdata 'http://127.0.0.1:6041/rest/sql' --data 'reset query cache') + + if [[ $response == \{\"code\":0* ]]; then + echo "Success: Query cache reset successfully." + else + echo "Error: Failed to reset query cache. Response: $response" + return 1 + fi +} + pgrep taosd || taosd >> /dev/null 2>&1 & pgrep taosadapter || taosadapter >> /dev/null 2>&1 & cd ../../docs/examples/csharp @@ -10,56 +36,69 @@ dotnet run --project connect/connect.csproj dotnet run --project wsConnect/wsConnect.csproj taos -s "drop database if exists test" -sleep 1 +check_transactions || exit 1 +reset_cache || exit 1 dotnet run --project influxdbLine/influxdbline.csproj taos -s "drop database if exists test" -sleep 1 +check_transactions || exit 1 +reset_cache || exit 1 dotnet run --project optsTelnet/optstelnet.csproj taos -s "drop database if exists test" -sleep 1 +check_transactions || exit 1 +reset_cache || exit 1 dotnet run --project optsJSON/optsJSON.csproj # query taos -s "drop database if exists power" -sleep 1 +check_transactions || exit 1 +reset_cache || exit 1 dotnet run --project wsInsert/wsInsert.csproj dotnet run --project wsQuery/wsQuery.csproj taos -s "drop database if exists power" -sleep 1 +check_transactions || exit 1 +reset_cache || exit 1 dotnet run --project sqlInsert/sqlinsert.csproj dotnet run --project query/query.csproj # stmt taos -s "drop database if exists power" -sleep 1 +check_transactions || exit 1 +reset_cache || exit 1 dotnet run --project wsStmt/wsStmt.csproj taos -s "drop database if exists power" -sleep 1 +check_transactions || exit 1 +reset_cache || exit 1 dotnet run --project stmtInsert/stmtinsert.csproj # schemaless taos -s "drop database if exists power" -sleep 1 +check_transactions || exit 1 +reset_cache || exit 1 dotnet run --project wssml/wssml.csproj taos -s "drop database if exists power" -sleep 1 +check_transactions || exit 1 +reset_cache || exit 1 dotnet run --project nativesml/nativesml.csproj # subscribe taos -s "drop topic if exists topic_meters" -sleep 1 +check_transactions || exit 1 +reset_cache || exit 1 taos -s "drop database if exists power" -sleep 1 +check_transactions || exit 1 +reset_cache || exit 1 dotnet run --project wssubscribe/wssubscribe.csproj taos -s "drop topic if exists topic_meters" -sleep 1 +check_transactions || exit 1 +reset_cache || exit 1 taos -s "drop database if exists power" -sleep 1 +check_transactions || exit 1 +reset_cache || exit 1 dotnet run --project subscribe/subscribe.csproj diff --git a/tests/docs-examples-test/go.sh b/tests/docs-examples-test/go.sh index ea19d3212a..606265435d 100644 --- a/tests/docs-examples-test/go.sh +++ b/tests/docs-examples-test/go.sh @@ -17,6 +17,17 @@ check_transactions() { return 1 } +reset_cache() { + response=$(curl --location -uroot:taosdata 'http://127.0.0.1:6041/rest/sql' --data 'reset query cache') + + if [[ $response == \{\"code\":0* ]]; then + echo "Success: Query cache reset successfully." + else + echo "Error: Failed to reset query cache. Response: $response" + return 1 + fi +} + taosd >>/dev/null 2>&1 & taosadapter >>/dev/null 2>&1 & sleep 1 @@ -31,64 +42,83 @@ go run ./connect/connpool/main.go go run ./connect/wsexample/main.go taos -s "drop database if exists power" +check_transactions || exit 1 +reset_cache || exit 1 go run ./sqlquery/main.go taos -s "drop database if exists power" check_transactions || exit 1 +reset_cache || exit 1 go run ./queryreqid/main.go taos -s "drop database if exists power" check_transactions || exit 1 +reset_cache || exit 1 go run ./stmt/native/main.go taos -s "drop database if exists power" check_transactions || exit 1 +reset_cache || exit 1 go run ./stmt/ws/main.go taos -s "drop database if exists power" check_transactions || exit 1 +reset_cache || exit 1 sleep 3 go run ./schemaless/native/main.go taos -s "drop database if exists power" check_transactions || exit 1 +reset_cache || exit 1 go run ./schemaless/ws/main.go taos -s "drop topic if exists topic_meters" check_transactions || exit 1 +reset_cache || exit 1 taos -s "drop database if exists power" check_transactions || exit 1 +reset_cache || exit 1 go run ./tmq/native/main.go taos -s "drop topic if exists topic_meters" check_transactions || exit 1 +reset_cache || exit 1 taos -s "drop database if exists power" check_transactions || exit 1 +reset_cache || exit 1 go run ./tmq/ws/main.go taos -s "drop database if exists test" check_transactions || exit 1 +reset_cache || exit 1 go run ./insert/json/main.go taos -s "drop database if exists test" check_transactions || exit 1 +reset_cache || exit 1 go run ./insert/line/main.go taos -s "drop topic if exists topic_meters" check_transactions || exit 1 +reset_cache || exit 1 taos -s "drop database if exists power" check_transactions || exit 1 +reset_cache || exit 1 go run ./insert/sql/main.go taos -s "drop database if exists power" check_transactions || exit 1 +reset_cache || exit 1 go run ./insert/stmt/main.go taos -s "drop database if exists test" check_transactions || exit 1 +reset_cache || exit 1 go run ./insert/telnet/main.go go run ./query/sync/main.go taos -s "drop topic if exists example_tmq_topic" check_transactions || exit 1 +reset_cache || exit 1 taos -s "drop database if exists example_tmq" check_transactions || exit 1 +reset_cache || exit 1 go run ./sub/main.go