recover flush mark

This commit is contained in:
liuyao 2023-11-07 14:11:08 +08:00
parent f5d796a081
commit e972ab16fe
1 changed files with 30 additions and 19 deletions

View File

@ -27,6 +27,8 @@
#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024) #define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024)
#define MIN_NUM_OF_ROW_BUFF 10240 #define MIN_NUM_OF_ROW_BUFF 10240
#define TASK_KEY "streamFileState"
struct SStreamFileState { struct SStreamFileState {
SList* usedBuffs; SList* usedBuffs;
SList* freeBuffs; SList* freeBuffs;
@ -113,6 +115,15 @@ void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) {
return pStateKey; return pStateKey;
} }
static void streamFileStateDecode(TSKEY* pKey, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, pKey); }
static void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
*pLen = sizeof(TSKEY);
(*pVal) = taosMemoryCalloc(1, *pLen);
void* buff = *pVal;
taosEncodeFixedI64(&buff, *pKey);
}
SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize,
GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId, GetTsFun fp, void* pFile, TSKEY delMark, const char* taskId, int64_t checkpointId,
int8_t type) { int8_t type) {
@ -181,6 +192,17 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_
recoverSesssion(pFileState, checkpointId); recoverSesssion(pFileState, checkpointId);
} }
char keyBuf[128] = {0};
void* valBuf = NULL;
int32_t len = 0;
sprintf(keyBuf, "%s:%" PRId64 "", TASK_KEY, ((SStreamState*)pFileState->pFileStore)->checkPointId);
int32_t code = streamDefaultGet_rocksdb(pFileState->pFileStore, keyBuf, &valBuf, &len);
if (code == TSDB_CODE_SUCCESS) {
ASSERT(len == sizeof(TSKEY));
streamFileStateDecode(&pFileState->flushMark, valBuf, len);
qDebug("===stream===flushMark read:%" PRId64 ",checkpointid:%" PRId64, pFileState->flushMark, ((SStreamState*)pFileState->pFileStore)->checkPointId);
}
return pFileState; return pFileState;
_error: _error:
@ -506,15 +528,6 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) {
return pFileState->usedBuffs; return pFileState->usedBuffs;
} }
void streamFileStateDecode(TSKEY* pKey, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, pKey); }
void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) {
*pLen = sizeof(TSKEY);
(*pVal) = taosMemoryCalloc(1, *pLen);
void* buff = *pVal;
taosEncodeFixedI64(&buff, *pKey);
}
static void getDebugRowBuff(char* val, int32_t vlen, char* output) { static void getDebugRowBuff(char* val, int32_t vlen, char* output) {
for (int32_t i = 0; i < vlen; ++i) { for (int32_t i = 0; i < vlen; ++i) {
if (*(val + i) == '\0') { if (*(val + i) == '\0') {
@ -550,6 +563,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
continue; continue;
} }
pPos->beFlushed = true; pPos->beFlushed = true;
pFileState->flushMark = TMAX(pFileState->flushMark, pFileState->getTs(pPos->pKey));
qDebug("===stream===flushed start:%" PRId64, pFileState->getTs(pPos->pKey)); qDebug("===stream===flushed start:%" PRId64, pFileState->getTs(pPos->pKey));
if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) { if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
@ -586,13 +600,13 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
pFileState->id, numOfElems, BATCH_LIMIT, elapsed); pFileState->id, numOfElems, BATCH_LIMIT, elapsed);
if (flushState) { if (flushState) {
const char* taskKey = "streamFileState";
{ {
char keyBuf[128] = {0}; char keyBuf[128] = {0};
void* valBuf = NULL; void* valBuf = NULL;
int32_t len = 0; int32_t len = 0;
sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId); sprintf(keyBuf, "%s:%" PRId64 "", TASK_KEY, ((SStreamState*)pFileState->pFileStore)->checkPointId);
streamFileStateEncode(&pFileState->flushMark, &valBuf, &len); streamFileStateEncode(&pFileState->flushMark, &valBuf, &len);
qDebug("===stream===flushMark write:%" PRId64 ",checkpoint id:%" PRId64, pFileState->flushMark, ((SStreamState*)pFileState->pFileStore)->checkPointId);
streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0); streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0);
taosMemoryFree(valBuf); taosMemoryFree(valBuf);
} }
@ -600,7 +614,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
char keyBuf[128] = {0}; char keyBuf[128] = {0};
char valBuf[64] = {0}; char valBuf[64] = {0};
int32_t len = 0; int32_t len = 0;
memcpy(keyBuf, taskKey, strlen(taskKey)); memcpy(keyBuf, TASK_KEY, strlen(TASK_KEY));
len = sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId); len = sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId);
code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0); code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len, 0);
} }
@ -612,26 +626,23 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
} }
int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) { int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) {
const char* taskKey = "streamFileState";
char keyBuf[128] = {0}; char keyBuf[128] = {0};
sprintf(keyBuf, "%s:%" PRId64 "", taskKey, checkpointId); sprintf(keyBuf, "%s:%" PRId64 "", TASK_KEY, checkpointId);
return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf); return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf);
} }
int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) { int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) {
const char* taskKey = "streamFileState"; return streamDefaultIterGet_rocksdb(pFileState->pFileStore, TASK_KEY, NULL, list);
return streamDefaultIterGet_rocksdb(pFileState->pFileStore, taskKey, NULL, list);
} }
int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
const char* taskKey = "streamFileState";
int64_t maxCheckPointId = 0; int64_t maxCheckPointId = 0;
{ {
char buf[128] = {0}; char buf[128] = {0};
void* val = NULL; void* val = NULL;
int32_t len = 0; int32_t len = 0;
memcpy(buf, taskKey, strlen(taskKey)); memcpy(buf, TASK_KEY, strlen(TASK_KEY));
code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len); code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
if (code != 0 || len == 0 || val == NULL) { if (code != 0 || len == 0 || val == NULL) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
@ -645,7 +656,7 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) {
char buf[128] = {0}; char buf[128] = {0};
void* val = 0; void* val = 0;
int32_t len = 0; int32_t len = 0;
sprintf(buf, "%s:%" PRId64 "", taskKey, i); sprintf(buf, "%s:%" PRId64 "", TASK_KEY, i);
code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len); code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len);
if (code != 0) { if (code != 0) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;