Merge branch '3.0' into szhou/tms-wc/save-row-simplebuf

commit 955d3df15e

@@ -379,6 +379,7 @@
#define TK_NO_BATCH_SCAN 607
#define TK_SORT_FOR_GROUP 608
#define TK_PARTITION_FIRST 609
#define TK_PARA_TABLES_SORT 610

#define TK_NK_NIL 65535
@@ -121,6 +121,7 @@ typedef struct SScanLogicNode {
bool filesetDelimited; // returned blocks delimited by fileset
bool isCountByTag; // true if selectstmt hasCountFunc & part by tag/tbname
SArray* pFuncTypes; // for last, last_row
bool paraTablesSort; // for table merge scan
} SScanLogicNode;

typedef struct SJoinLogicNode {
@@ -443,6 +444,7 @@ typedef struct STableScanPhysiNode {
int8_t igCheckUpdate;
bool filesetDelimited;
bool needCountEmptyTable;
bool paraTablesSort;
} STableScanPhysiNode;

typedef STableScanPhysiNode STableSeqScanPhysiNode;
@@ -128,6 +128,7 @@ typedef enum EHintOption {
HINT_BATCH_SCAN,
HINT_SORT_FOR_GROUP,
HINT_PARTITION_FIRST,
HINT_PARA_TABLES_SORT
} EHintOption;

typedef struct SHintNode {
@@ -657,7 +657,6 @@ int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckp
int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSourceReq* pReq);

int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckpointSourceRsp* pRsp);
int32_t tDecodeStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSourceRsp* pRsp);

typedef struct {
SMsgHead msgHead;
@@ -631,7 +631,6 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i);

for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) {
bool isNull = false;
if (pBlock->pBlockAgg == NULL) {
@@ -152,7 +152,7 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
goto STREAM_DECODE_OVER;
}

if (sver != MND_STREAM_VER_NUMBER) {
if (sver < 1 || sver > MND_STREAM_VER_NUMBER) {
terrno = 0;
mError("stream read invalid ver, data ver: %d, curr ver: %d", sver, MND_STREAM_VER_NUMBER);
goto STREAM_DECODE_OVER;
@@ -214,6 +214,13 @@ int32_t tsdbBegin(STsdb* pTsdb);
// int32_t tsdbPrepareCommit(STsdb* pTsdb);
// int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
int32_t tsdbCacheCommit(STsdb* pTsdb);
int32_t tsdbCacheNewTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapper* pSchemaRow);
int32_t tsdbCacheDropTable(STsdb* pTsdb, int64_t uid, tb_uid_t suid, SSchemaWrapper* pSchemaRow);
int32_t tsdbCacheDropSubTables(STsdb* pTsdb, SArray* uids, tb_uid_t suid);
int32_t tsdbCacheNewSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t col_type);
int32_t tsdbCacheDropSTableColumn(STsdb* pTsdb, SArray* uids, int16_t cid, int8_t col_type);
int32_t tsdbCacheNewNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type);
int32_t tsdbCacheDropNTableColumn(STsdb* pTsdb, int64_t uid, int16_t cid, int8_t col_type);
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
int32_t tsdbRetention(STsdb* tsdb, int64_t now, int32_t sync);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
@@ -96,29 +96,29 @@ int32_t metaSnapRead(SMetaSnapReader* pReader, uint8_t** ppData) {
continue;
}

if (!pData || !nData) {
metaError("meta/snap: invalide nData: %" PRId32 " meta snap read failed.", nData);
goto _exit;
}

*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + nData);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}

SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = SNAP_DATA_META;
pHdr->size = nData;
memcpy(pHdr->data, pData, nData);

metaDebug("vgId:%d, vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " blockLen:%d",
TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData);

tdbTbcMoveToNext(pReader->pTbc);
break;
}

if (!pData || !nData) {
metaError("meta/snap: invalide nData: %" PRId32 " meta snap read failed.", nData);
goto _exit;
}

*ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + nData);
if (*ppData == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}

SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData);
pHdr->type = SNAP_DATA_META;
pHdr->size = nData;
memcpy(pHdr->data, pData, nData);

metaDebug("vgId:%d, vnode snapshot meta read data, version:%" PRId64 " uid:%" PRId64 " blockLen:%d",
TD_VID(pReader->pMeta->pVnode), key.version, key.uid, nData);

_exit:
return code;
@@ -619,7 +619,8 @@ SMetaTableInfo getMetaTableInfoFromSnapshot(SSnapContext* ctx) {

int32_t ret = MoveToPosition(ctx, idInfo->version, *uidTmp);
if (ret != 0) {
metaDebug("tmqsnap getMetaTableInfoFromSnapshot not exist uid:%" PRIi64 " version:%" PRIi64, *uidTmp, idInfo->version);
metaDebug("tmqsnap getMetaTableInfoFromSnapshot not exist uid:%" PRIi64 " version:%" PRIi64, *uidTmp,
idInfo->version);
continue;
}
tdbTbcGet((TBC*)ctx->pCur, (const void**)&pKey, &kLen, (const void**)&pVal, &vLen);
@@ -303,6 +303,8 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tb

tdbTbcClose(pCtbIdxc);

(void)tsdbCacheDropSubTables(pMeta->pVnode->pTsdb, tbUidList, pReq->suid);

metaWLock(pMeta);

for (int32_t iChild = 0; iChild < taosArrayGetSize(tbUidList); iChild++) {
@@ -334,6 +336,40 @@ _exit:
return 0;
}

static void metaGetSubtables(SMeta *pMeta, int64_t suid, SArray *uids) {
if (!uids) return;

int c = 0;
void *pKey = NULL;
int nKey = 0;
TBC *pCtbIdxc = NULL;

tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, NULL);
int rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c);
if (rc < 0) {
tdbTbcClose(pCtbIdxc);
metaWLock(pMeta);
return;
}

for (;;) {
rc = tdbTbcNext(pCtbIdxc, &pKey, &nKey, NULL, NULL);
if (rc < 0) break;

if (((SCtbIdxKey *)pKey)->suid < suid) {
continue;
} else if (((SCtbIdxKey *)pKey)->suid > suid) {
break;
}

taosArrayPush(uids, &(((SCtbIdxKey *)pKey)->uid));
}

tdbFree(pKey);

tdbTbcClose(pCtbIdxc);
}

int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SMetaEntry oStbEntry = {0};
SMetaEntry nStbEntry = {0};
@@ -397,9 +433,39 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
nStbEntry.stbEntry.schemaRow = pReq->schemaRow;
nStbEntry.stbEntry.schemaTag = pReq->schemaTag;

int32_t deltaCol = pReq->schemaRow.nCols - oStbEntry.stbEntry.schemaRow.nCols;
int nCols = pReq->schemaRow.nCols;
int onCols = oStbEntry.stbEntry.schemaRow.nCols;
int32_t deltaCol = nCols - onCols;
bool updStat = deltaCol != 0 && !metaTbInFilterCache(pMeta, pReq->name, 1);

if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
STsdb *pTsdb = pMeta->pVnode->pTsdb;
SArray *uids = taosArrayInit(8, sizeof(int64_t));
if (deltaCol == 1) {
int16_t cid = pReq->schemaRow.pSchema[nCols - 1].colId;
int8_t col_type = pReq->schemaRow.pSchema[nCols - 1].type;

metaGetSubtables(pMeta, pReq->suid, uids);
tsdbCacheNewSTableColumn(pTsdb, uids, cid, col_type);
} else if (deltaCol == -1) {
int16_t cid = -1;
int8_t col_type = -1;
for (int i = 0, j = 0; i < nCols && j < onCols; ++i, ++j) {
if (pReq->schemaRow.pSchema[i].colId != oStbEntry.stbEntry.schemaRow.pSchema[j].colId) {
cid = oStbEntry.stbEntry.schemaRow.pSchema[j].colId;
col_type = oStbEntry.stbEntry.schemaRow.pSchema[j].type;
break;
}
}

if (cid != -1) {
metaGetSubtables(pMeta, pReq->suid, uids);
tsdbCacheDropSTableColumn(pTsdb, uids, cid, col_type);
}
}
if (uids) taosArrayDestroy(uids);
}

metaWLock(pMeta);
// compare two entry
if (oStbEntry.stbEntry.schemaRow.version != pReq->schemaRow.version) {
@@ -822,6 +888,10 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
metaUidCacheClear(pMeta, me.ctbEntry.suid);
metaTbGroupCacheClear(pMeta, me.ctbEntry.suid);
metaULock(pMeta);

if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
tsdbCacheNewTable(pMeta->pVnode->pTsdb, me.uid, me.ctbEntry.suid, NULL);
}
} else {
me.ntbEntry.btime = pReq->btime;
me.ntbEntry.ttlDays = pReq->ttl;
@@ -832,6 +902,10 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs

++pStats->numOfNTables;
pStats->numOfNTimeSeries += me.ntbEntry.schemaRow.nCols - 1;

if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
tsdbCacheNewTable(pMeta->pVnode->pTsdb, me.uid, -1, &me.ntbEntry.schemaRow);
}
}

if (metaHandleEntry(pMeta, &me) < 0) goto _err;
@@ -896,6 +970,10 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi

if ((type == TSDB_CHILD_TABLE || type == TSDB_NORMAL_TABLE) && tbUids) {
taosArrayPush(tbUids, &uid);

if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
tsdbCacheDropTable(pMeta->pVnode->pTsdb, uid, suid, NULL);
}
}

if ((type == TSDB_CHILD_TABLE) && tbUid) {
@@ -930,6 +1008,11 @@ void metaDropTables(SMeta *pMeta, SArray *tbUids) {
}
tSimpleHashPut(suidHash, &suid, sizeof(tb_uid_t), &nCtbDropped, sizeof(int64_t));
}

if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
tsdbCacheDropTable(pMeta->pVnode->pTsdb, uid, suid, NULL);
}

metaDebug("batch drop table:%" PRId64, uid);
}
metaULock(pMeta);
@@ -1172,11 +1255,22 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *p
metaUpdateStbStats(pMeta, e.ctbEntry.suid, -1, 0);
metaUidCacheClear(pMeta, e.ctbEntry.suid);
metaTbGroupCacheClear(pMeta, e.ctbEntry.suid);
/*
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
tsdbCacheDropTable(pMeta->pVnode->pTsdb, e.uid, e.ctbEntry.suid, NULL);
}
*/
} else if (e.type == TSDB_NORMAL_TABLE) {
// drop schema.db (todo)

--pMeta->pVnode->config.vndStats.numOfNTables;
pMeta->pVnode->config.vndStats.numOfNTimeSeries -= e.ntbEntry.schemaRow.nCols - 1;

/*
if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
tsdbCacheDropTable(pMeta->pVnode->pTsdb, e.uid, -1, &e.ntbEntry.schemaRow);
}
*/
} else if (e.type == TSDB_SUPER_TABLE) {
tdbTbDelete(pMeta->pSuidIdx, &e.uid, sizeof(tb_uid_t), pMeta->txn);
// drop schema.db (todo)
@@ -1364,6 +1458,12 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl

++pMeta->pVnode->config.vndStats.numOfNTimeSeries;
metaTimeSeriesNotifyCheck(pMeta);

if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
int16_t cid = pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].colId;
int8_t col_type = pSchema->pSchema[entry.ntbEntry.schemaRow.nCols - 1].type;
(void)tsdbCacheNewNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, col_type);
}
break;
case TSDB_ALTER_TABLE_DROP_COLUMN:
if (pColumn == NULL) {
@@ -1386,6 +1486,13 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
pSchema->nCols--;

--pMeta->pVnode->config.vndStats.numOfNTimeSeries;

if (!TSDB_CACHE_NO(pMeta->pVnode->config)) {
int16_t cid = pColumn->colId;
int8_t col_type = pColumn->type;

(void)tsdbCacheDropNTableColumn(pMeta->pVnode->pTsdb, entry.uid, cid, col_type);
}
break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
if (pColumn == NULL) {
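The metaAlterSTable hunk above detects which super-table column was dropped by walking the old and new row schemas in lockstep and stopping at the first colId mismatch. Below is a minimal, hedged restatement of that detection idea as a standalone function; the names SColDef and findDroppedColumn are illustrative only and are not part of the TDengine source.

```c
#include <stdint.h>

typedef struct {      // simplified stand-in for the schema entries used above
  int16_t colId;
  int8_t  type;
} SColDef;

// Returns the index (into the old schema) of the single removed column,
// or -1 if the schemas do not differ by exactly one removal.
static int findDroppedColumn(const SColDef *oldCols, int nOld,
                             const SColDef *newCols, int nNew) {
  if (nNew != nOld - 1) return -1;   // only the deltaCol == -1 case
  for (int i = 0, j = 0; i < nNew && j < nOld; ++i, ++j) {
    if (newCols[i].colId != oldCols[j].colId) {
      return j;                      // first mismatch marks the dropped column
    }
  }
  return nOld - 1;                   // no mismatch seen: the trailing column was removed
}
```

In the diff itself the returned column id and type then feed metaGetSubtables plus tsdbCacheDropSTableColumn, so every child table's cached entry for that column is invalidated.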
@@ -21,6 +21,7 @@

#define ROCKS_BATCH_SIZE (4096)

#if 0
static int32_t tsdbOpenBICache(STsdb *pTsdb) {
int32_t code = 0;
SLRUCache *pCache = taosLRUCacheInit(10 * 1024 * 1024, 0, .5);
@@ -52,6 +53,7 @@ static void tsdbCloseBICache(STsdb *pTsdb) {
taosThreadMutexDestroy(&pTsdb->biMutex);
}
}
#endif

static int32_t tsdbOpenBCache(STsdb *pTsdb) {
int32_t code = 0;
@@ -431,25 +433,6 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) {
return code;
}

static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t ltype) {
SLastCol *pLastCol = NULL;

char *err = NULL;
size_t vlen = 0;
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
size_t klen = ROCKS_KEY_LEN;
char *value = NULL;
value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, (char *)key, klen, &vlen, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
rocksdb_free(err);
}

pLastCol = tsdbCacheDeserialize(value);

return pLastCol;
}

static void reallocVarData(SColVal *pColVal) {
if (IS_VAR_DATA_TYPE(pColVal->type)) {
uint8_t *pVal = pColVal->value.pData;
@@ -476,6 +459,355 @@ static void tsdbCacheDeleter(const void *key, size_t klen, void *value, void *ud
taosMemoryFree(value);
}

static int32_t tsdbCacheNewTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t ltype) {
int32_t code = 0;

SLRUCache *pCache = pTsdb->lruCache;
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, col_type), .dirty = 1};
SLastCol *pLastCol = &noneCol;

SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol));
*pTmpLastCol = *pLastCol;
pLastCol = pTmpLastCol;

reallocVarData(&pLastCol->colVal);
size_t charge = sizeof(*pLastCol);
if (IS_VAR_DATA_TYPE(pLastCol->colVal.type)) {
charge += pLastCol->colVal.value.nData;
}

SLastKey *pLastKey = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
LRUStatus status = taosLRUCacheInsert(pCache, pLastKey, ROCKS_KEY_LEN, pLastCol, charge, tsdbCacheDeleter, NULL,
TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState);
if (status != TAOS_LRU_STATUS_OK) {
code = -1;
}
/*
// store result back to rocks cache
char *value = NULL;
size_t vlen = 0;
tsdbCacheSerialize(pLastCol, &value, &vlen);

SLastKey *key = pLastKey;
size_t klen = ROCKS_KEY_LEN;
rocksdb_writebatch_put(wb, (char *)key, klen, value, vlen);
taosMemoryFree(value);
*/
return code;
}

int32_t tsdbCacheCommitNoLock(STsdb *pTsdb) {
int32_t code = 0;
char *err = NULL;

SLRUCache *pCache = pTsdb->lruCache;
rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;

taosLRUCacheApply(pCache, tsdbCacheFlushDirty, &pTsdb->flushState);

rocksMayWrite(pTsdb, true, false, false);
rocksMayWrite(pTsdb, true, true, false);
rocksdb_flush(pTsdb->rCache.db, pTsdb->rCache.flushoptions, &err);

if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
rocksdb_free(err);
code = -1;
}

return code;
}

static int32_t tsdbCacheDropTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type, int8_t ltype) {
int32_t code = 0;

// build keys & multi get from rocks
char **keys_list = taosMemoryCalloc(2, sizeof(char *));
size_t *keys_list_sizes = taosMemoryCalloc(2, sizeof(size_t));
const size_t klen = ROCKS_KEY_LEN;

char *keys = taosMemoryCalloc(2, sizeof(SLastKey));
((SLastKey *)keys)[0] = (SLastKey){.ltype = 1, .uid = uid, .cid = cid};
((SLastKey *)keys)[1] = (SLastKey){.ltype = 0, .uid = uid, .cid = cid};

keys_list[0] = keys;
keys_list[1] = keys + sizeof(SLastKey);
keys_list_sizes[0] = klen;
keys_list_sizes[1] = klen;

char **values_list = taosMemoryCalloc(2, sizeof(char *));
size_t *values_list_sizes = taosMemoryCalloc(2, sizeof(size_t));
char **errs = taosMemoryCalloc(2, sizeof(char *));

// rocksMayWrite(pTsdb, true, false, false);
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, 2, (const char *const *)keys_list, keys_list_sizes,
values_list, values_list_sizes, errs);

for (int i = 0; i < 2; ++i) {
if (errs[i]) {
rocksdb_free(errs[i]);
}
}
taosMemoryFree(errs);

rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch;
{
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[0]);
if (NULL != pLastCol) {
rocksdb_writebatch_delete(wb, keys_list[0], klen);
}
pLastCol = tsdbCacheDeserialize(values_list[1]);
if (NULL != pLastCol) {
rocksdb_writebatch_delete(wb, keys_list[1], klen);
}

rocksdb_free(values_list[0]);
rocksdb_free(values_list[1]);

bool erase = false;
LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[0], klen);
if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
erase = true;

taosLRUCacheRelease(pTsdb->lruCache, h, erase);
}
if (erase) {
taosLRUCacheErase(pTsdb->lruCache, keys_list[0], klen);
}

erase = false;
h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[1], klen);
if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
erase = true;

taosLRUCacheRelease(pTsdb->lruCache, h, erase);
}
if (erase) {
taosLRUCacheErase(pTsdb->lruCache, keys_list[1], klen);
}
}

taosMemoryFree(keys_list[0]);

taosMemoryFree(keys_list);
taosMemoryFree(keys_list_sizes);
taosMemoryFree(values_list);
taosMemoryFree(values_list_sizes);

return code;
}

int32_t tsdbCacheNewTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) {
int32_t code = 0;

taosThreadMutexLock(&pTsdb->lruMutex);

if (suid < 0) {
int nCols = pSchemaRow->nCols;
for (int i = 0; i < nCols; ++i) {
int16_t cid = pSchemaRow->pSchema[i].colId;
int8_t col_type = pSchemaRow->pSchema[i].type;

(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
}
} else {
STSchema *pTSchema = NULL;
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return -1;
}

int nCols = pTSchema->numOfCols;
for (int i = 0; i < nCols; ++i) {
int16_t cid = pTSchema->columns[i].colId;
int8_t col_type = pTSchema->columns[i].type;

(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
}

taosMemoryFree(pTSchema);
}

taosThreadMutexUnlock(&pTsdb->lruMutex);

return code;
}

int32_t tsdbCacheDropTable(STsdb *pTsdb, tb_uid_t uid, tb_uid_t suid, SSchemaWrapper *pSchemaRow) {
int32_t code = 0;

taosThreadMutexLock(&pTsdb->lruMutex);

(void)tsdbCacheCommitNoLock(pTsdb);

if (suid < 0) {
int nCols = pSchemaRow->nCols;
for (int i = 0; i < nCols; ++i) {
int16_t cid = pSchemaRow->pSchema[i].colId;
int8_t col_type = pSchemaRow->pSchema[i].type;

(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0);
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1);
}
} else {
STSchema *pTSchema = NULL;
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pTSchema);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return -1;
}

int nCols = pTSchema->numOfCols;
for (int i = 0; i < nCols; ++i) {
int16_t cid = pTSchema->columns[i].colId;
int8_t col_type = pTSchema->columns[i].type;

(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0);
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1);
}

taosMemoryFree(pTSchema);
}

rocksMayWrite(pTsdb, true, false, false);

taosThreadMutexUnlock(&pTsdb->lruMutex);

return code;
}

int32_t tsdbCacheDropSubTables(STsdb *pTsdb, SArray *uids, tb_uid_t suid) {
int32_t code = 0;

taosThreadMutexLock(&pTsdb->lruMutex);

(void)tsdbCacheCommitNoLock(pTsdb);

STSchema *pTSchema = NULL;
code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, suid, -1, &pTSchema);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return -1;
}
for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];

int nCols = pTSchema->numOfCols;
for (int i = 0; i < nCols; ++i) {
int16_t cid = pTSchema->columns[i].colId;
int8_t col_type = pTSchema->columns[i].type;

(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0);
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1);
}
}

taosMemoryFree(pTSchema);

rocksMayWrite(pTsdb, true, false, false);

taosThreadMutexUnlock(&pTsdb->lruMutex);

return code;
}

int32_t tsdbCacheNewNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) {
int32_t code = 0;

taosThreadMutexLock(&pTsdb->lruMutex);

(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);

// rocksMayWrite(pTsdb, true, false, false);
taosThreadMutexUnlock(&pTsdb->lruMutex);
//(void)tsdbCacheCommit(pTsdb);

return code;
}

int32_t tsdbCacheDropNTableColumn(STsdb *pTsdb, int64_t uid, int16_t cid, int8_t col_type) {
int32_t code = 0;

taosThreadMutexLock(&pTsdb->lruMutex);

(void)tsdbCacheCommitNoLock(pTsdb);

(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0);
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1);

rocksMayWrite(pTsdb, true, false, true);

taosThreadMutexUnlock(&pTsdb->lruMutex);

return code;
}

int32_t tsdbCacheNewSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) {
int32_t code = 0;

taosThreadMutexLock(&pTsdb->lruMutex);

for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
tb_uid_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];

(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 0);
(void)tsdbCacheNewTableColumn(pTsdb, uid, cid, col_type, 1);
}

// rocksMayWrite(pTsdb, true, false, false);
taosThreadMutexUnlock(&pTsdb->lruMutex);
//(void)tsdbCacheCommit(pTsdb);

return code;
}

int32_t tsdbCacheDropSTableColumn(STsdb *pTsdb, SArray *uids, int16_t cid, int8_t col_type) {
int32_t code = 0;

taosThreadMutexLock(&pTsdb->lruMutex);

(void)tsdbCacheCommitNoLock(pTsdb);

for (int i = 0; i < TARRAY_SIZE(uids); ++i) {
int64_t uid = ((tb_uid_t *)TARRAY_DATA(uids))[i];

(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 0);
(void)tsdbCacheDropTableColumn(pTsdb, uid, cid, col_type, 1);
}

rocksMayWrite(pTsdb, true, false, true);

taosThreadMutexUnlock(&pTsdb->lruMutex);

return code;
}

static SLastCol *tsdbCacheLookup(STsdb *pTsdb, tb_uid_t uid, int16_t cid, int8_t ltype) {
SLastCol *pLastCol = NULL;

char *err = NULL;
size_t vlen = 0;
SLastKey *key = &(SLastKey){.ltype = ltype, .uid = uid, .cid = cid};
size_t klen = ROCKS_KEY_LEN;
char *value = NULL;
value = rocksdb_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, (char *)key, klen, &vlen, &err);
if (NULL != err) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, err);
rocksdb_free(err);
}

pLastCol = tsdbCacheDeserialize(value);

return pLastCol;
}

typedef struct {
int idx;
SLastKey key;
@@ -1297,11 +1629,13 @@ int32_t tsdbOpenCache(STsdb *pTsdb) {
goto _err;
}

#if 0
code = tsdbOpenBICache(pTsdb);
if (code != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
#endif

code = tsdbOpenBCache(pTsdb);
if (code != TSDB_CODE_SUCCESS) {
@@ -1343,7 +1677,9 @@ void tsdbCloseCache(STsdb *pTsdb) {
taosThreadMutexDestroy(&pTsdb->lruMutex);
}

#if 0
tsdbCloseBICache(pTsdb);
#endif
tsdbCloseBCache(pTsdb);
tsdbClosePgCache(pTsdb);
tsdbCloseRocksCache(pTsdb);
@@ -3117,6 +3453,7 @@ int32_t tsdbCacheGetElems(SVnode *pVnode) {
return elems;
}

#if 0
static void getBICacheKey(int32_t fid, int64_t commitID, char *key, int *len) {
struct {
int32_t fid;
@@ -3193,7 +3530,6 @@ int32_t tsdbCacheGetBlockIdx(SLRUCache *pCache, SDataFReader *pFileReader, LRUHa
return code;
}

#ifdef BUILD_NO_CALL
int32_t tsdbBICacheRelease(SLRUCache *pCache, LRUHandle *h) {
int32_t code = 0;
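A pattern worth noting in the tsdbCache hunks above: every per-(table, column) mutation is issued twice, once with ltype 0 and once with ltype 1, presumably so both last-value key families (last_row and last) stay consistent. A hedged convenience sketch of that pairing; applyToBothCacheTypes is a hypothetical helper, not part of the source:

```c
// Issues one per-column cache operation for both ltype variants, mirroring the
// paired calls in tsdbCacheNewTable / tsdbCacheDropTable above.
typedef int32_t (*cache_col_op_t)(STsdb *pTsdb, int64_t uid, int16_t cid,
                                  int8_t col_type, int8_t ltype);

static int32_t applyToBothCacheTypes(cache_col_op_t op, STsdb *pTsdb, int64_t uid,
                                     int16_t cid, int8_t col_type) {
  int32_t code = 0;
  for (int8_t ltype = 0; ltype <= 1; ++ltype) {  // 0 and 1: the two cache key families
    int32_t c = op(pTsdb, uid, cid, col_type, ltype);
    if (c != 0) code = c;                        // remember a failure but touch both
  }
  return code;
}
```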
@@ -848,6 +848,7 @@ typedef struct SCtgCacheItemInfo {
do { \
CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); \
CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); \
return; \
} while (0)

#define CTG_API_ENTER() \
@@ -2851,7 +2851,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);

ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName);
ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, pTableMeta->tableType, dbFName);

res.pRes = pTableMeta;
taosArrayPush(ctx->pResList, &res);
@@ -2868,7 +2868,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
taosHashRelease(dbCache->tbCache, pCache);

ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, tbMeta->tableType, dbFName);
ctgDebug("Got tb %s meta from cache, type:%d, dbFName:%s", pName->tname, pTableMeta->tableType, dbFName);

res.pRes = pTableMeta;
taosArrayPush(ctx->pResList, &res);
@@ -283,6 +283,42 @@ typedef struct STableScanInfo {
bool needCountEmptyTable;
} STableScanInfo;

typedef enum ESubTableInputType {
SUB_TABLE_MEM_BLOCK,
SUB_TABLE_EXT_PAGES,
} ESubTableInputType;

typedef struct STmsSubTableInput {
STsdbReader* pReader;
SQueryTableDataCond tblCond;
STableKeyInfo* pKeyInfo;
bool bInMemReader;
ESubTableInputType type;
SSDataBlock* pReaderBlock;

SArray* aBlockPages;
SSDataBlock* pPageBlock;
int32_t pageIdx;

int32_t rowIdx;
int64_t* aTs;
} STmsSubTableInput;

typedef struct SBlockOrderInfo SBlockOrderInfo;
typedef struct STmsSubTablesMergeInfo {
SBlockOrderInfo* pOrderInfo;

int32_t numSubTables;
STmsSubTableInput* aInputs;
SMultiwayMergeTreeInfo* pTree;
int32_t numSubTablesCompleted;

int32_t numTableBlocksInMem;
SDiskbasedBuf* pBlocksBuf;

int32_t numInMemReaders;
} STmsSubTablesMergeInfo;

typedef struct STableMergeScanInfo {
int32_t tableStartIndex;
int32_t tableEndIndex;
@@ -296,7 +332,6 @@ typedef struct STableMergeScanInfo {
SSDataBlock* pSortInputBlock;
SSDataBlock* pReaderBlock;
int64_t startTs; // sort start time
SArray* sortSourceParams;
SLimitInfo limitInfo;
int64_t numOfRows;
SScanInfo scanInfo;
@@ -317,7 +352,10 @@ typedef struct STableMergeScanInfo {
SSDataBlock* nextDurationBlocks[2];
bool rtnNextDurationBlocks;
int32_t nextDurationBlocksIdx;

bool bSortRowId;

STmsSubTablesMergeInfo* pSubTablesMergeInfo;
} STableMergeScanInfo;

typedef struct STagScanFilterContext {
@@ -3421,6 +3421,420 @@ _error:
return NULL;
}

// table merge scan operator

// table merge scan operator
// TODO: limit / duration optimization
// TODO: get block from tsdReader function, with task killed, func_data all filter out, skip, finish
// TODO: error processing, memory freeing
// TODO: add log for error and perf
// TODO: tsdb reader open/close dynamically
// TODO: blockdata deep cleanup

static int32_t subTblRowCompareFn(const void* pLeft, const void* pRight, void* param) {
int32_t left = *(int32_t*)pLeft;
int32_t right = *(int32_t*)pRight;
STmsSubTablesMergeInfo* pInfo = (STmsSubTablesMergeInfo*)param;

int32_t leftIdx = pInfo->aInputs[left].rowIdx;
int32_t rightIdx = pInfo->aInputs[right].rowIdx;

if (leftIdx == -1) {
return 1;
} else if (rightIdx == -1) {
return -1;
}

int64_t leftTs = pInfo->aInputs[left].aTs[leftIdx];
int64_t rightTs = pInfo->aInputs[right].aTs[rightIdx];
int32_t ret = leftTs>rightTs ? 1 : ((leftTs < rightTs) ? -1 : 0);
if (pInfo->pOrderInfo->order == TSDB_ORDER_DESC) {
ret = -1 * ret;
}
return ret;
}

int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond));
dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
for (int i = 0; i < src->numOfCols; i++) {
dst->colList[i] = src->colList[i];
}
return 0;
}

static int32_t fetchNextSubTableBlockFromReader(SOperatorInfo* pOperator, STmsSubTableInput* pInput, bool* pSubTableHasBlock) {
int32_t code = 0;

STableMergeScanInfo* pInfo = pOperator->info;
SReadHandle* pHandle = &pInfo->base.readHandle;
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
const SStorageAPI* pAPI= &pTaskInfo->storageAPI;

blockDataCleanup(pInput->pReaderBlock);
if (!pInput->bInMemReader) {
code = pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInput->tblCond, pInput->pKeyInfo, 1, pInput->pReaderBlock,
(void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL);
if (code != 0) {
T_LONG_JMP(pTaskInfo->env, code);
}
}

pInfo->base.dataReader = pInput->pReader;

while (true) {
bool hasNext = false;
int32_t code = pAPI->tsdReader.tsdNextDataBlock(pInfo->base.dataReader, &hasNext);
if (code != 0) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->base.dataReader);
pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, code);
}
if (!hasNext || isTaskKilled(pTaskInfo)) {
if (isTaskKilled(pTaskInfo)) {
pAPI->tsdReader.tsdReaderReleaseDataBlock(pInfo->base.dataReader);
pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}
*pSubTableHasBlock = false;
break;
}

if (pInput->tblCond.order == TSDB_ORDER_ASC) {
pInput->tblCond.twindows.skey = pInput->pReaderBlock->info.window.ekey + 1;
} else {
pInput->tblCond.twindows.ekey = pInput->pReaderBlock->info.window.skey - 1;
}

uint32_t status = 0;
code = loadDataBlock(pOperator, &pInfo->base, pInput->pReaderBlock, &status);
if (code != 0) {
pInfo->base.dataReader = NULL;
T_LONG_JMP(pTaskInfo->env, code);
}
if (status == FUNC_DATA_REQUIRED_ALL_FILTEROUT) {
*pSubTableHasBlock = false;
break;
}
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pInput->pReaderBlock->info.rows == 0) {
continue;
}

*pSubTableHasBlock = true;
break;
}

if (*pSubTableHasBlock) {
pInput->pReaderBlock->info.id.groupId = tableListGetTableGroupId(pInfo->base.pTableListInfo, pInput->pReaderBlock->info.id.uid);
pOperator->resultInfo.totalRows += pInput->pReaderBlock->info.rows;
}
if (!pInput->bInMemReader || !*pSubTableHasBlock) {
pAPI->tsdReader.tsdReaderClose(pInput->pReader);
pInput->pReader = NULL;
}

pInfo->base.dataReader = NULL;
return TSDB_CODE_SUCCESS;
}

static void setGroupStartEndIndex(STableMergeScanInfo* pInfo) {
pInfo->bGroupProcessed = false;

size_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
int32_t i = pInfo->tableStartIndex + 1;
for (; i < numOfTables; ++i) {
STableKeyInfo* tableKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i);
if (tableKeyInfo->groupId != pInfo->groupId) {
break;
}
}
pInfo->tableEndIndex = i - 1;
}

static int32_t openSubTablesMergeSort(STmsSubTablesMergeInfo* pSubTblsInfo) {
for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
STmsSubTableInput * pInput = pSubTblsInfo->aInputs + i;
if (pInput->rowIdx == -1) {
continue;
}
if (pInput->type == SUB_TABLE_MEM_BLOCK) {
pInput->rowIdx = 0;
pInput->pageIdx = -1;
}
SSDataBlock* pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock;
SColumnInfoData* col = taosArrayGet(pInputBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId);
pInput->aTs = (int64_t*)col->pData;
}
tMergeTreeCreate(&pSubTblsInfo->pTree, pSubTblsInfo->numSubTables, pSubTblsInfo, subTblRowCompareFn);
return TSDB_CODE_SUCCESS;
}

static int32_t initSubTablesMergeInfo(STableMergeScanInfo* pInfo) {
setGroupStartEndIndex(pInfo);
STmsSubTablesMergeInfo* pSubTblsInfo = taosMemoryCalloc(1, sizeof(STmsSubTablesMergeInfo));
if (pSubTblsInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pSubTblsInfo->pOrderInfo = taosArrayGet(pInfo->pSortInfo, 0);
pSubTblsInfo->numSubTables = pInfo->tableEndIndex - pInfo->tableStartIndex + 1;
pSubTblsInfo->aInputs = taosMemoryCalloc(pSubTblsInfo->numSubTables, sizeof(STmsSubTableInput));
if (pSubTblsInfo->aInputs == NULL) {
taosMemoryFree(pSubTblsInfo);
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t bufPageSize = pInfo->bufPageSize;
int32_t inMemSize = (pSubTblsInfo->numSubTables - pSubTblsInfo->numTableBlocksInMem) * bufPageSize;
int32_t code =
createDiskbasedBuf(&pSubTblsInfo->pBlocksBuf, pInfo->bufPageSize, inMemSize, "blocksExternalBuf", tsTempDir);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pSubTblsInfo->aInputs);
taosMemoryFree(pSubTblsInfo);
return code;
}
pSubTblsInfo->numTableBlocksInMem = pSubTblsInfo->numSubTables;
pSubTblsInfo->numInMemReaders = pSubTblsInfo->numSubTables;

pInfo->pSubTablesMergeInfo = pSubTblsInfo;
return TSDB_CODE_SUCCESS;
}

static int32_t initSubTableInputs(SOperatorInfo* pOperator, STableMergeScanInfo* pInfo) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SReadHandle* pHandle = &pInfo->base.readHandle;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;

STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;

for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
pInput->type = SUB_TABLE_MEM_BLOCK;
dumpQueryTableCond(&pInfo->base.cond, &pInput->tblCond);
pInput->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);
pInput->pPageBlock = createOneDataBlock(pInfo->pResBlock, false);
STableKeyInfo* keyInfo = tableListGetInfo(pInfo->base.pTableListInfo, i + pInfo->tableStartIndex);
pInput->pKeyInfo = keyInfo;

if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}

if (i + 1 < pSubTblsInfo->numInMemReaders) {
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInput->tblCond, keyInfo, 1, pInput->pReaderBlock,
(void**)&pInput->pReader, GET_TASKID(pTaskInfo), NULL);
pInput->bInMemReader = true;
} else {
pInput->pReader = NULL;
pInput->bInMemReader = false;
}
bool hasNext = true;
fetchNextSubTableBlockFromReader(pOperator, pInput, &hasNext);
if (!hasNext) {
pInput->rowIdx = -1;
++pSubTblsInfo->numSubTablesCompleted;
continue;
} else {
pInput->rowIdx = 0;
pInput->pageIdx = -1;
}
}
return TSDB_CODE_SUCCESS;
}

static int32_t adjustSubTableFromMemBlock(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) {
STableMergeScanInfo* pInfo = pOperatorInfo->info;
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
bool hasNext = true;
fetchNextSubTableBlockFromReader(pOperatorInfo, pInput, &hasNext);
if (!hasNext) {
pInput->rowIdx = -1;
++pSubTblsInfo->numSubTablesCompleted;
} else {
pInput->rowIdx = 0;
}

return TSDB_CODE_SUCCESS;
}

static int32_t adjustSubTableForNextRow(SOperatorInfo* pOperatorInfo, STmsSubTablesMergeInfo* pSubTblsInfo) {
STableMergeScanInfo* pInfo = pOperatorInfo->info;
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);

SSDataBlock* pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock;
if (pInput->rowIdx < pInputBlock->info.rows - 1) {
++pInput->rowIdx;
} else if (pInput->rowIdx == pInputBlock->info.rows -1 ) {
if (pInput->type == SUB_TABLE_MEM_BLOCK) {
adjustSubTableFromMemBlock(pOperatorInfo, pSubTblsInfo);
}
if (pInput->rowIdx != -1) {
SColumnInfoData* col = taosArrayGet(pInputBlock->pDataBlock, pSubTblsInfo->pOrderInfo->slotId);
pInput->aTs = (int64_t*)col->pData;
}
}

tMergeTreeAdjust(pSubTblsInfo->pTree, tMergeTreeGetAdjustIndex(pSubTblsInfo->pTree));

return TSDB_CODE_SUCCESS;
}

static int32_t appendChosenRowToDataBlock(STmsSubTablesMergeInfo* pSubTblsInfo, SSDataBlock* pBlock) {
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + tMergeTreeGetChosenIndex(pSubTblsInfo->pTree);
SSDataBlock* pInputBlock = (pInput->type == SUB_TABLE_MEM_BLOCK) ? pInput->pReaderBlock : pInput->pPageBlock;

for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);

SColumnInfoData* pSrcColInfo = taosArrayGet(pInputBlock->pDataBlock, i);
bool isNull = colDataIsNull(pSrcColInfo, pInputBlock->info.rows, pInput->rowIdx, NULL);

if (isNull) {
colDataSetVal(pColInfo, pBlock->info.rows, NULL, true);
} else {
if (pSrcColInfo->pData != NULL) {
char* pData = colDataGetData(pSrcColInfo, pInput->rowIdx);
colDataSetVal(pColInfo, pBlock->info.rows, pData, false);
}
}
}
pBlock->info.dataLoad = 1;
pBlock->info.scanFlag = pInputBlock->info.scanFlag;
pBlock->info.rows += 1;
return TSDB_CODE_SUCCESS;
}

static SSDataBlock* getSubTablesSortedBlock(SOperatorInfo* pOperator, SSDataBlock* pResBlock, int32_t capacity) {
STableMergeScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;

blockDataCleanup(pResBlock);
bool finished = false;
while (true) {
while (1) {
if (pSubTblsInfo->numSubTablesCompleted >= pSubTblsInfo->numSubTables) {
finished = true;
break;
}

appendChosenRowToDataBlock(pSubTblsInfo, pResBlock);
adjustSubTableForNextRow(pOperator, pSubTblsInfo);

if (pResBlock->info.rows >= capacity) {
break;
}
}

if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pOperator->pTaskInfo->env, pTaskInfo->code);
}

bool limitReached = applyLimitOffset(&pInfo->limitInfo, pResBlock, pTaskInfo);
if (finished || limitReached || pResBlock->info.rows > 0) {
break;
}
}
return (pResBlock->info.rows > 0) ? pResBlock : NULL;
}

static int32_t startSubTablesTableMergeScan(SOperatorInfo* pOperator) {
STableMergeScanInfo* pInfo = pOperator->info;

initSubTablesMergeInfo(pInfo);

initSubTableInputs(pOperator, pInfo);

openSubTablesMergeSort(pInfo->pSubTablesMergeInfo);

return TSDB_CODE_SUCCESS;
}

static int32_t stopSubTablesTableMergeScan(STableMergeScanInfo* pInfo) {
STmsSubTablesMergeInfo* pSubTblsInfo = pInfo->pSubTablesMergeInfo;
if (pSubTblsInfo != NULL) {
tMergeTreeDestroy(&pSubTblsInfo->pTree);

for (int32_t i = 0; i < pSubTblsInfo->numSubTables; ++i) {
STmsSubTableInput* pInput = pSubTblsInfo->aInputs + i;
taosMemoryFree(pInput->tblCond.colList);
blockDataDestroy(pInput->pReaderBlock);
blockDataDestroy(pInput->pPageBlock);
taosArrayDestroy(pInput->aBlockPages);
pInfo->base.readerAPI.tsdReaderClose(pInput->pReader);
pInput->pReader = NULL;
}

destroyDiskbasedBuf(pSubTblsInfo->pBlocksBuf);
taosMemoryFree(pSubTblsInfo->aInputs);

taosMemoryFree(pSubTblsInfo);
pInfo->pSubTablesMergeInfo = NULL;
}
taosMemoryTrim(0);
return TSDB_CODE_SUCCESS;
}

SSDataBlock* doTableMergeScanParaSubTables(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}

SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
STableMergeScanInfo* pInfo = pOperator->info;

int32_t code = pOperator->fpSet._openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}

size_t tableListSize = tableListGetSize(pInfo->base.pTableListInfo);
if (!pInfo->hasGroupId) {
pInfo->hasGroupId = true;

if (tableListSize == 0) {
setOperatorCompleted(pOperator);
return NULL;
}
pInfo->tableStartIndex = 0;
pInfo->groupId = ((STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex))->groupId;
startSubTablesTableMergeScan(pOperator);
}

SSDataBlock* pBlock = NULL;
while (pInfo->tableStartIndex < tableListSize) {
if (isTaskKilled(pTaskInfo)) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
}

pBlock = getSubTablesSortedBlock(pOperator, pInfo->pResBlock, pOperator->resultInfo.capacity);
if (pBlock == NULL && !pInfo->bGroupProcessed && pInfo->needCountEmptyTable) {
STableKeyInfo* tbInfo = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
pBlock = getOneRowResultBlock(pTaskInfo, &pInfo->base, pInfo->pResBlock, tbInfo);
}
if (pBlock != NULL) {
pBlock->info.id.groupId = pInfo->groupId;
pOperator->resultInfo.totalRows += pBlock->info.rows;
pInfo->bGroupProcessed = true;
return pBlock;
} else {
// Data of this group are all dumped, let's try the next group
stopSubTablesTableMergeScan(pInfo);
if (pInfo->tableEndIndex >= tableListSize - 1) {
setOperatorCompleted(pOperator);
break;
}

pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
pInfo->groupId = tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex)->groupId;
startSubTablesTableMergeScan(pOperator);
resetLimitInfoForNextGroup(&pInfo->limitInfo);
}
}

return pBlock;
}

static void tableMergeScanDoSkipTable(uint64_t uid, void* pTableMergeScanInfo) {
STableMergeScanInfo* pInfo = pTableMergeScanInfo;
if (pInfo->mSkipTables == NULL) {
@@ -3578,15 +3992,6 @@ SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) {
return pList;
}

int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) {
memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond));
dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo));
for (int i = 0; i < src->numOfCols; i++) {
dst->colList[i] = src->colList[i];
}
return 0;
}

void tableMergeScanTsdbNotifyCb(ETsdReaderNotifyType type, STsdReaderNotifyInfo* info, void* param) {
STableMergeScanInfo* pTmsInfo = param;
if (type == TSD_READER_NOTIFY_DURATION_START) {
@@ -3833,10 +4238,8 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {

void destroyTableMergeScanOperatorInfo(void* param) {
STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param;
cleanupQueryTableDataCond(&pTableScanInfo->base.cond);

int32_t numOfTable = taosArrayGetSize(pTableScanInfo->sortSourceParams);

// start one reader variable
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
pTableScanInfo->base.dataReader = NULL;

@@ -3847,18 +4250,22 @@ void destroyTableMergeScanOperatorInfo(void* param) {
}
}

taosArrayDestroy(pTableScanInfo->sortSourceParams);
tsortDestroySortHandle(pTableScanInfo->pSortHandle);
pTableScanInfo->pSortHandle = NULL;
taosHashCleanup(pTableScanInfo->mSkipTables);
pTableScanInfo->mSkipTables = NULL;
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
// end one reader variable

cleanupQueryTableDataCond(&pTableScanInfo->base.cond);
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);

pTableScanInfo->pResBlock = blockDataDestroy(pTableScanInfo->pResBlock);
pTableScanInfo->pSortInputBlock = blockDataDestroy(pTableScanInfo->pSortInputBlock);
pTableScanInfo->pReaderBlock = blockDataDestroy(pTableScanInfo->pReaderBlock);

taosArrayDestroy(pTableScanInfo->pSortInfo);

stopSubTablesTableMergeScan(pTableScanInfo);

taosMemoryFreeClear(param);
}

@@ -3932,11 +4339,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
goto _error;
}

initResultSizeInfo(&pOperator->resultInfo, 1024);
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);

initLimitInfo(pTableScanNode->scan.node.pLimit, pTableScanNode->scan.node.pSlimit, &pInfo->limitInfo);

pInfo->mergeLimit = -1;
@@ -3952,26 +4354,38 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->bSortRowId = false;
}

initResultSizeInfo(&pOperator->resultInfo, 1024);
pInfo->pResBlock = createDataBlockFromDescNode(pDescNode);
blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity);

pInfo->pSortInfo = generateSortByTsInfo(pInfo->base.matchInfo.pList, pInfo->base.cond.order);
pInfo->pReaderBlock = createOneDataBlock(pInfo->pResBlock, false);

pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable;

int32_t rowSize = pInfo->pResBlock->info.rowSize;
uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);

pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);

//start one reader variable
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);

pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
int32_t rowSize = blockDataGetRowSize(pInfo->pSortInputBlock);
uint32_t nCols = taosArrayGetSize(pInfo->pSortInputBlock->pDataBlock);
pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
if (!tsExperimental) {
pInfo->filesetDelimited = false;
} else {
pInfo->filesetDelimited = pTableScanNode->filesetDelimited;
}
pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable;

// end one reader variable
setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = numOfCols;

pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
optrDefaultBufFn, getTableMergeScanExplainExecInfo, optrDefaultGetNextExtFn, NULL);
pOperator->fpSet = createOperatorFpSet(
optrDummyOpenFn, pTableScanNode->paraTablesSort ? doTableMergeScanParaSubTables : doTableMergeScan, NULL,
destroyTableMergeScanOperatorInfo, optrDefaultBufFn, getTableMergeScanExplainExecInfo, optrDefaultGetNextExtFn,
NULL);
pOperator->cost.openCost = 0;
return pOperator;
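The new doTableMergeScanParaSubTables path above keeps one input slot per sub-table and picks the next output row with a multiway merge tree ordered by timestamp: exhausted inputs (rowIdx == -1) always lose the comparison, and the sign is flipped for descending scans. A hedged, standalone restatement of that comparison rule follows; MergeSlot and cmpMergeSlots are illustrative names, not TDengine APIs.

```c
#include <stdint.h>

typedef struct {
  const int64_t *aTs;    // timestamp column of the slot's current block
  int32_t        rowIdx; // -1 once this sub-table input is exhausted
} MergeSlot;

// Mirrors subTblRowCompareFn: exhausted slots sort last; otherwise compare the
// current timestamps, with the result negated for a descending scan order.
static int32_t cmpMergeSlots(const MergeSlot *l, const MergeSlot *r, int descending) {
  if (l->rowIdx == -1) return 1;
  if (r->rowIdx == -1) return -1;
  int64_t lts = l->aTs[l->rowIdx];
  int64_t rts = r->aTs[r->rowIdx];
  int32_t ret = (lts > rts) ? 1 : ((lts < rts) ? -1 : 0);
  return descending ? -ret : ret;
}
```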
@@ -459,7 +459,7 @@ static void idxInterRsltDestroy(SArray* results) {

static int idxMergeFinalResults(SArray* in, EIndexOperatorType oType, SArray* out) {
// refactor, merge interResults into fResults by oType
for (int i = 0; i < taosArrayGetSize(in); i--) {
for (int i = 0; i < taosArrayGetSize(in); i++) {
SArray* t = taosArrayGetP(in, i);
taosArraySort(t, uidCompare);
taosArrayRemoveDuplicate(t, uidCompare, NULL);
@@ -17,6 +17,7 @@
#include "index.h"
#include "indexComm.h"
#include "indexInt.h"
#include "indexUtil.h"
#include "nodes.h"
#include "querynodes.h"
#include "scalar.h"
@@ -77,15 +78,15 @@ typedef struct SIFParam {
char dbName[TSDB_DB_NAME_LEN];
char colName[TSDB_COL_NAME_LEN * 2 + 4];

SIndexMetaArg arg;
SIndexMetaArg arg;
SMetaDataFilterAPI api;
} SIFParam;

typedef struct SIFCtx {
int32_t code;
SHashObj *pRes; /* element is SIFParam */
bool noExec; // true: just iterate condition tree, and add hint to executor plan
SIndexMetaArg arg;
int32_t code;
SHashObj *pRes; /* element is SIFParam */
bool noExec; // true: just iterate condition tree, and add hint to executor plan
SIndexMetaArg arg;
SMetaDataFilterAPI *pAPI;
} SIFCtx;

@@ -669,6 +670,10 @@ static int32_t sifDoIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFP
if (sifSetFltParam(left, right, &typedata, &param) != 0) return -1;
}
ret = left->api.metaFilterTableIds(arg->metaEx, &param, output->result);
if (ret == 0) {
taosArraySort(output->result, uidCompare);
taosArrayRemoveDuplicate(output->result, uidCompare, NULL);
}
}
return ret;
}
@@ -875,8 +880,8 @@ static int32_t sifExecLogic(SLogicConditionNode *node, SIFCtx *ctx, SIFParam *ou
} else if (node->condType == LOGIC_COND_TYPE_NOT) {
// taosArrayAddAll(output->result, params[m].result);
}
taosArraySort(output->result, idxUidCompare);
taosArrayRemoveDuplicate(output->result, idxUidCompare, NULL);
taosArraySort(output->result, uidCompare);
taosArrayRemoveDuplicate(output->result, uidCompare, NULL);
}
} else {
for (int32_t m = 0; m < node->pParameterList->length; m++) {
@@ -1016,7 +1021,7 @@ static int32_t sifCalculate(SNode *pNode, SIFParam *pDst) {
return code;
}

static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status, SMetaDataFilterAPI* pAPI) {
static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status, SMetaDataFilterAPI *pAPI) {
int32_t code = TSDB_CODE_SUCCESS;
if (pNode == NULL) {
return TSDB_CODE_QRY_INVALID_INPUT;
@@ -1054,7 +1059,8 @@ static int32_t sifGetFltHint(SNode *pNode, SIdxFltStatus *status, SMetaDataFilte
return code;
}

int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, SIdxFltStatus *status, SMetaDataFilterAPI* pAPI) {
int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result, SIdxFltStatus *status,
SMetaDataFilterAPI *pAPI) {
SIdxFltStatus st = idxGetFltStatus(pFilterNode, pAPI);
if (st == SFLT_NOT_INDEX) {
*status = st;
@@ -1081,7 +1087,7 @@ int32_t doFilterTag(SNode *pFilterNode, SIndexMetaArg *metaArg, SArray *result,
return TSDB_CODE_SUCCESS;
}

SIdxFltStatus idxGetFltStatus(SNode *pFilterNode, SMetaDataFilterAPI* pAPI) {
SIdxFltStatus idxGetFltStatus(SNode *pFilterNode, SMetaDataFilterAPI *pAPI) {
SIdxFltStatus st = SFLT_NOT_INDEX;
if (pFilterNode == NULL) {
return SFLT_NOT_INDEX;
@@ -456,6 +456,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_SCALAR_FIELD(filesetDelimited);
COPY_SCALAR_FIELD(isCountByTag);
CLONE_OBJECT_FIELD(pFuncTypes, functParamClone);
COPY_SCALAR_FIELD(paraTablesSort);
return TSDB_CODE_SUCCESS;
}

@@ -688,6 +689,7 @@ static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhy
COPY_SCALAR_FIELD(igExpired);
COPY_SCALAR_FIELD(filesetDelimited);
COPY_SCALAR_FIELD(needCountEmptyTable);
COPY_SCALAR_FIELD(paraTablesSort);
return TSDB_CODE_SUCCESS;
}

@ -698,6 +698,7 @@ static const char* jkScanLogicPlanTagCond = "TagCond";
static const char* jkScanLogicPlanGroupTags = "GroupTags";
static const char* jkScanLogicPlanOnlyMetaCtbIdx = "OnlyMetaCtbIdx";
static const char* jkScanLogicPlanFilesetDelimited = "FilesetDelimited";
static const char* jkScanLogicPlanparaTablesSort = "paraTablesSort";

static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
  const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;

@ -745,6 +746,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
  if (TSDB_CODE_SUCCESS == code) {
    code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->filesetDelimited);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = tjsonAddBoolToObject(pJson, jkScanLogicPlanparaTablesSort, pNode->paraTablesSort);
  }
  return code;
}

@ -795,6 +799,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
  if (TSDB_CODE_SUCCESS == code) {
    code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->filesetDelimited);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = tjsonGetBoolValue(pJson, jkScanLogicPlanparaTablesSort, &pNode->paraTablesSort);
  }
  return code;
}

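A quick way to sanity-check that the new logic-node flag is written and read under its own JSON key is a string round trip. A hedged, test-style sketch, assuming nodesNodeToString()/nodesStringToNode() behave as they do in other planner tests; the helper name checkParaTablesSortRoundTrip is illustrative:

// Hypothetical check, not part of the patch: the flag should survive
// serializing a scan logic node to its string form and parsing it back.
static bool checkParaTablesSortRoundTrip(SScanLogicNode* pScan) {
  pScan->paraTablesSort = true;

  char*   pStr = NULL;
  int32_t len = 0;
  SNode*  pCopy = NULL;
  if (TSDB_CODE_SUCCESS != nodesNodeToString((SNode*)pScan, false, &pStr, &len)) return false;
  if (TSDB_CODE_SUCCESS != nodesStringToNode(pStr, &pCopy)) return false;

  bool ok = ((SScanLogicNode*)pCopy)->paraTablesSort;
  taosMemoryFree(pStr);
  nodesDestroyNode(pCopy);
  return ok;
}
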
@ -1888,6 +1895,7 @@ static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid";
static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate";
static const char* jkTableScanPhysiPlanFilesetDelimited = "FilesetDelimited";
static const char* jkTableScanPhysiPlanNeedCountEmptyTable = "NeedCountEmptyTable";
static const char* jkTableScanPhysiPlanparaTablesSort = "paraTablesSort";

static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
  const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;

@ -1962,6 +1970,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
  if (TSDB_CODE_SUCCESS == code) {
    code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, pNode->needCountEmptyTable);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanparaTablesSort, pNode->paraTablesSort);
  }
  return code;
}

@ -2038,6 +2049,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
  if (TSDB_CODE_SUCCESS == code) {
    code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, &pNode->needCountEmptyTable);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanparaTablesSort, &pNode->paraTablesSort);
  }
  return code;
}

@ -2185,6 +2185,9 @@ static int32_t physiTableScanNodeInlineToMsg(const void* pObj, STlvEncoder* pEnc
  if (TSDB_CODE_SUCCESS == code) {
    code = tlvEncodeValueBool(pEncoder, pNode->needCountEmptyTable);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = tlvEncodeValueBool(pEncoder, pNode->paraTablesSort);
  }
  return code;
}

@ -2269,6 +2272,9 @@ static int32_t msgToPhysiTableScanNodeInline(STlvDecoder* pDecoder, void* pObj)
  if (TSDB_CODE_SUCCESS == code) {
    code = tlvDecodeValueBool(pDecoder, &pNode->needCountEmptyTable);
  }
  if (TSDB_CODE_SUCCESS == code) {
    code = tlvDecodeValueBool(pDecoder, &pNode->paraTablesSort);
  }
  return code;
}

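Unlike the JSON form, the inline message codec appears to be positional: tlvEncodeValueBool()/tlvDecodeValueBool() carry no per-field key, so the decoder must read fields in exactly the order the encoder wrote them, which is why paraTablesSort is appended after needCountEmptyTable on both sides. A hedged sketch of that pairing, mirroring the calls above (encodeScanFlags/decodeScanFlags are illustrative names, not functions in the patch):

// Illustrative only: the encode and decode order of the two flags must match.
static int32_t encodeScanFlags(STlvEncoder* pEncoder, const STableScanPhysiNode* pNode) {
  int32_t code = tlvEncodeValueBool(pEncoder, pNode->needCountEmptyTable);
  if (TSDB_CODE_SUCCESS == code) code = tlvEncodeValueBool(pEncoder, pNode->paraTablesSort);
  return code;
}
static int32_t decodeScanFlags(STlvDecoder* pDecoder, STableScanPhysiNode* pNode) {
  int32_t code = tlvDecodeValueBool(pDecoder, &pNode->needCountEmptyTable);
  if (TSDB_CODE_SUCCESS == code) code = tlvDecodeValueBool(pDecoder, &pNode->paraTablesSort);
  return code;
}
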
@ -401,6 +401,9 @@ bool addHintNodeToList(SAstCreateContext* pCxt, SNodeList** ppHintList, EHintOpt
    case HINT_PARTITION_FIRST:
      if (paramNum > 0 || hasHint(*ppHintList, HINT_SORT_FOR_GROUP)) return true;
      break;
    case HINT_PARA_TABLES_SORT:
      if (paramNum > 0 || hasHint(*ppHintList, HINT_PARA_TABLES_SORT)) return true;
      break;
    default:
      return true;
  }

@ -479,6 +482,14 @@ SNodeList* createHintNodeList(SAstCreateContext* pCxt, const SToken* pLiteral) {
        }
        opt = HINT_PARTITION_FIRST;
        break;
      case TK_PARA_TABLES_SORT:
        lastComma = false;
        if (0 != opt || inParamList) {
          quit = true;
          break;
        }
        opt = HINT_PARA_TABLES_SORT;
        break;
      case TK_NK_LP:
        lastComma = false;
        if (0 == opt || inParamList) {

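Taken together with the keyword-table entry below, these parser changes let a query opt in through a hint comment. A hedged usage sketch; the exact spelling is assumed by analogy with the existing partition_first() hint and is not quoted from this patch:

// Illustrative only:
//   SELECT /*+ para_tables_sort() */ ts, c1 FROM st ORDER BY ts;
// The lexer maps PARA_TABLES_SORT to TK_PARA_TABLES_SORT, createHintNodeList()
// turns the token into HINT_PARA_TABLES_SORT, and addHintNodeToList() rejects
// the hint if it carries parameters or is specified twice.
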
@ -173,6 +173,7 @@ static SKeyword keywordTable[] = {
    {"OUTPUTTYPE", TK_OUTPUTTYPE},
    {"PAGES", TK_PAGES},
    {"PAGESIZE", TK_PAGESIZE},
    {"PARA_TABLES_SORT", TK_PARA_TABLES_SORT},
    {"PARTITION", TK_PARTITION},
    {"PARTITION_FIRST", TK_PARTITION_FIRST},
    {"PASS", TK_PASS},

@ -47,6 +47,7 @@ int32_t validateQueryPlan(SPlanContext* pCxt, SQueryPlan* pPlan);

bool getBatchScanOptionFromHint(SNodeList* pList);
bool getSortForGroupOptHint(SNodeList* pList);
bool getparaTablesSortOptHint(SNodeList* pList);
bool getOptHint(SNodeList* pList, EHintOption hint);
SLogicNode* getLogicNodeRootNode(SLogicNode* pCurr);
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes);

@ -501,7 +501,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
  } else {
    nodesDestroyNode((SNode*)pScan);
  }

  pScan->paraTablesSort = getparaTablesSortOptHint(pSelect->pHint);
  pCxt->hasScan = true;

  return code;

@ -651,6 +651,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
  pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false;
  pTableScan->filesetDelimited = pScanLogicNode->filesetDelimited;
  pTableScan->needCountEmptyTable = pScanLogicNode->isCountByTag;
  pTableScan->paraTablesSort = pScanLogicNode->paraTablesSort;

  int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
  if (TSDB_CODE_SUCCESS == code) {

@ -466,6 +466,18 @@ bool getOptHint(SNodeList* pList, EHintOption hint) {
  return false;
}

bool getparaTablesSortOptHint(SNodeList* pList) {
  if (!pList) return false;
  SNode* pNode;
  FOREACH(pNode, pList) {
    SHintNode* pHint = (SHintNode*)pNode;
    if (pHint->option == HINT_PARA_TABLES_SORT) {
      return true;
    }
  }
  return false;
}

int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  SLogicNode* pCurr = (SLogicNode*)pNode;

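The new helper repeats the list walk that the generic getOptHint() declared earlier already performs. Assuming getOptHint() returns true when the given option is present in the hint list (its apparent contract), an equivalent one-line form would be:

// Alternative sketch, not part of the patch: delegate to the generic lookup.
bool getparaTablesSortOptHint(SNodeList* pList) {
  return getOptHint(pList, HINT_PARA_TABLES_SORT);
}
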
@ -60,6 +60,7 @@ extern "C" int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t
int64_t insertJobRefId = 0;
int64_t queryJobRefId = 0;

bool schtJobDone = false;
uint64_t schtMergeTemplateId = 0x4;
uint64_t schtFetchTaskId = 0;
uint64_t schtQueryId = 1;

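schtJobDone is written by the responder thread and polled by the test thread as a plain bool. A hedged sketch of a race-free variant, assuming the osAtomic helpers atomic_store_8()/atomic_load_8() with their usual pointer/value signatures (the flag and helper names are illustrative, not part of the patch):

// Illustrative only: publish and observe the completion flag atomically.
static volatile int8_t schtJobDoneFlag = 0;
static void schtMarkJobDone(void) { atomic_store_8(&schtJobDoneFlag, 1); }
static bool schtIsJobDone(void)   { return atomic_load_8(&schtJobDoneFlag) != 0; }
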
@ -450,6 +451,8 @@ void *schtSendRsp(void *param) {

  schReleaseJob(job);

  schtJobDone = true;

  return NULL;
}

@ -1028,6 +1031,8 @@ TEST(insertTest, normalCase) {
  TdThreadAttr thattr;
  taosThreadAttrInit(&thattr);

  schtJobDone = false;

  TdThread thread1;
  taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId);

@ -1045,6 +1050,14 @@ TEST(insertTest, normalCase) {
  code = schedulerExecJob(&req, &insertJobRefId);
  ASSERT_EQ(code, 0);

  while (true) {
    if (schtJobDone) {
      break;
    }

    taosUsleep(10000);
  }

  schedulerFreeJob(&insertJobRefId, 0);

  schedulerDestroy();

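The loop above waits indefinitely for the responder thread to flip schtJobDone. A hedged sketch of a bounded variant, so a lost response fails the test instead of hanging it (schtWaitJobDone and the poll count are illustrative, not part of the patch):

// Illustrative only: poll with an upper bound instead of forever.
static bool schtWaitJobDone(int32_t maxPolls) {
  for (int32_t i = 0; i < maxPolls && !schtJobDone; ++i) {
    taosUsleep(10000);  // same poll interval as the loop above
  }
  return schtJobDone;
}
// usage in the test body:
//   ASSERT_TRUE(schtWaitJobDone(6000));
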
@ -68,18 +68,6 @@ int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckp
  return pEncoder->pos;
}

int32_t tDecodeStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSourceRsp* pRsp) {
  if (tStartDecode(pDecoder) < 0) return -1;
  if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
  if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->taskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pRsp->nodeId) < 0) return -1;
  if (tDecodeI64(pDecoder, &pRsp->expireTime) < 0) return -1;
  if (tDecodeI8(pDecoder, &pRsp->success) < 0) return -1;
  tEndDecode(pDecoder);
  return 0;
}

int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) {
  if (tStartEncode(pEncoder) < 0) return -1;
  if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;

@ -961,6 +961,10 @@ static void cliSendCb(uv_write_t* req, int status) {
      tTrace("%s conn %p send cost:%dus ", CONN_GET_INST_LABEL(pConn), pConn, (int)cost);
    }
  }
  if (pMsg->msg.contLen == 0 && pMsg->msg.pCont != 0) {
    rpcFreeCont(pMsg->msg.pCont);
    pMsg->msg.pCont = 0;
  }

  if (status == 0) {
    tDebug("%s conn %p data already was written out", CONN_GET_INST_LABEL(pConn), pConn);

@ -57,6 +57,8 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hint.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hint.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/hint.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/para_tms.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/para_tms2.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_str.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_math.py

@ -656,7 +656,9 @@ if $data31 != 4 then
endi

sql_error select irate(c) from st where t1="1" and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' interval(1m) sliding(15s) group by tbname,c;
print select _wstart, irate(c), tbname, t1, t2 from st where t1=1 and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' partition by tbname,t1,t2 interval(1m) sliding(15s) order by tbname;
sql select _wstart, irate(c), tbname, t1, t2 from st where t1=1 and ts >= '2020-03-27 04:11:17.732' and ts < '2020-03-27 05:11:17.732' partition by tbname,t1,t2 interval(1m) sliding(15s) order by tbname;
print $rows
if $rows != 40 then
  return -1
endi

@ -24,6 +24,11 @@ class TDTestCase:
        self.ctbNum = 10
        self.rowsPerTbl = 10000
        self.duraion = '1h'
        self.cachemodel = 'both'
        self.cacheEnable = True
        #self.cacheEnable = False
        if not self.cacheEnable:
            self.cachemodel = 'none'

    def init(self, conn, logSql, replicaVar=1):
        self.replicaVar = int(replicaVar)

@ -34,7 +39,7 @@ class TDTestCase:
        if dropFlag == 1:
            tsql.execute("drop database if exists %s"%(dbName))

        tsql.execute("create database if not exists %s vgroups %d replica %d duration %s CACHEMODEL 'both'"%(dbName, vgroups, replica, duration))
        tsql.execute("create database if not exists %s vgroups %d replica %d duration %s CACHEMODEL '%s'"%(dbName, vgroups, replica, duration, self.cachemodel))
        tdLog.debug("complete to create database %s"%(dbName))
        return

@ -130,6 +135,9 @@ class TDTestCase:
        return

    def check_explain_res_has_row(self, plan_str_expect: str, rows, sql):
        if not self.cacheEnable:
            return

        plan_found = False
        for row in rows:
            if str(row).find(plan_str_expect) >= 0:

@ -343,10 +351,10 @@ class TDTestCase:
        p.check_returncode()
        tdSql.query_success_failed("select ts, last(c1), c1, ts, c1 from meters", queryTimes=10, expectErrInfo="Invalid column name: c1")
        tdSql.query('select last(c12), c12, ts from meters', queryTimes=1)
        tdSql.checkRows(1)
        tdSql.checkCols(3)
        tdSql.checkData(0, 0, None)
        tdSql.checkData(0, 1, None)
        tdSql.checkRows(0)
        #tdSql.checkCols(3)
        #tdSql.checkData(0, 0, None)
        #tdSql.checkData(0, 1, None)

    def test_cache_scan_with_drop_column(self):
        tdSql.query('select last(*) from meters')

@ -378,41 +386,41 @@ class TDTestCase:
        p.check_returncode()
        tdSql.query_success_failed("select ts, last(c2), c12, ts, c12 from meters", queryTimes=10, expectErrInfo="Invalid column name: c2")
        tdSql.query('select last(c1), c1, ts from meters', queryTimes=1)
        tdSql.checkRows(1)
        tdSql.checkCols(3)
        tdSql.checkData(0, 0, None)
        tdSql.checkData(0, 1, None)
        tdSql.checkRows(0)
        #tdSql.checkCols(3)
        #tdSql.checkData(0, 0, None)
        #tdSql.checkData(0, 1, None)

    def test_cache_scan_last_row_with_partition_by(self):
        tdSql.query('select last(c1) from meters partition by t1')
        print(str(tdSql.queryResult))
        tdSql.checkCols(1)
        tdSql.checkRows(5)
        #tdSql.checkCols(1)
        tdSql.checkRows(0)
        p = subprocess.run(["taos", '-s', "alter table test.meters drop column c1; alter table test.meters add column c2 int"])
        p.check_returncode()
        tdSql.query_success_failed('select last(c1) from meters partition by t1', queryTimes=10, expectErrInfo="Invalid column name: c1")
        tdSql.query('select last(c2), c2, ts from meters', queryTimes=1)
        print(str(tdSql.queryResult))
        tdSql.checkRows(1)
        tdSql.checkCols(3)
        tdSql.checkData(0, 0, None)
        tdSql.checkData(0, 1, None)
        tdSql.checkRows(0)
        #tdSql.checkCols(3)
        #tdSql.checkData(0, 0, None)
        #tdSql.checkData(0, 1, None)


    def test_cache_scan_last_row_with_partition_by_tbname(self):
        tdSql.query('select last(c2) from meters partition by tbname')
        print(str(tdSql.queryResult))
        tdSql.checkCols(1)
        tdSql.checkRows(10)
        #tdSql.checkCols(1)
        tdSql.checkRows(0)
        p = subprocess.run(["taos", '-s', "alter table test.meters drop column c2; alter table test.meters add column c1 int"])
        p.check_returncode()
        tdSql.query_success_failed('select last_row(c2) from meters partition by tbname', queryTimes=10, expectErrInfo="Invalid column name: c2")
        tdSql.query('select last(c1), c1, ts from meters', queryTimes=1)
        print(str(tdSql.queryResult))
        tdSql.checkRows(1)
        tdSql.checkCols(3)
        tdSql.checkData(0, 0, None)
        tdSql.checkData(0, 1, None)
        tdSql.checkRows(0)
        #tdSql.checkCols(3)
        #tdSql.checkData(0, 0, None)
        #tdSql.checkData(0, 1, None)

@ -117,6 +117,10 @@ class TDTestCase:
            ( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" )
            ( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" )
            ( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" )
            '''
        )
        tdSql.execute(
            f'''insert into {dbname}.t1 values
            ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
            '''
        )

@ -179,6 +183,10 @@ class TDTestCase:
            ( '2022-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" )
            ( '2022-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" )
            ( '2022-12-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" )
            '''
        )
        tdSql.execute(
            f'''insert into {dbname}.t1 values
            ( '2023-02-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
            '''
        )

Two additional file diffs were suppressed because they are too large.