diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 1fb00e743f..b7e6c42e3b 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -190,6 +190,8 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo); +void verifyOffset(void *pWalReader, STqOffsetVal* pOffset); + int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType); void qStreamSetOpen(qTaskInfo_t tinfo); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 72ab2f89db..4688c8bb87 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -146,7 +146,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TSC_CONN_KILLED TAOS_DEF_ERROR_CODE(0, 0x0215) #define TSDB_CODE_TSC_SQL_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x0216) #define TSDB_CODE_TSC_DB_NOT_SELECTED TAOS_DEF_ERROR_CODE(0, 0x0217) -#define TSDB_CODE_TSC_INVALID_TABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x0218) +//#define TSDB_CODE_TSC_INVALID_TABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x0218) #define TSDB_CODE_TSC_EXCEED_SQL_LIMIT TAOS_DEF_ERROR_CODE(0, 0x0219) #define TSDB_CODE_TSC_FILE_EMPTY TAOS_DEF_ERROR_CODE(0, 0x021A) #define TSDB_CODE_TSC_LINE_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x021B) @@ -261,6 +261,7 @@ int32_t* taosGetErrno(); // #define TSDB_CODE_MND_INVALID_STABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x036D) // 2.x #define TSDB_CODE_MND_INVALID_STB_OPTION TAOS_DEF_ERROR_CODE(0, 0x036E) #define TSDB_CODE_MND_INVALID_ROW_BYTES TAOS_DEF_ERROR_CODE(0, 0x036F) +#define TSDB_CODE_MND_FIELD_VALUE_OVERFLOW TAOS_DEF_ERROR_CODE(0, 0x0370) // mnode-func diff --git a/source/client/inc/clientSml.h b/source/client/inc/clientSml.h index 3982c0d9aa..b20fc6f57a 100644 --- a/source/client/inc/clientSml.h +++ b/source/client/inc/clientSml.h @@ -169,6 +169,7 @@ typedef struct { int32_t uid; // used for automatic create child table SHashObj *childTables; + SHashObj *tableUids; SHashObj *superTables; SHashObj *pVgHash; @@ -242,6 +243,7 @@ int8_t smlGetTsTypeByLen(int32_t len); SSmlTableInfo* smlBuildTableInfo(int numRows, const char* measure, int32_t measureLen); SSmlSTableMeta* smlBuildSTableMeta(bool isDataFormat); int32_t smlSetCTableName(SSmlTableInfo *oneTable); +void getTableUid(SSmlHandle *info, SSmlLineInfo *currElement, SSmlTableInfo *tinfo); STableMeta* smlGetMeta(SSmlHandle *info, const void* measure, int32_t measureLen); int32_t is_same_child_table_telnet(const void *a, const void *b); int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len); diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 6f56ae4569..e64e749b5b 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -195,6 +195,20 @@ int32_t smlSetCTableName(SSmlTableInfo *oneTable) { return TSDB_CODE_SUCCESS; } +void getTableUid(SSmlHandle *info, SSmlLineInfo *currElement, SSmlTableInfo *tinfo){ + char key[TSDB_TABLE_NAME_LEN * 2 + 1] = {0}; + size_t nLen = strlen(tinfo->childTableName); + memcpy(key, currElement->measure, currElement->measureLen); + memcpy(key + currElement->measureLen + 1, tinfo->childTableName, nLen); + void *uid = taosHashGet(info->tableUids, key, currElement->measureLen + 1 + nLen); // use \0 as separator for stable name and child table name + if (uid == NULL) { + tinfo->uid = info->uid++; + taosHashPut(info->tableUids, key, currElement->measureLen + 1 + nLen, &tinfo->uid, sizeof(uint64_t)); + }else{ + tinfo->uid = *(uint64_t*)uid; + } +} + SSmlSTableMeta *smlBuildSTableMeta(bool isDataFormat) { SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1); if (!meta) { @@ -1142,6 +1156,7 @@ void smlDestroyInfo(SSmlHandle *info) { taosHashCleanup(info->pVgHash); taosHashCleanup(info->childTables); taosHashCleanup(info->superTables); + taosHashCleanup(info->tableUids); for (int i = 0; i < taosArrayGetSize(info->tagJsonArray); i++) { cJSON *tags = (cJSON *)taosArrayGetP(info->tagJsonArray, i); @@ -1192,6 +1207,7 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos) { info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); info->childTables = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + info->tableUids = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); info->superTables = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); info->id = smlGenId(); @@ -1202,7 +1218,7 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos) { info->valueJsonArray = taosArrayInit(8, POINTER_BYTES); info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv)); - if (NULL == info->pVgHash || NULL == info->childTables || NULL == info->superTables) { + if (NULL == info->pVgHash || NULL == info->childTables || NULL == info->superTables || NULL == info->tableUids) { uError("create SSmlHandle failed"); goto cleanup; } @@ -1320,23 +1336,23 @@ static int32_t smlInsertData(SSmlHandle *info) { if (info->pRequest->dbList == NULL) { info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN); } - void *data = taosArrayReserve(info->pRequest->dbList, 1); - memcpy(data, info->pRequest->pDb, - TSDB_DB_FNAME_LEN > strlen(info->pRequest->pDb) ? strlen(info->pRequest->pDb) : TSDB_DB_FNAME_LEN); + char *data = (char*)taosArrayReserve(info->pRequest->dbList, 1); + SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}}; + tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname)); + tNameGetFullDbName(&pName, data); SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL); while (oneTable) { SSmlTableInfo *tableData = *oneTable; - - SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}}; - tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname)); - memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName)); + tstrncpy(pName.tname, tableData->sTableName, tableData->sTableNameLen + 1); if (info->pRequest->tableList == NULL) { info->pRequest->tableList = taosArrayInit(1, sizeof(SName)); } taosArrayPush(info->pRequest->tableList, &pName); + tstrncpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName) + 1); + SRequestConnInfo conn = {0}; conn.pTrans = info->taos->pAppInfo->pTransporter; conn.requestId = info->pRequest->requestId; @@ -1428,6 +1444,7 @@ int32_t smlClearForRerun(SSmlHandle *info) { taosHashClear(info->childTables); taosHashClear(info->superTables); + taosHashClear(info->tableUids); if (!info->dataFormat) { if (unlikely(info->lines != NULL)) { @@ -1562,7 +1579,8 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL do { code = smlModifyDBSchemas(info); if (code == 0 || code == TSDB_CODE_SML_INVALID_DATA || code == TSDB_CODE_PAR_TOO_MANY_COLUMNS - || code == TSDB_CODE_PAR_INVALID_TAGS_NUM) break; + || code == TSDB_CODE_PAR_INVALID_TAGS_NUM || code == TSDB_CODE_PAR_INVALID_TAGS_LENGTH + || code == TSDB_CODE_PAR_INVALID_ROW_LENGTH) break; taosMsleep(100); uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum); } while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES); @@ -1647,7 +1665,8 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, info->cost.endTime = taosGetTimestampUs(); info->cost.code = code; if (code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING || - code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT) { + code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT || + code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { if (cnt++ >= 10) { uInfo("SML:%" PRIx64 " retry:%d/10 end code:%d, msg:%s", info->id, cnt, code, tstrerror(code)); break; diff --git a/source/client/src/clientSmlJson.c b/source/client/src/clientSmlJson.c index b0ae316031..7ccf930964 100644 --- a/source/client/src/clientSmlJson.c +++ b/source/client/src/clientSmlJson.c @@ -778,7 +778,7 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo tinfo->tags = taosArrayDup(preLineKV, NULL); smlSetCTableName(tinfo); - tinfo->uid = info->uid++; + getTableUid(info, elements, tinfo); if (info->dataFormat) { info->currSTableMeta->uid = tinfo->uid; tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta); diff --git a/source/client/src/clientSmlLine.c b/source/client/src/clientSmlLine.c index 1732473c11..2f7e8a0f97 100644 --- a/source/client/src/clientSmlLine.c +++ b/source/client/src/clientSmlLine.c @@ -312,7 +312,7 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin } smlSetCTableName(tinfo); - tinfo->uid = info->uid++; + getTableUid(info, currElement, tinfo); if (info->dataFormat) { info->currSTableMeta->uid = tinfo->uid; tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta); diff --git a/source/client/src/clientSmlTelnet.c b/source/client/src/clientSmlTelnet.c index 036442573d..c5dd20ba7b 100644 --- a/source/client/src/clientSmlTelnet.c +++ b/source/client/src/clientSmlTelnet.c @@ -206,7 +206,7 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS tinfo->tags = taosArrayDup(preLineKV, NULL); smlSetCTableName(tinfo); - tinfo->uid = info->uid++; + getTableUid(info, elements, tinfo); if (info->dataFormat) { info->currSTableMeta->uid = tinfo->uid; tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index fc7cbc19c0..5f7e43668a 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -120,6 +120,7 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const pColumnInfoData->varmeta.length += dataLen; } else { memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex, pData, pColumnInfoData->info.bytes); + colDataClearNull_f(pColumnInfoData->nullbitmap, rowIndex); } return 0; @@ -1949,12 +1950,11 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) { } } } - #endif // for debug char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) { - int32_t size = 2048; + int32_t size = 2048*1024; *pDataBuf = taosMemoryCalloc(size, 1); char* dumpBuf = *pDataBuf; char pBuf[128] = {0}; @@ -1970,7 +1970,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) if (len >= size - 1) return dumpBuf; for (int32_t j = 0; j < rows; j++) { - len += snprintf(dumpBuf + len, size - len, "%s |", flag); + len += snprintf(dumpBuf + len, size - len, "%s %d|", flag, j); if (len >= size - 1) return dumpBuf; for (int32_t k = 0; k < colNum; k++) { diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index 0229735952..94c4eae83f 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -50,6 +50,8 @@ void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *p void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid); +int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index da3c3b98a8..a73c08e69a 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -797,6 +797,11 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat return -1; } + if(pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags){ + terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW; + return -1; + } + for (int32_t i = 0; i < pDst->numOfColumns; ++i) { SField *pField = taosArrayGet(pCreate->pColumns, i); SSchema *pSchema = &pDst->pColumns[i]; @@ -927,6 +932,11 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq return -1; } + if(pDst->nextColId < 0 && pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags){ + terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW; + return -1; + } + for (int32_t i = 0; i < pDst->numOfColumns; ++i) { SField *pField = taosArrayGet(createReq->pColumns, i); SSchema *pSchema = &pDst->pColumns[i]; @@ -1154,6 +1164,11 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p return -1; } + if(pNew->nextColId < 0 && pNew->nextColId >= 0x7fff - ntags){ + terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW; + return -1; + } + for (int32_t i = 0; i < ntags; i++) { SField *pField = taosArrayGet(pFields, i); if (mndFindSuperTableColumnIndex(pOld, pField->name) >= 0) { @@ -1461,6 +1476,11 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray return -1; } + if(pNew->nextColId < 0 && pNew->nextColId >= 0x7fff - ncols){ + terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW; + return -1; + } + for (int32_t i = 0; i < ncols; i++) { SField *pField = taosArrayGet(pFields, i); if (mndFindSuperTableColumnIndex(pOld, pField->name) >= 0) { diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 523753d7c6..2a0d753722 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -236,7 +236,7 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) { SDB_SET_BINARY(pRaw, dataPos, key, keyLen, _OVER); SDB_SET_INT32(pRaw, dataPos, *useDb, _OVER) - useDb = taosHashIterate(pUser->writeTbs, useDb); + useDb = taosHashIterate(pUser->useDbs, useDb); } SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER) diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index ed1fddb63f..0003d07fd6 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -2006,7 +2006,7 @@ static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, return 0; } -static int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) { +int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup) { int32_t code = -1; STrans *pTrans = NULL; SSdbRaw *pRaw = NULL; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index c7424cd233..88460cd3ca 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -255,14 +255,13 @@ int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id); -int32_t tqNextBlock(STqReader *pReader, SSDataBlock* pBlock); int32_t tqNextBlockInWal(STqReader* pReader); -int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData); - -int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); bool tqNextBlockImpl(STqReader *pReader); + +int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData); +int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); -int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData **pSubmitTbDataRet); +int32_t tqRetrieveDataBlock(STqReader *pReader, SSubmitTbData **pSubmitTbDataRet); int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index e4d274cd57..d58263ab86 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -214,7 +214,6 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t version, char* msg, int32_t msgL int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen); -int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit); int32_t tqProcessSubmitReqForSubscribe(STQ* pTq); int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8d9d5c5fa4..bbdfd715b5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1082,12 +1082,15 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); taosWLockLatch(&pTq->lock); - if(taosHashGetSize(pTq->pPushMgr) > 0){ - void *pIter = taosHashIterate(pTq->pPushMgr, NULL); - while(pIter){ + + if (taosHashGetSize(pTq->pPushMgr) > 0) { + void* pIter = taosHashIterate(pTq->pPushMgr, NULL); + + while (pIter) { STqHandle* pHandle = *(STqHandle**)pIter; - tqDebug("vgId:%d start set submit for pHandle:%p, consume id:0x%"PRIx64, vgId, pHandle, pHandle->consumerId); - if(ASSERT(pHandle->msg != NULL)){ + tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId); + + if (ASSERT(pHandle->msg != NULL)) { tqError("pHandle->msg should not be null"); break; }else{ @@ -1096,77 +1099,15 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { taosMemoryFree(pHandle->msg); pHandle->msg = NULL; } + pIter = taosHashIterate(pTq->pPushMgr, pIter); } + taosHashClear(pTq->pPushMgr); } + // unlock taosWUnLockLatch(&pTq->lock); - - return 0; -} - -int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { -#if 0 - void* pIter = NULL; - SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit, STREAM_INPUT__DATA_SUBMIT); - if (pSubmit == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("failed to create data submit for stream since out of memory"); - saveOffsetForAllTasks(pTq, submit.ver); - return -1; - } - - SArray* pInputQueueFullTasks = taosArrayInit(4, POINTER_BYTES); - - while (1) { - pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); - if (pIter == NULL) { - break; - } - - SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->taskLevel != TASK_LEVEL__SOURCE) { - continue; - } - - if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { - tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId, - pTask->status.taskStatus); - continue; - } - - // check if offset value exists - char key[128] = {0}; - createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId); - - if (tInputQueueIsFull(pTask)) { - STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key); - - int64_t ver = submit.ver; - if (pOffset == NULL) { - doSaveTaskOffset(pTq->pOffsetStore, key, submit.ver); - } else { - ver = pOffset->val.version; - } - - tqDebug("s-task:%s input queue is full, discard submit block, ver:%" PRId64, pTask->id.idStr, ver); - taosArrayPush(pInputQueueFullTasks, &pTask); - continue; - } - - // check if offset value exists - STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key); - ASSERT(pOffset == NULL); - - addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver); - } - - streamDataSubmitDestroy(pSubmit); - taosFreeQitem(pSubmit); -#endif - - tqStartStreamTasks(pTq); return 0; } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index e86a7c9068..950c5ea96b 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -16,250 +16,10 @@ #include "tq.h" #include "vnd.h" -#if 0 -void tqTmrRspFunc(void* param, void* tmrId) { - STqHandle* pHandle = (STqHandle*)param; - atomic_store_8(&pHandle->pushHandle.tmrStopped, 1); -} - -static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubmit** ppSubmit, SMqDataRsp* pRsp) { - SStreamDataSubmit* pSubmit = *ppSubmit; - while (pSubmit != NULL) { - if (tqLogScanExec(pTq, &pHandle->execHandle, pSubmit->data, pRsp, 0) < 0) { - } - // update processed - atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver); - streamQueueProcessSuccess(&pHandle->pushHandle.inputQ); - streamDataSubmitDestroy(pSubmit); - if (pRsp->blockNum > 0) { - *ppSubmit = pSubmit; - return 0; - } else { - pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ); - } - } - *ppSubmit = pSubmit; - return -1; -} - -int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) { - SMqDataRsp rsp = {0}; - // 1. guard and set status executing - int8_t execStatus = atomic_val_compare_exchange_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE, - TASK_EXEC_STATUS__EXECUTING); - if (execStatus == TASK_EXEC_STATUS__IDLE) { - SStreamDataSubmit* pSubmit = NULL; - // 2. check processedVer - // 2.1. if not missed, get msg from queue - // 2.2. if missed, scan wal - pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ); - while (pHandle->pushHandle.processedVer <= pSubmit->ver) { - // read from wal - } - while (pHandle->pushHandle.processedVer > pSubmit->ver + 1) { - streamQueueProcessSuccess(&pHandle->pushHandle.inputQ); - streamDataSubmitDestroy(pSubmit); - pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ); - if (pSubmit == NULL) break; - } - // 3. exec, after each success, update processed ver - // first run - if (tqLoopExecFromQueue(pTq, pHandle, &pSubmit, &rsp) == 0) { - goto SEND_RSP; - } - // set exec status closing - atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__CLOSING); - // second run - if (tqLoopExecFromQueue(pTq, pHandle, &pSubmit, &rsp) == 0) { - goto SEND_RSP; - } - // set exec status idle - atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE); - } -SEND_RSP: - // 4. if get result - // 4.1 set exec input status blocked and exec status idle - atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE); - // 4.2 rpc send - rsp.rspOffset = pHandle->pushHandle.processedVer; - /*if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) {*/ - /*return -1;*/ - /*}*/ - // 4.3 clear rpc info - memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo)); - return 0; -} - -int32_t tqOpenPushHandle(STQ* pTq, STqHandle* pHandle) { - memset(&pHandle->pushHandle, 0, sizeof(STqPushHandle)); - pHandle->pushHandle.inputQ.queue = taosOpenQueue(); - pHandle->pushHandle.inputQ.qall = taosAllocateQall(); - if (pHandle->pushHandle.inputQ.queue == NULL || pHandle->pushHandle.inputQ.qall == NULL) { - if (pHandle->pushHandle.inputQ.queue) { - taosCloseQueue(pHandle->pushHandle.inputQ.queue); - } - if (pHandle->pushHandle.inputQ.qall) { - taosFreeQall(pHandle->pushHandle.inputQ.qall); - } - return -1; - } - return 0; -} - -int32_t tqPreparePush(STQ* pTq, STqHandle* pHandle, int64_t reqId, const SRpcHandleInfo* pInfo, int64_t processedVer, - int64_t timeout) { - memcpy(&pHandle->pushHandle.rpcInfo, pInfo, sizeof(SRpcHandleInfo)); - atomic_store_64(&pHandle->pushHandle.reqId, reqId); - atomic_store_64(&pHandle->pushHandle.processedVer, processedVer); - atomic_store_8(&pHandle->pushHandle.inputStatus, TASK_INPUT_STATUS__NORMAL); - atomic_store_8(&pHandle->pushHandle.tmrStopped, 0); - taosTmrReset(tqTmrRspFunc, (int32_t)timeout, pHandle, tqMgmt.timer, &pHandle->pushHandle.timerId); - return 0; -} - -int32_t tqEnqueue(STqHandle* pHandle, SStreamDataSubmit* pSubmit) { - int8_t inputStatus = atomic_load_8(&pHandle->pushHandle.inputStatus); - if (inputStatus == TASK_INPUT_STATUS__NORMAL) { - SStreamDataSubmit* pSubmitClone = streamSubmitBlockClone(pSubmit); - if (pSubmitClone == NULL) { - return -1; - } - taosWriteQitem(pHandle->pushHandle.inputQ.queue, pSubmitClone); - return 0; - } - return -1; -} - -int32_t tqSendExecReq(STQ* pTq, STqHandle* pHandle) { - // - return 0; -} - -int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) { - if (msgType != TDMT_VND_SUBMIT) return 0; - void* pIter = NULL; - STqHandle* pHandle = NULL; - SSubmitReq* pReq = (SSubmitReq*)msg; - int32_t workerId = 4; - int64_t fetchOffset = ver; - - while (1) { - pIter = taosHashIterate(pTq->pushMgr, pIter); - if (pIter == NULL) break; - pHandle = *(STqHandle**)pIter; - - taosWLockLatch(&pHandle->pushHandle.lock); - - SMqDataRsp rsp = {0}; - rsp.reqOffset = pHandle->pushHandle.reqOffset; - rsp.blockData = taosArrayInit(0, sizeof(void*)); - rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); - - if (msgType == TDMT_VND_SUBMIT) { - tqLogScanExec(pTq, &pHandle->execHandle, pReq, &rsp, workerId); - } else { - tqError("tq push unexpected msg type %d", msgType); - } - - if (rsp.blockNum == 0) { - taosWUnLockLatch(&pHandle->pushHandle.lock); - continue; - } - - rsp.rspOffset = fetchOffset; - - int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp); - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - // todo free - return -1; - } - - ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; - ((SMqRspHead*)buf)->epoch = pHandle->pushHandle.epoch; - ((SMqRspHead*)buf)->consumerId = pHandle->pushHandle.consumerId; - - void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - tEncodeSMqDataBlkRsp(&abuf, &rsp); - - SRpcMsg resp = { - .info = pHandle->pushHandle.rpcInfo, - .pCont = buf, - .contLen = tlen, - .code = 0, - }; - tmsgSendRsp(&resp); - - memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo)); - taosWUnLockLatch(&pHandle->pushHandle.lock); - - tqDebug("vgId:%d offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, req:%" PRId64 ", rsp:%" PRId64, - TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum, - rsp.reqOffset, rsp.rspOffset); - - // TODO destroy - taosArrayDestroy(rsp.blockData); - taosArrayDestroy(rsp.blockDataLen); - } - - return 0; -} -#endif - int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { -// void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); -// int32_t len = msgLen - sizeof(SSubmitReq2Msg); -// int32_t vgId = TD_VID(pTq->pVnode); if (msgType == TDMT_VND_SUBMIT) { tqProcessSubmitReqForSubscribe(pTq); - // lock push mgr to avoid potential msg lost -// taosWLockLatch(&pTq->lock); -// -// int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr); -// if (numOfRegisteredPush > 0) { -// tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d", -// vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush); -// -// void* data = taosMemoryMalloc(len); -// if (data == NULL) { -// terrno = TSDB_CODE_OUT_OF_MEMORY; -// tqError("failed to copy data for stream since out of memory, vgId:%d", vgId); -// taosWUnLockLatch(&pTq->lock); -// return -1; -// } -// -// memcpy(data, pReq, len); -// -// SArray* cachedKey = taosArrayInit(0, sizeof(SItem)); -// void* pIter = NULL; -// -// while (1) { -// pIter = taosHashIterate(pTq->pPushMgr, pIter); -// if (pIter == NULL) { -// break; -// } -// -// STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; -// -// STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey)); -// if (pHandle == NULL) { -// tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId, -// pPushEntry->subKey); -// continue; -// } -// -// STqExecHandle* pExec = &pHandle->execHandle; -// doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey); -// } -// -// doRemovePushedEntry(cachedKey, pTq); -// taosArrayDestroyEx(cachedKey, freeItem); -// taosMemoryFree(data); -// } -// -// // unlock -// taosWUnLockLatch(&pTq->lock); } int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta); @@ -275,8 +35,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v } if (msgType == TDMT_VND_SUBMIT) { - SPackedData submit = {0}; - tqProcessSubmitReq(pTq, submit); + tqStartStreamTasks(pTq); } if (msgType == TDMT_VND_DELETE) { @@ -287,16 +46,16 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v return 0; } - int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); - STqHandle* pHandle = (STqHandle*) handle; - if(pHandle->msg == NULL){ + STqHandle* pHandle = (STqHandle*)handle; + + if (pHandle->msg == NULL) { pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg)); memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); - }else{ - void *tmp = pHandle->msg->pCont; + } else { + void* tmp = pHandle->msg->pCont; memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); pHandle->msg->pCont = tmp; } @@ -304,7 +63,8 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) { memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); pHandle->msg->contLen = pMsg->contLen; int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES); - tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen); + tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, + pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen); return 0; } @@ -314,6 +74,7 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) { int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey)); tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); + if(pHandle->msg != NULL) { tqPushDataRsp(pTq, pHandle); @@ -321,5 +82,6 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) { taosMemoryFree(pHandle->msg); pHandle->msg = NULL; } + return 0; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index d4db43dd21..2b5ca59408 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -332,6 +332,7 @@ int32_t tqNextBlockInWal(STqReader* pReader) { if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) { // try next message in wal file + // todo always retry to avoid read failure caused by wal file deletion if (walNextValidMsg(pWalReader) < 0) { return FETCH_TYPE__NONE; } @@ -374,7 +375,7 @@ int32_t tqNextBlockInWal(STqReader* pReader) { SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); if (pReader->tbIdHash == NULL) { - int32_t code = tqRetrieveDataBlock(pReader->pResBlock, pReader, NULL); + int32_t code = tqRetrieveDataBlock(pReader, NULL); if (code == TSDB_CODE_SUCCESS && pReader->pResBlock->info.rows > 0) { return FETCH_TYPE__DATA; } @@ -384,7 +385,7 @@ int32_t tqNextBlockInWal(STqReader* pReader) { if (ret != NULL) { tqDebug("tq reader return submit block, uid:%"PRId64", ver:%"PRId64, pSubmitTbData->uid, pReader->msg.ver); - int32_t code = tqRetrieveDataBlock(pReader->pResBlock, pReader, NULL); + int32_t code = tqRetrieveDataBlock(pReader, NULL); if (code == TSDB_CODE_SUCCESS && pReader->pResBlock->info.rows > 0) { return FETCH_TYPE__DATA; } @@ -399,31 +400,6 @@ int32_t tqNextBlockInWal(STqReader* pReader) { } } -int32_t tqNextBlock(STqReader* pReader, SSDataBlock* pBlock) { - while (1) { - if (pReader->msg.msgStr == NULL) { - if (walNextValidMsg(pReader->pWalReader) < 0) { - return FETCH_TYPE__NONE; - } - - void* pBody = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); - int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); - int64_t ver = pReader->pWalReader->pHead->head.version; - - tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver); - } - - while (tqNextBlockImpl(pReader)) { - int32_t code = tqRetrieveDataBlock(pReader->pResBlock, pReader, NULL); - if (code != TSDB_CODE_SUCCESS || pBlock->info.rows == 0) { - continue; - } - - return FETCH_TYPE__DATA; - } - } -} - int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) { pReader->msg.msgStr = msgStr; pReader->msg.msgLen = msgLen; @@ -527,7 +503,7 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap return 0; } -int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) { +int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) { tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++); @@ -535,6 +511,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa *pSubmitTbDataRet = pSubmitTbData; } + SSDataBlock* pBlock = pReader->pResBlock; blockDataCleanup(pBlock); int32_t sversion = pSubmitTbData->sver; @@ -603,7 +580,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId); int32_t code = blockDataAppendColInfo(pBlock, &colInfo); if (code != TSDB_CODE_SUCCESS) { - goto FAIL; + return -1; } i++; j++; @@ -622,7 +599,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa if (blockDataEnsureCapacity(pBlock, numOfRows) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; + return -1; } pBlock->info.rows = numOfRows; @@ -638,7 +615,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa while (targetIdx < colActual) { if (sourceIdx >= numOfCols) { tqError("tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols); - goto FAIL; + return -1; } SColData* pCol = taosArrayGet(pCols, sourceIdx); @@ -647,7 +624,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa if (pCol->nVal != numOfRows) { tqError("tqRetrieveDataBlock pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows); - goto FAIL; + return -1; } if (pCol->cid < pColData->info.colId) { @@ -661,14 +638,14 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData); varDataSetLen(val, colVal.value.nData); if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { - goto FAIL; + return -1; } } else { colDataSetNULL(pColData, i); } } else { if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) { - goto FAIL; + return -1; } } } @@ -710,14 +687,14 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData); varDataSetLen(val, colVal.value.nData); if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { - goto FAIL; + return -1; } } else { colDataSetNULL(pColData, i); } } else { if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) { - goto FAIL; + return -1; } } @@ -735,10 +712,6 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa } return 0; - -FAIL: - blockDataFreeRes(pBlock); - return -1; } int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) { diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 3d9cea54ba..800bcc8b71 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -66,9 +66,10 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { const int32_t MAX_ROWS_TO_RETURN = 4096; - int32_t vgId = TD_VID(pTq->pVnode); - int32_t code = 0; - int32_t totalRows = 0; + + int32_t vgId = TD_VID(pTq->pVnode); + int32_t code = 0; + int32_t totalRows = 0; const STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 133c51a8dc..e3bde14b6d 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -175,7 +175,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, goto end; } -// till now, all data has been transferred to consumer, new data needs to push client once arrived. + // till now, all data has been transferred to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { // lock @@ -246,6 +246,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (offset->type == TMQ_OFFSET__LOG) { + verifyOffset(pHandle->pWalReader, offset); int64_t fetchVer = offset->version + 1; pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { @@ -361,11 +362,10 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ // this is a normal subscribe requirement if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset); + } else { // todo handle the case where re-balance occurs. + // for taosx + return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &offset); } - - // todo handle the case where re-balance occurs. - // for taosx - return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &offset); } int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 4f8768653a..bd4a20b07e 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -458,7 +458,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp walApplyVer(pVnode->pWal, version); if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { -// /*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/ vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index 8852265da0..37c93fef5c 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -59,7 +59,7 @@ typedef struct { STqOffsetVal currentOffset; // for tmq SMqMetaRsp metaRsp; // for tmq fetching meta int64_t snapshotVer; - SPackedData submit; // todo remove it +// SPackedData submit; // todo remove it SSchemaWrapper* schema; char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor int8_t recoverStep; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 5fc079b7c1..8bbbd3524d 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1058,6 +1058,14 @@ void qStreamSetOpen(qTaskInfo_t tinfo) { pOperator->status = OP_NOT_OPENED; } +void verifyOffset(void *pWalReader, STqOffsetVal* pOffset){ + // if offset version is small than first version , let's seek to first version + int64_t firstVer = walGetFirstVer(((SWalReader*)pWalReader)->pWal); + if (pOffset->version + 1 < firstVer){ + pOffset->version = firstVer - 1; + } +} + int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; @@ -1080,15 +1088,11 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT STableListInfo* pTableListInfo = pScanBaseInfo->pTableListInfo; if (pOffset->type == TMQ_OFFSET__LOG) { + // todo refactor: move away tsdbReaderClose(pScanBaseInfo->dataReader); pScanBaseInfo->dataReader = NULL; - // let's seek to the next version in wal file - int64_t firstVer = walGetFirstVer(pInfo->tqReader->pWalReader->pWal); - if (pOffset->version + 1 < firstVer){ - pOffset->version = firstVer - 1; - } - + verifyOffset(pInfo->tqReader->pWalReader, pOffset); if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) { qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id); return -1; diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 70200b067e..ce801e3f7f 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -82,7 +82,7 @@ static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SC static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol); static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, - SGroupResInfo* pGroupResInfo); + SGroupResInfo* pGroupResInfo, int32_t threshold); SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) { SFilePage* pData = NULL; @@ -777,7 +777,7 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos } int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, - SGroupResInfo* pGroupResInfo) { + SGroupResInfo* pGroupResInfo, int32_t threshold) { SExprInfo* pExprInfo = pSup->pExprInfo; int32_t numOfExprs = pSup->numOfExprs; int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; @@ -826,6 +826,9 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS releaseBufPage(pBuf, page); pBlock->info.rows += pRow->numOfRows; + if (pBlock->info.rows >= threshold) { + break; + } } qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, @@ -835,6 +838,34 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS return 0; } + +void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, + SDiskbasedBuf* pBuf) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SSDataBlock* pBlock = pbInfo->pRes; + + // set output datablock version + pBlock->info.version = pTaskInfo->version; + + blockDataCleanup(pBlock); + if (!hasRemainResults(pGroupResInfo)) { + return; + } + + // clear the existed group id + pBlock->info.id.groupId = 0; + ASSERT(!pbInfo->mergeResultBlock); + doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold); + + void* tbname = NULL; + if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { + pBlock->info.parTbName[0] = 0; + } else { + memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); + } + tdbFree(tbname); +} + void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -851,10 +882,10 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG // clear the existed group id pBlock->info.id.groupId = 0; if (!pbInfo->mergeResultBlock) { - doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); + doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold); } else { while (hasRemainResults(pGroupResInfo)) { - doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); + doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold); if (pBlock->info.rows >= pOperator->resultInfo.threshold) { break; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 76a820ee83..e1fa7a282b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1613,6 +1613,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { qDebug("start to exec queue scan, %s", id); +#if 0 if (pTaskInfo->streamInfo.submit.msgStr != NULL) { if (pInfo->tqReader->msg.msgStr == NULL) { SPackedData submit = pTaskInfo->streamInfo.submit; @@ -1626,7 +1627,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; while (tqNextBlockImpl(pInfo->tqReader)) { - int32_t code = tqRetrieveDataBlock(pInfo->tqReader->pResBlock, pInfo->tqReader, NULL); + int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL); if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) { continue; } @@ -1642,6 +1643,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { pTaskInfo->streamInfo.submit = (SPackedData){0}; return NULL; } +#endif if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); @@ -1659,10 +1661,12 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) { return NULL; } + tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pTaskInfo->streamInfo.snapshotVer); } if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) { + while (1) { int32_t type = tqNextBlockInWal(pInfo->tqReader); SSDataBlock* pRes = pInfo->tqReader->pResBlock; @@ -2074,7 +2078,7 @@ FETCH_NEXT_BLOCK: blockDataCleanup(pInfo->pRes); while (tqNextBlockImpl(pInfo->tqReader)) { - int32_t code = tqRetrieveDataBlock(pInfo->tqReader->pResBlock, pInfo->tqReader, NULL); + int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL); if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) { continue; } @@ -2105,7 +2109,6 @@ FETCH_NEXT_BLOCK: // record the scan action. pInfo->numOfExec++; pOperator->resultInfo.totalRows += pBlockInfo->rows; - // printDataBlock(pInfo->pRes, "stream scan"); qDebug("scan rows: %" PRId64, pBlockInfo->rows); if (pBlockInfo->rows > 0) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 93ee3916ba..78a1f3e967 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -263,6 +263,8 @@ int32_t streamExecForAll(SStreamTask* pTask) { int16_t times = 0; // merge multiple input data if possible in the input queue. + qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr); + while (1) { SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); if (qItem == NULL) { @@ -272,6 +274,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { qDebug("===stream===try agian batchSize:%d", batchSize); continue; } + break; } @@ -316,7 +319,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { } SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); - qDebug("s-task:%s exec begin, numOfBlocks:%d", pTask->id.idStr, batchSize); + qDebug("s-task:%s start to execute, numOfBlocks:%d", pTask->id.idStr, batchSize); streamTaskExecImpl(pTask, pInput, pRes); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 66c32437a3..f7ada8be84 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -239,6 +239,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { } seeked = true; } + while (1) { contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead)); if (contLen == sizeof(SWalCkHead)) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index f92e115cab..d9247a4e83 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -122,7 +122,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, "No write permission") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, "Connection killed") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, "Syntax error in SQL") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DB_NOT_SELECTED, "Database not specified or available") -TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TABLE_NAME, "Table does not exist") +//TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TABLE_NAME, "Table does not exist") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_EXCEED_SQL_LIMIT, "SQL statement too long") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_FILE_EMPTY, "File is empty") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_LINE_SYNTAX_ERROR, "Syntax error in Line") @@ -203,6 +203,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_ALREADY_EXIST, "Column already exists TAOS_DEFINE_ERROR(TSDB_CODE_MND_COLUMN_NOT_EXIST, "Column does not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STB_OPTION, "Invalid stable options") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_ROW_BYTES, "Invalid row bytes") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_FIELD_VALUE_OVERFLOW, "out of range and overflow") // mnode-func TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_NAME, "Invalid func name") diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 4c5ce354ab..775b55d45d 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -553,6 +553,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/sysinfo.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_control.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_manage.py +,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_privilege.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/multilevel.py #,,n,system-test,python3 ./test.py -f 0-others/compatibility.py diff --git a/tests/script/tsim/db/error1.sim b/tests/script/tsim/db/error1.sim index 32dbe826cc..64b17125aa 100644 --- a/tests/script/tsim/db/error1.sim +++ b/tests/script/tsim/db/error1.sim @@ -58,16 +58,16 @@ if $data23 != 0 then return -1 endi -print ========== stop dnode2 -system sh/exec.sh -n dnode2 -s stop -x SIGKILL +#print ========== stop dnode2 +#system sh/exec.sh -n dnode2 -s stop -x SIGKILL -sleep 1000 -print =============== drop database -sql_error drop database d1 +#sleep 1000 +#print =============== drop database +sql drop database d1 -print ========== start dnode2 -system sh/exec.sh -n dnode2 -s start -sleep 1000 +#print ========== start dnode2 +#system sh/exec.sh -n dnode2 -s start +#sleep 1000 print =============== re-create database $x = 0 diff --git a/tests/system-test/0-others/user_privilege.py b/tests/system-test/0-others/user_privilege.py new file mode 100644 index 0000000000..6d49ebfbfe --- /dev/null +++ b/tests/system-test/0-others/user_privilege.py @@ -0,0 +1,120 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import taos +from taos.tmq import * +from util.cases import * +from util.common import * +from util.log import * +from util.sql import * +from util.sqlset import * + + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + self.setsql = TDSetSql() + self.stbname = 'stb' + self.binary_length = 20 # the length of binary for column_dict + self.nchar_length = 20 # the length of nchar for column_dict + self.column_dict = { + 'ts': 'timestamp', + 'col1': 'float', + 'col2': 'int', + 'col3': 'float', + } + + self.tag_dict = { + 't1': 'int', + 't2': f'binary({self.binary_length})' + } + + self.tag_list = [ + f'1, "Beijing"', + f'2, "Shanghai"', + f'3, "Guangzhou"', + f'4, "Shenzhen"' + ] + + self.values_list = [ + f'now, 9.1, 200, 0.3' + ] + + self.tbnum = 4 + + def create_user(self): + user_name = 'test' + tdSql.execute(f'create user {user_name} pass "test"') + tdSql.execute(f'grant read on db.stb with t2 = "Beijing" to {user_name}') + + def prepare_data(self): + tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict)) + for i in range(self.tbnum): + tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})') + for j in self.values_list: + tdSql.execute(f'insert into {self.stbname}_{i} values({j})') + + def user_privilege_check(self): + testconn = taos.connect(user='test', password='test') + expectErrNotOccured = False + + try: + sql = "select count(*) from db.stb where t2 = 'Beijing'" + res = testconn.query(sql) + data = res.fetch_all() + count = data[0][0] + except BaseException: + expectErrNotOccured = True + + if expectErrNotOccured: + caller = inspect.getframeinfo(inspect.stack()[1][0]) + tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, expect error not occured") + elif count != 1: + tdLog.exit(f"{sql}, expect result doesn't match") + pass + + def user_privilege_error_check(self): + testconn = taos.connect(user='test', password='test') + expectErrNotOccured = False + + sql_list = ["alter talbe db.stb_1 set t2 = 'Wuhan'", "drop table db.stb_1"] + + for sql in sql_list: + try: + res = testconn.execute(sql) + except BaseException: + expectErrNotOccured = True + + if expectErrNotOccured: + pass + else: + caller = inspect.getframeinfo(inspect.stack()[1][0]) + tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, expect error not occured") + pass + + def run(self): + tdSql.prepare() + self.prepare_data() + self.create_user() + self.user_privilege_check() + self.user_privilege_error_check() + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file diff --git a/tests/system-test/2-query/sml.py b/tests/system-test/2-query/sml.py index f96ed8a3ff..2f97118fbf 100644 --- a/tests/system-test/2-query/sml.py +++ b/tests/system-test/2-query/sml.py @@ -34,6 +34,9 @@ class TDTestCase: if ret != 0: tdLog.info("sml_test ret != 0") + tdSql.query(f"select * from ts3303.stb2") + tdSql.query(f"select * from ts3303.meters") + # tdSql.execute('use sml_db') tdSql.query(f"select * from {dbname}.t_b7d815c9222ca64cdf2614c61de8f211") tdSql.checkRows(1) diff --git a/tests/system-test/win-test-file b/tests/system-test/win-test-file index 85829ca7a7..c3fb57c49b 100644 --- a/tests/system-test/win-test-file +++ b/tests/system-test/win-test-file @@ -279,7 +279,7 @@ python3 ./test.py -f 7-tmq/subscribeDb1.py python3 ./test.py -f 7-tmq/subscribeDb2.py python3 ./test.py -f 7-tmq/subscribeDb3.py python3 ./test.py -f 7-tmq/subscribeDb4.py -python3 ./test.py -f 7-tmq/subscribeStb.py +#python3 ./test.py -f 7-tmq/subscribeStb.py python3 ./test.py -f 7-tmq/subscribeStb0.py python3 ./test.py -f 7-tmq/subscribeStb1.py python3 ./test.py -f 7-tmq/subscribeStb2.py diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 5ac32eaad9..0f91bdeeda 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -554,7 +554,12 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t if (tsEnableScience) { printf("%*e", width, GET_FLOAT_VAL(val)); } else { - printf("%*.5f", width, GET_FLOAT_VAL(val)); + n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.5f", width, GET_FLOAT_VAL(val)); + if (n > TMAX(20, width)) { + printf("%*e", width, GET_FLOAT_VAL(val)); + } else { + printf("%s", buf); + } } break; case TSDB_DATA_TYPE_DOUBLE: diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index f1f4bbc1fd..f1dc8ebe79 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -1159,6 +1159,57 @@ int sml_td23881_Test() { return code; } +int sml_ts3303_Test() { + TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); + + TAOS_RES *pRes = taos_query(taos, "drop database if exists ts3303"); + taos_free_result(pRes); + + pRes = taos_query(taos, "create database if not exists ts3303"); + taos_free_result(pRes); + + const char *sql[] = { + "stb2,t1=1,dataModelName=t0 f1=283i32 1632299372000", + "stb2,t1=1,dataModelName=t0 f1=106i32 1632299378000", + "stb2,t1=4,dataModelName=t0 f1=144i32 1629716944000", + "stb2,t1=4,dataModelName=t0 f1=125i32 1629717012000", + "stb2,t1=4,dataModelName=t0 f1=144i32 1629717012000", + "stb2,t1=4,dataModelName=t0 f1=107i32 1629717013000", + "stb2,t1=6,dataModelName=t0 f1=154i32 1629717140000", + "stb2,t1=6,dataModelName=t0 f1=93i32 1629717140000", + "stb2,t1=6,dataModelName=t0 f1=134i32 1629717140000", + "stb2,t1=4,dataModelName=t0 f1=73i32 1629717140000", + "stb2,t1=4,dataModelName=t0 f1=83i32 1629717140000", + "stb2,t1=4,dataModelName=t0 f1=72i32 1629717140000", + }; + + const char *sql1[] = { + "meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=\"2022-02-0210:22:22\" 1626006833339000000", + "meters,groupid=2,location=California.LosAngeles current=11.8,voltage=221,phase=\"2022-02-0210:22:22\" 1626006833339000000", + }; + + pRes = taos_query(taos, "use ts3303"); + taos_free_result(pRes); + + pRes = taos_schemaless_insert_ttl(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, + TSDB_SML_TIMESTAMP_MILLI_SECONDS, 20); + + int code = taos_errno(pRes); + printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT(code == 0); + + pRes = taos_schemaless_insert_ttl(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_LINE_PROTOCOL, + TSDB_SML_TIMESTAMP_NANO_SECONDS, 20); + + printf("%s result1:%s\n", __FUNCTION__, taos_errstr(pRes)); + taos_free_result(pRes); + + taos_close(taos); + + return code; +} + int sml_ttl_Test() { TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -1336,6 +1387,9 @@ int main(int argc, char *argv[]) { ASSERT(!ret); ret = sml_ts2385_Test(); // this test case need config sml table name using ./sml_test config_file ASSERT(!ret); + ret = sml_ts3303_Test(); // this test case need config sml table name using ./sml_test config_file + ASSERT(!ret); + // for(int i = 0; i < sizeof(str)/sizeof(str[0]); i++){ // printf("str:%s \t %d\n", str[i], smlCalTypeSum(str[i], strlen(str[i]))); // }