Merge branch '3.0' into fix/TD-20317

This commit is contained in:
Shengliang Guan 2022-11-14 20:59:25 +08:00
commit 8a0c8360ca
28 changed files with 304 additions and 137 deletions

View File

@ -2,7 +2,7 @@
# taosadapter
ExternalProject_Add(taosadapter
GIT_REPOSITORY https://github.com/taosdata/taosadapter.git
GIT_TAG 0d5663d
GIT_TAG ff7de07
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosadapter"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -46,6 +46,7 @@ extern "C" {
#define TSDB_INS_TABLE_SUBSCRIPTIONS "ins_subscriptions"
#define TSDB_INS_TABLE_TOPICS "ins_topics"
#define TSDB_INS_TABLE_STREAMS "ins_streams"
#define TSDB_INS_TABLE_STREAM_TASKS "ins_stream_tasks"
#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema"
#define TSDB_PERFS_TABLE_SMAS "perf_smas"

View File

@ -119,6 +119,7 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_QUERIES,
TSDB_MGMT_TABLE_VNODES,
TSDB_MGMT_TABLE_APPS,
TSDB_MGMT_TABLE_STREAM_TASKS,
TSDB_MGMT_TABLE_MAX,
} EShowType;

View File

@ -74,9 +74,8 @@ typedef struct SColumnNode {
char tableName[TSDB_TABLE_NAME_LEN];
char tableAlias[TSDB_TABLE_NAME_LEN];
char colName[TSDB_COL_NAME_LEN];
// SNode* pProjectRef;
int16_t dataBlockId;
int16_t slotId;
int16_t dataBlockId;
int16_t slotId;
} SColumnNode;
typedef struct SColumnRefNode {

View File

@ -39,7 +39,7 @@ if not exist %work_dir%\debug\ver-%2-x86 (
md %work_dir%\debug\ver-%2-x86
)
cd %work_dir%\debug\ver-%2-x64
rem #call vcvarsall.bat x64
call vcvarsall.bat x64
cmake ../../ -G "NMake Makefiles JOM" -DCMAKE_MAKE_PROGRAM=jom -DBUILD_TOOLS=true -DWEBSOCKET=true -DBUILD_HTTP=false -DBUILD_TEST=false -DVERNUMBER=%2 -DCPUTYPE=x64
cmake --build .
rd /s /Q C:\TDengine

View File

@ -61,6 +61,16 @@ Source: {#MyAppSourceDir}{#MyAppExeName}; DestDir: "{app}"; Excludes: {#MyAppExc
Source: {#MyAppSourceDir}{#MyAppTaosdemoExeName}; DestDir: "{app}"; Flags: igNoreversion recursesubdirs createallsubdirs
[run]
Filename: {sys}\sc.exe; Parameters: "create taosd start= DEMAND binPath= ""C:\\TDengine\\taosd.exe --win_service""" ; Flags: runhidden
Filename: {sys}\sc.exe; Parameters: "create taosadapter start= DEMAND binPath= ""C:\\TDengine\\taosadapter.exe --win_service""" ; Flags: runhidden
[UninstallRun]
RunOnceId: "stoptaosd"; Filename: {sys}\sc.exe; Parameters: "stop taosd" ; Flags: runhidden
RunOnceId: "stoptaosadapter"; Filename: {sys}\sc.exe; Parameters: "stop taosadapter" ; Flags: runhidden
RunOnceId: "deltaosd"; Filename: {sys}\sc.exe; Parameters: "delete taosd" ; Flags: runhidden
RunOnceId: "deltaosadapter"; Filename: {sys}\sc.exe; Parameters: "delete taosadapter" ; Flags: runhidden
[Registry]
Root: HKLM; Subkey: "SYSTEM\CurrentControlSet\Control\Session Manager\Environment"; \
ValueType: expandsz; ValueName: "Path"; ValueData: "{olddata};C:\TDengine"; \

View File

@ -134,7 +134,7 @@ static const SSysDbTableSchema userStbsSchema[] = {
};
static const SSysDbTableSchema streamSchema[] = {
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "stream_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
@ -145,6 +145,15 @@ static const SSysDbTableSchema streamSchema[] = {
{.name = "trigger", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
static const SSysDbTableSchema streamTaskSchema[] = {
{.name = "stream_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "task_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "node_type", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "node_id", .bytes = 8, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "level", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
};
static const SSysDbTableSchema userTblsSchema[] = {
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
@ -287,6 +296,7 @@ static const SSysTableMeta infosMeta[] = {
{TSDB_INS_TABLE_TOPICS, topicSchema, tListLen(topicSchema), false},
{TSDB_INS_TABLE_SUBSCRIPTIONS, subscriptionSchema, tListLen(subscriptionSchema), false},
{TSDB_INS_TABLE_STREAMS, streamSchema, tListLen(streamSchema), false},
{TSDB_INS_TABLE_STREAM_TASKS, streamTaskSchema, tListLen(streamTaskSchema), false},
{TSDB_INS_TABLE_VNODES, vnodesSchema, tListLen(vnodesSchema), true},
};

View File

@ -277,7 +277,9 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, int
pColumnInfoData->varmeta.allocLen = len + oldLen;
}
memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len);
if (pColumnInfoData->pData && pSource->pData) { // TD-20382
memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len);
}
pColumnInfoData->varmeta.length = len + oldLen;
} else {
if (finalNumOfRows > (*capacity)) {

View File

@ -106,6 +106,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) {
type = TSDB_MGMT_TABLE_STREAMS;
} else if (strncasecmp(name, TSDB_PERFS_TABLE_APPS, len) == 0) {
type = TSDB_MGMT_TABLE_APPS;
} else if (strncasecmp(name, TSDB_INS_TABLE_STREAM_TASKS, len) == 0) {
type = TSDB_MGMT_TABLE_STREAM_TASKS;
} else {
// ASSERT(0);
}

View File

@ -41,6 +41,8 @@ static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq);
static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextStream(SMnode *pMnode, void *pIter);
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = {
@ -62,6 +64,8 @@ int32_t mndInitStream(SMnode *pMnode) {
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndRetrieveStream);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAMS, mndCancelGetNextStream);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndRetrieveStreamTask);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
return sdbSetTable(pMnode->pSdb, table);
}
@ -891,7 +895,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
SName n;
int32_t cols = 0;
char streamName[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)streamName, false);
@ -953,3 +957,105 @@ static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SStreamObj *pStream = NULL;
while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
if (pShow->pIter == NULL) break;
// lock
taosRLockLatch(&pStream->lock);
// count task num
int32_t sz = taosArrayGetSize(pStream->tasks);
int32_t count = 0;
for (int32_t i = 0; i < sz; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
count += taosArrayGetSize(pLevel);
}
if (numOfRows + count > rowsCapacity) {
blockDataEnsureCapacity(pBlock, numOfRows + count);
}
// add row for each task
for (int32_t i = 0; i < sz; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t levelCnt = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < levelCnt; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
SColumnInfoData *pColInfo;
int32_t cols = 0;
// stream name
char streamName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(streamName, mndGetDbStr(pStream->name), sizeof(streamName));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)streamName, false);
// task id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pTask->taskId, false);
// node type
char nodeType[20 + VARSTR_HEADER_SIZE] = {0};
varDataSetLen(nodeType, 5);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
if (pTask->nodeId > 0) {
memcpy(varDataVal(nodeType), "vnode", 5);
} else {
memcpy(varDataVal(nodeType), "snode", 5);
}
colDataAppend(pColInfo, numOfRows, nodeType, false);
// node id
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
int32_t nodeId = TMAX(pTask->nodeId, 0);
colDataAppend(pColInfo, numOfRows, (const char *)&nodeId, false);
// level
char level[20 + VARSTR_HEADER_SIZE] = {0};
if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
memcpy(varDataVal(level), "source", 6);
varDataSetLen(level, 6);
} else if (pTask->taskLevel == TASK_LEVEL__AGG) {
memcpy(varDataVal(level), "agg", 3);
varDataSetLen(level, 3);
} else if (pTask->taskLevel == TASK_LEVEL__SINK) {
memcpy(varDataVal(level), "sink", 4);
varDataSetLen(level, 4);
} else if (pTask->taskLevel == TASK_LEVEL__SINK) {
}
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&level, false);
// status
char status[20 + VARSTR_HEADER_SIZE] = {0};
char status2[20] = {0};
strcpy(status, "normal");
STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status));
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&status, false);
numOfRows++;
}
}
// unlock
taosRUnLockLatch(&pStream->lock);
sdbRelease(pSdb, pStream);
}
pShow->numOfRows += numOfRows;
return numOfRows;
}
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter);
}

