opti:[TS-4593] transform offset from file to tdb in tmq

This commit is contained in:
wangmm0220 2024-07-18 14:32:04 +08:00
parent 18ed8e2904
commit 1d8791a106
7 changed files with 40 additions and 30 deletions

View File

@ -3612,6 +3612,7 @@ typedef struct {
int32_t tEncodeSTqOffset(SEncoder* pEncoder, const STqOffset* pOffset);
int32_t tDecodeSTqOffset(SDecoder* pDecoder, STqOffset* pOffset);
void tDeleteSTqOffset(void* val);
typedef struct SMqVgOffset {
int64_t consumerId;

View File

@ -691,7 +691,7 @@ static void* monitorThreadFunc(void* param) {
tscDebug("monitorThreadFunc start");
int64_t quitTime = 0;
while (1) {
if (atomic_load_32(&slowLogFlag) > 0 > 0) {
if (atomic_load_32(&slowLogFlag) > 0) {
if (quitCnt == 0) {
monitorSendAllSlowLogAtQuit();
if (quitCnt == 0) {

View File

@ -9258,6 +9258,12 @@ void tOffsetDestroy(void *param) {
taosMemoryFreeClear(pVal->primaryKey.pData);
}
}
void tDeleteSTqOffset(void *param) {
STqOffset *pVal = (STqOffset *)param;
tOffsetDestroy(&pVal->val);
}
int32_t tEncodeSTqOffset(SEncoder *pEncoder, const STqOffset *pOffset) {
if (tEncodeSTqOffsetVal(pEncoder, &pOffset->val) < 0) return -1;
if (tEncodeCStr(pEncoder, pOffset->subKey) < 0) return -1;

View File

@ -131,6 +131,7 @@ int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const
int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen);
int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle);
int32_t tqMetaDecodeCheckInfo(STqCheckInfo *info, void *pVal, int32_t vLen);
int32_t tqMetaDecodeOffsetInfo(STqOffset *info, void *pVal, int32_t vLen);
int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle);
void* tqMetaGetOffset(STQ* pTq, const char* subkey);
int32_t tqMetaTransform(STQ* pTq);
@ -141,7 +142,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data);
// tqOffset
int32_t tqBuildFName(char** data, const char* path, char* name);
int32_t tqOffsetRestoreFromFile(SHashObj* pOffset, char* name);
int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name);
// tq util
int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type);

View File

@ -80,6 +80,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
pTq->pOffset = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_ENTRY_LOCK);
taosHashSetFreeFp(pTq->pOffset, (FDelete)tDeleteSTqOffset);
int32_t code = tqInitialize(pTq);
if (code != TSDB_CODE_SUCCESS) {
@ -228,7 +229,7 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
return -1;
}
if (tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset)) < 0) {
if (tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), msg, msgLen - sizeof(vgOffset.consumerId)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}

View File

@ -87,6 +87,18 @@ int32_t tqMetaDecodeCheckInfo(STqCheckInfo *info, void *pVal, int32_t vLen){
return code;
}
int32_t tqMetaDecodeOffsetInfo(STqOffset *info, void *pVal, int32_t vLen){
SDecoder decoder = {0};
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
int32_t code = tDecodeSTqOffset(&decoder, info);
if (code != 0) {
tDeleteSTqOffset(info);
return TSDB_CODE_OUT_OF_MEMORY;
}
tDecoderClear(&decoder);
return code;
}
int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const void* value, int32_t vLen) {
int32_t code = TDB_CODE_SUCCESS;
TXN* txn = NULL;
@ -113,21 +125,9 @@ int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen) {
static int32_t tqMetaTransformOffsetInfo(STQ* pTq, char* path) {
int32_t code = TDB_CODE_SUCCESS;
void* pIter = NULL;
TQ_ERR_RETURN(tqOffsetRestoreFromFile(pTq->pOffset, path));
while (1) {
pIter = taosHashIterate(pTq->pOffset, pIter);
if (pIter == NULL) {
break;
}
STqOffset* offset = (STqOffset*)pIter;
TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pOffsetStore, offset->subKey, strlen(offset->subKey), offset, sizeof(STqOffset)));
}
TQ_ERR_RETURN(tqOffsetRestoreFromFile(pTq, path));
END:
taosHashCancelIterate(pTq->pOffset, pIter);
return code;
}
@ -140,7 +140,14 @@ void* tqMetaGetOffset(STQ* pTq, const char* subkey){
return NULL;
}
if(taosHashPut(pTq->pOffset, subkey, strlen(subkey), data, sizeof(STqOffset)) != 0){
STqOffset offset = {0};
if (tqMetaDecodeOffsetInfo(&offset, data, vLen) != TDB_CODE_SUCCESS) {
tdbFree(data);
return NULL;
}
if(taosHashPut(pTq->pOffset, subkey, strlen(subkey), &offset, sizeof(STqOffset)) != 0){
tDeleteSTqOffset(&offset);
tdbFree(data);
return NULL;
}

View File

@ -32,10 +32,9 @@ int32_t tqBuildFName(char** data, const char* path, char* name) {
return TDB_CODE_SUCCESS;
}
int32_t tqOffsetRestoreFromFile(SHashObj* pOffset, char* name) {
int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) {
int32_t code = TDB_CODE_SUCCESS;
void* pMemBuf = NULL;
SDecoder decoder = {0};
TdFilePtr pFile = taosOpenFile(name, TD_FILE_READ);
if (pFile == NULL) {
@ -65,19 +64,15 @@ int32_t tqOffsetRestoreFromFile(SHashObj* pOffset, char* name) {
goto END;
}
STqOffset offset;
tDecoderInit(&decoder, pMemBuf, size);
if (tDecodeSTqOffset(&decoder, &offset) < 0) {
code = TSDB_CODE_INVALID_MSG;
STqOffset offset = {0};
TQ_ERR_GO_TO_END(tqMetaDecodeOffsetInfo(&offset, pMemBuf, size));
code = taosHashPut(pTq->pOffset, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset));
if (code != TDB_CODE_SUCCESS) {
tDeleteSTqOffset(&offset);
goto END;
}
TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pOffsetStore, offset.subKey, strlen(offset.subKey), pMemBuf, size));
if (taosHashPut(pOffset, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto END;
}
tDecoderClear(&decoder);
taosMemoryFree(pMemBuf);
pMemBuf = NULL;
}
@ -85,7 +80,6 @@ int32_t tqOffsetRestoreFromFile(SHashObj* pOffset, char* name) {
END:
taosCloseFile(&pFile);
taosMemoryFree(pMemBuf);
tDecoderClear(&decoder);
return code;
}