From ba8ad18c02abe512984fb7bee40c764f44075cc6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 12 Dec 2020 11:57:19 +0800 Subject: [PATCH 01/11] [TD-2190]: failed to time range check for fill query. --- src/client/src/tscSQLParser.c | 2 +- tests/script/general/parser/fill.sim | 16 ++++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 8e4ef91e27..1cfbea4cc4 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -6562,7 +6562,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5); } - if (pQueryInfo->interval.interval > 0 && pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit != 'y') { + if (pQueryInfo->interval.interval > 0) { bool initialWindows = TSWINDOW_IS_EQUAL(pQueryInfo->window, TSWINDOW_INITIALIZER); if (initialWindows) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6); diff --git a/tests/script/general/parser/fill.sim b/tests/script/general/parser/fill.sim index 9851a4e7fc..405a805312 100644 --- a/tests/script/general/parser/fill.sim +++ b/tests/script/general/parser/fill.sim @@ -848,10 +848,7 @@ if $rows != 12 then return -1 endi -print =====================>td-1442 -sql_error select count(*) from m_fl_tb0 interval(1s) fill(prev); - -print =====================> aggregation + arithmetic + fill +print =====================> aggregation + arithmetic + fill, need to add cases TODO #sql select avg(cpu_taosd) - first(cpu_taosd) from dn1 where ts<'2020-11-13 11:00:00' and ts>'2020-11-13 10:50:00' interval(10s) fill(value, 99) #sql select count(*), first(k), avg(k), avg(k)-first(k) from tm0 where ts>'2020-1-1 1:1:1' and ts<'2020-1-1 1:02:59' interval(10s) fill(value, 99); #sql select count(*), first(k), avg(k), avg(k)-first(k) from tm0 where ts>'2020-1-1 1:1:1' and ts<'2020-1-1 1:02:59' interval(10s) fill(NULL); @@ -1044,6 +1041,17 @@ if $data12 != 1 then return -1 endi +print =====================>td-1442, td-2190 , no time range for fill option +sql_error select count(*) from m_fl_tb0 interval(1s) fill(prev); + +sql_error select min(c3) from m_fl_mt0 interval(10a) fill(value, 20) +sql_error select min(c3) from m_fl_mt0 interval(10s) fill(value, 20) +sql_error select min(c3) from m_fl_mt0 interval(10m) fill(value, 20) +sql_error select min(c3) from m_fl_mt0 interval(10h) fill(value, 20) +sql_error select min(c3) from m_fl_mt0 interval(10d) fill(value, 20) +sql_error select min(c3) from m_fl_mt0 interval(10w) fill(value, 20) +sql_error select max(c3) from m_fl_mt0 interval(1n) fill(prev) +sql_error select min(c3) from m_fl_mt0 interval(1y) fill(value, 20) print =============== clear #sql drop database $db From d9c09bde8bfdef0ba1166e4ea251dfef93114744 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 12 Dec 2020 11:58:41 +0800 Subject: [PATCH 02/11] [TD-225] --- src/client/src/tscFunctionImpl.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 62e55c033f..7d41d57ccc 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -2222,7 +2222,8 @@ static void buildTopBotStruct(STopBotInfo *pTopBotInfo, SQLFunctionCtx *pCtx) { tmp += POINTER_BYTES * pCtx->param[0].i64Key; size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen; - + assert(pCtx->param[0].i64Key > 0); + for (int32_t i = 0; i < pCtx->param[0].i64Key; ++i) { pTopBotInfo->res[i] = (tValuePair*) tmp; pTopBotInfo->res[i]->pTags = tmp + sizeof(tValuePair); From bcc9d7bc9fa3b59e561bd45f6a8a2819dc28fac2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 12 Dec 2020 15:21:59 +0800 Subject: [PATCH 03/11] [TD-225] refactor. --- src/query/inc/qUtil.h | 6 +++--- src/query/src/qExecutor.c | 32 ++++++++++++++++---------------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index fb71c8a5fe..ac7aa75809 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -24,6 +24,9 @@ #define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t)) +#define curTimeWindowIndex(_winres) ((_winres)->curIndex) +#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pExpr1[1].base.arg->argValue.i64:1) + int32_t getOutputInterResultBufSize(SQuery* pQuery); void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pRow, int16_t type); @@ -47,9 +50,6 @@ static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pWindowResInfo, int return pWindowResInfo->pResult[slot]; } -#define curTimeWindowIndex(_winres) ((_winres)->curIndex) -#define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pExpr1[1].base.arg->argValue.i64:1) - bool isWindowResClosed(SResultRowInfo *pWindowResInfo, int32_t slot); int32_t initResultRow(SResultRow *pResultRow); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 58f05c9d9d..0a8916b13f 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -464,13 +464,13 @@ static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis return true; } -static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pWindowResInfo, char *pData, +static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData, int16_t bytes, bool masterscan, uint64_t uid) { SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid); int32_t *p1 = (int32_t *)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); if (p1 != NULL) { - pWindowResInfo->curIndex = *p1; + pResultRowInfo->curIndex = *p1; } else { if (!masterscan) { // not master scan, do not add new timewindow return NULL; @@ -478,46 +478,46 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes // TODO refactor // more than the capacity, reallocate the resources - if (pWindowResInfo->size >= pWindowResInfo->capacity) { + if (pResultRowInfo->size >= pResultRowInfo->capacity) { int64_t newCapacity = 0; - if (pWindowResInfo->capacity > 10000) { - newCapacity = (int64_t)(pWindowResInfo->capacity * 1.25); + if (pResultRowInfo->capacity > 10000) { + newCapacity = (int64_t)(pResultRowInfo->capacity * 1.25); } else { - newCapacity = (int64_t)(pWindowResInfo->capacity * 1.5); + newCapacity = (int64_t)(pResultRowInfo->capacity * 1.5); } - char *t = realloc(pWindowResInfo->pResult, (size_t)(newCapacity * POINTER_BYTES)); + char *t = realloc(pResultRowInfo->pResult, (size_t)(newCapacity * POINTER_BYTES)); if (t == NULL) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - pWindowResInfo->pResult = (SResultRow **)t; + pResultRowInfo->pResult = (SResultRow **)t; - int32_t inc = (int32_t)newCapacity - pWindowResInfo->capacity; - memset(&pWindowResInfo->pResult[pWindowResInfo->capacity], 0, POINTER_BYTES * inc); + int32_t inc = (int32_t)newCapacity - pResultRowInfo->capacity; + memset(&pResultRowInfo->pResult[pResultRowInfo->capacity], 0, POINTER_BYTES * inc); - pWindowResInfo->capacity = (int32_t)newCapacity; + pResultRowInfo->capacity = (int32_t)newCapacity; } SResultRow *pResult = getNewResultRow(pRuntimeEnv->pool); - pWindowResInfo->pResult[pWindowResInfo->size] = pResult; + pResultRowInfo->pResult[pResultRowInfo->size] = pResult; int32_t ret = initResultRow(pResult); if (ret != TSDB_CODE_SUCCESS) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } // add a new result set for a new group - pWindowResInfo->curIndex = pWindowResInfo->size++; + pResultRowInfo->curIndex = pResultRowInfo->size++; taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), - (char *)&pWindowResInfo->curIndex, sizeof(int32_t)); + (char *)&pResultRowInfo->curIndex, sizeof(int32_t)); } // too many time window in query - if (pWindowResInfo->size > MAX_INTERVAL_TIME_WINDOW) { + if (pResultRowInfo->size > MAX_INTERVAL_TIME_WINDOW) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); } - return getResultRow(pWindowResInfo, pWindowResInfo->curIndex); + return getResultRow(pResultRowInfo, pResultRowInfo->curIndex); } // get the correct time window according to the handled timestamp From 09753042427a0f1305ef1cc066afb66c8322ba2d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 12 Dec 2020 15:23:51 +0800 Subject: [PATCH 04/11] [TD-225] --- src/query/inc/qUtil.h | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index ac7aa75809..12024e7697 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -33,24 +33,24 @@ void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pRow, int16_t typ void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src, int16_t type); SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index); -int32_t initWindowResInfo(SResultRowInfo* pWindowResInfo, int32_t size, int16_t type); +int32_t initWindowResInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t type); -void cleanupTimeWindowInfo(SResultRowInfo* pWindowResInfo); -void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pWindowResInfo); +void cleanupTimeWindowInfo(SResultRowInfo* pResultRowInfo); +void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo); void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num); void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv); -int32_t numOfClosedTimeWindow(SResultRowInfo* pWindowResInfo); -void closeTimeWindow(SResultRowInfo* pWindowResInfo, int32_t slot); -void closeAllTimeWindow(SResultRowInfo* pWindowResInfo); -void removeRedundantWindow(SResultRowInfo *pWindowResInfo, TSKEY lastKey, int32_t order); +int32_t numOfClosedTimeWindow(SResultRowInfo* pResultRowInfo); +void closeTimeWindow(SResultRowInfo* pResultRowInfo, int32_t slot); +void closeAllTimeWindow(SResultRowInfo* pResultRowInfo); +void removeRedundantWindow(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order); -static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pWindowResInfo, int32_t slot) { - assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size); - return pWindowResInfo->pResult[slot]; +static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) { + assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size); + return pResultRowInfo->pResult[slot]; } -bool isWindowResClosed(SResultRowInfo *pWindowResInfo, int32_t slot); +bool isWindowResClosed(SResultRowInfo *pResultRowInfo, int32_t slot); int32_t initResultRow(SResultRow *pResultRow); From ec4e3f0282ed7c8352f966ba8458a610122a9bf4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 12 Dec 2020 16:34:22 +0800 Subject: [PATCH 05/11] [TD-225] refactor. --- src/query/inc/qUtil.h | 2 +- src/query/src/qUtil.c | 24 ++++++++++++------------ 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index 12024e7697..acba5df2f7 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -29,7 +29,7 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery); -void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pRow, int16_t type); +void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type); void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src, int16_t type); SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index); diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 6c845b012f..9b0569d9e6 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -232,19 +232,19 @@ void closeTimeWindow(SResultRowInfo *pResultRowInfo, int32_t slot) { getResultRow(pResultRowInfo, slot)->closed = true; } -void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pWindowRes, int16_t type) { - if (pWindowRes == NULL) { +void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16_t type) { + if (pResultRow == NULL) { return; } // the result does not put into the SDiskbasedResultBuf, ignore it. - if (pWindowRes->pageId >= 0) { - tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId); + if (pResultRow->pageId >= 0) { + tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId); for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutput; ++i) { - SResultRowCellInfo *pResultInfo = &pWindowRes->pCellInfo[i]; + SResultRowCellInfo *pResultInfo = &pResultRow->pCellInfo[i]; - char * s = getPosInResultPage(pRuntimeEnv, i, pWindowRes, page); + char * s = getPosInResultPage(pRuntimeEnv, i, pResultRow, page); size_t size = pRuntimeEnv->pQuery->pExpr1[i].bytes; memset(s, 0, size); @@ -252,15 +252,15 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pWindowRes, int16 } } - pWindowRes->numOfRows = 0; - pWindowRes->pageId = -1; - pWindowRes->rowId = -1; - pWindowRes->closed = false; + pResultRow->numOfRows = 0; + pResultRow->pageId = -1; + pResultRow->rowId = -1; + pResultRow->closed = false; if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { - tfree(pWindowRes->key); + tfree(pResultRow->key); } else { - pWindowRes->win = TSWINDOW_INITIALIZER; + pResultRow->win = TSWINDOW_INITIALIZER; } } From e561bb7ca08d3ad45999526003df3811bb93e3b8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 12 Dec 2020 17:03:24 +0800 Subject: [PATCH 06/11] [TD-225] --- src/client/src/tscFunctionImpl.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 7d41d57ccc..7560d94242 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -2222,7 +2222,7 @@ static void buildTopBotStruct(STopBotInfo *pTopBotInfo, SQLFunctionCtx *pCtx) { tmp += POINTER_BYTES * pCtx->param[0].i64Key; size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen; - assert(pCtx->param[0].i64Key > 0); +// assert(pCtx->param[0].i64Key > 0); for (int32_t i = 0; i < pCtx->param[0].i64Key; ++i) { pTopBotInfo->res[i] = (tValuePair*) tmp; From eb97f122322249580295f57f654f3dbfc4a3d3ae Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 12 Dec 2020 18:35:20 +0800 Subject: [PATCH 07/11] [TD-225] release the ref for tableMeta if insert success. --- src/client/src/tscSubquery.c | 4 ---- src/client/src/tscUtil.c | 6 +++++- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index d2eb16795f..1e95380096 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2279,7 +2279,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) */ int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) { assert(pSql != NULL && pSql->param != NULL); -// SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param; @@ -2288,9 +2287,6 @@ int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) { STableDataBlocks* pTableDataBlock = taosArrayGetP(pParent->cmd.pDataBlocks, pSupporter->index); int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock); - // free the data block created from insert sql string -// pCmd->pDataBlocks = tscDestroyBlockArrayList(pParent->cmd.pDataBlocks); - if ((pRes->code = code)!= TSDB_CODE_SUCCESS) { tscQueueAsyncRes(pSql); return code; // here the pSql may have been released already. diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 416e7c2dae..b7fe6aa7b2 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -405,8 +405,12 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd, bool removeFromCache) { pCmd->msgType = 0; pCmd->parseFinished = 0; pCmd->autoCreated = 0; - pCmd->numOfTables = 0; + for(int32_t i = 0; i < pCmd->numOfTables; ++i) { + taosCacheRelease(tscMetaCache, (void**)&(pCmd->pTableMetaList[i]), false); + } + + pCmd->numOfTables = 0; tfree(pCmd->pTableMetaList); pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList); From aafb60988baa8b8155bc73d3519ce13d0f3bce73 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 12 Dec 2020 18:36:35 +0800 Subject: [PATCH 08/11] [TD-225] refactor. --- src/inc/ttype.h | 27 ++++++++-------- src/query/inc/qUtil.h | 37 ++++++++++----------- src/query/src/qExecutor.c | 67 ++++++++++++++++++--------------------- src/query/src/qUtil.c | 31 +++++++++--------- 4 files changed, 76 insertions(+), 86 deletions(-) diff --git a/src/inc/ttype.h b/src/inc/ttype.h index 7d5779c43f..3dd0c58ae2 100644 --- a/src/inc/ttype.h +++ b/src/inc/ttype.h @@ -8,25 +8,26 @@ extern "C" { #include "taosdef.h" #define GET_TYPED_DATA(_v, _finalType, _type, _data) \ - switch (_type) { \ - case TSDB_DATA_TYPE_TINYINT: \ + switch (_type) { \ + case TSDB_DATA_TYPE_BOOL: \ + case TSDB_DATA_TYPE_TINYINT: \ (_v) = (_finalType)GET_INT8_VAL(_data); \ - break; \ - case TSDB_DATA_TYPE_SMALLINT: \ + break; \ + case TSDB_DATA_TYPE_SMALLINT: \ (_v) = (_finalType)GET_INT16_VAL(_data); \ - break; \ - case TSDB_DATA_TYPE_BIGINT: \ + break; \ + case TSDB_DATA_TYPE_BIGINT: \ (_v) = (_finalType)(GET_INT64_VAL(_data)); \ - break; \ - case TSDB_DATA_TYPE_FLOAT: \ + break; \ + case TSDB_DATA_TYPE_FLOAT: \ (_v) = (_finalType)GET_FLOAT_VAL(_data); \ - break; \ - case TSDB_DATA_TYPE_DOUBLE: \ + break; \ + case TSDB_DATA_TYPE_DOUBLE: \ (_v) = (_finalType)GET_DOUBLE_VAL(_data); \ - break; \ - default: \ + break; \ + default: \ (_v) = (_finalType)GET_INT32_VAL(_data); \ - break; \ + break; \ }; #ifdef __cplusplus diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index acba5df2f7..dde2e39845 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -29,31 +29,30 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery); -void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type); -void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src, int16_t type); +size_t getResultRowSize(SQueryRuntimeEnv* pRuntimeEnv); +int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t type); +void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); + +void resetResultRowInfo(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo); +void popFrontResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int32_t num); +void clearClosedResultRows(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pResultRowInfo); +int32_t numOfClosedResultRows(SResultRowInfo* pResultRowInfo); +void closeAllResultRows(SResultRowInfo* pResultRowInfo); +void removeRedundantResultRows(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order); + +int32_t initResultRow(SResultRow *pResultRow); +void closeResultRow(SResultRowInfo* pResultRowInfo, int32_t slot); +bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot); +void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type); +void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src, int16_t type); + SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index); -int32_t initWindowResInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t type); - -void cleanupTimeWindowInfo(SResultRowInfo* pResultRowInfo); -void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo); -void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num); - -void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv); -int32_t numOfClosedTimeWindow(SResultRowInfo* pResultRowInfo); -void closeTimeWindow(SResultRowInfo* pResultRowInfo, int32_t slot); -void closeAllTimeWindow(SResultRowInfo* pResultRowInfo); -void removeRedundantWindow(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order); - static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) { assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size); return pResultRowInfo->pResult[slot]; } -bool isWindowResClosed(SResultRowInfo *pResultRowInfo, int32_t slot); - -int32_t initResultRow(SResultRow *pResultRow); - static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SResultRow *pResult, tFilePage* page) { assert(pResult != NULL && pRuntimeEnv != NULL); @@ -71,8 +70,6 @@ bool notNull_filter(SColumnFilterElem *pFilter, char* minval, char* maxval); __filter_func_t *getRangeFilterFuncArray(int32_t type); __filter_func_t *getValueFilterFuncArray(int32_t type); -size_t getWindowResultSize(SQueryRuntimeEnv* pRuntimeEnv); - SResultRowPool* initResultRowPool(size_t size); SResultRow* getNewResultRow(SResultRowPool* p); int64_t getResultRowPoolMemSize(SResultRowPool* p); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 0a8916b13f..1f07e2bf6a 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -614,14 +614,14 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf return 0; } -static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pWindowResInfo, SDataBlockInfo* pBockInfo, +static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, SDataBlockInfo* pBockInfo, STimeWindow *win, bool masterscan, bool* newWind, SResultRow** pResult) { assert(win->skey <= win->ekey); SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; // todo refactor int64_t uid = getResultInfoUId(pRuntimeEnv); - SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, uid); + SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, uid); if (pResultRow == NULL) { *newWind = false; @@ -717,7 +717,7 @@ static int32_t updateResultRowCurrentIndex(SResultRowInfo* pWindowResInfo, TSKEY TSKEY ekey = pResult->win.ekey; if ((ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) { - closeTimeWindow(pWindowResInfo, i); + closeResultRow(pWindowResInfo, i); } else { skey = pResult->win.skey; break; @@ -751,7 +751,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe // query completed if ((lastKey >= pQuery->current->win.ekey && ascQuery) || (lastKey <= pQuery->current->win.ekey && (!ascQuery))) { - closeAllTimeWindow(pWindowResInfo); + closeAllResultRows(pWindowResInfo); pWindowResInfo->curIndex = pWindowResInfo->size - 1; setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL); @@ -1351,14 +1351,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat } int64_t v = -1; - switch(type) { - case TSDB_DATA_TYPE_BOOL: - case TSDB_DATA_TYPE_TINYINT: v = GET_INT8_VAL(pData); break; - case TSDB_DATA_TYPE_SMALLINT: v = GET_INT16_VAL(pData); break; - case TSDB_DATA_TYPE_INT: v = GET_INT32_VAL(pData); break; - case TSDB_DATA_TYPE_BIGINT: v = GET_INT64_VAL(pData); break; - } - + GET_TYPED_DATA(v, int64_t, type, pData); if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { if (pResultRow->key == NULL) { pResultRow->key = malloc(varDataTLen(pData)); @@ -1790,7 +1783,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl if (QUERY_IS_INTERVAL_QUERY(pQuery)) { numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); } else if (pRuntimeEnv->groupbyNormalCol) { - closeAllTimeWindow(pWindowResInfo); + closeAllResultRows(pWindowResInfo); numOfRes = pWindowResInfo->size; } else { // projection query numOfRes = (int32_t)getNumOfResult(pRuntimeEnv); @@ -2094,7 +2087,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv); qDebug("QInfo:%p teardown runtime env", pQInfo); - cleanupTimeWindowInfo(&pRuntimeEnv->windowResInfo); + cleanupResultRowInfo(&pRuntimeEnv->windowResInfo); if (pRuntimeEnv->pCtx != NULL) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { @@ -2928,7 +2921,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { if (QUERY_IS_INTERVAL_QUERY(pQuery) && (IS_MASTER_SCAN(pRuntimeEnv)|| pRuntimeEnv->scanFlag == REPEAT_SCAN)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - closeAllTimeWindow(&pRuntimeEnv->windowResInfo); + closeAllResultRows(&pRuntimeEnv->windowResInfo); pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; // point to the last time window } else { assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)); @@ -3707,8 +3700,8 @@ void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) { int32_t initResultRow(SResultRow *pResultRow) { pResultRow->pCellInfo = (SResultRowCellInfo*)((char*)pResultRow + sizeof(SResultRow)); - pResultRow->pageId = -1; - pResultRow->rowId = -1; + pResultRow->pageId = -1; + pResultRow->rowId = -1; return TSDB_CODE_SUCCESS; } @@ -4057,12 +4050,12 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { // for each group result, call the finalize function for each column SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; if (pRuntimeEnv->groupbyNormalCol) { - closeAllTimeWindow(pWindowResInfo); + closeAllResultRows(pWindowResInfo); } for (int32_t i = 0; i < pWindowResInfo->size; ++i) { SResultRow *buf = pWindowResInfo->pResult[i]; - if (!isWindowResClosed(pWindowResInfo, i)) { + if (!isResultRowClosed(pWindowResInfo, i)) { continue; } @@ -4112,7 +4105,7 @@ static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void // set more initial size of interval/groupby query if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) { int32_t initialSize = 128; - int32_t code = initWindowResInfo(&pTableQueryInfo->windowResInfo, initialSize, TSDB_DATA_TYPE_INT); + int32_t code = initResultRowInfo(&pTableQueryInfo->windowResInfo, initialSize, TSDB_DATA_TYPE_INT); if (code != TSDB_CODE_SUCCESS) { return NULL; } @@ -4128,7 +4121,7 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) { } tVariantDestroy(&pTableQueryInfo->tag); - cleanupTimeWindowInfo(&pTableQueryInfo->windowResInfo); + cleanupResultRowInfo(&pTableQueryInfo->windowResInfo); } /** @@ -4360,7 +4353,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SResultRowInfo *pResultInfo, int32_ int32_t step = -1; qDebug("QInfo:%p start to copy data from windowResInfo to query buf", pQInfo); - int32_t totalSet = numOfClosedTimeWindow(pResultInfo); + int32_t totalSet = numOfClosedResultRows(pResultInfo); SResultRow** result = pResultInfo->pResult; if (orderType == TSDB_ORDER_ASC) { @@ -4481,7 +4474,7 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc // TODO refactor if ((pTableQueryInfo->lastKey >= pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey <= pTableQueryInfo->win.ekey && (!ascQuery))) { - closeAllTimeWindow(pWindowResInfo); + closeAllResultRows(pWindowResInfo); pWindowResInfo->curIndex = pWindowResInfo->size - 1; } else { updateResultRowCurrentIndex(pWindowResInfo, pTableQueryInfo->lastKey, ascQuery); @@ -5031,7 +5024,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo type = TSDB_DATA_TYPE_INT; // group id } - code = initWindowResInfo(&pRuntimeEnv->windowResInfo, 8, type); + code = initResultRowInfo(&pRuntimeEnv->windowResInfo, 8, type); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -5051,7 +5044,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo type = TSDB_DATA_TYPE_TIMESTAMP; } - code = initWindowResInfo(&pRuntimeEnv->windowResInfo, numOfResultRows, type); + code = initResultRowInfo(&pRuntimeEnv->windowResInfo, numOfResultRows, type); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -5479,7 +5472,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { pQInfo->groupIndex = currentGroupIndex; // restore the group index assert(pQuery->rec.rows == pWindowResInfo->size); - clearClosedTimeWindow(pRuntimeEnv); + clearClosedResultRows(pRuntimeEnv, &pRuntimeEnv->windowResInfo); break; } } else if (pRuntimeEnv->queryWindowIdentical && pRuntimeEnv->pTSBuf == NULL && !isTSCompQuery(pQuery)) { @@ -5641,7 +5634,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } resetDefaultResInfoOutputBuf(pRuntimeEnv); - resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo); + resetResultRowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo); SArray *group = GET_TABLEGROUP(pQInfo, 0); assert(taosArrayGetSize(group) == pQInfo->tableqinfoGroupInfo.numOfTables && @@ -5796,11 +5789,11 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) { size_t num = taosArrayGetSize(group); for (int32_t j = 0; j < num; ++j) { STableQueryInfo* item = taosArrayGetP(group, j); - closeAllTimeWindow(&item->windowResInfo); + closeAllResultRows(&item->windowResInfo); } } } else { // close results for group result - closeAllTimeWindow(&pQInfo->runtimeEnv.windowResInfo); + closeAllResultRows(&pQInfo->runtimeEnv.windowResInfo); } } @@ -6048,10 +6041,10 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) if ((pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) && pQuery->limit.offset > 0 && pQuery->fillType == TSDB_FILL_NONE) { // maxOutput <= 0, means current query does not generate any results - int32_t numOfClosed = numOfClosedTimeWindow(&pRuntimeEnv->windowResInfo); + int32_t numOfClosed = numOfClosedResultRows(&pRuntimeEnv->windowResInfo); int32_t c = (int32_t)(MIN(numOfClosed, pQuery->limit.offset)); - clearFirstNWindowRes(pRuntimeEnv, c); + popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, c); pQuery->limit.offset -= c; } @@ -6088,7 +6081,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { pQuery->rec.rows = 0; copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo); - clearFirstNWindowRes(pRuntimeEnv, pQInfo->groupIndex); + popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQInfo->groupIndex); } // no result generated, abort @@ -6121,16 +6114,16 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { // all data scanned, the group by normal column can return if (pRuntimeEnv->groupbyNormalCol) { // todo refactor with merge interval time result // maxOutput <= 0, means current query does not generate any results - int32_t numOfClosed = numOfClosedTimeWindow(&pRuntimeEnv->windowResInfo); + int32_t numOfClosed = numOfClosedResultRows(&pRuntimeEnv->windowResInfo); if ((pQuery->limit.offset > 0 && pQuery->limit.offset < numOfClosed) || pQuery->limit.offset == 0) { // skip offset result rows - clearFirstNWindowRes(pRuntimeEnv, (int32_t) pQuery->limit.offset); + popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (int32_t) pQuery->limit.offset); pQuery->rec.rows = 0; pQInfo->groupIndex = 0; copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo); - clearFirstNWindowRes(pRuntimeEnv, pQInfo->groupIndex); + popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQInfo->groupIndex); doSecondaryArithmeticProcess(pQuery); limitResults(pRuntimeEnv); @@ -6164,7 +6157,7 @@ static void tableQueryImpl(SQInfo *pQInfo) { if (pRuntimeEnv->windowResInfo.size > 0) { copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo); - clearFirstNWindowRes(pRuntimeEnv, pQInfo->groupIndex); + popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQInfo->groupIndex); if (pQuery->rec.rows > 0) { qDebug("QInfo:%p %"PRId64" rows returned from group results, total:%"PRId64"", pQInfo, pQuery->rec.rows, pQuery->rec.total); @@ -7029,7 +7022,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou pQInfo->runtimeEnv.pResultRowHashTable = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pQInfo->runtimeEnv.keyBuf = malloc(TSDB_MAX_BYTES_PER_ROW); - pQInfo->runtimeEnv.pool = initResultRowPool(getWindowResultSize(&pQInfo->runtimeEnv)); + pQInfo->runtimeEnv.pool = initResultRowPool(getResultRowSize(&pQInfo->runtimeEnv)); pQInfo->runtimeEnv.prevRow = malloc(POINTER_BYTES * pQuery->numOfCols + srcSize); char* start = POINTER_BYTES * pQuery->numOfCols + (char*) pQInfo->runtimeEnv.prevRow; diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 9b0569d9e6..65ac60e91f 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -43,7 +43,7 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) { return size; } -int32_t initWindowResInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t type) { +int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t type) { pResultRowInfo->capacity = size; pResultRowInfo->type = type; @@ -59,10 +59,11 @@ int32_t initWindowResInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t return TSDB_CODE_SUCCESS; } -void cleanupTimeWindowInfo(SResultRowInfo *pResultRowInfo) { +void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) { if (pResultRowInfo == NULL) { return; } + if (pResultRowInfo->capacity == 0) { assert(pResultRowInfo->pResult == NULL); return; @@ -77,7 +78,7 @@ void cleanupTimeWindowInfo(SResultRowInfo *pResultRowInfo) { tfree(pResultRowInfo->pResult); } -void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) { +void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) { if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0) { return; } @@ -100,13 +101,12 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultR pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL; } -void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { - SResultRowInfo *pResultRowInfo = &pRuntimeEnv->windowResInfo; +void popFrontResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int32_t num) { if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0 || pResultRowInfo->size == 0 || num == 0) { return; } - int32_t numOfClosed = numOfClosedTimeWindow(pResultRowInfo); + int32_t numOfClosed = numOfClosedResultRows(pResultRowInfo); assert(num >= 0 && num <= numOfClosed); int16_t type = pResultRowInfo->type; @@ -159,17 +159,16 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { pResultRowInfo->curIndex = -1; } -void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) { - SResultRowInfo *pResultRowInfo = &pRuntimeEnv->windowResInfo; +void clearClosedResultRows(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) { if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0 || pResultRowInfo->size == 0) { return; } - int32_t numOfClosed = numOfClosedTimeWindow(pResultRowInfo); - clearFirstNWindowRes(pRuntimeEnv, numOfClosed); + int32_t numOfClosed = numOfClosedResultRows(pResultRowInfo); + popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, numOfClosed); } -int32_t numOfClosedTimeWindow(SResultRowInfo *pResultRowInfo) { +int32_t numOfClosedResultRows(SResultRowInfo *pResultRowInfo) { int32_t i = 0; while (i < pResultRowInfo->size && pResultRowInfo->pResult[i]->closed) { ++i; @@ -178,7 +177,7 @@ int32_t numOfClosedTimeWindow(SResultRowInfo *pResultRowInfo) { return i; } -void closeAllTimeWindow(SResultRowInfo *pResultRowInfo) { +void closeAllResultRows(SResultRowInfo *pResultRowInfo) { assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size); for (int32_t i = 0; i < pResultRowInfo->size; ++i) { @@ -195,7 +194,7 @@ void closeAllTimeWindow(SResultRowInfo *pResultRowInfo) { * the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time. * NOTE: remove redundant, only when the result set order equals to traverse order */ -void removeRedundantWindow(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order) { +void removeRedundantResultRows(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order) { assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size); if (pResultRowInfo->size <= 1) { return; @@ -224,11 +223,11 @@ void removeRedundantWindow(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_ } } -bool isWindowResClosed(SResultRowInfo *pResultRowInfo, int32_t slot) { +bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot) { return (getResultRow(pResultRowInfo, slot)->closed == true); } -void closeTimeWindow(SResultRowInfo *pResultRowInfo, int32_t slot) { +void closeResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) { getResultRow(pResultRowInfo, slot)->closed = true; } @@ -310,7 +309,7 @@ SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRo return (SResultRowCellInfo*)((char*) pRow->pCellInfo + pRuntimeEnv->rowCellInfoOffset[index]); } -size_t getWindowResultSize(SQueryRuntimeEnv* pRuntimeEnv) { +size_t getResultRowSize(SQueryRuntimeEnv* pRuntimeEnv) { return (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultRowCellInfo)) + pRuntimeEnv->interBufSize + sizeof(SResultRow); } From af023836139d849d443d8e951bd5916fcbbb6719 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 12 Dec 2020 19:24:18 +0800 Subject: [PATCH 09/11] [TD-225] release the ref if insertion succeeds. --- src/client/src/tscUtil.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index b7fe6aa7b2..4004e0f3ea 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -407,7 +407,9 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd, bool removeFromCache) { pCmd->autoCreated = 0; for(int32_t i = 0; i < pCmd->numOfTables; ++i) { - taosCacheRelease(tscMetaCache, (void**)&(pCmd->pTableMetaList[i]), false); + if (pCmd->pTableMetaList[i] != NULL) { + taosCacheRelease(tscMetaCache, (void**)&(pCmd->pTableMetaList[i]), false); + } } pCmd->numOfTables = 0; From 112ebfa12b077b79bfe036bef605b36167aa7c54 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 12 Dec 2020 23:13:21 +0800 Subject: [PATCH 10/11] [TD-2433]: fix the bug that server_status() not working. --- src/client/inc/tsclient.h | 4 ++-- src/client/src/tscLocal.c | 38 ++++++++++++++++++++++--------------- src/client/src/tscServer.c | 39 +++++++++++++++++++++++++++++++++----- 3 files changed, 59 insertions(+), 22 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index bdb35cb072..f2a9b9db43 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -285,8 +285,8 @@ typedef struct { char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t) SColumnIndex* pColumnIndex; - SArithmeticSupport* pArithSup; // support the arithmetic expression calculation on agg functions - struct SLocalReducer* pLocalReducer; + SArithmeticSupport *pArithSup; // support the arithmetic expression calculation on agg functions + struct SLocalReducer *pLocalReducer; } SSqlRes; typedef struct STscObj { diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 4c28adc261..ab52ef396b 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -46,7 +46,8 @@ typedef struct SCreateBuilder { SSqlObj *pInterSql; int32_t (*fp)(void *para, char* result); Stage callStage; -} SCreateBuilder; +} SCreateBuilder; + static void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnName, int16_t type, size_t valueLength); static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) { @@ -207,10 +208,7 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) { const int32_t TYPE_COLUMN_LENGTH = 16; const int32_t NOTE_COLUMN_MIN_LENGTH = 8; - int32_t noteFieldLen = NOTE_COLUMN_MIN_LENGTH;//tscMaxLengthOfTagsFields(pSql); -// if (noteFieldLen == 0) { -// noteFieldLen = NOTE_COLUMN_MIN_LENGTH; -// } + int32_t noteFieldLen = NOTE_COLUMN_MIN_LENGTH; int32_t rowLen = tscBuildTableSchemaResultFields(pSql, NUM_OF_DESC_TABLE_COLUMNS, TYPE_COLUMN_LENGTH, noteFieldLen); tscFieldInfoUpdateOffset(pQueryInfo); @@ -822,26 +820,36 @@ static int32_t tscProcessClientVer(SSqlObj *pSql) { } +// TODO add test cases. +static int32_t checkForOnlineNode(SSqlObj* pSql) { + int32_t* data = pSql->res.length; + + int32_t total = data[0]; + int32_t online = data[1]; + return (online < total)? TSDB_CODE_RPC_NETWORK_UNAVAIL:TSDB_CODE_SUCCESS; +} + static int32_t tscProcessServStatus(SSqlObj *pSql) { STscObj* pObj = pSql->pTscObj; SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid); if (pHb != NULL) { - int32_t code = pHb->res.code; + pSql->res.code = pHb->res.code; taosReleaseRef(tscObjRef, pObj->hbrid); - if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { - pSql->res.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - return pSql->res.code; - } - } else { - if (pSql->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { - return pSql->res.code; - } + } + + if (pSql->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + return pSql->res.code; + } + + pSql->res.code = checkForOnlineNode(pHb); + if (pSql->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + return pSql->res.code; } SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); + int32_t val = 1; tscSetLocalQueryResult(pSql, (char*) &val, pExpr->aliasName, TSDB_DATA_TYPE_INT, sizeof(int32_t)); return TSDB_CODE_SUCCESS; diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 994dace1e3..f450f4aa40 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -147,15 +147,15 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { SSqlObj *pSql = tres; SSqlRes *pRes = &pSql->res; - if (code == 0) { + if (code == TSDB_CODE_SUCCESS) { SHeartBeatRsp *pRsp = (SHeartBeatRsp *)pRes->pRsp; - SRpcEpSet * epSet = &pRsp->epSet; + SRpcEpSet *epSet = &pRsp->epSet; if (epSet->numOfEps > 0) { tscEpSetHtons(epSet); if (!tscEpSetIsEqual(&pSql->pTscObj->tscCorMgmtEpSet->epSet, epSet)) { tscTrace("%p updating epset: numOfEps: %d, inUse: %d", pSql, epSet->numOfEps, epSet->inUse); for (int8_t i = 0; i < epSet->numOfEps; i++) { - tscTrace("endpoint %d: fqdn = %s, port=%d", i, epSet->fqdn[i], epSet->port[i]); + tscTrace("endpoint %d: fqdn=%s, port=%d", i, epSet->fqdn[i], epSet->port[i]); } tscUpdateMgmtEpSet(pSql, epSet); } @@ -167,11 +167,40 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { tscKillConnection(pObj); return; } else { - if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId)); - if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId)); + if (pRsp->queryId) { + tscKillQuery(pObj, htonl(pRsp->queryId)); + } + + if (pRsp->streamId) { + tscKillStream(pObj, htonl(pRsp->streamId)); + } } + + int32_t total = htonl(pRsp->totalDnodes); + int32_t online = htonl(pRsp->onlineDnodes); + assert(online <= total); + + if (online < total) { + tscError("HB:%p, total dnode:%d, online dnode:%d", pSql, total, online); + pSql->res.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + } + + if (pRes->buffer == NULL) { + pRes->length = calloc(2, sizeof(int32_t)); + } + + pRes->length[0] = total; + pRes->length[1] = online; } else { tscDebug("%" PRId64 " heartbeat failed, code:%s", pObj->hbrid, tstrerror(code)); + if (pRes->buffer == NULL) { + pRes->length = calloc(2, sizeof(int32_t)); + } + + pRes->length[1] = 0; + if (pRes->length[0] == 0) { + pRes->length[0] = 1; // make sure that the value of the total node is greater than the online node + } } if (pObj->hbrid != 0) { From 261adee98a8ffe376aeafb4998e990a04afd6652 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 13 Dec 2020 22:19:01 +0800 Subject: [PATCH 11/11] [TD-2433]: fix the bug that server_status() not working. --- src/client/src/tscLocal.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index ab52ef396b..dfc0e3af4e 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -823,6 +823,9 @@ static int32_t tscProcessClientVer(SSqlObj *pSql) { // TODO add test cases. static int32_t checkForOnlineNode(SSqlObj* pSql) { int32_t* data = pSql->res.length; + if (data == NULL) { + return TSDB_CODE_SUCCESS; + } int32_t total = data[0]; int32_t online = data[1];