Merge branch 'main' into feature/3_liaohj

This commit is contained in:
Haojun Liao 2023-03-31 16:07:40 +08:00 committed by GitHub
commit 82d98f9c97
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 658 additions and 146 deletions

View File

@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG d8059ff
GIT_TAG cb1e89c
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG e82b9fc
GIT_TAG d194dc9
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -293,7 +293,6 @@ You configure the following parameters when creating a consumer:
| `auto.offset.reset` | enum | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) |
| `enable.auto.commit` | boolean | Commit automatically | Specify `true` or `false`. |
| `auto.commit.interval.ms` | integer | Interval for automatic commits, in milliseconds |
| `enable.heartbeat.background` | boolean | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | |
| `experimental.snapshot.enable` | boolean | Specify whether to consume messages from the WAL or from TSBS | |
| `msg.with.table.name` | boolean | Specify whether to deserialize table names from messages |
@ -368,7 +367,6 @@ conf := &tmq.ConfigMap{
"td.connect.port": "6030",
"client.id": "test_tmq_c",
"enable.auto.commit": "false",
"enable.heartbeat.background": "true",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "true",
}
@ -418,7 +416,6 @@ Python programs use the following parameters:
| `auto.commit.interval.ms` | string | Interval for automatic commits, in milliseconds | |
| `auto.offset.reset` | string | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) |
| `experimental.snapshot.enable` | string | Specify whether to consume messages from the WAL or from TSDB | Specify `true` or `false` |
| `enable.heartbeat.background` | string | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | Specify `true` or `false` |
</TabItem>

View File

@ -35,7 +35,6 @@ func main() {
"td.connect.port": "6030",
"client.id": "test_tmq_client",
"enable.auto.commit": "false",
"enable.heartbeat.background": "true",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "true",
})

View File

@ -291,7 +291,6 @@ CREATE TOPIC topic_name AS DATABASE db_name;
| `auto.offset.reset` | enum | 消费组订阅的初始位置 | 可选:`earliest`(default), `latest`, `none` |
| `enable.auto.commit` | boolean | 是否启用消费位点自动提交 | 合法值:`true`, `false`。 |
| `auto.commit.interval.ms` | integer | 以毫秒为单位的消费记录自动提交消费位点时间间 | 默认 5000 m |
| `enable.heartbeat.background` | boolean | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 默认开启 |
| `experimental.snapshot.enable` | boolean | 是否允许从 TSDB 消费数据 | 实验功能,默认关闭 |
| `msg.with.table.name` | boolean | 是否允许从消息中解析表名, 不适用于列订阅(列订阅时可将 tbname 作为列写入 subquery 语句) | |
@ -366,7 +365,6 @@ conf := &tmq.ConfigMap{
"td.connect.port": "6030",
"client.id": "test_tmq_c",
"enable.auto.commit": "false",
"enable.heartbeat.background": "true",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "true",
}
@ -418,7 +416,6 @@ consumer = Consumer({"group.id": "local", "td.connect.ip": "127.0.0.1"})
| `auto.commit.interval.ms` | string | 以毫秒为单位的自动提交时间间隔 | 默认值5000 ms |
| `auto.offset.reset` | string | 消费组订阅的初始位置 | 可选:`earliest`(default), `latest`, `none` |
| `experimental.snapshot.enable` | string | 是否允许从 TSDB 消费数据 | 合法值:`true`, `false` |
| `enable.heartbeat.background` | string | 启用后台心跳,启用后即使长时间不 poll 消息也不会造成离线 | 合法值:`true`, `false` |
</TabItem>

View File

