Merge pull request #9929 from taosdata/feature/3.0_liaohj

Feature/3.0 liaohj
This commit is contained in:
Shengliang Guan 2022-01-20 17:33:56 +08:00 committed by GitHub
commit e2534b6a63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
39 changed files with 1085 additions and 474 deletions

View File

@ -7,3 +7,4 @@ FROM mcr.microsoft.com/vscode/devcontainers/cpp:0-${VARIANT}
# [Optional] Uncomment this section to install additional packages. # [Optional] Uncomment this section to install additional packages.
# RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \ # RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \
# && apt-get -y install --no-install-recommends <your-package-list-here> # && apt-get -y install --no-install-recommends <your-package-list-here>
RUN apt-get update && apt-get -y install tree vim

View File

@ -80,6 +80,13 @@ extern "C" {
#define FUNCTION_COV 38 #define FUNCTION_COV 38
typedef struct SResultRowEntryInfo {
int8_t hasResult; // result generated, not NULL value
bool initialized; // output buffer has been initialized
bool complete; // query has completed
uint32_t numOfRes; // num of output result in current buffer
} SResultRowEntryInfo;
// determine the real data need to calculated the result // determine the real data need to calculated the result
enum { enum {
BLK_DATA_NO_NEEDED = 0x0, BLK_DATA_NO_NEEDED = 0x0,
@ -127,6 +134,8 @@ typedef struct SFunctionFpSet {
void (*combine)(struct SQLFunctionCtx *pCtx); void (*combine)(struct SQLFunctionCtx *pCtx);
} SFunctionFpSet; } SFunctionFpSet;
extern SFunctionFpSet fpSet[1];
// sql function runtime context // sql function runtime context
typedef struct SQLFunctionCtx { typedef struct SQLFunctionCtx {
int32_t size; // number of rows int32_t size; // number of rows
@ -156,6 +165,7 @@ typedef struct SQLFunctionCtx {
SPoint1 start; SPoint1 start;
SPoint1 end; SPoint1 end;
int32_t columnIndex;
SFunctionFpSet* fpSet; SFunctionFpSet* fpSet;
} SQLFunctionCtx; } SQLFunctionCtx;

View File

@ -91,6 +91,7 @@ SSchema *getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex);
int32_t getNewResColId(); int32_t getNewResColId();
void addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* pColumn); void addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* pColumn);
SExprInfo* createBinaryExprInfo(struct tExprNode* pNode, SSchema* pResSchema);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -114,10 +114,16 @@ typedef struct SProjectPhyNode {
SPhyNode node; SPhyNode node;
} SProjectPhyNode; } SProjectPhyNode;
typedef struct SDownstreamSource {
SQueryNodeAddr addr;
uint64_t taskId;
uint64_t schedId;
} SDownstreamSource;
typedef struct SExchangePhyNode { typedef struct SExchangePhyNode {
SPhyNode node; SPhyNode node;
uint64_t srcTemplateId; // template id of datasource suplans uint64_t srcTemplateId; // template id of datasource suplans
SArray *pSrcEndPoints; // SEpAddr, scheduler fill by calling qSetSuplanExecutionNode SArray *pSrcEndPoints; // SArray<SDownstreamSource>, scheduler fill by calling qSetSuplanExecutionNode
} SExchangePhyNode; } SExchangePhyNode;
typedef enum EAggAlgo { typedef enum EAggAlgo {
@ -178,7 +184,7 @@ int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag**
// @subplan subplan to be schedule // @subplan subplan to be schedule
// @templateId templateId of a group of datasource subplans of this @subplan // @templateId templateId of a group of datasource subplans of this @subplan
// @ep one execution location of this group of datasource subplans // @ep one execution location of this group of datasource subplans
void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep); void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource);
int32_t qExplainQuery(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, char** str); int32_t qExplainQuery(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, char** str);

View File

@ -179,7 +179,7 @@ extern int32_t clientConnRefPool;
extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code); extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code); int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code);
SMsgSendInfo* buildMsgInfoImpl(SRequestObj*); SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj);
int taos_init(); int taos_init();

View File

@ -195,8 +195,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
} }
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDcl->epSet, &transporterId, pSendMsg); asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDcl->epSet, &transporterId, pSendMsg);
} else { } else {
SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet; asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDcl->epSet, &transporterId, pSendMsg);
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, pEpSet, &transporterId, pSendMsg);
} }
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
@ -257,7 +256,14 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
return pRequest->code; return pRequest->code;
} }
return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob); SArray *execNode = taosArrayInit(4, sizeof(SQueryNodeAddr));
SQueryNodeAddr addr = {.numOfEps = 1, .inUse = 0, .nodeId = 2};
addr.epAddr[0].port = 7100;
strcpy(addr.epAddr[0].fqdn, "localhost");
taosArrayPush(execNode, &addr);
return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, execNode, pDag, &pRequest->body.pQueryJob);
} }
typedef struct tmq_t tmq_t; typedef struct tmq_t tmq_t;
@ -699,6 +705,8 @@ void* doFetchRow(SRequestObj* pRequest) {
assert(pRequest != NULL); assert(pRequest != NULL);
SReqResultInfo* pResultInfo = &pRequest->body.resInfo; SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
SEpSet epSet = {0};
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) { if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
if (pRequest->type == TDMT_VND_QUERY) { if (pRequest->type == TDMT_VND_QUERY) {
// All data has returned to App already, no need to try again // All data has returned to App already, no need to try again
@ -706,9 +714,13 @@ void* doFetchRow(SRequestObj* pRequest) {
return NULL; return NULL;
} }
scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData); int32_t code = scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData);
setQueryResultByRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pRequest->body.resInfo.pData); if (code != TSDB_CODE_SUCCESS) {
pRequest->code = code;
return NULL;
}
setQueryResultByRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pRequest->body.resInfo.pData);
if (pResultInfo->numOfRows == 0) { if (pResultInfo->numOfRows == 0) {
return NULL; return NULL;
} }
@ -716,8 +728,20 @@ void* doFetchRow(SRequestObj* pRequest) {
goto _return; goto _return;
} else if (pRequest->type == TDMT_MND_SHOW) { } else if (pRequest->type == TDMT_MND_SHOW) {
pRequest->type = TDMT_MND_SHOW_RETRIEVE; pRequest->type = TDMT_MND_SHOW_RETRIEVE;
epSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp);
} else if (pRequest->type == TDMT_VND_SHOW_TABLES) { } else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
pRequest->type = TDMT_VND_SHOW_TABLES_FETCH; pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
epSet.numOfEps = pVgroupInfo->numOfEps;
epSet.inUse = pVgroupInfo->inUse;
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
strncpy(epSet.fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(epSet.fqdn[i]));
epSet.port[i] = pVgroupInfo->epAddr[i].port;
}
} else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) { } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
pRequest->type = TDMT_VND_SHOW_TABLES; pRequest->type = TDMT_VND_SHOW_TABLES;
SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo; SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
@ -735,19 +759,29 @@ void* doFetchRow(SRequestObj* pRequest) {
SMsgSendInfo* body = buildMsgInfoImpl(pRequest); SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
epSet.numOfEps = pVgroupInfo->numOfEps;
epSet.inUse = pVgroupInfo->inUse;
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
strncpy(epSet.fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(epSet.fqdn[i]));
epSet.port[i] = pVgroupInfo->epAddr[i].port;
}
int64_t transporterId = 0; int64_t transporterId = 0;
STscObj *pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
pRequest->type = TDMT_VND_SHOW_TABLES_FETCH; pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
} else if (pRequest->type == TDMT_MND_SHOW_RETRIEVE && pResultInfo->pData != NULL) {
return NULL;
} }
SMsgSendInfo* body = buildMsgInfoImpl(pRequest); SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
int64_t transporterId = 0; int64_t transporterId = 0;
STscObj *pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body); asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);

View File

@ -115,7 +115,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) {
} }
} else { } else {
assert(pRequest != NULL); assert(pRequest != NULL);
pMsgSendInfo->msgInfo = pRequest->body.requestMsg; pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
} }
pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)]; pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)];

View File

