feat(tmq): check alter for ntb

This commit is contained in:
Liu Jicong 2022-07-22 16:05:28 +08:00
parent ac2b13740b
commit 2441ec22bd
27 changed files with 273 additions and 157 deletions

View File

@ -34,11 +34,10 @@ typedef enum {
TSDB_SUPER_TABLE = 1, // super table TSDB_SUPER_TABLE = 1, // super table
TSDB_CHILD_TABLE = 2, // table created from super table TSDB_CHILD_TABLE = 2, // table created from super table
TSDB_NORMAL_TABLE = 3, // ordinary table TSDB_NORMAL_TABLE = 3, // ordinary table
TSDB_STREAM_TABLE = 4, // table created from stream computing TSDB_TEMP_TABLE = 4, // temp table created by nest query
TSDB_TEMP_TABLE = 5, // temp table created by nest query TSDB_SYSTEM_TABLE = 5,
TSDB_SYSTEM_TABLE = 6, TSDB_TSMA_TABLE = 6, // time-range-wise sma
TSDB_TSMA_TABLE = 7, // time-range-wise sma TSDB_TABLE_MAX = 7
TSDB_TABLE_MAX = 8
} ETableType; } ETableType;
typedef enum { typedef enum {

View File

@ -2537,6 +2537,15 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq)
return (void*)buf; return (void*)buf;
} }
typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
int64_t ntbUid;
SArray* colIdList; // SArray<int16_t>
} SCheckAlterInfo;
int32_t tEncodeSCheckAlterInfo(SEncoder* pEncoder, const SCheckAlterInfo* pInfo);
int32_t tDecodeSCheckAlterInfo(SDecoder* pDecoder, SCheckAlterInfo* pInfo);
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int64_t offset; int64_t offset;

View File

@ -186,6 +186,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset) TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset)
TD_DEF_MSG_TYPE(TDMT_VND_CHECK_ALTER_INFO, "vnode-alter-check-info", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL)

View File

