Merge branch 'main' into test/update_push_message

This commit is contained in:
Ping Xiao 2023-05-09 18:09:50 +08:00
commit 116f7dbbc7
43 changed files with 617 additions and 534 deletions

View File

@ -45,7 +45,7 @@ async fn main() -> anyhow::Result<()> {
taos.exec_many([
format!("DROP TOPIC IF EXISTS tmq_meters"),
format!("DROP DATABASE IF EXISTS `{db}`"),
format!("CREATE DATABASE `{db}`"),
format!("CREATE DATABASE `{db}` WAL_RETENTION_PERIOD 3600"),
format!("USE `{db}`"),
// create super table
format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"),

View File

@ -190,6 +190,8 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int
SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo);
void verifyOffset(void *pWalReader, STqOffsetVal* pOffset);
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType);
void qStreamSetOpen(qTaskInfo_t tinfo);

View File

@ -132,7 +132,7 @@ typedef struct {
} SWalRef;
typedef struct {
int8_t scanUncommited;
// int8_t scanUncommited;
int8_t scanNotApplied;
int8_t scanMeta;
int8_t enableRef;

View File

@ -146,7 +146,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_CONN_KILLED TAOS_DEF_ERROR_CODE(0, 0x0215)
#define TSDB_CODE_TSC_SQL_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x0216)
#define TSDB_CODE_TSC_DB_NOT_SELECTED TAOS_DEF_ERROR_CODE(0, 0x0217)
#define TSDB_CODE_TSC_INVALID_TABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x0218)
//#define TSDB_CODE_TSC_INVALID_TABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x0218)
#define TSDB_CODE_TSC_EXCEED_SQL_LIMIT TAOS_DEF_ERROR_CODE(0, 0x0219)
#define TSDB_CODE_TSC_FILE_EMPTY TAOS_DEF_ERROR_CODE(0, 0x021A)
#define TSDB_CODE_TSC_LINE_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x021B)

View File

@ -169,6 +169,7 @@ typedef struct {
int32_t uid; // used for automatic create child table
SHashObj *childTables;
SHashObj *tableUids;
SHashObj *superTables;
SHashObj *pVgHash;
@ -242,6 +243,7 @@ int8_t smlGetTsTypeByLen(int32_t len);
SSmlTableInfo* smlBuildTableInfo(int numRows, const char* measure, int32_t measureLen);
SSmlSTableMeta* smlBuildSTableMeta(bool isDataFormat);
int32_t smlSetCTableName(SSmlTableInfo *oneTable);
void getTableUid(SSmlHandle *info, SSmlLineInfo *currElement, SSmlTableInfo *tinfo);
STableMeta* smlGetMeta(SSmlHandle *info, const void* measure, int32_t measureLen);
int32_t is_same_child_table_telnet(const void *a, const void *b);
int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len);

View File

@ -195,6 +195,20 @@ int32_t smlSetCTableName(SSmlTableInfo *oneTable) {
return TSDB_CODE_SUCCESS;
}
void getTableUid(SSmlHandle *info, SSmlLineInfo *currElement, SSmlTableInfo *tinfo){
char key[TSDB_TABLE_NAME_LEN * 2 + 1] = {0};
size_t nLen = strlen(tinfo->childTableName);
memcpy(key, currElement->measure, currElement->measureLen);
memcpy(key + currElement->measureLen + 1, tinfo->childTableName, nLen);
void *uid = taosHashGet(info->tableUids, key, currElement->measureLen + 1 + nLen); // use \0 as separator for stable name and child table name
if (uid == NULL) {
tinfo->uid = info->uid++;
taosHashPut(info->tableUids, key, currElement->measureLen + 1 + nLen, &tinfo->uid, sizeof(uint64_t));
}else{
tinfo->uid = *(uint64_t*)uid;
}
}
SSmlSTableMeta *smlBuildSTableMeta(bool isDataFormat) {
SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
if (!meta) {
@ -1142,6 +1156,7 @@ void smlDestroyInfo(SSmlHandle *info) {
taosHashCleanup(info->pVgHash);
taosHashCleanup(info->childTables);
taosHashCleanup(info->superTables);
taosHashCleanup(info->tableUids);
for (int i = 0; i < taosArrayGetSize(info->tagJsonArray); i++) {
cJSON *tags = (cJSON *)taosArrayGetP(info->tagJsonArray, i);
@ -1192,6 +1207,7 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
info->childTables = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
info->tableUids = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
info->superTables = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
info->id = smlGenId();
@ -1202,7 +1218,7 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos) {
info->valueJsonArray = taosArrayInit(8, POINTER_BYTES);
info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv));
if (NULL == info->pVgHash || NULL == info->childTables || NULL == info->superTables) {
if (NULL == info->pVgHash || NULL == info->childTables || NULL == info->superTables || NULL == info->tableUids) {
uError("create SSmlHandle failed");
goto cleanup;
}
@ -1320,23 +1336,23 @@ static int32_t smlInsertData(SSmlHandle *info) {
if (info->pRequest->dbList == NULL) {
info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN);
}
void *data = taosArrayReserve(info->pRequest->dbList, 1);
memcpy(data, info->pRequest->pDb,
TSDB_DB_FNAME_LEN > strlen(info->pRequest->pDb) ? strlen(info->pRequest->pDb) : TSDB_DB_FNAME_LEN);
char *data = (char*)taosArrayReserve(info->pRequest->dbList, 1);
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
tNameGetFullDbName(&pName, data);
SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL);
while (oneTable) {
SSmlTableInfo *tableData = *oneTable;
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName));
tstrncpy(pName.tname, tableData->sTableName, tableData->sTableNameLen + 1);
if (info->pRequest->tableList == NULL) {
info->pRequest->tableList = taosArrayInit(1, sizeof(SName));
}
taosArrayPush(info->pRequest->tableList, &pName);
tstrncpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName) + 1);
SRequestConnInfo conn = {0};
conn.pTrans = info->taos->pAppInfo->pTransporter;
conn.requestId = info->pRequest->requestId;
@ -1428,6 +1444,7 @@ int32_t smlClearForRerun(SSmlHandle *info) {
taosHashClear(info->childTables);
taosHashClear(info->superTables);
taosHashClear(info->tableUids);
if (!info->dataFormat) {
if (unlikely(info->lines != NULL)) {
@ -1562,7 +1579,10 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
do {
code = smlModifyDBSchemas(info);
if (code == 0 || code == TSDB_CODE_SML_INVALID_DATA || code == TSDB_CODE_PAR_TOO_MANY_COLUMNS
|| code == TSDB_CODE_PAR_INVALID_TAGS_NUM) break;
|| code == TSDB_CODE_PAR_INVALID_TAGS_NUM || code == TSDB_CODE_PAR_INVALID_TAGS_LENGTH
|| code == TSDB_CODE_PAR_INVALID_ROW_LENGTH || code == TSDB_CODE_MND_FIELD_VALUE_OVERFLOW) {
break;
}
taosMsleep(100);
uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum);
} while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
@ -1647,7 +1667,8 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
info->cost.endTime = taosGetTimestampUs();
info->cost.code = code;
if (code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING ||
code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT) {
code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT ||
code == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
if (cnt++ >= 10) {
uInfo("SML:%" PRIx64 " retry:%d/10 end code:%d, msg:%s", info->id, cnt, code, tstrerror(code));
break;

View File

@ -778,7 +778,7 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo
tinfo->tags = taosArrayDup(preLineKV, NULL);
smlSetCTableName(tinfo);
tinfo->uid = info->uid++;
getTableUid(info, elements, tinfo);
if (info->dataFormat) {
info->currSTableMeta->uid = tinfo->uid;
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);

View File

@ -312,7 +312,7 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
}
smlSetCTableName(tinfo);
tinfo->uid = info->uid++;
getTableUid(info, currElement, tinfo);
if (info->dataFormat) {
info->currSTableMeta->uid = tinfo->uid;
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);

View File

@ -206,7 +206,7 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
tinfo->tags = taosArrayDup(preLineKV, NULL);
smlSetCTableName(tinfo);
tinfo->uid = info->uid++;
getTableUid(info, elements, tinfo);
if (info->dataFormat) {
info->currSTableMeta->uid = tinfo->uid;
tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta);

View File

@ -120,6 +120,7 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const
pColumnInfoData->varmeta.length += dataLen;
} else {
memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex, pData, pColumnInfoData->info.bytes);
colDataClearNull_f(pColumnInfoData->nullbitmap, rowIndex);
}
return 0;
@ -1949,12 +1950,11 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) {
}
}
}
#endif
// for debug
char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) {
int32_t size = 2048;
int32_t size = 2048*1024;
*pDataBuf = taosMemoryCalloc(size, 1);
char* dumpBuf = *pDataBuf;
char pBuf[128] = {0};
@ -1970,7 +1970,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
if (len >= size - 1) return dumpBuf;
for (int32_t j = 0; j < rows; j++) {
len += snprintf(dumpBuf + len, size - len, "%s |", flag);
len += snprintf(dumpBuf + len, size - len, "%s %d|", flag, j);
if (len >= size - 1) return dumpBuf;
for (int32_t k = 0; k < colNum; k++) {

View File

@ -932,7 +932,7 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq
return -1;
}
if(pDst->nextColId < 0 && pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags){
if(pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags){
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
return -1;
}
@ -1163,8 +1163,8 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1;
}
if(pNew->nextColId < 0 && pNew->nextColId >= 0x7fff - ntags){
if(pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ntags){
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
return -1;
}
@ -1476,7 +1476,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray
return -1;
}
if(pNew->nextColId < 0 && pNew->nextColId >= 0x7fff - ncols){
if(pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ncols){
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
return -1;
}

View File

@ -133,10 +133,10 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub,
const SMqRebOutputVg *pRebVg) {
if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
return -1;
}
// if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
// terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
// return -1;
// }
void *buf;
int32_t tlen;
@ -269,6 +269,18 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) {
}
}
static void putNoTransferToOutput(SMqRebOutputObj *pOutput, SMqConsumerEp *pConsumerEp){
for(int i = 0; i < taosArrayGetSize(pConsumerEp->vgs); i++){
SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i);
SMqRebOutputVg outputVg = {
.oldConsumerId = pConsumerEp->consumerId,
.newConsumerId = pConsumerEp->consumerId,
.pVgEp = pVgEp,
};
taosArrayPush(pOutput->rebVgs, &outputVg);
}
}
static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt,
int32_t imbConsumerNum) {
const char *pSubKey = pOutput->pSub->key;
@ -290,24 +302,19 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas
taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId);
if (consumerVgNum > minVgCnt) {
if (imbCnt < imbConsumerNum) {
if (consumerVgNum == minVgCnt + 1) {
imbCnt++;
continue;
} else {
// pop until equal minVg + 1
while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) {
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
SMqRebOutputVg outputVg = {
.oldConsumerId = pConsumerEp->consumerId,
.newConsumerId = -1,
.pVgEp = pVgEp,
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId,
pConsumerEp->consumerId);
}
imbCnt++;
// pop until equal minVg + 1
while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) {
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
SMqRebOutputVg outputVg = {
.oldConsumerId = pConsumerEp->consumerId,
.newConsumerId = -1,
.pVgEp = pVgEp,
};
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId,
pConsumerEp->consumerId);
}
imbCnt++;
} else {
// all the remain consumers should only have the number of vgroups, which is equalled to the value of minVg
while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) {
@ -323,6 +330,7 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas
}
}
}
putNoTransferToOutput(pOutput, pConsumerEp);
}
}