@ -15,6 +15,7 @@
#include <gtest/gtest.h> #include <gtest/gtest.h>
#include <taoserror.h> #include <taoserror.h>
#include <tglobal.h>
#include <iostream> #include <iostream>
#pragma GCC diagnostic ignored "-Wwrite-strings" #pragma GCC diagnostic ignored "-Wwrite-strings"
@ -46,17 +47,19 @@ int main(int argc, char** argv) {
return RUN_ALL_TESTS(); return RUN_ALL_TESTS();
} }
TEST(testCase, driverInit_Test) { taos_init(); } TEST(testCase, driverInit_Test) {
taosInitGlobalCfg();
TEST(testCase, connect_Test) { // taos_init();
TAOS* pConn = taos_connect("localhost", "root", "taosdata", "abc1", 0);
if (pConn == NULL) {
printf("failed to connect to server, reason:%s\n", taos_errstr(NULL));
}
sleep(3);
taos_close(pConn);
} }
//TEST(testCase, connect_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// if (pConn == NULL) {
// printf("failed to connect to server, reason:%s\n", taos_errstr(NULL));
// }
// taos_close(pConn);
//}
//
//TEST(testCase, create_user_Test) { //TEST(testCase, create_user_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); // assert(pConn != NULL);
@ -148,30 +151,30 @@ TEST(testCase, connect_Test) {
// taos_close(pConn); // taos_close(pConn);
//} //}
// //
TEST(testCase, create_db_Test) { //TEST(testCase, create_db_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); // assert(pConn != NULL);
//
TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2"); // TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2");
if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes)); // printf("error in create db, reason:%s\n", taos_errstr(pRes));
} // }
//
TAOS_FIELD* pFields = taos_fetch_fields(pRes); // TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == NULL); // ASSERT_TRUE(pFields == NULL);
//
int32_t numOfFields = taos_num_fields(pRes); // int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0); // ASSERT_EQ(numOfFields, 0);
//
taos_free_result(pRes); // taos_free_result(pRes);
//
pRes = taos_query(pConn, "create database abc1 vgroups 4"); // pRes = taos_query(pConn, "create database abc1 vgroups 4");
if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes)); // printf("error in create db, reason:%s\n", taos_errstr(pRes));
} // }
taos_close(pConn); // taos_close(pConn);
} //}
//
//TEST(testCase, create_dnode_Test) { //TEST(testCase, create_dnode_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); // assert(pConn != NULL);
@ -254,7 +257,7 @@ TEST(testCase, create_db_Test) {
// taos_free_result(pRes); // taos_free_result(pRes);
// taos_close(pConn); // taos_close(pConn);
//} //}
//
//TEST(testCase, create_stable_Test) { //TEST(testCase, create_stable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); // assert(pConn != NULL);
@ -275,17 +278,14 @@ TEST(testCase, create_db_Test) {
// //
// int32_t numOfFields = taos_num_fields(pRes); // int32_t numOfFields = taos_num_fields(pRes);
// ASSERT_EQ(numOfFields, 0); // ASSERT_EQ(numOfFields, 0);
//
// taos_free_result(pRes); // taos_free_result(pRes);
// taos_close(pConn);
//}
// //
// pRes = taos_query(pConn, "create stable if not exists abc1.`123_$^)` (ts timestamp, `abc` int) tags(a int)"); // pRes = taos_query(pConn, "create stable if not exists abc1.`123_$^)` (ts timestamp, `abc` int) tags(a int)");
// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
// printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes)); // printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));
// } // }
// //
// TAOS_RES* pRes = taos_query(pConn, "use abc1"); // pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes); // taos_free_result(pRes);
// pRes = taos_query(pConn, "drop stable `123_$^)`"); // pRes = taos_query(pConn, "drop stable `123_$^)`");
// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
@ -294,26 +294,26 @@ TEST(testCase, create_db_Test) {
// //
// taos_close(pConn); // taos_close(pConn);
//} //}
//
TEST(testCase, create_table_Test) { //TEST(testCase, create_table_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); // assert(pConn != NULL);
//
TAOS_RES* pRes = taos_query(pConn, "use abc1"); // TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes); // taos_free_result(pRes);
//
pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k int)"); // pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k int)");
ASSERT_EQ(taos_errno(pRes), 0); // ASSERT_EQ(taos_errno(pRes), 0);
//
taos_free_result(pRes); // taos_free_result(pRes);
//
pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k blob)"); // pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k blob)");
ASSERT_NE(taos_errno(pRes), 0); // ASSERT_NE(taos_errno(pRes), 0);
//
taos_free_result(pRes); // taos_free_result(pRes);
taos_close(pConn); // taos_close(pConn);
} //}
//
//TEST(testCase, create_ctable_Test) { //TEST(testCase, create_ctable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); // assert(pConn != NULL);
@ -324,6 +324,12 @@ TEST(testCase, create_table_Test) {
// } // }
// taos_free_result(pRes); // taos_free_result(pRes);
// //
// pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int ) tags(a int)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create stable, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "create table tm0 using st1 tags(1)"); // pRes = taos_query(pConn, "create table tm0 using st1 tags(1)");
// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
// printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes)); // printf("failed to create child table tm0, reason:%s\n", taos_errstr(pRes));
@ -332,37 +338,31 @@ TEST(testCase, create_table_Test) {
// taos_free_result(pRes); // taos_free_result(pRes);
// taos_close(pConn); // taos_close(pConn);
//} //}
//
TEST(testCase, show_stable_Test) { //TEST(testCase, show_stable_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != nullptr); // assert(pConn != nullptr);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1"); // TAOS_RES* pRes = taos_query(pConn, "show abc1.stables");
// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
// printf("failed to use db, reason:%s\n", taos_errstr(pRes)); // printf("failed to show stables, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// ASSERT_TRUE(false);
// } // }
//
// TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes);
//
// char str[512] = {0};
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str);
// }
//
// taos_free_result(pRes); // taos_free_result(pRes);
// taos_close(pConn);
TAOS_RES* pRes = taos_query(pConn, "show abc1.stables"); //}
if (taos_errno(pRes) != 0) {
printf("failed to show stables, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_free_result(pRes);
taos_close(pConn);
}
// //
//TEST(testCase, show_vgroup_Test) { //TEST(testCase, show_vgroup_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -456,24 +456,28 @@ TEST(testCase, show_stable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); // assert(pConn != NULL);
// //
// TAOS_RES* pRes = taos_query(pConn, "use abc1"); // TAOS_RES* pRes = taos_query(pConn, "show tables");
// taos_free_result(pRes);
//
// pRes = taos_query(pConn, "show tables");
// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
// printf("failed to show vgroups, reason:%s\n", taos_errstr(pRes)); // printf("failed to show tables, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes);
// }
//
// pRes = taos_query(pConn, "show abc1.tables");
// if (taos_errno(pRes) != 0) {
// printf("failed to show tables, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes); // taos_free_result(pRes);
// ASSERT_TRUE(false);
// } // }
// //
// TAOS_ROW pRow = NULL; // TAOS_ROW pRow = NULL;
// TAOS_FIELD* pFields = taos_fetch_fields(pRes); // TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// int32_t numOfFields = taos_num_fields(pRes); // int32_t numOfFields = taos_num_fields(pRes);
// //
// int32_t count = 0;
// char str[512] = {0}; // char str[512] = {0};
//
// while ((pRow = taos_fetch_row(pRes)) != NULL) { // while ((pRow = taos_fetch_row(pRes)) != NULL) {
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); // int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
// printf("%s\n", str); // printf("%d: %s\n", ++count, str);
// } // }
// //
// taos_free_result(pRes); // taos_free_result(pRes);
@ -521,30 +525,30 @@ TEST(testCase, show_stable_Test) {
// taosHashCleanup(phash); // taosHashCleanup(phash);
//} //}
// //
TEST(testCase, create_topic_Test) { //TEST(testCase, create_topic_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); // assert(pConn != NULL);
//
TAOS_RES* pRes = taos_query(pConn, "use abc1"); // TAOS_RES* pRes = taos_query(pConn, "use abc1");
if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes)); // printf("error in use db, reason:%s\n", taos_errstr(pRes));
} // }
taos_free_result(pRes); // taos_free_result(pRes);
//
TAOS_FIELD* pFields = taos_fetch_fields(pRes); // TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == nullptr); // ASSERT_TRUE(pFields == nullptr);
//
int32_t numOfFields = taos_num_fields(pRes); // int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0); // ASSERT_EQ(numOfFields, 0);
//
taos_free_result(pRes); // taos_free_result(pRes);
//
char* sql = "select * from tu"; // char* sql = "select * from tu";
pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); // pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
taos_free_result(pRes); // taos_free_result(pRes);
taos_close(pConn); // taos_close(pConn);
} //}
//
//TEST(testCase, insert_test) { //TEST(testCase, insert_test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr); // ASSERT_NE(pConn, nullptr);
@ -562,7 +566,7 @@ TEST(testCase, create_topic_Test) {
// taos_free_result(pRes); // taos_free_result(pRes);
// taos_close(pConn); // taos_close(pConn);
//} //}
//
//TEST(testCase, projection_query_tables) { //TEST(testCase, projection_query_tables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr); // ASSERT_NE(pConn, nullptr);
@ -613,7 +617,7 @@ TEST(testCase, create_topic_Test) {
// taos_free_result(pRes); // taos_free_result(pRes);
// taos_close(pConn); // taos_close(pConn);
//} //}
//
//TEST(testCase, projection_query_stables) { //TEST(testCase, projection_query_stables) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr); // ASSERT_NE(pConn, nullptr);
@ -621,7 +625,7 @@ TEST(testCase, create_topic_Test) {
// TAOS_RES* pRes = taos_query(pConn, "use abc1"); // TAOS_RES* pRes = taos_query(pConn, "use abc1");
// taos_free_result(pRes); // taos_free_result(pRes);
// //
// pRes = taos_query(pConn, "select ts,k from m1"); // pRes = taos_query(pConn, "select ts from m1");
// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); // printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes); // taos_free_result(pRes);
@ -641,3 +645,31 @@ TEST(testCase, create_topic_Test) {
// taos_free_result(pRes); // taos_free_result(pRes);
// taos_close(pConn); // taos_close(pConn);
//} //}
TEST(testCase, agg_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes);
pRes = taos_query(pConn, "select count(*) from tu");
if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
ASSERT_TRUE(false);
}
TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes);
char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str);
}
taos_free_result(pRes);
taos_close(pConn);
}

