opti:[TS-4593] transform offset from file to tdb in tmq

This commit is contained in:
wangmm0220 2024-07-15 09:48:04 +08:00
parent 5957b2c19a
commit e78a75d183
16 changed files with 506 additions and 1018 deletions

View File

@ -9297,12 +9297,12 @@ int32_t tEncodeSTqCheckInfo(SEncoder *pEncoder, const STqCheckInfo *pInfo) {
int32_t tDecodeSTqCheckInfo(SDecoder *pDecoder, STqCheckInfo *pInfo) {
if (tDecodeCStrTo(pDecoder, pInfo->topic) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->ntbUid) < 0) return -1;
int32_t sz;
int32_t sz = 0;
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
pInfo->colIdList = taosArrayInit(sz, sizeof(int16_t));
if (pInfo->colIdList == NULL) return -1;
for (int32_t i = 0; i < sz; i++) {
int16_t colId;
int16_t colId = 0;
if (tDecodeI16(pDecoder, &colId) < 0) return -1;
taosArrayPush(pInfo->colIdList, &colId);
}

View File

@ -725,19 +725,19 @@ static int32_t mndDropCheckInfoByTopic(SMnode *pMnode, STrans *pTrans, SMqTopicO
continue;
}
buf = taosMemoryCalloc(1, sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN);
buf = taosMemoryCalloc(1, sizeof(SMsgHead) + sizeof(pTopic->ntbUid));
if (buf == NULL){
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);
memcpy(abuf, pTopic->name, TSDB_TOPIC_FNAME_LEN);
*(int64_t*)abuf = pTopic->ntbUid;
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = buf;
action.contLen = sizeof(SMsgHead) + TSDB_TOPIC_FNAME_LEN;
action.contLen = sizeof(SMsgHead) + sizeof(pTopic->ntbUid);
action.msgType = TDMT_VND_TMQ_DEL_CHECKINFO;
code = mndTransAppendRedoAction(pTrans, &action);
if (code != 0) {

View File

@ -68,11 +68,8 @@ set(
"src/tq/tqOffset.c"
"src/tq/tqPush.c"
"src/tq/tqSink.c"
"src/tq/tqCommit.c"
"src/tq/tqStreamTask.c"
"src/tq/tqHandleSnapshot.c"
"src/tq/tqCheckInfoSnapshot.c"
"src/tq/tqOffsetSnapshot.c"
"src/tq/tqSnapshot.c"
"src/tq/tqStreamStateSnap.c"
"src/tq/tqStreamTaskSnap.c"

View File

@ -41,8 +41,6 @@ extern "C" {
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
typedef struct STqOffsetStore STqOffsetStore;
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
// tqExec
@ -101,10 +99,11 @@ struct STQ {
SHashObj* pPushMgr; // subKey -> STqHandle
SHashObj* pHandle; // subKey -> STqHandle
SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
STqOffsetStore* pOffsetStore;
SHashObj* pOffset; // subKey -> STqOffsetVal
TDB* pMetaDB;
TTB* pExecStore;
TTB* pCheckStore;
TTB* pOffsetStore;
SStreamMeta* pStreamMeta;
};
@ -128,29 +127,22 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId);
int32_t tqMetaOpen(STQ* pTq);
int32_t tqMetaClose(STQ* pTq);
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle);
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key);
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen);
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key);
int32_t tqMetaRestoreCheckInfo(STQ* pTq);
int32_t tqMetaGetHandle(STQ* pTq, const char* key);
int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle);
STqOffsetStore* tqOffsetOpen(STQ* pTq);
int32_t tqMetaTransform(STQ* pTq);
void tqOffsetClose(STqOffsetStore*);
STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey);
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset);
int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey);
int32_t tqOffsetCommitFile(STqOffsetStore* pStore);
int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const void* value, int32_t vLen);
int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen);
int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle);
int32_t tqMetaDecodeCheckInfo(STqCheckInfo *info, void *pVal, int32_t vLen);
int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle);
void* tqMetaGetCheckInfo(STQ* pTq, int64_t tbUid);
void* tqMetaGetOffset(STQ* pTq, const char* subkey);
int32_t tqMetaTransform(STQ* pTq);
// tqSink
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr, bool newSubTableRule);
void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data);
// tqOffset
char* tqOffsetBuildFName(const char* path, int32_t fVer);
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
int32_t tqBuildFName(char** data, const char* path, char* name);
int32_t tqOffsetRestoreFromFile(SHashObj* pOffset, char* name);
// tq util
int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type);
@ -166,6 +158,25 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c
SVCreateTbReq* buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols,
SSDataBlock* pDataBlock, SArray* pTagArray, bool newSubTableRule);
#define TQ_ERR_GO_TO_END(c) \
do { \
code = c; \
if (code != TSDB_CODE_SUCCESS) { \
goto END; \
} \
} while (0)
#define TQ_ERR_RETURN(c) \
do { \
code = c; \
if (code != TSDB_CODE_SUCCESS) { \
return code; \
} \
} while (0)
#define TQ_SUBSCRIBE_NAME "subscribe"
#define TQ_OFFSET_NAME "offset-ver0"
#ifdef __cplusplus
}
#endif

View File

@ -71,10 +71,6 @@ typedef struct STsdbSnapRAWReader STsdbSnapRAWReader;
typedef struct STsdbSnapRAWWriter STsdbSnapRAWWriter;
typedef struct STqSnapReader STqSnapReader;
typedef struct STqSnapWriter STqSnapWriter;
typedef struct STqOffsetReader STqOffsetReader;
typedef struct STqOffsetWriter STqOffsetWriter;
typedef struct STqCheckInfoReader STqCheckInfoReader;
typedef struct STqCheckInfoWriter STqCheckInfoWriter;
typedef struct SStreamTaskReader SStreamTaskReader;
typedef struct SStreamTaskWriter SStreamTaskWriter;
typedef struct SStreamStateReader SStreamStateReader;
@ -268,7 +264,6 @@ int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq);
int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
// tq-mq
@ -351,29 +346,17 @@ int32_t tsdbSnapRAWWrite(STsdbSnapRAWWriter* pWriter, SSnapDataHdr* pHdr);
int32_t tsdbSnapRAWWriterPrepareClose(STsdbSnapRAWWriter* pWriter);
int32_t tsdbSnapRAWWriterClose(STsdbSnapRAWWriter** ppWriter, int8_t rollback);
// STqSnapshotReader ==
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader);
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, int8_t type, STqSnapReader** ppReader);
int32_t tqSnapReaderClose(STqSnapReader** ppReader);
int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData);
// STqSnapshotWriter ======================================
int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** ppWriter);
int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback);
int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
// STqCheckInfoshotReader ==
int32_t tqCheckInfoReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoReader** ppReader);
int32_t tqCheckInfoReaderClose(STqCheckInfoReader** ppReader);
int32_t tqCheckInfoRead(STqCheckInfoReader* pReader, uint8_t** ppData);
// STqCheckInfoshotWriter ======================================
int32_t tqCheckInfoWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoWriter** ppWriter);
int32_t tqCheckInfoWriterClose(STqCheckInfoWriter** ppWriter, int8_t rollback);
int32_t tqCheckInfoWrite(STqCheckInfoWriter* pWriter, uint8_t* pData, uint32_t nData);
// STqOffsetReader ========================================
int32_t tqOffsetReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetReader** ppReader);
int32_t tqOffsetReaderClose(STqOffsetReader** ppReader);
int32_t tqOffsetSnapRead(STqOffsetReader* pReader, uint8_t** ppData);
// STqOffsetWriter ========================================
int32_t tqOffsetWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetWriter** ppWriter);
int32_t tqOffsetWriterClose(STqOffsetWriter** ppWriter, int8_t rollback);
int32_t tqOffsetSnapWrite(STqOffsetWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t tqSnapHandleWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t tqSnapCheckInfoWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t tqSnapOffsetWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
// SStreamTaskWriter ======================================
int32_t streamTaskSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskReader** ppReader);

View File