View File

@ -108,8 +108,8 @@ static FORCE_INLINE int64_t tsdbLogicToFileSize(int64_t lSize, int32_t szPage) {
#define TSDBROW_KEY(ROW) ((TSDBKEY){.version = TSDBROW_VERSION(ROW), .ts = TSDBROW_TS(ROW)})
#define tsdbRowFromTSRow(VERSION, TSROW) ((TSDBROW){.type = 0, .version = (VERSION), .pTSRow = (TSROW)})
#define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .iRow = (IROW)})
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
// int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow);
int32_t tsdbRowCmprFn(const void *p1, const void *p2);
// SRowIter
void tRowIterInit(SRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
@ -333,6 +333,8 @@ struct SVersionRange {
typedef struct SMemSkipListNode SMemSkipListNode;
struct SMemSkipListNode {
int8_t level;
int64_t version;
STSRow *pTSRow;
SMemSkipListNode *forwards[0];
};
typedef struct SMemSkipList {
@ -772,14 +774,6 @@ static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
static FORCE_INLINE int32_t tGetTSDBRow(uint8_t *p, TSDBROW *pRow) {
int32_t n = tGetI64(p, &pRow->version);
pRow->pTSRow = (STSRow *)(p + n);
n += pRow->pTSRow->len;
return n;
}
static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
if (pIter == NULL) return NULL;
@ -798,8 +792,9 @@ static FORCE_INLINE TSDBROW *tsdbTbDataIterGet(STbDataIter *pIter) {
}
}
tGetTSDBRow((uint8_t *)SL_NODE_DATA(pIter->pNode), &pIter->row);
pIter->pRow = &pIter->row;
pIter->pRow->version = pIter->pNode->version;
pIter->pRow->pTSRow = pIter->pNode->pTSRow;
return pIter->pRow;
}

View File

@ -18,10 +18,10 @@
#define MEM_MIN_HASH 1024
#define SL_MAX_LEVEL 5
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l)*2)
// sizeof(SMemSkipListNode) + sizeof(SMemSkipListNode *) * (l) * 2
#define SL_NODE_SIZE(l) (sizeof(SMemSkipListNode) + ((l) << 4))
#define SL_NODE_FORWARD(n, l) ((n)->forwards[l])
#define SL_NODE_BACKWARD(n, l) ((n)->forwards[(n)->level + (l)])
#define SL_NODE_DATA(n) (&SL_NODE_BACKWARD(n, (n)->level))
#define SL_MOVE_BACKWARD 0x1
#define SL_MOVE_FROM_POS 0x2
@ -263,30 +263,27 @@ void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDa
}
bool tsdbTbDataIterNext(STbDataIter *pIter) {
SMemSkipListNode *pHead = pIter->pTbData->sl.pHead;
SMemSkipListNode *pTail = pIter->pTbData->sl.pTail;
pIter->pRow = NULL;
if (pIter->backward) {
ASSERT(pIter->pNode != pTail);
ASSERT(pIter->pNode != pIter->pTbData->sl.pTail);
if (pIter->pNode == pHead) {
if (pIter->pNode == pIter->pTbData->sl.pHead) {
return false;
}
pIter->pNode = SL_NODE_BACKWARD(pIter->pNode, 0);
if (pIter->pNode == pHead) {
if (pIter->pNode == pIter->pTbData->sl.pHead) {
return false;
}
} else {
ASSERT(pIter->pNode != pHead);
ASSERT(pIter->pNode != pIter->pTbData->sl.pHead);
if (pIter->pNode == pTail) {
if (pIter->pNode == pIter->pTbData->sl.pTail) {
return false;
}
pIter->pNode = SL_NODE_FORWARD(pIter->pNode, 0);
if (pIter->pNode == pTail) {
if (pIter->pNode == pIter->pTbData->sl.pTail) {
return false;
}
}
@ -394,7 +391,7 @@ _err:
static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags) {
SMemSkipListNode *px;
SMemSkipListNode *pn;
TSDBKEY *pTKey;
TSDBKEY tKey = {0};
int32_t backward = flags & SL_MOVE_BACKWARD;
int32_t fromPos = flags & SL_MOVE_FROM_POS;
@ -413,9 +410,10 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
pn = SL_NODE_BACKWARD(px, iLevel);
while (pn != pTbData->sl.pHead) {
pTKey = (TSDBKEY *)SL_NODE_DATA(pn);
tKey.version = pn->version;
tKey.ts = pn->pTSRow->ts;
int32_t c = tsdbKeyCmprFn(pTKey, pKey);
int32_t c = tsdbKeyCmprFn(&tKey, pKey);
if (c <= 0) {
break;
} else {
@ -442,7 +440,10 @@ static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *p
for (int8_t iLevel = pTbData->sl.level - 1; iLevel >= 0; iLevel--) {
pn = SL_NODE_FORWARD(px, iLevel);
while (pn != pTbData->sl.pTail) {
int32_t c = tsdbKeyCmprFn(SL_NODE_DATA(pn), pKey);
tKey.version = pn->version;
tKey.ts = pn->pTSRow->ts;
int32_t c = tsdbKeyCmprFn(&tKey, pKey);
if (c >= 0) {
break;
} else {
@ -467,8 +468,8 @@ static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
return level;
}
static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, TSDBROW *pRow,
int8_t forward) {
static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, int64_t version,
STSRow *pRow, int8_t forward) {
int32_t code = 0;
int8_t level;
SMemSkipListNode *pNode;
@ -477,13 +478,19 @@ static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListN
// node
level = tsdbMemSkipListRandLevel(&pTbData->sl);
ASSERT(pPool != NULL);
pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level) + tPutTSDBRow(NULL, pRow));
pNode = (SMemSkipListNode *)vnodeBufPoolMalloc(pPool, SL_NODE_SIZE(level));
if (pNode == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pNode->level = level;
tPutTSDBRow((uint8_t *)SL_NODE_DATA(pNode), pRow);
pNode->version = version;
pNode->pTSRow = vnodeBufPoolMalloc(pPool, pRow->len);
if (NULL == pNode->pTSRow) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
memcpy(pNode->pTSRow, pRow, pRow->len);
for (int8_t iLevel = level - 1; iLevel >= 0; iLevel--) {
SMemSkipListNode *pn = pos[iLevel];
@ -549,7 +556,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
key.ts = row.pTSRow->ts;
nRow++;
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
code = tbDataDoPut(pMemTable, pTbData, pos, &row, 0);
code = tbDataDoPut(pMemTable, pTbData, pos, version, row.pTSRow, 0);
if (code) {
goto _err;
}
@ -570,7 +577,7 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
}
code = tbDataDoPut(pMemTable, pTbData, pos, &row, 1);
code = tbDataDoPut(pMemTable, pTbData, pos, version, row.pTSRow, 1);
if (code) {
goto _err;
}

View File

@ -3219,7 +3219,7 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDe
static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, int64_t key, SRowMerger* pMerger,
SVersionRange* pVerRange, int32_t step) {
while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) {
while (rowIndex < pBlockData->nRow && rowIndex >= 0 && pBlockData->aTSKEY[rowIndex] == key) {
if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) {
rowIndex += step;
continue;

View File

@ -565,15 +565,15 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *
}
}
int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow) {
int32_t n = 0;
// int32_t tPutTSDBRow(uint8_t *p, TSDBROW *pRow) {
// int32_t n = 0;
n += tPutI64(p, pRow->version);
if (p) memcpy(p + n, pRow->pTSRow, pRow->pTSRow->len);
n += pRow->pTSRow->len;
// n += tPutI64(p, pRow->version);
// if (p) memcpy(p + n, pRow->pTSRow, pRow->pTSRow->len);
// n += pRow->pTSRow->len;
return n;
}
// return n;
// }
int32_t tsdbRowCmprFn(const void *p1, const void *p2) {
return tsdbKeyCmprFn(&TSDBROW_KEY((TSDBROW *)p1), &TSDBROW_KEY((TSDBROW *)p2));
@ -1084,7 +1084,7 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch
cv.flag = CV_FLAG_VALUE;
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
void *pData = (char*)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY));
void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY));
cv.value.nData = varDataLen(pData);
cv.value.pData = varDataVal(pData);
} else {
@ -1106,7 +1106,7 @@ static int32_t tBlockDataAppendTPRow(SBlockData *pBlockData, STSRow *pRow, STSch
cv.flag = CV_FLAG_VALUE;
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
void *pData = (char*)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY));
void *pData = (char *)pRow + *(int32_t *)(pRow->data + pTColumn->offset - sizeof(TSKEY));
cv.value.nData = varDataLen(pData);
cv.value.pData = varDataVal(pData);
} else {
@ -1151,7 +1151,7 @@ static int32_t tBlockDataAppendKVRow(SBlockData *pBlockData, STSRow *pRow, STSch
ASSERT(pTColumn->type == pColData->type);
SColVal cv = {.cid = pTColumn->colId, .type = pTColumn->type};
TDRowValT vt = TD_VTYPE_NONE; // default is NONE
TDRowValT vt = TD_VTYPE_NONE; // default is NONE
SKvRowIdx *pKvIdx = NULL;
while (kvIter < nKvCols) {

View File

@ -513,6 +513,22 @@ void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
// taosMemoryFreeClear(pFillInfo->pTags[i].tagVal);
// }
// free pFillCol
if (pFillInfo->pFillCol) {
for (int32_t i = 0; i < pFillInfo->numOfCols; i++) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
if (!pCol->notFillCol) {
if (pCol->fillVal.nType == TSDB_DATA_TYPE_VARBINARY || pCol->fillVal.nType == TSDB_DATA_TYPE_VARCHAR ||
pCol->fillVal.nType == TSDB_DATA_TYPE_NCHAR || pCol->fillVal.nType == TSDB_DATA_TYPE_JSON) {
if (pCol->fillVal.pz) {
taosMemoryFree(pCol->fillVal.pz);
pCol->fillVal.pz = NULL;
}
}
}
}
}
taosMemoryFreeClear(pFillInfo->pTags);
taosMemoryFreeClear(pFillInfo->pFillCol);
taosMemoryFreeClear(pFillInfo);

View File

@ -189,7 +189,7 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, cons
}
while (pNode) {
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) {
if ((keyLen == pNode->keyLen) && (*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) {
break;
}
pNode = pNode->next;
@ -213,10 +213,12 @@ int32_t tSimpleHashPut(SSHashObj *pHashObj, const void *key, size_t keyLen, cons
static FORCE_INLINE SHNode *doSearchInEntryList(SSHashObj *pHashObj, const void *key, size_t keyLen, int32_t index) {
SHNode *pNode = pHashObj->hashList[index];
while (pNode) {
if ((*(pHashObj->equalFp))(GET_SHASH_NODE_KEY(pNode, pNode->dataLen), key, keyLen) == 0) {
const char* p = GET_SHASH_NODE_KEY(pNode, pNode->dataLen);
ASSERT(keyLen > 0);
if (pNode->keyLen == keyLen && ((*(pHashObj->equalFp))(p, key, keyLen) == 0)) {
break;
}
pNode = pNode->next;
}

View File

@ -475,7 +475,7 @@ static int32_t translateNowToday(SFunctionNode* pFunc, char* pErrBuf, int32_t le
// add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision;
int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -1506,7 +1506,7 @@ static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
// add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision;
int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -1519,7 +1519,7 @@ static int32_t translateInterp(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
uint8_t dbPrec = pFunc->node.resType.precision;
//if (1 != numOfParams && 3 != numOfParams && 4 != numOfParams) {
// if (1 != numOfParams && 3 != numOfParams && 4 != numOfParams) {
if (1 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
@ -1835,7 +1835,7 @@ static int32_t translateCast(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision;
int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -1894,7 +1894,7 @@ static int32_t translateToUnixtimestamp(SFunctionNode* pFunc, char* pErrBuf, int
// add database precision as param
uint8_t dbPrec = pFunc->node.resType.precision;
int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
int32_t code = addDbPrecisonParam(&pFunc->pParameterList, dbPrec);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -2474,7 +2474,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "first",
.type = FUNCTION_TYPE_FIRST,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC,
.translateFunc = translateFirstLast,
.dynDataRequiredFunc = firstDynDataReq,
.getEnvFunc = getFirstLastFuncEnv,
@ -2512,7 +2512,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "last",
.type = FUNCTION_TYPE_LAST,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC,
.classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_KEEP_ORDER_FUNC,
.translateFunc = translateFirstLast,
.dynDataRequiredFunc = lastDynDataReq,
.getEnvFunc = getFirstLastFuncEnv,

View File

@ -622,7 +622,7 @@ void nodesDestroyNode(SNode* pNode) {
}
switch (nodeType(pNode)) {
case QUERY_NODE_COLUMN: // pProjectRef is weak reference, no need to release
case QUERY_NODE_COLUMN:
destroyExprNode((SExprNode*)pNode);
break;
case QUERY_NODE_VALUE: {

View File

@ -744,7 +744,8 @@ static bool isPrimaryKeyImpl(SNode* pExpr) {
return (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pExpr)->colId);
} else if (QUERY_NODE_FUNCTION == nodeType(pExpr)) {
SFunctionNode* pFunc = (SFunctionNode*)pExpr;
if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType) {
if (FUNCTION_TYPE_SELECT_VALUE == pFunc->funcType || FUNCTION_TYPE_FIRST == pFunc->funcType ||
FUNCTION_TYPE_LAST == pFunc->funcType) {
return isPrimaryKeyImpl(nodesListGetNode(pFunc->pParameterList, 0));
} else if (FUNCTION_TYPE_WSTART == pFunc->funcType || FUNCTION_TYPE_WEND == pFunc->funcType) {
return true;
@ -787,7 +788,6 @@ static void setColumnInfoBySchema(const SRealTableNode* pTable, const SSchema* p
static void setColumnInfoByExpr(STempTableNode* pTable, SExprNode* pExpr, SColumnNode** pColRef) {
SColumnNode* pCol = *pColRef;
// pCol->pProjectRef = (SNode*)pExpr;
if (NULL == pExpr->pAssociation) {
pExpr->pAssociation = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
}
@ -2932,8 +2932,8 @@ static int32_t checkFill(STranslateContext* pCxt, SFillNode* pFill, SValueNode*
return TSDB_CODE_SUCCESS;
}
if (TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_INITIALIZER) ||
TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_DESC_INITIALIZER)) {
if (!pCxt->createStream && (TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_INITIALIZER) ||
TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_DESC_INITIALIZER))) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE);
}
@ -5268,9 +5268,7 @@ static int32_t checkTopicQuery(STranslateContext* pCxt, SSelectStmt* pSelect) {
}
static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pStmt, SCMCreateTopicReq* pReq) {
SName name;
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName));
tNameGetFullDbName(&name, pReq->name);
snprintf(pReq->name, sizeof(pReq->name), "%d.%s", pCxt->pParseCxt->acctId, pStmt->topicName);
pReq->igExists = pStmt->ignoreExists;
pReq->withMeta = pStmt->withMeta;
@ -5280,7 +5278,7 @@ static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pS
}
int32_t code = TSDB_CODE_SUCCESS;
SName name;
if ('\0' != pStmt->subSTbName[0]) {
pReq->subType = TOPIC_SUB_TYPE__TABLE;
toName(pCxt->pParseCxt->acctId, pStmt->subDbName, pStmt->subSTbName, &name);
@ -5548,6 +5546,10 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) {
crossTableWithUdaf(pSelect)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
}
if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"SUBTABLE expression must be of VARCHAR type");
}
return TSDB_CODE_SUCCESS;
}

View File

@ -37,43 +37,6 @@
// /\ UNCHANGED <<serverVars, candidateVars, logVars, elections>>
//
// only start once
static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, SyncTerm lastApplyTerm,
SyncAppendEntriesReply* pMsg) {
if (beginIndex > endIndex) {
sNError(ths, "snapshot param error, start:%" PRId64 ", end:%" PRId64, beginIndex, endIndex);
return;
}
// get sender
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
ASSERT(pSender != NULL);
if (snapshotSenderIsStart(pSender)) {
sSError(pSender, "snapshot sender already start");
return;
}
SSnapshot snapshot = {
.data = NULL, .lastApplyIndex = endIndex, .lastApplyTerm = lastApplyTerm, .lastConfigIndex = SYNC_INDEX_INVALID};
void* pReader = NULL;
SSnapshotParam readerParam = {.start = beginIndex, .end = endIndex};
int32_t code = ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader);
ASSERT(code == 0);
#if 0
if (pMsg->privateTerm < pSender->privateTerm) {
ASSERT(pReader != NULL);
snapshotSenderStart(pSender, readerParam, snapshot, pReader);
} else {
if (pReader != NULL) {
ths->pFsm->FpSnapshotStopRead(ths->pFsm, pReader);
}
}
#endif
}
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
int32_t ret = 0;
SyncAppendEntriesReply* pMsg = pRpcMsg->pCont;

View File

@ -385,7 +385,7 @@ bool syncIsReadyForRead(int64_t rid) {
if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
SSyncRaftEntry* pEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(
pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry);
pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry);
if (code == 0 && pEntry != NULL) {
if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) {
ready = true;
@ -1806,7 +1806,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_PING, atomic_load_64(&pNode->pingTimerLogicClock),
pNode->pingTimerMS, pNode);
if (code != 0) {
sNError(pNode, "failed to build ping msg");
sError("failed to build ping msg");
rpcFreeCont(rpcMsg.pCont);
return;
}
@ -1814,7 +1814,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
sNTrace(pNode, "enqueue ping msg");
code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
if (code != 0) {
sNError(pNode, "failed to sync enqueue ping msg since %s", terrstr());
sError("failed to sync enqueue ping msg since %s", terrstr());
rpcFreeCont(rpcMsg.pCont);
return;
}
@ -1839,7 +1839,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
int32_t code = syncBuildTimeout(&rpcMsg, SYNC_TIMEOUT_ELECTION, pElectTimer->logicClock, pNode->electTimerMS, pNode);
if (code != 0) {
sNError(pNode, "failed to build elect msg");
sError("failed to build elect msg");
taosMemoryFree(pElectTimer);
return;
}
@ -1849,7 +1849,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
if (code != 0) {
sNError(pNode, "failed to sync enqueue elect msg since %s", terrstr());
sError("failed to sync enqueue elect msg since %s", terrstr());
rpcFreeCont(rpcMsg.pCont);
taosMemoryFree(pElectTimer);
return;
@ -1879,14 +1879,14 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
pNode->heartbeatTimerMS, pNode);
if (code != 0) {
sNError(pNode, "failed to build heartbeat msg");
sError("failed to build heartbeat msg");
return;
}
sNTrace(pNode, "enqueue heartbeat timer");
code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
if (code != 0) {
sNError(pNode, "failed to enqueue heartbeat msg since %s", terrstr());
sError("failed to enqueue heartbeat msg since %s", terrstr());
rpcFreeCont(rpcMsg.pCont);
return;
}
@ -1971,7 +1971,7 @@ static int32_t syncNodeEqNoop(SSyncNode* pNode) {
sNTrace(pNode, "propose msg, type:noop");
code = (*pNode->syncEqMsg)(pNode->msgcb, &rpcMsg);
if (code != 0) {
sNError(pNode, "failed to propose noop msg while enqueue since %s", terrstr());
sError("failed to propose noop msg while enqueue since %s", terrstr());
}
return code;
@ -2005,7 +2005,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
if (ths->state == TAOS_SYNC_STATE_LEADER) {
int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
if (code != 0) {
sNError(ths, "append noop error");
sError("append noop error");
return -1;
}
}
@ -2109,7 +2109,7 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncNodeFollowerCommit(ths, pMsg->fcIndex);
} else {
sNError(ths, "error local cmd");
sError("error local cmd");
}
return 0;

View File

@ -197,7 +197,12 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
syncMeta.isWeek = pEntry->isWeak;
syncMeta.seqNum = pEntry->seqNum;
syncMeta.term = pEntry->term;
int64_t tsWriteBegin = taosGetTimestampNs();
index = walAppendLog(pWal, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
int64_t tsWriteEnd = taosGetTimestampNs();
int64_t tsElapsed = tsWriteEnd - tsWriteBegin;
if (index < 0) {
int32_t err = terrno;
const char* errStr = tstrerror(err);
@ -210,8 +215,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
}
pEntry->index = index;
sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s", pEntry->index,
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType));
sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index,
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed);
return 0;
}
@ -234,9 +239,13 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
return -1;
}
int64_t ts1 = taosGetTimestampNs();
taosThreadMutexLock(&(pData->mutex));
int64_t ts2 = taosGetTimestampNs();
code = walReadVer(pWalHandle, index);
int64_t ts3 = taosGetTimestampNs();
// code = walReadVerCached(pWalHandle, index);
if (code != 0) {
int32_t err = terrno;
@ -280,6 +289,18 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
*/
taosThreadMutexUnlock(&(pData->mutex));
int64_t ts4 = taosGetTimestampNs();
int64_t tsElapsed = ts4 - ts1;
int64_t tsElapsedLock = ts2 - ts1;
int64_t tsElapsedRead = ts3 - ts2;
int64_t tsElapsedBuild = ts4 - ts3;
sNTrace(pData->pSyncNode,
"read index:%" PRId64 ", elapsed:%" PRId64 ", elapsed-lock:%" PRId64 ", elapsed-read:%" PRId64
", elapsed-build:%" PRId64,
index, tsElapsed, tsElapsedLock, tsElapsedRead, tsElapsedBuild);
return code;
}

