From 6ffa3945eacce3764a275fce3b9eb272295507fa Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 29 Apr 2024 09:18:26 +0800 Subject: [PATCH 01/34] fix: create stream udf issue --- source/libs/parser/src/parAstParser.c | 7 +++++++ source/libs/parser/src/parTranslater.c | 5 +---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index a1f09088da..3da6ac668c 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -405,6 +405,13 @@ static int32_t collectMetaKeyFromDescribe(SCollectMetaKeyCxt* pCxt, SDescribeStm static int32_t collectMetaKeyFromCreateStream(SCollectMetaKeyCxt* pCxt, SCreateStreamStmt* pStmt) { int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->targetDbName, pStmt->targetTabName, pCxt->pMetaCache); + if (TSDB_CODE_SUCCESS == code && NULL != pStmt->pSubtable && NULL != pStmt->pQuery) { + SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; + pSelect->pSubtable = nodesCloneNode(pStmt->pSubtable); + if (NULL == pSelect->pSubtable) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } if (TSDB_CODE_SUCCESS == code) { code = collectMetaKeyFromQuery(pCxt, pStmt->pQuery); } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 8f77f0dedf..8740f7c883 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -9327,10 +9327,7 @@ static int32_t addSubtableNameToCreateStreamQuery(STranslateContext* pCxt, SCrea if (NULL == pStmt->pSubtable) { return TSDB_CODE_SUCCESS; } - pSelect->pSubtable = nodesCloneNode(pStmt->pSubtable); - if (NULL == pSelect->pSubtable) { - return TSDB_CODE_OUT_OF_MEMORY; - } + SRewriteSubtableCxt cxt = {.pCxt = pCxt, .pPartitionList = pSelect->pPartitionByList}; nodesRewriteExpr(&pSelect->pSubtable, rewriteSubtable, &cxt); return pCxt->errCode; From 9ba8e6bd345296694f2dc7a7510a02c1438dd858 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 09:26:06 +0800 Subject: [PATCH 02/34] refactor: add logs for drop streams. --- source/dnode/mnode/impl/src/mndStream.c | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index c87b8e84f4..f5cccbbeff 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1213,27 +1213,31 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { SMDropStreamReq dropReq = {0}; if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) { + mError("invalid drop stream msg recv, discarded"); terrno = TSDB_CODE_INVALID_MSG; return -1; } - pStream = mndAcquireStream(pMnode, dropReq.name); + mDebug("recv drop stream:%s msg", dropReq.name); + pStream = mndAcquireStream(pMnode, dropReq.name); if (pStream == NULL) { if (dropReq.igNotExists) { - mInfo("stream:%s not exist, ignore not exist is set", dropReq.name); + mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name); sdbRelease(pMnode->pSdb, pStream); tFreeMDropStreamReq(&dropReq); return 0; } else { terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; - mError("stream:%s not exist failed to drop", dropReq.name); + mError("stream:%s not exist failed to drop it", dropReq.name); tFreeMDropStreamReq(&dropReq); return -1; } } if (pStream->smaId != 0) { + mDebug("stream:%s try to drop sma related stream", dropReq.name); + void *pIter = NULL; SSmaObj *pSma = NULL; pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void**)&pSma); @@ -1241,13 +1245,21 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { if (pSma && pSma->uid == pStream->smaId) { sdbRelease(pMnode->pSdb, pSma); sdbRelease(pMnode->pSdb, pStream); + sdbCancelFetch(pMnode->pSdb, pIter); tFreeMDropStreamReq(&dropReq); terrno = TSDB_CODE_TSMA_MUST_BE_DROPPED; + + mError("try to drop sma-related stream:%s, code:%s only allowed to be dropped along with sma", dropReq.name, + tstrerror(terrno)); return -1; } - if (pSma) sdbRelease(pMnode->pSdb, pSma); - pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void**)&pSma); + + if (pSma) { + sdbRelease(pMnode->pSdb, pSma); + } + + pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma); } } @@ -1307,6 +1319,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { mndKillTransImpl(pMnode, transId, pStream->sourceDb); } + mDebug("stream:%s transId:%d start to drop related task when dropping stream", dropReq.name, transId); removeStreamTasksInBuf(pStream, &execInfo); SName name = {0}; From 54e9522c62842c267be0102bf2c7e4a27e1e4cf7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 09:37:53 +0800 Subject: [PATCH 03/34] refactor(stream): opt stream sink perf. --- source/dnode/vnode/src/tq/tqSink.c | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 25c2f32307..af8f567200 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -333,9 +333,10 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c } while (j < newLen && k < oldLen) { - SRow* pNewRow = taosArrayGetP(pNew->aRowP, j); - SRow* pOldRow = taosArrayGetP(pExisted->aRowP, k); - if (pNewRow->ts < pOldRow->ts) { + SRow* pNewRow = TARRAY_GET_ELEM(pNew->aRowP, j); + SRow* pOldRow = TARRAY_GET_ELEM(pExisted->aRowP, k); + + if (pNewRow->ts <= pOldRow->ts) { taosArrayPush(pFinal, &pNewRow); j += 1; } else if (pNewRow->ts > pOldRow->ts) { @@ -373,12 +374,12 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c } while (j < newLen) { - SRow* pRow = taosArrayGetP(pNew->aRowP, j++); + SRow* pRow = TARRAY_GET_ELEM(pNew->aRowP, j++); taosArrayPush(pFinal, &pRow); } while (k < oldLen) { - SRow* pRow = taosArrayGetP(pExisted->aRowP, k++); + SRow* pRow = TARRAY_GET_ELEM(pExisted->aRowP, k++); taosArrayPush(pFinal, &pRow); } From 0ef476a09efa7cac4d9ccb31c80d0b62829fabaa Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 30 Apr 2024 09:42:53 +0800 Subject: [PATCH 04/34] fix: ut issue --- source/libs/parser/src/parTranslater.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 8740f7c883..06bf0081c3 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1054,9 +1054,9 @@ static bool isPrimaryKey(STempTableNode* pTable, SNode* pExpr) { static bool hasPkInTable(const STableMeta* pTableMeta) { bool hasPK = pTableMeta->tableInfo.numOfColumns >= 2 && pTableMeta->schema[1].flags & COL_IS_KEY; if (hasPK) { - uInfo("has primary key, %s", pTableMeta->schema[1].name); + uDebug("has primary key, %s", pTableMeta->schema[1].name); } else { - uInfo("no primary key, %s", pTableMeta->schema[1].name); + uDebug("no primary key, %s", pTableMeta->schema[1].name); } return hasPK; } @@ -9327,6 +9327,10 @@ static int32_t addSubtableNameToCreateStreamQuery(STranslateContext* pCxt, SCrea if (NULL == pStmt->pSubtable) { return TSDB_CODE_SUCCESS; } + pSelect->pSubtable = nodesCloneNode(pStmt->pSubtable); + if (NULL == pSelect->pSubtable) { + return TSDB_CODE_OUT_OF_MEMORY; + } SRewriteSubtableCxt cxt = {.pCxt = pCxt, .pPartitionList = pSelect->pPartitionByList}; nodesRewriteExpr(&pSelect->pSubtable, rewriteSubtable, &cxt); From cd6261c8d4c4d8d3881555f150e323c3ba6cdaf9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 09:49:11 +0800 Subject: [PATCH 05/34] refactor: do some internal refactor. --- source/libs/stream/src/streamCheckStatus.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index ea9b2ef89f..8b9de2df4c 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -35,6 +35,7 @@ static int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, static void setCheckDownstreamReqInfo(SStreamTaskCheckReq* pReq, int64_t reqId, int32_t dstTaskId, int32_t dstNodeId); static void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault, int32_t* numOfNotRsp, SArray* pTimeoutList, SArray* pNotReadyList, const char* id); +static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId); static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId); // check status @@ -524,7 +525,7 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) { // the action of add status may incur the restart procedure, which should NEVER be executed in the timer thread. // The restart of all tasks requires that all tasks should not have active timer for now. Therefore, the execution // of restart in timer thread will result in a dead lock. -static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) { +int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId) { SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); if (pRunReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -614,8 +615,8 @@ void rspMonitorFn(void* param, void* tmrId) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart " - "detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", - id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + "detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", + id, pStat->name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); streamTaskCompleteCheckRsp(pInfo, false, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); @@ -630,9 +631,9 @@ void rspMonitorFn(void* param, void* tmrId) { if (pInfo->stopCheckProcess == 1) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( - "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notRsp:%d, notReady:%d, " - "fault:%d, timeout:%d, ready:%d ref:%d", - id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, total:%d, notRsp:%d, " + "notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", + id, pStat->name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); streamTaskCompleteCheckRsp(pInfo, false, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); From 973ca3be805a247351478ecb964827150bc4825f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 09:51:40 +0800 Subject: [PATCH 06/34] refactor: do some internal refactor. --- source/libs/stream/src/streamCheckStatus.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 8b9de2df4c..d356a504c6 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -384,6 +384,8 @@ int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskI } void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { + const char* id = pTask->id.idStr; + SStreamTaskCheckReq req = { .streamId = pTask->id.streamId, .upstreamTaskId = pTask->id.taskId, @@ -398,7 +400,7 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->taskId); stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64, - pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId); + id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId); streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet); } else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -413,8 +415,7 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64, - pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, - p->reqId); + id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, p->reqId); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); break; } From 9c39d11ea056df33b910118d909830f3d8425c4b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 09:55:56 +0800 Subject: [PATCH 07/34] other: add some logs for drop streams. --- source/dnode/mnode/impl/src/mndStream.c | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index f5cccbbeff..693e37a0ce 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1236,7 +1236,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { } if (pStream->smaId != 0) { - mDebug("stream:%s try to drop sma related stream", dropReq.name); + mDebug("stream:%s, uid:0x%"PRIx64" try to drop sma related stream", dropReq.name, pStream->uid); void *pIter = NULL; SSmaObj *pSma = NULL; @@ -1250,8 +1250,8 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { tFreeMDropStreamReq(&dropReq); terrno = TSDB_CODE_TSMA_MUST_BE_DROPPED; - mError("try to drop sma-related stream:%s, code:%s only allowed to be dropped along with sma", dropReq.name, - tstrerror(terrno)); + mError("try to drop sma-related stream:%s, uid:0x%" PRIx64 " code:%s only allowed to be dropped along with sma", + dropReq.name, pStream->uid, tstrerror(terrno)); return -1; } @@ -1279,7 +1279,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream"); if (pTrans == NULL) { - mError("stream:%s, failed to drop since %s", dropReq.name, terrstr()); + mError("stream:%s uid:0x%"PRIx64" failed to drop since %s", dropReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); tFreeMDropStreamReq(&dropReq); return -1; @@ -1289,7 +1289,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { // drop all tasks if (mndStreamSetDropAction(pMnode, pTrans, pStream) < 0) { - mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr()); + mError("stream:%s uid:0x%" PRIx64 " failed to drop task since %s", dropReq.name, pStream->uid, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); tFreeMDropStreamReq(&dropReq); @@ -1315,11 +1315,13 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { // kill the related checkpoint trans int32_t transId = mndStreamGetRelTrans(pMnode, pStream->uid); if (transId != 0) { - mDebug("drop active related transId:%d due to stream:%s dropped", transId, pStream->name); + mDebug("drop active transId:%d due to stream:%s uid:0x%" PRIx64 " dropped", transId, pStream->name, pStream->uid); mndKillTransImpl(pMnode, transId, pStream->sourceDb); } - mDebug("stream:%s transId:%d start to drop related task when dropping stream", dropReq.name, transId); + mDebug("stream:%s uid:0x%" PRIx64 " transId:%d start to drop related task when dropping stream", dropReq.name, + pStream->uid, transId); + removeStreamTasksInBuf(pStream, &execInfo); SName name = {0}; From d3e676b0843df0e90c8a3626e482f2862e9a7f17 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 10:14:32 +0800 Subject: [PATCH 08/34] fix(cos): fix syntax error. --- source/dnode/mnode/impl/src/mndStream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 693e37a0ce..e8c22c8de2 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1279,7 +1279,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { STrans *pTrans = doCreateTrans(pMnode, pStream, pReq, TRN_CONFLICT_NOTHING, MND_STREAM_DROP_NAME, "drop stream"); if (pTrans == NULL) { - mError("stream:%s uid:0x%"PRIx64" failed to drop since %s", dropReq.name, terrstr()); + mError("stream:%s uid:0x%"PRIx64" failed to drop since %s", dropReq.name, pStream->uid, terrstr()); sdbRelease(pMnode->pSdb, pStream); tFreeMDropStreamReq(&dropReq); return -1; From 5f582723ebe67dcb0f0423baf527bd7491eab462 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Tue, 30 Apr 2024 10:23:40 +0800 Subject: [PATCH 09/34] fix: pSchemaExt crash --- source/libs/catalog/src/ctgUtil.c | 4 ++-- source/libs/parser/src/parUtil.c | 2 +- source/libs/qcom/src/queryUtil.c | 2 +- source/libs/qcom/src/querymsg.c | 4 ++-- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 77725c1dda..6da1da2b52 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -1539,7 +1539,7 @@ int32_t ctgCloneMetaOutput(STableMetaOutput* output, STableMetaOutput** pOutput) if (output->tbMeta) { int32_t metaSize = CTG_META_SIZE(output->tbMeta); int32_t schemaExtSize = 0; - if (useCompress(output->ctbMeta.tableType)) { + if (useCompress(output->tbMeta->tableType) && (*pOutput)->tbMeta->schemaExt) { schemaExtSize = output->tbMeta->tableInfo.numOfColumns * sizeof(SSchemaExt); } (*pOutput)->tbMeta = taosMemoryMalloc(metaSize + schemaExtSize); @@ -1551,7 +1551,7 @@ int32_t ctgCloneMetaOutput(STableMetaOutput* output, STableMetaOutput** pOutput) } memcpy((*pOutput)->tbMeta, output->tbMeta, metaSize); - if (useCompress(output->ctbMeta.tableType)) { + if (useCompress(output->tbMeta->tableType) && (*pOutput)->tbMeta->schemaExt) { (*pOutput)->tbMeta->schemaExt = (SSchemaExt *)((char *)(*pOutput)->tbMeta + metaSize); memcpy((*pOutput)->tbMeta->schemaExt, output->tbMeta->schemaExt, schemaExtSize); } else { diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 5bd484f137..8f55850448 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -1225,7 +1225,7 @@ STableCfg* tableCfgDup(STableCfg* pCfg) { SSchema* pSchema = taosMemoryMalloc(schemaSize); memcpy(pSchema, pCfg->pSchemas, schemaSize); SSchemaExt* pSchemaExt = NULL; - if (useCompress(pCfg->tableType)) { + if (useCompress(pCfg->tableType) && pCfg->pSchemaExt) { int32_t schemaExtSize = pCfg->numOfColumns * sizeof(SSchemaExt); pSchemaExt = taosMemoryMalloc(schemaExtSize); memcpy(pSchemaExt, pCfg->pSchemaExt, schemaExtSize); diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 2a4bf196e2..4206c9292f 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -482,7 +482,7 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) { int32_t metaSize = sizeof(STableMeta) + numOfField * sizeof(SSchema); int32_t schemaExtSize = 0; - if (useCompress(pSrc->tableType)) { + if (useCompress(pSrc->tableType) && pSrc->schemaExt) { schemaExtSize = pSrc->tableInfo.numOfColumns * sizeof(SSchemaExt); } *pDst = taosMemoryMalloc(metaSize + schemaExtSize); diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 4af207254a..b13919e5e1 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -454,7 +454,7 @@ int32_t queryCreateCTableMetaFromMsg(STableMetaRsp *msg, SCTableMeta *pMeta) { int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isStb, STableMeta **pMeta) { int32_t total = msg->numOfColumns + msg->numOfTags; int32_t metaSize = sizeof(STableMeta) + sizeof(SSchema) * total; - int32_t schemaExtSize = useCompress(msg->tableType) ? sizeof(SSchemaExt) * msg->numOfColumns : 0; + int32_t schemaExtSize = (useCompress(msg->tableType) && msg->pSchemaExt) ? sizeof(SSchemaExt) * msg->numOfColumns : 0; STableMeta *pTableMeta = taosMemoryCalloc(1, metaSize + schemaExtSize); if (NULL == pTableMeta) { @@ -475,7 +475,7 @@ int32_t queryCreateTableMetaFromMsg(STableMetaRsp *msg, bool isStb, STableMeta * pTableMeta->tableInfo.numOfColumns = msg->numOfColumns; memcpy(pTableMeta->schema, msg->pSchemas, sizeof(SSchema) * total); - if (useCompress(msg->tableType)) { + if (useCompress(msg->tableType) && msg->pSchemaExt) { pTableMeta->schemaExt = pSchemaExt; memcpy(pSchemaExt, msg->pSchemaExt, schemaExtSize); } else { From 3a1c210601fc622cfc5f0a1deaace232821eaf18 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 30 Apr 2024 11:06:03 +0800 Subject: [PATCH 10/34] fix:use path of SStreamMeta --- source/dnode/vnode/src/tq/tqStreamStateSnap.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 50f413bcc9..58dff1d9b3 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -51,7 +51,7 @@ int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS SStreamSnapReader* pSnapReader = NULL; - if (streamSnapReaderOpen(meta, sver, chkpId, pTq->path, &pSnapReader) == 0) { + if (streamSnapReaderOpen(meta, sver, chkpId, meta->path, &pSnapReader) == 0) { pReader->complete = 1; } else { code = -1; @@ -139,7 +139,7 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS pWriter->sver = sver; pWriter->ever = ever; - sprintf(tdir, "%s%s%s", pTq->path, TD_DIRSEP, VNODE_TQ_STREAM); + sprintf(tdir, "%s%s%s", pTq->pStreamMeta->path, TD_DIRSEP, VNODE_TQ_STREAM); taosMkDir(tdir); SStreamSnapWriter* pSnapWriter = NULL; From 87c71337d1167721a31109a5125e6345f59b3463 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Tue, 30 Apr 2024 12:05:53 +0800 Subject: [PATCH 11/34] fix: pSchemaExt null --- source/libs/command/src/command.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 847c694b80..39156cf4dd 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -151,7 +151,7 @@ static int32_t setDescResultIntoDataBlock(bool sysInfoUser, SSDataBlock* pBlock, STR_TO_VARSTR(buf, "VIEW COL"); } colDataSetVal(pCol4, pBlock->info.rows, buf, false); - if (useCompress(pMeta->tableType)) { + if (useCompress(pMeta->tableType) && pMeta->schemaExt) { if (i < pMeta->tableInfo.numOfColumns) { STR_TO_VARSTR(buf, columnEncodeStr(COMPRESS_L1_TYPE_U32(pMeta->schemaExt[i].compress))); colDataSetVal(pCol5, pBlock->info.rows, buf, false); @@ -201,7 +201,7 @@ static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp** code = setDescResultIntoDataBlock(sysInfoUser, pBlock, numOfRows, pDesc->pMeta, biMode); } if (TSDB_CODE_SUCCESS == code) { - if (pDesc->pMeta && useCompress(pDesc->pMeta->tableType)) { + if (pDesc->pMeta && useCompress(pDesc->pMeta->tableType) && pDesc->pMeta.schemaExt) { code = buildRetrieveTableRsp(pBlock, DESCRIBE_RESULT_COLS_COMPRESS, pRsp); } else { code = buildRetrieveTableRsp(pBlock, DESCRIBE_RESULT_COLS, pRsp); @@ -569,7 +569,7 @@ void appendColumnFields(char* buf, int32_t* len, STableCfg* pCfg) { sprintf(type + strlen(type), "(%d)", (int32_t)((pSchema->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)); } - if (useCompress(pCfg->tableType)) { + if (useCompress(pCfg->tableType) && pCfg->pSchemaExt) { sprintf(type + strlen(type), " ENCODE \'%s\'", columnEncodeStr(COMPRESS_L1_TYPE_U32(pCfg->pSchemaExt[i].compress))); sprintf(type + strlen(type), " COMPRESS \'%s\'", From 879513c831d369c9498ca72278b56690d041b272 Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Tue, 30 Apr 2024 12:09:20 +0800 Subject: [PATCH 12/34] fix build failed --- source/libs/command/src/command.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index 39156cf4dd..d19b4c6913 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -201,7 +201,7 @@ static int32_t execDescribe(bool sysInfoUser, SNode* pStmt, SRetrieveTableRsp** code = setDescResultIntoDataBlock(sysInfoUser, pBlock, numOfRows, pDesc->pMeta, biMode); } if (TSDB_CODE_SUCCESS == code) { - if (pDesc->pMeta && useCompress(pDesc->pMeta->tableType) && pDesc->pMeta.schemaExt) { + if (pDesc->pMeta && useCompress(pDesc->pMeta->tableType) && pDesc->pMeta->schemaExt) { code = buildRetrieveTableRsp(pBlock, DESCRIBE_RESULT_COLS_COMPRESS, pRsp); } else { code = buildRetrieveTableRsp(pBlock, DESCRIBE_RESULT_COLS, pRsp); From 2bd83169bbda3d5a6fa85e32fe5cdb37d322d130 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 30 Apr 2024 13:22:10 +0800 Subject: [PATCH 13/34] fix: memory leak issue --- source/libs/parser/src/parTranslater.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 06bf0081c3..124bfbc9d8 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -9327,9 +9327,11 @@ static int32_t addSubtableNameToCreateStreamQuery(STranslateContext* pCxt, SCrea if (NULL == pStmt->pSubtable) { return TSDB_CODE_SUCCESS; } - pSelect->pSubtable = nodesCloneNode(pStmt->pSubtable); if (NULL == pSelect->pSubtable) { - return TSDB_CODE_OUT_OF_MEMORY; + pSelect->pSubtable = nodesCloneNode(pStmt->pSubtable); + if (NULL == pSelect->pSubtable) { + return TSDB_CODE_OUT_OF_MEMORY; + } } SRewriteSubtableCxt cxt = {.pCxt = pCxt, .pPartitionList = pSelect->pPartitionByList}; From 4c27af9b13837a5ee0cc5b66c9b21e0baa450df0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 13:25:39 +0800 Subject: [PATCH 14/34] refactor: do some internal refactor. --- source/common/src/systable.c | 1 + source/dnode/mnode/impl/src/mndStream.c | 7 +++++++ source/dnode/vnode/src/tq/tqSink.c | 8 ++++---- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index bf2f14339d..9de682dd3a 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -173,6 +173,7 @@ static const SSysDbTableSchema streamSchema[] = { {.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = false}, {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "sink_quota", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "checkpoint_interval", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "checkpoint_backup", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "history_scan_idle", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index e8c22c8de2..b52f98adbf 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1503,6 +1503,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)&trigger, false); + // sink_quota char sinkQuota[20 + VARSTR_HEADER_SIZE] = {0}; sinkQuota[0] = '0'; char dstStr[20] = {0}; @@ -1510,6 +1511,12 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)dstStr, false); + // checkpoint interval + char tmp[20 + VARSTR_HEADER_SIZE] = {0}; + STR_TO_VARSTR(tmp, "none") + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)tmp, false); + // checkpoint backup type char backup[20 + VARSTR_HEADER_SIZE] = {0}; STR_TO_VARSTR(backup, "none") diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index af8f567200..34171a5872 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -333,8 +333,8 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c } while (j < newLen && k < oldLen) { - SRow* pNewRow = TARRAY_GET_ELEM(pNew->aRowP, j); - SRow* pOldRow = TARRAY_GET_ELEM(pExisted->aRowP, k); + SRow* pNewRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j); + SRow* pOldRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k); if (pNewRow->ts <= pOldRow->ts) { taosArrayPush(pFinal, &pNewRow); @@ -374,12 +374,12 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c } while (j < newLen) { - SRow* pRow = TARRAY_GET_ELEM(pNew->aRowP, j++); + SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j++); taosArrayPush(pFinal, &pRow); } while (k < oldLen) { - SRow* pRow = TARRAY_GET_ELEM(pExisted->aRowP, k++); + SRow* pRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k++); taosArrayPush(pFinal, &pRow); } From d73fd5492a8dcebac43ce37164eed071c2424c53 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 14:24:20 +0800 Subject: [PATCH 15/34] fix(test): update test case. --- source/dnode/mnode/impl/src/mndStream.c | 4 +++- tests/system-test/0-others/information_schema.py | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b52f98adbf..fb15e4b857 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1513,7 +1513,9 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB // checkpoint interval char tmp[20 + VARSTR_HEADER_SIZE] = {0}; - STR_TO_VARSTR(tmp, "none") + sprintf(varDataVal(tmp), "%d sec", tsStreamCheckpointInterval); + varDataSetLen(tmp, strlen(varDataVal(tmp))); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataSetVal(pColInfo, numOfRows, (const char *)tmp, false); diff --git a/tests/system-test/0-others/information_schema.py b/tests/system-test/0-others/information_schema.py index 9a112c669e..2924ebd388 100644 --- a/tests/system-test/0-others/information_schema.py +++ b/tests/system-test/0-others/information_schema.py @@ -222,7 +222,7 @@ class TDTestCase: tdSql.query("select * from information_schema.ins_columns where db_name ='information_schema'") tdLog.info(len(tdSql.queryResult)) - tdSql.checkEqual(True, len(tdSql.queryResult) in range(253, 254)) + tdSql.checkEqual(True, len(tdSql.queryResult) in range(254, 255)) tdSql.query("select * from information_schema.ins_columns where db_name ='performance_schema'") tdSql.checkEqual(54, len(tdSql.queryResult)) From 35003cbb34d95b3b1e2483ec60ead7e36a5de99e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 30 Apr 2024 14:36:16 +0800 Subject: [PATCH 16/34] fix:use path of SStreamMeta --- source/dnode/vnode/src/tq/tqStreamStateSnap.c | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 58dff1d9b3..926a8c62a7 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -128,7 +128,6 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS int32_t code = 0; SStreamStateWriter* pWriter; - char tdir[TSDB_FILENAME_LEN * 2] = {0}; // alloc pWriter = (SStreamStateWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); if (pWriter == NULL) { @@ -139,15 +138,14 @@ int32_t streamStateSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamS pWriter->sver = sver; pWriter->ever = ever; - sprintf(tdir, "%s%s%s", pTq->pStreamMeta->path, TD_DIRSEP, VNODE_TQ_STREAM); - taosMkDir(tdir); + taosMkDir(pTq->pStreamMeta->path); SStreamSnapWriter* pSnapWriter = NULL; - if (streamSnapWriterOpen(pTq, sver, ever, tdir, &pSnapWriter) < 0) { + if (streamSnapWriterOpen(pTq, sver, ever, pTq->pStreamMeta->path, &pSnapWriter) < 0) { goto _err; } - tqDebug("vgId:%d, vnode %s snapshot writer opened, path:%s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, tdir); + tqDebug("vgId:%d, vnode %s snapshot writer opened, path:%s", TD_VID(pTq->pVnode), STREAM_STATE_TRANSFER, pTq->pStreamMeta->path); pWriter->pWriterImpl = pSnapWriter; *ppWriter = pWriter; From 1ee686760f9056e37f5a0d42a3e6c8f885cfd6c5 Mon Sep 17 00:00:00 2001 From: haoranchen Date: Tue, 30 Apr 2024 15:05:11 +0800 Subject: [PATCH 17/34] test: increase timeout on mac test --- Jenkinsfile2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Jenkinsfile2 b/Jenkinsfile2 index 05ba85b091..7df465ab2a 100644 --- a/Jenkinsfile2 +++ b/Jenkinsfile2 @@ -393,7 +393,7 @@ pipeline { agent{label " Mac_catalina "} steps { catchError(buildResult: 'FAILURE', stageResult: 'FAILURE') { - timeout(time: 30, unit: 'MINUTES'){ + timeout(time: 60, unit: 'MINUTES'){ pre_test() pre_test_build_mac() } From 0758c4ece03a5c41f03fea8bd461e868535c52c5 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 30 Apr 2024 15:38:09 +0800 Subject: [PATCH 18/34] fix:[TD-29869]use latest vg info when drop topic --- source/dnode/mnode/impl/src/mndSubscribe.c | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 8bbaadd203..0068b582cf 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -907,21 +907,34 @@ END: return code; } -static int32_t sendDeleteSubToVnode(SMqSubscribeObj *pSub, STrans *pTrans){ +static int32_t sendDeleteSubToVnode(SMnode *pMnode, SMqSubscribeObj *pSub, STrans *pTrans){ // iter all vnode to delete handle int32_t sz = taosArrayGetSize(pSub->unassignedVgs); for (int32_t i = 0; i < sz; i++) { SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i); SMqVDeleteReq *pReq = taosMemoryCalloc(1, sizeof(SMqVDeleteReq)); + if(pReq == NULL){ + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } pReq->head.vgId = htonl(pVgEp->vgId); pReq->vgId = pVgEp->vgId; pReq->consumerId = -1; memcpy(pReq->subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); + + SVgObj *pVgObj = mndAcquireVgroup(pMnode, pVgEp->vgId); + if (pVgObj == NULL) { + taosMemoryFree(pReq); + terrno = TSDB_CODE_MND_VGROUP_NOT_EXIST; + return -1; + } STransAction action = {0}; - action.epSet = pVgEp->epSet; + action.epSet = mndGetVgroupEpset(pMnode, pVgObj);; action.pCont = pReq; action.contLen = sizeof(SMqVDeleteReq); action.msgType = TDMT_VND_TMQ_DELETE_SUB; + + mndReleaseVgroup(pMnode, pVgObj); if (mndTransAppendRedoAction(pTrans, &action) != 0) { taosMemoryFree(pReq); return -1; @@ -1002,7 +1015,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg) { goto end; } - code = sendDeleteSubToVnode(pSub, pTrans); + code = sendDeleteSubToVnode(pMnode, pSub, pTrans); if (code != 0) { goto end; } @@ -1263,7 +1276,7 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName) goto END; } - code = sendDeleteSubToVnode(pSub, pTrans); + code = sendDeleteSubToVnode(pMnode, pSub, pTrans); if (code != 0) { goto END; } From cb5a0563e641608164b057c84a0cf3238a2cba64 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 15:43:12 +0800 Subject: [PATCH 19/34] fix(stream): fix error caused by merge. --- source/dnode/vnode/src/tq/tqSink.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 34171a5872..c0f58fc3ec 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -336,7 +336,7 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c SRow* pNewRow = *(SRow**)TARRAY_GET_ELEM(pNew->aRowP, j); SRow* pOldRow = *(SRow**)TARRAY_GET_ELEM(pExisted->aRowP, k); - if (pNewRow->ts <= pOldRow->ts) { + if (pNewRow->ts < pOldRow->ts) { taosArrayPush(pFinal, &pNewRow); j += 1; } else if (pNewRow->ts > pOldRow->ts) { From 0a1e686e109092fbfd398c05045d0f14b11ac998 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Tue, 30 Apr 2024 16:54:14 +0800 Subject: [PATCH 20/34] tsma hint add doc --- docs/en/12-taos-sql/06-select.md | 1 + docs/zh/12-taos-sql/06-select.md | 1 + 2 files changed, 2 insertions(+) diff --git a/docs/en/12-taos-sql/06-select.md b/docs/en/12-taos-sql/06-select.md index f538f0d58c..c6740a5242 100755 --- a/docs/en/12-taos-sql/06-select.md +++ b/docs/en/12-taos-sql/06-select.md @@ -95,6 +95,7 @@ The list of currently supported Hints is as follows: | PARTITION_FIRST| None | Use Partition before aggregate, conflict with SORT_FOR_GROUP | With normal column in partition by list | | PARA_TABLES_SORT| None | When sorting the supertable rows by timestamp, No temporary disk space is used. When there are numerous tables, each with long rows, the corresponding algorithm associated with this prompt may consume a substantial amount of memory, potentially leading to an Out Of Memory (OOM) situation. | Sorting the supertable rows by timestamp | | SMALLDATA_TS_SORT| None | When sorting the supertable rows by timestamp, if the length of query columns >= 256, and there are relatively few rows, this hint can improve performance. | Sorting the supertable rows by timestamp | +| SKIP_TSMA| None| To explicitly disable tsma optimization for select query|Select query with agg funcs| For example: diff --git a/docs/zh/12-taos-sql/06-select.md b/docs/zh/12-taos-sql/06-select.md index 0db0c99c59..b6792d0efc 100755 --- a/docs/zh/12-taos-sql/06-select.md +++ b/docs/zh/12-taos-sql/06-select.md @@ -95,6 +95,7 @@ Hints 是用户控制单个语句查询优化的一种手段,当 Hint 不适 | PARTITION_FIRST| 无 | 在聚合之前使用PARTITION计算分组, 与SORT_FOR_GROUP冲突 | partition by 列表有普通列时 | | PARA_TABLES_SORT| 无 | 超级表的数据按时间戳排序时, 不使用临时磁盘空间, 只使用内存。当子表数量多, 行长比较大时候, 会使用大量内存, 可能发生OOM | 超级表的数据按时间戳排序时 | | SMALLDATA_TS_SORT| 无 | 超级表的数据按时间戳排序时, 查询列长度大于等于256, 但是行数不多, 使用这个提示, 可以提高性能 | 超级表的数据按时间戳排序时 | +| SKIP_TSMA | 无 | 用于显示的禁用TSMA查询优化 | 带Agg函数的查询语句 | 举例: From 629502e12eee6365d4da272096727670c53d33e3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 17:00:51 +0800 Subject: [PATCH 21/34] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 40 +++++++++---------- source/dnode/vnode/src/tqCommon/tqCommon.c | 15 ++----- source/libs/stream/inc/streamInt.h | 4 +- source/libs/stream/src/streamBackendRocksdb.c | 8 ++-- source/libs/stream/src/streamCheckpoint.c | 10 +++-- source/libs/stream/test/checkpointTest.cpp | 2 +- 6 files changed, 35 insertions(+), 44 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0e6b85bd2b..8dca3b2179 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -85,26 +85,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { } } -int32_t tqInitialize(STQ* pTq) { - if (tqMetaOpen(pTq) < 0) { - return -1; - } - - pTq->pOffsetStore = tqOffsetOpen(pTq); - if (pTq->pOffsetStore == NULL) { - return -1; - } - - int32_t vgId = TD_VID(pTq->pVnode); - pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId, -1, tqStartTaskCompleteCallback); - if (pTq->pStreamMeta == NULL) { - return -1; - } - - /*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta); - return 0; -} - void tqClose(STQ* pTq) { qDebug("start to close tq"); if (pTq == NULL) { @@ -137,6 +117,26 @@ void tqClose(STQ* pTq) { taosMemoryFree(pTq); } +int32_t tqInitialize(STQ* pTq) { + if (tqMetaOpen(pTq) < 0) { + return -1; + } + + pTq->pOffsetStore = tqOffsetOpen(pTq); + if (pTq->pOffsetStore == NULL) { + return -1; + } + + int32_t vgId = TD_VID(pTq->pVnode); + pTq->pStreamMeta = streamMetaOpen(pTq->path, pTq, (FTaskExpand*)tqExpandTask, vgId, -1, tqStartTaskCompleteCallback); + if (pTq->pStreamMeta == NULL) { + return -1; + } + + /*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta); + return 0; +} + void tqNotifyClose(STQ* pTq) { if (pTq == NULL) { return; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 924b0a8207..61ecbe8e5c 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -27,6 +27,8 @@ typedef struct SMStreamCheckpointReadyRspMsg { SMsgHead head; } SMStreamCheckpointReadyRspMsg; +static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg); + static STaskId replaceStreamTaskId(SStreamTask* pTask) { ASSERT(pTask->info.fillHistory); STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; @@ -490,16 +492,6 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId); } -static void setParam(SStreamTask* pTask, int64_t* startCheckTs, bool* hasHTask, STaskId* pId) { - *startCheckTs = pTask->execInfo.checkTs; - - if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - *hasHTask = true; - pId->streamId = pTask->hTaskInfo.id.streamId; - pId->taskId = pTask->hTaskInfo.id.taskId; - } -} - int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) { char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); @@ -1053,10 +1045,9 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) { return taosArrayGetSize(pMeta->pTaskList); } -static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) { +int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) { rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; - return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 0ee31197dc..3a4e3d81fb 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -157,9 +157,7 @@ typedef enum ECHECKPOINT_BACKUP_TYPE { ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); -int32_t streamTaskBackupCheckpoint(char* id, char* path); -int32_t downloadCheckpoint(char* id, char* path); -int32_t deleteCheckpoint(char* id); +int32_t streamTaskDownloadCheckpointData(char* id, char* path); int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 662d02a48f..da1096e7de 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -333,7 +333,7 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, c taosRemoveDir(defaultPath); } - code = downloadCheckpoint(key, chkpPath); + code = streamTaskDownloadCheckpointData(key, chkpPath); if (code != 0) { return code; } @@ -342,7 +342,7 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, c return code; } int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { - int32_t code = downloadCheckpoint(key, chkpPath); + int32_t code = streamTaskDownloadCheckpointData(key, chkpPath); if (code != 0) { return code; } @@ -2110,8 +2110,8 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64 return code; } int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) { - STaskDbWrapper* pDb = arg; - ECHECKPOINT_BACKUP_TYPE utype = type; + STaskDbWrapper* pDb = arg; + ECHECKPOINT_BACKUP_TYPE utype = type; if (utype == DATA_UPLOAD_RSYNC) { return taskDbGenChkpUploadData__rsync(pDb, chkpId, path); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 3428fc36e1..5c7aaec623 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -27,7 +27,9 @@ typedef struct { } SAsyncUploadArg; static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); -static int32_t deleteCheckpointFile(char* id, char* name); +static int32_t deleteCheckpointFile(const char* id, const char* name); +static int32_t streamTaskBackupCheckpoint(char* id, char* path); +static int32_t deleteCheckpoint(char* id); int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; @@ -644,9 +646,9 @@ int32_t downloadCheckpointDataByName(const char* id, const char* fname, const ch return 0; } -int32_t downloadCheckpoint(char* id, char* path) { +int32_t streamTaskDownloadCheckpointData(char* id, char* path) { if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { - stError("downloadCheckpoint parameters invalid"); + stError("streamTaskDownloadCheckpointData parameters invalid"); return -1; } @@ -672,7 +674,7 @@ int32_t deleteCheckpoint(char* id) { return 0; } -int32_t deleteCheckpointFile(char* id, char* name) { +int32_t deleteCheckpointFile(const char* id, const char* name) { char object[128] = {0}; snprintf(object, sizeof(object), "%s/%s", id, name); char* tmp = object; diff --git a/source/libs/stream/test/checkpointTest.cpp b/source/libs/stream/test/checkpointTest.cpp index 0caad479e5..34e80fc08b 100644 --- a/source/libs/stream/test/checkpointTest.cpp +++ b/source/libs/stream/test/checkpointTest.cpp @@ -59,7 +59,7 @@ TEST(testCase, checkpointUpload_Test) { TEST(testCase, checkpointDownload_Test) { char* id = "2013892036"; - // downloadCheckpoint(id, "/root/offset/download/"); + // streamTaskDownloadCheckpointData(id, "/root/offset/download/"); } TEST(testCase, checkpointDelete_Test) { From ed610d293f1c127e906e95c89fa6e40504dfd542 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 17:09:37 +0800 Subject: [PATCH 22/34] fix(stream): close task if it's in checkdown stream procedure. --- source/libs/stream/src/streamMeta.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index edc1a148a9..362c97df7c 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1263,7 +1263,8 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) { SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->status.timerActive >= 1) { - stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart", pTask->id.idStr, pMeta->vgId); + stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart, set closing again", pTask->id.idStr, pMeta->vgId); + streamTaskStop(pTask); inTimer = true; } } From 732e52d64d816feb7fc91103e19df72645324a06 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 19:11:58 +0800 Subject: [PATCH 23/34] refactor: do some internal refactor, and add some logs. --- include/libs/stream/tstream.h | 2 +- source/dnode/snode/src/snode.c | 4 +- source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tq/tqStreamStateSnap.c | 3 +- source/libs/stream/src/streamMeta.c | 43 +++++++++---------- 5 files changed, 26 insertions(+), 28 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e3487c49d1..14aae0b96a 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -896,7 +896,7 @@ void streamMetaWUnLock(SStreamMeta* pMeta); void streamMetaResetStartInfo(STaskStartInfo* pMeta); SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta); void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader); -int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); +void streamMetaLoadAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartAllTasks(SStreamMeta* pMeta); int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index b717504e1e..87f0681780 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -93,9 +93,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { goto FAIL; } - if (streamMetaLoadAllTasks(pSnode->pMeta) < 0) { - goto FAIL; - } + streamMetaLoadAllTasks(pSnode->pMeta); stopRsync(); startRsync(); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index baef3d9bd6..3a3b103e3f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -92,7 +92,7 @@ int32_t tqInitialize(STQ* pTq) { return -1; } - /*int32_t code = */streamMetaLoadAllTasks(pTq->pStreamMeta); + streamMetaLoadAllTasks(pTq->pStreamMeta); if (tqMetaTransform(pTq) < 0) { return -1; diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 926a8c62a7..290266d94a 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -179,5 +179,6 @@ int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) } int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { - return streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta); + streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta); + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 362c97df7c..aca0a38d48 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -117,31 +117,22 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) { return 0; } -typedef struct { - int64_t chkpId; - char* path; - char* taskId; - - SArray* pChkpSave; - SArray* pChkpInUse; - int8_t chkpCap; - void* backend; - -} StreamMetaTaskState; - int32_t streamMetaOpenTdb(SStreamMeta* pMeta) { if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0, 0, NULL) < 0) { + stError("vgId:%d open file:%s failed, stream meta open failed", pMeta->vgId, pMeta->path); return -1; - // goto _err; } if (tdbTbOpen("task.db", STREAM_TASK_KEY_LEN, -1, NULL, pMeta->db, &pMeta->pTaskDb, 0) < 0) { + stError("vgId:%d, open task.db failed, stream meta open failed", pMeta->vgId); return -1; } if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) { + stError("vgId:%d, open checkpoint.db failed, stream meta open failed", pMeta->vgId); return -1; } + return 0; } @@ -231,17 +222,18 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) { if (compatible == STREAM_STATA_COMPATIBLE) { return 0; } else if (compatible == STREAM_STATA_NEED_CONVERT) { - stInfo("stream state need covert backend format"); + stInfo("vgId:%d stream state need covert backend format", pMeta->vgId); return streamMetaCvtDbFormat(pMeta); } else if (compatible == STREAM_STATA_NO_COMPATIBLE) { stError( - "stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream " + "vgId:%d stream read incompatible data, rm %s/vnode/vnode*/tq/stream if taosd cannot start, and rebuild stream " "manually", - tsDataDir); + pMeta->vgId, tsDataDir); return -1; } + return 0; } @@ -324,33 +316,40 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF if (streamMetaMayCvtDbFormat(pMeta) < 0) { goto _err; } + if (streamMetaBegin(pMeta) < 0) { + stError("vgId:%d begin trans for stream meta failed", pMeta->vgId); goto _err; } _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR); pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK); if (pMeta->pTasksMap == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } pMeta->updateInfo.pTasks = taosHashInit(64, fp, false, HASH_NO_LOCK); if (pMeta->updateInfo.pTasks == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK); if (pMeta->startInfo.pReadyTaskSet == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } pMeta->startInfo.pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK); if (pMeta->startInfo.pFailedTaskSet == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo)); if (pMeta->pHbInfo == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -824,7 +823,8 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) { return chkpId; } -int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { +// not allowed to return error code +void streamMetaLoadAllTasks(SStreamMeta* pMeta) { TBC* pCur = NULL; void* pKey = NULL; int32_t kLen = 0; @@ -833,7 +833,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { SDecoder decoder; if (pMeta == NULL) { - return TSDB_CODE_SUCCESS; + return; } SArray* pRecycleList = taosArrayInit(4, sizeof(STaskId)); @@ -844,7 +844,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { if (code != TSDB_CODE_SUCCESS) { stError("vgId:%d failed to open stream meta, code:%s, not load any stream tasks", vgId, tstrerror(terrno)); taosArrayDestroy(pRecycleList); - return TSDB_CODE_SUCCESS; + return; } tdbTbcMoveToFirst(pCur); @@ -937,7 +937,6 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { pMeta->numOfStreamTasks, pMeta->numOfPausedTasks); taosArrayDestroy(pRecycleList); - return TSDB_CODE_SUCCESS; } int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { @@ -1644,7 +1643,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 if (pStartInfo->startAllTasks != 1) { int64_t el = endTs - startTs; - qDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms", + stDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms", pMeta->vgId, taskId, ready, el); streamMetaWUnLock(pMeta); return 0; @@ -1652,7 +1651,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p == NULL) { // task does not exists in current vnode, not record the complete info - qError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId); + stError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId); streamMetaWUnLock(pMeta); return 0; } From 05a204dd6c51717c78a0640816bf66528c42b93f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 19:30:56 +0800 Subject: [PATCH 24/34] refactor: do some internal refactor. --- source/libs/stream/src/streamMeta.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index aca0a38d48..a091a866a0 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -314,6 +314,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF } if (streamMetaMayCvtDbFormat(pMeta) < 0) { + stError("vgId:%d convert sub info format failed, open stream meta failed", pMeta->vgId); goto _err; } @@ -372,8 +373,8 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->numOfPausedTasks = 0; pMeta->numOfStreamTasks = 0; - stInfo("vgId:%d open stream meta successfully, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, - stage); + + stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage); pMeta->rid = taosAddRef(streamMetaId, pMeta); From bc56d863149baad8994d53bdc03306bb387a9777 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 30 Apr 2024 19:48:34 +0800 Subject: [PATCH 25/34] fix:[TD-29893]mkdir error in mac --- source/os/src/osDir.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/os/src/osDir.c b/source/os/src/osDir.c index 6e52c4ed27..667c3c146a 100644 --- a/source/os/src/osDir.c +++ b/source/os/src/osDir.c @@ -224,7 +224,7 @@ int32_t taosMulModeMkDir(const char *dirname, int mode, bool checkAccess) { #ifdef WINDOWS code = _mkdir(temp, mode); #elif defined(DARWIN) - code = mkdir(dirname, 0777); + code = mkdir(temp, 0777); #else code = mkdir(temp, mode); #endif From cdc7b03ac697e13622a2ede04b89646543576c5b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Apr 2024 22:44:32 +0800 Subject: [PATCH 26/34] fix(stream): fix syntax error. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 61ecbe8e5c..e404f1e7b9 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -764,13 +764,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { int64_t el = taosGetTimestampMs() - st; tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.); - code = streamMetaLoadAllTasks(pMeta); - if (code != TSDB_CODE_SUCCESS) { - tqError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno)); - streamMetaWUnLock(pMeta); - code = terrno; - return code; - } + streamMetaLoadAllTasks(pMeta); { STaskStartInfo* pStartInfo = &pMeta->startInfo; From c5afbf547449cdf6122b94426e141a5d95f2fc6e Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Sat, 4 May 2024 21:09:25 +0800 Subject: [PATCH 27/34] fix: memory leak with cmd.command and buffer --- tools/shell/src/shellCommand.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tools/shell/src/shellCommand.c b/tools/shell/src/shellCommand.c index 265dc4530f..a160d2780c 100644 --- a/tools/shell/src/shellCommand.c +++ b/tools/shell/src/shellCommand.c @@ -488,6 +488,8 @@ int32_t shellReadCommand(char *command) { c = taosGetConsoleChar(); if (c == (char)EOF) { + taosMemoryFreeClear(cmd.buffer); + taosMemoryFreeClear(cmd.command); return c; } @@ -524,6 +526,8 @@ int32_t shellReadCommand(char *command) { case 4: // EOF or Ctrl+D taosResetTerminalMode(); printf("\r\n"); + taosMemoryFreeClear(cmd.buffer); + taosMemoryFreeClear(cmd.command); return -1; case 5: // ctrl E shellPositionCursorEnd(&cmd); From 9ad5c9404850f4369bf0cac0f562d2905228a67b Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Mon, 6 May 2024 09:56:47 +0800 Subject: [PATCH 28/34] Update 12-distinguished.md --- docs/zh/12-taos-sql/12-distinguished.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/12-taos-sql/12-distinguished.md b/docs/zh/12-taos-sql/12-distinguished.md index f73919a970..1cd3cd9f6b 100755 --- a/docs/zh/12-taos-sql/12-distinguished.md +++ b/docs/zh/12-taos-sql/12-distinguished.md @@ -53,7 +53,7 @@ window_clause: { } ``` -其中,interval_val 和 sliding_val 都表示时间段, 语法上支持三种方式,举例说明如下: +其中,interval_val 和 sliding_val 都表示时间段,interval_offset 表示窗口偏移量,interval_offset必须小于interval_val。语法上支持三种方式,举例说明如下: - INTERVAL(1s, 500a) SLIDING(1s), 自带时间单位的形式,其中的时间单位是单字符表示, 分别为: a (毫秒), b (纳秒), d (天), h (小时), m (分钟), n (月), s (秒), u (微妙), w (周), y (年). - INTERVAL(1000, 500) SLIDING(1000), 不带时间单位的形式,将使用查询库的时间精度作为默认时间单位,当存在多个库时默认采用精度更高的库. - INTERVAL('1s', '500a') SLIDING('1s'), 自带时间单位的字符串形式,字符串内部不能有任何空格等其它字符. From 135be711780ee570aa202ebd23aa984f42fd5df3 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Mon, 6 May 2024 09:58:27 +0800 Subject: [PATCH 29/34] Update 12-distinguished.md --- docs/zh/12-taos-sql/12-distinguished.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/12-taos-sql/12-distinguished.md b/docs/zh/12-taos-sql/12-distinguished.md index 1cd3cd9f6b..7fa1dbe66c 100755 --- a/docs/zh/12-taos-sql/12-distinguished.md +++ b/docs/zh/12-taos-sql/12-distinguished.md @@ -53,7 +53,7 @@ window_clause: { } ``` -其中,interval_val 和 sliding_val 都表示时间段,interval_offset 表示窗口偏移量,interval_offset必须小于interval_val。语法上支持三种方式,举例说明如下: +其中,interval_val 和 sliding_val 都表示时间段,interval_offset 表示窗口偏移量,interval_offset 必须小于 interval_val。语法上支持三种方式,举例说明如下: - INTERVAL(1s, 500a) SLIDING(1s), 自带时间单位的形式,其中的时间单位是单字符表示, 分别为: a (毫秒), b (纳秒), d (天), h (小时), m (分钟), n (月), s (秒), u (微妙), w (周), y (年). - INTERVAL(1000, 500) SLIDING(1000), 不带时间单位的形式,将使用查询库的时间精度作为默认时间单位,当存在多个库时默认采用精度更高的库. - INTERVAL('1s', '500a') SLIDING('1s'), 自带时间单位的字符串形式,字符串内部不能有任何空格等其它字符. From 61f5b34de1231375225f14d6c16a157c3825c316 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Mon, 6 May 2024 10:00:08 +0800 Subject: [PATCH 30/34] Update 12-distinguished.md --- docs/zh/12-taos-sql/12-distinguished.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/12-taos-sql/12-distinguished.md b/docs/zh/12-taos-sql/12-distinguished.md index 7fa1dbe66c..b979d44a5e 100755 --- a/docs/zh/12-taos-sql/12-distinguished.md +++ b/docs/zh/12-taos-sql/12-distinguished.md @@ -53,7 +53,7 @@ window_clause: { } ``` -其中,interval_val 和 sliding_val 都表示时间段,interval_offset 表示窗口偏移量,interval_offset 必须小于 interval_val。语法上支持三种方式,举例说明如下: +其中,interval_val 和 sliding_val 都表示时间段,interval_offset 表示窗口偏移量,interval_offset 必须小于 interval_val,语法上支持三种方式,举例说明如下: - INTERVAL(1s, 500a) SLIDING(1s), 自带时间单位的形式,其中的时间单位是单字符表示, 分别为: a (毫秒), b (纳秒), d (天), h (小时), m (分钟), n (月), s (秒), u (微妙), w (周), y (年). - INTERVAL(1000, 500) SLIDING(1000), 不带时间单位的形式,将使用查询库的时间精度作为默认时间单位,当存在多个库时默认采用精度更高的库. - INTERVAL('1s', '500a') SLIDING('1s'), 自带时间单位的字符串形式,字符串内部不能有任何空格等其它字符. From ffa2ec3876bbbe891ef30f36666a83759cd917db Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Mon, 6 May 2024 10:16:26 +0800 Subject: [PATCH 31/34] Update 12-distinguished.md --- docs/en/12-taos-sql/12-distinguished.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/12-taos-sql/12-distinguished.md b/docs/en/12-taos-sql/12-distinguished.md index 90237a54f5..0cc40506d0 100644 --- a/docs/en/12-taos-sql/12-distinguished.md +++ b/docs/en/12-taos-sql/12-distinguished.md @@ -31,7 +31,7 @@ A PARTITION BY clause is processed as follows: select _wstart, location, max(current) from meters partition by location interval(10m) ``` -The most common usage of PARTITION BY is partitioning the data in subtables by tags then perform computation when querying data in a supertable. More specifically, `PARTITION BY TBNAME` partitions the data of each subtable into a single timeline, and this method facilitates the statistical analysis in many use cases of processing timeseries data. For example, calculate the average voltage of each meter every 10 minutes +The most common usage of PARTITION BY is partitioning the data in subtables by tags then perform computation when querying data in a supertable. More specifically, `PARTITION BY TBNAME` partitions the data of each subtable into a single timeline, and this method facilitates the statistical analysis in many use cases of processing timeseries data. For example, calculate the average voltage of each meter every 10 minutes£º ```sql select _wstart, tbname, avg(voltage) from meters partition by tbname interval(10m) ``` @@ -49,7 +49,7 @@ window_clause: { } ``` -Both interval_val and sliding_value are time durations which have 3 forms of representation. +Both interval_val and sliding_value are time durations, and interval_offset is the window offset, interval_offset must be less than interval_val, There are 3 forms of representation. - INTERVAL(1s, 500a) SLIDING(1s), the unit char should be any one of a (millisecond), b (nanosecond), d (day), h (hour), m (minute), n (month), s (second), u (microsecond), w (week), y (year). - INTERVAL(1000, 500) SLIDING(1000), the unit will the same as the queried database, if there are more than one databases, higher precision will be used. - INTERVAL('1s', '500a') SLIDING('1s'), unit must be specified, no spaces allowed. From c61e6f7e45bce40febb605dec9ea37c82cb1c7f6 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Mon, 6 May 2024 10:17:25 +0800 Subject: [PATCH 32/34] Update 12-distinguished.md --- docs/en/12-taos-sql/12-distinguished.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/12-taos-sql/12-distinguished.md b/docs/en/12-taos-sql/12-distinguished.md index 0cc40506d0..4639a13bcc 100644 --- a/docs/en/12-taos-sql/12-distinguished.md +++ b/docs/en/12-taos-sql/12-distinguished.md @@ -44,7 +44,7 @@ Aggregation by time window is supported in TDengine. For example, in the case wh window_clause: { SESSION(ts_col, tol_val) | STATE_WINDOW(col) - | INTERVAL(interval_val [, offset]) [SLIDING (sliding_value)] [FILL({NONE | VALUE | PREV | NULL | LINEAR | NEXT})] + | INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_value)] [FILL({NONE | VALUE | PREV | NULL | LINEAR | NEXT})] | EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition } ``` From bcdc33f0e29626d9908b062357ed9290585ad885 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Mon, 6 May 2024 10:25:55 +0800 Subject: [PATCH 33/34] Update 06-select.md --- docs/zh/12-taos-sql/06-select.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/zh/12-taos-sql/06-select.md b/docs/zh/12-taos-sql/06-select.md index b6792d0efc..d754821749 100755 --- a/docs/zh/12-taos-sql/06-select.md +++ b/docs/zh/12-taos-sql/06-select.md @@ -58,6 +58,8 @@ window_clause: { SESSION(ts_col, tol_val) | STATE_WINDOW(col) | INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [WATERMARK(watermark_val)] [FILL(fill_mod_and_val)] + | EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition + | COUNT_WINDOW(count_val[, sliding_val]) interp_clause: RANGE(ts_val [, ts_val]) EVERY(every_val) FILL(fill_mod_and_val) From ab3df445fa497b03eadc158d042d46d5ef48a504 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Mon, 6 May 2024 10:27:40 +0800 Subject: [PATCH 34/34] Update 06-select.md --- docs/en/12-taos-sql/06-select.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/en/12-taos-sql/06-select.md b/docs/en/12-taos-sql/06-select.md index c6740a5242..e6ee64ddac 100755 --- a/docs/en/12-taos-sql/06-select.md +++ b/docs/en/12-taos-sql/06-select.md @@ -58,6 +58,8 @@ window_clause: { SESSION(ts_col, tol_val) | STATE_WINDOW(col) | INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)] [WATERMARK(watermark_val)] [FILL(fill_mod_and_val)] + | EVENT_WINDOW START WITH start_trigger_condition END WITH end_trigger_condition + | COUNT_WINDOW(count_val[, sliding_val]) interp_clause: RANGE(ts_val [, ts_val]) EVERY(every_val) FILL(fill_mod_and_val)