View File

@ -230,7 +230,7 @@ typedef struct {
char acct[TSDB_USER_LEN]; char acct[TSDB_USER_LEN];
int64_t createdTime; int64_t createdTime;
int64_t updateTime; int64_t updateTime;
int64_t uid; uint64_t uid;
int32_t cfgVersion; int32_t cfgVersion;
int32_t vgVersion; int32_t vgVersion;
int8_t hashMethod; // default is 1 int8_t hashMethod; // default is 1

View File

@ -855,6 +855,10 @@ static int32_t mndRetrieveStb(SMnodeMsg *pReq, SShowObj *pShow, char *data, int3
if (pShow->pIter == NULL) break; if (pShow->pIter == NULL) break;
if (pStb->dbUid != pDb->uid) { if (pStb->dbUid != pDb->uid) {
if (strncmp(pStb->db, pDb->name, tListLen(pStb->db)) == 0) {
mError("Inconsistent table data, name:%s, db:%s, dbUid:%"PRIu64, pStb->name, pDb->name, pDb->uid);
}
sdbRelease(pSdb, pStb); sdbRelease(pSdb, pStb);
continue; continue;
} }

View File

@ -42,7 +42,8 @@ typedef struct {
SSchema *pSchema; SSchema *pSchema;
} SSchemaWrapper; } SSchemaWrapper;
typedef struct SMTbCursor SMTbCursor; typedef struct SMTbCursor SMTbCursor;
typedef struct SMCtbCursor SMCtbCursor;
typedef SVCreateTbReq STbCfg; typedef SVCreateTbReq STbCfg;
@ -64,6 +65,10 @@ SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur); void metaCloseTbCursor(SMTbCursor *pTbCur);
char * metaTbCursorNext(SMTbCursor *pTbCur); char * metaTbCursorNext(SMTbCursor *pTbCur);
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid);
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur);
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur);
// Options // Options
void metaOptionsInit(SMetaCfg *pMetaCfg); void metaOptionsInit(SMetaCfg *pMetaCfg);
void metaOptionsClear(SMetaCfg *pMetaCfg); void metaOptionsClear(SMetaCfg *pMetaCfg);

View File