View File

@ -72,7 +72,7 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh
SRpcMsg rpcMsg = {0};
SyncAppendEntries* pMsg = NULL;
SSyncRaftEntry* pEntry;
SSyncRaftEntry* pEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry);
if (code == 0) {
@ -99,6 +99,8 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh
}
}
syncEntryDestory(pEntry);
// prepare msg
ASSERT(pMsg != NULL);
pMsg->srcId = pSyncNode->myRaftId;
@ -140,9 +142,10 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) {
int32_t ret = 0;
SyncAppendEntries* pMsg = pRpcMsg->pCont;
syncLogSendAppendEntries(pSyncNode, pMsg, "");
syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg);
if (pMsg == NULL) {
sError("vgId:%d, sync-append-entries msg is NULL", pSyncNode->vgId);
return 0;
}
SPeerState* pState = syncNodeGetPeerState(pSyncNode, destRaftId);
if (pState == NULL) {
@ -150,8 +153,19 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftI
return 0;
}
// save index, otherwise pMsg will be free by rpc
SyncIndex saveLastSendIndex = pState->lastSendIndex;
bool update = false;
if (pMsg->dataLen > 0) {
pState->lastSendIndex = pMsg->prevLogIndex + 1;
saveLastSendIndex = pMsg->prevLogIndex + 1;
update = true;
}
syncLogSendAppendEntries(pSyncNode, pMsg, "");
syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg);
if (update) {
pState->lastSendIndex = saveLastSendIndex;
pState->lastSendTime = taosGetTimestampMs();
}

