Merge pull request #20143 from taosdata/feature/3_liaohj
refactor: do some internal refactor.
This commit is contained in:
commit
cc393f3939
|
@ -824,7 +824,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
|
|||
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
|
||||
*pRefId = pTmq->refId;
|
||||
|
||||
tscDebug("consumer:0x%"PRIx64" next retrieve ep from mnode in 1s", pTmq->consumerId);
|
||||
tscDebug("consumer:0x%"PRIx64" retrieve ep from mnode in 1s", pTmq->consumerId);
|
||||
taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
|
||||
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
|
||||
tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam);
|
||||
|
@ -832,7 +832,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
|
|||
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
|
||||
*pRefId = pTmq->refId;
|
||||
|
||||
tscDebug("consumer:0x%"PRIx64" next commit to mnode in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval/1000.0);
|
||||
tscDebug("consumer:0x%"PRIx64" commit to vnode(s) in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval/1000.0);
|
||||
taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
|
||||
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
|
||||
// do nothing
|
||||
|
@ -1578,25 +1578,20 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
|
|||
}
|
||||
|
||||
void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) {
|
||||
/*strcpy(pReq->topic, pTopic->topicName);*/
|
||||
/*strcpy(pReq->cgroup, tmq->groupId);*/
|
||||
|
||||
int32_t groupLen = strlen(tmq->groupId);
|
||||
memcpy(pReq->subKey, tmq->groupId, groupLen);
|
||||
pReq->subKey[groupLen] = TMQ_SEPARATOR;
|
||||
strcpy(pReq->subKey + groupLen + 1, pTopic->topicName);
|
||||
|
||||
pReq->withTbName = tmq->withTbName;
|
||||
pReq->timeout = timeout;
|
||||
pReq->consumerId = tmq->consumerId;
|
||||
pReq->timeout = timeout;
|
||||
pReq->epoch = tmq->epoch;
|
||||
/*pReq->currentOffset = reqOffset;*/
|
||||
pReq->reqOffset = pVg->currentOffset;
|
||||
pReq->reqId = generateRequestId();
|
||||
|
||||
pReq->useSnapshot = tmq->useSnapshot;
|
||||
|
||||
pReq->head.vgId = pVg->vgId;
|
||||
pReq->useSnapshot = tmq->useSnapshot;
|
||||
pReq->reqId = generateRequestId();
|
||||
}
|
||||
|
||||
SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
||||
|
@ -1646,6 +1641,76 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
|||
return pRspObj;
|
||||
}
|
||||
|
||||
static int32_t handleErrorBeforePoll(SMqClientVg* pVg, tmq_t* pTmq) {
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
tsem_post(&pTmq->rspSem);
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
|
||||
SMqPollReq req = {0};
|
||||
tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg);
|
||||
|
||||
int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
|
||||
if (msgSize < 0) {
|
||||
return handleErrorBeforePoll(pVg, pTmq);
|
||||
}
|
||||
|
||||
char* msg = taosMemoryCalloc(1, msgSize);
|
||||
if (NULL == msg) {
|
||||
return handleErrorBeforePoll(pVg, pTmq);
|
||||
}
|
||||
|
||||
if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
|
||||
taosMemoryFree(msg);
|
||||
return handleErrorBeforePoll(pVg, pTmq);
|
||||
}
|
||||
|
||||
SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
|
||||
if (pParam == NULL) {
|
||||
taosMemoryFree(msg);
|
||||
return handleErrorBeforePoll(pVg, pTmq);
|
||||
}
|
||||
|
||||
pParam->refId = pTmq->refId;
|
||||
pParam->epoch = pTmq->epoch;
|
||||
pParam->pVg = pVg; // pVg may be released,fix it
|
||||
pParam->pTopic = pTopic;
|
||||
pParam->vgId = pVg->vgId;
|
||||
|
||||
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
if (sendInfo == NULL) {
|
||||
taosMemoryFree(pParam);
|
||||
taosMemoryFree(msg);
|
||||
return handleErrorBeforePoll(pVg, pTmq);
|
||||
}
|
||||
|
||||
sendInfo->msgInfo = (SDataBuf){
|
||||
.pData = msg,
|
||||
.len = msgSize,
|
||||
.handle = NULL,
|
||||
};
|
||||
|
||||
sendInfo->requestId = req.reqId;
|
||||
sendInfo->requestObjRefId = 0;
|
||||
sendInfo->param = pParam;
|
||||
sendInfo->fp = tmqPollCb;
|
||||
sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
|
||||
|
||||
int64_t transporterId = 0;
|
||||
char offsetFormatBuf[80];
|
||||
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->currentOffset);
|
||||
|
||||
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:0x%" PRIx64,
|
||||
pTmq->consumerId, pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId);
|
||||
asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
|
||||
|
||||
pVg->pollCnt++;
|
||||
pTmq->pollCnt++;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// broadcast the poll request to all related vnodes
|
||||
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
||||
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
|
||||
|
@ -1654,7 +1719,9 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
|||
for (int i = 0; i < numOfTopics; i++) {
|
||||
|
||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
|
||||
int32_t numOfVg = taosArrayGetSize(pTopic->vgs);
|
||||
|
||||
for (int j = 0; j < numOfVg; j++) {
|
||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
|
||||
if (vgStatus == TMQ_VG_STATUS__WAIT) {
|
||||
|
@ -1673,77 +1740,10 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
|||
}
|
||||
|
||||
atomic_store_32(&pVg->vgSkipCnt, 0);
|
||||
|
||||
SMqPollReq req = {0};
|
||||
tmqBuildConsumeReqImpl(&req, tmq, timeout, pTopic, pVg);
|
||||
int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
|
||||
if (msgSize < 0) {
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
tsem_post(&tmq->rspSem);
|
||||
return -1;
|
||||
int32_t code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
char* msg = taosMemoryCalloc(1, msgSize);
|
||||
if (NULL == msg) {
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
tsem_post(&tmq->rspSem);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
|
||||
taosMemoryFree(msg);
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
tsem_post(&tmq->rspSem);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
|
||||
if (pParam == NULL) {
|
||||
taosMemoryFree(msg);
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
tsem_post(&tmq->rspSem);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pParam->refId = tmq->refId;
|
||||
pParam->epoch = tmq->epoch;
|
||||
|
||||
pParam->pVg = pVg;
|
||||
pParam->pTopic = pTopic;
|
||||
pParam->vgId = pVg->vgId;
|
||||
|
||||
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
if (sendInfo == NULL) {
|
||||
taosMemoryFree(msg);
|
||||
taosMemoryFree(pParam);
|
||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
tsem_post(&tmq->rspSem);
|
||||
return -1;
|
||||
}
|
||||
|
||||
sendInfo->msgInfo = (SDataBuf){
|
||||
.pData = msg,
|
||||
.len = msgSize,
|
||||
.handle = NULL,
|
||||
};
|
||||
|
||||
sendInfo->requestId = req.reqId;
|
||||
sendInfo->requestObjRefId = 0;
|
||||
sendInfo->param = pParam;
|
||||
sendInfo->fp = tmqPollCb;
|
||||
sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
|
||||
|
||||
int64_t transporterId = 0;
|
||||
|
||||
char offsetFormatBuf[80];
|
||||
tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset);
|
||||
|
||||
tscDebug("consumer:0x%" PRIx64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:0x%" PRIx64,
|
||||
tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
|
||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
|
||||
|
||||
pVg->pollCnt++;
|
||||
tmq->pollCnt++;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -27,13 +27,8 @@ void mndCleanupTopic(SMnode *pMnode);
|
|||
|
||||
SMqTopicObj *mndAcquireTopic(SMnode *pMnode, const char *topicName);
|
||||
void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic);
|
||||
|
||||
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic);
|
||||
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw);
|
||||
|
||||
int32_t mndDropTopicByDB(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||
int32_t mndCheckTopicExist(SMnode *pMnode, SDbObj *pDb);
|
||||
|
||||
bool mndTopicExistsForDb(SMnode *pMnode, SDbObj *pDb);
|
||||
const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
|
||||
|
||||
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
|
||||
|
|
|
@ -256,8 +256,9 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
|||
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
|
||||
int32_t status = atomic_load_32(&pConsumer->status);
|
||||
|
||||
mDebug("check for consumer:0x%"PRIx64" status:%d(%s), sub-time:%"PRId64", uptime:%"PRId64,
|
||||
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->upTime);
|
||||
mDebug("check for consumer:0x%"PRIx64" status:%d(%s), sub-time:%"PRId64", uptime:%"PRId64", hbstatus:%d",
|
||||
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->upTime,
|
||||
hbStatus);
|
||||
|
||||
if (status == MQ_CONSUMER_STATUS__READY) {
|
||||
if (hbStatus > MND_CONSUMER_LOST_HB_CNT) {
|
||||
|
@ -269,6 +270,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
|||
.pCont = pLostMsg,
|
||||
.contLen = sizeof(SMqConsumerLostMsg),
|
||||
};
|
||||
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
}
|
||||
} else if (status == MQ_CONSUMER_STATUS__LOST_REBD) {
|
||||
|
@ -282,6 +284,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
|||
.pCont = pClearMsg,
|
||||
.contLen = sizeof(SMqConsumerClearMsg),
|
||||
};
|
||||
|
||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||
}
|
||||
} else if (status == MQ_CONSUMER_STATUS__LOST) {
|
||||
|
|
|
@ -1035,13 +1035,23 @@ static int32_t mndBuildDropDbRsp(SDbObj *pDb, int32_t *pRspLen, void **ppRsp, bo
|
|||
|
||||
static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-db");
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mInfo("trans:%d, used to drop db:%s", pTrans->id, pDb->name);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "drop-db");
|
||||
if (pTrans == NULL) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
mInfo("trans:%d start to drop db:%s", pTrans->id, pDb->name);
|
||||
|
||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||
if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
||||
if (mndCheckTopicExist(pMnode, pDb) < 0) goto _OVER;
|
||||
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndTopicExistsForDb(pMnode, pDb)) {
|
||||
terrno = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
|
||||
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
|
||||
|
@ -1092,10 +1102,12 @@ static int32_t mndProcessDropDbReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
code = mndDropDb(pMnode, pReq, pDb);
|
||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
_OVER:
|
||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("db:%s, failed to drop since %s", dropReq.db, terrstr());
|
||||
}
|
||||
|
||||
|
|
|
@ -551,8 +551,6 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
|||
pSubplan = (SSubplan*)nodesListGetNode(pNodeListNode->pNodeList, 0);
|
||||
}
|
||||
|
||||
ASSERT(pSub->unassignedVgs);
|
||||
|
||||
void* pIter = NULL;
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
|
||||
|
|
|
@ -122,6 +122,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
|
|||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SMsgHead *pMsgHead = (SMsgHead *)buf;
|
||||
|
||||
pMsgHead->contLen = htonl(tlen);
|
||||
|
|
|
@ -31,6 +31,9 @@
|
|||
#define MND_TOPIC_VER_NUMBER 2
|
||||
#define MND_TOPIC_RESERVE_SIZE 64
|
||||
|
||||
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic);
|
||||
SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw);
|
||||
|
||||
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic);
|
||||
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
|
||||
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic);
|
||||
|
@ -79,6 +82,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
|||
if (pTopic->physicalPlan) {
|
||||
physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
|
||||
}
|
||||
|
||||
int32_t schemaLen = 0;
|
||||
if (pTopic->schema.nCols) {
|
||||
schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
|
||||
|
@ -88,7 +92,9 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
|||
int32_t size = sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + ntbColLen +
|
||||
MND_TOPIC_RESERVE_SIZE;
|
||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
|
||||
if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
|
||||
if (pRaw == NULL) {
|
||||
goto TOPIC_ENCODE_OVER;
|
||||
}
|
||||
|
||||
int32_t dataPos = 0;
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->name, TSDB_TOPIC_FNAME_LEN, TOPIC_ENCODE_OVER);
|
||||
|
@ -106,6 +112,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
|||
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER);
|
||||
|
||||
if (pTopic->astLen) {
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->ast, pTopic->astLen, TOPIC_ENCODE_OVER);
|
||||
}
|
||||
|
@ -123,6 +130,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
|||
taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
|
||||
SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
|
||||
}
|
||||
|
||||
SDB_SET_INT64(pRaw, dataPos, pTopic->ntbUid, TOPIC_ENCODE_OVER);
|
||||
if (pTopic->ntbUid != 0) {
|
||||
int32_t sz = taosArrayGetSize(pTopic->ntbColIds);
|
||||
|
@ -132,6 +140,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
|||
SDB_SET_INT16(pRaw, dataPos, colId, TOPIC_ENCODE_OVER);
|
||||
}
|
||||
}
|
||||
|
||||
SDB_SET_INT64(pRaw, dataPos, pTopic->ctbStbUid, TOPIC_ENCODE_OVER);
|
||||
|
||||
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
|
||||
|
@ -247,10 +256,9 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_INT16(pRaw, dataPos, &colId, TOPIC_DECODE_OVER);
|
||||
taosArrayPush(pTopic->ntbColIds, &colId);
|
||||
}
|
||||
|
||||
SDB_GET_INT64(pRaw, dataPos, &pTopic->ctbStbUid, TOPIC_DECODE_OVER);
|
||||
|
||||
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
TOPIC_DECODE_OVER:
|
||||
|
@ -266,12 +274,12 @@ TOPIC_DECODE_OVER:
|
|||
}
|
||||
|
||||
static int32_t mndTopicActionInsert(SSdb *pSdb, SMqTopicObj *pTopic) {
|
||||
mTrace("topic:%s, perform insert action", pTopic->name);
|
||||
mTrace("topic:%s perform insert action", pTopic->name);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
|
||||
mTrace("topic:%s, perform delete action", pTopic->name);
|
||||
mTrace("topic:%s perform delete action", pTopic->name);
|
||||
taosMemoryFreeClear(pTopic->sql);
|
||||
taosMemoryFreeClear(pTopic->ast);
|
||||
taosMemoryFreeClear(pTopic->physicalPlan);
|
||||
|
@ -281,7 +289,7 @@ static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic) {
|
|||
}
|
||||
|
||||
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopicObj *pNewTopic) {
|
||||
mTrace("topic:%s, perform update action", pOldTopic->name);
|
||||
mTrace("topic:%s perform update action", pOldTopic->name);
|
||||
atomic_exchange_64(&pOldTopic->updateTime, pNewTopic->updateTime);
|
||||
atomic_exchange_32(&pOldTopic->version, pNewTopic->version);
|
||||
|
||||
|
@ -364,7 +372,8 @@ static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) {
|
|||
|
||||
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb,
|
||||
const char *userName) {
|
||||
mInfo("topic:%s to create", pCreate->name);
|
||||
mInfo("start to create topic:%s", pCreate->name);
|
||||
|
||||
SMqTopicObj topicObj = {0};
|
||||
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
|
||||
tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
|
@ -383,19 +392,18 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
topicObj.sqlLen = strlen(pCreate->sql) + 1;
|
||||
topicObj.subType = pCreate->subType;
|
||||
topicObj.withMeta = pCreate->withMeta;
|
||||
if (topicObj.withMeta) {
|
||||
if (topicObj.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
|
||||
if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
if (pCreate->withMeta) {
|
||||
terrno = TSDB_CODE_MND_INVALID_TOPIC_OPTION;
|
||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (pCreate->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
topicObj.ast = strdup(pCreate->ast);
|
||||
topicObj.astLen = strlen(pCreate->ast) + 1;
|
||||
|
||||
qDebugL("ast %s", topicObj.ast);
|
||||
qDebugL("topic:%s ast %s", topicObj.name, topicObj.ast);
|
||||
|
||||
SNode *pAst = NULL;
|
||||
if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
|
||||
|
@ -409,18 +417,20 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
|
||||
SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true};
|
||||
if (qCreateQueryPlan(&cxt, &pPlan, NULL) != 0) {
|
||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
mError("failed to create topic:%s since %s", pCreate->name, terrstr());
|
||||
taosMemoryFree(topicObj.ast);
|
||||
taosMemoryFree(topicObj.sql);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int64_t ntbUid;
|
||||
topicObj.ntbColIds = taosArrayInit(0, sizeof(int16_t));
|
||||
if (topicObj.ntbColIds == NULL) {
|
||||
taosMemoryFree(topicObj.ast);
|
||||
taosMemoryFree(topicObj.sql);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
extractTopicTbInfo(pAst, &topicObj);
|
||||
|
||||
if (topicObj.ntbUid == 0) {
|
||||
|
@ -449,6 +459,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
topicObj.stbUid = pStb->uid;
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
}
|
||||
|
@ -467,7 +478,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
taosMemoryFreeClear(topicObj.physicalPlan);
|
||||
return -1;
|
||||
}
|
||||
mInfo("trans:%d, used to create topic:%s", pTrans->id, pCreate->name);
|
||||
|
||||
mInfo("trans:%d to create topic:%s", pTrans->id, pCreate->name);
|
||||
|
||||
SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
|
@ -476,6 +488,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
|
||||
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||
|
||||
if (topicObj.ntbUid != 0) {
|
||||
|
@ -544,7 +557,11 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
taosMemoryFreeClear(topicObj.sql);
|
||||
taosMemoryFreeClear(topicObj.ast);
|
||||
taosArrayDestroy(topicObj.ntbColIds);
|
||||
if (topicObj.schema.nCols) taosMemoryFreeClear(topicObj.schema.pSchema);
|
||||
|
||||
if (topicObj.schema.nCols) {
|
||||
taosMemoryFreeClear(topicObj.schema.pSchema);
|
||||
}
|
||||
|
||||
mndTransDrop(pTrans);
|
||||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
@ -589,11 +606,13 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
code = mndCreateTopic(pMnode, pReq, &createTopicReq, pDb, pReq->info.conn.user);
|
||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
if (code == 0) {
|
||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
_OVER:
|
||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||
mError("topic:%s, failed to create since %s", createTopicReq.name, terrstr());
|
||||
mError("failed to create topic:%s since %s", createTopicReq.name, terrstr());
|
||||
}
|
||||
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
|
@ -605,13 +624,18 @@ _OVER:
|
|||
|
||||
static int32_t mndDropTopic(SMnode *pMnode, STrans *pTrans, SRpcMsg *pReq, SMqTopicObj *pTopic) {
|
||||
int32_t code = -1;
|
||||
if (mndUserRemoveTopic(pMnode, pTrans, pTopic->name) != 0) goto _OVER;
|
||||
if (mndUserRemoveTopic(pMnode, pTrans, pTopic->name) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
SSdbRaw *pCommitRaw = mndTopicActionEncode(pTopic);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) goto _OVER;
|
||||
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED);
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
|
@ -650,7 +674,9 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
|||
SMqConsumerObj *pConsumer;
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
||||
if (pIter == NULL) break;
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (pConsumer->status == MQ_CONSUMER_STATUS__LOST_REBD) continue;
|
||||
|
||||
|
@ -661,7 +687,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
|||
mndReleaseConsumer(pMnode, pConsumer);
|
||||
mndReleaseTopic(pMnode, pTopic);
|
||||
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:%" PRId64 ", in consumer group %s", dropReq.name,
|
||||
mError("topic:%s, failed to drop since subscribed by consumer:0x%" PRIx64 ", in consumer group %s", dropReq.name,
|
||||
pConsumer->consumerId, pConsumer->cgroup);
|
||||
return -1;
|
||||
}
|
||||
|
@ -773,6 +799,8 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
|
||||
*pNumOfTopics = 0;
|
||||
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
|
||||
if (pDb == NULL) {
|
||||
|
@ -785,7 +813,9 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo
|
|||
while (1) {
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
||||
if (pIter == NULL) break;
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (pTopic->dbUid == pDb->uid) {
|
||||
numOfTopics++;
|
||||
|
@ -814,10 +844,9 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
|||
int32_t cols = 0;
|
||||
|
||||
char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE + 5] = {0};
|
||||
tstrncpy(varDataVal(topicName), mndGetDbStr(pTopic->name), sizeof(topicName) - 2);
|
||||
/*tNameFromString(&n, pTopic->name, T_NAME_ACCT | T_NAME_DB);*/
|
||||
/*tNameGetDbName(&n, varDataVal(topicName));*/
|
||||
varDataSetLen(topicName, strlen(varDataVal(topicName)));
|
||||
const char* pName = mndGetDbStr(pTopic->name);
|
||||
STR_TO_VARSTR(topicName, pName);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)topicName, false);
|
||||
|
||||
|
@ -825,6 +854,7 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
|||
tNameFromString(&n, pTopic->db, T_NAME_ACCT | T_NAME_DB);
|
||||
tNameGetDbName(&n, varDataVal(dbName));
|
||||
varDataSetLen(dbName, strlen(varDataVal(dbName)));
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)dbName, false);
|
||||
|
||||
|
@ -832,8 +862,8 @@ static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
|
|||
colDataSetVal(pColInfo, numOfRows, (const char *)&pTopic->createTime, false);
|
||||
|
||||
char sql[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||
tstrncpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql, TSDB_SHOW_SQL_LEN);
|
||||
varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE]));
|
||||
STR_TO_VARSTR(sql, pTopic->sql);
|
||||
|
||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataSetVal(pColInfo, numOfRows, (const char *)sql, false);
|
||||
|
||||
|
@ -868,24 +898,26 @@ static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter) {
|
|||
sdbCancelFetch(pSdb, pIter);
|
||||
}
|
||||
|
||||
int32_t mndCheckTopicExist(SMnode *pMnode, SDbObj *pDb) {
|
||||
bool mndTopicExistsForDb(SMnode *pMnode, SDbObj *pDb) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
void *pIter = NULL;
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
||||
if (pIter == NULL) break;
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (pTopic->dbUid == pDb->uid) {
|
||||
sdbRelease(pSdb, pTopic);
|
||||
terrno = TSDB_CODE_MND_TOPIC_MUST_BE_DELETED;
|
||||
return -1;
|
||||
return true;
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pTopic);
|
||||
}
|
||||
return 0;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
|
|
@ -1062,10 +1062,14 @@ int32_t mndUserRemoveTopic(SMnode *pMnode, STrans *pTrans, char *topic) {
|
|||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_USER, pIter, (void **)&pUser);
|
||||
if (pIter == NULL) break;
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
code = -1;
|
||||
if (mndUserDupObj(pUser, &newUser) != 0) break;
|
||||
if (mndUserDupObj(pUser, &newUser) != 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
bool inTopic = (taosHashGet(newUser.topics, topic, len) != NULL);
|
||||
if (inTopic) {
|
||||
|
|
|
@ -114,7 +114,10 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
|||
}
|
||||
|
||||
void tqClose(STQ* pTq) {
|
||||
if (pTq) {
|
||||
if (pTq == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
tqOffsetClose(pTq->pOffsetStore);
|
||||
taosHashCleanup(pTq->pHandle);
|
||||
taosHashCleanup(pTq->pPushMgr);
|
||||
|
@ -124,7 +127,6 @@ void tqClose(STQ* pTq) {
|
|||
streamMetaClose(pTq->pStreamMeta);
|
||||
taosMemoryFree(pTq);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) {
|
||||
int32_t len = 0;
|
||||
|
@ -158,7 +160,7 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq,
|
|||
};
|
||||
tmsgSendRsp(&resp);
|
||||
|
||||
tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) send rsp, res msg type %d, offset type:%d",
|
||||
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, res msg type %d, offset type:%d",
|
||||
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->resMsgType, pRsp->rspOffset.type);
|
||||
|
||||
return 0;
|
||||
|
@ -215,9 +217,9 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
|||
|
||||
char buf1[80] = {0};
|
||||
char buf2[80] = {0};
|
||||
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||
tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s",
|
||||
tFormatOffset(buf1, tListLen(buf1), &pRsp->reqOffset);
|
||||
tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset);
|
||||
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s",
|
||||
TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
|
||||
|
||||
return 0;
|
||||
|
@ -273,7 +275,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
|
|||
char buf2[80] = {0};
|
||||
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||
tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
|
||||
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
|
||||
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
||||
|
||||
return 0;
|
||||
|
@ -332,8 +334,7 @@ int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, co
|
|||
char buf2[80] = {0};
|
||||
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||
tqDebug("taosx rsp, vgId:%d, from consumer:%" PRId64
|
||||
", (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
|
||||
tqDebug("taosx rsp, vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
|
||||
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
||||
|
||||
return 0;
|
||||
|
@ -344,14 +345,16 @@ static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOf
|
|||
pLeft->val.version <= pRight->val.version;
|
||||
}
|
||||
|
||||
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||
int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||
STqOffset offset = {0};
|
||||
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, msg, msgLen);
|
||||
tDecoderInit(&decoder, (uint8_t*) msg, msgLen);
|
||||
if (tDecodeSTqOffset(&decoder, &offset) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
if (offset.val.type == TMQ_OFFSET__SNAPSHOT_DATA || offset.val.type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||
|
@ -360,17 +363,20 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m
|
|||
} else if (offset.val.type == TMQ_OFFSET__LOG) {
|
||||
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, offset.subKey,
|
||||
TD_VID(pTq->pVnode), offset.val.version);
|
||||
if (offset.val.version + 1 == version) {
|
||||
if (offset.val.version + 1 == sversion) {
|
||||
offset.val.version += 1;
|
||||
}
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
|
||||
if (pOffset != NULL && tqOffsetLessOrEqual(&offset, pOffset)) {
|
||||
return 0;
|
||||
tqError("invalid commit offset type:%d", offset.val.type);
|
||||
return -1;
|
||||
}
|
||||
|
||||
STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey);
|
||||
if (pSavedOffset != NULL && tqOffsetLessOrEqual(&offset, pSavedOffset)) {
|
||||
return 0; // no need to update the offset value
|
||||
}
|
||||
|
||||
// save the new offset value
|
||||
if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
|
@ -378,27 +384,25 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t version, char* msg, int32_t m
|
|||
|
||||
if (offset.val.type == TMQ_OFFSET__LOG) {
|
||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey));
|
||||
if (pHandle) {
|
||||
if (walRefVer(pHandle->pRef, offset.val.version) < 0) {
|
||||
if (pHandle && (walRefVer(pHandle->pRef, offset.val.version) < 0)) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// rsp
|
||||
|
||||
/*}*/
|
||||
/*}*/
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
|
||||
void* pIter = NULL;
|
||||
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pTq->pCheckInfo, pIter);
|
||||
if (pIter == NULL) break;
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
STqCheckInfo* pCheck = (STqCheckInfo*)pIter;
|
||||
|
||||
if (pCheck->ntbUid == tbUid) {
|
||||
int32_t sz = taosArrayGetSize(pCheck->colIdList);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
|
@ -410,6 +414,7 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -454,6 +459,7 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
|
|||
if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL || pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -474,17 +480,15 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
|
||||
// 1. find handle
|
||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||
/*ASSERT(pHandle);*/
|
||||
if (pHandle == NULL) {
|
||||
tqError("tmq poll: no consumer handle for consumer:%" PRId64 ", in vgId:%d, subkey %s", consumerId,
|
||||
TD_VID(pTq->pVnode), req.subKey);
|
||||
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s not found", consumerId, TD_VID(pTq->pVnode),
|
||||
req.subKey);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// check rebalance
|
||||
// 2. check rebalance
|
||||
if (pHandle->consumerId != consumerId) {
|
||||
tqError("tmq poll: consumer handle mismatch for consumer:%" PRId64
|
||||
", in vgId:%d, subkey %s, handle consumer id %" PRId64,
|
||||
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
||||
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
|
||||
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
|
||||
return -1;
|
||||
|
@ -498,7 +502,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
|
||||
char buf[80];
|
||||
tFormatOffset(buf, 80, &reqOffset);
|
||||
tqDebug("tmq poll: consumer %" PRId64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
|
||||
req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
|
||||
|
||||
// 2.reset offset if needed
|
||||
|
@ -510,7 +514,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
fetchOffsetNew = pOffset->val;
|
||||
char formatBuf[80];
|
||||
tFormatOffset(formatBuf, 80, &fetchOffsetNew);
|
||||
tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, offset reset to %s", consumerId, pHandle->subKey,
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d, offset reset to %s", consumerId, pHandle->subKey,
|
||||
TD_VID(pTq->pVnode), formatBuf);
|
||||
} else {
|
||||
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
|
||||
|
@ -595,8 +599,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
code = -1;
|
||||
}
|
||||
|
||||
tqDebug("tmq poll: consumer %" PRId64
|
||||
", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
|
||||
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
|
||||
dataRsp.rspOffset.uid, dataRsp.rspOffset.ts);
|
||||
|
||||
|
@ -619,8 +622,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) {
|
||||
code = -1;
|
||||
}
|
||||
tqDebug("tmq poll: consumer %" PRId64 ", subkey %s, vg %d, send meta offset type:%d,uid:%" PRId64
|
||||
",version:%" PRId64 "",
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey %s, vg %d, send meta offset type:%d,uid:%" PRId64
|
||||
",version:%" PRId64,
|
||||
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
|
||||
metaRsp.rspOffset.version);
|
||||
taosMemoryFree(metaRsp.metaRsp);
|
||||
|
@ -638,8 +641,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
fetchOffsetNew = taosxRsp.rspOffset;
|
||||
}
|
||||
|
||||
tqDebug("taosx poll: consumer %" PRId64 ", subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%" PRId64
|
||||
",version:%" PRId64 "",
|
||||
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%" PRId64
|
||||
",version:%" PRId64,
|
||||
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), taosxRsp.blockNum, taosxRsp.rspOffset.type,
|
||||
taosxRsp.rspOffset.uid, taosxRsp.rspOffset.version);
|
||||
}
|
||||
|
@ -675,7 +678,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
|
||||
SWalCont* pHead = &pCkHead->head;
|
||||
|
||||
tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
|
||||
req.epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType);
|
||||
|
||||
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
||||
|
@ -716,12 +719,13 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
tDeleteSTaosxRsp(&taosxRsp);
|
||||
taosMemoryFreeClear(pCkHead);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
||||
|
||||
tqDebug("vgId:%d, delete sub: %s", pTq->pVnode->config.vgId, pReq->subKey);
|
||||
|
@ -755,10 +759,10 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t version, char* msg, int32_t msgL
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||
int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||
STqCheckInfo info = {0};
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, msg, msgLen);
|
||||
tDecoderInit(&decoder, (uint8_t*) msg, msgLen);
|
||||
if (tDecodeSTqCheckInfo(&decoder, &info) < 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
|
@ -775,7 +779,7 @@ int32_t tqProcessAddCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t m
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||
int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||
if (taosHashRemove(pTq->pCheckInfo, msg, strlen(msg)) < 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
|
@ -787,7 +791,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t version, char* msg, int32_t m
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||
SMqRebVgReq req = {0};
|
||||
tDecodeSMqRebVgReq(msg, &req);
|
||||
// todo lock
|
||||
|
@ -975,7 +979,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||
SStreamTaskCheckReq req;
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, msgBody, msgLen);
|
||||
tDecoderInit(&decoder, (uint8_t*) msgBody, msgLen);
|
||||
tDecodeSStreamTaskCheckReq(&decoder, &req);
|
||||
tDecoderClear(&decoder);
|
||||
int32_t taskId = req.downstreamTaskId;
|
||||
|
@ -997,7 +1001,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
|
||||
if (pTask) streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
|
||||
tqDebug("tq recv task check req(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d",
|
||||
tqDebug("tq recv task check req(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
|
||||
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||
|
||||
SEncoder encoder;
|
||||
|
@ -1028,7 +1032,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||
int32_t code;
|
||||
SStreamTaskCheckRsp rsp;
|
||||
|
||||
|
@ -1041,7 +1045,7 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_
|
|||
}
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
tqDebug("tq recv task check rsp(reqId: %" PRId64 ") %d at node %d check req from task %d at node %d, status %d",
|
||||
tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d",
|
||||
rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId);
|
||||
|
@ -1049,12 +1053,12 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_
|
|||
return -1;
|
||||
}
|
||||
|
||||
code = streamProcessTaskCheckRsp(pTask, &rsp, version);
|
||||
code = streamProcessTaskCheckRsp(pTask, &rsp, sversion);
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||
int32_t code;
|
||||
#if 0
|
||||
code = streamMetaAddSerializedTask(pTq->pStreamMeta, version, msg, msgLen);
|
||||
|
@ -1069,6 +1073,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg
|
|||
if (pTask == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
|
||||
code = tDecodeSStreamTask(&decoder, pTask);
|
||||
|
@ -1080,14 +1085,14 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg
|
|||
tDecoderClear(&decoder);
|
||||
|
||||
// 2.save task
|
||||
code = streamMetaAddTask(pTq->pStreamMeta, version, pTask);
|
||||
code = streamMetaAddTask(pTq->pStreamMeta, sversion, pTask);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// 3.go through recover steps to fill history
|
||||
if (pTask->fillHistory) {
|
||||
streamTaskCheckDownstream(pTask, version);
|
||||
streamTaskCheckDownstream(pTask, sversion);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -1158,7 +1163,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||
int32_t code;
|
||||
SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg;
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
|
||||
|
@ -1167,7 +1172,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t m
|
|||
}
|
||||
|
||||
// do recovery step 2
|
||||
code = streamSourceRecoverScanStep2(pTask, version);
|
||||
code = streamSourceRecoverScanStep2(pTask, sversion);
|
||||
if (code < 0) {
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
return -1;
|
||||
|
@ -1215,7 +1220,7 @@ int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
SStreamRecoverFinishReq req;
|
||||
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, msg, msgLen);
|
||||
tDecoderInit(&decoder, (uint8_t*) msg, msgLen);
|
||||
tDecodeSStreamRecoverFinishReq(&decoder, &req);
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
|
@ -1369,7 +1374,10 @@ int32_t tqProcessSubmitReq(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
|
|||
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
|
||||
if (pIter == NULL) break;
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
|
||||
if (pTask->taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
|
||||
|
@ -1453,7 +1461,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen) {
|
||||
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
||||
streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
|
||||
return 0;
|
||||
|
@ -1465,7 +1473,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||
SStreamRetrieveReq req;
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, msgBody, msgLen);
|
||||
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
|
||||
tDecodeStreamRetrieveReq(&decoder, &req);
|
||||
tDecoderClear(&decoder);
|
||||
int32_t taskId = req.dstTaskId;
|
||||
|
@ -1498,7 +1506,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
|
|||
|
||||
SStreamDispatchReq req;
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, msgBody, msgLen);
|
||||
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
|
||||
if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
|
||||
code = TSDB_CODE_MSG_DECODE_ERROR;
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -1561,4 +1569,4 @@ FAIL:
|
|||
return -1;
|
||||
}
|
||||
|
||||
int32_t tqCheckLogInWal(STQ* pTq, int64_t version) { return version <= pTq->walLogLastVer; }
|
||||
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; }
|
||||
|
|
Loading…
Reference in New Issue