fix:load pHandle if necessary wher tq restore
This commit is contained in:
parent
fb123d8761
commit
8a81278024
|
@ -214,7 +214,7 @@ int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
|
|||
|
||||
void walRefFirstVer(SWal *, SWalRef *);
|
||||
void walRefLastVer(SWal *, SWalRef *);
|
||||
SWalRef *walRefCommittedVer(SWal *);
|
||||
void walRefCommitVer(SWal *, SWalRef *);
|
||||
|
||||
SWalRef *walOpenRef(SWal *);
|
||||
void walCloseRef(SWal *pWal, int64_t refId);
|
||||
|
|
|
@ -31,7 +31,7 @@ typedef void *(*__array_item_dup_fn_t)(void *);
|
|||
|
||||
typedef void (*FDelete)(void *);
|
||||
typedef int32_t (*FEncode)(void **buf, const void *dst);
|
||||
typedef void *(*FDecode)(const void *buf, void *dst);
|
||||
typedef void *(*FDecode)(const void *buf, void *dst, int8_t sver);
|
||||
|
||||
#define TD_EQ 0x1
|
||||
#define TD_GT 0x2
|
||||
|
|
|
@ -244,7 +244,7 @@ int32_t taosArraySearchIdx(const SArray* pArray, const void* key, __compar_fn_t
|
|||
void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* param);
|
||||
|
||||
int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode);
|
||||
void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz);
|
||||
void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz, int8_t sver);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -572,7 +572,7 @@ typedef struct {
|
|||
//SMqVgEp* tCloneSMqVgEp(const SMqVgEp* pVgEp);
|
||||
//void tDeleteSMqVgEp(SMqVgEp* pVgEp);
|
||||
int32_t tEncodeSMqVgEp(void** buf, const SMqVgEp* pVgEp);
|
||||
void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp);
|
||||
void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp, int8_t sver);
|
||||
|
||||
typedef struct {
|
||||
int64_t consumerId; // -1 for unassigned
|
||||
|
|
|
@ -207,9 +207,13 @@ int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
|
|||
return tlen;
|
||||
}
|
||||
|
||||
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) {
|
||||
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp, int8_t sver) {
|
||||
buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
|
||||
// buf = taosDecodeString(buf, &pVgEp->qmsg);
|
||||
if(sver == 1){
|
||||
uint64_t size = 0;
|
||||
buf = taosDecodeVariantU64(buf, &size);
|
||||
buf = POINTER_SHIFT(buf, size);
|
||||
}
|
||||
buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
|
||||
return (void *)buf;
|
||||
}
|
||||
|
@ -390,18 +394,18 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer, int8_t s
|
|||
return (void *)buf;
|
||||
}
|
||||
|
||||
SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
|
||||
SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
|
||||
if (pConsumerEpNew == NULL) return NULL;
|
||||
pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
|
||||
pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, NULL);
|
||||
return pConsumerEpNew;
|
||||
}
|
||||
|
||||
void tDeleteSMqConsumerEp(void *data) {
|
||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
|
||||
taosArrayDestroy(pConsumerEp->vgs);
|
||||
}
|
||||
//SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
|
||||
// SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
|
||||
// if (pConsumerEpNew == NULL) return NULL;
|
||||
// pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
|
||||
// pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, NULL);
|
||||
// return pConsumerEpNew;
|
||||
//}
|
||||
//
|
||||
//void tDeleteSMqConsumerEp(void *data) {
|
||||
// SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
|
||||
// taosArrayDestroy(pConsumerEp->vgs);
|
||||
//}
|
||||
|
||||
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
|
||||
int32_t tlen = 0;
|
||||
|
@ -436,7 +440,7 @@ int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
|
|||
|
||||
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp, int8_t sver) {
|
||||
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
|
||||
buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));
|
||||
buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
|
||||
if (sver > 1){
|
||||
int32_t szVgs = 0;
|
||||
buf = taosDecodeFixedI32(buf, &szVgs);
|
||||
|
@ -580,6 +584,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
|
|||
// do nothing
|
||||
}
|
||||
}
|
||||
tlen += taosEncodeString(buf, pSub->qmsg);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
|
@ -602,7 +607,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
|
|||
taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp));
|
||||
}
|
||||
|
||||
buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));
|
||||
buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp), sver);
|
||||
buf = taosDecodeStringTo(buf, pSub->dbName);
|
||||
|
||||
if (sver > 1){
|
||||
|
@ -626,6 +631,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub, int8_t sver) {
|
|||
}
|
||||
}
|
||||
}
|
||||
buf = taosDecodeString(buf, &pSub->qmsg);
|
||||
}
|
||||
return (void *)buf;
|
||||
}
|
||||
|
|
|
@ -139,6 +139,7 @@ static STqMgmt tqMgmt = {0};
|
|||
|
||||
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);
|
||||
int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
|
||||
void tqDestroyTqHandle(void* data);
|
||||
|
||||
// tqRead
|
||||
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
|
||||
|
@ -161,6 +162,8 @@ int32_t tqMetaRestoreHandle(STQ* pTq);
|
|||
int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_t vLen);
|
||||
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key);
|
||||
int32_t tqMetaRestoreCheckInfo(STQ* pTq);
|
||||
int32_t tqMetaGetHandle(STQ* pTq, const char* key);
|
||||
int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle);
|
||||
|
||||
STqOffsetStore* tqOffsetOpen(STQ* pTq);
|
||||
void tqOffsetClose(STqOffsetStore*);
|
||||
|
|
|
@ -62,7 +62,7 @@ void tqCleanUp() {
|
|||
}
|
||||
}
|
||||
|
||||
static void destroyTqHandle(void* data) {
|
||||
void tqDestroyTqHandle(void* data) {
|
||||
STqHandle* pData = (STqHandle*)data;
|
||||
qDestroyTask(pData->execHandle.task);
|
||||
|
||||
|
@ -102,7 +102,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
|||
pTq->walLogLastVer = pVnode->pWal->vers.lastVer;
|
||||
|
||||
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||
taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
|
||||
taosHashSetFreeFp(pTq->pHandle, tqDestroyTqHandle);
|
||||
|
||||
taosInitRWLatch(&pTq->lock);
|
||||
pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
|
||||
|
@ -661,13 +661,17 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
return -1;
|
||||
}
|
||||
|
||||
SVnode* pVnode = pTq->pVnode;
|
||||
int32_t vgId = TD_VID(pVnode);
|
||||
|
||||
tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
|
||||
tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
|
||||
req.oldConsumerId, req.newConsumerId);
|
||||
|
||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||
STqHandle* pHandle = NULL;
|
||||
while(1){
|
||||
pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||
if (pHandle || tqMetaGetHandle(pTq, req.subKey) < 0){
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (pHandle == NULL) {
|
||||
if (req.oldConsumerId != -1) {
|
||||
tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
|
||||
|
@ -678,86 +682,13 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
|
||||
goto end;
|
||||
}
|
||||
|
||||
STqHandle tqHandle = {0};
|
||||
pHandle = &tqHandle;
|
||||
|
||||
memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
|
||||
pHandle->consumerId = req.newConsumerId;
|
||||
pHandle->epoch = -1;
|
||||
|
||||
pHandle->execHandle.subType = req.subType;
|
||||
pHandle->fetchMeta = req.withMeta;
|
||||
|
||||
// TODO version should be assigned and refed during preprocess
|
||||
SWalRef* pRef = walRefCommittedVer(pVnode->pWal);
|
||||
if (pRef == NULL) {
|
||||
ret = -1;
|
||||
STqHandle handle = {0};
|
||||
ret = tqCreateHandle(pTq, &req, &handle);
|
||||
if(ret < 0){
|
||||
tqDestroyTqHandle(&handle);
|
||||
goto end;
|
||||
}
|
||||
|
||||
int64_t ver = pRef->refVer;
|
||||
pHandle->pRef = pRef;
|
||||
|
||||
SReadHandle handle = {.vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver};
|
||||
initStorageAPI(&handle.api);
|
||||
|
||||
pHandle->snapshotVer = ver;
|
||||
|
||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
pHandle->execHandle.execCol.qmsg = taosStrdup(req.qmsg);
|
||||
|
||||
pHandle->execHandle.task = qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId,
|
||||
&pHandle->execHandle.numOfCols, req.newConsumerId);
|
||||
void* scanner = NULL;
|
||||
qExtractStreamScanner(pHandle->execHandle.task, &scanner);
|
||||
pHandle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
|
||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||
pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
|
||||
pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
|
||||
|
||||
pHandle->execHandle.execDb.pFilterOutTbUid =
|
||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||
buildSnapContext(handle.vnode, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
|
||||
(SSnapContext**)(&handle.sContext));
|
||||
|
||||
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
|
||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
|
||||
pHandle->execHandle.execTb.suid = req.suid;
|
||||
pHandle->execHandle.execTb.qmsg = taosStrdup(req.qmsg);
|
||||
|
||||
if (strcmp(pHandle->execHandle.execTb.qmsg, "") != 0) {
|
||||
if (nodesStringToNode(pHandle->execHandle.execTb.qmsg, &pHandle->execHandle.execTb.node) != 0) {
|
||||
tqError("nodesStringToNode error in sub stable, since %s, vgId:%d, subkey:%s consumer:0x%" PRIx64, terrstr(),
|
||||
pVnode->config.vgId, req.subKey, pHandle->consumerId);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
|
||||
(SSnapContext**)(&handle.sContext));
|
||||
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
|
||||
|
||||
SArray* tbUidList = NULL;
|
||||
ret = qGetTableList(req.suid, pVnode, pHandle->execHandle.execTb.node, &tbUidList, pHandle->execHandle.task);
|
||||
if (ret != TDB_CODE_SUCCESS) {
|
||||
tqError("qGetTableList error:%d vgId:%d, subkey:%s consumer:0x%" PRIx64, ret, pVnode->config.vgId, req.subKey,
|
||||
pHandle->consumerId);
|
||||
taosArrayDestroy(tbUidList);
|
||||
goto end;
|
||||
}
|
||||
tqDebug("tq try to get ctb for stb subscribe, vgId:%d, subkey:%s consumer:0x%" PRIx64 " suid:%" PRId64,
|
||||
pVnode->config.vgId, req.subKey, pHandle->consumerId, req.suid);
|
||||
pHandle->execHandle.pTqReader = tqReaderOpen(pVnode);
|
||||
tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList, NULL);
|
||||
taosArrayDestroy(tbUidList);
|
||||
}
|
||||
|
||||
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
||||
tqDebug("try to persist handle %s consumer:0x%" PRIx64, req.subKey, pHandle->consumerId);
|
||||
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
||||
goto end;
|
||||
ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
|
||||
} else {
|
||||
taosWLockLatch(&pTq->lock);
|
||||
|
||||
|
|
|
@ -88,9 +88,9 @@ int32_t tqMetaOpen(STQ* pTq) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (tqMetaRestoreHandle(pTq) < 0) {
|
||||
return -1;
|
||||
}
|
||||
// if (tqMetaRestoreHandle(pTq) < 0) {
|
||||
// return -1;
|
||||
// }
|
||||
|
||||
if (tqMetaRestoreCheckInfo(pTq) < 0) {
|
||||
return -1;
|
||||
|
@ -274,6 +274,120 @@ int32_t tqMetaDeleteHandle(STQ* pTq, const char* key) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int buildHandle(STQ* pTq, STqHandle* handle){
|
||||
SVnode* pVnode = pTq->pVnode;
|
||||
int32_t vgId = TD_VID(pVnode);
|
||||
|
||||
handle->pRef = walOpenRef(pVnode->pWal);
|
||||
if (handle->pRef == NULL) {
|
||||
return -1;
|
||||
}
|
||||
walSetRefVer(handle->pRef, handle->snapshotVer);
|
||||
|
||||
SReadHandle reader = {
|
||||
.vnode = pVnode,
|
||||
.initTableReader = true,
|
||||
.initTqReader = true,
|
||||
.version = handle->snapshotVer,
|
||||
};
|
||||
|
||||
initStorageAPI(&reader.api);
|
||||
|
||||
if (handle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
handle->execHandle.task =
|
||||
qCreateQueueExecTaskInfo(handle->execHandle.execCol.qmsg, &reader, vgId, &handle->execHandle.numOfCols, handle->consumerId);
|
||||
if (handle->execHandle.task == NULL) {
|
||||
tqError("cannot create exec task for %s", handle->subKey);
|
||||
return -1;
|
||||
}
|
||||
void* scanner = NULL;
|
||||
qExtractStreamScanner(handle->execHandle.task, &scanner);
|
||||
if (scanner == NULL) {
|
||||
tqError("cannot extract stream scanner for %s", handle->subKey);
|
||||
return -1;
|
||||
}
|
||||
handle->execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
|
||||
if (handle->execHandle.pTqReader == NULL) {
|
||||
tqError("cannot extract exec reader for %s", handle->subKey);
|
||||
return -1;
|
||||
}
|
||||
} else if (handle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||
handle->pWalReader = walOpenReader(pVnode->pWal, NULL);
|
||||
handle->execHandle.pTqReader = tqReaderOpen(pVnode);
|
||||
|
||||
buildSnapContext(reader.vnode, reader.version, 0, handle->execHandle.subType, handle->fetchMeta,
|
||||
(SSnapContext**)(&reader.sContext));
|
||||
handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
|
||||
} else if (handle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
handle->pWalReader = walOpenReader(pVnode->pWal, NULL);
|
||||
|
||||
if(handle->execHandle.execTb.qmsg != NULL && strcmp(handle->execHandle.execTb.qmsg, "") != 0) {
|
||||
if (nodesStringToNode(handle->execHandle.execTb.qmsg, &handle->execHandle.execTb.node) != 0) {
|
||||
tqError("nodesStringToNode error in sub stable, since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
buildSnapContext(reader.vnode, reader.version, handle->execHandle.execTb.suid, handle->execHandle.subType,
|
||||
handle->fetchMeta, (SSnapContext**)(&reader.sContext));
|
||||
handle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, handle->consumerId);
|
||||
|
||||
SArray* tbUidList = NULL;
|
||||
int ret = qGetTableList(handle->execHandle.execTb.suid, pVnode, handle->execHandle.execTb.node, &tbUidList, handle->execHandle.task);
|
||||
if(ret != TDB_CODE_SUCCESS) {
|
||||
tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle->subKey, handle->consumerId);
|
||||
taosArrayDestroy(tbUidList);
|
||||
return -1;
|
||||
}
|
||||
tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId, handle->execHandle.execTb.suid);
|
||||
handle->execHandle.pTqReader = tqReaderOpen(pVnode);
|
||||
tqReaderSetTbUidList(handle->execHandle.pTqReader, tbUidList, NULL);
|
||||
taosArrayDestroy(tbUidList);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
||||
tDecodeSTqHandle(&decoder, handle);
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
if(buildHandle(pTq, handle) < 0){
|
||||
return -1;
|
||||
}
|
||||
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
|
||||
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
|
||||
}
|
||||
|
||||
int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
|
||||
memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN);
|
||||
handle->consumerId = req->newConsumerId;
|
||||
handle->epoch = -1;
|
||||
|
||||
handle->execHandle.subType = req->subType;
|
||||
handle->fetchMeta = req->withMeta;
|
||||
if(req->subType == TOPIC_SUB_TYPE__COLUMN){
|
||||
handle->execHandle.execCol.qmsg = taosStrdup(req->qmsg);
|
||||
}else if(req->subType == TOPIC_SUB_TYPE__DB){
|
||||
handle->execHandle.execDb.pFilterOutTbUid =
|
||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||
}else if(req->subType == TOPIC_SUB_TYPE__TABLE){
|
||||
handle->execHandle.execTb.suid = req->suid;
|
||||
handle->execHandle.execTb.qmsg = taosStrdup(req->qmsg);
|
||||
}
|
||||
|
||||
handle->snapshotVer = walGetLastVer(pTq->pVnode->pWal);
|
||||
|
||||
if(buildHandle(pTq, handle) < 0){
|
||||
return -1;
|
||||
}
|
||||
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
|
||||
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
|
||||
}
|
||||
|
||||
int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||
int code = 0;
|
||||
TBC* pCur = NULL;
|
||||
|
@ -281,97 +395,40 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
void* pKey = NULL;
|
||||
int kLen = 0;
|
||||
void* pVal = NULL;
|
||||
int vLen = 0;
|
||||
SDecoder decoder;
|
||||
|
||||
tdbTbcMoveToFirst(pCur);
|
||||
|
||||
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
||||
STqHandle handle = {0};
|
||||
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
||||
tDecodeSTqHandle(&decoder, &handle);
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
handle.pRef = walOpenRef(pTq->pVnode->pWal);
|
||||
if (handle.pRef == NULL) {
|
||||
code = -1;
|
||||
goto end;
|
||||
code = restoreHandle(pTq, pVal, vLen, &handle);
|
||||
if (code < 0){
|
||||
tqDestroyTqHandle(&handle);
|
||||
break;
|
||||
}
|
||||
walSetRefVer(handle.pRef, handle.snapshotVer);
|
||||
|
||||
SReadHandle reader = {
|
||||
.vnode = pTq->pVnode,
|
||||
.initTableReader = true,
|
||||
.initTqReader = true,
|
||||
.version = handle.snapshotVer
|
||||
};
|
||||
|
||||
initStorageAPI(&reader.api);
|
||||
|
||||
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
handle.execHandle.task =
|
||||
qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, 0);
|
||||
if (handle.execHandle.task == NULL) {
|
||||
tqError("cannot create exec task for %s", handle.subKey);
|
||||
code = -1;
|
||||
goto end;
|
||||
}
|
||||
void* scanner = NULL;
|
||||
qExtractStreamScanner(handle.execHandle.task, &scanner);
|
||||
if (scanner == NULL) {
|
||||
tqError("cannot extract stream scanner for %s", handle.subKey);
|
||||
code = -1;
|
||||
goto end;
|
||||
}
|
||||
handle.execHandle.pTqReader = qExtractReaderFromStreamScanner(scanner);
|
||||
if (handle.execHandle.pTqReader == NULL) {
|
||||
tqError("cannot extract exec reader for %s", handle.subKey);
|
||||
code = -1;
|
||||
goto end;
|
||||
}
|
||||
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||
handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode);
|
||||
|
||||
buildSnapContext(reader.vnode, reader.version, 0, handle.execHandle.subType, handle.fetchMeta,
|
||||
(SSnapContext**)(&reader.sContext));
|
||||
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
|
||||
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||
|
||||
if(handle.execHandle.execTb.qmsg != NULL && strcmp(handle.execHandle.execTb.qmsg, "") != 0) {
|
||||
if (nodesStringToNode(handle.execHandle.execTb.qmsg, &handle.execHandle.execTb.node) != 0) {
|
||||
tqError("nodesStringToNode error in sub stable, since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
buildSnapContext(reader.vnode, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,
|
||||
handle.fetchMeta, (SSnapContext**)(&reader.sContext));
|
||||
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
|
||||
|
||||
SArray* tbUidList = NULL;
|
||||
int ret = qGetTableList(handle.execHandle.execTb.suid, pTq->pVnode, handle.execHandle.execTb.node, &tbUidList, handle.execHandle.task);
|
||||
if(ret != TDB_CODE_SUCCESS) {
|
||||
tqError("qGetTableList error:%d handle %s consumer:0x%" PRIx64, ret, handle.subKey, handle.consumerId);
|
||||
taosArrayDestroy(tbUidList);
|
||||
goto end;
|
||||
}
|
||||
tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid);
|
||||
handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode);
|
||||
tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList, NULL);
|
||||
taosArrayDestroy(tbUidList);
|
||||
}
|
||||
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId);
|
||||
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
|
||||
}
|
||||
|
||||
end:
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
tdbTbcClose(pCur);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tqMetaGetHandle(STQ* pTq, const char* key) {
|
||||
void* pVal = NULL;
|
||||
int vLen = 0;
|
||||
|
||||
if (tdbTbGet(pTq->pExecStore, key, (int)strlen(key), &pVal, &vLen) < 0) {
|
||||
return -1;
|
||||
}
|
||||
STqHandle handle = {0};
|
||||
int code = restoreHandle(pTq, pVal, vLen, &handle);
|
||||
if (code < 0){
|
||||
tqDestroyTqHandle(&handle);
|
||||
}
|
||||
tdbFree(pVal);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -81,26 +81,11 @@ void walRefLastVer(SWal *pWal, SWalRef *pRef) {
|
|||
wDebug("vgId:%d, wal ref version %" PRId64 " for last", pWal->cfg.vgId, ver);
|
||||
}
|
||||
|
||||
SWalRef *walRefCommittedVer(SWal *pWal) {
|
||||
SWalRef *pRef = walOpenRef(pWal);
|
||||
if (pRef == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
void walRefCommitVer(SWal *pWal, SWalRef *pRef) {
|
||||
taosThreadMutexLock(&pWal->mutex);
|
||||
|
||||
int64_t ver = walGetCommittedVer(pWal);
|
||||
|
||||
wDebug("vgId:%d, wal ref version %" PRId64 " for committed", pWal->cfg.vgId, ver);
|
||||
|
||||
pRef->refVer = ver;
|
||||
// bsearch in fileSet
|
||||
SWalFileInfo tmpInfo;
|
||||
tmpInfo.firstVer = ver;
|
||||
SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE);
|
||||
ASSERT(pRet != NULL);
|
||||
// pRef->refFile = pRet->firstVer;
|
||||
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return pRef;
|
||||
wDebug("vgId:%d, wal ref version %" PRId64 " for committed", pWal->cfg.vgId, ver);
|
||||
}
|
||||
|
|
|
@ -476,13 +476,13 @@ int32_t taosEncodeArray(void** buf, const SArray* pArray, FEncode encode) {
|
|||
return tlen;
|
||||
}
|
||||
|
||||
void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz) {
|
||||
void* taosDecodeArray(const void* buf, SArray** pArray, FDecode decode, int32_t dataSz, int8_t sver) {
|
||||
int32_t sz;
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
*pArray = taosArrayInit(sz, sizeof(void*));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
void* data = taosMemoryCalloc(1, dataSz);
|
||||
buf = decode(buf, data);
|
||||
buf = decode(buf, data, sver);
|
||||
taosArrayPush(*pArray, &data);
|
||||
}
|
||||
return (void*)buf;
|
||||
|
|
Loading…
Reference in New Issue