View File

@ -194,6 +194,7 @@ static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNode* pNode, const char* format, ...) {
if (pNode == NULL || pNode->pRaftCfg != NULL && pNode->pRaftStore == NULL || pNode->pLogStore == NULL) return;
int64_t currentTerm = pNode->pRaftStore->currentTerm;
// save error code, otherwise it will be overwritten
int32_t errCode = terrno;
@ -235,8 +236,8 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
", snap-tm:%" PRIu64 ", sby:%d, aq:%d, bch:%d, r-num:%d, lcfg:%" PRId64
", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s",
pNode->vgId, syncStr(pNode->state), eventLog, pNode->pRaftStore->currentTerm, pNode->commitIndex,
logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex,
logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pNode->pRaftCfg->isStandBy, aqItems, pNode->pRaftCfg->batchSize, pNode->replicaNum,
pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum,
pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
@ -374,9 +375,9 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64 ", success:%d, match:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
"recv sync-append-entries-reply from %s:%d {term:%" PRId64 ", pterm:%" PRId64
", success:%d, lsend-index:%" PRId64 ", match:%" PRId64 "}, %s",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s);
}
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, const char* s) {
@ -511,8 +512,8 @@ void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMs
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
sNTrace(pSyncNode,
"send sync-append-entries to %s:%d, {term:%" PRId64 ", pre-index:%" PRId64 ", pre-term:%" PRId64
", pterm:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
", lsend-index:%" PRId64 ", cmt:%" PRId64 ", datalen:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, (pMsg->prevLogIndex + 1), pMsg->commitIndex,
pMsg->dataLen, s);
}

