chore: print stream state
This commit is contained in:
parent
2f4c732cfb
commit
f5d796a081
|
@ -678,7 +678,12 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
|
||||||
code = terrno ? terrno : TSDB_CODE_RSMA_RESULT;
|
code = terrno ? terrno : TSDB_CODE_RSMA_RESULT;
|
||||||
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
|
tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE);
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIi64
|
||||||
|
", ver:%" PRIi64,
|
||||||
|
SMA_VID(pSma), __func__, lino, tstrerror(code), suid, pItem->level, output ? output->info.id.uid : -1,
|
||||||
|
output ? output->info.version : -1);
|
||||||
|
continue;
|
||||||
|
// TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64,
|
smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64,
|
||||||
|
|
|
@ -515,6 +515,17 @@ void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
|
||||||
taosEncodeFixedI64(&buff, *pKey);
|
taosEncodeFixedI64(&buff, *pKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void getDebugRowBuff(char* val, int32_t vlen, char* output) {
|
||||||
|
for (int32_t i = 0; i < vlen; ++i) {
|
||||||
|
if (*(val + i) == '\0') {
|
||||||
|
sprintf(output + i, "0");
|
||||||
|
} else {
|
||||||
|
sprintf(output + i, "%c", *(val + i));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
output[vlen] = 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) {
|
int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SListIter iter = {0};
|
SListIter iter = {0};
|
||||||
|
@ -530,6 +541,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
|
||||||
|
|
||||||
int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 1;
|
int32_t len = pFileState->rowSize + sizeof(uint64_t) + sizeof(int32_t) + 1;
|
||||||
char* buf = taosMemoryCalloc(1, len);
|
char* buf = taosMemoryCalloc(1, len);
|
||||||
|
char output[1024];
|
||||||
|
|
||||||
void* batch = streamStateCreateBatch();
|
void* batch = streamStateCreateBatch();
|
||||||
while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
|
while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) {
|
||||||
|
@ -546,6 +558,15 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
|
||||||
}
|
}
|
||||||
|
|
||||||
void* pSKey = pFileState->stateBuffCreateStateKeyFn(pPos, ((SStreamState*)pFileState->pFileStore)->number);
|
void* pSKey = pFileState->stateBuffCreateStateKeyFn(pPos, ((SStreamState*)pFileState->pFileStore)->number);
|
||||||
|
#if 1
|
||||||
|
SStateKey* pStateKey = pSKey;
|
||||||
|
char* pStateVal = pPos->pRowBuff;
|
||||||
|
int32_t pStateVLen = pFileState->rowSize;
|
||||||
|
assert(pStateVLen < 1024);
|
||||||
|
getDebugRowBuff(pStateVal, pStateVLen, output);
|
||||||
|
qDebug("%s:%d key:[%" PRIu64 ",%" PRIi64 ",%" PRIi64 "] vlen:%d, val:%s", __func__, __LINE__, pStateKey->key.groupId,
|
||||||
|
pStateKey->key.ts, pStateKey->opNum, pStateVLen, output);
|
||||||
|
#endif
|
||||||
code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, pSKey, pPos->pRowBuff, pFileState->rowSize,
|
code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, pSKey, pPos->pRowBuff, pFileState->rowSize,
|
||||||
0, buf);
|
0, buf);
|
||||||
taosMemoryFreeClear(pSKey);
|
taosMemoryFreeClear(pSKey);
|
||||||
|
@ -691,6 +712,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
|
||||||
if (pCur == NULL) {
|
if (pCur == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
char output[1024];
|
||||||
int32_t recoverNum = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount);
|
int32_t recoverNum = TMIN(MIN_NUM_OF_ROW_BUFF, pFileState->maxRowCount);
|
||||||
while (code == TSDB_CODE_SUCCESS) {
|
while (code == TSDB_CODE_SUCCESS) {
|
||||||
if (pFileState->curRowCount >= recoverNum) {
|
if (pFileState->curRowCount >= recoverNum) {
|
||||||
|
@ -710,6 +732,17 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
|
||||||
}
|
}
|
||||||
ASSERT(vlen == pFileState->rowSize);
|
ASSERT(vlen == pFileState->rowSize);
|
||||||
memcpy(pNewPos->pRowBuff, pVal, vlen);
|
memcpy(pNewPos->pRowBuff, pVal, vlen);
|
||||||
|
|
||||||
|
#if 1
|
||||||
|
SStateKey* pStateKey = pNewPos->pKey;
|
||||||
|
char* pStateVal = pVal;
|
||||||
|
int32_t pStateVLen = vlen;
|
||||||
|
assert(pStateVLen < 1024);
|
||||||
|
getDebugRowBuff(pStateVal, pStateVLen, output);
|
||||||
|
qDebug("%s:%d key:[%" PRIu64 ",%" PRIi64 ",%" PRIi64 "] vlen:%d, val:%s", __func__, __LINE__, pStateKey->key.groupId,
|
||||||
|
pStateKey->key.ts, pStateKey->opNum, pStateVLen, output);
|
||||||
|
#endif
|
||||||
|
|
||||||
taosMemoryFreeClear(pVal);
|
taosMemoryFreeClear(pVal);
|
||||||
pNewPos->beFlushed = true;
|
pNewPos->beFlushed = true;
|
||||||
code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
|
code = tSimpleHashPut(pFileState->rowStateBuff, pNewPos->pKey, pFileState->keyLen, &pNewPos, POINTER_BYTES);
|
||||||
|
|
Loading…
Reference in New Issue