diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index bdb35cb072..f2a9b9db43 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -285,8 +285,8 @@ typedef struct { char ** buffer; // Buffer used to put multibytes encoded using unicode (wchar_t) SColumnIndex* pColumnIndex; - SArithmeticSupport* pArithSup; // support the arithmetic expression calculation on agg functions - struct SLocalReducer* pLocalReducer; + SArithmeticSupport *pArithSup; // support the arithmetic expression calculation on agg functions + struct SLocalReducer *pLocalReducer; } SSqlRes; typedef struct STscObj { diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 62e55c033f..7560d94242 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -2222,7 +2222,8 @@ static void buildTopBotStruct(STopBotInfo *pTopBotInfo, SQLFunctionCtx *pCtx) { tmp += POINTER_BYTES * pCtx->param[0].i64Key; size_t size = sizeof(tValuePair) + pCtx->tagInfo.tagsLen; - +// assert(pCtx->param[0].i64Key > 0); + for (int32_t i = 0; i < pCtx->param[0].i64Key; ++i) { pTopBotInfo->res[i] = (tValuePair*) tmp; pTopBotInfo->res[i]->pTags = tmp + sizeof(tValuePair); diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index bd444a1231..192af4dbdf 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -46,7 +46,8 @@ typedef struct SCreateBuilder { SSqlObj *pInterSql; int32_t (*fp)(void *para, char* result); Stage callStage; -} SCreateBuilder; +} SCreateBuilder; + static void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnName, int16_t type, size_t valueLength); static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) { @@ -207,10 +208,7 @@ static int32_t tscProcessDescribeTable(SSqlObj *pSql) { const int32_t TYPE_COLUMN_LENGTH = 16; const int32_t NOTE_COLUMN_MIN_LENGTH = 8; - int32_t noteFieldLen = NOTE_COLUMN_MIN_LENGTH;//tscMaxLengthOfTagsFields(pSql); -// if (noteFieldLen == 0) { -// noteFieldLen = NOTE_COLUMN_MIN_LENGTH; -// } + int32_t noteFieldLen = NOTE_COLUMN_MIN_LENGTH; int32_t rowLen = tscBuildTableSchemaResultFields(pSql, NUM_OF_DESC_TABLE_COLUMNS, TYPE_COLUMN_LENGTH, noteFieldLen); tscFieldInfoUpdateOffset(pQueryInfo); @@ -822,26 +820,39 @@ static int32_t tscProcessClientVer(SSqlObj *pSql) { } +// TODO add test cases. +static int32_t checkForOnlineNode(SSqlObj* pSql) { + int32_t* data = pSql->res.length; + if (data == NULL) { + return TSDB_CODE_SUCCESS; + } + + int32_t total = data[0]; + int32_t online = data[1]; + return (online < total)? TSDB_CODE_RPC_NETWORK_UNAVAIL:TSDB_CODE_SUCCESS; +} + static int32_t tscProcessServStatus(SSqlObj *pSql) { STscObj* pObj = pSql->pTscObj; SSqlObj* pHb = (SSqlObj*)taosAcquireRef(tscObjRef, pObj->hbrid); if (pHb != NULL) { - int32_t code = pHb->res.code; + pSql->res.code = pHb->res.code; taosReleaseRef(tscObjRef, pObj->hbrid); - if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { - pSql->res.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - return pSql->res.code; - } - } else { - if (pSql->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { - return pSql->res.code; - } + } + + if (pSql->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + return pSql->res.code; + } + + pSql->res.code = checkForOnlineNode(pHb); + if (pSql->res.code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + return pSql->res.code; } SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0); + int32_t val = 1; tscSetLocalQueryResult(pSql, (char*) &val, pExpr->aliasName, TSDB_DATA_TYPE_INT, sizeof(int32_t)); return TSDB_CODE_SUCCESS; diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 8e4ef91e27..1cfbea4cc4 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -6562,7 +6562,7 @@ int32_t doCheckForQuery(SSqlObj* pSql, SQuerySQL* pQuerySql, int32_t index) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg5); } - if (pQueryInfo->interval.interval > 0 && pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit != 'y') { + if (pQueryInfo->interval.interval > 0) { bool initialWindows = TSWINDOW_IS_EQUAL(pQueryInfo->window, TSWINDOW_INITIALIZER); if (initialWindows) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index a9f64e0764..0e3dc3efd5 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -147,15 +147,15 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { SSqlObj *pSql = tres; SSqlRes *pRes = &pSql->res; - if (code == 0) { + if (code == TSDB_CODE_SUCCESS) { SHeartBeatRsp *pRsp = (SHeartBeatRsp *)pRes->pRsp; - SRpcEpSet * epSet = &pRsp->epSet; + SRpcEpSet *epSet = &pRsp->epSet; if (epSet->numOfEps > 0) { tscEpSetHtons(epSet); if (!tscEpSetIsEqual(&pSql->pTscObj->tscCorMgmtEpSet->epSet, epSet)) { tscTrace("%p updating epset: numOfEps: %d, inUse: %d", pSql, epSet->numOfEps, epSet->inUse); for (int8_t i = 0; i < epSet->numOfEps; i++) { - tscTrace("endpoint %d: fqdn = %s, port=%d", i, epSet->fqdn[i], epSet->port[i]); + tscTrace("endpoint %d: fqdn=%s, port=%d", i, epSet->fqdn[i], epSet->port[i]); } tscUpdateMgmtEpSet(pSql, epSet); } @@ -167,11 +167,40 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) { tscKillConnection(pObj); return; } else { - if (pRsp->queryId) tscKillQuery(pObj, htonl(pRsp->queryId)); - if (pRsp->streamId) tscKillStream(pObj, htonl(pRsp->streamId)); + if (pRsp->queryId) { + tscKillQuery(pObj, htonl(pRsp->queryId)); + } + + if (pRsp->streamId) { + tscKillStream(pObj, htonl(pRsp->streamId)); + } } + + int32_t total = htonl(pRsp->totalDnodes); + int32_t online = htonl(pRsp->onlineDnodes); + assert(online <= total); + + if (online < total) { + tscError("HB:%p, total dnode:%d, online dnode:%d", pSql, total, online); + pSql->res.code = TSDB_CODE_RPC_NETWORK_UNAVAIL; + } + + if (pRes->buffer == NULL) { + pRes->length = calloc(2, sizeof(int32_t)); + } + + pRes->length[0] = total; + pRes->length[1] = online; } else { tscDebug("%" PRId64 " heartbeat failed, code:%s", pObj->hbrid, tstrerror(code)); + if (pRes->buffer == NULL) { + pRes->length = calloc(2, sizeof(int32_t)); + } + + pRes->length[1] = 0; + if (pRes->length[0] == 0) { + pRes->length[0] = 1; // make sure that the value of the total node is greater than the online node + } } if (pObj->hbrid != 0) { diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index d2eb16795f..1e95380096 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -2279,7 +2279,6 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) */ int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) { assert(pSql != NULL && pSql->param != NULL); -// SSqlCmd* pCmd = &pSql->cmd; SSqlRes* pRes = &pSql->res; SInsertSupporter* pSupporter = (SInsertSupporter*) pSql->param; @@ -2288,9 +2287,6 @@ int32_t tscHandleInsertRetry(SSqlObj* pParent, SSqlObj* pSql) { STableDataBlocks* pTableDataBlock = taosArrayGetP(pParent->cmd.pDataBlocks, pSupporter->index); int32_t code = tscCopyDataBlockToPayload(pSql, pTableDataBlock); - // free the data block created from insert sql string -// pCmd->pDataBlocks = tscDestroyBlockArrayList(pParent->cmd.pDataBlocks); - if ((pRes->code = code)!= TSDB_CODE_SUCCESS) { tscQueueAsyncRes(pSql); return code; // here the pSql may have been released already. diff --git a/src/inc/ttype.h b/src/inc/ttype.h index 7d5779c43f..3dd0c58ae2 100644 --- a/src/inc/ttype.h +++ b/src/inc/ttype.h @@ -8,25 +8,26 @@ extern "C" { #include "taosdef.h" #define GET_TYPED_DATA(_v, _finalType, _type, _data) \ - switch (_type) { \ - case TSDB_DATA_TYPE_TINYINT: \ + switch (_type) { \ + case TSDB_DATA_TYPE_BOOL: \ + case TSDB_DATA_TYPE_TINYINT: \ (_v) = (_finalType)GET_INT8_VAL(_data); \ - break; \ - case TSDB_DATA_TYPE_SMALLINT: \ + break; \ + case TSDB_DATA_TYPE_SMALLINT: \ (_v) = (_finalType)GET_INT16_VAL(_data); \ - break; \ - case TSDB_DATA_TYPE_BIGINT: \ + break; \ + case TSDB_DATA_TYPE_BIGINT: \ (_v) = (_finalType)(GET_INT64_VAL(_data)); \ - break; \ - case TSDB_DATA_TYPE_FLOAT: \ + break; \ + case TSDB_DATA_TYPE_FLOAT: \ (_v) = (_finalType)GET_FLOAT_VAL(_data); \ - break; \ - case TSDB_DATA_TYPE_DOUBLE: \ + break; \ + case TSDB_DATA_TYPE_DOUBLE: \ (_v) = (_finalType)GET_DOUBLE_VAL(_data); \ - break; \ - default: \ + break; \ + default: \ (_v) = (_finalType)GET_INT32_VAL(_data); \ - break; \ + break; \ }; #ifdef __cplusplus diff --git a/src/query/inc/qUtil.h b/src/query/inc/qUtil.h index fb71c8a5fe..dde2e39845 100644 --- a/src/query/inc/qUtil.h +++ b/src/query/inc/qUtil.h @@ -24,35 +24,34 @@ #define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t)) -int32_t getOutputInterResultBufSize(SQuery* pQuery); - -void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pRow, int16_t type); -void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src, int16_t type); -SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index); - -int32_t initWindowResInfo(SResultRowInfo* pWindowResInfo, int32_t size, int16_t type); - -void cleanupTimeWindowInfo(SResultRowInfo* pWindowResInfo); -void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pWindowResInfo); -void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num); - -void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv); -int32_t numOfClosedTimeWindow(SResultRowInfo* pWindowResInfo); -void closeTimeWindow(SResultRowInfo* pWindowResInfo, int32_t slot); -void closeAllTimeWindow(SResultRowInfo* pWindowResInfo); -void removeRedundantWindow(SResultRowInfo *pWindowResInfo, TSKEY lastKey, int32_t order); - -static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pWindowResInfo, int32_t slot) { - assert(pWindowResInfo != NULL && slot >= 0 && slot < pWindowResInfo->size); - return pWindowResInfo->pResult[slot]; -} - #define curTimeWindowIndex(_winres) ((_winres)->curIndex) #define GET_ROW_PARAM_FOR_MULTIOUTPUT(_q, tbq, sq) (((tbq) && (!sq))? (_q)->pExpr1[1].base.arg->argValue.i64:1) -bool isWindowResClosed(SResultRowInfo *pWindowResInfo, int32_t slot); +int32_t getOutputInterResultBufSize(SQuery* pQuery); + +size_t getResultRowSize(SQueryRuntimeEnv* pRuntimeEnv); +int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size, int16_t type); +void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); + +void resetResultRowInfo(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo); +void popFrontResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int32_t num); +void clearClosedResultRows(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo *pResultRowInfo); +int32_t numOfClosedResultRows(SResultRowInfo* pResultRowInfo); +void closeAllResultRows(SResultRowInfo* pResultRowInfo); +void removeRedundantResultRows(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order); int32_t initResultRow(SResultRow *pResultRow); +void closeResultRow(SResultRowInfo* pResultRowInfo, int32_t slot); +bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot); +void clearResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* pResultRow, int16_t type); +void copyResultRow(SQueryRuntimeEnv* pRuntimeEnv, SResultRow* dst, const SResultRow* src, int16_t type); + +SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRow* pRow, int32_t index); + +static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) { + assert(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size); + return pResultRowInfo->pResult[slot]; +} static FORCE_INLINE char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SResultRow *pResult, tFilePage* page) { @@ -71,8 +70,6 @@ bool notNull_filter(SColumnFilterElem *pFilter, char* minval, char* maxval); __filter_func_t *getRangeFilterFuncArray(int32_t type); __filter_func_t *getValueFilterFuncArray(int32_t type); -size_t getWindowResultSize(SQueryRuntimeEnv* pRuntimeEnv); - SResultRowPool* initResultRowPool(size_t size); SResultRow* getNewResultRow(SResultRowPool* p); int64_t getResultRowPoolMemSize(SResultRowPool* p); diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 58f05c9d9d..1f07e2bf6a 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -464,13 +464,13 @@ static bool hasNullValue(SColIndex* pColIndex, SDataStatis *pStatis, SDataStatis return true; } -static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pWindowResInfo, char *pData, +static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData, int16_t bytes, bool masterscan, uint64_t uid) { SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid); int32_t *p1 = (int32_t *)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); if (p1 != NULL) { - pWindowResInfo->curIndex = *p1; + pResultRowInfo->curIndex = *p1; } else { if (!masterscan) { // not master scan, do not add new timewindow return NULL; @@ -478,46 +478,46 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes // TODO refactor // more than the capacity, reallocate the resources - if (pWindowResInfo->size >= pWindowResInfo->capacity) { + if (pResultRowInfo->size >= pResultRowInfo->capacity) { int64_t newCapacity = 0; - if (pWindowResInfo->capacity > 10000) { - newCapacity = (int64_t)(pWindowResInfo->capacity * 1.25); + if (pResultRowInfo->capacity > 10000) { + newCapacity = (int64_t)(pResultRowInfo->capacity * 1.25); } else { - newCapacity = (int64_t)(pWindowResInfo->capacity * 1.5); + newCapacity = (int64_t)(pResultRowInfo->capacity * 1.5); } - char *t = realloc(pWindowResInfo->pResult, (size_t)(newCapacity * POINTER_BYTES)); + char *t = realloc(pResultRowInfo->pResult, (size_t)(newCapacity * POINTER_BYTES)); if (t == NULL) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - pWindowResInfo->pResult = (SResultRow **)t; + pResultRowInfo->pResult = (SResultRow **)t; - int32_t inc = (int32_t)newCapacity - pWindowResInfo->capacity; - memset(&pWindowResInfo->pResult[pWindowResInfo->capacity], 0, POINTER_BYTES * inc); + int32_t inc = (int32_t)newCapacity - pResultRowInfo->capacity; + memset(&pResultRowInfo->pResult[pResultRowInfo->capacity], 0, POINTER_BYTES * inc); - pWindowResInfo->capacity = (int32_t)newCapacity; + pResultRowInfo->capacity = (int32_t)newCapacity; } SResultRow *pResult = getNewResultRow(pRuntimeEnv->pool); - pWindowResInfo->pResult[pWindowResInfo->size] = pResult; + pResultRowInfo->pResult[pResultRowInfo->size] = pResult; int32_t ret = initResultRow(pResult); if (ret != TSDB_CODE_SUCCESS) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } // add a new result set for a new group - pWindowResInfo->curIndex = pWindowResInfo->size++; + pResultRowInfo->curIndex = pResultRowInfo->size++; taosHashPut(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), - (char *)&pWindowResInfo->curIndex, sizeof(int32_t)); + (char *)&pResultRowInfo->curIndex, sizeof(int32_t)); } // too many time window in query - if (pWindowResInfo->size > MAX_INTERVAL_TIME_WINDOW) { + if (pResultRowInfo->size > MAX_INTERVAL_TIME_WINDOW) { longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW); } - return getResultRow(pWindowResInfo, pWindowResInfo->curIndex); + return getResultRow(pResultRowInfo, pResultRowInfo->curIndex); } // get the correct time window according to the handled timestamp @@ -614,14 +614,14 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf return 0; } -static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pWindowResInfo, SDataBlockInfo* pBockInfo, +static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, SDataBlockInfo* pBockInfo, STimeWindow *win, bool masterscan, bool* newWind, SResultRow** pResult) { assert(win->skey <= win->ekey); SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; // todo refactor int64_t uid = getResultInfoUId(pRuntimeEnv); - SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, uid); + SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, uid); if (pResultRow == NULL) { *newWind = false; @@ -717,7 +717,7 @@ static int32_t updateResultRowCurrentIndex(SResultRowInfo* pWindowResInfo, TSKEY TSKEY ekey = pResult->win.ekey; if ((ekey <= lastKey && ascQuery) || (pResult->win.skey >= lastKey && !ascQuery)) { - closeTimeWindow(pWindowResInfo, i); + closeResultRow(pWindowResInfo, i); } else { skey = pResult->win.skey; break; @@ -751,7 +751,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe // query completed if ((lastKey >= pQuery->current->win.ekey && ascQuery) || (lastKey <= pQuery->current->win.ekey && (!ascQuery))) { - closeAllTimeWindow(pWindowResInfo); + closeAllResultRows(pWindowResInfo); pWindowResInfo->curIndex = pWindowResInfo->size - 1; setQueryStatus(pQuery, QUERY_COMPLETED | QUERY_RESBUF_FULL); @@ -1351,14 +1351,7 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat } int64_t v = -1; - switch(type) { - case TSDB_DATA_TYPE_BOOL: - case TSDB_DATA_TYPE_TINYINT: v = GET_INT8_VAL(pData); break; - case TSDB_DATA_TYPE_SMALLINT: v = GET_INT16_VAL(pData); break; - case TSDB_DATA_TYPE_INT: v = GET_INT32_VAL(pData); break; - case TSDB_DATA_TYPE_BIGINT: v = GET_INT64_VAL(pData); break; - } - + GET_TYPED_DATA(v, int64_t, type, pData); if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { if (pResultRow->key == NULL) { pResultRow->key = malloc(varDataTLen(pData)); @@ -1790,7 +1783,7 @@ static int32_t tableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBl if (QUERY_IS_INTERVAL_QUERY(pQuery)) { numOfRes = doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); } else if (pRuntimeEnv->groupbyNormalCol) { - closeAllTimeWindow(pWindowResInfo); + closeAllResultRows(pWindowResInfo); numOfRes = pWindowResInfo->size; } else { // projection query numOfRes = (int32_t)getNumOfResult(pRuntimeEnv); @@ -2094,7 +2087,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv); qDebug("QInfo:%p teardown runtime env", pQInfo); - cleanupTimeWindowInfo(&pRuntimeEnv->windowResInfo); + cleanupResultRowInfo(&pRuntimeEnv->windowResInfo); if (pRuntimeEnv->pCtx != NULL) { for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { @@ -2928,7 +2921,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { if (QUERY_IS_INTERVAL_QUERY(pQuery) && (IS_MASTER_SCAN(pRuntimeEnv)|| pRuntimeEnv->scanFlag == REPEAT_SCAN)) { if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) { - closeAllTimeWindow(&pRuntimeEnv->windowResInfo); + closeAllResultRows(&pRuntimeEnv->windowResInfo); pRuntimeEnv->windowResInfo.curIndex = pRuntimeEnv->windowResInfo.size - 1; // point to the last time window } else { assert(Q_STATUS_EQUAL(pQuery->status, QUERY_RESBUF_FULL)); @@ -3707,8 +3700,8 @@ void switchCtxOrder(SQueryRuntimeEnv *pRuntimeEnv) { int32_t initResultRow(SResultRow *pResultRow) { pResultRow->pCellInfo = (SResultRowCellInfo*)((char*)pResultRow + sizeof(SResultRow)); - pResultRow->pageId = -1; - pResultRow->rowId = -1; + pResultRow->pageId = -1; + pResultRow->rowId = -1; return TSDB_CODE_SUCCESS; } @@ -4057,12 +4050,12 @@ void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) { // for each group result, call the finalize function for each column SResultRowInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; if (pRuntimeEnv->groupbyNormalCol) { - closeAllTimeWindow(pWindowResInfo); + closeAllResultRows(pWindowResInfo); } for (int32_t i = 0; i < pWindowResInfo->size; ++i) { SResultRow *buf = pWindowResInfo->pResult[i]; - if (!isWindowResClosed(pWindowResInfo, i)) { + if (!isResultRowClosed(pWindowResInfo, i)) { continue; } @@ -4112,7 +4105,7 @@ static STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, void // set more initial size of interval/groupby query if (QUERY_IS_INTERVAL_QUERY(pQuery) || pRuntimeEnv->groupbyNormalCol) { int32_t initialSize = 128; - int32_t code = initWindowResInfo(&pTableQueryInfo->windowResInfo, initialSize, TSDB_DATA_TYPE_INT); + int32_t code = initResultRowInfo(&pTableQueryInfo->windowResInfo, initialSize, TSDB_DATA_TYPE_INT); if (code != TSDB_CODE_SUCCESS) { return NULL; } @@ -4128,7 +4121,7 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) { } tVariantDestroy(&pTableQueryInfo->tag); - cleanupTimeWindowInfo(&pTableQueryInfo->windowResInfo); + cleanupResultRowInfo(&pTableQueryInfo->windowResInfo); } /** @@ -4360,7 +4353,7 @@ static int32_t doCopyToSData(SQInfo *pQInfo, SResultRowInfo *pResultInfo, int32_ int32_t step = -1; qDebug("QInfo:%p start to copy data from windowResInfo to query buf", pQInfo); - int32_t totalSet = numOfClosedTimeWindow(pResultInfo); + int32_t totalSet = numOfClosedResultRows(pResultInfo); SResultRow** result = pResultInfo->pResult; if (orderType == TSDB_ORDER_ASC) { @@ -4481,7 +4474,7 @@ static void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBloc // TODO refactor if ((pTableQueryInfo->lastKey >= pTableQueryInfo->win.ekey && ascQuery) || (pTableQueryInfo->lastKey <= pTableQueryInfo->win.ekey && (!ascQuery))) { - closeAllTimeWindow(pWindowResInfo); + closeAllResultRows(pWindowResInfo); pWindowResInfo->curIndex = pWindowResInfo->size - 1; } else { updateResultRowCurrentIndex(pWindowResInfo, pTableQueryInfo->lastKey, ascQuery); @@ -5031,7 +5024,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo type = TSDB_DATA_TYPE_INT; // group id } - code = initWindowResInfo(&pRuntimeEnv->windowResInfo, 8, type); + code = initResultRowInfo(&pRuntimeEnv->windowResInfo, 8, type); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -5051,7 +5044,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo type = TSDB_DATA_TYPE_TIMESTAMP; } - code = initWindowResInfo(&pRuntimeEnv->windowResInfo, numOfResultRows, type); + code = initResultRowInfo(&pRuntimeEnv->windowResInfo, numOfResultRows, type); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -5479,7 +5472,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { pQInfo->groupIndex = currentGroupIndex; // restore the group index assert(pQuery->rec.rows == pWindowResInfo->size); - clearClosedTimeWindow(pRuntimeEnv); + clearClosedResultRows(pRuntimeEnv, &pRuntimeEnv->windowResInfo); break; } } else if (pRuntimeEnv->queryWindowIdentical && pRuntimeEnv->pTSBuf == NULL && !isTSCompQuery(pQuery)) { @@ -5641,7 +5634,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) { } resetDefaultResInfoOutputBuf(pRuntimeEnv); - resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo); + resetResultRowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo); SArray *group = GET_TABLEGROUP(pQInfo, 0); assert(taosArrayGetSize(group) == pQInfo->tableqinfoGroupInfo.numOfTables && @@ -5796,11 +5789,11 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) { size_t num = taosArrayGetSize(group); for (int32_t j = 0; j < num; ++j) { STableQueryInfo* item = taosArrayGetP(group, j); - closeAllTimeWindow(&item->windowResInfo); + closeAllResultRows(&item->windowResInfo); } } } else { // close results for group result - closeAllTimeWindow(&pQInfo->runtimeEnv.windowResInfo); + closeAllResultRows(&pQInfo->runtimeEnv.windowResInfo); } } @@ -6048,10 +6041,10 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) if ((pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) && pQuery->limit.offset > 0 && pQuery->fillType == TSDB_FILL_NONE) { // maxOutput <= 0, means current query does not generate any results - int32_t numOfClosed = numOfClosedTimeWindow(&pRuntimeEnv->windowResInfo); + int32_t numOfClosed = numOfClosedResultRows(&pRuntimeEnv->windowResInfo); int32_t c = (int32_t)(MIN(numOfClosed, pQuery->limit.offset)); - clearFirstNWindowRes(pRuntimeEnv, c); + popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, c); pQuery->limit.offset -= c; } @@ -6088,7 +6081,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { pQuery->rec.rows = 0; copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo); - clearFirstNWindowRes(pRuntimeEnv, pQInfo->groupIndex); + popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQInfo->groupIndex); } // no result generated, abort @@ -6121,16 +6114,16 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) { // all data scanned, the group by normal column can return if (pRuntimeEnv->groupbyNormalCol) { // todo refactor with merge interval time result // maxOutput <= 0, means current query does not generate any results - int32_t numOfClosed = numOfClosedTimeWindow(&pRuntimeEnv->windowResInfo); + int32_t numOfClosed = numOfClosedResultRows(&pRuntimeEnv->windowResInfo); if ((pQuery->limit.offset > 0 && pQuery->limit.offset < numOfClosed) || pQuery->limit.offset == 0) { // skip offset result rows - clearFirstNWindowRes(pRuntimeEnv, (int32_t) pQuery->limit.offset); + popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (int32_t) pQuery->limit.offset); pQuery->rec.rows = 0; pQInfo->groupIndex = 0; copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo); - clearFirstNWindowRes(pRuntimeEnv, pQInfo->groupIndex); + popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQInfo->groupIndex); doSecondaryArithmeticProcess(pQuery); limitResults(pRuntimeEnv); @@ -6164,7 +6157,7 @@ static void tableQueryImpl(SQInfo *pQInfo) { if (pRuntimeEnv->windowResInfo.size > 0) { copyFromWindowResToSData(pQInfo, &pRuntimeEnv->windowResInfo); - clearFirstNWindowRes(pRuntimeEnv, pQInfo->groupIndex); + popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pQInfo->groupIndex); if (pQuery->rec.rows > 0) { qDebug("QInfo:%p %"PRId64" rows returned from group results, total:%"PRId64"", pQInfo, pQuery->rec.rows, pQuery->rec.total); @@ -7029,7 +7022,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou pQInfo->runtimeEnv.pResultRowHashTable = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pQInfo->runtimeEnv.keyBuf = malloc(TSDB_MAX_BYTES_PER_ROW); - pQInfo->runtimeEnv.pool = initResultRowPool(getWindowResultSize(&pQInfo->runtimeEnv)); + pQInfo->runtimeEnv.pool = initResultRowPool(getResultRowSize(&pQInfo->runtimeEnv)); pQInfo->runtimeEnv.prevRow = malloc(POINTER_BYTES * pQuery->numOfCols + srcSize); char* start = POINTER_BYTES * pQuery->numOfCols + (char*) pQInfo->runtimeEnv.prevRow; diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 6c845b012f..65ac60e91f 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -43,7 +43,7 @@ int32_t getOutputInterResultBufSize(SQuery* pQuery) { return size; } -int32_t initWindowResInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t type) { +int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t type) { pResultRowInfo->capacity = size; pResultRowInfo->type = type; @@ -59,10 +59,11 @@ int32_t initWindowResInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t return TSDB_CODE_SUCCESS; } -void cleanupTimeWindowInfo(SResultRowInfo *pResultRowInfo) { +void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) { if (pResultRowInfo == NULL) { return; } + if (pResultRowInfo->capacity == 0) { assert(pResultRowInfo->pResult == NULL); return; @@ -77,7 +78,7 @@ void cleanupTimeWindowInfo(SResultRowInfo *pResultRowInfo) { tfree(pResultRowInfo->pResult); } -void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) { +void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) { if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0) { return; } @@ -100,13 +101,12 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultR pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL; } -void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { - SResultRowInfo *pResultRowInfo = &pRuntimeEnv->windowResInfo; +void popFrontResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int32_t num) { if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0 || pResultRowInfo->size == 0 || num == 0) { return; } - int32_t numOfClosed = numOfClosedTimeWindow(pResultRowInfo); + int32_t numOfClosed = numOfClosedResultRows(pResultRowInfo); assert(num >= 0 && num <= numOfClosed); int16_t type = pResultRowInfo->type; @@ -159,17 +159,16 @@ void clearFirstNWindowRes(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { pResultRowInfo->curIndex = -1; } -void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) { - SResultRowInfo *pResultRowInfo = &pRuntimeEnv->windowResInfo; +void clearClosedResultRows(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo) { if (pResultRowInfo == NULL || pResultRowInfo->capacity == 0 || pResultRowInfo->size == 0) { return; } - int32_t numOfClosed = numOfClosedTimeWindow(pResultRowInfo); - clearFirstNWindowRes(pRuntimeEnv, numOfClosed); + int32_t numOfClosed = numOfClosedResultRows(pResultRowInfo); + popFrontResultRow(pRuntimeEnv, &pRuntimeEnv->windowResInfo, numOfClosed); } -int32_t numOfClosedTimeWindow(SResultRowInfo *pResultRowInfo) { +int32_t numOfClosedResultRows(SResultRowInfo *pResultRowInfo) { int32_t i = 0; while (i < pResultRowInfo->size && pResultRowInfo->pResult[i]->closed) { ++i; @@ -178,7 +177,7 @@ int32_t numOfClosedTimeWindow(SResultRowInfo *pResultRowInfo) { return i; } -void closeAllTimeWindow(SResultRowInfo *pResultRowInfo) { +void closeAllResultRows(SResultRowInfo *pResultRowInfo) { assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size); for (int32_t i = 0; i < pResultRowInfo->size; ++i) { @@ -195,7 +194,7 @@ void closeAllTimeWindow(SResultRowInfo *pResultRowInfo) { * the last qualified time stamp in case of sliding query, which the sliding time is not equalled to the interval time. * NOTE: remove redundant, only when the result set order equals to traverse order */ -void removeRedundantWindow(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order) { +void removeRedundantResultRows(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_t order) { assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size); if (pResultRowInfo->size <= 1) { return; @@ -224,27 +223,27 @@ void removeRedundantWindow(SResultRowInfo *pResultRowInfo, TSKEY lastKey, int32_ } } -bool isWindowResClosed(SResultRowInfo *pResultRowInfo, int32_t slot) { +bool isResultRowClosed(SResultRowInfo *pResultRowInfo, int32_t slot) { return (getResultRow(pResultRowInfo, slot)->closed == true); } -void closeTimeWindow(SResultRowInfo *pResultRowInfo, int32_t slot) { +void closeResultRow(SResultRowInfo *pResultRowInfo, int32_t slot) { getResultRow(pResultRowInfo, slot)->closed = true; } -void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pWindowRes, int16_t type) { - if (pWindowRes == NULL) { +void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16_t type) { + if (pResultRow == NULL) { return; } // the result does not put into the SDiskbasedResultBuf, ignore it. - if (pWindowRes->pageId >= 0) { - tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pWindowRes->pageId); + if (pResultRow->pageId >= 0) { + tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId); for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutput; ++i) { - SResultRowCellInfo *pResultInfo = &pWindowRes->pCellInfo[i]; + SResultRowCellInfo *pResultInfo = &pResultRow->pCellInfo[i]; - char * s = getPosInResultPage(pRuntimeEnv, i, pWindowRes, page); + char * s = getPosInResultPage(pRuntimeEnv, i, pResultRow, page); size_t size = pRuntimeEnv->pQuery->pExpr1[i].bytes; memset(s, 0, size); @@ -252,15 +251,15 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pWindowRes, int16 } } - pWindowRes->numOfRows = 0; - pWindowRes->pageId = -1; - pWindowRes->rowId = -1; - pWindowRes->closed = false; + pResultRow->numOfRows = 0; + pResultRow->pageId = -1; + pResultRow->rowId = -1; + pResultRow->closed = false; if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { - tfree(pWindowRes->key); + tfree(pResultRow->key); } else { - pWindowRes->win = TSWINDOW_INITIALIZER; + pResultRow->win = TSWINDOW_INITIALIZER; } } @@ -310,7 +309,7 @@ SResultRowCellInfo* getResultCell(SQueryRuntimeEnv* pRuntimeEnv, const SResultRo return (SResultRowCellInfo*)((char*) pRow->pCellInfo + pRuntimeEnv->rowCellInfoOffset[index]); } -size_t getWindowResultSize(SQueryRuntimeEnv* pRuntimeEnv) { +size_t getResultRowSize(SQueryRuntimeEnv* pRuntimeEnv) { return (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultRowCellInfo)) + pRuntimeEnv->interBufSize + sizeof(SResultRow); } diff --git a/tests/script/general/parser/fill.sim b/tests/script/general/parser/fill.sim index 9851a4e7fc..405a805312 100644 --- a/tests/script/general/parser/fill.sim +++ b/tests/script/general/parser/fill.sim @@ -848,10 +848,7 @@ if $rows != 12 then return -1 endi -print =====================>td-1442 -sql_error select count(*) from m_fl_tb0 interval(1s) fill(prev); - -print =====================> aggregation + arithmetic + fill +print =====================> aggregation + arithmetic + fill, need to add cases TODO #sql select avg(cpu_taosd) - first(cpu_taosd) from dn1 where ts<'2020-11-13 11:00:00' and ts>'2020-11-13 10:50:00' interval(10s) fill(value, 99) #sql select count(*), first(k), avg(k), avg(k)-first(k) from tm0 where ts>'2020-1-1 1:1:1' and ts<'2020-1-1 1:02:59' interval(10s) fill(value, 99); #sql select count(*), first(k), avg(k), avg(k)-first(k) from tm0 where ts>'2020-1-1 1:1:1' and ts<'2020-1-1 1:02:59' interval(10s) fill(NULL); @@ -1044,6 +1041,17 @@ if $data12 != 1 then return -1 endi +print =====================>td-1442, td-2190 , no time range for fill option +sql_error select count(*) from m_fl_tb0 interval(1s) fill(prev); + +sql_error select min(c3) from m_fl_mt0 interval(10a) fill(value, 20) +sql_error select min(c3) from m_fl_mt0 interval(10s) fill(value, 20) +sql_error select min(c3) from m_fl_mt0 interval(10m) fill(value, 20) +sql_error select min(c3) from m_fl_mt0 interval(10h) fill(value, 20) +sql_error select min(c3) from m_fl_mt0 interval(10d) fill(value, 20) +sql_error select min(c3) from m_fl_mt0 interval(10w) fill(value, 20) +sql_error select max(c3) from m_fl_mt0 interval(1n) fill(prev) +sql_error select min(c3) from m_fl_mt0 interval(1y) fill(value, 20) print =============== clear #sql drop database $db