diff --git a/include/common/tcommon.h b/include/common/tcommon.h index c5404085bb..4eea744be1 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -56,7 +56,6 @@ enum { STREAM_INPUT__DATA_SUBMIT = 1, STREAM_INPUT__DATA_BLOCK, STREAM_INPUT__MERGED_SUBMIT, - // STREAM_INPUT__TABLE_SCAN, STREAM_INPUT__TQ_SCAN, STREAM_INPUT__DATA_RETRIEVE, STREAM_INPUT__GET_RES, @@ -154,7 +153,7 @@ typedef struct SQueryTableDataCond { int32_t order; // desc|asc order to iterate the data block int32_t numOfCols; SColumnInfo* colList; - int32_t type; // data block load type: + int32_t type; // data block load type: STimeWindow twindows; int64_t startVersion; int64_t endVersion; diff --git a/include/libs/stream/tstreamUpdate.h b/include/libs/stream/tstreamUpdate.h index 6e4a8d62d0..a4728e6382 100644 --- a/include/libs/stream/tstreamUpdate.h +++ b/include/libs/stream/tstreamUpdate.h @@ -34,11 +34,16 @@ typedef struct SUpdateInfo { TSKEY minTS; SScalableBf* pCloseWinSBF; SHashObj* pMap; + STimeWindow scanWindow; + uint64_t scanGroupId; + uint64_t maxVersion; } SUpdateInfo; SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark); SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark); bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); +void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version); +bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version); void updateInfoDestroy(SUpdateInfo *pInfo); void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo); void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo); diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 1417369096..980e01a65c 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -41,7 +41,7 @@ extern "C" { #define WAL_REFRESH_MS 1000 #define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) #define WAL_FILE_LEN (WAL_PATH_LEN + 32) -#define WAL_MAGIC 0xFAFBFCFDULL +#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL #define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3) typedef enum { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index da2f58307d..1c48a46614 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -258,6 +258,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_SINGLE_STB_MODE_DB TAOS_DEF_ERROR_CODE(0, 0x03C5) #define TSDB_CODE_MND_INVALID_SCHEMA_VER TAOS_DEF_ERROR_CODE(0, 0x03C6) #define TSDB_CODE_MND_STABLE_UID_NOT_MATCH TAOS_DEF_ERROR_CODE(0, 0x03C7) +#define TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA TAOS_DEF_ERROR_CODE(0, 0x03C8) // mnode-trans #define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index f8e64a3409..7cfc1c0b1d 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1231,9 +1231,7 @@ int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) { colDataAssign(pDst, pSrc, src->info.rows, &src->info); } - dst->info.rows = src->info.rows; - dst->info.window = src->info.window; - dst->info.type = src->info.type; + dst->info = src->info; return TSDB_CODE_SUCCESS; } @@ -1708,9 +1706,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock); int32_t rows = pDataBlock->info.rows; int32_t len = 0; - len += snprintf(dumpBuf + len, size - len, "===stream===%s |block type %d|child id %d|group id:%" PRIu64 "|uid:%ld|rows:%d\n", flag, + len += snprintf(dumpBuf + len, size - len, "===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%ld|rows:%d|version:%" PRIu64 "\n", flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId, - pDataBlock->info.uid, pDataBlock->info.rows); + pDataBlock->info.uid, pDataBlock->info.rows, pDataBlock->info.version); if (len >= size - 1) return dumpBuf; for (int32_t j = 0; j < rows; j++) { diff --git a/source/dnode/mnode/impl/inc/mndTopic.h b/source/dnode/mnode/impl/inc/mndTopic.h index 7eb53dbdbb..c5c4800e02 100644 --- a/source/dnode/mnode/impl/inc/mndTopic.h +++ b/source/dnode/mnode/impl/inc/mndTopic.h @@ -37,8 +37,6 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]); int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic); -int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char* stbname, int64_t suid, col_id_t colId); - #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index aab0c7e815..1b0a2c2a13 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -45,7 +45,9 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq); static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextStb(SMnode *pMnode, void *pIter); static int32_t mndProcessTableCfgReq(SRpcMsg *pReq); -static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp, void* alterOriData, int32_t alterOriDataLen); +static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp, + void *alterOriData, int32_t alterOriDataLen); +static int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t suid, col_id_t colId); int32_t mndInitStb(SMnode *pMnode) { SSdbTable table = { @@ -409,7 +411,8 @@ static FORCE_INLINE int32_t schemaExColIdCompare(const void *colId, const void * return 0; } -static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen, void* alterOriData, int32_t alterOriDataLen) { +static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen, + void *alterOriData, int32_t alterOriDataLen) { SEncoder encoder = {0}; int32_t contLen; SName name = {0}; @@ -709,7 +712,8 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat memcpy(pDst->db, pDb->name, TSDB_DB_FNAME_LEN); pDst->createdTime = taosGetTimestampMs(); pDst->updateTime = pDst->createdTime; - pDst->uid = (pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); + pDst->uid = + (pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); pDst->dbUid = pDb->uid; pDst->tagVer = 1; pDst->colVer = 1; @@ -895,9 +899,9 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq pSchema->flags = pField->flags; memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN); int32_t cIndex = mndFindSuperTableColumnIndex(pStb, pField->name); - if (cIndex >= 0){ + if (cIndex >= 0) { pSchema->colId = pStb->pColumns[cIndex].colId; - }else{ + } else { pSchema->colId = pDst->nextColId++; } } @@ -909,12 +913,11 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq pSchema->bytes = pField->bytes; memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN); int32_t cIndex = mndFindSuperTableTagIndex(pStb, pField->name); - if (cIndex >= 0){ + if (cIndex >= 0) { pSchema->colId = pStb->pTags[cIndex].colId; - }else{ + } else { pSchema->colId = pDst->nextColId++; } - } pDst->tagVer = createReq->tagVer; pDst->colVer = createReq->colVer; @@ -982,7 +985,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { } } else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) { goto _OVER; - } else if (createReq.source == TD_REQ_FROM_TAOX && (createReq.tagVer != 1 || createReq.colVer != 1)){ + } else if (createReq.source == TD_REQ_FROM_TAOX && (createReq.tagVer != 1 || createReq.colVer != 1)) { mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name); code = 0; goto _OVER; @@ -1009,7 +1012,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { } if (isAlter) { - bool needRsp = false; + bool needRsp = false; SStbObj pDst = {0}; if (mndBuildStbFromAlter(pStb, &pDst, &createReq) != 0) { taosMemoryFreeClear(pDst.pTags); @@ -1137,6 +1140,99 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p return 0; } +int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t suid, col_id_t colId) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + while (1) { + SMqTopicObj *pTopic = NULL; + pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic); + if (pIter == NULL) break; + + mDebug("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s", + pTopic->name, stbname, suid, colId, pTopic->subType, pTopic->sql); + if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) { + sdbRelease(pSdb, pTopic); + continue; + } + + SNode *pAst = NULL; + if (nodesStringToNode(pTopic->ast, &pAst) != 0) { + ASSERT(0); + return -1; + } + + SNodeList *pNodeList = NULL; + nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList); + SNode *pNode = NULL; + FOREACH(pNode, pNodeList) { + SColumnNode *pCol = (SColumnNode *)pNode; + mDebug("topic:%s, check colId:%d tableId:%" PRId64 " ctbStbUid:%" PRId64, pTopic->name, pCol->colId, + pCol->tableId, pTopic->ctbStbUid); + + if (pCol->tableId != suid && pTopic->ctbStbUid != suid) { + mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId); + goto NEXT; + } + if (pCol->colId > 0 && pCol->colId == colId) { + sdbRelease(pSdb, pTopic); + nodesDestroyNode(pAst); + terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC; + mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId); + return -1; + } + mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId); + } + + NEXT: + sdbRelease(pSdb, pTopic); + nodesDestroyNode(pAst); + } + + while (1) { + SSmaObj *pSma = NULL; + pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma); + if (pIter == NULL) break; + + mDebug("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, sql:%s", pSma->name, stbname, + suid, colId, pSma->sql); + + SNode *pAst = NULL; + if (nodesStringToNode(pSma->ast, &pAst) != 0) { + terrno = TSDB_CODE_SDB_INVALID_DATA_CONTENT; + mError("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d failed since parse AST err", + pSma->name, stbname, suid, colId); + return -1; + } + + SNodeList *pNodeList = NULL; + nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList); + SNode *pNode = NULL; + FOREACH(pNode, pNodeList) { + SColumnNode *pCol = (SColumnNode *)pNode; + mDebug("tsma:%s, check colId:%d tableId:%" PRId64, pSma->name, pCol->colId, pCol->tableId); + + if ((pCol->tableId != suid) && (pSma->stbUid != suid)) { + mDebug("tsma:%s, check colId:%d passed", pSma->name, pCol->colId); + goto NEXT2; + } + if ((pCol->colId) > 0 && (pCol->colId == colId)) { + sdbRelease(pSdb, pSma); + nodesDestroyNode(pAst); + terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA; + mError("tsma:%s, check colId:%d conflicted", pSma->name, pCol->colId); + return -1; + } + mDebug("tsma:%s, check colId:%d passed", pSma->name, pCol->colId); + } + + NEXT2: + sdbRelease(pSdb, pSma); + nodesDestroyNode(pAst); + } + + return 0; +} + static int32_t mndDropSuperTableTag(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const char *tagName) { int32_t tag = mndFindSuperTableTagIndex(pOld, tagName); if (tag < 0) { @@ -1380,7 +1476,8 @@ static int32_t mndSetAlterStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj * return 0; } -static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, void* alterOriData, int32_t alterOriDataLen) { +static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, void *alterOriData, + int32_t alterOriDataLen) { SSdb *pSdb = pMnode->pSdb; SVgObj *pVgroup = NULL; void *pIter = NULL; @@ -1607,7 +1704,8 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, SStbObj *pObj, void **pCont, i return 0; } -static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp, void* alterOriData, int32_t alterOriDataLen) { +static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp, + void *alterOriData, int32_t alterOriDataLen) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq); if (pTrans == NULL) goto _OVER; @@ -2204,12 +2302,12 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)maxDelay, false); - char rollup[128 + VARSTR_HEADER_SIZE] = {0}; + char rollup[128 + VARSTR_HEADER_SIZE] = {0}; int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs); for (int32_t i = 0; i < rollupNum; ++i) { char *funcName = taosArrayGet(pStb->pFuncs, i); if (i) { - strcat(varDataVal(rollup), ", "); + strcat(varDataVal(rollup), ", "); } strcat(varDataVal(rollup), funcName); } diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 7e3e5a9838..f343c2e055 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -72,56 +72,6 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) { return strchr(topic, '.') + 1; } -int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char* stbname, int64_t suid, col_id_t colId) { - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; - while (1) { - SMqTopicObj *pTopic = NULL; - pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic); - if (pIter == NULL) break; - - mDebug("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s", - pTopic->name, stbname, suid, colId, pTopic->subType, pTopic->sql); - if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) { - sdbRelease(pSdb, pTopic); - continue; - } - - SNode *pAst = NULL; - if (nodesStringToNode(pTopic->ast, &pAst) != 0) { - ASSERT(0); - return -1; - } - - SNodeList *pNodeList = NULL; - nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList); - SNode *pNode = NULL; - FOREACH(pNode, pNodeList) { - SColumnNode *pCol = (SColumnNode *)pNode; - mDebug("topic:%s, check colId:%d tableId:%" PRId64 " ctbStbUid:%" PRId64, pTopic->name, pCol->colId, pCol->tableId, pTopic->ctbStbUid); - - if (pCol->tableId != suid && pTopic->ctbStbUid != suid) { - mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId); - goto NEXT; - } - if (pCol->colId > 0 && pCol->colId == colId) { - sdbRelease(pSdb, pTopic); - nodesDestroyNode(pAst); - terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC; - mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId); - return -1; - } - mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId); - } - - NEXT: - sdbRelease(pSdb, pTopic); - nodesDestroyNode(pAst); - } - - return 0; -} - SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index dcf374f4c4..46519dce14 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -137,6 +137,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTa int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle); void *tsdbGetIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta); +uint64_t getReaderMaxVersion(STsdbReader *pReader); int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader); int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids); diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index e08925acc3..12c3f7f2e8 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -78,7 +78,10 @@ struct SMeta { TTB* pTagIdx; TTB* pTtlIdx; - TTB* pSmaIdx; + TTB* pSmaIdx; + + TTB* pTaskIdx; + SMetaIdx* pIdx; }; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index ce83b335d7..d47b1cc5cb 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -153,7 +153,7 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data); +int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index 396a58c988..4910830c7d 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -22,6 +22,7 @@ static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int uidIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); +static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); } static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); } @@ -130,6 +131,12 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { goto _err; } + ret = tdbTbOpen("stream.task.db", sizeof(int64_t), -1, taskIdxKeyCmpr, pMeta->pEnv, &pMeta->pTaskIdx); + if (ret < 0) { + metaError("vgId: %d, failed to open meta stream task index since %s", TD_VID(pVnode), tstrerror(terrno)); + goto _err; + } + // open index if (metaOpenIdx(pMeta) < 0) { metaError("vgId:%d, failed to open meta index since %s", TD_VID(pVnode), tstrerror(terrno)); @@ -143,6 +150,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) { _err: if (pMeta->pIdx) metaCloseIdx(pMeta); + if (pMeta->pTaskIdx) tdbTbClose(pMeta->pTaskIdx); if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx); if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx); @@ -162,6 +170,7 @@ _err: int metaClose(SMeta *pMeta) { if (pMeta) { if (pMeta->pIdx) metaCloseIdx(pMeta); + if (pMeta->pTaskIdx) tdbTbClose(pMeta->pTaskIdx); if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx); if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx); @@ -378,3 +387,16 @@ static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL return 0; } + +static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) { + int32_t uid1 = *(int32_t *)pKey1; + int32_t uid2 = *(int32_t *)pKey2; + + if (uid1 > uid2) { + return 1; + } else if (uid1 < uid2) { + return -1; + } + + return 0; +} diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 86b016b8be..a05013d996 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -695,7 +695,7 @@ FAIL: return -1; } -int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) { +int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) { void* pIter = NULL; bool failed = false; SStreamDataSubmit* pSubmit = NULL; @@ -713,7 +713,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) { SStreamTask* pTask = *(SStreamTask**)pIter; if (!pTask->isDataScan) continue; - qDebug("data submit enqueue stream task: %d", pTask->taskId); + qDebug("data submit enqueue stream task: %d, ver: %ld", pTask->taskId, ver); if (!failed) { if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) { diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 290ffe5c8d..e866112250 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -48,7 +48,7 @@ int32_t tqMetaOpen(STQ* pTq) { ASSERT(0); } - if (tdbTbOpen("handles", -1, -1, 0, pTq->pMetaStore, &pTq->pExecStore) < 0) { + if (tdbTbOpen("handles", -1, -1, NULL, pTq->pMetaStore, &pTq->pExecStore) < 0) { ASSERT(0); } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 6097ddd49e..0720cf015b 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -252,7 +252,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) SSubmitReq* pReq = (SSubmitReq*)data; pReq->version = ver; - tqProcessStreamTrigger(pTq, data); + tqProcessStreamTrigger(pTq, data, ver); } return 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ed2558d344..e924b29fd4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2502,6 +2502,10 @@ void* tsdbGetIvtIdx(SMeta* pMeta) { return metaGetIvtIdx(pMeta); } +uint64_t getReaderMaxVersion(STsdbReader *pReader) { + return pReader->verRange.maxVer; +} + /** * @brief Get all suids since suid * diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d587e201ef..577f9772be 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -437,6 +437,7 @@ typedef struct SessionWindowSupporter { SStreamAggSupporter* pStreamAggSup; int64_t gap; uint8_t parentType; + SAggSupporter* pIntervalAggSup; } SessionWindowSupporter; typedef struct STimeWindowSupp { @@ -1009,6 +1010,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs, TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted); bool functionNeedToExecute(SqlFunctionCtx* pCtx); bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup); +bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup); void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid); void printDataBlock(SSDataBlock* pBlock, const char* flag); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e38034f4aa..8821dbd5a1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1131,7 +1131,8 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC); // must check update info first. bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]); - if ((update || (isSignleIntervalWindow(pInfo) && isCloseWindow(&win, &pInfo->twAggSup))) && out) { + if ((update || (isSignleIntervalWindow(pInfo) && isCloseWindow(&win, &pInfo->twAggSup) && + isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup))) && out) { appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid); } } @@ -1337,6 +1338,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: { SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); if (pSDB) { + STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; + uint64_t version = getReaderMaxVersion(pTableScanInfo->dataReader); + updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->cond.twindows, pInfo->groupId,version); pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; checkUpdateData(pInfo, true, pSDB, false); return pSDB; @@ -1390,6 +1394,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { setBlockIntoRes(pInfo, &block); + if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId, pInfo->pRes->info.version)) { + printDataBlock(pInfo->pRes, "stream scan ignore"); + blockDataCleanup(pInfo->pRes); + continue; + } + if (pBlockInfo->rows > 0) { break; } @@ -1406,6 +1416,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { // record the scan action. pInfo->numOfExec++; pOperator->resultInfo.totalRows += pBlockInfo->rows; + printDataBlock(pInfo->pRes, "stream scan"); if (pBlockInfo->rows == 0) { updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 63143875a3..802e1f2306 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1456,6 +1456,7 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) { static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval, SHashObj* pPullDataMap, SArray* closeWins, SArray* pRecyPages, SDiskbasedBuf* pDiscBuf) { + qDebug("===stream===close interval window"); void* pIte = NULL; size_t keyLen = 0; while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) { @@ -1772,10 +1773,11 @@ SSDataBlock* createDeleteBlock() { return pBlock; } -void initIntervalDownStream(SOperatorInfo* downstream, uint8_t type) { +void initIntervalDownStream(SOperatorInfo* downstream, uint8_t type, SAggSupporter* pSup) { ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); SStreamScanInfo* pScanInfo = downstream->info; pScanInfo->sessionSup.parentType = type; + pScanInfo->sessionSup.pIntervalAggSup = pSup; } SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, @@ -1851,7 +1853,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) { - initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL); + initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, &pInfo->aggSup); } code = appendDownstream(pOperator, &downstream, 1); @@ -3111,7 +3113,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { - initIntervalDownStream(downstream, pPhyNode->type); + initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup); } code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index f2a5ba0ab5..ff1ef7b4b9 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -125,6 +125,9 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma pInfo->pCloseWinSBF = NULL; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK); + pInfo->maxVersion = 0; + pInfo->scanGroupId = 0; + pInfo->scanWindow = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; return pInfo; } @@ -185,15 +188,36 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { } if (ts < pInfo->minTS) { + qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64 , tableId, maxTs, *pMapMaxTs, ts); return true; } else if (res == TSDB_CODE_SUCCESS) { return false; } - qDebug("===stream===bucket:%d, tableId:%" PRIu64 ", maxTs:" PRIu64 ", maxMapTs:" PRIu64 ", ts:%" PRIu64, index, tableId, maxTs, *pMapMaxTs, ts); + qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64 , tableId, maxTs, *pMapMaxTs, ts); // check from tsdb api return true; } +void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version) { + qDebug("===stream===groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version); + pInfo->scanWindow = *pWin; + pInfo->scanGroupId = groupId; + pInfo->maxVersion = version; +} + +bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version) { + if (!pInfo) { + return false; + } + qDebug("===stream===check groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version); + if (pInfo->scanGroupId == groupId && pInfo->scanWindow.skey <= pWin->skey && + pWin->ekey <= pInfo->scanWindow.ekey && version <= pInfo->maxVersion ) { + qDebug("===stream===ignore groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version); + return true; + } + return false; +} + void updateInfoDestroy(SUpdateInfo *pInfo) { if (pInfo == NULL) { return; diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 68d8c54e28..a29c7a1309 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -116,15 +116,15 @@ int32_t walRollback(SWal *pWal, int64_t ver) { } walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr); - TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND); + TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND); - if (pIdxTFile == NULL) { + if (pIdxFile == NULL) { ASSERT(0); taosThreadMutexUnlock(&pWal->mutex); return -1; } int64_t idxOff = walGetVerIdxOffset(pWal, ver); - code = taosLSeekFile(pIdxTFile, idxOff, SEEK_SET); + code = taosLSeekFile(pIdxFile, idxOff, SEEK_SET); if (code < 0) { ASSERT(0); taosThreadMutexUnlock(&pWal->mutex); @@ -132,7 +132,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { } // read idx file and get log file pos SWalIdxEntry entry; - if (taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) { + if (taosReadFile(pIdxFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) { ASSERT(0); taosThreadMutexUnlock(&pWal->mutex); return -1; @@ -140,24 +140,24 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ASSERT(entry.ver == ver); walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr); - TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND); - if (pLogTFile == NULL) { - ASSERT(0); + TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND); + if (pLogFile == NULL) { // TODO + terrno = TAOS_SYSTEM_ERROR(errno); taosThreadMutexUnlock(&pWal->mutex); return -1; } - code = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET); + code = taosLSeekFile(pLogFile, entry.offset, SEEK_SET); if (code < 0) { - ASSERT(0); // TODO + terrno = TAOS_SYSTEM_ERROR(errno); taosThreadMutexUnlock(&pWal->mutex); return -1; } // validate offset SWalCkHead head; - ASSERT(taosValidFile(pLogTFile)); - int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalCkHead)); + ASSERT(taosValidFile(pLogFile)); + int64_t size = taosReadFile(pLogFile, &head, sizeof(SWalCkHead)); if (size != sizeof(SWalCkHead)) { ASSERT(0); taosThreadMutexUnlock(&pWal->mutex); @@ -180,14 +180,14 @@ int32_t walRollback(SWal *pWal, int64_t ver) { } // truncate old files - code = taosFtruncateFile(pLogTFile, entry.offset); + code = taosFtruncateFile(pLogFile, entry.offset); if (code < 0) { ASSERT(0); terrno = TAOS_SYSTEM_ERROR(errno); taosThreadMutexUnlock(&pWal->mutex); return -1; } - code = taosFtruncateFile(pIdxTFile, idxOff); + code = taosFtruncateFile(pIdxFile, idxOff); if (code < 0) { ASSERT(0); terrno = TAOS_SYSTEM_ERROR(errno); @@ -205,8 +205,10 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ASSERT(((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize == 0); ((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->firstVer = -1; } - taosCloseFile(&pIdxTFile); - taosCloseFile(&pLogTFile); + taosCloseFile(&pIdxFile); + taosCloseFile(&pLogFile); + + walSaveMeta(pWal); // unlock taosThreadMutexUnlock(&pWal->mutex); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index f20c5d8593..3835260335 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -261,6 +261,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC,"Field used by topic") TAOS_DEFINE_ERROR(TSDB_CODE_MND_SINGLE_STB_MODE_DB, "Database is single stable mode") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SCHEMA_VER, "Invalid schema version while alter stb") TAOS_DEFINE_ERROR(TSDB_CODE_MND_STABLE_UID_NOT_MATCH, "Invalid stable uid while alter stb") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA, "Field used by tsma") // mnode-trans TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists")