diff --git a/include/client/taos.h b/include/client/taos.h index 5bb528bd7f..b4e5a41ccf 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -194,6 +194,7 @@ DLL_EXPORT int taos_errno(TAOS_RES *tres); DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param); DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param); +DLL_EXPORT void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param); // Shuduo: temporary enable for app build #if 1 diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index e6512b5bac..77376a05d9 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -127,6 +127,8 @@ typedef enum EFunctionType { FUNCTION_TYPE_APERCENTILE_MERGE, FUNCTION_TYPE_SPREAD_PARTIAL, FUNCTION_TYPE_SPREAD_MERGE, + FUNCTION_TYPE_HISTOGRAM_PARTIAL, + FUNCTION_TYPE_HISTOGRAM_MERGE, // user defined funcion FUNCTION_TYPE_UDF = 10000 diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index ddd7e1cd02..6b5eb3b491 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -339,6 +339,7 @@ typedef struct { int32_t sourceTaskId; int32_t sourceVg; int32_t sourceChildId; + int32_t upstreamNodeId; #if 0 int64_t sourceVer; #endif diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 3a6f3badc0..6aa83e9575 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -171,6 +171,7 @@ typedef struct SReqResultInfo { uint32_t current; bool completed; int32_t precision; + bool convertUcs4; int32_t payloadLen; } SReqResultInfo; @@ -222,7 +223,7 @@ typedef struct SSyncQueryParam { SRequestObj* pRequest; } SSyncQueryParam; -void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); +void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4); void doSetOneRowPtr(SReqResultInfo* pResultInfo); diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 75e1884360..171f06c257 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -191,6 +191,8 @@ void *createRequest(STscObj *pObj, int32_t type) { pRequest->requestId = generateRequestId(); pRequest->metric.start = taosGetTimestampUs(); + pRequest->body.resInfo.convertUcs4 = true; // convert ucs4 by default + pRequest->type = type; pRequest->pTscObj = pObj; pRequest->msgBuf = taosMemoryCalloc(1, ERROR_MSG_BUF_DEFAULT_SIZE); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index e15624bc0d..9025121854 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1114,7 +1114,7 @@ static void syncFetchFn(void* param, TAOS_RES* res, int32_t numOfRows) { tsem_post(&pParam->sem); } -void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) { +void* doAsyncFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) { assert(pRequest != NULL); SReqResultInfo* pResultInfo = &pRequest->body.resInfo; @@ -1126,6 +1126,10 @@ void* doAsyncFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUc } SSyncQueryParam* pParam = pRequest->body.param; + + // convert ucs4 to native multi-bytes string + pResultInfo->convertUcs4 = convertUcs4; + taos_fetch_rows_a(pRequest, syncFetchFn, pParam); tsem_wait(&pParam->sem); } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 1a1925e244..5e442c4bf1 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -219,14 +219,13 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) { if (TD_RES_QUERY(res)) { SRequestObj *pRequest = (SRequestObj *)res; +#if SYNC_ON_TOP_OF_ASYNC + return doAsyncFetchRows(pRequest, true, true); +#else if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT || pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) { return NULL; } - -#if SYNC_ON_TOP_OF_ASYNC - return doAsyncFetchRow(pRequest, true, true); -#else return doFetchRows(pRequest, true, true); #endif @@ -489,6 +488,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { if (res == NULL) { return 0; } + if (TD_RES_QUERY(res)) { SRequestObj *pRequest = (SRequestObj *)res; @@ -501,7 +501,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) { } #if SYNC_ON_TOP_OF_ASYNC - doAsyncFetchRow(pRequest, false, true); + doAsyncFetchRows(pRequest, false, true); #else doFetchRows(pRequest, true, true); #endif @@ -552,7 +552,11 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { return 0; } +#if SYNC_ON_TOP_OF_ASYNC + doAsyncFetchRows(pRequest, false, false); +#else doFetchRows(pRequest, false, false); +#endif SReqResultInfo *pResultInfo = &pRequest->body.resInfo; @@ -771,11 +775,11 @@ static void fetchCallback(void* pResult, void* param, int32_t code) { } if (pRequest->code != TSDB_CODE_SUCCESS) { - pRequest->code = code; pRequest->body.fetchFp(pRequest->body.param, pRequest, 0); + return; } - pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResultInfo->pData, true, false); + pRequest->code = setQueryResultFromRsp(pResultInfo, (SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4, false); if (pRequest->code != TSDB_CODE_SUCCESS) { pResultInfo->numOfRows = 0; pRequest->code = code; @@ -815,6 +819,13 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { schedulerAsyncFetchRows(pRequest->body.queryJob, fetchCallback, pRequest); } +void taos_fetch_raw_block_a(TAOS_RES* res, __taos_async_fn_t fp, void* param) { + ASSERT(res != NULL && fp != NULL); + SRequestObj *pRequest = res; + pRequest->body.resInfo.convertUcs4 = false; + taos_fetch_rows_a(res, fp, param); +} + TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval) { // TODO diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index cccb1fa6d0..b5011a19a2 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -494,7 +494,6 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) { tmqAskEp(tmq, true); taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { - /*tmq_commit(tmq, NULL, true);*/ tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam); taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) { @@ -667,94 +666,6 @@ FAIL: tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) { return tmqCommitInner(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam); -#if 0 - // TODO: add read write lock - SRequestObj* pRequest = NULL; - tmq_resp_err_t resp = TMQ_RESP_ERR__SUCCESS; - // build msg - // send to mnode - SMqCMCommitOffsetReq req; - SArray* pOffsets = NULL; - - if (offsets == NULL) { - pOffsets = taosArrayInit(0, sizeof(SMqOffset)); - for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { - SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { - SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - SMqOffset offset; - strcpy(offset.topicName, pTopic->topicName); - strcpy(offset.cgroup, tmq->groupId); - offset.vgId = pVg->vgId; - offset.offset = pVg->currentOffset; - taosArrayPush(pOffsets, &offset); - } - } - req.num = pOffsets->size; - req.offsets = pOffsets->pData; - } else { - req.num = taosArrayGetSize(&offsets->container); - req.offsets = (SMqOffset*)offsets->container.pData; - } - - SEncoder encoder; - - tEncoderInit(&encoder, NULL, 0); - tEncodeSMqCMCommitOffsetReq(&encoder, &req); - int32_t tlen = encoder.pos; - void* buf = taosMemoryMalloc(tlen); - if (buf == NULL) { - tEncoderClear(&encoder); - return -1; - } - tEncoderClear(&encoder); - - tEncoderInit(&encoder, buf, tlen); - tEncodeSMqCMCommitOffsetReq(&encoder, &req); - tEncoderClear(&encoder); - - pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_MQ_COMMIT_OFFSET); - if (pRequest == NULL) { - tscError("failed to malloc request"); - } - - SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam)); - if (pParam == NULL) { - return -1; - } - pParam->tmq = tmq; - tsem_init(&pParam->rspSem, 0, 0); - pParam->async = async; - pParam->offsets = pOffsets; - - pRequest->body.requestMsg = (SDataBuf){ - .pData = buf, - .len = tlen, - .handle = NULL, - }; - - SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); - sendInfo->requestObjRefId = 0; - sendInfo->param = pParam; - sendInfo->fp = tmqCommitCb; - SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); - - int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); - - if (!async) { - tsem_wait(&pParam->rspSem); - resp = pParam->rspErr; - tsem_destroy(&pParam->rspSem); - taosMemoryFree(pParam); - - if (pOffsets) { - taosArrayDestroy(pOffsets); - } - } - - return resp; -#endif } tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { @@ -859,93 +770,6 @@ void tmq_conf_set_auto_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb, void* para conf->commitCbUserParam = param; } -#if 0 -TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) { - STscObj* pTscObj = (STscObj*)taos; - SRequestObj* pRequest = NULL; - SQuery* pQueryNode = NULL; - char* astStr = NULL; - int32_t sqlLen; - - terrno = TSDB_CODE_SUCCESS; - if (taos == NULL || streamName == NULL || sql == NULL) { - tscError("invalid parameters for creating stream, connObj:%p, stream name:%s, sql:%s", taos, streamName, sql); - terrno = TSDB_CODE_TSC_INVALID_INPUT; - goto _return; - } - sqlLen = strlen(sql); - - if (strlen(tbName) >= TSDB_TABLE_NAME_LEN) { - tscError("output tb name too long, max length:%d", TSDB_TABLE_NAME_LEN - 1); - terrno = TSDB_CODE_TSC_INVALID_INPUT; - goto _return; - } - - if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) { - tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); - terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; - goto _return; - } - - tscDebug("start to create stream: %s", streamName); - - int32_t code = 0; - CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); - CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode, NULL), _return); - CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return); - - /*printf("%s\n", pStr);*/ - - SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T}; - strcpy(name.dbname, pRequest->pDb); - strcpy(name.tname, streamName); - - SCMCreateStreamReq req = { - .igExists = 1, - .ast = astStr, - .sql = (char*)sql, - }; - tNameExtractFullName(&name, req.name); - strcpy(req.targetStbFullName, tbName); - - int tlen = tSerializeSCMCreateStreamReq(NULL, 0, &req); - void* buf = taosMemoryMalloc(tlen); - if (buf == NULL) { - goto _return; - } - - tSerializeSCMCreateStreamReq(buf, tlen, &req); - - pRequest->body.requestMsg = (SDataBuf){ - .pData = buf, - .len = tlen, - .handle = NULL, - }; - pRequest->type = TDMT_MND_CREATE_STREAM; - - SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); - SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); - - int64_t transporterId = 0; - asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); - - tsem_wait(&pRequest->body.rspSem); - -_return: - taosMemoryFreeClear(astStr); - qDestroyQuery(pQueryNode); - /*if (sendInfo != NULL) {*/ - /*destroySendMsgInfo(sendInfo);*/ - /*}*/ - - if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) { - pRequest->code = terrno; - } - - return pRequest; -} -#endif - #if 0 int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) { if (tmq_message == NULL) return 0; @@ -1540,10 +1364,11 @@ const char* tmq_get_table_name(TAOS_RES* res) { } return NULL; } -DLL_EXPORT void tmq_commit_async(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, tmq_commit_cb* cb, void* param) { + +void tmq_commit_async(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, tmq_commit_cb* cb, void* param) { tmqCommitInner(tmq, offsets, 0, 1, cb, param); } -DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) { +tmq_resp_err_t tmq_commit_sync(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) { return tmqCommitInner(tmq, offsets, 0, 0, NULL, NULL); } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 86d4f205c6..4ebedc98c1 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -684,7 +684,8 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) { */ size_t blockDataGetSerialMetaSize(uint32_t numOfCols) { // | total rows/total length | block group id | column schema | each column length | - return sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t)) + numOfCols * sizeof(int32_t); + return sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t)) + + numOfCols * sizeof(int32_t); } double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { @@ -1217,6 +1218,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) { pBlock->info.numOfCols = numOfCols; pBlock->info.hasVarCol = pDataBlock->info.hasVarCol; pBlock->info.rowSize = pDataBlock->info.rowSize; + pBlock->info.groupId = pDataBlock->info.groupId; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData colInfo = {0}; @@ -1892,12 +1894,12 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen uint64_t* groupId = (uint64_t*)data; data += sizeof(uint64_t); - for(int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - *((int16_t*) data) = pColInfoData->info.type; + *((int16_t*)data) = pColInfoData->info.type; data += sizeof(int16_t); - *((int32_t*) data) = pColInfoData->info.bytes; + *((int32_t*)data) = pColInfoData->info.bytes; data += sizeof(int32_t); } @@ -1943,6 +1945,8 @@ void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData) { blockDataEnsureCapacity(pBlock, numOfRows); + pBlock->info.rows = numOfRows; + const char* pStart = pData; int32_t dataLen = *(int32_t*)pStart; @@ -1951,13 +1955,25 @@ const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t pBlock->info.groupId = *(uint64_t*)pStart; pStart += sizeof(uint64_t); - for(int32_t i = 0; i < numOfCols; ++i) { + if (pBlock->pDataBlock == NULL) { + pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); + taosArraySetSize(pBlock->pDataBlock, numOfCols); + } + + pBlock->info.numOfCols = taosArrayGetSize(pBlock->pDataBlock); + ASSERT(pBlock->pDataBlock->size >= numOfCols); + + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); pColInfoData->info.type = *(int16_t*)pStart; pStart += sizeof(int16_t); pColInfoData->info.bytes = *(int32_t*)pStart; pStart += sizeof(int32_t); + + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + pBlock->info.hasVarCol = true; + } } blockDataEnsureCapacity(pBlock, numOfRows); @@ -1982,11 +1998,17 @@ const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t pColInfoData->pData = taosMemoryMalloc(colLen[i]); } } else { + if (pColInfoData->nullbitmap == NULL) { + pColInfoData->nullbitmap = taosMemoryCalloc(1, BitmapLen(numOfRows)); + } memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows)); pStart += BitmapLen(numOfRows); } if (colLen[i] > 0) { + if (pColInfoData->pData == NULL) { + pColInfoData->pData = taosMemoryCalloc(1, colLen[i]); + } memcpy(pColInfoData->pData, pStart, colLen[i]); } diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 6037b011af..84a2cf0544 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -244,7 +244,7 @@ int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, S } } - ASSERT(flags); + // ASSERT(flags); // only 1 column(ts) // decide uint32_t nData = 0; @@ -268,7 +268,8 @@ int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, S nDataT = BIT2_SIZE(pTSchema->numOfCols - 1) + pTSchema->flen + ntv; break; default: - ASSERT(0); + break; // only ts column + // ASSERT(0); } uint8_t tflags = 0; @@ -283,7 +284,7 @@ int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, S tflags |= TSROW_KV_BIG; } - if (nDataT < nDataK) { + if (nDataT <= nDataK) { nData = nDataT; } else { nData = nDataK; @@ -373,7 +374,8 @@ int32_t tTSRowNew(STSRowBuilder *pBuilder, SArray *pArray, STSchema *pTSchema, S ptv = pf + pTSchema->flen; break; default: - ASSERT(0); + // ASSERT(0); + break; } } else { pTSKVRow = (STSKVRow *)(*ppRow)->pData; @@ -495,7 +497,7 @@ void tTSRowGet(STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal SValue value; ASSERT(iCol < pTSchema->numOfCols); - ASSERT(flags); + // ASSERT(flags); // only 1 ts column ASSERT(pRow->sver == pTSchema->version); if (iCol == 0) { diff --git a/source/common/test/CMakeLists.txt b/source/common/test/CMakeLists.txt index 0535b08be7..eb79e79afa 100644 --- a/source/common/test/CMakeLists.txt +++ b/source/common/test/CMakeLists.txt @@ -5,7 +5,12 @@ MESSAGE(STATUS "build parser unit test") SET(CMAKE_CXX_STANDARD 11) AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) -ADD_EXECUTABLE(commonTest ${SOURCE_LIST}) +ADD_EXECUTABLE(commonTest "") +TARGET_SOURCES( + commonTest + PRIVATE + "commonTests.cpp" +) TARGET_LINK_LIBRARIES( commonTest PUBLIC os util common gtest @@ -24,7 +29,12 @@ target_sources( PRIVATE "dataformatTest.cpp" ) -target_link_libraries(dataformatTest gtest gtest_main util) +target_link_libraries(dataformatTest gtest gtest_main util common) +target_include_directories( + dataformatTest + PUBLIC "${TD_SOURCE_DIR}/include/common" + PUBLIC "${TD_SOURCE_DIR}/include/util" +) # tmsg test # add_executable(tmsgTest "") diff --git a/source/common/test/dataformatTest.cpp b/source/common/test/dataformatTest.cpp index 3497014c22..81e91da0d6 100644 --- a/source/common/test/dataformatTest.cpp +++ b/source/common/test/dataformatTest.cpp @@ -1 +1,481 @@ -#include "gtest/gtest.h" \ No newline at end of file +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include + +#include +#include +#include +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +#define NONE_CSTR "no" +#define NULL_CSTR "nu" +#define NONE_LEN 2 +#define NULL_LEN 2 +const static int16_t MAX_COLS = 14; + +int main(int argc, char **argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +// ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 binary(10), c6 nchar(10), c7 tinyint, c8 smallint, c9 bool +STSchema *genSTSchema(int16_t nCols) { + EXPECT_LE(nCols, MAX_COLS); + SSchema *pSchema = (SSchema *)taosMemoryCalloc(nCols, sizeof(SSchema)); + EXPECT_NE(pSchema, nullptr); + + for (int16_t i = 0; i < nCols; ++i) { + pSchema[i].colId = PRIMARYKEY_TIMESTAMP_COL_ID + i; + char colName[TSDB_COL_NAME_LEN] = {0}; + snprintf(colName, TSDB_COL_NAME_LEN, "c%" PRIi16, i); + strncpy(pSchema[i].name, colName, TSDB_COL_NAME_LEN); + + switch (i) { + case 0: { + pSchema[0].type = TSDB_DATA_TYPE_TIMESTAMP; + pSchema[0].bytes = TYPE_BYTES[pSchema[0].type]; + } break; + case 1: { + pSchema[1].type = TSDB_DATA_TYPE_INT; + pSchema[1].bytes = TYPE_BYTES[pSchema[1].type]; + ; + } break; + case 2: { + pSchema[2].type = TSDB_DATA_TYPE_BIGINT; + pSchema[2].bytes = TYPE_BYTES[pSchema[2].type]; + } break; + case 3: { + pSchema[3].type = TSDB_DATA_TYPE_FLOAT; + pSchema[3].bytes = TYPE_BYTES[pSchema[3].type]; + } break; + case 4: { + pSchema[4].type = TSDB_DATA_TYPE_DOUBLE; + pSchema[4].bytes = TYPE_BYTES[pSchema[4].type]; + } break; + case 5: { + pSchema[5].type = TSDB_DATA_TYPE_BINARY; + pSchema[5].bytes = 12; + } break; + case 6: { + pSchema[6].type = TSDB_DATA_TYPE_NCHAR; + pSchema[6].bytes = 42; + } break; + case 7: { + pSchema[7].type = TSDB_DATA_TYPE_TINYINT; + pSchema[7].bytes = TYPE_BYTES[pSchema[7].type]; + } break; + case 8: { + pSchema[8].type = TSDB_DATA_TYPE_SMALLINT; + pSchema[8].bytes = TYPE_BYTES[pSchema[8].type]; + } break; + case 9: { + pSchema[9].type = TSDB_DATA_TYPE_BOOL; + pSchema[9].bytes = TYPE_BYTES[pSchema[9].type]; + } break; + case 10: { + pSchema[10].type = TSDB_DATA_TYPE_UTINYINT; + pSchema[10].bytes = TYPE_BYTES[pSchema[10].type]; + } break; + case 11: { + pSchema[11].type = TSDB_DATA_TYPE_USMALLINT; + pSchema[11].bytes = TYPE_BYTES[pSchema[11].type]; + } break; + case 12: { + pSchema[12].type = TSDB_DATA_TYPE_UINT; + pSchema[12].bytes = TYPE_BYTES[pSchema[12].type]; + } break; + case 13: { + pSchema[13].type = TSDB_DATA_TYPE_UBIGINT; + pSchema[13].bytes = TYPE_BYTES[pSchema[13].type]; + } break; + + default: + ASSERT(0); + break; + } + } + + STSchema *pResult = NULL; + pResult = tdGetSTSChemaFromSSChema(&pSchema, nCols); + + taosMemoryFree(pSchema); + return pResult; +} + +// ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 binary(10), c6 nchar(10), c7 tinyint, c8 smallint, c9 bool +static int32_t genTestData(const char **data, int16_t nCols, SArray **pArray) { + if (!(*pArray)) { + *pArray = taosArrayInit(nCols, sizeof(SColVal)); + if (!(*pArray)) return -1; + } + + for (int16_t i = 0; i < nCols; ++i) { + SColVal colVal = {0}; + colVal.cid = PRIMARYKEY_TIMESTAMP_COL_ID + i; + if (strncasecmp(data[i], NONE_CSTR, NONE_LEN) == 0) { + colVal.isNone = 1; + taosArrayPush(*pArray, &colVal); + continue; + } else if (strncasecmp(data[i], NULL_CSTR, NULL_LEN) == 0) { + colVal.isNull = 1; + taosArrayPush(*pArray, &colVal); + continue; + } + + switch (i) { + case 0: + sscanf(data[i], "%" PRIi64, &colVal.value.ts); + break; + case 1: { + sscanf(data[i], "%" PRIi32, &colVal.value.i32); + } break; + case 2: + sscanf(data[i], "%" PRIi64, &colVal.value.i64); + break; + case 3: + sscanf(data[i], "%f", &colVal.value.f); + break; + case 4: + sscanf(data[i], "%lf", &colVal.value.d); + break; + case 5: { + int16_t dataLen = strlen(data[i]) + 1; + colVal.value.nData = dataLen < 10 ? dataLen : 10; + colVal.value.pData = (uint8_t *)data[i]; + } break; + case 6: { + int16_t dataLen = strlen(data[i]) + 1; + colVal.value.nData = dataLen < 40 ? dataLen : 40; + colVal.value.pData = (uint8_t *)data[i]; // just for test, not real nchar + } break; + case 7: + case 9: { + int32_t d8; + sscanf(data[i], "%" PRId32, &d8); + colVal.value.i8 = (int8_t)d8; + } break; + case 8: { + int32_t d16; + sscanf(data[i], "%" PRId32, &d16); + colVal.value.i16 = (int16_t)d16; + } break; + case 10: { + uint32_t u8; + sscanf(data[i], "%" PRId32, &u8); + colVal.value.u8 = (uint8_t)u8; + } break; + case 11: { + uint32_t u16; + sscanf(data[i], "%" PRId32, &u16); + colVal.value.u16 = (uint16_t)u16; + } break; + case 12: { + sscanf(data[i], "%" PRIu32, &colVal.value.u32); + } break; + case 13: { + sscanf(data[i], "%" PRIu64, &colVal.value.u64); + } break; + default: + ASSERT(0); + } + taosArrayPush(*pArray, &colVal); + } + return 0; +} + +int32_t debugPrintSColVal(SColVal *cv, int8_t type) { + if (cv->isNone) { + printf("None "); + return 0; + } + if (cv->isNull) { + printf("Null "); + return 0; + } + switch (type) { + case TSDB_DATA_TYPE_BOOL: + printf("%s ", cv->value.i8 == 0 ? "false" : "true"); + break; + case TSDB_DATA_TYPE_TINYINT: + printf("%" PRIi8 " ", cv->value.i8); + break; + case TSDB_DATA_TYPE_SMALLINT: + printf("%" PRIi16 " ", cv->value.i16); + break; + case TSDB_DATA_TYPE_INT: + printf("%" PRIi32 " ", cv->value.i32); + break; + case TSDB_DATA_TYPE_BIGINT: + printf("%" PRIi64 " ", cv->value.i64); + break; + case TSDB_DATA_TYPE_FLOAT: + printf("%f ", cv->value.f); + break; + case TSDB_DATA_TYPE_DOUBLE: + printf("%lf ", cv->value.d); + break; + case TSDB_DATA_TYPE_VARCHAR: { + char tv[15] = {0}; + snprintf(tv, 15, "%s", cv->value.pData); + printf("%s ", tv); + } break; + case TSDB_DATA_TYPE_TIMESTAMP: + printf("%" PRIi64 " ", cv->value.i64); + break; + case TSDB_DATA_TYPE_NCHAR: { + char tv[15] = {0}; + snprintf(tv, 15, "%s", cv->value.pData); + printf("%s ", tv); + } break; + case TSDB_DATA_TYPE_UTINYINT: + printf("%" PRIu8 " ", cv->value.u8); + break; + case TSDB_DATA_TYPE_USMALLINT: + printf("%" PRIu16 " ", cv->value.u16); + break; + case TSDB_DATA_TYPE_UINT: + printf("%" PRIu32 " ", cv->value.u32); + break; + case TSDB_DATA_TYPE_UBIGINT: + printf("%" PRIu64 " ", cv->value.u64); + break; + case TSDB_DATA_TYPE_JSON: + printf("JSON "); + break; + case TSDB_DATA_TYPE_VARBINARY: + printf("VARBIN "); + break; + case TSDB_DATA_TYPE_DECIMAL: + printf("DECIMAL "); + break; + case TSDB_DATA_TYPE_BLOB: + printf("BLOB "); + break; + case TSDB_DATA_TYPE_MEDIUMBLOB: + printf("MedBLOB "); + break; + // case TSDB_DATA_TYPE_BINARY: + // printf("BINARY "); + // break; + case TSDB_DATA_TYPE_MAX: + printf("UNDEF "); + break; + default: + printf("UNDEF "); + break; + } + return 0; +} + +void debugPrintTSRow(STSRow2 *row, STSchema *pTSchema, const char *tags, int32_t ln) { + printf("%s:%d %s:v%d:%d ", tags, ln, (row->flags & 0xf0) ? "KV" : "TP", row->sver, row->nData); + for (int16_t i = 0; i < schemaNCols(pTSchema); ++i) { + SColVal cv = {0}; + tTSRowGet(row, pTSchema, i, &cv); + debugPrintSColVal(&cv, pTSchema->columns[i].type); + } + printf("\n"); + fflush(stdout); +} + +static int32_t checkSColVal(const char *rawVal, SColVal *cv, int8_t type) { + ASSERT(rawVal); + + if (cv->isNone) { + EXPECT_STRCASEEQ(rawVal, NONE_CSTR); + return 0; + } + if (cv->isNull) { + EXPECT_STRCASEEQ(rawVal, NULL_CSTR); + return 0; + } + + SValue rawSVal = {0}; + switch (type) { + case TSDB_DATA_TYPE_BOOL: + case TSDB_DATA_TYPE_TINYINT: { + int32_t d8; + sscanf(rawVal, "%" PRId32, &d8); + EXPECT_EQ(cv->value.i8, (int8_t)d8); + } break; + case TSDB_DATA_TYPE_SMALLINT: { + int32_t d16; + sscanf(rawVal, "%" PRId32, &d16); + EXPECT_EQ(cv->value.i16, (int16_t)d16); + } break; + case TSDB_DATA_TYPE_INT: { + sscanf(rawVal, "%" PRId32, &rawSVal.i32); + EXPECT_EQ(cv->value.i32, rawSVal.i32); + } break; + case TSDB_DATA_TYPE_BIGINT: { + sscanf(rawVal, "%" PRIi64, &rawSVal.i64); + EXPECT_EQ(cv->value.i64, rawSVal.i64); + } break; + case TSDB_DATA_TYPE_FLOAT: { + sscanf(rawVal, "%f", &rawSVal.f); + EXPECT_FLOAT_EQ(cv->value.f, rawSVal.f); + } break; + case TSDB_DATA_TYPE_DOUBLE: { + sscanf(rawVal, "%lf", &rawSVal.d); + EXPECT_DOUBLE_EQ(cv->value.d, rawSVal.d); + } break; + case TSDB_DATA_TYPE_VARCHAR: { + EXPECT_STRCASEEQ(rawVal, (const char *)cv->value.pData); + } break; + case TSDB_DATA_TYPE_TIMESTAMP: { + sscanf(rawVal, "%" PRIi64, &rawSVal.ts); + EXPECT_DOUBLE_EQ(cv->value.ts, rawSVal.ts); + } break; + case TSDB_DATA_TYPE_NCHAR: { + EXPECT_STRCASEEQ(rawVal, (const char *)cv->value.pData); // informal nchar comparsion + } break; + case TSDB_DATA_TYPE_UTINYINT: { + uint32_t u8; + sscanf(rawVal, "%" PRIu32, &u8); + EXPECT_EQ(cv->value.u8, (uint8_t)u8); + } break; + case TSDB_DATA_TYPE_USMALLINT: { + uint32_t u16; + sscanf(rawVal, "%" PRIu32, &u16); + EXPECT_EQ(cv->value.u16, (uint16_t)u16); + } break; + case TSDB_DATA_TYPE_UINT: { + sscanf(rawVal, "%" PRIu32, &rawSVal.u32); + EXPECT_EQ(cv->value.u32, rawSVal.u32); + } break; + case TSDB_DATA_TYPE_UBIGINT: { + sscanf(rawVal, "%" PRIu64, &rawSVal.u64); + EXPECT_EQ(cv->value.u64, rawSVal.u64); + } break; + case TSDB_DATA_TYPE_JSON: + printf("JSON "); + break; + case TSDB_DATA_TYPE_VARBINARY: + printf("VARBIN "); + break; + case TSDB_DATA_TYPE_DECIMAL: + printf("DECIMAL "); + break; + case TSDB_DATA_TYPE_BLOB: + printf("BLOB "); + break; + case TSDB_DATA_TYPE_MEDIUMBLOB: + printf("MedBLOB "); + break; + // case TSDB_DATA_TYPE_BINARY: + // printf("BINARY "); + // break; + case TSDB_DATA_TYPE_MAX: + printf("UNDEF "); + break; + default: + printf("UNDEF "); + break; + } + return 0; +} + +static void checkTSRow(const char **data, STSRow2 *row, STSchema *pTSchema) { + for (int16_t i = 0; i < schemaNCols(pTSchema); ++i) { + SColVal cv = {0}; + tTSRowGet(row, pTSchema, i, &cv); + checkSColVal(data[i], &cv, pTSchema->columns[i].type); + } +} + +TEST(testCase, AllNormTest) { + int16_t nCols = 1; + STSRow2 *row = nullptr; + SArray *pArray = taosArrayInit(nCols, sizeof(SColVal)); + EXPECT_NE(pArray, nullptr); + + STSchema *pTSchema = genSTSchema(nCols); + EXPECT_NE(pTSchema, nullptr); + + // ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 binary(10), c6 nchar(10), c7 tinyint, c8 smallint, + // c9 bool + char *data[10] = {"1653694220000", "10", "20", "10.1", "10.1", "binary10", "nchar10", "10", "10", "1"}; + + genTestData((const char **)&data, nCols, &pArray); + + tTSRowNew(NULL, pArray, pTSchema, &row); + + debugPrintTSRow(row, pTSchema, __func__, __LINE__); + checkTSRow((const char **)&data, row, pTSchema); + + taosArrayDestroy(pArray); + taosMemoryFree(pTSchema); +} + +#if 1 +TEST(testCase, NoneTest) { + const static int nCols = 14; + const static int nRows = 20; + STSRow2 *row = nullptr; + SArray *pArray = taosArrayInit(nCols, sizeof(SColVal)); + EXPECT_NE(pArray, nullptr); + + STSchema *pTSchema = genSTSchema(nCols); + EXPECT_NE(pTSchema, nullptr); + + // ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 binary(10), c6 nchar(10), c7 tinyint, c8 smallint, + // c9 bool c10 tinyint unsigned, c11 smallint unsigned, c12 int unsigned, c13 bigint unsigned + const char *data[nRows][nCols] = { + {"1653694220000", "no", "20", "10.1", "10.1", "binary10", "no", "10", "10", "nu", "10", "20", "30", "40"}, + {"1653694220001", "no", "no", "no", "no", "no", "no", "no", "no", "no", "no", "no", "no", "no"}, + {"1653694220002", "no", "no", "no", "no", "no", "nu", "no", "no", "no", "no", "no", "no", "nu"}, + {"1653694220003", "nu", "no", "no", "no", "no", "nu", "no", "no", "no", "no", "no", "no", "no"}, + {"1653694220004", "no", "20", "no", "no", "no", "nchar10", "no", "no", "no", "no", "no", "no", "no"}, + {"1653694220005", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu"}, + {"1653694220006", "no", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu"}, + {"1653694220007", "no", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "no"}, + {"1653694220008", "no", "nu", "nu", "nu", "binary10", "nu", "nu", "nu", "nu", "nu", "nu", "nu", "no"}, + {"1653694220009", "no", "nu", "nu", "nu", "binary10", "nu", "nu", "10", "no", "nu", "nu", "nu", "100"}, + {"1653694220010", "-1", "-1", "-1", "-1", "binary10", "nu", "-1", "0", "0", "0", "0", "0", "0"}, + {"1653694220011", "-2147483648", "nu", "nu", "nu", "biy10", "nu", "nu", "32767", "no", "nu", "nu", "nu", "100"}, + {"1653694220012", "2147483647", "nu", "nu", "nu", "ary10", "nu", "nu", "-32768", "no", "nu", "nu", "nu", "100"}, + {"1653694220013", "no", "-9223372036854775818", "nu", "nu", "b1", "nu", "nu", "10", "no", "nu", "nu", "nu", "nu"}, + {"1653694220014", "no", "nu", "nu", "nu", "b0", "nu", "nu", "10", "no", "nu", "nu", "nu", "9223372036854775808"}, + {"1653694220015", "no", "nu", "nu", "nu", "binary30", "char4", "nu", "10", "no", "nu", "nu", "nu", + "18446744073709551615"}, + {"1653694220016", "2147483647", "nu", "nu", "nu", "bin50", "nu", "nu", "10", "no", "nu", "nu", "nu", "100"}, + {"1653694220017", "2147483646", "0", "0", "0", "binary10", "0", "0", "0", "0", "255", "0", "0", "0"}, + {"1653694220018", "no", "-9223372036854775808", "nu", "nu", "binary10", "nu", "nu", "10", "no", "nu", "nu", + "4294967295", "100"}, + {"1653694220019", "no", "9223372036854775807", "nu", "nu", "bin10", "nu", "nu", "10", "no", "254", "nu", "nu", + "no"}}; + + + for (int r = 0; r < nRows; ++r) { + genTestData((const char **)&data[r], nCols, &pArray); + tTSRowNew(NULL, pArray, pTSchema, &row); + debugPrintTSRow(row, pTSchema, __func__, __LINE__); // debug print + checkTSRow((const char **)&data[r], row, pTSchema); // check + taosMemoryFreeClear(row); + taosArrayClear(pArray); + } + + taosArrayDestroy(pArray); + taosMemoryFree(pTSchema); +} +#endif \ No newline at end of file diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 0f7daf0e1d..7e5379b0f8 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -34,6 +34,7 @@ typedef struct SVnodeMgmt { SQWorkerPool fetchPool; SWWorkerPool syncPool; SWWorkerPool writePool; + SWWorkerPool applyPool; SWWorkerPool mergePool; SSingleWorker mgmtWorker; SSingleWorker monitorWorker; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 0ae6c2b336..90e4e30261 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -352,7 +352,9 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RECOVER_RSP, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index b6913f93f2..88831384d4 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -16,10 +16,8 @@ #define _DEFAULT_SOURCE #include "vmInt.h" -#include "sync.h" -#include "syncTools.h" - static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) { + if (pMsg->info.handle == NULL) return; SRpcMsg rsp = { .code = code, .pCont = pMsg->info.rsp, @@ -55,7 +53,7 @@ static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { if (IsReq(pMsg)) { if (code != 0) { if (terrno != 0) code = terrno; - dError("msg:%p failed to process since %s", pMsg, terrstr()); + dError("msg:%p, failed to process since %s", pMsg, terrstr()); } vmSendRsp(pMsg, code); } @@ -107,11 +105,9 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf int32_t code = vnodeProcessSyncReq(pVnode->pImpl, pMsg, NULL); if (code != 0) { + if (terrno != 0) code = terrno; dError("vgId:%d, msg:%p failed to sync since %s", pVnode->vgId, pMsg, terrstr()); - if (pMsg->info.handle != NULL) { - if (terrno != 0) code = terrno; - vmSendRsp(pMsg, code); - } + vmSendRsp(pMsg, code); } dTrace("vgId:%d, msg:%p is freed, code:0x%x", pVnode->vgId, pMsg, code); @@ -130,8 +126,8 @@ static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, pMsg, pInfo); if (code != 0) { - dError("vgId:%d, msg:%p failed to merge since %s", pVnode->vgId, pMsg, terrstr()); if (terrno != 0) code = terrno; + dError("vgId:%d, msg:%p failed to merge since %s", pVnode->vgId, pMsg, terrstr()); vmSendRsp(pMsg, code); } @@ -150,7 +146,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) { - dError("vgId:%d, failed to put msg:%p into vnode queue since %s, type:%s", pHead->vgId, pMsg, terrstr(), + dError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s", pHead->vgId, pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); return terrno != 0 ? terrno : -1; } @@ -260,7 +256,7 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeMsg); pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue); - pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeApplyMsg); + pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyMsg); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue); pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue); @@ -277,7 +273,7 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { void vmFreeQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); - tWWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ); + tWWorkerFreeQueue(&pMgmt->applyPool, pVnode->pApplyQ); tWWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); tQWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); tQWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); @@ -309,6 +305,11 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { pWPool->max = tsNumOfVnodeWriteThreads; if (tWWorkerInit(pWPool) != 0) return -1; + SWWorkerPool *pAPool = &pMgmt->applyPool; + pAPool->name = "vnode-apply"; + pAPool->max = tsNumOfVnodeWriteThreads; + if (tWWorkerInit(pAPool) != 0) return -1; + SWWorkerPool *pSPool = &pMgmt->syncPool; pSPool->name = "vnode-sync"; pSPool->max = tsNumOfVnodeSyncThreads; @@ -345,6 +346,7 @@ void vmStopWorker(SVnodeMgmt *pMgmt) { tSingleWorkerCleanup(&pMgmt->monitorWorker); tSingleWorkerCleanup(&pMgmt->mgmtWorker); tWWorkerCleanup(&pMgmt->writePool); + tWWorkerCleanup(&pMgmt->applyPool); tWWorkerCleanup(&pMgmt->syncPool); tQWorkerCleanup(&pMgmt->queryPool); tQWorkerCleanup(&pMgmt->fetchPool); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 181cddee47..a1b8d81d58 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -448,7 +448,7 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { - SStreamDispatchRsp* pRsp = pMsg->pCont; + SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t taskId = pRsp->taskId; SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); streamProcessDispatchRsp(pTask, &pTq->pVnode->msgCb, pRsp); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 4f02c559b1..68d71a0c24 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -688,6 +688,10 @@ typedef struct SSortedMergeOperatorInfo { int32_t numOfResPerPage; char** groupVal; SArray *groupInfo; + + bool hasGroupId; + uint64_t groupId; + STupleHandle* prefetchedTuple; } SSortedMergeOperatorInfo; typedef struct SSortOperatorInfo { @@ -700,6 +704,10 @@ typedef struct SSortOperatorInfo { int64_t startTs; // sort start time uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. + + STupleHandle *prefetchedTuple; + bool hasGroupId; + uint64_t groupId; } SSortOperatorInfo; typedef struct STagFilterOperatorInfo { @@ -759,7 +767,7 @@ void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode); SColumn extractColumnFromColumnNode(SColumnNode* pColNode); -SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo); +SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, SSortOperatorInfo* pInfo); SSDataBlock* loadNextDataBlock(void* param); void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7b73fd8ae9..b4579b90b2 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3103,6 +3103,68 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { return (pInfo->binfo.pRes->info.rows > 0) ? pInfo->binfo.pRes : NULL; } +SSDataBlock* getSortedMergeBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, + SSortedMergeOperatorInfo *pInfo) { + blockDataCleanup(pDataBlock); + + SSDataBlock* p = tsortGetSortedDataBlock(pHandle); + if (p == NULL) { + return NULL; + } + + blockDataEnsureCapacity(p, capacity); + + while (1) { + STupleHandle* pTupleHandle = NULL; + if (pInfo->prefetchedTuple == NULL) { + pTupleHandle = tsortNextTuple(pHandle); + } else { + pTupleHandle = pInfo->prefetchedTuple; + pInfo->groupId = tsortGetGroupId(pTupleHandle); + pInfo->prefetchedTuple = NULL; + } + + if (pTupleHandle == NULL) { + break; + } + + uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); + if (!pInfo->hasGroupId) { + pInfo->groupId = tupleGroupId; + pInfo->hasGroupId = true; + appendOneRowToDataBlock(p, pTupleHandle); + } else if (pInfo->groupId == tupleGroupId) { + appendOneRowToDataBlock(p, pTupleHandle); + } else { + pInfo->prefetchedTuple = pTupleHandle; + break; + } + + if (p->info.rows >= capacity) { + break; + } + } + + if (p->info.rows > 0) { + int32_t numOfCols = taosArrayGetSize(pColMatchInfo); + for (int32_t i = 0; i < numOfCols; ++i) { + SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i); + ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); + + SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); + SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId); + colDataAssign(pDst, pSrc, p->info.rows); + } + + pDataBlock->info.rows = p->info.rows; + pDataBlock->info.capacity = p->info.rows; + pDataBlock->info.groupId = pInfo->groupId; + } + + blockDataDestroy(p); + return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; +} + static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -3111,7 +3173,7 @@ static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSortedMergeOperatorInfo* pInfo = pOperator->info; if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, NULL); + return getSortedMergeBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, NULL, pInfo); } int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index cc76cc29e4..1e2a26386b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -737,8 +737,8 @@ static bool isStateWindow(SStreamBlockScanInfo* pInfo) { static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { SSDataBlock* pSDB = pInfo->pUpdateRes; STimeWindow win = { - .skey = INT64_MIN, - .ekey = INT64_MAX, + .skey = INT64_MIN, + .ekey = INT64_MAX, }; bool needRead = false; if (!isStateWindow(pInfo) && pInfo->updateResIndex < pSDB->info.rows) { @@ -846,7 +846,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { size_t total = taosArrayGetSize(pInfo->pBlockLists); if (pInfo->blockType == STREAM_DATA_TYPE_SSDATA_BLOCK) { if (pInfo->validBlockIndex >= total) { - doClearBufferedBlocks(pInfo); + /*doClearBufferedBlocks(pInfo);*/ pOperator->status = OP_EXEC_DONE; return NULL; } diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 1e57de8438..dcaa95e28a 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -42,6 +42,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR pInfo->pSortInfo = pSortInfo; pInfo->pColMatchInfo = pColMatchColInfo; + pInfo->hasGroupId = false; + pInfo->prefetchedTuple = NULL; pOperator->name = "SortOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->blocking = true; @@ -81,8 +83,8 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) { pBlock->info.rows += 1; } -SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, - SArray* pColMatchInfo) { +SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo, + SSortOperatorInfo* pInfo) { blockDataCleanup(pDataBlock); SSDataBlock* p = tsortGetSortedDataBlock(pHandle); @@ -93,14 +95,33 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i blockDataEnsureCapacity(p, capacity); while (1) { - STupleHandle* pTupleHandle = tsortNextTuple(pHandle); + STupleHandle* pTupleHandle = NULL; + if (pInfo->prefetchedTuple == NULL) { + pTupleHandle = tsortNextTuple(pHandle); + } else { + pTupleHandle = pInfo->prefetchedTuple; + pInfo->groupId = tsortGetGroupId(pTupleHandle); + pInfo->prefetchedTuple = NULL; + } + if (pTupleHandle == NULL) { break; } - appendOneRowToDataBlock(p, pTupleHandle); + uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle); + if (!pInfo->hasGroupId) { + pInfo->groupId = tupleGroupId; + pInfo->hasGroupId = true; + appendOneRowToDataBlock(p, pTupleHandle); + } else if (pInfo->groupId == tupleGroupId) { + appendOneRowToDataBlock(p, pTupleHandle); + } else { + pInfo->prefetchedTuple = pTupleHandle; + break; + } + if (p->info.rows >= capacity) { - return pDataBlock; + break; } } @@ -117,6 +138,7 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i pDataBlock->info.rows = p->info.rows; pDataBlock->info.capacity = p->info.rows; + pDataBlock->info.groupId = pInfo->groupId; } blockDataDestroy(p); @@ -188,8 +210,8 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { longjmp(pTaskInfo->env, code); } - SSDataBlock* pBlock = - getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo); + SSDataBlock* pBlock = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, + pInfo->pColMatchInfo, pInfo); if (pBlock != NULL) { pOperator->resultInfo.totalRows += pBlock->info.rows; @@ -230,11 +252,11 @@ typedef struct SMultiwaySortMergeOperatorInfo { SArray* pColMatchInfo; // for index map from table scan output SSDataBlock* pInputBlock; - int64_t startTs; // sort start time + int64_t startTs; // sort start time - bool hasGroupId; - uint64_t groupId; - STupleHandle *prefetchedTuple; + bool hasGroupId; + uint64_t groupId; + STupleHandle* prefetchedTuple; } SMultiwaySortMergeOperatorInfo; int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { @@ -274,7 +296,7 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) { } SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, - SArray* pColMatchInfo, SMultiwaySortMergeOperatorInfo* pInfo) { + SArray* pColMatchInfo, SMultiwaySortMergeOperatorInfo* pInfo) { blockDataCleanup(pDataBlock); SSDataBlock* p = tsortGetSortedDataBlock(pHandle); @@ -285,7 +307,6 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData blockDataEnsureCapacity(p, capacity); while (1) { - STupleHandle* pTupleHandle = NULL; if (pInfo->prefetchedTuple == NULL) { pTupleHandle = tsortNextTuple(pHandle); @@ -314,7 +335,6 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData if (p->info.rows >= capacity) { break; } - } if (p->info.rows > 0) { @@ -337,7 +357,6 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData return (pDataBlock->info.rows > 0) ? pDataBlock : NULL; } - SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -351,12 +370,8 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) { longjmp(pTaskInfo->env, code); } - SSDataBlock* pBlock = - getMultiwaySortedBlockData(pInfo->pSortHandle, - pInfo->binfo.pRes, - pOperator->resultInfo.capacity, - pInfo->pColMatchInfo, - pInfo); + SSDataBlock* pBlock = getMultiwaySortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, + pOperator->resultInfo.capacity, pInfo->pColMatchInfo, pInfo); if (pBlock != NULL) { pOperator->resultInfo.totalRows += pBlock->info.rows; @@ -367,7 +382,7 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) { } void destroyMultiwaySortMergeOperatorInfo(void* param, int32_t numOfOutput) { - SMultiwaySortMergeOperatorInfo * pInfo = (SMultiwaySortMergeOperatorInfo*)param; + SMultiwaySortMergeOperatorInfo* pInfo = (SMultiwaySortMergeOperatorInfo*)param; pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes); pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock); @@ -387,9 +402,9 @@ int32_t getMultiwaySortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrEx return TSDB_CODE_SUCCESS; } -SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, SSDataBlock* pInputBlock, - SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo, - SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, + SSDataBlock* pInputBlock, SSDataBlock* pResBlock, SArray* pSortInfo, + SArray* pColMatchColInfo, SExecTaskInfo* pTaskInfo) { SMultiwaySortMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwaySortMergeOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); int32_t rowSize = pResBlock->info.rowSize; @@ -413,7 +428,8 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2; pInfo->sortBufSize = pInfo->bufPageSize * 16; - + pInfo->hasGroupId = false; + pInfo->prefetchedTuple = NULL; pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL, diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index cbbdca48bf..ea8ded2c30 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -528,16 +528,24 @@ static int32_t createInitialSources(SSortHandle* pHandle) { if (pHandle->type == SORT_SINGLESOURCE_SORT) { SSortSource* source = taosArrayGetP(pHandle->pOrderedSource, 0); taosArrayClear(pHandle->pOrderedSource); - + + bool hasGroupId = false; + SSDataBlock* prefetchedDataBlock = NULL; + while (1) { - SSDataBlock* pBlock = pHandle->fetchfp(source->param); + SSDataBlock* pBlock = NULL; + if (prefetchedDataBlock == NULL) { + pBlock = pHandle->fetchfp(source->param); + } else { + pBlock = prefetchedDataBlock; + prefetchedDataBlock = NULL; + } + if (pBlock == NULL) { break; } - if (pHandle->pDataBlock == NULL) { - pHandle->pDataBlock = createOneDataBlock(pBlock, false); - + if (!hasGroupId) { // calculate the buffer pages according to the total available buffers. int32_t rowSize = blockDataGetRowSize(pBlock); if (rowSize * 4 > 4096) { @@ -549,29 +557,36 @@ static int32_t createInitialSources(SSortHandle* pHandle) { // todo!! pHandle->numOfPages = 1024; sortBufSize = pHandle->numOfPages * pHandle->pageSize; + + hasGroupId = true; + pHandle->pDataBlock = createOneDataBlock(pBlock, false); } - // perform the scalar function calculation before apply the sort - if (pHandle->beforeFp != NULL) { - pHandle->beforeFp(pBlock, pHandle->param); - } + if (pHandle->pDataBlock->info.groupId == pBlock->info.groupId) { + // perform the scalar function calculation before apply the sort + if (pHandle->beforeFp != NULL) { + pHandle->beforeFp(pBlock, pHandle->param); + } + // todo relocate the columns + int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock); + if (code != 0) { + return code; + } - // todo relocate the columns - int32_t code = blockDataMerge(pHandle->pDataBlock, pBlock); - if (code != 0) { - return code; - } + size_t size = blockDataGetSize(pHandle->pDataBlock); + if (size > sortBufSize) { + // Perform the in-memory sort and then flush data in the buffer into disk. + int64_t p = taosGetTimestampUs(); + blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); - size_t size = blockDataGetSize(pHandle->pDataBlock); - if (size > sortBufSize) { - // Perform the in-memory sort and then flush data in the buffer into disk. - int64_t p = taosGetTimestampUs(); - blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); + int64_t el = taosGetTimestampUs() - p; + pHandle->sortElapsed += el; - int64_t el = taosGetTimestampUs() - p; - pHandle->sortElapsed += el; - - doAddToBuf(pHandle->pDataBlock, pHandle); + doAddToBuf(pHandle->pDataBlock, pHandle); + } + } else { + prefetchedDataBlock = pBlock; + pHandle->pDataBlock = createOneDataBlock(pBlock, false); } } diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 56379f0e1f..3a9b94d71e 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -78,7 +78,6 @@ bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultI int32_t percentileFunction(SqlFunctionCtx *pCtx); int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); -int32_t getApercentileMaxSize(); bool getApercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool apercentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t apercentileFunction(SqlFunctionCtx *pCtx); @@ -86,6 +85,7 @@ int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx); int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); +int32_t getApercentileMaxSize(); bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo); @@ -103,13 +103,13 @@ int32_t topFunction(SqlFunctionCtx *pCtx); int32_t bottomFunction(SqlFunctionCtx *pCtx); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); -int32_t getSpreadInfoSize(); bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t spreadFunction(SqlFunctionCtx* pCtx); int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx); int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t getSpreadInfoSize(); bool getElapsedFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool elapsedFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); @@ -119,7 +119,10 @@ int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getHistogramFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t histogramFunction(SqlFunctionCtx* pCtx); +int32_t histogramFunctionMerge(SqlFunctionCtx* pCtx); int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t getHistogramInfoSize(); bool getHLLFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t hllFunction(SqlFunctionCtx* pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index a7d6fd7371..eca4587c03 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -503,6 +503,58 @@ static int32_t translateHistogram(SFunctionNode* pFunc, char* pErrBuf, int32_t l return TSDB_CODE_SUCCESS; } +static int32_t translateHistogramImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) { + int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); + if (isPartial) { + if (4 != numOfParams) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + uint8_t colType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type; + if (!IS_NUMERIC_TYPE(colType)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + // param1 ~ param3 + for (int32_t i = 1; i < numOfParams; ++i) { + SNode* pParamNode = nodesListGetNode(pFunc->pParameterList, i); + if (QUERY_NODE_VALUE != nodeType(pParamNode)) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + SValueNode* pValue = (SValueNode*)pParamNode; + + pValue->notReserved = true; + } + + if (((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type != TSDB_DATA_TYPE_BINARY || + ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type != TSDB_DATA_TYPE_BINARY || + ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 3))->resType.type != TSDB_DATA_TYPE_BIGINT) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = (SDataType){.bytes = getHistogramInfoSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}; + } else { + if (1 != numOfParams) { + return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); + } + + if (((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type != TSDB_DATA_TYPE_BINARY) { + return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName); + } + + pFunc->node.resType = (SDataType){.bytes = 512, .type = TSDB_DATA_TYPE_BINARY}; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t translateHistogramPartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateHistogramImpl(pFunc, pErrBuf, len, true); +} +static int32_t translateHistogramMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { + return translateHistogramImpl(pFunc, pErrBuf, len, false); +} + static int32_t translateHLL(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { if (1 != LIST_LENGTH(pFunc->pParameterList)) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); @@ -1394,6 +1446,28 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .getEnvFunc = getHistogramFuncEnv, .initFunc = histogramFunctionSetup, .processFunc = histogramFunction, + .finalizeFunc = histogramFinalize, + .pPartialFunc = "_histogram_partial", + .pMergeFunc = "_histogram_merge" + }, + { + .name = "_histogram_partial", + .type = FUNCTION_TYPE_HISTOGRAM_PARTIAL, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateHistogramPartial, + .getEnvFunc = getHistogramFuncEnv, + .initFunc = histogramFunctionSetup, + .processFunc = histogramFunction, + .finalizeFunc = histogramPartialFinalize + }, + { + .name = "_histogram_merge", + .type = FUNCTION_TYPE_HISTOGRAM_MERGE, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateHistogramMerge, + .getEnvFunc = getHistogramFuncEnv, + .initFunc = functionSetup, + .processFunc = histogramFunctionMerge, .finalizeFunc = histogramFinalize }, { diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 0852779791..e0302f9598 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -2187,9 +2187,7 @@ int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SAPercentileInfo* pInfo = (SAPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo); - int32_t bytesHist = (int32_t)(sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1)); - int32_t bytesDigest = (int32_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION)); - int32_t resultBytes = TMAX(bytesHist, bytesDigest); + int32_t resultBytes = getApercentileMaxSize(); char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); if (pInfo->algo == APERCT_ALGO_TDIGEST) { @@ -3138,6 +3136,10 @@ int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return functionFinalize(pCtx, pBlock); } +int32_t getHistogramInfoSize() { + return (int32_t)sizeof(SHistoFuncInfo) + HISTOGRAM_MAX_BINS_NUM * sizeof(SHistoFuncBin); +} + bool getHistogramFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SHistoFuncInfo) + HISTOGRAM_MAX_BINS_NUM * sizeof(SHistoFuncBin); return true; @@ -3348,6 +3350,30 @@ int32_t histogramFunction(SqlFunctionCtx *pCtx) { return TSDB_CODE_SUCCESS; } +int32_t histogramFunctionMerge(SqlFunctionCtx *pCtx) { + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); + + SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + int32_t start = pInput->startRowIndex; + char* data = colDataGetData(pCol, start); + SHistoFuncInfo* pInputInfo = (SHistoFuncInfo *)varDataVal(data); + + pInfo->normalized = pInputInfo->normalized; + pInfo->numOfBins = pInputInfo->numOfBins; + pInfo->totalCount += pInputInfo->totalCount; + for (int32_t k = 0; k < pInfo->numOfBins; ++k) { + pInfo->bins[k].lower = pInputInfo->bins[k].lower; + pInfo->bins[k].upper = pInputInfo->bins[k].upper; + pInfo->bins[k].count += pInputInfo->bins[k].count; + } + + SET_VAL(GET_RES_INFO(pCtx), pInfo->numOfBins, pInfo->numOfBins); + return TSDB_CODE_SUCCESS; +} + int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); @@ -3384,6 +3410,24 @@ int32_t histogramFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pResInfo->numOfRes; } +int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + SHistoFuncInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + int32_t resultBytes = getHistogramInfoSize(); + char *res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); + + memcpy(varDataVal(res), pInfo, resultBytes); + varDataSetLen(res, resultBytes); + + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); + + colDataAppend(pCol, pBlock->info.rows, res, false); + + taosMemoryFree(res); + return 1; +} + bool getHLLFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SHLLInfo); return true; diff --git a/source/libs/index/src/.indexFilter.c.swo b/source/libs/index/src/.indexFilter.c.swo deleted file mode 100644 index c61d5e7793..0000000000 Binary files a/source/libs/index/src/.indexFilter.c.swo and /dev/null differ diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 2b15f5031e..2369779105 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -901,12 +901,22 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam * if (hasFraction) { int32_t fracLen = (int32_t)strlen(fraction) + 1; - char *tzInfo = strchr(buf, '+'); - if (tzInfo) { + + char *tzInfo; + if (buf[len - 1] == 'z' || buf[len - 1] == 'Z') { + tzInfo = &buf[len - 1]; memmove(tzInfo + fracLen, tzInfo, strlen(tzInfo)); } else { - tzInfo = strchr(buf, '-'); - memmove(tzInfo + fracLen, tzInfo, strlen(tzInfo)); + tzInfo = strchr(buf, '+'); + if (tzInfo) { + memmove(tzInfo + fracLen, tzInfo, strlen(tzInfo)); + } else { + //search '-' backwards + tzInfo = strrchr(buf, '-'); + if (tzInfo) { + memmove(tzInfo + fracLen, tzInfo, strlen(tzInfo)); + } + } } char tmp[32] = {0}; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 823ce6d2fe..069595390d 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -59,7 +59,7 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* // rsp by input status void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); - ((SMsgHead*)buf)->vgId = htonl(pReq->sourceVg); + ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId); SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead)); pCont->inputStatus = status; pCont->streamId = pReq->streamId; @@ -78,7 +78,18 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp // 2.1. idle: exec // 2.2. executing: return // 2.3. closing: keep trying - streamExec(pTask, pMsgCb); + if (pTask->execType != TASK_EXEC__NONE) { + streamExec(pTask, pMsgCb); + } else { + ASSERT(pTask->sinkType != TASK_SINK__NONE); + while (1) { + void* data = streamQueueNextItem(pTask->inputQueue); + if (data == NULL) return 0; + if (streamTaskOutput(pTask, data) < 0) { + ASSERT(0); + } + } + } // 3. handle output // 3.1 check and set status diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index d1e3fa0799..16da418677 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -22,6 +22,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p if (tEncodeI32(pEncoder, pReq->sourceTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->sourceVg) < 0) return -1; if (tEncodeI32(pEncoder, pReq->sourceChildId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum); ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum); @@ -42,6 +43,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { if (tDecodeI32(pDecoder, &pReq->sourceTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->sourceVg) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->sourceChildId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1; ASSERT(pReq->blockNum > 0); pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*)); @@ -94,6 +96,7 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM .sourceTaskId = pTask->taskId, .sourceVg = data->sourceVg, .sourceChildId = pTask->childId, + .upstreamNodeId = pTask->nodeId, .blockNum = blockNum, }; @@ -184,13 +187,17 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) { #endif SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); - if (pBlock == NULL) return 0; + if (pBlock == NULL) { + atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); + return 0; + } ASSERT(pBlock->type == STREAM_DATA_TYPE_SSDATA_BLOCK); SRpcMsg dispatchMsg = {0}; SEpSet* pEpSet = NULL; if (streamBuildDispatchMsg(pTask, pBlock, &dispatchMsg, &pEpSet) < 0) { ASSERT(0); + atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); return -1; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 72df516e0d..fe1a857743 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -65,6 +65,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { } qRes->type = STREAM_INPUT__DATA_BLOCK; qRes->blocks = pRes; + /*qRes->sourceVg = pTask->nodeId;*/ if (streamTaskOutput(pTask, qRes) < 0) { streamQueueProcessFail(pTask->inputQueue); taosArrayDestroy(pRes);