Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/TD-30967

This commit is contained in:
54liuyao 2024-07-16 13:37:24 +08:00
commit 4ca7d3c40c
2 changed files with 51 additions and 42 deletions

View File

@ -15,9 +15,9 @@
#define _DEFAULT_SOURCE
#include "tmisce.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tjson.h"
#include "tdatablock.h"
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp) {
pEp->port = 0;
@ -178,57 +178,57 @@ int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t cap) {
int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime) {
int32_t code = 0;
SJson* pJson = tjsonCreateObject();
SJson* pJson = tjsonCreateObject();
if (pJson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
char tmp[4096] = {0};
TAOS_CHECK_GOTO(code = tjsonAddDoubleToObject(pJson, "reportVersion", 1), NULL, _exit);
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "reportVersion", 1), NULL, _exit);
TAOS_CHECK_GOTO(code = tjsonAddIntegerToObject(pJson, "clusterId", clusterId), NULL, _exit);
TAOS_CHECK_GOTO(code = tjsonAddIntegerToObject(pJson, "startTime", startTime), NULL, _exit);
TAOS_CHECK_GOTO(tjsonAddIntegerToObject(pJson, "clusterId", clusterId), NULL, _exit);
TAOS_CHECK_GOTO(tjsonAddIntegerToObject(pJson, "startTime", startTime), NULL, _exit);
// Do NOT invoke the taosGetFqdn here.
// this function may be invoked when memory exception occurs,so we should assume that it is running in a memory locked
// environment. The lock operation by taosGetFqdn may cause this program deadlock.
TAOS_CHECK_GOTO(code = tjsonAddStringToObject(pJson, "fqdn", tsLocalFqdn), NULL, _exit);
TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "fqdn", tsLocalFqdn), NULL, _exit);
TAOS_CHECK_GOTO(code = tjsonAddIntegerToObject(pJson, "pid", taosGetPId()), NULL, _exit);
TAOS_CHECK_GOTO(tjsonAddIntegerToObject(pJson, "pid", taosGetPId()), NULL, _exit);
code = taosGetAppName(tmp, NULL);
if (code != 0) {
code = TAOS_SYSTEM_ERROR(errno);
TAOS_CHECK_GOTO(code, NULL, _exit);
}
TAOS_CHECK_GOTO(code = tjsonAddStringToObject(pJson, "appName", tmp), NULL, _exit);
TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "appName", tmp), NULL, _exit);
if (taosGetOsReleaseName(tmp, NULL, NULL, sizeof(tmp)) == 0) {
TAOS_CHECK_GOTO(code = tjsonAddStringToObject(pJson, "os", tmp), NULL, _exit);
TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "os", tmp), NULL, _exit);
} else {
// do nothing
}
float numOfCores = 0;
if (taosGetCpuInfo(tmp, sizeof(tmp), &numOfCores) == 0) {
TAOS_CHECK_GOTO(code = tjsonAddStringToObject(pJson, "cpuModel", tmp), NULL, _exit);
TAOS_CHECK_GOTO(code = tjsonAddDoubleToObject(pJson, "numOfCpu", numOfCores), NULL, _exit);
TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "cpuModel", tmp), NULL, _exit);
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfCpu", numOfCores), NULL, _exit);
} else {
TAOS_CHECK_GOTO(code = tjsonAddDoubleToObject(pJson, "numOfCpu", tsNumOfCores), NULL, _exit);
TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfCpu", tsNumOfCores), NULL, _exit);
}
int32_t nBytes = snprintf(tmp, sizeof(tmp), "%" PRId64 " kB", tsTotalMemoryKB);
if (nBytes <= 9 || nBytes >= sizeof(tmp)) {
TAOS_CHECK_GOTO(code = TSDB_CODE_OUT_OF_RANGE, NULL, _exit);
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_RANGE, NULL, _exit);
}
TAOS_CHECK_GOTO(code = tjsonAddStringToObject(pJson, "memory", tmp), NULL, _exit);
TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "memory", tmp), NULL, _exit);
TAOS_CHECK_GOTO(code = tjsonAddStringToObject(pJson, "version", version), NULL, _exit);
TAOS_CHECK_GOTO(code = tjsonAddStringToObject(pJson, "buildInfo", buildinfo), NULL, _exit);
TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "version", version), NULL, _exit);
TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "buildInfo", buildinfo), NULL, _exit);
TAOS_CHECK_GOTO(code = tjsonAddStringToObject(pJson, "gitInfo", gitinfo), NULL, _exit);
TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "gitInfo", gitinfo), NULL, _exit);
TAOS_CHECK_GOTO(code = tjsonAddIntegerToObject(pJson, "crashSig", signum), NULL, _exit);
TAOS_CHECK_GOTO(code = tjsonAddIntegerToObject(pJson, "crashTs", taosGetTimestampUs()), NULL, _exit);
TAOS_CHECK_GOTO(tjsonAddIntegerToObject(pJson, "crashSig", signum), NULL, _exit);
TAOS_CHECK_GOTO(tjsonAddIntegerToObject(pJson, "crashTs", taosGetTimestampUs()), NULL, _exit);
#ifdef _TD_DARWIN_64
taosLogTraceToBuf(tmp, sizeof(tmp), 4);
@ -238,7 +238,7 @@ int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t
taosLogTraceToBuf(tmp, sizeof(tmp), 8);
#endif
TAOS_CHECK_GOTO(code = tjsonAddStringToObject(pJson, "stackInfo", tmp), NULL, _exit);
TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "stackInfo", tmp), NULL, _exit);
char* pCont = tjsonToString(pJson);
if (pCont == NULL) {
@ -256,8 +256,8 @@ _exit:
}
int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol) {
int32_t code = 0;
SConfig* pConf = taosGetCfg();
int32_t code = 0;
SConfig* pConf = taosGetCfg();
if (pConf == NULL) {
return TSDB_CODE_INVALID_CFG;
}
@ -269,7 +269,7 @@ int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol) {
int8_t locked = 0;
TAOS_CHECK_GOTO(code = blockDataEnsureCapacity(pBlock, cfgGetSize(pConf)), NULL, _exit);
TAOS_CHECK_GOTO(blockDataEnsureCapacity(pBlock, cfgGetSize(pConf)), NULL, _exit);
pIter = cfgCreateIter(pConf);
if (pIter == NULL) {
@ -293,7 +293,7 @@ int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol) {
TAOS_CHECK_GOTO(code, NULL, _exit);
}
TAOS_CHECK_GOTO(code = colDataSetVal(pColInfo, numOfRows, name, false), NULL, _exit);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, name, false), NULL, _exit);
char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0};
int32_t valueLen = 0;
@ -306,7 +306,7 @@ int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol) {
TAOS_CHECK_GOTO(code, NULL, _exit);
}
TAOS_CHECK_GOTO(code = colDataSetVal(pColInfo, numOfRows, value, false), NULL, _exit);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, value, false), NULL, _exit);
char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0};
cfgDumpItemScope(pItem, &scope[VARSTR_HEADER_SIZE], TSDB_CONFIG_SCOPE_LEN, &valueLen);
@ -317,7 +317,7 @@ int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol) {
code = TSDB_CODE_OUT_OF_RANGE;
TAOS_CHECK_GOTO(code, NULL, _exit);
}
TAOS_CHECK_GOTO(code = colDataSetVal(pColInfo, numOfRows, scope, false), NULL, _exit);
TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, scope, false), NULL, _exit);
numOfRows++;
}

