Merge branch 'main' into fix/liaohj

This commit is contained in:
Haojun Liao 2023-03-28 15:07:32 +08:00
commit a883e57cf2
45 changed files with 900 additions and 355 deletions

View File

@ -10,6 +10,8 @@ if (NOT DEFINED TD_SOURCE_DIR)
set( TD_SOURCE_DIR ${PROJECT_SOURCE_DIR} ) set( TD_SOURCE_DIR ${PROJECT_SOURCE_DIR} )
endif() endif()
SET(TD_COMMUNITY_DIR ${PROJECT_SOURCE_DIR})
set(TD_SUPPORT_DIR "${TD_SOURCE_DIR}/cmake") set(TD_SUPPORT_DIR "${TD_SOURCE_DIR}/cmake")
set(TD_CONTRIB_DIR "${TD_SOURCE_DIR}/contrib") set(TD_CONTRIB_DIR "${TD_SOURCE_DIR}/contrib")

View File

@ -16,7 +16,7 @@ find_program(HAVE_GIT NAMES git)
IF (DEFINED GITINFO) IF (DEFINED GITINFO)
SET(TD_VER_GIT ${GITINFO}) SET(TD_VER_GIT ${GITINFO})
ELSEIF (HAVE_GIT) ELSEIF (HAVE_GIT)
execute_process(COMMAND git log -1 --format=%H WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} OUTPUT_VARIABLE GIT_COMMITID) execute_process(COMMAND git log -1 --format=%H WORKING_DIRECTORY ${TD_COMMUNITY_DIR} OUTPUT_VARIABLE GIT_COMMITID)
#message(STATUS "git log result:${GIT_COMMITID}") #message(STATUS "git log result:${GIT_COMMITID}")
IF (GIT_COMMITID) IF (GIT_COMMITID)
string (REGEX REPLACE "[\n\t\r]" "" GIT_COMMITID ${GIT_COMMITID}) string (REGEX REPLACE "[\n\t\r]" "" GIT_COMMITID ${GIT_COMMITID})
@ -30,6 +30,23 @@ ELSE ()
SET(TD_VER_GIT "no git commit id") SET(TD_VER_GIT "no git commit id")
ENDIF () ENDIF ()
IF (DEFINED GITINFOI)
SET(TD_VER_GIT_INTERNAL ${GITINFOI})
ELSEIF (HAVE_GIT)
execute_process(COMMAND git log -1 --format=%H WORKING_DIRECTORY ${PROJECT_SOURCE_DIR} OUTPUT_VARIABLE GIT_COMMITID)
message(STATUS "git log result:${GIT_COMMITID}")
IF (GIT_COMMITID)
string (REGEX REPLACE "[\n\t\r]" "" GIT_COMMITID ${GIT_COMMITID})
SET(TD_VER_GIT_INTERNAL ${GIT_COMMITID})
ELSE ()
message(STATUS "not a git repository")
SET(TD_VER_GIT "no git commit id")
ENDIF ()
ELSE ()
message(STATUS "no git cmd")
SET(TD_VER_GIT_INTERNAL "no git commit id")
ENDIF ()
IF (DEFINED VERDATE) IF (DEFINED VERDATE)
SET(TD_VER_DATE ${VERDATE}) SET(TD_VER_DATE ${VERDATE})
ELSE () ELSE ()

View File

@ -2,7 +2,7 @@
# taos-tools # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG d11f210 GIT_TAG 2864326
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE

View File

