Merge branch '3.0' into feature/qnode

This commit is contained in:
dapan1121 2022-06-05 17:49:12 +08:00 committed by GitHub
commit b375531ade
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
35 changed files with 692 additions and 753 deletions

View File

@ -193,13 +193,11 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
TD_DEF_MSG_TYPE(TDMT_VND_GET_TSMA_EXP_WNDS, "vnode-get-tsma-expired-windows", SVGetTsmaExpWndsReq, SVGetTsmaExpWndsRsp)
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "alter-config", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "alter-replica", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_START_WRITE, "start-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STOP_WRITE, "stop-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CONFIRM_WRITE, "confirm-write", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIRM, "alter-confirm", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "compact", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp)
TD_NEW_MSG_SEG(TDMT_QND_MSG)

View File

@ -40,9 +40,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_FAILED -1 // unknown or needn't tell detail error
//common & util
#define TSDB_CODE_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0001)
#define TSDB_CODE_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0002)
#define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0003)
#define TSDB_CODE_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0003)
#define TSDB_CODE_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0004)
#define TSDB_CODE_APP_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0005)
#define TSDB_CODE_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0010)
#define TSDB_CODE_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x0011)
#define TSDB_CODE_OUT_OF_SHM_MEM TAOS_DEF_ERROR_CODE(0, 0x0012)

View File