@ -64,7 +64,8 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
* @param SReadHandle * @param SReadHandle
* @return * @return
*/ */
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchemaWrapper); qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols,
SSchemaWrapper** pSchemaWrapper, int64_t* ntbUid);
/** /**
* Set the input data block for the stream scan. * Set the input data block for the stream scan.

View File

@ -332,6 +332,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_VND_INVALID_TABLE_ACTION TAOS_DEF_ERROR_CODE(0, 0x0519) #define TSDB_CODE_VND_INVALID_TABLE_ACTION TAOS_DEF_ERROR_CODE(0, 0x0519)
#define TSDB_CODE_VND_COL_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051a) #define TSDB_CODE_VND_COL_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051a)
#define TSDB_CODE_VND_TABLE_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051b) #define TSDB_CODE_VND_TABLE_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051b)
#define TSDB_CODE_VND_COL_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x051c)
// tsdb // tsdb
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)

View File

@ -5623,6 +5623,33 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) {
return 0; return 0;
} }
int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo) {
if (tEncodeCStr(pEncoder, pInfo->topic) < 0) return -1;
if (tEncodeI64(pEncoder, pInfo->ntbUid) < 0) return -1;
int32_t sz = taosArrayGetSize(pInfo->colIdList);
if (tEncodeI32(pEncoder, sz) < 0) return -1;
for (int32_t i = 0; i < sz; i++) {
int16_t colId = *(int16_t *)taosArrayGet(pInfo->colIdList, i);
if (tEncodeI16(pEncoder, colId) < 0) return -1;
}
return pEncoder->pos;
}
int32_t tDecodeSCheckAlterInfo(SDecoder *pDecoder, SCheckAlterInfo *pInfo) {
if (tDecodeCStrTo(pDecoder, pInfo->topic) < 0) return -1;
if (tDecodeI64(pDecoder, &pInfo->ntbUid) < 0) return -1;
int32_t sz;
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
pInfo->colIdList = taosArrayInit(sz, sizeof(int16_t));
if (pInfo->colIdList == NULL) return -1;
for (int32_t i = 0; i < sz; i++) {
int16_t colId;
if (tDecodeI16(pDecoder, &colId) < 0) return -1;
taosArrayPush(pInfo->colIdList, &colId);
}
return 0;
}
int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) { int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) {
int32_t nUid = taosArrayGetSize(pRes->uidList); int32_t nUid = taosArrayGetSize(pRes->uidList);

View File

@ -224,6 +224,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -352,6 +352,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -144,33 +144,33 @@ typedef enum {
} ECsmUpdateType; } ECsmUpdateType;
typedef struct { typedef struct {
int32_t id; int32_t id;
ETrnStage stage; ETrnStage stage;
ETrnPolicy policy; ETrnPolicy policy;
ETrnConflct conflict; ETrnConflct conflict;
ETrnExec exec; ETrnExec exec;
EOperType oper; EOperType oper;
int32_t code; int32_t code;
int32_t failedTimes; int32_t failedTimes;
void* rpcRsp; void* rpcRsp;
int32_t rpcRspLen; int32_t rpcRspLen;
int32_t redoActionPos; int32_t redoActionPos;
SArray* redoActions; SArray* redoActions;
SArray* undoActions; SArray* undoActions;
SArray* commitActions; SArray* commitActions;
int64_t createdTime; int64_t createdTime;
int64_t lastExecTime; int64_t lastExecTime;
int32_t lastAction; int32_t lastAction;
int32_t lastErrorNo; int32_t lastErrorNo;
tmsg_t lastMsgType; tmsg_t lastMsgType;
SEpSet lastEpset; SEpSet lastEpset;
char dbname1[TSDB_DB_FNAME_LEN]; char dbname1[TSDB_DB_FNAME_LEN];
char dbname2[TSDB_DB_FNAME_LEN]; char dbname2[TSDB_DB_FNAME_LEN];
int32_t startFunc; int32_t startFunc;
int32_t stopFunc; int32_t stopFunc;
int32_t paramLen; int32_t paramLen;
void* param; void* param;
SArray* pRpcArray; SArray* pRpcArray;
} STrans; } STrans;
typedef struct { typedef struct {
@ -477,6 +477,9 @@ typedef struct {
char* physicalPlan; char* physicalPlan;
SSchemaWrapper schema; SSchemaWrapper schema;
int64_t stbUid; int64_t stbUid;
// forbid condition
int64_t ntbUid;
SArray* ntbColIds;
} SMqTopicObj; } SMqTopicObj;
typedef struct { typedef struct {

View File

@ -57,6 +57,7 @@ int32_t mndInitTopic(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_CHECK_ALTER_INFO_RSP, mndTransProcessRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
@ -127,8 +128,10 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
if (pTopic->schema.nCols) { if (pTopic->schema.nCols) {
schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema); schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
} }
int32_t size = int32_t ntbColLen = taosArrayGetSize(pTopic->ntbColIds) * sizeof(int16_t);
sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + MND_TOPIC_RESERVE_SIZE;
int32_t size = sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + ntbColLen +
MND_TOPIC_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size); SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
if (pRaw == NULL) goto TOPIC_ENCODE_OVER; if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
@ -164,6 +167,15 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema); taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
} }
SDB_SET_INT64(pRaw, dataPos, pTopic->ntbUid, TOPIC_ENCODE_OVER);
if (pTopic->ntbUid != 0) {
int32_t sz = taosArrayGetSize(pTopic->ntbColIds);
SDB_SET_INT32(pRaw, dataPos, sz, TOPIC_ENCODE_OVER);
for (int32_t i = 0; i < sz; i++) {
int16_t colId = *(int16_t *)taosArrayGet(pTopic->ntbColIds, i);
SDB_SET_INT16(pRaw, dataPos, colId, TOPIC_ENCODE_OVER);
}
}
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER); SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER); SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);
@ -259,6 +271,19 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
pTopic->schema.version = 0; pTopic->schema.version = 0;
pTopic->schema.pSchema = NULL; pTopic->schema.pSchema = NULL;
} }
SDB_GET_INT64(pRaw, dataPos, &pTopic->ntbUid, TOPIC_DECODE_OVER);
if (pTopic->ntbUid != 0) {
int32_t ntbColNum;
SDB_GET_INT32(pRaw, dataPos, &ntbColNum, TOPIC_DECODE_OVER);
pTopic->ntbColIds = taosArrayInit(ntbColNum, sizeof(int16_t));
if (pTopic->ntbColIds == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto TOPIC_DECODE_OVER;
}
int16_t colId;
SDB_GET_INT16(pRaw, dataPos, &colId, TOPIC_DECODE_OVER);
taosArrayPush(pTopic->ntbColIds, &colId);
}
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER); SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
@ -346,6 +371,19 @@ static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
return 0; return 0;
} }
static int32_t extractTopicTbInfo(SNode *pAst, int64_t *ntbUid, SArray *colIds) {
SNodeList *pNodeList = NULL;
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
SNode *pNode = NULL;
FOREACH(pNode, pNodeList) {
SColumnNode *pCol = (SColumnNode *)pNode;
if (pCol->tableType != TSDB_NORMAL_TABLE) return -1;
*ntbUid = pCol->tableId;
taosArrayPush(colIds, &pCol->colId);
}
return 0;
}
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) { static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
mDebug("topic:%s to create", pCreate->name); mDebug("topic:%s to create", pCreate->name);
SMqTopicObj topicObj = {0}; SMqTopicObj topicObj = {0};
@ -386,6 +424,19 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
return -1; return -1;
} }
int64_t ntbUid;
SArray *colIds = taosArrayInit(0, sizeof(int16_t));
if (colIds == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (extractTopicTbInfo(pAst, &ntbUid, colIds) < 0) {
taosArrayDestroy(colIds);
} else {
topicObj.ntbUid = ntbUid;
topicObj.ntbColIds = colIds;
}
if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) { if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
taosMemoryFree(topicObj.ast); taosMemoryFree(topicObj.ast);
@ -433,6 +484,59 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
} }
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (topicObj.ntbUid != 0) {
SCheckAlterInfo info;
memcpy(info.topic, topicObj.name, TSDB_TOPIC_FNAME_LEN);
info.ntbUid = topicObj.ntbUid;
info.colIdList = topicObj.ntbColIds;
// broadcast forbid alter info
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
while (1) {
// iterate vg
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (!mndVgroupInDb(pVgroup, topicObj.dbUid)) {
sdbRelease(pSdb, pVgroup);
continue;
}
// encoder check alter info
int32_t len;
int32_t code;
tEncodeSize(tEncodeSCheckAlterInfo, &info, len, code);
if (code < 0) {
sdbRelease(pSdb, pVgroup);
mndTransDrop(pTrans);
return -1;
}
void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, len);
if (tEncodeSCheckAlterInfo(&encoder, &info) < 0) {
sdbRelease(pSdb, pVgroup);
mndTransDrop(pTrans);
return -1;
}
tEncoderClear(&encoder);
((SMsgHead *)buf)->vgId = pVgroup->vgId;
// add redo action
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
action.pCont = buf;
action.contLen = sizeof(SMsgHead) + len;
action.msgType = TDMT_VND_CHECK_ALTER_INFO;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(buf);
sdbRelease(pSdb, pVgroup);
mndTransDrop(pTrans);
return -1;
}
}
}
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
taosMemoryFreeClear(topicObj.physicalPlan); taosMemoryFreeClear(topicObj.physicalPlan);

View File

@ -89,7 +89,7 @@ typedef struct {
STqExecDb execDb; STqExecDb execDb;
}; };
int32_t numOfCols; // number of out pout column, temporarily used int32_t numOfCols; // number of out pout column, temporarily used
SSchemaWrapper *pSchemaWrapper; // columns that are involved in query SSchemaWrapper* pSchemaWrapper; // columns that are involved in query
} STqExecHandle; } STqExecHandle;
typedef struct { typedef struct {
@ -120,9 +120,9 @@ struct STQ {
SHashObj* pushMgr; // consumerId -> STqHandle* SHashObj* pushMgr; // consumerId -> STqHandle*
SHashObj* handles; // subKey -> STqHandle SHashObj* handles; // subKey -> STqHandle
SHashObj* pStreamTasks; // taksId -> SStreamTask SHashObj* pStreamTasks; // taksId -> SStreamTask
SHashObj* pAlterInfo; // topic -> SAlterCheckInfo
STqOffsetStore* pOffsetStore; STqOffsetStore* pOffsetStore;
SVnode* pVnode; SVnode* pVnode;
SWal* pWal;
TDB* pMetaStore; TDB* pMetaStore;
TTB* pExecStore; TTB* pExecStore;
}; };

View File

@ -137,12 +137,13 @@ STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableL
// tq // tq
int tqInit(); int tqInit();
void tqCleanUp(); void tqCleanUp();
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal); STQ* tqOpen(const char* path, SVnode* pVnode);
void tqClose(STQ*); void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqCommit(STQ*); int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
int32_t tqCheckColModifiable(STQ* pTq, int32_t colId); int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen);

View File

@ -637,6 +637,10 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
goto _err; goto _err;
} }
if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) {
terrno = TSDB_CODE_VND_COL_SUBSCRIBED;
goto _err;
}
pSchema->version++; pSchema->version++;
tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema); tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema);
if (tlen) { if (tlen) {
@ -653,6 +657,10 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
goto _err; goto _err;
} }
if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) {
terrno = TSDB_CODE_VND_COL_SUBSCRIBED;
goto _err;
}
pSchema->version++; pSchema->version++;
pColumn->bytes = pAlterTbReq->colModBytes; pColumn->bytes = pAlterTbReq->colModBytes;
break; break;
@ -661,6 +669,10 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS; terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS;
goto _err; goto _err;
} }
if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) {
terrno = TSDB_CODE_VND_COL_SUBSCRIBED;
goto _err;
}
pSchema->version++; pSchema->version++;
strcpy(pColumn->name, pAlterTbReq->colNewName); strcpy(pColumn->name, pAlterTbReq->colNewName);
break; break;

View File

@ -51,7 +51,7 @@ void tqCleanUp() {
} }
} }
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { STQ* tqOpen(const char* path, SVnode* pVnode) {
STQ* pTq = taosMemoryCalloc(1, sizeof(STQ)); STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
if (pTq == NULL) { if (pTq == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
@ -59,7 +59,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
} }
pTq->path = strdup(path); pTq->path = strdup(path);
pTq->pVnode = pVnode; pTq->pVnode = pVnode;
pTq->pWal = pWal;
pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
@ -67,6 +66,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
pTq->pAlterInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
if (tqMetaOpen(pTq) < 0) { if (tqMetaOpen(pTq) < 0) {
ASSERT(0); ASSERT(0);
} }
@ -91,6 +92,7 @@ void tqClose(STQ* pTq) {
} }
taosHashCleanup(pTq->pStreamTasks); taosHashCleanup(pTq->pStreamTasks);
taosHashCleanup(pTq->pushMgr); taosHashCleanup(pTq->pushMgr);
taosHashCleanup(pTq->pAlterInfo);
taosMemoryFree(pTq->path); taosMemoryFree(pTq->path);
tqMetaClose(pTq); tqMetaClose(pTq);
taosMemoryFree(pTq); taosMemoryFree(pTq);
@ -208,18 +210,18 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
return 0; return 0;
} }
int32_t tqCheckColModifiable(STQ* pTq, int32_t colId) { int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
void* pIter = NULL; void* pIter = NULL;
while (1) { while (1) {
pIter = taosHashIterate(pTq->handles, pIter); pIter = taosHashIterate(pTq->pAlterInfo, pIter);
if (pIter == NULL) break; if (pIter == NULL) break;
STqHandle* pExec = (STqHandle*)pIter; SCheckAlterInfo* pCheck = (SCheckAlterInfo*)pIter;
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pCheck->ntbUid == tbUid) {
int32_t sz = pExec->execHandle.pSchemaWrapper->nCols; int32_t sz = taosArrayGetSize(pCheck->colIdList);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SSchema* pSchema = &pExec->execHandle.pSchemaWrapper->pSchema[i]; int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i);
if (pSchema->colId == colId) { if (forbidColId == colId) {
taosHashCancelIterate(pTq->handles, pIter); taosHashCancelIterate(pTq->pAlterInfo, pIter);
return -1; return -1;
} }
} }
@ -488,6 +490,22 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
return 0; return 0;
} }
int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen) {
SCheckAlterInfo info = {0};
SDecoder decoder;
tDecoderInit(&decoder, msg, msgLen);
if (tDecodeSCheckAlterInfo(&decoder, &info) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tDecoderClear(&decoder);
if (taosHashPut(pTq->pAlterInfo, info.topic, strlen(info.topic), &info, sizeof(SCheckAlterInfo)) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
SMqRebVgReq req = {0}; SMqRebVgReq req = {0};
tDecodeSMqRebVgReq(msg, &req); tDecodeSMqRebVgReq(msg, &req);
@ -524,7 +542,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
}; };
pHandle->execHandle.execCol.task = pHandle->execHandle.execCol.task =
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols,
&pHandle->execHandle.pSchemaWrapper); &pHandle->execHandle.pSchemaWrapper, &pHandle->ntbUid);
ASSERT(pHandle->execHandle.execCol.task); ASSERT(pHandle->execHandle.execCol.task);
void* scanner = NULL; void* scanner = NULL;
qExtractStreamScanner(pHandle->execHandle.execCol.task, &scanner); qExtractStreamScanner(pHandle->execHandle.execCol.task, &scanner);

View File

@ -90,7 +90,8 @@ int32_t tqMetaOpen(STQ* pTq) {
}; };
handle.execHandle.execCol.task = handle.execHandle.execCol.task =
qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper); qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols,
&handle.execHandle.pSchemaWrapper, &handle.ntbUid);
ASSERT(handle.execHandle.execCol.task); ASSERT(handle.execHandle.execCol.task);
void* scanner = NULL; void* scanner = NULL;
qExtractStreamScanner(handle.execHandle.execCol.task, &scanner); qExtractStreamScanner(handle.execHandle.execCol.task, &scanner);

View File

@ -110,7 +110,6 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
// TODO // TODO
ret = rpcMallocCont(cap); ret = rpcMallocCont(cap);
ret->header.vgId = vgId; ret->header.vgId = vgId;
ret->version = htonl(1);
ret->length = sizeof(SSubmitReq); ret->length = sizeof(SSubmitReq);
ret->numOfBlocks = htonl(sz); ret->numOfBlocks = htonl(sz);

View File

@ -135,7 +135,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
// open tq // open tq
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR); sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
taosRealPath(tdir, NULL, sizeof(tdir)); taosRealPath(tdir, NULL, sizeof(tdir));
pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal); pVnode->pTq = tqOpen(tdir, pVnode);
if (pVnode->pTq == NULL) { if (pVnode->pTq == NULL) {
vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;

View File

@ -206,6 +206,12 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
goto _err; goto _err;
} }
break; break;
case TDMT_VND_CHECK_ALTER_INFO:
if (tqProcessCheckAlterInfoReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
goto _err;
}
break;
case TDMT_STREAM_TASK_DEPLOY: { case TDMT_STREAM_TASK_DEPLOY: {
if (tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), if (tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) { pMsg->contLen - sizeof(SMsgHead)) < 0) {

View File

@ -157,6 +157,7 @@ typedef struct {
SQueryTableDataCond tableCond; SQueryTableDataCond tableCond;
int64_t recoverStartVer; int64_t recoverStartVer;
int64_t recoverEndVer; int64_t recoverEndVer;
int64_t ntbUid;
} SStreamTaskInfo; } SStreamTaskInfo;
typedef struct { typedef struct {

View File

@ -121,7 +121,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
} }
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols,
SSchemaWrapper** pSchemaWrapper) { SSchemaWrapper** pSchemaWrapper, int64_t* ntbUid) {
if (msg == NULL) { if (msg == NULL) {
// TODO create raw scan // TODO create raw scan
return NULL; return NULL;
@ -156,6 +156,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* n
} }
*pSchemaWrapper = tCloneSSchemaWrapper(((SExecTaskInfo*)pTaskInfo)->schemaInfo.qsw); *pSchemaWrapper = tCloneSSchemaWrapper(((SExecTaskInfo*)pTaskInfo)->schemaInfo.qsw);
*ntbUid = ((SExecTaskInfo*)pTaskInfo)->streamInfo.ntbUid;
return pTaskInfo; return pTaskInfo;
} }

View File

@ -315,6 +315,9 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pInfo = pOperator->info; SStreamScanInfo* pInfo = pOperator->info;
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
tsdbReaderClose(pTSInfo->dataReader);
pTSInfo->dataReader = NULL;
#if 0 #if 0
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) && if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
pInfo->tqReader->pWalReader->curVersion != pOffset->version) { pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
@ -348,9 +351,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList); int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
#ifndef NDEBUG #ifndef NDEBUG
qDebug("switch to next table %ld (cursor %d), %ld rows returned", uid, pTableScanInfo->currentTable,
qDebug("switch to next table %ld (cursor %d), %ld rows returned", uid, pInfo->pTableScanOp->resultInfo.totalRows);
pTableScanInfo->currentTable, pInfo->pTableScanOp->resultInfo.totalRows);
pInfo->pTableScanOp->resultInfo.totalRows = 0; pInfo->pTableScanOp->resultInfo.totalRows = 0;
#endif #endif
@ -367,6 +369,14 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
// TODO after dropping table, table may be not found // TODO after dropping table, table may be not found
ASSERT(found); ASSERT(found);
if (pTableScanInfo->dataReader == NULL) {
if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond,
pTaskInfo->tableqinfoList.pTableList, &pTableScanInfo->dataReader, NULL) < 0 ||
pTableScanInfo->dataReader == NULL) {
ASSERT(0);
}
}
tsdbSetTableId(pTableScanInfo->dataReader, uid); tsdbSetTableId(pTableScanInfo->dataReader, uid);
int64_t oldSkey = pTableScanInfo->cond.twindows.skey; int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
pTableScanInfo->cond.twindows.skey = ts + 1; pTableScanInfo->cond.twindows.skey = ts + 1;

View File

@ -4169,6 +4169,9 @@ int32_t extractTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode,
pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version; pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
} else { } else {
pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
if (pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) {
pTaskInfo->streamInfo.ntbUid = mr.me.uid;
}
} }
metaReaderClear(&mr); metaReaderClear(&mr);

View File

@ -982,6 +982,9 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
if (!pResult) { if (!pResult) {
blockDataCleanup(pSDB); blockDataCleanup(pSDB);
*pRowIndex = 0; *pRowIndex = 0;
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
tsdbReaderClose(pTableScanInfo->dataReader);
pTableScanInfo->dataReader = NULL;
return NULL; return NULL;
} }
@ -1003,6 +1006,9 @@ static SSDataBlock* doDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_
} }
if (!pResult) { if (!pResult) {
pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
tsdbReaderClose(pTableScanInfo->dataReader);
pTableScanInfo->dataReader = NULL;
return NULL; return NULL;
} }
@ -2051,8 +2057,8 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid; uint64_t suid = pInfo->pCur->mr.me.ctbEntry.suid;
int32_t code = metaGetTableEntryByUid(&mr, suid); int32_t code = metaGetTableEntryByUid(&mr, suid);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("failed to get super table meta, cname:%s, suid:0x%" PRIx64 ", code:%s, %s", qError("failed to get super table meta, cname:%s, suid:0x%" PRIx64 ", code:%s, %s", pInfo->pCur->mr.me.name,
pInfo->pCur->mr.me.name, suid, tstrerror(terrno), GET_TASKID(pTaskInfo)); suid, tstrerror(terrno), GET_TASKID(pTaskInfo));
metaReaderClear(&mr); metaReaderClear(&mr);
metaCloseTbCursor(pInfo->pCur); metaCloseTbCursor(pInfo->pCur);
pInfo->pCur = NULL; pInfo->pCur = NULL;
@ -2158,7 +2164,6 @@ static SSDataBlock* sysTableScanUserTables(SOperatorInfo* pOperator) {
} }
} }
static SSDataBlock* sysTableScanUserSTables(SOperatorInfo* pOperator) { static SSDataBlock* sysTableScanUserSTables(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSysTableScanInfo* pInfo = pOperator->info; SSysTableScanInfo* pInfo = pOperator->info;
@ -2184,12 +2189,13 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
getDBNameFromCondition(pInfo->pCondition, dbName); getDBNameFromCondition(pInfo->pCondition, dbName);
sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName); sprintf(pInfo->req.db, "%d.%s", pInfo->accountId, dbName);
} }
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) { if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
return sysTableScanUserTables(pOperator); return sysTableScanUserTables(pOperator);
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TAGS, TSDB_TABLE_FNAME_LEN) == 0) { } else if (strncasecmp(name, TSDB_INS_TABLE_USER_TAGS, TSDB_TABLE_FNAME_LEN) == 0) {
return sysTableScanUserTags(pOperator); return sysTableScanUserTags(pOperator);
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, TSDB_TABLE_FNAME_LEN) == 0 && IS_SYS_DBNAME(pInfo->req.db)) { } else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, TSDB_TABLE_FNAME_LEN) == 0 &&
IS_SYS_DBNAME(pInfo->req.db)) {
return sysTableScanUserSTables(pOperator); return sysTableScanUserSTables(pOperator);
} else { // load the meta from mnode of the given epset } else { // load the meta from mnode of the given epset
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {

View File

@ -336,6 +336,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_NOT_EXIST, "Table does not exists
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TABLE_ACTION, "Invalid table action") TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TABLE_ACTION, "Invalid table action")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already exists") TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_COL_NOT_EXISTS, "Table column not exists") TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_COL_NOT_EXISTS, "Table column not exists")
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_SUBSCRIBED, "Table column is subscribed")
// tsdb // tsdb
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")

View File

@ -1,91 +0,0 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.ts = 1538548685000
def run(self):
tdSql.prepare()
# test case for https://jira.taosdata.com:18080/browse/TD-3679
print("==============step1")
tdSql.execute(
"create topic tq_test partitions 10")
tdSql.execute(
"insert into tq_test.p1(off, ts, content) values(0, %d, 'aaaa')" % self.ts)
tdSql.execute(
"insert into tq_test.p1(off, ts, content) values(1, %d, 'aaaa')" % (self.ts + 1))
tdSql.execute(
"insert into tq_test.p1(off, ts, content) values(2, %d, 'aaaa')" % (self.ts + 2))
tdSql.execute(
"insert into tq_test.p1(off, ts, content) values(3, %d, 'aaaa')" % (self.ts + 3))
print("==============step2")
tdSql.query("select * from tq_test.p1")
tdSql.checkRows(4)
tdSql.query("select * from tq_test.p1 where ts >= %d" % self.ts)
tdSql.checkRows(4)
tdSql.query("select * from tq_test.p1 where ts > %d" % self.ts)
tdSql.checkRows(3)
tdSql.query("select * from tq_test.p1 where ts = %d" % self.ts)
tdSql.checkRows(1)
tdSql.execute("use db")
tdSql.execute("create table test(ts timestamp, start timestamp, value int)")
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts, self.ts))
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts + 1, self.ts + 1))
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts + 2, self.ts + 2))
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts + 3, self.ts + 3))
tdSql.query("select * from test")
tdSql.checkRows(4)
tdSql.query("select * from test where ts >= %d" % self.ts)
tdSql.checkRows(4)
tdSql.query("select * from test where ts > %d" % self.ts)
tdSql.checkRows(3)
tdSql.query("select * from test where ts = %d" % self.ts)
tdSql.checkRows(1)
tdSql.query("select * from test where start >= %d" % self.ts)
tdSql.checkRows(4)
tdSql.query("select * from test where start > %d" % self.ts)
tdSql.checkRows(3)
tdSql.query("select * from test where start = %d" % self.ts)
tdSql.checkRows(1)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -830,9 +830,9 @@ class TDTestCase:
cfgPath = buildPath + "/../sim/psim/cfg" cfgPath = buildPath + "/../sim/psim/cfg"
tdLog.info("cfgPath: %s" % cfgPath) tdLog.info("cfgPath: %s" % cfgPath)
# self.tmqCase1(cfgPath, buildPath) self.tmqCase1(cfgPath, buildPath)
# self.tmqCase2(cfgPath, buildPath) self.tmqCase2(cfgPath, buildPath)
# self.tmqCase3(cfgPath, buildPath) self.tmqCase3(cfgPath, buildPath)
self.tmqCase4(cfgPath, buildPath) self.tmqCase4(cfgPath, buildPath)
self.tmqCase5(cfgPath, buildPath) self.tmqCase5(cfgPath, buildPath)

View File

@ -28,6 +28,7 @@ target_link_libraries(
sdbDump sdbDump
PUBLIC dnode PUBLIC dnode
PUBLIC mnode PUBLIC mnode
PUBLIC stream
PUBLIC sdb PUBLIC sdb
PUBLIC os PUBLIC os
) )
@ -37,4 +38,4 @@ target_include_directories(
PRIVATE "${TD_SOURCE_DIR}/source/dnode/mnode/impl/inc" PRIVATE "${TD_SOURCE_DIR}/source/dnode/mnode/impl/inc"
PRIVATE "${TD_SOURCE_DIR}/source/dnode/mnode/sdb/inc" PRIVATE "${TD_SOURCE_DIR}/source/dnode/mnode/sdb/inc"
PRIVATE "${TD_SOURCE_DIR}/source/dnode/mgmt/node_mgmt/inc" PRIVATE "${TD_SOURCE_DIR}/source/dnode/mgmt/node_mgmt/inc"
) )