Merge remote-tracking branch 'origin/3.0' into feature/qnode
commit 997f07a009

@@ -658,6 +658,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWin
 void cleanupAggSup(SAggSupporter* pAggSup);
 void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
 void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
+SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);

 SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo);
 SSDataBlock* loadNextDataBlock(void* param);

@@ -4603,7 +4603,7 @@ static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, i
   return s;
 }

-static SColumn* createColumn(int32_t blockId, int32_t slotId, SDataType* pType) {
+static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDataType* pType) {
   SColumn* pCol = taosMemoryCalloc(1, sizeof(SColumn));
   if (pCol == NULL) {
     terrno = TSDB_CODE_OUT_OF_MEMORY;
@@ -4611,9 +4611,10 @@ static SColumn* createColumn(int32_t blockId, int32_t slotId, SDataType* pType)
   }

   pCol->slotId = slotId;
-  pCol->bytes = pType->bytes;
-  pCol->type = pType->type;
-  pCol->scale = pType->scale;
+  pCol->colId = colId;
+  pCol->bytes = pType->bytes;
+  pCol->type = pType->type;
+  pCol->scale = pType->scale;
   pCol->precision = pType->precision;
   pCol->dataBlockId = blockId;

@@ -4656,7 +4657,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
       SDataType* pType = &pColNode->node.resType;
       pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
                                              pType->precision, pColNode->colName);
-      pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pType);
+      pExp->base.pParam[0].pCol = createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType);
       pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
     } else if (type == QUERY_NODE_VALUE) {
       pExp->pExpr->nodeType = QUERY_NODE_VALUE;
@@ -4708,7 +4709,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
         SColumnNode* pcn = (SColumnNode*)p1;

         pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
-        pExp->base.pParam[j].pCol = createColumn(pcn->dataBlockId, pcn->slotId, &pcn->node.resType);
+        pExp->base.pParam[j].pCol = createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType);
       } else if (p1->type == QUERY_NODE_VALUE) {
         SValueNode* pvn = (SValueNode*)p1;
         pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
@@ -4788,7 +4789,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
     SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode;  // simple child table.
     STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;

-    int32_t numOfCols = 0;
+    int32_t numOfCols = 0;

     tsdbReaderT pDataReader = NULL;
     if (pHandle->vnode) {
@@ -4797,6 +4798,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
           doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
                              queryId, taskId);
     }
+
     if (pDataReader == NULL && terrno != 0) {
       qDebug("pDataReader is NULL");
       //      return NULL;
@@ -4816,12 +4818,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
     }

-    SInterval interval = extractIntervalInfo(pTableScanNode);
-    SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(
-        pDataReader, &cond, numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq, pColList,
-        pResBlockDumy, pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo);
+    SOperatorInfo* pOperatorDumy = createTableScanOperatorInfo(pTableScanNode, pDataReader, pHandle, pTaskInfo);

     //      int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
     //                                        queryId, taskId);
     SArray* tableIdList = extractTableIdList(pTableGroupInfo);
     SSDataBlock* pResBlock = createResDataBlock(pDescNode);
-

@@ -13,8 +13,8 @@
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
  */

+#include "filter.h"
 #include "function.h"
-#include "filter.h"
 #include "functionMgt.h"
 #include "os.h"
 #include "querynodes.h"
@@ -1408,35 +1408,33 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
   char str[512] = {0};
   int32_t count = 0;
   SMetaReader mr = {0};
+  metaReaderInit(&mr, pInfo->readHandle.meta, 0);

   while (pInfo->curPos < pInfo->pTableGroups->numOfTables && count < pOperator->resultInfo.capacity) {
     STableKeyInfo* item = taosArrayGet(pa, pInfo->curPos);
+    metaGetTableEntryByUid(&mr, item->uid);

     for (int32_t j = 0; j < pOperator->numOfExprs; ++j) {
       SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);

       // refactor later
       if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
-        metaReaderInit(&mr, pInfo->readHandle.meta, 0);
-        metaGetTableEntryByUid(&mr, item->uid);
-
         STR_TO_VARSTR(str, mr.me.name);
-        metaReaderClear(&mr);
-
         colDataAppend(pDst, count, str, false);
-        //        data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.pColumns->info.colId, type, bytes);
-        //        dst = pColInfo->pData + count * pExprInfo[j].base.resSchema.bytes;
-        //        doSetTagValueToResultBuf(dst, data, type, bytes);
+      } else {  // it is a tag value
+        const char* p = metaGetTableTagVal(&mr.me, pExprInfo[j].base.pParam[0].pCol->colId);
+        colDataAppend(pDst, count, p, (p == NULL));
       }
-
-      count += 1;
     }

+    count += 1;
     if (++pInfo->curPos >= pInfo->pTableGroups->numOfTables) {
       pOperator->status = OP_EXEC_DONE;
     }
   }

+  metaReaderClear(&mr);
+
   // qDebug("QInfo:0x%"PRIx64" create tag values results completed, rows:%d", GET_TASKID(pRuntimeEnv), count);
   if (pOperator->status == OP_EXEC_DONE) {
     setTaskStatus(pTaskInfo, TASK_COMPLETED);

@@ -19,17 +19,18 @@ int32_t udf2_destroy() {

 int32_t udf2_start(SUdfInterBuf *buf) {
   *(int64_t*)(buf->buf) = 0;
-  buf->bufLen = sizeof(int64_t);
+  buf->bufLen = sizeof(double);
   buf->numOfResult = 0;
   return 0;
 }

 int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
-  int64_t sumSquares = *(int64_t*)interBuf->buf;
+  double sumSquares = *(double*)interBuf->buf;
   int8_t numOutput = 0;
   for (int32_t i = 0; i < block->numOfCols; ++i) {
     SUdfColumn* col = block->udfCols[i];
-    if (col->colMeta.type != TSDB_DATA_TYPE_INT) {
+    if (!(col->colMeta.type == TSDB_DATA_TYPE_INT ||
+          col->colMeta.type == TSDB_DATA_TYPE_DOUBLE)) {
       return TSDB_CODE_UDF_INVALID_INPUT;
     }
   }
@@ -39,17 +40,29 @@ int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInte
       if (udfColDataIsNull(col, j)) {
         continue;
       }

-      char* cell = udfColDataGetData(col, j);
-      int32_t num = *(int32_t*)cell;
-      sumSquares += num * num;
+      switch (col->colMeta.type) {
+        case TSDB_DATA_TYPE_INT: {
+          char* cell = udfColDataGetData(col, j);
+          int32_t num = *(int32_t*)cell;
+          sumSquares += num * num;
+          break;
+        }
+        case TSDB_DATA_TYPE_DOUBLE: {
+          char* cell = udfColDataGetData(col, j);
+          double num = *(double*)cell;
+          sumSquares += num * num;
+          break;
+        }
+        default:
+          break;
+      }
       numOutput = 1;
     }
   }

   if (numOutput == 1) {
-    *(int64_t*)(newInterBuf->buf) = sumSquares;
-    newInterBuf->bufLen = sizeof(int64_t);
+    *(double*)(newInterBuf->buf) = sumSquares;
+    newInterBuf->bufLen = sizeof(double);
   }
   newInterBuf->numOfResult = numOutput;
   return 0;
@@ -60,7 +73,7 @@ int32_t udf2_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) {
     resultData->numOfResult = 0;
     return 0;
   }
-  int64_t sumSquares = *(int64_t*)(buf->buf);
+  double sumSquares = *(double*)(buf->buf);
   *(double*)(resultData->buf) = sqrt(sumSquares);
   resultData->bufLen = sizeof(double);
   resultData->numOfResult = 1;

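Taken together, the udf2.c changes switch the intermediate buffer from int64_t to double so the aggregate accepts both INT and DOUBLE input columns; the final result is still the square root of the accumulated sum of squares. A minimal standalone C sketch of the same arithmetic, outside the UDF framework (the sample cells below are invented purely for illustration):

    #include <math.h>
    #include <stdio.h>

    // Illustration only: the accumulate-then-finish arithmetic that udf2 performs,
    // with a double accumulator so INT and DOUBLE cells can be mixed safely.
    int main() {
      int    intCells[]    = {1, 2, 3};    // stands in for an INT column
      double doubleCells[] = {1.5, 2.5};   // stands in for a DOUBLE column

      double sumSquares = 0;               // udf2_start: buffer zeroed, sized as a double
      for (int i = 0; i < 3; ++i) sumSquares += (double)intCells[i] * intCells[i];
      for (int i = 0; i < 2; ++i) sumSquares += doubleCells[i] * doubleCells[i];

      printf("%f\n", sqrt(sumSquares));    // udf2_finish: sqrt of the accumulated sum
      return 0;
    }
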
@@ -3,7 +3,7 @@ system sh/stop_dnodes.sh
 system sh/deploy.sh -n dnode1 -i 1
 system sh/cfg.sh -n dnode1 -c wallevel -v 2
 system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1
-system sh/cfg.sh -n dnode1 -c startUdfd -v 1
+system sh/cfg.sh -n dnode1 -c udf -v 1

 print ========= start dnode1 as LEADER
 system sh/exec.sh -n dnode1 -s start

@@ -114,7 +114,7 @@ class TDTestCase:


     def tmqCase1(self, cfgPath, buildPath):
-        tdLog.printNoPrefix("======== test case 1: ")
+        tdLog.printNoPrefix("======== test case 1: Produce while consume")
         tdLog.info("step 1: create database, stb, ctb and insert data")
         # create and start thread
         parameterDict = {'cfg': '', \
@@ -122,8 +122,8 @@ class TDTestCase:
                          'vgroups': 1, \
                          'stbName': 'stb', \
                          'ctbNum': 10, \
-                         'rowsPerTbl': 10, \
-                         'batchNum': 200, \
+                         'rowsPerTbl': 1000, \
+                         'batchNum': 100, \
                          'startTs': 1640966400000}  # 2022-01-01 00:00:00.000
         parameterDict['cfg'] = cfgPath
         prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
@@ -219,7 +219,6 @@ class TDTestCase:

         tdLog.printNoPrefix("======== test case 1 end ...... ")

-
     def tmqCase2(self, cfgPath, buildPath):
         tdLog.printNoPrefix("======== test case 2: add child table with consuming ")
         # create and start thread
@@ -340,6 +339,128 @@ class TDTestCase:

         tdLog.printNoPrefix("======== test case 2 end ...... ")

+    def tmqCase3(self, cfgPath, buildPath):
+        tdLog.printNoPrefix("======== test case 3: tow topics, each contains a stable, \
+                             but at the beginning, no ctables in the stable of one topic,\
+                             after starting consumer, create ctables ")
+        # create and start thread
+        parameterDict = {'cfg': '', \
+                         'dbName': 'db2', \
+                         'vgroups': 1, \
+                         'stbName': 'stb', \
+                         'ctbNum': 10, \
+                         'rowsPerTbl': 10000, \
+                         'batchNum': 100, \
+                         'startTs': 1640966400000}  # 2022-01-01 00:00:00.000
+        parameterDict['cfg'] = cfgPath
+
+        prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
+        prepareEnvThread.start()
+
+        # wait db ready
+        while 1:
+            tdSql.query("show databases")
+            if tdSql.getRows() == 4:
+                print (tdSql.getData(0,0), tdSql.getData(1,0),tdSql.getData(2,0),)
+                break
+            else:
+                time.sleep(1)
+
+        tdSql.query("use %s"%parameterDict['dbName'])
+        # wait stb ready
+        while 1:
+            tdSql.query("show %s.stables"%parameterDict['dbName'])
+            if tdSql.getRows() == 1:
+                break
+            else:
+                time.sleep(1)
+
+        tdLog.info("create topics from super table")
+        topicFromStb = 'topic_stb_column2'
+        topicFromCtb = 'topic_ctb_column2'
+
+        tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
+        tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName']))
+
+        time.sleep(1)
+        tdSql.query("show topics")
+        topic1 = tdSql.getData(0 , 0)
+        topic2 = tdSql.getData(1 , 0)
+        tdLog.info("show topics: %s, %s"%(topic1, topic2))
+        if topic1 != topicFromStb and topic1 != topicFromCtb:
+            tdLog.exit("topic error1")
+        if topic2 != topicFromStb and topic2 != topicFromCtb:
+            tdLog.exit("topic error2")
+
+        tdLog.info("create consume info table and consume result table")
+        cdbName = parameterDict["dbName"]
+        tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
+        tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
+
+        rowsOfNewCtb = 1000
+        consumerId = 0
+        expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb
+        topicList = topicFromStb
+        ifcheckdata = 0
+        keyList = 'group.id:cgrp1,\
+                   enable.auto.commit:false,\
+                   auto.commit.interval.ms:6000,\
+                   auto.offset.reset:earliest'
+        sql = "insert into consumeinfo values "
+        sql += "(now, %d, '%s', '%s', %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata)
+        tdSql.query(sql)
+
+        tdLog.info("check stb if there are data")
+        while 1:
+            tdSql.query("select count(*) from %s"%parameterDict["stbName"])
+            #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
+            countOfStb = tdSql.getData(0, 0)
+            if countOfStb != 0:
+                tdLog.info("count from stb: %d"%countOfStb)
+                break
+            else:
+                time.sleep(1)
+
+        tdLog.info("start consume processor")
+        pollDelay = 5
+        showMsg = 1
+        showRow = 1
+
+        shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
+        shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, parameterDict["dbName"], showMsg, showRow, cdbName)
+        shellCmd += "> /dev/null 2>&1 &"
+        tdLog.info(shellCmd)
+        os.system(shellCmd)
+
+        # create new child table and insert data
+        newCtbName = 'newctb'
+        tdSql.query("create table %s.%s using %s.%s tags(9999)"%(parameterDict["dbName"], newCtbName, parameterDict["dbName"], parameterDict["stbName"]))
+        startTs = parameterDict["startTs"]
+        for j in range(rowsOfNewCtb):
+            sql = "insert into %s.%s values (%d, %d, 'tmqrow_%d') "%(parameterDict["dbName"], newCtbName, startTs + j, j, j)
+            tdSql.execute(sql)
+        tdLog.debug("insert data into new child table ............ [OK]")
+
+        # wait for data ready
+        prepareEnvThread.join()
+
+        tdLog.info("insert process end, and start to check consume result")
+        while 1:
+            tdSql.query("select * from consumeresult")
+            #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
+            if tdSql.getRows() == 1:
+                break
+            else:
+                time.sleep(5)
+
+        tdSql.checkData(0 , 1, consumerId)
+        tdSql.checkData(0 , 3, expectrowcnt)
+
+        tdSql.query("drop topic %s"%topicFromStb)
+        tdSql.query("drop topic %s"%topicFromCtb)
+
+        tdLog.printNoPrefix("======== test case 3 end ...... ")
+
     def run(self):
         tdSql.prepare()

@@ -352,7 +473,7 @@ class TDTestCase:
         tdLog.info("cfgPath: %s" % cfgPath)

         self.tmqCase1(cfgPath, buildPath)
-        #self.tmqCase2(cfgPath, buildPath)
+        self.tmqCase2(cfgPath, buildPath)
         #self.tmqCase3(cfgPath, buildPath)

     def stop(self):

@@ -98,12 +98,24 @@ static void printHelp() {
 }

 void initLogFile() {
-  // FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
-  char file[256];
-  sprintf(file, "%s/../log/tmqlog.txt", configDir);
-  TdFilePtr pFile = taosOpenFile(file, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
+  time_t now;
+  struct tm curTime;
+  char filename[256];
+
+  now = taosTime(NULL);
+  taosLocalTime(&now, &curTime);
+  sprintf(filename,"%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt",
+          configDir,
+          curTime.tm_year+1900,
+          curTime.tm_mon+1,
+          curTime.tm_mday,
+          curTime.tm_hour,
+          curTime.tm_min,
+          curTime.tm_sec);
+  //sprintf(filename, "%s/../log/tmqlog.txt", configDir);
+  TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
   if (NULL == pFile) {
-    fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt");
+    fprintf(stderr, "Failed to open %s for save result\n", filename);
     exit(-1);
   }
   g_fp = pFile;
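For reference, the timestamped log-file name built above can be reproduced with standard C time functions; this sketch assumes taosTime and taosLocalTime behave like time and localtime_r, which the diff itself does not state:

    #include <stdio.h>
    #include <time.h>

    // Sketch of the same filename pattern, with standard C functions standing in
    // for the TDengine OS-abstraction calls taosTime()/taosLocalTime().
    int main() {
      char      filename[256];
      time_t    now = time(NULL);
      struct tm curTime;
      localtime_r(&now, &curTime);  // thread-safe variant of localtime()

      snprintf(filename, sizeof(filename), "tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt",
               curTime.tm_year + 1900, curTime.tm_mon + 1, curTime.tm_mday,
               curTime.tm_hour, curTime.tm_min, curTime.tm_sec);
      printf("%s\n", filename);
      return 0;
    }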