@ -185,7 +185,7 @@ typedef struct SBlockID {
typedef struct SDataBlockInfo {
STimeWindow window;
int32_t rowSize;
int32_t rows; // todo hide this attribute
int64_t rows; // todo hide this attribute
uint32_t capacity;
SBlockID id;
int16_t hasVarCol;

View File

@ -112,7 +112,7 @@ typedef struct SResultDataInfo {
typedef struct SInputColumnInfoData {
int32_t totalRows; // total rows in current columnar data
int32_t startRowIndex; // handle started row index
int32_t numOfRows; // the number of rows needs to be handled
int64_t numOfRows; // the number of rows needs to be handled
int32_t numOfInputCols; // PTS is not included
bool colDataSMAIsSet; // if agg is set or not
SColumnInfoData *pPTS; // primary timestamp column

View File

@ -257,19 +257,14 @@ cleanup:
kvVal->f = (float)result;
#define SET_BIGINT \
if (smlDoubleToInt64OverFlow(result)) { \
errno = 0; \
int64_t tmp = taosStr2Int64(pVal, &endptr, 10); \
if (errno == ERANGE) { \
smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal); \
return false; \
} \
kvVal->type = TSDB_DATA_TYPE_BIGINT; \
kvVal->i = tmp; \
return true; \
} \
kvVal->type = TSDB_DATA_TYPE_BIGINT; \
kvVal->i = (int64_t)result;
errno = 0; \
int64_t tmp = taosStr2Int64(pVal, &endptr, 10); \
if (errno == ERANGE) { \
smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal); \
return false; \
} \
kvVal->type = TSDB_DATA_TYPE_BIGINT; \
kvVal->i = tmp;
#define SET_INT \
if (!IS_VALID_INT(result)) { \
@ -288,19 +283,14 @@ cleanup:
kvVal->i = result;
#define SET_UBIGINT \
if (result >= (double)UINT64_MAX || result < 0) { \
errno = 0; \
uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10); \
if (errno == ERANGE || result < 0) { \
smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal); \
return false; \
} \
kvVal->type = TSDB_DATA_TYPE_UBIGINT; \
kvVal->u = tmp; \
return true; \
} \
kvVal->type = TSDB_DATA_TYPE_UBIGINT; \
kvVal->u = result;
errno = 0; \
uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10); \
if (errno == ERANGE || result < 0) { \
smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal); \
return false; \
} \
kvVal->type = TSDB_DATA_TYPE_UBIGINT; \
kvVal->u = tmp;
#define SET_UINT \
if (!IS_VALID_UINT(result)) { \

View File

@ -324,15 +324,16 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
}
if (strcasecmp(key, "enable.heartbeat.background") == 0) {
if (strcasecmp(value, "true") == 0) {
conf->hbBgEnable = true;
return TMQ_CONF_OK;
} else if (strcasecmp(value, "false") == 0) {
conf->hbBgEnable = false;
return TMQ_CONF_OK;
} else {
// if (strcasecmp(value, "true") == 0) {
// conf->hbBgEnable = true;
// return TMQ_CONF_OK;
// } else if (strcasecmp(value, "false") == 0) {
// conf->hbBgEnable = false;
// return TMQ_CONF_OK;
// } else {
tscError("the default value of enable.heartbeat.background is true, can not be seted");
return TMQ_CONF_INVALID;
}
// }
}
if (strcasecmp(key, "td.connect.ip") == 0) {

View File

@ -982,7 +982,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
taosSort(pColInfoData->pData, pDataBlock->info.rows, pColInfoData->info.bytes, fn);
int64_t p1 = taosGetTimestampUs();
uDebug("blockDataSort easy cost:%" PRId64 ", rows:%d\n", p1 - p0, pDataBlock->info.rows);
uDebug("blockDataSort easy cost:%" PRId64 ", rows:%" PRId64 "\n", p1 - p0, pDataBlock->info.rows);
return TSDB_CODE_SUCCESS;
} else { // var data type
@ -1189,7 +1189,7 @@ void blockDataCleanup(SSDataBlock* pDataBlock) {
void blockDataEmpty(SSDataBlock* pDataBlock) {
SDataBlockInfo* pInfo = &pDataBlock->info;
if (pInfo->capacity == 0 || pInfo->rows > pDataBlock->info.capacity) {
if (pInfo->capacity == 0) {
return;
}
@ -1748,14 +1748,14 @@ int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
int64_t tbUid = pBlock->info.id.uid;
int16_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
int16_t hasVarCol = pBlock->info.hasVarCol;
int32_t rows = pBlock->info.rows;
int64_t rows = pBlock->info.rows;
int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
int32_t tlen = 0;
tlen += taosEncodeFixedI64(buf, tbUid);
tlen += taosEncodeFixedI16(buf, numOfCols);
tlen += taosEncodeFixedI16(buf, hasVarCol);
tlen += taosEncodeFixedI32(buf, rows);
tlen += taosEncodeFixedI64(buf, rows);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SColumnInfoData* pColData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
@ -1786,7 +1786,7 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
buf = taosDecodeFixedU64(buf, &pBlock->info.id.uid);
buf = taosDecodeFixedI16(buf, &numOfCols);
buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol);
buf = taosDecodeFixedI32(buf, &pBlock->info.rows);
buf = taosDecodeFixedI64(buf, &pBlock->info.rows);
buf = taosDecodeFixedI32(buf, &sz);
pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
for (int32_t i = 0; i < sz; i++) {
@ -1990,7 +1990,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
int32_t len = 0;
len += snprintf(dumpBuf + len, size - len,
"===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRId64
"|rows:%d|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n",
"|rows:%" PRId64 "|version:%" PRIu64 "|cal start:%" PRIu64 "|cal end:%" PRIu64 "|tbl:%s\n",
flag, (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.id.groupId,
pDataBlock->info.id.uid, pDataBlock->info.rows, pDataBlock->info.version,
pDataBlock->info.calWin.skey, pDataBlock->info.calWin.ekey, pDataBlock->info.parTbName);

View File

@ -2453,6 +2453,11 @@ int32_t tColDataAddValueByDataBlock(SColData *pColData, int8_t type, int32_t byt
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
if (code) goto _exit;
} else {
if(ASSERT(varDataTLen(data + offset) <= bytes)){
uError("var data length invalid, varDataTLen(data + offset):%d <= bytes:%d", (int)varDataTLen(data + offset), bytes);
code = TSDB_CODE_INVALID_PARA;
goto _exit;
}
code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_VALUE](pColData, (uint8_t *)varDataVal(data + offset),
varDataLen(data + offset));
}

View File

@ -178,7 +178,7 @@ typedef struct STsdbReader STsdbReader;
int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num);
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr);
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly);
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr);
void tsdbReaderClose(STsdbReader *pReader);

View File

@ -224,6 +224,8 @@ int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward,
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter);
bool tsdbTbDataIterNext(STbDataIter *pIter);
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum);
// STbData
int32_t tsdbGetNRowsInTbData(STbData *pTbData);
// tsdbFile.c ==============================================================================================

View File

@ -669,7 +669,7 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
#endif
for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
SSDataBlock *output = taosArrayGetP(pResList, i);
smaDebug("result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%d", output->info.id.uid,
smaDebug("result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRId64, output->info.id.uid,
output->info.id.groupId, output->info.rows);
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);

View File

@ -340,7 +340,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
continue;
}
ret->fetchType = FETCH_TYPE__DATA;
tqDebug("return data rows %d", ret->data.info.rows);
tqDebug("return data rows %" PRId64, ret->data.info.rows);
return 0;
}

View File

@ -114,7 +114,7 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
pRsp->blockNum++;
tqDebug("vgId:%d, consumer:0x%" PRIx64 " tmq task executed, rows:%d, total blocks:%d", vgId, pHandle->consumerId,
tqDebug("vgId:%d, consumer:0x%" PRIx64 " tmq task executed, rows:%" PRId64 ", total blocks:%d", vgId, pHandle->consumerId,
pDataBlock->info.rows, pRsp->blockNum);
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {

View File

@ -282,6 +282,40 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) {
return true;
}
int64_t tsdbCountTbDataRows(STbData *pTbData) {
SMemSkipListNode *pNode = pTbData->sl.pHead;
int64_t rowsNum = 0;
while (NULL != pNode) {
pNode = SL_GET_NODE_FORWARD(pNode, 0);
if (pNode == pTbData->sl.pTail) {
return rowsNum;
}
rowsNum++;
}
return rowsNum;
}
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum) {
taosRLockLatch(&pMemTable->latch);
for (int32_t i = 0; i < pMemTable->nBucket; ++i) {
STbData *pTbData = pMemTable->aBucket[i];
while (pTbData) {
void* p = taosHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid));
if (p == NULL) {
pTbData = pTbData->next;
continue;
}
*rowsNum += tsdbCountTbDataRows(pTbData);
pTbData = pTbData->next;
}
}
taosRUnLockLatch(&pMemTable->latch);
}
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
int32_t code = 0;

View File

