diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 24c4f2912c..18b8f6d79b 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -27,7 +27,6 @@ target_sources( "src/meta/metaSnapshot.c" # sma - "src/sma/sma.c" "src/sma/smaEnv.c" "src/sma/smaUtil.c" "src/sma/smaOpen.c" diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 2d6edae0e7..7183c423fe 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -47,7 +47,8 @@ struct SSmaEnv { }; typedef struct { - int32_t smaRef; + int8_t inited; + int32_t rsetId; } SSmaMgmt; #define SMA_ENV_LOCK(env) ((env)->lock) @@ -95,6 +96,7 @@ enum { TASK_TRIGGER_STAT_CANCELLED = 4, TASK_TRIGGER_STAT_FINISHED = 5, }; + void tdDestroySmaEnv(SSmaEnv *pSmaEnv); void *tdFreeSmaEnv(SSmaEnv *pSmaEnv); @@ -104,6 +106,10 @@ int32_t tdInsertRSmaData(SSma *pSma, char *msg); int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat); int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat); + +void *tdAcquireSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln); +int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln); + int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType); int32_t tdLockSma(SSma *pSma); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index ae659dc396..34fac045a7 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -163,6 +163,8 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pSchema, bool const char* stbFullName, int32_t vgId); // sma +int32_t smaInit(); +void smaCleanUp(); int32_t smaOpen(SVnode* pVnode); int32_t smaClose(SSma* pSma); int32_t smaBegin(SSma* pSma); diff --git a/source/dnode/vnode/src/sma/sma.c b/source/dnode/vnode/src/sma/sma.c deleted file mode 100644 index 12f93f9400..0000000000 --- a/source/dnode/vnode/src/sma/sma.c +++ /dev/null @@ -1,257 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "sma.h" - -// functions for external invocation - -// TODO: Who is responsible for resource allocate and release? -int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg) { - int32_t code = TSDB_CODE_SUCCESS; - - if ((code = tdProcessTSmaInsertImpl(pSma, indexUid, msg)) < 0) { - smaWarn("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno)); - } - // TODO: destroy SSDataBlocks(msg) - return code; -} - -int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg) { - int32_t code = TSDB_CODE_SUCCESS; - - if ((code = tdProcessTSmaCreateImpl(pSma, version, msg)) < 0) { - smaWarn("vgId:%d, create tsma failed since %s", SMA_VID(pSma), tstrerror(terrno)); - } - // TODO: destroy SSDataBlocks(msg) - return code; -} - -int32_t smaGetTSmaDays(SVnodeCfg* pCfg, void* pCont, uint32_t contLen, int32_t* days) { - int32_t code = TSDB_CODE_SUCCESS; - if ((code = tdProcessTSmaGetDaysImpl(pCfg, pCont, contLen, days)) < 0) { - smaWarn("vgId:%d, get tsma days failed since %s", pCfg->vgId, tstrerror(terrno)); - } - smaDebug("vgId:%d, get tsma days %d", pCfg->vgId, *days); - return code; -} - - -// functions for internal invocation - -#if 0 - -/** - * @brief TODO: Assume that the final generated result it less than 3M - * - * @param pReq - * @param pDataBlocks - * @param vgId - * @param suid // TODO: check with Liao whether suid response is reasonable - * - * TODO: colId should be set - */ -int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, - tb_uid_t suid, const char* stbName, bool isCreateCtb) { - int32_t sz = taosArrayGetSize(pDataBlocks); - int32_t bufSize = sizeof(SSubmitReq); - for (int32_t i = 0; i < sz; ++i) { - SDataBlockInfo* pBlkInfo = &((SSDataBlock*)taosArrayGet(pDataBlocks, i))->info; - bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(pBlkInfo->numOfCols)); - bufSize += sizeof(SSubmitBlk); - } - - *pReq = taosMemoryCalloc(1, bufSize); - if (!(*pReq)) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; - } - void* pDataBuf = *pReq; - - SArray* pTagArray = NULL; - int32_t msgLen = sizeof(SSubmitReq); - int32_t numOfBlks = 0; - int32_t schemaLen = 0; - SRowBuilder rb = {0}; - tdSRowInit(&rb, pTSchema->version); - - for (int32_t i = 0; i < sz; ++i) { - SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i); - SDataBlockInfo* pDataBlkInfo = &pDataBlock->info; - int32_t colNum = pDataBlkInfo->numOfCols; - int32_t rows = pDataBlkInfo->rows; - int32_t rowSize = pDataBlkInfo->rowSize; - int64_t groupId = pDataBlkInfo->groupId; - - if (rb.nCols != colNum) { - tdSRowSetTpInfo(&rb, colNum, pTSchema->flen); - } - - if(isCreateCtb) { - SMetaReader mr = {0}; - const char* ctbName = buildCtbNameByGroupId(stbName, pDataBlock->info.groupId); - if (metaGetTableEntryByName(&mr, ctbName) != 0) { - smaDebug("vgId:%d, no tsma ctb %s exists", vgId, ctbName); - } - SVCreateTbReq ctbReq = {0}; - ctbReq.name = ctbName; - ctbReq.type = TSDB_CHILD_TABLE; - ctbReq.ctb.suid = suid; - - STagVal tagVal = {.cid = colNum + PRIMARYKEY_TIMESTAMP_COL_ID, - .type = TSDB_DATA_TYPE_BIGINT, - .i64 = groupId}; - STag* pTag = NULL; - if(!pTagArray) { - pTagArray = taosArrayInit(1, sizeof(STagVal)); - if (!pTagArray) goto _err; - } - taosArrayClear(pTagArray); - taosArrayPush(pTagArray, &tagVal); - tTagNew(pTagArray, 1, false, &pTag); - if (pTag == NULL) { - tdDestroySVCreateTbReq(&ctbReq); - goto _err; - } - ctbReq.ctb.pTag = (uint8_t*)pTag; - - int32_t code; - tEncodeSize(tEncodeSVCreateTbReq, &ctbReq, schemaLen, code); - - tdDestroySVCreateTbReq(&ctbReq); - if (code < 0) { - goto _err; - } - } - - - - SSubmitBlk* pSubmitBlk = POINTER_SHIFT(pDataBuf, msgLen); - pSubmitBlk->suid = suid; - pSubmitBlk->uid = groupId; - pSubmitBlk->numOfRows = rows; - - msgLen += sizeof(SSubmitBlk); - int32_t dataLen = 0; - for (int32_t j = 0; j < rows; ++j) { // iterate by row - tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen)); // set row buf - bool isStartKey = false; - int32_t offset = 0; - for (int32_t k = 0; k < colNum; ++k) { // iterate by column - SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); - STColumn* pCol = &pTSchema->columns[k]; - void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); - switch (pColInfoData->info.type) { - case TSDB_DATA_TYPE_TIMESTAMP: - if (!isStartKey) { - isStartKey = true; - tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, - offset, k); - - } else { - tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, - true, offset, k); - } - break; - case TSDB_DATA_TYPE_NCHAR: { - tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true, - offset, k); - break; - } - case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY - tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true, - offset, k); - break; - } - case TSDB_DATA_TYPE_VARBINARY: - case TSDB_DATA_TYPE_DECIMAL: - case TSDB_DATA_TYPE_BLOB: - case TSDB_DATA_TYPE_JSON: - case TSDB_DATA_TYPE_MEDIUMBLOB: - uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type); - TASSERT(0); - break; - default: - if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) { - if (pCol->type == pColInfoData->info.type) { - tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, var, true, offset, - k); - } else { - char tv[8] = {0}; - if (pColInfoData->info.type == TSDB_DATA_TYPE_FLOAT) { - float v = 0; - GET_TYPED_DATA(v, float, pColInfoData->info.type, var); - SET_TYPED_DATA(&tv, pCol->type, v); - } else if (pColInfoData->info.type == TSDB_DATA_TYPE_DOUBLE) { - double v = 0; - GET_TYPED_DATA(v, double, pColInfoData->info.type, var); - SET_TYPED_DATA(&tv, pCol->type, v); - } else if (IS_SIGNED_NUMERIC_TYPE(pColInfoData->info.type)) { - int64_t v = 0; - GET_TYPED_DATA(v, int64_t, pColInfoData->info.type, var); - SET_TYPED_DATA(&tv, pCol->type, v); - } else { - uint64_t v = 0; - GET_TYPED_DATA(v, uint64_t, pColInfoData->info.type, var); - SET_TYPED_DATA(&tv, pCol->type, v); - } - tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID + k, pCol->type, TD_VTYPE_NORM, tv, true, offset, - k); - } - } else { - uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type); - TASSERT(0); - } - break; - } - offset += TYPE_BYTES[pCol->type]; // sum/avg would convert to int64_t/uint64_t/double during aggregation - } - dataLen += TD_ROW_LEN(rb.pBuf); -#ifdef TD_DEBUG_PRINT_ROW - tdSRowPrint(rb.pBuf, pTSchema, __func__); -#endif - } - - ++numOfBlks; - - pSubmitBlk->dataLen = dataLen; - msgLen += pSubmitBlk->dataLen; - } - - (*pReq)->length = msgLen; - - (*pReq)->header.vgId = htonl(vgId); - (*pReq)->header.contLen = htonl(msgLen); - (*pReq)->length = (*pReq)->header.contLen; - (*pReq)->numOfBlocks = htonl(numOfBlks); - SSubmitBlk* blk = (SSubmitBlk*)((*pReq) + 1); - while (numOfBlks--) { - int32_t dataLen = blk->dataLen; - blk->uid = htobe64(blk->uid); - blk->suid = htobe64(blk->suid); - blk->padding = htonl(blk->padding); - blk->sversion = htonl(blk->sversion); - blk->dataLen = htonl(blk->dataLen); - blk->schemaLen = htonl(blk->schemaLen); - blk->numOfRows = htons(blk->numOfRows); - blk = (SSubmitBlk*)(blk->data + dataLen); - } - return TSDB_CODE_SUCCESS; -_err: - taosMemoryFreeClear(*pReq); - taosArrayDestroy(pTagArray); - - return TSDB_CODE_FAILED; -} -#endif diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 6e75176136..e241c14fc7 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -121,7 +121,7 @@ static int32_t tdProcessRSmaPreCommitImpl(SSma *pSma) { // step 3: perform persist task for qTaskInfo tdRSmaPersistExecImpl(pRSmaStat); - smaDebug("vgId:%d, rsma pre commit succeess", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma pre commit success", SMA_VID(pSma)); return TSDB_CODE_SUCCESS; } @@ -173,6 +173,7 @@ static int32_t tdProcessRSmaPostCommitImpl(SSma *pSma) { } if ((pDir = taosOpenDir(dir)) == NULL) { + regfree(®ex); terrno = TAOS_SYSTEM_ERROR(errno); smaWarn("vgId:%d, rsma post commit, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr()); return TSDB_CODE_FAILED; diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index c7b938f884..bb8dd48236 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -18,7 +18,7 @@ typedef struct SSmaStat SSmaStat; #define RSMA_TASK_INFO_HASH_SLOT 8 -#define SMA_MGMT_REF_NUM 1024 +#define SMA_MGMT_REF_NUM 10240 extern SSmaMgmt smaMgmt; @@ -30,7 +30,62 @@ static int32_t tdInitSmaEnv(SSma *pSma, int8_t smaType, const char *path, SSmaE static void *tdFreeTSmaStat(STSmaStat *pStat); static void tdDestroyRSmaStat(void *pRSmaStat); +/** + * @brief rsma init + * + * @return int32_t + */ // implementation +int32_t smaInit() { + int8_t old; + int32_t nLoops = 0; + while (1) { + old = atomic_val_compare_exchange_8(&smaMgmt.inited, 0, 2); + if (old != 2) break; + if (++nLoops > 1000) { + sched_yield(); + nLoops = 0; + } + } + + if (old == 0) { + smaMgmt.rsetId = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat); + + if (smaMgmt.rsetId < 0) { + smaError("failed to init sma rset since %s", terrstr()); + atomic_store_8(&smaMgmt.inited, 0); + return TSDB_CODE_FAILED; + } + + smaInfo("sma rset is initialized, rsetId:%d", smaMgmt.rsetId); + atomic_store_8(&smaMgmt.inited, 1); + } + + return TSDB_CODE_SUCCESS; +} + +/** + * @brief rsma cleanup + * + */ +void smaCleanUp() { + int8_t old; + int32_t nLoops = 0; + while (1) { + old = atomic_val_compare_exchange_8(&smaMgmt.inited, 1, 2); + if (old != 2) break; + if (++nLoops > 1000) { + sched_yield(); + nLoops = 0; + } + } + + if (old == 1) { + smaInfo("sma rset is cleaned up, resetId:%d", smaMgmt.rsetId); + taosCloseRef(smaMgmt.rsetId); + atomic_store_8(&smaMgmt.inited, 0); + } +} static SSmaEnv *tdNewSmaEnv(const SSma *pSma, int8_t smaType, const char *path) { SSmaEnv *pEnv = NULL; @@ -135,17 +190,16 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT); // init smaMgmt - smaMgmt.smaRef = taosOpenRef(SMA_MGMT_REF_NUM, tdDestroyRSmaStat); - if (smaMgmt.smaRef < 0) { - smaError("init smaRef failed, num:%d", SMA_MGMT_REF_NUM); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; - } + smaInit(); - int64_t refId = taosAddRef(smaMgmt.smaRef, pRSmaStat); + int64_t refId = taosAddRef(smaMgmt.rsetId, pRSmaStat); if (refId < 0) { - smaError("taosAddRef smaRef failed, since:%s", tstrerror(terrno)); + smaError("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d failed since:%s", SMA_VID(pSma), + refId, smaMgmt.rsetId, SMA_MGMT_REF_NUM, tstrerror(terrno)); return TSDB_CODE_FAILED; + } else { + smaDebug("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d succeed", SMA_VID(pSma), refId, + smaMgmt.rsetId, SMA_MGMT_REF_NUM); } pRSmaStat->refId = refId; @@ -275,8 +329,13 @@ int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { tdDestroyTSmaStat(SMA_TSMA_STAT(pSmaStat)); } else if (smaType == TSDB_SMA_TYPE_ROLLUP) { SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSmaStat); - if (taosRemoveRef(smaMgmt.smaRef, RSMA_REF_ID(pRSmaStat)) < 0) { - smaError("remove refId from rsmaRef:0x%" PRIx64 " failed since %s", RSMA_REF_ID(pRSmaStat), terrstr()); + if (taosRemoveRef(smaMgmt.rsetId, RSMA_REF_ID(pRSmaStat)) < 0) { + smaError("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " failed since %s", SMA_VID(pRSmaStat->pSma), + RSMA_REF_ID(pRSmaStat), smaMgmt.rsetId, terrstr()); + ASSERT(0); + } else { + smaDebug("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " succeed", SMA_VID(pRSmaStat->pSma), + RSMA_REF_ID(pRSmaStat), smaMgmt.rsetId); } } else { ASSERT(0); @@ -323,7 +382,7 @@ int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType) { } break; default: - TASSERT(0); + smaError("vgId:%d undefined smaType:%", SMA_VID(pSma), smaType); return TSDB_CODE_FAILED; } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 4c2e606e0e..c44a46ac5a 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -19,7 +19,8 @@ #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid SSmaMgmt smaMgmt = { - .smaRef = -1, + .inited = 0, + .rsetId = -1, }; #define TD_QTASKINFO_FNAME_PREFIX "qtaskinfo.ver" @@ -608,9 +609,11 @@ _err: static void tdRSmaFetchTrigger(void *param, void *tmrId) { SRSmaInfoItem *pItem = param; SSma *pSma = NULL; - SRSmaStat *pStat = (SRSmaStat *)taosAcquireRef(smaMgmt.smaRef, pItem->refId); + SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__); + if (!pStat) { - smaDebug("rsma fetch task not start since already destroyed"); + smaDebug("rsma fetch task not start since already destroyed, rsetId rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId, + pItem->refId); return; } @@ -622,9 +625,10 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { case TASK_TRIGGER_STAT_PAUSED: case TASK_TRIGGER_STAT_CANCELLED: case TASK_TRIGGER_STAT_FINISHED: { - taosReleaseRef(smaMgmt.smaRef, pItem->refId); - smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is cancelled", - SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid); + tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__); + smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is %" PRIi8 + ", rsetId rsetId:%" PRIi64 " refId:%d", + SMA_VID(pSma), pItem->level, pItem->pRsmaInfo->suid, rsmaTriggerStat, smaMgmt.rsetId, pItem->refId); return; } default: @@ -665,7 +669,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { } _end: - taosReleaseRef(smaMgmt.smaRef, pItem->refId); + tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__); } static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem, tb_uid_t suid, @@ -1258,7 +1262,8 @@ _end: } atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); - taosReleaseRef(smaMgmt.smaRef, pRSmaStat->refId); + smaDebug("vgId:%d, release rsetId rsetId:%" PRIi64 " refId:%d", SMA_VID(pSma), smaMgmt.rsetId, pRSmaStat->refId); + tdReleaseSmaRef(smaMgmt.rsetId, pRSmaStat->refId, __func__, __LINE__); taosThreadExit(NULL); return NULL; } @@ -1283,7 +1288,9 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) { atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat))); } atomic_store_8(RSMA_RUNNING_STAT(pRSmaStat), 0); - taosReleaseRef(smaMgmt.smaRef, pRSmaStat->refId); + smaDebug("vgId:%d, release rsetId rsetId:%" PRIi64 " refId:%d)", SMA_VID(pRSmaStat->pSma), smaMgmt.rsetId, + pRSmaStat->refId); + tdReleaseSmaRef(smaMgmt.rsetId, pRSmaStat->refId, __func__, __LINE__); } taosThreadAttrDestroy(&thAttr); @@ -1297,8 +1304,8 @@ static void tdRSmaPersistTask(SRSmaStat *pRSmaStat) { */ static void tdRSmaPersistTrigger(void *param, void *tmrId) { SRSmaStat *rsmaStat = param; - SRSmaStat *pRSmaStat = (SRSmaStat *)taosAcquireRef(smaMgmt.smaRef, rsmaStat->refId); - + SRSmaStat *pRSmaStat = (SRSmaStat *)taosAcquireRef(smaMgmt.rsetId, rsmaStat->refId); + ASSERT(0); if (!pRSmaStat) { smaDebug("rsma persistence task not start since already destroyed"); return; @@ -1341,5 +1348,5 @@ static void tdRSmaPersistTrigger(void *param, void *tmrId) { smaWarn("rsma persistence not start since unknown stat %" PRIi8, tmrStat); } break; } - taosReleaseRef(smaMgmt.smaRef, rsmaStat->refId); + taosReleaseRef(smaMgmt.rsetId, rsmaStat->refId); } diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index ecd8db8ea9..6d71f3a250 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -20,6 +20,36 @@ #define SMA_STORAGE_MINUTES_DAY 1440 #define SMA_STORAGE_SPLIT_FACTOR 14400 // least records in tsma file +// TODO: Who is responsible for resource allocate and release? +int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) { + int32_t code = TSDB_CODE_SUCCESS; + + if ((code = tdProcessTSmaInsertImpl(pSma, indexUid, msg)) < 0) { + smaWarn("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno)); + } + // TODO: destroy SSDataBlocks(msg) + return code; +} + +int32_t tdProcessTSmaCreate(SSma *pSma, int64_t version, const char *msg) { + int32_t code = TSDB_CODE_SUCCESS; + + if ((code = tdProcessTSmaCreateImpl(pSma, version, msg)) < 0) { + smaWarn("vgId:%d, create tsma failed since %s", SMA_VID(pSma), tstrerror(terrno)); + } + // TODO: destroy SSDataBlocks(msg) + return code; +} + +int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) { + int32_t code = TSDB_CODE_SUCCESS; + if ((code = tdProcessTSmaGetDaysImpl(pCfg, pCont, contLen, days)) < 0) { + smaWarn("vgId:%d, get tsma days failed since %s", pCfg->vgId, tstrerror(terrno)); + } + smaDebug("vgId:%d, get tsma days %d", pCfg->vgId, *days); + return code; +} + /** * @brief Judge the tsma file split days * diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index 2ce6d23648..22d82e8df0 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -294,4 +294,23 @@ int32_t tdRemoveTFile(STFile *pTFile) { } // smaXXXUtil ================ +void *tdAcquireSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln) { + void *pResult = taosAcquireRef(rsetId, refId); + if (!pResult) { + smaWarn("%s:%d taosAcquireRef for rsetId:%" PRIi64 " refId:%d failed since %s", tags, ln, rsetId, refId, terrstr()); + } else { + smaDebug("%s:%d taosAcquireRef for rsetId:%" PRIi64 " refId:%d success", tags, ln, rsetId, refId); + } + return pResult; +} + +int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln) { + if (taosReleaseRef(rsetId, refId) < 0) { + smaWarn("%s:%d taosReleaseRef for rsetId:%" PRIi64 " refId:%d failed since %s", tags, ln, rsetId, refId, terrstr()); + return TSDB_CODE_FAILED; + } + smaDebug("%s:%d taosReleaseRef for rsetId:%" PRIi64 " refId:%d success", tags, ln, rsetId, refId); + + return TSDB_CODE_SUCCESS; +} // ... \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeModule.c b/source/dnode/vnode/src/vnd/vnodeModule.c index d0aede145e..69f87a2c1a 100644 --- a/source/dnode/vnode/src/vnd/vnodeModule.c +++ b/source/dnode/vnode/src/vnd/vnodeModule.c @@ -100,6 +100,7 @@ void vnodeCleanup() { walCleanUp(); tqCleanUp(); + smaCleanUp(); } int vnodeScheduleTask(int (*execute)(void*), void* arg) { diff --git a/tests/system-test/0-others/taosShellError.py b/tests/system-test/0-others/taosShellError.py index 825ac015fb..5735e55c03 100644 --- a/tests/system-test/0-others/taosShellError.py +++ b/tests/system-test/0-others/taosShellError.py @@ -162,28 +162,28 @@ class TDTestCase: keyDict['h'] = 'abc' retCode, retVal = taos_command(buildPath, "h", keyDict['h'], "taos>", keyDict['c'], '') if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): - tdLog.info("taos -h %s test suceess"%keyDict['h']) + tdLog.info("taos -h %s test success"%keyDict['h']) else: tdLog.exit("taos -h %s fail"%keyDict['h']) keyDict['h'] = '\'abc\'' retCode, retVal = taos_command(buildPath, "h", keyDict['h'], "taos>", keyDict['c'], '') if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): - tdLog.info("taos -h %s test suceess"%keyDict['h']) + tdLog.info("taos -h %s test success"%keyDict['h']) else: tdLog.exit("taos -h %s fail"%keyDict['h']) keyDict['h'] = '3' retCode, retVal = taos_command(buildPath, "h", keyDict['h'], "taos>", keyDict['c'], '') if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): - tdLog.info("taos -h %s test suceess"%keyDict['h']) + tdLog.info("taos -h %s test success"%keyDict['h']) else: tdLog.exit("taos -h %s fail"%keyDict['h']) keyDict['h'] = '\'3\'' retCode, retVal = taos_command(buildPath, "h", keyDict['h'], "taos>", keyDict['c'], '') if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): - tdLog.info("taos -h %s test suceess"%keyDict['h']) + tdLog.info("taos -h %s test success"%keyDict['h']) else: tdLog.exit("taos -h %s fail"%keyDict['h']) @@ -193,42 +193,42 @@ class TDTestCase: keyDict['P'] = 'abc' retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '') if (retCode == "TAOS_FAIL") and ("Invalid port" in retVal): - tdLog.info("taos -P %s test suceess"%keyDict['P']) + tdLog.info("taos -P %s test success"%keyDict['P']) else: tdLog.exit("taos -P %s fail"%keyDict['P']) keyDict['P'] = '\'abc\'' retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '') if (retCode == "TAOS_FAIL") and ("Invalid port" in retVal): - tdLog.info("taos -P %s test suceess"%keyDict['P']) + tdLog.info("taos -P %s test success"%keyDict['P']) else: tdLog.exit("taos -P %s fail"%keyDict['P']) keyDict['P'] = '3' retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '') if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): - tdLog.info("taos -P %s test suceess"%keyDict['P']) + tdLog.info("taos -P %s test success"%keyDict['P']) else: tdLog.exit("taos -P %s fail"%keyDict['P']) keyDict['P'] = '\'3\'' retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '') if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): - tdLog.info("taos -P %s test suceess"%keyDict['P']) + tdLog.info("taos -P %s test success"%keyDict['P']) else: tdLog.exit("taos -P %s fail"%keyDict['P']) keyDict['P'] = '12ab' retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '') if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): - tdLog.info("taos -P %s test suceess"%keyDict['P']) + tdLog.info("taos -P %s test success"%keyDict['P']) else: tdLog.exit("taos -P %s fail"%keyDict['P']) keyDict['P'] = '\'12ab\'' retCode, retVal = taos_command(buildPath, "P", keyDict['P'], "taos>", keyDict['c'], '') if (retCode == "TAOS_FAIL") and ("Unable to establish connection" in retVal): - tdLog.info("taos -P %s test suceess"%keyDict['P']) + tdLog.info("taos -P %s test success"%keyDict['P']) else: tdLog.exit("taos -P %s fail"%keyDict['P']) @@ -293,7 +293,7 @@ class TDTestCase: keyDict['p'] = 'errorPassword' retCode, retVal = taos_command(buildPath, "u", keyDict['u'], "taos>", keyDict['c'], sqlString, 'p', keyDict['p']) if retCode == "TAOS_FAIL" and "Authentication failure" in retVal: - tdLog.info("taos -p %s test suceess"%keyDict['p']) + tdLog.info("taos -p %s test success"%keyDict['p']) else: tdLog.exit("taos -u %s -p %s"%(keyDict['u'], keyDict['p']))