diff --git a/example/src/tstream.c b/example/src/tstream.c index 8ffa932bd2..766368475e 100644 --- a/example/src/tstream.c +++ b/example/src/tstream.c @@ -78,11 +78,14 @@ int32_t create_stream() { taos_free_result(pRes); /*const char* sql = "select min(k), max(k), sum(k) from tu1";*/ - const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1"; + /*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/ /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ - pRes = tmq_create_stream(pConn, "stream1", "out1", sql); + /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ + pRes = taos_query( + pConn, + "create stream stream1 trigger window_close as select min(k), max(k), sum(k) as sum_of_k from tu1 interval(10m)"); if (taos_errno(pRes) != 0) { - printf("failed to create stream out1, reason:%s\n", taos_errstr(pRes)); + printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); diff --git a/include/client/taos.h b/include/client/taos.h index 1ac92c61a5..19c008bb09 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -86,9 +86,9 @@ typedef struct taosField { } TAOS_FIELD; #ifdef WINDOWS - #define DLL_EXPORT __declspec(dllexport) +#define DLL_EXPORT __declspec(dllexport) #else - #define DLL_EXPORT +#define DLL_EXPORT #endif typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code); @@ -123,42 +123,42 @@ DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); DLL_EXPORT setConfRet taos_set_config(const char *config); DLL_EXPORT int taos_init(void); DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port); -DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, +DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port); -DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port); -DLL_EXPORT void taos_close(TAOS *taos); +DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port); +DLL_EXPORT void taos_close(TAOS *taos); -const char *taos_data_type(int type); +const char *taos_data_type(int type); -DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); -DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); -DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_MULTI_BIND *tags); -DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name); -DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name); +DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); +DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); +DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_MULTI_BIND *tags); +DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name); +DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name); -DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert); -DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums); -DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes); -DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind); -DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind); -DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx); -DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt); -DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt); -DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert); +DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums); +DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes); +DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind); +DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind); +DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx); +DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt); +DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt); +DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt); -DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); -DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); +DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); +DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); -DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); -DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result -DLL_EXPORT void taos_free_result(TAOS_RES *res); -DLL_EXPORT int taos_field_count(TAOS_RES *res); -DLL_EXPORT int taos_num_fields(TAOS_RES *res); -DLL_EXPORT int taos_affected_rows(TAOS_RES *res); +DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); +DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result +DLL_EXPORT void taos_free_result(TAOS_RES *res); +DLL_EXPORT int taos_field_count(TAOS_RES *res); +DLL_EXPORT int taos_num_fields(TAOS_RES *res); +DLL_EXPORT int taos_affected_rows(TAOS_RES *res); DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res); DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); @@ -271,10 +271,8 @@ DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message); /* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */ #if 0 DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen); -#endif - DLL_EXPORT TAOS_RES *tmq_create_stream(TAOS *taos, const char *streamName, const char *tbName, const char *sql); - +#endif /* ------------------------------ TMQ END -------------------------------- */ #if 1 // Shuduo: temporary enable for app build typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index eadc901389..2e5574511b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -35,19 +35,6 @@ enum { STREAM_CREATED_BY__SMA, }; -#if 0 -// pipe -> fetch/pipe queue -// merge -> merge queue -// write -> write queue -enum { - TASK_DISPATCH_MSG__SND_PIPE = 1, - TASK_DISPATCH_MSG__SND_MERGE, - TASK_DISPATCH_MSG__VND_PIPE, - TASK_DISPATCH_MSG__VND_MERGE, - TASK_DISPATCH_MSG__VND_WRITE, -}; -#endif - typedef struct { int32_t nodeId; // 0 for snode SEpSet epSet; @@ -100,10 +87,6 @@ typedef struct { int8_t reserved; } STaskSinkFetch; -typedef struct { - int8_t reserved; -} STaskSinkShow; - enum { TASK_SOURCE__SCAN = 1, TASK_SOURCE__PIPE, @@ -128,7 +111,6 @@ enum { TASK_SINK__TABLE, TASK_SINK__SMA, TASK_SINK__FETCH, - TASK_SINK__SHOW, }; typedef struct { @@ -155,7 +137,6 @@ typedef struct { STaskSinkTb tbSink; STaskSinkSma smaSink; STaskSinkFetch fetchSink; - STaskSinkShow showSink; }; // dispatch diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index a093b7449b..93a20c3d45 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -667,7 +667,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { if (code != 0) goto FAIL; while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) { - tscDebug("not ready, retry"); + tscDebug("consumer not ready, retry"); taosMsleep(500); } @@ -693,6 +693,7 @@ void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { conf->commitCb = cb; } +#if 0 TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) { STscObj* pTscObj = (STscObj*)taos; SRequestObj* pRequest = NULL; @@ -777,6 +778,7 @@ _return: return pRequest; } +#endif #if 0 int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) { diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 54f411d71f..03ac74aa62 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -418,6 +418,9 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { if (tEncodeI32(pEncoder, pObj->version) < 0) return -1; if (tEncodeI8(pEncoder, pObj->status) < 0) return -1; if (tEncodeI8(pEncoder, pObj->createdBy) < 0) return -1; + if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1; + if (tEncodeI32(pEncoder, pObj->triggerParam) < 0) return -1; + if (tEncodeI64(pEncoder, pObj->waterMark) < 0) return -1; if (tEncodeI32(pEncoder, pObj->fixedSinkVgId) < 0) return -1; if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1; if (tEncodeCStr(pEncoder, pObj->sql) < 0) return -1; @@ -464,6 +467,9 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->createdBy) < 0) return -1; + if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1; + if (tDecodeI32(pDecoder, &pObj->triggerParam) < 0) return -1; + if (tDecodeI64(pDecoder, &pObj->waterMark) < 0) return -1; if (tDecodeI32(pDecoder, &pObj->fixedSinkVgId) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pObj->sql) < 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index db5c725f5e..5c6e2ce771 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -308,6 +308,8 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe streamObj.smaId = 0; /*streamObj.physicalPlan = "";*/ streamObj.logicalPlan = "not implemented"; + streamObj.trigger = pCreate->triggerType; + streamObj.waterMark = pCreate->watermark; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg); if (pTrans == NULL) { @@ -431,7 +433,7 @@ static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *p SStreamObj *pStream = NULL; while (numOfRows < rows) { - pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pStream); + pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream); if (pShow->pIter == NULL) break; SColumnInfoData *pColInfo; @@ -471,8 +473,13 @@ static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *p pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pStream->trigger, false); + + numOfRows++; + sdbRelease(pSdb, pStream); } - return 0; + + pShow->numOfRows += numOfRows; + return numOfRows; } static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) { diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 46a39e72f9..5df7b97f2a 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -159,7 +159,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo // decode pBuf = pVal; - pSW = taosMemoryMalloc(sizeof(pSW)); + pSW = taosMemoryMalloc(sizeof(SSchemaWrapper)); tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_DECODER); tDecodeSSchemaWrapper(&coder, pSW); @@ -436,4 +436,4 @@ void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode) { return NULL; } -#endif \ No newline at end of file +#endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7cee82f660..a39d959b36 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -864,7 +864,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { } int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { - SStreamTask* pTask = taosMemoryMalloc(sizeof(SStreamTask)); + SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { return -1; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 943a016c27..202a77900a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -227,8 +227,8 @@ int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) { } SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn, - __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode, - __optr_decode_fn_t decode, __optr_get_explain_fn_t explain) { + __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode, + __optr_decode_fn_t decode, __optr_get_explain_fn_t explain) { SOperatorFpSet fpSet = { ._openFn = openFn, .getNextFn = nextFn, @@ -441,8 +441,8 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, * +----------+---------------+ */ static SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, int64_t uid, - char* pData, int16_t bytes, bool masterscan, uint64_t groupId, - SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup) { + char* pData, int16_t bytes, bool masterscan, uint64_t groupId, + SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup) { SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId); SResultRowPosition* p1 = @@ -463,9 +463,9 @@ static SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowI } } - // 1. close current opened time window + // 1. close current opened time window if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId && - pResult->offset != pResultRowInfo->cur.offset))) { + pResult->offset != pResultRowInfo->cur.offset))) { // todo extract function SResultRowPosition pos = pResultRowInfo->cur; SFilePage* pPage = getBufPage(pResultBuf, pos.pageId); @@ -482,7 +482,8 @@ static SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowI // add a new result set for a new group SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; - taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, sizeof(SResultRowPosition)); + taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, + sizeof(SResultRowPosition)); } // 2. set the new time window to be the new active time window @@ -646,7 +647,7 @@ static int32_t setResultOutputBufByKey_rv(SResultRowInfo* pResultRowInfo, int64_ SExecTaskInfo* pTaskInfo) { assert(win->skey <= win->ekey); SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, id, (char*)&win->skey, - TSDB_KEYSIZE, masterscan, tableGroupId, pTaskInfo, true, pAggSup); + TSDB_KEYSIZE, masterscan, tableGroupId, pTaskInfo, true, pAggSup); if (pResultRow == NULL) { *pResult = NULL; @@ -1034,8 +1035,8 @@ void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlo } } -static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, - int32_t paramIndex, int32_t numOfRows) { +static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex, + int32_t numOfRows) { SColumnInfoData* pColInfo = NULL; if (pInput->pData[paramIndex] == NULL) { pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData)); @@ -1066,9 +1067,9 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc colDataAppendDouble(pColInfo, i, &v); } } else if (type == TSDB_DATA_TYPE_VARCHAR) { - char *tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE); + char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE); STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen); - for(int32_t i = 0; i < numOfRows; ++i) { + for (int32_t i = 0; i < numOfRows; ++i) { colDataAppend(pColInfo, i, tmp, false); } } @@ -1081,9 +1082,9 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt int32_t code = TSDB_CODE_SUCCESS; for (int32_t i = 0; i < pOperator->numOfOutput; ++i) { - pCtx[i].order = order; - pCtx[i].size = pBlock->info.rows; - pCtx[i].pSrcBlock = pBlock; + pCtx[i].order = order; + pCtx[i].size = pBlock->info.rows; + pCtx[i].pSrcBlock = pBlock; pCtx[i].currentStage = MAIN_SCAN; SInputColumnInfoData* pInput = &pCtx[i].input; @@ -1421,7 +1422,7 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc if (!done) { // it is not interpolated, now start to generated the interpolated value int32_t startRowIndex = startPos; bool interp = setTimeWindowInterpolationStartTs(pOperatorInfo, pCtx, startRowIndex, pBlock->info.rows, - pBlock->pDataBlock, tsCols, win); + pBlock->pDataBlock, tsCols, win); if (interp) { setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); } @@ -1482,8 +1483,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe SResultRow* pResult = NULL; int32_t ret = setResultOutputBufByKey_rv(pResultRowInfo, pSDataBlock->info.uid, &win, masterScan, &pResult, - tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, - &pInfo->aggSup, pTaskInfo); + tableGroupId, pInfo->binfo.pCtx, numOfOutput, pInfo->binfo.rowCellInfoOffset, + &pInfo->aggSup, pTaskInfo); if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -1491,8 +1492,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); pos->groupId = tableGroupId; - pos->pos = (SResultRowPosition) {.pageId = pResult->pageId, .offset = pResult->offset}; - *(int64_t*) pos->key = pResult->win.skey; + pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; + *(int64_t*)pos->key = pResult->win.skey; taosArrayPush(pUpdated, &pos); } @@ -1569,8 +1570,8 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t)); pos->groupId = tableGroupId; - pos->pos = (SResultRowPosition) {.pageId = pResult->pageId, .offset = pResult->offset}; - *(int64_t*) pos->key = pResult->win.skey; + pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; + *(int64_t*)pos->key = pResult->win.skey; taosArrayPush(pUpdated, &pos); } @@ -1706,7 +1707,7 @@ int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* SqlFunctionCtx* pCtx = binfo->pCtx; SResultRow* pResultRow = doSetResultOutBufByKey(pBuf, pResultRowInfo, groupId, (char*)pData, bytes, true, groupId, - pTaskInfo, false, pAggSup); + pTaskInfo, false, pAggSup); assert(pResultRow != NULL); setResultRowKey(pResultRow, pData, type); @@ -1890,7 +1891,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, pCtx->functionId = -1; pCtx->curBufPage = -1; - pCtx->pExpr = pExpr; + pCtx->pExpr = pExpr; if (pExpr->pExpr->nodeType == QUERY_NODE_FUNCTION) { SFuncExecEnv env = {0}; @@ -1926,9 +1927,9 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, pCtx->pTsOutput = NULL; pCtx->resDataInfo.bytes = pFunct->resSchema.bytes; pCtx->resDataInfo.type = pFunct->resSchema.type; - pCtx->order = TSDB_ORDER_ASC; + pCtx->order = TSDB_ORDER_ASC; pCtx->start.key = INT64_MIN; - pCtx->end.key = INT64_MIN; + pCtx->end.key = INT64_MIN; pCtx->numOfParams = pExpr->base.numOfParams; pCtx->param = pFunct->pParam; @@ -2719,7 +2720,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t int64_t tid = 0; int64_t groupId = 0; SResultRow* pRow = doSetResultOutBufByKey(pSup->pResultBuf, pResultRowInfo, tid, (char*)&tid, sizeof(tid), true, - groupId, pTaskInfo, false, pSup); + groupId, pTaskInfo, false, pSup); for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { struct SResultRowEntryInfo* pEntry = getResultCell(pRow, i, rowCellInfoOffset); @@ -2866,24 +2867,24 @@ void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbased size_t num = taosArrayGetSize(pUpdateList); for (int32_t i = 0; i < num; ++i) { - SResKeyPos * pPos = taosArrayGetP(pUpdateList, i); + SResKeyPos* pPos = taosArrayGetP(pUpdateList, i); SFilePage* bufPage = getBufPage(pBuf, pPos->pos.pageId); SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->pos.offset); -// + // for (int32_t j = 0; j < numOfOutput; ++j) { pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset); -// + // struct SResultRowEntryInfo* pResInfo = pCtx[j].resultInfo; -// if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) { -// continue; -// } -// -// if (pCtx[j].fpSet.process) { // TODO set the dummy function. -//// pCtx[j].fpSet.finalize(&pCtx[j]); -// pResInfo->initialized = true; -// } -// + // if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) { + // continue; + // } + // + // if (pCtx[j].fpSet.process) { // TODO set the dummy function. + //// pCtx[j].fpSet.finalize(&pCtx[j]); + // pResInfo->initialized = true; + // } + // if (pRow->numOfRows < pResInfo->numOfRes) { pRow->numOfRows = pResInfo->numOfRes; } @@ -3005,9 +3006,8 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, u SqlFunctionCtx* pCtx = pAggInfo->binfo.pCtx; int32_t* rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset; - SResultRow* pResultRow = - doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, uid, (char*)&groupId, sizeof(groupId), - true, groupId, pTaskInfo, false, &pAggInfo->aggSup); + SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, uid, (char*)&groupId, + sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup); assert(pResultRow != NULL); /* @@ -3105,8 +3105,8 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbased } for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) { - SResKeyPos *pPos = taosArrayGetP(pGroupResInfo->pRows, i); - SFilePage* page = getBufPage(pBuf, pPos->pos.pageId); + SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i); + SFilePage* page = getBufPage(pBuf, pPos->pos.pageId); SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset); if (pRow->numOfRows == 0) { @@ -3751,7 +3751,7 @@ static void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, ASSERT(numOfSrcCols >= pBlock->info.numOfCols); int32_t i = 0, j = 0; - while(i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) { + while (i < numOfSrcCols && j < taosArrayGetSize(pColMatchInfo)) { SColumnInfoData* p = taosArrayGet(pCols, i); SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, j); if (!pmInfo->output) { @@ -3777,10 +3777,10 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI blockDataEnsureCapacity(pRes, numOfRows); if (pColList == NULL) { // data from other sources - int32_t dataLen = *(int32_t*) pData; + int32_t dataLen = *(int32_t*)pData; pData += sizeof(int32_t); - pRes->info.groupId = *(uint64_t*) pData; + pRes->info.groupId = *(uint64_t*)pData; pData += sizeof(uint64_t); int32_t* colLen = (int32_t*)pData; @@ -3810,8 +3810,8 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI memcpy(pColInfoData->pData, pStart, colLen[i]); } - //TODO setting this flag to true temporarily so aggregate function on stable will - //examine NULL value for non-primary key column + // TODO setting this flag to true temporarily so aggregate function on stable will + // examine NULL value for non-primary key column pColInfoData->hasNull = true; pStart += colLen[i]; } @@ -3847,11 +3847,11 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI blockDataEnsureCapacity(&block, numOfRows); - int32_t dataLen = *(int32_t*) pStart; - uint64_t groupId = *(uint64_t*) (pStart + sizeof(int32_t)); + int32_t dataLen = *(int32_t*)pStart; + uint64_t groupId = *(uint64_t*)(pStart + sizeof(int32_t)); pStart += sizeof(int32_t) + sizeof(uint64_t); - int32_t* colLen = (int32_t*) (pStart); + int32_t* colLen = (int32_t*)(pStart); pStart += sizeof(int32_t) * numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { @@ -3877,7 +3877,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI } // data from mnode - relocateColumnData(pRes, pColList, block.pDataBlock); + relocateColumnData(pRes, pColList, block.pDataBlock); } pRes->info.rows = numOfRows; @@ -4226,8 +4226,8 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock pOperator->numOfOutput = pBlock->info.numOfCols; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo, - NULL, NULL, NULL); + pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, + destroyExchangeOperatorInfo, NULL, NULL, NULL); #if 1 { // todo refactor @@ -4716,8 +4716,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo, - NULL, NULL, NULL); + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, NULL); int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -4933,8 +4933,8 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi initResultRow(resultRow); prepareResultListBuffer(&pInfo->resultRowInfo, pOperator->pTaskInfo->env); // pInfo->resultRowInfo.cur = pInfo->resultRowInfo.size; -// pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = -// (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset}; + // pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = + // (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset}; pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset}; } @@ -4946,13 +4946,13 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi enum { PROJECT_RETRIEVE_CONTINUE = 0x1, - PROJECT_RETRIEVE_DONE = 0x2, + PROJECT_RETRIEVE_DONE = 0x2, }; static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock) { SProjectOperatorInfo* pProjectInfo = pOperator->info; SOptrBasicInfo* pInfo = &pProjectInfo->binfo; - SSDataBlock* pRes = pInfo->pRes; + SSDataBlock* pRes = pInfo->pRes; if (pProjectInfo->curSOffset > 0) { if (pProjectInfo->groupId == 0) { // it is the first group @@ -5015,9 +5015,10 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock) // todo optimize performance // If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the // they may not belong to the same group the limit/offset value is not valid in this case. - if (pRes->info.rows >= pOperator->resultInfo.threshold || pProjectInfo->slimit.offset != -1 || pProjectInfo->slimit.limit != -1) { + if (pRes->info.rows >= pOperator->resultInfo.threshold || pProjectInfo->slimit.offset != -1 || + pProjectInfo->slimit.limit != -1) { return PROJECT_RETRIEVE_DONE; - } else { // not full enough, continue to accumulate the output data in the buffer. + } else { // not full enough, continue to accumulate the output data in the buffer. return PROJECT_RETRIEVE_CONTINUE; } } @@ -5101,7 +5102,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) int32_t status = handleLimitOffset(pOperator, pBlock); if (status == PROJECT_RETRIEVE_CONTINUE) { continue; - } else if (status == PROJECT_RETRIEVE_DONE) { + } else if (status == PROJECT_RETRIEVE_DONE) { break; } } @@ -5291,7 +5292,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) { setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); // finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput); -// initGroupedResultInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo); + // initGroupedResultInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo); // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes); if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) { @@ -5342,7 +5343,7 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); -// initGroupedResultInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); + // initGroupedResultInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); OPTR_SET_OPENED(pOperator); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); @@ -5684,8 +5685,8 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n pAggSup->resultRowSize = getResultRowSize(pCtx, numOfOutput); pAggSup->keyBuf = taosMemoryCalloc(1, keyBufSize + POINTER_BYTES + sizeof(int64_t)); pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK); -// pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); -// pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell)); + // pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); + // pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell)); if (pAggSup->keyBuf == NULL /*|| pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL*/ || pAggSup->pResultRowHashTable == NULL) { @@ -5703,8 +5704,8 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx* pCtx, int32_t n static void cleanupAggSup(SAggSupporter* pAggSup) { taosMemoryFreeClear(pAggSup->keyBuf); taosHashCleanup(pAggSup->pResultRowHashTable); -// taosHashCleanup(pAggSup->pResultRowListSet); -// taosArrayDestroy(pAggSup->pResultRowArrayList); + // taosHashCleanup(pAggSup->pResultRowListSet); + // taosArrayDestroy(pAggSup->pResultRowArrayList); destroyDiskbasedBuf(pAggSup->pResultBuf); } @@ -5715,7 +5716,7 @@ int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInf doInitAggInfoSup(pAggSup, pBasicInfo->pCtx, numOfCols, keyBufSize, pkey); - for(int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < numOfCols; ++i) { pBasicInfo->pCtx[i].pBuf = pAggSup->pResultBuf; } @@ -5732,6 +5733,10 @@ void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) { } static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInfo) { + if (pTableGroupInfo->numOfTables == 0) { + return NULL; + } + STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); if (pTableQueryInfo == NULL) { return NULL; @@ -5757,7 +5762,8 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, - int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { + int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, + const STableGroupInfo* pTableGroupInfo) { SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5771,7 +5777,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, keyBufSize, pTaskInfo->id.str); pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); - if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) { + if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -5926,8 +5932,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p pOperator->pExpr = pExprInfo; pOperator->numOfOutput = num; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL, destroyProjectOperatorInfo, - NULL, NULL, NULL); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL, + destroyProjectOperatorInfo, NULL, NULL, NULL); pOperator->pTaskInfo = pTaskInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); @@ -5952,12 +5958,12 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* goto _error; } - pInfo->order = TSDB_ORDER_ASC; - pInfo->interval = *pInterval; -// pInfo->execModel = OPTR_EXEC_MODEL_STREAM; - pInfo->execModel = pTaskInfo->execModel; - pInfo->win = pTaskInfo->window; - pInfo->twAggSup = *pTwAggSupp; + pInfo->order = TSDB_ORDER_ASC; + pInfo->interval = *pInterval; + // pInfo->execModel = OPTR_EXEC_MODEL_STREAM; + pInfo->execModel = pTaskInfo->execModel; + pInfo->win = pTaskInfo->window; + pInfo->twAggSup = *pTwAggSupp; pInfo->primaryTsIndex = primaryTsSlotId; int32_t numOfRows = 4096; @@ -5984,8 +5990,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; - pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL, destroyIntervalOperatorInfo, - aggEncodeResultRow, aggDecodeResultRow, NULL); + pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL, + destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -6004,18 +6010,19 @@ _error: SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { + STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo, + SExecTaskInfo* pTaskInfo) { STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->order = TSDB_ORDER_ASC; - pInfo->interval = *pInterval; - pInfo->execModel = OPTR_EXEC_MODEL_STREAM; - pInfo->win = pTaskInfo->window; - pInfo->twAggSup = *pTwAggSupp; + pInfo->order = TSDB_ORDER_ASC; + pInfo->interval = *pInterval; + pInfo->execModel = OPTR_EXEC_MODEL_STREAM; + pInfo->win = pTaskInfo->window; + pInfo->twAggSup = *pTwAggSupp; pInfo->primaryTsIndex = primaryTsSlotId; int32_t numOfRows = 4096; @@ -6042,8 +6049,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; - pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doStreamIntervalAgg, doStreamIntervalAgg, NULL, destroyIntervalOperatorInfo, - aggEncodeResultRow, aggDecodeResultRow, NULL); + pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doStreamIntervalAgg, doStreamIntervalAgg, NULL, + destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -6052,13 +6059,12 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr return pOperator; - _error: +_error: destroyIntervalOperatorInfo(pInfo, numOfCols); taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; - } SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, @@ -6122,8 +6128,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf pOperator->pTaskInfo = pTaskInfo; pOperator->info = pInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL, destroyStateWindowOperatorInfo, - aggEncodeResultRow, aggDecodeResultRow, NULL); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL, + destroyStateWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -6168,8 +6174,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, destroySWindowOperatorInfo, - aggEncodeResultRow, aggDecodeResultRow, NULL); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, + destroySWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL); pOperator->pTaskInfo = pTaskInfo; code = appendDownstream(pOperator, &downstream, 1); @@ -6257,8 +6263,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, - NULL, NULL, NULL); + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL); pOperator->pTaskInfo = pTaskInfo; code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -6269,7 +6275,6 @@ _error: return NULL; } - static int32_t getColumnIndexInSource(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr, SColumnInfo* pTagCols) { int32_t j = 0; @@ -6467,11 +6472,11 @@ static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableSc static SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) { SInterval interval = { - .interval = pTableScanNode->interval, - .sliding = pTableScanNode->sliding, + .interval = pTableScanNode->interval, + .sliding = pTableScanNode->sliding, .intervalUnit = pTableScanNode->intervalUnit, - .slidingUnit = pTableScanNode->slidingUnit, - .offset = pTableScanNode->offset, + .slidingUnit = pTableScanNode->slidingUnit, + .offset = pTableScanNode->offset, }; return interval; @@ -6486,7 +6491,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; - int32_t numOfCols = 0; + int32_t numOfCols = 0; tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); if (pDataReader == NULL && terrno != 0) { return NULL; @@ -6497,14 +6502,15 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc); SQueryTableDataCond cond = {0}; - int32_t code = initQueryTableDataCond(&cond, pTableScanNode); + int32_t code = initQueryTableDataCond(&cond, pTableScanNode); if (code != TSDB_CODE_SUCCESS) { return NULL; } SInterval interval = extractIntervalInfo(pTableScanNode); - return createTableScanOperatorInfo(pDataReader, &cond, numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq, pColList, - pResBlock, pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo); + return createTableScanOperatorInfo(pDataReader, &cond, numOfCols, pTableScanNode->dataRequired, + pTableScanNode->scanSeq, pColList, pResBlock, pScanPhyNode->node.pConditions, + &interval, pTableScanNode->ratio, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc); @@ -6512,27 +6518,30 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. - int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId); + int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, + queryId, taskId); SArray* tableIdList = extractTableIdList(pTableGroupInfo); SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc); int32_t numOfCols = 0; SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); - SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pCols, tableIdList, pTaskInfo); + SOperatorInfo* pOperator = + createStreamScanOperatorInfo(pHandle->reader, pResBlock, pCols, tableIdList, pTaskInfo); taosArrayDestroy(tableIdList); return pOperator; } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; - SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan; + SScanPhysiNode* pScanNode = &pSysScanPhyNode->scan; SSDataBlock* pResBlock = createResDataBlock(pScanNode->node.pOutputDataBlockDesc); int32_t numOfOutputCols = 0; - SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pScanNode->node.pOutputDataBlockDesc, &numOfOutputCols); + SArray* colList = + extractColMatchInfo(pScanNode->pScanCols, pScanNode->node.pOutputDataBlockDesc, &numOfOutputCols); SOperatorInfo* pOperator = createSysTableScanOperatorInfo( - pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, - colList, pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId); + pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList, + pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId); return pOperator; } else { ASSERT(0); @@ -6649,8 +6658,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return pOptr; } - -static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) { +static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) { pCond->loadExternalRows = false; pCond->order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; @@ -6664,7 +6672,7 @@ static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableS pCond->twindow = pTableScanNode->scanRange; #if 1 - //todo work around a problem, remove it later + // todo work around a problem, remove it later if ((pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey > pCond->twindow.ekey) || (pCond->order == TSDB_ORDER_DESC && pCond->twindow.skey < pCond->twindow.ekey)) { TSWAP(pCond->twindow.skey, pCond->twindow.ekey); @@ -6708,22 +6716,22 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { // todo extract method SColumn c = {0}; - c.slotId = pColNode->slotId; - c.colId = pColNode->colId; - c.type = pColNode->node.resType.type; - c.bytes = pColNode->node.resType.bytes; - c.scale = pColNode->node.resType.scale; + c.slotId = pColNode->slotId; + c.colId = pColNode->colId; + c.type = pColNode->node.resType.type; + c.bytes = pColNode->node.resType.bytes; + c.scale = pColNode->node.resType.scale; c.precision = pColNode->node.resType.precision; taosArrayPush(pList, &c); } else if (nodeType(pNode->pExpr) == QUERY_NODE_VALUE) { - SValueNode* pValNode = (SValueNode*) pNode->pExpr; - SColumn c = {0}; + SValueNode* pValNode = (SValueNode*)pNode->pExpr; + SColumn c = {0}; c.slotId = pNode->slotId; - c.colId = pNode->slotId; - c.type = pValNode->node.type; - c.bytes = pValNode->node.resType.bytes; - c.scale = pValNode->node.resType.scale; + c.colId = pNode->slotId; + c.type = pValNode->node.type; + c.bytes = pValNode->node.resType.bytes; + c.scale = pValNode->node.resType.scale; c.precision = pValNode->node.resType.precision; taosArrayPush(pList, &c); @@ -6925,7 +6933,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead goto _complete; } - (*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoGroupInfo); + (*pTaskInfo)->pRoot = + createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoGroupInfo); if (NULL == (*pTaskInfo)->pRoot) { code = terrno; goto _complete; @@ -7263,8 +7272,8 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyBasicOperatorInfo, - NULL, NULL, NULL); + pOperator->fpSet = + createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyBasicOperatorInfo, NULL, NULL, NULL); int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream); return pOperator;