@ -45,8 +45,7 @@ extern "C" {
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
#define HEARTBEAT_INTERVAL 1500 // ms
#define SYNC_ON_TOP_OF_ASYNC 0
#define SYNC_ON_TOP_OF_ASYNC 0
enum {
RES_TYPE__QUERY = 1,

View File

@ -83,6 +83,35 @@ void queryCallback1(void* param, void* res, int32_t code) {
printf("exec query:\n");
taos_query_a(param, "select * from tm1", queryCallback, param);
}
void createNewTable(TAOS* pConn, int32_t index) {
char str[1024] = {0};
sprintf(str, "create table tu%d using st2 tags(%d)", index, index);
TAOS_RES* pRes = taos_query(pConn, str);
if (taos_errno(pRes) != 0) {
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
for(int32_t i = 0; i < 1000; i += 20) {
char sql[1024] = {0};
sprintf(sql,
"insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)", index,
i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7, i + 7,
i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14, i + 14,
i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19);
TAOS_RES* p = taos_query(pConn, sql);
if (taos_errno(p) != 0) {
printf("failed to insert data, reason:%s\n", taos_errstr(p));
}
taos_free_result(p);
}
}
} // namespace
int main(int argc, char** argv) {
@ -590,7 +619,6 @@ TEST(testCase, generated_request_id_test) {
taosHashCleanup(phash);
}
TEST(testCase, insert_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
@ -640,50 +668,10 @@ TEST(testCase, projection_query_tables) {
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table tu2 using st2 tags(1)");
if (taos_errno(pRes) != 0) {
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
for(int32_t i = 0; i < 100; ++i) {
printf("create table :%d\n", i);
createNewTable(pConn, i);
}
taos_free_result(pRes);
for(int32_t i = 0; i < 1000; i += 20) {
char sql[1024] = {0};
sprintf(sql,
"insert into tu values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)",
i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7, i + 7,
i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14, i + 14,
i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19);
TAOS_RES* p = taos_query(pConn, sql);
if (taos_errno(p) != 0) {
printf("failed to insert data, reason:%s\n", taos_errstr(p));
}
taos_free_result(p);
}
printf("start to insert next table\n");
// for(int32_t i = 0; i < 1000000; i += 20) {
// char sql[1024] = {0};
// sprintf(sql,
// "insert into tu2 values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
// "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
// "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
// "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)",
// i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7, i + 7,
// i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14, i + 14,
// i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19);
// TAOS_RES* p = taos_query(pConn, sql);
// if (taos_errno(p) != 0) {
// printf("failed to insert data, reason:%s\n", taos_errstr(p));
// }
//
// taos_free_result(p);
// }
// pRes = taos_query(pConn, "select * from tu");
// if (taos_errno(pRes) != 0) {
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
@ -705,74 +693,74 @@ TEST(testCase, projection_query_tables) {
taos_close(pConn);
}
TEST(testCase, projection_query_stables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
//TEST(testCase, projection_query_stables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "select ts from st1");
// if (taos_errno(pRes) != 0) {
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
//
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes);
pRes = taos_query(pConn, "select ts from st1");
if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_free_result(pRes);
taos_close(pConn);
}
TEST(testCase, agg_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) {
printf("failed to use db, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
taos_free_result(pRes);
pRes = taos_query(pConn, "show stables");
if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
int32_t n = 0;
char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t* length = taos_fetch_lengths(pRes);
for(int32_t i = 0; i < numOfFields; ++i) {
printf("(%d):%d " , i, length[i]);
}
printf("\n");
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
memset(str, 0, sizeof(str));
}
taos_free_result(pRes);
taos_close(pConn);
}
//TEST(testCase, agg_query_tables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) {
// printf("failed to use db, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "show stables");
// if (taos_errno(pRes) != 0) {
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// }
//
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// int32_t n = 0;
// char str[512] = {0};
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t* length = taos_fetch_lengths(pRes);
// for(int32_t i = 0; i < numOfFields; ++i) {
// printf("(%d):%d " , i, length[i]);
// }
// printf("\n");
//
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// memset(str, 0, sizeof(str));
// }
//
// taos_free_result(pRes);
// taos_close(pConn);
//}
#endif
/*

View File

@ -213,6 +213,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DEPLOY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;

View File

@ -361,6 +361,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;

View File

@ -118,25 +118,36 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
pMsg = *(SRpcMsg **)taosArrayGet(pArray, m);
code = vnodePreprocessReq(pVnode->pImpl, pMsg);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) continue;
if (code != 0) {
dError("vgId:%d, msg:%p failed to write since %s", pVnode->vgId, pMsg, tstrerror(code));
vmSendRsp(pMsg, code);
if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
dTrace("vgId:%d, msg:%p in progress and no rsp", pVnode->vgId, pMsg);
continue;
}
code = syncPropose(sync, pMsg, false);
if (pMsg->msgType != TDMT_VND_ALTER_REPLICA) {
code = syncPropose(sync, pMsg, false);
}
if (code == TAOS_SYNC_PROPOSE_SUCCESS) {
dTrace("vgId:%d, msg:%p is proposed and no rsp", pVnode->vgId, pMsg);
continue;
} else if (code == TAOS_SYNC_PROPOSE_NOT_LEADER) {
dTrace("vgId:%d, msg:%p is redirect since not leader", pVnode->vgId, pMsg);
SEpSet newEpSet = {0};
syncGetEpSet(sync, &newEpSet);
newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
SEp *pEp = &newEpSet.eps[newEpSet.inUse];
if (pEp->port == tsServerPort && strcmp(pEp->fqdn, tsLocalFqdn) == 0) {
newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
}
dTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->vgId, pMsg,
newEpSet.numOfEps, newEpSet.inUse);
for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
dTrace("vgId:%d, msg:%p ep:%s:%u", pVnode->vgId, pMsg, newEpSet.eps[i].fqdn, newEpSet.eps[i].port);
}
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
tmsgSendRedirectRsp(&rsp, &newEpSet);
} else {
dError("vgId:%d, msg:%p failed to write since %s", pVnode->vgId, pMsg, tstrerror(code));
dError("vgId:%d, msg:%p failed to propose write since %s, code:0x%x", pVnode->vgId, pMsg, tstrerror(code), code);
vmSendRsp(pMsg, code);
}
}
@ -251,7 +262,6 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
switch (qtype) {
case QUERY_QUEUE:
vnodePreprocessQueryMsg(pVnode->pImpl, pMsg);
dTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pQueryQ, pMsg);
break;

View File

@ -34,7 +34,7 @@ int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup);
int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
SArray *mndBuildDnodesArray(SMnode *pMnode);
int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray);
int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *del1, SVnodeGid *del2);
int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *pVgId);
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, bool standby);
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);

View File

@ -30,25 +30,25 @@ static int32_t mndBnodeActionInsert(SSdb *pSdb, SBnodeObj *pObj);
static int32_t mndBnodeActionUpdate(SSdb *pSdb, SBnodeObj *pOld, SBnodeObj *pNew);
static int32_t mndBnodeActionDelete(SSdb *pSdb, SBnodeObj *pObj);
static int32_t mndProcessCreateBnodeReq(SRpcMsg *pReq);
static int32_t mndProcessCreateBnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessDropBnodeReq(SRpcMsg *pReq);
static int32_t mndProcessDropBnodeRsp(SRpcMsg *pRsp);
static int32_t mndRetrieveBnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextBnode(SMnode *pMnode, void *pIter);
int32_t mndInitBnode(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_BNODE,
.keyType = SDB_KEY_INT32,
.encodeFp = (SdbEncodeFp)mndBnodeActionEncode,
.decodeFp = (SdbDecodeFp)mndBnodeActionDecode,
.insertFp = (SdbInsertFp)mndBnodeActionInsert,
.updateFp = (SdbUpdateFp)mndBnodeActionUpdate,
.deleteFp = (SdbDeleteFp)mndBnodeActionDelete};
SSdbTable table = {
.sdbType = SDB_BNODE,
.keyType = SDB_KEY_INT32,
.encodeFp = (SdbEncodeFp)mndBnodeActionEncode,
.decodeFp = (SdbDecodeFp)mndBnodeActionDecode,
.insertFp = (SdbInsertFp)mndBnodeActionInsert,
.updateFp = (SdbUpdateFp)mndBnodeActionUpdate,
.deleteFp = (SdbDeleteFp)mndBnodeActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_BNODE, mndProcessCreateBnodeReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_BNODE, mndProcessDropBnodeReq);
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_BNODE_RSP, mndProcessCreateBnodeRsp);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_BNODE_RSP, mndProcessDropBnodeRsp);
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_BNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_BNODE_RSP, mndTransProcessRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_BNODE, mndRetrieveBnodes);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_BNODE, mndCancelGetNextBnode);
@ -427,16 +427,6 @@ _OVER:
return code;
}
static int32_t mndProcessCreateBnodeRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessDropBnodeRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndRetrieveBnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;

View File

@ -263,7 +263,8 @@ void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) {
sdbRelease(pSdb, pDb);
}
static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool standby) {
static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
bool standby) {
STransAction action = {0};
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
@ -288,6 +289,32 @@ static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *p
return 0;
}
static int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
int32_t contLen = sizeof(SMsgHead);
SMsgHead *pHead = taosMemoryMalloc(contLen);
if (pHead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId);
action.pCont = pHead;
action.contLen = contLen;
action.msgType = TDMT_VND_ALTER_CONFIRM;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pHead);
return -1;
}
return 0;
}
static int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType) {
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
@ -415,7 +442,6 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = TSDB_DEFAULT_CACHE_LAST_ROW;
if (pCfg->numOfRetensions < 0) pCfg->numOfRetensions = 0;
if (pCfg->schemaless < 0) pCfg->schemaless = TSDB_DB_SCHEMALESS_OFF;
}
static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
@ -726,30 +752,32 @@ static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
mndTransSetSerial(pTrans);
if (newVgroup.replica < pDb->cfg.replications) {
mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
mInfo("db:%s, vgId:%d, vn:0 dnode:%d, will add 2 vnodes", pVgroup->dbName, pVgroup->vgId,
pVgroup->vnodeGid[0].dnodeId);
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) {
mError("db:%s, failed to add vnode to vgId:%d since %s", pDb->name, newVgroup.vgId, terrstr());
return -1;
}
newVgroup.replica = pDb->cfg.replications;
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[1], true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
if (mndAddVnodeToVgroup(pMnode, &newVgroup, pArray) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[2], true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
} else {
mInfo("db:%s, vgId:%d, will remove 2 vnodes", pVgroup->dbName, pVgroup->vgId);
SVnodeGid del1 = {0};
SVnodeGid del2 = {0};
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1, &del2) != 0) {
mError("db:%s, failed to remove vnode from vgId:%d since %s", pDb->name, newVgroup.vgId, terrstr());
return -1;
}
newVgroup.replica = pDb->cfg.replications;
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del1) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del1, true) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
SVnodeGid del2 = {0};
if (mndRemoveVnodeFromVgroup(pMnode, &newVgroup, pArray, &del2) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del2, true) != 0) return -1;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
}
SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
@ -1341,7 +1369,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
int32_t numOfTable = mndGetDBTableNum(pDb, pMnode);
if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable) {
mDebug("db:%s, version & numOfTable not changed", pDbVgVersion->dbFName);
mDebug("db:%s, version and numOfTable not changed", pDbVgVersion->dbFName);
mndReleaseDb(pMnode, pDb);
continue;
} else {
@ -1433,12 +1461,22 @@ const char *mndGetDbStr(const char *src) {
int64_t getValOfDiffPrecision(int8_t unit, int64_t val) {
int64_t v = 0;
switch(unit) {
case 's': v = val / 1000; break;
case 'm': v = val / tsTickPerMin[TSDB_TIME_PRECISION_MILLI]; break;
case 'h': v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 60); break;
case 'd': v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 24 * 60); break;
case 'w': v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 24 * 60 * 7); break;
switch (unit) {
case 's':
v = val / 1000;
break;
case 'm':
v = val / tsTickPerMin[TSDB_TIME_PRECISION_MILLI];
break;
case 'h':
v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 60);
break;
case 'd':
v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 24 * 60);
break;
case 'w':
v = val / (tsTickPerMin[TSDB_TIME_PRECISION_MILLI] * 24 * 60 * 7);
break;
default:
break;
}
@ -1446,32 +1484,32 @@ int64_t getValOfDiffPrecision(int8_t unit, int64_t val) {
return v;
}
char* buildRetension(SArray* pRetension) {
char *buildRetension(SArray *pRetension) {
size_t size = taosArrayGetSize(pRetension);
if (size == 0) {
return NULL;
}
char* p1 = taosMemoryCalloc(1, 100);
SRetention* p = taosArrayGet(pRetension, 0);
char *p1 = taosMemoryCalloc(1, 100);
SRetention *p = taosArrayGet(pRetension, 0);
int32_t len = 2;
int64_t v1 = getValOfDiffPrecision(p->freqUnit, p->freq);
int64_t v2 = getValOfDiffPrecision(p->keepUnit, p->keep);
len += sprintf(p1 + len, "%"PRId64"%c:%"PRId64"%c,", v1, p->freqUnit, v2, p->keepUnit);
len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c,", v1, p->freqUnit, v2, p->keepUnit);
p = taosArrayGet(pRetension, 1);
v1 = getValOfDiffPrecision(p->freqUnit, p->freq);
v2 = getValOfDiffPrecision(p->keepUnit, p->keep);
len += sprintf(p1 + len, "%"PRId64"%c:%"PRId64"%c,", v1, p->freqUnit, v2, p->keepUnit);
len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c,", v1, p->freqUnit, v2, p->keepUnit);
p = taosArrayGet(pRetension, 2);
v1 = getValOfDiffPrecision(p->freqUnit, p->freq);
v2 = getValOfDiffPrecision(p->keepUnit, p->keep);
len += sprintf(p1 + len, "%"PRId64"%c:%"PRId64"%c", v1, p->freqUnit, v2, p->keepUnit);
len += sprintf(p1 + len, "%" PRId64 "%c:%" PRId64 "%c", v1, p->freqUnit, v2, p->keepUnit);
varDataSetLen(p1, len);
return p1;
@ -1586,7 +1624,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
}
colDataAppend(pColInfo, rows, cacheModel, null);
#endif
colDataAppend(pColInfo, rows, (const char*) &pDb->cfg.cacheLastRow, false);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.cacheLastRow, false);
char *prec = NULL;
switch (pDb->cfg.precision) {
@ -1618,7 +1656,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.schemaless, false);
char* p = buildRetension(pDb->cfg.pRetensions);
char *p = buildRetension(pDb->cfg.pRetensions);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
if (p == NULL) {

View File

@ -30,26 +30,26 @@ static int32_t mndQnodeActionInsert(SSdb *pSdb, SQnodeObj *pObj);
static int32_t mndQnodeActionUpdate(SSdb *pSdb, SQnodeObj *pOld, SQnodeObj *pNew);
static int32_t mndQnodeActionDelete(SSdb *pSdb, SQnodeObj *pObj);
static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq);
static int32_t mndProcessCreateQnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq);
static int32_t mndProcessDropQnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessQnodeListReq(SRpcMsg *pReq);
static int32_t mndRetrieveQnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextQnode(SMnode *pMnode, void *pIter);
int32_t mndInitQnode(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_QNODE,
.keyType = SDB_KEY_INT32,
.encodeFp = (SdbEncodeFp)mndQnodeActionEncode,
.decodeFp = (SdbDecodeFp)mndQnodeActionDecode,
.insertFp = (SdbInsertFp)mndQnodeActionInsert,
.updateFp = (SdbUpdateFp)mndQnodeActionUpdate,
.deleteFp = (SdbDeleteFp)mndQnodeActionDelete};
SSdbTable table = {
.sdbType = SDB_QNODE,
.keyType = SDB_KEY_INT32,
.encodeFp = (SdbEncodeFp)mndQnodeActionEncode,
.decodeFp = (SdbDecodeFp)mndQnodeActionDecode,
.insertFp = (SdbInsertFp)mndQnodeActionInsert,
.updateFp = (SdbUpdateFp)mndQnodeActionUpdate,
.deleteFp = (SdbDeleteFp)mndQnodeActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_QNODE, mndProcessCreateQnodeReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_QNODE, mndProcessDropQnodeReq);
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_QNODE_RSP, mndProcessCreateQnodeRsp);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_QNODE_RSP, mndProcessDropQnodeRsp);
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_QNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_QNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_QNODE_LIST, mndProcessQnodeListReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_QNODE, mndRetrieveQnodes);
@ -503,16 +503,6 @@ _OVER:
return code;
}
static int32_t mndProcessCreateQnodeRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessDropQnodeRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndRetrieveQnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;

View File

@ -38,25 +38,25 @@ static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb);
static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew);
static int32_t mndProcessMCreateSmaReq(SRpcMsg *pReq);
static int32_t mndProcessMDropSmaReq(SRpcMsg *pReq);
static int32_t mndProcessVCreateSmaRsp(SRpcMsg *pRsp);
static int32_t mndProcessVDropSmaRsp(SRpcMsg *pRsp);
static int32_t mndProcessGetSmaReq(SRpcMsg *pReq);
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextSma(SMnode *pMnode, void *pIter);
int32_t mndInitSma(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_SMA,
.keyType = SDB_KEY_BINARY,
.encodeFp = (SdbEncodeFp)mndSmaActionEncode,
.decodeFp = (SdbDecodeFp)mndSmaActionDecode,
.insertFp = (SdbInsertFp)mndSmaActionInsert,
.updateFp = (SdbUpdateFp)mndSmaActionUpdate,
.deleteFp = (SdbDeleteFp)mndSmaActionDelete};
SSdbTable table = {
.sdbType = SDB_SMA,
.keyType = SDB_KEY_BINARY,
.encodeFp = (SdbEncodeFp)mndSmaActionEncode,
.decodeFp = (SdbDecodeFp)mndSmaActionDecode,
.insertFp = (SdbInsertFp)mndSmaActionInsert,
.updateFp = (SdbUpdateFp)mndSmaActionUpdate,
.deleteFp = (SdbDeleteFp)mndSmaActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SMA, mndProcessMCreateSmaReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_SMA, mndProcessMDropSmaReq);
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndProcessVCreateSmaRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndProcessVDropSmaRsp);
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_SMA_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_SMA_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_GET_INDEX, mndProcessGetSmaReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveSma);
@ -637,11 +637,6 @@ _OVER:
return code;
}
static int32_t mndProcessVCreateSmaRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndSetDropSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) {
SSdbRaw *pRedoRaw = mndSmaActionEncode(pSma);
if (pRedoRaw == NULL) return -1;
@ -910,11 +905,6 @@ _OVER:
return code;
}
static int32_t mndProcessVDropSmaRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;

View File

@ -30,25 +30,25 @@ static int32_t mndSnodeActionInsert(SSdb *pSdb, SSnodeObj *pObj);
static int32_t mndSnodeActionUpdate(SSdb *pSdb, SSnodeObj *pOld, SSnodeObj *pNew);
static int32_t mndSnodeActionDelete(SSdb *pSdb, SSnodeObj *pObj);
static int32_t mndProcessCreateSnodeReq(SRpcMsg *pReq);
static int32_t mndProcessCreateSnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessDropSnodeReq(SRpcMsg *pReq);
static int32_t mndProcessDropSnodeRsp(SRpcMsg *pRsp);
static int32_t mndRetrieveSnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextSnode(SMnode *pMnode, void *pIter);
int32_t mndInitSnode(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_SNODE,
.keyType = SDB_KEY_INT32,
.encodeFp = (SdbEncodeFp)mndSnodeActionEncode,
.decodeFp = (SdbDecodeFp)mndSnodeActionDecode,
.insertFp = (SdbInsertFp)mndSnodeActionInsert,
.updateFp = (SdbUpdateFp)mndSnodeActionUpdate,
.deleteFp = (SdbDeleteFp)mndSnodeActionDelete};
SSdbTable table = {
.sdbType = SDB_SNODE,
.keyType = SDB_KEY_INT32,
.encodeFp = (SdbEncodeFp)mndSnodeActionEncode,
.decodeFp = (SdbDecodeFp)mndSnodeActionDecode,
.insertFp = (SdbInsertFp)mndSnodeActionInsert,
.updateFp = (SdbUpdateFp)mndSnodeActionUpdate,
.deleteFp = (SdbDeleteFp)mndSnodeActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_SNODE, mndProcessCreateSnodeReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_SNODE, mndProcessDropSnodeReq);
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_SNODE_RSP, mndProcessCreateSnodeRsp);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_SNODE_RSP, mndProcessDropSnodeRsp);
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_SNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_SNODE_RSP, mndTransProcessRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndRetrieveSnodes);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_SNODE, mndCancelGetNextSnode);
@ -437,16 +437,6 @@ _OVER:
return code;
}
static int32_t mndProcessCreateSnodeRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessDropSnodeRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndRetrieveSnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;

View File

@ -38,9 +38,6 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew);
static int32_t mndProcessMCreateStbReq(SRpcMsg *pReq);
static int32_t mndProcessMAlterStbReq(SRpcMsg *pReq);
static int32_t mndProcessMDropStbReq(SRpcMsg *pReq);
static int32_t mndProcessVCreateStbRsp(SRpcMsg *pRsp);
static int32_t mndProcessVAlterStbRsp(SRpcMsg *pRsp);
static int32_t mndProcessVDropStbRsp(SRpcMsg *pRsp);
static int32_t mndProcessTableMetaReq(SRpcMsg *pReq);
static int32_t mndRetrieveStb(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextStb(SMnode *pMnode, void *pIter);
@ -59,9 +56,9 @@ int32_t mndInitStb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STB, mndProcessMCreateStbReq);
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_STB, mndProcessMAlterStbReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STB, mndProcessMDropStbReq);
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndProcessVCreateStbRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndProcessVAlterStbRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndProcessVDropStbRsp);
mndSetMsgHandle(pMnode, TDMT_VND_CREATE_STB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb);
@ -837,11 +834,6 @@ _OVER:
return code;
}
static int32_t mndProcessVCreateStbRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
if (pAlter->commentLen != 0 || pAlter->ttl != 0) return 0;
@ -1459,11 +1451,6 @@ _OVER:
return code;
}
static int32_t mndProcessVAlterStbRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndSetDropStbRedoLogs(SMnode *pMnode, STrans *pTrans, SStbObj *pStb) {
SSdbRaw *pRedoRaw = mndStbActionEncode(pStb);
if (pRedoRaw == NULL) return -1;
@ -1599,11 +1586,6 @@ _OVER:
return code;
}
static int32_t mndProcessVDropStbRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessTableMetaReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;

View File

@ -35,7 +35,6 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream);
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
static int32_t mndProcessTaskDeployInternalRsp(SRpcMsg *pRsp);
/*static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);*/
/*static int32_t mndProcessDropStreamInRsp(SRpcMsg *pRsp);*/
static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq);
@ -44,17 +43,19 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter);
int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_STREAM,
.keyType = SDB_KEY_BINARY,
.encodeFp = (SdbEncodeFp)mndStreamActionEncode,
.decodeFp = (SdbDecodeFp)mndStreamActionDecode,
.insertFp = (SdbInsertFp)mndStreamActionInsert,
.updateFp = (SdbUpdateFp)mndStreamActionUpdate,
.deleteFp = (SdbDeleteFp)mndStreamActionDelete};
SSdbTable table = {
.sdbType = SDB_STREAM,
.keyType = SDB_KEY_BINARY,
.encodeFp = (SdbEncodeFp)mndStreamActionEncode,
.decodeFp = (SdbDecodeFp)mndStreamActionDecode,
.insertFp = (SdbInsertFp)mndStreamActionInsert,
.updateFp = (SdbUpdateFp)mndStreamActionUpdate,
.deleteFp = (SdbDeleteFp)mndStreamActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
mndSetMsgHandle(pMnode, TDMT_VND_TASK_DEPLOY_RSP, mndProcessTaskDeployInternalRsp);
mndSetMsgHandle(pMnode, TDMT_SND_TASK_DEPLOY_RSP, mndProcessTaskDeployInternalRsp);
mndSetMsgHandle(pMnode, TDMT_VND_TASK_DEPLOY_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_SND_TASK_DEPLOY_RSP, mndTransProcessRsp);
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
@ -195,11 +196,6 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
sdbRelease(pSdb, pStream);
}
static int32_t mndProcessTaskDeployInternalRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static SDbObj *mndAcquireDbByStream(SMnode *pMnode, char *streamName) {
SName name = {0};
tNameFromString(&name, streamName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);

View File

@ -43,7 +43,6 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubs
static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg);
static int32_t mndProcessDropCgroupReq(SRpcMsg *pMsg);
static int32_t mndProcessSubscribeInternalRsp(SRpcMsg *pMsg);
static int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextSubscribe(SMnode *pMnode, void *pIter);
@ -65,20 +64,22 @@ static int32_t mndSetSubCommitLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeO
}
int32_t mndInitSubscribe(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_SUBSCRIBE,
.keyType = SDB_KEY_BINARY,
.encodeFp = (SdbEncodeFp)mndSubActionEncode,
.decodeFp = (SdbDecodeFp)mndSubActionDecode,
.insertFp = (SdbInsertFp)mndSubActionInsert,
.updateFp = (SdbUpdateFp)mndSubActionUpdate,
.deleteFp = (SdbDeleteFp)mndSubActionDelete};
SSdbTable table = {
.sdbType = SDB_SUBSCRIBE,
.keyType = SDB_KEY_BINARY,
.encodeFp = (SdbEncodeFp)mndSubActionEncode,
.decodeFp = (SdbDecodeFp)mndSubActionDecode,
.insertFp = (SdbInsertFp)mndSubActionInsert,
.updateFp = (SdbUpdateFp)mndSubActionUpdate,
.deleteFp = (SdbDeleteFp)mndSubActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_DELETE_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_CHANGE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_MQ_VG_DELETE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessRebalanceReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DROP_CGROUP, mndProcessDropCgroupReq);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DROP_CGROUP_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DROP_CGROUP_RSP, mndTransProcessRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_SUBSCRIPTIONS, mndRetrieveSubscribe);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextSubscribe);
@ -789,11 +790,6 @@ void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
sdbRelease(pSdb, pSub);
}
static int32_t mndProcessSubscribeInternalRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndSetDropSubRedoLogs(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub) {
SSdbRaw *pRedoRaw = mndSubActionEncode(pSub);
if (pRedoRaw == NULL) return -1;

View File

@ -37,7 +37,6 @@ static int32_t mndTopicActionDelete(SSdb *pSdb, SMqTopicObj *pTopic);
static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj *pNewTopic);
static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq);
static int32_t mndProcessDropTopicReq(SRpcMsg *pReq);
static int32_t mndProcessDropTopicInRsp(SRpcMsg *pRsp);
static int32_t mndRetrieveTopic(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
@ -45,17 +44,19 @@ static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
static int32_t mndSetDropTopicCommitLogs(SMnode *pMnode, STrans *pTrans, SMqTopicObj *pTopic);
int32_t mndInitTopic(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_TOPIC,
.keyType = SDB_KEY_BINARY,
.encodeFp = (SdbEncodeFp)mndTopicActionEncode,
.decodeFp = (SdbDecodeFp)mndTopicActionDecode,
.insertFp = (SdbInsertFp)mndTopicActionInsert,
.updateFp = (SdbUpdateFp)mndTopicActionUpdate,
.deleteFp = (SdbDeleteFp)mndTopicActionDelete};
SSdbTable table = {
.sdbType = SDB_TOPIC,
.keyType = SDB_KEY_BINARY,
.encodeFp = (SdbEncodeFp)mndTopicActionEncode,
.decodeFp = (SdbDecodeFp)mndTopicActionDecode,
.insertFp = (SdbInsertFp)mndTopicActionInsert,
.updateFp = (SdbUpdateFp)mndTopicActionUpdate,
.deleteFp = (SdbDeleteFp)mndTopicActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TOPIC, mndProcessCreateTopicReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndTransProcessRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
@ -607,11 +608,6 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
static int32_t mndProcessDropTopicInRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTopics) {
SSdb *pSdb = pMnode->pSdb;
SDbObj *pDb = mndAcquireDb(pMnode, dbName);

View File

@ -870,8 +870,15 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
pAction->rawWritten = 0;
pAction->msgSent = 0;
pAction->msgReceived = 0;
if (pAction->errCode == TSDB_CODE_RPC_REDIRECT) {
pAction->epSet.inUse = (pAction->epSet.inUse + 1) % pAction->epSet.numOfEps;
mDebug("trans:%d, %s:%d execute status is reset and set epset inuse:%d", pTrans->id, mndTransStr(pAction->stage),
action, pAction->epSet.inUse);
} else {
mDebug("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), action);
}
pAction->errCode = 0;
mDebug("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), action);
}
}

View File

@ -29,11 +29,6 @@ static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
static int32_t mndProcessCreateVnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessAlterVnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessDropVnodeRsp(SRpcMsg *pRsp);
static int32_t mndProcessCompactVnodeRsp(SRpcMsg *pRsp);
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
static int32_t mndRetrieveVnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
@ -50,11 +45,12 @@ int32_t mndInitVgroup(SMnode *pMnode) {
.deleteFp = (SdbDeleteFp)mndVgroupActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndProcessCreateVnodeRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_REPLICA_RSP, mndProcessAlterVnodeRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIG_RSP, mndProcessAlterVnodeRsp);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndProcessDropVnodeRsp);
mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndProcessCompactVnodeRsp);
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_REPLICA_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIG_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIRM_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndTransProcessRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);
@ -512,12 +508,12 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
mDebug("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes);
}
int32_t maxPos = 1;
SVnodeGid *pVgid = &pVgroup->vnodeGid[pVgroup->replica];
for (int32_t d = 0; d < taosArrayGetSize(pArray); ++d) {
SDnodeObj *pDnode = taosArrayGet(pArray, d);
bool used = false;
for (int32_t vn = 0; vn < maxPos; ++vn) {
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
if (pDnode->id == pVgroup->vnodeGid[vn].dnodeId) {
used = true;
break;
@ -530,59 +526,58 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
return -1;
}
SVnodeGid *pVgid = &pVgroup->vnodeGid[maxPos];
pVgid->dnodeId = pDnode->id;
pVgid->role = TAOS_SYNC_STATE_ERROR;
pDnode->numOfVnodes++;
mInfo("db:%s, vgId:%d, vn:%d dnode:%d, is added", pVgroup->dbName, pVgroup->vgId, pVgroup->replica,
pVgid->dnodeId);
mInfo("db:%s, vgId:%d, vnode_index:%d dnode:%d is added", pVgroup->dbName, pVgroup->vgId, maxPos, pVgid->dnodeId);
maxPos++;
if (maxPos == 3) return 0;
pVgroup->replica++;
pDnode->numOfVnodes++;
return 0;
}
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
mError("db:%s, failed to add vnode to vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, terrstr());
return -1;
}
int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *del1, SVnodeGid *del2) {
int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *pDelVgid) {
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
SDnodeObj *pDnode = taosArrayGet(pArray, i);
mDebug("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes);
}
int32_t removedNum = 0;
int32_t code = -1;
for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
SDnodeObj *pDnode = taosArrayGet(pArray, d);
for (int32_t vn = 0; vn < TSDB_MAX_REPLICA; ++vn) {
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
if (pVgid->dnodeId == pDnode->id) {
if (removedNum == 0) *del1 = *pVgid;
if (removedNum == 1) *del2 = *pVgid;
mInfo("db:%s, vgId:%d, vn:%d dnode:%d is removed", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
memset(pVgid, 0, sizeof(SVnodeGid));
removedNum++;
mInfo("db:%s, vgId:%d, vn:%d dnode:%d, is removed", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
pDnode->numOfVnodes--;
if (removedNum == 2) goto _OVER;
pVgroup->replica--;
*pDelVgid = *pVgid;
*pVgid = pVgroup->vnodeGid[pVgroup->replica];
memset(&pVgroup->vnodeGid[pVgroup->replica], 0, sizeof(SVnodeGid));
code = 0;
goto _OVER;
}
}
}
_OVER:
if (removedNum != 2) return -1;
for (int32_t vn = 1; vn < TSDB_MAX_REPLICA; ++vn) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
if (pVgid->dnodeId != 0) {
memcpy(&pVgroup->vnodeGid[0], pVgid, sizeof(SVnodeGid));
memset(pVgid, 0, sizeof(SVnodeGid));
}
if (code != 0) {
terrno = TSDB_CODE_APP_ERROR;
mError("db:%s, failed to remove vnode from vgId:%d since %s", pVgroup->dbName, pVgroup->vgId, terrstr());
return -1;
}
mInfo("db:%s, vgId:%d, dnode:%d is keeped", pVgroup->dbName, pVgroup->vgId, pVgroup->vnodeGid[0].dnodeId);
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[vn];
mInfo("db:%s, vgId:%d, vn:%d dnode:%d is reserved", pVgroup->dbName, pVgroup->vgId, vn, pVgid->dnodeId);
}
return 0;
}
@ -605,23 +600,6 @@ SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
return epset;
}
static int32_t mndProcessCreateVnodeRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessAlterVnodeRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessDropVnodeRsp(SRpcMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessCompactVnodeRsp(SRpcMsg *pRsp) { return 0; }
static bool mndGetVgroupMaxReplicaFp(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3) {
SVgObj *pVgroup = pObj;
int64_t uid = *(int64_t *)p1;

View File

@ -15,14 +15,15 @@
#include "vnd.h"
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0;
@ -99,11 +100,11 @@ int32_t vnodePreprocessReq(SVnode *pVnode, SRpcMsg *pMsg) {
return code;
}
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
void *ptr = NULL;
void *pReq;
int len;
int ret;
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
void *ptr = NULL;
void *pReq;
int32_t len;
int32_t ret;
vTrace("vgId:%d, start to process write request %s, version %" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
version);
@ -158,6 +159,9 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
pMsg->contLen - sizeof(SMsgHead)) < 0) {
}
} break;
case TDMT_VND_ALTER_CONFIRM:
vnodeProcessAlterConfirmReq(pVnode, version, pReq, len, pRsp);
break;
case TDMT_VND_ALTER_CONFIG:
break;
default:
@ -190,7 +194,7 @@ _err:
return -1;
}
int32_t vnodePreprocessQueryMsg(SVnode * pVnode, SRpcMsg * pMsg) {
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
if (TDMT_VND_QUERY != pMsg->msgType) {
return 0;
}
@ -212,7 +216,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
}
}
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("message in fetch queue is processing");
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
@ -267,7 +271,7 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
}
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
if (syncEnvIsStart()) {
@ -357,7 +361,7 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
return ret;
}
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVCreateStbReq req = {0};
SDecoder coder;
@ -389,9 +393,9 @@ _err:
return -1;
}
static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SDecoder decoder = {0};
int rcode = 0;
int32_t rcode = 0;
SVCreateTbBatchReq req = {0};
SVCreateTbReq *pCreateReq;
SVCreateTbBatchRsp rsp = {0};
@ -422,7 +426,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq,
}
// loop to create table
for (int iReq = 0; iReq < req.nReqs; iReq++) {
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
// validate hash
@ -477,7 +481,7 @@ _exit:
return rcode;
}
static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVCreateStbReq req = {0};
SDecoder dc = {0};
@ -506,9 +510,9 @@ static int vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq,
return 0;
}
static int vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVDropStbReq req = {0};
int rcode = TSDB_CODE_SUCCESS;
int32_t rcode = TSDB_CODE_SUCCESS;
SDecoder decoder = {0};
pRsp->msgType = TDMT_VND_CREATE_STB_RSP;
@ -535,12 +539,12 @@ _exit:
return 0;
}
static int vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
static int32_t vnodeProcessAlterTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVAlterTbReq vAlterTbReq = {0};
SVAlterTbRsp vAlterTbRsp = {0};
SDecoder dc = {0};
int rcode = 0;
int ret;
int32_t rcode = 0;
int32_t ret;
SEncoder ec = {0};
STableMetaRsp vMetaRsp = {0};
@ -582,12 +586,12 @@ _exit:
return 0;
}
static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVDropTbBatchReq req = {0};
SVDropTbBatchRsp rsp = {0};
SDecoder decoder = {0};
SEncoder encoder = {0};
int ret;
int32_t ret;
SArray *tbUids = NULL;
pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
@ -609,7 +613,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in
rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
if (tbUids == NULL || rsp.pArray == NULL) goto _exit;
for (int iReq = 0; iReq < req.nReqs; iReq++) {
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
SVDropTbReq *pDropTbReq = req.pReqs + iReq;
SVDropTbRsp dropTbRsp = {0};
@ -641,7 +645,8 @@ _exit:
return 0;
}
static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) {
static int32_t vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter,
const char *tags) {
SSubmitBlkIter blkIter = {0};
STSchema *pSchema = NULL;
tb_uid_t suid = 0;
@ -675,7 +680,7 @@ static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSub
return TSDB_CODE_SUCCESS;
}
static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
static int32_t vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) {
ASSERT(pMsg != NULL);
SSubmitMsgIter msgIter = {0};
SMeta *pMeta = pVnode->pMeta;
@ -692,7 +697,7 @@ static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char
return 0;
}
static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
SSubmitRsp submitRsp = {0};
SSubmitMsgIter msgIter = {0};
@ -808,7 +813,7 @@ _exit:
return 0;
}
static int vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
SVCreateTSmaReq req = {0};
SDecoder coder;
@ -859,3 +864,13 @@ _err:
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen) {
return vnodeProcessCreateTSmaReq(pVnode, 1, pCont, contLen, NULL);
}
static int32_t vnodeProcessAlterConfirmReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
vInfo("vgId:%d, alter replica confim msg is processed", TD_VID(pVnode));
pRsp->msgType = TDMT_VND_ALTER_CONFIRM_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
pRsp->pCont = NULL;
pRsp->contLen = 0;
return 0;
}

