fix(stream): add more logs.

This commit is contained in:
Haojun Liao 2023-06-08 17:54:22 +08:00
parent 6c13491428
commit 6726edae0c
2 changed files with 4 additions and 2 deletions

View File

@ -412,7 +412,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
if (k == 0) {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
void* colData = colDataGetData(pColData, j);
tqDebug("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData);
tqTrace("tq sink pipe2, row %d, col %d ts %" PRId64, j, k, *(int64_t*)colData);
}
if (IS_SET_NULL(pCol)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);

View File

@ -361,11 +361,13 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number};
code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize, 0);
qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code);
qDebug("===stream===put %" PRId64 " to disc, code:%d, size:%d", sKey.key.ts, code, pFileState->rowSize);
}
if (streamStateGetBatchSize(batch) > 0) {
code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
}
streamStateClearBatch(batch);
if (flushState) {