Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/vnode

This commit is contained in:
Hongze Cheng 2022-01-21 05:30:57 +00:00
commit fcc82bc296
20 changed files with 253 additions and 165 deletions

View File

@ -1525,9 +1525,23 @@ typedef struct SMqSetCVgReq {
char* sql; char* sql;
char* logicalPlan; char* logicalPlan;
char* physicalPlan; char* physicalPlan;
SArray* tasks; // SArray<SSubQueryMsg> SSubQueryMsg msg;
} SMqSetCVgReq; } SMqSetCVgReq;
static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) {
int32_t tlen = sizeof(SSubQueryMsg) + pMsg->contentLen;
if (buf == NULL) return tlen;
memcpy(*buf, pMsg, tlen);
*buf = POINTER_SHIFT(*buf, tlen);
return tlen;
}
static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) {
int32_t tlen = sizeof(SSubQueryMsg) + ((SSubQueryMsg*)buf)->contentLen;
memcpy(pMsg, buf, tlen);
return POINTER_SHIFT(buf, tlen);
}
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pReq->vgId); tlen += taosEncodeFixedI32(buf, pReq->vgId);
@ -1537,6 +1551,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->sql);
tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->logicalPlan);
tlen += taosEncodeString(buf, pReq->physicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan);
tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
return tlen; return tlen;
} }
@ -1548,7 +1563,7 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
buf = taosDecodeString(buf, &pReq->sql); buf = taosDecodeString(buf, &pReq->sql);
buf = taosDecodeString(buf, &pReq->logicalPlan); buf = taosDecodeString(buf, &pReq->logicalPlan);
buf = taosDecodeString(buf, &pReq->physicalPlan); buf = taosDecodeString(buf, &pReq->physicalPlan);
pReq->tasks = NULL; buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
return buf; return buf;
} }

View File

@ -254,6 +254,7 @@ typedef struct SMultiFunctionsDesc {
bool interpQuery; bool interpQuery;
bool distinct; bool distinct;
bool join; bool join;
bool continueQuery;
} SMultiFunctionsDesc; } SMultiFunctionsDesc;
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength, int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength,

View File

@ -23,6 +23,7 @@
#error To use this include file, first define either INCLUDE_AS_ENUM or INCLUDE_AS_NAME #error To use this include file, first define either INCLUDE_AS_ENUM or INCLUDE_AS_NAME
#endif #endif
OP_ENUM_MACRO(StreamScan)
OP_ENUM_MACRO(TableScan) OP_ENUM_MACRO(TableScan)
OP_ENUM_MACRO(DataBlocksOptScan) OP_ENUM_MACRO(DataBlocksOptScan)
OP_ENUM_MACRO(TableSeqScan) OP_ENUM_MACRO(TableSeqScan)

View File

@ -109,12 +109,71 @@ typedef struct STableMetaOutput {
STableMeta *tbMeta; STableMeta *tbMeta;
} STableMetaOutput; } STableMetaOutput;
const SSchema* tGetTbnameColumnSchema(); typedef struct SDataBuf {
void *pData;
uint32_t len;
} SDataBuf;
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
typedef int32_t (*__async_exec_fn_t)(void* param);
typedef struct SMsgSendInfo {
__async_send_cb_fn_t fp; //async callback function
void *param;
uint64_t requestId;
uint64_t requestObjRefId;
int32_t msgType;
SDataBuf msgInfo;
} SMsgSendInfo;
typedef struct SQueryNodeAddr {
int32_t nodeId; // vgId or qnodeId
int8_t inUse;
int8_t numOfEps;
SEpAddr epAddr[TSDB_MAX_REPLICA];
} SQueryNodeAddr;
static FORCE_INLINE void tConvertQueryAddrToEpSet(SEpSet* pEpSet, const SQueryNodeAddr* pAddr) {
pEpSet->inUse = pAddr->inUse;
pEpSet->numOfEps = pAddr->numOfEps;
for (int j = 0; j < TSDB_MAX_REPLICA; j++) {
pEpSet->port[j] = pAddr->epAddr[j].port;
memcpy(pEpSet->fqdn[j], pAddr->epAddr[j].fqdn, TSDB_FQDN_LEN);
}
}
int32_t initTaskQueue();
int32_t cleanupTaskQueue();
/**
*
* @param execFn The asynchronously execution function
* @param execParam The parameters of the execFn
* @param code The response code during execution the execFn
* @return
*/
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
/**
* Asynchronously send message to server, after the response received, the callback will be incured.
*
* @param pTransporter
* @param epSet
* @param pTransporterId
* @param pInfo
* @return
*/
int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
void initQueryModuleMsgHandle(); void initQueryModuleMsgHandle();
const SSchema* tGetTbnameColumnSchema();
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize); extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize);
#define SET_META_TYPE_NONE(t) (t) = META_TYPE_NON_TABLE #define SET_META_TYPE_NONE(t) (t) = META_TYPE_NON_TABLE
#define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE #define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE
#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE #define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE

