commit
43abf0d3f1
|
@ -34,11 +34,10 @@ typedef enum {
|
||||||
TSDB_SUPER_TABLE = 1, // super table
|
TSDB_SUPER_TABLE = 1, // super table
|
||||||
TSDB_CHILD_TABLE = 2, // table created from super table
|
TSDB_CHILD_TABLE = 2, // table created from super table
|
||||||
TSDB_NORMAL_TABLE = 3, // ordinary table
|
TSDB_NORMAL_TABLE = 3, // ordinary table
|
||||||
TSDB_STREAM_TABLE = 4, // table created from stream computing
|
TSDB_TEMP_TABLE = 4, // temp table created by nest query
|
||||||
TSDB_TEMP_TABLE = 5, // temp table created by nest query
|
TSDB_SYSTEM_TABLE = 5,
|
||||||
TSDB_SYSTEM_TABLE = 6,
|
TSDB_TSMA_TABLE = 6, // time-range-wise sma
|
||||||
TSDB_TSMA_TABLE = 7, // time-range-wise sma
|
TSDB_TABLE_MAX = 7
|
||||||
TSDB_TABLE_MAX = 8
|
|
||||||
} ETableType;
|
} ETableType;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
|
|
@ -2537,6 +2537,15 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq)
|
||||||
return (void*)buf;
|
return (void*)buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||||
|
int64_t ntbUid;
|
||||||
|
SArray* colIdList; // SArray<int16_t>
|
||||||
|
} SCheckAlterInfo;
|
||||||
|
|
||||||
|
int32_t tEncodeSCheckAlterInfo(SEncoder* pEncoder, const SCheckAlterInfo* pInfo);
|
||||||
|
int32_t tDecodeSCheckAlterInfo(SDecoder* pDecoder, SCheckAlterInfo* pInfo);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
|
|
|
@ -186,6 +186,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
|
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset)
|
TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_VND_CHECK_ALTER_INFO, "vnode-alter-check-info", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL)
|
||||||
|
|
|
@ -64,7 +64,8 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
|
||||||
* @param SReadHandle
|
* @param SReadHandle
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchemaWrapper);
|
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols,
|
||||||
|
SSchemaWrapper** pSchemaWrapper);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the input data block for the stream scan.
|
* Set the input data block for the stream scan.
|
||||||
|
|
|
@ -332,6 +332,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_VND_INVALID_TABLE_ACTION TAOS_DEF_ERROR_CODE(0, 0x0519)
|
#define TSDB_CODE_VND_INVALID_TABLE_ACTION TAOS_DEF_ERROR_CODE(0, 0x0519)
|
||||||
#define TSDB_CODE_VND_COL_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051a)
|
#define TSDB_CODE_VND_COL_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051a)
|
||||||
#define TSDB_CODE_VND_TABLE_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051b)
|
#define TSDB_CODE_VND_TABLE_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051b)
|
||||||
|
#define TSDB_CODE_VND_COL_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x051c)
|
||||||
|
|
||||||
// tsdb
|
// tsdb
|
||||||
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
|
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
|
||||||
|
|
|
@ -5623,6 +5623,33 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo) {
|
||||||
|
if (tEncodeCStr(pEncoder, pInfo->topic) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pInfo->ntbUid) < 0) return -1;
|
||||||
|
int32_t sz = taosArrayGetSize(pInfo->colIdList);
|
||||||
|
if (tEncodeI32(pEncoder, sz) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
int16_t colId = *(int16_t *)taosArrayGet(pInfo->colIdList, i);
|
||||||
|
if (tEncodeI16(pEncoder, colId) < 0) return -1;
|
||||||
|
}
|
||||||
|
return pEncoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeSCheckAlterInfo(SDecoder *pDecoder, SCheckAlterInfo *pInfo) {
|
||||||
|
if (tDecodeCStrTo(pDecoder, pInfo->topic) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pInfo->ntbUid) < 0) return -1;
|
||||||
|
int32_t sz;
|
||||||
|
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
|
||||||
|
pInfo->colIdList = taosArrayInit(sz, sizeof(int16_t));
|
||||||
|
if (pInfo->colIdList == NULL) return -1;
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
int16_t colId;
|
||||||
|
if (tDecodeI16(pDecoder, &colId) < 0) return -1;
|
||||||
|
taosArrayPush(pInfo->colIdList, &colId);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) {
|
int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) {
|
||||||
int32_t nUid = taosArrayGetSize(pRes->uidList);
|
int32_t nUid = taosArrayGetSize(pRes->uidList);
|
||||||
|
|
||||||
|
|
|
@ -224,6 +224,7 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -352,6 +352,7 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -477,6 +477,10 @@ typedef struct {
|
||||||
char* physicalPlan;
|
char* physicalPlan;
|
||||||
SSchemaWrapper schema;
|
SSchemaWrapper schema;
|
||||||
int64_t stbUid;
|
int64_t stbUid;
|
||||||
|
// forbid condition
|
||||||
|
int64_t ntbUid;
|
||||||
|
SArray* ntbColIds;
|
||||||
|
int64_t ctbStbUid;
|
||||||
} SMqTopicObj;
|
} SMqTopicObj;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -57,6 +57,7 @@ int32_t mndInitTopic(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp);
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_VND_CHECK_ALTER_INFO_RSP, mndTransProcessRsp);
|
||||||
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
|
||||||
|
@ -74,7 +75,6 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
|
||||||
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId) {
|
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
bool found = false;
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SMqTopicObj *pTopic = NULL;
|
SMqTopicObj *pTopic = NULL;
|
||||||
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
||||||
|
@ -95,10 +95,12 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId
|
||||||
SNode *pNode = NULL;
|
SNode *pNode = NULL;
|
||||||
FOREACH(pNode, pNodeList) {
|
FOREACH(pNode, pNodeList) {
|
||||||
SColumnNode *pCol = (SColumnNode *)pNode;
|
SColumnNode *pCol = (SColumnNode *)pNode;
|
||||||
if (pCol->tableId != suid) goto NEXT;
|
if (pCol->tableId != suid && pTopic->ctbStbUid != suid) goto NEXT;
|
||||||
if (pCol->colId > 0 && pCol->colId == colId) {
|
if (pCol->colId > 0 && pCol->colId == colId) {
|
||||||
found = true;
|
sdbRelease(pSdb, pTopic);
|
||||||
goto NEXT;
|
nodesDestroyNode(pAst);
|
||||||
|
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
mTrace("topic:%s, colId:%d is used", pTopic->name, pCol->colId);
|
mTrace("topic:%s, colId:%d is used", pTopic->name, pCol->colId);
|
||||||
}
|
}
|
||||||
|
@ -106,10 +108,6 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId
|
||||||
NEXT:
|
NEXT:
|
||||||
sdbRelease(pSdb, pTopic);
|
sdbRelease(pSdb, pTopic);
|
||||||
nodesDestroyNode(pAst);
|
nodesDestroyNode(pAst);
|
||||||
if (found) {
|
|
||||||
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -127,8 +125,10 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
||||||
if (pTopic->schema.nCols) {
|
if (pTopic->schema.nCols) {
|
||||||
schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
|
schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
|
||||||
}
|
}
|
||||||
int32_t size =
|
int32_t ntbColLen = taosArrayGetSize(pTopic->ntbColIds) * sizeof(int16_t);
|
||||||
sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + MND_TOPIC_RESERVE_SIZE;
|
|
||||||
|
int32_t size = sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + ntbColLen +
|
||||||
|
MND_TOPIC_RESERVE_SIZE;
|
||||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
|
SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
|
||||||
if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
|
if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
|
||||||
|
|
||||||
|
@ -164,6 +164,16 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
||||||
taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
|
taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
|
||||||
SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
|
SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
|
||||||
}
|
}
|
||||||
|
SDB_SET_INT64(pRaw, dataPos, pTopic->ntbUid, TOPIC_ENCODE_OVER);
|
||||||
|
if (pTopic->ntbUid != 0) {
|
||||||
|
int32_t sz = taosArrayGetSize(pTopic->ntbColIds);
|
||||||
|
SDB_SET_INT32(pRaw, dataPos, sz, TOPIC_ENCODE_OVER);
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
int16_t colId = *(int16_t *)taosArrayGet(pTopic->ntbColIds, i);
|
||||||
|
SDB_SET_INT16(pRaw, dataPos, colId, TOPIC_ENCODE_OVER);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
SDB_SET_INT64(pRaw, dataPos, pTopic->ctbStbUid, TOPIC_ENCODE_OVER);
|
||||||
|
|
||||||
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
|
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);
|
SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);
|
||||||
|
@ -259,6 +269,20 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
||||||
pTopic->schema.version = 0;
|
pTopic->schema.version = 0;
|
||||||
pTopic->schema.pSchema = NULL;
|
pTopic->schema.pSchema = NULL;
|
||||||
}
|
}
|
||||||
|
SDB_GET_INT64(pRaw, dataPos, &pTopic->ntbUid, TOPIC_DECODE_OVER);
|
||||||
|
if (pTopic->ntbUid != 0) {
|
||||||
|
int32_t ntbColNum;
|
||||||
|
SDB_GET_INT32(pRaw, dataPos, &ntbColNum, TOPIC_DECODE_OVER);
|
||||||
|
pTopic->ntbColIds = taosArrayInit(ntbColNum, sizeof(int16_t));
|
||||||
|
if (pTopic->ntbColIds == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto TOPIC_DECODE_OVER;
|
||||||
|
}
|
||||||
|
int16_t colId;
|
||||||
|
SDB_GET_INT16(pRaw, dataPos, &colId, TOPIC_DECODE_OVER);
|
||||||
|
taosArrayPush(pTopic->ntbColIds, &colId);
|
||||||
|
}
|
||||||
|
SDB_GET_INT64(pRaw, dataPos, &pTopic->ctbStbUid, TOPIC_DECODE_OVER);
|
||||||
|
|
||||||
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
|
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
|
||||||
|
|
||||||
|
@ -346,6 +370,26 @@ static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) {
|
||||||
|
SNodeList *pNodeList = NULL;
|
||||||
|
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
|
||||||
|
int64_t suid = ((SRealTableNode *)((SSelectStmt *)pAst)->pFromTable)->pMeta->suid;
|
||||||
|
int8_t tableType = ((SRealTableNode *)((SSelectStmt *)pAst)->pFromTable)->pMeta->tableType;
|
||||||
|
if (tableType == TSDB_CHILD_TABLE) {
|
||||||
|
pTopic->ctbStbUid = suid;
|
||||||
|
} else if (tableType == TSDB_NORMAL_TABLE) {
|
||||||
|
SNode *pNode = NULL;
|
||||||
|
FOREACH(pNode, pNodeList) {
|
||||||
|
SColumnNode *pCol = (SColumnNode *)pNode;
|
||||||
|
if (pCol->tableType == TSDB_NORMAL_TABLE) {
|
||||||
|
pTopic->ntbUid = pCol->tableId;
|
||||||
|
taosArrayPush(pTopic->ntbColIds, &pCol->colId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
|
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
|
||||||
mDebug("topic:%s to create", pCreate->name);
|
mDebug("topic:%s to create", pCreate->name);
|
||||||
SMqTopicObj topicObj = {0};
|
SMqTopicObj topicObj = {0};
|
||||||
|
@ -386,6 +430,19 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t ntbUid;
|
||||||
|
topicObj.ntbColIds = taosArrayInit(0, sizeof(int16_t));
|
||||||
|
if (topicObj.ntbColIds == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
extractTopicTbInfo(pAst, &topicObj);
|
||||||
|
|
||||||
|
if (topicObj.ntbUid == 0) {
|
||||||
|
taosArrayDestroy(topicObj.ntbColIds);
|
||||||
|
topicObj.ntbColIds = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
|
if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
|
||||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||||
taosMemoryFree(topicObj.ast);
|
taosMemoryFree(topicObj.ast);
|
||||||
|
@ -433,6 +490,60 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
}
|
}
|
||||||
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
|
if (topicObj.ntbUid != 0) {
|
||||||
|
SCheckAlterInfo info;
|
||||||
|
memcpy(info.topic, topicObj.name, TSDB_TOPIC_FNAME_LEN);
|
||||||
|
info.ntbUid = topicObj.ntbUid;
|
||||||
|
info.colIdList = topicObj.ntbColIds;
|
||||||
|
// broadcast forbid alter info
|
||||||
|
void *pIter = NULL;
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
SVgObj *pVgroup = NULL;
|
||||||
|
while (1) {
|
||||||
|
// iterate vg
|
||||||
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
if (!mndVgroupInDb(pVgroup, topicObj.dbUid)) {
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// encoder check alter info
|
||||||
|
int32_t len;
|
||||||
|
int32_t code;
|
||||||
|
tEncodeSize(tEncodeSCheckAlterInfo, &info, len, code);
|
||||||
|
if (code < 0) {
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
|
||||||
|
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
SEncoder encoder;
|
||||||
|
tEncoderInit(&encoder, abuf, len);
|
||||||
|
if (tEncodeSCheckAlterInfo(&encoder, &info) < 0) {
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);
|
||||||
|
// add redo action
|
||||||
|
STransAction action = {0};
|
||||||
|
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
|
action.pCont = buf;
|
||||||
|
action.contLen = sizeof(SMsgHead) + len;
|
||||||
|
action.msgType = TDMT_VND_CHECK_ALTER_INFO;
|
||||||
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
taosMemoryFreeClear(topicObj.physicalPlan);
|
taosMemoryFreeClear(topicObj.physicalPlan);
|
||||||
|
@ -442,7 +553,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
||||||
|
|
||||||
taosMemoryFreeClear(topicObj.physicalPlan);
|
taosMemoryFreeClear(topicObj.physicalPlan);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
return 0;
|
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
|
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
|
||||||
|
|
|
@ -89,7 +89,7 @@ typedef struct {
|
||||||
STqExecDb execDb;
|
STqExecDb execDb;
|
||||||
};
|
};
|
||||||
int32_t numOfCols; // number of out pout column, temporarily used
|
int32_t numOfCols; // number of out pout column, temporarily used
|
||||||
SSchemaWrapper *pSchemaWrapper; // columns that are involved in query
|
SSchemaWrapper* pSchemaWrapper; // columns that are involved in query
|
||||||
} STqExecHandle;
|
} STqExecHandle;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -110,9 +110,6 @@ typedef struct {
|
||||||
// exec
|
// exec
|
||||||
STqExecHandle execHandle;
|
STqExecHandle execHandle;
|
||||||
|
|
||||||
// prevent drop
|
|
||||||
int64_t ntbUid;
|
|
||||||
SArray* colIdList; // SArray<int32_t>
|
|
||||||
} STqHandle;
|
} STqHandle;
|
||||||
|
|
||||||
struct STQ {
|
struct STQ {
|
||||||
|
@ -120,9 +117,9 @@ struct STQ {
|
||||||
SHashObj* pushMgr; // consumerId -> STqHandle*
|
SHashObj* pushMgr; // consumerId -> STqHandle*
|
||||||
SHashObj* handles; // subKey -> STqHandle
|
SHashObj* handles; // subKey -> STqHandle
|
||||||
SHashObj* pStreamTasks; // taksId -> SStreamTask
|
SHashObj* pStreamTasks; // taksId -> SStreamTask
|
||||||
|
SHashObj* pAlterInfo; // topic -> SAlterCheckInfo
|
||||||
STqOffsetStore* pOffsetStore;
|
STqOffsetStore* pOffsetStore;
|
||||||
SVnode* pVnode;
|
SVnode* pVnode;
|
||||||
SWal* pWal;
|
|
||||||
TDB* pMetaStore;
|
TDB* pMetaStore;
|
||||||
TTB* pExecStore;
|
TTB* pExecStore;
|
||||||
};
|
};
|
||||||
|
|
|
@ -137,12 +137,13 @@ STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableL
|
||||||
// tq
|
// tq
|
||||||
int tqInit();
|
int tqInit();
|
||||||
void tqCleanUp();
|
void tqCleanUp();
|
||||||
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal);
|
STQ* tqOpen(const char* path, SVnode* pVnode);
|
||||||
void tqClose(STQ*);
|
void tqClose(STQ*);
|
||||||
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
||||||
int tqCommit(STQ*);
|
int tqCommit(STQ*);
|
||||||
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
|
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
|
||||||
int32_t tqCheckColModifiable(STQ* pTq, int32_t colId);
|
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
|
||||||
|
int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
|
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
|
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen);
|
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||||
|
|
|
@ -640,6 +640,10 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
||||||
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
|
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) {
|
||||||
|
terrno = TSDB_CODE_VND_COL_SUBSCRIBED;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
pSchema->version++;
|
pSchema->version++;
|
||||||
tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema);
|
tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema);
|
||||||
if (tlen) {
|
if (tlen) {
|
||||||
|
@ -656,6 +660,10 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
||||||
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
|
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) {
|
||||||
|
terrno = TSDB_CODE_VND_COL_SUBSCRIBED;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
pSchema->version++;
|
pSchema->version++;
|
||||||
pColumn->bytes = pAlterTbReq->colModBytes;
|
pColumn->bytes = pAlterTbReq->colModBytes;
|
||||||
break;
|
break;
|
||||||
|
@ -664,6 +672,10 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
||||||
terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS;
|
terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) {
|
||||||
|
terrno = TSDB_CODE_VND_COL_SUBSCRIBED;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
pSchema->version++;
|
pSchema->version++;
|
||||||
strcpy(pColumn->name, pAlterTbReq->colNewName);
|
strcpy(pColumn->name, pAlterTbReq->colNewName);
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -51,7 +51,7 @@ void tqCleanUp() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
STQ* tqOpen(const char* path, SVnode* pVnode) {
|
||||||
STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
|
STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
|
||||||
if (pTq == NULL) {
|
if (pTq == NULL) {
|
||||||
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
||||||
|
@ -59,7 +59,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
||||||
}
|
}
|
||||||
pTq->path = strdup(path);
|
pTq->path = strdup(path);
|
||||||
pTq->pVnode = pVnode;
|
pTq->pVnode = pVnode;
|
||||||
pTq->pWal = pWal;
|
|
||||||
|
|
||||||
pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||||
|
|
||||||
|
@ -67,6 +66,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
||||||
|
|
||||||
pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
||||||
|
|
||||||
|
pTq->pAlterInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||||
|
|
||||||
if (tqMetaOpen(pTq) < 0) {
|
if (tqMetaOpen(pTq) < 0) {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
@ -91,6 +92,7 @@ void tqClose(STQ* pTq) {
|
||||||
}
|
}
|
||||||
taosHashCleanup(pTq->pStreamTasks);
|
taosHashCleanup(pTq->pStreamTasks);
|
||||||
taosHashCleanup(pTq->pushMgr);
|
taosHashCleanup(pTq->pushMgr);
|
||||||
|
taosHashCleanup(pTq->pAlterInfo);
|
||||||
taosMemoryFree(pTq->path);
|
taosMemoryFree(pTq->path);
|
||||||
tqMetaClose(pTq);
|
tqMetaClose(pTq);
|
||||||
taosMemoryFree(pTq);
|
taosMemoryFree(pTq);
|
||||||
|
@ -208,18 +210,18 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqCheckColModifiable(STQ* pTq, int32_t colId) {
|
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pTq->handles, pIter);
|
pIter = taosHashIterate(pTq->pAlterInfo, pIter);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
STqHandle* pExec = (STqHandle*)pIter;
|
SCheckAlterInfo* pCheck = (SCheckAlterInfo*)pIter;
|
||||||
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pCheck->ntbUid == tbUid) {
|
||||||
int32_t sz = pExec->execHandle.pSchemaWrapper->nCols;
|
int32_t sz = taosArrayGetSize(pCheck->colIdList);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SSchema* pSchema = &pExec->execHandle.pSchemaWrapper->pSchema[i];
|
int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i);
|
||||||
if (pSchema->colId == colId) {
|
if (forbidColId == colId) {
|
||||||
taosHashCancelIterate(pTq->handles, pIter);
|
taosHashCancelIterate(pTq->pAlterInfo, pIter);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -270,6 +272,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STqOffsetVal reqOffset = pReq->reqOffset;
|
STqOffsetVal reqOffset = pReq->reqOffset;
|
||||||
STqOffsetVal fetchOffsetNew;
|
STqOffsetVal fetchOffsetNew;
|
||||||
|
SWalCkHead* pCkHead = NULL;
|
||||||
|
|
||||||
// 1.find handle
|
// 1.find handle
|
||||||
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
|
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
|
||||||
|
@ -459,6 +462,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
OVER:
|
OVER:
|
||||||
|
if (pCkHead) taosMemoryFree(pCkHead);
|
||||||
// TODO wrap in destroy func
|
// TODO wrap in destroy func
|
||||||
taosArrayDestroy(dataRsp.blockDataLen);
|
taosArrayDestroy(dataRsp.blockDataLen);
|
||||||
taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree);
|
taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree);
|
||||||
|
@ -488,6 +492,22 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
|
SCheckAlterInfo info = {0};
|
||||||
|
SDecoder decoder;
|
||||||
|
tDecoderInit(&decoder, msg, msgLen);
|
||||||
|
if (tDecodeSCheckAlterInfo(&decoder, &info) < 0) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
if (taosHashPut(pTq->pAlterInfo, info.topic, strlen(info.topic), &info, sizeof(SCheckAlterInfo)) < 0) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
SMqRebVgReq req = {0};
|
SMqRebVgReq req = {0};
|
||||||
tDecodeSMqRebVgReq(msg, &req);
|
tDecodeSMqRebVgReq(msg, &req);
|
||||||
|
|
|
@ -201,10 +201,12 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
|
||||||
if (pRsp->withTbName) {
|
if (pRsp->withTbName) {
|
||||||
int64_t uid = pExec->pExecReader->msgIter.uid;
|
int64_t uid = pExec->pExecReader->msgIter.uid;
|
||||||
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
|
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
|
||||||
|
blockDataFreeRes(&block);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
|
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
|
||||||
|
blockDataFreeRes(&block);
|
||||||
tqAddBlockSchemaToRsp(pExec, pRsp);
|
tqAddBlockSchemaToRsp(pExec, pRsp);
|
||||||
pRsp->blockNum++;
|
pRsp->blockNum++;
|
||||||
}
|
}
|
||||||
|
@ -220,10 +222,12 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
|
||||||
if (pRsp->withTbName) {
|
if (pRsp->withTbName) {
|
||||||
int64_t uid = pExec->pExecReader->msgIter.uid;
|
int64_t uid = pExec->pExecReader->msgIter.uid;
|
||||||
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
|
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
|
||||||
|
blockDataFreeRes(&block);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
|
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
|
||||||
|
blockDataFreeRes(&block);
|
||||||
tqAddBlockSchemaToRsp(pExec, pRsp);
|
tqAddBlockSchemaToRsp(pExec, pRsp);
|
||||||
pRsp->blockNum++;
|
pRsp->blockNum++;
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,8 +89,8 @@ int32_t tqMetaOpen(STQ* pTq) {
|
||||||
.version = handle.snapshotVer,
|
.version = handle.snapshotVer,
|
||||||
};
|
};
|
||||||
|
|
||||||
handle.execHandle.execCol.task =
|
handle.execHandle.execCol.task = qCreateQueueExecTaskInfo(
|
||||||
qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper);
|
handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper);
|
||||||
ASSERT(handle.execHandle.execCol.task);
|
ASSERT(handle.execHandle.execCol.task);
|
||||||
void* scanner = NULL;
|
void* scanner = NULL;
|
||||||
qExtractStreamScanner(handle.execHandle.execCol.task, &scanner);
|
qExtractStreamScanner(handle.execHandle.execCol.task, &scanner);
|
||||||
|
|
|
@ -340,29 +340,30 @@ FAIL:
|
||||||
|
|
||||||
void tqReaderSetColIdList(STqReader* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; }
|
void tqReaderSetColIdList(STqReader* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; }
|
||||||
|
|
||||||
int tqReaderSetTbUidList(STqReader* pHandle, const SArray* tbUidList) {
|
int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) {
|
||||||
if (pHandle->tbIdHash) {
|
if (pReader->tbIdHash) {
|
||||||
taosHashClear(pHandle->tbIdHash);
|
taosHashClear(pReader->tbIdHash);
|
||||||
|
} else {
|
||||||
|
pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||||
}
|
}
|
||||||
|
|
||||||
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
if (pReader->tbIdHash == NULL) {
|
||||||
if (pHandle->tbIdHash == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||||
int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
|
int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
|
||||||
taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
|
taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tqReaderAddTbUidList(STqReader* pHandle, const SArray* tbUidList) {
|
int tqReaderAddTbUidList(STqReader* pReader, const SArray* tbUidList) {
|
||||||
if (pHandle->tbIdHash == NULL) {
|
if (pReader->tbIdHash == NULL) {
|
||||||
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||||
if (pHandle->tbIdHash == NULL) {
|
if (pReader->tbIdHash == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -370,18 +371,18 @@ int tqReaderAddTbUidList(STqReader* pHandle, const SArray* tbUidList) {
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||||
int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
|
int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
|
||||||
taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
|
taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tqReaderRemoveTbUidList(STqReader* pHandle, const SArray* tbUidList) {
|
int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
|
||||||
ASSERT(pHandle->tbIdHash != NULL);
|
ASSERT(pReader->tbIdHash != NULL);
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||||
int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
|
int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
|
||||||
taosHashRemove(pHandle->tbIdHash, pKey, sizeof(int64_t));
|
taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -110,7 +110,6 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
||||||
// TODO
|
// TODO
|
||||||
ret = rpcMallocCont(cap);
|
ret = rpcMallocCont(cap);
|
||||||
ret->header.vgId = vgId;
|
ret->header.vgId = vgId;
|
||||||
ret->version = htonl(1);
|
|
||||||
ret->length = sizeof(SSubmitReq);
|
ret->length = sizeof(SSubmitReq);
|
||||||
ret->numOfBlocks = htonl(sz);
|
ret->numOfBlocks = htonl(sz);
|
||||||
|
|
||||||
|
|
|
@ -135,7 +135,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
||||||
// open tq
|
// open tq
|
||||||
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
|
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
|
||||||
taosRealPath(tdir, NULL, sizeof(tdir));
|
taosRealPath(tdir, NULL, sizeof(tdir));
|
||||||
pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal);
|
pVnode->pTq = tqOpen(tdir, pVnode);
|
||||||
if (pVnode->pTq == NULL) {
|
if (pVnode->pTq == NULL) {
|
||||||
vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
|
vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
goto _err;
|
goto _err;
|
||||||
|
|
|
@ -206,6 +206,12 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
case TDMT_VND_CHECK_ALTER_INFO:
|
||||||
|
if (tqProcessCheckAlterInfoReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||||
|
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
break;
|
||||||
case TDMT_STREAM_TASK_DEPLOY: {
|
case TDMT_STREAM_TASK_DEPLOY: {
|
||||||
if (tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
if (tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||||
|
|
|
@ -351,7 +351,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
|
||||||
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
|
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
|
||||||
|
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
|
|
||||||
qDebug("switch to next table %ld (cursor %d), %ld rows returned", uid, pTableScanInfo->currentTable,
|
qDebug("switch to next table %ld (cursor %d), %ld rows returned", uid, pTableScanInfo->currentTable,
|
||||||
pInfo->pTableScanOp->resultInfo.totalRows);
|
pInfo->pTableScanOp->resultInfo.totalRows);
|
||||||
pInfo->pTableScanOp->resultInfo.totalRows = 0;
|
pInfo->pTableScanOp->resultInfo.totalRows = 0;
|
||||||
|
|
|
@ -2667,6 +2667,7 @@ static int32_t jsonToExprNode(const SJson* pJson, void* pObj) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static const char* jkColumnTableId = "TableId";
|
static const char* jkColumnTableId = "TableId";
|
||||||
|
static const char* jkColumnTableType = "TableType";
|
||||||
static const char* jkColumnColId = "ColId";
|
static const char* jkColumnColId = "ColId";
|
||||||
static const char* jkColumnColType = "ColType";
|
static const char* jkColumnColType = "ColType";
|
||||||
static const char* jkColumnDbName = "DbName";
|
static const char* jkColumnDbName = "DbName";
|
||||||
|
@ -2683,6 +2684,9 @@ static int32_t columnNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddIntegerToObject(pJson, jkColumnTableId, pNode->tableId);
|
code = tjsonAddIntegerToObject(pJson, jkColumnTableId, pNode->tableId);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddIntegerToObject(pJson, jkColumnTableType, pNode->tableType);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddIntegerToObject(pJson, jkColumnColId, pNode->colId);
|
code = tjsonAddIntegerToObject(pJson, jkColumnColId, pNode->colId);
|
||||||
}
|
}
|
||||||
|
@ -2718,6 +2722,9 @@ static int32_t jsonToColumnNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetUBigIntValue(pJson, jkColumnTableId, &pNode->tableId);
|
code = tjsonGetUBigIntValue(pJson, jkColumnTableId, &pNode->tableId);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetTinyIntValue(pJson, jkColumnTableType, &pNode->tableType);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetSmallIntValue(pJson, jkColumnColId, &pNode->colId);
|
code = tjsonGetSmallIntValue(pJson, jkColumnColId, &pNode->colId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -336,6 +336,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_NOT_EXIST, "Table does not exists
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TABLE_ACTION, "Invalid table action")
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TABLE_ACTION, "Invalid table action")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already exists")
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already exists")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_COL_NOT_EXISTS, "Table column not exists")
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_COL_NOT_EXISTS, "Table column not exists")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_SUBSCRIBED, "Table column is subscribed")
|
||||||
|
|
||||||
// tsdb
|
// tsdb
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")
|
||||||
|
|
|
@ -1,91 +0,0 @@
|
||||||
###################################################################
|
|
||||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
|
||||||
# All rights reserved.
|
|
||||||
#
|
|
||||||
# This file is proprietary and confidential to TAOS Technologies.
|
|
||||||
# No part of this file may be reproduced, stored, transmitted,
|
|
||||||
# disclosed or used in any form or by any means other than as
|
|
||||||
# expressly provided by the written permission from Jianhui Tao
|
|
||||||
#
|
|
||||||
###################################################################
|
|
||||||
|
|
||||||
# -*- coding: utf-8 -*-
|
|
||||||
|
|
||||||
from util.log import tdLog
|
|
||||||
from util.cases import tdCases
|
|
||||||
from util.sql import tdSql
|
|
||||||
|
|
||||||
class TDTestCase:
|
|
||||||
def init(self, conn, logSql):
|
|
||||||
tdLog.debug("start to execute %s" % __file__)
|
|
||||||
tdSql.init(conn.cursor(), logSql)
|
|
||||||
|
|
||||||
self.ts = 1538548685000
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
tdSql.prepare()
|
|
||||||
|
|
||||||
# test case for https://jira.taosdata.com:18080/browse/TD-3679
|
|
||||||
print("==============step1")
|
|
||||||
tdSql.execute(
|
|
||||||
"create topic tq_test partitions 10")
|
|
||||||
tdSql.execute(
|
|
||||||
"insert into tq_test.p1(off, ts, content) values(0, %d, 'aaaa')" % self.ts)
|
|
||||||
tdSql.execute(
|
|
||||||
"insert into tq_test.p1(off, ts, content) values(1, %d, 'aaaa')" % (self.ts + 1))
|
|
||||||
tdSql.execute(
|
|
||||||
"insert into tq_test.p1(off, ts, content) values(2, %d, 'aaaa')" % (self.ts + 2))
|
|
||||||
tdSql.execute(
|
|
||||||
"insert into tq_test.p1(off, ts, content) values(3, %d, 'aaaa')" % (self.ts + 3))
|
|
||||||
|
|
||||||
print("==============step2")
|
|
||||||
|
|
||||||
tdSql.query("select * from tq_test.p1")
|
|
||||||
tdSql.checkRows(4)
|
|
||||||
|
|
||||||
tdSql.query("select * from tq_test.p1 where ts >= %d" % self.ts)
|
|
||||||
tdSql.checkRows(4)
|
|
||||||
|
|
||||||
tdSql.query("select * from tq_test.p1 where ts > %d" % self.ts)
|
|
||||||
tdSql.checkRows(3)
|
|
||||||
|
|
||||||
tdSql.query("select * from tq_test.p1 where ts = %d" % self.ts)
|
|
||||||
tdSql.checkRows(1)
|
|
||||||
|
|
||||||
|
|
||||||
tdSql.execute("use db")
|
|
||||||
tdSql.execute("create table test(ts timestamp, start timestamp, value int)")
|
|
||||||
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts, self.ts))
|
|
||||||
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts + 1, self.ts + 1))
|
|
||||||
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts + 2, self.ts + 2))
|
|
||||||
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts + 3, self.ts + 3))
|
|
||||||
|
|
||||||
tdSql.query("select * from test")
|
|
||||||
tdSql.checkRows(4)
|
|
||||||
|
|
||||||
tdSql.query("select * from test where ts >= %d" % self.ts)
|
|
||||||
tdSql.checkRows(4)
|
|
||||||
|
|
||||||
tdSql.query("select * from test where ts > %d" % self.ts)
|
|
||||||
tdSql.checkRows(3)
|
|
||||||
|
|
||||||
tdSql.query("select * from test where ts = %d" % self.ts)
|
|
||||||
tdSql.checkRows(1)
|
|
||||||
|
|
||||||
tdSql.query("select * from test where start >= %d" % self.ts)
|
|
||||||
tdSql.checkRows(4)
|
|
||||||
|
|
||||||
tdSql.query("select * from test where start > %d" % self.ts)
|
|
||||||
tdSql.checkRows(3)
|
|
||||||
|
|
||||||
tdSql.query("select * from test where start = %d" % self.ts)
|
|
||||||
tdSql.checkRows(1)
|
|
||||||
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
tdSql.close()
|
|
||||||
tdLog.success("%s successfully executed" % __file__)
|
|
||||||
|
|
||||||
|
|
||||||
tdCases.addWindows(__file__, TDTestCase())
|
|
||||||
tdCases.addLinux(__file__, TDTestCase())
|
|
|
@ -28,6 +28,7 @@ target_link_libraries(
|
||||||
sdbDump
|
sdbDump
|
||||||
PUBLIC dnode
|
PUBLIC dnode
|
||||||
PUBLIC mnode
|
PUBLIC mnode
|
||||||
|
PUBLIC stream
|
||||||
PUBLIC sdb
|
PUBLIC sdb
|
||||||
PUBLIC os
|
PUBLIC os
|
||||||
)
|
)
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
Subproject commit 69b558ccbfe54a4407fe23eeae2e67c540f59e55
|
Subproject commit 0b8a3373bb7548f8106d13e7d3b0a988d3c4d48a
|
|
@ -1 +1 @@
|
||||||
Subproject commit d8f19ede56f1f489c5d2ac8f963cced01e68ecef
|
Subproject commit df8678f070e3f707faf59baebec90065f6e1268b
|
Loading…
Reference in New Issue