diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index e95878b04e..04103ec26d 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -80,7 +80,6 @@ struct SVnodeInfo { }; struct SVnode { - int32_t vgId; char* path; SVnodeCfg config; SVState state; @@ -96,6 +95,8 @@ struct SVnode { STfs* pTfs; }; +#define TD_VID(PVNODE) (PVNODE)->config.vgId + // sma void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bfcc5dd2bb..3a4168e759 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -267,7 +267,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, - pTq->pVnode->vgId, pReq->currentOffset, fetchOffset); + TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); SMqPollRsp rsp = { /*.consumerId = consumerId,*/ @@ -277,7 +277,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); if (pConsumer == NULL) { - vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, pTq->pVnode->vgId); + vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode)); pMsg->pCont = NULL; pMsg->contLen = 0; pMsg->code = -1; @@ -303,7 +303,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } if (pTopic == NULL) { vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic, - pTq->pVnode->vgId); + TD_VID(pTq->pVnode)); pMsg->pCont = NULL; pMsg->contLen = 0; pMsg->code = -1; @@ -312,7 +312,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch, - pTq->pVnode->vgId); + TD_VID(pTq->pVnode)); rsp.reqOffset = pReq->currentOffset; rsp.skipLogNum = 0; @@ -323,7 +323,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { consumerEpoch = atomic_load_32(&pConsumer->epoch); if (consumerEpoch > reqEpoch) { vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", - consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, consumerEpoch, reqEpoch); + consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch); break; } SWalReadHead* pHead; @@ -332,11 +332,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // if data inserted during waiting, launch query and // response to user vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, - pTq->pVnode->vgId, fetchOffset); + TD_VID(pTq->pVnode), fetchOffset); break; } vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, - pTq->pVnode->vgId, fetchOffset, pHead->msgType); + TD_VID(pTq->pVnode), fetchOffset, pHead->msgType); /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/ /*pHead = pTopic->pReadhandle->pHead;*/ if (pHead->msgType == TDMT_VND_SUBMIT) { @@ -361,7 +361,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { if (taosArrayGetSize(pRes) == 0) { vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, - pReq->epoch, pTq->pVnode->vgId, fetchOffset); + pReq->epoch, TD_VID(pTq->pVnode), fetchOffset); fetchOffset++; rsp.skipLogNum++; taosArrayDestroy(pRes); @@ -390,7 +390,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->pCont = buf; pMsg->contLen = tlen; pMsg->code = 0; - vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, + vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset, pHead->msgType, consumerId, pReq->epoch); tmsgSendRsp(pMsg); taosMemoryFree(pHead); @@ -422,7 +422,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->contLen = tlen; pMsg->code = 0; tmsgSendRsp(pMsg); - vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, + vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch); /*}*/ @@ -446,14 +446,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, - pTq->pVnode->vgId, pReq->currentOffset, fetchOffset); + TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); SMqPollRspV2 rspV2 = {0}; rspV2.dataLen = 0; STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); if (pConsumer == NULL) { - vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, pTq->pVnode->vgId); + vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode)); pMsg->pCont = NULL; pMsg->contLen = 0; pMsg->code = -1; @@ -479,7 +479,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } if (pTopic == NULL) { vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic, - pTq->pVnode->vgId); + TD_VID(pTq->pVnode)); pMsg->pCont = NULL; pMsg->contLen = 0; pMsg->code = -1; @@ -488,7 +488,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch, - pTq->pVnode->vgId); + TD_VID(pTq->pVnode)); rspV2.reqOffset = pReq->currentOffset; rspV2.skipLogNum = 0; @@ -499,7 +499,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { consumerEpoch = atomic_load_32(&pConsumer->epoch); if (consumerEpoch > reqEpoch) { vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", - consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, consumerEpoch, reqEpoch); + consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch); break; } SWalReadHead* pHead; @@ -508,11 +508,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // if data inserted during waiting, launch query and // response to user vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, - pTq->pVnode->vgId, fetchOffset); + TD_VID(pTq->pVnode), fetchOffset); break; } vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, - pTq->pVnode->vgId, fetchOffset, pHead->msgType); + TD_VID(pTq->pVnode), fetchOffset, pHead->msgType); /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/ /*pHead = pTopic->pReadhandle->pHead;*/ if (pHead->msgType == TDMT_VND_SUBMIT) { @@ -537,7 +537,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { if (taosArrayGetSize(pRes) == 0) { vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, - pReq->epoch, pTq->pVnode->vgId, fetchOffset); + pReq->epoch, TD_VID(pTq->pVnode), fetchOffset); fetchOffset++; rspV2.skipLogNum++; taosArrayDestroy(pRes); @@ -597,7 +597,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->pCont = buf; pMsg->contLen = msgLen; pMsg->code = 0; - vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, + vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset, pHead->msgType, consumerId, pReq->epoch); tmsgSendRsp(pMsg); taosMemoryFree(pHead); @@ -631,7 +631,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->contLen = tlen; pMsg->code = 0; tmsgSendRsp(pMsg); - vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, + vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch); /*}*/ @@ -742,7 +742,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle); ASSERT(pTopic->buffer.output[i].task); } - vDebug("set topic %s to consumer %ld on vg %d", pTopic->topicName, req.consumerId, pTq->pVnode->vgId); + vDebug("set topic %s to consumer %ld on vg %d", pTopic->topicName, req.consumerId, TD_VID(pTq->pVnode)); taosArrayPush(pConsumer->topics, pTopic); if (create) { tqHandleMovePut(pTq->tqMeta, req.consumerId, pConsumer); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 7e17af0180..c8943a023a 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -70,8 +70,6 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { info.config.pTfs = pTfs; info.config.msgCb = msgCb; - // memset(&info.config.walCfg, 0, sizeof(SWalCfg)); - // info.config.walCfg.level = TAOS_WAL_WRITE; // crate handle pVnode = vnodeNew(dir, &info.config); @@ -106,7 +104,6 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) { return NULL; } - pVnode->vgId = pVnodeCfg->vgId; pVnode->msgCb = pVnodeCfg->msgCb; pVnode->pTfs = pVnodeCfg->pTfs; pVnode->path = strdup(path); @@ -144,7 +141,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { // Open tsdb sprintf(dir, "%s/tsdb", pVnode->path); pVnode->pTsdb = - tsdbOpen(dir, pVnode->vgId, &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs); + tsdbOpen(dir, TD_VID(pVnode), &(pVnode->config.tsdbCfg), vBufPoolGetMAF(pVnode), pVnode->pMeta, pVnode->pTfs); if (pVnode->pTsdb == NULL) { // TODO: handle error return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 75079d50b2..4202c02a0c 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -16,7 +16,7 @@ #include "vnodeInt.h" int vnodeQueryOpen(SVnode *pVnode) { - return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, &pVnode->msgCb); + return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), NULL, (void **)&pVnode->pQuery, &pVnode->msgCb); } void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); } @@ -101,7 +101,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { metaRsp.numOfColumns = nCols; metaRsp.tableType = pTbCfg->type; metaRsp.tuid = uid; - metaRsp.vgId = pVnode->vgId; + metaRsp.vgId = TD_VID(pVnode); memcpy(metaRsp.pSchemas, pSW->pSchema, sizeof(SSchema) * pSW->nCols); if (nTagCols) { @@ -151,7 +151,7 @@ _exit: } int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { - pLoad->vgId = pVnode->vgId; + pLoad->vgId = TD_VID(pVnode); pLoad->role = TAOS_SYNC_STATE_LEADER; pLoad->numOfTables = metaGetTbNum(pVnode->pMeta); pLoad->numOfTimeSeries = 400; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 87cef53042..288241eb66 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -36,7 +36,7 @@ void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs) { if (walWrite(pVnode->pWal, ver, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) { // TODO: handle error /*ASSERT(false);*/ - vError("vnode:%d write wal error since %s", pVnode->vgId, terrstr()); + vError("vnode:%d write wal error since %s", TD_VID(pVnode), terrstr()); } } @@ -73,12 +73,12 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { case TDMT_VND_ALTER_STB: return vnodeProcessAlterStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))); case TDMT_VND_DROP_STB: - vTrace("vgId:%d, process drop stb req", pVnode->vgId); + vTrace("vgId:%d, process drop stb req", TD_VID(pVnode)); break; case TDMT_VND_DROP_TABLE: break; case TDMT_VND_SUBMIT: - /*printf("vnode %d write data %ld\n", pVnode->vgId, ver);*/ + /*printf("vnode %d write data %ld\n", TD_VID(pVnode), ver);*/ if (pVnode->config.streamMode == 0) { *pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg)); (*pRsp)->handle = pMsg->handle; @@ -245,7 +245,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) { // TODO: handle error - vError("vgId:%d, failed to create table: %s", pVnode->vgId, pCreateTbReq->name); + vError("vgId:%d, failed to create table: %s", TD_VID(pVnode), pCreateTbReq->name); } // TODO: to encapsule a free API taosMemoryFree(pCreateTbReq->name); @@ -268,7 +268,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR } } - vTrace("vgId:%d process create %" PRIzu " tables", pVnode->vgId, taosArrayGetSize(vCreateTbBatchReq.pArray)); + vTrace("vgId:%d process create %" PRIzu " tables", TD_VID(pVnode), taosArrayGetSize(vCreateTbBatchReq.pArray)); taosArrayDestroy(vCreateTbBatchReq.pArray); if (vCreateTbBatchRsp.rspList) { int32_t contLen = tSerializeSVCreateTbBatchRsp(NULL, 0, &vCreateTbBatchRsp); @@ -289,7 +289,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq) { SVCreateTbReq vAlterTbReq = {0}; - vTrace("vgId:%d, process alter stb req", pVnode->vgId); + vTrace("vgId:%d, process alter stb req", TD_VID(pVnode)); tDeserializeSVCreateTbReq(pReq, &vAlterTbReq); // TODO: to encapsule a free API taosMemoryFree(vAlterTbReq.stbCfg.pSchema);