diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 050c6d09dc..141aff0337 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -132,6 +132,7 @@ int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen); int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle); int32_t tqMetaDecodeCheckInfo(STqCheckInfo *info, void *pVal, int32_t vLen); int32_t tqMetaDecodeOffsetInfo(STqOffset *info, void *pVal, int32_t vLen); +int32_t tqMetaSaveOffset(STQ* pTq, STqOffset* pOffset); int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle); int32_t tqMetaGetOffset(STQ* pTq, const char* subkey, STqOffset** pOffset); int32_t tqMetaTransform(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 9e90c1cd59..62c2713fbd 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -102,6 +102,37 @@ int32_t tqMetaDecodeOffsetInfo(STqOffset* info, void* pVal, int32_t vLen) { return code; } +int32_t tqMetaSaveOffset(STQ* pTq, STqOffset* pOffset) { + void* buf = NULL; + int32_t code = TDB_CODE_SUCCESS; + int32_t vlen; + SEncoder encoder; + tEncodeSize(tEncodeSTqOffset, pOffset, vlen, code); + if (code < 0) { + goto END; + } + + + buf = taosMemoryCalloc(1, vlen); + if (buf == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto END; + } + + tEncoderInit(&encoder, buf, vlen); + code = tEncodeSTqOffset(&encoder, pOffset); + if (code < 0) { + goto END; + } + + TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), buf, vlen)); + +END: + tEncoderClear(&encoder); + taosMemoryFree(buf); + return code; +} + int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const void* value, int32_t vLen) { int32_t code = TDB_CODE_SUCCESS; TXN* txn = NULL; diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index 63257d7fab..42b42286fb 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -44,14 +44,15 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) { int64_t ret = 0; int32_t size = 0; + int32_t total = 0; while (1) { if ((ret = taosReadFile(pFile, &size, INT_BYTES)) != INT_BYTES) { if (ret != 0) { code = TSDB_CODE_INVALID_MSG; } - goto END; + break; } - + total += INT_BYTES; size = htonl(size); pMemBuf = taosMemoryCalloc(1, size); if (pMemBuf == NULL) { @@ -64,6 +65,7 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) { goto END; } + total += size; STqOffset offset = {0}; TQ_ERR_GO_TO_END(tqMetaDecodeOffsetInfo(&offset, pMemBuf, size)); code = taosHashPut(pTq->pOffset, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)); @@ -71,13 +73,18 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name) { tDeleteSTqOffset(&offset); goto END; } - TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pOffsetStore, offset.subKey, strlen(offset.subKey), pMemBuf, size)); - tqInfo("tq: offset restore from file to tdb, subkey:%s", offset.subKey); + tqInfo("tq: offset restore from file to tdb, size:%d, hash size:%d subkey:%s", total, taosHashGetSize(pTq->pOffset), offset.subKey); taosMemoryFree(pMemBuf); pMemBuf = NULL; } + void *pIter = NULL; + while ((pIter = taosHashIterate(pTq->pOffset, pIter))) { + STqOffset* pOffset = (STqOffset*)pIter; + tqMetaSaveOffset(pTq, pOffset); + } + END: taosCloseFile(&pFile); taosMemoryFree(pMemBuf);