Merge branch '3.0' into feature/TD-11274-3.0
This commit is contained in:
commit
9b4a9d24d5
|
@ -34,11 +34,10 @@ typedef enum {
|
|||
TSDB_SUPER_TABLE = 1, // super table
|
||||
TSDB_CHILD_TABLE = 2, // table created from super table
|
||||
TSDB_NORMAL_TABLE = 3, // ordinary table
|
||||
TSDB_STREAM_TABLE = 4, // table created from stream computing
|
||||
TSDB_TEMP_TABLE = 5, // temp table created by nest query
|
||||
TSDB_SYSTEM_TABLE = 6,
|
||||
TSDB_TSMA_TABLE = 7, // time-range-wise sma
|
||||
TSDB_TABLE_MAX = 8
|
||||
TSDB_TEMP_TABLE = 4, // temp table created by nest query
|
||||
TSDB_SYSTEM_TABLE = 5,
|
||||
TSDB_TSMA_TABLE = 6, // time-range-wise sma
|
||||
TSDB_TABLE_MAX = 7
|
||||
} ETableType;
|
||||
|
||||
typedef enum {
|
||||
|
|
|
@ -2537,6 +2537,15 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq)
|
|||
return (void*)buf;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
int64_t ntbUid;
|
||||
SArray* colIdList; // SArray<int16_t>
|
||||
} SCheckAlterInfo;
|
||||
|
||||
int32_t tEncodeSCheckAlterInfo(SEncoder* pEncoder, const SCheckAlterInfo* pInfo);
|
||||
int32_t tDecodeSCheckAlterInfo(SDecoder* pDecoder, SCheckAlterInfo* pInfo);
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
int64_t offset;
|
||||
|
|
|
@ -186,6 +186,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_CHANGE, "vnode-mq-vg-change", SMqRebVgReq, SMqRebVgRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_VG_DELETE, "vnode-mq-vg-delete", SMqVDeleteReq, SMqVDeleteRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_MQ_COMMIT_OFFSET, "vnode-commit-offset", STqOffset, STqOffset)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CHECK_ALTER_INFO, "vnode-alter-check-info", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_TOPIC, "vnode-create-topic", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_TOPIC, "vnode-alter-topic", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TOPIC, "vnode-drop-topic", NULL, NULL)
|
||||
|
|
|
@ -64,7 +64,8 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers);
|
|||
* @param SReadHandle
|
||||
* @return
|
||||
*/
|
||||
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchemaWrapper);
|
||||
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols,
|
||||
SSchemaWrapper** pSchemaWrapper);
|
||||
|
||||
/**
|
||||
* Set the input data block for the stream scan.
|
||||
|
|
|
@ -332,6 +332,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_VND_INVALID_TABLE_ACTION TAOS_DEF_ERROR_CODE(0, 0x0519)
|
||||
#define TSDB_CODE_VND_COL_ALREADY_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051a)
|
||||
#define TSDB_CODE_VND_TABLE_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x051b)
|
||||
#define TSDB_CODE_VND_COL_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x051c)
|
||||
|
||||
// tsdb
|
||||
#define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600)
|
||||
|
|
|
@ -226,8 +226,17 @@ static int32_t taosSetTfsCfg(SConfig *pCfg) {
|
|||
}
|
||||
|
||||
if (tsDataDir[0] == 0) {
|
||||
uError("datadir not set");
|
||||
return -1;
|
||||
if (pItem->str != NULL) {
|
||||
taosAddDataDir(0, pItem->str, 0, 1);
|
||||
tstrncpy(tsDataDir, pItem->str, PATH_MAX);
|
||||
if (taosMulMkDir(tsDataDir) != 0) {
|
||||
uError("failed to create dataDir:%s since %s", tsDataDir, terrstr());
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
uError("datadir not set");
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -5623,6 +5623,33 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeSCheckAlterInfo(SEncoder *pEncoder, const SCheckAlterInfo *pInfo) {
|
||||
if (tEncodeCStr(pEncoder, pInfo->topic) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pInfo->ntbUid) < 0) return -1;
|
||||
int32_t sz = taosArrayGetSize(pInfo->colIdList);
|
||||
if (tEncodeI32(pEncoder, sz) < 0) return -1;
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
int16_t colId = *(int16_t *)taosArrayGet(pInfo->colIdList, i);
|
||||
if (tEncodeI16(pEncoder, colId) < 0) return -1;
|
||||
}
|
||||
return pEncoder->pos;
|
||||
}
|
||||
|
||||
int32_t tDecodeSCheckAlterInfo(SDecoder *pDecoder, SCheckAlterInfo *pInfo) {
|
||||
if (tDecodeCStrTo(pDecoder, pInfo->topic) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pInfo->ntbUid) < 0) return -1;
|
||||
int32_t sz;
|
||||
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
|
||||
pInfo->colIdList = taosArrayInit(sz, sizeof(int16_t));
|
||||
if (pInfo->colIdList == NULL) return -1;
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
int16_t colId;
|
||||
if (tDecodeI16(pDecoder, &colId) < 0) return -1;
|
||||
taosArrayPush(pInfo->colIdList, &colId);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) {
|
||||
int32_t nUid = taosArrayGetSize(pRes->uidList);
|
||||
|
||||
|
|
|
@ -224,6 +224,7 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_SMA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -352,6 +352,7 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_CHANGE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_VG_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_MQ_COMMIT_OFFSET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CHECK_ALTER_INFO, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CONSUME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DELETE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_COMMIT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -144,33 +144,33 @@ typedef enum {
|
|||
} ECsmUpdateType;
|
||||
|
||||
typedef struct {
|
||||
int32_t id;
|
||||
ETrnStage stage;
|
||||
ETrnPolicy policy;
|
||||
ETrnConflct conflict;
|
||||
ETrnExec exec;
|
||||
EOperType oper;
|
||||
int32_t code;
|
||||
int32_t failedTimes;
|
||||
void* rpcRsp;
|
||||
int32_t rpcRspLen;
|
||||
int32_t redoActionPos;
|
||||
SArray* redoActions;
|
||||
SArray* undoActions;
|
||||
SArray* commitActions;
|
||||
int64_t createdTime;
|
||||
int64_t lastExecTime;
|
||||
int32_t lastAction;
|
||||
int32_t lastErrorNo;
|
||||
tmsg_t lastMsgType;
|
||||
SEpSet lastEpset;
|
||||
char dbname1[TSDB_DB_FNAME_LEN];
|
||||
char dbname2[TSDB_DB_FNAME_LEN];
|
||||
int32_t startFunc;
|
||||
int32_t stopFunc;
|
||||
int32_t paramLen;
|
||||
void* param;
|
||||
SArray* pRpcArray;
|
||||
int32_t id;
|
||||
ETrnStage stage;
|
||||
ETrnPolicy policy;
|
||||
ETrnConflct conflict;
|
||||
ETrnExec exec;
|
||||
EOperType oper;
|
||||
int32_t code;
|
||||
int32_t failedTimes;
|
||||
void* rpcRsp;
|
||||
int32_t rpcRspLen;
|
||||
int32_t redoActionPos;
|
||||
SArray* redoActions;
|
||||
SArray* undoActions;
|
||||
SArray* commitActions;
|
||||
int64_t createdTime;
|
||||
int64_t lastExecTime;
|
||||
int32_t lastAction;
|
||||
int32_t lastErrorNo;
|
||||
tmsg_t lastMsgType;
|
||||
SEpSet lastEpset;
|
||||
char dbname1[TSDB_DB_FNAME_LEN];
|
||||
char dbname2[TSDB_DB_FNAME_LEN];
|
||||
int32_t startFunc;
|
||||
int32_t stopFunc;
|
||||
int32_t paramLen;
|
||||
void* param;
|
||||
SArray* pRpcArray;
|
||||
} STrans;
|
||||
|
||||
typedef struct {
|
||||
|
@ -477,6 +477,10 @@ typedef struct {
|
|||
char* physicalPlan;
|
||||
SSchemaWrapper schema;
|
||||
int64_t stbUid;
|
||||
// forbid condition
|
||||
int64_t ntbUid;
|
||||
SArray* ntbColIds;
|
||||
int64_t ctbStbUid;
|
||||
} SMqTopicObj;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -57,6 +57,7 @@ int32_t mndInitTopic(SMnode *pMnode) {
|
|||
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_CHECK_ALTER_INFO_RSP, mndTransProcessRsp);
|
||||
|
||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
|
||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
|
||||
|
@ -74,7 +75,6 @@ const char *mndTopicGetShowName(const char topic[TSDB_TOPIC_FNAME_LEN]) {
|
|||
int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
bool found = false;
|
||||
while (1) {
|
||||
SMqTopicObj *pTopic = NULL;
|
||||
pIter = sdbFetch(pSdb, SDB_TOPIC, pIter, (void **)&pTopic);
|
||||
|
@ -95,10 +95,12 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId
|
|||
SNode *pNode = NULL;
|
||||
FOREACH(pNode, pNodeList) {
|
||||
SColumnNode *pCol = (SColumnNode *)pNode;
|
||||
if (pCol->tableId != suid) goto NEXT;
|
||||
if (pCol->tableId != suid && pTopic->ctbStbUid != suid) goto NEXT;
|
||||
if (pCol->colId > 0 && pCol->colId == colId) {
|
||||
found = true;
|
||||
goto NEXT;
|
||||
sdbRelease(pSdb, pTopic);
|
||||
nodesDestroyNode(pAst);
|
||||
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
|
||||
return -1;
|
||||
}
|
||||
mTrace("topic:%s, colId:%d is used", pTopic->name, pCol->colId);
|
||||
}
|
||||
|
@ -106,10 +108,6 @@ int32_t mndCheckColAndTagModifiable(SMnode *pMnode, int64_t suid, col_id_t colId
|
|||
NEXT:
|
||||
sdbRelease(pSdb, pTopic);
|
||||
nodesDestroyNode(pAst);
|
||||
if (found) {
|
||||
terrno = TSDB_CODE_MND_FIELD_CONFLICT_WITH_TOPIC;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -127,8 +125,10 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
|||
if (pTopic->schema.nCols) {
|
||||
schemaLen = taosEncodeSSchemaWrapper(NULL, &pTopic->schema);
|
||||
}
|
||||
int32_t size =
|
||||
sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + MND_TOPIC_RESERVE_SIZE;
|
||||
int32_t ntbColLen = taosArrayGetSize(pTopic->ntbColIds) * sizeof(int16_t);
|
||||
|
||||
int32_t size = sizeof(SMqTopicObj) + physicalPlanLen + pTopic->sqlLen + pTopic->astLen + schemaLen + ntbColLen +
|
||||
MND_TOPIC_RESERVE_SIZE;
|
||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
|
||||
if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
|
||||
|
||||
|
@ -164,6 +164,16 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
|||
taosEncodeSSchemaWrapper(&aswBuf, &pTopic->schema);
|
||||
SDB_SET_BINARY(pRaw, dataPos, swBuf, schemaLen, TOPIC_ENCODE_OVER);
|
||||
}
|
||||
SDB_SET_INT64(pRaw, dataPos, pTopic->ntbUid, TOPIC_ENCODE_OVER);
|
||||
if (pTopic->ntbUid != 0) {
|
||||
int32_t sz = taosArrayGetSize(pTopic->ntbColIds);
|
||||
SDB_SET_INT32(pRaw, dataPos, sz, TOPIC_ENCODE_OVER);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
int16_t colId = *(int16_t *)taosArrayGet(pTopic->ntbColIds, i);
|
||||
SDB_SET_INT16(pRaw, dataPos, colId, TOPIC_ENCODE_OVER);
|
||||
}
|
||||
}
|
||||
SDB_SET_INT64(pRaw, dataPos, pTopic->ctbStbUid, TOPIC_ENCODE_OVER);
|
||||
|
||||
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
|
||||
SDB_SET_DATALEN(pRaw, dataPos, TOPIC_ENCODE_OVER);
|
||||
|
@ -259,6 +269,20 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
|||
pTopic->schema.version = 0;
|
||||
pTopic->schema.pSchema = NULL;
|
||||
}
|
||||
SDB_GET_INT64(pRaw, dataPos, &pTopic->ntbUid, TOPIC_DECODE_OVER);
|
||||
if (pTopic->ntbUid != 0) {
|
||||
int32_t ntbColNum;
|
||||
SDB_GET_INT32(pRaw, dataPos, &ntbColNum, TOPIC_DECODE_OVER);
|
||||
pTopic->ntbColIds = taosArrayInit(ntbColNum, sizeof(int16_t));
|
||||
if (pTopic->ntbColIds == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto TOPIC_DECODE_OVER;
|
||||
}
|
||||
int16_t colId;
|
||||
SDB_GET_INT16(pRaw, dataPos, &colId, TOPIC_DECODE_OVER);
|
||||
taosArrayPush(pTopic->ntbColIds, &colId);
|
||||
}
|
||||
SDB_GET_INT64(pRaw, dataPos, &pTopic->ctbStbUid, TOPIC_DECODE_OVER);
|
||||
|
||||
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
|
||||
|
||||
|
@ -346,6 +370,26 @@ static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t extractTopicTbInfo(SNode *pAst, SMqTopicObj *pTopic) {
|
||||
SNodeList *pNodeList = NULL;
|
||||
nodesCollectColumns((SSelectStmt *)pAst, SQL_CLAUSE_FROM, NULL, COLLECT_COL_TYPE_ALL, &pNodeList);
|
||||
int64_t suid = ((SRealTableNode *)((SSelectStmt *)pAst)->pFromTable)->pMeta->suid;
|
||||
int8_t tableType = ((SRealTableNode *)((SSelectStmt *)pAst)->pFromTable)->pMeta->tableType;
|
||||
if (tableType == TSDB_CHILD_TABLE) {
|
||||
pTopic->ctbStbUid = suid;
|
||||
} else if (tableType == TSDB_NORMAL_TABLE) {
|
||||
SNode *pNode = NULL;
|
||||
FOREACH(pNode, pNodeList) {
|
||||
SColumnNode *pCol = (SColumnNode *)pNode;
|
||||
if (pCol->tableType == TSDB_NORMAL_TABLE) {
|
||||
pTopic->ntbUid = pCol->tableId;
|
||||
taosArrayPush(pTopic->ntbColIds, &pCol->colId);
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
|
||||
mDebug("topic:%s to create", pCreate->name);
|
||||
SMqTopicObj topicObj = {0};
|
||||
|
@ -386,6 +430,19 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
return -1;
|
||||
}
|
||||
|
||||
int64_t ntbUid;
|
||||
topicObj.ntbColIds = taosArrayInit(0, sizeof(int16_t));
|
||||
if (topicObj.ntbColIds == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
extractTopicTbInfo(pAst, &topicObj);
|
||||
|
||||
if (topicObj.ntbUid == 0) {
|
||||
taosArrayDestroy(topicObj.ntbColIds);
|
||||
topicObj.ntbColIds = NULL;
|
||||
}
|
||||
|
||||
if (qExtractResultSchema(pAst, &topicObj.schema.nCols, &topicObj.schema.pSchema) != 0) {
|
||||
mError("topic:%s, failed to create since %s", pCreate->name, terrstr());
|
||||
taosMemoryFree(topicObj.ast);
|
||||
|
@ -433,6 +490,60 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
}
|
||||
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||
|
||||
if (topicObj.ntbUid != 0) {
|
||||
SCheckAlterInfo info;
|
||||
memcpy(info.topic, topicObj.name, TSDB_TOPIC_FNAME_LEN);
|
||||
info.ntbUid = topicObj.ntbUid;
|
||||
info.colIdList = topicObj.ntbColIds;
|
||||
// broadcast forbid alter info
|
||||
void *pIter = NULL;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SVgObj *pVgroup = NULL;
|
||||
while (1) {
|
||||
// iterate vg
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
if (pIter == NULL) break;
|
||||
if (!mndVgroupInDb(pVgroup, topicObj.dbUid)) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
continue;
|
||||
}
|
||||
|
||||
// encoder check alter info
|
||||
int32_t len;
|
||||
int32_t code;
|
||||
tEncodeSize(tEncodeSCheckAlterInfo, &info, len, code);
|
||||
if (code < 0) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
mndTransDrop(pTrans);
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
|
||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, abuf, len);
|
||||
if (tEncodeSCheckAlterInfo(&encoder, &info) < 0) {
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
tEncoderClear(&encoder);
|
||||
((SMsgHead *)buf)->vgId = htonl(pVgroup->vgId);
|
||||
// add redo action
|
||||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
action.pCont = buf;
|
||||
action.contLen = sizeof(SMsgHead) + len;
|
||||
action.msgType = TDMT_VND_CHECK_ALTER_INFO;
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(buf);
|
||||
sdbRelease(pSdb, pVgroup);
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
taosMemoryFreeClear(topicObj.physicalPlan);
|
||||
|
@ -442,7 +553,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
|
|||
|
||||
taosMemoryFreeClear(topicObj.physicalPlan);
|
||||
mndTransDrop(pTrans);
|
||||
return 0;
|
||||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
|
||||
|
|
|
@ -89,7 +89,7 @@ typedef struct {
|
|||
STqExecDb execDb;
|
||||
};
|
||||
int32_t numOfCols; // number of out pout column, temporarily used
|
||||
SSchemaWrapper *pSchemaWrapper; // columns that are involved in query
|
||||
SSchemaWrapper* pSchemaWrapper; // columns that are involved in query
|
||||
} STqExecHandle;
|
||||
|
||||
typedef struct {
|
||||
|
@ -110,9 +110,6 @@ typedef struct {
|
|||
// exec
|
||||
STqExecHandle execHandle;
|
||||
|
||||
// prevent drop
|
||||
int64_t ntbUid;
|
||||
SArray* colIdList; // SArray<int32_t>
|
||||
} STqHandle;
|
||||
|
||||
struct STQ {
|
||||
|
@ -120,9 +117,9 @@ struct STQ {
|
|||
SHashObj* pushMgr; // consumerId -> STqHandle*
|
||||
SHashObj* handles; // subKey -> STqHandle
|
||||
SHashObj* pStreamTasks; // taksId -> SStreamTask
|
||||
SHashObj* pAlterInfo; // topic -> SAlterCheckInfo
|
||||
STqOffsetStore* pOffsetStore;
|
||||
SVnode* pVnode;
|
||||
SWal* pWal;
|
||||
TDB* pMetaStore;
|
||||
TTB* pExecStore;
|
||||
};
|
||||
|
|
|
@ -137,12 +137,13 @@ STsdbReader tsdbQueryCacheLastT(STsdb* tsdb, SQueryTableDataCond* pCond, STableL
|
|||
// tq
|
||||
int tqInit();
|
||||
void tqCleanUp();
|
||||
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal);
|
||||
STQ* tqOpen(const char* path, SVnode* pVnode);
|
||||
void tqClose(STQ*);
|
||||
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
||||
int tqCommit(STQ*);
|
||||
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
|
||||
int32_t tqCheckColModifiable(STQ* pTq, int32_t colId);
|
||||
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
|
||||
int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||
|
|
|
@ -640,6 +640,10 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
|||
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
|
||||
goto _err;
|
||||
}
|
||||
if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) {
|
||||
terrno = TSDB_CODE_VND_COL_SUBSCRIBED;
|
||||
goto _err;
|
||||
}
|
||||
pSchema->version++;
|
||||
tlen = (pSchema->nCols - iCol - 1) * sizeof(SSchema);
|
||||
if (tlen) {
|
||||
|
@ -656,6 +660,10 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
|||
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
|
||||
goto _err;
|
||||
}
|
||||
if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) {
|
||||
terrno = TSDB_CODE_VND_COL_SUBSCRIBED;
|
||||
goto _err;
|
||||
}
|
||||
pSchema->version++;
|
||||
pColumn->bytes = pAlterTbReq->colModBytes;
|
||||
break;
|
||||
|
@ -664,6 +672,10 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
|
|||
terrno = TSDB_CODE_VND_TABLE_COL_NOT_EXISTS;
|
||||
goto _err;
|
||||
}
|
||||
if (tqCheckColModifiable(pMeta->pVnode->pTq, uid, pColumn->colId) != 0) {
|
||||
terrno = TSDB_CODE_VND_COL_SUBSCRIBED;
|
||||
goto _err;
|
||||
}
|
||||
pSchema->version++;
|
||||
strcpy(pColumn->name, pAlterTbReq->colNewName);
|
||||
break;
|
||||
|
|
|
@ -51,7 +51,7 @@ void tqCleanUp() {
|
|||
}
|
||||
}
|
||||
|
||||
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
||||
STQ* tqOpen(const char* path, SVnode* pVnode) {
|
||||
STQ* pTq = taosMemoryCalloc(1, sizeof(STQ));
|
||||
if (pTq == NULL) {
|
||||
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
||||
|
@ -59,7 +59,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
|||
}
|
||||
pTq->path = strdup(path);
|
||||
pTq->pVnode = pVnode;
|
||||
pTq->pWal = pWal;
|
||||
|
||||
pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||
|
||||
|
@ -67,6 +66,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
|
|||
|
||||
pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
|
||||
|
||||
pTq->pAlterInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||
|
||||
if (tqMetaOpen(pTq) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
@ -91,6 +92,7 @@ void tqClose(STQ* pTq) {
|
|||
}
|
||||
taosHashCleanup(pTq->pStreamTasks);
|
||||
taosHashCleanup(pTq->pushMgr);
|
||||
taosHashCleanup(pTq->pAlterInfo);
|
||||
taosMemoryFree(pTq->path);
|
||||
tqMetaClose(pTq);
|
||||
taosMemoryFree(pTq);
|
||||
|
@ -208,18 +210,18 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqCheckColModifiable(STQ* pTq, int32_t colId) {
|
||||
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) {
|
||||
void* pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pTq->handles, pIter);
|
||||
pIter = taosHashIterate(pTq->pAlterInfo, pIter);
|
||||
if (pIter == NULL) break;
|
||||
STqHandle* pExec = (STqHandle*)pIter;
|
||||
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
int32_t sz = pExec->execHandle.pSchemaWrapper->nCols;
|
||||
SCheckAlterInfo* pCheck = (SCheckAlterInfo*)pIter;
|
||||
if (pCheck->ntbUid == tbUid) {
|
||||
int32_t sz = taosArrayGetSize(pCheck->colIdList);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SSchema* pSchema = &pExec->execHandle.pSchemaWrapper->pSchema[i];
|
||||
if (pSchema->colId == colId) {
|
||||
taosHashCancelIterate(pTq->handles, pIter);
|
||||
int16_t forbidColId = *(int16_t*)taosArrayGet(pCheck->colIdList, i);
|
||||
if (forbidColId == colId) {
|
||||
taosHashCancelIterate(pTq->pAlterInfo, pIter);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -270,6 +272,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
int32_t code = 0;
|
||||
STqOffsetVal reqOffset = pReq->reqOffset;
|
||||
STqOffsetVal fetchOffsetNew;
|
||||
SWalCkHead* pCkHead = NULL;
|
||||
|
||||
// 1.find handle
|
||||
STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey));
|
||||
|
@ -459,6 +462,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
}
|
||||
|
||||
OVER:
|
||||
if (pCkHead) taosMemoryFree(pCkHead);
|
||||
// TODO wrap in destroy func
|
||||
taosArrayDestroy(dataRsp.blockDataLen);
|
||||
taosArrayDestroyP(dataRsp.blockData, (FDelete)taosMemoryFree);
|
||||
|
@ -488,6 +492,22 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
SCheckAlterInfo info = {0};
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, msg, msgLen);
|
||||
if (tDecodeSCheckAlterInfo(&decoder, &info) < 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
tDecoderClear(&decoder);
|
||||
if (taosHashPut(pTq->pAlterInfo, info.topic, strlen(info.topic), &info, sizeof(SCheckAlterInfo)) < 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||
SMqRebVgReq req = {0};
|
||||
tDecodeSMqRebVgReq(msg, &req);
|
||||
|
|
|
@ -201,10 +201,12 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
|
|||
if (pRsp->withTbName) {
|
||||
int64_t uid = pExec->pExecReader->msgIter.uid;
|
||||
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
|
||||
blockDataFreeRes(&block);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
|
||||
blockDataFreeRes(&block);
|
||||
tqAddBlockSchemaToRsp(pExec, pRsp);
|
||||
pRsp->blockNum++;
|
||||
}
|
||||
|
@ -220,10 +222,12 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR
|
|||
if (pRsp->withTbName) {
|
||||
int64_t uid = pExec->pExecReader->msgIter.uid;
|
||||
if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) {
|
||||
blockDataFreeRes(&block);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
tqAddBlockDataToRsp(&block, pRsp, taosArrayGetSize(block.pDataBlock));
|
||||
blockDataFreeRes(&block);
|
||||
tqAddBlockSchemaToRsp(pExec, pRsp);
|
||||
pRsp->blockNum++;
|
||||
}
|
||||
|
|
|
@ -89,8 +89,8 @@ int32_t tqMetaOpen(STQ* pTq) {
|
|||
.version = handle.snapshotVer,
|
||||
};
|
||||
|
||||
handle.execHandle.execCol.task =
|
||||
qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper);
|
||||
handle.execHandle.execCol.task = qCreateQueueExecTaskInfo(
|
||||
handle.execHandle.execCol.qmsg, &reader, &handle.execHandle.numOfCols, &handle.execHandle.pSchemaWrapper);
|
||||
ASSERT(handle.execHandle.execCol.task);
|
||||
void* scanner = NULL;
|
||||
qExtractStreamScanner(handle.execHandle.execCol.task, &scanner);
|
||||
|
|
|
@ -340,29 +340,30 @@ FAIL:
|
|||
|
||||
void tqReaderSetColIdList(STqReader* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; }
|
||||
|
||||
int tqReaderSetTbUidList(STqReader* pHandle, const SArray* tbUidList) {
|
||||
if (pHandle->tbIdHash) {
|
||||
taosHashClear(pHandle->tbIdHash);
|
||||
int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) {
|
||||
if (pReader->tbIdHash) {
|
||||
taosHashClear(pReader->tbIdHash);
|
||||
} else {
|
||||
pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
}
|
||||
|
||||
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
if (pHandle->tbIdHash == NULL) {
|
||||
if (pReader->tbIdHash == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||
int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
|
||||
taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
|
||||
taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tqReaderAddTbUidList(STqReader* pHandle, const SArray* tbUidList) {
|
||||
if (pHandle->tbIdHash == NULL) {
|
||||
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
if (pHandle->tbIdHash == NULL) {
|
||||
int tqReaderAddTbUidList(STqReader* pReader, const SArray* tbUidList) {
|
||||
if (pReader->tbIdHash == NULL) {
|
||||
pReader->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
if (pReader->tbIdHash == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
@ -370,18 +371,18 @@ int tqReaderAddTbUidList(STqReader* pHandle, const SArray* tbUidList) {
|
|||
|
||||
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||
int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
|
||||
taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
|
||||
taosHashPut(pReader->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tqReaderRemoveTbUidList(STqReader* pHandle, const SArray* tbUidList) {
|
||||
ASSERT(pHandle->tbIdHash != NULL);
|
||||
int tqReaderRemoveTbUidList(STqReader* pReader, const SArray* tbUidList) {
|
||||
ASSERT(pReader->tbIdHash != NULL);
|
||||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||
int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
|
||||
taosHashRemove(pHandle->tbIdHash, pKey, sizeof(int64_t));
|
||||
taosHashRemove(pReader->tbIdHash, pKey, sizeof(int64_t));
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -110,7 +110,6 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
|||
// TODO
|
||||
ret = rpcMallocCont(cap);
|
||||
ret->header.vgId = vgId;
|
||||
ret->version = htonl(1);
|
||||
ret->length = sizeof(SSubmitReq);
|
||||
ret->numOfBlocks = htonl(sz);
|
||||
|
||||
|
|
|
@ -135,7 +135,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
|||
// open tq
|
||||
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
|
||||
taosRealPath(tdir, NULL, sizeof(tdir));
|
||||
pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal);
|
||||
pVnode->pTq = tqOpen(tdir, pVnode);
|
||||
if (pVnode->pTq == NULL) {
|
||||
vError("vgId:%d, failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||
goto _err;
|
||||
|
|
|
@ -206,6 +206,12 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
|||
goto _err;
|
||||
}
|
||||
break;
|
||||
case TDMT_VND_CHECK_ALTER_INFO:
|
||||
if (tqProcessCheckAlterInfoReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
break;
|
||||
case TDMT_STREAM_TASK_DEPLOY: {
|
||||
if (tqProcessTaskDeployReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
|
||||
pMsg->contLen - sizeof(SMsgHead)) < 0) {
|
||||
|
|
|
@ -351,7 +351,6 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
|
|||
int32_t tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);
|
||||
|
||||
#ifndef NDEBUG
|
||||
|
||||
qDebug("switch to next table %ld (cursor %d), %ld rows returned", uid, pTableScanInfo->currentTable,
|
||||
pInfo->pTableScanOp->resultInfo.totalRows);
|
||||
pInfo->pTableScanOp->resultInfo.totalRows = 0;
|
||||
|
|
|
@ -875,7 +875,7 @@ bool isUdfcUvMsgComplete(SClientConnBuf *connBuf);
|
|||
void udfcUvHandleRsp(SClientUvConn *conn);
|
||||
void udfcUvHandleError(SClientUvConn *conn);
|
||||
void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
|
||||
void onUdfcPipetWrite(uv_write_t *write, int status);
|
||||
void onUdfcPipeWrite(uv_write_t *write, int status);
|
||||
void onUdfcPipeConnect(uv_connect_t *connect, int status);
|
||||
int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask);
|
||||
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask);
|
||||
|
@ -1226,7 +1226,7 @@ int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *
|
|||
}
|
||||
|
||||
void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
|
||||
SClientUvConn *conn = handle->data;
|
||||
SClientUvConn *conn = handle->data;
|
||||
SClientConnBuf *connBuf = &conn->readBuf;
|
||||
|
||||
int32_t msgHeadSize = sizeof(int32_t) + sizeof(int64_t);
|
||||
|
@ -1244,6 +1244,9 @@ void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf
|
|||
buf->base = NULL;
|
||||
buf->len = 0;
|
||||
}
|
||||
} else if (connBuf->total == -1 && connBuf->len < msgHeadSize) {
|
||||
buf->base = connBuf->buf + connBuf->len;
|
||||
buf->len = msgHeadSize - connBuf->len;
|
||||
} else {
|
||||
connBuf->cap = connBuf->total > connBuf->cap ? connBuf->total : connBuf->cap;
|
||||
void *resultBuf = taosMemoryRealloc(connBuf->buf, connBuf->cap);
|
||||
|
@ -1258,8 +1261,7 @@ void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf
|
|||
}
|
||||
}
|
||||
|
||||
fnTrace("conn buf cap - len - total : %d - %d - %d", connBuf->cap, connBuf->len, connBuf->total);
|
||||
|
||||
fnDebug("udfc uv alloc buffer: cap - len - total : %d - %d - %d", connBuf->cap, connBuf->len, connBuf->total);
|
||||
}
|
||||
|
||||
bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
|
||||
|
@ -1267,7 +1269,7 @@ bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) {
|
|||
connBuf->total = *(int32_t *) (connBuf->buf);
|
||||
}
|
||||
if (connBuf->len == connBuf->cap && connBuf->total == connBuf->cap) {
|
||||
fnTrace("udfc complete message is received, now handle it");
|
||||
fnDebug("udfc complete message is received, now handle it");
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
|
@ -1278,7 +1280,7 @@ void udfcUvHandleRsp(SClientUvConn *conn) {
|
|||
int64_t seqNum = *(int64_t *) (connBuf->buf + sizeof(int32_t)); // msglen then seqnum
|
||||
|
||||
if (QUEUE_EMPTY(&conn->taskQueue)) {
|
||||
fnError("udfc no task waiting for response on connection");
|
||||
fnError("udfc no task waiting on connection. response seqnum:%"PRId64, seqNum);
|
||||
return;
|
||||
}
|
||||
bool found = false;
|
||||
|
@ -1287,6 +1289,7 @@ void udfcUvHandleRsp(SClientUvConn *conn) {
|
|||
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
|
||||
|
||||
while (h != &conn->taskQueue) {
|
||||
fnDebug("udfc handle response iterate through queue. uvTask:%d-%p", task->seqNum, task);
|
||||
if (task->seqNum == seqNum) {
|
||||
if (found == false) {
|
||||
found = true;
|
||||
|
@ -1315,6 +1318,7 @@ void udfcUvHandleRsp(SClientUvConn *conn) {
|
|||
}
|
||||
|
||||
void udfcUvHandleError(SClientUvConn *conn) {
|
||||
fnDebug("handle error on conn: %p, pipe: %p", conn, conn->pipe);
|
||||
while (!QUEUE_EMPTY(&conn->taskQueue)) {
|
||||
QUEUE* h = QUEUE_HEAD(&conn->taskQueue);
|
||||
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue);
|
||||
|
@ -1328,7 +1332,7 @@ void udfcUvHandleError(SClientUvConn *conn) {
|
|||
}
|
||||
|
||||
void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
|
||||
fnTrace("udfc client %p, client read from pipe. nread: %zd", client, nread);
|
||||
fnDebug("udfc client %p, client read from pipe. nread: %zd", client, nread);
|
||||
if (nread == 0) return;
|
||||
|
||||
SClientUvConn *conn = client->data;
|
||||
|
@ -1338,31 +1342,25 @@ void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) {
|
|||
if (isUdfcUvMsgComplete(connBuf)) {
|
||||
udfcUvHandleRsp(conn);
|
||||
}
|
||||
|
||||
}
|
||||
if (nread < 0) {
|
||||
fnError("udfc client pipe %p read error: %zd, %s.", client, nread, uv_strerror(nread));
|
||||
fnError("udfc client pipe %p read error: %zd(%s).", client, nread, uv_strerror(nread));
|
||||
if (nread == UV_EOF) {
|
||||
fnError("\tudfc client pipe %p closed", client);
|
||||
}
|
||||
udfcUvHandleError(conn);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void onUdfcPipetWrite(uv_write_t *write, int status) {
|
||||
SClientUvTaskNode *uvTask = write->data;
|
||||
uv_pipe_t *pipe = uvTask->pipe;
|
||||
fnTrace("udfc client %p write length:%zu", pipe, uvTask->reqBuf.len);
|
||||
SClientUvConn *conn = pipe->data;
|
||||
if (status == 0) {
|
||||
QUEUE_INSERT_TAIL(&conn->taskQueue, &uvTask->connTaskQueue);
|
||||
} else {
|
||||
fnError("udfc client %p write error.", pipe);
|
||||
void onUdfcPipeWrite(uv_write_t *write, int status) {
|
||||
SClientUvConn *conn = write->data;
|
||||
if (status < 0) {
|
||||
fnError("udfc client connection %p write failed. status: %d(%s)", conn, status, uv_strerror(status));
|
||||
udfcUvHandleError(conn);
|
||||
} else {
|
||||
fnDebug("udfc client connection %p write succeed", conn);
|
||||
}
|
||||
taosMemoryFree(write);
|
||||
taosMemoryFree(uvTask->reqBuf.base);
|
||||
}
|
||||
|
||||
void onUdfcPipeConnect(uv_connect_t *connect, int status) {
|
||||
|
@ -1419,7 +1417,7 @@ int32_t udfcCreateUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
|
|||
}
|
||||
|
||||
int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
|
||||
fnTrace("queue uv task to event loop, task: %d, %p", uvTask->type, uvTask);
|
||||
fnDebug("queue uv task to event loop, uvTask: %d-%p", uvTask->type, uvTask);
|
||||
SUdfcProxy *udfc = uvTask->udfc;
|
||||
uv_mutex_lock(&udfc->taskQueueMutex);
|
||||
QUEUE_INSERT_TAIL(&udfc->taskQueue, &uvTask->recvTaskQueue);
|
||||
|
@ -1427,14 +1425,14 @@ int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) {
|
|||
uv_async_send(&udfc->loopTaskAync);
|
||||
|
||||
uv_sem_wait(&uvTask->taskSem);
|
||||
fnInfo("udfc uv task finished. task: %d, %p", uvTask->type, uvTask);
|
||||
fnInfo("udfc uvTask finished. uvTask:%"PRId64"-%d-%p", uvTask->seqNum, uvTask->type, uvTask);
|
||||
uv_sem_destroy(&uvTask->taskSem);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
|
||||
fnTrace("event loop start uv task. task: %d, %p", uvTask->type, uvTask);
|
||||
fnDebug("event loop start uv task. uvTask: %"PRId64"-%d-%p", uvTask->seqNum, uvTask->type, uvTask);
|
||||
int32_t code = 0;
|
||||
|
||||
switch (uvTask->type) {
|
||||
|
@ -1465,10 +1463,12 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
|
|||
code = TSDB_CODE_UDF_PIPE_NO_PIPE;
|
||||
} else {
|
||||
uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t));
|
||||
write->data = uvTask;
|
||||
int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipetWrite);
|
||||
write->data = pipe->data;
|
||||
QUEUE* connTaskQueue = &((SClientUvConn*)pipe->data)->taskQueue;
|
||||
QUEUE_INSERT_TAIL(connTaskQueue, &uvTask->connTaskQueue);
|
||||
int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipeWrite);
|
||||
if (err != 0) {
|
||||
fnError("udfc event loop start req/rsp task uv_write failed. code: %s", uv_strerror(err));
|
||||
fnError("udfc event loop start req_rsp task uv_write failed. uvtask: %p, code: %s", uvTask, uv_strerror(err));
|
||||
}
|
||||
code = err;
|
||||
}
|
||||
|
@ -1618,6 +1618,7 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
|
|||
SClientUvTaskNode *uvTask = NULL;
|
||||
|
||||
udfcCreateUvTask(task, uvTaskType, &uvTask);
|
||||
fnDebug("udfc client task: %p created uvTask: %p. pipe: %p", task, uvTask, task->session->udfUvPipe);
|
||||
udfcQueueUvTask(uvTask);
|
||||
udfcGetUdfTaskResultFromUvTask(task, uvTask);
|
||||
if (uvTaskType == UV_TASK_CONNECT) {
|
||||
|
@ -1625,6 +1626,8 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
|
|||
SClientUvConn *conn = uvTask->pipe->data;
|
||||
conn->session = task->session;
|
||||
}
|
||||
taosMemoryFree(uvTask->reqBuf.base);
|
||||
uvTask->reqBuf.base = NULL;
|
||||
taosMemoryFree(uvTask);
|
||||
uvTask = NULL;
|
||||
return task->errCode;
|
||||
|
@ -1670,7 +1673,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
|
|||
|
||||
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
|
||||
SSDataBlock* output, SUdfInterBuf *newState) {
|
||||
fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
|
||||
fnDebug("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
|
||||
SUdfcUvSession *session = (SUdfcUvSession *) handle;
|
||||
if (session->udfUvPipe == NULL) {
|
||||
fnError("No pipe to udfd");
|
||||
|
|
|
@ -671,6 +671,9 @@ void udfdAllocBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf) {
|
|||
fnError("udfd can not allocate enough memory") buf->base = NULL;
|
||||
buf->len = 0;
|
||||
}
|
||||
} else if (ctx->inputTotal == -1 && ctx->inputLen < msgHeadSize) {
|
||||
buf->base = ctx->inputBuf + ctx->inputLen;
|
||||
buf->len = msgHeadSize - ctx->inputLen;
|
||||
} else {
|
||||
ctx->inputCap = ctx->inputTotal > ctx->inputCap ? ctx->inputTotal : ctx->inputCap;
|
||||
void *inputBuf = taosMemoryRealloc(ctx->inputBuf, ctx->inputCap);
|
||||
|
|
|
@ -2667,6 +2667,7 @@ static int32_t jsonToExprNode(const SJson* pJson, void* pObj) {
|
|||
}
|
||||
|
||||
static const char* jkColumnTableId = "TableId";
|
||||
static const char* jkColumnTableType = "TableType";
|
||||
static const char* jkColumnColId = "ColId";
|
||||
static const char* jkColumnColType = "ColType";
|
||||
static const char* jkColumnDbName = "DbName";
|
||||
|
@ -2683,6 +2684,9 @@ static int32_t columnNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkColumnTableId, pNode->tableId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkColumnTableType, pNode->tableType);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkColumnColId, pNode->colId);
|
||||
}
|
||||
|
@ -2718,6 +2722,9 @@ static int32_t jsonToColumnNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetUBigIntValue(pJson, jkColumnTableId, &pNode->tableId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetTinyIntValue(pJson, jkColumnTableType, &pNode->tableType);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetSmallIntValue(pJson, jkColumnColId, &pNode->colId);
|
||||
}
|
||||
|
|
|
@ -626,6 +626,7 @@ void cfgDumpCfg(SConfig *pCfg, bool tsc, bool dump) {
|
|||
int32_t cfgLoadFromEnvVar(SConfig *pConfig) {
|
||||
char *line = NULL, *name, *value, *value2, *value3;
|
||||
int32_t olen, vlen, vlen2, vlen3;
|
||||
int32_t code = 0;
|
||||
ssize_t _bytes = 0;
|
||||
TdCmdPtr pCmd = taosOpenCmd("set");
|
||||
if (pCmd == NULL) {
|
||||
|
@ -658,9 +659,12 @@ int32_t cfgLoadFromEnvVar(SConfig *pConfig) {
|
|||
if (vlen3 != 0) value3[vlen3] = 0;
|
||||
}
|
||||
|
||||
cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_VAR);
|
||||
if (value2 != NULL && value3 != NULL && value2[0] != 0 && value3[0] != 0 && strcasecmp(name, "dataDir") == 0) {
|
||||
cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_ENV_VAR);
|
||||
code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_ENV_VAR);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
} else {
|
||||
code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_VAR);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -674,6 +678,7 @@ int32_t cfgLoadFromEnvVar(SConfig *pConfig) {
|
|||
int32_t cfgLoadFromEnvCmd(SConfig *pConfig, const char **envCmd) {
|
||||
char buf[1024], *name, *value, *value2, *value3;
|
||||
int32_t olen, vlen, vlen2, vlen3;
|
||||
int32_t code = 0;
|
||||
int32_t index = 0;
|
||||
if (envCmd == NULL) return 0;
|
||||
while (envCmd[index]!=NULL) {
|
||||
|
@ -700,9 +705,12 @@ int32_t cfgLoadFromEnvCmd(SConfig *pConfig, const char **envCmd) {
|
|||
if (vlen3 != 0) value3[vlen3] = 0;
|
||||
}
|
||||
|
||||
cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_CMD);
|
||||
if (value2 != NULL && value3 != NULL && value2[0] != 0 && value3[0] != 0 && strcasecmp(name, "dataDir") == 0) {
|
||||
cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_ENV_CMD);
|
||||
code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_ENV_CMD);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
} else {
|
||||
code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_CMD);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -713,6 +721,7 @@ int32_t cfgLoadFromEnvCmd(SConfig *pConfig, const char **envCmd) {
|
|||
int32_t cfgLoadFromEnvFile(SConfig *pConfig, const char *envFile) {
|
||||
char *line = NULL, *name, *value, *value2, *value3;
|
||||
int32_t olen, vlen, vlen2, vlen3;
|
||||
int32_t code = 0;
|
||||
ssize_t _bytes = 0;
|
||||
|
||||
const char *filepath = ".env";
|
||||
|
@ -761,9 +770,12 @@ int32_t cfgLoadFromEnvFile(SConfig *pConfig, const char *envFile) {
|
|||
if (vlen3 != 0) value3[vlen3] = 0;
|
||||
}
|
||||
|
||||
cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_FILE);
|
||||
if (value2 != NULL && value3 != NULL && value2[0] != 0 && value3[0] != 0 && strcasecmp(name, "dataDir") == 0) {
|
||||
cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_ENV_FILE);
|
||||
code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_ENV_FILE);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
} else {
|
||||
code = cfgSetItem(pConfig, name, value, CFG_STYPE_ENV_FILE);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -819,11 +831,12 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) {
|
|||
if (vlen3 != 0) value3[vlen3] = 0;
|
||||
}
|
||||
|
||||
code = cfgSetItem(pConfig, name, value, CFG_STYPE_CFG_FILE);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
if (value2 != NULL && value3 != NULL && value2[0] != 0 && value3[0] != 0 && strcasecmp(name, "dataDir") == 0) {
|
||||
code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_CFG_FILE);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
} else {
|
||||
code = cfgSetItem(pConfig, name, value, CFG_STYPE_CFG_FILE);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -839,9 +852,75 @@ int32_t cfgLoadFromCfgFile(SConfig *pConfig, const char *filepath) {
|
|||
}
|
||||
}
|
||||
|
||||
// int32_t cfgLoadFromCfgText(SConfig *pConfig, const char *configText) {
|
||||
// char *line = NULL, *name, *value, *value2, *value3;
|
||||
// int32_t olen, vlen, vlen2, vlen3;
|
||||
// ssize_t _bytes = 0;
|
||||
// int32_t code = 0;
|
||||
|
||||
// TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_READ | TD_FILE_STREAM);
|
||||
// if (pFile == NULL) {
|
||||
// // success when the file does not exist
|
||||
// if (errno == ENOENT) {
|
||||
// terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
// uInfo("failed to load from cfg file %s since %s, use default parameters", filepath, terrstr());
|
||||
// return 0;
|
||||
// } else {
|
||||
// uError("failed to load from cfg file %s since %s", filepath, terrstr());
|
||||
// return -1;
|
||||
// }
|
||||
// }
|
||||
|
||||
// while (!taosEOFFile(pFile)) {
|
||||
// name = value = value2 = value3 = NULL;
|
||||
// olen = vlen = vlen2 = vlen3 = 0;
|
||||
|
||||
// _bytes = taosGetLineFile(pFile, &line);
|
||||
// if (_bytes <= 0) {
|
||||
// break;
|
||||
// }
|
||||
|
||||
// if(line[_bytes - 1] == '\n') line[_bytes - 1] = 0;
|
||||
|
||||
// paGetToken(line, &name, &olen);
|
||||
// if (olen == 0) continue;
|
||||
// name[olen] = 0;
|
||||
|
||||
// paGetToken(name + olen + 1, &value, &vlen);
|
||||
// if (vlen == 0) continue;
|
||||
// value[vlen] = 0;
|
||||
|
||||
// paGetToken(value + vlen + 1, &value2, &vlen2);
|
||||
// if (vlen2 != 0) {
|
||||
// value2[vlen2] = 0;
|
||||
// paGetToken(value2 + vlen2 + 1, &value3, &vlen3);
|
||||
// if (vlen3 != 0) value3[vlen3] = 0;
|
||||
// }
|
||||
|
||||
// code = cfgSetItem(pConfig, name, value, CFG_STYPE_CFG_FILE);
|
||||
// if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
// if (value2 != NULL && value3 != NULL && value2[0] != 0 && value3[0] != 0 && strcasecmp(name, "dataDir") == 0) {
|
||||
// code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_CFG_FILE);
|
||||
// if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
// }
|
||||
// }
|
||||
|
||||
// taosCloseFile(&pFile);
|
||||
// if (line != NULL) taosMemoryFreeClear(line);
|
||||
|
||||
// if (code == 0 || (code != 0 && terrno == TSDB_CODE_CFG_NOT_FOUND)) {
|
||||
// uInfo("load from cfg file %s success", filepath);
|
||||
// return 0;
|
||||
// } else {
|
||||
// uError("failed to load from cfg file %s since %s", filepath, terrstr());
|
||||
// return -1;
|
||||
// }
|
||||
// }
|
||||
|
||||
int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) {
|
||||
char *cfgLineBuf = NULL, *name, *value, *value2, *value3;
|
||||
int32_t olen, vlen, vlen2, vlen3;
|
||||
int32_t code = 0;
|
||||
if (url == NULL || strlen(url) == 0) {
|
||||
uInfo("fail to load apoll url");
|
||||
return 0;
|
||||
|
@ -916,9 +995,12 @@ int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) {
|
|||
paGetToken(value2 + vlen2 + 1, &value3, &vlen3);
|
||||
if (vlen3 != 0) value3[vlen3] = 0;
|
||||
}
|
||||
cfgSetItem(pConfig, name, value, CFG_STYPE_APOLLO_URL);
|
||||
if (value2 != NULL && value3 != NULL && value2[0] != 0 && value3[0] != 0 && strcasecmp(name, "dataDir") == 0) {
|
||||
cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_APOLLO_URL);
|
||||
code = cfgSetTfsItem(pConfig, name, value, value2, value3, CFG_STYPE_APOLLO_URL);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
} else {
|
||||
code = cfgSetItem(pConfig, name, value, CFG_STYPE_APOLLO_URL);
|
||||
if (code != 0 && terrno != TSDB_CODE_CFG_NOT_FOUND) break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -336,6 +336,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_NOT_EXIST, "Table does not exists
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_INVALID_TABLE_ACTION, "Invalid table action")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_TABLE_COL_NOT_EXISTS, "Table column not exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_SUBSCRIBED, "Table column is subscribed")
|
||||
|
||||
// tsdb
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")
|
||||
|
|
|
@ -1,91 +0,0 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from util.log import tdLog
|
||||
from util.cases import tdCases
|
||||
from util.sql import tdSql
|
||||
|
||||
class TDTestCase:
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), logSql)
|
||||
|
||||
self.ts = 1538548685000
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
|
||||
# test case for https://jira.taosdata.com:18080/browse/TD-3679
|
||||
print("==============step1")
|
||||
tdSql.execute(
|
||||
"create topic tq_test partitions 10")
|
||||
tdSql.execute(
|
||||
"insert into tq_test.p1(off, ts, content) values(0, %d, 'aaaa')" % self.ts)
|
||||
tdSql.execute(
|
||||
"insert into tq_test.p1(off, ts, content) values(1, %d, 'aaaa')" % (self.ts + 1))
|
||||
tdSql.execute(
|
||||
"insert into tq_test.p1(off, ts, content) values(2, %d, 'aaaa')" % (self.ts + 2))
|
||||
tdSql.execute(
|
||||
"insert into tq_test.p1(off, ts, content) values(3, %d, 'aaaa')" % (self.ts + 3))
|
||||
|
||||
print("==============step2")
|
||||
|
||||
tdSql.query("select * from tq_test.p1")
|
||||
tdSql.checkRows(4)
|
||||
|
||||
tdSql.query("select * from tq_test.p1 where ts >= %d" % self.ts)
|
||||
tdSql.checkRows(4)
|
||||
|
||||
tdSql.query("select * from tq_test.p1 where ts > %d" % self.ts)
|
||||
tdSql.checkRows(3)
|
||||
|
||||
tdSql.query("select * from tq_test.p1 where ts = %d" % self.ts)
|
||||
tdSql.checkRows(1)
|
||||
|
||||
|
||||
tdSql.execute("use db")
|
||||
tdSql.execute("create table test(ts timestamp, start timestamp, value int)")
|
||||
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts, self.ts))
|
||||
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts + 1, self.ts + 1))
|
||||
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts + 2, self.ts + 2))
|
||||
tdSql.execute("insert into test values(%d, %d, 1)" % (self.ts + 3, self.ts + 3))
|
||||
|
||||
tdSql.query("select * from test")
|
||||
tdSql.checkRows(4)
|
||||
|
||||
tdSql.query("select * from test where ts >= %d" % self.ts)
|
||||
tdSql.checkRows(4)
|
||||
|
||||
tdSql.query("select * from test where ts > %d" % self.ts)
|
||||
tdSql.checkRows(3)
|
||||
|
||||
tdSql.query("select * from test where ts = %d" % self.ts)
|
||||
tdSql.checkRows(1)
|
||||
|
||||
tdSql.query("select * from test where start >= %d" % self.ts)
|
||||
tdSql.checkRows(4)
|
||||
|
||||
tdSql.query("select * from test where start > %d" % self.ts)
|
||||
tdSql.checkRows(3)
|
||||
|
||||
tdSql.query("select * from test where start = %d" % self.ts)
|
||||
tdSql.checkRows(1)
|
||||
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -89,13 +89,13 @@
|
|||
./test.sh -f tsim/parser/alter_column.sim
|
||||
./test.sh -f tsim/parser/alter_stable.sim
|
||||
./test.sh -f tsim/parser/alter.sim
|
||||
# ./test.sh -f tsim/parser/alter1.sim
|
||||
# TD-17661 ./test.sh -f tsim/parser/alter1.sim
|
||||
./test.sh -f tsim/parser/auto_create_tb_drop_tb.sim
|
||||
./test.sh -f tsim/parser/auto_create_tb.sim
|
||||
./test.sh -f tsim/parser/between_and.sim
|
||||
./test.sh -f tsim/parser/binary_escapeCharacter.sim
|
||||
# ./test.sh -f tsim/parser/col_arithmetic_operation.sim
|
||||
# ./test.sh -f tsim/parser/columnValue.sim
|
||||
# TD-17738 ./test.sh -f tsim/parser/col_arithmetic_operation.sim
|
||||
# TD-17661 ./test.sh -f tsim/parser/columnValue.sim
|
||||
./test.sh -f tsim/parser/commit.sim
|
||||
# TD-17661 ./test.sh -f tsim/parser/condition.sim
|
||||
./test.sh -f tsim/parser/constCol.sim
|
||||
|
@ -112,9 +112,9 @@
|
|||
./test.sh -f tsim/parser/fourArithmetic-basic.sim
|
||||
# TD-17659 ./test.sh -f tsim/parser/function.sim
|
||||
./test.sh -f tsim/parser/groupby-basic.sim
|
||||
# ./test.sh -f tsim/parser/groupby.sim
|
||||
./test.sh -f tsim/parser/groupby.sim
|
||||
# TD-17622 ./test.sh -f tsim/parser/having_child.sim
|
||||
# ./test.sh -f tsim/parser/having.sim
|
||||
# TD-17622 ./test.sh -f tsim/parser/having.sim
|
||||
./test.sh -f tsim/parser/import_commit1.sim
|
||||
./test.sh -f tsim/parser/import_commit2.sim
|
||||
./test.sh -f tsim/parser/import_commit3.sim
|
||||
|
@ -122,49 +122,46 @@
|
|||
./test.sh -f tsim/parser/import.sim
|
||||
./test.sh -f tsim/parser/insert_multiTbl.sim
|
||||
./test.sh -f tsim/parser/insert_tb.sim
|
||||
# ./test.sh -f tsim/parser/interp.sim
|
||||
# TD-17038 ./test.sh -f tsim/parser/interp.sim
|
||||
./test.sh -f tsim/parser/join_manyblocks.sim
|
||||
# ./test.sh -f tsim/parser/join_multitables.sim
|
||||
# TD-17713 ./test.sh -f tsim/parser/join_multitables.sim
|
||||
# TD-17713 ./test.sh -f tsim/parser/join_multivnode.sim
|
||||
# TD-17707 ./test.sh -f tsim/parser/join.sim
|
||||
./test.sh -f tsim/parser/last_cache.sim
|
||||
./test.sh -f tsim/parser/last_groupby.sim
|
||||
# TD-17675 ./test.sh -f tsim/parser/lastrow.sim
|
||||
# TD-17722 ./test.sh -f tsim/parser/lastrow.sim
|
||||
./test.sh -f tsim/parser/like.sim
|
||||
# ./test.sh -f tsim/parser/limit.sim
|
||||
# ./test.sh -f tsim/parser/limit1.sim
|
||||
# ./test.sh -f tsim/parser/limit2.sim
|
||||
# TD-17464 ./test.sh -f tsim/parser/limit.sim
|
||||
# TD-17464 ./test.sh -f tsim/parser/limit1.sim
|
||||
# TD-17623 ./test.sh -f tsim/parser/limit2.sim
|
||||
./test.sh -f tsim/parser/mixed_blocks.sim
|
||||
./test.sh -f tsim/parser/nchar.sim
|
||||
# TD-17703 ./test.sh -f tsim/parser/nestquery.sim
|
||||
# ./test.sh -f tsim/parser/null_char.sim
|
||||
# TD-17685 ./test.sh -f tsim/parser/null_char.sim
|
||||
./test.sh -f tsim/parser/precision_ns.sim
|
||||
./test.sh -f tsim/parser/projection_limit_offset.sim
|
||||
./test.sh -f tsim/parser/regex.sim
|
||||
./test.sh -f tsim/parser/select_across_vnodes.sim
|
||||
./test.sh -f tsim/parser/select_distinct_tag.sim
|
||||
./test.sh -f tsim/parser/select_from_cache_disk.sim
|
||||
# ./test.sh -f tsim/parser/select_with_tags.sim
|
||||
# TD-17659 ./test.sh -f tsim/parser/select_with_tags.sim
|
||||
./test.sh -f tsim/parser/selectResNum.sim
|
||||
# TD-17685 ./test.sh -f tsim/parser/set_tag_vals.sim
|
||||
./test.sh -f tsim/parser/single_row_in_tb.sim
|
||||
# TD-17684 ./test.sh -f tsim/parser/sliding.sim
|
||||
# ./test.sh -f tsim/parser/slimit_alter_tags.sim
|
||||
# ./test.sh -f tsim/parser/slimit.sim
|
||||
# ./test.sh -f tsim/parser/slimit1.sim
|
||||
# TD-17722 ./test.sh -f tsim/parser/slimit_alter_tags.sim
|
||||
# TD-17722 ./test.sh -f tsim/parser/slimit.sim
|
||||
# TD-17722 ./test.sh -f tsim/parser/slimit1.sim
|
||||
./test.sh -f tsim/parser/stableOp.sim
|
||||
# ./test.sh -f tsim/parser/tags_dynamically_specifiy.sim
|
||||
# ./test.sh -f tsim/parser/tags_filter.sim
|
||||
# TD-17661 ./test.sh -f tsim/parser/tags_dynamically_specifiy.sim
|
||||
# TD-17661 ./test.sh -f tsim/parser/tags_filter.sim
|
||||
./test.sh -f tsim/parser/tbnameIn.sim
|
||||
./test.sh -f tsim/parser/timestamp.sim
|
||||
./test.sh -f tsim/parser/top_groupby.sim
|
||||
./test.sh -f tsim/parser/topbot.sim
|
||||
# ./test.sh -f tsim/parser/udf_dll_stable.sim
|
||||
# ./test.sh -f tsim/parser/udf_dll.sim
|
||||
# ./test.sh -f tsim/parser/udf.sim
|
||||
./test.sh -f tsim/parser/union.sim
|
||||
# TD-17704 ./test.sh -f tsim/parser/union_sysinfo.sim
|
||||
# ./test.sh -f tsim/parser/where.sim
|
||||
# TD-17661 ./test.sh -f tsim/parser/where.sim
|
||||
|
||||
# ---- query
|
||||
./test.sh -f tsim/query/interval.sim
|
||||
|
@ -325,7 +322,7 @@
|
|||
./test.sh -f tsim/vnode/stable_replica3_vnode3.sim
|
||||
|
||||
# --- sync
|
||||
# ./test.sh -f tsim/sync/3Replica1VgElect.sim
|
||||
./test.sh -f tsim/sync/3Replica1VgElect.sim
|
||||
./test.sh -f tsim/sync/3Replica5VgElect.sim
|
||||
./test.sh -f tsim/sync/oneReplica1VgElect.sim
|
||||
./test.sh -f tsim/sync/oneReplica5VgElect.sim
|
||||
|
@ -423,18 +420,18 @@
|
|||
./test.sh -f tsim/tag/bool_binary.sim
|
||||
./test.sh -f tsim/tag/bool_int.sim
|
||||
./test.sh -f tsim/tag/bool.sim
|
||||
# ./test.sh -f tsim/tag/change.sim
|
||||
# ./test.sh -f tsim/tag/column.sim
|
||||
# ./test.sh -f tsim/tag/commit.sim
|
||||
# ./test.sh -f tsim/tag/create.sim
|
||||
# /test.sh -f tsim/tag/delete.sim
|
||||
# ./test.sh -f tsim/tag/double.sim
|
||||
# ./test.sh -f tsim/tag/filter.sim
|
||||
# TD-17661 ./test.sh -f tsim/tag/change.sim
|
||||
./test.sh -f tsim/tag/column.sim
|
||||
./test.sh -f tsim/tag/commit.sim
|
||||
# TD-17661 ./test.sh -f tsim/tag/create.sim
|
||||
# TD-17661 ./test.sh -f tsim/tag/delete.sim
|
||||
# TD-17661 ./test.sh -f tsim/tag/double.sim
|
||||
# TD-17661 ./test.sh -f tsim/tag/filter.sim
|
||||
# TD-17407 ./test.sh -f tsim/tag/float.sim
|
||||
./test.sh -f tsim/tag/int_binary.sim
|
||||
./test.sh -f tsim/tag/int_float.sim
|
||||
./test.sh -f tsim/tag/int.sim
|
||||
# ./test.sh -f tsim/tag/set.sim
|
||||
# TD-17661 ./test.sh -f tsim/tag/set.sim
|
||||
./test.sh -f tsim/tag/smallint.sim
|
||||
./test.sh -f tsim/tag/tinyint.sim
|
||||
|
||||
|
|
|
@ -36,28 +36,22 @@ sql select c1 *( 2 / 3 ), c1/c1 from $tb order by ts asc;
|
|||
if $rows != 10000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
#if $data01 != -nan then
|
||||
# print expect -nan, actual: $data01
|
||||
# return -1
|
||||
#endi
|
||||
|
||||
if $data10 != 0.666666667 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data11 != 1.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data90 != 6.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data91 != 1.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -66,65 +60,49 @@ sql select (c1 * 2) % 7.9, c1*1, c1*1*1, c1*c1, c1*c1*c1 from $tb order by ts de
|
|||
if $rows != 10000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 2.200000000 then
|
||||
print expect 2.200000000, actual:$data00
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 9.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != 9.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data03 != 81.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data04 != 729.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
if $data10 != 0.200000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data11 != 8.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data12 != 8.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data13 != 64.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data14 != 512.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data90 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data91 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data92 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data93 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data94 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -134,20 +112,16 @@ sql select c1 * c2 /4 from $tb where ts < 1537166000000 and ts > 1537156000000
|
|||
if $rows != 17 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 12.250000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data10 != 16.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data20 != 20.250000000 then
|
||||
print expect 20.250000000, actual:$data21
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data30 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -180,47 +154,36 @@ sql select c2-c1*1.1, c3/c2, c4*c3, c5%c4, (c6+c4)%22, c2-c2 from $tb
|
|||
if $rows != 10000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
#if $data01 != -nan then
|
||||
# return -1
|
||||
#endi
|
||||
|
||||
if $data02 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data03 != NULL then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data04 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data05 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data90 != -0.900000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data91 != 1.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data92 != 81.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data93 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data94 != 18.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -237,10 +200,8 @@ sql select c8+c7, c9+c9+c8+c7/c6 from $tb
|
|||
|
||||
# arithmetic expression in join [d.7]==================================================
|
||||
|
||||
|
||||
# arithmetic expression in union [d.8]=================================================
|
||||
|
||||
|
||||
# arithmetic expression in group by [d.9]==============================================
|
||||
# in group by tag, not support for normal table
|
||||
sql_error select c5*99 from $tb group by t1
|
||||
|
@ -248,17 +209,14 @@ sql_error select c5*99 from $tb group by t1
|
|||
# in group by column
|
||||
sql_error select c6-(c6+c3)*12 from $tb group by c3;
|
||||
|
||||
|
||||
# limit offset [d.10]==================================================================
|
||||
sql select c6 * c1 + 12 from $tb limit 12 offset 99;
|
||||
if $rows != 12 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 93.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data90 != 76.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -267,7 +225,6 @@ sql select c4 / 99.123 from $tb limit 10 offset 9999;
|
|||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 0.090796283 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -283,27 +240,21 @@ sql select c1, c2+c6, 12.9876545678, 1, 1.1 from $tb
|
|||
if $rows != 10000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != 12.987654568 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data03 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data04 != 1.100000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data10 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -313,27 +264,21 @@ sql select c1, c2+c6, 12.9876545678, 1, 1.1 from $tb where c1<2
|
|||
if $rows != 2000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != 12.987654568 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data03 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data10 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data20 != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -377,7 +322,6 @@ sql select first(c1) * ( 2 / 3 ) from $stb order by ts asc;
|
|||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -386,7 +330,6 @@ sql select first(c1) * (2/99) from $stb order by ts desc;
|
|||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -395,15 +338,12 @@ sql select (count(c1) * 2) % 7.9, (count(c1) * 2), ( count(1)*2) from $stb
|
|||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 1.800000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 100000.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != 200000.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -412,16 +352,13 @@ sql select spread( c1 )/44, spread(c1), 0.204545455 * 44 from $stb
|
|||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 0.204545455 then
|
||||
print expect 0.204545455, actual: $data00
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 9.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != 9.000000020 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -431,27 +368,21 @@ sql select min(c1) * max(c2) /4, sum(c1) * apercentile(c2, 20), apercentile(c4,
|
|||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 225000.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != 8.077777778 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data03 != NULL then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data04 != 0.444444444 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data05 != 450000.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -487,35 +418,29 @@ sql_error select top(c1, 99) - bottom(c1, 99) from $stb
|
|||
sql select c2-c1, c3/c2, c4*c3, c5%c4, c6+99%22 from $stb
|
||||
|
||||
# error case, ts/bool/binary/nchar not support arithmetic expression
|
||||
sql select first(c7)*12 from $stb
|
||||
sql select last(c8)/55 from $stb
|
||||
sql_error select last_row(c9) + last_row(c8) from $stb
|
||||
sql select first(c7)*12 from $stb
|
||||
sql select last(c8)/55 from $stb
|
||||
sql select last_row(c9) + last_row(c8) from $stb
|
||||
|
||||
# arithmetic expression in join [d.7]===============================================================
|
||||
|
||||
|
||||
# arithmetic expression in union [d.8]===============================================================
|
||||
|
||||
|
||||
# arithmetic expression in group by [d.9]===============================================================
|
||||
# in group by tag
|
||||
sql select avg(c4)*99 from $stb group by t1
|
||||
sql select avg(c4)*99, t1 from $stb group by t1 order by t1
|
||||
if $rows != 10 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 445.500000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data90 != 445.500000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data91 != 9 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -550,22 +475,19 @@ endi
|
|||
# return -1
|
||||
#endi
|
||||
#
|
||||
sql_error select first(c6) - last(c6) *12 / count(*) from $stb group by c3;
|
||||
sql select first(c6) - last(c6) *12 / count(*) from $stb group by c3;
|
||||
|
||||
sql select first(c6) - last(c6) *12 / count(*) from $stb group by c5;
|
||||
if $rows != 10 then
|
||||
sql select first(c6) - last(c6) *12 / count(*) from $stb group by c5 order by c5;
|
||||
if $rows != 11 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 0.000000000 then
|
||||
if $data10 != 0.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data10 != 0.997600000 then
|
||||
if $data20 != 0.997600000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data90 != 8.978400000 then
|
||||
if $data90 != 7.980800000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -574,7 +496,6 @@ sql select first(c6) - sum(c6) + 12 from $stb limit 12 offset 0;
|
|||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != -449988.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -604,10 +525,8 @@ sql_error select first(c1) from $stb fill(value, 20);
|
|||
|
||||
# constant column. [d.13]===============================================================
|
||||
|
||||
|
||||
# column value filter [d.14]===============================================================
|
||||
|
||||
|
||||
# tag filter. [d.15]===============================================================
|
||||
sql select sum(c2)+99 from $stb where t1=12;
|
||||
|
||||
|
@ -633,7 +552,6 @@ sql select avg(c2)*count(c2), sum(c3)-first(c3), last(c4)+9 from $stb interval(1
|
|||
if $rows != 10000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != @18-09-17 09:00:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
|
@ -645,11 +563,9 @@ sql_error select first(c7)- last(c1) from $tb interval(2y)
|
|||
|
||||
# first/last query [d.19]===============================================================
|
||||
|
||||
|
||||
# multiple retrieve [d.20]===============================================================
|
||||
sql select c2-c2 from $tb
|
||||
|
||||
|
||||
sql select first(c1)-last(c1), spread(c2), max(c3) - min(c3), avg(c4)*count(c4) from $tb
|
||||
|
||||
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
sleep 100
|
||||
sql connect
|
||||
sql create database if not exists db
|
||||
sql use db
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
sleep 100
|
||||
sql connect
|
||||
sql create database if not exists db
|
||||
sql use db
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
####
|
||||
sleep 100
|
||||
sql connect
|
||||
sql create database if not exists db
|
||||
sql use db
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
####
|
||||
sleep 100
|
||||
sql connect
|
||||
sql create database if not exists db
|
||||
sql use db
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
sleep 100
|
||||
sql connect
|
||||
sql create database if not exists db
|
||||
sql use db
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
sleep 100
|
||||
sql connect
|
||||
sql create database if not exists db
|
||||
sql use db
|
||||
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
sleep 100
|
||||
sql connect
|
||||
sql create database if not exists db
|
||||
sql use db
|
||||
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
sleep 100
|
||||
sql connect
|
||||
sql create database if not exists db
|
||||
sql use db
|
||||
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
sleep 100
|
||||
sql connect
|
||||
|
||||
$dbPrefix = first_db
|
||||
|
|
|
@ -3,25 +3,6 @@ system sh/deploy.sh -n dnode1 -i 1
|
|||
system sh/exec.sh -n dnode1 -s start
|
||||
sql connect
|
||||
|
||||
$loop_cnt = 0
|
||||
check_dnode_ready:
|
||||
$loop_cnt = $loop_cnt + 1
|
||||
sleep 200
|
||||
if $loop_cnt == 10 then
|
||||
print ====> dnode not ready!
|
||||
return -1
|
||||
endi
|
||||
sql show dnodes
|
||||
print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05
|
||||
if $data00 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data04 != ready then
|
||||
goto check_dnode_ready
|
||||
endi
|
||||
|
||||
sql connect
|
||||
|
||||
$dbPrefix = group_db
|
||||
$tbPrefix = group_tb
|
||||
$mtPrefix = group_mt
|
||||
|
@ -80,8 +61,6 @@ while $i < $tbNum
|
|||
$tstart = 1640966400000
|
||||
endw
|
||||
|
||||
sleep 100
|
||||
|
||||
$i1 = 1
|
||||
$i2 = 0
|
||||
|
||||
|
@ -752,12 +731,7 @@ sql insert into tm1 values('2020-2-1 1:1:1', 2, 10);
|
|||
sql insert into tm1 values('2020-2-1 1:1:2', 2, 20);
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
sleep 100
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 100
|
||||
|
||||
sql connect
|
||||
sleep 100
|
||||
sql use group_db0;
|
||||
|
||||
print =========================>TD-4894
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
sleep 100
|
||||
sql connect
|
||||
|
||||
$dbPrefix = intp_db
|
||||
|
|
|
@ -54,8 +54,6 @@ while $i < $tbNum
|
|||
$tstart = 100000
|
||||
endw
|
||||
|
||||
sleep 100
|
||||
|
||||
$tstart = 100000
|
||||
$mt = $mtPrefix . 1 . $i
|
||||
sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int, t2 binary(12), t3 int)
|
||||
|
@ -99,8 +97,6 @@ while $i < $tbNum
|
|||
$tstart = 100000
|
||||
endw
|
||||
|
||||
sleep 100
|
||||
|
||||
$i1 = 1
|
||||
$i2 = 0
|
||||
|
||||
|
|
|
@ -62,11 +62,8 @@ run tsim/parser/limit_stb.sim
|
|||
|
||||
print ================== restart server to commit data into disk
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
sleep 500
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
print ================== server restart completed
|
||||
sql connect
|
||||
sleep 100
|
||||
|
||||
run tsim/parser/limit_tb.sim
|
||||
run tsim/parser/limit_stb.sim
|
||||
|
|
|
@ -18,7 +18,7 @@ $stb = $stbPrefix . $i
|
|||
|
||||
sql drop database $db -x step1
|
||||
step1:
|
||||
sql create database $db cache 16
|
||||
sql create database $db
|
||||
print ====== create tables
|
||||
sql use $db
|
||||
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int)
|
||||
|
|
|
@ -370,7 +370,8 @@ sql select top(c1, 1) from $tb where ts >= $ts0 and ts <= $tsu limit 5 offset 1
|
|||
if $rows != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql select top(c1, 5) from $tb where ts >= $ts0 and ts <= $tsu limit 3 offset 1
|
||||
|
||||
sql select ts, top(c1, 5) from $tb where ts >= $ts0 and ts <= $tsu order by ts limit 3 offset 1
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -392,6 +393,7 @@ endi
|
|||
if $data21 != 9 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select top(c1, 5) from $tb where ts >= $ts0 and ts <= $tsu limit 3 offset 5
|
||||
if $rows != 0 then
|
||||
return -1
|
||||
|
@ -401,7 +403,8 @@ sql select bottom(c1, 1) from $tb where ts >= $ts0 and ts <= $tsu limit 5 offset
|
|||
if $rows != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql select bottom(c1, 5) from $tb where ts >= $ts0 and ts <= $tsu limit 3 offset 1
|
||||
|
||||
sql select ts, bottom(c1, 5) from $tb where ts >= $ts0 and ts <= $tsu order by ts limit 3 offset 1
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
|
|
@ -61,11 +61,10 @@ while $i < $halfNum
|
|||
endw
|
||||
print ====== tables created
|
||||
|
||||
#run tsim/parser/limit2_query.sim
|
||||
run tsim/parser/limit2_query.sim
|
||||
|
||||
print ================== restart server to commit data into disk
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
sleep 100
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
print ================== server restart completed
|
||||
|
||||
|
|
|
@ -27,8 +27,8 @@ print select count(*) from $stb where t1 > $val1 and t1 < $val2 group by t1, t2,
|
|||
sql select count(*), t1, t2, t3, t4, t5, t6 from $stb where t1 > $val1 and t1 < $val2 group by t1, t2, t3, t4, t5, t6 order by t1 asc limit 1 offset 0
|
||||
$val = $tbNum - 3
|
||||
|
||||
print $rows $val
|
||||
if $rows != $val then
|
||||
print $rows
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != $rowNum then
|
||||
|
@ -51,7 +51,7 @@ if $data05 != 2 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql select count(*) from $stb where t2 like '%' and t1 > 2 and t1 < 5 group by t3, t4 order by t3 desc limit 1 offset 0
|
||||
sql select count(*), t3, t4 from $stb where t2 like '%' and t1 > 2 and t1 < 5 group by t3, t4 order by t3 desc limit 2 offset 0
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -70,15 +70,17 @@ endi
|
|||
if $data12 != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select count(*) from $stb where t2 like '%' and t1 > 2 and t1 < 5 group by t3, t4 order by t3 desc limit 1 offset 1
|
||||
if $rows != 0 then
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
## TBASE-348
|
||||
sql_error select count(*) from $stb where t1 like 1
|
||||
|
||||
##### aggregation on tb + where + fill + limit offset
|
||||
sql select max(c1) from $tb where ts >= $ts0 and ts <= $tsu interval(5m) fill(value, -1, -2) limit 10 offset 1
|
||||
sql select _wstart, max(c1) from $tb where ts >= $ts0 and ts <= $tsu interval(5m) fill(value, -1, -2) limit 10 offset 1
|
||||
if $rows != 10 then
|
||||
return -1
|
||||
endi
|
||||
|
|
|
@ -358,8 +358,8 @@ endi
|
|||
print ========> TD-6017
|
||||
sql select * from (select ts, top(c1, 5) from $tb where ts >= $ts0 and ts <= $tsu order by ts desc limit 3 offset 1)
|
||||
|
||||
sql select top(c1, 5) from $tb where ts >= $ts0 and ts <= $tsu order by ts desc limit 3 offset 1
|
||||
print select top(c1, 5) from $tb where ts >= $ts0 and ts <= $tsu order by ts desc limit 3 offset 1
|
||||
sql select ts, top(c1, 5) from $tb where ts >= $ts0 and ts <= $tsu order by ts desc limit 3 offset 1
|
||||
print select ts, top(c1, 5) from $tb where ts >= $ts0 and ts <= $tsu order by ts desc limit 3 offset 1
|
||||
print $data00 $data01
|
||||
print $data10 $data11
|
||||
print $data20 $data21
|
||||
|
@ -386,7 +386,7 @@ if $data21 != 6 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql select top(c1, 5) from $tb where ts >= $ts0 and ts <= $tsu order by ts asc limit 3 offset 1
|
||||
sql select ts, top(c1, 5) from $tb where ts >= $ts0 and ts <= $tsu order by ts asc limit 3 offset 1
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -418,7 +418,7 @@ sql select bottom(c1, 1) from $tb where ts >= $ts0 and ts <= $tsu limit 5 offset
|
|||
if $rows != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql select bottom(c1, 5) from $tb where ts >= $ts0 and ts <= $tsu limit 3 offset 1
|
||||
sql select ts, bottom(c1, 5) from $tb where ts >= $ts0 and ts <= $tsu order by ts limit 3 offset 1
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -482,7 +482,7 @@ endi
|
|||
if $data41 != 4 then
|
||||
return -1
|
||||
endi
|
||||
sql select max(c1), max(c2), max(c3), max(c4), max(c5), max(c6) from $tb where ts >= $ts0 and ts <= $tsu interval(5m) limit 5 offset 1
|
||||
sql select _wstart, max(c1), max(c2), max(c3), max(c4), max(c5), max(c6) from $tb where ts >= $ts0 and ts <= $tsu interval(5m) limit 5 offset 1
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -518,7 +518,7 @@ if $data41 != 5 then
|
|||
endi
|
||||
|
||||
## TBASE-334
|
||||
sql select max(c1), max(c2), max(c3), max(c4), max(c5), max(c6) from $tb where ts >= $ts0 and ts <= $tsu interval(30m) limit 2 offset 1
|
||||
sql select _wstart, max(c1), max(c2), max(c3), max(c4), max(c5), max(c6) from $tb where ts >= $ts0 and ts <= $tsu interval(30m) limit 2 offset 1
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -634,7 +634,8 @@ sql select stddev(c1), stddev(c2), stddev(c3), stddev(c4), stddev(c5), stddev(c6
|
|||
if $rows != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql select stddev(c1), stddev(c2), stddev(c3), stddev(c4), stddev(c5), stddev(c6) from $tb where ts >= $ts0 and ts <= $tsu interval(30m) limit 5 offset 1
|
||||
|
||||
sql select _wstart, stddev(c1), stddev(c2), stddev(c3), stddev(c4), stddev(c5), stddev(c6) from $tb where ts >= $ts0 and ts <= $tsu interval(30m) limit 5 offset 1
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -670,6 +671,7 @@ endi
|
|||
if $data31 != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select count(c1), count(c2), count(c3), count(c4), count(c5), count(c6) from $tb where ts >= $ts0 and ts <= $tsu interval(27m) limit 5 offset 1
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
|
@ -707,7 +709,8 @@ sql select first(c1), first(c2), first(c3), first(c4), first(c5), first(c6) from
|
|||
if $rows != 0 then
|
||||
return -1
|
||||
endi
|
||||
sql select first(c1), first(c2), first(c3), first(c4), first(c5), first(c6) from $tb where ts >= $ts0 and ts <= $tsu interval(30m) limit 3 offset 1
|
||||
|
||||
sql select _wstart, first(c1), first(c2), first(c3), first(c4), first(c5), first(c6) from $tb where ts >= $ts0 and ts <= $tsu interval(30m) limit 3 offset 1
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
@ -721,7 +724,6 @@ if $data23 != 9.00000 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
|
||||
sql select last(c1), last(c2), last(c3), last(c4), last(c5), last(c6) from $tb where ts >= $ts0 and ts <= $tsu limit 5 offset 1
|
||||
if $rows != 0 then
|
||||
return -1
|
||||
|
|
|
@ -62,8 +62,6 @@ while $i < $half
|
|||
$tstart = 100000
|
||||
endw
|
||||
|
||||
sleep 100
|
||||
|
||||
$i1 = 1
|
||||
$i2 = 0
|
||||
|
||||
|
|
|
@ -28,7 +28,6 @@ run tsim/parser/single_row_in_tb_query.sim
|
|||
|
||||
print ================== restart server to commit data into disk
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
sleep 500
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
print ================== server restart completed
|
||||
|
||||
|
|
|
@ -52,11 +52,9 @@ run tsim/parser/slimit1_query.sim
|
|||
|
||||
print ================== restart server to commit data into disk
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
sleep 500
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
print ================== server restart completed
|
||||
sql connect
|
||||
sleep 100
|
||||
|
||||
run tsim/parser/slimit1_query.sim
|
||||
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
sleep 100
|
||||
sql connect
|
||||
|
||||
$dbPrefix = slm_alt_tg_db
|
||||
|
|
|
@ -1,10 +1,6 @@
|
|||
system sh/stop_dnodes.sh
|
||||
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/cfg.sh -n dnode1 -c walLevel -v 1
|
||||
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 2
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 100
|
||||
sql connect
|
||||
|
||||
$dbPrefix = slm_alt_tg_db
|
||||
|
@ -93,7 +89,6 @@ if $data02 != tb0 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sleep 500
|
||||
sql reset query cache
|
||||
sql select count(*), first(ts) from stb group by tg_added order by tg_added asc slimit 5 soffset 3
|
||||
if $rows != 5 then
|
||||
|
@ -171,11 +166,8 @@ endi
|
|||
|
||||
print ================== restart server to commit data into disk
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
sleep 500
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
print ================== server restart completed
|
||||
sql connect
|
||||
sleep 100
|
||||
|
||||
sql use $db
|
||||
### repeat above queries
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
sleep 100
|
||||
sql connect
|
||||
|
||||
$dbPrefix = slm_db
|
||||
|
|
|
@ -95,9 +95,6 @@ while $i < $tbNum
|
|||
$j = $j + 1
|
||||
endw
|
||||
|
||||
print sleep 1sec.
|
||||
sleep 100
|
||||
|
||||
$i = 1
|
||||
$tb = $tbPrefix . $i
|
||||
|
||||
|
|
|
@ -31,7 +31,7 @@ sql create table $tb using $mt tags( 0, '0' )
|
|||
|
||||
$i = 1
|
||||
$tb = $tbPrefix . $i
|
||||
sql create table $tb using $mt tags( 1, 1 )
|
||||
sql create table $tb using $mt tags( 1, '1' )
|
||||
|
||||
$i = 2
|
||||
$tb = $tbPrefix . $i
|
||||
|
@ -39,7 +39,7 @@ sql create table $tb using $mt tags( '2', '2' )
|
|||
|
||||
$i = 3
|
||||
$tb = $tbPrefix . $i
|
||||
sql create table $tb using $mt tags( '3', 3 )
|
||||
sql create table $tb using $mt tags( '3', '3' )
|
||||
|
||||
sql show tables
|
||||
if $rows != 4 then
|
||||
|
@ -54,7 +54,7 @@ sql insert into $tb values(now, 0, '0')
|
|||
|
||||
$i = 1
|
||||
$tb = $tbPrefix . $i
|
||||
sql insert into $tb values(now, 1, 1 )
|
||||
sql insert into $tb values(now, 1, '1' )
|
||||
|
||||
$i = 2
|
||||
$tb = $tbPrefix . $i
|
||||
|
@ -62,7 +62,7 @@ sql insert into $tb values(now, '2', '2')
|
|||
|
||||
$i = 3
|
||||
$tb = $tbPrefix . $i
|
||||
sql insert into $tb values(now, '3', 3)
|
||||
sql insert into $tb values(now, '3', '3')
|
||||
|
||||
print =============== step4
|
||||
sql select * from $mt where tgcol2 = '1'
|
||||
|
|
|
@ -249,8 +249,8 @@ sql alter table $mt add tag tgcol6 binary(10)
|
|||
|
||||
sql reset query cache
|
||||
sql alter table $tb set tag tgcol4=false
|
||||
sql alter table $tb set tag tgcol5=5
|
||||
sql alter table $tb set tag tgcol6=6
|
||||
sql alter table $tb set tag tgcol5='5'
|
||||
sql alter table $tb set tag tgcol6='6'
|
||||
sql reset query cache
|
||||
|
||||
sql select * from $mt where tgcol5 = '5'
|
||||
|
@ -321,7 +321,7 @@ if $data04 != 3 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql alter table $mt change tag tgcol1 tgcol4
|
||||
sql alter table $mt rename tag tgcol1 tgcol4
|
||||
sql alter table $mt drop tag tgcol2
|
||||
sql alter table $mt drop tag tgcol3
|
||||
sql alter table $mt add tag tgcol5 bigint
|
||||
|
@ -382,14 +382,14 @@ if $data04 != 3 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql alter table $mt change tag tgcol1 tgcol4
|
||||
sql alter table $mt rename tag tgcol1 tgcol4
|
||||
sql alter table $mt drop tag tgcol2
|
||||
sql alter table $mt drop tag tgcol3
|
||||
sql alter table $mt add tag tgcol5 binary(17)
|
||||
sql alter table $mt add tag tgcol6 bool
|
||||
sql reset query cache
|
||||
sql alter table $tb set tag tgcol4=4
|
||||
sql alter table $tb set tag tgcol5=5
|
||||
sql alter table $tb set tag tgcol5='5'
|
||||
sql alter table $tb set tag tgcol6=1
|
||||
sql reset query cache
|
||||
|
||||
|
@ -423,7 +423,7 @@ $i = 9
|
|||
$mt = $mtPrefix . $i
|
||||
$tb = $tbPrefix . $i
|
||||
sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol1 double, tgcol2 binary(10), tgcol3 binary(10))
|
||||
sql create table $tb using $mt tags( 1, 2, '3' )
|
||||
sql create table $tb using $mt tags( 1, '2', '3' )
|
||||
sql insert into $tb values(now, 1)
|
||||
sql select * from $mt where tgcol2 = '2'
|
||||
if $rows != 1 then
|
||||
|
@ -442,7 +442,7 @@ if $data04 != 3 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql alter table $mt change tag tgcol1 tgcol4
|
||||
sql alter table $mt rename tag tgcol1 tgcol4
|
||||
sql alter table $mt drop tag tgcol2
|
||||
sql alter table $mt drop tag tgcol3
|
||||
sql alter table $mt add tag tgcol5 bool
|
||||
|
@ -506,7 +506,7 @@ if $data05 != 4 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql alter table $mt change tag tgcol1 tgcol4 -x step103
|
||||
sql alter table $mt rename tag tgcol1 tgcol4 -x step103
|
||||
return -1
|
||||
step103:
|
||||
|
||||
|
@ -518,7 +518,7 @@ sql alter table $mt add tag tgcol4 binary(10)
|
|||
sql alter table $mt add tag tgcol5 bool
|
||||
|
||||
sql reset query cache
|
||||
sql alter table $tb set tag tgcol4=4
|
||||
sql alter table $tb set tag tgcol4='4'
|
||||
sql alter table $tb set tag tgcol5=false
|
||||
sql reset query cache
|
||||
|
||||
|
@ -580,7 +580,7 @@ if $data06 != 5 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql alter table $mt change tag tgcol1 tgcol4 -x step114
|
||||
sql alter table $mt rename tag tgcol1 tgcol4 -x step114
|
||||
return -1
|
||||
step114:
|
||||
|
||||
|
@ -596,9 +596,9 @@ sql alter table $mt add tag tgcol7 bigint
|
|||
sql alter table $mt add tag tgcol8 smallint
|
||||
|
||||
sql reset query cache
|
||||
sql alter table $tb set tag tgcol4=4
|
||||
sql alter table $tb set tag tgcol4='4'
|
||||
sql alter table $tb set tag tgcol5=5
|
||||
sql alter table $tb set tag tgcol6=6
|
||||
sql alter table $tb set tag tgcol6='6'
|
||||
sql alter table $tb set tag tgcol7=7
|
||||
sql alter table $tb set tag tgcol8=8
|
||||
sql reset query cache
|
||||
|
@ -685,11 +685,11 @@ sql alter table $mt add tag tgcol5 bigint
|
|||
|
||||
sql reset query cache
|
||||
sql alter table $tb set tag tgcol1=false
|
||||
sql alter table $tb set tag tgcol2=5
|
||||
sql alter table $tb set tag tgcol2='5'
|
||||
sql alter table $tb set tag tgcol3=4
|
||||
sql alter table $tb set tag tgcol4=3
|
||||
sql alter table $tb set tag tgcol4='3'
|
||||
sql alter table $tb set tag tgcol5=2
|
||||
sql alter table $tb set tag tgcol6=1
|
||||
sql alter table $tb set tag tgcol6='1'
|
||||
sql reset query cache
|
||||
|
||||
sql select * from $mt where tgcol4 = '3'
|
||||
|
@ -781,8 +781,8 @@ sql alter table $mt add tag tgcol4 int
|
|||
sql alter table $mt add tag tgcol6 bigint
|
||||
|
||||
sql reset query cache
|
||||
sql alter table $tb set tag tgcol1=7
|
||||
sql alter table $tb set tag tgcol2=8
|
||||
sql alter table $tb set tag tgcol1='7'
|
||||
sql alter table $tb set tag tgcol2='8'
|
||||
sql alter table $tb set tag tgcol3=9
|
||||
sql alter table $tb set tag tgcol4=10
|
||||
sql alter table $tb set tag tgcol5=11
|
||||
|
@ -817,9 +817,7 @@ if $data07 != 12 then
|
|||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
sleep 3000
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 3000
|
||||
|
||||
print =============== step1
|
||||
$i = 0
|
||||
|
|
|
@ -358,8 +358,8 @@ class TDTestCase:
|
|||
tdSql.error("alter table %s.%s modify column c2 binary(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s modify tag t2 binary(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdSql.error("alter table %s.%s set tag t1=20"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.error("alter table %s.%s set tag t2='20'"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.query("alter table %s.%s set tag t1=20"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.query("alter table %s.%s set tag t2='20'"%(parameterDict['dbName'], ctbName))
|
||||
|
||||
tdSql.error("alter table %s.%s rename column c1 c1new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s rename column c2 c2new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
@ -374,13 +374,13 @@ class TDTestCase:
|
|||
tdSql.query("alter table %s.%s set tag t4='20'"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.query("alter table %s.%s set tag t5='20'"%(parameterDict['dbName'], ctbName))
|
||||
|
||||
tdSql.query("alter table %s.%s rename column c3 c3new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s rename column c4 c4new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s rename column c3 c3new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s rename column c4 c4new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s rename tag t3 t3new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s rename tag t4 t4new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdSql.query("alter table %s.%s drop column c3new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop column c4new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop column c3"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop column c4"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop tag t3new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop tag t4new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
|
@ -508,10 +508,10 @@ class TDTestCase:
|
|||
tdSql.error("alter table %s.%s modify tag t2 binary(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s modify tag t4 binary(40)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdSql.error("alter table %s.%s set tag t1=20"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.error("alter table %s.%s set tag t2='20'"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.error("alter table %s.%s set tag t3=20"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.error("alter table %s.%s set tag t4='20'"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.query("alter table %s.%s set tag t1=20"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.query("alter table %s.%s set tag t2='20'"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.query("alter table %s.%s set tag t3=20"%(parameterDict['dbName'], ctbName))
|
||||
tdSql.query("alter table %s.%s set tag t4='20'"%(parameterDict['dbName'], ctbName))
|
||||
|
||||
tdSql.error("alter table %s.%s rename column c1 c1new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s rename column c2 c2new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
@ -528,10 +528,10 @@ class TDTestCase:
|
|||
|
||||
tdSql.query("alter table %s.%s set tag t5='50'"%(parameterDict['dbName'], ctbName))
|
||||
|
||||
tdSql.query("alter table %s.%s rename column c5 c5new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.error("alter table %s.%s rename column c5 c5new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s rename tag t5 t5new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdSql.query("alter table %s.%s drop column c5new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop column c5"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.query("alter table %s.%s drop tag t5new"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
||||
tdSql.query("alter table %s.%s add column c5 float"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||
|
@ -830,9 +830,9 @@ class TDTestCase:
|
|||
cfgPath = buildPath + "/../sim/psim/cfg"
|
||||
tdLog.info("cfgPath: %s" % cfgPath)
|
||||
|
||||
# self.tmqCase1(cfgPath, buildPath)
|
||||
# self.tmqCase2(cfgPath, buildPath)
|
||||
# self.tmqCase3(cfgPath, buildPath)
|
||||
self.tmqCase1(cfgPath, buildPath)
|
||||
self.tmqCase2(cfgPath, buildPath)
|
||||
self.tmqCase3(cfgPath, buildPath)
|
||||
self.tmqCase4(cfgPath, buildPath)
|
||||
self.tmqCase5(cfgPath, buildPath)
|
||||
|
||||
|
|
|
@ -129,9 +129,12 @@ class TMQCom:
|
|||
def stopTmqSimProcess(self, processorName):
|
||||
psCmd = "ps -ef|grep -w %s|grep -v grep | awk '{print $2}'"%(processorName)
|
||||
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
|
||||
onlyKillOnceWindows = 0
|
||||
while(processID):
|
||||
killCmd = "kill -INT %s > /dev/null 2>&1" % processID
|
||||
os.system(killCmd)
|
||||
if not platform.system().lower() == 'windows' or (onlyKillOnceWindows == 0 and platform.system().lower() == 'windows'):
|
||||
killCmd = "kill -INT %s > /dev/null 2>&1" % processID
|
||||
os.system(killCmd)
|
||||
onlyKillOnceWindows = 1
|
||||
time.sleep(0.2)
|
||||
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
|
||||
tdLog.debug("%s is stopped by kill -INT" % (processorName))
|
||||
|
|
|
@ -186,7 +186,7 @@ python3 ./test.py -f 7-tmq/subscribeStb3.py
|
|||
python3 ./test.py -f 7-tmq/subscribeStb4.py
|
||||
python3 ./test.py -f 7-tmq/db.py
|
||||
python3 ./test.py -f 7-tmq/tmqError.py
|
||||
python3 ./test.py -f 7-tmq/schema.py
|
||||
#python3 ./test.py -f 7-tmq/schema.py
|
||||
python3 ./test.py -f 7-tmq/stbFilter.py
|
||||
python3 ./test.py -f 7-tmq/tmqCheckData.py
|
||||
python3 ./test.py -f 7-tmq/tmqCheckData1.py
|
||||
|
|
|
@ -28,6 +28,7 @@ target_link_libraries(
|
|||
sdbDump
|
||||
PUBLIC dnode
|
||||
PUBLIC mnode
|
||||
PUBLIC stream
|
||||
PUBLIC sdb
|
||||
PUBLIC os
|
||||
)
|
||||
|
@ -37,4 +38,4 @@ target_include_directories(
|
|||
PRIVATE "${TD_SOURCE_DIR}/source/dnode/mnode/impl/inc"
|
||||
PRIVATE "${TD_SOURCE_DIR}/source/dnode/mnode/sdb/inc"
|
||||
PRIVATE "${TD_SOURCE_DIR}/source/dnode/mgmt/node_mgmt/inc"
|
||||
)
|
||||
)
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit 69b558ccbfe54a4407fe23eeae2e67c540f59e55
|
||||
Subproject commit 0b8a3373bb7548f8106d13e7d3b0a988d3c4d48a
|
|
@ -1 +1 @@
|
|||
Subproject commit d8f19ede56f1f489c5d2ac8f963cced01e68ecef
|
||||
Subproject commit df8678f070e3f707faf59baebec90065f6e1268b
|
Loading…
Reference in New Issue