Merge remote-tracking branch 'origin/3.0' into feature/node
This commit is contained in:
commit
c55a3c2f9e
|
@ -44,7 +44,7 @@ extern "C" {
|
|||
|
||||
typedef struct TdFile *TdFilePtr;
|
||||
|
||||
#define TD_FILE_CTEATE 0x0001
|
||||
#define TD_FILE_CREATE 0x0001
|
||||
#define TD_FILE_WRITE 0x0002
|
||||
#define TD_FILE_READ 0x0004
|
||||
#define TD_FILE_TRUNC 0x0008
|
||||
|
|
|
@ -39,7 +39,7 @@ STSBuf* tsBufCreate(bool autoDelete, int32_t order) {
|
|||
|
||||
taosGetTmpfilePath(tsTempDir, "join", pTSBuf->path);
|
||||
// pTSBuf->pFile = fopen(pTSBuf->path, "wb+");
|
||||
pTSBuf->pFile = taosOpenFile(pTSBuf->path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
|
||||
pTSBuf->pFile = taosOpenFile(pTSBuf->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC);
|
||||
if (pTSBuf->pFile == NULL) {
|
||||
taosMemoryFree(pTSBuf);
|
||||
return NULL;
|
||||
|
|
|
@ -162,7 +162,7 @@ int32_t dmWriteFile(SDnodeMgmt *pMgmt) {
|
|||
char file[PATH_MAX];
|
||||
snprintf(file, sizeof(file), "%s%sdnode.json.bak", pMgmt->path, TD_DIRSEP);
|
||||
|
||||
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pFile == NULL) {
|
||||
dError("failed to write %s since %s", file, strerror(errno));
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
|
|
|
@ -75,7 +75,7 @@ int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed) {
|
|||
snprintf(file, sizeof(file), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name);
|
||||
snprintf(realfile, sizeof(realfile), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name);
|
||||
|
||||
pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to write %s since %s", file, terrstr());
|
||||
|
@ -121,7 +121,7 @@ TdFilePtr dndCheckRunning(const char *dataDir) {
|
|||
char filepath[PATH_MAX] = {0};
|
||||
snprintf(filepath, sizeof(filepath), "%s%s.running", dataDir, TD_DIRSEP);
|
||||
|
||||
TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
TdFilePtr pFile = taosOpenFile(filepath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to lock file:%s since %s", filepath, terrstr());
|
||||
|
@ -218,7 +218,7 @@ int32_t dndWriteShmFile(SDnode *pDnode) {
|
|||
snprintf(file, sizeof(file), "%s%s.shmfile.bak", pDnode->dataDir, TD_DIRSEP);
|
||||
snprintf(realfile, sizeof(realfile), "%s%s.shmfile", pDnode->dataDir, TD_DIRSEP);
|
||||
|
||||
pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to open file:%s since %s", file, terrstr());
|
||||
|
|
|
@ -109,7 +109,7 @@ int32_t mmWriteFile(SMnodeMgmt *pMgmt, bool deployed) {
|
|||
char file[PATH_MAX];
|
||||
snprintf(file, sizeof(file), "%s%smnode.json.bak", pMgmt->path, TD_DIRSEP);
|
||||
|
||||
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to write %s since %s", file, terrstr());
|
||||
|
|
|
@ -154,7 +154,7 @@ int32_t vmWriteVnodesToFile(SVnodesMgmt *pMgmt) {
|
|||
snprintf(file, sizeof(file), "%s%svnodes.json.bak", pMgmt->path, TD_DIRSEP);
|
||||
snprintf(realfile, sizeof(file), "%s%svnodes.json", pMgmt->path, TD_DIRSEP);
|
||||
|
||||
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
dError("failed to write %s since %s", file, terrstr());
|
||||
|
|
|
@ -38,7 +38,7 @@ class MndTestTrans : public ::testing::Test {
|
|||
|
||||
test.ServerStop();
|
||||
|
||||
pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
int32_t writeLen = taosWriteFile(pFile, buffer, readLen);
|
||||
if (writeLen < 0 || writeLen == readLen) {
|
||||
ASSERT(1);
|
||||
|
|
|
@ -232,7 +232,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
|
|||
mDebug("start to write file:%s, current ver:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer,
|
||||
pSdb->lastCommitVer);
|
||||
|
||||
TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
mError("failed to open file:%s for write since %s", tmpfile, terrstr());
|
||||
|
|
|
@ -95,7 +95,7 @@ STqMetaStore* tqStoreOpen(STQ* pTq, const char* path, FTqSerialize serializer, F
|
|||
tqError("failed to create dir:%s since %s ", name, terrstr());
|
||||
}
|
||||
strcat(name, "/" TQ_IDX_NAME);
|
||||
TdFilePtr pIdxFile = taosOpenFile(name, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ);
|
||||
TdFilePtr pIdxFile = taosOpenFile(name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ);
|
||||
if (pIdxFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
tqError("failed to open file:%s since %s ", name, terrstr());
|
||||
|
@ -113,7 +113,7 @@ STqMetaStore* tqStoreOpen(STQ* pTq, const char* path, FTqSerialize serializer, F
|
|||
|
||||
strcpy(name, path);
|
||||
strcat(name, "/" TQ_META_NAME);
|
||||
TdFilePtr pFile = taosOpenFile(name, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ);
|
||||
TdFilePtr pFile = taosOpenFile(name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ);
|
||||
if (pFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
tqError("failed to open file:%s since %s", name, terrstr());
|
||||
|
|
|
@ -416,7 +416,7 @@ static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus) {
|
|||
tsdbGetTxnFname(pRepo, TSDB_TXN_TEMP_FILE, tfname);
|
||||
tsdbGetTxnFname(pRepo, TSDB_TXN_CURR_FILE, cfname);
|
||||
|
||||
TdFilePtr pFile = taosOpenFile(tfname, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
TdFilePtr pFile = taosOpenFile(tfname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
|
|
|
@ -360,7 +360,7 @@ static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) {
|
|||
int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType) {
|
||||
ASSERT(pDFile->info.size == 0 && pDFile->info.magic == TSDB_FILE_INIT_MAGIC);
|
||||
|
||||
pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pDFile->pFile == NULL) {
|
||||
if (errno == ENOENT) {
|
||||
// Try to create directory recursively
|
||||
|
@ -371,7 +371,7 @@ int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T
|
|||
}
|
||||
taosMemoryFreeClear(s);
|
||||
|
||||
pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pDFile->pFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
|
|
|
@ -212,6 +212,9 @@ bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
|||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
*((uint8_t *)buf) = 0;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
*((int8_t*)buf) = 0;
|
||||
break;
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
|
@ -255,6 +258,9 @@ bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
|
|||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
SET_DOUBLE_VAL(((double *)buf), DBL_MAX);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
*((int8_t*)buf) = 1;
|
||||
break;
|
||||
default:
|
||||
assert(0);
|
||||
}
|
||||
|
@ -385,8 +391,8 @@ int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) {
|
|||
int32_t start = pInput->startRowIndex;
|
||||
int32_t numOfRows = pInput->numOfRows;
|
||||
|
||||
if (IS_SIGNED_NUMERIC_TYPE(type)) {
|
||||
if (type == TSDB_DATA_TYPE_TINYINT) {
|
||||
if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) {
|
||||
if (type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_BOOL) {
|
||||
LOOPCHECK_N(*(int8_t*)buf, pCol, pCtx, int8_t, numOfRows, start, isMinFunc, numOfElems);
|
||||
} else if (type == TSDB_DATA_TYPE_SMALLINT) {
|
||||
LOOPCHECK_N(*(int16_t*) buf, pCol, pCtx, int16_t, numOfRows, start, isMinFunc, numOfElems);
|
||||
|
|
|
@ -92,7 +92,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
|
|||
ctx->file.readOnly = readOnly;
|
||||
if (readOnly == false) {
|
||||
// ctx->file.pFile = open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
|
||||
ctx->file.pFile = taosOpenFile(path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
ctx->file.pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
taosFtruncateFile(ctx->file.pFile, 0);
|
||||
int64_t file_size;
|
||||
taosStatFile(path, &file_size, NULL);
|
||||
|
|
|
@ -605,6 +605,12 @@ typedef struct SMemParam {
|
|||
static FORCE_INLINE int32_t MemRowAppend(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param) {
|
||||
SMemParam* pa = (SMemParam*)param;
|
||||
SRowBuilder* rb = pa->rb;
|
||||
|
||||
if (value == NULL) { // it is a null data
|
||||
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, false, pa->toffset, pa->colIdx);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (TSDB_DATA_TYPE_BINARY == pa->schema->type) {
|
||||
const char* rowEnd = tdRowEnd(rb->pBuf);
|
||||
STR_WITH_SIZE_TO_VARSTR(rowEnd, value, len);
|
||||
|
@ -621,14 +627,9 @@ static FORCE_INLINE int32_t MemRowAppend(SMsgBuf* pMsgBuf, const void* value, in
|
|||
varDataSetLen(rowEnd, output);
|
||||
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, rowEnd, false, pa->toffset, pa->colIdx);
|
||||
} else {
|
||||
if (value == NULL) { // it is a null data
|
||||
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NULL, value, false, pa->toffset,
|
||||
pa->colIdx);
|
||||
} else {
|
||||
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, false, pa->toffset,
|
||||
pa->colIdx);
|
||||
}
|
||||
tdAppendColValToRow(rb, pa->schema->colId, pa->schema->type, TD_VTYPE_NORM, value, false, pa->toffset, pa->colIdx);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -57,7 +57,7 @@ SRaftStore *raftStoreOpen(const char *path) {
|
|||
static int32_t raftStoreInit(SRaftStore *pRaftStore) {
|
||||
assert(pRaftStore != NULL);
|
||||
|
||||
pRaftStore->pFile = taosOpenFile(pRaftStore->path, TD_FILE_CTEATE | TD_FILE_WRITE);
|
||||
pRaftStore->pFile = taosOpenFile(pRaftStore->path, TD_FILE_CREATE | TD_FILE_WRITE);
|
||||
assert(pRaftStore->pFile != NULL);
|
||||
|
||||
pRaftStore->currentTerm = 0;
|
||||
|
|
|
@ -37,7 +37,7 @@ extern "C" {
|
|||
/* file */
|
||||
typedef TdFilePtr tdb_fd_t;
|
||||
|
||||
#define TDB_O_CREAT TD_FILE_CTEATE
|
||||
#define TDB_O_CREAT TD_FILE_CREATE
|
||||
#define TDB_O_WRITE TD_FILE_WRITE
|
||||
#define TDB_O_READ TD_FILE_READ
|
||||
#define TDB_O_TRUNC TD_FILE_TRUNC
|
||||
|
|
|
@ -231,7 +231,7 @@ TEST_F(TfsTest, 04_File) {
|
|||
EXPECT_EQ(tfsMkdir(pTfs, "t3"), 0);
|
||||
|
||||
// FILE *fp = fopen(f1.aname, "w");
|
||||
TdFilePtr pFile = taosOpenFile(f1.aname, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
TdFilePtr pFile = taosOpenFile(f1.aname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
ASSERT_NE(pFile, nullptr);
|
||||
taosWriteFile(pFile, "12345678", 5);
|
||||
taosCloseFile(&pFile);
|
||||
|
@ -640,7 +640,7 @@ TEST_F(TfsTest, 05_MultiDisk) {
|
|||
EXPECT_EQ(tfsMkdir(pTfs, "t3"), 0);
|
||||
|
||||
// FILE *fp = fopen(f1.aname, "w");
|
||||
TdFilePtr pFile = taosOpenFile(f1.aname, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
TdFilePtr pFile = taosOpenFile(f1.aname, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
ASSERT_NE(pFile, nullptr);
|
||||
taosWriteFile(pFile, "12345678", 5);
|
||||
taosCloseFile(&pFile);
|
||||
|
|
|
@ -129,6 +129,12 @@ static void transDestroyConnCtx(STransConnCtx* ctx);
|
|||
static SCliThrdObj* createThrdObj();
|
||||
static void destroyThrdObj(SCliThrdObj* pThrd);
|
||||
|
||||
// snprintf may cause performance problem
|
||||
#define CONN_CONSTRUCT_HASH_KEY(key, ip, port) \
|
||||
do { \
|
||||
snprintf(key, sizeof(key), "%s:%d", ip, (int)port); \
|
||||
} while (0)
|
||||
|
||||
#define CONN_HOST_THREAD_INDEX(conn) (conn ? ((SCliConn*)conn)->hThrdIdx : -1)
|
||||
#define CONN_PERSIST_TIME(para) (para * 1000 * 10)
|
||||
#define CONN_GET_HOST_THREAD(conn) (conn ? ((SCliConn*)conn)->hostThrd : NULL)
|
||||
|
@ -206,8 +212,10 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
#define CONN_NO_PERSIST_BY_APP(conn) (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||
#define CONN_RELEASE_BY_SERVER(conn) (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||
#define CONN_NO_PERSIST_BY_APP(conn) \
|
||||
(((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||
#define CONN_RELEASE_BY_SERVER(conn) \
|
||||
(((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||
|
||||
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
|
||||
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
|
||||
|
@ -282,8 +290,9 @@ void cliHandleResp(SCliConn* conn) {
|
|||
tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
|
||||
}
|
||||
|
||||
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, TMSG_INFO(pHead->msgType),
|
||||
taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen);
|
||||
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
|
||||
TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
|
||||
taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen);
|
||||
|
||||
conn->secured = pHead->secured;
|
||||
|
||||
|
@ -349,10 +358,12 @@ void cliHandleExcept(SCliConn* pConn) {
|
|||
|
||||
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
|
||||
transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
|
||||
tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle, TMSG_INFO(transMsg.msgType));
|
||||
tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle,
|
||||
TMSG_INFO(transMsg.msgType));
|
||||
if (transMsg.ahandle == NULL) {
|
||||
transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType));
|
||||
tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle);
|
||||
tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
|
||||
transMsg.ahandle);
|
||||
}
|
||||
} else {
|
||||
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
|
||||
|
@ -423,8 +434,7 @@ void* destroyConnPool(void* pool) {
|
|||
|
||||
static SCliConn* getConnFromPool(void* pool, char* ip, uint32_t port) {
|
||||
char key[128] = {0};
|
||||
tstrncpy(key, ip, strlen(ip));
|
||||
tstrncpy(key + strlen(key), (char*)(&port), sizeof(port));
|
||||
CONN_CONSTRUCT_HASH_KEY(key, ip, port);
|
||||
|
||||
SHashObj* pPool = pool;
|
||||
SConnList* plist = taosHashGet(pPool, key, strlen(key));
|
||||
|
@ -456,8 +466,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
|
|||
conn->status = ConnInPool;
|
||||
|
||||
char key[128] = {0};
|
||||
tstrncpy(key, conn->ip, strlen(conn->ip));
|
||||
tstrncpy(key + strlen(key), (char*)(&conn->port), sizeof(conn->port));
|
||||
CONN_CONSTRUCT_HASH_KEY(key, conn->ip, conn->port);
|
||||
tTrace("cli conn %p added to conn pool, read buf cap: %d", conn, conn->readBuf.cap);
|
||||
|
||||
SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key));
|
||||
|
@ -626,8 +635,9 @@ void cliSend(SCliConn* pConn) {
|
|||
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
|
||||
|
||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType),
|
||||
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
||||
TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
||||
taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||
|
||||
if (pHead->persist == 1) {
|
||||
CONN_SET_PERSIST_BY_APP(pConn);
|
||||
|
|
|
@ -181,7 +181,7 @@ int main(int argc, char *argv[]) {
|
|||
tInfo("RPC server is running, ctrl-c to exit");
|
||||
|
||||
if (commit) {
|
||||
pDataFile = taosOpenFile(dataName, TD_FILE_APPEND | TD_FILE_CTEATE | TD_FILE_WRITE);
|
||||
pDataFile = taosOpenFile(dataName, TD_FILE_APPEND | TD_FILE_CREATE | TD_FILE_WRITE);
|
||||
if (pDataFile == NULL) tInfo("failed to open data file, reason:%s", strerror(errno));
|
||||
}
|
||||
qhandle = taosOpenQueue();
|
||||
|
|
|
@ -177,7 +177,7 @@ int main(int argc, char *argv[]) {
|
|||
tInfo("RPC server is running, ctrl-c to exit");
|
||||
|
||||
if (commit) {
|
||||
pDataFile = taosOpenFile(dataName, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
pDataFile = taosOpenFile(dataName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
if (pDataFile == NULL) tInfo("failed to open data file, reason:%s", strerror(errno));
|
||||
}
|
||||
qhandle = taosOpenQueue();
|
||||
|
|
|
@ -360,7 +360,7 @@ int walSaveMeta(SWal* pWal) {
|
|||
int metaVer = walFindCurMetaVer(pWal);
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
walBuildMetaName(pWal, metaVer + 1, fnameStr);
|
||||
TdFilePtr pMataFile = taosOpenFile(fnameStr, TD_FILE_CTEATE | TD_FILE_WRITE);
|
||||
TdFilePtr pMataFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE);
|
||||
if (pMataFile == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -56,13 +56,13 @@ int walSetWrite(SWal* pWal) {
|
|||
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
walBuildIdxName(pWal, fileFirstVer, fnameStr);
|
||||
pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
if (pIdxTFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
walBuildLogName(pWal, fileFirstVer, fnameStr);
|
||||
pLogTFile = taosOpenFile(fnameStr, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
if (pLogTFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
|
@ -102,14 +102,14 @@ int walChangeWrite(SWal* pWal, int64_t ver) {
|
|||
|
||||
int64_t fileFirstVer = pFileInfo->firstVer;
|
||||
walBuildIdxName(pWal, fileFirstVer, fnameStr);
|
||||
pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
if (pIdxTFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
pWal->pWriteIdxTFile = NULL;
|
||||
return -1;
|
||||
}
|
||||
walBuildLogName(pWal, fileFirstVer, fnameStr);
|
||||
pLogTFile = taosOpenFile(fnameStr, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
if (pLogTFile == NULL) {
|
||||
taosCloseFile(&pIdxTFile);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
|
|
|
@ -216,13 +216,13 @@ int walRoll(SWal *pWal) {
|
|||
int64_t newFileFirstVersion = pWal->vers.lastVer + 1;
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
walBuildIdxName(pWal, newFileFirstVersion, fnameStr);
|
||||
pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
pIdxTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
if (pIdxTFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
walBuildLogName(pWal, newFileFirstVersion, fnameStr);
|
||||
pLogTFile = taosOpenFile(fnameStr, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
|
||||
if (pLogTFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
|
|
|
@ -129,7 +129,7 @@ int64_t taosCopyFile(const char *from, const char *to) {
|
|||
if (pFileFrom == NULL) goto _err;
|
||||
|
||||
// fidto = open(to, O_WRONLY | O_CREAT | O_EXCL, 0755);
|
||||
TdFilePtr pFileTo = taosOpenFile(to, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_EXCL);
|
||||
TdFilePtr pFileTo = taosOpenFile(to, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_EXCL);
|
||||
if (pFileTo == NULL) goto _err;
|
||||
|
||||
while (true) {
|
||||
|
@ -246,7 +246,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
|
|||
}
|
||||
} else {
|
||||
int access = O_BINARY;
|
||||
access |= (tdFileOptions & TD_FILE_CTEATE) ? O_CREAT : 0;
|
||||
access |= (tdFileOptions & TD_FILE_CREATE) ? O_CREAT : 0;
|
||||
if ((tdFileOptions & TD_FILE_WRITE) && (tdFileOptions & TD_FILE_READ)) {
|
||||
access |= O_RDWR;
|
||||
} else if (tdFileOptions & TD_FILE_WRITE) {
|
||||
|
|
|
@ -199,7 +199,7 @@ static void *taosThreadToOpenNewFile(void *param) {
|
|||
|
||||
taosUmaskFile(0);
|
||||
|
||||
TdFilePtr pFile = taosOpenFile(name, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
TdFilePtr pFile = taosOpenFile(name, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pFile == NULL) {
|
||||
tsLogObj.openInProgress = 0;
|
||||
tsLogObj.lines = tsLogObj.maxLines - 1000;
|
||||
|
@ -348,7 +348,7 @@ static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) {
|
|||
taosThreadMutexInit(&tsLogObj.logMutex, NULL);
|
||||
|
||||
taosUmaskFile(0);
|
||||
tsLogObj.logHandle->pFile = taosOpenFile(fileName, TD_FILE_CTEATE | TD_FILE_WRITE);
|
||||
tsLogObj.logHandle->pFile = taosOpenFile(fileName, TD_FILE_CREATE | TD_FILE_WRITE);
|
||||
|
||||
if (tsLogObj.logHandle->pFile == NULL) {
|
||||
printf("\nfailed to open log file:%s, reason:%s\n", fileName, strerror(errno));
|
||||
|
@ -699,7 +699,7 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
|
|||
goto cmp_end;
|
||||
}
|
||||
|
||||
TdFilePtr pFile = taosOpenFile(destFileName, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
TdFilePtr pFile = taosOpenFile(destFileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pFile == NULL) {
|
||||
ret = -2;
|
||||
goto cmp_end;
|
||||
|
|
|
@ -48,7 +48,7 @@ struct SDiskbasedBuf {
|
|||
};
|
||||
|
||||
static int32_t createDiskFile(SDiskbasedBuf* pBuf) {
|
||||
pBuf->pFile = taosOpenFile(pBuf->path, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
|
||||
pBuf->pFile = taosOpenFile(pBuf->path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
|
||||
if (pBuf->pFile == NULL) {
|
||||
return TAOS_SYSTEM_ERROR(errno);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,267 @@
|
|||
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
|
||||
# scene1: vgroups=1, one topic for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
|
||||
# scene2: vgroups=1, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
|
||||
# scene3: vgroups=4, one topic for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
|
||||
# scene4: vgroups=4, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
|
||||
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
|
||||
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
|
||||
#
|
||||
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
|
||||
#
|
||||
######## ######## ######## ######## ######## ######## ######## ######## ######## ########
|
||||
######## This test case include scene1 and scene3
|
||||
######## ######## ######## ######## ######## ######## ######## ######## ######## ########
|
||||
|
||||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/cfg.sh -n dnode1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
|
||||
$loop_cnt = 0
|
||||
check_dnode_ready:
|
||||
$loop_cnt = $loop_cnt + 1
|
||||
sleep 200
|
||||
if $loop_cnt == 10 then
|
||||
print ====> dnode not ready!
|
||||
return -1
|
||||
endi
|
||||
sql show dnodes
|
||||
print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05
|
||||
if $data00 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data04 != ready then
|
||||
goto check_dnode_ready
|
||||
endi
|
||||
|
||||
sql connect
|
||||
|
||||
$loop_cnt = 0
|
||||
$vgroups = 1
|
||||
$dbNamme = d0
|
||||
loop_vgroups:
|
||||
print =============== create database $dbNamme vgroups $vgroups
|
||||
sql create database $dbNamme vgroups $vgroups
|
||||
sql show databases
|
||||
print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
print $data10 $data11 $data12 $data13 $data14 $data15 $data16 $data17 $data18 $data19
|
||||
print $data20 $data21 $data22 $data23 $data24 $data25 $data26 $data27 $data28 $data29
|
||||
|
||||
if $loop_cnt == 0 then
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data02 != 1 then # vgroups
|
||||
print vgroups: $data02
|
||||
return -1
|
||||
endi
|
||||
else
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 == d1 then
|
||||
if $data02 != 4 then # vgroups
|
||||
print vgroups: $data02
|
||||
return -1
|
||||
endi
|
||||
else
|
||||
if $data12 != 4 then # vgroups
|
||||
print vgroups: $data12
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
endi
|
||||
|
||||
sql use $dbNamme
|
||||
|
||||
print =============== create super table
|
||||
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int)
|
||||
|
||||
sql show stables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== create child table
|
||||
$tbPrefix = ct
|
||||
$tbNum = 2
|
||||
|
||||
$i = 0
|
||||
while $i < $tbNum
|
||||
$tb = $tbPrefix . $i
|
||||
sql create table $tb using stb tags( $i )
|
||||
$i = $i + 1
|
||||
endw
|
||||
|
||||
print =============== create normal table
|
||||
sql create table ntb (ts timestamp, c1 int, c2 float, c3 binary(10))
|
||||
|
||||
print =============== create multi topics. notes: now only support:
|
||||
print =============== 1. columns from stb/ctb/ntb; 2. * from ctb/ntb; 3. function from stb/ctb/ntb
|
||||
print =============== will support: * from stb
|
||||
|
||||
sql create topic topic_stb_column as select ts, c1, c3 from stb
|
||||
#sql create topic topic_stb_all as select * from stb
|
||||
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
|
||||
|
||||
sql create topic topic_ctb_column as select ts, c1, c3 from ct0
|
||||
sql create topic topic_ctb_all as select * from ct0
|
||||
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ct0
|
||||
|
||||
sql create topic topic_ntb_column as select ts, c1, c3 from ntb
|
||||
sql create topic topic_ntb_all as select * from ntb
|
||||
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb
|
||||
|
||||
sql show tables
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== run_back insert data
|
||||
|
||||
if $loop_cnt == 0 then
|
||||
run_back tsim/tmq/insertDataV1.sim
|
||||
else
|
||||
run_back tsim/tmq/insertDataV4.sim
|
||||
endi
|
||||
|
||||
sleep 1000
|
||||
|
||||
#$rowNum = 1000
|
||||
#$tstart = 1640966400000 # 2022-01-01 00:00:00.000
|
||||
#
|
||||
#$i = 0
|
||||
#while $i < $tbNum
|
||||
# $tb = $tbPrefix . $i
|
||||
#
|
||||
# $x = 0
|
||||
# while $x < $rowNum
|
||||
# $c = $x / 10
|
||||
# $c = $c * 10
|
||||
# $c = $x - $c
|
||||
#
|
||||
# $binary = ' . binary
|
||||
# $binary = $binary . $c
|
||||
# $binary = $binary . '
|
||||
#
|
||||
# sql insert into $tb values ($tstart , $c , $x , $binary )
|
||||
# sql insert into ntb values ($tstart , $c , $x , $binary )
|
||||
# $tstart = $tstart + 1
|
||||
# $x = $x + 1
|
||||
# endw
|
||||
#
|
||||
# $i = $i + 1
|
||||
# $tstart = 1640966400000
|
||||
#endw
|
||||
|
||||
#root@trd02 /home $ tmq_sim --help
|
||||
# -c Configuration directory, default is
|
||||
# -d The name of the database for cosumer, no default
|
||||
# -t The topic string for cosumer, no default
|
||||
# -k The key-value string for cosumer, no default
|
||||
# -g showMsgFlag, default is 0
|
||||
#
|
||||
|
||||
$consumeDelay = 5000
|
||||
$expectConsumeMsgCnt = 1000
|
||||
print expectConsumeMsgCnt: $expectConsumeMsgCnt, consumeDelay: $consumeDelay
|
||||
|
||||
# supported key:
|
||||
# group.id:<xxx>
|
||||
# enable.auto.commit:<true | false>
|
||||
# auto.offset.reset:<earliest | latest | none>
|
||||
# td.connect.ip:<fqdn | ipaddress>
|
||||
# td.connect.user:root
|
||||
# td.connect.pass:taosdata
|
||||
# td.connect.port:6030
|
||||
# td.connect.db:db
|
||||
|
||||
$expect_result = @{consume success: @
|
||||
$expect_result = $expect_result . $expectConsumeMsgCnt
|
||||
$expect_result = $expect_result . @, @
|
||||
$expect_result = $expect_result . 0}
|
||||
print expect_result----> $expect_result
|
||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
print cmd result----> $system_content
|
||||
if $system_content >= $expect_result then
|
||||
return -1
|
||||
endi
|
||||
|
||||
#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
#print cmd result----> $system_content
|
||||
##if $system_content != @{consume success: 10000, 0}@ then
|
||||
#if $system_content != $expect_result then
|
||||
# return -1
|
||||
#endi
|
||||
|
||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
print cmd result----> $system_content
|
||||
#if $system_content != @{consume success: 10000, 0}@ then
|
||||
if $system_content >= $expect_result then
|
||||
return -1
|
||||
endi
|
||||
|
||||
$expect_result = @{consume success: @
|
||||
$expect_result = $expect_result . $rowNum
|
||||
$expect_result = $expect_result . @, @
|
||||
$expect_result = $expect_result . 0}
|
||||
print expect_result----> $expect_result
|
||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
print cmd result----> $system_content
|
||||
if $system_content >= $expect_result then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
print cmd result----> $system_content
|
||||
if $system_content >= $expect_result then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
print cmd result----> $system_content
|
||||
if $system_content >= $expect_result then
|
||||
return -1
|
||||
endi
|
||||
|
||||
$expect_result = @{consume success: @
|
||||
$expect_result = $expect_result . $totalMsgCnt
|
||||
$expect_result = $expect_result . @, @
|
||||
$expect_result = $expect_result . 0}
|
||||
print expect_result----> $expect_result
|
||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
print cmd result----> $system_content
|
||||
if $system_content >= $expect_result then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
print cmd result----> $system_content
|
||||
if $system_content >= $expect_result then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_function" -k "group.id:tg2" -y $consumeDelay -m $expectConsumeMsgCnt
|
||||
print cmd result----> $system_content
|
||||
if $system_content >= $expect_result then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $loop_cnt == 0 then
|
||||
$loop_cnt = 1
|
||||
$vgroups = 4
|
||||
$dbNamme = d1
|
||||
goto loop_vgroups
|
||||
endi
|
||||
|
||||
|
||||
#system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -0,0 +1,41 @@
|
|||
|
||||
sql connect
|
||||
|
||||
print ================ insert data
|
||||
$dbNamme = d0
|
||||
$tbPrefix = ct
|
||||
$tbNum = 2
|
||||
$rowNum = 100
|
||||
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
|
||||
|
||||
sql use $dbNamme
|
||||
|
||||
loop_insert:
|
||||
$i = 0
|
||||
while $i < $tbNum
|
||||
$tb = $tbPrefix . $i
|
||||
|
||||
$x = 0
|
||||
while $x < $rowNum
|
||||
$c = $x / 10
|
||||
$c = $c * 10
|
||||
$c = $x - $c
|
||||
|
||||
$binary = ' . binary
|
||||
$binary = $binary . $c
|
||||
$binary = $binary . '
|
||||
|
||||
#print ====> insert into $tb values ($tstart , $c , $x , $binary )
|
||||
#print ====> insert into ntb values ($tstart , $c , $x , $binary )
|
||||
sql insert into $tb values ($tstart , $c , $x , $binary )
|
||||
sql insert into ntb values ($tstart , $c , $x , $binary )
|
||||
$tstart = $tstart + 1
|
||||
$x = $x + 1
|
||||
endw
|
||||
|
||||
$i = $i + 1
|
||||
$tstart = 1640966400000
|
||||
endw
|
||||
goto loop_insert
|
||||
|
||||
|
|
@ -0,0 +1,41 @@
|
|||
|
||||
sql connect
|
||||
|
||||
print ================ insert data
|
||||
$dbNamme = d1
|
||||
$tbPrefix = ct
|
||||
$tbNum = 2
|
||||
$rowNum = 100
|
||||
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
|
||||
|
||||
sql use $dbNamme
|
||||
|
||||
loop_insert:
|
||||
$i = 0
|
||||
while $i < $tbNum
|
||||
$tb = $tbPrefix . $i
|
||||
|
||||
$x = 0
|
||||
while $x < $rowNum
|
||||
$c = $x / 10
|
||||
$c = $c * 10
|
||||
$c = $x - $c
|
||||
|
||||
$binary = ' . binary
|
||||
$binary = $binary . $c
|
||||
$binary = $binary . '
|
||||
|
||||
#print ====> insert into $tb values ($tstart , $c , $x , $binary )
|
||||
#print ====> insert into ntb values ($tstart , $c , $x , $binary )
|
||||
sql insert into $tb values ($tstart , $c , $x , $binary )
|
||||
sql insert into ntb values ($tstart , $c , $x , $binary )
|
||||
$tstart = $tstart + 1
|
||||
$x = $x + 1
|
||||
endw
|
||||
|
||||
$i = $i + 1
|
||||
$tstart = 1640966400000
|
||||
endw
|
||||
goto loop_insert
|
||||
|
||||
|
|
@ -588,7 +588,7 @@ int32_t syncWriteDataByRatio() {
|
|||
|
||||
void printParaIntoFile() {
|
||||
// FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
|
||||
TdFilePtr pFile = taosOpenFile(g_stConfInfo.resultFileName, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM);
|
||||
TdFilePtr pFile = taosOpenFile(g_stConfInfo.resultFileName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM);
|
||||
if (NULL == pFile) {
|
||||
fprintf(stderr, "Failed to open %s for save result\n", g_stConfInfo.resultFileName);
|
||||
exit -1;
|
||||
|
|
|
@ -42,6 +42,8 @@ typedef struct {
|
|||
char topicString[256];
|
||||
char keyString[1024];
|
||||
int32_t showMsgFlag;
|
||||
int32_t consumeDelay; // unit s
|
||||
int32_t consumeMsgCnt;
|
||||
|
||||
// save result after parse agrvs
|
||||
int32_t numOfTopic;
|
||||
|
@ -71,12 +73,19 @@ static void printHelp() {
|
|||
printf("%s%s%s\n", indent, indent, "The key-value string for cosumer, no default ");
|
||||
printf("%s%s\n", indent, "-g");
|
||||
printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
|
||||
printf("%s%s\n", indent, "-y");
|
||||
printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay);
|
||||
printf("%s%s\n", indent, "-m");
|
||||
printf("%s%s%s%d\n", indent, indent, "consume msg count, default is s", g_stConfInfo.consumeMsgCnt);
|
||||
exit(EXIT_SUCCESS);
|
||||
}
|
||||
|
||||
void parseArgument(int32_t argc, char *argv[]) {
|
||||
|
||||
memset(&g_stConfInfo, 0, sizeof(SConfInfo));
|
||||
g_stConfInfo.showMsgFlag = 0;
|
||||
g_stConfInfo.consumeDelay = 8000;
|
||||
g_stConfInfo.consumeMsgCnt = 0;
|
||||
|
||||
for (int32_t i = 1; i < argc; i++) {
|
||||
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
|
||||
|
@ -92,6 +101,10 @@ void parseArgument(int32_t argc, char *argv[]) {
|
|||
strcpy(g_stConfInfo.keyString, argv[++i]);
|
||||
} else if (strcmp(argv[i], "-g") == 0) {
|
||||
g_stConfInfo.showMsgFlag = atol(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-y") == 0) {
|
||||
g_stConfInfo.consumeDelay = atol(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-m") == 0) {
|
||||
g_stConfInfo.consumeMsgCnt = atol(argv[++i]);
|
||||
} else {
|
||||
printf("%s unknow para: %s %s", GREEN, argv[++i], NC);
|
||||
exit(-1);
|
||||
|
@ -256,6 +269,48 @@ void loop_consume(tmq_t* tmq) {
|
|||
printf("{consume success: %d, %d}", totalMsgs, totalRows);
|
||||
}
|
||||
|
||||
|
||||
void parallel_consume(tmq_t* tmq) {
|
||||
tmq_resp_err_t err;
|
||||
|
||||
int32_t totalMsgs = 0;
|
||||
int32_t totalRows = 0;
|
||||
int32_t skipLogNum = 0;
|
||||
while (running) {
|
||||
tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, g_stConfInfo.consumeDelay * 1000);
|
||||
if (tmqMsg) {
|
||||
totalMsgs++;
|
||||
|
||||
#if 0
|
||||
TAOS_ROW row;
|
||||
while (NULL != (row = tmq_get_row(tmqMsg))) {
|
||||
totalRows++;
|
||||
}
|
||||
#endif
|
||||
|
||||
skipLogNum += tmqGetSkipLogNum(tmqMsg);
|
||||
if (0 != g_stConfInfo.showMsgFlag) {
|
||||
msg_process(tmqMsg);
|
||||
}
|
||||
tmq_message_destroy(tmqMsg);
|
||||
|
||||
if (totalMsgs >= g_stConfInfo.consumeMsgCnt) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
err = tmq_consumer_close(tmq);
|
||||
if (err) {
|
||||
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
printf("%d", totalMsgs); // output to sim for check result
|
||||
}
|
||||
|
||||
int main(int32_t argc, char *argv[]) {
|
||||
parseArgument(argc, argv);
|
||||
parseInputString();
|
||||
|
@ -271,8 +326,12 @@ int main(int32_t argc, char *argv[]) {
|
|||
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
loop_consume(tmq);
|
||||
|
||||
if (0 == g_stConfInfo.consumeMsgCnt) {
|
||||
loop_consume(tmq);
|
||||
} else {
|
||||
parallel_consume(tmq);
|
||||
}
|
||||
|
||||
err = tmq_unsubscribe(tmq);
|
||||
if (err) {
|
||||
|
|
|
@ -22,7 +22,7 @@ void simLogSql(char *sql, bool useSharp) {
|
|||
sprintf(filename, "%s/sim.sql", simScriptDir);
|
||||
if (pFile == NULL) {
|
||||
// fp = fopen(filename, "w");
|
||||
pFile = taosOpenFile(filename, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
||||
pFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
||||
if (pFile == NULL) {
|
||||
fprintf(stderr, "ERROR: failed to open file: %s\n", filename);
|
||||
return;
|
||||
|
@ -773,7 +773,7 @@ bool simExecuteRestfulCmd(SScript *script, char *rest) {
|
|||
char filename[256];
|
||||
sprintf(filename, "%s/tmp.sql", simScriptDir);
|
||||
// fp = fopen(filename, "w");
|
||||
pFile = taosOpenFile(filename, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
||||
pFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
||||
if (pFile == NULL) {
|
||||
fprintf(stderr, "ERROR: failed to open file: %s\n", filename);
|
||||
return false;
|
||||
|
|
|
@ -116,7 +116,7 @@ static void *shellCheckThreadFp(void *arg) {
|
|||
char file[32] = {0};
|
||||
snprintf(file, 32, "tb%d.txt", pThread->threadIndex);
|
||||
|
||||
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (!fp) {
|
||||
fprintf(stdout, "failed to open %s, reason:%s", file, strerror(errno));
|
||||
return NULL;
|
||||
|
|
|
@ -518,7 +518,7 @@ static int dumpResultToFile(const char *fname, TAOS_RES *tres) {
|
|||
}
|
||||
|
||||
// FILE *fp = fopen(full_path.we_wordv[0], "w");
|
||||
TdFilePtr pFile = taosOpenFile(full_path.we_wordv[0], TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
||||
TdFilePtr pFile = taosOpenFile(full_path.we_wordv[0], TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
||||
if (pFile == NULL) {
|
||||
fprintf(stderr, "ERROR: failed to open file: %s\n", full_path.we_wordv[0]);
|
||||
wordfree(&full_path);
|
||||
|
@ -935,7 +935,7 @@ void write_history() {
|
|||
get_history_path(f_history);
|
||||
|
||||
// FILE *f = fopen(f_history, "w");
|
||||
TdFilePtr pFile = taosOpenFile(f_history, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
||||
TdFilePtr pFile = taosOpenFile(f_history, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
||||
if (pFile == NULL) {
|
||||
#ifndef WINDOWS
|
||||
fprintf(stderr, "Failed to open file %s for write, reason:%s\n", f_history, strerror(errno));
|
||||
|
|
Loading…
Reference in New Issue