@ -93,6 +93,7 @@ int tsdbOptionsInit(STsdbCfg *);
void tsdbOptionsClear(STsdbCfg *); void tsdbOptionsClear(STsdbCfg *);
typedef void* tsdbReadHandleT; typedef void* tsdbReadHandleT;
/** /**
* Get the data block iterator, starting from position according to the query condition * Get the data block iterator, starting from position according to the query condition
* *
@ -124,6 +125,24 @@ tsdbReadHandleT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGro
bool isTsdbCacheLastRow(tsdbReadHandleT* pTsdbReadHandle); bool isTsdbCacheLastRow(tsdbReadHandleT* pTsdbReadHandle);
/**
*
* @param tsdb
* @param uid
* @param skey
* @param pTagCond
* @param len
* @param tagNameRelType
* @param tbnameCond
* @param pGroupInfo
* @param pColIndex
* @param numOfCols
* @param reqId
* @return
*/
int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId);
/** /**
* get num of rows in mem table * get num of rows in mem table
* *

View File

@ -374,22 +374,27 @@ static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey
DBT * pDbt; DBT * pDbt;
if (pTbCfg->type == META_CHILD_TABLE) { if (pTbCfg->type == META_CHILD_TABLE) {
pDbt = calloc(2, sizeof(DBT)); // pDbt = calloc(2, sizeof(DBT));
// First key is suid // // First key is suid
pDbt[0].data = &(pTbCfg->ctbCfg.suid); // pDbt[0].data = &(pTbCfg->ctbCfg.suid);
pDbt[0].size = sizeof(pTbCfg->ctbCfg.suid); // pDbt[0].size = sizeof(pTbCfg->ctbCfg.suid);
// Second key is the first tag // // Second key is the first tag
void *pTagVal = tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId); // void *pTagVal = tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId);
pDbt[1].data = pTagVal; // pDbt[1].data = pTagVal;
pDbt[1].size = sizeof(int32_t); // pDbt[1].size = sizeof(int32_t);
// Set index key // Set index key
memset(pSKey, 0, sizeof(*pSKey)); memset(pSKey, 0, sizeof(*pSKey));
#if 0
pSKey->flags = DB_DBT_MULTIPLE | DB_DBT_APPMALLOC; pSKey->flags = DB_DBT_MULTIPLE | DB_DBT_APPMALLOC;
pSKey->data = pDbt; pSKey->data = pDbt;
pSKey->size = 2; pSKey->size = 2;
#else
pSKey->data = &(pTbCfg->ctbCfg.suid);
pSKey->size = sizeof(pTbCfg->ctbCfg.suid);
#endif
return 0; return 0;
} else { } else {
@ -622,3 +627,60 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
return pTSchema; return pTSchema;
} }
struct SMCtbCursor {
DBC * pCur;
tb_uid_t suid;
};
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
SMCtbCursor *pCtbCur = NULL;
SMetaDB * pDB = pMeta->pDB;
int ret;
pCtbCur = (SMCtbCursor *)calloc(1, sizeof(*pCtbCur));
if (pCtbCur == NULL) {
return NULL;
}
pCtbCur->suid = uid;
ret = pDB->pCtbIdx->cursor(pDB->pCtbIdx, NULL, &(pCtbCur->pCur), 0);
if (ret != 0) {
free(pCtbCur);
return NULL;
}
return pCtbCur;
}
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) {
if (pCtbCur) {
if (pCtbCur->pCur) {
pCtbCur->pCur->close(pCtbCur->pCur);
}
free(pCtbCur);
}
}
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
DBT skey = {0};
DBT pkey = {0};
DBT pval = {0};
void * pBuf;
STbCfg tbCfg;
// Set key
skey.data = &(pCtbCur->suid);
skey.size = sizeof(pCtbCur->suid);
if (pCtbCur->pCur->pget(pCtbCur->pCur, &skey, &pkey, &pval, DB_NEXT) == 0) {
tb_uid_t id = *(tb_uid_t *)pkey.data;
assert(id != 0);
return id;
// metaDecodeTbInfo(pBuf, &tbCfg);
// return tbCfg.;
} else {
return 0;
}
}

View File

@ -152,7 +152,7 @@ typedef struct STsdbReadHandle {
typedef struct STableGroupSupporter { typedef struct STableGroupSupporter {
int32_t numOfCols; int32_t numOfCols;
SColIndex* pCols; SColIndex* pCols;
STSchema* pTagSchema; SSchema* pTagSchema;
} STableGroupSupporter; } STableGroupSupporter;
static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList); static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList);
@ -466,7 +466,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
return (tsdbReadHandleT)pReadHandle; return (tsdbReadHandleT)pReadHandle;
_end: _end:
// tsdbCleanupQueryHandle(pTsdbReadHandle); tsdbCleanupQueryHandle(pReadHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL; return NULL;
} }
@ -2630,18 +2630,20 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
return numOfRows; return numOfRows;
} }
static int32_t getAllTableList(STable* pSuperTable, SArray* list) { static int32_t getAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
SSkipListIterator* iter = NULL;//tSkipListCreateIter(pSuperTable->pIndex); SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid);
while (tSkipListIterNext(iter)) {
SSkipListNode* pNode = tSkipListIterGet(iter);
STable* pTable = (STable*) SL_GET_NODE_DATA((SSkipListNode*) pNode); while (1) {
tb_uid_t id = metaCtbCursorNext(pCur);
if (id == 0) {
break;
}
STableKeyInfo info = {.pTable = pTable, .lastKey = TSKEY_INITIAL_VAL}; STableKeyInfo info = {.pTable = NULL, .lastKey = TSKEY_INITIAL_VAL, uid = id};
taosArrayPush(list, &info); taosArrayPush(list, &info);
} }
tSkipListDestroyIter(iter); metaCloseCtbCurosr(pCur);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -3553,7 +3555,7 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable
taosArrayPush(pGroups, &g); taosArrayPush(pGroups, &g);
} }
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, TSKEY skey) { SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, TSKEY skey) {
assert(pTableList != NULL); assert(pTableList != NULL);
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES); SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
@ -3564,25 +3566,18 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
} }
if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
SArray* sa = taosArrayInit(size, sizeof(STableKeyInfo)); SArray* sa = taosArrayDup(pTableList);
if (sa == NULL) { if (sa == NULL) {
taosArrayDestroy(pTableGroup); taosArrayDestroy(pTableGroup);
return NULL; return NULL;
} }
for(int32_t i = 0; i < size; ++i) {
STableKeyInfo *pKeyInfo = taosArrayGet(pTableList, i);
STableKeyInfo info = {.pTable = pKeyInfo->pTable, .lastKey = skey};
taosArrayPush(sa, &info);
}
taosArrayPush(pTableGroup, &sa); taosArrayPush(pTableGroup, &sa);
tsdbDebug("all %" PRIzu " tables belong to one group", size); tsdbDebug("all %" PRIzu " tables belong to one group", size);
} else { } else {
STableGroupSupporter sup = {0}; STableGroupSupporter sup = {0};
sup.numOfCols = numOfOrderCols; sup.numOfCols = numOfOrderCols;
sup.pTagSchema = pTagSchema; sup.pTagSchema = pTagSchema->pSchema;
sup.pCols = pCols; sup.pCols = pCols;
// taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), &sup, tableGroupComparFn); // taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), &sup, tableGroupComparFn);
@ -3710,12 +3705,11 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch
//NOTE: not add ref count for super table //NOTE: not add ref count for super table
SArray* res = taosArrayInit(8, sizeof(STableKeyInfo)); SArray* res = taosArrayInit(8, sizeof(STableKeyInfo));
STSchema* pTagSchema = metaGetTableSchema(tsdb->pMeta, uid, 0, true); SSchemaWrapper* pTagSchema = metaGetTableSchema(tsdb->pMeta, uid, 0, true);
// no tags and tbname condition, all child tables of this stable are involved // no tags and tbname condition, all child tables of this stable are involved
if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) { if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
assert(false); int32_t ret = getAllTableList(tsdb->pMeta, uid, res);
int32_t ret = 0;//getAllTableList(pTable, res);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
@ -3854,7 +3848,7 @@ int32_t tsdbGetTableGroupFromIdList(STsdb* tsdb, SArray* pTableIdList, STableGro
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#endif
static void* doFreeColumnInfoData(SArray* pColumnInfoData) { static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
if (pColumnInfoData == NULL) { if (pColumnInfoData == NULL) {
return NULL; return NULL;
@ -3883,6 +3877,7 @@ static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
return NULL; return NULL;
} }
void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle) { void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle) {
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle; STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle;
if (pTsdbReadHandle == NULL) { if (pTsdbReadHandle == NULL) {
@ -3921,6 +3916,7 @@ void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle) {
tfree(pTsdbReadHandle); tfree(pTsdbReadHandle);
} }
#if 0
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) { void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) {
assert(pGroupList != NULL); assert(pGroupList != NULL);

View File

@ -94,7 +94,7 @@ struct SUdfInfo;
int32_t getOutputInterResultBufSize(struct STaskAttr* pQueryAttr); int32_t getOutputInterResultBufSize(struct STaskAttr* pQueryAttr);
size_t getResultRowSize(struct STaskRuntimeEnv* pRuntimeEnv); size_t getResultRowSize(SArray* pExprInfo);
int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t type); int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t type);
void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo);

View File

@ -375,9 +375,12 @@ typedef struct STaskParam {
} STaskParam; } STaskParam;
typedef struct SExchangeInfo { typedef struct SExchangeInfo {
int32_t numOfSources; SArray *pSources;
SEpSet *pEpset; int32_t bytes; // total load bytes from remote
int32_t bytes; // total load bytes from remote tsem_t ready;
void *pTransporter;
SRetrieveTableRsp *pRsp;
SSDataBlock *pResult;
} SExchangeInfo; } SExchangeInfo;
typedef struct STableScanInfo { typedef struct STableScanInfo {
@ -411,18 +414,30 @@ typedef struct STagScanInfo {
int32_t curPos; int32_t curPos;
} STagScanInfo; } STagScanInfo;
typedef struct SStreamBlockScanInfo {
} SStreamBlockScanInfo;
typedef struct SOptrBasicInfo { typedef struct SOptrBasicInfo {
SResultRowInfo resultRowInfo; SResultRowInfo resultRowInfo;
int32_t *rowCellInfoOffset; // offset value for each row result cell info int32_t *rowCellInfoOffset; // offset value for each row result cell info
SQLFunctionCtx *pCtx; SQLFunctionCtx *pCtx;
SSDataBlock *pRes; SSDataBlock *pRes;
void *keyBuf;
} SOptrBasicInfo; } SOptrBasicInfo;
typedef struct SOptrBasicInfo STableIntervalOperatorInfo; typedef struct SOptrBasicInfo STableIntervalOperatorInfo;
typedef struct SAggOperatorInfo { typedef struct SAggOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
uint32_t seed; uint32_t seed;
SDiskbasedResultBuf *pResultBuf; // query result buffer based on blocked-wised disk file
SHashObj* pResultRowHashTable; // quick locate the window object for each result
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
SArray* pResultRowArrayList; // The array list that contains the Result rows
char* keyBuf; // window key buffer
SResultRowPool* pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object.
STableQueryInfo *current;
} SAggOperatorInfo; } SAggOperatorInfo;
typedef struct SProjectOperatorInfo { typedef struct SProjectOperatorInfo {
@ -543,15 +558,14 @@ typedef struct SOrderOperatorInfo {
SSDataBlock *pDataBlock; SSDataBlock *pDataBlock;
} SOrderOperatorInfo; } SOrderOperatorInfo;
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream); SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createExchangeOperatorInfo(const SVgroupInfo* pVgroups, int32_t numOfSources, int32_t numOfOutput, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createSubmitBlockScanOperatorInfo(void *pSubmitBlockReadHandle, int32_t numOfOutput, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream);
SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput);

View File

@ -168,16 +168,14 @@ void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16_
} }
// TODO refactor: use macro // TODO refactor: use macro
struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset) { SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset) {
assert(index >= 0 && offset != NULL); assert(index >= 0 && offset != NULL);
// return (SResultRowEntryInfo*)((char*) pRow->pCellInfo + offset[index]); return (SResultRowEntryInfo*)((char*) pRow->pEntryInfo + offset[index]);
return NULL;
} }
size_t getResultRowSize(STaskRuntimeEnv* pRuntimeEnv) { size_t getResultRowSize(SArray* pExprInfo) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; size_t numOfOutput = taosArrayGetSize(pExprInfo);
return 0; return (numOfOutput * sizeof(SResultRowEntryInfo)) + /*pQueryAttr->interBufSize +*/ sizeof(SResultRow);
// return (pQueryAttr->numOfOutput * sizeof(SResultRowEntryInfo)) + pQueryAttr->interBufSize + sizeof(SResultRow);
} }
SResultRowPool* initResultRowPool(size_t size) { SResultRowPool* initResultRowPool(size_t size) {

View File

@ -13,10 +13,11 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "os.h" #include <tsdb.h>
#include "tarray.h"
#include "dataSinkMgt.h" #include "dataSinkMgt.h"
#include "exception.h" #include "exception.h"
#include "os.h"
#include "tarray.h"
#include "tcache.h" #include "tcache.h"
#include "tglobal.h" #include "tglobal.h"
#include "tmsg.h" #include "tmsg.h"
@ -87,7 +88,7 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_
*handle = (*pTask)->dsHandle; *handle = (*pTask)->dsHandle;
_error: _error:
// if failed to add ref for all tables in this query, abort current query // if failed to add ref for all tables in this query, abort current query
return code; return code;
} }
@ -141,6 +142,11 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int64_t threadId = taosGetSelfPthreadId(); int64_t threadId = taosGetSelfPthreadId();
// todo: remove it.
if (tinfo == NULL) {
return TSDB_CODE_SUCCESS;
}
*pRes = NULL; *pRes = NULL;
int64_t curOwner = 0; int64_t curOwner = 0;

File diff suppressed because it is too large Load Diff

View File

