diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 9d386ca2f8..962bab309c 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -656,7 +656,9 @@ _OVER: return -1; } -static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks +// 1. stream number check +// 2. target stable can not be target table of other existed streams. +static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) { int32_t numOfStream = 0; SStreamObj *pStream = NULL; void *pIter = NULL; @@ -669,14 +671,16 @@ static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { / sdbRelease(pMnode->pSdb, pStream); if (numOfStream > MND_STREAM_MAX_NUM) { - mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM); + mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM, + pStreamObj->name); sdbCancelFetch(pMnode->pSdb, pIter); terrno = TSDB_CODE_MND_TOO_MANY_STREAMS; return terrno; } if (pStream->targetStbUid == pStreamObj->targetStbUid) { - mError("Cannot write the same stable as other stream:%s", pStream->name); + mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name, + pStreamObj->name); sdbCancelFetch(pMnode->pSdb, pIter); terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE; return terrno; @@ -741,7 +745,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - if (checkForNumOfStreams(pMnode, &streamObj) < 0) { + if (doStreamCheck(pMnode, &streamObj) < 0) { goto _OVER; } @@ -976,7 +980,6 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre int32_t code = -1; int64_t ts = taosGetTimestampMs(); if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) { - // mWarn("checkpoint interval less than the threshold, ignore it"); return TSDB_CODE_SUCCESS; } @@ -1393,6 +1396,15 @@ int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) return 0; } +static void int64ToHexStr(int64_t id, char *pBuf, int32_t bufLen) { + memset(pBuf, 0, bufLen); + pBuf[2] = '0'; + pBuf[3] = 'x'; + + int32_t len = tintToHex(id, &pBuf[4]); + varDataSetLen(pBuf, len + 2); +} + static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -1417,19 +1429,14 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB // stream id char buf[128] = {0}; - int32_t len = tintToHex(pStream->uid, &buf[4]); - buf[2] = '0'; - buf[3] = 'x'; - varDataSetLen(buf, len + 2); + int64ToHexStr(pStream->uid, buf, tListLen(buf)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, buf, false); // related fill-history stream id - memset(buf, 0, tListLen(buf)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); if (pStream->hTaskUid != 0) { - len = tintToHex(pStream->hTaskUid, &buf[4]); - varDataSetLen(buf, len + 2); + int64ToHexStr(pStream->hTaskUid, buf, tListLen(buf)); colDataSetVal(pColInfo, numOfRows, buf, false); } else { colDataSetVal(pColInfo, numOfRows, buf, true); @@ -1528,11 +1535,8 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS // task id pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - char idstr[128] = {0}; - int32_t len = tintToHex(pTask->id.taskId, &idstr[4]); - idstr[2] = '0'; - idstr[3] = 'x'; - varDataSetLen(idstr, len + 2); + char idstr[128] = {0}; + int64ToHexStr(pTask->id.taskId, idstr, tListLen(idstr)); colDataSetVal(pColInfo, numOfRows, idstr, false); // node type @@ -1648,11 +1652,7 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS // history_task_id pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); if (pe->hTaskId != 0) { - memset(idstr, 0, tListLen(idstr)); - len = tintToHex(pe->hTaskId, &idstr[4]); - idstr[2] = '0'; - idstr[3] = 'x'; - varDataSetLen(idstr, len + 2); + int64ToHexStr(pe->hTaskId, idstr, tListLen(idstr)); colDataSetVal(pColInfo, numOfRows, idstr, false); } else { colDataSetVal(pColInfo, numOfRows, 0, true); @@ -2023,7 +2023,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange } } - // no need to build the trans to handle the vgroup upddate + // no need to build the trans to handle the vgroup update if (pTrans == NULL) { return 0; } diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 1fedee3bcf..6a381ad31e 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -258,7 +258,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); if (numOfUpdated > 0) { - mDebug("%d stream node(s) need updated from report of hbMsg(vgId:%d)", numOfUpdated, req.vgId); + mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId); setNodeEpsetExpiredFlag(req.pUpdateNodes); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 00ca0319d0..8edc0fed4d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1102,7 +1102,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) } if (!pTq->pVnode->restored) { - tqDebug("vgId:%d checkpoint-source msg received during restoring, s-task:0x%x ignore it", vgId, req.taskId); + tqDebug("vgId:%d checkpoint-source msg received during restoring, checkpointId:%" PRId64 + ", transId:%d s-task:0x%x ignore it", + vgId, req.checkpointId, req.transId, req.taskId); SRpcMsg rsp = {0}; buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); tmsgSendRsp(&rsp); // error occurs @@ -1111,7 +1113,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); if (pTask == NULL) { - tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed", vgId, req.taskId); + tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. checkpointId:%" PRId64 + " transId:%d it may have been destroyed", + vgId, req.taskId, req.checkpointId, req.transId); SRpcMsg rsp = {0}; buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0); tmsgSendRsp(&rsp); // error occurs @@ -1123,7 +1127,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) pTask->chkInfo.checkpointingId = req.checkpointId; pTask->chkInfo.transId = req.transId; - tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64 + tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpointId:%" PRId64 ", transId:%d set it failed", pTask->id.idStr, req.checkpointId, req.transId); streamMetaReleaseTask(pMeta, pTask); @@ -1140,7 +1144,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) if (req.mndTrigger == 1) { if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) { - tqError("s-task:%s not ready for checkpoint, since it is halt, ignore checkpoint:%" PRId64 ", set it failure", + tqError("s-task:%s not ready for checkpoint, since it is halt, ignore checkpointId:%" PRId64 ", set it failure", pTask->id.idStr, req.checkpointId); taosThreadMutexUnlock(&pTask->lock); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 5dafad8951..5be1630189 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -3644,7 +3644,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb { char tbuf[256] = {0}; ginitDict[i].toStrFunc((void*)key, tbuf); - stDebug("streamState str: %s succ to write to %s_%s, len: %d", tbuf, wrapper->idstr, ginitDict[i].key, vlen); + stTrace("streamState str: %s succ to write to %s_%s, len: %d", tbuf, wrapper->idstr, ginitDict[i].key, vlen); } return 0; } @@ -3669,7 +3669,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb { char tbuf[256] = {0}; ginitDict[cfIdx].toStrFunc((void*)key, tbuf); - stDebug("streamState str: %s succ to write to %s_%s", tbuf, wrapper->idstr, ginitDict[cfIdx].key); + stTrace("streamState str: %s succ to write to %s_%s", tbuf, wrapper->idstr, ginitDict[cfIdx].key); } return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 3c22f33f93..e3a0eef071 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1141,7 +1141,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) { entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.transId; if (entry.checkpointInfo.failed) { - stInfo("s-task:%s send kill checkpoint trans info, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId); + stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId); } }