refactor: do some internal refactor and add some logs for tmq.
This commit is contained in:
parent
0369ff2231
commit
b46098793d
|
@ -29,7 +29,10 @@ extern "C" {
|
||||||
#define calloc CALLOC_FUNC_TAOS_FORBID
|
#define calloc CALLOC_FUNC_TAOS_FORBID
|
||||||
#define realloc REALLOC_FUNC_TAOS_FORBID
|
#define realloc REALLOC_FUNC_TAOS_FORBID
|
||||||
#define free FREE_FUNC_TAOS_FORBID
|
#define free FREE_FUNC_TAOS_FORBID
|
||||||
|
#ifdef strdup
|
||||||
|
#undef strdup
|
||||||
#define strdup STRDUP_FUNC_TAOS_FORBID
|
#define strdup STRDUP_FUNC_TAOS_FORBID
|
||||||
|
#endif
|
||||||
#endif // ifndef ALLOW_FORBID_FUNC
|
#endif // ifndef ALLOW_FORBID_FUNC
|
||||||
|
|
||||||
#endif // if !defined(WINDOWS)
|
#endif // if !defined(WINDOWS)
|
||||||
|
|
|
@ -15,17 +15,11 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndConsumer.h"
|
#include "mndConsumer.h"
|
||||||
#include "mndDb.h"
|
|
||||||
#include "mndDnode.h"
|
|
||||||
#include "mndMnode.h"
|
|
||||||
#include "mndPrivilege.h"
|
#include "mndPrivilege.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndStb.h"
|
|
||||||
#include "mndSubscribe.h"
|
#include "mndSubscribe.h"
|
||||||
#include "mndTopic.h"
|
#include "mndTopic.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndUser.h"
|
|
||||||
#include "mndVgroup.h"
|
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
|
|
||||||
|
@ -209,6 +203,7 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) {
|
||||||
taosMemoryFree(pConsumerNew);
|
taosMemoryFree(pConsumerNew);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
FAIL:
|
FAIL:
|
||||||
tDeleteSMqConsumerObj(pConsumerNew);
|
tDeleteSMqConsumerObj(pConsumerNew);
|
||||||
taosMemoryFree(pConsumerNew);
|
taosMemoryFree(pConsumerNew);
|
||||||
|
@ -580,6 +575,10 @@ static int32_t validateTopics(const SArray* pTopicList, SMnode* pMnode, const ch
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void* topicNameDup(void* p){
|
||||||
|
return taosStrdup((char*) p);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
char *msgStr = pMsg->pCont;
|
char *msgStr = pMsg->pCont;
|
||||||
|
@ -616,15 +615,15 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
||||||
tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
|
tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
|
||||||
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
|
|
||||||
taosArrayDestroy(pConsumerNew->rebNewTopics);
|
|
||||||
pConsumerNew->rebNewTopics = pTopicList; // all subscribe topics should re-balance.
|
|
||||||
subscribe.topicNames = NULL;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < newTopicNum; i++) {
|
// set the update type
|
||||||
char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, i));
|
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
|
||||||
taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
|
pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);
|
||||||
}
|
|
||||||
|
// all subscribed topics should re-balance.
|
||||||
|
taosArrayDestroy(pConsumerNew->rebNewTopics);
|
||||||
|
pConsumerNew->rebNewTopics = pTopicList;
|
||||||
|
subscribe.topicNames = NULL;
|
||||||
|
|
||||||
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
|
if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) goto _over;
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
|
||||||
|
@ -646,17 +645,11 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
goto _over;
|
goto _over;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// set the update type
|
||||||
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
|
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
|
||||||
|
pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup);
|
||||||
|
|
||||||
for (int32_t i = 0; i < newTopicNum; i++) {
|
int32_t oldTopicNum = (pExistedConsumer->currentTopics)? taosArrayGetSize(pExistedConsumer->currentTopics):0;
|
||||||
char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, i));
|
|
||||||
taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t oldTopicNum = 0;
|
|
||||||
if (pExistedConsumer->currentTopics) {
|
|
||||||
oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t i = 0, j = 0;
|
int32_t i = 0, j = 0;
|
||||||
while (i < oldTopicNum || j < newTopicNum) {
|
while (i < oldTopicNum || j < newTopicNum) {
|
||||||
|
@ -692,11 +685,8 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pExistedConsumer && taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 &&
|
// no topics need to be rebalanced
|
||||||
taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
|
if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
|
||||||
/*if (taosArrayGetSize(pConsumerNew->assignedTopics) == 0) {*/
|
|
||||||
/*pConsumerNew->updateType = */
|
|
||||||
/*}*/
|
|
||||||
goto _over;
|
goto _over;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -718,8 +708,9 @@ _over:
|
||||||
tDeleteSMqConsumerObj(pConsumerNew);
|
tDeleteSMqConsumerObj(pConsumerNew);
|
||||||
taosMemoryFree(pConsumerNew);
|
taosMemoryFree(pConsumerNew);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: replace with destroy subscribe msg
|
// TODO: replace with destroy subscribe msg
|
||||||
if (subscribe.topicNames) taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
|
taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -750,12 +741,12 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
|
||||||
CM_ENCODE_OVER:
|
CM_ENCODE_OVER:
|
||||||
taosMemoryFreeClear(buf);
|
taosMemoryFreeClear(buf);
|
||||||
if (terrno != 0) {
|
if (terrno != 0) {
|
||||||
mError("consumer:%" PRId64 ", failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
|
mError("consumer:0x%" PRIx64 " failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
|
||||||
sdbFreeRaw(pRaw);
|
sdbFreeRaw(pRaw);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
mTrace("consumer:%" PRId64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
|
mTrace("consumer:0x%" PRIx64 ", encode to raw:%p, row:%p", pConsumer->consumerId, pRaw, pConsumer);
|
||||||
return pRaw;
|
return pRaw;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -823,8 +814,8 @@ static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
|
static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer) {
|
||||||
mDebug("consumer:0x%" PRIx64 " perform delete action, status:%s", pConsumer->consumerId,
|
mDebug("consumer:0x%" PRIx64 " perform delete action, status:(%d)%s", pConsumer->consumerId,
|
||||||
mndConsumerStatusName(pConsumer->status));
|
pConsumer->status, mndConsumerStatusName(pConsumer->status));
|
||||||
tDeleteSMqConsumerObj(pConsumer);
|
tDeleteSMqConsumerObj(pConsumer);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1075,22 +1066,23 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
||||||
|
|
||||||
// consumer group
|
// consumer group
|
||||||
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
char cgroup[TSDB_CGROUP_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
tstrncpy(varDataVal(cgroup), pConsumer->cgroup, TSDB_CGROUP_LEN);
|
STR_TO_VARSTR(cgroup, pConsumer->cgroup);
|
||||||
varDataSetLen(cgroup, strlen(varDataVal(cgroup)));
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)cgroup, false);
|
||||||
|
|
||||||
// client id
|
// client id
|
||||||
char clientId[256 + VARSTR_HEADER_SIZE] = {0};
|
char clientId[256 + VARSTR_HEADER_SIZE] = {0};
|
||||||
tstrncpy(varDataVal(clientId), pConsumer->clientId, 256);
|
STR_TO_VARSTR(clientId, pConsumer->clientId);
|
||||||
varDataSetLen(clientId, strlen(varDataVal(clientId)));
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)clientId, false);
|
||||||
|
|
||||||
// status
|
// status
|
||||||
char status[20 + VARSTR_HEADER_SIZE] = {0};
|
char status[20 + VARSTR_HEADER_SIZE] = {0};
|
||||||
tstrncpy(varDataVal(status), mndConsumerStatusName(pConsumer->status), 20);
|
const char* pStatusName = mndConsumerStatusName(pConsumer->status);
|
||||||
varDataSetLen(status, strlen(varDataVal(status)));
|
STR_TO_VARSTR(status, pStatusName);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)status, false);
|
||||||
|
|
||||||
|
@ -1123,8 +1115,11 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosRUnLockLatch(&pConsumer->lock);
|
taosRUnLockLatch(&pConsumer->lock);
|
||||||
sdbRelease(pSdb, pConsumer);
|
sdbRelease(pSdb, pConsumer);
|
||||||
|
|
||||||
|
pBlock->info.rows = numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
pShow->numOfRows += numOfRows;
|
pShow->numOfRows += numOfRows;
|
||||||
|
|
|
@ -16,15 +16,10 @@
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndSubscribe.h"
|
#include "mndSubscribe.h"
|
||||||
#include "mndConsumer.h"
|
#include "mndConsumer.h"
|
||||||
#include "mndDb.h"
|
|
||||||
#include "mndDnode.h"
|
|
||||||
#include "mndMnode.h"
|
|
||||||
#include "mndScheduler.h"
|
#include "mndScheduler.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndStb.h"
|
|
||||||
#include "mndTopic.h"
|
#include "mndTopic.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "mndUser.h"
|
|
||||||
#include "mndVgroup.h"
|
#include "mndVgroup.h"
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
|
@ -1041,7 +1036,6 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
|
||||||
}
|
}
|
||||||
|
|
||||||
// do not show for cleared subscription
|
// do not show for cleared subscription
|
||||||
#if 1
|
|
||||||
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
|
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
|
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
|
||||||
|
@ -1087,8 +1081,6 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
pBlock->info.rows = numOfRows;
|
pBlock->info.rows = numOfRows;
|
||||||
|
|
||||||
taosRUnLockLatch(&pSub->lock);
|
taosRUnLockLatch(&pSub->lock);
|
||||||
|
|
|
@ -217,7 +217,7 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
||||||
char buf2[80] = {0};
|
char buf2[80] = {0};
|
||||||
tFormatOffset(buf1, tListLen(buf1), &pRsp->reqOffset);
|
tFormatOffset(buf1, tListLen(buf1), &pRsp->reqOffset);
|
||||||
tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset);
|
tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset);
|
||||||
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s",
|
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
|
||||||
TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
|
TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -275,7 +275,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
|
||||||
char buf2[80] = {0};
|
char buf2[80] = {0};
|
||||||
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||||
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||||
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, reqOffset:%s, rspOffset:%s",
|
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, req:%s, rsp:%s",
|
||||||
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -604,7 +604,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
code = -1;
|
code = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp data block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
|
||||||
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
|
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
|
||||||
dataRsp.rspOffset.uid, dataRsp.rspOffset.ts);
|
dataRsp.rspOffset.uid, dataRsp.rspOffset.ts);
|
||||||
|
|
||||||
|
|
|
@ -113,9 +113,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pRsp->withTbName == false);
|
ASSERT(!(pRsp->withTbName || pRsp->withSchema));
|
||||||
ASSERT(pRsp->withSchema == false);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -197,7 +197,7 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 "epoch:%d vgId:%d", pHandle->subKey,
|
tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 " epoch:%d vgId:%d", pHandle->subKey,
|
||||||
(int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode));
|
(int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode));
|
||||||
|
|
||||||
void* buf = taosMemoryCalloc(1, vlen);
|
void* buf = taosMemoryCalloc(1, vlen);
|
||||||
|
|
|
@ -193,7 +193,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
|
||||||
memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo));
|
memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo));
|
||||||
taosWUnLockLatch(&pHandle->pushHandle.lock);
|
taosWUnLockLatch(&pHandle->pushHandle.lock);
|
||||||
|
|
||||||
tqDebug("vgId:%d offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%" PRId64 ", rspOffset:%" PRId64,
|
tqDebug("vgId:%d offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, req:%" PRId64 ", rsp:%" PRId64,
|
||||||
TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum,
|
TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum,
|
||||||
rsp.reqOffset, rsp.rspOffset);
|
rsp.reqOffset, rsp.rspOffset);
|
||||||
|
|
||||||
|
@ -210,25 +210,30 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
|
void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
|
||||||
int32_t len = msgLen - sizeof(SSubmitReq2Msg);
|
int32_t len = msgLen - sizeof(SSubmitReq2Msg);
|
||||||
|
|
||||||
tqDebug("vgId:%d tq push msg version:%" PRId64 " type: %s, p head %p, p body %p, len %d", pTq->pVnode->config.vgId, ver,
|
|
||||||
TMSG_INFO(msgType), msg, pReq, len);
|
|
||||||
|
|
||||||
if (msgType == TDMT_VND_SUBMIT) {
|
if (msgType == TDMT_VND_SUBMIT) {
|
||||||
// lock push mgr to avoid potential msg lost
|
// lock push mgr to avoid potential msg lost
|
||||||
taosWLockLatch(&pTq->pushLock);
|
taosWLockLatch(&pTq->pushLock);
|
||||||
if (taosHashGetSize(pTq->pPushMgr) != 0) {
|
|
||||||
|
|
||||||
tqDebug("vgId:%d, push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr));
|
int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr);
|
||||||
|
if (numOfRegisteredPush > 0) {
|
||||||
|
tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d",
|
||||||
|
pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush);
|
||||||
|
|
||||||
SArray* cachedKeys = taosArrayInit(0, sizeof(void*));
|
SArray* cachedKeys = taosArrayInit(0, sizeof(void*));
|
||||||
SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t));
|
SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t));
|
||||||
|
|
||||||
void* data = taosMemoryMalloc(len);
|
void* data = taosMemoryMalloc(len);
|
||||||
if (data == NULL) {
|
if (data == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
tqError("failed to copy data for stream since out of memory");
|
tqError("failed to copy data for stream since out of memory");
|
||||||
taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree);
|
taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree);
|
||||||
taosArrayDestroy(cachedKeyLens);
|
taosArrayDestroy(cachedKeyLens);
|
||||||
|
|
||||||
|
// unlock
|
||||||
|
taosWUnLockLatch(&pTq->pushLock);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(data, pReq, len);
|
memcpy(data, pReq, len);
|
||||||
|
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
|
@ -262,7 +267,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
};
|
};
|
||||||
qStreamSetScanMemData(task, submit);
|
qStreamSetScanMemData(task, submit);
|
||||||
|
|
||||||
// exec
|
// here start to scan submit block to extract the subscribed data
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pDataBlock = NULL;
|
SSDataBlock* pDataBlock = NULL;
|
||||||
uint64_t ts = 0;
|
uint64_t ts = 0;
|
||||||
|
@ -278,7 +283,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
pRsp->blockNum++;
|
pRsp->blockNum++;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("vgId:%d, tq handle push, subkey: %s, block num: %d", pTq->pVnode->config.vgId, pPushEntry->subKey,
|
tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d", pTq->pVnode->config.vgId, pPushEntry->subKey,
|
||||||
pRsp->blockNum);
|
pRsp->blockNum);
|
||||||
if (pRsp->blockNum > 0) {
|
if (pRsp->blockNum > 0) {
|
||||||
// set offset
|
// set offset
|
||||||
|
@ -295,6 +300,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
tqPushDataRsp(pTq, pPushEntry);
|
tqPushDataRsp(pTq, pPushEntry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// delete entry
|
// delete entry
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(cachedKeys); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(cachedKeys); i++) {
|
||||||
void* key = taosArrayGetP(cachedKeys, i);
|
void* key = taosArrayGetP(cachedKeys, i);
|
||||||
|
|
|
@ -292,9 +292,12 @@ void tqCloseReader(STqReader* pReader) {
|
||||||
|
|
||||||
int32_t tqSeekVer(STqReader* pReader, int64_t ver) {
|
int32_t tqSeekVer(STqReader* pReader, int64_t ver) {
|
||||||
if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
|
if (walReadSeekVer(pReader->pWalReader, ver) < 0) {
|
||||||
|
tqError("tmq poll: wal reader failed to seek to ver:%"PRId64, ver);
|
||||||
return -1;
|
return -1;
|
||||||
|
} else {
|
||||||
|
tqDebug("tmq poll: wal reader seek to ver:%"PRId64, ver);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
|
int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
|
||||||
|
@ -302,28 +305,33 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (!fromProcessedMsg) {
|
if (!fromProcessedMsg) {
|
||||||
if (walNextValidMsg(pReader->pWalReader) < 0) {
|
SWalReader* pWalReader = pReader->pWalReader;
|
||||||
pReader->ver =
|
|
||||||
pReader->pWalReader->curVersion - (pReader->pWalReader->curInvalid | pReader->pWalReader->curStopped);
|
if (walNextValidMsg(pWalReader) < 0) {
|
||||||
|
pReader->ver = pWalReader->curVersion - (pWalReader->curInvalid | pWalReader->curStopped);
|
||||||
ret->offset.type = TMQ_OFFSET__LOG;
|
ret->offset.type = TMQ_OFFSET__LOG;
|
||||||
ret->offset.version = pReader->ver;
|
ret->offset.version = pReader->ver;
|
||||||
ret->fetchType = FETCH_TYPE__NONE;
|
ret->fetchType = FETCH_TYPE__NONE;
|
||||||
tqDebug("return offset %" PRId64 ", no more valid", ret->offset.version);
|
tqDebug("return offset %" PRId64 ", no more valid", ret->offset.version);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
void* body = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
|
|
||||||
int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
|
void* body = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
|
||||||
int64_t ver = pReader->pWalReader->pHead->head.version;
|
int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
|
||||||
|
int64_t ver = pWalReader->pHead->head.version;
|
||||||
|
|
||||||
|
tqDebug("tmq poll: extract submit msg from wal, version:%"PRId64" len:%d", ver, bodyLen);
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
if (pReader->pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
|
if (pWalReader->pHead->head.msgType != TDMT_VND_SUBMIT) {
|
||||||
// TODO do filter
|
// TODO do filter
|
||||||
ret->fetchType = FETCH_TYPE__META;
|
ret->fetchType = FETCH_TYPE__META;
|
||||||
ret->meta = pReader->pWalReader->pHead->head.body;
|
ret->meta = pWalReader->pHead->head.body;
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
#endif
|
#endif
|
||||||
tqReaderSetSubmitReq2(pReader, body, bodyLen, ver);
|
tqReaderSetSubmitReq2(pReader, body, bodyLen, ver);
|
||||||
/*tqReaderSetDataMsg(pReader, body, pReader->pWalReader->pHead->head.version);*/
|
/*tqReaderSetDataMsg(pReader, body, pWalReader->pHead->head.version);*/
|
||||||
#if 0
|
#if 0
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
@ -358,7 +366,7 @@ int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t v
|
||||||
// if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
|
// if (tInitSubmitMsgIter(pMsg, &pReader->msgIter) < 0) return -1;
|
||||||
// while (true) {
|
// while (true) {
|
||||||
// if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1;
|
// if (tGetSubmitMsgNext(&pReader->msgIter, &pReader->pBlock) < 0) return -1;
|
||||||
// tqDebug("submitnext vgId:%d, block:%p, dataLen:%d, len:%d, uid:%"PRId64, pReader->pWalReader->pWal->cfg.vgId, pReader->pBlock, pReader->msgIter.dataLen,
|
// tqDebug("submitnext vgId:%d, block:%p, dataLen:%d, len:%d, uid:%"PRId64, pWalReader->pWal->cfg.vgId, pReader->pBlock, pReader->msgIter.dataLen,
|
||||||
// pReader->msgIter.len, pReader->msgIter.uid);
|
// pReader->msgIter.len, pReader->msgIter.uid);
|
||||||
// if (pReader->pBlock == NULL) break;
|
// if (pReader->pBlock == NULL) break;
|
||||||
// }
|
// }
|
||||||
|
@ -371,10 +379,8 @@ int32_t tqReaderSetDataMsg(STqReader* pReader, const SSubmitReq* pMsg, int64_t v
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
|
int32_t tqReaderSetSubmitReq2(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
|
||||||
ASSERT(pReader->msg2.msgStr == NULL);
|
ASSERT(pReader->msg2.msgStr == NULL && msgStr && msgLen && (ver >= 0));
|
||||||
ASSERT(msgStr);
|
|
||||||
ASSERT(msgLen);
|
|
||||||
ASSERT(ver >= 0);
|
|
||||||
pReader->msg2.msgStr = msgStr;
|
pReader->msg2.msgStr = msgStr;
|
||||||
pReader->msg2.msgLen = msgLen;
|
pReader->msg2.msgLen = msgLen;
|
||||||
pReader->msg2.ver = ver;
|
pReader->msg2.ver = ver;
|
||||||
|
@ -421,7 +427,10 @@ bool tqNextDataBlock(STqReader* pReader) {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
bool tqNextDataBlock2(STqReader* pReader) {
|
bool tqNextDataBlock2(STqReader* pReader) {
|
||||||
if (pReader->msg2.msgStr == NULL) return false;
|
if (pReader->msg2.msgStr == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
ASSERT(pReader->setMsg == 1);
|
ASSERT(pReader->setMsg == 1);
|
||||||
|
|
||||||
tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg2.msgStr, pReader->msg2.msgLen,
|
tqDebug("tq reader next data block %p, %d %" PRId64 " %d", pReader->msg2.msgStr, pReader->msg2.msgLen,
|
||||||
|
@ -528,7 +537,7 @@ int32_t tqScanSubmitSplit(SArray* pBlocks, SArray* schemas, STqReader* pReader)
|
||||||
if (pReader->pSchema == NULL) {
|
if (pReader->pSchema == NULL) {
|
||||||
tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64
|
tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64
|
||||||
"), version %d, possibly dropped table",
|
"), version %d, possibly dropped table",
|
||||||
pReader->pWalReader->pWal->cfg.vgId, pReader->msgIter.uid, pReader->msgIter.suid, sversion);
|
pWalReader->pWal->cfg.vgId, pReader->msgIter.uid, pReader->msgIter.suid, sversion);
|
||||||
pReader->cachedSchemaSuid = 0;
|
pReader->cachedSchemaSuid = 0;
|
||||||
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
|
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -538,7 +547,7 @@ int32_t tqScanSubmitSplit(SArray* pBlocks, SArray* schemas, STqReader* pReader)
|
||||||
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1);
|
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1);
|
||||||
if (pReader->pSchemaWrapper == NULL) {
|
if (pReader->pSchemaWrapper == NULL) {
|
||||||
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
|
tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
|
||||||
pReader->pWalReader->pWal->cfg.vgId, pReader->msgIter.uid, pReader->cachedSchemaVer);
|
pWalReader->pWal->cfg.vgId, pReader->msgIter.uid, pReader->cachedSchemaVer);
|
||||||
pReader->cachedSchemaSuid = 0;
|
pReader->cachedSchemaSuid = 0;
|
||||||
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
|
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -1264,8 +1264,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 1; i < nColData; i++) {
|
for (int32_t j = 1; j < nColData; j++) {
|
||||||
if (aColData[i].nVal != aColData[0].nVal) {
|
if (aColData[j].nVal != aColData[0].nVal) {
|
||||||
code = TSDB_CODE_INVALID_MSG;
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
@ -1299,8 +1299,8 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pSubmitRsp->aCreateTbRsp, 1);
|
SVCreateTbRsp *pCreateTbRsp = taosArrayReserve(pSubmitRsp->aCreateTbRsp, 1);
|
||||||
|
|
||||||
// create table
|
// create table
|
||||||
if (metaCreateTable(pVnode->pMeta, version, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) ==
|
if (metaCreateTable(pVnode->pMeta, version, pSubmitTbData->pCreateTbReq, &pCreateTbRsp->pMeta) == 0) {
|
||||||
0) { // create table success
|
// create table success
|
||||||
|
|
||||||
if (newTbUids == NULL &&
|
if (newTbUids == NULL &&
|
||||||
(newTbUids = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(int64_t))) == NULL) {
|
(newTbUids = taosArrayInit(TARRAY_SIZE(pSubmitReq->aSubmitTbData), sizeof(int64_t))) == NULL) {
|
||||||
|
@ -1330,7 +1330,7 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq
|
||||||
pSubmitRsp->affectedRows += affectedRows;
|
pSubmitRsp->affectedRows += affectedRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
// update table uid list
|
// update the affected table uid list
|
||||||
if (taosArrayGetSize(newTbUids) > 0) {
|
if (taosArrayGetSize(newTbUids) > 0) {
|
||||||
vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
|
vDebug("vgId:%d, add %d table into query table list in handling submit", TD_VID(pVnode),
|
||||||
(int32_t)taosArrayGetSize(newTbUids));
|
(int32_t)taosArrayGetSize(newTbUids));
|
||||||
|
|
|
@ -133,8 +133,7 @@ typedef struct {
|
||||||
int64_t snapshotVer;
|
int64_t snapshotVer;
|
||||||
// const SSubmitReq* pReq;
|
// const SSubmitReq* pReq;
|
||||||
|
|
||||||
SPackedData submit;
|
SPackedData submit;
|
||||||
|
|
||||||
SSchemaWrapper* schema;
|
SSchemaWrapper* schema;
|
||||||
char tbName[TSDB_TABLE_NAME_LEN];
|
char tbName[TSDB_TABLE_NAME_LEN];
|
||||||
int8_t recoverStep;
|
int8_t recoverStep;
|
||||||
|
|
|
@ -1035,8 +1035,9 @@ int32_t qStreamScanMemData(qTaskInfo_t tinfo, const SSubmitReq* pReq, int64_t sc
|
||||||
|
|
||||||
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
|
int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
|
ASSERT((pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE )&& (pTaskInfo->streamInfo.submit.msgStr == NULL));
|
||||||
ASSERT(pTaskInfo->streamInfo.submit.msgStr == NULL);
|
qDebug("set the submit block for future scan");
|
||||||
|
|
||||||
pTaskInfo->streamInfo.submit = submit;
|
pTaskInfo->streamInfo.submit = submit;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -1050,6 +1051,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
|
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
uint16_t type = pOperator->operatorType;
|
uint16_t type = pOperator->operatorType;
|
||||||
pOperator->status = OP_OPENED;
|
pOperator->status = OP_OPENED;
|
||||||
|
|
|
@ -1562,7 +1562,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStreamScanInfo* pInfo = pOperator->info;
|
SStreamScanInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
qDebug("queue scan called");
|
qDebug("start to exec queue scan");
|
||||||
|
|
||||||
if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
|
if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
|
||||||
if (pInfo->tqReader->msg2.msgStr == NULL) {
|
if (pInfo->tqReader->msg2.msgStr == NULL) {
|
||||||
|
@ -1587,7 +1587,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
SSDataBlock block = {0};
|
SSDataBlock block = {0};
|
||||||
|
|
||||||
int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL);
|
int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
|
if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue