Merge remote-tracking branch 'origin/3.0' into fix/tsim

This commit is contained in:
Shengliang Guan 2022-05-18 14:21:02 +08:00
commit 6a70fab3ad
30 changed files with 377 additions and 847 deletions

View File

@ -258,6 +258,7 @@ typedef struct {
char* tblFName;
int32_t numOfRows;
int32_t affectedRows;
int64_t sver;
} SSubmitBlkRsp;
typedef struct {

View File

@ -170,32 +170,18 @@ typedef struct SInputColumnInfoData {
// sql function runtime context
typedef struct SqlFunctionCtx {
SInputColumnInfoData input;
SResultDataInfo resDataInfo;
uint32_t order; // data block scanner order: asc|desc
uint8_t scanFlag; // record current running step, default: 0
////////////////////////////////////////////////////////////////
int32_t startRow; // start row index
int32_t size; // handled processed row number
SColumnInfoData* pInput;
SColumnDataAgg agg;
int16_t inputType; // TODO remove it
int16_t inputBytes; // TODO remove it
bool hasNull; // null value exist in current block, TODO remove it
bool requireNull; // require null in some function, TODO remove it
int32_t columnIndex; // TODO remove it
bool isAggSet;
int64_t startTs; // timestamp range of current query when function is executed on a specific data block, TODO remove it
bool stableQuery;
/////////////////////////////////////////////////////////////////
int16_t functionId; // function id
char * pOutput; // final result output buffer, point to sdata->data
int32_t numOfParams;
SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
int64_t *ptsList; // corresponding timestamp array list
SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
int32_t offset;
SVariant tag;
SInputColumnInfoData input;
SResultDataInfo resDataInfo;
uint32_t order; // data block scanner order: asc|desc
uint8_t scanFlag; // record current running step, default: 0
int16_t functionId; // function id
char *pOutput; // final result output buffer, point to sdata->data
int32_t numOfParams;
SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
int64_t *ptsList; // corresponding timestamp array list
SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
int32_t offset;
SVariant tag;
struct SResultRowEntryInfo *resultInfo;
SSubsidiaryResInfo subsidiaries;
SPoint1 start;

View File

@ -218,6 +218,13 @@ int32_t stmtParseSql(STscStmt* pStmt) {
pStmt->bInfo.needParse = false;
if (pStmt->sql.pQuery->pRoot && 0 == pStmt->sql.type) {
pStmt->sql.type = STMT_TYPE_INSERT;
} else if (pStmt->sql.pQuery->pPrepareRoot) {
pStmt->sql.type = STMT_TYPE_QUERY;
}
/*
switch (nodeType(pStmt->sql.pQuery->pRoot)) {
case QUERY_NODE_VNODE_MODIF_STMT:
if (0 == pStmt->sql.type) {
@ -231,6 +238,7 @@ int32_t stmtParseSql(STscStmt* pStmt) {
tscError("not supported stmt type %d", nodeType(pStmt->sql.pQuery->pRoot));
STMT_ERR_RET(TSDB_CODE_TSC_STMT_CLAUSE_ERROR);
}
*/
return TSDB_CODE_SUCCESS;
}

View File

@ -600,10 +600,11 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
}
int32_t blockDataFromBuf1(SSDataBlock* pBlock, const char* buf, size_t capacity) {
pBlock->info.rows = *(int32_t*)buf;
pBlock->info.rows = *(int32_t*) buf;
pBlock->info.groupId = *(uint64_t*) (buf + sizeof(int32_t));
int32_t numOfCols = pBlock->info.numOfCols;
const char* pStart = buf + sizeof(uint32_t);
const char* pStart = buf + sizeof(uint32_t) + sizeof(uint64_t);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
@ -669,7 +670,7 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) {
return sizeof(int32_t) + sizeof(uint64_t) + pBlock->info.numOfCols * sizeof(int32_t);
}
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
double blockDataGetSerialRowSize(const SSDataBlock* pBlock) {
ASSERT(pBlock != NULL);
double rowSize = 0;
@ -1224,7 +1225,27 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
}
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
return (int32_t)((pageSize - blockDataGetSerialMetaSize(pBlock)) / blockDataGetSerialRowSize(pBlock));
int32_t payloadSize = pageSize - blockDataGetSerialMetaSize(pBlock);
int32_t rowSize = pBlock->info.rowSize;
int32_t nRows = payloadSize / rowSize;
// the true value must be less than the value of nRows
int32_t additional = 0;
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
additional += nRows * sizeof(int32_t);
} else {
additional += BitmapLen(nRows);
}
}
int32_t newRows = (payloadSize - additional) / rowSize;
ASSERT(newRows <= nRows && newRows > 1);
return newRows;
}
void colDataDestroy(SColumnInfoData* pColData) {

View File

@ -4093,6 +4093,7 @@ static int32_t tEncodeSSubmitBlkRsp(SEncoder *pEncoder, const SSubmitBlkRsp *pBl
}
if (tEncodeI32v(pEncoder, pBlock->numOfRows) < 0) return -1;
if (tEncodeI32v(pEncoder, pBlock->affectedRows) < 0) return -1;
if (tEncodeI64v(pEncoder, pBlock->sver) < 0) return -1;
tEndEncode(pEncoder);
return 0;
@ -4111,6 +4112,7 @@ static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) {
}
if (tDecodeI32v(pDecoder, &pBlock->numOfRows) < 0) return -1;
if (tDecodeI32v(pDecoder, &pBlock->affectedRows) < 0) return -1;
if (tDecodeI64v(pDecoder, &pBlock->sver) < 0) return -1;
tEndDecode(pDecoder);
return 0;

View File

@ -398,11 +398,14 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
if (pCfg->precision < TSDB_MIN_PRECISION && pCfg->precision > TSDB_MAX_PRECISION) return -1;
if (pCfg->compression < TSDB_MIN_COMP_LEVEL || pCfg->compression > TSDB_MAX_COMP_LEVEL) return -1;
if (pCfg->replications < TSDB_MIN_DB_REPLICA || pCfg->replications > TSDB_MAX_DB_REPLICA) return -1;
if (pCfg->replications > mndGetDnodeSize(pMnode)) return -1;
if (pCfg->replications != 1 && pCfg->replications != 3) return -1;
if (pCfg->strict < TSDB_DB_STRICT_OFF || pCfg->strict > TSDB_DB_STRICT_ON) return -1;
if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) return -1;
if (pCfg->hashMethod != 1) return -1;
if (pCfg->replications > mndGetDnodeSize(pMnode)) {
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
return -1;
}
terrno = 0;
return TSDB_CODE_SUCCESS;
@ -1447,8 +1450,8 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
}
char *status = "ready";
char b[24] = {0};
STR_WITH_SIZE_TO_VARSTR(b, status, strlen(status));
char statusB[24] = {0};
STR_WITH_SIZE_TO_VARSTR(statusB, status, strlen(status));
if (sysDb) {
for (int32_t i = 0; i < pShow->numOfColumns; ++i) {
@ -1458,7 +1461,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
} else if (i == 3) {
colDataAppend(pColInfo, rows, (const char *)&numOfTables, false);
} else if (i == 20) {
colDataAppend(pColInfo, rows, b, false);
colDataAppend(pColInfo, rows, statusB, false);
} else {
colDataAppendNULL(pColInfo, rows);
}
@ -1481,9 +1484,10 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.replications, false);
const char *src = pDb->cfg.strict ? "strict" : "nostrict";
STR_WITH_SIZE_TO_VARSTR(b, src, strlen(src));
char strict[24] = {0};
STR_WITH_SIZE_TO_VARSTR(strict, src, strlen(src));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)b, false);
colDataAppend(pColInfo, rows, (const char *)strict, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.daysPerFile, false);
@ -1554,7 +1558,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.numOfStables, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols);
colDataAppend(pColInfo, rows, (const char *)b, false);
colDataAppend(pColInfo, rows, (const char *)statusB, false);
}
}

View File