View File

@ -83,56 +83,6 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo);
void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcReportProgress(void *pConn, char *pCont, int contLen); int rpcReportProgress(void *pConn, char *pCont, int contLen);
void rpcCancelRequest(int64_t rid); void rpcCancelRequest(int64_t rid);
typedef struct SDataBuf {
void *pData;
uint32_t len;
} SDataBuf;
typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code);
typedef int32_t (*__async_exec_fn_t)(void* param);
typedef struct SMsgSendInfo {
__async_send_cb_fn_t fp; //async callback function
void *param;
uint64_t requestId;
uint64_t requestObjRefId;
int32_t msgType;
SDataBuf msgInfo;
} SMsgSendInfo;
typedef struct SQueryNodeAddr {
int32_t nodeId; // vgId or qnodeId
int8_t inUse;
int8_t numOfEps;
SEpAddr epAddr[TSDB_MAX_REPLICA];
} SQueryNodeAddr;
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
int32_t initTaskQueue();
int32_t cleanupTaskQueue();
/**
*
* @param execFn The asynchronously execution function
* @param execParam The parameters of the execFn
* @param code The response code during execution the execFn
* @return
*/
int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
/**
* Asynchronously send message to server, after the response received, the callback will be incured.
*
* @param pTransporter
* @param epSet
* @param pTransporterId
* @param pInfo
* @return
*/
int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -394,6 +394,9 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql,
CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return);
CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return); CHECK_CODE_GOTO(parseSql(pRequest, &pQueryNode), _return);
SQueryStmtInfo* pQueryStmtInfo = (SQueryStmtInfo* ) pQueryNode;
pQueryStmtInfo->info.continueQuery = true;
// todo check for invalid sql statement and return with error code // todo check for invalid sql statement and return with error code
CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, pRequest->requestId), _return); CHECK_CODE_GOTO(qCreateQueryDag(pQueryNode, &pRequest->body.pDag, pRequest->requestId), _return);
@ -403,6 +406,8 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql,
goto _return; goto _return;
} }
printf("%s\n", pStr);
// The topic should be related to a database that the queried table is belonged to. // The topic should be related to a database that the queried table is belonged to.
SName name = {0}; SName name = {0};
char dbName[TSDB_DB_FNAME_LEN] = {0}; char dbName[TSDB_DB_FNAME_LEN] = {0};

View File