@ -25,6 +25,11 @@ typedef enum {
EXTERNAL_ROWS_NEXT = 0x3,
} EContentData;
typedef enum {
READ_MODE_COUNT_ONLY = 0x1,
READ_MODE_ALL,
} EReadMode;
typedef struct {
STbDataIter* iter;
int32_t index;
@ -168,6 +173,8 @@ struct STsdbReader {
uint64_t suid;
int16_t order;
bool freeBlock;
EReadMode readMode;
uint64_t rowsNum;
STimeWindow window; // the primary query time window that applies to all queries
SSDataBlock* pResBlock;
int32_t capacity;
@ -1777,7 +1784,7 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
setComposedBlockFlag(pReader, true);
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%d, brange:%" PRId64
tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%" PRId64 ", brange:%" PRId64
" - %" PRId64 ", uid:%" PRIu64 ", %s",
pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey,
pBlockScanInfo->uid, pReader->idStr);
@ -2770,7 +2777,7 @@ _end:
if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
" rows:%d, elapsed time:%.2f ms %s",
" rows:%" PRId64 ", elapsed time:%.2f ms %s",
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr);
}
@ -3022,7 +3029,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
" rows:%d, elapsed time:%.2f ms %s",
" rows:%" PRId64 ", elapsed time:%.2f ms %s",
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr);
return TSDB_CODE_SUCCESS;
@ -3106,7 +3113,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
" rows:%d, elapsed time:%.2f ms %s",
" rows:%" PRId64 ", elapsed time:%.2f ms %s",
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr);
}
@ -3132,6 +3139,151 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
return code;
}
static int32_t doSumFileBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) {
int64_t st = taosGetTimestampUs();
LRUHandle* handle = NULL;
int32_t code = tsdbCacheGetBlockIdx(pFileReader->pTsdb->biCache, pFileReader, &handle);
if (code != TSDB_CODE_SUCCESS || handle == NULL) {
goto _end;
}
int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
SArray* aBlockIdx = (SArray*)taosLRUCacheValue(pFileReader->pTsdb->biCache, handle);
size_t num = taosArrayGetSize(aBlockIdx);
if (num == 0) {
tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
return TSDB_CODE_SUCCESS;
}
SBlockIdx* pBlockIdx = NULL;
int32_t i = 0;
for (int32_t i = 0; i < num; ++i) {
pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);
if (pBlockIdx->suid != pReader->suid) {
continue;
}
STableBlockScanInfo** p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(pBlockIdx->uid));
if (p == NULL) {
continue;
}
STableBlockScanInfo *pScanInfo = *p;
tMapDataReset(&pScanInfo->mapData);
tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData);
SDataBlk block = {0};
for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) {
tGetDataBlk(pScanInfo->mapData.pData + pScanInfo->mapData.aOffset[j], &block);
pReader->rowsNum += block.nRow;
}
}
_end:
tsdbBICacheRelease(pFileReader->pTsdb->biCache, handle);
return code;
}
static int32_t doSumSttBlockRows(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
SSttBlockLoadInfo* pBlockLoadInfo = NULL;
for (int32_t i = 0; i < pReader->pFileReader->pSet->nSttF; ++i) { // open all last file
pBlockLoadInfo = &pLastBlockReader->pInfo[i];
code = tsdbReadSttBlk(pReader->pFileReader, i, pBlockLoadInfo->aSttBlk);
if (code) {
return code;
}
size_t size = taosArrayGetSize(pBlockLoadInfo->aSttBlk);
if (size >= 1) {
SSttBlk *pStart = taosArrayGet(pBlockLoadInfo->aSttBlk, 0);
SSttBlk *pEnd = taosArrayGet(pBlockLoadInfo->aSttBlk, size - 1);
// all identical
if (pStart->suid == pEnd->suid) {
if (pStart->suid != pReader->suid) {
// no qualified stt block existed
taosArrayClear(pBlockLoadInfo->aSttBlk);
continue;
}
for (int32_t i = 0; i < size; ++i) {
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
pReader->rowsNum += p->nRow;
}
} else {
for (int32_t i = 0; i < size; ++i) {
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
uint64_t s = p->suid;
if (s < pReader->suid) {
continue;
}
if (s == pReader->suid) {
pReader->rowsNum += p->nRow;
} else if (s > pReader->suid) {
break;
}
}
}
}
}
return code;
}
static int32_t readRowsCountFromFiles(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
while (1) {
bool hasNext = false;
int32_t code = filesetIteratorNext(&pReader->status.fileIter, pReader, &hasNext);
if (code) {
return code;
}
if (!hasNext) { // no data files on disk
break;
}
code = doSumFileBlockRows(pReader, pReader->pFileReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = doSumSttBlockRows(pReader);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
pReader->status.loadFromFile = false;
return code;
}
static int32_t readRowsCountFromMem(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t memNum = 0, imemNum = 0;
if (pReader->pReadSnap->pMem != NULL) {
tsdbMemTableCountRows(pReader->pReadSnap->pMem, pReader->status.pTableMap, &memNum);
}
if (pReader->pReadSnap->pIMem != NULL) {
tsdbMemTableCountRows(pReader->pReadSnap->pIMem, pReader->status.pTableMap, &imemNum);
}
pReader->rowsNum += memNum + imemNum;
return code;
}
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status;
STableUidList* pUidList = &pStatus->uidList;
@ -4072,6 +4224,8 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
if (pStatus->fileIter.numOfFiles == 0) {
pStatus->loadFromFile = false;
} else if (READ_MODE_COUNT_ONLY == pReader->readMode) {
// DO NOTHING
} else {
code = initForFirstBlockInFile(pReader, pBlockIter);
}
@ -4090,7 +4244,7 @@ static void freeSchemaFunc(void* param) {
// ====================================== EXPOSED APIs ======================================
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr) {
SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr, bool countOnly) {
STimeWindow window = pCond->twindows;
if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
pCond->twindows.skey += 1;
@ -4190,6 +4344,10 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
pReader->suspended = true;
if (countOnly) {
pReader->readMode = READ_MODE_COUNT_ONLY;
}
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
return code;
@ -4490,6 +4648,33 @@ _err:
return code;
}
static bool tsdbReadRowsCountOnly(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pBlock = pReader->pResBlock;
if (pReader->status.loadFromFile == false) {
return false;
}
code = readRowsCountFromFiles(pReader);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
code = readRowsCountFromMem(pReader);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
pBlock->info.rows = pReader->rowsNum;
pBlock->info.id.uid = 0;
pBlock->info.dataLoad = 0;
pReader->rowsNum = 0;
return pBlock->info.rows > 0;
}
static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) {
int32_t code = TSDB_CODE_SUCCESS;
@ -4504,6 +4689,10 @@ static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) {
return code;
}
if (READ_MODE_COUNT_ONLY == pReader->readMode) {
return tsdbReadRowsCountOnly(pReader);
}
if (pStatus->loadFromFile) {
code = buildBlockFromFiles(pReader);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -336,6 +336,7 @@ typedef struct STableScanInfo {
int8_t scanMode;
int8_t assignBlockUid;
bool hasGroupByTag;
bool countOnly;
} STableScanInfo;
typedef struct STableMergeScanInfo {

View File

@ -212,6 +212,11 @@ static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) {
return NULL;
}
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
if (blockDataGetNumOfRows(pBlock) == 0) {
continue;
}
SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
if (hasLimitOffsetInfo(pLimitInfo)) {
int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
@ -303,6 +308,11 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
pTaskInfo);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pDummyBlock->pDataBlock);
code = filterInitFromNode((SNode*)pExNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
pOperator->fpSet =
createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo, optrDefaultBufFn, NULL);
return pOperator;

View File

@ -571,6 +571,10 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
memcpy(pStart, data, len);
pStart += len;
} else if (IS_VAR_DATA_TYPE(pValue->info.type)) {
if (varDataTLen(data) > pValue->info.bytes) {
code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
goto end;
}
memcpy(pStart, data, varDataTLen(data));
pStart += varDataTLen(data);
} else {

View File

@ -1167,7 +1167,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
if (pScanBaseInfo->dataReader == NULL) {
int32_t code = tsdbReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1,
pScanInfo->pResBlock, &pScanBaseInfo->dataReader, id);
pScanInfo->pResBlock, &pScanBaseInfo->dataReader, id, false);
if (code != TSDB_CODE_SUCCESS) {
qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
terrno = code;
@ -1218,7 +1218,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);
int32_t size = tableListGetSize(pTableListInfo);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL);
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL, false);
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);