View File

@ -236,7 +236,7 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) {
SDB_SET_BINARY(pRaw, dataPos, key, keyLen, _OVER);
SDB_SET_INT32(pRaw, dataPos, *useDb, _OVER)
useDb = taosHashIterate(pUser->writeTbs, useDb);
useDb = taosHashIterate(pUser->useDbs, useDb);
}
SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER)

View File

@ -255,14 +255,13 @@ int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList);
int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList);
int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id);
int32_t tqNextBlock(STqReader *pReader, SSDataBlock* pBlock);
int32_t tqNextBlockInWal(STqReader* pReader);
int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData);
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
bool tqNextBlockImpl(STqReader *pReader);
int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData);
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData **pSubmitTbDataRet);
int32_t tqRetrieveDataBlock(STqReader *pReader, SSubmitTbData **pSubmitTbDataRet);
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);

View File

@ -212,7 +212,6 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg
int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit);
int32_t tqProcessSubmitReqForSubscribe(STQ* pTq);
int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);

View File

@ -445,6 +445,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t
}
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
int ret = 0;
SMqRebVgReq req = {0};
tDecodeSMqRebVgReq(msg, &req);
@ -463,8 +464,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
if (req.newConsumerId == -1) {
tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
taosMemoryFree(req.qmsg);
return 0;
goto end;
}
STqHandle tqHandle = {0};
@ -481,8 +481,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
// TODO version should be assigned and refed during preprocess
SWalRef* pRef = walRefCommittedVer(pVnode->pWal);
if (pRef == NULL) {
taosMemoryFree(req.qmsg);
return -1;
ret = -1;
goto end;
}
int64_t ver = pRef->refVer;
@ -534,49 +534,42 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
pHandle->consumerId, oldConsumerId);
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
taosMemoryFree(req.qmsg);
return -1;
}
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
goto end;
} else {
if (pHandle->consumerId == req.newConsumerId) { // do nothing
tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
atomic_store_32(&pHandle->epoch, -1);
atomic_add_fetch_32(&pHandle->epoch, 1);
taosMemoryFree(req.qmsg);
return tqMetaSaveHandle(pTq, req.subKey, pHandle);
} else {
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
req.newConsumerId);
// kill executing task
qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
if (pTaskInfo != NULL) {
qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
}
taosWLockLatch(&pTq->lock);
atomic_store_32(&pHandle->epoch, 0);
// remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushHandle(pTq, pHandle);
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
qStreamCloseTsdbReader(pTaskInfo);
}
taosWUnLockLatch(&pTq->lock);
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
taosMemoryFree(req.qmsg);
return -1;
}
atomic_store_32(&pHandle->epoch, 0);
}
// kill executing task
qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
if (pTaskInfo != NULL) {
qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
}
taosWLockLatch(&pTq->lock);
// remove if it has been register in the push manager, and return one empty block to consumer
tqUnregisterPushHandle(pTq, pHandle);
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
qStreamCloseTsdbReader(pTaskInfo);
}
taosWUnLockLatch(&pTq->lock);
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
goto end;
}
end:
taosMemoryFree(req.qmsg);
return 0;
return ret;
}
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
@ -1073,12 +1066,15 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
taosWLockLatch(&pTq->lock);
if(taosHashGetSize(pTq->pPushMgr) > 0){
void *pIter = taosHashIterate(pTq->pPushMgr, NULL);
while(pIter){
if (taosHashGetSize(pTq->pPushMgr) > 0) {
void* pIter = taosHashIterate(pTq->pPushMgr, NULL);
while (pIter) {
STqHandle* pHandle = *(STqHandle**)pIter;
tqDebug("vgId:%d start set submit for pHandle:%p, consume id:0x%"PRIx64, vgId, pHandle, pHandle->consumerId);
if(ASSERT(pHandle->msg != NULL)){
tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId);
if (ASSERT(pHandle->msg != NULL)) {
tqError("pHandle->msg should not be null");
break;
}else{
@ -1087,77 +1083,15 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
taosMemoryFree(pHandle->msg);
pHandle->msg = NULL;
}
pIter = taosHashIterate(pTq->pPushMgr, pIter);
}
taosHashClear(pTq->pPushMgr);
}
// unlock
taosWUnLockLatch(&pTq->lock);
return 0;
}
int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) {
#if 0
void* pIter = NULL;
SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit, STREAM_INPUT__DATA_SUBMIT);
if (pSubmit == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("failed to create data submit for stream since out of memory");
saveOffsetForAllTasks(pTq, submit.ver);
return -1;
}
SArray* pInputQueueFullTasks = taosArrayInit(4, POINTER_BYTES);
while (1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) {
continue;
}
if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) {
tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId,
pTask->status.taskStatus);
continue;
}
// check if offset value exists
char key[128] = {0};
createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId);
if (tInputQueueIsFull(pTask)) {
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
int64_t ver = submit.ver;
if (pOffset == NULL) {
doSaveTaskOffset(pTq->pOffsetStore, key, submit.ver);
} else {
ver = pOffset->val.version;
}
tqDebug("s-task:%s input queue is full, discard submit block, ver:%" PRId64, pTask->id.idStr, ver);
taosArrayPush(pInputQueueFullTasks, &pTask);
continue;
}
// check if offset value exists
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key);
ASSERT(pOffset == NULL);
addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver);
}
streamDataSubmitDestroy(pSubmit);
taosFreeQitem(pSubmit);
#endif
tqStartStreamTasks(pTq);
return 0;
}