View File

@ -69,7 +69,7 @@ int32_t vnodeSyncAlter(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = syncReconfig(pVnode->sync, &cfg);
if (code == TAOS_SYNC_PROPOSE_SUCCESS) {
// todo refactor
SRpcMsg rsp = {.info = pMsg->info, .code = terrno};
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
tmsgSendRsp(&rsp);
return TSDB_CODE_ACTION_IN_PROGRESS;
}

View File

@ -44,7 +44,7 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, task.type, name->tname);
qDebug("QID:0x%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, task.type, name->tname);
return TSDB_CODE_SUCCESS;
}
@ -143,7 +143,7 @@ int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, task.type, name->tname);
qDebug("QID:0x%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, task.type, name->tname);
return TSDB_CODE_SUCCESS;
}
@ -247,7 +247,7 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
*taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum;
if (*taskNum <= 0) {
ctgError("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, reqId);
ctgDebug("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, reqId);
return TSDB_CODE_SUCCESS;
}

View File

@ -303,13 +303,17 @@ typedef struct SScanInfo {
int32_t numOfDesc;
} SScanInfo;
typedef struct SSampleExecInfo {
double sampleRatio; // data block sample ratio, 1 by default
uint32_t seed; // random seed value
} SSampleExecInfo;
typedef struct STableScanInfo {
void* dataReader;
SReadHandle readHandle;
SFileBlockLoadRecorder readRecorder;
int64_t numOfRows;
int64_t elapsedTime;
// int32_t prevGroupId; // previous table group id
SScanInfo scanInfo;
int32_t scanTimes;
@ -330,7 +334,6 @@ typedef struct STableScanInfo {
SQueryTableDataCond cond;
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
int32_t dataBlockLoadFlag;
double sampleRatio; // data block sample ratio, 1 by default
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
SArray* pGroupCols;
@ -339,6 +342,7 @@ typedef struct STableScanInfo {
int32_t groupKeyLen; // total group by column width
SHashObj* pGroupSet; // quick locate the window object for each result
SSampleExecInfo sample; // sample execution info
int32_t curTWinIdx;
} STableScanInfo;
@ -722,7 +726,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWin
int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes,
int32_t groupId, SDiskbasedBuf* pBuf, SExecTaskInfo* pTaskInfo, SAggSupporter* pAggSup);
void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput);
int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
int32_t setDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList);
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win);
@ -757,7 +761,11 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup);
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, tsdbReaderT pDataReader, SReadHandle* pHandle, SArray* groupKyes, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
@ -768,9 +776,6 @@ SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName,
SNode* pCondition, SEpSet epset, SArray* colList,
SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
@ -788,9 +793,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle,
SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo,
STimeWindowAggSupp* pTwSup);
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle, SArray* pTableIdList,
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
@ -806,7 +810,6 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput, SSDataBlock* pResBlock, SArray* pColMatchInfo, STableListInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap,
@ -824,8 +827,6 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol);
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
int32_t checkForQueryBuf(size_t numOfTables);
@ -860,17 +861,15 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result);
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length);
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts,
SInterval* pInterval, int32_t precision, STimeWindow* win);
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn,
int32_t startPos, TSKEY ekey, __block_search_fn_t searchFn, STableQueryInfo* item,
int32_t order);
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
int32_t precision, STimeWindow* win);
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey,
__block_search_fn_t searchFn, STableQueryInfo* item, int32_t order);
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey);
int32_t initStateAggSupporter(SStreamAggSupporter* pSup, const char* pKey);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize);
SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap,
int32_t* pIndex);
SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap, int32_t* pIndex);
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows,
int32_t start, int64_t gap, SHashObj* pStDeleted);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
@ -878,6 +877,7 @@ int64_t getSmaWaterMark(int64_t interval, double filesFactor);
bool isSmaStream(int8_t triggerType);
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);
#ifdef __cplusplus
}
#endif

