diff --git a/source/dnode/mnode/impl/inc/mndTopic.h b/source/dnode/mnode/impl/inc/mndTopic.h index 4becad6da2..7eb53dbdbb 100644 --- a/source/dnode/mnode/impl/inc/mndTopic.h +++ b/source/dnode/mnode/impl/inc/mndTopic.h @@ -37,7 +37,7 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]); int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic); -int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId); +int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char* stbname, int64_t suid, col_id_t colId); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 1306a0252f..682e78acc0 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1144,7 +1144,7 @@ static int32_t mndDropSuperTableTag(SMnode *pMnode, const SStbObj *pOld, SStbObj } col_id_t colId = pOld->pTags[tag].colId; - if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) { + if (mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId) != 0) { return -1; } @@ -1179,7 +1179,7 @@ static int32_t mndAlterStbTagName(SMnode *pMnode, const SStbObj *pOld, SStbObj * } col_id_t colId = pOld->pTags[tag].colId; - if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) { + if (mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId) != 0) { return -1; } @@ -1213,7 +1213,7 @@ static int32_t mndAlterStbTagBytes(SMnode *pMnode, const SStbObj *pOld, SStbObj } col_id_t colId = pOld->pTags[tag].colId; - if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) { + if (mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId) != 0) { return -1; } @@ -1295,7 +1295,7 @@ static int32_t mndDropSuperTableColumn(SMnode *pMnode, const SStbObj *pOld, SStb } col_id_t colId = pOld->pColumns[col].colId; - if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) { + if (mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId) != 0) { return -1; } @@ -1329,7 +1329,7 @@ static int32_t mndAlterStbColumnBytes(SMnode *pMnode, const SStbObj *pOld, SStbO } col_id_t colId = pOld->pColumns[col].colId; - if (mndCheckColAndTagModifiable(pMnode, pOld->uid, colId) != 0) { + if (mndCheckColAndTagModifiable(pMnode, pOld->name, pOld->uid, colId) != 0) { return -1; } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index ea33e0afd4..7acfc95bfc 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -72,13 +72,16 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) { return strchr(topic, '.') + 1; } -int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId) { +int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char* stbname, int64_t suid, col_id_t colId) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; while (1) { SMqTopicObj *pTopic = NULL; pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic); if (pIter == NULL) break; + + mDebug("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s", + pTopic->name, stbname, suid, colId, pTopic->subType, pTopic->sql); if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) { sdbRelease(pSdb, pTopic); continue; @@ -95,14 +98,20 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId SNode *pNode = NULL; FOREACH(pNode, pNodeList) { SColumnNode *pCol = (SColumnNode *)pNode; - if (pCol->tableId != suid && pTopic->ctbStbUid != suid) goto NEXT; + mDebug("topic:%s, check colId:%d tableId:%" PRId64 " ctbStbUid:%" PRId64, pTopic->name, pCol->colId, pCol->tableId, pTopic->ctbStbUid); + + if (pCol->tableId != suid && pTopic->ctbStbUid != suid) { + mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId); + goto NEXT; + } if (pCol->colId > 0 && pCol->colId == colId) { sdbRelease(pSdb, pTopic); nodesDestroyNode(pAst); terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC; + mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId); return -1; } - mTrace("topic:%s, colId:%d is used", pTopic->name, pCol->colId); + mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId); } NEXT: diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 4673b01d27..217d40e3aa 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -67,8 +67,6 @@ struct STSmaStat { struct SRSmaStat { SSma *pSma; int64_t commitAppliedVer; // vnode applied version for async commit - int64_t commitSubmitVer; // rsma submit version for async commit - int64_t submitVer; // latest submit version int64_t refId; // shared by fetch tasks int8_t triggerStat; // shared by fetch tasks int8_t commitStat; // 0 not in committing, 1 in committing @@ -91,7 +89,6 @@ struct SSmaStat { #define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) #define RSMA_COMMIT_STAT(r) (&(r)->commitStat) #define RSMA_REF_ID(r) ((r)->refId) -#define RSMA_SUBMIT_VER(r) ((r)->submitVer) struct SRSmaInfoItem { void *taskInfo; // qTaskInfo_t @@ -223,13 +220,6 @@ struct STFInfo { uint32_t ftype; uint32_t fver; int64_t fsize; - - // specific fields - union { - struct { - int64_t submitVer; - } qTaskInfo; - }; }; enum { diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 12dc4e94db..b7c23c8527 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -178,7 +178,6 @@ int32_t smaAsyncPostCommit(SSma* pSma); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); -int64_t tdRSmaGetMaxSubmitVer(SSma* pSma, int8_t level); int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq); int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 2174a479e7..a0e3932245 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -146,7 +146,6 @@ static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) { // step 3: perform persist task for qTaskInfo pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; - pRSmaStat->commitSubmitVer = pRSmaStat->submitVer; tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); smaDebug("vgId:%d, rsma pre commit success", SMA_VID(pSma)); @@ -317,7 +316,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { // step 4: others pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; - pRSmaStat->commitSubmitVer = pRSmaStat->submitVer; return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index e7293da60b..a6fde1e2d2 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -560,17 +560,6 @@ static void tdDestroySDataBlockArray(SArray *pArray) { taosArrayDestroy(pArray); } -int64_t tdRSmaGetMaxSubmitVer(SSma *pSma, int8_t level) { - if (level == TSDB_RETENTION_L0) { - return pSma->pVnode->state.applied; - } - - SSmaEnv *pRSmaEnv = SMA_RSMA_ENV(pSma); - SRSmaStat *pRSmaStat = (SRSmaStat *)(SMA_ENV_STAT(pRSmaEnv)); - - return atomic_load_64(&pRSmaStat->submitVer); -} - static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat, int8_t blkType) { SArray *pResult = NULL; @@ -615,13 +604,16 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche goto _err; } - if (pReq && tdProcessSubmitReq(sinkTsdb, atomic_add_fetch_64(&pStat->submitVer, 1), pReq) < 0) { + if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { taosMemoryFreeClear(pReq); smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid, pItem->level, terrstr()); goto _err; } + smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " version:%"PRIi64, SMA_VID(pSma), + suid, pItem->level, output->info.version); + taosMemoryFreeClear(pReq); taosArrayClear(pResult); } else if (terrno == 0) { @@ -908,12 +900,8 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) { goto _err; } - ASSERT(tFileInfo.qTaskInfo.submitVer > 0); - SSmaEnv *pRSmaEnv = pSma->pRSmaEnv; SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pRSmaEnv); - atomic_store_64(&pRSmaStat->submitVer, tFileInfo.qTaskInfo.submitVer); - smaDebug("%s:%d tFileInfo.qTaskInfo.submitVer = %" PRIi64, __func__, __LINE__, tFileInfo.qTaskInfo.submitVer); SRSmaQTaskInfoIter fIter = {0}; if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) { @@ -1266,7 +1254,6 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { } if (isFileCreated) { - tFile.info.qTaskInfo.submitVer = atomic_load_64(&pRSmaStat->commitSubmitVer); if (tdUpdateTFileHeader(&tFile) < 0) { smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile), tstrerror(terrno)); @@ -1346,6 +1333,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK); tdUnRefRSmaInfo(pSma, pRSmaInfo); + // atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); + // taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId); } break; case TASK_TRIGGER_STAT_PAUSED: { smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused", diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index 13befabed5..9dce166afa 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -32,9 +32,6 @@ static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo) { tlen += taosEncodeFixedU32(buf, pInfo->ftype); tlen += taosEncodeFixedU32(buf, pInfo->fver); tlen += taosEncodeFixedI64(buf, pInfo->fsize); - if (pInfo->ftype == TD_FTYPE_RSMA_QTASKINFO) { - tlen += taosEncodeFixedI64(buf, pInfo->qTaskInfo.submitVer); - } return tlen; } @@ -44,10 +41,6 @@ static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) { buf = taosDecodeFixedU32(buf, &(pInfo->ftype)); buf = taosDecodeFixedU32(buf, &(pInfo->fver)); buf = taosDecodeFixedI64(buf, &(pInfo->fsize)); - // specific - if (pInfo->ftype == TD_FTYPE_RSMA_QTASKINFO) { - buf = taosDecodeFixedI64(buf, &(pInfo->qTaskInfo.submitVer)); - } return buf; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 26ced6cf6a..cec714e0ee 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2265,10 +2265,6 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) { int64_t startVer = (pCond->startVersion == -1) ? 0 : pCond->startVersion; - if (VND_IS_RSMA(pVnode)) { - return (SVersionRange){.minVer = startVer, .maxVer = tdRSmaGetMaxSubmitVer(pVnode->pSma, level)}; - } - int64_t endVer = 0; if (pCond->endVersion == -1) { // user not specified end version, set current maximum version of vnode as the endVersion diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 66b3093a8d..f1f7b3cfbc 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3880,11 +3880,11 @@ int32_t elapsedFunction(SqlFunctionCtx* pCtx) { if (pCtx->end.key != INT64_MIN) { pInfo->min = pCtx->end.key; } else { - pInfo->min = ptsList[0]; + pInfo->min = ptsList[start]; } } else { if (pCtx->start.key == INT64_MIN) { - pInfo->min = (pInfo->min > ptsList[0]) ? ptsList[0] : pInfo->min; + pInfo->min = (pInfo->min > ptsList[start]) ? ptsList[start] : pInfo->min; } else { pInfo->min = pCtx->start.key; } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index de9f16f171..6a30f32f2b 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -787,6 +787,14 @@ static int32_t createFillLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect return code; } +static bool isPrimaryKeySort(SNodeList* pOrderByList) { + SNode* pExpr = ((SOrderByExprNode*)nodesListGetNode(pOrderByList, 0))->pExpr; + if (QUERY_NODE_COLUMN != nodeType(pExpr)) { + return false; + } + return PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pExpr)->colId; +} + static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) { if (NULL == pSelect->pOrderByList) { return TSDB_CODE_SUCCESS; @@ -800,7 +808,9 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect pSort->groupSort = pSelect->groupSort; pSort->node.groupAction = pSort->groupSort ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR; pSort->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; - pSort->node.resultDataOrder = pSort->groupSort ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_GLOBAL; + pSort->node.resultDataOrder = isPrimaryKeySort(pSelect->pOrderByList) + ? (pSort->groupSort ? DATA_ORDER_LEVEL_IN_GROUP : DATA_ORDER_LEVEL_GLOBAL) + : DATA_ORDER_LEVEL_NONE; int32_t code = nodesCollectColumns(pSelect, SQL_CLAUSE_ORDER_BY, NULL, COLLECT_COL_TYPE_ALL, &pSort->node.pTargets); if (TSDB_CODE_SUCCESS == code && NULL == pSort->node.pTargets) { diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 9e844dd6a2..8586234b7e 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -215,6 +215,14 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) { return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild)); } +static bool stbSplIsMultiTbScanChild(bool streamQuery, SLogicNode* pNode) { + if (1 != LIST_LENGTH(pNode->pChildren)) { + return false; + } + SNode* pChild = nodesListGetNode(pNode->pChildren, 0); + return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild)); +} + static bool stbSplNeedSplitWindow(bool streamQuery, SLogicNode* pNode) { SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode; if (WINDOW_TYPE_INTERVAL == pWindow->winType) { @@ -247,7 +255,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { case QUERY_NODE_LOGIC_PLAN_JOIN: return !(((SJoinLogicNode*)pNode)->isSingleTableJoin); case QUERY_NODE_LOGIC_PLAN_PARTITION: - return stbSplHasMultiTbScan(streamQuery, pNode); + return stbSplIsMultiTbScanChild(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_AGG: return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); case QUERY_NODE_LOGIC_PLAN_WINDOW: @@ -969,8 +977,28 @@ static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) return code; } +static int32_t stbSplCreateMergeKeysForPartitionNode(SLogicNode* pPart, SNodeList** pMergeKeys) { + SNode* pPrimaryKey = + nodesCloneNode(stbSplFindPrimaryKeyFromScan((SScanLogicNode*)nodesListGetNode(pPart->pChildren, 0))); + if (NULL == pPrimaryKey) { + return TSDB_CODE_OUT_OF_MEMORY; + } + int32_t code = nodesListAppend(pPart->pTargets, pPrimaryKey); + if (TSDB_CODE_SUCCESS == code) { + code = stbSplCreateMergeKeysByPrimaryKey(pPrimaryKey, pMergeKeys); + } + return code; +} + static int32_t stbSplSplitPartitionNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { - int32_t code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, NULL, pInfo->pSplitNode, true); + int32_t code = TSDB_CODE_SUCCESS; + SNodeList* pMergeKeys = NULL; + if (pInfo->pSplitNode->requireDataOrder >= DATA_ORDER_LEVEL_IN_GROUP) { + code = stbSplCreateMergeKeysForPartitionNode(pInfo->pSplitNode, &pMergeKeys); + } + if (TSDB_CODE_SUCCESS == code) { + code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pInfo->pSplitNode, pMergeKeys, pInfo->pSplitNode, true); + } if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, pInfo->pSplitNode, SPLIT_FLAG_STABLE_SPLIT)); diff --git a/source/libs/planner/test/planPartByTest.cpp b/source/libs/planner/test/planPartByTest.cpp index 256960a15e..48ab1e1ac2 100644 --- a/source/libs/planner/test/planPartByTest.cpp +++ b/source/libs/planner/test/planPartByTest.cpp @@ -67,6 +67,8 @@ TEST_F(PlanPartitionByTest, withTimeLineFunc) { useDb("root", "test"); run("SELECT TWA(c1) FROM st1 PARTITION BY c1"); + + run("SELECT MAVG(c1, 2) FROM st1 PARTITION BY c1"); } TEST_F(PlanPartitionByTest, withSlimit) { diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 41ca72dc7b..29773b95c3 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -3671,6 +3671,22 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) { SNode *t = node->pLeft; node->pLeft = node->pRight; node->pRight = t; + switch (node->opType) { + case OP_TYPE_GREATER_THAN: + node->opType = OP_TYPE_LOWER_THAN; + break; + case OP_TYPE_LOWER_THAN: + node->opType = OP_TYPE_GREATER_THAN; + break; + case OP_TYPE_GREATER_EQUAL: + node->opType = OP_TYPE_LOWER_EQUAL; + break; + case OP_TYPE_LOWER_EQUAL: + node->opType = OP_TYPE_GREATER_EQUAL; + break; + default: + break; + } } if (OP_TYPE_IN == node->opType && QUERY_NODE_NODE_LIST != nodeType(node->pRight)) { diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 3fa6344009..9fd4e483c2 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -396,6 +396,7 @@ typedef struct SDelayQueue { int transDQCreate(uv_loop_t* loop, SDelayQueue** queue); void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)); SDelayTask* transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); +void transDQCancel(SDelayQueue* queue, SDelayTask* task); bool transEpSetIsEqual(SEpSet* a, SEpSet* b); /* diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 9de8c273d9..638067e7ef 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -27,7 +27,6 @@ typedef struct SCliConn { SConnBuffer readBuf; STransQueue cliMsgs; queue q; - uint64_t expireTime; STransCtx ctx; bool broken; // link broken or not @@ -37,6 +36,7 @@ typedef struct SCliConn { char* ip; uint32_t port; + SDelayTask* task; // debug and log info struct sockaddr_in addr; struct sockaddr_in localAddr; @@ -65,6 +65,7 @@ typedef struct SCliThrd { queue msg; TdThreadMutex msgMtx; SDelayQueue* delayQueue; + SDelayQueue* timeoutQueue; uint64_t nextTimeout; // next timeout void* pTransInst; // @@ -92,9 +93,10 @@ static void* createConnPool(int size); static void* destroyConnPool(void* pool); static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port); static void addConnToPool(void* pool, SCliConn* conn); +static void doCloseIdleConn(void* param); // register timer in each thread to clear expire conn -static void cliTimeoutCb(uv_timer_t* handle); +// static void cliTimeoutCb(uv_timer_t* handle); // alloc buf for recv static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); // callback after read nbytes from socket @@ -184,7 +186,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { pThrd = (SCliThrd*)(exh)->pThrd; \ } \ } while (0) -#define CONN_PERSIST_TIME(para) (para * 1000 * 10) +#define CONN_PERSIST_TIME(para) ((para) == 0 ? 3 * 1000 : (para)) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) #define CONN_SHOULD_RELEASE(conn, head) \ @@ -384,10 +386,6 @@ void cliHandleResp(SCliConn* conn) { } uv_read_start((uv_stream_t*)conn->stream, cliAllocRecvBufferCb, cliRecvCb); - // start thread's timer of conn pool if not active - if (!uv_is_active((uv_handle_t*)&pThrd->timer) && pTransInst->idleTime > 0) { - // uv_timer_start((uv_timer_t*)&pThrd->timer, cliTimeoutCb, CONN_PERSIST_TIME(pRpc->idleTime) / 2, 0); - } } void cliHandleExcept(SCliConn* pConn) { @@ -441,30 +439,30 @@ void cliHandleExcept(SCliConn* pConn) { transUnrefCliHandle(pConn); } -void cliTimeoutCb(uv_timer_t* handle) { - SCliThrd* pThrd = handle->data; - STrans* pTransInst = pThrd->pTransInst; - int64_t currentTime = pThrd->nextTimeout; - tTrace("%s conn timeout, try to remove expire conn from conn pool", pTransInst->label); - - SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); - while (p != NULL) { - while (!QUEUE_IS_EMPTY(&p->conn)) { - queue* h = QUEUE_HEAD(&p->conn); - SCliConn* c = QUEUE_DATA(h, SCliConn, q); - if (c->expireTime < currentTime) { - QUEUE_REMOVE(h); - transUnrefCliHandle(c); - } else { - break; - } - } - p = taosHashIterate((SHashObj*)pThrd->pool, p); - } - - pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); - uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pTransInst->idleTime) / 2, 0); -} +// void cliTimeoutCb(uv_timer_t* handle) { +// SCliThrd* pThrd = handle->data; +// STrans* pTransInst = pThrd->pTransInst; +// int64_t currentTime = pThrd->nextTimeout; +// tTrace("%s conn timeout, try to remove expire conn from conn pool", pTransInst->label); +// +// SConnList* p = taosHashIterate((SHashObj*)pThrd->pool, NULL); +// while (p != NULL) { +// while (!QUEUE_IS_EMPTY(&p->conn)) { +// queue* h = QUEUE_HEAD(&p->conn); +// SCliConn* c = QUEUE_DATA(h, SCliConn, q); +// if (c->expireTime < currentTime) { +// QUEUE_REMOVE(h); +// transUnrefCliHandle(c); +// } else { +// break; +// } +// } +// p = taosHashIterate((SHashObj*)pThrd->pool, p); +// } +// +// pThrd->nextTimeout = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); +// uv_timer_start(handle, cliTimeoutCb, CONN_PERSIST_TIME(pTransInst->idleTime) / 2, 0); +// } void* createConnPool(int size) { // thread local, no lock @@ -506,6 +504,10 @@ static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { QUEUE_REMOVE(&conn->q); QUEUE_INIT(&conn->q); assert(h == &conn->q); + + transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); + conn->task = NULL; + return conn; } static int32_t allocConnRef(SCliConn* conn, bool update) { @@ -537,6 +539,7 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { transReleaseExHandle(transGetRefMgt(), handle); return 0; } + static void addConnToPool(void* pool, SCliConn* conn) { if (conn->status == ConnInPool) { return; @@ -547,7 +550,6 @@ static void addConnToPool(void* pool, SCliConn* conn) { allocConnRef(conn, true); STrans* pTransInst = thrd->pTransInst; - conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); cliReleaseUnfinishedMsg(conn); transQueueClear(&conn->cliMsgs); transCtxCleanup(&conn->ctx); @@ -562,7 +564,13 @@ static void addConnToPool(void* pool, SCliConn* conn) { assert(plist != NULL); QUEUE_INIT(&conn->q); QUEUE_PUSH(&plist->conn, &conn->q); + assert(!QUEUE_IS_EMPTY(&plist->conn)); + + STaskArg* arg = taosMemoryCalloc(1, sizeof(STaskArg)); + arg->param1 = conn; + arg->param2 = thrd; + conn->task = transDQSched(thrd->timeoutQueue, doCloseIdleConn, arg, CONN_PERSIST_TIME(pTransInst->idleTime)); } static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SCliConn* conn = handle->data; @@ -631,6 +639,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { transRemoveExHandle(transGetRefMgt(), conn->refId); conn->refId = -1; + if (conn->task != NULL) transDQCancel(((SCliThrd*)conn->hostThrd)->timeoutQueue, conn->task); + if (clear) { if (!uv_is_closing((uv_handle_t*)conn->stream)) { uv_read_stop(conn->stream); @@ -997,6 +1007,8 @@ static SCliThrd* createThrdObj() { pThrd->pool = createConnPool(4); transDQCreate(pThrd->loop, &pThrd->delayQueue); + transDQCreate(pThrd->loop, &pThrd->timeoutQueue); + pThrd->quit = false; return pThrd; } @@ -1012,6 +1024,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { transAsyncPoolDestroy(pThrd->asyncPool); transDQDestroy(pThrd->delayQueue, destroyCmsg); + transDQDestroy(pThrd->timeoutQueue, NULL); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); } @@ -1058,6 +1071,10 @@ static void doCloseIdleConn(void* param) { STaskArg* arg = param; SCliConn* conn = arg->param1; SCliThrd* pThrd = arg->param2; + tTrace("%s conn %p idle, close it", CONN_GET_INST_LABEL(conn), conn); + conn->task = NULL; + cliDestroyConn(conn, true); + taosMemoryFree(arg); } static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { @@ -1248,11 +1265,17 @@ int transReleaseCliHandle(void* handle) { if (pThrd == NULL) { return -1; } + STransMsg tmsg = {.info.handle = handle}; - SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg)); + TRACE_SET_MSGID(&tmsg.info.traceId, tGenIdPI64()); + + SCliMsg* cmsg = taosMemoryCalloc(1, sizeof(SCliMsg)); cmsg->msg = tmsg; cmsg->type = Release; + STraceId* trace = &tmsg.info.traceId; + tGDebug("send release request at thread:%08" PRId64 "", pThrd->pid); + if (0 != transAsyncSend(pThrd->asyncPool, &cmsg->q)) { return -1; } diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 34849df2b2..6ac75a75e1 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -480,7 +480,7 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) { SDelayTask* task = container_of(minNode, SDelayTask, node); STaskArg* arg = task->arg; - freeFunc(arg->param1); + if (freeFunc) freeFunc(arg->param1); taosMemoryFree(arg); taosMemoryFree(task); @@ -491,9 +491,16 @@ void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) { void transDQCancel(SDelayQueue* queue, SDelayTask* task) { uv_timer_stop(queue->timer); - if (heapSize(queue->heap) <= 0) return; + if (heapSize(queue->heap) <= 0) { + taosMemoryFree(task->arg); + taosMemoryFree(task); + return; + } heapRemove(queue->heap, &task->node); + taosMemoryFree(task->arg); + taosMemoryFree(task); + if (heapSize(queue->heap) != 0) { HeapNode* minNode = heapMin(queue->heap); if (minNode != NULL) return; diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 3fb947bdba..9b89847477 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -149,32 +149,34 @@ static void* transAcceptThread(void* arg); static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName); static bool addHandleToAcceptloop(void* arg); -#define CONN_SHOULD_RELEASE(conn, head) \ - do { \ - if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ - conn->status = ConnRelease; \ - transClearBuffer(&conn->readBuf); \ - transFreeMsg(transContFromHead((char*)head)); \ - tTrace("conn %p received release request", conn); \ - \ - STransMsg tmsg = {.code = 0, .info.handle = (void*)conn, .info.ahandle = NULL}; \ - SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \ - srvMsg->msg = tmsg; \ - srvMsg->type = Release; \ - srvMsg->pConn = conn; \ - reallocConnRef(conn); \ - if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ - return; \ - } \ - if (conn->regArg.init) { \ - tTrace("conn %p release, notify server app", conn); \ - STrans* pTransInst = conn->pTransInst; \ - (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \ - memset(&conn->regArg, 0, sizeof(conn->regArg)); \ - } \ - uvStartSendRespInternal(srvMsg); \ - return; \ - } \ +#define CONN_SHOULD_RELEASE(conn, head) \ + do { \ + if ((head)->release == 1 && (head->msgLen) == sizeof(*head)) { \ + reallocConnRef(conn); \ + tTrace("conn %p received release request", conn); \ + \ + STraceId traceId = head->traceId; \ + conn->status = ConnRelease; \ + transClearBuffer(&conn->readBuf); \ + transFreeMsg(transContFromHead((char*)head)); \ + \ + STransMsg tmsg = {.code = 0, .info.handle = (void*)conn, .info.traceId = traceId, .info.ahandle = NULL}; \ + SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \ + srvMsg->msg = tmsg; \ + srvMsg->type = Release; \ + srvMsg->pConn = conn; \ + if (!transQueuePush(&conn->srvMsgs, srvMsg)) { \ + return; \ + } \ + if (conn->regArg.init) { \ + tTrace("conn %p release, notify server app", conn); \ + STrans* pTransInst = conn->pTransInst; \ + (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); \ + memset(&conn->regArg, 0, sizeof(conn->regArg)); \ + } \ + uvStartSendRespInternal(srvMsg); \ + return; \ + } \ } while (0) #define SRV_RELEASE_UV(loop) \ diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 079a532131..543b1e2b34 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -62,7 +62,7 @@ python3 ./test.py -f 2-query/char_length.py -R python3 ./test.py -f 2-query/check_tsdb.py python3 ./test.py -f 2-query/check_tsdb.py -R -# python3 ./test.py -f 1-insert/update_data.py +python3 ./test.py -f 1-insert/update_data.py python3 ./test.py -f 1-insert/delete_data.py python3 ./test.py -f 2-query/db.py @@ -222,7 +222,7 @@ python3 ./test.py -f 7-tmq/tmqDropStbCtb.py python3 ./test.py -f 7-tmq/tmqDropNtb.py python3 ./test.py -f 7-tmq/tmqUdf.py # python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot0.py -# python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py +python3 ./test.py -f 7-tmq/tmqUdf-multCtb-snapshot1.py python3 ./test.py -f 7-tmq/stbTagFilter-1ctb.py # python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py diff --git a/tools/shell/inc/shellInt.h b/tools/shell/inc/shellInt.h index 825866e163..358377f804 100644 --- a/tools/shell/inc/shellInt.h +++ b/tools/shell/inc/shellInt.h @@ -26,6 +26,10 @@ #include "ttypes.h" #include "tutil.h" +#ifdef WEBSOCKET +#include "taosws.h" +#endif + #define SHELL_MAX_HISTORY_SIZE 1000 #define SHELL_MAX_COMMAND_SIZE 1048586 #define SHELL_HISTORY_FILE ".taos_history" @@ -67,6 +71,12 @@ typedef struct { int32_t pktNum; int32_t displayWidth; int32_t abort; +#ifdef WEBSOCKET + bool restful; + bool cloud; + char* dsn; + int32_t timeout; +#endif } SShellArgs; typedef struct { @@ -85,6 +95,10 @@ typedef struct { TAOS* conn; TdThread pid; tsem_t cancelSem; +#ifdef WEBSOCKET + WS_TAOS* ws_conn; + bool stop_query; +#endif } SShellObj; // shellArguments.c @@ -95,7 +109,10 @@ int32_t shellReadCommand(char* command); // shellEngine.c int32_t shellExecute(); - +int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision); +void shellPrintHeader(TAOS_FIELD *fields, int32_t *width, int32_t num_fields); +void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t length, int32_t precision); +void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, int32_t length, int32_t precision); // shellUtil.c int32_t shellCheckIntSize(); void shellPrintVersion(); @@ -109,6 +126,14 @@ void shellExit(); // shellNettest.c void shellTestNetWork(); +#ifdef WEBSOCKET +void shellCheckConnectMode(); +// shellWebsocket.c +int shell_conn_ws_server(bool first); +int32_t shell_run_websocket(); +void shellRunSingleCommandWebsocketImp(char *command); +#endif + // shellMain.c extern SShellObj shell; diff --git a/tools/shell/src/shellArguments.c b/tools/shell/src/shellArguments.c index 466aa52390..88ef46c5d6 100644 --- a/tools/shell/src/shellArguments.c +++ b/tools/shell/src/shellArguments.c @@ -43,6 +43,12 @@ #define SHELL_VERSION "Print program version." #define SHELL_EMAIL "" +#ifdef WEBSOCKET +#define SHELL_DSN "The dsn to use when connecting to cloud server." +#define SHELL_REST "Use restful mode when connecting." +#define SHELL_TIMEOUT "Set the timeout for websocket query in seconds, default is 10." +#endif + static int32_t shellParseSingleOpt(int32_t key, char *arg); void shellPrintHelp() { @@ -65,6 +71,11 @@ void shellPrintHelp() { printf("%s%s%s%s\r\n", indent, "-s,", indent, SHELL_CMD); printf("%s%s%s%s\r\n", indent, "-t,", indent, SHELL_STARTUP); printf("%s%s%s%s\r\n", indent, "-u,", indent, SHELL_USER); +#ifdef WEBSOCKET + printf("%s%s%s%s\r\n", indent, "-E,", indent, SHELL_DSN); + printf("%s%s%s%s\r\n", indent, "-R,", indent, SHELL_REST); + printf("%s%s%s%s\r\n", indent, "-T,", indent, SHELL_TIMEOUT); +#endif printf("%s%s%s%s\r\n", indent, "-w,", indent, SHELL_WIDTH); printf("%s%s%s%s\r\n", indent, "-V,", indent, SHELL_VERSION); printf("\r\n\r\nReport bugs to %s.\r\n", SHELL_EMAIL); @@ -95,6 +106,11 @@ static struct argp_option shellOptions[] = { {"display-width", 'w', "WIDTH", 0, SHELL_WIDTH}, {"netrole", 'n', "NETROLE", 0, SHELL_NET_ROLE}, {"pktlen", 'l', "PKTLEN", 0, SHELL_PKG_LEN}, +#ifdef WEBSOCKET + {"dsn", 'E', "DSN", 0, SHELL_DSN}, + {"restful", 'R', 0, 0, SHELL_REST}, + {"timeout", 'T', "SECONDS", 0, SHELL_TIMEOUT}, +#endif {"pktnum", 'N', "PKTNUM", 0, SHELL_PKT_NUM}, {0}, }; @@ -120,9 +136,15 @@ static int32_t shellParseSingleOpt(int32_t key, char *arg) { switch (key) { case 'h': pArgs->host = arg; +#ifdef WEBSOCKET + pArgs->cloud = false; +#endif break; case 'P': pArgs->port = atoi(arg); +#ifdef WEBSOCKET + pArgs->cloud = false; +#endif if (pArgs->port == 0) pArgs->port = -1; break; case 'u': @@ -137,6 +159,9 @@ static int32_t shellParseSingleOpt(int32_t key, char *arg) { pArgs->is_gen_auth = true; break; case 'c': +#ifdef WEBSOCKET + pArgs->cloud = false; +#endif pArgs->cfgdir = arg; break; case 'C': @@ -172,6 +197,18 @@ static int32_t shellParseSingleOpt(int32_t key, char *arg) { case 'N': pArgs->pktNum = atoi(arg); break; +#ifdef WEBSOCKET + case 'R': + pArgs->restful = true; + break; + case 'E': + pArgs->dsn = arg; + pArgs->cloud = true; + break; + case 'T': + pArgs->timeout = atoi(arg); + break; +#endif case 'V': pArgs->is_version = true; break; @@ -208,7 +245,11 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) { } if (key[1] == 'h' || key[1] == 'P' || key[1] == 'u' || key[1] == 'a' || key[1] == 'c' || key[1] == 's' || - key[1] == 'f' || key[1] == 'd' || key[1] == 'w' || key[1] == 'n' || key[1] == 'l' || key[1] == 'N') { + key[1] == 'f' || key[1] == 'd' || key[1] == 'w' || key[1] == 'n' || key[1] == 'l' || key[1] == 'N' +#ifdef WEBSOCKET + || key[1] == 'E' || key[1] == 'T' +#endif + ) { if (i + 1 >= argc) { fprintf(stderr, "option %s requires an argument\r\n", key); return -1; @@ -221,7 +262,11 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) { shellParseSingleOpt(key[1], val); i++; } else if (key[1] == 'p' || key[1] == 'A' || key[1] == 'C' || key[1] == 'r' || key[1] == 'k' || - key[1] == 't' || key[1] == 'V' || key[1] == '?' || key[1] == 1) { + key[1] == 't' || key[1] == 'V' || key[1] == '?' || key[1] == 1 +#ifdef WEBSOCKET + ||key[1] == 'R' +#endif + ) { shellParseSingleOpt(key[1], NULL); } else { fprintf(stderr, "invalid option %s\r\n", key); diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 56bc1ed6cc..a2310ea9c9 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -25,14 +25,9 @@ static int32_t shellRunSingleCommand(char *command); static int32_t shellRunCommand(char *command); static void shellRunSingleCommandImp(char *command); static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision); -static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, int32_t length, - int32_t precision); static int32_t shellDumpResultToFile(const char *fname, TAOS_RES *tres); static void shellPrintNChar(const char *str, int32_t length, int32_t width); -static void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t length, int32_t precision); static int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql); -static int32_t shellCalcColWidth(TAOS_FIELD *field, int32_t precision); -static void shellPrintHeader(TAOS_FIELD *fields, int32_t *width, int32_t num_fields); static int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql); static int32_t shellDumpResult(TAOS_RES *tres, char *fname, int32_t *error_no, bool vertical, const char *sql); static void shellReadHistory(); @@ -94,8 +89,15 @@ int32_t shellRunSingleCommand(char *command) { shellSourceFile(c_ptr); return 0; } - - shellRunSingleCommandImp(command); +#ifdef WEBSOCKET + if (shell.args.restful || shell.args.cloud) { + shellRunSingleCommandWebsocketImp(command); + } else { +#endif + shellRunSingleCommandImp(command); +#ifdef WEBSOCKET + } +#endif return 0; } @@ -937,7 +939,16 @@ void *shellCancelHandler(void *arg) { taosMsleep(10); continue; } - taos_kill_query(shell.conn); + +#ifdef WEBSOCKET + if (shell.args.restful || shell.args.cloud) { + shell.stop_query = true; + } else { +#endif + taos_kill_query(shell.conn); +#ifdef WEBSOCKET + } +#endif #ifdef WINDOWS printf("\n%s", shell.info.promptHeader); #endif @@ -981,16 +992,26 @@ int32_t shellExecute() { fflush(stdout); SShellArgs *pArgs = &shell.args; - if (shell.args.auth == NULL) { - shell.conn = taos_connect(pArgs->host, pArgs->user, pArgs->password, pArgs->database, pArgs->port); +#ifdef WEBSOCKET + if (shell.args.restful || shell.args.cloud) { + if (shell_conn_ws_server(1)) { + return -1; + } } else { - shell.conn = taos_connect_auth(pArgs->host, pArgs->user, pArgs->auth, pArgs->database, pArgs->port); - } +#endif + if (shell.args.auth == NULL) { + shell.conn = taos_connect(pArgs->host, pArgs->user, pArgs->password, pArgs->database, pArgs->port); + } else { + shell.conn = taos_connect_auth(pArgs->host, pArgs->user, pArgs->auth, pArgs->database, pArgs->port); + } - if (shell.conn == NULL) { - fflush(stdout); - return -1; + if (shell.conn == NULL) { + fflush(stdout); + return -1; + } +#ifdef WEBSOCKET } +#endif shellReadHistory(); @@ -1005,8 +1026,16 @@ int32_t shellExecute() { if (pArgs->file[0] != 0) { shellSourceFile(pArgs->file); } +#ifdef WEBSOCKET + if (shell.args.restful || shell.args.cloud) { + ws_close(shell.ws_conn); + } else { +#endif + taos_close(shell.conn); +#ifdef WEBSOCKET + } +#endif - taos_close(shell.conn); shellWriteHistory(); shellCleanupHistory(); return 0; @@ -1026,10 +1055,15 @@ int32_t shellExecute() { taosSetSignal(SIGINT, shellQueryInterruptHandler); - shellGetGrantInfo(); - +#ifdef WEBSOCKET + if (!shell.args.restful && !shell.args.cloud) { +#endif + shellGetGrantInfo(); +#ifdef WEBSOCKET + } +#endif while (1) { - taosThreadCreate(&shell.pid, NULL, shellThreadLoop, shell.conn); + taosThreadCreate(&shell.pid, NULL, shellThreadLoop, NULL); taosThreadJoin(shell.pid, NULL); taosThreadClear(&shell.pid); } diff --git a/tools/shell/src/shellMain.c b/tools/shell/src/shellMain.c index 6672cee367..703533f8a9 100644 --- a/tools/shell/src/shellMain.c +++ b/tools/shell/src/shellMain.c @@ -19,6 +19,11 @@ SShellObj shell = {0}; int main(int argc, char *argv[]) { +#ifdef WEBSOCKET + shell.args.timeout = 10; + shell.args.cloud = true; +#endif + if (shellCheckIntSize() != 0) { return -1; } @@ -41,7 +46,9 @@ int main(int argc, char *argv[]) { shellPrintHelp(); return 0; } - +#ifdef WEBSOCKET + shellCheckConnectMode(); +#endif taos_init(); if (shell.args.is_dump_config) { diff --git a/tools/shell/src/shellUtil.c b/tools/shell/src/shellUtil.c index e96e3d3619..e5e61e0b24 100644 --- a/tools/shell/src/shellUtil.c +++ b/tools/shell/src/shellUtil.c @@ -121,6 +121,36 @@ void shellCheckServerStatus() { } } while (1); } +#ifdef WEBSOCKET +void shellCheckConnectMode() { + if (shell.args.dsn) { + shell.args.cloud = true; + shell.args.restful = false; + return; + } + if (shell.args.cloud) { + shell.args.dsn = getenv("TDENGINE_CLOUD_DSN"); + if (shell.args.dsn) { + shell.args.cloud = true; + shell.args.restful = false; + return; + } + if (shell.args.restful) { + if (!shell.args.host) { + shell.args.host = "localhost"; + } + if (!shell.args.port) { + shell.args.port = 6041; + } + shell.args.dsn = taosMemoryCalloc(1, 1024); + snprintf(shell.args.dsn, 1024, "ws://%s:%d/rest/ws", + shell.args.host, shell.args.port); + } + shell.args.cloud = false; + return; + } +} +#endif void shellExit() { if (shell.conn != NULL) { @@ -129,4 +159,4 @@ void shellExit() { } taos_cleanup(); exit(EXIT_FAILURE); -} \ No newline at end of file +} diff --git a/tools/shell/src/shellWebsocket.c b/tools/shell/src/shellWebsocket.c new file mode 100644 index 0000000000..fee2325c34 --- /dev/null +++ b/tools/shell/src/shellWebsocket.c @@ -0,0 +1,260 @@ + +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifdef WEBSOCKET +#include "taosws.h" +#include "shellInt.h" + +int shell_conn_ws_server(bool first) { + shell.ws_conn = ws_connect_with_dsn(shell.args.dsn); + if (!shell.ws_conn) { + fprintf(stderr, "failed to connect %s, reason: %s\n", + shell.args.dsn, ws_errstr(NULL)); + return -1; + } + if (first && shell.args.restful) { + fprintf(stdout, "successfully connect to %s\n\n", + shell.args.dsn); + } else if (first && shell.args.cloud) { + fprintf(stdout, "successfully connect to cloud service\n"); + } + return 0; +} + +static int horizontalPrintWebsocket(WS_RES* wres) { + const void* data = NULL; + int rows; + ws_fetch_block(wres, &data, &rows); + if (!rows) { + return 0; + } + int num_fields = ws_field_count(wres); + TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields(wres); + int precision = ws_result_precision(wres); + + int width[TSDB_MAX_COLUMNS]; + for (int col = 0; col < num_fields; col++) { + width[col] = shellCalcColWidth(fields + col, precision); + } + + shellPrintHeader(fields, width, num_fields); + + int numOfRows = 0; + do { + uint8_t ty; + uint32_t len; + for (int i = 0; i < rows; i++) { + for (int j = 0; j < num_fields; j++) { + putchar(' '); + const void *value = ws_get_value_in_block(wres, i, j, &ty, &len); + shellPrintField((const char*)value, fields+j, width[j], len, precision); + putchar(' '); + putchar('|'); + } + putchar('\r'); + putchar('\n'); + } + numOfRows += rows; + ws_fetch_block(wres, &data, &rows); + } while (rows && !shell.stop_query); + return numOfRows; +} + +static int verticalPrintWebsocket(WS_RES* wres) { + int rows = 0; + const void* data = NULL; + ws_fetch_block(wres, &data, &rows); + if (!rows) { + return 0; + } + int num_fields = ws_field_count(wres); + TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields(wres); + int precision = ws_result_precision(wres); + + int maxColNameLen = 0; + for (int col = 0; col < num_fields; col++) { + int len = (int)strlen(fields[col].name); + if (len > maxColNameLen) { + maxColNameLen = len; + } + } + int numOfRows = 0; + do { + uint8_t ty; + uint32_t len; + for (int i = 0; i < rows; i++) { + printf("*************************** %d.row ***************************\n", + numOfRows + 1); + for (int j = 0; j < num_fields; j++) { + TAOS_FIELD* field = fields + j; + int padding = (int)(maxColNameLen - strlen(field->name)); + printf("%*.s%s: ", padding, " ", field->name); + const void *value = ws_get_value_in_block(wres, i, j, &ty, &len); + shellPrintField((const char*)value, field, 0, len, precision); + putchar('\n'); + } + numOfRows++; + } + ws_fetch_block(wres, &data, &rows); + } while (rows && !shell.stop_query); + return numOfRows; +} + +static int dumpWebsocketToFile(const char* fname, WS_RES* wres) { + char fullname[PATH_MAX] = {0}; + if (taosExpandDir(fname, fullname, PATH_MAX) != 0) { + tstrncpy(fullname, fname, PATH_MAX); + } + + TdFilePtr pFile = taosOpenFile(fullname, + TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); + if (pFile == NULL) { + fprintf(stderr, "failed to open file: %s\r\n", fullname); + return -1; + } + int rows = 0; + const void* data = NULL; + ws_fetch_block(wres, &data, &rows); + if (!rows) { + taosCloseFile(&pFile); + return 0; + } + int numOfRows = 0; + TAOS_FIELD* fields = (TAOS_FIELD*)ws_fetch_fields(wres); + int num_fields = ws_field_count(wres); + int precision = ws_result_precision(wres); + for (int col = 0; col < num_fields; col++) { + if (col > 0) { + taosFprintfFile(pFile, ","); + } + taosFprintfFile(pFile, "%s", fields[col].name); + } + taosFprintfFile(pFile, "\r\n"); + do { + uint8_t ty; + uint32_t len; + numOfRows += rows; + for (int i = 0; i < rows; i++) { + for (int j = 0; j < num_fields; j++) { + if (j > 0) { + taosFprintfFile(pFile, ","); + } + const void *value = ws_get_value_in_block(wres, i, j, &ty, &len); + shellDumpFieldToFile(pFile, (const char*)value, fields + j, len, precision); + } + taosFprintfFile(pFile, "\r\n"); + } + ws_fetch_block(wres, &data, &rows); + } while (rows && !shell.stop_query); + taosCloseFile(&pFile); + return numOfRows; +} + +static int shellDumpWebsocket(WS_RES *wres, char *fname, int *error_no, bool vertical) { + int numOfRows = 0; + if (fname != NULL) { + numOfRows = dumpWebsocketToFile(fname, wres); + } else if (vertical) { + numOfRows = verticalPrintWebsocket(wres); + } else { + numOfRows = horizontalPrintWebsocket(wres); + } + *error_no = ws_errno(wres); + return numOfRows; +} + +void shellRunSingleCommandWebsocketImp(char *command) { + int64_t st, et; + char *sptr = NULL; + char *cptr = NULL; + char *fname = NULL; + bool printMode = false; + + if ((sptr = strstr(command, ">>")) != NULL) { + cptr = strstr(command, ";"); + if (cptr != NULL) { + *cptr = '\0'; + } + + fname = sptr + 2; + while (*fname == ' ') fname++; + *sptr = '\0'; + } + + if ((sptr = strstr(command, "\\G")) != NULL) { + cptr = strstr(command, ";"); + if (cptr != NULL) { + *cptr = '\0'; + } + + *sptr = '\0'; + printMode = true; // When output to a file, the switch does not work. + } + + if (!shell.ws_conn && shell_conn_ws_server(0)) { + return; + } + + shell.stop_query = false; + st = taosGetTimestampUs(); + + WS_RES* res = ws_query_timeout(shell.ws_conn, command, shell.args.timeout); + int code = ws_errno(res); + if (code != 0) { + et = taosGetTimestampUs(); + fprintf(stderr, "\nDB: error: %s (%.6fs)\n", ws_errstr(res), (et - st)/1E6); + if (code == TSDB_CODE_WS_SEND_TIMEOUT || code == TSDB_CODE_WS_RECV_TIMEOUT) { + fprintf(stderr, "Hint: use -t to increase the timeout in seconds\n"); + } else if (code == TSDB_CODE_WS_INTERNAL_ERRO || code == TSDB_CODE_WS_CLOSED) { + fprintf(stderr, "TDengine server is down, will try to reconnect\n"); + shell.ws_conn = NULL; + } + ws_free_result(res); + return; + } + + if (shellRegexMatch(command, "^\\s*use\\s+[a-zA-Z0-9_]+\\s*;\\s*$", REG_EXTENDED | REG_ICASE)) { + fprintf(stdout, "Database changed.\r\n\r\n"); + fflush(stdout); + ws_free_result(res); + return; + } + + int numOfRows = 0; + if (ws_is_update_query(res)) { + numOfRows = ws_affected_rows(res); + et = taosGetTimestampUs(); + printf("Query Ok, %d of %d row(s) in database (%.6fs)\n", numOfRows, numOfRows, + (et - st)/1E6); + } else { + int error_no = 0; + numOfRows = shellDumpWebsocket(res, fname, &error_no, printMode); + if (numOfRows < 0) { + ws_free_result(res); + return; + } + et = taosGetTimestampUs(); + if (error_no == 0 && !shell.stop_query) { + printf("Query OK, %d row(s) in set (%.6fs)\n", numOfRows, + (et - st)/1E6); + } else { + printf("Query interrupted, %d row(s) in set (%.6fs)\n", numOfRows, + (et - st)/1E6); + } + } + printf("\n"); + ws_free_result(res); +} +#endif