View File

@ -46,6 +46,7 @@ static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid,
static SArray* createDefaultTagColName();
static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName,
int64_t gid, bool newSubTableRule);
static int32_t doCreateSinkInfo(const char* pDstTableName, STableSinkInfo** pInfo);
int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq,
const char* pIdStr, bool newSubTableRule) {
@ -270,6 +271,14 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S
pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1);
taosArrayPush(reqs.pArray, pCreateTbReq);
STableSinkInfo* pInfo = NULL;
bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, gid, &pInfo);
if (!alreadyCached) {
code = doCreateSinkInfo(pCreateTbReq->name, &pInfo);
doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pInfo, gid, pTask->id.idStr);
}
tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name);
}
@ -634,6 +643,18 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI
return TSDB_CODE_SUCCESS;
}
int32_t doCreateSinkInfo(const char* pDstTableName, STableSinkInfo** pInfo) {
int32_t nameLen = strlen(pDstTableName);
(*pInfo) = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen + 1);
if (*pInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pInfo)->name.len = nameLen;
memcpy((*pInfo)->name.data, pDstTableName, nameLen);
return TSDB_CODE_SUCCESS;
}
int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName,
SSubmitTbData* pTableData) {
uint64_t groupId = pDataBlock->info.id.groupId;
@ -670,22 +691,15 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) &&
!alreadyAddGroupId(dstTableName, groupId) && groupId != 0) {
tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName);
if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){
if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
buildCtbNameAddGroupId(NULL, dstTableName, groupId);
}else if(pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER && stbFullName) {
} else if (pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER && stbFullName) {
buildCtbNameAddGroupId(stbFullName, dstTableName, groupId);
}
}
}
int32_t nameLen = strlen(dstTableName);
pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen + 1);
if (pTableSinkInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pTableSinkInfo->name.len = nameLen;
memcpy(pTableSinkInfo->name.data, dstTableName, nameLen);
int32_t code = doCreateSinkInfo(dstTableName, &pTableSinkInfo);
tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s", id, dstTableName);
}
@ -693,7 +707,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
pTableData->uid = pTableSinkInfo->uid;
if (pTableData->uid == 0) {
tqTrace("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id);
tqTrace("s-task:%s cached tableInfo:%s uid is invalid, acquire it from meta", id, pTableSinkInfo->name.data);
return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid);
} else {
tqTrace("s-task:%s set the dstTable uid from cache:%" PRId64, id, pTableData->uid);
@ -929,11 +943,6 @@ bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) {
}
int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) {
if (tSimpleHashGetSize(pSinkTableMap) > MAX_CACHE_TABLE_INFO_NUM) {
taosMemoryFreeClear(pTableSinkInfo); // too many items, failed to cache it
return TSDB_CODE_FAILED;
}
int32_t code = tSimpleHashPut(pSinkTableMap, &groupId, sizeof(uint64_t), &pTableSinkInfo, POINTER_BYTES);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pTableSinkInfo);