View File

@ -1201,7 +1201,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
blockDataEnsureCapacity(pBlock, pBlock->info.rows + pRow->numOfRows);
qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s",
qDebug("datablock capacity not sufficient, expand to required:%" PRId64 ", current capacity:%d, %s",
(pRow->numOfRows+pBlock->info.rows),
pBlock->info.capacity, GET_TASKID(pTaskInfo));
// todo set the pOperator->resultInfo size
@ -1214,7 +1214,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
pBlock->info.rows += pRow->numOfRows;
}
qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
pBlock->info.id.groupId);
pBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pBlock, 0);

View File

@ -271,7 +271,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
return NULL;
}
}
qDebug("set op close, exec %d, status %d rows %d", pTaskInfo->execModel, pOperator->status,
qDebug("set op close, exec %d, status %d rows %" PRId64 , pTaskInfo->execModel, pOperator->status,
pFinalRes->info.rows);
setOperatorCompleted(pOperator);
break;
@ -337,7 +337,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
// when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint.
if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) {
qDebug("project return %d rows, status %d", pFinalRes->info.rows, pOperator->status);
qDebug("project return %" PRId64 " rows, status %d", pFinalRes->info.rows, pOperator->status);
break;
}
} else {

View File

@ -31,6 +31,9 @@
#include "thash.h"
#include "ttypes.h"
int32_t scanDebug = 0;
#define MULTI_READER_MAX_TABLE_NUM 5000
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
@ -308,14 +311,14 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
taosMemoryFreeClear(pBlock->pBlockAgg);
if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 , GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->filterOutBlocks += 1;
pCost->totalRows += pBlock->info.rows;
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d, uid:%" PRIu64, GET_TASKID(pTaskInfo),
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", uid:%" PRIu64, GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, pBlockInfo->id.uid);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
pCost->skipBlocks += 1;
@ -326,7 +329,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
loadSMA = true; // mark the operation of load sma;
bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 , GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
@ -346,7 +349,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
size_t size = taosArrayGetSize(pBlock->pDataBlock);
bool keep = doFilterByBlockSMA(pOperator->exprSupp.pFilterInfo, pBlock->pBlockAgg, size, pBlockInfo->rows);
if (!keep) {
qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
qDebug("%s data block filter out by block SMA, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 , GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->filterOutBlocks += 1;
(*status) = FUNC_DATA_REQUIRED_FILTEROUT;
@ -363,7 +366,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
// try to filter data block according to current results
doDynamicPruneDataBlock(pOperator, pBlockInfo, status);
if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 , GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->skipBlocks += 1;
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
@ -394,7 +397,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
if (pBlock->info.rows == 0) {
pCost->filterOutBlocks += 1;
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms",
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%" PRId64 ", elapsed time:%.2f ms",
GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
} else {
qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
@ -672,8 +675,9 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
continue;
}
ASSERT(pBlock->info.id.uid != 0);
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
if (pBlock->info.id.uid) {
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
}
uint32_t status = 0;
int32_t code = loadDataBlock(pOperator, &pTableScanInfo->base, pBlock, &status);
@ -698,7 +702,6 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
pTaskInfo->streamInfo.lastStatus.uid = pBlock->info.id.uid;
pTaskInfo->streamInfo.lastStatus.ts = pBlock->info.window.ekey;
ASSERT(pBlock->info.id.uid != 0);
return pBlock;
}
return NULL;
@ -804,7 +807,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
ASSERT(pInfo->base.dataReader == NULL);
int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
(STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo));
(STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
@ -816,7 +819,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
SSDataBlock* result = doGroupedTableScan(pOperator);
if (result != NULL) {
ASSERT(result->info.id.uid != 0);
return result;
}
@ -936,6 +938,10 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
goto _error;
}
if (scanDebug) {
pInfo->countOnly = true;
}
taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
optrDefaultBufFn, getTableScannerExecInfo);
@ -1027,7 +1033,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
STsdbReader* pReader = NULL;
int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
(STsdbReader**)&pReader, GET_TASKID(pTaskInfo));
(STsdbReader**)&pReader, GET_TASKID(pTaskInfo), false);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
T_LONG_JMP(pTaskInfo->env, code);
@ -1049,7 +1055,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
}
tsdbReaderClose(pReader);
qDebug("retrieve prev rows:%d, skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
qDebug("retrieve prev rows:%" PRId64 ", skey:%" PRId64 ", ekey:%" PRId64 " uid:%" PRIu64 ", max ver:%" PRId64
", suid:%" PRIu64,
pBlock->info.rows, startTs, endTs, tbUid, maxVersion, cond.suid);
@ -1639,7 +1645,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) {
qDebug("queue scan tsdb return %d rows brange:%" PRId64 " - %" PRId64 " wal curVersion:%" PRId64" %s", pResult->info.rows,
qDebug("queue scan tsdb return %" PRId64 " rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64" %s", pResult->info.rows,
pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion, id);
pTaskInfo->streamInfo.returned = 1;
return pResult;
@ -1680,7 +1686,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
setBlockIntoRes(pInfo, &ret.data, true);
if (pInfo->pRes->info.rows > 0) {
pOperator->status = OP_EXEC_RECV;
qDebug("queue scan log return %d rows", pInfo->pRes->info.rows);
qDebug("queue scan log return %" PRId64 " rows", pInfo->pRes->info.rows);
return pInfo->pRes;
}
} else if (ret.fetchType == FETCH_TYPE__META) {
@ -1831,6 +1837,15 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
printDataBlock(pInfo->pUpdateRes, "recover update");
return pInfo->pUpdateRes;
} break;
case STREAM_SCAN_FROM_DELETE_DATA: {
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
copyDataBlock(pInfo->pDeleteDataRes, pInfo->pUpdateRes);
pInfo->pDeleteDataRes->info.type = STREAM_DELETE_DATA;
printDataBlock(pInfo->pDeleteDataRes, "recover delete");
return pInfo->pDeleteDataRes;
} break;
case STREAM_SCAN_FROM_DATAREADER_RANGE: {
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
if (pSDB) {
@ -1867,7 +1882,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
printDataBlock(pInfo->pCreateTbRes, "recover createTbl");
return pInfo->pCreateTbRes;
}
qDebug("stream recover scan get block, rows %d", pInfo->pRecoverRes->info.rows);
qDebug("stream recover scan get block, rows %" PRId64 , pInfo->pRecoverRes->info.rows);
printDataBlock(pInfo->pRecoverRes, "scan recover");
return pInfo->pRecoverRes;
}
@ -2023,6 +2038,7 @@ FETCH_NEXT_BLOCK:
copyDataBlock(pInfo->pUpdateRes, pSup->pScanBlock);
blockDataCleanup(pSup->pScanBlock);
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
return pInfo->pUpdateRes;
}
@ -2096,7 +2112,7 @@ FETCH_NEXT_BLOCK:
pOperator->resultInfo.totalRows += pBlockInfo->rows;
// printDataBlock(pInfo->pRes, "stream scan");
qDebug("scan rows: %d", pBlockInfo->rows);
qDebug("scan rows: %" PRId64 , pBlockInfo->rows);
if (pBlockInfo->rows > 0) {
return pInfo->pRes;
}
@ -2637,7 +2653,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
SReadHandle* pHandle = &pInfo->base.readHandle;
if (NULL == source->dataReader || !source->multiReader) {
code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo));
code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo), false);
if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code);
}
@ -2864,7 +2880,7 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
}
bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
qDebug("%s get sorted row block, rows:%d, limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
qDebug("%s get sorted row block, rows:%" PRId64 ", limit:%" PRId64, GET_TASKID(pTaskInfo), pResBlock->info.rows,
pInfo->limitInfo.numOfOutputRows);
return (pResBlock->info.rows > 0) ? pResBlock : NULL;

View File

@ -698,7 +698,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
pDataBlock->info.dataLoad = 1;
}
qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%d", GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId,
qDebug("%s get sorted block, groupId:0x%" PRIx64 " rows:%" PRId64 , GET_TASKID(pTaskInfo), pDataBlock->info.id.groupId,
pDataBlock->info.rows);
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;