@ -241,6 +241,7 @@ int32_t fmGetUdafExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet); int32_t fmSetInvertFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet); int32_t fmSetNormalFunc(int32_t funcId, SFuncExecFuncs* pFpSet);
bool fmIsInvertible(int32_t funcId); bool fmIsInvertible(int32_t funcId);
char* fmGetFuncName(int32_t funcId);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -48,7 +48,7 @@ extern "C" {
#define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500 #define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500
#define SYNC_SNAP_RESEND_MS 1000 * 60 #define SYNC_SNAP_RESEND_MS 1000 * 60
#define SYNC_VND_COMMIT_MIN_MS 1000 #define SYNC_VND_COMMIT_MIN_MS 3000
#define SYNC_MAX_BATCH_SIZE 1 #define SYNC_MAX_BATCH_SIZE 1
#define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_BEGIN 0

View File

@ -208,6 +208,8 @@ void taosArrayDestroyP(SArray* pArray, FDelete fp);
void taosArrayDestroyEx(SArray* pArray, FDelete fp); void taosArrayDestroyEx(SArray* pArray, FDelete fp);
void taosArraySwap(SArray* a, SArray* b);
/** /**
* sort the array * sort the array
* @param pArray * @param pArray

View File

@ -23,6 +23,7 @@ extern "C" {
extern char version[]; extern char version[];
extern char compatible_version[]; extern char compatible_version[];
extern char gitinfo[]; extern char gitinfo[];
extern char gitinfoOfInternal[];
extern char buildinfo[]; extern char buildinfo[];
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -183,7 +183,7 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
} }
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
end: end:
cJSON_Delete(json); cJSON_Delete(json);
tFreeSMAltertbReq(&req); tFreeSMAltertbReq(&req);
return string; return string;
@ -205,7 +205,7 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) {
} }
string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE); string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE);
uDebug("processCreateStb %s", string); uDebug("processCreateStb %s", string);
_err: _err:
tDecoderClear(&coder); tDecoderClear(&coder);
return string; return string;
} }
@ -227,7 +227,7 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) {
string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen); string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen);
uDebug("processAlterStb %s", string); uDebug("processAlterStb %s", string);
_err: _err:
tDecoderClear(&coder); tDecoderClear(&coder);
return string; return string;
} }
@ -309,7 +309,7 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
cJSON_AddItemToArray(tags, tag); cJSON_AddItemToArray(tags, tag);
} }
end: end:
cJSON_AddItemToObject(json, "tags", tags); cJSON_AddItemToObject(json, "tags", tags);
taosArrayDestroy(pTagVals); taosArrayDestroy(pTagVals);
} }
@ -368,7 +368,7 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
uDebug("processCreateTable :%s", string); uDebug("processCreateTable :%s", string);
} }
_exit: _exit:
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq; pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment); taosMemoryFreeClear(pCreateReq->comment);
@ -408,7 +408,7 @@ static char* processAutoCreateTable(STaosxRsp* rsp) {
} }
string = buildCreateCTableJson(pCreateReq, rsp->createTableNum); string = buildCreateCTableJson(pCreateReq, rsp->createTableNum);
uDebug("processAutoCreateTable :%s", string); uDebug("processAutoCreateTable :%s", string);
_exit: _exit:
for (int i = 0; i < rsp->createTableNum; i++) { for (int i = 0; i < rsp->createTableNum; i++) {
tDecoderClear(&decoder[i]); tDecoderClear(&decoder[i]);
taosMemoryFreeClear(pCreateReq[i].comment); taosMemoryFreeClear(pCreateReq[i].comment);
@ -535,7 +535,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
uDebug("processAlterTable :%s", string); uDebug("processAlterTable :%s", string);
_exit: _exit:
cJSON_Delete(json); cJSON_Delete(json);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return string; return string;
@ -569,7 +569,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
uDebug("processDropSTable :%s", string); uDebug("processDropSTable :%s", string);
_exit: _exit:
cJSON_Delete(json); cJSON_Delete(json);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return string; return string;
@ -609,7 +609,7 @@ static char* processDeleteTable(SMqMetaRsp* metaRsp) {
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
uDebug("processDeleteTable :%s", string); uDebug("processDeleteTable :%s", string);
_exit: _exit:
cJSON_Delete(json); cJSON_Delete(json);
tDecoderClear(&coder); tDecoderClear(&coder);
return string; return string;
@ -652,7 +652,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
string = cJSON_PrintUnformatted(json); string = cJSON_PrintUnformatted(json);
uDebug("processDropTable :%s", string); uDebug("processDropTable :%s", string);
_exit: _exit:
cJSON_Delete(json); cJSON_Delete(json);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return string; return string;
@ -742,7 +742,7 @@ static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
code = pRequest->code; code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg); taosMemoryFree(pCmdMsg.pMsg);
end: end:
destroyRequest(pRequest); destroyRequest(pRequest);
tFreeSMCreateStbReq(&pReq); tFreeSMCreateStbReq(&pReq);
tDecoderClear(&coder); tDecoderClear(&coder);
@ -839,7 +839,7 @@ static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
code = pRequest->code; code = pRequest->code;
taosMemoryFree(pCmdMsg.pMsg); taosMemoryFree(pCmdMsg.pMsg);
end: end:
destroyRequest(pRequest); destroyRequest(pRequest);
tDecoderClear(&coder); tDecoderClear(&coder);
return code; return code;
@ -901,9 +901,9 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch); taosHashSetFreeFp(pVgroupHashmap, destroyCreateTbReqBatch);
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
// loop to create table // loop to create table
@ -987,7 +987,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
code = pRequest->code; code = pRequest->code;
end: end:
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq; pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment); taosMemoryFreeClear(pCreateReq->comment);
@ -1058,9 +1058,9 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch); taosHashSetFreeFp(pVgroupHashmap, destroyDropTbReqBatch);
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName)); pRequest->tableList = taosArrayInit(req.nReqs, sizeof(SName));
// loop to create table // loop to create table
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
@ -1132,7 +1132,7 @@ static int32_t taosDropTable(TAOS* taos, void* meta, int32_t metaLen) {
} }
code = pRequest->code; code = pRequest->code;
end: end:
taosHashCleanup(pVgroupHashmap); taosHashCleanup(pVgroupHashmap);
destroyRequest(pRequest); destroyRequest(pRequest);
tDecoderClear(&coder); tDecoderClear(&coder);
@ -1201,7 +1201,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) {
} }
taos_free_result(res); taos_free_result(res);
end: end:
tDecoderClear(&coder); tDecoderClear(&coder);
return code; return code;
} }
@ -1249,9 +1249,9 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
} }
SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter, SRequestConnInfo conn = {.pTrans = pTscObj->pAppInfo->pTransporter,
.requestId = pRequest->requestId, .requestId = pRequest->requestId,
.requestObjRefId = pRequest->self, .requestObjRefId = pRequest->self,
.mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)}; .mgmtEps = getEpSet_s(&pTscObj->pAppInfo->mgmtEp)};
SVgroupInfo pInfo = {0}; SVgroupInfo pInfo = {0};
SName pName = {0}; SName pName = {0};
@ -1311,7 +1311,7 @@ static int32_t taosAlterTable(TAOS* taos, void* meta, int32_t metaLen) {
code = handleAlterTbExecRes(pRes->res, pCatalog); code = handleAlterTbExecRes(pRes->res, pCatalog);
} }
} }
end: end:
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
if (pVgData) taosMemoryFreeClear(pVgData->pData); if (pVgData) taosMemoryFreeClear(pVgData->pData);
taosMemoryFreeClear(pVgData); taosMemoryFreeClear(pVgData);
@ -1399,7 +1399,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch
launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code; code = pRequest->code;
end: end:
taosMemoryFreeClear(pTableMeta); taosMemoryFreeClear(pTableMeta);
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
destroyRequest(pRequest); destroyRequest(pRequest);
@ -1481,7 +1481,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code; code = pRequest->code;
end: end:
taosMemoryFreeClear(pTableMeta); taosMemoryFreeClear(pTableMeta);
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
destroyRequest(pRequest); destroyRequest(pRequest);
@ -1601,6 +1601,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
uError("WriteRaw:rawBlockBindData failed"); uError("WriteRaw:rawBlockBindData failed");
goto end; goto end;
} }
taosMemoryFreeClear(pTableMeta);
} }
code = smlBuildOutput(pQuery, pVgHash); code = smlBuildOutput(pQuery, pVgHash);
@ -1612,7 +1613,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code; code = pRequest->code;
end: end:
tDeleteSMqDataRsp(&rspObj.rsp); tDeleteSMqDataRsp(&rspObj.rsp);
tDecoderClear(&decoder); tDecoderClear(&decoder);
qDestroyQuery(pQuery); qDestroyQuery(pQuery);
@ -1707,6 +1708,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
tDecoderInit(&decoderTmp, *dataTmp, *lenTmp); tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) { if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) {
tDecoderClear(&decoderTmp); tDecoderClear(&decoderTmp);
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
uError("WriteRaw: tDecodeSVCreateTbReq error"); uError("WriteRaw: tDecodeSVCreateTbReq error");
code = TSDB_CODE_TMQ_INVALID_MSG; code = TSDB_CODE_TMQ_INVALID_MSG;
goto end; goto end;
@ -1715,15 +1717,19 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
if (pCreateReq.type != TSDB_CHILD_TABLE) { if (pCreateReq.type != TSDB_CHILD_TABLE) {
uError("WriteRaw:pCreateReq.type != TSDB_CHILD_TABLE. table name: %s", tbName); uError("WriteRaw:pCreateReq.type != TSDB_CHILD_TABLE. table name: %s", tbName);
code = TSDB_CODE_TSC_INVALID_VALUE; code = TSDB_CODE_TSC_INVALID_VALUE;
tDecoderClear(&decoderTmp);
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
goto end; goto end;
} }
if (strcmp(tbName, pCreateReq.name) == 0) { if (strcmp(tbName, pCreateReq.name) == 0) {
cloneSVreateTbReq(&pCreateReq, &pCreateReqDst); cloneSVreateTbReq(&pCreateReq, &pCreateReqDst);
// pCreateReqDst->ctb.suid = processSuid(pCreateReqDst->ctb.suid, pRequest->pDb); // pCreateReqDst->ctb.suid = processSuid(pCreateReqDst->ctb.suid, pRequest->pDb);
tDecoderClear(&decoderTmp); tDecoderClear(&decoderTmp);
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
break; break;
} }
tDecoderClear(&decoderTmp); tDecoderClear(&decoderTmp);
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
} }
SVgroupInfo vg; SVgroupInfo vg;
@ -1774,6 +1780,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
goto end; goto end;
} }
pCreateReqDst = NULL; pCreateReqDst = NULL;
taosMemoryFreeClear(pTableMeta);
} }
code = smlBuildOutput(pQuery, pVgHash); code = smlBuildOutput(pQuery, pVgHash);
@ -1785,7 +1792,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
launchQueryImpl(pRequest, pQuery, true, NULL); launchQueryImpl(pRequest, pQuery, true, NULL);
code = pRequest->code; code = pRequest->code;
end: end:
tDeleteSTaosxRsp(&rspObj.rsp); tDeleteSTaosxRsp(&rspObj.rsp);
tDecoderClear(&decoder); tDecoderClear(&decoder);
qDestroyQuery(pQuery); qDestroyQuery(pQuery);

View File

@ -2439,6 +2439,12 @@ _exit:
int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap, int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t bytes, int32_t nRows, char *lengthOrbitmap,
char *data) { char *data) {
int32_t code = 0; int32_t code = 0;
if(data == NULL){
for (int32_t i = 0; i < nRows; ++i) {
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NONE](pColData, NULL, 0);
}
goto _exit;
}
if (IS_VAR_DATA_TYPE(type)) { // var-length data type if (IS_VAR_DATA_TYPE(type)) { // var-length data type
for (int32_t i = 0; i < nRows; ++i) { for (int32_t i = 0; i < nRows; ++i) {

View File

@ -18,6 +18,7 @@
#include "mnode.h" #include "mnode.h"
#include "tconfig.h" #include "tconfig.h"
#include "tglobal.h" #include "tglobal.h"
#include "version.h"
// clang-format off // clang-format off
#define DM_APOLLO_URL "The apollo string to use when configuring the server, such as: -a 'jsonFile:./tests/cfg.json', cfg.json text can be '{\"fqdn\":\"td1\"}'." #define DM_APOLLO_URL "The apollo string to use when configuring the server, such as: -a 'jsonFile:./tests/cfg.json', cfg.json text can be '{\"fqdn\":\"td1\"}'."
@ -76,28 +77,28 @@ void dmLogCrash(int signum, void *sigInfo, void *context) {
taosIgnSignal(SIGINT); taosIgnSignal(SIGINT);
taosIgnSignal(SIGBREAK); taosIgnSignal(SIGBREAK);
#ifndef WINDOWS #ifndef WINDOWS
taosIgnSignal(SIGBUS); taosIgnSignal(SIGBUS);
#endif #endif
taosIgnSignal(SIGABRT); taosIgnSignal(SIGABRT);
taosIgnSignal(SIGFPE); taosIgnSignal(SIGFPE);
taosIgnSignal(SIGSEGV); taosIgnSignal(SIGSEGV);
char *pMsg = NULL; char *pMsg = NULL;
const char *flags = "UTL FATAL "; const char *flags = "UTL FATAL ";
ELogLevel level = DEBUG_FATAL; ELogLevel level = DEBUG_FATAL;
int32_t dflag = 255; int32_t dflag = 255;
int64_t msgLen= -1; int64_t msgLen = -1;
if (tsEnableCrashReport) { if (tsEnableCrashReport) {
if (taosGenCrashJsonMsg(signum, &pMsg, dmGetClusterId(), global.startTime)) { if (taosGenCrashJsonMsg(signum, &pMsg, dmGetClusterId(), global.startTime)) {
taosPrintLog(flags, level, dflag, "failed to generate crash json msg"); taosPrintLog(flags, level, dflag, "failed to generate crash json msg");
goto _return; goto _return;
} else { } else {
msgLen = strlen(pMsg); msgLen = strlen(pMsg);
} }
} }
_return: _return:
taosLogCrashInfo("taosd", pMsg, msgLen, signum, sigInfo); taosLogCrashInfo("taosd", pMsg, msgLen, signum, sigInfo);
@ -123,7 +124,7 @@ static void dmSetSignalHandle() {
#ifndef WINDOWS #ifndef WINDOWS
taosSetSignal(SIGBUS, dmLogCrash); taosSetSignal(SIGBUS, dmLogCrash);
#endif #endif
taosSetSignal(SIGABRT, dmLogCrash); taosSetSignal(SIGABRT, dmLogCrash);
taosSetSignal(SIGFPE, dmLogCrash); taosSetSignal(SIGFPE, dmLogCrash);
taosSetSignal(SIGSEGV, dmLogCrash); taosSetSignal(SIGSEGV, dmLogCrash);
@ -134,7 +135,7 @@ static int32_t dmParseArgs(int32_t argc, char const *argv[]) {
int32_t cmdEnvIndex = 0; int32_t cmdEnvIndex = 0;
if (argc < 2) return 0; if (argc < 2) return 0;
global.envCmd = taosMemoryMalloc((argc - 1) * sizeof(char *)); global.envCmd = taosMemoryMalloc((argc - 1) * sizeof(char *));
memset(global.envCmd, 0, (argc - 1) * sizeof(char *)); memset(global.envCmd, 0, (argc - 1) * sizeof(char *));
for (int32_t i = 1; i < argc; ++i) { for (int32_t i = 1; i < argc; ++i) {
@ -203,6 +204,9 @@ static void dmPrintVersion() {
#endif #endif
printf("%s version: %s compatible_version: %s\n", releaseName, version, compatible_version); printf("%s version: %s compatible_version: %s\n", releaseName, version, compatible_version);
printf("gitinfo: %s\n", gitinfo); printf("gitinfo: %s\n", gitinfo);
#ifdef TD_ENTERPRISE
printf("gitinfoOfInternal: %s\n", gitinfoOfInternal);
#endif
printf("buildInfo: %s\n", buildinfo); printf("buildInfo: %s\n", buildinfo);
} }
@ -284,7 +288,7 @@ int mainWindows(int argc, char **argv) {
printf("failed to init memory dbg, error:%s\n", tstrerror(code)); printf("failed to init memory dbg, error:%s\n", tstrerror(code));
return code; return code;
} }
tsAsyncLog = false; tsAsyncLog = false;
printf("memory dbg enabled\n"); printf("memory dbg enabled\n");
} }
#endif #endif

View File

@ -139,8 +139,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum, uint64_t reqId); int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum, uint64_t reqId);
// tqExec // tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp); int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows);
// int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp);
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type); int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type);
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry); int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry);

View File

@ -529,123 +529,133 @@ static int32_t doPollDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* p
// this is a normal subscribe requirement // this is a normal subscribe requirement
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset); return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
} else { // for taosX }
// todo handle the case where re-balance occurs.
SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pRequest);
if (offset.type != TMQ_OFFSET__LOG) { // todo handle the case where re-balance occurs.
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &offset) < 0) { // for taosx
return -1; SMqMetaRsp metaRsp = {0};
} STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pRequest);
if (metaRsp.metaRspLen > 0) { if (offset.type != TMQ_OFFSET__LOG) {
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &offset) < 0) {
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 return -1;
",ts:%" PRId64,
consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts);
taosMemoryFree(metaRsp.metaRsp);
tDeleteSTaosxRsp(&taosxRsp);
return code;
}
if (taosxRsp.blockNum > 0) {
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
return code;
} else {
offset = taosxRsp.rspOffset;
}
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
",version:%" PRId64,
consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
taosxRsp.rspOffset.version);
} }
if (offset.type == TMQ_OFFSET__LOG) { if (metaRsp.metaRspLen > 0) {
int64_t fetchVer = offset.version + 1; code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
if (pCkHead == NULL) { ",ts:%" PRId64,
tDeleteSTaosxRsp(&taosxRsp); consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
terrno = TSDB_CODE_OUT_OF_MEMORY; metaRsp.rspOffset.ts);
return -1; taosMemoryFree(metaRsp.metaRsp);
tDeleteSTaosxRsp(&taosxRsp);
return code;
}
if (taosxRsp.blockNum > 0) {
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
return code;
} else {
offset = taosxRsp.rspOffset;
}
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
",version:%" PRId64,
consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
taosxRsp.rspOffset.version);
}
if (offset.type == TMQ_OFFSET__LOG) {
int64_t fetchVer = offset.version + 1;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) {
tDeleteSTaosxRsp(&taosxRsp);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
walSetReaderCapacity(pHandle->pWalReader, 2048);
int totalRows = 0;
while (1) {
// todo refactor: this is not correct.
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
if (savedEpoch > pRequest->epoch) {
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64
", found new consumer epoch %d, discard req epoch %d",
consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
break;
} }
walSetReaderCapacity(pHandle->pWalReader, 2048); if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
return code;
}
while (1) { SWalCont* pHead = &pCkHead->head;
// todo refactor: this is not correct. tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
int32_t savedEpoch = atomic_load_32(&pHandle->epoch); pRequest->epoch, vgId, fetchVer, pHead->msgType);
if (savedEpoch > pRequest->epoch) {
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64
", found new consumer epoch %d, discard req epoch %d",
consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
break;
}
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { // process meta
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); if (pHead->msgType != TDMT_VND_SUBMIT) {
if(totalRows > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp); tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead); taosMemoryFreeClear(pCkHead);
return code; return code;
} }
SWalCont* pHead = &pCkHead->head; tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen;
if (pHead->msgType == TDMT_VND_SUBMIT) { metaRsp.metaRsp = pHead->body;
SPackedData submit = { if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) {
.msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)), code = -1;
.msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
.ver = pHead->version,
};
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) {
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId,
pRequest->subKey);
return -1;
}
if (taosxRsp.blockNum > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
return code;
} else {
fetchVer++;
}
} else {
/*A(pHandle->fetchMeta);*/
/*A(IS_META_MSG(pHead->msgType));*/
tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType));
tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer);
metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen;
metaRsp.metaRsp = pHead->body;
if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) {
code = -1;
taosMemoryFreeClear(pCkHead);
tDeleteSTaosxRsp(&taosxRsp);
return code;
}
code = 0;
taosMemoryFreeClear(pCkHead); taosMemoryFreeClear(pCkHead);
tDeleteSTaosxRsp(&taosxRsp); tDeleteSTaosxRsp(&taosxRsp);
return code; return code;
} }
code = 0;
taosMemoryFreeClear(pCkHead);
tDeleteSTaosxRsp(&taosxRsp);
return code;
}
// process data
SPackedData submit = {
.msgStr = POINTER_SHIFT(pHead->body, sizeof(SSubmitReq2Msg)),
.msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
.ver = pHead->version,
};
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows) < 0) {
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId,
pRequest->subKey);
taosMemoryFreeClear(pCkHead);
tDeleteSTaosxRsp(&taosxRsp);
return -1;
}
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
return code;
} else {
fetchVer++;
} }
} }
tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
return 0;
} }
tDeleteSTaosxRsp(&taosxRsp);
taosMemoryFreeClear(pCkHead);
return 0;
} }
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {

View File

@ -249,23 +249,15 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
return 0; return 0;
} }
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp) { int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows) {
STqExecHandle* pExec = &pHandle->execHandle; STqExecHandle* pExec = &pHandle->execHandle;
/*A(pExec->subType != TOPIC_SUB_TYPE__COLUMN);*/
SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pBlocks = taosArrayInit(0, sizeof(SSDataBlock));
SArray* pSchemas = taosArrayInit(0, sizeof(void*)); SArray* pSchemas = taosArrayInit(0, sizeof(void*));
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
STqReader* pReader = pExec->pExecReader; STqReader* pReader = pExec->pExecReader;
/*tqReaderSetDataMsg(pReader, pReq, 0);*/
tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver); tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver);
while (tqNextDataBlock2(pReader)) { while (tqNextDataBlock2(pReader)) {
/*SSDataBlock block = {0};*/
/*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/
/*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/
/*}*/
taosArrayClear(pBlocks); taosArrayClear(pBlocks);
taosArrayClear(pSchemas); taosArrayClear(pSchemas);
SSubmitTbData* pSubmitTbDataRet = NULL; SSubmitTbData* pSubmitTbDataRet = NULL;
@ -273,7 +265,6 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;
} }
if (pRsp->withTbName) { if (pRsp->withTbName) {
/*int64_t uid = pExec->pExecReader->msgIter.uid;*/
int64_t uid = pExec->pExecReader->lastBlkUid; int64_t uid = pExec->pExecReader->lastBlkUid;
if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) { if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) {
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
@ -315,6 +306,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
SSDataBlock* pBlock = taosArrayGet(pBlocks, i); SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock), tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision); pTq->pVnode->config.tsdbCfg.precision);
totalRows += pBlock->info.rows;
blockDataFreeRes(pBlock); blockDataFreeRes(pBlock);
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i); SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
taosArrayPush(pRsp->blockSchema, &pSW); taosArrayPush(pRsp->blockSchema, &pSW);
@ -323,13 +315,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
} }
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) { } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
STqReader* pReader = pExec->pExecReader; STqReader* pReader = pExec->pExecReader;
/*tqReaderSetDataMsg(pReader, pReq, 0);*/
tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver); tqReaderSetSubmitReq2(pReader, submit.msgStr, submit.msgLen, submit.ver);
while (tqNextDataBlockFilterOut2(pReader, pExec->execDb.pFilterOutTbUid)) { while (tqNextDataBlockFilterOut2(pReader, pExec->execDb.pFilterOutTbUid)) {
/*SSDataBlock block = {0};*/
/*if (tqRetrieveDataBlock(&block, pReader) < 0) {*/
/*if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue;*/
/*}*/
taosArrayClear(pBlocks); taosArrayClear(pBlocks);
taosArrayClear(pSchemas); taosArrayClear(pSchemas);
SSubmitTbData* pSubmitTbDataRet = NULL; SSubmitTbData* pSubmitTbDataRet = NULL;
@ -374,15 +361,11 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
tEncoderClear(&encoder); tEncoderClear(&encoder);
} }
/*tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),*/
/*pTq->pVnode->config.tsdbCfg.precision);*/
/*blockDataFreeRes(&block);*/
/*tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp);*/
/*pRsp->blockNum++;*/
for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) { for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) {
SSDataBlock* pBlock = taosArrayGet(pBlocks, i); SSDataBlock* pBlock = taosArrayGet(pBlocks, i);
tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock), tqAddBlockDataToRsp(pBlock, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision); pTq->pVnode->config.tsdbCfg.precision);
*totalRows += pBlock->info.rows;
blockDataFreeRes(pBlock); blockDataFreeRes(pBlock);
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i); SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
taosArrayPush(pRsp->blockSchema, &pSW); taosArrayPush(pRsp->blockSchema, &pSW);
@ -392,9 +375,5 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR
} }
taosArrayDestroy(pBlocks); taosArrayDestroy(pBlocks);
taosArrayDestroy(pSchemas); taosArrayDestroy(pSchemas);
// if (pRsp->blockNum == 0) {
// return -1;
// }
return 0; return 0;
} }