@ -525,30 +525,30 @@ TEST(testCase, driverInit_Test) {
// taosHashCleanup(phash); // taosHashCleanup(phash);
//} //}
// //
//TEST(testCase, create_topic_Test) { TEST(testCase, create_topic_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL); assert(pConn != NULL);
//
// TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("error in use db, reason:%s\n", taos_errstr(pRes)); printf("error in use db, reason:%s\n", taos_errstr(pRes));
// } }
// taos_free_result(pRes); taos_free_result(pRes);
//
// TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
// ASSERT_TRUE(pFields == nullptr); ASSERT_TRUE(pFields == nullptr);
//
// int32_t numOfFields = taos_num_fields(pRes); int32_t numOfFields = taos_num_fields(pRes);
// ASSERT_EQ(numOfFields, 0); ASSERT_EQ(numOfFields, 0);
//
// taos_free_result(pRes); taos_free_result(pRes);
//
// char* sql = "select * from tu"; char* sql = "select * from tu";
// pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql)); pRes = taos_create_topic(pConn, "test_topic_1", sql, strlen(sql));
// taos_free_result(pRes); taos_free_result(pRes);
// taos_close(pConn); taos_close(pConn);
//} }
//
//TEST(testCase, insert_test) { //TEST(testCase, insert_test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// ASSERT_NE(pConn, nullptr); // ASSERT_NE(pConn, nullptr);
@ -646,36 +646,36 @@ TEST(testCase, driverInit_Test) {
// taos_close(pConn); // taos_close(pConn);
//} //}
TEST(testCase, agg_query_tables) { //TEST(testCase, agg_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr); // ASSERT_NE(pConn, nullptr);
//
TAOS_RES* pRes = taos_query(pConn, "use dbv"); // TAOS_RES* pRes = taos_query(pConn, "use dbv");
taos_free_result(pRes); // taos_free_result(pRes);
//
pRes = taos_query(pConn, "create table tx using st tags(111111111111111)"); // pRes = taos_query(pConn, "create table tx using st tags(111111111111111)");
if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
printf("failed to create table, reason:%s\n", taos_errstr(pRes)); // printf("failed to create table, reason:%s\n", taos_errstr(pRes));
} // }
taos_free_result(pRes); // taos_free_result(pRes);
//
pRes = taos_query(pConn, "select count(*) from tu"); // pRes = taos_query(pConn, "select count(*) from tu");
if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); // printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); // taos_free_result(pRes);
ASSERT_TRUE(false); // ASSERT_TRUE(false);
} // }
//
TAOS_ROW pRow = NULL; // TAOS_ROW pRow = NULL;
TAOS_FIELD* pFields = taos_fetch_fields(pRes); // TAOS_FIELD* pFields = taos_fetch_fields(pRes);
int32_t numOfFields = taos_num_fields(pRes); // int32_t numOfFields = taos_num_fields(pRes);
//
char str[512] = {0}; // char str[512] = {0};
while ((pRow = taos_fetch_row(pRes)) != NULL) { // while ((pRow = taos_fetch_row(pRes)) != NULL) {
int32_t code = taos_print_row(str, pRow, pFields, numOfFields); // int32_t code = taos_print_row(str, pRow, pFields, numOfFields);
printf("%s\n", str); // printf("%s\n", str);
} // }
//
taos_free_result(pRes); // taos_free_result(pRes);
taos_close(pConn); // taos_close(pConn);
} //}

View File

@ -327,11 +327,13 @@ typedef struct SMqTopicConsumer {
#endif #endif
typedef struct SMqConsumerEp { typedef struct SMqConsumerEp {
int32_t vgId; // -1 for unassigned int32_t vgId; // -1 for unassigned
SEpSet epset; SEpSet epset;
int64_t consumerId; // -1 for unassigned int64_t consumerId; // -1 for unassigned
int64_t lastConsumerHbTs; int64_t lastConsumerHbTs;
int64_t lastVgHbTs; int64_t lastVgHbTs;
int32_t execLen;
SSubQueryMsg qExec;
} SMqConsumerEp; } SMqConsumerEp;
static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
@ -339,6 +341,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon
tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId); tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId);
tlen += taosEncodeSEpSet(buf, &pConsumerEp->epset); tlen += taosEncodeSEpSet(buf, &pConsumerEp->epset);
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec);
return tlen; return tlen;
} }
@ -346,6 +349,8 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu
buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId); buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId);
buf = taosDecodeSEpSet(buf, &pConsumerEp->epset); buf = taosDecodeSEpSet(buf, &pConsumerEp->epset);
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec);
pConsumerEp->execLen = sizeof(SSubQueryMsg) + pConsumerEp->qExec.contentLen;
return buf; return buf;
} }

