diff --git a/CMakeLists.txt b/CMakeLists.txt index 566d4ad29d..fb2b306f65 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -10,6 +10,8 @@ if (NOT DEFINED TD_SOURCE_DIR) set( TD_SOURCE_DIR ${PROJECT_SOURCE_DIR} ) endif() +SET(TD_COMMUNITY_DIR ${PROJECT_SOURCE_DIR}) + set(TD_SUPPORT_DIR "${TD_SOURCE_DIR}/cmake") set(TD_CONTRIB_DIR "${TD_SOURCE_DIR}/contrib") diff --git a/cmake/cmake.version b/cmake/cmake.version index de85025a8c..42de285c8e 100644 --- a/cmake/cmake.version +++ b/cmake/cmake.version @@ -16,7 +16,7 @@ find_program(HAVE_GIT NAMES git) IF (DEFINED GITINFO) SET(TD_VER_GIT ${GITINFO}) ELSEIF (HAVE_GIT) - execute_process(COMMAND git log -1 --format=%H WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} OUTPUT_VARIABLE GIT_COMMITID) + execute_process(COMMAND git log -1 --format=%H WORKING_DIRECTORY ${TD_COMMUNITY_DIR} OUTPUT_VARIABLE GIT_COMMITID) #message(STATUS "git log result:${GIT_COMMITID}") IF (GIT_COMMITID) string (REGEX REPLACE "[\n\t\r]" "" GIT_COMMITID ${GIT_COMMITID}) @@ -30,6 +30,23 @@ ELSE () SET(TD_VER_GIT "no git commit id") ENDIF () +IF (DEFINED GITINFOI) + SET(TD_VER_GIT_INTERNAL ${GITINFOI}) +ELSEIF (HAVE_GIT) + execute_process(COMMAND git log -1 --format=%H WORKING_DIRECTORY ${PROJECT_SOURCE_DIR} OUTPUT_VARIABLE GIT_COMMITID) + message(STATUS "git log result:${GIT_COMMITID}") + IF (GIT_COMMITID) + string (REGEX REPLACE "[\n\t\r]" "" GIT_COMMITID ${GIT_COMMITID}) + SET(TD_VER_GIT_INTERNAL ${GIT_COMMITID}) + ELSE () + message(STATUS "not a git repository") + SET(TD_VER_GIT "no git commit id") + ENDIF () +ELSE () + message(STATUS "no git cmd") + SET(TD_VER_GIT_INTERNAL "no git commit id") +ENDIF () + IF (DEFINED VERDATE) SET(TD_VER_DATE ${VERDATE}) ELSE () diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index 87f0579f44..40fa48b815 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG d11f210 + GIT_TAG 2864326 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h index 7b65c06b85..60a4a8c605 100644 --- a/include/libs/function/functionMgt.h +++ b/include/libs/function/functionMgt.h @@ -241,6 +241,7 @@ int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet); int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet); int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet); bool fmIsInvertible(int32_t funcId); +char* fmGetFuncName(int32_t funcId); #ifdef __cplusplus } diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 08a6be8015..1cd32f05ad 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -48,7 +48,7 @@ extern "C" { #define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500 #define SYNC_SNAP_RESEND_MS 1000 * 60 -#define SYNC_VND_COMMIT_MIN_MS 1000 +#define SYNC_VND_COMMIT_MIN_MS 3000 #define SYNC_MAX_BATCH_SIZE 1 #define SYNC_INDEX_BEGIN 0 diff --git a/include/util/tarray.h b/include/util/tarray.h index 278f9f6bab..4bf24b46b9 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -208,6 +208,8 @@ void taosArrayDestroyP(SArray* pArray, FDelete fp); void taosArrayDestroyEx(SArray* pArray, FDelete fp); +void taosArraySwap(SArray* a, SArray* b); + /** * sort the array * @param pArray diff --git a/include/util/version.h b/include/util/version.h index bac29e5baf..b241dd248b 100644 --- a/include/util/version.h +++ b/include/util/version.h @@ -23,6 +23,7 @@ extern "C" { extern char version[]; extern char compatible_version[]; extern char gitinfo[]; +extern char gitinfoOfInternal[]; extern char buildinfo[]; #ifdef __cplusplus diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 6bd8b01842..b1299c16e8 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -183,7 +183,7 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) { } string = cJSON_PrintUnformatted(json); -end: + end: cJSON_Delete(json); tFreeSMAltertbReq(&req); return string; @@ -205,7 +205,7 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) { } string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE); uDebug("processCreateStb %s", string); -_err: + _err: tDecoderClear(&coder); return string; } @@ -227,7 +227,7 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) { string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen); uDebug("processAlterStb %s", string); -_err: + _err: tDecoderClear(&coder); return string; } @@ -309,7 +309,7 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { cJSON_AddItemToArray(tags, tag); } -end: + end: cJSON_AddItemToObject(json, "tags", tags); taosArrayDestroy(pTagVals); } @@ -368,7 +368,7 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) { uDebug("processCreateTable :%s", string); } -_exit: + _exit: for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; taosMemoryFreeClear(pCreateReq->comment); @@ -408,7 +408,7 @@ static char* processAutoCreateTable(STaosxRsp* rsp) { } string = buildCreateCTableJson(pCreateReq, rsp->createTableNum); uDebug("processAutoCreateTable :%s", string); -_exit: + _exit: for (int i = 0; i < rsp->createTableNum; i++) { tDecoderClear(&decoder[i]); taosMemoryFreeClear(pCreateReq[i].comment); @@ -535,7 +535,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) { string = cJSON_PrintUnformatted(json); uDebug("processAlterTable :%s", string); -_exit: + _exit: cJSON_Delete(json); tDecoderClear(&decoder); return string; @@ -569,7 +569,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) { string = cJSON_PrintUnformatted(json); uDebug("processDropSTable :%s", string); -_exit: + _exit: cJSON_Delete(json); tDecoderClear(&decoder); return string; @@ -609,7 +609,7 @@ static char* processDeleteTable(SMqMetaRsp* metaRsp) { string = cJSON_PrintUnformatted(json); uDebug("processDeleteTable :%s", string); -_exit: + _exit: cJSON_Delete(json); tDecoderClear(&coder); return string; @@ -652,7 +652,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) { string = cJSON_PrintUnformatted(json); uDebug("processDropTable :%s", string); -_exit: + _exit: cJSON_Delete(json); tDecoderClear(&decoder); return string; @@ -742,7 +742,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { code = pRequest->code; taosMemoryFree(pCmdMsg.pMsg); -end: + end: destroyRequest(pRequest); tFreeSMCreateStbReq(&pReq); tDecoderClear(&coder); @@ -839,7 +839,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) { code = pRequest->code; taosMemoryFree(pCmdMsg.pMsg); -end: + end: destroyRequest(pRequest); tDecoderClear(&coder); return code; @@ -901,9 +901,9 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch); SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, - .requestId = pRequest->requestId, - .requestObjRefId = pRequest->self, - .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; + .requestId = pRequest->requestId, + .requestObjRefId = pRequest->self, + .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); // loop to create table @@ -987,7 +987,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) { code = pRequest->code; -end: + end: for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; taosMemoryFreeClear(pCreateReq->comment); @@ -1058,9 +1058,9 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch); SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, - .requestId = pRequest->requestId, - .requestObjRefId = pRequest->self, - .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; + .requestId = pRequest->requestId, + .requestObjRefId = pRequest->self, + .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); // loop to create table for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { @@ -1132,7 +1132,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) { } code = pRequest->code; -end: + end: taosHashCleanup(pVgroupHashmap); destroyRequest(pRequest); tDecoderClear(&coder); @@ -1201,7 +1201,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { } taos_free_result(res); -end: + end: tDecoderClear(&coder); return code; } @@ -1249,9 +1249,9 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { } SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, - .requestId = pRequest->requestId, - .requestObjRefId = pRequest->self, - .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; + .requestId = pRequest->requestId, + .requestObjRefId = pRequest->self, + .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; SVgroupInfo pInfo = {0}; SName pName = {0}; @@ -1311,7 +1311,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) { code = handleAlterTbExecRes(pRes->res, pCatalog); } } -end: + end: taosArrayDestroy(pArray); if (pVgData) taosMemoryFreeClear(pVgData->pData); taosMemoryFreeClear(pVgData); @@ -1399,7 +1399,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; -end: + end: taosMemoryFreeClear(pTableMeta); qDestroyQuery(pQuery); destroyRequest(pRequest); @@ -1481,7 +1481,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; -end: + end: taosMemoryFreeClear(pTableMeta); qDestroyQuery(pQuery); destroyRequest(pRequest); @@ -1601,6 +1601,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { uError("WriteRaw:rawBlockBindData failed"); goto end; } + taosMemoryFreeClear(pTableMeta); } code = smlBuildOutput(pQuery, pVgHash); @@ -1612,7 +1613,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; -end: + end: tDeleteSMqDataRsp(&rspObj.rsp); tDecoderClear(&decoder); qDestroyQuery(pQuery); @@ -1707,6 +1708,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) tDecoderInit(&decoderTmp, *dataTmp, *lenTmp); if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) { tDecoderClear(&decoderTmp); + tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); uError("WriteRaw: tDecodeSVCreateTbReq error"); code = TSDB_CODE_TMQ_INVALID_MSG; goto end; @@ -1715,15 +1717,19 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) if (pCreateReq.type != TSDB_CHILD_TABLE) { uError("WriteRaw:pCreateReq.type != TSDB_CHILD_TABLE. table name: %s", tbName); code = TSDB_CODE_TSC_INVALID_VALUE; + tDecoderClear(&decoderTmp); + tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); goto end; } if (strcmp(tbName, pCreateReq.name) == 0) { cloneSVreateTbReq(&pCreateReq, &pCreateReqDst); // pCreateReqDst->ctb.suid = processSuid(pCreateReqDst->ctb.suid, pRequest->pDb); tDecoderClear(&decoderTmp); + tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); break; } tDecoderClear(&decoderTmp); + tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); } SVgroupInfo vg; @@ -1774,6 +1780,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) goto end; } pCreateReqDst = NULL; + taosMemoryFreeClear(pTableMeta); } code = smlBuildOutput(pQuery, pVgHash); @@ -1785,7 +1792,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) launchQueryImpl(pRequest, pQuery, true, NULL); code = pRequest->code; -end: + end: tDeleteSTaosxRsp(&rspObj.rsp); tDecoderClear(&decoder); qDestroyQuery(pQuery); diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 2bb8708372..34808aa389 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -2439,6 +2439,12 @@ _exit: int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap, char *data) { int32_t code = 0; + if(data == NULL){ + for (int32_t i = 0; i < nRows; ++i) { + code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0); + } + goto _exit; + } if (IS_VAR_DATA_TYPE(type)) { // var-length data type for (int32_t i = 0; i < nRows; ++i) { diff --git a/source/dnode/mgmt/exe/dmMain.c b/source/dnode/mgmt/exe/dmMain.c index d3cffaa185..9a092e2df5 100644 --- a/source/dnode/mgmt/exe/dmMain.c +++ b/source/dnode/mgmt/exe/dmMain.c @@ -18,6 +18,7 @@ #include "mnode.h" #include "tconfig.h" #include "tglobal.h" +#include "version.h" // clang-format off #define DM_APOLLO_URL "The apollo string to use when configuring the server, such as: -a 'jsonFile:./tests/cfg.json', cfg.json text can be '{\"fqdn\":\"td1\"}'." @@ -76,28 +77,28 @@ void dmLogCrash(int signum, void *sigInfo, void *context) { taosIgnSignal(SIGINT); taosIgnSignal(SIGBREAK); -#ifndef WINDOWS +#ifndef WINDOWS taosIgnSignal(SIGBUS); #endif taosIgnSignal(SIGABRT); taosIgnSignal(SIGFPE); taosIgnSignal(SIGSEGV); - char *pMsg = NULL; + char *pMsg = NULL; const char *flags = "UTL FATAL "; ELogLevel level = DEBUG_FATAL; int32_t dflag = 255; - int64_t msgLen= -1; - + int64_t msgLen = -1; + if (tsEnableCrashReport) { if (taosGenCrashJsonMsg(signum, &pMsg, dmGetClusterId(), global.startTime)) { taosPrintLog(flags, level, dflag, "failed to generate crash json msg"); goto _return; } else { - msgLen = strlen(pMsg); + msgLen = strlen(pMsg); } } - + _return: taosLogCrashInfo("taosd", pMsg, msgLen, signum, sigInfo); @@ -123,7 +124,7 @@ static void dmSetSignalHandle() { #ifndef WINDOWS taosSetSignal(SIGBUS, dmLogCrash); -#endif +#endif taosSetSignal(SIGABRT, dmLogCrash); taosSetSignal(SIGFPE, dmLogCrash); taosSetSignal(SIGSEGV, dmLogCrash); @@ -134,7 +135,7 @@ static int32_t dmParseArgs(int32_t argc, char const *argv[]) { int32_t cmdEnvIndex = 0; if (argc < 2) return 0; - + global.envCmd = taosMemoryMalloc((argc - 1) * sizeof(char *)); memset(global.envCmd, 0, (argc - 1) * sizeof(char *)); for (int32_t i = 1; i < argc; ++i) { @@ -203,6 +204,9 @@ static void dmPrintVersion() { #endif printf("%s version: %s compatible_version: %s\n", releaseName, version, compatible_version); printf("gitinfo: %s\n", gitinfo); +#ifdef TD_ENTERPRISE + printf("gitinfoOfInternal: %s\n", gitinfoOfInternal); +#endif printf("buildInfo: %s\n", buildinfo); } @@ -284,7 +288,7 @@ int mainWindows(int argc, char **argv) { printf("failed to init memory dbg, error:%s\n", tstrerror(code)); return code; } - tsAsyncLog = false; + tsAsyncLog = false; printf("memory dbg enabled\n"); } #endif diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 1d7c213e1a..9037644602 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -139,8 +139,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum, uint64_t reqId); // tqExec -int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp); -// int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp); +int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows); int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type); int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e1149b48af..4db53c1627 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -529,123 +529,133 @@ static int32_t doPollDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* p // this is a normal subscribe requirement if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset); - } else { // for taosX - // todo handle the case where re-balance occurs. - SMqMetaRsp metaRsp = {0}; - STaosxRsp taosxRsp = {0}; - tqInitTaosxRsp(&taosxRsp, pRequest); + } - if (offset.type != TMQ_OFFSET__LOG) { - if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &offset) < 0) { - return -1; - } + // todo handle the case where re-balance occurs. + // for taosx + SMqMetaRsp metaRsp = {0}; + STaosxRsp taosxRsp = {0}; + tqInitTaosxRsp(&taosxRsp, pRequest); - if (metaRsp.metaRspLen > 0) { - code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); - tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 - ",ts:%" PRId64, - consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts); - taosMemoryFree(metaRsp.metaRsp); - tDeleteSTaosxRsp(&taosxRsp); - return code; - } - - if (taosxRsp.blockNum > 0) { - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); - tDeleteSTaosxRsp(&taosxRsp); - return code; - } else { - offset = taosxRsp.rspOffset; - } - - tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 - ",version:%" PRId64, - consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, - taosxRsp.rspOffset.version); + if (offset.type != TMQ_OFFSET__LOG) { + if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &offset) < 0) { + return -1; } - if (offset.type == TMQ_OFFSET__LOG) { - int64_t fetchVer = offset.version + 1; - pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); - if (pCkHead == NULL) { - tDeleteSTaosxRsp(&taosxRsp); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + if (metaRsp.metaRspLen > 0) { + code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); + tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 + ",ts:%" PRId64, + consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, + metaRsp.rspOffset.ts); + taosMemoryFree(metaRsp.metaRsp); + tDeleteSTaosxRsp(&taosxRsp); + return code; + } + + if (taosxRsp.blockNum > 0) { + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + tDeleteSTaosxRsp(&taosxRsp); + return code; + } else { + offset = taosxRsp.rspOffset; + } + + tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 + ",version:%" PRId64, + consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, + taosxRsp.rspOffset.version); + } + + if (offset.type == TMQ_OFFSET__LOG) { + int64_t fetchVer = offset.version + 1; + pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); + if (pCkHead == NULL) { + tDeleteSTaosxRsp(&taosxRsp); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + walSetReaderCapacity(pHandle->pWalReader, 2048); + int totalRows = 0; + while (1) { + // todo refactor: this is not correct. + int32_t savedEpoch = atomic_load_32(&pHandle->epoch); + if (savedEpoch > pRequest->epoch) { + tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64 + ", found new consumer epoch %d, discard req epoch %d", + consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch); + break; } - walSetReaderCapacity(pHandle->pWalReader, 2048); + if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { + tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + tDeleteSTaosxRsp(&taosxRsp); + taosMemoryFreeClear(pCkHead); + return code; + } - while (1) { - // todo refactor: this is not correct. - int32_t savedEpoch = atomic_load_32(&pHandle->epoch); - if (savedEpoch > pRequest->epoch) { - tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64 - ", found new consumer epoch %d, discard req epoch %d", - consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch); - break; - } + SWalCont* pHead = &pCkHead->head; + tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId, + pRequest->epoch, vgId, fetchVer, pHead->msgType); - if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { - tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); + // process meta + if (pHead->msgType != TDMT_VND_SUBMIT) { + if(totalRows > 0) { + tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1); code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); tDeleteSTaosxRsp(&taosxRsp); taosMemoryFreeClear(pCkHead); return code; } - SWalCont* pHead = &pCkHead->head; - tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", - consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); - - if (pHead->msgType == TDMT_VND_SUBMIT) { - SPackedData submit = { - .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)), - .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg), - .ver = pHead->version, - }; - - if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) { - tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId, - pRequest->subKey); - return -1; - } - - if (taosxRsp.blockNum > 0) { - tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); - tDeleteSTaosxRsp(&taosxRsp); - taosMemoryFreeClear(pCkHead); - return code; - } else { - fetchVer++; - } - - } else { - /*A(pHandle->fetchMeta);*/ - /*A(IS_META_MSG(pHead->msgType));*/ - tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); - tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); - metaRsp.resMsgType = pHead->msgType; - metaRsp.metaRspLen = pHead->bodyLen; - metaRsp.metaRsp = pHead->body; - if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) { - code = -1; - taosMemoryFreeClear(pCkHead); - tDeleteSTaosxRsp(&taosxRsp); - return code; - } - code = 0; + tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); + tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); + metaRsp.resMsgType = pHead->msgType; + metaRsp.metaRspLen = pHead->bodyLen; + metaRsp.metaRsp = pHead->body; + if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) { + code = -1; taosMemoryFreeClear(pCkHead); tDeleteSTaosxRsp(&taosxRsp); return code; } + code = 0; + taosMemoryFreeClear(pCkHead); + tDeleteSTaosxRsp(&taosxRsp); + return code; + } + + // process data + SPackedData submit = { + .msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)), + .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg), + .ver = pHead->version, + }; + + if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows) < 0) { + tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId, + pRequest->subKey); + taosMemoryFreeClear(pCkHead); + tDeleteSTaosxRsp(&taosxRsp); + return -1; + } + + if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { + tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + tDeleteSTaosxRsp(&taosxRsp); + taosMemoryFreeClear(pCkHead); + return code; + } else { + fetchVer++; } } - - tDeleteSTaosxRsp(&taosxRsp); - taosMemoryFreeClear(pCkHead); - return 0; } + + tDeleteSTaosxRsp(&taosxRsp); + taosMemoryFreeClear(pCkHead); + return 0; } int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 36d0631195..c01108f114 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -249,23 +249,15 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta return 0; } -int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp) { +int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows) { STqExecHandle* pExec = &pHandle->execHandle; - /*A(pExec->subType != TOPIC_SUB_TYPE__COLUMN);*/ - SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pSchemas = taosArrayInit(0, sizeof(void*)); if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { STqReader* pReader = pExec->pExecReader; - /*tqReaderSetDataMsg(pReader, pReq, 0);*/ tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver); while (tqNextDataBlock2(pReader)) { - /*SSDataBlock block = {0};*/ - /*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/ - /*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/ - /*}*/ - taosArrayClear(pBlocks); taosArrayClear(pSchemas); SSubmitTbData* pSubmitTbDataRet = NULL; @@ -273,7 +265,6 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; } if (pRsp->withTbName) { - /*int64_t uid = pExec->pExecReader->msgIter.uid;*/ int64_t uid = pExec->pExecReader->lastBlkUid; if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) { taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); @@ -315,6 +306,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR SSDataBlock* pBlock = taosArrayGet(pBlocks, i); tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock), pTq->pVnode->config.tsdbCfg.precision); + totalRows += pBlock->info.rows; blockDataFreeRes(pBlock); SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i); taosArrayPush(pRsp->blockSchema, &pSW); @@ -323,13 +315,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR } } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { STqReader* pReader = pExec->pExecReader; - /*tqReaderSetDataMsg(pReader, pReq, 0);*/ tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver); while (tqNextDataBlockFilterOut2(pReader, pExec->execDb.pFilterOutTbUid)) { - /*SSDataBlock block = {0};*/ - /*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/ - /*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/ - /*}*/ taosArrayClear(pBlocks); taosArrayClear(pSchemas); SSubmitTbData* pSubmitTbDataRet = NULL; @@ -374,15 +361,11 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR tEncoderClear(&encoder); } - /*tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),*/ - /*pTq->pVnode->config.tsdbCfg.precision);*/ - /*blockDataFreeRes(&block);*/ - /*tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);*/ - /*pRsp->blockNum++;*/ for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) { SSDataBlock* pBlock = taosArrayGet(pBlocks, i); tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock), pTq->pVnode->config.tsdbCfg.precision); + *totalRows += pBlock->info.rows; blockDataFreeRes(pBlock); SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i); taosArrayPush(pRsp->blockSchema, &pSW); @@ -392,9 +375,5 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR } taosArrayDestroy(pBlocks); taosArrayDestroy(pSchemas); -// if (pRsp->blockNum == 0) { -// return -1; -// } - return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 8953e4b907..89d3239c33 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -4512,6 +4512,7 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_ int32_t i = 0, j = 0; int32_t size = (int32_t)taosArrayGetSize(pSup->pColAgg); taosArrayInsert(pSup->pColAgg, 0, pTsAgg); + size++; while (j < numOfCols && i < size) { SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i); @@ -4524,10 +4525,21 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_ if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) { SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows}; taosArrayInsert(pSup->pColAgg, i, &nullColAgg); + i += 1; + size++; } j += 1; } } + + while (j < numOfCols) { + if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) { + SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows}; + taosArrayInsert(pSup->pColAgg, i, &nullColAgg); + i += 1; + } + j++; + } } int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave) { @@ -4607,8 +4619,8 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, } else if (pAgg->colId < pSup->colId[j]) { i += 1; } else if (pSup->colId[j] < pAgg->colId) { - // ASSERT(pSup->colId[j] == PRIMARYKEY_TIMESTAMP_COL_ID); - pResBlock->pBlockAgg[pSup->slotId[j]] = &pSup->tsColAgg; + pResBlock->pBlockAgg[pSup->slotId[j]] = NULL; + *allHave = false; j += 1; } } @@ -5001,4 +5013,4 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proacti void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) { taosMemoryFreeClear(pReader->idStr); pReader->idStr = taosStrdup(idstr); -} \ No newline at end of file +} diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 0779388ba9..09f1ca7877 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -156,13 +156,10 @@ int vnodeShouldCommit(SVnode *pVnode) { bool needCommit = false; taosThreadMutexLock(&pVnode->mutex); - if (!pVnode->inUse || !diskAvail) { - goto _out; + if (pVnode->inUse && diskAvail) { + needCommit = + ((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)); } - needCommit = - (((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) || - (pVnode->inUse->size > 0 && pSched->commitMs + pSched->maxWaitMs < nowMs)); -_out: taosThreadMutexUnlock(&pVnode->mutex); return needCommit; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index fd610b5bcd..3f519568c4 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -559,6 +559,7 @@ typedef struct SStreamIntervalOperatorInfo { STimeWindowAggSupp twAggSup; bool invertible; bool ignoreExpiredData; + bool ignoreExpiredDataSaved; SArray* pDelWins; // SWinRes int32_t delIndex; SSDataBlock* pDelRes; @@ -620,6 +621,7 @@ typedef struct SStreamSessionAggOperatorInfo { SPhysiNode* pPhyNode; // create new child bool isFinal; bool ignoreExpiredData; + bool ignoreExpiredDataSaved; SArray* pUpdated; SSHashObj* pStUpdated; } SStreamSessionAggOperatorInfo; @@ -637,6 +639,7 @@ typedef struct SStreamStateAggOperatorInfo { void* pDelIterator; SArray* pChildren; // cache for children's result; bool ignoreExpiredData; + bool ignoreExpiredDataSaved; SArray* pUpdated; SSHashObj* pSeUpdated; } SStreamStateAggOperatorInfo; diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h index fad4059515..726f0df1e8 100644 --- a/source/libs/executor/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -92,8 +92,8 @@ typedef struct SResultRowData { typedef struct SStreamFillLinearInfo { TSKEY nextEnd; - SArray* pDeltaVal; // double. value for Fill(linear). - SArray* pNextDeltaVal; // double. value for Fill(linear). + SArray* pEndPoints; + SArray* pNextEndPoints; int64_t winIndex; bool hasNext; } SStreamFillLinearInfo; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index dce358ab6d..953d614951 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2042,7 +2042,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags void printDataBlock(SSDataBlock* pBlock, const char* flag) { if (!pBlock || pBlock->info.rows == 0) { - qDebug("===stream===printDataBlock: Block is Null or Empty"); + qDebug("===stream===%s: Block is Null or Empty", flag); return; } char* pBuf = NULL; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 1ea2101aff..85f17c0d53 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -905,7 +905,8 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) { pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.deleteMark = INT64_MAX; - + pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData; + pInfo->ignoreExpiredData = false; } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { @@ -921,6 +922,8 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) { pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.deleteMark = INT64_MAX; + pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData; + pInfo->ignoreExpiredData = false; } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { SStreamStateAggOperatorInfo* pInfo = pOperator->info; ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || @@ -934,6 +937,8 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) { pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.deleteMark = INT64_MAX; + pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData; + pInfo->ignoreExpiredData = false; } // iterate operator tree @@ -961,35 +966,23 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) { pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; - /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/ - /*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/ - pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; - /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/ - /*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/ + pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved; qInfo("restore stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/ - /*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/ - pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; - /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/ - /*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/ + pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved; qInfo("restore stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { SStreamStateAggOperatorInfo* pInfo = pOperator->info; - /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/ - /*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/ - pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; - /*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/ - /*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/ + pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved; qInfo("restore stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); } diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 2a33e3527a..234f1a666c 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -447,9 +447,14 @@ void* destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) { return NULL; } +void destroySPoint(void* ptr) { + SPoint* point = (SPoint*) ptr; + taosMemoryFreeClear(point->val); +} + void* destroyStreamFillLinearInfo(SStreamFillLinearInfo* pFillLinear) { - taosArrayDestroy(pFillLinear->pDeltaVal); - taosArrayDestroy(pFillLinear->pNextDeltaVal); + taosArrayDestroyEx(pFillLinear->pEndPoints, destroySPoint); + taosArrayDestroyEx(pFillLinear->pNextEndPoints, destroySPoint); taosMemoryFree(pFillLinear); return NULL; } @@ -611,19 +616,15 @@ static void calcDeltaData(SSDataBlock* pBlock, int32_t rowId, SResultRowData* pR } } -static void calcRowDeltaData(SResultRowData* pStartRow, SResultRowData* pEndRow, SArray* pDelta, SFillColInfo* pFillCol, - int32_t numOfCol, int32_t winCount) { +static void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, + int32_t numOfCol) { for (int32_t i = 0; i < numOfCol; i++) { if (!pFillCol[i].notFillCol) { int32_t slotId = GET_DEST_SLOT_ID(pFillCol + i); - SResultCellData* pSCell = getResultCell(pStartRow, slotId); - double start = 0.0; - GET_TYPED_DATA(start, double, pSCell->type, pSCell->pData); SResultCellData* pECell = getResultCell(pEndRow, slotId); - double end = 0.0; - GET_TYPED_DATA(end, double, pECell->type, pECell->pData); - double delta = (end - start) / winCount; - taosArraySet(pDelta, slotId, &delta); + SPoint* pPoint = taosArrayGet(pEndPoins, slotId); + pPoint->key = pEndRow->key; + memcpy(pPoint->val, pECell->pData, pECell->bytes); } } } @@ -674,10 +675,8 @@ void setDeleteFillValueInfo(TSKEY start, TSKEY end, SStreamFillSupporter* pFillS setFillKeyInfo(pFillSup->prev.key, pFillSup->next.key, &pFillSup->interval, pFillInfo); pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->nextEnd = INT64_MIN; - int32_t numOfWins = taosTimeCountInterval(pFillSup->prev.key, pFillSup->next.key, pFillSup->interval.sliding, - pFillSup->interval.slidingUnit, pFillSup->interval.precision); - calcRowDeltaData(&pFillSup->prev, &pFillSup->next, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo, - pFillSup->numOfAllCols, numOfWins); + calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, + pFillSup->numOfAllCols); pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pLinearInfo->winIndex = 0; } break; @@ -780,25 +779,19 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_MID; pFillInfo->pLinearInfo->nextEnd = nextWKey; - int32_t numOfWins = taosTimeCountInterval(prevWKey, ts, pFillSup->interval.sliding, - pFillSup->interval.slidingUnit, pFillSup->interval.precision); - calcRowDeltaData(&pFillSup->prev, &pFillSup->cur, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo, - pFillSup->numOfAllCols, numOfWins); + calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, + pFillSup->numOfAllCols); pFillInfo->pResRow = &pFillSup->prev; - numOfWins = taosTimeCountInterval(ts, nextWKey, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, - pFillSup->interval.precision); - calcRowDeltaData(&pFillSup->cur, &pFillSup->next, pFillInfo->pLinearInfo->pNextDeltaVal, pFillSup->pAllColInfo, - pFillSup->numOfAllCols, numOfWins); + calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pNextEndPoints, pFillSup->pAllColInfo, + pFillSup->numOfAllCols); pFillInfo->pLinearInfo->hasNext = true; } else if (hasPrevWindow(pFillSup)) { setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_END; pFillInfo->pLinearInfo->nextEnd = INT64_MIN; - int32_t numOfWins = taosTimeCountInterval(prevWKey, ts, pFillSup->interval.sliding, - pFillSup->interval.slidingUnit, pFillSup->interval.precision); - calcRowDeltaData(&pFillSup->prev, &pFillSup->cur, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo, - pFillSup->numOfAllCols, numOfWins); + calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, + pFillSup->numOfAllCols); pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pLinearInfo->hasNext = false; } else { @@ -806,10 +799,8 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_START; pFillInfo->pLinearInfo->nextEnd = INT64_MIN; - int32_t numOfWins = taosTimeCountInterval(ts, nextWKey, pFillSup->interval.sliding, - pFillSup->interval.slidingUnit, pFillSup->interval.precision); - calcRowDeltaData(&pFillSup->cur, &pFillSup->next, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo, - pFillSup->numOfAllCols, numOfWins); + calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo, + pFillSup->numOfAllCols); pFillInfo->pResRow = &pFillSup->cur; pFillInfo->pLinearInfo->hasNext = false; } @@ -906,13 +897,18 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo* colDataSetNULL(pColData, index); continue; } - double* pDelta = taosArrayGet(pFillInfo->pLinearInfo->pDeltaVal, slotId); + SPoint* pEnd = taosArrayGet(pFillInfo->pLinearInfo->pEndPoints, slotId); double vCell = 0; - GET_TYPED_DATA(vCell, double, pCell->type, pCell->pData); - vCell += (*pDelta) * pFillInfo->pLinearInfo->winIndex; - int64_t result = 0; - SET_TYPED_DATA(&result, pCell->type, vCell); - colDataSetVal(pColData, index, (const char*)&result, false); + SPoint start = {0}; + start.key = pFillInfo->pResRow->key; + start.val = pCell->pData; + + SPoint cur = {0}; + cur.key = pFillInfo->current; + cur.val = taosMemoryCalloc(1, pCell->bytes); + taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type); + colDataSetVal(pColData, index, (const char*)cur.val, false); + destroySPoint(&cur); } } pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, @@ -953,8 +949,7 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter* if (pFillInfo->current > pFillInfo->end && pFillInfo->pLinearInfo->hasNext) { pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->winIndex = 0; - taosArrayClear(pFillInfo->pLinearInfo->pDeltaVal); - taosArrayAddAll(pFillInfo->pLinearInfo->pDeltaVal, pFillInfo->pLinearInfo->pNextDeltaVal); + taosArraySwap(pFillInfo->pLinearInfo->pEndPoints, pFillInfo->pLinearInfo->pNextEndPoints); pFillInfo->pResRow = &pFillSup->cur; setFillKeyInfo(pFillSup->cur.key, pFillInfo->pLinearInfo->nextEnd, &pFillSup->interval, pFillInfo); doStreamFillLinear(pFillSup, pFillInfo, pRes); @@ -1359,15 +1354,19 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* pFillInfo->pLinearInfo = taosMemoryCalloc(1, sizeof(SStreamFillLinearInfo)); pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->nextEnd = INT64_MIN; - pFillInfo->pLinearInfo->pDeltaVal = NULL; - pFillInfo->pLinearInfo->pNextDeltaVal = NULL; + pFillInfo->pLinearInfo->pEndPoints = NULL; + pFillInfo->pLinearInfo->pNextEndPoints = NULL; if (pFillSup->type == TSDB_FILL_LINEAR) { - pFillInfo->pLinearInfo->pDeltaVal = taosArrayInit(pFillSup->numOfAllCols, sizeof(double)); - pFillInfo->pLinearInfo->pNextDeltaVal = taosArrayInit(pFillSup->numOfAllCols, sizeof(double)); + pFillInfo->pLinearInfo->pEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint)); + pFillInfo->pLinearInfo->pNextEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint)); for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) { - double value = 0.0; - taosArrayPush(pFillInfo->pLinearInfo->pDeltaVal, &value); - taosArrayPush(pFillInfo->pLinearInfo->pNextDeltaVal, &value); + SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i); + SPoint value = {0}; + value.val = taosMemoryCalloc(1, pColData->info.bytes); + taosArrayPush(pFillInfo->pLinearInfo->pEndPoints, &value); + + value.val = taosMemoryCalloc(1, pColData->info.bytes); + taosArrayPush(pFillInfo->pLinearInfo->pNextEndPoints, &value); } } pFillInfo->pLinearInfo->winIndex = 0; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ce647014ae..5dff1abb97 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1803,6 +1803,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; + printDataBlock(pInfo->pUpdateRes, "recover update"); return pInfo->pUpdateRes; } break; case STREAM_SCAN_FROM_DATAREADER_RANGE: { @@ -1813,7 +1814,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version); pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; checkUpdateData(pInfo, true, pSDB, false); - // printDataBlock(pSDB, "stream scan update"); + printDataBlock(pSDB, "scan recover update"); calBlockTbName(pInfo, pSDB); return pSDB; } @@ -1838,6 +1839,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { } if (pInfo->pCreateTbRes->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_RES; + printDataBlock(pInfo->pCreateTbRes, "recover createTbl"); return pInfo->pCreateTbRes; } qDebug("stream recover scan get block, rows %d", pInfo->pRecoverRes->info.rows); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index fef588a503..b01143841c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2110,10 +2110,12 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3 } else if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) { int32_t code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]); if (code != TSDB_CODE_SUCCESS) { - qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code)); - pTaskInfo->code = code; - T_LONG_JMP(pTaskInfo->env, code); + qError("%s apply combine functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code)); } + } else if (pDestCtx[k].fpSet.combine == NULL) { + char* funName = fmGetFuncName(pDestCtx[k].functionId); + qError("%s error, combine funcion for %s is not implemented", GET_TASKID(pTaskInfo), funName); + taosMemoryFreeClear(funName); } } } @@ -2769,6 +2771,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK); pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE); pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; + pInfo->ignoreExpiredDataSaved = false; pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->delIndex = 0; pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); @@ -3587,6 +3590,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pInfo->isFinal = false; pInfo->pPhyNode = pPhyNode; pInfo->ignoreExpiredData = pSessionNode->window.igExpired; + pInfo->ignoreExpiredDataSaved = false; pInfo->pUpdated = NULL; pInfo->pStUpdated = NULL; @@ -4112,6 +4116,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->pChildren = NULL; pInfo->ignoreExpiredData = pStateNode->window.igExpired; + pInfo->ignoreExpiredDataSaved = false; pInfo->pUpdated = NULL; pInfo->pSeUpdated = NULL; @@ -4885,6 +4890,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->interval = interval; pInfo->twAggSup = twAggSupp; pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; + pInfo->ignoreExpiredDataSaved = false; pInfo->isFinal = false; SExprSupp* pSup = &pOperator->exprSupp; diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index dc884a0581..c3afc30a7b 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -235,6 +235,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getGroupKeyFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t groupKeyFunction(SqlFunctionCtx* pCtx); int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t groupKeyCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); #ifdef __cplusplus } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 0257b3d5e6..4fcc9ac1ad 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2480,7 +2480,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "irate", .type = FUNCTION_TYPE_IRATE, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translateIrate, .getEnvFunc = getIrateFuncEnv, .initFunc = irateFuncSetup, @@ -3234,6 +3234,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .initFunc = functionSetup, .processFunc = groupKeyFunction, .finalizeFunc = groupKeyFinalize, + .combineFunc = groupKeyCombine, .pPartialFunc = "_group_key", .pMergeFunc = "_group_key" }, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 35cfa1753f..3c9f2fe8ca 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -5900,6 +5900,39 @@ int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { return pResInfo->numOfRes; } +int32_t groupKeyCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { + SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); + SGroupKeyInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); + + SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); + SGroupKeyInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); + + // escape rest of data blocks to avoid first entry to be overwritten. + if (pDBuf->hasResult) { + goto _group_key_over; + } + + if (pSBuf->isNull) { + pDBuf->isNull = true; + pDBuf->hasResult = true; + goto _group_key_over; + } + + if (IS_VAR_DATA_TYPE(pSourceCtx->resDataInfo.type)) { + memcpy(pDBuf->data, pSBuf->data, + (pSourceCtx->resDataInfo.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(pSBuf->data) : varDataTLen(pSBuf->data)); + } else { + memcpy(pDBuf->data, pSBuf->data, pSourceCtx->resDataInfo.bytes); + } + + pDBuf->hasResult = true; + +_group_key_over: + + SET_VAL(pDResInfo, 1, 1); + return TSDB_CODE_SUCCESS; +} + int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = 0; diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c index e3127fcd7b..94ab616dda 100644 --- a/source/libs/function/src/functionMgt.c +++ b/source/libs/function/src/functionMgt.c @@ -447,3 +447,10 @@ int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc return code; } + +char* fmGetFuncName(int32_t funcId) { + if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) { + return taosStrdup("invalid function"); + } + return taosStrdup(funcMgtBuiltins[funcId].name); +} diff --git a/source/libs/parser/src/parCalcConst.c b/source/libs/parser/src/parCalcConst.c index 33b8a686ae..a7f6d7cb3c 100644 --- a/source/libs/parser/src/parCalcConst.c +++ b/source/libs/parser/src/parCalcConst.c @@ -183,16 +183,18 @@ static int32_t calcConstProject(SNode* pProject, bool dual, SNode** pNew) { } else { code = scalarCalculateConstants(pProject, pNew); } - if (TSDB_CODE_SUCCESS == code && QUERY_NODE_VALUE == nodeType(*pNew) && NULL != pAssociation) { + if (TSDB_CODE_SUCCESS == code) { strcpy(((SExprNode*)*pNew)->aliasName, aliasName); - int32_t size = taosArrayGetSize(pAssociation); - for (int32_t i = 0; i < size; ++i) { - SNode** pCol = taosArrayGetP(pAssociation, i); - nodesDestroyNode(*pCol); - *pCol = nodesCloneNode(*pNew); - if (NULL == *pCol) { - code = TSDB_CODE_OUT_OF_MEMORY; - break; + if (QUERY_NODE_VALUE == nodeType(*pNew) && NULL != pAssociation) { + int32_t size = taosArrayGetSize(pAssociation); + for (int32_t i = 0; i < size; ++i) { + SNode** pCol = taosArrayGetP(pAssociation, i); + nodesDestroyNode(*pCol); + *pCol = nodesCloneNode(*pNew); + if (NULL == *pCol) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } } } } @@ -383,7 +385,8 @@ static int32_t calcConstSetOpProjections(SCalcConstContext* pCxt, SSetOperator* int32_t index = 0; SNode* pProj = NULL; WHERE_EACH(pProj, pSetOp->pProjectionList) { - if (subquery && notRefByOrderBy((SColumnNode*)pProj, pSetOp->pOrderByList) && isSetUselessCol(pSetOp, index, (SExprNode*)pProj)) { + if (subquery && notRefByOrderBy((SColumnNode*)pProj, pSetOp->pOrderByList) && + isSetUselessCol(pSetOp, index, (SExprNode*)pProj)) { ERASE_NODE(pSetOp->pProjectionList); eraseSetOpChildProjection(pSetOp, index); continue; diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 704f381afa..0ed32afbbc 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -566,53 +566,19 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList, return code; } -static int bindFileds(SBoundColInfo* pBoundInfo, SSchema* pSchema, TAOS_FIELD* fields, int numFields) { - bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool)); - if (NULL == pUseCols) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - pBoundInfo->numOfBound = 0; - - int16_t lastColIdx = -1; // last column found - int32_t code = TSDB_CODE_SUCCESS; +static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) { for (int i = 0; i < numFields; i++) { - SToken token; - token.z = fields[i].name; - token.n = strlen(fields[i].name); - - int16_t t = lastColIdx + 1; - int16_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pSchema); - if (index < 0 && t > 0) { - index = insFindCol(&token, 0, t, pSchema); - } - if (index < 0) { - uError("can not find column name:%s", token.z); - code = TSDB_CODE_PAR_INVALID_COLUMN; - break; - } else if (pUseCols[index]) { - code = TSDB_CODE_PAR_INVALID_COLUMN; - uError("duplicated column name:%s", token.z); - break; - } else { - lastColIdx = index; - pUseCols[index] = true; - pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index; - ++pBoundInfo->numOfBound; + if(strcmp(pSchema->name, fields[i].name) == 0){ + return true; } } - if (TSDB_CODE_SUCCESS == code && !pUseCols[0]) { - uError("primary timestamp column can not be null:"); - code = TSDB_CODE_PAR_INVALID_COLUMN; - } - - taosMemoryFree(pUseCols); - return code; + return false; } int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* tFields, int numFields, bool needChangeLength) { + void* tmp = taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid)); STableDataCxt* pTableCxt = NULL; int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid), pTableMeta, &pCreateTb, &pTableCxt, true); @@ -620,19 +586,14 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate uError("insGetTableDataCxt error"); goto end; } - if (tFields != NULL) { - ret = bindFileds(&pTableCxt->boundColsInfo, getTableColumnSchema(pTableMeta), tFields, numFields); + + if(tmp == NULL){ + ret = initTableColSubmitData(pTableCxt); if (ret != TSDB_CODE_SUCCESS) { - uError("bindFileds error"); + uError("initTableColSubmitData error"); goto end; } } - // no need to bind, because select * get all fields - ret = initTableColSubmitData(pTableCxt); - if (ret != TSDB_CODE_SUCCESS) { - uError("initTableColSubmitData error"); - goto end; - } char* p = (char*)data; // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column @@ -660,35 +621,43 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate SSchema* pSchema = getTableColumnSchema(pTableCxt->pMeta); SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo; - if (boundInfo->numOfBound != numOfCols) { - uError("boundInfo->numOfBound:%d != numOfCols:%d", boundInfo->numOfBound, numOfCols); + if (tFields != NULL && numFields != numOfCols) { + uError("numFields:%d != numOfCols:%d", numFields, numOfCols); + ret = TSDB_CODE_INVALID_PARA; + goto end; + } + if (tFields != NULL && numFields > boundInfo->numOfBound) { + uError("numFields:%d > boundInfo->numOfBound:%d", numFields, boundInfo->numOfBound); ret = TSDB_CODE_INVALID_PARA; goto end; } for (int c = 0; c < boundInfo->numOfBound; ++c) { - SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]]; + SSchema* pColSchema = &pSchema[c]; SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c); + if(tFields == NULL || findFileds(pColSchema, tFields, numFields)){ + if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) { + uError("type or bytes not equal"); + ret = TSDB_CODE_INVALID_PARA; + goto end; + } - if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) { - uError("type or bytes not equal"); - ret = TSDB_CODE_INVALID_PARA; - goto end; - } + int8_t* offset = pStart; + if (IS_VAR_DATA_TYPE(pColSchema->type)) { + pStart += numOfRows * sizeof(int32_t); + } else { + pStart += BitmapLen(numOfRows); + } + char* pData = pStart; - int8_t* offset = pStart; - if (IS_VAR_DATA_TYPE(pColSchema->type)) { - pStart += numOfRows * sizeof(int32_t); - } else { - pStart += BitmapLen(numOfRows); - } - char* pData = pStart; - - tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); - fields += sizeof(int8_t) + sizeof(int32_t); - if (needChangeLength) { - pStart += htonl(colLength[c]); - } else { - pStart += colLength[c]; + tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); + fields += sizeof(int8_t) + sizeof(int32_t); + if (needChangeLength) { + pStart += htonl(colLength[c]); + } else { + pStart += colLength[c]; + } + }else{ + tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, NULL, NULL); } } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 68c93f8ca9..726790443e 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1342,8 +1342,8 @@ static bool isCountNotNullValue(SFunctionNode* pFunc) { // count(1) is rewritten as count(ts) for scannning optimization static int32_t rewriteCountNotNullValue(STranslateContext* pCxt, SFunctionNode* pCount) { SValueNode* pValue = (SValueNode*)nodesListGetNode(pCount->pParameterList, 0); - STableNode* pTable = NULL; - int32_t code = findTable(pCxt, NULL, &pTable); + STableNode* pTable = NULL; + int32_t code = findTable(pCxt, NULL, &pTable); if (TSDB_CODE_SUCCESS == code && QUERY_NODE_REAL_TABLE == nodeType(pTable)) { SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); if (NULL == pCol) { @@ -1424,7 +1424,7 @@ static int32_t translateAggFunc(STranslateContext* pCxt, SFunctionNode* pFunc) { } if (isCountNotNullValue(pFunc)) { return rewriteCountNotNullValue(pCxt, pFunc); - } + } if (isCountTbname(pFunc)) { return rewriteCountTbname(pCxt, pFunc); } @@ -5923,11 +5923,15 @@ static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, STabl return code; } +static bool isEventWindowQuery(SSelectStmt* pSelect) { + return NULL != pSelect->pWindow && QUERY_NODE_EVENT_WINDOW == nodeType(pSelect->pWindow); +} + static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type || !pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList || - crossTableWithUdaf(pSelect)) { + crossTableWithUdaf(pSelect) || isEventWindowQuery(pSelect)) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); } if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) { diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 793d05721e..563bc5e780 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -444,7 +444,7 @@ static int32_t getInsTagsTableTargetNameFromOp(int32_t acctId, SOperatorNode* pO } else if (QUERY_NODE_VALUE == nodeType(pOper->pRight)) { pVal = (SValueNode*)pOper->pRight; } - if (NULL == pCol || NULL == pVal) { + if (NULL == pCol || NULL == pVal || NULL == pVal->literal || 0 == strcmp(pVal->literal, "")) { return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index cb610ad6b5..25b2656365 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -152,8 +152,14 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { if (batchCnt >= batchSz) break; } if (taosArrayGetSize(pRes) == 0) { - taosArrayDestroy(pRes); - break; + if (finished) { + taosArrayDestroy(pRes); + qDebug("task %d finish recover exec task ", pTask->taskId); + break; + } else { + qDebug("task %d continue recover exec task ", pTask->taskId); + continue; + } } SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); if (qRes == NULL) { diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 6c1d3e17bb..b15bb519e7 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -486,3 +486,21 @@ void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* par taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param); } +void taosArraySwap(SArray* a, SArray* b) { + if (a == NULL || b == NULL) return; + size_t t = a->size; + a->size = b->size; + b->size = t; + + uint32_t cap = a->capacity; + a->capacity = b->capacity; + b->capacity = cap; + + uint32_t elem = a->elemSize; + a->elemSize = b->elemSize; + b->elemSize = elem; + + void* data = a->pData; + a->pData = b->pData; + b->pData = data; +} \ No newline at end of file diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index c15bd96903..f811d2f203 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -637,6 +637,8 @@ int32_t cfgLoadFromEnvVar(SConfig *pConfig) { int32_t code = 0; char **pEnv = environ; line[1023] = 0; + + if (pEnv == NULL) return 0; while (*pEnv != NULL) { name = value = value2 = value3 = NULL; olen = vlen = vlen2 = vlen3 = 0; diff --git a/source/util/src/version.c.in b/source/util/src/version.c.in index cb307b57fc..71998e3321 100644 --- a/source/util/src/version.c.in +++ b/source/util/src/version.c.in @@ -1,6 +1,7 @@ char version[64] = "${TD_VER_NUMBER}"; char compatible_version[12] = "${TD_VER_COMPATIBLE}"; char gitinfo[48] = "${TD_VER_GIT}"; +char gitinfoOfInternal[48] = "${TD_VER_GIT_INTERNAL}"; char buildinfo[64] = "Built at ${TD_VER_DATE}"; void libtaos_${TD_LIB_VER_NUMBER}_${TD_VER_OSTYPE}_${TD_VER_CPUTYPE}_${TD_VER_VERTYPE}() {}; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 8c8d41d5b6..c2f8e75e79 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -93,6 +93,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 @@ -871,6 +872,7 @@ ,,y,script,./test.sh -f tsim/query/emptyTsRange.sim ,,y,script,./test.sh -f tsim/query/partitionby.sim ,,y,script,./test.sh -f tsim/query/tableCount.sim +,,y,script,./test.sh -f tsim/query/nullColSma.sim ,,y,script,./test.sh -f tsim/qnode/basic1.sim ,,y,script,./test.sh -f tsim/snode/basic1.sim ,,y,script,./test.sh -f tsim/mnode/basic1.sim diff --git a/tests/parallel_test/run.sh b/tests/parallel_test/run.sh index f0880bdb04..de343752c6 100755 --- a/tests/parallel_test/run.sh +++ b/tests/parallel_test/run.sh @@ -167,8 +167,10 @@ function run_thread() { local case_build_san=`echo "$line"|cut -d, -f3` if [ "${case_build_san}" == "y" ]; then case_build_san="y" + DEBUGPATH="debugSan" elif [[ "${case_build_san}" == "n" ]] || [[ "${case_build_san}" == "" ]]; then case_build_san="n" + DEBUGPATH="debugNoSan" else usage exit 1 @@ -301,10 +303,10 @@ function run_thread() { if [ ! -z "$corefile" ]; then echo -e "\e[34m corefiles: $corefile \e[0m" local build_dir=$log_dir/build_${hosts[index]} - local remote_build_dir="${workdirs[index]}/TDengine/debug/build" - if [ $ent -ne 0 ]; then - remote_build_dir="${workdirs[index]}/TDinternal/debug/build" - fi + local remote_build_dir="${workdirs[index]}/{DEBUGPATH}/build" + # if [ $ent -ne 0 ]; then + # remote_build_dir="${workdirs[index]}/{DEBUGPATH}/build" + # fi mkdir $build_dir 2>/dev/null if [ $? -eq 0 ]; then # scp build binary diff --git a/tests/script/tsim/query/nullColSma.sim b/tests/script/tsim/query/nullColSma.sim new file mode 100644 index 0000000000..886274e7e5 --- /dev/null +++ b/tests/script/tsim/query/nullColSma.sim @@ -0,0 +1,139 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +$dbPrefix = m_in_db +$tbPrefix = m_in_tb +$mtPrefix = m_in_mt +$tbNum = 1 +$rowNum = 200 +$totalNum = 400 + +print =============== step1 +$i = 0 +$db = $dbPrefix . $i +$mt = $mtPrefix . $i + +sql drop database if exists $db +sql create database $db vgroups 1 maxrows 200 minrows 10; +sql use $db +sql create table $mt (ts timestamp, f1 int, f2 float) TAGS(tgcol int) + +print ====== start create child tables and insert data +$i = 0 +while $i < $tbNum + $tb = $tbPrefix . $i + sql create table $tb using $mt tags( $i ) + + $x = 0 + while $x < $rowNum + $cc = $x * 1 + $ms = 1601481600000 + $cc + + sql insert into $tb values ($ms , NULL , $x ) + $x = $x + 1 + endw + + $i = $i + 1 +endw + +$i = 1 +$tb = $tbPrefix . $i +sql create table $tb using $mt tags( $i ) + +$x = 0 +while $x < $rowNum + $cc = $x * 1 + $ms = 1601481600000 + $cc + + sql insert into $tb values ($ms , $x , NULL ) + $x = $x + 1 +endw + +sql flush database $db + +print =============== step2 +$i = 0 +$tb = $tbPrefix . $i +sql select max(f1) from $tb +if $rows != 1 then + return -1 +endi +if $data00 != NULL then + return -1 +endi + +$i = 1 +$tb = $tbPrefix . $i +sql select max(f2) from $tb +if $rows != 1 then + return -1 +endi +if $data00 != NULL then + return -1 +endi + +$rowNum = 10 + +print ====== insert more data +$i = 0 +while $i < $tbNum + $tb = $tbPrefix . $i + + $x = 0 + while $x < $rowNum + $cc = $x * 1 + $ms = 1601481700000 + $cc + + sql insert into $tb values ($ms , $x , $x ) + $x = $x + 1 + endw + + $i = $i + 1 +endw + +$i = 1 +$tb = $tbPrefix . $i +$x = 0 +while $x < $rowNum + $cc = $x * 1 + $ms = 1601481700000 + $cc + + sql insert into $tb values ($ms , $x , $x ) + $x = $x + 1 +endw + +sql flush database $db + +print =============== step3 +$i = 0 +$tb = $tbPrefix . $i +sql select max(f1) from $tb +if $rows != 1 then + return -1 +endi +if $data00 != 9 then + return -1 +endi + +$i = 1 +$tb = $tbPrefix . $i +sql select max(f2) from $tb +if $rows != 1 then + return -1 +endi +if $data00 != 9.00000 then + print $data00 + return -1 +endi + + +print =============== clear +#sql drop database $db +#sql select * from information_schema.ins_databases +#if $rows != 0 then +# return -1 +#endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/query/sys_tbname.sim b/tests/script/tsim/query/sys_tbname.sim index 7b3953129a..c676f2b1e0 100644 --- a/tests/script/tsim/query/sys_tbname.sim +++ b/tests/script/tsim/query/sys_tbname.sim @@ -51,6 +51,11 @@ if $data00 != @ins_stables@ then return -1 endi +sql select * from information_schema.ins_tables where table_name=''; +if $rows != 0 then + return -1 +endi + sql select tbname from information_schema.ins_tables; print $rows $data00 if $rows != 33 then diff --git a/tests/script/tsim/stream/distributeInterval0.sim b/tests/script/tsim/stream/distributeInterval0.sim index 1d58922928..de1c72f8dc 100644 --- a/tests/script/tsim/stream/distributeInterval0.sim +++ b/tests/script/tsim/stream/distributeInterval0.sim @@ -272,4 +272,122 @@ if $data12 != 2 then goto loop3 endi +print ===== step3 + +sql drop database if exists test4; +sql create database test4 vgroups 10; +sql use test4; +sql create stable st(ts timestamp,a int,b int,c varchar(250) ) tags(ta int,tb int,tc int); +sql create table aaa using st tags(1,1,1); +sql create table bbb using st tags(2,2,2); +sql create table ccc using st tags(3,2,2); +sql create table ddd using st tags(4,2,2); + + +sql create stream streams1 ignore expired 0 fill_history 0 watermark 3s into streamst subtable(c) as select _wstart, c , count(*) c1 from st partition by c interval(1s) ; + +sql insert into aaa values(1648791221001,2,2,"/a1/aa/aa"); +sql insert into bbb values(1648791221001,2,2,"/a1/aa/aa"); +sql insert into ccc values(1648791221001,2,2,"/a1/aa/aa"); +sql insert into ddd values(1648791221001,2,2,"/a1/aa/aa"); + +sql insert into aaa values(1648791222002,2,2,"/a2/aa/aa"); +sql insert into bbb values(1648791222002,2,2,"/a2/aa/aa"); +sql insert into ccc values(1648791222002,2,2,"/a2/aa/aa"); +sql insert into ddd values(1648791222002,2,2,"/a2/aa/aa"); + +sql insert into aaa values(1648791223003,2,2,"/a3/aa/aa"); +sql insert into bbb values(1648791223003,2,2,"/a3/aa/aa"); +sql insert into ccc values(1648791223003,2,2,"/a3/aa/aa"); +sql insert into ddd values(1648791223003,2,2,"/a3/aa/aa"); + +sql insert into aaa values(1648791224003,2,2,"/a4/aa/aa"); +sql insert into bbb values(1648791224003,2,2,"/a4/aa/aa"); +sql insert into ccc values(1648791224003,2,2,"/a4/aa/aa"); +sql insert into ddd values(1648791224003,2,2,"/a4/aa/aa"); + + +sql insert into aaa values(1648791225003,2,2,"/a5/aa/aa"); +sql insert into bbb values(1648791225003,2,2,"/a5/aa/aa"); +sql insert into ccc values(1648791225003,2,2,"/a5/aa/aa"); +sql insert into ddd values(1648791225003,2,2,"/a5/aa/aa"); + +sql insert into aaa values(1648791226003,2,2,"/a6/aa/aa"); +sql insert into bbb values(1648791226003,2,2,"/a6/aa/aa"); +sql insert into ccc values(1648791226003,2,2,"/a6/aa/aa"); +sql insert into ddd values(1648791226003,2,2,"/a6/aa/aa"); + +$loop_count = 0 + +loop4: +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamst; + +if $rows == 0 then + goto loop4 +endi + +sql delete from aaa where ts = 1648791223003 ; + +$loop_count = 0 + +loop5: +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamst; + +if $rows == 0 then + goto loop5 +endi + + +sql delete from ccc; + +$loop_count = 0 + +loop6: +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamst; + +if $rows == 0 then + goto loop6 +endi + +sql delete from ddd; + +$loop_count = 0 + +loop7: +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sql select * from streamst; + +if $rows == 0 then + goto loop7 +endi + +print ===== over + system sh/stop_dnodes.sh diff --git a/tests/system-test/7-tmq/raw_block_interface_test.py b/tests/system-test/7-tmq/raw_block_interface_test.py new file mode 100644 index 0000000000..1e89de1cce --- /dev/null +++ b/tests/system-test/7-tmq/raw_block_interface_test.py @@ -0,0 +1,54 @@ + +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + + def checkData(self): + tdSql.execute('use db_raw') + tdSql.query("select * from d1") + tdSql.checkRows(1) + tdSql.checkData(0, 1, 120) + + tdSql.query("select * from d2") + tdSql.checkRows(1) + tdSql.checkData(0, 1, None) + + return + + def check(self): + buildPath = tdCom.getBuildPath() + cmdStr = '%s/build/bin/write_raw_block_test'%(buildPath) + tdLog.info(cmdStr) + os.system(cmdStr) + + self.checkData() + + return + + def run(self): + tdSql.prepare() + self.check() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/tmqDropNtb-snapshot0.py b/tests/system-test/7-tmq/tmqDropNtb-snapshot0.py index 6c49fae299..c8bcdd6235 100644 --- a/tests/system-test/7-tmq/tmqDropNtb-snapshot0.py +++ b/tests/system-test/7-tmq/tmqDropNtb-snapshot0.py @@ -100,7 +100,7 @@ class TDTestCase: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) - if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows < expectrowcnt)): + if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows <= expectrowcnt)): tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.info("wait subscriptions exit ....") @@ -131,7 +131,7 @@ class TDTestCase: 'batchNum': 100, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'endTs': 0, - 'pollDelay': 10, + 'pollDelay': 20, 'showMsg': 1, 'showRow': 1, 'snapshot': 0} @@ -193,7 +193,7 @@ class TDTestCase: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) - if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows < expectrowcnt)): + if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows <= expectrowcnt)): tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.info("wait subscriptions exit ....") diff --git a/tools/shell/src/shellArguments.c b/tools/shell/src/shellArguments.c index c16769ccba..8111ccc0b1 100644 --- a/tools/shell/src/shellArguments.c +++ b/tools/shell/src/shellArguments.c @@ -18,17 +18,18 @@ #endif #include "shellInt.h" +#include "version.h" #ifndef CUS_NAME - char cusName[] = "TDengine"; +char cusName[] = "TDengine"; #endif #ifndef CUS_PROMPT - char cusPrompt[] = "taos"; +char cusPrompt[] = "taos"; #endif #ifndef CUS_EMAIL - char cusEmail[] = ""; +char cusEmail[] = ""; #endif #if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL) @@ -58,9 +59,9 @@ #define SHELL_VERSION "Print program version." #ifdef WEBSOCKET -#define SHELL_DSN "The dsn to use when connecting to cloud server." -#define SHELL_REST "Use restful mode when connecting." -#define SHELL_TIMEOUT "Set the timeout for websocket query in seconds, default is 30." +#define SHELL_DSN "The dsn to use when connecting to cloud server." +#define SHELL_REST "Use restful mode when connecting." +#define SHELL_TIMEOUT "Set the timeout for websocket query in seconds, default is 30." #endif static int32_t shellParseSingleOpt(int32_t key, char *arg); @@ -145,7 +146,7 @@ static void shellParseArgsUseArgp(int argc, char *argv[]) { #endif #ifndef ARGP_ERR_UNKNOWN - #define ARGP_ERR_UNKNOWN E2BIG +#define ARGP_ERR_UNKNOWN E2BIG #endif static int32_t shellParseSingleOpt(int32_t key, char *arg) { @@ -246,8 +247,8 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) { SShellArgs *pArgs = &shell.args; for (int i = 1; i < argc; i++) { - if (strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "--usage") == 0 - || strcmp(argv[i], "-?") == 0 || strcmp(argv[i], "/?") == 0) { + if (strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "--usage") == 0 || strcmp(argv[i], "-?") == 0 || + strcmp(argv[i], "/?") == 0) { shellParseSingleOpt('?', NULL); return 0; } @@ -263,10 +264,8 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) { return -1; } - if (key[1] == 'h' || key[1] == 'P' || key[1] == 'u' - || key[1] == 'a' || key[1] == 'c' || key[1] == 's' - || key[1] == 'f' || key[1] == 'd' || key[1] == 'w' - || key[1] == 'n' || key[1] == 'l' || key[1] == 'N' + if (key[1] == 'h' || key[1] == 'P' || key[1] == 'u' || key[1] == 'a' || key[1] == 'c' || key[1] == 's' || + key[1] == 'f' || key[1] == 'd' || key[1] == 'w' || key[1] == 'n' || key[1] == 'l' || key[1] == 'N' #ifdef WEBSOCKET || key[1] == 'E' || key[1] == 'T' #endif @@ -282,12 +281,10 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) { } shellParseSingleOpt(key[1], val); i++; - } else if (key[1] == 'p' || key[1] == 'A' || key[1] == 'C' - || key[1] == 'r' || key[1] == 'k' - || key[1] == 't' || key[1] == 'V' - || key[1] == '?' || key[1] == 1 + } else if (key[1] == 'p' || key[1] == 'A' || key[1] == 'C' || key[1] == 'r' || key[1] == 'k' || key[1] == 't' || + key[1] == 'V' || key[1] == '?' || key[1] == 1 #ifdef WEBSOCKET - ||key[1] == 'R' + || key[1] == 'R' #endif ) { shellParseSingleOpt(key[1], NULL); @@ -420,9 +417,15 @@ int32_t shellParseArgs(int32_t argc, char *argv[]) { sprintf(promptContinueFormat, "%%%zus> ", strlen(cusPrompt)); sprintf(shell.info.promptContinue, promptContinueFormat, " "); shell.info.promptSize = strlen(shell.info.promptHeader); +#ifdef TD_ENTERPRISE + snprintf(shell.info.programVersion, sizeof(shell.info.programVersion), + "version: %s compatible_version: %s\ngitinfo: %s\ngitinfoOfInternal: %s\nbuildInfo: %s", version, + compatible_version, gitinfo, gitinfoOfInternal, buildinfo); +#else snprintf(shell.info.programVersion, sizeof(shell.info.programVersion), "version: %s compatible_version: %s\ngitinfo: %s\nbuildInfo: %s", version, compatible_version, gitinfo, buildinfo); +#endif #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) shell.info.osname = "Windows"; diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 3080b15b8c..f9602aaa4d 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -541,7 +541,7 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t printf("%*" PRIu64, width, *((uint64_t *)val)); break; case TSDB_DATA_TYPE_FLOAT: - printf("%*.5f", width, GET_FLOAT_VAL(val)); + printf("%*ef", width, GET_FLOAT_VAL(val)); break; case TSDB_DATA_TYPE_DOUBLE: n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", width, GET_DOUBLE_VAL(val)); diff --git a/utils/test/c/CMakeLists.txt b/utils/test/c/CMakeLists.txt index 6ca266c555..eda9c70f15 100644 --- a/utils/test/c/CMakeLists.txt +++ b/utils/test/c/CMakeLists.txt @@ -3,6 +3,7 @@ add_dependencies(tmq_demo taos) add_executable(tmq_sim tmqSim.c) add_executable(create_table createTable.c) add_executable(tmq_taosx_ci tmq_taosx_ci.c) +add_executable(write_raw_block_test write_raw_block_test.c) add_executable(sml_test sml_test.c) add_executable(get_db_name_test get_db_name_test.c) target_link_libraries( @@ -34,6 +35,14 @@ target_link_libraries( PUBLIC os ) +target_link_libraries( + write_raw_block_test + PUBLIC taos_static + PUBLIC util + PUBLIC common + PUBLIC os +) + target_link_libraries( sml_test PUBLIC taos_static diff --git a/utils/test/c/write_raw_block_test.c b/utils/test/c/write_raw_block_test.c new file mode 100644 index 0000000000..8e5dd62752 --- /dev/null +++ b/utils/test/c/write_raw_block_test.c @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include "taos.h" +#include "types.h" + +int buildStable(TAOS* pConn, TAOS_RES* pRes) { + pRes = taos_query(pConn, + "CREATE STABLE `meters` (`ts` TIMESTAMP, `current` INT, `voltage` INT, `phase` FLOAT) TAGS " + "(`groupid` INT, `location` VARCHAR(16))"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table meters, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table d0 using meters tags(1, 'San Francisco')"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table d0, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into d0 (ts, current) values (now, 120)"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into table d0, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table d1 using meters tags(2, 'San Francisco')"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table d1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table d2 using meters tags(3, 'San Francisco')"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table d1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + return 0; +} + +int32_t init_env() { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return -1; + } + + TAOS_RES* pRes = taos_query(pConn, "drop database if exists db_raw"); + if (taos_errno(pRes) != 0) { + printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database if not exists db_raw vgroups 2"); + if (taos_errno(pRes) != 0) { + printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "use db_raw"); + if (taos_errno(pRes) != 0) { + printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + buildStable(pConn, pRes); + + pRes = taos_query(pConn, "select * from d0"); + if (taos_errno(pRes) != 0) { + printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes)); + return -1; + } + void *data = NULL; + int32_t numOfRows = 0; + int error_code = taos_fetch_raw_block(pRes, &numOfRows, &data); + ASSERT(error_code == 0); + ASSERT(numOfRows == 1); + + taos_write_raw_block(pConn, numOfRows, data, "d1"); + taos_free_result(pRes); + + pRes = taos_query(pConn, "select ts,phase from d0"); + if (taos_errno(pRes) != 0) { + printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes)); + return -1; + } + error_code = taos_fetch_raw_block(pRes, &numOfRows, &data); + ASSERT(error_code == 0); + ASSERT(numOfRows == 1); + + int numFields = taos_num_fields(pRes); + TAOS_FIELD *fields = taos_fetch_fields(pRes); + taos_write_raw_block_with_fields(pConn, numOfRows, data, "d2", fields, numFields); + taos_free_result(pRes); + + taos_close(pConn); + return 0; +} + +int main(int argc, char* argv[]) { + if (init_env() < 0) { + return -1; + } +}