diff --git a/include/common/tmsg.h b/include/common/tmsg.h index fb32cb382a..7501c8fce0 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2427,6 +2427,7 @@ typedef struct { static FORCE_INLINE void tDestroyTSma(STSma* pSma) { if (pSma) { + taosMemoryFreeClear(pSma->dstTbName); taosMemoryFreeClear(pSma->expr); taosMemoryFreeClear(pSma->tagsFilter); } @@ -2455,7 +2456,7 @@ int32_t tEncodeSVCreateTSmaReq(SEncoder* pCoder, const SVCreateTSmaReq* pReq); int32_t tDecodeSVCreateTSmaReq(SDecoder* pCoder, SVCreateTSmaReq* pReq); int32_t tEncodeTSma(SEncoder* pCoder, const STSma* pSma); -int32_t tDecodeTSma(SDecoder* pCoder, STSma* pSma); +int32_t tDecodeTSma(SDecoder* pCoder, STSma* pSma, bool deepCopy); static int32_t tEncodeTSmaWrapper(SEncoder* pEncoder, const STSmaWrapper* pReq) { if (tEncodeI32(pEncoder, pReq->number) < 0) return -1; @@ -2465,10 +2466,10 @@ static int32_t tEncodeTSmaWrapper(SEncoder* pEncoder, const STSmaWrapper* pReq) return 0; } -static int32_t tDecodeTSmaWrapper(SDecoder* pDecoder, STSmaWrapper* pReq) { +static int32_t tDecodeTSmaWrapper(SDecoder* pDecoder, STSmaWrapper* pReq, bool deepCopy) { if (tDecodeI32(pDecoder, &pReq->number) < 0) return -1; for (int32_t i = 0; i < pReq->number; ++i) { - tDecodeTSma(pDecoder, pReq->tSma + i); + tDecodeTSma(pDecoder, pReq->tSma + i, deepCopy); } return 0; } diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 8d9951f9e3..1231fed867 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -684,12 +684,15 @@ int32_t* taosGetErrno(); #define TSDB_CODE_SML_INVALID_DB_CONF TAOS_DEF_ERROR_CODE(0, 0x3003) //tsma -#define TSDB_CODE_TSMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x3100) -#define TSDB_CODE_TSMA_NO_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x3101) -#define TSDB_CODE_TSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3102) -#define TSDB_CODE_TSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3103) -#define TSDB_CODE_TSMA_NO_INDEX_IN_CACHE TAOS_DEF_ERROR_CODE(0, 0x3104) -#define TSDB_CODE_TSMA_RM_SKEY_IN_HASH TAOS_DEF_ERROR_CODE(0, 0x3105) +#define TSDB_CODE_TSMA_INIT_FAILED TAOS_DEF_ERROR_CODE(0, 0x3100) +#define TSDB_CODE_TSMA_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x3101) +#define TSDB_CODE_TSMA_NO_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x3102) +#define TSDB_CODE_TSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3103) +#define TSDB_CODE_TSMA_INVALID_STAT TAOS_DEF_ERROR_CODE(0, 0x3104) +#define TSDB_CODE_TSMA_INVALID_PTR TAOS_DEF_ERROR_CODE(0, 0x3105) +#define TSDB_CODE_TSMA_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x3106) +#define TSDB_CODE_TSMA_NO_INDEX_IN_CACHE TAOS_DEF_ERROR_CODE(0, 0x3107) + //rsma #define TSDB_CODE_RSMA_INVALID_ENV TAOS_DEF_ERROR_CODE(0, 0x3150) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 2f7ca249ef..aafeca0b39 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2653,7 +2653,7 @@ int32_t tSerializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) { } int32_t numOfIndex = taosArrayGetSize(pRsp->pIndexRsp); - if (tEncodeI32(&encoder, numOfIndex) < 0) return -1; + if (tEncodeI32(&encoder, numOfIndex) < 0) return -1; for (int32_t i = 0; i < numOfIndex; ++i) { STableIndexRsp *pIndexRsp = taosArrayGet(pRsp->pIndexRsp, i); if (tEncodeCStr(&encoder, pIndexRsp->tbName) < 0) return -1; @@ -2738,7 +2738,7 @@ int32_t tDeserializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) { } taosArrayPush(pRsp->pIndexRsp, &tableIndexRsp); } - + tEndDecode(&decoder); tDecoderClear(&decoder); @@ -4000,7 +4000,7 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) { return 0; } -int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) { +int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma, bool deepCopy) { if (tDecodeI8(pCoder, &pSma->version) < 0) return -1; if (tDecodeI8(pCoder, &pSma->intervalUnit) < 0) return -1; if (tDecodeI8(pCoder, &pSma->slidingUnit) < 0) return -1; @@ -4012,17 +4012,30 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) { if (tDecodeI64(pCoder, &pSma->indexUid) < 0) return -1; if (tDecodeI64(pCoder, &pSma->tableUid) < 0) return -1; if (tDecodeI64(pCoder, &pSma->dstTbUid) < 0) return -1; - if (tDecodeCStr(pCoder, &pSma->dstTbName) < 0) return -1; + if (deepCopy) { + if (tDecodeCStrAlloc(pCoder, &pSma->dstTbName) < 0) return -1; + } else { + if (tDecodeCStr(pCoder, &pSma->dstTbName) < 0) return -1; + } + if (tDecodeI64(pCoder, &pSma->interval) < 0) return -1; if (tDecodeI64(pCoder, &pSma->offset) < 0) return -1; if (tDecodeI64(pCoder, &pSma->sliding) < 0) return -1; if (pSma->exprLen > 0) { - if (tDecodeCStr(pCoder, &pSma->expr) < 0) return -1; + if (deepCopy) { + if (tDecodeCStrAlloc(pCoder, &pSma->expr) < 0) return -1; + } else { + if (tDecodeCStr(pCoder, &pSma->expr) < 0) return -1; + } } else { pSma->expr = NULL; } if (pSma->tagsFilterLen > 0) { - if (tDecodeCStr(pCoder, &pSma->tagsFilter) < 0) return -1; + if (deepCopy) { + if (tDecodeCStrAlloc(pCoder, &pSma->tagsFilter) < 0) return -1; + } else { + if (tDecodeCStr(pCoder, &pSma->tagsFilter) < 0) return -1; + } } else { pSma->tagsFilter = NULL; } @@ -4045,7 +4058,7 @@ int32_t tEncodeSVCreateTSmaReq(SEncoder *pCoder, const SVCreateTSmaReq *pReq) { int32_t tDecodeSVCreateTSmaReq(SDecoder *pCoder, SVCreateTSmaReq *pReq) { if (tStartDecode(pCoder) < 0) return -1; - tDecodeTSma(pCoder, pReq); + tDecodeTSma(pCoder, pReq, false); tEndDecode(pCoder); return 0; @@ -4879,4 +4892,3 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) { if (tDecodeCStrTo(pDecoder, pOffset->subKey) < 0) return -1; return 0; } - diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 8dca589320..15eb35c700 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -31,7 +31,7 @@ target_sources( "src/sma/smaEnv.c" "src/sma/smaOpen.c" "src/sma/smaRollup.c" - "src/sma/smaTimeRange2.c" + "src/sma/smaTimeRange.c" # tsdb "src/tsdb/tsdbCommit.c" diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index e9da125841..1e77022d04 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -38,8 +38,6 @@ typedef struct SSmaStatItem SSmaStatItem; typedef struct SSmaKey SSmaKey; typedef struct SRSmaInfo SRSmaInfo; -#define SMA_IVLD_FID INT_MIN - struct SSmaEnv { TdThreadRwlock lock; int8_t type; @@ -49,45 +47,38 @@ struct SSmaEnv { #define SMA_ENV_LOCK(env) ((env)->lock) #define SMA_ENV_TYPE(env) ((env)->type) #define SMA_ENV_STAT(env) ((env)->pStat) -#define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems) +#define SMA_ENV_STAT_ITEM(env) ((env)->pStat->tsmaStatItem) struct SSmaStatItem { - int8_t state; // ETsdbSmaStat - STSma *pTSma; // cache schema + int8_t state; // ETsdbSmaStat + STSma *pTSma; // cache schema + STSchema *pTSchema; }; struct SSmaStat { union { - SHashObj *smaStatItems; // key: indexUid, value: SSmaStatItem for tsma - SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; + SSmaStatItem tsmaStatItem; + SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; }; T_REF_DECLARE() }; -#define SMA_STAT_ITEMS(s) ((s)->smaStatItems) +#define SMA_STAT_ITEM(s) ((s)->tsmaStatItem) #define SMA_STAT_INFO_HASH(s) ((s)->rsmaInfoHash) void tdDestroySmaEnv(SSmaEnv *pSmaEnv); void *tdFreeSmaEnv(SSmaEnv *pSmaEnv); -#if 0 -int32_t tbGetTSmaStatus(SSma *pSma, STSma *param, void *result); -int32_t tbRemoveTSmaData(SSma *pSma, STSma *param, STimeWindow *pWin); -#endif -int32_t tdInitSma(SSma *pSma); int32_t tdDropTSma(SSma *pSma, char *pMsg); int32_t tdDropTSmaData(SSma *pSma, int64_t indexUid); int32_t tdInsertRSmaData(SSma *pSma, char *msg); int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat); int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat); -int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType, bool onlyCheck); +int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType); int32_t tdLockSma(SSma *pSma); int32_t tdUnLockSma(SSma *pSma); -static FORCE_INLINE int16_t tdTSmaAdd(SSma *pSma, int16_t n) { return atomic_add_fetch_16(&SMA_TSMA_NUM(pSma), n); } -static FORCE_INLINE int16_t tdTSmaSub(SSma *pSma, int16_t n) { return atomic_sub_fetch_16(&SMA_TSMA_NUM(pSma), n); } - static FORCE_INLINE int32_t tdRLockSmaEnv(SSmaEnv *pEnv) { int code = taosThreadRwlockRdlock(&(pEnv->lock)); if (code != 0) { @@ -160,11 +151,10 @@ static FORCE_INLINE void tdSmaStatSetDropped(SSmaStatItem *pStatItem) { } } -static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType); -void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem); -static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); -static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path, SDiskID did); -static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SDiskID did, SSmaEnv **pEnv); +static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType); +void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem); +static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); +void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeRSmaInfo(SRSmaInfo *pInfo); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 300a5f890e..52593f7afb 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -147,6 +147,9 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg); +SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, + const char* stbFullName, int32_t vgId); + // sma int32_t smaOpen(SVnode* pVnode); int32_t smaClose(SSma* pSma); @@ -245,7 +248,6 @@ struct STbUidStore { }; struct SSma { - int16_t nTSma; bool locked; TdThreadMutex mutex; SVnode* pVnode; @@ -261,7 +263,6 @@ struct SSma { #define SMA_META(s) ((s)->pVnode->pMeta) #define SMA_VID(s) TD_VID((s)->pVnode) #define SMA_TFS(s) ((s)->pVnode->pTfs) -#define SMA_TSMA_NUM(s) ((s)->nTSma) #define SMA_TSMA_ENV(s) ((s)->pTSmaEnv) #define SMA_RSMA_ENV(s) ((s)->pRSmaEnv) #define SMA_RSMA_TSDB0(s) ((s)->pVnode->pTsdb) diff --git a/source/dnode/vnode/src/meta/metaEntry.c b/source/dnode/vnode/src/meta/metaEntry.c index db99257ea7..15ef38719f 100644 --- a/source/dnode/vnode/src/meta/metaEntry.c +++ b/source/dnode/vnode/src/meta/metaEntry.c @@ -75,7 +75,7 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - if (tDecodeTSma(pCoder, pME->smaEntry.tsma) < 0) return -1; + if (tDecodeTSma(pCoder, pME->smaEntry.tsma, true) < 0) return -1; } else { ASSERT(0); } diff --git a/source/dnode/vnode/src/sma/sma.c b/source/dnode/vnode/src/sma/sma.c index 98e5d7c66d..b5c55a2f83 100644 --- a/source/dnode/vnode/src/sma/sma.c +++ b/source/dnode/vnode/src/sma/sma.c @@ -44,3 +44,209 @@ int32_t smaGetTSmaDays(SVnodeCfg* pCfg, void* pCont, uint32_t contLen, int32_t* smaDebug("vgId:%d, get tsma days %d", pCfg->vgId, *days); return code; } + +#if 0 + +/** + * @brief TODO: Assume that the final generated result it less than 3M + * + * @param pReq + * @param pDataBlocks + * @param vgId + * @param suid // TODO: check with Liao whether suid response is reasonable + * + * TODO: colId should be set + */ +int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, + tb_uid_t suid, const char* stbName, bool isCreateCtb) { + int32_t sz = taosArrayGetSize(pDataBlocks); + int32_t bufSize = sizeof(SSubmitReq); + for (int32_t i = 0; i < sz; ++i) { + SDataBlockInfo* pBlkInfo = &((SSDataBlock*)taosArrayGet(pDataBlocks, i))->info; + bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(pBlkInfo->numOfCols)); + bufSize += sizeof(SSubmitBlk); + } + + *pReq = taosMemoryCalloc(1, bufSize); + if (!(*pReq)) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + void* pDataBuf = *pReq; + + SArray* pTagArray = NULL; + int32_t msgLen = sizeof(SSubmitReq); + int32_t numOfBlks = 0; + int32_t schemaLen = 0; + SRowBuilder rb = {0}; + tdSRowInit(&rb, pTSchema->version); + + for (int32_t i = 0; i < sz; ++i) { + SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i); + SDataBlockInfo* pDataBlkInfo = &pDataBlock->info; + int32_t colNum = pDataBlkInfo->numOfCols; + int32_t rows = pDataBlkInfo->rows; + int32_t rowSize = pDataBlkInfo->rowSize; + int64_t groupId = pDataBlkInfo->groupId; + + if (rb.nCols != colNum) { + tdSRowSetTpInfo(&rb, colNum, pTSchema->flen); + } + + if(isCreateCtb) { + SMetaReader mr = {0}; + const char* ctbName = buildCtbNameByGroupId(stbName, pDataBlock->info.groupId); + if (metaGetTableEntryByName(&mr, ctbName) != 0) { + smaDebug("vgId:%d, no tsma ctb %s exists", vgId, ctbName); + } + SVCreateTbReq ctbReq = {0}; + ctbReq.name = ctbName; + ctbReq.type = TSDB_CHILD_TABLE; + ctbReq.ctb.suid = suid; + + STagVal tagVal = {.cid = colNum + PRIMARYKEY_TIMESTAMP_COL_ID, + .type = TSDB_DATA_TYPE_BIGINT, + .i64 = groupId}; + STag* pTag = NULL; + if(!pTagArray) { + pTagArray = taosArrayInit(1, sizeof(STagVal)); + if (!pTagArray) goto _err; + } + taosArrayClear(pTagArray); + taosArrayPush(pTagArray, &tagVal); + tTagNew(pTagArray, 1, false, &pTag); + if (pTag == NULL) { + tdDestroySVCreateTbReq(&ctbReq); + goto _err; + } + ctbReq.ctb.pTag = (uint8_t*)pTag; + + int32_t code; + tEncodeSize(tEncodeSVCreateTbReq, &ctbReq, schemaLen, code); + + tdDestroySVCreateTbReq(&ctbReq); + if (code < 0) { + goto _err; + } + } + + + + SSubmitBlk* pSubmitBlk = POINTER_SHIFT(pDataBuf, msgLen); + pSubmitBlk->suid = suid; + pSubmitBlk->uid = groupId; + pSubmitBlk->numOfRows = rows; + + msgLen += sizeof(SSubmitBlk); + int32_t dataLen = 0; + for (int32_t j = 0; j < rows; ++j) { // iterate by row + tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen)); // set row buf + bool isStartKey = false; + int32_t offset = 0; + for (int32_t k = 0; k < colNum; ++k) { // iterate by column + SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); + STColumn* pCol = &pTSchema->columns[k]; + void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); + switch (pColInfoData->info.type) { + case TSDB_DATA_TYPE_TIMESTAMP: + if (!isStartKey) { + isStartKey = true; + tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, + offset, k); + + } else { + tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, + true, offset, k); + } + break; + case TSDB_DATA_TYPE_NCHAR: { + tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true, + offset, k); + break; + } + case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY + tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true, + offset, k); + break; + } + case TSDB_DATA_TYPE_VARBINARY: + case TSDB_DATA_TYPE_DECIMAL: + case TSDB_DATA_TYPE_BLOB: + case TSDB_DATA_TYPE_JSON: + case TSDB_DATA_TYPE_MEDIUMBLOB: + uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type); + TASSERT(0); + break; + default: + if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) { + if (pCol->type == pColInfoData->info.type) { + tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, var, true, offset, + k); + } else { + char tv[8] = {0}; + if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) { + float v = 0; + GET_TYPED_DATA(v, float, pColInfoData->info.type, var); + SET_TYPED_DATA(&tv, pCol->type, v); + } else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) { + double v = 0; + GET_TYPED_DATA(v, double, pColInfoData->info.type, var); + SET_TYPED_DATA(&tv, pCol->type, v); + } else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) { + int64_t v = 0; + GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var); + SET_TYPED_DATA(&tv, pCol->type, v); + } else { + uint64_t v = 0; + GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var); + SET_TYPED_DATA(&tv, pCol->type, v); + } + tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, tv, true, offset, + k); + } + } else { + uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type); + TASSERT(0); + } + break; + } + offset += TYPE_BYTES[pCol->type]; // sum/avg would convert to int64_t/uint64_t/double during aggregation + } + dataLen += TD_ROW_LEN(rb.pBuf); +#ifdef TD_DEBUG_PRINT_ROW + tdSRowPrint(rb.pBuf, pTSchema, __func__); +#endif + } + + ++numOfBlks; + + pSubmitBlk->dataLen = dataLen; + msgLen += pSubmitBlk->dataLen; + } + + (*pReq)->length = msgLen; + + (*pReq)->header.vgId = htonl(vgId); + (*pReq)->header.contLen = htonl(msgLen); + (*pReq)->length = (*pReq)->header.contLen; + (*pReq)->numOfBlocks = htonl(numOfBlks); + SSubmitBlk* blk = (SSubmitBlk*)((*pReq) + 1); + while (numOfBlks--) { + int32_t dataLen = blk->dataLen; + blk->uid = htobe64(blk->uid); + blk->suid = htobe64(blk->suid); + blk->padding = htonl(blk->padding); + blk->sversion = htonl(blk->sversion); + blk->dataLen = htonl(blk->dataLen); + blk->schemaLen = htonl(blk->schemaLen); + blk->numOfRows = htons(blk->numOfRows); + blk = (SSubmitBlk*)(blk->data + dataLen); + } + return TSDB_CODE_SUCCESS; +_err: + taosMemoryFreeClear(*pReq); + taosArrayDestroy(pTagArray); + + return TSDB_CODE_FAILED; +} +#endif diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index 5eec5076e8..f71c222772 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -17,123 +17,17 @@ typedef struct SSmaStat SSmaStat; -static const char *TSDB_SMA_DNAME[] = { - "", // TSDB_SMA_TYPE_BLOCK - "tsma", // TSDB_SMA_TYPE_TIME_RANGE - "rsma", // TSDB_SMA_TYPE_ROLLUP -}; - -#define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test -#define SMA_TEST_INDEX_UID 2000000001 // TODO: just for test -#define SMA_STATE_HASH_SLOT 4 - #define RSMA_TASK_INFO_HASH_SLOT 8 -typedef struct SPoolMem { - int64_t size; - struct SPoolMem *prev; - struct SPoolMem *next; -} SPoolMem; - // declaration of static functions -// insert data - -static void tdGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]); - -// Pool Memory -static SPoolMem *openPool(); -static void clearPool(SPoolMem *pPool); -static void closePool(SPoolMem *pPool); -static void *poolMalloc(void *arg, size_t size); -static void poolFree(void *arg, void *ptr); +static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType); +static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path); +static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv); // implementation -static SPoolMem *openPool() { - SPoolMem *pPool = (SPoolMem *)taosMemoryMalloc(sizeof(*pPool)); - - pPool->prev = pPool->next = pPool; - pPool->size = 0; - - return pPool; -} - -static void clearPool(SPoolMem *pPool) { - if (!pPool) return; - - SPoolMem *pMem; - - do { - pMem = pPool->next; - - if (pMem == pPool) break; - - pMem->next->prev = pMem->prev; - pMem->prev->next = pMem->next; - pPool->size -= pMem->size; - - taosMemoryFree(pMem); - } while (1); - - assert(pPool->size == 0); -} - -static void closePool(SPoolMem *pPool) { - if (pPool) { - clearPool(pPool); - taosMemoryFree(pPool); - } -} - -static void *poolMalloc(void *arg, size_t size) { - void *ptr = NULL; - SPoolMem *pPool = (SPoolMem *)arg; - SPoolMem *pMem; - - pMem = (SPoolMem *)taosMemoryMalloc(sizeof(*pMem) + size); - if (!pMem) { - assert(0); - } - - pMem->size = sizeof(*pMem) + size; - pMem->next = pPool->next; - pMem->prev = pPool; - - pPool->next->prev = pMem; - pPool->next = pMem; - pPool->size += pMem->size; - - ptr = (void *)(&pMem[1]); - return ptr; -} - -static void poolFree(void *arg, void *ptr) { - SPoolMem *pPool = (SPoolMem *)arg; - SPoolMem *pMem; - - pMem = &(((SPoolMem *)ptr)[-1]); - - pMem->next->prev = pMem->prev; - pMem->prev->next = pMem->next; - pPool->size -= pMem->size; - - taosMemoryFree(pMem); -} - -int32_t tdInitSma(SSma *pSma) { - int32_t numOfTSma = taosArrayGetSize(metaGetSmaTbUids(SMA_META(pSma))); - if (numOfTSma > 0) { - atomic_store_16(&SMA_TSMA_NUM(pSma), (int16_t)numOfTSma); - } - return TSDB_CODE_SUCCESS; -} - -static void tdGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) { - snprintf(dirName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s", TD_DIRSEP, vgId, TD_DIRSEP, TSDB_SMA_DNAME[smaType]); -} - -static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path, SDiskID did) { +static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path) { SSmaEnv *pEnv = NULL; pEnv = (SSmaEnv *)taosMemoryCalloc(1, sizeof(SSmaEnv)); @@ -156,18 +50,17 @@ static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path, return NULL; } - return pEnv; } -static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SDiskID did, SSmaEnv **pEnv) { +static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaEnv **pEnv) { if (!pEnv) { terrno = TSDB_CODE_INVALID_PTR; return TSDB_CODE_FAILED; } if (!(*pEnv)) { - if (!(*pEnv = tdNewSmaEnv(pSma, smaType, path, did))) { + if (!(*pEnv = tdNewSmaEnv(pSma, smaType, path))) { return TSDB_CODE_FAILED; } } @@ -183,15 +76,16 @@ static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SDiskI */ void tdDestroySmaEnv(SSmaEnv *pSmaEnv) { if (pSmaEnv) { - tdDestroySmaState(pSmaEnv->pStat, SMA_ENV_TYPE(pSmaEnv)); - taosMemoryFreeClear(pSmaEnv->pStat); + pSmaEnv->pStat = tdFreeSmaState(pSmaEnv->pStat, SMA_ENV_TYPE(pSmaEnv)); taosThreadRwlockDestroy(&(pSmaEnv->lock)); } } void *tdFreeSmaEnv(SSmaEnv *pSmaEnv) { - tdDestroySmaEnv(pSmaEnv); - taosMemoryFreeClear(pSmaEnv); + if (pSmaEnv) { + tdDestroySmaEnv(pSmaEnv); + taosMemoryFreeClear(pSmaEnv); + } return NULL; } @@ -239,13 +133,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) { return TSDB_CODE_FAILED; } } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { - SMA_STAT_ITEMS(*pSmaStat) = - taosHashInit(SMA_STATE_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - - if (!SMA_STAT_ITEMS(*pSmaStat)) { - taosMemoryFreeClear(*pSmaStat); - return TSDB_CODE_FAILED; - } + // TODO } else { ASSERT(0); } @@ -262,6 +150,12 @@ void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem) { return NULL; } +void* tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType) { + tdDestroySmaState(pSmaStat, smaType); + taosMemoryFreeClear(pSmaStat); + return NULL; +} + /** * @brief Release resources allocated for its member fields, not including itself. * @@ -270,16 +164,10 @@ void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem) { */ int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { if (pSmaStat) { - // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready. if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { - void *item = taosHashIterate(SMA_STAT_ITEMS(pSmaStat), NULL); - while (item) { - SSmaStatItem *pItem = *(SSmaStatItem **)item; - tdFreeSmaStatItem(pItem); - item = taosHashIterate(SMA_STAT_ITEMS(pSmaStat), item); - } - taosHashCleanup(SMA_STAT_ITEMS(pSmaStat)); + tdFreeSmaStatItem(&pSmaStat->tsmaStatItem); } else if (smaType == TSDB_SMA_TYPE_ROLLUP) { + // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready. void *infoHash = taosHashIterate(SMA_STAT_INFO_HASH(pSmaStat), NULL); while (infoHash) { SRSmaInfo *pInfoHash = *(SRSmaInfo **)infoHash; @@ -317,7 +205,7 @@ int32_t tdUnLockSma(SSma *pSma) { return 0; } -int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType, bool onlyCheck) { +int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) { SSmaEnv *pEnv = NULL; // return if already init @@ -344,26 +232,7 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType, bool onlyCheck) { if (!pEnv) { char rname[TSDB_FILENAME_LEN] = {0}; - SDiskID did = {0}; - if (tfsAllocDisk(SMA_TFS(pSma), TFS_PRIMARY_LEVEL, &did) < 0) { - tdUnLockSma(pSma); - return TSDB_CODE_FAILED; - } - - if (did.level < 0 || did.id < 0) { - tdUnLockSma(pSma); - smaError("vgId:%d, init sma env failed since invalid did(%d,%d)", SMA_VID(pSma), did.level, did.id); - return TSDB_CODE_FAILED; - } - - tdGetSmaDir(SMA_VID(pSma), smaType, rname); - - if (tfsMkdirRecurAt(SMA_TFS(pSma), rname, did) < 0) { - tdUnLockSma(pSma); - return TSDB_CODE_FAILED; - } - - if (tdInitSmaEnv(pSma, smaType, rname, did, &pEnv) < 0) { + if (tdInitSmaEnv(pSma, smaType, rname, &pEnv) < 0) { tdUnLockSma(pSma); return TSDB_CODE_FAILED; } diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index dde6578054..a1c47a96c0 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -132,7 +132,9 @@ int32_t smaClose(SSma *pSma) { if SMA_RSMA_TSDB0 (pSma) tsdbClose(&SMA_RSMA_TSDB0(pSma)); if SMA_RSMA_TSDB1 (pSma) tsdbClose(&SMA_RSMA_TSDB1(pSma)); if SMA_RSMA_TSDB2 (pSma) tsdbClose(&SMA_RSMA_TSDB2(pSma)); - taosMemoryFree(pSma); + // SMA_TSMA_ENV(pSma) = tdFreeSmaEnv(SMA_TSMA_ENV(pSma)); + // SMA_RSMA_ENV(pSma) = tdFreeSmaEnv(SMA_RSMA_ENV(pSma)); + taosMemoryFreeClear(pSma); } return 0; } \ No newline at end of file diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 1b34529506..b2dcce8f4c 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -181,7 +181,7 @@ int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) { return TSDB_CODE_SUCCESS; } - if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP, false) != TSDB_CODE_SUCCESS) { + if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_ROLLUP) != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_TDB_INIT_FAILED; return TSDB_CODE_FAILED; } diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index bca5b1543e..4352c466c5 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -142,7 +142,6 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { ASSERT(pItem); if (!pItem->pTSma) { - // cache smaMeta STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid); if (!pTSma) { terrno = TSDB_CODE_TSMA_NO_INDEX_IN_META; @@ -150,27 +149,28 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { return TSDB_CODE_FAILED; } pItem->pTSma = pTSma; + pItem->pTSchema = metaGetTbTSchema(SMA_META(pSma), pTSma->dstTbUid, -1); + ASSERT(pItem->pTSchema); // TODO } - STSma *pTSma = pItem->pTSma; - - ASSERT(pTSma->indexUid == indexUid); - - SMetaReader mr = {0}; - - const char *dbName = "testDb"; - if (metaGetTableEntryByName(&mr, dbName) != 0) { - smaDebug("vgId:%d, tsma no table testTb exists for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), indexUid, tstrerror(terrno)); - SVCreateStbReq pReq = {0}; - pReq.name = dbName; - pReq.suid = pTSma->dstTbUid; - pReq.schemaRow = pCfg->schemaRow; - pReq.schemaTag = pCfg->schemaTag; - } + ASSERT(pItem->pTSma->indexUid == indexUid); SSubmitReq *pSubmitReq = NULL; - buildSubmitReqFromDataBlock(&pSubmitReq, (const SArray *)msg, NULL, pItem->pTSma->dstVgId, - pItem->pTSma->dstTbUid); + + pSubmitReq = tdBlockToSubmit((const SArray *)msg, pItem->pTSchema, true, pItem->pTSma->dstTbUid, + pItem->pTSma->dstTbName, pItem->pTSma->dstVgId); + + ASSERT(pSubmitReq); // TODO + + ASSERT(!strncasecmp("td.tsma.rst.tb", pItem->pTSma->dstTbName, 14)); + + SRpcMsg submitReqMsg = { + .msgType = TDMT_VND_SUBMIT, + .pCont = pSubmitReq, + .contLen = ntohl(pSubmitReq->length), + }; + + ASSERT(tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &submitReqMsg) == 0); tdUnRefSmaStat(pSma, pStat); diff --git a/source/dnode/vnode/src/sma/smaTimeRange2.c b/source/dnode/vnode/src/sma/smaTimeRange2.c deleted file mode 100644 index 9c613873ab..0000000000 --- a/source/dnode/vnode/src/sma/smaTimeRange2.c +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "sma.h" -#include "tsdb.h" - -typedef STsdbCfg STSmaKeepCfg; - -#undef _TEST_SMA_PRINT_DEBUG_LOG_ -#define SMA_STORAGE_MINUTES_MAX 86400 -#define SMA_STORAGE_MINUTES_DAY 1440 -#define SMA_STORAGE_MINUTES_MIN 1440 -#define SMA_STORAGE_TSDB_MINUTES 86400 -#define SMA_STORAGE_TSDB_TIMES 10 -#define SMA_STORAGE_SPLIT_FACTOR 14400 // least records in tsma file TODO: the feasible value? -#define SMA_KEY_LEN 16 // TSKEY+groupId 8+8 -#define SMA_DROP_EXPIRED_TIME 10 // default is 10 seconds - -#define SMA_STATE_ITEM_HASH_SLOT 32 - -// static func - -/** - * @brief Judge the tsma file split days - * - * @param pCfg - * @param pCont - * @param contLen - * @param days unit is minute - * @return int32_t - */ -int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) { - SDecoder coder = {0}; - tDecoderInit(&coder, pCont, contLen); - - STSma tsma = {0}; - if (tDecodeSVCreateTSmaReq(&coder, &tsma) < 0) { - terrno = TSDB_CODE_MSG_DECODE_ERROR; - goto _err; - } - STsdbCfg *pTsdbCfg = &pCfg->tsdbCfg; - int64_t sInterval = convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_SECOND); - if (sInterval <= 0) { - *days = pTsdbCfg->days; - return 0; - } - int64_t records = pTsdbCfg->days * 60 / sInterval; - if (records >= SMA_STORAGE_SPLIT_FACTOR) { - *days = pTsdbCfg->days; - } else { - int64_t mInterval = convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_MINUTE); - int64_t daysPerFile = mInterval * SMA_STORAGE_MINUTES_DAY * 2; - - if (daysPerFile > SMA_STORAGE_MINUTES_MAX) { - *days = SMA_STORAGE_MINUTES_MAX; - } else { - *days = (int32_t)daysPerFile; - } - - if (*days < pTsdbCfg->days) { - *days = pTsdbCfg->days; - } - } - tDecoderClear(&coder); - return 0; -_err: - tDecoderClear(&coder); - return -1; -} - -// read data - -// implementation - -/** - * @brief Insert/Update Time-range-wise SMA data. - * - If interval < SMA_STORAGE_SPLIT_HOURS(e.g. 24), save the SMA data as a part of DFileSet to e.g. - * v3f1900.tsma.${sma_index_name}. The days is the same with that for TS data files. - * - If interval >= SMA_STORAGE_SPLIT_HOURS, save the SMA data to e.g. vnode3/tsma/v3f632.tsma.${sma_index_name}. The - * days is 30 times of the interval, and the minimum days is SMA_STORAGE_TSDB_DAYS(30d). - * - The destination file of one data block for some interval is determined by its start TS key. - * - * @param pSma - * @param msg - * @return int32_t - */ -int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { - STsdbCfg *pCfg = SMA_TSDB_CFG(pSma); - - const SArray *pDataBlocks = (const SArray *)msg; - - // TODO: destroy SSDataBlocks(msg) - - // For super table aggregation, the sma data is stored in vgroup calculated from the hash value of stable name. Thus - // the sma data would arrive ahead of the update-expired-window msg. - if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_TIME_RANGE, false) != TSDB_CODE_SUCCESS) { - terrno = TSDB_CODE_TDB_INIT_FAILED; - return TSDB_CODE_FAILED; - } - - if (!pDataBlocks) { - terrno = TSDB_CODE_INVALID_PTR; - smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is NULL", SMA_VID(pSma)); - return terrno; - } - - if (taosArrayGetSize(pDataBlocks) <= 0) { - terrno = TSDB_CODE_INVALID_PARA; - smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is empty", SMA_VID(pSma)); - return TSDB_CODE_FAILED; - } - - SSmaEnv *pEnv = SMA_TSMA_ENV(pSma); - SSmaStat *pStat = SMA_ENV_STAT(pEnv); - SSmaStatItem *pItem = NULL; - - tdRefSmaStat(pSma, pStat); - - if (pStat && SMA_STAT_ITEMS(pStat)) { - pItem = taosHashGet(SMA_STAT_ITEMS(pStat), &indexUid, sizeof(indexUid)); - } - - if (!pItem || !(pItem = *(SSmaStatItem **)pItem) || tdSmaStatIsDropped(pItem)) { - terrno = TSDB_CODE_TSMA_INVALID_STAT; - tdUnRefSmaStat(pSma, pStat); - return TSDB_CODE_FAILED; - } - - STSma *pTSma = pItem->pTSma; - - tdUnRefSmaStat(pSma, pStat); - - return TSDB_CODE_SUCCESS; -} - -int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) { - SSmaCfg *pCfg = (SSmaCfg *)pMsg; - - if (metaCreateTSma(SMA_META(pSma), version, pCfg) < 0) { - return -1; - } - - if (TD_VID(pSma->pVnode) == pCfg->dstVgId) { - // create stable to save tsma result in dstVgId - SVCreateStbReq pReq = {0}; - pReq.name = pCfg->dstTbName; - pReq.suid = pCfg->dstTbUid; - pReq.schemaRow = pCfg->schemaRow; - pReq.schemaTag = pCfg->schemaTag; - - if (metaCreateSTable(SMA_META(pSma), version, &pReq) < 0) { - return -1; - } - } - - tdTSmaAdd(pSma, 1); - return 0; -} \ No newline at end of file diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 0cca1d2e10..b628f0dde5 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -15,10 +15,7 @@ #include "tq.h" -static SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool createTb, int64_t suid, - const char* stbFullName, int32_t vgId); - -static SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid, +SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid, const char* stbFullName, int32_t vgId) { SSubmitReq* ret = NULL; SArray* tagArray = taosArrayInit(1, sizeof(STagVal)); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 10e0888988..56e6e15c25 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -284,7 +284,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, int64_t version, SRpcMsg *pMsg, SRp void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { // TODO - // blockDebugShowData(data, __func__); + blockDebugShowData(data, __func__); tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data); } diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index 0161fac9b5..2c1e6fbbbd 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -121,7 +121,7 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) { // decode STSmaWrapper dstTSmaWrapper = {0}; - void *result = tDecodeTSmaWrapper(pSW, &dstTSmaWrapper); + void *result = tDecodeTSmaWrapper(pSW, &dstTSmaWrapper, false); EXPECT_NE(result, nullptr); EXPECT_EQ(tSmaWrapper.number, dstTSmaWrapper.number); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 079d5ef590..0a60115b35 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -559,8 +559,10 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_ALREADY_EXIST, "Tsma already exists TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_META, "No tsma index in meta") TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_ENV, "Invalid tsma env") TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_STAT, "Invalid tsma state") +TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_PTR, "Invalid tsma pointer") +TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_INVALID_PARA, "Invalid tsma parameters") TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_CACHE, "No tsma index in cache") -TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_RM_SKEY_IN_HASH, "Rm tsma skey in cache") + //rsma TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env")