fix drop child table with tsma
This commit is contained in:
parent
65dffbda0c
commit
9a92c136ce
|
@ -155,7 +155,6 @@ typedef enum EStreamType {
|
|||
STREAM_MID_RETRIEVE,
|
||||
STREAM_PARTITION_DELETE_DATA,
|
||||
STREAM_GET_RESULT,
|
||||
STREAM_DELETE_GROUP_DATA,
|
||||
STREAM_DROP_CHILD_TABLE,
|
||||
} EStreamType;
|
||||
|
||||
|
@ -403,7 +402,8 @@ int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol);
|
|||
#define TSMA_RES_STB_EXTRA_COLUMN_NUM 4 // 3 columns: _wstart, _wend, _wduration, 1 tag: tbname
|
||||
|
||||
static inline bool isTsmaResSTb(const char* stbName) {
|
||||
return false;
|
||||
static bool showTsmaTables = false;
|
||||
if (showTsmaTables) return false;
|
||||
const char* pos = strstr(stbName, TSMA_RES_STB_POSTFIX);
|
||||
if (pos && strlen(stbName) == (pos - stbName) + strlen(TSMA_RES_STB_POSTFIX)) {
|
||||
return true;
|
||||
|
|
|
@ -316,7 +316,6 @@
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_ARB_CHECK_SYNC, "vnode-arb-check-sync", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_FETCH_TTL_EXPIRED_TBS, "vnode-fetch-ttl-expired-tbs", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_TABLE_NAME, "vnode-table-name", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TSMA_CTB, "vnode-drop-tsma-ctb", NULL, NULL)
|
||||
TD_CLOSE_MSG_SEG(TDMT_VND_MSG)
|
||||
|
||||
TD_NEW_MSG_SEG(TDMT_SCH_MSG) // 3<<8
|
||||
|
|
|
@ -462,7 +462,7 @@ struct SStreamTask {
|
|||
struct SStreamMeta* pMeta;
|
||||
SSHashObj* pNameMap;
|
||||
void* pBackend;
|
||||
int8_t subtableWithoutMd5;
|
||||
int8_t subtableWithoutMd5; // only for tsma stream tasks
|
||||
char reserve[256];
|
||||
char* backendPath;
|
||||
};
|
||||
|
|
|
@ -182,7 +182,6 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TB_WITH_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TSMA_CTB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_ANAL_ALGO, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -1014,7 +1014,6 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TSMA_CTB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -96,7 +96,6 @@ int32_t mndInitStb(SMnode *pMnode) {
|
|||
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TB_WITH_TSMA, mndProcessDropTbWithTsma);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mndProcessFetchTtlExpiredTbs);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TABLE_RSP, mndTransProcessRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TSMA_CTB_RSP, mndTransProcessRsp);
|
||||
// mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveStbReq);
|
||||
|
||||
// mndSetMsgHandle(pMnode, TDMT_MND_CREATE_INDEX, mndProcessCreateIndexReq);
|
||||
|
|
|
@ -363,10 +363,10 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
|
|||
tqError("%s failed to create data submit for stream since out of memory", id);
|
||||
return code;
|
||||
}
|
||||
} else if (pCont->msgType == TDMT_VND_DELETE || pCont->msgType == TDMT_VND_DROP_TSMA_CTB) {
|
||||
} else if (pCont->msgType == TDMT_VND_DELETE) {
|
||||
void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
|
||||
int32_t len = pCont->bodyLen - sizeof(SMsgHead);
|
||||
EStreamType blockType = pCont->msgType == TDMT_VND_DELETE ? STREAM_DELETE_DATA : STREAM_DELETE_GROUP_DATA;
|
||||
EStreamType blockType = pCont->msgType == STREAM_DELETE_DATA;
|
||||
code = tqExtractDelDataBlock(pBody, len, ver, (void**)pItem, 0, blockType);
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
if (*pItem == NULL) {
|
||||
|
|
|
@ -464,6 +464,19 @@ static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SS
|
|||
TSDB_CHECK_CODE(terrno, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
||||
SMetaReader mr = {0};
|
||||
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
|
||||
// TODO wjm handle only one table
|
||||
code = metaGetTableEntryByName(&mr, tbName);
|
||||
if (isValidDstChildTable(&mr, TD_VID(pVnode), tbName, pTask->outputInfo.tbSink.stbUid)) {
|
||||
STableSinkInfo* pTableSinkInfo = NULL;
|
||||
bool alreadyCached = doGetSinkTableInfoFromCache(pTask->outputInfo.tbSink.pTbInfo, pDataBlock->info.id.groupId, &pTableSinkInfo);
|
||||
if (alreadyCached) {
|
||||
pTableSinkInfo->uid = mr.me.uid;
|
||||
}
|
||||
}
|
||||
metaReaderClear(&mr);
|
||||
tqDebug("s-task:%s build drop %d table(s) msg", id, rows);
|
||||
code = tqPutReqToQueue(pVnode, &batchReq, encodeDropChildTableForRPC, TDMT_VND_DROP_TABLE);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
@ -473,10 +486,10 @@ static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SS
|
|||
memcpy(tbName, varDataVal(pData), varDataLen(pData));
|
||||
tbName[varDataLen(pData) + 1] = 0;
|
||||
int64_t uid = *(int64_t*)colDataGetData(pUidCol, i);
|
||||
// TODO wjm remove uid it's not my uid
|
||||
code = doWaitForDstTableDropped(pVnode, pTask, tbName, uid);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
return code;
|
||||
|
||||
_exit:
|
||||
if (batchReq.pArray) {
|
||||
|
|
|
@ -777,15 +777,6 @@ int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, vo
|
|||
TSDB_CHECK_NULL(pCol, code, lino, _exit, terrno);
|
||||
code = colDataSetVal(pCol, i, (const char* )&pReq->uid, false);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
/*
|
||||
pCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
|
||||
TSDB_CHECK_NULL(pCol, code, lino, _exit, terrno);
|
||||
char varTbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE + 1] = {0};
|
||||
varDataSetLen(varTbName, strlen(pReq->name));
|
||||
tsnprintf(varTbName + VARSTR_HEADER_SIZE, TSDB_TABLE_NAME_LEN + 1, "%s", pReq->name);
|
||||
code = colDataSetVal(pCol, i, varTbName, false);
|
||||
*/
|
||||
}
|
||||
|
||||
code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock);
|
||||
|
|
|
@ -633,9 +633,6 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
case TDMT_VND_ARB_CHECK_SYNC: {
|
||||
code = vnodePreProcessArbCheckSyncMsg(pVnode, pMsg);
|
||||
} break;
|
||||
case TDMT_VND_DROP_TSMA_CTB: {
|
||||
code = vnodePreProcessDropTSmaCtbMsg(pVnode, pMsg);
|
||||
} break;
|
||||
case TDMT_VND_DROP_TABLE: {
|
||||
code = vnodePreProcessDropTbMsg(pVnode, pMsg);
|
||||
} break;
|
||||
|
@ -843,11 +840,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
|||
case TDMT_VND_ARB_CHECK_SYNC:
|
||||
vnodeProcessArbCheckSyncReq(pVnode, pReq, len, pRsp);
|
||||
break;
|
||||
case TDMT_VND_DROP_TSMA_CTB:
|
||||
if (vnodeProcessDropTSmaCtbReq(pVnode, ver, pReq, len, pRsp, pMsg) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType);
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
|
@ -2652,9 +2644,3 @@ int32_t vnodeAsyncCompact(SVnode *pVnode, int64_t ver, void *pReq, int32_t len,
|
|||
int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool sync) { return 0; }
|
||||
#endif
|
||||
|
||||
static int32_t vnodeProcessDropTSmaCtbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp,
|
||||
SRpcMsg *pOriginalMsg) {
|
||||
pRsp->msgType = TDMT_VND_DROP_TSMA_CTB_RSP;
|
||||
pRsp->code = TSDB_CODE_SUCCESS;
|
||||
return pRsp->code;
|
||||
}
|
||||
|
|
|
@ -1326,7 +1326,7 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag
|
|||
int32_t winCode = TSDB_CODE_SUCCESS;
|
||||
code = pAPI->streamStateGetParName(pState, groupId, &pValue, true, &winCode);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
||||
qInfo("wjm group id: %"PRId64 " winCode: %d, block type: %d", groupId, winCode, pSrcBlock->info.type);
|
||||
if (winCode != TSDB_CODE_SUCCESS) {
|
||||
SSDataBlock* pTmpBlock = NULL;
|
||||
code = blockCopyOneRow(pSrcBlock, rowId, &pTmpBlock);
|
||||
|
@ -1335,6 +1335,8 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag
|
|||
memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
|
||||
pTmpBlock->info.id.groupId = groupId;
|
||||
char* tbName = pSrcBlock->info.parTbName;
|
||||
printSpecDataBlock(pSrcBlock, "wjm", "recv", "wjm");
|
||||
printSpecDataBlock(pTmpBlock, "wjm", "recv", "wjm");
|
||||
if (pTableSup->numOfExprs > 0) {
|
||||
code = projectApplyFunctions(pTableSup->pExprInfo, pDestBlock, pTmpBlock, pTableSup->pCtx, pTableSup->numOfExprs,
|
||||
NULL);
|
||||
|
@ -1342,15 +1344,19 @@ int32_t appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTag
|
|||
|
||||
SColumnInfoData* pTbCol = taosArrayGet(pDestBlock->pDataBlock, UD_TABLE_NAME_COLUMN_INDEX);
|
||||
QUERY_CHECK_NULL(pTbCol, code, lino, _end, terrno);
|
||||
printSpecDataBlock(pSrcBlock, "wjm", "recv", "wjm");
|
||||
printSpecDataBlock(pTmpBlock, "wjm", "recv", "wjm");
|
||||
memset(tbName, 0, TSDB_TABLE_NAME_LEN);
|
||||
int32_t len = 0;
|
||||
if (colDataIsNull_s(pTbCol, pDestBlock->info.rows - 1)) {
|
||||
qInfo("wjm calculated tbnameis null");
|
||||
len = 1;
|
||||
tbName[0] = 0;
|
||||
} else {
|
||||
void* pData = colDataGetData(pTbCol, pDestBlock->info.rows - 1);
|
||||
len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
|
||||
memcpy(tbName, varDataVal(pData), len);
|
||||
qInfo("wjm calculated tbname: %s", tbName);
|
||||
code = pAPI->streamStatePutParName(pState, groupId, tbName);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
|
|
@ -289,6 +289,7 @@ static int32_t doSetTagColumnData(STableScanBase* pTableScanInfo, SSDataBlock* p
|
|||
pTaskInfo, &pTableScanInfo->metaCache);
|
||||
// ignore the table not exists error, since this table may have been dropped during the scan procedure.
|
||||
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||
if (pTaskInfo->streamInfo.pState) blockDataCleanup(pBlock);
|
||||
code = 0;
|
||||
}
|
||||
}
|
||||
|
@ -3038,10 +3039,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
|||
code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
|
||||
pBlockInfo->rows, pTaskInfo, &pTableScanInfo->base.metaCache);
|
||||
// ignore the table not exists error, since this table may have been dropped during the scan procedure.
|
||||
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||
code = 0;
|
||||
}
|
||||
|
||||
if (code) {
|
||||
blockDataFreeRes((SSDataBlock*)pBlock);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
|
@ -3312,7 +3309,7 @@ static int32_t setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock)
|
|||
qInfo("wjm, get uid: %"PRIu64, uidCol[i]);
|
||||
uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]);
|
||||
qInfo("wjm, get groupid: %"PRIu64, groupId);
|
||||
code = colDataSetVal(pGpCol, i, (const char*)&groupId, false);
|
||||
code = colDataSetVal(pGpCol, i, (const char*)(uidCol + i), false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
}
|
||||
|
@ -3541,7 +3538,7 @@ static int32_t deletePartName(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||
SColumnInfoData* pGpIdCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
SColumnInfoData* pGpIdCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||
SColumnInfoData* pTbnameCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
|
||||
int64_t* gpIdCol = (int64_t*)pGpIdCol->pData;
|
||||
void* pParName = NULL;
|
||||
|
@ -3558,13 +3555,15 @@ static int32_t deletePartName(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32
|
|||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
char varTbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE + 1] = {0};
|
||||
varDataSetLen(varTbName, strlen(pParName));
|
||||
tsnprintf(varTbName + VARSTR_HEADER_SIZE, TSDB_TABLE_NAME_LEN + 1, "%s", pParName);
|
||||
int64_t len = tsnprintf(varTbName + VARSTR_HEADER_SIZE, TSDB_TABLE_NAME_LEN + 1, "%s", pParName);
|
||||
code = colDataSetVal(pTbnameCol, i, varTbName, false);
|
||||
qDebug("delete stream part for:%"PRId64 " res tb: %s", gpIdCol[i], (char*)pParName);
|
||||
pInfo->stateStore.streamStateFreeVal(pParName);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
code = pInfo->stateStore.streamStateDeleteParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, gpIdCol[i]);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
pBlock->info.id.groupId = gpIdCol[i];
|
||||
memcpy(pBlock->info.parTbName, varTbName + VARSTR_HEADER_SIZE, TSDB_TABLE_NAME_LEN + 1);
|
||||
}
|
||||
|
||||
_end:
|
||||
|
@ -3962,7 +3961,13 @@ FETCH_NEXT_BLOCK:
|
|||
}
|
||||
|
||||
code = setBlockIntoRes(pInfo, pRes, &pStreamInfo->fillHistoryWindow, false);
|
||||
if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||
pInfo->pRes->info.rows = 0;
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
if (pInfo->pRes->info.rows == 0) {
|
||||
continue;
|
||||
}
|
||||
|
|
|
@ -433,9 +433,6 @@ int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx
|
|||
nodesDestroyNode(pQuery->pRoot);
|
||||
pQuery->pRoot = NULL;
|
||||
code = nodesCloneNode(pQuery->pPrepareRoot, &pQuery->pRoot);
|
||||
if (NULL == pQuery->pRoot) {
|
||||
code = code;
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
rewriteExprAlias(pQuery->pRoot);
|
||||
|
|
|
@ -1534,9 +1534,7 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pSort->pSortKeys = NULL;
|
||||
code = nodesCloneList(pSelect->pOrderByList, &pSort->pSortKeys);
|
||||
if (NULL == pSort->pSortKeys) {
|
||||
code = code;
|
||||
}
|
||||
if (NULL != pSort->pSortKeys) {
|
||||
SNode* pNode = NULL;
|
||||
SOrderByExprNode* firstSortKey = (SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0);
|
||||
if (isPrimaryKeySort(pSelect->pOrderByList)) pSort->node.outputTsOrder = firstSortKey->order;
|
||||
|
@ -1552,6 +1550,7 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
|||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pLogicNode = (SLogicNode*)pSort;
|
||||
|
|
|
@ -836,10 +836,8 @@ static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo
|
|||
nodesDestroyNode(pMergeWin->pTsEnd);
|
||||
pMergeWin->pTsEnd = NULL;
|
||||
code = nodesCloneNode(nodesListGetNode(pPartWin->node.pTargets, index), &pMergeWin->pTsEnd);
|
||||
if (NULL == pMergeWin->pTsEnd) {
|
||||
code = code;
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code)
|
||||
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
|
|
@ -758,6 +758,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
|
|||
SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo;
|
||||
hashValue =
|
||||
taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix);
|
||||
stInfo("wjm ctbname for dispatch: %s, pDataBlock.info.parTbName: %s", ctbName, pDataBlock->info.parTbName);
|
||||
SBlockName bln = {0};
|
||||
bln.hashValue = hashValue;
|
||||
memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName));
|
||||
|
|
|
@ -166,6 +166,8 @@ const char* streamQueueItemGetTypeStr(int32_t type) {
|
|||
return "checkpoint-trigger";
|
||||
case STREAM_INPUT__TRANS_STATE:
|
||||
return "trans-state";
|
||||
case STREAM_INPUT__REF_DATA_BLOCK:
|
||||
return "ref-block";
|
||||
default:
|
||||
return "datablock";
|
||||
}
|
||||
|
@ -211,7 +213,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte
|
|||
// do not merge blocks for sink node and check point data block
|
||||
int8_t type = qItem->type;
|
||||
if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
|
||||
type == STREAM_INPUT__TRANS_STATE) {
|
||||
type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||
const char* p = streamQueueItemGetTypeStr(type);
|
||||
|
||||
if (*pInput == NULL) {
|
||||
|
|
|
@ -87,7 +87,7 @@ int32_t walNextValidMsg(SWalReader *pReader) {
|
|||
|
||||
int32_t type = pReader->pHead->head.msgType;
|
||||
if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) ||
|
||||
(type == TDMT_VND_DROP_TSMA_CTB) || (IS_META_MSG(type) && pReader->cond.scanMeta)) {
|
||||
(IS_META_MSG(type) && pReader->cond.scanMeta)) {
|
||||
TAOS_RETURN(walFetchBody(pReader));
|
||||
} else if (type == TDMT_VND_DROP_TABLE && pReader->cond.scanDropCtb) {
|
||||
TAOS_RETURN(walFetchBody(pReader));
|
||||
|
|
|
@ -805,9 +805,8 @@ class TDTestCase:
|
|||
|
||||
def test_query_with_tsma(self):
|
||||
self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)', 'count(ts)'], '5m')
|
||||
#self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)', 'count(ts)'], '30m')
|
||||
#self.create_tsma('tsma5', 'test', 'norm_tb', ['avg(c1)', 'avg(c2)'], '10m')
|
||||
return
|
||||
self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)', 'count(ts)'], '30m')
|
||||
self.create_tsma('tsma5', 'test', 'norm_tb', ['avg(c1)', 'avg(c2)'], '10m')
|
||||
|
||||
self.test_query_with_tsma_interval()
|
||||
self.test_query_with_tsma_agg()
|
||||
|
@ -1228,10 +1227,10 @@ class TDTestCase:
|
|||
|
||||
def run(self):
|
||||
self.init_data()
|
||||
#self.test_ddl()
|
||||
self.test_ddl()
|
||||
self.test_query_with_tsma()
|
||||
# bug to fix
|
||||
#self.test_flush_query()
|
||||
self.test_flush_query()
|
||||
|
||||
#cluster test
|
||||
cluster_dnode_list = tdSql.get_cluseter_dnodes()
|
||||
|
@ -1241,14 +1240,35 @@ class TDTestCase:
|
|||
self.test_td_32519()
|
||||
|
||||
def test_td_32519(self):
|
||||
tdSql.execute('INSERT INTO t1 VALUES("2024-10-24 11:45:00", 1,1,1,1,1,1,1, "a", "a")', queryTimes=1)
|
||||
tdSql.execute('INSERT INTO t1 VALUES("2024-10-24 11:55:00", 2,1,1,1,1,1,1, "a", "a")', queryTimes=1)
|
||||
tdSql.execute('DROP TABLE t1', queryTimes=1)
|
||||
tdSql.execute('CREATE TABLE t1 USING meters TAGS(1, "a", "b", 1,1,1)')
|
||||
tdSql.execute('INSERT INTO t1 VALUES("2024-10-24 11:59:00", 3,1,1,1,1,1,1, "a", "a")', queryTimes=1)
|
||||
tdSql.execute('INSERT INTO t1 VALUES("2024-10-24 12:10:00", 4,1,1,1,1,1,1, "a", "a")', queryTimes=1)
|
||||
tdSql.execute('INSERT INTO t1 VALUES("2024-10-24 12:20:00", 5,1,1,1,1,1,1, "a", "a")', queryTimes=1)
|
||||
tdSql.execute("drop tsma test.tsma5")
|
||||
self.create_recursive_tsma('tsma1', 'tsma_r', 'test', '1h', 'meters', ['avg(c1)', 'avg(c2)', 'count(ts)'])
|
||||
tdSql.execute('INSERT INTO test.t1 VALUES("2024-10-24 11:45:00", 1,1,1,1,1,1,1, "a", "a")', queryTimes=1)
|
||||
tdSql.execute('INSERT INTO test.t1 VALUES("2024-10-24 11:55:00", 2,1,1,1,1,1,1, "a", "a")', queryTimes=1)
|
||||
tdSql.execute('DROP TABLE test.t1', queryTimes=1)
|
||||
self.wait_query_err('desc test.`404e15422d96c8b5de9603c2296681b1`', 10, -2147473917)
|
||||
self.wait_query_err('desc test.`82b56f091c4346369da0af777c3e580d`', 10, -2147473917)
|
||||
self.wait_query_err('desc test.`163b7c69922cf6d83a98bfa44e52dade`', 10, -2147473917)
|
||||
tdSql.execute('CREATE TABLE test.t1 USING test.meters TAGS(1, "a", "b", 1,1,1)')
|
||||
tdSql.execute('INSERT INTO test.t1 VALUES("2024-10-24 11:59:00", 3,1,1,1,1,1,1, "a", "a")', queryTimes=1)
|
||||
tdSql.execute('INSERT INTO test.t1 VALUES("2024-10-24 12:10:00", 4,1,1,1,1,1,1, "a", "a")', queryTimes=1)
|
||||
tdSql.execute('INSERT INTO test.t1 VALUES("2024-10-24 12:20:00", 5,1,1,1,1,1,1, "a", "a")', queryTimes=1)
|
||||
tdSql.execute('FLUSH DATABASE test', queryTimes=1)
|
||||
tdSql.query('SELECT * FROM test.t1', queryTimes=1)
|
||||
tdSql.checkRows(3)
|
||||
sql = 'SELECT * FROM test.`404e15422d96c8b5de9603c2296681b1`'
|
||||
self.wait_query(sql, 3, 20) ## tsma1 output ctb for t1
|
||||
tdSql.query(sql, queryTimes=1)
|
||||
tdSql.checkData(0,1, 1)
|
||||
tdSql.checkData(1,1, 1)
|
||||
tdSql.checkData(2,1, 1)
|
||||
#sql = 'select * from test.`82b56f091c4346369da0af777c3e580d`'
|
||||
#self.wait_query(sql, 2, 10) ## tsma2 output ctb for t1
|
||||
#tdSql.query(sql, queryTimes=1)
|
||||
#tdSql.checkData(0, 1, 1)
|
||||
#tdSql.checkData(1, 1, 2)
|
||||
sql = 'select * from test.`163b7c69922cf6d83a98bfa44e52dade`'
|
||||
self.wait_query(sql, 2, 20) ## tsma_r output ctb for t1
|
||||
tdSql.checkData(0, 1, 1)
|
||||
|
||||
def test_create_tsma(self):
|
||||
function_name = sys._getframe().f_code.co_name
|
||||
|
|
Loading…
Reference in New Issue