View File

@ -16,250 +16,10 @@
#include "tq.h"
#include "vnd.h"
#if 0
void tqTmrRspFunc(void* param, void* tmrId) {
STqHandle* pHandle = (STqHandle*)param;
atomic_store_8(&pHandle->pushHandle.tmrStopped, 1);
}
static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubmit** ppSubmit, SMqDataRsp* pRsp) {
SStreamDataSubmit* pSubmit = *ppSubmit;
while (pSubmit != NULL) {
if (tqLogScanExec(pTq, &pHandle->execHandle, pSubmit->data, pRsp, 0) < 0) {
}
// update processed
atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver);
streamQueueProcessSuccess(&pHandle->pushHandle.inputQ);
streamDataSubmitDestroy(pSubmit);
if (pRsp->blockNum > 0) {
*ppSubmit = pSubmit;
return 0;
} else {
pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ);
}
}
*ppSubmit = pSubmit;
return -1;
}
int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) {
SMqDataRsp rsp = {0};
// 1. guard and set status executing
int8_t execStatus = atomic_val_compare_exchange_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE,
TASK_EXEC_STATUS__EXECUTING);
if (execStatus == TASK_EXEC_STATUS__IDLE) {
SStreamDataSubmit* pSubmit = NULL;
// 2. check processedVer
// 2.1. if not missed, get msg from queue
// 2.2. if missed, scan wal
pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ);
while (pHandle->pushHandle.processedVer <= pSubmit->ver) {
// read from wal
}
while (pHandle->pushHandle.processedVer > pSubmit->ver + 1) {
streamQueueProcessSuccess(&pHandle->pushHandle.inputQ);
streamDataSubmitDestroy(pSubmit);
pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ);
if (pSubmit == NULL) break;
}
// 3. exec, after each success, update processed ver
// first run
if (tqLoopExecFromQueue(pTq, pHandle, &pSubmit, &rsp) == 0) {
goto SEND_RSP;
}
// set exec status closing
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__CLOSING);
// second run
if (tqLoopExecFromQueue(pTq, pHandle, &pSubmit, &rsp) == 0) {
goto SEND_RSP;
}
// set exec status idle
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE);
}
SEND_RSP:
// 4. if get result
// 4.1 set exec input status blocked and exec status idle
atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE);
// 4.2 rpc send
rsp.rspOffset = pHandle->pushHandle.processedVer;
/*if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) {*/
/*return -1;*/
/*}*/
// 4.3 clear rpc info
memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo));
return 0;
}
int32_t tqOpenPushHandle(STQ* pTq, STqHandle* pHandle) {
memset(&pHandle->pushHandle, 0, sizeof(STqPushHandle));
pHandle->pushHandle.inputQ.queue = taosOpenQueue();
pHandle->pushHandle.inputQ.qall = taosAllocateQall();
if (pHandle->pushHandle.inputQ.queue == NULL || pHandle->pushHandle.inputQ.qall == NULL) {
if (pHandle->pushHandle.inputQ.queue) {
taosCloseQueue(pHandle->pushHandle.inputQ.queue);
}
if (pHandle->pushHandle.inputQ.qall) {
taosFreeQall(pHandle->pushHandle.inputQ.qall);
}
return -1;
}
return 0;
}
int32_t tqPreparePush(STQ* pTq, STqHandle* pHandle, int64_t reqId, const SRpcHandleInfo* pInfo, int64_t processedVer,
int64_t timeout) {
memcpy(&pHandle->pushHandle.rpcInfo, pInfo, sizeof(SRpcHandleInfo));
atomic_store_64(&pHandle->pushHandle.reqId, reqId);
atomic_store_64(&pHandle->pushHandle.processedVer, processedVer);
atomic_store_8(&pHandle->pushHandle.inputStatus, TASK_INPUT_STATUS__NORMAL);
atomic_store_8(&pHandle->pushHandle.tmrStopped, 0);
taosTmrReset(tqTmrRspFunc, (int32_t)timeout, pHandle, tqMgmt.timer, &pHandle->pushHandle.timerId);
return 0;
}
int32_t tqEnqueue(STqHandle* pHandle, SStreamDataSubmit* pSubmit) {
int8_t inputStatus = atomic_load_8(&pHandle->pushHandle.inputStatus);
if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
SStreamDataSubmit* pSubmitClone = streamSubmitBlockClone(pSubmit);
if (pSubmitClone == NULL) {
return -1;
}
taosWriteQitem(pHandle->pushHandle.inputQ.queue, pSubmitClone);
return 0;
}
return -1;
}
int32_t tqSendExecReq(STQ* pTq, STqHandle* pHandle) {
//
return 0;
}
int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) {
if (msgType != TDMT_VND_SUBMIT) return 0;
void* pIter = NULL;
STqHandle* pHandle = NULL;
SSubmitReq* pReq = (SSubmitReq*)msg;
int32_t workerId = 4;
int64_t fetchOffset = ver;
while (1) {
pIter = taosHashIterate(pTq->pushMgr, pIter);
if (pIter == NULL) break;
pHandle = *(STqHandle**)pIter;
taosWLockLatch(&pHandle->pushHandle.lock);
SMqDataRsp rsp = {0};
rsp.reqOffset = pHandle->pushHandle.reqOffset;
rsp.blockData = taosArrayInit(0, sizeof(void*));
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
if (msgType == TDMT_VND_SUBMIT) {
tqLogScanExec(pTq, &pHandle->execHandle, pReq, &rsp, workerId);
} else {
tqError("tq push unexpected msg type %d", msgType);
}
if (rsp.blockNum == 0) {
taosWUnLockLatch(&pHandle->pushHandle.lock);
continue;
}
rsp.rspOffset = fetchOffset;
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
// todo free
return -1;
}
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pHandle->pushHandle.epoch;
((SMqRspHead*)buf)->consumerId = pHandle->pushHandle.consumerId;
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqDataBlkRsp(&abuf, &rsp);
SRpcMsg resp = {
.info = pHandle->pushHandle.rpcInfo,
.pCont = buf,
.contLen = tlen,
.code = 0,
};
tmsgSendRsp(&resp);
memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo));
taosWUnLockLatch(&pHandle->pushHandle.lock);
tqDebug("vgId:%d offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, req:%" PRId64 ", rsp:%" PRId64,
TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum,
rsp.reqOffset, rsp.rspOffset);
// TODO destroy
taosArrayDestroy(rsp.blockData);
taosArrayDestroy(rsp.blockDataLen);
}
return 0;
}
#endif
int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
// void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
// int32_t len = msgLen - sizeof(SSubmitReq2Msg);
// int32_t vgId = TD_VID(pTq->pVnode);
if (msgType == TDMT_VND_SUBMIT) {
tqProcessSubmitReqForSubscribe(pTq);
// lock push mgr to avoid potential msg lost
// taosWLockLatch(&pTq->lock);
//
// int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr);
// if (numOfRegisteredPush > 0) {
// tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d",
// vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush);
//
// void* data = taosMemoryMalloc(len);
// if (data == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
// tqError("failed to copy data for stream since out of memory, vgId:%d", vgId);
// taosWUnLockLatch(&pTq->lock);
// return -1;
// }
//
// memcpy(data, pReq, len);
//
// SArray* cachedKey = taosArrayInit(0, sizeof(SItem));
// void* pIter = NULL;
//
// while (1) {
// pIter = taosHashIterate(pTq->pPushMgr, pIter);
// if (pIter == NULL) {
// break;
// }
//
// STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
//
// STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey));
// if (pHandle == NULL) {
// tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId,
// pPushEntry->subKey);
// continue;
// }
//
// STqExecHandle* pExec = &pHandle->execHandle;
// doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey);
// }
//
// doRemovePushedEntry(cachedKey, pTq);
// taosArrayDestroyEx(cachedKey, freeItem);
// taosMemoryFree(data);
// }
//
// // unlock
// taosWUnLockLatch(&pTq->lock);
}
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
@ -275,8 +35,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
}
if (msgType == TDMT_VND_SUBMIT) {
SPackedData submit = {0};
tqProcessSubmitReq(pTq, submit);
tqStartStreamTasks(pTq);
}
if (msgType == TDMT_VND_DELETE) {
@ -287,16 +46,16 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
return 0;
}
int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
STqHandle* pHandle = (STqHandle*) handle;
if(pHandle->msg == NULL){
STqHandle* pHandle = (STqHandle*)handle;
if (pHandle->msg == NULL) {
pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg));
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
}else{
void *tmp = pHandle->msg->pCont;
} else {
void* tmp = pHandle->msg->pCont;
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
pHandle->msg->pCont = tmp;
}
@ -304,7 +63,8 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) {
memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
pHandle->msg->contLen = pMsg->contLen;
int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES);
tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret,
pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
return 0;
}
@ -314,6 +74,7 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) {
int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey));
tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId);
if(pHandle->msg != NULL) {
tqPushDataRsp(pTq, pHandle);
@ -321,5 +82,6 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) {
taosMemoryFree(pHandle->msg);
pHandle->msg = NULL;
}
return 0;
}

