diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 8a8948fb17..1d9a9bcc61 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -155,7 +155,6 @@ typedef enum EStreamType { STREAM_MID_RETRIEVE, STREAM_PARTITION_DELETE_DATA, STREAM_GET_RESULT, - STREAM_DELETE_GROUP_DATA, STREAM_DROP_CHILD_TABLE, } EStreamType; @@ -403,7 +402,8 @@ int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol); #define TSMA_RES_STB_EXTRA_COLUMN_NUM 4 // 3 columns: _wstart, _wend, _wduration, 1 tag: tbname static inline bool isTsmaResSTb(const char* stbName) { - return false; + static bool showTsmaTables = false; + if (showTsmaTables) return false; const char* pos = strstr(stbName, TSMA_RES_STB_POSTFIX); if (pos && strlen(stbName) == (pos - stbName) + strlen(TSMA_RES_STB_POSTFIX)) { return true; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index c81d649284..c22a3da5ad 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -316,7 +316,6 @@ TD_DEF_MSG_TYPE(TDMT_VND_ARB_CHECK_SYNC, "vnode-arb-check-sync", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_FETCH_TTL_EXPIRED_TBS, "vnode-fetch-ttl-expired-tbs", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TABLE_NAME, "vnode-table-name", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_DROP_TSMA_CTB, "vnode-drop-tsma-ctb", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_VND_MSG) TD_NEW_MSG_SEG(TDMT_SCH_MSG) // 3<<8 diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index de10d6844e..2cf791c8da 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -462,7 +462,7 @@ struct SStreamTask { struct SStreamMeta* pMeta; SSHashObj* pNameMap; void* pBackend; - int8_t subtableWithoutMd5; + int8_t subtableWithoutMd5; // only for tsma stream tasks char reserve[256]; char* backendPath; }; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 4b79ecf43a..0d804eadf0 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -182,7 +182,6 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TB_WITH_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TSMA_CTB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_ANAL_ALGO, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index a0356a6c4d..006f44b349 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -1014,7 +1014,6 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TSMA_CTB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index a1044a9f86..e782d505a9 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -96,7 +96,6 @@ int32_t mndInitStb(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_DROP_TB_WITH_TSMA, mndProcessDropTbWithTsma); mndSetMsgHandle(pMnode, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mndProcessFetchTtlExpiredTbs); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TABLE_RSP, mndTransProcessRsp); - mndSetMsgHandle(pMnode, TDMT_VND_DROP_TSMA_CTB_RSP, mndTransProcessRsp); // mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveStbReq); // mndSetMsgHandle(pMnode, TDMT_MND_CREATE_INDEX, mndProcessCreateIndexReq); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 391cbe78fb..ea2f3f91be 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -363,10 +363,10 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con tqError("%s failed to create data submit for stream since out of memory", id); return code; } - } else if (pCont->msgType == TDMT_VND_DELETE || pCont->msgType == TDMT_VND_DROP_TSMA_CTB) { + } else if (pCont->msgType == TDMT_VND_DELETE) { void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead)); int32_t len = pCont->bodyLen - sizeof(SMsgHead); - EStreamType blockType = pCont->msgType == TDMT_VND_DELETE ? STREAM_DELETE_DATA : STREAM_DELETE_GROUP_DATA; + EStreamType blockType = pCont->msgType == STREAM_DELETE_DATA; code = tqExtractDelDataBlock(pBody, len, ver, (void**)pItem, 0, blockType); if (code == TSDB_CODE_SUCCESS) { if (*pItem == NULL) { diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 3c14870d92..3c525e7e9c 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -464,6 +464,19 @@ static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SS TSDB_CHECK_CODE(terrno, lino, _exit); } } + + SMetaReader mr = {0}; + metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK); + // TODO wjm handle only one table + code = metaGetTableEntryByName(&mr, tbName); + if (isValidDstChildTable(&mr, TD_VID(pVnode), tbName, pTask->outputInfo.tbSink.stbUid)) { + STableSinkInfo* pTableSinkInfo = NULL; + bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, &pTableSinkInfo); + if (alreadyCached) { + pTableSinkInfo->uid = mr.me.uid; + } + } + metaReaderClear(&mr); tqDebug("s-task:%s build drop %d table(s) msg", id, rows); code = tqPutReqToQueue(pVnode, &batchReq, encodeDropChildTableForRPC, TDMT_VND_DROP_TABLE); TSDB_CHECK_CODE(code, lino, _exit); @@ -473,10 +486,10 @@ static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SS memcpy(tbName, varDataVal(pData), varDataLen(pData)); tbName[varDataLen(pData) + 1] = 0; int64_t uid = *(int64_t*)colDataGetData(pUidCol, i); + // TODO wjm remove uid it's not my uid code = doWaitForDstTableDropped(pVnode, pTask, tbName, uid); TSDB_CHECK_CODE(code, lino, _exit); } - return code; _exit: if (batchReq.pArray) { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 0b3d7b180a..a92049e5f3 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -777,15 +777,6 @@ int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, vo TSDB_CHECK_NULL(pCol, code, lino, _exit, terrno); code = colDataSetVal(pCol, i, (const char* )&pReq->uid, false); TSDB_CHECK_CODE(code, lino, _exit); - - /* - pCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); - TSDB_CHECK_NULL(pCol, code, lino, _exit, terrno); - char varTbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE + 1] = {0}; - varDataSetLen(varTbName, strlen(pReq->name)); - tsnprintf(varTbName + VARSTR_HEADER_SIZE, TSDB_TABLE_NAME_LEN + 1, "%s", pReq->name); - code = colDataSetVal(pCol, i, varTbName, false); - */ } code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index a2b5f49b5c..723ebb333d 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -633,9 +633,6 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { case TDMT_VND_ARB_CHECK_SYNC: { code = vnodePreProcessArbCheckSyncMsg(pVnode, pMsg); } break; - case TDMT_VND_DROP_TSMA_CTB: { - code = vnodePreProcessDropTSmaCtbMsg(pVnode, pMsg); - } break; case TDMT_VND_DROP_TABLE: { code = vnodePreProcessDropTbMsg(pVnode, pMsg); } break; @@ -843,11 +840,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg case TDMT_VND_ARB_CHECK_SYNC: vnodeProcessArbCheckSyncReq(pVnode, pReq, len, pRsp); break; - case TDMT_VND_DROP_TSMA_CTB: - if (vnodeProcessDropTSmaCtbReq(pVnode, ver, pReq, len, pRsp, pMsg) < 0) { - goto _err; - } - break; default: vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType); return TSDB_CODE_INVALID_MSG; @@ -2652,9 +2644,3 @@ int32_t vnodeAsyncCompact(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool sync) { return 0; } #endif -static int32_t vnodeProcessDropTSmaCtbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp, - SRpcMsg *pOriginalMsg) { - pRsp->msgType = TDMT_VND_DROP_TSMA_CTB_RSP; - pRsp->code = TSDB_CODE_SUCCESS; - return pRsp->code; -} diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index fec35c3371..d5b4a52be7 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1326,7 +1326,7 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag int32_t winCode = TSDB_CODE_SUCCESS; code = pAPI->streamStateGetParName(pState, groupId, &pValue, true, &winCode); QUERY_CHECK_CODE(code, lino, _end); - + qInfo("wjm group id: %"PRId64 " winCode: %d, block type: %d", groupId, winCode, pSrcBlock->info.type); if (winCode != TSDB_CODE_SUCCESS) { SSDataBlock* pTmpBlock = NULL; code = blockCopyOneRow(pSrcBlock, rowId, &pTmpBlock); @@ -1335,6 +1335,8 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN); pTmpBlock->info.id.groupId = groupId; char* tbName = pSrcBlock->info.parTbName; + printSpecDataBlock(pSrcBlock, "wjm", "recv", "wjm"); + printSpecDataBlock(pTmpBlock, "wjm", "recv", "wjm"); if (pTableSup->numOfExprs > 0) { code = projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs, NULL); @@ -1342,15 +1344,19 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX); QUERY_CHECK_NULL(pTbCol, code, lino, _end, terrno); + printSpecDataBlock(pSrcBlock, "wjm", "recv", "wjm"); + printSpecDataBlock(pTmpBlock, "wjm", "recv", "wjm"); memset(tbName, 0, TSDB_TABLE_NAME_LEN); int32_t len = 0; if (colDataIsNull_s(pTbCol, pDestBlock->info.rows - 1)) { + qInfo("wjm calculated tbnameis null"); len = 1; tbName[0] = 0; } else { void* pData = colDataGetData(pTbCol, pDestBlock->info.rows - 1); len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1); memcpy(tbName, varDataVal(pData), len); + qInfo("wjm calculated tbname: %s", tbName); code = pAPI->streamStatePutParName(pState, groupId, tbName); QUERY_CHECK_CODE(code, lino, _end); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b5cb22cf12..4cd32589d8 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -289,6 +289,7 @@ static int32_t doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* p pTaskInfo, &pTableScanInfo->metaCache); // ignore the table not exists error, since this table may have been dropped during the scan procedure. if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { + if (pTaskInfo->streamInfo.pState) blockDataCleanup(pBlock); code = 0; } } @@ -3038,10 +3039,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, pBlockInfo->rows, pTaskInfo, &pTableScanInfo->base.metaCache); // ignore the table not exists error, since this table may have been dropped during the scan procedure. - if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { - code = 0; - } - if (code) { blockDataFreeRes((SSDataBlock*)pBlock); QUERY_CHECK_CODE(code, lino, _end); @@ -3312,7 +3309,7 @@ static int32_t setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) qInfo("wjm, get uid: %"PRIu64, uidCol[i]); uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]); qInfo("wjm, get groupid: %"PRIu64, groupId); - code = colDataSetVal(pGpCol, i, (const char*)&groupId, false); + code = colDataSetVal(pGpCol, i, (const char*)(uidCol + i), false); QUERY_CHECK_CODE(code, lino, _end); } } @@ -3541,7 +3538,7 @@ static int32_t deletePartName(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32 int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; for (int32_t i = 0; i < pBlock->info.rows; i++) { - SColumnInfoData* pGpIdCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); + SColumnInfoData* pGpIdCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX); SColumnInfoData* pTbnameCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); int64_t* gpIdCol = (int64_t*)pGpIdCol->pData; void* pParName = NULL; @@ -3558,13 +3555,15 @@ static int32_t deletePartName(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32 QUERY_CHECK_CODE(code, lino, _end); char varTbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE + 1] = {0}; varDataSetLen(varTbName, strlen(pParName)); - tsnprintf(varTbName + VARSTR_HEADER_SIZE, TSDB_TABLE_NAME_LEN + 1, "%s", pParName); + int64_t len = tsnprintf(varTbName + VARSTR_HEADER_SIZE, TSDB_TABLE_NAME_LEN + 1, "%s", pParName); code = colDataSetVal(pTbnameCol, i, varTbName, false); qDebug("delete stream part for:%"PRId64 " res tb: %s", gpIdCol[i], (char*)pParName); pInfo->stateStore.streamStateFreeVal(pParName); QUERY_CHECK_CODE(code, lino, _end); code = pInfo->stateStore.streamStateDeleteParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, gpIdCol[i]); QUERY_CHECK_CODE(code, lino, _end); + pBlock->info.id.groupId = gpIdCol[i]; + memcpy(pBlock->info.parTbName, varTbName + VARSTR_HEADER_SIZE, TSDB_TABLE_NAME_LEN + 1); } _end: @@ -3962,7 +3961,13 @@ FETCH_NEXT_BLOCK: } code = setBlockIntoRes(pInfo, pRes, &pStreamInfo->fillHistoryWindow, false); - QUERY_CHECK_CODE(code, lino, _end); + if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { + pInfo->pRes->info.rows = 0; + code = TSDB_CODE_SUCCESS; + } else { + QUERY_CHECK_CODE(code, lino, _end); + } + if (pInfo->pRes->info.rows == 0) { continue; } diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 8ac1acb1a2..c2714659ec 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -433,9 +433,6 @@ int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx nodesDestroyNode(pQuery->pRoot); pQuery->pRoot = NULL; code = nodesCloneNode(pQuery->pPrepareRoot, &pQuery->pRoot); - if (NULL == pQuery->pRoot) { - code = code; - } } if (TSDB_CODE_SUCCESS == code) { rewriteExprAlias(pQuery->pRoot); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 34c83acee8..09a4b9c593 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1534,21 +1534,20 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect if (TSDB_CODE_SUCCESS == code) { pSort->pSortKeys = NULL; code = nodesCloneList(pSelect->pOrderByList, &pSort->pSortKeys); - if (NULL == pSort->pSortKeys) { - code = code; - } - SNode* pNode = NULL; - SOrderByExprNode* firstSortKey = (SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0); - if (isPrimaryKeySort(pSelect->pOrderByList)) pSort->node.outputTsOrder = firstSortKey->order; - if (firstSortKey->pExpr->type == QUERY_NODE_COLUMN) { - SColumnNode* pCol = (SColumnNode*)firstSortKey->pExpr; - int16_t projIdx = 1; - FOREACH(pNode, pSelect->pProjectionList) { - SExprNode* pExpr = (SExprNode*)pNode; - if (0 == strcmp(pCol->node.aliasName, pExpr->aliasName)) { - pCol->projIdx = projIdx; break; + if (NULL != pSort->pSortKeys) { + SNode* pNode = NULL; + SOrderByExprNode* firstSortKey = (SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0); + if (isPrimaryKeySort(pSelect->pOrderByList)) pSort->node.outputTsOrder = firstSortKey->order; + if (firstSortKey->pExpr->type == QUERY_NODE_COLUMN) { + SColumnNode* pCol = (SColumnNode*)firstSortKey->pExpr; + int16_t projIdx = 1; + FOREACH(pNode, pSelect->pProjectionList) { + SExprNode* pExpr = (SExprNode*)pNode; + if (0 == strcmp(pCol->node.aliasName, pExpr->aliasName)) { + pCol->projIdx = projIdx; break; + } + projIdx++; } - projIdx++; } } } diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index e0e42087f3..e960c0ff5d 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -836,11 +836,9 @@ static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo nodesDestroyNode(pMergeWin->pTsEnd); pMergeWin->pTsEnd = NULL; code = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index), &pMergeWin->pTsEnd); - if (NULL == pMergeWin->pTsEnd) { - code = code; - } } - code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow); + if (TSDB_CODE_SUCCESS == code) + code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow); } if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index e0fa199199..6e27cd651e 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -758,6 +758,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo; hashValue = taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); + stInfo("wjm ctbname for dispatch: %s, pDataBlock.info.parTbName: %s", ctbName, pDataBlock->info.parTbName); SBlockName bln = {0}; bln.hashValue = hashValue; memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName)); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 20c3e5a6b9..401aa7530d 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -166,6 +166,8 @@ const char* streamQueueItemGetTypeStr(int32_t type) { return "checkpoint-trigger"; case STREAM_INPUT__TRANS_STATE: return "trans-state"; + case STREAM_INPUT__REF_DATA_BLOCK: + return "ref-block"; default: return "datablock"; } @@ -211,7 +213,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte // do not merge blocks for sink node and check point data block int8_t type = qItem->type; if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || - type == STREAM_INPUT__TRANS_STATE) { + type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__REF_DATA_BLOCK) { const char* p = streamQueueItemGetTypeStr(type); if (*pInput == NULL) { @@ -504,4 +506,4 @@ void streamTaskPutbackToken(STokenBucket* pBucket) { // size in KB void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes) { pBucket->quotaRemain -= SIZE_IN_MiB(bytes); } -void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); } \ No newline at end of file +void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 94dff3f71c..da5e1f47e9 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -87,7 +87,7 @@ int32_t walNextValidMsg(SWalReader *pReader) { int32_t type = pReader->pHead->head.msgType; if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) || - (type == TDMT_VND_DROP_TSMA_CTB) || (IS_META_MSG(type) && pReader->cond.scanMeta)) { + (IS_META_MSG(type) && pReader->cond.scanMeta)) { TAOS_RETURN(walFetchBody(pReader)); } else if (type == TDMT_VND_DROP_TABLE && pReader->cond.scanDropCtb) { TAOS_RETURN(walFetchBody(pReader)); diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index 78a3c1406e..77e57bd36d 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -805,9 +805,8 @@ class TDTestCase: def test_query_with_tsma(self): self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)', 'count(ts)'], '5m') - #self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)', 'count(ts)'], '30m') - #self.create_tsma('tsma5', 'test', 'norm_tb', ['avg(c1)', 'avg(c2)'], '10m') - return + self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)', 'count(ts)'], '30m') + self.create_tsma('tsma5', 'test', 'norm_tb', ['avg(c1)', 'avg(c2)'], '10m') self.test_query_with_tsma_interval() self.test_query_with_tsma_agg() @@ -1228,10 +1227,10 @@ class TDTestCase: def run(self): self.init_data() - #self.test_ddl() + self.test_ddl() self.test_query_with_tsma() # bug to fix - #self.test_flush_query() + self.test_flush_query() #cluster test cluster_dnode_list = tdSql.get_cluseter_dnodes() @@ -1241,14 +1240,35 @@ class TDTestCase: self.test_td_32519() def test_td_32519(self): - tdSql.execute('INSERT INTO t1 VALUES("2024-10-24 11:45:00", 1,1,1,1,1,1,1, "a", "a")', queryTimes=1) - tdSql.execute('INSERT INTO t1 VALUES("2024-10-24 11:55:00", 2,1,1,1,1,1,1, "a", "a")', queryTimes=1) - tdSql.execute('DROP TABLE t1', queryTimes=1) - tdSql.execute('CREATE TABLE t1 USING meters TAGS(1, "a", "b", 1,1,1)') - tdSql.execute('INSERT INTO t1 VALUES("2024-10-24 11:59:00", 3,1,1,1,1,1,1, "a", "a")', queryTimes=1) - tdSql.execute('INSERT INTO t1 VALUES("2024-10-24 12:10:00", 4,1,1,1,1,1,1, "a", "a")', queryTimes=1) - tdSql.execute('INSERT INTO t1 VALUES("2024-10-24 12:20:00", 5,1,1,1,1,1,1, "a", "a")', queryTimes=1) + tdSql.execute("drop tsma test.tsma5") + self.create_recursive_tsma('tsma1', 'tsma_r', 'test', '1h', 'meters', ['avg(c1)', 'avg(c2)', 'count(ts)']) + tdSql.execute('INSERT INTO test.t1 VALUES("2024-10-24 11:45:00", 1,1,1,1,1,1,1, "a", "a")', queryTimes=1) + tdSql.execute('INSERT INTO test.t1 VALUES("2024-10-24 11:55:00", 2,1,1,1,1,1,1, "a", "a")', queryTimes=1) + tdSql.execute('DROP TABLE test.t1', queryTimes=1) + self.wait_query_err('desc test.`404e15422d96c8b5de9603c2296681b1`', 10, -2147473917) + self.wait_query_err('desc test.`82b56f091c4346369da0af777c3e580d`', 10, -2147473917) + self.wait_query_err('desc test.`163b7c69922cf6d83a98bfa44e52dade`', 10, -2147473917) + tdSql.execute('CREATE TABLE test.t1 USING test.meters TAGS(1, "a", "b", 1,1,1)') + tdSql.execute('INSERT INTO test.t1 VALUES("2024-10-24 11:59:00", 3,1,1,1,1,1,1, "a", "a")', queryTimes=1) + tdSql.execute('INSERT INTO test.t1 VALUES("2024-10-24 12:10:00", 4,1,1,1,1,1,1, "a", "a")', queryTimes=1) + tdSql.execute('INSERT INTO test.t1 VALUES("2024-10-24 12:20:00", 5,1,1,1,1,1,1, "a", "a")', queryTimes=1) tdSql.execute('FLUSH DATABASE test', queryTimes=1) + tdSql.query('SELECT * FROM test.t1', queryTimes=1) + tdSql.checkRows(3) + sql = 'SELECT * FROM test.`404e15422d96c8b5de9603c2296681b1`' + self.wait_query(sql, 3, 20) ## tsma1 output ctb for t1 + tdSql.query(sql, queryTimes=1) + tdSql.checkData(0,1, 1) + tdSql.checkData(1,1, 1) + tdSql.checkData(2,1, 1) + #sql = 'select * from test.`82b56f091c4346369da0af777c3e580d`' + #self.wait_query(sql, 2, 10) ## tsma2 output ctb for t1 + #tdSql.query(sql, queryTimes=1) + #tdSql.checkData(0, 1, 1) + #tdSql.checkData(1, 1, 2) + sql = 'select * from test.`163b7c69922cf6d83a98bfa44e52dade`' + self.wait_query(sql, 2, 20) ## tsma_r output ctb for t1 + tdSql.checkData(0, 1, 1) def test_create_tsma(self): function_name = sys._getframe().f_code.co_name