@ -502,6 +502,17 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
}
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, dropReq.name);
// if (pTopic == NULL) {
// if (dropReq.igNotExists) {
// mDebug("topic:%s, not exist, ignore not exist is set", dropReq.name);
// return 0;
// } else {
// terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
// mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
// return -1;
// }
// }
if (pTopic->refConsumerCnt != 0) {
mndReleaseTopic(pMnode, pTopic);
terrno = TSDB_CODE_MND_TOPIC_SUBSCRIBED;

View File

@ -57,56 +57,56 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
}
// open pTbDb
ret = tdbDbOpen("table.db", sizeof(STbDbKey), -1, tbDbKeyCmpr, pMeta->pEnv, &pMeta->pTbDb);
ret = tdbOpen("table.db", sizeof(STbDbKey), -1, tbDbKeyCmpr, pMeta->pEnv, &pMeta->pTbDb);
if (ret < 0) {
metaError("vgId:%d failed to open meta table db since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pSkmDb
ret = tdbDbOpen("schema.db", sizeof(SSkmDbKey), -1, skmDbKeyCmpr, pMeta->pEnv, &pMeta->pSkmDb);
ret = tdbOpen("schema.db", sizeof(SSkmDbKey), -1, skmDbKeyCmpr, pMeta->pEnv, &pMeta->pSkmDb);
if (ret < 0) {
metaError("vgId:%d failed to open meta schema db since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pUidIdx
ret = tdbDbOpen("uid.idx", sizeof(tb_uid_t), sizeof(int64_t), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx);
ret = tdbOpen("uid.idx", sizeof(tb_uid_t), sizeof(int64_t), uidIdxKeyCmpr, pMeta->pEnv, &pMeta->pUidIdx);
if (ret < 0) {
metaError("vgId:%d failed to open meta uid idx since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pNameIdx
ret = tdbDbOpen("name.idx", -1, sizeof(tb_uid_t), NULL, pMeta->pEnv, &pMeta->pNameIdx);
ret = tdbOpen("name.idx", -1, sizeof(tb_uid_t), NULL, pMeta->pEnv, &pMeta->pNameIdx);
if (ret < 0) {
metaError("vgId:%d failed to open meta name index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pCtbIdx
ret = tdbDbOpen("ctb.idx", sizeof(SCtbIdxKey), 0, ctbIdxKeyCmpr, pMeta->pEnv, &pMeta->pCtbIdx);
ret = tdbOpen("ctb.idx", sizeof(SCtbIdxKey), 0, ctbIdxKeyCmpr, pMeta->pEnv, &pMeta->pCtbIdx);
if (ret < 0) {
metaError("vgId:%d failed to open meta child table index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pTagIdx
ret = tdbDbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx);
ret = tdbOpen("tag.idx", -1, 0, tagIdxKeyCmpr, pMeta->pEnv, &pMeta->pTagIdx);
if (ret < 0) {
metaError("vgId:%d failed to open meta tag index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pTtlIdx
ret = tdbDbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx);
ret = tdbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx);
if (ret < 0) {
metaError("vgId:%d failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
}
// open pSmaIdx
ret = tdbDbOpen("sma.idx", sizeof(SSmaIdxKey), 0, smaIdxKeyCmpr, pMeta->pEnv, &pMeta->pSmaIdx);
ret = tdbOpen("sma.idx", sizeof(SSmaIdxKey), 0, smaIdxKeyCmpr, pMeta->pEnv, &pMeta->pSmaIdx);
if (ret < 0) {
metaError("vgId:%d failed to open meta sma index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err;
@ -125,14 +125,14 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta) {
_err:
if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pSmaIdx) tdbDbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbDbClose(pMeta->pTtlIdx);
if (pMeta->pTagIdx) tdbDbClose(pMeta->pTagIdx);
if (pMeta->pCtbIdx) tdbDbClose(pMeta->pCtbIdx);
if (pMeta->pNameIdx) tdbDbClose(pMeta->pNameIdx);
if (pMeta->pUidIdx) tdbDbClose(pMeta->pUidIdx);
if (pMeta->pSkmDb) tdbDbClose(pMeta->pSkmDb);
if (pMeta->pTbDb) tdbDbClose(pMeta->pTbDb);
if (pMeta->pSmaIdx) tdbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbClose(pMeta->pTtlIdx);
if (pMeta->pTagIdx) tdbClose(pMeta->pTagIdx);
if (pMeta->pCtbIdx) tdbClose(pMeta->pCtbIdx);
if (pMeta->pNameIdx) tdbClose(pMeta->pNameIdx);
if (pMeta->pUidIdx) tdbClose(pMeta->pUidIdx);
if (pMeta->pSkmDb) tdbClose(pMeta->pSkmDb);
if (pMeta->pTbDb) tdbClose(pMeta->pTbDb);
if (pMeta->pEnv) tdbEnvClose(pMeta->pEnv);
metaDestroyLock(pMeta);
taosMemoryFree(pMeta);
@ -142,14 +142,14 @@ _err:
int metaClose(SMeta *pMeta) {
if (pMeta) {
if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pSmaIdx) tdbDbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbDbClose(pMeta->pTtlIdx);
if (pMeta->pTagIdx) tdbDbClose(pMeta->pTagIdx);
if (pMeta->pCtbIdx) tdbDbClose(pMeta->pCtbIdx);
if (pMeta->pNameIdx) tdbDbClose(pMeta->pNameIdx);
if (pMeta->pUidIdx) tdbDbClose(pMeta->pUidIdx);
if (pMeta->pSkmDb) tdbDbClose(pMeta->pSkmDb);
if (pMeta->pTbDb) tdbDbClose(pMeta->pTbDb);
if (pMeta->pSmaIdx) tdbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbClose(pMeta->pTtlIdx);
if (pMeta->pTagIdx) tdbClose(pMeta->pTagIdx);
if (pMeta->pCtbIdx) tdbClose(pMeta->pCtbIdx);
if (pMeta->pNameIdx) tdbClose(pMeta->pNameIdx);
if (pMeta->pUidIdx) tdbClose(pMeta->pUidIdx);
if (pMeta->pSkmDb) tdbClose(pMeta->pSkmDb);
if (pMeta->pTbDb) tdbClose(pMeta->pTbDb);
if (pMeta->pEnv) tdbEnvClose(pMeta->pEnv);
metaDestroyLock(pMeta);
taosMemoryFree(pMeta);

View File

@ -35,7 +35,7 @@ int metaGetTableEntryByVersion(SMetaReader *pReader, int64_t version, tb_uid_t u
STbDbKey tbDbKey = {.version = version, .uid = uid};
// query table.db
if (tdbDbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pReader->pBuf, &pReader->szBuf) < 0) {
if (tdbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pReader->pBuf, &pReader->szBuf) < 0) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
goto _err;
}
@ -58,7 +58,7 @@ int metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid) {
int64_t version;
// query uid.idx
if (tdbDbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) {
if (tdbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pReader->pBuf, &pReader->szBuf) < 0) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1;
}
@ -72,7 +72,7 @@ int metaGetTableEntryByName(SMetaReader *pReader, const char *name) {
tb_uid_t uid;
// query name.idx
if (tdbDbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
if (tdbGet(pMeta->pNameIdx, name, strlen(name) + 1, &pReader->pBuf, &pReader->szBuf) < 0) {
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1;
}
@ -159,7 +159,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
pKey = &skmDbKey;
kLen = sizeof(skmDbKey);
metaRLock(pMeta);
ret = tdbDbGet(pMeta->pSkmDb, pKey, kLen, &pVal, &vLen);
ret = tdbGet(pMeta->pSkmDb, pKey, kLen, &pVal, &vLen);
metaULock(pMeta);
if (ret < 0) {
return NULL;
@ -413,7 +413,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
memcpy((void*)pTSma->expr, mr.me.smaEntry.tsma->expr, pTSma->exprLen);
memcpy((void *)pTSma->expr, mr.me.smaEntry.tsma->expr, pTSma->exprLen);
}
if (pTSma->tagsFilterLen > 0) {
if (!(pTSma->tagsFilter = taosMemoryCalloc(1, pTSma->tagsFilterLen))) {
@ -421,14 +421,14 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
goto _err;
}
}
memcpy((void*)pTSma->tagsFilter, mr.me.smaEntry.tsma->tagsFilter, pTSma->tagsFilterLen);
memcpy((void *)pTSma->tagsFilter, mr.me.smaEntry.tsma->tagsFilter, pTSma->tagsFilterLen);
} else {
pTSma->exprLen = 0;
pTSma->expr = NULL;
pTSma->tagsFilterLen = 0;
pTSma->tagsFilter = NULL;
}
++smaIdx;
}

View File

@ -117,7 +117,7 @@ static int metaSaveSmaToDB(SMeta *pMeta, const SMetaEntry *pME) {
tEncoderClear(&coder);
// write to table.db
if (tdbDbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) {
if (tdbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) {
goto _err;
}
@ -130,17 +130,17 @@ _err:
}
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbDbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
return tdbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
}
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbDbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn);
return tdbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn);
}
static int metaUpdateSmaIdx(SMeta *pMeta, const SMetaEntry *pME) {
SSmaIdxKey smaIdxKey = {.uid = pME->smaEntry.tsma->tableUid, .smaUid = pME->smaEntry.tsma->indexUid};
return tdbDbInsert(pMeta->pSmaIdx, &smaIdxKey, sizeof(smaIdxKey), NULL, 0, &pMeta->txn);
return tdbInsert(pMeta->pSmaIdx, &smaIdxKey, sizeof(smaIdxKey), NULL, 0, &pMeta->txn);
}
static int metaHandleSmaEntry(SMeta *pMeta, const SMetaEntry *pME) {

View File

@ -389,7 +389,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
int c;
// search name index
ret = tdbDbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
ret = tdbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
if (ret < 0) {
terrno = TSDB_CODE_VND_TABLE_NOT_EXIST;
return -1;
@ -536,7 +536,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
int nData = 0;
// search name index
ret = tdbDbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
ret = tdbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal);
if (ret < 0) {
terrno = TSDB_CODE_VND_TABLE_NOT_EXIST;
return -1;
@ -573,9 +573,9 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
tDecoderClear(&dc);
/* get stbEntry*/
tdbDbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal);
tdbDbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = *(int64_t *)pVal}), sizeof(STbDbKey),
(void **)&stbEntry.pBuf, &nVal);
tdbGet(pMeta->pUidIdx, &ctbEntry.ctbEntry.suid, sizeof(tb_uid_t), &pVal, &nVal);
tdbGet(pMeta->pTbDb, &((STbDbKey){.uid = ctbEntry.ctbEntry.suid, .version = *(int64_t *)pVal}), sizeof(STbDbKey),
(void **)&stbEntry.pBuf, &nVal);
tdbFree(pVal);
tDecoderInit(&dc, stbEntry.pBuf, nVal);
metaDecodeEntry(&dc, &stbEntry);
@ -632,7 +632,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
metaSaveToTbDb(pMeta, &ctbEntry);
// save to uid.idx
tdbDbUpsert(pMeta->pUidIdx, &ctbEntry.uid, sizeof(tb_uid_t), &version, sizeof(version), &pMeta->txn);
tdbUpsert(pMeta->pUidIdx, &ctbEntry.uid, sizeof(tb_uid_t), &version, sizeof(version), &pMeta->txn);
if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
@ -708,7 +708,7 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
tEncoderClear(&coder);
// write to table.db
if (tdbDbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) {
if (tdbInsert(pMeta->pTbDb, pKey, kLen, pVal, vLen, &pMeta->txn) < 0) {
goto _err;
}
@ -721,11 +721,11 @@ _err:
}
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbDbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
return tdbInsert(pMeta->pUidIdx, &pME->uid, sizeof(tb_uid_t), &pME->version, sizeof(int64_t), &pMeta->txn);
}
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbDbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn);
return tdbInsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), &pMeta->txn);
}
static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
@ -748,12 +748,12 @@ static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) {
ttlKey.dtime = ctime + ttlDays * 24 * 60 * 60;
ttlKey.uid = pME->uid;
return tdbDbInsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, &pMeta->txn);
return tdbInsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, &pMeta->txn);
}
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) {
SCtbIdxKey ctbIdxKey = {.suid = pME->ctbEntry.suid, .uid = pME->uid};
return tdbDbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, &pMeta->txn);
return tdbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, &pMeta->txn);
}
static int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int8_t type, tb_uid_t uid,
@ -801,10 +801,10 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
SDecoder dc = {0};
// get super table
tdbDbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData);
tdbGet(pMeta->pUidIdx, &pCtbEntry->ctbEntry.suid, sizeof(tb_uid_t), &pData, &nData);
tbDbKey.uid = pCtbEntry->ctbEntry.suid;
tbDbKey.version = *(int64_t *)pData;
tdbDbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData);
tdbGet(pMeta->pTbDb, &tbDbKey, sizeof(tbDbKey), &pData, &nData);
tDecoderInit(&dc, pData, nData);
metaDecodeEntry(&dc, &stbEntry);
@ -817,7 +817,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
&pTagIdxKey, &nTagIdxKey) < 0) {
return -1;
}
tdbDbInsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn);
tdbInsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn);
metaDestroyTagIdxKey(pTagIdxKey);
tDecoderClear(&dc);
@ -859,7 +859,7 @@ static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
tEncoderInit(&coder, pVal, vLen);
tEncodeSSchemaWrapper(&coder, pSW);
if (tdbDbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, &pMeta->txn) < 0) {
if (tdbInsert(pMeta->pSkmDb, &skmDbKey, sizeof(skmDbKey), pVal, vLen, &pMeta->txn) < 0) {
rcode = -1;
goto _exit;
}

View File

@ -59,14 +59,14 @@ static int32_t smaOpenDBDb(TDB **ppDB, TENV *pEnv, const char *pFName) {
// Create a database
compFunc = tdSmaKeyCmpr;
if(tdbDbOpen(pFName, -1, -1, compFunc, pEnv, ppDB) < 0) {
if (tdbOpen(pFName, -1, -1, compFunc, pEnv, ppDB) < 0) {
return -1;
}
return 0;
}
static int32_t smaCloseDBDb(TDB *pDB) { return tdbDbClose(pDB); }
static int32_t smaCloseDBDb(TDB *pDB) { return tdbClose(pDB); }
int32_t smaOpenDBF(TENV *pEnv, SDBFile *pDBF) {
// TEnv is shared by a group of SDBFile
@ -99,7 +99,7 @@ int32_t smaSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, in
int32_t ret;
printf("save tsma data into %s, keyLen:%d valLen:%d txn:%p\n", pDBF->path, keyLen, valLen, txn);
ret = tdbDbUpsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn);
ret = tdbUpsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn);
if (ret < 0) {
smaError("failed to upsert tsma data into db, ret = %d", ret);
return -1;
@ -112,7 +112,7 @@ void *smaGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32_
void *pVal = NULL;
int ret;
ret = tdbDbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen);
ret = tdbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen);
if (ret < 0) {
smaError("failed to get tsma data from db, ret = %d", ret);

View File

@ -309,6 +309,7 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo
TSKEY keyMin;
TSKEY keyMax;
SSubmitBlk *pBlkCopy;
int64_t sverNew;
// check if table exists
SMetaReader mr = {0};
@ -319,6 +320,12 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1;
}
if (mr.me.type == TSDB_NORMAL_TABLE) {
sverNew = mr.me.ntbEntry.schema.sver;
} else {
metaGetTableEntryByUid(&mr, mr.me.ctbEntry.suid);
sverNew = mr.me.stbEntry.schema.sver;
}
metaReaderClear(&mr);
// create container is nedd
@ -367,6 +374,7 @@ int tsdbInsertTableData(STsdb *pTsdb, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlo
pRsp->numOfRows = pMsgIter->numOfRows;
pRsp->affectedRows = pMsgIter->numOfRows;
pRsp->sver = sverNew;
return 0;
}