@ -76,9 +76,11 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
taosInitRWLatch(&pTq->lock);
pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
pTq->pCheckInfo = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
pTq->pOffset = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_ENTRY_LOCK);
int32_t code = tqInitialize(pTq);
if (code != TSDB_CODE_SUCCESS) {
tqClose(pTq);
@ -98,18 +100,10 @@ int32_t tqInitialize(STQ* pTq) {
streamMetaLoadAllTasks(pTq->pStreamMeta);
if (tqMetaTransform(pTq) < 0) {
if (tqMetaOpen(pTq) < 0) {
return -1;
}
if (tqMetaRestoreCheckInfo(pTq) < 0) {
return -1;
}
pTq->pOffsetStore = tqOffsetOpen(pTq);
if (pTq->pOffsetStore == NULL) {
return -1;
}
return 0;
}
@ -133,10 +127,10 @@ void tqClose(STQ* pTq) {
pIter = taosHashIterate(pTq->pPushMgr, pIter);
}
tqOffsetClose(pTq->pOffsetStore);
taosHashCleanup(pTq->pHandle);
taosHashCleanup(pTq->pPushMgr);
taosHashCleanup(pTq->pCheckInfo);
taosHashCleanup(pTq->pOffset);
taosMemoryFree(pTq->path);
tqMetaClose(pTq);
@ -221,7 +215,7 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
goto end;
}
STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
STqOffset* pSavedOffset = (STqOffset*)tqMetaGetOffset(pTq, pOffset->subKey);
if (pSavedOffset != NULL && tqOffsetEqual(pOffset, pSavedOffset)) {
tqInfo("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
@ -229,10 +223,14 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
}
// save the new offset value
code = tqOffsetWrite(pTq->pOffsetStore, pOffset);
if(code != 0) {
code = TSDB_CODE_INVALID_MSG;
goto end;
if (taosHashPut(pTq->pOffset, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset))){
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (tqMetaSaveInfo(pTq, pTq->pOffsetStore, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
@ -284,25 +282,16 @@ end:
}
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
void* pIter = NULL;
STqCheckInfo* pCheck = tqMetaGetCheckInfo(pTq, tbUid);
if(pCheck == NULL) {
return 0;
}
while (1) {
pIter = taosHashIterate(pTq->pCheckInfo, pIter);
if (pIter == NULL) {
break;
}
STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
if (pCheck->ntbUid == tbUid) {
int32_t sz = taosArrayGetSize(pCheck->colIdList);
for (int32_t i = 0; i < sz; i++) {
int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i);
if (forbidColId == colId) {
taosHashCancelIterate(pTq->pCheckInfo, pIter);
return -1;
}
}
int32_t sz = taosArrayGetSize(pCheck->colIdList);
for (int32_t i = 0; i < sz; i++) {
int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i);
if (forbidColId == colId) {
return -1;
}
}
@ -360,21 +349,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
while (1) {
taosWLockLatch(&pTq->lock);
// 1. find handle
pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) {
do {
if (tqMetaGetHandle(pTq, req.subKey) == 0) {
pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle != NULL) {
break;
}
}
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
terrno = TSDB_CODE_INVALID_MSG;
taosWUnLockLatch(&pTq->lock);
code = -1;
goto END;
} while (0);
code = tqMetaGetHandle(pTq, req.subKey, &pHandle);
if (code != TDB_CODE_SUCCESS) {
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
terrno = TSDB_CODE_INVALID_MSG;
taosWUnLockLatch(&pTq->lock);
return -1;
}
// 2. check rebalance status
@ -443,7 +423,7 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
tDecoderClear(&decoder);
STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, vgOffset.offset.subKey);
STqOffset* pSavedOffset = (STqOffset*)tqMetaGetOffset(pTq, vgOffset.offset.subKey);
if (pSavedOffset == NULL) {
terrno = TSDB_CODE_TMQ_NO_COMMITTED;
return terrno;
@ -523,7 +503,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
if (reqOffset.type == TMQ_OFFSET__LOG) {
dataRsp.common.rspOffset.version = reqOffset.version;
} else if (reqOffset.type < 0) {
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
STqOffset* pOffset = (STqOffset*)(STqOffset*)tqMetaGetOffset(pTq, req.subKey);
if (pOffset != NULL) {
if (pOffset->val.type != TMQ_OFFSET__LOG) {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s, no valid wal info", consumerId, vgId, req.subKey);
@ -590,12 +570,14 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
}
taosWLockLatch(&pTq->lock);
code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey);
if (code != 0) {
tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey);
if (taosHashRemove(pTq->pOffset, pReq->subKey, strlen(pReq->subKey) != 0)) {
tqError("cannot process tq delete req %s, since no such offset in hash", pReq->subKey);
}
if (tqMetaDeleteInfo(pTq, pTq->pOffsetStore, pReq->subKey, strlen(pReq->subKey)) != 0) {
tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
}
if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) {
if (tqMetaDeleteInfo(pTq, pTq->pExecStore, pReq->subKey, strlen(pReq->subKey)) < 0) {
tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey);
}
taosWUnLockLatch(&pTq->lock);
@ -603,20 +585,39 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
return 0;
}
void mergeTwoArray(SArray* a, SArray* b){
size_t bLen = taosArrayGetSize(b);
for(int i = 0; i < bLen; i++){
void* dataB = taosArrayGet(b, i);
size_t aLen = taosArrayGetSize(a);
int j = 0;
for(; j < aLen; j++){
void* dataA = taosArrayGet(a, i);
if(*(int64_t*)dataA == *(int64_t*)dataB){
break;
}
}
if(j == aLen){
taosArrayPush(a, dataB);
}
}
}
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
STqCheckInfo info = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
if(tqMetaDecodeCheckInfo(&info, msg, msgLen) != 0){
return -1;
}
tDecoderClear(&decoder);
if (taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)) < 0) {
STqCheckInfo *old = tqMetaGetCheckInfo(pTq, info.ntbUid);
if (old != NULL){
mergeTwoArray(info.colIdList, old->colIdList);
}
if (taosHashPut(pTq->pCheckInfo, &info.ntbUid, sizeof(info.ntbUid), &info, sizeof(STqCheckInfo)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tDeleteSTqCheckInfo(&info);
return -1;
}
if (tqMetaSaveCheckInfo(pTq, info.topic, msg, msgLen) < 0) {
if (tqMetaSaveInfo(pTq, pTq->pCheckStore, &info.ntbUid, sizeof(info.ntbUid), msg, msgLen) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
@ -628,7 +629,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (tqMetaDeleteCheckInfo(pTq, msg) < 0) {
if (tqMetaDeleteInfo(pTq, pTq->pCheckStore, msg, msgLen) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
@ -652,19 +653,10 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
tqInfo("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
req.oldConsumerId, req.newConsumerId);
taosRLockLatch(&pTq->lock);
STqHandle* pHandle = NULL;
while (1) {
pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle) {
break;
}
taosRLockLatch(&pTq->lock);
ret = tqMetaGetHandle(pTq, req.subKey);
taosRUnLockLatch(&pTq->lock);
if (ret < 0) {
break;
}
}
ret = tqMetaGetHandle(pTq, req.subKey, &pHandle);
taosRUnLockLatch(&pTq->lock);
if (pHandle == NULL) {
if (req.oldConsumerId != -1) {
tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
@ -675,7 +667,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
goto end;
}
STqHandle handle = {0};
ret = tqCreateHandle(pTq, &req, &handle);
ret = tqMetaCreateHandle(pTq, &req, &handle);
if (ret < 0) {
tqDestroyTqHandle(&handle);
goto end;

View File

@ -1,196 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "meta.h"
#include "tdbInt.h"
#include "tq.h"
// STqCheckInfoReader ========================================
struct STqCheckInfoReader {
STQ* pTq;
int64_t sver;
int64_t ever;
TBC* pCur;
};
int32_t tqCheckInfoReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoReader** ppReader) {
int32_t code = 0;
STqCheckInfoReader* pReader = NULL;
// alloc
pReader = (STqCheckInfoReader*)taosMemoryCalloc(1, sizeof(STqCheckInfoReader));
if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pReader->pTq = pTq;
pReader->sver = sver;
pReader->ever = ever;
// impl
code = tdbTbcOpen(pTq->pCheckStore, &pReader->pCur, NULL);
if (code) {
taosMemoryFree(pReader);
goto _err;
}
code = tdbTbcMoveToFirst(pReader->pCur);
if (code) {
taosMemoryFree(pReader);
goto _err;
}
tqInfo("vgId:%d, vnode checkinfo tq reader opened", TD_VID(pTq->pVnode));
*ppReader = pReader;
return code;
_err:
tqError("vgId:%d, vnode checkinfo tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
*ppReader = NULL;
return code;
}
int32_t tqCheckInfoReaderClose(STqCheckInfoReader** ppReader) {
int32_t code = 0;
tdbTbcClose((*ppReader)->pCur);
taosMemoryFree(*ppReader);
*ppReader = NULL;
return code;
}
int32_t tqCheckInfoRead(STqCheckInfoReader* pReader, uint8_t** ppData) {
int32_t code = 0;
void* pKey = NULL;
void* pVal = NULL;
int32_t kLen = 0;
int32_t vLen = 0;
if (tdbTbcNext(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) {
goto _exit;
}
*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = SNAP_DATA_TQ_CHECKINFO;
pHdr->size = vLen;
memcpy(pHdr->data, pVal, vLen);
_exit:
tdbFree(pKey);
tdbFree(pVal);
tqInfo("vgId:%d, vnode check info tq read data, vLen:%d", TD_VID(pReader->pTq->pVnode), vLen);
return code;
_err:
tdbFree(pKey);
tdbFree(pVal);
tqError("vgId:%d, vnode check info tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code));
return code;
}
// STqCheckInfoWriter ========================================
struct STqCheckInfoWriter {
STQ* pTq;
int64_t sver;
int64_t ever;
TXN* txn;
};
int32_t tqCheckInfoWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqCheckInfoWriter** ppWriter) {
int32_t code = 0;
STqCheckInfoWriter* pWriter;
// alloc
pWriter = (STqCheckInfoWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pWriter->pTq = pTq;
pWriter->sver = sver;
pWriter->ever = ever;
if (tdbBegin(pTq->pMetaDB, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
code = -1;
taosMemoryFree(pWriter);
goto _err;
}
*ppWriter = pWriter;
return code;
_err:
tqError("vgId:%d, tq check info writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
*ppWriter = NULL;
return code;
}
int32_t tqCheckInfoWriterClose(STqCheckInfoWriter** ppWriter, int8_t rollback) {
int32_t code = 0;
STqCheckInfoWriter* pWriter = *ppWriter;
STQ* pTq = pWriter->pTq;
if (rollback) {
tdbAbort(pWriter->pTq->pMetaDB, pWriter->txn);
} else {
code = tdbCommit(pWriter->pTq->pMetaDB, pWriter->txn);
if (code) goto _err;
code = tdbPostCommit(pWriter->pTq->pMetaDB, pWriter->txn);
if (code) goto _err;
}
taosMemoryFree(pWriter);
*ppWriter = NULL;
return code;
_err:
tqError("vgId:%d, tq check info writer close failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
return code;
}
int32_t tqCheckInfoWrite(STqCheckInfoWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
STQ* pTq = pWriter->pTq;
STqCheckInfo info = {0};
SDecoder decoder;
SDecoder* pDecoder = &decoder;
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
code = tDecodeSTqCheckInfo(pDecoder, &info);
if (code) goto _err;
code = taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo));
if (code) goto _err;
code = tqMetaSaveCheckInfo(pTq, info.topic, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
if (code) goto _err;
tDecoderClear(pDecoder);
return code;
_err:
tDecoderClear(pDecoder);
tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
return code;
}

View File

@ -1,28 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tq.h"
int tqCommit(STQ* pTq) {
#if 0
// stream meta commit does not be aligned to the vnode commit
if (streamMetaCommit(pTq->pStreamMeta) < 0) {
tqError("vgId:%d, failed to commit stream meta since %s", TD_VID(pTq->pVnode), terrstr());
return -1;
}
#endif
return tqOffsetCommitFile(pTq->pOffsetStore);
}

View File

@ -75,125 +75,116 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) {
return 0;
}
int32_t tqMetaOpen(STQ* pTq) {
if (tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0, 0, NULL) < 0) {
return -1;
int32_t tqMetaDecodeCheckInfo(STqCheckInfo *info, void *pVal, int32_t vLen){
SDecoder decoder = {0};
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
int32_t code = tDecodeSTqCheckInfo(&decoder, info);
if (code != 0) {
tDeleteSTqCheckInfo(info);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0) < 0) {
return -1;
}
if (tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0) < 0) {
return -1;
}
return 0;
}
int32_t tqMetaClose(STQ* pTq) {
if (pTq->pExecStore) {
tdbTbClose(pTq->pExecStore);
}
if (pTq->pCheckStore) {
tdbTbClose(pTq->pCheckStore);
}
tdbClose(pTq->pMetaDB);
return 0;
}
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen) {
TXN* txn;
if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbTbUpsert(pTq->pCheckStore, key, strlen(key), value, vLen, txn) < 0) {
return -1;
}
if (tdbCommit(pTq->pMetaDB, txn) < 0) {
return -1;
}
if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
return -1;
}
return 0;
}
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key) {
TXN* txn;
if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbTbDelete(pTq->pCheckStore, key, (int)strlen(key), txn) < 0) {
tqWarn("vgId:%d, tq try delete checkinfo failed %s", pTq->pVnode->config.vgId, key);
}
if (tdbCommit(pTq->pMetaDB, txn) < 0) {
return -1;
}
if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
return -1;
}
return 0;
}
int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
TBC* pCur = NULL;
if (tdbTbcOpen(pTq->pCheckStore, &pCur, NULL) < 0) {
return -1;
}
void* pKey = NULL;
int kLen = 0;
void* pVal = NULL;
int vLen = 0;
SDecoder decoder;
int32_t code = 0;
tdbTbcMoveToFirst(pCur);
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
STqCheckInfo info;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
code = tDecodeSTqCheckInfo(&decoder, &info);
if (code != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto END;
}
tDecoderClear(&decoder);
code = taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo));
if (code != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto END;
}
}
END:
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
tDecoderClear(&decoder);
return code;
}
void* tqMetaGetCheckInfo(STQ* pTq, int64_t tbUid){
void* data = taosHashGet(pTq->pCheckInfo, &tbUid, sizeof(tbUid));
if (data == NULL) {
int vLen = 0;
if (tdbTbGet(pTq->pCheckStore, &tbUid, sizeof(tbUid), &data, &vLen) < 0) {
tdbFree(data);
return NULL;
}
STqCheckInfo info= {0};
if(tqMetaDecodeCheckInfo(&info, data, vLen) != 0) {
tdbFree(data);
return NULL;
}
tdbFree(data);
if(taosHashPut(pTq->pCheckInfo, &tbUid, sizeof(tbUid), &info, sizeof(STqCheckInfo)) != 0){
tDeleteSTqCheckInfo(&info);
return NULL;
}
return taosHashGet(pTq->pCheckInfo, &tbUid, sizeof(tbUid));
} else {
return data;
}
}
int32_t tqMetaSaveInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen, const void* value, int32_t vLen) {
int32_t code = TDB_CODE_SUCCESS;
TXN* txn = NULL;
TQ_ERR_RETURN(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
TQ_ERR_RETURN(tdbTbUpsert(ttb, key, kLen, value, vLen, txn));
TQ_ERR_RETURN(tdbCommit(pTq->pMetaDB, txn));
TQ_ERR_RETURN(tdbPostCommit(pTq->pMetaDB, txn));
return 0;
}
int32_t tqMetaDeleteInfo(STQ* pTq, TTB* ttb, const void* key, int32_t kLen) {
int32_t code = TDB_CODE_SUCCESS;
TXN* txn = NULL;
TQ_ERR_RETURN(tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
TQ_ERR_RETURN(tdbTbDelete(ttb, key, kLen, txn));
TQ_ERR_RETURN(tdbCommit(pTq->pMetaDB, txn));
TQ_ERR_RETURN(tdbPostCommit(pTq->pMetaDB, txn));
return 0;
}
static int32_t tqMetaTransformOffsetInfo(STQ* pTq, char* path) {
int32_t code = TDB_CODE_SUCCESS;
void* pIter = NULL;
TQ_ERR_RETURN(tqOffsetRestoreFromFile(pTq->pOffset, path));
while (1) {
pIter = taosHashIterate(pTq->pOffset, pIter);
if (pIter == NULL) {
break;
}
STqOffset* offset = (STqOffset*)pIter;
TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pOffsetStore, offset->subKey, strlen(offset->subKey), offset, sizeof(STqOffset)));
}
END:
taosHashCancelIterate(pTq->pOffset, pIter);
return code;
}
void* tqMetaGetOffset(STQ* pTq, const char* subkey){
void* data = taosHashGet(pTq->pOffset, subkey, strlen(subkey));
if (data == NULL) {
int vLen = 0;
if (tdbTbGet(pTq->pOffsetStore, subkey, strlen(subkey), &data, &vLen) < 0) {
tdbFree(data);
return NULL;
}
if(taosHashPut(pTq->pOffset, subkey, strlen(subkey), data, sizeof(STqOffset)) != 0){
tdbFree(data);
return NULL;
}
tdbFree(data);
return taosHashGet(pTq->pOffset, subkey, strlen(subkey));
} else {
return data;
}
}
int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
int32_t code;
int32_t code = TDB_CODE_SUCCESS;
int32_t vlen;
void* buf = NULL;
SEncoder encoder;
tEncodeSize(tEncodeSTqHandle, pHandle, vlen, code);
if (code < 0) {
goto end;
goto END;
}
tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 " epoch:%d vgId:%d", pHandle->subKey,
@ -202,71 +193,33 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
buf = taosMemoryCalloc(1, vlen);
if (buf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
goto END;
}
tEncoderInit(&encoder, buf, vlen);
code = tEncodeSTqHandle(&encoder, pHandle);
if (code < 0) {
goto end;
goto END;
}
TXN* txn = NULL;
code = tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
if (code < 0) {
goto end;
}
TQ_ERR_GO_TO_END(tqMetaSaveInfo(pTq, pTq->pExecStore, key, (int)strlen(key), buf, vlen));
code = tdbTbUpsert(pTq->pExecStore, key, (int)strlen(key), buf, vlen, txn);
if (code < 0) {
goto end;
}
code = tdbCommit(pTq->pMetaDB, txn);
if (code < 0) {
goto end;
}
code = tdbPostCommit(pTq->pMetaDB, txn);
end:
END:
tEncoderClear(&encoder);
taosMemoryFree(buf);
return code;
}
int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
TXN* txn;
if (tdbBegin(pTq->pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) <
0) {
return -1;
}
if (tdbTbDelete(pTq->pExecStore, key, (int)strlen(key), txn) < 0) {
}
if (tdbCommit(pTq->pMetaDB, txn) < 0) {
return -1;
}
if (tdbPostCommit(pTq->pMetaDB, txn) < 0) {
return -1;
}
return 0;
}
static int buildHandle(STQ* pTq, STqHandle* handle){
static int tqMetaInitHandle(STQ* pTq, STqHandle* handle){
int32_t code = TDB_CODE_SUCCESS;
SVnode* pVnode = pTq->pVnode;
int32_t vgId = TD_VID(pVnode);
handle->pRef = walOpenRef(pVnode->pWal);
if (handle->pRef == NULL) {
return -1;
return TSDB_CODE_OUT_OF_MEMORY;
}
walSetRefVer(handle->pRef, handle->snapshotVer);
TQ_ERR_RETURN(walSetRefVer(handle->pRef, handle->snapshotVer));
SReadHandle reader = {
.vnode = pVnode,
@ -282,18 +235,18 @@ static int buildHandle(STQ* pTq, STqHandle* handle){
qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId, &handle->execHandle.numOfCols, handle->consumerId);
if (handle->execHandle.task == NULL) {
tqError("cannot create exec task for %s", handle->subKey);
return -1;
return TSDB_CODE_OUT_OF_MEMORY;
}
void* scanner = NULL;
qExtractStreamScanner(handle->execHandle.task, &scanner);
(void)qExtractStreamScanner(handle->execHandle.task, &scanner);
if (scanner == NULL) {
tqError("cannot extract stream scanner for %s", handle->subKey);
return -1;
return TSDB_CODE_SCH_INTERNAL_ERROR;
}
handle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
if (handle->execHandle.pTqReader == NULL) {
tqError("cannot extract exec reader for %s", handle->subKey);
return -1;
return TSDB_CODE_SCH_INTERNAL_ERROR;
}
} else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
handle->pWalReader = walOpenReader(pVnode->pWal, NULL, 0);
@ -308,7 +261,7 @@ static int buildHandle(STQ* pTq, STqHandle* handle){
if(handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) {
if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) {
tqError("nodesStringToNode error in sub stable, since %s", terrstr());
return -1;
return TSDB_CODE_SCH_INTERNAL_ERROR;
}
}
buildSnapContext(reader.vnode, reader.version, handle->execHandle.execTb.suid, handle->execHandle.subType,
@ -320,7 +273,7 @@ static int buildHandle(STQ* pTq, STqHandle* handle){
if(ret != TDB_CODE_SUCCESS) {
tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle->subKey, handle->consumerId);
taosArrayDestroy(tbUidList);
return -1;
return TSDB_CODE_SCH_INTERNAL_ERROR;
}
tqInfo("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId, handle->execHandle.execTb.suid);
handle->execHandle.pTqReader = tqReaderOpen(pVnode);
@ -330,24 +283,23 @@ static int buildHandle(STQ* pTq, STqHandle* handle){
return 0;
}
static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){
static int32_t tqMetaRestoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){
int32_t vgId = TD_VID(pTq->pVnode);
SDecoder decoder;
int32_t code = 0;
SDecoder decoder = {0};
int32_t code = TDB_CODE_SUCCESS;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
code = tDecodeSTqHandle(&decoder, handle);
if (code) goto end;
code = buildHandle(pTq, handle);
if (code) goto end;
tqInfo("restoreHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
TQ_ERR_GO_TO_END(tDecodeSTqHandle(&decoder, handle));
TQ_ERR_GO_TO_END(tqMetaInitHandle(pTq, handle));
tqInfo("tqMetaRestoreHandle %s consumer 0x%" PRIx64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
code = taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
end:
END:
tDecoderClear(&decoder);
return code;
}
int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
int32_t tqMetaCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
int32_t vgId = TD_VID(pTq->pVnode);
memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN);
@ -367,165 +319,186 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal);
if(buildHandle(pTq, handle) < 0){
if(tqMetaInitHandle(pTq, handle) < 0){
return -1;
}
tqInfo("tqCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey, handle->consumerId, vgId, handle->snapshotVer);
tqInfo("tqMetaCreateHandle %s consumer 0x%" PRIx64 " vgId:%d, snapshotVer:%" PRId64, handle->subKey, handle->consumerId, vgId, handle->snapshotVer);
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
}
static int32_t tqMetaTransformInfo(TDB* pMetaDB, TTB* pExecStoreOld, TTB* pExecStoreNew){
TBC* pCur = NULL;
if (tdbTbcOpen(pExecStoreOld, &pCur, NULL) < 0) {
return -1;
}
TXN* txn;
if (tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
return -1;
}
static int32_t tqMetaTransformStoreInfo(TDB* pMetaDB, TTB* pExecStoreOld, TTB* pExecStoreNew){
TBC* pCur = NULL;
void* pKey = NULL;
int kLen = 0;
void* pVal = NULL;
int vLen = 0;
TXN* txn = NULL;
tdbTbcMoveToFirst(pCur);
int32_t code = TDB_CODE_SUCCESS;
TQ_ERR_GO_TO_END(tdbTbcOpen(pExecStoreOld, &pCur, NULL));
TQ_ERR_GO_TO_END(tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur));
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
if (tdbTbUpsert(pExecStoreNew, pKey, kLen, pVal, vLen, txn) < 0) {
tqError("transform sub info error");
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
return -1;
}
TQ_ERR_GO_TO_END (tdbTbUpsert(pExecStoreNew, pKey, kLen, pVal, vLen, txn));
}
TQ_ERR_GO_TO_END (tdbCommit(pMetaDB, txn));
TQ_ERR_GO_TO_END (tdbPostCommit(pMetaDB, txn));
END:
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
(void)tdbTbcClose(pCur);
return code;
}
if (tdbCommit(pMetaDB, txn) < 0) {
return -1;
static int32_t tqMetaTransformCheckInfo(TDB* pMetaDB, TTB* pCheckOld, TTB* pCheckNew){
TBC* pCur = NULL;
void* pKey = NULL;
int kLen = 0;
void* pVal = NULL;
int vLen = 0;
TXN* txn = NULL;
int32_t code = TDB_CODE_SUCCESS;
TQ_ERR_GO_TO_END(tdbTbcOpen(pCheckOld, &pCur, NULL));
TQ_ERR_GO_TO_END(tdbBegin(pMetaDB, &txn, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED));
TQ_ERR_GO_TO_END(tdbTbcMoveToFirst(pCur));
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
STqCheckInfo info= {0};
TQ_ERR_GO_TO_END(tqMetaDecodeCheckInfo(&info, pVal, vLen));
int64_t uid = info.ntbUid;
tDeleteSTqCheckInfo(&info);
TQ_ERR_GO_TO_END (tdbTbUpsert(pCheckNew, &uid, sizeof(uid), pVal, vLen, txn));
}
if (tdbPostCommit(pMetaDB, txn) < 0) {
return -1;
TQ_ERR_GO_TO_END (tdbCommit(pMetaDB, txn));
TQ_ERR_GO_TO_END (tdbPostCommit(pMetaDB, txn));
END:
tdbFree(pKey);
tdbFree(pVal);
(void)tdbTbcClose(pCur);
return code;
}
int32_t tqMetaGetHandle(STQ* pTq, const char* key, STqHandle** pHandle) {
void* data = taosHashGet(pTq->pHandle, key, strlen(key));
if(data == NULL){
int vLen = 0;
if (tdbTbGet(pTq->pExecStore, key, (int)strlen(key), &data, &vLen) < 0) {
tdbFree(data);
return TSDB_CODE_OUT_OF_MEMORY;
}
STqHandle handle = {0};
if (tqMetaRestoreHandle(pTq, data, vLen, &handle) != 0){
tdbFree(data);
tqDestroyTqHandle(&handle);
return TSDB_CODE_OUT_OF_MEMORY;
}
tdbFree(data);
if(taosHashPut(pTq->pHandle, key, strlen(key), &handle, sizeof(STqHandle)) != 0){
tqDestroyTqHandle(&handle);
return TSDB_CODE_OUT_OF_MEMORY;
}
*pHandle = taosHashGet(pTq->pCheckInfo, key, strlen(key));
if(*pHandle == NULL){
return TSDB_CODE_OUT_OF_MEMORY;
}
}else{
*pHandle = data;
}
return 0;
return TDB_CODE_SUCCESS;
}
int32_t tqMetaOpenTdb(STQ* pTq) {
int32_t code = TDB_CODE_SUCCESS;
TQ_ERR_GO_TO_END(tdbOpen(pTq->path, 16 * 1024, 1, &pTq->pMetaDB, 0, 0, NULL));
TQ_ERR_GO_TO_END(tdbTbOpen("tq.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pExecStore, 0));
TQ_ERR_GO_TO_END(tdbTbOpen("tq.check.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pCheckStore, 0));
TQ_ERR_GO_TO_END(tdbTbOpen("tq.offset.db", -1, -1, NULL, pTq->pMetaDB, &pTq->pOffsetStore, 0));
END:
return code;
}
static int32_t replaceTqPath(char** path){
char* tpath = NULL;
int32_t code = TDB_CODE_SUCCESS;
TQ_ERR_RETURN(tqBuildFName(&tpath, *path, TQ_SUBSCRIBE_NAME));
taosMemoryFree(*path);
*path = tpath;
return TDB_CODE_SUCCESS;
}
int32_t tqMetaOpen(STQ* pTq) {
char* maindb = NULL;
int32_t code = TDB_CODE_SUCCESS;
TQ_ERR_GO_TO_END(tqBuildFName(&maindb, pTq->path, TDB_MAINDB_NAME));
if(!taosCheckExistFile(maindb)){
TQ_ERR_GO_TO_END(replaceTqPath(&pTq->path));
TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));
}else{
TQ_ERR_GO_TO_END(tqMetaTransform(pTq));
}
END:
taosMemoryFree(maindb);
return code;
}
int32_t tqMetaTransform(STQ* pTq) {
int32_t len = strlen(pTq->path) + 64;
char* maindb = taosMemoryCalloc(1, len);
sprintf(maindb, "%s%s%s", pTq->path, TD_DIRSEP, TDB_MAINDB_NAME);
int32_t code = TDB_CODE_SUCCESS;
TDB* pMetaDB = NULL;
TTB* pExecStore = NULL;
TTB* pCheckStore = NULL;
char* offsetNew = NULL;
char* offset = NULL;
TQ_ERR_GO_TO_END(tqBuildFName(&offset, pTq->path, TQ_OFFSET_NAME));
if(!taosCheckExistFile(maindb)){
taosMemoryFree(maindb);
char* tpath = taosMemoryCalloc(1, len);
if(tpath == NULL){
return -1;
}
sprintf(tpath, "%s%s%s", pTq->path, TD_DIRSEP, "subscribe");
taosMemoryFree(pTq->path);
pTq->path = tpath;
return tqMetaOpen(pTq);
}
TQ_ERR_GO_TO_END(tdbOpen(pTq->path, 16 * 1024, 1, &pMetaDB, 0, 0, NULL));
TQ_ERR_GO_TO_END(tdbTbOpen("tq.db", -1, -1, NULL, pMetaDB, &pExecStore, 0));
TQ_ERR_GO_TO_END(tdbTbOpen("tq.check.db", -1, -1, NULL, pMetaDB, &pCheckStore, 0));
int32_t code = 0;
TDB* pMetaDB = NULL;
TTB* pExecStore = NULL;
TTB* pCheckStore = NULL;
char* offsetNew = NULL;
char* offset = tqOffsetBuildFName(pTq->path, 0);
if(offset == NULL){
code = -1;
goto END;
}
TQ_ERR_GO_TO_END(replaceTqPath(&pTq->path));
TQ_ERR_GO_TO_END(tqMetaOpenTdb(pTq));
TQ_ERR_GO_TO_END(tqMetaTransformStoreInfo(pTq->pMetaDB, pExecStore, pTq->pExecStore));
TQ_ERR_GO_TO_END(tqMetaTransformCheckInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore));
if (tdbOpen(pTq->path, 16 * 1024, 1, &pMetaDB, 0, 0, NULL) < 0) {
code = -1;
goto END;
}
if (tdbTbOpen("tq.db", -1, -1, NULL, pMetaDB, &pExecStore, 0) < 0) {
code = -1;
goto END;
}
if (tdbTbOpen("tq.check.db", -1, -1, NULL, pMetaDB, &pCheckStore, 0) < 0) {
code = -1;
goto END;
}
char* tpath = taosMemoryCalloc(1, len);
if(tpath == NULL){
code = -1;
goto END;
}
sprintf(tpath, "%s%s%s", pTq->path, TD_DIRSEP, "subscribe");
taosMemoryFree(pTq->path);
pTq->path = tpath;
if (tqMetaOpen(pTq) < 0) {
code = -1;
goto END;
}
if( tqMetaTransformInfo(pTq->pMetaDB, pExecStore, pTq->pExecStore) < 0){
code = -1;
goto END;
}
if(tqMetaTransformInfo(pTq->pMetaDB, pCheckStore, pTq->pCheckStore) < 0){
code = -1;
goto END;
}
tdbTbClose(pExecStore);
pExecStore = NULL;
tdbTbClose(pCheckStore);
pCheckStore = NULL;
tdbClose(pMetaDB);
pMetaDB = NULL;
offsetNew = tqOffsetBuildFName(pTq->path, 0);
if(offsetNew == NULL){
code = -1;
goto END;
}
TQ_ERR_GO_TO_END(tqBuildFName(&offsetNew, pTq->path, TQ_OFFSET_NAME));
if(taosCheckExistFile(offset) && taosCopyFile(offset, offsetNew) < 0){
tqError("copy offset file error");
code = -1;
goto END;
}
taosRemoveFile(maindb);
taosRemoveFile(offset);
TQ_ERR_GO_TO_END(tqMetaTransformOffsetInfo(pTq, offsetNew));
(void)taosRemoveFile(offset);
END:
taosMemoryFree(maindb);
END:
taosMemoryFree(offset);
taosMemoryFree(offsetNew);
tdbTbClose(pExecStore);
tdbTbClose(pCheckStore);
tdbClose(pMetaDB);
//return 0 always, so ignore
(void)tdbTbClose(pExecStore);
(void)tdbTbClose(pCheckStore);
(void)tdbClose(pMetaDB);
return code;
}
int32_t tqMetaGetHandle(STQ* pTq, const char* key) {
void* pVal = NULL;
int vLen = 0;
if (tdbTbGet(pTq->pExecStore, key, (int)strlen(key), &pVal, &vLen) < 0) {
return -1;
int32_t tqMetaClose(STQ* pTq) {
if (pTq->pExecStore) {
(void)tdbTbClose(pTq->pExecStore);
}
STqHandle handle = {0};
int code = restoreHandle(pTq, pVal, vLen, &handle);
if (code < 0){
tqDestroyTqHandle(&handle);
if (pTq->pCheckStore) {
(void)tdbTbClose(pTq->pCheckStore);
}
tdbFree(pVal);
return code;
if (pTq->pOffsetStore) {
(void)tdbTbClose(pTq->pOffsetStore);
}
(void)tdbClose(pTq->pMetaDB);
return 0;
}

View File

@ -16,192 +16,76 @@
#include "tq.h"
struct STqOffsetStore {
STQ* pTq;
SHashObj* pHash; // SHashObj<subscribeKey, offset>
int8_t needCommit;
};
char* tqOffsetBuildFName(const char* path, int32_t fVer) {
int32_t len = strlen(path);
char* fname = taosMemoryCalloc(1, len + 40);
int32_t tqBuildFName(char** data, const char* path, char* name) {
int32_t len = strlen(path) + strlen(name) + 2;
char* fname = taosMemoryCalloc(1, len);
if(fname == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
snprintf(fname, len + 40, "%s/offset-ver%d", path, fVer);
return fname;
int32_t code = snprintf(fname, len, "%s%s%s", path, TD_DIRSEP, name);
if (code < 0){
code = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(fname);
return code;
}
*data = fname;
return TDB_CODE_SUCCESS;
}
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) {
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
int32_t tqOffsetRestoreFromFile(SHashObj* pOffset, char* name) {
int32_t code = TDB_CODE_SUCCESS;
void* pMemBuf = NULL;
SDecoder decoder = {0};
TdFilePtr pFile = taosOpenFile(name, TD_FILE_READ);
if (pFile == NULL) {
return TSDB_CODE_SUCCESS;
code = TAOS_SYSTEM_ERROR(errno);
goto END;
}
int32_t vgId = TD_VID(pStore->pTq->pVnode);
int64_t code = 0;
int64_t ret = 0;
int32_t size = 0;
while (1) {
if ((code = taosReadFile(pFile, &size, INT_BYTES)) != INT_BYTES) {
if (code == 0) {
break;
} else {
return -1;
if ((ret = taosReadFile(pFile, &size, INT_BYTES)) != INT_BYTES) {
if (ret != 0) {
code = TSDB_CODE_INVALID_MSG;
}
goto END;
}
size = htonl(size);
void* pMemBuf = taosMemoryCalloc(1, size);
pMemBuf = taosMemoryCalloc(1, size);
if (pMemBuf == NULL) {
tqError("vgId:%d failed to restore offset from file, since out of memory, malloc size:%d", vgId, size);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
code = TSDB_CODE_OUT_OF_MEMORY;
goto END;
}
if ((code = taosReadFile(pFile, pMemBuf, size)) != size) {
taosMemoryFree(pMemBuf);
return -1;
if (taosReadFile(pFile, pMemBuf, size) != size) {
terrno = TSDB_CODE_INVALID_MSG;
goto END;
}
STqOffset offset = {0};
SDecoder decoder;
STqOffset offset;
tDecoderInit(&decoder, pMemBuf, size);
if (tDecodeSTqOffset(&decoder, &offset) < 0) {
taosMemoryFree(pMemBuf);
tDecoderClear(&decoder);
return code;
code = TSDB_CODE_INVALID_MSG;
goto END;
}
if (taosHashPut(pOffset, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto END;
}
tDecoderClear(&decoder);
if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) {
return -1;
}
// todo remove this
if (offset.val.type == TMQ_OFFSET__LOG) {
STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey));
if (pHandle) {
if (walSetRefVer(pHandle->pRef, offset.val.version) < 0) {
// tqError("vgId: %d, tq handle %s ref ver %" PRId64 "error", pStore->pTq->pVnode->config.vgId, pHandle->subKey,
// offset.val.version);
}
}
}
taosMemoryFree(pMemBuf);
pMemBuf = NULL;
}
END:
taosCloseFile(&pFile);
return TSDB_CODE_SUCCESS;
}
STqOffsetStore* tqOffsetOpen(STQ* pTq) {
STqOffsetStore* pStore = taosMemoryCalloc(1, sizeof(STqOffsetStore));
if (pStore == NULL) {
return NULL;
}
pStore->pTq = pTq;
pStore->needCommit = 0;
pTq->pOffsetStore = pStore;
pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
if (pStore->pHash == NULL) {
taosMemoryFree(pStore);
return NULL;
}
taosHashSetFreeFp(pStore->pHash, tOffsetDestroy);
char* fname = tqOffsetBuildFName(pStore->pTq->path, 0);
if (tqOffsetRestoreFromFile(pStore, fname) < 0) {
taosMemoryFree(fname);
taosMemoryFree(pStore);
return NULL;
}
taosMemoryFree(fname);
return pStore;
}
void tqOffsetClose(STqOffsetStore* pStore) {
if(pStore == NULL) return;
tqOffsetCommitFile(pStore);
taosHashCleanup(pStore->pHash);
taosMemoryFree(pStore);
}
STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) {
return (STqOffset*)taosHashGet(pStore->pHash, subscribeKey, strlen(subscribeKey));
}
int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) {
pStore->needCommit = 1;
return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset));
}
int32_t tqOffsetDelete(STqOffsetStore* pStore, const char* subscribeKey) {
return taosHashRemove(pStore->pHash, subscribeKey, strlen(subscribeKey));
}
int32_t tqOffsetCommitFile(STqOffsetStore* pStore) {
if (!pStore->needCommit) {
return 0;
}
// TODO file name should be with a newer version
char* fname = tqOffsetBuildFName(pStore->pTq->path, 0);
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
const char* err = strerror(errno);
tqError("vgId:%d, failed to open offset file %s, since %s", TD_VID(pStore->pTq->pVnode), fname, err);
taosMemoryFree(fname);
return -1;
}
taosMemoryFree(fname);
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pStore->pHash, pIter);
if (pIter == NULL) {
break;
}
STqOffset* pOffset = (STqOffset*)pIter;
int32_t bodyLen;
int32_t code;
tEncodeSize(tEncodeSTqOffset, pOffset, bodyLen, code);
if (code < 0) {
taosHashCancelIterate(pStore->pHash, pIter);
return -1;
}
int32_t totLen = INT_BYTES + bodyLen;
void* buf = taosMemoryCalloc(1, totLen);
void* abuf = POINTER_SHIFT(buf, INT_BYTES);
*(int32_t*)buf = htonl(bodyLen);
SEncoder encoder;
tEncoderInit(&encoder, abuf, bodyLen);
tEncodeSTqOffset(&encoder, pOffset);
// write file
int64_t writeLen;
if ((writeLen = taosWriteFile(pFile, buf, totLen)) != totLen) {
tqError("write offset incomplete, len %d, write len %" PRId64, bodyLen, writeLen);
taosHashCancelIterate(pStore->pHash, pIter);
taosMemoryFree(buf);
return -1;
}
taosMemoryFree(buf);
}
// close and rename file
taosCloseFile(&pFile);
pStore->needCommit = 0;
return 0;
taosMemoryFree(pMemBuf);
tDecoderClear(&decoder);
return code;
}

View File

@ -1,168 +0,0 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "meta.h"
#include "tdbInt.h"
#include "tq.h"
// STqOffsetReader ========================================
struct STqOffsetReader {
STQ* pTq;
int64_t sver;
int64_t ever;
int8_t readEnd;
};
int32_t tqOffsetReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetReader** ppReader) {
STqOffsetReader* pReader = NULL;
pReader = taosMemoryCalloc(1, sizeof(STqOffsetReader));
if (pReader == NULL) {
*ppReader = NULL;
return -1;
}
pReader->pTq = pTq;
pReader->sver = sver;
pReader->ever = ever;
tqInfo("vgId:%d, vnode snapshot tq offset reader opened", TD_VID(pTq->pVnode));
*ppReader = pReader;
return 0;
}
int32_t tqOffsetReaderClose(STqOffsetReader** ppReader) {
taosMemoryFree(*ppReader);
*ppReader = NULL;
return 0;
}
int32_t tqOffsetSnapRead(STqOffsetReader* pReader, uint8_t** ppData) {
if (pReader->readEnd != 0) return 0;
char* fname = tqOffsetBuildFName(pReader->pTq->path, 0);
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
if (pFile == NULL) {
taosMemoryFree(fname);
return 0;
}
int64_t sz = 0;
if (taosStatFile(fname, &sz, NULL, NULL) < 0) {
taosCloseFile(&pFile);
taosMemoryFree(fname);
return -1;
}
taosMemoryFree(fname);
SSnapDataHdr* buf = taosMemoryCalloc(1, sz + sizeof(SSnapDataHdr));
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosCloseFile(&pFile);
return terrno;
}
void* abuf = POINTER_SHIFT(buf, sizeof(SSnapDataHdr));
int64_t contLen = taosReadFile(pFile, abuf, sz);
if (contLen != sz) {
taosCloseFile(&pFile);
taosMemoryFree(buf);
return -1;
}
buf->size = sz;
buf->type = SNAP_DATA_TQ_OFFSET;
*ppData = (uint8_t*)buf;
pReader->readEnd = 1;
taosCloseFile(&pFile);
return 0;
}
// STqOffseWriter ========================================
struct STqOffsetWriter {
STQ* pTq;
int64_t sver;
int64_t ever;
int32_t tmpFileVer;
char* fname;
};
int32_t tqOffsetWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetWriter** ppWriter) {
int32_t code = 0;
STqOffsetWriter* pWriter;
pWriter = (STqOffsetWriter*)taosMemoryCalloc(1, sizeof(STqOffsetWriter));
if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pWriter->pTq = pTq;
pWriter->sver = sver;
pWriter->ever = ever;
*ppWriter = pWriter;
return code;
_err:
tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
*ppWriter = NULL;
return code;
}
int32_t tqOffsetWriterClose(STqOffsetWriter** ppWriter, int8_t rollback) {
STqOffsetWriter* pWriter = *ppWriter;
STQ* pTq = pWriter->pTq;
char* fname = tqOffsetBuildFName(pTq->path, 0);
if (rollback) {
if (taosRemoveFile(pWriter->fname) < 0) {
taosMemoryFree(fname);
return -1;
}
} else {
if (taosRenameFile(pWriter->fname, fname) < 0) {
taosMemoryFree(fname);
return -1;
}
if (tqOffsetRestoreFromFile(pTq->pOffsetStore, fname) < 0) {
taosMemoryFree(fname);
return -1;
}
}
taosMemoryFree(fname);
taosMemoryFree(pWriter->fname);
taosMemoryFree(pWriter);
*ppWriter = NULL;
return 0;
}
int32_t tqOffsetSnapWrite(STqOffsetWriter* pWriter, uint8_t* pData, uint32_t nData) {
STQ* pTq = pWriter->pTq;
pWriter->tmpFileVer = 1;
pWriter->fname = tqOffsetBuildFName(pTq->path, pWriter->tmpFileVer);
TdFilePtr pFile = taosOpenFile(pWriter->fname, TD_FILE_CREATE | TD_FILE_WRITE);
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
int64_t size = pHdr->size;
if (pFile) {
int64_t contLen = taosWriteFile(pFile, pHdr->data, size);
if (contLen != size) {
taosCloseFile(&pFile);
return -1;
}
taosCloseFile(&pFile);
} else {
return -1;
}
return 0;
}

View File

@ -23,9 +23,10 @@ struct STqSnapReader {
int64_t sver;
int64_t ever;
TBC* pCur;
int8_t type;
};
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** ppReader) {
int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, int8_t type, STqSnapReader** ppReader) {
int32_t code = 0;
STqSnapReader* pReader = NULL;
@ -38,9 +39,21 @@ int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** p
pReader->pTq = pTq;
pReader->sver = sver;
pReader->ever = ever;
pReader->type = type;
// impl
code = tdbTbcOpen(pTq->pExecStore, &pReader->pCur, NULL);
TTB *pTb = NULL;
if (type == SNAP_DATA_TQ_CHECKINFO) {
pTb = pTq->pCheckStore;
} else if (type == SNAP_DATA_TQ_HANDLE) {
pTb = pTq->pExecStore;
} else if (type == SNAP_DATA_TQ_OFFSET) {
pTb = pTq->pOffsetStore;
} else {
code = TSDB_CODE_INVALID_MSG;
goto _err;
}
code = tdbTbcOpen(pTb, &pReader->pCur, NULL);
if (code) {
taosMemoryFree(pReader);
goto _err;
@ -91,7 +104,7 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) {
}
SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = SNAP_DATA_TQ_HANDLE;
pHdr->type = pReader->type;
pHdr->size = vLen;
memcpy(pHdr->data, pVal, vLen);
@ -169,7 +182,7 @@ _err:
return code;
}
int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t tqSnapHandleWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
STQ* pTq = pWriter->pTq;
SDecoder decoder = {0};
@ -180,7 +193,7 @@ int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
code = tDecodeSTqHandle(pDecoder, &handle);
if (code) goto end;
taosWLockLatch(&pTq->lock);
code = tqMetaSaveHandle(pTq, handle.subKey, &handle);
code = tqMetaSaveInfo(pTq, pTq->pExecStore, handle.subKey, (int)strlen(handle.subKey), pData, nData);
taosWUnLockLatch(&pTq->lock);
end:
@ -189,3 +202,36 @@ end:
tqInfo("vgId:%d, vnode snapshot tq write result:%d", TD_VID(pTq->pVnode), code);
return code;
}
int32_t tqSnapCheckInfoWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
STQ* pTq = pWriter->pTq;
STqCheckInfo info = {0};
if(tqMetaDecodeCheckInfo(&info, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)) != 0){
goto _err;
}
code = tqMetaSaveInfo(pTq, pTq->pCheckStore, &info.ntbUid, sizeof(info.ntbUid), pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
if (code) goto _err;
return code;
_err:
tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
return code;
}
int32_t tqSnapOffsetWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
STQ* pTq = pWriter->pTq;
STqOffset *info = (STqOffset*)(pData + sizeof(SSnapDataHdr));
code = tqMetaSaveInfo(pTq, pTq->pOffsetStore, info->subKey, strlen(info->subKey), pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
if (code) goto _err;
return code;
_err:
tqError("vgId:%d, vnode check info tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code));
return code;
}

View File

@ -77,7 +77,7 @@ static int32_t tqInitTaosxRsp(SMqDataRspCommon* pRsp, STqOffsetVal pOffset) {
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, bool* pBlockReturned) {
uint64_t consumerId = pRequest->consumerId;
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
STqOffset* pOffset = (STqOffset*)tqMetaGetOffset(pTq, pRequest->subKey);
int32_t vgId = TD_VID(pTq->pVnode);
*pBlockReturned = false;

View File

@ -458,11 +458,6 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
TSDB_CHECK_CODE(code, lino, _exit);
}
if (tqCommit(pVnode->pTq) < 0) {
code = TSDB_CODE_FAILED;
TSDB_CHECK_CODE(code, lino, _exit);
}
// commit info
if (vnodeCommitInfo(dir) < 0) {
code = terrno;

View File

@ -59,9 +59,9 @@ struct SVSnapReader {
int8_t tqHandleDone;
STqSnapReader *pTqSnapReader;
int8_t tqOffsetDone;
STqOffsetReader *pTqOffsetReader;
STqSnapReader *pTqOffsetReader;
int8_t tqCheckInfoDone;
STqCheckInfoReader *pTqCheckInfoReader;
STqSnapReader *pTqCheckInfoReader;
// stream
int8_t streamTaskDone;
SStreamTaskReader *pStreamTaskReader;
@ -233,11 +233,11 @@ void vnodeSnapReaderClose(SVSnapReader *pReader) {
}
if (pReader->pTqOffsetReader) {
tqOffsetReaderClose(&pReader->pTqOffsetReader);
tqSnapReaderClose(&pReader->pTqOffsetReader);
}
if (pReader->pTqCheckInfoReader) {
tqCheckInfoReaderClose(&pReader->pTqCheckInfoReader);
tqSnapReaderClose(&pReader->pTqCheckInfoReader);
}
taosMemoryFree(pReader);
@ -365,7 +365,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
vInfo("vgId:%d tq transform start", vgId);
if (!pReader->tqHandleDone) {
if (pReader->pTqSnapReader == NULL) {
code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqSnapReader);
code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_HANDLE, &pReader->pTqSnapReader);
if (code < 0) goto _err;
}
@ -384,11 +384,11 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
}
if (!pReader->tqCheckInfoDone) {
if (pReader->pTqCheckInfoReader == NULL) {
code = tqCheckInfoReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqCheckInfoReader);
code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_CHECKINFO, &pReader->pTqCheckInfoReader);
if (code < 0) goto _err;
}
code = tqCheckInfoRead(pReader->pTqCheckInfoReader, ppData);
code = tqSnapRead(pReader->pTqCheckInfoReader, ppData);
if (code) {
goto _err;
} else {
@ -396,18 +396,18 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
goto _exit;
} else {
pReader->tqCheckInfoDone = 1;
code = tqCheckInfoReaderClose(&pReader->pTqCheckInfoReader);
code = tqSnapReaderClose(&pReader->pTqCheckInfoReader);
if (code) goto _err;
}
}
}
if (!pReader->tqOffsetDone) {
if (pReader->pTqOffsetReader == NULL) {
code = tqOffsetReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, &pReader->pTqOffsetReader);
code = tqSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->ever, SNAP_DATA_TQ_OFFSET, &pReader->pTqOffsetReader);
if (code < 0) goto _err;
}
code = tqOffsetSnapRead(pReader->pTqOffsetReader, ppData);
code = tqSnapRead(pReader->pTqOffsetReader, ppData);
if (code) {
goto _err;
} else {
@ -415,7 +415,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
goto _exit;
} else {
pReader->tqOffsetDone = 1;
code = tqOffsetReaderClose(&pReader->pTqOffsetReader);
code = tqSnapReaderClose(&pReader->pTqOffsetReader);
if (code) goto _err;
}
}
@ -536,9 +536,9 @@ struct SVSnapWriter {
// tsdb raw
STsdbSnapRAWWriter *pTsdbSnapRAWWriter;
// tq
STqSnapWriter *pTqSnapWriter;
STqOffsetWriter *pTqOffsetWriter;
STqCheckInfoWriter *pTqCheckInfoWriter;
STqSnapWriter *pTqSnapHandleWriter;
STqSnapWriter *pTqSnapOffsetWriter;
STqSnapWriter *pTqSnapCheckInfoWriter;
// stream
SStreamTaskWriter *pStreamTaskWriter;
SStreamStateWriter *pStreamStateWriter;
@ -736,18 +736,18 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
if (code) goto _exit;
}
if (pWriter->pTqSnapWriter) {
code = tqSnapWriterClose(&pWriter->pTqSnapWriter, rollback);
if (pWriter->pTqSnapHandleWriter) {
code = tqSnapWriterClose(&pWriter->pTqSnapHandleWriter, rollback);
if (code) goto _exit;
}
if (pWriter->pTqCheckInfoWriter) {
code = tqCheckInfoWriterClose(&pWriter->pTqCheckInfoWriter, rollback);
if (pWriter->pTqSnapCheckInfoWriter) {
code = tqSnapWriterClose(&pWriter->pTqSnapCheckInfoWriter, rollback);
if (code) goto _exit;
}
if (pWriter->pTqOffsetWriter) {
code = tqOffsetWriterClose(&pWriter->pTqOffsetWriter, rollback);
if (pWriter->pTqSnapOffsetWriter) {
code = tqSnapWriterClose(&pWriter->pTqSnapOffsetWriter, rollback);
if (code) goto _exit;
}
@ -872,32 +872,32 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
} break;
case SNAP_DATA_TQ_HANDLE: {
// tq handle
if (pWriter->pTqSnapWriter == NULL) {
code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapWriter);
if (pWriter->pTqSnapHandleWriter == NULL) {
code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapHandleWriter);
if (code) goto _err;
}
code = tqSnapWrite(pWriter->pTqSnapWriter, pData, nData);
code = tqSnapHandleWrite(pWriter->pTqSnapHandleWriter, pData, nData);
if (code) goto _err;
} break;
case SNAP_DATA_TQ_CHECKINFO: {
// tq checkinfo
if (pWriter->pTqCheckInfoWriter == NULL) {
code = tqCheckInfoWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqCheckInfoWriter);
if (pWriter->pTqSnapCheckInfoWriter == NULL) {
code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapCheckInfoWriter);
if (code) goto _err;
}
code = tqCheckInfoWrite(pWriter->pTqCheckInfoWriter, pData, nData);
code = tqSnapCheckInfoWrite(pWriter->pTqSnapCheckInfoWriter, pData, nData);
if (code) goto _err;
} break;
case SNAP_DATA_TQ_OFFSET: {
// tq offset
if (pWriter->pTqOffsetWriter == NULL) {
code = tqOffsetWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqOffsetWriter);
if (pWriter->pTqSnapOffsetWriter == NULL) {
code = tqSnapWriterOpen(pVnode->pTq, pWriter->sver, pWriter->ever, &pWriter->pTqSnapOffsetWriter);
if (code) goto _err;
}
code = tqOffsetSnapWrite(pWriter->pTqOffsetWriter, pData, nData);
code = tqSnapOffsetWrite(pWriter->pTqSnapOffsetWriter, pData, nData);
if (code) goto _err;
} break;
case SNAP_DATA_STREAM_TASK:

View File

@ -52,15 +52,14 @@ int32_t walSetRefVer(SWalRef *pRef, int64_t ver) {
taosThreadMutexLock(&pWal->mutex);
if (ver < pWal->vers.firstVer || ver > pWal->vers.lastVer) {
taosThreadMutexUnlock(&pWal->mutex);
terrno = TSDB_CODE_WAL_INVALID_VER;
return -1;
return TSDB_CODE_WAL_INVALID_VER;
}
pRef->refVer = ver;
taosThreadMutexUnlock(&pWal->mutex);
}
return 0;
return TSDB_CODE_SUCCESS;
}
void walRefFirstVer(SWal *pWal, SWalRef *pRef) {