View File

@ -105,6 +105,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
strcpy(req.sql, pTopic->sql); strcpy(req.sql, pTopic->sql);
strcpy(req.logicalPlan, pTopic->logicalPlan); strcpy(req.logicalPlan, pTopic->logicalPlan);
strcpy(req.physicalPlan, pTopic->physicalPlan); strcpy(req.physicalPlan, pTopic->physicalPlan);
memcpy(&req.msg, &pCEp->qExec, pCEp->execLen);
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
void *reqStr = malloc(tlen); void *reqStr = malloc(tlen);
if (reqStr == NULL) { if (reqStr == NULL) {
@ -143,7 +144,21 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) { static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) {
SMqConsumerEp CEp; SMqConsumerEp CEp;
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
int32_t sz; //convert phyplan to dag
SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
SArray *pArray;
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
}
int32_t sz = taosArrayGetSize(pArray);
//convert dag to msg
for (int32_t i = 0; i < sz; i++) {
STaskInfo* pTaskInfo = taosArrayGet(pArray, i);
int32_t vgId = pTaskInfo->addr.nodeId;
SEpSet epSet;
tConvertQueryAddrToEpSet(&epSet, &pTaskInfo->addr);
}
/*pTopic->physicalPlan;*/
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
void *pIter = sdbFetch(pSdb, SDB_VGROUP, NULL, (void **)&pVgroup); void *pIter = sdbFetch(pSdb, SDB_VGROUP, NULL, (void **)&pVgroup);
@ -156,6 +171,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
} }
return 0; return 0;
qDestroyQueryDag(pDag);
} }
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../../../../../include/libs/executor/executor.h"
#include "tqInt.h" #include "tqInt.h"
#include "tqMetaStore.h" #include "tqMetaStore.h"
@ -681,7 +682,6 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) { if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
// TODO: handle error // TODO: handle error
} }
ASSERT(taosArrayGetSize(pArray) == 0);
STaskInfo *pInfo = taosArrayGet(pArray, 0); STaskInfo *pInfo = taosArrayGet(pArray, 0);
SArray* pTasks; SArray* pTasks;
schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE); schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE);
@ -733,6 +733,7 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo)
// TODO: filter out unused column // TODO: filter out unused column
return 0; return 0;
} }
SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
int32_t sversion = pHandle->pBlock->sversion; int32_t sversion = pHandle->pBlock->sversion;
SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true); SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true);
@ -762,7 +763,32 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
taosArrayPush(pArray, &colInfo); taosArrayPush(pArray, &colInfo);
return pArray; return pArray;
} }
/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t
* status) {*/ static qTaskInfo_t createExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle) {
/*return 0;*/ if (pMsg == NULL || pStreamBlockReadHandle == NULL) {
/*}*/ return NULL;
}
// print those info into log
pMsg->sId = be64toh(pMsg->sId);
pMsg->queryId = be64toh(pMsg->queryId);
pMsg->taskId = be64toh(pMsg->taskId);
pMsg->contentLen = ntohl(pMsg->contentLen);
struct SSubplan *plan = NULL;
int32_t code = qStringToSubplan(pMsg->msg, &plan);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
}
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(pStreamBlockReadHandle, 0, plan, &pTaskInfo, NULL);
if (code != TSDB_CODE_SUCCESS) {
// TODO: destroy SSubplan & pTaskInfo
terrno = code;
return NULL;
}
return pTaskInfo;
}

View File

