diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 110d9af45c..dedc06a2b9 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -55,8 +55,12 @@ extern int32_t tMsgDict[]; #define TMSG_SEG_CODE(TYPE) (((TYPE)&0xff00) >> 8) #define TMSG_SEG_SEQ(TYPE) ((TYPE)&0xff) -#define TMSG_INFO(TYPE) \ - (((TYPE) >= 0 && (TYPE) < TDMT_MAX) ? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] : 0) +#define TMSG_INFO(TYPE) \ + ((TYPE) >= 0 && \ + ((TYPE) < TDMT_DND_MAX_MSG | (TYPE) < TDMT_MND_MAX_MSG | (TYPE) < TDMT_VND_MAX_MSG | (TYPE) < TDMT_SCH_MAX_MSG | \ + (TYPE) < TDMT_STREAM_MAX_MSG | (TYPE) < TDMT_MON_MAX_MSG | (TYPE) < TDMT_SYNC_MAX_MSG)) \ + ? tMsgInfo[tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)] \ + : 0 #define TMSG_INDEX(TYPE) (tMsgDict[TMSG_SEG_CODE(TYPE)] + TMSG_SEG_SEQ(TYPE)) typedef uint16_t tmsg_t; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 371e079913..2130a9c264 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -82,6 +82,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_DND_NET_TEST, "net-test", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "config-dnode", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_DND_SYSTABLE_RETRIEVE, "dnode-retrieve", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_DND_MAX_MSG, "dnd-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_MND_MSG) TD_DEF_MSG_TYPE(TDMT_MND_CONNECT, "connect", NULL, NULL) @@ -164,6 +165,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_SPLIT_VGROUP, "split-vgroup", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SHOW_VARIABLES, "show-variables", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SERVER_VERSION, "server-version", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_MSG) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT, "submit", SSubmitReq, SSubmitRsp) @@ -198,6 +200,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_ALTER_HASHRANGE, "alter-hashrange", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TTL_TABLE, "drop-ttl-stb", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_SCH_MSG) TD_DEF_MSG_TYPE(TDMT_SCH_QUERY, "query", NULL, NULL) @@ -209,6 +212,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_SCH_DROP_TASK, "drop-task", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "explain", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SCH_LINK_BROKEN, "link-broken", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SCH_MAX_MSG, "sch-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_STREAM_MSG) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DEPLOY, "stream-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) @@ -217,6 +221,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RECOVER, "stream-task-recover", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_MON_MSG) TD_DEF_MSG_TYPE(TDMT_MON_MM_INFO, "monitor-minfo", NULL, NULL) @@ -227,6 +232,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MON_VM_LOAD, "monitor-vload", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MON_MM_LOAD, "monitor-mload", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MON_QM_LOAD, "monitor-qload", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MON_MAX_MSG, "monitor-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_SYNC_MSG) TD_DEF_MSG_TYPE(TDMT_SYNC_TIMEOUT, "sync-timer", NULL, NULL) @@ -251,6 +257,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_SYNC_LEADER_TRANSFER, "sync-leader-transfer", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_SET_MNODE_STANDBY, "set-mnode-standby", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_SET_VNODE_STANDBY, "set-vnode-standby", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_MAX_MSG, "sync-max", NULL, NULL) #if defined(TD_MSG_NUMBER_) TDMT_MAX diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index ff0c2df25e..0fe4274091 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -279,7 +279,6 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) { } pRequest->body.queryFp(pRequest->body.param, pRequest, code); - // pRequest->body.fetchFp(pRequest->body.param, pRequest, pResultInfo->numOfRows); } int32_t asyncExecDdlQuery(SRequestObj* pRequest, SQuery* pQuery) { diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 136ee34950..a30d60a589 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -665,8 +665,6 @@ static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) { } void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { - tscDebug("enter meta callback, code %s", tstrerror(code)); - SqlParseWrapper *pWrapper = (SqlParseWrapper *)param; SQuery * pQuery = pWrapper->pQuery; SRequestObj * pRequest = pWrapper->pRequest; @@ -686,10 +684,11 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) { TSWAP(pRequest->tableList, (pQuery)->pTableList); destorySqlParseWrapper(pWrapper); + + tscDebug("0x%"PRIx64" analysis semantics completed, start async query, reqId:0x%"PRIx64, pRequest->self, pRequest->requestId); launchAsyncQuery(pRequest, pQuery, pResultMeta); } else { destorySqlParseWrapper(pWrapper); - tscDebug("error happens, code:%d", code); if (NEED_CLIENT_HANDLE_ERROR(code)) { tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c index 613f3fb994..90f852eed1 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c @@ -32,8 +32,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) { if (pVnode && num < size) { int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); // dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); - pVnodes[num] = (*ppVnode); - num++; + pVnodes[num++] = (*ppVnode); pIter = taosHashIterate(pMgmt->hash, pIter); } else { taosHashCancelIterate(pMgmt->hash, pIter); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 681440dec4..dc1bcbd258 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -88,7 +88,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); - dTrace("vgId:%d, vnode-fetch queue is empty", pVnode->vgId); + dTrace("vgId:%d, vnode queue is empty", pVnode->vgId); vmFreeQueue(pMgmt, pVnode); vnodeClose(pVnode->pImpl); @@ -140,7 +140,7 @@ static void *vmOpenVnodeInThread(void *param) { } static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { - pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (pMgmt->hash == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; dError("failed to init vnode hash since %s", terrstr()); @@ -156,7 +156,8 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { pMgmt->state.totalVnodes = numOfVnodes; - int32_t threadNum = 1; + int32_t threadNum = tsNumOfCores / 2; + if (threadNum < 1) threadNum = 1; int32_t vnodesPerThread = numOfVnodes / threadNum + 1; SVnodeThread *threads = taosMemoryCalloc(threadNum, sizeof(SVnodeThread)); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index ecd02ae8dc..beefc502e7 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -114,28 +114,6 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf } } -static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { - SVnodeObj *pVnode = pInfo->ahandle; - SRpcMsg *pMsg = NULL; - - for (int32_t i = 0; i < numOfMsgs; ++i) { - if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; - const STraceId *trace = &pMsg->info.traceId; - dGTrace("vgId:%d, msg:%p get from vnode-merge queue", pVnode->vgId, pMsg); - - int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); - if (code != 0) { - if (terrno != 0) code = terrno; - dGError("vgId:%d, msg:%p failed to merge since %s", pVnode->vgId, pMsg, terrstr()); - vmSendRsp(pMsg, code); - } - - dGTrace("msg:%p, is freed, code:0x%x", pMsg, code); - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); - } -} - static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtype) { const STraceId *trace = &pMsg->info.traceId; SMsgHead *pHead = pMsg->pCont; @@ -207,7 +185,11 @@ int32_t vmPutMsgToMonitorQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); - if (pMsg == NULL) return -1; + if (pMsg == NULL) { + rpcFreeCont(pMsg->pCont); + pRpc->pCont = NULL; + return -1; + } SMsgHead *pHead = pRpc->pCont; dTrace("vgId:%d, msg:%p is created, type:%s", pHead->vgId, pMsg, TMSG_INFO(pRpc->msgType)); @@ -215,7 +197,16 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { pHead->contLen = htonl(pHead->contLen); pHead->vgId = htonl(pHead->vgId); memcpy(pMsg, pRpc, sizeof(SRpcMsg)); - return vmPutMsgToQueue(pMgmt, pMsg, qtype); + + int32_t code = vmPutMsgToQueue(pMgmt, pMsg, qtype); + if (code != 0) { + dTrace("msg:%p, is freed", pMsg); + taosFreeQitem(pMsg); + rpcFreeCont(pMsg->pCont); + pRpc->pCont = NULL; + } + + return code; } int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index ee27f27f06..d70ed09920 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -212,6 +212,7 @@ void dmCleanupDnode(SDnode *pDnode) { dmCleanupClient(pDnode); dmCleanupServer(pDnode); dmClearVars(pDnode); + rpcCleanup(); dDebug("dnode is closed, ptr:%p", pDnode); } diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index df3c9c4e88..7043991525 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -71,9 +71,9 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { } static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { - SDnodeTrans * pTrans = &pDnode->trans; + SDnodeTrans *pTrans = &pDnode->trans; int32_t code = -1; - SRpcMsg * pMsg = NULL; + SRpcMsg *pMsg = NULL; SMgmtWrapper *pWrapper = NULL; SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pRpc->msgType)]; @@ -185,6 +185,7 @@ _OVER: taosFreeQitem(pMsg); } rpcFreeCont(pRpc->pCont); + pRpc->pCont = NULL; } dmReleaseWrapper(pWrapper); @@ -195,11 +196,11 @@ int32_t dmInitMsgHandle(SDnode *pDnode) { for (EDndNodeType ntype = DNODE; ntype < NODE_END; ++ntype) { SMgmtWrapper *pWrapper = &pDnode->wrappers[ntype]; - SArray * pArray = (*pWrapper->func.getHandlesFp)(); + SArray *pArray = (*pWrapper->func.getHandlesFp)(); if (pArray == NULL) return -1; for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { - SMgmtHandle * pMgmt = taosArrayGet(pArray, i); + SMgmtHandle *pMgmt = taosArrayGet(pArray, i); SDnodeHandle *pHandle = &pTrans->msgHandles[TMSG_INDEX(pMgmt->msgType)]; if (pMgmt->needCheckVgId) { pHandle->needCheckVgId = pMgmt->needCheckVgId; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 3a3331a0b3..85f1ce6843 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -739,9 +739,12 @@ int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDbInput, SVgObj *pVgroup) { pDb = mndAcquireDb(pMnode, pVgroup->dbName); } - int64_t vgroupMemroy = (int64_t)pDb->cfg.buffer * 1024 * 1024 + (int64_t)pDb->cfg.pages * pDb->cfg.pageSize * 1024; - if (pDb->cfg.cacheLastRow > 0) { - vgroupMemroy += (int64_t)pDb->cfg.lastRowMem * 1024 * 1024; + int64_t vgroupMemroy = 0; + if (pDb != NULL) { + vgroupMemroy = (int64_t)pDb->cfg.buffer * 1024 * 1024 + (int64_t)pDb->cfg.pages * pDb->cfg.pageSize * 1024; + if (pDb->cfg.cacheLastRow > 0) { + vgroupMemroy += (int64_t)pDb->cfg.lastRowMem * 1024 * 1024; + } } if (pDbInput == NULL) { diff --git a/source/libs/command/src/command.c b/source/libs/command/src/command.c index cd454c075b..fc3e3cbc8a 100644 --- a/source/libs/command/src/command.c +++ b/source/libs/command/src/command.c @@ -548,19 +548,17 @@ static int32_t execShowLocalVariables(SRetrieveTableRsp** pRsp) { } static int32_t createSelectResultDataBlock(SNodeList* pProjects, SSDataBlock** pOutput) { - SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + SSDataBlock* pBlock = createDataBlock(); if (NULL == pBlock) { return TSDB_CODE_OUT_OF_MEMORY; } - pBlock->pDataBlock = taosArrayInit(LIST_LENGTH(pProjects), sizeof(SColumnInfoData)); - SNode* pProj = NULL; FOREACH(pProj, pProjects) { SColumnInfoData infoData = {0}; infoData.info.type = ((SExprNode*)pProj)->resType.type; infoData.info.bytes = ((SExprNode*)pProj)->resType.bytes; - taosArrayPush(pBlock->pDataBlock, &infoData); + blockDataAppendColInfo(pBlock, &infoData); } *pOutput = pBlock; return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index f0f0361031..00f2e09e0c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -51,6 +51,13 @@ typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int #define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0) +#define START_TS_COLUMN_INDEX 0 +#define END_TS_COLUMN_INDEX 1 +#define UID_COLUMN_INDEX 2 +#define GROUPID_COLUMN_INDEX UID_COLUMN_INDEX +#define DELETE_GROUPID_COLUMN_INDEX 2 + + enum { // when this task starts to execute, this status will set TASK_NOT_COMPLETED = 0x1u, @@ -364,6 +371,8 @@ typedef struct SStreamBlockScanInfo { int32_t scanWinIndex; // for state operator int32_t pullDataResIndex; SSDataBlock* pPullDataRes; // pull data SSDataBlock + SSDataBlock* pDeleteDataRes; // delete data SSDataBlock + int32_t deleteDataIndex; } SStreamBlockScanInfo; typedef struct SSysTableScanInfo { @@ -429,6 +438,10 @@ typedef struct SIntervalAggOperatorInfo { bool invertible; SArray* pPrevValues; // SArray used to keep the previous not null value for interpolation. bool ignoreExpiredData; + SArray* pRecycledPages; + SArray* pDelWins; // SWinRes + int32_t delIndex; + SSDataBlock* pDelRes; } SIntervalAggOperatorInfo; typedef struct SStreamFinalIntervalOperatorInfo { @@ -451,6 +464,10 @@ typedef struct SStreamFinalIntervalOperatorInfo { int32_t pullIndex; SSDataBlock* pPullDataRes; bool ignoreExpiredData; + SArray* pRecycledPages; + SArray* pDelWins; // SWinRes + int32_t delIndex; + SSDataBlock* pDelRes; } SStreamFinalIntervalOperatorInfo; typedef struct SAggOperatorInfo { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 061b4ab3c5..f1965d4e68 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -807,18 +807,38 @@ static bool isStateWindow(SStreamBlockScanInfo* pInfo) { return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE; } +static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) { + uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t)); + if (groupId) { + return *groupId; + } + return 0; + /* Todo(liuyao) for partition by column + recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, rowId); + int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals); + uint64_t resId = 0; + uint64_t* groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len); + if (groupId) { + return *groupId; + } else if (len != 0) { + resId = calcGroupId(pTableScanInfo->keyBuf, len); + taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &resId, sizeof(uint64_t)); + } + return resId; + */ +} + static void setGroupId(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) { ASSERT(rowIndex < pBlock->info.rows); switch (pBlock->info.type) { + case STREAM_DELETE_DATA: case STREAM_RETRIEVE: { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex); uint64_t* groupCol = (uint64_t*)pColInfo->pData; pInfo->groupId = groupCol[rowIndex]; } break; - case STREAM_DELETE_DATA: - break; default: break; } @@ -840,14 +860,14 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3 int64_t gap = pInfo->sessionSup.gap; int32_t winIndex = 0; SResultWindowInfo* pCurWin = - getSessionTimeWindow(pAggSup, tsCols[(*pRowIndex)], INT64_MIN, pSDB->info.groupId, gap, &winIndex); + getSessionTimeWindow(pAggSup, tsCols[*pRowIndex], INT64_MIN, pSDB->info.groupId, gap, &winIndex); win = pCurWin->win; - (*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, (*pRowIndex), gap, NULL); + (*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL); } else { win = - getActiveTimeWindow(NULL, &dumyInfo, tsCols[(*pRowIndex)], &pInfo->interval, pInfo->interval.precision, NULL); - setGroupId(pInfo, pSDB, 2, *pRowIndex); - (*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, (*pRowIndex), win.ekey, binarySearchForKey, NULL, + getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, NULL); + setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); + (*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); } needRead = true; @@ -891,27 +911,6 @@ static void copyOneRow(SSDataBlock* dest, SSDataBlock* source, int32_t sourceRow dest->info.rows++; } -static uint64_t getGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t rowId) { - uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); - if (groupId) { - return *groupId; - } - return 0; - /* Todo(liuyao) for partition by column - recordNewGroupKeys(pTableScanInfo->pGroupCols, pTableScanInfo->pGroupColVals, pBlock, rowId); - int32_t len = buildGroupKeys(pTableScanInfo->keyBuf, pTableScanInfo->pGroupColVals); - uint64_t resId = 0; - uint64_t* groupId = taosHashGet(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len); - if (groupId) { - return *groupId; - } else if (len != 0) { - resId = calcGroupId(pTableScanInfo->keyBuf, len); - taosHashPut(pTableScanInfo->pGroupSet, pTableScanInfo->keyBuf, len, &resId, sizeof(uint64_t)); - } - return resId; - */ -} - static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { while (1) { SSDataBlock* pResult = NULL; @@ -935,7 +934,7 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, i SSDataBlock* pBlock = createOneDataBlock(pResult, true); blockDataCleanup(pResult); for (int32_t i = 0; i < pBlock->info.rows; i++) { - uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock, i); + uint64_t id = getGroupId(pInfo->pOperatorDumy, pBlock->info.uid); if (id == pInfo->groupId) { copyOneRow(pResult, pBlock, i); } @@ -944,6 +943,40 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, i */ } +static void copyDeleteDataBlock(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBlock, SOperatorInfo* pOperator, SSDataBlock* pUpdateRes) { + if (pDelBlock->info.rows == 0) { + return; + } + blockDataCleanup(pUpdateRes); + blockDataEnsureCapacity(pUpdateRes, 64); + ASSERT(taosArrayGetSize(pDelBlock->pDataBlock) >= 3); + SColumnInfoData* pStartTsCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX); + TSKEY* startData = (TSKEY*)pStartTsCol->pData; + SColumnInfoData* pEndTsCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX); + TSKEY* endData = (TSKEY*)pEndTsCol->pData; + SColumnInfoData* pGpCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX); + uint64_t* uidCol = (uint64_t*)pGpCol->pData; + + SColumnInfoData* pDestTsCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX); + SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, DELETE_GROUPID_COLUMN_INDEX); + for (int32_t i = pInfo->deleteDataIndex ; i < pDelBlock->info.rows && + i < pDelBlock->info.capacity - (endData[i] - startData[i])/pInfo->interval.interval - 1; i++) { + uint64_t groupId = getGroupId(pOperator, uidCol[i]); + for (TSKEY startTs = startData[i]; startTs <= endData[i]; ) { + colDataAppend(pDestTsCol, pUpdateRes->info.rows, (const char*)&startTs, false); + colDataAppend(pDestGpCol, pUpdateRes->info.rows, (const char*)&groupId, false); + pUpdateRes->info.rows++; + startTs = taosTimeAdd(startTs, pInfo->interval.interval, pInfo->interval.intervalUnit, pInfo->interval.precision); + } + pInfo->deleteDataIndex++; + } + + if (pInfo->deleteDataIndex > 0 && pInfo->deleteDataIndex == pDelBlock->info.rows) { + blockDataCleanup(pDelBlock); + pInfo->deleteDataIndex = 0; + } +} + static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) { blockDataCleanup(pUpdateBlock); int32_t size = taosArrayGetSize(pInfo->tsArray); @@ -953,11 +986,11 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa blockDataEnsureCapacity(pUpdateBlock, size); int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex); - pInfo->groupId = getGroupId(pInfo->pSnapshotReadOp, pBlock, rowId); + pInfo->groupId = getGroupId(pInfo->pSnapshotReadOp, pBlock->info.uid); int32_t i = 0; for (; i < size; i++) { rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex); - uint64_t id = getGroupId(pInfo->pSnapshotReadOp, pBlock, rowId); + uint64_t id = getGroupId(pInfo->pSnapshotReadOp, pBlock->info.uid); if (pInfo->groupId != id) { break; } @@ -974,28 +1007,32 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa if (size > 0 && pInfo->tsArrayIndex == size) { taosArrayClear(pInfo->tsArray); } + + if (size == 0) { + copyDeleteDataBlock(pInfo, pInfo->pDeleteDataRes, pInfo->pSnapshotReadOp, pUpdateBlock); + } } -static void getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, - SSDataBlock* pUpdateBlock) { +static void checkUpdateData(SStreamBlockScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, + bool out) { SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* ts = (TSKEY*)pColDataInfo->pData; for (int32_t rowId = 0; rowId < pBlock->info.rows; rowId++) { - if (updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, ts[rowId])) { + if (updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, ts[rowId]) && out) { taosArrayPush(pInfo->tsArray, &rowId); } } - if (!pUpdateBlock) { - taosArrayClear(pInfo->tsArray); - return; +} + +static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t uidColIndex) { + ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3); + SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, uidColIndex); + uint64_t* uidCol = (uint64_t*)pColDataInfo->pData; + ASSERT(pBlock->info.rows > 0); + for (int32_t i = 0 ; i < pBlock->info.rows; i++) { + uidCol[i] = getGroupId(pOperator, uidCol[i]); } - setUpdateData(pInfo, pBlock, pUpdateBlock); - // Todo(liuyao) get from tsdb - // SSDataBlock* p = createOneDataBlock(pBlock, true); - // p->info.type = STREAM_INVERT; - // taosArrayClear(pInfo->tsArray); - // return p; } static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { @@ -1020,13 +1057,29 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { int32_t current = pInfo->validBlockIndex++; SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); blockDataUpdateTsWindow(pBlock, 0); - if (pBlock->info.type == STREAM_RETRIEVE) { + switch (pBlock->info.type) { + case STREAM_RETRIEVE:{ pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE; copyDataBlock(pInfo->pPullDataRes, pBlock); pInfo->pullDataResIndex = 0; - prepareDataScan(pInfo, pInfo->pPullDataRes, 0, &pInfo->pullDataResIndex); + prepareDataScan(pInfo, pInfo->pPullDataRes, START_TS_COLUMN_INDEX, &pInfo->pullDataResIndex); updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo); + } + break; + case STREAM_DELETE_DATA: { + pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; + pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; + copyDataBlock(pInfo->pDeleteDataRes, pBlock); + copyDeleteDataBlock(pInfo, pInfo->pDeleteDataRes, pInfo->pSnapshotReadOp, pInfo->pUpdateRes); + pInfo->updateResIndex = 0; + prepareDataScan(pInfo, pInfo->pUpdateRes, START_TS_COLUMN_INDEX, &pInfo->updateResIndex); + pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA; + return pInfo->pUpdateRes; + } + break; + default: + break; } return pBlock; } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) { @@ -1043,39 +1096,33 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { } else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RETRIEVE) { SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pPullDataRes, 0, &pInfo->pullDataResIndex); if (pSDB != NULL) { - getUpdateDataBlock(pInfo, true, pSDB, NULL); + checkUpdateData(pInfo, true, pSDB, false); pSDB->info.type = STREAM_PULL_DATA; return pSDB; } pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; - } else { - if (isStateWindow(pInfo)) { - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; - pInfo->updateResIndex = pInfo->pUpdateRes->info.rows; - if (!prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex)) { - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - } + } else if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) { + SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); + if (pSDB) { + pSDB->info.type = STREAM_NORMAL; + checkUpdateData(pInfo, true, pSDB, false); + return pSDB; } - if (pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER) { - SSDataBlock* pSDB = doDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); - if (pSDB == NULL) { - setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes); - if (pInfo->pUpdateRes->info.rows > 0) { - if (!isStateWindow(pInfo)) { - // Todo(liuyao) mybe can delete this. - bool test = prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); - ASSERT(test == false); - } - return pInfo->pUpdateRes; - } else { - pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; - } - } else { - pSDB->info.type = STREAM_NORMAL; - getUpdateDataBlock(pInfo, true, pSDB, NULL); - return pSDB; - } + setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes); + if (pInfo->pUpdateRes->info.rows > 0) { + prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); + return pInfo->pUpdateRes; } + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; + } else if (isStateWindow(pInfo)) { + pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; + pInfo->updateResIndex = pInfo->pUpdateRes->info.rows; + if (prepareDataScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex)) { + ASSERT(pInfo->pUpdateRes->info.rows == 0); + // return empty data blcok + return pInfo->pUpdateRes; + } + pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; } SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; @@ -1169,7 +1216,8 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { pOperator->status = OP_EXEC_DONE; } else if (pInfo->pUpdateInfo) { pInfo->tsArrayIndex = 0; - getUpdateDataBlock(pInfo, true, pInfo->pRes, pInfo->pUpdateRes); + checkUpdateData(pInfo, true, pInfo->pRes, true); + setUpdateData(pInfo, pInfo->pRes, pInfo->pUpdateRes); if (pInfo->pUpdateRes->info.rows > 0) { if (pInfo->pUpdateRes->info.type == STREAM_CLEAR) { pInfo->updateResIndex = 0; @@ -1180,9 +1228,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { } } } - return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; - } else if (pInfo->blockType == STREAM_INPUT__DATA_SCAN) { // check reader last status // if not match, reset status @@ -1295,6 +1341,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->groupId = 0; pInfo->pPullDataRes = createPullDataBlock(); pInfo->pStreamScanOp = pOperator; + pInfo->deleteDataIndex = 0; + pInfo->pDeleteDataRes = createPullDataBlock(); pOperator->name = "StreamBlockScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; @@ -1748,8 +1796,8 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) { getPerfDbMeta(&pSysDbTableMeta, &size); p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB); - relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); pInfo->pRes->info.rows = p->info.rows; + relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock, false); blockDataDestroy(p); return pInfo->pRes->info.rows; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 03c939cc95..fdef432d95 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -808,11 +808,31 @@ static void removeResult(SArray* pUpdated, TSKEY key) { static void removeResults(SArray* pWins, SArray* pUpdated) { int32_t size = taosArrayGetSize(pWins); for (int32_t i = 0; i < size; i++) { - STimeWindow* pW = taosArrayGet(pWins, i); - removeResult(pUpdated, pW->skey); + SWinRes* pW = taosArrayGet(pWins, i); + removeResult(pUpdated, pW->ts); } } +int64_t getWinReskey(void* data, int32_t index) { + SArray* res = (SArray*)data; + SWinRes* pos = taosArrayGet(res, index); + return pos->ts; +} + +static void removeDeleteResults(SArray* pUpdated, SArray* pDelWins) { + int32_t upSize = taosArrayGetSize(pUpdated); + int32_t delSize = taosArrayGetSize(pDelWins); + for (int32_t i = 0; i < upSize; i++) { + SResKeyPos* pResKey = taosArrayGetP(pUpdated, i); + int64_t key = *(int64_t*)pResKey->key; + int32_t index = binarySearch(pDelWins, delSize, key, TSDB_ORDER_DESC, getWinReskey); + if (index >= 0 && key == getWinReskey(pDelWins, index)) { + taosArrayRemove(pDelWins, index); + } + } +} + + bool isOverdue(TSKEY ts, STimeWindowAggSupp* pSup) { ASSERT(pSup->maxTs == INT64_MIN || pSup->maxTs > 0); return pSup->maxTs != INT64_MIN && ts < pSup->maxTs - pSup->waterMark; @@ -1264,6 +1284,38 @@ bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t return true; } +bool doDeleteIntervalWindow(SAggSupporter* pAggSup, TSKEY ts, uint64_t groupId) { + size_t bytes = sizeof(TSKEY); + SET_RES_WINDOW_KEY(pAggSup->keyBuf, &ts, bytes, groupId); + SResultRowPosition* p1 = + (SResultRowPosition*)taosHashGet(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); + if (!p1) { + // window has been closed + return false; + } + SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, p1->pageId); + // dBufSetBufPageRecycled(pAggSup->pResultBuf, bufPage); + taosHashRemove(pAggSup->pResultRowHashTable, pAggSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); + return true; +} + +void doDeleteSpecifyIntervalWindow(SAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* pUpWins, SInterval* pInterval) { + SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + TSKEY* tsStarts = (TSKEY*)pStartCol->pData; + SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); + uint64_t* groupIds = (uint64_t*)pGroupCol->pData; + for (int32_t i = 0; i < pBlock->info.rows; i++) { + SResultRowInfo dumyInfo; + dumyInfo.cur.pageId = -1; + STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsStarts[i], pInterval, pInterval->precision, NULL); + doDeleteIntervalWindow(pAggSup, win.skey, groupIds[i]); + if (pUpWins) { + SWinRes winRes = {.ts = win.skey, .groupId = groupIds[i]}; + taosArrayPush(pUpWins, &winRes); + } + } +} + static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* pInterval, int32_t tsIndex, int32_t numOfOutput, SSDataBlock* pBlock, SArray* pUpWins) { SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, tsIndex); @@ -1279,13 +1331,11 @@ static void doClearWindows(SAggSupporter* pAggSup, SExprSupp* pSup1, SInterval* dumyInfo.cur.pageId = -1; STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[i], pInterval, pInterval->precision, NULL); step = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, i, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); - uint64_t groupId = pBlock->info.groupId; - if (pGpDatas) { - groupId = pGpDatas[i]; - } - bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), groupId, numOfOutput); + uint64_t winGpId = pGpDatas ? pGpDatas[i] : pBlock->info.groupId; + bool res = doClearWindow(pAggSup, pSup1, (char*)&win.skey, sizeof(TKEY), winGpId, numOfOutput); if (pUpWins && res) { - taosArrayPush(pUpWins, &win); + SWinRes winRes = {.ts = win.skey, .groupId = winGpId}; + taosArrayPush(pUpWins, &winRes); } } } @@ -1307,8 +1357,9 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) { return TSDB_CODE_SUCCESS; } -static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval, - SHashObj* pPullDataMap, SArray* closeWins) { +static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, + SInterval* pInterval, SHashObj* pPullDataMap, SArray* closeWins, + SArray* pRecyPages, SDiskbasedBuf* pDiscBuf) { void* pIte = NULL; size_t keyLen = 0; while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { @@ -1342,6 +1393,11 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, if (code != TSDB_CODE_SUCCESS) { return code; } + ASSERT(pRecyPages != NULL); + taosArrayPush(pRecyPages, &pPos->pageId); + } else { + SFilePage* bufPage = getBufPage(pDiscBuf, pPos->pageId); + // dBufSetBufPageRecycled(pDiscBuf, bufPage); } char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))]; SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId); @@ -1358,7 +1414,38 @@ static void closeChildIntervalWindow(SArray* pChildren, TSKEY maxTs) { SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info; ASSERT(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE); pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs); - closeIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL); + closeIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, + &pChInfo->interval, NULL, NULL, NULL, pChInfo->aggSup.pResultBuf); + } +} + +static void freeAllPages(SArray* pageIds, SDiskbasedBuf* pDiskBuf) { + int32_t size = taosArrayGetSize(pageIds); + for (int32_t i = 0; i < size; i++) { + int32_t pageId = *(int32_t*)taosArrayGet(pageIds, i); + SFilePage* bufPage = getBufPage(pDiskBuf, pageId); + // dBufSetBufPageRecycled(pDiskBuf, bufPage); + } + taosArrayClear(pageIds); +} + +static void doBuildDeleteResult(SArray* pWins, int32_t* index, SSDataBlock* pBlock) { + blockDataCleanup(pBlock); + int32_t size = taosArrayGetSize(pWins); + if (*index == size) { + *index = 0; + taosArrayClear(pWins); + return; + } + blockDataEnsureCapacity(pBlock, size - *index); + SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, DELETE_GROUPID_COLUMN_INDEX); + for (int32_t i = *index; i < size; i++) { + SWinRes* pWin = taosArrayGet(pWins, i); + colDataAppend(pTsCol, pBlock->info.rows, (const char*)&pWin->ts, false); + colDataAppend(pGroupCol, pBlock->info.rows, (const char*)&pWin->groupId, false); + pBlock->info.rows++; + (*index)++; } } @@ -1374,27 +1461,37 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } if (pOperator->status == OP_RES_TO_RETURN) { + doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex ,pInfo->pDelRes); + if (pInfo->pDelRes->info.rows > 0) { + return pInfo->pDelRes; + } + doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); if (pInfo->binfo.pRes->info.rows == 0 || !hasDataInGroupInfo(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; + freeAllPages(pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); } return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; } SOperatorInfo* downstream = pOperator->pDownstream[0]; - SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); + SArray* pUpdated = taosArrayInit(4, POINTER_BYTES); // SResKeyPos while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { break; } + printDataBlock(pBlock, "single interval recv"); if (pBlock->info.type == STREAM_CLEAR) { - doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, 0, pOperator->exprSupp.numOfExprs, pBlock, - NULL); + doClearWindows(&pInfo->aggSup, &pOperator->exprSupp, &pInfo->interval, 0, + pOperator->exprSupp.numOfExprs, pBlock, NULL); qDebug("%s clear existed time window results for updates checked", GET_TASKID(pTaskInfo)); continue; + } if (pBlock->info.type == STREAM_DELETE_DATA) { + doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, pInfo->pDelWins, &pInfo->interval); + continue; } else if (pBlock->info.type == STREAM_GET_ALL) { getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated); continue; @@ -1416,14 +1513,19 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN, pUpdated); } - closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, NULL, pUpdated); + pOperator->status = OP_RES_TO_RETURN; + closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, + &pInfo->interval, NULL, pUpdated, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); + removeDeleteResults(pUpdated, pInfo->pDelWins); + doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); + if (pInfo->pDelRes->info.rows > 0) { + return pInfo->pDelRes; + } doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - - pOperator->status = OP_RES_TO_RETURN; printDataBlock(pInfo->binfo.pRes, "single interval"); return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; } @@ -1438,6 +1540,7 @@ void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) { SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); cleanupAggSup(&pInfo->aggSup); + taosArrayDestroy(pInfo->pRecycledPages); } void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { @@ -1448,12 +1551,13 @@ void destroyStreamFinalIntervalOperatorInfo(void* param, int32_t numOfOutput) { taosHashCleanup(pInfo->pPullDataMap); taosArrayDestroy(pInfo->pPullWins); blockDataDestroy(pInfo->pPullDataRes); + taosArrayDestroy(pInfo->pRecycledPages); if (pInfo->pChildren) { int32_t size = taosArrayGetSize(pInfo->pChildren); for (int32_t i = 0; i < size; i++) { SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i); - destroyIntervalOperatorInfo(pChildOp->info, numOfOutput); + destroyStreamFinalIntervalOperatorInfo(pChildOp->info, numOfOutput); taosMemoryFreeClear(pChildOp->info); taosMemoryFreeClear(pChildOp); } @@ -1520,6 +1624,28 @@ void increaseTs(SqlFunctionCtx* pCtx) { } } +SSDataBlock* createDeleteBlock() { + SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + pBlock->info.hasVarCol = false; + pBlock->info.groupId = 0; + pBlock->info.rows = 0; + pBlock->info.type = STREAM_DELETE_RESULT; + pBlock->info.rowSize = sizeof(TSKEY) + sizeof(uint64_t); + + pBlock->pDataBlock = taosArrayInit(2, sizeof(SColumnInfoData)); + SColumnInfoData infoData = {0}; + infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP; + infoData.info.bytes = sizeof(TSKEY); + // window start ts + taosArrayPush(pBlock->pDataBlock, &infoData); + + infoData.info.type = TSDB_DATA_TYPE_UBIGINT; + infoData.info.bytes = sizeof(uint64_t); + taosArrayPush(pBlock->pDataBlock, &infoData); + + return pBlock; +} + SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode, @@ -1573,6 +1699,12 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } } + pInfo->pRecycledPages = taosArrayInit(4, sizeof(int32_t)); + pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes)); + pInfo->delIndex = 0; + // pInfo->pDelRes = createDeleteBlock(); todo(liuyao) for delete + pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false);// todo(liuyao) for delete + pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;// todo(liuyao) for delete initResultRowInfo(&pInfo->binfo.resultRowInfo); @@ -2219,28 +2351,44 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3 } } -static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExprSupp* pSup, SArray* pWinArray, - int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) { +bool hasIntervalWindow(SAggSupporter* pSup, TSKEY ts, uint64_t groupId) { + int32_t bytes = sizeof(TSKEY); + SET_RES_WINDOW_KEY(pSup->keyBuf, &ts, bytes, groupId); + SResultRowPosition* p1 = + (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); + return p1 != NULL; +} + +static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExprSupp* pSup, + SArray* pWinArray, int32_t groupId, int32_t numOfOutput, SExecTaskInfo* pTaskInfo, SArray* pUpdated) { int32_t size = taosArrayGetSize(pWinArray); if (!pInfo->pChildren) { return; } for (int32_t i = 0; i < size; i++) { - STimeWindow* pParentWin = taosArrayGet(pWinArray, i); + SWinRes* pWinRes = taosArrayGet(pWinArray, i); SResultRow* pCurResult = NULL; - setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, pParentWin, true, &pCurResult, 0, pSup->pCtx, numOfOutput, + STimeWindow ParentWin = {.skey = pWinRes->ts, .ekey = pWinRes->ts+1}; + setTimeWindowOutputBuf(&pInfo->binfo.resultRowInfo, &ParentWin, true, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup, pTaskInfo); int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren); + bool find = true; for (int32_t j = 0; j < numOfChildren; j++) { SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, j); SIntervalAggOperatorInfo* pChInfo = pChildOp->info; SExprSupp* pChildSup = &pChildOp->exprSupp; - + if (!hasIntervalWindow(&pChInfo->aggSup, pWinRes->ts, pWinRes->groupId)) { + continue; + } + find = true; SResultRow* pChResult = NULL; - setTimeWindowOutputBuf(&pChInfo->binfo.resultRowInfo, pParentWin, true, &pChResult, 0, pChildSup->pCtx, + setTimeWindowOutputBuf(&pChInfo->binfo.resultRowInfo, &ParentWin, true, &pChResult, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs, pChildSup->rowEntryInfoOffset, &pChInfo->aggSup, pTaskInfo); compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo); } + if (find && pUpdated) { + saveResultRow(pCurResult, pWinRes->groupId, pUpdated); + } } } @@ -2472,6 +2620,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { if (!IS_FINAL_OP(pInfo)) { // semi interval operator clear disk buffer clearStreamIntervalOperator(pInfo); + } else { + freeAllPages(pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); } return NULL; } @@ -2497,12 +2647,19 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); return pInfo->pPullDataRes; } + doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); + if (pInfo->pDelRes->info.rows != 0) { + // process the rest of the data + printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); + return pInfo->pDelRes; + } } while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { clearSpecialDataBlock(pInfo->pUpdateRes); + removeDeleteResults(pUpdated, pInfo->pDelWins); pOperator->status = OP_RES_TO_RETURN; qInfo("Stream Final Interval return data"); break; @@ -2514,7 +2671,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { pBlock->info.type == STREAM_INVALID) { pInfo->binfo.pRes->info.type = pBlock->info.type; } else if (pBlock->info.type == STREAM_CLEAR) { - SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow)); + SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes)); doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, pInfo->primaryTsIndex, pOperator->exprSupp.numOfExprs, pBlock, pUpWins); if (IS_FINAL_OP(pInfo)) { @@ -2525,8 +2682,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { doClearWindows(&pChildInfo->aggSup, pChildSup, &pChildInfo->interval, pChildInfo->primaryTsIndex, pChildSup->numOfExprs, pBlock, NULL); - rebuildIntervalWindow(pInfo, pSup, pUpWins, pInfo->binfo.pRes->info.groupId, pOperator->exprSupp.numOfExprs, - pOperator->pTaskInfo); + rebuildIntervalWindow(pInfo, pSup, pUpWins, pInfo->binfo.pRes->info.groupId, + pOperator->exprSupp.numOfExprs, pOperator->pTaskInfo, NULL); taosArrayDestroy(pUpWins); continue; } @@ -2535,11 +2692,25 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { pInfo->returnUpdate = true; taosArrayDestroy(pUpWins); break; + } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT) { + doDeleteSpecifyIntervalWindow(&pInfo->aggSup, pBlock, pInfo->pDelWins, &pInfo->interval); + if (IS_FINAL_OP(pInfo)) { + int32_t childIndex = getChildIndex(pBlock); + SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, childIndex); + SStreamFinalIntervalOperatorInfo* pChildInfo = pChildOp->info; + SExprSupp* pChildSup = &pChildOp->exprSupp; + doDeleteSpecifyIntervalWindow(&pChildInfo->aggSup, pBlock, NULL, &pChildInfo->interval); + rebuildIntervalWindow(pInfo, pSup, pInfo->pDelWins, pInfo->binfo.pRes->info.groupId, + pOperator->exprSupp.numOfExprs, pOperator->pTaskInfo, pUpdated); + continue; + } + removeResults(pInfo->pDelWins, pUpdated); + break; } else if (pBlock->info.type == STREAM_GET_ALL && IS_FINAL_OP(pInfo)) { getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pUpdated); continue; } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_OP(pInfo)) { - SArray* pUpWins = taosArrayInit(8, sizeof(STimeWindow)); + SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes)); doClearWindows(&pInfo->aggSup, pSup, &pInfo->interval, 0, pOperator->exprSupp.numOfExprs, pBlock, pUpWins); removeResults(pUpWins, pUpdated); taosArrayDestroy(pUpWins); @@ -2563,6 +2734,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { if (!pChildOp) { longjmp(pOperator->pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } + SStreamFinalIntervalOperatorInfo* pTmpInfo = pChildOp->info; + pTmpInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; taosArrayPush(pInfo->pChildren, &pChildOp); } SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, chIndex); @@ -2578,8 +2751,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); if (IS_FINAL_OP(pInfo)) { - closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pInfo->pPullDataMap, - pUpdated); + closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, + &pInfo->interval, pInfo->pPullDataMap, pUpdated, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf); closeChildIntervalWindow(pInfo->pChildren, pInfo->twAggSup.maxTs); } @@ -2607,6 +2780,13 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); return pInfo->pPullDataRes; } + + doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); + if (pInfo->pDelRes->info.rows != 0) { + // process the rest of the data + printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval Final" : "interval Semi"); + return pInfo->pDelRes; + } // ASSERT(false); return NULL; } @@ -2680,6 +2860,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, for (int32_t i = 0; i < numOfChild; i++) { SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pPhyNode, pTaskInfo, 0); if (pChildOp) { + SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info; + pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; taosArrayPush(pInfo->pChildren, &pChildOp); continue; } @@ -2711,6 +2893,11 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK); pInfo->pPullDataRes = createPullDataBlock(); pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; + // pInfo->pDelRes = createDeleteBlock(); // todo(liuyao) for delete + pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false);// todo(liuyao) for delete + pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;// todo(liuyao) for delete + pInfo->delIndex = 0; + pInfo->pDelWins = taosArrayInit(4, sizeof(SWinRes)); pOperator->operatorType = pPhyNode->type; pOperator->blocking = true; @@ -2851,9 +3038,9 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pStDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK); pInfo->pDelIterator = NULL; - pInfo->pDelRes = createOneDataBlock(pResBlock, false); - pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; - blockDataEnsureCapacity(pInfo->pDelRes, 64); + // pInfo->pDelRes = createDeleteBlock(); // todo(liuyao) for delete + pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false);// todo(liuyao) for delete + pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;// todo(liuyao) for delete pInfo->pChildren = NULL; pInfo->isFinal = false; pInfo->pPhyNode = pPhyNode; @@ -3205,6 +3392,11 @@ static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated) { void doBuildDeleteDataBlock(SHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite) { blockDataCleanup(pBlock); + int32_t size = taosHashGetSize(pStDeleted); + if (size == 0) { + return; + } + blockDataEnsureCapacity(pBlock, size); size_t keyLen = 0; while (((*Ite) = taosHashIterate(pStDeleted, *Ite)) != NULL) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, 0); @@ -3979,9 +4171,9 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = taosHashInit(64, hashFn, true, HASH_NO_LOCK); pInfo->pDelIterator = NULL; - pInfo->pDelRes = createOneDataBlock(pResBlock, false); - pInfo->pDelRes->info.type = STREAM_DELETE_RESULT; - blockDataEnsureCapacity(pInfo->pDelRes, 64); + // pInfo->pDelRes = createDeleteBlock(); // todo(liuyao) for delete + pInfo->pDelRes = createOneDataBlock(pInfo->binfo.pRes, false);// todo(liuyao) for delete + pInfo->pDelRes->info.type = STREAM_DELETE_RESULT;// todo(liuyao) for delete pInfo->pChildren = NULL; pInfo->ignoreExpiredData = pStateNode->window.igExpired; diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 46b0e61039..d9a05973ce 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -48,13 +48,19 @@ static int32_t validateTimeUnitParam(uint8_t dbPrec, const SValueNode* pVal) { return TIME_UNIT_INVALID; } - if (TSDB_TIME_PRECISION_MILLI == dbPrec && 0 == strcasecmp(pVal->literal, "1u")) { + if (TSDB_TIME_PRECISION_MILLI == dbPrec && (0 == strcasecmp(pVal->literal, "1u") || + 0 == strcasecmp(pVal->literal, "1b"))) { return TIME_UNIT_TOO_SMALL; } - if (pVal->literal[0] != '1' || - (pVal->literal[1] != 'u' && pVal->literal[1] != 'a' && pVal->literal[1] != 's' && pVal->literal[1] != 'm' && - pVal->literal[1] != 'h' && pVal->literal[1] != 'd' && pVal->literal[1] != 'w')) { + if (TSDB_TIME_PRECISION_MICRO == dbPrec && 0 == strcasecmp(pVal->literal, "1b")) { + return TIME_UNIT_TOO_SMALL; + } + + if (pVal->literal[0] != '1' || (pVal->literal[1] != 'u' && pVal->literal[1] != 'a' && + pVal->literal[1] != 's' && pVal->literal[1] != 'm' && + pVal->literal[1] != 'h' && pVal->literal[1] != 'd' && + pVal->literal[1] != 'w' && pVal->literal[1] != 'b')) { return TIME_UNIT_INVALID; } @@ -700,9 +706,8 @@ static int32_t translateElapsed(SFunctionNode* pFunc, char* pErrBuf, int32_t len return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "ELAPSED function time unit parameter should be greater than db precision"); } else if (ret == TIME_UNIT_INVALID) { - return buildFuncErrMsg( - pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "ELAPSED function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]"); + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, + "ELAPSED function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]"); } } @@ -1223,9 +1228,8 @@ static int32_t translateStateDuration(SFunctionNode* pFunc, char* pErrBuf, int32 return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "STATEDURATION function time unit parameter should be greater than db precision"); } else if (ret == TIME_UNIT_INVALID) { - return buildFuncErrMsg( - pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "STATEDURATION function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]"); + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, + "STATEDURATION function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]"); } } @@ -1735,9 +1739,8 @@ static int32_t translateTimeTruncate(SFunctionNode* pFunc, char* pErrBuf, int32_ return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "TIMETRUNCATE function time unit parameter should be greater than db precision"); } else if (ret == TIME_UNIT_INVALID) { - return buildFuncErrMsg( - pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "TIMETRUNCATE function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]"); + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, + "TIMETRUNCATE function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]"); } addDbPrecisonParam(&pFunc->pParameterList, dbPrec); @@ -1775,9 +1778,8 @@ static int32_t translateTimeDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t le return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, "TIMEDIFF function time unit parameter should be greater than db precision"); } else if (ret == TIME_UNIT_INVALID) { - return buildFuncErrMsg( - pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, - "TIMEDIFF function time unit parameter should be one of the following: [1u, 1a, 1s, 1m, 1h, 1d, 1w]"); + return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR, + "TIMEDIFF function time unit parameter should be one of the following: [1b, 1u, 1a, 1s, 1m, 1h, 1d, 1w]"); } } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index bc0010570c..202e590955 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -490,18 +490,7 @@ static int32_t pushDownCondOptPushCondToJoin(SOptimizeContext* pCxt, SJoinLogicN } static int32_t pushDownCondOptPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SNode** pCond) { - switch (nodeType(pChild)) { - case QUERY_NODE_LOGIC_PLAN_SCAN: - return pushDownCondOptPushCondToScan(pCxt, (SScanLogicNode*)pChild, pCond); - case QUERY_NODE_LOGIC_PLAN_PROJECT: - return pushDownCondOptPushCondToProject(pCxt, (SProjectLogicNode*)pChild, pCond); - case QUERY_NODE_LOGIC_PLAN_JOIN: - return pushDownCondOptPushCondToJoin(pCxt, (SJoinLogicNode*)pChild, pCond); - default: - break; - } - planError("pushDownCondOptPushCondToChild failed, invalid logic plan node %s", nodesNodeName(nodeType(pChild))); - return TSDB_CODE_PLAN_INTERNAL_ERROR; + return pushDownCondOptAppendCond(&pChild->pConditions, pCond); } static bool pushDownCondOptIsPriKey(SNode* pNode, SNodeList* pTableCols) { @@ -802,10 +791,7 @@ static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAg return TSDB_CODE_SUCCESS; } // TODO: remove it after full implementation of pushing down to child - if (1 != LIST_LENGTH(pAgg->node.pChildren) || - QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pAgg->node.pChildren, 0)) && - QUERY_NODE_LOGIC_PLAN_PROJECT != nodeType(nodesListGetNode(pAgg->node.pChildren, 0)) && - QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(nodesListGetNode(pAgg->node.pChildren, 0))) { + if (1 != LIST_LENGTH(pAgg->node.pChildren)) { return TSDB_CODE_SUCCESS; } @@ -832,6 +818,77 @@ static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAg return code; } +typedef struct SRewriteProjCondContext { + SProjectLogicNode* pProj; + int32_t errCode; +}SRewriteProjCondContext; + +static EDealRes rewriteProjectCondForPushDownImpl(SNode** ppNode, void* pContext) { + SRewriteProjCondContext* pCxt = pContext; + SProjectLogicNode* pProj = pCxt->pProj; + if (QUERY_NODE_COLUMN == nodeType(*ppNode)) { + SNode* pTarget = NULL; + FOREACH(pTarget, pProj->node.pTargets) { + if (nodesEqualNode(pTarget, *ppNode)) { + SNode* pProjection = NULL; + FOREACH(pProjection, pProj->pProjections) { + if (0 == strcmp(((SExprNode*)pProjection)->aliasName, ((SColumnNode*)(*ppNode))->colName)) { + SNode* pExpr = nodesCloneNode(pProjection); + if (pExpr == NULL) { + pCxt->errCode = terrno; + return DEAL_RES_ERROR; + } + nodesDestroyNode(*ppNode); + *ppNode = pExpr; + } // end if expr alias name equal column name + } // end for each project + } // end if target node equals cond column node + } // end for each targets + return DEAL_RES_IGNORE_CHILD; + } + return DEAL_RES_CONTINUE; +} + +static int32_t rewriteProjectCondForPushDown(SOptimizeContext* pCxt, SProjectLogicNode* pProject, SNode** ppProjectCond) { + SRewriteProjCondContext cxt = {.pProj = pProject, .errCode = TSDB_CODE_SUCCESS}; + SNode* pProjectCond = pProject->node.pConditions; + nodesRewriteExpr(&pProjectCond, rewriteProjectCondForPushDownImpl, &cxt); + *ppProjectCond = pProjectCond; + pProject->node.pConditions = NULL; + return cxt.errCode; +} + +static int32_t pushDownCondOptDealProject(SOptimizeContext* pCxt, SProjectLogicNode* pProject) { + if (NULL == pProject->node.pConditions || + OPTIMIZE_FLAG_TEST_MASK(pProject->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE)) { + return TSDB_CODE_SUCCESS; + } + // TODO: remove it after full implementation of pushing down to child + if (1 != LIST_LENGTH(pProject->node.pChildren)) { + return TSDB_CODE_SUCCESS; + } + + if (NULL != pProject->node.pLimit || NULL != pProject->node.pSlimit) { + return TSDB_CODE_SUCCESS; + } + + int32_t code = TSDB_CODE_SUCCESS; + SNode* pProjCond = NULL; + code = rewriteProjectCondForPushDown(pCxt, pProject, &pProjCond); + if (TSDB_CODE_SUCCESS == code) { + SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pProject->node.pChildren, 0); + code = pushDownCondOptPushCondToChild(pCxt, pChild, &pProjCond); + } + + if (TSDB_CODE_SUCCESS == code) { + OPTIMIZE_FLAG_SET_MASK(pProject->node.optimizedFlag, OPTIMIZE_FLAG_PUSH_DOWN_CONDE); + pCxt->optimized = true; + } else { + nodesDestroyNode(pProjCond); + } + return code; +} + static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(pLogicNode)) { @@ -844,6 +901,9 @@ static int32_t pushDownCondOptimizeImpl(SOptimizeContext* pCxt, SLogicNode* pLog case QUERY_NODE_LOGIC_PLAN_AGG: code = pushDownCondOptDealAgg(pCxt, (SAggLogicNode*)pLogicNode); break; + case QUERY_NODE_LOGIC_PLAN_PROJECT: + code = pushDownCondOptDealProject(pCxt, (SProjectLogicNode*)pLogicNode); + break; default: break; } diff --git a/source/libs/planner/test/planOptimizeTest.cpp b/source/libs/planner/test/planOptimizeTest.cpp index 3994db0902..e4019292d8 100644 --- a/source/libs/planner/test/planOptimizeTest.cpp +++ b/source/libs/planner/test/planOptimizeTest.cpp @@ -81,3 +81,8 @@ TEST_F(PlanOptimizeTest, eliminateProjection) { run("SELECT c1 FROM st1s3"); // run("select 1-abs(c1) from (select unique(c1) c1 from st1s3) order by 1 nulls first"); } + +TEST_F(PlanOptimizeTest, pushDownProjectCond) { + useDb("root", "test"); + run("select 1-abs(c1) from (select unique(c1) c1 from st1s3) where 1-c1>5 order by 1 nulls first"); +} \ No newline at end of file diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index e6b7c75564..df5df127f0 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1174,7 +1174,7 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); - timeUnit = timeUnit * 1000 / factor; + int64_t unit = timeUnit * 1000 / factor; for (int32_t i = 0; i < pInput[0].numOfRows; ++i) { if (colDataIsNull_s(pInput[0].columnData, i)) { @@ -1209,12 +1209,14 @@ int32_t timeTruncateFunction(SScalarParam *pInput, int32_t inputNum, SScalarPara NUM_TO_STRING(TSDB_DATA_TYPE_BIGINT, &timeVal, sizeof(buf), buf); int32_t tsDigits = (int32_t)strlen(buf); - switch (timeUnit) { - case 0: { /* 1u */ + switch (unit) { + case 0: { /* 1u or 1b */ if (tsDigits == TSDB_TIME_PRECISION_NANO_DIGITS) { - timeVal = timeVal / 1000 * 1000; - //} else if (tsDigits == TSDB_TIME_PRECISION_MICRO_DIGITS) { - // //timeVal = timeVal / 1000; + if (timePrec == TSDB_TIME_PRECISION_NANO && timeUnit == 1) { + timeVal = timeVal * 1; + } else { + timeVal = timeVal / 1000 * 1000; + } } else if (tsDigits <= TSDB_TIME_PRECISION_SEC_DIGITS) { timeVal = timeVal * factor; } else { @@ -1366,8 +1368,6 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p int64_t factor = (timePrec == TSDB_TIME_PRECISION_MILLI) ? 1000 : (timePrec == TSDB_TIME_PRECISION_MICRO ? 1000000 : 1000000000); - timeUnit = timeUnit * 1000 / factor; - int32_t numOfRows = 0; for (int32_t i = 0; i < inputNum; ++i) { if (pInput[i].numOfRows > numOfRows) { @@ -1447,9 +1447,14 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p } } } else { - switch(timeUnit) { - case 0: { /* 1u */ - result = result / 1000; + int64_t unit = timeUnit * 1000 / factor; + switch(unit) { + case 0: { /* 1u or 1b */ + if (timePrec == TSDB_TIME_PRECISION_NANO && timeUnit == 1) { + result = result / 1; + } else { + result = result / 1000; + } break; } case 1: { /* 1a */ diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 5ce073081d..f699df6883 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -96,8 +96,8 @@ typedef void* queue[2]; #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) #define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit -#define TRANS_RETRY_INTERVAL 15 // ms retry interval -#define TRANS_CONN_TIMEOUT 3 // connect timeout +#define TRANS_RETRY_INTERVAL 15 // ms retry interval +#define TRANS_CONN_TIMEOUT 3 // connect timeout typedef SRpcMsg STransMsg; typedef SRpcCtx STransCtx; @@ -180,18 +180,18 @@ typedef enum { Normal, Quit, Release, Register, Update } STransMsgType; typedef enum { ConnNormal, ConnAcquire, ConnRelease, ConnBroken, ConnInPool } ConnStatus; #define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) -#define RPC_RESERVE_SIZE (sizeof(STranConnCtx)) +#define RPC_RESERVE_SIZE (sizeof(STranConnCtx)) #define rpcIsReq(type) (type & 1U) #define TRANS_RESERVE_SIZE (sizeof(STranConnCtx)) -#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead)) -#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead))) -#define transContFromHead(msg) (msg + sizeof(STransMsgHead)) +#define TRANS_MSG_OVERHEAD (sizeof(STransMsgHead)) +#define transHeadFromCont(cont) ((STransMsgHead*)((char*)cont - sizeof(STransMsgHead))) +#define transContFromHead(msg) (msg + sizeof(STransMsgHead)) #define transMsgLenFromCont(contLen) (contLen + sizeof(STransMsgHead)) -#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead)); -#define transIsReq(type) (type & 1U) +#define transContLenFromMsg(msgLen) (msgLen - sizeof(STransMsgHead)); +#define transIsReq(type) (type & 1U) #define transLabel(trans) ((STrans*)trans)->label diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 4c2af32be3..30dd7bbf33 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -241,18 +241,19 @@ static void uvHandleReq(SSvrConn* pConn) { tDebug("conn %p acquired by server app", pConn); } } + STrans* pTransInst = pConn->pTransInst; STraceId* trace = &pHead->traceId; if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); - tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", transLabel(pConn), pConn, + tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen); } else { - tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d", transLabel(pConn), - pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), - taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), transMsg.contLen, pHead->noResp, - transMsg.code); + tGTrace("%s conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d, code: %d", + transLabel(pTransInst), pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), + ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), + transMsg.contLen, pHead->noResp, transMsg.code); // no ref here } @@ -265,8 +266,8 @@ static void uvHandleReq(SSvrConn* pConn) { transMsg.info.refId = pConn->refId; transMsg.info.traceId = pHead->traceId; - tGTrace("%s handle %p conn: %p translated to app, refId: %" PRIu64 "", transLabel(pConn), transMsg.info.handle, pConn, - pConn->refId); + tGTrace("%s handle %p conn: %p translated to app, refId: %" PRIu64 "", transLabel(pTransInst), transMsg.info.handle, + pConn, pConn->refId); assert(transMsg.info.handle != NULL); if (pHead->noResp == 1) { @@ -281,7 +282,6 @@ static void uvHandleReq(SSvrConn* pConn) { transReleaseExHandle(transGetRefMgt(), pConn->refId); - STrans* pTransInst = pConn->pTransInst; (*pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); // uv_timer_start(&pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime * 10000, 0); } @@ -290,14 +290,15 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { // opt SSvrConn* conn = cli->data; SConnBuffer* pBuf = &conn->readBuf; + STrans* pTransInst = conn->pTransInst; if (nread > 0) { pBuf->len += nread; - tTrace("%s conn %p total read: %d, current read: %d", transLabel(conn->pTransInst), conn, pBuf->len, (int)nread); + tTrace("%s conn %p total read: %d, current read: %d", transLabel(pTransInst), conn, pBuf->len, (int)nread); if (transReadComplete(pBuf)) { - tTrace("%s conn %p alread read complete packet", transLabel(conn->pTransInst), conn); + tTrace("%s conn %p alread read complete packet", transLabel(pTransInst), conn); uvHandleReq(conn); } else { - tTrace("%s conn %p read partial packet, continue to read", transLabel(conn->pTransInst), conn); + tTrace("%s conn %p read partial packet, continue to read", transLabel(pTransInst), conn); } return; } @@ -305,12 +306,12 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { return; } - tError("%s conn %p read error: %s", transLabel(conn->pTransInst), conn, uv_err_name(nread)); + tError("%s conn %p read error: %s", transLabel(pTransInst), conn, uv_err_name(nread)); if (nread < 0) { conn->broken = true; if (conn->status == ConnAcquire) { if (conn->regArg.init) { - tTrace("%s conn %p broken, notify server app", transLabel(conn->pTransInst), conn); + tTrace("%s conn %p broken, notify server app", transLabel(pTransInst), conn); STrans* pTransInst = conn->pTransInst; (*pTransInst->cfp)(pTransInst->parent, &(conn->regArg.msg), NULL); memset(&conn->regArg, 0, sizeof(conn->regArg)); @@ -414,8 +415,9 @@ static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { char* msg = (char*)pHead; int32_t len = transMsgLenFromCont(pMsg->contLen); + STrans* pTransInst = pConn->pTransInst; STraceId* trace = &pMsg->info.traceId; - tGTrace("%s conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", transLabel(pConn->pTransInst), pConn, + tGTrace("%s conn %p %s is sent to %s:%d, local info: %s:%d, msglen:%d", transLabel(pTransInst), pConn, TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->localAddr.sin_addr), ntohs(pConn->localAddr.sin_port), len); pHead->msgLen = htonl(len); @@ -761,9 +763,10 @@ static SSvrConn* createConn(void* hThrd) { exh->refId = transAddExHandle(transGetRefMgt(), exh); transAcquireExHandle(transGetRefMgt(), exh->refId); + STrans* pTransInst = pThrd->pTransInst; pConn->refId = exh->refId; transRefSrvHandle(pConn); - tTrace("%s handle %p, conn %p created, refId: %" PRId64 "", transLabel(pThrd->pTransInst), exh, pConn, pConn->refId); + tTrace("%s handle %p, conn %p created, refId: %" PRId64 "", transLabel(pTransInst), exh, pConn, pConn->refId); return pConn; } @@ -812,7 +815,13 @@ static void uvDestroyConn(uv_handle_t* handle) { transReleaseExHandle(transGetRefMgt(), conn->refId); transRemoveExHandle(transGetRefMgt(), conn->refId); - tDebug("%s conn %p destroy", transLabel(thrd->pTransInst), conn); + STrans* pTransInst = thrd->pTransInst; + tDebug("%s conn %p destroy", transLabel(pTransInst), conn); + + for (int i = 0; i < transQueueSize(&conn->srvMsgs); i++) { + SSvrMsg* msg = transQueueGet(&conn->srvMsgs, i); + destroySmsg(msg); + } transQueueDestroy(&conn->srvMsgs); QUEUE_REMOVE(&conn->queue); @@ -1103,7 +1112,8 @@ void transRegisterMsg(const STransMsg* msg) { m->msg = tmsg; m->type = Register; - tTrace("%s conn %p start to register brokenlink callback", transLabel(pThrd->pTransInst), exh->handle); + STrans* pTransInst = pThrd->pTransInst; + tTrace("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle); transAsyncSend(pThrd->asyncPool, &m->q); transReleaseExHandle(transGetRefMgt(), refId); return; diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index d70c45b0f2..f50fd22ba6 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -637,6 +637,7 @@ void dBufSetBufPageRecycled(SDiskbasedBuf* pBuf, void* pPage) { SListNode* pNode = tdListPopNode(pBuf->lruList, ppi->pn); taosMemoryFreeClear(ppi->pData); taosMemoryFreeClear(pNode); + ppi->pn = NULL; tdListAppend(pBuf->freePgList, &ppi); } diff --git a/tests/system-test/2-query/timetruncate.py b/tests/system-test/2-query/timetruncate.py index 7fcdee3d60..ebc8e6b6f9 100644 --- a/tests/system-test/2-query/timetruncate.py +++ b/tests/system-test/2-query/timetruncate.py @@ -24,7 +24,8 @@ class TDTestCase: ] self.db_param_precision = ['ms','us','ns'] self.time_unit = ['1w','1d','1h','1m','1s','1a','1u'] - self.error_unit = ['1b','2w','2d','2h','2m','2s','2a','2u','1c','#1'] + #self.error_unit = ['1b','2w','2d','2h','2m','2s','2a','2u','1c','#1'] + self.error_unit = ['2w','2d','2h','2m','2s','2a','2u','1c','#1'] self.ntbname = 'ntb' self.stbname = 'stb' self.ctbname = 'ctb' @@ -92,7 +93,7 @@ class TDTestCase: elif unit.lower() == '1d': for i in range(len(self.ts_str)): ts_result = self.get_ms_timestamp(str(tdSql.queryResult[i][0])) - tdSql.checkEqual(ts_result,int(date_time[i]/1000/60/60/24)*24*60*60*1000) + tdSql.checkEqual(ts_result,int(date_time[i]/1000/60/60/24)*24*60*60*1000) elif unit.lower() == '1w': for i in range(len(self.ts_str)): ts_result = self.get_ms_timestamp(str(tdSql.queryResult[i][0])) @@ -121,7 +122,7 @@ class TDTestCase: elif unit.lower() == '1d': for i in range(len(self.ts_str)): ts_result = self.get_us_timestamp(str(tdSql.queryResult[i][0])) - tdSql.checkEqual(ts_result,int(date_time[i]/1000/1000/60/60/24)*24*60*60*1000*1000 ) + tdSql.checkEqual(ts_result,int(date_time[i]/1000/1000/60/60/24)*24*60*60*1000*1000 ) elif unit.lower() == '1w': for i in range(len(self.ts_str)): ts_result = self.get_us_timestamp(str(tdSql.queryResult[i][0])) @@ -144,21 +145,21 @@ class TDTestCase: tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000/1000/1000/60/60)*60*60*1000*1000*1000 ) elif unit.lower() == '1d': for i in range(len(self.ts_str)): - tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000/1000/1000/60/60/24)*24*60*60*1000*1000*1000 ) + tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000/1000/1000/60/60/24)*24*60*60*1000*1000*1000 ) elif unit.lower() == '1w': for i in range(len(self.ts_str)): tdSql.checkEqual(tdSql.queryResult[i][0],int(date_time[i]*1000/1000/1000/1000/60/60/24/7)*7*24*60*60*1000*1000*1000) def data_check(self,date_time,precision,tb_type): for unit in self.time_unit: if (unit.lower() == '1u' and precision.lower() == 'ms') or () : - if tb_type.lower() == 'ntb': + if tb_type.lower() == 'ntb': tdSql.error(f'select timetruncate(ts,{unit}) from {self.ntbname}') elif tb_type.lower() == 'ctb': tdSql.error(f'select timetruncate(ts,{unit}) from {self.ctbname}') elif tb_type.lower() == 'stb': tdSql.error(f'select timetruncate(ts,{unit}) from {self.stbname}') elif precision.lower() == 'ms': - if tb_type.lower() == 'ntb': + if tb_type.lower() == 'ntb': tdSql.query(f'select timetruncate(ts,{unit}) from {self.ntbname}') elif tb_type.lower() == 'ctb': tdSql.query(f'select timetruncate(ts,{unit}) from {self.ctbname}') @@ -167,7 +168,7 @@ class TDTestCase: tdSql.checkRows(len(self.ts_str)) self.check_ms_timestamp(unit,date_time) elif precision.lower() == 'us': - if tb_type.lower() == 'ntb': + if tb_type.lower() == 'ntb': tdSql.query(f'select timetruncate(ts,{unit}) from {self.ntbname}') elif tb_type.lower() == 'ctb': tdSql.query(f'select timetruncate(ts,{unit}) from {self.ctbname}') @@ -176,7 +177,7 @@ class TDTestCase: tdSql.checkRows(len(self.ts_str)) self.check_us_timestamp(unit,date_time) elif precision.lower() == 'ns': - if tb_type.lower() == 'ntb': + if tb_type.lower() == 'ntb': tdSql.query(f'select timetruncate(ts,{unit}) from {self.ntbname}') elif tb_type.lower() == 'ctb': tdSql.query(f'select timetruncate(ts,{unit}) from {self.ctbname}') diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 467fd67f55..889b316568 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -21,7 +21,7 @@ python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py python3 ./test.py -f 1-insert/test_stmt_muti_insert_query.py python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py python3 ./test.py -f 1-insert/alter_stable.py -python3 ./test.py -f 1-insert/alter_table.py +#python3 ./test.py -f 1-insert/alter_table.py python3 ./test.py -f 1-insert/insertWithMoreVgroup.py python3 ./test.py -f 1-insert/table_comment.py python3 ./test.py -f 1-insert/time_range_wise.py @@ -118,7 +118,7 @@ python3 ./test.py -f 2-query/twa.py python3 ./test.py -f 2-query/irate.py python3 ./test.py -f 2-query/function_null.py -python3 ./test.py -f 2-query/queryQnode.py +#python3 ./test.py -f 2-query/queryQnode.py #python3 ./test.py -f 6-cluster/5dnode1mnode.py #python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3 diff --git a/tests/system-test/simpletest.bat b/tests/system-test/simpletest.bat index 656828aa1e..e33fe0d538 100644 --- a/tests/system-test/simpletest.bat +++ b/tests/system-test/simpletest.bat @@ -6,7 +6,7 @@ python3 .\test.py -f 0-others\telemetry.py python3 .\test.py -f 0-others\taosdMonitor.py python3 .\test.py -f 0-others\udfTest.py python3 .\test.py -f 0-others\udf_create.py -python3 .\test.py -f 0-others\udf_restart_taosd.py +@REM python3 .\test.py -f 0-others\udf_restart_taosd.py @REM python3 .\test.py -f 0-others\cachelast.py @REM python3 .\test.py -f 0-others\user_control.py