View File

@ -332,6 +332,7 @@ int32_t tqNextBlockInWal(STqReader* pReader) {
if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) {
// try next message in wal file
// todo always retry to avoid read failure caused by wal file deletion
if (walNextValidMsg(pWalReader) < 0) {
return FETCH_TYPE__NONE;
}
@ -374,7 +375,7 @@ int32_t tqNextBlockInWal(STqReader* pReader) {
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pReader->tbIdHash == NULL) {
int32_t code = tqRetrieveDataBlock(pReader->pResBlock, pReader, NULL);
int32_t code = tqRetrieveDataBlock(pReader, NULL);
if (code == TSDB_CODE_SUCCESS && pReader->pResBlock->info.rows > 0) {
return FETCH_TYPE__DATA;
}
@ -384,7 +385,7 @@ int32_t tqNextBlockInWal(STqReader* pReader) {
if (ret != NULL) {
tqDebug("tq reader return submit block, uid:%"PRId64", ver:%"PRId64, pSubmitTbData->uid, pReader->msg.ver);
int32_t code = tqRetrieveDataBlock(pReader->pResBlock, pReader, NULL);
int32_t code = tqRetrieveDataBlock(pReader, NULL);
if (code == TSDB_CODE_SUCCESS && pReader->pResBlock->info.rows > 0) {
return FETCH_TYPE__DATA;
}
@ -399,31 +400,6 @@ int32_t tqNextBlockInWal(STqReader* pReader) {
}
}
int32_t tqNextBlock(STqReader* pReader, SSDataBlock* pBlock) {
while (1) {
if (pReader->msg.msgStr == NULL) {
if (walNextValidMsg(pReader->pWalReader) < 0) {
return FETCH_TYPE__NONE;
}
void* pBody = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
int64_t ver = pReader->pWalReader->pHead->head.version;
tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver);
}
while (tqNextBlockImpl(pReader)) {
int32_t code = tqRetrieveDataBlock(pReader->pResBlock, pReader, NULL);
if (code != TSDB_CODE_SUCCESS || pBlock->info.rows == 0) {
continue;
}
return FETCH_TYPE__DATA;
}
}
}
int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) {
pReader->msg.msgStr = msgStr;
pReader->msg.msgLen = msgLen;
@ -527,7 +503,7 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap
return 0;
}
int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) {
int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) {
tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++);
@ -535,6 +511,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
*pSubmitTbDataRet = pSubmitTbData;
}
SSDataBlock* pBlock = pReader->pResBlock;
blockDataCleanup(pBlock);
int32_t sversion = pSubmitTbData->sver;
@ -603,7 +580,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId);
int32_t code = blockDataAppendColInfo(pBlock, &colInfo);
if (code != TSDB_CODE_SUCCESS) {
goto FAIL;
return -1;
}
i++;
j++;
@ -622,7 +599,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
if (blockDataEnsureCapacity(pBlock, numOfRows) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
return -1;
}
pBlock->info.rows = numOfRows;
@ -638,7 +615,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
while (targetIdx < colActual) {
if (sourceIdx >= numOfCols) {
tqError("tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
goto FAIL;
return -1;
}
SColData* pCol = taosArrayGet(pCols, sourceIdx);
@ -647,7 +624,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
if (pCol->nVal != numOfRows) {
tqError("tqRetrieveDataBlock pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows);
goto FAIL;
return -1;
}
if (pCol->cid < pColData->info.colId) {
@ -661,14 +638,14 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
varDataSetLen(val, colVal.value.nData);
if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
return -1;
}
} else {
colDataSetNULL(pColData, i);
}
} else {
if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
return -1;
}
}
}
@ -710,14 +687,14 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData);
varDataSetLen(val, colVal.value.nData);
if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
return -1;
}
} else {
colDataSetNULL(pColData, i);
}
} else {
if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) {
goto FAIL;
return -1;
}
}
@ -735,10 +712,6 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa
}
return 0;
FAIL:
blockDataFreeRes(pBlock);
return -1;
}
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) {

View File

@ -66,9 +66,10 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) {
const int32_t MAX_ROWS_TO_RETURN = 4096;
int32_t vgId = TD_VID(pTq->pVnode);
int32_t code = 0;
int32_t totalRows = 0;
int32_t vgId = TD_VID(pTq->pVnode);
int32_t code = 0;
int32_t totalRows = 0;
const STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->task;

View File

@ -165,17 +165,24 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
uint64_t consumerId = pRequest->consumerId;
int32_t vgId = TD_VID(pTq->pVnode);
int code = 0;
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
qTaskInfo_t task = pHandle->execHandle.task;
if(qTaskIsExecuting(task)){
code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
tDeleteSMqDataRsp(&dataRsp);
return code;
}
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
int code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
if(code != 0) {
goto end;
}
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
// lock
@ -246,6 +253,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if (offset->type == TMQ_OFFSET__LOG) {
verifyOffset(pHandle->pWalReader, offset);
int64_t fetchVer = offset->version + 1;
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
if (pCkHead == NULL) {
@ -361,11 +369,10 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
// this is a normal subscribe requirement
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
} else { // todo handle the case where re-balance occurs.
// for taosx
return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
}
// todo handle the case where re-balance occurs.
// for taosx
return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
}
int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) {

View File

@ -448,7 +448,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
walApplyVer(pVnode->pWal, version);
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
// /*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}

View File

@ -59,7 +59,7 @@ typedef struct {
STqOffsetVal currentOffset; // for tmq
SMqMetaRsp metaRsp; // for tmq fetching meta
int64_t snapshotVer;
SPackedData submit; // todo remove it
// SPackedData submit; // todo remove it
SSchemaWrapper* schema;
char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor
int8_t recoverStep;

View File

@ -1058,6 +1058,14 @@ void qStreamSetOpen(qTaskInfo_t tinfo) {
pOperator->status = OP_NOT_OPENED;
}
void verifyOffset(void *pWalReader, STqOffsetVal* pOffset){
// if offset version is small than first version , let's seek to first version
int64_t firstVer = walGetFirstVer(((SWalReader*)pWalReader)->pWal);
if (pOffset->version + 1 < firstVer){
pOffset->version = firstVer - 1;
}
}
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
SOperatorInfo* pOperator = pTaskInfo->pRoot;
@ -1080,15 +1088,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableListInfo* pTableListInfo = pScanBaseInfo->pTableListInfo;
if (pOffset->type == TMQ_OFFSET__LOG) {
// todo refactor: move away
tsdbReaderClose(pScanBaseInfo->dataReader);
pScanBaseInfo->dataReader = NULL;
// let's seek to the next version in wal file
int64_t firstVer = walGetFirstVer(pInfo->tqReader->pWalReader->pWal);
if (pOffset->version + 1 < firstVer){
pOffset->version = firstVer - 1;
}
verifyOffset(pInfo->tqReader->pWalReader, pOffset);
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) {
qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id);
return -1;

View File

@ -82,7 +82,7 @@ static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SC
static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag,
bool createDummyCol);
static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo);
SGroupResInfo* pGroupResInfo, int32_t threshold);
SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) {
SFilePage* pData = NULL;
@ -776,7 +776,7 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos
}
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo) {
SGroupResInfo* pGroupResInfo, int32_t threshold) {
SExprInfo* pExprInfo = pSup->pExprInfo;
int32_t numOfExprs = pSup->numOfExprs;
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
@ -825,6 +825,9 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
releaseBufPage(pBuf, page);
pBlock->info.rows += pRow->numOfRows;
if (pBlock->info.rows >= threshold) {
break;
}
}
qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
@ -850,7 +853,7 @@ void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGr
// clear the existed group id
pBlock->info.id.groupId = 0;
ASSERT(!pbInfo->mergeResultBlock);
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold);
void* tbname = NULL;
if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
@ -877,10 +880,10 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
// clear the existed group id
pBlock->info.id.groupId = 0;
if (!pbInfo->mergeResultBlock) {
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold);
} else {
while (hasRemainResults(pGroupResInfo)) {
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold);
if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
break;
}

