feat(wal): ref

This commit is contained in:
Liu Jicong 2022-07-25 15:31:36 +08:00
parent 5b654779ba
commit 222e925644
3 changed files with 12 additions and 5 deletions

View File

@ -385,8 +385,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} }
if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType != TOPIC_SUB_TYPE__COLUMN) {
int64_t fetchVer = fetchOffsetNew.version + 1; int64_t fetchVer = fetchOffsetNew.version + 1;
SWalCkHead* pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) { if (pCkHead == NULL) {
code = -1; code = -1;
goto OVER; goto OVER;

View File

@ -52,7 +52,7 @@ int32_t tqMetaOpen(STQ* pTq) {
ASSERT(0); ASSERT(0);
} }
TXN txn; TXN txn = {0};
if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) { if (tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
ASSERT(0); ASSERT(0);
@ -75,7 +75,13 @@ int32_t tqMetaOpen(STQ* pTq) {
STqHandle handle; STqHandle handle;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle); tDecodeSTqHandle(&decoder, &handle);
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
handle.pRef = walOpenRef(pTq->pVnode->pWal);
if (handle.pRef == NULL) {
ASSERT(0);
}
walRefVer(handle.pRef, handle.snapshotVer);
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SReadHandle reader = { SReadHandle reader = {
.meta = pTq->pVnode->pMeta, .meta = pTq->pVnode->pMeta,
@ -94,6 +100,7 @@ int32_t tqMetaOpen(STQ* pTq) {
handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner); handle.execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
ASSERT(handle.execHandle.pExecReader); ASSERT(handle.execHandle.pExecReader);
} else { } else {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
handle.execHandle.execDb.pFilterOutTbUid = handle.execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
} }

View File

@ -54,7 +54,7 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
} }
/*if (pReader->cond.enableRef) {*/ /*if (pReader->cond.enableRef) {*/
/*taosHashPut(pWal->pRefHash, &pReader->readerId, sizeof(int64_t), &pReader, sizeof(void *));*/ /* taosHashPut(pWal->pRefHash, &pReader->readerId, sizeof(int64_t), &pReader, sizeof(void *));*/
/*}*/ /*}*/
return pReader; return pReader;