diff --git a/cmake/taosadapter_CMakeLists.txt.in b/cmake/taosadapter_CMakeLists.txt.in index 79d54f522e..cc46ef9938 100644 --- a/cmake/taosadapter_CMakeLists.txt.in +++ b/cmake/taosadapter_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taosadapter ExternalProject_Add(taosadapter GIT_REPOSITORY https://github.com/taosdata/taosadapter.git - GIT_TAG 0d5663d + GIT_TAG ff7de07 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/include/common/systable.h b/include/common/systable.h index 8b29525db3..57f85f16bc 100644 --- a/include/common/systable.h +++ b/include/common/systable.h @@ -46,6 +46,7 @@ extern "C" { #define TSDB_INS_TABLE_SUBSCRIPTIONS "ins_subscriptions" #define TSDB_INS_TABLE_TOPICS "ins_topics" #define TSDB_INS_TABLE_STREAMS "ins_streams" +#define TSDB_INS_TABLE_STREAM_TASKS "ins_stream_tasks" #define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" #define TSDB_PERFS_TABLE_SMAS "perf_smas" diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 99c5c72e2f..3281bca96a 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -119,6 +119,7 @@ typedef enum _mgmt_table { TSDB_MGMT_TABLE_QUERIES, TSDB_MGMT_TABLE_VNODES, TSDB_MGMT_TABLE_APPS, + TSDB_MGMT_TABLE_STREAM_TASKS, TSDB_MGMT_TABLE_MAX, } EShowType; diff --git a/include/libs/nodes/querynodes.h b/include/libs/nodes/querynodes.h index a1dad1806d..d0971b013f 100644 --- a/include/libs/nodes/querynodes.h +++ b/include/libs/nodes/querynodes.h @@ -74,9 +74,8 @@ typedef struct SColumnNode { char tableName[TSDB_TABLE_NAME_LEN]; char tableAlias[TSDB_TABLE_NAME_LEN]; char colName[TSDB_COL_NAME_LEN]; - // SNode* pProjectRef; - int16_t dataBlockId; - int16_t slotId; + int16_t dataBlockId; + int16_t slotId; } SColumnNode; typedef struct SColumnRefNode { diff --git a/packaging/release.bat b/packaging/release.bat index 4ab7297f03..4c82c5ead5 100644 --- a/packaging/release.bat +++ b/packaging/release.bat @@ -39,7 +39,7 @@ if not exist %work_dir%\debug\ver-%2-x86 ( md %work_dir%\debug\ver-%2-x86 ) cd %work_dir%\debug\ver-%2-x64 -rem #call vcvarsall.bat x64 +call vcvarsall.bat x64 cmake ../../ -G "NMake Makefiles JOM" -DCMAKE_MAKE_PROGRAM=jom -DBUILD_TOOLS=true -DWEBSOCKET=true -DBUILD_HTTP=false -DBUILD_TEST=false -DVERNUMBER=%2 -DCPUTYPE=x64 cmake --build . rd /s /Q C:\TDengine diff --git a/packaging/tools/tdengine.iss b/packaging/tools/tdengine.iss index 981bee91b8..1c0c105179 100644 --- a/packaging/tools/tdengine.iss +++ b/packaging/tools/tdengine.iss @@ -61,6 +61,16 @@ Source: {#MyAppSourceDir}{#MyAppExeName}; DestDir: "{app}"; Excludes: {#MyAppExc Source: {#MyAppSourceDir}{#MyAppTaosdemoExeName}; DestDir: "{app}"; Flags: igNoreversion recursesubdirs createallsubdirs +[run] +Filename: {sys}\sc.exe; Parameters: "create taosd start= DEMAND binPath= ""C:\\TDengine\\taosd.exe --win_service""" ; Flags: runhidden +Filename: {sys}\sc.exe; Parameters: "create taosadapter start= DEMAND binPath= ""C:\\TDengine\\taosadapter.exe --win_service""" ; Flags: runhidden + +[UninstallRun] +RunOnceId: "stoptaosd"; Filename: {sys}\sc.exe; Parameters: "stop taosd" ; Flags: runhidden +RunOnceId: "stoptaosadapter"; Filename: {sys}\sc.exe; Parameters: "stop taosadapter" ; Flags: runhidden +RunOnceId: "deltaosd"; Filename: {sys}\sc.exe; Parameters: "delete taosd" ; Flags: runhidden +RunOnceId: "deltaosadapter"; Filename: {sys}\sc.exe; Parameters: "delete taosadapter" ; Flags: runhidden + [Registry] Root: HKLM; Subkey: "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"; \ ValueType: expandsz; ValueName: "Path"; ValueData: "{olddata};C:\TDengine"; \ diff --git a/source/common/src/systable.c b/source/common/src/systable.c index d3d006ab35..c3a1f9f67e 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -134,7 +134,7 @@ static const SSysDbTableSchema userStbsSchema[] = { }; static const SSysDbTableSchema streamSchema[] = { - {.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "stream_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, @@ -145,6 +145,15 @@ static const SSysDbTableSchema streamSchema[] = { {.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; +static const SSysDbTableSchema streamTaskSchema[] = { + {.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "task_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "node_type", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "node_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "level", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, +}; + static const SSysDbTableSchema userTblsSchema[] = { {.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, @@ -287,6 +296,7 @@ static const SSysTableMeta infosMeta[] = { {TSDB_INS_TABLE_TOPICS, topicSchema, tListLen(topicSchema), false}, {TSDB_INS_TABLE_SUBSCRIPTIONS, subscriptionSchema, tListLen(subscriptionSchema), false}, {TSDB_INS_TABLE_STREAMS, streamSchema, tListLen(streamSchema), false}, + {TSDB_INS_TABLE_STREAM_TASKS, streamTaskSchema, tListLen(streamTaskSchema), false}, {TSDB_INS_TABLE_VNODES, vnodesSchema, tListLen(vnodesSchema), true}, }; diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index e8d5989e4d..f7b1196248 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -277,7 +277,9 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int pColumnInfoData->varmeta.allocLen = len + oldLen; } - memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len); + if (pColumnInfoData->pData && pSource->pData) { // TD-20382 + memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len); + } pColumnInfoData->varmeta.length = len + oldLen; } else { if (finalNumOfRows > (*capacity)) { diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index b0af98b933..20c2ebb0a4 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -106,6 +106,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) { type = TSDB_MGMT_TABLE_STREAMS; } else if (strncasecmp(name, TSDB_PERFS_TABLE_APPS, len) == 0) { type = TSDB_MGMT_TABLE_APPS; + } else if (strncasecmp(name, TSDB_INS_TABLE_STREAM_TASKS, len) == 0) { + type = TSDB_MGMT_TABLE_STREAM_TASKS; } else { // ASSERT(0); } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 594c13f957..c649180285 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -41,6 +41,8 @@ static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq); static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextStream(SMnode *pMnode, void *pIter); +static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); +static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter); int32_t mndInitStream(SMnode *pMnode) { SSdbTable table = { @@ -62,6 +64,8 @@ int32_t mndInitStream(SMnode *pMnode) { mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask); return sdbSetTable(pMnode->pSdb, table); } @@ -891,7 +895,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB SName n; int32_t cols = 0; - char streamName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)streamName, false); @@ -953,3 +957,105 @@ static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); } + +static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + SStreamObj *pStream = NULL; + + while (numOfRows < rowsCapacity) { + pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream); + if (pShow->pIter == NULL) break; + + // lock + taosRLockLatch(&pStream->lock); + // count task num + int32_t sz = taosArrayGetSize(pStream->tasks); + int32_t count = 0; + for (int32_t i = 0; i < sz; i++) { + SArray *pLevel = taosArrayGetP(pStream->tasks, i); + count += taosArrayGetSize(pLevel); + } + + if (numOfRows + count > rowsCapacity) { + blockDataEnsureCapacity(pBlock, numOfRows + count); + } + // add row for each task + for (int32_t i = 0; i < sz; i++) { + SArray *pLevel = taosArrayGetP(pStream->tasks, i); + int32_t levelCnt = taosArrayGetSize(pLevel); + for (int32_t j = 0; j < levelCnt; j++) { + SStreamTask *pTask = taosArrayGetP(pLevel, j); + + SColumnInfoData *pColInfo; + int32_t cols = 0; + + // stream name + char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)streamName, false); + + // task id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pTask->taskId, false); + + // node type + char nodeType[20 + VARSTR_HEADER_SIZE] = {0}; + varDataSetLen(nodeType, 5); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (pTask->nodeId > 0) { + memcpy(varDataVal(nodeType), "vnode", 5); + } else { + memcpy(varDataVal(nodeType), "snode", 5); + } + colDataAppend(pColInfo, numOfRows, nodeType, false); + + // node id + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + int32_t nodeId = TMAX(pTask->nodeId, 0); + colDataAppend(pColInfo, numOfRows, (const char *)&nodeId, false); + + // level + char level[20 + VARSTR_HEADER_SIZE] = {0}; + if (pTask->taskLevel == TASK_LEVEL__SOURCE) { + memcpy(varDataVal(level), "source", 6); + varDataSetLen(level, 6); + } else if (pTask->taskLevel == TASK_LEVEL__AGG) { + memcpy(varDataVal(level), "agg", 3); + varDataSetLen(level, 3); + } else if (pTask->taskLevel == TASK_LEVEL__SINK) { + memcpy(varDataVal(level), "sink", 4); + varDataSetLen(level, 4); + } else if (pTask->taskLevel == TASK_LEVEL__SINK) { + } + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&level, false); + + // status + char status[20 + VARSTR_HEADER_SIZE] = {0}; + char status2[20] = {0}; + strcpy(status, "normal"); + STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status)); + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&status, false); + + numOfRows++; + } + } + + // unlock + taosRUnLockLatch(&pStream->lock); + + sdbRelease(pSdb, pStream); + } + + pShow->numOfRows += numOfRows; + return numOfRows; +} + +static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index ba9d68ee7f..c6e2842c32 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -108,8 +108,8 @@ static FORCE_INLINE int64_t tsdbLogicToFileSize(int64_t lSize, int32_t szPage) { #define TSDBROW_KEY(ROW) ((TSDBKEY){.version = TSDBROW_VERSION(ROW), .ts = TSDBROW_TS(ROW)}) #define tsdbRowFromTSRow(VERSION, TSROW) ((TSDBROW){.type = 0, .version = (VERSION), .pTSRow = (TSROW)}) #define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .iRow = (IROW)}) -void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); -int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); +void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal); +// int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow); int32_t tsdbRowCmprFn(const void *p1, const void *p2); // SRowIter void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); @@ -333,6 +333,8 @@ struct SVersionRange { typedef struct SMemSkipListNode SMemSkipListNode; struct SMemSkipListNode { int8_t level; + int64_t version; + STSRow *pTSRow; SMemSkipListNode *forwards[0]; }; typedef struct SMemSkipList { @@ -772,14 +774,6 @@ static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) { #define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) #define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) -#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level)) - -static FORCE_INLINE int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow) { - int32_t n = tGetI64(p, &pRow->version); - pRow->pTSRow = (STSRow *)(p + n); - n += pRow->pTSRow->len; - return n; -} static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) { if (pIter == NULL) return NULL; @@ -798,8 +792,9 @@ static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) { } } - tGetTSDBRow((uint8_t *)SL_NODE_DATA(pIter->pNode), &pIter->row); pIter->pRow = &pIter->row; + pIter->pRow->version = pIter->pNode->version; + pIter->pRow->pTSRow = pIter->pNode->pTSRow; return pIter->pRow; } diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 654afe1b6a..ddf2949607 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -18,10 +18,10 @@ #define MEM_MIN_HASH 1024 #define SL_MAX_LEVEL 5 -#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2) +// sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l) * 2 +#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + ((l) << 4)) #define SL_NODE_FORWARD(n, l) ((n)->forwards[l]) #define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)]) -#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level)) #define SL_MOVE_BACKWARD 0x1 #define SL_MOVE_FROM_POS 0x2 @@ -263,30 +263,27 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDa } bool tsdbTbDataIterNext(STbDataIter *pIter) { - SMemSkipListNode *pHead = pIter->pTbData->sl.pHead; - SMemSkipListNode *pTail = pIter->pTbData->sl.pTail; - pIter->pRow = NULL; if (pIter->backward) { - ASSERT(pIter->pNode != pTail); + ASSERT(pIter->pNode != pIter->pTbData->sl.pTail); - if (pIter->pNode == pHead) { + if (pIter->pNode == pIter->pTbData->sl.pHead) { return false; } pIter->pNode = SL_NODE_BACKWARD(pIter->pNode, 0); - if (pIter->pNode == pHead) { + if (pIter->pNode == pIter->pTbData->sl.pHead) { return false; } } else { - ASSERT(pIter->pNode != pHead); + ASSERT(pIter->pNode != pIter->pTbData->sl.pHead); - if (pIter->pNode == pTail) { + if (pIter->pNode == pIter->pTbData->sl.pTail) { return false; } pIter->pNode = SL_NODE_FORWARD(pIter->pNode, 0); - if (pIter->pNode == pTail) { + if (pIter->pNode == pIter->pTbData->sl.pTail) { return false; } } @@ -394,7 +391,7 @@ _err: static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags) { SMemSkipListNode *px; SMemSkipListNode *pn; - TSDBKEY *pTKey; + TSDBKEY tKey = {0}; int32_t backward = flags & SL_MOVE_BACKWARD; int32_t fromPos = flags & SL_MOVE_FROM_POS; @@ -413,9 +410,10 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) { pn = SL_NODE_BACKWARD(px, iLevel); while (pn != pTbData->sl.pHead) { - pTKey = (TSDBKEY *)SL_NODE_DATA(pn); + tKey.version = pn->version; + tKey.ts = pn->pTSRow->ts; - int32_t c = tsdbKeyCmprFn(pTKey, pKey); + int32_t c = tsdbKeyCmprFn(&tKey, pKey); if (c <= 0) { break; } else { @@ -442,7 +440,10 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) { pn = SL_NODE_FORWARD(px, iLevel); while (pn != pTbData->sl.pTail) { - int32_t c = tsdbKeyCmprFn(SL_NODE_DATA(pn), pKey); + tKey.version = pn->version; + tKey.ts = pn->pTSRow->ts; + + int32_t c = tsdbKeyCmprFn(&tKey, pKey); if (c >= 0) { break; } else { @@ -467,8 +468,8 @@ static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) { return level; } -static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, TSDBROW *pRow, - int8_t forward) { +static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, int64_t version, + STSRow *pRow, int8_t forward) { int32_t code = 0; int8_t level; SMemSkipListNode *pNode; @@ -477,13 +478,19 @@ static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListN // node level = tsdbMemSkipListRandLevel(&pTbData->sl); ASSERT(pPool != NULL); - pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + tPutTSDBRow(NULL, pRow)); + pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level)); if (pNode == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } pNode->level = level; - tPutTSDBRow((uint8_t *)SL_NODE_DATA(pNode), pRow); + pNode->version = version; + pNode->pTSRow = vnodeBufPoolMalloc(pPool, pRow->len); + if (NULL == pNode->pTSRow) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + memcpy(pNode->pTSRow, pRow, pRow->len); for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) { SMemSkipListNode *pn = pos[iLevel]; @@ -549,7 +556,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i key.ts = row.pTSRow->ts; nRow++; tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD); - code = tbDataDoPut(pMemTable, pTbData, pos, &row, 0); + code = tbDataDoPut(pMemTable, pTbData, pos, version, row.pTSRow, 0); if (code) { goto _err; } @@ -570,7 +577,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) { tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS); } - code = tbDataDoPut(pMemTable, pTbData, pos, &row, 1); + code = tbDataDoPut(pMemTable, pTbData, pos, version, row.pTSRow, 1); if (code) { goto _err; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c157faecb1..af368d33e0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3219,7 +3219,7 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger, SVersionRange* pVerRange, int32_t step) { - while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) { + while (rowIndex < pBlockData->nRow && rowIndex >= 0 && pBlockData->aTSKEY[rowIndex] == key) { if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) { rowIndex += step; continue; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 755a551e20..0aa2c6ab83 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -565,15 +565,15 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal * } } -int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow) { - int32_t n = 0; +// int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow) { +// int32_t n = 0; - n += tPutI64(p, pRow->version); - if (p) memcpy(p + n, pRow->pTSRow, pRow->pTSRow->len); - n += pRow->pTSRow->len; +// n += tPutI64(p, pRow->version); +// if (p) memcpy(p + n, pRow->pTSRow, pRow->pTSRow->len); +// n += pRow->pTSRow->len; - return n; -} +// return n; +// } int32_t tsdbRowCmprFn(const void *p1, const void *p2) { return tsdbKeyCmprFn(&TSDBROW_KEY((TSDBROW *)p1), &TSDBROW_KEY((TSDBROW *)p2)); @@ -1084,7 +1084,7 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch cv.flag = CV_FLAG_VALUE; if (IS_VAR_DATA_TYPE(pTColumn->type)) { - void *pData = (char*)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY)); + void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY)); cv.value.nData = varDataLen(pData); cv.value.pData = varDataVal(pData); } else { @@ -1106,7 +1106,7 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch cv.flag = CV_FLAG_VALUE; if (IS_VAR_DATA_TYPE(pTColumn->type)) { - void *pData = (char*)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY)); + void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY)); cv.value.nData = varDataLen(pData); cv.value.pData = varDataVal(pData); } else { @@ -1151,7 +1151,7 @@ static int32_t tBlockDataAppendKVRow(SBlockData *pBlockData, STSRow *pRow, STSch ASSERT(pTColumn->type == pColData->type); SColVal cv = {.cid = pTColumn->colId, .type = pTColumn->type}; - TDRowValT vt = TD_VTYPE_NONE; // default is NONE + TDRowValT vt = TD_VTYPE_NONE; // default is NONE SKvRowIdx *pKvIdx = NULL; while (kvIter < nKvCols) { diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index ddd948a6dd..c41376b2dc 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -513,6 +513,22 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) { // taosMemoryFreeClear(pFillInfo->pTags[i].tagVal); // } + // free pFillCol + if (pFillInfo->pFillCol) { + for (int32_t i = 0; i < pFillInfo->numOfCols; i++) { + SFillColInfo* pCol = &pFillInfo->pFillCol[i]; + if (!pCol->notFillCol) { + if (pCol->fillVal.nType == TSDB_DATA_TYPE_VARBINARY || pCol->fillVal.nType == TSDB_DATA_TYPE_VARCHAR || + pCol->fillVal.nType == TSDB_DATA_TYPE_NCHAR || pCol->fillVal.nType == TSDB_DATA_TYPE_JSON) { + if (pCol->fillVal.pz) { + taosMemoryFree(pCol->fillVal.pz); + pCol->fillVal.pz = NULL; + } + } + } + } + } + taosMemoryFreeClear(pFillInfo->pTags); taosMemoryFreeClear(pFillInfo->pFillCol); taosMemoryFreeClear(pFillInfo); diff --git a/source/libs/executor/src/tsimplehash.c b/source/libs/executor/src/tsimplehash.c index a5168d24ba..9fbe94200c 100644 --- a/source/libs/executor/src/tsimplehash.c +++ b/source/libs/executor/src/tsimplehash.c @@ -189,7 +189,7 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, cons } while (pNode) { - if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) { + if ((keyLen == pNode->keyLen) && (*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) { break; } pNode = pNode->next; @@ -213,10 +213,12 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, cons static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void *key, size_t keyLen, int32_t index) { SHNode *pNode = pHashObj->hashList[index]; while (pNode) { - if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) { + const char* p = GET_SHASH_NODE_KEY(pNode, pNode->dataLen); + ASSERT(keyLen > 0); + + if (pNode->keyLen == keyLen && ((*(pHashObj->equalFp))(p, key, keyLen) == 0)) { break; } - pNode = pNode->next; } diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index d3f03e8e9c..8249a8f6b1 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -475,7 +475,7 @@ static int32_t translateNowToday(SFunctionNode* pFunc, char* pErrBuf, int32_t le // add database precision as param uint8_t dbPrec = pFunc->node.resType.precision; - int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); + int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1506,7 +1506,7 @@ static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len) // add database precision as param uint8_t dbPrec = pFunc->node.resType.precision; - int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); + int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1519,7 +1519,7 @@ static int32_t translateInterp(SFunctionNode* pFunc, char* pErrBuf, int32_t len) int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList); uint8_t dbPrec = pFunc->node.resType.precision; - //if (1 != numOfParams && 3 != numOfParams && 4 != numOfParams) { + // if (1 != numOfParams && 3 != numOfParams && 4 != numOfParams) { if (1 != numOfParams) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } @@ -1835,7 +1835,7 @@ static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { // add database precision as param uint8_t dbPrec = pFunc->node.resType.precision; - int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); + int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -1894,7 +1894,7 @@ static int32_t translateToUnixtimestamp(SFunctionNode* pFunc, char* pErrBuf, int // add database precision as param uint8_t dbPrec = pFunc->node.resType.precision; - int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); + int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2474,7 +2474,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "first", .type = FUNCTION_TYPE_FIRST, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC, .translateFunc = translateFirstLast, .dynDataRequiredFunc = firstDynDataReq, .getEnvFunc = getFirstLastFuncEnv, @@ -2512,7 +2512,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "last", .type = FUNCTION_TYPE_LAST, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC, .translateFunc = translateFirstLast, .dynDataRequiredFunc = lastDynDataReq, .getEnvFunc = getFirstLastFuncEnv, diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 39d17153d0..cc1bae6a3c 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -622,7 +622,7 @@ void nodesDestroyNode(SNode* pNode) { } switch (nodeType(pNode)) { - case QUERY_NODE_COLUMN: // pProjectRef is weak reference, no need to release + case QUERY_NODE_COLUMN: destroyExprNode((SExprNode*)pNode); break; case QUERY_NODE_VALUE: { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index e64a370e15..8de130bbb5 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -744,7 +744,8 @@ static bool isPrimaryKeyImpl(SNode* pExpr) { return (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pExpr)->colId); } else if (QUERY_NODE_FUNCTION == nodeType(pExpr)) { SFunctionNode* pFunc = (SFunctionNode*)pExpr; - if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType) { + if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType || FUNCTION_TYPE_FIRST == pFunc->funcType || + FUNCTION_TYPE_LAST == pFunc->funcType) { return isPrimaryKeyImpl(nodesListGetNode(pFunc->pParameterList, 0)); } else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType) { return true; @@ -787,7 +788,6 @@ static void setColumnInfoBySchema(const SRealTableNode* pTable, const SSchema* p static void setColumnInfoByExpr(STempTableNode* pTable, SExprNode* pExpr, SColumnNode** pColRef) { SColumnNode* pCol = *pColRef; - // pCol->pProjectRef = (SNode*)pExpr; if (NULL == pExpr->pAssociation) { pExpr->pAssociation = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES); } @@ -2932,8 +2932,8 @@ static int32_t checkFill(STranslateContext* pCxt, SFillNode* pFill, SValueNode* return TSDB_CODE_SUCCESS; } - if (TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_INITIALIZER) || - TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_DESC_INITIALIZER)) { + if (!pCxt->createStream && (TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_INITIALIZER) || + TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_DESC_INITIALIZER))) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE); } @@ -5268,9 +5268,7 @@ static int32_t checkTopicQuery(STranslateContext* pCxt, SSelectStmt* pSelect) { } static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pStmt, SCMCreateTopicReq* pReq) { - SName name; - tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName)); - tNameGetFullDbName(&name, pReq->name); + snprintf(pReq->name, sizeof(pReq->name), "%d.%s", pCxt->pParseCxt->acctId, pStmt->topicName); pReq->igExists = pStmt->ignoreExists; pReq->withMeta = pStmt->withMeta; @@ -5280,7 +5278,7 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS } int32_t code = TSDB_CODE_SUCCESS; - + SName name; if ('\0' != pStmt->subSTbName[0]) { pReq->subType = TOPIC_SUB_TYPE__TABLE; toName(pCxt->pParseCxt->acctId, pStmt->subDbName, pStmt->subSTbName, &name); @@ -5548,6 +5546,10 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) { crossTableWithUdaf(pSelect)) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); } + if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "SUBTABLE expression must be of VARCHAR type"); + } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 94c4de3c5e..875204c0a9 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -37,43 +37,6 @@ // /\ UNCHANGED <> // -// only start once -static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, SyncTerm lastApplyTerm, - SyncAppendEntriesReply* pMsg) { - if (beginIndex > endIndex) { - sNError(ths, "snapshot param error, start:%" PRId64 ", end:%" PRId64, beginIndex, endIndex); - return; - } - - // get sender - SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); - ASSERT(pSender != NULL); - - if (snapshotSenderIsStart(pSender)) { - sSError(pSender, "snapshot sender already start"); - return; - } - - SSnapshot snapshot = { - .data = NULL, .lastApplyIndex = endIndex, .lastApplyTerm = lastApplyTerm, .lastConfigIndex = SYNC_INDEX_INVALID}; - void* pReader = NULL; - SSnapshotParam readerParam = {.start = beginIndex, .end = endIndex}; - int32_t code = ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader); - ASSERT(code == 0); - -#if 0 - if (pMsg->privateTerm < pSender->privateTerm) { - ASSERT(pReader != NULL); - snapshotSenderStart(pSender, readerParam, snapshot, pReader); - - } else { - if (pReader != NULL) { - ths->pFsm->FpSnapshotStopRead(ths->pFsm, pReader); - } - } -#endif -} - int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t ret = 0; SyncAppendEntriesReply* pMsg = pRpcMsg->pCont; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 5ceca36aac..e802f60f30 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -385,7 +385,7 @@ bool syncIsReadyForRead(int64_t rid) { if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) { SSyncRaftEntry* pEntry = NULL; int32_t code = pSyncNode->pLogStore->syncLogGetEntry( - pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry); + pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry); if (code == 0 && pEntry != NULL) { if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) { ready = true; @@ -1806,7 +1806,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock), pNode->pingTimerMS, pNode); if (code != 0) { - sNError(pNode, "failed to build ping msg"); + sError("failed to build ping msg"); rpcFreeCont(rpcMsg.pCont); return; } @@ -1814,7 +1814,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { sNTrace(pNode, "enqueue ping msg"); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); if (code != 0) { - sNError(pNode, "failed to sync enqueue ping msg since %s", terrstr()); + sError("failed to sync enqueue ping msg since %s", terrstr()); rpcFreeCont(rpcMsg.pCont); return; } @@ -1839,7 +1839,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode); if (code != 0) { - sNError(pNode, "failed to build elect msg"); + sError("failed to build elect msg"); taosMemoryFree(pElectTimer); return; } @@ -1849,7 +1849,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); if (code != 0) { - sNError(pNode, "failed to sync enqueue elect msg since %s", terrstr()); + sError("failed to sync enqueue elect msg since %s", terrstr()); rpcFreeCont(rpcMsg.pCont); taosMemoryFree(pElectTimer); return; @@ -1879,14 +1879,14 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { pNode->heartbeatTimerMS, pNode); if (code != 0) { - sNError(pNode, "failed to build heartbeat msg"); + sError("failed to build heartbeat msg"); return; } sNTrace(pNode, "enqueue heartbeat timer"); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); if (code != 0) { - sNError(pNode, "failed to enqueue heartbeat msg since %s", terrstr()); + sError("failed to enqueue heartbeat msg since %s", terrstr()); rpcFreeCont(rpcMsg.pCont); return; } @@ -1971,7 +1971,7 @@ static int32_t syncNodeEqNoop(SSyncNode* pNode) { sNTrace(pNode, "propose msg, type:noop"); code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg); if (code != 0) { - sNError(pNode, "failed to propose noop msg while enqueue since %s", terrstr()); + sError("failed to propose noop msg while enqueue since %s", terrstr()); } return code; @@ -2005,7 +2005,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { if (ths->state == TAOS_SYNC_STATE_LEADER) { int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); if (code != 0) { - sNError(ths, "append noop error"); + sError("append noop error"); return -1; } } @@ -2109,7 +2109,7 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { syncNodeFollowerCommit(ths, pMsg->fcIndex); } else { - sNError(ths, "error local cmd"); + sError("error local cmd"); } return 0; diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 7e4b18ab88..b00ba3918c 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -197,7 +197,12 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr syncMeta.isWeek = pEntry->isWeak; syncMeta.seqNum = pEntry->seqNum; syncMeta.term = pEntry->term; + + int64_t tsWriteBegin = taosGetTimestampNs(); index = walAppendLog(pWal, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen); + int64_t tsWriteEnd = taosGetTimestampNs(); + int64_t tsElapsed = tsWriteEnd - tsWriteBegin; + if (index < 0) { int32_t err = terrno; const char* errStr = tstrerror(err); @@ -210,8 +215,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr } pEntry->index = index; - sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s", pEntry->index, - TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType)); + sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index, + TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed); return 0; } @@ -234,9 +239,13 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR return -1; } + int64_t ts1 = taosGetTimestampNs(); taosThreadMutexLock(&(pData->mutex)); + int64_t ts2 = taosGetTimestampNs(); code = walReadVer(pWalHandle, index); + int64_t ts3 = taosGetTimestampNs(); + // code = walReadVerCached(pWalHandle, index); if (code != 0) { int32_t err = terrno; @@ -280,6 +289,18 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR */ taosThreadMutexUnlock(&(pData->mutex)); + int64_t ts4 = taosGetTimestampNs(); + + int64_t tsElapsed = ts4 - ts1; + int64_t tsElapsedLock = ts2 - ts1; + int64_t tsElapsedRead = ts3 - ts2; + int64_t tsElapsedBuild = ts4 - ts3; + + sNTrace(pData->pSyncNode, + "read index:%" PRId64 ", elapsed:%" PRId64 ", elapsed-lock:%" PRId64 ", elapsed-read:%" PRId64 + ", elapsed-build:%" PRId64, + index, tsElapsed, tsElapsedLock, tsElapsedRead, tsElapsedBuild); + return code; } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index f09bac2139..6a7a2c18c1 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -72,7 +72,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh SRpcMsg rpcMsg = {0}; SyncAppendEntries* pMsg = NULL; - SSyncRaftEntry* pEntry; + SSyncRaftEntry* pEntry = NULL; int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry); if (code == 0) { @@ -99,6 +99,8 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh } } + syncEntryDestory(pEntry); + // prepare msg ASSERT(pMsg != NULL); pMsg->srcId = pSyncNode->myRaftId; @@ -140,9 +142,10 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) { int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) { int32_t ret = 0; SyncAppendEntries* pMsg = pRpcMsg->pCont; - - syncLogSendAppendEntries(pSyncNode, pMsg, ""); - syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg); + if (pMsg == NULL) { + sError("vgId:%d, sync-append-entries msg is NULL", pSyncNode->vgId); + return 0; + } SPeerState* pState = syncNodeGetPeerState(pSyncNode, destRaftId); if (pState == NULL) { @@ -150,8 +153,19 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI return 0; } + // save index, otherwise pMsg will be free by rpc + SyncIndex saveLastSendIndex = pState->lastSendIndex; + bool update = false; if (pMsg->dataLen > 0) { - pState->lastSendIndex = pMsg->prevLogIndex + 1; + saveLastSendIndex = pMsg->prevLogIndex + 1; + update = true; + } + + syncLogSendAppendEntries(pSyncNode, pMsg, ""); + syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg); + + if (update) { + pState->lastSendIndex = saveLastSendIndex; pState->lastSendTime = taosGetTimestampMs(); } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index a575de7e56..b9a271ab9d 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -194,6 +194,7 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) { if (pNode == NULL || pNode->pRaftCfg != NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return; + int64_t currentTerm = pNode->pRaftStore->currentTerm; // save error code, otherwise it will be overwritten int32_t errCode = terrno; @@ -235,8 +236,8 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-tm:%" PRIu64 ", sby:%d, aq:%d, bch:%d, r-num:%d, lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s", - pNode->vgId, syncStr(pNode->state), eventLog, pNode->pRaftStore->currentTerm, pNode->commitIndex, - logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, + pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex, + logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->pRaftCfg->isStandBy, aqItems, pNode->pRaftCfg->batchSize, pNode->replicaNum, pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr); @@ -374,9 +375,9 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, - "recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64 - "}, %s", - host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s); + "recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 + ", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s", + host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s); } void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) { @@ -511,8 +512,8 @@ void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port); sNTrace(pSyncNode, "send sync-append-entries to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64 - ", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s", - host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex, + ", lsend-index:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s", + host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, (pMsg->prevLogIndex + 1), pMsg->commitIndex, pMsg->dataLen, s); } diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 1c6e1a2e17..6cac4b6093 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -124,8 +124,8 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) { goto _err; } - char* candidate = NULL; - char* haystack = buf; + char* candidate = NULL; + char* haystack = buf; int64_t pos = 0; SWalCkHead* logContent = NULL; @@ -414,8 +414,10 @@ int walCheckAndRepairMeta(SWal* pWal) { } ASSERT(pFileInfo->fileSize == 0); // remove the empty wal log, and its idx + wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr); taosRemoveFile(fnameStr); walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr); + wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr); taosRemoveFile(fnameStr); // remove its meta entry taosArrayRemove(pWal->fileInfoSet, fileIdx); diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 7ced5fae39..216dd5fcb1 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -407,6 +407,7 @@ int32_t walRollImpl(SWal *pWal) { } walBuildLogName(pWal, newFileFirstVer, fnameStr); pLogFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + wDebug("vgId:%d, wal create new file for write:%s", pWal->cfg.vgId, fnameStr); if (pLogFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); code = -1; diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index c1fee37610..76a312cd91 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -271,8 +271,11 @@ static int32_t cfgSetTimezone(SConfigItem *pItem, const char *value, ECfgSrcType cfgStypeStr(stype), value, terrstr()); return -1; } - pItem->stype = stype; + + // apply new timezone + osSetTimezone(value); + return 0; } diff --git a/tools/CMakeLists.txt b/tools/CMakeLists.txt index 79be05e9fc..0f3c383b9e 100644 --- a/tools/CMakeLists.txt +++ b/tools/CMakeLists.txt @@ -87,6 +87,7 @@ ELSE () MESSAGE("CURRENT SOURCE DIR ${CMAKE_CURRENT_SOURCE_DIR}") IF (TD_WINDOWS) + MESSAGE("Building taosAdapter on Windows") INCLUDE(ExternalProject) ExternalProject_Add(taosadapter PREFIX "taosadapter" @@ -104,14 +105,18 @@ ELSE () COMMAND go build -a -o taosadapter-debug.exe -ldflags "-X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}" INSTALL_COMMAND - COMMAND cmake -E time upx taosadapter ||: - COMMAND cmake -E copy taosadapter.exe ${CMAKE_BINARY_DIR}/build/bin + COMMAND cmake -E echo "Comparessing taosadapter.exe" + COMMAND cmake -E time upx taosadapter.exe + COMMAND cmake -E echo "Copy taosadapter.exe" + COMMAND cmake -E copy taosadapter.exe ${CMAKE_BINARY_DIR}/build/bin/taosadapter.exe COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/test/cfg/ + COMMAND cmake -E echo "Copy taosadapter.toml" COMMAND cmake -E copy ./example/config/taosadapter.toml ${CMAKE_BINARY_DIR}/test/cfg/ - COMMAND cmake -E copy ./taosadapter.service ${CMAKE_BINARY_DIR}/test/cfg/ + COMMAND cmake -E echo "Copy taosadapter-debug.exe" COMMAND cmake -E copy taosadapter-debug.exe ${CMAKE_BINARY_DIR}/build/bin ) ELSE (TD_WINDOWS) + MESSAGE("Building taosAdapter on non-Windows") INCLUDE(ExternalProject) ExternalProject_Add(taosadapter PREFIX "taosadapter" @@ -126,11 +131,15 @@ ELSE () COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}" COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}" INSTALL_COMMAND + COMMAND cmake -E echo "Comparessing taosadapter.exe" COMMAND upx taosadapter || : + COMMAND cmake -E echo "Copy taosadapter" COMMAND cmake -E copy taosadapter ${CMAKE_BINARY_DIR}/build/bin COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/test/cfg/ + COMMAND cmake -E echo "Copy taosadapter.toml" COMMAND cmake -E copy ./example/config/taosadapter.toml ${CMAKE_BINARY_DIR}/test/cfg/ COMMAND cmake -E copy ./taosadapter.service ${CMAKE_BINARY_DIR}/test/cfg/ + COMMAND cmake -E echo "Copy taosadapter-debug" COMMAND cmake -E copy taosadapter-debug ${CMAKE_BINARY_DIR}/build/bin ) ENDIF (TD_WINDOWS)