View File

@ -60,12 +60,12 @@ static int32_t tsdbOpenDBDb(TDB **ppDB, TENV *pEnv, const char *pFName) {
// Create a database
compFunc = tsdbSmaKeyCmpr;
ret = tdbDbOpen(pFName, -1, -1, compFunc, pEnv, ppDB);
ret = tdbOpen(pFName, -1, -1, compFunc, pEnv, ppDB);
return 0;
}
static int32_t tsdbCloseDBDb(TDB *pDB) { return tdbDbClose(pDB); }
static int32_t tsdbCloseDBDb(TDB *pDB) { return tdbClose(pDB); }
int32_t tsdbOpenDBF(TENV *pEnv, SDBFile *pDBF) {
// TEnv is shared by a group of SDBFile
@ -97,7 +97,7 @@ int32_t tsdbCloseDBF(SDBFile *pDBF) {
int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn) {
int32_t ret;
ret = tdbDbInsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn);
ret = tdbInsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn);
if (ret < 0) {
tsdbError("Failed to create insert sma data into db, ret = %d", ret);
return -1;
@ -110,7 +110,7 @@ void *tsdbGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32
void *pVal = NULL;
int ret;
ret = tdbDbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen);
ret = tdbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen);
if (ret < 0) {
tsdbError("Failed to get sma data from db, ret = %d", ret);

View File

@ -602,8 +602,6 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
for (int32_t k = 0; k < numOfOutput; ++k) {
pCtx[k].startTs = pWin->skey;
// keep it temporarily
bool hasAgg = pCtx[k].input.colDataAggIsSet;
int32_t numOfRows = pCtx[k].input.numOfRows;
@ -619,8 +617,8 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow
// not a whole block involved in query processing, statistics data can not be used
// NOTE: the original value of isSet have been changed here
if (pCtx[k].isAggSet && forwardStep < numOfTotal) {
pCtx[k].isAggSet = false;
if (pCtx[k].input.colDataAggIsSet && forwardStep < numOfTotal) {
pCtx[k].input.colDataAggIsSet = false;
}
if (fmIsWindowPseudoColumnFunc(pCtx[k].functionId)) {
@ -680,7 +678,7 @@ static void doSetInputDataBlockInfo(SOperatorInfo* pOperator, SqlFunctionCtx* pC
int32_t order) {
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
pCtx[i].order = order;
pCtx[i].size = pBlock->info.rows;
pCtx[i].input.numOfRows = pBlock->info.rows;
setBlockStatisInfo(&pCtx[i], &pOperator->pExpr[i], pBlock);
}
}
@ -742,7 +740,8 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
for (int32_t i = 0; i < pOperator->numOfExprs; ++i) {
pCtx[i].order = order;
pCtx[i].size = pBlock->info.rows;
pCtx[i].input.numOfRows = pBlock->info.rows;
pCtx[i].pSrcBlock = pBlock;
pCtx[i].scanFlag = scanFlag;
@ -827,7 +826,6 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunctionCtx* pCtx) {
for (int32_t k = 0; k < pOperator->numOfExprs; ++k) {
if (functionNeedToExecute(&pCtx[k])) {
pCtx[k].startTs = startTs;
// todo add a dummy funtion to avoid process check
if (pCtx[k].fpSet.process != NULL) {
int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
@ -3330,7 +3328,7 @@ static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int3
static void doMergeResultImpl(SSortedMergeOperatorInfo* pInfo, SqlFunctionCtx* pCtx, int32_t numOfExpr,
int32_t rowIndex) {
for (int32_t j = 0; j < numOfExpr; ++j) { // TODO set row index
pCtx[j].startRow = rowIndex;
// pCtx[j].startRow = rowIndex;
}
for (int32_t j = 0; j < numOfExpr; ++j) {
@ -3381,7 +3379,7 @@ static void doMergeImpl(SOperatorInfo* pOperator, int32_t numOfExpr, SSDataBlock
SqlFunctionCtx* pCtx = pInfo->binfo.pCtx;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
pCtx[i].size = 1;
// pCtx[i].size = 1;
}
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
@ -4248,7 +4246,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto _error;
}
int32_t numOfRows = 10;
int32_t numOfRows = 1024;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
initResultSizeInfo(pOperator, numOfRows);

View File

@ -396,8 +396,11 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
pGInfo->groupId = calcGroupId(pInfo->keyBuf, len);
}
// number of rows
int32_t* rows = (int32_t*) pPage;
// group id
size_t numOfCols = pOperator->numOfExprs;
for(int32_t i = 0; i < numOfCols; ++i) {
SExprInfo* pExpr = &pOperator->pExpr[i];
@ -408,13 +411,13 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
int32_t bytes = pColInfoData->info.bytes;
int32_t startOffset = pInfo->columnOffset[i];
char* columnLen = NULL;
int32_t contentLen = 0;
int32_t* columnLen = NULL;
int32_t contentLen = 0;
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
int32_t* offset = (int32_t*)((char*)pPage + startOffset);
columnLen = (char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity;
char* data = (char*)(columnLen + sizeof(int32_t));
columnLen = (int32_t*) ((char*)pPage + startOffset + sizeof(int32_t) * pInfo->rowCapacity);
char* data = (char*)((char*) columnLen + sizeof(int32_t));
if (colDataIsNull_s(pColInfoData, j)) {
offset[(*rows)] = -1;
@ -423,11 +426,15 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
offset[*rows] = (*columnLen);
char* src = colDataGetData(pColInfoData, j);
memcpy(data + (*columnLen), src, varDataTLen(src));
int32_t v = (data + (*columnLen) + varDataTLen(src) - (char*)pPage);
ASSERT(v > 0);
printf("len:%d\n", v);
contentLen = varDataTLen(src);
}
} else {
char* bitmap = (char*)pPage + startOffset;
columnLen = (char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity);
columnLen = (int32_t*) ((char*)pPage + startOffset + BitmapLen(pInfo->rowCapacity));
char* data = (char*) columnLen + sizeof(int32_t);
bool isNull = colDataIsNull_f(pColInfoData->nullbitmap, j);
@ -440,6 +447,7 @@ static void doHashPartition(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
}
(*columnLen) += contentLen;
ASSERT(*columnLen >= 0);
}
(*rows) += 1;
@ -476,7 +484,12 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
pPage = getNewBufPage(pInfo->pBuf, 0, &pageId);
taosArrayPush(p->pPageList, &pageId);
*(int32_t*) pPage = 0;
// // number of rows
// *(int32_t*) pPage = 0;
//
// uint64_t* groupId = (pPage + sizeof(int32_t));
// *groupId = 0;
memset(pPage, 0, getBufPageSize(pInfo->pBuf));
}
}
@ -500,7 +513,7 @@ int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
size_t numOfCols = pBlock->info.numOfCols;
int32_t* offset = taosMemoryCalloc(pBlock->info.numOfCols, sizeof(int32_t));
offset[0] = sizeof(int32_t); // the number of rows in current page, ref to SSDataBlock paged serialization format
offset[0] = sizeof(int32_t) + sizeof(uint64_t); // the number of rows in current page, ref to SSDataBlock paged serialization format
for(int32_t i = 0; i < numOfCols - 1; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
@ -571,7 +584,6 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
break;
}
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->binfo.pCtx, pOperator->numOfExprs);
doHashPartition(pOperator, pBlock);
}

View File

@ -14,6 +14,7 @@
*/
#include "builtins.h"
#include "querynodes.h"
#include "builtinsimpl.h"
#include "scalar.h"
#include "taoserror.h"
@ -201,15 +202,32 @@ static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_
}
static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t paraNum = LIST_LENGTH(pFunc->pParameterList);
if (2 != paraNum) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
SNode* pParamNode = nodesListGetNode(pFunc->pParameterList, 1);
if (nodeType(pParamNode) != QUERY_NODE_VALUE) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
SValueNode* pValue = (SValueNode*) pParamNode;
if (pValue->node.resType.type != TSDB_DATA_TYPE_BIGINT) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
if (pValue->datum.i < 1 || pValue->datum.i > 100) {
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
}
SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType;
pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type};
return TSDB_CODE_SUCCESS;
}
static int32_t translateBottom(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType;
pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type};
return TSDB_CODE_SUCCESS;
return translateTop(pFunc, pErrBuf, len);
}
static int32_t translateSpread(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {

View File

@ -323,6 +323,7 @@ static FORCE_INLINE int32_t getNumofElem(SqlFunctionCtx* pCtx) {
}
return numOfElem;
}
/*
* count function does need the finalize, if data is missing, the default value, which is 0, is used
* count function does not use the pCtx->interResBuf to keep the intermediate buffer
@ -817,16 +818,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
int64_t val = GET_INT64_VAL(tval);
if ((prev < val) ^ isMinFunc) {
pBuf->v = val;
// for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
// SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
// if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
// __ctx->tag.i = key;
// __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
// }
//
// __ctx->fpSet.process(__ctx);
// }
if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
}
@ -839,15 +830,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
uint64_t val = GET_UINT64_VAL(tval);
if ((prev < val) ^ isMinFunc) {
pBuf->v = val;
// for (int32_t i = 0; i < (pCtx)->subsidiaries.num; ++i) {
// SqlFunctionCtx* __ctx = pCtx->subsidiaries.pCtx[i];
// if (__ctx->functionId == FUNCTION_TS_DUMMY) { // TODO refactor
// __ctx->tag.i = key;
// __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT;
// }
//
// __ctx->fpSet.process(__ctx);
// }
if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
}
@ -859,7 +841,6 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) {
double val = GET_DOUBLE_VAL(tval);
if ((prev < val) ^ isMinFunc) {
pBuf->v = val;
if (pCtx->subsidiaries.num > 0) {
saveTupleData(pCtx, index, pCtx->pSrcBlock, &pBuf->tuplePos);
}
@ -1739,7 +1720,7 @@ int32_t percentileFunction(SqlFunctionCtx* pCtx) {
char* data = colDataGetData(pCol, i);
double v = 0;
GET_TYPED_DATA(v, double, pCtx->inputType, data);
GET_TYPED_DATA(v, double, type, data);
if (v < GET_DOUBLE_VAL(&pInfo->minval)) {
SET_DOUBLE_VAL(&pInfo->minval, v);
}
@ -2552,7 +2533,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) {
}
}
} else { // computing based on the true data block
if (0 == pCtx->size) {
if (0 == pInput->numOfRows) {
if (pCtx->order == TSDB_ORDER_DESC) {
if (pCtx->end.key != INT64_MIN) {
pInfo->min = pCtx->end.key;
@ -2571,7 +2552,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) {
TSKEY* ptsList = (int64_t*)colDataGetData(pCol, start);
if (pCtx->order == TSDB_ORDER_DESC) {
if (pCtx->start.key == INT64_MIN) {
pInfo->max = (pInfo->max < ptsList[pCtx->size - 1]) ? ptsList[pCtx->size - 1] : pInfo->max;
pInfo->max = (pInfo->max < ptsList[start + pInput->numOfRows - 1]) ? ptsList[start + pInput->numOfRows - 1] : pInfo->max;
} else {
pInfo->max = pCtx->start.key + 1;
}
@ -2591,7 +2572,7 @@ int32_t elapsedFunction(SqlFunctionCtx *pCtx) {
if (pCtx->end.key != INT64_MIN) {
pInfo->max = pCtx->end.key + 1;
} else {
pInfo->max = ptsList[pCtx->size - 1];
pInfo->max = ptsList[start + pInput->numOfRows - 1];
}
}
}

View File

@ -480,7 +480,7 @@ static bool function_setup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInf
initResultRowEntry(pResultInfo, pCtx->resDataInfo.interBufSize);
return true;
}
#if 0
/**
* in handling the stable query, function_finalizer is called after the secondary
* merge being completed, during the first merge procedure, which is executed at the
@ -497,53 +497,6 @@ static void function_finalizer(SqlFunctionCtx *pCtx) {
doFinalizer(pCtx);
}
/*
* count function does need the finalize, if data is missing, the default value, which is 0, is used
* count function does not use the pCtx->interResBuf to keep the intermediate buffer
*/
static void count_function(SqlFunctionCtx *pCtx) {
int32_t numOfElem = 0;
/*
* 1. column data missing (schema modified) causes pCtx->hasNull == true. pCtx->isAggSet == true;
* 2. for general non-primary key columns, pCtx->hasNull may be true or false, pCtx->isAggSet == true;
* 3. for primary key column, pCtx->hasNull always be false, pCtx->isAggSet == false;
*/
if (pCtx->isAggSet) {
numOfElem = pCtx->size - pCtx->agg.numOfNull;
} else {
if (pCtx->hasNull) {
for (int32_t i = 0; i < pCtx->size; ++i) {
char *val = GET_INPUT_DATA(pCtx, i);
if (isNull(val, pCtx->inputType)) {
continue;
}
numOfElem += 1;
}
} else {
//when counting on the primary time stamp column and no statistics data is presented, use the size value directly.
numOfElem = pCtx->size;
}
}
if (numOfElem > 0) {
// GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG;
}
*((int64_t *)pCtx->pOutput) += numOfElem;
SET_VAL(pCtx, numOfElem, 1);
}
static void count_func_merge(SqlFunctionCtx *pCtx) {
int64_t *pData = (int64_t *)GET_INPUT_DATA_LIST(pCtx);
for (int32_t i = 0; i < pCtx->size; ++i) {
*((int64_t *)pCtx->pOutput) += pData[i];
}
SET_VAL(pCtx, pCtx->size, 1);
}
/**
* 1. If the column value for filter exists, we need to load the SFields, which serves
* as the pre-filter to decide if the actual data block is required or not.
@ -633,113 +586,6 @@ int32_t noDataRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
LOOPCHECK_N(*_data, _list, ctx, tsdbType, sign, notNullElems); \
} while (0)
static void do_sum(SqlFunctionCtx *pCtx) {
int32_t notNullElems = 0;
// Only the pre-computing information loaded and actual data does not loaded
if (pCtx->isAggSet) {
notNullElems = pCtx->size - pCtx->agg.numOfNull;
assert(pCtx->size >= pCtx->agg.numOfNull);
if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) {
int64_t *retVal = (int64_t *)pCtx->pOutput;
*retVal += pCtx->agg.sum;
} else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) {
uint64_t *retVal = (uint64_t *)pCtx->pOutput;
*retVal += (uint64_t)pCtx->agg.sum;
} else if (IS_FLOAT_TYPE(pCtx->inputType)) {
double *retVal = (double*) pCtx->pOutput;
SET_DOUBLE_VAL(retVal, *retVal + GET_DOUBLE_VAL((const char*)&(pCtx->agg.sum)));
}
} else { // computing based on the true data block
void *pData = GET_INPUT_DATA_LIST(pCtx);
notNullElems = 0;
if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) {
int64_t *retVal = (int64_t *)pCtx->pOutput;
if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) {
LIST_ADD_N(*retVal, pCtx, pData, int8_t, notNullElems, pCtx->inputType);
} else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) {
LIST_ADD_N(*retVal, pCtx, pData, int16_t, notNullElems, pCtx->inputType);
} else if (pCtx->inputType == TSDB_DATA_TYPE_INT) {
LIST_ADD_N(*retVal, pCtx, pData, int32_t, notNullElems, pCtx->inputType);
} else if (pCtx->inputType == TSDB_DATA_TYPE_BIGINT) {
LIST_ADD_N(*retVal, pCtx, pData, int64_t, notNullElems, pCtx->inputType);
}
} else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) {
uint64_t *retVal = (uint64_t *)pCtx->pOutput;
if (pCtx->inputType == TSDB_DATA_TYPE_UTINYINT) {
LIST_ADD_N(*retVal, pCtx, pData, uint8_t, notNullElems, pCtx->inputType);
} else if (pCtx->inputType == TSDB_DATA_TYPE_USMALLINT) {
LIST_ADD_N(*retVal, pCtx, pData, uint16_t, notNullElems, pCtx->inputType);
} else if (pCtx->inputType == TSDB_DATA_TYPE_UINT) {
LIST_ADD_N(*retVal, pCtx, pData, uint32_t, notNullElems, pCtx->inputType);
} else if (pCtx->inputType == TSDB_DATA_TYPE_UBIGINT) {
LIST_ADD_N(*retVal, pCtx, pData, uint64_t, notNullElems, pCtx->inputType);
}
} else if (pCtx->inputType == TSDB_DATA_TYPE_DOUBLE) {
double *retVal = (double *)pCtx->pOutput;
LIST_ADD_N_DOUBLE(*retVal, pCtx, pData, double, notNullElems, pCtx->inputType);
} else if (pCtx->inputType == TSDB_DATA_TYPE_FLOAT) {
double *retVal = (double *)pCtx->pOutput;
LIST_ADD_N_DOUBLE_FLOAT(*retVal, pCtx, pData, float, notNullElems, pCtx->inputType);
}
}
// data in the check operation are all null, not output
SET_VAL(pCtx, notNullElems, 1);
if (notNullElems > 0) {
// GET_RES_INFO(pCtx)->hasResult = DATA_SET_FLAG;
}
}
static void sum_function(SqlFunctionCtx *pCtx) {
do_sum(pCtx);
// keep the result data in output buffer, not in the intermediate buffer
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
// if (pResInfo->hasResult == DATA_SET_FLAG && pCtx->stableQuery) {
// set the flag for super table query
SSumInfo *pSum = (SSumInfo *)pCtx->pOutput;
pSum->hasResult = DATA_SET_FLAG;
// }
}
static void sum_func_merge(SqlFunctionCtx *pCtx) {
int32_t notNullElems = 0;
GET_TRUE_DATA_TYPE();
assert(pCtx->stableQuery);
for (int32_t i = 0; i < pCtx->size; ++i) {
char * input = GET_INPUT_DATA(pCtx, i);
SSumInfo *pInput = (SSumInfo *)input;
if (pInput->hasResult != DATA_SET_FLAG) {
continue;
}
notNullElems++;
if (IS_SIGNED_NUMERIC_TYPE(type)) {
*(int64_t *)pCtx->pOutput += pInput->isum;
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
*(uint64_t *) pCtx->pOutput += pInput->usum;
} else {
SET_DOUBLE_VAL((double *)pCtx->pOutput, *(double *)pCtx->pOutput + pInput->dsum);
}
}
SET_VAL(pCtx, notNullElems, 1);
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
if (notNullElems > 0) {
//pResInfo->hasResult = DATA_SET_FLAG;
}
}
static int32_t statisRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId) {
return BLK_DATA_SMA_LOAD;
}
@ -822,17 +668,17 @@ static int32_t lastDistFuncRequired(SqlFunctionCtx *pCtx, STimeWindow* w, int32_
*/
static void avg_function(SqlFunctionCtx *pCtx) {
int32_t notNullElems = 0;
// NOTE: keep the intermediate result into the interResultBuf
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
SAvgInfo *pAvgInfo = (SAvgInfo *)GET_ROWCELL_INTERBUF(pResInfo);
double *pVal = &pAvgInfo->sum;
if (pCtx->isAggSet) { // Pre-aggregation
notNullElems = pCtx->size - pCtx->agg.numOfNull;
assert(notNullElems >= 0);
if (IS_SIGNED_NUMERIC_TYPE(pCtx->inputType)) {
*pVal += pCtx->agg.sum;
} else if (IS_UNSIGNED_NUMERIC_TYPE(pCtx->inputType)) {
@ -842,7 +688,7 @@ static void avg_function(SqlFunctionCtx *pCtx) {
}
} else {
void *pData = GET_INPUT_DATA_LIST(pCtx);
if (pCtx->inputType == TSDB_DATA_TYPE_TINYINT) {
LIST_ADD_N(*pVal, pCtx, pData, int8_t, notNullElems, pCtx->inputType);
} else if (pCtx->inputType == TSDB_DATA_TYPE_SMALLINT) {
@ -865,18 +711,18 @@ static void avg_function(SqlFunctionCtx *pCtx) {
LIST_ADD_N(*pVal, pCtx, pData, uint64_t, notNullElems, pCtx->inputType);
}
}
if (!pCtx->hasNull) {
assert(notNullElems == pCtx->size);
}
SET_VAL(pCtx, notNullElems, 1);
pAvgInfo->num += notNullElems;
if (notNullElems > 0) {
//pResInfo->hasResult = DATA_SET_FLAG;
}
// keep the data into the final output buffer for super table query since this execution may be the last one
if (pCtx->stableQuery) {
memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(pResInfo), sizeof(SAvgInfo));
@ -885,18 +731,18 @@ static void avg_function(SqlFunctionCtx *pCtx) {
static void avg_func_merge(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
double *sum = (double*) pCtx->pOutput;
char *input = GET_INPUT_DATA_LIST(pCtx);
for (int32_t i = 0; i < pCtx->size; ++i, input += pCtx->inputBytes) {
SAvgInfo *pInput = (SAvgInfo *)input;
if (pInput->num == 0) { // current input is null
continue;
}
SET_DOUBLE_VAL(sum, *sum + pInput->sum);
// keep the number of data into the temp buffer
*(int64_t *)GET_ROWCELL_INTERBUF(pResInfo) += pInput->num;
}
@ -2735,18 +2581,6 @@ static void deriv_function(SqlFunctionCtx *pCtx) {
GET_RES_INFO(pCtx)->numOfRes += notNullElems;
}
#define DIFF_IMPL(ctx, d, type) \
do { \
if ((ctx)->param[1].param.nType == INITIAL_VALUE_NOT_ASSIGNED) { \
(ctx)->param[1].param.nType = (ctx)->inputType; \
*(type *)&(ctx)->param[1].param.i = *(type *)(d); \
} else { \
*(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].param.i)); \
*(type *)(&(ctx)->param[1].param.i) = *(type *)(d); \
*(int64_t *)(ctx)->pTsOutput = GET_TS_DATA(ctx, index); \
} \
} while (0);
// TODO difference in date column
static void diff_function(SqlFunctionCtx *pCtx) {
void *data = GET_INPUT_DATA_LIST(pCtx);
@ -4072,460 +3906,4 @@ int32_t functionCompatList[] = {
// tid_tag, derivative, blk_info
6, 8, 7,
};
SAggFunctionInfo aggFunc[35] = {{
// 0, count function does not invoke the finalize function
"count",
FUNCTION_TYPE_AGG,
FUNCTION_COUNT,
FUNCTION_COUNT,
BASIC_FUNC_SO,
function_setup,
count_function,
doFinalizer,
count_func_merge,
countRequired,
},
{
// 1
"sum",
FUNCTION_TYPE_AGG,
FUNCTION_SUM,
FUNCTION_SUM,
BASIC_FUNC_SO,
function_setup,
sum_function,
function_finalizer,
sum_func_merge,
statisRequired,
},
{
// 2
"avg",
FUNCTION_TYPE_AGG,
FUNCTION_AVG,
FUNCTION_AVG,
BASIC_FUNC_SO,
function_setup,
avg_function,
avg_finalizer,
avg_func_merge,
statisRequired,
},
{
// 3
"min",
FUNCTION_TYPE_AGG,
FUNCTION_MIN,
FUNCTION_MIN,
BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY,
min_func_setup,
NULL,
function_finalizer,
min_func_merge,
statisRequired,
},
{
// 4
"max",
FUNCTION_TYPE_AGG,
FUNCTION_MAX,
FUNCTION_MAX,
BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY,
max_func_setup,
NULL,
function_finalizer,
max_func_merge,
statisRequired,
},
{
// 5
"stddev",
FUNCTION_TYPE_AGG,
FUNCTION_STDDEV,
FUNCTION_STDDEV_DST,
FUNCSTATE_SO | FUNCSTATE_STREAM,
function_setup,
stddev_function,
stddev_finalizer,
noop1,
dataBlockRequired,
},
{
// 6
"percentile",
FUNCTION_TYPE_AGG,
FUNCTION_PERCT,
FUNCTION_INVALID_ID,
FUNCSTATE_SO | FUNCSTATE_STREAM,
percentile_function_setup,
percentile_function,
percentile_finalizer,
noop1,
dataBlockRequired,
},
{
// 7
"apercentile",
FUNCTION_TYPE_AGG,
FUNCTION_APERCT,
FUNCTION_APERCT,
FUNCSTATE_SO | FUNCSTATE_STREAM | FUNCSTATE_STABLE,
apercentile_function_setup,
apercentile_function,
apercentile_finalizer,
apercentile_func_merge,
dataBlockRequired,
},
{
// 8
"first",
FUNCTION_TYPE_AGG,
FUNCTION_FIRST,
FUNCTION_FIRST_DST,
BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY,
function_setup,
first_function,
function_finalizer,
noop1,
firstFuncRequired,
},
{
// 9
"last",
FUNCTION_TYPE_AGG,
FUNCTION_LAST,
FUNCTION_LAST_DST,
BASIC_FUNC_SO | FUNCSTATE_SELECTIVITY,
function_setup,
last_function,
function_finalizer,
noop1,
lastFuncRequired,
},
{
// 10
"last_row",
FUNCTION_TYPE_AGG,
FUNCTION_LAST_ROW,
FUNCTION_LAST_ROW,
FUNCSTATE_SO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY,
first_last_function_setup,
last_row_function,
last_row_finalizer,
last_dist_func_merge,
dataBlockRequired,
},
{
// 11
"top",
FUNCTION_TYPE_AGG,
FUNCTION_TOP,
FUNCTION_TOP,
FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY,
top_bottom_function_setup,
top_function,
top_bottom_func_finalizer,
top_func_merge,
dataBlockRequired,
},
{
// 12
"bottom",
FUNCTION_TYPE_AGG,
FUNCTION_BOTTOM,
FUNCTION_BOTTOM,
FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY,
top_bottom_function_setup,
bottom_function,
top_bottom_func_finalizer,
bottom_func_merge,
dataBlockRequired,
},
{
// 13
"spread",
FUNCTION_TYPE_AGG,
FUNCTION_SPREAD,
FUNCTION_SPREAD,
BASIC_FUNC_SO,
spread_function_setup,
spread_function,
spread_function_finalizer,
spread_func_merge,
countRequired,
},
{
// 14
"twa",
FUNCTION_TYPE_AGG,
FUNCTION_TWA,
FUNCTION_TWA,
BASIC_FUNC_SO | FUNCSTATE_NEED_TS,
twa_function_setup,
twa_function,
twa_function_finalizer,
twa_function_copy,
dataBlockRequired,
},
{
// 15
"leastsquares",
FUNCTION_TYPE_AGG,
FUNCTION_LEASTSQR,
FUNCTION_INVALID_ID,
FUNCSTATE_SO | FUNCSTATE_STREAM,
leastsquares_function_setup,
leastsquares_function,
leastsquares_finalizer,
noop1,
dataBlockRequired,
},
{
// 16
"dummy",
FUNCTION_TYPE_AGG,
FUNCTION_TS,
FUNCTION_TS,
BASIC_FUNC_SO | FUNCSTATE_NEED_TS,
function_setup,
date_col_output_function,
doFinalizer,
copy_function,
noDataRequired,
},
{
// 17
"ts",
FUNCTION_TYPE_AGG,
FUNCTION_TS_DUMMY,
FUNCTION_TS_DUMMY,
BASIC_FUNC_SO | FUNCSTATE_NEED_TS,
function_setup,
noop1,
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 18
"tag_dummy",
FUNCTION_TYPE_AGG,
FUNCTION_TAG_DUMMY,
FUNCTION_TAG_DUMMY,
BASIC_FUNC_SO,
function_setup,
tag_function,
doFinalizer,
copy_function,
noDataRequired,
},
{
// 19
"ts",
FUNCTION_TYPE_AGG,
FUNCTION_TS_COMP,
FUNCTION_TS_COMP,
FUNCSTATE_MO | FUNCSTATE_NEED_TS,
ts_comp_function_setup,
ts_comp_function,
ts_comp_finalize,
copy_function,
dataBlockRequired,
},
{
// 20
"tag",
FUNCTION_TYPE_AGG,
FUNCTION_TAG,
FUNCTION_TAG,
BASIC_FUNC_SO,
function_setup,
tag_function,
doFinalizer,
copy_function,
noDataRequired,
},
{//TODO this is a scala function
// 21, column project sql function
"colprj",
FUNCTION_TYPE_AGG,
FUNCTION_PRJ,
FUNCTION_PRJ,
BASIC_FUNC_MO | FUNCSTATE_NEED_TS,
function_setup,
col_project_function,
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 22, multi-output, tag function has only one result
"tagprj",
FUNCTION_TYPE_AGG,
FUNCTION_TAGPRJ,
FUNCTION_TAGPRJ,
BASIC_FUNC_MO,
function_setup,
tag_project_function,
doFinalizer,
copy_function,
noDataRequired,
},
{
// 23
"arithmetic",
FUNCTION_TYPE_AGG,
FUNCTION_ARITHM,
FUNCTION_ARITHM,
FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS,
function_setup,
arithmetic_function,
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 24
"diff",
FUNCTION_TYPE_AGG,
FUNCTION_DIFF,
FUNCTION_INVALID_ID,
FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY,
diff_function_setup,
diff_function,
doFinalizer,
noop1,
dataBlockRequired,
},
// distributed version used in two-stage aggregation processes
{
// 25
"first_dist",
FUNCTION_TYPE_AGG,
FUNCTION_FIRST_DST,
FUNCTION_FIRST_DST,
BASIC_FUNC_SO | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY,
first_last_function_setup,
first_dist_function,
function_finalizer,
first_dist_func_merge,
firstDistFuncRequired,
},
{
// 26
"last_dist",
FUNCTION_TYPE_AGG,
FUNCTION_LAST_DST,
FUNCTION_LAST_DST,
BASIC_FUNC_SO | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY,
first_last_function_setup,
last_dist_function,
function_finalizer,
last_dist_func_merge,
lastDistFuncRequired,
},
{
// 27
"stddev", // return table id and the corresponding tags for join match and subscribe
FUNCTION_TYPE_AGG,
FUNCTION_STDDEV_DST,
FUNCTION_AVG,
FUNCSTATE_SO | FUNCSTATE_STABLE,
function_setup,
NULL,
NULL,
NULL,
dataBlockRequired,
},
{
// 28
"interp",
FUNCTION_TYPE_AGG,
FUNCTION_INTERP,
FUNCTION_INTERP,
FUNCSTATE_SO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS ,
function_setup,
interp_function,
doFinalizer,
copy_function,
dataBlockRequired,
},
{
// 29
"rate",
FUNCTION_TYPE_AGG,
FUNCTION_RATE,
FUNCTION_RATE,
BASIC_FUNC_SO | FUNCSTATE_NEED_TS,
rate_function_setup,
rate_function,
rate_finalizer,
rate_func_copy,
dataBlockRequired,
},
{
// 30
"irate",
FUNCTION_TYPE_AGG,
FUNCTION_IRATE,
FUNCTION_IRATE,
BASIC_FUNC_SO | FUNCSTATE_NEED_TS,
rate_function_setup,
irate_function,
rate_finalizer,
rate_func_copy,
dataBlockRequired,
},
{
// 31
"tbid", // return table id and the corresponding tags for join match and subscribe
FUNCTION_TYPE_AGG,
FUNCTION_TID_TAG,
FUNCTION_TID_TAG,
FUNCSTATE_MO | FUNCSTATE_STABLE,
function_setup,
noop1,
noop1,
noop1,
dataBlockRequired,
},
{ //32
"derivative", // return table id and the corresponding tags for join match and subscribe
FUNCTION_TYPE_AGG,
FUNCTION_DERIVATIVE,
FUNCTION_INVALID_ID,
FUNCSTATE_MO | FUNCSTATE_STABLE | FUNCSTATE_NEED_TS | FUNCSTATE_SELECTIVITY,
deriv_function_setup,
deriv_function,
doFinalizer,
noop1,
dataBlockRequired,
},
{
// 33
"block_dist", // return table id and the corresponding tags for join match and subscribe
FUNCTION_TYPE_AGG,
FUNCTION_BLKINFO,
FUNCTION_BLKINFO,
FUNCSTATE_SO | FUNCSTATE_STABLE,
function_setup,
blockInfo_func,
blockinfo_func_finalizer,
block_func_merge,
dataBlockRequired,
},
{
// 34
"cov", // return table id and the corresponding tags for join match and subscribe
FUNCTION_TYPE_AGG,
FUNCTION_COV,
FUNCTION_COV,
FUNCSTATE_SO | FUNCSTATE_STABLE,
function_setup,
sum_function,
function_finalizer,
sum_func_merge,
statisRequired,
}
};
#endif

View File

@ -1386,7 +1386,7 @@ int32_t cleanUpUdfs() {
if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) {
taosArrayPush(udfStubs, stub);
} else {
fnInfo("invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache",
fnInfo("udf invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache",
stub->udfName, stub->refCount, stub->lastRefTime);
}
}
@ -1528,7 +1528,15 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols,
if (code != 0) {
return code;
}
SUdfcUvSession *session = handle;
code = doCallUdfScalarFunc(handle, input, numOfCols, output);
if (session->outputType != output->columnData->info.type
|| session->outputLen != output->columnData->info.bytes) {
fnError("udfc scalar function calculate error, session type: %d(%d), output type: %d(%d)",
session->outputType, session->outputLen,
output->columnData->info.type, output->columnData->info.bytes);
code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE;
}
releaseUdfFuncHandle(udfName);
return code;
}
@ -1602,6 +1610,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
SUdfInterBuf buf = {0};
if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) {
fnError("udfAggInit error. step doCallUdfAggInit. udf code: %d", udfCode);
releaseUdfFuncHandle(pCtx->udfName);
return false;
}
udfRes->interResNum = buf.numOfResult;
@ -1609,6 +1618,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
memcpy(udfRes->interResBuf, buf.buf, buf.bufLen);
} else {
fnError("udfc inter buf size %d is greater than function bufSize %d", buf.bufLen, session->bufSize);
releaseUdfFuncHandle(pCtx->udfName);
return false;
}
freeUdfInterBuf(&buf);
@ -1674,6 +1684,9 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) {
blockDataDestroy(inputBlock);
taosArrayDestroy(tempBlock.pDataBlock);
if (udfCode != 0) {
releaseUdfFuncHandle(pCtx->udfName);
}
freeUdfInterBuf(&newState);
return udfCode;
}

View File

@ -139,7 +139,7 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SUdf* udf = msgInfo->param;
udf->funcType = pFuncInfo->funcType;
udf->scriptType = pFuncInfo->scriptType;
udf->outputType = pFuncInfo->funcType;
udf->outputType = pFuncInfo->outputType;
udf->outputLen = pFuncInfo->outputLen;
udf->bufSize = pFuncInfo->bufSize;

View File

@ -37,14 +37,14 @@ int tdbBegin(TENV *pEnv, TXN *pTxn);
int tdbCommit(TENV *pEnv, TXN *pTxn);
// TDB
int tdbDbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TENV *pEnv, TDB **ppDb);
int tdbDbClose(TDB *pDb);
int tdbDbDrop(TDB *pDb);
int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn);
int tdbDbDelete(TDB *pDb, const void *pKey, int kLen, TXN *pTxn);
int tdbDbUpsert(TDB *pDb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn);
int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen);
int tdbDbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen);
int tdbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TENV *pEnv, TDB **ppDb);
int tdbClose(TDB *pDb);
int tdbDrop(TDB *pDb);
int tdbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn);
int tdbDelete(TDB *pDb, const void *pKey, int kLen, TXN *pTxn);
int tdbUpsert(TDB *pDb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn);
int tdbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen);
int tdbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen);
// TDBC
int tdbDbcOpen(TDB *pDb, TDBC **ppDbc, TXN *pTxn);