View File

@ -4512,6 +4512,7 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
int32_t i = 0, j = 0; int32_t i = 0, j = 0;
int32_t size = (int32_t)taosArrayGetSize(pSup->pColAgg); int32_t size = (int32_t)taosArrayGetSize(pSup->pColAgg);
taosArrayInsert(pSup->pColAgg, 0, pTsAgg); taosArrayInsert(pSup->pColAgg, 0, pTsAgg);
size++;
while (j < numOfCols && i < size) { while (j < numOfCols && i < size) {
SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i); SColumnDataAgg* pAgg = taosArrayGet(pSup->pColAgg, i);
@ -4524,10 +4525,21 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_
if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) { if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows}; SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
taosArrayInsert(pSup->pColAgg, i, &nullColAgg); taosArrayInsert(pSup->pColAgg, i, &nullColAgg);
i += 1;
size++;
} }
j += 1; j += 1;
} }
} }
while (j < numOfCols) {
if (pSup->colId[j] != PRIMARYKEY_TIMESTAMP_COL_ID) {
SColumnDataAgg nullColAgg = {.colId = pSup->colId[j], .numOfNull = numOfRows};
taosArrayInsert(pSup->pColAgg, i, &nullColAgg);
i += 1;
}
j++;
}
} }
int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave) { int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave) {
@ -4607,8 +4619,8 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
} else if (pAgg->colId < pSup->colId[j]) { } else if (pAgg->colId < pSup->colId[j]) {
i += 1; i += 1;
} else if (pSup->colId[j] < pAgg->colId) { } else if (pSup->colId[j] < pAgg->colId) {
// ASSERT(pSup->colId[j] == PRIMARYKEY_TIMESTAMP_COL_ID); pResBlock->pBlockAgg[pSup->slotId[j]] = NULL;
pResBlock->pBlockAgg[pSup->slotId[j]] = &pSup->tsColAgg; *allHave = false;
j += 1; j += 1;
} }
} }
@ -5001,4 +5013,4 @@ void tsdbUntakeReadSnap(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proacti
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) { void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) {
taosMemoryFreeClear(pReader->idStr); taosMemoryFreeClear(pReader->idStr);
pReader->idStr = taosStrdup(idstr); pReader->idStr = taosStrdup(idstr);
} }

View File

@ -156,13 +156,10 @@ int vnodeShouldCommit(SVnode *pVnode) {
bool needCommit = false; bool needCommit = false;
taosThreadMutexLock(&pVnode->mutex); taosThreadMutexLock(&pVnode->mutex);
if (!pVnode->inUse || !diskAvail) { if (pVnode->inUse && diskAvail) {
goto _out; needCommit =
((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs));
} }
needCommit =
(((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) ||
(pVnode->inUse->size > 0 && pSched->commitMs + pSched->maxWaitMs < nowMs));
_out:
taosThreadMutexUnlock(&pVnode->mutex); taosThreadMutexUnlock(&pVnode->mutex);
return needCommit; return needCommit;
} }

View File

