diff --git a/include/os/osFile.h b/include/os/osFile.h index 143b4bf2f8..89b58cdd65 100644 --- a/include/os/osFile.h +++ b/include/os/osFile.h @@ -44,7 +44,7 @@ extern "C" { typedef struct TdFile *TdFilePtr; -#define TD_FILE_CTEATE 0x0001 +#define TD_FILE_CREATE 0x0001 #define TD_FILE_WRITE 0x0002 #define TD_FILE_READ 0x0004 #define TD_FILE_TRUNC 0x0008 diff --git a/source/common/src/ttszip.c b/source/common/src/ttszip.c index 15e741d307..3160d64c12 100644 --- a/source/common/src/ttszip.c +++ b/source/common/src/ttszip.c @@ -39,7 +39,7 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order) { taosGetTmpfilePath(tsTempDir, "join", pTSBuf->path); // pTSBuf->pFile = fopen(pTSBuf->path, "wb+"); - pTSBuf->pFile = taosOpenFile(pTSBuf->path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); + pTSBuf->pFile = taosOpenFile(pTSBuf->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC); if (pTSBuf->pFile == NULL) { taosMemoryFree(pTSBuf); return NULL; diff --git a/source/dnode/mgmt/dm/dmFile.c b/source/dnode/mgmt/dm/dmFile.c index 1ac86870e3..c1964ac8c4 100644 --- a/source/dnode/mgmt/dm/dmFile.c +++ b/source/dnode/mgmt/dm/dmFile.c @@ -162,7 +162,7 @@ int32_t dmWriteFile(SDnodeMgmt *pMgmt) { char file[PATH_MAX]; snprintf(file, sizeof(file), "%s%sdnode.json.bak", pMgmt->path, TD_DIRSEP); - TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { dError("failed to write %s since %s", file, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); diff --git a/source/dnode/mgmt/main/dndFile.c b/source/dnode/mgmt/main/dndFile.c index d4d2a3e6ea..905624f072 100644 --- a/source/dnode/mgmt/main/dndFile.c +++ b/source/dnode/mgmt/main/dndFile.c @@ -75,7 +75,7 @@ int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed) { snprintf(file, sizeof(file), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name); snprintf(realfile, sizeof(realfile), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name); - pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); dError("failed to write %s since %s", file, terrstr()); @@ -121,7 +121,7 @@ TdFilePtr dndCheckRunning(const char *dataDir) { char filepath[PATH_MAX] = {0}; snprintf(filepath, sizeof(filepath), "%s%s.running", dataDir, TD_DIRSEP); - TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); dError("failed to lock file:%s since %s", filepath, terrstr()); @@ -218,7 +218,7 @@ int32_t dndWriteShmFile(SDnode *pDnode) { snprintf(file, sizeof(file), "%s%s.shmfile.bak", pDnode->dataDir, TD_DIRSEP); snprintf(realfile, sizeof(realfile), "%s%s.shmfile", pDnode->dataDir, TD_DIRSEP); - pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); dError("failed to open file:%s since %s", file, terrstr()); diff --git a/source/dnode/mgmt/mm/mmFile.c b/source/dnode/mgmt/mm/mmFile.c index 57e1c0cb92..44027780de 100644 --- a/source/dnode/mgmt/mm/mmFile.c +++ b/source/dnode/mgmt/mm/mmFile.c @@ -109,7 +109,7 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, bool deployed) { char file[PATH_MAX]; snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP); - TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); dError("failed to write %s since %s", file, terrstr()); diff --git a/source/dnode/mgmt/vm/vmFile.c b/source/dnode/mgmt/vm/vmFile.c index ba59482c1a..7e00a022b2 100644 --- a/source/dnode/mgmt/vm/vmFile.c +++ b/source/dnode/mgmt/vm/vmFile.c @@ -154,7 +154,7 @@ int32_t vmWriteVnodesToFile(SVnodesMgmt *pMgmt) { snprintf(file, sizeof(file), "%s%svnodes.json.bak", pMgmt->path, TD_DIRSEP); snprintf(realfile, sizeof(file), "%s%svnodes.json", pMgmt->path, TD_DIRSEP); - TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); dError("failed to write %s since %s", file, terrstr()); diff --git a/source/dnode/mnode/impl/test/trans/trans.cpp b/source/dnode/mnode/impl/test/trans/trans.cpp index 40b052400b..560b30d13c 100644 --- a/source/dnode/mnode/impl/test/trans/trans.cpp +++ b/source/dnode/mnode/impl/test/trans/trans.cpp @@ -38,7 +38,7 @@ class MndTestTrans : public ::testing::Test { test.ServerStop(); - pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); int32_t writeLen = taosWriteFile(pFile, buffer, readLen); if (writeLen < 0 || writeLen == readLen) { ASSERT(1); diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index c11025beed..f61899766e 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -232,7 +232,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { mDebug("start to write file:%s, current ver:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer, pSdb->lastCommitVer); - TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); mError("failed to open file:%s for write since %s", tmpfile, terrstr()); diff --git a/source/dnode/vnode/src/tq/tqMetaStore.c b/source/dnode/vnode/src/tq/tqMetaStore.c index beb19f48f1..84f12f93c6 100644 --- a/source/dnode/vnode/src/tq/tqMetaStore.c +++ b/source/dnode/vnode/src/tq/tqMetaStore.c @@ -95,7 +95,7 @@ STqMetaStore* tqStoreOpen(STQ* pTq, const char* path, FTqSerialize serializer, F tqError("failed to create dir:%s since %s ", name, terrstr()); } strcat(name, "/" TQ_IDX_NAME); - TdFilePtr pIdxFile = taosOpenFile(name, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ); + TdFilePtr pIdxFile = taosOpenFile(name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ); if (pIdxFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); tqError("failed to open file:%s since %s ", name, terrstr()); @@ -113,7 +113,7 @@ STqMetaStore* tqStoreOpen(STQ* pTq, const char* path, FTqSerialize serializer, F strcpy(name, path); strcat(name, "/" TQ_META_NAME); - TdFilePtr pFile = taosOpenFile(name, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ); + TdFilePtr pFile = taosOpenFile(name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ); if (pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); tqError("failed to open file:%s since %s", name, terrstr()); diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 1f42e90616..eff350ddda 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -416,7 +416,7 @@ static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus) { tsdbGetTxnFname(pRepo, TSDB_TXN_TEMP_FILE, tfname); tsdbGetTxnFname(pRepo, TSDB_TXN_CURR_FILE, cfname); - TdFilePtr pFile = taosOpenFile(tfname, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + TdFilePtr pFile = taosOpenFile(tfname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; diff --git a/source/dnode/vnode/src/tsdb/tsdbFile.c b/source/dnode/vnode/src/tsdb/tsdbFile.c index 6f96aff848..8e5a6afc4d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile.c @@ -360,7 +360,7 @@ static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) { int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType) { ASSERT(pDFile->info.size == 0 && pDFile->info.magic == TSDB_FILE_INIT_MAGIC); - pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pDFile->pFile == NULL) { if (errno == ENOENT) { // Try to create directory recursively @@ -371,7 +371,7 @@ int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T } taosMemoryFreeClear(s); - pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pDFile->pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 53b15a82af..133e8f9c93 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -212,6 +212,9 @@ bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { case TSDB_DATA_TYPE_UTINYINT: *((uint8_t *)buf) = 0; break; + case TSDB_DATA_TYPE_BOOL: + *((int8_t*)buf) = 0; + break; default: assert(0); } @@ -255,6 +258,9 @@ bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { case TSDB_DATA_TYPE_DOUBLE: SET_DOUBLE_VAL(((double *)buf), DBL_MAX); break; + case TSDB_DATA_TYPE_BOOL: + *((int8_t*)buf) = 1; + break; default: assert(0); } @@ -385,8 +391,8 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) { int32_t start = pInput->startRowIndex; int32_t numOfRows = pInput->numOfRows; - if (IS_SIGNED_NUMERIC_TYPE(type)) { - if (type == TSDB_DATA_TYPE_TINYINT) { + if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) { + if (type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_BOOL) { LOOPCHECK_N(*(int8_t*)buf, pCol, pCtx, int8_t, numOfRows, start, isMinFunc, numOfElems); } else if (type == TSDB_DATA_TYPE_SMALLINT) { LOOPCHECK_N(*(int16_t*) buf, pCol, pCtx, int16_t, numOfRows, start, isMinFunc, numOfElems); diff --git a/source/libs/index/src/indexFstCountingWriter.c b/source/libs/index/src/indexFstCountingWriter.c index 76ff4309b5..1d4395aff6 100644 --- a/source/libs/index/src/indexFstCountingWriter.c +++ b/source/libs/index/src/indexFstCountingWriter.c @@ -92,7 +92,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int ctx->file.readOnly = readOnly; if (readOnly == false) { // ctx->file.pFile = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); - ctx->file.pFile = taosOpenFile(path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_APPEND); + ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); taosFtruncateFile(ctx->file.pFile, 0); int64_t file_size; taosStatFile(path, &file_size, NULL); diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 9d945039b8..435ab317e6 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -605,6 +605,12 @@ typedef struct SMemParam { static FORCE_INLINE int32_t MemRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) { SMemParam* pa = (SMemParam*)param; SRowBuilder* rb = pa->rb; + + if (value == NULL) { // it is a null data + tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, false, pa->toffset, pa->colIdx); + return TSDB_CODE_SUCCESS; + } + if (TSDB_DATA_TYPE_BINARY == pa->schema->type) { const char* rowEnd = tdRowEnd(rb->pBuf); STR_WITH_SIZE_TO_VARSTR(rowEnd, value, len); @@ -621,14 +627,9 @@ static FORCE_INLINE int32_t MemRowAppend(SMsgBuf* pMsgBuf, const void* value, in varDataSetLen(rowEnd, output); tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx); } else { - if (value == NULL) { // it is a null data - tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, false, pa->toffset, - pa->colIdx); - } else { - tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, false, pa->toffset, - pa->colIdx); - } + tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, false, pa->toffset, pa->colIdx); } + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index c9054d088e..d6f2e91de7 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -57,7 +57,7 @@ SRaftStore *raftStoreOpen(const char *path) { static int32_t raftStoreInit(SRaftStore *pRaftStore) { assert(pRaftStore != NULL); - pRaftStore->pFile = taosOpenFile(pRaftStore->path, TD_FILE_CTEATE | TD_FILE_WRITE); + pRaftStore->pFile = taosOpenFile(pRaftStore->path, TD_FILE_CREATE | TD_FILE_WRITE); assert(pRaftStore->pFile != NULL); pRaftStore->currentTerm = 0; diff --git a/source/libs/tdb/src/inc/tdbOs.h b/source/libs/tdb/src/inc/tdbOs.h index 1d87285091..503e109adb 100644 --- a/source/libs/tdb/src/inc/tdbOs.h +++ b/source/libs/tdb/src/inc/tdbOs.h @@ -37,7 +37,7 @@ extern "C" { /* file */ typedef TdFilePtr tdb_fd_t; -#define TDB_O_CREAT TD_FILE_CTEATE +#define TDB_O_CREAT TD_FILE_CREATE #define TDB_O_WRITE TD_FILE_WRITE #define TDB_O_READ TD_FILE_READ #define TDB_O_TRUNC TD_FILE_TRUNC diff --git a/source/libs/tfs/test/tfsTest.cpp b/source/libs/tfs/test/tfsTest.cpp index 1a093c3877..58c3a83aff 100644 --- a/source/libs/tfs/test/tfsTest.cpp +++ b/source/libs/tfs/test/tfsTest.cpp @@ -231,7 +231,7 @@ TEST_F(TfsTest, 04_File) { EXPECT_EQ(tfsMkdir(pTfs, "t3"), 0); // FILE *fp = fopen(f1.aname, "w"); - TdFilePtr pFile = taosOpenFile(f1.aname, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + TdFilePtr pFile = taosOpenFile(f1.aname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); ASSERT_NE(pFile, nullptr); taosWriteFile(pFile, "12345678", 5); taosCloseFile(&pFile); @@ -640,7 +640,7 @@ TEST_F(TfsTest, 05_MultiDisk) { EXPECT_EQ(tfsMkdir(pTfs, "t3"), 0); // FILE *fp = fopen(f1.aname, "w"); - TdFilePtr pFile = taosOpenFile(f1.aname, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + TdFilePtr pFile = taosOpenFile(f1.aname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); ASSERT_NE(pFile, nullptr); taosWriteFile(pFile, "12345678", 5); taosCloseFile(&pFile); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index d456cb23a4..c655c0bfc9 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -129,6 +129,12 @@ static void transDestroyConnCtx(STransConnCtx* ctx); static SCliThrdObj* createThrdObj(); static void destroyThrdObj(SCliThrdObj* pThrd); +// snprintf may cause performance problem +#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \ + do { \ + snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \ + } while (0) + #define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1) #define CONN_PERSIST_TIME(para) (para * 1000 * 10) #define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL) @@ -206,8 +212,10 @@ static void destroyThrdObj(SCliThrdObj* pThrd); } \ } while (0) -#define CONN_NO_PERSIST_BY_APP(conn) (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) -#define CONN_RELEASE_BY_SERVER(conn) (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) +#define CONN_NO_PERSIST_BY_APP(conn) \ + (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) +#define CONN_RELEASE_BY_SERVER(conn) \ + (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) #define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) #define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) @@ -282,8 +290,9 @@ void cliHandleResp(SCliConn* conn) { tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); } - tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, TMSG_INFO(pHead->msgType), - taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen); + tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, + TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), + taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen); conn->secured = pHead->secured; @@ -349,10 +358,12 @@ void cliHandleExcept(SCliConn* pConn) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); - tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle, TMSG_INFO(transMsg.msgType)); + tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle, + TMSG_INFO(transMsg.msgType)); if (transMsg.ahandle == NULL) { transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType)); - tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle); + tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, + transMsg.ahandle); } } else { transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; @@ -423,8 +434,7 @@ void* destroyConnPool(void* pool) { static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) { char key[128] = {0}; - tstrncpy(key, ip, strlen(ip)); - tstrncpy(key + strlen(key), (char*)(&port), sizeof(port)); + CONN_CONSTRUCT_HASH_KEY(key, ip, port); SHashObj* pPool = pool; SConnList* plist = taosHashGet(pPool, key, strlen(key)); @@ -456,8 +466,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { conn->status = ConnInPool; char key[128] = {0}; - tstrncpy(key, conn->ip, strlen(conn->ip)); - tstrncpy(key + strlen(key), (char*)(&conn->port), sizeof(conn->port)); + CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port); tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap); SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); @@ -626,8 +635,9 @@ void cliSend(SCliConn* pConn) { pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType), - taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); + tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, + TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), + taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); if (pHead->persist == 1) { CONN_SET_PERSIST_BY_APP(pConn); diff --git a/source/libs/transport/test/pushServer.c b/source/libs/transport/test/pushServer.c index f4ad73f743..3099998f57 100644 --- a/source/libs/transport/test/pushServer.c +++ b/source/libs/transport/test/pushServer.c @@ -181,7 +181,7 @@ int main(int argc, char *argv[]) { tInfo("RPC server is running, ctrl-c to exit"); if (commit) { - pDataFile = taosOpenFile(dataName, TD_FILE_APPEND | TD_FILE_CTEATE | TD_FILE_WRITE); + pDataFile = taosOpenFile(dataName, TD_FILE_APPEND | TD_FILE_CREATE | TD_FILE_WRITE); if (pDataFile == NULL) tInfo("failed to open data file, reason:%s", strerror(errno)); } qhandle = taosOpenQueue(); diff --git a/source/libs/transport/test/rserver.c b/source/libs/transport/test/rserver.c index 8ed3bbc960..14d109dc5a 100644 --- a/source/libs/transport/test/rserver.c +++ b/source/libs/transport/test/rserver.c @@ -177,7 +177,7 @@ int main(int argc, char *argv[]) { tInfo("RPC server is running, ctrl-c to exit"); if (commit) { - pDataFile = taosOpenFile(dataName, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_APPEND); + pDataFile = taosOpenFile(dataName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pDataFile == NULL) tInfo("failed to open data file, reason:%s", strerror(errno)); } qhandle = taosOpenQueue(); diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 36323cdffa..9f8fe8d84d 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -360,7 +360,7 @@ int walSaveMeta(SWal* pWal) { int metaVer = walFindCurMetaVer(pWal); char fnameStr[WAL_FILE_LEN]; walBuildMetaName(pWal, metaVer + 1, fnameStr); - TdFilePtr pMataFile = taosOpenFile(fnameStr, TD_FILE_CTEATE | TD_FILE_WRITE); + TdFilePtr pMataFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE); if (pMataFile == NULL) { return -1; } diff --git a/source/libs/wal/src/walSeek.c b/source/libs/wal/src/walSeek.c index 413dcb47f0..a6f238af6b 100644 --- a/source/libs/wal/src/walSeek.c +++ b/source/libs/wal/src/walSeek.c @@ -56,13 +56,13 @@ int walSetWrite(SWal* pWal) { char fnameStr[WAL_FILE_LEN]; walBuildIdxName(pWal, fileFirstVer, fnameStr); - pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_APPEND); + pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pIdxTFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } walBuildLogName(pWal, fileFirstVer, fnameStr); - pLogTFile = taosOpenFile(fnameStr, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_APPEND); + pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pLogTFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -102,14 +102,14 @@ int walChangeWrite(SWal* pWal, int64_t ver) { int64_t fileFirstVer = pFileInfo->firstVer; walBuildIdxName(pWal, fileFirstVer, fnameStr); - pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_APPEND); + pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pIdxTFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); pWal->pWriteIdxTFile = NULL; return -1; } walBuildLogName(pWal, fileFirstVer, fnameStr); - pLogTFile = taosOpenFile(fnameStr, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_APPEND); + pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pLogTFile == NULL) { taosCloseFile(&pIdxTFile); terrno = TAOS_SYSTEM_ERROR(errno); diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index a578c6f368..81f2d82ea5 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -216,13 +216,13 @@ int walRoll(SWal *pWal) { int64_t newFileFirstVersion = pWal->vers.lastVer + 1; char fnameStr[WAL_FILE_LEN]; walBuildIdxName(pWal, newFileFirstVersion, fnameStr); - pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_APPEND); + pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pIdxTFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } walBuildLogName(pWal, newFileFirstVersion, fnameStr); - pLogTFile = taosOpenFile(fnameStr, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_APPEND); + pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pLogTFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index a7855539a4..4bd6b9e5cd 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -129,7 +129,7 @@ int64_t taosCopyFile(const char *from, const char *to) { if (pFileFrom == NULL) goto _err; // fidto = open(to, O_WRONLY | O_CREAT | O_EXCL, 0755); - TdFilePtr pFileTo = taosOpenFile(to, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_EXCL); + TdFilePtr pFileTo = taosOpenFile(to, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_EXCL); if (pFileTo == NULL) goto _err; while (true) { @@ -246,7 +246,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { } } else { int access = O_BINARY; - access |= (tdFileOptions & TD_FILE_CTEATE) ? O_CREAT : 0; + access |= (tdFileOptions & TD_FILE_CREATE) ? O_CREAT : 0; if ((tdFileOptions & TD_FILE_WRITE) && (tdFileOptions & TD_FILE_READ)) { access |= O_RDWR; } else if (tdFileOptions & TD_FILE_WRITE) { diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 46e2d567ce..b37b1b2f69 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -199,7 +199,7 @@ static void *taosThreadToOpenNewFile(void *param) { taosUmaskFile(0); - TdFilePtr pFile = taosOpenFile(name, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + TdFilePtr pFile = taosOpenFile(name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { tsLogObj.openInProgress = 0; tsLogObj.lines = tsLogObj.maxLines - 1000; @@ -348,7 +348,7 @@ static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) { taosThreadMutexInit(&tsLogObj.logMutex, NULL); taosUmaskFile(0); - tsLogObj.logHandle->pFile = taosOpenFile(fileName, TD_FILE_CTEATE | TD_FILE_WRITE); + tsLogObj.logHandle->pFile = taosOpenFile(fileName, TD_FILE_CREATE | TD_FILE_WRITE); if (tsLogObj.logHandle->pFile == NULL) { printf("\nfailed to open log file:%s, reason:%s\n", fileName, strerror(errno)); @@ -699,7 +699,7 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) { goto cmp_end; } - TdFilePtr pFile = taosOpenFile(destFileName, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + TdFilePtr pFile = taosOpenFile(destFileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { ret = -2; goto cmp_end; diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 48cf97a11f..1ad12ef43a 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -48,7 +48,7 @@ struct SDiskbasedBuf { }; static int32_t createDiskFile(SDiskbasedBuf* pBuf) { - pBuf->pFile = taosOpenFile(pBuf->path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); + pBuf->pFile = taosOpenFile(pBuf->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL); if (pBuf->pFile == NULL) { return TAOS_SYSTEM_ERROR(errno); } diff --git a/tests/script/tsim/tmq/consumerMain.sim b/tests/script/tsim/tmq/consumerMain.sim new file mode 100644 index 0000000000..51b90971fd --- /dev/null +++ b/tests/script/tsim/tmq/consumerMain.sim @@ -0,0 +1,267 @@ +#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406 +# scene1: vgroups=1, one topic for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# scene2: vgroups=1, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# scene3: vgroups=4, one topic for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# scene4: vgroups=4, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN +# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5; +# +# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). +# +######## ######## ######## ######## ######## ######## ######## ######## ######## ######## +######## This test case include scene1 and scene3 +######## ######## ######## ######## ######## ######## ######## ######## ######## ######## + +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/cfg.sh -n dnode1 +system sh/exec.sh -n dnode1 -s start + +$loop_cnt = 0 +check_dnode_ready: + $loop_cnt = $loop_cnt + 1 + sleep 200 + if $loop_cnt == 10 then + print ====> dnode not ready! + return -1 + endi +sql show dnodes +print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05 +if $data00 != 1 then + return -1 +endi +if $data04 != ready then + goto check_dnode_ready +endi + +sql connect + +$loop_cnt = 0 +$vgroups = 1 +$dbNamme = d0 +loop_vgroups: +print =============== create database $dbNamme vgroups $vgroups +sql create database $dbNamme vgroups $vgroups +sql show databases +print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09 +print $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data17 $data18 $data19 +print $data20 $data21 $data22 $data23 $data24 $data25 $data26 $data27 $data28 $data29 + +if $loop_cnt == 0 then + if $rows != 2 then + return -1 + endi + if $data02 != 1 then # vgroups + print vgroups: $data02 + return -1 + endi +else + if $rows != 3 then + return -1 + endi + if $data00 == d1 then + if $data02 != 4 then # vgroups + print vgroups: $data02 + return -1 + endi + else + if $data12 != 4 then # vgroups + print vgroups: $data12 + return -1 + endi + endi +endi + +sql use $dbNamme + +print =============== create super table +sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int) + +sql show stables +if $rows != 1 then + return -1 +endi + +print =============== create child table +$tbPrefix = ct +$tbNum = 2 + +$i = 0 +while $i < $tbNum + $tb = $tbPrefix . $i + sql create table $tb using stb tags( $i ) + $i = $i + 1 +endw + +print =============== create normal table +sql create table ntb (ts timestamp, c1 int, c2 float, c3 binary(10)) + +print =============== create multi topics. notes: now only support: +print =============== 1. columns from stb/ctb/ntb; 2. * from ctb/ntb; 3. function from stb/ctb/ntb +print =============== will support: * from stb + +sql create topic topic_stb_column as select ts, c1, c3 from stb +#sql create topic topic_stb_all as select * from stb +sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb + +sql create topic topic_ctb_column as select ts, c1, c3 from ct0 +sql create topic topic_ctb_all as select * from ct0 +sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ct0 + +sql create topic topic_ntb_column as select ts, c1, c3 from ntb +sql create topic topic_ntb_all as select * from ntb +sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb + +sql show tables +if $rows != 3 then + return -1 +endi + +print =============== run_back insert data + +if $loop_cnt == 0 then + run_back tsim/tmq/insertDataV1.sim +else + run_back tsim/tmq/insertDataV4.sim +endi + +sleep 1000 + +#$rowNum = 1000 +#$tstart = 1640966400000 # 2022-01-01 00:00:00.000 +# +#$i = 0 +#while $i < $tbNum +# $tb = $tbPrefix . $i +# +# $x = 0 +# while $x < $rowNum +# $c = $x / 10 +# $c = $c * 10 +# $c = $x - $c +# +# $binary = ' . binary +# $binary = $binary . $c +# $binary = $binary . ' +# +# sql insert into $tb values ($tstart , $c , $x , $binary ) +# sql insert into ntb values ($tstart , $c , $x , $binary ) +# $tstart = $tstart + 1 +# $x = $x + 1 +# endw +# +# $i = $i + 1 +# $tstart = 1640966400000 +#endw + +#root@trd02 /home $ tmq_sim --help +# -c Configuration directory, default is +# -d The name of the database for cosumer, no default +# -t The topic string for cosumer, no default +# -k The key-value string for cosumer, no default +# -g showMsgFlag, default is 0 +# + +$consumeDelay = 5000 +$expectConsumeMsgCnt = 1000 +print expectConsumeMsgCnt: $expectConsumeMsgCnt, consumeDelay: $consumeDelay + +# supported key: +# group.id: +# enable.auto.commit: +# auto.offset.reset: +# td.connect.ip: +# td.connect.user:root +# td.connect.pass:taosdata +# td.connect.port:6030 +# td.connect.db:db + +$expect_result = @{consume success: @ +$expect_result = $expect_result . $expectConsumeMsgCnt +$expect_result = $expect_result . @, @ +$expect_result = $expect_result . 0} +print expect_result----> $expect_result +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +print cmd result----> $system_content +if $system_content >= $expect_result then + return -1 +endi + +#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +#print cmd result----> $system_content +##if $system_content != @{consume success: 10000, 0}@ then +#if $system_content != $expect_result then +# return -1 +#endi + +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +print cmd result----> $system_content +#if $system_content != @{consume success: 10000, 0}@ then +if $system_content >= $expect_result then + return -1 +endi + +$expect_result = @{consume success: @ +$expect_result = $expect_result . $rowNum +$expect_result = $expect_result . @, @ +$expect_result = $expect_result . 0} +print expect_result----> $expect_result +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +print cmd result----> $system_content +if $system_content >= $expect_result then + return -1 +endi + +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +print cmd result----> $system_content +if $system_content >= $expect_result then + return -1 +endi + +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +print cmd result----> $system_content +if $system_content >= $expect_result then + return -1 +endi + +$expect_result = @{consume success: @ +$expect_result = $expect_result . $totalMsgCnt +$expect_result = $expect_result . @, @ +$expect_result = $expect_result . 0} +print expect_result----> $expect_result +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +print cmd result----> $system_content +if $system_content >= $expect_result then + return -1 +endi + +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +print cmd result----> $system_content +if $system_content >= $expect_result then + return -1 +endi + +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt +print cmd result----> $system_content +if $system_content >= $expect_result then + return -1 +endi + +if $loop_cnt == 0 then + $loop_cnt = 1 + $vgroups = 4 + $dbNamme = d1 + goto loop_vgroups +endi + + +#system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/tmq/insertDataV1.sim b/tests/script/tsim/tmq/insertDataV1.sim new file mode 100644 index 0000000000..e8ca028269 --- /dev/null +++ b/tests/script/tsim/tmq/insertDataV1.sim @@ -0,0 +1,41 @@ + +sql connect + +print ================ insert data +$dbNamme = d0 +$tbPrefix = ct +$tbNum = 2 +$rowNum = 100 +$tstart = 1640966400000 # 2022-01-01 00:00:00.000 + +sql use $dbNamme + +loop_insert: +$i = 0 +while $i < $tbNum + $tb = $tbPrefix . $i + + $x = 0 + while $x < $rowNum + $c = $x / 10 + $c = $c * 10 + $c = $x - $c + + $binary = ' . binary + $binary = $binary . $c + $binary = $binary . ' + + #print ====> insert into $tb values ($tstart , $c , $x , $binary ) + #print ====> insert into ntb values ($tstart , $c , $x , $binary ) + sql insert into $tb values ($tstart , $c , $x , $binary ) + sql insert into ntb values ($tstart , $c , $x , $binary ) + $tstart = $tstart + 1 + $x = $x + 1 + endw + + $i = $i + 1 + $tstart = 1640966400000 +endw +goto loop_insert + + diff --git a/tests/script/tsim/tmq/insertDataV4.sim b/tests/script/tsim/tmq/insertDataV4.sim new file mode 100644 index 0000000000..9727d10a69 --- /dev/null +++ b/tests/script/tsim/tmq/insertDataV4.sim @@ -0,0 +1,41 @@ + +sql connect + +print ================ insert data +$dbNamme = d1 +$tbPrefix = ct +$tbNum = 2 +$rowNum = 100 +$tstart = 1640966400000 # 2022-01-01 00:00:00.000 + +sql use $dbNamme + +loop_insert: +$i = 0 +while $i < $tbNum + $tb = $tbPrefix . $i + + $x = 0 + while $x < $rowNum + $c = $x / 10 + $c = $c * 10 + $c = $x - $c + + $binary = ' . binary + $binary = $binary . $c + $binary = $binary . ' + + #print ====> insert into $tb values ($tstart , $c , $x , $binary ) + #print ====> insert into ntb values ($tstart , $c , $x , $binary ) + sql insert into $tb values ($tstart , $c , $x , $binary ) + sql insert into ntb values ($tstart , $c , $x , $binary ) + $tstart = $tstart + 1 + $x = $x + 1 + endw + + $i = $i + 1 + $tstart = 1640966400000 +endw +goto loop_insert + + diff --git a/tests/test/c/tmqDemo.c b/tests/test/c/tmqDemo.c index d339166d74..45a9d31f4c 100644 --- a/tests/test/c/tmqDemo.c +++ b/tests/test/c/tmqDemo.c @@ -588,7 +588,7 @@ int32_t syncWriteDataByRatio() { void printParaIntoFile() { // FILE *fp = fopen(g_stConfInfo.resultFileName, "a"); - TdFilePtr pFile = taosOpenFile(g_stConfInfo.resultFileName, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM); + TdFilePtr pFile = taosOpenFile(g_stConfInfo.resultFileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM); if (NULL == pFile) { fprintf(stderr, "Failed to open %s for save result\n", g_stConfInfo.resultFileName); exit -1; diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 236d1e2eed..dc375dd35a 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -42,6 +42,8 @@ typedef struct { char topicString[256]; char keyString[1024]; int32_t showMsgFlag; + int32_t consumeDelay; // unit s + int32_t consumeMsgCnt; // save result after parse agrvs int32_t numOfTopic; @@ -71,12 +73,19 @@ static void printHelp() { printf("%s%s%s\n", indent, indent, "The key-value string for cosumer, no default "); printf("%s%s\n", indent, "-g"); printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag); + printf("%s%s\n", indent, "-y"); + printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay); + printf("%s%s\n", indent, "-m"); + printf("%s%s%s%d\n", indent, indent, "consume msg count, default is s", g_stConfInfo.consumeMsgCnt); exit(EXIT_SUCCESS); } void parseArgument(int32_t argc, char *argv[]) { memset(&g_stConfInfo, 0, sizeof(SConfInfo)); + g_stConfInfo.showMsgFlag = 0; + g_stConfInfo.consumeDelay = 8000; + g_stConfInfo.consumeMsgCnt = 0; for (int32_t i = 1; i < argc; i++) { if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) { @@ -92,6 +101,10 @@ void parseArgument(int32_t argc, char *argv[]) { strcpy(g_stConfInfo.keyString, argv[++i]); } else if (strcmp(argv[i], "-g") == 0) { g_stConfInfo.showMsgFlag = atol(argv[++i]); + } else if (strcmp(argv[i], "-y") == 0) { + g_stConfInfo.consumeDelay = atol(argv[++i]); + } else if (strcmp(argv[i], "-m") == 0) { + g_stConfInfo.consumeMsgCnt = atol(argv[++i]); } else { printf("%s unknow para: %s %s", GREEN, argv[++i], NC); exit(-1); @@ -256,6 +269,48 @@ void loop_consume(tmq_t* tmq) { printf("{consume success: %d, %d}", totalMsgs, totalRows); } + +void parallel_consume(tmq_t* tmq) { + tmq_resp_err_t err; + + int32_t totalMsgs = 0; + int32_t totalRows = 0; + int32_t skipLogNum = 0; + while (running) { + tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, g_stConfInfo.consumeDelay * 1000); + if (tmqMsg) { + totalMsgs++; + + #if 0 + TAOS_ROW row; + while (NULL != (row = tmq_get_row(tmqMsg))) { + totalRows++; + } + #endif + + skipLogNum += tmqGetSkipLogNum(tmqMsg); + if (0 != g_stConfInfo.showMsgFlag) { + msg_process(tmqMsg); + } + tmq_message_destroy(tmqMsg); + + if (totalMsgs >= g_stConfInfo.consumeMsgCnt) { + break; + } + } else { + break; + } + } + + err = tmq_consumer_close(tmq); + if (err) { + printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); + exit(-1); + } + + printf("%d", totalMsgs); // output to sim for check result +} + int main(int32_t argc, char *argv[]) { parseArgument(argc, argv); parseInputString(); @@ -271,8 +326,12 @@ int main(int32_t argc, char *argv[]) { printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); exit(-1); } - - loop_consume(tmq); + + if (0 == g_stConfInfo.consumeMsgCnt) { + loop_consume(tmq); + } else { + parallel_consume(tmq); + } err = tmq_unsubscribe(tmq); if (err) { diff --git a/tests/tsim/src/simExe.c b/tests/tsim/src/simExe.c index 9a0b48197a..8ef5c816c8 100644 --- a/tests/tsim/src/simExe.c +++ b/tests/tsim/src/simExe.c @@ -22,7 +22,7 @@ void simLogSql(char *sql, bool useSharp) { sprintf(filename, "%s/sim.sql", simScriptDir); if (pFile == NULL) { // fp = fopen(filename, "w"); - pFile = taosOpenFile(filename, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); + pFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); if (pFile == NULL) { fprintf(stderr, "ERROR: failed to open file: %s\n", filename); return; @@ -773,7 +773,7 @@ bool simExecuteRestfulCmd(SScript *script, char *rest) { char filename[256]; sprintf(filename, "%s/tmp.sql", simScriptDir); // fp = fopen(filename, "w"); - pFile = taosOpenFile(filename, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); + pFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); if (pFile == NULL) { fprintf(stderr, "ERROR: failed to open file: %s\n", filename); return false; diff --git a/tools/shell/src/backup/shellCheck.c b/tools/shell/src/backup/shellCheck.c index dc18ecd3f8..d1f0683fea 100644 --- a/tools/shell/src/backup/shellCheck.c +++ b/tools/shell/src/backup/shellCheck.c @@ -116,7 +116,7 @@ static void *shellCheckThreadFp(void *arg) { char file[32] = {0}; snprintf(file, 32, "tb%d.txt", pThread->threadIndex); - TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); + TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (!fp) { fprintf(stdout, "failed to open %s, reason:%s", file, strerror(errno)); return NULL; diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 47d47f023d..0e611f3794 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -518,7 +518,7 @@ static int dumpResultToFile(const char *fname, TAOS_RES *tres) { } // FILE *fp = fopen(full_path.we_wordv[0], "w"); - TdFilePtr pFile = taosOpenFile(full_path.we_wordv[0], TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); + TdFilePtr pFile = taosOpenFile(full_path.we_wordv[0], TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); if (pFile == NULL) { fprintf(stderr, "ERROR: failed to open file: %s\n", full_path.we_wordv[0]); wordfree(&full_path); @@ -935,7 +935,7 @@ void write_history() { get_history_path(f_history); // FILE *f = fopen(f_history, "w"); - TdFilePtr pFile = taosOpenFile(f_history, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); + TdFilePtr pFile = taosOpenFile(f_history, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); if (pFile == NULL) { #ifndef WINDOWS fprintf(stderr, "Failed to open file %s for write, reason:%s\n", f_history, strerror(errno));