diff --git a/deps/arm/dm_static/libdmodule.a b/deps/arm/dm_static/libdmodule.a index dbad0112cf..37077ef63b 100644 Binary files a/deps/arm/dm_static/libdmodule.a and b/deps/arm/dm_static/libdmodule.a differ diff --git a/deps/darwin/arm/dm_static/libdmodule.a b/deps/darwin/arm/dm_static/libdmodule.a index 2aab587b18..246b2247af 100644 Binary files a/deps/darwin/arm/dm_static/libdmodule.a and b/deps/darwin/arm/dm_static/libdmodule.a differ diff --git a/deps/darwin/x64/dm_static/libdmodule.a b/deps/darwin/x64/dm_static/libdmodule.a index 1fb6794f65..8745f57636 100644 Binary files a/deps/darwin/x64/dm_static/libdmodule.a and b/deps/darwin/x64/dm_static/libdmodule.a differ diff --git a/deps/mips/dm_static/libdmodule.a b/deps/mips/dm_static/libdmodule.a index d4b0582498..855a6a41d9 100644 Binary files a/deps/mips/dm_static/libdmodule.a and b/deps/mips/dm_static/libdmodule.a differ diff --git a/deps/x86/dm_static/libdmodule.a b/deps/x86/dm_static/libdmodule.a index 9d37818a79..6a3c0d45c2 100644 Binary files a/deps/x86/dm_static/libdmodule.a and b/deps/x86/dm_static/libdmodule.a differ diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 84d0dd4982..22a176f0bb 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -27,6 +27,8 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); +int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored); int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 8c726f9f03..1b1dcc9b54 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -87,8 +87,10 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 4213541351..bfac0bab9d 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -835,9 +835,11 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/node_mgmt/CMakeLists.txt b/source/dnode/mgmt/node_mgmt/CMakeLists.txt index 15c1d2fa4d..0cdc68345a 100644 --- a/source/dnode/mgmt/node_mgmt/CMakeLists.txt +++ b/source/dnode/mgmt/node_mgmt/CMakeLists.txt @@ -14,6 +14,10 @@ IF (TD_STORAGE) ENDIF () +IF (DEFINED GRANT_CFG_INCLUDE_DIR) + add_definitions(-DGRANTS_CFG) +ENDIF() + target_include_directories( dnode PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" diff --git a/source/dnode/mnode/impl/CMakeLists.txt b/source/dnode/mnode/impl/CMakeLists.txt index a172756aad..ceaf086dc1 100644 --- a/source/dnode/mnode/impl/CMakeLists.txt +++ b/source/dnode/mnode/impl/CMakeLists.txt @@ -26,6 +26,10 @@ target_link_libraries( mnode scheduler sdb wal transport cjson sync monitor executor qworker stream parser audit monitorfw ) +IF (DEFINED GRANT_CFG_INCLUDE_DIR) + add_definitions(-DGRANTS_CFG) +ENDIF() + IF (TD_GRANT) TARGET_LINK_LIBRARIES(mnode grant) ADD_DEFINITIONS(-D_GRANT) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index aed49809dd..57fd187da3 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -86,6 +86,10 @@ typedef struct SOrphanTask { int32_t nodeId; } SOrphanTask; +typedef struct { + SMsgHead head; +} SMStreamHbRspMsg, SMStreamReqCheckpointRspMsg; + int32_t mndInitStream(SMnode *pMnode); void mndCleanupStream(SMnode *pMnode); SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName); diff --git a/source/dnode/mnode/impl/src/mndCompact.c b/source/dnode/mnode/impl/src/mndCompact.c index 4e71684372..deaaf7f2af 100644 --- a/source/dnode/mnode/impl/src/mndCompact.c +++ b/source/dnode/mnode/impl/src/mndCompact.c @@ -240,7 +240,7 @@ int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* pCompac SSdbRaw *pVgRaw = mndCompactActionEncode(pCompact); if (pVgRaw == NULL) return -1; - if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) { + if (mndTransAppendPrepareLog(pTrans, pVgRaw) != 0) { sdbFreeRaw(pVgRaw); return -1; } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index ef9a7205e1..9aba428ff6 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -300,15 +300,16 @@ static int32_t doAddShuffleSinkTask(SMnode* pMnode, SStreamObj* pStream, SEpSet* } static int64_t getVgroupLastVer(const SArray* pList, int32_t vgId) { - for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { + int32_t size = (int32_t) taosArrayGetSize(pList); + for (int32_t i = 0; i < size; ++i) { SVgroupVer* pVer = taosArrayGet(pList, i); if (pVer->vgId == vgId) { return pVer->ver; } } - mError("failed to find the vgId:%d for extract last version", vgId); - return -1; + mDebug("no data in vgId:%d for extract last version, set to be 0, total existed vgs:%d", vgId, size); + return 1; } static void streamTaskSetDataRange(SStreamTask* pTask, int64_t skey, SArray* pVerList, int32_t vgId) { @@ -472,6 +473,9 @@ static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream int code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam); if (code != 0) { + mError("create stream task, code:%s", tstrerror(code)); + + // todo drop the added source tasks. sdbRelease(pSdb, pVgroup); return code; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8da56d2d46..3ef2f64df7 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -877,7 +877,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre int64_t ts = taosGetTimestampMs(); if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) { // mWarn("checkpoint interval less than the threshold, ignore it"); - return -1; + return TSDB_CODE_SUCCESS; } bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock); @@ -2179,5 +2179,16 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { mndReleaseStream(pMnode, pStream); taosThreadMutexUnlock(&execInfo.lock); + { + SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRspMsg)}; + rsp.pCont = rpcMallocCont(rsp.contLen); + SMsgHead* pHead = rsp.pCont; + pHead->vgId = htonl(req.nodeId); + + tmsgSendRsp(&rsp); + + pReq->info.handle = NULL; // disable auto rsp + } + return 0; } diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 1d296a1c6e..14f3c533e3 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -16,10 +16,6 @@ #include "mndStream.h" #include "mndTrans.h" -typedef struct { - SMsgHead head; -} SMStreamHbRspMsg; - typedef struct SFailedCheckpointInfo { int64_t streamUid; int64_t checkpointId; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 4dcbfe169d..04d74112d4 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -1217,7 +1217,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA if (numOfActions == 0) return 0; if ((code = mndTransExecSingleActions(pMnode, pTrans, pArray, topHalf)) != 0) { - return -1; + return code; } int32_t numOfExecuted = 0; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 34623c021b..3bef5b595b 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -180,6 +180,10 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { return tqStreamTaskProcessCheckpointReadyMsg(pSnode->pMeta, pMsg); case TDMT_MND_STREAM_HEARTBEAT_RSP: return tqStreamProcessStreamHbRsp(pSnode->pMeta, pMsg); + case TDMT_MND_STREAM_REQ_CHKPT_RSP: + return tqStreamProcessReqCheckpointRsp(pSnode->pMeta, pMsg); + case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP: + return tqStreamProcessCheckpointReadyRsp(pSnode->pMeta, pMsg); default: sndError("invalid snode msg:%d", pMsg->msgType); ASSERT(0); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index c9a6c7a10d..e610161544 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -259,6 +259,8 @@ int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqScanWal(STQ* pTq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a3fd19c7f8..011e62cb89 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1175,7 +1175,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) return TSDB_CODE_SUCCESS; } } else { - ASSERT(status == TASK_STATUS__HALT); +// ASSERT(status == TASK_STATUS__HALT); + if (status != TASK_STATUS__HALT) { + tqError("s-task:%s should in halt status, let's halt it directly", pTask->id.idStr); +// streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT); + } } // check if the checkpoint msg already sent or not. @@ -1225,3 +1229,11 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) { return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg); } + +int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg) { + return tqStreamProcessReqCheckpointRsp(pTq->pStreamMeta, pMsg); +} + +int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg) { + return tqStreamProcessCheckpointReadyRsp(pTq->pStreamMeta, pMsg); +} diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 4e49091da4..a2d45062b9 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -485,6 +485,10 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe return code; } +typedef struct SMStreamCheckpointReadyRspMsg { + SMsgHead head; +}SMStreamCheckpointReadyRspMsg; + int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) { int32_t vgId = pMeta->vgId; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); @@ -513,6 +517,18 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) streamProcessCheckpointReadyMsg(pTask); streamMetaReleaseTask(pMeta, pTask); + + { // send checkpoint ready rsp + SRpcMsg rsp = {.code = 0, .info = pMsg->info, .contLen = sizeof(SMStreamCheckpointReadyRspMsg)}; + rsp.pCont = rpcMallocCont(rsp.contLen); + SMsgHead* pHead = rsp.pCont; + pHead->vgId = htonl(req.downstreamNodeId); + + tmsgSendRsp(&rsp); + + pMsg->info.handle = NULL; // disable auto rsp + } + return code; } @@ -938,9 +954,17 @@ int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) { return taosArrayGetSize(pMeta->pTaskList); } -int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { +static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) { rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; return TSDB_CODE_SUCCESS; } + +int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } + +int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); } + +int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { + return doProcessDummyRspMsg(pMeta, pMsg); +} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index e1f76b3a25..63ca7251f5 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -484,22 +484,26 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC snprintf(pVnode->monitor.strDnodeId, TSDB_NODE_ID_LEN, "%"PRId32, pVnode->config.syncCfg.nodeInfo[0].nodeId); snprintf(pVnode->monitor.strVgId, TSDB_VGROUP_ID_LEN, "%"PRId32, pVnode->config.vgId); - if(pVnode->monitor.insertCounter == NULL){ - int32_t label_count = 7; - const char *sample_labels[] = {VNODE_METRIC_TAG_NAME_SQL_TYPE, VNODE_METRIC_TAG_NAME_CLUSTER_ID, - VNODE_METRIC_TAG_NAME_DNODE_ID, VNODE_METRIC_TAG_NAME_DNODE_EP, - VNODE_METRIC_TAG_NAME_VGROUP_ID, VNODE_METRIC_TAG_NAME_USERNAME, - VNODE_METRIC_TAG_NAME_RESULT}; - taos_counter_t *counter = taos_counter_new(VNODE_METRIC_SQL_COUNT, "counter for insert sql", - label_count, sample_labels); - vInfo("vgId:%d, new metric:%p",TD_VID(pVnode), counter); - if(taos_collector_registry_register_metric(counter) == 1){ - taos_counter_destroy(counter); - counter = taos_collector_registry_get_metric(VNODE_METRIC_SQL_COUNT); - vInfo("vgId:%d, get metric from registry:%p",TD_VID(pVnode), counter); + if(tsEnableMonitor && pVnode->monitor.insertCounter == NULL){ + taos_counter_t *counter = NULL; + counter = taos_collector_registry_get_metric(VNODE_METRIC_SQL_COUNT); + if(counter == NULL){ + int32_t label_count = 7; + const char *sample_labels[] = {VNODE_METRIC_TAG_NAME_SQL_TYPE, VNODE_METRIC_TAG_NAME_CLUSTER_ID, + VNODE_METRIC_TAG_NAME_DNODE_ID, VNODE_METRIC_TAG_NAME_DNODE_EP, + VNODE_METRIC_TAG_NAME_VGROUP_ID, VNODE_METRIC_TAG_NAME_USERNAME, + VNODE_METRIC_TAG_NAME_RESULT}; + counter = taos_counter_new(VNODE_METRIC_SQL_COUNT, "counter for insert sql", + label_count, sample_labels); + vInfo("vgId:%d, new metric:%p",TD_VID(pVnode), counter); + if(taos_collector_registry_register_metric(counter) == 1){ + taos_counter_destroy(counter); + counter = taos_collector_registry_get_metric(VNODE_METRIC_SQL_COUNT); + vInfo("vgId:%d, get metric from registry:%p",TD_VID(pVnode), counter); + } } pVnode->monitor.insertCounter = counter; - vInfo("vgId:%d, succeed to set metric:%p",TD_VID(pVnode), counter); + vInfo("vgId:%d, succeed to set metric:%p",TD_VID(pVnode), counter); } return pVnode; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index b5ae29f434..e32d4b70e0 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -16,6 +16,7 @@ #include "audit.h" #include "cos.h" #include "tencode.h" +#include "tglobal.h" #include "tmsg.h" #include "tstrbuild.h" #include "vnd.h" @@ -800,6 +801,10 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg); case TDMT_MND_STREAM_HEARTBEAT_RSP: return tqProcessStreamHbRsp(pVnode->pTq, pMsg); + case TDMT_MND_STREAM_REQ_CHKPT_RSP: + return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg); + case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP: + return tqProcessTaskCheckpointReadyRsp(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; @@ -1704,7 +1709,7 @@ _exit: atomic_add_fetch_64(&pVnode->statis.nInsertSuccess, pSubmitRsp->affectedRows); atomic_add_fetch_64(&pVnode->statis.nBatchInsert, 1); - if(pSubmitRsp->affectedRows > 0 && strlen(pOriginalMsg->info.conn.user) > 0){ + if(tsEnableMonitor && pSubmitRsp->affectedRows > 0 && strlen(pOriginalMsg->info.conn.user) > 0){ const char *sample_labels[] = {VNODE_METRIC_TAG_VALUE_INSERT_AFFECTED_ROWS, pVnode->monitor.strClusterId, pVnode->monitor.strDnodeId, tsLocalEp, pVnode->monitor.strVgId, pOriginalMsg->info.conn.user, "Success"}; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 2f2bbe2016..9d285e2a53 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -373,6 +373,7 @@ static int32_t collectUseTable(const SName* pName, SHashObj* pTable) { return taosHashPut(pTable, fullName, strlen(fullName), pName, sizeof(SName)); } +#ifdef BUILD_NO_CALL static int32_t getViewMetaImpl(SParseContext* pParCxt, SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta) { #ifndef TD_ENTERPRISE return TSDB_CODE_PAR_TABLE_NOT_EXIST; @@ -396,6 +397,7 @@ static int32_t getViewMetaImpl(SParseContext* pParCxt, SParseMetaCache* pMetaCac } return code; } +#endif int32_t getTargetMetaImpl(SParseContext* pParCxt, SParseMetaCache* pMetaCache, const SName* pName, STableMeta** pMeta, bool couldBeView) { int32_t code = TSDB_CODE_SUCCESS; @@ -774,9 +776,11 @@ static bool isAggFunc(const SNode* pNode) { return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsAggFunc(((SFunctionNode*)pNode)->funcId)); } +#ifdef BUILD_NO_CALL static bool isSelectFunc(const SNode* pNode) { return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsSelectFunc(((SFunctionNode*)pNode)->funcId)); } +#endif static bool isWindowPseudoColumnFunc(const SNode* pNode) { return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsWindowPseudoColumnFunc(((SFunctionNode*)pNode)->funcId)); @@ -790,9 +794,11 @@ static bool isInterpPseudoColumnFunc(const SNode* pNode) { return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsInterpPseudoColumnFunc(((SFunctionNode*)pNode)->funcId)); } +#ifdef BUILD_NO_CALL static bool isTimelineFunc(const SNode* pNode) { return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsTimelineFunc(((SFunctionNode*)pNode)->funcId)); } +#endif static bool isImplicitTsFunc(const SNode* pNode) { return (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsImplicitTsFunc(((SFunctionNode*)pNode)->funcId)); @@ -7750,9 +7756,11 @@ static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, STabl return code; } +#ifdef BUILD_NO_CALL static bool isEventWindowQuery(SSelectStmt* pSelect) { return NULL != pSelect->pWindow && QUERY_NODE_EVENT_WINDOW == nodeType(pSelect->pWindow); } +#endif static bool hasJsonTypeProjection(SSelectStmt* pSelect) { SNode* pProj = NULL; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 60c545b9e5..78b914c3db 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -907,7 +907,6 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, SStreamChkptReadyInfo info = {.upStreamTaskId = pInfo->taskId, .upstreamNodeEpset = pInfo->epSet}; initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead)); - info.msg.info.noResp = 1; // refactor later. stDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64 ":0x%x (vgId:%d) idx:%d, vgId:%d", diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index b63dc50836..9639921c77 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -934,9 +934,8 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) { } tEncoderClear(&encoder); - SRpcMsg msg = {.info.noResp = 1}; + SRpcMsg msg = {0}; initRpcMsg(&msg, TDMT_MND_STREAM_REQ_CHKPT, buf, tlen); - stDebug("s-task:%s vgId:%d build and send task checkpoint req", id, vgId); tmsgSendReq(&pTask->info.mnodeEpset, &msg); diff --git a/tests/army/community/cluster/incSnapshot.py b/tests/army/community/cluster/incSnapshot.py index 6bcf547136..85f030eb03 100644 --- a/tests/army/community/cluster/incSnapshot.py +++ b/tests/army/community/cluster/incSnapshot.py @@ -67,7 +67,6 @@ class TDTestCase(TBase): dirs = glob.glob(dnodesRootDir) for dir in dirs: if os.path.isdir(dir): - tdLog.debug("delete dir: %s " % (dnodesRootDir)) self.remove_directory(os.path.join(dir, "wal")) sc.dnodeStart(1) @@ -88,7 +87,7 @@ class TDTestCase(TBase): if bFinish: break - self.timestamp_step = 1 + self.timestamp_step = 1000 self.insert_rows = 6000 self.checkInsertCorrect() self.checkAggCorrect() diff --git a/tests/army/community/query/show.py b/tests/army/community/query/show.py new file mode 100644 index 0000000000..8b6844820e --- /dev/null +++ b/tests/army/community/query/show.py @@ -0,0 +1,152 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import time +import random + +import taos +import frame +import frame.etool + +from frame.log import * +from frame.cases import * +from frame.sql import * +from frame.caseBase import * +from frame import * +from frame.autogen import * + + +class TDTestCase(TBase): + updatecfgDict = { + } + + def insertData(self): + tdLog.info(f"create table and insert data.") + self.stb = "stb" + self.db = "db" + self.childtable_count = 10 + self.insert_rows = 10000 + + self.autoGen = AutoGen(startTs = 1600000000000*1000*1000, batch=500, fillOne=True) + self.autoGen.create_db(self.db, 2, 3, "precision 'ns'") + self.autoGen.create_stable(stbname = self.stb, tag_cnt = 5, column_cnt = 20, binary_len = 10, nchar_len = 5) + self.autoGen.create_child(self.stb, "child", self.childtable_count) + self.autoGen.insert_data(self.insert_rows, True) + + tdLog.info("create view.") + tdSql.execute(f"use {self.db}") + sqls = [ + "create view viewc0c1 as select c0,c1 from stb ", + "create view viewc0c1c2 as select c0,c1,c2 from stb ", + "create view viewc0c3 as select c0,c3 from stb where c3=1", + "create view viewc0c4c5 as select c4,c5 from stb ", + "create view viewc0c6 as select c0,c1,c6 from stb ", + "create view viewc0c7 as select c0,c1 from stb ", + "create view viewc0c7c8 as select c0,c7,c8 from stb where c8>0", + "create view viewc0c3c1 as select c0,c3,c1 from stb ", + "create view viewc2c4 as select c2,c4 from stb ", + "create view viewc2c5 as select c2,c5 from stb ", + ] + tdSql.executes(sqls) + + def checkView(self): + tdLog.info(f"check view like.") + + # like + sql = f"show views like 'view%'" + tdSql.query(sql) + tdSql.checkRows(10) + + sql = f"show views like 'vie_c0c1c2'" + tdSql.query(sql) + tdSql.checkRows(1) + tdSql.checkData(0,0,"viewc0c1c2") + + sql = f"show views like '%c2c_'" + tdSql.query(sql) + tdSql.checkRows(2) + tdSql.checkData(0,0, "viewc2c4") + tdSql.checkData(1,0, "viewc2c5") + + sql = f"show views like '%' " + tdSql.query(sql) + tdSql.checkRows(10) + + # zero + sql = "show views like '_' " + tdSql.query(sql) + tdSql.checkRows(0) + sql = "show views like 'a%' " + tdSql.query(sql) + tdSql.checkRows(0) + + + def doQuery(self): + tdLog.info(f"do query.") + + # __group_key + sql = f"select count(*) from {self.stb} " + tdSql.query(sql) + # column index 1 value same with 2 + allRows = self.insert_rows * self.childtable_count + tdSql.checkFirstValue(sql, allRows) + + def checkShow(self): + # not support + sql = "show accounts;" + tdSql.error(sql) + + # check result + sql = "SHOW CLUSTER;" + tdSql.query(sql) + tdSql.checkRows(1) + sql = "SHOW COMPACTS;" + tdSql.query(sql) + tdSql.checkRows(0) + sql = "SHOW COMPACT 1;" + tdSql.query(sql) + tdSql.checkRows(0) + sql = "SHOW CLUSTER MACHINES;" + tdSql.query(sql) + tdSql.checkRows(1) + + # run to check crash + sqls = [ + "show scores;", + "SHOW CLUSTER VARIABLES", + "SHOW BNODES;", + ] + tdSql.executes(sqls) + + + # run + def run(self): + tdLog.debug(f"start to excute {__file__}") + + # insert data + self.insertData() + + # check view + self.checkView() + + # do action + self.doQuery() + + + tdLog.success(f"{__file__} successfully executed") + + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/army/enterprise/s3/s3_basic.json b/tests/army/enterprise/s3/s3_basic.json index d7544a897c..747ac7c8ec 100644 --- a/tests/army/enterprise/s3/s3_basic.json +++ b/tests/army/enterprise/s3/s3_basic.json @@ -6,7 +6,7 @@ "user": "root", "password": "taosdata", "connection_pool_size": 8, - "num_of_records_per_req": 2000, + "num_of_records_per_req": 4000, "prepared_rand": 1000, "thread_count": 2, "create_table_thread_count": 1, @@ -18,26 +18,27 @@ "drop": "yes", "vgroups": 2, "replica": 1, - "duration":"1d", - "keep": "3d,6d,30d" + "duration":"15d", + "flush_each_batch":"yes", + "keep": "60d,100d,200d" }, "super_tables": [ { "name": "stb", "child_table_exists": "no", - "childtable_count": 4, - "insert_rows": 1000000, + "childtable_count": 2, + "insert_rows": 2000000, "childtable_prefix": "d", "insert_mode": "taosc", "timestamp_step": 1000, - "start_timestamp":"now-13d", + "start_timestamp":"now-90d", "columns": [ { "type": "bool", "name": "bc"}, { "type": "float", "name": "fc" }, { "type": "double", "name": "dc"}, - { "type": "tinyint", "name": "ti", "values":["1"]}, + { "type": "tinyint", "name": "ti"}, { "type": "smallint", "name": "si" }, - { "type": "int", "name": "ic" }, + { "type": "int", "name": "ic" ,"max": 1,"min": 1}, { "type": "bigint", "name": "bi" }, { "type": "utinyint", "name": "uti"}, { "type": "usmallint", "name": "usi"}, diff --git a/tests/army/enterprise/s3/s3_basic.py b/tests/army/enterprise/s3/s3_basic.py index 45519d925f..e9173dda00 100644 --- a/tests/army/enterprise/s3/s3_basic.py +++ b/tests/army/enterprise/s3/s3_basic.py @@ -58,8 +58,8 @@ class TDTestCase(TBase): tdSql.execute(f"use {self.db}") # come from s3_basic.json - self.childtable_count = 4 - self.insert_rows = 1000000 + self.childtable_count = 2 + self.insert_rows = 2000000 self.timestamp_step = 1000 def createStream(self, sname): diff --git a/tests/army/frame/autogen.py b/tests/army/frame/autogen.py index d1f02e7865..cf21977c75 100644 --- a/tests/army/frame/autogen.py +++ b/tests/army/frame/autogen.py @@ -14,15 +14,18 @@ import time # Auto Gen class # class AutoGen: - def __init__(self, fillOne=False): - self.ts = 1600000000000 - self.batch_size = 100 + def __init__(self, startTs = 1600000000000, step = 1000, batch = 100, fillOne=False): + self.startTs = startTs + self.ts = startTs + self.step = step + self.batch_size = batch + self.fillOne = fillOne seed = time.time() % 10000 random.seed(seed) - self.fillOne = fillOne # set start ts def set_start_ts(self, ts): + self.startTs = ts self.ts = ts # set batch size @@ -111,9 +114,9 @@ class AutoGen: return ''.join(random.choice(letters) for i in range(count)) # create db - def create_db(self, dbname, vgroups = 2, replica = 1): + def create_db(self, dbname, vgroups = 2, replica = 1, others=""): self.dbname = dbname - tdSql.execute(f'create database {dbname} vgroups {vgroups} replica {replica}') + tdSql.execute(f'create database {dbname} vgroups {vgroups} replica {replica} {others}') # create table or stable def create_stable(self, stbname, tag_cnt, column_cnt, binary_len, nchar_len): @@ -167,12 +170,12 @@ class AutoGen: def insert_data(self, cnt, bContinue=False): if not bContinue: - self.ts = 1600000000000 + self.ts = self.startTs - currTs = 1600000000000 + currTs = self.startTs for i in range(self.child_cnt): name = f"{self.child_name}{i}" - currTs = self.insert_data_child(name, cnt, self.batch_size, 1) + currTs = self.insert_data_child(name, cnt, self.batch_size, self.step) self.ts = currTs tdLog.info(f" insert data ok, child table={self.child_cnt} insert rows={cnt}") diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 12f1f62f63..bdccf33c32 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -15,10 +15,11 @@ ,,y,army,./pytest.sh python3 ./test.py -f community/cluster/snapshot.py -N 3 -L 3 -D 2 ,,y,army,./pytest.sh python3 ./test.py -f community/query/function/test_func_elapsed.py ,,y,army,./pytest.sh python3 ./test.py -f community/query/fill/fill_desc.py -N 3 -L 3 -D 2 -,,y,army,./pytest.sh python3 ./test.py -f community/cluster/incSnapshot.py -N 3 -L 3 -D 2 +,,y,army,./pytest.sh python3 ./test.py -f community/cluster/incSnapshot.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f community/query/query_basic.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f community/cluster/splitVgroupByLearner.py -N 3 ,,n,army,python3 ./test.py -f community/cmdline/fullopt.py +,,n,army,python3 ./test.py -f community/query/show.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f community/storage/oneStageComp.py -N 3 -L 3 -D 1 # diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c index e9b9b9e944..b6917e5a76 100644 --- a/tools/shell/src/shellAuto.c +++ b/tools/shell/src/shellAuto.c @@ -105,7 +105,8 @@ SWords shellCommands[] = { {"create or replace aggregate function as outputtype bufsize language ", 0, 0, NULL}, {"create user pass sysinfo 0;", 0, 0, NULL}, {"create user pass sysinfo 1;", 0, 0, NULL}, -#ifdef TD_ENTERPRISE +#ifdef TD_ENTERPRISE + {"create view as select", 0, 0, NULL}, {"compact database ", 0, 0, NULL}, #endif {"describe ", 0, 0, NULL}, @@ -162,13 +163,20 @@ SWords shellCommands[] = { {"show create database \\G;", 0, 0, NULL}, {"show create stable \\G;", 0, 0, NULL}, {"show create table \\G;", 0, 0, NULL}, +#ifdef TD_ENTERPRISE + {"show create view \\G;", 0, 0, NULL}, +#endif {"show connections;", 0, 0, NULL}, + {"show compact", 0, 0, NULL}, + {"show compacts;", 0, 0, NULL}, {"show cluster;", 0, 0, NULL}, {"show cluster alive;", 0, 0, NULL}, + {"show cluster machines;", 0, 0, NULL}, {"show databases;", 0, 0, NULL}, {"show dnodes;", 0, 0, NULL}, {"show dnode variables;", 0, 0, NULL}, {"show functions;", 0, 0, NULL}, + {"show licences;", 0, 0, NULL}, {"show mnodes;", 0, 0, NULL}, {"show queries;", 0, 0, NULL}, // 80 @@ -185,6 +193,7 @@ SWords shellCommands[] = { {"show table distributed ", 0, 0, NULL}, {"show tags from ", 0, 0, NULL}, {"show tags from ", 0, 0, NULL}, + {"show table tags from ", 0, 0, NULL}, {"show topics;", 0, 0, NULL}, {"show transactions;", 0, 0, NULL}, {"show users;", 0, 0, NULL}, @@ -194,6 +203,8 @@ SWords shellCommands[] = { {"show vgroups;", 0, 0, NULL}, {"show consumers;", 0, 0, NULL}, {"show grants;", 0, 0, NULL}, + {"show grants full;", 0, 0, NULL}, + {"show grants logs;", 0, 0, NULL}, #ifdef TD_ENTERPRISE {"split vgroup ", 0, 0, NULL}, #endif @@ -302,6 +313,20 @@ char* key_systable[] = { char* udf_language[] = {"\'Python\'", "\'C\'"}; +// global keys can tips on anywhere +char* global_keys[] = { + "tbname", + "now", + "_wstart", + "_wend", + "_wduration", + "_qstart", + "_qend", + "_qduration", + "_qtag", + "_isfilled" + }; + // // ------- global variant define --------- // @@ -341,8 +366,9 @@ bool waitAutoFill = false; #define WT_VAR_KEYSELECT 20 #define WT_VAR_SYSTABLE 21 #define WT_VAR_LANGUAGE 22 +#define WT_VAR_GLOBALKEYS 23 -#define WT_VAR_CNT 23 +#define WT_VAR_CNT 24 #define WT_TEXT 0xFF @@ -494,10 +520,12 @@ void showHelp() { show connections;\n\ show cluster;\n\ show cluster alive;\n\ + show cluster machines;\n\ show databases;\n\ show dnodes;\n\ show dnode variables;\n\ show functions;\n\ + show licences;\n\ show mnodes;\n\ show queries;\n\ show query ;\n\ @@ -513,6 +541,7 @@ void showHelp() { show table distributed ;\n\ show tags from \n\ show tags from \n\ + show table tags from \n\ show topics;\n\ show transactions;\n\ show users;\n\ @@ -522,6 +551,8 @@ void showHelp() { show vgroups;\n\ show consumers;\n\ show grants;\n\ + show grants full;\n\ + show grants logs;\n\ ----- T ----- \n\ trim database ;\n\ ----- U ----- \n\ @@ -534,8 +565,12 @@ void showHelp() { balance vgroup ;\n\ balance vgroup leader on \n\ compact database ; \n\ + crate view as select ...\n\ redistribute vgroup dnode ;\n\ - split vgroup ;"); + split vgroup ;\n\ + show compacts;\n\ + show compact \n\ + show create view ;"); #endif printf("\n\n"); @@ -699,6 +734,7 @@ bool shellAutoInit() { GenerateVarType(WT_VAR_KEYSELECT, key_select, sizeof(key_select) / sizeof(char*)); GenerateVarType(WT_VAR_SYSTABLE, key_systable, sizeof(key_systable) / sizeof(char*)); GenerateVarType(WT_VAR_LANGUAGE, udf_language, sizeof(udf_language) / sizeof(char*)); + GenerateVarType(WT_VAR_GLOBALKEYS, global_keys, sizeof(global_keys) / sizeof(char*)); return true; } @@ -1800,6 +1836,13 @@ bool matchEnd(TAOS* con, SShellCmd* cmd) { goto _return; } + // global keys + if (fillWithType(con, cmd, last, WT_VAR_GLOBALKEYS)) { + ret = true; + goto _return; + } + + _return: taosMemoryFree(ps); return ret;