refact
This commit is contained in:
parent
913749ea91
commit
2ad1f30fae
|
@ -80,7 +80,6 @@ struct SVnodeInfo {
|
|||
};
|
||||
|
||||
struct SVnode {
|
||||
int32_t vgId;
|
||||
char* path;
|
||||
SVnodeCfg config;
|
||||
SVState state;
|
||||
|
@ -96,6 +95,8 @@ struct SVnode {
|
|||
STfs* pTfs;
|
||||
};
|
||||
|
||||
#define TD_VID(PVNODE) (PVNODE)->config.vgId
|
||||
|
||||
// sma
|
||||
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
|
||||
|
||||
|
|
|
@ -267,7 +267,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
}
|
||||
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
|
||||
pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);
|
||||
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
|
||||
|
||||
SMqPollRsp rsp = {
|
||||
/*.consumerId = consumerId,*/
|
||||
|
@ -277,7 +277,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
|
||||
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
|
||||
if (pConsumer == NULL) {
|
||||
vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, pTq->pVnode->vgId);
|
||||
vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode));
|
||||
pMsg->pCont = NULL;
|
||||
pMsg->contLen = 0;
|
||||
pMsg->code = -1;
|
||||
|
@ -303,7 +303,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
}
|
||||
if (pTopic == NULL) {
|
||||
vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic,
|
||||
pTq->pVnode->vgId);
|
||||
TD_VID(pTq->pVnode));
|
||||
pMsg->pCont = NULL;
|
||||
pMsg->contLen = 0;
|
||||
pMsg->code = -1;
|
||||
|
@ -312,7 +312,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
}
|
||||
|
||||
vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch,
|
||||
pTq->pVnode->vgId);
|
||||
TD_VID(pTq->pVnode));
|
||||
|
||||
rsp.reqOffset = pReq->currentOffset;
|
||||
rsp.skipLogNum = 0;
|
||||
|
@ -323,7 +323,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
consumerEpoch = atomic_load_32(&pConsumer->epoch);
|
||||
if (consumerEpoch > reqEpoch) {
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
|
||||
consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, consumerEpoch, reqEpoch);
|
||||
consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
|
||||
break;
|
||||
}
|
||||
SWalReadHead* pHead;
|
||||
|
@ -332,11 +332,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
// if data inserted during waiting, launch query and
|
||||
// response to user
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
|
||||
pTq->pVnode->vgId, fetchOffset);
|
||||
TD_VID(pTq->pVnode), fetchOffset);
|
||||
break;
|
||||
}
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
|
||||
pTq->pVnode->vgId, fetchOffset, pHead->msgType);
|
||||
TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
|
||||
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
|
||||
/*pHead = pTopic->pReadhandle->pHead;*/
|
||||
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
||||
|
@ -361,7 +361,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
|
||||
if (taosArrayGetSize(pRes) == 0) {
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId,
|
||||
pReq->epoch, pTq->pVnode->vgId, fetchOffset);
|
||||
pReq->epoch, TD_VID(pTq->pVnode), fetchOffset);
|
||||
fetchOffset++;
|
||||
rsp.skipLogNum++;
|
||||
taosArrayDestroy(pRes);
|
||||
|
@ -390,7 +390,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
pMsg->pCont = buf;
|
||||
pMsg->contLen = tlen;
|
||||
pMsg->code = 0;
|
||||
vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset,
|
||||
vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset,
|
||||
pHead->msgType, consumerId, pReq->epoch);
|
||||
tmsgSendRsp(pMsg);
|
||||
taosMemoryFree(pHead);
|
||||
|
@ -422,7 +422,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
pMsg->contLen = tlen;
|
||||
pMsg->code = 0;
|
||||
tmsgSendRsp(pMsg);
|
||||
vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId,
|
||||
vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId,
|
||||
pReq->epoch);
|
||||
/*}*/
|
||||
|
||||
|
@ -446,14 +446,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
}
|
||||
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
|
||||
pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);
|
||||
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
|
||||
|
||||
SMqPollRspV2 rspV2 = {0};
|
||||
rspV2.dataLen = 0;
|
||||
|
||||
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
|
||||
if (pConsumer == NULL) {
|
||||
vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, pTq->pVnode->vgId);
|
||||
vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode));
|
||||
pMsg->pCont = NULL;
|
||||
pMsg->contLen = 0;
|
||||
pMsg->code = -1;
|
||||
|
@ -479,7 +479,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
}
|
||||
if (pTopic == NULL) {
|
||||
vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic,
|
||||
pTq->pVnode->vgId);
|
||||
TD_VID(pTq->pVnode));
|
||||
pMsg->pCont = NULL;
|
||||
pMsg->contLen = 0;
|
||||
pMsg->code = -1;
|
||||
|
@ -488,7 +488,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
}
|
||||
|
||||
vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch,
|
||||
pTq->pVnode->vgId);
|
||||
TD_VID(pTq->pVnode));
|
||||
|
||||
rspV2.reqOffset = pReq->currentOffset;
|
||||
rspV2.skipLogNum = 0;
|
||||
|
@ -499,7 +499,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
consumerEpoch = atomic_load_32(&pConsumer->epoch);
|
||||
if (consumerEpoch > reqEpoch) {
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
|
||||
consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, consumerEpoch, reqEpoch);
|
||||
consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
|
||||
break;
|
||||
}
|
||||
SWalReadHead* pHead;
|
||||
|
@ -508,11 +508,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
// if data inserted during waiting, launch query and
|
||||
// response to user
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
|
||||
pTq->pVnode->vgId, fetchOffset);
|
||||
TD_VID(pTq->pVnode), fetchOffset);
|
||||
break;
|
||||
}
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
|
||||
pTq->pVnode->vgId, fetchOffset, pHead->msgType);
|
||||
TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
|
||||
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
|
||||
/*pHead = pTopic->pReadhandle->pHead;*/
|
||||
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
||||
|
@ -537,7 +537,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
|
||||
if (taosArrayGetSize(pRes) == 0) {
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId,
|
||||
pReq->epoch, pTq->pVnode->vgId, fetchOffset);
|
||||
pReq->epoch, TD_VID(pTq->pVnode), fetchOffset);
|
||||
fetchOffset++;
|
||||
rspV2.skipLogNum++;
|
||||
taosArrayDestroy(pRes);
|
||||
|
@ -597,7 +597,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
pMsg->pCont = buf;
|
||||
pMsg->contLen = msgLen;
|
||||
pMsg->code = 0;
|
||||
vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset,
|
||||
vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset,
|
||||
pHead->msgType, consumerId, pReq->epoch);
|
||||
tmsgSendRsp(pMsg);
|
||||
taosMemoryFree(pHead);
|
||||
|
@ -631,7 +631,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
pMsg->contLen = tlen;
|
||||
pMsg->code = 0;
|
||||
tmsgSendRsp(pMsg);
|
||||
vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId,
|
||||
vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId,
|
||||
pReq->epoch);
|
||||
/*}*/
|
||||
|
||||
|
@ -742,7 +742,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
|
|||
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle);
|
||||
ASSERT(pTopic->buffer.output[i].task);
|
||||
}
|
||||
vDebug("set topic %s to consumer %ld on vg %d", pTopic->topicName, req.consumerId, pTq->pVnode->vgId);
|
||||
vDebug("set topic %s to consumer %ld on vg %d", pTopic->topicName, req.consumerId, TD_VID(pTq->pVnode));
|
||||
taosArrayPush(pConsumer->topics, pTopic);
|
||||
if (create) {
|
||||
tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer);
|
||||
|
|
|
@ -70,8 +70,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
|||
|
||||
info.config.pTfs = pTfs;
|
||||
info.config.msgCb = msgCb;
|
||||
// memset(&info.config.walCfg, 0, sizeof(SWalCfg));
|
||||
// info.config.walCfg.level = TAOS_WAL_WRITE;
|
||||
|
||||
// crate handle
|
||||
pVnode = vnodeNew(dir, &info.config);
|
||||
|
@ -106,7 +104,6 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pVnode->vgId = pVnodeCfg->vgId;
|
||||
pVnode->msgCb = pVnodeCfg->msgCb;
|
||||
pVnode->pTfs = pVnodeCfg->pTfs;
|
||||
pVnode->path = strdup(path);
|
||||
|
@ -144,7 +141,7 @@ static int vnodeOpenImpl(SVnode *pVnode) {
|
|||
// Open tsdb
|
||||
sprintf(dir, "%s/tsdb", pVnode->path);
|
||||
pVnode->pTsdb =
|
||||
tsdbOpen(dir, pVnode->vgId, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs);
|
||||
tsdbOpen(dir, TD_VID(pVnode), &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs);
|
||||
if (pVnode->pTsdb == NULL) {
|
||||
// TODO: handle error
|
||||
return -1;
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#include "vnodeInt.h"
|
||||
|
||||
int vnodeQueryOpen(SVnode *pVnode) {
|
||||
return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, &pVnode->msgCb);
|
||||
return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), NULL, (void **)&pVnode->pQuery, &pVnode->msgCb);
|
||||
}
|
||||
|
||||
void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); }
|
||||
|
@ -101,7 +101,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
metaRsp.numOfColumns = nCols;
|
||||
metaRsp.tableType = pTbCfg->type;
|
||||
metaRsp.tuid = uid;
|
||||
metaRsp.vgId = pVnode->vgId;
|
||||
metaRsp.vgId = TD_VID(pVnode);
|
||||
|
||||
memcpy(metaRsp.pSchemas, pSW->pSchema, sizeof(SSchema) * pSW->nCols);
|
||||
if (nTagCols) {
|
||||
|
@ -151,7 +151,7 @@ _exit:
|
|||
}
|
||||
|
||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
|
||||
pLoad->vgId = pVnode->vgId;
|
||||
pLoad->vgId = TD_VID(pVnode);
|
||||
pLoad->role = TAOS_SYNC_STATE_LEADER;
|
||||
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
|
||||
pLoad->numOfTimeSeries = 400;
|
||||
|
|
|
@ -36,7 +36,7 @@ void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs) {
|
|||
if (walWrite(pVnode->pWal, ver, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) {
|
||||
// TODO: handle error
|
||||
/*ASSERT(false);*/
|
||||
vError("vnode:%d write wal error since %s", pVnode->vgId, terrstr());
|
||||
vError("vnode:%d write wal error since %s", TD_VID(pVnode), terrstr());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -73,12 +73,12 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
case TDMT_VND_ALTER_STB:
|
||||
return vnodeProcessAlterStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)));
|
||||
case TDMT_VND_DROP_STB:
|
||||
vTrace("vgId:%d, process drop stb req", pVnode->vgId);
|
||||
vTrace("vgId:%d, process drop stb req", TD_VID(pVnode));
|
||||
break;
|
||||
case TDMT_VND_DROP_TABLE:
|
||||
break;
|
||||
case TDMT_VND_SUBMIT:
|
||||
/*printf("vnode %d write data %ld\n", pVnode->vgId, ver);*/
|
||||
/*printf("vnode %d write data %ld\n", TD_VID(pVnode), ver);*/
|
||||
if (pVnode->config.streamMode == 0) {
|
||||
*pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg));
|
||||
(*pRsp)->handle = pMsg->handle;
|
||||
|
@ -245,7 +245,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
|
|||
|
||||
if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) {
|
||||
// TODO: handle error
|
||||
vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name);
|
||||
vError("vgId:%d, failed to create table: %s", TD_VID(pVnode), pCreateTbReq->name);
|
||||
}
|
||||
// TODO: to encapsule a free API
|
||||
taosMemoryFree(pCreateTbReq->name);
|
||||
|
@ -268,7 +268,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
|
|||
}
|
||||
}
|
||||
|
||||
vTrace("vgId:%d process create %" PRIzu " tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray));
|
||||
vTrace("vgId:%d process create %" PRIzu " tables", TD_VID(pVnode), taosArrayGetSize(vCreateTbBatchReq.pArray));
|
||||
taosArrayDestroy(vCreateTbBatchReq.pArray);
|
||||
if (vCreateTbBatchRsp.rspList) {
|
||||
int32_t contLen = tSerializeSVCreateTbBatchRsp(NULL, 0, &vCreateTbBatchRsp);
|
||||
|
@ -289,7 +289,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
|
|||
|
||||
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq) {
|
||||
SVCreateTbReq vAlterTbReq = {0};
|
||||
vTrace("vgId:%d, process alter stb req", pVnode->vgId);
|
||||
vTrace("vgId:%d, process alter stb req", TD_VID(pVnode));
|
||||
tDeserializeSVCreateTbReq(pReq, &vAlterTbReq);
|
||||
// TODO: to encapsule a free API
|
||||
taosMemoryFree(vAlterTbReq.stbCfg.pSchema);
|
||||
|
|
Loading…
Reference in New Issue