fix:merge from main

wangmm0220 2023-03-31 16:20:21 +08:00
commit 97b5c7abee
56 changed files with 841 additions and 308 deletions

View File

@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG d8059ff
GIT_TAG cb1e89c
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG e82b9fc
GIT_TAG d194dc9
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -293,7 +293,6 @@ You configure the following parameters when creating a consumer:
| `auto.offset.reset` | enum | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) |
| `enable.auto.commit` | boolean | Commit automatically | Specify `true` or `false`. |
| `auto.commit.interval.ms` | integer | Interval for automatic commits, in milliseconds | |
| `enable.heartbeat.background` | boolean | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | |
| `experimental.snapshot.enable` | boolean | Specify whether to consume messages from the WAL or from TSDB | |
| `msg.with.table.name` | boolean | Specify whether to deserialize table names from messages | |
@ -368,7 +367,6 @@ conf := &tmq.ConfigMap{
"td.connect.port": "6030",
"client.id": "test_tmq_c",
"enable.auto.commit": "false",
"enable.heartbeat.background": "true",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "true",
}
@ -418,7 +416,6 @@ Python programs use the following parameters:
| `auto.commit.interval.ms` | string | Interval for automatic commits, in milliseconds | |
| `auto.offset.reset` | string | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) |
| `experimental.snapshot.enable` | string | Specify whether to consume messages from the WAL or from TSDB | Specify `true` or `false` |
| `enable.heartbeat.background` | string | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | Specify `true` or `false` |
</TabItem>
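
The parameter tables above correspond one-to-one to keys accepted by the client configuration API. As a hedged illustration only (not part of this commit), a TMQ consumer using the C client might be configured like this; the group id and option values are placeholders:

```c
#include <taos.h>
#include <stddef.h>

// Minimal sketch: build a consumer from the documented configuration keys.
// All values below are placeholders chosen for illustration.
static tmq_t* create_consumer(void) {
  tmq_conf_t* conf = tmq_conf_new();
  tmq_conf_set(conf, "group.id", "local");
  tmq_conf_set(conf, "auto.offset.reset", "earliest");
  tmq_conf_set(conf, "enable.auto.commit", "false");
  tmq_conf_set(conf, "experimental.snapshot.enable", "true");
  tmq_conf_set(conf, "msg.with.table.name", "true");

  tmq_t* consumer = tmq_consumer_new(conf, NULL, 0);
  tmq_conf_destroy(conf);  // the conf object is no longer needed once the consumer exists
  return consumer;
}
```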

View File

@ -35,7 +35,6 @@ func main() {
"td.connect.port": "6030",
"client.id": "test_tmq_client",
"enable.auto.commit": "false",
"enable.heartbeat.background": "true",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "true",
})

View File

@ -291,7 +291,6 @@ CREATE TOPIC topic_name AS DATABASE db_name;
| `auto.offset.reset` | enum | Initial offset for the consumer group | Options: `earliest` (default), `latest`, `none` |
| `enable.auto.commit` | boolean | Enable automatic offset commits | Valid values: `true`, `false`. |
| `auto.commit.interval.ms` | integer | Interval for automatic offset commits, in milliseconds | Default: 5000 ms |
| `enable.heartbeat.background` | boolean | Enable background heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | Enabled by default |
| `experimental.snapshot.enable` | boolean | Specify whether to consume data from TSDB | Experimental feature, disabled by default |
| `msg.with.table.name` | boolean | Specify whether to parse table names from messages; not applicable to column subscriptions (in a column subscription, tbname can be written into the subquery as a column) | |
@ -366,7 +365,6 @@ conf := &tmq.ConfigMap{
"td.connect.port": "6030",
"client.id": "test_tmq_c",
"enable.auto.commit": "false",
"enable.heartbeat.background": "true",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "true",
}
@ -418,7 +416,6 @@ consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
| `auto.commit.interval.ms` | string | Interval for automatic commits, in milliseconds | Default: 5000 ms |
| `auto.offset.reset` | string | Initial offset for the consumer group | Options: `earliest` (default), `latest`, `none` |
| `experimental.snapshot.enable` | string | Specify whether to consume data from TSDB | Valid values: `true`, `false` |
| `enable.heartbeat.background` | string | Enable background heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | Valid values: `true`, `false` |
</TabItem>

View File

@ -185,7 +185,7 @@ typedef struct SBlockID {
typedef struct SDataBlockInfo {
STimeWindow window;
int32_t rowSize;
int32_t rows; // todo hide this attribute
int64_t rows; // todo hide this attribute
uint32_t capacity;
SBlockID id;
int16_t hasVarCol;

View File

@ -78,7 +78,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
* @param SReadHandle
* @return
*/
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema);
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, uint64_t id);
/**
* set the task Id, usually used by message queue process
@ -89,6 +89,7 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
/**
* Set multiple input data blocks for the stream scan.
* @param tinfo

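
For orientation, a hedged sketch (not part of this commit) of how the revised signature might be driven by a queue consumer; the wrapper name `openQueueTask` and its arguments are illustrative placeholders, mirroring the tq.c call sites later in this diff where the consumer id is passed as the trailing `id`:

```c
// Hedged sketch: the schema out-parameter is gone from qCreateQueueExecTaskInfo;
// the trailing argument is now a plain id (the consumer id at the tq.c call sites).
static qTaskInfo_t openQueueTask(void* qmsg, SReadHandle* pHandle, int32_t vgId,
                                 uint64_t consumerId, uint64_t taskId, uint64_t queryId) {
  int32_t numOfCols = 0;  // reported back by the call; unused in this sketch

  qTaskInfo_t pTask = qCreateQueueExecTaskInfo(qmsg, pHandle, vgId, &numOfCols, consumerId);
  if (pTask != NULL) {
    // the real task/query ids are still attached separately, as before
    qSetTaskId(pTask, taskId, queryId);
  }
  return pTask;
}
```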
View File

@ -112,7 +112,7 @@ typedef struct SResultDataInfo {
typedef struct SInputColumnInfoData {
int32_t totalRows; // total rows in current columnar data
int32_t startRowIndex; // handle started row index
int32_t numOfRows; // the number of rows needs to be handled
int64_t numOfRows; // the number of rows needs to be handled
int32_t numOfInputCols; // PTS is not included
bool colDataSMAIsSet; // if agg is set or not
SColumnInfoData *pPTS; // primary timestamp column

View File

@ -257,7 +257,6 @@ cleanup:
kvVal->f = (float)result;
#define SET_BIGINT \
if (smlDoubleToInt64OverFlow(result)) { \
errno = 0; \
int64_t tmp = taosStr2Int64(pVal, &endptr, 10); \
if (errno == ERANGE) { \
@ -265,11 +264,7 @@ cleanup:
return false; \
} \
kvVal->type = TSDB_DATA_TYPE_BIGINT; \
kvVal->i = tmp; \
return true; \
} \
kvVal->type = TSDB_DATA_TYPE_BIGINT; \
kvVal->i = (int64_t)result;
kvVal->i = tmp;
#define SET_INT \
if (!IS_VALID_INT(result)) { \
@ -288,7 +283,6 @@ cleanup:
kvVal->i = result;
#define SET_UBIGINT \
if (result >= (double)UINT64_MAX || result < 0) { \
errno = 0; \
uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10); \
if (errno == ERANGE || result < 0) { \
@ -296,11 +290,7 @@ cleanup:
return false; \
} \
kvVal->type = TSDB_DATA_TYPE_UBIGINT; \
kvVal->u = tmp; \
return true; \
} \
kvVal->type = TSDB_DATA_TYPE_UBIGINT; \
kvVal->u = result;
kvVal->u = tmp;
#define SET_UINT \
if (!IS_VALID_UINT(result)) { \

View File

@ -320,15 +320,16 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
}
if (strcasecmp(key, "enable.heartbeat.background") == 0) {
if (strcasecmp(value, "true") == 0) {
conf->hbBgEnable = true;
return TMQ_CONF_OK;
} else if (strcasecmp(value, "false") == 0) {
conf->hbBgEnable = false;
return TMQ_CONF_OK;
} else {
// if (strcasecmp(value, "true") == 0) {
// conf->hbBgEnable = true;
// return TMQ_CONF_OK;
// } else if (strcasecmp(value, "false") == 0) {
// conf->hbBgEnable = false;
// return TMQ_CONF_OK;
// } else {
tscError("the default value of enable.heartbeat.background is true, can not be seted");
return TMQ_CONF_INVALID;
}
// }
}
if (strcasecmp(key, "td.connect.ip") == 0) {
@ -535,10 +536,14 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
atomic_add_fetch_32(&pParamSet->totalRspNum, 1);
SEp* pEp = GET_ACTIVE_EP(&pVg->epSet);
tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%" PRId64 " prev:%" PRId64
", ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64,
tmq->consumerId, pOffset->subKey, pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn,
pEp->port, index + 1, totalVgroups, pMsgSendInfo->requestId);
char offsetBuf[80] = {0};
tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffset->val);
char commitBuf[80] = {0};
tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->committedOffset);
tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%" PRIx64,
tmq->consumerId, pOffset->subKey, pVg->vgId, offsetBuf, commitBuf, pEp->fqdn, pEp->port, index + 1,
totalVgroups, pMsgSendInfo->requestId);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
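
The first hunk above turns `enable.heartbeat.background` into a fixed setting. A hedged client-side sketch of the resulting behavior (illustration only, not part of this commit):

```c
#include <taos.h>

// After this change, explicitly setting the key is rejected rather than toggling the flag.
static void check_heartbeat_key(void) {
  tmq_conf_t* conf = tmq_conf_new();

  tmq_conf_res_t res = tmq_conf_set(conf, "enable.heartbeat.background", "false");
  if (res == TMQ_CONF_INVALID) {
    // expected path after this commit; the background heartbeat stays enabled
  }

  tmq_conf_destroy(conf);
}
```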

View File

@ -982,7 +982,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
taosSort(pColInfoData->pData, pDataBlock->info.rows, pColInfoData->info.bytes, fn);
int64_t p1 = taosGetTimestampUs();
uDebug("blockDataSort easy cost:%" PRId64 ", rows:%d\n", p1 - p0, pDataBlock->info.rows);
uDebug("blockDataSort easy cost:%" PRId64 ", rows:%" PRId64 "\n", p1 - p0, pDataBlock->info.rows);
return TSDB_CODE_SUCCESS;
} else { // var data type
@ -1189,7 +1189,7 @@ void blockDataCleanup(SSDataBlock* pDataBlock) {
void blockDataEmpty(SSDataBlock* pDataBlock) {
SDataBlockInfo* pInfo = &pDataBlock->info;
if (pInfo->capacity == 0 || pInfo->rows > pDataBlock->info.capacity) {
if (pInfo->capacity == 0) {
return;
}
@ -1748,14 +1748,14 @@ int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
int64_t tbUid = pBlock->info.id.uid;
int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
int16_t hasVarCol = pBlock->info.hasVarCol;
int32_t rows = pBlock->info.rows;
int64_t rows = pBlock->info.rows;
int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, tbUid);
tlen += taosEncodeFixedI16(buf, numOfCols);
tlen += taosEncodeFixedI16(buf, hasVarCol);
tlen += taosEncodeFixedI32(buf, rows);
tlen += taosEncodeFixedI64(buf, rows);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
@ -1786,7 +1786,7 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
buf = taosDecodeFixedU64(buf, &pBlock->info.id.uid);
buf = taosDecodeFixedI16(buf, &numOfCols);
buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol);
buf = taosDecodeFixedI32(buf, &pBlock->info.rows);
buf = taosDecodeFixedI64(buf, &pBlock->info.rows);
buf = taosDecodeFixedI32(buf, &sz);
pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
for (int32_t i = 0; i < sz; i++) {
@ -1990,7 +1990,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t len = 0;
len += snprintf(dumpBuf + len, size - len,
"===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64
"|rows:%d|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n",
"|rows:%" PRId64 "|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n",
flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId,
pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName);

View File

@ -2453,6 +2453,11 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
if (code) goto _exit;
} else {
if(ASSERT(varDataTLen(data + offset) <= bytes)){
uError("var data length invalid, varDataTLen(data + offset):%d <= bytes:%d", (int)varDataTLen(data + offset), bytes);
code = TSDB_CODE_INVALID_PARA;
goto _exit;
}
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_VALUE](pColData, (uint8_t *)varDataVal(data + offset),
varDataLen(data + offset));
}

View File

@ -24,10 +24,10 @@ extern "C" {
enum {
MQ_CONSUMER_STATUS__MODIFY = 1,
MQ_CONSUMER_STATUS__MODIFY_IN_REB, // this value is not used anymore
// MQ_CONSUMER_STATUS__MODIFY_IN_REB, // this value is not used anymore
MQ_CONSUMER_STATUS__READY,
MQ_CONSUMER_STATUS__LOST,
MQ_CONSUMER_STATUS__LOST_IN_REB, // this value is not used anymore
// MQ_CONSUMER_STATUS__LOST_IN_REB, // this value is not used anymore
MQ_CONSUMER_STATUS__LOST_REBD,
MQ_CONSUMER_STATUS__REMOVED,
};

View File

@ -337,7 +337,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
}
taosRUnLockLatch(&pConsumer->lock);
} else if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
} else if (status == MQ_CONSUMER_STATUS__MODIFY) {
taosRLockLatch(&pConsumer->lock);
int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
@ -876,17 +876,11 @@ static void updateConsumerStatus(SMqConsumerObj* pConsumer) {
int32_t status = pConsumer->status;
if (taosArrayGetSize(pConsumer->rebNewTopics) == 0 && taosArrayGetSize(pConsumer->rebRemovedTopics) == 0) {
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
if (status == MQ_CONSUMER_STATUS__MODIFY) {
pConsumer->status = MQ_CONSUMER_STATUS__READY;
} else if (status == MQ_CONSUMER_STATUS__LOST_IN_REB || status == MQ_CONSUMER_STATUS__LOST) {
} else if (status == MQ_CONSUMER_STATUS__LOST) {
pConsumer->status = MQ_CONSUMER_STATUS__LOST_REBD;
}
} else {
if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
pConsumer->status = MQ_CONSUMER_STATUS__MODIFY;
} else if (status == MQ_CONSUMER_STATUS__LOST || status == MQ_CONSUMER_STATUS__LOST_IN_REB) {
pConsumer->status = MQ_CONSUMER_STATUS__LOST;
}
}
}
@ -1195,10 +1189,8 @@ static const char *mndConsumerStatusName(int status) {
return "ready";
case MQ_CONSUMER_STATUS__LOST:
case MQ_CONSUMER_STATUS__LOST_REBD:
case MQ_CONSUMER_STATUS__LOST_IN_REB:
return "lost";
case MQ_CONSUMER_STATUS__MODIFY:
case MQ_CONSUMER_STATUS__MODIFY_IN_REB:
return "rebalancing";
default:
return "unknown";

View File

@ -178,7 +178,7 @@ typedef struct STsdbReader STsdbReader;
int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num);
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr);
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly);
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr);
void tsdbReaderClose(STsdbReader *pReader);

View File

@ -224,6 +224,8 @@ int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward,
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter);
bool tsdbTbDataIterNext(STbDataIter *pIter);
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum);
// STbData
int32_t tsdbGetNRowsInTbData(STbData *pTbData);
// tsdbFile.c ==============================================================================================

View File

@ -669,7 +669,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
#endif
for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
SSDataBlock *output = taosArrayGetP(pResList, i);
smaDebug("result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%d", output->info.id.uid,
smaDebug("result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRId64, output->info.id.uid,
output->info.id.groupId, output->info.rows);
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);

View File

@ -373,8 +373,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
char formatBuf[80];
tFormatOffset(formatBuf, 80, pOffsetVal);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.",
consumerId, pHandle->subKey, vgId, formatBuf);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue. reqId:0x%"PRIx64,
consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId);
return 0;
} else {
// no poll occurs in this vnode for this topic, let's seek to the right offset value.
@ -772,7 +772,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
STqHandle tqHandle = {0};
pHandle = &tqHandle;
/*taosInitRWLatch(&pExec->lock);*/
uint64_t oldConsumerId = pHandle->consumerId;
memcpy(pHandle->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
@ -807,7 +806,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
req.qmsg = NULL;
pHandle->execHandle.task =
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, NULL);
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, vgId, &pHandle->execHandle.numOfCols, req.newConsumerId);
void* scanner = NULL;
qExtractStreamScanner(pHandle->execHandle.task, &scanner);
pHandle->execHandle.pExecReader = qExtractReaderFromStreamScanner(scanner);
@ -820,7 +819,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
(SSnapContext**)(&handle.sContext));
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL);
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
pHandle->pWalReader = walOpenReader(pVnode->pWal, NULL);
pHandle->execHandle.execTb.suid = req.suid;
@ -838,7 +837,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
buildSnapContext(handle.meta, handle.version, req.suid, pHandle->execHandle.subType, pHandle->fetchMeta,
(SSnapContext**)(&handle.sContext));
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, NULL);
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, vgId, NULL, req.newConsumerId);
}
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));

View File

@ -307,7 +307,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
handle.execHandle.task =
qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, NULL);
qCreateQueueExecTaskInfo(handle.execHandle.execCol.qmsg, &reader, vgId, &handle.execHandle.numOfCols, 0);
if (handle.execHandle.task == NULL) {
tqError("cannot create exec task for %s", handle.subKey);
code = -1;
@ -332,7 +332,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
buildSnapContext(reader.meta, reader.version, 0, handle.execHandle.subType, handle.fetchMeta,
(SSnapContext**)(&reader.sContext));
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, NULL);
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
} else if (handle.execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
@ -341,7 +341,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
tqDebug("vgId:%d, tq try to get all ctb, suid:%" PRId64, pTq->pVnode->config.vgId, handle.execHandle.execTb.suid);
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t tbUid = *(int64_t*)taosArrayGet(tbUidList, i);
tqDebug("vgId:%d, idx %d, uid:%" PRId64, TD_VID(pTq->pVnode), i, tbUid);
tqDebug("vgId:%d, idx %d, uid:%" PRId64, vgId, i, tbUid);
}
handle.execHandle.pExecReader = tqOpenReader(pTq->pVnode);
tqReaderSetTbUidList(handle.execHandle.pExecReader, tbUidList);
@ -349,9 +349,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
buildSnapContext(reader.meta, reader.version, handle.execHandle.execTb.suid, handle.execHandle.subType,
handle.fetchMeta, (SSnapContext**)(&reader.sContext));
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, NULL);
handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0);
}
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, TD_VID(pTq->pVnode));
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId);
taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle));
}

View File

@ -83,12 +83,12 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
uint64_t ts = 0;
tqDebug("consumer:0x%"PRIx64" vgId:%d, tmq task start execute", pHandle->consumerId, vgId);
if (qExecTask(task, &pDataBlock, &ts) < 0) {
if (qExecTask(task, &pDataBlock, &ts) != TSDB_CODE_SUCCESS) {
tqError("consumer:0x%"PRIx64" vgId:%d, task exec error since %s", pHandle->consumerId, vgId, terrstr());
return -1;
}
tqDebug("consumer:0x%"PRIx64" vgId:%d tmq task executed, rows:%d, total blocks:%d", pHandle->consumerId, vgId,
tqDebug("consumer:0x%"PRIx64" vgId:%d tmq task executed, rows:%"PRId64", total blocks:%d", pHandle->consumerId, vgId,
pDataBlock->info.rows, pRsp->blockNum);
// current scan should be stopped asap, since the rebalance occurs.

View File

@ -282,6 +282,40 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) {
return true;
}
int64_t tsdbCountTbDataRows(STbData *pTbData) {
SMemSkipListNode *pNode = pTbData->sl.pHead;
int64_t rowsNum = 0;
while (NULL != pNode) {
pNode = SL_GET_NODE_FORWARD(pNode, 0);
if (pNode == pTbData->sl.pTail) {
return rowsNum;
}
rowsNum++;
}
return rowsNum;
}
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum) {
taosRLockLatch(&pMemTable->latch);
for (int32_t i = 0; i < pMemTable->nBucket; ++i) {
STbData *pTbData = pMemTable->aBucket[i];
while (pTbData) {
void* p = taosHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid));
if (p == NULL) {
pTbData = pTbData->next;
continue;
}
*rowsNum += tsdbCountTbDataRows(pTbData);
pTbData = pTbData->next;
}
}
taosRUnLockLatch(&pMemTable->latch);
}
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
int32_t code = 0;

View File

@ -25,6 +25,11 @@ typedef enum {
EXTERNAL_ROWS_NEXT = 0x3,
} EContentData;
typedef enum {
READ_MODE_COUNT_ONLY = 0x1,
READ_MODE_ALL,
} EReadMode;
typedef struct {
STbDataIter* iter;
int32_t index;
@ -168,6 +173,8 @@ struct STsdbReader {
uint64_t suid;
int16_t order;
bool freeBlock;
EReadMode readMode;
uint64_t rowsNum;
STimeWindow window; // the primary query time window that applies to all queries
SSDataBlock* pResBlock;
int32_t capacity;
@ -1777,7 +1784,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
setComposedBlockFlag(pReader, true);
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%" PRId64 ", brange:%" PRId64
" - %" PRId64 ", uid:%" PRIu64 ", %s",
pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey,
pBlockScanInfo->uid, pReader->idStr);
@ -2770,7 +2777,7 @@ _end:
if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
" rows:%d, elapsed time:%.2f ms %s",
" rows:%" PRId64 ", elapsed time:%.2f ms %s",
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr);
}
@ -3022,7 +3029,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
" rows:%d, elapsed time:%.2f ms %s",
" rows:%" PRId64 ", elapsed time:%.2f ms %s",
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr);
return TSDB_CODE_SUCCESS;
@ -3106,7 +3113,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
" rows:%d, elapsed time:%.2f ms %s",
" rows:%" PRId64 ", elapsed time:%.2f ms %s",
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr);
}
@ -3132,6 +3139,151 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
return code;
}
static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) {
int64_t st = taosGetTimestampUs();
LRUHandle* handle = NULL;
int32_t code = tsdbCacheGetBlockIdx(pFileReader->pTsdb->biCache, pFileReader, &handle);
if (code != TSDB_CODE_SUCCESS || handle == NULL) {
goto _end;
}
int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
size_t num = taosArrayGetSize(aBlockIdx);
if (num == 0) {
tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
return TSDB_CODE_SUCCESS;
}
SBlockIdx* pBlockIdx = NULL;
int32_t i = 0;
for (int32_t i = 0; i < num; ++i) {
pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);
if (pBlockIdx->suid != pReader->suid) {
continue;
}
STableBlockScanInfo** p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(pBlockIdx->uid));
if (p == NULL) {
continue;
}
STableBlockScanInfo *pScanInfo = *p;
tMapDataReset(&pScanInfo->mapData);
tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
SDataBlk block = {0};
for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block);
pReader->rowsNum += block.nRow;
}
}
_end:
tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
return code;
}
static int32_t doSumSttBlockRows(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
SSttBlockLoadInfo* pBlockLoadInfo = NULL;
for (int32_t i = 0; i < pReader->pFileReader->pSet->nSttF; ++i) { // open all last file
pBlockLoadInfo = &pLastBlockReader->pInfo[i];
code = tsdbReadSttBlk(pReader->pFileReader, i, pBlockLoadInfo->aSttBlk);
if (code) {
return code;
}
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
if (size >= 1) {
SSttBlk *pStart = taosArrayGet(pBlockLoadInfo->aSttBlk, 0);
SSttBlk *pEnd = taosArrayGet(pBlockLoadInfo->aSttBlk, size - 1);
// all identical
if (pStart->suid == pEnd->suid) {
if (pStart->suid != pReader->suid) {
// no qualified stt block existed
taosArrayClear(pBlockLoadInfo->aSttBlk);
continue;
}
for (int32_t i = 0; i < size; ++i) {
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
pReader->rowsNum += p->nRow;
}
} else {
for (int32_t i = 0; i < size; ++i) {
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
uint64_t s = p->suid;
if (s < pReader->suid) {
continue;
}
if (s == pReader->suid) {
pReader->rowsNum += p->nRow;
} else if (s > pReader->suid) {
break;
}
}
}
}
}
return code;
}
static int32_t readRowsCountFromFiles(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
while (1) {
bool hasNext = false;
int32_t code = filesetIteratorNext(&pReader->status.fileIter, pReader, &hasNext);
if (code) {
return code;
}
if (!hasNext) { // no data files on disk
break;
}
code = doSumFileBlockRows(pReader, pReader->pFileReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doSumSttBlockRows(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
pReader->status.loadFromFile = false;
return code;
}
static int32_t readRowsCountFromMem(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t memNum = 0, imemNum = 0;
if (pReader->pReadSnap->pMem != NULL) {
tsdbMemTableCountRows(pReader->pReadSnap->pMem, pReader->status.pTableMap, &memNum);
}
if (pReader->pReadSnap->pIMem != NULL) {
tsdbMemTableCountRows(pReader->pReadSnap->pIMem, pReader->status.pTableMap, &imemNum);
}
pReader->rowsNum += memNum + imemNum;
return code;
}
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
STableUidList* pUidList = &pStatus->uidList;
@ -4072,6 +4224,8 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
if (pStatus->fileIter.numOfFiles == 0) {
pStatus->loadFromFile = false;
} else if (READ_MODE_COUNT_ONLY == pReader->readMode) {
// DO NOTHING
} else {
code = initForFirstBlockInFile(pReader, pBlockIter);
}
@ -4090,7 +4244,7 @@ static void freeSchemaFunc(void* param) {
// ====================================== EXPOSED APIs ======================================
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr) {
SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr, bool countOnly) {
STimeWindow window = pCond->twindows;
if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
pCond->twindows.skey += 1;
@ -4190,6 +4344,10 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
pReader->suspended = true;
if (countOnly) {
pReader->readMode = READ_MODE_COUNT_ONLY;
}
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
return code;
@ -4490,6 +4648,33 @@ _err:
return code;
}
static bool tsdbReadRowsCountOnly(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pBlock = pReader->pResBlock;
if (pReader->status.loadFromFile == false) {
return false;
}
code = readRowsCountFromFiles(pReader);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
code = readRowsCountFromMem(pReader);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
pBlock->info.rows = pReader->rowsNum;
pBlock->info.id.uid = 0;
pBlock->info.dataLoad = 0;
pReader->rowsNum = 0;
return pBlock->info.rows > 0;
}
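
Tying the count-only pieces together: a hedged sketch (not part of this commit) of how a caller might open the reader with the extended `tsdbReaderOpen()` signature; all arguments are assumed to be prepared exactly as for a normal scan:

```c
// Hedged sketch: the new trailing flag selects count-only mode. In that mode the
// first next-block call does not return data blocks; it sums rows from data files,
// stt files and memtables (see tsdbReadRowsCountOnly above) and reports the total
// in pResBlock->info.rows.
static int32_t openCountOnlyReader(SVnode* pVnode, SQueryTableDataCond* pCond,
                                   void* pTableList, int32_t numOfTables,
                                   SSDataBlock* pResBlock, STsdbReader** ppReader,
                                   const char* idstr) {
  return tsdbReaderOpen(pVnode, pCond, pTableList, numOfTables, pResBlock,
                        ppReader, idstr, true /* countOnly */);
}
```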
static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) {
int32_t code = TSDB_CODE_SUCCESS;
@ -4504,6 +4689,10 @@ static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) {
return code;
}
if (READ_MODE_COUNT_ONLY == pReader->readMode) {
return tsdbReadRowsCountOnly(pReader);
}
if (pStatus->loadFromFile) {
code = buildBlockFromFiles(pReader);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -107,6 +107,7 @@ int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t
uint64_t tableListGetSize(const STableListInfo* pTableList);
uint64_t tableListGetSuid(const STableListInfo* pTableList);
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index);
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex);
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
void initResultRowInfo(SResultRowInfo* pResultRowInfo);

View File

@ -143,9 +143,6 @@ typedef struct {
SQueryTableDataCond tableCond;
int64_t fillHistoryVer1;
int64_t fillHistoryVer2;
// int8_t triggerSaved;
// int64_t deleteMarkSaved;
SStreamState* pState;
} SStreamTaskInfo;
@ -175,7 +172,6 @@ struct SExecTaskInfo {
int64_t owner; // if it is in execution
int32_t code;
int32_t qbufQuota; // total available buffer (in KB) during execution query
int64_t version; // used for stream to record wal version, why not move to sschemainfo
SStreamTaskInfo streamInfo;
SSchemaInfo schemaInfo;
@ -188,6 +184,7 @@ struct SExecTaskInfo {
SLocalFetch localFetch;
SArray* pResultBlockList; // result block list
STaskStopInfo stopInfo;
SRWLatch lock; // secure the access of STableListInfo
};
enum {
@ -339,6 +336,7 @@ typedef struct STableScanInfo {
int8_t scanMode;
int8_t assignBlockUid;
bool hasGroupByTag;
bool countOnly;
} STableScanInfo;
typedef struct STableMergeScanInfo {
@ -485,12 +483,6 @@ typedef struct SStreamScanInfo {
} SStreamScanInfo;
typedef struct {
// int8_t subType;
// bool withMeta;
// int64_t suid;
// int64_t snapVersion;
// void *metaInfo;
// void *dataInfo;
SVnode* vnode;
SSDataBlock pRes; // result SSDataBlock
STsdbReader* dataReader;
@ -693,6 +685,8 @@ typedef struct SStreamFillOperatorInfo {
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, char* dbFName);
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup,
__optr_close_fn_t closeFn, __optr_reqBuf_fn_t reqBufFn, __optr_explain_fn_t explain);
int32_t optrDummyOpenFn(SOperatorInfo* pOperator);

View File

@ -212,6 +212,11 @@ static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) {
return NULL;
}
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
if (blockDataGetNumOfRows(pBlock) == 0) {
continue;
}
SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
if (hasLimitOffsetInfo(pLimitInfo)) {
int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
@ -303,6 +308,11 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
pTaskInfo);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pOperator->fpSet =
createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo, optrDefaultBufFn, NULL);
return pOperator;

View File

@ -571,6 +571,10 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
memcpy(pStart, data, len);
pStart += len;
} else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
if (varDataTLen(data) > pValue->info.bytes) {
code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
goto end;
}
memcpy(pStart, data, varDataTLen(data));
pStart += varDataTLen(data);
} else {
@ -1800,6 +1804,21 @@ STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index)
return taosArrayGet(pTableList->pTableList, index);
}
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex) {
int32_t numOfTables = taosArrayGetSize(pTableList->pTableList);
if (startIndex >= numOfTables) {
return -1;
}
for (int32_t i = startIndex; i < numOfTables; ++i) {
STableKeyInfo* p = taosArrayGet(pTableList->pTableList, i);
if (p->uid == uid) {
return i;
}
}
return -1;
}
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
ASSERT(pTableList->map != NULL && slot != NULL);

View File

@ -242,29 +242,27 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks,
return code;
}
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema) {
if (msg == NULL) {
// create raw scan
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, uint64_t id) {
if (msg == NULL) { // create raw scan
SExecTaskInfo* pTaskInfo = doCreateExecTaskInfo(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, "");
if (NULL == pTaskInfo) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTaskInfo->cost.created = taosGetTimestampUs();
pTaskInfo->execModel = OPTR_EXEC_MODEL_QUEUE;
pTaskInfo->pRoot = createRawScanOperatorInfo(pReaderHandle, pTaskInfo);
if (NULL == pTaskInfo->pRoot) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(pTaskInfo);
return NULL;
}
qDebug("create raw scan task info completed, vgId:%d, %s", vgId, GET_TASKID(pTaskInfo));
return pTaskInfo;
}
struct SSubplan* pPlan = NULL;
int32_t code = qStringToSubplan(msg, &pPlan);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
@ -292,9 +290,6 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
}
}
if (pSchema) {
*pSchema = tCloneSSchemaWrapper(((SExecTaskInfo*)pTaskInfo)->schemaInfo.qsw);
}
return pTaskInfo;
}
@ -410,6 +405,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
}
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
taosWLockLatch(&pTaskInfo->lock);
for (int32_t i = 0; i < numOfQualifiedTables; ++i) {
uint64_t* uid = taosArrayGet(qa, i);
@ -424,6 +420,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(keyBuf);
taosArrayDestroy(qa);
taosWUnLockLatch(&pTaskInfo->lock);
return code;
}
}
@ -445,6 +442,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
tableListAddTableInfo(pTableListInfo, keyInfo.uid, keyInfo.groupId);
}
taosWUnLockLatch(&pTaskInfo->lock);
if (keyBuf != NULL) {
taosMemoryFree(keyBuf);
}
@ -452,7 +450,9 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
taosArrayDestroy(qa);
} else { // remove the table id in current list
qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
taosWLockLatch(&pTaskInfo->lock);
code = tqReaderRemoveTbUidList(pScanInfo->tqReader, tableIdList);
taosWUnLockLatch(&pTaskInfo->lock);
}
return code;
@ -1000,6 +1000,7 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
}
return 0;
}
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
return pTaskInfo->streamInfo.recoverScanFinished;
@ -1076,6 +1077,8 @@ int32_t qStreamSetScanMemData(qTaskInfo_t tinfo, SPackedData submit) {
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
STableListInfo* pTableListInfo = pTaskInfo->pTableInfoList;
const char* id = GET_TASKID(pTaskInfo);
// if pOffset equal to current offset, means continue consume
if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.currentOffset)) {
@ -1086,21 +1089,24 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pOperator->status = OP_OPENED;
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
if(pOperator->numOfDownstream != 1){
qError("pOperator->numOfDownstream != 1:%d", pOperator->numOfDownstream);
return TSDB_CODE_TMQ_CONSUMER_ERROR;
if (pOperator->numOfDownstream != 1) {
qError("invalid operator, number of downstream:%d, %s", pOperator->numOfDownstream, id);
return -1;
}
pOperator = pOperator->pDownstream[0];
}
SStreamScanInfo* pInfo = pOperator->info;
STableScanInfo* pScanInfo = pInfo->pTableScanOp->info;
STableScanBase* pScanBaseInfo = &pScanInfo->base;
if (pOffset->type == TMQ_OFFSET__LOG) {
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
tsdbReaderClose(pTSInfo->base.dataReader);
pTSInfo->base.dataReader = NULL;
tsdbReaderClose(pScanBaseInfo->dataReader);
pScanBaseInfo->dataReader = NULL;
// set version to read for wal is next, so +1
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, pTaskInfo->id.str) < 0) {
qError("tqSeekVer failed ver:%" PRId64, pOffset->version + 1);
qError("tqSeekVer failed ver:%"PRId64", %s", pOffset->version + 1, id);
return -1;
}
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
@ -1108,71 +1114,80 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
// those data are from the snapshot in tsdb, besides the data in the wal file.
int64_t uid = pOffset->uid;
int64_t ts = pOffset->ts;
int32_t index = 0;
// this value may be changed if new tables are created
taosRLockLatch(&pTaskInfo->lock);
int32_t numOfTables = tableListGetSize(pTableListInfo);
if (uid == 0) {
if (tableListGetSize(pTaskInfo->pTableInfoList) != 0) {
STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
if (numOfTables != 0) {
STableKeyInfo* pTableInfo = tableListGetInfo(pTableListInfo, 0);
uid = pTableInfo->uid;
ts = INT64_MIN;
pScanInfo->currentTable = 0;
} else {
qError("uid == 0 and tablelist size is 0");
taosRUnLockLatch(&pTaskInfo->lock);
qError("no table in table list, %s", id);
return -1;
}
}
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 "% "PRId64 " rows returned", uid, ts, pInfo->pTableScanOp->resultInfo.totalRows);
pInfo->pTableScanOp->resultInfo.totalRows = 0;
bool found = false;
for (int32_t i = 0; i < numOfTables; i++) {
STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, i);
if (pTableInfo->uid == uid) {
found = true;
pTableScanInfo->currentTable = i;
break;
}
}
// start from current accessed position
// we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start
// position, let's find it from the beginning.
index = tableListFind(pTableListInfo, uid, 0);
taosRUnLockLatch(&pTaskInfo->lock);
// TODO after dropping table, table may not found
if(!found){
qError("uid not found in tablelist %" PRId64, uid);
return -1;
}
if (pTableScanInfo->base.dataReader == NULL) {
STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
int32_t num = tableListGetSize(pTaskInfo->pTableInfoList);
if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num,
pTableScanInfo->pResBlock, &pTableScanInfo->base.dataReader, NULL) < 0 ||
pTableScanInfo->base.dataReader == NULL) {
qError("tsdbReaderOpen failed. uid:%" PRIi64, pOffset->uid);
return -1;
}
}
STableKeyInfo tki = {.uid = uid};
tsdbSetTableList(pTableScanInfo->base.dataReader, &tki, 1);
int64_t oldSkey = pTableScanInfo->base.cond.twindows.skey;
pTableScanInfo->base.cond.twindows.skey = ts + 1;
tsdbReaderReset(pTableScanInfo->base.dataReader, &pTableScanInfo->base.cond);
pTableScanInfo->base.cond.twindows.skey = oldSkey;
pTableScanInfo->scanTimes = 0;
qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid,
ts, pTableScanInfo->currentTable, numOfTables);
if (index >= 0) {
pScanInfo->currentTable = index;
} else {
qError("invalid pOffset->type:%d", pOffset->type);
qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
numOfTables, pScanInfo->currentTable, id);
return -1;
}
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
STableKeyInfo keyInfo = {.uid = uid};
int64_t oldSkey = pScanBaseInfo->cond.twindows.skey;
// let's start from the next ts that returned to consumer.
pScanBaseInfo->cond.twindows.skey = ts + 1;
pScanInfo->scanTimes = 0;
if (pScanBaseInfo->dataReader == NULL) {
int32_t code = tsdbReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1,
pScanInfo->pResBlock, &pScanBaseInfo->dataReader, id, false);
if (code != TSDB_CODE_SUCCESS) {
qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
terrno = code;
return -1;
}
qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s",
uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
} else {
tsdbSetTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
tsdbReaderReset(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s",
uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
}
// restore the key value
pScanBaseInfo->cond.twindows.skey = oldSkey;
} else {
qError("invalid pOffset->type:%d, %s", pOffset->type, id);
return -1;
}
} else { // subType == TOPIC_SUB_TYPE__TABLE/TOPIC_SUB_TYPE__DB
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
SStreamRawScanInfo* pInfo = pOperator->info;
SSnapContext* sContext = pInfo->sContext;
if (setForSnapShot(sContext, pOffset->uid) != 0) {
qError("setDataForSnapShot error. uid:%" PRIi64, pOffset->uid);
qError("setDataForSnapShot error. uid:%" PRId64" , %s", pOffset->uid, id);
return -1;
}
@ -1181,7 +1196,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pInfo->dataReader = NULL;
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
tableListClear(pTaskInfo->pTableInfoList);
tableListClear(pTableListInfo);
if (mtInfo.uid == 0) {
return 0; // no data
@ -1190,23 +1205,19 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
if (pTaskInfo->pTableInfoList == NULL) {
pTaskInfo->pTableInfoList = tableListCreate();
}
tableListAddTableInfo(pTableListInfo, mtInfo.uid, 0);
tableListAddTableInfo(pTaskInfo->pTableInfoList, mtInfo.uid, 0);
STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
int32_t size = tableListGetSize(pTableListInfo);
STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
int32_t size = tableListGetSize(pTaskInfo->pTableInfoList);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL, false);
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
tDeleteSSchemaWrapper(pTaskInfo->streamInfo.schema);
pTaskInfo->streamInfo.schema = mtInfo.schema;
qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64, mtInfo.uid, pOffset->ts);
qDebug("tmqsnap qStreamPrepareScan snapshot data uid:%" PRId64 " ts %" PRId64" %s", mtInfo.uid, pOffset->ts, id);
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_META) {
SStreamRawScanInfo* pInfo = pOperator->info;
SSnapContext* sContext = pInfo->sContext;
@ -1214,12 +1225,13 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
qError("setForSnapShot error. uid:%" PRIu64 " ,version:%" PRId64, pOffset->uid, pOffset->version);
return -1;
}
qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64, pOffset->uid, pOffset->ts);
qDebug("tmqsnap qStreamPrepareScan snapshot meta uid:%" PRId64 " ts %" PRId64 " %s", pOffset->uid, pOffset->ts, id);
} else if (pOffset->type == TMQ_OFFSET__LOG) {
SStreamRawScanInfo* pInfo = pOperator->info;
tsdbReaderClose(pInfo->dataReader);
pInfo->dataReader = NULL;
qDebug("tmqsnap qStreamPrepareScan snapshot log");
qDebug("tmqsnap qStreamPrepareScan snapshot log, %s", id);
}
}
pTaskInfo->streamInfo.currentOffset = *pOffset;

View File

@ -1201,7 +1201,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
blockDataEnsureCapacity(pBlock, pBlock->info.rows + pRow->numOfRows);
qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s",
qDebug("datablock capacity not sufficient, expand to required:%" PRId64 ", current capacity:%d, %s",
(pRow->numOfRows+pBlock->info.rows),
pBlock->info.capacity, GET_TASKID(pTaskInfo));
// todo set the pOperator->resultInfo size
@ -1214,7 +1214,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
pBlock->info.rows += pRow->numOfRows;
}
qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
pBlock->info.id.groupId);
pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, 0);
@ -1974,7 +1974,7 @@ char* buildTaskId(uint64_t taskId, uint64_t queryId) {
return p;
}
static SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, char* dbFName) {
SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, char* dbFName) {
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
if (pTaskInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -1982,6 +1982,7 @@ static SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, in
}
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTaskInfo->cost.created = taosGetTimestampUs();
pTaskInfo->schemaInfo.dbname = taosStrdup(dbFName);
pTaskInfo->execModel = model;
@ -1989,6 +1990,7 @@ static SExecTaskInfo* doCreateExecTaskInfo(uint64_t queryId, uint64_t taskId, in
pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES);
taosInitRWLatch(&pTaskInfo->lock);
pTaskInfo->id.vgId = vgId;
pTaskInfo->id.queryId = queryId;
pTaskInfo->id.str = buildTaskId(taskId, queryId);
@ -2464,7 +2466,6 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand
goto _complete;
}
(*pTaskInfo)->cost.created = taosGetTimestampUs();
return TSDB_CODE_SUCCESS;
_complete:

View File

@ -267,8 +267,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
pOperator->status = OP_OPENED;
return NULL;
}
qDebug("set op close, exec mode:%d, status %d rows %d", pTaskInfo->execModel, pOperator->status,
pFinalRes->info.rows);
qDebug("set op close, exec %d, status %d rows %" PRId64 , pTaskInfo->execModel, pOperator->status, pFinalRes->info.rows);
setOperatorCompleted(pOperator);
break;
}
@ -333,7 +332,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) {
qDebug("project return %d rows, status %d", pFinalRes->info.rows, pOperator->status);
qDebug("project return %" PRId64 " rows, status %d", pFinalRes->info.rows, pOperator->status);
break;
}
} else {

View File

@ -31,6 +31,9 @@
#include "thash.h"
#include "ttypes.h"
int32_t scanDebug = 0;
#define MULTI_READER_MAX_TABLE_NUM 5000
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
@ -308,14 +311,14 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
taosMemoryFreeClear(pBlock->pBlockAgg);
if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 , GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->filterOutBlocks += 1;
pCost->totalRows += pBlock->info.rows;
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d, uid:%" PRIu64, GET_TASKID(pTaskInfo),
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", uid:%" PRIu64, GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, pBlockInfo->id.uid);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
pCost->skipBlocks += 1;
@ -326,7 +329,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
loadSMA = true; // mark the operation of load sma;
bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 , GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
@ -346,7 +349,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
size_t size = taosArrayGetSize(pBlock->pDataBlock);
bool keep = doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, size, pBlockInfo->rows);
if (!keep) {
qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 , GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->filterOutBlocks += 1;
(*status) = FUNC_DATA_REQUIRED_FILTEROUT;
@ -363,7 +366,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
// try to filter data block according to current results
doDynamicPruneDataBlock(pOperator, pBlockInfo, status);
if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 , GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->skipBlocks += 1;
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
@ -394,7 +397,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
if (pBlock->info.rows == 0) {
pCost->filterOutBlocks += 1;
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms",
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", elapsed time:%.2f ms",
GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
} else {
qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
@ -672,8 +675,9 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
continue;
}
ASSERT(pBlock->info.id.uid != 0);
if (pBlock->info.id.uid) {
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
}
uint32_t status = 0;
int32_t code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
@ -698,7 +702,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
// pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid;
// pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
ASSERT(pBlock->info.id.uid != 0);
return pBlock;
}
return NULL;
@ -779,13 +782,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
// if no data, switch to next table and continue scan
pInfo->currentTable++;
if (pInfo->currentTable >= numOfTables) {
qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
return NULL;
}
STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable);
tsdbSetTableList(pInfo->base.dataReader, pTableInfo, 1);
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d %s", pTableInfo->uid, numOfTables,
pInfo->currentTable, pTaskInfo->id.str);
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", pTableInfo->uid, numOfTables,
pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo));
tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
pInfo->scanTimes = 0;
@ -803,7 +807,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
ASSERT(pInfo->base.dataReader == NULL);
int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
(STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo));
(STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
@ -815,7 +819,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
SSDataBlock* result = doGroupedTableScan(pOperator);
if (result != NULL) {
ASSERT(result->info.id.uid != 0);
return result;
}
@ -936,6 +939,10 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
goto _error;
}
if (scanDebug) {
pInfo->countOnly = true;
}
taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
optrDefaultBufFn, getTableScannerExecInfo);
@ -1027,7 +1034,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
STsdbReader* pReader = NULL;
int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
(STsdbReader**)&pReader, GET_TASKID(pTaskInfo));
(STsdbReader**)&pReader, GET_TASKID(pTaskInfo), false);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
T_LONG_JMP(pTaskInfo->env, code);
@ -1049,7 +1056,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
}
tsdbReaderClose(pReader);
qDebug("retrieve prev rows:%d, skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
qDebug("retrieve prev rows:%" PRId64 ", skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
", suid:%" PRIu64,
pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);
@ -1596,8 +1603,9 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStreamScanInfo* pInfo = pOperator->info;
const char* id = GET_TASKID(pTaskInfo);
qDebug("start to exec queue scan");
qDebug("start to exec queue scan, %s", id);
if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
if (pInfo->tqReader->msg2.msgStr == NULL) {
@ -1635,7 +1643,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) {
qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64, pResult->info.rows,
qDebug("queue scan tsdb return %"PRId64" rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64, pResult->info.rows,
pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion);
tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey);
return pResult;
@ -1657,11 +1665,11 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pInfo->tqReader->pWalReader->curVersion - 1);  // curVersion has moved to the next version, so currentOffset = curVersion - 1
if (ret.fetchType == FETCH_TYPE__DATA) {
qDebug("doQueueScan get data from log %d rows, version:%" PRId64, pInfo->pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version);
qDebug("doQueueScan get data from log %"PRId64" rows, version:%" PRId64, pInfo->pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version);
blockDataCleanup(pInfo->pRes);
setBlockIntoRes(pInfo, &ret.data, true);
if (pInfo->pRes->info.rows > 0) {
qDebug("doQueueScan get data from log %d rows, return, version:%" PRId64, pInfo->pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version);
qDebug("doQueueScan get data from log %"PRId64" rows, return, version:%" PRId64, pInfo->pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version);
return pInfo->pRes;
}
}else if(ret.fetchType == FETCH_TYPE__NONE){
@ -1804,6 +1812,15 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
printDataBlock(pInfo->pUpdateRes, "recover update");
return pInfo->pUpdateRes;
} break;
case STREAM_SCAN_FROM_DELETE_DATA: {
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
printDataBlock(pInfo->pDeleteDataRes, "recover delete");
return pInfo->pDeleteDataRes;
} break;
case STREAM_SCAN_FROM_DATAREADER_RANGE: {
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (pSDB) {
@ -1840,7 +1857,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
return pInfo->pCreateTbRes;
}
qDebug("stream recover scan get block, rows %d", pInfo->pRecoverRes->info.rows);
qDebug("stream recover scan get block, rows %" PRId64 , pInfo->pRecoverRes->info.rows);
printDataBlock(pInfo->pRecoverRes, "scan recover");
return pInfo->pRecoverRes;
}
@ -1996,6 +2013,7 @@ FETCH_NEXT_BLOCK:
copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
blockDataCleanup(pSup->pScanBlock);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
return pInfo->pUpdateRes;
}
@ -2069,7 +2087,7 @@ FETCH_NEXT_BLOCK:
pOperator->resultInfo.totalRows += pBlockInfo->rows;
// printDataBlock(pInfo->pRes, "stream scan");
qDebug("scan rows: %d", pBlockInfo->rows);
qDebug("scan rows: %" PRId64 , pBlockInfo->rows);
if (pBlockInfo->rows > 0) {
return pInfo->pRes;
}
@ -2600,7 +2618,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
SReadHandle* pHandle = &pInfo->base.readHandle;
if (NULL == source->dataReader || !source->multiReader) {
code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo));
code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo), false);
if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code);
}
@ -2827,7 +2845,7 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
}
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
qDebug("%s get sorted row block, rows:%d, limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
pInfo->limitInfo.numOfOutputRows);
return (pResBlock->info.rows > 0) ? pResBlock : NULL;

View File

@ -698,7 +698,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
pDataBlock->info.dataLoad = 1;
}
qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId,
qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId,
pDataBlock->info.rows);
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;

View File

@ -2267,7 +2267,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
size_t num = tableListGetSize(pTableListInfo);
void* pList = tableListGetInfo(pTableListInfo, 0);
code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, &pInfo->pHandle, pTaskInfo->id.str);
code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, &pInfo->pHandle, pTaskInfo->id.str, false);
cleanupQueryTableDataCond(&cond);
if (code != 0) {
goto _error;

View File

@ -128,8 +128,9 @@ FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn
if (end >= 0) {
forwardRows = end;
if (pData[end + pos] == ekey) {
while (pData[end + pos] == ekey) {
forwardRows += 1;
++pos;
}
}
} else {
@ -137,8 +138,9 @@ FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn
if (end >= 0) {
forwardRows = end;
if (pData[end + pos] == ekey) {
while (pData[end + pos] == ekey) {
forwardRows += 1;
++pos;
}
}
// int32_t end = searchFn((char*)pData, pos + 1, ekey, order);

View File

@ -2457,7 +2457,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "interp",
.type = FUNCTION_TYPE_INTERP,
.classification = FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
FUNC_MGT_FORBID_STREAM_FUNC,
FUNC_MGT_FORBID_STREAM_FUNC|FUNC_MGT_KEEP_ORDER_FUNC,
.translateFunc = translateInterp,
.getEnvFunc = getSelectivityFuncEnv,
.initFunc = functionSetup,
@ -3278,7 +3278,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "_irowts",
.type = FUNCTION_TYPE_IROWTS,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_INTERP_PC_FUNC,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_INTERP_PC_FUNC|FUNC_MGT_KEEP_ORDER_FUNC,
.translateFunc = translateTimePseudoColumn,
.getEnvFunc = getTimePseudoFuncEnv,
.initFunc = NULL,

View File

@ -494,8 +494,8 @@ bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
return true;
}
static int32_t getNumOfElems(SqlFunctionCtx* pCtx) {
int32_t numOfElem = 0;
static int64_t getNumOfElems(SqlFunctionCtx* pCtx) {
int64_t numOfElem = 0;
/*
* 1. column data missing (schema modified) causes pInputCol->hasNull == true. pInput->colDataSMAIsSet == true;
@ -528,7 +528,7 @@ static int32_t getNumOfElems(SqlFunctionCtx* pCtx) {
* count function does not use the pCtx->interResBuf to keep the intermediate buffer
*/
int32_t countFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = 0;
int64_t numOfElem = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SInputColumnInfoData* pInput = &pCtx->input;
@ -555,7 +555,7 @@ int32_t countFunction(SqlFunctionCtx* pCtx) {
}
int32_t countInvertFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = getNumOfElems(pCtx);
int64_t numOfElem = getNumOfElems(pCtx);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
@ -871,6 +871,12 @@ int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STu
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
// group_key function has its own process function
// do not process it here
if (fmIsGroupKeyFunc(pc->functionId)) {
continue;
}
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
if (nullList[j]) {
colDataSetNULL(pDstCol, rowIndex);
@ -1929,7 +1935,7 @@ int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx) {
SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
qDebug("%s total %d rows will merge, %p", __FUNCTION__, pInput->numOfRows, pInfo->pHisto);
qDebug("%s total %" PRId64 " rows will merge, %p", __FUNCTION__, pInput->numOfRows, pInfo->pHisto);
int32_t start = pInput->startRowIndex;
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
@ -3091,6 +3097,12 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid
for (int32_t i = 0; i < pSubsidiaryies->num; ++i) {
SqlFunctionCtx* pc = pSubsidiaryies->pCtx[i];
// group_key function has its own process function
// do not process it here
if (fmIsGroupKeyFunc(pc->functionId)) {
continue;
}
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
int32_t srcSlotId = pFuncParam->pCol->slotId;

View File

@ -757,7 +757,7 @@ static bool isPrimaryKeyImpl(SNode* pExpr) {
if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType || FUNCTION_TYPE_GROUP_KEY == pFunc->funcType ||
FUNCTION_TYPE_FIRST == pFunc->funcType || FUNCTION_TYPE_LAST == pFunc->funcType) {
return isPrimaryKeyImpl(nodesListGetNode(pFunc->pParameterList, 0));
} else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType) {
} else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType || FUNCTION_TYPE_IROWTS == pFunc->funcType) {
return true;
}
}
@ -1634,13 +1634,15 @@ static bool isTableStar(SNode* pNode) {
(0 == strcmp(((SColumnNode*)pNode)->colName, "*"));
}
static bool isStarParam(SNode* pNode) { return isStar(pNode) || isTableStar(pNode); }
static int32_t translateMultiResFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
if (!fmIsMultiResFunc(pFunc->funcId)) {
return TSDB_CODE_SUCCESS;
}
if (SQL_CLAUSE_SELECT != pCxt->currClause) {
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
if (isStar(pPara) || isTableStar(pPara)) {
if (isStarParam(pPara)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
"%s(*) is only supported in SELECTed list", pFunc->functionName);
}
@ -1654,7 +1656,7 @@ static int32_t translateMultiResFunc(STranslateContext* pCxt, SFunctionNode* pFu
static int32_t getMultiResFuncNum(SNodeList* pParameterList) {
if (1 == LIST_LENGTH(pParameterList)) {
return isStar(nodesListGetNode(pParameterList, 0)) ? 2 : 1;
return isStarParam(nodesListGetNode(pParameterList, 0)) ? 2 : 1;
}
return LIST_LENGTH(pParameterList);
}

View File

@ -1089,9 +1089,15 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic
pExchange->srcStartGroupId = pExchangeLogicNode->srcStartGroupId;
pExchange->srcEndGroupId = pExchangeLogicNode->srcEndGroupId;
pExchange->seqRecvData = pExchangeLogicNode->seqRecvData;
*pPhyNode = (SPhysiNode*)pExchange;
return TSDB_CODE_SUCCESS;
int32_t code = setConditionsSlotId(pCxt, (const SLogicNode*)pExchangeLogicNode, (SPhysiNode*)pExchange);
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pExchange;
} else {
nodesDestroyNode((SNode*)pExchange);
}
return code;
}
static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
@ -1119,6 +1125,9 @@ static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExc
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pScan->pScanCols, pScan->node.pOutputDataBlockDesc);
}
if (TSDB_CODE_SUCCESS == code) {
code = setConditionsSlotId(pCxt, (const SLogicNode*)pExchangeLogicNode, (SPhysiNode*)pScan);
}
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pScan;

View File

@ -1365,7 +1365,8 @@ static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGrou
pExchange->srcEndGroupId = pCxt->groupId - 1;
pExchange->node.precision = pProject->node.precision;
pExchange->node.pTargets = nodesCloneList(pProject->node.pTargets);
if (NULL == pExchange->node.pTargets) {
pExchange->node.pConditions = nodesCloneNode(pProject->node.pConditions);
if (NULL == pExchange->node.pTargets || (NULL != pProject->node.pConditions && NULL == pExchange->node.pConditions)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
TSWAP(pExchange->node.pLimit, pProject->node.pLimit);

View File

@ -199,7 +199,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
QW_ERR_JRET(code);
}
QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue);
QW_TASK_DLOG("data put into sink, rows:%" PRId64 ", continueExecTask:%d", pRes->info.rows, qcontinue);
}
if (numOfResBlock == 0 || (hasMore == false)) {

View File

@ -10,21 +10,21 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py
@ -46,6 +46,11 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/concat.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/create_wrong_topic.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dropDbR3ConflictTransaction.py -N 3
@ -868,6 +873,8 @@
,,y,script,./test.sh -f tsim/query/session.sim
,,y,script,./test.sh -f tsim/query/udf.sim
,,y,script,./test.sh -f tsim/query/udf_with_const.sim
,,y,script,./test.sh -f tsim/query/join_interval.sim
,,y,script,./test.sh -f tsim/query/unionall_as_table.sim
,,y,script,./test.sh -f tsim/query/sys_tbname.sim
,,y,script,./test.sh -f tsim/query/groupby.sim
,,y,script,./test.sh -f tsim/query/event.sim

View File

@ -0,0 +1,42 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c udf -v 1
system sh/exec.sh -n dnode1 -s start
sql connect
print ======== step create databases
sql create database d1
sql create database d2
sql create table d1.t1(ts timestamp, i int) tags(t int);
sql create table d2.t1(ts timestamp, i int);
sql insert into d1.t11 using d1.t1 tags(1) values(1500000000000, 0)(1500000000001, 1)(1500000000002,2)(1500000000003,3)(1500000000004,4)
sql insert into d1.t12 using d1.t1 tags(2) values(1500000000000, 0)(1500000000001, 1)(1500000000002,2)(1500000000003,3)(1500000000004,4)
sql insert into d1.t13 using d1.t1 tags(3) values(1500000000000, 0)(1500000000001, 1)(1500000000002,2)(1500000000003,3)(1500000000004,4)
sql insert into d2.t1 values(1500000000000,0)(1500000000001,1)(1500000000002,2)
sql select _wstart,_wend,count((a.ts)),count(b.ts) from d1.t1 a, d2.t1 b where a.ts is not null and a.ts = b.ts interval(1a) ;
if $data02 != 3 then
return -1
endi
if $data03 != 3 then
return -1
endi
if $data12 != 3 then
return -1
endi
if $data13 != 3 then
return -1
endi
if $data22 != 3 then
return -1
endi
if $data23 != 3 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -0,0 +1,28 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
sql create database test;
sql use test;
sql CREATE STABLE bw_yc_h_substation_mea (ts TIMESTAMP, create_date VARCHAR(50), create_time VARCHAR(30), load_time TIMESTAMP, sum_p_value FLOAT, sum_sz_value FLOAT, sum_gl_ys FLOAT, sum_g_value FLOAT) TAGS (id VARCHAR(50), name NCHAR(200), datasource VARCHAR(50), sys_flag VARCHAR(50));
sql CREATE STABLE aw_yc_h_substation_mea (ts TIMESTAMP, create_date VARCHAR(50), create_time VARCHAR(30), load_time TIMESTAMP, sum_p_value FLOAT, sum_sz_value FLOAT, sum_gl_ys FLOAT, sum_g_value FLOAT) TAGS (id VARCHAR(50), name NCHAR(200), datasource VARCHAR(50), sys_flag VARCHAR(50));
sql CREATE STABLE dw_yc_h_substation_mea (ts TIMESTAMP, create_date VARCHAR(50), create_time VARCHAR(30), load_time TIMESTAMP, sum_p_value FLOAT, sum_sz_value FLOAT, sum_gl_ys FLOAT, sum_g_value FLOAT) TAGS (id VARCHAR(50), name NCHAR(200), datasource VARCHAR(50), sys_flag VARCHAR(50));
sql insert into t1 using dw_yc_h_substation_mea tags('1234567890','testa','0021001','abc01') values(now,'2023-03-27','00:01:00',now,2.3,3.3,4.4,5.5);
sql insert into t2 using dw_yc_h_substation_mea tags('2234567890','testb','0022001','abc02') values(now,'2023-03-27','00:01:00',now,2.3,2.3,2.4,2.5);
sql insert into t3 using aw_yc_h_substation_mea tags('2234567890','testc','0023001','abc03') values(now,'2023-03-27','00:15:00',now,2.3,2.3,2.4,2.5);
sql insert into t4 using bw_yc_h_substation_mea tags('4234567890','testd','0021001','abc03') values(now,'2023-03-27','00:45:00',now,2.3,2.3,2.4,2.5);
sql insert into t5 using bw_yc_h_substation_mea tags('5234567890','testd','0021001','abc03') values(now,'2023-03-27','00:00:00',now,2.3,2.3,2.4,2.5);
sql select t.ts,t.id,t.name,t.create_date,t.create_time,t.datasource,t.sum_p_value from (select ts,id,name,create_date,create_time,datasource,sum_p_value from bw_yc_h_substation_mea where create_date='2023-03-27' and substr(create_time,4,2) in ('00','15','30','45') union all select ts,id,name,create_date,create_time,datasource,sum_p_value from aw_yc_h_substation_mea where create_date='2023-03-27' and substr(create_time,4,2) in ('00','15','30','45') union all select ts,id,name,create_date,create_time,datasource,sum_p_value from dw_yc_h_substation_mea where create_date='2023-03-27' and substr(create_time,4,2) in ('00','15','30','45')) t where t.datasource='0021001' and t.id='4234567890' order by t.create_time;
if $rows != 1 then
return -1
endi
if $data01 != @4234567890@ then
return -1
endi
if $data05 != @0021001@ then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -4,6 +4,7 @@ system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step 1
print =============== create database
sql create database test vgroups 4;
sql select * from information_schema.ins_databases;
@ -33,8 +34,8 @@ if $loop_count == 10 then
endi
sql select * from streamt1;
print data00 data01
print data10 data11
print $data00 $data01
print $data10 $data11
if $rows != 0 then
print =====rows=$rows
@ -52,8 +53,8 @@ if $loop_count == 10 then
endi
sql select * from streamt1;
print data00 data01
print data10 data11
print $data00 $data01
print $data10 $data11
if $rows != 1 then
print =====rows=$rows
@ -92,9 +93,64 @@ endi
sql select * from streamt1;
if $rows != 2 then
print =====rows=$rows
goto loop2
goto loop3
endi
print step 1 over
print step 2
sql create database test2 vgroups 1;
sql use test2;
sql create table t1(ts timestamp, a int, b int , c int, d double);
print create stream streams2 trigger at_once watermark 1000s into streamt2 as select _wstart, count(*) c1, count(d) c2 from t1 partition by b state_window(a)
sql create stream streams2 trigger at_once watermark 1000s into streamt2 as select _wstart, count(*) c1, count(d) c2 from t1 partition by b state_window(a);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791213010,1,2,3,1.1);
$loop_count = 0
loop4:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt2;
print $data00 $data01
print $data10 $data11
if $rows != 1 then
print =====rows=$rows
goto loop4
endi
print insert into t1 values(1648791213005,2,2,3,1.1)
sql insert into t1 values(1648791213005,2,2,3,1.1);
$loop_count = 0
loop5:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print select * from streamt2
sql select * from streamt2;
print $data00 $data01
print $data10 $data11
print $data20 $data21
print $data30 $data31
if $rows != 3 then
print =====rows=$rows
goto loop5
endi
print step 2 over
print state1 end

View File

@ -292,7 +292,7 @@ class TDTestCase:
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
# time.sleep(2)
vgroups = "30"
vgroups = "8"
sql = "create database db3 vgroups " + vgroups
tdSql.query(sql)
sql = "create table db3.stb (ts timestamp, f int) tags (t int)"

View File

@ -181,7 +181,7 @@ class TDTestCase:
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
# time.sleep(2)
vgroups = "30"
vgroups = "8"
sql = "create database db3 vgroups " + vgroups
tdSql.query(sql)

View File

@ -14,7 +14,6 @@
"auto.offset.reset": "earliest",
"enable.auto.commit": "true",
"auto.commit.interval.ms": 1000,
"enable.heartbeat.background": "true",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "false",
"topic_list": [

View File

@ -83,7 +83,7 @@ class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
tdSql.init(conn.cursor(), True)
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
@ -147,9 +147,33 @@ class TDTestCase:
tdSql.checkData(1, 1, '55555')
tdSql.query("create table stb (ts timestamp, f1 int) tags (tg1 binary(2))")
keyDict['s'] = "\"alter table db1.tba add column f2 binary(5) \""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
tdSql.query("select * from tba order by ts")
tdSql.query("select * from tba order by ts")
tdSql.checkData(0, 2, None)
tdSql.checkData(1, 2, None)
keyDict['s'] = "\"alter table db1.tba add column f3 binary(5) \""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
tdSql.query("select f3 from tba order by ts")
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, None)
tdSql.query("create table stb (ts timestamp, f1 int, f2 binary(2)) tags (tg1 binary(2))")
tdSql.query("create table tb1 using stb tags('bb')")
tdSql.query("insert into tb1 values (now, 2)")
tdSql.query("insert into tb1 values (now, 2,'22')")
tdSql.query("select count(*) from stb group by tg1")
tdSql.checkData(0, 0, 1)
@ -163,13 +187,23 @@ class TDTestCase:
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
keyDict['s'] = "\"insert into db1.tb2 values (now, 2)\""
keyDict['s'] = "\"insert into db1.tb2 values (now, 2,'22')\""
retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
keyDict['s'] = "\"alter table db1.stb modify column f2 binary(5) \""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
keyDict['s'] = "\"insert into db1.tb2 values (now, 3,'55555')\""
retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
tdSql.query("select count(*) from stb group by tg1")
tdSql.checkData(0, 0, 1)
tdSql.checkData(0, 0, 2)
tdSql.checkData(1, 0, 1)

View File

@ -6144,7 +6144,7 @@ class TDTestCase:
startTime = time.time()
self.function_before_26()
#self.function_before_26()
self.math_nest(['UNIQUE'])
self.math_nest(['MODE'])
@ -6157,9 +6157,9 @@ class TDTestCase:
# self.math_nest(['MAVG'])
# self.math_nest(['HYPERLOGLOG'])
# self.math_nest(['TAIL'])
# self.math_nest(['CSUM'])
# self.math_nest(['statecount','stateduration'])
# self.math_nest(['HISTOGRAM'])
self.math_nest(['CSUM'])
self.math_nest(['statecount','stateduration'])
self.math_nest(['HISTOGRAM'])
# self.str_nest(['LTRIM','RTRIM','LOWER','UPPER'])
# self.str_nest(['LENGTH','CHAR_LENGTH'])

View File

@ -0,0 +1,76 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from util.cases import tdCases
from .nestedQuery import *
class TDTestCase(TDTestCase):
def run(self):
tdSql.prepare()
startTime = time.time()
self.function_before_26()
# self.math_nest(['UNIQUE'])
# self.math_nest(['MODE'])
# self.math_nest(['SAMPLE'])
# self.math_nest(['ABS','SQRT'])
# self.math_nest(['SIN','COS','TAN','ASIN','ACOS','ATAN'])
# self.math_nest(['POW','LOG'])
# self.math_nest(['FLOOR','CEIL','ROUND'])
# self.math_nest(['MAVG'])
# self.math_nest(['HYPERLOGLOG'])
# self.math_nest(['TAIL'])
# self.math_nest(['CSUM'])
# self.math_nest(['statecount','stateduration'])
# self.math_nest(['HISTOGRAM'])
# self.str_nest(['LTRIM','RTRIM','LOWER','UPPER'])
# self.str_nest(['LENGTH','CHAR_LENGTH'])
# self.str_nest(['SUBSTR'])
# self.str_nest(['CONCAT'])
# self.str_nest(['CONCAT_WS'])
# self.time_nest(['CAST'])  # handled in the time tests
# self.time_nest(['CAST_1'])
# self.time_nest(['CAST_2'])
# self.time_nest(['CAST_3'])
# self.time_nest(['CAST_4'])
# self.time_nest(['NOW','TODAY'])
# self.time_nest(['TIMEZONE'])
# self.time_nest(['TIMETRUNCATE'])
# self.time_nest(['TO_ISO8601'])
# self.time_nest(['TO_UNIXTIMESTAMP'])
# self.time_nest(['ELAPSED'])
#self.time_nest(['TIMEDIFF_1'])
#self.time_nest(['TIMEDIFF_2'])
endTime = time.time()
print("total time %ds" % (endTime - startTime))
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -34,9 +34,9 @@ class TDTestCase(TDTestCase):
self.math_nest(['MAVG'])
self.math_nest(['HYPERLOGLOG'])
self.math_nest(['TAIL'])
self.math_nest(['CSUM'])
self.math_nest(['statecount','stateduration'])
self.math_nest(['HISTOGRAM'])
# self.math_nest(['CSUM'])
# self.math_nest(['statecount','stateduration'])
# self.math_nest(['HISTOGRAM'])
# self.str_nest(['LTRIM','RTRIM','LOWER','UPPER'])
# self.str_nest(['LENGTH','CHAR_LENGTH'])

View File

@ -162,19 +162,18 @@ class TDTestCase:
sql = "select count(*) from (select distinct(tbname) from %s.meters)" %dbname
tdSql.query(sql)
num = tdSql.getData(0,0)
# not needed for now
# num = tdSql.getData(0,0)
# for i in range(0,num):
# sql1 = "select count(*) from %s.d%d" %(dbname,i)
# tdSql.query(sql1)
# sql1_result = tdSql.getData(0,0)
# tdLog.info("sql:%s , result: %s" %(sql1,sql1_result))
for i in range(0,num):
sql1 = "select count(*) from %s.d%d" %(dbname,i)
tdSql.query(sql1)
sql1_result = tdSql.getData(0,0)
tdLog.info("sql:%s , result: %s" %(sql1,sql1_result))
def check_out_of_order(self,dbname,tables,per_table_num,order,replica):
self.run_benchmark(dbname,tables,per_table_num,order,replica)
print("sleep 10 seconds")
#time.sleep(10)
print("sleep 10 seconds finish")
self.run_sql(dbname)
@ -182,7 +181,7 @@ class TDTestCase:
startTime = time.time()
#self.check_out_of_order('db1',10,random.randint(10000,50000),random.randint(1,10),1)
self.check_out_of_order('db1',random.randint(50,200),random.randint(10000,20000),random.randint(1,5),1)
self.check_out_of_order('db1',random.randint(50,100),random.randint(10000,20000),random.randint(1,5),1)
# self.check_out_of_order('db2',random.randint(50,200),random.randint(10000,50000),random.randint(5,50),1)

View File

@ -13,11 +13,11 @@ from util.dnodes import *
class TDTestCase:
hostname = socket.gethostname()
#rpcDebugFlagVal = '143'
# rpcDebugFlagVal = '143'
#clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
#updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
# updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
#print ("===================: ", updatecfgDict)
def init(self, conn, logSql, replicaVar=1):

View File

@ -211,7 +211,7 @@ class TDTestCase:
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2  # because of the taosd switchover, duplicate data may be consumed
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
@ -251,10 +251,11 @@ class TDTestCase:
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.info("expect consume rows: %d should less/equal than act consume rows: %d"%(expectRowsList[0], resultList[0]))
if expectRowsList[0] > resultList[0]:
tdLog.exit("0 tmq consume rows error!")
if expectRowsList[0] == resultList[0]:
self.checkFileContent(consumerId, queryString)
time.sleep(10)

View File

@ -16,6 +16,8 @@ sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
updatecfgDict = {"tsdbDebugFlag":135}
def __init__(self):
self.vgroups = 4
self.ctbNum = 10

View File

@ -850,7 +850,7 @@ int smlProcess_18784_Test() {
taos_free_result(pRes);
const char *sql[] = {
"disk,device=sdc inodes_used=176059i,total=1081101176832i 1661943960000000000",
"disk,device=sdc inodes_used=176059i,total=1076048383523889174i 1661943960000000000",
"disk,device=sdc inodes_free=66932805i 1661943960000000000",
};
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, 0);
@ -875,7 +875,7 @@ int smlProcess_18784_Test() {
if (rowIndex == 0) {
ASSERT(ts == 1661943960000);
ASSERT(used == 176059);
ASSERT(total == 1081101176832);
ASSERT(total == 1076048383523889174);
ASSERT(freed == 66932805);
// ASSERT_EQ(latitude, 24.5208);
// ASSERT_EQ(longitude, 28.09377);

View File

@ -542,7 +542,6 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "enable.heartbeat.background", "true");
if (g_conf.snapShot) {
tmq_conf_set(conf, "experimental.snapshot.enable", "true");