diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index f35eec786c..8882dada9a 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -92,7 +92,9 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { streamStateClose(pItem->pStreamState, false); } - taosMemoryFreeClear(pItem->pStreamTask); + if (pItem->pStreamTask) { + tFreeStreamTask(pItem->pStreamTask); + } tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1); } @@ -173,8 +175,8 @@ int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore, bool isAdd) { return TSDB_CODE_FAILED; } - void *pIter = taosHashIterate(pStore->uidHash, NULL); - while (pIter) { + void *pIter = NULL; + while ((pIter = taosHashIterate(pStore->uidHash, pIter))) { tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); SArray *pTbUids = *(SArray **)pIter; @@ -182,8 +184,6 @@ int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore, bool isAdd) { taosHashCancelIterate(pStore->uidHash, pIter); return TSDB_CODE_FAILED; } - - pIter = taosHashIterate(pStore->uidHash, pIter); } return TSDB_CODE_SUCCESS; } @@ -234,11 +234,12 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, int8_t idx) { if ((param->qmsgLen > 0) && param->qmsg[idx]) { - SRetention *pRetention = SMA_RETENTION(pSma); - STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma); - SVnode *pVnode = pSma->pVnode; - char taskInfDir[TSDB_FILENAME_LEN] = {0}; - void *pStreamState = NULL; + SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]); + SRetention *pRetention = SMA_RETENTION(pSma); + STsdbCfg *pTsdbCfg = SMA_TSDB_CFG(pSma); + SVnode *pVnode = pSma->pVnode; + char taskInfDir[TSDB_FILENAME_LEN] = {0}; + void *pStreamState = NULL; // set the backend of stream state tdRSmaQTaskInfoGetFullPath(pVnode, pRSmaInfo->suid, idx + 1, pVnode->pTfs, taskInfDir); @@ -258,32 +259,30 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_FAILED; } + pItem->pStreamTask = pStreamTask; pStreamTask->id.taskId = 0; pStreamTask->id.streamId = pRSmaInfo->suid + idx; pStreamTask->chkInfo.startTs = taosGetTimestampMs(); pStreamTask->pMeta = pVnode->pTq->pStreamMeta; + pStreamTask->exec.qmsg = taosMemoryMalloc(2); + sprintf(pStreamTask->exec.qmsg, "%d", idx); pStreamTask->chkInfo.checkpointId = pTsdbCfg->retentions[idx + 1].checkpointId; pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1); if (!pStreamState) { terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN; - taosMemoryFreeClear(pStreamTask); return TSDB_CODE_FAILED; } + pItem->pStreamState = pStreamState; SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .pStateBackend = pStreamState}; initStorageAPI(&handle.api); - pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode), 0); if (!pRSmaInfo->taskInfo[idx]) { terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; - taosMemoryFreeClear(pStreamTask); return TSDB_CODE_FAILED; } - SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]); pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE; // fetch the data when reboot - pItem->pStreamState = pStreamState; - pItem->pStreamTask = pStreamTask; if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) { int64_t msInterval = convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND); @@ -509,11 +508,10 @@ static void tdUidStoreDestory(STbUidStore *pStore) { if (pStore->uidHash) { if (pStore->tbUids) { // When pStore->tbUids not NULL, the pStore->uidHash has k/v; otherwise pStore->uidHash only has keys. - void *pIter = taosHashIterate(pStore->uidHash, NULL); - while (pIter) { + void *pIter = NULL; + while ((pIter = taosHashIterate(pStore->uidHash, pIter))) { SArray *arr = *(SArray **)pIter; taosArrayDestroy(arr); - pIter = taosHashIterate(pStore->uidHash, pIter); } } taosHashCleanup(pStore->uidHash); @@ -1082,62 +1080,77 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { SSma *pSma = pRSmaStat->pSma; SVnode *pVnode = pSma->pVnode; SArray *pResList = NULL; - SRSmaFS fs = {0}; if (taosHashGetSize(pInfoHash) <= 0) { return TSDB_CODE_SUCCESS; } - void *infoHash = NULL; + // stream state: trigger checkpoint - while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { - SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; - if (RSMA_INFO_IS_DEL(pRSmaInfo)) { - continue; - } - for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { - if (pRSmaInfo->taskInfo[i]) { - code = qSetSMAInput(pRSmaInfo->taskInfo[i], pRSmaStat->blocks, 1, STREAM_INPUT__CHECKPOINT); - TSDB_CHECK_CODE(code, lino, _exit); - pRSmaInfo->items[i].streamFlushed = 0; - ++nTaskInfo; - } - } - } - // stream state: process checkpoint response in async mode - int32_t nStreamFlushed = 0; - int32_t nSleep = 0; - while (true) { + do { + void *infoHash = NULL; while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; if (RSMA_INFO_IS_DEL(pRSmaInfo)) { continue; } for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { - if (pRSmaInfo->taskInfo[i] && (0 == pRSmaInfo->items[i].streamFlushed)) { - int8_t streamFlushed = 0; - code = tdRSmaExecAndSubmitResult(pSma, pRSmaInfo->taskInfo[i], &pRSmaInfo->items[i], pRSmaInfo->pTSchema, - pRSmaInfo->suid, &pResList, &streamFlushed); - TSDB_CHECK_CODE(code, lino, _exit); - if (streamFlushed && (++nStreamFlushed >= nTaskInfo)) { - smaInfo("vgId:%d checkpoint ready, %d us consumed, received/total: %d/%d", TD_VID(pVnode), nSleep * 10, - nStreamFlushed, nTaskInfo); - goto _checkpoint; + if (pRSmaInfo->taskInfo[i]) { + code = qSetSMAInput(pRSmaInfo->taskInfo[i], pRSmaStat->blocks, 1, STREAM_INPUT__CHECKPOINT); + if (code) { + taosHashCancelIterate(pInfoHash, infoHash); + TSDB_CHECK_CODE(code, lino, _exit); } + pRSmaInfo->items[i].streamFlushed = 0; + ++nTaskInfo; } } } - taosUsleep(10); - ++nSleep; - smaDebug("vgId:%d, wait for checkpoint ready, %d us elapsed, received/total: %d/%d", TD_VID(pVnode), nSleep * 10, - nStreamFlushed, nTaskInfo); - } - // stream state: build checkpoint in backend + } while (0); + + // stream state: wait checkpoint ready in async mode + do { + int32_t nStreamFlushed = 0; + int32_t nSleep = 0; + void *infoHash = NULL; + while (true) { + while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; + if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + continue; + } + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + if (pRSmaInfo->taskInfo[i] && (0 == pRSmaInfo->items[i].streamFlushed)) { + int8_t streamFlushed = 0; + code = tdRSmaExecAndSubmitResult(pSma, pRSmaInfo->taskInfo[i], &pRSmaInfo->items[i], pRSmaInfo->pTSchema, + pRSmaInfo->suid, &pResList, &streamFlushed); + if (code) { + taosHashCancelIterate(pInfoHash, infoHash); + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (streamFlushed && (++nStreamFlushed >= nTaskInfo)) { + smaInfo("vgId:%d checkpoint ready, %d us consumed, received/total: %d/%d", TD_VID(pVnode), nSleep * 10, + nStreamFlushed, nTaskInfo); + taosHashCancelIterate(pInfoHash, infoHash); + goto _checkpoint; + } + } + } + } + taosUsleep(10); + ++nSleep; + smaDebug("vgId:%d, wait for checkpoint ready, %d us elapsed, received/total: %d/%d", TD_VID(pVnode), nSleep * 10, + nStreamFlushed, nTaskInfo); + } + } while (0); _checkpoint: + // stream state: build checkpoint in backend do { - void *infHash = NULL; - while ((infHash = taosHashIterate(pInfoHash, infHash))) { - SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infHash; + void *infoHash = NULL; + + while ((infoHash = taosHashIterate(pInfoHash, infoHash))) { + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; if (RSMA_INFO_IS_DEL(pRSmaInfo)) { continue; } @@ -1150,7 +1163,26 @@ _checkpoint: pTask->checkpointingId = taosGetTimestampNs(); pTask->chkInfo.checkpointId = pTask->checkpointingId; code = streamTaskBuildCheckpoint(pTask); - TSDB_CHECK_CODE(code, lino, _exit); + if (code) { + taosHashCancelIterate(pInfoHash, infoHash); + TSDB_CHECK_CODE(code, lino, _exit); + } + + taosWLockLatch(&pTask->pMeta->lock); + if (streamMetaSaveTask(pTask->pMeta, pTask) != 0) { + taosWUnLockLatch(&pTask->pMeta->lock); + code = TSDB_CODE_OUT_OF_MEMORY; + taosHashCancelIterate(pInfoHash, infoHash); + TSDB_CHECK_CODE(code, lino, _exit); + } + + if (streamMetaCommit(pTask->pMeta) != 0) { + taosWUnLockLatch(&pTask->pMeta->lock); + code = TSDB_CODE_OUT_OF_MEMORY; + taosHashCancelIterate(pInfoHash, infoHash); + TSDB_CHECK_CODE(code, lino, _exit); + } + taosWUnLockLatch(&pTask->pMeta->lock); // save checkpointId to vnode.json (pVnode->config.tsdbCfg.retentions + i + 1)->checkpointId = pTask->checkpointingId; @@ -1158,6 +1190,8 @@ _checkpoint: smaInfo("vgId:%d, commit task:%p, build stream checkpoint success, table:%" PRIi64 ", level:%d, checkpointId:%" PRIi64, TD_VID(pVnode), pTask, pRSmaInfo->suid, i + 1, pTask->checkpointingId); + + } } } @@ -1452,6 +1486,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { if (ASSERTS(oldVal >= 0, "oldVal of nFetchAll: %d < 0", oldVal)) { code = TSDB_CODE_APP_ERROR; + taosHashCancelIterate(infoHash, pIter); TSDB_CHECK_CODE(code, lino, _exit); }