Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/tsdb_last
This commit is contained in:
commit
e0900aad8d
|
@ -56,7 +56,6 @@ enum {
|
|||
STREAM_INPUT__DATA_SUBMIT = 1,
|
||||
STREAM_INPUT__DATA_BLOCK,
|
||||
STREAM_INPUT__MERGED_SUBMIT,
|
||||
// STREAM_INPUT__TABLE_SCAN,
|
||||
STREAM_INPUT__TQ_SCAN,
|
||||
STREAM_INPUT__DATA_RETRIEVE,
|
||||
STREAM_INPUT__GET_RES,
|
||||
|
@ -154,7 +153,7 @@ typedef struct SQueryTableDataCond {
|
|||
int32_t order; // desc|asc order to iterate the data block
|
||||
int32_t numOfCols;
|
||||
SColumnInfo* colList;
|
||||
int32_t type; // data block load type:
|
||||
int32_t type; // data block load type:
|
||||
STimeWindow twindows;
|
||||
int64_t startVersion;
|
||||
int64_t endVersion;
|
||||
|
|
|
@ -34,11 +34,16 @@ typedef struct SUpdateInfo {
|
|||
TSKEY minTS;
|
||||
SScalableBf* pCloseWinSBF;
|
||||
SHashObj* pMap;
|
||||
STimeWindow scanWindow;
|
||||
uint64_t scanGroupId;
|
||||
uint64_t maxVersion;
|
||||
} SUpdateInfo;
|
||||
|
||||
SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark);
|
||||
SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
|
||||
bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
|
||||
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version);
|
||||
bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version);
|
||||
void updateInfoDestroy(SUpdateInfo *pInfo);
|
||||
void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
|
||||
void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
|
||||
|
|
|
@ -41,7 +41,7 @@ extern "C" {
|
|||
#define WAL_REFRESH_MS 1000
|
||||
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
|
||||
#define WAL_FILE_LEN (WAL_PATH_LEN + 32)
|
||||
#define WAL_MAGIC 0xFAFBFCFDULL
|
||||
#define WAL_MAGIC 0xFAFBFCFDF4F3F2F1ULL
|
||||
#define WAL_SCAN_BUF_SIZE (1024 * 1024 * 3)
|
||||
|
||||
typedef enum {
|
||||
|
|
|
@ -258,6 +258,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_MND_SINGLE_STB_MODE_DB TAOS_DEF_ERROR_CODE(0, 0x03C5)
|
||||
#define TSDB_CODE_MND_INVALID_SCHEMA_VER TAOS_DEF_ERROR_CODE(0, 0x03C6)
|
||||
#define TSDB_CODE_MND_STABLE_UID_NOT_MATCH TAOS_DEF_ERROR_CODE(0, 0x03C7)
|
||||
#define TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA TAOS_DEF_ERROR_CODE(0, 0x03C8)
|
||||
|
||||
// mnode-trans
|
||||
#define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0)
|
||||
|
|
|
@ -1231,9 +1231,7 @@ int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
|
|||
colDataAssign(pDst, pSrc, src->info.rows, &src->info);
|
||||
}
|
||||
|
||||
dst->info.rows = src->info.rows;
|
||||
dst->info.window = src->info.window;
|
||||
dst->info.type = src->info.type;
|
||||
dst->info = src->info;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1708,9 +1706,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
|
|||
int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
|
||||
int32_t rows = pDataBlock->info.rows;
|
||||
int32_t len = 0;
|
||||
len += snprintf(dumpBuf + len, size - len, "===stream===%s |block type %d|child id %d|group id:%" PRIu64 "|uid:%ld|rows:%d\n", flag,
|
||||
len += snprintf(dumpBuf + len, size - len, "===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%ld|rows:%d|version:%" PRIu64 "\n", flag,
|
||||
(int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId,
|
||||
pDataBlock->info.uid, pDataBlock->info.rows);
|
||||
pDataBlock->info.uid, pDataBlock->info.rows, pDataBlock->info.version);
|
||||
if (len >= size - 1) return dumpBuf;
|
||||
|
||||
for (int32_t j = 0; j < rows; j++) {
|
||||
|
|
|
@ -37,8 +37,6 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]);
|
|||
|
||||
int32_t mndSetTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
|
||||
|
||||
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char* stbname, int64_t suid, col_id_t colId);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -45,7 +45,9 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq);
|
|||
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
|
||||
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
|
||||
static int32_t mndProcessTableCfgReq(SRpcMsg *pReq);
|
||||
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp, void* alterOriData, int32_t alterOriDataLen);
|
||||
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
|
||||
void *alterOriData, int32_t alterOriDataLen);
|
||||
static int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t suid, col_id_t colId);
|
||||
|
||||
int32_t mndInitStb(SMnode *pMnode) {
|
||||
SSdbTable table = {
|
||||
|
@ -409,7 +411,8 @@ static FORCE_INLINE int32_t schemaExColIdCompare(const void *colId, const void *
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen, void* alterOriData, int32_t alterOriDataLen) {
|
||||
static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, int32_t *pContLen,
|
||||
void *alterOriData, int32_t alterOriDataLen) {
|
||||
SEncoder encoder = {0};
|
||||
int32_t contLen;
|
||||
SName name = {0};
|
||||
|
@ -709,7 +712,8 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat
|
|||
memcpy(pDst->db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
pDst->createdTime = taosGetTimestampMs();
|
||||
pDst->updateTime = pDst->createdTime;
|
||||
pDst->uid = (pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
pDst->uid =
|
||||
(pCreate->source == TD_REQ_FROM_TAOX) ? pCreate->suid : mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
|
||||
pDst->dbUid = pDb->uid;
|
||||
pDst->tagVer = 1;
|
||||
pDst->colVer = 1;
|
||||
|
@ -895,9 +899,9 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq
|
|||
pSchema->flags = pField->flags;
|
||||
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
|
||||
int32_t cIndex = mndFindSuperTableColumnIndex(pStb, pField->name);
|
||||
if (cIndex >= 0){
|
||||
if (cIndex >= 0) {
|
||||
pSchema->colId = pStb->pColumns[cIndex].colId;
|
||||
}else{
|
||||
} else {
|
||||
pSchema->colId = pDst->nextColId++;
|
||||
}
|
||||
}
|
||||
|
@ -909,12 +913,11 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq
|
|||
pSchema->bytes = pField->bytes;
|
||||
memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
|
||||
int32_t cIndex = mndFindSuperTableTagIndex(pStb, pField->name);
|
||||
if (cIndex >= 0){
|
||||
if (cIndex >= 0) {
|
||||
pSchema->colId = pStb->pTags[cIndex].colId;
|
||||
}else{
|
||||
} else {
|
||||
pSchema->colId = pDst->nextColId++;
|
||||
}
|
||||
|
||||
}
|
||||
pDst->tagVer = createReq->tagVer;
|
||||
pDst->colVer = createReq->colVer;
|
||||
|
@ -982,7 +985,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
|||
}
|
||||
} else if (terrno != TSDB_CODE_MND_STB_NOT_EXIST) {
|
||||
goto _OVER;
|
||||
} else if (createReq.source == TD_REQ_FROM_TAOX && (createReq.tagVer != 1 || createReq.colVer != 1)){
|
||||
} else if (createReq.source == TD_REQ_FROM_TAOX && (createReq.tagVer != 1 || createReq.colVer != 1)) {
|
||||
mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name);
|
||||
code = 0;
|
||||
goto _OVER;
|
||||
|
@ -1009,7 +1012,7 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
if (isAlter) {
|
||||
bool needRsp = false;
|
||||
bool needRsp = false;
|
||||
SStbObj pDst = {0};
|
||||
if (mndBuildStbFromAlter(pStb, &pDst, &createReq) != 0) {
|
||||
taosMemoryFreeClear(pDst.pTags);
|
||||
|
@ -1137,6 +1140,99 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char *stbname, int64_t suid, col_id_t colId) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
mDebug("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s",
|
||||
pTopic->name, stbname, suid, colId, pTopic->subType, pTopic->sql);
|
||||
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
|
||||
sdbRelease(pSdb, pTopic);
|
||||
continue;
|
||||
}
|
||||
|
||||
SNode *pAst = NULL;
|
||||
if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SNodeList *pNodeList = NULL;
|
||||
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
|
||||
SNode *pNode = NULL;
|
||||
FOREACH(pNode, pNodeList) {
|
||||
SColumnNode *pCol = (SColumnNode *)pNode;
|
||||
mDebug("topic:%s, check colId:%d tableId:%" PRId64 " ctbStbUid:%" PRId64, pTopic->name, pCol->colId,
|
||||
pCol->tableId, pTopic->ctbStbUid);
|
||||
|
||||
if (pCol->tableId != suid && pTopic->ctbStbUid != suid) {
|
||||
mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
|
||||
goto NEXT;
|
||||
}
|
||||
if (pCol->colId > 0 && pCol->colId == colId) {
|
||||
sdbRelease(pSdb, pTopic);
|
||||
nodesDestroyNode(pAst);
|
||||
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
|
||||
mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId);
|
||||
return -1;
|
||||
}
|
||||
mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
|
||||
}
|
||||
|
||||
NEXT:
|
||||
sdbRelease(pSdb, pTopic);
|
||||
nodesDestroyNode(pAst);
|
||||
}
|
||||
|
||||
while (1) {
|
||||
SSmaObj *pSma = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
mDebug("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, sql:%s", pSma->name, stbname,
|
||||
suid, colId, pSma->sql);
|
||||
|
||||
SNode *pAst = NULL;
|
||||
if (nodesStringToNode(pSma->ast, &pAst) != 0) {
|
||||
terrno = TSDB_CODE_SDB_INVALID_DATA_CONTENT;
|
||||
mError("tsma:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d failed since parse AST err",
|
||||
pSma->name, stbname, suid, colId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SNodeList *pNodeList = NULL;
|
||||
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
|
||||
SNode *pNode = NULL;
|
||||
FOREACH(pNode, pNodeList) {
|
||||
SColumnNode *pCol = (SColumnNode *)pNode;
|
||||
mDebug("tsma:%s, check colId:%d tableId:%" PRId64, pSma->name, pCol->colId, pCol->tableId);
|
||||
|
||||
if ((pCol->tableId != suid) && (pSma->stbUid != suid)) {
|
||||
mDebug("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
|
||||
goto NEXT2;
|
||||
}
|
||||
if ((pCol->colId) > 0 && (pCol->colId == colId)) {
|
||||
sdbRelease(pSdb, pSma);
|
||||
nodesDestroyNode(pAst);
|
||||
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA;
|
||||
mError("tsma:%s, check colId:%d conflicted", pSma->name, pCol->colId);
|
||||
return -1;
|
||||
}
|
||||
mDebug("tsma:%s, check colId:%d passed", pSma->name, pCol->colId);
|
||||
}
|
||||
|
||||
NEXT2:
|
||||
sdbRelease(pSdb, pSma);
|
||||
nodesDestroyNode(pAst);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndDropSuperTableTag(SMnode *pMnode, const SStbObj *pOld, SStbObj *pNew, const char *tagName) {
|
||||
int32_t tag = mndFindSuperTableTagIndex(pOld, tagName);
|
||||
if (tag < 0) {
|
||||
|
@ -1380,7 +1476,8 @@ static int32_t mndSetAlterStbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, void* alterOriData, int32_t alterOriDataLen) {
|
||||
static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb, void *alterOriData,
|
||||
int32_t alterOriDataLen) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SVgObj *pVgroup = NULL;
|
||||
void *pIter = NULL;
|
||||
|
@ -1607,7 +1704,8 @@ static int32_t mndBuildSMAlterStbRsp(SDbObj *pDb, SStbObj *pObj, void **pCont, i
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp, void* alterOriData, int32_t alterOriDataLen) {
|
||||
static int32_t mndAlterStbImp(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *pStb, bool needRsp,
|
||||
void *alterOriData, int32_t alterOriDataLen) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
@ -2204,12 +2302,12 @@ static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
|
|||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, numOfRows, (const char *)maxDelay, false);
|
||||
|
||||
char rollup[128 + VARSTR_HEADER_SIZE] = {0};
|
||||
char rollup[128 + VARSTR_HEADER_SIZE] = {0};
|
||||
int32_t rollupNum = (int32_t)taosArrayGetSize(pStb->pFuncs);
|
||||
for (int32_t i = 0; i < rollupNum; ++i) {
|
||||
char *funcName = taosArrayGet(pStb->pFuncs, i);
|
||||
if (i) {
|
||||
strcat(varDataVal(rollup), ", ");
|
||||
strcat(varDataVal(rollup), ", ");
|
||||
}
|
||||
strcat(varDataVal(rollup), funcName);
|
||||
}
|
||||
|
|
|
@ -72,56 +72,6 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
|
|||
return strchr(topic, '.') + 1;
|
||||
}
|
||||
|
||||
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, const char* stbname, int64_t suid, col_id_t colId) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
mDebug("topic:%s, check tag and column modifiable, stb:%s suid:%" PRId64 " colId:%d, subType:%d sql:%s",
|
||||
pTopic->name, stbname, suid, colId, pTopic->subType, pTopic->sql);
|
||||
if (pTopic->subType != TOPIC_SUB_TYPE__COLUMN) {
|
||||
sdbRelease(pSdb, pTopic);
|
||||
continue;
|
||||
}
|
||||
|
||||
SNode *pAst = NULL;
|
||||
if (nodesStringToNode(pTopic->ast, &pAst) != 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SNodeList *pNodeList = NULL;
|
||||
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
|
||||
SNode *pNode = NULL;
|
||||
FOREACH(pNode, pNodeList) {
|
||||
SColumnNode *pCol = (SColumnNode *)pNode;
|
||||
mDebug("topic:%s, check colId:%d tableId:%" PRId64 " ctbStbUid:%" PRId64, pTopic->name, pCol->colId, pCol->tableId, pTopic->ctbStbUid);
|
||||
|
||||
if (pCol->tableId != suid && pTopic->ctbStbUid != suid) {
|
||||
mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
|
||||
goto NEXT;
|
||||
}
|
||||
if (pCol->colId > 0 && pCol->colId == colId) {
|
||||
sdbRelease(pSdb, pTopic);
|
||||
nodesDestroyNode(pAst);
|
||||
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
|
||||
mError("topic:%s, check colId:%d conflicted", pTopic->name, pCol->colId);
|
||||
return -1;
|
||||
}
|
||||
mDebug("topic:%s, check colId:%d passed", pTopic->name, pCol->colId);
|
||||
}
|
||||
|
||||
NEXT:
|
||||
sdbRelease(pSdb, pTopic);
|
||||
nodesDestroyNode(pAst);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
|
|
|
@ -137,6 +137,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTa
|
|||
int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
|
||||
void *tsdbGetIdx(SMeta *pMeta);
|
||||
void *tsdbGetIvtIdx(SMeta *pMeta);
|
||||
uint64_t getReaderMaxVersion(STsdbReader *pReader);
|
||||
|
||||
int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader);
|
||||
int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
|
||||
|
|
|
@ -78,7 +78,10 @@ struct SMeta {
|
|||
TTB* pTagIdx;
|
||||
TTB* pTtlIdx;
|
||||
|
||||
TTB* pSmaIdx;
|
||||
TTB* pSmaIdx;
|
||||
|
||||
TTB* pTaskIdx;
|
||||
|
||||
SMetaIdx* pIdx;
|
||||
};
|
||||
|
||||
|
|
|
@ -153,7 +153,7 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen);
|
|||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data);
|
||||
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* data, int64_t ver);
|
||||
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
|
|
|
@ -22,6 +22,7 @@ static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
|
|||
static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
static int uidIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||
|
||||
static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); }
|
||||
static int32_t metaDestroyLock(SMeta *pMeta) { return taosThreadRwlockDestroy(&pMeta->lock); }
|
||||
|
@ -130,6 +131,12 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
ret = tdbTbOpen("stream.task.db", sizeof(int64_t), -1, taskIdxKeyCmpr, pMeta->pEnv, &pMeta->pTaskIdx);
|
||||
if (ret < 0) {
|
||||
metaError("vgId: %d, failed to open meta stream task index since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
||||
// open index
|
||||
if (metaOpenIdx(pMeta) < 0) {
|
||||
metaError("vgId:%d, failed to open meta index since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
|
@ -143,6 +150,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
|
|||
|
||||
_err:
|
||||
if (pMeta->pIdx) metaCloseIdx(pMeta);
|
||||
if (pMeta->pTaskIdx) tdbTbClose(pMeta->pTaskIdx);
|
||||
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
|
||||
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx);
|
||||
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
|
||||
|
@ -162,6 +170,7 @@ _err:
|
|||
int metaClose(SMeta *pMeta) {
|
||||
if (pMeta) {
|
||||
if (pMeta->pIdx) metaCloseIdx(pMeta);
|
||||
if (pMeta->pTaskIdx) tdbTbClose(pMeta->pTaskIdx);
|
||||
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
|
||||
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx);
|
||||
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
|
||||
|
@ -378,3 +387,16 @@ static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
|
|||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
|
||||
int32_t uid1 = *(int32_t *)pKey1;
|
||||
int32_t uid2 = *(int32_t *)pKey2;
|
||||
|
||||
if (uid1 > uid2) {
|
||||
return 1;
|
||||
} else if (uid1 < uid2) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -695,7 +695,7 @@ FAIL:
|
|||
return -1;
|
||||
}
|
||||
|
||||
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
|
||||
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
|
||||
void* pIter = NULL;
|
||||
bool failed = false;
|
||||
SStreamDataSubmit* pSubmit = NULL;
|
||||
|
@ -713,7 +713,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq) {
|
|||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
if (!pTask->isDataScan) continue;
|
||||
|
||||
qDebug("data submit enqueue stream task: %d", pTask->taskId);
|
||||
qDebug("data submit enqueue stream task: %d, ver: %ld", pTask->taskId, ver);
|
||||
|
||||
if (!failed) {
|
||||
if (streamTaskInput(pTask, (SStreamQueueItem*)pSubmit) < 0) {
|
||||
|
|
|
@ -48,7 +48,7 @@ int32_t tqMetaOpen(STQ* pTq) {
|
|||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (tdbTbOpen("handles", -1, -1, 0, pTq->pMetaStore, &pTq->pExecStore) < 0) {
|
||||
if (tdbTbOpen("handles", -1, -1, NULL, pTq->pMetaStore, &pTq->pExecStore) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
|
|
|
@ -252,7 +252,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
SSubmitReq* pReq = (SSubmitReq*)data;
|
||||
pReq->version = ver;
|
||||
|
||||
tqProcessStreamTrigger(pTq, data);
|
||||
tqProcessStreamTrigger(pTq, data, ver);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -2502,6 +2502,10 @@ void* tsdbGetIvtIdx(SMeta* pMeta) {
|
|||
return metaGetIvtIdx(pMeta);
|
||||
}
|
||||
|
||||
uint64_t getReaderMaxVersion(STsdbReader *pReader) {
|
||||
return pReader->verRange.maxVer;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Get all suids since suid
|
||||
*
|
||||
|
|
|
@ -437,6 +437,7 @@ typedef struct SessionWindowSupporter {
|
|||
SStreamAggSupporter* pStreamAggSup;
|
||||
int64_t gap;
|
||||
uint8_t parentType;
|
||||
SAggSupporter* pIntervalAggSup;
|
||||
} SessionWindowSupporter;
|
||||
|
||||
typedef struct STimeWindowSupp {
|
||||
|
@ -1009,6 +1010,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
|
|||
TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted);
|
||||
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
|
||||
bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
|
||||
bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
|
||||
void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid);
|
||||
void printDataBlock(SSDataBlock* pBlock, const char* flag);
|
||||
|
||||
|
|
|
@ -1131,7 +1131,8 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
|
|||
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
|
||||
// must check update info first.
|
||||
bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
|
||||
if ((update || (isSignleIntervalWindow(pInfo) && isCloseWindow(&win, &pInfo->twAggSup))) && out) {
|
||||
if ((update || (isSignleIntervalWindow(pInfo) && isCloseWindow(&win, &pInfo->twAggSup) &&
|
||||
isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup))) && out) {
|
||||
appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid);
|
||||
}
|
||||
}
|
||||
|
@ -1337,6 +1338,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
|
||||
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
||||
if (pSDB) {
|
||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||
uint64_t version = getReaderMaxVersion(pTableScanInfo->dataReader);
|
||||
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->cond.twindows, pInfo->groupId,version);
|
||||
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
||||
checkUpdateData(pInfo, true, pSDB, false);
|
||||
return pSDB;
|
||||
|
@ -1390,6 +1394,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
|
||||
setBlockIntoRes(pInfo, &block);
|
||||
|
||||
if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId, pInfo->pRes->info.version)) {
|
||||
printDataBlock(pInfo->pRes, "stream scan ignore");
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pBlockInfo->rows > 0) {
|
||||
break;
|
||||
}
|
||||
|
@ -1406,6 +1416,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
// record the scan action.
|
||||
pInfo->numOfExec++;
|
||||
pOperator->resultInfo.totalRows += pBlockInfo->rows;
|
||||
printDataBlock(pInfo->pRes, "stream scan");
|
||||
|
||||
if (pBlockInfo->rows == 0) {
|
||||
updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
|
||||
|
|
|
@ -1456,6 +1456,7 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
|
|||
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
|
||||
SHashObj* pPullDataMap, SArray* closeWins, SArray* pRecyPages,
|
||||
SDiskbasedBuf* pDiscBuf) {
|
||||
qDebug("===stream===close interval window");
|
||||
void* pIte = NULL;
|
||||
size_t keyLen = 0;
|
||||
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
|
||||
|
@ -1772,10 +1773,11 @@ SSDataBlock* createDeleteBlock() {
|
|||
return pBlock;
|
||||
}
|
||||
|
||||
void initIntervalDownStream(SOperatorInfo* downstream, uint8_t type) {
|
||||
void initIntervalDownStream(SOperatorInfo* downstream, uint8_t type, SAggSupporter* pSup) {
|
||||
ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
|
||||
SStreamScanInfo* pScanInfo = downstream->info;
|
||||
pScanInfo->sessionSup.parentType = type;
|
||||
pScanInfo->sessionSup.pIntervalAggSup = pSup;
|
||||
}
|
||||
|
||||
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
|
@ -1851,7 +1853,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||
|
||||
if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) {
|
||||
initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL);
|
||||
initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, &pInfo->aggSup);
|
||||
}
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
|
@ -3111,7 +3113,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo,
|
||||
aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
|
||||
initIntervalDownStream(downstream, pPhyNode->type);
|
||||
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup);
|
||||
}
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
|
|
@ -125,6 +125,9 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
|
|||
pInfo->pCloseWinSBF = NULL;
|
||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||
pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
|
||||
pInfo->maxVersion = 0;
|
||||
pInfo->scanGroupId = 0;
|
||||
pInfo->scanWindow = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
||||
return pInfo;
|
||||
}
|
||||
|
||||
|
@ -185,15 +188,36 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
|
|||
}
|
||||
|
||||
if (ts < pInfo->minTS) {
|
||||
qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64 , tableId, maxTs, *pMapMaxTs, ts);
|
||||
return true;
|
||||
} else if (res == TSDB_CODE_SUCCESS) {
|
||||
return false;
|
||||
}
|
||||
qDebug("===stream===bucket:%d, tableId:%" PRIu64 ", maxTs:" PRIu64 ", maxMapTs:" PRIu64 ", ts:%" PRIu64, index, tableId, maxTs, *pMapMaxTs, ts);
|
||||
qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64 , tableId, maxTs, *pMapMaxTs, ts);
|
||||
// check from tsdb api
|
||||
return true;
|
||||
}
|
||||
|
||||
void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version) {
|
||||
qDebug("===stream===groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version);
|
||||
pInfo->scanWindow = *pWin;
|
||||
pInfo->scanGroupId = groupId;
|
||||
pInfo->maxVersion = version;
|
||||
}
|
||||
|
||||
bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version) {
|
||||
if (!pInfo) {
|
||||
return false;
|
||||
}
|
||||
qDebug("===stream===check groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version);
|
||||
if (pInfo->scanGroupId == groupId && pInfo->scanWindow.skey <= pWin->skey &&
|
||||
pWin->ekey <= pInfo->scanWindow.ekey && version <= pInfo->maxVersion ) {
|
||||
qDebug("===stream===ignore groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64 , groupId, pWin->skey, pWin->ekey, version);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void updateInfoDestroy(SUpdateInfo *pInfo) {
|
||||
if (pInfo == NULL) {
|
||||
return;
|
||||
|
|
|
@ -116,15 +116,15 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
}
|
||||
|
||||
walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
|
||||
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
|
||||
TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
|
||||
|
||||
if (pIdxTFile == NULL) {
|
||||
if (pIdxFile == NULL) {
|
||||
ASSERT(0);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
int64_t idxOff = walGetVerIdxOffset(pWal, ver);
|
||||
code = taosLSeekFile(pIdxTFile, idxOff, SEEK_SET);
|
||||
code = taosLSeekFile(pIdxFile, idxOff, SEEK_SET);
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
|
@ -132,7 +132,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
}
|
||||
// read idx file and get log file pos
|
||||
SWalIdxEntry entry;
|
||||
if (taosReadFile(pIdxTFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
|
||||
if (taosReadFile(pIdxFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
|
||||
ASSERT(0);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
|
@ -140,24 +140,24 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
ASSERT(entry.ver == ver);
|
||||
|
||||
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
|
||||
TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
|
||||
if (pLogTFile == NULL) {
|
||||
ASSERT(0);
|
||||
TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
|
||||
if (pLogFile == NULL) {
|
||||
// TODO
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
code = taosLSeekFile(pLogTFile, entry.offset, SEEK_SET);
|
||||
code = taosLSeekFile(pLogFile, entry.offset, SEEK_SET);
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
// TODO
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
// validate offset
|
||||
SWalCkHead head;
|
||||
ASSERT(taosValidFile(pLogTFile));
|
||||
int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalCkHead));
|
||||
ASSERT(taosValidFile(pLogFile));
|
||||
int64_t size = taosReadFile(pLogFile, &head, sizeof(SWalCkHead));
|
||||
if (size != sizeof(SWalCkHead)) {
|
||||
ASSERT(0);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
|
@ -180,14 +180,14 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
}
|
||||
|
||||
// truncate old files
|
||||
code = taosFtruncateFile(pLogTFile, entry.offset);
|
||||
code = taosFtruncateFile(pLogFile, entry.offset);
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
code = taosFtruncateFile(pIdxTFile, idxOff);
|
||||
code = taosFtruncateFile(pIdxFile, idxOff);
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
|
@ -205,8 +205,10 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
ASSERT(((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize == 0);
|
||||
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->firstVer = -1;
|
||||
}
|
||||
taosCloseFile(&pIdxTFile);
|
||||
taosCloseFile(&pLogTFile);
|
||||
taosCloseFile(&pIdxFile);
|
||||
taosCloseFile(&pLogFile);
|
||||
|
||||
walSaveMeta(pWal);
|
||||
|
||||
// unlock
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
|
|
|
@ -261,6 +261,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC,"Field used by topic")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SINGLE_STB_MODE_DB, "Database is single stable mode")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SCHEMA_VER, "Invalid schema version while alter stb")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STABLE_UID_NOT_MATCH, "Invalid stable uid while alter stb")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_CONFLICT_WITH_TSMA, "Field used by tsma")
|
||||
|
||||
// mnode-trans
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists")
|
||||
|
|
Loading…
Reference in New Issue