From e6dd04f5739caaaf30eb292240036a0bd50b8c6c Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Mon, 27 Jun 2022 14:47:14 +0800 Subject: [PATCH] refactor: rsma restore --- include/common/tdataformat.h | 1 - include/common/tmsg.h | 2 +- source/common/src/tdataformat.c | 16 -------- source/common/src/tmsg.c | 4 +- source/dnode/mnode/impl/src/mndStb.c | 20 +++++----- source/dnode/vnode/inc/vnode.h | 5 +++ source/dnode/vnode/src/meta/metaEntry.c | 18 ++++++--- source/dnode/vnode/src/meta/metaQuery.c | 2 + source/dnode/vnode/src/meta/metaTable.c | 4 ++ source/dnode/vnode/src/sma/smaEnv.c | 12 +++--- source/dnode/vnode/src/sma/smaOpen.c | 49 +++++++++++++++++++++---- source/dnode/vnode/src/sma/smaRollup.c | 19 +++------- source/dnode/vnode/src/tsdb/tsdbRead.c | 3 ++ source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executorMain.c | 8 +++- source/libs/executor/src/executorimpl.c | 15 ++++++-- 16 files changed, 111 insertions(+), 69 deletions(-) diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index f9ede63f7f..8f7f22a6a0 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -78,7 +78,6 @@ int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag); int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag); int32_t tTagToValArray(const STag *pTag, SArray **ppArray); void debugPrintSTag(STag *pTag, const char *tag, int32_t ln); // TODO: remove -void debugCheckTags(STag *pTag); // TODO: remove // STRUCT ================= struct STColumn { diff --git a/include/common/tmsg.h b/include/common/tmsg.h index a5688af18a..c5b0b89311 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1886,7 +1886,7 @@ typedef struct SVCreateStbReq { int8_t rollup; SSchemaWrapper schemaRow; SSchemaWrapper schemaTag; - SRSmaParam pRSmaParam; + SRSmaParam rsmaParam; } SVCreateStbReq; int tEncodeSVCreateStbReq(SEncoder* pCoder, const SVCreateStbReq* pReq); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 8460a27a0e..7c1b31b6e4 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -862,21 +862,6 @@ void debugPrintSTag(STag *pTag, const char *tag, int32_t ln) { printf("\n"); } -void debugCheckTags(STag *pTag) { - switch (pTag->flags) { - case 0x0: - case 0x20: - case 0x40: - case 0x60: - break; - default: - ASSERT(0); - } - - ASSERT(pTag->nTag <= 128 && pTag->nTag >= 0); - ASSERT(pTag->ver <= 512 && pTag->ver >= 0); // temp condition for pTag->ver -} - static int32_t tPutTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) { int32_t n = 0; @@ -999,7 +984,6 @@ int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag) { debugPrintSTag(*ppTag, __func__, __LINE__); #endif - debugCheckTags(*ppTag); // TODO: remove this line after debug return code; _err: diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 8a052026f2..e9b5c67d76 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4763,7 +4763,7 @@ int tEncodeSVCreateStbReq(SEncoder *pCoder, const SVCreateStbReq *pReq) { if (tEncodeSSchemaWrapper(pCoder, &pReq->schemaRow) < 0) return -1; if (tEncodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1; if (pReq->rollup) { - if (tEncodeSRSmaParam(pCoder, &pReq->pRSmaParam) < 0) return -1; + if (tEncodeSRSmaParam(pCoder, &pReq->rsmaParam) < 0) return -1; } tEndEncode(pCoder); @@ -4779,7 +4779,7 @@ int tDecodeSVCreateStbReq(SDecoder *pCoder, SVCreateStbReq *pReq) { if (tDecodeSSchemaWrapper(pCoder, &pReq->schemaRow) < 0) return -1; if (tDecodeSSchemaWrapper(pCoder, &pReq->schemaTag) < 0) return -1; if (pReq->rollup) { - if (tDecodeSRSmaParam(pCoder, &pReq->pRSmaParam) < 0) return -1; + if (tDecodeSRSmaParam(pCoder, &pReq->rsmaParam) < 0) return -1; } tEndDecode(pCoder); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 92f85ecd04..dd01a0fa16 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -427,17 +427,17 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt req.schemaTag.pSchema = pStb->pTags; if (req.rollup) { - req.pRSmaParam.maxdelay[0] = pStb->maxdelay[0]; - req.pRSmaParam.maxdelay[1] = pStb->maxdelay[1]; + req.rsmaParam.maxdelay[0] = pStb->maxdelay[0]; + req.rsmaParam.maxdelay[1] = pStb->maxdelay[1]; if (pStb->ast1Len > 0) { - if (mndConvertRsmaTask(&req.pRSmaParam.qmsg[0], &req.pRSmaParam.qmsgLen[0], pStb->pAst1, pStb->uid, - STREAM_TRIGGER_WINDOW_CLOSE, req.pRSmaParam.watermark[0]) < 0) { + if (mndConvertRsmaTask(&req.rsmaParam.qmsg[0], &req.rsmaParam.qmsgLen[0], pStb->pAst1, pStb->uid, + STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[0]) < 0) { goto _err; } } if (pStb->ast2Len > 0) { - if (mndConvertRsmaTask(&req.pRSmaParam.qmsg[1], &req.pRSmaParam.qmsgLen[1], pStb->pAst2, pStb->uid, - STREAM_TRIGGER_WINDOW_CLOSE, req.pRSmaParam.watermark[1]) < 0) { + if (mndConvertRsmaTask(&req.rsmaParam.qmsg[1], &req.rsmaParam.qmsgLen[1], pStb->pAst2, pStb->uid, + STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[1]) < 0) { goto _err; } } @@ -470,12 +470,12 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt tEncoderClear(&encoder); *pContLen = contLen; - taosMemoryFreeClear(req.pRSmaParam.qmsg[0]); - taosMemoryFreeClear(req.pRSmaParam.qmsg[1]); + taosMemoryFreeClear(req.rsmaParam.qmsg[0]); + taosMemoryFreeClear(req.rsmaParam.qmsg[1]); return pHead; _err: - taosMemoryFreeClear(req.pRSmaParam.qmsg[0]); - taosMemoryFreeClear(req.pRSmaParam.qmsg[1]); + taosMemoryFreeClear(req.rsmaParam.qmsg[0]); + taosMemoryFreeClear(req.rsmaParam.qmsg[1]); return NULL; } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index c49b33beb2..5c2d2cd712 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -199,15 +199,20 @@ typedef struct { uint64_t groupId; } STableKeyInfo; +#define TABLE_ROLLUP_ON ((int8_t)0x1) +#define TABLE_IS_ROLLUP(FLG) (((FLG) & (TABLE_ROLLUP_ON)) != 0) +#define TABLE_SET_ROLLUP(FLG) ((FLG) |= TABLE_ROLLUP_ON) struct SMetaEntry { int64_t version; int8_t type; + int8_t flags; // TODO: need refactor? tb_uid_t uid; char *name; union { struct { SSchemaWrapper schemaRow; SSchemaWrapper schemaTag; + SRSmaParam rsmaParam; } stbEntry; struct { int64_t ctime; diff --git a/source/dnode/vnode/src/meta/metaEntry.c b/source/dnode/vnode/src/meta/metaEntry.c index acf5b0b613..23d7665ba3 100644 --- a/source/dnode/vnode/src/meta/metaEntry.c +++ b/source/dnode/vnode/src/meta/metaEntry.c @@ -24,22 +24,25 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) { if (tEncodeCStr(pCoder, pME->name) < 0) return -1; if (pME->type == TSDB_SUPER_TABLE) { + if (tEncodeI8(pCoder, pME->flags) < 0) return -1; // TODO: need refactor? if (tEncodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaRow) < 0) return -1; if (tEncodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaTag) < 0) return -1; + if (TABLE_IS_ROLLUP(pME->flags)) { + if (tEncodeSRSmaParam(pCoder, &pME->stbEntry.rsmaParam) < 0) return -1; + } } else if (pME->type == TSDB_CHILD_TABLE) { if (tEncodeI64(pCoder, pME->ctbEntry.ctime) < 0) return -1; if (tEncodeI32(pCoder, pME->ctbEntry.ttlDays) < 0) return -1; - if (tEncodeI32(pCoder, pME->ctbEntry.commentLen) < 0) return -1; + if (tEncodeI32v(pCoder, pME->ctbEntry.commentLen) < 0) return -1; if (pME->ctbEntry.commentLen > 0){ if (tEncodeCStr(pCoder, pME->ctbEntry.comment) < 0) return -1; } if (tEncodeI64(pCoder, pME->ctbEntry.suid) < 0) return -1; - debugCheckTags((STag*)pME->ctbEntry.pTags); // TODO: remove after debug if (tEncodeTag(pCoder, (const STag *)pME->ctbEntry.pTags) < 0) return -1; } else if (pME->type == TSDB_NORMAL_TABLE) { if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1; if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1; - if (tEncodeI32(pCoder, pME->ntbEntry.commentLen) < 0) return -1; + if (tEncodeI32v(pCoder, pME->ntbEntry.commentLen) < 0) return -1; if (pME->ntbEntry.commentLen > 0){ if (tEncodeCStr(pCoder, pME->ntbEntry.comment) < 0) return -1; } @@ -64,23 +67,26 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { if (tDecodeCStr(pCoder, &pME->name) < 0) return -1; if (pME->type == TSDB_SUPER_TABLE) { + if (tDecodeI8(pCoder, &pME->flags) < 0) return -1; // TODO: need refactor? if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaRow) < 0) return -1; if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaTag) < 0) return -1; + if (TABLE_IS_ROLLUP(pME->flags)) { + if (tDecodeSRSmaParam(pCoder, &pME->stbEntry.rsmaParam) < 0) return -1; + } } else if (pME->type == TSDB_CHILD_TABLE) { if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1; if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1; - if (tDecodeI32(pCoder, &pME->ctbEntry.commentLen) < 0) return -1; + if (tDecodeI32v(pCoder, &pME->ctbEntry.commentLen) < 0) return -1; if (pME->ctbEntry.commentLen > 0){ if (tDecodeCStr(pCoder, &pME->ctbEntry.comment) < 0) return -1; } if (tDecodeI64(pCoder, &pME->ctbEntry.suid) < 0) return -1; if (tDecodeTag(pCoder, (STag **)&pME->ctbEntry.pTags) < 0) return -1; // (TODO) - debugCheckTags((STag*)pME->ctbEntry.pTags); // TODO: remove after debug } else if (pME->type == TSDB_NORMAL_TABLE) { if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1; if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1; - if (tDecodeI32(pCoder, &pME->ntbEntry.commentLen) < 0) return -1; + if (tDecodeI32v(pCoder, &pME->ntbEntry.commentLen) < 0) return -1; if (pME->ntbEntry.commentLen > 0){ if (tDecodeCStr(pCoder, &pME->ntbEntry.comment) < 0) return -1; } diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index f57ee54400..85106f46c2 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -342,6 +342,7 @@ SMStbCursor *metaOpenStbCursor(SMeta *pMeta, tb_uid_t suid) { pStbCur = (SMStbCursor *)taosMemoryCalloc(1, sizeof(*pStbCur)); if (pStbCur == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -351,6 +352,7 @@ SMStbCursor *metaOpenStbCursor(SMeta *pMeta, tb_uid_t suid) { ret = tdbTbcOpen(pMeta->pSuidIdx, &pStbCur->pCur, NULL); if (ret < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; metaULock(pMeta); taosMemoryFree(pStbCur); return NULL; diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index ea425ca7de..a621b4ddb0 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -139,6 +139,10 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { me.name = pReq->name; me.stbEntry.schemaRow = pReq->schemaRow; me.stbEntry.schemaTag = pReq->schemaTag; + if (pReq->rollup) { + TABLE_SET_ROLLUP(me.flags); + me.stbEntry.rsmaParam = pReq->rsmaParam; + } if (metaHandleEntry(pMeta, &me) < 0) goto _err; diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index 1e8832615e..a5194d176e 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -174,11 +174,8 @@ static void tdDestroyRSmaStat(SRSmaStat *pStat) { // step 1: set persistence task cancelled atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED); - // step 2: clean timer + // step 2: stop the persistence timer taosTmrStopA(&RSMA_TMR_ID(pStat)); - if (RSMA_TMR_HANDLE(pStat)) { - taosTmrCleanUp(RSMA_TMR_HANDLE(pStat)); - } // step 3: wait the persistence thread to finish int32_t nLoops = 0; @@ -194,7 +191,6 @@ static void tdDestroyRSmaStat(SRSmaStat *pStat) { sched_yield(); nLoops = 0; } - taosMsleep(1000); // TODO: remove this line when release } } @@ -219,7 +215,11 @@ static void tdDestroyRSmaStat(SRSmaStat *pStat) { sched_yield(); nLoops = 0; } - taosMsleep(1000); // TODO: remove this line when release + } + + // step 6: free the timer handle + if (RSMA_TMR_HANDLE(pStat)) { + taosTmrCleanUp(RSMA_TMR_HANDLE(pStat)); } } } diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index 2f40df8b45..681b9131cc 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -18,6 +18,7 @@ static int32_t smaEvalDays(SRetention *r, int8_t precision); static int32_t smaSetKeepCfg(STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type); +static int32_t smaRestore(SSma *pSma); #define SMA_SET_KEEP_CFG(l) \ do { \ @@ -120,6 +121,12 @@ int32_t smaOpen(SVnode *pVnode) { } pVnode->pSma = pSma; + + // restore the sma + if (smaRestore(pSma) < 0) { + goto _err; + } + return 0; _err: taosMemoryFreeClear(pSma); @@ -127,7 +134,7 @@ _err: } int32_t smaCloseEnv(SSma *pSma) { - if(pSma) { + if (pSma) { SMA_TSMA_ENV(pSma) = tdFreeSmaEnv(SMA_TSMA_ENV(pSma)); SMA_RSMA_ENV(pSma) = tdFreeSmaEnv(SMA_RSMA_ENV(pSma)); } @@ -153,13 +160,41 @@ int32_t smaClose(SSma *pSma) { /** * @brief rsma env restore - * - * @param pSma - * @return int32_t + * + * @param pSma + * @return int32_t */ -int32_t smaRestore(SSma *pSma) { - if (!pSma) return 0; +static int32_t smaRestore(SSma *pSma) { // iterate all stables to restore the rsma env - + SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t)); + if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) { + smaError("failed to restore rsma since get stb id list error: %s", terrstr()); + return TSDB_CODE_FAILED; + } + + SMetaReader mr = {0}; + metaReaderInit(&mr, SMA_META(pSma), 0); + for (int32_t i = 0; i < taosArrayGetSize(suidList); ++i) { + tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i); + smaDebug("suid [%d] is %" PRIi64, i, suid); + if (metaGetTableEntryByUid(&mr, suid) < 0) { + metaReaderClear(&mr); + taosArrayDestroy(suidList); + smaError("failed to get table meta for %" PRIi64 " since %s", suid, terrstr()); + return TSDB_CODE_FAILED; + } + ASSERT(mr.me.type == TSDB_SUPER_TABLE); + if (TABLE_IS_ROLLUP(mr.me.flags)) { + SRSmaParam *param = &mr.me.stbEntry.rsmaParam; + for (int i = 0; i < 2; ++i) { + smaDebug("%s:%d table:%" PRIi64 " maxdelay[%d]:%" PRIi64 " watermark[%d]:%" PRIi64, __func__, __LINE__, suid, i, + param->maxdelay[i], i, param->watermark[i]); + } + } + } + + metaReaderClear(&mr); + taosArrayDestroy(suidList); + return TSDB_CODE_SUCCESS; } \ No newline at end of file diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 020ee38db9..0f30aeea6a 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -246,7 +246,7 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { SMeta *pMeta = pVnode->pMeta; SMsgCb *pMsgCb = &pVnode->msgCb; - SRSmaParam *param = &pReq->pRSmaParam; + SRSmaParam *param = &pReq->rsmaParam; if ((param->qmsgLen[0] == 0) && (param->qmsgLen[1] == 0)) { smaWarn("vgId:%d, no qmsg1/qmsg2 for rollup stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid); @@ -502,8 +502,10 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) } taosMemoryFreeClear(pReq); + } else if (terrno == 0) { + smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level); } else { - smaDebug("vgId:%d, no rsma %" PRIi8 " data generated since %s", SMA_VID(pSma), pItem->level, tstrerror(terrno)); + smaDebug("vgId:%d, no rsma %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, tstrerror(terrno)); } tdDestroySDataBlockArray(pResult); @@ -661,18 +663,6 @@ static void *tdRSmaPersistExec(void *param) { goto _end; } -#if 0 - SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t)); - if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) { - ASSERT(0); - } else { - for (int32_t i = 0; i < taosArrayGetSize(suidList); ++i) { - tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i); - smaDebug("suid [%d] is %" PRIi64, i, suid); - } - } -#endif - void *infoHash = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL); if (!infoHash) { goto _end; @@ -852,6 +842,7 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) { } break; default: { smaWarn("%s:%d rsma persistence not start since unknown stat %" PRIi8, __func__, __LINE__, tmrStat); + ASSERT(0); } break; } } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index bd40fd4a1f..03f18cc766 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2886,6 +2886,9 @@ int32_t tsdbGetCtbIdList(SMeta* pMeta, int64_t suid, SArray* list) { */ int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) { SMStbCursor* pCur = metaOpenStbCursor(pMeta, suid); + if(!pCur) { + return TSDB_CODE_FAILED; + } while (1) { tb_uid_t id = metaStbCursorNext(pCur); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 5b0dae00cf..eecefe625c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -811,7 +811,7 @@ int32_t getMaximumIdleDurationSec(); * nOptrWithVal: *nOptrWithVal save the number of optr with value * return: result code, 0 means success */ -int32_t encodeOperator(SOperatorInfo* ops, char** data, int32_t *length); +int32_t encodeOperator(SOperatorInfo* ops, char** data, int32_t *length, int32_t *nOptrWithVal); /* * ops: root operator, created by caller diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index a9e1e03178..e4c0959185 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -222,7 +222,13 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) { return TSDB_CODE_INVALID_PARA; } - return encodeOperator(pTaskInfo->pRoot, pOutput, len); + int32_t nOptrWithVal = 0; + int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal); + if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal = 0)) { + taosMemoryFreeClear(*pOutput); + *len = 0; + } + return code; } int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 56cf6a5a72..0720e8d8c4 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4472,12 +4472,12 @@ int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHa return 0; } -int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length) { +int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32_t* nOptrWithVal) { int32_t code = TDB_CODE_SUCCESS; char* pCurrent = NULL; int32_t currLength = 0; if (ops->fpSet.encodeResultRow) { - if (result == NULL || length == NULL) { + if (result == NULL || length == NULL || nOptrWithVal == NULL) { return TSDB_CODE_TSC_INVALID_INPUT; } code = ops->fpSet.encodeResultRow(ops, &pCurrent, &currLength); @@ -4488,8 +4488,13 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length) { *result = NULL; } return code; + } else if (currLength == 0) { + ASSERT(!pCurrent); + goto _downstream; } - + + ++(*nOptrWithVal); + ASSERT(currLength >= 0); if (*result == NULL) { @@ -4516,8 +4521,10 @@ int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length) { taosMemoryFree(pCurrent); *length = *(int32_t*)(*result); } + +_downstream: for (int32_t i = 0; i < ops->numOfDownstream; ++i) { - code = encodeOperator(ops->pDownstream[i], result, length); + code = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal); if (code != TDB_CODE_SUCCESS) { return code; }