diff --git a/packaging/tools/install.sh b/packaging/tools/install.sh index ddf7114f08..d6dccf7045 100755 --- a/packaging/tools/install.sh +++ b/packaging/tools/install.sh @@ -172,6 +172,7 @@ function install_bin() { ${csudo} rm -f ${bin_link_dir}/taos || : ${csudo} rm -f ${bin_link_dir}/taosd || : ${csudo} rm -f ${bin_link_dir}/taosdemo || : + ${csudo} rm -f ${bin_link_dir}/taosdump || : ${csudo} rm -f ${bin_link_dir}/rmtaos || : ${csudo} rm -f ${bin_link_dir}/tarbitrator || : ${csudo} rm -f ${bin_link_dir}/set_core || : @@ -182,6 +183,7 @@ function install_bin() { [ -x ${install_main_dir}/bin/taos ] && ${csudo} ln -s ${install_main_dir}/bin/taos ${bin_link_dir}/taos || : [ -x ${install_main_dir}/bin/taosd ] && ${csudo} ln -s ${install_main_dir}/bin/taosd ${bin_link_dir}/taosd || : [ -x ${install_main_dir}/bin/taosdemo ] && ${csudo} ln -s ${install_main_dir}/bin/taosdemo ${bin_link_dir}/taosdemo || : + [ -x ${install_main_dir}/bin/taosdump ] && ${csudo} ln -s ${install_main_dir}/bin/taosdump ${bin_link_dir}/taosdump || : [ -x ${install_main_dir}/bin/remove.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/rmtaos || : [ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo} ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || : [ -x ${install_main_dir}/bin/tarbitrator ] && ${csudo} ln -s ${install_main_dir}/bin/tarbitrator ${bin_link_dir}/tarbitrator || : diff --git a/packaging/tools/install_client.sh b/packaging/tools/install_client.sh index 34a9bfaecb..0467300953 100755 --- a/packaging/tools/install_client.sh +++ b/packaging/tools/install_client.sh @@ -84,8 +84,9 @@ function install_main_path() { function install_bin() { # Remove links ${csudo} rm -f ${bin_link_dir}/taos || : - if [ "$osType" == "Darwin" ]; then + if [ "$osType" != "Darwin" ]; then ${csudo} rm -f ${bin_link_dir}/taosdemo || : + ${csudo} rm -f ${bin_link_dir}/taosdump || : fi ${csudo} rm -f ${bin_link_dir}/rmtaos || : ${csudo} rm -f ${bin_link_dir}/set_core || : @@ -94,8 +95,9 @@ function install_bin() { #Make link [ -x ${install_main_dir}/bin/taos ] && ${csudo} ln -s ${install_main_dir}/bin/taos ${bin_link_dir}/taos || : - if [ "$osType" == "Darwin" ]; then + if [ "$osType" != "Darwin" ]; then [ -x ${install_main_dir}/bin/taosdemo ] && ${csudo} ln -s ${install_main_dir}/bin/taosdemo ${bin_link_dir}/taosdemo || : + [ -x ${install_main_dir}/bin/taosdump ] && ${csudo} ln -s ${install_main_dir}/bin/taosdump ${bin_link_dir}/taosdump || : fi [ -x ${install_main_dir}/bin/remove_client.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove_client.sh ${bin_link_dir}/rmtaos || : [ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo} ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || : diff --git a/packaging/tools/install_client_power.sh b/packaging/tools/install_client_power.sh index 0108d1d44a..26977e12f4 100755 --- a/packaging/tools/install_client_power.sh +++ b/packaging/tools/install_client_power.sh @@ -84,8 +84,9 @@ function install_main_path() { function install_bin() { # Remove links ${csudo} rm -f ${bin_link_dir}/power || : - if [ "$osType" == "Darwin" ]; then + if [ "$osType" != "Darwin" ]; then ${csudo} rm -f ${bin_link_dir}/powerdemo || : + ${csudo} rm -f ${bin_link_dir}/powerdump || : fi ${csudo} rm -f ${bin_link_dir}/rmpower || : ${csudo} rm -f ${bin_link_dir}/set_core || : @@ -94,8 +95,9 @@ function install_bin() { #Make link [ -x ${install_main_dir}/bin/power ] && ${csudo} ln -s ${install_main_dir}/bin/power ${bin_link_dir}/power || : - if [ "$osType" == "Darwin" ]; then + if [ "$osType" != "Darwin" ]; then [ -x ${install_main_dir}/bin/powerdemo ] && ${csudo} ln -s ${install_main_dir}/bin/powerdemo ${bin_link_dir}/powerdemo || : + [ -x ${install_main_dir}/bin/powerdump ] && ${csudo} ln -s ${install_main_dir}/bin/powerdump ${bin_link_dir}/powerdump || : fi [ -x ${install_main_dir}/bin/remove_client_power.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove_client_power.sh ${bin_link_dir}/rmpower || : [ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo} ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || : diff --git a/packaging/tools/install_power.sh b/packaging/tools/install_power.sh index df6291f4ae..5929b52afc 100755 --- a/packaging/tools/install_power.sh +++ b/packaging/tools/install_power.sh @@ -172,6 +172,7 @@ function install_bin() { ${csudo} rm -f ${bin_link_dir}/power || : ${csudo} rm -f ${bin_link_dir}/powerd || : ${csudo} rm -f ${bin_link_dir}/powerdemo || : + ${csudo} rm -f ${bin_link_dir}/powerdump || : ${csudo} rm -f ${bin_link_dir}/rmpower || : ${csudo} rm -f ${bin_link_dir}/tarbitrator || : ${csudo} rm -f ${bin_link_dir}/set_core || : @@ -182,6 +183,7 @@ function install_bin() { [ -x ${install_main_dir}/bin/power ] && ${csudo} ln -s ${install_main_dir}/bin/power ${bin_link_dir}/power || : [ -x ${install_main_dir}/bin/powerd ] && ${csudo} ln -s ${install_main_dir}/bin/powerd ${bin_link_dir}/powerd || : [ -x ${install_main_dir}/bin/powerdemo ] && ${csudo} ln -s ${install_main_dir}/bin/powerdemo ${bin_link_dir}/powerdemo || : + [ -x ${install_main_dir}/bin/powerdump ] && ${csudo} ln -s ${install_main_dir}/bin/powerdump ${bin_link_dir}/powerdump || : [ -x ${install_main_dir}/bin/remove_power.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove_power.sh ${bin_link_dir}/rmpower || : [ -x ${install_main_dir}/bin/set_core.sh ] && ${csudo} ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || : [ -x ${install_main_dir}/bin/tarbitrator ] && ${csudo} ln -s ${install_main_dir}/bin/tarbitrator ${bin_link_dir}/tarbitrator || : diff --git a/packaging/tools/post.sh b/packaging/tools/post.sh index 00705fad77..52919976ee 100755 --- a/packaging/tools/post.sh +++ b/packaging/tools/post.sh @@ -92,6 +92,7 @@ function install_bin() { ${csudo} rm -f ${bin_link_dir}/taos || : ${csudo} rm -f ${bin_link_dir}/taosd || : ${csudo} rm -f ${bin_link_dir}/taosdemo || : + ${csudo} rm -f ${bin_link_dir}/taosdump || : ${csudo} rm -f ${bin_link_dir}/rmtaos || : ${csudo} rm -f ${bin_link_dir}/set_core || : @@ -101,6 +102,7 @@ function install_bin() { [ -x ${bin_dir}/taos ] && ${csudo} ln -s ${bin_dir}/taos ${bin_link_dir}/taos || : [ -x ${bin_dir}/taosd ] && ${csudo} ln -s ${bin_dir}/taosd ${bin_link_dir}/taosd || : [ -x ${bin_dir}/taosdemo ] && ${csudo} ln -s ${bin_dir}/taosdemo ${bin_link_dir}/taosdemo || : + [ -x ${bin_dir}/taosdump ] && ${csudo} ln -s ${bin_dir}/taosdump ${bin_link_dir}/taosdump || : [ -x ${bin_dir}/set_core.sh ] && ${csudo} ln -s ${bin_dir}/set_core.sh ${bin_link_dir}/set_core || : } diff --git a/packaging/tools/remove.sh b/packaging/tools/remove.sh index e9a4f48cf7..2f2660d446 100755 --- a/packaging/tools/remove.sh +++ b/packaging/tools/remove.sh @@ -72,6 +72,7 @@ function clean_bin() { ${csudo} rm -f ${bin_link_dir}/taos || : ${csudo} rm -f ${bin_link_dir}/taosd || : ${csudo} rm -f ${bin_link_dir}/taosdemo || : + ${csudo} rm -f ${bin_link_dir}/taosdump || : ${csudo} rm -f ${bin_link_dir}/rmtaos || : ${csudo} rm -f ${bin_link_dir}/tarbitrator || : ${csudo} rm -f ${bin_link_dir}/set_core || : @@ -222,4 +223,4 @@ elif echo $osinfo | grep -qwi "centos" ; then fi echo -e "${GREEN}TDengine is removed successfully!${NC}" -echo \ No newline at end of file +echo diff --git a/packaging/tools/remove_client.sh b/packaging/tools/remove_client.sh index 2c28b7b6bf..7579162dc6 100755 --- a/packaging/tools/remove_client.sh +++ b/packaging/tools/remove_client.sh @@ -38,6 +38,7 @@ function clean_bin() { # Remove link ${csudo} rm -f ${bin_link_dir}/taos || : ${csudo} rm -f ${bin_link_dir}/taosdemo || : + ${csudo} rm -f ${bin_link_dir}/taosdump || : ${csudo} rm -f ${bin_link_dir}/rmtaos || : ${csudo} rm -f ${bin_link_dir}/set_core || : } diff --git a/packaging/tools/remove_client_power.sh b/packaging/tools/remove_client_power.sh index 7a3c99e100..580c46e207 100755 --- a/packaging/tools/remove_client_power.sh +++ b/packaging/tools/remove_client_power.sh @@ -38,6 +38,7 @@ function clean_bin() { # Remove link ${csudo} rm -f ${bin_link_dir}/power || : ${csudo} rm -f ${bin_link_dir}/powerdemo || : + ${csudo} rm -f ${bin_link_dir}/powerdump || : ${csudo} rm -f ${bin_link_dir}/rmpower || : ${csudo} rm -f ${bin_link_dir}/set_core || : } diff --git a/packaging/tools/remove_power.sh b/packaging/tools/remove_power.sh index d6d6c5dd7c..816869cf44 100755 --- a/packaging/tools/remove_power.sh +++ b/packaging/tools/remove_power.sh @@ -72,6 +72,7 @@ function clean_bin() { ${csudo} rm -f ${bin_link_dir}/power || : ${csudo} rm -f ${bin_link_dir}/powerd || : ${csudo} rm -f ${bin_link_dir}/powerdemo || : + ${csudo} rm -f ${bin_link_dir}/powerdump || : ${csudo} rm -f ${bin_link_dir}/rmpower || : ${csudo} rm -f ${bin_link_dir}/tarbitrator || : ${csudo} rm -f ${bin_link_dir}/set_core || : @@ -223,4 +224,4 @@ fi #fi echo -e "${GREEN}PowerDB is removed successfully!${NC}" -echo \ No newline at end of file +echo diff --git a/src/client/inc/tscLocalMerge.h b/src/client/inc/tscLocalMerge.h index 2c7c2f51d0..43ba31f331 100644 --- a/src/client/inc/tscLocalMerge.h +++ b/src/client/inc/tscLocalMerge.h @@ -56,7 +56,6 @@ typedef struct SLocalReducer { tFilePage * pTempBuffer; struct SQLFunctionCtx *pCtx; int32_t rowSize; // size of each intermediate result. - int32_t finalRowSize; // final result row size int32_t status; // denote it is in reduce process, in reduce process, it bool hasPrevRow; // cannot be released bool hasUnprocessedRow; diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index d39b833374..3867032d46 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -64,13 +64,13 @@ } \ } while (0); -#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \ -do {\ -for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \ - SQLFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[i]; \ - aAggs[TSDB_FUNC_TAG].xFunction(__ctx); \ - } \ -} while(0); +#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \ + do { \ + for (int32_t i = 0; i < (ctx)->tagInfo.numOfTagCols; ++i) { \ + SQLFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[i]; \ + aAggs[TSDB_FUNC_TAG].xFunction(__ctx); \ + } \ + } while (0); void noop1(SQLFunctionCtx *UNUSED_PARAM(pCtx)) {} void noop2(SQLFunctionCtx *UNUSED_PARAM(pCtx), int32_t UNUSED_PARAM(index)) {} @@ -3624,52 +3624,147 @@ static bool twa_function_setup(SQLFunctionCtx *pCtx) { return false; } - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); //->aOutputBuf + pCtx->outputBytes; - STwaInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo); - - pInfo->lastKey = INT64_MIN; - pInfo->type = pCtx->inputType; - + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + + STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); + pInfo->lastKey = INT64_MIN; + pInfo->win = TSWINDOW_INITIALIZER; return true; } -static FORCE_INLINE void setTWALastVal(SQLFunctionCtx *pCtx, const char *data, int32_t i, STwaInfo *pInfo) { - switch (pCtx->inputType) { - case TSDB_DATA_TYPE_INT: - pInfo->iLastValue = GET_INT32_VAL(data + pCtx->inputBytes * i); - break; - case TSDB_DATA_TYPE_TINYINT: - pInfo->iLastValue = GET_INT8_VAL(data + pCtx->inputBytes * i); - break; - case TSDB_DATA_TYPE_SMALLINT: - pInfo->iLastValue = GET_INT16_VAL(data + pCtx->inputBytes * i); - break; - case TSDB_DATA_TYPE_BIGINT: - pInfo->iLastValue = GET_INT64_VAL(data + pCtx->inputBytes * i); - break; - case TSDB_DATA_TYPE_FLOAT: - pInfo->dLastValue = GET_FLOAT_VAL(data + pCtx->inputBytes * i); - break; - case TSDB_DATA_TYPE_DOUBLE: - pInfo->dLastValue = GET_DOUBLE_VAL(data + pCtx->inputBytes * i); - break; - default: - assert(0); +static int32_t twa_function_impl(SQLFunctionCtx* pCtx, int32_t index, int32_t size) { + int32_t notNullElems = 0; + TSKEY *primaryKey = pCtx->ptsList; + + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); + + STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); + int32_t i = index; + + if (pCtx->start.key != INT64_MIN) { + assert(pCtx->start.key < primaryKey[index] && pInfo->lastKey == INT64_MIN); + + pInfo->lastKey = primaryKey[index]; + GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, 0)); + + pInfo->dOutput += ((pInfo->lastValue + pCtx->start.val) / 2) * (pInfo->lastKey - pCtx->start.key); + + pInfo->hasResult = DATA_SET_FLAG; + pInfo->win.skey = pCtx->start.key; + notNullElems++; + i += 1; + } else if (pInfo->lastKey == INT64_MIN) { + pInfo->lastKey = primaryKey[index]; + GET_TYPED_DATA(pInfo->lastValue, double, pCtx->inputType, GET_INPUT_CHAR_INDEX(pCtx, 0)); + + pInfo->hasResult = DATA_SET_FLAG; + pInfo->win.skey = pInfo->lastKey; + notNullElems++; + i += 1; } + + // calculate the value of + switch(pCtx->inputType) { + case TSDB_DATA_TYPE_TINYINT: { + int8_t *val = (int8_t*) GET_INPUT_CHAR_INDEX(pCtx, index); + for (; i < size; i++) { + if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { + continue; + } + + pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey); + pInfo->lastValue = val[i]; + pInfo->lastKey = primaryKey[i]; + } + break; + } + case TSDB_DATA_TYPE_SMALLINT: { + int16_t *val = (int16_t*) GET_INPUT_CHAR_INDEX(pCtx, index); + for (; i < size; i++) { + if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { + continue; + } + + pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey); + pInfo->lastValue = val[i]; + pInfo->lastKey = primaryKey[i]; + } + break; + } + case TSDB_DATA_TYPE_INT: { + int32_t *val = (int32_t*) GET_INPUT_CHAR_INDEX(pCtx, index); + for (; i < size; i++) { + if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { + continue; + } + + pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey); + pInfo->lastValue = val[i]; + pInfo->lastKey = primaryKey[i]; + } + break; + } + case TSDB_DATA_TYPE_BIGINT: { + int64_t *val = (int64_t*) GET_INPUT_CHAR_INDEX(pCtx, index); + for (; i < size; i++) { + if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { + continue; + } + + pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey); + pInfo->lastValue = (double) val[i]; + pInfo->lastKey = primaryKey[i]; + } + break; + } + case TSDB_DATA_TYPE_FLOAT: { + float *val = (float*) GET_INPUT_CHAR_INDEX(pCtx, index); + for (; i < size; i++) { + if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { + continue; + } + + pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey); + pInfo->lastValue = val[i]; + pInfo->lastKey = primaryKey[i]; + } + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + double *val = (double*) GET_INPUT_CHAR_INDEX(pCtx, index); + for (; i < size; i++) { + if (pCtx->hasNull && isNull((const char*) &val[i], pCtx->inputType)) { + continue; + } + + pInfo->dOutput += ((val[i] + pInfo->lastValue) / 2) * (primaryKey[i] - pInfo->lastKey); + pInfo->lastValue = val[i]; + pInfo->lastKey = primaryKey[i]; + } + break; + } + default: assert(0); + } + + // the last interpolated time window value + if (pCtx->end.key != INT64_MIN) { + pInfo->dOutput += ((pInfo->lastValue + pCtx->end.val) / 2) * (pCtx->end.key - pInfo->lastKey); + pInfo->lastValue = pCtx->end.val; + pInfo->lastKey = pCtx->end.key; + } + + pInfo->win.ekey = pInfo->lastKey; + return notNullElems; } static void twa_function(SQLFunctionCtx *pCtx) { void * data = GET_INPUT_CHAR(pCtx); - TSKEY *primaryKey = pCtx->ptsList; - - int32_t notNullElems = 0; - + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); STwaInfo * pInfo = GET_ROWCELL_INTERBUF(pResInfo); - int32_t i = 0; - // skip null value + int32_t i = 0; while (pCtx->hasNull && i < pCtx->size && isNull((char *)data + pCtx->inputBytes * i, pCtx->inputType)) { i++; } @@ -3678,40 +3773,7 @@ static void twa_function(SQLFunctionCtx *pCtx) { return; } - if (pInfo->lastKey == INT64_MIN) { - pInfo->lastKey = pCtx->nStartQueryTimestamp; - setTWALastVal(pCtx, data, i, pInfo); - - pInfo->hasResult = DATA_SET_FLAG; - } - - notNullElems++; - - if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT || pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) { - pInfo->dOutput += pInfo->dLastValue * (primaryKey[i] - pInfo->lastKey); - } else { - pInfo->iOutput += pInfo->iLastValue * (primaryKey[i] - pInfo->lastKey); - } - - pInfo->lastKey = primaryKey[i]; - setTWALastVal(pCtx, data, i, pInfo); - - for (++i; i < pCtx->size; i++) { - if (pCtx->hasNull && isNull((char *)data + pCtx->inputBytes * i, pCtx->inputType)) { - continue; - } - - notNullElems++; - if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT || pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) { - pInfo->dOutput += pInfo->dLastValue * (primaryKey[i] - pInfo->lastKey); - } else { - pInfo->iOutput += pInfo->iLastValue * (primaryKey[i] - pInfo->lastKey); - } - - pInfo->lastKey = primaryKey[i]; - setTWALastVal(pCtx, data, i, pInfo); - } - + int32_t notNullElems = twa_function_impl(pCtx, pCtx->startOffset, pCtx->size); SET_VAL(pCtx, notNullElems, 1); if (notNullElems > 0) { @@ -3721,8 +3783,6 @@ static void twa_function(SQLFunctionCtx *pCtx) { if (pCtx->stableQuery) { memcpy(pCtx->aOutputBuf, pInfo, sizeof(STwaInfo)); } - - // pCtx->numOfIteratedElems += notNullElems; } static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) { @@ -3730,35 +3790,12 @@ static void twa_function_f(SQLFunctionCtx *pCtx, int32_t index) { if (pCtx->hasNull && isNull(pData, pCtx->inputType)) { return; } - - SET_VAL(pCtx, 1, 1); - - TSKEY *primaryKey = pCtx->ptsList; - - SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); - STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); - - if (pInfo->lastKey == INT64_MIN) { - pInfo->lastKey = pCtx->nStartQueryTimestamp; - setTWALastVal(pCtx, pData, 0, pInfo); - - pInfo->hasResult = DATA_SET_FLAG; - } - - if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT || pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) { - pInfo->dOutput += pInfo->dLastValue * (primaryKey[index] - pInfo->lastKey); - } else { - pInfo->iOutput += pInfo->iLastValue * (primaryKey[index] - pInfo->lastKey); - } - - // record the last key/value - pInfo->lastKey = primaryKey[index]; - setTWALastVal(pCtx, pData, 0, pInfo); - - // pCtx->numOfIteratedElems += 1; - pResInfo->hasResult = DATA_SET_FLAG; - + + int32_t notNullElems = twa_function_impl(pCtx, index, 1); + SET_VAL(pCtx, notNullElems, 1); + if (pCtx->stableQuery) { + SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); memcpy(pCtx->aOutputBuf, GET_ROWCELL_INTERBUF(pResInfo), sizeof(STwaInfo)); } } @@ -3778,16 +3815,10 @@ static void twa_func_merge(SQLFunctionCtx *pCtx) { } numOfNotNull++; - if (pCtx->inputType >= TSDB_DATA_TYPE_TINYINT && pCtx->inputType <= TSDB_DATA_TYPE_BIGINT) { - pBuf->iOutput += pInput->iOutput; - } else { - pBuf->dOutput += pInput->dOutput; - } - - pBuf->SKey = pInput->SKey; - pBuf->EKey = pInput->EKey; + pBuf->dOutput += pInput->dOutput; + + pBuf->win = pInput->win; pBuf->lastKey = pInput->lastKey; - pBuf->iLastValue = pInput->iLastValue; } SET_VAL(pCtx, numOfNotNull, 1); @@ -3814,21 +3845,17 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) { SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx); STwaInfo *pInfo = (STwaInfo *)GET_ROWCELL_INTERBUF(pResInfo); - assert(pInfo->EKey >= pInfo->lastKey && pInfo->hasResult == pResInfo->hasResult); + assert(pInfo->win.ekey == pInfo->lastKey && pInfo->hasResult == pResInfo->hasResult); if (pInfo->hasResult != DATA_SET_FLAG) { setNull(pCtx->aOutputBuf, TSDB_DATA_TYPE_DOUBLE, sizeof(double)); return; } - if (pInfo->SKey == pInfo->EKey) { - *(double *)pCtx->aOutputBuf = 0; - } else if (pInfo->type >= TSDB_DATA_TYPE_TINYINT && pInfo->type <= TSDB_DATA_TYPE_BIGINT) { - pInfo->iOutput += pInfo->iLastValue * (pInfo->EKey - pInfo->lastKey); - *(double *)pCtx->aOutputBuf = pInfo->iOutput / (double)(pInfo->EKey - pInfo->SKey); + if (pInfo->win.ekey == pInfo->win.skey) { + *(double *)pCtx->aOutputBuf = pInfo->lastValue; } else { - pInfo->dOutput += pInfo->dLastValue * (pInfo->EKey - pInfo->lastKey); - *(double *)pCtx->aOutputBuf = pInfo->dOutput / (pInfo->EKey - pInfo->SKey); + *(double *)pCtx->aOutputBuf = pInfo->dOutput / (pInfo->win.ekey - pInfo->win.skey); } GET_RES_INFO(pCtx)->numOfRes = 1; diff --git a/src/client/src/tscLocalMerge.c b/src/client/src/tscLocalMerge.c index 3c7d46f914..2cd37013c5 100644 --- a/src/client/src/tscLocalMerge.c +++ b/src/client/src/tscLocalMerge.c @@ -198,6 +198,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd if (numOfFlush == 0 || numOfBuffer == 0) { tscLocalReducerEnvDestroy(pMemBuffer, pDesc, finalmodel, pFFModel, numOfBuffer); + pCmd->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; // no result, set the result empty tscDebug("%p retrieved no data", pSql); return; } @@ -330,22 +331,19 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd pReducer->nResultBufSize = pMemBuffer[0]->pageSize * 16; pReducer->pResultBuf = (tFilePage *)calloc(1, pReducer->nResultBufSize + sizeof(tFilePage)); - pReducer->finalRowSize = tscGetResRowLength(pQueryInfo->exprList); pReducer->resColModel = finalmodel; pReducer->resColModel->capacity = pReducer->nResultBufSize; - pReducer->finalModel = pFFModel; - assert(pReducer->finalRowSize > 0); - if (pReducer->finalRowSize > 0) { - pReducer->resColModel->capacity /= pReducer->finalRowSize; + if (finalmodel->rowSize > 0) { + pReducer->resColModel->capacity /= finalmodel->rowSize; } - assert(pReducer->finalRowSize <= pReducer->rowSize); + assert(finalmodel->rowSize > 0 && finalmodel->rowSize <= pReducer->rowSize); pReducer->pFinalRes = calloc(1, pReducer->rowSize * pReducer->resColModel->capacity); if (pReducer->pTempBuffer == NULL || pReducer->discardData == NULL || pReducer->pResultBuf == NULL || - /*pReducer->pBufForInterpo == NULL || */pReducer->pFinalRes == NULL || pReducer->prevRowOfInput == NULL) { + pReducer->pFinalRes == NULL || pReducer->prevRowOfInput == NULL) { tfree(pReducer->pTempBuffer); tfree(pReducer->discardData); tfree(pReducer->pResultBuf); @@ -920,7 +918,7 @@ static void genFinalResWithoutFill(SSqlRes* pRes, SLocalReducer *pLocalReducer, savePrevRecordAndSetupFillInfo(pLocalReducer, pQueryInfo, pLocalReducer->pFillInfo); } - memcpy(pRes->data, pBeforeFillData->data, (size_t)(pRes->numOfRows * pLocalReducer->finalRowSize)); + memcpy(pRes->data, pBeforeFillData->data, (size_t)(pRes->numOfRows * pLocalReducer->finalModel->rowSize)); pRes->numOfClauseTotal += pRes->numOfRows; pBeforeFillData->num = 0; @@ -1256,7 +1254,7 @@ bool genFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool noMoreCur tColModelCompact(pModel, pResBuf, pModel->capacity); if (tscIsSecondStageQuery(pQueryInfo)) { - pLocalReducer->finalRowSize = doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalReducer->finalRowSize); + doArithmeticCalculate(pQueryInfo, pResBuf, pModel->rowSize, pLocalReducer->finalModel->rowSize); } #ifdef _DEBUG_VIEW @@ -1627,7 +1625,8 @@ void tscInitResObjForLocalQuery(SSqlObj *pObj, int32_t numOfRes, int32_t rowLen) } int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_t rowSize, int32_t finalRowSize) { - char* pbuf = calloc(1, pOutput->num * rowSize); + int32_t maxRowSize = MAX(rowSize, finalRowSize); + char* pbuf = calloc(1, pOutput->num * maxRowSize); size_t size = tscNumOfFields(pQueryInfo); SArithmeticSupport arithSup = {0}; @@ -1660,7 +1659,6 @@ int32_t doArithmeticCalculate(SQueryInfo* pQueryInfo, tFilePage* pOutput, int32_ offset += pSup->field.bytes; } - assert(finalRowSize <= rowSize); memcpy(pOutput->data, pbuf, pOutput->num * offset); tfree(pbuf); diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 743cb42eb3..309d354849 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -4248,7 +4248,7 @@ static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondE tExprTreeDestroy(&p, NULL); taosArrayDestroy(colList); - if (taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0 && !UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { + if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0 && !UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "filter on tag not supported for normal table"); } } @@ -4256,6 +4256,7 @@ static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondE pCondExpr->pTagCond = NULL; return ret; } + int32_t parseWhereClause(SQueryInfo* pQueryInfo, tSQLExpr** pExpr, SSqlObj* pSql) { if (pExpr == NULL) { return TSDB_CODE_SUCCESS; @@ -6648,7 +6649,7 @@ int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSQLExpr* pS return TSDB_CODE_SUCCESS; } else { - return TSDB_CODE_TSC_INVALID_SQL; + return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "not support filter expression"); } } else { diff --git a/src/dnode/src/dnodeMain.c b/src/dnode/src/dnodeMain.c index 5907adc199..a4b9318cdf 100644 --- a/src/dnode/src/dnodeMain.c +++ b/src/dnode/src/dnodeMain.c @@ -19,6 +19,7 @@ #include "tutil.h" #include "tconfig.h" #include "tglobal.h" +#include "tfile.h" #include "twal.h" #include "trpc.h" #include "dnode.h" @@ -56,6 +57,7 @@ typedef struct { } SDnodeComponent; static const SDnodeComponent tsDnodeComponents[] = { + {"tfile", tfInit, tfCleanup}, {"rpc", rpcInit, rpcCleanup}, {"storage", dnodeInitStorage, dnodeCleanupStorage}, {"dnodecfg", dnodeInitCfg, dnodeCleanupCfg}, diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index da1852e05e..0d1a56cfc4 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -444,12 +444,12 @@ static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) { SCreateMnodeMsg *pCfg = pMsg->pCont; pCfg->dnodeId = htonl(pCfg->dnodeId); if (pCfg->dnodeId != dnodeGetDnodeId()) { - dError("dnodeId:%d, in create mnode msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId()); + dDebug("dnodeId:%d, in create mnode msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId()); return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED; } if (strcmp(pCfg->dnodeEp, tsLocalEp) != 0) { - dError("dnodeEp:%s, in create mnode msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp); + dDebug("dnodeEp:%s, in create mnode msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp); return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED; } diff --git a/src/inc/twal.h b/src/inc/twal.h index 8dd3a8a912..1645de77aa 100644 --- a/src/inc/twal.h +++ b/src/inc/twal.h @@ -51,9 +51,8 @@ typedef struct { typedef void * twalh; // WAL HANDLE typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg); -int32_t walInit(); -void walCleanUp(); - +int32_t walInit(); +void walCleanUp(); twalh walOpen(char *path, SWalCfg *pCfg); int32_t walAlter(twalh pWal, SWalCfg *pCfg); void walStop(twalh); diff --git a/src/mnode/inc/mnodeSdb.h b/src/mnode/inc/mnodeSdb.h index 81629ede78..90c4eac40a 100644 --- a/src/mnode/inc/mnodeSdb.h +++ b/src/mnode/inc/mnodeSdb.h @@ -79,10 +79,13 @@ typedef struct { int32_t (*fpRestored)(); } SSdbTableDesc; +int32_t sdbInitRef(); +void sdbCleanUpRef(); int32_t sdbInit(); void sdbCleanUp(); -void * sdbOpenTable(SSdbTableDesc *desc); -void sdbCloseTable(void *handle); +int64_t sdbOpenTable(SSdbTableDesc *desc); +void sdbCloseTable(int64_t rid); +void* sdbGetTableByRid(int64_t rid); bool sdbIsMaster(); bool sdbIsServing(); void sdbUpdateMnodeRoles(); diff --git a/src/mnode/src/mnodeAcct.c b/src/mnode/src/mnodeAcct.c index 3da889b284..6fba05674f 100644 --- a/src/mnode/src/mnodeAcct.c +++ b/src/mnode/src/mnodeAcct.c @@ -26,6 +26,7 @@ #include "mnodeUser.h" #include "mnodeVgroup.h" +int64_t tsAcctRid = -1; void * tsAcctSdb = NULL; static int32_t tsAcctUpdateSize; static int32_t mnodeCreateRootAcct(); @@ -114,7 +115,8 @@ int32_t mnodeInitAccts() { .fpRestored = mnodeAcctActionRestored }; - tsAcctSdb = sdbOpenTable(&desc); + tsAcctRid = sdbOpenTable(&desc); + tsAcctSdb = sdbGetTableByRid(tsAcctRid); if (tsAcctSdb == NULL) { mError("table:%s, failed to create hash", desc.name); return -1; @@ -126,7 +128,7 @@ int32_t mnodeInitAccts() { void mnodeCleanupAccts() { acctCleanUp(); - sdbCloseTable(tsAcctSdb); + sdbCloseTable(tsAcctRid); tsAcctSdb = NULL; } diff --git a/src/mnode/src/mnodeCluster.c b/src/mnode/src/mnodeCluster.c index 839407c5b0..56229daffa 100644 --- a/src/mnode/src/mnodeCluster.c +++ b/src/mnode/src/mnodeCluster.c @@ -24,6 +24,7 @@ #include "mnodeShow.h" #include "tglobal.h" +int64_t tsClusterRid = -1; static void * tsClusterSdb = NULL; static int32_t tsClusterUpdateSize; static char tsClusterId[TSDB_CLUSTER_ID_LEN]; @@ -101,9 +102,10 @@ int32_t mnodeInitCluster() { .fpRestored = mnodeClusterActionRestored }; - tsClusterSdb = sdbOpenTable(&desc); + tsClusterRid = sdbOpenTable(&desc); + tsClusterSdb = sdbGetTableByRid(tsClusterRid); if (tsClusterSdb == NULL) { - mError("table:%s, failed to create hash", desc.name); + mError("table:%s, rid:%" PRId64 ", failed to create hash", desc.name, tsClusterRid); return -1; } @@ -116,7 +118,7 @@ int32_t mnodeInitCluster() { } void mnodeCleanupCluster() { - sdbCloseTable(tsClusterSdb); + sdbCloseTable(tsClusterRid); tsClusterSdb = NULL; } diff --git a/src/mnode/src/mnodeDb.c b/src/mnode/src/mnodeDb.c index 77f3b93eb1..999c606284 100644 --- a/src/mnode/src/mnodeDb.c +++ b/src/mnode/src/mnodeDb.c @@ -38,6 +38,7 @@ #include "mnodeVgroup.h" #define VG_LIST_SIZE 8 +int64_t tsDbRid = -1; static void * tsDbSdb = NULL; static int32_t tsDbUpdateSize; @@ -160,7 +161,8 @@ int32_t mnodeInitDbs() { .fpRestored = mnodeDbActionRestored }; - tsDbSdb = sdbOpenTable(&desc); + tsDbRid = sdbOpenTable(&desc); + tsDbSdb = sdbGetTableByRid(tsDbRid); if (tsDbSdb == NULL) { mError("failed to init db data"); return -1; @@ -496,7 +498,7 @@ void mnodeRemoveVgroupFromDb(SVgObj *pVgroup) { } void mnodeCleanupDbs() { - sdbCloseTable(tsDbSdb); + sdbCloseTable(tsDbRid); tsDbSdb = NULL; } diff --git a/src/mnode/src/mnodeDnode.c b/src/mnode/src/mnodeDnode.c index 0e9b56e459..65f4060392 100644 --- a/src/mnode/src/mnodeDnode.c +++ b/src/mnode/src/mnodeDnode.c @@ -39,6 +39,7 @@ #include "mnodeCluster.h" int32_t tsAccessSquence = 0; +int64_t tsDnodeRid = -1; static void * tsDnodeSdb = NULL; static int32_t tsDnodeUpdateSize = 0; extern void * tsMnodeSdb; @@ -187,7 +188,8 @@ int32_t mnodeInitDnodes() { .fpRestored = mnodeDnodeActionRestored }; - tsDnodeSdb = sdbOpenTable(&desc); + tsDnodeRid = sdbOpenTable(&desc); + tsDnodeSdb = sdbGetTableByRid(tsDnodeRid); if (tsDnodeSdb == NULL) { mError("failed to init dnodes data"); return -1; @@ -213,7 +215,7 @@ int32_t mnodeInitDnodes() { } void mnodeCleanupDnodes() { - sdbCloseTable(tsDnodeSdb); + sdbCloseTable(tsDnodeRid); pthread_mutex_destroy(&tsDnodeEpsMutex); free(tsDnodeEps); tsDnodeEps = NULL; diff --git a/src/mnode/src/mnodeMain.c b/src/mnode/src/mnodeMain.c index 1f5ad42bde..d15b32da54 100644 --- a/src/mnode/src/mnodeMain.c +++ b/src/mnode/src/mnodeMain.c @@ -47,6 +47,7 @@ void *tsMnodeTmr = NULL; static bool tsMgmtIsRunning = false; static const SMnodeComponent tsMnodeComponents[] = { + {"sdbref", sdbInitRef, sdbCleanUpRef}, {"profile", mnodeInitProfile, mnodeCleanupProfile}, {"cluster", mnodeInitCluster, mnodeCleanupCluster}, {"accts", mnodeInitAccts, mnodeCleanupAccts}, diff --git a/src/mnode/src/mnodeMnode.c b/src/mnode/src/mnodeMnode.c index 8e3ad0a248..7428e8d2c8 100644 --- a/src/mnode/src/mnodeMnode.c +++ b/src/mnode/src/mnodeMnode.c @@ -34,6 +34,7 @@ #include "mnodeUser.h" #include "mnodeVgroup.h" +int64_t tsMnodeRid = -1; static void * tsMnodeSdb = NULL; static int32_t tsMnodeUpdateSize = 0; static SRpcEpSet tsMnodeEpSetForShell; @@ -153,7 +154,8 @@ int32_t mnodeInitMnodes() { .fpRestored = mnodeMnodeActionRestored }; - tsMnodeSdb = sdbOpenTable(&desc); + tsMnodeRid = sdbOpenTable(&desc); + tsMnodeSdb = sdbGetTableByRid(tsMnodeRid); if (tsMnodeSdb == NULL) { mError("failed to init mnodes data"); return -1; @@ -168,7 +170,7 @@ int32_t mnodeInitMnodes() { } void mnodeCleanupMnodes() { - sdbCloseTable(tsMnodeSdb); + sdbCloseTable(tsMnodeRid); tsMnodeSdb = NULL; mnodeMnodeDestroyLock(); } @@ -251,12 +253,30 @@ void mnodeGetMnodeEpSetForPeer(SRpcEpSet *epSet) { mnodeMnodeRdLock(); *epSet = tsMnodeEpSetForPeer; mnodeMnodeUnLock(); + + for (int32_t i = 0; i < epSet->numOfEps; ++i) { + if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) { + epSet->inUse = (i + 1) % epSet->numOfEps; + mTrace("mnode:%d, for peer ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse); + } else { + mTrace("mpeer:%d, for peer ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i])); + } + } } void mnodeGetMnodeEpSetForShell(SRpcEpSet *epSet) { mnodeMnodeRdLock(); *epSet = tsMnodeEpSetForShell; mnodeMnodeUnLock(); + + for (int32_t i = 0; i < epSet->numOfEps; ++i) { + if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) { + epSet->inUse = (i + 1) % epSet->numOfEps; + mTrace("mnode:%d, for shell ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse); + } else { + mTrace("mnode:%d, for shell ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i])); + } + } } char* mnodeGetMnodeMasterEp() { diff --git a/src/mnode/src/mnodePeer.c b/src/mnode/src/mnodePeer.c index 4ca19d2f6e..cc6a1b7485 100644 --- a/src/mnode/src/mnodePeer.c +++ b/src/mnode/src/mnodePeer.c @@ -57,16 +57,8 @@ int32_t mnodeProcessPeerReq(SMnodeMsg *pMsg) { rpcRsp->rsp = epSet; rpcRsp->len = sizeof(SRpcEpSet); - mDebug("msg:%p, ahandle:%p type:%s in mpeer queue will be redirected, numOfEps:%d inUse:%d", pMsg, - pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); - for (int32_t i = 0; i < epSet->numOfEps; ++i) { - if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort + TSDB_PORT_DNODEDNODE) { - epSet->inUse = (i + 1) % epSet->numOfEps; - mDebug("mpeer:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse); - } else { - mDebug("mpeer:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i])); - } - } + mDebug("msg:%p, ahandle:%p type:%s in mpeer queue is redirected, numOfEps:%d inUse:%d", pMsg, pMsg->rpcMsg.ahandle, + taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); return TSDB_CODE_RPC_REDIRECT; } diff --git a/src/mnode/src/mnodeProfile.c b/src/mnode/src/mnodeProfile.c index 8467adfc17..36b6ff7a59 100644 --- a/src/mnode/src/mnodeProfile.c +++ b/src/mnode/src/mnodeProfile.c @@ -34,7 +34,6 @@ #define QUERY_ID_SIZE 20 #define QUERY_STREAM_SAVE_SIZE 20 -extern void *tsMnodeTmr; static SCacheObj *tsMnodeConnCache = NULL; static int32_t tsConnIndex = 0; diff --git a/src/mnode/src/mnodeRead.c b/src/mnode/src/mnodeRead.c index ea7ce783e8..a39d35506f 100644 --- a/src/mnode/src/mnodeRead.c +++ b/src/mnode/src/mnodeRead.c @@ -51,21 +51,12 @@ int32_t mnodeProcessRead(SMnodeMsg *pMsg) { SMnodeRsp *rpcRsp = &pMsg->rpcRsp; SRpcEpSet *epSet = rpcMallocCont(sizeof(SRpcEpSet)); mnodeGetMnodeEpSetForShell(epSet); - - mDebug("msg:%p, app:%p type:%s in mread queue will be redirected, numOfEps:%d inUse:%d", pMsg, pMsg->rpcMsg.ahandle, - taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); - for (int32_t i = 0; i < epSet->numOfEps; ++i) { - if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) { - epSet->inUse = (i + 1) % epSet->numOfEps; - mDebug("mnode index:%d ep:%s:%u, set inUse to %d", i, epSet->fqdn[i], htons(epSet->port[i]), epSet->inUse); - } else { - mDebug("mnode index:%d ep:%s:%u", i, epSet->fqdn[i], htons(epSet->port[i])); - } - } - rpcRsp->rsp = epSet; rpcRsp->len = sizeof(SRpcEpSet); + mDebug("msg:%p, app:%p type:%s in mread queue is redirected, numOfEps:%d inUse:%d", pMsg, pMsg->rpcMsg.ahandle, + taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); + return TSDB_CODE_RPC_REDIRECT; } diff --git a/src/mnode/src/mnodeSdb.c b/src/mnode/src/mnodeSdb.c index 44e367244c..6670e87a7c 100644 --- a/src/mnode/src/mnodeSdb.c +++ b/src/mnode/src/mnodeSdb.c @@ -18,6 +18,7 @@ #include "taoserror.h" #include "hash.h" #include "tutil.h" +#include "tref.h" #include "tbalance.h" #include "tqueue.h" #include "twal.h" @@ -98,6 +99,7 @@ typedef struct { SSdbWorker *worker; } SSdbWorkerPool; +int32_t tsSdbRid; extern void * tsMnodeTmr; static void * tsSdbTmr; static SSdbMgmt tsSdbMgmt = {0}; @@ -118,6 +120,7 @@ static void sdbFreeQueue(); static int32_t sdbInsertHash(SSdbTable *pTable, SSdbRow *pRow); static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbRow *pRow); static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow); +static void sdbCloseTableObj(void *handle); int32_t sdbGetId(void *pTable) { return ((SSdbTable *)pTable)->autoIndex; @@ -385,6 +388,17 @@ void sdbUpdateSync(void *pMnodes) { sdbUpdateMnodeRoles(); } +int32_t sdbInitRef() { + tsSdbRid = taosOpenRef(10, sdbCloseTableObj); + if (tsSdbRid <= 0) { + sdbError("failed to init sdb ref"); + return -1; + } + return 0; +} + +void sdbCleanUpRef() { taosCloseRef(tsSdbRid); } + int32_t sdbInit() { pthread_mutex_init(&tsSdbMgmt.mutex, NULL); @@ -423,7 +437,7 @@ void sdbCleanUp() { walClose(tsSdbMgmt.wal); tsSdbMgmt.wal = NULL; } - + pthread_mutex_destroy(&tsSdbMgmt.mutex); } @@ -506,7 +520,7 @@ static int32_t sdbInsertHash(SSdbTable *pTable, SSdbRow *pRow) { atomic_add_fetch_32(&pTable->autoIndex, 1); } - sdbDebug("vgId:1, sdb:%s, insert key:%s to hash, rowSize:%d rows:%" PRId64 ", msg:%p", pTable->name, + sdbTrace("vgId:1, sdb:%s, insert key:%s to hash, rowSize:%d rows:%" PRId64 ", msg:%p", pTable->name, sdbGetRowStr(pTable, pRow->pObj), pRow->rowSize, pTable->numOfRows, pRow->pMsg); int32_t code = (*pTable->fpInsert)(pRow); @@ -542,7 +556,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow) { atomic_sub_fetch_32(&pTable->numOfRows, 1); - sdbDebug("vgId:1, sdb:%s, delete key:%s from hash, numOfRows:%" PRId64 ", msg:%p", pTable->name, + sdbTrace("vgId:1, sdb:%s, delete key:%s from hash, numOfRows:%" PRId64 ", msg:%p", pTable->name, sdbGetRowStr(pTable, pRow->pObj), pTable->numOfRows, pRow->pMsg); sdbDecRef(pTable, pRow->pObj); @@ -551,7 +565,7 @@ static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbRow *pRow) { } static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbRow *pRow) { - sdbDebug("vgId:1, sdb:%s, update key:%s in hash, numOfRows:%" PRId64 ", msg:%p", pTable->name, + sdbTrace("vgId:1, sdb:%s, update key:%s in hash, numOfRows:%" PRId64 ", msg:%p", pTable->name, sdbGetRowStr(pTable, pRow->pObj), pTable->numOfRows, pRow->pMsg); (*pTable->fpUpdate)(pRow); @@ -649,7 +663,7 @@ static int32_t sdbProcessWrite(void *wparam, void *hparam, int32_t qtype, void * return syncCode; } - sdbDebug("vgId:1, sdb:%s, record from wal/fwd is disposed, action:%s key:%s hver:%" PRIu64, pTable->name, + sdbTrace("vgId:1, sdb:%s, record from %s is disposed, action:%s key:%s hver:%" PRIu64, pTable->name, qtypeStr[qtype], actStr[action], sdbGetKeyStr(pTable, pHead->cont), pHead->version); // even it is WAL/FWD, it shall be called to update version in sync @@ -801,10 +815,10 @@ void sdbFreeIter(void *tparam, void *pIter) { taosHashCancelIterate(pTable->iHandle, pIter); } -void *sdbOpenTable(SSdbTableDesc *pDesc) { +int64_t sdbOpenTable(SSdbTableDesc *pDesc) { SSdbTable *pTable = (SSdbTable *)calloc(1, sizeof(SSdbTable)); - if (pTable == NULL) return NULL; + if (pTable == NULL) return -1; pthread_mutex_init(&pTable->mutex, NULL); tstrncpy(pTable->name, pDesc->name, SDB_TABLE_LEN); @@ -829,10 +843,21 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) { tsSdbMgmt.numOfTables++; tsSdbMgmt.tableList[pTable->id] = pTable; - return pTable; + + return taosAddRef(tsSdbRid, pTable); } -void sdbCloseTable(void *handle) { +void sdbCloseTable(int64_t rid) { + taosRemoveRef(tsSdbRid, rid); +} + +void *sdbGetTableByRid(int64_t rid) { + void *handle = taosAcquireRef(tsSdbRid, rid); + taosReleaseRef(tsSdbRid, rid); + return handle; +} + +static void sdbCloseTableObj(void *handle) { SSdbTable *pTable = (SSdbTable *)handle; if (pTable == NULL) return; @@ -855,6 +880,7 @@ void sdbCloseTable(void *handle) { taosHashCancelIterate(pTable->iHandle, pIter); taosHashCleanup(pTable->iHandle); + pTable->iHandle = NULL; pthread_mutex_destroy(&pTable->mutex); sdbDebug("vgId:1, sdb:%s, is closed, numOfTables:%d", pTable->name, tsSdbMgmt.numOfTables); diff --git a/src/mnode/src/mnodeShow.c b/src/mnode/src/mnodeShow.c index 2621441cb5..2da46d5b4b 100644 --- a/src/mnode/src/mnodeShow.c +++ b/src/mnode/src/mnodeShow.c @@ -52,7 +52,6 @@ static bool mnodeCheckShowFinished(SShowObj *pShow); static void *mnodePutShowObj(SShowObj *pShow); static void mnodeReleaseShowObj(SShowObj *pShow, bool forceRemove); -extern void *tsMnodeTmr; static void *tsMnodeShowCache = NULL; static int32_t tsShowObjIndex = 0; static SShowMetaFp tsMnodeShowMetaFp[TSDB_MGMT_TABLE_MAX] = {0}; diff --git a/src/mnode/src/mnodeTable.c b/src/mnode/src/mnodeTable.c index cc7ffb81ad..2149cb12c0 100644 --- a/src/mnode/src/mnodeTable.c +++ b/src/mnode/src/mnodeTable.c @@ -49,7 +49,9 @@ #define CREATE_CTABLE_RETRY_TIMES 10 #define CREATE_CTABLE_RETRY_SEC 14 +int64_t tsCTableRid = -1; static void * tsChildTableSdb; +int64_t tsSTableRid = -1; static void * tsSuperTableSdb; static int32_t tsChildTableUpdateSize; static int32_t tsSuperTableUpdateSize; @@ -350,7 +352,7 @@ static int32_t mnodeInitChildTables() { SCTableObj tObj; tsChildTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type; - SSdbTableDesc tableDesc = { + SSdbTableDesc desc = { .id = SDB_TABLE_CTABLE, .name = "ctables", .hashSessions = TSDB_DEFAULT_CTABLES_HASH_SIZE, @@ -366,7 +368,8 @@ static int32_t mnodeInitChildTables() { .fpRestored = mnodeChildTableActionRestored }; - tsChildTableSdb = sdbOpenTable(&tableDesc); + tsCTableRid = sdbOpenTable(&desc); + tsChildTableSdb = sdbGetTableByRid(tsCTableRid); if (tsChildTableSdb == NULL) { mError("failed to init child table data"); return -1; @@ -377,7 +380,7 @@ static int32_t mnodeInitChildTables() { } static void mnodeCleanupChildTables() { - sdbCloseTable(tsChildTableSdb); + sdbCloseTable(tsCTableRid); tsChildTableSdb = NULL; } @@ -543,7 +546,7 @@ static int32_t mnodeInitSuperTables() { SSTableObj tObj; tsSuperTableUpdateSize = (int8_t *)tObj.updateEnd - (int8_t *)&tObj.info.type; - SSdbTableDesc tableDesc = { + SSdbTableDesc desc = { .id = SDB_TABLE_STABLE, .name = "stables", .hashSessions = TSDB_DEFAULT_STABLES_HASH_SIZE, @@ -559,7 +562,8 @@ static int32_t mnodeInitSuperTables() { .fpRestored = mnodeSuperTableActionRestored }; - tsSuperTableSdb = sdbOpenTable(&tableDesc); + tsSTableRid = sdbOpenTable(&desc); + tsSuperTableSdb = sdbGetTableByRid(tsSTableRid); if (tsSuperTableSdb == NULL) { mError("failed to init stables data"); return -1; @@ -570,7 +574,7 @@ static int32_t mnodeInitSuperTables() { } static void mnodeCleanupSuperTables() { - sdbCloseTable(tsSuperTableSdb); + sdbCloseTable(tsSTableRid); tsSuperTableSdb = NULL; } diff --git a/src/mnode/src/mnodeUser.c b/src/mnode/src/mnodeUser.c index aee167631f..fb26086d04 100644 --- a/src/mnode/src/mnodeUser.c +++ b/src/mnode/src/mnodeUser.c @@ -33,6 +33,7 @@ #include "mnodeWrite.h" #include "mnodePeer.h" +int64_t tsUserRid = -1; static void * tsUserSdb = NULL; static int32_t tsUserUpdateSize = 0; static int32_t mnodeGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn); @@ -165,7 +166,8 @@ int32_t mnodeInitUsers() { .fpRestored = mnodeUserActionRestored }; - tsUserSdb = sdbOpenTable(&desc); + tsUserRid = sdbOpenTable(&desc); + tsUserSdb = sdbGetTableByRid(tsUserRid); if (tsUserSdb == NULL) { mError("table:%s, failed to create hash", desc.name); return -1; @@ -185,7 +187,7 @@ int32_t mnodeInitUsers() { } void mnodeCleanupUsers() { - sdbCloseTable(tsUserSdb); + sdbCloseTable(tsUserRid); tsUserSdb = NULL; } diff --git a/src/mnode/src/mnodeVgroup.c b/src/mnode/src/mnodeVgroup.c index a8343083aa..3e974f417f 100644 --- a/src/mnode/src/mnodeVgroup.c +++ b/src/mnode/src/mnodeVgroup.c @@ -51,6 +51,7 @@ char* vgroupStatus[] = { "updating" }; +int64_t tsVgroupRid = -1; static void *tsVgroupSdb = NULL; static int32_t tsVgUpdateSize = 0; @@ -222,7 +223,8 @@ int32_t mnodeInitVgroups() { .fpRestored = mnodeVgroupActionRestored, }; - tsVgroupSdb = sdbOpenTable(&desc); + tsVgroupRid = sdbOpenTable(&desc); + tsVgroupSdb = sdbGetTableByRid(tsVgroupRid); if (tsVgroupSdb == NULL) { mError("failed to init vgroups data"); return -1; @@ -610,7 +612,7 @@ void mnodeDropVgroup(SVgObj *pVgroup, void *ahandle) { } void mnodeCleanupVgroups() { - sdbCloseTable(tsVgroupSdb); + sdbCloseTable(tsVgroupRid); tsVgroupSdb = NULL; } diff --git a/src/mnode/src/mnodeWrite.c b/src/mnode/src/mnodeWrite.c index 8893316ffc..f0cfe1aedc 100644 --- a/src/mnode/src/mnodeWrite.c +++ b/src/mnode/src/mnodeWrite.c @@ -54,18 +54,8 @@ int32_t mnodeProcessWrite(SMnodeMsg *pMsg) { rpcRsp->rsp = epSet; rpcRsp->len = sizeof(SRpcEpSet); - mDebug("msg:%p, app:%p type:%s in write queue, will be redirected, numOfEps:%d inUse:%d", pMsg, pMsg->rpcMsg.ahandle, - taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); - for (int32_t i = 0; i < epSet->numOfEps; ++i) { - if (strcmp(epSet->fqdn[i], tsLocalFqdn) == 0 && htons(epSet->port[i]) == tsServerPort) { - epSet->inUse = (i + 1) % epSet->numOfEps; - mDebug("msg:%p, app:%p mnode index:%d ep:%s:%d, set inUse to %d", pMsg, pMsg->rpcMsg.ahandle, i, epSet->fqdn[i], - htons(epSet->port[i]), epSet->inUse); - } else { - mDebug("msg:%p, app:%p mnode index:%d ep:%s:%d", pMsg, pMsg->rpcMsg.ahandle, i, epSet->fqdn[i], - htons(epSet->port[i])); - } - } + mDebug("msg:%p, app:%p type:%s in write queue, is redirected, numOfEps:%d inUse:%d", pMsg, + pMsg->rpcMsg.ahandle, taosMsg[pMsg->rpcMsg.msgType], epSet->numOfEps, epSet->inUse); return TSDB_CODE_RPC_REDIRECT; } diff --git a/src/os/inc/osFile.h b/src/os/inc/osFile.h index 63f93bc0a1..db8963d8f7 100644 --- a/src/os/inc/osFile.h +++ b/src/os/inc/osFile.h @@ -20,17 +20,6 @@ extern "C" { #endif -#define tread(fd, buf, count) read(fd, buf, count) -#define twrite(fd, buf, count) write(fd, buf, count) -#define tlseek(fd, offset, whence) lseek(fd, offset, whence) -#define tclose(fd) \ - { \ - if (FD_VALID(fd)) { \ - close(fd); \ - fd = FD_INITIALIZER; \ - } \ - } - int64_t taosReadImp(int32_t fd, void *buf, int64_t count); int64_t taosWriteImp(int32_t fd, void *buf, int64_t count); int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence); @@ -40,7 +29,13 @@ int64_t taosCopy(char *from, char *to); #define taosRead(fd, buf, count) taosReadImp(fd, buf, count) #define taosWrite(fd, buf, count) taosWriteImp(fd, buf, count) #define taosLSeek(fd, offset, whence) taosLSeekImp(fd, offset, whence) -#define taosClose(x) tclose(x) +#define taosClose(fd) \ + { \ + if (FD_VALID(fd)) { \ + close(fd); \ + fd = FD_INITIALIZER; \ + } \ + } // TAOS_OS_FUNC_FILE_SENDIFLE int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size); diff --git a/src/os/src/detail/osFile.c b/src/os/src/detail/osFile.c index 7380614382..542b03b29e 100644 --- a/src/os/src/detail/osFile.c +++ b/src/os/src/detail/osFile.c @@ -116,7 +116,7 @@ int64_t taosWriteImp(int32_t fd, void *buf, int64_t n) { } int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence) { - return (int64_t)tlseek(fd, (long)offset, whence); + return (int64_t)lseek(fd, (long)offset, whence); } int64_t taosCopy(char *from, char *to) { diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index 895b414a56..71fc01923a 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -63,9 +63,11 @@ typedef struct SSqlGroupbyExpr { typedef struct SResultRow { int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer - int32_t rowId:15; - bool closed:1; // this result status: closed or opened - uint16_t numOfRows; // number of rows of current time window + int32_t rowId:29; // row index in buffer page + bool startInterp; // the time window start timestamp has done the interpolation already. + bool endInterp; // the time window end timestamp has done the interpolation already. + bool closed; // this result status: closed or opened + uint32_t numOfRows; // number of rows of current time window SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo union {STimeWindow win; char* key;}; // start key of current time window } SResultRow; @@ -187,6 +189,7 @@ typedef struct SQueryRuntimeEnv { bool topBotQuery; // false bool groupbyNormalCol; // denote if this is a groupby normal column query bool hasTagResults; // if there are tag values in final result or not + bool timeWindowInterpo;// if the time window start/end required interpolation int32_t interBufSize; // intermediate buffer sizse int32_t prevGroupId; // previous executed group id SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file @@ -195,6 +198,8 @@ typedef struct SQueryRuntimeEnv { SResultRowPool* pool; // window result object pool int32_t* rowCellInfoOffset;// offset value for each row result cell info + char** prevRow; + char** nextRow; } SQueryRuntimeEnv; enum { diff --git a/src/query/inc/tsqlfunction.h b/src/query/inc/tsqlfunction.h index 32cbb56c62..5a923db52c 100644 --- a/src/query/inc/tsqlfunction.h +++ b/src/query/inc/tsqlfunction.h @@ -152,6 +152,11 @@ typedef struct SResultRowCellInfo { uint32_t numOfRes; // num of output result in current buffer } SResultRowCellInfo; +typedef struct SPoint1 { + int64_t key; + double val; +} SPoint1; + #define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowCellInfo))) struct SQLFunctionCtx; @@ -194,6 +199,8 @@ typedef struct SQLFunctionCtx { SResultRowCellInfo *resultInfo; SExtTagsInfo tagInfo; + SPoint1 start; + SPoint1 end; } SQLFunctionCtx; typedef struct SQLAggFuncElem { @@ -243,21 +250,11 @@ enum { }; typedef struct STwaInfo { - TSKEY lastKey; - int8_t hasResult; // flag to denote has value - int16_t type; // source data type - TSKEY SKey; - TSKEY EKey; - - union { - double dOutput; - int64_t iOutput; - }; - - union { - double dLastValue; - int64_t iLastValue; - }; + TSKEY lastKey; + int8_t hasResult; // flag to denote has value + double dOutput; + double lastValue; + STimeWindow win; } STwaInfo; /* global sql function array */ @@ -276,8 +273,6 @@ bool topbot_datablock_filter(SQLFunctionCtx *pCtx, int32_t functionId, const cha (_r)->initialized = false; \ } while (0) -//void setResultInfoBuf(SResultRowCellInfo *pResInfo, char* buf); - static FORCE_INLINE void initResultInfo(SResultRowCellInfo *pResInfo, uint32_t bufLen) { pResInfo->initialized = true; // the this struct has been initialized flag diff --git a/src/query/src/qAst.c b/src/query/src/qAst.c index e813688d84..a65f4a6dc9 100644 --- a/src/query/src/qAst.c +++ b/src/query/src/qAst.c @@ -370,6 +370,66 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S #endif } +static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOfRows) { + switch(type) { + case TSDB_DATA_TYPE_TINYINT: { + int8_t* p = (int8_t*) dest; + int8_t* pSrc = (int8_t*) src; + + for(int32_t i = 0; i < numOfRows; ++i) { + p[i] = pSrc[numOfRows - i - 1]; + } + break; + } + case TSDB_DATA_TYPE_SMALLINT: { + int16_t* p = (int16_t*) dest; + int16_t* pSrc = (int16_t*) src; + + for(int32_t i = 0; i < numOfRows; ++i) { + p[i] = pSrc[numOfRows - i - 1]; + } + break; + } + case TSDB_DATA_TYPE_INT: { + int32_t* p = (int32_t*) dest; + int32_t* pSrc = (int32_t*) src; + + for(int32_t i = 0; i < numOfRows; ++i) { + p[i] = pSrc[numOfRows - i - 1]; + } + break; + } + case TSDB_DATA_TYPE_BIGINT: { + int64_t* p = (int64_t*) dest; + int64_t* pSrc = (int64_t*) src; + + for(int32_t i = 0; i < numOfRows; ++i) { + p[i] = pSrc[numOfRows - i - 1]; + } + break; + } + case TSDB_DATA_TYPE_FLOAT: { + float* p = (float*) dest; + float* pSrc = (float*) src; + + for(int32_t i = 0; i < numOfRows; ++i) { + p[i] = pSrc[numOfRows - i - 1]; + } + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + double* p = (double*) dest; + double* pSrc = (double*) src; + + for(int32_t i = 0; i < numOfRows; ++i) { + p[i] = pSrc[numOfRows - i - 1]; + } + break; + } + default: assert(0); + } +} + void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order, char *(*getSourceDataBlock)(void *, const char*, int32_t)) { if (pExprs == NULL) { @@ -387,6 +447,8 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, /* the right output has result from the right child syntax tree */ char *pRightOutput = malloc(sizeof(int64_t) * numOfRows); + char *pdata = malloc(sizeof(int64_t) * numOfRows); + if (pRight->nodeType == TSQL_NODE_EXPR) { tExprTreeCalcTraverse(pRight, numOfRows, pRightOutput, param, order, getSourceDataBlock); } @@ -398,52 +460,75 @@ void tExprTreeCalcTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, * the type of returned value of one expression is always double float precious */ _bi_consumer_fn_t fp = tGetBiConsumerFn(TSDB_DATA_TYPE_DOUBLE, TSDB_DATA_TYPE_DOUBLE, pExprs->_node.optr); - fp(pLeftOutput, pRightOutput, numOfRows, numOfRows, pOutput, order); + fp(pLeftOutput, pRightOutput, numOfRows, numOfRows, pOutput, TSDB_ORDER_ASC); } else if (pRight->nodeType == TSQL_NODE_COL) { // exprLeft + columnRight _bi_consumer_fn_t fp = tGetBiConsumerFn(TSDB_DATA_TYPE_DOUBLE, pRight->pSchema->type, pExprs->_node.optr); + // set input buffer char *pInputData = getSourceDataBlock(param, pRight->pSchema->name, pRight->pSchema->colId); - fp(pLeftOutput, pInputData, numOfRows, numOfRows, pOutput, order); + if (order == TSDB_ORDER_DESC) { + reverseCopy(pdata, pInputData, pRight->pSchema->type, numOfRows); + fp(pLeftOutput, pdata, numOfRows, numOfRows, pOutput, TSDB_ORDER_ASC); + } else { + fp(pLeftOutput, pInputData, numOfRows, numOfRows, pOutput, TSDB_ORDER_ASC); + } } else if (pRight->nodeType == TSQL_NODE_VALUE) { // exprLeft + 12 _bi_consumer_fn_t fp = tGetBiConsumerFn(TSDB_DATA_TYPE_DOUBLE, pRight->pVal->nType, pExprs->_node.optr); - fp(pLeftOutput, &pRight->pVal->i64Key, numOfRows, 1, pOutput, order); + fp(pLeftOutput, &pRight->pVal->i64Key, numOfRows, 1, pOutput, TSDB_ORDER_ASC); } } else if (pLeft->nodeType == TSQL_NODE_COL) { // column data specified on left-hand-side char *pLeftInputData = getSourceDataBlock(param, pLeft->pSchema->name, pLeft->pSchema->colId); if (pRight->nodeType == TSQL_NODE_EXPR) { // columnLeft + expr2 _bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pSchema->type, TSDB_DATA_TYPE_DOUBLE, pExprs->_node.optr); - fp(pLeftInputData, pRightOutput, numOfRows, numOfRows, pOutput, order); + + if (order == TSDB_ORDER_DESC) { + reverseCopy(pdata, pLeftInputData, pLeft->pSchema->type, numOfRows); + fp(pdata, pRightOutput, numOfRows, numOfRows, pOutput, TSDB_ORDER_ASC); + } else { + fp(pLeftInputData, pRightOutput, numOfRows, numOfRows, pOutput, TSDB_ORDER_ASC); + } } else if (pRight->nodeType == TSQL_NODE_COL) { // columnLeft + columnRight // column data specified on right-hand-side char *pRightInputData = getSourceDataBlock(param, pRight->pSchema->name, pRight->pSchema->colId); - _bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pSchema->type, pRight->pSchema->type, pExprs->_node.optr); - fp(pLeftInputData, pRightInputData, numOfRows, numOfRows, pOutput, order); + // both columns are descending order, do not reverse the source data + fp(pLeftInputData, pRightInputData, numOfRows, numOfRows, pOutput, order); } else if (pRight->nodeType == TSQL_NODE_VALUE) { // columnLeft + 12 _bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pSchema->type, pRight->pVal->nType, pExprs->_node.optr); - fp(pLeftInputData, &pRight->pVal->i64Key, numOfRows, 1, pOutput, order); + + if (order == TSDB_ORDER_DESC) { + reverseCopy(pdata, pLeftInputData, pLeft->pSchema->type, numOfRows); + fp(pdata, &pRight->pVal->i64Key, numOfRows, 1, pOutput, TSDB_ORDER_ASC); + } else { + fp(pLeftInputData, &pRight->pVal->i64Key, numOfRows, 1, pOutput, TSDB_ORDER_ASC); + } } } else { // column data specified on left-hand-side if (pRight->nodeType == TSQL_NODE_EXPR) { // 12 + expr2 _bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pVal->nType, TSDB_DATA_TYPE_DOUBLE, pExprs->_node.optr); - fp(&pLeft->pVal->i64Key, pRightOutput, 1, numOfRows, pOutput, order); + fp(&pLeft->pVal->i64Key, pRightOutput, 1, numOfRows, pOutput, TSDB_ORDER_ASC); } else if (pRight->nodeType == TSQL_NODE_COL) { // 12 + columnRight // column data specified on right-hand-side char *pRightInputData = getSourceDataBlock(param, pRight->pSchema->name, pRight->pSchema->colId); - _bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pVal->nType, pRight->pSchema->type, pExprs->_node.optr); - fp(&pLeft->pVal->i64Key, pRightInputData, 1, numOfRows, pOutput, order); + + if (order == TSDB_ORDER_DESC) { + reverseCopy(pdata, pRightInputData, pRight->pSchema->type, numOfRows); + fp(&pLeft->pVal->i64Key, pdata, numOfRows, 1, pOutput, TSDB_ORDER_ASC); + } else { + fp(&pLeft->pVal->i64Key, pRightInputData, 1, numOfRows, pOutput, TSDB_ORDER_ASC); + } } else if (pRight->nodeType == TSQL_NODE_VALUE) { // 12 + 12 _bi_consumer_fn_t fp = tGetBiConsumerFn(pLeft->pVal->nType, pRight->pVal->nType, pExprs->_node.optr); - fp(&pLeft->pVal->i64Key, &pRight->pVal->i64Key, 1, 1, pOutput, order); + fp(&pLeft->pVal->i64Key, &pRight->pVal->i64Key, 1, 1, pOutput, TSDB_ORDER_ASC); } } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 826858d1dd..ec70280c7f 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -27,6 +27,7 @@ #include "query.h" #include "queryLog.h" #include "tlosertree.h" +#include "ttype.h" #define MAX_ROWS_PER_RESBUF_PAGE ((1u<<12) - 1) @@ -194,6 +195,7 @@ static int32_t setAdditionalInfo(SQInfo *pQInfo, void *pTable, STableQueryInfo * static int32_t flushFromResultBuf(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo); static int32_t checkForQueryBuf(size_t numOfTables); static void releaseQueryBuf(size_t numOfTables); +static int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order); bool doFilterData(SQuery *pQuery, int32_t elemPos) { for (int32_t k = 0; k < pQuery->numOfFilterCols; ++k) { @@ -400,6 +402,17 @@ static bool isTopBottomQuery(SQuery *pQuery) { return false; } +static bool timeWindowInterpoRequired(SQuery *pQuery) { + for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { + int32_t functionId = pQuery->pExpr1[i].base.functionId; + if (functionId == TSDB_FUNC_TWA) { + return true; + } + } + + return false; +} + static bool hasTagValOutput(SQuery* pQuery) { SExprInfo *pExprInfo = &pQuery->pExpr1[0]; if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) { @@ -457,6 +470,7 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin return NULL; } + // TODO refactor // more than the capacity, reallocate the resources if (pWindowResInfo->size >= pWindowResInfo->capacity) { int64_t newCapacity = 0; @@ -595,14 +609,17 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf } static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, SDataBlockInfo* pBockInfo, - STimeWindow *win, bool masterscan, bool* newWind) { + STimeWindow *win, bool masterscan, bool* newWind, SResultRow** pResult) { assert(win->skey <= win->ekey); SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; - SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, pBockInfo->uid); + // todo refactor + int64_t uid = getResultInfoUId(pRuntimeEnv); + SResultRow *pResultRow = doPrepareResultRowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, uid); if (pResultRow == NULL) { *newWind = false; + // no master scan, no result generated means error occurs return masterscan? -1:0; } @@ -618,15 +635,40 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes // set time window for current result pResultRow->win = (*win); + *pResult = pResultRow; setResultRowOutputBufInitCtx(pRuntimeEnv, pResultRow); + return TSDB_CODE_SUCCESS; } -static bool getTimeWindowResStatus(SWindowResInfo *pWindowResInfo, int32_t slot) { +static bool getResultRowStatus(SWindowResInfo *pWindowResInfo, int32_t slot) { assert(slot >= 0 && slot < pWindowResInfo->size); return pWindowResInfo->pResult[slot]->closed; } +typedef enum SResultTsInterpType { + RESULT_ROW_START_INTERP = 1, + RESULT_ROW_END_INTERP = 2, +} SResultTsInterpType; + +static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) { + assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP)); + if (type == RESULT_ROW_START_INTERP) { + pResult->startInterp = true; + } else { + pResult->endInterp = true; + } +} + +static bool isResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) { + assert(pResult != NULL && (type == RESULT_ROW_START_INTERP || type == RESULT_ROW_END_INTERP)); + if (type == RESULT_ROW_START_INTERP) { + return pResult->startInterp == true; + } else { + return pResult->endInterp == true; + } +} + static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, int16_t pos, int16_t order, int64_t *pData) { int32_t forwardStep = 0; @@ -990,6 +1032,113 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas return dataBlock; } +// window start key interpolation +static bool setTimeWindowInterpolationStartTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win) { + SQuery* pQuery = pRuntimeEnv->pQuery; + + TSKEY start = tsCols[pos]; + TSKEY lastTs = *(TSKEY *) pRuntimeEnv->prevRow[0]; + TSKEY prevTs = (pos == 0)? lastTs : tsCols[pos - 1]; + + // if lastTs == INT64_MIN, it is the first block, no need to do the start time interpolation + if (((lastTs != INT64_MIN && pos >= 0) || (lastTs == INT64_MIN && pos > 0)) && win->skey > lastTs && + win->skey < start) { + + for (int32_t k = 0; k < pQuery->numOfCols; ++k) { + SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k); + if (k == 0 && pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + assert(pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); + continue; + } + + double v1 = 0, v2 = 0, v = 0; + + char *prevVal = pos == 0 ? pRuntimeEnv->prevRow[k] : ((char*)pColInfo->pData) + (pos - 1) * pColInfo->info.bytes; + + GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)prevVal); + GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + pos * pColInfo->info.bytes); + + SPoint point1 = (SPoint){.key = prevTs, .val = &v1}; + SPoint point2 = (SPoint){.key = start, .val = &v2}; + SPoint point = (SPoint){.key = win->skey, .val = &v}; + taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point); + pRuntimeEnv->pCtx[k].start.key = point.key; + pRuntimeEnv->pCtx[k].start.val = v; + } + + return true; + } else { + for (int32_t k = 0; k < pQuery->numOfCols; ++k) { + pRuntimeEnv->pCtx[k].start.key = INT64_MIN; + } + + return false; + } +} + +static bool setTimeWindowInterpolationEndTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, SArray* pDataBlock, TSKEY* tsCols, TSKEY ekey, STimeWindow* win) { + SQuery* pQuery = pRuntimeEnv->pQuery; + TSKEY trueEndKey = tsCols[pos]; + + if (win->ekey < ekey && win->ekey != trueEndKey) { + int32_t nextIndex = pos + 1; + TSKEY next = tsCols[nextIndex]; + + for (int32_t k = 0; k < pQuery->numOfCols; ++k) { + SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k); + if (k == 0 && pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP && + pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX) { + continue; + } + + double v1 = 0, v2 = 0, v = 0; + GET_TYPED_DATA(v1, double, pColInfo->info.type, (char *)pColInfo->pData + pos * pColInfo->info.bytes); + GET_TYPED_DATA(v2, double, pColInfo->info.type, (char *)pColInfo->pData + nextIndex * pColInfo->info.bytes); + + SPoint point1 = (SPoint){.key = trueEndKey, .val = &v1}; + SPoint point2 = (SPoint){.key = next, .val = &v2}; + SPoint point = (SPoint){.key = win->ekey, .val = &v}; + taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point); + pRuntimeEnv->pCtx[k].end.key = point.key; + pRuntimeEnv->pCtx[k].end.val = v; + } + + return true; + } else { // current time window does not ended in current data block, do nothing + for (int32_t k = 0; k < pQuery->numOfCols; ++k) { + pRuntimeEnv->pCtx[k].end.key = INT64_MIN; + } + + return false; + } +} + +static void saveDataBlockLastRow(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pDataBlockInfo, SArray* pDataBlock) { + if (pDataBlock == NULL) { + return; + } + + SQuery* pQuery = pRuntimeEnv->pQuery; + for (int32_t k = 0; k < pQuery->numOfCols; ++k) { + SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, k); + memcpy(pRuntimeEnv->prevRow[k], ((char*)pColInfo->pData) + (pColInfo->info.bytes * (pDataBlockInfo->rows - 1)), + pColInfo->info.bytes); + } +} + +static TSKEY getStartTsKey(SQuery* pQuery, SDataBlockInfo* pDataBlockInfo, TSKEY* tsCols, int32_t step) { + TSKEY ts = TSKEY_INITIAL_VAL; + + if (tsCols == NULL) { + ts = QUERY_IS_ASC_QUERY(pQuery) ? pDataBlockInfo->window.skey : pDataBlockInfo->window.ekey; + } else { + int32_t offset = GET_COL_DATA_POS(pQuery, 0, step); + ts = tsCols[offset]; + } + + return ts; +} + /** * todo set the last value for pQueryTableInfo as in rowwiseapplyfunctions * @param pRuntimeEnv @@ -1000,16 +1149,15 @@ static char *getDataBlock(SQueryRuntimeEnv *pRuntimeEnv, SArithmeticSupport *sas * @return the incremental number of output value, so it maybe 0 for fixed number of query, * such as count/min/max etc. */ -static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, - SDataBlockInfo *pDataBlockInfo, SWindowResInfo *pWindowResInfo, - __block_search_fn_t searchFn, SArray *pDataBlock) { +static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pStatis, SDataBlockInfo *pDataBlockInfo, + SWindowResInfo *pWindowResInfo, __block_search_fn_t searchFn, SArray *pDataBlock) { SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx; - bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); + bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); SQuery *pQuery = pRuntimeEnv->pQuery; TSKEY *tsCols = NULL; if (pDataBlock != NULL) { - SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, 0); + SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, 0); tsCols = (TSKEY *)(pColInfo->pData); } @@ -1018,7 +1166,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); } - SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv); + SQInfo *pQInfo = GET_QINFO_ADDR(pRuntimeEnv); for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { char *dataBlock = getDataBlock(pRuntimeEnv, &sasArray[k], k, pDataBlockInfo->rows, pDataBlock); setExecParams(pQuery, &pCtx[k], dataBlock, tsCols, pDataBlockInfo, pStatis, &sasArray[k], k, pQInfo->vgId); @@ -1026,18 +1174,13 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); if (QUERY_IS_INTERVAL_QUERY(pQuery)) { - TSKEY ts = TSKEY_INITIAL_VAL; + TSKEY ts = getStartTsKey(pQuery, pDataBlockInfo, tsCols, step); - if (tsCols == NULL) { - ts = QUERY_IS_ASC_QUERY(pQuery)? pDataBlockInfo->window.skey:pDataBlockInfo->window.ekey; - } else { - int32_t offset = GET_COL_DATA_POS(pQuery, 0, step); - ts = tsCols[offset]; - } - - bool hasTimeWindow = false; + bool hasTimeWindow = false; + SResultRow* pResult = NULL; STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) { + int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult); + if (ret != TSDB_CODE_SUCCESS) { tfree(sasArray); return; } @@ -1045,11 +1188,32 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * int32_t forwardStep = 0; int32_t startPos = pQuery->pos; + // in case of repeat scan/reverse scan, no new time window added. if (hasTimeWindow) { TSKEY ekey = reviseWindowEkey(pQuery, &win); forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, pQuery->pos, ekey, searchFn, true); - bool pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); + // window start key interpolation + if (pRuntimeEnv->timeWindowInterpo) { + bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP); + if (!alreadyInterp) { + bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, pQuery->pos, pDataBlock, tsCols, &win); + if (interp) { + setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); + } + } + + alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + if (!alreadyInterp) { + bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, pQuery->pos + forwardStep - 1, pDataBlock, tsCols, + pDataBlockInfo->window.ekey, &win); + if (interp) { + setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + } + } + } + + bool pStatus = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); doBlockwiseApplyFunctions(pRuntimeEnv, pStatus, &win, startPos, forwardStep, tsCols, pDataBlockInfo->rows); } @@ -1065,7 +1229,8 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * // null data, failed to allocate more memory buffer hasTimeWindow = false; - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &nextWin, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) { + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &nextWin, masterScan, &hasTimeWindow, &pResult) != + TSDB_CODE_SUCCESS) { break; } @@ -1076,7 +1241,26 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * TSKEY ekey = reviseWindowEkey(pQuery, &nextWin); forwardStep = getNumOfRowsInTimeWindow(pQuery, pDataBlockInfo, tsCols, startPos, ekey, searchFn, true); - bool closed = getTimeWindowResStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); + // window start(end) key interpolation + if (pRuntimeEnv->timeWindowInterpo) { + bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP); + if (!alreadyInterp) { + bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startPos, pDataBlock, tsCols, &nextWin); + if (interp) { + setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); + } + } + + alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + if (!alreadyInterp) { + bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, startPos + forwardStep - 1, pDataBlock, tsCols, pDataBlockInfo->window.ekey, &nextWin); + if (interp) { + setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + } + } + } + + bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); doBlockwiseApplyFunctions(pRuntimeEnv, closed, &nextWin, startPos, forwardStep, tsCols, pDataBlockInfo->rows); } @@ -1090,12 +1274,17 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis * for (int32_t k = 0; k < pQuery->numOfOutput; ++k) { int32_t functionId = pQuery->pExpr1[k].base.functionId; if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) { + pCtx[k].nStartQueryTimestamp = pDataBlockInfo->window.skey; aAggs[functionId].xFunction(&pCtx[k]); } } } - for(int32_t i = 0; i < pQuery->numOfOutput; ++i) { + if (pRuntimeEnv->timeWindowInterpo) { + saveDataBlockLastRow(pRuntimeEnv, pDataBlockInfo, pDataBlock); + } + + for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { if (pQuery->pExpr1[i].base.functionId != TSDB_FUNC_ARITHM) { continue; } @@ -1318,20 +1507,20 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS pQuery->order.order, pRuntimeEnv->pTSBuf->cur.order); } - int32_t j = 0; int32_t offset = -1; +// TSKEY prev = -1; - for (j = 0; j < pDataBlockInfo->rows; ++j) { + for (int32_t j = 0; j < pDataBlockInfo->rows; ++j) { offset = GET_COL_DATA_POS(pQuery, j, step); if (pRuntimeEnv->pTSBuf != NULL) { - int32_t r = doTSJoinFilter(pRuntimeEnv, offset); - if (r == TS_JOIN_TAG_NOT_EQUALS) { + int32_t ret = doTSJoinFilter(pRuntimeEnv, offset); + if (ret == TS_JOIN_TAG_NOT_EQUALS) { break; - } else if (r == TS_JOIN_TS_NOT_EQUALS) { + } else if (ret == TS_JOIN_TS_NOT_EQUALS) { continue; } else { - assert(r == TS_JOIN_TS_EQUAL); + assert(ret == TS_JOIN_TS_EQUAL); } } @@ -1344,8 +1533,9 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS int64_t ts = tsCols[offset]; STimeWindow win = getActiveTimeWindow(pWindowResInfo, ts, pQuery); - bool hasTimeWindow = false; - int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow); + bool hasTimeWindow = false; + SResultRow* pResult = NULL; + int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &win, masterScan, &hasTimeWindow, &pResult); if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code continue; } @@ -1353,8 +1543,28 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS if (!hasTimeWindow) { continue; } +/* + // window start key interpolation + if (pRuntimeEnv->timeWindowInterpo) { + bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP); + if (!alreadyInterp) { + bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, pos, pDataBlock, tsCols, &win); + if (interp) { + setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); + } + } - bool closed = getTimeWindowResStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); + alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + if (!alreadyInterp) { + bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, pQuery->pos + forwardStep - 1, pDataBlock, tsCols, + pDataBlockInfo->window.ekey, &win); + if (interp) { + setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + } + } + } +*/ + bool closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); doRowwiseApplyFunctions(pRuntimeEnv, closed, &win, offset); STimeWindow nextWin = win; @@ -1373,12 +1583,32 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS // null data, failed to allocate more memory buffer hasTimeWindow = false; - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &nextWin, masterScan, &hasTimeWindow) != TSDB_CODE_SUCCESS) { + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pDataBlockInfo, &nextWin, masterScan, &hasTimeWindow, &pResult) != TSDB_CODE_SUCCESS) { break; } if (hasTimeWindow) { - closed = getTimeWindowResStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); +/* + // window start(end) key interpolation + if (pRuntimeEnv->timeWindowInterpo) { + bool alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_START_INTERP); + if (!alreadyInterp) { + bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startPos, pDataBlock, tsCols, &nextWin); + if (interp) { + setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); + } + } + + alreadyInterp = isResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + if (!alreadyInterp) { + bool interp = setTimeWindowInterpolationEndTs(pRuntimeEnv, startPos + forwardStep - 1, pDataBlock, tsCols, pDataBlockInfo->window.ekey, &nextWin); + if (interp) { + setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); + } + } + } +*/ + closed = getResultRowStatus(pWindowResInfo, curTimeWindowIndex(pWindowResInfo)); doRowwiseApplyFunctions(pRuntimeEnv, closed, &nextWin, offset); } } @@ -1403,6 +1633,8 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS } } +// prev = tsCols[offset]; + if (pRuntimeEnv->pTSBuf != NULL) { // if timestamp filter list is empty, quit current query if (!tsBufNextPos(pRuntimeEnv->pTSBuf)) { @@ -1528,10 +1760,10 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY * top/bottom values emerge, so does diff function */ if (functionId == TSDB_FUNC_TWA) { - SResultRowCellInfo* pInfo = GET_RES_INFO(pCtx); - STwaInfo *pTWAInfo = (STwaInfo*) GET_ROWCELL_INTERBUF(pInfo); - pTWAInfo->SKey = pQuery->window.skey; - pTWAInfo->EKey = pQuery->window.ekey; + pCtx->param[1].i64Key = pQuery->window.skey; + pCtx->param[1].nType = TSDB_DATA_TYPE_BIGINT; + pCtx->param[2].i64Key = pQuery->window.ekey; + pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; } } else if (functionId == TSDB_FUNC_ARITHM) { @@ -1677,6 +1909,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order pCtx->functionId = pSqlFuncMsg->functionId; pCtx->stableQuery = pRuntimeEnv->stableQuery; pCtx->interBufBytes = pQuery->pExpr1[i].interBytes; + pCtx->start.key = INT64_MIN; + pCtx->end.key = INT64_MIN; pCtx->numOfParams = pSqlFuncMsg->numOfParams; for (int32_t j = 0; j < pCtx->numOfParams; ++j) { @@ -1711,6 +1945,8 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order } + *(int64_t*) pRuntimeEnv->prevRow[0] = INT64_MIN; + // if it is group by normal column, do not set output buffer, the output buffer is pResult // fixed output query/multi-output query for normal table if (!pRuntimeEnv->groupbyNormalCol && !pRuntimeEnv->stableQuery && !QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) { @@ -1781,6 +2017,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) { tfree(pRuntimeEnv->offset); tfree(pRuntimeEnv->keyBuf); tfree(pRuntimeEnv->rowCellInfoOffset); + tfree(pRuntimeEnv->prevRow); taosHashCleanup(pRuntimeEnv->pResultRowHashTable); pRuntimeEnv->pResultRowHashTable = NULL; @@ -2279,12 +2516,14 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo * pW // the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer if (QUERY_IS_INTERVAL_QUERY(pQuery)) { bool hasTimeWindow = false; + SResultRow* pResult = NULL; + bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); TSKEY k = QUERY_IS_ASC_QUERY(pQuery)? pBlockInfo->window.skey:pBlockInfo->window.ekey; STimeWindow win = getActiveTimeWindow(pWindowResInfo, k, pQuery); - if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pBlockInfo, &win, masterScan, &hasTimeWindow) != + if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pBlockInfo, &win, masterScan, &hasTimeWindow, &pResult) != TSDB_CODE_SUCCESS) { // todo handle error in set result for timewindow } @@ -3266,7 +3505,7 @@ static void disableFuncInReverseScanImpl(SQueryRuntimeEnv* pRuntimeEnv, SWindowR SQuery* pQuery = pRuntimeEnv->pQuery; for (int32_t i = 0; i < pWindowResInfo->size; ++i) { - bool closed = getTimeWindowResStatus(pWindowResInfo, i); + bool closed = getResultRowStatus(pWindowResInfo, i); if (!closed) { continue; } @@ -4617,6 +4856,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo pRuntimeEnv->topBotQuery = isTopBottomQuery(pQuery); pRuntimeEnv->hasTagResults = hasTagValOutput(pQuery); + pRuntimeEnv->timeWindowInterpo = timeWindowInterpoRequired(pQuery); setScanLimitationByResultBuffer(pQuery); @@ -6447,9 +6687,11 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou goto _cleanup; } + int32_t srcSize = 0; for (int16_t i = 0; i < numOfCols; ++i) { pQuery->colList[i] = pQueryMsg->colList[i]; pQuery->colList[i].filters = tscFilterInfoClone(pQueryMsg->colList[i].filters, pQuery->colList[i].numOfFilters); + srcSize += pQuery->colList[i].bytes; } // calculate the result row size @@ -6518,6 +6760,14 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou pQInfo->runtimeEnv.pResultRowHashTable = taosHashInit(pTableGroupInfo->numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pQInfo->runtimeEnv.keyBuf = malloc(TSDB_MAX_BYTES_PER_ROW); pQInfo->runtimeEnv.pool = initResultRowPool(getWindowResultSize(&pQInfo->runtimeEnv)); + pQInfo->runtimeEnv.prevRow = malloc(POINTER_BYTES * pQuery->numOfCols + srcSize); + + char* start = POINTER_BYTES * pQuery->numOfCols + (char*) pQInfo->runtimeEnv.prevRow; + pQInfo->runtimeEnv.prevRow[0] = start; + + for(int32_t i = 1; i < pQuery->numOfCols; ++i) { + pQInfo->runtimeEnv.prevRow[i] = pQInfo->runtimeEnv.prevRow[i - 1] + pQuery->colList[i-1].bytes; + } pQInfo->pBuf = calloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo)); if (pQInfo->pBuf == NULL) { diff --git a/src/query/src/qUtil.c b/src/query/src/qUtil.c index 3f56366db8..2c41d9bfc5 100644 --- a/src/query/src/qUtil.c +++ b/src/query/src/qUtil.c @@ -395,11 +395,10 @@ uint64_t getResultInfoUId(SQueryRuntimeEnv* pRuntimeEnv) { } SQuery* pQuery = pRuntimeEnv->pQuery; - if ((pQuery->checkBuffer == 1 && pQuery->interval.interval == 0) || isPointInterpoQuery(pQuery) || - pRuntimeEnv->groupbyNormalCol) { + if (pQuery->interval.interval == 0 || isPointInterpoQuery(pQuery) || pRuntimeEnv->groupbyNormalCol) { return 0; } - STableId* id = TSDB_TABLEID(pRuntimeEnv->pQuery->current); + STableId* id = TSDB_TABLEID(pRuntimeEnv->pQuery->current->pTable); return id->uid; } \ No newline at end of file diff --git a/src/util/inc/tfile.h b/src/util/inc/tfile.h index 10b7c1df35..066040170e 100644 --- a/src/util/inc/tfile.h +++ b/src/util/inc/tfile.h @@ -20,23 +20,26 @@ extern "C" { #endif -#include - // init taos file module -int32_t tfinit(); +int32_t tfInit(); // clean up taos file module -void tfcleanup(); +void tfCleanup(); // the same syntax as UNIX standard open/close/read/write // but FD is int64_t and will never be reused -int64_t tfopen(const char *pathname, int32_t flags); -int64_t tfclose(int64_t tfd); -int64_t tfwrite(int64_t tfd, void *buf, int64_t count); -int64_t tfread(int64_t tfd, void *buf, int64_t count); +int64_t tfOpen(const char *pathname, int32_t flags); +int64_t tfOpenM(const char *pathname, int32_t flags, mode_t mode); +int64_t tfClose(int64_t tfd); +int64_t tfWrite(int64_t tfd, void *buf, int64_t count); +int64_t tfRead(int64_t tfd, void *buf, int64_t count); +int32_t tfFsync(int64_t tfd); +bool tfValid(int64_t tfd); +int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence); +int32_t tfFtruncate(int64_t tfd, int64_t length); #ifdef __cplusplus } #endif -#endif // TDENGINE_TREF_H +#endif // TDENGINE_TFILE_H diff --git a/src/util/src/tfile.c b/src/util/src/tfile.c index 27ba30fe81..1fa6a3d096 100644 --- a/src/util/src/tfile.c +++ b/src/util/src/tfile.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "os.h" #include "taoserror.h" #include "tulog.h" @@ -21,40 +22,52 @@ static int32_t tsFileRsetId = -1; -static void taosCloseFile(void *p) { +static void tfCloseFile(void *p) { close((int32_t)(uintptr_t)p); } -int32_t tfinit() { - tsFileRsetId = taosOpenRef(2000, taosCloseFile); - return tsFileRsetId; +int32_t tfInit() { + tsFileRsetId = taosOpenRef(2000, tfCloseFile); + if (tsFileRsetId > 0) { + return 0; + } else { + return -1; + } } -void tfcleanup() { +void tfCleanup() { if (tsFileRsetId >= 0) taosCloseRef(tsFileRsetId); tsFileRsetId = -1; } -int64_t tfopen(const char *pathname, int32_t flags) { - int32_t fd = open(pathname, flags); - +static int64_t tfOpenImp(int32_t fd) { if (fd < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; - } + } - void *p = (void *)(int64_t)fd; + void * p = (void *)(int64_t)fd; int64_t rid = taosAddRef(tsFileRsetId, p); if (rid < 0) close(fd); return rid; } -int64_t tfclose(int64_t tfd) { +int64_t tfOpen(const char *pathname, int32_t flags) { + int32_t fd = open(pathname, flags); + return tfOpenImp(fd); +} + +int64_t tfOpenM(const char *pathname, int32_t flags, mode_t mode) { + int32_t fd = open(pathname, flags, mode); + return tfOpenImp(fd); +} + +int64_t tfClose(int64_t tfd) { return taosRemoveRef(tsFileRsetId, tfd); } -int64_t tfwrite(int64_t tfd, void *buf, int64_t count) { +int64_t tfWrite(int64_t tfd, void *buf, int64_t count) { void *p = taosAcquireRef(tsFileRsetId, tfd); if (p == NULL) return -1; @@ -67,7 +80,7 @@ int64_t tfwrite(int64_t tfd, void *buf, int64_t count) { return ret; } -int64_t tfread(int64_t tfd, void *buf, int64_t count) { +int64_t tfRead(int64_t tfd, void *buf, int64_t count) { void *p = taosAcquireRef(tsFileRsetId, tfd); if (p == NULL) return -1; @@ -79,3 +92,32 @@ int64_t tfread(int64_t tfd, void *buf, int64_t count) { taosReleaseRef(tsFileRsetId, tfd); return ret; } + +int64_t tfFsync(int64_t tfd) { + void *p = taosAcquireRef(tsFileRsetId, tfd); + if (p == NULL) return -1; + + int32_t fd = (int32_t)(uintptr_t)p; + return fsync(fd); +} + +bool tfValid(int64_t tfd) { + void *p = taosAcquireRef(tsFileRsetId, tfd); + return p != NULL; +} + +int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence) { + void *p = taosAcquireRef(tsFileRsetId, tfd); + if (p == NULL) return -1; + + int32_t fd = (int32_t)(uintptr_t)p; + return taosLSeek(fd, offset, whence); +} + +int32_t tfFtruncate(int64_t tfd, int64_t length) { + void *p = taosAcquireRef(tsFileRsetId, tfd); + if (p == NULL) return -1; + + int32_t fd = (int32_t)(uintptr_t)p; + return taosFtruncate(fd, length); +} diff --git a/src/wal/inc/walInt.h b/src/wal/inc/walInt.h index b0edabfbd8..06748d885f 100644 --- a/src/wal/inc/walInt.h +++ b/src/wal/inc/walInt.h @@ -44,8 +44,8 @@ typedef struct { uint64_t version; int64_t fileId; int64_t rid; + int64_t tfd; int32_t vgId; - int32_t fd; int32_t keep; int32_t level; int32_t fsyncPeriod; diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index 36c190be3e..72ea239817 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -17,6 +17,7 @@ #include "os.h" #include "taoserror.h" #include "tref.h" +#include "tfile.h" #include "twal.h" #include "walInt.h" @@ -61,7 +62,7 @@ void *walOpen(char *path, SWalCfg *pCfg) { } pWal->vgId = pCfg->vgId; - pWal->fd = -1; + pWal->tfd = -1; pWal->fileId = -1; pWal->level = pCfg->walLevel; pWal->keep = pCfg->keep; @@ -124,7 +125,7 @@ void walClose(void *handle) { SWal *pWal = handle; pthread_mutex_lock(&pWal->mutex); - taosClose(pWal->fd); + tfClose(pWal->tfd); pthread_mutex_unlock(&pWal->mutex); taosRemoveRef(tsWal.refId, pWal->rid); } @@ -143,7 +144,7 @@ static void walFreeObj(void *wal) { SWal *pWal = wal; wDebug("vgId:%d, wal:%p is freed", pWal->vgId, pWal); - taosClose(pWal->fd); + tfClose(pWal->tfd); pthread_mutex_destroy(&pWal->mutex); tfree(pWal); } @@ -172,7 +173,7 @@ static void walFsyncAll() { while (pWal) { if (walNeedFsync(pWal)) { wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->vgId, pWal->level, pWal->fsyncSeq, tsWal.seq); - int32_t code = fsync(pWal->fd); + int32_t code = tfFsync(pWal->tfd); if (code != 0) { wError("vgId:%d, file:%s, failed to fsync since %s", pWal->vgId, pWal->name, strerror(code)); } diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index 8a532c8164..10e1b4dd61 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -18,6 +18,7 @@ #include "os.h" #include "taoserror.h" #include "tchecksum.h" +#include "tfile.h" #include "twal.h" #include "walInt.h" @@ -36,8 +37,8 @@ int32_t walRenew(void *handle) { pthread_mutex_lock(&pWal->mutex); - if (pWal->fd >= 0) { - tclose(pWal->fd); + if (tfValid(pWal->tfd)) { + tfClose(pWal->tfd); wDebug("vgId:%d, file:%s, it is closed", pWal->vgId, pWal->name); } @@ -49,9 +50,9 @@ int32_t walRenew(void *handle) { } snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); - pWal->fd = open(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); + pWal->tfd = tfOpenM(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); - if (pWal->fd < 0) { + if (!tfValid(pWal->tfd)) { code = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); } else { @@ -67,7 +68,7 @@ void walRemoveOneOldFile(void *handle) { SWal *pWal = handle; if (pWal == NULL) return; if (pWal->keep == TAOS_WAL_KEEP) return; - if (pWal->fd <= 0) return; + if (!tfValid(pWal->tfd)) return; pthread_mutex_lock(&pWal->mutex); @@ -113,7 +114,7 @@ int32_t walWrite(void *handle, SWalHead *pHead) { int32_t code = 0; // no wal - if (pWal->fd <= 0) return 0; + if (!tfValid(pWal->tfd)) return 0; if (pWal->level == TAOS_WAL_NOLOG) return 0; if (pHead->version <= pWal->version) return 0; @@ -123,12 +124,12 @@ int32_t walWrite(void *handle, SWalHead *pHead) { pthread_mutex_lock(&pWal->mutex); - if (taosWrite(pWal->fd, pHead, contLen) != contLen) { + if (tfWrite(pWal->tfd, pHead, contLen) != contLen) { code = TAOS_SYSTEM_ERROR(errno); wError("vgId:%d, file:%s, failed to write since %s", pWal->vgId, pWal->name, strerror(errno)); } else { - wTrace("vgId:%d, write wal, fileId:%" PRId64 " fd:%d hver:%" PRId64 " wver:%" PRIu64 " len:%d", pWal->vgId, - pWal->fileId, pWal->fd, pHead->version, pWal->version, pHead->len); + wTrace("vgId:%d, write wal, fileId:%" PRId64 " tfd:%" PRId64 " hver:%" PRId64 " wver:%" PRIu64 " len:%d", pWal->vgId, + pWal->fileId, pWal->tfd, pHead->version, pWal->version, pHead->len); pWal->version = pHead->version; } @@ -141,11 +142,11 @@ int32_t walWrite(void *handle, SWalHead *pHead) { void walFsync(void *handle, bool forceFsync) { SWal *pWal = handle; - if (pWal == NULL || pWal->fd < 0) return; + if (pWal == NULL || !tfValid(pWal->tfd)) return; if (forceFsync || (pWal->level == TAOS_WAL_FSYNC && pWal->fsyncPeriod == 0)) { wTrace("vgId:%d, fileId:%" PRId64 ", do fsync", pWal->vgId, pWal->fileId); - if (fsync(pWal->fd) < 0) { + if (tfFsync(pWal->tfd) < 0) { wError("vgId:%d, fileId:%" PRId64 ", fsync failed since %s", pWal->vgId, pWal->fileId, strerror(errno)); } } @@ -186,8 +187,8 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) { // open the existing WAL file in append mode pWal->fileId = 0; snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); - pWal->fd = open(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); - if (pWal->fd < 0) { + pWal->tfd = tfOpenM(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); + if (!tfValid(pWal->tfd)) { wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } @@ -217,22 +218,22 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) { return code; } -static void walFtruncate(SWal *pWal, int32_t fd, int64_t offset) { - taosFtruncate(fd, offset); - fsync(fd); +static void walFtruncate(SWal *pWal, int64_t tfd, int64_t offset) { + tfFtruncate(tfd, offset); + tfFsync(tfd); } -static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int32_t fd, int64_t *offset) { +static int32_t walSkipCorruptedRecord(SWal *pWal, SWalHead *pHead, int64_t tfd, int64_t *offset) { int64_t pos = *offset; while (1) { pos++; - if (lseek(fd, pos, SEEK_SET) < 0) { + if (tfLseek(tfd, pos, SEEK_SET) < 0) { wError("vgId:%d, failed to seek from corrupted wal file since %s", pWal->vgId, strerror(errno)); return TSDB_CODE_WAL_FILE_CORRUPTED; } - if (taosRead(fd, pHead, sizeof(SWalHead)) <= 0) { + if (tfRead(tfd, pHead, sizeof(SWalHead)) <= 0) { wError("vgId:%d, read to end of corrupted wal file, offset:%" PRId64, pWal->vgId, pos); return TSDB_CODE_WAL_FILE_CORRUPTED; } @@ -259,8 +260,8 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch return TAOS_SYSTEM_ERROR(errno); } - int32_t fd = open(name, O_RDWR); - if (fd < 0) { + int64_t tfd = tfOpen(name, O_RDWR); + if (!tfValid(tfd)) { wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); tfree(buffer); return TAOS_SYSTEM_ERROR(errno); @@ -273,7 +274,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch SWalHead *pHead = buffer; while (1) { - int32_t ret = taosRead(fd, pHead, sizeof(SWalHead)); + int32_t ret = tfRead(tfd, pHead, sizeof(SWalHead)); if (ret == 0) break; if (ret < 0) { @@ -284,16 +285,16 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch if (ret < sizeof(SWalHead)) { wError("vgId:%d, file:%s, failed to read wal head, ret is %d", pWal->vgId, name, ret); - walFtruncate(pWal, fd, offset); + walFtruncate(pWal, tfd, offset); break; } if (!taosCheckChecksumWhole((uint8_t *)pHead, sizeof(SWalHead))) { wError("vgId:%d, file:%s, wal head cksum is messed up, hver:%" PRIu64 " len:%d offset:%" PRId64, pWal->vgId, name, pHead->version, pHead->len, offset); - code = walSkipCorruptedRecord(pWal, pHead, fd, &offset); + code = walSkipCorruptedRecord(pWal, pHead, tfd, &offset); if (code != TSDB_CODE_SUCCESS) { - walFtruncate(pWal, fd, offset); + walFtruncate(pWal, tfd, offset); break; } } @@ -310,7 +311,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch pHead = buffer; } - ret = taosRead(fd, pHead->cont, pHead->len); + ret = tfRead(tfd, pHead->cont, pHead->len); if (ret < 0) { wError("vgId:%d, file:%s, failed to read wal body since %s", pWal->vgId, name, strerror(errno)); code = TAOS_SYSTEM_ERROR(errno); @@ -332,7 +333,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL, NULL); } - tclose(fd); + tfClose(tfd); tfree(buffer); return code; diff --git a/tests/pytest/insert/insertDemo.py b/tests/pytest/insert/insertDemo.py new file mode 100644 index 0000000000..d18206e7a4 --- /dev/null +++ b/tests/pytest/insert/insertDemo.py @@ -0,0 +1,47 @@ +import taos +import datetime +import random +import multiprocessing + +def taos_excute(table, connect_host): + conn = taos.connect(host=connect_host, user="root", password="taosdata", config="/etc/taos", database='test') + cursor = conn.cursor() + for i in range(1000000): + pk = random.randint(100001, 300000) + time_now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + col1 = random.randint(1, 10000) + col2 = random.randint(1, 10000) + col3 = random.randint(1, 10000) + col4 = random.randint(1, 10000) + col5 = random.randint(1, 10000) + col6 = random.randint(1, 10000) + sql = f"INSERT INTO {table}_{pk} USING {table} TAGS ({pk}) VALUES ('{time_now}', {col1}, {col2}, {col3}, {col4}, {col5}, {col6})" + cursor.execute(sql) + cursor.close() + conn.close() + +def taos_init(table, connect_host, pk): + conn = taos.connect(host=connect_host, user="root", password="taosdata", config="/etc/taos", database='test') + cursor = conn.cursor() + sql = f"CREATE TABLE {table}_{pk} USING {table} TAGS ({pk})" + cursor.execute(sql) + cursor.close() + conn.close() + +print("init time:", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + +connect_list = ["node1", "node2", "node3", "node4", "node5"] +pool = multiprocessing.Pool(processes=108) + +for pk in range(100001, 300000): + pool.apply_async(func=taos_init, args=("test", connect_list[pk % 5], pk, )) + +print("start time:", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + +for i in range(10000): + pool.apply_async(func=taos_excute, args=("test", connect_list[i % 5],)) + +pool.close() +pool.join() + +print("end time:", datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) \ No newline at end of file diff --git a/tests/pytest/insert/insertFromCSVOurofOrder.py b/tests/pytest/insert/insertFromCSVOurofOrder.py new file mode 100644 index 0000000000..d4de85b7e9 --- /dev/null +++ b/tests/pytest/insert/insertFromCSVOurofOrder.py @@ -0,0 +1,71 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import sys +import taos +from util.log import tdLog +from util.cases import tdCases +from util.sql import tdSql +import time +import datetime +import csv +import random +import pandas as pd + + +class TDTestCase: + def init(self, conn, logSql): + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor(), logSql) + + self.ts = 1500074556514 + + def writeCSV(self): + with open('test3.csv','w', encoding='utf-8', newline='') as csvFile: + writer = csv.writer(csvFile, dialect='excel') + for i in range(1000000): + newTimestamp = self.ts + random.randint(10000000, 10000000000) + random.randint(1000, 10000000) + random.randint(1, 1000) + d = datetime.datetime.fromtimestamp(newTimestamp / 1000) + dt = str(d.strftime("%Y-%m-%d %H:%M:%S.%f")) + writer.writerow(["'%s'" % dt, random.randint(1, 100), random.uniform(1, 100), random.randint(1, 100), random.randint(1, 100)]) + + def removCSVHeader(self): + data = pd.read_csv("ordered.csv") + data = data.drop([0]) + data.to_csv("ordered.csv", header = False, index = False) + + def run(self): + tdSql.prepare() + + tdSql.execute("create table t1(ts timestamp, c1 int, c2 float, c3 int, c4 int)") + startTime = time.time() + tdSql.execute("insert into t1 file 'outoforder.csv'") + duration = time.time() - startTime + print("Out of Order - Insert time: %d" % duration) + tdSql.query("select count(*) from t1") + rows = tdSql.getData(0, 0) + + tdSql.execute("create table t2(ts timestamp, c1 int, c2 float, c3 int, c4 int)") + startTime = time.time() + tdSql.execute("insert into t2 file 'ordered.csv'") + duration = time.time() - startTime + print("Ordered - Insert time: %d" % duration) + tdSql.query("select count(*) from t2") + tdSql.checkData(0,0, rows) + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/script/general/parser/col_arithmetic_operation.sim b/tests/script/general/parser/col_arithmetic_operation.sim index 7adae8ef81..0cc02d088b 100644 --- a/tests/script/general/parser/col_arithmetic_operation.sim +++ b/tests/script/general/parser/col_arithmetic_operation.sim @@ -117,16 +117,17 @@ run general/parser/col_arithmetic_query.sim # ================================================================================================ print ====================> crash -# sql select spread(ts )/(1000*3600*24) from ca_stb0 interval(1y) +sql use $db +sql select spread(ts )/(1000*3600*24) from $stb interval(1y) -sql_error select first(c1, c2) - last(c1, c2) from stb interval(1y) -sql_error select first(ts) - last(ts) from stb interval(1y) -sql_error select top(c1, 2) - last(c1) from stb; -sql_error select stddev(c1) - last(c1) from stb; -sql_error select diff(c1) - last(c1) from stb; -sql_error select first(c7) - last(c7) from stb; -sql_error select first(c8) - last(c8) from stb; -sql_error select first(c9) - last(c9) from stb; +sql_error select first(c1, c2) - last(c1, c2) from $stb interval(1y) +sql_error select first(ts) - last(ts) from $stb interval(1y) +sql_error select top(c1, 2) - last(c1) from $stb; +sql_error select stddev(c1) - last(c1) from $stb; +sql_error select diff(c1) - last(c1) from $stb; +sql_error select first(c7) - last(c7) from $stb; +sql_error select first(c8) - last(c8) from $stb; +sql_error select first(c9) - last(c9) from $stb; sql_error select max(c2*2) from $tb sql_error select max(c1-c2) from $tb diff --git a/tests/script/general/parser/col_arithmetic_query.sim b/tests/script/general/parser/col_arithmetic_query.sim index 408b039144..53e2c98b56 100644 --- a/tests/script/general/parser/col_arithmetic_query.sim +++ b/tests/script/general/parser/col_arithmetic_query.sim @@ -62,24 +62,73 @@ if $data91 != 1.000000000 then return -1 endi -sql select (c1 * 2) % 7.9 from $tb order by ts desc; +sql select (c1 * 2) % 7.9, c1*1, c1*1*1, c1*c1, c1*c1*c1 from $tb order by ts desc; if $rows != 10000 then return -1 endi -if $data00 != 0.100000000 then - print expect 0.100000000, acutal:$data00 +if $data00 != 2.200000000 then + print expect 2.200000000, actual:$data00 return -1 endi -if $data10 != 2.100000000 then +if $data01 != 9.000000000 then return -1 endi -if $data90 != 6.000000000 then +if $data02 != 9.000000000 then + return -1 +endi + +if $data03 != 81.000000000 then + return -1 +endi + +if $data04 != 729.000000000 then + return -1 +endi + + +if $data10 != 0.200000000 then + return -1 +endi + +if $data11 != 8.000000000 then + return -1 +endi + +if $data12 != 8.000000000 then + return -1 +endi + +if $data13 != 64.000000000 then + return -1 +endi + +if $data14 != 512.000000000 then + return -1 +endi + +if $data90 != 0.000000000 then return -1 endi +if $data91 != 0.000000000 then + return -1 +endi + +if $data92 != 0.000000000 then + return -1 +endi + +if $data93 != 0.000000000 then + return -1 +endi + +if $data94 != 0.000000000 then + return -1 +endi + # [d.3] sql select c1 * c2 /4 from $tb where ts < 1537166000000 and ts > 1537156000000 if $rows != 17 then @@ -95,7 +144,7 @@ if $data10 != 16.000000000 then endi if $data20 != 20.250000000 then - print expect 20.250000000, acutal:$data21 + print expect 20.250000000, actual:$data21 return -1 endi @@ -320,7 +369,9 @@ sql_error select c7-c9 from $tb interval(2y) # multiple retrieve [d.20]=============================================================== sql select c2-c2, 911 from $tb -#======================================= aggregation function arithmetic query cases ================ +#======================================= aggregation function arithmetic query cases =================================== +# on $tb percentile() spread(ts) bug + # asc/desc order [d.2] sql select first(c1) * ( 2 / 3 ) from $stb order by ts asc; if $rows != 1 then @@ -349,11 +400,11 @@ if $data00 != 1.800000000 then return -1 endi -if $data01 != 100000 then +if $data01 != 100000.000000000 then return -1 endi -if $data02 != 200000 then +if $data02 != 200000.000000000 then return -1 endi @@ -374,77 +425,192 @@ if $data02 != 9.000000020 then return -1 endi -# all possible function in the arithmetic expressioin -sql select min(c1) * max(c2) /4, sum(c1) * percentile(c2, 20), apercentile(c4, 33) + 52/9, spread(c5)/min(c2) from $stb where ts < and ts > +# all possible function in the arithmetic expression, add more +sql select min(c1) * max(c2) /4, sum(c1) * apercentile(c2, 20), apercentile(c4, 33) + 52/9, spread(c5)/min(c2), count(1)/sum(c1), avg(c2)*count(c2) from $stb where ts >= '2018-09-17 09:00:00.000' and ts <= '2018-11-25 19:30:00.000'; +if $rows != 1 then + return -1 +endi -# no result return [d.3] +if $data00 != 0.000000000 then + return -1 +endi + +if $data01 != 225000.000000000 then + return -1 +endi + +if $data02 != 8.077777778 then + return -1 +endi + +if $data03 != inf then + return -1 +endi + +if $data04 != 0.444444444 then + return -1 +endi + +if $data05 != 450000.000000000 then + return -1 +endi + +# no result return [d.3]=============================================================== sql select first(c1) * 91 - 7, last(c3) from $stb where ts < 1537146000000 if $rows != 0 then return -1 endi # no result return [d.3] -sql select sum(c2) - avg(c2) from $tb where ts>xxx +sql select sum(c2) - avg(c2) from $stb where ts > '2018-11-25 19:30:00.000' if $rows != 0 then return -1 endi -# single row result aggregation [d.4] -sql select +# single row result aggregation [d.4]=================================================== +# all those cases are aggregation test cases. # error cases -sql_error select first(c1, c2) - last(c1, c2) from $tb +sql_error select first(c1, c2) - last(c1, c2) from $stb +sql_error select top(c1, 5) - bottom(c1, 5) from $stb +sql_error select first(*) - 99 from $stb # multi row result aggregation [d.4] -sql select top(c1, 1) - bottom(c1, 1) from $tb -sql select top(c1, 99) - bottom(c1, 99) from $tb +sql_error select top(c1, 1) - bottom(c1, 1) from $stb +sql_error select top(c1, 99) - bottom(c1, 99) from $stb -# all data types [d.6] -sql select c2-c1, c3/c2, c4*c3, c5%c4, c6+99%22 from $tb +# query on super table [d.5]============================================================= +# all cases in this part are query on super table + +# all data types [d.6]=================================================================== +sql select c2-c1, c3/c2, c4*c3, c5%c4, c6+99%22 from $stb # error case, ts/bool/binary/nchar not support arithmetic expression -sql_error select ts+ts from $tb -sql_error select ts+22 from $tb -sql_error select c7*12 from $tb -sql_error select c8/55 from $tb -sql_error select c9+c8 from $tb +sql_error select first(c7)*12 from $stb +sql_error select last(c8)/55 from $stb +sql_error select last_row(c9) + last_row(c8) from $stb -# arithmetic expression in join [d.7] +# arithmetic expression in join [d.7]=============================================================== -# arithmetic expression in union [d.8] +# arithmetic expression in union [d.8]=============================================================== -# arithmetic expression in group by [d.9] +# arithmetic expression in group by [d.9]=============================================================== # in group by tag -# not support for normal table -sql_error select c5*99 from $tb group by t1 +sql select avg(c4)*99 from $stb group by t1 +if $rows != 10 then + return -1 +endi + +if $data00 != 445.500000000 then + return -1 +endi + +if $data01 != 0 then + return -1 +endi + +if $data90 != 445.500000000 then + return -1 +endi + +if $data91 != 9 then + return -1 +endi # in group by column -sql_error select c6-c6+c3*12 from $tb group by c3; +sql select apercentile(c6, 50)-first(c6)+last(c5)*12, last(c5)*12 from ca_stb0 group by c2; +if $rows != 10 then + return -1 +endi -sql select first(c6) - last(c6) *12 / count(*) from $tb group by c3; +if $data00 != 0.000000000 then + return -1 +endi -# limit offset [d.10] -sql select c6-c6+12 from $tb limit 12 offset 99; -sql select c4/99.123 from $tb limit 1 offset 9999; +if $data01 != 0.000000000 then + return -1 +endi -# slimit/soffset not suport for normal table query. [d.11] -sql_error select sum(c1) from $tb slimit 1 soffset 19; +if $data10 != 12.000000000 then + return -1 +endi -# fill [d.12] -sql_error select c2-c2, c3-c4, c5%c6 from $tb fill(value, 12); +if $data11 != 12.000000000 then + return -1 +endi -# constant column. [d.13] +if $data20 != 24.000000000 then + return -1 +endi + +if $data21 != 24.000000000 then + return -1 +endi + +sql_error select first(c6) - last(c6) *12 / count(*) from $stb group by c3; + +sql select first(c6) - last(c6) *12 / count(*) from $stb group by c5; +if $rows != 10 then + return -1 +endi + +if $data00 != 0.000000000 then + return -1 +endi + +if $data10 != 0.997600000 then + return -1 +endi + +if $data90 != 8.978400000 then + return -1 +endi + +# limit offset [d.10]=============================================================== +sql select first(c6) - sum(c6) + 12 from $stb limit 12 offset 0; +if $rows != 1 then + return -1 +endi + +if $data00 != -449988.000000000 then + return -1 +endi + +sql select apercentile(c4, 21) / 99.123 from $stb limit 1 offset 1; +if $rows != 0 then + return -1 +endi + +sql select apercentile(c4, 21) / sum(c4) from $stb interval(1s) limit 1 offset 1; +if $rows != 1 then + return -1 +endi + +# slimit/soffset not support for normal table query. [d.11]=============================================================== +sql select sum(c1) from $stb slimit 1 soffset 19; +if $rows != 0 then + return -1 +endi + +sql select sum(c1) from $stb interval(1s) group by tbname slimit 1 soffset 1 +sql select sum(c1) from ca_stb0 interval(1s) group by tbname slimit 2 soffset 4 limit 10 offset 1 + +# fill [d.12]=============================================================== +sql_error select first(c1)-last(c1), sum(c3)*count(c3), spread(c5 ) % count(*) from $stb interval(1s) fill(prev); +sql_error select first(c1) from $stb fill(value, 20); + +# constant column. [d.13]=============================================================== -# column value filter [d.14] +# column value filter [d.14]=============================================================== -# tag filter(not support for normal table). [d.15] -sql_error select sum(c2)+99 from $tb where t1=12; +# tag filter. [d.15]=============================================================== +sql select sum(c2)+99 from $stb where t1=12; -# multi-field output [d.16] +# multi-field output [d.16]=============================================================== sql select count(*), sum(c1)*avg(c2), avg(c3)*count(c3), sum(c3), sum(c4), first(c7), last(c8), first(c9), first(c7), last(c8) from $tb sql select c4*1+1/2 from $tb @@ -461,18 +627,30 @@ if $data90 != 9.500000000 then return -1 endi -# interval query [d.17] -sql_error select c2*c2, c3-c3, c4+9 from $tb interval(1s) -sql_error select c7-c9 from $tb interval(2y) +# interval query [d.17]=============================================================== +sql select avg(c2)*count(c2), sum(c3)-first(c3), last(c4)+9 from $stb interval(1s) +if $rows != 10000 then + return -1 +endi -# aggregation query [d.18] -# see test cases below +if $data00 != @18-09-17 09:00:00.000@ then + return -1 +endi -# first/last query [d.19] -# see test cases below +sql_error select first(c7)- last(c1) from $tb interval(2y) -# multiple retrieve [d.20] -sql select c2-c2 from $tb; +# aggregation query [d.18]=============================================================== +# all cases in this part are aggregation query test. + +# first/last query [d.19]=============================================================== + + +# multiple retrieve [d.20]=============================================================== +sql select c2-c2 from $tb sql select first(c1)-last(c1), spread(c2), max(c3) - min(c3), avg(c4)*count(c4) from $tb + + +#====================================================super table query================================================== + diff --git a/tests/script/general/parser/limit1_tb.sim b/tests/script/general/parser/limit1_tb.sim index 1e473eb858..72b63256db 100644 --- a/tests/script/general/parser/limit1_tb.sim +++ b/tests/script/general/parser/limit1_tb.sim @@ -703,13 +703,13 @@ sql select twa(c1), twa(c2), twa(c3), twa(c4), twa(c5), twa(c6) from $tb where t if $rows != 1 then return -1 endi -if $data00 != 4.499549955 then +if $data00 != 4.500000000 then return -1 endi -if $data02 != 4.499549955 then +if $data02 != 4.500000000 then return -1 endi -if $data05 != 4.499549955 then +if $data05 != 4.500000000 then return -1 endi @@ -717,10 +717,12 @@ sql select first(c1), first(c2), first(c3), first(c4), first(c5), first(c6) from if $rows != 0 then return -1 endi + sql select first(c1), first(c2), first(c3), first(c4), first(c5), first(c6) from $tb where ts >= $ts0 and ts <= $tsu interval(30m) limit 3 offset 1 if $rows != 3 then return -1 endi + if $data01 != 3 then return -1 endi @@ -731,7 +733,6 @@ if $data23 != 9.00000 then return -1 endi - sql select last(c1), last(c2), last(c3), last(c4), last(c5), last(c6) from $tb where ts >= $ts0 and ts <= $tsu limit 5 offset 1 if $rows != 0 then return -1 diff --git a/tests/script/general/parser/limit_tb.sim b/tests/script/general/parser/limit_tb.sim index b917627fdf..45f5541208 100644 --- a/tests/script/general/parser/limit_tb.sim +++ b/tests/script/general/parser/limit_tb.sim @@ -327,22 +327,22 @@ sql select twa(c1), twa(c2), twa(c3), twa(c4), twa(c5), twa(c6) from $tb where t if $rows != 1 then return -1 endi -if $data00 != 4.000000000 then +if $data00 != 4.500000000 then return -1 endi -if $data01 != 4.000000000 then +if $data01 != 4.500000000 then return -1 endi -if $data02 != 4.000000000 then +if $data02 != 4.500000000 then return -1 endi -if $data03 != 4.000000000 then +if $data03 != 4.500000000 then return -1 endi -if $data04 != 4.000000000 then +if $data04 != 4.500000000 then return -1 endi -if $data05 != 4.000000000 then +if $data05 != 4.500000000 then return -1 endi @@ -690,13 +690,13 @@ sql select twa(c1), twa(c2), twa(c3), twa(c4), twa(c5), twa(c6) from $tb where t if $rows != 1 then return -1 endi -if $data00 != 4.000000000 then +if $data00 != 4.500000000 then return -1 endi -if $data02 != 4.000000000 then +if $data02 != 4.500000000 then return -1 endi -if $data05 != 4.000000000 then +if $data05 != 4.500000000 then return -1 endi diff --git a/tests/script/general/parser/where.sim b/tests/script/general/parser/where.sim index 8e17220b5b..c5b600b514 100644 --- a/tests/script/general/parser/where.sim +++ b/tests/script/general/parser/where.sim @@ -131,7 +131,6 @@ if $data00 != $rowNum then return -1 endi - ## like sql_error select * from $mt where c1 like 1 #sql_error select * from $mt where t1 like 1 @@ -178,7 +177,8 @@ sql create table wh_mt2_tb1 using wh_mt2 tags ('wh_mt2_tb1') # 2019-01-01 09:00:00.000 1546304400000 # 2019-01-01 09:10:00.000 1546305000000 sql insert into wh_mt2_tb1 values ('2019-01-01 00:00:00.000', '2019-01-01 09:00:00.000', 'binary10', 'nchar10') -sql insert into wh_mt2_tb1 values ('2019-01-01 00:10:00.000', '2019-01-01 09:10:00.000', 'binary10', 'nchar10') +sql insert into wh_mt2_tb1 values ('2019-01-01 00:10:00.000', '2019-01-01 09:10:00.000', 'binary10', 'nchar10') + sql select * from wh_mt2_tb1 where c1 > 1546304400000 if $rows != 1 then return -1