View File

@ -1636,6 +1636,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
qDebug("start to exec queue scan, %s", id);
#if 0
if (pTaskInfo->streamInfo.submit.msgStr != NULL) {
if (pInfo->tqReader->msg.msgStr == NULL) {
SPackedData submit = pTaskInfo->streamInfo.submit;
@ -1649,7 +1650,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
while (tqNextBlockImpl(pInfo->tqReader)) {
int32_t code = tqRetrieveDataBlock(pInfo->tqReader->pResBlock, pInfo->tqReader, NULL);
int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL);
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
continue;
}
@ -1665,6 +1666,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
pTaskInfo->streamInfo.submit = (SPackedData){0};
return NULL;
}
#endif
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
@ -1682,10 +1684,12 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) {
return NULL;
}
tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pTaskInfo->streamInfo.snapshotVer);
}
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) {
while (1) {
int32_t type = tqNextBlockInWal(pInfo->tqReader);
SSDataBlock* pRes = pInfo->tqReader->pResBlock;
@ -2071,7 +2075,7 @@ FETCH_NEXT_BLOCK:
blockDataCleanup(pInfo->pRes);
while (tqNextBlockImpl(pInfo->tqReader)) {
int32_t code = tqRetrieveDataBlock(pInfo->tqReader->pResBlock, pInfo->tqReader, NULL);
int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL);
if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) {
continue;
}
@ -2109,7 +2113,6 @@ FETCH_NEXT_BLOCK:
// record the scan action.
pInfo->numOfExec++;
pOperator->resultInfo.totalRows += pBlockInfo->rows;
// printDataBlock(pInfo->pRes, "stream scan");
qDebug("scan rows: %" PRId64, pBlockInfo->rows);
if (pBlockInfo->rows > 0) {

View File

@ -6118,17 +6118,50 @@ static bool isEventWindowQuery(SSelectStmt* pSelect) {
return NULL != pSelect->pWindow && QUERY_NODE_EVENT_WINDOW == nodeType(pSelect->pWindow);
}
static bool hasJsonTypeProjection(SSelectStmt* pSelect) {
SNode* pProj = NULL;
FOREACH(pProj, pSelect->pProjectionList) {
if (TSDB_DATA_TYPE_JSON == ((SExprNode*)pProj)->resType.type) {
return true;
}
}
return false;
}
static EDealRes hasColumnOrPseudoColumn(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
*(bool*)pContext = true;
return DEAL_RES_END;
}
if (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsPseudoColumnFunc(((SFunctionNode*)pNode)->funcId)) {
*(bool*)pContext = true;
return DEAL_RES_END;
}
return DEAL_RES_CONTINUE;
}
static int32_t subtableExprHasColumnOrPseudoColumn(SNode* pNode) {
bool hasColumn = false;
nodesWalkExprPostOrder(pNode, hasColumnOrPseudoColumn, &hasColumn);
return hasColumn;
}
static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
!pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList ||
crossTableWithUdaf(pSelect) || isEventWindowQuery(pSelect)) {
crossTableWithUdaf(pSelect) || isEventWindowQuery(pSelect) || hasJsonTypeProjection(pSelect)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
}
if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"SUBTABLE expression must be of VARCHAR type");
}
if (NULL != pSelect->pSubtable && 0 == LIST_LENGTH(pSelect->pPartitionByList) && subtableExprHasColumnOrPseudoColumn(pSelect->pSubtable)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"SUBTABLE expression must not has column when no partition by clause");
}
if (NULL == pSelect->pWindow && STREAM_TRIGGER_AT_ONCE != pStmt->pOptions->triggerType) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"The trigger mode of non window query can only be AT_ONCE");

View File

