Merge branch '3.0' into feature/vnode
This commit is contained in:
commit
85218348ab
|
@ -587,10 +587,6 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int32_t code;
|
||||
union {
|
||||
uint64_t qhandle;
|
||||
uint64_t qId;
|
||||
}; // query handle
|
||||
} SQueryTableRsp;
|
||||
|
||||
// todo: the show handle should be replaced with id
|
||||
|
@ -1121,6 +1117,10 @@ typedef struct SResReadyMsg {
|
|||
uint64_t taskId;
|
||||
} SResReadyMsg;
|
||||
|
||||
typedef struct SResReadyRsp {
|
||||
int32_t code;
|
||||
} SResReadyRsp;
|
||||
|
||||
typedef struct SResFetchMsg {
|
||||
uint64_t schedulerId;
|
||||
uint64_t queryId;
|
||||
|
@ -1149,12 +1149,20 @@ typedef struct STaskCancelMsg {
|
|||
uint64_t taskId;
|
||||
} STaskCancelMsg;
|
||||
|
||||
typedef struct STaskCancelRsp {
|
||||
int32_t code;
|
||||
} STaskCancelRsp;
|
||||
|
||||
typedef struct STaskDropMsg {
|
||||
uint64_t schedulerId;
|
||||
uint64_t queryId;
|
||||
uint64_t taskId;
|
||||
} STaskDropMsg;
|
||||
|
||||
typedef struct STaskDropRsp {
|
||||
int32_t code;
|
||||
} STaskDropRsp;
|
||||
|
||||
|
||||
#pragma pack(pop)
|
||||
|
||||
|
|
|
@ -42,17 +42,17 @@ typedef struct {
|
|||
|
||||
int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt);
|
||||
|
||||
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg **rsp);
|
||||
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
|
||||
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp);
|
||||
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
|
||||
int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp);
|
||||
int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
|
||||
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp);
|
||||
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
|
||||
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp);
|
||||
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
|
||||
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp);
|
||||
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
|
||||
|
||||
void qWorkerDestroy(void **qWorkerMgmt);
|
||||
|
||||
|
|
|
@ -113,6 +113,13 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
|
|||
return taosConnectImpl(ip, user, &secretEncrypt[0], db, port, NULL, NULL, *pInst);
|
||||
}
|
||||
|
||||
static bool supportedQueryType(int32_t type) {
|
||||
return (type == TSDB_MSG_TYPE_CREATE_USER || type == TSDB_MSG_TYPE_SHOW || type == TSDB_MSG_TYPE_DROP_USER ||
|
||||
type == TSDB_MSG_TYPE_DROP_ACCT || type == TSDB_MSG_TYPE_CREATE_DB || type == TSDB_MSG_TYPE_CREATE_ACCT ||
|
||||
type == TSDB_MSG_TYPE_CREATE_TABLE || type == TSDB_MSG_TYPE_CREATE_STB || type == TSDB_MSG_TYPE_USE_DB ||
|
||||
type == TSDB_MSG_TYPE_DROP_DB || type == TSDB_MSG_TYPE_DROP_STB);
|
||||
}
|
||||
|
||||
TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
|
||||
STscObj *pTscObj = (STscObj *)taos;
|
||||
if (sqlLen > (size_t) tsMaxSQLStringLen) {
|
||||
|
@ -145,16 +152,17 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
|
|||
tscDebugL("0x%"PRIx64" SQL: %s", pRequest->requestId, pRequest->sqlstr);
|
||||
|
||||
SParseContext cxt = {
|
||||
.ctx = {.requestId = pRequest->requestId, .acctId = pTscObj->acctId, .db = getConnectionDB(pTscObj)},
|
||||
.pSql = pRequest->sqlstr,
|
||||
.ctx = {.requestId = pRequest->requestId, .acctId = pTscObj->acctId, .db = getConnectionDB(pTscObj)},
|
||||
.pSql = pRequest->sqlstr,
|
||||
.sqlLen = sqlLen,
|
||||
.pMsg = pRequest->msgBuf,
|
||||
.pMsg = pRequest->msgBuf,
|
||||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE
|
||||
};
|
||||
|
||||
SQueryNode* pQuery = NULL;
|
||||
int32_t code = qParseQuerySql(&cxt, &pQuery);
|
||||
if (qIsDclQuery(pQuery)) {
|
||||
SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery;
|
||||
SDclStmtInfo* pDcl = (SDclStmtInfo*) pQuery;
|
||||
pRequest->type = pDcl->msgType;
|
||||
pRequest->body.requestMsg = (SReqMsgInfo){.pMsg = pDcl->pMsg, .len = pDcl->msgLen};
|
||||
|
||||
|
@ -215,7 +223,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) {
|
|||
int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) {
|
||||
pEpSet->version = 0;
|
||||
|
||||
// init mgmt ip set
|
||||
// init mnode ip set
|
||||
SEpSet *mgmtEpSet = &(pEpSet->epSet);
|
||||
mgmtEpSet->numOfEps = 0;
|
||||
mgmtEpSet->inUse = 0;
|
||||
|
|
|
@ -110,9 +110,9 @@ SRequestMsgBody buildRequestMsgImpl(SRequestObj *pRequest) {
|
|||
assert(pRequest != NULL);
|
||||
SRequestMsgBody body = {
|
||||
.requestObjRefId = pRequest->self,
|
||||
.msgInfo = pRequest->body.requestMsg,
|
||||
.msgType = pRequest->type,
|
||||
.requestId = pRequest->requestId,
|
||||
.msgInfo = pRequest->body.requestMsg,
|
||||
.msgType = pRequest->type,
|
||||
.requestId = pRequest->requestId,
|
||||
};
|
||||
return body;
|
||||
}
|
||||
|
|
|
@ -49,101 +49,101 @@ int main(int argc, char** argv) {
|
|||
|
||||
TEST(testCase, driverInit_Test) { taos_init(); }
|
||||
|
||||
// TEST(testCase, connect_Test) {
|
||||
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
//// assert(pConn != NULL);
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
//
|
||||
// TEST(testCase, create_user_Test) {
|
||||
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
TEST(testCase, connect_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != NULL);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'");
|
||||
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
// }
|
||||
//
|
||||
// taos_free_result(pRes);
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
//
|
||||
// TEST(testCase, create_account_Test) {
|
||||
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, create_user_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'");
|
||||
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, create_account_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'");
|
||||
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, drop_account_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "drop account aabc");
|
||||
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, show_user_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != NULL);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'");
|
||||
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
// }
|
||||
//
|
||||
// taos_free_result(pRes);
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
//
|
||||
// TEST(testCase, drop_account_Test) {
|
||||
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "show users");
|
||||
TAOS_ROW pRow = NULL;
|
||||
|
||||
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
int32_t numOfFields = taos_num_fields(pRes);
|
||||
|
||||
char str[512] = {0};
|
||||
while((pRow = taos_fetch_row(pRes)) != NULL) {
|
||||
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
||||
printf("%s\n", str);
|
||||
}
|
||||
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, drop_user_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "drop user abc");
|
||||
if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, show_db_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != NULL);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "drop account aabc");
|
||||
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
// }
|
||||
//
|
||||
// taos_free_result(pRes);
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
//
|
||||
// TEST(testCase, show_user_Test) {
|
||||
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
//// assert(pConn != NULL);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "show users");
|
||||
// TAOS_ROW pRow = NULL;
|
||||
//
|
||||
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
// int32_t numOfFields = taos_num_fields(pRes);
|
||||
//
|
||||
// char str[512] = {0};
|
||||
// while((pRow = taos_fetch_row(pRes)) != NULL) {
|
||||
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
||||
// printf("%s\n", str);
|
||||
// }
|
||||
//
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
//
|
||||
// TEST(testCase, drop_user_Test) {
|
||||
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != NULL);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "drop user abc");
|
||||
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||
// printf("failed to create user, reason:%s\n", taos_errstr(pRes));
|
||||
// }
|
||||
//
|
||||
// taos_free_result(pRes);
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
//
|
||||
// TEST(testCase, show_db_Test) {
|
||||
// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
//// assert(pConn != NULL);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "show databases");
|
||||
// TAOS_ROW pRow = NULL;
|
||||
//
|
||||
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
// int32_t numOfFields = taos_num_fields(pRes);
|
||||
//
|
||||
// char str[512] = {0};
|
||||
// while((pRow = taos_fetch_row(pRes)) != NULL) {
|
||||
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
||||
// printf("%s\n", str);
|
||||
// }
|
||||
//
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "show databases");
|
||||
TAOS_ROW pRow = NULL;
|
||||
|
||||
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
int32_t numOfFields = taos_num_fields(pRes);
|
||||
|
||||
char str[512] = {0};
|
||||
while((pRow = taos_fetch_row(pRes)) != NULL) {
|
||||
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
||||
printf("%s\n", str);
|
||||
}
|
||||
|
||||
taos_close(pConn);
|
||||
}
|
||||
|
||||
TEST(testCase, create_db_Test) {
|
||||
TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0);
|
||||
|
|
|
@ -27,24 +27,25 @@ Testbase DndTestDb::test;
|
|||
|
||||
TEST_F(DndTestDb, 01_ShowDb) {
|
||||
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, "");
|
||||
CHECK_META("show databases", 17);
|
||||
CHECK_META("show databases", 18);
|
||||
CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN - 1 + VARSTR_HEADER_SIZE, "name");
|
||||
CHECK_SCHEMA(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time");
|
||||
CHECK_SCHEMA(2, TSDB_DATA_TYPE_SMALLINT, 2, "vgroups");
|
||||
CHECK_SCHEMA(3, TSDB_DATA_TYPE_SMALLINT, 2, "replica");
|
||||
CHECK_SCHEMA(4, TSDB_DATA_TYPE_SMALLINT, 2, "quorum");
|
||||
CHECK_SCHEMA(5, TSDB_DATA_TYPE_SMALLINT, 2, "days");
|
||||
CHECK_SCHEMA(6, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "keep0,keep1,keep2");
|
||||
CHECK_SCHEMA(7, TSDB_DATA_TYPE_INT, 4, "cache");
|
||||
CHECK_SCHEMA(8, TSDB_DATA_TYPE_INT, 4, "blocks");
|
||||
CHECK_SCHEMA(9, TSDB_DATA_TYPE_INT, 4, "minrows");
|
||||
CHECK_SCHEMA(10, TSDB_DATA_TYPE_INT, 4, "maxrows");
|
||||
CHECK_SCHEMA(11, TSDB_DATA_TYPE_TINYINT, 1, "wallevel");
|
||||
CHECK_SCHEMA(12, TSDB_DATA_TYPE_INT, 4, "fsync");
|
||||
CHECK_SCHEMA(13, TSDB_DATA_TYPE_TINYINT, 1, "comp");
|
||||
CHECK_SCHEMA(14, TSDB_DATA_TYPE_TINYINT, 1, "cachelast");
|
||||
CHECK_SCHEMA(15, TSDB_DATA_TYPE_BINARY, 3 + VARSTR_HEADER_SIZE, "precision");
|
||||
CHECK_SCHEMA(16, TSDB_DATA_TYPE_TINYINT, 1, "update");
|
||||
CHECK_SCHEMA(3, TSDB_DATA_TYPE_INT, 4, "ntables");
|
||||
CHECK_SCHEMA(4, TSDB_DATA_TYPE_SMALLINT, 2, "replica");
|
||||
CHECK_SCHEMA(5, TSDB_DATA_TYPE_SMALLINT, 2, "quorum");
|
||||
CHECK_SCHEMA(6, TSDB_DATA_TYPE_SMALLINT, 2, "days");
|
||||
CHECK_SCHEMA(7, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "keep0,keep1,keep2");
|
||||
CHECK_SCHEMA(8, TSDB_DATA_TYPE_INT, 4, "cache");
|
||||
CHECK_SCHEMA(9, TSDB_DATA_TYPE_INT, 4, "blocks");
|
||||
CHECK_SCHEMA(10, TSDB_DATA_TYPE_INT, 4, "minrows");
|
||||
CHECK_SCHEMA(11, TSDB_DATA_TYPE_INT, 4, "maxrows");
|
||||
CHECK_SCHEMA(12, TSDB_DATA_TYPE_TINYINT, 1, "wallevel");
|
||||
CHECK_SCHEMA(13, TSDB_DATA_TYPE_INT, 4, "fsync");
|
||||
CHECK_SCHEMA(14, TSDB_DATA_TYPE_TINYINT, 1, "comp");
|
||||
CHECK_SCHEMA(15, TSDB_DATA_TYPE_TINYINT, 1, "cachelast");
|
||||
CHECK_SCHEMA(16, TSDB_DATA_TYPE_BINARY, 3 + VARSTR_HEADER_SIZE, "precision");
|
||||
CHECK_SCHEMA(17, TSDB_DATA_TYPE_TINYINT, 1, "update");
|
||||
|
||||
test.SendShowRetrieveMsg();
|
||||
EXPECT_EQ(test.GetShowRows(), 0);
|
||||
|
@ -82,13 +83,14 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
|
|||
}
|
||||
|
||||
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, "");
|
||||
CHECK_META("show databases", 17);
|
||||
CHECK_META("show databases", 18);
|
||||
|
||||
test.SendShowRetrieveMsg();
|
||||
EXPECT_EQ(test.GetShowRows(), 1);
|
||||
CheckBinary("d1", TSDB_DB_NAME_LEN - 1);
|
||||
CheckTimestamp();
|
||||
CheckInt16(2); // vgroups
|
||||
CheckInt32(0); // ntables
|
||||
CheckInt16(1); // replica
|
||||
CheckInt16(1); // quorum
|
||||
CheckInt16(10); // days
|
||||
|
@ -147,6 +149,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
|
|||
CheckBinary("d1", TSDB_DB_NAME_LEN - 1);
|
||||
CheckTimestamp();
|
||||
CheckInt16(2); // vgroups
|
||||
CheckInt32(0);
|
||||
CheckInt16(1); // replica
|
||||
CheckInt16(2); // quorum
|
||||
CheckInt16(10); // days
|
||||
|
@ -166,7 +169,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
|
|||
test.Restart();
|
||||
|
||||
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, "");
|
||||
CHECK_META("show databases", 17);
|
||||
CHECK_META("show databases", 18);
|
||||
|
||||
test.SendShowRetrieveMsg();
|
||||
EXPECT_EQ(test.GetShowRows(), 1);
|
||||
|
@ -174,6 +177,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
|
|||
CheckBinary("d1", TSDB_DB_NAME_LEN - 1);
|
||||
CheckTimestamp();
|
||||
CheckInt16(2); // vgroups
|
||||
CheckInt32(0);
|
||||
CheckInt16(1); // replica
|
||||
CheckInt16(2); // quorum
|
||||
CheckInt16(10); // days
|
||||
|
@ -201,7 +205,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) {
|
|||
}
|
||||
|
||||
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, "");
|
||||
CHECK_META("show databases", 17);
|
||||
CHECK_META("show databases", 18);
|
||||
|
||||
test.SendShowRetrieveMsg();
|
||||
EXPECT_EQ(test.GetShowRows(), 0);
|
||||
|
@ -239,7 +243,7 @@ TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) {
|
|||
}
|
||||
|
||||
test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, "");
|
||||
CHECK_META("show databases", 17);
|
||||
CHECK_META("show databases", 18);
|
||||
|
||||
test.SendShowRetrieveMsg();
|
||||
EXPECT_EQ(test.GetShowRows(), 1);
|
||||
|
|
|
@ -876,6 +876,12 @@ static int32_t mndGetDbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMe
|
|||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 4;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_INT;
|
||||
strcpy(pSchema[cols].name, "ntables");
|
||||
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
|
||||
cols++;
|
||||
|
||||
pShow->bytes[cols] = 2;
|
||||
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
|
||||
strcpy(pSchema[cols].name, "replica");
|
||||
|
@ -1017,6 +1023,10 @@ static int32_t mndRetrieveDbs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3
|
|||
*(int16_t *)pWrite = pDb->cfg.numOfVgroups;
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int16_t *)pWrite = 0; // todo
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
*(int16_t *)pWrite = pDb->cfg.replications;
|
||||
cols++;
|
||||
|
|
|
@ -15,9 +15,10 @@ target_link_libraries(
|
|||
PUBLIC wal
|
||||
PUBLIC sync
|
||||
PUBLIC cjson
|
||||
PUBLIC qworker
|
||||
)
|
||||
|
||||
# test
|
||||
if(${BUILD_TEST})
|
||||
add_subdirectory(test)
|
||||
endif(${BUILD_TEST})
|
||||
endif(${BUILD_TEST})
|
||||
|
|
|
@ -34,6 +34,7 @@
|
|||
#include "vnodeRequest.h"
|
||||
#include "vnodeStateMgr.h"
|
||||
#include "vnodeSync.h"
|
||||
#include "vnodeQuery.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -72,6 +73,7 @@ struct SVnode {
|
|||
SVnodeSync* pSync;
|
||||
SVnodeFS* pFs;
|
||||
tsem_t canCommit;
|
||||
void* pQuery;
|
||||
};
|
||||
|
||||
int vnodeScheduleTask(SVnodeTask* task);
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_VNODE_READ_H_
|
||||
#define _TD_VNODE_READ_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "vnodeInt.h"
|
||||
#include "qworker.h"
|
||||
|
||||
int vnodeQueryOpen(SVnode *pVnode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_VNODE_READ_H_*/
|
|
@ -24,16 +24,6 @@ int32_t vnodeSync(SVnode *pVnode) { return 0; }
|
|||
|
||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { return 0; }
|
||||
|
||||
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
vInfo("query message is processed");
|
||||
return 0;
|
||||
}
|
||||
|
||||
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
vInfo("fetch message is processed");
|
||||
return 0;
|
||||
}
|
||||
|
||||
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
vInfo("sync message is processed");
|
||||
return 0;
|
||||
|
|
|
@ -127,6 +127,11 @@ static int vnodeOpenImpl(SVnode *pVnode) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
// Open Query
|
||||
if (vnodeQueryOpen(pVnode)) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// TODO
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "vnodeDef.h"
|
||||
#include "vnodeQuery.h"
|
||||
|
||||
int vnodeQueryOpen(SVnode *pVnode) {
|
||||
return qWorkerInit(NULL, &pVnode->pQuery);
|
||||
}
|
||||
|
||||
int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
vInfo("query message is processed");
|
||||
qWorkerProcessQueryMsg(pVnode, pVnode->pQuery, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
vInfo("fetch message is processed");
|
||||
qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
@ -13,8 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_META_QUERY_H_
|
||||
#define _TD_META_QUERY_H_
|
||||
#ifndef _VNODE_QUERY_H_
|
||||
#define _VNODE_QUERY_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -24,4 +24,4 @@ extern "C" {
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_META_QUERY_H_*/
|
||||
#endif /*_VNODE_QUERY_H_*/
|
|
@ -4457,8 +4457,6 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm
|
|||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
}
|
||||
|
||||
strncpy(pCreateMsg->db, token.z, token.n);
|
||||
|
||||
pDcl->pMsg = (char*)pCreateMsg;
|
||||
pDcl->msgLen = sizeof(SCreateDbMsg);
|
||||
pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_DB)? TSDB_MSG_TYPE_CREATE_DB:TSDB_MSG_TYPE_ALTER_DB;
|
||||
|
|
|
@ -553,28 +553,122 @@ _return:
|
|||
|
||||
|
||||
int32_t qwBuildAndSendQueryRsp(SRpcMsg *pMsg, int32_t code) {
|
||||
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
|
||||
pRsp->code = code;
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pMsg->handle,
|
||||
.ahandle = pMsg->ahandle,
|
||||
.pCont = pRsp,
|
||||
.contLen = sizeof(*pRsp),
|
||||
.code = code,
|
||||
};
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwBuildAndSendReadyRsp(SRpcMsg *pMsg, int32_t code) {
|
||||
SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp));
|
||||
pRsp->code = code;
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pMsg->handle,
|
||||
.ahandle = pMsg->ahandle,
|
||||
.pCont = pRsp,
|
||||
.contLen = sizeof(*pRsp),
|
||||
.code = code,
|
||||
};
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) {
|
||||
int32_t size = 0;
|
||||
|
||||
if (sStatus) {
|
||||
size = sizeof(SSchedulerStatusRsp) + sizeof(sStatus->status[0]) * sStatus->num;
|
||||
} else {
|
||||
size = sizeof(SSchedulerStatusRsp);
|
||||
}
|
||||
|
||||
SSchedulerStatusRsp *pRsp = (SSchedulerStatusRsp *)rpcMallocCont(size);
|
||||
|
||||
if (sStatus) {
|
||||
memcpy(pRsp, sStatus, size);
|
||||
} else {
|
||||
pRsp->num = 0;
|
||||
}
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pMsg->handle,
|
||||
.ahandle = pMsg->ahandle,
|
||||
.pCont = pRsp,
|
||||
.contLen = size,
|
||||
.code = 0,
|
||||
};
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwBuildAndSendFetchRsp(SRpcMsg *pMsg, void *data) {
|
||||
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
||||
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
|
||||
|
||||
//TODO fill msg
|
||||
pRsp->completed = true;
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pMsg->handle,
|
||||
.ahandle = pMsg->ahandle,
|
||||
.pCont = pRsp,
|
||||
.contLen = sizeof(*pRsp),
|
||||
.code = 0,
|
||||
};
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg) {
|
||||
int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) {
|
||||
STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp));
|
||||
pRsp->code = code;
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pMsg->handle,
|
||||
.ahandle = pMsg->ahandle,
|
||||
.pCont = pRsp,
|
||||
.contLen = sizeof(*pRsp),
|
||||
.code = code,
|
||||
};
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg) {
|
||||
int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg, int32_t code) {
|
||||
STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp));
|
||||
pRsp->code = code;
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
.handle = pMsg->handle,
|
||||
.ahandle = pMsg->ahandle,
|
||||
.pCont = pRsp,
|
||||
.contLen = sizeof(*pRsp),
|
||||
.code = code,
|
||||
};
|
||||
|
||||
rpcSendResponse(&rpcRsp);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
@ -712,8 +806,10 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedu
|
|||
SQWorkerSchStatus *sch = NULL;
|
||||
SQWorkerTaskStatus *task = NULL;
|
||||
int32_t code = 0;
|
||||
int32_t needRsp = true;
|
||||
void *data = NULL;
|
||||
|
||||
QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch));
|
||||
QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch));
|
||||
QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task));
|
||||
|
||||
QW_LOCK(QW_READ, &task->lock);
|
||||
|
@ -724,7 +820,7 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedu
|
|||
}
|
||||
|
||||
if (QW_GOT_RES_DATA(res->data)) {
|
||||
QW_ERR_JRET(qwBuildAndSendFetchRsp(pMsg, res->data));
|
||||
data = res->data;
|
||||
if (QW_LOW_RES_DATA(res->data)) {
|
||||
if (task->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) {
|
||||
//TODO add query back to queue
|
||||
|
@ -737,6 +833,8 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedu
|
|||
}
|
||||
|
||||
//TODO SET FLAG FOR QUERY TO SEND RSP WHEN RES READY
|
||||
|
||||
needRsp = false;
|
||||
}
|
||||
|
||||
_return:
|
||||
|
@ -746,9 +844,12 @@ _return:
|
|||
|
||||
if (sch) {
|
||||
qwReleaseTask(QW_READ, sch);
|
||||
qwReleaseScheduler(QW_READ, mgmt);
|
||||
}
|
||||
|
||||
qwReleaseScheduler(QW_READ, mgmt);
|
||||
if (needRsp) {
|
||||
qwBuildAndSendFetchRsp(pMsg, res->data);
|
||||
}
|
||||
|
||||
QW_RET(code);
|
||||
}
|
||||
|
@ -832,13 +933,14 @@ int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg **rsp) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) {
|
||||
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SSubQueryMsg *msg = pMsg->pCont;
|
||||
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
|
||||
qError("invalid query msg");
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
|
@ -851,7 +953,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRp
|
|||
QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &needStop));
|
||||
if (needStop) {
|
||||
qWarn("task need stop");
|
||||
QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED);
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED);
|
||||
}
|
||||
|
||||
code = qStringToSubplan(msg->msg, &plan);
|
||||
|
@ -910,13 +1012,14 @@ _return:
|
|||
QW_RET(code);
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp){
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) {
|
||||
int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
SResReadyMsg *msg = pMsg->pCont;
|
||||
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
|
||||
qError("invalid task status msg");
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
|
@ -925,27 +1028,31 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRp
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) {
|
||||
int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
SSchTasksStatusMsg *msg = pMsg->pCont;
|
||||
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
|
||||
qError("invalid task status msg");
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SSchedulerStatusRsp *sStatus = NULL;
|
||||
|
||||
QW_ERR_RET(qwGetSchTasksStatus(qWorkerMgmt, msg->schedulerId, &sStatus));
|
||||
QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->schedulerId, &sStatus));
|
||||
|
||||
_return:
|
||||
|
||||
QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) {
|
||||
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
|
@ -971,36 +1078,44 @@ _return:
|
|||
QW_RET(code);
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) {
|
||||
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
STaskCancelMsg *msg = pMsg->pCont;
|
||||
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
|
||||
qError("invalid task cancel msg");
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
QW_ERR_RET(qwCancelTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId));
|
||||
QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId));
|
||||
|
||||
QW_ERR_RET(qwBuildAndSendCancelRsp(pMsg));
|
||||
_return:
|
||||
|
||||
QW_ERR_RET(qwBuildAndSendCancelRsp(pMsg, code));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) {
|
||||
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
||||
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
STaskDropMsg *msg = pMsg->pCont;
|
||||
if (NULL == msg || pMsg->contLen <= sizeof(*msg)) {
|
||||
qError("invalid task drop msg");
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
QW_ERR_RET(qwCancelDropTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId));
|
||||
QW_ERR_JRET(qwCancelDropTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId));
|
||||
|
||||
QW_ERR_RET(qwBuildAndSendDropRsp(pMsg));
|
||||
_return:
|
||||
|
||||
QW_ERR_RET(qwBuildAndSendDropRsp(pMsg, code));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -31,17 +31,35 @@ extern "C" {
|
|||
|
||||
#define SCH_MAX_CONDIDATE_EP_NUM TSDB_MAX_REPLICA
|
||||
|
||||
enum {
|
||||
SCH_READ = 1,
|
||||
SCH_WRITE,
|
||||
};
|
||||
|
||||
typedef struct SSchedulerMgmt {
|
||||
uint64_t taskId;
|
||||
uint64_t schedulerId;
|
||||
SSchedulerCfg cfg;
|
||||
SHashObj *Jobs; // key: queryId, value: SQueryJob*
|
||||
SHashObj *jobs; // key: queryId, value: SQueryJob*
|
||||
} SSchedulerMgmt;
|
||||
|
||||
typedef struct SQueryLevel {
|
||||
int32_t level;
|
||||
int8_t status;
|
||||
SRWLatch lock;
|
||||
int32_t taskFailed;
|
||||
int32_t taskSucceed;
|
||||
int32_t taskNum;
|
||||
SArray *subTasks; // Element is SQueryTask
|
||||
} SQueryLevel;
|
||||
|
||||
|
||||
typedef struct SQueryTask {
|
||||
uint64_t taskId; // task id
|
||||
SQueryLevel *level; // level
|
||||
SSubplan *plan; // subplan
|
||||
char *msg; // operator tree
|
||||
int32_t msgLen; // msg length
|
||||
int8_t status; // task status
|
||||
SEpAddr execAddr; // task actual executed node address
|
||||
SQueryProfileSummary summary; // task execution summary
|
||||
|
@ -50,21 +68,14 @@ typedef struct SQueryTask {
|
|||
SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask*
|
||||
} SQueryTask;
|
||||
|
||||
typedef struct SQueryLevel {
|
||||
int32_t level;
|
||||
int8_t status;
|
||||
int32_t taskNum;
|
||||
SArray *subTasks; // Element is SQueryTask
|
||||
} SQueryLevel;
|
||||
|
||||
typedef struct SQueryJob {
|
||||
uint64_t queryId;
|
||||
int32_t levelNum;
|
||||
int32_t levelIdx;
|
||||
int8_t status;
|
||||
SQueryProfileSummary summary;
|
||||
SEpSet dataSrcEps;
|
||||
SEpAddr resEp;
|
||||
SEpSet dataSrcEps;
|
||||
SEpAddr resEp;
|
||||
void *transport;
|
||||
SArray *qnodeList;
|
||||
tsem_t rspSem;
|
||||
|
@ -74,6 +85,7 @@ typedef struct SQueryJob {
|
|||
|
||||
SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask*
|
||||
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
|
||||
SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask*
|
||||
|
||||
SArray *levels; // Element is SQueryLevel, starting from 0.
|
||||
SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0.
|
||||
|
@ -81,7 +93,8 @@ typedef struct SQueryJob {
|
|||
|
||||
#define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE
|
||||
#define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE
|
||||
#define SCH_IS_DATA_SRC_TASK(task) (task->plan->type == QUERY_TYPE_SCAN)
|
||||
#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN)
|
||||
#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY)
|
||||
|
||||
#define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__)
|
||||
#define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__)
|
||||
|
@ -91,6 +104,9 @@ typedef struct SQueryJob {
|
|||
#define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0)
|
||||
#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
|
||||
|
||||
#define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock))
|
||||
#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock))
|
||||
|
||||
|
||||
extern int32_t schLaunchTask(SQueryJob *job, SQueryTask *task);
|
||||
|
||||
|
|
|
@ -160,11 +160,19 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
|
|||
SQueryLevel level = {0};
|
||||
SArray *levelPlans = NULL;
|
||||
int32_t levelPlanNum = 0;
|
||||
SQueryLevel *pLevel = NULL;
|
||||
|
||||
level.status = JOB_TASK_STATUS_NOT_START;
|
||||
|
||||
for (int32_t i = 0; i < levelNum; ++i) {
|
||||
level.level = i;
|
||||
if (NULL == taosArrayPush(job->levels, &level)) {
|
||||
qError("taosArrayPush failed");
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
pLevel = taosArrayGet(job->levels, i);
|
||||
|
||||
pLevel->level = i;
|
||||
levelPlans = taosArrayGetP(dag->pSubplans, i);
|
||||
if (NULL == levelPlans) {
|
||||
qError("no level plans for level %d", i);
|
||||
|
@ -177,10 +185,10 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
|
|||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
level.taskNum = levelPlanNum;
|
||||
pLevel->taskNum = levelPlanNum;
|
||||
|
||||
level.subTasks = taosArrayInit(levelPlanNum, sizeof(SQueryTask));
|
||||
if (NULL == level.subTasks) {
|
||||
pLevel->subTasks = taosArrayInit(levelPlanNum, sizeof(SQueryTask));
|
||||
if (NULL == pLevel->subTasks) {
|
||||
qError("taosArrayInit %d failed", levelPlanNum);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
@ -191,9 +199,10 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
|
|||
|
||||
task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
|
||||
task.plan = plan;
|
||||
task.level = pLevel;
|
||||
task.status = JOB_TASK_STATUS_NOT_START;
|
||||
|
||||
void *p = taosArrayPush(level.subTasks, &task);
|
||||
void *p = taosArrayPush(pLevel->subTasks, &task);
|
||||
if (NULL == p) {
|
||||
qError("taosArrayPush failed");
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
|
@ -205,10 +214,6 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
|
|||
}
|
||||
}
|
||||
|
||||
if (NULL == taosArrayPush(job->levels, &level)) {
|
||||
qError("taosArrayPush failed");
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(schBuildTaskRalation(job, planToTask));
|
||||
|
@ -220,8 +225,8 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
if (level.subTasks) {
|
||||
taosArrayDestroy(level.subTasks);
|
||||
if (pLevel->subTasks) {
|
||||
taosArrayDestroy(pLevel->subTasks);
|
||||
}
|
||||
|
||||
if (planToTask) {
|
||||
|
@ -273,7 +278,23 @@ int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) {
|
||||
if (0 != taosHashPut(job->succTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) {
|
||||
qError("taosHashPut failed");
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
*moved = true;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schMoveTaskToFailList(SQueryJob *job, SQueryTask *task, bool *moved) {
|
||||
if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) {
|
||||
qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (0 != taosHashPut(job->failTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) {
|
||||
qError("taosHashPut failed");
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
@ -289,14 +310,23 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
|
|||
void *msg = NULL;
|
||||
|
||||
switch (msgType) {
|
||||
case TSDB_MSG_TYPE_SUBMIT: {
|
||||
if (NULL == task->msg || task->msgLen <= 0) {
|
||||
qError("submit msg is NULL");
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
msgSize = task->msgLen;
|
||||
msg = task->msg;
|
||||
break;
|
||||
}
|
||||
case TSDB_MSG_TYPE_QUERY: {
|
||||
if (NULL == task->msg) {
|
||||
qError("query msg is NULL");
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
int32_t len = strlen(task->msg);
|
||||
msgSize = sizeof(SSubQueryMsg) + len + 1;
|
||||
msgSize = sizeof(SSubQueryMsg) + task->msgLen;
|
||||
msg = calloc(1, msgSize);
|
||||
if (NULL == msg) {
|
||||
qError("calloc %d failed", msgSize);
|
||||
|
@ -308,11 +338,10 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
|
|||
pMsg->schedulerId = htobe64(schMgmt.schedulerId);
|
||||
pMsg->queryId = htobe64(job->queryId);
|
||||
pMsg->taskId = htobe64(task->taskId);
|
||||
pMsg->contentLen = htonl(len);
|
||||
memcpy(pMsg->msg, task->msg, len);
|
||||
pMsg->msg[len] = 0;
|
||||
pMsg->contentLen = htonl(task->msgLen);
|
||||
memcpy(pMsg->msg, task->msg, task->msgLen);
|
||||
break;
|
||||
}
|
||||
}
|
||||
case TSDB_MSG_TYPE_RES_READY: {
|
||||
msgSize = sizeof(SResReadyMsg);
|
||||
msg = calloc(1, msgSize);
|
||||
|
@ -322,6 +351,7 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
|
|||
}
|
||||
|
||||
SResReadyMsg *pMsg = msg;
|
||||
pMsg->schedulerId = htobe64(schMgmt.schedulerId);
|
||||
pMsg->queryId = htobe64(job->queryId);
|
||||
pMsg->taskId = htobe64(task->taskId);
|
||||
break;
|
||||
|
@ -335,6 +365,21 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
|
|||
}
|
||||
|
||||
SResFetchMsg *pMsg = msg;
|
||||
pMsg->schedulerId = htobe64(schMgmt.schedulerId);
|
||||
pMsg->queryId = htobe64(job->queryId);
|
||||
pMsg->taskId = htobe64(task->taskId);
|
||||
break;
|
||||
}
|
||||
case TSDB_MSG_TYPE_DROP_TASK:{
|
||||
msgSize = sizeof(STaskDropMsg);
|
||||
msg = calloc(1, msgSize);
|
||||
if (NULL == msg) {
|
||||
qError("calloc %d failed", msgSize);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
STaskDropMsg *pMsg = msg;
|
||||
pMsg->schedulerId = htobe64(schMgmt.schedulerId);
|
||||
pMsg->queryId = htobe64(job->queryId);
|
||||
pMsg->taskId = htobe64(task->taskId);
|
||||
break;
|
||||
|
@ -345,6 +390,7 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) {
|
|||
}
|
||||
|
||||
//TODO SEND MSG
|
||||
//taosAsyncExec(rpcSendRequest(void * shandle, const SEpSet * pEpSet, SRpcMsg * pMsg, int64_t * pRid), p, &code);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -425,8 +471,29 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
|
|||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn));
|
||||
job->resEp.port = task->execAddr.port;
|
||||
int32_t taskDone = 0;
|
||||
|
||||
if (SCH_TASK_NEED_WAIT_ALL(task)) {
|
||||
SCH_LOCK(SCH_WRITE, &task->level->lock);
|
||||
task->level->taskFailed++;
|
||||
taskDone = task->level->taskSucceed + task->level->taskFailed;
|
||||
SCH_UNLOCK(SCH_WRITE, &task->level->lock);
|
||||
|
||||
if (taskDone < task->level->taskNum) {
|
||||
qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (task->level->taskFailed > 0) {
|
||||
job->status = JOB_TASK_STATUS_FAILED;
|
||||
SCH_ERR_RET(schProcessOnJobFailure(job));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
} else {
|
||||
strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn));
|
||||
job->resEp.port = task->execAddr.port;
|
||||
}
|
||||
|
||||
SCH_ERR_RET(schProcessOnJobSuccess(job));
|
||||
|
||||
|
@ -457,10 +524,30 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) {
|
|||
|
||||
int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCode) {
|
||||
bool needRetry = false;
|
||||
bool moved = false;
|
||||
int32_t taskDone = 0;
|
||||
SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry));
|
||||
|
||||
if (!needRetry) {
|
||||
SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode);
|
||||
|
||||
SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved));
|
||||
if (!moved) {
|
||||
SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (SCH_TASK_NEED_WAIT_ALL(task)) {
|
||||
SCH_LOCK(SCH_WRITE, &task->level->lock);
|
||||
task->level->taskFailed++;
|
||||
taskDone = task->level->taskSucceed + task->level->taskFailed;
|
||||
SCH_UNLOCK(SCH_WRITE, &task->level->lock);
|
||||
|
||||
if (taskDone < task->level->taskNum) {
|
||||
qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
job->status = JOB_TASK_STATUS_FAILED;
|
||||
SCH_ERR_RET(schProcessOnJobFailure(job));
|
||||
|
@ -522,8 +609,7 @@ _return:
|
|||
|
||||
int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) {
|
||||
SSubplan *plan = task->plan;
|
||||
int32_t len = 0;
|
||||
SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &len));
|
||||
SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen));
|
||||
if (plan->execEpSet.numOfEps <= 0) {
|
||||
SCH_ERR_RET(schSetTaskExecEpSet(job, &plan->execEpSet));
|
||||
}
|
||||
|
@ -532,8 +618,10 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) {
|
|||
SCH_TASK_ERR_LOG("invalid execEpSet num:%d", plan->execEpSet.numOfEps);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
int32_t msgType = (plan->type == QUERY_TYPE_MODIFY) ? TSDB_MSG_TYPE_SUBMIT : TSDB_MSG_TYPE_QUERY;
|
||||
|
||||
SCH_ERR_RET(schAsyncSendMsg(job, task, TSDB_MSG_TYPE_QUERY));
|
||||
SCH_ERR_RET(schAsyncSendMsg(job, task, msgType));
|
||||
|
||||
SCH_ERR_RET(schPushTaskToExecList(job, task));
|
||||
|
||||
|
@ -554,6 +642,25 @@ int32_t schLaunchJob(SQueryJob *job) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void schDropJobAllTasks(SQueryJob *job) {
|
||||
void *pIter = taosHashIterate(job->succTasks, NULL);
|
||||
while (pIter) {
|
||||
SQueryTask *task = *(SQueryTask **)pIter;
|
||||
|
||||
schAsyncSendMsg(job, task, TSDB_MSG_TYPE_DROP_TASK);
|
||||
|
||||
pIter = taosHashIterate(job->succTasks, pIter);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(job->failTasks, NULL);
|
||||
while (pIter) {
|
||||
SQueryTask *task = *(SQueryTask **)pIter;
|
||||
|
||||
schAsyncSendMsg(job, task, TSDB_MSG_TYPE_DROP_TASK);
|
||||
|
||||
pIter = taosHashIterate(job->succTasks, pIter);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t schedulerInit(SSchedulerCfg *cfg) {
|
||||
if (cfg) {
|
||||
|
@ -562,8 +669,8 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
|
|||
schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER;
|
||||
}
|
||||
|
||||
schMgmt.Jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
|
||||
if (NULL == schMgmt.Jobs) {
|
||||
schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
|
||||
if (NULL == schMgmt.jobs) {
|
||||
SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum);
|
||||
}
|
||||
|
||||
|
@ -605,9 +712,15 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi
|
|||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
job->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
|
||||
if (NULL == job->failTasks) {
|
||||
qError("taosHashInit %d failed", pDag->numOfSubplans);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
tsem_init(&job->rspSem, 0, 0);
|
||||
|
||||
if (0 != taosHashPut(schMgmt.Jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES)) {
|
||||
if (0 != taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES)) {
|
||||
qError("taosHashPut queryId:%"PRIx64" failed", job->queryId);
|
||||
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
|
@ -659,6 +772,8 @@ _return:
|
|||
int32_t scheduleCancelJob(void *pJob) {
|
||||
//TODO
|
||||
|
||||
//TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -670,7 +785,7 @@ void scheduleFreeJob(void *pJob) {
|
|||
SQueryJob *job = pJob;
|
||||
|
||||
if (job->status > 0) {
|
||||
if (0 != taosHashRemove(schMgmt.Jobs, &job->queryId, sizeof(job->queryId))) {
|
||||
if (0 != taosHashRemove(schMgmt.jobs, &job->queryId, sizeof(job->queryId))) {
|
||||
qError("remove job:%"PRIx64"from mgmt failed", job->queryId); // maybe already freed
|
||||
return;
|
||||
}
|
||||
|
@ -678,15 +793,17 @@ void scheduleFreeJob(void *pJob) {
|
|||
if (job->status == JOB_TASK_STATUS_EXECUTING) {
|
||||
scheduleCancelJob(pJob);
|
||||
}
|
||||
|
||||
schDropJobAllTasks(job);
|
||||
}
|
||||
|
||||
//TODO free job
|
||||
}
|
||||
|
||||
void schedulerDestroy(void) {
|
||||
if (schMgmt.Jobs) {
|
||||
taosHashCleanup(schMgmt.Jobs); //TODO
|
||||
schMgmt.Jobs = NULL;
|
||||
if (schMgmt.jobs) {
|
||||
taosHashCleanup(schMgmt.jobs); //TODO
|
||||
schMgmt.jobs = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -15,7 +15,7 @@ if $data00 != d1 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != 0 then
|
||||
if $data02 != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -51,7 +51,7 @@ if $data00 != d4 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != 0 then
|
||||
if $data02 != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
|
|
@ -1,7 +1,11 @@
|
|||
|
||||
#======================b1-start===============
|
||||
|
||||
# ---- user
|
||||
./test.sh -f general/user/basic1.sim
|
||||
|
||||
# ---- db
|
||||
./test.sh -f general/db/basic1.sim
|
||||
|
||||
|
||||
#======================b1-end===============
|
||||
|
|
Loading…
Reference in New Issue