opti:[TS-4593] transform offset from file to tdb in tmq

This commit is contained in:
wangmm0220 2024-08-05 16:48:45 +08:00
parent 666e813b5f
commit a450e8c1f5
3 changed files with 43 additions and 4 deletions

View File

@ -132,6 +132,7 @@ int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen);
int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle);
int32_t tqMetaDecodeCheckInfo(STqCheckInfo *info, void *pVal, int32_t vLen);
int32_t tqMetaDecodeOffsetInfo(STqOffset *info, void *pVal, int32_t vLen);
int32_t tqMetaSaveOffset(STQ* pTq, STqOffset* pOffset);
int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle);
int32_t tqMetaGetOffset(STQ* pTq, const char* subkey, STqOffset** pOffset);
int32_t tqMetaTransform(STQ* pTq);

View File

@ -102,6 +102,37 @@ int32_t tqMetaDecodeOffsetInfo(STqOffset* info, void* pVal, int32_t vLen) {
return code;
}
int32_t tqMetaSaveOffset(STQ* pTq, STqOffset* pOffset) {
void* buf = NULL;
int32_t code = TDB_CODE_SUCCESS;
int32_t vlen;
SEncoder encoder;
tEncodeSize(tEncodeSTqOffset, pOffset, vlen, code);
if (code < 0) {
goto END;
}
buf = taosMemoryCalloc(1, vlen);
if (buf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto END;
}
tEncoderInit(&encoder, buf, vlen);
code = tEncodeSTqOffset(&encoder, pOffset);
if (code < 0) {
goto END;
}
TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), buf, vlen));
END:
tEncoderClear(&encoder);
taosMemoryFree(buf);
return code;
}
int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const void* value, int32_t vLen) {
int32_t code = TDB_CODE_SUCCESS;
TXN* txn = NULL;

View File

@ -44,14 +44,15 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) {
int64_t ret = 0;
int32_t size = 0;
int32_t total = 0;
while (1) {
if ((ret = taosReadFile(pFile, &size, INT_BYTES)) != INT_BYTES) {
if (ret != 0) {
code = TSDB_CODE_INVALID_MSG;
}
goto END;
break;
}
total += INT_BYTES;
size = htonl(size);
pMemBuf = taosMemoryCalloc(1, size);
if (pMemBuf == NULL) {
@ -64,6 +65,7 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) {
goto END;
}
total += size;
STqOffset offset = {0};
TQ_ERR_GO_TO_END(tqMetaDecodeOffsetInfo(&offset, pMemBuf, size));
code = taosHashPut(pTq->pOffset, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset));
@ -71,13 +73,18 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) {
tDeleteSTqOffset(&offset);
goto END;
}
TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pOffsetStore, offset.subKey, strlen(offset.subKey), pMemBuf, size));
tqInfo("tq: offset restore from file to tdb, subkey:%s", offset.subKey);
tqInfo("tq: offset restore from file to tdb, size:%d, hash size:%d subkey:%s", total, taosHashGetSize(pTq->pOffset), offset.subKey);
taosMemoryFree(pMemBuf);
pMemBuf = NULL;
}
void *pIter = NULL;
while ((pIter = taosHashIterate(pTq->pOffset, pIter))) {
STqOffset* pOffset = (STqOffset*)pIter;
tqMetaSaveOffset(pTq, pOffset);
}
END:
taosCloseFile(&pFile);
taosMemoryFree(pMemBuf);