@ -559,6 +559,7 @@ typedef struct SStreamIntervalOperatorInfo {
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
bool invertible; bool invertible;
bool ignoreExpiredData; bool ignoreExpiredData;
bool ignoreExpiredDataSaved;
SArray* pDelWins; // SWinRes SArray* pDelWins; // SWinRes
int32_t delIndex; int32_t delIndex;
SSDataBlock* pDelRes; SSDataBlock* pDelRes;
@ -620,6 +621,7 @@ typedef struct SStreamSessionAggOperatorInfo {
SPhysiNode* pPhyNode; // create new child SPhysiNode* pPhyNode; // create new child
bool isFinal; bool isFinal;
bool ignoreExpiredData; bool ignoreExpiredData;
bool ignoreExpiredDataSaved;
SArray* pUpdated; SArray* pUpdated;
SSHashObj* pStUpdated; SSHashObj* pStUpdated;
} SStreamSessionAggOperatorInfo; } SStreamSessionAggOperatorInfo;
@ -637,6 +639,7 @@ typedef struct SStreamStateAggOperatorInfo {
void* pDelIterator; void* pDelIterator;
SArray* pChildren; // cache for children's result; SArray* pChildren; // cache for children's result;
bool ignoreExpiredData; bool ignoreExpiredData;
bool ignoreExpiredDataSaved;
SArray* pUpdated; SArray* pUpdated;
SSHashObj* pSeUpdated; SSHashObj* pSeUpdated;
} SStreamStateAggOperatorInfo; } SStreamStateAggOperatorInfo;

View File