View File

@ -24,7 +24,7 @@ struct STDBC {
SBTC btc;
};
int tdbDbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TENV *pEnv, TDB **ppDb) {
int tdbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TENV *pEnv, TDB **ppDb) {
TDB *pDb;
SPager *pPager;
int ret;
@ -65,7 +65,7 @@ int tdbDbOpen(const char *fname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn
return 0;
}
int tdbDbClose(TDB *pDb) {
int tdbClose(TDB *pDb) {
if (pDb) {
tdbBtreeClose(pDb->pBt);
tdbOsFree(pDb);
@ -73,26 +73,26 @@ int tdbDbClose(TDB *pDb) {
return 0;
}
int tdbDbDrop(TDB *pDb) {
int tdbDrop(TDB *pDb) {
// TODO
return 0;
}
int tdbDbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) {
int tdbInsert(TDB *pDb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) {
return tdbBtreeInsert(pDb->pBt, pKey, keyLen, pVal, valLen, pTxn);
}
int tdbDbDelete(TDB *pDb, const void *pKey, int kLen, TXN *pTxn) { return tdbBtreeDelete(pDb->pBt, pKey, kLen, pTxn); }
int tdbDelete(TDB *pDb, const void *pKey, int kLen, TXN *pTxn) { return tdbBtreeDelete(pDb->pBt, pKey, kLen, pTxn); }
int tdbDbUpsert(TDB *pDb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn) {
int tdbUpsert(TDB *pDb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn) {
return tdbBtreeUpsert(pDb->pBt, pKey, kLen, pVal, vLen, pTxn);
}
int tdbDbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen) {
int tdbGet(TDB *pDb, const void *pKey, int kLen, void **ppVal, int *vLen) {
return tdbBtreeGet(pDb->pBt, pKey, kLen, ppVal, vLen);
}
int tdbDbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen) {
int tdbPGet(TDB *pDb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen) {
return tdbBtreePGet(pDb->pBt, pKey, kLen, ppKey, pkLen, ppVal, vLen);
}

View File

@ -131,7 +131,7 @@ TEST(tdb_test, simple_insert1) {
// Create a database
compFunc = tKeyCmpr;
ret = tdbDbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
ret = tdbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
{
@ -152,7 +152,7 @@ TEST(tdb_test, simple_insert1) {
for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(val, "value%d", iData);
ret = tdbDbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
ret = tdbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
GTEST_ASSERT_EQ(ret, 0);
// if pool is full, commit the transaction and start a new one
@ -181,7 +181,7 @@ TEST(tdb_test, simple_insert1) {
sprintf(key, "key%d", i);
sprintf(val, "value%d", i);
ret = tdbDbGet(pDb, key, strlen(key), &pVal, &vLen);
ret = tdbGet(pDb, key, strlen(key), &pVal, &vLen);
ASSERT(ret == 0);
GTEST_ASSERT_EQ(ret, 0);
@ -224,11 +224,11 @@ TEST(tdb_test, simple_insert1) {
}
}
ret = tdbDbDrop(pDb);
ret = tdbDrop(pDb);
GTEST_ASSERT_EQ(ret, 0);
// Close a database
tdbDbClose(pDb);
tdbClose(pDb);
// Close Env
ret = tdbEnvClose(pEnv);
@ -251,7 +251,7 @@ TEST(tdb_test, simple_insert2) {
// Create a database
compFunc = tDefaultKeyCmpr;
ret = tdbDbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
ret = tdbOpen("db.db", -1, -1, compFunc, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
{
@ -271,7 +271,7 @@ TEST(tdb_test, simple_insert2) {
for (int iData = 1; iData <= nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(val, "value%d", iData);
ret = tdbDbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
ret = tdbInsert(pDb, key, strlen(key), val, strlen(val), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
@ -311,11 +311,11 @@ TEST(tdb_test, simple_insert2) {
tdbCommit(pEnv, &txn);
tdbTxnClose(&txn);
ret = tdbDbDrop(pDb);
ret = tdbDrop(pDb);
GTEST_ASSERT_EQ(ret, 0);
// Close a database
tdbDbClose(pDb);
tdbClose(pDb);
// Close Env
ret = tdbEnvClose(pEnv);
@ -346,7 +346,7 @@ TEST(tdb_test, simple_delete1) {
GTEST_ASSERT_EQ(ret, 0);
// open database
ret = tdbDbOpen("db.db", -1, -1, tKeyCmpr, pEnv, &pDb);
ret = tdbOpen("db.db", -1, -1, tKeyCmpr, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
tdbTxnOpen(&txn, 0, poolMalloc, poolFree, pPool, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
@ -356,7 +356,7 @@ TEST(tdb_test, simple_delete1) {
for (int iData = 0; iData < nKV; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d", iData);
ret = tdbDbInsert(pDb, key, strlen(key), data, strlen(data), &txn);
ret = tdbInsert(pDb, key, strlen(key), data, strlen(data), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
@ -365,7 +365,7 @@ TEST(tdb_test, simple_delete1) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d", iData);
ret = tdbDbGet(pDb, key, strlen(key), &pData, &nData);
ret = tdbGet(pDb, key, strlen(key), &pData, &nData);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(memcmp(data, pData, nData), 0);
}
@ -374,7 +374,7 @@ TEST(tdb_test, simple_delete1) {
for (int iData = nKV - 1; iData > 30; iData--) {
sprintf(key, "key%d", iData);
ret = tdbDbDelete(pDb, key, strlen(key), &txn);
ret = tdbDelete(pDb, key, strlen(key), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
@ -382,7 +382,7 @@ TEST(tdb_test, simple_delete1) {
for (int iData = 0; iData < nKV; iData++) {
sprintf(key, "key%d", iData);
ret = tdbDbGet(pDb, key, strlen(key), &pData, &nData);
ret = tdbGet(pDb, key, strlen(key), &pData, &nData);
if (iData <= 30) {
GTEST_ASSERT_EQ(ret, 0);
} else {
@ -413,7 +413,7 @@ TEST(tdb_test, simple_delete1) {
closePool(pPool);
tdbDbClose(pDb);
tdbClose(pDb);
tdbEnvClose(pEnv);
}
@ -435,7 +435,7 @@ TEST(tdb_test, simple_upsert1) {
GTEST_ASSERT_EQ(ret, 0);
// open database
ret = tdbDbOpen("db.db", -1, -1, NULL, pEnv, &pDb);
ret = tdbOpen("db.db", -1, -1, NULL, pEnv, &pDb);
GTEST_ASSERT_EQ(ret, 0);
pPool = openPool();
@ -446,7 +446,7 @@ TEST(tdb_test, simple_upsert1) {
for (int iData = 0; iData < nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d", iData);
ret = tdbDbInsert(pDb, key, strlen(key), data, strlen(data), &txn);
ret = tdbInsert(pDb, key, strlen(key), data, strlen(data), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
@ -454,7 +454,7 @@ TEST(tdb_test, simple_upsert1) {
for (int iData = 0; iData < nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d", iData);
ret = tdbDbGet(pDb, key, strlen(key), &pData, &nData);
ret = tdbGet(pDb, key, strlen(key), &pData, &nData);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(memcmp(pData, data, nData), 0);
}
@ -463,7 +463,7 @@ TEST(tdb_test, simple_upsert1) {
for (int iData = 0; iData < nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d-u", iData);
ret = tdbDbUpsert(pDb, key, strlen(key), data, strlen(data), &txn);
ret = tdbUpsert(pDb, key, strlen(key), data, strlen(data), &txn);
GTEST_ASSERT_EQ(ret, 0);
}
@ -473,11 +473,11 @@ TEST(tdb_test, simple_upsert1) {
for (int iData = 0; iData < nData; iData++) {
sprintf(key, "key%d", iData);
sprintf(data, "data%d-u", iData);
ret = tdbDbGet(pDb, key, strlen(key), &pData, &nData);
ret = tdbGet(pDb, key, strlen(key), &pData, &nData);
GTEST_ASSERT_EQ(ret, 0);
GTEST_ASSERT_EQ(memcmp(pData, data, nData), 0);
}
tdbDbClose(pDb);
tdbClose(pDb);
tdbEnvClose(pEnv);
}

View File

@ -170,11 +170,11 @@ CaseCfg gCase[] = {
// 22
{"insert:AUTO1-FULL", tListLen(fullColList), fullColList, TTYPE_INSERT, true, true, insertAUTOTest1, 10, 10, 2, 0, 0, 0, 1, -1},
// {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryColumnTest, 10, 10, 1, 3, 0, 0, 1, 2},
// {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryMiscTest, 10, 10, 1, 3, 0, 0, 1, 2},
{"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryColumnTest, 10, 10, 1, 3, 0, 0, 1, 2},
{"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryMiscTest, 10, 10, 1, 3, 0, 0, 1, 2},
{"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryColumnTest, 1, 10, 1, 1, 0, 0, 1, 2},
{"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryMiscTest, 2, 10, 1, 1, 0, 0, 1, 2},
// {"query:SUBT-COLUMN", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryColumnTest, 1, 10, 1, 1, 0, 0, 1, 2},
// {"query:SUBT-MISC", tListLen(fullColList), fullColList, TTYPE_QUERY, false, false, queryMiscTest, 2, 10, 1, 1, 0, 0, 1, 2},
};
@ -209,7 +209,7 @@ typedef struct {
int32_t caseRunNum; // total run case num
} CaseCtrl;
#if 0
#if 1
CaseCtrl gCaseCtrl = { // default
.bindNullNum = 0,
.printCreateTblSql = false,
@ -267,7 +267,7 @@ CaseCtrl gCaseCtrl = {
};
#endif
#if 1
#if 0
CaseCtrl gCaseCtrl = { // query case with specified col&oper
.bindNullNum = 1,
.printCreateTblSql = false,
@ -292,7 +292,7 @@ CaseCtrl gCaseCtrl = { // query case with specified col&oper
#if 0
CaseCtrl gCaseCtrl = { // query case with specified col&oper
.bindNullNum = 0,
.bindNullNum = 1,
.printCreateTblSql = true,
.printQuerySql = true,
.printStmtSql = true,
@ -309,10 +309,10 @@ CaseCtrl gCaseCtrl = { // query case with specified col&oper
.printRes = true,
.runTimes = 0,
.caseRunIdx = -1,
.optrIdxListNum = tListLen(optrIdxList),
.optrIdxList = optrIdxList,
.bindColTypeNum = tListLen(bindColTypeList),
.bindColTypeList = bindColTypeList,
//.optrIdxListNum = tListLen(optrIdxList),
//.optrIdxList = optrIdxList,
//.bindColTypeNum = tListLen(bindColTypeList),
//.bindColTypeList = bindColTypeList,
.caseIdx = 24,
.caseNum = 1,
.caseRunNum = 1,
@ -665,15 +665,16 @@ void bpGenerateConstInFuncSQL(BindData *data, int32_t tblIdx) {
void generateQueryMiscSQL(BindData *data, int32_t tblIdx) {
switch(tblIdx) {
case 0:
//TODO FILL TEST
default:
bpGenerateConstInOpSQL(data, tblIdx);
break;
case FUNCTION_TEST_IDX:
bpGenerateConstInFuncSQL(data, tblIdx);
break;
if (tblIdx == FUNCTION_TEST_IDX && gCurCase->bindNullNum <= 0) {
bpGenerateConstInFuncSQL(data, tblIdx);
} else {
switch(tblIdx) {
case 0:
//TODO FILL TEST
default:
bpGenerateConstInOpSQL(data, tblIdx);
break;
}
}
if (gCaseCtrl.printStmtSql) {
@ -1064,6 +1065,8 @@ int32_t prepareQueryMiscData(BindData *data, int32_t tblIdx) {
if (tblIdx == FUNCTION_TEST_IDX) {
gCaseCtrl.numericParam = true;
} else {
gCaseCtrl.numericParam = false;
}
for (int b = 0; b < bindNum; b++) {
@ -1072,8 +1075,10 @@ int32_t prepareQueryMiscData(BindData *data, int32_t tblIdx) {
}
}
gCaseCtrl.numericParam = false;
generateQueryMiscSQL(data, tblIdx);
return 0;
}

View File

@ -39,7 +39,7 @@ sql show databases
print ==> rows: $rows
print ==> $data(db)[0] $data(db)[1] $data(db)[2] $data(db)[3] $data(db)[4] $data(db)[5] $data(db)[6] $data(db)[7] $data(db)[8] $data(db)[9] $data(db)[10] $data(db)[11] $data(db)[12]
print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $data(db)[18] $data(db)[19] $data(db)[20]
if $data(db)[19] != nostrict then
if $data(db)[19] != ready then
sleep 100
$loop_cnt = $loop_cnt + 1
goto check_db_ready

View File

@ -39,7 +39,7 @@ sql show databases
print ==> rows: $rows
print ==> $data(db)[0] $data(db)[1] $data(db)[2] $data(db)[3] $data(db)[4] $data(db)[5] $data(db)[6] $data(db)[7] $data(db)[8] $data(db)[9] $data(db)[10] $data(db)[11] $data(db)[12]
print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $data(db)[18] $data(db)[19] $data(db)[20]
if $data(db)[19] != nostrict then
if $data(db)[19] != ready then
sleep 100
$loop_cnt = $loop_cnt + 1
goto check_db_ready

View File

@ -31,7 +31,7 @@ sql show databases
print ==> rows: $rows
print ==> $data(db)[0] $data(db)[1] $data(db)[2] $data(db)[3] $data(db)[4] $data(db)[5] $data(db)[6] $data(db)[7] $data(db)[8] $data(db)[9] $data(db)[10] $data(db)[11] $data(db)[12]
print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $data(db)[18] $data(db)[19] $data(db)[20]
if $data(db)[19] != nostrict then
if $data(db)[19] != ready then
sleep 100
$loop_cnt = $loop_cnt + 1
goto check_db_ready

View File

@ -372,4 +372,92 @@ if $data25 != 3 then
return -1
endi
sql insert into t1 values(1648791213004,4,2,3,4.1) (1648791213006,5,4,7,9.1) (1648791213004,40,20,30,40.1) (1648791213005,4,2,3,4.1);
sleep 100
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt;
# row 0
if $data01 != 4 then
print ======$data01
return -1
endi
if $data02 != 4 then
print ======$data02
return -1
endi
if $data03 != 14 then
print ======$data03
return -1
endi
if $data04 != 4 then
print ======$data04
return -1
endi
if $data05 != 3 then
print ======$data05
return -1
endi
sql insert into t1 values(1648791223004,4,2,3,4.1) (1648791233006,5,4,7,9.1) (1648791223004,40,20,30,40.1) (1648791233005,4,2,3,4.1);
sleep 100
sql select _wstartts, c1, c2 ,c3 ,c4, c5 from streamt;
# row 1
if $data11 != 4 then
print ======$data11
# return -1
endi
if $data12 != 4 then
print ======$data12
# return -1
endi
if $data13 != 10 then
print ======$data13
# return -1
endi
if $data14 != 3 then
print ======$data14
# return -1
endi
if $data15 != 1 then
print ======$data15
# return -1
endi
# row 2
if $data21 != 4 then
print ======$data21
# return -1
endi
if $data22 != 4 then
print ======$data22
# return -1
endi
if $data23 != 15 then
print ======$data23
# return -1
endi
if $data24 != 4 then
print ======$data24
# return -1
endi
if $data25 != 3 then
print ======$data25
# return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -468,14 +468,10 @@ class TDTestCase:
cfgPath = buildPath + "/../sim/psim/cfg"
tdLog.info("cfgPath: %s" % cfgPath)
#self.tmqCase8(cfgPath, buildPath)
#self.tmqCase9(cfgPath, buildPath)
#self.tmqCase10(cfgPath, buildPath)
self.tmqCase8(cfgPath, buildPath)
self.tmqCase9(cfgPath, buildPath)
self.tmqCase10(cfgPath, buildPath)
self.tmqCase11(cfgPath, buildPath)
# self.tmqCase12(cfgPath, buildPath)
# self.tmqCase13(cfgPath, buildPath)
# self.tmqCase14(cfgPath, buildPath)
def stop(self):
tdSql.close()