chore: add debug info

This commit is contained in:
kailixu 2023-11-06 17:27:36 +08:00
parent 7e8c123fca
commit dc5284a19c
3 changed files with 25 additions and 22 deletions

View File

@ -137,10 +137,10 @@ struct SSmaStat {
#define RSMA_FS_LOCK(r) (&(r)->lock) #define RSMA_FS_LOCK(r) (&(r)->lock)
struct SRSmaInfoItem { struct SRSmaInfoItem {
int8_t level : 4; int8_t level;
int8_t fetchLevel : 4; int8_t fetchLevel;
int8_t triggerStat; int8_t triggerStat;
uint16_t nScanned; uint32_t nScanned;
int32_t streamFlushed : 1; int32_t streamFlushed : 1;
int32_t maxDelay : 31; // ms int32_t maxDelay : 31; // ms
tmr_h tmrId; tmr_h tmrId;

View File

@ -331,7 +331,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId); taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId);
smaInfo("vgId:%d, open task:%p table:%" PRIi64 " level:%" PRIi8 ", checkpointId:%" PRIi64 ", maxdelay:%" PRIi64 smaInfo("vgId:%d, open rsma task:%p table:%" PRIi64 " level:%" PRIi8 ", checkpointId:%" PRIi64 ", maxdelay:%" PRIi64
" watermark:%" PRIi64 ", finally maxdelay:%" PRIi32, " watermark:%" PRIi64 ", finally maxdelay:%" PRIi32,
TD_VID(pVnode), pItem->pStreamTask, pRSmaInfo->suid, (int8_t)(idx + 1), pStreamTask->chkInfo.checkpointId, TD_VID(pVnode), pItem->pStreamTask, pRSmaInfo->suid, (int8_t)(idx + 1), pStreamTask->chkInfo.checkpointId,
param->maxdelay[idx], param->watermark[idx], pItem->maxDelay); param->maxdelay[idx], param->watermark[idx], pItem->maxDelay);
@ -1161,7 +1161,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
if (streamFlushed) { if (streamFlushed) {
pRSmaInfo->items[i].streamFlushed = 1; pRSmaInfo->items[i].streamFlushed = 1;
if (++nStreamFlushed >= nTaskInfo) { if (++nStreamFlushed >= nTaskInfo) {
smaInfo("vgId:%d rsma commit, checkpoint ready, %d us consumed, received/total: %d/%d", TD_VID(pVnode), smaInfo("vgId:%d, rsma commit, checkpoint ready, %d us consumed, received/total: %d/%d", TD_VID(pVnode),
nSleep * 10, nStreamFlushed, nTaskInfo); nSleep * 10, nStreamFlushed, nTaskInfo);
taosHashCancelIterate(pInfoHash, infoHash); taosHashCancelIterate(pInfoHash, infoHash);
goto _checkpoint; goto _checkpoint;
@ -1292,20 +1292,14 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) {
} }
int8_t fetchTriggerStat = int8_t fetchTriggerStat =
atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_ACTIVE);
switch (fetchTriggerStat) { switch (fetchTriggerStat) {
case TASK_TRIGGER_STAT_ACTIVE: { case TASK_TRIGGER_STAT_ACTIVE: {
smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active", smaDebug("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64 " since stat is active",
SMA_VID(pSma), pItem->level, pRSmaInfo->suid); SMA_VID(pSma), pItem->level, pRSmaInfo->suid);
// async process // async process
pItem->fetchLevel = pItem->level; atomic_store_8(&pItem->fetchLevel, 1);
#if 0
// debugging codes
SRSmaInfo *qInfo = tdAcquireRSmaInfoBySuid(pSma, pRSmaInfo->suid);
SRSmaInfoItem *qItem = RSMA_INFO_ITEM(qInfo, pItem->level - 1);
make sure(qItem->level == pItem->level);
make sure(qItem->fetchLevel == pItem->fetchLevel);
#endif
if (atomic_load_8(&pRSmaInfo->assigned) == 0) { if (atomic_load_8(&pRSmaInfo->assigned) == 0) {
tsem_post(&(pStat->notEmpty)); tsem_post(&(pStat->notEmpty));
} }
@ -1351,13 +1345,14 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
SArray *pResList = NULL; SArray *pResList = NULL;
for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) { for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1); SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1);
if (pItem->fetchLevel) {
pItem->fetchLevel = 0; if (1 == atomic_val_compare_exchange_8(&pItem->fetchLevel, 1, 0)) {
qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, i - 1); qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, i - 1);
if (!taskInfo) { if (!taskInfo) {
continue; continue;
} }
#if 0
if ((++pItem->nScanned * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) { if ((++pItem->nScanned * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) {
smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi16 " maxDelay:%d, fetch executed", smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi16 " maxDelay:%d, fetch executed",
SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay); SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay);
@ -1375,6 +1370,7 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) {
} }
pItem->nScanned = 0; pItem->nScanned = 0;
#endif
if ((terrno = qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { if ((terrno = qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
goto _err; goto _err;
@ -1509,12 +1505,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
int8_t curStat = atomic_load_8(RSMA_COMMIT_STAT(pRSmaStat)); tdRSmaFetchAllResult(pSma, pInfo);
if (curStat == 1) {
smaDebug("vgId:%d, fetch all not exec as commit stat is %" PRIi8, SMA_VID(pSma), curStat);
} else {
tdRSmaFetchAllResult(pSma, pInfo);
}
if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) { if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) {
atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0);

View File

@ -661,6 +661,9 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit; if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit;
pTbData->minKey = TMIN(pTbData->minKey, key.ts); pTbData->minKey = TMIN(pTbData->minKey, key.ts);
lRow = tRow; lRow = tRow;
tsdbDebug("vgId:%d, %s, insert col row[%d] with ts:%" PRIi64 ", ver:%" PRIi64 ", uid:%" PRIi64,
TD_VID(pMemTable->pTsdb->pVnode), pMemTable->pTsdb->path, tRow.iRow, tRow.pTSRow->ts, tRow.version,
pSubmitTbData->uid);
// remain row // remain row
++tRow.iRow; ++tRow.iRow;
@ -680,6 +683,9 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData,
lRow = tRow; lRow = tRow;
++tRow.iRow; ++tRow.iRow;
tsdbDebug("vgId:%d, %s, insert col row[%d] with ts:%" PRIi64 ", ver:%" PRIi64 ", uid:%" PRIi64,
TD_VID(pMemTable->pTsdb->pVnode), pMemTable->pTsdb->path, tRow.iRow, tRow.pTSRow->ts, tRow.version,
pSubmitTbData->uid);
} }
} }
@ -721,6 +727,9 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0); code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0);
if (code) goto _exit; if (code) goto _exit;
lRow = tRow; lRow = tRow;
tsdbDebug("vgId:%d, %s, insert row[%d] with ts:%" PRIi64 ", ver:%" PRIi64 ", uid:%" PRIi64,
TD_VID(pMemTable->pTsdb->pVnode), pMemTable->pTsdb->path, iRow, tRow.pTSRow->ts, tRow.version,
pSubmitTbData->uid);
pTbData->minKey = TMIN(pTbData->minKey, key.ts); pTbData->minKey = TMIN(pTbData->minKey, key.ts);
@ -744,6 +753,9 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData,
lRow = tRow; lRow = tRow;
iRow++; iRow++;
tsdbDebug("vgId:%d, %s, insert row[%d] with ts:%" PRIi64 ", ver:%" PRIi64 ", uid:%" PRIi64,
TD_VID(pMemTable->pTsdb->pVnode), pMemTable->pTsdb->path, iRow, tRow.pTSRow->ts, tRow.version,
pSubmitTbData->uid);
} }
} }