From f5d796a081bdcb73eb020cdb2cc0703112a5d8bd Mon Sep 17 00:00:00 2001 From: kailixu Date: Mon, 6 Nov 2023 19:53:09 +0800 Subject: [PATCH] chore: print stream state --- source/dnode/vnode/src/sma/smaRollup.c | 7 ++++- source/libs/stream/src/tstreamFileState.c | 33 +++++++++++++++++++++++ 2 files changed, 39 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 3884b1df7a..ac99dc9de3 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -678,7 +678,12 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma code = terrno ? terrno : TSDB_CODE_RSMA_RESULT; tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE); taosMemoryFree(pReq); - TSDB_CHECK_CODE(code, lino, _exit); + smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIi64 + ", ver:%" PRIi64, + SMA_VID(pSma), __func__, lino, tstrerror(code), suid, pItem->level, output ? output->info.id.uid : -1, + output ? output->info.version : -1); + continue; + // TSDB_CHECK_CODE(code, lino, _exit); } smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64, diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 584e81fafc..a597858e63 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -515,6 +515,17 @@ void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) { taosEncodeFixedI64(&buff, *pKey); } +static void getDebugRowBuff(char* val, int32_t vlen, char* output) { + for (int32_t i = 0; i < vlen; ++i) { + if (*(val + i) == '\0') { + sprintf(output + i, "0"); + } else { + sprintf(output + i, "%c", *(val + i)); + } + } + output[vlen] = 0; +} + int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) { int32_t code = TSDB_CODE_SUCCESS; SListIter iter = {0}; @@ -530,6 +541,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 1; char* buf = taosMemoryCalloc(1, len); + char output[1024]; void* batch = streamStateCreateBatch(); while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) { @@ -546,6 +558,15 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, } void* pSKey = pFileState->stateBuffCreateStateKeyFn(pPos, ((SStreamState*)pFileState->pFileStore)->number); +#if 1 + SStateKey* pStateKey = pSKey; + char* pStateVal = pPos->pRowBuff; + int32_t pStateVLen = pFileState->rowSize; + assert(pStateVLen < 1024); + getDebugRowBuff(pStateVal, pStateVLen, output); + qDebug("%s:%d key:[%" PRIu64 ",%" PRIi64 ",%" PRIi64 "] vlen:%d, val:%s", __func__, __LINE__, pStateKey->key.groupId, + pStateKey->key.ts, pStateKey->opNum, pStateVLen, output); +#endif code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, pSKey, pPos->pRowBuff, pFileState->rowSize, 0, buf); taosMemoryFreeClear(pSKey); @@ -691,6 +712,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { if (pCur == NULL) { return -1; } + char output[1024]; int32_t recoverNum = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount); while (code == TSDB_CODE_SUCCESS) { if (pFileState->curRowCount >= recoverNum) { @@ -710,6 +732,17 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { } ASSERT(vlen == pFileState->rowSize); memcpy(pNewPos->pRowBuff, pVal, vlen); + +#if 1 + SStateKey* pStateKey = pNewPos->pKey; + char* pStateVal = pVal; + int32_t pStateVLen = vlen; + assert(pStateVLen < 1024); + getDebugRowBuff(pStateVal, pStateVLen, output); + qDebug("%s:%d key:[%" PRIu64 ",%" PRIi64 ",%" PRIi64 "] vlen:%d, val:%s", __func__, __LINE__, pStateKey->key.groupId, + pStateKey->key.ts, pStateKey->opNum, pStateVLen, output); +#endif + taosMemoryFreeClear(pVal); pNewPos->beFlushed = true; code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);