Merge branch '3.0' into feature/TD-11463-3.0
This commit is contained in:
commit
3494960b1a
|
@ -215,7 +215,7 @@ typedef struct {
|
|||
// Submit message for one table
|
||||
typedef struct SSubmitBlk {
|
||||
int64_t uid; // table unique id
|
||||
int32_t tid; // table id
|
||||
int64_t suid; // stable id
|
||||
int32_t padding; // TODO just for padding here
|
||||
int32_t sversion; // data schema version
|
||||
int32_t dataLen; // data part length, not including the SSubmitBlk head
|
||||
|
|
|
@ -159,6 +159,7 @@ typedef struct {
|
|||
(IS_SIGNED_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP))
|
||||
#define IS_CONVERT_AS_UNSIGNED(_t) (IS_UNSIGNED_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL))
|
||||
|
||||
// TODO remove this function
|
||||
static FORCE_INLINE bool isNull(const void *val, int32_t type) {
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
|
|
|
@ -453,6 +453,10 @@ enum {
|
|||
SND_WORKER_TYPE__UNIQUE,
|
||||
};
|
||||
|
||||
#define MND_VGID -1
|
||||
#define QND_VGID 1
|
||||
#define VND_VGID 0
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -84,6 +84,8 @@ typedef uint16_t VarDataLenT; // maxVarDataLen: 32767
|
|||
#define varDataLen(v) ((VarDataLenT *)(v))[0]
|
||||
#define varDataVal(v) ((char *)(v) + VARSTR_HEADER_SIZE)
|
||||
|
||||
#define NCHAR_WIDTH_TO_BYTES(n) ((n) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE)
|
||||
|
||||
typedef int32_t VarDataOffsetT;
|
||||
|
||||
typedef struct tstr {
|
||||
|
|
|
@ -155,6 +155,7 @@ typedef struct SReqResultInfo {
|
|||
TAOS_FIELD* fields;
|
||||
uint32_t numOfCols;
|
||||
int32_t* length;
|
||||
char** convertBuf;
|
||||
TAOS_ROW row;
|
||||
SResultColumn* pCol;
|
||||
uint32_t numOfRows;
|
||||
|
|
|
@ -169,6 +169,13 @@ static void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
|
|||
tfree(pResInfo->row);
|
||||
tfree(pResInfo->pCol);
|
||||
tfree(pResInfo->fields);
|
||||
|
||||
if (pResInfo->convertBuf != NULL) {
|
||||
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
|
||||
tfree(pResInfo->convertBuf[i]);
|
||||
}
|
||||
tfree(pResInfo->convertBuf);
|
||||
}
|
||||
}
|
||||
|
||||
static void doDestroyRequest(void *p) {
|
||||
|
|
|
@ -634,18 +634,30 @@ _return:
|
|||
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
|
||||
SResultColumn* pCol = &pResultInfo->pCol[i];
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
|
||||
int32_t type = pResultInfo->fields[i].type;
|
||||
int32_t bytes = pResultInfo->fields[i].bytes;
|
||||
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
if (pCol->offset[pResultInfo->current] != -1) {
|
||||
char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;
|
||||
|
||||
pResultInfo->length[i] = varDataLen(pStart);
|
||||
pResultInfo->row[i] = varDataVal(pStart);
|
||||
|
||||
if (type == TSDB_DATA_TYPE_NCHAR) {
|
||||
int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(pResultInfo->convertBuf[i]));
|
||||
ASSERT(len <= bytes);
|
||||
|
||||
pResultInfo->row[i] = varDataVal(pResultInfo->convertBuf[i]);
|
||||
varDataSetLen(pResultInfo->convertBuf[i], len);
|
||||
pResultInfo->length[i] = len;
|
||||
}
|
||||
} else {
|
||||
pResultInfo->row[i] = NULL;
|
||||
}
|
||||
} else {
|
||||
if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) {
|
||||
pResultInfo->row[i] = pResultInfo->pCol[i].pData + pResultInfo->fields[i].bytes * pResultInfo->current;
|
||||
pResultInfo->row[i] = pResultInfo->pCol[i].pData + bytes * pResultInfo->current;
|
||||
} else {
|
||||
pResultInfo->row[i] = NULL;
|
||||
}
|
||||
|
@ -661,13 +673,20 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
|
|||
pResInfo->row = calloc(pResInfo->numOfCols, POINTER_BYTES);
|
||||
pResInfo->pCol = calloc(pResInfo->numOfCols, sizeof(SResultColumn));
|
||||
pResInfo->length = calloc(pResInfo->numOfCols, sizeof(int32_t));
|
||||
pResInfo->convertBuf = calloc(pResInfo->numOfCols, POINTER_BYTES);
|
||||
|
||||
if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < pResInfo->numOfCols; ++i) {
|
||||
if(pResInfo->fields[i].type == TSDB_DATA_TYPE_NCHAR) {
|
||||
pResInfo->convertBuf[i] = calloc(1, NCHAR_WIDTH_TO_BYTES(pResInfo->fields[i].bytes));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows) {
|
||||
|
|
|
@ -52,7 +52,7 @@ TEST(testCase, driverInit_Test) {
|
|||
// taosInitGlobalCfg();
|
||||
// taos_init();
|
||||
}
|
||||
|
||||
#if 0
|
||||
TEST(testCase, connect_Test) {
|
||||
// taos_options(TSDB_OPTION_CONFIGDIR, "/home/ubuntu/first/cfg");
|
||||
|
||||
|
@ -652,6 +652,7 @@ TEST(testCase, projection_query_stables) {
|
|||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
#endif
|
||||
|
||||
TEST(testCase, agg_query_tables) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
|
@ -660,7 +661,7 @@ TEST(testCase, agg_query_tables) {
|
|||
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "select count(*), sum(k),min(k),max(k) from tu");
|
||||
pRes = taos_query(pConn, "select k from tm0");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
|
||||
taos_free_result(pRes);
|
||||
|
|
|
@ -458,16 +458,21 @@ int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision) {
|
|||
if (duration == 0) {
|
||||
return t;
|
||||
}
|
||||
if (unit == 'y') {
|
||||
duration *= 12;
|
||||
} else if (unit != 'n') {
|
||||
|
||||
if (unit != 'n' && unit != 'y') {
|
||||
return t + duration;
|
||||
}
|
||||
|
||||
// The following code handles the y/n time duration
|
||||
int64_t numOfMonth = duration;
|
||||
if (unit == 'y') {
|
||||
numOfMonth *= 12;
|
||||
}
|
||||
|
||||
struct tm tm;
|
||||
time_t tt = (time_t)(t / TSDB_TICK_PER_SECOND(precision));
|
||||
taosLocalTime(&tt, &tm);
|
||||
int32_t mon = tm.tm_year * 12 + tm.tm_mon + (int32_t)duration;
|
||||
int32_t mon = tm.tm_year * 12 + tm.tm_mon + (int32_t)numOfMonth;
|
||||
tm.tm_year = mon / 12;
|
||||
tm.tm_mon = mon % 12;
|
||||
|
||||
|
@ -574,12 +579,23 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
|
|||
}
|
||||
}
|
||||
|
||||
ASSERT(pInterval->offset >= 0);
|
||||
|
||||
if (pInterval->offset > 0) {
|
||||
start = taosTimeAdd(start, pInterval->offset, pInterval->offsetUnit, precision);
|
||||
if (start > t) {
|
||||
start = taosTimeAdd(start, -pInterval->interval, pInterval->intervalUnit, precision);
|
||||
} else {
|
||||
// try to move current window to the left-hande-side, due to the offset effect.
|
||||
int64_t end = start + pInterval->interval - 1;
|
||||
ASSERT(end >= t);
|
||||
end = taosTimeAdd(end, -pInterval->sliding, pInterval->slidingUnit, precision);
|
||||
if (end >= t) {
|
||||
start = taosTimeAdd(start, -pInterval->sliding, pInterval->slidingUnit, precision);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return start;
|
||||
}
|
||||
|
||||
|
|
|
@ -75,10 +75,8 @@ typedef int32_t (*DropNodeFp)(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
|
|||
typedef int32_t (*RequireNodeFp)(SMgmtWrapper *pWrapper, bool *required);
|
||||
|
||||
typedef struct SMsgHandle {
|
||||
int32_t vgId;
|
||||
NodeMsgFp vgIdMsgFp;
|
||||
SMgmtWrapper *pVgIdWrapper; // Handle the case where the same message type is distributed to qnode or vnode
|
||||
NodeMsgFp msgFp;
|
||||
SMgmtWrapper *pQndWrapper;
|
||||
SMgmtWrapper *pMndWrapper;
|
||||
SMgmtWrapper *pWrapper;
|
||||
} SMsgHandle;
|
||||
|
||||
|
|
|
@ -117,6 +117,7 @@ SDnode *dndCreate(const SDnodeOpt *pOption) {
|
|||
_OVER:
|
||||
if (code != 0 && pDnode) {
|
||||
dndClearMemory(pDnode);
|
||||
pDnode = NULL;
|
||||
dError("failed to create dnode object since %s", terrstr());
|
||||
} else {
|
||||
dInfo("dnode object is created, data:%p", pDnode);
|
||||
|
|
|
@ -20,13 +20,15 @@
|
|||
#define INTERNAL_CKEY "_key"
|
||||
#define INTERNAL_SECRET "_pwd"
|
||||
|
||||
static inline void dndProcessQVnodeRpcMsg(SMsgHandle *pHandle, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
static inline void dndProcessQMVnodeRpcMsg(SMsgHandle *pHandle, SRpcMsg *pMsg, SEpSet *pEpSet) {
|
||||
SMsgHead *pHead = pMsg->pCont;
|
||||
int32_t vgId = htonl(pHead->vgId);
|
||||
|
||||
SMgmtWrapper *pWrapper = pHandle->pWrapper;
|
||||
if (vgId == pHandle->vgId && pHandle->pVgIdWrapper != NULL) {
|
||||
pWrapper = pHandle->pVgIdWrapper;
|
||||
if (vgId == QND_VGID) {
|
||||
pWrapper = pHandle->pQndWrapper;
|
||||
} else if (vgId == MND_VGID) {
|
||||
pWrapper = pHandle->pMndWrapper;
|
||||
}
|
||||
|
||||
dTrace("msg:%s will be processed by %s, handle:%p app:%p vgId:%d", TMSG_INFO(pMsg->msgType), pWrapper->name,
|
||||
|
@ -46,13 +48,13 @@ static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
|
|||
}
|
||||
|
||||
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
|
||||
if (pHandle->msgFp != NULL) {
|
||||
if (pHandle->vgId == 0) {
|
||||
if (pHandle->pWrapper != NULL) {
|
||||
if (pHandle->pMndWrapper == NULL && pHandle->pQndWrapper == NULL) {
|
||||
dTrace("rsp:%s will be processed by %s, handle:%p app:%p code:0x%04x:%s", TMSG_INFO(msgType),
|
||||
pHandle->pWrapper->name, pRsp->handle, pRsp->ahandle, pRsp->code & 0XFFFF, tstrerror(pRsp->code));
|
||||
dndProcessRpcMsg(pHandle->pWrapper, pRsp, pEpSet);
|
||||
} else {
|
||||
dndProcessQVnodeRpcMsg(pHandle, pRsp, pEpSet);
|
||||
dndProcessQMVnodeRpcMsg(pHandle, pRsp, pEpSet);
|
||||
}
|
||||
} else {
|
||||
dError("rsp:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pRsp->handle, pRsp->ahandle);
|
||||
|
@ -126,13 +128,13 @@ static void dndProcessRequest(void *param, SRpcMsg *pReq, SEpSet *pEpSet) {
|
|||
}
|
||||
|
||||
SMsgHandle *pHandle = &pMgmt->msgHandles[TMSG_INDEX(msgType)];
|
||||
if (pHandle->msgFp != NULL) {
|
||||
if (pHandle->vgId == 0) {
|
||||
if (pHandle->pWrapper != NULL) {
|
||||
if (pHandle->pMndWrapper == NULL && pHandle->pQndWrapper == NULL) {
|
||||
dTrace("req:%s will be processed by %s, handle:%p app:%p", TMSG_INFO(msgType), pHandle->pWrapper->name,
|
||||
pReq->handle, pReq->ahandle);
|
||||
dndProcessRpcMsg(pHandle->pWrapper, pReq, pEpSet);
|
||||
} else {
|
||||
dndProcessQVnodeRpcMsg(pHandle, pReq, pEpSet);
|
||||
dndProcessQMVnodeRpcMsg(pHandle, pReq, pEpSet);
|
||||
}
|
||||
} else {
|
||||
dError("req:%s not processed since no handle, handle:%p app:%p", TMSG_INFO(msgType), pReq->handle, pReq->ahandle);
|
||||
|
@ -269,21 +271,27 @@ int32_t dndInitMsgHandle(SDnode *pDnode) {
|
|||
int32_t vgId = pWrapper->msgVgIds[msgIndex];
|
||||
if (msgFp == NULL) continue;
|
||||
|
||||
dTrace("msg:%s will be processed by %s, vgId:%d", tMsgInfo[msgIndex], pWrapper->name, vgId);
|
||||
|
||||
SMsgHandle *pHandle = &pMgmt->msgHandles[msgIndex];
|
||||
if (pHandle->msgFp != NULL && pHandle->vgId == vgId) {
|
||||
dError("msg:%s has multiple process nodes, prev node:%s:%d, curr node:%s:%d", tMsgInfo[msgIndex],
|
||||
pHandle->pWrapper->name, pHandle->pWrapper->msgVgIds[msgIndex], pWrapper->name, vgId);
|
||||
return -1;
|
||||
} else {
|
||||
dTrace("msg:%s will be processed by %s, vgId:%d", tMsgInfo[msgIndex], pWrapper->name, vgId);
|
||||
if (vgId == 0) {
|
||||
pHandle->msgFp = msgFp;
|
||||
pHandle->pWrapper = pWrapper;
|
||||
} else {
|
||||
pHandle->vgId = vgId;
|
||||
pHandle->vgIdMsgFp = msgFp;
|
||||
pHandle->pVgIdWrapper = pWrapper;
|
||||
if (vgId == QND_VGID) {
|
||||
if (pHandle->pQndWrapper != NULL) {
|
||||
dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]);
|
||||
return -1;
|
||||
}
|
||||
pHandle->pQndWrapper = pWrapper;
|
||||
} else if (vgId == MND_VGID) {
|
||||
if (pHandle->pMndWrapper != NULL) {
|
||||
dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]);
|
||||
return -1;
|
||||
}
|
||||
pHandle->pMndWrapper = pWrapper;
|
||||
} else {
|
||||
if (pHandle->pWrapper != NULL) {
|
||||
dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]);
|
||||
return -1;
|
||||
}
|
||||
pHandle->pWrapper = pWrapper;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -114,19 +114,19 @@ int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
|
||||
void dmInitMsgHandles(SMgmtWrapper *pWrapper) {
|
||||
// Requests handled by DNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_NETWORK_TEST, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID);
|
||||
|
||||
// Requests handled by MNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, (NodeMsgFp)dmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS_RSP, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT_RSP, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH_RSP, (NodeMsgFp)dmProcessMgmtMsg, VND_VGID);
|
||||
}
|
||||
|
|
|
@ -75,84 +75,90 @@ int32_t mmProcessAlterReq(SMnodeMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
|
||||
void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
|
||||
// Requests handled by DNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_MNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_QNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_SNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_BNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CONFIG_DNODE_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
|
||||
// Requests handled by MNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CONNECT, (NodeMsgFp)mmProcessReadMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, (NodeMsgFp)mmProcessReadMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_USE_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SMA, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SMA, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, (NodeMsgFp)mmProcessReadMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, (NodeMsgFp)mmProcessReadMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, (NodeMsgFp)mmProcessReadMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, (NodeMsgFp)mmProcessReadMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, (NodeMsgFp)mmProcessReadMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, (NodeMsgFp)mmProcessReadMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH, (NodeMsgFp)mmProcessReadMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, (NodeMsgFp)mmProcessReadMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CONNECT, (NodeMsgFp)mmProcessReadMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_ACCT, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_ACCT, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_ACCT, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_USER, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_USER, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_USER, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_GET_USER_AUTH, (NodeMsgFp)mmProcessReadMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CONFIG_DNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_MNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_MNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_QNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_QNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_BNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_BNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_DB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_DB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_USE_DB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_DB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_SYNC_DB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_COMPACT_DB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_FUNC, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_RETRIEVE_FUNC, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_FUNC, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_STB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_STB, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_SMA, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_SMA, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_TABLE_META, (NodeMsgFp)mmProcessReadMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_VGROUP_LIST, (NodeMsgFp)mmProcessReadMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_QUERY, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_CONN, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_HEARTBEAT, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW, (NodeMsgFp)mmProcessReadMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_SHOW_RETRIEVE, (NodeMsgFp)mmProcessReadMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_SYSTABLE_RETRIEVE, (NodeMsgFp)mmProcessReadMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_STATUS, (NodeMsgFp)mmProcessReadMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_KILL_TRANS, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_GRANT, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_AUTH, (NodeMsgFp)mmProcessReadMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_MNODE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_TOPIC, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_ALTER_TOPIC, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_DROP_TOPIC, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, (NodeMsgFp)mmProcessReadMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
|
||||
// Requests handled by VNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA_RSP, (NodeMsgFp)mmProcessWriteMsg, VND_VGID);
|
||||
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)mmProcessReadMsg, MND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)mmProcessReadMsg, MND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)mmProcessReadMsg, MND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)mmProcessReadMsg, MND_VGID);
|
||||
|
||||
}
|
||||
|
|
|
@ -56,14 +56,14 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
|||
|
||||
void qmInitMsgHandles(SMgmtWrapper *pWrapper) {
|
||||
// Requests handled by VNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)qmProcessQueryMsg, 1);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)qmProcessQueryMsg, 1);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)qmProcessFetchMsg, 1);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)qmProcessFetchMsg, 1);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)qmProcessQueryMsg, QND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)qmProcessQueryMsg, QND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)qmProcessFetchMsg, QND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)qmProcessFetchMsg, QND_VGID);
|
||||
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)qmProcessFetchMsg, 1);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)qmProcessFetchMsg, 1);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)qmProcessFetchMsg, 1);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)qmProcessFetchMsg, 1);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)qmProcessFetchMsg, 1);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)qmProcessFetchMsg, QND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)qmProcessFetchMsg, QND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)qmProcessFetchMsg, QND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)qmProcessFetchMsg, QND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)qmProcessFetchMsg, QND_VGID);
|
||||
}
|
||||
|
|
|
@ -56,6 +56,6 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
|
|||
|
||||
void smInitMsgHandles(SMgmtWrapper *pWrapper) {
|
||||
// Requests handled by SNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, (NodeMsgFp)smProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, (NodeMsgFp)smProcessExecMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_DEPLOY, (NodeMsgFp)smProcessMgmtMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_SND_TASK_EXEC, (NodeMsgFp)smProcessExecMsg, VND_VGID);
|
||||
}
|
||||
|
|
|
@ -244,47 +244,47 @@ int32_t vmProcessCompactVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
|
|||
|
||||
void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
|
||||
// Requests handled by VNODE
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)vmProcessQueryMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, (NodeMsgFp)vmProcessQueryMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_SUBMIT, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY, (NodeMsgFp)vmProcessQueryMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)vmProcessQueryMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH_RSP, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_UPDATE_TAG_VAL, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TABLE_META, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TABLES_META, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONSUME, (NodeMsgFp)vmProcessQueryMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_QUERY, (NodeMsgFp)vmProcessQueryMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_CONNECT, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_DISCONNECT, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_RES_READY, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASKS_STATUS, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_TASK, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_STB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_STB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_STB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_ALTER_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TABLE, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_SHOW_TABLES_FETCH, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
|
||||
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, (NodeMsgFp)vmProcessMgmtMsg, 0);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_CREATE_VNODE, (NodeMsgFp)vmProcessMgmtMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_ALTER_VNODE, (NodeMsgFp)vmProcessMgmtMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_DROP_VNODE, (NodeMsgFp)vmProcessMgmtMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_SYNC_VNODE, (NodeMsgFp)vmProcessMgmtMsg, VND_VGID);
|
||||
dndSetMsgHandle(pWrapper, TDMT_DND_COMPACT_VNODE, (NodeMsgFp)vmProcessMgmtMsg, VND_VGID);
|
||||
}
|
||||
|
|
|
@ -44,7 +44,7 @@ int32_t tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitReq* pMsg, int64_t
|
|||
if (pReadHandle->pBlock == NULL) break;
|
||||
|
||||
pReadHandle->pBlock->uid = htobe64(pReadHandle->pBlock->uid);
|
||||
pReadHandle->pBlock->tid = htonl(pReadHandle->pBlock->tid);
|
||||
pReadHandle->pBlock->suid = htobe64(pReadHandle->pBlock->suid);
|
||||
pReadHandle->pBlock->sversion = htonl(pReadHandle->pBlock->sversion);
|
||||
pReadHandle->pBlock->dataLen = htonl(pReadHandle->pBlock->dataLen);
|
||||
pReadHandle->pBlock->schemaLen = htonl(pReadHandle->pBlock->schemaLen);
|
||||
|
|
|
@ -248,7 +248,7 @@ static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
|
|||
if (pBlock == NULL) break;
|
||||
|
||||
pBlock->uid = htobe64(pBlock->uid);
|
||||
pBlock->tid = htonl(pBlock->tid);
|
||||
pBlock->suid = htobe64(pBlock->suid);
|
||||
pBlock->sversion = htonl(pBlock->sversion);
|
||||
pBlock->dataLen = htonl(pBlock->dataLen);
|
||||
pBlock->schemaLen = htonl(pBlock->schemaLen);
|
||||
|
|
|
@ -243,6 +243,8 @@ typedef struct STaskAttr {
|
|||
|
||||
struct SOperatorInfo;
|
||||
|
||||
typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char **result, int32_t *length);
|
||||
typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char *result, int32_t length);
|
||||
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* param);
|
||||
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* param, bool* newgroup);
|
||||
typedef void (*__optr_close_fn_t)(void* param, int32_t num);
|
||||
|
@ -332,6 +334,8 @@ typedef struct SOperatorInfo {
|
|||
__optr_fn_t cleanupFn;
|
||||
__optr_close_fn_t closeFn;
|
||||
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
|
||||
__optr_encode_fn_t encodeResultRow; //
|
||||
__optr_decode_fn_t decodeResultRow;
|
||||
} SOperatorInfo;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -6391,6 +6391,111 @@ static SSDataBlock* getAggregateResult(SOperatorInfo *pOperator, bool* newgroup)
|
|||
return (blockDataGetNumOfRows(pInfo->pRes) != 0)? pInfo->pRes:NULL;
|
||||
}
|
||||
|
||||
static void aggEncodeResultRow(SOperatorInfo* pOperator, char **result, int32_t *length) {
|
||||
SAggOperatorInfo *pAggInfo = pOperator->info;
|
||||
SAggSupporter *pSup = &pAggInfo->aggSup;
|
||||
|
||||
int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
|
||||
size_t keyLen = POINTER_BYTES; // estimate the key length
|
||||
int32_t totalSize = sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);
|
||||
*result = calloc(1, totalSize);
|
||||
if(*result == NULL){
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return;
|
||||
}
|
||||
*(int32_t*)(*result) = size;
|
||||
int32_t offset = sizeof(int32_t);
|
||||
void *pIter = taosHashIterate(pSup->pResultRowHashTable, NULL);
|
||||
while (pIter) {
|
||||
void *key = taosHashGetKey(pIter, &keyLen);
|
||||
SResultRow **p1 = (SResultRow **)pIter;
|
||||
|
||||
// recalculate the result size
|
||||
int32_t realTotalSize = offset + sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize;
|
||||
if (realTotalSize > totalSize){
|
||||
char *tmp = realloc(*result, realTotalSize);
|
||||
if (tmp == NULL){
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
free(*result);
|
||||
*result = NULL;
|
||||
return;
|
||||
}else{
|
||||
*result = tmp;
|
||||
}
|
||||
}
|
||||
// save key
|
||||
*(int32_t*)(*result + offset) = keyLen;
|
||||
offset += sizeof(int32_t);
|
||||
memcpy(*result + offset, key, keyLen);
|
||||
offset += keyLen;
|
||||
|
||||
// save value
|
||||
*(int32_t*)(*result + offset) = pSup->resultRowSize;
|
||||
offset += sizeof(int32_t);
|
||||
memcpy(*result + offset, *p1, pSup->resultRowSize);
|
||||
offset += pSup->resultRowSize;
|
||||
|
||||
pIter = taosHashIterate(pSup->pResultRowHashTable, pIter);
|
||||
}
|
||||
|
||||
if(length) {
|
||||
*length = offset;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
static bool aggDecodeResultRow(SOperatorInfo* pOperator, char *result, int32_t length) {
|
||||
if (!result || length <= 0){
|
||||
return false;
|
||||
}
|
||||
|
||||
SAggOperatorInfo *pAggInfo = pOperator->info;
|
||||
SAggSupporter *pSup = &pAggInfo->aggSup;
|
||||
SOptrBasicInfo *pInfo = &pAggInfo->binfo;
|
||||
|
||||
// int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
|
||||
int32_t count = *(int32_t*)(result);
|
||||
|
||||
int32_t offset = sizeof(int32_t);
|
||||
while(count-- > 0 && length > offset){
|
||||
int32_t keyLen = *(int32_t*)(result + offset);
|
||||
offset += sizeof(int32_t);
|
||||
|
||||
uint64_t tableGroupId = *(uint64_t *)(result + offset);
|
||||
SResultRow *resultRow = getNewResultRow_rv(pSup->pResultBuf, tableGroupId, pSup->resultRowSize);
|
||||
if (!resultRow){
|
||||
terrno = TSDB_CODE_TSC_INVALID_INPUT;
|
||||
return false;
|
||||
}
|
||||
// add a new result set for a new group
|
||||
taosHashPut(pSup->pResultRowHashTable, result + offset, keyLen, &resultRow, POINTER_BYTES);
|
||||
|
||||
offset += keyLen;
|
||||
int32_t valueLen = *(int32_t*)(result + offset);
|
||||
if (valueLen != pSup->resultRowSize){
|
||||
terrno = TSDB_CODE_TSC_INVALID_INPUT;
|
||||
return false;
|
||||
}
|
||||
offset += sizeof(int32_t);
|
||||
int32_t pageId = resultRow->pageId;
|
||||
int32_t pOffset = resultRow->offset;
|
||||
memcpy(resultRow, result + offset, valueLen);
|
||||
resultRow->pageId = pageId;
|
||||
resultRow->offset = pOffset;
|
||||
offset += valueLen;
|
||||
|
||||
initResultRow(resultRow);
|
||||
|
||||
pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = (SResultRowPosition) {.pageId = resultRow->pageId, .offset = resultRow->offset};
|
||||
}
|
||||
|
||||
if (offset != length){
|
||||
terrno = TSDB_CODE_TSC_INVALID_INPUT;
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
static SSDataBlock* doMultiTableAggregate(SOperatorInfo *pOperator, bool* newgroup) {
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
|
@ -7086,10 +7191,10 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou
|
|||
static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo *pInfo, SResultInfo* pResultInfo, bool* newgroup, SExecTaskInfo* pTaskInfo) {
|
||||
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
||||
|
||||
// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, TASK_COMPLETED)? pTaskInfo->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
|
||||
int64_t ekey = Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED)? pTaskInfo->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
|
||||
taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
|
||||
|
||||
// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
|
||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
|
||||
|
||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pResultInfo->capacity, pInfo->p);
|
||||
|
@ -7097,7 +7202,7 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo *pInfo, SResult
|
|||
*newgroup = true;
|
||||
}
|
||||
|
||||
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SResultInfo *pResultInfo, bool *newgroup) {
|
||||
static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SResultInfo *pResultInfo, bool *newgroup, SExecTaskInfo* pTaskInfo) {
|
||||
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
||||
*newgroup = false;
|
||||
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pResultInfo->capacity, pInfo->p);
|
||||
|
@ -7108,12 +7213,13 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SResultInf
|
|||
|
||||
// handle the cached new group data block
|
||||
if (pInfo->existNewGroupBlock) {
|
||||
// doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup);
|
||||
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup, pTaskInfo);
|
||||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) {
|
||||
SFillOperatorInfo *pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
|
@ -7121,7 +7227,7 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup);
|
||||
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo);
|
||||
if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) {
|
||||
return pInfo->pRes;
|
||||
}
|
||||
|
@ -7142,7 +7248,7 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) {
|
|||
|
||||
// Fill the previous group data block, before handle the data block of new group.
|
||||
// Close the fill operation for previous group data block
|
||||
// taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey);
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, 0, pTaskInfo->window.ekey);
|
||||
} else {
|
||||
if (pBlock == NULL) {
|
||||
if (pInfo->totalInputRows == 0) {
|
||||
|
@ -7150,7 +7256,7 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
// taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey);
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, 0, pTaskInfo->window.ekey);
|
||||
} else {
|
||||
pInfo->totalInputRows += pBlock->info.rows;
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey);
|
||||
|
@ -7168,14 +7274,13 @@ static SSDataBlock* doFill(SOperatorInfo *pOperator, bool* newgroup) {
|
|||
return pInfo->pRes;
|
||||
}
|
||||
|
||||
// doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup);
|
||||
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo);
|
||||
if (pInfo->pRes->info.rows > pOperator->resultInfo.threshold || pBlock == NULL) {
|
||||
return pInfo->pRes;
|
||||
}
|
||||
} else if (pInfo->existNewGroupBlock) { // try next group
|
||||
assert(pBlock != NULL);
|
||||
// doHandleRemainBlockForNewGroupImpl(pInfo, pRuntimeEnv, newgroup);
|
||||
|
||||
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup, pTaskInfo);
|
||||
if (pInfo->pRes->info.rows > pResultInfo->threshold) {
|
||||
return pInfo->pRes;
|
||||
}
|
||||
|
@ -7312,6 +7417,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
pOperator->_openFn = doOpenAggregateOptr;
|
||||
pOperator->getNextFn = getAggregateResult;
|
||||
pOperator->closeFn = destroyAggOperatorInfo;
|
||||
pOperator->encodeResultRow = aggEncodeResultRow;
|
||||
pOperator->decodeResultRow = aggDecodeResultRow;
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -7576,7 +7683,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
|||
}
|
||||
|
||||
pInfo->order = TSDB_ORDER_ASC;
|
||||
pInfo->precision = TSDB_TIME_PRECISION_MICRO;
|
||||
pInfo->precision = TSDB_TIME_PRECISION_MILLI;
|
||||
pInfo->win = pTaskInfo->window;
|
||||
pInfo->interval = *pInterval;
|
||||
|
||||
|
@ -7863,10 +7970,10 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
|
|||
pInfo->intervalInfo = *pInterval;
|
||||
|
||||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||
// int32_t code = initFillInfo(pInfo, pExpr, numOfCols, fillVal, , pResultInfo->capacity, pTaskInfo->id.str, pInterval, fillType);
|
||||
// if (code != TSDB_CODE_SUCCESS) {
|
||||
// goto _error;
|
||||
// }
|
||||
int32_t code = initFillInfo(pInfo, pExpr, numOfCols, (int64_t*) fillVal, pTaskInfo->window, pResultInfo->capacity, pTaskInfo->id.str, pInterval, fillType);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pOperator->name = "FillOperator";
|
||||
pOperator->blockingOptr = false;
|
||||
|
@ -7881,7 +7988,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
|
|||
|
||||
pOperator->closeFn = destroySFillOperatorInfo;
|
||||
|
||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
|
|
|
@ -118,7 +118,7 @@ static FORCE_INLINE void getMemRowAppendInfo(SSchema *pSchema, uint8_t rowType,
|
|||
}
|
||||
|
||||
static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, STableDataBlocks* dataBuf, int32_t numOfRows) {
|
||||
pBlocks->tid = dataBuf->pTableMeta->suid;
|
||||
pBlocks->suid = (TSDB_NORMAL_TABLE == dataBuf->pTableMeta->tableType ? dataBuf->pTableMeta->uid : dataBuf->pTableMeta->suid);
|
||||
pBlocks->uid = dataBuf->pTableMeta->uid;
|
||||
pBlocks->sversion = dataBuf->pTableMeta->sversion;
|
||||
|
||||
|
|
|
@ -48,11 +48,6 @@
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
enum {
|
||||
TSDB_USE_SERVER_TS = 0,
|
||||
TSDB_USE_CLI_TS = 1,
|
||||
};
|
||||
|
||||
typedef struct SInsertParseContext {
|
||||
SParseContext* pComCxt; // input
|
||||
char *pSql; // input
|
||||
|
@ -264,7 +259,7 @@ static void buildMsgHeader(STableDataBlocks* src, SVgDataBlocks* blocks) {
|
|||
while (numOfBlocks--) {
|
||||
int32_t dataLen = blk->dataLen;
|
||||
blk->uid = htobe64(blk->uid);
|
||||
blk->tid = htonl(blk->tid);
|
||||
blk->suid = htobe64(blk->suid);
|
||||
blk->padding = htonl(blk->padding);
|
||||
blk->sversion = htonl(blk->sversion);
|
||||
blk->dataLen = htonl(blk->dataLen);
|
||||
|
@ -303,20 +298,7 @@ static int32_t checkTimestamp(STableDataBlocks *pDataBlocks, const char *start)
|
|||
}
|
||||
|
||||
TSKEY k = *(TSKEY *)start;
|
||||
|
||||
if (k == INT64_MIN) {
|
||||
if (pDataBlocks->tsSource == TSDB_USE_CLI_TS) {
|
||||
return TSDB_CODE_FAILED; // client time/server time can not be mixed
|
||||
}
|
||||
pDataBlocks->tsSource = TSDB_USE_SERVER_TS;
|
||||
} else {
|
||||
if (pDataBlocks->tsSource == TSDB_USE_SERVER_TS) {
|
||||
return TSDB_CODE_FAILED; // client time/server time can not be mixed
|
||||
}
|
||||
pDataBlocks->tsSource = TSDB_USE_CLI_TS;
|
||||
}
|
||||
|
||||
if (k <= pDataBlocks->prevTS && (pDataBlocks->tsSource == TSDB_USE_CLI_TS)) {
|
||||
if (k <= pDataBlocks->prevTS) {
|
||||
pDataBlocks->ordered = false;
|
||||
}
|
||||
|
||||
|
|
|
@ -141,7 +141,6 @@ static int32_t createDataBlock(size_t defaultSize, int32_t rowSize, int32_t star
|
|||
dataBuf->prevTS = INT64_MIN;
|
||||
dataBuf->rowSize = rowSize;
|
||||
dataBuf->size = startOffset;
|
||||
dataBuf->tsSource = -1;
|
||||
dataBuf->vgId = dataBuf->pTableMeta->vgId;
|
||||
|
||||
assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
|
||||
|
|
|
@ -72,7 +72,7 @@ protected:
|
|||
SSubmitBlk* blk = (SSubmitBlk*)(submit + 1);
|
||||
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||
cout << "Block:" << i << endl;
|
||||
cout << "\tuid:" << be64toh(blk->uid) << ", tid:" << ntohl(blk->tid) << ", padding:" << ntohl(blk->padding) << ", sversion:" << ntohl(blk->sversion)
|
||||
cout << "\tuid:" << be64toh(blk->uid) << ", tid:" << be64toh(blk->suid) << ", padding:" << ntohl(blk->padding) << ", sversion:" << ntohl(blk->sversion)
|
||||
<< ", dataLen:" << ntohl(blk->dataLen) << ", schemaLen:" << ntohl(blk->schemaLen) << ", numOfRows:" << ntohs(blk->numOfRows) << endl;
|
||||
blk = (SSubmitBlk*)(blk->data + ntohl(blk->dataLen));
|
||||
}
|
||||
|
|
|
@ -52,7 +52,11 @@ protected:
|
|||
const string syntaxTreeStr = toString(query_->pRoot, false);
|
||||
|
||||
SLogicNode* pLogicNode = nullptr;
|
||||
SPlanContext cxt = { .queryId = 1, .acctId = 0, .streamQuery = streamQuery };
|
||||
SPlanContext cxt = {0};
|
||||
cxt.queryId = 1;
|
||||
cxt.acctId = 0;
|
||||
cxt.streamQuery = streamQuery;
|
||||
|
||||
setPlanContext(query_, &cxt);
|
||||
code = createLogicPlan(&cxt, &pLogicNode);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
|
|
@ -37,9 +37,9 @@ while $dbCnt < 2
|
|||
|
||||
print =============== create super table, include all type
|
||||
print notes: after nchar show ok, modify binary to nchar
|
||||
sql create table `stable` (`timestamp` timestamp, `int` int, `binary` binary(16), `nchar` binary(16)) tags (`float` float, `Binary` binary(16), `Nchar` nchar(16))
|
||||
sql create table `Stable` (`timestamp` timestamp, `int` int, `Binary` binary(32), `Nchar` binary(32)) tags (`float` float, `binary` binary(16), `nchar` nchar(16))
|
||||
|
||||
sql create table `stable` (`timestamp` timestamp, `int` int, `binary` binary(16), `nchar` nchar(16)) tags (`float` float, `Binary` binary(16), `Nchar` nchar(16))
|
||||
sql create table `Stable` (`timestamp` timestamp, `int` int, `Binary` binary(32), `Nchar` nchar(32)) tags (`float` float, `binary` binary(16), `nchar` nchar(16))
|
||||
|
||||
sql show stables
|
||||
print rows: $rows
|
||||
print $data00 $data01
|
||||
|
@ -48,12 +48,12 @@ while $dbCnt < 2
|
|||
return -1
|
||||
endi
|
||||
if $data00 != Stable then
|
||||
if $data00 != stable then
|
||||
if $data00 != stable then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data10 != Stable then
|
||||
if $data10 != stable then
|
||||
if $data10 != Stable then
|
||||
if $data10 != stable then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
@ -80,7 +80,7 @@ while $dbCnt < 2
|
|||
sql insert into `Table` values(now+0s, 20, 'Table', 'Table')(now+1s, 21, 'Table', 'Table')
|
||||
sql insert into `TAble` values(now+0s, 30, 'TAble', 'TAble')(now+1s, 31, 'TAble', 'TAble')
|
||||
sql insert into `TABle` values(now+0s, 40, 'TABle', 'TABle')(now+4s, 41, 'TABle', 'TABle')
|
||||
|
||||
|
||||
print =============== query data
|
||||
sql select * from `table`
|
||||
print rows: $rows
|
||||
|
@ -95,7 +95,8 @@ while $dbCnt < 2
|
|||
if $data02 != table then
|
||||
return -1
|
||||
endi
|
||||
if $data03 != table then
|
||||
if $data03 != table then
|
||||
print expect table, actual $data03
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -152,27 +153,31 @@ while $dbCnt < 2
|
|||
|
||||
#print =============== query data from st, but not support select * from super table, waiting fix
|
||||
#sql select count(*) from `stable`
|
||||
#print rows: $rows
|
||||
#print rows: $rows
|
||||
#print $data00 $data01 $data02 $data03
|
||||
#if $rows != 1 then
|
||||
#if $rows != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data00 != 4 then
|
||||
#endi
|
||||
#if $data00 != 4 then
|
||||
# return -1
|
||||
#endi
|
||||
#endi
|
||||
#sql select count(*) from `Stable`
|
||||
#print rows: $rows
|
||||
#print rows: $rows
|
||||
#print $data00 $data01 $data02 $data03
|
||||
#if $rows != 1 then
|
||||
#if $rows != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data00 != 4 then
|
||||
#endi
|
||||
#if $data00 != 4 then
|
||||
# return -1
|
||||
#endi
|
||||
#endi
|
||||
#sql select * from `stable`
|
||||
#if $rows != 4 then
|
||||
# return -1
|
||||
#endi
|
||||
##sql select * from st
|
||||
##if $rows != 4 then
|
||||
## return -1
|
||||
##endi
|
||||
|
||||
endw
|
||||
|
||||
|
@ -233,12 +238,12 @@ while $dbCnt < 2
|
|||
return -1
|
||||
endi
|
||||
if $data00 != Stable then
|
||||
if $data00 != stable then
|
||||
if $data00 != stable then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data10 != Stable then
|
||||
if $data10 != stable then
|
||||
if $data10 != Stable then
|
||||
if $data10 != stable then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
@ -324,23 +329,23 @@ while $dbCnt < 2
|
|||
|
||||
#print =============== query data from st, but not support select * from super table, waiting fix
|
||||
#sql select count(*) from `stable`
|
||||
#print rows: $rows
|
||||
#print rows: $rows
|
||||
#print $data00 $data01 $data02 $data03
|
||||
#if $rows != 1 then
|
||||
#if $rows != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data00 != 4 then
|
||||
#endi
|
||||
#if $data00 != 4 then
|
||||
# return -1
|
||||
#endi
|
||||
#endi
|
||||
#sql select count(*) from `Stable`
|
||||
#print rows: $rows
|
||||
#print rows: $rows
|
||||
#print $data00 $data01 $data02 $data03
|
||||
#if $rows != 1 then
|
||||
#if $rows != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data00 != 4 then
|
||||
#endi
|
||||
#if $data00 != 4 then
|
||||
# return -1
|
||||
#endi
|
||||
#endi
|
||||
#sql select * from `stable`
|
||||
#if $rows != 4 then
|
||||
# return -1
|
||||
|
|
|
@ -5,11 +5,9 @@ sleep 500
|
|||
sql connect
|
||||
|
||||
print =============== create database
|
||||
sql drop database d0 -x step1
|
||||
step1:
|
||||
sql create database d0
|
||||
sql show databases
|
||||
if $rows != 2 then
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -25,9 +23,10 @@ endi
|
|||
|
||||
sql create table ct1 using stb tags ( 1 )
|
||||
sql create table ct2 using stb tags ( 2 )
|
||||
sql create table ct3 using stb tags ( 3 )
|
||||
sql show tables
|
||||
print $rows $data00 $data10 $data20
|
||||
if $rows != 2 then
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -89,6 +88,7 @@ print ===> rows5: $data50 $data51 $data52 $data53 $data54
|
|||
print ===> rows6: $data60 $data61 $data62 $data63 $data64
|
||||
print ===> rows7: $data70 $data71 $data72 $data73 $data74
|
||||
if $rows != 8 then
|
||||
print expect 8, actual $rows
|
||||
return -1
|
||||
endi
|
||||
if $data00 != 2 then
|
||||
|
@ -128,6 +128,92 @@ endi
|
|||
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h) sliding(12h)
|
||||
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct2 interval(1d, 2h) sliding(12h)
|
||||
print ===> rows: $rows
|
||||
print ===> rows0: $data00 $data01 $data02 $data03 $data04 $data05
|
||||
print ===> rows1: $data10 $data11 $data12 $data13 $data14 $data15
|
||||
print ===> rows2: $data20 $data21 $data22 $data23 $data24 $data25
|
||||
print ===> rows3: $data30 $data31 $data32 $data33 $data34 $data35
|
||||
print ===> rows4: $data40 $data41 $data42 $data43 $data44 $data45
|
||||
print ===> rows5: $data50 $data51 $data52 $data53 $data54 $data55
|
||||
print ===> rows6: $data60 $data61 $data62 $data63 $data64 $data65
|
||||
print ===> rows7: $data70 $data71 $data72 $data73 $data74 $data75
|
||||
#if $rows != 8 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data00 != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data10 != 2 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data20 != 2 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data30 != 2 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data40 != 2 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data50 != 2 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data60 != 2 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data70 != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
|
||||
print =============== insert data into child table ct3 (n)
|
||||
sql insert into ct3 values ( '2021-12-21 01:01:01.000', 1 )
|
||||
sql insert into ct3 values ( '2021-12-31 01:01:01.000', 1 )
|
||||
sql insert into ct3 values ( '2022-01-01 01:01:06.000', 2 )
|
||||
sql insert into ct3 values ( '2022-01-07 01:01:10.000', 3 )
|
||||
sql insert into ct3 values ( '2022-01-31 01:01:16.000', 4 )
|
||||
sql insert into ct3 values ( '2022-02-01 01:01:20.000', 5 )
|
||||
sql insert into ct3 values ( '2022-02-28 01:01:26.000', 6 )
|
||||
sql insert into ct3 values ( '2022-03-01 01:01:30.000', 7 )
|
||||
sql insert into ct3 values ( '2022-03-08 01:01:36.000', 8 )
|
||||
|
||||
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w)
|
||||
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w)
|
||||
print ===> rows: $rows
|
||||
print ===> rows0: $data00 $data01 $data02 $data03 $data04
|
||||
print ===> rows1: $data10 $data11 $data12 $data13 $data14
|
||||
print ===> rows2: $data20 $data21 $data22 $data23 $data24
|
||||
print ===> rows3: $data30 $data31 $data32 $data33 $data34
|
||||
print ===> rows4: $data40 $data41 $data42 $data43 $data44
|
||||
#if $rows != 5 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data00 != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data40 != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
|
||||
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w) sliding(2w)
|
||||
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w) sliding(2w)
|
||||
print ===> rows: $rows
|
||||
print ===> rows0: $data00 $data01 $data02 $data03 $data04
|
||||
print ===> rows1: $data10 $data11 $data12 $data13 $data14
|
||||
print ===> rows2: $data20 $data21 $data22 $data23 $data24
|
||||
print ===> rows3: $data30 $data31 $data32 $data33 $data34
|
||||
print ===> rows4: $data40 $data41 $data42 $data43 $data44
|
||||
#if $rows != 5 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data00 != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data40 != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
|
||||
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w) sliding(4w)
|
||||
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(*) from ct3 interval(1n, 1w) sliding(4w)
|
||||
print ===> rows: $rows
|
||||
print ===> rows0: $data00 $data01 $data02 $data03 $data04
|
||||
print ===> rows1: $data10 $data11 $data12 $data13 $data14
|
||||
print ===> rows2: $data20 $data21 $data22 $data23 $data24
|
||||
|
@ -136,230 +222,34 @@ print ===> rows4: $data40 $data41 $data42 $data43 $data44
|
|||
print ===> rows5: $data50 $data51 $data52 $data53 $data54
|
||||
print ===> rows6: $data60 $data61 $data62 $data63 $data64
|
||||
print ===> rows7: $data70 $data71 $data72 $data73 $data74
|
||||
if $rows != 7 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data60 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
return
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
sql select count(*) from car interval(1n, 10d) order by ts desc
|
||||
# tdSql.checkData(0, 1, 1)
|
||||
# tdSql.checkData(1, 1, 2)
|
||||
# tdSql.checkData(2, 1, 3)
|
||||
# tdSql.checkData(3, 1, 3)
|
||||
# tdSql.checkData(4, 1, 6)
|
||||
# tdSql.checkData(5, 1, 1)
|
||||
# tdSql.checkData(6, 1, 1)
|
||||
#
|
||||
sql select count(*) from car interval(2n, 5d)
|
||||
# tdSql.checkData(0, 1, 1)
|
||||
# tdSql.checkData(1, 1, 1)
|
||||
# tdSql.checkData(2, 1, 6)
|
||||
# tdSql.checkData(3, 1, 6)
|
||||
# tdSql.checkData(4, 1, 3)
|
||||
|
||||
sql select count(*) from car interval(2n) order by ts desc
|
||||
# tdSql.checkData(0, 1, 3)
|
||||
# tdSql.checkData(1, 1, 6)
|
||||
# tdSql.checkData(2, 1, 6)
|
||||
# tdSql.checkData(3, 1, 1)
|
||||
# tdSql.checkData(4, 1, 1)
|
||||
#
|
||||
sql select count(*) from car interval(1y, 1n)
|
||||
# tdSql.checkData(0, 1, 1)
|
||||
# tdSql.checkData(1, 1, 8)
|
||||
# tdSql.checkData(2, 1, 8)
|
||||
#
|
||||
sql select count(*) from car interval(1y, 2n)
|
||||
# tdSql.checkData(0, 1, 1)
|
||||
# tdSql.checkData(1, 1, 11)
|
||||
# tdSql.checkData(2, 1, 5)
|
||||
|
||||
sql select count(*) from car where ts > '2019-05-14 00:00:00' interval(1y, 5d)
|
||||
# tdSql.checkData(0, 1, 6)
|
||||
# tdSql.checkData(1, 1, 9)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
sql create table $mt (ts timestamp, tbcol int) TAGS(tgcol int)
|
||||
|
||||
print ====== start create child tables and insert data
|
||||
$i = 0
|
||||
while $i < $tbNum
|
||||
$tb = $tbPrefix . $i
|
||||
sql create table $tb using $mt tags( $i )
|
||||
|
||||
$x = 0
|
||||
while $x < $rowNum
|
||||
$cc = $x * 60000
|
||||
$ms = 1601481600000 + $cc
|
||||
|
||||
sql insert into $tb values ($ms , $x )
|
||||
$x = $x + 1
|
||||
endw
|
||||
|
||||
$i = $i + 1
|
||||
endw
|
||||
|
||||
print =============== step2
|
||||
$i = 1
|
||||
$tb = $tbPrefix . $i
|
||||
|
||||
sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb interval(1m)
|
||||
print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb interval(1m)
|
||||
print ===> $rows $data01 $data05
|
||||
if $rows != $rowNum then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data04 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
#print =============== step3
|
||||
#$cc = 4 * 60000
|
||||
#$ms = 1601481600000 + $cc
|
||||
#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms interval(1m)
|
||||
#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms interval(1m)
|
||||
#print ===> $rows $data01 $data05
|
||||
#if $rows != 5 then
|
||||
#if $rows != 8 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data00 != 1 then
|
||||
#endi
|
||||
#if $data00 != 2 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data04 != 1 then
|
||||
#endi
|
||||
#if $data70 != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
#endi
|
||||
|
||||
#print =============== step4
|
||||
#$cc = 40 * 60000
|
||||
#$ms = 1601481600000 + $cc
|
||||
|
||||
#$cc = 1 * 60000
|
||||
#$ms2 = 1601481600000 - $cc
|
||||
|
||||
#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m)
|
||||
#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m)
|
||||
#print ===> $rows $data01 $data05
|
||||
#if $rows != 20 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data00 != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data04 != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
|
||||
#print =============== step5
|
||||
#$cc = 40 * 60000
|
||||
#$ms = 1601481600000 + $cc
|
||||
|
||||
#$cc = 1 * 60000
|
||||
#$ms2 = 1601481600000 - $cc
|
||||
|
||||
#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m) fill(value,0)
|
||||
#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $tb where ts <= $ms and ts > $ms2 interval(1m) fill(value,0)
|
||||
#print ===> $rows $data21 $data25
|
||||
#if $rows != 42 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data20 != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data24 != 1 then
|
||||
# return -1
|
||||
#endi
|
||||
|
||||
#print =============== step6
|
||||
#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt interval(1m)
|
||||
#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt interval(1m)
|
||||
#print ===> $rows $data11
|
||||
#if $rows != 20 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data11 != 10 then
|
||||
# return -1
|
||||
#endi
|
||||
|
||||
#print =============== step7
|
||||
#$cc = 4 * 60000
|
||||
#$ms = 1601481600000 + $cc
|
||||
#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms interval(1m)
|
||||
#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms interval(1m)
|
||||
#print ===> $rows $data11
|
||||
#if $rows != 5 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data11 != 10 then
|
||||
# return -1
|
||||
#endi
|
||||
#sql select count(*) from car interval(1n, 10d) order by ts desc
|
||||
#sql select count(*) from car interval(2n, 5d)
|
||||
#sql select count(*) from car interval(2n) order by ts desc
|
||||
#sql select count(*) from car interval(1y, 1n)
|
||||
#sql select count(*) from car interval(1y, 2n)
|
||||
#sql select count(*) from car where ts > '2019-05-14 00:00:00' interval(1y, 5d)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
#print =============== step8
|
||||
#$cc = 40 * 60000
|
||||
#$ms1 = 1601481600000 + $cc
|
||||
#
|
||||
#$cc = 1 * 60000
|
||||
#$ms2 = 1601481600000 - $cc
|
||||
#
|
||||
#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms1 and ts > $ms2 interval(1m)
|
||||
#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms1 and ts > $ms2 interval(1m)
|
||||
#print ===> $rows $data11
|
||||
#if $rows != 20 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data11 != 10 then
|
||||
# return -1
|
||||
#endi
|
||||
#
|
||||
#print =============== step9
|
||||
#$cc = 40 * 60000
|
||||
#$ms1 = 1601481600000 + $cc
|
||||
#
|
||||
#$cc = 1 * 60000
|
||||
#$ms2 = 1601481600000 - $cc
|
||||
#
|
||||
#sql select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms1 and ts > $ms2 interval(1m) fill(value, 0)
|
||||
#print ===> select count(tbcol), sum(tbcol), max(tbcol), min(tbcol), count(tbcol) from $mt where ts <= $ms1 and ts > $ms2 interval(1m) fill(value, 0)
|
||||
#print ===> $rows $data11
|
||||
#if $rows != 42 then
|
||||
# return -1
|
||||
#endi
|
||||
#if $data11 != 10 then
|
||||
# return -1
|
||||
#endi
|
||||
|
||||
print =============== clear
|
||||
#sql drop database $db
|
||||
#sql show databases
|
||||
#if $rows != 0 then
|
||||
# return -1
|
||||
#endi
|
||||
|
||||
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -1 +1 @@
|
|||
Subproject commit 8145dd1713ab9e7652ea621ca7e6895fd0b21d65
|
||||
Subproject commit f36b07f710d661dca88fdd70e73b5e3e16a960e0
|
Loading…
Reference in New Issue