View File

@ -124,8 +124,8 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
goto _err;
}
char* candidate = NULL;
char* haystack = buf;
char* candidate = NULL;
char* haystack = buf;
int64_t pos = 0;
SWalCkHead* logContent = NULL;
@ -414,8 +414,10 @@ int walCheckAndRepairMeta(SWal* pWal) {
}
ASSERT(pFileInfo->fileSize == 0);
// remove the empty wal log, and its idx
wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr);
taosRemoveFile(fnameStr);
walBuildIdxName(pWal, pFileInfo->firstVer, fnameStr);
wInfo("vgId:%d, wal remove empty file %s", pWal->cfg.vgId, fnameStr);
taosRemoveFile(fnameStr);
// remove its meta entry
taosArrayRemove(pWal->fileInfoSet, fileIdx);

View File

@ -407,6 +407,7 @@ int32_t walRollImpl(SWal *pWal) {
}
walBuildLogName(pWal, newFileFirstVer, fnameStr);
pLogFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
wDebug("vgId:%d, wal create new file for write:%s", pWal->cfg.vgId, fnameStr);
if (pLogFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
code = -1;

View File

@ -271,8 +271,11 @@ static int32_t cfgSetTimezone(SConfigItem *pItem, const char *value, ECfgSrcType
cfgStypeStr(stype), value, terrstr());
return -1;
}
pItem->stype = stype;
// apply new timezone
osSetTimezone(value);
return 0;
}

