diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 3c3071c8df..716f51933e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2657,6 +2657,34 @@ typedef struct { SEpSet epSet; } SVgEpSet; +typedef struct { + int64_t refId; + int64_t suid; + int8_t level; +} SRSmaFetchMsg; + +static FORCE_INLINE int32_t tEncodeSRSmaFetchMsg(SEncoder* pCoder, const SRSmaFetchMsg* pReq) { + if (tStartEncode(pCoder) < 0) return -1; + + if (tEncodeI64(pCoder, pReq->refId) < 0) return -1; + if (tEncodeI64(pCoder, pReq->suid) < 0) return -1; + if (tEncodeI8(pCoder, pReq->level) < 0) return -1; + + tEndEncode(pCoder); + return 0; +} + +static FORCE_INLINE int32_t tDecodeSRSmaFetchMsg(SDecoder* pCoder, SRSmaFetchMsg* pReq) { + if (tStartDecode(pCoder) < 0) return -1; + + if (tDecodeI64(pCoder, &pReq->refId) < 0) return -1; + if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1; + if (tDecodeI8(pCoder, &pReq->level) < 0) return -1; + + tEndDecode(pCoder); + return 0; +} + typedef struct { int8_t version; // for compatibility(default 0) int8_t intervalUnit; // MACRO: TIME_UNIT_XXX diff --git a/include/util/tlockfree.h b/include/util/tlockfree.h index 8db6be8860..82dd5a0e8b 100644 --- a/include/util/tlockfree.h +++ b/include/util/tlockfree.h @@ -25,9 +25,9 @@ extern "C" { // reference counting typedef void (*_ref_fn_t)(const void *pObj); -#define T_REF_DECLARE() \ - struct { \ - int32_t val; \ +#define T_REF_DECLARE() \ + struct { \ + volatile int32_t val; \ } _ref; #define T_REF_REGISTER_FUNC(s, e) \ diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 6411d06081..006d9e749c 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -489,7 +489,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea smaObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); ASSERT(smaObj.uid != 0); char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0}; - snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "td.tsma.rst.tb.%s", pCreate->name); + snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb",pCreate->name); memcpy(smaObj.dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN); smaObj.dstTbUid = mndGenerateUid(smaObj.dstTbName, TSDB_TABLE_FNAME_LEN); smaObj.stbUid = pStb->uid; @@ -603,6 +603,9 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea if (mndPersistStream(pMnode, pTrans, &streamObj) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; + mDebug("mndSma: create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d", + pCreate->name, smaObj.uid, smaObj.stbUid, smaObj.dstTbUid, smaObj.dstTbName, smaObj.dstVgId); + code = 0; _OVER: diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 1aee08027c..944d7759b2 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -115,24 +115,29 @@ struct SSmaStat { #define RSMA_FS_LOCK(r) (&(r)->lock) struct SRSmaInfoItem { - void *taskInfo; // qTaskInfo_t - int64_t refId; - tmr_h tmrId; - int32_t maxDelay; int8_t level; int8_t triggerStat; + int32_t maxDelay; + tmr_h tmrId; }; struct SRSmaInfo { STSchema *pTSchema; int64_t suid; + int64_t refId; // refId of SRSmaStat int8_t delFlag; T_REF_DECLARE() SRSmaInfoItem items[TSDB_RETENTION_L2]; + void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t + void *iTaskInfo[TSDB_RETENTION_L2]; // immutable }; -#define RSMA_INFO_HEAD_LEN 24 -#define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1) -#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1) + +#define RSMA_INFO_HEAD_LEN 32 +#define RSMA_INFO_IS_DEL(r) ((r)->delFlag == 1) +#define RSMA_INFO_SET_DEL(r) ((r)->delFlag = 1) +#define RSMA_INFO_QTASK(r, i) ((r)->taskInfo[i]) +#define RSMA_INFO_IQTASK(r, i) ((r)->iTaskInfo[i]) +#define RSMA_INFO_ITEM(r, i) (&(r)->items[i]) enum { TASK_TRIGGER_STAT_INIT = 0, @@ -168,8 +173,8 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat); int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo); int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo); -void *tdAcquireSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln); -int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln); +void *tdAcquireSmaRef(int32_t rsetId, int64_t refId); +int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId); int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType); @@ -223,12 +228,11 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) { void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName); void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName); -int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pDest, SRSmaInfo *pSrc); +int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc); void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); -void tdRemoveRSmaInfoBySuid(SSma *pSma, int64_t suid); int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index fdf2da5558..7427f79509 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -381,6 +381,8 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) { terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST; metaReaderClear(&mr); return -1; + } else if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) { + terrno = TSDB_CODE_SUCCESS; } metaReaderClear(&mr); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 3c88b6f5cb..373cfdfb47 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -308,12 +308,12 @@ static int32_t tdProcessRSmaSyncPostCommitImpl(SSma *pSma) { * @return int32_t */ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { - SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); - if (!pSmaEnv) { + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + if (!pEnv) { return TSDB_CODE_SUCCESS; } - SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); + SSmaStat *pStat = SMA_ENV_STAT(pEnv); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); // step 1: set rsma stat @@ -337,18 +337,26 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { } // step 3: swap rsmaInfoHash and iRsmaInfoHash - ASSERT(!RSMA_IMU_INFO_HASH(pRSmaStat)); + // lock + taosWLockLatch(SMA_ENV_LOCK(pEnv)); + ASSERT(RSMA_INFO_HASH(pRSmaStat)); + ASSERT(!RSMA_IMU_INFO_HASH(pRSmaStat)); RSMA_IMU_INFO_HASH(pRSmaStat) = RSMA_INFO_HASH(pRSmaStat); RSMA_INFO_HASH(pRSmaStat) = taosHashInit(RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); if (!RSMA_INFO_HASH(pRSmaStat)) { + // unlock + taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); smaError("vgId:%d, rsma async commit failed since %s", SMA_VID(pSma), terrstr()); return TSDB_CODE_FAILED; } + // unlock + taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); + // step 4: others pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; @@ -383,26 +391,52 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { * @return int32_t */ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { - SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); - if (!pSmaEnv) { + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + if (!pEnv) { return TSDB_CODE_SUCCESS; } - SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); + SSmaStat *pStat = SMA_ENV_STAT(pEnv); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); // step 1: merge rsmaInfoHash and iRsmaInfoHash - taosWLockLatch(SMA_ENV_LOCK(pSmaEnv)); - + // lock + taosWLockLatch(SMA_ENV_LOCK(pEnv)); +#if 0 if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) { - // TODO: optimization - just switch the hash pointer if rsmaInfoHash is empty - } - + // just switch the hash pointer if rsmaInfoHash is empty + if (taosHashGetSize(RSMA_IMU_INFO_HASH(pRSmaStat)) > 0) { + SHashObj *infoHash = RSMA_INFO_HASH(pRSmaStat); + RSMA_INFO_HASH(pRSmaStat) = RSMA_IMU_INFO_HASH(pRSmaStat); + RSMA_IMU_INFO_HASH(pRSmaStat) = infoHash; + } + } else { +#endif +#if 1 void *pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), NULL); while (pIter) { tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); if (!taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t))) { + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; + if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + int32_t refVal = T_REF_VAL_GET(pRSmaInfo); + if (refVal == 0) { + tdFreeRSmaInfo(pSma, pRSmaInfo, true); + smaDebug( + "vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for " + "table:%" PRIi64, + SMA_VID(pSma), *pSuid); + } else { + smaDebug( + "vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for " + "table:%" PRIi64, + SMA_VID(pSma), refVal, *pSuid); + } + + pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter); + continue; + } taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter)); smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid); @@ -416,11 +450,14 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter); } +#endif + // } taosHashCleanup(RSMA_IMU_INFO_HASH(pRSmaStat)); RSMA_IMU_INFO_HASH(pRSmaStat) = NULL; - taosWUnLockLatch(SMA_ENV_LOCK(pSmaEnv)); + // unlock + taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); // step 2: cleanup outdated qtaskinfo files tdCleanupQTaskInfoFiles(pSma, pRSmaStat); diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index 4b831225bc..ccb6ad3a72 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -17,7 +17,7 @@ typedef struct SSmaStat SSmaStat; -#define SMA_MGMT_REF_NUM 10240 +#define SMA_MGMT_REF_NUM 10240 extern SSmaMgmt smaMgmt; @@ -171,7 +171,7 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) { int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { if (!pRSmaInfo) return 0; - + int ref = T_REF_INC(pRSmaInfo); smaDebug("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); return 0; @@ -183,9 +183,6 @@ int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { int ref = T_REF_DEC(pRSmaInfo); smaDebug("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); - if (ref == 0) { - tdRemoveRSmaInfoBySuid(pSma, pRSmaInfo->suid); - } return 0; } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 1c6d65c4d8..fd2222c5e4 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -32,12 +32,14 @@ static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *ui static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids); static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, int8_t idx); -static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *rsmaItem, - STSchema *pTSchema, tb_uid_t suid, int8_t level); -static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid); -static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat, - int8_t blkType); -static void tdRSmaFetchTrigger(void *param, void *tmrId); +static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid, + int8_t level); +static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); +static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); + +static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, + SRSmaStat *pStat, int8_t blkType); +static void tdRSmaFetchTrigger(void *param, void *tmrId); static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish); @@ -115,17 +117,26 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { if (pInfo) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { SRSmaInfoItem *pItem = &pInfo->items[i]; - if (pItem->taskInfo) { - if (isDeepFree && pItem->tmrId) { - smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pInfo->suid, - pItem->tmrId, i + 1); - taosTmrStopA(&pItem->tmrId); - } - tdFreeQTaskInfo(&pItem->taskInfo, SMA_VID(pSma), i + 1); + + if (isDeepFree && pItem->tmrId) { + smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pInfo->suid, + pItem->tmrId, i + 1); + taosTmrStopA(&pItem->tmrId); + } + + if (isDeepFree && pInfo->taskInfo[i]) { + tdFreeQTaskInfo(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1); } else { smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma), pInfo->suid, i + 1); } + + if (pInfo->iTaskInfo[i]) { + tdFreeQTaskInfo(&pInfo->iTaskInfo[i], SMA_VID(pSma), i + 1); + } else { + smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo", + SMA_VID(pSma), pInfo->suid, i + 1); + } } if (isDeepFree) { taosMemoryFreeClear(pInfo->pTSchema); @@ -155,7 +166,12 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids) return TSDB_CODE_FAILED; } - pRSmaInfo = tdGetRSmaInfoBySuid(pSma, *suid); + if (!taosArrayGetSize(tbUids)) { + smaDebug("vgId:%d, no need to update tbUidList for suid:%" PRIi64 " since Empty tbUids", SMA_VID(pSma), *suid); + return TSDB_CODE_SUCCESS; + } + + pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, *suid); if (!pRSmaInfo) { smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid); @@ -163,26 +179,21 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids) return TSDB_CODE_FAILED; } - if (pRSmaInfo->items[0].taskInfo) { - if ((qUpdateQualifiedTableId(pRSmaInfo->items[0].taskInfo, tbUids, true) < 0)) { - smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr()); - return TSDB_CODE_FAILED; - } else { - smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, SMA_VID(pSma), - pRSmaInfo->items[0].taskInfo, *suid, *(int64_t *)taosArrayGet(tbUids, 0)); - } - } - - if (pRSmaInfo->items[1].taskInfo) { - if ((qUpdateQualifiedTableId(pRSmaInfo->items[1].taskInfo, tbUids, true) < 0)) { - smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " since %s", SMA_VID(pSma), *suid, terrstr()); - return TSDB_CODE_FAILED; - } else { - smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, SMA_VID(pSma), - pRSmaInfo->items[1].taskInfo, *suid, *(int64_t *)taosArrayGet(tbUids, 0)); + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + if (pRSmaInfo->taskInfo[i]) { + if ((qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, true) < 0)) { + tdReleaseRSmaInfo(pSma, pRSmaInfo); + smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i, + terrstr()); + return TSDB_CODE_FAILED; + } else { + smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 " uid:%" PRIi64 " level %d", + SMA_VID(pSma), pRSmaInfo->taskInfo[0], *suid, *(int64_t *)taosArrayGet(tbUids, 0), i); + } } } + tdReleaseRSmaInfo(pSma, pRSmaInfo); return TSDB_CODE_SUCCESS; } @@ -267,13 +278,12 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat .initTqReader = 1, }; - SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]); - pItem->refId = RSMA_REF_ID(pStat); - pItem->taskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle); - if (!pItem->taskInfo) { + pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle); + if (!pRSmaInfo->taskInfo[idx]) { terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; return TSDB_CODE_FAILED; } + SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]); pItem->triggerStat = TASK_TRIGGER_STAT_INACTIVE; if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) { int64_t msInterval = @@ -342,6 +352,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con } pRSmaInfo->pTSchema = pTSchema; pRSmaInfo->suid = suid; + pRSmaInfo->refId = RSMA_REF_ID(pStat); T_REF_INIT_VAL(pRSmaInfo, 1); if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) { @@ -411,7 +422,7 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) { SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); - SRSmaInfo *pRSmaInfo = tdGetRSmaInfoBySuid(pSma, pReq->suid); + SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pReq->suid); if (!pRSmaInfo) { smaWarn("vgId:%d, drop rsma for stable %s %" PRIi64 " failed no rsma in hash", TD_VID(pVnode), pReq->name, @@ -423,8 +434,10 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) { RSMA_INFO_SET_DEL(pRSmaInfo); tdUnRefRSmaInfo(pSma, pRSmaInfo); - // save to file + tdReleaseRSmaInfo(pSma, pRSmaInfo); + // save to file + // TODO smaDebug("vgId:%d, drop rsma for table %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid); return TSDB_CODE_SUCCESS; } @@ -567,8 +580,32 @@ static void tdDestroySDataBlockArray(SArray *pArray) { taosArrayDestroy(pArray); } -static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat, - int8_t blkType) { +/** + * @brief retention of rsma1/rsma2 + * + * @param pSma + * @param now + * @return int32_t + */ +int32_t smaDoRetention(SSma *pSma, int64_t now) { + int32_t code = TSDB_CODE_SUCCESS; + if (VND_IS_RSMA(pSma->pVnode)) { + return code; + } + + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + if (pSma->pRSmaTsdb[i]) { + code = tsdbDoRetention(pSma->pRSmaTsdb[i], now); + if (code) goto _end; + } + } + +_end: + return code; +} + +static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, + SRSmaStat *pStat, int8_t blkType) { SArray *pResult = NULL; SSma *pSma = pStat->pSma; @@ -576,7 +613,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche SSDataBlock *output = NULL; uint64_t ts; - int32_t code = qExecTask(pItem->taskInfo, &output, &ts); + int32_t code = qExecTask(taskInfo, &output, &ts); if (code < 0) { smaError("vgId:%d, qExecTask for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid, pItem->level, terrstr(code)); @@ -637,29 +674,32 @@ _err: return TSDB_CODE_FAILED; } -static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfoItem *pItem, - STSchema *pTSchema, tb_uid_t suid, int8_t level) { - if (!pItem || !pItem->taskInfo) { +static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid, + int8_t level) { + int32_t idx = level - 1; + if (!pInfo || !RSMA_INFO_QTASK(pInfo, idx)) { smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid); return TSDB_CODE_SUCCESS; } - if (!pTSchema) { + if (!pInfo->pTSchema) { smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, suid); return TSDB_CODE_FAILED; } smaDebug("vgId:%d, execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, SMA_VID(pSma), level, - pItem->taskInfo, suid); + RSMA_INFO_QTASK(pInfo, idx), suid); - if (qSetMultiStreamInput(pItem->taskInfo, pMsg, 1, inputType) < 0) { // INPUT__DATA_SUBMIT + if (qSetMultiStreamInput(RSMA_INFO_QTASK(pInfo, idx), pMsg, 1, inputType) < 0) { // INPUT__DATA_SUBMIT smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); return TSDB_CODE_FAILED; } - SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); - SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat); + SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pStat = SMA_RSMA_STAT(pEnv->pStat); + SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); - tdRSmaFetchAndSubmitResult(pItem, pTSchema, suid, pStat, STREAM_INPUT__DATA_SUBMIT); + tdRSmaFetchAndSubmitResult(RSMA_INFO_QTASK(pInfo, idx), pItem, pInfo->pTSchema, suid, pStat, + STREAM_INPUT__DATA_SUBMIT); atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); if (smaMgmt.tmrHandle) { @@ -678,7 +718,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType * @param suid * @return SRSmaInfo* */ -static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) { +static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pStat = NULL; SRSmaInfo *pRSmaInfo = NULL; @@ -692,94 +732,86 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) { return NULL; } + taosRLockLatch(SMA_ENV_LOCK(pEnv)); pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { + if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + return NULL; + } + tdRefRSmaInfo(pSma, pRSmaInfo); + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return pRSmaInfo; } - if (RSMA_COMMIT_STAT(pStat) == 0) { + if (RSMA_COMMIT_STAT(pStat) == 0) { // return NULL if not in committing stat + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); // clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat SRSmaInfo *pCowRSmaInfo = NULL; // lock taosWLockLatch(SMA_ENV_LOCK(pEnv)); - if (!taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t))) { // 2-phase lock + if (!(pCowRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)))) { // 2-phase lock void *iRSmaInfo = taosHashGet(RSMA_IMU_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); if (iRSmaInfo) { SRSmaInfo *pIRSmaInfo = *(SRSmaInfo **)iRSmaInfo; - if (pIRSmaInfo) { - if (tdCloneRSmaInfo(pSma, pCowRSmaInfo, pIRSmaInfo) < 0) { + if (pIRSmaInfo && !RSMA_INFO_IS_DEL(pIRSmaInfo)) { + if (tdCloneRSmaInfo(pSma, &pCowRSmaInfo, pIRSmaInfo) < 0) { + // unlock taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr()); return NULL; } smaDebug("vgId:%d, clone rsma info succeed for suid:%" PRIu64, SMA_VID(pSma), suid); if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pCowRSmaInfo, sizeof(pCowRSmaInfo)) < 0) { + // unlock taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); + smaError("vgId:%d, clone rsma info failed for suid:%" PRIu64 " since %s", SMA_VID(pSma), suid, terrstr()); return NULL; } } } + } else { + pCowRSmaInfo = *(SRSmaInfo **)pCowRSmaInfo; + ASSERT(!pCowRSmaInfo); + } + + if(pCowRSmaInfo) { + tdRefRSmaInfo(pSma, pCowRSmaInfo); } // unlock taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); return pCowRSmaInfo; } -/** - * @brief During the drop procedure, only need to delete the object in rsmaInfoHash. - * - * @param pSma - * @param suid - * @return SRSmaInfo* - */ -void tdRemoveRSmaInfoBySuid(SSma *pSma, int64_t suid) { - SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); - SRSmaStat *pStat = NULL; - SRSmaInfo *pRSmaInfo = NULL; - - if (!pEnv) { - return; - } - - pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); - if (!pStat || !RSMA_INFO_HASH(pStat)) { - return; - } - - pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); - if (pRSmaInfo) { - if ((pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { - tdFreeRSmaInfo(pSma, pRSmaInfo, true); - } - taosHashRemove(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); - smaDebug("vgId:%d, remove from infoHash for table:%" PRIu64 " succeed", SMA_VID(pSma), suid); +static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { + if (pInfo) { + tdUnRefRSmaInfo(pSma, pInfo); } } static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) { - SRSmaInfo *pRSmaInfo = tdGetRSmaInfoBySuid(pSma, suid); + SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid); if (!pRSmaInfo) { smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); return TSDB_CODE_SUCCESS; } - if (!pRSmaInfo->items[0].taskInfo) { + if (!RSMA_INFO_QTASK(pRSmaInfo, 0)) { + tdReleaseRSmaInfo(pSma, pRSmaInfo); smaDebug("vgId:%d, execute rsma, no rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), suid); return TSDB_CODE_SUCCESS; } if (inputType == STREAM_INPUT__DATA_SUBMIT) { - tdRefRSmaInfo(pSma, pRSmaInfo); - - tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[0], pRSmaInfo->pTSchema, suid, TSDB_RETENTION_L1); - tdExecuteRSmaImpl(pSma, pMsg, inputType, &pRSmaInfo->items[1], pRSmaInfo->pTSchema, suid, TSDB_RETENTION_L2); - - tdUnRefRSmaInfo(pSma, pRSmaInfo); + tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo, suid, TSDB_RETENTION_L1); + tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo, suid, TSDB_RETENTION_L2); } + tdReleaseRSmaInfo(pSma, pRSmaInfo); return TSDB_CODE_SUCCESS; } @@ -990,26 +1022,28 @@ static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem * SRSmaInfo *pRSmaInfo = NULL; void *qTaskInfo = NULL; - pRSmaInfo = tdGetRSmaInfoBySuid(pSma, pItem->suid); + pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pItem->suid); if (!pRSmaInfo) { smaDebug("vgId:%d, no restore as no rsma info for table:%" PRIu64, SMA_VID(pSma), pItem->suid); return TSDB_CODE_SUCCESS; } if (pItem->type == TSDB_RETENTION_L1) { - qTaskInfo = pRSmaInfo->items[0].taskInfo; + qTaskInfo = RSMA_INFO_QTASK(pRSmaInfo, 0); } else if (pItem->type == TSDB_RETENTION_L2) { - qTaskInfo = pRSmaInfo->items[1].taskInfo; + qTaskInfo = RSMA_INFO_QTASK(pRSmaInfo, 1); } else { ASSERT(0); } if (!qTaskInfo) { + tdReleaseRSmaInfo(pSma, pRSmaInfo); smaDebug("vgId:%d, no restore as NULL rsma qTaskInfo for table:%" PRIu64, SMA_VID(pSma), pItem->suid); return TSDB_CODE_SUCCESS; } if (qDeserializeTaskStatus(qTaskInfo, pItem->qTaskInfo, pItem->len) < 0) { + tdReleaseRSmaInfo(pSma, pRSmaInfo); smaError("vgId:%d, restore rsma task failed for table:%" PRIi64 " level %d since %s", SMA_VID(pSma), pItem->suid, pItem->type, terrstr()); return TSDB_CODE_FAILED; @@ -1017,6 +1051,7 @@ static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem * smaDebug("vgId:%d, restore rsma task success for table:%" PRIi64 " level %d", SMA_VID(pSma), pItem->suid, pItem->type); + tdReleaseRSmaInfo(pSma, pRSmaInfo); return TSDB_CODE_SUCCESS; } @@ -1195,8 +1230,14 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { while (infoHash) { SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; + + if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + infoHash = taosHashIterate(pInfoHash, infoHash); + continue; + } + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { - qTaskInfo_t taskInfo = pRSmaInfo->items[i].taskInfo; + qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pRSmaInfo, i); if (!taskInfo) { smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1); continue; @@ -1290,11 +1331,17 @@ _err: static void tdRSmaFetchTrigger(void *param, void *tmrId) { SRSmaInfoItem *pItem = param; SSma *pSma = NULL; - SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__); + SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem); + + if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + return; + } + + SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); if (!pStat) { smaDebug("rsma fetch task not start since already destroyed, rsetId rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId, - pItem->refId); + pRSmaInfo->refId); return; } @@ -1305,10 +1352,10 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { switch (rsmaTriggerStat) { case TASK_TRIGGER_STAT_PAUSED: case TASK_TRIGGER_STAT_CANCELLED: { - tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__); + tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64 " refId:%d", - SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pItem->refId); + SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaInfo->refId); if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) { taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay > 5000 ? 5000 : pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); @@ -1319,11 +1366,6 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { break; } - SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem); - if (RSMA_INFO_IS_DEL(pRSmaInfo)) { - goto _end; - } - int8_t fetchTriggerStat = atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); switch (fetchTriggerStat) { @@ -1332,16 +1374,14 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { pItem->level, pRSmaInfo->suid); // sync procedure => async process - tdRefRSmaInfo(pSma, pRSmaInfo); SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; - qSetMultiStreamInput(pItem->taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK); - tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK); - tdCleanupStreamInputDataBlock(pItem->taskInfo); + qTaskInfo_t taskInfo = pRSmaInfo->taskInfo[pItem->level - 1]; + qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK); + tdRSmaFetchAndSubmitResult(taskInfo, pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, + STREAM_INPUT__DATA_BLOCK); + tdCleanupStreamInputDataBlock(taskInfo); - tdUnRefRSmaInfo(pSma, pRSmaInfo); - // atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); - // taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId); } break; case TASK_TRIGGER_STAT_PAUSED: { smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused", @@ -1362,22 +1402,5 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { } _end: - tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__); + tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); } - -int32_t smaDoRetention(SSma *pSma, int64_t now) { - int32_t code = TSDB_CODE_SUCCESS; - if (VND_IS_RSMA(pSma->pVnode)) { - return code; - } - - for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { - if (pSma->pRSmaTsdb[i]) { - code = tsdbDoRetention(pSma->pRSmaTsdb[i], now); - if (code) goto _end; - } - } - -_end: - return code; -} \ No newline at end of file diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 6d71f3a250..7f69acc889 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -116,8 +116,10 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) { } // create stable to save tsma result in dstVgId + SName stbFullName = {0}; + tNameFromString(&stbFullName, pCfg->dstTbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); SVCreateStbReq pReq = {0}; - pReq.name = pCfg->dstTbName; + pReq.name = (char*)tNameGetTableName(&stbFullName); pReq.suid = pCfg->dstTbUid; pReq.schemaRow = pCfg->schemaRow; pReq.schemaTag = pCfg->schemaTag; @@ -125,6 +127,12 @@ int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) { if (metaCreateSTable(SMA_META(pSma), version, &pReq) < 0) { return -1; } + + smaDebug("vgId:%d, success to create sma index %s %" PRIi64 " on stb:%" PRIi64 ", dstSuid:%" PRIi64 + " dstTb:%s dstVg:%d", + SMA_VID(pSma), pCfg->indexName, pCfg->indexUid, pCfg->tableUid, pCfg->dstTbUid, pReq.name, pCfg->dstVgId); + } else { + ASSERT(0); } return 0; diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index 9dce166afa..d9f38ffd09 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -287,22 +287,22 @@ int32_t tdRemoveTFile(STFile *pTFile) { } // smaXXXUtil ================ -void *tdAcquireSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln) { +void *tdAcquireSmaRef(int32_t rsetId, int64_t refId) { void *pResult = taosAcquireRef(rsetId, refId); if (!pResult) { - smaWarn("%s:%d taosAcquireRef for rsetId:%" PRIi64 " refId:%d failed since %s", tags, ln, rsetId, refId, terrstr()); + smaWarn("rsma acquire ref for rsetId:%" PRIi64 " refId:%d failed since %s", rsetId, refId, terrstr()); } else { - smaDebug("%s:%d taosAcquireRef for rsetId:%" PRIi64 " refId:%d success", tags, ln, rsetId, refId); + smaDebug("rsma acquire ref for rsetId:%" PRIi64 " refId:%d success", rsetId, refId); } return pResult; } -int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln) { +int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId) { if (taosReleaseRef(rsetId, refId) < 0) { - smaWarn("%s:%d taosReleaseRef for rsetId:%" PRIi64 " refId:%d failed since %s", tags, ln, rsetId, refId, terrstr()); + smaWarn("rsma release ref for rsetId:%" PRIi64 " refId:%d failed since %s", rsetId, refId, terrstr()); return TSDB_CODE_FAILED; } - smaDebug("%s:%d taosReleaseRef for rsetId:%" PRIi64 " refId:%d success", tags, ln, rsetId, refId); + smaDebug("rsma release ref for rsetId:%" PRIi64 " refId:%d success", rsetId, refId); return TSDB_CODE_SUCCESS; } @@ -313,7 +313,7 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t char *pOutput = NULL; int32_t len = 0; - if (qSerializeTaskStatus(srcTaskInfo, &pOutput, &len) < 0) { + if ((terrno = qSerializeTaskStatus(srcTaskInfo, &pOutput, &len)) < 0) { smaError("vgId:%d, rsma clone, table %" PRIi64 " serialize qTaskInfo failed since %s", TD_VID(pVnode), suid, terrstr()); goto _err; @@ -337,41 +337,34 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t goto _err; } - smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid); + smaDebug("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid); taosMemoryFreeClear(pOutput); return TSDB_CODE_SUCCESS; _err: taosMemoryFreeClear(pOutput); tdFreeQTaskInfo(dstTaskInfo, TD_VID(pVnode), idx + 1); + smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid, + terrstr()); return TSDB_CODE_FAILED; } /** * @brief pTSchema is shared - * - * @param pSma - * @param pDest - * @param pSrc - * @return int32_t + * + * @param pSma + * @param pDest + * @param pSrc + * @return int32_t */ -int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pDest, SRSmaInfo *pSrc) { +int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc) { SVnode *pVnode = pSma->pVnode; SRSmaParam *param = NULL; if (!pSrc) { + *pDest = NULL; return TSDB_CODE_SUCCESS; } - if (!pDest) { - pDest = taosMemoryCalloc(1, sizeof(SRSmaInfo)); - if (!pDest) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; - } - } - - memcpy(pDest, pSrc, sizeof(SRSmaInfo)); - SMetaReader mr = {0}; metaReaderInit(&mr, SMA_META(pSma), 0); smaDebug("vgId:%d, rsma clone, suid is %" PRIi64, TD_VID(pVnode), pSrc->suid); @@ -384,21 +377,22 @@ int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pDest, SRSmaInfo *pSrc) { ASSERT(mr.me.uid == pSrc->suid); if (TABLE_IS_ROLLUP(mr.me.flags)) { param = &mr.me.stbEntry.rsmaParam; - for (int i = 0; i < TSDB_RETENTION_L2; ++i) { - SRSmaInfoItem *pItem = &pSrc->items[i]; - if (pItem->taskInfo) { - tdCloneQTaskInfo(pSma, pDest->items[i].taskInfo, pItem->taskInfo, param, pSrc->suid, i); + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + if (tdCloneQTaskInfo(pSma, pSrc->iTaskInfo[i], pSrc->taskInfo[i], param, pSrc->suid, i) < 0) { + goto _err; } } smaDebug("vgId:%d, rsma clone env success for %" PRIi64, TD_VID(pVnode), pSrc->suid); } metaReaderClear(&mr); + + *pDest = pSrc; // pointer copy + return TSDB_CODE_SUCCESS; _err: + *pDest = NULL; metaReaderClear(&mr); - tdFreeRSmaInfo(pSma, pDest, false); + smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", TD_VID(pVnode), pSrc->suid, terrstr()); return TSDB_CODE_FAILED; -} - -// ... \ No newline at end of file +} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index fbbe5bc695..15cf183b2a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -351,7 +351,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { // TODO: remove the function void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { // TODO - blockDebugShowDataBlocks(data, __func__); tdProcessTSmaInsert(((SVnode *)pVnode)->pSma, smaId, (const char *)data); } @@ -852,6 +851,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq if (metaCreateTable(pVnode->pMeta, version, &createTbReq) < 0) { if (terrno != TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { submitBlkRsp.code = terrno; + pRsp->code = terrno; tDecoderClear(&decoder); taosArrayDestroy(createTbReq.ctb.tagName); goto _exit;