From bdd6ba6b90f53035ec27149d9c4ea2ccf494eea3 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 16 Jul 2024 09:18:54 +0800 Subject: [PATCH 1/2] adjust format --- source/common/src/tmisce.c | 52 +++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/source/common/src/tmisce.c b/source/common/src/tmisce.c index 7009cf26d0..7b349e91b0 100644 --- a/source/common/src/tmisce.c +++ b/source/common/src/tmisce.c @@ -15,9 +15,9 @@ #define _DEFAULT_SOURCE #include "tmisce.h" +#include "tdatablock.h" #include "tglobal.h" #include "tjson.h" -#include "tdatablock.h" int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp) { pEp->port = 0; @@ -178,57 +178,57 @@ int32_t epsetToStr(const SEpSet* pEpSet, char* pBuf, int32_t cap) { int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime) { int32_t code = 0; - SJson* pJson = tjsonCreateObject(); + SJson* pJson = tjsonCreateObject(); if (pJson == NULL) return TSDB_CODE_OUT_OF_MEMORY; char tmp[4096] = {0}; - TAOS_CHECK_GOTO(code = tjsonAddDoubleToObject(pJson, "reportVersion", 1), NULL, _exit); + TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "reportVersion", 1), NULL, _exit); - TAOS_CHECK_GOTO(code = tjsonAddIntegerToObject(pJson, "clusterId", clusterId), NULL, _exit); - TAOS_CHECK_GOTO(code = tjsonAddIntegerToObject(pJson, "startTime", startTime), NULL, _exit); + TAOS_CHECK_GOTO(tjsonAddIntegerToObject(pJson, "clusterId", clusterId), NULL, _exit); + TAOS_CHECK_GOTO(tjsonAddIntegerToObject(pJson, "startTime", startTime), NULL, _exit); // Do NOT invoke the taosGetFqdn here. // this function may be invoked when memory exception occurs,so we should assume that it is running in a memory locked // environment. The lock operation by taosGetFqdn may cause this program deadlock. - TAOS_CHECK_GOTO(code = tjsonAddStringToObject(pJson, "fqdn", tsLocalFqdn), NULL, _exit); + TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "fqdn", tsLocalFqdn), NULL, _exit); - TAOS_CHECK_GOTO(code = tjsonAddIntegerToObject(pJson, "pid", taosGetPId()), NULL, _exit); + TAOS_CHECK_GOTO(tjsonAddIntegerToObject(pJson, "pid", taosGetPId()), NULL, _exit); code = taosGetAppName(tmp, NULL); if (code != 0) { code = TAOS_SYSTEM_ERROR(errno); TAOS_CHECK_GOTO(code, NULL, _exit); } - TAOS_CHECK_GOTO(code = tjsonAddStringToObject(pJson, "appName", tmp), NULL, _exit); + TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "appName", tmp), NULL, _exit); if (taosGetOsReleaseName(tmp, NULL, NULL, sizeof(tmp)) == 0) { - TAOS_CHECK_GOTO(code = tjsonAddStringToObject(pJson, "os", tmp), NULL, _exit); + TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "os", tmp), NULL, _exit); } else { // do nothing } float numOfCores = 0; if (taosGetCpuInfo(tmp, sizeof(tmp), &numOfCores) == 0) { - TAOS_CHECK_GOTO(code = tjsonAddStringToObject(pJson, "cpuModel", tmp), NULL, _exit); - TAOS_CHECK_GOTO(code = tjsonAddDoubleToObject(pJson, "numOfCpu", numOfCores), NULL, _exit); + TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "cpuModel", tmp), NULL, _exit); + TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfCpu", numOfCores), NULL, _exit); } else { - TAOS_CHECK_GOTO(code = tjsonAddDoubleToObject(pJson, "numOfCpu", tsNumOfCores), NULL, _exit); + TAOS_CHECK_GOTO(tjsonAddDoubleToObject(pJson, "numOfCpu", tsNumOfCores), NULL, _exit); } int32_t nBytes = snprintf(tmp, sizeof(tmp), "%" PRId64 " kB", tsTotalMemoryKB); if (nBytes <= 9 || nBytes >= sizeof(tmp)) { - TAOS_CHECK_GOTO(code = TSDB_CODE_OUT_OF_RANGE, NULL, _exit); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_RANGE, NULL, _exit); } - TAOS_CHECK_GOTO(code = tjsonAddStringToObject(pJson, "memory", tmp), NULL, _exit); + TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "memory", tmp), NULL, _exit); - TAOS_CHECK_GOTO(code = tjsonAddStringToObject(pJson, "version", version), NULL, _exit); - TAOS_CHECK_GOTO(code = tjsonAddStringToObject(pJson, "buildInfo", buildinfo), NULL, _exit); + TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "version", version), NULL, _exit); + TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "buildInfo", buildinfo), NULL, _exit); - TAOS_CHECK_GOTO(code = tjsonAddStringToObject(pJson, "gitInfo", gitinfo), NULL, _exit); + TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "gitInfo", gitinfo), NULL, _exit); - TAOS_CHECK_GOTO(code = tjsonAddIntegerToObject(pJson, "crashSig", signum), NULL, _exit); - TAOS_CHECK_GOTO(code = tjsonAddIntegerToObject(pJson, "crashTs", taosGetTimestampUs()), NULL, _exit); + TAOS_CHECK_GOTO(tjsonAddIntegerToObject(pJson, "crashSig", signum), NULL, _exit); + TAOS_CHECK_GOTO(tjsonAddIntegerToObject(pJson, "crashTs", taosGetTimestampUs()), NULL, _exit); #ifdef _TD_DARWIN_64 taosLogTraceToBuf(tmp, sizeof(tmp), 4); @@ -238,7 +238,7 @@ int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t taosLogTraceToBuf(tmp, sizeof(tmp), 8); #endif - TAOS_CHECK_GOTO(code = tjsonAddStringToObject(pJson, "stackInfo", tmp), NULL, _exit); + TAOS_CHECK_GOTO(tjsonAddStringToObject(pJson, "stackInfo", tmp), NULL, _exit); char* pCont = tjsonToString(pJson); if (pCont == NULL) { @@ -256,8 +256,8 @@ _exit: } int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol) { - int32_t code = 0; - SConfig* pConf = taosGetCfg(); + int32_t code = 0; + SConfig* pConf = taosGetCfg(); if (pConf == NULL) { return TSDB_CODE_INVALID_CFG; } @@ -269,7 +269,7 @@ int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol) { int8_t locked = 0; - TAOS_CHECK_GOTO(code = blockDataEnsureCapacity(pBlock, cfgGetSize(pConf)), NULL, _exit); + TAOS_CHECK_GOTO(blockDataEnsureCapacity(pBlock, cfgGetSize(pConf)), NULL, _exit); pIter = cfgCreateIter(pConf); if (pIter == NULL) { @@ -293,7 +293,7 @@ int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol) { TAOS_CHECK_GOTO(code, NULL, _exit); } - TAOS_CHECK_GOTO(code = colDataSetVal(pColInfo, numOfRows, name, false), NULL, _exit); + TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, name, false), NULL, _exit); char value[TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE] = {0}; int32_t valueLen = 0; @@ -306,7 +306,7 @@ int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol) { TAOS_CHECK_GOTO(code, NULL, _exit); } - TAOS_CHECK_GOTO(code = colDataSetVal(pColInfo, numOfRows, value, false), NULL, _exit); + TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, value, false), NULL, _exit); char scope[TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE] = {0}; cfgDumpItemScope(pItem, &scope[VARSTR_HEADER_SIZE], TSDB_CONFIG_SCOPE_LEN, &valueLen); @@ -317,7 +317,7 @@ int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol) { code = TSDB_CODE_OUT_OF_RANGE; TAOS_CHECK_GOTO(code, NULL, _exit); } - TAOS_CHECK_GOTO(code = colDataSetVal(pColInfo, numOfRows, scope, false), NULL, _exit); + TAOS_CHECK_GOTO(colDataSetVal(pColInfo, numOfRows, scope, false), NULL, _exit); numOfRows++; } From 8dde576c6f725e8ca0130f1e11c4da3de5830bcb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 Jul 2024 18:51:58 +0800 Subject: [PATCH 2/2] fix(stream): add explicit create table into sink cache. --- source/dnode/vnode/src/tq/tqSink.c | 41 ++++++++++++++++++------------ 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 1c8f102725..9b1e8075da 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -46,6 +46,7 @@ static int32_t initCreateTableMsg(SVCreateTbReq* pCreateTableReq, uint64_t suid, static SArray* createDefaultTagColName(); static void setCreateTableMsgTableName(SVCreateTbReq* pCreateTableReq, SSDataBlock* pDataBlock, const char* stbFullName, int64_t gid, bool newSubTableRule); +static int32_t doCreateSinkInfo(const char* pDstTableName, STableSinkInfo** pInfo); int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr, bool newSubTableRule) { @@ -270,6 +271,14 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER && pTask->subtableWithoutMd5 != 1); taosArrayPush(reqs.pArray, pCreateTbReq); + + STableSinkInfo* pInfo = NULL; + bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, gid, &pInfo); + if (!alreadyCached) { + code = doCreateSinkInfo(pCreateTbReq->name, &pInfo); + doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pInfo, gid, pTask->id.idStr); + } + tqDebug("s-task:%s build create table:%s msg complete", pTask->id.idStr, pCreateTbReq->name); } @@ -634,6 +643,18 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI return TSDB_CODE_SUCCESS; } +int32_t doCreateSinkInfo(const char* pDstTableName, STableSinkInfo** pInfo) { + int32_t nameLen = strlen(pDstTableName); + (*pInfo) = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen + 1); + if (*pInfo == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + (*pInfo)->name.len = nameLen; + memcpy((*pInfo)->name.data, pDstTableName, nameLen); + return TSDB_CODE_SUCCESS; +} + int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDataBlock, char* stbFullName, SSubmitTbData* pTableData) { uint64_t groupId = pDataBlock->info.id.groupId; @@ -670,22 +691,15 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat if (pTask->subtableWithoutMd5 != 1 && !isAutoTableName(dstTableName) && !alreadyAddGroupId(dstTableName, groupId) && groupId != 0) { tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName); - if(pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) { buildCtbNameAddGroupId(NULL, dstTableName, groupId); - }else if(pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER && stbFullName) { + } else if (pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER && stbFullName) { buildCtbNameAddGroupId(stbFullName, dstTableName, groupId); } } } - int32_t nameLen = strlen(dstTableName); - pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen + 1); - if (pTableSinkInfo == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - pTableSinkInfo->name.len = nameLen; - memcpy(pTableSinkInfo->name.data, dstTableName, nameLen); + int32_t code = doCreateSinkInfo(dstTableName, &pTableSinkInfo); tqDebug("s-task:%s build new sinkTableInfo to add cache, dstTable:%s", id, dstTableName); } @@ -693,7 +707,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat pTableData->uid = pTableSinkInfo->uid; if (pTableData->uid == 0) { - tqTrace("s-task:%s cached tableInfo uid is invalid, acquire it from meta", id); + tqTrace("s-task:%s cached tableInfo:%s uid is invalid, acquire it from meta", id, pTableSinkInfo->name.data); return doWaitForDstTableCreated(pVnode, pTask, pTableSinkInfo, dstTableName, &pTableData->uid); } else { tqTrace("s-task:%s set the dstTable uid from cache:%" PRId64, id, pTableData->uid); @@ -929,11 +943,6 @@ bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) { } int32_t doPutIntoCache(SSHashObj* pSinkTableMap, STableSinkInfo* pTableSinkInfo, uint64_t groupId, const char* id) { - if (tSimpleHashGetSize(pSinkTableMap) > MAX_CACHE_TABLE_INFO_NUM) { - taosMemoryFreeClear(pTableSinkInfo); // too many items, failed to cache it - return TSDB_CODE_FAILED; - } - int32_t code = tSimpleHashPut(pSinkTableMap, &groupId, sizeof(uint64_t), &pTableSinkInfo, POINTER_BYTES); if (code != TSDB_CODE_SUCCESS) { taosMemoryFreeClear(pTableSinkInfo);