diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 21d8405f58..f1cfa58f62 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -96,6 +96,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); int32_t mndProcessRpcMsg(SRpcMsg *pMsg); int32_t mndProcessSyncMsg(SRpcMsg *pMsg); int32_t mndPreProcessMsg(SRpcMsg *pMsg); +void mndAbortPreprocessMsg(SRpcMsg *pMsg); /** * @brief Generate machine code diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 38a179869f..b07e8f39d5 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -106,6 +106,9 @@ typedef struct SInterpFuncLogicNode { SNodeList* pFuncs; STimeWindow timeRange; int64_t interval; + EFillMode fillMode; + SNode* pFillValues; // SNodeListNode + SNode* pTimeSeries; // SColumnNode } SInterpFuncLogicNode; typedef enum EModifyTableType { MODIFY_TABLE_TYPE_INSERT = 1, MODIFY_TABLE_TYPE_DELETE } EModifyTableType; @@ -309,6 +312,9 @@ typedef struct SInterpFuncPhysiNode { SNodeList* pFuncs; STimeWindow timeRange; int64_t interval; + EFillMode fillMode; + SNode* pFillValues; // SNodeListNode + SNode* pTimeSeries; // SColumnNode } SInterpFuncPhysiNode; typedef struct SJoinPhysiNode { diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index f3f147955a..3b8a37f420 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -64,6 +64,8 @@ typedef struct { int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb); +int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg); + int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index e8c4faa240..dd9fd384ce 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -107,7 +107,9 @@ typedef struct SSyncFSM { void (*FpReConfigCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta cbMeta); void (*FpLeaderTransferCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); - int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); + + int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void *pReaderParam, void** ppReader); + int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void** ppReader); int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader); @@ -193,8 +195,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak); bool syncEnvIsStart(); const char* syncStr(ESyncState state); bool syncIsRestoreFinish(int64_t rid); -int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); -int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta); + int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index fde016375b..9fb9bcebf7 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -564,6 +564,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x265A) #define TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x265B) #define TSDB_CODE_PAR_INVALID_TABLE_OPTION TAOS_DEF_ERROR_CODE(0, 0x265C) +#define TSDB_CODE_PAR_INVALID_INTERP_CLAUSE TAOS_DEF_ERROR_CODE(0, 0x265D) //planner #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 5ea55d558f..95b721b4dd 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -550,6 +550,8 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { pMsg->msgType != TDMT_MND_TRANS_TIMER) { mError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); + mndAbortPreprocessMsg(pMsg); + SEpSet epSet = {0}; mndGetMnodeEpSet(pMsg->info.node, &epSet); diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 5374f48e47..f32a3129de 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -25,6 +25,13 @@ int32_t mndPreProcessMsg(SRpcMsg *pMsg) { return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg); } +void mndAbortPreprocessMsg(SRpcMsg *pMsg) { + if (TDMT_VND_QUERY != pMsg->msgType) return; + + SMnode *pMnode = pMsg->info.node; + qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg); +} + int32_t mndProcessQueryMsg(SRpcMsg *pMsg) { int32_t code = -1; SMnode *pMnode = pMsg->info.node; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 3e3850de1a..3a023bcece 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -73,7 +73,17 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM } } -int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { +int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) { + // TODO: + + // atomic operation + // step1. sdbGetCommitInfo + // step2. create ppReader with pReaderParam + + return 0; +} + +int32_t mndSyncGetSnapshotInfo(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { SMnode *pMnode = pFsm->data; sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex); return 0; @@ -159,6 +169,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { pFsm->FpRestoreFinishCb = mndRestoreFinish; pFsm->FpReConfigCb = mndReConfig; pFsm->FpGetSnapshot = mndSyncGetSnapshot; + pFsm->FpGetSnapshotInfo = mndSyncGetSnapshotInfo; pFsm->FpSnapshotStartRead = mndSnapshotStartRead; pFsm->FpSnapshotStopRead = mndSnapshotStopRead; pFsm->FpSnapshotDoRead = mndSnapshotDoRead; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index e83b992f06..70d89102e6 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -116,7 +116,7 @@ typedef void *tsdbReaderT; #define BLOCK_LOAD_TABLE_SEQ_ORDER 2 #define BLOCK_LOAD_TABLE_RR_ORDER 3 -tsdbReaderT *tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId, +tsdbReaderT tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *tableInfoGroup, uint64_t qId, uint64_t taskId); tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STableListInfo *groupList, uint64_t qId, void *pMemRef); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index d399b5e3bf..70b6e24b07 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -121,7 +121,7 @@ int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSu int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkRsp* pRsp); int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); -tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, +tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, uint64_t taskId); tsdbReaderT tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, void* pMemRef); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index e7a40eeeb9..ce73246e51 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -500,7 +500,7 @@ static int32_t setCurrentSchema(SVnode* pVnode, STsdbReadHandle* pTsdbReadHandle return TSDB_CODE_SUCCESS; } -tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, +tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* tableList, uint64_t qId, uint64_t taskId) { STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId); if (pTsdbReadHandle == NULL) { @@ -508,7 +508,7 @@ tsdbReaderT* tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, STableLi } if (emptyQueryTimewindow(pTsdbReadHandle)) { - return (tsdbReaderT*)pTsdbReadHandle; + return (tsdbReaderT)pTsdbReadHandle; } // todo apply the lastkey of table check to avoid to load header file diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index ae61aa454e..297a486ee9 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -353,8 +353,8 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c SyncIndex beginIndex = SYNC_INDEX_INVALID; char logBuf[256] = {0}; - if (pFsm->FpGetSnapshot != NULL) { - (*pFsm->FpGetSnapshot)(pFsm, &snapshot); + if (pFsm->FpGetSnapshotInfo != NULL) { + (*pFsm->FpGetSnapshotInfo)(pFsm, &snapshot); beginIndex = snapshot.lastApplyIndex; } @@ -416,7 +416,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) { pFsm->FpCommitCb = vnodeSyncCommitMsg; pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg; pFsm->FpRollBackCb = vnodeSyncRollBackMsg; - pFsm->FpGetSnapshot = vnodeSyncGetSnapshot; + pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot; pFsm->FpRestoreFinishCb = NULL; pFsm->FpReConfigCb = vnodeSyncReconfig; pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead; diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 00643be0e1..ae0f669d65 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -412,6 +412,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); } + EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pTblScanNode->scan.pScanCols->length); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize); @@ -426,27 +427,57 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_NEW(level + 1, "I/O: "); int32_t nodeNum = taosArrayGetSize(pResNode->pExecInfo); - for (int32_t i = 0; i < nodeNum; ++i) { + struct STableScanAnalyzeInfo info = {0}; + + int32_t maxIndex = 0; + int32_t totalRows = 0; + for(int32_t i = 0; i < nodeNum; ++i) { SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, i); STableScanAnalyzeInfo *pScanInfo = (STableScanAnalyzeInfo *)execInfo->verboseInfo; - EXPLAIN_ROW_APPEND("total_blocks=%d", pScanInfo->totalBlocks); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + info.totalBlocks += pScanInfo->totalBlocks; + info.loadBlocks += pScanInfo->loadBlocks; + info.totalRows += pScanInfo->totalRows; + info.skipBlocks += pScanInfo->skipBlocks; + info.filterTime += pScanInfo->filterTime; + info.loadBlockStatis += pScanInfo->loadBlockStatis; + info.totalCheckedRows += pScanInfo->totalCheckedRows; + info.filterOutBlocks += pScanInfo->filterOutBlocks; - EXPLAIN_ROW_APPEND("load_blocks=%d", pScanInfo->loadBlocks); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); - - EXPLAIN_ROW_APPEND("load_block_SMAs=%d", pScanInfo->loadBlockStatis); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); - - EXPLAIN_ROW_APPEND("total_rows=%" PRIu64, pScanInfo->totalRows); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); - - EXPLAIN_ROW_APPEND("check_rows=%" PRIu64, pScanInfo->totalCheckedRows); - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + if (pScanInfo->totalRows > totalRows) { + totalRows = pScanInfo->totalRows; + maxIndex = i; + } } + EXPLAIN_ROW_APPEND("total_blocks=%.1f", ((double)info.totalBlocks) / nodeNum); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + + EXPLAIN_ROW_APPEND("load_blocks=%.1f", ((double)info.loadBlocks) / nodeNum); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + + EXPLAIN_ROW_APPEND("load_block_SMAs=%.1f", ((double)info.loadBlockStatis) / nodeNum); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + + EXPLAIN_ROW_APPEND("total_rows=%.1f", ((double)info.totalRows) / nodeNum); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + + EXPLAIN_ROW_APPEND("check_rows=%.1f", ((double)info.totalCheckedRows) / nodeNum); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_END(); + + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + + //Rows out: Avg 4166.7 rows x 24 workers. Max 4187 rows (seg7) with 0.220 ms to first row, 1.738 ms to end, start offset by 1.470 ms. + SExplainExecInfo *execInfo = taosArrayGet(pResNode->pExecInfo, maxIndex); + STableScanAnalyzeInfo *p1 = (STableScanAnalyzeInfo *)execInfo->verboseInfo; + + EXPLAIN_ROW_NEW(level + 1, " "); + EXPLAIN_ROW_APPEND("max_row_task=%d, total_rows:%" PRId64 ", ep:%s (cost=%.3f..%.3f)", maxIndex, p1->totalRows, "tbd", + execInfo->startupCost, execInfo->totalCost); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 756eb5f375..c33b6622e3 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -369,6 +369,8 @@ typedef struct SSysTableScanInfo { typedef struct SBlockDistInfo { SSDataBlock* pResBlock; void* pHandle; + SReadHandle readHandle; + uint64_t uid; // table uid } SBlockDistInfo; // todo remove this @@ -740,7 +742,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SBlockDistScanPhysiNode* pBlockScanNode, + SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 58918667f3..54b5961585 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2812,7 +2812,8 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan // todo add more information about exchange operation int32_t type = pOperator->operatorType; if (type == QUERY_NODE_PHYSICAL_PLAN_EXCHANGE || type == QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN || - type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN) { + type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN || type == QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN || + type == QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN) { *order = TSDB_ORDER_ASC; *scanFlag = MAIN_SCAN; return TSDB_CODE_SUCCESS; @@ -2840,7 +2841,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { SAggOperatorInfo* pAggInfo = pOperator->info; SExprSupp* pSup = &pOperator->exprSupp; - SOptrBasicInfo* pInfo = &pAggInfo->binfo; SOperatorInfo* downstream = pOperator->pDownstream[0]; int64_t st = taosGetTimestampUs(); @@ -4087,6 +4087,46 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { + SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*) pPhyNode; + pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); + + if (pBlockNode->tableType == TSDB_SUPER_TABLE) { + int32_t code = tsdbGetAllTableList(pHandle->meta, pBlockNode->uid, pTableListInfo->pTableList); + if (code != TSDB_CODE_SUCCESS) { + pTaskInfo->code = terrno; + return NULL; + } + } else { // Create one table group. + STableKeyInfo info = {.lastKey = 0, .uid = pBlockNode->uid}; + taosArrayPush(pTableListInfo->pTableList, &info); + } + + SQueryTableDataCond cond = {0}; + + { + cond.order = TSDB_ORDER_ASC; + cond.numOfCols = 1; + cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo)); + if (cond.colList == NULL) { + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; + } + + cond.colList->colId = 1; + cond.colList->type = TSDB_DATA_TYPE_TIMESTAMP; + cond.colList->bytes = sizeof(TSKEY); + + cond.numOfTWindows = 1; + cond.twindows = taosMemoryCalloc(1, sizeof(STimeWindow)); + cond.twindows[0] = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; + cond.suid = pBlockNode->suid; + cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER; + } + tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo, queryId, taskId); + cleanupQueryTableDataCond(&cond); + + return createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo); } else { ASSERT(0); } @@ -4284,8 +4324,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId, SNode* pTagCond) { - int32_t code = - getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond); + int32_t code = getTableList(pHandle->meta, &pTableScanNode->scan, pTableListInfo, pTagCond); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6f0187fa53..426d45976f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include #include "filter.h" #include "function.h" #include "functionMgt.h" @@ -367,13 +368,14 @@ void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* p static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { STableScanInfo* pTableScanInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSDataBlock* pBlock = pTableScanInfo->pResBlock; int64_t st = taosGetTimestampUs(); while (tsdbNextDataBlock(pTableScanInfo->dataReader)) { - if (isTaskKilled(pOperator->pTaskInfo)) { - longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + if (isTaskKilled(pTaskInfo)) { + longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); } // process this data block based on the probabilities @@ -396,7 +398,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { continue; } - uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); + uint64_t* groupId = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); if (groupId) { pBlock->info.groupId = *groupId; } @@ -589,44 +591,76 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* pInfo->dataReader = pReadHandle; // pInfo->prevGroupId = -1; - pOperator->name = "TableSeqScanOperator"; + pOperator->name = "TableSeqScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; + pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL); return pOperator; } +static int32_t doGetTableRowSize(void* pMeta, uint64_t uid) { + int32_t rowLen = 0; + + SMetaReader mr = {0}; + metaReaderInit(&mr, pMeta, 0); + metaGetTableEntryByUid(&mr, uid); + if (mr.me.type == TSDB_SUPER_TABLE) { + int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols; + for(int32_t i = 0; i < numOfCols; ++i) { + rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes; + } + } else if (mr.me.type == TSDB_CHILD_TABLE) { + uint64_t suid = mr.me.ctbEntry.suid; + metaGetTableEntryByUid(&mr, suid); + int32_t numOfCols = mr.me.stbEntry.schemaRow.nCols; + + for(int32_t i = 0; i < numOfCols; ++i) { + rowLen += mr.me.stbEntry.schemaRow.pSchema[i].bytes; + } + } else if (mr.me.type == TSDB_NORMAL_TABLE) { + int32_t numOfCols = mr.me.ntbEntry.schemaRow.nCols; + for(int32_t i = 0; i < numOfCols; ++i) { + rowLen += mr.me.ntbEntry.schemaRow.pSchema[i].bytes; + } + } + + metaReaderClear(&mr); + return rowLen; +} + static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } - STableScanInfo* pTableScanInfo = pOperator->info; + SBlockDistInfo* pBlockScanInfo = pOperator->info; - STableBlockDistInfo blockDistInfo = {0}; - blockDistInfo.maxRows = INT_MIN; - blockDistInfo.minRows = INT_MAX; + STableBlockDistInfo blockDistInfo = {.minRows = INT_MAX, .maxRows = INT_MIN}; + blockDistInfo.rowSize = doGetTableRowSize(pBlockScanInfo->readHandle.meta, pBlockScanInfo->uid); - tsdbGetFileBlocksDistInfo(pTableScanInfo->dataReader, &blockDistInfo); - blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pTableScanInfo->dataReader); + tsdbGetFileBlocksDistInfo(pBlockScanInfo->pHandle, &blockDistInfo); + blockDistInfo.numOfInmemRows = (int32_t)tsdbGetNumOfRowsInMemTable(pBlockScanInfo->pHandle); - SSDataBlock* pBlock = pTableScanInfo->pResBlock; - pBlock->info.rows = 1; + SSDataBlock* pBlock = pBlockScanInfo->pResBlock; - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); + int32_t slotId = pOperator->exprSupp.pExprInfo->base.resSchema.slotId; + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, slotId); int32_t len = tSerializeBlockDistInfo(NULL, 0, &blockDistInfo); char* p = taosMemoryCalloc(1, len + VARSTR_HEADER_SIZE); tSerializeBlockDistInfo(varDataVal(p), len, &blockDistInfo); varDataSetLen(p, len); + blockDataEnsureCapacity(pBlock, 1); colDataAppend(pColInfo, 0, p, false); taosMemoryFree(p); + pBlock->info.rows = 1; + pOperator->status = OP_EXEC_DONE; return pBlock; } @@ -636,7 +670,8 @@ static void destroyBlockDistScanOperatorInfo(void* param, int32_t numOfOutput) { blockDataDestroy(pDistInfo->pResBlock); } -SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, + SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo) { SBlockDistInfo* pInfo = taosMemoryCalloc(1, sizeof(SBlockDistInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -644,21 +679,20 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* goto _error; } - pInfo->pHandle = dataReader; + pInfo->pHandle = dataReader; + pInfo->readHandle = *readHandle; + pInfo->uid = uid; + pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc); - pInfo->pResBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + int32_t numOfCols = 0; + SExprInfo* pExprInfo = createExprInfo(pBlockScanNode->pScanPseudoCols, NULL, &numOfCols); + initExprSupp(&pOperator->exprSupp, pExprInfo, numOfCols); - SColumnInfoData infoData = {0}; - infoData.info.type = TSDB_DATA_TYPE_VARCHAR; - infoData.info.bytes = 1024; - - taosArrayPush(pInfo->pResBlock->pDataBlock, &infoData); - - pOperator->name = "DataBlockInfoScanOperator"; - // pOperator->operatorType = OP_TableBlockInfoScan; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->name = "DataBlockDistScanOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL, @@ -1835,6 +1869,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs); SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, COL_MATCH_FROM_COL_ID); + initExprSupp(&pOperator->exprSupp, pExprInfo, numOfExprs); + pInfo->pTableList = pTableListInfo; pInfo->pColMatchInfo = colList; pInfo->pRes = createResDataBlock(pDescNode); @@ -1847,8 +1883,6 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfExprs; pOperator->pTaskInfo = pTaskInfo; initResultSizeInfo(pOperator, 4096); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index f8650d95c1..c556e94c74 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2892,7 +2892,10 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin SArray* pChWins = getWinInfos(&pChInfo->streamAggSup, groupId); int32_t chWinSize = taosArrayGetSize(pChWins); int32_t index = binarySearch(pChWins, chWinSize, pParentWin->win.skey, TSDB_ORDER_DESC, getSessionWindowEndkey); - for (int32_t k = index; k > 0 && k < chWinSize; k++) { + if (index < 0) { + index = 0; + } + for (int32_t k = index; k < chWinSize; k++) { SResultWindowInfo* pcw = taosArrayGet(pChWins, k); if (pParentWin->win.skey <= pcw->win.skey && pcw->win.ekey <= pParentWin->win.ekey) { SResultRow* pChResult = NULL; diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index c243c1c175..e691d562c6 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -192,6 +192,8 @@ int32_t twaFunction(SqlFunctionCtx *pCtx); int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock); bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); + +bool blockDistSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t blockDistFunction(SqlFunctionCtx *pCtx); int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 522ee09b3c..cfad00f458 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -419,7 +419,7 @@ static int32_t translateTopBot(SFunctionNode* pFunc, char* pErrBuf, int32_t len) return TSDB_CODE_SUCCESS; } -int32_t topCreateMergePara(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters) { +int32_t topBotCreateMergePara(SNodeList* pRawParameters, SNode* pPartialRes, SNodeList** pParameters) { int32_t code = nodesListMakeAppend(pParameters, pPartialRes); if (TSDB_CODE_SUCCESS == code) { code = nodesListStrictAppend(*pParameters, nodesCloneNode(nodesListGetNode(pRawParameters, 1))); @@ -427,65 +427,6 @@ int32_t topCreateMergePara(SNodeList* pRawParameters, SNode* pPartialRes, SNodeL return TSDB_CODE_SUCCESS; } -static int32_t translateTopBotImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) { - int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); - - if (isPartial) { - if (2 != numOfParams) { - return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); - } - - uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; - uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type; - if (!IS_NUMERIC_TYPE(para1Type) || !IS_INTEGER_TYPE(para2Type)) { - return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); - } - - // param1 - SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1); - if (nodeType(pParamNode1) != QUERY_NODE_VALUE) { - return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); - } - - SValueNode* pValue = (SValueNode*)pParamNode1; - if (pValue->node.resType.type != TSDB_DATA_TYPE_BIGINT) { - return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); - } - - if (pValue->datum.i < 1 || pValue->datum.i > 100) { - return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName); - } - - pValue->notReserved = true; - - // set result type - pFunc->node.resType = - (SDataType){.bytes = getTopBotInfoSize(pValue->datum.i) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; - } else { - if (1 != numOfParams) { - return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); - } - - uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; - if (TSDB_DATA_TYPE_BINARY != para1Type) { - return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); - } - - // Do nothing. We can only access output of partial functions as input, - // so original input type cannot be obtained, resType will be set same - // as original function input type after merge function created. - } - return TSDB_CODE_SUCCESS; -} - -static int32_t translateTopBotPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { - return translateTopBotImpl(pFunc, pErrBuf, len, true); -} - -static int32_t translateTopBotMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { - return translateTopBotImpl(pFunc, pErrBuf, len, false); -} - static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { if (1 != LIST_LENGTH(pFunc->pParameterList)) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); @@ -1735,31 +1676,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = topFunction, .finalizeFunc = topBotFinalize, .combineFunc = topCombine, - .pPartialFunc = "_top_partial", - .pMergeFunc = "_top_merge", - // .createMergeParaFuc = topCreateMergePara - }, - { - .name = "_top_partial", - .type = FUNCTION_TYPE_TOP_PARTIAL, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, - .translateFunc = translateTopBotPartial, - .getEnvFunc = getTopBotFuncEnv, - .initFunc = topBotFunctionSetup, - .processFunc = topFunction, - .finalizeFunc = topBotPartialFinalize, - .combineFunc = topCombine, - }, - { - .name = "_top_merge", - .type = FUNCTION_TYPE_TOP_MERGE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, - .translateFunc = translateTopBotMerge, - .getEnvFunc = getTopBotMergeFuncEnv, - .initFunc = functionSetup, - .processFunc = topFunctionMerge, - .finalizeFunc = topBotMergeFinalize, - .combineFunc = topCombine, + .pPartialFunc = "top", + .pMergeFunc = "top", + .createMergeParaFuc = topBotCreateMergePara }, { .name = "bottom", @@ -1771,30 +1690,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = bottomFunction, .finalizeFunc = topBotFinalize, .combineFunc = bottomCombine, - .pPartialFunc = "_bottom_partial", - .pMergeFunc = "_bottom_merge" - }, - { - .name = "_bottom_partial", - .type = FUNCTION_TYPE_BOTTOM_PARTIAL, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, - .translateFunc = translateTopBotPartial, - .getEnvFunc = getTopBotFuncEnv, - .initFunc = topBotFunctionSetup, - .processFunc = bottomFunction, - .finalizeFunc = topBotPartialFinalize, - .combineFunc = bottomCombine, - }, - { - .name = "_bottom_merge", - .type = FUNCTION_TYPE_BOTTOM_MERGE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_INDEFINITE_ROWS_FUNC, - .translateFunc = translateTopBotMerge, - .getEnvFunc = getTopBotMergeFuncEnv, - .initFunc = functionSetup, - .processFunc = bottomFunctionMerge, - .finalizeFunc = topBotMergeFinalize, - .combineFunc = bottomCombine, + .pPartialFunc = "bottom", + .pMergeFunc = "bottom", + .createMergeParaFuc = topBotCreateMergePara }, { .name = "spread", @@ -2524,7 +2422,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getSelectivityFuncEnv, // todo remove this function later. .initFunc = functionSetup, .processFunc = NULL, - .finalizeFunc = NULL + .finalizeFunc = NULL, + .pPartialFunc = "_select_value", + .pMergeFunc = "_select_value" }, { .name = "_block_dist", @@ -2532,6 +2432,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .classification = FUNC_MGT_AGG_FUNC, .translateFunc = translateBlockDistFunc, .getEnvFunc = getBlockDistFuncEnv, + .initFunc = blockDistSetup, .processFunc = blockDistFunction, .finalizeFunc = blockDistFinalize }, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 583e8bd300..01fcc61cf0 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -67,8 +67,7 @@ typedef struct STopBotResItem { typedef struct STopBotRes { int32_t maxSize; - int16_t type; // store the original input type, used in merge function - int32_t numOfItems; + int16_t type; STopBotResItem* pItems; } STopBotRes; @@ -1481,11 +1480,13 @@ int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3 if (pSBuf->assign && ((((*(double*)&pDBuf->v) < (*(double*)&pSBuf->v)) ^ isMinFunc) || !pDBuf->assign)) { *(double*)&pDBuf->v = *(double*)&pSBuf->v; replaceTupleData(&pDBuf->tuplePos, &pSBuf->tuplePos); + pDBuf->assign = true; } } else { if (pSBuf->assign && (((pDBuf->v < pSBuf->v) ^ isMinFunc) || !pDBuf->assign)) { pDBuf->v = pSBuf->v; replaceTupleData(&pDBuf->tuplePos, &pSBuf->tuplePos); + pDBuf->assign = true; } } pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); @@ -1745,10 +1746,10 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { if (IS_INTEGER_TYPE(type)) { avg = pStddevRes->isum / ((double)pStddevRes->count); - pStddevRes->result = sqrt(pStddevRes->quadraticISum / ((double)pStddevRes->count) - avg * avg); + pStddevRes->result = sqrt(fabs(pStddevRes->quadraticISum / ((double)pStddevRes->count) - avg * avg)); } else { avg = pStddevRes->dsum / ((double)pStddevRes->count); - pStddevRes->result = sqrt(pStddevRes->quadraticDSum / ((double)pStddevRes->count) - avg * avg); + pStddevRes->result = sqrt(fabs(pStddevRes->quadraticDSum / ((double)pStddevRes->count) - avg * avg)); } return functionFinalize(pCtx, pBlock); @@ -2870,12 +2871,6 @@ bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } -bool getTopBotMergeFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { - // intermediate result is binary and length contains VAR header size - pEnv->calcMemSize = pFunc->node.resType.bytes; - return true; -} - bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { if (!functionSetup(pCtx, pResInfo)) { return false; @@ -2950,50 +2945,6 @@ int32_t bottomFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -static void topBotTransferInfo(SqlFunctionCtx* pCtx, STopBotRes* pInput, bool isTopQuery) { - for (int32_t i = 0; i < pInput->numOfItems; i++) { - addResult(pCtx, &pInput->pItems[i], pInput->type, isTopQuery); - } -} - -int32_t topFunctionMerge(SqlFunctionCtx* pCtx) { - SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); - SInputColumnInfoData* pInput = &pCtx->input; - SColumnInfoData* pCol = pInput->pData[0]; - ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); - - int32_t start = pInput->startRowIndex; - char* data = colDataGetData(pCol, start); - STopBotRes* pInputInfo = (STopBotRes*)varDataVal(data); - STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - - pInfo->maxSize = pInputInfo->maxSize; - pInfo->type = pInputInfo->type; - topBotTransferInfo(pCtx, pInputInfo, true); - SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes); - - return TSDB_CODE_SUCCESS; -} - -int32_t bottomFunctionMerge(SqlFunctionCtx* pCtx) { - SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); - SInputColumnInfoData* pInput = &pCtx->input; - SColumnInfoData* pCol = pInput->pData[0]; - ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); - - int32_t start = pInput->startRowIndex; - char* data = colDataGetData(pCol, start); - STopBotRes* pInputInfo = (STopBotRes*)varDataVal(data); - STopBotRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - - pInfo->maxSize = pInputInfo->maxSize; - pInfo->type = pInputInfo->type; - topBotTransferInfo(pCtx, pInputInfo, false); - SET_VAL(GET_RES_INFO(pCtx), pEntryInfo->numOfRes, pEntryInfo->numOfRes); - - return TSDB_CODE_SUCCESS; -} - static int32_t topBotResComparFn(const void* p1, const void* p2, const void* param) { uint16_t type = *(uint16_t*)param; @@ -3042,8 +2993,6 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData // allocate the buffer and keep the data of this row into the new allocated buffer pEntryInfo->numOfRes++; - // accumulate number of items for each vgroup, this info is needed for merge - pRes->numOfItems++; taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn, !isTopQuery); } else { // replace the minimum value in the result @@ -3142,7 +3091,7 @@ void copyTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS releaseBufPage(pCtx->pBuf, pPage); } -int32_t topBotFinalizeImpl(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, bool isMerge) { +int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); @@ -3162,39 +3111,13 @@ int32_t topBotFinalizeImpl(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, bool isMer colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false); } - if (!isMerge) { - setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow); - } + setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow); currentRow += 1; } return pEntryInfo->numOfRes; } -int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return topBotFinalizeImpl(pCtx, pBlock, false); } - -int32_t topBotMergeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { - return topBotFinalizeImpl(pCtx, pBlock, true); -} - -int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { - SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); - STopBotRes* pRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - int32_t resultBytes = getTopBotInfoSize(pRes->maxSize); - char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); - - memcpy(varDataVal(res), pRes, resultBytes); - varDataSetLen(res, resultBytes); - - int32_t slotId = pCtx->pExpr->base.resSchema.slotId; - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); - - colDataAppend(pCol, pBlock->info.rows, res, false); - - taosMemoryFree(res); - return 1; -} - void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, bool isTopQuery) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); STopBotRes* pRes = getTopBotOutputInfo(pCtx); @@ -3209,8 +3132,6 @@ void addResult(SqlFunctionCtx* pCtx, STopBotResItem* pSourceItem, int16_t type, pItem->tuplePos.pageId = -1; replaceTupleData(&pItem->tuplePos, &pSourceItem->tuplePos); pEntryInfo->numOfRes++; - // accumulate number of items for each vgroup, this info is needed for merge - pRes->numOfItems++; taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn, !isTopQuery); } else { // replace the minimum value in the result @@ -5002,7 +4923,19 @@ int32_t twaFinalize(struct SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return functionFinalize(pCtx, pBlock); } +bool blockDistSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { + if (!functionSetup(pCtx, pResultInfo)) { + return false; + } + + STableBlockDistInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + pInfo->minRows = INT32_MAX; + return true; +} + int32_t blockDistFunction(SqlFunctionCtx* pCtx) { + const int32_t BLOCK_DIST_RESULT_ROWS = 24; + SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; @@ -5020,6 +4953,11 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) { pDistInfo->totalRows += p1.totalRows; pDistInfo->numOfFiles += p1.numOfFiles; + pDistInfo->defMinRows = p1.defMinRows; + pDistInfo->defMaxRows = p1.defMaxRows; + pDistInfo->rowSize = p1.rowSize; + pDistInfo->numOfSmallBlocks = p1.numOfSmallBlocks; + if (pDistInfo->minRows > p1.minRows) { pDistInfo->minRows = p1.minRows; } @@ -5031,7 +4969,7 @@ int32_t blockDistFunction(SqlFunctionCtx* pCtx) { pDistInfo->blockRowsHisto[i] += p1.blockRowsHisto[i]; } - pResInfo->numOfRes = 1; + pResInfo->numOfRes = BLOCK_DIST_RESULT_ROWS; // default output rows return TSDB_CODE_SUCCESS; } @@ -5043,7 +4981,7 @@ int32_t tSerializeBlockDistInfo(void* buf, int32_t bufLen, const STableBlockDist if (tEncodeU32(&encoder, pInfo->rowSize) < 0) return -1; if (tEncodeU16(&encoder, pInfo->numOfFiles) < 0) return -1; - if (tEncodeU32(&encoder, pInfo->rowSize) < 0) return -1; + if (tEncodeU32(&encoder, pInfo->numOfBlocks) < 0) return -1; if (tEncodeU32(&encoder, pInfo->numOfTables) < 0) return -1; if (tEncodeU64(&encoder, pInfo->totalSize) < 0) return -1; @@ -5074,7 +5012,7 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo if (tDecodeU32(&decoder, &pInfo->rowSize) < 0) return -1; if (tDecodeU16(&decoder, &pInfo->numOfFiles) < 0) return -1; - if (tDecodeU32(&decoder, &pInfo->rowSize) < 0) return -1; + if (tDecodeU32(&decoder, &pInfo->numOfBlocks) < 0) return -1; if (tDecodeU32(&decoder, &pInfo->numOfTables) < 0) return -1; if (tDecodeU64(&decoder, &pInfo->totalSize) < 0) return -1; @@ -5096,32 +5034,29 @@ int32_t tDeserializeBlockDistInfo(void* buf, int32_t bufLen, STableBlockDistInfo int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - char* pData = GET_ROWCELL_INTERBUF(pResInfo); + STableBlockDistInfo* pData = GET_ROWCELL_INTERBUF(pResInfo); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0); int32_t row = 0; - - STableBlockDistInfo info = {0}; - tDeserializeBlockDistInfo(varDataVal(pData), varDataLen(pData), &info); - char st[256] = {0}; + double totalRawSize = pData->totalRows * pData->rowSize; int32_t len = - sprintf(st + VARSTR_HEADER_SIZE, "Blocks=[%d] Size=[%.3fKb] Average_Block_size=[%.3fKb] Compression_Ratio=[%.3f]", - info.numOfBlocks, info.totalSize / 1024.0, info.totalSize / (info.numOfBlocks * 1024.0), - info.totalSize / (info.totalRows * info.rowSize * 1.0)); + sprintf(st + VARSTR_HEADER_SIZE, "Total_Blocks=[%d] Total_Size=[%.2f Kb] Average_size=[%.2f Kb] Compression_Ratio=[%.2f %c]", + pData->numOfBlocks, pData->totalSize / 1024.0, ((double)pData->totalSize) / pData->numOfBlocks, + pData->totalSize * 100 / totalRawSize, '%'); varDataSetLen(st, len); colDataAppend(pColInfo, row++, st, false); - len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Rows=[%ld] MinRows=[%d] MaxRows=[%d] Averge_Rows=[%ld] Inmem_Rows=[%d]", - info.totalRows, info.minRows, info.maxRows, info.totalRows / info.numOfBlocks, info.numOfInmemRows); + len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Rows=[%"PRId64"] Inmem_Rows=[%d] MinRows=[%d] MaxRows=[%d] Average_Rows=[%"PRId64"]", + pData->totalRows, pData->numOfInmemRows, pData->minRows, pData->maxRows, pData->totalRows / pData->numOfBlocks); varDataSetLen(st, len); colDataAppend(pColInfo, row++, st, false); - len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", info.numOfTables, - info.numOfFiles, 0); + len = sprintf(st + VARSTR_HEADER_SIZE, "Total_Tables=[%d] Total_Files=[%d] Total_Vgroups=[%d]", pData->numOfTables, + pData->numOfFiles, 0); varDataSetLen(st, len); colDataAppend(pColInfo, row++, st, false); @@ -5133,40 +5068,56 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t maxVal = 0; int32_t minVal = INT32_MAX; - for (int32_t i = 0; i < sizeof(info.blockRowsHisto) / sizeof(info.blockRowsHisto[0]); ++i) { - if (maxVal < info.blockRowsHisto[i]) { - maxVal = info.blockRowsHisto[i]; + for (int32_t i = 0; i < tListLen(pData->blockRowsHisto); ++i) { + if (maxVal < pData->blockRowsHisto[i]) { + maxVal = pData->blockRowsHisto[i]; } - if (minVal > info.blockRowsHisto[i]) { - minVal = info.blockRowsHisto[i]; + if (minVal > pData->blockRowsHisto[i]) { + minVal = pData->blockRowsHisto[i]; } } int32_t delta = maxVal - minVal; int32_t step = delta / 50; + if (step == 0) { + step = 1; + } - int32_t numOfBuckets = sizeof(info.blockRowsHisto) / sizeof(info.blockRowsHisto[0]); - int32_t bucketRange = (info.maxRows - info.minRows) / numOfBuckets; + int32_t numOfBuckets = sizeof(pData->blockRowsHisto) / sizeof(pData->blockRowsHisto[0]); + int32_t bucketRange = (pData->maxRows - pData->minRows) / numOfBuckets; - for (int32_t i = 0; i < 20; ++i) { - len += sprintf(st + VARSTR_HEADER_SIZE, "%04d |", info.defMinRows + bucketRange * (i + 1)); + bool singleModel = false; + if (bucketRange == 0) { + singleModel = true; + step = 20; + bucketRange = (pData->defMaxRows - pData->defMinRows) / numOfBuckets; + } + + for (int32_t i = 0; i < tListLen(pData->blockRowsHisto); ++i) { + len = sprintf(st + VARSTR_HEADER_SIZE, "%04d |", pData->defMinRows + bucketRange * (i + 1)); + + int32_t num = 0; + if (singleModel && pData->blockRowsHisto[i] > 0) { + num = 20; + } else { + num = (pData->blockRowsHisto[i] + step - 1) / step; + } - int32_t num = (info.blockRowsHisto[i] + step - 1) / step; for (int32_t j = 0; j < num; ++j) { int32_t x = sprintf(st + VARSTR_HEADER_SIZE + len, "%c", '|'); len += x; } - double v = info.blockRowsHisto[i] * 100.0 / info.numOfBlocks; - len += sprintf(st + VARSTR_HEADER_SIZE + len, " %d (%.3f%c)", info.blockRowsHisto[i], v, '%'); + double v = pData->blockRowsHisto[i] * 100.0 / pData->numOfBlocks; + len += sprintf(st + VARSTR_HEADER_SIZE + len, " %d (%.2f%c)", pData->blockRowsHisto[i], v, '%'); printf("%s\n", st); varDataSetLen(st, len); colDataAppend(pColInfo, row++, st, false); } - return row; + return TSDB_CODE_SUCCESS; } typedef struct SDerivInfo { @@ -5362,4 +5313,4 @@ int32_t interpFunction(SqlFunctionCtx* pCtx) { #endif return TSDB_CODE_SUCCESS; -} \ No newline at end of file +} diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index b12f910cb1..5eaca60ea5 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -464,6 +464,9 @@ static SNode* logicInterpFuncCopy(const SInterpFuncLogicNode* pSrc, SInterpFuncL CLONE_NODE_LIST_FIELD(pFuncs); COPY_OBJECT_FIELD(timeRange, sizeof(STimeWindow)); COPY_SCALAR_FIELD(interval); + COPY_SCALAR_FIELD(fillMode); + CLONE_NODE_FIELD(pFillValues); + CLONE_NODE_FIELD(pTimeSeries); return (SNode*)pDst; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 57d1ee7857..bde5159087 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2133,6 +2133,9 @@ static const char* jkInterpFuncPhysiPlanFuncs = "Funcs"; static const char* jkInterpFuncPhysiPlanStartTime = "StartTime"; static const char* jkInterpFuncPhysiPlanEndTime = "EndTime"; static const char* jkInterpFuncPhysiPlanInterval = "Interval"; +static const char* jkInterpFuncPhysiPlanFillMode = "FillMode"; +static const char* jkInterpFuncPhysiPlanFillValues = "FillValues"; +static const char* jkInterpFuncPhysiPlanTimeSeries = "TimeSeries"; static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) { const SInterpFuncPhysiNode* pNode = (const SInterpFuncPhysiNode*)pObj; @@ -2153,6 +2156,15 @@ static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanInterval, pNode->interval); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanFillMode, pNode->fillMode); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanFillValues, nodeToJson, pNode->pFillValues); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanTimeSeries, nodeToJson, pNode->pTimeSeries); + } return code; } @@ -2176,6 +2188,15 @@ static int32_t jsonToPhysiInterpFuncNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBigIntValue(pJson, jkInterpFuncPhysiPlanInterval, &pNode->interval); } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkInterpFuncPhysiPlanFillMode, pNode->fillMode, code); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkInterpFuncPhysiPlanFillValues, &pNode->pFillValues); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkInterpFuncPhysiPlanTimeSeries, &pNode->pTimeSeries); + } return code; } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 6b3fdad27d..ccb0d37da6 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -700,8 +700,11 @@ SNode* addEveryClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pEvery) { SNode* addFillClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pFill) { CHECK_PARSER_STATUS(pCxt); - if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) { - ((SSelectStmt*)pStmt)->pFill = pFill; + if (QUERY_NODE_SELECT_STMT == nodeType(pStmt) && NULL != pFill) { + SFillNode* pFillClause = (SFillNode*)pFill; + nodesDestroyNode(pFillClause->pWStartTs); + pFillClause->pWStartTs = createPrimaryKeyCol(pCxt); + ((SSelectStmt*)pStmt)->pFill = (SNode*)pFillClause; } return pStmt; } @@ -909,7 +912,7 @@ SNode* createDefaultTableOptions(SAstCreateContext* pCxt) { pOptions->watermark1 = TSDB_DEFAULT_ROLLUP_WATERMARK; pOptions->watermark2 = TSDB_DEFAULT_ROLLUP_WATERMARK; pOptions->ttl = TSDB_DEFAULT_TABLE_TTL; - pOptions->commentNull = true; // mark null + pOptions->commentNull = true; // mark null return (SNode*)pOptions; } @@ -918,7 +921,7 @@ SNode* createAlterTableOptions(SAstCreateContext* pCxt) { STableOptions* pOptions = (STableOptions*)nodesMakeNode(QUERY_NODE_TABLE_OPTIONS); CHECK_OUT_OF_MEM(pOptions); pOptions->ttl = -1; - pOptions->commentNull = true; // mark null + pOptions->commentNull = true; // mark null return (SNode*)pOptions; } @@ -940,9 +943,9 @@ SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType case TABLE_OPTION_ROLLUP: ((STableOptions*)pOptions)->pRollupFuncs = pVal; break; - case TABLE_OPTION_TTL:{ + case TABLE_OPTION_TTL: { int64_t ttl = taosStr2Int64(((SToken*)pVal)->z, NULL, 10); - if (ttl > INT32_MAX){ + if (ttl > INT32_MAX) { ttl = INT32_MAX; } // ttl can not be smaller than 0, because there is a limitation in sql.y (TTL NK_INTEGER) diff --git a/source/libs/parser/src/parAstParser.c b/source/libs/parser/src/parAstParser.c index e3218f972b..6058978451 100644 --- a/source/libs/parser/src/parAstParser.c +++ b/source/libs/parser/src/parAstParser.c @@ -419,6 +419,24 @@ static int32_t collectMetaKeyFromDelete(SCollectMetaKeyCxt* pCxt, SDeleteStmt* p return collectMetaKeyFromRealTableImpl(pCxt, (SRealTableNode*)pStmt->pFromTable, AUTH_TYPE_WRITE); } +static int32_t collectMetaKeyFromShowBlockDist(SCollectMetaKeyCxt* pCxt, SShowTableDistributedStmt* pStmt) { + SName name = {.type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId}; + strcpy(name.dbname, pStmt->dbName); + strcpy(name.tname, pStmt->tableName); + int32_t code = catalogRemoveTableMeta(pCxt->pParseCxt->pCatalog, &name); + if (TSDB_CODE_SUCCESS == code) { + code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache); + } + + if (TSDB_CODE_SUCCESS == code) { + code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache); + } + if (TSDB_CODE_SUCCESS == code) { + code = reserveDbVgInfoInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache); + } + return code; +} + static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { pCxt->pStmt = pStmt; switch (nodeType(pStmt)) { @@ -497,6 +515,8 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) { return collectMetaKeyFromShowTransactions(pCxt, (SShowStmt*)pStmt); case QUERY_NODE_DELETE_STMT: return collectMetaKeyFromDelete(pCxt, (SDeleteStmt*)pStmt); + case QUERY_NODE_SHOW_TABLE_DISTRIBUTED_STMT: + return collectMetaKeyFromShowBlockDist(pCxt, (SShowTableDistributedStmt*)pStmt); default: break; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 77ef9027cd..a60dba3a9a 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1932,26 +1932,33 @@ static int32_t getFillTimeRange(STranslateContext* pCxt, SNode* pWhere, STimeWin return code; } -static int32_t checkFill(STranslateContext* pCxt, SIntervalWindowNode* pInterval) { - SFillNode* pFill = (SFillNode*)pInterval->pFill; +static int32_t checkFill(STranslateContext* pCxt, SFillNode* pFill, SValueNode* pInterval) { + if (FILL_MODE_NONE == pFill->mode) { + return TSDB_CODE_SUCCESS; + } + if (TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_INITIALIZER) || TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_DESC_INITIALIZER)) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE); } - int64_t timeRange = TABS(pFill->timeRange.skey - pFill->timeRange.ekey); - int64_t intervalRange = 0; - SValueNode* pInter = (SValueNode*)pInterval->pInterval; - if (TIME_IS_VAR_DURATION(pInter->unit)) { + // interp FILL clause + if (NULL == pInterval) { + return TSDB_CODE_SUCCESS; + } + + int64_t timeRange = TABS(pFill->timeRange.skey - pFill->timeRange.ekey); + int64_t intervalRange = 0; + if (TIME_IS_VAR_DURATION(pInterval->unit)) { int64_t f = 1; - if (pInter->unit == 'n') { + if (pInterval->unit == 'n') { f = 30L * MILLISECOND_PER_DAY; - } else if (pInter->unit == 'y') { + } else if (pInterval->unit == 'y') { f = 365L * MILLISECOND_PER_DAY; } - intervalRange = pInter->datum.i * f; + intervalRange = pInterval->datum.i * f; } else { - intervalRange = pInter->datum.i; + intervalRange = pInterval->datum.i; } if ((timeRange == 0) || (timeRange / intervalRange) >= MAX_INTERVAL_TIME_WINDOW) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE); @@ -1967,7 +1974,7 @@ static int32_t translateFill(STranslateContext* pCxt, SNode* pWhere, SIntervalWi int32_t code = getFillTimeRange(pCxt, pWhere, &(((SFillNode*)pInterval->pFill)->timeRange)); if (TSDB_CODE_SUCCESS == code) { - code = checkFill(pCxt, pInterval); + code = checkFill(pCxt, (SFillNode*)pInterval->pFill, (SValueNode*)pInterval->pInterval); } return code; } @@ -2109,6 +2116,64 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { return code; } +static int32_t createDefaultFillNode(STranslateContext* pCxt, SNode** pOutput) { + SFillNode* pFill = (SFillNode*)nodesMakeNode(QUERY_NODE_FILL); + if (NULL == pFill) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pFill->mode = FILL_MODE_NONE; + + SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == pCol) { + nodesDestroyNode((SNode*)pFill); + return TSDB_CODE_OUT_OF_MEMORY; + } + pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID; + strcpy(pCol->colName, PK_TS_COL_INTERNAL_NAME); + pFill->pWStartTs = (SNode*)pCol; + + *pOutput = (SNode*)pFill; + return TSDB_CODE_SUCCESS; +} + +static int32_t translateInterpFill(STranslateContext* pCxt, SSelectStmt* pSelect) { + int32_t code = TSDB_CODE_SUCCESS; + + if (NULL == pSelect->pFill) { + code = createDefaultFillNode(pCxt, &pSelect->pFill); + } + if (TSDB_CODE_SUCCESS == code) { + code = translateExpr(pCxt, &pSelect->pFill); + } + if (TSDB_CODE_SUCCESS == code) { + code = getFillTimeRange(pCxt, pSelect->pRange, &(((SFillNode*)pSelect->pFill)->timeRange)); + } + if (TSDB_CODE_SUCCESS == code) { + code = checkFill(pCxt, (SFillNode*)pSelect->pFill, (SValueNode*)pSelect->pEvery); + } + + return code; +} + +static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) { + if (!pSelect->hasInterpFunc) { + if (NULL != pSelect->pRange || NULL != pSelect->pEvery || NULL != pSelect->pFill) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE); + } + return TSDB_CODE_SUCCESS; + } + + int32_t code = translateExpr(pCxt, &pSelect->pRange); + if (TSDB_CODE_SUCCESS == code) { + code = translateExpr(pCxt, &pSelect->pEvery); + } + if (TSDB_CODE_SUCCESS == code) { + code = translateInterpFill(pCxt, pSelect); + } + return code; +} + static int32_t translatePartitionBy(STranslateContext* pCxt, SNodeList* pPartitionByList) { pCxt->currClause = SQL_CLAUSE_PARTITION_BY; return translateExprList(pCxt, pPartitionByList); @@ -2378,6 +2443,9 @@ static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) { if (TSDB_CODE_SUCCESS == code) { code = checkLimit(pCxt, pSelect); } + if (TSDB_CODE_SUCCESS == code) { + code = translateInterp(pCxt, pSelect); + } if (TSDB_CODE_SUCCESS == code) { code = rewriteUniqueStmt(pCxt, pSelect); } @@ -3388,18 +3456,18 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm pReq->delay2 = pStmt->pOptions->maxDelay2; pReq->watermark1 = pStmt->pOptions->watermark1; pReq->watermark2 = pStmt->pOptions->watermark2; -// pReq->ttl = pStmt->pOptions->ttl; + // pReq->ttl = pStmt->pOptions->ttl; columnDefNodeToField(pStmt->pCols, &pReq->pColumns); columnDefNodeToField(pStmt->pTags, &pReq->pTags); pReq->numOfColumns = LIST_LENGTH(pStmt->pCols); pReq->numOfTags = LIST_LENGTH(pStmt->pTags); - if(pStmt->pOptions->commentNull == false){ + if (pStmt->pOptions->commentNull == false) { pReq->comment = strdup(pStmt->pOptions->comment); if (NULL == pReq->comment) { return TSDB_CODE_OUT_OF_MEMORY; } pReq->commentLen = strlen(pStmt->pOptions->comment); - }else{ + } else { pReq->commentLen = -1; } @@ -3452,14 +3520,14 @@ static int32_t buildAlterSuperTableReq(STranslateContext* pCxt, SAlterTableStmt* pAlterReq->alterType = pStmt->alterType; if (TSDB_ALTER_TABLE_UPDATE_OPTIONS == pStmt->alterType) { -// pAlterReq->ttl = pStmt->pOptions->ttl; + // pAlterReq->ttl = pStmt->pOptions->ttl; if (pStmt->pOptions->commentNull == false) { pAlterReq->comment = strdup(pStmt->pOptions->comment); if (NULL == pAlterReq->comment) { return TSDB_CODE_OUT_OF_MEMORY; } pAlterReq->commentLen = strlen(pStmt->pOptions->comment); - }else{ + } else { pAlterReq->commentLen = -1; } @@ -4725,7 +4793,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* return TSDB_CODE_OUT_OF_MEMORY; } req.commentLen = strlen(pStmt->pOptions->comment); - }else{ + } else { req.commentLen = -1; } req.ntb.schemaRow.nCols = LIST_LENGTH(pStmt->pCols); @@ -4878,11 +4946,11 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S struct SVCreateTbReq req = {0}; req.type = TD_CHILD_TABLE; req.name = strdup(pStmt->tableName); - req.ttl = pStmt->pOptions->ttl; + req.ttl = pStmt->pOptions->ttl; if (pStmt->pOptions->commentNull == false) { req.comment = strdup(pStmt->pOptions->comment); req.commentLen = strlen(pStmt->pOptions->comment); - } else{ + } else { req.commentLen = -1; } req.ctb.suid = suid; @@ -5460,15 +5528,14 @@ static int32_t buildUpdateOptionsReq(STranslateContext* pCxt, SAlterTableStmt* p pReq->newTTL = pStmt->pOptions->ttl; } - if (TSDB_CODE_SUCCESS == code){ - if(pStmt->pOptions->commentNull == false) { + if (TSDB_CODE_SUCCESS == code) { + if (pStmt->pOptions->commentNull == false) { pReq->newComment = strdup(pStmt->pOptions->comment); if (NULL == pReq->newComment) { code = TSDB_CODE_OUT_OF_MEMORY; } pReq->newCommentLen = strlen(pReq->newComment); - } - else{ + } else { pReq->newCommentLen = -1; } } diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 95cdf58d5a..d94b430c45 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -196,6 +196,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "%s function does not supportted in group query"; case TSDB_CODE_PAR_INVALID_TABLE_OPTION: return "Invalid option %s"; + case TSDB_CODE_PAR_INVALID_INTERP_CLAUSE: + return "Invalid usage of RANGE clause, EVERY clause or FILL clause"; case TSDB_CODE_OUT_OF_MEMORY: return "Out of memory"; default: diff --git a/source/libs/parser/test/parSelectTest.cpp b/source/libs/parser/test/parSelectTest.cpp index 693c422ae0..11e09cd83d 100644 --- a/source/libs/parser/test/parSelectTest.cpp +++ b/source/libs/parser/test/parSelectTest.cpp @@ -267,8 +267,6 @@ TEST_F(ParserSelectTest, interp) { run("SELECT INTERP(c1) FROM t1 EVERY(5s)"); - run("SELECT INTERP(c1) FROM t1 EVERY(5s) FILL(LINEAR)"); - run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s)"); run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s) FILL(LINEAR)"); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index b365afff02..c7490faa0c 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -508,10 +508,15 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p code = rewriteExprsForSelect(pInterpFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT); } - if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pRange) { - // SRangeNode* pRange = (SRangeNode*)pSelect->pRange; - // pInterpFunc->timeRange.skey = ((SValueNode*)pRange->pStart)->datum.i; - // pInterpFunc->timeRange.ekey = ((SValueNode*)pRange->pEnd)->datum.i; + if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pFill) { + SFillNode* pFill = (SFillNode*)pSelect->pFill; + pInterpFunc->timeRange = pFill->timeRange; + pInterpFunc->fillMode = pFill->mode; + pInterpFunc->pTimeSeries = nodesCloneNode(pFill->pWStartTs); + pInterpFunc->pFillValues = nodesCloneNode(pFill->pValues); + if (NULL == pInterpFunc->pTimeSeries || (NULL != pFill->pValues && NULL == pInterpFunc->pFillValues)) { + code = TSDB_CODE_OUT_OF_MEMORY; + } } if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pEvery) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 70a8aca8bd..6af5a15147 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -877,6 +877,12 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh if (TSDB_CODE_SUCCESS == code) { pInterpFunc->timeRange = pFuncLogicNode->timeRange; pInterpFunc->interval = pFuncLogicNode->interval; + pInterpFunc->fillMode = pFuncLogicNode->fillMode; + pInterpFunc->pFillValues = nodesCloneNode(pFuncLogicNode->pFillValues); + pInterpFunc->pTimeSeries = nodesCloneNode(pFuncLogicNode->pTimeSeries); + if (NULL == pInterpFunc->pTimeSeries || (NULL != pFuncLogicNode->pFillValues && NULL == pInterpFunc->pFillValues)) { + code = TSDB_CODE_OUT_OF_MEMORY; + } } if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index ecff861f50..8c7c030dce 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -23,6 +23,7 @@ extern "C" { #include "qwInt.h" #include "dataSinkMgt.h" +int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF); int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain, const char* sql); int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 5635ec8fc6..82a62b5c5a 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -283,6 +283,26 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } +int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { + if (NULL == qWorkerMgmt || NULL == pMsg) { + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + SSubQueryMsg *msg = pMsg->pCont; + SQWorker * mgmt = (SQWorker *)qWorkerMgmt; + + uint64_t sId = msg->sId; + uint64_t qId = msg->queryId; + uint64_t tId = msg->taskId; + int64_t rId = msg->refId; + + QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle); + qwAbortPrerocessQuery(QW_FPARAMS()); + QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p", pMsg->info.handle); + + return TSDB_CODE_SUCCESS; +} + int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 800cc4c6e5..0f95da42e2 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -482,6 +482,13 @@ _return: QW_RET(code); } +int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) { + QW_ERR_RET(qwDropTask(QW_FPARAMS())); + + QW_RET(TSDB_CODE_SUCCESS); +} + + int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t code = 0; bool queryRsped = false; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index b910b59f83..dcfea84f36 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -238,6 +238,9 @@ int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg); bool syncNodeInRaftGroup(SSyncNode* ths, SRaftId* pRaftId); SSyncSnapshotSender* syncNodeGetSnapshotSender(SSyncNode* ths, SRaftId* pDestId); +int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta); +int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct SSnapshotMeta* sMeta); + void syncStartNormal(int64_t rid); void syncStartStandBy(int64_t rid); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 58e40668a2..d2726201cc 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -497,7 +497,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs do { SyncIndex myLastIndex = syncNodeGetLastIndex(ths); SSnapshot snapshot; - ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); bool condition0 = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && syncNodeHasSnapshot(ths); @@ -710,7 +710,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) { // advance commit index to sanpshot first SSnapshot snapshot; - ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); if (snapshot.lastApplyIndex >= 0 && snapshot.lastApplyIndex > ths->commitIndex) { SyncIndex commitBegin = ths->commitIndex; SyncIndex commitEnd = snapshot.lastApplyIndex; diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 5e5e23b312..47fd23baae 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -123,7 +123,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries syncIndexMgrLog2("recv SyncAppendEntriesReply, before pMatchIndex:", ths->pMatchIndex); if (gRaftDetailLog) { SSnapshot snapshot; - ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); sTrace("recv SyncAppendEntriesReply, before snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu", snapshot.lastApplyIndex, snapshot.lastApplyTerm); } @@ -175,7 +175,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ASSERT(pSender != NULL); bool hasSnapshot = syncNodeHasSnapshot(ths); SSnapshot snapshot; - ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); // start sending snapshot first time // start here, stop by receiver @@ -209,7 +209,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries syncIndexMgrLog2("recv SyncAppendEntriesReply, after pMatchIndex:", ths->pMatchIndex); if (gRaftDetailLog) { SSnapshot snapshot; - ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); + ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot); sTrace("recv SyncAppendEntriesReply, after snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu", snapshot.lastApplyIndex, snapshot.lastApplyTerm); } diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 738cbd3c2b..ec3e1ab2ba 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -50,7 +50,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { // advance commit index to sanpshot first SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); if (snapshot.lastApplyIndex > 0 && snapshot.lastApplyIndex > pSyncNode->commitIndex) { SyncIndex commitBegin = pSyncNode->commitIndex; SyncIndex commitEnd = snapshot.lastApplyIndex; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 7fb3ba4ac0..e13271341d 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -809,9 +809,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { pSyncNode->restoreFinish = false; // pSyncNode->pSnapshot = NULL; - // if (pSyncNode->pFsm->FpGetSnapshot != NULL) { + // if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { // pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot)); - // pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, pSyncNode->pSnapshot); + // pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pSyncNode->pSnapshot); // } // tsem_init(&(pSyncNode->restoreSem), 0, 0); @@ -1658,8 +1658,8 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) { bool syncNodeHasSnapshot(SSyncNode* pSyncNode) { bool ret = false; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; - if (pSyncNode->pFsm->FpGetSnapshot != NULL) { - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) { ret = true; } @@ -1669,19 +1669,19 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) { bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) { ASSERT(syncNodeHasSnapshot(pSyncNode)); - ASSERT(pSyncNode->pFsm->FpGetSnapshot != NULL); + ASSERT(pSyncNode->pFsm->FpGetSnapshotInfo != NULL); ASSERT(index >= SYNC_INDEX_BEGIN); SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); bool b = (index <= snapshot.lastApplyIndex); return b; } SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) { SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; - if (pSyncNode->pFsm->FpGetSnapshot != NULL) { - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); } SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); @@ -1694,8 +1694,8 @@ SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) { if (syncNodeHasSnapshot(pSyncNode)) { // has snapshot SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; - if (pSyncNode->pFsm->FpGetSnapshot != NULL) { - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); } SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); @@ -1747,8 +1747,8 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { if (syncNodeHasSnapshot(pSyncNode)) { // has snapshot SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; - if (pSyncNode->pFsm->FpGetSnapshot != NULL) { - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); } if (index > snapshot.lastApplyIndex + 1) { diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 08564f8293..f044ae5733 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -124,7 +124,7 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { logStoreSimpleLog2("begin append entries peers LogStore:", pSyncNode->pLogStore); if (gRaftDetailLog) { SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); sTrace("begin append entries peers, snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu", snapshot.lastApplyIndex, snapshot.lastApplyTerm); } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index d1bfc39d1e..029891a692 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -45,7 +45,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->replicaIndex = replicaIndex; pSender->term = pSyncNode->pRaftStore->currentTerm; pSender->privateTerm = taosGetTimestampMs() + 100; - pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot)); + pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot)); pSender->finish = false; } else { sError("snapshotSenderCreate cannot create sender"); @@ -84,7 +84,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { pSender->blockLen = 0; // get current snapshot info - pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot)); + pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &(pSender->snapshot)); sTrace("snapshotSenderStart lastApplyIndex:%ld, lastApplyTerm:%lu, lastConfigIndex:%ld", pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); @@ -558,7 +558,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { } SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); do { char *eventLog = snapshotReceiver2SimpleStr(pReceiver, "snapshot receiver finish"); diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index 3ee2b458d6..6fd6944273 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -36,9 +36,9 @@ void cleanup() { walCleanUp(); } void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { SyncIndex beginIndex = SYNC_INDEX_INVALID; - if (pFsm->FpGetSnapshot != NULL) { + if (pFsm->FpGetSnapshotInfo != NULL) { SSnapshot snapshot; - pFsm->FpGetSnapshot(pFsm, &snapshot); + pFsm->FpGetSnapshotInfo(pFsm, &snapshot); beginIndex = snapshot.lastApplyIndex; } @@ -159,7 +159,7 @@ SSyncFSM* createFsm() { pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpRollBackCb = RollBackCb; - pFsm->FpGetSnapshot = GetSnapshotCb; + pFsm->FpGetSnapshotInfo = GetSnapshotCb; pFsm->FpRestoreFinishCb = RestoreFinishCb; pFsm->FpSnapshotStartRead = SnapshotStartRead; pFsm->FpSnapshotStopRead = SnapshotStopRead; diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index b1dc53d804..a185870a3b 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -35,9 +35,9 @@ void cleanup() { walCleanUp(); } void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { SyncIndex beginIndex = SYNC_INDEX_INVALID; - if (pFsm->FpGetSnapshot != NULL) { + if (pFsm->FpGetSnapshotInfo != NULL) { SSnapshot snapshot; - pFsm->FpGetSnapshot(pFsm, &snapshot); + pFsm->FpGetSnapshotInfo(pFsm, &snapshot); beginIndex = snapshot.lastApplyIndex; } @@ -90,7 +90,7 @@ SSyncFSM* createFsm() { pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpRollBackCb = RollBackCb; - pFsm->FpGetSnapshot = GetSnapshotCb; + pFsm->FpGetSnapshotInfo = GetSnapshotCb; pFsm->FpRestoreFinishCb = RestoreFinishCb; pFsm->FpReConfigCb = ReConfigCb; diff --git a/source/libs/sync/test/syncRaftLogTest2.cpp b/source/libs/sync/test/syncRaftLogTest2.cpp index 64e1da51a1..32ff441a6f 100644 --- a/source/libs/sync/test/syncRaftLogTest2.cpp +++ b/source/libs/sync/test/syncRaftLogTest2.cpp @@ -54,7 +54,7 @@ void init() { pSyncNode->pWal = pWal; pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); - pSyncNode->pFsm->FpGetSnapshot = GetSnapshotCb; + pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshotCb; } void cleanup() { diff --git a/source/libs/sync/test/syncRaftLogTest3.cpp b/source/libs/sync/test/syncRaftLogTest3.cpp index b47f8c96c5..c81a6d3ca9 100644 --- a/source/libs/sync/test/syncRaftLogTest3.cpp +++ b/source/libs/sync/test/syncRaftLogTest3.cpp @@ -54,7 +54,7 @@ void init() { pSyncNode->pWal = pWal; pSyncNode->pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM)); - pSyncNode->pFsm->FpGetSnapshot = GetSnapshotCb; + pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshotCb; } void cleanup() { @@ -80,7 +80,7 @@ void test1() { bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); @@ -146,7 +146,7 @@ void test2() { bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); @@ -203,7 +203,7 @@ void test3() { bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); @@ -268,7 +268,7 @@ void test4() { bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); @@ -335,7 +335,7 @@ void test5() { bool hasSnapshot = syncNodeHasSnapshot(pSyncNode); SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode); SyncTerm lastTerm = syncNodeGetLastTerm(pSyncNode); diff --git a/source/libs/sync/test/syncReplicateTest.cpp b/source/libs/sync/test/syncReplicateTest.cpp index d955e64f81..66f347b620 100644 --- a/source/libs/sync/test/syncReplicateTest.cpp +++ b/source/libs/sync/test/syncReplicateTest.cpp @@ -32,9 +32,9 @@ void cleanup() { walCleanUp(); } void CommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta) { SyncIndex beginIndex = SYNC_INDEX_INVALID; - if (pFsm->FpGetSnapshot != NULL) { + if (pFsm->FpGetSnapshotInfo != NULL) { SSnapshot snapshot; - pFsm->FpGetSnapshot(pFsm, &snapshot); + pFsm->FpGetSnapshotInfo(pFsm, &snapshot); beginIndex = snapshot.lastApplyIndex; } @@ -75,7 +75,7 @@ SSyncFSM* createFsm() { pFsm->FpCommitCb = CommitCb; pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpRollBackCb = RollBackCb; - pFsm->FpGetSnapshot = GetSnapshotCb; + pFsm->FpGetSnapshotInfo = GetSnapshotCb; return pFsm; } diff --git a/source/libs/sync/test/syncSnapshotSenderTest.cpp b/source/libs/sync/test/syncSnapshotSenderTest.cpp index 404ba2acae..04a02460af 100644 --- a/source/libs/sync/test/syncSnapshotSenderTest.cpp +++ b/source/libs/sync/test/syncSnapshotSenderTest.cpp @@ -40,7 +40,7 @@ SSyncSnapshotSender* createSender() { pSyncNode->pFsm->FpSnapshotStartRead = SnapshotStartRead; pSyncNode->pFsm->FpSnapshotStopRead = SnapshotStopRead; pSyncNode->pFsm->FpSnapshotDoRead = SnapshotDoRead; - pSyncNode->pFsm->FpGetSnapshot = GetSnapshot; + pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshot; SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, 2); pSender->start = true; diff --git a/source/libs/sync/test/syncSnapshotTest.cpp b/source/libs/sync/test/syncSnapshotTest.cpp index 820500e2d8..954bdd8ea5 100644 --- a/source/libs/sync/test/syncSnapshotTest.cpp +++ b/source/libs/sync/test/syncSnapshotTest.cpp @@ -35,9 +35,9 @@ const char *pWalDir = "./syncSnapshotTest_wal"; void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { SyncIndex beginIndex = SYNC_INDEX_INVALID; - if (pFsm->FpGetSnapshot != NULL) { + if (pFsm->FpGetSnapshotInfo != NULL) { SSnapshot snapshot; - pFsm->FpGetSnapshot(pFsm, &snapshot); + pFsm->FpGetSnapshotInfo(pFsm, &snapshot); beginIndex = snapshot.lastApplyIndex; } @@ -79,7 +79,7 @@ void initFsm() { pFsm->FpCommitCb = CommitCb; pFsm->FpPreCommitCb = PreCommitCb; pFsm->FpRollBackCb = RollBackCb; - pFsm->FpGetSnapshot = GetSnapshotCb; + pFsm->FpGetSnapshotInfo = GetSnapshotCb; } SSyncNode *syncNodeInit() { diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index 1eac372bd4..74dfaa192a 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -172,7 +172,7 @@ SSyncFSM* createFsm() { pFsm->FpRollBackCb = RollBackCb; pFsm->FpReConfigCb = ReConfigCb; - pFsm->FpGetSnapshot = GetSnapshotCb; + pFsm->FpGetSnapshotInfo = GetSnapshotCb; pFsm->FpRestoreFinishCb = RestoreFinishCb; pFsm->FpSnapshotStartRead = SnapshotStartRead; diff --git a/tests/pytest/util/common.py b/tests/pytest/util/common.py index 7b00e6f331..e158f8edd2 100644 --- a/tests/pytest/util/common.py +++ b/tests/pytest/util/common.py @@ -11,13 +11,20 @@ # -*- coding: utf-8 -*- +from collections import defaultdict import random import string -from util.sql import tdSql -from util.dnodes import tdDnodes import requests import time import socket + +import taos +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * + class TDCom: def init(self, conn, logSql): tdSql.init(conn.cursor(), logSql) @@ -170,4 +177,122 @@ class TDCom: def close(self): self.cursor.close() + def create_database(self,tsql, dbName='test',dropFlag=1,precision="ms", **kwargs): + if dropFlag == 1: + tsql.execute("drop database if exists %s"%(dbName)) + ''' + vgroups replica precision strict wal fsync comp cachelast single_stable buffer pagesize pages minrows maxrows duration keep retentions + ''' + sqlString = f'create database if not exists {dbName} precision "{precision}" vgroups 4' + if len(kwargs) > 0: + dbParams = "" + for param, value in kwargs.items(): + dbParams += f'{param} {value} ' + sqlString += f'{dbParams}' + + tsql.execute(sqlString) + tdLog.debug("complete to create database %s"%(dbName)) + return + + def create_stable(self,tsql, dbName,stbName,columnDict,tagDict): + colSchema = '' + for i in range(columnDict['int']): + colSchema += ', c%d int'%i + tagSchema = '' + for i in range(tagDict['int']): + if i > 0: + tagSchema += ',' + tagSchema += 't%d int'%i + + tsql.execute("create table if not exists %s.%s (ts timestamp %s) tags(%s)"%(dbName, stbName, colSchema, tagSchema)) + tdLog.debug("complete to create %s.%s" %(dbName, stbName)) + return + + def create_ctables(self,tsql, dbName,stbName,ctbNum,tagDict): + tsql.execute("use %s" %dbName) + tagsValues = '' + for i in range(tagDict['int']): + if i > 0: + tagsValues += ',' + tagsValues += '%d'%i + + pre_create = "create table" + sql = pre_create + #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) + for i in range(ctbNum): + sql += " %s_%d using %s tags(%s)"%(stbName,i,stbName,tagsValues) + if (i > 0) and (i%100 == 0): + tsql.execute(sql) + sql = pre_create + if sql != pre_create: + tsql.execute(sql) + + tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) + return + + def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=0): + tdLog.debug("start to insert data ............") + tsql.execute("use %s" %dbName) + pre_insert = "insert into " + sql = pre_insert + if startTs == 0: + t = time.time() + startTs = int(round(t * 1000)) + #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) + for i in range(ctbNum): + sql += " %s_%d values "%(stbName,i) + for j in range(rowsPerTbl): + sql += "(%d, %d, %d)"%(startTs + j, j, j) + if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)): + tsql.execute(sql) + if j < rowsPerTbl - 1: + sql = "insert into %s_%d values " %(stbName,i) + else: + sql = "insert into " + #end sql + if sql != pre_insert: + #print("insert sql:%s"%sql) + tsql.execute(sql) + tdLog.debug("insert data ............ [OK]") + return + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files or "taosd.exe" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def getClientCfgPath(self): + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + cfgPath = buildPath + "/../sim/psim/cfg" + tdLog.info("cfgPath: %s" % cfgPath) + return cfgPath + + def newcur(self,host='localhost',port=6030,user='root',password='taosdata'): + cfgPath = self.getClientCfgPath() + con=taos.connect(host=host, user=user, password=password, config=cfgPath, port=port) + cur=con.cursor() + print(cur) + return cur + + def newTdSql(self, host='localhost',port=6030,user='root',password='taosdata'): + newTdSql = TDSql() + cur = self.newcur(host=host,port=port,user=user,password=password) + newTdSql.init(cur, False) + return newTdSql + tdCom = TDCom() diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index a8ec0f5151..720625a570 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -73,6 +73,7 @@ ./test.sh -f tsim/stream/basic1.sim ./test.sh -f tsim/stream/basic2.sim # ./test.sh -f tsim/stream/distributeInterval0.sim +# ./test.sh -f tsim/stream/distributesession0.sim # ./test.sh -f tsim/stream/session0.sim # ./test.sh -f tsim/stream/session1.sim # ./test.sh -f tsim/stream/state0.sim diff --git a/tests/script/tsim/stream/distributesession0.sim b/tests/script/tsim/stream/distributesession0.sim new file mode 100644 index 0000000000..78f65ed8a3 --- /dev/null +++ b/tests/script/tsim/stream/distributesession0.sim @@ -0,0 +1,58 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 + +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +sql create dnode $hostname2 port 7200 + +system sh/exec.sh -n dnode2 -s start + +sql create database test vgroups 4; +sql use test; +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create table ts2 using st tags(2,2,2); +sql create stream stream_t1 trigger at_once into streamtST as select _wstartts, count(*) c1, sum(a) c2 , max(b) c3 from st session(ts, 10s) ; + +sleep 1000 + +sql insert into ts1 values(1648791211000,1,1,1) (1648791211005,1,1,1); +sql insert into ts2 values(1648791221004,1,2,3) (1648791221008,2,2,3); +sql insert into ts1 values(1648791211005,1,1,1); +sql insert into ts2 values(1648791221006,5,5,5) (1648791221007,5,5,5); +sql insert into ts2 values(1648791221008,5,5,5) (1648791221008,5,5,5)(1648791221006,5,5,5); +sql insert into ts1 values(1648791231000,1,1,1) (1648791231002,1,1,1) (1648791231006,1,1,1); +sql insert into ts1 values(1648791211000,6,6,6) (1648791231002,2,2,2); +sql insert into ts1 values(1648791211002,7,7,7); +sql insert into ts1 values(1648791211002,7,7,7) ts2 values(1648791221008,5,5,5) ; + +$loop_count = 0 +loop1: +sql select * from streamtST; + +sleep 300 +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $data01 != 10 then + print =====data01=$data01 + goto loop1 +endi + +if $data02 != 34 then + print =====data02=$data02 + goto loop1 +endi + +if $data03 != 7 then + print ======$data03 + return -1 +endi + +system sh/stop_dnodes.sh \ No newline at end of file diff --git a/tests/system-test/2-query/Today.py b/tests/system-test/2-query/Today.py index 9eb06de9fb..a9c3215a12 100644 --- a/tests/system-test/2-query/Today.py +++ b/tests/system-test/2-query/Today.py @@ -4,394 +4,163 @@ from util.dnodes import * from util.log import * from util.sql import * from util.cases import * +import datetime class TDTestCase: - updatecfgDict = {'rpcDebugFlag': '143'} def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor()) + self.today_date = datetime.datetime.strptime( + datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d") + self.time_unit = ['b','u','a','s','m','h','d','w'] + self.error_param = ['1.5','abc','!@#','"abc"','today()'] + self.arithmetic_operators = ['+','-','*','/'] + self.relational_operator = ['<','<=','=','>=','>'] + # prepare data + self.ntbname = 'ntb' + self.stbname = 'stb' + self.column_dict = { + 'ts':'timestamp', + 'c1':'int', + 'c2':'float', + 'c3':'double', + 'c4':'timestamp' + } + self.tag_dict = { + 't0':'int' + } + self.tbnum = 2 + self.tag_values = [ + f'10', + f'100' + ] + self.values_list = [f'now,1,1.55,100.555555,today()', + f'now+1d,10,11.11,99.999999,now()', + f'today(),3,3.333,333.333333,now()', + f'today()-1d,10,11.11,99.999999,now()', + f'today()+1d,1,1.55,100.555555,today()'] + + def set_create_normaltable_sql(self, ntbname, column_dict): + column_sql = '' + for k, v in column_dict.items(): + column_sql += f"{k} {v}," + create_ntb_sql = f'create table {ntbname} ({column_sql[:-1]})' + return create_ntb_sql + def set_create_stable_sql(self,stbname,column_dict,tag_dict): + column_sql = '' + tag_sql = '' + for k,v in column_dict.items(): + column_sql += f"{k} {v}," + for k,v in tag_dict.items(): + tag_sql += f"{k} {v}," + create_stb_sql = f'create table {stbname} ({column_sql[:-1]}) tags({tag_sql[:-1]})' + return create_stb_sql + def data_check(self,column_dict={},tbname = '',values_list = [],tb_num = 1,tb = 'tb'): + for k,v in column_dict.items(): + num_up = 0 + num_down = 0 + num_same = 0 + if v.lower() == 'timestamp': + tdSql.query(f'select {k} from {tbname}') + for i in tdSql.queryResult: + if i[0] > self.today_date: + num_up += 1 + elif i[0] == self.today_date: + num_same += 1 + elif i[0] < self.today_date: + num_down += 1 + tdSql.query(f"select today() from {tbname}") + tdSql.checkRows(len(values_list)*tb_num) + tdSql.checkData(0, 0, str(self.today_date)) + tdSql.query(f"select * from {tbname} where {k}=today()") + if tb == 'tb': + tdSql.checkRows(num_same*tb_num) + elif tb == 'stb': + tdSql.checkRows(num_same) + for i in [f'{tbname}',f'db.{tbname}']: + for unit in self.time_unit: + for symbol in ['+','-']: + tdSql.query(f"select today() {symbol}1{unit} from {i}") + tdSql.checkRows(len(values_list)*tb_num) + for unit in self.error_param: + for symbol in self.arithmetic_operators: + tdSql.error(f'select today() {symbol}{unit} from {i}') + for symbol in self.arithmetic_operators: + tdSql.query(f'select now(){symbol}null from {i}') + tdSql.checkData(0,0,None) + for symbol in self.relational_operator: + tdSql.query(f'select * from {i} where {k} {symbol} today()') + if symbol == '<' : + if tb == 'tb': + tdSql.checkRows(num_down*tb_num) + elif tb == 'stb': + tdSql.checkRows(num_down) + elif symbol == '<=': + if tb == 'tb': + tdSql.checkRows((num_same+num_down)*tb_num) + elif tb == 'stb': + tdSql.checkRows(num_same+num_down) + elif symbol == '=': + if tb == 'tb': + tdSql.checkRows(num_same*tb_num) + elif tb == 'stb': + tdSql.checkRows(num_same) + elif symbol == '>=': + if tb == 'tb': + tdSql.checkRows((num_up + num_same)*tb_num) + elif tb == 'stb': + tdSql.checkRows(num_up + num_same) + elif symbol == '>': + if tb == 'tb': + tdSql.checkRows(num_up*tb_num) + elif tb == 'stb': + tdSql.checkRows(num_up) + tdSql.query(f"select today()/0 from {tbname}") + tdSql.checkRows(len(values_list)*tb_num) + tdSql.checkData(0,0,None) + tdSql.query(f"select {k} from {tbname} where {k}=today()") + if tb == 'tb': + tdSql.checkRows(num_same*tb_num) + for i in range(num_same*tb_num): + tdSql.checkData(i, 0, str(self.today_date)) + elif tb == 'stb': + tdSql.checkRows(num_same) + for i in range(num_same): + tdSql.checkData(i, 0, str(self.today_date)) + def today_check_ntb(self): + tdSql.prepare() + tdSql.execute(self.set_create_normaltable_sql(self.ntbname,self.column_dict)) + for i in self.values_list: + tdSql.execute( + f'insert into {self.ntbname} values({i})') + self.data_check(self.column_dict,self.ntbname,self.values_list) + tdSql.execute('drop database db') + def today_check_stb_tb(self): + tdSql.prepare() + tdSql.execute(self.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict)) + for i in range(self.tbnum): + tdSql.execute(f'create table if not exists {self.stbname}_{i} using {self.stbname} tags({self.tag_values[i]})') + for j in self.values_list: + tdSql.execute(f'insert into {self.stbname}_{i} values ({j})') + # check child table + for i in range(self.tbnum): + self.data_check(self.column_dict,f'{self.stbname}_{i}',self.values_list) + # check stable + self.data_check(self.column_dict,self.stbname,self.values_list,self.tbnum,'stb') + tdSql.execute('drop database db') def run(self): # sourcery skip: extract-duplicate-method - # for func now() , today(), timezone() - tdSql.prepare() - today_date = datetime.datetime.strptime( - datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d") + + tdLog.printNoPrefix("==========check today() for normal table ==========") + self.today_check_ntb() + tdLog.printNoPrefix("==========check today() for stable and child table==========") + self.today_check_stb_tb() - tdLog.printNoPrefix("==========step1:create tables==========") - tdSql.execute( - '''create table if not exists ntb - (ts timestamp, c1 int, c2 float,c3 double,c4 timestamp) - ''' - ) - tdSql.execute( - '''create table if not exists stb - (ts timestamp, c1 int, c2 float,c3 double,c4 timestamp) tags(t0 int) - ''' - ) - tdSql.execute( - '''create table if not exists stb_1 using stb tags(100) - ''' - ) - tdLog.printNoPrefix("==========step2:insert data into ntb==========") - tdSql.execute( - 'insert into ntb values(now,1,1.55,100.555555,today())("2020-1-1 00:00:00",10,11.11,99.999999,now())(today(),3,3.333,333.333333,now())') - tdSql.execute( - 'insert into stb_1 values(now,1,1.55,100.555555,today())("2020-1-1 00:00:00",10,11.11,99.999999,now())(today(),3,3.333,333.333333,now())') - tdLog.printNoPrefix("==========step2:query test of ntb ==========") - - # test function today() - # normal table - tdSql.query("select today() from ntb") - tdSql.checkRows(3) - tdSql.checkData(0, 0, str(today_date)) - tdSql.query("select today() from db.ntb") - tdSql.checkRows(3) - tdSql.checkData(0, 0, str(today_date)) - tdSql.query("select today() +1w from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1w from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1d from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1d from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1h from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1h from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1m from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1m from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1s from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1s from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1a from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1a from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1u from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1u from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1b from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1b from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1w from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1w from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1d from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1d from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1h from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1h from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1m from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1m from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1s from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1s from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1a from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1a from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1u from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1u from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1b from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1b from db.ntb") - tdSql.checkRows(3) - tdSql.query("select * from ntb where ts=today()") - tdSql.checkRows(1) - tdSql.checkData(0, 1, 3) - tdSql.query("select * from ntb where ts<=today()") - tdSql.checkRows(2) - tdSql.checkData(0, 1, 10) - # for bug - # tdSql.query("select * from ntb where ts=today()") - tdSql.checkRows(2) - tdSql.checkData(0, 1, 3) - # tdSql.query("select * from ntb where ts>today()") - # tdSql.checkRows(1) - tdSql.query("select c4 from ntb where c4=today()") - tdSql.checkRows(1) - tdSql.checkData(0, 0, str(today_date)) - tdSql.query("select today() from ntb where ts=today()") - # tdSql.checkRows(2) - # tdSql.query("select * from ntb where ts>today()") - # tdSql.checkRows(1) - tdSql.query("select c4 from stb where c4=today()") - tdSql.checkRows(1) - tdSql.checkData(0, 0, str(today_date)) - tdSql.query("select today() from stb where ts=today()") - tdSql.checkRows(2) - tdSql.query("select * from ntb where ts>today()") - tdSql.checkRows(1) - tdSql.query("select c4 from stb_1 where c4=today()") - tdSql.checkRows(1) - tdSql.query("select today() from stb_1 where ts 2022-01-01 00:00:00.000 + 'event':'', + 'columnDict': {'int':2}, + 'tagDict': {'int':1} + } + + cdbName = 'cdb' + # some parameter to consumer processor + consumerId = 0 + expectrowcnt = 0 + topicList = '' + ifcheckdata = 0 + ifManualCommit = 1 + groupId = 'group.id:cgrp1' + autoCommit = 'enable.auto.commit:false' + autoCommitInterval = 'auto.commit.interval.ms:1000' + autoOffset = 'auto.offset.reset:earliest' + + pollDelay = 20 + showMsg = 1 + showRow = 1 + + hostname = socket.gethostname() + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + logSql = False + tdSql.init(conn.cursor(), logSql) + + def tmqCase12(self): + tdLog.printNoPrefix("======== test case 12: ") + tdLog.info("step 1: create database, stb, ctb and insert data") + + tmqCom.initConsumerTable(self.cdbName) + + tdCom.create_database(tdSql,self.paraDict["dbName"],self.paraDict["dropFlag"], self.paraDict['precision']) + + self.paraDict["stbName"] = 'stb1' + tdCom.create_stable(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["columnDict"],self.paraDict["tagDict"]) + tdCom.create_ctables(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["tagDict"]) + tdCom.insert_data(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"]) + + self.paraDict["stbName"] = 'stb2' + tdCom.create_stable(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["columnDict"],self.paraDict["tagDict"]) + tdCom.create_ctables(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["tagDict"]) + tdCom.insert_data(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"]) + + tdLog.info("create topics from db") + topicName1 = 'topic_%s'%(self.paraDict['dbName']) + tdSql.execute("create topic %s as database %s" %(topicName1, self.paraDict['dbName'])) + + topicList = topicName1 + keyList = '%s,%s,%s,%s'%(self.groupId,self.autoCommit,self.autoCommitInterval,self.autoOffset) + self.expectrowcnt = self.paraDict["rowsPerTbl"] * self.paraDict["ctbNum"] * 2 + tmqCom.insertConsumerInfo(self.consumerId, self.expectrowcnt,topicList,keyList,self.ifcheckdata,self.ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(self.pollDelay,self.paraDict["dbName"],self.showMsg, self.showRow,self.cdbName) + + tdLog.info("After waiting for a period of time, drop one stable") + time.sleep(10) + tdSql.execute("drop table %s.%s" %(self.paraDict['dbName'], self.paraDict['stbName'])) + + tdLog.info("wait result from consumer, then check it") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + if not (totalConsumeRows >= self.expectrowcnt/2 and totalConsumeRows <= self.expectrowcnt): + tdLog.info("act consume rows: %d, expect consume rows: between %d and %d"%(totalConsumeRows, self.expectrowcnt/2, self.expectrowcnt)) + tdLog.exit("tmq consume rows error!") + + time.sleep(15) + tdSql.query("drop topic %s"%topicName1) + + tdLog.printNoPrefix("======== test case 12 end ...... ") + + def run(self): + tdSql.prepare() + self.tmqCase12() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/subscribeStb4.py b/tests/system-test/7-tmq/subscribeStb4.py index d8a9ca95b0..f3982c8f1f 100644 --- a/tests/system-test/7-tmq/subscribeStb4.py +++ b/tests/system-test/7-tmq/subscribeStb4.py @@ -196,16 +196,16 @@ class TDTestCase: auotCtbPrefix = 'autoCtb' # create and start thread - parameterDict = {'cfg': '', \ - 'actionType': 0, \ - 'dbName': 'db1', \ - 'dropFlag': 1, \ - 'vgroups': 4, \ - 'replica': 1, \ - 'stbName': 'stb1', \ - 'ctbNum': 10, \ - 'rowsPerTbl': 10000, \ - 'batchNum': 100, \ + parameterDict = {'cfg': '', + 'actionType': 0, + 'dbName': 'db1', + 'dropFlag': 1, + 'vgroups': 4, + 'replica': 1, + 'stbName': 'stb1', + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 100, 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath @@ -349,7 +349,5 @@ class TDTestCase: tdSql.close() tdLog.success(f"{__file__} successfully executed") -event = threading.Event() - tdCases.addLinux(__file__, TDTestCase()) tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py new file mode 100644 index 0000000000..68adaa4126 --- /dev/null +++ b/tests/system-test/7-tmq/tmqCommon.py @@ -0,0 +1,118 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +from collections import defaultdict +import random +import string +import threading +import requests +import time +# import socketfrom + +import taos +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * + +# class actionType(Enum): +# CREATE_DATABASE = 0 +# CREATE_STABLE = 1 +# CREATE_CTABLE = 2 +# INSERT_DATA = 3 + +class TMQCom: + def init(self, conn, logSql): + tdSql.init(conn.cursor()) + # tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def initConsumerTable(self,cdbName='cdb'): + tdLog.info("create consume database, and consume info table, and consume result table") + tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) + tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) + tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) + + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + def initConsumerInfoTable(self,cdbName='cdb'): + tdLog.info("drop consumeinfo table") + tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) + + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + sql = "insert into %s.consumeinfo values "%cdbName + sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) + tdLog.info("consume info sql: %s"%sql) + tdSql.query(sql) + + def selectConsumeResult(self,expectRows,cdbName='cdb'): + resultList=[] + while 1: + tdSql.query("select * from %s.consumeresult"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == expectRows: + break + else: + time.sleep(5) + + for i in range(expectRows): + tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) + resultList.append(tdSql.getData(i , 3)) + + return resultList + + def startTmqSimProcess(self,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): + buildPath = tdCom.getBuildPath() + cfgPath = tdCom.getClientCfgPath() + if valgrind == 1: + logFile = cfgPath + '/../log/valgrind-tmq.log' + shellCmd = 'nohup valgrind --log-file=' + logFile + shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' + + if (platform.system().lower() == 'windows'): + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" + else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" + tdLog.info(shellCmd) + os.system(shellCmd) + + def syncCreateDbStbCtbInsertData(self, tsql, paraDict): + tdCom.create_database(tsql, paraDict["dbName"],paraDict["dropFlag"], paraDict['precision']) + tdCom.create_stable(tsql, paraDict["dbName"],paraDict["stbName"], paraDict["columnDict"], paraDict["tagDict"]) + tdCom.create_ctables(tsql, paraDict["dbName"],paraDict["stbName"],paraDict["ctbNum"],paraDict["tagDict"]) + if "event" in paraDict and type(paraDict['event']) == type(threading.Event()): + paraDict["event"].set() + tdCom.insert_data(tsql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]) + return + + def threadFunction(self, **paraDict): + # create new connector for new tdSql instance in my thread + newTdSql = tdCom.newTdSql() + self.syncCreateDbStbCtbInsertData(self, newTdSql, paraDict) + return + + def asyncCreateDbStbCtbInsertData(self, paraDict): + pThread = threading.Thread(target=self.threadFunction, kwargs=paraDict) + pThread.start() + return pThread + + def close(self): + self.cursor.close() + +tmqCom = TMQCom()