@ -253,12 +253,8 @@ typedef struct SExecTaskInfo {
STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray<STableQueryInfo*> structure
pthread_mutex_t lock; // used to synchronize the rsp/query threads pthread_mutex_t lock; // used to synchronize the rsp/query threads
// tsem_t ready;
// int32_t dataReady; // denote if query result is ready or not
// void* rspContext; // response context
char *sql; // query sql string char *sql; // query sql string
jmp_buf env; // jmp_buf env; //
DataSinkHandle dsHandle;
struct SOperatorInfo *pRoot; struct SOperatorInfo *pRoot;
} SExecTaskInfo; } SExecTaskInfo;
@ -666,6 +662,6 @@ int32_t getMaximumIdleDurationSec();
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type); void doInvokeUdf(struct SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t type);
void setTaskStatus(SExecTaskInfo *pTaskInfo, int8_t status); void setTaskStatus(SExecTaskInfo *pTaskInfo, int8_t status);
int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle); int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle);
#endif // TDENGINE_EXECUTORIMPL_H #endif // TDENGINE_EXECUTORIMPL_H

View File

@ -69,11 +69,11 @@ void freeParam(STaskParam *param) {
tfree(param->prevResult); tfree(param->prevResult);
} }
int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) { int32_t qCreateExecTask(void* readHandle, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
assert(tsdb != NULL && pSubplan != NULL); assert(readHandle != NULL && pSubplan != NULL);
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
int32_t code = doCreateExecTaskInfo(pSubplan, pTask, tsdb); int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
@ -84,11 +84,9 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_
goto _error; goto _error;
} }
code = dsCreateDataSinker(pSubplan->pDataSink, &(*pTask)->dsHandle); code = dsCreateDataSinker(pSubplan->pDataSink, handle);
*handle = (*pTask)->dsHandle; _error:
_error:
// if failed to add ref for all tables in this query, abort current query // if failed to add ref for all tables in this query, abort current query
return code; return code;
} }

View File