@ -28,14 +28,8 @@ extern "C" {
#include "function.h" #include "function.h"
#include "tudf.h" #include "tudf.h"
extern SAggFunctionInfo aggFunc[35];
typedef struct SResultRowEntryInfo { extern SAggFunctionInfo aggFunc[35];
int8_t hasResult; // result generated, not NULL value
bool initialized; // output buffer has been initialized
bool complete; // query has completed
uint32_t numOfRes; // num of output result in current buffer
} SResultRowEntryInfo;
#define FUNCSTATE_SO 0x0u #define FUNCSTATE_SO 0x0u
#define FUNCSTATE_MO 0x1u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM #define FUNCSTATE_MO 0x1u // dynamic number of output, not multinumber of output e.g., TOP/BOTTOM

View File

@ -4379,6 +4379,24 @@ int32_t functionCompatList[] = {
6, 8, 7, 6, 8, 7,
}; };
//typedef struct SFunctionFpSet {
// bool (*init)(struct SQLFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); // setup the execute environment
// void (*addInput)(struct SQLFunctionCtx *pCtx);
//
// // finalizer must be called after all exec has been executed to generated final result.
// void (*finalize)(struct SQLFunctionCtx *pCtx);
// void (*combine)(struct SQLFunctionCtx *pCtx);
//} SFunctionFpSet;
SFunctionFpSet fpSet[1] = {
{
.init = function_setup,
.addInput = count_function,
.finalize = doFinalizer,
.combine = count_func_merge,
}
};
SAggFunctionInfo aggFunc[35] = {{ SAggFunctionInfo aggFunc[35] = {{
// 0, count function does not invoke the finalize function // 0, count function does not invoke the finalize function
"count", "count",

View File

@ -30,7 +30,6 @@ SSchema *getTableTagSchema(const STableMeta* pTableMeta);
SArray *getCurrentExprList(SQueryStmtInfo* pQueryInfo); SArray *getCurrentExprList(SQueryStmtInfo* pQueryInfo);
size_t getNumOfExprs(SQueryStmtInfo* pQueryInfo); size_t getNumOfExprs(SQueryStmtInfo* pQueryInfo);
SExprInfo* createBinaryExprInfo(struct tExprNode* pNode, SSchema* pResSchema);
void addExprInfo(SArray* pExprList, int32_t index, SExprInfo* pExprInfo, int32_t level); void addExprInfo(SArray* pExprList, int32_t index, SExprInfo* pExprInfo, int32_t level);
void updateExprInfo(SExprInfo* pExprInfo, int16_t functionId, int32_t colId, int16_t srcColumnIndex, int16_t resType, int16_t resSize); void updateExprInfo(SExprInfo* pExprInfo, int16_t functionId, int32_t colId, int16_t srcColumnIndex, int16_t resType, int16_t resSize);

View File

@ -2080,7 +2080,7 @@ static int32_t setColumnIndex(SQueryStmtInfo* pQueryInfo, SArray* pParamList, SC
STableMeta* pTableMeta = getMetaInfo(pQueryInfo, 0)->pTableMeta; STableMeta* pTableMeta = getMetaInfo(pQueryInfo, 0)->pTableMeta;
if (pParamList == NULL) { if (pParamList == NULL) {
// count(*) is equalled to count(primary_timestamp_key) // count(*) is equalled to count(primary_timestamp_key)
*index = (SColumnIndex) {0, PRIMARYKEY_TIMESTAMP_COL_ID, false}; *index = (SColumnIndex) {0, 0, false};
*columnSchema = *(SSchema*) getOneColumnSchema(pTableMeta, index->columnIndex); *columnSchema = *(SSchema*) getOneColumnSchema(pTableMeta, index->columnIndex);
} else { } else {
tSqlExprItem* pParamElem = taosArrayGet(pParamList, 0); tSqlExprItem* pParamElem = taosArrayGet(pParamList, 0);
@ -3955,6 +3955,7 @@ int32_t qParserValidateSqlNode(SParseContext *pCtx, SSqlInfo* pInfo, SQueryStmtI
pQueryInfo->pTableMetaInfo[0]->name = *name; pQueryInfo->pTableMetaInfo[0]->name = *name;
pQueryInfo->numOfTables = 1; pQueryInfo->numOfTables = 1;
pQueryInfo->pTableMetaInfo[0]->tagColList = taosArrayInit(4, POINTER_BYTES); pQueryInfo->pTableMetaInfo[0]->tagColList = taosArrayInit(4, POINTER_BYTES);
strcpy(pQueryInfo->pTableMetaInfo[0]->aliasName, name->tname);
code = setTableVgroupList(pCtx, name, &pQueryInfo->pTableMetaInfo[0]->vgroupList); code = setTableVgroupList(pCtx, name, &pQueryInfo->pTableMetaInfo[0]->vgroupList);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {

View File

@ -26,18 +26,26 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseContext* pCtx, void** out
const char* msg4 = "pattern is invalid"; const char* msg4 = "pattern is invalid";
const char* msg5 = "database name is empty"; const char* msg5 = "database name is empty";
const char* msg6 = "pattern string is empty"; const char* msg6 = "pattern string is empty";
const char* msg7 = "db is not specified";
/* /*
* database prefix in pInfo->pMiscInfo->a[0] * database prefix in pInfo->pMiscInfo->a[0]
* wildcard in like clause in pInfo->pMiscInfo->a[1] * wildcard in like clause in pInfo->pMiscInfo->a[1]
*/ */
int16_t showType = pShowInfo->showType; int16_t showType = pShowInfo->showType;
if (showType == TSDB_MGMT_TABLE_TABLE) { if (showType == TSDB_MGMT_TABLE_TABLE) {
SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
SArray* array = NULL; SArray* array = NULL;
SName name = {0}; SName name = {0};
tNameSetDbName(&name, pCtx->acctId, pCtx->db, strlen(pCtx->db));
if (pCtx->db == NULL && pShowInfo->prefix.n == 0) {
return buildInvalidOperationMsg(pMsgBuf, msg7);
}
SVShowTablesReq* pShowReq = calloc(1, sizeof(SVShowTablesReq));
if (pShowInfo->prefix.n > 0) {
tNameSetDbName(&name, pCtx->acctId, pShowInfo->prefix.z, pShowInfo->prefix.n);
} else {
tNameSetDbName(&name, pCtx->acctId, pCtx->db, strlen(pCtx->db));
}
char dbFname[TSDB_DB_FNAME_LEN] = {0}; char dbFname[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(&name, dbFname); tNameGetFullDbName(&name, dbFname);
@ -715,6 +723,8 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch
SMsgBuf m = {.buf = msgBuf, .len = msgBufLen}; SMsgBuf m = {.buf = msgBuf, .len = msgBufLen};
SMsgBuf* pMsgBuf = &m; SMsgBuf* pMsgBuf = &m;
pDcl->epSet = pCtx->mgmtEpSet;
switch (pInfo->type) { switch (pInfo->type) {
case TSDB_SQL_CREATE_USER: case TSDB_SQL_CREATE_USER:
case TSDB_SQL_ALTER_USER: { case TSDB_SQL_ALTER_USER: {

View File

@ -1947,6 +1947,7 @@ int32_t KvRowAppend(const void *value, int32_t len, void *param) {
int32_t createSName(SName* pName, SToken* pTableName, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) { int32_t createSName(SName* pName, SToken* pTableName, SParseContext* pParseCtx, SMsgBuf* pMsgBuf) {
const char* msg1 = "name too long"; const char* msg1 = "name too long";
const char* msg2 = "invalid database name"; const char* msg2 = "invalid database name";
const char* msg3 = "db is not specified";
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
char* p = strnchr(pTableName->z, TS_PATH_DELIMITER[0], pTableName->n, true); char* p = strnchr(pTableName->z, TS_PATH_DELIMITER[0], pTableName->n, true);
@ -1984,6 +1985,10 @@ int32_t createSName(SName* pName, SToken* pTableName, SParseContext* pParseCtx,
strncpy(name, pTableName->z, pTableName->n); strncpy(name, pTableName->z, pTableName->n);
strdequote(name); strdequote(name);
if (pParseCtx->db == NULL) {
return buildInvalidOperationMsg(pMsgBuf, msg3);
}
code = tNameSetDbName(pName, pParseCtx->acctId, pParseCtx->db, strlen(pParseCtx->db)); code = tNameSetDbName(pName, pParseCtx->acctId, pParseCtx->db, strlen(pParseCtx->db));
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
code = buildInvalidOperationMsg(pMsgBuf, msg2); code = buildInvalidOperationMsg(pMsgBuf, msg2);

View File

@ -106,7 +106,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql); int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql);
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId); int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId);
void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep); void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource);
int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len); int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len);
int32_t stringToSubplan(const char* str, SSubplan** subplan); int32_t stringToSubplan(const char* str, SSubplan** subplan);

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <function.h>
#include "function.h" #include "function.h"
#include "os.h" #include "os.h"
#include "parser.h" #include "parser.h"
@ -197,23 +198,23 @@ static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, SQ
SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info); SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info);
if (!pQueryInfo->info.projectionQuery) { if (!pQueryInfo->info.projectionQuery) {
STableMetaInfo* pTableMetaInfo1 = getMetaInfo(pQueryInfo, 0); SArray* p = pQueryInfo->exprList[0];
// table source column projection, generate the projection expr // table source column projection, generate the projection expr
int32_t numOfCols = (int32_t) taosArrayGetSize(tableCols); int32_t numOfCols = (int32_t) taosArrayGetSize(tableCols);
SExprInfo** pExpr = calloc(numOfCols, POINTER_BYTES);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumn* pCol = taosArrayGetP(tableCols, i);
SSourceParam param = {0}; pNode->numOfExpr = numOfCols;
addIntoSourceParam(&param, NULL, pCol); pNode->pExpr = taosArrayInit(numOfCols, POINTER_BYTES);
SSchema s = createSchema(pCol->info.type, pCol->info.bytes, pCol->info.colId, pCol->name); for(int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* p = createExprInfo(pTableMetaInfo1, "project", &param, &s, 0); SExprInfo* pExprInfo = taosArrayGetP(p, i);
pExpr[i] = p; SColumn* pCol = pExprInfo->base.pColumns;
SSchema schema = createSchema(pCol->info.type, pCol->info.bytes, pCol->info.colId, pCol->name);
tExprNode* pExprNode = pExprInfo->pExpr->_function.pChild[0];
SExprInfo* px = createBinaryExprInfo(pExprNode, &schema);
taosArrayPush(pNode->pExpr, &px);
} }
// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExpr, numOfCols, NULL);
tfree(pExpr);
} }
return pNode; return pNode;

View File

@ -278,7 +278,7 @@ static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNod
static SPhyNode* createExchangeNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, uint64_t srcTemplateId) { static SPhyNode* createExchangeNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, uint64_t srcTemplateId) {
SExchangePhyNode* node = (SExchangePhyNode*)initPhyNode(pPlanNode, OP_Exchange, sizeof(SExchangePhyNode)); SExchangePhyNode* node = (SExchangePhyNode*)initPhyNode(pPlanNode, OP_Exchange, sizeof(SExchangePhyNode));
node->srcTemplateId = srcTemplateId; node->srcTemplateId = srcTemplateId;
node->pSrcEndPoints = validPointer(taosArrayInit(TARRAY_MIN_SIZE, sizeof(SQueryNodeAddr))); node->pSrcEndPoints = validPointer(taosArrayInit(TARRAY_MIN_SIZE, sizeof(SDownstreamSource)));
return (SPhyNode*)node; return (SPhyNode*)node;
} }
@ -409,24 +409,25 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void setExchangSourceNode(uint64_t templateId, SQueryNodeAddr* pEp, SPhyNode* pNode) { void setExchangSourceNode(uint64_t templateId, SDownstreamSource *pSource, SPhyNode* pNode) {
if (NULL == pNode) { if (NULL == pNode) {
return; return;
} }
if (OP_Exchange == pNode->info.type) { if (OP_Exchange == pNode->info.type) {
SExchangePhyNode* pExchange = (SExchangePhyNode*)pNode; SExchangePhyNode* pExchange = (SExchangePhyNode*)pNode;
if (templateId == pExchange->srcTemplateId) { if (templateId == pExchange->srcTemplateId) {
taosArrayPush(pExchange->pSrcEndPoints, pEp); taosArrayPush(pExchange->pSrcEndPoints, pSource);
} }
} }
if (pNode->pChildren != NULL) { if (pNode->pChildren != NULL) {
size_t size = taosArrayGetSize(pNode->pChildren); size_t size = taosArrayGetSize(pNode->pChildren);
for(int32_t i = 0; i < size; ++i) { for(int32_t i = 0; i < size; ++i) {
setExchangSourceNode(templateId, pEp, taosArrayGetP(pNode->pChildren, i)); setExchangSourceNode(templateId, pSource, taosArrayGetP(pNode->pChildren, i));
} }
} }
} }
void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* pEp) { void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) {
setExchangSourceNode(templateId, pEp, subplan->pNode); setExchangSourceNode(templateId, pSource, subplan->pNode);
} }

View File

@ -29,6 +29,14 @@ static void copyString(const cJSON* json, const char* name, char* dst) {
strcpy(dst, cJSON_GetStringValue(cJSON_GetObjectItem(json, name))); strcpy(dst, cJSON_GetStringValue(cJSON_GetObjectItem(json, name)));
} }
static uint64_t getBigintFromString(const cJSON* json, const char* name) {
char* val = getString(json, name);
uint64_t intVal = strtoul(val, NULL, 10);
tfree(val);
return intVal;
}
static int64_t getNumber(const cJSON* json, const char* name) { static int64_t getNumber(const cJSON* json, const char* name) {
double d = cJSON_GetNumberValue(cJSON_GetObjectItem(json, name)); double d = cJSON_GetNumberValue(cJSON_GetObjectItem(json, name));
return (int64_t) d; return (int64_t) d;
@ -400,7 +408,7 @@ static bool functionToJson(const void* obj, cJSON* jFunc) {
const tExprNode* exprInfo = (const tExprNode*)obj; const tExprNode* exprInfo = (const tExprNode*)obj;
bool res = cJSON_AddStringToObject(jFunc, jkFunctionName, exprInfo->_function.functionName); bool res = cJSON_AddStringToObject(jFunc, jkFunctionName, exprInfo->_function.functionName);
if (res && NULL != exprInfo->_function.pChild) { if (res && NULL != exprInfo->_function.pChild) {
res = addRawArray(jFunc, jkFunctionChild, exprNodeToJson, *(exprInfo->_function.pChild), sizeof(tExprNode*), exprInfo->_function.num); res = addRawArray(jFunc, jkFunctionChild, exprNodeToJson, exprInfo->_function.pChild, sizeof(tExprNode*), exprInfo->_function.num);
} }
return res; return res;
} }
@ -459,7 +467,7 @@ static const char* jkExprNodeColumn = "Column";
static const char* jkExprNodeValue = "Value"; static const char* jkExprNodeValue = "Value";
static bool exprNodeToJson(const void* obj, cJSON* jExprInfo) { static bool exprNodeToJson(const void* obj, cJSON* jExprInfo) {
const tExprNode* exprInfo = (const tExprNode*)obj; const tExprNode* exprInfo = *(const tExprNode**)obj;
bool res = cJSON_AddNumberToObject(jExprInfo, jkExprNodeType, exprInfo->nodeType); bool res = cJSON_AddNumberToObject(jExprInfo, jkExprNodeType, exprInfo->nodeType);
if (res) { if (res) {
switch (exprInfo->nodeType) { switch (exprInfo->nodeType) {
@ -547,7 +555,7 @@ static bool exprInfoToJson(const void* obj, cJSON* jExprInfo) {
const SExprInfo* exprInfo = (const SExprInfo*)obj; const SExprInfo* exprInfo = (const SExprInfo*)obj;
bool res = addObject(jExprInfo, jkExprInfoBase, sqlExprToJson, &exprInfo->base); bool res = addObject(jExprInfo, jkExprInfoBase, sqlExprToJson, &exprInfo->base);
if (res) { if (res) {
res = addObject(jExprInfo, jkExprInfoExpr, exprNodeToJson, exprInfo->pExpr); res = addObject(jExprInfo, jkExprInfoExpr, exprNodeToJson, &exprInfo->pExpr);
} }
return res; return res;
} }
@ -567,13 +575,13 @@ static const char* jkTimeWindowEndKey = "EndKey";
static bool timeWindowToJson(const void* obj, cJSON* json) { static bool timeWindowToJson(const void* obj, cJSON* json) {
const STimeWindow* win = (const STimeWindow*)obj; const STimeWindow* win = (const STimeWindow*)obj;
char tmp[32] = {0}; char tmp[40] = {0};
sprintf(tmp, "%"PRId64, win->skey); snprintf(tmp, tListLen(tmp),"%"PRId64, win->skey);
bool res = cJSON_AddStringToObject(json, jkTimeWindowStartKey, tmp); bool res = cJSON_AddStringToObject(json, jkTimeWindowStartKey, tmp);
if (res) { if (res) {
memset(tmp, 0, tListLen(tmp)); memset(tmp, 0, tListLen(tmp));
sprintf(tmp, "%"PRId64, win->ekey); snprintf(tmp, tListLen(tmp),"%"PRId64, win->ekey);
res = cJSON_AddStringToObject(json, jkTimeWindowEndKey, tmp); res = cJSON_AddStringToObject(json, jkTimeWindowEndKey, tmp);
} }
return res; return res;
@ -581,12 +589,8 @@ static bool timeWindowToJson(const void* obj, cJSON* json) {
static bool timeWindowFromJson(const cJSON* json, void* obj) { static bool timeWindowFromJson(const cJSON* json, void* obj) {
STimeWindow* win = (STimeWindow*)obj; STimeWindow* win = (STimeWindow*)obj;
win->skey = getBigintFromString(json, jkTimeWindowStartKey);
char* p = getString(json, jkTimeWindowStartKey); win->ekey = getBigintFromString(json, jkTimeWindowEndKey);
win->skey = strtoll(p, NULL, 10);
p = getString(json, jkTimeWindowEndKey);
win->ekey = strtoll(p, NULL, 10);
return true; return true;
} }
@ -598,14 +602,19 @@ static const char* jkScanNodeTableRevCount = "Reverse";
static bool scanNodeToJson(const void* obj, cJSON* json) { static bool scanNodeToJson(const void* obj, cJSON* json) {
const SScanPhyNode* pNode = (const SScanPhyNode*)obj; const SScanPhyNode* pNode = (const SScanPhyNode*)obj;
bool res = cJSON_AddNumberToObject(json, jkScanNodeTableId, pNode->uid);
char uid[40] = {0};
snprintf(uid, tListLen(uid), "%"PRIu64, pNode->uid);
bool res = cJSON_AddStringToObject(json, jkScanNodeTableId, uid);
if (res) { if (res) {
res = cJSON_AddNumberToObject(json, jkScanNodeTableType, pNode->tableType); res = cJSON_AddNumberToObject(json, jkScanNodeTableType, pNode->tableType);
} }
if (res) { if (res) {
res = cJSON_AddNumberToObject(json, jkScanNodeTableOrder, pNode->order); res = cJSON_AddNumberToObject(json, jkScanNodeTableOrder, pNode->order);
} }
if (res) { if (res) {
res = cJSON_AddNumberToObject(json, jkScanNodeTableCount, pNode->count); res = cJSON_AddNumberToObject(json, jkScanNodeTableCount, pNode->count);
} }
@ -613,12 +622,14 @@ static bool scanNodeToJson(const void* obj, cJSON* json) {
if (res) { if (res) {
res = cJSON_AddNumberToObject(json, jkScanNodeTableRevCount, pNode->reverse); res = cJSON_AddNumberToObject(json, jkScanNodeTableRevCount, pNode->reverse);
} }
return res; return res;
} }
static bool scanNodeFromJson(const cJSON* json, void* obj) { static bool scanNodeFromJson(const cJSON* json, void* obj) {
SScanPhyNode* pNode = (SScanPhyNode*)obj; SScanPhyNode* pNode = (SScanPhyNode*)obj;
pNode->uid = getNumber(json, jkScanNodeTableId);
pNode->uid = getBigintFromString(json, jkScanNodeTableId);
pNode->tableType = getNumber(json, jkScanNodeTableType); pNode->tableType = getNumber(json, jkScanNodeTableType);
pNode->count = getNumber(json, jkScanNodeTableCount); pNode->count = getNumber(json, jkScanNodeTableCount);
pNode->order = getNumber(json, jkScanNodeTableOrder); pNode->order = getNumber(json, jkScanNodeTableOrder);
@ -739,40 +750,72 @@ static bool epAddrFromJson(const cJSON* json, void* obj) {
return true; return true;
} }
static const char* jkNodeAddrId = "NodeId"; static const char* jkNodeAddrId = "NodeId";
static const char* jkNodeAddrInUse = "InUse"; static const char* jkNodeAddrInUse = "InUse";
static const char* jkNodeAddrEpAddrs = "EpAddrs"; static const char* jkNodeAddrEpAddrs = "Ep";
static const char* jkNodeAddr = "NodeAddr";
static const char* jkNodeTaskId = "TaskId";
static const char* jkNodeTaskSchedId = "SchedId";
static bool queryNodeAddrToJson(const void* obj, cJSON* json) {
const SQueryNodeAddr* pAddr = (const SQueryNodeAddr*) obj;
bool res = cJSON_AddNumberToObject(json, jkNodeAddrId, pAddr->nodeId);
if (res) {
res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, pAddr->inUse);
}
if (res) {
res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, pAddr->epAddr, sizeof(SEpAddr), pAddr->numOfEps);
}
return res;
}
static bool queryNodeAddrFromJson(const cJSON* json, void* obj) {
SQueryNodeAddr* pAddr = (SQueryNodeAddr*) obj;
pAddr->nodeId = getNumber(json, jkNodeAddrId);
pAddr->inUse = getNumber(json, jkNodeAddrInUse);
int32_t numOfEps = 0;
bool res = fromRawArray(json, jkNodeAddrEpAddrs, epAddrFromJson, pAddr->epAddr, sizeof(SEpAddr), &numOfEps);
pAddr->numOfEps = numOfEps;
return res;
}
static bool nodeAddrToJson(const void* obj, cJSON* json) { static bool nodeAddrToJson(const void* obj, cJSON* json) {
const SQueryNodeAddr* ep = (const SQueryNodeAddr*)obj; const SDownstreamSource* pSource = (const SDownstreamSource*) obj;
bool res = cJSON_AddNumberToObject(json, jkNodeAddrId, ep->nodeId); bool res = cJSON_AddNumberToObject(json, jkNodeTaskId, pSource->taskId);
if (res) { if (res) {
res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, ep->inUse); char t[30] = {0};
snprintf(t, tListLen(t), "%"PRIu64, pSource->schedId);
res = cJSON_AddStringToObject(json, jkNodeTaskSchedId, t);
} }
if (res) { if (res) {
res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, ep->epAddr, ep->numOfEps, sizeof(SEpAddr)); res = addObject(json, jkNodeAddr, queryNodeAddrToJson, &pSource->addr);
} }
return res; return res;
} }
static bool nodeAddrFromJson(const cJSON* json, void* obj) { static bool nodeAddrFromJson(const cJSON* json, void* obj) {
SQueryNodeAddr* ep = (SQueryNodeAddr*)obj; SDownstreamSource* pSource = (SDownstreamSource*)obj;
ep->nodeId = getNumber(json, jkNodeAddrId); pSource->taskId = getNumber(json, jkNodeTaskId);
ep->inUse = getNumber(json, jkNodeAddrInUse);
int32_t numOfEps = 0; pSource->schedId = getBigintFromString(json, jkNodeTaskSchedId);
bool res = fromRawArray(json, jkNodeAddrEpAddrs, nodeAddrFromJson, &ep->epAddr, sizeof(SEpAddr), &numOfEps); bool res = fromObject(json, jkNodeAddr, queryNodeAddrFromJson, &pSource->addr, true);
ep->numOfEps = numOfEps;
return res; return res;
} }
static const char* jkExchangeNodeSrcTemplateId = "SrcTemplateId"; static const char* jkExchangeNodeSrcTemplateId = "SrcTemplateId";
static const char* jkExchangeNodeSrcEndPoints = "SrcEndPoints"; static const char* jkExchangeNodeSrcEndPoints = "SrcAddrs";
static bool exchangeNodeToJson(const void* obj, cJSON* json) { static bool exchangeNodeToJson(const void* obj, cJSON* json) {
const SExchangePhyNode* exchange = (const SExchangePhyNode*)obj; const SExchangePhyNode* exchange = (const SExchangePhyNode*)obj;
bool res = cJSON_AddNumberToObject(json, jkExchangeNodeSrcTemplateId, exchange->srcTemplateId); bool res = cJSON_AddNumberToObject(json, jkExchangeNodeSrcTemplateId, exchange->srcTemplateId);
if (res) { if (res) {
res = addInlineArray(json, jkExchangeNodeSrcEndPoints, nodeAddrToJson, exchange->pSrcEndPoints); res = addRawArray(json, jkExchangeNodeSrcEndPoints, nodeAddrToJson, exchange->pSrcEndPoints->pData, sizeof(SDownstreamSource), taosArrayGetSize(exchange->pSrcEndPoints));
} }
return res; return res;
} }
@ -780,7 +823,7 @@ static bool exchangeNodeToJson(const void* obj, cJSON* json) {
static bool exchangeNodeFromJson(const cJSON* json, void* obj) { static bool exchangeNodeFromJson(const cJSON* json, void* obj) {
SExchangePhyNode* exchange = (SExchangePhyNode*)obj; SExchangePhyNode* exchange = (SExchangePhyNode*)obj;
exchange->srcTemplateId = getNumber(json, jkExchangeNodeSrcTemplateId); exchange->srcTemplateId = getNumber(json, jkExchangeNodeSrcTemplateId);
return fromInlineArray(json, jkExchangeNodeSrcEndPoints, nodeAddrFromJson, &exchange->pSrcEndPoints, sizeof(SQueryNodeAddr)); return fromInlineArray(json, jkExchangeNodeSrcEndPoints, nodeAddrFromJson, &exchange->pSrcEndPoints, sizeof(SDownstreamSource));
} }
static bool specificPhyNodeToJson(const void* obj, cJSON* json) { static bool specificPhyNodeToJson(const void* obj, cJSON* json) {
@ -989,7 +1032,11 @@ static const char* jkIdSubplanId = "SubplanId";
static bool subplanIdToJson(const void* obj, cJSON* jId) { static bool subplanIdToJson(const void* obj, cJSON* jId) {
const SSubplanId* id = (const SSubplanId*)obj; const SSubplanId* id = (const SSubplanId*)obj;
bool res = cJSON_AddNumberToObject(jId, jkIdQueryId, id->queryId);
char ids[40] = {0};
snprintf(ids, tListLen(ids), "%"PRIu64, id->queryId);
bool res = cJSON_AddStringToObject(jId, jkIdQueryId, ids);
if (res) { if (res) {
res = cJSON_AddNumberToObject(jId, jkIdTemplateId, id->templateId); res = cJSON_AddNumberToObject(jId, jkIdTemplateId, id->templateId);
} }
@ -1001,9 +1048,10 @@ static bool subplanIdToJson(const void* obj, cJSON* jId) {
static bool subplanIdFromJson(const cJSON* json, void* obj) { static bool subplanIdFromJson(const cJSON* json, void* obj) {
SSubplanId* id = (SSubplanId*)obj; SSubplanId* id = (SSubplanId*)obj;
id->queryId = getNumber(json, jkIdQueryId);
id->queryId = getBigintFromString(json, jkIdQueryId);
id->templateId = getNumber(json, jkIdTemplateId); id->templateId = getNumber(json, jkIdTemplateId);
id->subplanId = getNumber(json, jkIdSubplanId); id->subplanId = getNumber(json, jkIdSubplanId);
return true; return true;
} }

View File

@ -87,8 +87,8 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag,
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) { void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) {
setSubplanExecutionNode(subplan, templateId, ep); setSubplanExecutionNode(subplan, templateId, pSource);
} }
int32_t qSubPlanToString(const SSubplan *subplan, char** str, int32_t* len) { int32_t qSubPlanToString(const SSubplan *subplan, char** str, int32_t* len) {

View File

@ -1,15 +1,4 @@
aux_source_directory(src QWORKER_SRC) aux_source_directory(src QWORKER_SRC)
#add_library(qworker ${QWORKER_SRC})
#target_include_directories(
# qworker
# PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/qworker"
# PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
#)
#
#target_link_libraries(
# qworker
# PRIVATE os util transport planner qcom executor
#)
add_library(qworker STATIC ${QWORKER_SRC}) add_library(qworker STATIC ${QWORKER_SRC})
target_include_directories( target_include_directories(
@ -18,11 +7,6 @@ target_include_directories(
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
) )
#set_target_properties(qworker PROPERTIES
# IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/libqworker.a"
# INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_SOURCE_DIR}/include/libs/qworker"
# )
target_link_libraries(qworker target_link_libraries(qworker
PRIVATE os util transport planner qcom executor PRIVATE os util transport planner qcom executor
) )

View File

@ -258,6 +258,8 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
char id[sizeof(qId) + sizeof(tId)] = {0}; char id[sizeof(qId) + sizeof(tId)] = {0};
QW_SET_QTID(id, qId, tId); QW_SET_QTID(id, qId, tId);
printf("%"PRIx64", tid:%"PRIx64"\n", qId, tId);
SQWTaskCtx nctx = {0}; SQWTaskCtx nctx = {0};
QW_LOCK(QW_WRITE, &mgmt->ctxLock); QW_LOCK(QW_WRITE, &mgmt->ctxLock);

View File

@ -29,7 +29,7 @@ extern "C" {
#define SCHEDULE_DEFAULT_JOB_NUMBER 1000 #define SCHEDULE_DEFAULT_JOB_NUMBER 1000
#define SCHEDULE_DEFAULT_TASK_NUMBER 1000 #define SCHEDULE_DEFAULT_TASK_NUMBER 1000
#define SCH_MAX_CONDIDATE_EP_NUM TSDB_MAX_REPLICA #define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
enum { enum {
SCH_READ = 1, SCH_READ = 1,

View File

@ -52,9 +52,9 @@ int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *
pTask->level = pLevel; pTask->level = pLevel;
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
pTask->taskId = schGenTaskId(); pTask->taskId = schGenTaskId();
pTask->execAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); pTask->execAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
if (NULL == pTask->execAddrs) { if (NULL == pTask->execAddrs) {
SCH_TASK_ELOG("taosArrayInit %d exec addrs failed", SCH_MAX_CONDIDATE_EP_NUM); SCH_TASK_ELOG("taosArrayInit %d exec addrs failed", SCH_MAX_CANDIDATE_EP_NUM);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
@ -408,9 +408,9 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
} }
pTask->candidateIdx = 0; pTask->candidateIdx = 0;
pTask->candidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); pTask->candidateAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
if (NULL == pTask->candidateAddrs) { if (NULL == pTask->candidateAddrs) {
SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CONDIDATE_EP_NUM); SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CANDIDATE_EP_NUM);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
@ -430,10 +430,10 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
if (pJob->nodeList) { if (pJob->nodeList) {
nodeNum = taosArrayGetSize(pJob->nodeList); nodeNum = taosArrayGetSize(pJob->nodeList);
for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i); SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i);
if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) { if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno); SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
@ -443,12 +443,12 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
} }
if (addNum <= 0) { if (addNum <= 0) {
SCH_TASK_ELOG("no available execNode as condidate addr, nodeNum:%d", nodeNum); SCH_TASK_ELOG("no available execNode as candidate addr, nodeNum:%d", nodeNum);
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
} }
/* /*
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) { for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i])); strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i]; epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];
@ -767,7 +767,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
} }
/* /*
if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CONDIDATE_EP_NUM) { if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) {
strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn)); strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port; job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;
@ -782,7 +782,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
atomic_add_fetch_32(&par->childReady, 1); atomic_add_fetch_32(&par->childReady, 1);
SCH_LOCK(SCH_WRITE, &par->lock); SCH_LOCK(SCH_WRITE, &par->lock);
qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &pTask->succeedAddr); SDownstreamSource source = {.taskId = pTask->taskId, .schedId = schMgmt.sId, .addr = pTask->succeedAddr};
qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &source);
SCH_UNLOCK(SCH_WRITE, &par->lock); SCH_UNLOCK(SCH_WRITE, &par->lock);
if (SCH_TASK_READY_TO_LUNCH(par)) { if (SCH_TASK_READY_TO_LUNCH(par)) {
@ -853,7 +854,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
SResReadyRsp *rsp = (SResReadyRsp *)msg; SResReadyRsp *rsp = (SResReadyRsp *)msg;
if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) { if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) {
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rsp->code)); SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
} }
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
@ -1214,6 +1215,8 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
SCH_TASK_ELOG("subplanToString error, code:%x, msg:%p, len:%d", code, pTask->msg, pTask->msgLen); SCH_TASK_ELOG("subplanToString error, code:%x, msg:%p, len:%d", code, pTask->msg, pTask->msgLen);
SCH_ERR_JRET(code); SCH_ERR_JRET(code);
} }
// printf("physical plan:%s\n", pTask->msg);
} }
SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask)); SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
@ -1477,7 +1480,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
SSubQueryMsg *pMsg = msg; SSubQueryMsg *pMsg = (SSubQueryMsg*) msg;
pMsg->header.vgId = htonl(tInfo.addr.nodeId); pMsg->header.vgId = htonl(tInfo.addr.nodeId);

View File

@ -44,7 +44,7 @@ typedef struct {
// TDB Operations // TDB Operations
TDB_EXTERN int tdbCreateDB(TDB** dbpp, tdb_db_t type); TDB_EXTERN int tdbCreateDB(TDB** dbpp, tdb_db_t type);
TDB_EXTERN int tdbOpenDB(TDB* dbp, uint32_t flags); TDB_EXTERN int tdbOpenDB(TDB* dbp, const char* fname, const char* dbname, uint32_t flags);
TDB_EXTERN int tdbCloseDB(TDB* dbp, uint32_t flags); TDB_EXTERN int tdbCloseDB(TDB* dbp, uint32_t flags);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -56,9 +56,28 @@ _err:
return 0; return 0;
} }
TDB_EXTERN int tdbOpenDB(TDB* dbp, uint32_t flags) { TDB_EXTERN int tdbOpenDB(TDB* dbp, const char* fname, const char* dbname, uint32_t flags) {
// TODO int ret = 0;
return 0;
if ((dbp->fname = strdup(fname)) == NULL) {
ret = -1;
return ret;
}
// Create the backup file if the file not exists
// Open the file as a sub-db or a master-db
if (dbname) {
if ((dbp->dbname = strdup(dbname)) == NULL) {
ret = -1;
return ret;
}
// TODO: Open the DB as a SUB-DB in this file
} else {
// TODO: Open the DB as a MASTER-DB in this file
}
return ret;
} }
TDB_EXTERN int tdbCloseDB(TDB* dbp, uint32_t flags) { TDB_EXTERN int tdbCloseDB(TDB* dbp, uint32_t flags) {

View File

@ -25,6 +25,14 @@
extern "C" { extern "C" {
#endif #endif
typedef struct {
// TODO
} TDB_MPOOL;
typedef struct {
int fd;
} TDB_FH;
struct TDB { struct TDB {
pgsize_t pageSize; pgsize_t pageSize;
tdb_db_t type; tdb_db_t type;
@ -35,6 +43,9 @@ struct TDB {
TDB_HASH * hash; TDB_HASH * hash;
TDB_HEAP * heap; TDB_HEAP * heap;
} dbam; // db access method } dbam; // db access method
TDB_FH * fhp; // The backup file handle
TDB_MPOOL *mph; // The memory pool handle
}; };
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -6,9 +6,9 @@ TEST(tdb_api_test, tdb_create_open_close_db_test) {
int ret; int ret;
TDB *dbp; TDB *dbp;
tdbCreateDB(&dbp, TDB_BTREE_T); // tdbCreateDB(&dbp, TDB_BTREE_T);
tdbOpenDB(dbp, 0); // tdbOpenDB(dbp, 0);
tdbCloseDB(dbp, 0); // tdbCloseDB(dbp, 0);
} }

View File

@ -16,8 +16,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "tconfig.h" #include "tconfig.h"
#include "ulog.h"
#include "tutil.h" #include "tutil.h"
#include "ulog.h"
SGlobalCfg tsGlobalConfig[TSDB_CFG_MAX_NUM] = {{0}}; SGlobalCfg tsGlobalConfig[TSDB_CFG_MAX_NUM] = {{0}};
int32_t tsGlobalConfigNum = 0; int32_t tsGlobalConfigNum = 0;