diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index e87b356be7..198c93a937 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -137,10 +137,10 @@ struct SSmaStat { #define RSMA_FS_LOCK(r) (&(r)->lock) struct SRSmaInfoItem { - int8_t level : 4; - int8_t fetchLevel : 4; + int8_t level; + int8_t fetchLevel; int8_t triggerStat; - uint16_t nScanned; + uint32_t nScanned; int32_t streamFlushed : 1; int32_t maxDelay : 31; // ms tmr_h tmrId; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index ada7c9d6b7..3884b1df7a 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -331,7 +331,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId); - smaInfo("vgId:%d, open task:%p table:%" PRIi64 " level:%" PRIi8 ", checkpointId:%" PRIi64 ", maxdelay:%" PRIi64 + smaInfo("vgId:%d, open rsma task:%p table:%" PRIi64 " level:%" PRIi8 ", checkpointId:%" PRIi64 ", maxdelay:%" PRIi64 " watermark:%" PRIi64 ", finally maxdelay:%" PRIi32, TD_VID(pVnode), pItem->pStreamTask, pRSmaInfo->suid, (int8_t)(idx + 1), pStreamTask->chkInfo.checkpointId, param->maxdelay[idx], param->watermark[idx], pItem->maxDelay); @@ -1161,7 +1161,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { if (streamFlushed) { pRSmaInfo->items[i].streamFlushed = 1; if (++nStreamFlushed >= nTaskInfo) { - smaInfo("vgId:%d rsma commit, checkpoint ready, %d us consumed, received/total: %d/%d", TD_VID(pVnode), + smaInfo("vgId:%d, rsma commit, checkpoint ready, %d us consumed, received/total: %d/%d", TD_VID(pVnode), nSleep * 10, nStreamFlushed, nTaskInfo); taosHashCancelIterate(pInfoHash, infoHash); goto _checkpoint; @@ -1292,20 +1292,14 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { } int8_t fetchTriggerStat = - atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); + atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_ACTIVE); switch (fetchTriggerStat) { case TASK_TRIGGER_STAT_ACTIVE: { smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active", SMA_VID(pSma), pItem->level, pRSmaInfo->suid); // async process - pItem->fetchLevel = pItem->level; -#if 0 - // debugging codes - SRSmaInfo *qInfo = tdAcquireRSmaInfoBySuid(pSma, pRSmaInfo->suid); - SRSmaInfoItem *qItem = RSMA_INFO_ITEM(qInfo, pItem->level - 1); - make sure(qItem->level == pItem->level); - make sure(qItem->fetchLevel == pItem->fetchLevel); -#endif + atomic_store_8(&pItem->fetchLevel, 1); + if (atomic_load_8(&pRSmaInfo->assigned) == 0) { tsem_post(&(pStat->notEmpty)); } @@ -1351,13 +1345,14 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { SArray *pResList = NULL; for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) { SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1); - if (pItem->fetchLevel) { - pItem->fetchLevel = 0; + + if (1 == atomic_val_compare_exchange_8(&pItem->fetchLevel, 1, 0)) { qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, i - 1); if (!taskInfo) { continue; } +#if 0 if ((++pItem->nScanned * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) { smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi16 " maxDelay:%d, fetch executed", SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay); @@ -1375,6 +1370,7 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { } pItem->nScanned = 0; +#endif if ((terrno = qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { goto _err; @@ -1509,12 +1505,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { TSDB_CHECK_CODE(code, lino, _exit); } - int8_t curStat = atomic_load_8(RSMA_COMMIT_STAT(pRSmaStat)); - if (curStat == 1) { - smaDebug("vgId:%d, fetch all not exec as commit stat is %" PRIi8, SMA_VID(pSma), curStat); - } else { - tdRSmaFetchAllResult(pSma, pInfo); - } + tdRSmaFetchAllResult(pSma, pInfo); if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) { atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index cc77474e79..69b19f4bc5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -661,6 +661,9 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit; pTbData->minKey = TMIN(pTbData->minKey, key.ts); lRow = tRow; + tsdbDebug("vgId:%d, %s, insert col row[%d] with ts:%" PRIi64 ", ver:%" PRIi64 ", uid:%" PRIi64, + TD_VID(pMemTable->pTsdb->pVnode), pMemTable->pTsdb->path, tRow.iRow, tRow.pTSRow->ts, tRow.version, + pSubmitTbData->uid); // remain row ++tRow.iRow; @@ -680,6 +683,9 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, lRow = tRow; ++tRow.iRow; + tsdbDebug("vgId:%d, %s, insert col row[%d] with ts:%" PRIi64 ", ver:%" PRIi64 ", uid:%" PRIi64, + TD_VID(pMemTable->pTsdb->pVnode), pMemTable->pTsdb->path, tRow.iRow, tRow.pTSRow->ts, tRow.version, + pSubmitTbData->uid); } } @@ -721,6 +727,9 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0); if (code) goto _exit; lRow = tRow; + tsdbDebug("vgId:%d, %s, insert row[%d] with ts:%" PRIi64 ", ver:%" PRIi64 ", uid:%" PRIi64, + TD_VID(pMemTable->pTsdb->pVnode), pMemTable->pTsdb->path, iRow, tRow.pTSRow->ts, tRow.version, + pSubmitTbData->uid); pTbData->minKey = TMIN(pTbData->minKey, key.ts); @@ -744,6 +753,9 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, lRow = tRow; iRow++; + tsdbDebug("vgId:%d, %s, insert row[%d] with ts:%" PRIi64 ", ver:%" PRIi64 ", uid:%" PRIi64, + TD_VID(pMemTable->pTsdb->pVnode), pMemTable->pTsdb->path, iRow, tRow.pTSRow->ts, tRow.version, + pSubmitTbData->uid); } }