diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 7b63056897..2c6e5f5ca1 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -351,7 +351,11 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { nLoops = 0; } } - smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void*)taosGetSelfPthreadId()); + smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) { + return TSDB_CODE_FAILED; + } + smaInfo("vgId:%d, rsma commit, operator state commited, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); #if 0 // consuming task of qTaskInfo clone // step 4: swap queue/qall and iQueue/iQall @@ -391,13 +395,14 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { if (!pSmaEnv) { return TSDB_CODE_SUCCESS; } - +#if 0 SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); // perform persist task for qTaskInfo operator if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) { return TSDB_CODE_FAILED; } +#endif return TSDB_CODE_SUCCESS; } @@ -421,8 +426,8 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { // lock // taosWLockLatch(SMA_ENV_LOCK(pEnv)); - void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL); - while (pIter) { + void *pIter = NULL; + while ((pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter))) { tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; if (RSMA_INFO_IS_DEL(pRSmaInfo)) { @@ -440,14 +445,13 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { SMA_VID(pSma), refVal, *pSuid); } - pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter); continue; } - +#if 0 if (pRSmaInfo->taskInfo[0]) { if (pRSmaInfo->iTaskInfo[0]) { SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pRSmaInfo->iTaskInfo[0]; - tdFreeRSmaInfo(pSma, pRSmaInfo, true); + tdFreeRSmaInfo(pSma, pRSmaInfo, false); pRSmaInfo->iTaskInfo[0] = NULL; } } else { @@ -456,8 +460,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter)); smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid); - - pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter); +#endif } for (int32_t i = 0; i < taosArrayGetSize(rsmaDeleted); ++i) { diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 049c2fef9d..6414f822c0 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -19,9 +19,9 @@ #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid #define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt #define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt -#define RSMA_FETCH_DELAY_MAX (1800000) // ms -#define RSMA_FETCH_SKIP_MAX (1000) // cnt -#define RSMA_FETCH_ACTIVE_MAX (1800) // ms +#define RSMA_FETCH_DELAY_MAX (180000) // ms +#define RSMA_FETCH_SKIP_MAX (10) // cnt +#define RSMA_FETCH_ACTIVE_MAX (180) // ms SSmaMgmt smaMgmt = { .inited = 0, @@ -163,8 +163,8 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { pInfo->iQall = NULL; } - taosMemoryFree(pInfo); - } + taosMemoryFree(pInfo); + } return NULL; } @@ -968,13 +968,12 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { goto _err; } - void *pIter = taosHashIterate(uidStore.uidHash, NULL); - while (pIter) { + void *pIter = NULL; + while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) { tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); if (tdExecuteRSmaAsync(pSma, pMsg, inputType, *pTbSuid) < 0) { goto _err; } - pIter = taosHashIterate(uidStore.uidHash, pIter); } if (tdRSmaExecCheck(pSma) < 0) { @@ -1418,7 +1417,10 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { } for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { +#if 0 qTaskInfo_t taskInfo = RSMA_INFO_IQTASK(pRSmaInfo, i); +#endif + qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pRSmaInfo, i); if (!taskInfo) { smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1); continue; @@ -1644,24 +1646,27 @@ static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubm } int64_t curMs = taosGetTimestampMs(); - if ((pItem->nSkipped > RSMA_FETCH_SKIP_MAX) || (pItem->nSkipped * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) { - pItem->nSkipped = 0; - smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch executed", - SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay); - } else { - if (((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX)) { - ++pItem->nSkipped; - smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ", - SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); - continue; - } else { - smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ", - SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); - } - } + // if ((pItem->nSkipped > RSMA_FETCH_SKIP_MAX) || (pItem->nSkipped * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) { + // pItem->nSkipped = 0; + // smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch executed", + // SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay); + // } else { + // if (((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX)) { + // ++pItem->nSkipped; + // smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ", + // SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + // continue; + // } else { + // smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ", + // SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + // } + // } pItem->lastFetch = curMs; + // smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ", + // SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { goto _err; } @@ -1817,25 +1822,16 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { ASSERT(0); } - smaInfo("prop:vgId:%d loop end check", SMA_VID(pSma)); if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) { if (pVnode->inClose) { - smaInfo("prop:vgId:%d loop end check - inClose and break", SMA_VID(pSma)); break; } - smaInfo("prop:vgId:%d loop end check - wait for notEmpty", SMA_VID(pSma)); tsem_wait(&pRSmaStat->notEmpty); - smaInfo("prop:vgId:%d loop end check - received notEmpty", SMA_VID(pSma)); if (pVnode->inClose && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) { smaInfo("prop:vgId:%d loop end check - break - inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma), pVnode->inClose, atomic_load_64(&pRSmaStat->nBufItems)); break; - } else { - smaInfo("prop:vgId:%d loop end check - continue - inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma), - pVnode->inClose, atomic_load_64(&pRSmaStat->nBufItems)); } - } else { - smaInfo("prop:vgId:%d loop end check - continue to run", SMA_VID(pSma)); } } // end of while(true) diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index da70222485..d771797963 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -375,6 +375,9 @@ int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { if (TABLE_IS_ROLLUP(mr.me.flags)) { param = &mr.me.stbEntry.rsmaParam; for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + if (!pInfo->iTaskInfo[i]) { + continue; + } if (tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i) < 0) { goto _err; }