@ -920,6 +920,10 @@ TEST_F(ParserInitialCTest, createStreamSemanticCheck) {
run("CREATE STREAM s1 INTO st1 AS SELECT PERCENTILE(c1, 30) FROM t1 INTERVAL(10S)",
TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC);
run("CREATE STREAM s2 INTO st1 AS SELECT ts, to_json('{c1:1}') FROM st1 PARTITION BY TBNAME",
TSDB_CODE_PAR_INVALID_STREAM_QUERY);
run("CREATE STREAM s3 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tbname)) "
"AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 INTERVAL(10S)", TSDB_CODE_PAR_INVALID_STREAM_QUERY);
}
/*

View File

@ -1,4 +1,4 @@
enable_testing()
#add_subdirectory(filter)
add_subdirectory(filter)
add_subdirectory(scalar)

View File

@ -33,6 +33,7 @@
#include "os.h"
#include "filter.h"
#include "filterInt.h"
#include "nodes.h"
#include "scalar.h"
#include "stub.h"
@ -344,6 +345,7 @@ TEST(timerangeTest, greater_and_lower_not_strict) {
nodesDestroyNode(logicNode1);
}
#if 0
TEST(columnTest, smallint_column_greater_double_value) {
SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL;
int16_t leftv[5] = {1, 2, 3, 4, 5};
@ -1337,6 +1339,127 @@ TEST(scalarModelogicTest, diff_columns_or_and_or) {
nodesDestroyNode(logicNode1);
blockDataDestroy(src);
}
#endif
template <class SignedT, class UnsignedT>
int32_t compareSignedWithUnsigned(SignedT l, UnsignedT r) {
if (l < 0) return -1;
auto l_uint64 = static_cast<uint64_t>(l);
auto r_uint64 = static_cast<uint64_t>(r);
if (l_uint64 < r_uint64) return -1;
if (l_uint64 > r_uint64) return 1;
return 0;
}
template <class UnsignedT, class SignedT>
int32_t compareUnsignedWithSigned(UnsignedT l, SignedT r) {
if (r < 0) return 1;
auto l_uint64 = static_cast<uint64_t>(l);
auto r_uint64 = static_cast<uint64_t>(r);
if (l_uint64 < r_uint64) return -1;
if (l_uint64 > r_uint64) return 1;
return 0;
}
template <class SignedT, class UnsignedT>
void doCompareWithValueRange_SignedWithUnsigned(__compar_fn_t fp) {
int32_t signedMin = -10, signedMax = 10;
int32_t unsignedMin = 0, unsignedMax = 10;
for (SignedT l = signedMin; l <= signedMax; ++l) {
for (UnsignedT r = unsignedMin; r <= unsignedMax; ++r) {
ASSERT_EQ(fp(&l, &r), compareSignedWithUnsigned(l, r));
}
}
}
template <class UnsignedT, class SignedT>
void doCompareWithValueRange_UnsignedWithSigned(__compar_fn_t fp) {
int32_t signedMin = -10, signedMax = 10;
int32_t unsignedMin = 0, unsignedMax = 10;
for (UnsignedT l = unsignedMin; l <= unsignedMax; ++l) {
for (SignedT r = signedMin; r <= signedMax; ++r) {
ASSERT_EQ(fp(&l, &r), compareUnsignedWithSigned(l, r));
}
}
}
template <class LType>
void doCompareWithValueRange_OnlyLeftType(__compar_fn_t fp, int32_t rType) {
switch (rType) {
case TSDB_DATA_TYPE_UTINYINT:
doCompareWithValueRange_SignedWithUnsigned<LType, uint8_t>(fp);
break;
case TSDB_DATA_TYPE_USMALLINT:
doCompareWithValueRange_SignedWithUnsigned<LType, uint16_t>(fp);
break;
case TSDB_DATA_TYPE_UINT:
doCompareWithValueRange_SignedWithUnsigned<LType, uint32_t>(fp);
break;
case TSDB_DATA_TYPE_UBIGINT:
doCompareWithValueRange_SignedWithUnsigned<LType, uint64_t>(fp);
break;
case TSDB_DATA_TYPE_TINYINT:
doCompareWithValueRange_UnsignedWithSigned<LType, int8_t>(fp);
break;
case TSDB_DATA_TYPE_SMALLINT:
doCompareWithValueRange_UnsignedWithSigned<LType, int16_t>(fp);
break;
case TSDB_DATA_TYPE_INT:
doCompareWithValueRange_UnsignedWithSigned<LType, int32_t>(fp);
break;
case TSDB_DATA_TYPE_BIGINT:
doCompareWithValueRange_UnsignedWithSigned<LType, int64_t>(fp);
break;
default:
FAIL();
}
}
void doCompare(const std::vector<int32_t> &lTypes, const std::vector<int32_t> &rTypes, int32_t oper) {
for (int i = 0; i < lTypes.size(); ++i) {
for (int j = 0; j < rTypes.size(); ++j) {
auto fp = filterGetCompFuncEx(lTypes[i], rTypes[j], oper);
switch (lTypes[i]) {
case TSDB_DATA_TYPE_TINYINT:
doCompareWithValueRange_OnlyLeftType<int8_t>(fp, rTypes[j]);
break;
case TSDB_DATA_TYPE_SMALLINT:
doCompareWithValueRange_OnlyLeftType<int16_t>(fp, rTypes[j]);
break;
case TSDB_DATA_TYPE_INT:
doCompareWithValueRange_OnlyLeftType<int32_t>(fp, rTypes[j]);
break;
case TSDB_DATA_TYPE_BIGINT:
doCompareWithValueRange_OnlyLeftType<int64_t>(fp, rTypes[j]);
break;
case TSDB_DATA_TYPE_UTINYINT:
doCompareWithValueRange_OnlyLeftType<uint8_t>(fp, rTypes[j]);
break;
case TSDB_DATA_TYPE_USMALLINT:
doCompareWithValueRange_OnlyLeftType<uint16_t>(fp, rTypes[j]);
break;
case TSDB_DATA_TYPE_UINT:
doCompareWithValueRange_OnlyLeftType<uint32_t>(fp, rTypes[j]);
break;
case TSDB_DATA_TYPE_UBIGINT:
doCompareWithValueRange_OnlyLeftType<uint64_t>(fp, rTypes[j]);
break;
default:
FAIL();
}
}
}
}
TEST(dataCompareTest, signed_and_unsigned_int) {
std::vector<int32_t> lType = {TSDB_DATA_TYPE_TINYINT, TSDB_DATA_TYPE_SMALLINT, TSDB_DATA_TYPE_INT,
TSDB_DATA_TYPE_BIGINT};
std::vector<int32_t> rType = {TSDB_DATA_TYPE_UTINYINT, TSDB_DATA_TYPE_USMALLINT, TSDB_DATA_TYPE_UINT,
TSDB_DATA_TYPE_UBIGINT};
doCompare(lType, rType, OP_TYPE_GREATER_THAN);
doCompare(rType, lType, OP_TYPE_GREATER_THAN);
}
int main(int argc, char **argv) {
taosSeedRand(taosGetTimestampSec());

View File

@ -250,10 +250,11 @@ int32_t streamExecForAll(SStreamTask* pTask) {
void* pInput = NULL;
// merge multiple input data if possible in the input queue.
qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr);
while (1) {
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
if (qItem == NULL) {
// qDebug("s-task:%s extract data from input queue, queue is empty, abort", pTask->id.idStr);
break;
}
@ -298,7 +299,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
}
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
qDebug("s-task:%s exec begin, numOfBlocks:%d", pTask->id.idStr, batchSize);
qDebug("s-task:%s start to execute, numOfBlocks:%d", pTask->id.idStr, batchSize);
streamTaskExecImpl(pTask, pInput, pRes);

View File

@ -171,6 +171,8 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
}
void syncRespCleanRsp(SSyncRespMgr *pObj) {
if (pObj == NULL) return;
SSyncNode *pNode = pObj->data;
sTrace("vgId:%d, clean all resp", pNode->vgId);

View File

@ -587,12 +587,12 @@ void* destroyConnPool(SCliThrd* pThrd) {
static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
void* pool = pThrd->pool;
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1);
STrans* pTranInst = pThrd->pTransInst;
if (plist == NULL) {
SConnList list = {0};
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
plist = taosHashGet(pool, key, strlen(key));
taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list));
plist = taosHashGet(pool, key, strlen(key) + 1);
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
QUEUE_INIT(&nList->msgQ);
@ -627,11 +627,11 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) {
static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) {
void* pool = pThrd->pool;
STrans* pTransInst = pThrd->pTransInst;
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1);
if (plist == NULL) {
SConnList list = {0};
taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list));
plist = taosHashGet(pool, key, strlen(key));
taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list));
plist = taosHashGet(pool, key, strlen(key) + 1);
SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList));
QUEUE_INIT(&nList->msgQ);
@ -717,7 +717,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
cliDestroyConnMsgs(conn, false);
if (conn->list == NULL) {
conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip));
conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip) + 1);
}
SConnList* pList = conn->list;
@ -822,7 +822,8 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) {
return;
}
if (nread < 0) {
tWarn("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread), T_REF_VAL_GET(conn));
tDebug("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread),
T_REF_VAL_GET(conn));
conn->broken = true;
cliHandleExcept(conn);
}
@ -875,8 +876,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) {
connList->list->numOfConn--;
connList->size--;
} else {
SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip));
connList->list->numOfConn--;
SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip) + 1);
if (connList != NULL) connList->list->numOfConn--;
}
conn->list = NULL;
pThrd->newConnCount--;
@ -1269,7 +1270,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) &&
(pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) {
SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip));
SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip) + 1);
int64_t cTimestamp = taosGetTimestampMs();
if (item != NULL) {
int32_t elapse = cTimestamp - item->timestamp;
@ -1281,7 +1282,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) {
}
} else {
SFailFastItem item = {.count = 1, .timestamp = cTimestamp};
taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip), &item, sizeof(SFailFastItem));
taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip) + 1, &item, sizeof(SFailFastItem));
}
}
} else {
@ -1459,7 +1460,7 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) {
}
static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) {
uint32_t addr = 0;
uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn));
uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn) + 1);
if (v == NULL) {
addr = taosGetIpv4FromFqdn(fqdn);
if (addr == 0xffffffff) {
@ -1468,7 +1469,7 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn)
return addr;
}
taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr));
taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr));
} else {
addr = *v;
}

View File

@ -314,7 +314,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
return;
}
tWarn("%s conn %p read error:%s", transLabel(pTransInst), conn, uv_err_name(nread));
tDebug("%s conn %p read error:%s", transLabel(pTransInst), conn, uv_err_name(nread));
if (nread < 0) {
conn->broken = true;
if (conn->status == ConnAcquire) {

View File

@ -37,7 +37,7 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
if (cond) {
pReader->cond = *cond;
} else {
pReader->cond.scanUncommited = 0;
// pReader->cond.scanUncommited = 0;
pReader->cond.scanNotApplied = 0;
pReader->cond.scanMeta = 0;
pReader->cond.enableRef = 0;
@ -74,13 +74,18 @@ int32_t walNextValidMsg(SWalReader *pReader) {
int64_t lastVer = walGetLastVer(pReader->pWal);
int64_t committedVer = walGetCommittedVer(pReader->pWal);
int64_t appliedVer = walGetAppliedVer(pReader->pWal);
int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
endVer = TMIN(appliedVer, endVer);
while(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64", so sleep 1ms", pReader->pWal->cfg.vgId, appliedVer, committedVer);
taosMsleep(1);
appliedVer = walGetAppliedVer(pReader->pWal);
}
// int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
// endVer = TMIN(appliedVer, endVer);
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
", applied index:%" PRId64 ", end index:%" PRId64,
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
while (fetchVer <= endVer) {
", applied index:%" PRId64,
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer);
while (fetchVer <= committedVer) {
if (walFetchHeadNew(pReader, fetchVer) < 0) {
return -1;
}
@ -237,6 +242,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
}
seeked = true;
}
while (1) {
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
if (contLen == sizeof(SWalCkHead)) {

View File

@ -308,17 +308,19 @@ int32_t compareInt8Uint16(const void *pLeft, const void *pRight) {
int32_t compareInt8Uint32(const void *pLeft, const void *pRight) {
int8_t left = GET_INT8_VAL(pLeft);
if (left < 0) return -1;
uint32_t right = GET_UINT32_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
if ((uint32_t)left > right) return 1;
if ((uint32_t)left < right) return -1;
return 0;
}
int32_t compareInt8Uint64(const void *pLeft, const void *pRight) {
int8_t left = GET_INT8_VAL(pLeft);
if (left < 0) return -1;
uint64_t right = GET_UINT64_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
if ((uint64_t)left > right) return 1;
if ((uint64_t)left < right) return -1;
return 0;
}
@ -380,17 +382,19 @@ int32_t compareInt16Uint16(const void *pLeft, const void *pRight) {
int32_t compareInt16Uint32(const void *pLeft, const void *pRight) {
int16_t left = GET_INT16_VAL(pLeft);
if (left < 0) return -1;
uint32_t right = GET_UINT32_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
if ((uint32_t)left > right) return 1;
if ((uint32_t)left < right) return -1;
return 0;
}
int32_t compareInt16Uint64(const void *pLeft, const void *pRight) {
int16_t left = GET_INT16_VAL(pLeft);
if (left < 0) return -1;
uint64_t right = GET_UINT64_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
if ((uint64_t)left > right) return 1;
if ((uint64_t)left < right) return -1;
return 0;
}
@ -452,17 +456,19 @@ int32_t compareInt32Uint16(const void *pLeft, const void *pRight) {
int32_t compareInt32Uint32(const void *pLeft, const void *pRight) {
int32_t left = GET_INT32_VAL(pLeft);
if (left < 0) return -1;
uint32_t right = GET_UINT32_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
if ((uint32_t)left > right) return 1;
if ((uint32_t)left < right) return -1;
return 0;
}
int32_t compareInt32Uint64(const void *pLeft, const void *pRight) {
int32_t left = GET_INT32_VAL(pLeft);
if (left < 0) return -1;
uint64_t right = GET_UINT64_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
if ((uint64_t)left > right) return 1;
if ((uint64_t)left < right) return -1;
return 0;
}
@ -532,9 +538,10 @@ int32_t compareInt64Uint32(const void *pLeft, const void *pRight) {
int32_t compareInt64Uint64(const void *pLeft, const void *pRight) {
int64_t left = GET_INT64_VAL(pLeft);
if (left < 0) return -1;
uint64_t right = GET_UINT64_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
if ((uint64_t)left > right) return 1;
if ((uint64_t)left < right) return -1;
return 0;
}
@ -857,24 +864,27 @@ int32_t compareUint16Uint64(const void *pLeft, const void *pRight) {
int32_t compareUint32Int8(const void *pLeft, const void *pRight) {
uint32_t left = GET_UINT32_VAL(pLeft);
int8_t right = GET_INT8_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
if (right < 0) return 1;
if (left > (uint32_t)right) return 1;
if (left < (uint32_t)right) return -1;
return 0;
}
int32_t compareUint32Int16(const void *pLeft, const void *pRight) {
uint32_t left = GET_UINT32_VAL(pLeft);
int16_t right = GET_INT16_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
if (right < 0) return 1;
if (left > (uint32_t)right) return 1;
if (left < (uint32_t)right) return -1;
return 0;
}
int32_t compareUint32Int32(const void *pLeft, const void *pRight) {
uint32_t left = GET_UINT32_VAL(pLeft);
int32_t right = GET_INT32_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
if (right < 0) return 1;
if (left > (uint32_t)right) return 1;
if (left < (uint32_t)right) return -1;
return 0;
}
@ -929,32 +939,36 @@ int32_t compareUint32Uint64(const void *pLeft, const void *pRight) {
int32_t compareUint64Int8(const void *pLeft, const void *pRight) {
uint64_t left = GET_UINT64_VAL(pLeft);
int8_t right = GET_INT8_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
if (right < 0) return 1;
if (left > (uint64_t)right) return 1;
if (left < (uint64_t)right) return -1;
return 0;
}
int32_t compareUint64Int16(const void *pLeft, const void *pRight) {
uint64_t left = GET_UINT64_VAL(pLeft);
int16_t right = GET_INT16_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
if (right < 0) return 1;
if (left > (uint64_t)right) return 1;
if (left < (uint64_t)right) return -1;
return 0;
}
int32_t compareUint64Int32(const void *pLeft, const void *pRight) {
uint64_t left = GET_UINT64_VAL(pLeft);
int32_t right = GET_INT32_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
if (right < 0) return 1;
if (left > (uint64_t)right) return 1;
if (left < (uint64_t)right) return -1;
return 0;
}
int32_t compareUint64Int64(const void *pLeft, const void *pRight) {
uint64_t left = GET_UINT64_VAL(pLeft);
int64_t right = GET_INT64_VAL(pRight);
if (left > right) return 1;
if (left < right) return -1;
if (right < 0) return 1;
if (left > (uint64_t)right) return 1;
if (left < (uint64_t)right) return -1;
return 0;
}

View File

@ -122,7 +122,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, "No write permission")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, "Connection killed")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, "Syntax error in SQL")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DB_NOT_SELECTED, "Database not specified or available")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TABLE_NAME, "Table does not exist")
//TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TABLE_NAME, "Table does not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_EXCEED_SQL_LIMIT, "SQL statement too long")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_FILE_EMPTY, "File is empty")
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_LINE_SYNTAX_ERROR, "Syntax error in Line")

View File

@ -136,6 +136,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/sysinfo.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_control.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_manage.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_privilege.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/multilevel.py
,,n,system-test,python3 ./test.py -f 0-others/compatibility.py

View File

@ -363,8 +363,7 @@ def main():
git commit : {git_commit}
log dir: {log_dir}
core dir: {core_dir}
cmd: {cmd}
'''
cmd: {cmd}'''
send_msg(get_msg(text))
except Exception as e:

View File

@ -58,16 +58,16 @@ if $data23 != 0 then
return -1
endi
print ========== stop dnode2
system sh/exec.sh -n dnode2 -s stop -x SIGKILL
#print ========== stop dnode2
#system sh/exec.sh -n dnode2 -s stop -x SIGKILL
sleep 1000
print =============== drop database
sql_error drop database d1
#sleep 1000
#print =============== drop database
sql drop database d1
print ========== start dnode2
system sh/exec.sh -n dnode2 -s start
sleep 1000
#print ========== start dnode2
#system sh/exec.sh -n dnode2 -s start
#sleep 1000
print =============== re-create database
$x = 0

View File

@ -0,0 +1,120 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import taos
from taos.tmq import *
from util.cases import *
from util.common import *
from util.log import *
from util.sql import *
from util.sqlset import *
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.setsql = TDSetSql()
self.stbname = 'stb'
self.binary_length = 20 # the length of binary for column_dict
self.nchar_length = 20 # the length of nchar for column_dict
self.column_dict = {
'ts': 'timestamp',
'col1': 'float',
'col2': 'int',
'col3': 'float',
}
self.tag_dict = {
't1': 'int',
't2': f'binary({self.binary_length})'
}
self.tag_list = [
f'1, "Beijing"',
f'2, "Shanghai"',
f'3, "Guangzhou"',
f'4, "Shenzhen"'
]
self.values_list = [
f'now, 9.1, 200, 0.3'
]
self.tbnum = 4
def create_user(self):
user_name = 'test'
tdSql.execute(f'create user {user_name} pass "test"')
tdSql.execute(f'grant read on db.stb with t2 = "Beijing" to {user_name}')
def prepare_data(self):
tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict))
for i in range(self.tbnum):
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})')
for j in self.values_list:
tdSql.execute(f'insert into {self.stbname}_{i} values({j})')
def user_privilege_check(self):
testconn = taos.connect(user='test', password='test')
expectErrNotOccured = False
try:
sql = "select count(*) from db.stb where t2 = 'Beijing'"
res = testconn.query(sql)
data = res.fetch_all()
count = data[0][0]
except BaseException:
expectErrNotOccured = True
if expectErrNotOccured:
caller = inspect.getframeinfo(inspect.stack()[1][0])
tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, expect error not occured")
elif count != 1:
tdLog.exit(f"{sql}, expect result doesn't match")
pass
def user_privilege_error_check(self):
testconn = taos.connect(user='test', password='test')
expectErrNotOccured = False
sql_list = ["alter talbe db.stb_1 set t2 = 'Wuhan'", "drop table db.stb_1"]
for sql in sql_list:
try:
res = testconn.execute(sql)
except BaseException:
expectErrNotOccured = True
if expectErrNotOccured:
pass
else:
caller = inspect.getframeinfo(inspect.stack()[1][0])
tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, expect error not occured")
pass
def run(self):
tdSql.prepare()
self.prepare_data()
self.create_user()
self.user_privilege_check()
self.user_privilege_error_check()
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -34,6 +34,9 @@ class TDTestCase:
if ret != 0:
tdLog.info("sml_test ret != 0")
tdSql.query(f"select * from ts3303.stb2")
tdSql.query(f"select * from ts3303.meters")
# tdSql.execute('use sml_db')
tdSql.query(f"select * from {dbname}.t_b7d815c9222ca64cdf2614c61de8f211")
tdSql.checkRows(1)

View File

@ -554,7 +554,12 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
if (tsEnableScience) {
printf("%*e", width, GET_FLOAT_VAL(val));
} else {
printf("%*.5f", width, GET_FLOAT_VAL(val));
n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.5f", width, GET_FLOAT_VAL(val));
if (n > TMAX(20, width)) {
printf("%*e", width, GET_FLOAT_VAL(val));
} else {
printf("%s", buf);
}
}
break;
case TSDB_DATA_TYPE_DOUBLE:

View File

@ -1159,6 +1159,57 @@ int sml_td23881_Test() {
return code;
}
int sml_ts3303_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes = taos_query(taos, "drop database if exists ts3303");
taos_free_result(pRes);
pRes = taos_query(taos, "create database if not exists ts3303");
taos_free_result(pRes);
const char *sql[] = {
"stb2,t1=1,dataModelName=t0 f1=283i32 1632299372000",
"stb2,t1=1,dataModelName=t0 f1=106i32 1632299378000",
"stb2,t1=4,dataModelName=t0 f1=144i32 1629716944000",
"stb2,t1=4,dataModelName=t0 f1=125i32 1629717012000",
"stb2,t1=4,dataModelName=t0 f1=144i32 1629717012000",
"stb2,t1=4,dataModelName=t0 f1=107i32 1629717013000",
"stb2,t1=6,dataModelName=t0 f1=154i32 1629717140000",
"stb2,t1=6,dataModelName=t0 f1=93i32 1629717140000",
"stb2,t1=6,dataModelName=t0 f1=134i32 1629717140000",
"stb2,t1=4,dataModelName=t0 f1=73i32 1629717140000",
"stb2,t1=4,dataModelName=t0 f1=83i32 1629717140000",
"stb2,t1=4,dataModelName=t0 f1=72i32 1629717140000",
};
const char *sql1[] = {
"meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=\"2022-02-0210:22:22\" 1626006833339000000",
"meters,groupid=2,location=California.LosAngeles current=11.8,voltage=221,phase=\"2022-02-0210:22:22\" 1626006833339000000",
};
pRes = taos_query(taos, "use ts3303");
taos_free_result(pRes);
pRes = taos_schemaless_insert_ttl(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLI_SECONDS, 20);
int code = taos_errno(pRes);
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
taos_free_result(pRes);
ASSERT(code == 0);
pRes = taos_schemaless_insert_ttl(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS, 20);
printf("%s result1:%s\n", __FUNCTION__, taos_errstr(pRes));
taos_free_result(pRes);
taos_close(taos);
return code;
}
int sml_ttl_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -1336,6 +1387,9 @@ int main(int argc, char *argv[]) {
ASSERT(!ret);
ret = sml_ts2385_Test(); // this test case need config sml table name using ./sml_test config_file
ASSERT(!ret);
ret = sml_ts3303_Test(); // this test case need config sml table name using ./sml_test config_file
ASSERT(!ret);
// for(int i = 0; i < sizeof(str)/sizeof(str[0]); i++){
// printf("str:%s \t %d\n", str[i], smlCalTypeSum(str[i], strlen(str[i])));
// }