refactor: do some internal refactor.
This commit is contained in:
parent
66477a28ca
commit
39a1fa8f78
|
@ -291,7 +291,7 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr
|
||||||
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot);
|
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot);
|
||||||
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData);
|
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData);
|
||||||
|
|
||||||
int32_t buildSnapContext(SMeta *pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta,
|
int32_t buildSnapContext(SVnode *pVnode, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta,
|
||||||
SSnapContext **ctxRet);
|
SSnapContext **ctxRet);
|
||||||
int32_t getMetafromSnapShot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid);
|
int32_t getMetafromSnapShot(SSnapContext *ctx, void **pBuf, int32_t *contLen, int16_t *type, int64_t *uid);
|
||||||
SMetaTableInfo getUidfromSnapShot(SSnapContext *ctx);
|
SMetaTableInfo getUidfromSnapShot(SSnapContext *ctx);
|
||||||
|
|
|
@ -260,12 +260,12 @@ static void saveSuperTableInfoForChildTable(SMetaEntry* me, SHashObj* suidInfo)
|
||||||
taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable));
|
taosHashPut(suidInfo, &me->uid, sizeof(tb_uid_t), &dataTmp, sizeof(STableInfoForChildTable));
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta,
|
int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8_t subType, bool withMeta,
|
||||||
SSnapContext** ctxRet) {
|
SSnapContext** ctxRet) {
|
||||||
SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext));
|
SSnapContext* ctx = taosMemoryCalloc(1, sizeof(SSnapContext));
|
||||||
if (ctx == NULL) return -1;
|
if (ctx == NULL) return -1;
|
||||||
*ctxRet = ctx;
|
*ctxRet = ctx;
|
||||||
ctx->pMeta = pMeta;
|
ctx->pMeta = pVnode->pMeta;
|
||||||
ctx->snapVersion = snapVersion;
|
ctx->snapVersion = snapVersion;
|
||||||
ctx->suid = suid;
|
ctx->suid = suid;
|
||||||
ctx->subType = subType;
|
ctx->subType = subType;
|
||||||
|
@ -301,7 +301,7 @@ int32_t buildSnapContext(SMeta* pMeta, int64_t snapVersion, int64_t suid, int8_t
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdbTbGet(pMeta->pUidIdx, &tmp->uid, sizeof(tb_uid_t), NULL, NULL) <
|
if (tdbTbGet(ctx->pMeta->pUidIdx, &tmp->uid, sizeof(tb_uid_t), NULL, NULL) <
|
||||||
0) { // check if table exist for now, need optimize later
|
0) { // check if table exist for now, need optimize later
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -276,12 +276,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
SReadHandle handle = {
|
SReadHandle handle = { .vnode = pVnode, .initTqReader = 1, .pStateBackend = pStreamState };
|
||||||
.meta = pVnode->pMeta,
|
|
||||||
.vnode = pVnode,
|
|
||||||
.initTqReader = 1,
|
|
||||||
.pStateBackend = pStreamState,
|
|
||||||
};
|
|
||||||
pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode));
|
pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode));
|
||||||
if (!pRSmaInfo->taskInfo[idx]) {
|
if (!pRSmaInfo->taskInfo[idx]) {
|
||||||
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
|
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
|
||||||
|
@ -853,12 +848,7 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t
|
||||||
code = qSerializeTaskStatus(srcTaskInfo, &pOutput, &len);
|
code = qSerializeTaskStatus(srcTaskInfo, &pOutput, &len);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
SReadHandle handle = {
|
SReadHandle handle = { .vnode = pVnode, .initTqReader = 1 };
|
||||||
.meta = pVnode->pMeta,
|
|
||||||
.vnode = pVnode,
|
|
||||||
.initTqReader = 1,
|
|
||||||
};
|
|
||||||
|
|
||||||
if (ASSERTS(!dstTaskInfo, "dstTaskInfo:%p is not NULL", dstTaskInfo)) {
|
if (ASSERTS(!dstTaskInfo, "dstTaskInfo:%p is not NULL", dstTaskInfo)) {
|
||||||
code = TSDB_CODE_APP_ERROR;
|
code = TSDB_CODE_APP_ERROR;
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
|
@ -670,8 +670,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
int64_t ver = pRef->refVer;
|
int64_t ver = pRef->refVer;
|
||||||
pHandle->pRef = pRef;
|
pHandle->pRef = pRef;
|
||||||
|
|
||||||
SReadHandle handle = {
|
SReadHandle handle = {.vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver};
|
||||||
.meta = pVnode->pMeta, .vnode = pVnode, .initTableReader = true, .initTqReader = true, .version = ver};
|
|
||||||
pHandle->snapshotVer = ver;
|
pHandle->snapshotVer = ver;
|
||||||
|
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
@ -689,7 +688,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
|
|
||||||
pHandle->execHandle.execDb.pFilterOutTbUid =
|
pHandle->execHandle.execDb.pFilterOutTbUid =
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||||
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
|
buildSnapContext(handle.vnode, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
|
||||||
(SSnapContext**)(&handle.sContext));
|
(SSnapContext**)(&handle.sContext));
|
||||||
|
|
||||||
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
|
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
|
||||||
|
@ -708,7 +707,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList);
|
tqReaderSetTbUidList(pHandle->execHandle.pTqReader, tbUidList);
|
||||||
taosArrayDestroy(tbUidList);
|
taosArrayDestroy(tbUidList);
|
||||||
|
|
||||||
buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
|
buildSnapContext(handle.vnode, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
|
||||||
(SSnapContext**)(&handle.sContext));
|
(SSnapContext**)(&handle.sContext));
|
||||||
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
|
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
|
||||||
}
|
}
|
||||||
|
@ -787,8 +786,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SReadHandle handle = {
|
SReadHandle handle = {.vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState};
|
||||||
.meta = pTq->pVnode->pMeta, .vnode = pTq->pVnode, .initTqReader = 1, .pStateBackend = pTask->pState};
|
|
||||||
|
|
||||||
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
|
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId);
|
||||||
if (pTask->exec.pExecutor == NULL) {
|
if (pTask->exec.pExecutor == NULL) {
|
||||||
|
|
|
@ -298,11 +298,10 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
walSetRefVer(handle.pRef, handle.snapshotVer);
|
walSetRefVer(handle.pRef, handle.snapshotVer);
|
||||||
|
|
||||||
SReadHandle reader = {
|
SReadHandle reader = {
|
||||||
.meta = pTq->pVnode->pMeta,
|
|
||||||
.vnode = pTq->pVnode,
|
.vnode = pTq->pVnode,
|
||||||
.initTableReader = true,
|
.initTableReader = true,
|
||||||
.initTqReader = true,
|
.initTqReader = true,
|
||||||
.version = handle.snapshotVer,
|
.version = handle.snapshotVer
|
||||||
};
|
};
|
||||||
|
|
||||||
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
@ -330,7 +329,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||||
handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode);
|
handle.execHandle.pTqReader = tqReaderOpen(pTq->pVnode);
|
||||||
|
|
||||||
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta,
|
buildSnapContext(reader.vnode, reader.version, 0, handle.execHandle.subType, handle.fetchMeta,
|
||||||
(SSnapContext**)(&reader.sContext));
|
(SSnapContext**)(&reader.sContext));
|
||||||
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
|
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
|
||||||
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
|
@ -347,7 +346,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList);
|
tqReaderSetTbUidList(handle.execHandle.pTqReader, tbUidList);
|
||||||
taosArrayDestroy(tbUidList);
|
taosArrayDestroy(tbUidList);
|
||||||
|
|
||||||
buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,
|
buildSnapContext(reader.vnode, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,
|
||||||
handle.fetchMeta, (SSnapContext**)(&reader.sContext));
|
handle.fetchMeta, (SSnapContext**)(&reader.sContext));
|
||||||
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
|
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
|
@ -250,7 +250,7 @@ static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
uint8_t *pCont;
|
uint8_t *pCont;
|
||||||
SEncoder *pCoder = &(SEncoder){0};
|
SEncoder *pCoder = &(SEncoder){0};
|
||||||
SDeleteRes res = {0};
|
SDeleteRes res = {0};
|
||||||
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
SReadHandle handle = {.config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
||||||
|
|
||||||
code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
|
code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
|
@ -1203,7 +1203,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
|
||||||
|
|
||||||
void *pAllocMsg = NULL;
|
void *pAllocMsg = NULL;
|
||||||
SSubmitReq2Msg *pMsg = (SSubmitReq2Msg *)pReq;
|
SSubmitReq2Msg *pMsg = (SSubmitReq2Msg *)pReq;
|
||||||
if (0 == pMsg->ver) {
|
if (0 == pMsg->version) {
|
||||||
code = vnodeSubmitReqConvertToSubmitReq2(pVnode, (SSubmitReq *)pMsg, pSubmitReq);
|
code = vnodeSubmitReqConvertToSubmitReq2(pVnode, (SSubmitReq *)pMsg, pSubmitReq);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = vnodeRebuildSubmitReqMsg(pSubmitReq, &pReq);
|
code = vnodeRebuildSubmitReqMsg(pSubmitReq, &pReq);
|
||||||
|
|
Loading…
Reference in New Issue