Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/submit_req
This commit is contained in:
commit
5388da8871
|
@ -2,7 +2,7 @@
|
|||
# taos-tools
|
||||
ExternalProject_Add(taos-tools
|
||||
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
|
||||
GIT_TAG 5445810
|
||||
GIT_TAG d5df76d
|
||||
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
|
||||
BINARY_DIR ""
|
||||
#BUILD_IN_SOURCE TRUE
|
||||
|
|
|
@ -129,6 +129,7 @@ extern char tsUdfdLdLibPath[];
|
|||
extern char tsSmlChildTableName[];
|
||||
extern char tsSmlTagName[];
|
||||
extern bool tsSmlDataFormat;
|
||||
extern int32_t tsSmlBatchSize;
|
||||
|
||||
// wal
|
||||
extern int64_t tsWalFsyncDataSizeLimit;
|
||||
|
|
|
@ -160,13 +160,6 @@ int32_t qAsyncKillTask(qTaskInfo_t tinfo);
|
|||
*/
|
||||
void qDestroyTask(qTaskInfo_t tinfo);
|
||||
|
||||
/**
|
||||
* Get the queried table uid
|
||||
* @param qHandle
|
||||
* @return
|
||||
*/
|
||||
int64_t qGetQueriedTableUid(qTaskInfo_t tinfo);
|
||||
|
||||
/**
|
||||
* Extract the qualified table id list, and than pass them to the TSDB driver to load the required table data blocks.
|
||||
*
|
||||
|
|
|
@ -79,7 +79,6 @@
|
|||
#define NCHAR_ADD_LEN 3 // L"nchar" 3 means L" "
|
||||
|
||||
#define MAX_RETRY_TIMES 5
|
||||
#define LINE_BATCH 2000
|
||||
//=================================================================================================
|
||||
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;
|
||||
|
||||
|
@ -467,6 +466,13 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
|||
goto end;
|
||||
}
|
||||
info->cost.numOfCreateSTables++;
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
|
||||
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
|
||||
goto end;
|
||||
}
|
||||
} else if (code == TSDB_CODE_SUCCESS) {
|
||||
hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
|
||||
HASH_NO_LOCK);
|
||||
|
@ -505,16 +511,16 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
|||
uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
taosHashClear(hashTmp);
|
||||
|
@ -552,12 +558,18 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
|||
uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
|
||||
goto end;
|
||||
}
|
||||
|
||||
code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
needCheckMeta = true;
|
||||
taosHashCleanup(hashTmp);
|
||||
hashTmp = NULL;
|
||||
|
@ -565,13 +577,6 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
|||
uError("SML:0x%" PRIx64 " load table meta error: %s", info->id, tstrerror(code));
|
||||
goto end;
|
||||
}
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
|
||||
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (needCheckMeta) {
|
||||
code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags,
|
||||
|
@ -596,7 +601,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
|||
end:
|
||||
taosHashCleanup(hashTmp);
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
|
||||
// catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -815,6 +820,11 @@ static int8_t smlGetTsTypeByPrecision(int8_t precision) {
|
|||
}
|
||||
|
||||
static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t len) {
|
||||
void *tmp = taosMemoryCalloc(1, len + 1);
|
||||
memcpy(tmp, data, len);
|
||||
uDebug("SML:0x%" PRIx64 " smlParseInfluxTime tslen:%d, ts:%s", info->id, len, (char*)tmp);
|
||||
taosMemoryFree(tmp);
|
||||
|
||||
if (len == 0 || (len == 1 && data[0] == '0')) {
|
||||
return taosGetTimestampNs();
|
||||
}
|
||||
|
@ -2066,7 +2076,10 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *
|
|||
|
||||
static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql, const int len) {
|
||||
SSmlLineInfo elements = {0};
|
||||
uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql:%s", info->id, (info->isRawLine ? "rawdata" : sql));
|
||||
void *tmp = taosMemoryCalloc(1, len + 1);
|
||||
memcpy(tmp, sql, len);
|
||||
uDebug("SML:0x%" PRIx64 " smlParseInfluxLine raw:%d, len:%d, sql:%s", info->id, info->isRawLine, len, (info->isRawLine ? (char*)tmp : sql));
|
||||
taosMemoryFree(tmp);
|
||||
|
||||
int ret = smlParseInfluxString(sql, sql + len, &elements, &info->msgBuf);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2562,7 +2575,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char
|
|||
goto end;
|
||||
}
|
||||
|
||||
batchs = ceil(((double)numLines) / LINE_BATCH);
|
||||
batchs = ceil(((double)numLines) / tsSmlBatchSize);
|
||||
params.total = batchs;
|
||||
for (int i = 0; i < batchs; ++i) {
|
||||
SRequestObj *req = (SRequestObj *)createRequest(pTscObj->id, TSDB_SQL_INSERT, 0);
|
||||
|
@ -2581,7 +2594,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char
|
|||
info->isRawLine = (rawLine == NULL);
|
||||
info->ttl = ttl;
|
||||
|
||||
int32_t perBatch = LINE_BATCH;
|
||||
int32_t perBatch = tsSmlBatchSize;
|
||||
|
||||
if (numLines > perBatch) {
|
||||
numLines -= perBatch;
|
||||
|
|
|
@ -75,6 +75,7 @@ char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table
|
|||
// If set to empty system will generate table name using MD5 hash.
|
||||
// true means that the name and order of cols in each line are the same(only for influx protocol)
|
||||
bool tsSmlDataFormat = false;
|
||||
int32_t tsSmlBatchSize = 10000;
|
||||
|
||||
// query
|
||||
int32_t tsQueryPolicy = 1;
|
||||
|
@ -306,6 +307,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
|||
if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "smlBatchSize", tsSmlBatchSize, 1, INT32_MAX, true) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "maxMemUsedByInsert", tsMaxMemUsedByInsert, 1, INT32_MAX, true) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "rpcRetryLimit", tsRpcRetryLimit, 1, 100000, 0) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "rpcRetryInterval", tsRpcRetryInterval, 1, 100000, 0) != 0) return -1;
|
||||
|
@ -648,6 +650,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
|||
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
|
||||
tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval;
|
||||
|
||||
tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32;
|
||||
tsMaxMemUsedByInsert = cfgGetItem(pCfg, "maxMemUsedByInsert")->i32;
|
||||
|
||||
tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32;
|
||||
|
@ -1021,6 +1024,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
|
|||
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
|
||||
} else if (strcasecmp("smlDataFormat", name) == 0) {
|
||||
tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval;
|
||||
} else if (strcasecmp("smlBatchSize", name) == 0) {
|
||||
tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32;
|
||||
} else if (strcasecmp("shellActivityTimer", name) == 0) {
|
||||
tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32;
|
||||
} else if (strcasecmp("supportVnodes", name) == 0) {
|
||||
|
|
|
@ -161,4 +161,6 @@ int32_t convertFillType(int32_t mode);
|
|||
int32_t resultrowComparAsc(const void* p1, const void* p2);
|
||||
int32_t isQualifiedTable(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified);
|
||||
|
||||
void printDataBlock(SSDataBlock* pBlock, const char* flag);
|
||||
|
||||
#endif // TDENGINE_QUERYUTIL_H
|
||||
|
|
|
@ -538,23 +538,6 @@ typedef struct SStreamIntervalOperatorInfo {
|
|||
SWinKey delKey;
|
||||
} SStreamIntervalOperatorInfo;
|
||||
|
||||
typedef struct SFillOperatorInfo {
|
||||
struct SFillInfo* pFillInfo;
|
||||
SSDataBlock* pRes;
|
||||
SSDataBlock* pFinalRes;
|
||||
int64_t totalInputRows;
|
||||
void** p;
|
||||
SSDataBlock* existNewGroupBlock;
|
||||
STimeWindow win;
|
||||
SColMatchInfo matchInfo;
|
||||
int32_t primaryTsCol;
|
||||
int32_t primarySrcSlotId;
|
||||
uint64_t curGroupId; // current handled group id
|
||||
SExprInfo* pExprInfo;
|
||||
int32_t numOfExpr;
|
||||
SExprSupp noFillExprSupp;
|
||||
} SFillOperatorInfo;
|
||||
|
||||
typedef struct SDataGroupInfo {
|
||||
uint64_t groupId;
|
||||
int64_t numOfRows;
|
||||
|
@ -806,8 +789,6 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
|
|||
int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle);
|
||||
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList);
|
||||
|
||||
void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo);
|
||||
|
||||
int32_t getMaximumIdleDurationSec();
|
||||
|
||||
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval,
|
||||
|
@ -825,9 +806,8 @@ bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
|
|||
bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup);
|
||||
void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid,
|
||||
uint64_t* pGp, void* pTbName);
|
||||
void printDataBlock(SSDataBlock* pBlock, const char* flag);
|
||||
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
|
||||
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock);
|
||||
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
|
||||
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock);
|
||||
|
||||
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
|
||||
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||
|
|
|
@ -25,6 +25,8 @@ extern "C" {
|
|||
#include "tcommon.h"
|
||||
#include "tsimplehash.h"
|
||||
|
||||
#define GET_DEST_SLOT_ID(_p) ((_p)->pExpr->base.resSchema.slotId)
|
||||
|
||||
struct SSDataBlock;
|
||||
|
||||
typedef struct SFillColInfo {
|
||||
|
@ -113,12 +115,12 @@ typedef struct SStreamFillInfo {
|
|||
|
||||
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);
|
||||
|
||||
void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
|
||||
void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
|
||||
void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
|
||||
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr,
|
||||
int32_t numOfNotFillCols, const struct SNodeListNode* val);
|
||||
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
|
||||
void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
|
||||
void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
|
||||
void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
|
||||
SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr,
|
||||
int32_t numOfNotFillCols, const struct SNodeListNode* val);
|
||||
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
|
||||
|
||||
SFillInfo* taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t capacity,
|
||||
SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, int32_t slotId,
|
||||
|
@ -128,6 +130,8 @@ void* taosDestroyFillInfo(struct SFillInfo* pFillInfo);
|
|||
int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity);
|
||||
int64_t getFillInfoStart(struct SFillInfo* pFillInfo);
|
||||
|
||||
bool fillIfWindowPseudoColumn(SFillInfo* pFillInfo, SFillColInfo* pCol, SColumnInfoData* pDstColInfoData,
|
||||
int32_t rowIndex);
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -15,26 +15,15 @@
|
|||
|
||||
#include "filter.h"
|
||||
#include "function.h"
|
||||
#include "functionMgt.h"
|
||||
#include "os.h"
|
||||
#include "querynodes.h"
|
||||
#include "tfill.h"
|
||||
#include "tname.h"
|
||||
#include "tref.h"
|
||||
|
||||
#include "tdatablock.h"
|
||||
#include "tglobal.h"
|
||||
#include "tmsg.h"
|
||||
#include "tsort.h"
|
||||
#include "ttime.h"
|
||||
|
||||
#include "executorimpl.h"
|
||||
#include "index.h"
|
||||
#include "query.h"
|
||||
#include "tcompare.h"
|
||||
#include "thash.h"
|
||||
#include "ttypes.h"
|
||||
#include "vnode.h"
|
||||
|
||||
typedef struct SFetchRspHandleWrapper {
|
||||
uint32_t exchangeId;
|
||||
|
|
|
@ -2002,3 +2002,13 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags
|
|||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void printDataBlock(SSDataBlock* pBlock, const char* flag) {
|
||||
if (!pBlock || pBlock->info.rows == 0) {
|
||||
qDebug("===stream===printDataBlock: Block is Null or Empty");
|
||||
return;
|
||||
}
|
||||
char* pBuf = NULL;
|
||||
qDebug("%s", dumpBlockData(pBlock, flag, &pBuf));
|
||||
taosMemoryFree(pBuf);
|
||||
}
|
|
@ -704,6 +704,20 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) {
|
||||
STaskCostInfo* pSummary = &pTaskInfo->cost;
|
||||
|
||||
SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder;
|
||||
if (pSummary->pRecoder != NULL) {
|
||||
qDebug(
|
||||
"%s :cost summary: elapsed time:%.2f ms, extract tableList:%.2f ms, createGroupIdMap:%.2f ms, total blocks:%d, "
|
||||
"load block SMA:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
|
||||
GET_TASKID(pTaskInfo), pSummary->elapsedTime / 1000.0, pSummary->extractListTime, pSummary->groupIdMapTime,
|
||||
pRecorder->totalBlocks, pRecorder->loadBlockStatis, pRecorder->loadBlocks, pRecorder->totalRows,
|
||||
pRecorder->totalCheckedRows);
|
||||
}
|
||||
}
|
||||
|
||||
void qDestroyTask(qTaskInfo_t qTaskHandle) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
|
||||
if (pTaskInfo == NULL) {
|
||||
|
|
|
@ -91,7 +91,6 @@ static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock*
|
|||
|
||||
static void releaseQueryBuf(size_t numOfTables);
|
||||
|
||||
static void destroyFillOperatorInfo(void* param);
|
||||
static void destroyAggOperatorInfo(void* param);
|
||||
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
||||
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
|
||||
|
@ -1157,20 +1156,6 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
|||
}
|
||||
}
|
||||
|
||||
void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) {
|
||||
STaskCostInfo* pSummary = &pTaskInfo->cost;
|
||||
|
||||
SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder;
|
||||
if (pSummary->pRecoder != NULL) {
|
||||
qDebug(
|
||||
"%s :cost summary: elapsed time:%.2f ms, extract tableList:%.2f ms, createGroupIdMap:%.2f ms, total blocks:%d, "
|
||||
"load block SMA:%d, load data block:%d, total rows:%" PRId64 ", check rows:%" PRId64,
|
||||
GET_TASKID(pTaskInfo), pSummary->elapsedTime / 1000.0, pSummary->extractListTime, pSummary->groupIdMapTime,
|
||||
pRecorder->totalBlocks, pRecorder->loadBlockStatis, pRecorder->loadBlocks, pRecorder->totalRows,
|
||||
pRecorder->totalCheckedRows);
|
||||
}
|
||||
}
|
||||
|
||||
// void skipBlocks(STaskRuntimeEnv *pRuntimeEnv) {
|
||||
// STaskAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
//
|
||||
|
@ -1603,179 +1588,6 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
|
|||
return (rows == 0) ? NULL : pInfo->pRes;
|
||||
}
|
||||
|
||||
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
|
||||
SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
|
||||
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
|
||||
SSDataBlock* pResBlock = pInfo->pFinalRes;
|
||||
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
int32_t scanFlag = MAIN_SCAN;
|
||||
getTableScanInfo(pOperator, &order, &scanFlag);
|
||||
|
||||
int64_t ekey = pInfo->existNewGroupBlock->info.window.ekey;
|
||||
taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
|
||||
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
doApplyScalarCalculation(pOperator, pInfo->existNewGroupBlock, order, scanFlag);
|
||||
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ekey);
|
||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
|
||||
|
||||
int32_t numOfResultRows = pResultInfo->capacity - pResBlock->info.rows;
|
||||
taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows);
|
||||
|
||||
pInfo->curGroupId = pInfo->existNewGroupBlock->info.id.groupId;
|
||||
pInfo->existNewGroupBlock = NULL;
|
||||
}
|
||||
|
||||
static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
|
||||
SResultInfo* pResultInfo, SExecTaskInfo* pTaskInfo) {
|
||||
if (taosFillHasMoreResults(pInfo->pFillInfo)) {
|
||||
int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows;
|
||||
taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows);
|
||||
pInfo->pRes->info.id.groupId = pInfo->curGroupId;
|
||||
return;
|
||||
}
|
||||
|
||||
// handle the cached new group data block
|
||||
if (pInfo->existNewGroupBlock) {
|
||||
doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo);
|
||||
}
|
||||
}
|
||||
|
||||
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
|
||||
SFillOperatorInfo* pInfo = pOperator->info;
|
||||
SExprSupp* pSup = &pOperator->exprSupp;
|
||||
setInputDataBlock(pSup, pBlock, order, scanFlag, false);
|
||||
projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL);
|
||||
|
||||
// reset the row value before applying the no-fill functions to the input data block, which is "pBlock" in this case.
|
||||
pInfo->pRes->info.rows = 0;
|
||||
SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;
|
||||
setInputDataBlock(pNoFillSupp, pBlock, order, scanFlag, false);
|
||||
|
||||
projectApplyFunctions(pNoFillSupp->pExprInfo, pInfo->pRes, pBlock, pNoFillSupp->pCtx, pNoFillSupp->numOfExprs, NULL);
|
||||
pInfo->pRes->info.id.groupId = pBlock->info.id.groupId;
|
||||
}
|
||||
|
||||
static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
|
||||
SFillOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||
SSDataBlock* pResBlock = pInfo->pFinalRes;
|
||||
|
||||
blockDataCleanup(pResBlock);
|
||||
|
||||
int32_t order = TSDB_ORDER_ASC;
|
||||
int32_t scanFlag = MAIN_SCAN;
|
||||
getTableScanInfo(pOperator, &order, &scanFlag);
|
||||
|
||||
doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo);
|
||||
if (pResBlock->info.rows > 0) {
|
||||
pResBlock->info.id.groupId = pInfo->curGroupId;
|
||||
return pResBlock;
|
||||
}
|
||||
|
||||
SOperatorInfo* pDownstream = pOperator->pDownstream[0];
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream);
|
||||
if (pBlock == NULL) {
|
||||
if (pInfo->totalInputRows == 0) {
|
||||
setOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
|
||||
} else {
|
||||
blockDataUpdateTsWindow(pBlock, pInfo->primarySrcSlotId);
|
||||
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
|
||||
blockDataEnsureCapacity(pInfo->pFinalRes, pBlock->info.rows);
|
||||
doApplyScalarCalculation(pOperator, pBlock, order, scanFlag);
|
||||
|
||||
if (pInfo->curGroupId == 0 || pInfo->curGroupId == pInfo->pRes->info.id.groupId) {
|
||||
pInfo->curGroupId = pInfo->pRes->info.id.groupId; // the first data block
|
||||
pInfo->totalInputRows += pInfo->pRes->info.rows;
|
||||
|
||||
if (order == pInfo->pFillInfo->order) {
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, pBlock->info.window.ekey);
|
||||
} else {
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, pBlock->info.window.skey);
|
||||
}
|
||||
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
|
||||
} else if (pInfo->curGroupId != pBlock->info.id.groupId) { // the new group data block
|
||||
pInfo->existNewGroupBlock = pBlock;
|
||||
|
||||
// Fill the previous group data block, before handle the data block of new group.
|
||||
// Close the fill operation for previous group data block
|
||||
taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t numOfResultRows = pOperator->resultInfo.capacity - pResBlock->info.rows;
|
||||
taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows);
|
||||
|
||||
// current group has no more result to return
|
||||
if (pResBlock->info.rows > 0) {
|
||||
// 1. The result in current group not reach the threshold of output result, continue
|
||||
// 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
|
||||
if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || pInfo->existNewGroupBlock != NULL) {
|
||||
pResBlock->info.id.groupId = pInfo->curGroupId;
|
||||
return pResBlock;
|
||||
}
|
||||
|
||||
doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, pTaskInfo);
|
||||
if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) {
|
||||
pResBlock->info.id.groupId = pInfo->curGroupId;
|
||||
return pResBlock;
|
||||
}
|
||||
} else if (pInfo->existNewGroupBlock) { // try next group
|
||||
assert(pBlock != NULL);
|
||||
|
||||
blockDataCleanup(pResBlock);
|
||||
|
||||
doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, pTaskInfo);
|
||||
if (pResBlock->info.rows > pResultInfo->threshold) {
|
||||
pResBlock->info.id.groupId = pInfo->curGroupId;
|
||||
return pResBlock;
|
||||
}
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* doFill(SOperatorInfo* pOperator) {
|
||||
SFillOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SSDataBlock* fillResult = NULL;
|
||||
while (true) {
|
||||
fillResult = doFillImpl(pOperator);
|
||||
if (fillResult == NULL) {
|
||||
setOperatorCompleted(pOperator);
|
||||
break;
|
||||
}
|
||||
|
||||
doFilter(fillResult, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo);
|
||||
if (fillResult->info.rows > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (fillResult != NULL) {
|
||||
pOperator->resultInfo.totalRows += fillResult->info.rows;
|
||||
}
|
||||
|
||||
return fillResult;
|
||||
}
|
||||
|
||||
void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
|
||||
for (int32_t i = 0; i < numOfExprs; ++i) {
|
||||
SExprInfo* pExprInfo = &pExpr[i];
|
||||
|
@ -2045,167 +1857,6 @@ void destroyAggOperatorInfo(void* param) {
|
|||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
void destroyFillOperatorInfo(void* param) {
|
||||
SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
|
||||
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
|
||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||
pInfo->pFinalRes = blockDataDestroy(pInfo->pFinalRes);
|
||||
|
||||
cleanupExprSupp(&pInfo->noFillExprSupp);
|
||||
|
||||
taosMemoryFreeClear(pInfo->p);
|
||||
taosArrayDestroy(pInfo->matchInfo.pList);
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SExprInfo* pNotFillExpr,
|
||||
int32_t numOfNotFillCols, SNodeListNode* pValNode, STimeWindow win, int32_t capacity,
|
||||
const char* id, SInterval* pInterval, int32_t fillType, int32_t order) {
|
||||
SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode);
|
||||
|
||||
int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
|
||||
STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, startKey);
|
||||
w = getFirstQualifiedTimeWindow(startKey, &w, pInterval, order);
|
||||
|
||||
pInfo->pFillInfo = taosCreateFillInfo(w.skey, numOfCols, numOfNotFillCols, capacity, pInterval, fillType, pColInfo,
|
||||
pInfo->primaryTsCol, order, id);
|
||||
|
||||
if (order == TSDB_ORDER_ASC) {
|
||||
pInfo->win.skey = win.skey;
|
||||
pInfo->win.ekey = win.ekey;
|
||||
} else {
|
||||
pInfo->win.skey = win.ekey;
|
||||
pInfo->win.ekey = win.skey;
|
||||
}
|
||||
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
||||
|
||||
if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
|
||||
taosMemoryFree(pInfo->pFillInfo);
|
||||
taosMemoryFree(pInfo->p);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
} else {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
static bool isWstartColumnExist(SFillOperatorInfo* pInfo) {
|
||||
if (pInfo->noFillExprSupp.numOfExprs == 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pInfo->noFillExprSupp.numOfExprs; ++i) {
|
||||
SExprInfo* exprInfo = pInfo->noFillExprSupp.pExprInfo + i;
|
||||
if (exprInfo->pExpr->nodeType == QUERY_NODE_COLUMN && exprInfo->base.numOfParams == 1 &&
|
||||
exprInfo->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_START) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static int32_t createPrimaryTsExprIfNeeded(SFillOperatorInfo* pInfo, SFillPhysiNode* pPhyFillNode, SExprSupp* pExprSupp,
|
||||
const char* idStr) {
|
||||
bool wstartExist = isWstartColumnExist(pInfo);
|
||||
|
||||
if (wstartExist == false) {
|
||||
if (pPhyFillNode->pWStartTs->type != QUERY_NODE_TARGET) {
|
||||
qError("pWStartTs of fill physical node is not a target node, %s", idStr);
|
||||
return TSDB_CODE_QRY_SYS_ERROR;
|
||||
}
|
||||
|
||||
SExprInfo* pExpr = taosMemoryRealloc(pExprSupp->pExprInfo, (pExprSupp->numOfExprs + 1) * sizeof(SExprInfo));
|
||||
if (pExpr == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
createExprFromTargetNode(&pExpr[pExprSupp->numOfExprs], (STargetNode*)pPhyFillNode->pWStartTs);
|
||||
pExprSupp->numOfExprs += 1;
|
||||
pExprSupp->pExprInfo = pExpr;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
|
||||
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr);
|
||||
pOperator->exprSupp.pExprInfo = pExprInfo;
|
||||
|
||||
SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;
|
||||
pNoFillSupp->pExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pNoFillSupp->numOfExprs);
|
||||
int32_t code = createPrimaryTsExprIfNeeded(pInfo, pPhyFillNode, pNoFillSupp, pTaskInfo->id.str);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
code = initExprSupp(pNoFillSupp, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
SInterval* pInterval =
|
||||
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
|
||||
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
|
||||
: &((SIntervalAggOperatorInfo*)downstream->info)->interval;
|
||||
|
||||
int32_t order = (pPhyFillNode->inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||
int32_t type = convertFillType(pPhyFillNode->mode);
|
||||
|
||||
SResultInfo* pResultInfo = &pOperator->resultInfo;
|
||||
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||
code = initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId;
|
||||
pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId;
|
||||
|
||||
int32_t numOfOutputCols = 0;
|
||||
code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols,
|
||||
COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
|
||||
|
||||
code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs,
|
||||
(SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
|
||||
pTaskInfo->id.str, pInterval, type, order);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pInfo->pFinalRes = createOneDataBlock(pInfo->pRes, false);
|
||||
blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity);
|
||||
|
||||
code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||
pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
|
||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, NULL);
|
||||
|
||||
code = appendDownstream(pOperator, &downstream, 1);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
if (pInfo != NULL) {
|
||||
destroyFillOperatorInfo(pInfo);
|
||||
}
|
||||
|
||||
pTaskInfo->code = code;
|
||||
taosMemoryFreeClear(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPTR_EXEC_MODEL model, char* dbFName) {
|
||||
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
|
||||
if (pTaskInfo == NULL) {
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1339,30 +1339,6 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock,
|
|||
return code;
|
||||
}
|
||||
|
||||
static void calBlockTag(SExprSupp* pTagCalSup, SSDataBlock* pBlock, SSDataBlock* pResBlock) {
|
||||
if (pTagCalSup == NULL || pTagCalSup->numOfExprs == 0) return;
|
||||
if (pBlock == NULL || pBlock->info.rows == 0) return;
|
||||
|
||||
SSDataBlock* pSrcBlock = blockCopyOneRow(pBlock, 0);
|
||||
ASSERT(pSrcBlock->info.rows == 1);
|
||||
|
||||
blockDataEnsureCapacity(pResBlock, 1);
|
||||
|
||||
projectApplyFunctions(pTagCalSup->pExprInfo, pResBlock, pSrcBlock, pTagCalSup->pCtx, 1, NULL);
|
||||
ASSERT(pResBlock->info.rows == 1);
|
||||
|
||||
// build tagArray
|
||||
/*SArray* tagArray = taosArrayInit(0, sizeof(void*));*/
|
||||
/*STagVal tagVal = {*/
|
||||
/*.cid = 0,*/
|
||||
/*.type = 0,*/
|
||||
/*};*/
|
||||
// build STag
|
||||
// set STag
|
||||
|
||||
blockDataDestroy(pSrcBlock);
|
||||
}
|
||||
|
||||
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
|
||||
SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup;
|
||||
SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -467,11 +467,11 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
|
|||
// add current row if timestamp match
|
||||
if (ts == pSliceInfo->current && pSliceInfo->current <= pSliceInfo->win.ekey) {
|
||||
addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);
|
||||
doKeepPrevRows(pSliceInfo, pBlock, i);
|
||||
|
||||
pSliceInfo->current =
|
||||
taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision);
|
||||
}
|
||||
doKeepPrevRows(pSliceInfo, pBlock, i);
|
||||
|
||||
if (pSliceInfo->current > pSliceInfo->win.ekey) {
|
||||
setOperatorCompleted(pOperator);
|
||||
|
|
|
@ -677,16 +677,6 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
|
|||
}
|
||||
}
|
||||
|
||||
void printDataBlock(SSDataBlock* pBlock, const char* flag) {
|
||||
if (!pBlock || pBlock->info.rows == 0) {
|
||||
qDebug("===stream===printDataBlock: Block is Null or Empty");
|
||||
return;
|
||||
}
|
||||
char* pBuf = NULL;
|
||||
qDebug("%s", dumpBlockData(pBlock, flag, &pBuf));
|
||||
taosMemoryFree(pBuf);
|
||||
}
|
||||
|
||||
typedef int32_t (*__compare_fn_t)(void* pKey, void* data, int32_t index);
|
||||
|
||||
int32_t binarySearchCom(void* keyList, int num, void* pKey, int order, __compare_fn_t comparefn) {
|
||||
|
@ -3854,7 +3844,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
|
||||
int64_t groupId = pSDataBlock->info.id.groupId;
|
||||
uint64_t groupId = pSDataBlock->info.id.groupId;
|
||||
int64_t code = TSDB_CODE_SUCCESS;
|
||||
TSKEY* tsCols = NULL;
|
||||
SResultRow* pResult = NULL;
|
||||
|
|
|
@ -231,8 +231,10 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
|
|||
SIF_ERR_RET(sifGetValueFromNode(node, ¶m->condValue));
|
||||
param->colId = -1;
|
||||
param->colValType = (uint8_t)(vn->node.resType.type);
|
||||
if (strlen(vn->literal) <= sizeof(param->colName)) {
|
||||
if (vn->literal != NULL && strlen(vn->literal) <= sizeof(param->colName)) {
|
||||
memcpy(param->colName, vn->literal, strlen(vn->literal));
|
||||
} else {
|
||||
param->status = SFLT_NOT_INDEX;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -541,6 +541,10 @@ _START_RECEIVER:
|
|||
taosMsleep(10);
|
||||
}
|
||||
|
||||
if (snapshotReceiverIsStart(pReceiver)) {
|
||||
snapshotReceiverForceStop(pReceiver);
|
||||
}
|
||||
|
||||
snapshotReceiverStart(pReceiver, pMsg); // set start-time same with sender
|
||||
}
|
||||
|
||||
|
|
|
@ -84,6 +84,7 @@ class TDTestCase:
|
|||
stb = f"{dbname}.meters"
|
||||
self.installTaosd(bPath,cPath)
|
||||
os.system("echo 'debugFlag 143' > /etc/taos/taos.cfg ")
|
||||
os.system("echo ' supportVnodes 256' > /etc/taos/taos.cfg ")
|
||||
tableNumbers=100
|
||||
recordNumbers1=100
|
||||
recordNumbers2=1000
|
||||
|
|
Loading…
Reference in New Issue