View File

@ -2267,7 +2267,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
size_t num = tableListGetSize(pTableListInfo);
void* pList = tableListGetInfo(pTableListInfo, 0);
code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, &pInfo->pHandle, pTaskInfo->id.str);
code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, &pInfo->pHandle, pTaskInfo->id.str, false);
cleanupQueryTableDataCond(&cond);
if (code != 0) {
goto _error;

View File

@ -128,8 +128,9 @@ FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn
if (end >= 0) {
forwardRows = end;
if (pData[end + pos] == ekey) {
while (pData[end + pos] == ekey) {
forwardRows += 1;
++pos;
}
}
} else {
@ -137,8 +138,9 @@ FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn
if (end >= 0) {
forwardRows = end;
if (pData[end + pos] == ekey) {
while (pData[end + pos] == ekey) {
forwardRows += 1;
++pos;
}
}
// int32_t end = searchFn((char*)pData, pos + 1, ekey, order);

View File

@ -2457,7 +2457,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "interp",
.type = FUNCTION_TYPE_INTERP,
.classification = FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_INTERVAL_INTERPO_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
FUNC_MGT_FORBID_STREAM_FUNC,
FUNC_MGT_FORBID_STREAM_FUNC|FUNC_MGT_KEEP_ORDER_FUNC,
.translateFunc = translateInterp,
.getEnvFunc = getSelectivityFuncEnv,
.initFunc = functionSetup,
@ -3278,7 +3278,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "_irowts",
.type = FUNCTION_TYPE_IROWTS,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_INTERP_PC_FUNC,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_INTERP_PC_FUNC|FUNC_MGT_KEEP_ORDER_FUNC,
.translateFunc = translateTimePseudoColumn,
.getEnvFunc = getTimePseudoFuncEnv,
.initFunc = NULL,

View File

@ -494,8 +494,8 @@ bool getCountFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
return true;
}
static int32_t getNumOfElems(SqlFunctionCtx* pCtx) {
int32_t numOfElem = 0;
static int64_t getNumOfElems(SqlFunctionCtx* pCtx) {
int64_t numOfElem = 0;
/*
* 1. column data missing (schema modified) causes pInputCol->hasNull == true. pInput->colDataSMAIsSet == true;
@ -528,7 +528,7 @@ static int32_t getNumOfElems(SqlFunctionCtx* pCtx) {
* count function does not use the pCtx->interResBuf to keep the intermediate buffer
*/
int32_t countFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = 0;
int64_t numOfElem = 0;
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SInputColumnInfoData* pInput = &pCtx->input;
@ -555,7 +555,7 @@ int32_t countFunction(SqlFunctionCtx* pCtx) {
}
int32_t countInvertFunction(SqlFunctionCtx* pCtx) {
int32_t numOfElem = getNumOfElems(pCtx);
int64_t numOfElem = getNumOfElems(pCtx);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
@ -871,6 +871,12 @@ int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STu
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
// group_key function has its own process function
// do not process there
if (fmIsGroupKeyFunc(pc->functionId)) {
continue;
}
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
if (nullList[j]) {
colDataSetNULL(pDstCol, rowIndex);
@ -1929,7 +1935,7 @@ int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx) {
SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
qDebug("%s total %d rows will merge, %p", __FUNCTION__, pInput->numOfRows, pInfo->pHisto);
qDebug("%s total %" PRId64 " rows will merge, %p", __FUNCTION__, pInput->numOfRows, pInfo->pHisto);
int32_t start = pInput->startRowIndex;
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
@ -3091,6 +3097,12 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid
for (int32_t i = 0; i < pSubsidiaryies->num; ++i) {
SqlFunctionCtx* pc = pSubsidiaryies->pCtx[i];
// group_key function has its own process function
// do not process there
if (fmIsGroupKeyFunc(pc->functionId)) {
continue;
}
SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0];
int32_t srcSlotId = pFuncParam->pCol->slotId;

View File

@ -757,7 +757,7 @@ static bool isPrimaryKeyImpl(SNode* pExpr) {
if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType || FUNCTION_TYPE_GROUP_KEY == pFunc->funcType ||
FUNCTION_TYPE_FIRST == pFunc->funcType || FUNCTION_TYPE_LAST == pFunc->funcType) {
return isPrimaryKeyImpl(nodesListGetNode(pFunc->pParameterList, 0));
} else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType) {
} else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType || FUNCTION_TYPE_IROWTS == pFunc->funcType) {
return true;
}
}
@ -1634,13 +1634,15 @@ static bool isTableStar(SNode* pNode) {
(0 == strcmp(((SColumnNode*)pNode)->colName, "*"));
}
static bool isStarParam(SNode* pNode) { return isStar(pNode) || isTableStar(pNode); }
static int32_t translateMultiResFunc(STranslateContext* pCxt, SFunctionNode* pFunc) {
if (!fmIsMultiResFunc(pFunc->funcId)) {
return TSDB_CODE_SUCCESS;
}
if (SQL_CLAUSE_SELECT != pCxt->currClause) {
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
if (isStar(pPara) || isTableStar(pPara)) {
if (isStarParam(pPara)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
"%s(*) is only supported in SELECTed list", pFunc->functionName);
}
@ -1654,7 +1656,7 @@ static int32_t translateMultiResFunc(STranslateContext* pCxt, SFunctionNode* pFu
static int32_t getMultiResFuncNum(SNodeList* pParameterList) {
if (1 == LIST_LENGTH(pParameterList)) {
return isStar(nodesListGetNode(pParameterList, 0)) ? 2 : 1;
return isStarParam(nodesListGetNode(pParameterList, 0)) ? 2 : 1;
}
return LIST_LENGTH(pParameterList);
}

View File

@ -1089,9 +1089,15 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic
pExchange->srcStartGroupId = pExchangeLogicNode->srcStartGroupId;
pExchange->srcEndGroupId = pExchangeLogicNode->srcEndGroupId;
pExchange->seqRecvData = pExchangeLogicNode->seqRecvData;
*pPhyNode = (SPhysiNode*)pExchange;
return TSDB_CODE_SUCCESS;
int32_t code = setConditionsSlotId(pCxt, (const SLogicNode*)pExchangeLogicNode, (SPhysiNode*)pExchange);
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pExchange;
} else {
nodesDestroyNode((SNode*)pExchange);
}
return code;
}
static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExchangeLogicNode* pExchangeLogicNode,
@ -1119,6 +1125,9 @@ static int32_t createStreamScanPhysiNodeByExchange(SPhysiPlanContext* pCxt, SExc
if (TSDB_CODE_SUCCESS == code) {
code = addDataBlockSlots(pCxt, pScan->pScanCols, pScan->node.pOutputDataBlockDesc);
}
if (TSDB_CODE_SUCCESS == code) {
code = setConditionsSlotId(pCxt, (const SLogicNode*)pExchangeLogicNode, (SPhysiNode*)pScan);
}
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pScan;

View File

@ -1365,7 +1365,8 @@ static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGrou
pExchange->srcEndGroupId = pCxt->groupId - 1;
pExchange->node.precision = pProject->node.precision;
pExchange->node.pTargets = nodesCloneList(pProject->node.pTargets);
if (NULL == pExchange->node.pTargets) {
pExchange->node.pConditions = nodesCloneNode(pProject->node.pConditions);
if (NULL == pExchange->node.pTargets || (NULL != pProject->node.pConditions && NULL == pExchange->node.pConditions)) {
return TSDB_CODE_OUT_OF_MEMORY;
}
TSWAP(pExchange->node.pLimit, pProject->node.pLimit);

View File

@ -199,7 +199,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
QW_ERR_JRET(code);
}
QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue);
QW_TASK_DLOG("data put into sink, rows:%" PRId64 ", continueExecTask:%d", pRes->info.rows, qcontinue);
}
if (numOfResBlock == 0 || (hasMore == false)) {

View File

@ -10,21 +10,21 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py
@ -46,6 +46,11 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/concat.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/out_of_order.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/create_wrong_topic.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dropDbR3ConflictTransaction.py -N 3
@ -868,6 +873,8 @@
,,y,script,./test.sh -f tsim/query/session.sim
,,y,script,./test.sh -f tsim/query/udf.sim
,,y,script,./test.sh -f tsim/query/udf_with_const.sim
,,y,script,./test.sh -f tsim/query/join_interval.sim
,,y,script,./test.sh -f tsim/query/unionall_as_table.sim
,,y,script,./test.sh -f tsim/query/sys_tbname.sim
,,y,script,./test.sh -f tsim/query/groupby.sim
,,y,script,./test.sh -f tsim/query/event.sim

View File

@ -0,0 +1,42 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c udf -v 1
system sh/exec.sh -n dnode1 -s start
sql connect
print ======== step create databases
sql create database d1
sql create database d2
sql create table d1.t1(ts timestamp, i int) tags(t int);
sql create table d2.t1(ts timestamp, i int);
sql insert into d1.t11 using d1.t1 tags(1) values(1500000000000, 0)(1500000000001, 1)(1500000000002,2)(1500000000003,3)(1500000000004,4)
sql insert into d1.t12 using d1.t1 tags(2) values(1500000000000, 0)(1500000000001, 1)(1500000000002,2)(1500000000003,3)(1500000000004,4)
sql insert into d1.t13 using d1.t1 tags(3) values(1500000000000, 0)(1500000000001, 1)(1500000000002,2)(1500000000003,3)(1500000000004,4)
sql insert into d2.t1 values(1500000000000,0)(1500000000001,1)(1500000000002,2)
sql select _wstart,_wend,count((a.ts)),count(b.ts) from d1.t1 a, d2.t1 b where a.ts is not null and a.ts = b.ts interval(1a) ;
if $data02 != 3 then
return -1
endi
if $data03 != 3 then
return -1
endi
if $data12 != 3 then
return -1
endi
if $data13 != 3 then
return -1
endi
if $data22 != 3 then
return -1
endi
if $data23 != 3 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -0,0 +1,28 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
sql create database test;
sql use test;
sql CREATE STABLE bw_yc_h_substation_mea (ts TIMESTAMP, create_date VARCHAR(50), create_time VARCHAR(30), load_time TIMESTAMP, sum_p_value FLOAT, sum_sz_value FLOAT, sum_gl_ys FLOAT, sum_g_value FLOAT) TAGS (id VARCHAR(50), name NCHAR(200), datasource VARCHAR(50), sys_flag VARCHAR(50));
sql CREATE STABLE aw_yc_h_substation_mea (ts TIMESTAMP, create_date VARCHAR(50), create_time VARCHAR(30), load_time TIMESTAMP, sum_p_value FLOAT, sum_sz_value FLOAT, sum_gl_ys FLOAT, sum_g_value FLOAT) TAGS (id VARCHAR(50), name NCHAR(200), datasource VARCHAR(50), sys_flag VARCHAR(50));
sql CREATE STABLE dw_yc_h_substation_mea (ts TIMESTAMP, create_date VARCHAR(50), create_time VARCHAR(30), load_time TIMESTAMP, sum_p_value FLOAT, sum_sz_value FLOAT, sum_gl_ys FLOAT, sum_g_value FLOAT) TAGS (id VARCHAR(50), name NCHAR(200), datasource VARCHAR(50), sys_flag VARCHAR(50));
sql insert into t1 using dw_yc_h_substation_mea tags('1234567890','testa','0021001','abc01') values(now,'2023-03-27','00:01:00',now,2.3,3.3,4.4,5.5);
sql insert into t2 using dw_yc_h_substation_mea tags('2234567890','testb','0022001','abc02') values(now,'2023-03-27','00:01:00',now,2.3,2.3,2.4,2.5);
sql insert into t3 using aw_yc_h_substation_mea tags('2234567890','testc','0023001','abc03') values(now,'2023-03-27','00:15:00',now,2.3,2.3,2.4,2.5);
sql insert into t4 using bw_yc_h_substation_mea tags('4234567890','testd','0021001','abc03') values(now,'2023-03-27','00:45:00',now,2.3,2.3,2.4,2.5);
sql insert into t5 using bw_yc_h_substation_mea tags('5234567890','testd','0021001','abc03') values(now,'2023-03-27','00:00:00',now,2.3,2.3,2.4,2.5);
sql select t.ts,t.id,t.name,t.create_date,t.create_time,t.datasource,t.sum_p_value from (select ts,id,name,create_date,create_time,datasource,sum_p_value from bw_yc_h_substation_mea where create_date='2023-03-27' and substr(create_time,4,2) in ('00','15','30','45') union all select ts,id,name,create_date,create_time,datasource,sum_p_value from aw_yc_h_substation_mea where create_date='2023-03-27' and substr(create_time,4,2) in ('00','15','30','45') union all select ts,id,name,create_date,create_time,datasource,sum_p_value from dw_yc_h_substation_mea where create_date='2023-03-27' and substr(create_time,4,2) in ('00','15','30','45')) t where t.datasource='0021001' and t.id='4234567890' order by t.create_time;
if $rows != 1 then
return -1
endi
if $data01 != @4234567890@ then
return -1
endi
if $data05 != @0021001@ then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -4,6 +4,7 @@ system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
print step 1
print =============== create database
sql create database test vgroups 4;
sql select * from information_schema.ins_databases;
@ -33,8 +34,8 @@ if $loop_count == 10 then
endi
sql select * from streamt1;
print data00 data01
print data10 data11
print $data00 $data01
print $data10 $data11
if $rows != 0 then
print =====rows=$rows
@ -52,8 +53,8 @@ if $loop_count == 10 then
endi
sql select * from streamt1;
print data00 data01
print data10 data11
print $data00 $data01
print $data10 $data11
if $rows != 1 then
print =====rows=$rows
@ -92,9 +93,64 @@ endi
sql select * from streamt1;
if $rows != 2 then
print =====rows=$rows
goto loop2
goto loop3
endi
print step 1 over
print step 2
sql create database test2 vgroups 1;
sql use test2;
sql create table t1(ts timestamp, a int, b int , c int, d double);
print create stream streams2 trigger at_once watermark 1000s into streamt2 as select _wstart, count(*) c1, count(d) c2 from t1 partition by b state_window(a)
sql create stream streams2 trigger at_once watermark 1000s into streamt2 as select _wstart, count(*) c1, count(d) c2 from t1 partition by b state_window(a);
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791213010,1,2,3,1.1);
$loop_count = 0
loop4:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
sql select * from streamt2;
print $data00 $data01
print $data10 $data11
if $rows != 1 then
print =====rows=$rows
goto loop4
endi
print insert into t1 values(1648791213005,2,2,3,1.1)
sql insert into t1 values(1648791213005,2,2,3,1.1);
$loop_count = 0
loop5:
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
print select * from streamt2
sql select * from streamt2;
print $data00 $data01
print $data10 $data11
print $data20 $data21
print $data30 $data31
if $rows != 3 then
print =====rows=$rows
goto loop5
endi
print step 2 over
print state1 end

View File

@ -292,7 +292,7 @@ class TDTestCase:
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
# time.sleep(2)
vgroups = "30"
vgroups = "8"
sql = "create database db3 vgroups " + vgroups
tdSql.query(sql)
sql = "create table db3.stb (ts timestamp, f int) tags (t int)"

View File

@ -181,7 +181,7 @@ class TDTestCase:
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
# time.sleep(2)
vgroups = "30"
vgroups = "8"
sql = "create database db3 vgroups " + vgroups
tdSql.query(sql)

View File

@ -14,7 +14,6 @@
"auto.offset.reset": "earliest",
"enable.auto.commit": "true",
"auto.commit.interval.ms": 1000,
"enable.heartbeat.background": "true",
"experimental.snapshot.enable": "true",
"msg.with.table.name": "false",
"topic_list": [

View File

@ -147,9 +147,33 @@ class TDTestCase:
tdSql.checkData(1, 1, '55555')
tdSql.query("create table stb (ts timestamp, f1 int) tags (tg1 binary(2))")
keyDict['s'] = "\"alter table db1.tba add column f2 binary(5) \""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
tdSql.query("select * from tba order by ts")
tdSql.query("select * from tba order by ts")
tdSql.checkData(0, 2, None)
tdSql.checkData(1, 2, None)
keyDict['s'] = "\"alter table db1.tba add column f3 binary(5) \""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
tdSql.query("select f3 from tba order by ts")
tdSql.checkData(0, 0, None)
tdSql.checkData(1, 0, None)
tdSql.query("create table stb (ts timestamp, f1 int, f2 binary(2)) tags (tg1 binary(2))")
tdSql.query("create table tb1 using stb tags('bb')")
tdSql.query("insert into tb1 values (now, 2)")
tdSql.query("insert into tb1 values (now, 2,'22')")
tdSql.query("select count(*) from stb group by tg1")
tdSql.checkData(0, 0, 1)
@ -163,13 +187,23 @@ class TDTestCase:
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
keyDict['s'] = "\"insert into db1.tb2 values (now, 2)\""
keyDict['s'] = "\"insert into db1.tb2 values (now, 2,'22')\""
retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
keyDict['s'] = "\"alter table db1.stb modify column f2 binary(5) \""
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
keyDict['s'] = "\"insert into db1.tb2 values (now, 3,'55555')\""
retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", '')
if retCode != "TAOS_OK":
tdLog.exit("taos -s fail")
tdSql.query("select count(*) from stb group by tg1")
tdSql.checkData(0, 0, 1)
tdSql.checkData(0, 0, 2)
tdSql.checkData(1, 0, 1)

View File

@ -6144,7 +6144,7 @@ class TDTestCase:
startTime = time.time()
self.function_before_26()
#self.function_before_26()
self.math_nest(['UNIQUE'])
self.math_nest(['MODE'])
@ -6157,9 +6157,9 @@ class TDTestCase:
# self.math_nest(['MAVG'])
# self.math_nest(['HYPERLOGLOG'])
# self.math_nest(['TAIL'])
# self.math_nest(['CSUM'])
# self.math_nest(['statecount','stateduration'])
# self.math_nest(['HISTOGRAM'])
self.math_nest(['CSUM'])
self.math_nest(['statecount','stateduration'])
self.math_nest(['HISTOGRAM'])
# self.str_nest(['LTRIM','RTRIM','LOWER','UPPER'])
# self.str_nest(['LENGTH','CHAR_LENGTH'])

View File

@ -0,0 +1,76 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from util.cases import tdCases
from .nestedQuery import *
class TDTestCase(TDTestCase):
def run(self):
tdSql.prepare()
startTime = time.time()
self.function_before_26()
# self.math_nest(['UNIQUE'])
# self.math_nest(['MODE'])
# self.math_nest(['SAMPLE'])
# self.math_nest(['ABS','SQRT'])
# self.math_nest(['SIN','COS','TAN','ASIN','ACOS','ATAN'])
# self.math_nest(['POW','LOG'])
# self.math_nest(['FLOOR','CEIL','ROUND'])
# self.math_nest(['MAVG'])
# self.math_nest(['HYPERLOGLOG'])
# self.math_nest(['TAIL'])
# self.math_nest(['CSUM'])
# self.math_nest(['statecount','stateduration'])
# self.math_nest(['HISTOGRAM'])
# self.str_nest(['LTRIM','RTRIM','LOWER','UPPER'])
# self.str_nest(['LENGTH','CHAR_LENGTH'])
# self.str_nest(['SUBSTR'])
# self.str_nest(['CONCAT'])
# self.str_nest(['CONCAT_WS'])
# self.time_nest(['CAST']) #放到time里起来弄
# self.time_nest(['CAST_1'])
# self.time_nest(['CAST_2'])
# self.time_nest(['CAST_3'])
# self.time_nest(['CAST_4'])
# self.time_nest(['NOW','TODAY'])
# self.time_nest(['TIMEZONE'])
# self.time_nest(['TIMETRUNCATE'])
# self.time_nest(['TO_ISO8601'])
# self.time_nest(['TO_UNIXTIMESTAMP'])
# self.time_nest(['ELAPSED'])
#self.time_nest(['TIMEDIFF_1'])
#self.time_nest(['TIMEDIFF_2'])
endTime = time.time()
print("total time %ds" % (endTime - startTime))
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -34,9 +34,9 @@ class TDTestCase(TDTestCase):
self.math_nest(['MAVG'])
self.math_nest(['HYPERLOGLOG'])
self.math_nest(['TAIL'])
self.math_nest(['CSUM'])
self.math_nest(['statecount','stateduration'])
self.math_nest(['HISTOGRAM'])
# self.math_nest(['CSUM'])
# self.math_nest(['statecount','stateduration'])
# self.math_nest(['HISTOGRAM'])
# self.str_nest(['LTRIM','RTRIM','LOWER','UPPER'])
# self.str_nest(['LENGTH','CHAR_LENGTH'])

View File

@ -162,19 +162,18 @@ class TDTestCase:
sql = "select count(*) from (select distinct(tbname) from %s.meters)" %dbname
tdSql.query(sql)
num = tdSql.getData(0,0)
# 目前不需要了
# num = tdSql.getData(0,0)
# for i in range(0,num):
# sql1 = "select count(*) from %s.d%d" %(dbname,i)
# tdSql.query(sql1)
# sql1_result = tdSql.getData(0,0)
# tdLog.info("sql:%s , result: %s" %(sql1,sql1_result))
for i in range(0,num):
sql1 = "select count(*) from %s.d%d" %(dbname,i)
tdSql.query(sql1)
sql1_result = tdSql.getData(0,0)
tdLog.info("sql:%s , result: %s" %(sql1,sql1_result))
def check_out_of_order(self,dbname,tables,per_table_num,order,replica):
self.run_benchmark(dbname,tables,per_table_num,order,replica)
print("sleep 10 seconds")
#time.sleep(10)
print("sleep 10 seconds finish")
self.run_sql(dbname)
@ -182,7 +181,7 @@ class TDTestCase:
startTime = time.time()
#self.check_out_of_order('db1',10,random.randint(10000,50000),random.randint(1,10),1)
self.check_out_of_order('db1',random.randint(50,200),random.randint(10000,20000),random.randint(1,5),1)
self.check_out_of_order('db1',random.randint(50,100),random.randint(10000,20000),random.randint(1,5),1)
# self.check_out_of_order('db2',random.randint(50,200),random.randint(10000,50000),random.randint(5,50),1)

View File

@ -211,7 +211,7 @@ class TDTestCase:
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 # because taosd switch, may be consume duplication data
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
@ -251,11 +251,12 @@ class TDTestCase:
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.info("expect consume rows: %d should less/equal than act consume rows: %d"%(expectRowsList[0], resultList[0]))
if expectRowsList[0] > resultList[0]:
tdLog.exit("0 tmq consume rows error!")
self.checkFileContent(consumerId, queryString)
if expectRowsList[0] == resultList[0]:
self.checkFileContent(consumerId, queryString)
time.sleep(10)
for i in range(len(topicNameList)):

View File

@ -850,7 +850,7 @@ int smlProcess_18784_Test() {
taos_free_result(pRes);
const char *sql[] = {
"disk,device=sdc inodes_used=176059i,total=1081101176832i 1661943960000000000",
"disk,device=sdc inodes_used=176059i,total=1076048383523889174i 1661943960000000000",
"disk,device=sdc inodes_free=66932805i 1661943960000000000",
};
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, 0);
@ -875,7 +875,7 @@ int smlProcess_18784_Test() {
if (rowIndex == 0) {
ASSERT(ts == 1661943960000);
ASSERT(used == 176059);
ASSERT(total == 1081101176832);
ASSERT(total == 1076048383523889174);
ASSERT(freed == 66932805);
// ASSERT_EQ(latitude, 24.5208);
// ASSERT_EQ(longitude, 28.09377);

View File

@ -542,7 +542,6 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "enable.heartbeat.background", "true");
if (g_conf.snapShot) {
tmq_conf_set(conf, "experimental.snapshot.enable", "true");