From 476ca4b9025cbca80fe6a176124c8c8603feb75c Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 7 Apr 2022 10:58:09 +0800 Subject: [PATCH 1/6] delete created shms --- source/os/src/osShm.c | 27 +++++++++++++++++++++++---- tests/script/jenkins/basic.txt | 6 +++--- 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/source/os/src/osShm.c b/source/os/src/osShm.c index 35aec30f39..3b75e33d7c 100644 --- a/source/os/src/osShm.c +++ b/source/os/src/osShm.c @@ -17,14 +17,27 @@ #define _DEFAULT_SOURCE #include "os.h" +#define MAX_SHMIDS 6 + +static int32_t shmids[MAX_SHMIDS] = {0}; + +static void taosDeleteCreatedShms() { + for (int32_t i = 0; i < MAX_SHMIDS; ++i) { + int32_t shmid = shmids[i] - 1; + if (shmid >= 0) { + shmctl(shmid, IPC_RMID, NULL); + } + } +} + int32_t taosCreateShm(SShm* pShm, int32_t key, int32_t shmsize) { pShm->id = -1; - // key_t shkey = IPC_PRIVATE; - // int32_t __shmflag = IPC_CREAT | IPC_EXCL | 0600; + key_t __shkey = IPC_PRIVATE; + int32_t __shmflag = IPC_CREAT | IPC_EXCL | 0600; - key_t __shkey = 0X95270000 + key; - int32_t __shmflag = IPC_CREAT | 0600; + // key_t __shkey = 0X95270000 + key; + // int32_t __shmflag = IPC_CREAT | 0600; int32_t shmid = shmget(__shkey, shmsize, __shmflag); if (shmid < 0) { @@ -39,6 +52,12 @@ int32_t taosCreateShm(SShm* pShm, int32_t key, int32_t shmsize) { pShm->id = shmid; pShm->size = shmsize; pShm->ptr = shmptr; + + if (key >= 0 && key < MAX_SHMIDS) { + shmids[key] = pShm->id + 1; + } + atexit(taosDeleteCreatedShms); + return 0; } diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 4e04543fd8..5a3ee003f0 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -55,8 +55,8 @@ # --- for multi process mode -# ./test.sh -f tsim/user/basic1.sim -m -# ./test.sh -f tsim/stable/vnode3.sim -m -# ./test.sh -f tsim/tmq/basic.sim -m +./test.sh -f tsim/user/basic1.sim -m +./test.sh -f tsim/stable/vnode3.sim -m +./test.sh -f tsim/tmq/basic.sim -m #======================b1-end=============== From 6318cc7a1a95881141e472e2bcdd1e558c292a22 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 7 Apr 2022 13:53:19 +0800 Subject: [PATCH 2/6] remove shm --- source/os/src/osShm.c | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/source/os/src/osShm.c b/source/os/src/osShm.c index 3b75e33d7c..b276b48d0e 100644 --- a/source/os/src/osShm.c +++ b/source/os/src/osShm.c @@ -33,11 +33,13 @@ static void taosDeleteCreatedShms() { int32_t taosCreateShm(SShm* pShm, int32_t key, int32_t shmsize) { pShm->id = -1; +#if 1 key_t __shkey = IPC_PRIVATE; int32_t __shmflag = IPC_CREAT | IPC_EXCL | 0600; - - // key_t __shkey = 0X95270000 + key; - // int32_t __shmflag = IPC_CREAT | 0600; +#else + key_t __shkey = 0X95270000 + key; + int32_t __shmflag = IPC_CREAT | 0600; +#endif int32_t shmid = shmget(__shkey, shmsize, __shmflag); if (shmid < 0) { @@ -53,10 +55,14 @@ int32_t taosCreateShm(SShm* pShm, int32_t key, int32_t shmsize) { pShm->size = shmsize; pShm->ptr = shmptr; +#if 0 if (key >= 0 && key < MAX_SHMIDS) { shmids[key] = pShm->id + 1; } atexit(taosDeleteCreatedShms); +#else + shmctl(pShm->id, IPC_RMID, NULL); +#endif return 0; } From f4047aa120922d723e85e3764b51cdd455e37da9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 7 Apr 2022 13:57:27 +0800 Subject: [PATCH 3/6] [td-14493] add a new api. --- include/client/taos.h | 55 +++++++++++++++++----------------- source/client/src/clientMain.c | 23 ++++++++++++-- 2 files changed, 49 insertions(+), 29 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 0fd2fd8df9..d537943b82 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -152,34 +152,34 @@ DLL_EXPORT void taos_close(TAOS *taos); const char *taos_data_type(int type); -DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); -DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); -DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags); -DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name); -DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name); +DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); +DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); +DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags); +DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name); +DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name); -DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert); -DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums); -DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes); -DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind); -DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind); -DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx); -DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt); -DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt); -DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert); +DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums); +DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes); +DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind); +DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind); +DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx); +DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt); +DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt); +DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt); -DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); -DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); +DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); +DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); -DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); -DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result -DLL_EXPORT void taos_free_result(TAOS_RES *res); -DLL_EXPORT int taos_field_count(TAOS_RES *res); -DLL_EXPORT int taos_num_fields(TAOS_RES *res); -DLL_EXPORT int taos_affected_rows(TAOS_RES *res); +DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); +DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result +DLL_EXPORT void taos_free_result(TAOS_RES *res); +DLL_EXPORT int taos_field_count(TAOS_RES *res); +DLL_EXPORT int taos_num_fields(TAOS_RES *res); +DLL_EXPORT int taos_affected_rows(TAOS_RES *res); DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res); DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); @@ -188,11 +188,12 @@ DLL_EXPORT void taos_stop_query(TAOS_RES *res); DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col); DLL_EXPORT bool taos_is_update_query(TAOS_RES *res); DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows); +DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex); DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql); DLL_EXPORT void taos_reset_current_db(TAOS *taos); -DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res); -DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res); +DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res); +DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res); DLL_EXPORT const char *taos_get_server_info(TAOS *taos); DLL_EXPORT const char *taos_get_client_info(); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index b2c32a4366..6fe41176da 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -385,11 +385,11 @@ bool taos_is_update_query(TAOS_RES *res) { } int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { - if (res == NULL) { + SRequestObj *pRequest = (SRequestObj *)res; + if (pRequest == NULL) { return 0; } - SRequestObj *pRequest = (SRequestObj *)res; if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT || pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) { return 0; @@ -405,6 +405,25 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { return pResultInfo->numOfRows; } +int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { + SRequestObj *pRequest = (SRequestObj *)res; + if (pRequest == NULL) { + return 0; + } + + int32_t numOfFields = taos_num_fields(pRequest); + if (columnIndex < 0 || columnIndex >= numOfFields || numOfFields == 0) { + return 0; + } + + TAOS_FIELD* pField = &pRequest->body.resInfo.userFields[columnIndex]; + if (!IS_VAR_DATA_TYPE(pField->type)) { + return 0; + } + + return pRequest->body.resInfo.pCol[columnIndex].offset; +} + int taos_validate_sql(TAOS *taos, const char *sql) { return true; } void taos_reset_current_db(TAOS *taos) { From c84afedce0622eb667ba7e6b9d0a3e98591f2915 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 7 Apr 2022 13:57:47 +0800 Subject: [PATCH 4/6] [td-14493] refactor. --- source/libs/executor/inc/executorimpl.h | 23 +-- source/libs/executor/src/executorimpl.c | 3 +- source/libs/executor/src/groupoperator.c | 177 ++++------------------- 3 files changed, 32 insertions(+), 171 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index aa018aa84f..ac8cd82213 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -625,24 +625,6 @@ typedef struct SSortOperatorInfo { uint64_t totalElapsed; // total elapsed time } SSortOperatorInfo; -typedef struct SDistinctDataInfo { - int32_t index; - int32_t type; - int32_t bytes; -} SDistinctDataInfo; - -typedef struct SDistinctOperatorInfo { - SHashObj* pSet; - SSDataBlock* pRes; - bool recordNullVal; // has already record the null value, no need to try again -// int64_t threshold; // todo remove it -// int64_t outputCapacity;// todo remove it -// int32_t totalBytes; // todo remove it - SResultInfo resInfo; - char* buf; - SArray* pDistinctDataInfo; -} SDistinctOperatorInfo; - int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); void operatorDummyCloseFn(void* param, int32_t numOfCols); int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); @@ -682,8 +664,10 @@ SOperatorInfo* createStreamScanOperatorInfo(void* streamReadHandle, SSDataBlock* SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createDistinctOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, + SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); +#if 0 SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); @@ -705,6 +689,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput); +#endif void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 38fa1f0548..c5dca0c77d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1610,7 +1610,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe // window start(end) key interpolation doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, pInfo->order, false); - updateTimeWindowInfo(&pInfo->timeWindowData, &win, true); + updateTimeWindowInfo(&pInfo->timeWindowData, &nextWin, true); doApplyFunctions(pInfo->binfo.pCtx, &nextWin, &pInfo->timeWindowData, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } @@ -3276,6 +3276,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock) { SFilterInfo* filter = NULL; + // todo move to the initialization function int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); SFilterColumnParam param1 = {.numOfCols = pBlock->info.numOfCols, .pDataBlock = pBlock->pDataBlock}; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index a33a240328..b9733e118f 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -131,7 +131,7 @@ static void recordNewGroupKeys(SGroupbyOperatorInfo* pInfo, SSDataBlock* pBlock, } } -static int32_t buildGroupValKey(void* pKey, int32_t* length, SArray* pGroupColVals) { +static int32_t buildGroupKeys(void* pKey, const SArray* pGroupColVals) { ASSERT(pKey != NULL); size_t numOfGroupCols = taosArrayGetSize(pGroupColVals); @@ -155,8 +155,7 @@ static int32_t buildGroupValKey(void* pKey, int32_t* length, SArray* pGroupColVa } } - *length = (pStart - (char*)pKey); - return 0; + return (int32_t) (pStart - (char*)pKey); } // assign the group keys or user input constant values if required @@ -217,7 +216,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { continue; } - /*int32_t ret = */ buildGroupValKey(pInfo->keyBuf, &len, pInfo->pGroupColVals); + len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code longjmp(pTaskInfo->env, TSDB_CODE_QRY_APP_ERROR); @@ -233,7 +232,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } if (num > 0) { - /*int32_t ret = */ buildGroupValKey(pInfo->keyBuf, &len, pInfo->pGroupColVals); + len = buildGroupKeys(pInfo->keyBuf, pInfo->pGroupColVals); int32_t ret = setGroupResultOutputBuf_rv(&(pInfo->binfo), pOperator->numOfOutput, pInfo->keyBuf, TSDB_DATA_TYPE_VARCHAR, len, 0, pInfo->aggSup.pResultBuf, pTaskInfo, &pInfo->aggSup); @@ -351,164 +350,40 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx return NULL; } -#define MULTI_KEY_DELIM "-" - -static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) { - SDistinctOperatorInfo* pInfo = (SDistinctOperatorInfo*)param; - taosHashCleanup(pInfo->pSet); - taosMemoryFreeClear(pInfo->buf); - taosArrayDestroy(pInfo->pDistinctDataInfo); - pInfo->pRes = blockDataDestroy(pInfo->pRes); -} - -static void buildMultiDistinctKey(SDistinctOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowId) { - char* p = pInfo->buf; -// memset(p, 0, pInfo->totalBytes); - - for (int i = 0; i < taosArrayGetSize(pInfo->pDistinctDataInfo); i++) { - SDistinctDataInfo* pDistDataInfo = (SDistinctDataInfo*)taosArrayGet(pInfo->pDistinctDataInfo, i); - SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); - - char* val = ((char*)pColDataInfo->pData) + pColDataInfo->info.bytes * rowId; - if (isNull(val, pDistDataInfo->type)) { - p += pDistDataInfo->bytes; - continue; - } - if (IS_VAR_DATA_TYPE(pDistDataInfo->type)) { - memcpy(p, varDataVal(val), varDataLen(val)); - p += varDataLen(val); - } else { - memcpy(p, val, pDistDataInfo->bytes); - p += pDistDataInfo->bytes; - } - memcpy(p, MULTI_KEY_DELIM, strlen(MULTI_KEY_DELIM)); - p += strlen(MULTI_KEY_DELIM); - } -} - -static bool initMultiDistinctInfo(SDistinctOperatorInfo* pInfo, SOperatorInfo* pOperator) { - for (int i = 0; i < pOperator->numOfOutput; i++) { - // pInfo->totalBytes += pOperator->pExpr[i].base.colBytes; - } -#if 0 - for (int i = 0; i < pOperator->numOfOutput; i++) { - int numOfCols = (int)(taosArrayGetSize(pBlock->pDataBlock)); - assert(i < numOfCols); - - for (int j = 0; j < numOfCols; j++) { - SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, j); - if (pColDataInfo->info.colId == pOperator->pExpr[i].base.resSchema.colId) { - SDistinctDataInfo item = {.index = j, .type = pColDataInfo->info.type, .bytes = pColDataInfo->info.bytes}; - taosArrayInsert(pInfo->pDistinctDataInfo, i, &item); - } - } - } -#endif - -// pInfo->totalBytes += (int32_t)strlen(MULTI_KEY_DELIM) * (pOperator->numOfOutput); -// pInfo->buf = taosMemoryCalloc(1, pInfo->totalBytes); - return taosArrayGetSize(pInfo->pDistinctDataInfo) == pOperator->numOfOutput ? true : false; -} - -static SSDataBlock* hashDistinct(SOperatorInfo* pOperator, bool* newgroup) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SDistinctOperatorInfo* pInfo = pOperator->info; - SSDataBlock* pRes = pInfo->pRes; - - pRes->info.rows = 0; - SSDataBlock* pBlock = NULL; - - SOperatorInfo* pDownstream = pOperator->pDownstream[0]; - while (1) { - publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pDownstream->getNextFn(pDownstream, newgroup); - publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC); - - if (pBlock == NULL) { - doSetOperatorCompleted(pOperator); - break; - } - - // ensure result output buf - if (pRes->info.rows + pBlock->info.rows > pInfo->resInfo.capacity) { - int32_t newSize = pRes->info.rows + pBlock->info.rows; - for (int i = 0; i < taosArrayGetSize(pRes->pDataBlock); i++) { - SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, i); - SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, i); - -// char* tmp = taosMemoryRealloc(pResultColInfoData->pData, newSize * pDistDataInfo->bytes); -// if (tmp == NULL) { -// return NULL; -// } else { -// pResultColInfoData->pData = tmp; -// } - } - pInfo->resInfo.capacity = newSize; - } - - for (int32_t i = 0; i < pBlock->info.rows; i++) { - buildMultiDistinctKey(pInfo, pBlock, i); - if (taosHashGet(pInfo->pSet, pInfo->buf, 0) == NULL) { - taosHashPut(pInfo->pSet, pInfo->buf, 0, NULL, 0); - - for (int j = 0; j < taosArrayGetSize(pRes->pDataBlock); j++) { - SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, j); // distinct meta info - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); // src - SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist - - char* val = ((char*)pColInfoData->pData) + pDistDataInfo->bytes * i; - char* start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows; - memcpy(start, val, pDistDataInfo->bytes); - } - - pRes->info.rows += 1; - } - } - - if (pRes->info.rows >= pInfo->resInfo.threshold) { - break; - } - } - - return (pInfo->pRes->info.rows > 0) ? pInfo->pRes : NULL; -} - -SOperatorInfo* createDistinctOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { - SDistinctOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDistinctOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, + SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { + SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pOperator->resultInfo.capacity = 4096; // todo extract function. + pInfo->pGroupCols = pGroupColList; + pInfo->pCondition = pCondition; + initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str); + initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); -// pInfo->totalBytes = 0; - pInfo->buf = NULL; + int32_t code = initGroupOptrInfo(pInfo, pGroupColList); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } - pInfo->pDistinctDataInfo = taosArrayInit(numOfCols, sizeof(SDistinctDataInfo)); - initMultiDistinctInfo(pInfo, pOperator); - - pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - - pOperator->name = "DistinctOperator"; + pOperator->name = "PartitionByOperator"; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; -// pOperator->operatorType = DISTINCT; - pOperator->pExpr = pExpr; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; + pOperator->pExpr = pExprInfo; pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; - pOperator->getNextFn = hashDistinct; - pOperator->closeFn = destroyDistinctOperatorInfo; + pOperator->_openFn = operatorDummyOpenFn; + pOperator->getNextFn = hashGroupbyAggregate; + pOperator->closeFn = destroyGroupbyOperatorInfo; - int32_t code = appendDownstream(pOperator, &downstream, 1); + code = appendDownstream(pOperator, &downstream, 1); return pOperator; _error: - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - taosMemoryFree(pInfo); - taosMemoryFree(pOperator); + taosMemoryFreeClear(pInfo); + taosMemoryFreeClear(pOperator); return NULL; -} +} \ No newline at end of file From a60185adc8a103445069b618f580c308cbc834c8 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 7 Apr 2022 14:11:16 +0800 Subject: [PATCH 5/6] add more log --- include/common/tmsg.h | 6 ++ source/client/src/tmq.c | 17 ++--- source/dnode/mnode/impl/inc/mndDef.h | 3 + source/dnode/mnode/impl/src/mndSubscribe.c | 2 + source/dnode/vnode/src/inc/tqInt.h | 2 +- source/dnode/vnode/src/tq/tq.c | 73 ++++++++++++++++------ 6 files changed, 75 insertions(+), 28 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 6ec7d8c75b..b8f4afdd0f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1811,6 +1811,7 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { typedef struct { int64_t leftForVer; int32_t vgId; + int32_t epoch; int64_t consumerId; char topicName[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_CGROUP_LEN]; @@ -1824,6 +1825,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pReq->leftForVer); tlen += taosEncodeFixedI32(buf, pReq->vgId); + tlen += taosEncodeFixedI32(buf, pReq->epoch); tlen += taosEncodeFixedI64(buf, pReq->consumerId); tlen += taosEncodeString(buf, pReq->topicName); tlen += taosEncodeString(buf, pReq->cgroup); @@ -1837,6 +1839,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { buf = taosDecodeFixedI64(buf, &pReq->leftForVer); buf = taosDecodeFixedI32(buf, &pReq->vgId); + buf = taosDecodeFixedI32(buf, &pReq->epoch); buf = taosDecodeFixedI64(buf, &pReq->consumerId); buf = taosDecodeStringTo(buf, pReq->topicName); buf = taosDecodeStringTo(buf, pReq->cgroup); @@ -1852,6 +1855,7 @@ typedef struct { int32_t vgId; int64_t oldConsumerId; int64_t newConsumerId; + char* topic; } SMqMVRebReq; static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pReq) { @@ -1860,6 +1864,7 @@ static FORCE_INLINE int32_t tEncodeSMqMVRebReq(void** buf, const SMqMVRebReq* pR tlen += taosEncodeFixedI32(buf, pReq->vgId); tlen += taosEncodeFixedI64(buf, pReq->oldConsumerId); tlen += taosEncodeFixedI64(buf, pReq->newConsumerId); + tlen += taosEncodeString(buf, pReq->topic); return tlen; } @@ -1868,6 +1873,7 @@ static FORCE_INLINE void* tDecodeSMqMVRebReq(void* buf, SMqMVRebReq* pReq) { buf = taosDecodeFixedI32(buf, &pReq->vgId); buf = taosDecodeFixedI64(buf, &pReq->oldConsumerId); buf = taosDecodeFixedI64(buf, &pReq->newConsumerId); + buf = taosDecodeString(buf, &pReq->topic); return buf; } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 602ecdf6ab..a6e5fee2d1 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -108,7 +108,7 @@ typedef struct { // connection info int32_t vgId; int32_t vgStatus; - int64_t skipCnt; + int32_t vgSkipCnt; SEpSet epSet; } SMqClientVg; @@ -849,7 +849,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { if (msgEpoch < tmqEpoch) { /*printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);*/ /*tsem_post(&tmq->rspSem);*/ - tscWarn("discard rsp from vg %d, epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch); + tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch); return 0; } @@ -881,6 +881,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { /*SMqConsumeRsp* pRsp = taosMemoryCalloc(1, sizeof(SMqConsumeRsp));*/ tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); if (pRsp == NULL) { + tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch); goto CREATE_MSG_FAIL; } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); @@ -969,14 +970,14 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { offset = *pOffset; tscDebug("consumer %ld epoch %d vg %d found %s", tmq->consumerId, epoch, pVgEp->vgId, vgKey); } - tscDebug("consumer %ld epoch %d vg %d offset set to %ld\n", tmq->consumerId, epoch, pVgEp->vgId, offset); + tscDebug("consumer %ld epoch %d vg %d offset set to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset); SMqClientVg clientVg = { .pollCnt = 0, .currentOffset = offset, .vgId = pVgEp->vgId, .epSet = pVgEp->epSet, .vgStatus = TMQ_VG_STATUS__IDLE, - .skipCnt = 0, + .vgSkipCnt = 0, }; taosArrayPush(topic.vgs, &clientVg); set = true; @@ -1232,9 +1233,10 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); if (vgStatus != TMQ_VG_STATUS__IDLE) { - int64_t skipCnt = atomic_add_fetch_64(&pVg->skipCnt, 1); - tscDebug("consumer %ld epoch %d skip vg %d skip cnt %ld", tmq->consumerId, tmq->epoch, pVg->vgId, skipCnt); + int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1); + tscDebug("consumer %ld epoch %d skip vg %d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt); continue; + /*if (vgSkipCnt < 10000) continue;*/ #if 0 if (skipCnt < 30000) { continue; @@ -1243,7 +1245,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { } #endif } - atomic_store_64(&pVg->skipCnt, 0); + atomic_store_32(&pVg->vgSkipCnt, 0); SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); if (pReq == NULL) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); @@ -1409,6 +1411,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { if (blocking_time != 0) { int64_t endTime = taosGetTimestampMs(); if (endTime - startTime > blocking_time) { + tscDebug("consumer %ld (epoch %d) timeout, no rsp", tmq->consumerId, tmq->epoch); return NULL; } } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index f418ec2290..38ef1185e8 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -413,6 +413,7 @@ typedef struct { typedef struct { int32_t vgId; // -1 for unassigned int32_t status; + int32_t epoch; SEpSet epSet; int64_t oldConsumerId; int64_t consumerId; // -1 for unassigned @@ -423,6 +424,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId); tlen += taosEncodeFixedI32(buf, pConsumerEp->status); + tlen += taosEncodeFixedI32(buf, pConsumerEp->epoch); tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet); tlen += taosEncodeFixedI64(buf, pConsumerEp->oldConsumerId); tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); @@ -433,6 +435,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId); buf = taosDecodeFixedI32(buf, &pConsumerEp->status); + buf = taosDecodeFixedI32(buf, &pConsumerEp->epoch); buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet); buf = taosDecodeFixedI64(buf, &pConsumerEp->oldConsumerId); buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 57d98396ea..2e297865a0 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -493,6 +493,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { pConsumerEp->oldConsumerId = pConsumerEp->consumerId; pConsumerEp->consumerId = pSubConsumer->consumerId; + //TODO + pConsumerEp->epoch = 0; taosArrayPush(pSubConsumer->vgInfo, pConsumerEp); if (pConsumerEp->oldConsumerId == -1) { diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h index f0c3f6801a..bb42151cf3 100644 --- a/source/dnode/vnode/src/inc/tqInt.h +++ b/source/dnode/vnode/src/inc/tqInt.h @@ -206,7 +206,7 @@ typedef struct { typedef struct { int64_t consumerId; - int64_t epoch; + int32_t epoch; char cgroup[TSDB_TOPIC_FNAME_LEN]; SArray* topics; // SArray } STqConsumer; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 68ade46efd..602e9047b3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -167,7 +167,7 @@ static FORCE_INLINE int32_t tEncodeSTqConsumer(void** buf, const STqConsumer* pC int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, pConsumer->consumerId); - tlen += taosEncodeFixedI64(buf, pConsumer->epoch); + tlen += taosEncodeFixedI32(buf, pConsumer->epoch); tlen += taosEncodeString(buf, pConsumer->cgroup); sz = taosArrayGetSize(pConsumer->topics); tlen += taosEncodeFixedI32(buf, sz); @@ -182,7 +182,7 @@ static FORCE_INLINE const void* tDecodeSTqConsumer(const void* buf, STqConsumer* int32_t sz; buf = taosDecodeFixedI64(buf, &pConsumer->consumerId); - buf = taosDecodeFixedI64(buf, &pConsumer->epoch); + buf = taosDecodeFixedI32(buf, &pConsumer->epoch); buf = taosDecodeStringTo(buf, pConsumer->cgroup); buf = taosDecodeFixedI32(buf, &sz); pConsumer->topics = taosArrayInit(sz, sizeof(STqTopic)); @@ -255,6 +255,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { int64_t consumerId = pReq->consumerId; int64_t fetchOffset; int64_t blockingTime = pReq->blockingTime; + int32_t reqEpoch = pReq->epoch; if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { fetchOffset = 0; @@ -264,7 +265,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { fetchOffset = pReq->currentOffset + 1; } - /*printf("tmq poll vg %d req %ld %ld\n", pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);*/ + vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, pTq->pVnode->vgId, pReq->currentOffset, fetchOffset); SMqPollRsp rsp = { /*.consumerId = consumerId,*/ @@ -274,6 +275,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); if (pConsumer == NULL) { + vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, pTq->pVnode->vgId); pMsg->pCont = NULL; pMsg->contLen = 0; pMsg->code = -1; @@ -281,30 +283,57 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { return 0; } + int32_t consumerEpoch = atomic_load_32(&pConsumer->epoch); + while (consumerEpoch < reqEpoch) { + consumerEpoch = atomic_val_compare_exchange_32(&pConsumer->epoch, consumerEpoch, reqEpoch); + } + + STqTopic* pTopic = NULL; int sz = taosArrayGetSize(pConsumer->topics); - ASSERT(sz == 1); - STqTopic* pTopic = taosArrayGet(pConsumer->topics, 0); - ASSERT(strcmp(pTopic->topicName, pReq->topic) == 0); - ASSERT(pConsumer->consumerId == consumerId); + for (int32_t i = 0; i < sz; i++) { + STqTopic* topic = taosArrayGet(pConsumer->topics, i); + //TODO race condition + ASSERT(pConsumer->consumerId == consumerId); + if (strcmp(topic->topicName, pReq->topic) == 0) { + pTopic = topic; + break; + } + } + if (pTopic == NULL) { + vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic, pTq->pVnode->vgId); + pMsg->pCont = NULL; + pMsg->contLen = 0; + pMsg->code = -1; + tmsgSendRsp(pMsg); + return 0; + } + + vDebug("poll topic %s from consumer %ld (epoch %d)", pTopic->topicName, consumerId, pReq->epoch); rsp.reqOffset = pReq->currentOffset; rsp.skipLogNum = 0; while (1) { /*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/ + //TODO + consumerEpoch = atomic_load_32(&pConsumer->epoch); + if (consumerEpoch > pReq->epoch) { + //TODO: return + break; + } SWalReadHead* pHead; if (walReadWithHandle_s(pTopic->pReadhandle, fetchOffset, &pHead) < 0) { // TODO: no more log, set timer to wait blocking time // if data inserted during waiting, launch query and // response to user + vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset); break; } - /*printf("vg %d offset %ld msgType %d from epoch %d\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, pReq->epoch);*/ + vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, pHead->msgType); /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/ /*pHead = pTopic->pReadhandle->pHead;*/ if (pHead->msgType == TDMT_VND_SUBMIT) { SSubmitReq* pCont = (SSubmitReq*)&pHead->body; - /*printf("from topic %s from consumer\n", pTopic->topicName, consumerId);*/ qTaskInfo_t task = pTopic->buffer.output[workerId].task; ASSERT(task); qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK); @@ -324,6 +353,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } if (taosArrayGetSize(pRes) == 0) { + vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset); fetchOffset++; rsp.skipLogNum++; taosArrayDestroy(pRes); @@ -352,7 +382,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->pCont = buf; pMsg->contLen = tlen; pMsg->code = 0; - /*printf("vg %d offset %ld msgType %d from epoch %d actual rsp\n", pTq->pVnode->vgId, fetchOffset, pHead->msgType, pReq->epoch);*/ + vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, pHead->msgType, consumerId, pReq->epoch); tmsgSendRsp(pMsg); taosMemoryFree(pHead); return 0; @@ -383,7 +413,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->contLen = tlen; pMsg->code = 0; tmsgSendRsp(pMsg); - /*printf("vg %d offset %ld from epoch %d not rsp\n", pTq->pVnode->vgId, fetchOffset, pReq->epoch);*/ + vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, pReq->epoch); /*}*/ return 0; @@ -393,6 +423,7 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg) { SMqMVRebReq req = {0}; tDecodeSMqMVRebReq(msg, &req); + vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId ,req.newConsumerId); STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId); ASSERT(pConsumer); pConsumer->consumerId = req.newConsumerId; @@ -407,18 +438,20 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { SMqSetCVgReq req = {0}; tDecodeSMqSetCVgReq(msg, &req); - /*printf("vg %d set to consumer from %ld to %ld\n", req.vgId, req.oldConsumerId, req.newConsumerId);*/ - STqConsumer* pConsumer = taosMemoryCalloc(1, sizeof(STqConsumer)); + vDebug("vg %d set to consumer %ld", req.vgId, req.consumerId); + STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.consumerId); if (pConsumer == NULL) { - terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; - return -1; + pConsumer = taosMemoryCalloc(1, sizeof(STqConsumer)); + if (pConsumer == NULL) { + terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; + return -1; + } + strcpy(pConsumer->cgroup, req.cgroup); + pConsumer->topics = taosArrayInit(0, sizeof(STqTopic)); + pConsumer->consumerId = req.consumerId; + pConsumer->epoch = 0; } - strcpy(pConsumer->cgroup, req.cgroup); - pConsumer->topics = taosArrayInit(0, sizeof(STqTopic)); - pConsumer->consumerId = req.consumerId; - pConsumer->epoch = 0; - STqTopic* pTopic = taosMemoryCalloc(1, sizeof(STqTopic)); if (pTopic == NULL) { taosArrayDestroy(pConsumer->topics); From 945de54b688fee2a098f022bffcd58313f6b1570 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 7 Apr 2022 14:51:52 +0800 Subject: [PATCH 6/6] [td-14493] add new api. --- include/client/taos.h | 2 + source/client/src/clientMain.c | 40 +++++++++++-- source/libs/executor/inc/executorimpl.h | 6 +- source/libs/executor/src/executorimpl.c | 49 ++++++++-------- source/libs/executor/src/groupoperator.c | 74 ++++++++++++++++-------- 5 files changed, 117 insertions(+), 54 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index d537943b82..d3856d432e 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -188,6 +188,8 @@ DLL_EXPORT void taos_stop_query(TAOS_RES *res); DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col); DLL_EXPORT bool taos_is_update_query(TAOS_RES *res); DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows); +DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int* numOfRows, TAOS_ROW *rows); +DLL_EXPORT int taos_fetch_raw_block(TAOS_RES *res, int* numOfRows, void** pData); DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex); DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql); DLL_EXPORT void taos_reset_current_db(TAOS *taos); diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 6fe41176da..e10cf5179e 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -385,6 +385,37 @@ bool taos_is_update_query(TAOS_RES *res) { } int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { + int32_t numOfRows = 0; + /*int32_t code = */taos_fetch_block_s(res, &numOfRows, rows); + return numOfRows; +} + +int taos_fetch_block_s(TAOS_RES *res, int* numOfRows, TAOS_ROW *rows) { + SRequestObj *pRequest = (SRequestObj *)res; + if (pRequest == NULL) { + return 0; + } + + (*rows) = NULL; + (*numOfRows) = 0; + + if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT || + pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) { + return 0; + } + + doFetchRow(pRequest, false); + + // TODO refactor + SReqResultInfo *pResultInfo = &pRequest->body.resInfo; + pResultInfo->current = pResultInfo->numOfRows; + + (*rows) = pResultInfo->row; + (*numOfRows) = pResultInfo->numOfRows; + return pRequest->code; +} + +int taos_fetch_raw_block(TAOS_RES *res, int* numOfRows, void** pData) { SRequestObj *pRequest = (SRequestObj *)res; if (pRequest == NULL) { return 0; @@ -397,12 +428,13 @@ int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { doFetchRow(pRequest, false); - // TODO refactor SReqResultInfo *pResultInfo = &pRequest->body.resInfo; - pResultInfo->current = pResultInfo->numOfRows; - *rows = pResultInfo->row; - return pResultInfo->numOfRows; + pResultInfo->current = pResultInfo->numOfRows; + (*numOfRows) = pResultInfo->numOfRows; + (*pData) = (void*) pResultInfo->pData; + + return 0; } int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index ac8cd82213..57edc40007 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -641,6 +641,8 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI uint64_t* total, SArray* pColList); void doSetOperatorCompleted(SOperatorInfo* pOperator); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); +SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity); +SSDataBlock* loadNextDataBlock(void* param); SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t repeatTime, @@ -665,8 +667,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, - SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); + #if 0 SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c5dca0c77d..c73886b5b0 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4737,8 +4737,7 @@ static void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHan pBlock->info.rows += 1; } -static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, bool hasVarCol, - int32_t capacity) { +SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity) { blockDataCleanup(pDataBlock); while (1) { @@ -4759,7 +4758,6 @@ static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataB SSDataBlock* loadNextDataBlock(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*)param; bool newgroup = false; - return pOperator->getNextFn(pOperator, &newgroup); } @@ -4939,7 +4937,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSortedMergeOperatorInfo* pInfo = pOperator->info; if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->hasVarCol, pInfo->binfo.capacity); + return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->binfo.capacity); } int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; @@ -5084,15 +5082,14 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSortOperatorInfo* pInfo = pOperator->info; - bool hasVarCol = pInfo->pDataBlock->info.hasVarCol; if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, hasVarCol, pInfo->numOfRowsInRes); + return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); } int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage, - pInfo->pDataBlock, "GET_TASKID(pTaskInfo)"); + pInfo->pDataBlock, pTaskInfo->id.str); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); @@ -5106,38 +5103,40 @@ static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) { } pOperator->status = OP_RES_TO_RETURN; - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, hasVarCol, pInfo->numOfRowsInRes); + return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); } -SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, - SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo) { SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; + goto _error; } - pInfo->sortBufSize = 1024 * 16; // 1MB, TODO dynamic set the available sort buffer - pInfo->bufPageSize = 1024; - pInfo->numOfRowsInRes = 1024; - pInfo->pDataBlock = pResBlock; - pInfo->pSortInfo = pSortInfo; + pInfo->sortBufSize = 1024 * 16; // TODO dynamic set the available sort buffer + pInfo->bufPageSize = 1024; + pInfo->numOfRowsInRes = 1024; + pInfo->pDataBlock = pResBlock; + pInfo->pSortInfo = pSortInfo; - pOperator->name = "Sort"; + pOperator->name = "SortOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; - pOperator->info = pInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->info = pInfo; - pOperator->pTaskInfo = pTaskInfo; - pOperator->getNextFn = doSort; - pOperator->closeFn = destroyOrderOperatorInfo; + pOperator->pTaskInfo = pTaskInfo; + pOperator->getNextFn = doSort; + pOperator->closeFn = destroyOrderOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; + + _error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + return NULL; } static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) { return pTableScanInfo->order; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index b9733e118f..b3a8e09f16 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -345,45 +345,73 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx return pOperator; _error: + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFreeClear(pInfo); taosMemoryFreeClear(pOperator); return NULL; } -SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, - SArray* pGroupColList, SNode* pCondition, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { - SGroupbyOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SGroupbyOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); +static SSDataBlock* doPartitionData(SOperatorInfo* pOperator, bool* newgroup) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SSortOperatorInfo* pInfo = pOperator->info; + bool hasVarCol = pInfo->pDataBlock->info.hasVarCol; + + if (pOperator->status == OP_RES_TO_RETURN) { + return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); + } + + int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, numOfBufPage, + pInfo->pDataBlock, pTaskInfo->id.str); + + tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); + + SGenericSource* ps = taosMemoryCalloc(1, sizeof(SGenericSource)); + ps->param = pOperator->pDownstream[0]; + tsortAddSource(pInfo->pSortHandle, ps); + + int32_t code = tsortOpen(pInfo->pSortHandle); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pTaskInfo->env, terrno); + } + + pOperator->status = OP_RES_TO_RETURN; + return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->numOfRowsInRes); +} + +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResultBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { + SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->pGroupCols = pGroupColList; - pInfo->pCondition = pCondition; - initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, 4096, pResultBlock, pTaskInfo->id.str); - initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); + pInfo->sortBufSize = 1024 * 16; // TODO dynamic set the available sort buffer + pInfo->bufPageSize = 1024; + pInfo->numOfRowsInRes = 1024; + pInfo->pDataBlock = pResultBlock; + pInfo->pSortInfo = pSortInfo; - int32_t code = initGroupOptrInfo(pInfo, pGroupColList); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - pOperator->name = "PartitionByOperator"; + pOperator->name = "PartitionOperator"; + pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; - pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PARTITION; - pOperator->pExpr = pExprInfo; - pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; - pOperator->_openFn = operatorDummyOpenFn; - pOperator->getNextFn = hashGroupbyAggregate; - pOperator->closeFn = destroyGroupbyOperatorInfo; - code = appendDownstream(pOperator, &downstream, 1); + pOperator->pTaskInfo = pTaskInfo; + pOperator->getNextFn = doPartitionData; +// pOperator->closeFn = destroyOrderOperatorInfo; + + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; _error: - taosMemoryFreeClear(pInfo); - taosMemoryFreeClear(pOperator); + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); return NULL; } \ No newline at end of file