View File

@ -28,18 +28,6 @@ typedef struct SCompSupporter {
int32_t order;
} SCompSupporter;
int32_t getRowNumForMultioutput(STaskAttr* pQueryAttr, bool topBottomQuery, bool stable) {
if (pQueryAttr && (!stable)) {
for (int16_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
// if (pQueryAttr->pExpr1[i].base. == FUNCTION_TOP || pQueryAttr->pExpr1[i].base.functionId == FUNCTION_BOTTOM) {
// return (int32_t)pQueryAttr->pExpr1[i].base.param[0].i;
// }
}
}
return 1;
}
int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) {
int32_t size = 0;
@ -113,35 +101,6 @@ void closeResultRow(SResultRow* pResultRow) {
pResultRow->closed = true;
}
void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow) {
if (pResultRow == NULL) {
return;
}
// the result does not put into the SDiskbasedBuf, ignore it.
if (pResultRow->pageId >= 0) {
SFilePage *page = getBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId);
int16_t offset = 0;
for (int32_t i = 0; i < pRuntimeEnv->pQueryAttr->numOfOutput; ++i) {
struct SResultRowEntryInfo *pEntryInfo = NULL;//pResultRow->pEntryInfo[i];
// int16_t size = pRuntimeEnv->pQueryAttr->pExpr1[i].base.resSchema.bytes;
// char * s = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResultRow->offset, offset);
// memset(s, 0, size);
// offset += size;
cleanupResultRowEntry(pEntryInfo);
}
}
pResultRow->numOfRows = 0;
pResultRow->pageId = -1;
pResultRow->offset = -1;
pResultRow->closed = false;
pResultRow->win = TSWINDOW_INITIALIZER;
}
// TODO refactor: use macro
SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, const int32_t* offset) {
assert(index >= 0 && offset != NULL);

View File

@ -2415,6 +2415,9 @@ int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code)
pRsp->compLen = htonl(pRsp->compLen);
pRsp->numOfCols = htonl(pRsp->numOfCols);
pRsp->useconds = htobe64(pRsp->useconds);
ASSERT(pSourceDataInfo->pRsp != NULL);
qDebug("fetch rsp received, index:%d, rows:%d", pSourceDataInfo->index, pRsp->numOfRows);
} else {
pSourceDataInfo->code = code;
}
@ -2463,6 +2466,8 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex);
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY);
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu, GET_TASKID(pTaskInfo),
pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, sourceIndex, totalSources);
@ -2516,7 +2521,7 @@ void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray
}
}
int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
int32_t setDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList) {
if (pColList == NULL) { // data from other sources
@ -2640,7 +2645,6 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
int32_t completed = 0;
for (int32_t i = 0; i < totalSources; ++i) {
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
completed += 1;
continue;
@ -2650,6 +2654,11 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
continue;
}
if (pDataInfo->code != TSDB_CODE_SUCCESS) {
code = pDataInfo->code;
goto _error;
}
SRetrieveTableRsp* pRsp = pDataInfo->pRsp;
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, i);
@ -2667,7 +2676,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
}
SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
code = setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data,
code = setDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data,
pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
if (code != 0) {
taosMemoryFreeClear(pDataInfo->pRsp);
@ -2687,6 +2696,8 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
pLoadInfo->totalSize);
}
taosMemoryFreeClear(pDataInfo->pRsp);
if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) {
pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
@ -2696,7 +2707,6 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
}
}
taosMemoryFreeClear(pDataInfo->pRsp);
return pExchangeInfo->pResult;
}
@ -2710,34 +2720,6 @@ _error:
return NULL;
}
static SSDataBlock* concurrentlyLoadRemoteData(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->status == OP_RES_TO_RETURN) {
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
}
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
int64_t startTs = taosGetTimestampUs();
// Asynchronously send all fetch requests to all sources.
for (int32_t i = 0; i < totalSources; ++i) {
int32_t code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
}
int64_t endTs = taosGetTimestampUs();
qDebug("%s send all fetch request to %" PRIzu " sources completed, elapsed:%" PRId64, GET_TASKID(pTaskInfo),
totalSources, endTs - startTs);
tsem_wait(&pExchangeInfo->ready);
pOperator->status = OP_RES_TO_RETURN;
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
}
static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -2755,10 +2737,11 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
}
int64_t endTs = taosGetTimestampUs();
qDebug("%s send all fetch request to %" PRIzu " sources completed, elapsed:%" PRId64, GET_TASKID(pTaskInfo),
qDebug("%s send all fetch requests to %" PRIzu " sources completed, elapsed:%" PRId64, GET_TASKID(pTaskInfo),
totalSources, endTs - startTs);
tsem_wait(&pExchangeInfo->ready);
pOperator->status = OP_RES_TO_RETURN;
pOperator->cost.openCost = taosGetTimestampUs() - startTs;
return TSDB_CODE_SUCCESS;
@ -2806,7 +2789,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pExchangeInfo->pResult;
SRetrieveTableRsp* pTableRsp = pDataInfo->pRsp;
int32_t code =
setSDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data,
setDataBlockFromFetchRsp(pExchangeInfo->pResult, pLoadInfo, pTableRsp->numOfRows, pTableRsp->data,
pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL);
if (pRsp->completed == 1) {
@ -2872,7 +2855,8 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) {
if (pExchangeInfo->seqLoadData) {
return seqLoadRemoteData(pOperator);
} else {
return concurrentlyLoadRemoteData(pOperator);
return concurrentlyLoadRemoteDataImpl(pOperator, pExchangeInfo, pTaskInfo);
// return concurrentlyLoadRemoteData(pOperator);
}
}
@ -2898,52 +2882,55 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo) {
return TSDB_CODE_SUCCESS;
}
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, const SNodeList* pSources, SSDataBlock* pBlock,
SExecTaskInfo* pTaskInfo) {
static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* pInfo) {
size_t numOfSources = LIST_LENGTH(pExNode->pSrcEndPoints);
pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < numOfSources; ++i) {
SNodeListNode* pNode = nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i);
taosArrayPush(pInfo->pSources, pNode);
}
return initDataSource(numOfSources, pInfo);
}
SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo) {
SExchangeInfo* pInfo = taosMemoryCalloc(1, sizeof(SExchangeInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
size_t numOfSources = LIST_LENGTH(pSources);
pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) {
goto _error;
}
for (int32_t i = 0; i < numOfSources; ++i) {
SNodeListNode* pNode = nodesListGetNode((SNodeList*)pSources, i);
taosArrayPush(pInfo->pSources, pNode);
}
int32_t code = initDataSource(numOfSources, pInfo);
int32_t code = initExchangeOperator(pExNode, pInfo);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pInfo->pResult = pBlock;
pInfo->seqLoadData = true;
pInfo->seqLoadData = false;
pInfo->pTransporter = pTransporter;
pInfo->pResult = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
tsem_init(&pInfo->ready, 0, 0);
pOperator->name = "ExchangeOperator";
pOperator->name = "ExchangeOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = pBlock->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = pInfo->pResult->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
destroyExchangeOperatorInfo, NULL, NULL, NULL);
pInfo->pTransporter = pTransporter;
return pOperator;
_error:
if (pInfo != NULL) {
destroyExchangeOperatorInfo(pInfo, numOfSources);
destroyExchangeOperatorInfo(pInfo, LIST_LENGTH(pExNode->pSrcEndPoints));
}
taosMemoryFreeClear(pInfo);
@ -4397,9 +4384,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc);
return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, pExchange->pSrcEndPoints, pResBlock, pTaskInfo);
return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
@ -4427,41 +4412,16 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return pOperator;
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan;
SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
int32_t numOfOutputCols = 0;
SArray* colList =
extractColMatchInfo(pScanNode->pScanCols, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_COL_ID);
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(
pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList,
pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
return pOperator;
return createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
int32_t code = getTableList(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableListInfo, pTagCond);
if (code != TSDB_CODE_SUCCESS) {
return NULL;
}
int32_t num = 0;
SExprInfo* pExprInfo = createExprInfo(pScanPhyNode->pScanPseudoCols, NULL, &num);
int32_t numOfOutputCols = 0;
SArray* colList = extractColMatchInfo(pScanPhyNode->pScanPseudoCols, pDescNode, &numOfOutputCols, pTaskInfo,
COL_MATCH_FROM_COL_ID);
SOperatorInfo* pOperator =
createTagScanOperatorInfo(pHandle, pExprInfo, num, pResBlock, colList, pTableListInfo, pTaskInfo);
return pOperator;
return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
} else {
ASSERT(0);
}
@ -5022,45 +4982,6 @@ _complete:
return code;
}
void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo) {
const int32_t DEFAULT_RESULT_MSG_SIZE = 1024 * (1024 + 512);
// the minimum number of rows for projection query
const int32_t MIN_ROWS_FOR_PRJ_QUERY = 8192;
const int32_t DEFAULT_MIN_ROWS = 4096;
const float THRESHOLD_RATIO = 0.85f;
// if (isProjQuery(pQueryAttr)) {
// int32_t numOfRes = DEFAULT_RESULT_MSG_SIZE / pQueryAttr->resultRowSize;
// if (numOfRes < MIN_ROWS_FOR_PRJ_QUERY) {
// numOfRes = MIN_ROWS_FOR_PRJ_QUERY;
// }
//
// pResultInfo->capacity = numOfRes;
// } else { // in case of non-prj query, a smaller output buffer will be used.
// pResultInfo->capacity = DEFAULT_MIN_ROWS;
// }
pResultInfo->threshold = (int32_t)(pResultInfo->capacity * THRESHOLD_RATIO);
pResultInfo->totalRows = 0;
}
// TODO refactor
void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters) {
if (pFilter == NULL || numOfFilters == 0) {
return;
}
for (int32_t i = 0; i < numOfFilters; i++) {
if (pFilter[i].filterstr && pFilter[i].pz) {
taosMemoryFree((void*)(pFilter[i].pz));
}
}
taosMemoryFree(pFilter);
}
static void doDestroyTableList(STableListInfo* pTableqinfoList) {
taosArrayDestroy(pTableqinfoList->pTableList);
taosHashCleanup(pTableqinfoList->map);

View File

@ -38,15 +38,26 @@
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
typedef struct SWindowPosition {
int32_t pageId;
int32_t rowId;
} SWindowPosition;
static int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity);
static int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
const char* dbName);
static void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock);
static bool processBlockWithProbability(const SSampleExecInfo *pInfo);
bool processBlockWithProbability(const SSampleExecInfo *pInfo) {
#if 0
if (pInfo->sampleRatio == 1) {
return true;
}
uint32_t val = taosRandR((uint32_t*) &pInfo->seed);
return (val % ((uint32_t)(1/pInfo->sampleRatio))) == 0;
#else
return true;
#endif
}
static void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
for (int32_t i = 0; i < numOfOutput; ++i) {
SWITCH_ORDER(pCtx[i].order);
@ -160,8 +171,6 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
return false;
}
static void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock);
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -281,12 +290,9 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
STimeWindow* pTWindow = &pTableScanInfo->cond.twindows[i];
TSWAP(pTWindow->skey, pTWindow->ekey);
}
SQueryTableDataCond *pCond = &pTableScanInfo->cond;
taosqsort(pCond->twindows,
pCond->numOfTWindows,
sizeof(STimeWindow),
pCond,
compareTimeWindow);
SQueryTableDataCond* pCond = &pTableScanInfo->cond;
taosqsort(pCond->twindows, pCond->numOfTWindows, sizeof(STimeWindow), pCond, compareTimeWindow);
}
void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock) {
@ -317,18 +323,19 @@ void addTagPseudoColumnData(SReadHandle *pHandle, SExprInfo* pPseudoExpr, int32_
} else { // these are tags
STagVal tagVal = {0};
tagVal.cid = pExpr->base.pParam[0].pCol->colId;
const char *p = metaGetTableTagVal(&mr.me, pColInfoData->info.type, &tagVal);
const char* p = metaGetTableTagVal(&mr.me, pColInfoData->info.type, &tagVal);
char *data = NULL;
if(pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL){
data = tTagValToData((const STagVal *)p, false);
}else {
char* data = NULL;
if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
data = tTagValToData((const STagVal*)p, false);
} else {
data = (char*)p;
}
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
colDataAppend(pColInfoData, i, data, (data == NULL));
}
if (data && (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) && p != NULL &&
IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
taosMemoryFree(data);
@ -366,6 +373,12 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
// process this data block based on the probabilities
bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
if (!processThisBlock) {
continue;
}
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info);
uint32_t status = 0;
@ -451,6 +464,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
qDebug("%s\t qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
}
while (pTableScanInfo->scanTimes < total) {
while (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
SSDataBlock* p = doTableScanImpl(pOperator);
@ -550,24 +564,26 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
// pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose
pInfo->readHandle = *readHandle;
pInfo->interval = extractIntervalInfo(pTableScanNode);
pInfo->sampleRatio = pTableScanNode->ratio;
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
pInfo->pResBlock = createResDataBlock(pDescNode);
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
pInfo->dataReader = pDataReader;
pInfo->scanFlag = MAIN_SCAN;
pInfo->pColMatchInfo = pColList;
pInfo->curTWinIdx = 0;
pInfo->readHandle = *readHandle;
pInfo->interval = extractIntervalInfo(pTableScanNode);
pInfo->sample.sampleRatio= pTableScanNode->ratio;
pInfo->sample.seed = taosGetTimestampSec();
pOperator->name = "TableScanOperator"; // for debug purpose
pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired;
pInfo->pResBlock = createResDataBlock(pDescNode);
pInfo->pFilterNode = pTableScanNode->scan.node.pConditions;
pInfo->dataReader = pDataReader;
pInfo->scanFlag = MAIN_SCAN;
pInfo->pColMatchInfo = pColList;
pInfo->curTWinIdx = 0;
pOperator->name = "TableScanOperator"; // for debug purpose
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = numOfCols;
pOperator->pTaskInfo = pTaskInfo;
// for table group
pInfo->pGroupCols = groupKyes;
@ -586,13 +602,13 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
// for non-blocking operator, the open cost is always 0
pOperator->cost.openCost = 0;
return pOperator;
_error:
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
_error:
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
@ -706,12 +722,12 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
taosArrayClear(pInfo->pBlockLists);
}
static bool isSessionWindow(SStreamBlockScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW;
static bool isSessionWindow(SStreamBlockScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW;
}
static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW;
static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE_WINDOW;
}
static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
@ -843,7 +859,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
}
return pInfo->pUpdateRes;
} else {
if (isStateWindow(pInfo) &&
if (isStateWindow(pInfo) &&
taosArrayGetSize(pInfo->sessionSup.pStreamAggSup->pScanWindow) > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
pInfo->updateResIndex = pInfo->pUpdateRes->info.rows;
@ -1429,7 +1445,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
}
SRetrieveMetaTableRsp* pTableRsp = pInfo->pRsp;
setSDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data,
setDataBlockFromFetchRsp(pInfo->pRes, &pInfo->loadInfo, pTableRsp->numOfRows, pTableRsp->data,
pTableRsp->compLen, pOperator->numOfExprs, startTs, NULL, pInfo->scanCols);
// todo log the filter info
@ -1503,48 +1519,59 @@ int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbT
return numOfRows;
}
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pResBlock, const SName* pName,
SNode* pCondition, SEpSet epset, SArray* colList,
SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId) {
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, SExecTaskInfo* pTaskInfo) {
SSysTableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SSysTableScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
goto _error;
}
pInfo->accountId = accountId;
pInfo->showRewrite = showRewrite;
pInfo->pRes = pResBlock;
pInfo->pCondition = pCondition;
pInfo->scanCols = colList;
SScanPhysiNode* pScanNode = &pScanPhyNode->scan;
SDataBlockDescNode* pDescNode = pScanNode->node.pOutputDataBlockDesc;
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
int32_t num = 0;
SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pDescNode, &num, pTaskInfo, COL_MATCH_FROM_COL_ID);
pInfo->accountId = pScanPhyNode->accountId;
pInfo->showRewrite = pScanPhyNode->showRewrite;
pInfo->pRes = pResBlock;
pInfo->pCondition = pScanNode->node.pConditions;
pInfo->scanCols = colList;
initResultSizeInfo(pOperator, 4096);
tNameAssign(&pInfo->name, pName);
tNameAssign(&pInfo->name, &pScanNode->tableName);
const char* name = tNameGetTableName(&pInfo->name);
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
pInfo->readHandle = *(SReadHandle*)readHandle;
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
} else {
tsem_init(&pInfo->ready, 0, 0);
pInfo->epSet = epset;
pInfo->epSet = pScanPhyNode->mgmtEpSet;
pInfo->readHandle = *(SReadHandle*)readHandle;
}
pOperator->name = "SysTableScanOperator";
pOperator->name = "SysTableScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = pResBlock->info.numOfCols;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->numOfExprs = pResBlock->info.numOfCols;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL);
pOperator->pTaskInfo = pTaskInfo;
return pOperator;
_error:
taosMemoryFreeClear(pInfo);
taosMemoryFreeClear(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
@ -1699,28 +1726,34 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, SExprInfo* pExpr, int32_t numOfOutput,
SSDataBlock* pResBlock, SArray* pColMatchInfo, STableListInfo* pTableListInfo,
SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
goto _error;
}
pInfo->pTableList = pTableListInfo;
pInfo->pColMatchInfo = pColMatchInfo;
pInfo->pRes = pResBlock;
pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0;
pOperator->name = "TagScanOperator";
SDataBlockDescNode* pDescNode = pPhyNode->node.pOutputDataBlockDesc;
int32_t numOfExprs = 0;
SExprInfo* pExprInfo = createExprInfo(pPhyNode->pScanPseudoCols, NULL, &numOfExprs);
int32_t num = 0;
SArray* colList = extractColMatchInfo(pPhyNode->pScanPseudoCols, pDescNode, &num, pTaskInfo, COL_MATCH_FROM_COL_ID);
pInfo->pTableList = pTableListInfo;
pInfo->pColMatchInfo = colList;
pInfo->pRes = createResDataBlock(pDescNode);;
pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0;
pOperator->name = "TagScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = pExpr;
pOperator->numOfExprs = numOfOutput;
pOperator->pTaskInfo = pTaskInfo;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pExpr = pExprInfo;
pOperator->numOfExprs = numOfExprs;
pOperator->pTaskInfo = pTaskInfo;
initResultSizeInfo(pOperator, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);

