Merge branch 'enh/TD-23769-3.0x' of github.com:taosdata/TDengine into enh/TD-23769-3.0x

This commit is contained in:
kailixu 2023-11-07 15:46:33 +08:00
commit 5f4aa0c445
3 changed files with 9 additions and 26 deletions

View File

@ -168,7 +168,6 @@ typedef struct {
struct SStreamFileState *pFileState; struct SStreamFileState *pFileState;
int32_t number; int32_t number;
SSHashObj *parNameMap; SSHashObj *parNameMap;
int64_t checkPointId;
int32_t taskId; int32_t taskId;
int64_t streamId; int64_t streamId;
int64_t streamBackendRid; int64_t streamBackendRid;

View File

@ -221,7 +221,6 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
} }
pState->pTdbState->pOwner = pTask; pState->pTdbState->pOwner = pTask;
pState->checkPointId = 0;
return pState; return pState;
@ -274,7 +273,6 @@ int32_t streamStateCommit(SStreamState* pState) {
SStreamSnapshot* pShot = getSnapshot(pState->pFileState); SStreamSnapshot* pShot = getSnapshot(pState->pFileState);
flushSnapshot(pState->pFileState, pShot, true); flushSnapshot(pState->pFileState, pShot, true);
} }
pState->checkPointId++;
return 0; return 0;
#else #else
if (tdbCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) { if (tdbCommit(pState->pTdbState->db, pState->pTdbState->txn) < 0) {
@ -288,7 +286,6 @@ int32_t streamStateCommit(SStreamState* pState) {
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1; return -1;
} }
pState->checkPointId++;
return 0; return 0;
#endif #endif
} }

View File

@ -28,6 +28,7 @@
#define MIN_NUM_OF_ROW_BUFF 10240 #define MIN_NUM_OF_ROW_BUFF 10240
#define TASK_KEY "streamFileState" #define TASK_KEY "streamFileState"
#define STREAM_STATE_INFO_NAME "StreamStateCheckPoint"
struct SStreamFileState { struct SStreamFileState {
SList* usedBuffs; SList* usedBuffs;
@ -192,15 +193,13 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
recoverSesssion(pFileState, checkpointId); recoverSesssion(pFileState, checkpointId);
} }
char keyBuf[128] = {0};
void* valBuf = NULL; void* valBuf = NULL;
int32_t len = 0; int32_t len = 0;
sprintf(keyBuf, "%s:%" PRId64 "", TASK_KEY, ((SStreamState*)pFileState->pFileStore)->checkPointId); int32_t code = streamDefaultGet_rocksdb(pFileState->pFileStore, STREAM_STATE_INFO_NAME, &valBuf, &len);
int32_t code = streamDefaultGet_rocksdb(pFileState->pFileStore, keyBuf, &valBuf, &len);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
ASSERT(len == sizeof(TSKEY)); ASSERT(len == sizeof(TSKEY));
streamFileStateDecode(&pFileState->flushMark, valBuf, len); streamFileStateDecode(&pFileState->flushMark, valBuf, len);
qDebug("===stream===flushMark read:%" PRId64 ",checkpointid:%" PRId64, pFileState->flushMark, ((SStreamState*)pFileState->pFileStore)->checkPointId); qDebug("===stream===flushMark read:%" PRId64, pFileState->flushMark);
} }
return pFileState; return pFileState;
@ -600,24 +599,12 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
pFileState->id, numOfElems, BATCH_LIMIT, elapsed); pFileState->id, numOfElems, BATCH_LIMIT, elapsed);
if (flushState) { if (flushState) {
{ void* valBuf = NULL;
char keyBuf[128] = {0}; int32_t len = 0;
void* valBuf = NULL; streamFileStateEncode(&pFileState->flushMark, &valBuf, &len);
int32_t len = 0; qDebug("===stream===flushMark write:%" PRId64, pFileState->flushMark);
sprintf(keyBuf, "%s:%" PRId64 "", TASK_KEY, ((SStreamState*)pFileState->pFileStore)->checkPointId); streamStatePutBatch(pFileState->pFileStore, "default", batch, STREAM_STATE_INFO_NAME, valBuf, len, 0);
streamFileStateEncode(&pFileState->flushMark, &valBuf, &len); taosMemoryFree(valBuf);
qDebug("===stream===flushMark write:%" PRId64 ",checkpoint id:%" PRId64, pFileState->flushMark, ((SStreamState*)pFileState->pFileStore)->checkPointId);
streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0);
taosMemoryFree(valBuf);
}
{
char keyBuf[128] = {0};
char valBuf[64] = {0};
int32_t len = 0;
memcpy(keyBuf, TASK_KEY, strlen(TASK_KEY));
len = sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId);
code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0);
}
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
} }