diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 597d1bd4f2..71a3dc6cc3 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -690,7 +690,7 @@ int stateKeyDecode(void* k, char* buf) { int stateKeyToString(void* k, char* buf) { SStateKey* key = k; int n = 0; - n += sprintf(buf + n, "[groupId:%" PRId64 ",", key->key.groupId); + n += sprintf(buf + n, "[groupId:%" PRIu64 ",", key->key.groupId); n += sprintf(buf + n, "ts:%" PRIi64 ",", key->key.ts); n += sprintf(buf + n, "opNum:%" PRIi64 "]", key->opNum); return n; @@ -1630,6 +1630,12 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK char buf[128] = {0}; int32_t klen = stateKeyEncode((void*)&maxStateKey, buf); + { + char tbuf[256] = {0}; + stateKeyToString((void*)&maxStateKey, tbuf); + qDebug("seek to last:%s", tbuf); + } + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); if (pCur == NULL) return NULL; @@ -2408,14 +2414,18 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb char* ttlV = tmpBuf; int32_t ttlVLen = ginitDict[cfIdx].enValueFunc(val, vlen, ttl, &ttlV); - SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; - + SBackendCfWrapper* wrapper = pState->pTdbState->pBackendCfWrapper; rocksdb_column_family_handle_t* pCf = wrapper->pHandle[ginitDict[cfIdx].idx]; rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); if (tmpBuf == NULL) { taosMemoryFree(ttlV); } + { + char tbuf[256] = {0}; + ginitDict[cfIdx].toStrFunc((void*)key, tbuf); + qDebug("stream state: %s succ to state", tbuf); + } return 0; } int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch) {