View File

@ -1,3 +1,18 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdatablock.h"
#include "executorimpl.h"

View File

@ -32,7 +32,6 @@ int32_t authenticate(SParseContext* pParseCxt, SQuery* pQuery);
int32_t translate(SParseContext* pParseCxt, SQuery* pQuery);
int32_t extractResultSchema(const SNode* pRoot, int32_t* numOfCols, SSchema** pSchema);
int32_t calculateConstant(SParseContext* pParseCxt, SQuery* pQuery);
int32_t isNotSchemalessDb(SParseContext* pContext, char *dbName);
#ifdef __cplusplus
}

View File

@ -180,11 +180,11 @@ static int32_t collectMetaKeyFromSelect(SCollectMetaKeyCxt* pCxt, SSelectStmt* p
}
static int32_t collectMetaKeyFromCreateTable(SCollectMetaKeyCxt* pCxt, SCreateTableStmt* pStmt) {
if (NULL == pStmt->pTags) {
return reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
} else {
return reserveDbCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
int32_t code = reserveDbCfgInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code && NULL == pStmt->pTags) {
code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tableName, pCxt->pMetaCache);
}
return code;
}
static int32_t collectMetaKeyFromCreateMultiTable(SCollectMetaKeyCxt* pCxt, SCreateMultiTableStmt* pStmt) {
@ -192,8 +192,11 @@ static int32_t collectMetaKeyFromCreateMultiTable(SCollectMetaKeyCxt* pCxt, SCre
SNode* pNode = NULL;
FOREACH(pNode, pStmt->pSubTables) {
SCreateSubTableClause* pClause = (SCreateSubTableClause*)pNode;
code =
reserveTableMetaInCache(pCxt->pParseCxt->acctId, pClause->useDbName, pClause->useTableName, pCxt->pMetaCache);
code = reserveDbCfgInCache(pCxt->pParseCxt->acctId, pClause->dbName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code =
reserveTableMetaInCache(pCxt->pParseCxt->acctId, pClause->useDbName, pClause->useTableName, pCxt->pMetaCache);
}
if (TSDB_CODE_SUCCESS == code) {
code = reserveTableVgroupInCache(pCxt->pParseCxt->acctId, pClause->dbName, pClause->tableName, pCxt->pMetaCache);
}

View File

@ -254,7 +254,7 @@ static int32_t createSName(SName* pName, SToken* pTableName, int32_t acctId, con
static int32_t checkAuth(SInsertParseContext* pCxt, char* pDbFname, bool* pPass) {
SParseContext* pBasicCtx = pCxt->pComCxt;
if (NULL != pCxt->pMetaCache) {
if (pBasicCtx->async) {
return getUserAuthFromCache(pCxt->pMetaCache, pBasicCtx->pUser, pDbFname, AUTH_TYPE_WRITE, pPass);
}
return catalogChkAuth(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pBasicCtx->pUser, pDbFname,
@ -263,7 +263,7 @@ static int32_t checkAuth(SInsertParseContext* pCxt, char* pDbFname, bool* pPass)
static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool isStb, STableMeta** pTableMeta) {
SParseContext* pBasicCtx = pCxt->pComCxt;
if (NULL != pCxt->pMetaCache) {
if (pBasicCtx->async) {
return getTableMetaFromCache(pCxt->pMetaCache, pTbName, pTableMeta);
}
if (isStb) {
@ -275,7 +275,7 @@ static int32_t getTableSchema(SInsertParseContext* pCxt, SName* pTbName, bool is
static int32_t getTableVgroup(SInsertParseContext* pCxt, SName* pTbName, SVgroupInfo* pVg) {
SParseContext* pBasicCtx = pCxt->pComCxt;
if (NULL != pCxt->pMetaCache) {
if (pBasicCtx->async) {
return getTableVgroupFromCache(pCxt->pMetaCache, pTbName, pVg);
}
return catalogGetTableHashVgroup(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pTbName, pVg);
@ -305,6 +305,16 @@ static int32_t getSTableMeta(SInsertParseContext* pCxt, SName* name, char* dbFna
return getTableMetaImpl(pCxt, name, dbFname, true);
}
static int32_t getDBCfg(SInsertParseContext* pCxt, const char* pDbFName, SDbCfgInfo* pInfo) {
SParseContext* pBasicCtx = pCxt->pComCxt;
if (pBasicCtx->async) {
CHECK_CODE(getDbCfgFromCache(pCxt->pMetaCache, pDbFName, pInfo));
} else {
CHECK_CODE(catalogGetDBCfg(pBasicCtx->pCatalog, pBasicCtx->pTransporter, &pBasicCtx->mgmtEpSet, pDbFName, pInfo));
}
return TSDB_CODE_SUCCESS;
}
static int32_t findCol(SToken* pColname, int32_t start, int32_t end, SSchema* pSchema) {
while (start < end) {
if (strlen(pSchema[start].name) == pColname->n && strncmp(pColname->z, pSchema[start].name, pColname->n) == 0) {
@ -1090,10 +1100,10 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SName* name, char* tb
SName sname;
createSName(&sname, &sToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg);
char stbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&sname, stbFName);
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(&sname, dbFName);
CHECK_CODE(getSTableMeta(pCxt, &sname, stbFName));
CHECK_CODE(getSTableMeta(pCxt, &sname, dbFName));
if (TSDB_SUPER_TABLE != pCxt->pTableMeta->tableType) {
return buildInvalidOperationMsg(&pCxt->msg, "create table only from super table is allowed");
}
@ -1282,6 +1292,14 @@ static void destroyInsertParseContext(SInsertParseContext* pCxt) {
destroyBlockArrayList(pCxt->pVgDataBlocks);
}
static int32_t checkSchemalessDb(SInsertParseContext* pCxt, char* pDbName) {
SDbCfgInfo pInfo = {0};
char fullName[TSDB_TABLE_FNAME_LEN];
snprintf(fullName, sizeof(fullName), "%d.%s", pCxt->pComCxt->acctId, pDbName);
CHECK_CODE(getDBCfg(pCxt, fullName, &pInfo));
return pInfo.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS;
}
// tb_name
// [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
// [(field1_name, ...)]
@ -1335,7 +1353,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
SName name;
CHECK_CODE(createSName(&name, &tbnameToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
CHECK_CODE(isNotSchemalessDb(pCxt->pComCxt, name.dbname));
CHECK_CODE(checkSchemalessDb(pCxt, name.dbname));
tNameExtractFullName(&name, tbFName);
CHECK_CODE(taosHashPut(pCxt->pTableNameHashObj, tbFName, strlen(tbFName), &name, sizeof(SName)));
@ -1413,25 +1431,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
return buildOutput(pCxt);
}
int32_t isNotSchemalessDb(SParseContext* pContext, char *dbName){
SName name;
tNameSetDbName(&name, pContext->acctId, dbName, strlen(dbName));
char dbFname[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&name, dbFname);
SDbCfgInfo pInfo = {0};
int32_t code = catalogGetDBCfg(pContext->pCatalog, pContext->pTransporter, &pContext->mgmtEpSet, dbFname, &pInfo);
if (code != TSDB_CODE_SUCCESS) {
parserError("catalogGetDBCfg error, code:%s, dbFName:%s", tstrerror(code), dbFname);
return code;
}
taosArrayDestroy(pInfo.pRetensions);
if (pInfo.schemaless){
parserError("can not insert into schemaless db:%s", dbFname);
return TSDB_CODE_SML_INVALID_DB_CONF;
}
return TSDB_CODE_SUCCESS;
}
// INSERT INTO
// tb_name
// [USING stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)]
@ -1474,6 +1473,8 @@ int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery) {
if (NULL == *pQuery) {
return TSDB_CODE_OUT_OF_MEMORY;
}
} else {
context.pMetaCache = (*pQuery)->pMetaCache;
}
(*pQuery)->execMode = QUERY_EXEC_MODE_SCHEDULE;
(*pQuery)->haveResultSet = false;
@ -1588,12 +1589,20 @@ static int32_t skipUsingClause(SInsertParseSyntaxCxt* pCxt) {
static int32_t collectTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) {
SName name;
CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
CHECK_CODE(reserveDbCfgInCache(pCxt->pComCxt->acctId, name.dbname, pCxt->pMetaCache));
CHECK_CODE(reserveUserAuthInCacheExt(pCxt->pComCxt->pUser, &name, AUTH_TYPE_WRITE, pCxt->pMetaCache));
CHECK_CODE(reserveTableMetaInCacheExt(&name, pCxt->pMetaCache));
CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache));
return TSDB_CODE_SUCCESS;
}
static int32_t collectAutoCreateTableMetaKey(SInsertParseSyntaxCxt* pCxt, SToken* pTbToken) {
SName name;
CHECK_CODE(createSName(&name, pTbToken, pCxt->pComCxt->acctId, pCxt->pComCxt->db, &pCxt->msg));
CHECK_CODE(reserveTableVgroupInCacheExt(&name, pCxt->pMetaCache));
return TSDB_CODE_SUCCESS;
}
static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) {
bool hasData = false;
// for each table
@ -1622,6 +1631,7 @@ static int32_t parseInsertBodySyntax(SInsertParseSyntaxCxt* pCxt) {
// USING clause
if (TK_USING == sToken.type) {
CHECK_CODE(collectAutoCreateTableMetaKey(pCxt, &tbnameToken));
NEXT_TOKEN(pCxt->pSql, sToken);
CHECK_CODE(collectTableMetaKey(pCxt, &sToken));
CHECK_CODE(skipUsingClause(pCxt));
@ -2174,8 +2184,8 @@ static int32_t smlBuildTagRow(SArray* cols, SParsedDataColInfo* tags, SSchema* p
val.nData = kv->length;
} else if (pTagSchema->type == TSDB_DATA_TYPE_NCHAR) {
int32_t output = 0;
void *p = taosMemoryCalloc(1, kv->length * TSDB_NCHAR_SIZE);
if(p == NULL){
void* p = taosMemoryCalloc(1, kv->length * TSDB_NCHAR_SIZE);
if (p == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto end;
}

View File

@ -2653,8 +2653,23 @@ static int32_t checkTableSchema(STranslateContext* pCxt, SCreateTableStmt* pStmt
return code;
}
static int32_t checkSchemalessDb(STranslateContext* pCxt, const char* pDbName) {
if (0 != pCxt->pParseCxt->schemalessType) {
return TSDB_CODE_SUCCESS;
}
SDbCfgInfo info = {0};
int32_t code = getDBCfg(pCxt, pDbName, &info);
if (TSDB_CODE_SUCCESS == code) {
code = info.schemaless ? TSDB_CODE_SML_INVALID_DB_CONF : TSDB_CODE_SUCCESS;
}
return code;
}
static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt) {
int32_t code = checTableFactorOption(pCxt, pStmt->pOptions->filesFactor);
int32_t code = checkSchemalessDb(pCxt, pStmt->dbName);
if (TSDB_CODE_SUCCESS == code) {
code = checTableFactorOption(pCxt, pStmt->pOptions->filesFactor);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkTableRollupOption(pCxt, pStmt->pOptions->pRollupFuncs);
}
@ -2667,11 +2682,6 @@ static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt
if (TSDB_CODE_SUCCESS == code) {
code = checkTableSchema(pCxt, pStmt);
}
if (TSDB_CODE_SUCCESS == code) {
if (pCxt->pParseCxt->schemalessType == 0) {
code = isNotSchemalessDb(pCxt->pParseCxt, pStmt->dbName);
}
}
return code;
}
@ -4445,11 +4455,11 @@ static int32_t rewriteCreateMultiTable(STranslateContext* pCxt, SQuery* pQuery)
int32_t code = TSDB_CODE_SUCCESS;
SNode* pNode;
FOREACH(pNode, pStmt->pSubTables) {
if (pCxt->pParseCxt->schemalessType == 0 &&
(code = isNotSchemalessDb(pCxt->pParseCxt, ((SCreateSubTableClause*)pNode)->dbName)) != TSDB_CODE_SUCCESS) {
return code;
SCreateSubTableClause* pClause = (SCreateSubTableClause*)pNode;
code = checkSchemalessDb(pCxt, pClause->dbName);
if (TSDB_CODE_SUCCESS == code) {
code = rewriteCreateSubTable(pCxt, pClause, pVgroupHashmap);
}
code = rewriteCreateSubTable(pCxt, (SCreateSubTableClause*)pNode, pVgroupHashmap);
if (TSDB_CODE_SUCCESS != code) {
taosHashCleanup(pVgroupHashmap);
return code;
@ -4860,13 +4870,8 @@ static int32_t buildModifyVnodeArray(STranslateContext* pCxt, SAlterTableStmt* p
static int32_t rewriteAlterTable(STranslateContext* pCxt, SQuery* pQuery) {
SAlterTableStmt* pStmt = (SAlterTableStmt*)pQuery->pRoot;
int32_t code = TSDB_CODE_SUCCESS;
if (pCxt->pParseCxt->schemalessType == 0 &&
(code = isNotSchemalessDb(pCxt->pParseCxt, pStmt->dbName)) != TSDB_CODE_SUCCESS) {
return code;
}
STableMeta* pTableMeta = NULL;
int32_t code = checkSchemalessDb(pCxt, pStmt->dbName);
STableMeta* pTableMeta = NULL;
code = getTableMeta(pCxt, pStmt->dbName, pStmt->tableName, &pTableMeta);
if (TSDB_CODE_SUCCESS != code) {
return code;

View File

@ -681,6 +681,7 @@ int32_t getTableMetaFromCache(SParseMetaCache* pMetaCache, const SName* pName, S
tNameExtractFullName(pName, fullName);
STableMeta** pRes = taosHashGet(pMetaCache->pTableMeta, fullName, strlen(fullName));
if (NULL == pRes || NULL == *pRes) {
parserError("getTableMetaFromCache error: %s", fullName);
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
*pMeta = tableMetaDup(*pRes);
@ -709,6 +710,7 @@ int32_t reserveDbVgInfoInCache(int32_t acctId, const char* pDb, SParseMetaCache*
int32_t getDbVgInfoFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SArray** pVgInfo) {
SArray** pRes = taosHashGet(pMetaCache->pDbVgroup, pDbFName, strlen(pDbFName));
if (NULL == pRes) {
parserError("getDbVgInfoFromCache error: %s", pDbFName);
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
// *pRes is null, which is a legal value, indicating that the user DB has not been created
@ -736,6 +738,7 @@ int32_t getTableVgroupFromCache(SParseMetaCache* pMetaCache, const SName* pName,
tNameExtractFullName(pName, fullName);
SVgroupInfo** pRes = taosHashGet(pMetaCache->pTableVgroup, fullName, strlen(fullName));
if (NULL == pRes || NULL == *pRes) {
parserError("getTableVgroupFromCache error: %s", fullName);
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
memcpy(pVgroup, *pRes, sizeof(SVgroupInfo));
@ -750,6 +753,7 @@ int32_t getDbVgVersionFromCache(SParseMetaCache* pMetaCache, const char* pDbFNam
int32_t* pTableNum) {
SDbInfo** pRes = taosHashGet(pMetaCache->pDbCfg, pDbFName, strlen(pDbFName));
if (NULL == pRes || NULL == *pRes) {
parserError("getDbVgVersionFromCache error: %s", pDbFName);
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
*pVersion = (*pRes)->vgVer;
@ -765,6 +769,7 @@ int32_t reserveDbCfgInCache(int32_t acctId, const char* pDb, SParseMetaCache* pM
int32_t getDbCfgFromCache(SParseMetaCache* pMetaCache, const char* pDbFName, SDbCfgInfo* pInfo) {
SDbCfgInfo** pRes = taosHashGet(pMetaCache->pDbCfg, pDbFName, strlen(pDbFName));
if (NULL == pRes || NULL == *pRes) {
parserError("getDbCfgFromCache error: %s", pDbFName);
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
memcpy(pInfo, *pRes, sizeof(SDbCfgInfo));
@ -803,6 +808,7 @@ int32_t getUserAuthFromCache(SParseMetaCache* pMetaCache, const char* pUser, con
int32_t len = userAuthToStringExt(pUser, pDbFName, type, key);
bool* pRes = taosHashGet(pMetaCache->pUserAuth, key, len);
if (NULL == pRes) {
parserError("getUserAuthFromCache error: %s, %s, %d", pUser, pDbFName, type);
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
*pPass = *pRes;
@ -822,6 +828,7 @@ int32_t reserveUdfInCache(const char* pFunc, SParseMetaCache* pMetaCache) {
int32_t getUdfInfoFromCache(SParseMetaCache* pMetaCache, const char* pFunc, SFuncInfo* pInfo) {
SFuncInfo** pRes = taosHashGet(pMetaCache->pUdf, pFunc, strlen(pFunc));
if (NULL == pRes || NULL == *pRes) {
parserError("getUdfInfoFromCache error: %s", pFunc);
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
memcpy(pInfo, *pRes, sizeof(SFuncInfo));

View File

@ -59,6 +59,7 @@ class InsertTest : public Test {
}
int32_t runAsync() {
cxt_.async = true;
code_ = parseInsertSyntax(&cxt_, &res_);
if (code_ != TSDB_CODE_SUCCESS) {
cout << "parseInsertSyntax code:" << toString(code_) << ", msg:" << errMagBuf_ << endl;

View File

@ -78,6 +78,15 @@ if $data(4)[2] != 1 then
endi
# v1_dnode
$hasleader = 0
$x = 0
step2:
$x = $x + 1
sleep 1000
if $x == 20 then
print ====> dnode not ready!
return -1
endi
sql show db.vgroups
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08
if $data(2)[3] != 4 then
@ -89,6 +98,18 @@ endi
if $data(2)[7] != 2 then
return -1
endi
if $data(2)[4] == leader then
$hasleader = 1
endi
if $data(2)[6] == leader then
$hasleader = 1
endi
if $data(2)[8] == leader then
$hasleader = 1
endi
if $hasleader != 1 then
goto step2
endi
sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 binary(16)) comment "abd"
sql create table db.ctb using db.stb tags(101, "102")

View File

@ -3,8 +3,8 @@
#run tsim/user/privilege2.sim
#run tsim/user/user_len.sim
#run tsim/user/privilege1.sim
##run tsim/user/pass_len.sim
##run tsim/table/basic1.sim
#run tsim/user/pass_len.sim
#run tsim/table/basic1.sim
#run tsim/trans/lossdata1.sim
#run tsim/trans/create_db.sim
##run tsim/stable/alter_metrics.sim
@ -13,20 +13,20 @@
##run tsim/stable/column_drop.sim
##run tsim/stable/column_modify.sim
##run tsim/stable/tag_rename.sim
##run tsim/stable/vnode3.sim
##run tsim/stable/metrics.sim
#run tsim/stable/vnode3.sim
#run tsim/stable/metrics.sim
##run tsim/stable/alter_insert2.sim
##run tsim/stable/show.sim
##run tsim/stable/alter_import.sim
#run tsim/stable/show.sim
#run tsim/stable/alter_import.sim
##run tsim/stable/tag_add.sim
##run tsim/stable/tag_drop.sim
##run tsim/stable/column_add.sim
##run tsim/stable/alter_count.sim
##run tsim/stable/values.sim
##run tsim/stable/dnode3.sim
##run tsim/stable/alter_insert1.sim
##run tsim/stable/refcount.sim
##run tsim/stable/disk.sim
#run tsim/stable/tag_drop.sim
#run tsim/stable/column_add.sim
#run tsim/stable/alter_count.sim
#run tsim/stable/values.sim
#run tsim/stable/dnode3.sim
#run tsim/stable/alter_insert1.sim
#run tsim/stable/refcount.sim
#run tsim/stable/disk.sim
#run tsim/db/basic1.sim
#run tsim/db/basic3.sim
##run tsim/db/basic7.sim
@ -40,24 +40,24 @@
#run tsim/mnode/basic3.sim
#run tsim/mnode/basic2.sim
#run tsim/parser/fourArithmetic-basic.sim
run tsim/parser/groupby-basic.sim
run tsim/snode/basic1.sim
run tsim/query/time_process.sim
run tsim/query/stddev.sim
run tsim/query/interval-offset.sim
run tsim/query/charScalarFunction.sim
run tsim/query/complex_select.sim
run tsim/query/explain.sim
run tsim/query/crash_sql.sim
run tsim/query/diff.sim
run tsim/query/complex_limit.sim
run tsim/query/complex_having.sim
run tsim/query/udf.sim
run tsim/query/complex_group.sim
run tsim/query/interval.sim
run tsim/query/session.sim
run tsim/query/scalarFunction.sim
run tsim/query/scalarNull.sim
#run tsim/parser/groupby-basic.sim
#run tsim/snode/basic1.sim
#run tsim/query/time_process.sim
#run tsim/query/stddev.sim
#run tsim/query/interval-offset.sim
#run tsim/query/charScalarFunction.sim
#run tsim/query/complex_select.sim
#run tsim/query/explain.sim
#run tsim/query/crash_sql.sim
#run tsim/query/diff.sim
#run tsim/query/complex_limit.sim
#run tsim/query/complex_having.sim
#run tsim/query/udf.sim
#run tsim/query/complex_group.sim
#run tsim/query/interval.sim
#run tsim/query/session.sim
#run tsim/query/scalarFunction.sim
##run tsim/query/scalarNull.sim
run tsim/query/complex_where.sim
run tsim/tmq/basic1.sim
run tsim/tmq/basic4.sim