Merge pull request #20674 from taosdata/fix/TS-3010
fix: query schema is old issue
This commit is contained in:
commit
0659814d16
|
@ -178,7 +178,7 @@ int32_t getJsonValueLen(const char* data);
|
||||||
|
|
||||||
int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
|
int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
|
||||||
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
|
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, bool isNull);
|
||||||
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows);
|
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const char* pData, uint32_t numOfRows, bool trimValue);
|
||||||
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
|
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int32_t* capacity,
|
||||||
const SColumnInfoData* pSource, int32_t numOfRow2);
|
const SColumnInfoData* pSource, int32_t numOfRow2);
|
||||||
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
|
int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows,
|
||||||
|
|
|
@ -147,9 +147,17 @@ int32_t colDataReserve(SColumnInfoData* pColumnInfoData, size_t newSize) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData,
|
static int32_t doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t currentRow, const char* pData,
|
||||||
int32_t itemLen, int32_t numOfRows) {
|
int32_t itemLen, int32_t numOfRows, bool trimValue) {
|
||||||
ASSERT(pColumnInfoData->info.bytes >= itemLen);
|
if (pColumnInfoData->info.bytes < itemLen) {
|
||||||
|
uWarn("column/tag actual data len %d is bigger than schema len %d, trim it:%d", itemLen, pColumnInfoData->info.bytes, trimValue);
|
||||||
|
if (trimValue) {
|
||||||
|
itemLen = pColumnInfoData->info.bytes;
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
size_t start = 1;
|
size_t start = 1;
|
||||||
|
|
||||||
// the first item
|
// the first item
|
||||||
|
@ -178,10 +186,12 @@ static void doCopyNItems(struct SColumnInfoData* pColumnInfoData, int32_t curren
|
||||||
|
|
||||||
pColumnInfoData->varmeta.length += numOfRows * itemLen;
|
pColumnInfoData->varmeta.length += numOfRows * itemLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData,
|
int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData,
|
||||||
uint32_t numOfRows) {
|
uint32_t numOfRows, bool trimValue) {
|
||||||
int32_t len = pColumnInfoData->info.bytes;
|
int32_t len = pColumnInfoData->info.bytes;
|
||||||
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
|
||||||
len = varDataTLen(pData);
|
len = varDataTLen(pData);
|
||||||
|
@ -193,8 +203,7 @@ int32_t colDataSetNItems(SColumnInfoData* pColumnInfoData, uint32_t currentRow,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
doCopyNItems(pColumnInfoData, currentRow, pData, len, numOfRows);
|
return doCopyNItems(pColumnInfoData, currentRow, pData, len, numOfRows, trimValue);
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource,
|
static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource,
|
||||||
|
|
|
@ -182,7 +182,7 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableL
|
||||||
|
|
||||||
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr);
|
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr);
|
||||||
void tsdbReaderClose(STsdbReader *pReader);
|
void tsdbReaderClose(STsdbReader *pReader);
|
||||||
bool tsdbNextDataBlock(STsdbReader *pReader);
|
int32_t tsdbNextDataBlock(STsdbReader *pReader, bool *hasNext);
|
||||||
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
|
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
|
||||||
void tsdbReleaseDataBlock(STsdbReader *pReader);
|
void tsdbReleaseDataBlock(STsdbReader *pReader);
|
||||||
SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
||||||
|
|
|
@ -949,14 +949,17 @@ static void setBlockAllDumped(SFileBlockDumpInfo* pDumpInfo, int64_t maxKey, int
|
||||||
pDumpInfo->lastKey = maxKey + step;
|
pDumpInfo->lastKey = maxKey + step;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
|
static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal,
|
||||||
SBlockLoadSuppInfo* pSup) {
|
SBlockLoadSuppInfo* pSup) {
|
||||||
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
if (IS_VAR_DATA_TYPE(pColVal->type)) {
|
||||||
if (!COL_VAL_IS_VALUE(pColVal)) {
|
if (!COL_VAL_IS_VALUE(pColVal)) {
|
||||||
colDataSetNULL(pColInfoData, rowIndex);
|
colDataSetNULL(pColInfoData, rowIndex);
|
||||||
} else {
|
} else {
|
||||||
varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
|
varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData);
|
||||||
ASSERT(pColVal->value.nData <= pColInfoData->info.bytes);
|
if (pColVal->value.nData > pColInfoData->info.bytes) {
|
||||||
|
tsdbWarn("column cid:%d actual data len %d is bigger than schema len %d", pColVal->cid, pColVal->value.nData, pColInfoData->info.bytes);
|
||||||
|
return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
|
||||||
|
}
|
||||||
if (pColVal->value.nData > 0) { // pData may be null, if nData is 0
|
if (pColVal->value.nData > 0) { // pData may be null, if nData is 0
|
||||||
memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
|
memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData);
|
||||||
}
|
}
|
||||||
|
@ -966,6 +969,8 @@ static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_
|
||||||
} else {
|
} else {
|
||||||
colDataSetVal(pColInfoData, rowIndex, (const char*)&pColVal->value, !COL_VAL_IS_VALUE(pColVal));
|
colDataSetVal(pColInfoData, rowIndex, (const char*)&pColVal->value, !COL_VAL_IS_VALUE(pColVal));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
|
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
|
||||||
|
@ -1167,6 +1172,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
|
||||||
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
|
SDataBlk* pBlock = getCurrentBlock(pBlockIter);
|
||||||
SSDataBlock* pResBlock = pReader->pResBlock;
|
SSDataBlock* pResBlock = pReader->pResBlock;
|
||||||
int32_t numOfOutputCols = pSupInfo->numOfCols;
|
int32_t numOfOutputCols = pSupInfo->numOfCols;
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
SColVal cv = {0};
|
SColVal cv = {0};
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
@ -1244,7 +1250,10 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
|
||||||
} else { // varchar/nchar type
|
} else { // varchar/nchar type
|
||||||
for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step) {
|
for (int32_t j = pDumpInfo->rowIndex; rowIndex < dumpedRows; j += step) {
|
||||||
tColDataGetValue(pData, j, &cv);
|
tColDataGetValue(pData, j, &cv);
|
||||||
doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
|
code = doCopyColVal(pColData, rowIndex++, i, &cv, pSupInfo);
|
||||||
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1776,23 +1785,29 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
|
static bool tryCopyDistinctRowFromFileBlock(STsdbReader* pReader, SBlockData* pBlockData, int64_t key,
|
||||||
SFileBlockDumpInfo* pDumpInfo) {
|
SFileBlockDumpInfo* pDumpInfo, bool *copied) {
|
||||||
// opt version
|
// opt version
|
||||||
// 1. it is not a border point
|
// 1. it is not a border point
|
||||||
// 2. the direct next point is not an duplicated timestamp
|
// 2. the direct next point is not an duplicated timestamp
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
*copied = false;
|
||||||
bool asc = (pReader->order == TSDB_ORDER_ASC);
|
bool asc = (pReader->order == TSDB_ORDER_ASC);
|
||||||
if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && asc) || (pDumpInfo->rowIndex > 0 && (!asc))) {
|
if ((pDumpInfo->rowIndex < pDumpInfo->totalRows - 1 && asc) || (pDumpInfo->rowIndex > 0 && (!asc))) {
|
||||||
int32_t step = pReader->order == TSDB_ORDER_ASC ? 1 : -1;
|
int32_t step = pReader->order == TSDB_ORDER_ASC ? 1 : -1;
|
||||||
|
|
||||||
int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
|
int64_t nextKey = pBlockData->aTSKEY[pDumpInfo->rowIndex + step];
|
||||||
if (nextKey != key) { // merge is not needed
|
if (nextKey != key) { // merge is not needed
|
||||||
doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
|
code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, pBlockData, pDumpInfo->rowIndex);
|
||||||
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
pDumpInfo->rowIndex += step;
|
pDumpInfo->rowIndex += step;
|
||||||
return true;
|
*copied = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo,
|
static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo,
|
||||||
|
@ -1819,20 +1834,34 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
|
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SLastBlockReader* pLastBlockReader,
|
||||||
STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader) {
|
STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader, bool *copied) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
*copied = false;
|
||||||
|
|
||||||
bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);
|
bool hasVal = nextRowFromLastBlocks(pLastBlockReader, pScanInfo, &pReader->verRange);
|
||||||
if (hasVal) {
|
if (hasVal) {
|
||||||
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
|
int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||||
if (next1 != ts) {
|
if (next1 != ts) {
|
||||||
doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
||||||
return true;
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
*copied = true;
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
code = doAppendRowFromFileBlock(pReader->pResBlock, pReader, fRow->pBlockData, fRow->iRow);
|
||||||
return true;
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
*copied = true;
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) {
|
static FORCE_INLINE STSchema* getLatestTableSchema(STsdbReader* pReader, uint64_t uid) {
|
||||||
|
@ -2022,11 +2051,12 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
tsdbRowMergerClear(&merge);
|
tsdbRowMergerClear(&merge);
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
|
static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, STsdbReader* pReader,
|
||||||
|
@ -2034,7 +2064,8 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
|
||||||
bool mergeBlockData) {
|
bool mergeBlockData) {
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
|
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
|
||||||
|
bool copied = false;
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SRow* pTSRow = NULL;
|
SRow* pTSRow = NULL;
|
||||||
SRowMerger merge = {0};
|
SRowMerger merge = {0};
|
||||||
TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
|
||||||
|
@ -2042,7 +2073,12 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
|
||||||
|
|
||||||
// only last block exists
|
// only last block exists
|
||||||
if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
|
if ((!mergeBlockData) || (tsLastBlock != pBlockData->aTSKEY[pDumpInfo->rowIndex])) {
|
||||||
if (tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader)) {
|
code = tryCopyDistinctRowFromSttBlock(&fRow, pLastBlockReader, pBlockScanInfo, tsLastBlock, pReader, &copied);
|
||||||
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (copied) {
|
||||||
pBlockScanInfo->lastKey = tsLastBlock;
|
pBlockScanInfo->lastKey = tsLastBlock;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
|
@ -2060,10 +2096,15 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
tsdbRowMergerClear(&merge);
|
tsdbRowMergerClear(&merge);
|
||||||
|
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
} else { // not merge block data
|
} else { // not merge block data
|
||||||
int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
|
int32_t code = tsdbRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||||
|
@ -2083,10 +2124,14 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
tsdbRowMergerClear(&merge);
|
tsdbRowMergerClear(&merge);
|
||||||
|
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2131,7 +2176,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
tsdbRowMergerClear(&merge);
|
tsdbRowMergerClear(&merge);
|
||||||
|
@ -2353,7 +2398,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
tsdbRowMergerClear(&merge);
|
tsdbRowMergerClear(&merge);
|
||||||
|
@ -2508,7 +2553,13 @@ bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo*
|
||||||
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
|
int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
|
||||||
STsdbReader* pReader) {
|
STsdbReader* pReader) {
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
|
bool copied = false;
|
||||||
|
int32_t code = tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo, &copied);
|
||||||
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (copied) {
|
||||||
pBlockScanInfo->lastKey = key;
|
pBlockScanInfo->lastKey = key;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
|
@ -2528,11 +2579,11 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
code = doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo);
|
||||||
|
|
||||||
taosMemoryFree(pTSRow);
|
taosMemoryFree(pTSRow);
|
||||||
tsdbRowMergerClear(&merge);
|
tsdbRowMergerClear(&merge);
|
||||||
return TSDB_CODE_SUCCESS;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2649,7 +2700,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
||||||
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
|
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlock, pBlockScanInfo, keyInBuf, pLastBlockReader) &&
|
||||||
pBlock->nRow <= pReader->capacity) {
|
pBlock->nRow <= pReader->capacity) {
|
||||||
if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) {
|
if (asc || ((!asc) && (!hasDataInLastBlock(pLastBlockReader)))) {
|
||||||
copyBlockDataToSDataBlock(pReader);
|
code = copyBlockDataToSDataBlock(pReader);
|
||||||
|
if (code) {
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
// record the last key value
|
// record the last key value
|
||||||
pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
|
pBlockScanInfo->lastKey = asc ? pBlock->maxKey.ts : pBlock->minKey.ts;
|
||||||
|
@ -2696,8 +2750,11 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
|
code = buildComposedDataBlockImpl(pReader, pBlockScanInfo, pBlockData, pLastBlockReader);
|
||||||
|
if (code) {
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
// currently loaded file data block is consumed
|
// currently loaded file data block is consumed
|
||||||
if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
|
if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
|
||||||
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
|
SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter);
|
||||||
|
@ -2922,6 +2979,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;
|
SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;
|
||||||
STableUidList* pUidList = &pStatus->uidList;
|
STableUidList* pUidList = &pStatus->uidList;
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
if (taosHashGetSize(pStatus->pTableMap) == 0) {
|
if (taosHashGetSize(pStatus->pTableMap) == 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -2952,7 +3010,11 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
|
code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
|
||||||
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
if (pResBlock->info.rows >= pReader->capacity) {
|
if (pResBlock->info.rows >= pReader->capacity) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -3032,7 +3094,11 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
|
code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
|
||||||
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
if (pResBlock->info.rows >= pReader->capacity) {
|
if (pResBlock->info.rows >= pReader->capacity) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -3784,6 +3850,7 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
|
||||||
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, STableBlockScanInfo* pScanInfo) {
|
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, STableBlockScanInfo* pScanInfo) {
|
||||||
int32_t outputRowIndex = pBlock->info.rows;
|
int32_t outputRowIndex = pBlock->info.rows;
|
||||||
int64_t uid = pScanInfo->uid;
|
int64_t uid = pScanInfo->uid;
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
|
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
|
||||||
|
|
||||||
|
@ -3806,7 +3873,10 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]);
|
||||||
|
|
||||||
tRowGet(pTSRow, pSchema, j, &colVal);
|
tRowGet(pTSRow, pSchema, j, &colVal);
|
||||||
doCopyColVal(pColInfoData, outputRowIndex, i, &colVal, pSupInfo);
|
code = doCopyColVal(pColInfoData, outputRowIndex, i, &colVal, pSupInfo);
|
||||||
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
i += 1;
|
i += 1;
|
||||||
j += 1;
|
j += 1;
|
||||||
} else if (colId < pSchema->columns[j].colId) {
|
} else if (colId < pSchema->columns[j].colId) {
|
||||||
|
@ -3836,6 +3906,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
|
||||||
int32_t rowIndex) {
|
int32_t rowIndex) {
|
||||||
int32_t i = 0, j = 0;
|
int32_t i = 0, j = 0;
|
||||||
int32_t outputRowIndex = pResBlock->info.rows;
|
int32_t outputRowIndex = pResBlock->info.rows;
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo;
|
||||||
if (pReader->suppInfo.colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
if (pReader->suppInfo.colId[i] == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||||
|
@ -3858,7 +3929,10 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
|
||||||
SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, pSupInfo->slotId[i]);
|
SColumnInfoData* pCol = TARRAY_GET_ELEM(pResBlock->pDataBlock, pSupInfo->slotId[i]);
|
||||||
if (pData->cid == pSupInfo->colId[i]) {
|
if (pData->cid == pSupInfo->colId[i]) {
|
||||||
tColDataGetValue(pData, rowIndex, &cv);
|
tColDataGetValue(pData, rowIndex, &cv);
|
||||||
doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo);
|
code = doCopyColVal(pCol, outputRowIndex, i, &cv, pSupInfo);
|
||||||
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
j += 1;
|
j += 1;
|
||||||
} else if (pData->cid > pCol->info.colId) {
|
} else if (pData->cid > pCol->info.colId) {
|
||||||
// the specified column does not exist in file block, fill with null data
|
// the specified column does not exist in file block, fill with null data
|
||||||
|
@ -3882,6 +3956,7 @@ int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, S
|
||||||
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
|
int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity,
|
||||||
STsdbReader* pReader) {
|
STsdbReader* pReader) {
|
||||||
SSDataBlock* pBlock = pReader->pResBlock;
|
SSDataBlock* pBlock = pReader->pResBlock;
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
// SRow* pTSRow = NULL;
|
// SRow* pTSRow = NULL;
|
||||||
|
@ -3893,13 +3968,20 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
|
||||||
}
|
}
|
||||||
|
|
||||||
if (row.type == TSDBROW_ROW_FMT) {
|
if (row.type == TSDBROW_ROW_FMT) {
|
||||||
doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo);
|
code = doAppendRowFromTSRow(pBlock, pReader, row.pTSRow, pBlockScanInfo);
|
||||||
|
|
||||||
if (freeTSRow) {
|
if (freeTSRow) {
|
||||||
taosMemoryFree(row.pTSRow);
|
taosMemoryFree(row.pTSRow);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow);
|
code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow);
|
||||||
|
if (code) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// no data in buffer, return immediately
|
// no data in buffer, return immediately
|
||||||
|
@ -3912,7 +3994,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
|
||||||
}
|
}
|
||||||
} while (1);
|
} while (1);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO refactor: with createDataBlockScanInfo
|
// TODO refactor: with createDataBlockScanInfo
|
||||||
|
@ -4399,43 +4481,51 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool doTsdbNextDataBlock(STsdbReader* pReader) {
|
static int32_t doTsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
// cleanup the data that belongs to the previous data block
|
// cleanup the data that belongs to the previous data block
|
||||||
SSDataBlock* pBlock = pReader->pResBlock;
|
SSDataBlock* pBlock = pReader->pResBlock;
|
||||||
blockDataCleanup(pBlock);
|
blockDataCleanup(pBlock);
|
||||||
|
|
||||||
|
*hasNext = false;
|
||||||
|
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
if (taosHashGetSize(pStatus->pTableMap) == 0) {
|
if (taosHashGetSize(pStatus->pTableMap) == 0) {
|
||||||
return false;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStatus->loadFromFile) {
|
if (pStatus->loadFromFile) {
|
||||||
int32_t code = buildBlockFromFiles(pReader);
|
code = buildBlockFromFiles(pReader);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return false;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlock->info.rows > 0) {
|
if (pBlock->info.rows <= 0) {
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
resetTableListIndex(&pReader->status);
|
resetTableListIndex(&pReader->status);
|
||||||
buildBlockFromBufferSequentially(pReader);
|
code = buildBlockFromBufferSequentially(pReader);
|
||||||
return pBlock->info.rows > 0;
|
|
||||||
}
|
}
|
||||||
} else { // no data in files, let's try the buffer
|
} else { // no data in files, let's try the buffer
|
||||||
buildBlockFromBufferSequentially(pReader);
|
code = buildBlockFromBufferSequentially(pReader);
|
||||||
return pBlock->info.rows > 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
*hasNext = pBlock->info.rows > 0;
|
||||||
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tsdbNextDataBlock(STsdbReader* pReader) {
|
int32_t tsdbNextDataBlock(STsdbReader* pReader, bool *hasNext) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
*hasNext = false;
|
||||||
|
|
||||||
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT) {
|
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->step == EXTERNAL_ROWS_NEXT) {
|
||||||
return false;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
|
||||||
int32_t code = tsdbAcquireReader(pReader);
|
code = tsdbAcquireReader(pReader);
|
||||||
qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code);
|
qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code);
|
||||||
|
|
||||||
if (pReader->suspended) {
|
if (pReader->suspended) {
|
||||||
|
@ -4443,16 +4533,21 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->innerReader[0] != NULL && pReader->step == 0) {
|
if (pReader->innerReader[0] != NULL && pReader->step == 0) {
|
||||||
bool ret = doTsdbNextDataBlock(pReader->innerReader[0]);
|
code = doTsdbNextDataBlock(pReader->innerReader[0], hasNext);
|
||||||
|
if (code) {
|
||||||
|
tsdbReleaseReader(pReader);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
pReader->step = EXTERNAL_ROWS_PREV;
|
pReader->step = EXTERNAL_ROWS_PREV;
|
||||||
if (ret) {
|
if (*hasNext) {
|
||||||
pStatus = &pReader->innerReader[0]->status;
|
pStatus = &pReader->innerReader[0]->status;
|
||||||
if (pStatus->composedDataBlock) {
|
if (pStatus->composedDataBlock) {
|
||||||
qTrace("tsdb/read: %p, unlock read mutex", pReader);
|
qTrace("tsdb/read: %p, unlock read mutex", pReader);
|
||||||
tsdbReleaseReader(pReader);
|
tsdbReleaseReader(pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4469,14 +4564,19 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
||||||
pReader->step = EXTERNAL_ROWS_MAIN;
|
pReader->step = EXTERNAL_ROWS_MAIN;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ret = doTsdbNextDataBlock(pReader);
|
code = doTsdbNextDataBlock(pReader, hasNext);
|
||||||
if (ret) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tsdbReleaseReader(pReader);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (*hasNext) {
|
||||||
if (pStatus->composedDataBlock) {
|
if (pStatus->composedDataBlock) {
|
||||||
qTrace("tsdb/read: %p, unlock read mutex", pReader);
|
qTrace("tsdb/read: %p, unlock read mutex", pReader);
|
||||||
tsdbReleaseReader(pReader);
|
tsdbReleaseReader(pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pReader->step == EXTERNAL_ROWS_MAIN && pReader->innerReader[1] != NULL) {
|
if (pReader->step == EXTERNAL_ROWS_MAIN && pReader->innerReader[1] != NULL) {
|
||||||
|
@ -4488,23 +4588,28 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = doTsdbNextDataBlock(pReader->innerReader[1]);
|
code = doTsdbNextDataBlock(pReader->innerReader[1], hasNext);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tsdbReleaseReader(pReader);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
pReader->step = EXTERNAL_ROWS_NEXT;
|
pReader->step = EXTERNAL_ROWS_NEXT;
|
||||||
if (ret) {
|
if (*hasNext) {
|
||||||
pStatus = &pReader->innerReader[1]->status;
|
pStatus = &pReader->innerReader[1]->status;
|
||||||
if (pStatus->composedDataBlock) {
|
if (pStatus->composedDataBlock) {
|
||||||
qTrace("tsdb/read: %p, unlock read mutex", pReader);
|
qTrace("tsdb/read: %p, unlock read mutex", pReader);
|
||||||
tsdbReleaseReader(pReader);
|
tsdbReleaseReader(pReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
qTrace("tsdb/read: %p, unlock read mutex", pReader);
|
qTrace("tsdb/read: %p, unlock read mutex", pReader);
|
||||||
tsdbReleaseReader(pReader);
|
tsdbReleaseReader(pReader);
|
||||||
|
|
||||||
return false;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, SColumnDataAgg* pTsAgg) {
|
static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_t numOfCols, SColumnDataAgg* pTsAgg) {
|
||||||
|
@ -4649,20 +4754,27 @@ STableBlockScanInfo* getTableBlockScanInfo(SHashObj* pTableMap, uint64_t uid, co
|
||||||
|
|
||||||
static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
|
static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
|
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
|
||||||
STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr);
|
STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr);
|
||||||
if (pBlockScanInfo == NULL) {
|
if (pBlockScanInfo == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid);
|
code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tBlockDataDestroy(&pStatus->fileBlockData);
|
||||||
|
terrno = code;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = copyBlockDataToSDataBlock(pReader);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tBlockDataDestroy(&pStatus->fileBlockData);
|
tBlockDataDestroy(&pStatus->fileBlockData);
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
copyBlockDataToSDataBlock(pReader);
|
|
||||||
return pReader->pResBlock;
|
return pReader->pResBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -584,10 +584,16 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
||||||
if (isNullVal) {
|
if (isNullVal) {
|
||||||
colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
|
colDataSetNNULL(pColInfoData, 0, pBlock->info.rows);
|
||||||
} else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
|
} else if (pColInfoData->info.type != TSDB_DATA_TYPE_JSON) {
|
||||||
colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows);
|
code = colDataSetNItems(pColInfoData, 0, data, pBlock->info.rows, false);
|
||||||
if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
|
if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
|
||||||
taosMemoryFree(data);
|
taosMemoryFree(data);
|
||||||
}
|
}
|
||||||
|
if (code) {
|
||||||
|
if (freeReader) {
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
} else { // todo opt for json tag
|
} else { // todo opt for json tag
|
||||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||||
colDataSetVal(pColInfoData, i, data, false);
|
colDataSetVal(pColInfoData, i, data, false);
|
||||||
|
@ -634,10 +640,22 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
||||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||||
|
bool hasNext = false;
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
while (tsdbNextDataBlock(pTableScanInfo->base.dataReader)) {
|
while (true) {
|
||||||
|
code = tsdbNextDataBlock(pTableScanInfo->base.dataReader, &hasNext);
|
||||||
|
if (code) {
|
||||||
|
tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
|
||||||
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!hasNext) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if (isTaskKilled(pTaskInfo)) {
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
|
tsdbReleaseDataBlock(pTableScanInfo->base.dataReader);
|
||||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||||
|
@ -1015,7 +1033,15 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbNextDataBlock(pReader)) {
|
bool hasNext = false;
|
||||||
|
code = tsdbNextDataBlock(pReader, &hasNext);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
terrno = code;
|
||||||
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hasNext) {
|
||||||
/*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL);
|
/*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL);
|
||||||
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
|
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
|
||||||
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
|
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
|
||||||
|
@ -2104,12 +2130,22 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) {
|
||||||
// NOTE: this operator does never check if current status is done or not
|
// NOTE: this operator does never check if current status is done or not
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStreamRawScanInfo* pInfo = pOperator->info;
|
SStreamRawScanInfo* pInfo = pOperator->info;
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta
|
pTaskInfo->streamInfo.metaRsp.metaRspLen = 0; // use metaRspLen !=0 to judge if data is meta
|
||||||
pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;
|
pTaskInfo->streamInfo.metaRsp.metaRsp = NULL;
|
||||||
|
|
||||||
qDebug("tmqsnap doRawScan called");
|
qDebug("tmqsnap doRawScan called");
|
||||||
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) {
|
bool hasNext = false;
|
||||||
|
if (pInfo->dataReader) {
|
||||||
|
code = tsdbNextDataBlock(pInfo->dataReader, &hasNext);
|
||||||
|
if (code) {
|
||||||
|
tsdbReleaseDataBlock(pInfo->dataReader);
|
||||||
|
longjmp(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInfo->dataReader && hasNext) {
|
||||||
if (isTaskKilled(pTaskInfo)) {
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
tsdbReleaseDataBlock(pInfo->dataReader);
|
tsdbReleaseDataBlock(pInfo->dataReader);
|
||||||
longjmp(pTaskInfo->env, pTaskInfo->code);
|
longjmp(pTaskInfo->env, pTaskInfo->code);
|
||||||
|
@ -2610,8 +2646,21 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
||||||
|
|
||||||
pInfo->base.dataReader = source->dataReader;
|
pInfo->base.dataReader = source->dataReader;
|
||||||
STsdbReader* reader = pInfo->base.dataReader;
|
STsdbReader* reader = pInfo->base.dataReader;
|
||||||
|
bool hasNext = false;
|
||||||
qTrace("tsdb/read-table-data: %p, enter next reader", reader);
|
qTrace("tsdb/read-table-data: %p, enter next reader", reader);
|
||||||
while (tsdbNextDataBlock(reader)) {
|
|
||||||
|
while (true) {
|
||||||
|
code = tsdbNextDataBlock(reader, &hasNext);
|
||||||
|
if (code != 0) {
|
||||||
|
tsdbReleaseDataBlock(reader);
|
||||||
|
pInfo->base.dataReader = NULL;
|
||||||
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!hasNext) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if (isTaskKilled(pTaskInfo)) {
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
tsdbReleaseDataBlock(reader);
|
tsdbReleaseDataBlock(reader);
|
||||||
pInfo->base.dataReader = NULL;
|
pInfo->base.dataReader = NULL;
|
||||||
|
|
|
@ -1627,7 +1627,7 @@ static void sysTableScanFillTbName(SOperatorInfo* pOperator, const SSysTableScan
|
||||||
char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0};
|
char varTbName[TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE] = {0};
|
||||||
STR_TO_VARSTR(varTbName, name);
|
STR_TO_VARSTR(varTbName, name);
|
||||||
|
|
||||||
colDataSetNItems(pColumnInfoData, 0, varTbName, pBlock->info.rows);
|
colDataSetNItems(pColumnInfoData, 0, varTbName, pBlock->info.rows, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
|
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
|
||||||
|
|
|
@ -1755,7 +1755,11 @@ int32_t winEndTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
|
||||||
int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
int32_t qTbnameFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||||
char* p = colDataGetVarData(pInput->columnData, 0);
|
char* p = colDataGetVarData(pInput->columnData, 0);
|
||||||
|
|
||||||
colDataSetNItems(pOutput->columnData, pOutput->numOfRows, p, pInput->numOfRows);
|
int32_t code = colDataSetNItems(pOutput->columnData, pOutput->numOfRows, p, pInput->numOfRows, true);
|
||||||
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
pOutput->numOfRows += pInput->numOfRows;
|
pOutput->numOfRows += pInput->numOfRows;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,6 +14,8 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 2
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 2
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 2
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 2
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 2
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 2
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 2
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 3
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 3
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py -Q 3
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 3
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 3
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 3
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 3
|
||||||
|
@ -22,6 +24,7 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/columnLenUpdated.py -Q 4
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py
|
||||||
|
|
|
@ -0,0 +1,181 @@
|
||||||
|
|
||||||
|
import taos
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import socket
|
||||||
|
import os
|
||||||
|
import platform
|
||||||
|
if platform.system().lower() == 'windows':
|
||||||
|
import wexpect as taosExpect
|
||||||
|
else:
|
||||||
|
import pexpect as taosExpect
|
||||||
|
|
||||||
|
from util.log import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.dnodes import *
|
||||||
|
|
||||||
|
def taos_command (buildPath, key, value, expectString, sqlString=''):
|
||||||
|
if len(key) == 0:
|
||||||
|
tdLog.exit("taos test key is null!")
|
||||||
|
|
||||||
|
if platform.system().lower() == 'windows':
|
||||||
|
taosCmd = buildPath + '\\build\\bin\\taos.exe '
|
||||||
|
taosCmd = taosCmd.replace('\\','\\\\')
|
||||||
|
else:
|
||||||
|
taosCmd = buildPath + '/build/bin/taos '
|
||||||
|
|
||||||
|
cfgPath = buildPath + "/../sim/psim/cfg"
|
||||||
|
taosCmd = taosCmd + ' -c' + cfgPath + ' -' + key
|
||||||
|
if len(value) != 0:
|
||||||
|
taosCmd = taosCmd + ' ' + value
|
||||||
|
|
||||||
|
tdLog.info ("taos cmd: %s" % taosCmd)
|
||||||
|
|
||||||
|
child = taosExpect.spawn(taosCmd, timeout=20)
|
||||||
|
#output = child.readline()
|
||||||
|
#print (output.decode())
|
||||||
|
if len(expectString) != 0:
|
||||||
|
i = child.expect([expectString, taosExpect.TIMEOUT, taosExpect.EOF], timeout=20)
|
||||||
|
else:
|
||||||
|
i = child.expect([taosExpect.TIMEOUT, taosExpect.EOF], timeout=20)
|
||||||
|
|
||||||
|
if platform.system().lower() == 'windows':
|
||||||
|
retResult = child.before
|
||||||
|
else:
|
||||||
|
retResult = child.before.decode()
|
||||||
|
print(retResult)
|
||||||
|
#print(child.after.decode())
|
||||||
|
if i == 0:
|
||||||
|
print ('taos login success! Here can run sql, taos> ')
|
||||||
|
return "TAOS_OK"
|
||||||
|
else:
|
||||||
|
return "TAOS_FAIL"
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
#updatecfgDict = {'clientCfg': {'serverPort': 7080, 'firstEp': 'trd02:7080', 'secondEp':'trd02:7080'},\
|
||||||
|
# 'serverPort': 7080, 'firstEp': 'trd02:7080'}
|
||||||
|
hostname = socket.gethostname()
|
||||||
|
if (platform.system().lower() == 'windows' and not tdDnodes.dnodes[0].remoteIP == ""):
|
||||||
|
try:
|
||||||
|
config = eval(tdDnodes.dnodes[0].remoteIP)
|
||||||
|
hostname = config["host"]
|
||||||
|
except Exception:
|
||||||
|
hostname = tdDnodes.dnodes[0].remoteIP
|
||||||
|
serverPort = '7080'
|
||||||
|
rpcDebugFlagVal = '143'
|
||||||
|
clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
|
||||||
|
clientCfgDict["serverPort"] = serverPort
|
||||||
|
clientCfgDict["firstEp"] = hostname + ':' + serverPort
|
||||||
|
clientCfgDict["secondEp"] = hostname + ':' + serverPort
|
||||||
|
clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
|
||||||
|
clientCfgDict["fqdn"] = hostname
|
||||||
|
|
||||||
|
updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
|
||||||
|
updatecfgDict["clientCfg"] = clientCfgDict
|
||||||
|
updatecfgDict["serverPort"] = serverPort
|
||||||
|
updatecfgDict["firstEp"] = hostname + ':' + serverPort
|
||||||
|
updatecfgDict["secondEp"] = hostname + ':' + serverPort
|
||||||
|
updatecfgDict["fqdn"] = hostname
|
||||||
|
|
||||||
|
print ("===================: ", updatecfgDict)
|
||||||
|
|
||||||
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
self.replicaVar = int(replicaVar)
|
||||||
|
tdLog.debug(f"start to excute {__file__}")
|
||||||
|
tdSql.init(conn.cursor())
|
||||||
|
|
||||||
|
def getBuildPath(self):
|
||||||
|
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||||
|
|
||||||
|
if ("community" in selfPath):
|
||||||
|
projPath = selfPath[:selfPath.find("community")]
|
||||||
|
else:
|
||||||
|
projPath = selfPath[:selfPath.find("tests")]
|
||||||
|
|
||||||
|
for root, dirs, files in os.walk(projPath):
|
||||||
|
if ("taosd" in files or "taosd.exe" in files):
|
||||||
|
rootRealPath = os.path.dirname(os.path.realpath(root))
|
||||||
|
if ("packaging" not in rootRealPath):
|
||||||
|
buildPath = root[:len(root) - len("/build/bin")]
|
||||||
|
break
|
||||||
|
return buildPath
|
||||||
|
|
||||||
|
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
|
||||||
|
tdSql.prepare()
|
||||||
|
# time.sleep(2)
|
||||||
|
tdSql.query("create user testpy pass 'testpy'")
|
||||||
|
|
||||||
|
buildPath = self.getBuildPath()
|
||||||
|
if (buildPath == ""):
|
||||||
|
tdLog.exit("taosd not found!")
|
||||||
|
else:
|
||||||
|
tdLog.info("taosd found in %s" % buildPath)
|
||||||
|
cfgPath = buildPath + "/../sim/psim/cfg"
|
||||||
|
tdLog.info("cfgPath: %s" % cfgPath)
|
||||||
|
|
||||||
|
checkNetworkStatus = ['0: unavailable', '1: network ok', '2: service ok', '3: service degraded', '4: exiting']
|
||||||
|
netrole = ['client', 'server']
|
||||||
|
|
||||||
|
keyDict = {'h':'', 'P':'6030', 'p':'testpy', 'u':'testpy', 'a':'', 'A':'', 'c':'', 'C':'', 's':'', 'r':'', 'f':'', \
|
||||||
|
'k':'', 't':'', 'n':'', 'l':'1024', 'N':'100', 'V':'', 'd':'db', 'w':'30', '-help':'', '-usage':'', '?':''}
|
||||||
|
|
||||||
|
keyDict['h'] = self.hostname
|
||||||
|
keyDict['c'] = cfgPath
|
||||||
|
keyDict['P'] = self.serverPort
|
||||||
|
|
||||||
|
tdSql.query("drop database if exists db1")
|
||||||
|
tdSql.query("create database if not exists db1 vgroups 1")
|
||||||
|
tdSql.query("use db1")
|
||||||
|
tdSql.query("create table tba (ts timestamp, f1 binary(2))")
|
||||||
|
tdSql.query("insert into tba values (now, '22')")
|
||||||
|
tdSql.query("select * from tba")
|
||||||
|
tdSql.checkData(0, 1, '22')
|
||||||
|
|
||||||
|
keyDict['s'] = "\"alter table db1.tba modify column f1 binary(5) \""
|
||||||
|
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", '')
|
||||||
|
if retCode != "TAOS_OK":
|
||||||
|
tdLog.exit("taos -s fail")
|
||||||
|
|
||||||
|
keyDict['s'] = "\"insert into db1.tba values (now, '55555')\""
|
||||||
|
retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", '')
|
||||||
|
if retCode != "TAOS_OK":
|
||||||
|
tdLog.exit("taos -s fail")
|
||||||
|
|
||||||
|
tdSql.query("select * from tba order by ts")
|
||||||
|
tdSql.checkData(0, 1, '22')
|
||||||
|
tdSql.checkData(1, 1, '55555')
|
||||||
|
|
||||||
|
|
||||||
|
tdSql.query("create table stb (ts timestamp, f1 int) tags (tg1 binary(2))")
|
||||||
|
tdSql.query("create table tb1 using stb tags('bb')")
|
||||||
|
tdSql.query("insert into tb1 values (now, 2)")
|
||||||
|
tdSql.query("select count(*) from stb group by tg1")
|
||||||
|
tdSql.checkData(0, 0, 1)
|
||||||
|
|
||||||
|
keyDict['s'] = "\"alter table db1.stb modify tag tg1 binary(5) \""
|
||||||
|
retCode = taos_command(buildPath, "s", keyDict['s'], "Query OK", '')
|
||||||
|
if retCode != "TAOS_OK":
|
||||||
|
tdLog.exit("taos -s fail")
|
||||||
|
|
||||||
|
keyDict['s'] = "\"create table db1.tb2 using db1.stb tags('bbbbb')\""
|
||||||
|
retCode = taos_command(buildPath, "s", keyDict['s'], "Create OK", '')
|
||||||
|
if retCode != "TAOS_OK":
|
||||||
|
tdLog.exit("taos -s fail")
|
||||||
|
|
||||||
|
keyDict['s'] = "\"insert into db1.tb2 values (now, 2)\""
|
||||||
|
retCode = taos_command(buildPath, "s", keyDict['s'], "Insert OK", '')
|
||||||
|
if retCode != "TAOS_OK":
|
||||||
|
tdLog.exit("taos -s fail")
|
||||||
|
|
||||||
|
tdSql.query("select count(*) from stb group by tg1")
|
||||||
|
tdSql.checkData(0, 0, 1)
|
||||||
|
tdSql.checkData(1, 0, 1)
|
||||||
|
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
Loading…
Reference in New Issue