View File

@ -87,6 +87,7 @@ ELSE ()
MESSAGE("CURRENT SOURCE DIR ${CMAKE_CURRENT_SOURCE_DIR}")
IF (TD_WINDOWS)
MESSAGE("Building taosAdapter on Windows")
INCLUDE(ExternalProject)
ExternalProject_Add(taosadapter
PREFIX "taosadapter"
@ -104,14 +105,18 @@ ELSE ()
COMMAND go build -a -o taosadapter-debug.exe -ldflags "-X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}"
INSTALL_COMMAND
COMMAND cmake -E time upx taosadapter ||:
COMMAND cmake -E copy taosadapter.exe ${CMAKE_BINARY_DIR}/build/bin
COMMAND cmake -E echo "Comparessing taosadapter.exe"
COMMAND cmake -E time upx taosadapter.exe
COMMAND cmake -E echo "Copy taosadapter.exe"
COMMAND cmake -E copy taosadapter.exe ${CMAKE_BINARY_DIR}/build/bin/taosadapter.exe
COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/test/cfg/
COMMAND cmake -E echo "Copy taosadapter.toml"
COMMAND cmake -E copy ./example/config/taosadapter.toml ${CMAKE_BINARY_DIR}/test/cfg/
COMMAND cmake -E copy ./taosadapter.service ${CMAKE_BINARY_DIR}/test/cfg/
COMMAND cmake -E echo "Copy taosadapter-debug.exe"
COMMAND cmake -E copy taosadapter-debug.exe ${CMAKE_BINARY_DIR}/build/bin
)
ELSE (TD_WINDOWS)
MESSAGE("Building taosAdapter on non-Windows")
INCLUDE(ExternalProject)
ExternalProject_Add(taosadapter
PREFIX "taosadapter"
@ -126,11 +131,15 @@ ELSE ()
COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}"
COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}"
INSTALL_COMMAND
COMMAND cmake -E echo "Comparessing taosadapter.exe"
COMMAND upx taosadapter || :
COMMAND cmake -E echo "Copy taosadapter"
COMMAND cmake -E copy taosadapter ${CMAKE_BINARY_DIR}/build/bin
COMMAND cmake -E make_directory ${CMAKE_BINARY_DIR}/test/cfg/
COMMAND cmake -E echo "Copy taosadapter.toml"
COMMAND cmake -E copy ./example/config/taosadapter.toml ${CMAKE_BINARY_DIR}/test/cfg/
COMMAND cmake -E copy ./taosadapter.service ${CMAKE_BINARY_DIR}/test/cfg/
COMMAND cmake -E echo "Copy taosadapter-debug"
COMMAND cmake -E copy taosadapter-debug ${CMAKE_BINARY_DIR}/build/bin
)
ENDIF (TD_WINDOWS)