Merge remote-tracking branch 'origin/feature/3.0_liaohj' into feature/qnode
This commit is contained in:
commit
c3bf03cc15
|
@ -7,3 +7,4 @@ FROM mcr.microsoft.com/vscode/devcontainers/cpp:0-${VARIANT}
|
|||
# [Optional] Uncomment this section to install additional packages.
|
||||
# RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \
|
||||
# && apt-get -y install --no-install-recommends <your-package-list-here>
|
||||
RUN apt-get update && apt-get -y install tree vim
|
||||
|
|
|
@ -114,10 +114,16 @@ typedef struct SProjectPhyNode {
|
|||
SPhyNode node;
|
||||
} SProjectPhyNode;
|
||||
|
||||
typedef struct SDownstreamSource {
|
||||
SQueryNodeAddr addr;
|
||||
uint64_t taskId;
|
||||
uint64_t schedId;
|
||||
} SDownstreamSource;
|
||||
|
||||
typedef struct SExchangePhyNode {
|
||||
SPhyNode node;
|
||||
uint64_t srcTemplateId; // template id of datasource suplans
|
||||
SArray *pSrcEndPoints; // SEpAddr, scheduler fill by calling qSetSuplanExecutionNode
|
||||
uint64_t srcTemplateId; // template id of datasource suplans
|
||||
SArray *pSrcEndPoints; // SArray<SDownstreamSource>, scheduler fill by calling qSetSuplanExecutionNode
|
||||
} SExchangePhyNode;
|
||||
|
||||
typedef enum EAggAlgo {
|
||||
|
@ -178,7 +184,7 @@ int32_t qCreateQueryDag(const struct SQueryNode* pQueryInfo, struct SQueryDag**
|
|||
// @subplan subplan to be schedule
|
||||
// @templateId templateId of a group of datasource subplans of this @subplan
|
||||
// @ep one execution location of this group of datasource subplans
|
||||
void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep);
|
||||
void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource);
|
||||
|
||||
int32_t qExplainQuery(const struct SQueryNode* pQueryInfo, struct SEpSet* pQnode, char** str);
|
||||
|
||||
|
|
|
@ -179,7 +179,7 @@ extern int32_t clientConnRefPool;
|
|||
|
||||
extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
|
||||
int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code);
|
||||
SMsgSendInfo* buildMsgInfoImpl(SRequestObj*);
|
||||
SMsgSendInfo* buildMsgInfoImpl(SRequestObj* pReqObj);
|
||||
|
||||
int taos_init();
|
||||
|
||||
|
|
|
@ -195,8 +195,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
|
|||
}
|
||||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDcl->epSet, &transporterId, pSendMsg);
|
||||
} else {
|
||||
SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;
|
||||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, pEpSet, &transporterId, pSendMsg);
|
||||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pDcl->epSet, &transporterId, pSendMsg);
|
||||
}
|
||||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
|
@ -257,7 +256,14 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag) {
|
|||
return pRequest->code;
|
||||
}
|
||||
|
||||
return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, NULL, pDag, &pRequest->body.pQueryJob);
|
||||
SArray *execNode = taosArrayInit(4, sizeof(SQueryNodeAddr));
|
||||
|
||||
SQueryNodeAddr addr = {.numOfEps = 1, .inUse = 0, .nodeId = 2};
|
||||
addr.epAddr[0].port = 6030;
|
||||
strcpy(addr.epAddr[0].fqdn, "localhost");
|
||||
|
||||
taosArrayPush(execNode, &addr);
|
||||
return scheduleAsyncExecJob(pRequest->pTscObj->pAppInfo->pTransporter, execNode, pDag, &pRequest->body.pQueryJob);
|
||||
}
|
||||
|
||||
typedef struct tmq_t tmq_t;
|
||||
|
@ -699,6 +705,8 @@ void* doFetchRow(SRequestObj* pRequest) {
|
|||
assert(pRequest != NULL);
|
||||
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
||||
|
||||
SEpSet epSet = {0};
|
||||
|
||||
if (pResultInfo->pData == NULL || pResultInfo->current >= pResultInfo->numOfRows) {
|
||||
if (pRequest->type == TDMT_VND_QUERY) {
|
||||
// All data has returned to App already, no need to try again
|
||||
|
@ -706,9 +714,13 @@ void* doFetchRow(SRequestObj* pRequest) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData);
|
||||
setQueryResultByRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pRequest->body.resInfo.pData);
|
||||
int32_t code = scheduleFetchRows(pRequest->body.pQueryJob, (void **)&pRequest->body.resInfo.pData);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pRequest->code = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
setQueryResultByRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pRequest->body.resInfo.pData);
|
||||
if (pResultInfo->numOfRows == 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -718,6 +730,17 @@ void* doFetchRow(SRequestObj* pRequest) {
|
|||
pRequest->type = TDMT_MND_SHOW_RETRIEVE;
|
||||
} else if (pRequest->type == TDMT_VND_SHOW_TABLES) {
|
||||
pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
|
||||
SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
|
||||
SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex);
|
||||
|
||||
epSet.numOfEps = pVgroupInfo->numOfEps;
|
||||
epSet.inUse = pVgroupInfo->inUse;
|
||||
|
||||
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
|
||||
strncpy(epSet.fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(epSet.fqdn[i]));
|
||||
epSet.port[i] = pVgroupInfo->epAddr[i].port;
|
||||
}
|
||||
|
||||
} else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
|
||||
pRequest->type = TDMT_VND_SHOW_TABLES;
|
||||
SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo;
|
||||
|
@ -735,9 +758,17 @@ void* doFetchRow(SRequestObj* pRequest) {
|
|||
|
||||
SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
|
||||
|
||||
epSet.numOfEps = pVgroupInfo->numOfEps;
|
||||
epSet.inUse = pVgroupInfo->inUse;
|
||||
|
||||
for (int32_t i = 0; i < epSet.numOfEps; ++i) {
|
||||
strncpy(epSet.fqdn[i], pVgroupInfo->epAddr[i].fqdn, tListLen(epSet.fqdn[i]));
|
||||
epSet.port[i] = pVgroupInfo->epAddr[i].port;
|
||||
}
|
||||
|
||||
int64_t transporterId = 0;
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
|
||||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
|
||||
pRequest->type = TDMT_VND_SHOW_TABLES_FETCH;
|
||||
|
@ -747,7 +778,7 @@ void* doFetchRow(SRequestObj* pRequest) {
|
|||
|
||||
int64_t transporterId = 0;
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &pTscObj->pAppInfo->mgmtEp.epSet, &transporterId, body);
|
||||
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body);
|
||||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
|
||||
|
|
|
@ -115,7 +115,7 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) {
|
|||
}
|
||||
} else {
|
||||
assert(pRequest != NULL);
|
||||
pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
|
||||
pMsgSendInfo->msgInfo = pRequest->body.requestMsg;
|
||||
}
|
||||
|
||||
pMsgSendInfo->fp = (handleRequestRspFp[TMSG_INDEX(pRequest->type)] == NULL)? genericRspCallback:handleRequestRspFp[TMSG_INDEX(pRequest->type)];
|
||||
|
|
|
@ -49,7 +49,7 @@ int main(int argc, char** argv) {
|
|||
TEST(testCase, driverInit_Test) { taos_init(); }
|
||||
|
||||
TEST(testCase, connect_Test) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", "abc1", 0);
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
if (pConn == NULL) {
|
||||
printf("failed to connect to server, reason:%s\n", taos_errstr(NULL));
|
||||
}
|
||||
|
@ -295,24 +295,24 @@ TEST(testCase, connect_Test) {
|
|||
// taos_close(pConn);
|
||||
//}
|
||||
|
||||
TEST(testCase, create_table_Test) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k int)");
|
||||
ASSERT_EQ(taos_errno(pRes), 0);
|
||||
|
||||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k blob)");
|
||||
ASSERT_NE(taos_errno(pRes), 0);
|
||||
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
//TEST(testCase, create_table_Test) {
|
||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != NULL);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
// taos_free_result(pRes);
|
||||
//
|
||||
// pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k int)");
|
||||
// ASSERT_EQ(taos_errno(pRes), 0);
|
||||
//
|
||||
// taos_free_result(pRes);
|
||||
//
|
||||
// pRes = taos_query(pConn, "create table if not exists tm0(ts timestamp, k blob)");
|
||||
// ASSERT_NE(taos_errno(pRes), 0);
|
||||
//
|
||||
// taos_free_result(pRes);
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
|
||||
//TEST(testCase, create_ctable_Test) {
|
||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
|
@ -333,36 +333,36 @@ TEST(testCase, create_table_Test) {
|
|||
// taos_close(pConn);
|
||||
//}
|
||||
|
||||
TEST(testCase, show_stable_Test) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != nullptr);
|
||||
|
||||
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
//TEST(testCase, show_stable_Test) {
|
||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
// assert(pConn != nullptr);
|
||||
//
|
||||
//// TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
//// if (taos_errno(pRes) != 0) {
|
||||
//// printf("failed to use db, reason:%s\n", taos_errstr(pRes));
|
||||
//// }
|
||||
//// taos_free_result(pRes);
|
||||
//
|
||||
// TAOS_RES* pRes = taos_query(pConn, "show abc1.stables");
|
||||
// if (taos_errno(pRes) != 0) {
|
||||
// printf("failed to use db, reason:%s\n", taos_errstr(pRes));
|
||||
// printf("failed to show stables, reason:%s\n", taos_errstr(pRes));
|
||||
// taos_free_result(pRes);
|
||||
// ASSERT_TRUE(false);
|
||||
// }
|
||||
//
|
||||
// TAOS_ROW pRow = NULL;
|
||||
// TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
// int32_t numOfFields = taos_num_fields(pRes);
|
||||
//
|
||||
// char str[512] = {0};
|
||||
// while ((pRow = taos_fetch_row(pRes)) != NULL) {
|
||||
// int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
||||
// printf("%s\n", str);
|
||||
// }
|
||||
//
|
||||
// taos_free_result(pRes);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "show abc1.stables");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to show stables, reason:%s\n", taos_errstr(pRes));
|
||||
taos_free_result(pRes);
|
||||
ASSERT_TRUE(false);
|
||||
}
|
||||
|
||||
TAOS_ROW pRow = NULL;
|
||||
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||
int32_t numOfFields = taos_num_fields(pRes);
|
||||
|
||||
char str[512] = {0};
|
||||
while ((pRow = taos_fetch_row(pRes)) != NULL) {
|
||||
int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
|
||||
printf("%s\n", str);
|
||||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
}
|
||||
// taos_close(pConn);
|
||||
//}
|
||||
//
|
||||
//TEST(testCase, show_vgroup_Test) {
|
||||
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
|
@ -621,7 +621,7 @@ TEST(testCase, create_topic_Test) {
|
|||
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
// taos_free_result(pRes);
|
||||
//
|
||||
// pRes = taos_query(pConn, "select ts,k from m1");
|
||||
// pRes = taos_query(pConn, "select ts from m1");
|
||||
// if (taos_errno(pRes) != 0) {
|
||||
// printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
|
||||
// taos_free_result(pRes);
|
||||
|
|
|
@ -42,7 +42,8 @@ typedef struct {
|
|||
SSchema *pSchema;
|
||||
} SSchemaWrapper;
|
||||
|
||||
typedef struct SMTbCursor SMTbCursor;
|
||||
typedef struct SMTbCursor SMTbCursor;
|
||||
typedef struct SMCtbCursor SMCtbCursor;
|
||||
|
||||
typedef SVCreateTbReq STbCfg;
|
||||
|
||||
|
@ -64,6 +65,10 @@ SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
|
|||
void metaCloseTbCursor(SMTbCursor *pTbCur);
|
||||
char * metaTbCursorNext(SMTbCursor *pTbCur);
|
||||
|
||||
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid);
|
||||
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur);
|
||||
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur);
|
||||
|
||||
// Options
|
||||
void metaOptionsInit(SMetaCfg *pMetaCfg);
|
||||
void metaOptionsClear(SMetaCfg *pMetaCfg);
|
||||
|
|
|
@ -93,6 +93,7 @@ int tsdbOptionsInit(STsdbCfg *);
|
|||
void tsdbOptionsClear(STsdbCfg *);
|
||||
|
||||
typedef void* tsdbReadHandleT;
|
||||
|
||||
/**
|
||||
* Get the data block iterator, starting from position according to the query condition
|
||||
*
|
||||
|
@ -124,6 +125,24 @@ tsdbReadHandleT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGro
|
|||
|
||||
bool isTsdbCacheLastRow(tsdbReadHandleT* pTsdbReadHandle);
|
||||
|
||||
/**
|
||||
*
|
||||
* @param tsdb
|
||||
* @param uid
|
||||
* @param skey
|
||||
* @param pTagCond
|
||||
* @param len
|
||||
* @param tagNameRelType
|
||||
* @param tbnameCond
|
||||
* @param pGroupInfo
|
||||
* @param pColIndex
|
||||
* @param numOfCols
|
||||
* @param reqId
|
||||
* @return
|
||||
*/
|
||||
int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
|
||||
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
|
||||
SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId);
|
||||
/**
|
||||
* get num of rows in mem table
|
||||
*
|
||||
|
|
|
@ -374,22 +374,27 @@ static int metaCtbIdxCb(DB *pIdx, const DBT *pKey, const DBT *pValue, DBT *pSKey
|
|||
DBT * pDbt;
|
||||
|
||||
if (pTbCfg->type == META_CHILD_TABLE) {
|
||||
pDbt = calloc(2, sizeof(DBT));
|
||||
// pDbt = calloc(2, sizeof(DBT));
|
||||
|
||||
// First key is suid
|
||||
pDbt[0].data = &(pTbCfg->ctbCfg.suid);
|
||||
pDbt[0].size = sizeof(pTbCfg->ctbCfg.suid);
|
||||
// // First key is suid
|
||||
// pDbt[0].data = &(pTbCfg->ctbCfg.suid);
|
||||
// pDbt[0].size = sizeof(pTbCfg->ctbCfg.suid);
|
||||
|
||||
// Second key is the first tag
|
||||
void *pTagVal = tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId);
|
||||
pDbt[1].data = pTagVal;
|
||||
pDbt[1].size = sizeof(int32_t);
|
||||
// // Second key is the first tag
|
||||
// void *pTagVal = tdGetKVRowValOfCol(pTbCfg->ctbCfg.pTag, (kvRowColIdx(pTbCfg->ctbCfg.pTag))[0].colId);
|
||||
// pDbt[1].data = pTagVal;
|
||||
// pDbt[1].size = sizeof(int32_t);
|
||||
|
||||
// Set index key
|
||||
memset(pSKey, 0, sizeof(*pSKey));
|
||||
#if 0
|
||||
pSKey->flags = DB_DBT_MULTIPLE | DB_DBT_APPMALLOC;
|
||||
pSKey->data = pDbt;
|
||||
pSKey->size = 2;
|
||||
#else
|
||||
pSKey->data = &(pTbCfg->ctbCfg.suid);
|
||||
pSKey->size = sizeof(pTbCfg->ctbCfg.suid);
|
||||
#endif
|
||||
|
||||
return 0;
|
||||
} else {
|
||||
|
@ -621,4 +626,61 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
|
|||
tdDestroyTSchemaBuilder(&sb);
|
||||
|
||||
return pTSchema;
|
||||
}
|
||||
|
||||
struct SMCtbCursor {
|
||||
DBC * pCur;
|
||||
tb_uid_t suid;
|
||||
};
|
||||
|
||||
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
|
||||
SMCtbCursor *pCtbCur = NULL;
|
||||
SMetaDB * pDB = pMeta->pDB;
|
||||
int ret;
|
||||
|
||||
pCtbCur = (SMCtbCursor *)calloc(1, sizeof(*pCtbCur));
|
||||
if (pCtbCur == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pCtbCur->suid = uid;
|
||||
ret = pDB->pCtbIdx->cursor(pDB->pCtbIdx, NULL, &(pCtbCur->pCur), 0);
|
||||
if (ret != 0) {
|
||||
free(pCtbCur);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return pCtbCur;
|
||||
}
|
||||
|
||||
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur) {
|
||||
if (pCtbCur) {
|
||||
if (pCtbCur->pCur) {
|
||||
pCtbCur->pCur->close(pCtbCur->pCur);
|
||||
}
|
||||
|
||||
free(pCtbCur);
|
||||
}
|
||||
}
|
||||
|
||||
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur) {
|
||||
DBT skey = {0};
|
||||
DBT pkey = {0};
|
||||
DBT pval = {0};
|
||||
void * pBuf;
|
||||
STbCfg tbCfg;
|
||||
|
||||
// Set key
|
||||
skey.data = &(pCtbCur->suid);
|
||||
skey.size = sizeof(pCtbCur->suid);
|
||||
|
||||
if (pCtbCur->pCur->pget(pCtbCur->pCur, &skey, &pkey, &pval, DB_NEXT) == 0) {
|
||||
tb_uid_t id = *(tb_uid_t *)pkey.data;
|
||||
assert(id != 0);
|
||||
return id;
|
||||
// metaDecodeTbInfo(pBuf, &tbCfg);
|
||||
// return tbCfg.;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
|
@ -152,7 +152,7 @@ typedef struct STsdbReadHandle {
|
|||
typedef struct STableGroupSupporter {
|
||||
int32_t numOfCols;
|
||||
SColIndex* pCols;
|
||||
STSchema* pTagSchema;
|
||||
SSchema* pTagSchema;
|
||||
} STableGroupSupporter;
|
||||
|
||||
static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList);
|
||||
|
@ -466,7 +466,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
|
|||
return (tsdbReadHandleT)pReadHandle;
|
||||
|
||||
_end:
|
||||
// tsdbCleanupQueryHandle(pTsdbReadHandle);
|
||||
tsdbCleanupQueryHandle(pReadHandle);
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
@ -2630,18 +2630,20 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
|
|||
return numOfRows;
|
||||
}
|
||||
|
||||
static int32_t getAllTableList(STable* pSuperTable, SArray* list) {
|
||||
SSkipListIterator* iter = NULL;//tSkipListCreateIter(pSuperTable->pIndex);
|
||||
while (tSkipListIterNext(iter)) {
|
||||
SSkipListNode* pNode = tSkipListIterGet(iter);
|
||||
static int32_t getAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
|
||||
SMCtbCursor* pCur = metaOpenCtbCursor(pMeta, uid);
|
||||
|
||||
STable* pTable = (STable*) SL_GET_NODE_DATA((SSkipListNode*) pNode);
|
||||
while (1) {
|
||||
tb_uid_t id = metaCtbCursorNext(pCur);
|
||||
if (id == 0) {
|
||||
break;
|
||||
}
|
||||
|
||||
STableKeyInfo info = {.pTable = pTable, .lastKey = TSKEY_INITIAL_VAL};
|
||||
STableKeyInfo info = {.pTable = NULL, .lastKey = TSKEY_INITIAL_VAL, uid = id};
|
||||
taosArrayPush(list, &info);
|
||||
}
|
||||
|
||||
tSkipListDestroyIter(iter);
|
||||
metaCloseCtbCurosr(pCur);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -3553,7 +3555,7 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable
|
|||
taosArrayPush(pGroups, &g);
|
||||
}
|
||||
|
||||
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, TSKEY skey) {
|
||||
SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, TSKEY skey) {
|
||||
assert(pTableList != NULL);
|
||||
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
|
||||
|
||||
|
@ -3564,25 +3566,18 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
|
|||
}
|
||||
|
||||
if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
|
||||
SArray* sa = taosArrayInit(size, sizeof(STableKeyInfo));
|
||||
SArray* sa = taosArrayDup(pTableList);
|
||||
if (sa == NULL) {
|
||||
taosArrayDestroy(pTableGroup);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < size; ++i) {
|
||||
STableKeyInfo *pKeyInfo = taosArrayGet(pTableList, i);
|
||||
|
||||
STableKeyInfo info = {.pTable = pKeyInfo->pTable, .lastKey = skey};
|
||||
taosArrayPush(sa, &info);
|
||||
}
|
||||
|
||||
taosArrayPush(pTableGroup, &sa);
|
||||
tsdbDebug("all %" PRIzu " tables belong to one group", size);
|
||||
} else {
|
||||
STableGroupSupporter sup = {0};
|
||||
sup.numOfCols = numOfOrderCols;
|
||||
sup.pTagSchema = pTagSchema;
|
||||
sup.pTagSchema = pTagSchema->pSchema;
|
||||
sup.pCols = pCols;
|
||||
|
||||
// taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), &sup, tableGroupComparFn);
|
||||
|
@ -3710,12 +3705,11 @@ int32_t tsdbQuerySTableByTagCond(STsdb* tsdb, uint64_t uid, TSKEY skey, const ch
|
|||
|
||||
//NOTE: not add ref count for super table
|
||||
SArray* res = taosArrayInit(8, sizeof(STableKeyInfo));
|
||||
STSchema* pTagSchema = metaGetTableSchema(tsdb->pMeta, uid, 0, true);
|
||||
SSchemaWrapper* pTagSchema = metaGetTableSchema(tsdb->pMeta, uid, 0, true);
|
||||
|
||||
// no tags and tbname condition, all child tables of this stable are involved
|
||||
if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
|
||||
assert(false);
|
||||
int32_t ret = 0;//getAllTableList(pTable, res);
|
||||
int32_t ret = getAllTableList(tsdb->pMeta, uid, res);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -3854,7 +3848,7 @@ int32_t tsdbGetTableGroupFromIdList(STsdb* tsdb, SArray* pTableIdList, STableGro
|
|||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
#endif
|
||||
static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
|
||||
if (pColumnInfoData == NULL) {
|
||||
return NULL;
|
||||
|
@ -3883,6 +3877,7 @@ static void* destroyTableCheckInfo(SArray* pTableCheckInfo) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle) {
|
||||
STsdbReadHandle* pTsdbReadHandle = (STsdbReadHandle*)queryHandle;
|
||||
if (pTsdbReadHandle == NULL) {
|
||||
|
@ -3921,6 +3916,7 @@ void tsdbCleanupQueryHandle(tsdbReadHandleT queryHandle) {
|
|||
tfree(pTsdbReadHandle);
|
||||
}
|
||||
|
||||
#if 0
|
||||
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList) {
|
||||
assert(pGroupList != NULL);
|
||||
|
||||
|
|
|
@ -375,9 +375,12 @@ typedef struct STaskParam {
|
|||
} STaskParam;
|
||||
|
||||
typedef struct SExchangeInfo {
|
||||
int32_t numOfSources;
|
||||
SEpSet *pEpset;
|
||||
int32_t bytes; // total load bytes from remote
|
||||
SArray *pSources;
|
||||
int32_t bytes; // total load bytes from remote
|
||||
tsem_t ready;
|
||||
void *pTransporter;
|
||||
SRetrieveTableRsp *pRsp;
|
||||
SSDataBlock *pResult;
|
||||
} SExchangeInfo;
|
||||
|
||||
typedef struct STableScanInfo {
|
||||
|
@ -545,7 +548,7 @@ typedef struct SOrderOperatorInfo {
|
|||
|
||||
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
|
||||
|
||||
SOperatorInfo* createExchangeOperatorInfo(const SVgroupInfo* pVgroups, int32_t numOfSources, int32_t numOfOutput, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo);
|
||||
|
@ -649,6 +652,6 @@ int32_t getMaximumIdleDurationSec();
|
|||
|
||||
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type);
|
||||
void setTaskStatus(SExecTaskInfo *pTaskInfo, int8_t status);
|
||||
int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle);
|
||||
int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, STableGroupInfo* pGroupInfo, void* readerHandle);
|
||||
|
||||
#endif // TDENGINE_EXECUTORIMPL_H
|
||||
|
|
|
@ -13,10 +13,11 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "os.h"
|
||||
#include "tarray.h"
|
||||
#include <tsdb.h>
|
||||
#include "dataSinkMgt.h"
|
||||
#include "exception.h"
|
||||
#include "os.h"
|
||||
#include "tarray.h"
|
||||
#include "tcache.h"
|
||||
#include "tglobal.h"
|
||||
#include "tmsg.h"
|
||||
|
@ -72,7 +73,46 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_
|
|||
assert(tsdb != NULL && pSubplan != NULL);
|
||||
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
|
||||
|
||||
int32_t code = doCreateExecTaskInfo(pSubplan, pTask, tsdb);
|
||||
int32_t code = 0;
|
||||
uint64_t uid = 0;
|
||||
STimeWindow window = TSWINDOW_INITIALIZER;
|
||||
int32_t tableType = 0;
|
||||
|
||||
SPhyNode* pPhyNode = pSubplan->pNode;
|
||||
STableGroupInfo groupInfo = {0};
|
||||
|
||||
int32_t type = pPhyNode->info.type;
|
||||
if (type == OP_TableScan || type == OP_DataBlocksOptScan) {
|
||||
STableScanPhyNode* pTableScanNode = (STableScanPhyNode*)pPhyNode;
|
||||
uid = pTableScanNode->scan.uid;
|
||||
window = pTableScanNode->window;
|
||||
tableType = pTableScanNode->scan.tableType;
|
||||
|
||||
if (tableType == TSDB_SUPER_TABLE) {
|
||||
code =
|
||||
tsdbQuerySTableByTagCond(tsdb, uid, window.skey, NULL, 0, 0, NULL, &groupInfo, NULL, 0, pSubplan->id.queryId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
} else { // Create one table group.
|
||||
groupInfo.numOfTables = 1;
|
||||
groupInfo.pGroupList = taosArrayInit(1, POINTER_BYTES);
|
||||
|
||||
SArray* pa = taosArrayInit(1, sizeof(STableKeyInfo));
|
||||
|
||||
STableKeyInfo info = {.pTable = NULL, .lastKey = 0, .uid = uid};
|
||||
taosArrayPush(pa, &info);
|
||||
taosArrayPush(groupInfo.pGroupList, &pa);
|
||||
}
|
||||
|
||||
if (groupInfo.numOfTables == 0) {
|
||||
code = 0;
|
||||
// qDebug("no table qualified for query, reqId:0x%"PRIx64, (*pTask)->id.queryId);
|
||||
goto _error;
|
||||
}
|
||||
}
|
||||
|
||||
code = doCreateExecTaskInfo(pSubplan, pTask, &groupInfo, tsdb);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
@ -87,7 +127,7 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_
|
|||
|
||||
*handle = (*pTask)->dsHandle;
|
||||
|
||||
_error:
|
||||
_error:
|
||||
// if failed to add ref for all tables in this query, abort current query
|
||||
return code;
|
||||
}
|
||||
|
@ -141,6 +181,11 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
|||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
int64_t threadId = taosGetSelfPthreadId();
|
||||
|
||||
// todo: remove it.
|
||||
if (tinfo == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
*pRes = NULL;
|
||||
|
||||
int64_t curOwner = 0;
|
||||
|
|
|
@ -4558,6 +4558,7 @@ void appendDownstream(SOperatorInfo* p, SOperatorInfo* pUpstream) {
|
|||
|
||||
static void doDestroyTableQueryInfo(STableGroupInfo* pTableqinfoGroupInfo);
|
||||
|
||||
void createResultBlock(const SArray* pExprInfo, SExchangeInfo* pInfo, const SOperatorInfo* pOperator, size_t size);
|
||||
static int32_t setupQueryHandle(void* tsdb, STaskRuntimeEnv* pRuntimeEnv, int64_t qId, bool isSTableQuery) {
|
||||
STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
#if 0
|
||||
|
@ -4914,57 +4915,127 @@ static SSDataBlock* doBlockInfoScan(void* param, bool* newgroup) {
|
|||
}
|
||||
|
||||
int32_t loadRemoteDataCallback(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
SExchangeInfo* pEx = (SExchangeInfo*) param;
|
||||
pEx->pRsp = pMsg->pData;
|
||||
|
||||
pEx->pRsp->numOfRows = htonl(pEx->pRsp->numOfRows);
|
||||
pEx->pRsp->useconds = htobe64(pEx->pRsp->useconds);
|
||||
pEx->pRsp->compLen = htonl(pEx->pRsp->compLen);
|
||||
|
||||
tsem_post(&pEx->ready);
|
||||
}
|
||||
|
||||
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
|
||||
assert(pMsgBody != NULL);
|
||||
tfree(pMsgBody->msgInfo.pData);
|
||||
tfree(pMsgBody);
|
||||
}
|
||||
|
||||
void processRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
||||
SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle;
|
||||
assert(pMsg->ahandle != NULL);
|
||||
|
||||
SDataBuf buf = {.len = pMsg->contLen, .pData = NULL};
|
||||
|
||||
if (pMsg->contLen > 0) {
|
||||
buf.pData = calloc(1, pMsg->contLen);
|
||||
if (buf.pData == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
pMsg->code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
memcpy(buf.pData, pMsg->pCont, pMsg->contLen);
|
||||
}
|
||||
}
|
||||
|
||||
pSendInfo->fp(pSendInfo->param, &buf, pMsg->code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
destroySendMsgInfo(pSendInfo);
|
||||
}
|
||||
|
||||
static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
SOperatorInfo *pOperator = (SOperatorInfo*) param;
|
||||
|
||||
SExchangeInfo *pExchangeInfo = pOperator->info;
|
||||
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
*newgroup = false;
|
||||
if (pExchangeInfo->pRsp != NULL && pExchangeInfo->pRsp->completed == 1) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SResFetchReq *pMsg = calloc(1, sizeof(SResFetchReq));
|
||||
if (NULL == pMsg) { // todo handle malloc error
|
||||
|
||||
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
SEpSet epSet;
|
||||
SDownstreamSource* pSource = taosArrayGet(pExchangeInfo->pSources, 0);
|
||||
SEpSet epSet = {0};
|
||||
|
||||
int64_t sId = -1, queryId = 0, taskId = 1, vgId = 1;
|
||||
pMsg->header.vgId = htonl(vgId);
|
||||
epSet.numOfEps = pSource->addr.numOfEps;
|
||||
epSet.port[0] = pSource->addr.epAddr[0].port;
|
||||
tstrncpy(epSet.fqdn[0], pSource->addr.epAddr[0].fqdn, tListLen(epSet.fqdn[0]));
|
||||
|
||||
pMsg->sId = htobe64(sId);
|
||||
pMsg->taskId = htobe64(taskId);
|
||||
pMsg->queryId = htobe64(queryId);
|
||||
pMsg->header.vgId = htonl(pSource->addr.nodeId);
|
||||
pMsg->sId = htobe64(pSource->schedId);
|
||||
pMsg->taskId = htobe64(pSource->taskId);
|
||||
pMsg->queryId = htobe64(pTaskInfo->id.queryId);
|
||||
|
||||
// send the fetch remote task result reques
|
||||
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
|
||||
if (NULL == pMsgSendInfo) {
|
||||
qError("QID:%"PRIx64 ",TID:%"PRIx64 " calloc %d failed", queryId, taskId, (int32_t)sizeof(SMsgSendInfo));
|
||||
qError("QID:%"PRIx64" calloc %d failed", GET_TASKID(pTaskInfo), (int32_t)sizeof(SMsgSendInfo));
|
||||
pTaskInfo->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pMsgSendInfo->param = NULL;
|
||||
pMsgSendInfo->param = pExchangeInfo;
|
||||
pMsgSendInfo->msgInfo.pData = pMsg;
|
||||
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
|
||||
pMsgSendInfo->msgType = TDMT_VND_FETCH;
|
||||
pMsgSendInfo->fp = loadRemoteDataCallback;
|
||||
|
||||
int64_t transporterId = 0;
|
||||
void* pTransporter = NULL;
|
||||
int32_t code = asyncSendMsgToServer(pTransporter, &epSet, &transporterId, pMsgSendInfo);
|
||||
int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo);
|
||||
tsem_wait(&pExchangeInfo->ready);
|
||||
|
||||
printf("abc\n");
|
||||
getchar();
|
||||
if (pExchangeInfo->pRsp->numOfRows == 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// add it into the sink node
|
||||
SSDataBlock* pRes = pExchangeInfo->pResult;
|
||||
char* pData = pExchangeInfo->pRsp->data;
|
||||
|
||||
for(int32_t i = 0; i < pOperator->numOfOutput; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i);
|
||||
char* tmp = realloc(pColInfoData->pData, pColInfoData->info.bytes * pExchangeInfo->pRsp->numOfRows);
|
||||
if (tmp == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
size_t len = pExchangeInfo->pRsp->numOfRows * pColInfoData->info.bytes;
|
||||
memcpy(tmp, pData, len);
|
||||
|
||||
pColInfoData->pData = tmp;
|
||||
pData += len;
|
||||
}
|
||||
|
||||
pRes->info.numOfCols = pOperator->numOfOutput;
|
||||
pRes->info.rows = pExchangeInfo->pRsp->numOfRows;
|
||||
|
||||
return pExchangeInfo->pResult;
|
||||
|
||||
_error:
|
||||
tfree(pMsg);
|
||||
tfree(pMsgSendInfo);
|
||||
|
||||
terrno = pTaskInfo->code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SOperatorInfo* createExchangeOperatorInfo(const SVgroupInfo* pVgroups, int32_t numOfSources, int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
|
||||
assert(numOfSources > 0);
|
||||
static SSDataBlock* createResultDataBlock(const SArray* pExprInfo);
|
||||
|
||||
SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo) {
|
||||
SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo));
|
||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
|
||||
|
@ -4975,21 +5046,71 @@ SOperatorInfo* createExchangeOperatorInfo(const SVgroupInfo* pVgroups, int32_t n
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pInfo->numOfSources = numOfSources;
|
||||
pInfo->pSources = taosArrayDup(pSources);
|
||||
assert(taosArrayGetSize(pInfo->pSources) > 0);
|
||||
|
||||
size_t size = taosArrayGetSize(pExprInfo);
|
||||
pInfo->pResult = createResultDataBlock(pExprInfo);
|
||||
|
||||
pOperator->name = "ExchangeOperator";
|
||||
pOperator->operatorType = OP_Exchange;
|
||||
pOperator->blockingOptr = false;
|
||||
pOperator->status = OP_IN_EXECUTING;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->numOfOutput = numOfOutput;
|
||||
pOperator->numOfOutput = size;
|
||||
pOperator->pRuntimeEnv = NULL;
|
||||
pOperator->exec = doLoadRemoteData;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
{ // todo refactor
|
||||
SRpcInit rpcInit;
|
||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||
rpcInit.localPort = 0;
|
||||
rpcInit.label = "TSC";
|
||||
rpcInit.numOfThreads = 1;
|
||||
rpcInit.cfp = processRspMsg;
|
||||
rpcInit.sessions = tsMaxConnections;
|
||||
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||
rpcInit.user = (char *)"root";
|
||||
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||
rpcInit.ckey = "key";
|
||||
// rpcInit.spi = 1;
|
||||
rpcInit.secret = (char *)"dcc5bed04851fec854c035b2e40263b6";
|
||||
|
||||
pInfo->pTransporter = rpcOpen(&rpcInit);
|
||||
if (pInfo->pTransporter == NULL) {
|
||||
return NULL; // todo
|
||||
}
|
||||
}
|
||||
|
||||
return pOperator;
|
||||
}
|
||||
|
||||
SSDataBlock* createResultDataBlock(const SArray* pExprInfo) {
|
||||
SSDataBlock* pResBlock = calloc(1, sizeof(SSDataBlock));
|
||||
if (pResBlock == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
size_t numOfCols = taosArrayGetSize(pExprInfo);
|
||||
pResBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
|
||||
|
||||
SArray* pResult = pResBlock->pDataBlock;
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData colInfoData = {0};
|
||||
SExprInfo* p = taosArrayGetP(pExprInfo, i);
|
||||
|
||||
SSchema* pSchema = &p->base.resSchema;
|
||||
colInfoData.info.type = pSchema->type;
|
||||
colInfoData.info.colId = pSchema->colId;
|
||||
colInfoData.info.bytes = pSchema->bytes;
|
||||
|
||||
taosArrayPush(pResult, &colInfoData);
|
||||
}
|
||||
|
||||
return pResBlock;
|
||||
}
|
||||
|
||||
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, SExecTaskInfo* pTaskInfo) {
|
||||
assert(repeatTime > 0 && numOfOutput > 0);
|
||||
|
||||
|
@ -5016,7 +5137,6 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
|
|||
pOperator->status = OP_IN_EXECUTING;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->numOfOutput = numOfOutput;
|
||||
pOperator->pRuntimeEnv = NULL;
|
||||
pOperator->exec = doTableScan;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
|
@ -5049,7 +5169,6 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order,
|
|||
pOperator->status = OP_IN_EXECUTING;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->numOfOutput = numOfOutput;
|
||||
pOperator->pRuntimeEnv = NULL;
|
||||
pOperator->exec = doTableScan;
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
|
@ -7363,50 +7482,47 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhyNode* pPhyNode, SExecTaskInfo* pTask
|
|||
SScanPhyNode* pScanPhyNode = (SScanPhyNode*)pPhyNode;
|
||||
size_t numOfCols = taosArrayGetSize(pPhyNode->pTargets);
|
||||
return createDataBlocksOptScanInfo(param, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo);
|
||||
} else if (pPhyNode->info.type == OP_Exchange) {
|
||||
SExchangePhyNode* pEx = (SExchangePhyNode*) pPhyNode;
|
||||
return createExchangeOperatorInfo(pEx->pSrcEndPoints, pEx->node.pTargets, pTaskInfo);
|
||||
} else {
|
||||
assert(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle) {
|
||||
int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, STableGroupInfo* pGroupInfo, void* readerHandle) {
|
||||
STsdbQueryCond cond = {.loadExternalRows = false};
|
||||
|
||||
uint64_t uid = 0;
|
||||
tsdbReadHandleT tsdbReadHandle = NULL;
|
||||
|
||||
SPhyNode* pPhyNode = pPlan->pNode;
|
||||
if (pPhyNode->info.type == OP_TableScan || pPhyNode->info.type == OP_DataBlocksOptScan) {
|
||||
|
||||
STableScanPhyNode* pTableScanNode = (STableScanPhyNode*) pPhyNode;
|
||||
uid = pTableScanNode->scan.uid;
|
||||
cond.order = pTableScanNode->scan.order;
|
||||
STableScanPhyNode* pTableScanNode = (STableScanPhyNode*)pPhyNode;
|
||||
cond.order = pTableScanNode->scan.order;
|
||||
cond.numOfCols = taosArrayGetSize(pTableScanNode->scan.node.pTargets);
|
||||
cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo));
|
||||
cond.twindow = pTableScanNode->window;
|
||||
cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER;
|
||||
cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo));
|
||||
cond.twindow = pTableScanNode->window;
|
||||
cond.type = BLOCK_LOAD_OFFSET_SEQ_ORDER;
|
||||
|
||||
for(int32_t i = 0; i < cond.numOfCols; ++i) {
|
||||
for (int32_t i = 0; i < cond.numOfCols; ++i) {
|
||||
SExprInfo* pExprInfo = taosArrayGetP(pTableScanNode->scan.node.pTargets, i);
|
||||
assert(pExprInfo->pExpr->nodeType == TEXPR_COL_NODE);
|
||||
|
||||
SSchema* pSchema = pExprInfo->pExpr->pSchema;
|
||||
cond.colList[i].type = pSchema->type;
|
||||
cond.colList[i].type = pSchema->type;
|
||||
cond.colList[i].bytes = pSchema->bytes;
|
||||
cond.colList[i].colId = pSchema->colId;
|
||||
}
|
||||
|
||||
*pTaskInfo = createExecTaskInfo((uint64_t) pPlan->id.queryId);
|
||||
tsdbReadHandle = tsdbQueryTables(readerHandle, &cond, pGroupInfo, (*pTaskInfo)->id.queryId, NULL);
|
||||
} else if (pPhyNode->info.type == OP_Exchange) {
|
||||
*pTaskInfo = createExecTaskInfo((uint64_t) pPlan->id.queryId);
|
||||
} else {
|
||||
assert(0);
|
||||
}
|
||||
|
||||
STableGroupInfo group = {.numOfTables = 1, .pGroupList = taosArrayInit(1, POINTER_BYTES)};
|
||||
SArray* pa = taosArrayInit(1, sizeof(STableKeyInfo));
|
||||
STableKeyInfo info = {.pTable = NULL, .lastKey = 0, .uid = uid};
|
||||
taosArrayPush(pa, &info);
|
||||
|
||||
taosArrayPush(group.pGroupList, &pa);
|
||||
|
||||
*pTaskInfo = createExecTaskInfo((uint64_t)pPlan->id.queryId);
|
||||
tsdbReadHandleT tsdbReadHandle = tsdbQueryTables(readerHandle, &cond, &group, (*pTaskInfo)->id.queryId, NULL);
|
||||
|
||||
(*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, tsdbReadHandle);
|
||||
if ((*pTaskInfo)->pRoot == NULL) {
|
||||
return terrno;
|
||||
|
|
|
@ -768,6 +768,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch
|
|||
|
||||
pDcl->pMsg = (char*)buildUserManipulationMsg(pInfo, &pDcl->msgLen, pCtx->requestId, msgBuf, msgBufLen);
|
||||
pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_USER) ? TDMT_MND_CREATE_USER : TDMT_MND_ALTER_USER;
|
||||
pDcl->epSet = pCtx->mgmtEpSet;
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -809,6 +810,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch
|
|||
|
||||
pDcl->pMsg = (char*)buildAcctManipulationMsg(pInfo, &pDcl->msgLen, pCtx->requestId, msgBuf, msgBufLen);
|
||||
pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_ACCT) ? TDMT_MND_CREATE_ACCT : TDMT_MND_ALTER_ACCT;
|
||||
pDcl->epSet = pCtx->mgmtEpSet;
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -816,6 +818,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch
|
|||
case TSDB_SQL_DROP_USER: {
|
||||
pDcl->pMsg = (char*)buildDropUserMsg(pInfo, &pDcl->msgLen, pCtx->requestId, msgBuf, msgBufLen);
|
||||
pDcl->msgType = (pInfo->type == TSDB_SQL_DROP_ACCT) ? TDMT_MND_DROP_ACCT : TDMT_MND_DROP_USER;
|
||||
pDcl->epSet = pCtx->mgmtEpSet;
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -852,6 +855,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch
|
|||
pDcl->pMsg = (char*)pUseDbMsg;
|
||||
pDcl->msgLen = sizeof(SUseDbReq);
|
||||
pDcl->msgType = TDMT_MND_USE_DB;
|
||||
pDcl->epSet = pCtx->mgmtEpSet;
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -880,6 +884,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch
|
|||
goto _error;
|
||||
}
|
||||
|
||||
pDcl->epSet = pCtx->mgmtEpSet;
|
||||
pDcl->pMsg = (char*)pCreateMsg;
|
||||
pDcl->msgLen = sizeof(SCreateDbReq);
|
||||
pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_DB) ? TDMT_MND_CREATE_DB : TDMT_MND_ALTER_DB;
|
||||
|
@ -908,6 +913,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch
|
|||
pDcl->msgType = TDMT_MND_DROP_DB;
|
||||
pDcl->msgLen = sizeof(SDropDbReq);
|
||||
pDcl->pMsg = (char*)pDropDbMsg;
|
||||
pDcl->epSet = pCtx->mgmtEpSet;
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -920,6 +926,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch
|
|||
|
||||
pDcl->pMsg = (char*)buildCreateStbMsg(pCreateTable, &pDcl->msgLen, pCtx, pMsgBuf);
|
||||
pDcl->msgType = TDMT_MND_CREATE_STB;
|
||||
pDcl->epSet = pCtx->mgmtEpSet;
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -940,6 +947,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch
|
|||
}
|
||||
|
||||
pDcl->msgType = TDMT_MND_CREATE_DNODE;
|
||||
pDcl->epSet = pCtx->mgmtEpSet;
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -950,6 +958,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch
|
|||
}
|
||||
|
||||
pDcl->msgType = TDMT_MND_DROP_DNODE;
|
||||
pDcl->epSet = pCtx->mgmtEpSet;
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
|
@ -106,7 +106,7 @@ int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str);
|
|||
int32_t queryPlanToSql(struct SQueryPlanNode* pQueryNode, char** sql);
|
||||
|
||||
int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryDag** pDag, uint64_t requestId);
|
||||
void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep);
|
||||
void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource);
|
||||
int32_t subPlanToString(const SSubplan *pPhyNode, char** str, int32_t* len);
|
||||
int32_t stringToSubplan(const char* str, SSubplan** subplan);
|
||||
|
||||
|
|
|
@ -13,9 +13,9 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "function.h"
|
||||
#include "os.h"
|
||||
#include "parser.h"
|
||||
#include "function.h"
|
||||
#include "plannerInt.h"
|
||||
|
||||
typedef struct SFillEssInfo {
|
||||
|
|
|
@ -278,7 +278,7 @@ static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNod
|
|||
static SPhyNode* createExchangeNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, uint64_t srcTemplateId) {
|
||||
SExchangePhyNode* node = (SExchangePhyNode*)initPhyNode(pPlanNode, OP_Exchange, sizeof(SExchangePhyNode));
|
||||
node->srcTemplateId = srcTemplateId;
|
||||
node->pSrcEndPoints = validPointer(taosArrayInit(TARRAY_MIN_SIZE, sizeof(SQueryNodeAddr)));
|
||||
node->pSrcEndPoints = validPointer(taosArrayInit(TARRAY_MIN_SIZE, sizeof(SDownstreamSource)));
|
||||
return (SPhyNode*)node;
|
||||
}
|
||||
|
||||
|
@ -409,24 +409,25 @@ int32_t createDag(SQueryPlanNode* pQueryNode, struct SCatalog* pCatalog, SQueryD
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void setExchangSourceNode(uint64_t templateId, SQueryNodeAddr* pEp, SPhyNode* pNode) {
|
||||
void setExchangSourceNode(uint64_t templateId, SDownstreamSource *pSource, SPhyNode* pNode) {
|
||||
if (NULL == pNode) {
|
||||
return;
|
||||
}
|
||||
if (OP_Exchange == pNode->info.type) {
|
||||
SExchangePhyNode* pExchange = (SExchangePhyNode*)pNode;
|
||||
if (templateId == pExchange->srcTemplateId) {
|
||||
taosArrayPush(pExchange->pSrcEndPoints, pEp);
|
||||
taosArrayPush(pExchange->pSrcEndPoints, pSource);
|
||||
}
|
||||
}
|
||||
|
||||
if (pNode->pChildren != NULL) {
|
||||
size_t size = taosArrayGetSize(pNode->pChildren);
|
||||
for(int32_t i = 0; i < size; ++i) {
|
||||
setExchangSourceNode(templateId, pEp, taosArrayGetP(pNode->pChildren, i));
|
||||
setExchangSourceNode(templateId, pSource, taosArrayGetP(pNode->pChildren, i));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* pEp) {
|
||||
setExchangSourceNode(templateId, pEp, subplan->pNode);
|
||||
void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) {
|
||||
setExchangSourceNode(templateId, pSource, subplan->pNode);
|
||||
}
|
||||
|
|
|
@ -29,6 +29,14 @@ static void copyString(const cJSON* json, const char* name, char* dst) {
|
|||
strcpy(dst, cJSON_GetStringValue(cJSON_GetObjectItem(json, name)));
|
||||
}
|
||||
|
||||
static uint64_t getBigintFromString(const cJSON* json, const char* name) {
|
||||
char* val = getString(json, name);
|
||||
uint64_t intVal = strtoul(val, NULL, 10);
|
||||
tfree(val);
|
||||
|
||||
return intVal;
|
||||
}
|
||||
|
||||
static int64_t getNumber(const cJSON* json, const char* name) {
|
||||
double d = cJSON_GetNumberValue(cJSON_GetObjectItem(json, name));
|
||||
return (int64_t) d;
|
||||
|
@ -543,13 +551,13 @@ static const char* jkTimeWindowEndKey = "EndKey";
|
|||
static bool timeWindowToJson(const void* obj, cJSON* json) {
|
||||
const STimeWindow* win = (const STimeWindow*)obj;
|
||||
|
||||
char tmp[32] = {0};
|
||||
sprintf(tmp, "%"PRId64, win->skey);
|
||||
char tmp[40] = {0};
|
||||
snprintf(tmp, tListLen(tmp),"%"PRId64, win->skey);
|
||||
|
||||
bool res = cJSON_AddStringToObject(json, jkTimeWindowStartKey, tmp);
|
||||
if (res) {
|
||||
memset(tmp, 0, tListLen(tmp));
|
||||
sprintf(tmp, "%"PRId64, win->ekey);
|
||||
snprintf(tmp, tListLen(tmp),"%"PRId64, win->ekey);
|
||||
res = cJSON_AddStringToObject(json, jkTimeWindowEndKey, tmp);
|
||||
}
|
||||
return res;
|
||||
|
@ -557,12 +565,8 @@ static bool timeWindowToJson(const void* obj, cJSON* json) {
|
|||
|
||||
static bool timeWindowFromJson(const cJSON* json, void* obj) {
|
||||
STimeWindow* win = (STimeWindow*)obj;
|
||||
|
||||
char* p = getString(json, jkTimeWindowStartKey);
|
||||
win->skey = strtoll(p, NULL, 10);
|
||||
|
||||
p = getString(json, jkTimeWindowEndKey);
|
||||
win->ekey = strtoll(p, NULL, 10);
|
||||
win->skey = getBigintFromString(json, jkTimeWindowStartKey);
|
||||
win->ekey = getBigintFromString(json, jkTimeWindowEndKey);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -574,14 +578,19 @@ static const char* jkScanNodeTableRevCount = "Reverse";
|
|||
|
||||
static bool scanNodeToJson(const void* obj, cJSON* json) {
|
||||
const SScanPhyNode* pNode = (const SScanPhyNode*)obj;
|
||||
bool res = cJSON_AddNumberToObject(json, jkScanNodeTableId, pNode->uid);
|
||||
|
||||
char uid[40] = {0};
|
||||
snprintf(uid, tListLen(uid), "%"PRIu64, pNode->uid);
|
||||
bool res = cJSON_AddStringToObject(json, jkScanNodeTableId, uid);
|
||||
|
||||
if (res) {
|
||||
res = cJSON_AddNumberToObject(json, jkScanNodeTableType, pNode->tableType);
|
||||
}
|
||||
|
||||
if (res) {
|
||||
res = cJSON_AddNumberToObject(json, jkScanNodeTableOrder, pNode->order);
|
||||
}
|
||||
|
||||
if (res) {
|
||||
res = cJSON_AddNumberToObject(json, jkScanNodeTableCount, pNode->count);
|
||||
}
|
||||
|
@ -589,12 +598,14 @@ static bool scanNodeToJson(const void* obj, cJSON* json) {
|
|||
if (res) {
|
||||
res = cJSON_AddNumberToObject(json, jkScanNodeTableRevCount, pNode->reverse);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
static bool scanNodeFromJson(const cJSON* json, void* obj) {
|
||||
SScanPhyNode* pNode = (SScanPhyNode*)obj;
|
||||
pNode->uid = getNumber(json, jkScanNodeTableId);
|
||||
|
||||
pNode->uid = getBigintFromString(json, jkScanNodeTableId);
|
||||
pNode->tableType = getNumber(json, jkScanNodeTableType);
|
||||
pNode->count = getNumber(json, jkScanNodeTableCount);
|
||||
pNode->order = getNumber(json, jkScanNodeTableOrder);
|
||||
|
@ -715,40 +726,72 @@ static bool epAddrFromJson(const cJSON* json, void* obj) {
|
|||
return true;
|
||||
}
|
||||
|
||||
static const char* jkNodeAddrId = "NodeId";
|
||||
static const char* jkNodeAddrInUse = "InUse";
|
||||
static const char* jkNodeAddrEpAddrs = "EpAddrs";
|
||||
static const char* jkNodeAddrId = "NodeId";
|
||||
static const char* jkNodeAddrInUse = "InUse";
|
||||
static const char* jkNodeAddrEpAddrs = "Ep";
|
||||
static const char* jkNodeAddr = "NodeAddr";
|
||||
static const char* jkNodeTaskId = "TaskId";
|
||||
static const char* jkNodeTaskSchedId = "SchedId";
|
||||
|
||||
static bool queryNodeAddrToJson(const void* obj, cJSON* json) {
|
||||
const SQueryNodeAddr* pAddr = (const SQueryNodeAddr*) obj;
|
||||
bool res = cJSON_AddNumberToObject(json, jkNodeAddrId, pAddr->nodeId);
|
||||
|
||||
if (res) {
|
||||
res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, pAddr->inUse);
|
||||
}
|
||||
|
||||
if (res) {
|
||||
res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, pAddr->epAddr, sizeof(SEpAddr), pAddr->numOfEps);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
static bool queryNodeAddrFromJson(const cJSON* json, void* obj) {
|
||||
SQueryNodeAddr* pAddr = (SQueryNodeAddr*) obj;
|
||||
|
||||
pAddr->nodeId = getNumber(json, jkNodeAddrId);
|
||||
pAddr->inUse = getNumber(json, jkNodeAddrInUse);
|
||||
|
||||
int32_t numOfEps = 0;
|
||||
bool res = fromRawArray(json, jkNodeAddrEpAddrs, epAddrFromJson, pAddr->epAddr, sizeof(SEpAddr), &numOfEps);
|
||||
pAddr->numOfEps = numOfEps;
|
||||
return res;
|
||||
}
|
||||
|
||||
static bool nodeAddrToJson(const void* obj, cJSON* json) {
|
||||
const SQueryNodeAddr* ep = (const SQueryNodeAddr*)obj;
|
||||
bool res = cJSON_AddNumberToObject(json, jkNodeAddrId, ep->nodeId);
|
||||
const SDownstreamSource* pSource = (const SDownstreamSource*) obj;
|
||||
bool res = cJSON_AddNumberToObject(json, jkNodeTaskId, pSource->taskId);
|
||||
|
||||
if (res) {
|
||||
res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, ep->inUse);
|
||||
char t[30] = {0};
|
||||
snprintf(t, tListLen(t), "%"PRIu64, pSource->schedId);
|
||||
res = cJSON_AddStringToObject(json, jkNodeTaskSchedId, t);
|
||||
}
|
||||
|
||||
if (res) {
|
||||
res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, ep->epAddr, ep->numOfEps, sizeof(SEpAddr));
|
||||
res = addObject(json, jkNodeAddr, queryNodeAddrToJson, &pSource->addr);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
static bool nodeAddrFromJson(const cJSON* json, void* obj) {
|
||||
SQueryNodeAddr* ep = (SQueryNodeAddr*)obj;
|
||||
ep->nodeId = getNumber(json, jkNodeAddrId);
|
||||
ep->inUse = getNumber(json, jkNodeAddrInUse);
|
||||
int32_t numOfEps = 0;
|
||||
bool res = fromRawArray(json, jkNodeAddrEpAddrs, nodeAddrFromJson, &ep->epAddr, sizeof(SEpAddr), &numOfEps);
|
||||
ep->numOfEps = numOfEps;
|
||||
SDownstreamSource* pSource = (SDownstreamSource*)obj;
|
||||
pSource->taskId = getNumber(json, jkNodeTaskId);
|
||||
|
||||
pSource->schedId = getBigintFromString(json, jkNodeTaskSchedId);
|
||||
bool res = fromObject(json, jkNodeAddr, queryNodeAddrFromJson, &pSource->addr, true);
|
||||
return res;
|
||||
}
|
||||
|
||||
static const char* jkExchangeNodeSrcTemplateId = "SrcTemplateId";
|
||||
static const char* jkExchangeNodeSrcEndPoints = "SrcEndPoints";
|
||||
static const char* jkExchangeNodeSrcEndPoints = "SrcAddrs";
|
||||
|
||||
static bool exchangeNodeToJson(const void* obj, cJSON* json) {
|
||||
const SExchangePhyNode* exchange = (const SExchangePhyNode*)obj;
|
||||
bool res = cJSON_AddNumberToObject(json, jkExchangeNodeSrcTemplateId, exchange->srcTemplateId);
|
||||
if (res) {
|
||||
res = addInlineArray(json, jkExchangeNodeSrcEndPoints, nodeAddrToJson, exchange->pSrcEndPoints);
|
||||
res = addRawArray(json, jkExchangeNodeSrcEndPoints, nodeAddrToJson, exchange->pSrcEndPoints->pData, sizeof(SDownstreamSource), taosArrayGetSize(exchange->pSrcEndPoints));
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
@ -756,7 +799,7 @@ static bool exchangeNodeToJson(const void* obj, cJSON* json) {
|
|||
static bool exchangeNodeFromJson(const cJSON* json, void* obj) {
|
||||
SExchangePhyNode* exchange = (SExchangePhyNode*)obj;
|
||||
exchange->srcTemplateId = getNumber(json, jkExchangeNodeSrcTemplateId);
|
||||
return fromInlineArray(json, jkExchangeNodeSrcEndPoints, nodeAddrFromJson, &exchange->pSrcEndPoints, sizeof(SQueryNodeAddr));
|
||||
return fromInlineArray(json, jkExchangeNodeSrcEndPoints, nodeAddrFromJson, &exchange->pSrcEndPoints, sizeof(SDownstreamSource));
|
||||
}
|
||||
|
||||
static bool specificPhyNodeToJson(const void* obj, cJSON* json) {
|
||||
|
@ -965,7 +1008,11 @@ static const char* jkIdSubplanId = "SubplanId";
|
|||
|
||||
static bool subplanIdToJson(const void* obj, cJSON* jId) {
|
||||
const SSubplanId* id = (const SSubplanId*)obj;
|
||||
bool res = cJSON_AddNumberToObject(jId, jkIdQueryId, id->queryId);
|
||||
|
||||
char ids[40] = {0};
|
||||
snprintf(ids, tListLen(ids), "%"PRIu64, id->queryId);
|
||||
|
||||
bool res = cJSON_AddStringToObject(jId, jkIdQueryId, ids);
|
||||
if (res) {
|
||||
res = cJSON_AddNumberToObject(jId, jkIdTemplateId, id->templateId);
|
||||
}
|
||||
|
@ -977,9 +1024,10 @@ static bool subplanIdToJson(const void* obj, cJSON* jId) {
|
|||
|
||||
static bool subplanIdFromJson(const cJSON* json, void* obj) {
|
||||
SSubplanId* id = (SSubplanId*)obj;
|
||||
id->queryId = getNumber(json, jkIdQueryId);
|
||||
|
||||
id->queryId = getBigintFromString(json, jkIdQueryId);
|
||||
id->templateId = getNumber(json, jkIdTemplateId);
|
||||
id->subplanId = getNumber(json, jkIdSubplanId);
|
||||
id->subplanId = getNumber(json, jkIdSubplanId);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -65,9 +65,9 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag,
|
|||
}
|
||||
|
||||
if (pLogicPlan->info.type != QNODE_MODIFY) {
|
||||
char* str = NULL;
|
||||
queryPlanToString(pLogicPlan, &str);
|
||||
printf("%s\n", str);
|
||||
// char* str = NULL;
|
||||
// queryPlanToString(pLogicPlan, &str);
|
||||
// printf("%s\n", str);
|
||||
}
|
||||
|
||||
code = optimizeQueryPlan(pLogicPlan);
|
||||
|
@ -87,8 +87,8 @@ int32_t qCreateQueryDag(const struct SQueryNode* pNode, struct SQueryDag** pDag,
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SQueryNodeAddr* ep) {
|
||||
setSubplanExecutionNode(subplan, templateId, ep);
|
||||
void qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) {
|
||||
setSubplanExecutionNode(subplan, templateId, pSource);
|
||||
}
|
||||
|
||||
int32_t qSubPlanToString(const SSubplan *subplan, char** str, int32_t* len) {
|
||||
|
|
|
@ -1,15 +1,4 @@
|
|||
aux_source_directory(src QWORKER_SRC)
|
||||
#add_library(qworker ${QWORKER_SRC})
|
||||
#target_include_directories(
|
||||
# qworker
|
||||
# PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/qworker"
|
||||
# PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||
#)
|
||||
#
|
||||
#target_link_libraries(
|
||||
# qworker
|
||||
# PRIVATE os util transport planner qcom executor
|
||||
#)
|
||||
|
||||
add_library(qworker STATIC ${QWORKER_SRC})
|
||||
target_include_directories(
|
||||
|
@ -18,11 +7,6 @@ target_include_directories(
|
|||
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||
)
|
||||
|
||||
#set_target_properties(qworker PROPERTIES
|
||||
# IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/libqworker.a"
|
||||
# INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_SOURCE_DIR}/include/libs/qworker"
|
||||
# )
|
||||
|
||||
target_link_libraries(qworker
|
||||
PRIVATE os util transport planner qcom executor
|
||||
)
|
||||
|
|
|
@ -271,6 +271,8 @@ int32_t qwAddTaskCtxImpl(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_
|
|||
char id[sizeof(qId) + sizeof(tId)] = {0};
|
||||
QW_SET_QTID(id, qId, tId);
|
||||
|
||||
printf("%"PRIx64", tid:%"PRIx64"\n", qId, tId);
|
||||
|
||||
SQWTaskCtx nctx = {0};
|
||||
|
||||
QW_LOCK(QW_WRITE, &mgmt->ctxLock);
|
||||
|
|
|
@ -29,7 +29,7 @@ extern "C" {
|
|||
#define SCHEDULE_DEFAULT_JOB_NUMBER 1000
|
||||
#define SCHEDULE_DEFAULT_TASK_NUMBER 1000
|
||||
|
||||
#define SCH_MAX_CONDIDATE_EP_NUM TSDB_MAX_REPLICA
|
||||
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
|
||||
|
||||
enum {
|
||||
SCH_READ = 1,
|
||||
|
|
|
@ -52,9 +52,9 @@ int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *
|
|||
pTask->level = pLevel;
|
||||
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START);
|
||||
pTask->taskId = schGenTaskId();
|
||||
pTask->execAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
|
||||
pTask->execAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
|
||||
if (NULL == pTask->execAddrs) {
|
||||
SCH_TASK_ELOG("taosArrayInit %d exec addrs failed", SCH_MAX_CONDIDATE_EP_NUM);
|
||||
SCH_TASK_ELOG("taosArrayInit %d exec addrs failed", SCH_MAX_CANDIDATE_EP_NUM);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -66,7 +66,7 @@ void schFreeTask(SSchTask* pTask) {
|
|||
taosArrayDestroy(pTask->candidateAddrs);
|
||||
}
|
||||
|
||||
tfree(pTask->msg);
|
||||
tfree(pTask->msg);
|
||||
|
||||
if (pTask->children) {
|
||||
taosArrayDestroy(pTask->children);
|
||||
|
@ -408,9 +408,9 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
|
|||
}
|
||||
|
||||
pTask->candidateIdx = 0;
|
||||
pTask->candidateAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
|
||||
pTask->candidateAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr));
|
||||
if (NULL == pTask->candidateAddrs) {
|
||||
SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CONDIDATE_EP_NUM);
|
||||
SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CANDIDATE_EP_NUM);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -430,10 +430,10 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
|
|||
if (pJob->nodeList) {
|
||||
nodeNum = taosArrayGetSize(pJob->nodeList);
|
||||
|
||||
for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
|
||||
for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
|
||||
SQueryNodeAddr *naddr = taosArrayGet(pJob->nodeList, i);
|
||||
|
||||
if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) {
|
||||
if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
|
||||
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
@ -443,12 +443,12 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
|
|||
}
|
||||
|
||||
if (addNum <= 0) {
|
||||
SCH_TASK_ELOG("no available execNode as condidate addr, nodeNum:%d", nodeNum);
|
||||
SCH_TASK_ELOG("no available execNode as candidate addr, nodeNum:%d", nodeNum);
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
}
|
||||
|
||||
/*
|
||||
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CONDIDATE_EP_NUM; ++i) {
|
||||
for (int32_t i = 0; i < job->dataSrcEps.numOfEps && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) {
|
||||
strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
|
||||
epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];
|
||||
|
||||
|
@ -767,7 +767,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
|||
}
|
||||
|
||||
/*
|
||||
if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CONDIDATE_EP_NUM) {
|
||||
if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CANDIDATE_EP_NUM) {
|
||||
strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn));
|
||||
job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port;
|
||||
|
||||
|
@ -782,7 +782,8 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
|||
atomic_add_fetch_32(&par->childReady, 1);
|
||||
|
||||
SCH_LOCK(SCH_WRITE, &par->lock);
|
||||
qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &pTask->succeedAddr);
|
||||
SDownstreamSource source = {.taskId = pTask->taskId, .schedId = schMgmt.sId, .addr = pTask->succeedAddr};
|
||||
qSetSubplanExecutionNode(par->plan, pTask->plan->id.templateId, &source);
|
||||
SCH_UNLOCK(SCH_WRITE, &par->lock);
|
||||
|
||||
if (SCH_TASK_READY_TO_LUNCH(par)) {
|
||||
|
@ -853,7 +854,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
|
|||
SResReadyRsp *rsp = (SResReadyRsp *)msg;
|
||||
|
||||
if (rspCode != TSDB_CODE_SUCCESS || NULL == msg || rsp->code != TSDB_CODE_SUCCESS) {
|
||||
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rsp->code));
|
||||
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
|
||||
}
|
||||
|
||||
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
|
||||
|
@ -879,7 +880,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
|
|||
if (rsp->completed) {
|
||||
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
|
||||
}
|
||||
|
||||
|
||||
SCH_ERR_JRET(schProcessOnDataFetched(pJob));
|
||||
|
||||
break;
|
||||
|
@ -1214,6 +1215,8 @@ int32_t schLaunchTask(SSchJob *pJob, SSchTask *pTask) {
|
|||
SCH_TASK_ELOG("subplanToString error, code:%x, msg:%p, len:%d", code, pTask->msg, pTask->msgLen);
|
||||
SCH_ERR_JRET(code);
|
||||
}
|
||||
|
||||
printf("physical plan:%s\n", pTask->msg);
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(schSetTaskCandidateAddrs(pJob, pTask));
|
||||
|
@ -1477,7 +1480,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
|
|||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SSubQueryMsg *pMsg = msg;
|
||||
SSubQueryMsg *pMsg = (SSubQueryMsg*) msg;
|
||||
|
||||
pMsg->header.vgId = htonl(tInfo.addr.nodeId);
|
||||
|
||||
|
@ -1512,7 +1515,7 @@ int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum) {
|
|||
}
|
||||
|
||||
int32_t code = 0;
|
||||
|
||||
|
||||
*dst = taosArrayInit(copyNum, sizeof(STaskInfo));
|
||||
if (NULL == *dst) {
|
||||
qError("taosArrayInit %d taskInfo failed", copyNum);
|
||||
|
@ -1523,7 +1526,7 @@ int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum) {
|
|||
STaskInfo info = {0};
|
||||
|
||||
info.addr = src->addr;
|
||||
|
||||
|
||||
for (int32_t i = 0; i < copyNum; ++i) {
|
||||
info.msg = malloc(msgSize);
|
||||
if (NULL == info.msg) {
|
||||
|
@ -1534,7 +1537,7 @@ int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum) {
|
|||
memcpy(info.msg, src->msg, msgSize);
|
||||
|
||||
info.msg->taskId = schGenUUID();
|
||||
|
||||
|
||||
if (NULL == taosArrayPush(*dst, &info)) {
|
||||
qError("taosArrayPush failed, idx:%d", i);
|
||||
free(info.msg);
|
||||
|
@ -1548,7 +1551,7 @@ _return:
|
|||
|
||||
schedulerFreeTaskList(*dst);
|
||||
*dst = NULL;
|
||||
|
||||
|
||||
SCH_RET(code);
|
||||
}
|
||||
|
||||
|
@ -1668,7 +1671,7 @@ void scheduleFreeJob(void *job) {
|
|||
schProcessOnJobDropped(pJob, TSDB_CODE_QRY_JOB_FREED);
|
||||
setJobFree = true;
|
||||
}
|
||||
|
||||
|
||||
usleep(1);
|
||||
} else {
|
||||
assert(0);
|
||||
|
|
|
@ -44,7 +44,7 @@ typedef struct {
|
|||
|
||||
// TDB Operations
|
||||
TDB_EXTERN int tdbCreateDB(TDB** dbpp, tdb_db_t type);
|
||||
TDB_EXTERN int tdbOpenDB(TDB* dbp, uint32_t flags);
|
||||
TDB_EXTERN int tdbOpenDB(TDB* dbp, const char* fname, const char* dbname, uint32_t flags);
|
||||
TDB_EXTERN int tdbCloseDB(TDB* dbp, uint32_t flags);
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -56,9 +56,28 @@ _err:
|
|||
return 0;
|
||||
}
|
||||
|
||||
TDB_EXTERN int tdbOpenDB(TDB* dbp, uint32_t flags) {
|
||||
// TODO
|
||||
return 0;
|
||||
TDB_EXTERN int tdbOpenDB(TDB* dbp, const char* fname, const char* dbname, uint32_t flags) {
|
||||
int ret = 0;
|
||||
|
||||
if ((dbp->fname = strdup(fname)) == NULL) {
|
||||
ret = -1;
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Create the backup file if the file not exists
|
||||
|
||||
// Open the file as a sub-db or a master-db
|
||||
if (dbname) {
|
||||
if ((dbp->dbname = strdup(dbname)) == NULL) {
|
||||
ret = -1;
|
||||
return ret;
|
||||
}
|
||||
// TODO: Open the DB as a SUB-DB in this file
|
||||
} else {
|
||||
// TODO: Open the DB as a MASTER-DB in this file
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
TDB_EXTERN int tdbCloseDB(TDB* dbp, uint32_t flags) {
|
||||
|
|
|
@ -25,6 +25,14 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct {
|
||||
// TODO
|
||||
} TDB_MPOOL;
|
||||
|
||||
typedef struct {
|
||||
int fd;
|
||||
} TDB_FH;
|
||||
|
||||
struct TDB {
|
||||
pgsize_t pageSize;
|
||||
tdb_db_t type;
|
||||
|
@ -35,6 +43,9 @@ struct TDB {
|
|||
TDB_HASH * hash;
|
||||
TDB_HEAP * heap;
|
||||
} dbam; // db access method
|
||||
|
||||
TDB_FH * fhp; // The backup file handle
|
||||
TDB_MPOOL *mph; // The memory pool handle
|
||||
};
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -6,9 +6,9 @@ TEST(tdb_api_test, tdb_create_open_close_db_test) {
|
|||
int ret;
|
||||
TDB *dbp;
|
||||
|
||||
tdbCreateDB(&dbp, TDB_BTREE_T);
|
||||
// tdbCreateDB(&dbp, TDB_BTREE_T);
|
||||
|
||||
tdbOpenDB(dbp, 0);
|
||||
// tdbOpenDB(dbp, 0);
|
||||
|
||||
tdbCloseDB(dbp, 0);
|
||||
// tdbCloseDB(dbp, 0);
|
||||
}
|
|
@ -16,8 +16,8 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "os.h"
|
||||
#include "tconfig.h"
|
||||
#include "ulog.h"
|
||||
#include "tutil.h"
|
||||
#include "ulog.h"
|
||||
|
||||
SGlobalCfg tsGlobalConfig[TSDB_CFG_MAX_NUM] = {{0}};
|
||||
int32_t tsGlobalConfigNum = 0;
|
||||
|
|
Loading…
Reference in New Issue