@ -92,8 +92,8 @@ typedef struct SResultRowData {
typedef struct SStreamFillLinearInfo { typedef struct SStreamFillLinearInfo {
TSKEY nextEnd; TSKEY nextEnd;
SArray* pDeltaVal; // double. value for Fill(linear). SArray* pEndPoints;
SArray* pNextDeltaVal; // double. value for Fill(linear). SArray* pNextEndPoints;
int64_t winIndex; int64_t winIndex;
bool hasNext; bool hasNext;
} SStreamFillLinearInfo; } SStreamFillLinearInfo;

View File

@ -2042,7 +2042,7 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
void printDataBlock(SSDataBlock* pBlock, const char* flag) { void printDataBlock(SSDataBlock* pBlock, const char* flag) {
if (!pBlock || pBlock->info.rows == 0) { if (!pBlock || pBlock->info.rows == 0) {
qDebug("===stream===printDataBlock: Block is Null or Empty"); qDebug("===stream===%s: Block is Null or Empty", flag);
return; return;
} }
char* pBuf = NULL; char* pBuf = NULL;

View File

@ -905,7 +905,8 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark; pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX; pInfo->twAggSup.deleteMark = INT64_MAX;
pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
pInfo->ignoreExpiredData = false;
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
@ -921,6 +922,8 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark; pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX; pInfo->twAggSup.deleteMark = INT64_MAX;
pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
pInfo->ignoreExpiredData = false;
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info; SStreamStateAggOperatorInfo* pInfo = pOperator->info;
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE || ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
@ -934,6 +937,8 @@ int32_t qStreamSetParamForRecover(qTaskInfo_t tinfo) {
pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark; pInfo->twAggSup.deleteMarkSaved = pInfo->twAggSup.deleteMark;
pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; pInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE;
pInfo->twAggSup.deleteMark = INT64_MAX; pInfo->twAggSup.deleteMark = INT64_MAX;
pInfo->ignoreExpiredDataSaved = pInfo->ignoreExpiredData;
pInfo->ignoreExpiredData = false;
} }
// iterate operator tree // iterate operator tree
@ -961,35 +966,23 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) { pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStreamIntervalOperatorInfo* pInfo = pOperator->info;
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/
/*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/ pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
/*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/
qInfo("restore stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); qInfo("restore stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/
/*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/ pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
/*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/
qInfo("restore stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); qInfo("restore stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info; SStreamStateAggOperatorInfo* pInfo = pOperator->info;
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/
/*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved; pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved; pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/ pInfo->ignoreExpiredData = pInfo->ignoreExpiredDataSaved;
/*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/
qInfo("restore stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark); qInfo("restore stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
} }

View File

@ -447,9 +447,14 @@ void* destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) {
return NULL; return NULL;
} }
void destroySPoint(void* ptr) {
SPoint* point = (SPoint*) ptr;
taosMemoryFreeClear(point->val);
}
void* destroyStreamFillLinearInfo(SStreamFillLinearInfo* pFillLinear) { void* destroyStreamFillLinearInfo(SStreamFillLinearInfo* pFillLinear) {
taosArrayDestroy(pFillLinear->pDeltaVal); taosArrayDestroyEx(pFillLinear->pEndPoints, destroySPoint);
taosArrayDestroy(pFillLinear->pNextDeltaVal); taosArrayDestroyEx(pFillLinear->pNextEndPoints, destroySPoint);
taosMemoryFree(pFillLinear); taosMemoryFree(pFillLinear);
return NULL; return NULL;
} }
@ -611,19 +616,15 @@ static void calcDeltaData(SSDataBlock* pBlock, int32_t rowId, SResultRowData* pR
} }
} }
static void calcRowDeltaData(SResultRowData* pStartRow, SResultRowData* pEndRow, SArray* pDelta, SFillColInfo* pFillCol, static void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol,
int32_t numOfCol, int32_t winCount) { int32_t numOfCol) {
for (int32_t i = 0; i < numOfCol; i++) { for (int32_t i = 0; i < numOfCol; i++) {
if (!pFillCol[i].notFillCol) { if (!pFillCol[i].notFillCol) {
int32_t slotId = GET_DEST_SLOT_ID(pFillCol + i); int32_t slotId = GET_DEST_SLOT_ID(pFillCol + i);
SResultCellData* pSCell = getResultCell(pStartRow, slotId);
double start = 0.0;
GET_TYPED_DATA(start, double, pSCell->type, pSCell->pData);
SResultCellData* pECell = getResultCell(pEndRow, slotId); SResultCellData* pECell = getResultCell(pEndRow, slotId);
double end = 0.0; SPoint* pPoint = taosArrayGet(pEndPoins, slotId);
GET_TYPED_DATA(end, double, pECell->type, pECell->pData); pPoint->key = pEndRow->key;
double delta = (end - start) / winCount; memcpy(pPoint->val, pECell->pData, pECell->bytes);
taosArraySet(pDelta, slotId, &delta);
} }
} }
} }
@ -674,10 +675,8 @@ void setDeleteFillValueInfo(TSKEY start, TSKEY end, SStreamFillSupporter* pFillS
setFillKeyInfo(pFillSup->prev.key, pFillSup->next.key, &pFillSup->interval, pFillInfo); setFillKeyInfo(pFillSup->prev.key, pFillSup->next.key, &pFillSup->interval, pFillInfo);
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
pFillInfo->pLinearInfo->nextEnd = INT64_MIN; pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
int32_t numOfWins = taosTimeCountInterval(pFillSup->prev.key, pFillSup->next.key, pFillSup->interval.sliding, calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->interval.slidingUnit, pFillSup->interval.precision); pFillSup->numOfAllCols);
calcRowDeltaData(&pFillSup->prev, &pFillSup->next, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo,
pFillSup->numOfAllCols, numOfWins);
pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pResRow = &pFillSup->prev;
pFillInfo->pLinearInfo->winIndex = 0; pFillInfo->pLinearInfo->winIndex = 0;
} break; } break;
@ -780,25 +779,19 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_MID; pFillInfo->pos = FILL_POS_MID;
pFillInfo->pLinearInfo->nextEnd = nextWKey; pFillInfo->pLinearInfo->nextEnd = nextWKey;
int32_t numOfWins = taosTimeCountInterval(prevWKey, ts, pFillSup->interval.sliding, calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->interval.slidingUnit, pFillSup->interval.precision); pFillSup->numOfAllCols);
calcRowDeltaData(&pFillSup->prev, &pFillSup->cur, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo,
pFillSup->numOfAllCols, numOfWins);
pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pResRow = &pFillSup->prev;
numOfWins = taosTimeCountInterval(ts, nextWKey, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pNextEndPoints, pFillSup->pAllColInfo,
pFillSup->interval.precision); pFillSup->numOfAllCols);
calcRowDeltaData(&pFillSup->cur, &pFillSup->next, pFillInfo->pLinearInfo->pNextDeltaVal, pFillSup->pAllColInfo,
pFillSup->numOfAllCols, numOfWins);
pFillInfo->pLinearInfo->hasNext = true; pFillInfo->pLinearInfo->hasNext = true;
} else if (hasPrevWindow(pFillSup)) { } else if (hasPrevWindow(pFillSup)) {
setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_END; pFillInfo->pos = FILL_POS_END;
pFillInfo->pLinearInfo->nextEnd = INT64_MIN; pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
int32_t numOfWins = taosTimeCountInterval(prevWKey, ts, pFillSup->interval.sliding, calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->interval.slidingUnit, pFillSup->interval.precision); pFillSup->numOfAllCols);
calcRowDeltaData(&pFillSup->prev, &pFillSup->cur, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo,
pFillSup->numOfAllCols, numOfWins);
pFillInfo->pResRow = &pFillSup->prev; pFillInfo->pResRow = &pFillSup->prev;
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
} else { } else {
@ -806,10 +799,8 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS
setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo); setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
pFillInfo->pos = FILL_POS_START; pFillInfo->pos = FILL_POS_START;
pFillInfo->pLinearInfo->nextEnd = INT64_MIN; pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
int32_t numOfWins = taosTimeCountInterval(ts, nextWKey, pFillSup->interval.sliding, calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
pFillSup->interval.slidingUnit, pFillSup->interval.precision); pFillSup->numOfAllCols);
calcRowDeltaData(&pFillSup->cur, &pFillSup->next, pFillInfo->pLinearInfo->pDeltaVal, pFillSup->pAllColInfo,
pFillSup->numOfAllCols, numOfWins);
pFillInfo->pResRow = &pFillSup->cur; pFillInfo->pResRow = &pFillSup->cur;
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
} }
@ -906,13 +897,18 @@ static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo*
colDataSetNULL(pColData, index); colDataSetNULL(pColData, index);
continue; continue;
} }
double* pDelta = taosArrayGet(pFillInfo->pLinearInfo->pDeltaVal, slotId); SPoint* pEnd = taosArrayGet(pFillInfo->pLinearInfo->pEndPoints, slotId);
double vCell = 0; double vCell = 0;
GET_TYPED_DATA(vCell, double, pCell->type, pCell->pData); SPoint start = {0};
vCell += (*pDelta) * pFillInfo->pLinearInfo->winIndex; start.key = pFillInfo->pResRow->key;
int64_t result = 0; start.val = pCell->pData;
SET_TYPED_DATA(&result, pCell->type, vCell);
colDataSetVal(pColData, index, (const char*)&result, false); SPoint cur = {0};
cur.key = pFillInfo->current;
cur.val = taosMemoryCalloc(1, pCell->bytes);
taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type);
colDataSetVal(pColData, index, (const char*)cur.val, false);
destroySPoint(&cur);
} }
} }
pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit, pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
@ -953,8 +949,7 @@ static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter*
if (pFillInfo->current > pFillInfo->end && pFillInfo->pLinearInfo->hasNext) { if (pFillInfo->current > pFillInfo->end && pFillInfo->pLinearInfo->hasNext) {
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
pFillInfo->pLinearInfo->winIndex = 0; pFillInfo->pLinearInfo->winIndex = 0;
taosArrayClear(pFillInfo->pLinearInfo->pDeltaVal); taosArraySwap(pFillInfo->pLinearInfo->pEndPoints, pFillInfo->pLinearInfo->pNextEndPoints);
taosArrayAddAll(pFillInfo->pLinearInfo->pDeltaVal, pFillInfo->pLinearInfo->pNextDeltaVal);
pFillInfo->pResRow = &pFillSup->cur; pFillInfo->pResRow = &pFillSup->cur;
setFillKeyInfo(pFillSup->cur.key, pFillInfo->pLinearInfo->nextEnd, &pFillSup->interval, pFillInfo); setFillKeyInfo(pFillSup->cur.key, pFillInfo->pLinearInfo->nextEnd, &pFillSup->interval, pFillInfo);
doStreamFillLinear(pFillSup, pFillInfo, pRes); doStreamFillLinear(pFillSup, pFillInfo, pRes);
@ -1359,15 +1354,19 @@ SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock*
pFillInfo->pLinearInfo = taosMemoryCalloc(1, sizeof(SStreamFillLinearInfo)); pFillInfo->pLinearInfo = taosMemoryCalloc(1, sizeof(SStreamFillLinearInfo));
pFillInfo->pLinearInfo->hasNext = false; pFillInfo->pLinearInfo->hasNext = false;
pFillInfo->pLinearInfo->nextEnd = INT64_MIN; pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
pFillInfo->pLinearInfo->pDeltaVal = NULL; pFillInfo->pLinearInfo->pEndPoints = NULL;
pFillInfo->pLinearInfo->pNextDeltaVal = NULL; pFillInfo->pLinearInfo->pNextEndPoints = NULL;
if (pFillSup->type == TSDB_FILL_LINEAR) { if (pFillSup->type == TSDB_FILL_LINEAR) {
pFillInfo->pLinearInfo->pDeltaVal = taosArrayInit(pFillSup->numOfAllCols, sizeof(double)); pFillInfo->pLinearInfo->pEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint));
pFillInfo->pLinearInfo->pNextDeltaVal = taosArrayInit(pFillSup->numOfAllCols, sizeof(double)); pFillInfo->pLinearInfo->pNextEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint));
for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) { for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) {
double value = 0.0; SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i);
taosArrayPush(pFillInfo->pLinearInfo->pDeltaVal, &value); SPoint value = {0};
taosArrayPush(pFillInfo->pLinearInfo->pNextDeltaVal, &value); value.val = taosMemoryCalloc(1, pColData->info.bytes);
taosArrayPush(pFillInfo->pLinearInfo->pEndPoints, &value);
value.val = taosMemoryCalloc(1, pColData->info.bytes);
taosArrayPush(pFillInfo->pLinearInfo->pNextEndPoints, &value);
} }
} }
pFillInfo->pLinearInfo->winIndex = 0; pFillInfo->pLinearInfo->winIndex = 0;

View File

@ -1803,6 +1803,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes); generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex); prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
printDataBlock(pInfo->pUpdateRes, "recover update");
return pInfo->pUpdateRes; return pInfo->pUpdateRes;
} break; } break;
case STREAM_SCAN_FROM_DATAREADER_RANGE: { case STREAM_SCAN_FROM_DATAREADER_RANGE: {
@ -1813,7 +1814,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version); updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
checkUpdateData(pInfo, true, pSDB, false); checkUpdateData(pInfo, true, pSDB, false);
// printDataBlock(pSDB, "stream scan update"); printDataBlock(pSDB, "scan recover update");
calBlockTbName(pInfo, pSDB); calBlockTbName(pInfo, pSDB);
return pSDB; return pSDB;
} }
@ -1838,6 +1839,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
} }
if (pInfo->pCreateTbRes->info.rows > 0) { if (pInfo->pCreateTbRes->info.rows > 0) {
pInfo->scanMode = STREAM_SCAN_FROM_RES; pInfo->scanMode = STREAM_SCAN_FROM_RES;
printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
return pInfo->pCreateTbRes; return pInfo->pCreateTbRes;
} }
qDebug("stream recover scan get block, rows %d", pInfo->pRecoverRes->info.rows); qDebug("stream recover scan get block, rows %d", pInfo->pRecoverRes->info.rows);

View File

@ -2110,10 +2110,12 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
} else if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) { } else if (functionNeedToExecute(&pDestCtx[k]) && pDestCtx[k].fpSet.combine != NULL) {
int32_t code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]); int32_t code = pDestCtx[k].fpSet.combine(&pDestCtx[k], &pSourceCtx[k]);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s apply functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code)); qError("%s apply combine functions error, code: %s", GET_TASKID(pTaskInfo), tstrerror(code));
pTaskInfo->code = code;
T_LONG_JMP(pTaskInfo->env, code);
} }
} else if (pDestCtx[k].fpSet.combine == NULL) {
char* funName = fmGetFuncName(pDestCtx[k].functionId);
qError("%s error, combine funcion for %s is not implemented", GET_TASKID(pTaskInfo), funName);
taosMemoryFreeClear(funName);
} }
} }
} }
@ -2769,6 +2771,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK); pInfo->pPullDataMap = taosHashInit(64, hashFn, false, HASH_NO_LOCK);
pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE); pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE);
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false;
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->delIndex = 0; pInfo->delIndex = 0;
pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey)); pInfo->pDelWins = taosArrayInit(4, sizeof(SWinKey));
@ -3587,6 +3590,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo->isFinal = false; pInfo->isFinal = false;
pInfo->pPhyNode = pPhyNode; pInfo->pPhyNode = pPhyNode;
pInfo->ignoreExpiredData = pSessionNode->window.igExpired; pInfo->ignoreExpiredData = pSessionNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false;
pInfo->pUpdated = NULL; pInfo->pUpdated = NULL;
pInfo->pStUpdated = NULL; pInfo->pStUpdated = NULL;
@ -4112,6 +4116,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT);
pInfo->pChildren = NULL; pInfo->pChildren = NULL;
pInfo->ignoreExpiredData = pStateNode->window.igExpired; pInfo->ignoreExpiredData = pStateNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false;
pInfo->pUpdated = NULL; pInfo->pUpdated = NULL;
pInfo->pSeUpdated = NULL; pInfo->pSeUpdated = NULL;
@ -4885,6 +4890,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->interval = interval; pInfo->interval = interval;
pInfo->twAggSup = twAggSupp; pInfo->twAggSup = twAggSupp;
pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired; pInfo->ignoreExpiredData = pIntervalPhyNode->window.igExpired;
pInfo->ignoreExpiredDataSaved = false;
pInfo->isFinal = false; pInfo->isFinal = false;
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;

View File

@ -235,6 +235,7 @@ int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getGroupKeyFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getGroupKeyFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t groupKeyFunction(SqlFunctionCtx* pCtx); int32_t groupKeyFunction(SqlFunctionCtx* pCtx);
int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
int32_t groupKeyCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -2480,7 +2480,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{ {
.name = "irate", .name = "irate",
.type = FUNCTION_TYPE_IRATE, .type = FUNCTION_TYPE_IRATE,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_FORBID_STREAM_FUNC,
.translateFunc = translateIrate, .translateFunc = translateIrate,
.getEnvFunc = getIrateFuncEnv, .getEnvFunc = getIrateFuncEnv,
.initFunc = irateFuncSetup, .initFunc = irateFuncSetup,
@ -3234,6 +3234,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.initFunc = functionSetup, .initFunc = functionSetup,
.processFunc = groupKeyFunction, .processFunc = groupKeyFunction,
.finalizeFunc = groupKeyFinalize, .finalizeFunc = groupKeyFinalize,
.combineFunc = groupKeyCombine,
.pPartialFunc = "_group_key", .pPartialFunc = "_group_key",
.pMergeFunc = "_group_key" .pMergeFunc = "_group_key"
}, },

View File

@ -5900,6 +5900,39 @@ int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
return pResInfo->numOfRes; return pResInfo->numOfRes;
} }
int32_t groupKeyCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx);
SGroupKeyInfo* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo);
SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx);
SGroupKeyInfo* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo);
// escape rest of data blocks to avoid first entry to be overwritten.
if (pDBuf->hasResult) {
goto _group_key_over;
}
if (pSBuf->isNull) {
pDBuf->isNull = true;
pDBuf->hasResult = true;
goto _group_key_over;
}
if (IS_VAR_DATA_TYPE(pSourceCtx->resDataInfo.type)) {
memcpy(pDBuf->data, pSBuf->data,
(pSourceCtx->resDataInfo.type == TSDB_DATA_TYPE_JSON) ? getJsonValueLen(pSBuf->data) : varDataTLen(pSBuf->data));
} else {
memcpy(pDBuf->data, pSBuf->data, pSourceCtx->resDataInfo.bytes);
}
pDBuf->hasResult = true;
_group_key_over:
SET_VAL(pDResInfo, 1, 1);
return TSDB_CODE_SUCCESS;
}
int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) { int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElems = 0; int32_t numOfElems = 0;

View File

@ -447,3 +447,10 @@ int32_t fmGetDistMethod(const SFunctionNode* pFunc, SFunctionNode** pPartialFunc
return code; return code;
} }
char* fmGetFuncName(int32_t funcId) {
if (fmIsUserDefinedFunc(funcId) || funcId < 0 || funcId >= funcMgtBuiltinsNum) {
return taosStrdup("invalid function");
}
return taosStrdup(funcMgtBuiltins[funcId].name);
}

View File

@ -183,16 +183,18 @@ static int32_t calcConstProject(SNode* pProject, bool dual, SNode** pNew) {
} else { } else {
code = scalarCalculateConstants(pProject, pNew); code = scalarCalculateConstants(pProject, pNew);
} }
if (TSDB_CODE_SUCCESS == code && QUERY_NODE_VALUE == nodeType(*pNew) && NULL != pAssociation) { if (TSDB_CODE_SUCCESS == code) {
strcpy(((SExprNode*)*pNew)->aliasName, aliasName); strcpy(((SExprNode*)*pNew)->aliasName, aliasName);
int32_t size = taosArrayGetSize(pAssociation); if (QUERY_NODE_VALUE == nodeType(*pNew) && NULL != pAssociation) {
for (int32_t i = 0; i < size; ++i) { int32_t size = taosArrayGetSize(pAssociation);
SNode** pCol = taosArrayGetP(pAssociation, i); for (int32_t i = 0; i < size; ++i) {
nodesDestroyNode(*pCol); SNode** pCol = taosArrayGetP(pAssociation, i);
*pCol = nodesCloneNode(*pNew); nodesDestroyNode(*pCol);
if (NULL == *pCol) { *pCol = nodesCloneNode(*pNew);
code = TSDB_CODE_OUT_OF_MEMORY; if (NULL == *pCol) {
break; code = TSDB_CODE_OUT_OF_MEMORY;
break;
}
} }
} }
} }
@ -383,7 +385,8 @@ static int32_t calcConstSetOpProjections(SCalcConstContext* pCxt, SSetOperator*
int32_t index = 0; int32_t index = 0;
SNode* pProj = NULL; SNode* pProj = NULL;
WHERE_EACH(pProj, pSetOp->pProjectionList) { WHERE_EACH(pProj, pSetOp->pProjectionList) {
if (subquery && notRefByOrderBy((SColumnNode*)pProj, pSetOp->pOrderByList) && isSetUselessCol(pSetOp, index, (SExprNode*)pProj)) { if (subquery && notRefByOrderBy((SColumnNode*)pProj, pSetOp->pOrderByList) &&
isSetUselessCol(pSetOp, index, (SExprNode*)pProj)) {
ERASE_NODE(pSetOp->pProjectionList); ERASE_NODE(pSetOp->pProjectionList);
eraseSetOpChildProjection(pSetOp, index); eraseSetOpChildProjection(pSetOp, index);
continue; continue;

View File

@ -566,53 +566,19 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
return code; return code;
} }
static int bindFileds(SBoundColInfo* pBoundInfo, SSchema* pSchema, TAOS_FIELD* fields, int numFields) { static bool findFileds(SSchema* pSchema, TAOS_FIELD* fields, int numFields) {
bool* pUseCols = taosMemoryCalloc(pBoundInfo->numOfCols, sizeof(bool));
if (NULL == pUseCols) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pBoundInfo->numOfBound = 0;
int16_t lastColIdx = -1; // last column found
int32_t code = TSDB_CODE_SUCCESS;
for (int i = 0; i < numFields; i++) { for (int i = 0; i < numFields; i++) {
SToken token; if(strcmp(pSchema->name, fields[i].name) == 0){
token.z = fields[i].name; return true;
token.n = strlen(fields[i].name);
int16_t t = lastColIdx + 1;
int16_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pSchema);
if (index < 0 && t > 0) {
index = insFindCol(&token, 0, t, pSchema);
}
if (index < 0) {
uError("can not find column name:%s", token.z);
code = TSDB_CODE_PAR_INVALID_COLUMN;
break;
} else if (pUseCols[index]) {
code = TSDB_CODE_PAR_INVALID_COLUMN;
uError("duplicated column name:%s", token.z);
break;
} else {
lastColIdx = index;
pUseCols[index] = true;
pBoundInfo->pColIndex[pBoundInfo->numOfBound] = index;
++pBoundInfo->numOfBound;
} }
} }
if (TSDB_CODE_SUCCESS == code && !pUseCols[0]) { return false;
uError("primary timestamp column can not be null:");
code = TSDB_CODE_PAR_INVALID_COLUMN;
}
taosMemoryFree(pUseCols);
return code;
} }
int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* tFields, int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* tFields,
int numFields, bool needChangeLength) { int numFields, bool needChangeLength) {
void* tmp = taosHashGet(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid));
STableDataCxt* pTableCxt = NULL; STableDataCxt* pTableCxt = NULL;
int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid,
sizeof(pTableMeta->uid), pTableMeta, &pCreateTb, &pTableCxt, true); sizeof(pTableMeta->uid), pTableMeta, &pCreateTb, &pTableCxt, true);
@ -620,19 +586,14 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
uError("insGetTableDataCxt error"); uError("insGetTableDataCxt error");
goto end; goto end;
} }
if (tFields != NULL) {
ret = bindFileds(&pTableCxt->boundColsInfo, getTableColumnSchema(pTableMeta), tFields, numFields); if(tmp == NULL){
ret = initTableColSubmitData(pTableCxt);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
uError("bindFileds error"); uError("initTableColSubmitData error");
goto end; goto end;
} }
} }
// no need to bind, because select * get all fields
ret = initTableColSubmitData(pTableCxt);
if (ret != TSDB_CODE_SUCCESS) {
uError("initTableColSubmitData error");
goto end;
}
char* p = (char*)data; char* p = (char*)data;
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column // | version | total length | total rows | total columns | flag seg| block group id | column schema | each column
@ -660,35 +621,43 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
SSchema* pSchema = getTableColumnSchema(pTableCxt->pMeta); SSchema* pSchema = getTableColumnSchema(pTableCxt->pMeta);
SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo; SBoundColInfo* boundInfo = &pTableCxt->boundColsInfo;
if (boundInfo->numOfBound != numOfCols) { if (tFields != NULL && numFields != numOfCols) {
uError("boundInfo->numOfBound:%d != numOfCols:%d", boundInfo->numOfBound, numOfCols); uError("numFields:%d != numOfCols:%d", numFields, numOfCols);
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
if (tFields != NULL && numFields > boundInfo->numOfBound) {
uError("numFields:%d > boundInfo->numOfBound:%d", numFields, boundInfo->numOfBound);
ret = TSDB_CODE_INVALID_PARA; ret = TSDB_CODE_INVALID_PARA;
goto end; goto end;
} }
for (int c = 0; c < boundInfo->numOfBound; ++c) { for (int c = 0; c < boundInfo->numOfBound; ++c) {
SSchema* pColSchema = &pSchema[boundInfo->pColIndex[c]]; SSchema* pColSchema = &pSchema[c];
SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c); SColData* pCol = taosArrayGet(pTableCxt->pData->aCol, c);
if(tFields == NULL || findFileds(pColSchema, tFields, numFields)){
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) {
uError("type or bytes not equal");
ret = TSDB_CODE_INVALID_PARA;
goto end;
}
if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) { int8_t* offset = pStart;
uError("type or bytes not equal"); if (IS_VAR_DATA_TYPE(pColSchema->type)) {
ret = TSDB_CODE_INVALID_PARA; pStart += numOfRows * sizeof(int32_t);
goto end; } else {
} pStart += BitmapLen(numOfRows);
}
char* pData = pStart;
int8_t* offset = pStart; tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData);
if (IS_VAR_DATA_TYPE(pColSchema->type)) { fields += sizeof(int8_t) + sizeof(int32_t);
pStart += numOfRows * sizeof(int32_t); if (needChangeLength) {
} else { pStart += htonl(colLength[c]);
pStart += BitmapLen(numOfRows); } else {
} pStart += colLength[c];
char* pData = pStart; }
}else{
tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, NULL, NULL);
fields += sizeof(int8_t) + sizeof(int32_t);
if (needChangeLength) {
pStart += htonl(colLength[c]);
} else {
pStart += colLength[c];
} }
} }

View File

@ -1342,8 +1342,8 @@ static bool isCountNotNullValue(SFunctionNode* pFunc) {
// count(1) is rewritten as count(ts) for scannning optimization // count(1) is rewritten as count(ts) for scannning optimization
static int32_t rewriteCountNotNullValue(STranslateContext* pCxt, SFunctionNode* pCount) { static int32_t rewriteCountNotNullValue(STranslateContext* pCxt, SFunctionNode* pCount) {
SValueNode* pValue = (SValueNode*)nodesListGetNode(pCount->pParameterList, 0); SValueNode* pValue = (SValueNode*)nodesListGetNode(pCount->pParameterList, 0);
STableNode* pTable = NULL; STableNode* pTable = NULL;
int32_t code = findTable(pCxt, NULL, &pTable); int32_t code = findTable(pCxt, NULL, &pTable);
if (TSDB_CODE_SUCCESS == code && QUERY_NODE_REAL_TABLE == nodeType(pTable)) { if (TSDB_CODE_SUCCESS == code && QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) { if (NULL == pCol) {
@ -1424,7 +1424,7 @@ static int32_t translateAggFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
} }
if (isCountNotNullValue(pFunc)) { if (isCountNotNullValue(pFunc)) {
return rewriteCountNotNullValue(pCxt, pFunc); return rewriteCountNotNullValue(pCxt, pFunc);
} }
if (isCountTbname(pFunc)) { if (isCountTbname(pFunc)) {
return rewriteCountTbname(pCxt, pFunc); return rewriteCountTbname(pCxt, pFunc);
} }
@ -5923,11 +5923,15 @@ static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, STabl
return code; return code;
} }
static bool isEventWindowQuery(SSelectStmt* pSelect) {
return NULL != pSelect->pWindow && QUERY_NODE_EVENT_WINDOW == nodeType(pSelect->pWindow);
}
static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type || if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
!pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList || !pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList ||
crossTableWithUdaf(pSelect)) { crossTableWithUdaf(pSelect) || isEventWindowQuery(pSelect)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
} }
if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) { if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) {

View File

@ -444,7 +444,7 @@ static int32_t getInsTagsTableTargetNameFromOp(int32_t acctId, SOperatorNode* pO
} else if (QUERY_NODE_VALUE == nodeType(pOper->pRight)) { } else if (QUERY_NODE_VALUE == nodeType(pOper->pRight)) {
pVal = (SValueNode*)pOper->pRight; pVal = (SValueNode*)pOper->pRight;
} }
if (NULL == pCol || NULL == pVal) { if (NULL == pCol || NULL == pVal || NULL == pVal->literal || 0 == strcmp(pVal->literal, "")) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -152,8 +152,14 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
if (batchCnt >= batchSz) break; if (batchCnt >= batchSz) break;
} }
if (taosArrayGetSize(pRes) == 0) { if (taosArrayGetSize(pRes) == 0) {
taosArrayDestroy(pRes); if (finished) {
break; taosArrayDestroy(pRes);
qDebug("task %d finish recover exec task ", pTask->taskId);
break;
} else {
qDebug("task %d continue recover exec task ", pTask->taskId);
continue;
}
} }
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
if (qRes == NULL) { if (qRes == NULL) {

View File

@ -486,3 +486,21 @@ void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* par
taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param); taosArrayGetSize(pArray) > 8 ? taosArrayQuickSort(pArray, fn, param) : taosArrayInsertSort(pArray, fn, param);
} }
void taosArraySwap(SArray* a, SArray* b) {
if (a == NULL || b == NULL) return;
size_t t = a->size;
a->size = b->size;
b->size = t;
uint32_t cap = a->capacity;
a->capacity = b->capacity;
b->capacity = cap;
uint32_t elem = a->elemSize;
a->elemSize = b->elemSize;
b->elemSize = elem;
void* data = a->pData;
a->pData = b->pData;
b->pData = data;
}

View File

@ -637,6 +637,8 @@ int32_t cfgLoadFromEnvVar(SConfig *pConfig) {
int32_t code = 0; int32_t code = 0;
char **pEnv = environ; char **pEnv = environ;
line[1023] = 0; line[1023] = 0;
if (pEnv == NULL) return 0;
while (*pEnv != NULL) { while (*pEnv != NULL) {
name = value = value2 = value3 = NULL; name = value = value2 = value3 = NULL;
olen = vlen = vlen2 = vlen3 = 0; olen = vlen = vlen2 = vlen3 = 0;

View File

@ -1,6 +1,7 @@
char version[64] = "${TD_VER_NUMBER}"; char version[64] = "${TD_VER_NUMBER}";
char compatible_version[12] = "${TD_VER_COMPATIBLE}"; char compatible_version[12] = "${TD_VER_COMPATIBLE}";
char gitinfo[48] = "${TD_VER_GIT}"; char gitinfo[48] = "${TD_VER_GIT}";
char gitinfoOfInternal[48] = "${TD_VER_GIT_INTERNAL}";
char buildinfo[64] = "Built at ${TD_VER_DATE}"; char buildinfo[64] = "Built at ${TD_VER_DATE}";
void libtaos_${TD_LIB_VER_NUMBER}_${TD_VER_OSTYPE}_${TD_VER_CPUTYPE}_${TD_VER_VERTYPE}() {}; void libtaos_${TD_LIB_VER_NUMBER}_${TD_VER_OSTYPE}_${TD_VER_CPUTYPE}_${TD_VER_VERTYPE}() {};

View File

@ -93,6 +93,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 6 -M 3
@ -871,6 +872,7 @@
,,y,script,./test.sh -f tsim/query/emptyTsRange.sim ,,y,script,./test.sh -f tsim/query/emptyTsRange.sim
,,y,script,./test.sh -f tsim/query/partitionby.sim ,,y,script,./test.sh -f tsim/query/partitionby.sim
,,y,script,./test.sh -f tsim/query/tableCount.sim ,,y,script,./test.sh -f tsim/query/tableCount.sim
,,y,script,./test.sh -f tsim/query/nullColSma.sim
,,y,script,./test.sh -f tsim/qnode/basic1.sim ,,y,script,./test.sh -f tsim/qnode/basic1.sim
,,y,script,./test.sh -f tsim/snode/basic1.sim ,,y,script,./test.sh -f tsim/snode/basic1.sim
,,y,script,./test.sh -f tsim/mnode/basic1.sim ,,y,script,./test.sh -f tsim/mnode/basic1.sim

View File

@ -167,8 +167,10 @@ function run_thread() {
local case_build_san=`echo "$line"|cut -d, -f3` local case_build_san=`echo "$line"|cut -d, -f3`
if [ "${case_build_san}" == "y" ]; then if [ "${case_build_san}" == "y" ]; then
case_build_san="y" case_build_san="y"
DEBUGPATH="debugSan"
elif [[ "${case_build_san}" == "n" ]] || [[ "${case_build_san}" == "" ]]; then elif [[ "${case_build_san}" == "n" ]] || [[ "${case_build_san}" == "" ]]; then
case_build_san="n" case_build_san="n"
DEBUGPATH="debugNoSan"
else else
usage usage
exit 1 exit 1
@ -301,10 +303,10 @@ function run_thread() {
if [ ! -z "$corefile" ]; then if [ ! -z "$corefile" ]; then
echo -e "\e[34m corefiles: $corefile \e[0m" echo -e "\e[34m corefiles: $corefile \e[0m"
local build_dir=$log_dir/build_${hosts[index]} local build_dir=$log_dir/build_${hosts[index]}
local remote_build_dir="${workdirs[index]}/TDengine/debug/build" local remote_build_dir="${workdirs[index]}/{DEBUGPATH}/build"
if [ $ent -ne 0 ]; then # if [ $ent -ne 0 ]; then
remote_build_dir="${workdirs[index]}/TDinternal/debug/build" # remote_build_dir="${workdirs[index]}/{DEBUGPATH}/build"
fi # fi
mkdir $build_dir 2>/dev/null mkdir $build_dir 2>/dev/null
if [ $? -eq 0 ]; then if [ $? -eq 0 ]; then
# scp build binary # scp build binary

View File

@ -0,0 +1,139 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
$dbPrefix = m_in_db
$tbPrefix = m_in_tb
$mtPrefix = m_in_mt
$tbNum = 1
$rowNum = 200
$totalNum = 400
print =============== step1
$i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
sql drop database if exists $db
sql create database $db vgroups 1 maxrows 200 minrows 10;
sql use $db
sql create table $mt (ts timestamp, f1 int, f2 float) TAGS(tgcol int)
print ====== start create child tables and insert data
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using $mt tags( $i )
$x = 0
while $x < $rowNum
$cc = $x * 1
$ms = 1601481600000 + $cc
sql insert into $tb values ($ms , NULL , $x )
$x = $x + 1
endw
$i = $i + 1
endw
$i = 1
$tb = $tbPrefix . $i
sql create table $tb using $mt tags( $i )
$x = 0
while $x < $rowNum
$cc = $x * 1
$ms = 1601481600000 + $cc
sql insert into $tb values ($ms , $x , NULL )
$x = $x + 1
endw
sql flush database $db
print =============== step2
$i = 0
$tb = $tbPrefix . $i
sql select max(f1) from $tb
if $rows != 1 then
return -1
endi
if $data00 != NULL then
return -1
endi
$i = 1
$tb = $tbPrefix . $i
sql select max(f2) from $tb
if $rows != 1 then
return -1
endi
if $data00 != NULL then
return -1
endi
$rowNum = 10
print ====== insert more data
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
$x = 0
while $x < $rowNum
$cc = $x * 1
$ms = 1601481700000 + $cc
sql insert into $tb values ($ms , $x , $x )
$x = $x + 1
endw
$i = $i + 1
endw
$i = 1
$tb = $tbPrefix . $i
$x = 0
while $x < $rowNum
$cc = $x * 1
$ms = 1601481700000 + $cc
sql insert into $tb values ($ms , $x , $x )
$x = $x + 1
endw
sql flush database $db
print =============== step3
$i = 0
$tb = $tbPrefix . $i
sql select max(f1) from $tb
if $rows != 1 then
return -1
endi
if $data00 != 9 then
return -1
endi
$i = 1
$tb = $tbPrefix . $i
sql select max(f2) from $tb
if $rows != 1 then
return -1
endi
if $data00 != 9.00000 then
print $data00
return -1
endi
print =============== clear
#sql drop database $db
#sql select * from information_schema.ins_databases
#if $rows != 0 then
# return -1
#endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -51,6 +51,11 @@ if $data00 != @ins_stables@ then
return -1 return -1
endi endi
sql select * from information_schema.ins_tables where table_name='';
if $rows != 0 then
return -1
endi
sql select tbname from information_schema.ins_tables; sql select tbname from information_schema.ins_tables;
print $rows $data00 print $rows $data00
if $rows != 33 then if $rows != 33 then

View File

@ -272,4 +272,122 @@ if $data12 != 2 then
goto loop3 goto loop3
endi endi
print ===== step3
sql drop database if exists test4;
sql create database test4 vgroups 10;
sql use test4;
sql create stable st(ts timestamp,a int,b int,c varchar(250) ) tags(ta int,tb int,tc int);
sql create table aaa using st tags(1,1,1);
sql create table bbb using st tags(2,2,2);
sql create table ccc using st tags(3,2,2);
sql create table ddd using st tags(4,2,2);
sql create stream streams1 ignore expired 0 fill_history 0 watermark 3s into streamst subtable(c) as select _wstart, c , count(*) c1 from st partition by c interval(1s) ;
sql insert into aaa values(1648791221001,2,2,"/a1/aa/aa");
sql insert into bbb values(1648791221001,2,2,"/a1/aa/aa");
sql insert into ccc values(1648791221001,2,2,"/a1/aa/aa");
sql insert into ddd values(1648791221001,2,2,"/a1/aa/aa");
sql insert into aaa values(1648791222002,2,2,"/a2/aa/aa");
sql insert into bbb values(1648791222002,2,2,"/a2/aa/aa");
sql insert into ccc values(1648791222002,2,2,"/a2/aa/aa");
sql insert into ddd values(1648791222002,2,2,"/a2/aa/aa");
sql insert into aaa values(1648791223003,2,2,"/a3/aa/aa");
sql insert into bbb values(1648791223003,2,2,"/a3/aa/aa");
sql insert into ccc values(1648791223003,2,2,"/a3/aa/aa");
sql insert into ddd values(1648791223003,2,2,"/a3/aa/aa");
sql insert into aaa values(1648791224003,2,2,"/a4/aa/aa");
sql insert into bbb values(1648791224003,2,2,"/a4/aa/aa");
sql insert into ccc values(1648791224003,2,2,"/a4/aa/aa");
sql insert into ddd values(1648791224003,2,2,"/a4/aa/aa");
sql insert into aaa values(1648791225003,2,2,"/a5/aa/aa");
sql insert into bbb values(1648791225003,2,2,"/a5/aa/aa");
sql insert into ccc values(1648791225003,2,2,"/a5/aa/aa");
sql insert into ddd values(1648791225003,2,2,"/a5/aa/aa");
sql insert into aaa values(1648791226003,2,2,"/a6/aa/aa");
sql insert into bbb values(1648791226003,2,2,"/a6/aa/aa");
sql insert into ccc values(1648791226003,2,2,"/a6/aa/aa");
sql insert into ddd values(1648791226003,2,2,"/a6/aa/aa");
$loop_count = 0
loop4:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sql select * from streamst;
if $rows == 0 then
goto loop4
endi
sql delete from aaa where ts = 1648791223003 ;
$loop_count = 0
loop5:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sql select * from streamst;
if $rows == 0 then
goto loop5
endi
sql delete from ccc;
$loop_count = 0
loop6:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sql select * from streamst;
if $rows == 0 then
goto loop6
endi
sql delete from ddd;
$loop_count = 0
loop7:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sql select * from streamst;
if $rows == 0 then
goto loop7
endi
print ===== over
system sh/stop_dnodes.sh system sh/stop_dnodes.sh

View File

@ -0,0 +1,54 @@
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def checkData(self):
tdSql.execute('use db_raw')
tdSql.query("select * from d1")
tdSql.checkRows(1)
tdSql.checkData(0, 1, 120)
tdSql.query("select * from d2")
tdSql.checkRows(1)
tdSql.checkData(0, 1, None)
return
def check(self):
buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/write_raw_block_test'%(buildPath)
tdLog.info(cmdStr)
os.system(cmdStr)
self.checkData()
return
def run(self):
tdSql.prepare()
self.check()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -100,7 +100,7 @@ class TDTestCase:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows < expectrowcnt)): if not ((totalConsumeRows >= expectrowcnt * 3/4) and (totalConsumeRows <= expectrowcnt)):
tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.exit("tmq consume rows error with snapshot = 0!")
tdLog.info("wait subscriptions exit ....") tdLog.info("wait subscriptions exit ....")
@ -131,7 +131,7 @@ class TDTestCase:
'batchNum': 100, 'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'endTs': 0, 'endTs': 0,
'pollDelay': 10, 'pollDelay': 20,
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 0} 'snapshot': 0}
@ -193,7 +193,7 @@ class TDTestCase:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows < expectrowcnt)): if not ((totalConsumeRows >= expectrowcnt / 2 * (1 + 3/4)) and (totalConsumeRows <= expectrowcnt)):
tdLog.exit("tmq consume rows error with snapshot = 0!") tdLog.exit("tmq consume rows error with snapshot = 0!")
tdLog.info("wait subscriptions exit ....") tdLog.info("wait subscriptions exit ....")

View File

@ -18,17 +18,18 @@
#endif #endif
#include "shellInt.h" #include "shellInt.h"
#include "version.h"
#ifndef CUS_NAME #ifndef CUS_NAME
char cusName[] = "TDengine"; char cusName[] = "TDengine";
#endif #endif
#ifndef CUS_PROMPT #ifndef CUS_PROMPT
char cusPrompt[] = "taos"; char cusPrompt[] = "taos";
#endif #endif
#ifndef CUS_EMAIL #ifndef CUS_EMAIL
char cusEmail[] = "<support@taosdata.com>"; char cusEmail[] = "<support@taosdata.com>";
#endif #endif
#if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL) #if defined(CUS_NAME) || defined(CUS_PROMPT) || defined(CUS_EMAIL)
@ -58,9 +59,9 @@
#define SHELL_VERSION "Print program version." #define SHELL_VERSION "Print program version."
#ifdef WEBSOCKET #ifdef WEBSOCKET
#define SHELL_DSN "The dsn to use when connecting to cloud server." #define SHELL_DSN "The dsn to use when connecting to cloud server."
#define SHELL_REST "Use restful mode when connecting." #define SHELL_REST "Use restful mode when connecting."
#define SHELL_TIMEOUT "Set the timeout for websocket query in seconds, default is 30." #define SHELL_TIMEOUT "Set the timeout for websocket query in seconds, default is 30."
#endif #endif
static int32_t shellParseSingleOpt(int32_t key, char *arg); static int32_t shellParseSingleOpt(int32_t key, char *arg);
@ -145,7 +146,7 @@ static void shellParseArgsUseArgp(int argc, char *argv[]) {
#endif #endif
#ifndef ARGP_ERR_UNKNOWN #ifndef ARGP_ERR_UNKNOWN
#define ARGP_ERR_UNKNOWN E2BIG #define ARGP_ERR_UNKNOWN E2BIG
#endif #endif
static int32_t shellParseSingleOpt(int32_t key, char *arg) { static int32_t shellParseSingleOpt(int32_t key, char *arg) {
@ -246,8 +247,8 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) {
SShellArgs *pArgs = &shell.args; SShellArgs *pArgs = &shell.args;
for (int i = 1; i < argc; i++) { for (int i = 1; i < argc; i++) {
if (strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "--usage") == 0 if (strcmp(argv[i], "--help") == 0 || strcmp(argv[i], "--usage") == 0 || strcmp(argv[i], "-?") == 0 ||
|| strcmp(argv[i], "-?") == 0 || strcmp(argv[i], "/?") == 0) { strcmp(argv[i], "/?") == 0) {
shellParseSingleOpt('?', NULL); shellParseSingleOpt('?', NULL);
return 0; return 0;
} }
@ -263,10 +264,8 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) {
return -1; return -1;
} }
if (key[1] == 'h' || key[1] == 'P' || key[1] == 'u' if (key[1] == 'h' || key[1] == 'P' || key[1] == 'u' || key[1] == 'a' || key[1] == 'c' || key[1] == 's' ||
|| key[1] == 'a' || key[1] == 'c' || key[1] == 's' key[1] == 'f' || key[1] == 'd' || key[1] == 'w' || key[1] == 'n' || key[1] == 'l' || key[1] == 'N'
|| key[1] == 'f' || key[1] == 'd' || key[1] == 'w'
|| key[1] == 'n' || key[1] == 'l' || key[1] == 'N'
#ifdef WEBSOCKET #ifdef WEBSOCKET
|| key[1] == 'E' || key[1] == 'T' || key[1] == 'E' || key[1] == 'T'
#endif #endif
@ -282,12 +281,10 @@ int32_t shellParseArgsWithoutArgp(int argc, char *argv[]) {
} }
shellParseSingleOpt(key[1], val); shellParseSingleOpt(key[1], val);
i++; i++;
} else if (key[1] == 'p' || key[1] == 'A' || key[1] == 'C' } else if (key[1] == 'p' || key[1] == 'A' || key[1] == 'C' || key[1] == 'r' || key[1] == 'k' || key[1] == 't' ||
|| key[1] == 'r' || key[1] == 'k' key[1] == 'V' || key[1] == '?' || key[1] == 1
|| key[1] == 't' || key[1] == 'V'
|| key[1] == '?' || key[1] == 1
#ifdef WEBSOCKET #ifdef WEBSOCKET
||key[1] == 'R' || key[1] == 'R'
#endif #endif
) { ) {
shellParseSingleOpt(key[1], NULL); shellParseSingleOpt(key[1], NULL);
@ -420,9 +417,15 @@ int32_t shellParseArgs(int32_t argc, char *argv[]) {
sprintf(promptContinueFormat, "%%%zus> ", strlen(cusPrompt)); sprintf(promptContinueFormat, "%%%zus> ", strlen(cusPrompt));
sprintf(shell.info.promptContinue, promptContinueFormat, " "); sprintf(shell.info.promptContinue, promptContinueFormat, " ");
shell.info.promptSize = strlen(shell.info.promptHeader); shell.info.promptSize = strlen(shell.info.promptHeader);
#ifdef TD_ENTERPRISE
snprintf(shell.info.programVersion, sizeof(shell.info.programVersion),
"version: %s compatible_version: %s\ngitinfo: %s\ngitinfoOfInternal: %s\nbuildInfo: %s", version,
compatible_version, gitinfo, gitinfoOfInternal, buildinfo);
#else
snprintf(shell.info.programVersion, sizeof(shell.info.programVersion), snprintf(shell.info.programVersion, sizeof(shell.info.programVersion),
"version: %s compatible_version: %s\ngitinfo: %s\nbuildInfo: %s", version, compatible_version, gitinfo, "version: %s compatible_version: %s\ngitinfo: %s\nbuildInfo: %s", version, compatible_version, gitinfo,
buildinfo); buildinfo);
#endif
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
shell.info.osname = "Windows"; shell.info.osname = "Windows";

View File

@ -541,7 +541,7 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
printf("%*" PRIu64, width, *((uint64_t *)val)); printf("%*" PRIu64, width, *((uint64_t *)val));
break; break;
case TSDB_DATA_TYPE_FLOAT: case TSDB_DATA_TYPE_FLOAT:
printf("%*.5f", width, GET_FLOAT_VAL(val)); printf("%*ef", width, GET_FLOAT_VAL(val));
break; break;
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", width, GET_DOUBLE_VAL(val)); n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.9f", width, GET_DOUBLE_VAL(val));

View File

@ -3,6 +3,7 @@ add_dependencies(tmq_demo taos)
add_executable(tmq_sim tmqSim.c) add_executable(tmq_sim tmqSim.c)
add_executable(create_table createTable.c) add_executable(create_table createTable.c)
add_executable(tmq_taosx_ci tmq_taosx_ci.c) add_executable(tmq_taosx_ci tmq_taosx_ci.c)
add_executable(write_raw_block_test write_raw_block_test.c)
add_executable(sml_test sml_test.c) add_executable(sml_test sml_test.c)
add_executable(get_db_name_test get_db_name_test.c) add_executable(get_db_name_test get_db_name_test.c)
target_link_libraries( target_link_libraries(
@ -34,6 +35,14 @@ target_link_libraries(
PUBLIC os PUBLIC os
) )
target_link_libraries(
write_raw_block_test
PUBLIC taos_static
PUBLIC util
PUBLIC common
PUBLIC os
)
target_link_libraries( target_link_libraries(
sml_test sml_test
PUBLIC taos_static PUBLIC taos_static

View File

@ -0,0 +1,128 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <time.h>
#include "taos.h"
#include "types.h"
int buildStable(TAOS* pConn, TAOS_RES* pRes) {
pRes = taos_query(pConn,
"CREATE STABLE `meters` (`ts` TIMESTAMP, `current` INT, `voltage` INT, `phase` FLOAT) TAGS "
"(`groupid` INT, `location` VARCHAR(16))");
if (taos_errno(pRes) != 0) {
printf("failed to create super table meters, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table d0 using meters tags(1, 'San Francisco')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table d0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "insert into d0 (ts, current) values (now, 120)");
if (taos_errno(pRes) != 0) {
printf("failed to insert into table d0, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table d1 using meters tags(2, 'San Francisco')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table d1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table d2 using meters tags(3, 'San Francisco')");
if (taos_errno(pRes) != 0) {
printf("failed to create child table d1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
return 0;
}
int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
if (pConn == NULL) {
return -1;
}
TAOS_RES* pRes = taos_query(pConn, "drop database if exists db_raw");
if (taos_errno(pRes) != 0) {
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists db_raw vgroups 2");
if (taos_errno(pRes) != 0) {
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "use db_raw");
if (taos_errno(pRes) != 0) {
printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
buildStable(pConn, pRes);
pRes = taos_query(pConn, "select * from d0");
if (taos_errno(pRes) != 0) {
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
void *data = NULL;
int32_t numOfRows = 0;
int error_code = taos_fetch_raw_block(pRes, &numOfRows, &data);
ASSERT(error_code == 0);
ASSERT(numOfRows == 1);
taos_write_raw_block(pConn, numOfRows, data, "d1");
taos_free_result(pRes);
pRes = taos_query(pConn, "select ts,phase from d0");
if (taos_errno(pRes) != 0) {
printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes));
return -1;
}
error_code = taos_fetch_raw_block(pRes, &numOfRows, &data);
ASSERT(error_code == 0);
ASSERT(numOfRows == 1);
int numFields = taos_num_fields(pRes);
TAOS_FIELD *fields = taos_fetch_fields(pRes);
taos_write_raw_block_with_fields(pConn, numOfRows, data, "d2", fields, numFields);
taos_free_result(pRes);
taos_close(pConn);
return 0;
}
int main(int argc, char* argv[]) {
if (init_env() < 0) {
return -1;
}
}