Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/tsdb_refact
This commit is contained in:
commit
3c18269c29
|
@ -95,7 +95,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan,
|
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, struct SSubplan* pPlan,
|
||||||
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model);
|
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -159,13 +159,6 @@ int64_t qGetQueriedTableUid(qTaskInfo_t tinfo);
|
||||||
*/
|
*/
|
||||||
int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t tagCondLen, SArray* pTableIdList);
|
int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t tagCondLen, SArray* pTableIdList);
|
||||||
|
|
||||||
/**
|
|
||||||
* Update the table id list of a given query.
|
|
||||||
* @param uid child table uid
|
|
||||||
* @param type operation type: ADD|DROP
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type);
|
|
||||||
|
|
||||||
void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
|
void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
|
||||||
|
|
||||||
|
|
|
@ -150,12 +150,8 @@ typedef struct SqlFunctionCtx {
|
||||||
} SqlFunctionCtx;
|
} SqlFunctionCtx;
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
TEXPR_NODE_DUMMY = 0x0,
|
|
||||||
TEXPR_BINARYEXPR_NODE= 0x1,
|
TEXPR_BINARYEXPR_NODE= 0x1,
|
||||||
TEXPR_UNARYEXPR_NODE = 0x2,
|
TEXPR_UNARYEXPR_NODE = 0x2,
|
||||||
TEXPR_FUNCTION_NODE = 0x3,
|
|
||||||
TEXPR_COL_NODE = 0x4,
|
|
||||||
TEXPR_VALUE_NODE = 0x8,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct tExprNode {
|
typedef struct tExprNode {
|
||||||
|
|
|
@ -44,6 +44,7 @@ void showDB(TAOS* pConn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void fetchCallback(void* param, void* res, int32_t numOfRow) {
|
void fetchCallback(void* param, void* res, int32_t numOfRow) {
|
||||||
|
#if 0
|
||||||
printf("numOfRow = %d \n", numOfRow);
|
printf("numOfRow = %d \n", numOfRow);
|
||||||
int numFields = taos_num_fields(res);
|
int numFields = taos_num_fields(res);
|
||||||
TAOS_FIELD *fields = taos_fetch_fields(res);
|
TAOS_FIELD *fields = taos_fetch_fields(res);
|
||||||
|
@ -63,6 +64,13 @@ void fetchCallback(void* param, void* res, int32_t numOfRow) {
|
||||||
// taos_close(_taos);
|
// taos_close(_taos);
|
||||||
// taos_cleanup();
|
// taos_cleanup();
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
if (numOfRow == 0) {
|
||||||
|
printf("completed\n");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_fetch_raw_block_a(res, fetchCallback, param);
|
||||||
}
|
}
|
||||||
|
|
||||||
void queryCallback(void* param, void* res, int32_t code) {
|
void queryCallback(void* param, void* res, int32_t code) {
|
||||||
|
@ -70,7 +78,7 @@ void queryCallback(void* param, void* res, int32_t code) {
|
||||||
printf("failed to execute, reason:%s\n", taos_errstr(res));
|
printf("failed to execute, reason:%s\n", taos_errstr(res));
|
||||||
}
|
}
|
||||||
printf("start to fetch data\n");
|
printf("start to fetch data\n");
|
||||||
taos_fetch_rows_a(res, fetchCallback, param);
|
taos_fetch_raw_block_a(res, fetchCallback, param);
|
||||||
}
|
}
|
||||||
|
|
||||||
void queryCallback1(void* param, void* res, int32_t code) {
|
void queryCallback1(void* param, void* res, int32_t code) {
|
||||||
|
@ -778,9 +786,9 @@ TEST(testCase, async_api_test) {
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
ASSERT_NE(pConn, nullptr);
|
ASSERT_NE(pConn, nullptr);
|
||||||
|
|
||||||
taos_query(pConn, "use nest");
|
taos_query(pConn, "use table_alltype_hyperloglog");
|
||||||
|
#if 0
|
||||||
TAOS_RES* pRes = taos_query(pConn, "select NOW() from (select * from regular_table_2 where tbname in ('regular_table_2_1') and q_bigint <= 9223372036854775807 and q_tinyint <= 127 and q_bool in ( true , false) ) order by ts;");
|
TAOS_RES* pRes = taos_query(pConn, "insert into tu(ts) values('2022-02-27 12:12:61')");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed, reason:%s\n", taos_errstr(pRes));
|
printf("failed, reason:%s\n", taos_errstr(pRes));
|
||||||
}
|
}
|
||||||
|
@ -802,8 +810,9 @@ TEST(testCase, async_api_test) {
|
||||||
printf("%s\n", str);
|
printf("%s\n", str);
|
||||||
memset(str, 0, sizeof(str));
|
memset(str, 0, sizeof(str));
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
taos_query_a(pConn, "alter table test.m1 comment 'abcde' ", queryCallback, pConn);
|
taos_query_a(pConn, "select HYPERLOGLOG(q_ts) from stable_1_2 where ts between 1630000001000 and 1630100001000 interval(19d) Fill(NONE);", queryCallback, pConn);
|
||||||
getchar();
|
getchar();
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1226,6 +1226,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
|
||||||
SColumnInfoData colInfo = {0};
|
SColumnInfoData colInfo = {0};
|
||||||
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
|
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||||
colInfo.info = p->info;
|
colInfo.info = p->info;
|
||||||
|
colInfo.hasNull = true;
|
||||||
taosArrayPush(pBlock->pDataBlock, &colInfo);
|
taosArrayPush(pBlock->pDataBlock, &colInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -370,6 +370,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
||||||
SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
|
SMnodeObj *pObj = mndAcquireMnode(pMnode, pDnode->id);
|
||||||
if (pObj != NULL) {
|
if (pObj != NULL) {
|
||||||
if (pObj->state != statusReq.mload.syncState) {
|
if (pObj->state != statusReq.mload.syncState) {
|
||||||
|
mInfo("dnode:%d, mnode syncstate from %s to %s", pObj->id, syncStr(pObj->state), syncStr(statusReq.mload.syncState));
|
||||||
pObj->state = statusReq.mload.syncState;
|
pObj->state = statusReq.mload.syncState;
|
||||||
pObj->stateStartTime = taosGetTimestampMs();
|
pObj->stateStartTime = taosGetTimestampMs();
|
||||||
}
|
}
|
||||||
|
|
|
@ -699,6 +699,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
|
||||||
|
|
||||||
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
|
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
|
||||||
pLoad->syncState = syncGetMyRole(pMnode->syncMgmt.sync);
|
pLoad->syncState = syncGetMyRole(pMnode->syncMgmt.sync);
|
||||||
|
mTrace("mnode current syncstate is %s", syncStr(pLoad->syncState));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -667,6 +667,10 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
}
|
}
|
||||||
if (pObj->pDnode && mndIsDnodeOnline(pObj->pDnode, curMs)) {
|
if (pObj->pDnode && mndIsDnodeOnline(pObj->pDnode, curMs)) {
|
||||||
roles = syncStr(pObj->state);
|
roles = syncStr(pObj->state);
|
||||||
|
if (pObj->state == TAOS_SYNC_STATE_LEADER && pObj->id != pMnode->selfDnodeId) {
|
||||||
|
roles = syncStr(TAOS_SYNC_STATE_ERROR);
|
||||||
|
mError("mnode:%d, is leader too", pObj->id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
char b2[12 + VARSTR_HEADER_SIZE] = {0};
|
char b2[12 + VARSTR_HEADER_SIZE] = {0};
|
||||||
STR_WITH_MAXSIZE_TO_VARSTR(b2, roles, pShow->pMeta->pSchemas[cols].bytes);
|
STR_WITH_MAXSIZE_TO_VARSTR(b2, roles, pShow->pMeta->pSchemas[cols].bytes);
|
||||||
|
|
|
@ -55,7 +55,6 @@ typedef struct SResultRow {
|
||||||
uint32_t numOfRows; // number of rows of current time window
|
uint32_t numOfRows; // number of rows of current time window
|
||||||
STimeWindow win;
|
STimeWindow win;
|
||||||
struct SResultRowEntryInfo pEntryInfo[]; // For each result column, there is a resultInfo
|
struct SResultRowEntryInfo pEntryInfo[]; // For each result column, there is a resultInfo
|
||||||
// char *key; // start key of current result row
|
|
||||||
} SResultRow;
|
} SResultRow;
|
||||||
|
|
||||||
typedef struct SResultRowPosition {
|
typedef struct SResultRowPosition {
|
||||||
|
|
|
@ -189,7 +189,7 @@ typedef struct SExecTaskInfo {
|
||||||
} schemaVer;
|
} schemaVer;
|
||||||
|
|
||||||
STableListInfo tableqinfoList; // this is a table list
|
STableListInfo tableqinfoList; // this is a table list
|
||||||
char* sql; // query sql string
|
const char* sql; // query sql string
|
||||||
jmp_buf env; // jump to this position when error happens.
|
jmp_buf env; // jump to this position when error happens.
|
||||||
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
|
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
|
||||||
struct SOperatorInfo* pRoot;
|
struct SOperatorInfo* pRoot;
|
||||||
|
@ -544,6 +544,13 @@ typedef struct SFillOperatorInfo {
|
||||||
bool multigroupResult;
|
bool multigroupResult;
|
||||||
} SFillOperatorInfo;
|
} SFillOperatorInfo;
|
||||||
|
|
||||||
|
typedef struct SScalarSupp {
|
||||||
|
SExprInfo* pScalarExprInfo;
|
||||||
|
int32_t numOfScalarExpr; // the number of scalar expression in group operator
|
||||||
|
SqlFunctionCtx* pScalarFuncCtx;
|
||||||
|
int32_t* rowCellInfoOffset; // offset value for each row result cell info
|
||||||
|
} SScalarSupp;
|
||||||
|
|
||||||
typedef struct SGroupbyOperatorInfo {
|
typedef struct SGroupbyOperatorInfo {
|
||||||
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
|
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
|
||||||
SOptrBasicInfo binfo;
|
SOptrBasicInfo binfo;
|
||||||
|
@ -556,10 +563,7 @@ typedef struct SGroupbyOperatorInfo {
|
||||||
char* keyBuf; // group by keys for hash
|
char* keyBuf; // group by keys for hash
|
||||||
int32_t groupKeyLen; // total group by column width
|
int32_t groupKeyLen; // total group by column width
|
||||||
SGroupResInfo groupResInfo;
|
SGroupResInfo groupResInfo;
|
||||||
SExprInfo* pScalarExprInfo;
|
SScalarSupp scalarSup;
|
||||||
int32_t numOfScalarExpr; // the number of scalar expression in group operator
|
|
||||||
SqlFunctionCtx* pScalarFuncCtx;
|
|
||||||
int32_t* rowCellInfoOffset; // offset value for each row result cell info
|
|
||||||
} SGroupbyOperatorInfo;
|
} SGroupbyOperatorInfo;
|
||||||
|
|
||||||
typedef struct SDataGroupInfo {
|
typedef struct SDataGroupInfo {
|
||||||
|
@ -583,6 +587,7 @@ typedef struct SPartitionOperatorInfo {
|
||||||
void* pGroupIter; // group iterator
|
void* pGroupIter; // group iterator
|
||||||
int32_t pageIndex; // page index of current group
|
int32_t pageIndex; // page index of current group
|
||||||
SSDataBlock* pUpdateRes;
|
SSDataBlock* pUpdateRes;
|
||||||
|
SScalarSupp scalarSupp;
|
||||||
} SPartitionOperatorInfo;
|
} SPartitionOperatorInfo;
|
||||||
|
|
||||||
typedef struct SWindowRowsSup {
|
typedef struct SWindowRowsSup {
|
||||||
|
@ -760,6 +765,8 @@ void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t
|
||||||
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
|
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t *order, int32_t* scanFlag);
|
||||||
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
|
int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz);
|
||||||
|
|
||||||
|
SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
||||||
|
|
||||||
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
||||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
|
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
|
||||||
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
|
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
|
||||||
|
@ -839,20 +846,17 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
|
||||||
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
|
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
|
||||||
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo);
|
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SColumn* pStateKeyCol, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo);
|
||||||
SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo);
|
|
||||||
|
|
||||||
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResultBlock, const SNodeListNode* pValNode, SExecTaskInfo* pTaskInfo);
|
SSDataBlock* pResultBlock, const SNodeListNode* pValNode, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream,
|
SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap,
|
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo);
|
||||||
int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo);
|
|
||||||
|
|
||||||
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream,
|
SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
|
||||||
SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo);
|
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);
|
||||||
|
@ -890,7 +894,7 @@ int32_t decodeOperator(SOperatorInfo* ops, char* data, int32_t length);
|
||||||
|
|
||||||
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
|
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
|
||||||
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
||||||
EOPTR_EXEC_MODEL model);
|
const char* sql, EOPTR_EXEC_MODEL model);
|
||||||
int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo);
|
int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo);
|
||||||
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
|
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
|
||||||
int32_t* resNum);
|
int32_t* resNum);
|
||||||
|
|
|
@ -151,6 +151,13 @@ SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle);
|
||||||
*/
|
*/
|
||||||
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle);
|
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* get proper sort buffer pages according to the row size
|
||||||
|
* @param rowSize
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
int32_t getProperSortPageSize(size_t rowSize);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -28,17 +28,6 @@ typedef struct SCompSupporter {
|
||||||
int32_t order;
|
int32_t order;
|
||||||
} SCompSupporter;
|
} SCompSupporter;
|
||||||
|
|
||||||
int32_t getOutputInterResultBufSize(STaskAttr* pQueryAttr) {
|
|
||||||
int32_t size = 0;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
|
|
||||||
// size += pQueryAttr->pExpr1[i].base.interBytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(size >= 0);
|
|
||||||
return size;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size) {
|
int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size) {
|
||||||
pResultRowInfo->size = 0;
|
pResultRowInfo->size = 0;
|
||||||
pResultRowInfo->cur.pageId = -1;
|
pResultRowInfo->cur.pageId = -1;
|
||||||
|
|
|
@ -121,7 +121,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
qTaskInfo_t pTaskInfo = NULL;
|
qTaskInfo_t pTaskInfo = NULL;
|
||||||
code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL, OPTR_EXEC_MODEL_STREAM);
|
code = qCreateExecTask(streamReadHandle, 0, 0, plan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
// TODO: destroy SSubplan & pTaskInfo
|
// TODO: destroy SSubplan & pTaskInfo
|
||||||
terrno = code;
|
terrno = code;
|
||||||
|
|
|
@ -31,13 +31,13 @@ static void initRefPool() {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
|
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
|
||||||
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, EOPTR_EXEC_MODEL model) {
|
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) {
|
||||||
assert(readHandle != NULL && pSubplan != NULL);
|
assert(readHandle != NULL && pSubplan != NULL);
|
||||||
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
|
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
|
||||||
|
|
||||||
taosThreadOnce(&initPoolOnce, initRefPool);
|
taosThreadOnce(&initPoolOnce, initRefPool);
|
||||||
|
|
||||||
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, model);
|
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
|
@ -40,9 +40,6 @@
|
||||||
#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN)
|
#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN)
|
||||||
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
|
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
|
||||||
|
|
||||||
#define SDATA_BLOCK_INITIALIZER \
|
|
||||||
(SDataBlockInfo) { {0}, 0 }
|
|
||||||
|
|
||||||
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
|
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
|
@ -95,8 +92,6 @@ static void setBlockStatisInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlo
|
||||||
|
|
||||||
static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo);
|
static void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo);
|
||||||
|
|
||||||
static SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int32_t* numOfFilterCols);
|
|
||||||
|
|
||||||
static void releaseQueryBuf(size_t numOfTables);
|
static void releaseQueryBuf(size_t numOfTables);
|
||||||
|
|
||||||
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
|
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
|
||||||
|
@ -454,75 +449,6 @@ static bool chkWindowOutputBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo
|
||||||
return chkResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE, masterscan, groupId);
|
return chkResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE, masterscan, groupId);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doUpdateResultRowIndex(SResultRowInfo* pResultRowInfo, TSKEY lastKey, bool ascQuery,
|
|
||||||
bool timeWindowInterpo) {
|
|
||||||
int64_t skey = TSKEY_INITIAL_VAL;
|
|
||||||
#if 0
|
|
||||||
int32_t i = 0;
|
|
||||||
for (i = pResultRowInfo->size - 1; i >= 0; --i) {
|
|
||||||
SResultRow* pResult = pResultRowInfo->pResult[i];
|
|
||||||
if (pResult->closed) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// new closed result rows
|
|
||||||
if (timeWindowInterpo) {
|
|
||||||
if (pResult->endInterp &&
|
|
||||||
((pResult->win.skey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery))) {
|
|
||||||
if (i > 0) { // the first time window, the startInterp is false.
|
|
||||||
assert(pResult->startInterp);
|
|
||||||
}
|
|
||||||
|
|
||||||
closeResultRow(pResultRowInfo, i);
|
|
||||||
} else {
|
|
||||||
skey = pResult->win.skey;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if ((pResult->win.ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) {
|
|
||||||
closeResultRow(pResultRowInfo, i);
|
|
||||||
} else {
|
|
||||||
skey = pResult->win.skey;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// all result rows are closed, set the last one to be the skey
|
|
||||||
if (skey == TSKEY_INITIAL_VAL) {
|
|
||||||
if (pResultRowInfo->size == 0) {
|
|
||||||
// assert(pResultRowInfo->current == NULL);
|
|
||||||
assert(pResultRowInfo->curPos == -1);
|
|
||||||
pResultRowInfo->curPos = -1;
|
|
||||||
} else {
|
|
||||||
pResultRowInfo->curPos = pResultRowInfo->size - 1;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
for (i = pResultRowInfo->size - 1; i >= 0; --i) {
|
|
||||||
SResultRow* pResult = pResultRowInfo->pResult[i];
|
|
||||||
if (pResult->closed) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (i == pResultRowInfo->size - 1) {
|
|
||||||
pResultRowInfo->curPos = i;
|
|
||||||
} else {
|
|
||||||
pResultRowInfo->curPos = i + 1; // current not closed result object
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
//
|
|
||||||
// static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, const STimeWindow* pWin, TSKEY lastKey,
|
|
||||||
// bool ascQuery, bool interp) {
|
|
||||||
// if ((lastKey > pWin->ekey && ascQuery) || (lastKey < pWin->ekey && (!ascQuery))) {
|
|
||||||
// closeAllResultRows(pResultRowInfo);
|
|
||||||
// pResultRowInfo->curPos = pResultRowInfo->size - 1;
|
|
||||||
// } else {
|
|
||||||
// int32_t step = ascQuery ? 1 : -1;
|
|
||||||
// doUpdateResultRowIndex(pResultRowInfo, lastKey - step, ascQuery, interp);
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
|
|
||||||
// query_range_start, query_range_end, window_duration, window_start, window_end
|
// query_range_start, query_range_end, window_duration, window_start, window_end
|
||||||
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
|
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow) {
|
||||||
pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
|
pColData->info.type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
|
@ -886,7 +812,6 @@ int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char*
|
||||||
doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup);
|
doSetResultOutBufByKey(pBuf, pResultRowInfo, (char*)pData, bytes, true, groupId, pTaskInfo, false, pAggSup);
|
||||||
assert(pResultRow != NULL);
|
assert(pResultRow != NULL);
|
||||||
|
|
||||||
setResultRowKey(pResultRow, pData, type);
|
|
||||||
setResultRowInitCtx(pResultRow, pCtx, numOfCols, binfo->rowCellInfoOffset);
|
setResultRowInitCtx(pResultRow, pCtx, numOfCols, binfo->rowCellInfoOffset);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -908,21 +833,6 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_FIRST) {
|
|
||||||
// // return QUERY_IS_ASC_QUERY(pQueryAttr);
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// // denote the order type
|
|
||||||
// if ((functionId == FUNCTION_LAST_DST || functionId == FUNCTION_LAST)) {
|
|
||||||
// // return pCtx->param[0].i == pQueryAttr->order.order;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// in the reverse table scan, only the following functions need to be executed
|
|
||||||
// if (IS_REVERSE_SCAN(pRuntimeEnv) ||
|
|
||||||
// (pRuntimeEnv->scanFlag == REPEAT_SCAN && functionId != FUNCTION_STDDEV && functionId != FUNCTION_PERCT)) {
|
|
||||||
// return false;
|
|
||||||
// }
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3935,27 +3845,6 @@ void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// static STableQueryInfo* initTableQueryInfo(const STableListInfo* pTableListInfo) {
|
|
||||||
// int32_t size = taosArrayGetSize(pTableListInfo->pTableList);
|
|
||||||
// if (size == 0) {
|
|
||||||
// return NULL;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(size, sizeof(STableQueryInfo));
|
|
||||||
// if (pTableQueryInfo == NULL) {
|
|
||||||
// return NULL;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// for (int32_t j = 0; j < size; ++j) {
|
|
||||||
// STableKeyInfo* pk = taosArrayGet(pTableListInfo->pTableList, j);
|
|
||||||
// STableQueryInfo* pTQueryInfo = &pTableQueryInfo[j];
|
|
||||||
// pTQueryInfo->lastKey = pk->lastKey;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// pTableQueryInfo->lastKey = 0;
|
|
||||||
// return pTableQueryInfo;
|
|
||||||
//}
|
|
||||||
|
|
||||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
|
SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
|
||||||
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
|
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) {
|
||||||
|
@ -4525,7 +4414,6 @@ static STsdbReader doCreateDataReader(STableScanPhysiNode* pTableScanNode, SRead
|
||||||
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
||||||
|
|
||||||
static SArray* createSortInfo(SNodeList* pNodeList);
|
static SArray* createSortInfo(SNodeList* pNodeList);
|
||||||
static SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
|
||||||
|
|
||||||
int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
|
int32_t extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo* pTaskInfo) {
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
|
@ -4871,12 +4759,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
pTaskInfo);
|
pTaskInfo);
|
||||||
|
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
|
||||||
SPartitionPhysiNode* pPartNode = (SPartitionPhysiNode*)pPhyNode;
|
pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
|
||||||
SArray* pColList = extractPartitionColInfo(pPartNode->pPartitionKeys);
|
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
|
|
||||||
|
|
||||||
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &num);
|
|
||||||
pOptr = createPartitionOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pTaskInfo);
|
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) {
|
||||||
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
|
SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
|
@ -5314,7 +5197,7 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pT
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
||||||
EOPTR_EXEC_MODEL model) {
|
const char* sql, EOPTR_EXEC_MODEL model) {
|
||||||
uint64_t queryId = pPlan->id.queryId;
|
uint64_t queryId = pPlan->id.queryId;
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
@ -5324,6 +5207,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
|
||||||
goto _complete;
|
goto _complete;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
(*pTaskInfo)->sql = sql;
|
||||||
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId,
|
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId,
|
||||||
&(*pTaskInfo)->tableqinfoList, pPlan->pTagCond);
|
&(*pTaskInfo)->tableqinfoList, pPlan->pTagCond);
|
||||||
if (NULL == (*pTaskInfo)->pRoot) {
|
if (NULL == (*pTaskInfo)->pRoot) {
|
||||||
|
|
|
@ -320,8 +320,8 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
||||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, scanFlag, true);
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order, scanFlag, true);
|
||||||
|
|
||||||
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
||||||
if (pInfo->pScalarExprInfo != NULL) {
|
if (pInfo->scalarSup.pScalarExprInfo != NULL) {
|
||||||
pTaskInfo->code = projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL);
|
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSup.pScalarExprInfo, pBlock, pBlock, pInfo->scalarSup.pScalarFuncCtx, pInfo->scalarSup.numOfScalarExpr, NULL);
|
||||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||||
longjmp(pTaskInfo->env, pTaskInfo->code);
|
longjmp(pTaskInfo->env, pTaskInfo->code);
|
||||||
}
|
}
|
||||||
|
@ -385,9 +385,9 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
||||||
pInfo->pGroupCols = pGroupColList;
|
pInfo->pGroupCols = pGroupColList;
|
||||||
pInfo->pCondition = pCondition;
|
pInfo->pCondition = pCondition;
|
||||||
|
|
||||||
pInfo->pScalarExprInfo = pScalarExprInfo;
|
pInfo->scalarSup.pScalarExprInfo = pScalarExprInfo;
|
||||||
pInfo->numOfScalarExpr = numOfScalarExpr;
|
pInfo->scalarSup.numOfScalarExpr = numOfScalarExpr;
|
||||||
pInfo->pScalarFuncCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->rowCellInfoOffset);
|
pInfo->scalarSup.pScalarFuncCtx = createSqlFunctionCtx(pScalarExprInfo, numOfScalarExpr, &pInfo->scalarSup.rowCellInfoOffset);
|
||||||
|
|
||||||
int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
|
int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -624,7 +624,9 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SGroupbyOperatorInfo* pInfo = pOperator->info;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
|
SPartitionOperatorInfo* pInfo = pOperator->info;
|
||||||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
|
@ -641,6 +643,14 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
|
||||||
|
if (pInfo->scalarSupp.pScalarExprInfo != NULL) {
|
||||||
|
pTaskInfo->code = projectApplyFunctions(pInfo->scalarSupp.pScalarExprInfo, pBlock, pBlock, pInfo->scalarSupp.pScalarFuncCtx, pInfo->scalarSupp.numOfScalarExpr, NULL);
|
||||||
|
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pTaskInfo->env, pTaskInfo->code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
doHashPartition(pOperator, pBlock);
|
doHashPartition(pOperator, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -665,15 +675,26 @@ static void destroyPartitionOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
taosMemoryFree(pInfo->columnOffset);
|
taosMemoryFree(pInfo->columnOffset);
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) {
|
||||||
SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo) {
|
|
||||||
SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
|
SPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SPartitionOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pGroupCols = pGroupColList;
|
SSDataBlock* pResBlock = createResDataBlock(pPartNode->node.pOutputDataBlockDesc);
|
||||||
|
|
||||||
|
int32_t numOfCols = 0;
|
||||||
|
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);
|
||||||
|
|
||||||
|
pInfo->pGroupCols = extractPartitionColInfo(pPartNode->pPartitionKeys);
|
||||||
|
|
||||||
|
if (pPartNode->pExprs != NULL) {
|
||||||
|
pInfo->scalarSupp.numOfScalarExpr = 0;
|
||||||
|
pInfo->scalarSupp.pScalarExprInfo = createExprInfo(pPartNode->pExprs, NULL, &pInfo->scalarSupp.numOfScalarExpr);
|
||||||
|
pInfo->scalarSupp.pScalarFuncCtx = createSqlFunctionCtx(
|
||||||
|
pInfo->scalarSupp.pScalarExprInfo, pInfo->scalarSupp.numOfScalarExpr, &pInfo->scalarSupp.rowCellInfoOffset);
|
||||||
|
}
|
||||||
|
|
||||||
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
|
||||||
pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
|
pInfo->pGroupSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
|
||||||
|
@ -683,16 +704,16 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
|
|
||||||
uint32_t defaultPgsz = 0;
|
uint32_t defaultPgsz = 0;
|
||||||
uint32_t defaultBufsz = 0;
|
uint32_t defaultBufsz = 0;
|
||||||
getBufferPgSize(pResultBlock->info.rowSize, &defaultPgsz, &defaultBufsz);
|
getBufferPgSize(pResBlock->info.rowSize, &defaultPgsz, &defaultBufsz);
|
||||||
|
|
||||||
int32_t code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, TD_TMP_DIR_PATH);
|
int32_t code = createDiskbasedBuf(&pInfo->pBuf, defaultPgsz, defaultBufsz, pTaskInfo->id.str, TD_TMP_DIR_PATH);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->rowCapacity = blockDataGetCapacityInRow(pResultBlock, getBufPageSize(pInfo->pBuf));
|
pInfo->rowCapacity = blockDataGetCapacityInRow(pResBlock, getBufPageSize(pInfo->pBuf));
|
||||||
pInfo->columnOffset = setupColumnOffset(pResultBlock, pInfo->rowCapacity);
|
pInfo->columnOffset = setupColumnOffset(pResBlock, pInfo->rowCapacity);
|
||||||
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList);
|
code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pInfo->pGroupCols);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -701,7 +722,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
pOperator->blocking = true;
|
pOperator->blocking = true;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION;
|
||||||
pInfo->binfo.pRes = pResultBlock;
|
pInfo->binfo.pRes = pResBlock;
|
||||||
pOperator->numOfExprs = numOfCols;
|
pOperator->numOfExprs = numOfCols;
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->pExpr = pExprInfo;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
|
|
@ -1234,6 +1234,8 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
|
||||||
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
|
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
doFilter(pInfo->pCondition, pInfo->pRes);
|
||||||
|
#if 0
|
||||||
SFilterInfo* filter = NULL;
|
SFilterInfo* filter = NULL;
|
||||||
|
|
||||||
int32_t code = filterInitFromNode(pInfo->pCondition, &filter, 0);
|
int32_t code = filterInitFromNode(pInfo->pCondition, &filter, 0);
|
||||||
|
@ -1279,6 +1281,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) {
|
||||||
|
|
||||||
px->info.rows = numOfRow;
|
px->info.rows = numOfRow;
|
||||||
pInfo->pRes = px;
|
pInfo->pRes = px;
|
||||||
|
#endif
|
||||||
|
|
||||||
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
|
return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
|
||||||
}
|
}
|
||||||
|
@ -1457,6 +1460,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
|
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
|
||||||
doFilterResult(pInfo);
|
doFilterResult(pInfo);
|
||||||
|
|
||||||
|
blockDataDestroy(p);
|
||||||
|
|
||||||
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
pInfo->loadInfo.totalRows += pInfo->pRes->info.rows;
|
||||||
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
return (pInfo->pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||||
}
|
}
|
||||||
|
@ -1545,10 +1550,10 @@ int32_t buildSysDbTableInfo(const SSysTableScanInfo* pInfo, int32_t capacity) {
|
||||||
p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);
|
p->info.rows = buildDbTableInfoBlock(p, pSysDbTableMeta, size, TSDB_PERFORMANCE_SCHEMA_DB);
|
||||||
|
|
||||||
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
|
relocateColumnData(pInfo->pRes, pInfo->scanCols, p->pDataBlock);
|
||||||
// blockDataDestroy(p); todo handle memory leak
|
|
||||||
|
|
||||||
pInfo->pRes->info.rows = p->info.rows;
|
pInfo->pRes->info.rows = p->info.rows;
|
||||||
return p->info.rows;
|
blockDataDestroy(p);
|
||||||
|
|
||||||
|
return pInfo->pRes->info.rows;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
|
int32_t buildDbTableInfoBlock(const SSDataBlock* p, const SSysTableMeta* pSysDbTableMeta, size_t size,
|
||||||
|
|
|
@ -420,10 +420,10 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->binfo.pRes = pResBlock;
|
|
||||||
|
|
||||||
initResultSizeInfo(pOperator, 1024);
|
initResultSizeInfo(pOperator, 1024);
|
||||||
|
|
||||||
|
pInfo->binfo.pRes = pResBlock;
|
||||||
pInfo->pSortInfo = pSortInfo;
|
pInfo->pSortInfo = pSortInfo;
|
||||||
pInfo->pColMatchInfo = pColMatchColInfo;
|
pInfo->pColMatchInfo = pColMatchColInfo;
|
||||||
pInfo->pInputBlock = pInputBlock;
|
pInfo->pInputBlock = pInputBlock;
|
||||||
|
@ -432,12 +432,17 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
|
||||||
pOperator->blocking = false;
|
pOperator->blocking = false;
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2;
|
|
||||||
pInfo->sortBufSize = pInfo->bufPageSize * 16;
|
|
||||||
pInfo->hasGroupId = false;
|
pInfo->hasGroupId = false;
|
||||||
pInfo->prefetchedTuple = NULL;
|
pInfo->prefetchedTuple = NULL;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
|
pInfo->bufPageSize = getProperSortPageSize(rowSize);
|
||||||
|
|
||||||
|
uint32_t numOfSources = taosArrayGetSize(pSortInfo);
|
||||||
|
numOfSources = TMAX(2, numOfSources);
|
||||||
|
|
||||||
|
pInfo->sortBufSize = numOfSources * pInfo->bufPageSize;
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL,
|
createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL,
|
||||||
destroyMultiwaySortMergeOperatorInfo, NULL, NULL, getMultiwaySortMergeExplainExecInfo);
|
destroyMultiwaySortMergeOperatorInfo, NULL, NULL, getMultiwaySortMergeExplainExecInfo);
|
||||||
|
|
|
@ -333,11 +333,6 @@ void doTimeWindowInterpolation(SIntervalAggOperatorInfo* pInfo, int32_t numOfExp
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// if (functionId != FUNCTION_TWA && functionId != FUNCTION_INTERP) {
|
|
||||||
// pCtx[k].start.key = INT64_MIN;
|
|
||||||
// continue;
|
|
||||||
// }
|
|
||||||
|
|
||||||
SFunctParam* pParam = &pCtx[k].param[0];
|
SFunctParam* pParam = &pCtx[k].param[0];
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);
|
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, pParam->pCol->slotId);
|
||||||
|
|
||||||
|
|
|
@ -204,6 +204,11 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
|
||||||
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
|
if (pHandle->type == SORT_SINGLESOURCE_SORT) {
|
||||||
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
|
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
|
||||||
SSortSource* pSource = cmpParam->pSources[i];
|
SSortSource* pSource = cmpParam->pSources[i];
|
||||||
|
|
||||||
|
if (taosArrayGetSize(pSource->pageIdList) == 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
|
SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex);
|
||||||
|
|
||||||
void* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo));
|
void* pPage = getBufPage(pHandle->pBuf, getPageId(pPgInfo));
|
||||||
|
@ -532,6 +537,19 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t getProperSortPageSize(size_t rowSize) {
|
||||||
|
uint32_t defaultPageSize = 4096;
|
||||||
|
|
||||||
|
uint32_t pgSize = 0;
|
||||||
|
if (rowSize * 4 > defaultPageSize) {
|
||||||
|
pgSize = rowSize * 4;
|
||||||
|
} else {
|
||||||
|
pgSize = defaultPageSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pgSize;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t createInitialSources(SSortHandle* pHandle) {
|
static int32_t createInitialSources(SSortHandle* pHandle) {
|
||||||
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||||
|
|
||||||
|
@ -557,14 +575,9 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
||||||
|
|
||||||
if (!hasGroupId) {
|
if (!hasGroupId) {
|
||||||
// calculate the buffer pages according to the total available buffers.
|
// calculate the buffer pages according to the total available buffers.
|
||||||
int32_t rowSize = blockDataGetRowSize(pBlock);
|
pHandle->pageSize = getProperSortPageSize(blockDataGetRowSize(pBlock));
|
||||||
if (rowSize * 4 > 4096) {
|
|
||||||
pHandle->pageSize = rowSize * 4;
|
|
||||||
} else {
|
|
||||||
pHandle->pageSize = 4096;
|
|
||||||
}
|
|
||||||
|
|
||||||
// todo!!
|
// todo, number of pages are set according to the total available sort buffer
|
||||||
pHandle->numOfPages = 1024;
|
pHandle->numOfPages = 1024;
|
||||||
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
|
||||||
|
|
||||||
|
@ -577,7 +590,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
|
||||||
if (pHandle->beforeFp != NULL) {
|
if (pHandle->beforeFp != NULL) {
|
||||||
pHandle->beforeFp(pBlock, pHandle->param);
|
pHandle->beforeFp(pBlock, pHandle->param);
|
||||||
}
|
}
|
||||||
// todo relocate the columns
|
|
||||||
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
|
int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -945,7 +945,7 @@ TEST(testCase, build_executor_tree_Test) {
|
||||||
int32_t code = qStringToSubplan(msg, &plan);
|
int32_t code = qStringToSubplan(msg, &plan);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
code = qCreateExecTask(&handle, 2, 1, plan, (void**)&pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
|
code = qCreateExecTask(&handle, 2, 1, plan, (void**)&pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
}
|
}
|
||||||
#if 0
|
#if 0
|
||||||
|
|
|
@ -1,67 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _TD_COMMON_EXPR_H_
|
|
||||||
#define _TD_COMMON_EXPR_H_
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#include "os.h"
|
|
||||||
|
|
||||||
#include "tmsg.h"
|
|
||||||
#include "taosdef.h"
|
|
||||||
#include "tskiplist.h"
|
|
||||||
#include "function.h"
|
|
||||||
|
|
||||||
struct tExprNode;
|
|
||||||
struct SSchema;
|
|
||||||
|
|
||||||
#define QUERY_COND_REL_PREFIX_IN "IN|"
|
|
||||||
#define QUERY_COND_REL_PREFIX_LIKE "LIKE|"
|
|
||||||
#define QUERY_COND_REL_PREFIX_MATCH "MATCH|"
|
|
||||||
#define QUERY_COND_REL_PREFIX_NMATCH "NMATCH|"
|
|
||||||
|
|
||||||
#define QUERY_COND_REL_PREFIX_IN_LEN 3
|
|
||||||
#define QUERY_COND_REL_PREFIX_LIKE_LEN 5
|
|
||||||
#define QUERY_COND_REL_PREFIX_MATCH_LEN 6
|
|
||||||
#define QUERY_COND_REL_PREFIX_NMATCH_LEN 7
|
|
||||||
|
|
||||||
typedef bool (*__result_filter_fn_t)(const void *, void *);
|
|
||||||
typedef void (*__do_filter_suppl_fn_t)(void *, void *);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* this structure is used to filter data in tags, so the offset of filtered tag column in tagdata string is required
|
|
||||||
*/
|
|
||||||
typedef struct tQueryInfo {
|
|
||||||
uint8_t optr; // expression operator
|
|
||||||
SSchema sch; // schema of tags
|
|
||||||
char* q;
|
|
||||||
__compar_fn_t compare; // filter function
|
|
||||||
bool indexed; // indexed columns
|
|
||||||
} tQueryInfo;
|
|
||||||
|
|
||||||
typedef struct SExprTraverseSupp {
|
|
||||||
__result_filter_fn_t nodeFilterFn;
|
|
||||||
__do_filter_suppl_fn_t setupInfoFn;
|
|
||||||
void *pExtInfo;
|
|
||||||
} SExprTraverseSupp;
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif /*_TD_COMMON_EXPR_H_*/
|
|
|
@ -17,15 +17,7 @@
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
|
||||||
#include "texception.h"
|
#include "texception.h"
|
||||||
#include "taosdef.h"
|
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
#include "tarray.h"
|
|
||||||
#include "tbuffer.h"
|
|
||||||
#include "tcompare.h"
|
|
||||||
#include "thash.h"
|
|
||||||
#include "texpr.h"
|
|
||||||
#include "tvariant.h"
|
|
||||||
#include "tdef.h"
|
|
||||||
|
|
||||||
static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *));
|
static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *));
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,7 @@ extern "C" {
|
||||||
#include "dataSinkMgt.h"
|
#include "dataSinkMgt.h"
|
||||||
|
|
||||||
int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||||
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain);
|
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain, const char* sql);
|
||||||
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||||
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||||
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||||
|
|
|
@ -308,10 +308,8 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
|
||||||
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};
|
SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info};
|
||||||
char * sql = strndup(msg->msg, msg->sqlLen);
|
char * sql = strndup(msg->msg, msg->sqlLen);
|
||||||
QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, sql);
|
QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, sql);
|
||||||
taosMemoryFreeClear(sql);
|
|
||||||
|
|
||||||
QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain));
|
|
||||||
|
|
||||||
|
QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain, sql));
|
||||||
QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
|
QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -510,7 +510,7 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) {
|
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain, const char* sql) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
bool queryRsped = false;
|
bool queryRsped = false;
|
||||||
SSubplan *plan = NULL;
|
SSubplan *plan = NULL;
|
||||||
|
@ -537,7 +537,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
|
||||||
|
|
||||||
ctx->plan = plan;
|
ctx->plan = plan;
|
||||||
|
|
||||||
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
|
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, sql, OPTR_EXEC_MODEL_BATCH);
|
||||||
if (code) {
|
if (code) {
|
||||||
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
|
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
|
||||||
QW_ERR_JRET(code);
|
QW_ERR_JRET(code);
|
||||||
|
@ -938,7 +938,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SRpcMsg *pRsp, SDeleteRes
|
||||||
|
|
||||||
ctx.plan = plan;
|
ctx.plan = plan;
|
||||||
|
|
||||||
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, OPTR_EXEC_MODEL_BATCH);
|
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, NULL, OPTR_EXEC_MODEL_BATCH);
|
||||||
if (code) {
|
if (code) {
|
||||||
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
|
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));
|
||||||
QW_ERR_JRET(code);
|
QW_ERR_JRET(code);
|
||||||
|
|
|
@ -1488,13 +1488,15 @@ int32_t schExecJobImpl(SSchedulerReq *pReq, int64_t *job, SQueryResult* pRes, bo
|
||||||
qDebug("QID:0x%" PRIx64 " job refId 0x%"PRIx64 " started", pReq->pDag->queryId, pJob->refId);
|
qDebug("QID:0x%" PRIx64 " job refId 0x%"PRIx64 " started", pReq->pDag->queryId, pJob->refId);
|
||||||
*job = pJob->refId;
|
*job = pJob->refId;
|
||||||
|
|
||||||
|
if (!sync) {
|
||||||
|
pJob->userCb = SCH_EXEC_CB;
|
||||||
|
}
|
||||||
|
|
||||||
SCH_ERR_JRET(schLaunchJob(pJob));
|
SCH_ERR_JRET(schLaunchJob(pJob));
|
||||||
|
|
||||||
if (sync) {
|
if (sync) {
|
||||||
SCH_JOB_DLOG("will wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
|
SCH_JOB_DLOG("will wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
|
||||||
tsem_wait(&pJob->rspSem);
|
tsem_wait(&pJob->rspSem);
|
||||||
} else {
|
|
||||||
pJob->userCb = SCH_EXEC_CB;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%"PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);
|
SCH_JOB_DLOG("job exec done, job status:%s, jobId:0x%"PRIx64, SCH_GET_JOB_STATUS_STR(pJob), pJob->refId);
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
Subproject commit 3d5aa76f8c718dcffa100b45e4cbf313d499c356
|
Subproject commit 0a81480420d6601bbdb57770ee64e40f24c4ea83
|
Loading…
Reference in New Issue