diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 34372dc2ff..61eca6cc4f 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -208,8 +208,6 @@ void* qExtractReaderFromStreamScanner(void* scanner); int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner); -int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem); - int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo); int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver); int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index fe7ebc8c8c..6618650648 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -51,7 +51,6 @@ enum { TASK_STATUS__RECOVER_PREPARE, TASK_STATUS__RECOVER1, TASK_STATUS__RECOVER2, - TASK_STATUS__RESTORE, // only available for source task to replay WAL from the checkpoint, todo remove it TASK_STATUS__PAUSE, }; @@ -346,7 +345,7 @@ typedef struct SStreamMeta { FTaskExpand* expandFunc; int32_t vgId; SRWLatch lock; - int32_t walScan; + int32_t walScanCounter; void* streamBackend; } SStreamMeta; @@ -546,8 +545,9 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz); // recover and fill history int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version); int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version); -int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq); +int32_t streamTaskCheckStatus(SStreamTask* pTask); int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version); + // common int32_t streamSetParamForRecover(SStreamTask* pTask); int32_t streamRestoreParam(SStreamTask* pTask); diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 168ae8b66a..2e44911682 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -534,8 +534,7 @@ static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSm uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL; if (index) { if (colField[*index].type != kv->type) { - uError("SML:0x%" PRIx64 " point type and db type mismatch. point type: %d, db type: %d, key: %s", info->id, - colField[*index].type, kv->type, kv->key); + uError("SML:0x%" PRIx64 " point type and db type mismatch. db type: %d, point type: %d, key: %s", info->id, colField[*index].type, kv->type, kv->key); return TSDB_CODE_SML_INVALID_DATA; } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 9292be83e9..b5ae9116ef 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1377,7 +1377,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN); tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN); - tscDebug("consumer:0x%" PRIx64 ", update topic:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet); + tscDebug("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet); pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg)); for (int32_t j = 0; j < vgNumGet; j++) { @@ -1447,14 +1447,14 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i); if (pTopicCur->vgs) { int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs); - tscDebug("consumer:0x%" PRIx64 ", new vg num: %d", tmq->consumerId, vgNumCur); + tscDebug("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, vgNumCur); for (int32_t j = 0; j < vgNumCur; j++) { SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId); char buf[80]; tFormatOffset(buf, 80, &pVgCur->currentOffset); - tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, + tscDebug("consumer:0x%" PRIx64 ", doUpdateLocalEp current vg, epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, tmq->epoch, pVgCur->vgId, vgKey, buf); SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows}; @@ -1790,8 +1790,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { pVg->epSet = *pollRspWrapper->pEpset; } - // update the local offset value only for the returned values. - pVg->currentOffset = pDataRsp->rspOffset; + if(pDataRsp->rspOffset.type != 0){ // if offset is validate + pVg->currentOffset = pDataRsp->rspOffset; // update the local offset value only for the returned values. + } atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); char buf[80]; @@ -1801,12 +1802,13 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { " total:%" PRId64 " reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); + pVg->emptyBlockReceiveTs = taosGetTimestampMs(); taosFreeQitem(pollRspWrapper); } else { // build rsp int64_t numOfRows = 0; SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows); tmq->totalRows += numOfRows; - + pVg->emptyBlockReceiveTs = 0; tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 " vg total:%" PRId64 " total:%" PRId64 ", reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows, @@ -1828,7 +1830,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; - pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset; + if(pollRspWrapper->metaRsp.rspOffset.type != 0){ // if offset is validate + pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset; + } atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); // build rsp SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); @@ -1846,7 +1850,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (pollRspWrapper->taosxRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; - pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset; + if(pollRspWrapper->taosxRsp.rspOffset.type != 0){ // if offset is validate + pVg->currentOffset = pollRspWrapper->taosxRsp.rspOffset; + } atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); if (pollRspWrapper->taosxRsp.blockNum == 0) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 0674975972..5824f0830a 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5329,9 +5329,9 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) { int32_t headLen = sizeof(SMsgHead); - SMsgHead *pHead = buf; - pHead->vgId = pReq->head.vgId; - pHead->contLen = pReq->head.contLen; +// SMsgHead *pHead = buf; +// pHead->vgId = pReq->head.vgId; +// pHead->contLen = pReq->head.contLen; SDecoder decoder = {0}; tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen); @@ -6839,10 +6839,8 @@ int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal) if (tEncodeI64(pEncoder, pOffsetVal->ts) < 0) return -1; } else if (pOffsetVal->type == TMQ_OFFSET__LOG) { if (tEncodeI64(pEncoder, pOffsetVal->version) < 0) return -1; - } else if (pOffsetVal->type < 0) { - // do nothing } else { - ASSERT(0); + // do nothing } return 0; } @@ -6854,10 +6852,8 @@ int32_t tDecodeSTqOffsetVal(SDecoder *pDecoder, STqOffsetVal *pOffsetVal) { if (tDecodeI64(pDecoder, &pOffsetVal->ts) < 0) return -1; } else if (pOffsetVal->type == TMQ_OFFSET__LOG) { if (tDecodeI64(pDecoder, &pOffsetVal->version) < 0) return -1; - } else if (pOffsetVal->type < 0) { - // do nothing } else { - ASSERT(0); + // do nothing } return 0; } diff --git a/source/common/src/tname.c b/source/common/src/tname.c index e5ed7a3728..c6210ca8c9 100644 --- a/source/common/src/tname.c +++ b/source/common/src/tname.c @@ -122,10 +122,8 @@ int32_t tNameLen(const SName* name) { int32_t len2 = (int32_t)strlen(name->tname); if (name->type == TSDB_DB_NAME_T) { - ASSERT(len2 == 0); return len + len1 + TSDB_NAME_DELIMITER_LEN; } else { - ASSERT(len2 > 0); return len + len1 + len2 + TSDB_NAME_DELIMITER_LEN * 2; } } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index cb7178dec2..5edc183c84 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -519,7 +519,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_ADD_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_DEL_CHECKINFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_TMQ_CONSUME, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_DEL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index aa38b94fd7..96401511d2 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -23,13 +23,12 @@ extern "C" { #endif enum { - MQ_CONSUMER_STATUS__MODIFY = 1, + MQ_CONSUMER_STATUS_REBALANCE = 1, // MQ_CONSUMER_STATUS__MODIFY_IN_REB, // this value is not used anymore MQ_CONSUMER_STATUS__READY, MQ_CONSUMER_STATUS__LOST, // MQ_CONSUMER_STATUS__LOST_IN_REB, // this value is not used anymore MQ_CONSUMER_STATUS__LOST_REBD, - MQ_CONSUMER_STATUS__REMOVED, }; int32_t mndInitConsumer(SMnode *pMnode); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index fcd314d2ae..2579ff5231 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -142,7 +142,7 @@ typedef enum { CONSUMER_UPDATE__REMOVE, CONSUMER_UPDATE__LOST, CONSUMER_UPDATE__RECOVER, - CONSUMER_UPDATE__MODIFY, // subscribe req need change consume topic + CONSUMER_UPDATE__REBALANCE, // subscribe req need change consume topic } ECsmUpdateType; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 65a2fa72a2..d144747e81 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -192,15 +192,18 @@ FAIL: return -1; } +// todo check the clear process static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { SMnode *pMnode = pMsg->info.node; SMqConsumerClearMsg *pClearMsg = pMsg->pCont; - SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId); + + SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pClearMsg->consumerId); if (pConsumer == NULL) { + mError("consumer:0x%"PRIx64" failed to be found to clear it", pClearMsg->consumerId); return 0; } - mInfo("receive consumer clear msg, consumer id %" PRId64 ", status %s", pClearMsg->consumerId, + mInfo("consumer:0x%" PRIx64 " needs to be cleared, status %s", pClearMsg->consumerId, mndConsumerStatusName(pConsumer->status)); if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) { @@ -215,6 +218,8 @@ static int32_t mndProcessConsumerClearMsg(SRpcMsg *pMsg) { STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pMsg, "clear-csm"); if (pTrans == NULL) goto FAIL; + + // this is the drop action, not the update action if (mndSetConsumerDropLogs(pMnode, pTrans, pConsumerNew) != 0) goto FAIL; if (mndTransPrepare(pMnode, pTrans) != 0) goto FAIL; @@ -299,28 +304,36 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { if (status == MQ_CONSUMER_STATUS__READY) { if (hbStatus > MND_CONSUMER_LOST_HB_CNT) { SMqConsumerLostMsg *pLostMsg = rpcMallocCont(sizeof(SMqConsumerLostMsg)); + if (pLostMsg == NULL) { + mError("consumer:0x%"PRIx64" failed to transfer consumer status to lost due to out of memory. alloc size:%d", + pConsumer->consumerId, (int32_t)sizeof(SMqConsumerLostMsg)); + continue; + } pLostMsg->consumerId = pConsumer->consumerId; SRpcMsg rpcMsg = { - .msgType = TDMT_MND_TMQ_CONSUMER_LOST, - .pCont = pLostMsg, - .contLen = sizeof(SMqConsumerLostMsg), - }; + .msgType = TDMT_MND_TMQ_CONSUMER_LOST, .pCont = pLostMsg, .contLen = sizeof(SMqConsumerLostMsg)}; + mDebug("consumer:0x%"PRIx64" hb not received beyond threshold %d, set to lost", pConsumer->consumerId, + MND_CONSUMER_LOST_HB_CNT); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } } else if (status == MQ_CONSUMER_STATUS__LOST_REBD) { // if the client is lost longer than one day, clear it. Otherwise, do nothing about the lost consumers. if (hbStatus > MND_CONSUMER_LOST_CLEAR_THRESHOLD) { SMqConsumerClearMsg *pClearMsg = rpcMallocCont(sizeof(SMqConsumerClearMsg)); + if (pClearMsg == NULL) { + mError("consumer:0x%"PRIx64" failed to clear consumer due to out of memory. alloc size:%d", + pConsumer->consumerId, (int32_t)sizeof(SMqConsumerClearMsg)); + continue; + } pClearMsg->consumerId = pConsumer->consumerId; SRpcMsg rpcMsg = { - .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, - .pCont = pClearMsg, - .contLen = sizeof(SMqConsumerClearMsg), - }; + .msgType = TDMT_MND_TMQ_LOST_CONSUMER_CLEAR, .pCont = pClearMsg, .contLen = sizeof(SMqConsumerClearMsg)}; + mDebug("consumer:0x%" PRIx64 " lost beyond threshold %d, clear it", pConsumer->consumerId, + MND_CONSUMER_LOST_CLEAR_THRESHOLD); tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); } } else if (status == MQ_CONSUMER_STATUS__LOST) { @@ -334,7 +347,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) { taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId); } taosRUnLockLatch(&pConsumer->lock); - } else { + } else { // MQ_CONSUMER_STATUS_REBALANCE taosRLockLatch(&pConsumer->lock); int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics); @@ -449,7 +462,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { // 1. check consumer status int32_t status = atomic_load_32(&pConsumer->status); -#if 1 if (status == MQ_CONSUMER_STATUS__LOST_REBD) { mInfo("try to recover consumer:0x%" PRIx64, consumerId); SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); @@ -463,7 +475,6 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &pRpcMsg); } -#endif if (status != MQ_CONSUMER_STATUS__READY) { mInfo("consumer:0x%" PRIx64 " not ready, status: %s", consumerId, mndConsumerStatusName(status)); @@ -660,7 +671,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { tstrncpy(pConsumerNew->clientId, subscribe.clientId, tListLen(pConsumerNew->clientId)); // set the update type - pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; + pConsumerNew->updateType = CONSUMER_UPDATE__REBALANCE; taosArrayDestroy(pConsumerNew->assignedTopics); pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup); @@ -673,7 +684,6 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { if (mndTransPrepare(pMnode, pTrans) != 0) goto _over; } else { - /*taosRLockLatch(&pExistedConsumer->lock);*/ int32_t status = atomic_load_32(&pExistedConsumer->status); mInfo("receive subscribe request from existed consumer:0x%" PRIx64 @@ -691,7 +701,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { } // set the update type - pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY; + pConsumerNew->updateType = CONSUMER_UPDATE__REBALANCE; taosArrayDestroy(pConsumerNew->assignedTopics); pConsumerNew->assignedTopics = taosArrayDup(pTopicList, topicNameDup); @@ -870,9 +880,10 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) { int32_t status = pConsumer->status; if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) { - if (status == MQ_CONSUMER_STATUS__MODIFY) { + if (status == MQ_CONSUMER_STATUS_REBALANCE) { pConsumer->status = MQ_CONSUMER_STATUS__READY; } else if (status == MQ_CONSUMER_STATUS__LOST) { + ASSERT(taosArrayGetSize(pConsumer->currentTopics) == 0); pConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD; } } @@ -881,7 +892,7 @@ static void updateConsumerStatus(SMqConsumerObj *pConsumer) { // remove from new topic static void removeFromNewTopicList(SMqConsumerObj *pConsumer, const char *pTopic) { int32_t size = taosArrayGetSize(pConsumer->rebNewTopics); - for (int32_t i = 0; i < taosArrayGetSize(pConsumer->rebNewTopics); i++) { + for (int32_t i = 0; i < size; i++) { char *p = taosArrayGetP(pConsumer->rebNewTopics, i); if (strcmp(pTopic, p) == 0) { taosArrayRemove(pConsumer->rebNewTopics, i); @@ -902,32 +913,57 @@ static void removeFromRemoveTopicList(SMqConsumerObj *pConsumer, const char *pTo if (strcmp(pTopic, p) == 0) { taosArrayRemove(pConsumer->rebRemovedTopics, i); taosMemoryFree(p); + + mDebug("consumer:0x%" PRIx64 " remove topic:%s in the removed topic list, remain removedTopics:%d", + pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->rebRemovedTopics)); break; } } } +static void removeFromCurrentTopicList(SMqConsumerObj *pConsumer, const char *pTopic) { + int32_t sz = taosArrayGetSize(pConsumer->currentTopics); + for (int32_t i = 0; i < sz; i++) { + char *topic = taosArrayGetP(pConsumer->currentTopics, i); + if (strcmp(pTopic, topic) == 0) { + taosArrayRemove(pConsumer->currentTopics, i); + taosMemoryFree(topic); + + mDebug("consumer:0x%" PRIx64 " remove topic:%s in the current topic list, remain currentTopics:%d", + pConsumer->consumerId, pTopic, (int)taosArrayGetSize(pConsumer->currentTopics)); + break; + } + } +} + +static bool existInCurrentTopicList(const SMqConsumerObj* pConsumer, const char* pTopic) { + bool existing = false; + int32_t size = taosArrayGetSize(pConsumer->currentTopics); + for (int32_t i = 0; i < size; i++) { + char *topic = taosArrayGetP(pConsumer->currentTopics, i); + + if (strcmp(topic, pTopic) == 0) { + existing = true; + break; + } + } + + return existing; +} + static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, SMqConsumerObj *pNewConsumer) { mDebug("consumer:0x%" PRIx64 " perform update action, update type:%d, subscribe-time:%" PRId64 ", uptime:%" PRId64, pOldConsumer->consumerId, pNewConsumer->updateType, pOldConsumer->subscribeTime, pOldConsumer->upTime); taosWLockLatch(&pOldConsumer->lock); - if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) { - SArray *tmp = pOldConsumer->rebNewTopics; - pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics; - pNewConsumer->rebNewTopics = tmp; - - tmp = pOldConsumer->rebRemovedTopics; - pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics; - pNewConsumer->rebRemovedTopics = tmp; - - tmp = pOldConsumer->assignedTopics; - pOldConsumer->assignedTopics = pNewConsumer->assignedTopics; - pNewConsumer->assignedTopics = tmp; + if (pNewConsumer->updateType == CONSUMER_UPDATE__REBALANCE) { + TSWAP(pOldConsumer->rebNewTopics, pNewConsumer->rebNewTopics); + TSWAP(pOldConsumer->rebRemovedTopics, pNewConsumer->rebRemovedTopics); + TSWAP(pOldConsumer->assignedTopics, pNewConsumer->assignedTopics); pOldConsumer->subscribeTime = pNewConsumer->upTime; - pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY; + pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; } else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) { int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics); for (int32_t i = 0; i < sz; i++) { @@ -937,10 +973,10 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, pOldConsumer->rebalanceTime = pNewConsumer->upTime; - int32_t status = pOldConsumer->status; + int32_t prevStatus = pOldConsumer->status; pOldConsumer->status = MQ_CONSUMER_STATUS__LOST; mDebug("consumer:0x%" PRIx64 " state %s -> %s, reb-time:%" PRId64 ", reb-removed-topics:%d", - pOldConsumer->consumerId, mndConsumerStatusName(status), mndConsumerStatusName(pOldConsumer->status), + pOldConsumer->consumerId, mndConsumerStatusName(prevStatus), mndConsumerStatusName(pOldConsumer->status), pOldConsumer->rebalanceTime, (int)taosArrayGetSize(pOldConsumer->rebRemovedTopics)); } else if (pNewConsumer->updateType == CONSUMER_UPDATE__RECOVER) { int32_t sz = taosArrayGetSize(pOldConsumer->assignedTopics); @@ -950,8 +986,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, } pOldConsumer->rebalanceTime = pNewConsumer->upTime; - - pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY; + pOldConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; } else if (pNewConsumer->updateType == CONSUMER_UPDATE__TOUCH) { atomic_add_fetch_32(&pOldConsumer->epoch, 1); @@ -960,24 +995,16 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, } else if (pNewConsumer->updateType == CONSUMER_UPDATE__ADD) { char *pNewTopic = taosStrdup(taosArrayGetP(pNewConsumer->rebNewTopics, 0)); - // not exist in current topic - bool existing = false; - int32_t numOfExistedTopics = taosArrayGetSize(pOldConsumer->currentTopics); - for (int32_t i = 0; i < numOfExistedTopics; i++) { - char *topic = taosArrayGetP(pOldConsumer->currentTopics, i); - if (strcmp(topic, pNewTopic) == 0) { - existing = true; - } - } - + // check if exist in current topic removeFromNewTopicList(pOldConsumer, pNewTopic); // add to current topic - if (!existing) { + bool existing = existInCurrentTopicList(pOldConsumer, pNewTopic); + if (existing) { + taosMemoryFree(pNewTopic); + } else { // added into current topic list taosArrayPush(pOldConsumer->currentTopics, &pNewTopic); taosArraySort(pOldConsumer->currentTopics, taosArrayCompareString); - } else { - taosMemoryFree(pNewTopic); } // set status @@ -1002,16 +1029,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, removeFromRemoveTopicList(pOldConsumer, removedTopic); // remove from current topic - int32_t i = 0; - int32_t sz = taosArrayGetSize(pOldConsumer->currentTopics); - for (i = 0; i < sz; i++) { - char *topic = taosArrayGetP(pOldConsumer->currentTopics, i); - if (strcmp(removedTopic, topic) == 0) { - taosArrayRemove(pOldConsumer->currentTopics, i); - taosMemoryFree(topic); - break; - } - } + removeFromCurrentTopicList(pOldConsumer, removedTopic); // set status int32_t status = pOldConsumer->status; @@ -1160,7 +1178,7 @@ static const char *mndConsumerStatusName(int status) { case MQ_CONSUMER_STATUS__LOST: case MQ_CONSUMER_STATUS__LOST_REBD: return "lost"; - case MQ_CONSUMER_STATUS__MODIFY: + case MQ_CONSUMER_STATUS_REBALANCE: return "rebalancing"; default: return "unknown"; diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index c69f08eb6b..6dab018236 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -225,7 +225,7 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_L memcpy(pConsumer->cgroup, cgroup, TSDB_CGROUP_LEN); pConsumer->epoch = 0; - pConsumer->status = MQ_CONSUMER_STATUS__MODIFY; + pConsumer->status = MQ_CONSUMER_STATUS_REBALANCE; pConsumer->hbStatus = 0; taosInitRWLatch(&pConsumer->lock); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 015c497de1..573c60549e 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -213,13 +213,9 @@ static void doRemoveExistedConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs); for (int32_t j = 0; j < consumerVgNum; j++) { - SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); - SMqRebOutputVg outputVg = { - .oldConsumerId = consumerId, - .newConsumerId = -1, - .pVgEp = pVgEp, - }; + SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); + SMqRebOutputVg outputVg = {.oldConsumerId = consumerId, .newConsumerId = -1, .pVgEp = pVgEp}; taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, pSubKey, pVgEp->vgId, consumerId); } @@ -484,14 +480,16 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu for (int32_t i = 0; i < vgNum; i++) { SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i); if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg) < 0) { - goto REB_FAIL; + mndTransDrop(pTrans); + return -1; } } // 2. redo log: subscribe and vg assignment // subscribe if (mndSetSubCommitLogs(pMnode, pTrans, pOutput->pSub) != 0) { - goto REB_FAIL; + mndTransDrop(pTrans); + return -1; } // 3. commit log: consumer to update status and epoch @@ -506,11 +504,15 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); - goto REB_FAIL; + + mndTransDrop(pTrans); + return -1; } + tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); } + // 3.2 set new consumer consumerNum = taosArrayGetSize(pOutput->newConsumers); for (int32_t i = 0; i < consumerNum; i++) { @@ -527,8 +529,11 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); - goto REB_FAIL; + + mndTransDrop(pTrans); + return -1; } + tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); } @@ -549,8 +554,11 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu if (mndSetConsumerCommitLogs(pMnode, pTrans, pConsumerNew) != 0) { tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); - goto REB_FAIL; + + mndTransDrop(pTrans); + return -1; } + tDeleteSMqConsumerObj(pConsumerNew); taosMemoryFree(pConsumerNew); } @@ -563,15 +571,12 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu // 6. execution if (mndTransPrepare(pMnode, pTrans) != 0) { mError("failed to prepare trans rebalance since %s", terrstr()); - goto REB_FAIL; + mndTransDrop(pTrans); + return -1; } mndTransDrop(pTrans); return 0; - -REB_FAIL: - mndTransDrop(pTrans); - return -1; } static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { @@ -584,16 +589,11 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { // here we only handle one topic rebalance requirement to ensure the atomic execution of this transaction. while (1) { -// if (rebalanceOnce) { -// break; -// } - pIter = taosHashIterate(pReq->rebSubHash, pIter); if (pIter == NULL) { break; } - // todo handle the malloc failure SMqRebInputObj rebInput = {0}; SMqRebOutputObj rebOutput = {0}; rebOutput.newConsumers = taosArrayInit(0, sizeof(int64_t)); @@ -601,6 +601,20 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { rebOutput.modifyConsumers = taosArrayInit(0, sizeof(int64_t)); rebOutput.rebVgs = taosArrayInit(0, sizeof(SMqRebOutputVg)); + if (rebOutput.newConsumers == NULL || rebOutput.removedConsumers == NULL || rebOutput.modifyConsumers == NULL || + rebOutput.rebVgs == NULL) { + taosArrayDestroy(rebOutput.newConsumers); + taosArrayDestroy(rebOutput.removedConsumers); + taosArrayDestroy(rebOutput.modifyConsumers); + taosArrayDestroy(rebOutput.rebVgs); + + terrno = TSDB_CODE_OUT_OF_MEMORY; + mInfo("mq re-balance failed, due to out of memory"); + taosHashCleanup(pReq->rebSubHash); + mndRebEnd(); + return -1; + } + SMqRebInfo *pRebInfo = (SMqRebInfo *)pIter; SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebInfo->key); @@ -640,6 +654,7 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { rebInput.oldConsumerNum = taosHashGetSize(pSub->consumerHash); rebOutput.pSub = tCloneSubscribeObj(pSub); taosRUnLockLatch(&pSub->lock); + mInfo("sub topic:%s has %d consumers sub till now", pRebInfo->key, rebInput.oldConsumerNum); mndReleaseSubscribe(pMnode, pSub); } @@ -661,9 +676,6 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { taosArrayDestroy(rebOutput.rebVgs); tDeleteSubscribeObj(rebOutput.pSub); taosMemoryFree(rebOutput.pSub); - -// taosSsleep(100); -// rebalanceOnce = true; } // reset flag diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index acc0d29382..30b2fb74ca 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -100,6 +100,7 @@ typedef struct { SWalRef* pRef; STqPushHandle pushHandle; // push STqExecHandle execHandle; // exec + SRpcMsg* msg; } STqHandle; typedef struct { @@ -113,7 +114,7 @@ struct STQ { char* path; int64_t walLogLastVer; SRWLatch lock; - SHashObj* pPushMgr; // consumerId -> STqPushEntry + SHashObj* pPushMgr; // subKey -> STqHandle SHashObj* pHandle; // subKey -> STqHandle SHashObj* pCheckInfo; // topic -> SAlterCheckInfo STqOffsetStore* pOffsetStore; @@ -146,7 +147,7 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows); int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type); -int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry); +int32_t tqPushDataRsp(STQ* pTq, STqHandle* pHandle); // tqMeta int32_t tqMetaOpen(STQ* pTq); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 0d76010a2a..4aa363093d 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -193,9 +193,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode); void tqNotifyClose(STQ*); void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); -int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, - int32_t type); -int tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer); +int tqRegisterPushEntry(STQ* pTq, void* handle, SRpcMsg* pMsg); +int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed. int tqCommit(STQ*); @@ -216,6 +215,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msg int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit); +int32_t tqProcessSubmitReqForSubscribe(STQ* pTq); int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 82e57daf56..b18a44c2bb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -71,18 +71,11 @@ static void destroyTqHandle(void* data) { walCloseReader(pData->pWalReader); tqCloseReader(pData->execHandle.pTqReader); } -} - -static void tqPushEntryFree(void* data) { - STqPushEntry* p = *(void**)data; - if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { - tDeleteSMqDataRsp(p->pDataRsp); - } else if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__TAOSX_RSP) { - tDeleteSTaosxRsp((STaosxRsp*)p->pDataRsp); + if(pData->msg != NULL) { + rpcFreeCont(pData->msg->pCont); + taosMemoryFree(pData->msg); + pData->msg = NULL; } - - taosMemoryFree(p->pDataRsp); - taosMemoryFree(p); } static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) { @@ -105,14 +98,18 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { taosHashSetFreeFp(pTq->pHandle, destroyTqHandle); taosInitRWLatch(&pTq->lock); - pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree); + pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo); - tqInitialize(pTq); - return pTq; + int32_t code = tqInitialize(pTq); + if (code != TSDB_CODE_SUCCESS) { + tqClose(pTq); + return NULL; + } else { + return pTq; + } } int32_t tqInitialize(STQ* pTq) { @@ -228,17 +225,19 @@ static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqData return 0; } -int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { - SMqDataRsp* pRsp = pPushEntry->pDataRsp; - SMqRspHead* pHeader = &pPushEntry->pDataRsp->head; - doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType); +int32_t tqPushDataRsp(STQ* pTq, STqHandle* pHandle) { + SMqDataRsp dataRsp = {0}; + dataRsp.head.consumerId = pHandle->consumerId; + dataRsp.head.epoch = pHandle->epoch; + dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; + doSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP); char buf1[80] = {0}; char buf2[80] = {0}; - tFormatOffset(buf1, tListLen(buf1), &pRsp->reqOffset); - tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset); + tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset); + tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset); tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", - TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2); + TD_VID(pTq->pVnode), dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2); return 0; } @@ -382,13 +381,13 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey); - - taosWLockLatch(&pTq->lock); - int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); - if (code != 0) { - tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey); - } - taosWUnLockLatch(&pTq->lock); + int32_t code = 0; +// taosWLockLatch(&pTq->lock); +// int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey)); +// if (code != 0) { +// tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey); +// } +// taosWUnLockLatch(&pTq->lock); STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (pHandle) { @@ -557,13 +556,12 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } taosWLockLatch(&pTq->lock); - atomic_store_32(&pHandle->epoch, -1); + atomic_store_32(&pHandle->epoch, 0); // remove if it has been register in the push manager, and return one empty block to consumer - tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true); + tqUnregisterPushHandle(pTq, pHandle); atomic_store_64(&pHandle->consumerId, req.newConsumerId); - atomic_add_fetch_32(&pHandle->epoch, 1); if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { qStreamCloseTsdbReader(pTaskInfo); @@ -601,11 +599,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->chkInfo.currentVer = ver; // expand executor - if (pTask->fillHistory) { - pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; - } else { - pTask->status.taskStatus = TASK_STATUS__RESTORE; - } + pTask->status.taskStatus = (pTask->fillHistory)? TASK_STATUS__WAIT_DOWNSTREAM:TASK_STATUS__NORMAL; if (pTask->taskLevel == TASK_LEVEL__SOURCE) { pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); @@ -664,8 +658,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } streamSetupTrigger(pTask); - tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, - pTask->id.idStr, pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel); + + tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", vgId, pTask->id.idStr, + pTask->chkInfo.version, pTask->selfChildId, pTask->taskLevel); // next valid version will add one pTask->chkInfo.version += 1; @@ -693,8 +688,9 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { }; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); + if (pTask) { - rsp.status = (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL) ? 1 : 0; + rsp.status = streamTaskCheckStatus(pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); tqDebug("tq recv task check req(reqId:0x%" PRIx64 @@ -1069,6 +1065,34 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) { return 0; } +int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { + int32_t vgId = TD_VID(pTq->pVnode); + + taosWLockLatch(&pTq->lock); + if(taosHashGetSize(pTq->pPushMgr) > 0){ + void *pIter = taosHashIterate(pTq->pPushMgr, NULL); + while(pIter){ + STqHandle* pHandle = *(STqHandle**)pIter; + tqDebug("vgId:%d start set submit for pHandle:%p, consume id:0x%"PRIx64, vgId, pHandle, pHandle->consumerId); + if(ASSERT(pHandle->msg != NULL)){ + tqError("pHandle->msg should not be null"); + break; + }else{ + SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info}; + tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg); + taosMemoryFree(pHandle->msg); + pHandle->msg = NULL; + } + pIter = taosHashIterate(pTq->pPushMgr, pIter); + } + taosHashClear(pTq->pPushMgr); + } + // unlock + taosWUnLockLatch(&pTq->lock); + + return 0; +} + int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { #if 0 void* pIter = NULL; @@ -1147,11 +1171,8 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask != NULL) { if (pTask->status.taskStatus == TASK_STATUS__NORMAL) { - tqDebug("vgId:%d s-task:%s start to process run req", vgId, pTask->id.idStr); - streamProcessRunReq(pTask); - } else if (pTask->status.taskStatus == TASK_STATUS__RESTORE) { - tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId, pTask->id.idStr, - pTask->chkInfo.version); + tqDebug("vgId:%d s-task:%s start to process block from wal, last chk point:%" PRId64, vgId, + pTask->id.idStr, pTask->chkInfo.version); streamProcessRunReq(pTask); } else { tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr); @@ -1347,11 +1368,11 @@ int32_t tqStartStreamTasks(STQ* pTq) { return 0; } - pMeta->walScan += 1; + pMeta->walScanCounter += 1; - if (pMeta->walScan > 1) { - tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScan); - taosWUnLockLatch(&pMeta->lock); + if (pMeta->walScanCounter > 1) { + tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter); + taosWUnLockLatch(&pTq->pStreamMeta->lock); return 0; } diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index e8051a1406..34e93cec2d 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -31,57 +31,67 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer) { int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ); - if (pFile != NULL) { - STqOffsetHead head = {0}; - int64_t code; + if (pFile == NULL) { + return TSDB_CODE_SUCCESS; + } - while (1) { - if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) { - if (code == 0) { - break; - } else { - return -1; - } - } - int32_t size = htonl(head.size); - void* memBuf = taosMemoryCalloc(1, size); - if (memBuf == NULL) { + int32_t vgId = TD_VID(pStore->pTq->pVnode); + int64_t code = 0; + + STqOffsetHead head = {0}; + + while (1) { + if ((code = taosReadFile(pFile, &head, sizeof(STqOffsetHead))) != sizeof(STqOffsetHead)) { + if (code == 0) { + break; + } else { return -1; } - if ((code = taosReadFile(pFile, memBuf, size)) != size) { - taosMemoryFree(memBuf); - return -1; - } - STqOffset offset; - SDecoder decoder; - tDecoderInit(&decoder, memBuf, size); - if (tDecodeSTqOffset(&decoder, &offset) < 0) { - taosMemoryFree(memBuf); - tDecoderClear(&decoder); - return -1; - } - - tDecoderClear(&decoder); - if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) { - return -1; - } - - if (offset.val.type == TMQ_OFFSET__LOG) { - STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey)); - if (pHandle) { - if (walRefVer(pHandle->pRef, offset.val.version) < 0) { - tqError("vgId: %d, tq handle %s ref ver %" PRId64 "error", pStore->pTq->pVnode->config.vgId, - pHandle->subKey, offset.val.version); - } - } - } - - taosMemoryFree(memBuf); } - taosCloseFile(&pFile); + int32_t size = htonl(head.size); + void* pMemBuf = taosMemoryCalloc(1, size); + if (pMemBuf == NULL) { + tqError("vgId:%d failed to restore offset from file, since out of memory, malloc size:%d", vgId, size); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if ((code = taosReadFile(pFile, pMemBuf, size)) != size) { + taosMemoryFree(pMemBuf); + return -1; + } + + STqOffset offset; + SDecoder decoder; + tDecoderInit(&decoder, pMemBuf, size); + if (tDecodeSTqOffset(&decoder, &offset) < 0) { + taosMemoryFree(pMemBuf); + tDecoderClear(&decoder); + return code; + } + + tDecoderClear(&decoder); + if (taosHashPut(pStore->pHash, offset.subKey, strlen(offset.subKey), &offset, sizeof(STqOffset)) < 0) { + return -1; + } + + // todo remove this + if (offset.val.type == TMQ_OFFSET__LOG) { + STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey)); + if (pHandle) { + if (walRefVer(pHandle->pRef, offset.val.version) < 0) { +// tqError("vgId: %d, tq handle %s ref ver %" PRId64 "error", pStore->pTq->pVnode->config.vgId, pHandle->subKey, +// offset.val.version); + } + } + } + + taosMemoryFree(pMemBuf); } - return 0; + + taosCloseFile(&pFile); + return TSDB_CODE_SUCCESS; } STqOffsetStore* tqOffsetOpen(STQ* pTq) { @@ -89,6 +99,7 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { if (pStore == NULL) { return NULL; } + pStore->pTq = pTq; pStore->needCommit = 0; pTq->pOffsetStore = pStore; @@ -98,12 +109,14 @@ STqOffsetStore* tqOffsetOpen(STQ* pTq) { taosMemoryFree(pStore); return NULL; } + char* fname = tqOffsetBuildFName(pStore->pTq->path, 0); if (tqOffsetRestoreFromFile(pStore, fname) < 0) { taosMemoryFree(fname); taosMemoryFree(pStore); return NULL; } + taosMemoryFree(fname); return pStore; } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 7a1a6b7454..0575b7299d 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -206,121 +206,60 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ } #endif -typedef struct { - void* pKey; - int64_t keyLen; -} SItem; - -static void recordPushedEntry(SArray* cachedKey, void* pIter); -static void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq); - -static void freeItem(void* param) { - SItem* p = (SItem*)param; - taosMemoryFree(p->pKey); -} - -static void doPushDataForEntry(void* pIter, STqExecHandle* pExec, STQ* pTq, int64_t ver, int32_t vgId, char* pData, - int32_t dataLen, SArray* pCachedKey) { - STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; - - SMqDataRsp* pRsp = pPushEntry->pDataRsp; - if (pRsp->reqOffset.version >= ver) { - tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", vgId, - pRsp->reqOffset.version, ver); - return; - } - - qTaskInfo_t pTaskInfo = pExec->task; - - // prepare scan mem data - SPackedData submit = {.msgStr = pData, .msgLen = dataLen, .ver = ver}; - - if (qStreamSetScanMemData(pTaskInfo, submit) != 0) { - return; - } - qStreamSetOpen(pTaskInfo); - // here start to scan submit block to extract the subscribed data - int32_t totalRows = 0; - - while (1) { - SSDataBlock* pDataBlock = NULL; - uint64_t ts = 0; - if (qExecTask(pTaskInfo, &pDataBlock, &ts) < 0) { - tqDebug("vgId:%d, tq exec error since %s", vgId, terrstr()); - } - - if (pDataBlock == NULL) { - break; - } - - tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); - pRsp->blockNum++; - totalRows += pDataBlock->info.rows; - } - - tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d, rows:%d", vgId, pPushEntry->subKey, pRsp->blockNum, - totalRows); - - if (pRsp->blockNum > 0) { - tqOffsetResetToLog(&pRsp->rspOffset, ver); - tqPushDataRsp(pTq, pPushEntry); - recordPushedEntry(pCachedKey, pIter); - } -} - int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { - void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); - int32_t len = msgLen - sizeof(SSubmitReq2Msg); - int32_t vgId = TD_VID(pTq->pVnode); +// void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); +// int32_t len = msgLen - sizeof(SSubmitReq2Msg); +// int32_t vgId = TD_VID(pTq->pVnode); if (msgType == TDMT_VND_SUBMIT) { + tqProcessSubmitReqForSubscribe(pTq); // lock push mgr to avoid potential msg lost - taosWLockLatch(&pTq->lock); - - int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr); - if (numOfRegisteredPush > 0) { - tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d", - vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush); - - void* data = taosMemoryMalloc(len); - if (data == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("failed to copy data for stream since out of memory, vgId:%d", vgId); - taosWUnLockLatch(&pTq->lock); - return -1; - } - - memcpy(data, pReq, len); - - SArray* cachedKey = taosArrayInit(0, sizeof(SItem)); - void* pIter = NULL; - - while (1) { - pIter = taosHashIterate(pTq->pPushMgr, pIter); - if (pIter == NULL) { - break; - } - - STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; - - STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey)); - if (pHandle == NULL) { - tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId, - pPushEntry->subKey); - continue; - } - - STqExecHandle* pExec = &pHandle->execHandle; - doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey); - } - - doRemovePushedEntry(cachedKey, pTq); - taosArrayDestroyEx(cachedKey, freeItem); - taosMemoryFree(data); - } - - // unlock - taosWUnLockLatch(&pTq->lock); +// taosWLockLatch(&pTq->lock); +// +// int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr); +// if (numOfRegisteredPush > 0) { +// tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d", +// vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush); +// +// void* data = taosMemoryMalloc(len); +// if (data == NULL) { +// terrno = TSDB_CODE_OUT_OF_MEMORY; +// tqError("failed to copy data for stream since out of memory, vgId:%d", vgId); +// taosWUnLockLatch(&pTq->lock); +// return -1; +// } +// +// memcpy(data, pReq, len); +// +// SArray* cachedKey = taosArrayInit(0, sizeof(SItem)); +// void* pIter = NULL; +// +// while (1) { +// pIter = taosHashIterate(pTq->pPushMgr, pIter); +// if (pIter == NULL) { +// break; +// } +// +// STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; +// +// STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey)); +// if (pHandle == NULL) { +// tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId, +// pPushEntry->subKey); +// continue; +// } +// +// STqExecHandle* pExec = &pHandle->execHandle; +// doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey); +// } +// +// doRemovePushedEntry(cachedKey, pTq); +// taosArrayDestroyEx(cachedKey, freeItem); +// taosMemoryFree(data); +// } +// +// // unlock +// taosWUnLockLatch(&pTq->lock); } tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, (int)taosHashGetSize(pTq->pStreamMeta->pTasks)); @@ -335,21 +274,6 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v } if (msgType == TDMT_VND_SUBMIT) { -#if 0 - void* data = taosMemoryMalloc(len); - if (data == NULL) { - // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", vgId); - return -1; - } - - memcpy(data, pReq, len); - SPackedData submit = {.msgStr = data, .msgLen = len, .ver = ver}; - - tqDebug("vgId:%d tq copy submit msg:%p len:%d ver:%" PRId64 " from %p for stream", vgId, data, len, ver, pReq); - tqProcessSubmitReq(pTq, submit); -#endif SPackedData submit = {0}; tqProcessSubmitReq(pTq, submit); } @@ -362,83 +286,39 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v return 0; } -int32_t tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, - int32_t type) { - uint64_t consumerId = pRequest->consumerId; + +int32_t tqRegisterPushEntry(STQ* pTq, void* handle, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); - STqHandle* pTqHandle = pHandle; - - STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry)); - if (pPushEntry == NULL) { - tqDebug("tmq poll: consumer:0x%" PRIx64 ", vgId:%d failed to malloc, size:%d", consumerId, vgId, - (int32_t)sizeof(STqPushEntry)); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + STqHandle* pHandle = (STqHandle*) handle; + if(pHandle->msg == NULL){ + pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg)); + memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); + pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); + }else{ + void *tmp = pHandle->msg->pCont; + memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); + pHandle->msg->pCont = tmp; } - pPushEntry->info = pRpcMsg->info; - memcpy(pPushEntry->subKey, pTqHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN); - - if (type == TMQ_MSG_TYPE__TAOSX_RSP) { - pPushEntry->pDataRsp = taosMemoryCalloc(1, sizeof(STaosxRsp)); - memcpy(pPushEntry->pDataRsp, pDataRsp, sizeof(STaosxRsp)); - } else if (type == TMQ_MSG_TYPE__POLL_RSP) { - pPushEntry->pDataRsp = taosMemoryCalloc(1, sizeof(SMqDataRsp)); - memcpy(pPushEntry->pDataRsp, pDataRsp, sizeof(SMqDataRsp)); - } - - SMqRspHead* pHead = &pPushEntry->pDataRsp->head; - pHead->consumerId = consumerId; - pHead->epoch = pRequest->epoch; - pHead->mqMsgType = type; - - taosHashPut(pTq->pPushMgr, pTqHandle->subKey, strlen(pTqHandle->subKey), &pPushEntry, sizeof(void*)); - - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr, total:%d", - consumerId, pTqHandle->subKey, pDataRsp->reqOffset.version, vgId, taosHashGetSize(pTq->pPushMgr)); + memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); + pHandle->msg->contLen = pMsg->contLen; + int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES); + tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen); return 0; } -int32_t tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) { - int32_t vgId = TD_VID(pTq->pVnode); - STqPushEntry** pEntry = taosHashGet(pTq->pPushMgr, pKey, keyLen); +int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) { + STqHandle *pHandle = (STqHandle*)handle; + int32_t vgId = TD_VID(pTq->pVnode); - if (pEntry != NULL) { - uint64_t cId = (*pEntry)->pDataRsp->head.consumerId; - ASSERT(consumerId == cId); + int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey)); + tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); + if(pHandle->msg != NULL) { + tqPushDataRsp(pTq, pHandle); - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s vgId:%d remove from push mgr, remains:%d", consumerId, - (*pEntry)->subKey, vgId, taosHashGetSize(pTq->pPushMgr) - 1); - - if (rspConsumer) { // rsp the old consumer with empty block. - tqPushDataRsp(pTq, *pEntry); - } - - taosHashRemove(pTq->pPushMgr, pKey, keyLen); + rpcFreeCont(pHandle->msg->pCont); + taosMemoryFree(pHandle->msg); + pHandle->msg = NULL; } - return 0; } - -void recordPushedEntry(SArray* cachedKey, void* pIter) { - size_t kLen = 0; - void* key = taosHashGetKey(pIter, &kLen); - SItem item = {.pKey = strndup(key, kLen), .keyLen = kLen}; - taosArrayPush(cachedKey, &item); -} - -void doRemovePushedEntry(SArray* pCachedKeys, STQ* pTq) { - int32_t vgId = TD_VID(pTq->pVnode); - int32_t numOfKeys = (int32_t)taosArrayGetSize(pCachedKeys); - - for (int32_t i = 0; i < numOfKeys; i++) { - SItem* pItem = taosArrayGet(pCachedKeys, i); - if (taosHashRemove(pTq->pPushMgr, pItem->pKey, pItem->keyLen) != 0) { - tqError("vgId:%d, tq push hash remove key error, key: %s", vgId, (char*)pItem->pKey); - } - } - - if (numOfKeys > 0) { - tqDebug("vgId:%d, pushed %d items and remain:%d", vgId, numOfKeys, (int32_t)taosHashGetSize(pTq->pPushMgr)); - } -} diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index b5fae3184a..5d7267b3dc 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -18,16 +18,15 @@ static int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle); // this function should be executed by stream threads. -// there is a case that the WAL increases more fast than the restore procedure, and this restore procedure -// will not stop eventually. +// extract submit block from WAL, and add them into the input queue for the sources tasks. int32_t tqStreamTasksScanWal(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; int64_t st = taosGetTimestampMs(); while (1) { - int32_t scan = pMeta->walScan; - tqDebug("vgId:%d continue check if data in wal are available, scan:%d", vgId, scan); + int32_t scan = pMeta->walScanCounter; + tqDebug("vgId:%d continue check if data in wal are available, walScanCounter:%d", vgId, scan); // check all restore tasks bool shouldIdle = true; @@ -37,12 +36,12 @@ int32_t tqStreamTasksScanWal(STQ* pTq) { if (shouldIdle) { taosWLockLatch(&pMeta->lock); - pMeta->walScan -= 1; - times = pMeta->walScan; + pMeta->walScanCounter -= 1; + times = pMeta->walScanCounter; - ASSERT(pMeta->walScan >= 0); + ASSERT(pMeta->walScanCounter >= 0); - if (pMeta->walScan <= 0) { + if (pMeta->walScanCounter <= 0) { taosWUnLockLatch(&pMeta->lock); break; } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 017b479c1b..d186c63871 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -169,20 +169,20 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); - // lock - taosWLockLatch(&pTq->lock); - qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); int code = tqScanData(pTq, pHandle, &dataRsp, pOffset); if(code != 0) { goto end; } - // till now, all data has been transferred to consumer, new data needs to push client once arrived. +// till now, all data has been transferred to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { - code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); + // lock + taosWLockLatch(&pTq->lock); + code = tqRegisterPushEntry(pTq, pHandle, pMsg); taosWUnLockLatch(&pTq->lock); + tDeleteSMqDataRsp(&dataRsp); return code; } @@ -197,7 +197,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, tFormatOffset(buf, 80, &dataRsp.rspOffset); tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, reqId:0x%" PRIx64 " code:%d", consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); - taosWUnLockLatch(&pTq->lock); +// taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp); } return code; @@ -211,6 +211,12 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, SMqMetaRsp metaRsp = {0}; STaosxRsp taosxRsp = {0}; tqInitTaosxRsp(&taosxRsp, pRequest); + qTaskInfo_t task = pHandle->execHandle.task; + if(qTaskIsExecuting(task)){ + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + tDeleteSTaosxRsp(&taosxRsp); + return code; + } if (offset->type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index eb15400d05..1ef86f5b30 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3395,6 +3395,11 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) { STableUidList* pUidList = &pStatus->uidList; while (1) { + if (pReader->flag == READER_STATUS_SHOULD_STOP) { + tsdbWarn("tsdb reader is stopped ASAP, %s", pReader->idStr); + return TSDB_CODE_SUCCESS; + } + STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter; initMemDataIterator(*pBlockScanInfo, pReader); @@ -3474,45 +3479,68 @@ static bool fileBlockPartiallyRead(SFileBlockDumpInfo* pDumpInfo, bool asc) { ((pDumpInfo->rowIndex > 0 && asc) || (pDumpInfo->rowIndex < (pDumpInfo->totalRows - 1) && (!asc))); } +typedef enum { + TSDB_READ_RETURN = 0x1, + TSDB_READ_CONTINUE = 0x2, +} ERetrieveType; + +static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) { + int32_t code = TSDB_CODE_SUCCESS; + SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; + SDataBlockIter* pBlockIter = &pReader->status.blockIter; + + while(1) { + terrno = 0; + + code = doLoadLastBlockSequentially(pReader); + if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) { + terrno = code; + return TSDB_READ_RETURN; + } + + if (pResBlock->info.rows > 0) { + return TSDB_READ_RETURN; + } + + // all data blocks are checked in this last block file, now let's try the next file + ASSERT(pReader->status.pTableIter == NULL); + code = initForFirstBlockInFile(pReader, pBlockIter); + + // error happens or all the data files are completely checked + if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false) || + pReader->flag == READER_STATUS_SHOULD_STOP) { + terrno = code; + return TSDB_READ_RETURN; + } + + if (pBlockIter->numOfBlocks > 0) { // there are data blocks existed. + return TSDB_READ_CONTINUE; + } else { // all blocks in data file are checked, let's check the data in last files + resetTableListIndex(&pReader->status); + } + } +} + static int32_t buildBlockFromFiles(STsdbReader* pReader) { int32_t code = TSDB_CODE_SUCCESS; bool asc = ASCENDING_TRAVERSE(pReader->order); SDataBlockIter* pBlockIter = &pReader->status.blockIter; + SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; if (pBlockIter->numOfBlocks == 0) { - _begin: - code = doLoadLastBlockSequentially(pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - if (pReader->resBlockInfo.pResBlock->info.rows > 0) { - return TSDB_CODE_SUCCESS; - } - - // all data blocks are checked in this last block file, now let's try the next file - if (pReader->status.pTableIter == NULL) { - code = initForFirstBlockInFile(pReader, pBlockIter); - - // error happens or all the data files are completely checked - if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { - return code; - } - - // this file does not have data files, let's start check the last block file if exists - if (pBlockIter->numOfBlocks == 0) { - resetTableListIndex(&pReader->status); - goto _begin; - } + // let's try to extract data from stt files. + ERetrieveType type = doReadDataFromLastFiles(pReader); + if (type == TSDB_READ_RETURN) { + return terrno; } code = doBuildDataBlock(pReader); - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) { return code; } - if (pReader->resBlockInfo.pResBlock->info.rows > 0) { + if (pResBlock->info.rows > 0) { return TSDB_CODE_SUCCESS; } } @@ -3530,30 +3558,22 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { if (hasNext) { // check for the next block in the block accessed order list initBlockDumpInfo(pReader, pBlockIter); } else { - if (pReader->status.pCurrentFileset->nSttF > 0) { - // data blocks in current file are exhausted, let's try the next file now - SBlockData* pBlockData = &pReader->status.fileBlockData; - if (pBlockData->uid != 0) { - tBlockDataClear(pBlockData); - } + // all data blocks in files are checked, let's check the data in last files. + ASSERT(pReader->status.pCurrentFileset->nSttF > 0); - tBlockDataReset(pBlockData); - resetDataBlockIterator(pBlockIter, pReader->order); - resetTableListIndex(&pReader->status); - goto _begin; - } else { - code = initForFirstBlockInFile(pReader, pBlockIter); + // data blocks in current file are exhausted, let's try the next file now + SBlockData* pBlockData = &pReader->status.fileBlockData; + if (pBlockData->uid != 0) { + tBlockDataClear(pBlockData); + } - // error happens or all the data files are completely checked - if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { - return code; - } + tBlockDataReset(pBlockData); + resetDataBlockIterator(pBlockIter, pReader->order); + resetTableListIndex(&pReader->status); - // this file does not have blocks, let's start check the last block file - if (pBlockIter->numOfBlocks == 0) { - resetTableListIndex(&pReader->status); - goto _begin; - } + ERetrieveType type = doReadDataFromLastFiles(pReader); + if (type == TSDB_READ_RETURN) { + return terrno; } } } @@ -3561,11 +3581,11 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { code = doBuildDataBlock(pReader); } - if (code != TSDB_CODE_SUCCESS) { + if (code != TSDB_CODE_SUCCESS || pReader->flag == READER_STATUS_SHOULD_STOP) { return code; } - if (pReader->resBlockInfo.pResBlock->info.rows > 0) { + if (pResBlock->info.rows > 0) { return TSDB_CODE_SUCCESS; } } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index dd6eda7d93..fecfa32885 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -458,7 +458,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp walApplyVer(pVnode->pWal, version); if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { - /*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/ +// /*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/ vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } @@ -497,11 +497,16 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { vTrace("message in vnode query queue is processing"); - if ((pMsg->msgType == TDMT_SCH_QUERY) && !syncIsReadyForRead(pVnode->sync)) { + if ((pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_VND_TMQ_CONSUME) && !syncIsReadyForRead(pVnode->sync)) { vnodeRedirectRpcMsg(pVnode, pMsg, terrno); return 0; } + if (pMsg->msgType == TDMT_VND_TMQ_CONSUME && !pVnode->restored) { + vnodeRedirectRpcMsg(pVnode, pMsg, TSDB_CODE_SYN_RESTORING); + return 0; + } + SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb}; switch (pMsg->msgType) { case TDMT_SCH_QUERY: @@ -509,6 +514,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0); case TDMT_SCH_QUERY_CONTINUE: return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0); + case TDMT_VND_TMQ_CONSUME: + return tqProcessPollReq(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in query queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; @@ -518,17 +525,12 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg); if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG || - pMsg->msgType == TDMT_VND_BATCH_META || pMsg->msgType == TDMT_VND_TMQ_CONSUME) && + pMsg->msgType == TDMT_VND_BATCH_META) && !syncIsReadyForRead(pVnode->sync)) { vnodeRedirectRpcMsg(pVnode, pMsg, terrno); return 0; } - if (pMsg->msgType == TDMT_VND_TMQ_CONSUME && !pVnode->restored) { - vnodeRedirectRpcMsg(pVnode, pMsg, TSDB_CODE_SYN_RESTORING); - return 0; - } - switch (pMsg->msgType) { case TDMT_SCH_FETCH: case TDMT_SCH_MERGE_FETCH: @@ -547,8 +549,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { return vnodeGetTableCfg(pVnode, pMsg, true); case TDMT_VND_BATCH_META: return vnodeGetBatchMeta(pVnode, pMsg); - case TDMT_VND_TMQ_CONSUME: - return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_RUN: return tqProcessTaskRunReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_DISPATCH: diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 500e9cb52d..f2ee5358cc 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -550,7 +550,12 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) vInfo("vgId:%d, sync restore finished, start to restore stream tasks by replay wal", pVnode->config.vgId); // start to restore all stream tasks - tqStartStreamTasks(pVnode->pTq); + if (tsDisableStream) { + vInfo("vgId:%d, not restore stream tasks, since disabled", pVnode->config.vgId); + } else { + vInfo("vgId:%d start to restore stream tasks", pVnode->config.vgId); + tqStartStreamTasks(pVnode->pTq); + } } static void vnodeBecomeFollower(const SSyncFSM *pFsm) { diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index ec087c1168..433eacd30e 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -1082,7 +1082,7 @@ int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg)); taosMemoryFree(op); - CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } msg->pCtg = pCtg; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 7a5715e5ed..2d991a14f5 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -139,7 +139,6 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; } else if (type == STREAM_INPUT__DATA_SUBMIT) { - ASSERT(numOfBlocks == 1); taosArrayPush(pInfo->pBlockLists, input); pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; } else if (type == STREAM_INPUT__DATA_BLOCK) { @@ -313,7 +312,6 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v qTaskInfo_t pTaskInfo = NULL; code = qCreateExecTask(readers, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM); if (code != TSDB_CODE_SUCCESS) { - nodesDestroyNode((SNode*)pPlan); qDestroyTask(pTaskInfo); terrno = code; return NULL; @@ -801,7 +799,11 @@ void qDestroyTask(qTaskInfo_t qTaskHandle) { return; } - qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows); + if (pTaskInfo->pRoot != NULL) { + qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows); + } else { + qDebug("%s execTask completed", GET_TASKID(pTaskInfo)); + } printTaskExecCostInLog(pTaskInfo); // print the query cost summary doDestroyTask(pTaskInfo); @@ -854,15 +856,6 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) { } } -#if 0 -int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); - taosWriteQitem(pTaskInfo->streamInfo.inputQueue->queue, pItem); - return 0; -} -#endif - int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); @@ -897,8 +890,7 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); - ASSERT(pInfo->twAggSup.calTriggerSaved == 0); - ASSERT(pInfo->twAggSup.deleteMarkSaved == 0); + ASSERT(pInfo->twAggSup.calTriggerSaved == 0 && pInfo->twAggSup.deleteMarkSaved == 0); qInfo("save stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); @@ -914,9 +906,8 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); - ASSERT(pInfo->twAggSup.calTriggerSaved == 0); - ASSERT(pInfo->twAggSup.deleteMarkSaved == 0); + ASSERT(pInfo->twAggSup.calTriggerSaved == 0 && pInfo->twAggSup.deleteMarkSaved == 0); qInfo("save stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); pInfo->twAggSup.calTriggerSaved = pInfo->twAggSup.calTrigger; @@ -929,8 +920,7 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) { SStreamStateAggOperatorInfo* pInfo = pOperator->info; ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE); - ASSERT(pInfo->twAggSup.calTriggerSaved == 0); - ASSERT(pInfo->twAggSup.deleteMarkSaved == 0); + ASSERT(pInfo->twAggSup.calTriggerSaved == 0 && pInfo->twAggSup.deleteMarkSaved == 0); qInfo("save stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); @@ -991,7 +981,6 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) { if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) { if (pOperator->numOfDownstream > 1) { qError("unexpected stream, multiple downstream"); - /*ASSERT(0);*/ return -1; } return 0; diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index b6b250a325..a4d8327b6a 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -99,6 +99,7 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand if (NULL == (*pTaskInfo)->pRoot) { int32_t code = (*pTaskInfo)->code; doDestroyTask(*pTaskInfo); + (*pTaskInfo) = NULL; return code; } else { return TSDB_CODE_SUCCESS; @@ -206,11 +207,14 @@ static void freeBlock(void* pParam) { void doDestroyTask(SExecTaskInfo* pTaskInfo) { qDebug("%s execTask is freed", GET_TASKID(pTaskInfo)); destroyOperator(pTaskInfo->pRoot); + pTaskInfo->pRoot = NULL; + cleanupQueriedTableScanInfo(&pTaskInfo->schemaInfo); cleanupStreamInfo(&pTaskInfo->streamInfo); if (!pTaskInfo->localFetch.localExec) { nodesDestroyNode((SNode*)pTaskInfo->pSubplan); + pTaskInfo->pSubplan = NULL; } taosArrayDestroyEx(pTaskInfo->pResultBlockList, freeBlock); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index ed90bb605a..79fce05491 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -30,6 +30,11 @@ #define SYSTABLE_SHOW_TYPE_OFFSET QUERY_NODE_SHOW_DNODES_STMT +typedef struct SRewriteTbNameContext { + int32_t errCode; + char* pTbName; +} SRewriteTbNameContext; + typedef struct STranslateContext { SParseContext* pParseCxt; int32_t errCode; @@ -2541,6 +2546,64 @@ static int32_t setTableCacheLastMode(STranslateContext* pCxt, SSelectStmt* pSele return code; } +static EDealRes doTranslateTbName(SNode** pNode, void* pContext) { + switch (nodeType(*pNode)) { + case QUERY_NODE_FUNCTION: { + SFunctionNode *pFunc = (SFunctionNode *)*pNode; + if (FUNCTION_TYPE_TBNAME == pFunc->funcType) { + SRewriteTbNameContext *pCxt = (SRewriteTbNameContext*)pContext; + SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE); + if (NULL == pVal) { + pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; + return DEAL_RES_ERROR; + } + + int32_t tbLen = strlen(pCxt->pTbName); + pVal->literal = taosStrdup(pCxt->pTbName); + if (NULL == pVal->literal) { + pCxt->errCode = TSDB_CODE_OUT_OF_MEMORY; + return DEAL_RES_ERROR; + } + pVal->isDuration = false; + pVal->translate = true; + pVal->node.resType.type = TSDB_DATA_TYPE_BINARY; + pVal->node.resType.bytes = tbLen + VARSTR_HEADER_SIZE; + pVal->datum.p = taosMemoryCalloc(1, tbLen + VARSTR_HEADER_SIZE + 1); + varDataSetLen(pVal->datum.p, tbLen); + strncpy(varDataVal(pVal->datum.p), pVal->literal, tbLen); + strcpy(pVal->node.userAlias, pFunc->node.userAlias); + strcpy(pVal->node.aliasName, pFunc->node.aliasName); + + nodesDestroyNode(*pNode); + *pNode = (SNode*)pVal; + } + break; + } + default: + break; + } + return DEAL_RES_CONTINUE; +} + +static int32_t replaceTbName(STranslateContext* pCxt, SSelectStmt* pSelect) { + if (QUERY_NODE_REAL_TABLE != nodeType(pSelect->pFromTable)) { + return TSDB_CODE_SUCCESS; + } + + SRealTableNode* pTable = (SRealTableNode*)pSelect->pFromTable; + if (TSDB_CHILD_TABLE != pTable->pMeta->tableType && TSDB_NORMAL_TABLE != pTable->pMeta->tableType && TSDB_SYSTEM_TABLE != pTable->pMeta->tableType) { + return TSDB_CODE_SUCCESS; + } + + SNode** pNode = NULL; + SRewriteTbNameContext pRewriteCxt = {0}; + pRewriteCxt.pTbName = pTable->table.tableName; + + nodesRewriteExprPostOrder(&pSelect->pWhere, doTranslateTbName, &pRewriteCxt); + + return pRewriteCxt.errCode; +} + static int32_t checkJoinTable(STranslateContext* pCxt, SJoinTableNode* pJoinTable) { if ((QUERY_NODE_TEMP_TABLE == nodeType(pJoinTable->pLeft) && !isTimeLineQuery(((STempTableNode*)pJoinTable->pLeft)->pSubquery)) || @@ -3655,6 +3718,10 @@ static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect if (TSDB_CODE_SUCCESS == code) { code = setTableCacheLastMode(pCxt, pSelect); } + if (TSDB_CODE_SUCCESS == code) { + code = replaceTbName(pCxt, pSelect); + } + return code; } diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 00da4ef9a2..71fbe5e086 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -39,7 +39,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); -int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); +int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecoverFinishReq* pReq, int32_t vgId, SEpSet* pEpSet); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 120fe68b5d..1fa3dc3354 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -216,9 +216,10 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { } int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { - qDebug("vgId:%d s-task:%s receive dispatch req from taskId:%d", pReq->upstreamNodeId, pTask->id.idStr, - pReq->upstreamTaskId); + qDebug("s-task:%s receive dispatch req from taskId:%d(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId, + pReq->upstreamNodeId); + // todo add the input queue buffer limitation streamTaskEnqueueBlocks(pTask, pReq, pRsp); tDeleteStreamDispatchReq(pReq); @@ -226,10 +227,6 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S if (streamTryExec(pTask) < 0) { return -1; } - - /*if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/ - /*streamDispatch(pTask);*/ - /*}*/ } else { streamSchedExec(pTask); } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 549374ed94..d12eca7ce3 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -208,7 +208,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis return 0; } -int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) { +int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) { void* buf = NULL; int32_t code = -1; SRpcMsg msg = {0}; @@ -240,7 +240,7 @@ int32_t streamDispatchOneCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* msg.pCont = buf; msg.msgType = TDMT_STREAM_TASK_CHECK; - qDebug("dispatch from s-task:%s to downstream s-task:%" PRIx64 ":%d node %d: check msg", pTask->id.idStr, + qDebug("s-task:%s dispatch check msg to downstream s-task:%" PRIx64 ":%d node %d: check msg", pTask->id.idStr, pReq->streamId, pReq->downstreamTaskId, nodeId); tmsgSendReq(pEpSet, &msg); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 7ea093973a..66ce5a4a60 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -28,7 +28,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* while (pTask->taskLevel == TASK_LEVEL__SOURCE) { int8_t status = atomic_load_8(&pTask->status.taskStatus); - if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__RESTORE) { + if (status != TASK_STATUS__NORMAL) { qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, atomic_load_8(&pTask->status.taskStatus)); taosMsleep(2); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 0f33ef6a26..796df03ea6 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -304,6 +304,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { tdbTbcClose(pCur); return -1; } + tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecodeStreamTask(&decoder, pTask); tDecoderClear(&decoder); @@ -322,7 +323,6 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { return -1; } - /*pTask->status.taskStatus = TASK_STATUS__NORMAL;*/ if (pTask->fillHistory) { pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; streamTaskCheckDownstream(pTask, ver); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 0d1440fbde..0324580885 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -54,6 +54,8 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { // checkstatus int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { + qDebug("s-taks:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, version); + SStreamTaskCheckReq req = { .streamId = pTask->id.streamId, .upstreamTaskId = pTask->id.taskId, @@ -63,16 +65,18 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { // serialize if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + req.reqId = tGenIdPI64(); req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId; req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; pTask->checkReqId = req.reqId; - qDebug("task %d at node %d check downstream task %d at node %d", pTask->id.taskId, pTask->nodeId, req.downstreamTaskId, + qDebug("s-task:%s at node %d check downstream task %d at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); - streamDispatchOneCheckReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); + streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t vgSz = taosArrayGetSize(vgInfo); pTask->recoverTryingDownstream = vgSz; pTask->checkReqIds = taosArrayInit(vgSz, sizeof(int64_t)); @@ -83,14 +87,15 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { taosArrayPush(pTask->checkReqIds, &req.reqId); req.downstreamNodeId = pVgInfo->vgId; req.downstreamTaskId = pVgInfo->taskId; - qDebug("task %d at node %d check downstream task %d at node %d (shuffle)", pTask->id.taskId, pTask->nodeId, + qDebug("s-task:%s at node %d check downstream task %d at node %d (shuffle)", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); - streamDispatchOneCheckReq(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); + streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } } else { - qDebug("task %d at node %d direct launch recover since no downstream", pTask->id.taskId, pTask->nodeId); + qDebug("s-task:%s at node %d direct launch recover since no downstream", pTask->id.idStr, pTask->nodeId); streamTaskLaunchRecover(pTask, version); } + return 0; } @@ -109,14 +114,14 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp req.downstreamTaskId, req.downstreamNodeId); if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { - streamDispatchOneCheckReq(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet); + streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t vgSz = taosArrayGetSize(vgInfo); for (int32_t i = 0; i < vgSz; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); if (pVgInfo->taskId == req.downstreamTaskId) { - streamDispatchOneCheckReq(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet); + streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet); } } } @@ -124,8 +129,8 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp return 0; } -int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* pReq) { - return atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL; +int32_t streamTaskCheckStatus(SStreamTask* pTask) { + return atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL? 1:0; } int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) { @@ -135,7 +140,9 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* if (pRsp->status == 1) { if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { bool found = false; - for (int32_t i = 0; i < taosArrayGetSize(pTask->checkReqIds); i++) { + + int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds); + for (int32_t i = 0; i < numOfReqs; i++) { int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i); if (reqId == pRsp->reqId) { found = true; @@ -149,9 +156,12 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* int32_t left = atomic_sub_fetch_32(&pTask->recoverTryingDownstream, 1); ASSERT(left >= 0); + if (left == 0) { taosArrayDestroy(pTask->checkReqIds); pTask->checkReqIds = NULL; + + qDebug("s-task:%s all downstream tasks:%d are ready, now enter into recover stage", pTask->id.idStr, numOfReqs); streamTaskLaunchRecover(pTask, version); } } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { @@ -163,7 +173,10 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* } else { ASSERT(0); } - } else { // not ready, it should wait for at least 100ms and then retry + } else { // not ready, wait for 100ms and retry + qDebug("s-task:%s downstream taskId:%d (vgId:%d) not ready, wait for 100ms and retry", pTask->id.idStr, + pRsp->downstreamTaskId, pRsp->downstreamNodeId); + taosMsleep(100); streamRecheckOneDownstream(pTask, pRsp); } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index dc3ff3e6de..6154e30938 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -262,7 +262,7 @@ static int32_t walFetchBodyNew(SWalReader *pReader) { SWalCont *pReadHead = &pReader->pHead->head; int64_t ver = pReadHead->version; - wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d", pReader->pWal->cfg.vgId, ver, + wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d, total", pReader->pWal->cfg.vgId, ver, pReadHead->bodyLen); if (pReader->capacity < pReadHead->bodyLen) { diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 8e68e78acc..f8ce680531 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -255,7 +255,10 @@ void taosArraySet(SArray* pArray, size_t index, void* pData) { } void taosArrayPopFrontBatch(SArray* pArray, size_t cnt) { - ASSERT(cnt <= pArray->size); + if (cnt > pArray->size) { + cnt = pArray->size; + } + pArray->size = pArray->size - cnt; if (pArray->size == 0 || cnt == 0) { return; diff --git a/source/util/src/tcache.c b/source/util/src/tcache.c index 22eee547d0..28d9b412a0 100644 --- a/source/util/src/tcache.c +++ b/source/util/src/tcache.c @@ -264,7 +264,6 @@ static void pushfrontNodeInEntryList(SCacheEntry *pEntry, SCacheNode *pNode) { static void removeNodeInEntryList(SCacheEntry *pe, SCacheNode *prev, SCacheNode *pNode) { if (prev == NULL) { - ASSERT(pe->next == pNode); pe->next = pNode->pNext; } else { prev->pNext = pNode->pNext; @@ -464,7 +463,6 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen SCacheNode *pNode = doSearchInEntryList(pe, key, keyLen, &prev); if (pNode != NULL) { int32_t ref = T_REF_INC(pNode); - ASSERT(ref > 0); } taosRUnLockLatch(&pe->latch); @@ -607,7 +605,6 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { uDebug("cache:%s, key:%p, %p successfully removed from hash table, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, ref); if (ref > 0) { - ASSERT(pNode->pTNodeHeader == NULL); taosAddToTrashcan(pCacheObj, pNode); } else { // ref == 0 atomic_sub_fetch_64(&pCacheObj->sizeInBytes, pNode->size); @@ -916,7 +913,6 @@ void taosStopCacheRefreshWorker(void) { size_t taosCacheGetNumOfObj(const SCacheObj *pCacheObj) { return pCacheObj->numOfElems + pCacheObj->numOfElemsInTrash; } SCacheIter *taosCacheCreateIter(const SCacheObj *pCacheObj) { - ASSERT(pCacheObj != NULL); SCacheIter *pIter = taosMemoryCalloc(1, sizeof(SCacheIter)); pIter->pCacheObj = (SCacheObj *)pCacheObj; pIter->entryIndex = -1; @@ -966,12 +962,8 @@ bool taosCacheIterNext(SCacheIter *pIter) { SCacheNode *pNode = pEntry->next; for (int32_t i = 0; i < pEntry->num; ++i) { - ASSERT(pNode != NULL); - pIter->pCurrent[i] = pNode; int32_t ref = T_REF_INC(pIter->pCurrent[i]); - ASSERT(ref >= 1); - pNode = pNode->pNext; } diff --git a/source/util/src/thash.c b/source/util/src/thash.c index b79824e910..cf4f17bfbc 100644 --- a/source/util/src/thash.c +++ b/source/util/src/thash.c @@ -259,8 +259,6 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp pHashObj->freeFp = NULL; pHashObj->callbackFp = NULL; - ASSERT((pHashObj->capacity & (pHashObj->capacity - 1)) == 0); - pHashObj->hashList = (SHashEntry **)taosMemoryMalloc(pHashObj->capacity * sizeof(void *)); if (pHashObj->hashList == NULL) { taosMemoryFree(pHashObj); @@ -343,7 +341,6 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, const vo while (pNode) { if ((pNode->keyLen == keyLen) && (*(pHashObj->equalFp))(GET_HASH_NODE_KEY(pNode), key, keyLen) == 0 && pNode->removed == 0) { - ASSERT(pNode->hashVal == hashVal); break; } @@ -701,8 +698,6 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s void pushfrontNodeInEntryList(SHashEntry *pEntry, SHashNode *pNode) { pNode->next = pEntry->next; pEntry->next = pNode; - - ASSERT(pNode->next != pNode); pEntry->num += 1; } @@ -816,19 +811,7 @@ void *taosHashIterate(SHashObj *pHashObj, void *p) { /*uint16_t prevRef = atomic_load_16(&pNode->refCount);*/ uint16_t afterRef = atomic_add_fetch_16(&pNode->refCount, 1); -#if 0 - ASSERT(prevRef < afterRef); - // the reference count value is overflow, which will cause the delete node operation immediately. - if (prevRef > afterRef) { - uError("hash entry ref count overflow, prev ref:%d, current ref:%d", prevRef, afterRef); - // restore the value - atomic_sub_fetch_16(&pNode->refCount, 1); - data = NULL; - } else { - data = GET_HASH_NODE_DATA(pNode); - } -#endif data = GET_HASH_NODE_DATA(pNode); if (afterRef >= MAX_WARNING_REF_COUNT) { diff --git a/source/util/src/tlosertree.c b/source/util/src/tlosertree.c index bf99212b78..c476baa790 100644 --- a/source/util/src/tlosertree.c +++ b/source/util/src/tlosertree.c @@ -115,8 +115,6 @@ void tMergeTreeAdjust(SMultiwayMergeTreeInfo* pTree, int32_t idx) { } void tMergeTreeRebuild(SMultiwayMergeTreeInfo* pTree) { - ASSERT((pTree->totalSources & 0x1) == 0); - tMergeTreeInit(pTree); for (int32_t i = pTree->totalSources - 1; i >= pTree->numOfSources; i--) { tMergeTreeAdjust(pTree, i); diff --git a/source/util/src/tsched.c b/source/util/src/tsched.c index 467f26b362..5c1706e405 100644 --- a/source/util/src/tsched.c +++ b/source/util/src/tsched.c @@ -137,7 +137,6 @@ void *taosProcessSchedQueue(void *scheduler) { while (1) { if ((ret = tsem_wait(&pSched->fullSem)) != 0) { uFatal("wait %s fullSem failed(%s)", pSched->label, strerror(errno)); - ASSERT(0); } if (atomic_load_8(&pSched->stop)) { break; @@ -145,7 +144,6 @@ void *taosProcessSchedQueue(void *scheduler) { if ((ret = taosThreadMutexLock(&pSched->queueMutex)) != 0) { uFatal("lock %s queueMutex failed(%s)", pSched->label, strerror(errno)); - ASSERT(0); } msg = pSched->queue[pSched->fullSlot]; @@ -154,12 +152,10 @@ void *taosProcessSchedQueue(void *scheduler) { if ((ret = taosThreadMutexUnlock(&pSched->queueMutex)) != 0) { uFatal("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno)); - ASSERT(0); } if ((ret = tsem_post(&pSched->emptySem)) != 0) { uFatal("post %s emptySem failed(%s)", pSched->label, strerror(errno)); - ASSERT(0); } if (msg.fp) @@ -187,12 +183,10 @@ int taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) { if ((ret = tsem_wait(&pSched->emptySem)) != 0) { uFatal("wait %s emptySem failed(%s)", pSched->label, strerror(errno)); - ASSERT(0); } if ((ret = taosThreadMutexLock(&pSched->queueMutex)) != 0) { uFatal("lock %s queueMutex failed(%s)", pSched->label, strerror(errno)); - ASSERT(0); } pSched->queue[pSched->emptySlot] = *pMsg; @@ -200,12 +194,10 @@ int taosScheduleTask(void *queueScheduler, SSchedMsg *pMsg) { if ((ret = taosThreadMutexUnlock(&pSched->queueMutex)) != 0) { uFatal("unlock %s queueMutex failed(%s)", pSched->label, strerror(errno)); - ASSERT(0); } if ((ret = tsem_post(&pSched->fullSem)) != 0) { uFatal("post %s fullSem failed(%s)", pSched->label, strerror(errno)); - ASSERT(0); } return ret; } diff --git a/source/util/src/tskiplist.c b/source/util/src/tskiplist.c index c72c5c70ae..222e0e8a51 100644 --- a/source/util/src/tskiplist.c +++ b/source/util/src/tskiplist.c @@ -268,8 +268,9 @@ SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList) { } SSkipListIterator *tSkipListCreateIterFromVal(SSkipList *pSkipList, const char *val, int32_t type, int32_t order) { - ASSERT(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC); - ASSERT(pSkipList != NULL); + if (order != TSDB_ORDER_ASC && order != TSDB_ORDER_DESC) { + return NULL; + } SSkipListIterator *iter = doCreateSkipListIterator(pSkipList, order); if (val == NULL) { @@ -585,7 +586,6 @@ static FORCE_INLINE int32_t getSkipListRandLevel(SSkipList *pSkipList) { } } - ASSERT(level <= pSkipList->maxLevel); return level; } diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c index f2de219f4e..d98a45f0d3 100644 --- a/utils/test/c/tmqSim.c +++ b/utils/test/c/tmqSim.c @@ -232,7 +232,7 @@ void saveConfigToLogFile() { taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]); } taosFprintfFile(g_fp, "\n"); - taosFprintfFile(g_fp, " expect rows: %" PRIx64 "\n", g_stConfInfo.stThreads[i].expectMsgCnt); + taosFprintfFile(g_fp, " expect rows: %" PRId64 "\n", g_stConfInfo.stThreads[i].expectMsgCnt); } char tmpString[128];