@ -7778,22 +7778,29 @@ static tsdbReadHandleT doCreateDataReadHandle(STableScanPhyNode* pTableScanNode,
return NULL; return NULL;
} }
int32_t doCreateExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle) { int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, void* readerHandle) {
tsdbReadHandleT tReaderHandle = NULL;
int32_t code = 0;
uint64_t queryId = pPlan->id.queryId; uint64_t queryId = pPlan->id.queryId;
SPhyNode* pPhyNode = pPlan->pNode; int32_t code = TSDB_CODE_SUCCESS;
*pTaskInfo = createExecTaskInfo(queryId); *pTaskInfo = createExecTaskInfo(queryId);
if (*pTaskInfo == NULL) {
code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _complete;
}
(*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, readerHandle, queryId); (*pTaskInfo)->pRoot = doCreateOperatorTreeNode(pPlan->pNode, *pTaskInfo, readerHandle, queryId);
if ((*pTaskInfo)->pRoot == NULL) { if ((*pTaskInfo)->pRoot == NULL) {
return terrno; code = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto _complete;
} }
return TSDB_CODE_SUCCESS; return code;
_complete:
tfree(*pTaskInfo);
terrno = code;
return code;
} }
/** /**

View File

@ -28,19 +28,20 @@ extern "C" {
#define QNODE_TAGSCAN 1 #define QNODE_TAGSCAN 1
#define QNODE_TABLESCAN 2 #define QNODE_TABLESCAN 2
#define QNODE_PROJECT 3 #define QNODE_STREAMSCAN 3
#define QNODE_AGGREGATE 4 #define QNODE_PROJECT 4
#define QNODE_GROUPBY 5 #define QNODE_AGGREGATE 5
#define QNODE_LIMIT 6 #define QNODE_GROUPBY 6
#define QNODE_JOIN 7 #define QNODE_LIMIT 7
#define QNODE_DISTINCT 8 #define QNODE_JOIN 8
#define QNODE_SORT 9 #define QNODE_DISTINCT 9
#define QNODE_UNION 10 #define QNODE_SORT 10
#define QNODE_TIMEWINDOW 11 #define QNODE_UNION 11
#define QNODE_SESSIONWINDOW 12 #define QNODE_TIMEWINDOW 12
#define QNODE_STATEWINDOW 13 #define QNODE_SESSIONWINDOW 13
#define QNODE_FILL 14 #define QNODE_STATEWINDOW 14
#define QNODE_MODIFY 15 #define QNODE_FILL 15
#define QNODE_MODIFY 16
typedef struct SQueryDistPlanNodeInfo { typedef struct SQueryDistPlanNodeInfo {
bool stableQuery; // super table query or not bool stableQuery; // super table query or not

View File

@ -121,6 +121,7 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla
switch(type) { switch(type) {
case QNODE_TAGSCAN: case QNODE_TAGSCAN:
case QNODE_STREAMSCAN:
case QNODE_TABLESCAN: { case QNODE_TABLESCAN: {
SQueryTableInfo* info = calloc(1, sizeof(SQueryTableInfo)); SQueryTableInfo* info = calloc(1, sizeof(SQueryTableInfo));
memcpy(info, pExtInfo, sizeof(SQueryTableInfo)); memcpy(info, pExtInfo, sizeof(SQueryTableInfo));
@ -195,7 +196,12 @@ static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, SQ
return pNode; return pNode;
} }
SQueryPlanNode* pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info); SQueryPlanNode* pNode = NULL;
if (pQueryInfo->info.continueQuery) {
pNode = createQueryNode(QNODE_STREAMSCAN, "StreamScan", NULL, 0, NULL, 0, info);
} else {
pNode = createQueryNode(QNODE_TABLESCAN, "TableScan", NULL, 0, NULL, 0, info);
}
if (!pQueryInfo->info.projectionQuery) { if (!pQueryInfo->info.projectionQuery) {
SArray* p = pQueryInfo->exprList[0]; SArray* p = pQueryInfo->exprList[0];
@ -261,7 +267,6 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(const SQueryStmtInfo*
pNode->numOfExpr = num; pNode->numOfExpr = num;
pNode->pExpr = taosArrayInit(num, POINTER_BYTES); pNode->pExpr = taosArrayInit(num, POINTER_BYTES);
taosArrayAddAll(pNode->pExpr, p); taosArrayAddAll(pNode->pExpr, p);
// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, num, NULL);
} }
} }
@ -433,6 +438,7 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level,
int32_t len = len1 + totalLen; int32_t len = len1 + totalLen;
switch(pQueryNode->info.type) { switch(pQueryNode->info.type) {
case QNODE_STREAMSCAN:
case QNODE_TABLESCAN: { case QNODE_TABLESCAN: {
SQueryTableInfo* pInfo = (SQueryTableInfo*)pQueryNode->pExtInfo; SQueryTableInfo* pInfo = (SQueryTableInfo*)pQueryNode->pExtInfo;
len1 = sprintf(buf + len, "%s #%" PRIu64, pInfo->tableName, pInfo->uid); len1 = sprintf(buf + len, "%s #%" PRIu64, pInfo->tableName, pInfo->uid);
@ -643,7 +649,6 @@ int32_t queryPlanToStringImpl(char* buf, SQueryPlanNode* pQueryNode, int32_t lev
int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str) { int32_t queryPlanToString(struct SQueryPlanNode* pQueryNode, char** str) {
assert(pQueryNode); assert(pQueryNode);
*str = calloc(1, 4096); *str = calloc(1, 4096);
int32_t len = sprintf(*str, "===== logic plan =====\n"); int32_t len = sprintf(*str, "===== logic plan =====\n");

View File

@ -290,7 +290,8 @@ static bool needMultiNodeScan(SQueryTableInfo* pTable) {
static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, SSubplan* subplan) { static SPhyNode* createSingleTableScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, SSubplan* subplan) {
vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[0]), &subplan->execNode); vgroupMsgToEpSet(&(pTable->pMeta->vgroupList->vgroups[0]), &subplan->execNode);
return createUserTableScanNode(pPlanNode, pTable, OP_TableScan); int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_TableScan:OP_StreamScan;
return createUserTableScanNode(pPlanNode, pTable, type);
} }
static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) { static SPhyNode* createTableScanNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
@ -326,6 +327,7 @@ static SPhyNode* createPhyNode(SPlanContext* pCxt, SQueryPlanNode* pPlanNode) {
case QNODE_TAGSCAN: case QNODE_TAGSCAN:
node = createTagScanNode(pPlanNode); node = createTagScanNode(pPlanNode);
break; break;
case QNODE_STREAMSCAN:
case QNODE_TABLESCAN: case QNODE_TABLESCAN:
node = createTableScanNode(pCxt, pPlanNode); node = createTableScanNode(pCxt, pPlanNode);
break; break;

View File

@ -829,6 +829,7 @@ static bool exchangeNodeFromJson(const cJSON* json, void* obj) {
static bool specificPhyNodeToJson(const void* obj, cJSON* json) { static bool specificPhyNodeToJson(const void* obj, cJSON* json) {
const SPhyNode* phyNode = (const SPhyNode*)obj; const SPhyNode* phyNode = (const SPhyNode*)obj;
switch (phyNode->info.type) { switch (phyNode->info.type) {
case OP_StreamScan:
case OP_TableScan: case OP_TableScan:
case OP_DataBlocksOptScan: case OP_DataBlocksOptScan:
case OP_TableSeqScan: case OP_TableSeqScan:

View File

@ -921,17 +921,17 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
code = qStringToSubplan(qwMsg->msg, &plan); code = qStringToSubplan(qwMsg->msg, &plan);
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("task string to subplan failed, code:%x", code); QW_TASK_ELOG("task string to subplan failed, code:%s", tstrerror(code));
QW_ERR_JRET(code); QW_ERR_JRET(code);
} }
code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle); code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle);
if (code) { if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x", code); QW_TASK_ELOG("qCreateExecTask failed, code:%s", tstrerror(code));
QW_ERR_JRET(code); QW_ERR_JRET(code);
} }
if ((pTaskInfo && NULL == sinkHandle) || (NULL == pTaskInfo && sinkHandle)) { if (NULL == sinkHandle || NULL == pTaskInfo) {
QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle); QW_TASK_ELOG("create task result error, taskHandle:%p, sinkHandle:%p", pTaskInfo, sinkHandle);
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
} }

View File

@ -650,12 +650,12 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
int32_t taskDone = 0; int32_t taskDone = 0;
int32_t code = 0; int32_t code = 0;
SCH_TASK_DLOG("taskOnFailure, code:%x", errCode); SCH_TASK_DLOG("taskOnFailure, code:%s", tstrerror(errCode));
SCH_ERR_JRET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry)); SCH_ERR_JRET(schTaskCheckAndSetRetry(pJob, pTask, errCode, &needRetry));
if (!needRetry) { if (!needRetry) {
SCH_TASK_ELOG("task failed and no more retry, code:%x", errCode); SCH_TASK_ELOG("task failed and no more retry, code:%s", tstrerror(errCode));
if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) { if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) {
code = schMoveTaskToFailList(pJob, pTask, &moved); code = schMoveTaskToFailList(pJob, pTask, &moved);

View File

@ -85,7 +85,7 @@ void createDbAndStb() {
} }
taos_free_result(pRes); taos_free_result(pRes);
sprintf(qstr, "create table if not exists %s (ts timestamp, i int) tags (j int)", stbName); sprintf(qstr, "create table if not exists %s (ts timestamp, i int) tags (j bigint)", stbName);
pRes = taos_query(con, qstr); pRes = taos_query(con, qstr);
code = taos_errno(pRes); code = taos_errno(pRes);
if (code != 0) { if (code != 0) {