factor code

This commit is contained in:
yihaoDeng 2023-05-11 13:34:17 +00:00
parent 6cf1adf1b1
commit eb3ab4fcea
4 changed files with 10 additions and 18 deletions

View File

@ -110,14 +110,6 @@ int32_t streamStateGetParName_rocksdb(SStreamState* pState, int64_t groupId, voi
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove); void streamStateDestroy_rocksdb(SStreamState* pState, bool remove);
void* streamStateCreateBatch();
int32_t streamStateGetBatchSize(void* pBatch);
void streamStateClearBatch(void* pBatch);
void streamStateDestroyBatch(void* pBatch);
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen);
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
// default cf // default cf
int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen); int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen);
int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen); int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen);
@ -136,7 +128,7 @@ int32_t streamStateGetBatchSize(void* pBatch);
void streamStateClearBatch(void* pBatch); void streamStateClearBatch(void* pBatch);
void streamStateDestroyBatch(void* pBatch); void streamStateDestroyBatch(void* pBatch);
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen); void* val, int32_t vlen, int64_t ttl);
int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch);
// int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); // int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result);
#endif #endif

View File

@ -1979,7 +1979,7 @@ int32_t streamStateGetBatchSize(void* pBatch) {
void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); } void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_writebatch_t*)pBatch); }
void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key, int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen) { void* val, int32_t vlen, int64_t ttl) {
int i = streamGetInit(cfName); int i = streamGetInit(cfName);
if (i < 0) { if (i < 0) {
@ -1990,7 +1990,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_wr
int32_t klen = ginitDict[i].enFunc((void*)key, buf); int32_t klen = ginitDict[i].enFunc((void*)key, buf);
char* ttlV = NULL; char* ttlV = NULL;
int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, 0, &ttlV); int32_t ttlVLen = ginitDict[i].enValueFunc(val, vlen, ttl, &ttlV);
rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx]; rocksdb_column_family_handle_t* pCf = pState->pTdbState->pHandle[ginitDict[i].idx];
rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen); rocksdb_writebatch_put_cf((rocksdb_writebatch_t*)pBatch, pCf, buf, (size_t)klen, ttlV, (size_t)ttlVLen);
taosMemoryFree(ttlV); taosMemoryFree(ttlV);

View File

@ -119,7 +119,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
pState->taskId = pTask->id.taskId; pState->taskId = pTask->id.taskId;
pState->streamId = pTask->id.streamId; pState->streamId = pTask->id.streamId;
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
qWarn("open stream state1"); // qWarn("open stream state1");
taosAcquireRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid); taosAcquireRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
int code = streamStateOpenBackend(pTask->pMeta->streamBackend, pState); int code = streamStateOpenBackend(pTask->pMeta->streamBackend, pState);
if (code == -1) { if (code == -1) {
@ -127,7 +127,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
taosMemoryFree(pState); taosMemoryFree(pState);
pState = NULL; pState = NULL;
} }
qWarn("open stream state2, %s", statePath); // qWarn("open stream state2, %s", statePath);
pState->pTdbState->pOwner = pTask; pState->pTdbState->pOwner = pTask;
pState->pFileState = NULL; pState->pFileState = NULL;
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT);
@ -225,6 +225,7 @@ void streamStateClose(SStreamState* pState, bool remove) {
#ifdef USE_ROCKSDB #ifdef USE_ROCKSDB
// streamStateCloseBackend(pState); // streamStateCloseBackend(pState);
streamStateDestroy(pState, remove); streamStateDestroy(pState, remove);
taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
#else #else
tdbCommit(pState->pTdbState->db, pState->pTdbState->txn); tdbCommit(pState->pTdbState->db, pState->pTdbState->txn);
tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn); tdbPostCommit(pState->pTdbState->db, pState->pTdbState->txn);
@ -236,7 +237,6 @@ void streamStateClose(SStreamState* pState, bool remove) {
tdbTbClose(pState->pTdbState->pParTagDb); tdbTbClose(pState->pTdbState->pParTagDb);
tdbClose(pState->pTdbState->db); tdbClose(pState->pTdbState->db);
#endif #endif
taosReleaseRef(pTask->pMeta->streamBackendId, pTask->pMeta->streamBackendRid);
} }
int32_t streamStateBegin(SStreamState* pState) { int32_t streamStateBegin(SStreamState* pState) {
@ -404,7 +404,7 @@ int32_t streamStateSaveInfo(SStreamState* pState, void* pKey, int32_t keyLen, vo
int32_t code = 0; int32_t code = 0;
void* batch = streamStateCreateBatch(); void* batch = streamStateCreateBatch();
code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen); code = streamStatePutBatch(pState, "default", batch, pKey, pVal, vLen, 0);
if (code != 0) { if (code != 0) {
return code; return code;
} }

View File

@ -355,7 +355,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
} }
SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number}; SStateKey sKey = {.key = *((SWinKey*)pPos->pKey), .opNum = ((SStreamState*)pFileState->pFileStore)->number};
code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize); code = streamStatePutBatch(pFileState->pFileStore, "state", batch, &sKey, pPos->pRowBuff, pFileState->rowSize, 0);
qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code); qDebug("===stream===put %" PRId64 " to disc, res %d", sKey.key.ts, code);
} }
if (streamStateGetBatchSize(batch) > 0) { if (streamStateGetBatchSize(batch) > 0) {
@ -371,7 +371,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
int32_t len = 0; int32_t len = 0;
sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId); sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId);
streamFileStateEncode(&pFileState->flushMark, &valBuf, &len); streamFileStateEncode(&pFileState->flushMark, &valBuf, &len);
code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len); code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0);
taosMemoryFree(valBuf); taosMemoryFree(valBuf);
} }
{ {
@ -380,7 +380,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
int32_t len = 0; int32_t len = 0;
memcpy(keyBuf, taskKey, strlen(taskKey)); memcpy(keyBuf, taskKey, strlen(taskKey));
len = sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId); len = sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId);
code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len); code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0);
} }
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
} }